Bases de Données / Databases

Site Web de l'équipe BD du LIP6 / LIP6 DB Web Site

User Tools

Site Tools


en:site:recherche:logiciels:sparqlwithspark:watdivs1

This is an old revision of the document!


WatDiv Query S1

SPARQL DF plan

// random partitioning
val DATA = dfDefault

val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s")
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK


// ordered by increasing triple tp size
val orderedProp = List(
  ("sorg", "priceValidUntil"),
  ("gr", "validFrom"), 
  ("gr", "validThrough"), 
  ("gr", "includes"),
  ("gr", "serialNumber"), 
  ("sorg", "eligibleQuantity"), 
  ("sorg", "eligibleRegion"), 
  ("gr", "price"))

val triples = orderedProp.map{case(ns, p) => {
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")
}}

// next triples
for( i <- triples) {
  plan = plan.join(i, "s")
}


// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// TIME=18.6s     INPUT=126GB      SHFR=484MB 

SPARQL Hybrid DF plan

val subset = df.where(s"(p=51 and s=$retailer) or p in (3,9,38,40,56,57,63,69)").persist
subset.count  // 78 413 119
// Merging time=4,885s 

val DATA = subset

val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s")
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK


// ordered by increasing triple tp size
val orderedProp = List(
  ("sorg", "priceValidUntil"),
  ("gr", "validFrom"), 
  ("gr", "validThrough"), 
  ("gr", "includes"),
  ("gr", "serialNumber"), 
  ("sorg", "eligibleQuantity"), 
  ("sorg", "eligibleRegion"), 
  ("gr", "price"))

val triples = orderedProp.map{case(ns, p) => {
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")
}}

// next triples
for( i <- triples) {
  plan = plan.join(i, "s")
}


// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// 2,87 + 4,885 = 7,76s   INPUT=14+6.2=20,2GB SHFR=32KB 

S2RDF plan

val VP2EXP=VP2Random

//triple patterns joining on their subject
val tp = List(
  ("gr", "includes"),
  ("gr", "price"), 
  ("gr", "serialNumber"), 
  ("gr", "validFrom"), 
  ("gr", "validThrough"), 
  ("sorg", "eligibleQuantity"), 
  ("sorg", "eligibleRegion"), 
  ("sorg", "priceValidUntil"))

val tpSize = tp.map{case(ns, p) => (p, getIdP(ns,p), VP2Size.get(getIdP(ns, p)).get)}.sortBy{case (p, idp, size)=> size}
val selections = tpSize.map{case (_, idP, _)=> VP2EXP(idP).withColumnRenamed("o", s"o$idP")}

/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o","s")
/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val sel1a = t1.unionAll(e1)
var plan = sel1a
/* next triple patterns */
for( i<-sel1) {
  plan = plan.join(i, "s")
}
queryTimeDFIter(plan, 10)  
//TIME=2.106s  INPUT=1,2GB   SHFR=484MB  

S2RDF+Hybrid plan

// VP's partitioned by subject
val VP2EXP=VP2Subject

//triple patterns joining on their subject
val tp = List(
  ("gr", "includes"),
  ("gr", "price"), 
  ("gr", "serialNumber"), 
  ("gr", "validFrom"), 
  ("gr", "validThrough"), 
  ("sorg", "eligibleQuantity"), 
  ("sorg", "eligibleRegion"), 
  ("sorg", "priceValidUntil"))

val tpSize = tp.map{case(ns, p) => (p, getIdP(ns,p), VP2Size.get(getIdP(ns, p)).get)}.sortBy{case (p, idp, size)=> size}
val selections = tpSize.map{case (_, idP, _)=> VP2EXP(idP).withColumnRenamed("o", s"o$idP")}

/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o","s")
/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val sel1a = t1.unionAll(e1)
var plan = sel1a
/* next triple patterns */
for( i<-sel1) {
  plan = plan.join(i, "s")
}
queryTimeDFIter(plan, 10)  
//TIME=2.106  INPUT=100+60=160MB   SHFR=38KB   

Go to SPARQL query processing with Apache Spark

en/site/recherche/logiciels/sparqlwithspark/watdivs1.1473926443.txt.gz · Last modified: 15/09/2016 10:00 by hubert