// Random partitioning: evaluate S1 directly over the full triple table
val DATA = dfDefault

// First triple pattern: offers of Retailer3; rename "o" to "s" so the
// remaining star-shaped patterns can all join on column "s"
val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o", "s")

// Pad the small selection with one dummy row per fragment (s = -1 joins
// with nothing) so the DataFrame keeps NB_FRAGMENTS partitions for the
// first join
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK

// Remaining triple patterns, ordered by increasing size
val orderedProp = List(
  ("sorg", "priceValidUntil"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("gr", "includes"),
  ("gr", "serialNumber"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("gr", "price"))
val triples = orderedProp.map { case (ns, p) =>
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s", "o").withColumnRenamed("o", s"o$idP")
}

// Join the remaining triple patterns on the shared subject
for (i <- triples) { plan = plan.join(i, "s") }

// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// TIME=18.6s INPUT=126GB SHFR=484MB
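// `queryTimeDFIter` is used throughout but not defined in this section; a
// minimal sketch of what such a helper could look like, assuming it simply
// forces the plan `iter` times and reports the mean wall-clock time (the
// name and signature come from the calls above, the body is an assumption):
def queryTimeDFIter(plan: org.apache.spark.sql.DataFrame, iter: Int): Unit = {
  val times = (1 to iter).map { _ =>
    val start = System.nanoTime
    plan.count() // action forcing full evaluation of the join plan
    (System.nanoTime - start) / 1e9
  }
  println(f"mean time over $iter runs: ${times.sum / iter}%.3f s")
}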
// Merged subset: restrict the triple table to the predicates used by S1
// (p=51 is gr:offers; the eight other ids are the star properties)
val subset = df.where(s"(p=51 and s=$retailer) or p in (3,9,38,40,56,57,63,69)").persist
subset.count // 78 413 119 triples; merging time = 4.885 s

val DATA = subset
val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o", "s")
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK

// Triple patterns ordered by increasing size
val orderedProp = List(
  ("sorg", "priceValidUntil"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("gr", "includes"),
  ("gr", "serialNumber"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("gr", "price"))
val triples = orderedProp.map { case (ns, p) =>
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s", "o").withColumnRenamed("o", s"o$idP")
}

// Join the remaining triple patterns on the shared subject
for (i <- triples) { plan = plan.join(i, "s") }

// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// TIME = 2.87s query + 4.885s merge = 7.76s  INPUT = 14 + 6.2 = 20.2GB  SHFR = 32KB
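// `getIdP` and `getIdSO` map prefixed names to the integer ids of the
// dictionary encoding (e.g. gr:offers -> 51 above). A plausible sketch,
// assuming two lookup maps collected from hypothetical dictionary
// DataFrames `propDictDF` and `soDictDF` with columns (ns, name, id);
// these names and the encoding layout are assumptions, not the actual code:
val propDict: Map[(String, String), Int] =
  propDictDF.collect().map(r => (r.getString(0), r.getString(1)) -> r.getInt(2)).toMap
val soDict: Map[(String, String), Int] =
  soDictDF.collect().map(r => (r.getString(0), r.getString(1)) -> r.getInt(2)).toMap
def getIdP(ns: String, p: String): Int = propDict((ns, p))
def getIdSO(ns: String, so: String): Int = soDict((ns, so))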
// VPs with random partitioning
val VP2EXP = VP2Random

// Triple patterns joining on their subject
val tp = List(
  ("gr", "includes"),
  ("gr", "price"),
  ("gr", "serialNumber"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("sorg", "priceValidUntil"))

// Order the patterns by increasing vertical-partition size
val tpSize = tp.map { case (ns, p) => (p, getIdP(ns, p), VP2Size(getIdP(ns, p))) }
               .sortBy { case (p, idP, size) => size }
val selections = tpSize.map { case (_, idP, _) => VP2EXP(idP).withColumnRenamed("o", s"o$idP") }

/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o", "s")

/* step required for Spark to preserve the PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val sel1a = sel1.unionAll(e1)
var plan = sel1a

/* next triple patterns */
for (i <- selections) { plan = plan.join(i, "s") }

queryTimeDFIter(plan, 10)
// TIME=2.106s INPUT=1.2GB SHFR=484MB
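// The dummy union above pads a tiny selection with NB_FRAGMENTS placeholder
// rows (s = -1, which joins with nothing) so the DataFrame keeps one
// partition per fragment and the first join can stay PROCESS_LOCAL. The
// same trick, factored into a small helper (our naming, a sketch only):
import org.apache.spark.sql.DataFrame
def padForLocality(sel: DataFrame): DataFrame = {
  val dummies = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
  sel.unionAll(dummies)
}
// e.g. var plan = padForLocality(sel1)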
// VPs partitioned by subject
val VP2EXP = VP2Subject

// Triple patterns joining on their subject
val tp = List(
  ("gr", "includes"),
  ("gr", "price"),
  ("gr", "serialNumber"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("sorg", "priceValidUntil"))

// Order the patterns by increasing vertical-partition size
val tpSize = tp.map { case (ns, p) => (p, getIdP(ns, p), VP2Size(getIdP(ns, p))) }
               .sortBy { case (p, idP, size) => size }
val selections = tpSize.map { case (_, idP, _) => VP2EXP(idP).withColumnRenamed("o", s"o$idP") }

/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o", "s")

/* step required for Spark to preserve the PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val sel1a = sel1.unionAll(e1)
var plan = sel1a

/* next triple patterns */
for (i <- selections) { plan = plan.join(i, "s") }

queryTimeDFIter(plan, 10)
// TIME=2.106s INPUT=100+60=160MB SHFR=38KB
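// Neither VP2Random nor VP2Subject is built in this section. A sketch of how
// they could be derived from the triple table, assuming `VP2Size` maps each
// predicate id to its triple count (names, key type, and Spark's
// repartition-by-column are assumptions): VP2Random keeps Spark's default
// placement of each vertical partition, while VP2Subject repartitions every
// VP on its subject column so the subject-subject joins above are
// co-partitioned and shuffle almost nothing (SHFR drops from 484MB to 38KB).
import org.apache.spark.sql.functions.col
val VP2Random: Map[Int, org.apache.spark.sql.DataFrame] =
  VP2Size.keys.map { idP =>
    idP -> dfDefault.where(s"p=$idP").select("s", "o").persist
  }.toMap
val VP2Subject: Map[Int, org.apache.spark.sql.DataFrame] =
  VP2Random.map { case (idP, vp) =>
    idP -> vp.repartition(NB_FRAGMENTS, col("s")).persist
  }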