Bases de Données / Databases

Site Web de l'équipe BD du LIP6 / LIP6 DB Web Site

Outils pour utilisateurs

Outils du site


site:enseignement:master:bdle:supports-cours:spark

Supports Cours

Datasets utilisés

TPCH

Désarchiver dans votre espace perso

/Infos/bd/spark/dataset/tpch/tpch-extrait.tgz

L'archive contient deux fichiers lineitem.tbl et part.tbl, chacun ayant pour en-tete le schéma de la table qui porte son nom. Les fichiers sont en csv, voici un extrait de part.tbl

PARTKEY,NAME,MFGR,BRAND,TYPE,SIZE,CONTAINER,RETAILPRICE,COMMENT
1,goldenrod lace spring peru powder,Manufacturer#1,Brand#13,PROMO BURNISHED COPPER,7,JUMBO PKG,901.00,final deposits s
2,blush rosy metallic lemon navajo,Manufacturer#1,Brand#13,LARGE BRUSHED BRASS,1,LG CASE,902.00,final platelets hang f

En étant dans tpch-extrait, créer les données dans hdfs en tapant

hadoop fs -mkdir /tpch
hadoop fs -put lineitem.tbl /tpch
hadoop fs -put part.tbl /tpch

puis vérifier que les fichiers existent

hadoop fs -ls /tpch

Les instructions suivantes sont communes aux deux sous-sections qui suivent et permette de charger les fichiers de l'archive dans des RDD.

val tpch="/tpch/"
val lineitem_t = tpch+"lineitem.tbl"
val part_t = tpch+"part.tbl"

Spark RDD

Les instructions suivantes correspondent à la version Q17 simplifiée de TPCH suivante (attention, syntaxe incompatible avec certains compilateurs SQL)

SELECT SUM(l_extendedprice) / 7.0 AS avg_yearly 
FROM (SELECT l_partkey, 0.2* avg(l_quantity) AS t1
	 FROM lineitem GROUP BY l_partkey) AS INNER, 
	(SELECT 		l_partkey,l_quantity,l_extendedprice 		FROM lineitem, part
	WHERE p_partkey = l_partkey) AS 	OUTER 
WHERE OUTER.l_partkey = INNER.l_partkey AND OUTER.l_quantity < INNER.t1;

On peut exprimer cette requête en RDD comme suit

val lineitem = sc.textFile(lineitem_t).
		filter(!_.contains("ORDERKEY"))
		.map(x=>x.split(","))
		.map(x=>(x(1).toInt,x(4).toInt,x(5).toDouble,x(6).toDouble,x(8),x(9)))
//schema (PARTKEY, QUANTITY, EXTENDEDPRICE,DISCOUNT,RETURNFLAG,LINESTATUS )
//count = 6,001,215
 
val part = sc.textFile(part_t)
	.filter(!_.contains("PARTKEY"))
	.map(x=>x.split(","))
	.map(x=>(x(0).toInt))
 
//schema (PARTKEY)
//count = 200,000
 
def myAvg(tab:Iterable[Int])=tab.reduce(_+_)/tab.size
val inner = lineitem
	.map{case(partkey,quantity,_)=>(partkey,quantity)}
	.groupByKey
	.mapValues(x=>.2*myAvg(x))
 
val outer = lineitem
		.map{case(partkey,quantity,extended)=>(partkey,(quantity,extended))}
		.join(part.map(x=>(x,null)))
		.map{case(partkey,((quantity,extended),_))=>(partkey,(quantity,extended))}
 
val query = inner
		.join(outer)
		.filter{case(partkey,(t1,(quantity,extended)))=>quantity<t1}
		.map{case(partkey,(t1,(quantity,extended)))=>extended}
 
val res = query.sum/7

Pour afficher le plan d'exécution en mode textuel, taper

print(query.toDebugString)

Pour visualiser le plan d'exécution en version graphique, aller sur http://:4040 puis sur l'onglet “Stages” (voir la doc de Spark concernant le monitoring )

Spark SQL

Le but ici est d'exprimer la requête Q17 modifiée en dataset. Pour ce faire, commencer par charger les données tout en inférant les types de chaque attribut.

import spark.implicits._
 
 
val lineitem = spark.read.format("csv").option("header",true).option("inferSchema",true).load(lineitem_t).coalesce(6)
 
val part = spark.read.format("csv").option("header",true).option("inferSchema",true).load(part_t).coalesce(1)

Les instructions suivantes expriment les sous-expression de la requête

val inner = lineitem.groupBy("PARTKEY").avg("QUANTITY").withColumnRenamed("avg(QUANTITY)","p_quantity")

val outer = lineitem.join(part, "PARTKEY").select("PARTKEY", "QUANTITY", "EXTENDEDPRICE")

val q17_simp = inner.join(outer, "PARTKEY").where("p_quantity<QUANTITY").agg(sum($"EXTENDEDPRICE")/7)

q17_simp.show()

Pour examiner les plans logiques et physique utiliser le explain

q17_simp.explain(true)

Il est aussi possible de visualizer le plan physique et les Stages de l'exécution en utilisant l'interface graphique.

Analyser du JSON en Spark SQL

Les extrait du cours sont dans

/Infos/bd/spark/dataset/json/json_samples.tar

Chaque fichier de l'archive respecte le format Json lines et contient une collection d'objets JSON. L'instruction suivant permet de charger une collection depuis fichier.json dans un dataset coll

val coll = spark.read.json(fichier.json)
site/enseignement/master/bdle/supports-cours/spark.txt · Dernière modification: 15/11/2018 18:12 par amine