{{indexmenu_n>9}}

====== Utilities ======

=== Data loading utilities ===

<code scala>
// assumes sqlContext, dictP, dictSO, vpDir and NB_FRAGMENTS are defined
// beforehand (see the other pages of this guide)
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// TIMER: count the rows of q nbIter times, print each run and the average
def queryTimeDFIter(q: DataFrame, nbIter: Int): Unit = {
  val l = new scala.collection.mutable.ArrayBuffer[Double](nbIter)
  for (i <- 1 to nbIter) {
    val start = java.lang.System.currentTimeMillis()
    val c = q.count
    val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
    l.append(t)
    println("")
    println(s"Count=$c, Time= $t (s)")
  }
  val avg = l.reduce(_ + _) / l.size
  println(s"AVERAGE time for ${l.size} values is: $avg")
}


// define VPs to be loaded: one vertical partition per property id
// ----------------------------------------------------------------
val nbP = dictP.count.toInt
val r = (0 to nbP - 1)


// SPECIFY THE PARTITIONING: either default or subject based
// ----------------------------------------------------------
// Default partitioning (s,o)
val VP2Random = r.map(i => (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS))).toMap

// Partitioning by SUBJECT (s)
val VP2 = r.map(i => (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS, col("s")))).toMap


// load VP sizes: property id -> number of triples
val VP2Size = sqlContext.read.parquet(vpDir + "/size").collect.map(r => (r.getLong(0).toInt, r.getLong(1))).toMap


// namespace prefixes used to abbreviate IRIs in the queries
val nameSpace = Map(
  "dc" -> "http://purl.org/dc/terms/",
  "foaf" -> "http://xmlns.com/foaf/",
  "gr" -> "http://purl.org/goodrelations/",
  "gn" -> "http://www.geonames.org/ontology#",
  "mo" -> "http://purl.org/ontology/mo/",
  "og" -> "http://ogp.me/ns#",
  "rev" -> "http://purl.org/stuff/rev#",
  "rdf" -> "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
  "rdfs" -> "http://www.w3.org/2000/01/rdf-schema#",
  "sorg" -> "http://schema.org/",
  "wsdbm" -> "http://db.uwaterloo.ca/~galuc/wsdbm/")

// id of a property "prefix:p" in the property dictionary
def getIdP(prefix: String, p: String): Int = {
  val full = nameSpace(prefix) + p
  dictP.where(s"p = '<$full>'").take(1).head.getLong(1).toInt
}

// id of a subject or object "prefix:s" in the subject/object dictionary
def getIdSO(prefix: String, s: String): Long = {
  val full = nameSpace(prefix) + s
  dictSO.where(s"so = '<$full>'").take(1).head.getLong(1)
}
</code>
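
A quick sanity check after loading (a minimal usage sketch; wsdbm:follows is a property of the WatDiv vocabulary used here as an example, any property of the loaded dataset works the same way):

<code scala>
// id of the wsdbm:follows property in the property dictionary
val idFollows = getIdP("wsdbm", "follows")

// time a count over its subject-partitioned VP, averaged over 3 runs
queryTimeDFIter(VP2(idFollows), 3)

// compare with the size recorded when the VPs were built
println(s"recorded size: ${VP2Size(idFollows)}")
</code>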
+ | |||
+ | === QUERY utilities === | ||
+ | |||
<code scala>
// a variable in a triple pattern, identified by its number
case class v(n: Int)

// split each property "prefix:name" into a (prefix, name) pair
def sparqlSplit(sparql: List[(Any, String, Any)]): List[(Any, (String, String), Any)] = {
  sparql.map { case (s, p, o) => (s, p.split(":"), o) }
        .map { case (s, p, o) => (s, (p(0), p(1)), o) }
}


// persist the VPs accessed by a query q
def persistQueryVP(q: List[(Any, (String, String), Any)], vps: Map[Int, DataFrame]) = {
  q.map { case (_, (ns, p), _) => vps(getIdP(ns, p)).persist().count }
}

// encode properties and literals in a query with their dictionary ids
// (each triple pattern is expected to contain at least one variable)
def sparqlEncode(splittedSparql: List[(Any, (String, String), Any)]): List[(Any, Int, Any)] = {
  splittedSparql.map { case (s, (ns, p), o) =>
    val idP = getIdP(ns, p)
    (s, o) match {
      case (v(a), v(b)) => (v(a), idP, v(b))
      case (lit: String, v(b)) =>
        val idLit = getIdSO(lit.split(":")(0), lit.split(":")(1))
        (idLit, idP, v(b))
      case (v(a), lit: String) =>
        val idLit = getIdSO(lit.split(":")(0), lit.split(":")(1))
        (v(a), idP, idLit)
    }
  }
}


// generate the selection operator of each triple pattern: the VP of its
// property, filtered by its constant (if any), with the object column
// renamed after the pattern's object variable
def tpOperators(encodedQuery: List[(Any, Int, Any)], d: Map[Int, DataFrame]) =
  encodedQuery.map {
    case (v(a), p, v(b)) => ((v(a), p, v(b)), d(p).withColumnRenamed("o", s"v$b"))
    case (lit, p, v(b))  => ((lit, p, v(b)), d(p).where(s"s=$lit").select("o").withColumnRenamed("o", s"v$b"))
    case (v(a), p, lit)  => ((v(a), p, lit), d(p).where(s"o=$lit").select("s"))
  }
</code>
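
Putting the utilities together on a small example (a minimal sketch; the two WatDiv properties and the final join on the shared variable are illustrative assumptions, not part of the utilities above):

<code scala>
// BGP with two triple patterns: ?v0 wsdbm:follows ?v1 . ?v1 wsdbm:likes ?v2
val q = List((v(0), "wsdbm:follows", v(1)),
             (v(1), "wsdbm:likes", v(2)))

val split   = sparqlSplit(q)            // "wsdbm:follows" -> ("wsdbm", "follows")
persistQueryVP(split, VP2)              // cache the two VPs used by the query
val encoded = sparqlEncode(split)       // replace properties/literals by ids
val ops     = tpOperators(encoded, VP2) // one (pattern, DataFrame) pair per triple

// evaluate the BGP: join the two operators on the shared variable v1
// (the subject column of the second pattern is renamed to match)
val res = ops(0)._2.join(ops(1)._2.withColumnRenamed("s", "v1"), "v1")
queryTimeDFIter(res, 3)
</code>
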
back to [[en:site:recherche:logiciels:sparqlwithspark| SPARQL Query Processing with Apache Spark]]