Table of Contents

WatDiv Query S1 plans

SPARQL DF plan

// random partitioning
val DATA = dfDefault
 
val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s")
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK
 
 
// ordered by increasing triple tp size
val orderedProp = List(
  ("sorg", "priceValidUntil"),
  ("gr", "validFrom"), 
  ("gr", "validThrough"), 
  ("gr", "includes"),
  ("gr", "serialNumber"), 
  ("sorg", "eligibleQuantity"), 
  ("sorg", "eligibleRegion"), 
  ("gr", "price"))
 
val triples = orderedProp.map{case(ns, p) => {
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")
}}
 
// next triples
for( i <- triples) {
  plan = plan.join(i, "s")
}
 
 
// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// TIME=18.6s     INPUT=126GB      SHFR=484MB 

SPARQL Hybrid DF plan

val subset = df.where(s"(p=51 and s=$retailer) or p in (3,9,38,40,56,57,63,69)").persist
subset.count  // 78 413 119
// Merging time=4,885s 
 
val DATA = subset
 
val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s")
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK
 
 
// ordered by increasing triple tp size
val orderedProp = List(
  ("sorg", "priceValidUntil"),
  ("gr", "validFrom"), 
  ("gr", "validThrough"), 
  ("gr", "includes"),
  ("gr", "serialNumber"), 
  ("sorg", "eligibleQuantity"), 
  ("sorg", "eligibleRegion"), 
  ("gr", "price"))
 
val triples = orderedProp.map{case(ns, p) => {
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")
}}
 
// next triples
for( i <- triples) {
  plan = plan.join(i, "s")
}
 
 
// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// 2,87 + 4,885 = 7,76s   INPUT=14+6.2=20,2GB SHFR=32KB 

S2RDF plan

val VP2EXP=VP2Random
 
//triple patterns joining on their subject
val tp = List(
  ("gr", "includes"),
  ("gr", "price"), 
  ("gr", "serialNumber"), 
  ("gr", "validFrom"), 
  ("gr", "validThrough"), 
  ("sorg", "eligibleQuantity"), 
  ("sorg", "eligibleRegion"), 
  ("sorg", "priceValidUntil"))
 
val tpSize = tp.map{case(ns, p) => (p, getIdP(ns,p), VP2Size.get(getIdP(ns, p)).get)}.sortBy{case (p, idp, size)=> size}
val selections = tpSize.map{case (_, idP, _)=> VP2EXP(idP).withColumnRenamed("o", s"o$idP")}
 
/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o","s")
/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val sel1a = t1.unionAll(e1)
var plan = sel1a
/* next triple patterns */
for( i<-sel1) {
  plan = plan.join(i, "s")
}
queryTimeDFIter(plan, 10)  
//TIME=2.106s  INPUT=1,2GB   SHFR=484MB  

S2RDF+Hybrid plan

// VP's partitioned by subject
val VP2EXP=VP2Subject
 
//triple patterns joining on their subject
val tp = List(
  ("gr", "includes"),
  ("gr", "price"), 
  ("gr", "serialNumber"), 
  ("gr", "validFrom"), 
  ("gr", "validThrough"), 
  ("sorg", "eligibleQuantity"), 
  ("sorg", "eligibleRegion"), 
  ("sorg", "priceValidUntil"))
 
val tpSize = tp.map{case(ns, p) => (p, getIdP(ns,p), VP2Size.get(getIdP(ns, p)).get)}.sortBy{case (p, idp, size)=> size}
val selections = tpSize.map{case (_, idP, _)=> VP2EXP(idP).withColumnRenamed("o", s"o$idP")}
 
/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o","s")
/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val sel1a = t1.unionAll(e1)
var plan = sel1a
/* next triple patterns */
for( i<-sel1) {
  plan = plan.join(i, "s")
}
queryTimeDFIter(plan, 10)  
//TIME=2.106  INPUT=100+60=160MB   SHFR=38KB   

Go to SPARQL query processing with Apache Spark