SELECT t1.s, t2.o, t3.o
FROM triple t1, triple t2, triple t3
WHERE t1.p LIKE '<http://xmlns.com/foaf/0.1/page>'
  AND t1.o LIKE '<http://dbpedia.org/page/Ibuprofen>'
  AND t2.p LIKE '<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/chebiId>'
  AND t3.p LIKE '<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/casRegistryNumber>'
  AND t1.s=t2.s AND t1.s=t3.s;
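// The SQL above is the relational formulation of the 3-branch star query over a
// triple(s, p, o) table. A hypothetical way to run it in this script through Spark SQL,
// assuming the DataFrame `df`, `sqlContext` and the timer `queryTime2` defined below
// (registerTempTable is the Spark 1.x call; createOrReplaceTempView on Spark 2.x):
df.registerTempTable("triple")
val sqlStar3 = sqlContext.sql("""
  SELECT t1.s, t2.o, t3.o
  FROM triple t1, triple t2, triple t3
  WHERE t1.p = '<http://xmlns.com/foaf/0.1/page>'
    AND t1.o = '<http://dbpedia.org/page/Ibuprofen>'
    AND t2.p = '<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/chebiId>'
    AND t3.p = '<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/casRegistryNumber>'
    AND t1.s = t2.s AND t1.s = t3.s
""")
queryTime2(sqlStar3)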
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
// DF
import org.apache.spark.sql.DataFrame
import scala.reflect.ClassTag
import java.io.Serializable

val NB_FRAGMENTS = sc.defaultParallelism
val part = sc.defaultParallelism

// with non-encoded data, remove duplicate triples
val triples0 = sc.textFile("/user/olivier/drugbank").map(x => x.split(" ")).map(t => (t(0), (t(1), t(2)))).distinct

// partition by subject
val triples = triples0.partitionBy(new HashPartitioner(part))
//val triples2 = triples.setName("triples without duplicates").persist()
//triples2.count

val df = triples.map{ case (s, (p, o)) => (s, p, o) }.toDF("s", "p", "o")
df.persist()
df.count

//======================================
// Timer for RDD and DF
//======================================

/* function queryTime1: timer for RDD */
def queryTime1[T: ClassTag](q: RDD[T]): Double = {
  val start = java.lang.System.currentTimeMillis()
  val c = q.count
  val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
  println("")
  println(s"Count=$c, Time= $t (s)")
  t
}

/* function queryTime2: timer for DataFrame */
def queryTime2(q: DataFrame): Double = {
  val start = java.lang.System.currentTimeMillis()
  val c = q.count
  val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
  println("")
  println(s"Count=$c, Time= $t (s)")
  t
}

/* function queryAverageTime: average query response time for RDD */
def queryAverageTime[T: ClassTag](q: RDD[T], nbIter: Int = 10): Unit = {
  val l = new ArrayBuffer[Double](nbIter)
  for (i <- 1 to nbIter) {
    val start = java.lang.System.currentTimeMillis()
    val c = q.count
    val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
    print(s"$t ")
    l.append(t)
  }
  val avg = l.reduce(_ + _) / l.size
  println(" ")
  println(s"AVERAGE time: $avg (s)")
  println(" ")
}

//================================================
// Values for properties and objects
//================================================
// 3 branches
val p1 = "<http://xmlns.com/foaf/0.1/page>"
val o1 = "<http://dbpedia.org/page/Ibuprofen>"
val p2 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/chebiId>"
val p3 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/casRegistryNumber>"
// 5 branches
val p4 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggDrugId>"
val p5 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/keggCompoundId>"
// 10 branches
val p6 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/pharmacology>"
val p7 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/mechanismOfAction>"
val p8 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/predictedLogs>"
val p9 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/halfLife>"
val p10 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/dpdDrugIdNumber>"
// 15 branches
val p11 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/contraindicationInsert>"
val p12 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/interactionInsert>"
val p13 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/structure>"
val p14 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/state>"
val p15 = "<http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/rxlistLink>"
val o15 = "<http://www.rxlist.com/cgi/generic/ibup.htm>"
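//================================================
// Hypothetical baseline (not in the original script): the same 3-branch star
// answered with plain RDD joins on `triples`. Since `triples` is hash-partitioned
// by subject and filter preserves the partitioner, the two joins are co-partitioned
// and need no extra shuffle. Timed with the RDD timers above.
//================================================
val b1 = triples.filter{ case (s, (p, o)) => p == p1 && o == o1 }
val b2 = triples.filter{ case (s, (p, o)) => p == p2 }
val b3 = triples.filter{ case (s, (p, o)) => p == p3 }
val starJoin = b1.join(b2).join(b3).map{ case (s, (((_, _), (_, o2)), (_, o3))) => (s, o2, o3) }
queryTime1(starJoin)
queryAverageTime(starJoin, 20)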
//================================================
// Star DataFrame 3 branches
//================================================
class Star3Branches(iter: Iterator[org.apache.spark.sql.Row], vp1: String, vo1: String, vp2: String, vp3: String)
    extends Iterator[(String, String, String)] with java.io.Serializable {

  val a: scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer()
  var i: Iterator[(String, String, String)] = null

  def hasNext = {
    if (i == null) {
      while (iter.hasNext) {
        val r = iter.next
        a.append(r)
      }
      // group by subject
      val b = a.groupBy(x => x.getString(0))
      // process a subject
      val c = b.flatMap{ case (s, rowList) =>
        // group by predicate
        val byPred = rowList.groupBy(y => y.getString(1))
        // star pattern matching
        if (byPred.contains(vp1) && byPred.contains(vp2) && byPred.contains(vp3)) {
          val p1List = byPred(vp1)
          val p2List = byPred(vp2)
          val p3List = byPred(vp3)
          val size = p1List.size * p2List.size * p3List.size
          val res = new scala.collection.mutable.ArrayBuffer[(String, String, String)](size)
          // generate all the matchings (this step is only for SELECT queries)
          for (i1 <- p1List) {
            for (i2 <- p2List) {
              for (i3 <- p3List) {
                res.append((s, i2.getString(2), i3.getString(2)))
              }
            }
          }
          res
        } else {
          Array.empty[(String, String, String)]
        }
      }
      i = c.iterator
    }
    i.hasNext
  }

  def next = i.next
}

val r3 = df.where(s" (p='$p1' and o='$o1') or p='$p2' or p='$p3' ").mapPartitions(iter => new Star3Branches(iter, p1, o1, p2, p3))
queryTime1(r3)
queryAverageTime(r3, 20)

//================================================
// Star DataFrame 5 branches
//================================================
class Star5Branches(iter: Iterator[org.apache.spark.sql.Row], vp1: String, vo1: String, vp2: String, vp3: String, vp4: String, vp5: String)
    extends Iterator[(String, String, String, String, String)] with java.io.Serializable {

  val a: scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer()
  var i: Iterator[(String, String, String, String, String)] = null

  def hasNext = {
    if (i == null) {
      while (iter.hasNext) {
        val r = iter.next
        a.append(r)
      }
      // group by subject
      val b = a.groupBy(x => x.getString(0))
      // process a subject
      val c = b.flatMap{ case (s, rowList) =>
        // group by predicate
        val byPred = rowList.groupBy(y => y.getString(1))
        // star pattern matching
        if (byPred.contains(vp1) && byPred.contains(vp2) && byPred.contains(vp3) && byPred.contains(vp4) && byPred.contains(vp5)) {
          val p1List = byPred(vp1)
          val p2List = byPred(vp2)
          val p3List = byPred(vp3)
          val p4List = byPred(vp4)
          val p5List = byPred(vp5)
          val size = p1List.size * p2List.size * p3List.size * p4List.size * p5List.size
          val res = new scala.collection.mutable.ArrayBuffer[(String, String, String, String, String)](size)
          // generate all the matchings (this step is only for SELECT queries)
          for (i1 <- p1List) {
            for (i2 <- p2List) {
              for (i3 <- p3List) {
                for (i4 <- p4List) {
                  for (i5 <- p5List) {
                    res.append((s, i2.getString(2), i3.getString(2), i4.getString(2), i5.getString(2)))
                  }
                }
              }
            }
          }
          res
        } else {
          Array.empty[(String, String, String, String, String)]
        }
      }
      i = c.iterator
    }
    i.hasNext
  }

  def next = i.next
}

val r5 = df.where(s" (p='$p1' and o='$o1') or p='$p2' or p='$p3' or p='$p4' or p='$p5' ").mapPartitions(iter => new Star5Branches(iter, p1, o1, p2, p3, p4, p5))
queryTime1(r5)
queryAverageTime(r5, 20)
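//================================================
// Hypothetical helper (not in the original script): build the `where` condition of
// an n-branch star from its predicate list, so the long or-chains used above and
// below do not have to be written by hand. Sketch only; the original inlines them.
//================================================
def starFilter(vp1: String, vo1: String, others: Seq[String]): String =
  (s"(p='$vp1' and o='$vo1')" +: others.map(p => s"p='$p'")).mkString(" or ")

// e.g. the 5-branch filter used above:
// df.where(starFilter(p1, o1, Seq(p2, p3, p4, p5)))
//   .mapPartitions(iter => new Star5Branches(iter, p1, o1, p2, p3, p4, p5))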
//================================================
// Star DataFrame 10 branches
//================================================
class Star10Branches(iter: Iterator[org.apache.spark.sql.Row], vp1: String, vo1: String, vp2: String, vp3: String, vp4: String,
                     vp5: String, vp6: String, vp7: String, vp8: String, vp9: String, vp10: String)
    extends Iterator[(String, String, String, String, String, String, String, String, String, String)] with java.io.Serializable {

  val a: scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer()
  var i: Iterator[(String, String, String, String, String, String, String, String, String, String)] = null

  def hasNext = {
    if (i == null) {
      while (iter.hasNext) {
        val r = iter.next
        a.append(r)
      }
      // group by subject
      val b = a.groupBy(x => x.getString(0))
      // process a subject
      val c = b.flatMap{ case (s, rowList) =>
        // group by predicate
        val byPred = rowList.groupBy(y => y.getString(1))
        // star pattern matching
        if (byPred.contains(vp1) && byPred.contains(vp2) && byPred.contains(vp3) && byPred.contains(vp4) &&
            byPred.contains(vp5) && byPred.contains(vp6) && byPred.contains(vp7) && byPred.contains(vp8) &&
            byPred.contains(vp9) && byPred.contains(vp10)) {
          val p1List = byPred(vp1)
          val p2List = byPred(vp2)
          val p3List = byPred(vp3)
          val p4List = byPred(vp4)
          val p5List = byPred(vp5)
          val p6List = byPred(vp6)
          val p7List = byPred(vp7)
          val p8List = byPred(vp8)
          val p9List = byPred(vp9)
          val p10List = byPred(vp10)
          val size = p1List.size * p2List.size * p3List.size * p4List.size * p5List.size *
                     p6List.size * p7List.size * p8List.size * p9List.size * p10List.size
          val res = new scala.collection.mutable.ArrayBuffer[(String, String, String, String, String, String, String, String, String, String)](size)
          // generate all the matchings (this step is only for SELECT queries)
          for (i1 <- p1List) {
            for (i2 <- p2List) {
              for (i3 <- p3List) {
                for (i4 <- p4List) {
                  for (i5 <- p5List) {
                    for (i6 <- p6List) {
                      for (i7 <- p7List) {
                        for (i8 <- p8List) {
                          for (i9 <- p9List) {
                            for (i10 <- p10List) {
                              res.append((s, i2.getString(2), i3.getString(2), i4.getString(2), i5.getString(2),
                                          i6.getString(2), i7.getString(2), i8.getString(2), i9.getString(2), i10.getString(2)))
                            }
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
          res
        } else {
          Array.empty[(String, String, String, String, String, String, String, String, String, String)]
        }
      }
      i = c.iterator
    }
    i.hasNext
  }

  def next = i.next
}

val r10 = df.where(s" (p='$p1' and o='$o1') or p='$p2' or p='$p3' or p='$p4' or p='$p5' or p='$p6' or p='$p7' or p='$p8' or p='$p9' or p='$p10' ").mapPartitions(iter => new Star10Branches(iter, p1, o1, p2, p3, p4, p5, p6, p7, p8, p9, p10))
queryTime1(r10)
queryAverageTime(r10)
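//================================================
// Hypothetical generalization (a sketch, not part of the original script): a single
// matcher parameterized by the bound branch (vp1, vo1) and the list of free
// predicates, emitting result rows as Seq[String] instead of fixed-arity tuples.
// Like the classes above, it assumes the input was already filtered down to the
// star's predicates and that all triples of a subject lie in the same partition;
// it could, for instance, be extended to the 15-branch predicates p11..p15 declared earlier.
//================================================
class StarNBranches(iter: Iterator[org.apache.spark.sql.Row], vp1: String, vo1: String, preds: Seq[String])
    extends Iterator[Seq[String]] with java.io.Serializable {

  var i: Iterator[Seq[String]] = null

  def hasNext = {
    if (i == null) {
      // materialize the partition and group by subject, as in the fixed-arity classes
      val bySubject = iter.toArray.groupBy(_.getString(0))
      i = bySubject.iterator.flatMap { case (s, rows) =>
        val byPred = rows.groupBy(_.getString(1))
        // the star matches if the bound branch and every free predicate are present
        if (byPred.contains(vp1) && preds.forall(byPred.contains)) {
          // cross product of the object lists of the free branches;
          // the bound branch contributes no output column (its object is fixed to vo1)
          val objLists = preds.map(p => byPred(p).map(_.getString(2)).toSeq)
          objLists.foldLeft(Seq(Seq(s))) { (acc, objs) =>
            for (row <- acc; o <- objs) yield row :+ o
          }
        } else Seq.empty
      }
    }
    i.hasNext
  }

  def next = i.next
}

// Example use, reproducing the 3-branch star with the generic matcher:
// val rN = df.where(s" (p='$p1' and o='$o1') or p='$p2' or p='$p3' ")
//            .mapPartitions(iter => new StarNBranches(iter, p1, o1, Seq(p2, p3)))
// queryTime1(rN)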