{{indexmenu_n>1}}

===== RDFdist: RDF distribution approaches using Spark =====
  
This wiki page provides information about the experiments on RDF distribution approaches using [[http://spark.apache.org/|Spark]].
This information consists of i) the query workload, ii) the source code for both data preparation and query evaluation, and iii) the description of the two datasets used in the experiments.
For the sake of reproducibility, each source code is provided as a script which can be directly executed in the Spark shell.
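
For example, assuming a script has been saved to a local file (the file name below is only an illustration), it can be executed from a running Spark shell with the REPL's '':load'' command:
<code>
// inside spark-shell; the path is hypothetical
:load /path/to/rdfdist_script.scala
</code>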

=====Query workload=====
We picked three queries from the [[http://swat.cse.lehigh.edu/projects/lubm/|LUBM]] benchmark, namely #2, #9 and #12, which are referred to as Query 2, Query 3 and Query 4, respectively.
We created an additional query, referred to as Query 1.
We also created two queries for the [[https://www.wikidata.org/wiki/Wikidata:Main_Page|Wikidata]] dataset, which are referred to as Query 5 and Query 6.
  
===Query 1 (synthetic, LUBM)===
<code>
SELECT ?x ?y ?z
WHERE
{?x lubm:advisor ?y.
 ?y lubm:worksFor ?z.
 ?z lubm:subOrganisation ?t
}
</code>
  
===Query 2 (#2 of LUBM)===
<code>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#>
SELECT ?X ?Y ?Z
WHERE
{?X rdf:type ub:GraduateStudent .
  ?Y rdf:type ub:University .
  ?Z rdf:type ub:Department .
  ?X ub:memberOf ?Z .
  ?Z ub:subOrganizationOf ?Y .
  ?X ub:undergraduateDegreeFrom ?Y}
</code>

===Query 3 (#9 of LUBM)===
<code>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#>
SELECT ?X ?Y ?Z
WHERE
{?X rdf:type ub:Student .
  ?Y rdf:type ub:Faculty .
  ?Z rdf:type ub:Course .
  ?X ub:advisor ?Y .
  ?Y ub:teacherOf ?Z .
  ?X ub:takesCourse ?Z}
</code>

===Query 4 (#12 of LUBM)===
<code>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#>
SELECT ?X ?Y
WHERE
{?X rdf:type ub:Chair .
  ?Y rdf:type ub:Department .
  ?X ub:worksFor ?Y .
  ?Y ub:subOrganizationOf <http://www.University0.edu>}
</code>

===Query 5 (synthetic, Wikidata)===
<code>
SELECT ?x ?y ?z
WHERE
{?x entity:P131s ?y.
 ?y entity:P961v ?z.
 ?z entity:P704s ?w
}
</code>

===Query 6 (synthetic, Wikidata)===
<code>
SELECT ?x ?y ?z
WHERE
{?x entity:P39v ?y.
 ?x entity:P580q ?z.
 ?x rdf:type ?w
}
</code>

=====Source Code=====

====Hash-based approaches====
The following code corresponds to the triple hashing and subject hashing approaches for the LUBM datasets.
It consists of a data preparation part followed by a query evaluation part.
  
  
<code>
// ...

val t1 = java.lang.System.currentTimeMillis();

/**
* set inputData with the path to the data encoded as quadruples (see Datasets excerpts)
*/

// loading and transforming the dataset
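// The actual loading code is omitted here; a minimal sketch (an assumption,
// modeled on the quad parsing used in the Huang script below), producing
// subject-keyed triples (s,(s,p,o)) from lines of the form "(s,p,o,partID)":
// val triples0 = sc.textFile(inputData).coalesce(part).map(_.split(",")).
//   map(t => (t(0).replace("(","").toLong,
//             (t(0).replace("(","").toLong, t(1).toLong, t(2).toLong)))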
// ...

println("Loading time "+ (t2 - t1) +" msec for "+part+" partitions");
  
// Partitioning the dataset
/**
* Uncomment one of the following lines depending on whether hashing is applied
* on the entire triple or only the subject
*/
// val triples = triples0.partitionBy(new HashPartitioner(part))
// val triples = triples0.map{case(f,(s,p,o))=>((s+p+o),(s,p,o))}.partitionBy(new HashPartitioner(part))
  
//triples.persist
// ...

val takco : Long = 1115684864
  
  
    // -----------------------------------------------------------
    // ...
  
println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions");

// -----------------------------------------------------------
// highly selective on wikidata :: 8 results
// QR5: ?x <http://www.wikidata.org/entity/P131s> ?y.
//      ?y <http://www.wikidata.org/entity/P961v> ?z.
//      ?z <http://www.wikidata.org/entity/P704s> ?w.
// -----------------------------------------------------------
// (P131s, P961v and P704v are assumed to be dictionary-encoded property
// identifiers defined earlier in the script; triples is assumed to be an
// RDD of (s,p,o) triples here)
val qr5 = triples.filter({case(s,p,o)=>p==P131s}).map({case(s,p,o)=>(o,(s,p))}).
    join(triples.filter({case(s,p,o)=>p==P961v}).map({case(s,p,o)=>(s,(p,o))})).
    map({case(k,((s1,p1),(p2,o2)))=>(o2,s1)}).
    join(triples.filter({case(s,p,o)=>p==P704v}).map({case(s,p,o)=>(s,(p,o))}))
qr5.persist
qr5.collect

// mid selectivity star shaped query : 10418 results
// QR6: ?x <http://www.wikidata.org/entity/P39v> ?y.
//      ?x <http://www.wikidata.org/entity/P580q> ?z.
//      ?x <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> ?w
val qr6 = triples.filter({case(s,p,o)=>p==P39v}).map({case(s,p,o)=>(s,null)}).
    join(triples.filter({case(s,p,o)=>p==P580q}).map({case(s,p,o)=>(s,null)})).
    join(triples.filter({case(s,p,o)=>p==rdftype}).map({case(s,p,o)=>(s,o)}))

qr6.persist
qr6.collect
</code>
  
====Graph partitioning-based approaches====

===Huang Approach===
The following script evaluates the query workload on data partitioned by a graph partitioner and extended with its 2-hop neighborhood (the ''.2hop'' files produced by the 2-hop script further below).
<code>
import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer

val folder= "lubm" //"watdiv"
val dataset= "univ" //"watdiv"
val scale="1k"
val part=20 //10, 20
  
val folderName = folder +scale
val fileName = dataset+scale+"_encoded_unique_quads.part."+part+".2hop"

val t1 = java.lang.System.currentTimeMillis();

// load the quads keyed by partition id: (partID, (s,p,o))
val quads_I_SPO = sc.textFile(s"/user/olivier/${folderName}/${fileName}").coalesce(part).map(x=>x.split(",")).map(t=>(t(3).replace(")","").toLong, (t(0).replace("(","").toLong,t(1).toLong,t(2).toLong)))

val quadsDist = quads_I_SPO.partitionBy(new HashPartitioner(part)).persist

val t2 = java.lang.System.currentTimeMillis();

print("Loading time of quads : "+(t2-t1)/1000 +" sec")

// dictionary-encoded property identifiers (see Datasets excerpts)
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864


    // -----------------------------------------------------------
    // Query 1 : (not part of the benchmark)
    // Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

var pataws = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}).
    join(quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => (s,o)}),part).
    map({case (y,(x,z)) => (z,(x,y))}).
    join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}), part).
    map({case (z,((x,y),t)) => (x,y,z,t)})

pataws.count

var pataws2 = pataws.distinct

var t2= java.lang.System.currentTimeMillis();

println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");


    // -----------------------------------------------------------
    // LUBM 2 : MSU
    // Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

//var pmemof = quadsDist.filter({case(i,(s,p,o)) => p==memof}).cache()

var patmsu = quadsDist.filter({case(i,(s,p,o)) => p==memof}).map({case(i,(s,p,o)) => (o,s)}).
             join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}),part).
             map({case (y,(x,z)) => (x+""+z,(x,y,z))}).
             join(quadsDist.filter({case(i,(s,p,o)) => p==undeg}).map({case(i,(x,p,z))=> (x+""+z,null)}))

patmsu.count

var patmsu2 = patmsu.distinct

var t2= java.lang.System.currentTimeMillis();

println("Processing Q2 "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 9 : ATT
    // Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

var patatt = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}).
              join(quadsDist.filter({case(i,(s,p,o)) => p==teaof}).map({case(i,(s,p,o)) => (s,o)}), part).
              map({case (y,(x,z)) => (x+""+z,(x,y,z))}).
              join(quadsDist.filter({case(i,(s,p,o)) => p==takco}).map({case(i,(s,p,o))=> (s+""+o,null)}), part)

patatt.distinct.count

var t2= java.lang.System.currentTimeMillis();

println("Processing Q3 (LUBM #9) "+ (t2 - t1) +" msec for "+part+" partitions");


    // -----------------------------------------------------------
    // LUBM 12 : WS
    // Pattern: (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

val t1 = java.lang.System.currentTimeMillis();

val patws = quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => ((i,o),s)}).
    join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => ((i,s),o)}),part).
    map({case ((i,k),(s,o)) => (s,o)})

val ans_patws = patws.distinct.count

val t2= java.lang.System.currentTimeMillis();

println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions");
</code>

===WARP===
<code>
// Spark implementation of WARP replication
// usage: run this code in the spark-shell

import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.spark.HashPartitioner
import org.apache.spark.Partitioner

import java.io.Serializable


val folder= "lubm" //"watdiv"
val dataset= "univ" //"watdiv"
val scale="1k"

/* We have 15 cores per machine.
   Each core is accessing a separate part in parallel.
   The default parallelism = #machines * 15 cores
   <=> #machines = defaultParallelism / 15
*/
val machine = sc.defaultParallelism /15
val part = sc.defaultParallelism
val folderName = folder + scale
val fileName = dataset + scale
/**
* set inputData with the path to the data encoded as quadruples (see Datasets excerpts)
*/
val inputData = s"/user/olivier/${folderName}/${fileName}_encoded_unique_quads.part.${machine}"

// Initial state: delete the storage
sc.getPersistentRDDs.values.foreach(x => x.unpersist())
sc.getPersistentRDDs.values.size

var t1 = java.lang.System.currentTimeMillis();

// helper to print a highlighted message
def affiche(s: String): Unit = {
  println("#### ------------------------------- ####")
  println(s)
  println("#### ------------------------------- ####")
}


/*
-----------------------
STEP 1: read triples
-----------------------
*/

/**
  Function lireTriples reads the input data and
  returns quads (subject, property, object, partID)
  with partID = hash(subject)
**/
def lireTriples(input: String): RDD[(Long, Long, Long, Int)] = {
   return sc.textFile(input).
    map(line => line.substring(1, line.length -1).split(",")).
    map(tab => (tab(0).toLong, tab(1).toLong, tab(2).toLong, (tab(0).toInt).hashCode%machine))
}

val triples = lireTriples(inputData)
triples.setName("triples")
triples.persist

// total number of tuples
//triples.count

// stat: number of triples per partition
//triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)

/*
---------------------------------------------------------------------------
STEP 2: COMPUTE REPLICATION TRIPLES

Computes the triples to replicate in every partition, for each query
---------------------------------------------------------------------------
*/

/*
 Function filterProp: selects the triples having a given property
 */
def filterProp(input: RDD[(Long, Long, Long, Int)], prop: Long): RDD[(Long, Long, Int)] = {
  // project on S and O since P is fixed
  return input.filter(x => x._2 == prop).map{ case (s,p,o,f) => (s,o,f)}
}

// for a candidate seed partition seedF, list the triples of the embedding t
// that lie outside seedF (these are the ones to replicate into seedF)
def getOutsideTriple(seedF: Int, t: ListBuffer[(Long, Long, Int)], properties: Array[Long]): Seq[(Long, Long, Long, Int)] = {
  var a = t.toArray
  var res: ListBuffer[(Long, Long, Long, Int)] = ListBuffer()

  for(i <-0 to (a.length - 1)) {
    if( a(i)._3 != seedF) {
      res.append( (a(i)._1, properties(i), a(i)._2, seedF))
    }
  }
  return res
}

// for each candidate seed position, compute the replica set and keep the one
// that minimizes the number of replicated triples
def getReplicaForQuery(query: RDD[ListBuffer[(Long, Long, Int)]], properties: Array[Long], nbCandidateSeed: Int): RDD[(Long, Long, Long, Int)] = {
  var replica = sc.parallelize(List((0L,0L,0L,0)))  // placeholder, replaced at the first iteration
  var min_nb = -1L

  for (i <- 0 to (nbCandidateSeed-1)) {
    var t1 = query.map(x => (x(i)._3, x))
    //t1.take(5).foreach(println)

    // list the triples to replicate = those that are not in the seed partition
    var t2 = t1.flatMap{ case(seedF, tripleList) => getOutsideTriple(seedF, tripleList, properties)}.distinct

    // count the triples to replicate
    var nb = t2.count
    if(min_nb == -1 || nb < min_nb){
      min_nb=nb
      replica=t2
      println("current:")
      println(s"seed $i, replica count: $nb ")
    }
    else {
      println(s"-----ignore seed $i, replica count: $nb ")
    }
  }
  return replica
}

// -------------------------------------------------------
//  Compute replication for the following QUERIES
// -------------------------------------------------------

// based on dictionary encoding
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864

// -----------------------------------------------------------
// Query 1 : (not part of the benchmark)
// Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
val query1 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y, (x, f1))}.
  join(filterProp(triples, worksFor).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x,f1), (z, f2))) => (z, (x, f1, y, f2)) }.
  join(filterProp(triples, suborg).map{ case ( z, t, f3) => (z, (t,f3))}).
  map{ case ( z, ( (x,f1,y,f2), (t,f3) )) => ListBuffer( (x, y, f1), (y, z, f2) , (z, t, f3))}

query1.setName("q1")
//query1.persist()
//query1.count
// result: 4108791

var t1 = java.lang.System.currentTimeMillis();

val replica1 = getReplicaForQuery(query1, Array(advisor, worksFor, suborg), 3)

var t2= java.lang.System.currentTimeMillis();

println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");

// stat: number of triples to replicate in each partition
//replica1.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
// (0,15551) (1,15776) (2,17639) (3,201275) (4,46068)

// -----------------------------------------------------------
// LUBM 2 : MSU
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
val query2 = filterProp(triples, memof).map{ case (x, y, f1) => ( y, (x, f1))}.
  join(filterProp(triples, suborg).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x,f1), (z,f2) )) => ((x, z), (y, f1, f2)) }.
  join(filterProp(triples, undeg).map{ case (x, z, f3) => ( (x, z), f3)}).
  map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) }

query2.setName("q2")
//query2.persist()
//query2.count

val replica2 = getReplicaForQuery(query2, Array(memof, suborg, undeg), 2)

// stat: number of triples to replicate in each partition
//replica2.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
//(0,1) (1,3) (3,4) (4,896)
// -----------------------------------------------------------
// LUBM 9 : ATT
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
val query9 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y,(x, f1))}.
  join(filterProp(triples, teaof).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x, f1), (z, f2) )) => ((x, z), (y, f1, f2)) }.
  join(filterProp(triples, takco).map{ case (x, z, f3) => ( (x, z), f3)}).
  map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) }

query9.setName("q9")
//query9.persist()
query9.count
// 272982

val replica9 = getReplicaForQuery(query9, Array(advisor, teaof, takco), 2)

// stat: number of triples to replicate in each partition
//replica9.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
//(0,3131) (1,3213) (2,3504) (3,44706) (4,2014)

// -----------------------------------------------------------
// LUBM 12 : WS
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
val query12 = filterProp(triples, worksFor).map{ case (y, z, f1) => ( z, (y, f1))}.
  join(filterProp(triples, suborg).map{ case (z, t, f2) => ( z, (t, f2))}).
  map{ case ( z, ((y, f1), (t, f2) )) => ListBuffer((y, z, f1), (z, t, f2)) }

query12.setName("q12")
//query12.persist()
query12.count
//720628

val replica12 = getReplicaForQuery(query12, Array(worksFor, suborg), 2)

// stat: number of triples to replicate in each partition
//replica12.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)

/*
---------------------------------------------------------------------------
STEP 3: UNION OF THE REPLICAS COMPUTED FOR EACH QUERY
---------------------------------------------------------------------------
*/

val allreplica = replica1.union(replica2).union(replica9).union(replica12).distinct

val nbAjout = allreplica.count
// 357161 for 5 partitions
// 734295 for 10 partitions
// 779983 for 20 partitions

// replication rate
affiche("number of added triples: " + nbAjout.toDouble)
affiche("replication rate: " + (nbAjout.toDouble / triples.count))

var t2= java.lang.System.currentTimeMillis();
affiche("Preparation time "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions");

// distribution of the original triples over the partitions: mean and standard deviation
val oDist = triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b)
val oMean = oDist.map{case(f,c)=>c}.sum / machine
val odevs = oDist.map{case(f,c)=>c}.map(score => (score - oMean) * (score - oMean))
val ostddev = Math.sqrt(odevs.sum / machine)

// the same statistics after adding the replicated triples
val nDist = oDist.join(allreplica.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b)).map{case(f,(oc,on))=>(oc+on)}
val nMean = nDist.sum / machine
val ndevs = nDist.map(score => (score - nMean) * (score - nMean))
val nstddev = Math.sqrt(ndevs.sum / machine)

/*
---------------------------------------------------------------------------
STEP 4: LOCAL QUERY PROCESSING
On the WARP replicated data
---------------------------------------------------------------------------
*/

// Extends each partition with the WARP replicated triples.
// Each machine stores one partition
val triplesWARP = triples.union(allreplica).
  map{ case (s,p,o,f) => (f, (s,p,o))}.
  partitionBy(new HashPartitioner(machine)).map{ case (f, (s,p,o)) => (s,p,o,f)}

triplesWARP.setName("triples WARP")
triplesWARP.persist()
triplesWARP.count

var t2= java.lang.System.currentTimeMillis();
affiche("Preparation time "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions");

// -------------------
//  Q1 LOCAL
// -------------------
t1= java.lang.System.currentTimeMillis();

// local Query 1 with projection on the variables (x,y,z,t)
val localQuery1 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((z, f), (x, y)) }.
  join(filterProp(triplesWARP, suborg).map{ case ( z, t, f) => ((z, f), t)}).
  map{ case ( (z, f), ( (x,y), t)) => (x, y, z, t)}

localQuery1.count
// 4108791, same as without replication

t2= java.lang.System.currentTimeMillis();
affiche("Q1 query time: "+ (t2 - t1) +" msec for " + part + " partitions");

// -----------------------------------------------------------
// Q2 LOCAL
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery2 = filterProp(triplesWARP, memof).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, suborg).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((x, z, f), y) }.
  join(filterProp(triplesWARP, undeg).map{ case (x, z, f) => ( (x, z, f), 1)}).
  map{ case ((x, z, f), (y, 1)) => (x, y, z) }

localQuery2.count
//2528
t2= java.lang.System.currentTimeMillis();
affiche("Q2 query time: "+ (t2 - t1) +" msec for " + part + " partitions");

// -----------------------------------------------------------
// Q9 LOCAL
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery9 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, teaof).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((x, z, f), y) }.
  join(filterProp(triplesWARP, takco).map{ case (x, z, f) => ( (x, z, f), 1)}).
  map{ case ((x, z, f), (y, 1)) => (x, y, z) }

localQuery9.count
//272982

t2= java.lang.System.currentTimeMillis();
affiche("Q9 query time: "+ (t2 - t1) +" msec for " + part + " partitions");

// -----------------------------------------------------------
// Q12 LOCAL
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery12 = filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (z, f), y)}.
  join(filterProp(triplesWARP, suborg).map{ case (z, t, f) => ( (z, f), t)}).
  map{ case ( (z, f), (y, t)) => (y, z, t) }

localQuery12.count
//938356

t2= java.lang.System.currentTimeMillis();
affiche("Q12 query time: "+ (t2 - t1) +" msec for " + part + " partitions");
</code>
===2-hop based approach===
<code>
val folder= "lubm"
val dataset= "univ"
val scale="10k"

val folderName = folder +scale
val part = Array(5,10,20)

for (p <- part)
{
val fileName = dataset+scale+"_encoded_unique_quads.part."+p
val t1 = java.lang.System.currentTimeMillis();

val quads = sc.textFile(s"/user/olivier/${folderName}/${fileName}").map(x=>x.split(",")).map(t=>(t(0).replace("(","").toLong,t(1).toLong,t(2).toLong,t(3).replace(")","").toLong))

// for each quad (s,p,o,i), add a copy of the quads reachable in one hop
// (those whose subject equals o) when they belong to another partition
var addOneHop = quads.map({case(s,p,o,i)=>(o,i)}).join(quads.map({case(s,p,o,i)=>(s,(p,o,i))})).filter({case(termS,(i1,(p,o,i2)))=>i1!=i2}).distinct.map({case(termS,(i1,(p,o,i2)))=>(termS,p,o,i1)})

val newQuads = quads.union(addOneHop).distinct
val newQuadsSize = newQuads.count

val t2 = java.lang.System.currentTimeMillis();
val hopSize = addOneHop.count
println(s"Time to compute one more hop on $folderName for $p partitions is ${t2-t1}")
println(s" new size = $newQuadsSize , added $hopSize")
newQuads.saveAsTextFile(s"/user/olivier/${folderName}/${fileName}.2hop")
}
</code>
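
The ''${fileName}.2hop'' files produced by this script are the input read by the Huang script above. As implied by the parsing code, each line of the encoded quad files has the form ''(subjectID,propertyID,objectID,partitionID)''.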
====Datasets excerpts====
===Encoding of LUBM concepts and properties===

<code>
Properties:
0 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#officeNumber>
671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#name>
738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#title>
805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#age>
872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#telephone>
939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#emailAddress>
1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchInterest>
1082130432 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchProject>
1090519040 <http://www.univ-mlv.fr/~ocure/lubm.owl#hasAlumnus>
1098907648 <http://www.univ-mlv.fr/~ocure/lubm.owl#degreeFrom>
1101004800 <http://www.univ-mlv.fr/~ocure/lubm.owl#undergraduateDegreeFrom>
1103101952 <http://www.univ-mlv.fr/~ocure/lubm.owl#mastersDegreeFrom>
1105199104 <http://www.univ-mlv.fr/~ocure/lubm.owl#doctoralDegreeFrom>
1107296256 <http://www.univ-mlv.fr/~ocure/lubm.owl#orgPublication>
1115684864 <http://www.univ-mlv.fr/~ocure/lubm.owl#takesCourse>
1124073472 <http://www.univ-mlv.fr/~ocure/lubm.owl#member>
1132462080 <http://www.univ-mlv.fr/~ocure/lubm.owl#memberOf>
1136656384 <http://www.univ-mlv.fr/~ocure/lubm.owl#worksFor>
1138753536 <http://www.univ-mlv.fr/~ocure/lubm.owl#headOf>
1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#teachingAssistantOf>
1149239296 <http://www.univ-mlv.fr/~ocure/lubm.owl#listedCourse>
1157627904 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareDocumentation>
1166016512 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationAuthor>
1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareVersion>
1182793728 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliateOf>
1191182336 <http://www.univ-mlv.fr/~ocure/lubm.owl#tenured>
1199570944 <http://www.univ-mlv.fr/~ocure/lubm.owl#teacherOf>
1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationDate>
1216348160 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliatedOrganizationOf>
1224736768 <http://www.univ-mlv.fr/~ocure/lubm.owl#subOrganizationOf>
1233125376 <http://www.univ-mlv.fr/~ocure/lubm.owl#advisor>
1241513984 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationResearch>

Concepts:
0 <http://www.univ-mlv.fr/~ocure/lubm.owl#Schedule>
268435456 <http://www.univ-mlv.fr/~ocure/lubm.owl#Organization>
301989888 <http://www.univ-mlv.fr/~ocure/lubm.owl#College>
335544320 <http://www.univ-mlv.fr/~ocure/lubm.owl#Department>
369098752 <http://www.univ-mlv.fr/~ocure/lubm.owl#Institute>
402653184 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchGroup>
436207616 <http://www.univ-mlv.fr/~ocure/lubm.owl#Program>
469762048 <http://www.univ-mlv.fr/~ocure/lubm.owl#University>
536870912 <http://www.univ-mlv.fr/~ocure/lubm.owl#Publication>
570425344 <http://www.univ-mlv.fr/~ocure/lubm.owl#Software>
603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#Book>
637534208 <http://www.univ-mlv.fr/~ocure/lubm.owl#Specification>
671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#Manual>
704643072 <http://www.univ-mlv.fr/~ocure/lubm.owl#Article>
713031680 <http://www.univ-mlv.fr/~ocure/lubm.owl#TechnicalReport>
721420288 <http://www.univ-mlv.fr/~ocure/lubm.owl#ConferencePaper>
729808896 <http://www.univ-mlv.fr/~ocure/lubm.owl#JournalArticle>
738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#UnofficialPublication>
805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#Person>
872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#TeachingAssistant>
939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#Student>
956301312 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateStudent>
973078528 <http://www.univ-mlv.fr/~ocure/lubm.owl#UndergraduateStudent>
1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#Employee>
1015021568 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchAssistant>
1023410176 <http://www.univ-mlv.fr/~ocure/lubm.owl#Director>
1031798784 <http://www.univ-mlv.fr/~ocure/lubm.owl#AdministrativeStaff>
1033895936 <http://www.univ-mlv.fr/~ocure/lubm.owl#SystemsStaff>
1035993088 <http://www.univ-mlv.fr/~ocure/lubm.owl#ClericalStaff>
1040187392 <http://www.univ-mlv.fr/~ocure/lubm.owl#Faculty>
1042284544 <http://www.univ-mlv.fr/~ocure/lubm.owl#PostDoc>
1044381696 <http://www.univ-mlv.fr/~ocure/lubm.owl#Professor>
1044643840 <http://www.univ-mlv.fr/~ocure/lubm.owl#Chair>
1044905984 <http://www.univ-mlv.fr/~ocure/lubm.owl#VisitingProfessor>
1045168128 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssociateProfessor>
1045430272 <http://www.univ-mlv.fr/~ocure/lubm.owl#Dean>
1045692416 <http://www.univ-mlv.fr/~ocure/lubm.owl#FullProfessor>
1045954560 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssistantProfessor>
1046478848 <http://www.univ-mlv.fr/~ocure/lubm.owl#Lecturer>
1073741824 <http://www.univ-mlv.fr/~ocure/lubm.owl#Work>
1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#Course>
1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateCourse>
1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#Research>
</code>
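
The identifiers above can be used to decode query results. Below is a minimal sketch (not part of the original scripts, and the file name is hypothetical), assuming the two lists above are saved as plain ''id <URI>'' lines:
<code>
// load the "id <URI>" dictionary lines into a map for decoding encoded results
val dict: Map[Long, String] =
  scala.io.Source.fromFile("lubm_dictionary.txt").getLines().
    map(_.trim).
    filter(_.matches("""\d+\s+<.*>""")).
    map { line =>
      val Array(id, uri) = line.split("\\s+", 2)
      id.toLong -> uri
    }.toMap

// e.g. dict(1233125376) yields the advisor property URI
</code>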

===LUBM Univ1===
-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/univ1_encoded_unique.id|encoded triples]] (2.1MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/quads_plus_replicas.id|encoded quadruples with replication]] (2.3MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/quads.id|replicated quadruples]] (0.3MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/univ1.nt|univ1.nt]] (16.3MB)