This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision | ||
en:site:recherche:logiciels:sparqlwithspark:watdivs1 [15/09/2016 09:51] hubert |
en:site:recherche:logiciels:sparqlwithspark:watdivs1 [16/09/2016 23:06] (current) hubert [WatDiv Query S1 plans] |
||
---|---|---|---|
Line 1: | Line 1: | ||
- | ====== WatDiv Query S1 ====== | + | {{indexmenu_n>2}} |
+ | |||
+ | ====== WatDiv Query S1 plans ====== | ||
+ | |||
+ | === SPARQL DF plan === | ||
+ | |||
+ | <code scala> | ||
+ | // random partitioning | ||
+ | val DATA = dfDefault | ||
+ | |||
+ | val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s") | ||
+ | val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s") | ||
+ | val t1OK = t1.unionAll(e1) | ||
+ | var plan = t1OK | ||
+ | |||
+ | |||
+ | // properties ordered by increasing triple pattern (tp) size | ||
+ | val orderedProp = List( | ||
+ | ("sorg", "priceValidUntil"), | ||
+ | ("gr", "validFrom"), | ||
+ | ("gr", "validThrough"), | ||
+ | ("gr", "includes"), | ||
+ | ("gr", "serialNumber"), | ||
+ | ("sorg", "eligibleQuantity"), | ||
+ | ("sorg", "eligibleRegion"), | ||
+ | ("gr", "price")) | ||
+ | |||
+ | val triples = orderedProp.map{case(ns, p) => { | ||
+ | val idP = getIdP(ns, p) | ||
+ | DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP") | ||
+ | }} | ||
+ | |||
+ | // join each remaining triple pattern to the plan on the subject column s | ||
+ | for( i <- triples) { | ||
+ | plan = plan.join(i, "s") | ||
+ | } | ||
+ | |||
+ | |||
+ | // Execute query plan for S1 | ||
+ | //---------------------------- | ||
+ | queryTimeDFIter(plan, 10) | ||
+ | // TIME=18.6s INPUT=126GB SHFR=484MB | ||
+ | </code> | ||
+ | |||
+ | === SPARQL Hybrid DF plan === | ||
+ | |||
+ | <code scala> | ||
+ | |||
+ | val subset = df.where(s"(p=51 and s=$retailer) or p in (3,9,38,40,56,57,63,69)").persist | ||
+ | subset.count // 78 413 119 | ||
+ | // Merging time=4.885s | ||
+ | |||
+ | val DATA = subset | ||
+ | |||
+ | val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s") | ||
+ | val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s") | ||
+ | val t1OK = t1.unionAll(e1) | ||
+ | var plan = t1OK | ||
+ | |||
+ | |||
+ | // properties ordered by increasing triple pattern (tp) size | ||
+ | val orderedProp = List( | ||
+ | ("sorg", "priceValidUntil"), | ||
+ | ("gr", "validFrom"), | ||
+ | ("gr", "validThrough"), | ||
+ | ("gr", "includes"), | ||
+ | ("gr", "serialNumber"), | ||
+ | ("sorg", "eligibleQuantity"), | ||
+ | ("sorg", "eligibleRegion"), | ||
+ | ("gr", "price")) | ||
+ | |||
+ | val triples = orderedProp.map{case(ns, p) => { | ||
+ | val idP = getIdP(ns, p) | ||
+ | DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP") | ||
+ | }} | ||
+ | |||
+ | // join each remaining triple pattern to the plan on the subject column s | ||
+ | for( i <- triples) { | ||
+ | plan = plan.join(i, "s") | ||
+ | } | ||
+ | |||
+ | |||
+ | // Execute query plan for S1 | ||
+ | //---------------------------- | ||
+ | queryTimeDFIter(plan, 10) | ||
+ | // TIME=2.87+4.885=7.76s (query + merging) INPUT=14+6.2=20.2GB SHFR=32KB | ||
+ | </code> | ||
+ | |||
=== S2RDF plan === | === S2RDF plan === | ||
- | <code> | + | <code scala> |
val VP2EXP=VP2Random | val VP2EXP=VP2Random | ||
Line 37: | Line 125: | ||
=== S2RDF+Hybrid plan === | === S2RDF+Hybrid plan === | ||
- | <code> | + | <code scala> |
// VP's partitioned by subject | // VP's partitioned by subject | ||
val VP2EXP=VP2Subject | val VP2EXP=VP2Subject |