WatDiv Query F5 plans
SPARQL Hybrid DF plan
// First triple pattern
val retailer = getIdSO("wsdbm", "Retailer3")
val idP1 = getIdP("gr", "offers")
val s1 = List(
("gr","validThrough"),
("gr", "includes"),
("gr", "price"))
val s2 = List(
("og", "title"),
("rdf", "type"))
val subset = df.where(s"(p=$idP1 and s=$retailer) or p in (3,12,26,38,57)").persist
subset.count
val DATA = subset
val tp1 = DATA.where(s"s=$retailer").select("o").withColumnRenamed("o","s")
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val tp1a = t1.unionAll(e1)
val l1 = s1.map{case(ns, p) => {
val idP = getIdP(ns, p)
DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")}}
val l2 = s2.map{case(ns, p) => {
val idP = getIdP(ns, p)
DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")}}
val v1 = getIdP("gr", "includes")
val snf1 = tp1a.join(l1(0),"s").join(l1(1),"s").join(l1(2),"s").withColumnRenamed("s", "v0").withColumnRenamed(s"o$v1","s")
val query = snf1.join(l2(0),"s").join(l2(1),"s")
S2RDF plan
val VP2EXP=VP2Random
val tp1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o","s")
val l1 = s1.map{case(ns, p) => {
val idP = getIdP(ns, p)
VP2EXP(idP).withColumnRenamed("o", s"o$idP")}}
val v1 = getIdP("gr", "includes")
val snf1 = tp1.join(l1(0),"s").join(l1(1),"s").join(l1(2),"s").withColumnRenamed("s", "v0").withColumnRenamed(s"o$v1","s")
val l2 = s2.map{case(ns, p) => {
val idP = getIdP(ns, p)
VP2EXP(idP).withColumnRenamed("o", s"o$idP")}}
val query = snf1.join(l2(0),"s").join(l2(1),"s")
S2RDF + Hybrid plan
val VP2EXP=VP2Subject
val s1 = List(
("gr","validThrough"),
("gr", "includes"),
("gr", "price"))
val s2 = List(
("og", "title"),
("rdf", "type"))
// First triple pattern
val retailer = getIdSO("wsdbm", "Retailer3")
val tp1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o","s")
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val tp1a = tp1.unionAll(e1)
val l1 = s1.map{case(ns, p) => {
val idP = getIdP(ns, p)
VP2EXP(idP).withColumnRenamed("o", s"o$idP")}}
val v1 = getIdP("gr", "includes")
val snf1 = tp1a.join(l1(0),"s").join(l1(1),"s").join(l1(2),"s").withColumnRenamed("s", "v0").withColumnRenamed(s"o$v1","s")
val l2 = s2.map{case(ns, p) => {
val idP = getIdP(ns, p)
VP2EXP(idP).withColumnRenamed("o", s"o$idP")}}
val query = snf1.join(l2(0),"s").join(l2(1),"s")
Go to SPARQL query processing with Apache Spark