{{indexmenu_n>8}}
====== SnowFlake query Q8 plans ======
===== Plans for Spark 1.5 =====
<code scala>
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
// DataFrame
import org.apache.spark.sql.DataFrame
import scala.reflect.ClassTag
import org.apache.spark.HashPartitioner
val NB_FRAGMENTS = sc.defaultParallelism
// ...
SOByName.count
// 328 620 776

// keep the dictionary as an RDD (not a DataFrame): a DataFrame has no
// lookup method (which accesses a single partition)
// average time to look up one subject, sorted RDD vs. partitioned RDD:
//   sortByKey:                330 ms
//   partitionBy (300 part.):  160 ms
//   partitionBy (1200 part.):  66 ms
//   partitionBy (3000 part.):  83 ms
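
// Hedged sketch (toy data, not the page's dictionary): with a HashPartitioner,
// rdd.lookup(key) hashes the key and reads a single partition, which is why the
// hash-partitioned dictionary answers lookups faster in the timings above.
val demoDict: RDD[(String, Long)] = sc.parallelize(Seq(("s1", 1L), ("s2", 2L)))
val demoDictPart = demoDict.partitionBy(new HashPartitioner(NB_FRAGMENTS)).persist
demoDictPart.count            // materialize before timing lookups
demoDictPart.lookup("s1")     // returns Seq(1L), touching a single partition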
// ...
// --------------------------------------------------------
// load the data, with either random or subject-based partitioning
// --------------------------------------------------------
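
// Hedged sketch (hypothetical (s, (p, o)) encoding, not the page's actual loader):
// keying triples by subject and hash-partitioning puts all triples of a subject in
// the same partition, so subject-star patterns can be matched without a shuffle.
val demoTriples: RDD[(Long, (Long, Long))] = sc.parallelize(Seq(
  (1L, (10L, 100L)),   // (subject, (predicate, object)) as dictionary-encoded Longs
  (1L, (11L, 101L)),
  (2L, (10L, 102L))))
val demoBySubject = demoTriples.partitionBy(new HashPartitioner(NB_FRAGMENTS)).persist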
// ...
// ==============================================
// Q8 using the SPARQL Hybrid DF method
// ==============================================
// ...
// custom partition-wise iterator for the P2P4 star: incoming rows are buffered
// in ArrayBuffer a, and the resulting subject ids are streamed through i
class IterP2P4(iter: Iterator[org.apache.spark.sql.Row]) extends Iterator[Long] {
  val a: scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer()
  var i: Iterator[Long] = null
  def hasNext = {
// ...
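// Hedged usage sketch (the rest of the class body is elided above; starDF is a
// hypothetical DataFrame holding the P2P4 candidate rows): such an iterator is
// typically applied partition-wise, e.g.
//   val p2p4Ids: RDD[Long] = starDF.rdd.mapPartitions(rows => new IterP2P4(rows))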
queryTime(tmp24)
// broadcast the small P2P4 sub-query result
val bcEtoile24 = sc.broadcast(tmp24.map(x => (x.getLong(0), true)).collect.toMap)
/*
class IterQ8: broadcast the p2-p4 star and join it with the p1-p3-p5 star
-------------------
*/
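
// Hedged sketch (toy big side, not Q8's actual plan): probing the broadcast map
// inside mapPartitions implements a broadcast hash join, so the large p1-p3-p5
// side is filtered in place without shuffling it.
val demoBigStar: RDD[(Long, Long)] = sc.parallelize(Seq((1L, 7L), (3L, 8L)))
val demoJoined = demoBigStar.mapPartitions { rows =>
  val small = bcEtoile24.value
  rows.filter { case (s, _) => small.contains(s) }
}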