Databases

LIP6 DB Web Site

This is an old revision of the document!


RDFdist

This wiki page provides information about the experiments on RDF distribution approaches using Spark: i) the source code for both data preparation and query evaluation, ii) the query workload, and iii) a description of the datasets used in the experiments. For the sake of reproducibility, each piece of source code is provided as a script that can be executed directly in the Spark shell.

Source Code

Hash-based approaches

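The following script implements the random hashing approach on the LUBM datasets: it loads the encoded triples, distributes them over the partitions by hashing the sum of the subject, predicate and object identifiers, and then evaluates queries Q1, Q2, Q3 and Q4.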

import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer

val folder= "lubm"
val dataset= "univ"
val scale="10k"
val coreNumber = 20
val machine = sc.defaultParallelism /coreNumber
val part =  sc.defaultParallelism

val folderName = folder + scale
val fileName   = dataset + scale

val t1 = java.lang.System.currentTimeMillis();

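// loading and transforming the dataset: each line holds "s p o" as encoded Long ids,
// keyed by a (simulated) machine id: a non-negative hash of s+p+o modulo machine
val triples0 = sc.textFile(s"/user/olivier/${folderName}/${fileName}_encoded_unique").
  map(x => x.split(" ")).
  map(t => (t(0).toLong, t(1).toLong, t(2).toLong)).
  map{case (s,p,o) => ((((s+p+o).hashCode % machine) + machine) % machine, (s,p,o))}.persist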
triples0.count
val t2= java.lang.System.currentTimeMillis();

println("Loading time "+ (t2 - t1) +" msec for "+part+" partitions");

// Partitioning the dataset
val triples = triples0.map{case(f,(s,p,o))=>((s+p+o),(s,p,o))}.partitionBy(new HashPartitioner(part))

//triples.persist
triples.count

val t3= java.lang.System.currentTimeMillis();

println("Partitioning time "+ (t3 - t2) +" msec for "+part+" partitions");

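// load balance: number of triples per (simulated) machine, computed as a non-negative
// hash of the key s+p+o modulo machine, then mean and standard deviation over the machines
val oDist = triples.map{ case(k,_) => (((k.hashCode % machine) + machine) % machine, 1)}.reduceByKey( (a,b) => a+b)
val oMean = oDist.map{case(f,c)=>c}.sum / machine
val odevs = oDist.map{case(f,c)=>c}.map(score => (score - oMean) * (score - oMean))
val ostddev = Math.sqrt(odevs.sum / machine)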

triples.partitions.length

// declare constants for BGP translation
// These constants are the encoded predicate identifiers, generated with our encoding method
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864

// helper that appends a triple to a buffer and returns the buffer
// (not used by the queries below)
def ajout(a : ListBuffer[(Long, Long, Long)], e: (Long, Long, Long) ) : ListBuffer[(Long, Long, Long)] = a += e

    // -----------------------------------------------------------
    // Query 1 : (not part of the benchmark)
    // Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

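// join (x advisor y) with (y worksFor z) on y, then with (z subOrganisation t) on z;
// triples is keyed by s+p+o, hence the (_, (s,p,o)) patterns
var pataws = triples.filter({case(_,(_,p,_)) => p==advisor}).map({case(_,(s,_,o)) => (o,s)}).
    join(triples.filter({case(_,(_,p,_)) => p==worksFor}).map({case(_,(s,_,o)) => (s,o)}),part).
    map({case (y,(x,z)) => (z,(x,y))}).
    join(triples.filter({case(_,(_,p,_)) => p==suborg}).map({case(_,(s,_,o)) => (s,o)}),part).
    map({case (z,((x,y),t)) => (x,y,z,t)}).distinct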

pataws.count

var t2= java.lang.System.currentTimeMillis();

println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");


    // -----------------------------------------------------------
    // LUBM 2 : MSU
    // Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

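var pmemof = triples.filter({case(_,(_,p,_)) => p==memof}).cache()

// join (x memberOf y) with (y subOrg z) on y, then with (x undergraduateDegreeFrom z)
// on the composite key (x,z); a tuple key cannot make two different pairs collide
var patmsu = pmemof.map({case(_,(s,_,o)) => (o,s)}).
             join(triples.filter({case(_,(_,p,_)) => p==suborg}).map({case(_,(s,_,o)) => (s,o)}),part).
             map({case (y,(x,z)) => ((x,z),y)}).
             join(triples.filter({case(_,(_,p,_)) => p==undeg}).map({case(_,(x,_,z)) => ((x,z),())}), part).
             map({case ((x,z),(y,_)) => (x,y,z)})

// keep distinct solutions only, as for the other queries
var patmsu2 = patmsu.distinct
patmsu2.count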

var t2= java.lang.System.currentTimeMillis();

println("Processing Q2 "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 9 : ATT
    // Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

// join (x advisor y) with (y teacherOf z) on y, then with (x takesCourse z) on the composite key (x,z)
var patatt = triples.filter({case(_,(_,p,_)) => p==advisor}).map({case(_,(s,_,o)) => (o,s)}).
              join(triples.filter({case(_,(_,p,_)) => p==teaof}).map({case(_,(s,_,o)) => (s,o)}),part).
              map({case (y,(x,z)) => ((x,z),y)}).
              join(triples.filter({case(_,(_,p,_)) => p==takco}).map({case(_,(s,_,o)) => ((s,o),())}),part).
              map({case ((x,z),(y,_)) => (x,y,z)})

patatt.distinct.count

var t2= java.lang.System.currentTimeMillis();

println("Processing Q3 (LUBM #9) "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 12 : WS
    // Pattern: (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

// join (y worksFor z) with (z subOrganisation t) on z and keep (y,z,t)
var patws = triples.filter({case(_,(_,p,_)) => p==worksFor}).map({case(_,(s,p,o)) => (o,(s,p,o))}).
              join(triples.filter({case(_,(_,p,_)) => p==suborg}).map({case(_,(s,p,o)) => (s,(s,p,o))}),part).
              map({case (k,((s1,p1,o1),(s2,p2,o2))) => (s1,o1,o2)})

var ans_patws = patws.distinct.count

var t2= java.lang.System.currentTimeMillis();

println("Processing Q4 (LUBM #12) "+ (t2 - t1) +" msec for "+part+" partitions");
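
Queries Q2 (LUBM #2) and Q3 (LUBM #9) share the same triangular shape: a join on y followed by a join on the pair (x, z). As a hedged, cluster-free illustration, the sketch below evaluates the LUBM #2 pattern (x memberOf y) (y subOrg z) (x undergraduateDegreeFrom z) over an in-memory Seq of encoded triples with plain Scala collections. It is not the Spark code above, and the names TrianglePatternSketch and msu are hypothetical. The pair (x, z) is handled as a tuple: building that key by concatenating the decimal strings of two Long values can make different pairs collide, since (1, 23) and (12, 3) both give "123".

```scala
// Illustration only: evaluates (x memberOf y) (y subOrg z) (x undergraduateDegreeFrom z)
// on in-memory triples (s, p, o) of encoded Long ids, mirroring the two joins of the script.
object TrianglePatternSketch {
  type Triple = (Long, Long, Long)

  def msu(triples: Seq[Triple], memberOf: Long, subOrg: Long, undergradFrom: Long): Set[(Long, Long, Long)] = {
    // y -> all x such that (x memberOf y)
    val byY: Map[Long, Seq[Long]] =
      triples.collect { case (s, p, o) if p == memberOf => (o, s) }
        .groupBy(_._1)
        .map { case (y, pairs) => y -> pairs.map(_._2) }

    // first join, on y, with (y subOrg z): candidate solutions (x, y, z)
    val xyz: Seq[(Long, Long, Long)] =
      for {
        (s, p, o) <- triples if p == subOrg
        x <- byY.getOrElse(s, Seq.empty)
      } yield (x, s, o)

    // second join on the composite key (x, z), here a lookup in a set of tuples
    val degrees: Set[(Long, Long)] =
      triples.collect { case (s, p, o) if p == undergradFrom => (s, o) }.toSet

    xyz.filter { case (x, _, z) => degrees.contains((x, z)) }.toSet
  }
}
```

Replacing the second join by a set lookup, as done here, assumes that the undergraduateDegreeFrom triples fit in memory; on Spark the script keeps it as a pair-RDD join.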

Hashing applied on the subject
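
The page does not include a script for this variant. As a rough illustration only (plain Scala rather than Spark, and not the authors' code), the sketch below places each triple on a machine by hashing its subject, so that all triples sharing a subject end up on the same machine, and then reports the mean and population standard deviation of the number of triples per machine, the same statistics the random hashing script computes. The names SubjectHashingSketch, machineOf, place and balance are hypothetical, and using java.lang.Math.floorMod to keep machine indices non-negative is an assumption of the sketch.

```scala
// Illustration only: subject-based hash placement of (s, p, o) triples over `machines` machines.
object SubjectHashingSketch {
  type Triple = (Long, Long, Long)

  // Non-negative machine index derived from the subject's hash code (assumption of this sketch).
  def machineOf(subject: Long, machines: Int): Int =
    java.lang.Math.floorMod(subject.hashCode, machines)

  // Groups triples by the machine their subject hashes to; equal subjects are co-located.
  def place(triples: Seq[Triple], machines: Int): Map[Int, Seq[Triple]] =
    triples.groupBy { case (s, _, _) => machineOf(s, machines) }

  // Mean and population standard deviation of triples per machine, counting empty machines as 0.
  def balance(triples: Seq[Triple], machines: Int): (Double, Double) = {
    val placed = place(triples, machines)
    val counts = (0 until machines).map(m => placed.get(m).map(_.size).getOrElse(0).toDouble)
    val mean = counts.sum / machines
    val variance = counts.map(c => (c - mean) * (c - mean)).sum / machines
    (mean, math.sqrt(variance))
  }
}
```

Co-locating triples by subject lets subject-subject (star) joins run without moving data, at the price of possible imbalance when a few subjects carry many triples; the balance figures make that imbalance visible.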

Partitioning-based approaches

nHop
WARP
Hybrid

Dataset excerpts
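
The excerpts themselves are not reproduced here. Judging from the loading step of the script above, which splits each line on a space and converts the three fields with toLong, each line of the encoded input file (the path ending in _encoded_unique) holds one triple as three integer identifiers: subject, predicate and object. The sketch below parses lines in that format; the names EncodedTriples, parseTriple and parseAll, the tolerance for repeated whitespace, and the choice to skip malformed lines are assumptions of this illustration.

```scala
// Illustration only: parses "s p o" lines of encoded Long ids into triples.
import scala.util.Try

object EncodedTriples {
  // Returns Some((s, p, o)) for a well-formed line, None otherwise.
  def parseTriple(line: String): Option[(Long, Long, Long)] =
    line.trim.split("\\s+") match {
      case Array(s, p, o) => Try((s.toLong, p.toLong, o.toLong)).toOption
      case _              => None
    }

  // Keeps only the lines that parse, in their original order.
  def parseAll(lines: Seq[String]): Seq[(Long, Long, Long)] =
    lines.flatMap(l => parseTriple(l).toList)
}
```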

site/recherche/logiciels/rdfdist.1431340207.txt.gz · Last modified: 11/05/2015 12:30 by amine