Table of Contents

Loading WatDiv Dataset

Data preparation: encode raw data

import org.apache.spark.sql.DataFrame
 
 
// Number of partitions used when reading the input data (cluster default parallelism)
val NB_FRAGMENTS = sc.defaultParallelism
// Base directory of the WatDiv data and scale suffix of the dataset to process
val dir = "/user/hubert/watdiv"
val scale = "1G"

// Scale-dependent locations: raw N-Triples input, subject/object dictionary,
// encoded triples and vertical partitions
val inputFile = s"${dir}/WD${scale}.nt"
val dictSOFile = s"${dir}/dictSO${scale}"
val encodedFile = s"${dir}/frame${scale}"
val vpDir = s"${dir}/vp${scale}"

// Property dictionary location (not suffixed by the scale)
val dictPFile = s"${dir}/dictP"
 
 
/*
-------------------------------------------------
   Property encoding
-------------------------------------------------
*/
// Property dictionary: one row (p, id) per distinct property found in the small
// WD1M sample file. The original note states that ids fall in [0, 86[.
// NOTE(review): the dictionary is built from the 1M sample only; this presumably
// relies on every property of the larger datasets also occurring in the sample -- confirm.

val smallInput = dir + "/WD1M.nt"

// Ids are assigned with zipWithIndex() after distinct (a shuffle). Spark documents that
// such indices are not guaranteed to be stable if the RDD is re-evaluated, so the
// dictionary is written once and then read back: every later use (joins, counts) sees
// exactly the ids that were saved.
val dictP = {
  val computedDictP = sc.textFile(smallInput).coalesce(NB_FRAGMENTS).
    // drop the last two characters of each line before splitting on tabs
    map(line => line.substring(0, line.length -2).split("\t")).
    // the property is the second tab-separated field
    map(tab => tab(1)).
    distinct.zipWithIndex().
    coalesce(1).
    toDF("p","id")

  // write.parquet replaces the deprecated DataFrame.save and matches the parquet readers
  computedDictP.write.parquet(dictPFile)
  sqlContext.read.parquet(dictPFile)
}
 
// Encoding of subjects and objects
// ---------------------------------
 
// Load the textual WatDiv dataset as an RDD of (subject, property, object) strings.
// Each line loses its last two characters and is then split on tabs.
val d = sc.textFile(inputFile).coalesce(NB_FRAGMENTS).map { line =>
  val fields = line.substring(0, line.length - 2).split("\t")
  (fields(0), fields(1), fields(2))
}
 
//d.distinct.count //1 091 840 151
//d.count          //1 098 889 684
 
 
// Dictionary for subject/object ids: one row (so, id) per distinct node
// ids in [0, number of distinct nodes[
//
// Ids are assigned with zipWithIndex() after distinct (a shuffle). Spark documents that
// such indices are not guaranteed to be stable if the RDD is re-evaluated, and this
// dictionary is used several times (saved once, joined twice for encoding). It is
// therefore written once and read back, so that every use sees exactly the saved ids.
val dictSO = {
  val computedDictSO = d.flatMap{case (s,p,o) => Array(s,o)}.
    distinct.zipWithIndex().
    toDF("so","id")

  // write.parquet replaces the deprecated DataFrame.save and matches the parquet readers
  computedDictSO.write.parquet(dictSOFile)
  sqlContext.read.parquet(dictSOFile)
}
 
 
// read dictionaries
//val dictP = sqlContext.read.parquet(dictPFile)
//val dictSO = sqlContext.read.parquet(dictSOFile)
 
// ---------------------
// Encoding triples
// ---------------------
 
// Remove duplicate triples from the original dataset, then replace each term by its id.
// The three joins are inner joins: a triple whose property has no entry in dictP, or
// whose subject/object has no entry in dictSO, is dropped from the result.
val numD = d.toDF("s", "p", "o").
  distinct().
  // property string -> idP
  join(dictP, Seq("p")).
  select("s","id","o").
  withColumnRenamed("id","idP").
  // object string -> idO
  join(dictSO.withColumnRenamed("so","o"), Seq("o")).
  select("s","idP","id").
  withColumnRenamed("id","idO").
  // subject string -> idS
  join(dictSO.withColumnRenamed("so","s"), Seq("s")).
  select("id","idP","idO").
  withColumnRenamed("id","idS")

// Persist the encoded triples (idS, idP, idO) as parquet; write.parquet replaces the
// deprecated DataFrame.save and matches the sqlContext.read.parquet reader used later.
numD.write.parquet(encodedFile)

Create VPs

Create one dataset per property.

/*
val df = num.
  withColumnRenamed("idS","s").
  withColumnRenamed("idP","p").
  withColumnRenamed("idO","o")
 */
 
// Encoded triples read back from parquet, with the id columns renamed to s, p, o
val df = Seq("idS" -> "s", "idP" -> "p", "idO" -> "o").
  foldLeft(sqlContext.read.parquet(encodedFile).coalesce(NB_FRAGMENTS)) {
    case (frame, (oldName, newName)) => frame.withColumnRenamed(oldName, newName)
  }
 
// Size of the VPs: number of triples per property id, as columns (p, card)
val VPSize = df.groupBy("p").count().
  withColumnRenamed("count","card")
// Written as a single partition; write.parquet replaces the deprecated DataFrame.save
// and matches the sqlContext.read.parquet reader used when loading the sizes.
VPSize.coalesce(1).write.parquet(vpDir + "/size")
 
// VP definition and materialization
//-----------------------------------
// Property ids are enumerated as 0 .. nbP-1, nbP being the number of rows of dictP
val nbP = dictP.count.toInt
val v = (0 to nbP-1)

// One (s, o) DataFrame per property id, keyed by that id
val VP = v.map(i => (i, df.where(s"p=$i").select("s","o")) ).toMap

// Save each VP under vpDir/p<id>, coalesced to 48 partitions.
// foreach (rather than map) because the write is a pure side effect, and write.parquet
// replaces the deprecated DataFrame.save so the output matches the parquet readers.
VP.foreach{case (i, vp) => vp.coalesce(48).write.parquet(vpDir + "/p" + i)}

Load VPs

// S2RDF VP
// --------
 
import org.apache.spark.sql.DataFrame
 
// Target number of partitions (cluster default parallelism)
val NB_FRAGMENTS = sc.defaultParallelism

// Base directory of the WatDiv data
val dir = "/user/hubert/watdiv"

// 1 billion triples
val scale = "1G"

// Encoded triples location
val encodedFile = s"${dir}/frame${scale}"

// Dictionaries
// ------------
val dictSOFile = s"${dir}/dictSO${scale}"
val dictPFile = s"${dir}/dictP"
 
// Property dictionary: single partition, marked for caching, then counted so that
// the cache is actually populated
val dictP = sqlContext.read.parquet(dictPFile).coalesce(1)
dictP.persist()
dictP.count()

// Subject/object dictionary: NB_FRAGMENTS partitions, cached the same way
val dictSO = sqlContext.read.parquet(dictSOFile).coalesce(NB_FRAGMENTS)
//val dictSO = sqlContext.read.parquet(dictSOFile).repartition(NB_FRAGMENTS, col("so"))
dictSO.persist()
dictSO.count()
 
 
// VP dataset
// ----------
// Directory holding one sub-directory per vertical partition, plus the sizes
val vpDir = s"${dir}/vp${scale}"
 
 
// TIMER
/** Runs `q.count` `nbIter` times, printing the count and elapsed wall-clock time (in
  * seconds) of each run, then the average time over all runs.
  *
  * @param q      the DataFrame whose `count` action is timed
  * @param nbIter number of times the count is executed; when it is zero or negative
  *               nothing is executed and a message is printed instead of an average
  */
def queryTimeDFIter(q: DataFrame, nbIter: Int): Unit = {
  // One elapsed time (seconds) per iteration; Range.map is strict, so every count
  // is executed here, in order.
  val timings = (1 to nbIter).map { _ =>
    val start = java.lang.System.currentTimeMillis()
    val c = q.count
    val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
    println("")
    println(s"Count=$c, Time= $t (s)")
    t
  }

  if (timings.isEmpty) {
    // Reducing an empty sequence would throw; report the situation instead
    println(s"No iteration executed (nbIter=$nbIter): no average time to report")
  } else {
    val avg = timings.sum / timings.size
    println(s"AVERAGE time for ${timings.size} values is: $avg")
  }
}
 
 
// VPs to load: one per property id, ids enumerated as 0 .. nbP-1
//----------------------------------------------------------------
val nbP = dictP.count.toInt
val v = (0 to nbP-1)


// PARTITIONING CHOICE: use either VP2Random (default) or VP2 (subject based)
// ---------------------------------------------------------------------------
// Default partitioning (round robin)
val VP2Random = v.map { i =>
  val vp = sqlContext.read.parquet(s"${vpDir}/p${i}")
  i -> vp.repartition(NB_FRAGMENTS)
}.toMap

// Partitioning by SUBJECT (s)
val VP2 = v.map { i =>
  val vp = sqlContext.read.parquet(s"${vpDir}/p${i}")
  i -> vp.repartition(NB_FRAGMENTS, col("s"))
}.toMap
 
 
// VP sizes, collected on the driver as a map keyed by the first column (as Int)
// with the second column as value; both columns are read positionally as Long
val VP2Size = sqlContext.read.parquet(s"${vpDir}/size").collect.
  map { row => row.getLong(0).toInt -> row.getLong(1) }.
  toMap
 
// Prefix -> namespace IRI table used to expand prefixed names (see getIdP / getIdSO)
val nameSpace = Map(
  "dc"    -> "http://purl.org/dc/terms/",
  "foaf"  -> "http://xmlns.com/foaf/",
  "gr"    -> "http://purl.org/goodrelations/",
  "gn"    -> "http://www.geonames.org/ontology#",
  "mo"    -> "http://purl.org/ontology/mo/",
  "og"    -> "http://ogp.me/ns#",
  "rev"   -> "http://purl.org/stuff/rev#",
  "rdf"   -> "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
  "rdfs"  -> "http://www.w3.org/2000/01/rdf-schema#",
  "sorg"  -> "http://schema.org/",
  "wsdbm" -> "http://db.uwaterloo.ca/~galuc/wsdbm/"
)
 
/** Returns the numeric id of a property given as a prefixed name.
  *
  * The prefix is expanded through `nameSpace`, the local name is appended, and the
  * resulting IRI wrapped in angle brackets is looked up in the `p` column of `dictP`.
  *
  * @param prefix namespace prefix, a key of `nameSpace` (e.g. "rdf")
  * @param p      local name of the property
  * @return the id stored in the second column of the matching `dictP` row, as an Int
  * @throws NoSuchElementException if the prefix is unknown or the property is not in `dictP`
  */
def getIdP(prefix: String, p: String):Int =  {
  val ns = nameSpace.getOrElse(prefix,
    throw new NoSuchElementException(s"Unknown namespace prefix: '$prefix'"))
  val term = s"<$ns$p>"
  // Column comparison instead of a string-built SQL predicate: the term is compared
  // as a value, so quotes or other special characters cannot break the expression.
  dictP.where(dictP("p") === term).take(1).headOption.
    map(_.getLong(1).toInt).
    getOrElse(throw new NoSuchElementException(s"Property not found in dictP: $term"))
}
 
 
/** Returns the numeric id of a subject/object node given as a prefixed name.
  *
  * The prefix is expanded through `nameSpace`, the local name is appended, and the
  * resulting IRI wrapped in angle brackets is looked up in the `so` column of `dictSO`.
  *
  * @param prefix namespace prefix, a key of `nameSpace` (e.g. "wsdbm")
  * @param s      local name of the node
  * @return the id stored in the second column of the matching `dictSO` row
  * @throws NoSuchElementException if the prefix is unknown or the node is not in `dictSO`
  */
def getIdSO(prefix: String, s: String): Long =  {
  val ns = nameSpace.getOrElse(prefix,
    throw new NoSuchElementException(s"Unknown namespace prefix: '$prefix'"))
  val term = s"<$ns$s>"
  // Column comparison instead of a string-built SQL predicate: the term is compared
  // as a value, so quotes or other special characters cannot break the expression.
  dictSO.where(dictSO("so") === term).take(1).headOption.
    map(_.getLong(1)).
    getOrElse(throw new NoSuchElementException(s"Node not found in dictSO: $term"))
}