L'objectif de ce TME est de comprendre les différentes façons de traiter une jointure dans un environnement d'exécution parallèle et réparti.
On manipule les données avec l'API Spark en Scala. Dans votre navigateur, ouvrir 2 onglets pour l'API des RDD et celle des DataSet
Quand votre programme spark s'exécutera, vous pourrez ouvrir l'interface Web dans le navigateur local http://localhost:4040
source /Infos/bd/spark-config idea.sh &
Dans IDEA : importer le fichier PartitionEtudiant.scala en tant que script scala dans votre projet courant. Exemple si votre projet s'appelle BDLE :
cd cp /Infos/bd/spark/PartitionEtudiant.scala IdeaProjects/BDLE/src/main/scala
case class Triplet(sujet: String, prop: String, objet: String) // Extraire les 10 premiers éléments de chaque partition // ----------------------------------------------------- def entete(numP: Int, iter: Iterator[Any]): Iterator[String] = { val s = new StringBuffer() s.append("Contenu de la partition " + numP + " \n") var i = 0 while(iter.hasNext && i < 10) { i+=1 s.append(iter.next + "\n") } if(iter.hasNext) s.append( "... \n") return List(s.toString()).iterator } def affichePartitions[T: ClassTag](d: Dataset[T]): Unit = d.rdd.mapPartitionsWithIndex(entete).collect.foreach(println) // Déterminer le nombre d'éléments contenus dans chaque partition // --------------------------------------------------------------- def nbObjet(numP: Int, iter: Iterator[Any]): Iterator[(String)] = { var nb = 0 for(t <- iter) { nb += 1 } val res = ("La partition " + numP + " a " + nb + " éléments") return List(res).iterator } // Supprimer les collections persistant en mémoire def clearStorage() = { spark.catalog.clearCache() }
On utilise les données de YAGO du fichier yagoMelange.json. Consulter un extrait des données :
ls -lh /Infos/bd/spark/dataset/yago/yagoMelange*.json head -n 20 /Infos/bd/spark/dataset/yago/yagoMelange.json head -n 20 /Infos/bd/spark/dataset/yago/yagoMelange1M.json
Données réparties sur plusieurs machines Pour faciliter le bon déroulement du TME, chaque étudiant utilise une seule machine contenant toutes les données d'exemple. Par la suite on considérera le cas plus général où les fragments sont répartis sur plusieurs machines. Tous les exercices de ce TME s'appliquent de la même façon sur une ou plusieurs machines.
Lire les données :
val yago = spark.read.json("/Infos/bd/spark/dataset/yago/yagoMelange1M.json"). select("sujet", "prop", "objet").as[Triplet] yago.show(20, false)
// Logement: une personne vit dans une ou plusieurs villes val a = List( "Alice vit Paris", "Dan vit Nice", "Bob vit Lyon", "Alice vit Aix", "Claire vit Aix", "Dan vit Paris" )
Répartir les données dans 4 partitions :
Répartition par découpage suivant l'ordre des données initiales (ressemble à un round robin)
val vit = spark.sparkContext.parallelize(a, 4).map(s=> s.split(" ")).map(t => Triplet(t(0), t(1), t(2))) // afficher les partitions vit.mapPartitionsWithIndex(entete).collect.foreach(println)
Contenu des partitions :
Contenu de la partition 0 Triplet(Alice,vit,Paris) Contenu de la partition 1 Triplet(Dan,vit,Nice) Triplet(Bob,vit,Lyon) Contenu de la partition 2 Triplet(Alice,vit,Aix) Contenu de la partition 3 Triplet(Claire,vit,Aix) Triplet(Dan,vit,Paris)
Répartition par hachage
Répartition par sujet (i.e. par personne)
val vitParPersonne = vit.toDS().repartition(4, col("sujet")) // afficher les partitions vitParPersonne.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)
Contenu des partitions :
partition 0 : Triplet(Alice,vit,Paris) Triplet(Dan,vit,Nice) Triplet(Alice,vit,Aix) Triplet(Dan,vit,Paris) partition 1 : Triplet(Bob,vit,Lyon) partition 2 : Triplet(Claire,vit,Aix) partition 3 : vide
Répartition par objet (i.e. par ville)
val vitParVille = vit.toDS().repartition(4, col("objet")) // afficher les partitions vitParVille.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)
Contenu des partitions :
partition 0 : Triplet(Dan,vit,Nice) partition 1 : vide partition 2 : Triplet(Bob,vit,Lyon) partition 3 : Triplet(Alice,vit,Paris) Triplet(Alice,vit,Aix) Triplet(Claire,vit,Aix) Triplet(Dan,vit,Paris)
Répartition par prop (ici il n'y a qu'une seule valeur de propriété: “vit”)
val f = vit.toDS().repartition(4, col("prop")) // afficher les partitions f.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)
Contenu des partitions :
tous les triplets dans la partition 2
Répartition sans préciser de clé de partitionnement:
val g = vit.toDS().repartition(4) // afficher les partitions g.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)
Contenu des partitions :
partition 0 : Triplet(Bob,vit,Lyon) Triplet(Dan,vit,Paris) partition 1 : vide partition 2 : vide partition 3 : Triplet(Alice,vit,Paris) Triplet(Dan,vit,Nice) Triplet(Alice,vit,Aix) Triplet(Claire,vit,Aix)
TRI et répartition par intervalle
val t1 = vit.toDS().orderBy("sujet") // afficher les partitions t1.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)
Contenu des partitions :
partition 0 : Triplet(Alice,vit,Paris) Triplet(Alice,vit,Aix) partition 1 : Triplet(Bob,vit,Lyon) partition 2 : Triplet(Claire,vit,Aix) partition 3 : Triplet(Dan,vit,Nice) Triplet(Dan,vit,Paris) partition 4 vide
val t2 = vit.toDS().orderBy("objet") // afficher les partitions t2.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)
Contenu des partitions :
partition 0 : Triplet(Alice,vit,Aix) Triplet(Claire,vit,Aix) partition 1 : Triplet(Bob,vit,Lyon) partition 2 : Triplet(Dan,vit,Nice) partition 3 : Triplet(Alice,vit,Paris) Triplet(Dan,vit,Paris) partition 4: vide
JOINTURE ET PARTITIONNEMENT
Sport: une personne pratique un ou plusieurs sports
val sport1 = List( "Alice fait vélo", "Claire fait surf", "Claire fait vélo", "Alice fait natation", "Claire fait moto", "Dan fait foot" ) val sport2 = spark.sparkContext.parallelize(sport1, 4).map(s=> s.split(" ")).map(t => Triplet(t(0), t(1), t(2))) // afficher les partitions sport2.mapPartitionsWithIndex(entete).collect.foreach(println)
JOINTURE sur des données partitionnées en round robin
//réinitialisation: vider le stockage persistent sc.getPersistentRDDs.values.foreach(x => x.unpersist()) val VIT = vit VIT.persist.count val SPORT = sport2 SPORT.setName("SPORT").persist.count val J1 = VIT.toDS().join(SPORT.toDS(), "sujet") J1.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)
JOINTURE sur des données partitionnées sur le sujet
// réinitialisation: vider le stockage persistent cleanStorage() val VIT = vitParPersonne VIT.persist.count // afficher les partitions vitParPersonne.rdd.mapPartitionsWithIndex(entete).collect.foreach(println) //répartition du Sport par sujet (i.e. par personne) val sportParPersonne = sport2.toDS().repartition(4, col("sujet")) // afficher les partitions sportParPersonne.rdd.mapPartitionsWithIndex(entete).collect.foreach(println) val SPORT = sportParPersonne SPORT.persist.count val J2 = VIT.join(SPORT, "sujet") J2.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)
Le but est d'observer et comprendre l'évaluation d'une requête en fonction du partitionnement des données. Evaluation d'une jointure lorsque les données sont partitionnées sur l'attribut (ou les attributs) d'équi-jointure. Inversement, évaluation d'une jointure lorsque le partitionnement des données diffère des attributs de jointure.
On partitionne les triplets du dataset YAGO de 3 façons différentes: dans l'ordre du initial fichier, par sujet et par objet.
val yagoFile = "/Infos/bd/spark/dataset/yago/yagoMelange1M.json" val yagoInitial = // à compléter ... yagoInitial.persist.count val yagoParSujet = yagoInitial.repartition(8, col("sujet")) yagoParSujet.persist.count val yagoParObjet = yagoInitial.repartition(8, col("objet")) yagoParObjet.persist.count
Afficher un extrait des données pour les sprinter (les personnes dont le sujet contient sprinter) :
val yagoSprinter = yago.where("sujet like '%(sprinter)%'") affichePartitions(yagoSprinter)
Est-ce que tous les triplets au sujet du sprinter Percy_Williams sont dans la même partition ? Combien de partitions faut-il lire pour trouver le genre et les nationalités de Percy_Williams ?
val percy = yago.where("sujet = '<Percy_Williams_(sprinter)>' ") affichePartitions(percy)
Remarque concernant le renommage d'attributs dans une requête de jointure. Lorsque les données sont partitionnées sur l'attribut de jointure, Spark tient compte du partitionnement seulement si l'attribut de jointure a le même nom que l'attribut de partitionnement (mentionné dans la méthode repartition ). Par exemple pour yagoParSujet, la jointure sur le sujet doit se faire sur l'attribut nommé 'sujet'. Si le nom de l'attribut diffère, cela provoque des shuffle inutiles.
Soit la requête R1 : Afficher la résidence et la nationalité des personnes
?x <livesIn> ?y . ?x <isCitizenOf> ?z
a) Vérifier que la requête s'exécute sans aucun transfert lorsque les données sont partitionnées par sujet (yagoParSujet). Combien d'étape (appellée stage) sont nécesaires pour évaluer cette requête ?
val t1 = yagoParSujet.where("prop = '<livesIn>'"). select(col("sujet"), col("objet") as "residence"). val t2 = .... A compléter val R1 = t1.join(t2,"sujet") R1.count() // pour traiter entièrement la requête //R1.show(10)
Puis consulter l'interface de spark: → dernier job, voir les stages et lire le nombre de shuffle read
Réponse
val t1 = yagoParSujet.where("prop = '<livesIn>'"). select(col("sujet"), col("objet") as "residence"). val t2 = yagoParSujet.where("prop = '<isCitizenOf>'"). select(col("sujet"), col("objet") as "nationalite"). val R1 = t1.join(t2,"sujet") R1.count() R1.show(10)
b) Mesurer la taille en MB des données transférées (shuffle read) pour évaluer cette requête sur les données yagoParObjet et yagoInitial.
val t1 = yagoParObjet.where...select... val t2 = yagoParObjet.... val R1 = t1.join(t2,"sujet") R1.count()
Réponse
// PAR OBJET val t1 = yagoParObjet.where("prop = '<livesIn>'"). select(col("sujet"), col("objet") as "residence") val t2 = yagoParObjet.where("prop = '<isCitizenOf>'"). select(col("sujet"), col("objet") as "nationalite") val R1 = t1.join(t2,"sujet") // Le show n'execute pas la requête entièrement : il ne permet pas de voir la quantité totale de shuffle read // R1.show(10) // Le count permet de voir la quantité totale de shuffle read nécesaire pour traiter la requête en entier R1.count
SHUFFLE READ = 2.3 MB
Mêmes questions pour la requête R2
?x <livesIn> ?y . ?y <isLocatedIn> ?z
Soit la requête R3 (en forme de flocon):
?x <influences> ?y . ?y <livesIn> ?v2 . ?x <livesIn> ?v1 . ?y <isCitizenOf> ?v3
Ecrire 3 solutions équivalentes pour traiter cette requête en faisant varier l'ordre des jointures. Exemples d'ordres de jointure :
?x <influences> ?y
et ?x <livesIn> ?v1
y <livesIn> ?v2
et ?y <isCitizenOf> ?v3
Autre ordre possible :
?x <influences> ?y
et y <livesIn> ?v2
?x <livesIn> ?v1
?y <isCitizenOf> ?v3
etc …
Pour les données partitionnées par sujet (yagoParSujet) quelle solution minimise les transferts?
Rmq: faire un R3.count() exécuter la requête entièrement et connaitre la quantité totale de données transférées (shuffle read).
Soit la requête R4 (en forme de triangle):
?x <influences> ?y . ?y <livesIn> ?z . ?x <livesIn> ?z
Ecrire plusieurs solutions équivalentes pour traiter cette requête en faisant varier l'ordre des jointures. Pour les données partitionnées par sujet (yagoParSujet) quelle ordre minimise les transferts?
Cette méthode est généralement plus rapide dans le cas d'une jointure entre une petite collection et une grande collection.
On considère une jointure entre T1 et T2. On suppose que la taille des données de T1 est petite par rapport à T2 (T1 « T2) et peut tenir entièrement en mémoire sur chaque machine qui évalue la jointure.
On suppose que T1 contient des couples (s1,o1), et que chaque s1 est associé à un seul o1. T1 peut être converti en un dictionnaire (c'est à dire une Map s1 → o1) permettant d'obtenir l'attribut o1 asocié à s1.
//convertir T1 en ensemble de paires (clé, valeur) puis en un dictionnaire val dictionnaire = T1.as[(String, String)].collect.toMap // on diffuse (ou broadcast) T1 sur chaque machine. val dicoReplique = sc.broadcast(dictionnaire) // equi jointure entre T2 et T1 sur le sujet (s1=s2) val jointureParBroadcast = T2.map{ case(s2,o2) => (s2, o2, dicoReplique.value(s2)) } jointureParBroadcast.take(10)
Vérifier dans le navigateur que le job de la jointure a une seule étape (stage). En combien de tâches indépendantes est décomposée cette étape ?
Soit la requête R5
?x <isLocatedIn> ?y . ?y <hasCapital> ?z
La grande relation est T2: les triplets <isLocatedIn>
précisent (entre autres) le pays d'une ville.
La petite relation est T1: les triplets <hasCapital>
précisent la capitale d'un pays
val T1 = yagoParSujet.where("prop = '<hasCapital>'"). select(col("sujet") as "y", col("objet") as "z").as[(String, String)] val T2 = ...
La requête affiche les personnes avec leur ville de résidence et le pays associé. Certaines données de t2 ne joignent pas avec t1: si aucun pays n'est associé avec la ville de résidence alors ne rien afficher.
Pour les données partitionnées par sujet, proposer une solution de jointure par boradcast qui est évaluée sans aucun shuffle.
val dicoReplique = sc.broadcast(T1.collect.toMap) val J = T2.flatMap{ case(s2,o2) => { dicoReplique.value.get(o2) match { case Some(o1) => Seq((s2,o2, o1)) case None => Seq() } } } J.take(10)
Généraliser la solution par broadcast pour une equi-jointure où o2 peut être associé à plusieurs o1 (exple un pays associé à plusieurs capitales).
Vérifier que votre solution est correcte en comparant son résultat avec celui de la jointure par broadcast implémentée nativement:
val J = T2.join(T1.hint("broadcast"), "attr") // où attr est l'attribut d'equi-jointure
Reprendre les requêtes R3 et R4 de l'exercice précédent et proposer une nouvelle solution qui peut utiliser les 2 algorithmes de jointures (répartie ou par broadcast) pour réduire la quantité de shuffle read.
Voir diapo du cours : Calculer la similarité entre 2 utilisateurs de MovieLens ou 2 individus de Yago
def jaccard(l1: Iterable[Int], l2: Iterable[Int]): Double = { return ... }
Les méthodes pouvant servir sont toList
(pour convertir un Iterable en List), l'intersection intersect
, et l'union union
de deux listes, toDouble
pour convertir un nombre entier en réel.
Ne pas saisir de tabulation dans le corps d'une fonction.
La fonction de similarité étant symétrique, proposer une solution pour réduire le coût des transferts. Peut-on éviter de diffuser toutes les données sur toutes les machines ?
Définir la fonction simPearson (voir diapo) qui tient compte des notes attribuées aux films.
La methode zipWithIndex (numérotant les éléments d'une collection) existe pour un RDD mais pas pour un Dataset.
Ecrire la fonction numeroration[T](d: Dataset[T])
qui retourne un Dataset avec des éléments numérotés. Les numéros doivent être consécutifs.
Rmq: une solution consiste à utiliser mapPartitionsWithIndex pour connaitre la taille des partitions et parcourir une partition pour affecter les numéros consécutifs à chaque élément.
Pour les données de yago utilisées précédemment, déterminer les chemins partant des sujets qui vivent (<livesIn>) en 'France'. Un chemin doit être sans circuit (ou sans cycle : ne jamais repasser sur le même sujet). Combien y a -t-il de chemin de longueur 2, 3 ,4 ? Quelle est la longueur maximale ? Montrer que votre solution est efficace. Expliquer ce que vous avez mis en oeuvre pour apporter plus d'efficacité.
Lien vers le sujet du TME de 2016: du Ancien TME 2016: Jointure parallèle (on ne demande pas de faire ce sujet)