import org.apache.spark.sql.DataFrame
import sqlContext.implicits._   // needed for .toDF on RDDs

val NB_FRAGMENTS = sc.defaultParallelism

val dir = "/user/hubert/watdiv"
val scale = "1G"
val inputFile = dir + "/WD" + scale + ".nt"
val dictSOFile = dir + "/dictSO" + scale
val encodedFile = dir + "/frame" + scale
val vpDir = dir + "/vp" + scale

// properties
val dictPFile = dir + "/dictP"

/* -------------------------------------------------
   Property encoding
   ------------------------------------------------- */
// id P in [0, 86[
// The WatDiv property set is the same at every scale, so the 1M dataset
// is enough to enumerate the properties.
val smallInput = dir + "/WD1M.nt"
val dictP = sc.textFile(smallInput).coalesce(NB_FRAGMENTS).
  map(line => line.substring(0, line.length - 2).split("\t")).   // drop the trailing " ." of each N-Triples line
  map(tab => tab(1)).                                            // keep the property
  distinct.zipWithIndex().
  coalesce(1).
  toDF("p", "id")

dictP.write.parquet(dictPFile)

// Encoding of subjects and objects
// ---------------------------------
// read the textual WatDiv dataset
val d = sc.textFile(inputFile).coalesce(NB_FRAGMENTS).
  map(line => line.substring(0, line.length - 2).split("\t")).
  map(tab => (tab(0), tab(1), tab(2)))

//d.distinct.count  // 1 091 840 151
//d.count           // 1 098 889 684

// dictionary of S,O ids
// ids in [0, number of distinct nodes[
val dictSO = d.flatMap{ case (s, p, o) => Array(s, o) }.
  distinct.zipWithIndex().
  toDF("so", "id")

dictSO.write.parquet(dictSOFile)

// read the dictionaries back
//val dictP = sqlContext.read.parquet(dictPFile)
//val dictSO = sqlContext.read.parquet(dictSOFile)

// ---------------------
// Encoding triples
// ---------------------
// Remove duplicates from the original dataset, then encode s, p and o
val numD = d.toDF("s", "p", "o").
  distinct().
  join(dictP, Seq("p")).
  select("s", "id", "o").
  withColumnRenamed("id", "idP").
  join(dictSO.withColumnRenamed("so", "o"), Seq("o")).
  select("s", "idP", "id").
  withColumnRenamed("id", "idO").
  join(dictSO.withColumnRenamed("so", "s"), Seq("s")).
  select("id", "idP", "idO").
  withColumnRenamed("id", "idS")

numD.write.parquet(encodedFile)
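The two dictionaries make the encoding reversible. The following sanity check is only an illustration (it is not part of the original pipeline) and assumes numD, dictP and dictSO are still in scope: it joins a few encoded triples back to their textual form.

// Illustrative sanity check: decode encoded triples back to text
// by joining the id columns against the two dictionaries.
val decoded = numD.
  join(dictP.withColumnRenamed("id", "idP"), Seq("idP")).
  join(dictSO.withColumnRenamed("id", "idS").withColumnRenamed("so", "s"), Seq("idS")).
  join(dictSO.withColumnRenamed("id", "idO").withColumnRenamed("so", "o"), Seq("idO")).
  select("s", "p", "o")

decoded.show(5)   // should print original WatDiv terms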
Create one dataset per property.
/*
val df = numD.
  withColumnRenamed("idS", "s").
  withColumnRenamed("idP", "p").
  withColumnRenamed("idO", "o")
*/
val df = sqlContext.read.parquet(encodedFile).coalesce(NB_FRAGMENTS).
  withColumnRenamed("idS", "s").
  withColumnRenamed("idP", "p").
  withColumnRenamed("idO", "o")

// size of the VPs (cardinality of each property)
val VPSize = df.groupBy("p").count().
  withColumnRenamed("count", "card")

VPSize.coalesce(1).write.parquet(vpDir + "/size")

// VP definition and materialization
//-----------------------------------
val nbP = dictP.count.toInt
val v = (0 to nbP - 1)

val VP = v.map(i => (i, df.where(s"p=$i").select("s", "o"))).toMap

// save the VPs, one directory per property
VP.foreach{ case (i, vp) => vp.coalesce(48).write.parquet(vpDir + "/p" + i) }
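As a quick check of the materialization (not in the original script), one VP can be read back and its row count compared with the entry recorded in the size table; property id 0 is chosen arbitrarily here.

// Illustrative check: a materialized VP should have as many rows as
// the cardinality stored in the size table (property id 0 is arbitrary).
val p0 = sqlContext.read.parquet(vpDir + "/p0")
val expected = VPSize.where("p = 0").select("card").take(1).head.getLong(0)
println(s"VP p0: ${p0.count} rows, size table says $expected")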
// S2RDF VP
// --------
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col   // needed for repartition(..., col("s"))

val NB_FRAGMENTS = sc.defaultParallelism

val dir = "/user/hubert/watdiv"
// 1 billion triples
val scale = "1G"
val encodedFile = dir + "/frame" + scale

// Dictionaries
// ------------
val dictSOFile = dir + "/dictSO" + scale
val dictPFile = dir + "/dictP"

val dictP = sqlContext.read.parquet(dictPFile).coalesce(1)
dictP.persist().count

val dictSO = sqlContext.read.parquet(dictSOFile).coalesce(NB_FRAGMENTS)
//val dictSO = sqlContext.read.parquet(dictSOFile).repartition(NB_FRAGMENTS, col("so"))
dictSO.persist().count

// VP Dataset
// ----------
val vpDir = dir + "/vp" + scale

// TIMER: run q.count nbIter times and report each run plus the average
def queryTimeDFIter(q: DataFrame, nbIter: Int): Unit = {
  val l = new scala.collection.mutable.ArrayBuffer[Double](nbIter)
  for (i <- 1 to nbIter) {
    val start = java.lang.System.currentTimeMillis()
    val c = q.count
    val t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
    l.append(t)
    println("")
    println(s"Count=$c, Time= $t (s)")
  }
  val avg = l.sum / l.size
  println(s"AVERAGE time for ${l.size} values is: $avg")
}

// Define the VPs to be loaded
// ---------------------------
val nbP = dictP.count.toInt
val v = (0 to nbP - 1)

// SPECIFY THE PARTITIONING: either default or subject based
// ----------------------------------------------------------
// Default partitioning (round robin)
val VP2Random = v.map(i =>
  (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS))
).toMap

// Partitioning by SUBJECT (s)
val VP2 = v.map(i =>
  (i, sqlContext.read.parquet(vpDir + "/p" + i).repartition(NB_FRAGMENTS, col("s")))
).toMap

// load the VP sizes as a Map(property id -> cardinality)
val VP2Size = sqlContext.read.parquet(vpDir + "/size").
  collect.map(r => (r.getLong(0).toInt, r.getLong(1))).toMap

val nameSpace = Map(
  "dc"    -> "http://purl.org/dc/terms/",
  "foaf"  -> "http://xmlns.com/foaf/",
  "gr"    -> "http://purl.org/goodrelations/",
  "gn"    -> "http://www.geonames.org/ontology#",
  "mo"    -> "http://purl.org/ontology/mo/",
  "og"    -> "http://ogp.me/ns#",
  "rev"   -> "http://purl.org/stuff/rev#",
  "rdf"   -> "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
  "rdfs"  -> "http://www.w3.org/2000/01/rdf-schema#",
  "sorg"  -> "http://schema.org/",
  "wsdbm" -> "http://db.uwaterloo.ca/~galuc/wsdbm/")

// id of a property given its prefixed name, e.g. getIdP("rdf", "type")
def getIdP(prefix: String, p: String): Int = {
  val full = nameSpace(prefix) + p
  dictP.where(s"p = '<$full>'").take(1).head.getLong(1).toInt
}

// id of a subject/object given its prefixed name
def getIdSO(prefix: String, s: String): Long = {
  val full = nameSpace(prefix) + s
  dictSO.where(s"so = '<$full>'").take(1).head.getLong(1)
}
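With the dictionaries and a VP map loaded, a single triple pattern is answered by the VP of its property, and multi-pattern queries become joins between VPs. The sketch below is a usage illustration only (it is not part of the original file); it assumes the subject-partitioned map VP2 is in use and that rdf:type and wsdbm:follows are among the encoded properties, as they are in WatDiv.

// Illustrative usage (assumed properties rdf:type and wsdbm:follows):
// single triple pattern (?s rdf:type ?o) evaluated on its VP
val typeVP = VP2(getIdP("rdf", "type"))        // columns (s, o)
queryTimeDFIter(typeVP, 3)

// simple star pattern: join two VPs on the shared subject column
val followsVP = VP2(getIdP("wsdbm", "follows")).withColumnRenamed("o", "followed")
queryTimeDFIter(typeVP.join(followsVP, Seq("s")), 3)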