Bases de Données / Databases

Site Web de l'équipe BD du LIP6 / LIP6 DB Web Site

Outils pour utilisateurs

Outils du site


site:enseignement:master:bdle:supports-cours:spark

Ceci est une ancienne révision du document !


Supports Cours

Datasets utilisés

TPCH

Copier dans votre espace perso puis désarchiver

/.../tpch-extrait.tgz

En étant dans tpch-extrait, créer les données dans hdfs en tapant

hadoop fs -mkdir /tpch
hadoop fs -put lineitem.tbl /tpch
hadoop fs -put part.tbl /tpch

puis vérifier que les fichiers existent

hadoop fs -ls /tpch

Spark RDD

Commencer par charger les données en RDD

val tpch="/tpch/"
val lineitem_t = tpch+"lineitem.tbl"
val part_t = tpch+"part.tbl"

Les instructions suivantes correspondent à la version Q17 simplifiée de TPCH suivante (attention, syntaxe incompatible avec certains compilateurs SQL)

SELECT SUM(l_extendedprice) / 7.0 AS avg_yearly 
FROM (SELECT l_partkey, 0.2* avg(l_quantity) AS t1
	 FROM lineitem GROUP BY l_partkey) AS INNER, 
	(SELECT 		l_partkey,l_quantity,l_extendedprice 		FROM lineitem, part
	WHERE p_partkey = l_partkey) AS 	OUTER 
WHERE OUTER.l_partkey = INNER.l_partkey AND OUTER.l_quantity < INNER.t1;

On peut exprimer cette requête en RDD comme suit

val lineitem = sc.textFile(lineitem_t).
		filter(!_.contains("ORDERKEY"))
		.map(x=>x.split(","))
		.map(x=>(x(1).toInt,x(4).toInt,x(5).toDouble,x(6).toDouble,x(8),x(9)))
//schema (PARTKEY, QUANTITY, EXTENDEDPRICE,DISCOUNT,RETURNFLAG,LINESTATUS )
//count = 6,001,215
 
val part = sc.textFile(part_t)
	.filter(!_.contains("PARTKEY"))
	.map(x=>x.split(","))
	.map(x=>(x(0).toInt))
 
//schema (PARTKEY)
//count = 200,000
 
def myAvg(tab:Iterable[Int])=tab.reduce(_+_)/tab.size
val inner = lineitem
	.map{case(partkey,quantity,_)=>(partkey,quantity)}
	.groupByKey
	.mapValues(x=>.2*myAvg(x))
 
val outer = lineitem
		.map{case(partkey,quantity,extended)=>(partkey,(quantity,extended))}
		.join(part.map(x=>(x,null)))
		.map{case(partkey,((quantity,extended),_))=>(partkey,(quantity,extended))}
 
val query = inner
		.join(outer)
		.filter{case(partkey,(t1,(quantity,extended)))=>quantity<t1}
		.map{case(partkey,(t1,(quantity,extended)))=>extended}
 
val res = query.sum/7

Pour afficher le plan d'exécution en mode textuel, taper

print(query.toDebugString)

Pour visualiser le plan d'exécution en version graphique, aller sur http://:4040 puis sur l'onglet “Stages” (voir la doc de Spark concernant le monitoring )

Spark SQL

site/enseignement/master/bdle/supports-cours/spark.1542300045.txt.gz · Dernière modification: 15/11/2018 17:40 par amine