{{indexmenu_n>1}}

====== Loading WatDiv Dataset ======
===== Data preparation: encode raw data =====
<code scala>
import org.apache.spark.sql.DataFrame
// ... (the rest of the encoding script is elided; see the sketch below)
</code>
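
The encoding script is only partially shown above. As a rough sketch of what it does: the raw WatDiv triples are parsed, two dictionaries are built (one for properties, one for subjects and objects), and each triple is rewritten with integer ids and saved as Parquet. The code below is an illustration, not the original script: the input path ''rawFile'', the parsing, and the ''zipWithIndex''-based id assignment are assumptions; only the file names and the resulting schemas are taken from the rest of this page.
<code scala>
// Sketch of the encoding step (assumptions flagged in the comments).
import sqlContext.implicits._

val dir = "/user/hubert/watdiv"
val scale = "1G"
val rawFile = dir + "/watdiv" + scale + ".nt" // hypothetical input path
val encodedFile = dir + "/frame" + scale
val dictSOFile = dir + "/dictSO" + scale
val dictPFile = dir + "/dictP"

// parse the raw N-Triples into a DataFrame of (s, p, o) terms
val triples = sc.textFile(rawFile).
  map(_.split(" ")).
  map(t => (t(0), t(1), t(2))).
  toDF("s", "p", "o")

// dictionary of properties: (p, idP); id assignment via zipWithIndex is an assumption
val dictP = triples.select("p").distinct.rdd.
  map(_.getString(0)).zipWithIndex.toDF("p", "idP")
dictP.write.parquet(dictPFile)

// dictionary of subjects and objects: (so, idSO)
val dictSO = triples.select("s").unionAll(triples.select("o")).distinct.rdd.
  map(_.getString(0)).zipWithIndex.toDF("so", "idSO")
dictSO.write.parquet(dictSOFile)

// encode each triple by joining its three terms with the dictionaries
val num = triples.
  join(dictSO.withColumnRenamed("so", "s").withColumnRenamed("idSO", "idS"), "s").
  join(dictP, "p").
  join(dictSO.withColumnRenamed("so", "o").withColumnRenamed("idSO", "idO"), "o").
  select("idS", "idP", "idO")
num.write.parquet(encodedFile)
</code>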

===== Create VP's =====
Create one dataset per property (the //vertical partitions//, VP's).
<code scala>
/*
val df = num.
...
*/

// df: the encoded triples, with the id columns renamed to (s, p, o)
// (partly reconstructed; the original listing is abridged here)
val df = sqlContext.read.parquet(encodedFile).
  withColumnRenamed("idS", "s").
  withColumnRenamed("idP", "p").
  withColumnRenamed("idO", "o")

// size of VPs: number of triples per property
val VPSize = df.groupBy("p").count().
  withColumnRenamed("count", "card")
VPSize.coalesce(1).write.parquet(vpDir + "/size")

// VP definition and materialization
// ... (elided; see the sketch below)
</code>
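
The materialization loop itself is not shown. A minimal sketch of it, assuming each VP keeps only the ''(s, o)'' pairs of its property and is written under ''vpDir + "/p" + i'', the layout read back in the next section; the loop bound and the filter are assumptions:
<code scala>
import org.apache.spark.sql.functions.col

// one VP per property id: the (s, o) pairs of all triples having that property
val nbP = df.select("p").distinct.count.toInt
for (i <- 0 until nbP) {
  df.where(col("p") === i).
    select("s", "o").
    write.parquet(vpDir + "/p" + i)
}
</code>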

===== Load VP's =====
Load the dictionaries and the VP's, choose a partitioning (round robin or by subject), and define the helper functions used by the experiments.
<code scala>

// S2RDF VP
// --------

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

val NB_FRAGMENTS = sc.defaultParallelism

val dir = "/user/hubert/watdiv"

// 1 billion triples
val scale = "1G"

val encodedFile = dir + "/frame" + scale

// Dictionaries
// ------------
val dictSOFile = dir + "/dictSO" + scale
val dictPFile = dir + "/dictP"

// property dictionary: small, keep it on a single partition
val dictP = sqlContext.read.parquet(dictPFile).coalesce(1)
dictP.persist().count

// subject/object dictionary
val dictSO = sqlContext.read.parquet(dictSOFile).coalesce(NB_FRAGMENTS)
//val dictSO = sqlContext.read.parquet(dictSOFile).repartition(NB_FRAGMENTS, col("so"))
dictSO.persist().count

// VP Dataset
// ----------
val vpDir = dir + "/vp" + scale


// TIMER: run q.count nbIter times, print each run time and the average
def queryTimeDFIter(q: DataFrame, nbIter: Int): Unit = {
  val l = new scala.collection.mutable.ArrayBuffer[Double](nbIter)
  for (i <- 1 to nbIter) {
    val start = java.lang.System.currentTimeMillis()
    val c = q.count
    val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
    l.append(t)
    println("")
    println(s"Count=$c, Time= $t (s)")
  }
  val avg = l.sum / l.size
  println(s"AVERAGE time for ${l.size} values is: $avg")
}

// Define the VPs to be loaded
// ---------------------------
val nbP = dictP.count.toInt
val v = (0 to nbP - 1)

// SPECIFY THE PARTITIONING: either default or subject-based
// ---------------------------------------------------------
// Default partitioning (round robin)
val VP2Random = v.map(i => (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS))).toMap

// Partitioning by SUBJECT (s)
val VP2 = v.map(i => (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS, col("s")))).toMap

// load the VP sizes: property id -> cardinality
val VP2Size = sqlContext.read.parquet(vpDir + "/size").collect.map(r => (r.getLong(0).toInt, r.getLong(1))).toMap

// prefix -> namespace IRI
val nameSpace = Map(
  "dc" -> "http://purl.org/dc/terms/",
  "foaf" -> "http://xmlns.com/foaf/",
  "gr" -> "http://purl.org/goodrelations/",
  "gn" -> "http://www.geonames.org/ontology#",
  "mo" -> "http://purl.org/ontology/mo/",
  "og" -> "http://ogp.me/ns#",
  "rev" -> "http://purl.org/stuff/rev#",
  "rdf" -> "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
  "rdfs" -> "http://www.w3.org/2000/01/rdf-schema#",
  "sorg" -> "http://schema.org/",
  "wsdbm" -> "http://db.uwaterloo.ca/~galuc/wsdbm/")

// translate a prefixed property name into its integer id
def getIdP(prefix: String, p: String): Int = {
  val ns = nameSpace(prefix)
  val full = ns + p
  dictP.where(s"p = '<$full>'").take(1).head.getLong(1).toInt
}

// translate a prefixed subject/object term into its integer id
def getIdSO(prefix: String, s: String): Long = {
  val ns = nameSpace(prefix)
  val full = ns + s
  dictSO.where(s"so = '<$full>'").take(1).head.getLong(1)
}

</code>
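
As a usage example (an illustration, not part of the original page): look up a property id, fetch the corresponding VP, and time a count on it. The property ''wsdbm:follows'' is assumed to exist in the WatDiv schema.
<code scala>
// id of the property wsdbm:follows (assumed present in the dataset)
val idFollows = getIdP("wsdbm", "follows")

// its vertical partition (subject-partitioned variant) and its recorded size
val vpFollows = VP2(idFollows)
println(s"card(follows) = ${VP2Size(idFollows)}")

// time 3 executions of a count over this VP
queryTimeDFIter(vpFollows, 3)
</code>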