Databases

LIP6 DB team web site

RDFdist

This wiki page provides information about the experiments on RDF distribution approaches using Spark. This information consists of (i) the query workload, (ii) the source code for both data preparation and query evaluation, and (iii) the description of the two datasets used in the experiments. For the sake of reproducibility, each piece of source code is provided as a script that can be executed directly in the Spark shell.

Query workload

We picked three queries from the LUBM benchmark, namely #2, #9 and #12, which are referred to as Query 2, Query 3 and Query 4, respectively. We also created an additional query, referred to as Query 1. Finally, we created two queries for the Wikidata dataset, referred to as Query 5 and Query 6.

Query 1 (synthetic, LUBM)

PREFIX lubm: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#>
SELECT ?x ?y ?z 
WHERE
{?x lubm:advisor ?y.
 ?y lubm:worksFor ?z.
 ?z lubm:subOrganizationOf ?t
}

Query 2 (#2 of LUBM)

PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> 
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#> 
SELECT ?X, ?Y, ?Z 
WHERE 
{?X rdf:type ub:GraduateStudent . 
  ?Y rdf:type ub:University . 
  ?Z rdf:type ub:Department . 
  ?X ub:memberOf ?Z . 
  ?Z ub:subOrganizationOf ?Y . 
  ?X ub:undergraduateDegreeFrom ?Y}

Query 3 (#9 of LUBM)

PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> 
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#> 
SELECT ?X, ?Y, ?Z 
WHERE 
{?X rdf:type ub:Student . 
  ?Y rdf:type ub:Faculty . 
  ?Z rdf:type ub:Course . 
  ?X ub:advisor ?Y . 
  ?Y ub:teacherOf ?Z . 
  ?X ub:takesCourse ?Z}

Query 4 (#12 of LUBM)

PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> 
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#> 
SELECT ?X, ?Y 
WHERE 
{?X rdf:type ub:Chair . 
  ?Y rdf:type ub:Department . 
  ?X ub:worksFor ?Y . 
  ?Y ub:subOrganizationOf <http://www.University0.edu>} 

Query 5 (synthetic, Wikidata)

PREFIX entity: <http://www.wikidata.org/entity/>
SELECT ?x ?y ?z 
WHERE  
{?x entity:P131s ?y. 
?y entity:P961v ?z. 
?z entity:P704s ?w
}

Query 6 (synthetic, Wikidata)

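PREFIX entity: <http://www.wikidata.org/entity/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
SELECT ?x ?y ?z 
WHERE
{?x entity:P39v ?y. 
?x entity:P580q ?z. 
?x rdf:type ?w
}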
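The Spark scripts below translate each basic graph pattern into a chain of equi-joins between the triples of each property. As a minimal sketch of that translation, assuming plain Scala collections instead of Spark RDDs, the following code evaluates the pattern of Query 1, (x advisor y) (y worksFor z) (z suborg t), on a handful of toy triples. The property ids, the triples and the helper `joinOn` (a stand-in for Spark's pair-RDD `join`) are hypothetical.

```scala
// Minimal local sketch (plain Scala, no Spark): the BGP of Query 1
//   (x advisor y) (y worksFor z) (z suborg t)
// evaluated as two successive equi-joins. All ids and triples are toy values.
type Triple = (Long, Long, Long) // (subject, property, object), dictionary-encoded

val advisor = 1L
val worksFor = 2L
val suborg = 3L

val data: Seq[Triple] = Seq(
  (10L, advisor, 20L),  // student 10 is advised by 20
  (11L, advisor, 21L),  // student 11 is advised by 21
  (20L, worksFor, 30L), // 20 works for department 30
  (21L, worksFor, 31L), // 21 works for department 31
  (30L, suborg, 40L)    // department 30 is a sub-organization of 40
)

// Hypothetical stand-in for Spark's pair-RDD join: all (k, (a, b)) with equal keys.
def joinOn[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
  val index = right.groupBy(_._1)
  for {
    (k, a) <- left
    (_, b) <- index.getOrElse(k, Seq.empty)
  } yield (k, (a, b))
}

// Triples of one property, projected on (subject, object).
def byProp(p: Long): Seq[(Long, Long)] =
  data.collect { case (s, `p`, o) => (s, o) }

// (x advisor y) keyed on y, joined with (y worksFor z) keyed on y, then re-keyed on z
val xyz = joinOn(byProp(advisor).map { case (x, y) => (y, x) }, byProp(worksFor)).
  map { case (y, (x, z)) => (z, (x, y)) }

// ... joined with (z suborg t) keyed on z
val answers = joinOn(xyz, byProp(suborg)).
  map { case (z, ((x, y), t)) => (x, y, z, t) }.
  distinct

println(answers) // List((10,20,30,40))
```

Each join is keyed on the variable shared by the two patterns it connects (y, then z), which is the role played by the `map` calls placed before each `join` in the scripts.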

Source Code

Hash-based approaches

The following code corresponds to the triple hashing and subject hashing approaches for the LUBM datasets. It consists of a data preparation part followed by a query evaluation part.
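Triple hashing and subject hashing differ only in the key that decides on which machine a triple is stored. The following sketch, in plain Scala without Spark, assigns a few toy triples to `machines` buckets with each key and reports the mean and standard deviation of the bucket sizes, computed like `oMean` and `ostddev` in the script (dividing by the number of machines). The triples, the value of `machines` and the use of `Math.floorMod` to keep bucket ids non-negative are assumptions made for illustration.

```scala
// Local sketch (plain Scala, no Spark): triple hashing vs subject hashing.
// Toy data; `machines` plays the role of the script's `machine`.
type Triple = (Long, Long, Long) // (subject, property, object)

val machines = 3

// Triple hashing: the key depends on all three components (their sum, as in the script).
def tripleBucket(t: Triple): Int =
  Math.floorMod((t._1 + t._2 + t._3).hashCode, machines)

// Subject hashing: the key is the subject only, so a subject's triples share a machine.
def subjectBucket(t: Triple): Int =
  Math.floorMod(t._1.hashCode, machines)

// Number of triples per machine, empty machines included.
def bucketSizes(data: Seq[Triple], bucket: Triple => Int): Seq[Int] = {
  val counts = data.groupBy(bucket).map { case (m, ts) => (m, ts.size) }
  (0 until machines).map(m => counts.getOrElse(m, 0))
}

// Mean and population standard deviation of the sizes (divided by the number of machines).
def balance(sizes: Seq[Int]): (Double, Double) = {
  val mean = sizes.sum.toDouble / sizes.size
  val variance = sizes.map(c => (c - mean) * (c - mean)).sum / sizes.size
  (mean, math.sqrt(variance))
}

val data: Seq[Triple] = Seq(
  (1L, 100L, 7L), (1L, 101L, 8L), (1L, 102L, 9L), // three triples of subject 1
  (2L, 100L, 7L), (3L, 100L, 8L), (4L, 101L, 9L)
)

val bySubject = bucketSizes(data, subjectBucket)
val byTriple = bucketSizes(data, tripleBucket)

println(s"subject hashing: sizes $bySubject, (mean, stddev) ${balance(bySubject)}")
println(s"triple hashing:  sizes $byTriple, (mean, stddev) ${balance(byTriple)}")
```

On this toy data, subject hashing keeps the three triples of subject 1 together, which is what makes joins on the subject local, at the price of a less balanced placement than triple hashing.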

import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer

val folder= "lubm"
val dataset= "univ"
val scale="10k"
val coreNumber = 20
val machine = sc.defaultParallelism /coreNumber
val part =  sc.defaultParallelism

val folderName = folder + scale
val fileName   = dataset + scale

val t1 = java.lang.System.currentTimeMillis();

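// loading and transforming the dataset: each triple is keyed by hash(s+p+o) modulo the number of machines
// (Math.floorMod keeps the key non-negative, since hashCode may be negative)
val triples0 = sc.textFile(s"/user/olivier/${folderName}/${fileName}_encoded_unique").map(x => x.split(" ")).map(t => (Math.floorMod((t(0).toLong+t(1).toLong+t(2).toLong).hashCode, machine),(t(0).toLong,t(1).toLong, t(2).toLong))).persist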
triples0.count
val t2= java.lang.System.currentTimeMillis();

println("Loading time "+ (t2 - t1) +" msec for "+part+" partitions");

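// Partitioning the dataset 
/**
* Uncomment ONE of the following lines, depending on whether hashing is applied
* to the entire triple (triple hashing) or only to the subject (subject hashing)
*/
// val triplesPart = triples0.partitionBy(new HashPartitioner(part))                                    // triple hashing
// val triplesPart = triples0.map{case(f,(s,p,o))=>(s,(s,p,o))}.partitionBy(new HashPartitioner(part))   // subject hashing


triplesPart.persist
triplesPart.count

val t3= java.lang.System.currentTimeMillis();

println("Partitioning time "+ (t3 - t2) +" msec for "+part+" partitions");

// Load balance: number of triples per machine, with mean and standard deviation
// (the machine of key k is floorMod(k.hashCode, machine), which works for both keyings)
val oDist = triplesPart.map{ case(k,_) => (Math.floorMod(k.hashCode, machine),1)}.reduceByKey( (a,b) => a+b)
val oMean = oDist.map{case(f,c)=>c}.sum / machine
val odevs = oDist.map{case(f,c)=>c}.map(score => (score - oMean) * (score - oMean))
val ostddev = Math.sqrt(odevs.sum / machine)

triplesPart.partitions.length

// The queries below match plain (s,p,o) triples: drop the partitioning key
// (values is a narrow transformation, so the data stays where partitionBy placed it)
val triples = triplesPart.values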

// declare constants for BGP translation
// These constants have been generated with our encoding method
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864

// helper appending e to the buffer a ("ajout" = "add"); not used by the queries below
def ajout(a : ListBuffer[(Long, Long, Long)], e: (Long, Long, Long) ) : ListBuffer[(Long, Long, Long)] = {
  a += e
  return a
}

    // -----------------------------------------------------------
    // Query 1 : (not part of the benchmark)
    // Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

var pataws = triples.filter({case(s,p,o) => p==advisor}).map({case(s,p,o) => (o,s)}).
    join(triples.filter({case(s,p,o) => p==worksFor}).map({case(s,p,o) => (s,o)}),part).
    map({case (y,(x,z)) => (z,(x,y))}).
    join(triples.filter({case(s,p,o) => p==suborg}).map({case(s,p,o) => (s,o)}),part).
    map({case (z,((x,y),t)) => (x,y,z,t)}).distinct

pataws.count

var t2= java.lang.System.currentTimeMillis();

println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");


    // -----------------------------------------------------------
    // LUBM 2 : MSU
    // Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

var pmemof = triples.filter({case(s,p,o) => p==memof}).cache()

// the last join uses the composite key (x,z) as a tuple: concatenating x+""+z into a
// string could make distinct pairs collide (x=1,z=23 and x=12,z=3 both give "123")
var patmsu = pmemof.map({case(s,p,o) => (o,s)}).
             join(triples.filter({case(s,p,o) => p==suborg}).map({case(s,p,o) => (s,o)}),part).
             map({case (y,(x,z)) => ((x,z),(x,y,z))}).
             join(triples.filter({case(s,p,o) => p==undeg}).map({case(x,p,z)=> ((x,z),null)}), part).
             map({case (k,(xyz,_)) => xyz}).distinct

patmsu.count

var t2= java.lang.System.currentTimeMillis();

println("Processing Q2 "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 9 : ATT
    // Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

// composite join key (x,z) kept as a tuple
var patatt = triples.filter({case(s,p,o) => p==advisor}).map({case(s,p,o) => (o,s)}).
              join(triples.filter({case(s,p,o) => p==teaof}).map({case(s,p,o) => (s,o)}),part).
              map({case (y,(x,z)) => ((x,z),(x,y,z))}).
              join(triples.filter({case(s,p,o) => p==takco}).map({case(s,p,o)=> ((s,o),null)}),part).
              map({case (k,(xyz,_)) => xyz})

patatt.distinct.count

var t2= java.lang.System.currentTimeMillis();

println("Processing Q3 (LUBM #9) "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 12 : WS
    // Pattern: (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

var patws = triples.filter({case(s,p,o) => p==worksFor}).map({case(s,p,o) => (o,(s,p,o))}).
              join(triples.filter({case(s,p,o) => p==suborg}).map({case(s,p,o) => (s,(s,p,o))}),part).
              map({case (k,((s1,p1,o1),(s2,p2,o2))) => (s1,o1,o2)})

var ans_patws = patws.distinct.count

var t2= java.lang.System.currentTimeMillis();

println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions");

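// -----------------------------------------------------------
// Wikidata queries (QR5, QR6): they assume that triples has been loaded from the
// Wikidata dataset, and that the constants P131s, P961v, P704s, P39v, P580q and
// rdftype have been declared with the dictionary-encoded ids of the corresponding
// properties (these values are not given on this page)
// -----------------------------------------------------------

// -----------------------------------------------------------
// QR5, highly selective on Wikidata: 8 results
// ?x <http://www.wikidata.org/entity/P131s> ?y.
// ?y <http://www.wikidata.org/entity/P961v> ?z.
// ?z <http://www.wikidata.org/entity/P704s> ?w.
// -----------------------------------------------------------
val qr5 = triples.filter({case(s,p,o)=>p==P131s}).map({case(s,p,o)=>(o,(s,p))}).
    join(triples.filter({case(s,p,o)=>p==P961v}).map({case(s,p,o)=>(s,(p,o))})).
    map({case(k,((s1,p1),(p2,o2)))=>(o2,s1)}).
    join(triples.filter({case(s,p,o)=>p==P704s}).map({case(s,p,o)=>(s,(p,o))}))
qr5.persist
qr5.collect 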

// QR6, mid-selectivity star-shaped query: 10418 results
// ?x <http://www.wikidata.org/entity/P39v> ?y. ?x <http://www.wikidata.org/entity/P580q> ?z. ?x <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> ?w

val qr6 = triples.filter({case(s,p,o)=>p==P39v}).map({case(s,p,o)=>(s,null)}).
    join(triples.filter({case(s,p,o)=>p==P580q}).map({case(s,p,o)=>(s,null)})).
    join(triples.filter({case(s,p,o)=>p==rdftype}).map({case(s,p,o)=>(s,o)}))

qr6.persist
qr6.collect 
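Query 6 is star-shaped: its three patterns share the subject ?x. When the data is placed with subject hashing, all triples of a given subject are on the same machine, so such a query can in principle be answered partition by partition, without exchanging data. A minimal sketch of this property, using local Scala collections in place of RDDs, evaluates a star pattern inside each subject-hashed bucket and checks that the union of the local answers equals a global evaluation. `P1`, `P2`, `TYPE` and the triples are hypothetical placeholders, not the encoded Wikidata ids.

```scala
// Local sketch (plain Scala, no Spark): a star-shaped BGP
//   ?x P1 ?y . ?x P2 ?z . ?x TYPE ?w
// evaluated per subject-hashed partition and globally. Toy ids and triples.
type Triple = (Long, Long, Long) // (subject, property, object)

val P1 = 39L
val P2 = 580L
val TYPE = 1L
val machines = 2

val data: Seq[Triple] = Seq(
  (10L, P1, 500L), (10L, P2, 600L), (10L, TYPE, 700L),
  (11L, P1, 501L), (11L, TYPE, 700L), // no P2 triple: subject 11 is not an answer
  (12L, P1, 502L), (12L, P2, 602L), (12L, TYPE, 701L)
)

// Star pattern on any set of triples: group by subject, then combine the
// objects of the three properties for each subject.
def star(ts: Seq[Triple]): Set[(Long, Long, Long, Long)] =
  ts.groupBy(_._1).flatMap { case (x, group) =>
    def objs(p: Long): Seq[Long] = group.collect { case (_, `p`, o) => o }
    for (y <- objs(P1); z <- objs(P2); w <- objs(TYPE)) yield (x, y, z, w)
  }.toSet

// Subject hashing: the partition id depends on the subject only.
val partitions: Map[Int, Seq[Triple]] =
  data.groupBy(t => Math.floorMod(t._1.hashCode, machines))

// Each partition is evaluated on its own, without access to the others ...
val local = partitions.values.flatMap(p => star(p)).toSet
// ... and the union matches the evaluation over all the data.
val global = star(data)

println(local == global) // true
println(global.size)     // 2
```

This equality does not hold in general under triple hashing, where the triples of one subject may be spread over several machines, so the star join then needs a shuffle.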

Graph partitioning-based approaches

// Spark implementation of WARP replication
// usage: run this code in the spark-shell

import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.spark.HashPartitioner
import org.apache.spark.Partitioner

import java.io.Serializable


val folder= "lubm" // or "watdiv"
val dataset= "univ" // or "watdiv"
val scale="1k"

/* We have 15 cores per machine. 
   Each core accesses a separate partition in parallel.
   The default parallelism = #machines * 15 cores
   <=>   #machines = defaultParallelism / 15
*/
val machine = sc.defaultParallelism /15
val part =  sc.defaultParallelism  
val folderName = folder + scale
val fileName = dataset + scale
val inputData = s"/user/olivier/${folderName}/${fileName}_encoded_unique_quads.part.${machine}"

// Initial state: unpersist all previously cached RDDs
sc.getPersistentRDDs.values.foreach(x => x.unpersist())
sc.getPersistentRDDs.values.size

var t1 = java.lang.System.currentTimeMillis();

// affiche ("display"): prints s between two separator lines
def affiche(s: String): Unit={
println("#### ------------------------------- ####")
println(s)
println("#### ------------------------------- ####")
}


/*
-----------------------
STEP 1: read triples
-----------------------
*/

/**
  Function lireTriples reads the input data and
  returns quads (subject, property, object, partID)
  with partID = hash(subject) modulo the number of machines
**/
def lireTriples(input: String): RDD[(Long, Long, Long, Int)] = {
   return sc.textFile(input).
    map(line => line.substring(1, line.length -1).split(",")).
    map(tab => (tab(0).toLong, tab(1).toLong, tab(2).toLong, (tab(0).toInt).hashCode%machine))
}

val triples = lireTriples(inputData)
triples.setName("triples")
triples.persist

// total number of tuples
//triples.count

// stat: number of triples per partition
//triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)



/*
---------------------------------------------------------------------------
STEP 2: COMPUTE REPLICATION TRIPLES

Computes the triples to replicate in each partition, for each query
---------------------------------------------------------------------------
*/

/*
 Function filterProp: selects the triples with a given property
 */
def filterProp(input: RDD[(Long, Long, Long, Int)], prop: Long): RDD[(Long, Long, Int)] = {
  // project on S and O, since P is fixed
  return input.filter(x => x._2 == prop).map{ case (s,p,o,f) => (s,o,f)}
}

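/*
 Function getOutsideTriple: for one match of a query (its triples listed in the same order
 as properties, each with its partition), returns the triples that are not stored in the
 seed partition seedF, as quads tagged with seedF, i.e. the copies to add to that partition
 */
def getOutsideTriple(seedF: Int, t: ListBuffer[(Long, Long, Int)], properties: Array[Long]): Seq[(Long, Long, Long, Int)] = {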
  var a = t.toArray
  var res: ListBuffer[(Long, Long, Long, Int)] = ListBuffer()

  for(i <-0 to (a.length - 1)) {
    if( a(i)._3 != seedF) {
      res.append( (a(i)._1,  properties(i), a(i)._2,  seedF))
    }
  }
  return res
}

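/*
 Function getReplicaForQuery: tries each of the first nbCandidateSeed triple positions of the
 matches as seed, computes the distinct triples to replicate for that choice, and returns the
 replica set of the seed that requires the fewest copies
 */
def getReplicaForQuery(query: RDD[ListBuffer[(Long, Long, Int)]], properties: Array[Long], nbCandidateSeed: Int): RDD[(Long, Long, Long, Int)] = {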
  var replica = sc.parallelize(List((0L,0L,0L,0)))
  var min_nb = -1L

  for (i <- 0 to (nbCandidateSeed-1)) {
    var t1 = query.map(x => (x(i)._3, x))
    //t1.take(5).foreach(println)

    // list the triples to replicate = those that are not in the seed partition
    var t2 = t1.flatMap{ case(seedF, tripleList) => getOutsideTriple(seedF, tripleList, properties)}.distinct

    // count the triples to replicate
    var nb = t2.count
    if(min_nb == -1 || nb < min_nb){
      min_nb=nb
      replica=t2
      println("current:")
      println(s"seed $i, replica count: $nb ")
    }
    else {
      println(s"-----ignore seed $i, replica count: $nb ")
    }
  }
  return replica
}

// -------------------------------------------------------
//  Compute replication for the following QUERIES 
// -------------------------------------------------------

// based on dictionary encoding
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864

// -----------------------------------------------------------
// Query 1 : (not part of the benchmark)
// Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
val query1 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y, (x, f1))}.
  join(filterProp(triples, worksFor).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x,f1), (z, f2))) => (z, (x, f1, y, f2)) }.
  join(filterProp(triples, suborg).map{ case ( z, t, f3) => (z, (t,f3))}).
  map{ case ( z, ( (x,f1,y,f2), (t,f3) )) => ListBuffer( (x, y, f1), (y, z, f2) , (z, t, f3))}

query1.setName("q1")
//query1.persist()
//query1.count
// result: 4108791

var t1 = java.lang.System.currentTimeMillis();

val replica1 =  getReplicaForQuery(query1, Array(advisor, worksFor, suborg), 3)

var t2= java.lang.System.currentTimeMillis();

println("Replica computation for Q1 "+ (t2 - t1) +" msec for "+part+" partitions");

// stat: number of triples to replicate in each partition
//replica1.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
// (0,15551) (1,15776) (2,17639) (3,201275) (4,46068)


// -----------------------------------------------------------
// LUBM 2 : MSU
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
val query2 = filterProp(triples, memof).map{ case (x, y, f1) => ( y, (x, f1))}.
  join(filterProp(triples, suborg).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x,f1), (z,f2) )) => ((x, z), (y, f1, f2)) }.
  join(filterProp(triples, undeg).map{ case (x, z, f3) => ( (x, z), f3)}).
  map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) }

query2.setName("q2")
//query2.persist()
//query2.count

val replica2 =  getReplicaForQuery(query2, Array(memof, suborg, undeg), 2)

//stat: number of triples to replicate in each partition
//replica2.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
//(0,1) (1,3) (3,4) (4,896)

// -----------------------------------------------------------
// LUBM 9 : ATT
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
val query9 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y,(x, f1))}.
  join(filterProp(triples, teaof).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x, f1), (z, f2) )) => ((x, z), (y, f1, f2)) }.
  join(filterProp(triples, takco).map{ case (x, z, f3) => ( (x, z), f3)}).
  map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) }

query9.setName("q9")
//query9.persist()
query9.count
// 272982

val replica9 =  getReplicaForQuery(query9, Array(advisor, teaof, takco) ,2)

// stat: number of triples to replicate in each partition
//replica9.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
//(0,3131) (1,3213) (2,3504) (3,44706) (4,2014)


// -----------------------------------------------------------
// LUBM 12 : WS
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
val query12 = filterProp(triples, worksFor).map{ case (y, z, f1) => ( z, (y, f1))}.
  join(filterProp(triples, suborg).map{ case (z, t, f2) => ( z, (t, f2))}).
  map{ case ( z, ((y, f1), (t, f2) )) => ListBuffer((y, z, f1), (z, t, f2)) }

query12.setName("q12")
//query12.persist()
query12.count
//720628

val replica12 =  getReplicaForQuery(query12, Array(worksFor, suborg), 2)

//stat: number of triples to replicate in each partition
//replica12.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)


/*
---------------------------------------------------------------------------
STEP 3: UNION OF THE REPLICAS COMPUTED FOR EACH QUERY 
---------------------------------------------------------------------------
*/

val allreplica = replica1.union(replica2).union(replica9).union(replica12).distinct

val nbAjout = allreplica.count
// 357,161 for 5 partitions
// 734,295 for 10 partitions
// 779,983 for 20 partitions

// replication rate
affiche("Number of added triples: " + nbAjout.toDouble)
affiche("Replication rate: " + (nbAjout.toDouble / triples.count))

var t2= java.lang.System.currentTimeMillis();
affiche("Preparation time "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions");

val oDist = triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b)
val oMean = oDist.map{case(f,c)=>c}.sum / machine
val odevs = oDist.map{case(f,c)=>c}.map(score => (score - oMean) * (score - oMean))
val ostddev = Math.sqrt(odevs.sum / machine)

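// outer join: a partition that receives no replica still counts, with 0 added triples
val nDist = oDist.leftOuterJoin(allreplica.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b)).map{case(f,(oc,on))=>(oc+on.getOrElse(0))}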
val nMean = nDist.sum / machine
val ndevs = nDist.map(score => (score - nMean) * (score - nMean))
val nstddev = Math.sqrt(ndevs.sum / machine)
 


/*
---------------------------------------------------------------------------
STEP 4: LOCAL QUERY PROCESSING 
On the WARP replicated data
---------------------------------------------------------------------------
*/

// Extends each partition with the WARP replicated triples.
// Each machine stores one partition
val triplesWARP = triples.union(allreplica).
  map{ case (s,p,o,f) => (f, (s,p,o))}.
  partitionBy(new HashPartitioner(machine)).map{ case (f, (s,p,o)) => (s,p,o,f)}

triplesWARP.setName("triples WARP")
triplesWARP.persist()
triplesWARP.count


var t2= java.lang.System.currentTimeMillis();
affiche("Preparation time (including WARP partitioning) "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions");


// -------------------
//  Q1 LOCAL
// -------------------
t1= java.lang.System.currentTimeMillis();

// Local Query 1, with projection on the variables (x,y,z,t)
val localQuery1 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((z, f), (x, y)) }.
  join(filterProp(triplesWARP, suborg).map{ case ( z, t, f) => ((z, f), t)}).
  map{ case ( (z, f), ( (x,y), t)) => (x, y, z, t)}

localQuery1.count
// 4108791: same result as without replication

t2= java.lang.System.currentTimeMillis();
affiche("Duration of query Q1: "+ (t2 - t1) +" msec for " + part + " partitions");

// -----------------------------------------------------------
// Q2 LOCAL:
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery2 = filterProp(triplesWARP, memof).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, suborg).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((x, z, f), y) }.
  join(filterProp(triplesWARP, undeg).map{ case (x, z, f) => ( (x, z, f), 1)}).
  map{ case ((x, z, f), (y, 1)) => (x, y, z) }

localQuery2.count
//2528
t2= java.lang.System.currentTimeMillis();
affiche("Duration of query Q2: "+ (t2 - t1) +" msec for " + part + " partitions");


// -----------------------------------------------------------
// Q9 LOCAL
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery9 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, teaof).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((x, z, f), y) }.
  join(filterProp(triplesWARP, takco).map{ case (x, z, f) => ( (x, z, f), 1)}).
  map{ case ((x, z, f), (y, 1)) => (x, y, z) }

localQuery9.count
//272982

t2= java.lang.System.currentTimeMillis();
affiche("Duration of query Q9: "+ (t2 - t1) +" msec for " + part + " partitions");


// -----------------------------------------------------------
// Q12 LOCAL
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery12 = filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (z, f), y)}.
  join(filterProp(triplesWARP, suborg).map{ case (z, t, f) => ( (z, f), t)}).
  map{ case ( (z, f), (y, t)) => (y, z, t) }

localQuery12.count
//938356

t2= java.lang.System.currentTimeMillis();
affiche("Duration of query Q12: "+ (t2 - t1) +" msec for " + part + " partitions");
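The replication step of the WARP script (`getOutsideTriple` and `getReplicaForQuery`) and the local evaluation on the extended partitions can be traced on a tiny example. The following sketch uses plain Scala collections instead of RDDs and the two-pattern query (y worksFor z) (z suborg t). Each match is the list of its triples with their partitions; each triple position is tried as seed, the triples lying outside the seed's partition are collected, and the seed needing the fewest distinct copies is kept. A join keyed on (z, partition) over the extended data should then find every match locally. The property ids, triples and partition ids are toy assumptions.

```scala
// Local sketch (plain Scala, no Spark) of WARP-style replica selection for
// the query (y worksFor z) (z suborg t). Toy ids, triples and partitions.
val worksFor = 1L
val suborg = 2L
val properties = Array(worksFor, suborg) // property of each triple position in a match

// Placed triples: (subject, property, object, partition)
val placed: Seq[(Long, Long, Long, Int)] = Seq(
  (10L, worksFor, 30L, 0), (11L, worksFor, 30L, 0), (12L, worksFor, 31L, 1),
  (30L, suborg, 40L, 1), (31L, suborg, 41L, 1)
)

def byProp(ts: Seq[(Long, Long, Long, Int)], p: Long): Seq[(Long, Long, Int)] =
  ts.collect { case (s, `p`, o, f) => (s, o, f) }

// Every match of the query, as the list of its triples (s, o, partition).
val matches: Seq[List[(Long, Long, Int)]] = for {
  (y, z, f1) <- byProp(placed, worksFor)
  (z2, t, f2) <- byProp(placed, suborg) if z2 == z
} yield List((y, z, f1), (z, t, f2))

// Triples of a match stored outside the seed partition, re-tagged with it.
def outside(seedF: Int, m: List[(Long, Long, Int)]): List[(Long, Long, Long, Int)] =
  m.zipWithIndex.collect { case ((s, o, f), i) if f != seedF => (s, properties(i), o, seedF) }

// Try each triple position as seed and keep the smallest set of copies.
val replica: Set[(Long, Long, Long, Int)] =
  properties.indices.
    map(i => matches.flatMap(m => outside(m(i)._3, m)).toSet).
    minBy(_.size)

// Local evaluation on the extended data: the join key includes the partition.
val extended = placed ++ replica
val local = (for {
  (y, z, f1) <- byProp(extended, worksFor)
  (z2, t, f2) <- byProp(extended, suborg) if z2 == z && f2 == f1
} yield (y, z, t)).toSet

println(replica)    // Set((30,2,40,0))
println(local.size) // 3
```

Here the worksFor triple is the better seed: copying the single suborg triple (30, suborg, 40) into partition 0 suffices, whereas seeding on the suborg triple would require copying two worksFor triples into partition 1.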

Datasets excerpts

Last modified: 11/05/2015 14:01 by amine