Bases de Données / Databases

Site Web de l'équipe BD du LIP6 / LIP6 DB Web Site

User Tools

Site Tools


en:site:recherche:logiciels:sparqlwithspark:star

This is an old revision of the document!


Star shape queries over DrugBank

SPARQL queries Query star 1 (3 branches) <code> SELECT ?x ?a ?b WHERE { ?x <http://xmlns.com/foaf/0.1/page> <http://dbpedia.org/page/Ibuprofen>. ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/chebiId> ?a . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/casRegistryNumber> ?b.} </code> in SQL

SELECT t1.s, t2.o, t3.o FROM triple t1, triple t2, triple t3 WHERE t1.p LIKE '<http://xmlns.com/foaf/0.1/page>' AND t1.o LIKE '<http://dbpedia.org/page/Ibuprofen>' AND t2.p LIKE '<http://xmlns.com/foaf/0.1/page> <http://dbpedia.org/page/Ibuprofen>' AND t3.p LIKE '<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/casRegistryNumber>' AND t1.s=t2.s AND t1.s=t3.s;

Query star 1 (5 branches) <code> SELECT ?x ?a ?b ?c ?d WHERE { ?x <http://xmlns.com/foaf/0.1/page> <http://dbpedia.org/page/Ibuprofen>. ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/chebiId> ?a . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/casRegistryNumber> ?b . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggDrugId> ?c . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggCompoundId> ?d . } </code> Query star 1 (10 branches)

SELECT ?x ?a ?b ?c ?d ?g ?h ?i
WHERE {
 ?x <http://xmlns.com/foaf/0.1/page> <http://dbpedia.org/page/Ibuprofen>.
 ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/chebiId> ?a .
 ?x  <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/casRegistryNumber> ?b . 
 ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggDrugId> ?c .
 ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggCompoundId> ?d .
 ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/pharmacology> "?e.
 ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/mechanismOfAction> ?f
 ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/predictedLogs> ?g .
 ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/halfLife> ?h .
 ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/dpdDrugIdNumber> ?i .
}

Query star 1 (15 branches) <code> SELECT ?x ?a ?b ?c ?d ?g ?h ?i ?j ?k ?l WHERE { ?x <http://xmlns.com/foaf/0.1/page> <http://dbpedia.org/page/Ibuprofen>. ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/chebiId> ?a . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/casRegistryNumber> ?b . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggDrugId> ?c . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggCompoundId> ?d . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/pharmacology> “?e. ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/mechanismOfAction> ?f ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/predictedLogs> ?g . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/halfLife> ?h . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/dpdDrugIdNumber> ?i . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/contraindicationInsert> ?j . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/interactionInsert> ?k . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/structure> ?l. ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/state> ?m . ?x <http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/rxlistLink> <http://www.rxlist.com/cgi/generic/ibup.htm> .} </code> Scala code: <code> import scala.collection.mutable.ListBuffer import scala.collection.mutable.ArrayBuffer import org.apache.spark.HashPartitioner import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext DF import org.apache.spark.sql.DataFrame import scala.reflect.ClassTag

import java.io.Serializable

val NB_FRAGMENTS = sc.defaultParallelism val part = sc.defaultParallelism

avec données non encodées, on enlève les doubles val triples0 = sc.textFile(”/user/olivier/drugbank“).map(x⇒x.split(” “)).map(t⇒(t(0),(t(1),t(2)))).distinct on partitionne par sujet val triples = triples0.partitionBy(new HashPartitioner(part))

val triples2 = triples.setName(“triples sans doubles”).persist() triples2.count

val df = triples.map{case(s, (p, o)) ⇒ (s, p, o)}.toDF(“s”, “p”, “o”)

df.persist() df.count

====================================== Timer for RDD and DF ====================================== def queryTime1[T: ClassTag](q: RDD[T]): Double = { var start = java.lang.System.currentTimeMillis(); var c = q.count var t = (java.lang.System.currentTimeMillis() - start).toDouble /1000 println(”“) println(s”Count=$c, Time= $t (s)“) t } def queryTime2(q: DataFrame): Double = { var start = java.lang.System.currentTimeMillis(); var c = q.count var t = (java.lang.System.currentTimeMillis() - start).toDouble /1000 println(”“) println(s”Count=$c, Time= $t (s)“) t } def queryAverageTime[T: ClassTag](q: RDD[T], nbIter: Int=10): Unit = { val l = new ArrayBuffer[Double](nbIter) for( i ← 1 to nbIter) { var start = java.lang.System.currentTimeMillis(); var c = q.count var t = (java.lang.System.currentTimeMillis() - start).toDouble /1000 print(s”$t “) l.append(t) } val avg = l.reduce(_+_)/l.size println(” “) println(s”AVERAGE time: $avg (s)“) println(” “) }

Values for properties and objects

3 branches val p1 = ”<http://xmlns.com/foaf/0.1/page>“ val o1 = ”<http://dbpedia.org/page/Ibuprofen>“ val p2 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/chebiId>“ val p3 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/casRegistryNumber>“ 5 branches val p4 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggDrugId>“ val p5 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggCompoundId>“

10 branches val p6 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/pharmacology>“ val p7 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/mechanismOfAction>“ val p8 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/predictedLogs>“ val p9 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/halfLife>“ val p10 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/dpdDrugIdNumber>“ 15 branches val p11 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/contraindicationInsert>“ val p12 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/interactionInsert>“ val p13 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/structure>“ val p14 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/state>“ val p15 = ”<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/rxlistLink>“ val o15 = ”<http://www.rxlist.com/cgi/generic/ibup.htm>“

================================================ Star DataFrame 3 branches ================================================ class Star3Branches(iter: Iterator[org.apache.spark.sql.Row], vp1: String, vo1: String, vp2:String, vp3:String) extends Iterator[(String, String, String)] with java.io.Serializable { val a: scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer() var i: Iterator[(String, String, String)] = null def hasNext = { if(i == null) { while(iter.hasNext) { val r = iter.next a.append® } group by subject

    val b = a.groupBy(x => x.getString(0))
    // process a subject
    val c = b.flatMap{ case (s, rowList) =>  {
      // group by predicate
      val byPred = rowList.groupBy(y => y.getString(1))
      // star pattern matching
      if( byPred.contains(vp1) && byPred.contains(vp2) && byPred.contains(vp3)) {
        val p1List = byPred(vp1)
        val p2List = byPred(vp2)
        val p3List = byPred(vp3)
        val size = p1List.size * p2List.size * p3List.size
        val res = new scala.collection.mutable.ArrayBuffer[(String, String, String)](size)
        
        // generate all the matchings (this step is only for SELECT queries)
        for(i1 <- p1List) {
          for(i2 <- p2List){
            for(i3 <- p3List){
              res.append( (s, i2.getString(2), i3.getString(2)) )
            }
          }
        }
        res          
      }
      else {
        Array.empty[(String, String, String)]
      }
    }
    }
    i = c.iterator
  }
  i.hasNext
}
def next = i.next

}

val r3 = df.where(s” (p='$p1' and o='$o1') or p='$p2' or p='$p3' “).mapPartitions(iter ⇒ new Star3Branches(iter, p1, o1, p2, p3))

queryTime1(r3) queryAverageTime(r3,20)

================================================ Star DataFrame 5 branches ================================================ class Star5Branches(iter: Iterator[org.apache.spark.sql.Row], vp1: String, vo1: String, vp2:String, vp3:String, vp4:String, vp5:String) extends Iterator[(String, String, String, String, String)] with java.io.Serializable { val a: scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer() var i: Iterator[(String, String, String, String, String)] = null def hasNext = { if(i == null) { while(iter.hasNext) { val r = iter.next a.append® } group by subject

    val b = a.groupBy(x => x.getString(0))
    // process a subject
    val c = b.flatMap{ case (s, rowList) =>  {
      // group by predicate
      val byPred = rowList.groupBy(y => y.getString(1))
      // star pattern matching
      if( byPred.contains(vp1) && byPred.contains(vp2) && byPred.contains(vp3)&& byPred.contains(vp4)&& byPred.contains(vp5)) {
        val p1List = byPred(vp1)
        val p2List = byPred(vp2)
        val p3List = byPred(vp3)
        val p4List = byPred(vp4)
        val p5List = byPred(vp5)
        val size = p1List.size * p2List.size * p3List.size * p4List.size * p5List.size
        val res = new scala.collection.mutable.ArrayBuffer[(String, String, String, String, String)](size)
        
        // generate all the matchings (this step is only for SELECT queries)
        for(i1 <- p1List) {
          for(i2 <- p2List){
            for(i3 <- p3List){
              for(i4 <- p4List){
                for(i5 <- p5List){
                  res.append( (s, i2.getString(2), i3.getString(2), i4.getString(2), i5.getString(2)) )
                }
              }
            }
          }
        }
        res          
      }
      else {
        Array.empty[(String, String, String, String, String)]
      }
    }
    }
    i = c.iterator
  }
  i.hasNext
}
def next = i.next

}

val r5 = df.where(s” (p='$p1' and o='$o1') or p='$p2' or p='$p3' or p='$p4' or p='$p5' “).mapPartitions(iter ⇒ new Star5Branches(iter, p1, o1, p2, p3, p4, p5))

queryTime1(r5) queryAverageTime(r5, 20)

================================================ Star DataFrame 10 branches ================================================ class Star10Branches(iter: Iterator[org.apache.spark.sql.Row], vp1: String, vo1: String, vp2:String, vp3:String, vp4:String, vp5:String, vp6:String, vp7:String, vp8:String, vp9:String, vp10:String) extends Iterator[(String, String, String, String, String, String, String, String, String, String)] with java.io.Serializable { val a: scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer() var i: Iterator[(String, String, String, String, String,String, String, String, String, String)] = null def hasNext = { if(i == null) { while(iter.hasNext) { val r = iter.next a.append® } group by subject

    val b = a.groupBy(x => x.getString(0))
    // process a subject
    val c = b.flatMap{ case (s, rowList) =>  {
      // group by predicate
      val byPred = rowList.groupBy(y => y.getString(1))
      // star pattern matching
      if( byPred.contains(vp1) && byPred.contains(vp2) && byPred.contains(vp3)&& byPred.contains(vp4) && byPred.contains(vp5) && byPred.contains(vp6) && byPred.contains(vp7) && byPred.contains(vp8) && byPred.contains(vp9) && byPred.contains(vp10)) {
        val p1List = byPred(vp1)
        val p2List = byPred(vp2)
        val p3List = byPred(vp3)
        val p4List = byPred(vp4)
        val p5List = byPred(vp5)
        val p6List = byPred(vp6)
        val p7List = byPred(vp7)
        val p8List = byPred(vp8)
        val p9List = byPred(vp9)
        val p10List = byPred(vp10)
        val size = p1List.size * p2List.size * p3List.size * p4List.size * p5List.size * p6List.size * p7List.size * p8List.size * p9List.size * p10List.size
        val res = new scala.collection.mutable.ArrayBuffer[(String, String, String, String, String, String, String, String, String, String)](size)
        
        // generate all the matchings (this step is only for SELECT queries)
        for(i1 <- p1List) {
          for(i2 <- p2List){
            for(i3 <- p3List){
              for(i4 <- p4List){
                for(i5 <- p5List){
                  for(i6 <- p6List) {
                    for(i7 <- p7List) {
                      for(i8 <- p8List) {
                        for(i9 <- p9List) {
                          for(i10 <- p10List) {
                            res.append( (s, i2.getString(2), i3.getString(2), i4.getString(2), i5.getString(2), i6.getString(2), i7.getString(2), i8.getString(2), i9.getString(2), i10.getString(2)) )
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
        res          
      }
      else {
        Array.empty[(String, String, String, String, String, String, String, String, String, String)]
      }
    }
    }
    i = c.iterator
  }
  i.hasNext
}
def next = i.next

}

val r10 = df.where(s” (p='$p1' and o='$o1') or p='$p2' or p='$p3' or p='$p4' or p='$p5' or p='$p6' or p='$p7' or p='$p8' or p='$p9' or p='$p10' “).mapPartitions(iter ⇒ new Star10Branches(iter, p1, o1, p2, p3, p4, p5, p6, p7, p8, p9, p10))

queryTime1(r10) queryAverageTime(r10)

</code>

en/site/recherche/logiciels/sparqlwithspark/star.1473802143.txt.gz · Last modified: 13/09/2016 23:29 by hubert