import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext //DF import org.apache.spark.sql.DataFrame import scala.reflect.ClassTag import org.apache.spark.HashPartitioner val NB_FRAGMENTS = sc.defaultParallelism val dir = "/user/olivier" val scale = "10k" // 1k 10k val lubmDir = dir + "/lubm" + scale val inputFile = lubmDir + s"/univ${scale}_encoded" // -------------------------------------------------------- // charger les META DONNEES // ------------------------------------------------------- val conceptFilename = dir + "/lubmConcepts_id2URI.txt" val propertyFilename = dir + "/lubmProp_id2URI.txt" val SOFilename = inputFile + "_dict_S_O_C" // taille en mémoire du dict S O C: 76GB déserialisé // -------------- val dictNbPart = 1200 val SOByName = sc.textFile(SOFilename). coalesce(NB_FRAGMENTS). map(line => line.split("\t")). map(tab => (tab(0), tab(1).toLong)). partitionBy(new HashPartitioner(dictNbPart)). // partitionné pour que le lookup acède à une seule partition : accès en O(n/dictNbPart) au lieu de non O(n) setName("SOByName").persist() SOByName.count //328 620 776 //----------------------------- //namespace and qualified names //----------------------------- val nsUB = "http://www.univ-mlv.fr/~ocure/lubm.owl" val nsRDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns" val nameSpaces = Map( "ub" -> nsUB, "rdf" -> nsRDF) def qname(ns: String, prop: String): String = nameSpaces.get(ns).get + "#" + prop // ---------- // Concepts // -------- val conceptById = sc.textFile(conceptFilename).coalesce(1). map(line => line.split("\t")). map(tab => (tab(0).toLong, ( tab(1), tab(2).toByte, tab(3).toByte) ) ). // partitionBy(new HashPartitioner(NB_FRAGMENTS)) // partitioned by id setName("conceptById"). persist() conceptById.count val conceptByName = conceptById.map{ case (id, (name, start, length)) => (name, (id, start, length))}. // partitionBy(new HashPartitioner(NB_FRAGMENTS)). // partitioned by URI setName("conceptByName"). persist() conceptByName.count // ---------- // Properties // ---------- val propertyById = sc.textFile(propertyFilename).coalesce(1). map(line => line.split("\t")). map(tab => { if (tab.size == 3) ( 0L, (tab(1), 0.toByte, 0.toByte)) else ( tab(0).toLong, (tab(1), tab(2).toByte, tab(3).toByte))} ). // partitionBy(new HashPartitioner(NB_FRAGMENTS)). // partitioned by id setName("propertyById"). persist() propertyById.count val propertyByName = propertyById. map{ case (id, (name, start, length)) => (name, (id, start, length))}. // partitionBy(new HashPartitioner(NB_FRAGMENTS)). // partitioned by URI setName("propertyByName"). persist() propertyByName.count // -------------------------------------------------------- // load data either random partitioning or subject based // ------------------------------------------------------- val triplesNonPartitioné = sc.textFile(inputFile). coalesce(NB_FRAGMENTS). map(line => line.split("\t")). map( tab => (tab(0).toLong, (tab(1).toLong, tab(2).toLong)) ) // pour comparer DF avec RDD (star optim), on persist la RDD triples //triplesNonPartitionné.setName("Triples").persist() val triples = triplesNonPartitioné. partitionBy(new HashPartitioner(NB_FRAGMENTS)) // partitioned by subject // DataFrame contenant les données val d = triples.map{case(s,(p,o))=> (s,p,o)}.toDF("subject", "predicate", "object") d.persist() d.count // 133 573 854 pour 1k // 1 334 681 190 pour 10k //d.take(1).head.getLong(1) // -------------------------------------------------------- // fonctions auxillaires // ------------------------------------------------------- def bounds(concept: (Long, Byte, Byte)): (Long, Long) = { // CONSTANTE GLOBALE val codeLength = 31 // calculer la borne sup val id = concept._1 val start = concept._2 val localLength = concept._3 val shift = (codeLength - (start + localLength)) val prefix = id >> shift val upperBound = (prefix+1) << shift return (id, upperBound) } /* function isIn (est un concept OU un sous concept) retourne true si le concept est egal au superconcept ou est un de ses sous concepts : lower <= concept < upper */ def isIn( concept: Long, bounds: (Long, Long)): Boolean = { return concept >= bounds._1 && concept < bounds._2 } /* function queryTimeRDD: Chronometre la durée d'évaluation d'une RDD */ def queryTimeRDD[T: ClassTag](q: RDD[T]): Double = { var start = java.lang.System.currentTimeMillis(); var c = q.count var t = (java.lang.System.currentTimeMillis() - start).toDouble /1000 println("") println(s"Count=$c, Time= $t (s)") t } /* function queryTime: Chronometre la durée d'évaluation d'un DataFrame */ def queryTime(q: DataFrame): Double = { var start = java.lang.System.currentTimeMillis(); var c = q.count var t = (java.lang.System.currentTimeMillis() - start).toDouble /1000 println("") println(s"Count=$c, Time= $t (s)") t } // ================================================== // ================================================== // QUERY // ================================================== // ================================================== // Concepts (rdf:type) // ------------------ val prof = conceptByName.lookup(qname("ub", "Professor")).head._1 val chair = conceptByName.lookup(qname("ub", "Chair")).head._1 val university = conceptByName.lookup(qname("ub", "University")).head._1 val department = conceptByName.lookup(qname("ub", "Department")).head._1 // Concept interval: // ---------------- val profInterval: (Long, Long) = bounds(conceptByName.lookup(qname("ub", "Professor")).head) // studentInterval est nécessaire pour Q8 car aucun type Student dans le dataset mais seulement des sous types val studentInterval: (Long, Long) = bounds(conceptByName.lookup(qname("ub", "Student")).head) val std1 = studentInterval._1 val std2 = studentInterval._2 // Property: // -------- val typeOf = propertyByName.lookup(qname("rdf", "type")).head._1 val worksFor = propertyByName.lookup(qname("ub", "worksFor")).head._1 val subOrg = propertyByName.lookup(qname("ub", "subOrganizationOf")).head._1 val memberOf = propertyByName.lookup(qname("ub", "memberOf")).head._1 val email = propertyByName.lookup(qname("ub", "emailAddress")).head._1 // Property interval: // ----------------- val worksForInterval: (Long, Long) = bounds(propertyByName.lookup(qname("ub", "worksFor")).head) val memberOfInterval: (Long, Long) = bounds(propertyByName.lookup(qname("ub", "memberOf")).head) // Object literal: // -------------- val univ0 = SOByName.lookup("<http://www.University0.edu>").head val bcMemberOf = sc.broadcast(memberOf) val bcEmail = sc.broadcast(email) /* ---------------------- Q8 using Sparql DF method ====================== */ // predicats des triplets sur Y // ----------------------------- val strP1 = s"object >= $std1 and object <= $std2 and predicate = 0" val strP5 = s"predicate = $email" // predicats des triplets sur X // ----------------------------- val strP2 = s"predicate = 0 and object= $department" val strP4 = s"predicate = $subOrg and object = $univ0" val strP3 = s"predicate = $memberOf" val strP1P3P5 = s"($strP1) or ($strP3) or ($strP5)" val t1 = d.where(strP1).select("subject").withColumnRenamed("subject", "X") val t2 = d.where(strP2).select("subject").withColumnRenamed("subject", "Y") val t3 = d.where(strP3).select("subject", "object").withColumnRenamed("subject", "X").withColumnRenamed("object", "Y") val t4 = d.where(strP4).select("subject").withColumnRenamed("subject", "Y") val t5 = d.where(strP5).select("subject", "object").withColumnRenamed("subject", "X").withColumnRenamed("object", "Z") val q8df = t4.join(t2, Seq("Y")).join(t3, Seq("Y")).join(t1, Seq("X")).join(t5, Seq("X")) queryTime(q8df) // --------------------------------------------- // Q8 using the SPARQL Hybrid DF method // ============================================== /* class IterP2P4 etoile P2 P4 ---------------- */ class IterP2P4(iter: Iterator[org.apache.spark.sql.Row]) extends Iterator[Long] { val a: scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer() var i: Iterator[Long] = null def hasNext = { if(i == null) { while(iter.hasNext) { val r = iter.next a.append(r) } // group by subject val b = a.groupBy(x => x.getLong(0)) val c = b.flatMap{ case (s, rowList) => { // on sait qu'il y a au plus 1 type Dept et 1 subOrg Univ0 (car il n'y a pas de triplets en double dans le dataset) // donc les 2 triplets de P2 et P4 existent ssi le nombre de triplets vaut 2 if(rowList.size == 2) Seq(s) else Seq() } } i = c.iterator } i.hasNext } def next = i.next } val tmp24 = d.where(s"$strP2 or $strP4").mapPartitions(iter => new IterP2P4(iter)).toDF("Y") queryTime(tmp24) // P2P4 broadcast val bcEtoile24 = sc.broadcast(tmp24.map(x => (x.getLong(0), true)).collect.toMap) /* class IterQ8 : broadcast de l'étoile p2 p4 and join with p1 p3 p5 ------------------- */ class IterQ8(iter: Iterator[org.apache.spark.sql.Row], bcEtoile: org.apache.spark.broadcast.Broadcast[Map[Long,Boolean]]) extends Iterator[(Long,Long,Long)] with java.io.Serializable { // valeurs internes à la classe val localMemberOf = 1140850688L val localEmail = 872415232L val a = new scala.collection.mutable.HashMap[Long, scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row]]() var i: Iterator[(Long, Long, Long)] = null def hasNext = { if(i == null) { // parcours séquentiel d'une partition // group by subject while(iter.hasNext) { val r = iter.next val s = r.getLong(0) if(a.contains(s)) { a(s).append(r) } else { val liste = new scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row]() liste.append(r) a.put(s, liste) } } // process a subject val c = a.flatMap{ case (s, rowList) => { // group by predicate val byPred = rowList.groupBy(y => y.getLong(1)) // if a star pattern has at least one triple of each type if( byPred.contains(0) && byPred.contains(localMemberOf) && byPred.contains(localEmail)) { val p1List = byPred(0) val p3List = byPred(localMemberOf) val p5List = byPred(localEmail) val size = p1List.size * p3List.size * p5List.size val res = new scala.collection.mutable.ArrayBuffer[(Long,Long,Long)](size) for(vt1 <- p1List) { for(vt3 <- p3List){ if(bcEtoile.value.contains(vt3.getLong(2))) { for(vt5 <- p5List){ res.append( (s, vt3.getLong(2), vt5.getLong(2)) ) } } } } res } else { Array.empty[(Long, Long, Long)] } } } i = c.iterator } i.hasNext } def next = i.next } val q8 = d.where(s"($strP1) or ($strP3) or ($strP5)").mapPartitions(iter => new IterQ8(iter, bcEtoile24)) queryTimeRDD(q8)