{{indexmenu_n>1}}

===== RDFdist : RDF distribution approaches using Spark =====
  
This wiki page provides information about the experiments on RDF distribution approaches using [[http://spark.apache.org/|Spark]].
This information consists of i) the query workload, ii) the source code for both data preparation and query evaluation, and iii) the description of the two datasets used in the experiments.
For the sake of reproducibility, each piece of source code is provided as a script which can be executed directly in the Spark shell.
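
For instance, a script can be loaded into an interactive session as follows (the script name is illustrative):
<code>
$ spark-shell
scala> :load hash_partitioning.scala
</code>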
=====Query workload=====

We picked three queries from the [[http://swat.cse.lehigh.edu/projects/lubm/|LUBM]] benchmark, namely #2, #9 and #12, which are referred to as Query 2, Query 3 and Query 4, respectively.
We have created an additional one, referred to as Query 1.
We also created two queries for the [[https://www.wikidata.org/wiki/Wikidata:Main_Page|Wikidata]] dataset, which are referred to as Query 5 and Query 6.
  
===Query 1 (synthetic, LUBM)===
<code>
SELECT ?x ?y ?z
WHERE
{?x lubm:advisor ?y.
 ?y lubm:worksFor ?z.
 ?z lubm:subOrganisation ?t
}
</code>
  
===Query 2 (#2 of LUBM)===
<code>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#>
SELECT ?X ?Y ?Z
WHERE
{?X rdf:type ub:GraduateStudent .
  ?Y rdf:type ub:University .
  ?Z rdf:type ub:Department .
  ?X ub:memberOf ?Z .
  ?Z ub:subOrganizationOf ?Y .
  ?X ub:undergraduateDegreeFrom ?Y}
</code>

===Query 3 (#9 of LUBM)===
<code>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#>
SELECT ?X ?Y ?Z
WHERE
{?X rdf:type ub:Student .
  ?Y rdf:type ub:Faculty .
  ?Z rdf:type ub:Course .
  ?X ub:advisor ?Y .
  ?Y ub:teacherOf ?Z .
  ?X ub:takesCourse ?Z}
</code>

===Query 4 (#12 of LUBM)===
<code>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#>
SELECT ?X ?Y
WHERE
{?X rdf:type ub:Chair .
  ?Y rdf:type ub:Department .
  ?X ub:worksFor ?Y .
  ?Y ub:subOrganizationOf <http://www.University0.edu>}
</code>

===Query 5 (synthetic, Wikidata)===
<code>
SELECT ?x ?y ?z
WHERE
{?x entity:P131s ?y.
?y entity:P961v ?z.
?z entity:P704s ?w
}
</code>

===Query 6 (synthetic, Wikidata)===
<code>
SELECT ?x ?y ?z
WHERE
{?x entity:P39v ?y.
?x entity:P580q ?z.
?x rdf:type ?w
}
</code>

=====Source Code=====

====Hash-based approaches====
The following code corresponds to the triple hashing and subject hashing approaches for the LUBM datasets.
It consists of a data preparation part followed by a query evaluation part.
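
The script below is reproduced only in part. As a point of reference, here is a minimal sketch of the loading step, assuming quadruples printed as "(s,p,o,part)" (see the Datasets excerpts below); the input path and the choice of the subject as the key of triples0 are illustrative assumptions:
<code>
// minimal sketch (not the original script): load encoded quads and key each
// triple by its subject; the path and the key choice are assumptions
val inputData = "/user/..."   // set to the encoded quadruple file
val triples0 = sc.textFile(inputData).
  map(line => line.substring(1, line.length - 1).split(",")).
  map(t => (t(0).toLong, (t(0).toLong, t(1).toLong, t(2).toLong)))  // (subject, (s,p,o))
</code>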
  
  
<code>
// ...
  
val t1 = java.lang.System.currentTimeMillis();


/**
* set inputData with the path to the data encoded as quadruples (see Datasets excerpts)
*/

// loading and transforming the dataset
// ...
 println("​Loading time "+ (t2 - t1) +" msec for "​+part+"​ partitions"​);​ println("​Loading time "+ (t2 - t1) +" msec for "​+part+"​ partitions"​);​
  
// Partitioning the dataset
/**
* Uncomment one of the following lines depending on whether hashing is applied
* on the entire triple or only on the subject
*/
// hashing on the subject (the key of triples0):
// val triples = triples0.partitionBy(new HashPartitioner(part))
// hashing on the entire triple (key = concatenation s+p+o):
// val triples = triples0.map{case(f,(s,p,o))=>((s+p+o),(s,p,o))}.partitionBy(new HashPartitioner(part))
  
//triples.persist
// ...
val takco : Long = 1115684864
  
  
    // -----------------------------------------------------------
// ...
  
 println("​Processing LUBM #12 "+ (t2 - t1) +" msec for "​+part+"​ partitions"​);​ println("​Processing LUBM #12 "+ (t2 - t1) +" msec for "​+part+"​ partitions"​);​

// -----------------------------------------------------------
// highly selective on wikidata :: 8 results
// QR5: ?x <http://www.wikidata.org/entity/P131s> ?y.
//      ?y <http://www.wikidata.org/entity/P961v> ?z.
//      ?z <http://www.wikidata.org/entity/P704s> ?w.
// -----------------------------------------------------------
val qr5 = triples.filter({case(s,p,o)=>p==P131s}).map({case(s,p,o)=>(o,(s,p))}).
    join(triples.filter({case(s,p,o)=>p==P961v}).map({case(s,p,o)=>(s,(p,o))})).
    map({case(k,((s1,p1),(p2,o2)))=>(o2,s1)}).
    join(triples.filter({case(s,p,o)=>p==P704s}).map({case(s,p,o)=>(s,(p,o))}))
qr5.persist
qr5.collect

// mid selectivity star-shaped query : 10418 results
// QR6: ?x <http://www.wikidata.org/entity/P39v> ?y. ?x <http://www.wikidata.org/entity/P580q> ?z. ?x <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> ?w

val qr6 = triples.filter({case(s,p,o)=>p==P39v}).map({case(s,p,o)=>(s,null)}).
    join(triples.filter({case(s,p,o)=>p==P580q}).map({case(s,p,o)=>(s,null)})).
    join(triples.filter({case(s,p,o)=>p==rdftype}).map({case(s,p,o)=>(s,(p,o))}))

qr6.persist
qr6.collect

</code>
  
====Graph partitioning-based approaches====
Three variants follow: the Huang approach, WARP, and a 2-hop based data preparation.

===Huang Approach===
<code>
import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer

val folder = "lubm"   //"watdiv"
val dataset = "univ"  //"watdiv"
val scale = "1k"
val part = 20         //10, 20
  
val folderName = folder + scale
val fileName = dataset+scale+"_encoded_unique_quads.part."+part+".2hop"

val t1 = java.lang.System.currentTimeMillis();

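// parse lines of the form "(s,p,o,part)" into pairs (part,(s,p,o)),
// i.e., the triples keyed by their precomputed 2-hop partition id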
val quads_I_SPO = sc.textFile(s"/user/olivier/${folderName}/${fileName}").coalesce(part).map(x=>x.split(",")).map(t=>(t(3).replace(")","").toLong, (t(0).replace("(","").toLong,t(1).toLong,t(2).toLong)))

val quadsDist = quads_I_SPO.partitionBy(new HashPartitioner(part)).persist

val t2 = java.lang.System.currentTimeMillis();

print("Loading time of quads : "+(t2-t1)/1000 +" sec")

    val advisor : Long = 1233125376
    val worksFor : Long = 1136656384
    val suborg : Long = 1224736768
    val memof : Long = 1132462080
    val undeg : Long = 1101004800
    val teaof : Long = 1199570944
    val takco : Long = 1115684864


    // -----------------------------------------------------------
    // Query 1 : (not part of the benchmark)
    // Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

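// The three patterns of Query 1 are evaluated as two successive joins:
// (advisor join worksFor) on y, then a join with subOrganisation on z;
// each filter keeps the triples of one predicate, re-keyed on the join variable.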
var pataws = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}).
    join(quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => (s,o)}),part).
    map({case (y,(x,z)) => (z,(x,y))}).
    join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}), part).
    map({case (z,((x,y),t)) => (x,y,z,t)})

pataws.count

var pataws2 = pataws.flatMap{case (x,y,z,t) => Seq(x,y,z,t)}.distinct

var t2= java.lang.System.currentTimeMillis();

println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 2 : MSU
    // Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

//var pmemof = quadsDist.filter({case(i,(s,p,o)) => p==memof}).cache()

var patmsu = quadsDist.filter({case(i,(s,p,o)) => p==memof}).map({case(i,(s,p,o)) => (o,s)}).
             join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}),part).
             map({case (y,(x,z)) => (x+""+z,(x,y,z))}).
             join(quadsDist.filter({case(i,(s,p,o)) => p==undeg}).map({case(i,(x,p,z))=> (x+""+z,null)}))

patmsu.count

var patmsu2 = patmsu.map({case (k,((x,y,z),_)) => (x,y,z)}).distinct

var t2= java.lang.System.currentTimeMillis();

println("Processing Q2 "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 9 : ATT
    // Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

var patatt = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}).
              join(quadsDist.filter({case(i,(s,p,o)) => p==teaof}).map({case(i,(s,p,o)) => (s,o)}), part).
              map({case (y,(x,z)) => (x+""+z,(x,y,z))}).
              join(quadsDist.filter({case(i,(s,p,o)) => p==takco}).map({case(i,(s,p,o))=> (s+""+o,null)}), part)

patatt.distinct.count

var t2= java.lang.System.currentTimeMillis();

println("Processing Q3 (LUBM #9) "+ (t2 - t1) +" msec for "+part+" partitions");


    // -----------------------------------------------------------
    // LUBM 12 : WS
    // Pattern: (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

val t1 = java.lang.System.currentTimeMillis();

val patws = quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => ((i,o),s)}).join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => ((i,s),o)}),part).map({case ((i,k),(s,o)) => (s,o)})

val ans_patws = patws.distinct.count

val t2= java.lang.System.currentTimeMillis();

println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions");
</code>

===Warp===
<code>
// Spark implementation of WARP replication
// usage: run this code in the spark-shell

import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.spark.HashPartitioner
import org.apache.spark.Partitioner

import java.io.Serializable


val folder= "lubm" //"watdiv"
val dataset= "univ" //"watdiv"
val scale="1k"

/* We have 15 cores per machine.
   Each core is accessing a separate part in parallel.
   The default parallelism = #machines * 15 cores
   <=>  #machines = defaultParallelism / 15
*/
val machine = sc.defaultParallelism / 15
val part = sc.defaultParallelism
val folderName = folder + scale
val fileName = dataset + scale
/**
* set inputData with the path to the data encoded as quadruples (see Datasets excerpts)
*/
val inputData = s"/user/olivier/${folderName}/${fileName}_encoded_unique_quads.part.${machine}"

// Initial state: unpersist all cached RDDs
sc.getPersistentRDDs.values.foreach(x => x.unpersist())
sc.getPersistentRDDs.values.size

var t1 = java.lang.System.currentTimeMillis();

def affiche(s: String): Unit={
println("#### ------------------------------- ####")
println(s)
println("#### ------------------------------- ####")
}


/*
-----------------------
STEP 1: read triples
-----------------------
*/

/**
  Function lireTriples reads the input data and
  returns quads (subject, property, object, partID)
  with partID = hash(subject)
**/
def lireTriples(input: String): RDD[(Long, Long, Long, Int)] = {
   return sc.textFile(input).
    map(line => line.substring(1, line.length -1).split(",")).
    map(tab => (tab(0).toLong, tab(1).toLong, tab(2).toLong, (tab(0).toInt).hashCode%machine))
}

val triples = lireTriples(inputData)
triples.setName("triples")
triples.persist

// total number of tuples
//triples.count

// stat: number of triples per partition
//triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)



/*
---------------------------------------------------------------------------
STEP 2: COMPUTE REPLICATION TRIPLES

Computes the triples to replicate in every partition, for each query
---------------------------------------------------------------------------
*/

/*
 Function filterProp: selects the triples with a given property
 */
def filterProp(input: RDD[(Long, Long, Long, Int)], prop: Long): RDD[(Long, Long, Int)] = {
  // project on S and O since P is fixed
  return input.filter(x => x._2 == prop).map{ case (s,p,o,f) => (s,o,f)}
}

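/*
 getOutsideTriple: given the seed partition seedF of one query match and the list t
 of its (subject, object, partition) bindings (one per triple pattern), returns the
 triples lying outside the seed partition, re-tagged with seedF, i.e., the triples
 that must be replicated into the seed partition.
*/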
def getOutsideTriple(seedF: Int, t: ListBuffer[(Long, Long, Int)], properties: Array[Long]): Seq[(Long, Long, Long, Int)] = {
  var a = t.toArray
  var res: ListBuffer[(Long, Long, Long, Int)] = ListBuffer()

  for(i <-0 to (a.length - 1)) {
    if( a(i)._3 != seedF) {
      res.append( (a(i)._1, properties(i), a(i)._2, seedF))
    }
  }
  return res
}

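/*
 getReplicaForQuery performs the WARP-style seed selection: each of the first
 nbCandidateSeed triple patterns is tried as the seed (every match is assigned
 to the partition holding that pattern's triple), and the seed requiring the
 smallest number of replicated triples is kept.
*/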
def getReplicaForQuery(query: RDD[ListBuffer[(Long, Long, Int)]], properties: Array[Long], nbCandidateSeed: Int): RDD[(Long, Long, Long, Int)] = {
  var replica = sc.parallelize(List((0L,0L,0L,0)))
  var min_nb = -1L

  for (i <- 0 to (nbCandidateSeed-1)) {
    var t1 = query.map(x => (x(i)._3, x))
    //t1.take(5).foreach(println)

    // list the triples to replicate = those outside the seed partition
    var t2 = t1.flatMap{ case(seedF, tripleList) => getOutsideTriple(seedF, tripleList, properties)}.distinct

    // count the triples to replicate
    var nb = t2.count
    if(min_nb == -1 || nb < min_nb){
      min_nb=nb
      replica=t2
      println("current:")
      println(s"seed $i, replica count: $nb ")
    }
    else {
      println(s"-----ignore seed $i, replica count: $nb ")
    }
  }
  return replica
}

// -------------------------------------------------------
//  Compute replication for the following QUERIES
// -------------------------------------------------------

// based on dictionary encoding
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864

// -----------------------------------------------------------
// Query 1 : (not part of the benchmark)
// Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
val query1 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y, (x, f1))}.
  join(filterProp(triples, worksFor).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x,f1), (z, f2))) => (z, (x, f1, y, f2)) }.
  join(filterProp(triples, suborg).map{ case ( z, t, f3) => (z, (t,f3))}).
  map{ case ( z, ( (x,f1,y,f2), (t,f3) )) => ListBuffer( (x, y, f1), (y, z, f2) , (z, t, f3))}

query1.setName("q1")
//query1.persist()
//query1.count
// result: 4108791

var t1 = java.lang.System.currentTimeMillis();

val replica1 = getReplicaForQuery(query1, Array(advisor, worksFor, suborg), 3)

var t2= java.lang.System.currentTimeMillis();

println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");

// stat: number of triples to replicate in each partition
//replica1.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
// (0,15551) (1,15776) (2,17639) (3,201275) (4,46068)

// -----------------------------------------------------------
// LUBM 2 : MSU
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
val query2 = filterProp(triples, memof).map{ case (x, y, f1) => ( y, (x, f1))}.
  join(filterProp(triples, suborg).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x,f1), (z,f2) )) => ((x, z), (y, f1, f2)) }.
  join(filterProp(triples, undeg).map{ case (x, z, f3) => ( (x, z), f3)}).
  map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) }

query2.setName("q2")
//query2.persist()
//query2.count

val replica2 = getReplicaForQuery(query2, Array(memof, suborg, undeg), 2)

// stat: number of triples to replicate in each partition
//replica2.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
//(0,1) (1,3) (3,4) (4,896)

// -----------------------------------------------------------
// LUBM 9 : ATT
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
val query9 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y,(x, f1))}.
  join(filterProp(triples, teaof).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x, f1), (z, f2) )) => ((x, z), (y, f1, f2)) }.
  join(filterProp(triples, takco).map{ case (x, z, f3) => ( (x, z), f3)}).
  map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) }

query9.setName("q9")
//query9.persist()
query9.count
// 272982

val replica9 = getReplicaForQuery(query9, Array(advisor, teaof, takco), 2)

// stat: number of triples to replicate in each partition
//replica9.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
//(0,3131) (1,3213) (2,3504) (3,44706) (4,2014)


// -----------------------------------------------------------
// LUBM 12 : WS
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
val query12 = filterProp(triples, worksFor).map{ case (y, z, f1) => ( z, (y, f1))}.
  join(filterProp(triples, suborg).map{ case (z, t, f2) => ( z, (t, f2))}).
  map{ case ( z, ((y, f1), (t, f2) )) => ListBuffer((y, z, f1), (z, t, f2)) }

query12.setName("q12")
//query12.persist()
query12.count
//720628

val replica12 = getReplicaForQuery(query12, Array(worksFor, suborg), 2)

// stat: number of triples to replicate in each partition
//replica12.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)


/*
---------------------------------------------------------------------------
STEP 3: UNION OF THE REPLICAS COMPUTED FOR EACH QUERY
---------------------------------------------------------------------------
*/

val allreplica = replica1.union(replica2).union(replica9).union(replica12).distinct

val nbAjout = allreplica.count
// 357 161 for 5 partitions
// 734 295 for 10 partitions
// 779 983 for 20 partitions

// replication rate
affiche("number of added triples: " + nbAjout.toDouble)
affiche("replication rate: " + (nbAjout.toDouble / triples.count))

var t2= java.lang.System.currentTimeMillis();
affiche("Preparation time "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions");

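// Load-balancing statistics: mean and standard deviation of the number of
// triples per partition, before (oMean, ostddev) and after (nMean, nstddev)
// adding the replicated triples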
val oDist = triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b)
val oMean = oDist.map{case(f,c)=>c}.sum / machine
val odevs = oDist.map{case(f,c)=>c}.map(score => (score - oMean) * (score - oMean))
val ostddev = Math.sqrt(odevs.sum / machine)

val nDist = oDist.join(allreplica.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b)).map{case(f,(oc,on))=>(oc+on)}
val nMean = nDist.sum / machine
val ndevs = nDist.map(score => (score - nMean) * (score - nMean))
val nstddev = Math.sqrt(ndevs.sum / machine)


/*
---------------------------------------------------------------------------
STEP 4: LOCAL QUERY PROCESSING
On the WARP replicated data
---------------------------------------------------------------------------
*/

// Extends each partition with the WARP replicated triples.
// Each machine stores one partition.
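// Since every match is now fully contained in one partition, the local queries
// below include the partition id f in each join key, so the joins are evaluated
// within partitions.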
val triplesWARP = triples.union(allreplica).
  map{ case (s,p,o,f) => (f, (s,p,o))}.
  partitionBy(new HashPartitioner(machine)).map{ case (f, (s,p,o)) => (s,p,o,f)}

triplesWARP.setName("triples WARP")
triplesWARP.persist()
triplesWARP.count


var t2= java.lang.System.currentTimeMillis();
affiche("Preparation time "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions");


// -------------------
//  Q1 LOCAL
// -------------------
t1= java.lang.System.currentTimeMillis();

// local Query1 with projection on the variables (x,y,z,t)
val localQuery1 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((z, f), (x, y)) }.
  join(filterProp(triplesWARP, suborg).map{ case ( z, t, f) => ((z, f), t)}).
  map{ case ( (z, f), ( (x,y), t)) => (x, y, z, t)}

localQuery1.count
// 4108791, same result as without replication

t2= java.lang.System.currentTimeMillis();
affiche("Q1 query time: "+ (t2 - t1) +" msec for " + part + " partitions");

// -----------------------------------------------------------
// Q2 LOCAL:
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery2 = filterProp(triplesWARP, memof).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, suborg).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((x, z, f), y) }.
  join(filterProp(triplesWARP, undeg).map{ case (x, z, f) => ( (x, z, f), 1)}).
  map{ case ((x, z, f), (y, 1)) => (x, y, z) }

localQuery2.count
//2528
t2= java.lang.System.currentTimeMillis();
affiche("Q2 query time: "+ (t2 - t1) +" msec for " + part + " partitions");


// -----------------------------------------------------------
// Q9 LOCAL
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery9 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, teaof).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((x, z, f), y) }.
  join(filterProp(triplesWARP, takco).map{ case (x, z, f) => ( (x, z, f), 1)}).
  map{ case ((x, z, f), (y, 1)) => (x, y, z) }

localQuery9.count
//272982

t2= java.lang.System.currentTimeMillis();
affiche("Q9 query time: "+ (t2 - t1) +" msec for " + part + " partitions");


// -----------------------------------------------------------
// Q12 LOCAL
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery12 = filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (z, f), y)}.
  join(filterProp(triplesWARP, suborg).map{ case (z, t, f) => ( (z, f), t)}).
  map{ case ( (z, f), (y, t)) => (y, z, t) }

localQuery12.count
//938356

t2= java.lang.System.currentTimeMillis();
affiche("Q12 query time: "+ (t2 - t1) +" msec for " + part + " partitions");


</code>
===2-hop based approach===
<code>
val folder= "lubm"
val dataset= "univ"
val scale="10k"

val folderName = folder +scale
val part = Array(5,10,20)

for (p <- part)
{
val fileName = dataset+scale+"_encoded_unique_quads.part."+p
val t1 = java.lang.System.currentTimeMillis();

val quads = sc.textFile(s"/user/olivier/${folderName}/${fileName}").map(x=>x.split(",")).map(t=>(t(0).replace("(","").toLong,t(1).toLong,t(2).toLong,t(3).replace(")","").toLong))

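// addOneHop replicates border triples: when the subject of a triple (s,p,o)
// equals the object of a triple located in a different partition i1, a copy
// (s,p,o,i1) is added, extending each partition by one hop.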
var addOneHop = quads.map({case(s,p,o,i)=>(o,i)}).join(quads.map({case(s,p,o,i)=>(s,(p,o,i))})).filter({case(termS,(i1,(p,o,i2)))=>i1!=i2}).distinct.map({case(termS,(i1,(p,o,i2)))=>(termS,p,o,i1)})

val newQuads = quads.union(addOneHop).distinct
val newQuadsSize = newQuads.count

val t2 = java.lang.System.currentTimeMillis();
val hopSize = addOneHop.count
println(s"Time to compute one more hop on $folderName for $p partitions is ${t2-t1}")
println(s" new size = $newQuadsSize , added $hopSize")
newQuads.saveAsTextFile(s"/user/olivier/${folderName}/${fileName}.2hop")
}
</code>
====Datasets excerpts====
===Encoding of LUBM concepts and properties===

<code>
Properties:
0 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#officeNumber>
671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#name>
738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#title>
805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#age>
872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#telephone>
939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#emailAddress>
1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchInterest>
1082130432 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchProject>
1090519040 <http://www.univ-mlv.fr/~ocure/lubm.owl#hasAlumnus>
1098907648 <http://www.univ-mlv.fr/~ocure/lubm.owl#degreeFrom>
1101004800 <http://www.univ-mlv.fr/~ocure/lubm.owl#undergraduateDegreeFrom>
1103101952 <http://www.univ-mlv.fr/~ocure/lubm.owl#mastersDegreeFrom>
1105199104 <http://www.univ-mlv.fr/~ocure/lubm.owl#doctoralDegreeFrom>
1107296256 <http://www.univ-mlv.fr/~ocure/lubm.owl#orgPublication>
1115684864 <http://www.univ-mlv.fr/~ocure/lubm.owl#takesCourse>
1124073472 <http://www.univ-mlv.fr/~ocure/lubm.owl#member>
1132462080 <http://www.univ-mlv.fr/~ocure/lubm.owl#memberOf>
1136656384 <http://www.univ-mlv.fr/~ocure/lubm.owl#worksFor>
1138753536 <http://www.univ-mlv.fr/~ocure/lubm.owl#headOf>
1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#teachingAssistantOf>
1149239296 <http://www.univ-mlv.fr/~ocure/lubm.owl#listedCourse>
1157627904 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareDocumentation>
1166016512 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationAuthor>
1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareVersion>
1182793728 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliateOf>
1191182336 <http://www.univ-mlv.fr/~ocure/lubm.owl#tenured>
1199570944 <http://www.univ-mlv.fr/~ocure/lubm.owl#teacherOf>
1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationDate>
1216348160 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliatedOrganizationOf>
1224736768 <http://www.univ-mlv.fr/~ocure/lubm.owl#subOrganizationOf>
1233125376 <http://www.univ-mlv.fr/~ocure/lubm.owl#advisor>
1241513984 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationResearch>

Concepts:
0 <http://www.univ-mlv.fr/~ocure/lubm.owl#Schedule>
268435456 <http://www.univ-mlv.fr/~ocure/lubm.owl#Organization>
301989888 <http://www.univ-mlv.fr/~ocure/lubm.owl#College>
335544320 <http://www.univ-mlv.fr/~ocure/lubm.owl#Department>
369098752 <http://www.univ-mlv.fr/~ocure/lubm.owl#Institute>
402653184 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchGroup>
436207616 <http://www.univ-mlv.fr/~ocure/lubm.owl#Program>
469762048 <http://www.univ-mlv.fr/~ocure/lubm.owl#University>
536870912 <http://www.univ-mlv.fr/~ocure/lubm.owl#Publication>
570425344 <http://www.univ-mlv.fr/~ocure/lubm.owl#Software>
603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#Book>
637534208 <http://www.univ-mlv.fr/~ocure/lubm.owl#Specification>
671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#Manual>
704643072 <http://www.univ-mlv.fr/~ocure/lubm.owl#Article>
713031680 <http://www.univ-mlv.fr/~ocure/lubm.owl#TechnicalReport>
721420288 <http://www.univ-mlv.fr/~ocure/lubm.owl#ConferencePaper>
729808896 <http://www.univ-mlv.fr/~ocure/lubm.owl#JournalArticle>
738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#UnofficialPublication>
805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#Person>
872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#TeachingAssistant>
939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#Student>
956301312 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateStudent>
973078528 <http://www.univ-mlv.fr/~ocure/lubm.owl#UndergraduateStudent>
1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#Employee>
1015021568 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchAssistant>
1023410176 <http://www.univ-mlv.fr/~ocure/lubm.owl#Director>
1031798784 <http://www.univ-mlv.fr/~ocure/lubm.owl#AdministrativeStaff>
1033895936 <http://www.univ-mlv.fr/~ocure/lubm.owl#SystemsStaff>
1035993088 <http://www.univ-mlv.fr/~ocure/lubm.owl#ClericalStaff>
1040187392 <http://www.univ-mlv.fr/~ocure/lubm.owl#Faculty>
1042284544 <http://www.univ-mlv.fr/~ocure/lubm.owl#PostDoc>
1044381696 <http://www.univ-mlv.fr/~ocure/lubm.owl#Professor>
1044643840 <http://www.univ-mlv.fr/~ocure/lubm.owl#Chair>
1044905984 <http://www.univ-mlv.fr/~ocure/lubm.owl#VisitingProfessor>
1045168128 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssociateProfessor>
1045430272 <http://www.univ-mlv.fr/~ocure/lubm.owl#Dean>
1045692416 <http://www.univ-mlv.fr/~ocure/lubm.owl#FullProfessor>
1045954560 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssistantProfessor>
1046478848 <http://www.univ-mlv.fr/~ocure/lubm.owl#Lecturer>
1073741824 <http://www.univ-mlv.fr/~ocure/lubm.owl#Work>
1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#Course>
1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateCourse>
1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#Research>
</code>

===LUBM Univ1===
-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/univ1_encoded_unique.id|encoded triples]] (2.1MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/quads_plus_replicas.id|encoded quadruples with replication]] (2.3MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/quads.id|replicated quadruples]] (0.3MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/univ1.nt|univ1.nt]] (16.3MB)