Bases de Données / Databases

Site Web de l'équipe BD du LIP6 / LIP6 DB Web Site


This is an old revision of the document.


This wiki page provides information about the experiments on RDF distribution approaches using Spark. This information consists of i) the query workload, ii) the source code for both data preparation and query evaluation, and iii) the description of the two datasets used in the experiments. For the sake of reproducibility, each piece of source code is provided as a script that can be executed directly in the Spark shell.

Query workload

We picked three queries from the LUBM benchmark, namely #2, #9 and #12, which are referred to as Query 2, Query 3 and Query 4, respectively. We also created an additional query, referred to as Query 1. Finally, we created two queries for the Wikidata dataset, referred to as Query 5 and Query 6.

Query 1 (synthetic, LUBM)

SELECT ?x ?y ?z 
{?x lubm:advisor ?y.
 ?y lubm:worksFor ?z.
 ?z lubm:subOrganizationOf ?t}

Query 2 (#2 of LUBM)

PREFIX rdf: <> 
PREFIX ub: <> 
SELECT ?X ?Y ?Z 
{?X rdf:type ub:GraduateStudent . 
  ?Y rdf:type ub:University . 
  ?Z rdf:type ub:Department . 
  ?X ub:memberOf ?Z . 
  ?Z ub:subOrganizationOf ?Y . 
  ?X ub:undergraduateDegreeFrom ?Y}

Query 3 (#9 of LUBM)

PREFIX rdf: <> 
PREFIX ub: <> 
SELECT ?X ?Y ?Z 
{?X rdf:type ub:Student . 
  ?Y rdf:type ub:Faculty . 
  ?Z rdf:type ub:Course . 
  ?X ub:advisor ?Y . 
  ?Y ub:teacherOf ?Z . 
  ?X ub:takesCourse ?Z}

Query 4 (#12 of LUBM)

PREFIX rdf: <> 
PREFIX ub: <> 
SELECT ?X ?Y 
{?X rdf:type ub:Chair . 
  ?Y rdf:type ub:Department . 
  ?X ub:worksFor ?Y . 
  ?Y ub:subOrganizationOf <>} 

Query 5 (synthetic, Wikidata)

SELECT ?x ?y ?z 
{?x entity:P131s ?y. 
?y entity:P961v ?z. 
?z entity:P704s ?w}

Query 6 (synthetic, Wikidata)

SELECT ?x ?y ?z 
{?x entity:P39v ?y. 
?x entity:P580q ?z. 
?x rdf:type ?w}
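The Spark code below evaluates each query as a sequence of predicate filters followed by equi-joins on the shared variables. As a minimal sketch of that plan, Query 1 can be evaluated over plain Scala collections without Spark. The predicate IDs (1, 2, 3) and the `hashJoin` and `query1` helpers are hypothetical, chosen for illustration only; they are not part of the experiment code.

```scala
// Minimal sketch (plain Scala collections, no Spark) of the filter/join plan
// used for Query 1: (?x advisor ?y) (?y worksFor ?z) (?z subOrganizationOf ?t).
type Triple = (Long, Long, Long)

// Hypothetical predicate IDs; the real ones come from the dataset encoding.
val Advisor  = 1L
val WorksFor = 2L
val SubOrg   = 3L

// Equi-join of two keyed collections, i.e. what RDD.join computes, minus distribution.
def hashJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
  val index = right.groupBy(_._1)                // build side: key -> matching pairs
  for {
    (k, a) <- left
    (_, b) <- index.getOrElse(k, Seq.empty)      // probe side
  } yield (k, (a, b))
}

def query1(triples: Seq[Triple]): Set[(Long, Long, Long)] = {
  def withPredicate(p: Long) = triples.filter(_._2 == p)
  // (?x advisor ?y) and (?y worksFor ?z), both keyed on ?y
  val xy  = withPredicate(Advisor).map { case (s, _, o) => (o, s) }
  val yz  = withPredicate(WorksFor).map { case (s, _, o) => (s, o) }
  val xyz = hashJoin(xy, yz).map { case (y, (x, z)) => (z, (x, y)) }
  // (?z subOrganizationOf ?t) keyed on ?z; project on ?x ?y ?z and drop duplicates
  val zt  = withPredicate(SubOrg).map { case (s, _, o) => (s, o) }
  hashJoin(xyz, zt).map { case (z, ((x, y), _)) => (x, y, z) }.toSet
}

val sample: Seq[Triple] = Seq(
  (10L, Advisor, 20L),  // 10 is advised by 20
  (20L, WorksFor, 30L), // 20 works for department 30
  (30L, SubOrg, 40L),   // 30 is a sub-organization of 40
  (11L, Advisor, 21L)   // 21 works nowhere: no answer for 11
)
println(query1(sample)) // prints Set((10,20,30))
```

Building the index on the right-hand side mirrors a classic hash join; in the Spark version, `join(..., part)` additionally co-partitions both sides on the join key.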

Source Code

Hash-based approaches

The following code implements the random hashing approach on the LUBM dataset: triples are distributed according to a hash of the whole triple (s+p+o), and queries Q1, Q2, Q3 and Q4 are then evaluated.

import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer

val folder= "lubm"
val dataset= "univ"
val scale="10k"
val coreNumber = 20
val machine = sc.defaultParallelism /coreNumber
val part =  sc.defaultParallelism

val folderName = folder + scale
val fileName   = dataset + scale

val t1 = java.lang.System.currentTimeMillis();

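// Loading and transforming the dataset: each line of the encoded file is "s p o" (three Long IDs).
// The key (hash of s+p+o modulo the number of machines) is only used below to measure load balance.
val triples0 = sc.textFile(s"/user/olivier/${folderName}/${fileName}_encoded_unique").map(x => x.split(" ")).map(t => ((t(0).toLong+t(1).toLong+t(2).toLong).hashCode%machine,(t(0).toLong,t(1).toLong, t(2).toLong))).persist
triples0.count() // action: textFile and map are lazy, so without it the loading time would not be measured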
val t2= java.lang.System.currentTimeMillis();

println("Loading time "+ (t2 - t1) +" msec for "+part+" partitions");

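// Partitioning the dataset: hash the key s+p+o over `part` partitions, then drop the key
// so that the queries below can match plain (s,p,o) triples.
// (.values is a narrow map: the triples stay on the partition chosen by partitionBy.)
val triples = triples0.map({case(f,(s,p,o)) => ((s+p+o),(s,p,o))}).partitionBy(new HashPartitioner(part)).values.persist
triples.count() // action: forces the shuffle so that the partitioning time is measured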


val t3= java.lang.System.currentTimeMillis();

println("Partitioning time "+ (t3 - t2) +" msec for "+part+" partitions");

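// Load balance: number of triples per machine (key f), then their mean and standard deviation
val oDist = triples0.map({ case(f,(s,p,o)) => (f,1)}).reduceByKey( (a,b) => a+b)
val oMean = oDist.map({case(f,c)=>c}).sum / machine
val odevs = oDist.map({case(f,c)=>c}).map(score => (score - oMean) * (score - oMean))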
val ostddev = Math.sqrt(odevs.sum / machine)


// declare constants for BGP translation
// These constants (predicate IDs) were generated by our encoding method
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864

// Appends a triple to a ListBuffer and returns the buffer
def ajout(a : ListBuffer[(Long, Long, Long)], e: (Long, Long, Long) ) : ListBuffer[(Long, Long, Long)] = {
  a += e
  a
}

    // -----------------------------------------------------------
    // Query 1 : (not part of the benchmark)
    // Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

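// (x advisor y) keyed on y, joined with (y worksFor z), then with (z subOrganisation t) on z;
// the result is projected on (x, y, z)
var pataws = triples.filter({case(s,p,o) => p==advisor}).map({case(s,p,o) => (o,s)}).
    join(triples.filter({case(s,p,o) => p==worksFor}).map({case(s,p,o) => (s,o)}),part).
    map({case (y,(x,z)) => (z,(x,y))}).
    join(triples.filter({case(s,p,o) => p==suborg}).map({case(s,p,o) => (s,o)}),part).
    map({case (z,((x,y),t)) => (x,y,z)}).distinct
var ans_pataws = pataws.count // action: forces the evaluation that is timed below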


var t2= java.lang.System.currentTimeMillis();

println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 2 : MSU
    // Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

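var pmemof = triples.filter({case(s,p,o) => p==memof}).cache()

// (x memberOf y) joined with (y subOrg z) on y, then with (x undergraduateDegreeFrom z) on the pair (x,z).
// The pair is used as the join key: concatenating x+""+z could make distinct pairs collide, e.g. (1,23) and (12,3).
var patmsu = pmemof.map({case(s,p,o) => (o,s)}).
             join(triples.filter({case(s,p,o) => p==suborg}).map({case(s,p,o) => (s,o)}),part).
             map({case (y,(x,z)) => ((x,z),(x,y,z))}).
             join(triples.filter({case(s,p,o) => p==undeg}).map({case(x,p,z)=> ((x,z),())}), part)

// Keep the (x,y,z) bindings and remove duplicates
var patmsu2 = patmsu.map({case (k,(xyz,u)) => xyz}).distinct
var ans_patmsu = patmsu2.count // action: forces the evaluation that is timed below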

var t2= java.lang.System.currentTimeMillis();

println("Processing Q2 "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 9 : ATT
    // Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

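// (x advisor y) joined with (y teacherOf z) on y, then with (x takesCourse z) on the pair (x,z)
var patatt = triples.filter({case(s,p,o) => p==advisor}).map({case(s,p,o) => (o,s)}).
              join(triples.filter({case(s,p,o) => p==teaof}).map({case(s,p,o) => (s,o)}),part).
              map({case (y,(x,z)) => ((x,z),(x,y,z))}).
              join(triples.filter({case(s,p,o) => p==takco}).map({case(s,p,o)=> ((s,o),())}),part).
              map({case (k,(xyz,u)) => xyz})
var ans_patatt = patatt.count // action: forces the evaluation that is timed below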


var t2= java.lang.System.currentTimeMillis();

println("Processing Q3 (LUBM #9) "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 12 : WS
    // Pattern: (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

var patws = triples.filter({case(s,p,o) => p==worksFor}).map({case(s,p,o) => (o,(s,p,o))}).
              join(triples.filter({case(s,p,o) => p==suborg}).map({case(s,p,o) => (s,(s,p,o))}),part).
              map({case (k,((s1,p1,o1),(s2,p2,o2))) => (s1,o1,o2)})

var ans_patws = patws.distinct.count

var t2= java.lang.System.currentTimeMillis();

println("Processing Q4 (LUBM #12) "+ (t2 - t1) +" msec for "+part+" partitions");
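The placement rule and the load-balance figures computed above (oDist, oMean, ostddev) can be illustrated outside Spark. This is a sketch, assuming Spark's `HashPartitioner` behaviour (non-negative modulo of the key's `hashCode`); `nonNegativeMod`, `randomHashPartition` and `loadStats` are hypothetical helper names. Two deliberate differences from the script: machines that receive no triple are counted as 0 (reduceByKey only emits keys that occur), and the modulo is made non-negative, whereas `hashCode % machine` in the script can produce negative keys.

```scala
// Minimal sketch (plain Scala, no Spark) of random hashing placement and of the
// per-machine load-balance statistics (mean and standard deviation).

// Same rule as Spark's HashPartitioner: non-negative modulo of the key's hashCode.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// Random hashing: the key is s + p + o, as in the script above.
def randomHashPartition(t: (Long, Long, Long), parts: Int): Int =
  nonNegativeMod((t._1 + t._2 + t._3).hashCode, parts)

// Mean and population standard deviation of the number of triples per machine;
// `assignment` holds the machine chosen for each triple.
def loadStats(assignment: Seq[Int], machines: Int): (Double, Double) = {
  val counts = (0 until machines).map(m => assignment.count(_ == m).toDouble)
  val mean   = counts.sum / machines
  val stddev = math.sqrt(counts.map(c => (c - mean) * (c - mean)).sum / machines)
  (mean, stddev)
}

// Usage on a synthetic set of 1,000 encoded triples spread over 4 machines.
val sampleTriples = for (s <- 1L to 100L; o <- 1L to 10L) yield (s, 7L, o)
val (mean, stddev) = loadStats(sampleTriples.map(randomHashPartition(_, 4)), 4)
println(f"mean = $mean%.1f triples per machine, stddev = $stddev%.2f")
```

A standard deviation close to 0 relative to the mean indicates that the hash spreads the triples evenly.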

The code for subject hashing is very similar: one only has to replace the two steps "Loading and transforming the dataset" and "Partitioning the dataset" with the following lines, so that triples are hashed on their subject:

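// Loading and transforming the dataset: the key is now the subject
val triples0 = sc.textFile(s"/user/olivier/${folderName}/${fileName}_encoded_unique").map(x => x.split(" ")).map(t => (t(0).toLong,(t(0).toLong,t(1).toLong, t(2).toLong))).persist
triples0.count() // action: forces loading so that the loading time is measured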

val t2= java.lang.System.currentTimeMillis();

println("Loading time "+ (t2 - t1) +" msec for "+part+" partitions");

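// Partitioning the dataset on the subject key, then keeping only the (s,p,o) triples for the queries
val triples = triples0.partitionBy(new HashPartitioner(part)).values.persist
triples.count() // action: forces the shuffle so that the partitioning time is measured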
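With subject hashing, all triples sharing a subject are sent to the same partition, so a subject-star pattern such as Query 6 can in principle be matched inside each partition without moving data. The following plain-Scala sketch illustrates this under assumptions: the IDs 10, 11 and 12 stand in for the encoded P39v, P580q and rdf:type, and `subjectPartition`, `partitionBySubject` and `localStar` are hypothetical helpers, not part of the experiment code.

```scala
// Minimal sketch (plain Scala, no Spark): with subject hashing, every triple about
// a given subject lands in the same partition, so a subject star such as Query 6
// can be matched partition by partition.

// Same rule as Spark's HashPartitioner: non-negative modulo of the key's hashCode.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// Subject hashing: the key is the subject only.
def subjectPartition(t: (Long, Long, Long), parts: Int): Int =
  nonNegativeMod(t._1.hashCode, parts)

def partitionBySubject(triples: Seq[(Long, Long, Long)], parts: Int): Map[Int, Seq[(Long, Long, Long)]] =
  triples.groupBy(subjectPartition(_, parts))

// Match (?x p1 ?y) (?x p2 ?z) (?x p3 ?w) inside one partition and return (x, y, z).
def localStar(part: Seq[(Long, Long, Long)], p1: Long, p2: Long, p3: Long): Seq[(Long, Long, Long)] =
  part.groupBy(_._1).toSeq.flatMap { case (x, ts) =>
    def objects(p: Long): Seq[Long] = ts.collect { case (_, `p`, o) => o }
    for (y <- objects(p1); z <- objects(p2); _ <- objects(p3)) yield (x, y, z)
  }

// Hypothetical IDs standing for P39v (10), P580q (11) and rdf:type (12).
val sample = Seq(
  (1L, 10L, 100L), (1L, 11L, 101L), (1L, 12L, 102L), // subject 1 matches all three patterns
  (2L, 10L, 200L), (2L, 12L, 202L),                  // subject 2 has no P580q triple
  (3L, 10L, 300L)
)
val answers = partitionBySubject(sample, 4).values.flatMap(localStar(_, 10L, 11L, 12L)).toSet
println(answers) // prints Set((1,100,101))
```

Chain patterns such as Query 5, whose joins go from object to subject, do not benefit from this co-location and still need data exchange between partitions.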

Partitioning-based approaches


Dataset excerpts

site/recherche/logiciels/rdfdist.1431341363.txt.gz · Last modified: 11/05/2015 12:49 by amine