This wiki page provides information about the experiments on RDF distribution approaches using Spark. It covers i) the query workload, ii) the source code for both data preparation and query evaluation, and iii) the description of the two datasets used in the experiments. For the sake of reproducibility, each piece of source code is provided as a script that can be executed directly in the Spark shell.
We picked three queries from the LUBM benchmark, namely #2, #9 and #12, which are referred to as Query 2, Query 3 and Query 4, respectively. We created an additional one, referred to as Query 1. We also created two queries for the Wikidata dataset, referred to as Query 5 and Query 6.
Query 1 (additional query on LUBM):
SELECT ?x ?y ?z WHERE { ?x lubm:advisor ?y . ?y lubm:worksFor ?z . ?z lubm:subOrganizationOf ?t }

Query 2 (LUBM #2):
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#>
SELECT ?X ?Y ?Z WHERE {
  ?X rdf:type ub:GraduateStudent .
  ?Y rdf:type ub:University .
  ?Z rdf:type ub:Department .
  ?X ub:memberOf ?Z .
  ?Z ub:subOrganizationOf ?Y .
  ?X ub:undergraduateDegreeFrom ?Y
}

Query 3 (LUBM #9):
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#>
SELECT ?X ?Y ?Z WHERE {
  ?X rdf:type ub:Student .
  ?Y rdf:type ub:Faculty .
  ?Z rdf:type ub:Course .
  ?X ub:advisor ?Y .
  ?Y ub:teacherOf ?Z .
  ?X ub:takesCourse ?Z
}

Query 4 (LUBM #12):
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#>
SELECT ?X ?Y WHERE {
  ?X rdf:type ub:Chair .
  ?Y rdf:type ub:Department .
  ?X ub:worksFor ?Y .
  ?Y ub:subOrganizationOf <http://www.University0.edu>
}

Query 5 (Wikidata):
SELECT ?x ?y ?z WHERE { ?x entity:P131s ?y . ?y entity:P961v ?z . ?z entity:P704s ?w }

Query 6 (Wikidata):
SELECT ?x ?y ?z WHERE { ?x entity:P39v ?y . ?x entity:P580q ?z . ?x rdf:type ?w }
The following code corresponds to the triple hashing and subject hashing approaches for the LUBM datasets. It consists of a data preparation part followed by a query evaluation part.
import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer

val folder = "lubm"
val dataset = "univ"
val scale = "10k"
val coreNumber = 20
val machine = sc.defaultParallelism / coreNumber
val part = sc.defaultParallelism
val folderName = folder + scale
val fileName = dataset + scale

val t1 = java.lang.System.currentTimeMillis();

/**
 * Set the path below to the data encoded as triples (see Datasets excerpts)
 */
// loading and transforming the dataset
val triples0 = sc.textFile(s"/user/olivier/${folderName}/${fileName}_encoded_unique").
  map(x => x.split(" ")).
  map(t => ((t(0).toLong+t(1).toLong+t(2).toLong).hashCode%machine, (t(0).toLong, t(1).toLong, t(2).toLong))).
  persist
triples0.count

val t2 = java.lang.System.currentTimeMillis();
println("Loading time "+ (t2 - t1) +" msec for "+part+" partitions");

// Partitioning the dataset
/**
 * Uncomment one of the following lines depending on whether hashing is applied on the entire triple or only the subject
 */
// val triples = triples0.partitionBy(new HashPartitioner(part))
// val triples = triples0.map{case(f,(s,p,o))=>((s+p+o),(s,p,o))}.partitionBy(new HashPartitioner(part))
//triples.persist
triples.count

val t3 = java.lang.System.currentTimeMillis();
println("Partitioning time "+ (t3 - t2) +" msec for "+part+" partitions");

// mean and standard deviation of the number of triples per key
val oDist = triples.map{ case(f,(s,p,o)) => (f,1)}.reduceByKey( (a,b) => a+b)
val oMean = oDist.map{case(f,c)=>c}.sum / machine
val odevs = oDist.map{case(f,c)=>c}.map(score => (score - oMean) * (score - oMean))
val ostddev = Math.sqrt(odevs.sum / machine)

triples.partitions.length

// declare constants for BGP translation
// These constants have been generated with our encoding method
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864

// triples is keyed: each element is (key, (s,p,o)), hence the patterns below

// -----------------------------------------------------------
// Query 1 : (not part of the benchmark)
// Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
var t1 = java.lang.System.currentTimeMillis();
var pataws = triples.filter({case (f,(s,p,o)) => p==advisor}).map({case (f,(s,p,o)) => (o,s)}).
  join(triples.filter({case (f,(s,p,o)) => p==worksFor}).map({case (f,(s,p,o)) => (s,o)}), part).
  map({case (y,(x,z)) => (z,(x,y))}).
  join(triples.filter({case (f,(s,p,o)) => p==suborg}).map({case (f,(s,p,o)) => (s,o)}), part).
  map({case (z,((x,y),t)) => (x,y,z,t)}).distinct
pataws.count
var t2 = java.lang.System.currentTimeMillis();
println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");

// -----------------------------------------------------------
// LUBM 2 : MSU
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
var t1 = java.lang.System.currentTimeMillis();
var pmemof = triples.filter({case (f,(s,p,o)) => p==memof}).cache()
// the last join is keyed on the pair (x,z); a tuple key avoids the
// collisions that a concatenated string key such as x+""+z can produce
var patmsu = pmemof.map({case (f,(s,p,o)) => (o,s)}).
  join(triples.filter({case (f,(s,p,o)) => p==suborg}).map({case (f,(s,p,o)) => (s,o)}), part).
  map({case (y,(x,z)) => ((x,z),(x,y,z))}).
  join(triples.filter({case (f,(s,p,o)) => p==undeg}).map({case (f,(x,p,z)) => ((x,z),null)}), part)
patmsu.count
var patmsu2 = patmsu.map({case (k,(xyz,n)) => xyz}).distinct
var t2 = java.lang.System.currentTimeMillis();
println("Processing Q2 "+ (t2 - t1) +" msec for "+part+" partitions");

// -----------------------------------------------------------
// LUBM 9 : ATT
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
var t1 = java.lang.System.currentTimeMillis();
var patatt = triples.filter({case (f,(s,p,o)) => p==advisor}).map({case (f,(s,p,o)) => (o,s)}).
  join(triples.filter({case (f,(s,p,o)) => p==teaof}).map({case (f,(s,p,o)) => (s,o)}), part).
  map({case (y,(x,z)) => ((x,z),(x,y,z))}).
  join(triples.filter({case (f,(s,p,o)) => p==takco}).map({case (f,(s,p,o)) => ((s,o),null)}), part)
patatt.distinct.count
var t2 = java.lang.System.currentTimeMillis();
println("Processing Q3 (LUBM #9) "+ (t2 - t1) +" msec for "+part+" partitions");

// -----------------------------------------------------------
// LUBM 12 : WS
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
var t1 = java.lang.System.currentTimeMillis();
var patws = triples.filter({case (f,(s,p,o)) => p==worksFor}).map({case (f,(s,p,o)) => (o,(s,p,o))}).
  join(triples.filter({case (f,(s,p,o)) => p==suborg}).map({case (f,(s,p,o)) => (s,(s,p,o))}), part).
  map({case (k,((s1,p1,o1),(s2,p2,o2))) => (s1,o1,o2)})
var ans_patws = patws.distinct.count
var t2 = java.lang.System.currentTimeMillis();
println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions");

// -----------------------------------------------------------
// Wikidata queries. P131s, P961v, P704s, P39v, P580q and rdftype must be bound
// to the encoded ids of the corresponding properties before running them.
// -----------------------------------------------------------
// highly selective on wikidata :: 8 results
// QR5: ?x <http://www.wikidata.org/entity/P131s> ?y. ?y <http://www.wikidata.org/entity/P961v> ?z. ?z <http://www.wikidata.org/entity/P704s> ?w.
// -----------------------------------------------------------
val qr5 = triples.filter({case (f,(s,p,o)) => p==P131s}).map({case (f,(s,p,o)) => (o,(s,p))}).
  join(triples.filter({case (f,(s,p,o)) => p==P961v}).map({case (f,(s,p,o)) => (s,(p,o))})).
  map({case (k,((s1,p1),(p2,o2))) => (o2,s1)}).
  join(triples.filter({case (f,(s,p,o)) => p==P704s}).map({case (f,(s,p,o)) => (s,(p,o))}))
qr5.persist
qr5.collect

// mid selectivity star-shaped query : 10418 results
// QR6: ?x <http://www.wikidata.org/entity/P39v> ?y. ?x <http://www.wikidata.org/entity/P580q> ?z. ?x <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> ?w
val qr6 = triples.filter({case (f,(s,p,o)) => p==P39v}).map({case (f,(s,p,o)) => (s,null)}).
  join(triples.filter({case (f,(s,p,o)) => p==P580q}).map({case (f,(s,p,o)) => (s,null)})).
  join(triples.filter({case (f,(s,p,o)) => p==rdftype}).map({case (f,(s,p,o)) => (s,o)}))
qr6.persist
qr6.collect
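The difference between the two hashing strategies, and the balance statistic computed by the script, can be illustrated without a cluster. The following is a minimal sketch on plain Scala collections, not the code used in the experiments: `HashPartitionSketch`, `nonNegMod`, `byTriple`, `bySubject` and `sizeStdDev` are hypothetical names, and the non-negative modulo is an assumption added so that negative hash codes still map to a valid partition.

```scala
object HashPartitionSketch {
  type Triple = (Long, Long, Long) // (subject, property, object), dictionary-encoded

  // Assumption: map any hash code to a partition index in [0, n)
  def nonNegMod(h: Int, n: Int): Int = ((h % n) + n) % n

  // Triple hashing: the partition depends on subject, property and object
  def byTriple(t: Triple, n: Int): Int = nonNegMod((t._1 + t._2 + t._3).hashCode, n)

  // Subject hashing: all triples sharing a subject land in the same partition
  def bySubject(t: Triple, n: Int): Int = nonNegMod(t._1.hashCode, n)

  // Population standard deviation of the partition sizes,
  // given the partition index assigned to each triple
  def sizeStdDev(assigned: Seq[Int], n: Int): Double = {
    val counts = (0 until n).map(p => assigned.count(_ == p).toDouble)
    val mean = counts.sum / n
    math.sqrt(counts.map(c => (c - mean) * (c - mean)).sum / n)
  }
}
```

For example, `HashPartitionSketch.sizeStdDev(triples.map(t => HashPartitionSketch.bySubject(t, 5)), 5)` gives the size deviation of a subject-hashed distribution over 5 partitions for a local `Seq` of triples.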
// Query evaluation on the quads extended with one more hop (the ".2hop" files)
import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer

val folder = "lubm" //"watdiv"
val dataset = "univ" //"watdiv"
val scale = "1k"
val part = 20 //10, 20
val folderName = folder + scale
val fileName = dataset+scale+"_encoded_unique_quads.part."+part+".2hop"

val t1 = java.lang.System.currentTimeMillis();
// each input line is a quad "(s,p,o,i)"; it is re-keyed as (i, (s,p,o))
val quads_I_SPO = sc.textFile(s"/user/olivier/${folderName}/${fileName}").coalesce(part).
  map(x => x.split(",")).
  map(t => (t(3).replace(")","").toLong, (t(0).replace("(","").toLong, t(1).toLong, t(2).toLong)))
val quadsDist = quads_I_SPO.partitionBy(new HashPartitioner(part)).persist
val t2 = java.lang.System.currentTimeMillis();
print("Loading time of quads : "+(t2-t1)/1000 +" sec")

val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864

// -----------------------------------------------------------
// Query 1 : (not part of the benchmark)
// Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
var t1 = java.lang.System.currentTimeMillis();
var pataws = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}).
  join(quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => (s,o)}), part).
  map({case (y,(x,z)) => (z,(x,y))}).
  join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}), part).
  map({case (z,((x,y),t)) => (x,y,z,t)})
pataws.count
var pataws2 = pataws.distinct
var t2 = java.lang.System.currentTimeMillis();
println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");

// -----------------------------------------------------------
// LUBM 2 : MSU
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
var t1 = java.lang.System.currentTimeMillis();
//var pmemof = quadsDist.filter({case(i,(s,p,o)) => p==memof}).cache()
// the last join is keyed on the pair (x,z); a tuple key avoids the
// collisions that a concatenated string key such as x+""+z can produce
var patmsu = quadsDist.filter({case(i,(s,p,o)) => p==memof}).map({case(i,(s,p,o)) => (o,s)}).
  join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}), part).
  map({case (y,(x,z)) => ((x,z),(x,y,z))}).
  join(quadsDist.filter({case(i,(s,p,o)) => p==undeg}).map({case(i,(x,p,z)) => ((x,z),null)}))
patmsu.count
var patmsu2 = patmsu.map({case (k,(xyz,n)) => xyz}).distinct
var t2 = java.lang.System.currentTimeMillis();
println("Processing Q2 "+ (t2 - t1) +" msec for "+part+" partitions");

// -----------------------------------------------------------
// LUBM 9 : ATT
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
var t1 = java.lang.System.currentTimeMillis();
var patatt = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}).
  join(quadsDist.filter({case(i,(s,p,o)) => p==teaof}).map({case(i,(s,p,o)) => (s,o)}), part).
  map({case (y,(x,z)) => ((x,z),(x,y,z))}).
  join(quadsDist.filter({case(i,(s,p,o)) => p==takco}).map({case(i,(s,p,o)) => ((s,o),null)}), part)
patatt.distinct.count
var t2 = java.lang.System.currentTimeMillis();
println("Processing Q3 (LUBM #9) "+ (t2 - t1) +" msec for "+part+" partitions");

// -----------------------------------------------------------
// LUBM 12 : WS
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
val t1 = java.lang.System.currentTimeMillis();
// the join key includes the partition id i, so only triples of the same partition are joined
val patws = quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => ((i,o),s)}).
  join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => ((i,s),o)}), part).
  map({case ((i,k),(s,o)) => (s,o)})
val ans_patws = patws.distinct.count
val t2 = java.lang.System.currentTimeMillis();
println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions");
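The LUBM #12 evaluation above joins on the pair (partition id, join variable), so a match is only produced when both triples are stored in the same partition. The following minimal sketch reproduces that idea on plain Scala collections; it is an illustration under assumptions, not the experimental code, and `LocalJoinSketch` and `localPairs` are hypothetical names.

```scala
object LocalJoinSketch {
  // (partition id, (subject, property, object)), as in quadsDist
  type Quad = (Long, (Long, Long, Long))

  // Evaluates (y p1 z) (z p2 t) inside each partition and returns the (y, t) pairs
  def localPairs(quads: Seq[Quad], p1: Long, p2: Long): Set[(Long, Long)] = {
    // key the left pattern on (partition, object) and the right one on (partition, subject)
    val left = quads.collect { case (i, (s, p, o)) if p == p1 => ((i, o), s) }
    val right = quads.collect { case (i, (s, p, o)) if p == p2 => ((i, s), o) }
    val rightByKey = right.groupBy(_._1)
    (for {
      (k, s) <- left
      (_, o) <- rightByKey.getOrElse(k, Seq.empty)
    } yield (s, o)).toSet
  }
}
```

With the two triples of a match stored in different partitions, `localPairs` returns nothing; once the second triple is also copied into the first one's partition, the match is found locally, which is the purpose of the replication step.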
// Spark implementation of WARP replication
// usage: run this code in the spark-shell
import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner
import org.apache.spark.Partitioner
import java.io.Serializable
val folder= "lubm" //"watdiv"
val dataset= "univ" //"watdiv"//
val scale="1k"
/* We have 15 cores per machine.
Each core is accessing a separate part in parallel.
The default parallelism = #machines * 15 cores
<=> #machines = defaultParallelism / 15
*/
val machine = sc.defaultParallelism /15
val part = sc.defaultParallelism
val folderName = folder + scale
val fileName = dataset + scale
/**
* set inputData with the path to the data encoded as quadruples (see Datasets excerpts)
*/
val inputData = s"/user/olivier/${folderName}/${fileName}_encoded_unique_quads.part.${machine}"
// Initial state, delete the storage
sc.getPersistentRDDs.values.foreach(x => x.unpersist())
sc.getPersistentRDDs.values.size
var t1 = java.lang.System.currentTimeMillis();
def affiche(s: String): Unit={
println("#### ------------------------------- ####")
println(s)
println("#### ------------------------------- ####")
}
/*
-----------------------
STEP 1: read triples
-----------------------
*/
/**
Function lireTriples reads the input data
returns a quad (subject, property, object, partID)
with partID = hash(subject)
**/
def lireTriples(input: String): RDD[(Long, Long, Long, Int)] = {
return sc.textFile(input).
map(line => line.substring(1, line.length -1).split(",")).
map(tab => (tab(0).toLong, tab(1).toLong, tab(2).toLong, tab(0).toLong.hashCode % machine))
}
val triples = lireTriples(inputData)
triples.setName("triples")
triples.persist
// total number of tuples
//triples.count
// stats: number of triples per partition
//triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
/*
---------------------------------------------------------------------------
STEP 2: COMPUTE REPLICATION TRIPLES
Computes the triple to replicate in every partition, for each query
---------------------------------------------------------------------------
*/
/*
Function filterProp: selects the triples of a given property
*/
def filterProp(input: RDD[(Long, Long, Long, Int)], prop: Long): RDD[(Long, Long, Int)] = {
// project on S and O since P is fixed
return input.filter(x => x._2 == prop).map{ case (s,p,o,f) => (s,o,f)}
}
def getOutsideTriple(seedF: Int, t: ListBuffer[(Long, Long, Int)], properties: Array[Long]): Seq[(Long, Long, Long, Int)] = {
var a = t.toArray
var res: ListBuffer[(Long, Long, Long, Int)] = ListBuffer()
for(i <-0 to (a.length - 1)) {
if( a(i)._3 != seedF) {
res.append( (a(i)._1, properties(i), a(i)._2, seedF))
}
}
return res
}
def getReplicaForQuery(query: RDD[ListBuffer[(Long, Long, Int)]], properties: Array[Long], nbCandidateSeed: Int): RDD[(Long, Long, Long, Int)] = {
var replica = sc.parallelize(List((0L,0L,0L,0)))
var min_nb = -1L
for (i <- 0 to (nbCandidateSeed-1)) {
var t1 = query.map(x => (x(i)._3, x))
//t1.take(5).foreach(println)
// list the triples to replicate = those that are not in the seed partition
var t2 = t1.flatMap{ case(seedF, tripleList) => getOutsideTriple(seedF, tripleList, properties)}.distinct
// count the triples to replicate
var nb = t2.count
if(min_nb == -1 || nb < min_nb){
min_nb=nb
replica=t2
println("current:")
println(s"seed $i, replica count: $nb ")
}
else {
println(s"-----ignore seed $i, replica count: $nb ")
}
}
return replica
}
// -------------------------------------------------------
// Compute replication for the following QUERIES
// -------------------------------------------------------
// based on dictionary encoding
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864
// -----------------------------------------------------------
// Query 1 : (not part of the benchmark)
// Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
val query1 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y, (x, f1))}.
join(filterProp(triples, worksFor).map{ case (y, z, f2) => ( y, (z, f2) )}).
map{ case ( y, ((x,f1), (z, f2))) => (z, (x, f1, y, f2)) }.
join(filterProp(triples, suborg).map{ case ( z, t, f3) => (z, (t,f3))}).
map{ case ( z, ( (x,f1,y,f2), (t,f3) )) => ListBuffer( (x, y, f1), (y, z, f2) , (z, t, f3))}
query1.setName("q1")
//query1.persist()
//query1.count
// result: 4108791
var t1 = java.lang.System.currentTimeMillis();
val replica1 = getReplicaForQuery(query1, Array(advisor, worksFor, suborg), 3)
var t2= java.lang.System.currentTimeMillis();
println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");
// stats: number of triples to replicate in each partition
//replica1.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
// (0,15551) (1,15776) (2,17639) (3,201275) (4,46068)
// -----------------------------------------------------------
// LUBM 2 : MSU
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
val query2 = filterProp(triples, memof).map{ case (x, y, f1) => ( y, (x, f1))}.
join(filterProp(triples, suborg).map{ case (y, z, f2) => ( y, (z, f2) )}).
map{ case ( y, ((x,f1), (z,f2) )) => ((x, z), (y, f1, f2)) }.
join(filterProp(triples, undeg).map{ case (x, z, f3) => ( (x, z), f3)}).
map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) }
query2.setName("q2")
//query2.persist()
//query2.count
val replica2 = getReplicaForQuery(query2, Array(memof, suborg, undeg), 2)
// stats: number of triples to replicate in each partition
//replica2.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
//(0,1) (1,3) (3,4) (4,896)
// -----------------------------------------------------------
// LUBM 9 : ATT
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
val query9 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y,(x, f1))}.
join(filterProp(triples, teaof).map{ case (y, z, f2) => ( y, (z, f2) )}).
map{ case ( y, ((x, f1), (z, f2) )) => ((x, z), (y, f1, f2)) }.
join(filterProp(triples, takco).map{ case (x, z, f3) => ( (x, z), f3)}).
map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) }
query9.setName("q9")
//query9.persist()
query9.count
// 272982
val replica9 = getReplicaForQuery(query9, Array(advisor, teaof, takco) ,2)
// stats: number of triples to replicate in each partition
//replica9.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
//(0,3131) (1,3213) (2,3504) (3,44706) (4,2014)
// -----------------------------------------------------------
// LUBM 12 : WS
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
val query12 = filterProp(triples, worksFor).map{ case (y, z, f1) => ( z, (y, f1))}.
join(filterProp(triples, suborg).map{ case (z, t, f2) => ( z, (t, f2))}).
map{ case ( z, ((y, f1), (t, f2) )) => ListBuffer((y, z, f1), (z, t, f2)) }
query12.setName("q12")
//query12.persist()
query12.count
//720628
val replica12 = getReplicaForQuery(query12, Array(worksFor, suborg), 2)
// stats: number of triples to replicate in each partition
//replica12.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
/*
---------------------------------------------------------------------------
STEP 3: UNION OF THE REPLICAS COMPUTED FOR EACH QUERY
---------------------------------------------------------------------------
*/
val allreplica = replica1.union(replica2).union(replica9).union(replica12).distinct
val nbAjout = allreplica.count
// 357 161 for 5 partitions
// 734 295 for 10 partitions
// 779 983 for 20 partitions
// replication rate
affiche("Number of added triples: " + nbAjout.toDouble)
affiche("Replication rate: " + (nbAjout.toDouble / triples.count))
var t2= java.lang.System.currentTimeMillis();
affiche("Preparation time "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions");
val oDist = triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b)
val oMean = oDist.map{case(f,c)=>c}.sum / machine
val odevs = oDist.map{case(f,c)=>c}.map(score => (score - oMean) * (score - oMean))
val ostddev = Math.sqrt(odevs.sum / machine)
val nDist = oDist.join(allreplica.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b)).map{case(f,(oc,on))=>(oc+on)}
val nMean = nDist.sum / machine
val ndevs = nDist.map(score => (score - nMean) * (score - nMean))
val nstddev = Math.sqrt(ndevs.sum / machine)
/*
---------------------------------------------------------------------------
STEP 4: LOCAL QUERY PROCESSING
On the WARP replicated data
---------------------------------------------------------------------------
*/
// Extends each partition with the WARP replicated triples.
// Each machine stores one partition
val triplesWARP = triples.union(allreplica).
map{ case (s,p,o,f) => (f, (s,p,o))}.
partitionBy(new HashPartitioner(machine)).map{ case (f, (s,p,o)) => (s,p,o,f)}
triplesWARP.setName("triples WARP")
triplesWARP.persist()
triplesWARP.count
var t2= java.lang.System.currentTimeMillis();
affiche("Preparation time "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions");
// -------------------
// Q1 LOCAL
// -------------------
t1= java.lang.System.currentTimeMillis();
// Local Query 1 with projection on the variables (x,y,z,t)
val localQuery1 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}.
join(filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (y, f), z )}).
map{ case ( (y, f), (x, z)) => ((z, f), (x, y)) }.
join(filterProp(triplesWARP, suborg).map{ case ( z, t, f) => ((z, f), t)}).
map{ case ( (z, f), ( (x,y), t)) => (x, y, z, t)}
localQuery1.count
//4108791 OK, same as without replication
t2= java.lang.System.currentTimeMillis();
affiche("Duration of query Q1: "+ (t2 - t1) +" msec for " + part + " partitions");
// -----------------------------------------------------------
// Q2 LOCAL:
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();
val localQuery2 = filterProp(triplesWARP, memof).map{ case (x, y, f) => ( (y, f), x)}.
join(filterProp(triplesWARP, suborg).map{ case (y, z, f) => ( (y, f), z )}).
map{ case ( (y, f), (x, z)) => ((x, z, f), y) }.
join(filterProp(triplesWARP, undeg).map{ case (x, z, f) => ( (x, z, f), 1)}).
map{ case ((x, z, f), (y, 1)) => (x, y, z) }
localQuery2.count
//2528
t2= java.lang.System.currentTimeMillis();
affiche("Duration of query Q2: "+ (t2 - t1) +" msec for " + part + " partitions");
// -----------------------------------------------------------
// Q9 LOCAL
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();
val localQuery9 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}.
join(filterProp(triplesWARP, teaof).map{ case (y, z, f) => ( (y, f), z )}).
map{ case ( (y, f), (x, z)) => ((x, z, f), y) }.
join(filterProp(triplesWARP, takco).map{ case (x, z, f) => ( (x, z, f), 1)}).
map{ case ((x, z, f), (y, 1)) => (x, y, z) }
localQuery9.count
//272982
t2= java.lang.System.currentTimeMillis();
affiche("Duration of query Q9: "+ (t2 - t1) +" msec for " + part + " partitions");
// -----------------------------------------------------------
// Q12 LOCAL
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();
val localQuery12 = filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (z, f), y)}.
join(filterProp(triplesWARP, suborg).map{ case (z, t, f) => ( (z, f), t)}).
map{ case ( (z, f), (y, t)) => (y, z, t) }
localQuery12.count
//938356
t2= java.lang.System.currentTimeMillis();
affiche("Duration of query Q12: "+ (t2 - t1) +" msec for " + part + " partitions");
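The seed selection performed by `getReplicaForQuery` and `getOutsideTriple` can be summarised as: for each candidate seed position in the query pattern, collect the triples of every match that are not stored in the seed triple's partition, and keep the seed that needs the fewest distinct copies. Below is a minimal sketch of that logic on plain Scala collections rather than RDDs; `WarpSeedSketch`, `outside` and `bestSeed` are hypothetical names introduced for illustration.

```scala
object WarpSeedSketch {
  // One matched triple pattern: (subject, object, partition holding the triple)
  type Match = (Long, Long, Int)
  // A triple to copy: (subject, property, object, target partition)
  type Replica = (Long, Long, Long, Int)

  // Triples of one query match that are stored outside the seed partition
  def outside(seedPart: Int, matches: Seq[Match], props: Seq[Long]): Seq[Replica] =
    matches.zip(props).collect {
      case ((s, o, f), p) if f != seedPart => (s, p, o, seedPart)
    }

  // Tries each candidate seed position and keeps the one needing the fewest
  // distinct replicas (the first one wins in case of a tie)
  def bestSeed(results: Seq[Seq[Match]], props: Seq[Long], nbCandidateSeed: Int): (Int, Set[Replica]) =
    (0 until nbCandidateSeed)
      .map(i => (i, results.flatMap(r => outside(r(i)._3, r, props)).toSet))
      .minBy(_._2.size)
}
```

Each element of `results` plays the role of one `ListBuffer` produced by `query1`, `query2`, `query9` or `query12`, and `props` lists the properties of the pattern in the same order.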
// Data preparation for the n-hop approach: extends each partition with one more hop
val folder = "lubm"
val dataset = "univ"
val scale = "10k"
val folderName = folder + scale
val part = Array(5,10,20)

for (p <- part) {
  val fileName = dataset+scale+"_encoded_unique_quads.part."+p
  // val fileName = "watdiv2k_encoded_unique_quads.partNew.5"
  val t1 = java.lang.System.currentTimeMillis();
  // each input line is a quad "(s,p,o,i)" where i is the partition id
  val quads = sc.textFile(s"/user/olivier/${folderName}/${fileName}").
    map(x => x.split(",")).
    map(t => (t(0).replace("(","").toLong, t(1).toLong, t(2).toLong, t(3).replace(")","").toLong))
  // for each triple (s,p,o) in partition i1, copy into i1 the triples whose subject is o
  // and which are stored in another partition i2
  var addOneHop = quads.map({case(s,p,o,i)=>(o,i)}).
    join(quads.map({case(s,p,o,i)=>(s,(p,o,i))})).
    filter({case(termS,(i1,(p,o,i2)))=>i1!=i2}).distinct.
    map({case(termS,(i1,(p,o,i2)))=>(termS,p,o,i1)})
  val newQuads = quads.union(addOneHop).distinct
  val newQuadsSize = newQuads.count
  val t2 = java.lang.System.currentTimeMillis();
  val hopSize = addOneHop.count
  println(s"Time to compute one more hop on $folderName for $p partitions is ${t2-t1}")
  println(s" new size = $newQuadsSize , added $hopSize")
  newQuads.saveAsTextFile(s"/user/olivier/${folderName}/${fileName}.2hop")
}
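The one-hop extension computed by `addOneHop` can be checked on a tiny example. The following minimal sketch applies the same rule to a local `Seq` of quads instead of an RDD; `OneHopSketch` is a hypothetical name and the sketch is an illustration, not the script used for the experiments.

```scala
object OneHopSketch {
  type Quad = (Long, Long, Long, Long) // (subject, property, object, partition id)

  // For every quad (s1, p1, o, i1), copy into partition i1 each quad whose
  // subject is o and which is stored in a different partition i2
  def addOneHop(quads: Seq[Quad]): Set[Quad] = {
    val bySubject = quads.groupBy(_._1)
    (for {
      (_, _, o, i1) <- quads
      (s, p, o2, i2) <- bySubject.getOrElse(o, Seq.empty)
      if i1 != i2
    } yield (s, p, o2, i1)).toSet
  }
}
```

For instance, with the quads `(1,10,2,0)` and `(2,20,3,1)`, the triple `(2,20,3)` is reachable from partition 0 through object `2`, so it is copied into partition 0.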
Properties:
0 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#officeNumber>
671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#name>
738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#title>
805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#age>
872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#telephone>
939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#emailAddress>
1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchInterest>
1082130432 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchProject>
1090519040 <http://www.univ-mlv.fr/~ocure/lubm.owl#hasAlumnus>
1098907648 <http://www.univ-mlv.fr/~ocure/lubm.owl#degreeFrom>
1101004800 <http://www.univ-mlv.fr/~ocure/lubm.owl#undergraduateDegreeFrom>
1103101952 <http://www.univ-mlv.fr/~ocure/lubm.owl#mastersDegreeFrom>
1105199104 <http://www.univ-mlv.fr/~ocure/lubm.owl#doctoralDegreeFrom>
1107296256 <http://www.univ-mlv.fr/~ocure/lubm.owl#orgPublication>
1115684864 <http://www.univ-mlv.fr/~ocure/lubm.owl#takesCourse>
1124073472 <http://www.univ-mlv.fr/~ocure/lubm.owl#member>
1132462080 <http://www.univ-mlv.fr/~ocure/lubm.owl#memberOf>
1136656384 <http://www.univ-mlv.fr/~ocure/lubm.owl#worksFor>
1138753536 <http://www.univ-mlv.fr/~ocure/lubm.owl#headOf>
1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#teachingAssistantOf>
1149239296 <http://www.univ-mlv.fr/~ocure/lubm.owl#listedCourse>
1157627904 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareDocumentation>
1166016512 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationAuthor>
1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareVersion>
1182793728 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliateOf>
1191182336 <http://www.univ-mlv.fr/~ocure/lubm.owl#tenured>
1199570944 <http://www.univ-mlv.fr/~ocure/lubm.owl#teacherOf>
1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationDate>
1216348160 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliatedOrganizationOf>
1224736768 <http://www.univ-mlv.fr/~ocure/lubm.owl#subOrganizationOf>
1233125376 <http://www.univ-mlv.fr/~ocure/lubm.owl#advisor>
1241513984 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationResearch>

Concepts:
0 <http://www.univ-mlv.fr/~ocure/lubm.owl#Schedule>
268435456 <http://www.univ-mlv.fr/~ocure/lubm.owl#Organization>
301989888 <http://www.univ-mlv.fr/~ocure/lubm.owl#College>
335544320 <http://www.univ-mlv.fr/~ocure/lubm.owl#Department>
369098752 <http://www.univ-mlv.fr/~ocure/lubm.owl#Institute>
402653184 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchGroup>
436207616 <http://www.univ-mlv.fr/~ocure/lubm.owl#Program>
469762048 <http://www.univ-mlv.fr/~ocure/lubm.owl#University>
536870912 <http://www.univ-mlv.fr/~ocure/lubm.owl#Publication>
570425344 <http://www.univ-mlv.fr/~ocure/lubm.owl#Software>
603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#Book>
637534208 <http://www.univ-mlv.fr/~ocure/lubm.owl#Specification>
671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#Manual>
704643072 <http://www.univ-mlv.fr/~ocure/lubm.owl#Article>
713031680 <http://www.univ-mlv.fr/~ocure/lubm.owl#TechnicalReport>
721420288 <http://www.univ-mlv.fr/~ocure/lubm.owl#ConferencePaper>
729808896 <http://www.univ-mlv.fr/~ocure/lubm.owl#JournalArticle>
738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#UnofficialPublication>
805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#Person>
872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#TeachingAssistant>
939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#Student>
956301312 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateStudent>
973078528 <http://www.univ-mlv.fr/~ocure/lubm.owl#UndergraduateStudent>
1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#Employee>
1015021568 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchAssistant>
1023410176 <http://www.univ-mlv.fr/~ocure/lubm.owl#Director>
1031798784 <http://www.univ-mlv.fr/~ocure/lubm.owl#AdministrativeStaff>
1033895936 <http://www.univ-mlv.fr/~ocure/lubm.owl#SystemsStaff>
1035993088 <http://www.univ-mlv.fr/~ocure/lubm.owl#ClericalStaff>
1040187392 <http://www.univ-mlv.fr/~ocure/lubm.owl#Faculty>
1042284544 <http://www.univ-mlv.fr/~ocure/lubm.owl#PostDoc>
1044381696 <http://www.univ-mlv.fr/~ocure/lubm.owl#Professor>
1044643840 <http://www.univ-mlv.fr/~ocure/lubm.owl#Chair>
1044905984 <http://www.univ-mlv.fr/~ocure/lubm.owl#VisitingProfessor>
1045168128 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssociateProfessor>
1045430272 <http://www.univ-mlv.fr/~ocure/lubm.owl#Dean>
1045692416 <http://www.univ-mlv.fr/~ocure/lubm.owl#FullProfessor>
1045954560 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssistantProfessor>
1046478848 <http://www.univ-mlv.fr/~ocure/lubm.owl#Lecturer>
1073741824 <http://www.univ-mlv.fr/~ocure/lubm.owl#Work>
1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#Course>
1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateCourse>
1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#Research>
- encoded triples (2.1MB)
- encoded quadruples with replication (2.3MB)
- replicated quadruples (0.3MB)
- univ1.nt (16.3MB)
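The scripts on this page read the encoded quadruples as text lines and strip the surrounding parentheses before splitting on commas, which suggests lines of the form `(s,p,o,i)`. Assuming that layout, a line can be parsed as in the following minimal sketch; `QuadLineSketch` and `parseQuad` are hypothetical names, and the exact file layout should be checked against the excerpts themselves.

```scala
object QuadLineSketch {
  // Parses one line "(s,p,o,i)" into (subject, property, object, partition id).
  // Assumption: four comma-separated integer ids wrapped in parentheses.
  def parseQuad(line: String): (Long, Long, Long, Long) = {
    val f = line.trim.stripPrefix("(").stripSuffix(")").split(",").map(_.trim.toLong)
    require(f.length == 4, s"expected 4 fields, got ${f.length}")
    (f(0), f(1), f(2), f(3))
  }
}
```

In the example `QuadLineSketch.parseQuad("(12,1233125376,34,3)")`, the second field is the id of the `advisor` property from the dictionary above, while the subject, object and partition ids are made-up values.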