{{indexmenu_n>9}}

====== Utilities ======

=== Data loading utilities ===

<code scala>
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// TIMER: run q.count nbIter times, print each run's time and the average
def queryTimeDFIter(q: DataFrame, nbIter: Int): Unit = {
  val l = new scala.collection.mutable.ArrayBuffer[Double](nbIter)
  for (i <- 1 to nbIter) {
    val start = java.lang.System.currentTimeMillis()
    val c = q.count
    val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
    l.append(t)
    println()
    println(s"Count=$c, Time= $t (s)")
  }
  val avg = l.reduce(_ + _) / l.size
  println(s"AVERAGE time for ${l.size} values is: $avg")
}


// define VPs to be loaded
//-------------------------
val nbP = dictP.count.toInt
val r = (0 to nbP - 1)


// SPECIFY THE PARTITIONING: either default or subject based
// ------------------------
// Default partitioning (s,o)
val VP2Random = r.map(i => (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS))).toMap

// Partitioning by SUBJECT (s)
val VP2 = r.map(i => (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS, col("s")))).toMap


// load VP sizes
val VP2Size = sqlContext.read.parquet(vpDir + "/size").collect.map(r => (r.getLong(0).toInt, r.getLong(1))).toMap


// namespace prefixes used in the dataset
val nameSpace = Map(
  "dc"    -> "http://purl.org/dc/terms/",
  "foaf"  -> "http://xmlns.com/foaf/",
  "gr"    -> "http://purl.org/goodrelations/",
  "gn"    -> "http://www.geonames.org/ontology#",
  "mo"    -> "http://purl.org/ontology/mo/",
  "og"    -> "http://ogp.me/ns#",
  "rev"   -> "http://purl.org/stuff/rev#",
  "rdf"   -> "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
  "rdfs"  -> "http://www.w3.org/2000/01/rdf-schema#",
  "sorg"  -> "http://schema.org/",
  "wsdbm" -> "http://db.uwaterloo.ca/~galuc/wsdbm/")

// get the id of a property from its prefixed name, using the dictionary dictP
def getIdP(prefix: String, p: String): Int = {
  val full = nameSpace(prefix) + p
  dictP.where(s"p = '<$full>'").take(1).head.getLong(1).toInt
}

// get the id of a subject or object from its prefixed name, using dictSO
def getIdSO(prefix: String, s: String): Long = {
  val full = nameSpace(prefix) + s
  dictSO.where(s"so = '<$full>'").take(1).head.getLong(1)
}

</code>
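
The snippet below is a minimal usage sketch of these loading utilities, assuming the same spark-shell session in which ''dictP'', ''dictSO'', ''vpDir'', ''NB_FRAGMENTS'' and the VP maps above are defined; the chosen property ''rdf:type'' is only an example.

<code scala>
// Illustrative session: look up a property id, check the VP size,
// and time the scan of the corresponding vertical partition.
val idType = getIdP("rdf", "type")   // dictionary id of rdf:type
val vpType = VP2(idType)             // its subject-partitioned VP
println(s"rdf:type VP contains ${VP2Size(idType)} triples")
queryTimeDFIter(vpType, 5)           // 5 timed counts plus the average
</code>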

=== QUERY utilities ===

<code scala>
// a query variable, identified by its number
case class v(n: Int)

// split prefixed property names "ns:p" into (ns, p) pairs
def sparqlSplit(sparql: List[(Any, String, Any)]): List[(Any, (String, String), Any)] = {
  sparql.map { case (s, p, o) => (s, p.split(":"), o) }
        .map { case (s, p, o) => (s, (p(0), p(1)), o) }
}


// persist the VPs accessed by a query q
def persistQueryVP(q: List[(Any, (String, String), Any)], vps: Map[Int, org.apache.spark.sql.DataFrame]) = {
  q.map { case (_, (ns, p), _) => vps(getIdP(ns, p)).persist().count }
}

// encode properties and literals of a query with their dictionary ids
// (triple patterns with two constants are not handled here)
def sparqlEncode(splittedSparql: List[(Any, (String, String), Any)]): List[(Any, Int, Any)] = {
  splittedSparql.map { case (s, (ns, p), o) =>
    val idP = getIdP(ns, p)
    (s, o) match {
      case (v(a), v(b)) => (v(a), idP, v(b))
      case (lit: String, v(b)) =>
        val idLit = getIdSO(lit.split(":")(0), lit.split(":")(1))
        (idLit, idP, v(b))
      case (v(a), lit: String) =>
        val idLit = getIdSO(lit.split(":")(0), lit.split(":")(1))
        (v(a), idP, idLit)
    }
  }
}


// generate the selection operator for each triple pattern
def tpOperators(encodedQuery: List[(Any, Int, Any)], d: Map[Int, DataFrame]) =
  encodedQuery.map {
    case (v(a), p, v(b)) => ((v(a), p, v(b)), d(p).withColumnRenamed("o", s"v$b"))
    case (lit,  p, v(b)) => ((lit,  p, v(b)), d(p).where(s"s=$lit").select("o").withColumnRenamed("o", s"v$b"))
    case (v(a), p, lit)  => ((v(a), p, lit),  d(p).where(s"o=$lit").select("s"))
  }


</code>
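
To see how these query utilities chain together, here is a sketch of an end-to-end run; the two-pattern BGP and its WatDiv-style prefixed names are an illustrative example, not a query from the original experiments.

<code scala>
// Illustrative BGP:  ?x rdf:type wsdbm:City . ?x gn:parentCountry ?y
val bgp: List[(Any, String, Any)] =
  List((v(1), "rdf:type", "wsdbm:City"),
       (v(1), "gn:parentCountry", v(2)))

val split   = sparqlSplit(bgp)           // prefixed names -> (ns, local) pairs
persistQueryVP(split, VP2)               // cache the two VPs that are accessed
val encoded = sparqlEncode(split)        // properties/literals -> dictionary ids
val tps     = tpOperators(encoded, VP2)  // one selection DataFrame per pattern
tps.foreach { case (tp, df) => println(s"$tp -> ${df.count} rows") }
</code>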


back to [[en:site:recherche:logiciels:sparqlwithspark|SPARQL Query Processing with Apache Spark]]