Below are the differences between two revisions of the page.
site:recherche:logiciels:rdfdist [11/05/2015 12:29] amine [Data preparation and Query evaluation Source Code] |
site:recherche:logiciels:rdfdist [26/04/2017 13:06] amann |
||
---|---|---|---|
Line 1: | Line 1: | ||
- | ===== RDFdist ===== | + | {{indexmenu_n>1}} |
+ | |||
+ | ===== RDFdist : RDF distribution approaches using Spark ===== | ||
This wiki page provides information about the experiments on RDF distribution approaches using [[http://spark.apache.org/|Spark]]. | This wiki page provides information about the experiments on RDF distribution approaches using [[http://spark.apache.org/|Spark]]. | ||
- | This information consists of i) the source code for both data preparation and query evaluation, ii) the query workload and iii) the description of datasets used in the experiments. | + | This information consists of i) the query workload, ii) the source code for both data preparation and query evaluation, and iii) the description of the two datasets used in the experiments. |
- | For the sake of reproducibility, each source code is provied as a script which can be directly executed in the spark shell. | + | For the sake of reproducibility, each source code is provided as a script which can be directly executed in the spark shell. |
- | =====Source Code===== | + | |
- | ====Hash-based approaches==== | + | =====Query workload===== |
+ | We picked three queries from the [[http://swat.cse.lehigh.edu/projects/lubm/|LUBM]] benchmark, namely #2, #9 and #12, | ||
+ | which are referred to as Query 2, Query 3 and Query 4, respectively. | ||
+ | We created an additional LUBM query, referred to as Query 1. | ||
+ | We also created two queries for the [[https://www.wikidata.org/wiki/Wikidata:Main_Page|Wikidata]] dataset which are referred to as Query 5 and Query 6. | ||
+ | ===Query 1 (synthetic, LUBM)=== | ||
+ | <code> | ||
+ | SELECT ?x ?y ?z | ||
+ | WHERE | ||
+ | {?x lubm:advisor ?y. | ||
+ | ?y lubm:worksFor ?z. | ||
+ | ?z lubm:subOrganizationOf ?t | ||
+ | } | ||
+ | </code> | ||
- | <code>The following code presents the random hashing approach for the LUBM datasets, i.e., queries Q1, Q2, Q3 and Q4 are executed and evaluated. | ||
+ | |||
+ | |||
+ | ===Query 2 (#2 of LUBM)=== | ||
+ | <code> | ||
+ | PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> | ||
+ | PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#> | ||
+ | SELECT ?X ?Y ?Z | ||
+ | WHERE | ||
+ | {?X rdf:type ub:GraduateStudent . | ||
+ | ?Y rdf:type ub:University . | ||
+ | ?Z rdf:type ub:Department . | ||
+ | ?X ub:memberOf ?Z . | ||
+ | ?Z ub:subOrganizationOf ?Y . | ||
+ | ?X ub:undergraduateDegreeFrom ?Y} | ||
+ | </code> | ||
+ | |||
+ | ===Query 3 (#9 of LUBM)=== | ||
+ | <code> | ||
+ | PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> | ||
+ | PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#> | ||
+ | SELECT ?X ?Y ?Z | ||
+ | WHERE | ||
+ | {?X rdf:type ub:Student . | ||
+ | ?Y rdf:type ub:Faculty . | ||
+ | ?Z rdf:type ub:Course . | ||
+ | ?X ub:advisor ?Y . | ||
+ | ?Y ub:teacherOf ?Z . | ||
+ | ?X ub:takesCourse ?Z} | ||
+ | </code> | ||
+ | |||
+ | ===Query 4 (#12 of LUBM)=== | ||
+ | <code> | ||
+ | PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> | ||
+ | PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#> | ||
+ | SELECT ?X ?Y | ||
+ | WHERE | ||
+ | {?X rdf:type ub:Chair . | ||
+ | ?Y rdf:type ub:Department . | ||
+ | ?X ub:worksFor ?Y . | ||
+ | ?Y ub:subOrganizationOf <http://www.University0.edu>} | ||
+ | |||
+ | </code> | ||
+ | |||
+ | ===Query 5 (synthetic, Wikidata)=== | ||
+ | <code> | ||
+ | SELECT ?x ?y ?z | ||
+ | WHERE | ||
+ | {?x entity:P131s ?y. | ||
+ | ?y entity:P961v ?z. | ||
+ | ?z entity:P704s ?w | ||
+ | } | ||
+ | </code> | ||
+ | |||
+ | ===Query 6 (synthetic, Wikidata) === | ||
+ | <code> | ||
+ | SELECT ?x ?y ?z | ||
+ | {?x entity:P39v ?y. | ||
+ | ?x entity:P580q ?z. | ||
+ | ?x rdf:type ?w | ||
+ | } | ||
+ | </code> | ||
+ | |||
+ | =====Source Code===== | ||
+ | |||
+ | |||
+ | |||
+ | ====Hash-based approaches==== | ||
+ | The following code corresponds to the triple hashing and subject hashing approaches for the LUBM datasets. | ||
+ | It consists of a data preparation part followed by a query evaluation part. | ||
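To make the difference between the two hashing strategies concrete, here is a minimal plain-Scala sketch (no Spark required) of how a triple could be assigned to a partition. The names `bucket`, `subjectHash` and `tripleHash`, as well as the sample triples, are hypothetical and do not appear in the scripts; the non-negative modulo is only assumed to approximate what a hash partitioner does with a key.

```scala
// A triple of encoded identifiers: (subject, property, object)
type Triple = (Long, Long, Long)

// Non-negative modulo of the key's hash code (assumption: this approximates
// the bucket a hash partitioner would pick for the key)
def bucket(key: Any, numParts: Int): Int = {
  val m = key.hashCode % numParts
  if (m < 0) m + numParts else m
}

// Subject hashing: only the subject decides the partition
def subjectHash(t: Triple, numParts: Int): Int = bucket(t._1, numParts)

// Triple hashing: the whole triple decides the partition
def tripleHash(t: Triple, numParts: Int): Int = bucket(t, numParts)

// Hypothetical sample data: three triples share subject 1
val sample: Seq[Triple] =
  Seq((1L, 10L, 2L), (1L, 11L, 3L), (1L, 12L, 4L), (2L, 10L, 3L))

// Group the sample by partition under each strategy
val bySubject = sample.groupBy(t => subjectHash(t, 4))
val byTriple  = sample.groupBy(t => tripleHash(t, 4))
```

With subject hashing, all triples sharing a subject land in the same partition, which is what makes subject-based star joins local in principle; triple hashing offers no such guarantee.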
Line 28: | Line 110: | ||
val t1 = java.lang.System.currentTimeMillis(); | val t1 = java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | |||
+ | /** | ||
+ | * set inputData with the path to the data encoded as quadruples (see Datasets excerpts) | ||
+ | */ | ||
// loading and transforming the dataset | // loading and transforming the dataset | ||
Line 36: | Line 123: | ||
println("Loading time "+ (t2 - t1) +" msec for "+part+" partitions"); | println("Loading time "+ (t2 - t1) +" msec for "+part+" partitions"); | ||
- | // Partitioning the dataset | + | // Partitioning the dataset |
- | val triples = triples0.map{case(f,(s,p,o))=>((s+p+o),(s,p,o))}.partitionBy(new HashPartitioner(part)) | + | /** |
+ | * Uncomment one of the following lines depending on whether hashing is applied | ||
+ | on the entire triple or only the subject | ||
+ | */ | ||
+ | // val triples = triples0.partitionBy(new HashPartitioner(part)) | ||
+ | // val triples = triples0.map{case(f,(s,p,o))=>((s+p+o),(s,p,o))}.partitionBy(new HashPartitioner(part)) | ||
//triples.persist | //triples.persist | ||
Line 63: | Line 156: | ||
val takco : Long = 1115684864 | val takco : Long = 1115684864 | ||
- | def ajout(a : ListBuffer[(Long, Long, Long)], e: (Long, Long, Long) ) : ListBuffer[(Long, Long, Long)] = { | ||
- | a += e | ||
- | return a | ||
- | } | ||
// ----------------------------------------------------------- | // ----------------------------------------------------------- | ||
Line 144: | Line 233: | ||
println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions"); | println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions"); | ||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // highly selective on wikidata :: 8 results | ||
+ | // QR5: ?x <http://www.wikidata.org/entity/P131s> ?y. | ||
+ | // ?y <http://www.wikidata.org/entity/P961v> ?z. | ||
+ | // ?z <http://www.wikidata.org/entity/P704s> ?w. | ||
+ | // ----------------------------------------------------------- | ||
+ | val qr5 = triples.filter({case(s,p,o)=>p==P131s}).map({case(s,p,o)=>(o,(s,p))}). | ||
+ | join(triples.filter({case(s,p,o)=>p==P961v}).map({case(s,p,o)=>(s,(p,o))})). | ||
+ | map({case(k,((s1,p1),(p2,o2)))=>(o2,s1)}). | ||
+ | join(triples.filter({case(s,p,o)=>p==P704s}).map({case(s,p,o)=>(s,(p,o))})) | ||
+ | qr5.persist | ||
+ | qr5.collect | ||
+ | |||
+ | // mid selectivity star-shaped query : 10418 results | ||
+ | // QR6: ?x <http://www.wikidata.org/entity/P39v> ?y. ?x <http://www.wikidata.org/entity/P580q> ?z. ?x <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> ?w | ||
+ | |||
+ | val qr6 = triples.filter({case(s,p,o)=>p==P39v}).map({case(s,p,o)=>(s,null)}). | ||
+ | join(triples.filter({case(s,p,o)=>p==P580q}).map({case(s,p,o)=>(s,null)})). | ||
+ | join(triples.filter({case(s,p,o)=>p==rdftype})) | ||
+ | |||
+ | qr6.persist | ||
+ | qr6.collect | ||
+ | |||
</code> | </code> | ||
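The query evaluation code above follows one pattern throughout: filter the triples on a property, key each side on the shared variable, and join. A minimal plain-Scala sketch of that pattern on in-memory collections is given below. The helper `joinByKey`, the property ids `p1` and `p2`, and the sample triples are assumptions made for illustration and do not appear in the scripts.

```scala
type Triple = (Long, Long, Long)

// Hypothetical property ids, chosen only for this example
val p1 = 100L
val p2 = 200L

val triples: Seq[Triple] =
  Seq((1L, p1, 2L), (3L, p1, 2L), (2L, p2, 5L), (4L, p2, 6L))

// Equi-join of two keyed sequences, analogous to a join between pair RDDs
def joinByKey[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
  val index = right.groupBy(_._1)
  left.flatMap { case (k, a) =>
    index.getOrElse(k, Seq.empty[(K, B)]).map { case (_, b) => (k, (a, b)) }
  }
}

// Pattern: (?x p1 ?y) (?y p2 ?z), joined on ?y
val leftSide  = triples.filter(_._2 == p1).map { case (s, _, o) => (o, s) }
val rightSide = triples.filter(_._2 == p2).map { case (s, _, o) => (s, o) }
val chain     = joinByKey(leftSide, rightSide).map { case (y, (x, z)) => (x, y, z) }
```

Longer chains are obtained by re-keying the intermediate result on the next shared variable and joining again, as the scripts do with successive `map` and `join` calls.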
- | ==Hashing applied on the subject == | ||
- | ====Partitioning-based approaches==== | ||
- | ==nHop== | + | ====Graph partitioning-based approaches==== |
- | ==WARP== | + | ===Huang Approach === |
+ | <code> | ||
+ | import org.apache.spark.HashPartitioner | ||
+ | import scala.collection.mutable.ListBuffer | ||
- | ==Hybrid== | + | val folder= "lubm" //"watdiv" |
+ | val dataset= "univ" //"watdiv" | ||
+ | val scale="1k" | ||
+ | val part=20 //10, 20 | ||
+ | val folderName = folder +scale | ||
+ | val fileName = dataset+scale+"_encoded_unique_quads.part."+part+".2hop" | ||
+ | |||
+ | val t1 = java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | val quads_I_SPO = sc.textFile(s"/user/olivier/${folderName}/${fileName}").coalesce(part).map(x=>x.split(",")).map(t=>(t(3).replace(")","").toLong, (t(0).replace("(","").toLong,t(1).toLong,t(2).toLong))) | ||
+ | |||
+ | val quadsDist = quads_I_SPO.partitionBy(new HashPartitioner(part)).persist | ||
+ | |||
+ | |||
+ | |||
+ | val t2 = java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | print("Loading time of quads : "+(t2-t1)/1000 +" sec") | ||
+ | |||
+ | val advisor : Long = 1233125376 | ||
+ | val worksFor : Long = 1136656384 | ||
+ | val suborg : Long = 1224736768 | ||
+ | val memof : Long = 1132462080 | ||
+ | val undeg : Long = 1101004800 | ||
+ | val teaof : Long = 1199570944 | ||
+ | val takco : Long = 1115684864 | ||
+ | |||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // Query 1 : (not part of the benchmark) | ||
+ | // Pattern: (x advisor y) (y worksFor z) (z subOrganisation t) | ||
+ | // ----------------------------------------------------------- | ||
+ | |||
+ | var t1 = java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | var pataws = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}). | ||
+ | join(quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => (s,o)}),part). | ||
+ | map({case (y,(x,z)) => (z,(x,y))}). | ||
+ | join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}), part). | ||
+ | map({case (z,((x,y),t)) => (x,y,z,t)}) | ||
+ | |||
+ | pataws.count | ||
+ | |||
+ | var pataws2 = pataws.flatMap(x=>x).distinct | ||
+ | |||
+ | var t2= java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions"); | ||
+ | |||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // LUBM 2 : MSU | ||
+ | // Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z) | ||
+ | // ----------------------------------------------------------- | ||
+ | |||
+ | var t1 = java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | //var pmemof = quadsDist.filter({case(i,(s,p,o)) => p==memof}).cache() | ||
+ | |||
+ | var patmsu = quadsDist.filter({case(i,(s,p,o)) => p==memof}).map({case(i,(s,p,o)) => (o,s)}). | ||
+ | join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}),part). | ||
+ | map({case (y,(x,z)) => (x+""+z,(x,y,z))}). | ||
+ | join(quadsDist.filter({case(i,(s,p,o)) => p==undeg}).map({case(i,(x,p,z))=> (x+""+z,null)})) | ||
+ | |||
+ | patmsu.count | ||
+ | |||
+ | var patmsu2 = patmsu.flatMap(identity).distinct | ||
+ | |||
+ | var t2= java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | println("Processing Q2 "+ (t2 - t1) +" msec for "+part+" partitions"); | ||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // LUBM 9 : ATT | ||
+ | // Pattern: (x advisor y) (y teacherOf z) (x takesCourse z) | ||
+ | // ----------------------------------------------------------- | ||
+ | |||
+ | var t1 = java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | var patatt = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}). | ||
+ | join(quadsDist.filter({case(i,(s,p,o)) => p==teaof}).map({case(i,(s,p,o)) => (s,o)}), part). | ||
+ | map({case (y,(x,z)) => (x+""+z,(x,y,z))}). | ||
+ | join(quadsDist.filter({case(i,(s,p,o)) => p==takco}).map({case(i,(s,p,o))=> (s+""+o,null)}), part) | ||
+ | |||
+ | patatt.distinct.count | ||
+ | |||
+ | var t2= java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | println("Processing Q3 (LUBM #9) "+ (t2 - t1) +" msec for "+part+" partitions"); | ||
+ | |||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // LUBM 12 : WS | ||
+ | // Pattern: (y worksFor z) (z subOrganisation t) | ||
+ | // ----------------------------------------------------------- | ||
+ | |||
+ | val t1 = java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | val patws = quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => ((i,o),s)}).join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => ((i,s),o)}),part).map({case ((i,k),(s,o)) => (s,o)}) | ||
+ | |||
+ | val ans_patws = patws.distinct.count | ||
+ | |||
+ | val t2= java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions"); | ||
+ | </code> | ||
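In the LUBM #12 evaluation above, both join keys include the partition identifier `i`, so a match is only produced when the two triples sit in the same partition; `distinct` then removes answers found more than once because of replication. The following plain-Scala sketch reproduces this idea on a handful of hypothetical quads. The subject and object ids, the `Int` partition ids and the in-memory join are assumptions; the two property ids are the ones used in the script.

```scala
// (subject, property, object, partition id)
type Quad = (Long, Long, Long, Int)

val worksFor = 1136656384L
val suborg   = 1224736768L

// Hypothetical data: the suborg triple is replicated in partitions 0 and 1
val quads: Seq[Quad] = Seq(
  (7L, worksFor, 8L, 0),
  (8L, suborg, 9L, 0),
  (8L, suborg, 9L, 1),
  (10L, worksFor, 8L, 1)
)

// Key both sides on (partition, join variable) so that matches stay local
val leftSide = quads.filter(_._2 == worksFor).map { case (s, _, o, i) => ((i, o), s) }
val rightIdx = quads.filter(_._2 == suborg).map { case (s, _, o, i) => ((i, s), o) }.groupBy(_._1)

val localMatches = leftSide.flatMap { case (key, y) =>
  rightIdx.getOrElse(key, Seq.empty[((Int, Long), Long)]).map { case (_, t) => (y, t) }
}.distinct
```

Because the replica of the `suborg` triple is present in partition 1, the `worksFor` triple stored there finds its match without looking at partition 0.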
+ | |||
+ | ===Warp=== | ||
+ | <code> | ||
+ | // Spark implementation of WARP replication | ||
+ | // usage: run this code in the spark-shell | ||
+ | |||
+ | import scala.collection.mutable.ListBuffer | ||
+ | import org.apache.spark.rdd.RDD | ||
+ | import scala.reflect.ClassTag | ||
+ | import org.apache.spark.SparkContext | ||
+ | import org.apache.spark.SparkContext._ | ||
+ | |||
+ | import org.apache.spark.HashPartitioner | ||
+ | import org.apache.spark.Partitioner | ||
+ | |||
+ | import java.io.Serializable | ||
+ | |||
+ | |||
+ | val folder= "lubm" //"watdiv" | ||
+ | val dataset= "univ" //"watdiv"// | ||
+ | val scale="1k" | ||
+ | |||
+ | /* We have 15 cores per machine. | ||
+ | Each core is accessing a separate part in parallel. | ||
+ | The default parallelism = #machines * 15 cores | ||
+ | <=> #machines = defaultParallelism / 15 | ||
+ | */ | ||
+ | val machine = sc.defaultParallelism /15 | ||
+ | val part = sc.defaultParallelism | ||
+ | val folderName = folder + scale | ||
+ | val fileName = dataset + scale | ||
+ | /** | ||
+ | * set inputData with the path to the data encoded as quadruples (see Datasets excerpts) | ||
+ | */ | ||
+ | val inputData = s"/user/olivier/${folderName}/${fileName}_encoded_unique_quads.part.${machine}" | ||
+ | |||
+ | // Initial state, delete the storage | ||
+ | sc.getPersistentRDDs.values.foreach(x => x.unpersist()) | ||
+ | sc.getPersistentRDDs.values.size | ||
+ | |||
+ | var t1 = java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | def affiche(s: String): Unit={ | ||
+ | println("#### ------------------------------- ####") | ||
+ | println(s) | ||
+ | println("#### ------------------------------- ####") | ||
+ | } | ||
+ | |||
+ | |||
+ | /* | ||
+ | ----------------------- | ||
+ | STEP 1: read triples | ||
+ | ----------------------- | ||
+ | */ | ||
+ | |||
+ | /** | ||
+ | Function lireTriples reads the input data | ||
+ | returns a quad (subject, property, object, partID) | ||
+ | with partID = hash(subject) | ||
+ | **/ | ||
+ | def lireTriples(input: String): RDD[(Long, Long, Long, Int)] = { | ||
+ | return sc.textFile(input). | ||
+ | map(line => line.substring(1, line.length -1).split(",")). | ||
+ | map(tab => (tab(0).toLong, tab(1).toLong, tab(2).toLong, (tab(0).toInt).hashCode%machine)) | ||
+ | } | ||
+ | |||
+ | val triples = lireTriples(inputData) | ||
+ | triples.setName("triples") | ||
+ | triples.persist | ||
+ | |||
+ | // total number of tuples | ||
+ | //triples.count | ||
+ | |||
+ | // stat: number of triples per partition | ||
+ | //triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println) | ||
+ | |||
+ | |||
+ | |||
+ | /* | ||
+ | --------------------------------------------------------------------------- | ||
+ | STEP 2: COMPUTE REPLICATION TRIPLES | ||
+ | |||
+ | Computes the triples to replicate in every partition, for each query | ||
+ | --------------------------------------------------------------------------- | ||
+ | */ | ||
+ | |||
+ | /* | ||
+ | Function filterProp: selects the triples with a given property | ||
+ | */ | ||
+ | def filterProp(input: RDD[(Long, Long, Long, Int)], prop: Long): RDD[(Long, Long, Int)] = { | ||
+ | // project onto S and O since P is fixed | ||
+ | return input.filter(x => x._2 == prop).map{ case (s,p,o,f) => (s,o,f)} | ||
+ | } | ||
+ | |||
+ | def getOutsideTriple(seedF: Int, t: ListBuffer[(Long, Long, Int)], properties: Array[Long]): Seq[(Long, Long, Long, Int)] = { | ||
+ | var a = t.toArray | ||
+ | var res: ListBuffer[(Long, Long, Long, Int)] = ListBuffer() | ||
+ | |||
+ | for(i <-0 to (a.length - 1)) { | ||
+ | if( a(i)._3 != seedF) { | ||
+ | res.append( (a(i)._1, properties(i), a(i)._2, seedF)) | ||
+ | } | ||
+ | } | ||
+ | return res | ||
+ | } | ||
+ | |||
+ | def getReplicaForQuery(query: RDD[ListBuffer[(Long, Long, Int)]], properties: Array[Long], nbCandidateSeed: Int): RDD[(Long, Long, Long, Int)] = { | ||
+ | var replica = sc.parallelize(List((0L,0L,0L,0))) | ||
+ | var min_nb = -1L | ||
+ | |||
+ | for (i <- 0 to (nbCandidateSeed-1)) { | ||
+ | var t1 = query.map(x => (x(i)._3, x)) | ||
+ | //t1.take(5).foreach(println) | ||
+ | |||
+ | // list the triples to replicate = those that are not in the seed partition | ||
+ | var t2 = t1.flatMap{ case(seedF, tripleList) => getOutsideTriple(seedF, tripleList, properties)}.distinct | ||
+ | |||
+ | // count the triples to replicate | ||
+ | var nb = t2.count | ||
+ | if(min_nb == -1 || nb < min_nb){ | ||
+ | min_nb=nb | ||
+ | replica=t2 | ||
+ | println("current:") | ||
+ | println(s"seed $i, replica count: $nb ") | ||
+ | } | ||
+ | else { | ||
+ | println(s"-----ignore seed $i, replica count: $nb ") | ||
+ | } | ||
+ | } | ||
+ | return replica | ||
+ | } | ||
+ | |||
+ | // ------------------------------------------------------- | ||
+ | // Compute replication for the following QUERIES | ||
+ | // ------------------------------------------------------- | ||
+ | |||
+ | // based on dictionary encoding | ||
+ | val advisor : Long = 1233125376 | ||
+ | val worksFor : Long = 1136656384 | ||
+ | val suborg : Long = 1224736768 | ||
+ | val memof : Long = 1132462080 | ||
+ | val undeg : Long = 1101004800 | ||
+ | val teaof : Long = 1199570944 | ||
+ | val takco : Long = 1115684864 | ||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // Query 1 : (not part of the benchmark) | ||
+ | // Pattern: (x advisor y) (y worksFor z) (z subOrganisation t) | ||
+ | // ----------------------------------------------------------- | ||
+ | val query1 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y, (x, f1))}. | ||
+ | join(filterProp(triples, worksFor).map{ case (y, z, f2) => ( y, (z, f2) )}). | ||
+ | map{ case ( y, ((x,f1), (z, f2))) => (z, (x, f1, y, f2)) }. | ||
+ | join(filterProp(triples, suborg).map{ case ( z, t, f3) => (z, (t,f3))}). | ||
+ | map{ case ( z, ( (x,f1,y,f2), (t,f3) )) => ListBuffer( (x, y, f1), (y, z, f2) , (z, t, f3))} | ||
+ | |||
+ | query1.setName("q1") | ||
+ | //query1.persist() | ||
+ | //query1.count | ||
+ | // result: 4108791 | ||
+ | |||
+ | var t1 = java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | val replica1 = getReplicaForQuery(query1, Array(advisor, worksFor, suborg), 3) | ||
+ | |||
+ | var t2= java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions"); | ||
+ | |||
+ | // stat: number of triples to replicate in each partition | ||
+ | //replica1.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println) | ||
+ | // (0,15551) (1,15776) (2,17639) (3,201275) (4,46068) | ||
+ | |||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // LUBM 2 : MSU | ||
+ | // Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z) | ||
+ | // ----------------------------------------------------------- | ||
+ | val query2 = filterProp(triples, memof).map{ case (x, y, f1) => ( y, (x, f1))}. | ||
+ | join(filterProp(triples, suborg).map{ case (y, z, f2) => ( y, (z, f2) )}). | ||
+ | map{ case ( y, ((x,f1), (z,f2) )) => ((x, z), (y, f1, f2)) }. | ||
+ | join(filterProp(triples, undeg).map{ case (x, z, f3) => ( (x, z), f3)}). | ||
+ | map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) } | ||
+ | |||
+ | query2.setName("q2") | ||
+ | //query2.persist() | ||
+ | //query2.count | ||
+ | |||
+ | val replica2 = getReplicaForQuery(query2, Array(memof, suborg, undeg), 2) | ||
+ | |||
+ | //stat: number of triples to replicate in each partition | ||
+ | //replica2.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println) | ||
+ | //(0,1) (1,3) (3,4) (4,896) | ||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // LUBM 9 : ATT | ||
+ | // Pattern: (x advisor y) (y teacherOf z) (x takesCourse z) | ||
+ | // ----------------------------------------------------------- | ||
+ | val query9 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y,(x, f1))}. | ||
+ | join(filterProp(triples, teaof).map{ case (y, z, f2) => ( y, (z, f2) )}). | ||
+ | map{ case ( y, ((x, f1), (z, f2) )) => ((x, z), (y, f1, f2)) }. | ||
+ | join(filterProp(triples, takco).map{ case (x, z, f3) => ( (x, z), f3)}). | ||
+ | map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) } | ||
+ | |||
+ | query9.setName("q9") | ||
+ | //query9.persist() | ||
+ | query9.count | ||
+ | // 272982 | ||
+ | |||
+ | val replica9 = getReplicaForQuery(query9, Array(advisor, teaof, takco) ,2) | ||
+ | |||
+ | // stat : number of triples to replicate in each partition | ||
+ | //replica9.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println) | ||
+ | //(0,3131) (1,3213) (2,3504) (3,44706) (4,2014) | ||
+ | |||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // LUBM 12 : WS | ||
+ | // Pattern: (y worksFor z) (z subOrganisation t) | ||
+ | // ----------------------------------------------------------- | ||
+ | val query12 = filterProp(triples, worksFor).map{ case (y, z, f1) => ( z, (y, f1))}. | ||
+ | join(filterProp(triples, suborg).map{ case (z, t, f2) => ( z, (t, f2))}). | ||
+ | map{ case ( z, ((y, f1), (t, f2) )) => ListBuffer((y, z, f1), (z, t, f2)) } | ||
+ | |||
+ | query12.setName("q12") | ||
+ | //query12.persist() | ||
+ | query12.count | ||
+ | //720628 | ||
+ | |||
+ | val replica12 = getReplicaForQuery(query12, Array(worksFor, suborg), 2) | ||
+ | |||
+ | //stat : number of triples to replicate in each partition | ||
+ | //replica12.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println) | ||
+ | |||
+ | |||
+ | /* | ||
+ | --------------------------------------------------------------------------- | ||
+ | STEP 3: UNION OF THE REPLICAS COMPUTED FOR EACH QUERY | ||
+ | --------------------------------------------------------------------------- | ||
+ | */ | ||
+ | |||
+ | val allreplica = replica1.union(replica2).union(replica9).union(replica12).distinct | ||
+ | |||
+ | val nbAjout = allreplica.count | ||
+ | // 357 161 for 5 partitions | ||
+ | // 734 295 for 10 partitions | ||
+ | // 779 983 for 20 partitions | ||
+ | |||
+ | // replication rate | ||
+ | affiche("Number of added triples: " + nbAjout.toDouble) | ||
+ | affiche("Replication rate: " + (nbAjout.toDouble / triples.count)) | ||
+ | |||
+ | var t2= java.lang.System.currentTimeMillis(); | ||
+ | affiche("Preparation time "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions"); | ||
+ | |||
+ | val oDist = triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b) | ||
+ | val oMean = oDist.map{case(f,c)=>c}.sum / machine | ||
+ | val odevs = oDist.map{case(f,c)=>c}.map(score => (score - oMean) * (score - oMean)) | ||
+ | val ostddev = Math.sqrt(odevs.sum / machine) | ||
+ | |||
+ | val nDist = oDist.join(allreplica.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b)).map{case(f,(oc,on))=>(oc+on)} | ||
+ | val nMean = nDist.sum / machine | ||
+ | val ndevs = nDist.map(score => (score - nMean) * (score - nMean)) | ||
+ | val nstddev = Math.sqrt(ndevs.sum / machine) | ||
+ | |||
+ | |||
+ | |||
+ | /* | ||
+ | --------------------------------------------------------------------------- | ||
+ | STEP 4: LOCAL QUERY PROCESSING | ||
+ | On the WARP replicated data | ||
+ | --------------------------------------------------------------------------- | ||
+ | */ | ||
+ | |||
+ | // Extends each partition with the WARP replicated triples. | ||
+ | // Each machine stores one partition | ||
+ | val triplesWARP = triples.union(allreplica). | ||
+ | map{ case (s,p,o,f) => (f, (s,p,o))}. | ||
+ | partitionBy(new HashPartitioner(machine)).map{ case (f, (s,p,o)) => (s,p,o,f)} | ||
+ | |||
+ | triplesWARP.setName("triples WARP") | ||
+ | triplesWARP.persist() | ||
+ | triplesWARP.count | ||
+ | |||
+ | |||
+ | var t2= java.lang.System.currentTimeMillis(); | ||
+ | affiche("Preparation time "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions"); | ||
+ | |||
+ | |||
+ | // ------------------- | ||
+ | // Q1 LOCAL | ||
+ | // ------------------- | ||
+ | t1= java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | // Local Query1 with projection on the variables (x,y,z,t) | ||
+ | val localQuery1 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}. | ||
+ | join(filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (y, f), z )}). | ||
+ | map{ case ( (y, f), (x, z)) => ((z, f), (x, y)) }. | ||
+ | join(filterProp(triplesWARP, suborg).map{ case ( z, t, f) => ((z, f), t)}). | ||
+ | map{ case ( (z, f), ( (x,y), t)) => (x, y, z, t)} | ||
+ | |||
+ | localQuery1.count | ||
+ | //4108791 OK, same as without replication | ||
+ | |||
+ | t2= java.lang.System.currentTimeMillis(); | ||
+ | affiche("Duration of query Q1 : "+ (t2 - t1) +" msec for " + part + " partitions"); | ||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // Q2 LOCAL: | ||
+ | // Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z) | ||
+ | // ----------------------------------------------------------- | ||
+ | t1= java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | val localQuery2 = filterProp(triplesWARP, memof).map{ case (x, y, f) => ( (y, f), x)}. | ||
+ | join(filterProp(triplesWARP, suborg).map{ case (y, z, f) => ( (y, f), z )}). | ||
+ | map{ case ( (y, f), (x, z)) => ((x, z, f), y) }. | ||
+ | join(filterProp(triplesWARP, undeg).map{ case (x, z, f) => ( (x, z, f), 1)}). | ||
+ | map{ case ((x, z, f), (y, 1)) => (x, y, z) } | ||
+ | |||
+ | localQuery2.count | ||
+ | //2528 | ||
+ | t2= java.lang.System.currentTimeMillis(); | ||
+ | affiche("Duration of query Q2 : "+ (t2 - t1) +" msec for " + part + " partitions"); | ||
+ | |||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // Q9 LOCAL | ||
+ | // Pattern: (x advisor y) (y teacherOf z) (x takesCourse z) | ||
+ | // ----------------------------------------------------------- | ||
+ | t1= java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | val localQuery9 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}. | ||
+ | join(filterProp(triplesWARP, teaof).map{ case (y, z, f) => ( (y, f), z )}). | ||
+ | map{ case ( (y, f), (x, z)) => ((x, z, f), y) }. | ||
+ | join(filterProp(triplesWARP, takco).map{ case (x, z, f) => ( (x, z, f), 1)}). | ||
+ | map{ case ((x, z, f), (y, 1)) => (x, y, z) } | ||
+ | |||
+ | localQuery9.count | ||
+ | //272982 | ||
+ | |||
+ | t2= java.lang.System.currentTimeMillis(); | ||
+ | affiche("Duration of query Q9 : "+ (t2 - t1) +" msec for " + part + " partitions"); | ||
+ | |||
+ | |||
+ | // ----------------------------------------------------------- | ||
+ | // Q12 LOCAL | ||
+ | // Pattern: (y worksFor z) (z subOrganisation t) | ||
+ | // ----------------------------------------------------------- | ||
+ | t1= java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | val localQuery12 = filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (z, f), y)}. | ||
+ | join(filterProp(triplesWARP, suborg).map{ case (z, t, f) => ( (z, f), t)}). | ||
+ | map{ case ( (z, f), (y, t)) => (y, z, t) } | ||
+ | |||
+ | localQuery12.count | ||
+ | //938356 | ||
+ | |||
+ | t2= java.lang.System.currentTimeMillis(); | ||
+ | affiche("Duration of query Q12: "+ (t2 - t1) +" msec for " + part + " partitions"); | ||
+ | |||
+ | |||
+ | </code> | ||
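The replica computation in STEP 2 above can be summarised as follows: for every candidate seed pattern, collect the triples of each query match that do not live in the seed's partition, and keep the seed that yields the fewest replicas. Below is a plain-Scala sketch of that selection on in-memory data. The names `outsideTriples` and `replicasFor`, the property ids 100 and 200, and the sample matches are hypothetical; this illustrates the idea under those assumptions and is not the script itself.

```scala
// One query match: a (subject, object, partition) entry per triple pattern
type QueryMatch = List[(Long, Long, Int)]
// (subject, property, object, target partition)
type Quad = (Long, Long, Long, Int)

// Triples of a match stored outside the seed partition, retargeted to the seed partition
def outsideTriples(seedPart: Int, m: QueryMatch, props: Seq[Long]): Seq[Quad] =
  m.zip(props).collect {
    case ((s, o, part), p) if part != seedPart => (s, p, o, seedPart)
  }

// Try each of the first nbSeeds patterns as seed and keep the smallest replica set
def replicasFor(matches: Seq[QueryMatch], props: Seq[Long], nbSeeds: Int): Set[Quad] =
  (0 until nbSeeds)
    .map(i => matches.flatMap(m => outsideTriples(m(i)._3, m, props)).toSet)
    .minBy(_.size)

// Hypothetical matches of a two-pattern query: (?x 100 ?y) (?y 200 ?z)
val props = Seq(100L, 200L)
val matches: Seq[QueryMatch] = Seq(
  List((1L, 2L, 0), (2L, 3L, 1)),
  List((4L, 2L, 0), (2L, 3L, 1))
)

val replicas = replicasFor(matches, props, 2)
```

Here the first pattern as seed needs a single replica (the shared second triple copied to partition 0), whereas the second pattern as seed would need two, so the first one is kept.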
+ | ===2-hop based approach=== | ||
+ | <code> | ||
+ | val folder= "lubm" | ||
+ | val dataset= "univ" | ||
+ | val scale="10k" | ||
+ | |||
+ | val folderName = folder +scale | ||
+ | val part = Array(5,10,20) | ||
+ | |||
+ | for (p <- part) | ||
+ | { | ||
+ | val fileName = dataset+scale+"_encoded_unique_quads.part."+p | ||
+ | // val fileName = "watdiv2k_encoded_unique_quads.partNew.5" | ||
+ | val t1 = java.lang.System.currentTimeMillis(); | ||
+ | |||
+ | val quads = sc.textFile(s"/user/olivier/${folderName}/${fileName}").map(x=>x.split(",")).map(t=>(t(0).replace("(","").toLong,t(1).toLong,t(2).toLong,t(3).replace(")","").toLong)) | ||
+ | |||
+ | var addOneHop = quads.map({case(s,p,o,i)=>(o,i)}).join(quads.map({case(s,p,o,i)=>(s,(p,o,i))})).filter({case(termS,(i1,(p,o,i2)))=>i1!=i2}).distinct.map({case(termS,(i1,(p,o,i2)))=>(termS,p,o,i1)}) | ||
+ | |||
+ | val newQuads = quads.union(addOneHop).distinct | ||
+ | val newQuadsSize = newQuads.count | ||
+ | |||
+ | val t2 = java.lang.System.currentTimeMillis(); | ||
+ | val hopSize = addOneHop.count | ||
+ | println(s"Time to compute one more hop on $folderName for $p partitions is ${t2-t1}") | ||
+ | println(s" new size = $newQuadsSize , added $hopSize") | ||
+ | newQuads.saveAsTextFile(s"/user/olivier/${folderName}/${fileName}.2hop") | ||
+ | } | ||
+ | </code> | ||
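The expansion step above copies into a partition every triple whose subject appears as an object in that partition but which is stored elsewhere. A small plain-Scala sketch of this one-hop extension on in-memory quads is shown below; the sample data, the `Int` partition ids and the `groupBy`-based lookup are assumptions made for the example, whereas the script works on RDDs with `join`.

```scala
// (subject, property, object, partition id)
type Quad = (Long, Long, Long, Int)

// For each quad in partition i1, fetch the triples whose subject is its object
// and that live in a different partition i2, and assign a copy to i1
def addOneHop(quads: Seq[Quad]): Seq[Quad] = {
  val bySubject = quads.groupBy(_._1)
  quads.flatMap { case (_, _, o, i1) =>
    bySubject.getOrElse(o, Seq.empty[Quad]).collect {
      case (s, p, o2, i2) if i1 != i2 => (s, p, o2, i1)
    }
  }.distinct
}

// Hypothetical quads: node 2 is referenced in partition 0 but described in partition 1
val quads: Seq[Quad] = Seq((1L, 10L, 2L, 0), (2L, 11L, 3L, 1), (3L, 12L, 4L, 1))

val added    = addOneHop(quads)
val expanded = (quads ++ added).distinct
```

Applying the function again to `expanded` would add one more hop, at the cost of a higher replication rate.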
====Datasets excerpts==== | ====Datasets excerpts==== | ||
+ | ===Encoding of LUBM concepts and properties=== | ||
+ | |||
+ | <code> | ||
+ | Properties: | ||
+ | 0 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> | ||
+ | 603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#officeNumber> | ||
+ | 671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#name> | ||
+ | 738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#title> | ||
+ | 805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#age> | ||
+ | 872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#telephone> | ||
+ | 939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#emailAddress> | ||
+ | 1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchInterest> | ||
+ | 1082130432 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchProject> | ||
+ | 1090519040 <http://www.univ-mlv.fr/~ocure/lubm.owl#hasAlumnus> | ||
+ | 1098907648 <http://www.univ-mlv.fr/~ocure/lubm.owl#degreeFrom> | ||
+ | 1101004800 <http://www.univ-mlv.fr/~ocure/lubm.owl#undergraduateDegreeFrom> | ||
+ | 1103101952 <http://www.univ-mlv.fr/~ocure/lubm.owl#mastersDegreeFrom> | ||
+ | 1105199104 <http://www.univ-mlv.fr/~ocure/lubm.owl#doctoralDegreeFrom> | ||
+ | 1107296256 <http://www.univ-mlv.fr/~ocure/lubm.owl#orgPublication> | ||
+ | 1115684864 <http://www.univ-mlv.fr/~ocure/lubm.owl#takesCourse> | ||
+ | 1124073472 <http://www.univ-mlv.fr/~ocure/lubm.owl#member> | ||
+ | 1132462080 <http://www.univ-mlv.fr/~ocure/lubm.owl#memberOf> | ||
+ | 1136656384 <http://www.univ-mlv.fr/~ocure/lubm.owl#worksFor> | ||
+ | 1138753536 <http://www.univ-mlv.fr/~ocure/lubm.owl#headOf> | ||
+ | 1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#teachingAssistantOf> | ||
+ | 1149239296 <http://www.univ-mlv.fr/~ocure/lubm.owl#listedCourse> | ||
+ | 1157627904 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareDocumentation> | ||
+ | 1166016512 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationAuthor> | ||
+ | 1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareVersion> | ||
+ | 1182793728 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliateOf> | ||
+ | 1191182336 <http://www.univ-mlv.fr/~ocure/lubm.owl#tenured> | ||
+ | 1199570944 <http://www.univ-mlv.fr/~ocure/lubm.owl#teacherOf> | ||
+ | 1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationDate> | ||
+ | 1216348160 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliatedOrganizationOf> | ||
+ | 1224736768 <http://www.univ-mlv.fr/~ocure/lubm.owl#subOrganizationOf> | ||
+ | 1233125376 <http://www.univ-mlv.fr/~ocure/lubm.owl#advisor> | ||
+ | 1241513984 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationResearch> | ||
+ | |||
+ | Concepts: | ||
+ | 0 <http://www.univ-mlv.fr/~ocure/lubm.owl#Schedule> | ||
+ | 268435456 <http://www.univ-mlv.fr/~ocure/lubm.owl#Organization> | ||
+ | 301989888 <http://www.univ-mlv.fr/~ocure/lubm.owl#College> | ||
+ | 335544320 <http://www.univ-mlv.fr/~ocure/lubm.owl#Department> | ||
+ | 369098752 <http://www.univ-mlv.fr/~ocure/lubm.owl#Institute> | ||
+ | 402653184 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchGroup> | ||
+ | 436207616 <http://www.univ-mlv.fr/~ocure/lubm.owl#Program> | ||
+ | 469762048 <http://www.univ-mlv.fr/~ocure/lubm.owl#University> | ||
+ | 536870912 <http://www.univ-mlv.fr/~ocure/lubm.owl#Publication> | ||
+ | 570425344 <http://www.univ-mlv.fr/~ocure/lubm.owl#Software> | ||
+ | 603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#Book> | ||
+ | 637534208 <http://www.univ-mlv.fr/~ocure/lubm.owl#Specification> | ||
+ | 671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#Manual> | ||
+ | 704643072 <http://www.univ-mlv.fr/~ocure/lubm.owl#Article> | ||
+ | 713031680 <http://www.univ-mlv.fr/~ocure/lubm.owl#TechnicalReport> | ||
+ | 721420288 <http://www.univ-mlv.fr/~ocure/lubm.owl#ConferencePaper> | ||
+ | 729808896 <http://www.univ-mlv.fr/~ocure/lubm.owl#JournalArticle> | ||
+ | 738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#UnofficialPublication> | ||
+ | 805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#Person> | ||
+ | 872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#TeachingAssistant> | ||
+ | 939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#Student> | ||
+ | 956301312 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateStudent> | ||
+ | 973078528 <http://www.univ-mlv.fr/~ocure/lubm.owl#UndergraduateStudent> | ||
+ | 1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#Employee> | ||
+ | 1015021568 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchAssistant> | ||
+ | 1023410176 <http://www.univ-mlv.fr/~ocure/lubm.owl#Director> | ||
+ | 1031798784 <http://www.univ-mlv.fr/~ocure/lubm.owl#AdministrativeStaff> | ||
+ | 1033895936 <http://www.univ-mlv.fr/~ocure/lubm.owl#SystemsStaff> | ||
+ | 1035993088 <http://www.univ-mlv.fr/~ocure/lubm.owl#ClericalStaff> | ||
+ | 1040187392 <http://www.univ-mlv.fr/~ocure/lubm.owl#Faculty> | ||
+ | 1042284544 <http://www.univ-mlv.fr/~ocure/lubm.owl#PostDoc> | ||
+ | 1044381696 <http://www.univ-mlv.fr/~ocure/lubm.owl#Professor> | ||
+ | 1044643840 <http://www.univ-mlv.fr/~ocure/lubm.owl#Chair> | ||
+ | 1044905984 <http://www.univ-mlv.fr/~ocure/lubm.owl#VisitingProfessor> | ||
+ | 1045168128 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssociateProfessor> | ||
+ | 1045430272 <http://www.univ-mlv.fr/~ocure/lubm.owl#Dean> | ||
+ | 1045692416 <http://www.univ-mlv.fr/~ocure/lubm.owl#FullProfessor> | ||
+ | 1045954560 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssistantProfessor> | ||
+ | 1046478848 <http://www.univ-mlv.fr/~ocure/lubm.owl#Lecturer> | ||
+ | 1073741824 <http://www.univ-mlv.fr/~ocure/lubm.owl#Work> | ||
+ | 1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#Course> | ||
+ | 1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateCourse> | ||
+ | 1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#Research> | ||
+ | </code> | ||
+ | |||
+ | ===LUBM Univ1 === | ||
+ | -[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/univ1_encoded_unique.id|encoded triples]](2.1MB) | ||
+ | |||
+ | -[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/quads_plus_replicas.id|encoded quadruples with replication]](2.3MB) | ||
+ | |||
+ | -[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/quads.id|replicated quadruples]](0.3MB) | ||
+ | |||
+ | |||
+ | -[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/univ1.nt|univ1.nt]] (16.3MB) |