Databases

LIP6 DB Web Site


Current version: 26/04/2017 13:06 by amann (previous revision: 11/05/2015 12:49 by amine).
{{indexmenu_n>1}}

===== RDFdist: RDF distribution approaches using Spark =====

This wiki page provides information about the experiments on RDF distribution approaches using [[http://spark.apache.org/|Spark]].
...
We also created two queries for the [[https://www.wikidata.org/wiki/Wikidata:Main_Page|Wikidata]] dataset, which are referred to as Query 5 and Query 6.
  
===Query 1 (synthetic, LUBM)===
<code>
SELECT ?x ?y ?z
...
=====Source Code=====
  

====Hash-based approaches====
The following code implements the triple hashing and subject hashing approaches for the LUBM queries (Queries 1 to 4) and the Wikidata queries (Queries 5 and 6).
It consists of a data preparation part followed by a query evaluation part.
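To make the difference between the two placements concrete, here is a minimal plain-Scala sketch (no Spark) that groups encoded triples by partition. It assumes that a partition is the non-negative `hashCode` of the key modulo the number of partitions, in the spirit of Spark's `HashPartitioner`; the object name `HashPlacement` is hypothetical. Note that the Spark code on this page keys triple hashing by the numeric sum `s+p+o`, whereas this sketch hashes the whole `(s, p, o)` tuple.

```scala
// Sketch: where triples land under subject hashing vs triple hashing.
// Assumption: partition = non-negative hashCode of the key modulo n,
// in the spirit of Spark's HashPartitioner (this is not Spark's code).
object HashPlacement {
  type Triple = (Long, Long, Long) // (subject, property, object), dictionary-encoded

  // Non-negative modulo, so negative hash codes still map into 0 to n-1.
  def partitionOf(key: Any, n: Int): Int =
    java.lang.Math.floorMod(key.hashCode, n)

  // Subject hashing: all triples with the same subject share a partition.
  def subjectHashing(triples: Seq[Triple], n: Int): Map[Int, Seq[Triple]] =
    triples.groupBy { case (s, _, _) => partitionOf(s, n) }

  // Triple hashing: the whole triple is the key, so the triples of one
  // subject can end up in different partitions.
  def tripleHashing(triples: Seq[Triple], n: Int): Map[Int, Seq[Triple]] =
    triples.groupBy(t => partitionOf(t, n))
}
```

With subject hashing, a star-shaped pattern on a common subject (such as Query 6) can be joined inside a single partition; with triple hashing, the triples of one subject are generally spread over several partitions.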
  
  
// ...

val t1 = java.lang.System.currentTimeMillis();


/**
* Set inputData to the path of the data encoded as quadruples (see Datasets excerpts)
*/

// Loading and transforming the dataset
// ...
println("Loading time "+ (t2 - t1) +" msec for "+part+" partitions");

// Partitioning the dataset
/**
* Uncomment one of the following lines depending on whether hashing is applied
* to the subject only or to the entire triple
*/
// Subject hashing
// val triples = triples0.partitionBy(new HashPartitioner(part))
// Triple hashing: the key s+p+o is the sum of the three encoded IDs
// val triples = triples0.map{case(f,(s,p,o))=>((s+p+o),(s,p,o))}.partitionBy(new HashPartitioner(part))

//triples.persist
// ...
val takco : Long = 1115684864

    // -----------------------------------------------------------
// ...

println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions");

// -----------------------------------------------------------
// Highly selective on Wikidata: 8 results
// QR5: ?x <http://www.wikidata.org/entity/P131s> ?y.
//      ?y <http://www.wikidata.org/entity/P961v> ?z.
//      ?z <http://www.wikidata.org/entity/P704s> ?w.
// -----------------------------------------------------------
val qr5 = triples.filter({case(s,p,o)=>p==P131s}).map({case(s,p,o)=>(o,(s,p))}).
    join(triples.filter({case(s,p,o)=>p==P961v}).map({case(s,p,o)=>(s,(p,o))})).
    map({case(k,((s1,p1),(p2,o2)))=>(o2,s1)}).
    join(triples.filter({case(s,p,o)=>p==P704v}).map({case(s,p,o)=>(s,(p,o))}))
qr5.persist
qr5.collect

// Mid-selectivity star-shaped query: 10418 results
// QR6: ?x <http://www.wikidata.org/entity/P39v> ?y. ?x <http://www.wikidata.org/entity/P580q> ?z. ?x <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> ?w

// Each side of a join must be a (key, value) pair keyed by the subject ?x
val qr6 = triples.filter({case(s,p,o)=>p==P39v}).map({case(s,p,o)=>(s,null)}).
    join(triples.filter({case(s,p,o)=>p==P580q}).map({case(s,p,o)=>(s,null)})).
    join(triples.filter({case(s,p,o)=>p==rdftype}).map({case(s,p,o)=>(s,o)}))

qr6.persist
qr6.collect

</code>
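The query evaluation part above chains `filter`, key extraction and `join`. The same chain join can be sketched on local collections as a hash join: index the triples of the second pattern by subject, then probe that index with the objects of the first pattern. This is a minimal sketch for the LUBM #12 pattern (y worksFor z) (z subOrganizationOf t), not the page's Spark code; `ChainJoin` is a hypothetical name.

```scala
// Sketch: hash join for the chain pattern (y p1 z) (z p2 t) on local data.
object ChainJoin {
  type Triple = (Long, Long, Long) // (subject, property, object)

  def chain(triples: Seq[Triple], p1: Long, p2: Long): Set[(Long, Long, Long)] = {
    // Build side: p2-triples indexed by their subject z.
    val bySubject: Map[Long, Seq[Long]] =
      triples.collect { case (z, p, t) if p == p2 => (z, t) }
        .groupBy(_._1)
        .map { case (z, pairs) => z -> pairs.map(_._2) }
    // Probe side: each p1-triple (y, z) is combined with every (z, t).
    triples.collect { case (y, p, z) if p == p1 => (y, z) }
      .flatMap { case (y, z) => bySubject.getOrElse(z, Seq.empty[Long]).map(t => (y, z, t)) }
      .toSet
  }
}
```

For example, `ChainJoin.chain(data, 1136656384L, 1224736768L)` evaluates LUBM #12 with the worksFor and subOrganizationOf codes from the encoding table. In the Spark version, both sides are first shuffled by key and the matching then happens per partition.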
  

====Graph partitioning-based approaches====

===Huang Approach===
<code>
import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer

val folder = "lubm" //"watdiv"
val dataset = "univ" //"watdiv"
val scale = "1k"
val part = 20 // 10 or 20

val folderName = folder + scale
// Input: the quadruples written by the 2-hop based approach (suffix .2hop)
val fileName = dataset + scale + "_encoded_unique_quads.part." + part + ".2hop"

val t1 = java.lang.System.currentTimeMillis();

// Each line has the form (s,p,o,i): key each triple (s,p,o) by its partition id i
val quads_I_SPO = sc.textFile(s"/user/olivier/${folderName}/${fileName}").coalesce(part).map(x=>x.split(",")).map(t=>(t(3).replace(")","").toLong, (t(0).replace("(","").toLong,t(1).toLong,t(2).toLong)))

val quadsDist = quads_I_SPO.partitionBy(new HashPartitioner(part)).persist



val t2 = java.lang.System.currentTimeMillis();

println("Loading time of quads : "+(t2-t1)/1000 +" sec")

    val advisor : Long = 1233125376
    val worksFor : Long = 1136656384
    val suborg : Long = 1224736768
    val memof : Long = 1132462080
    val undeg : Long = 1101004800
    val teaof : Long = 1199570944
    val takco : Long = 1115684864


    // -----------------------------------------------------------
    // Query 1 : (not part of the benchmark)
    // Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

var pataws = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}).
    join(quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => (s,o)}),part).
    map({case (y,(x,z)) => (z,(x,y))}).
    join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}), part).
    map({case (z,((x,y),t)) => (x,y,z,t)})

pataws.count

// Remove duplicate answers: a triple replicated in several partitions can yield the same answer more than once
var pataws2 = pataws.distinct

var t2= java.lang.System.currentTimeMillis();

println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");


    // -----------------------------------------------------------
    // LUBM 2 : MSU
    // Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

//var pmemof = quadsDist.filter({case(i,(s,p,o)) => p==memof}).cache()

// Join on the pair (x,z); a string key such as x+""+z would be ambiguous (1 and 23 vs 12 and 3)
var patmsu = quadsDist.filter({case(i,(s,p,o)) => p==memof}).map({case(i,(s,p,o)) => (o,s)}).
             join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}),part).
             map({case (y,(x,z)) => ((x,z),(x,y,z))}).
             join(quadsDist.filter({case(i,(s,p,o)) => p==undeg}).map({case(i,(x,p,z))=> ((x,z),null)}))

patmsu.count

// Remove duplicate answers
var patmsu2 = patmsu.distinct

var t2= java.lang.System.currentTimeMillis();

println("Processing Q2 "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 9 : ATT
    // Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

var patatt = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}).
              join(quadsDist.filter({case(i,(s,p,o)) => p==teaof}).map({case(i,(s,p,o)) => (s,o)}), part).
              map({case (y,(x,z)) => ((x,z),(x,y,z))}).
              join(quadsDist.filter({case(i,(s,p,o)) => p==takco}).map({case(i,(s,p,o))=> ((s,o),null)}), part)

patatt.distinct.count

var t2= java.lang.System.currentTimeMillis();

println("Processing Q3 (LUBM #9) "+ (t2 - t1) +" msec for "+part+" partitions");


    // -----------------------------------------------------------
    // LUBM 12 : WS
    // Pattern: (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

val t1 = java.lang.System.currentTimeMillis();

// Local join: the key (i, term) only matches triples stored in the same partition i
val patws = quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => ((i,o),s)}).join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => ((i,s),o)}),part).map({case ((i,k),(s,o)) => (s,o)})

val ans_patws = patws.distinct.count

val t2= java.lang.System.currentTimeMillis();

println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions");
</code>
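Only the LUBM #12 code above keys its join on the pair (partition id, term), so both triples must come from the same partition; the other queries join on the term alone. The difference can be sketched on local collections, assuming quads `(s, p, o, i)` where `i` is the partition id: a chain that crosses partitions is found by the global join, but by the local join only once the missing triple has been copied into the partition, which the 2-hop replication on this page does for neighbouring triples. `LocalJoin` and its methods are hypothetical names.

```scala
// Sketch: global vs partition-local evaluation of (y p1 z) (z p2 t).
// Nested scans keep the example short; this is for illustration only.
object LocalJoin {
  type Quad = (Long, Long, Long, Int) // (subject, property, object, partition)

  // Global join: partition ids are ignored (what a shuffle on the term computes).
  def globalChain(q: Seq[Quad], p1: Long, p2: Long): Set[(Long, Long)] = {
    val right = q.collect { case (z, p, t, _) if p == p2 => (z, t) }
    q.collect { case (y, p, z, _) if p == p1 => (y, z) }
      .flatMap { case (y, z) => right.collect { case (`z`, t) => (y, t) } }
      .toSet
  }

  // Local join: both triples must be stored in the same partition i,
  // like the ((i, o), s) / ((i, s), o) keys of the LUBM #12 code.
  def localChain(q: Seq[Quad], p1: Long, p2: Long): Set[(Long, Long)] = {
    val right = q.collect { case (z, p, t, i) if p == p2 => ((i, z), t) }
    q.collect { case (y, p, z, i) if p == p1 => ((i, z), y) }
      .flatMap { case (key, y) => right.collect { case (`key`, t) => (y, t) } }
      .toSet
  }
}
```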

===WARP===
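In the code below, WARP replication picks, for each query, one triple pattern as the seed and copies the other triples of every match into the seed's partition; STEP 2 tries the first `nbCandidateSeed` patterns as seeds and keeps the smallest replica set. A minimal sketch of that selection on local collections, mirroring `getOutsideTriple` and `getReplicaForQuery` (the object `WarpSeed` is hypothetical):

```scala
// Sketch of the seed selection done by getOutsideTriple / getReplicaForQuery.
object WarpSeed {
  // One matched triple: (subject, object, partition). The property is given
  // by the position of the triple in the query pattern.
  type Match = (Long, Long, Int)

  // Triples of one query match stored outside the seed partition, tagged
  // with the seed partition they must be copied to.
  def outside(seed: Int, m: Seq[Match], props: Seq[Long]): Seq[(Long, Long, Long, Int)] =
    m.zip(props).collect { case ((s, o, f), p) if f != seed => (s, p, o, seed) }

  // For each candidate seed position i, collect the distinct replicas needed
  // by all matches and keep the smallest set (first one on ties).
  def bestReplicas(matches: Seq[Seq[Match]], props: Seq[Long],
                   nbCandidateSeed: Int): (Int, Set[(Long, Long, Long, Int)]) =
    (0 until nbCandidateSeed)
      .map { i => i -> matches.flatMap(m => outside(m(i)._3, m, props)).toSet }
      .minBy(_._2.size)
}
```

Like the Spark code, the sketch only compares replica counts per seed position; it does not try a different seed for each individual match.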
 +<​code>​ 
 +// Spark implementation of WARP replication 
 +// usage: run this code into the spark-shell 
 + 
 +import scala.collection.mutable.ListBuffer 
 +import org.apache.spark.rdd.RDD 
 +import scala.reflect.ClassTag 
 +import org.apache.spark.SparkContext 
 +import org.apache.spark.SparkContext._ 
 + 
 +import org.apache.spark.HashPartitioner 
 +import org.apache.spark.Partitioner 
 + 
 +import java.io.Serializable 
 + 
 + 
 +val folder= "​lubm"​ //"​watdiv"​  
 +val dataset= "​univ"​ //"​watdiv"//​ 
 +val scale="​1k"​ 
 + 
 +/* We have 15 cores per machine.  
 +   Each core is accessing a separate part in parallel. 
 +   The default parallelism = #machines * 15 cores 
 +   <​=> ​  #​machines = defaultParallelism / 15 
 +*/ 
 +val machine = sc.defaultParallelism /15 
 +val part =  sc.defaultParallelism ​  
 +val folderName = folder + scale 
 +val fileName = dataset + scale 
 +/** 
 +* set inputData with the path to the data encoded as quadruples (see Datasets excerpts) 
 +*/ 
 +val inputData = s"/​user/​olivier/​${folderName}/​${fileName}_encoded_unique_quads.part.${machine}"​ 
 + 
 +// Initial state, delete the storage 
 +sc.getPersistentRDDs.values.foreach(x => x.unpersist()) 
 +sc.getPersistentRDDs.values.size 
 + 
 +var t1 = java.lang.System.currentTimeMillis();​ 
 + 
 +def affiche(s: String): Unit={ 
 +println("####​ ------------------------------- ####"​) 
 +println(s) 
 +println("####​ ------------------------------- ####"​) 
 +
 + 
 + 
 +/* 
 +----------------------- 
 +STEP 1: read triples 
 +----------------------- 
 +*/ 
 + 
 +/** 
 +  Function ​ lireTriples reads the input data 
 +  returns a quad (subject, property, object, partID) 
 +  with partID = hash(subject) 
 +**/ 
 +def lireTriples(input:​ String): RDD[(Long, Long, Long, Int)] = { 
 +   ​return sc.textFile(input). 
 +    map(line => line.substring(1,​ line.length -1).split(","​)). 
 +    map(tab => (tab(0).toLong,​ tab(1).toLong,​ tab(2).toLong,​ (tab(0).toInt).hashCode%machine)) 
 +
 + 
 +val triples = lireTriples(inputData) 
 +triples.setName("​triples"​) 
 +triples.persist 
 + 
 +//nombre total de nuplets 
 +//​triples.count 
 + 
 +// stat: nb triplets par partition 
 +//​triples.map{ case(s,​p,​o,​f) => (f,​1)}.reduceByKey( (a,b) => a+b).collect.foreach(println) 
 + 
 + 
 + 
 +/* 
 +--------------------------------------------------------------------------- 
 +STEP 2: COMPUTE REPLICATION TRIPLES 
 + 
 +Computes the triple to replicate in every partition, for each query 
 +--------------------------------------------------------------------------- 
 +*/ 
 + 
 +/* 
 + ​Fonction filterProp : selects the triples a a given property 
 + */ 
 +def filterProp(input:​ RDD[(Long, Long, Long, Int)], prop: Long): RDD[(Long, Long, Int)] = { 
 +x  // on projette sur S et O car P est fixe 
 +  return input.filter(x => x._2 == prop).map{ case (s,p,o,f) => (s,o,f)} 
 +
 + 
 +def getOutsideTriple(seedF:​ Int, t: ListBuffer[(Long,​ Long, Int)], properties: Array[Long]):​ Seq[(Long, Long, Long, Int)] = { 
 +  var a = t.toArray 
 +  var res: ListBuffer[(Long,​ Long, Long, Int)] = ListBuffer() 
 + 
 +  for(i <-0 to (a.length - 1)) { 
 +    if( a(i)._3 != seedF) { 
 +      res.append( (a(i)._1, ​ properties(i),​ a(i)._2, ​ seedF)) 
 +    } 
 +  } 
 +  return res 
 +
 + 
 +def getReplicaForQuery(query:​ RDD[ListBuffer[(Long,​ Long, Int)]], properties: Array[Long],​ nbCandidateSeed:​ Int): RDD[(Long, Long, Long, Int)] = { 
 +  var replica = sc.parallelize(List((0L,​0L,​0L,​0))) 
 +  var min_nb = -1L 
 + 
 +  for (i <- 0 to (nbCandidateSeed-1)) { 
 +    var t1 = query.map(x => (x(i)._3, x)) 
 +    //​t1.take(5).foreach(println) 
 + 
 +    // lister les triplets a  repliquer = ceux qui ne sont pas dans la seed partition 
 +    var t2 = t1.flatMap{ case(seedF, tripleList) => getOutsideTriple(seedF,​ tripleList, properties)}.distinct 
 + 
 +    // count the triples to replicate 
 +    var nb = t2.count 
 +    if(min_nb == -1 || nb < min_nb){ 
 +      min_nb=nb 
 +      replica=t2 
 +      println("​current:"​) 
 +      println(s"​seed $i, replica count: $nb ") 
 +    } 
 +    else { 
 +      println(s"​-----ignore seed $i, replica count: $nb ") 
 +    } 
 +  } 
 +  return replica 
 +
 + 
 +// ------------------------------------------------------- 
 +//  Compute replication for the folowing QUERIES  
 +// ------------------------------------------------------- 
 + 
 +// based on dictionnary encoding 
 +val advisor : Long = 1233125376 
 +val worksFor : Long = 1136656384 
 +val suborg : Long = 1224736768 
 +val memof : Long = 1132462080 
 +val undeg : Long = 1101004800 
 +val teaof : Long = 1199570944 
 +val takco : Long = 1115684864 
 + 
 +// ----------------------------------------------------------- 
 +// Query 1 : (not part of the benchmark) 
 +// Pattern: (x advisor y) (y worksFor z) (z subOrganisation t) 
 +// ----------------------------------------------------------- 
 +val query1 = filterProp(triples,​ advisor).map{ case (x, y, f1) => ( y, (x, f1))}. 
 +  join(filterProp(triples,​ worksFor).map{ case (y, z, f2) => ( y, (z, f2) )}). 
 +  map{ case ( y, ((x,f1), (z, f2))) => (z, (x, f1, y, f2)) }. 
 +  join(filterProp(triples,​ suborg).map{ case ( z, t, f3) => (z, (t,​f3))}). 
 +  map{ case ( z, ( (x,​f1,​y,​f2),​ (t,f3) )) => ListBuffer( (x, y, f1), (y, z, f2) , (z, t, f3))} 
 + 
 +query1.setName("​q1"​) 
 +//​query1.persist() 
 +//​query1.count 
 +// resultat: 4108791 
 + 
 +var t1 = java.lang.System.currentTimeMillis();​ 
 + 
 +val replica1 =  getReplicaForQuery(query1,​ Array(advisor,​ worksFor, suborg), 3) 
 + 
 +var t2= java.lang.System.currentTimeMillis();​ 
 + 
 +println("​Processing Q1 "+ (t2 - t1) +" msec for "​+part+"​ partitions"​);​ 
 + 
 +// stat: nb triples a repliquer dans chaque partition 
 +//​replica1.map{ case(s,​p,​o,​f) => (f,​1)}.reduceByKey( (a,b) => a+b).collect.foreach(println) 
 +// (0,15551) (1,15776) (2,17639) (3,201275) (4,46068) 
 + 
 + 
 +// ----------------------------------------------------------- 
 +// LUBM 2 : MSU 
 +// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z) 
 +// ----------------------------------------------------------- 
 +val query2 = filterProp(triples,​ memof).map{ case (x, y, f1) => ( y, (x, f1))}. 
 +  join(filterProp(triples,​ suborg).map{ case (y, z, f2) => ( y, (z, f2) )}). 
 +  map{ case ( y, ((x,f1), (z,f2) )) => ((x, z), (y, f1, f2)) }. 
 +  join(filterProp(triples,​ undeg).map{ case (x, z, f3) => ( (x, z), f3)}). 
 +  map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x,​ y, f1), (y, z, f2), (x, z, f3)) } 
 + 
 +query2.setName("​q2"​) 
 +//​query2.persist() 
 +//​query2.count 
 + 
 +val replica2 =  getReplicaForQuery(query2,​ Array(memof,​ suborg, undeg), 2) 
 + 
 +//stat: nb triples a  repliquer dans chaque partition 
 +//​replica2.map{ case(s,​p,​o,​f) => (f,​1)}.reduceByKey( (a,b) => a+b).collect.foreach(println) 
 +//(0,1) (1,3) (3,4) (4,896) 
 + 
 +// ----------------------------------------------------------- 
 +// LUBM 9 : ATT 
 +// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z) 
 +// ----------------------------------------------------------- 
 +val query9 = filterProp(triples,​ advisor).map{ case (x, y, f1) => ( y,(x, f1))}. 
 +  join(filterProp(triples,​ teaof).map{ case (y, z, f2) => ( y, (z, f2) )}). 
 +  map{ case ( y, ((x, f1), (z, f2) )) => ((x, z), (y, f1, f2)) }. 
 +  join(filterProp(triples,​ takco).map{ case (x, z, f3) => ( (x, z), f3)}). 
 +  map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x,​ y, f1), (y, z, f2), (x, z, f3)) } 
 + 
 +query9.setName("​q9"​) 
 +//​query9.persist() 
 +query9.count 
 +// 272982 
 + 
 +val replica9 =  getReplicaForQuery(query9,​ Array(advisor,​ teaof, takco) ,2) 
 + 
 +// stat : nb triples à repliquer dans chaque partition 
 +//​replica9.map{ case(s,​p,​o,​f) => (f,​1)}.reduceByKey( (a,b) => a+b).collect.foreach(println) 
 +//(0,3131) (1,3213) (2,3504) (3,44706) (4,2014) 
 + 
 + 
 +// ----------------------------------------------------------- 
 +// LUBM 12 : WS 
 +// Pattern: (y worksFor z) (z subOrganisation t) 
 +// ----------------------------------------------------------- 
 +val query12 = filterProp(triples,​ worksFor).map{ case (y, z, f1) => ( z, (y, f1))}. 
 +  join(filterProp(triples,​ suborg).map{ case (z, t, f2) => ( z, (t, f2))}). 
 +  map{ case ( z, ((y, f1), (t, f2) )) => ListBuffer((y,​ z, f1), (z, t, f2)) } 
 + 
 +query12.setName("​q12"​) 
 +//​query12.persist() 
 +query12.count 
 +//720628 
 + 
 +val replica12 =  getReplicaForQuery(query12,​ Array(worksFor,​ suborg), 2) 
 + 
 +//stat : nb triples à repliquer dans chaque partition 
 +//​replica12.map{ case(s,​p,​o,​f) => (f,​1)}.reduceByKey( (a,b) => a+b).collect.foreach(println) 
 + 
 + 
 +/* 
 +--------------------------------------------------------------------------- 
 +STEP 3: UNION DES REPLICAS CALCULES POUR CHAQUE REQUETE  
 +--------------------------------------------------------------------------- 
 +*/ 
 + 
 +val allreplica = replica1.union(replica2).union(replica9).union(replica12).distinct 
 + 
 +val nbAjout = allreplica.count 
 +// 357 161 pour 5 part 
 +// 734 295 pour 10 part 
 +// 779 983 pour 20 part 
 + 
 +// replication rate 
 +affiche("​N ajout: ​ " + nbAjout.toDouble) 
 +affiche("​taux de replication:​ " + (nbAjout.toDouble / triples.count)) 
 + 
 +var t2= java.lang.System.currentTimeMillis();​ 
 +affiche("​Preparation time "+ (t2 - t1).toDouble/​1000 +" sec for " + part + " partitions"​);​ 
 + 
 +val oDist = triples.map{ case(s,​p,​o,​f) => (f,​1)}.reduceByKey( (a,b) => a+b) 
 +val oMean = oDist.map{case(f,​c)=>​c}.sum / machine 
 +val odevs = oDist.map{case(f,​c)=>​c}.map(score => (score - oMean) * (score - oMean)) 
 +val ostddev = Math.sqrt(odevs.sum / machine) 
 + 
 +val nDist = oDist.join(allreplica.map{ case(s,​p,​o,​f) => (f,​1)}.reduceByKey( (a,b) => a+b)).map{case(f,​(oc,​on))=>​(oc+on)} 
 +val nMean = nDist.sum / machine 
 +val ndevs = nDist.map(score => (score - nMean) * (score - nMean)) 
 +val nstddev = Math.sqrt(ndevs.sum / machine) 
 +  
 + 
 + 
 +/* 
 +--------------------------------------------------------------------------- 
 +STEP 4: LOCAL QUERY PROCESSING  
 +On the WARP replicated data 
 +--------------------------------------------------------------------------- 
 +*/ 
 + 
 +// Extends each partition with the WARP replicated triples. 
 +// Each machine stores one partition 
 +val triplesWARP = triples.union(allreplica). 
 +  map{ case (s,p,o,f) => (f, (s,​p,​o))}. 
 +  partitionBy(new HashPartitioner(machine)).map{ case (f, (s,p,o)) => (s,​p,​o,​f)} 
 + 
 +triplesWARP.setName("​triples WARP"​) 
 +triplesWARP.persist() 
 +triplesWARP.count 
 + 
 + 
 +var t2= java.lang.System.currentTimeMillis();​ 
 +affiche("​Preparation time "+ (t2 - t1).toDouble/​1000 +" sec for " + part + " partitions"​);​ 
 + 
 + 
 +// ------------------- 
 +//  Q1 LOCAL 
 +// ------------------- 
 +t1= java.lang.System.currentTimeMillis();​ 
 + 
 +// Query1 locale avec projection sur les variables (x,y,z,t) 
 +val localQuery1 = filterProp(triplesWARP,​ advisor).map{ case (x, y, f) => ( (y, f), x)}. 
 +  join(filterProp(triplesWARP,​ worksFor).map{ case (y, z, f) => ( (y, f), z )}). 
 +  map{ case ( (y, f), (x, z)) => ((z, f), (x, y)) }. 
 +  join(filterProp(triplesWARP,​ suborg).map{ case ( z, t, f) => ((z, f), t)}). 
 +  map{ case ( (z, f), ( (x,y), t)) => (x, y, z, t)} 
 + 
 +localQuery1.count 
 +//​4108791 ​ OK idem sans replication 
 + 
 +t2= java.lang.System.currentTimeMillis();​ 
 +affiche("​Duréee de la requête Q1 : "+ (t2 - t1) +" msec pour " + part + " partitions"​);​ 
 + 
 +// ----------------------------------------------------------- 
 +// Q2 LOCAL: 
 +// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z) 
 +// ----------------------------------------------------------- 
 +t1= java.lang.System.currentTimeMillis();​ 
 + 
 +val localQuery2 = filterProp(triplesWARP,​ memof).map{ case (x, y, f) => ( (y, f), x)}. 
 +  join(filterProp(triplesWARP,​ suborg).map{ case (y, z, f) => ( (y, f), z )}). 
 +  map{ case ( (y, f), (x, z)) => ((x, z, f), y) }. 
 +  join(filterProp(triplesWARP,​ undeg).map{ case (x, z, f) => ( (x, z, f), 1)}). 
 +  map{ case ((x, z, f), (y, 1)) => (x, y, z) } 
 + 
 +localQuery2.count 
 +//2528 
 +t2= java.lang.System.currentTimeMillis();​ 
 +affiche("​Duree de la requete Q2 : "+ (t2 - t1) +" msec pour " + part + " partitions"​);​ 
 + 
 + 
 +// ----------------------------------------------------------- 
 +// Q9 LOCAL 
 +// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z) 
 +// ----------------------------------------------------------- 
 +t1= java.lang.System.currentTimeMillis();​ 
 + 
 +val localQuery9 = filterProp(triplesWARP,​ advisor).map{ case (x, y, f) => ( (y, f), x)}. 
 +  join(filterProp(triplesWARP,​ teaof).map{ case (y, z, f) => ( (y, f), z )}). 
 +  map{ case ( (y, f), (x, z)) => ((x, z, f), y) }. 
 +  join(filterProp(triplesWARP,​ takco).map{ case (x, z, f) => ( (x, z, f), 1)}). 
 +  map{ case ((x, z, f), (y, 1)) => (x, y, z) } 
 + 
 +localQuery9.count 
 +//272982 
 + 
 +t2= java.lang.System.currentTimeMillis();​ 
 +affiche("​Duree de la requete Q9 : "+ (t2 - t1) +" msec pour " + part + " partitions"​);​ 
 + 
 + 
 +// ----------------------------------------------------------- 
 +// Q12 LOCAL 
 +// Pattern: (y worksFor z) (z subOrganisation t) 
 +// ----------------------------------------------------------- 
 +t1= java.lang.System.currentTimeMillis();​ 
 + 
 +val localQuery12 = filterProp(triplesWARP,​ worksFor).map{ case (y, z, f) => ( (z, f), y)}. 
 +  join(filterProp(triplesWARP,​ suborg).map{ case (z, t, f) => ( (z, f), t)}). 
 +  map{ case ( (z, f), (y, t)) => (y, z, t) } 
 + 
 +localQuery12.count 
 +//938356 
 + 
 +t2= java.lang.System.currentTimeMillis();​ 
 +affiche("​Duree de la requete Q12: "+ (t2 - t1) +" msec pour " + part + " partitions"​);​ 
  
-// Partitioning the dataset 
-val triples = triples0.partitionBy(new HashPartitioner(part)) ​ 
 </​code>​ </​code>​
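The load-balance statistics computed in STEP 3 (`oMean`, `ostddev`, `nMean`, `nstddev`) are the population mean and standard deviation of the number of triples per partition, before and after adding the replicas. A minimal local sketch of the same computation; `Balance` is a hypothetical name, and a partition that receives no replica simply keeps its original size:

```scala
// Sketch: load-balance statistics over per-partition triple counts.
object Balance {
  // Population mean and standard deviation (divide by the number of partitions).
  def meanStd(sizes: Seq[Double]): (Double, Double) = {
    val mean = sizes.sum / sizes.size
    val variance = sizes.map(x => (x - mean) * (x - mean)).sum / sizes.size
    (mean, math.sqrt(variance))
  }

  // Sizes after replication: original count plus replicas, ordered by partition id;
  // partitions missing from `replicas` add zero triples.
  def sizesAfter(original: Map[Int, Long], replicas: Map[Int, Long]): Seq[Double] =
    original.toSeq.sortBy(_._1).map { case (f, c) => (c + replicas.getOrElse(f, 0L)).toDouble }
}
```

Comparing `meanStd` on the sizes before and after replication shows whether WARP's replicas concentrate in a few partitions, as the per-partition counts noted for Q1 and Q9 suggest.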
===2-hop based approach===
<code>
val folder = "lubm"
val dataset = "univ"
val scale = "10k"

val folderName = folder + scale
val part = Array(5,10,20)

for (p <- part)
{
  val fileName = dataset+scale+"_encoded_unique_quads.part."+p
  // val fileName = "watdiv2k_encoded_unique_quads.partNew.5" // alternative WatDiv input
  val t1 = java.lang.System.currentTimeMillis();

  // Each line has the form (s,p,o,i): triple (s,p,o) stored in partition i
  val quads = sc.textFile(s"/user/olivier/${folderName}/${fileName}").map(x=>x.split(",")).map(t=>(t(0).replace("(","").toLong,t(1).toLong,t(2).toLong,t(3).replace(")","").toLong))

  // For each quad (s,p,o,i1) and each quad (o,p2,o2,i2) stored in another partition i2,
  // add (o,p2,o2) to partition i1: every partition is extended by one hop
  var addOneHop = quads.map({case(s,p,o,i)=>(o,i)}).join(quads.map({case(s,p,o,i)=>(s,(p,o,i))})).filter({case(termS,(i1,(p,o,i2)))=>i1!=i2}).distinct.map({case(termS,(i1,(p,o,i2)))=>(termS,p,o,i1)})

  val newQuads = quads.union(addOneHop).distinct
  val newQuadsSize = newQuads.count

  val t2 = java.lang.System.currentTimeMillis();
  val hopSize = addOneHop.count
  println(s"Time to compute one more hop on $folderName for $p partitions is ${t2-t1} msec")
  println(s"New size = $newQuadsSize, added $hopSize")
  newQuads.saveAsTextFile(s"/user/olivier/${folderName}/${fileName}.2hop")
}

</code>
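Each pass of the loop above extends every partition by one hop: a triple stored in another partition is copied into partition `i1` when its subject is the object of a triple of `i1`. A local sketch of `addOneHop` on plain collections (the object `OneHop` is hypothetical):

```scala
// Sketch of the one-hop extension computed by addOneHop.
object OneHop {
  type Quad = (Long, Long, Long, Long) // (subject, property, object, partition)

  // For every quad (s, p, o, i1) and every quad (o, p2, o2, i2) with i2 != i1,
  // produce (o, p2, o2, i1): a copy of the neighbouring triple in partition i1.
  def addOneHop(quads: Seq[Quad]): Set[Quad] = {
    val bySubject: Map[Long, Seq[Quad]] = quads.groupBy(_._1)
    quads.flatMap { case (_, _, o, i1) =>
      bySubject.getOrElse(o, Seq.empty[Quad]).collect {
        case (s2, p2, o2, i2) if i2 != i1 => (s2, p2, o2, i1)
      }
    }.toSet
  }

  // Original quads plus the copies, without duplicates.
  def expand(quads: Seq[Quad]): Set[Quad] = quads.toSet ++ addOneHop(quads)
}
```

Because both functions return sets, a copy that already exists is not counted twice, which corresponds to the `distinct` calls in the Spark code.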
====Datasets excerpts====
===Encoding of LUBM concepts and properties===

<code>
Properties:
0 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#officeNumber>
671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#name>
738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#title>
805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#age>
872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#telephone>
939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#emailAddress>
1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchInterest>
1082130432 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchProject>
1090519040 <http://www.univ-mlv.fr/~ocure/lubm.owl#hasAlumnus>
1098907648 <http://www.univ-mlv.fr/~ocure/lubm.owl#degreeFrom>
1101004800 <http://www.univ-mlv.fr/~ocure/lubm.owl#undergraduateDegreeFrom>
1103101952 <http://www.univ-mlv.fr/~ocure/lubm.owl#mastersDegreeFrom>
1105199104 <http://www.univ-mlv.fr/~ocure/lubm.owl#doctoralDegreeFrom>
1107296256 <http://www.univ-mlv.fr/~ocure/lubm.owl#orgPublication>
1115684864 <http://www.univ-mlv.fr/~ocure/lubm.owl#takesCourse>
1124073472 <http://www.univ-mlv.fr/~ocure/lubm.owl#member>
1132462080 <http://www.univ-mlv.fr/~ocure/lubm.owl#memberOf>
1136656384 <http://www.univ-mlv.fr/~ocure/lubm.owl#worksFor>
1138753536 <http://www.univ-mlv.fr/~ocure/lubm.owl#headOf>
1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#teachingAssistantOf>
1149239296 <http://www.univ-mlv.fr/~ocure/lubm.owl#listedCourse>
1157627904 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareDocumentation>
1166016512 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationAuthor>
1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareVersion>
1182793728 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliateOf>
1191182336 <http://www.univ-mlv.fr/~ocure/lubm.owl#tenured>
1199570944 <http://www.univ-mlv.fr/~ocure/lubm.owl#teacherOf>
1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationDate>
1216348160 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliatedOrganizationOf>
1224736768 <http://www.univ-mlv.fr/~ocure/lubm.owl#subOrganizationOf>
1233125376 <http://www.univ-mlv.fr/~ocure/lubm.owl#advisor>
1241513984 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationResearch>

Concepts:
0 <http://www.univ-mlv.fr/~ocure/lubm.owl#Schedule>
268435456 <http://www.univ-mlv.fr/~ocure/lubm.owl#Organization>
301989888 <http://www.univ-mlv.fr/~ocure/lubm.owl#College>
335544320 <http://www.univ-mlv.fr/~ocure/lubm.owl#Department>
369098752 <http://www.univ-mlv.fr/~ocure/lubm.owl#Institute>
402653184 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchGroup>
436207616 <http://www.univ-mlv.fr/~ocure/lubm.owl#Program>
469762048 <http://www.univ-mlv.fr/~ocure/lubm.owl#University>
536870912 <http://www.univ-mlv.fr/~ocure/lubm.owl#Publication>
570425344 <http://www.univ-mlv.fr/~ocure/lubm.owl#Software>
603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#Book>
637534208 <http://www.univ-mlv.fr/~ocure/lubm.owl#Specification>
671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#Manual>
704643072 <http://www.univ-mlv.fr/~ocure/lubm.owl#Article>
713031680 <http://www.univ-mlv.fr/~ocure/lubm.owl#TechnicalReport>
721420288 <http://www.univ-mlv.fr/~ocure/lubm.owl#ConferencePaper>
729808896 <http://www.univ-mlv.fr/~ocure/lubm.owl#JournalArticle>
738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#UnofficialPublication>
805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#Person>
872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#TeachingAssistant>
939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#Student>
956301312 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateStudent>
973078528 <http://www.univ-mlv.fr/~ocure/lubm.owl#UndergraduateStudent>
1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#Employee>
1015021568 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchAssistant>
1023410176 <http://www.univ-mlv.fr/~ocure/lubm.owl#Director>
1031798784 <http://www.univ-mlv.fr/~ocure/lubm.owl#AdministrativeStaff>
1033895936 <http://www.univ-mlv.fr/~ocure/lubm.owl#SystemsStaff>
1035993088 <http://www.univ-mlv.fr/~ocure/lubm.owl#ClericalStaff>
1040187392 <http://www.univ-mlv.fr/~ocure/lubm.owl#Faculty>
1042284544 <http://www.univ-mlv.fr/~ocure/lubm.owl#PostDoc>
1044381696 <http://www.univ-mlv.fr/~ocure/lubm.owl#Professor>
1044643840 <http://www.univ-mlv.fr/~ocure/lubm.owl#Chair>
1044905984 <http://www.univ-mlv.fr/~ocure/lubm.owl#VisitingProfessor>
1045168128 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssociateProfessor>
1045430272 <http://www.univ-mlv.fr/~ocure/lubm.owl#Dean>
1045692416 <http://www.univ-mlv.fr/~ocure/lubm.owl#FullProfessor>
1045954560 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssistantProfessor>
1046478848 <http://www.univ-mlv.fr/~ocure/lubm.owl#Lecturer>
1073741824 <http://www.univ-mlv.fr/~ocure/lubm.owl#Work>
1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#Course>
1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateCourse>
1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#Research>
</code>
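Property codes and concept codes come from two separate lists and overlap (for example, 0 encodes both `rdf:type` and `Schedule`, and 603979776 both `officeNumber` and `Book`), so a decoder needs one map per list. A minimal sketch of the property side, with a few entries copied from the table above; `LubmDictionary` and `parseLine` are hypothetical names:

```scala
// Sketch: decoding LUBM property codes with a map built from the table above.
object LubmDictionary {
  // A few property codes copied from the "Properties" list.
  val properties: Map[Long, String] = Map(
    0L -> "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
    1115684864L -> "http://www.univ-mlv.fr/~ocure/lubm.owl#takesCourse",
    1132462080L -> "http://www.univ-mlv.fr/~ocure/lubm.owl#memberOf",
    1136656384L -> "http://www.univ-mlv.fr/~ocure/lubm.owl#worksFor",
    1224736768L -> "http://www.univ-mlv.fr/~ocure/lubm.owl#subOrganizationOf",
    1233125376L -> "http://www.univ-mlv.fr/~ocure/lubm.owl#advisor"
  )

  // Reverse map, used to find the code of a property URI.
  val byUri: Map[String, Long] = properties.map(_.swap)

  // Parses one "<code> <URI>" line of the table.
  def parseLine(line: String): (Long, String) =
    line.trim.split("\\s+", 2) match {
      case Array(code, uri) => (code.toLong, uri.stripPrefix("<").stripSuffix(">"))
      case _ => throw new IllegalArgumentException(s"Not a '<code> <URI>' line: $line")
    }
}
```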

===LUBM Univ1===
-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/univ1_encoded_unique.id|encoded triples]] (2.1MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/quads_plus_replicas.id|encoded quadruples with replication]] (2.3MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/quads.id|replicated quadruples]] (0.3MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/univ1.nt|univ1.nt]] (16.3MB)
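The Spark code on this page reads quadruples stored as `(s,p,o,i)` text lines, which is also the format `saveAsTextFile` produces for a Scala `Tuple4`. A minimal parsing sketch that rejects malformed lines instead of throwing, assuming that format (the downloadable files above may use a different layout; `QuadParser` is a hypothetical name):

```scala
// Sketch: parsing one "(s,p,o,i)" line into an encoded quadruple.
object QuadParser {
  // Returns None when the line does not contain exactly four integer fields.
  def parseQuad(line: String): Option[(Long, Long, Long, Long)] = {
    val body = line.trim.stripPrefix("(").stripSuffix(")")
    body.split(",").map(_.trim) match {
      case Array(s, p, o, i) =>
        scala.util.Try((s.toLong, p.toLong, o.toLong, i.toLong)).toOption
      case _ => None
    }
  }
}
```

In Spark, the same function can be used as `sc.textFile(path).flatMap(QuadParser.parseQuad)`, so that malformed lines are skipped rather than failing the job.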