Désarchiver dans votre espace perso
/Infos/bd/spark/dataset/tpch/tpch-extrait.tgz
L'archive contient deux fichiers lineitem.tbl et part.tbl, chacun ayant pour en-tete le schéma de la table qui porte son nom. Les fichiers sont en csv, voici un extrait de part.tbl
PARTKEY,NAME,MFGR,BRAND,TYPE,SIZE,CONTAINER,RETAILPRICE,COMMENT 1,goldenrod lace spring peru powder,Manufacturer#1,Brand#13,PROMO BURNISHED COPPER,7,JUMBO PKG,901.00,final deposits s 2,blush rosy metallic lemon navajo,Manufacturer#1,Brand#13,LARGE BRUSHED BRASS,1,LG CASE,902.00,final platelets hang f
En étant dans tpch-extrait, créer les données dans hdfs en tapant
hadoop fs -mkdir /tpch hadoop fs -put lineitem.tbl /tpch hadoop fs -put part.tbl /tpch
puis vérifier que les fichiers existent
hadoop fs -ls /tpch
Les instructions suivantes sont communes aux deux sous-sections qui suivent et permette de charger les fichiers de l'archive dans des RDD.
val tpch="/tpch/" val lineitem_t = tpch+"lineitem.tbl" val part_t = tpch+"part.tbl"
Les instructions suivantes correspondent à la version Q17 simplifiée de TPCH suivante (attention, syntaxe incompatible avec certains compilateurs SQL)
SELECT SUM(l_extendedprice) / 7.0 AS avg_yearly FROM (SELECT l_partkey, 0.2* avg(l_quantity) AS t1 FROM lineitem GROUP BY l_partkey) AS INNER, (SELECT l_partkey,l_quantity,l_extendedprice FROM lineitem, part WHERE p_partkey = l_partkey) AS OUTER WHERE OUTER.l_partkey = INNER.l_partkey AND OUTER.l_quantity < INNER.t1;
On peut exprimer cette requête en RDD comme suit
val lineitem = sc.textFile(lineitem_t). filter(!_.contains("ORDERKEY")) .map(x=>x.split(",")) .map(x=>(x(1).toInt,x(4).toInt,x(5).toDouble,x(6).toDouble,x(8),x(9))) //schema (PARTKEY, QUANTITY, EXTENDEDPRICE,DISCOUNT,RETURNFLAG,LINESTATUS ) //count = 6,001,215 val part = sc.textFile(part_t) .filter(!_.contains("PARTKEY")) .map(x=>x.split(",")) .map(x=>(x(0).toInt)) //schema (PARTKEY) //count = 200,000 def myAvg(tab:Iterable[Int])=tab.reduce(_+_)/tab.size val inner = lineitem .map{case(partkey,quantity,_)=>(partkey,quantity)} .groupByKey .mapValues(x=>.2*myAvg(x)) val outer = lineitem .map{case(partkey,quantity,extended)=>(partkey,(quantity,extended))} .join(part.map(x=>(x,null))) .map{case(partkey,((quantity,extended),_))=>(partkey,(quantity,extended))} val query = inner .join(outer) .filter{case(partkey,(t1,(quantity,extended)))=>quantity<t1} .map{case(partkey,(t1,(quantity,extended)))=>extended} val res = query.sum/7
Pour afficher le plan d'exécution en mode textuel, taper
print(query.toDebugString)
Pour visualiser le plan d'exécution en version graphique, aller sur http://:4040 puis sur l'onglet “Stages” (voir la doc de Spark concernant le monitoring )
Le but ici est d'exprimer la requête Q17 modifiée en dataset. Pour ce faire, commencer par charger les données tout en inférant les types de chaque attribut.
import spark.implicits._ val lineitem = spark.read.format("csv").option("header",true).option("inferSchema",true).load(lineitem_t).coalesce(6) val part = spark.read.format("csv").option("header",true).option("inferSchema",true).load(part_t).coalesce(1)
Les instructions suivantes expriment les sous-expression de la requête
val inner = lineitem.groupBy("PARTKEY").avg("QUANTITY").withColumnRenamed("avg(QUANTITY)","p_quantity") val outer = lineitem.join(part, "PARTKEY").select("PARTKEY", "QUANTITY", "EXTENDEDPRICE") val q17_simp = inner.join(outer, "PARTKEY").where("p_quantity<QUANTITY").agg(sum($"EXTENDEDPRICE")/7) q17_simp.show()
Pour examiner les plans logiques et physique utiliser le explain
q17_simp.explain(true)
Il est aussi possible de visualizer le plan physique et les Stages de l'exécution en utilisant l'interface graphique.
Les extrait du cours sont dans
/Infos/bd/spark/dataset/json/json_samples.tar
Chaque fichier de l'archive respecte le format Json lines et contient une collection d'objets JSON. L'instruction suivant permet de charger une collection depuis fichier.json dans un dataset coll
val coll = spark.read.json(fichier.json)