Bases de Données / Databases

Site Web de l'équipe BD du LIP6 / LIP6 DB Web Site

User Tools

Site Tools


en:site:recherche:logiciels:sparqlwithspark:watdivs1

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Next revision
Previous revision
en:site:recherche:logiciels:sparqlwithspark:watdivs1 [14/09/2016 14:40]
hubert created
en:site:recherche:logiciels:sparqlwithspark:watdivs1 [16/09/2016 23:06] (current)
hubert [WatDiv Query S1 plans]
Line 1: Line 1:
-====== WatDiv Query S1 ======+{{indexmenu_n>2}} 
 + 
 +====== WatDiv Query S1 plans ====== 
 + 
 +=== SPARQL DF plan === 
 + 
 +<code scala> 
// Random (default) partitioning of the triple table.
val DATA = dfDefault

// First triple pattern: offers of the chosen retailer; its object column
// is renamed to "s" so it becomes the subject-join key of the star.
val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s")
// One dummy (-1) row per fragment so every partition is non-empty, which
// helps Spark keep PROCESS_LOCAL locality on the first join.
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(_ => -1).toDF("s")
val t1OK = t1.unionAll(e1)

// Remaining predicates, ordered by increasing triple-pattern size.
val orderedProp = List(
  ("sorg", "priceValidUntil"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("gr", "includes"),
  ("gr", "serialNumber"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("gr", "price"))

// One DataFrame per predicate; the object column becomes o<idP> so the
// successive joins do not collide on column names.
val triples = orderedProp.map { case (ns, prop) =>
  val idP = getIdP(ns, prop)
  DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")
}

// Build the star join as a left fold over the subject key.
var plan = triples.foldLeft(t1OK)((acc, tpDF) => acc.join(tpDF, "s"))

// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// TIME=18.6s     INPUT=126GB      SHFR=484MB
 +</​code>​ 
 + 
 +=== SPARQL Hybrid DF plan === 
 + 
 +<code scala> 
 + 
// Hybrid fragment: restrict the triple table to what S1 can touch —
// the retailer's offers (p=51) plus the eight star predicates.
val subset = df.where(s"(p=51 and s=$retailer) or p in (3,9,38,40,56,57,63,69)").persist
subset.count  // 78 413 119 triples materialized; merging time = 4,885s

val DATA = subset

// First triple pattern: offers of the chosen retailer; its object column
// is renamed to "s" so it becomes the subject-join key of the star.
val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s")
// One dummy (-1) row per fragment so every partition is non-empty, which
// helps Spark keep PROCESS_LOCAL locality on the first join.
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(_ => -1).toDF("s")
val t1OK = t1.unionAll(e1)

// Remaining predicates, ordered by increasing triple-pattern size.
val orderedProp = List(
  ("sorg", "priceValidUntil"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("gr", "includes"),
  ("gr", "serialNumber"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("gr", "price"))

// One DataFrame per predicate; the object column becomes o<idP> so the
// successive joins do not collide on column names.
val triples = orderedProp.map { case (ns, prop) =>
  val idP = getIdP(ns, prop)
  DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")
}

// Build the star join as a left fold over the subject key.
var plan = triples.foldLeft(t1OK)((acc, tpDF) => acc.join(tpDF, "s"))

// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// 2,87 + 4,885 = 7,76s   INPUT=14+6.2=20,2GB SHFR=32KB
 +</​code>​ 
 + 
 + 
 + 
 +=== S2RDF plan === 
 + 
 +<code scala> 
 +val VP2EXP=VP2Random
  
-<​code>​ 
 //triple patterns joining on their subject //triple patterns joining on their subject
 val tp = List( val tp = List(
Line 28: Line 120:
 } }
 queryTimeDFIter(plan,​ 10)  ​ queryTimeDFIter(plan,​ 10)  ​
 +//​TIME=2.106s ​ INPUT=1,​2GB ​  ​SHFR=484MB  ​
 +</​code>​
  
 +=== S2RDF+Hybrid plan ===
 +
 +<code scala>
// VP's partitioned by subject
val VP2EXP = VP2Subject

// Triple patterns of the S1 star, all joining on their subject.
val tp = List(
  ("gr", "includes"),
  ("gr", "price"),
  ("gr", "serialNumber"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("sorg", "priceValidUntil"))

// Order the patterns by increasing VP size so the smallest relations are joined first.
val tpSize = tp.map { case (ns, p) => (p, getIdP(ns, p), VP2Size.get(getIdP(ns, p)).get) }.sortBy { case (p, idp, size) => size }
// One selection per pattern; rename "o" to o<idP> to avoid column-name collisions.
val selections = tpSize.map { case (_, idP, _) => VP2EXP(idP).withColumnRenamed("o", s"o$idP") }

/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o","s")
/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
// FIX: the original unioned the undefined `t1` (copy-paste from the DF plan); the
// first-pattern DataFrame of this block is `sel1`.
val sel1a = sel1.unionAll(e1)
var plan = sel1a
/* next triple patterns */
// FIX: the original iterated over `sel1` (a single DataFrame) instead of the
// prepared per-predicate selections, so the star join was never built.
for (i <- selections) {
  plan = plan.join(i, "s")
}
queryTimeDFIter(plan, 10)
//TIME=2.106  INPUT=100+60=160MB   SHFR=38KB
 </​code>​ </​code>​
 +
 +Go to [[en:site:recherche:logiciels:sparqlwithspark]]
 +
en/site/recherche/logiciels/sparqlwithspark/watdivs1.1473856830.txt.gz · Last modified: 14/09/2016 14:40 by hubert