The plans produced by each method are:
// RDD plan: a four-step chain over `triples`, whose elements have the shape (s, (p, o)).
// Each d<i> keeps only the entries whose p component equals P<i>.

// d1 is re-keyed on its o component so it can be joined with d2, which stays keyed on s.
val d1 = triples
  .filter { case (_, (pred, _)) => pred == P1 }
  .map { case (subj, (_, obj)) => (obj, subj) }

// d2..d4 keep their original key and only drop p from the value (mapValues leaves keys untouched).
val d2 = triples
  .filter { case (_, (pred, _)) => pred == P2 }
  .mapValues { case (_, obj) => obj }
val d3 = triples
  .filter { case (_, (pred, _)) => pred == P3 }
  .mapValues { case (_, obj) => obj }
val d4 = triples
  .filter { case (_, (pred, _)) => pred == P4 }
  .mapValues { case (_, obj) => obj }

// After each join the newly bound variable becomes the key for the next join;
// the variables bound so far are carried along in the value tuple.
val j1 = d1
  .join(d2)
  .map { case (x2, (x1, x3)) => (x3, (x1, x2)) }
val j2 = j1
  .join(d3)
  .map { case (x3, ((x1, x2), x4)) => (x4, (x1, x2, x3)) }
val j3 = j2
  .join(d4)
  .map { case (x4, ((x1, x2, x3), x5)) => (x5, (x1, x2, x3, x4)) }

// Action: counts the rows of the final join result.
j3.count()
// DataFrame plan: one filtered projection of `df` per predicate P1..P4.
// Columns s and o are renamed to x<i> / x<i+1> so that consecutive projections
// share exactly one column name, which is then used as the join column.
val t1 = df.where(s"p=$P1").select("s", "o").toDF("x1", "x2")
val t2 = df.where(s"p=$P2").select("s", "o").toDF("x2", "x3")
val t3 = df.where(s"p=$P3").select("s", "o").toDF("x3", "x4")
val t4 = df.where(s"p=$P4").select("s", "o").toDF("x4", "x5")

// Chain of joins on the shared columns x2, x3 and x4.
// Note: no action is invoked on `res` in this snippet.
val res = t1
  .join(t2, Seq("x2"))
  .join(t3, Seq("x3"))
  .join(t4, Seq("x4"))
// Plan over a persisted subset: `subg` holds only the rows of `df` whose p is P2, P3 or P4.
val subg = df.where(s"p in ($P2, $P3, $P4)")

// persist() is followed directly by a count() action, presumably so the cached
// subset is materialized before the joins below read from it.
subg.persist()
subg.count()

// One projection per predicate, taken from the persisted subset instead of `df`.
// Columns s and o are renamed so consecutive projections share one join column.
val st2 = subg.where(s"p= $P2").select("s", "o").toDF("x2", "x3")
val st3 = subg.where(s"p= $P3").select("s", "o").toDF("x3", "x4")
val st4 = subg.where(s"p= $P4").select("s", "o").toDF("x4", "x5")

// `t1` is not defined in this snippet; it must already be in scope
// (the projection for P1 with columns x1 and x2).
val res = t1
  .join(st2, Seq("x2"))
  .join(st3, Seq("x3"))
  .join(st4, Seq("x4"))

// Action: counts the rows of the final join result.
res.count()