WatDiv Query S1 plans
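All four plans below implement the WatDiv S1 template, a star query centered on an offer. For reference, the S1 template, reconstructed here from the properties used in the plans (%v% is the retailer placeholder, instantiated with wsdbm:Retailer3; variable numbering follows the WatDiv distribution):

SELECT * WHERE {
  ?v0 gr:includes ?v1 .
  %v% gr:offers ?v0 .
  ?v0 gr:price ?v3 .
  ?v0 gr:serialNumber ?v4 .
  ?v0 gr:validFrom ?v5 .
  ?v0 gr:validThrough ?v6 .
  ?v0 sorg:eligibleQuantity ?v7 .
  ?v0 sorg:eligibleRegion ?v8 .
  ?v0 sorg:priceValidUntil ?v9 .
}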
SPARQL DF plan
// triples table with random (default) partitioning
val DATA = dfDefault
// dictionary-encoded identifiers (cf. their construction in the S2RDF plan below)
val retailer = getIdSO("wsdbm", "Retailer3")
val idOffers = getIdP("gr", "offers")
// first triple pattern: the offers of the retailer
val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s")
// union with one dummy (-1) row per fragment so Spark preserves the
// PROCESS_LOCAL locality level during the first join (see the S2RDF plan)
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK
// triple patterns ordered by increasing size
val orderedProp = List(
("sorg", "priceValidUntil"),
("gr", "validFrom"),
("gr", "validThrough"),
("gr", "includes"),
("gr", "serialNumber"),
("sorg", "eligibleQuantity"),
("sorg", "eligibleRegion"),
("gr", "price"))
val triples = orderedProp.map { case (ns, p) =>
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")
}
// join the remaining triple patterns on the shared subject
for (t <- triples) {
  plan = plan.join(t, "s")
}
// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// TIME=18.6s INPUT=126GB SHFR=484MB
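queryTimeDFIter(plan, n) is the timing harness used throughout this page. A minimal sketch of what it might do, assuming it forces the plan n times with a count action and reports the average latency (the body is hypothetical, inferred from the name and the reported TIME figures):

def queryTimeDFIter(plan: org.apache.spark.sql.DataFrame, n: Int): Unit = {
  val secs = (1 to n).map { _ =>
    val t0 = System.nanoTime
    plan.count()                       // action forcing full execution of the plan
    (System.nanoTime - t0) / 1e9       // elapsed seconds
  }
  println(f"TIME=${secs.sum / n}%.3fs (average over $n runs)")
}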
SPARQL Hybrid DF plan
// extract only the fragments needed by S1: p=51 (gr:offers) restricted to
// the retailer, plus the eight star properties, then cache the merged subset
val subset = df.where(s"(p=51 and s=$retailer) or p in (3,9,38,40,56,57,63,69)").persist
subset.count // 78 413 119 triples
// Merging time = 4.885s
val DATA = subset
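A hypothetical alternative that derives the same filter from orderedProp (the property list of the DF plan above) instead of hard-coding the dictionary IDs; starIds and subsetAlt are illustrative names:

val starIds = orderedProp.map { case (ns, p) => getIdP(ns, p) }.mkString(",")
val subsetAlt = df.where(s"(p=$idOffers and s=$retailer) or p in ($starIds)").persist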
val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s")
// dummy -1 rows again, to preserve PROCESS_LOCAL locality on the first join
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK
// triple patterns ordered by increasing size
val orderedProp = List(
("sorg", "priceValidUntil"),
("gr", "validFrom"),
("gr", "validThrough"),
("gr", "includes"),
("gr", "serialNumber"),
("sorg", "eligibleQuantity"),
("sorg", "eligibleRegion"),
("gr", "price"))
val triples = orderedProp.map { case (ns, p) =>
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")
}
// join the remaining triple patterns on the shared subject
for (t <- triples) {
  plan = plan.join(t, "s")
}
// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// TIME = 2.87 (query) + 4.885 (merging) = 7.76s INPUT=14+6.2=20.2GB SHFR=32KB
S2RDF plan
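The plans in this section read from vertically partitioned (VP) tables: one (s,o) DataFrame per property, indexed by property ID. A minimal sketch of how VP2Random could be materialized, assuming dfDefault holds the encoded (s,p,o) triples and VP2Size maps each property ID to its triple count (the construction is illustrative; only the names VP2Random and VP2Size come from the plans below):

val VP2Random = VP2Size.keys.map { idP =>
  idP -> dfDefault.where(s"p=$idP").select("s", "o").persist
}.toMap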
// VPs with random (default) partitioning
val VP2EXP = VP2Random
// triple patterns joining on their subject
val tp = List(
("gr", "includes"),
("gr", "price"),
("gr", "serialNumber"),
("gr", "validFrom"),
("gr", "validThrough"),
("sorg", "eligibleQuantity"),
("sorg", "eligibleRegion"),
("sorg", "priceValidUntil"))
val tpSize = tp.map { case (ns, p) => (p, getIdP(ns, p), VP2Size(getIdP(ns, p))) }
               .sortBy { case (_, _, size) => size }
val selections = tpSize.map { case (_, idP, _) => VP2EXP(idP).withColumnRenamed("o", s"o$idP") }
/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o","s")
/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val sel1a = sel1.unionAll(e1)
var plan = sel1a
/* join the remaining triple patterns on the shared subject */
for (sel <- selections) {
  plan = plan.join(sel, "s")
}
queryTimeDFIter(plan, 10)
// TIME=2.106s INPUT=1.2GB SHFR=484MB
S2RDF+Hybrid plan
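This variant reads from VP2Subject, the same VP tables repartitioned on the subject column so that join keys are co-located; that co-location is what shrinks the shuffle from 484MB to 38KB. A sketch of how VP2Subject could be derived from VP2Random (hypothetical construction; repartition(n, col) requires Spark 1.6+):

import org.apache.spark.sql.functions.col
val VP2Subject = VP2Random.map { case (idP, vp) =>
  idP -> vp.repartition(NB_FRAGMENTS, col("s")).persist
}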
// VPs partitioned by subject
val VP2EXP = VP2Subject
// triple patterns joining on their subject
val tp = List(
("gr", "includes"),
("gr", "price"),
("gr", "serialNumber"),
("gr", "validFrom"),
("gr", "validThrough"),
("sorg", "eligibleQuantity"),
("sorg", "eligibleRegion"),
("sorg", "priceValidUntil"))
val tpSize = tp.map { case (ns, p) => (p, getIdP(ns, p), VP2Size(getIdP(ns, p))) }
               .sortBy { case (_, _, size) => size }
val selections = tpSize.map { case (_, idP, _) => VP2EXP(idP).withColumnRenamed("o", s"o$idP") }
/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o","s")
/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val sel1a = sel1.unionAll(e1)
var plan = sel1a
/* join the remaining triple patterns on the shared subject */
for (sel <- selections) {
  plan = plan.join(sel, "s")
}
queryTimeDFIter(plan, 10)
// TIME=2.106s INPUT=100+60=160MB SHFR=38KB