Utilities

Data loading utilities

// TIMER: run the count() action nbIter times, print each run, and report the average
import org.apache.spark.sql.DataFrame

def queryTimeDFIter(q: DataFrame, nbIter: Int): Unit = {
  val l = new scala.collection.mutable.ArrayBuffer[Double](nbIter)
  for (i <- 1 to nbIter) {
    val start = java.lang.System.currentTimeMillis()
    val c = q.count
    val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
    l.append(t)
    println()
    println(s"Count=$c, Time= $t (s)")
  }
  val avg = l.sum / l.size
  println(s"AVERAGE time for ${l.size} values is: $avg")
}
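 
For instance, once the VPs below are loaded, the timer can be applied to any DataFrame. An illustrative call (VP2 is defined in the next snippets):
 
// time the count of VP 0 over 5 runs and print the average
queryTimeDFIter(VP2(0), 5)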
 
 
// define VPs (vertical partitions, one DataFrame per property) to be loaded
//---------------------------------------------------------------------------
val nbP = dictP.count.toInt
val r = 0 until nbP
 
 
// SPECIFY THE PARTITIONING: either default or subject-based
// ----------------------------------------------------------
// Default partitioning of the (s,o) pairs over NB_FRAGMENTS partitions
val VP2Random = r.map(i => (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS) )).toMap
 
// Partitioning by SUBJECT (s): co-locates triples with the same subject
import org.apache.spark.sql.functions.col
val VP2 = r.map(i => (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS, col("s")) )).toMap
 
 
// load VP sizes (property id -> number of triples)
val VP2Size = sqlContext.read.parquet(vpDir + "/size").collect.map(row => (row.getLong(0).toInt, row.getLong(1))).toMap
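 
As a quick sanity check (a sketch, assuming property id 0 exists), a recorded size can be compared with the actual cardinality of the corresponding VP:
 
// recorded vs. actual cardinality of VP 0
println(s"VP 0: recorded=${VP2Size(0)}, actual=${VP2(0).count}")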
 
 
// namespace prefix -> full URI (vocabulary of the WatDiv dataset)
val nameSpace = Map( 
"dc" -> "http://purl.org/dc/terms/",
"foaf" -> "http://xmlns.com/foaf/",
"gr" -> "http://purl.org/goodrelations/",
"gn" -> "http://www.geonames.org/ontology#",
"mo" -> "http://purl.org/ontology/mo/",
"og" -> "http://ogp.me/ns#",
"rev" -> "http://purl.org/stuff/rev#",
"rdf" -> "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs" -> "http://www.w3.org/2000/01/rdf-schema#",
"sorg" -> "http://schema.org/",
"wsdbm" -> "http://db.uwaterloo.ca/~galuc/wsdbm/")
 
// return the dictionary id of the property prefix:p
def getIdP(prefix: String, p: String): Int = {
  val full = nameSpace(prefix) + p
  dictP.where(s"p = '<$full>'").take(1).head.getLong(1).toInt
}
 
 
// return the dictionary id of the subject/object URI prefix:s
def getIdSO(prefix: String, s: String): Long = {
  val full = nameSpace(prefix) + s
  dictSO.where(s"so = '<$full>'").take(1).head.getLong(1)
}
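 
For example (the names below are illustrative dictionary entries; any prefixed name present in dictP/dictSO works):
 
// id of the rdf:type property
val idType = getIdP("rdf", "type")
// id of a subject/object URI
val idUser = getIdSO("wsdbm", "User0")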

Query utilities

// query variable placeholder: v(i) denotes the i-th variable of a pattern
case class v(n: Int)
 
// split each prefixed property name "ns:name" into a (ns, name) pair
def sparqlSplit(sparql: List[(Any, String, Any)]): List[(Any, (String, String), Any)] =
  sparql.map{ case (s, p, o) =>
    val Array(ns, name) = p.split(":")
    (s, (ns, name), o)
  }
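 
A basic graph pattern is then written as a list of triples in which variables are v(i) instances and constants are prefixed names. A minimal sketch (the pattern itself is illustrative, not taken from the benchmark):
 
// ?x rdf:type wsdbm:Role2 . ?x sorg:email ?e
val q = List(
  (v(0), "rdf:type", "wsdbm:Role2"),
  (v(0), "sorg:email", v(1)))
val qSplit = sparqlSplit(q)
// -> List((v(0),(rdf,type),wsdbm:Role2), (v(0),(sorg,email),v(1)))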
 
 
// persist (cache) the VPs accessed by a query q; count forces materialization
def persistQueryVP(q: List[(Any, (String, String), Any)], vps: Map[Int, org.apache.spark.sql.DataFrame]) = {
  q.map{ case (_, (ns, p), _) => vps(getIdP(ns, p)).persist().count }
}
 
// encode properties and constants in a query with their dictionary ids
// (patterns where both subject and object are constants are not handled)
def sparqlEncode(splittedSparql: List[(Any, (String, String), Any)]): List[(Any, Int, Any)] =
  splittedSparql.map{ case (s, (ns, p), o) =>
    val idP = getIdP(ns, p)
    (s, o) match {
      case (v(a), v(b)) => (v(a), idP, v(b))
      case (lit: String, v(b)) =>
        val Array(lns, lname) = lit.split(":")
        (getIdSO(lns, lname), idP, v(b))
      case (v(a), lit: String) =>
        val Array(lns, lname) = lit.split(":")
        (v(a), idP, getIdSO(lns, lname))
    }
  }
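 
Continuing the sketch, the split pattern is encoded with dictionary ids (the actual values depend on the dataset):
 
val qEnc = sparqlEncode(qSplit)
// -> List((v(0), idOf(rdf:type), idOf(wsdbm:Role2)),
//         (v(0), idOf(sorg:email), v(1)))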
 
 
// generate the selection operator for each triple pattern; the subject
// column keeps the name "s" while the object column is renamed after its
// variable, so operators of a star query can later be joined on "s"
def tpOperators(encodedQuery: List[(Any, Int, Any)], d: Map[Int, DataFrame]) =
  encodedQuery.map{
    case (v(a), p, v(b)) => ((v(a), p, v(b)), d(p).withColumnRenamed("o", s"v$b"))
    case (lit,  p, v(b)) => ((lit,  p, v(b)), d(p).where(s"s=$lit").select("o").withColumnRenamed("o", s"v$b"))
    case (v(a), p, lit)  => ((v(a), p, lit),  d(p).where(s"o=$lit").select("s"))
  }
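 
Putting the pieces together, the illustrative two-pattern query above can be evaluated by persisting the VPs it touches, generating one selection per pattern, and joining the operators on the shared subject column s (a sketch; a complete evaluator would also order the joins, e.g. by increasing VP2Size):
 
persistQueryVP(qSplit, VP2)               // cache the VPs used by the query
val ops = tpOperators(qEnc, VP2)          // one (pattern, DataFrame) pair per triple
val res = ops(0)._2.join(ops(1)._2, "s")  // equi-join on the common subject column
queryTimeDFIter(res, 3)                   // measure the evaluation time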

Back to SPARQL Query Processing with Apache Spark
