SnowFlake query Q8 plans

Plans for Spark 1.5

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
 
//DF
import org.apache.spark.sql.DataFrame
 
import scala.reflect.ClassTag
import org.apache.spark.HashPartitioner
 
val NB_FRAGMENTS = sc.defaultParallelism
 
val dir = "/user/olivier"
val scale = "10k" // scale factor: 1k or 10k
val lubmDir = dir + "/lubm" + scale
 
val inputFile = lubmDir + s"/univ${scale}_encoded" 
 
// --------------------------------------------------------
// load the METADATA
// -------------------------------------------------------
 
val conceptFilename = dir + "/lubmConcepts_id2URI.txt"
val propertyFilename = dir + "/lubmProp_id2URI.txt"
val SOFilename = inputFile + "_dict_S_O_C"
 
 
// in-memory size of the S O C dictionary: 76 GB deserialized
// -------------- 
val dictNbPart = 1200
val SOByName = sc.textFile(SOFilename).
  coalesce(NB_FRAGMENTS).
  map(line => line.split("\t")).
  map(tab => (tab(0), tab(1).toLong)).
  partitionBy(new HashPartitioner(dictNbPart)). // partitioned so that a lookup reads a single partition: O(n/dictNbPart) access instead of O(n)
  setName("SOByName").persist()
 
SOByName.count
//328 620 776
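// Illustration: because SOByName carries a HashPartitioner, RDD.lookup runs a job
// on the single partition that can hold the key, i.e. it reads about n/dictNbPart
// entries instead of the whole dictionary. The same call is issued further down
// to resolve University0:
// val univ0Id: Long = SOByName.lookup("<http://www.University0.edu>").head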
 
 
//-----------------------------
//namespace and qualified names
//-----------------------------
 
val nsUB = "http://www.univ-mlv.fr/~ocure/lubm.owl"
val nsRDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns"
val nameSpaces = Map( "ub" -> nsUB, "rdf" -> nsRDF)
def qname(ns: String, prop: String): String = nameSpaces(ns) + "#" + prop
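// Example: qname("ub", "Professor") == "http://www.univ-mlv.fr/~ocure/lubm.owl#Professor"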
 
// ----------
// Concepts 
// --------
 
val conceptById = sc.textFile(conceptFilename).coalesce(1).
  map(line => line.split("\t")).
  map(tab => (tab(0).toLong, ( tab(1), tab(2).toByte, tab(3).toByte) ) ).
  // partitionBy(new HashPartitioner(NB_FRAGMENTS)) // partitioned by id
  setName("conceptById").
  persist()
 
conceptById.count
 
val conceptByName = conceptById.map{ case (id, (name, start, length)) => (name, (id, start, length))}.
  // partitionBy(new HashPartitioner(NB_FRAGMENTS)). // partitioned by URI
  setName("conceptByName").
  persist()
 
conceptByName.count
 
// ----------
// Properties
// ----------
 
val propertyById = sc.textFile(propertyFilename).coalesce(1).
  map(line => line.split("\t")).
  map(tab => {
    if (tab.size == 3) ( 0L, (tab(1), 0.toByte, 0.toByte))
    else ( tab(0).toLong, (tab(1), tab(2).toByte, tab(3).toByte))}
  ).
  // partitionBy(new HashPartitioner(NB_FRAGMENTS)). // partitioned by id
  setName("propertyById").
  persist()
 
propertyById.count
 
val propertyByName = propertyById.
  map{ case (id, (name, start, length)) => (name, (id, start, length))}.
  // partitionBy(new HashPartitioner(NB_FRAGMENTS)). // partitioned by URI
  setName("propertyByName").
  persist()
 
propertyByName.count
 
 
 
// --------------------------------------------------------
// load the data, either randomly partitioned or partitioned by subject
// -------------------------------------------------------
 
 
val triplesNonPartitioné = sc.textFile(inputFile).
  coalesce(NB_FRAGMENTS).
  map(line => line.split("\t")).
  map( tab => (tab(0).toLong, (tab(1).toLong, tab(2).toLong)) ) 
 
// to compare the DF against the RDD (star optimization), persist the triples RDD
//triplesNonPartitioné.setName("Triples").persist()
 
val triples = triplesNonPartitioné.
  partitionBy(new HashPartitioner(NB_FRAGMENTS)) // partitioned by subject
 
// DataFrame holding the triples (toDF uses sqlContext.implicits._, which spark-shell imports automatically)
 
val d = triples.map{case(s,(p,o))=> (s,p,o)}.toDF("subject", "predicate", "object")
d.persist()
d.count
 
//   133 573 854 triples for 1k
// 1 334 681 190 triples for 10k
 
//d.take(1).head.getLong(1)
 
// --------------------------------------------------------
// auxiliary functions
// -------------------------------------------------------
 
def bounds(concept: (Long, Byte, Byte)): (Long, Long) = {
  // returns the half-open interval [id, upperBound) covering the concept and all its subconcepts
  // GLOBAL CONSTANT: total width of the concept encoding, in bits
  val codeLength = 31
  // compute the upper bound
  val id = concept._1
  val start = concept._2
  val localLength = concept._3
  val shift = (codeLength - (start + localLength))
  val prefix = id >> shift
  val upperBound = (prefix + 1) << shift
  return (id, upperBound)
}
 
/*
 function isIn (is the concept itself OR one of its subconcepts)
 returns true if the concept equals the superconcept
                  or is one of its subconcepts:

                  lower <= concept < upper
*/
def isIn( concept: Long, bounds: (Long, Long)): Boolean = {
  return concept >= bounds._1 && concept < bounds._2
}
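 
/*
 Worked example (hypothetical ids, not taken from the actual LUBM encoding):
 with codeLength = 31, a super-concept whose prefix is 0b101 with start = 0
 and length = 3 has id 5L << 28; any subconcept extends that prefix, e.g.
 0b10110 with id 22L << 26, and therefore falls inside the super-concept's
 half-open interval. This is the mechanism that lets studentInterval below
 capture all Student subtypes, since the dataset has no explicit Student type.
*/
val exampleSuper: (Long, Byte, Byte) = (5L << 28, 0.toByte, 3.toByte)    // hypothetical super-concept
val exampleSubId: Long = 22L << 26                                       // hypothetical subconcept id
bounds(exampleSuper)                      // (1342177280, 1610612736)
isIn(exampleSubId, bounds(exampleSuper))  // true: 1476395008 lies in [lower, upper)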
 
/*
 function queryTimeRDD: measures the evaluation time of an RDD (forced with count)
*/
def queryTimeRDD[T: ClassTag](q: RDD[T]): Double = {
  var start = java.lang.System.currentTimeMillis();
  var c = q.count
  var t = (java.lang.System.currentTimeMillis() - start).toDouble /1000
  println("")
  println(s"Count=$c, Time= $t (s)")
  t
}
 
/*
 function queryTime: measures the evaluation time of a DataFrame (forced with count)
*/
def queryTime(q: DataFrame): Double = {
  var start = java.lang.System.currentTimeMillis();
  var c = q.count
  var t = (java.lang.System.currentTimeMillis() - start).toDouble /1000
  println("")
  println(s"Count=$c, Time= $t (s)")
  t
}
 
 
 
 
// ==================================================
// ==================================================
 
//               QUERY
 
// ==================================================
// ==================================================
 
 
 
// Concepts (rdf:type)
// ------------------
val prof = conceptByName.lookup(qname("ub", "Professor")).head._1
val chair = conceptByName.lookup(qname("ub", "Chair")).head._1
val university = conceptByName.lookup(qname("ub", "University")).head._1
val department = conceptByName.lookup(qname("ub", "Department")).head._1
 
// Concept interval:
// ----------------
val profInterval: (Long, Long) = bounds(conceptByName.lookup(qname("ub", "Professor")).head)
 
// studentInterval is needed for Q8: the dataset contains no explicit Student type, only its subtypes
val studentInterval: (Long, Long) = bounds(conceptByName.lookup(qname("ub", "Student")).head) 
val std1 = studentInterval._1
val std2 = studentInterval._2
 
 
// Property:
// --------
val typeOf = propertyByName.lookup(qname("rdf", "type")).head._1
val worksFor = propertyByName.lookup(qname("ub", "worksFor")).head._1
val subOrg = propertyByName.lookup(qname("ub", "subOrganizationOf")).head._1
val memberOf = propertyByName.lookup(qname("ub", "memberOf")).head._1
val email = propertyByName.lookup(qname("ub", "emailAddress")).head._1
 
 
// Property interval:
// -----------------
val worksForInterval: (Long, Long) = bounds(propertyByName.lookup(qname("ub", "worksFor")).head)
val memberOfInterval: (Long, Long) = bounds(propertyByName.lookup(qname("ub", "memberOf")).head)
 
 
// Object literal:
// --------------
val univ0 = SOByName.lookup("<http://www.University0.edu>").head
 
val bcMemberOf = sc.broadcast(memberOf)
val bcEmail = sc.broadcast(email)
 
 
/* 
   ----------------------
      Q8 using the SPARQL DF method
   ======================
*/
 
// predicates of the triple patterns on X (the student)
// (predicate 0 is the id of rdf:type; [std1, std2) is half-open, the upper bound from bounds() is exclusive)
// -----------------------------

val strP1 = s"object >= $std1 and object < $std2 and predicate = 0"
val strP5 = s"predicate = $email"
 
 
// predicates of the triple patterns on Y (the department)
// -----------------------------
val strP2 = s"predicate = 0 and object = $department"
val strP4 = s"predicate = $subOrg and object = $univ0"
val strP3 = s"predicate = $memberOf" // P3 links X (member) to Y (department)
 
val strP1P3P5 = s"($strP1) or ($strP3) or ($strP5)"
 
 
val t1 = d.where(strP1).select("subject").withColumnRenamed("subject", "X")
val t2 = d.where(strP2).select("subject").withColumnRenamed("subject", "Y")
val t3 = d.where(strP3).select("subject", "object").withColumnRenamed("subject", "X").withColumnRenamed("object", "Y")
val t4 = d.where(strP4).select("subject").withColumnRenamed("subject", "Y")
val t5 = d.where(strP5).select("subject", "object").withColumnRenamed("subject", "X").withColumnRenamed("object", "Z")
 
 
val q8df = t4.join(t2, Seq("Y")).join(t3, Seq("Y")).join(t1, Seq("X")).join(t5, Seq("X"))
queryTime(q8df)
 
 
 
 
 
// ---------------------------------------------
// Q8 using the SPARQL Hybrid DF method
// ==============================================
 
 
/*
  class IterP2P4: computes the P2 P4 star (subjects Y having both triples)
  ----------------
*/
class IterP2P4(iter: Iterator[org.apache.spark.sql.Row]) extends Iterator[Long] {
 
  val a: scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer()
  var i: Iterator[Long] = null
  def hasNext = {
    if(i == null) {
      while(iter.hasNext) {
        val r = iter.next
        a.append(r)
      }
 
      // group by subject
      val b = a.groupBy(x => x.getLong(0))
      val c = b.flatMap{ case (s, rowList) =>  {
        // there is at most one "type Department" triple and one "subOrganizationOf University0" triple
        // (the dataset has no duplicate triples), so both P2 and P4 match iff the group contains exactly 2 triples
        if(rowList.size == 2) Seq(s)
        else Seq()
      }
      }
      i = c.iterator
    }
    i.hasNext
  }
  def next = i.next
}
 
val tmp24 = d.where(s"$strP2 or $strP4").mapPartitions(iter => new IterP2P4(iter)).toDF("Y")
queryTime(tmp24)  
 
// broadcast the P2 P4 star: the Y values that satisfy both P2 and P4
val bcEtoile24 = sc.broadcast(tmp24.map(x => (x.getLong(0), true)).collect.toMap)
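 
// Possible variant (not used here): IterQ8 below only tests key membership,
// so broadcasting a Set would be sufficient and slightly cheaper to ship:
// val bcEtoile24Set = sc.broadcast(tmp24.map(_.getLong(0)).collect.toSet)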
 
 
/*
  class IterQ8: probes the broadcast P2 P4 star and joins it with the P1 P3 P5 star
 -------------------
*/
class IterQ8(iter: Iterator[org.apache.spark.sql.Row], bcEtoile: org.apache.spark.broadcast.Broadcast[Map[Long,Boolean]]) extends Iterator[(Long,Long,Long)] with java.io.Serializable {
 
  // values local to the class: hard-coded copies of the ub:memberOf and ub:emailAddress property ids for this encoding
  val localMemberOf = 1140850688L
  val localEmail = 872415232L
 
  val a = new scala.collection.mutable.HashMap[Long, scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row]]()
  var i: Iterator[(Long, Long, Long)] = null
 
  def hasNext = {
    if(i == null) {
      // sequential scan of one partition
 
      // group by subject
      while(iter.hasNext) {
        val r = iter.next
        val s = r.getLong(0)
        if(a.contains(s)) {
          a(s).append(r)
        }
        else {
          val liste = new scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row]()
          liste.append(r)
          a.put(s, liste)
        }
      }
 
      // process a subject
      val c = a.flatMap{ case (s, rowList) =>  {
 
        // group by predicate
        val byPred = rowList.groupBy(y => y.getLong(1))
        // if a star pattern has at least one triple of each type
        if( byPred.contains(0) && byPred.contains(localMemberOf) && byPred.contains(localEmail)) {
          val p1List = byPred(0)
          val p3List = byPred(localMemberOf)
          val p5List = byPred(localEmail)
          val size = p1List.size * p3List.size * p5List.size
          val res = new scala.collection.mutable.ArrayBuffer[(Long,Long,Long)](size)
 
          for(vt1 <- p1List) {
            for(vt3 <- p3List){
              if(bcEtoile.value.contains(vt3.getLong(2))) {
                for(vt5 <- p5List){
                  res.append( (s, vt3.getLong(2), vt5.getLong(2)) )
                }
              }
            }
          }
          res
        }
        else {
          Array.empty[(Long, Long, Long)]
        }
      }
      }
      i = c.iterator
    }
    i.hasNext
  }
 
  def next = i.next
}
 
 
val q8 = d.where(s"($strP1) or ($strP3) or ($strP5)").mapPartitions(iter => new IterQ8(iter, bcEtoile24))
queryTimeRDD(q8)