Star shape query plans

SQL translation of a SPARQL star query with 3 branches

SELECT t1.s, t2.o, t3.o 
FROM triple t1, triple t2, triple t3 
WHERE t1.p LIKE '<http://xmlns.com/foaf/0.1/page>' 
AND t1.o LIKE '<http://dbpedia.org/page/Ibuprofen>' 
AND t2.p LIKE '<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/chebiId>' 
AND t3.p LIKE '<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/casRegistryNumber>' 
AND t1.s=t2.s AND t1.s=t3.s;

SPARQL Hybrid DF: plan for Spark v1.5
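
The spark-shell script below loads the DrugBank triples, removes duplicates, hash-partitions them by subject, and exposes them as a DataFrame with columns (s, p, o). Each star query is then evaluated by filtering the DataFrame on the predicates (and constant object) of the star and applying a mapPartitions operator (Star3Branches, Star5Branches, Star10Branches) which, inside each partition, groups the selected rows by subject and by predicate and emits the cross-product of the matching objects; no join or extra shuffle is needed beyond the initial partitioning by subject.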

import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.HashPartitioner
 
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
 
//DF
import org.apache.spark.sql.DataFrame
import scala.reflect.ClassTag
 
import java.io.Serializable
 
 
// degree of parallelism = number of partitions used for the triples
val NB_FRAGMENTS = sc.defaultParallelism
val part = sc.defaultParallelism
 
// non-encoded data: remove duplicate triples
val triples0 = sc.textFile("/user/olivier/drugbank").map(x=>x.split(" ")).map(t=>(t(0),(t(1),t(2)))).distinct
 
// partition by subject
val triples = triples0.partitionBy(new HashPartitioner(part))
 
//val triples2 = triples.setName("triples sans doubles").persist()
//triples2.count
 
// build a DataFrame with columns (s, p, o); toDF relies on sqlContext.implicits._, which spark-shell imports automatically
val df = triples.map{case(s, (p, o)) => (s, p, o)}.toDF("s", "p", "o")
 
df.persist()
df.count
 
 
 
//======================================
// Timer for RDD and DF
//======================================
 
/*
 function queryTime1:  Timer for RDD
*/
def queryTime1[T: ClassTag](q: RDD[T]): Double = {
  val start = java.lang.System.currentTimeMillis()
  val c = q.count
  val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
  println("")
  println(s"Count=$c, Time= $t (s)")
  t
}
 
/*
 function queryTime2:  timer for DataFrame
*/
def queryTime2(q: DataFrame): Double = {
  val start = java.lang.System.currentTimeMillis()
  val c = q.count
  val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
  println("")
  println(s"Count=$c, Time= $t (s)")
  t
}
 
/*
 function queryAverageTime:  Average query response time for RDD
*/
def queryAverageTime[T: ClassTag](q: RDD[T], nbIter: Int = 10): Unit = {
  val l = new ArrayBuffer[Double](nbIter)
  for (i <- 1 to nbIter) {
    val start = java.lang.System.currentTimeMillis()
    val c = q.count
    val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
    print(s"$t ")
    l.append(t)
  }
  val avg = l.reduce(_ + _) / l.size
  println(" ")
  println(s"AVERAGE time: $avg (s)")
  println(" ")
}
 
 
//================================================
// Values for properties and objects
//================================================
 
 
// 3 branches
val p1 = "<http://xmlns.com/foaf/0.1/page>"
val o1 = "<http://dbpedia.org/page/Ibuprofen>"
val p2 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/chebiId>"
val p3 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/casRegistryNumber>"
 
// 5 branches
val p4 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggDrugId>"
val p5 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggCompoundId>"
 
// 10 branches
val p6 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/pharmacology>"
val p7 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/mechanismOfAction>"
val p8 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/predictedLogs>"
val p9 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/halfLife>"
val p10 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/dpdDrugIdNumber>"
 
// 15 branches
val p11 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/contraindicationInsert>"
val p12 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/interactionInsert>"
val p13 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/structure>"
val p14 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/state>"
val p15 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/rxlistLink>"
val o15 = "<http://www.rxlist.com/cgi/generic/ibup.htm>"
 
 
//================================================
// Star DataFrame 3 branches
//================================================
 
class Star3Branches(iter: Iterator[org.apache.spark.sql.Row], vp1: String, vo1: String, vp2:String, vp3:String) extends Iterator[(String, String, String)] with java.io.Serializable {
 
  val a:  scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer()
  var i: Iterator[(String, String, String)] = null
 
  def hasNext = {
    if(i == null) {
      while(iter.hasNext) {
        val r = iter.next
        a.append(r)
      }
      // group by subject
      val b = a.groupBy(x => x.getString(0))
 
      // process a subject
      val c = b.flatMap{ case (s, rowList) =>  {
 
        // group by predicate
        val byPred = rowList.groupBy(y => y.getString(1))
 
        // star pattern matching
        if( byPred.contains(vp1) && byPred.contains(vp2) && byPred.contains(vp3)) {
          val p1List = byPred(vp1)
          val p2List = byPred(vp2)
          val p3List = byPred(vp3)
          val size = p1List.size * p2List.size * p3List.size
          val res = new scala.collection.mutable.ArrayBuffer[(String, String, String)](size)
 
          // generate all the matchings (this step is only for SELECT queries)
          for(i1 <- p1List) {
            for(i2 <- p2List){
              for(i3 <- p3List){
                res.append( (s, i2.getString(2), i3.getString(2)) )
              }
            }
          }
          res          
        }
        else {
          Array.empty[(String, String, String)]
        }
      }
      }
      i = c.iterator
    }
    i.hasNext
  }
 
  def next = i.next
}
 
val r3 = df.where(s" (p='$p1' and o='$o1') or p='$p2' or p='$p3' ").mapPartitions(iter => new Star3Branches(iter, p1, o1, p2, p3))
 
queryTime1(r3)  
queryAverageTime(r3,20)  
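
For comparison, here is a minimal sketch (not part of the benchmark above) of the same 3-branch star written as DataFrame self-joins on the subject, i.e. the direct analogue of the SQL plan at the top of the page. It reuses df, p1, o1, p2 and p3 as defined above; the names t1, t2, t3 and star3Join are only illustrative.

// Hypothetical baseline: 3-branch star as DataFrame self-joins, mirroring
// SELECT t1.s, t2.o, t3.o ... WHERE t1.s=t2.s AND t1.s=t3.s
val t1 = df.where(s"p='$p1' and o='$o1'").select($"s".as("s1"))
val t2 = df.where(s"p='$p2'").select($"s".as("s2"), $"o".as("o2"))
val t3 = df.where(s"p='$p3'").select($"s".as("s3"), $"o".as("o3"))
val star3Join = t1.join(t2, $"s1" === $"s2").join(t3, $"s1" === $"s3").select($"s1", $"o2", $"o3")
 
queryTime2(star3Join)   // DataFrame timer defined above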
 
 
 
//================================================
// Star DataFrame 5 branches
//================================================
 
class Star5Branches(iter: Iterator[org.apache.spark.sql.Row], vp1: String, vo1: String, vp2:String, vp3:String, vp4:String, vp5:String) extends Iterator[(String, String, String, String, String)] with java.io.Serializable {
 
  val a:  scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer()
  var i: Iterator[(String, String, String, String, String)] = null
 
  def hasNext = {
    if(i == null) {
      while(iter.hasNext) {
        val r = iter.next
        a.append(r)
      }
      // group by subject
      val b = a.groupBy(x => x.getString(0))
 
      // process a subject
      val c = b.flatMap{ case (s, rowList) =>  {
 
        // group by predicate
        val byPred = rowList.groupBy(y => y.getString(1))
 
        // star pattern matching
        if( byPred.contains(vp1) && byPred.contains(vp2) && byPred.contains(vp3)&& byPred.contains(vp4)&& byPred.contains(vp5)) {
          val p1List = byPred(vp1)
          val p2List = byPred(vp2)
          val p3List = byPred(vp3)
          val p4List = byPred(vp4)
          val p5List = byPred(vp5)
          val size = p1List.size * p2List.size * p3List.size * p4List.size * p5List.size
          val res = new scala.collection.mutable.ArrayBuffer[(String, String, String, String, String)](size)
 
          // generate all the matchings (this step is only for SELECT queries)
          for(i1 <- p1List) {
            for(i2 <- p2List){
              for(i3 <- p3List){
                for(i4 <- p4List){
                  for(i5 <- p5List){
                    res.append( (s, i2.getString(2), i3.getString(2), i4.getString(2), i5.getString(2)) )
                  }
                }
              }
            }
          }
          res          
        }
        else {
          Array.empty[(String, String, String, String, String)]
        }
      }
      }
      i = c.iterator
    }
    i.hasNext
  }
 
  def next = i.next
}
 
val r5 = df.where(s" (p='$p1' and o='$o1') or p='$p2' or p='$p3' or p='$p4' or p='$p5' ").mapPartitions(iter => new Star5Branches(iter, p1, o1, p2, p3, p4, p5))
 
queryTime1(r5)  
queryAverageTime(r5, 20)  
 
 
//================================================
// Star DataFrame 10 branches
//================================================
 
class Star10Branches(iter: Iterator[org.apache.spark.sql.Row], vp1: String, vo1: String, vp2:String, vp3:String, vp4:String, vp5:String, vp6:String, vp7:String, vp8:String, vp9:String, vp10:String) extends Iterator[(String, String, String, String, String, String, String, String, String, String)] with java.io.Serializable {
 
  val a:  scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer()
  var i: Iterator[(String, String, String, String, String,String, String, String, String, String)] = null
 
  def hasNext = {
    if(i == null) {
      while(iter.hasNext) {
        val r = iter.next
        a.append(r)
      }
      // group by subject
      val b = a.groupBy(x => x.getString(0))
 
      // process a subject
      val c = b.flatMap{ case (s, rowList) =>  {
 
        // group by predicate
        val byPred = rowList.groupBy(y => y.getString(1))
 
        // star pattern matching
        if( byPred.contains(vp1) && byPred.contains(vp2) && byPred.contains(vp3)&& byPred.contains(vp4) && byPred.contains(vp5) && byPred.contains(vp6) && byPred.contains(vp7) && byPred.contains(vp8) && byPred.contains(vp9) && byPred.contains(vp10)) {
          val p1List = byPred(vp1)
          val p2List = byPred(vp2)
          val p3List = byPred(vp3)
          val p4List = byPred(vp4)
          val p5List = byPred(vp5)
          val p6List = byPred(vp6)
          val p7List = byPred(vp7)
          val p8List = byPred(vp8)
          val p9List = byPred(vp9)
          val p10List = byPred(vp10)
          val size = p1List.size * p2List.size * p3List.size * p4List.size * p5List.size * p6List.size * p7List.size * p8List.size * p9List.size * p10List.size
          val res = new scala.collection.mutable.ArrayBuffer[(String, String, String, String, String, String, String, String, String, String)](size)
 
          // generate all the matchings (this step is only for SELECT queries)
          for(i1 <- p1List) {
            for(i2 <- p2List){
              for(i3 <- p3List){
                for(i4 <- p4List){
                  for(i5 <- p5List){
                    for(i6 <- p6List) {
                      for(i7 <- p7List) {
                        for(i8 <- p8List) {
                          for(i9 <- p9List) {
                            for(i10 <- p10List) {
                              res.append( (s, i2.getString(2), i3.getString(2), i4.getString(2), i5.getString(2), i6.getString(2), i7.getString(2), i8.getString(2), i9.getString(2), i10.getString(2)) )
                            }
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
          res          
        }
        else {
          Array.empty[(String, String, String, String, String, String, String, String, String, String)]
        }
      }
      }
      i = c.iterator
    }
    i.hasNext
  }
 
  def next = i.next
}
 
val r10 = df.where(s" (p='$p1' and o='$o1') or p='$p2' or p='$p3' or p='$p4' or p='$p5' or p='$p6' or p='$p7' or p='$p8' or p='$p9' or p='$p10' ").mapPartitions(iter => new Star10Branches(iter, p1, o1, p2, p3, p4, p5, p6, p7, p8, p9, p10))
 
queryTime1(r10)  
queryAverageTime(r10)
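
To have a quick look at a few matched tuples outside the timed runs (a simple sanity check, not part of the plan itself):

// print a few result tuples of the 10-branch star: the subject followed by the objects of branches 2..10
r10.take(3).foreach(println)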
