Bases de Données / Databases

Site Web de l'équipe BD du LIP6 / LIP6 DB Web Site

Outils pour utilisateurs

Outils du site


site:enseignement:master:bdle:supports-cours:spark

Différences

Ci-dessous, les différences entre deux révisions de la page.

Lien vers cette vue comparative

Les deux révisions précédentes Révision précédente
Prochaine révision
Révision précédente
site:enseignement:master:bdle:supports-cours:spark [15/11/2018 17:20]
amine
site:enseignement:master:bdle:supports-cours:spark [15/11/2018 18:12] (Version actuelle)
amine [Analyser du JSON en Spark SQL]
Ligne 2: Ligne 2:
 ===== Datasets utilisés ===== ===== Datasets utilisés =====
 ==== TPCH ==== ==== TPCH ====
 +Désarchiver dans votre espace perso <code bash>/​Infos/​bd/​spark/​dataset/​tpch/​tpch-extrait.tgz</​code>​
 +L'​archive contient deux fichiers lineitem.tbl et  part.tbl, chacun ayant pour en-tete le schéma de la table qui porte son nom. Les fichiers sont en csv, voici un extrait de part.tbl
 +<​code>​
 +PARTKEY,​NAME,​MFGR,​BRAND,​TYPE,​SIZE,​CONTAINER,​RETAILPRICE,​COMMENT
 +1,goldenrod lace spring peru powder,​Manufacturer#​1,​Brand#​13,​PROMO BURNISHED COPPER,​7,​JUMBO PKG,​901.00,​final deposits s
 +2,blush rosy metallic lemon navajo,​Manufacturer#​1,​Brand#​13,​LARGE BRUSHED BRASS,1,LG CASE,​902.00,​final platelets hang f
 +</​code>​
 +En étant dans tpch-extrait,​ créer les données dans hdfs en tapant
 +<code bash>
 +hadoop fs -mkdir /tpch
 +hadoop fs -put lineitem.tbl /tpch
 +hadoop fs -put part.tbl /tpch
 +</​code>​
 +puis vérifier que les fichiers existent
 +<code bash>
 +hadoop fs -ls /tpch
 +</​code>​
 +
 +
 +Les instructions suivantes sont communes aux deux sous-sections qui suivent et permette de charger les fichiers de l'​archive dans des RDD.
 +
 +<code scala>
 +val tpch="/​tpch/"​
 +val lineitem_t = tpch+"​lineitem.tbl"​
 +val part_t = tpch+"​part.tbl"​
 +</​code>​
 +
  
  
 ===== Spark RDD ===== ===== Spark RDD =====
 +
 +Les instructions suivantes correspondent à la version Q17 simplifiée de TPCH suivante (attention, syntaxe incompatible avec certains compilateurs SQL)
 +<code sql>
 +
 +SELECT sum(l_extendedprice) / 7.0 AS avg_yearly ​
 +FROM (SELECT l_partkey, 0.2* avg(l_quantity) AS t1
 + FROM lineitem GROUP BY l_partkey) AS inner, ​
 + (SELECT l_partkey,​l_quantity,​l_extendedprice FROM lineitem, part
 + WHERE p_partkey = l_partkey) AS outer ​
 +WHERE outer.l_partkey = inner.l_partkey AND outer.l_quantity < inner.t1;
 +</​code>​
 +
 +On peut exprimer cette requête en RDD comme suit
 +<code scala>
 +
 +val lineitem = sc.textFile(lineitem_t).
 + filter(!_.contains("​ORDERKEY"​))
 + .map(x=>​x.split(","​))
 + .map(x=>​(x(1).toInt,​x(4).toInt,​x(5).toDouble,​x(6).toDouble,​x(8),​x(9)))
 +//schema (PARTKEY, QUANTITY, EXTENDEDPRICE,​DISCOUNT,​RETURNFLAG,​LINESTATUS )
 +//count = 6,001,215
 +
 +val part = sc.textFile(part_t)
 + .filter(!_.contains("​PARTKEY"​))
 + .map(x=>​x.split(","​))
 + .map(x=>​(x(0).toInt))
 +
 +//schema (PARTKEY)
 +//count = 200,000
 +
 +def myAvg(tab:​Iterable[Int])=tab.reduce(_+_)/​tab.size
 +val inner = lineitem
 + .map{case(partkey,​quantity,​_)=>​(partkey,​quantity)}
 + .groupByKey
 + .mapValues(x=>​.2*myAvg(x))
 +
 +val outer = lineitem
 + .map{case(partkey,​quantity,​extended)=>​(partkey,​(quantity,​extended))}
 + .join(part.map(x=>​(x,​null)))
 + .map{case(partkey,​((quantity,​extended),​_))=>​(partkey,​(quantity,​extended))}
 +
 +val query = inner
 + .join(outer)
 + .filter{case(partkey,​(t1,​(quantity,​extended)))=>​quantity<​t1}
 + .map{case(partkey,​(t1,​(quantity,​extended)))=>​extended}
 +
 +val res = query.sum/7
 +</​code>​
 +Pour afficher le plan d'​exécution en mode textuel, taper
 +<code scala>​print(query.toDebugString)
 +</​code>​
 +Pour visualiser le plan d'​exécution en version graphique, aller sur [[ http://:​4040]] puis sur l'​onglet "​Stages"​ (voir la [[https://​spark.apache.org/​docs/​latest/​monitoring.html|doc de Spark]] concernant le monitoring )
 +
 +
  
 ===== Spark SQL ===== ===== Spark SQL =====
  
 +Le but ici est d'​exprimer la requête Q17 modifiée en dataset. Pour ce faire, commencer par charger les données tout en inférant les types de chaque attribut. ​
 +
 +
 +<code scala>
 +import spark.implicits._
 +
 +
 +val lineitem = spark.read.format("​csv"​).option("​header",​true).option("​inferSchema",​true).load(lineitem_t).coalesce(6)
 +
 +val part = spark.read.format("​csv"​).option("​header",​true).option("​inferSchema",​true).load(part_t).coalesce(1)
 +
 +</​code>​
 +
 +Les instructions suivantes expriment les sous-expression de la requête
 +<​code>​
 +val inner = lineitem.groupBy("​PARTKEY"​).avg("​QUANTITY"​).withColumnRenamed("​avg(QUANTITY)","​p_quantity"​)
 +
 +val outer = lineitem.join(part,​ "​PARTKEY"​).select("​PARTKEY",​ "​QUANTITY",​ "​EXTENDEDPRICE"​)
 +
 +val q17_simp = inner.join(outer,​ "​PARTKEY"​).where("​p_quantity<​QUANTITY"​).agg(sum($"​EXTENDEDPRICE"​)/​7)
 +
 +q17_simp.show()
 +</​code>​
 +
 +Pour examiner les plans logiques et physique utiliser le explain
 +<code scala>
 +q17_simp.explain(true)
 +</​code>​
 +Il est aussi possible de visualizer le plan physique et les Stages de l'​exécution en utilisant l'​interface graphique.
 +===== Analyser du JSON en Spark SQL =====
 +Les extrait du cours sont dans <code bash>/​Infos/​bd/​spark/​dataset/​json/​json_samples.tar</​code>​
 +Chaque fichier de l'​archive respecte le format [[http://​jsonlines.org|Json lines]] et contient une collection d'​objets JSON.
 +L'​instruction suivant permet de charger une collection depuis //​fichier.json//​ dans un dataset //coll//
 +
 +<code scala>​val coll = spark.read.json(fichier.json)</​code>​
site/enseignement/master/bdle/supports-cours/spark.1542298846.txt.gz · Dernière modification: 15/11/2018 17:20 par amine