This is an old revision of the document!
// Query plan over the randomly-partitioned VP tables (VP2Random): start from the offers of
// Retailer3, then join every other triple pattern on the shared subject column "s".
val VP2EXP = VP2Random

// triple patterns joining on their subject
val tp = List(
  ("gr", "includes"),
  ("gr", "price"),
  ("gr", "serialNumber"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("sorg", "priceValidUntil"))

// (predicate name, predicate id, VP table size), sorted by ascending table size
// (presumably so the smallest tables are joined first and intermediate results stay small).
val tpSize = tp.map { case (ns, p) =>
  val idP = getIdP(ns, p)
  val size = VP2Size.get(idP).getOrElse(sys.error(s"VP2Size has no entry for $ns:$p (id $idP)"))
  (p, idP, size)
}.sortBy { case (_, _, size) => size }

// One VP table per pattern; the object column is renamed to o<idP> so the joined columns stay distinct.
val selections = tpSize.map { case (_, idP, _) => VP2EXP(idP).withColumnRenamed("o", s"o$idP") }

/* first triple pattern: the objects of Retailer3's gr:offers become the subjects to join on */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o", "s")

/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation:
 * adds one dummy row (s = -1) to each of the NB_FRAGMENTS partitions.
 * NOTE(review): assumes -1 is never a real subject id, so the inner joins drop these rows — confirm. */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(_ => -1).toDF("s")
val sel1a = sel1.unionAll(e1)

/* next triple patterns: inner-join each selection on the subject column, in ascending size order */
val plan = selections.foldLeft(sel1a)((acc, sel) => acc.join(sel, "s"))

queryTimeDFIter(plan, 10)
// measured: TIME=2.106s INPUT=1.2GB SHFR=484MB (SHFR presumably = shuffle read)
// VP's partitioned by subject
// Query plan over the subject-partitioned VP tables (VP2Subject): start from the offers of
// Retailer3, then join every other triple pattern on the shared subject column "s".
val VP2EXP = VP2Subject

// triple patterns joining on their subject
val tp = List(
  ("gr", "includes"),
  ("gr", "price"),
  ("gr", "serialNumber"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("sorg", "priceValidUntil"))

// (predicate name, predicate id, VP table size), sorted by ascending table size
// (presumably so the smallest tables are joined first and intermediate results stay small).
val tpSize = tp.map { case (ns, p) =>
  val idP = getIdP(ns, p)
  val size = VP2Size.get(idP).getOrElse(sys.error(s"VP2Size has no entry for $ns:$p (id $idP)"))
  (p, idP, size)
}.sortBy { case (_, _, size) => size }

// One VP table per pattern; the object column is renamed to o<idP> so the joined columns stay distinct.
val selections = tpSize.map { case (_, idP, _) => VP2EXP(idP).withColumnRenamed("o", s"o$idP") }

/* first triple pattern: the objects of Retailer3's gr:offers become the subjects to join on */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o", "s")

/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation:
 * adds one dummy row (s = -1) to each of the NB_FRAGMENTS partitions.
 * NOTE(review): assumes -1 is never a real subject id, so the inner joins drop these rows — confirm. */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(_ => -1).toDF("s")
val sel1a = sel1.unionAll(e1)

/* next triple patterns: inner-join each selection on the subject column, in ascending size order */
val plan = selections.foldLeft(sel1a)((acc, sel) => acc.join(sel, "s"))

queryTimeDFIter(plan, 10)
// measured: TIME=2.106 INPUT=100+60=160MB SHFR=38KB (SHFR presumably = shuffle read)
// NOTE(review): TIME is identical to the VP2Random run's figure — confirm it was re-measured.