Ceci est une ancienne révision du document !
Désarchiver dans votre espace perso
/Infos/bd/spark/dataset/tpch/tpch-extrait.tgz
L'archive contient deux fichiers lineitem.tbl et part.tbl, chacun ayant pour en-tete le schéma de la table qui porte son nom. Les fichiers sont en csv, voici un extrait de part.tbl
PARTKEY,NAME,MFGR,BRAND,TYPE,SIZE,CONTAINER,RETAILPRICE,COMMENT 1,goldenrod lace spring peru powder,Manufacturer#1,Brand#13,PROMO BURNISHED COPPER,7,JUMBO PKG,901.00,final deposits s 2,blush rosy metallic lemon navajo,Manufacturer#1,Brand#13,LARGE BRUSHED BRASS,1,LG CASE,902.00,final platelets hang f
En étant dans tpch-extrait, créer les données dans hdfs en tapant
hadoop fs -mkdir /tpch hadoop fs -put lineitem.tbl /tpch hadoop fs -put part.tbl /tpch
puis vérifier que les fichiers existent
hadoop fs -ls /tpch
Les instructions suivantes sont communes aux deux sous-sections qui suivent et permette de charger les fichiers de l'archive dans des RDD.
val tpch="/tpch/" val lineitem_t = tpch+"lineitem.tbl" val part_t = tpch+"part.tbl"
Les instructions suivantes correspondent à la version Q17 simplifiée de TPCH suivante (attention, syntaxe incompatible avec certains compilateurs SQL)
SELECT SUM(l_extendedprice) / 7.0 AS avg_yearly FROM (SELECT l_partkey, 0.2* avg(l_quantity) AS t1 FROM lineitem GROUP BY l_partkey) AS INNER, (SELECT l_partkey,l_quantity,l_extendedprice FROM lineitem, part WHERE p_partkey = l_partkey) AS OUTER WHERE OUTER.l_partkey = INNER.l_partkey AND OUTER.l_quantity < INNER.t1;
On peut exprimer cette requête en RDD comme suit
val lineitem = sc.textFile(lineitem_t). filter(!_.contains("ORDERKEY")) .map(x=>x.split(",")) .map(x=>(x(1).toInt,x(4).toInt,x(5).toDouble,x(6).toDouble,x(8),x(9))) //schema (PARTKEY, QUANTITY, EXTENDEDPRICE,DISCOUNT,RETURNFLAG,LINESTATUS ) //count = 6,001,215 val part = sc.textFile(part_t) .filter(!_.contains("PARTKEY")) .map(x=>x.split(",")) .map(x=>(x(0).toInt)) //schema (PARTKEY) //count = 200,000 def myAvg(tab:Iterable[Int])=tab.reduce(_+_)/tab.size val inner = lineitem .map{case(partkey,quantity,_)=>(partkey,quantity)} .groupByKey .mapValues(x=>.2*myAvg(x)) val outer = lineitem .map{case(partkey,quantity,extended)=>(partkey,(quantity,extended))} .join(part.map(x=>(x,null))) .map{case(partkey,((quantity,extended),_))=>(partkey,(quantity,extended))} val query = inner .join(outer) .filter{case(partkey,(t1,(quantity,extended)))=>quantity<t1} .map{case(partkey,(t1,(quantity,extended)))=>extended} val res = query.sum/7
Pour afficher le plan d'exécution en mode textuel, taper
print(query.toDebugString)
Pour visualiser le plan d'exécution en version graphique, aller sur http://:4040 puis sur l'onglet “Stages” (voir la doc de Spark concernant le monitoring )
Le but ici est d'exprimer la requête Q17 modifiée en dataset. Pour ce faire, commencer par charger les données tout en inférant les types de chaque attribut.
import spark.implicits._ val lineitem = spark.read.format("csv").option("header",true).option("inferSchema",true).load(lineitem_t).coalesce(6) val part = spark.read.format("csv").option("header",true).option("inferSchema",true).load(part_t).coalesce(1)
Les instructions suivantes expriment les sous-expression de la requête
val inner = lineitem.groupBy("PARTKEY").avg("QUANTITY").withColumnRenamed("avg(QUANTITY)","p_quantity") val outer = lineitem.join(part, "PARTKEY").select("PARTKEY", "QUANTITY", "EXTENDEDPRICE") val q17_simp = inner.join(outer, "PARTKEY").where("p_quantity<QUANTITY").agg(sum($"EXTENDEDPRICE")/7) q17_simp.show()
Pour examiner les plans logiques et physique utiliser le explain
q17_simp.explain(true)
Il est aussi possible de visualizer le plan physique et les Stages de l'exécution en utilisant l'interface graphique.
Les extrait du cours sont dans
/Infos/bd/spark/dataset/json/json_samples.tar<code> Chaque fichier de l'archive respecte le format [[http://jsonlines.org|Json lines]] et contient une collection d'objets JSON. L'instruction suivant permet de charger une collection depuis //fichier.json// dans un dataset //coll// <code scala>val coll = spark.read.json(fichier.json)