{{indexmenu_n>1}}

====== Loading WatDiv Dataset ======

===== Data preparation: encode raw data =====

<code scala>
import org.apache.spark.sql.DataFrame
  
// ... (elided) ...
  
numD.save(encodedFile)
</code>
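The body of the encoding script is elided above. As a minimal sketch of the idea (build one dictionary for properties, one for subjects/objects, then replace every term of the raw triples by its integer id), it could look like this; the raw-file path, column names, and exact Spark 1.x calls are assumptions chosen to match the loading script below:

<code scala>
import sqlContext.implicits._

val dir = "/user/hubert/watdiv"                 // as in the loading script below
val scale = "1G"
val rawFile = dir + "/watdiv" + scale + ".nt"   // hypothetical N-Triples dump

// raw triples: "<s> <p> <o> ."
val raw = sc.textFile(rawFile).map(_.split(" ")).map(a => (a(0), a(1), a(2)))
raw.persist()

// one integer id per distinct property, and per distinct subject/object
val dictP  = raw.map(_._2).distinct.zipWithIndex.toDF("p", "id")
val dictSO = raw.map(_._1).union(raw.map(_._3)).distinct.zipWithIndex.toDF("so", "id")
dictP.save(dir + "/dictP")
dictSO.save(dir + "/dictSO" + scale)

// replace each term by its id; the result is the encoded frame
val numD = raw.toDF("s", "p", "o").
  join(dictSO.toDF("s", "idS"), "s").
  join(dictP.toDF("p", "idP"), "p").
  join(dictSO.toDF("o", "idO"), "o").
  select("idS", "idP", "idO")
</code>

The final ''numD.save(encodedFile)'' shown above then materializes the encoded frame as parquet.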
  
===== Create VP's =====

Create one dataset per property (a "vertical partition", VP); a sketch of the elided materialization loop follows this block.
<code scala>
/*
val df = num.
// ... (elided) ...
   withColumnRenamed("​idP","​p"​).   withColumnRenamed("​idP","​p"​).
   withColumnRenamed("​idO","​o"​)   withColumnRenamed("​idO","​o"​)
- +  ​
- +
 // size of VPs // size of VPs
 val VPSize = df.groupBy("​p"​).count(). val VPSize = df.groupBy("​p"​).count().
   withColumnRenamed("​count","​card"​)   withColumnRenamed("​count","​card"​)
 VPSize.coalesce(1).save(vpDir + "/​size"​) VPSize.coalesce(1).save(vpDir + "/​size"​)
- 
  
 // VP definition and materialization // VP definition and materialization
Line 120: Line 117:
  
 </​code>​ </​code>​
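A minimal sketch of the elided materialization loop, assuming the encoded columns ''(s, p, o)'' above and the ''vpDir + "/p" + i'' layout that the loading script below expects (one parquet dataset per property id):

<code scala>
// one dataset per property id: vpDir/p0, vpDir/p1, ...
val nbP = dictP.count.toInt
(0 to nbP - 1).foreach { i =>
  df.where(s"p = $i").
    select("s", "o").
    save(vpDir + "/p" + i)
}
</code>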

===== Load VP's =====
<code scala>

// S2RDF VP
// --------

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col   // needed by repartition(..., col("s")) below

val NB_FRAGMENTS = sc.defaultParallelism

val dir = "/user/hubert/watdiv"

// 1 billion triples
val scale = "1G"

val encodedFile = dir + "/frame" + scale

// Dictionaries
// ------------
val dictSOFile = dir + "/dictSO" + scale
val dictPFile = dir + "/dictP"

val dictP = sqlContext.read.parquet(dictPFile).coalesce(1)
dictP.persist().count
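// dictP is small (one row per property), hence the single partition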

val dictSO = sqlContext.read.parquet(dictSOFile).coalesce(NB_FRAGMENTS)
//val dictSO = sqlContext.read.parquet(dictSOFile).repartition(NB_FRAGMENTS, col("so"))
dictSO.persist().count


// VP Dataset
// ----------
val vpDir = dir + "/vp" + scale


// TIMER: run q.count nbIter times, print each run, then the average
def queryTimeDFIter(q: DataFrame, nbIter: Int): Unit = {
  val l = new scala.collection.mutable.ArrayBuffer[Double](nbIter)
  for (i <- 1 to nbIter) {
    val start = java.lang.System.currentTimeMillis()
    val c = q.count
    val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
    l.append(t)
    println("")
    println(s"Count=$c, Time= $t (s)")
  }
  val avg = l.reduce(_+_) / l.size
  println(s"AVERAGE time for ${l.size} values is: $avg")
}
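// Example use (hypothetical): average the count time of VP 0 over 3 runs,
// once VP2 is defined below:
//   queryTimeDFIter(VP2(0), 3)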


// Define the VPs to be loaded
//----------------------------
val nbP = dictP.count.toInt
val v = (0 to nbP-1)


// SPECIFY THE PARTITIONING: either default or subject-based
// ------------------------
// Default partitioning (round robin)
val VP2Random = v.map(i => (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS) )).toMap

// Partitioning by SUBJECT (s)
val VP2 = v.map(i => (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS, col("s")) )).toMap
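// (hash-partitioning every VP on s co-partitions them on the join key,
//  so subject-subject joins between VPs need no extra shuffle)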


// load VP sizes
val VP2Size = sqlContext.read.parquet(vpDir + "/size").collect.map(r => (r.getLong(0).toInt, r.getLong(1))).toMap

val nameSpace = Map(
"dc" -> "http://purl.org/dc/terms/",
"foaf" -> "http://xmlns.com/foaf/",
"gr" -> "http://purl.org/goodrelations/",
"gn" -> "http://www.geonames.org/ontology#",
"mo" -> "http://purl.org/ontology/mo/",
"og" -> "http://ogp.me/ns#",
"rev" -> "http://purl.org/stuff/rev#",
"rdf" -> "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs" -> "http://www.w3.org/2000/01/rdf-schema#",
"sorg" -> "http://schema.org/",
"wsdbm" -> "http://db.uwaterloo.ca/~galuc/wsdbm/")

// map a prefixed property name to its integer id
def getIdP(prefix: String, p: String): Int = {
  val full = nameSpace(prefix) + p
  dictP.where(s"p = '<$full>'").take(1).head.getLong(1).toInt
}

// map a prefixed subject/object name to its integer id
def getIdSO(prefix: String, s: String): Long = {
  val full = nameSpace(prefix) + s
  dictSO.where(s"so = '<$full>'").take(1).head.getLong(1)
}

</code>
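A single triple pattern can then be answered straight from its VP. A minimal sketch (''wsdbm:likes'' is just an illustrative property pick):

<code scala>
// ?s wsdbm:likes ?o : look up the property id, take its VP, count matches
val pLikes = getIdP("wsdbm", "likes")
val likes = VP2(pLikes)        // DataFrame with columns (s, o)
println(VP2Size(pLikes))       // expected cardinality, read from vpDir/size
queryTimeDFIter(likes, 3)      // timed count, averaged over 3 runs
</code>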