Bases de Données / Databases

Site Web de l'équipe BD du LIP6 / LIP6 DB Web Site

Outils pour utilisateurs

Outils du site


site:enseignement:master:bdle:tmes:tmejointure

TME Données réparties et jointure parallèle

L'objectif de ce TME est de comprendre les différentes façons de traiter une jointure dans un environnement d'exécution parallèle et réparti.

Préparation

On manipule les données avec l'API Spark en Scala. Dans votre navigateur, ouvrir 2 onglets pour l'API des RDD et celle des DataSet

Quand votre programme spark s'exécutera, vous pourrez ouvrir l'interface Web dans le navigateur local http://localhost:4040

Configurer l'environnement et doc des API

source /Infos/bd/spark-config 
idea.sh &

Dans IDEA : importer le fichier PartitionEtudiant.scala en tant que script scala dans votre projet courant. Exemple si votre projet s'appelle BDLE :

cd
cp  /Infos/bd/spark/PartitionEtudiant.scala IdeaProjects/BDLE/src/main/scala

Classes et Fonctions auxiliaires

case class Triplet(sujet: String, prop: String, objet: String)
 
// Extraire les 10 premiers éléments de chaque partition
// -----------------------------------------------------
def entete(numP: Int, iter: Iterator[Any]): Iterator[String] = {
  val s = new StringBuffer()
  s.append("Contenu de la partition " + numP + " \n")  
  var i = 0
  while(iter.hasNext && i < 10) {
    i+=1
    s.append(iter.next + "\n")
  }
  if(iter.hasNext) s.append( "... \n")
  return List(s.toString()).iterator
}
 
 def affichePartitions[T: ClassTag](d: Dataset[T]): Unit = d.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)
 
 
// Déterminer le nombre d'éléments contenus dans chaque partition
// ---------------------------------------------------------------
def nbObjet(numP: Int, iter: Iterator[Any]): Iterator[(String)] = {
  var nb = 0
  for(t <- iter) { nb += 1 }
  val res = ("La partition " + numP + " a " + nb + " éléments")
  return List(res).iterator
}
 
 
// Supprimer les collections persistant en mémoire
  def clearStorage() = {
    spark.catalog.clearCache()
  }

Données

On utilise les données de YAGO du fichier yagoMelange.json. Consulter un extrait des données :

ls -lh /Infos/bd/spark/dataset/yago/yagoMelange*.json
head -n 20 /Infos/bd/spark/dataset/yago/yagoMelange.json 
head -n 20 /Infos/bd/spark/dataset/yago/yagoMelange1M.json 

Données réparties sur plusieurs machines Pour faciliter le bon déroulement du TME, chaque étudiant utilise une seule machine contenant toutes les données d'exemple. Par la suite on considérera le cas plus général où les fragments sont répartis sur plusieurs machines. Tous les exercices de ce TME s'appliquent de la même façon sur une ou plusieurs machines.

Lire les données :

  val yago = spark.read.json("/Infos/bd/spark/dataset/yago/yagoMelange1M.json").
    select("sujet", "prop", "objet").as[Triplet]
 
  yago.show(20, false)

Exercice 1 : Répartition des données

Exemple 1

// Logement: une personne vit dans une ou plusieurs villes
 
val a = List(
"Alice vit Paris",
"Dan vit Nice",
"Bob vit Lyon",
"Alice vit Aix",
"Claire vit Aix",
"Dan vit Paris"
)

Répartir les données dans 4 partitions :

Répartition par découpage suivant l'ordre des données initiales (ressemble à un round robin)

val vit = spark.sparkContext.parallelize(a, 4).map(s=> s.split(" ")).map(t => Triplet(t(0), t(1), t(2)))
 
// afficher les partitions
vit.mapPartitionsWithIndex(entete).collect.foreach(println)

Contenu des partitions :

Contenu de la partition 0
Triplet(Alice,vit,Paris)
 
Contenu de la partition 1
Triplet(Dan,vit,Nice)
Triplet(Bob,vit,Lyon)
 
Contenu de la partition 2
Triplet(Alice,vit,Aix)
 
Contenu de la partition 3
Triplet(Claire,vit,Aix)
Triplet(Dan,vit,Paris)

Répartition par hachage

Répartition par sujet (i.e. par personne)

val vitParPersonne = vit.toDS().repartition(4, col("sujet"))
 
// afficher les partitions
vitParPersonne.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)

Contenu des partitions :

partition 0 :
Triplet(Alice,vit,Paris)
Triplet(Dan,vit,Nice)
Triplet(Alice,vit,Aix)
Triplet(Dan,vit,Paris)
 
partition 1 :
Triplet(Bob,vit,Lyon)
 
partition 2 :
Triplet(Claire,vit,Aix)
 
partition 3 : vide

Répartition par objet (i.e. par ville)

val vitParVille = vit.toDS().repartition(4, col("objet"))
 
// afficher les partitions
vitParVille.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)

Contenu des partitions :

partition 0 :
Triplet(Dan,vit,Nice)
 
partition 1 : vide
 
partition 2 :
Triplet(Bob,vit,Lyon)
 
partition 3 :
Triplet(Alice,vit,Paris)
Triplet(Alice,vit,Aix)
Triplet(Claire,vit,Aix)
Triplet(Dan,vit,Paris)

Répartition par prop (ici il n'y a qu'une seule valeur de propriété: “vit”)

val f = vit.toDS().repartition(4, col("prop"))
 
// afficher les partitions
f.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)

Contenu des partitions :

tous les triplets dans la partition 2

Répartition sans préciser de clé de partitionnement:

val g = vit.toDS().repartition(4)
 
// afficher les partitions
g.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)

Contenu des partitions :

partition 0 :
Triplet(Bob,vit,Lyon)
Triplet(Dan,vit,Paris)
 
partition 1 : vide
 
partition 2 : vide
 
partition 3 :
Triplet(Alice,vit,Paris)
Triplet(Dan,vit,Nice)
Triplet(Alice,vit,Aix)
Triplet(Claire,vit,Aix)

TRI et répartition par intervalle

val t1 = vit.toDS().orderBy("sujet")
 
// afficher les partitions
t1.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)

Contenu des partitions :

partition 0 :
Triplet(Alice,vit,Paris)
Triplet(Alice,vit,Aix)
 
partition 1 : 
Triplet(Bob,vit,Lyon)
 
partition 2 :
Triplet(Claire,vit,Aix)
 
partition 3 :
Triplet(Dan,vit,Nice)
Triplet(Dan,vit,Paris)
 
partition 4 vide
val t2 = vit.toDS().orderBy("objet")
 
// afficher les partitions
t2.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)

Contenu des partitions :

partition 0 :
Triplet(Alice,vit,Aix)
Triplet(Claire,vit,Aix)
 
partition 1 :
Triplet(Bob,vit,Lyon)
 
partition 2 :
Triplet(Dan,vit,Nice)
 
partition 3 :
Triplet(Alice,vit,Paris)
Triplet(Dan,vit,Paris)
 
partition 4: vide

Exemple 2

JOINTURE ET PARTITIONNEMENT

Sport: une personne pratique un ou plusieurs sports

val sport1 = List(
"Alice fait vélo",
"Claire fait surf",
"Claire fait vélo",
"Alice fait natation",
"Claire fait moto",
"Dan fait foot"
)
 
val sport2 = spark.sparkContext.parallelize(sport1, 4).map(s=> s.split(" ")).map(t => Triplet(t(0), t(1), t(2)))
 
// afficher les partitions
sport2.mapPartitionsWithIndex(entete).collect.foreach(println)

JOINTURE sur des données partitionnées en round robin

//réinitialisation: vider le stockage persistent
sc.getPersistentRDDs.values.foreach(x => x.unpersist())
 
val VIT = vit
VIT.persist.count
 
val SPORT = sport2
SPORT.setName("SPORT").persist.count
 
val J1 = VIT.toDS().join(SPORT.toDS(), "sujet")
 
J1.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)

JOINTURE sur des données partitionnées sur le sujet

// réinitialisation: vider le stockage persistent
cleanStorage()
 
val VIT = vitParPersonne
VIT.persist.count
// afficher les partitions
vitParPersonne.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)
 
//répartition du Sport par sujet (i.e. par personne)
val sportParPersonne = sport2.toDS().repartition(4, col("sujet"))
 
// afficher les partitions
sportParPersonne.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)
 
val SPORT = sportParPersonne
SPORT.persist.count
 
val J2 = VIT.join(SPORT, "sujet")
 
J2.rdd.mapPartitionsWithIndex(entete).collect.foreach(println)

Exercice 2 : Equi-jointure parallèle et répartie

Le but est d'observer et comprendre l'évaluation d'une requête en fonction du partitionnement des données. Evaluation d'une jointure lorsque les données sont partitionnées sur l'attribut (ou les attributs) d'équi-jointure. Inversement, évaluation d'une jointure lorsque le partitionnement des données diffère des attributs de jointure.

Partitionner les données YAGO

On partitionne les triplets du dataset YAGO de 3 façons différentes: dans l'ordre du initial fichier, par sujet et par objet.

val yagoFile = "/Infos/bd/spark/dataset/yago/yagoMelange1M.json"
val yagoInitial = // à compléter ...
yagoInitial.persist.count
 
val yagoParSujet =  yagoInitial.repartition(8, col("sujet"))
yagoParSujet.persist.count
 
val yagoParObjet =  yagoInitial.repartition(8, col("objet"))
yagoParObjet.persist.count

Afficher un extrait des données pour les sprinter (les personnes dont le sujet contient sprinter) :

  val yagoSprinter = yago.where("sujet like '%(sprinter)%'")
  affichePartitions(yagoSprinter)

Est-ce que tous les triplets au sujet du sprinter Percy_Williams sont dans la même partition ? Combien de partitions faut-il lire pour trouver le genre et les nationalités de Percy_Williams ?

  val percy = yago.where("sujet = '<Percy_Williams_(sprinter)>' ")
  affichePartitions(percy)

Remarque concernant le renommage d'attributs dans une requête de jointure. Lorsque les données sont partitionnées sur l'attribut de jointure, Spark tient compte du partitionnement seulement si l'attribut de jointure a le même nom que l'attribut de partitionnement (mentionné dans la méthode repartition ). Par exemple pour yagoParSujet, la jointure sur le sujet doit se faire sur l'attribut nommé 'sujet'. Si le nom de l'attribut diffère, cela provoque des shuffle inutiles.

Requête R1

Soit la requête R1 : Afficher la résidence et la nationalité des personnes

?x <livesIn> ?y .
?x <isCitizenOf> ?z

a) Vérifier que la requête s'exécute sans aucun transfert lorsque les données sont partitionnées par sujet (yagoParSujet). Combien d'étape (appellée stage) sont nécesaires pour évaluer cette requête ?

val t1 = yagoParSujet.where("prop = '<livesIn>'").
  select(col("sujet"), col("objet") as "residence").
 
val t2 = .... A compléter
 
val R1 = t1.join(t2,"sujet")
R1.count() // pour traiter entièrement la requête
//R1.show(10)

Puis consulter l'interface de spark: → dernier job, voir les stages et lire le nombre de shuffle read

Réponse

val t1 = yagoParSujet.where("prop = '<livesIn>'").
  select(col("sujet"), col("objet") as "residence").
 
val t2 = yagoParSujet.where("prop = '<isCitizenOf>'").
  select(col("sujet"), col("objet") as "nationalite").
 
val R1 = t1.join(t2,"sujet")
R1.count()
R1.show(10)

b) Mesurer la taille en MB des données transférées (shuffle read) pour évaluer cette requête sur les données yagoParObjet et yagoInitial.

val t1 = yagoParObjet.where...select...
 
val t2 = yagoParObjet....
 
val R1 = t1.join(t2,"sujet")
R1.count()

Réponse

// PAR OBJET
 
val t1 = yagoParObjet.where("prop = '<livesIn>'").
  select(col("sujet"), col("objet") as "residence")
val t2 = yagoParObjet.where("prop = '<isCitizenOf>'").
  select(col("sujet"), col("objet") as "nationalite")
val R1 = t1.join(t2,"sujet")
// Le show n'execute pas la requête entièrement : il ne permet pas de voir la quantité totale de shuffle read 
// R1.show(10)
// Le count permet de voir la quantité totale de shuffle read nécesaire pour traiter la requête en entier
R1.count

SHUFFLE READ = 2.3 MB

Requête R2

Mêmes questions pour la requête R2

?x <livesIn> ?y . ?y <isLocatedIn> ?z

Requête R3

Soit la requête R3 (en forme de flocon):

?x <influences> ?y  . ?y <livesIn> ?v2 .
?x <livesIn> ?v1    . ?y <isCitizenOf> ?v3

Ecrire 3 solutions équivalentes pour traiter cette requête en faisant varier l'ordre des jointures. Exemples d'ordres de jointure :

  • T1 : jointutre entre ?x <influences> ?y et ?x <livesIn> ?v1
  • T2: jointutre entre y <livesIn> ?v2 et ?y <isCitizenOf> ?v3
  • puis jointure entre T1 et T2

Autre ordre possible :

  • Jointure entre ?x <influences> ?y et y <livesIn> ?v2
  • puis jointure avec ?x <livesIn> ?v1
  • puis jointure avec ?y <isCitizenOf> ?v3

etc …

Pour les données partitionnées par sujet (yagoParSujet) quelle solution minimise les transferts?

Rmq: faire un R3.count() exécuter la requête entièrement et connaitre la quantité totale de données transférées (shuffle read).

Requête R4

Soit la requête R4 (en forme de triangle):

?x <influences> ?y . ?y <livesIn> ?z .
?x <livesIn> ?z

Ecrire plusieurs solutions équivalentes pour traiter cette requête en faisant varier l'ordre des jointures. Pour les données partitionnées par sujet (yagoParSujet) quelle ordre minimise les transferts?

Exercice 3 : Equi-jointure par broadcast join

Cette méthode est généralement plus rapide dans le cas d'une jointure entre une petite collection et une grande collection.

On considère une jointure entre T1 et T2. On suppose que la taille des données de T1 est petite par rapport à T2 (T1 « T2) et peut tenir entièrement en mémoire sur chaque machine qui évalue la jointure.

Equi-jointure sur clé

On suppose que T1 contient des couples (s1,o1), et que chaque s1 est associé à un seul o1. T1 peut être converti en un dictionnaire (c'est à dire une Map s1 → o1) permettant d'obtenir l'attribut o1 asocié à s1.

//convertir T1 en ensemble de paires (clé, valeur) puis en un dictionnaire
val dictionnaire = T1.as[(String, String)].collect.toMap
 
// on diffuse (ou broadcast) T1 sur chaque machine. 
val dicoReplique = sc.broadcast(dictionnaire)
 
// equi jointure entre T2 et T1 sur le sujet (s1=s2)
val jointureParBroadcast = T2.map{ case(s2,o2) => (s2, o2, dicoReplique.value(s2)) }
 
jointureParBroadcast.take(10)

Vérifier dans le navigateur que le job de la jointure a une seule étape (stage). En combien de tâches indépendantes est décomposée cette étape ?

Equi-jointure lorsque les attributs n'ont pas le même domaine

Soit la requête R5

?x <isLocatedIn> ?y . ?y <hasCapital> ?z 

La grande relation est T2: les triplets <isLocatedIn> précisent (entre autres) le pays d'une ville.

La petite relation est T1: les triplets <hasCapital> précisent la capitale d'un pays

val T1 = yagoParSujet.where("prop = '<hasCapital>'").
  select(col("sujet") as "y", col("objet") as "z").as[(String, String)]
 
val T2 = ...

La requête affiche les personnes avec leur ville de résidence et le pays associé. Certaines données de t2 ne joignent pas avec t1: si aucun pays n'est associé avec la ville de résidence alors ne rien afficher.

Pour les données partitionnées par sujet, proposer une solution de jointure par boradcast qui est évaluée sans aucun shuffle.

val dicoReplique = sc.broadcast(T1.collect.toMap)
val J = T2.flatMap{ case(s2,o2) => {
  dicoReplique.value.get(o2) match {
    case Some(o1) => Seq((s2,o2, o1))
    case None => Seq()
  }
}
}
 
J.take(10)

Généraliser la solution par broadcast pour une equi-jointure où o2 peut être associé à plusieurs o1 (exple un pays associé à plusieurs capitales).

Vérifier que votre solution est correcte en comparant son résultat avec celui de la jointure par broadcast implémentée nativement:

val J = T2.join(T1.hint("broadcast"), "attr") // où attr est l'attribut d'equi-jointure

Reprendre les requêtes R3 et R4 de l'exercice précédent et proposer une nouvelle solution qui peut utiliser les 2 algorithmes de jointures (répartie ou par broadcast) pour réduire la quantité de shuffle read.

Exercice 4 : Produit Cartésien

Voir diapo du cours : Calculer la similarité entre 2 utilisateurs de MovieLens ou 2 individus de Yago

  • Définir une fonction qui calcule la similarité de Jaccard entre 2 listes d'objets : jaccard(x, y) = cardinal de l'intersection / cardinal de l'union
def jaccard(l1: Iterable[Int], l2: Iterable[Int]): Double = {
return ...
}

Les méthodes pouvant servir sont toList (pour convertir un Iterable en List), l'intersection intersect, et l'union union de deux listes, toDouble pour convertir un nombre entier en réel. Ne pas saisir de tabulation dans le corps d'une fonction.

  • Effectuer le produit cartesien entre les utilisateurs et eux mêmes, puis invoquer la fonction jaccard sur chaque paire produite.

La fonction de similarité étant symétrique, proposer une solution pour réduire le coût des transferts. Peut-on éviter de diffuser toutes les données sur toutes les machines ?

Définir la fonction simPearson (voir diapo) qui tient compte des notes attribuées aux films.

Exercice 5 : Traitement itératif par partition et par groupe d'objets

La methode zipWithIndex (numérotant les éléments d'une collection) existe pour un RDD mais pas pour un Dataset. Ecrire la fonction numeroration[T](d: Dataset[T]) qui retourne un Dataset avec des éléments numérotés. Les numéros doivent être consécutifs. Rmq: une solution consiste à utiliser mapPartitionsWithIndex pour connaitre la taille des partitions et parcourir une partition pour affecter les numéros consécutifs à chaque élément.

Exercice 6 : Chemin le plus long

Pour les données de yago utilisées précédemment, déterminer les chemins partant des sujets qui vivent (<livesIn>) en 'France'. Un chemin doit être sans circuit (ou sans cycle : ne jamais repasser sur le même sujet). Combien y a -t-il de chemin de longueur 2, 3 ,4 ? Quelle est la longueur maximale ? Montrer que votre solution est efficace. Expliquer ce que vous avez mis en oeuvre pour apporter plus d'efficacité.

Divers

Lien vers le sujet du TME de 2016: du Ancien TME 2016: Jointure parallèle (on ne demande pas de faire ce sujet)

site/enseignement/master/bdle/tmes/tmejointure.txt · Dernière modification: 14/12/2018 16:18 par hubert