Loading WatDiv Dataset

Load and encode data

import org.apache.spark.sql.DataFrame


val NB_FRAGMENTS = sc.defaultParallelism
val dir = "/user/hubert/watdiv"
val scale = "1G"

val inputFile = dir + "/WD" + scale + ".nt"
val dictSOFile = dir + "/dictSO" + scale
val encodedFile = dir + "/frame" + scale
val vpDir = dir + "/vp" + scale

// properties
val dictPFile = dir + "/dictP"


/*
-------------------------------------------------
   Property encoding
-------------------------------------------------
*/
// property ids lie in [0, 86): WatDiv has 86 distinct properties

// The WatDiv schema is the same at every scale, so the small 1M-triples
// dataset is enough to build the property dictionary.
val smallInput = dir + "/WD1M.nt"
val dictP = sc.textFile(smallInput).coalesce(NB_FRAGMENTS).
  map(line => line.substring(0, line.length - 2).split("\t")).  // drop the trailing " ." and split on tabs
  map(tab => tab(1)).                                           // keep the property
  distinct.zipWithIndex().
  coalesce(1).
  toDF("p","id")

dictP.write.parquet(dictPFile)
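
As a sanity check, the dictionary can be read back and its cardinality compared to the 86 properties of the WatDiv schema (a minimal sketch, not part of the original loading script):

// sketch: the property dictionary should contain exactly 86 entries
val dictPCheck = sqlContext.read.parquet(dictPFile)
assert(dictPCheck.count == 86)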

// Encoding of subjects and objects
// ---------------------------------

// read the textual WatDiv dataset
val d = sc.textFile(inputFile).coalesce(NB_FRAGMENTS).
  map(line => line.substring(0, line.length - 2).split("\t")).
  map(tab => (tab(0), tab(1), tab(2)) )

//d.distinct.count // 1,091,840,151 distinct triples
//d.count          // 1,098,889,684 lines in the input file


// dictionary for subject and object ids
// ids lie in [0, number of distinct nodes)
val dictSO = d.flatMap{case (s,p,o) => Array(s,o)}.
  distinct.zipWithIndex().
  toDF("so","id")

dictSO.write.parquet(dictSOFile)
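
A single node can then be looked up in the dictionary; for instance (a sketch, the URI below is only illustrative):

// sketch: retrieve the id assigned to one node
dictSO.where("so = '<http://db.uwaterloo.ca/~galuc/wsdbm/City0>'").show()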


// read the dictionaries back (when restarting from this point)
//val dictP = sqlContext.read.parquet(dictPFile)
//val dictSO = sqlContext.read.parquet(dictSOFile)

// ---------------------
// Encoding triples
// ---------------------

// Remove duplicate triples from the original dataset, then replace each
// component by its dictionary id
val numD = d.toDF("s", "p", "o").
  distinct().
  join(dictP, Seq("p")).                               // encode the property
  select("s","id","o").
  withColumnRenamed("id","idP").
  join(dictSO.withColumnRenamed("so","o"), Seq("o")).  // encode the object
  select("s","idP","id").
  withColumnRenamed("id","idO").
  join(dictSO.withColumnRenamed("so","s"), Seq("s")).  // encode the subject
  select("id","idP","idO").
  withColumnRenamed("id","idS")

numD.write.parquet(encodedFile)
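
To check the encoding, a few triples can be decoded back to their textual form by joining with the dictionaries in the opposite direction (a minimal sketch, not part of the loading script):

// sketch: decode a handful of encoded triples through the dictionaries
val decoded = numD.limit(5).
  join(dictP.withColumnRenamed("id","idP"), Seq("idP")).
  join(dictSO.withColumnRenamed("so","s").withColumnRenamed("id","idS"), Seq("idS")).
  join(dictSO.withColumnRenamed("so","o").withColumnRenamed("id","idO"), Seq("idO")).
  select("s","p","o")
decoded.show()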

Create VPs

Create one dataset per property: each vertical partition (VP) stores the (subject, object) pairs of a single property.


// one DataFrame per property id (cardinalities are computed separately below)

/*
// alternative: reuse the in-memory numD instead of re-reading it from disk
val df = numD.
  withColumnRenamed("idS","s").
  withColumnRenamed("idP","p").
  withColumnRenamed("idO","o")
 */

val df = sqlContext.read.parquet(encodedFile).coalesce(NB_FRAGMENTS).
  withColumnRenamed("idS","s").
  withColumnRenamed("idP","p").
  withColumnRenamed("idO","o")



// size of VPs
val VPSize = df.groupBy("p").count().
  withColumnRenamed("count","card")
VPSize.coalesce(1).write.parquet(vpDir + "/size")
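
The cardinalities can later be reloaded, for instance to inspect the skew across properties (a sketch):

// sketch: list the largest VPs first
val sizes = sqlContext.read.parquet(vpDir + "/size")
sizes.orderBy(sizes("card").desc).show(10)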


// VP definition and materialization
//-----------------------------------
val nbP = dictP.count.toInt
val v = (0 until nbP)

// one (subject, object) DataFrame per property id
val VP = v.map(i => (i, df.where(s"p=$i").select("s","o")) ).toMap

// save each VP in its own directory
VP.foreach{case(i,vp) => vp.coalesce(48).write.parquet(vpDir + "/p" + i)}
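
Once the VPs are materialized, a SPARQL triple pattern becomes a scan of a single VP, and a chain of two patterns becomes a join between two VPs. For example, the pattern { ?x <p0> ?y . ?y <p1> ?z } could be evaluated as follows (a minimal sketch, with illustrative property ids 0 and 1):

// sketch: evaluate { ?x <p0> ?y . ?y <p1> ?z } by joining two VPs on ?y
val vp0 = sqlContext.read.parquet(vpDir + "/p0")   // (s, o) pairs of property 0
val vp1 = sqlContext.read.parquet(vpDir + "/p1")   // (s, o) pairs of property 1
val result = vp0.withColumnRenamed("s","x").withColumnRenamed("o","y").
  join(vp1.withColumnRenamed("s","y").withColumnRenamed("o","z"), Seq("y"))
result.count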