Bases de Données / Databases

Site Web de l'équipe BD du LIP6 / LIP6 DB Web Site

Outils pour utilisateurs

Outils du site


site:enseignement:master:bdle:tmes:tmejointure2016

Ancien TME 2016: Jointure parallèle

L'objectif de ce TME est de comprendre les différentes façons de traiter une jointure dans un environnement d'exécution parallèle et réparti.

Pour 2017: Aller vers le TME Données réparties et jointure parallèle

Préparation

Editer le fichier de TME

cd <répertoire de travail> # aller dans votre répertoire de tme
cp  /Infos/bd/spark/tme-dataset-etudiant.scala .
emacs tme-dataset-etudiant.scala &

Configurer l'environnement

source /Infos/bd/spark-config 

Vérifier l'alias pour le spark-shell

alias spark-shell

Cela doit afficher : alias spark-shell='spark-shell –driver-memory 10G –properties-file /Infos/bd/spark/prop.txt' Puis lancer le spark-shell

spark-shell

seulement si l'étape précédente n'a pas été faite, lancer spark-shell en indiquant le fichier contenant la configuration des propriétés concernant ce TME.

spark-shell --driver-memory 8G --properties-file /Infos/bd/spark/prop.txt

Puis quand l'invite scala> est prête :

sc.setLogLevel("ERROR")

Vérifier les valeurs des paramètres configurés pour ce TME:

sc.getConf.getOption("spark.sql.shuffle.partitions")

Cela signifie que les données seront réparties sur 8 partitions lors d'un shuffle.

sc.getConf.getOption("spark.sql.autoBroadcastJoinThreshold")

La valeur retournée -1 signifie que toutes les jointures se font par hachage (aucune jointure par broadcast).

Ouvrir l'interface Web dans le navigateur local http://localhost:4040

On manipule les données avec l'API Spark en Scala. Dans votre navigateur, ouvrir 2 onglets pour l'API des RDD et celle des DataSet

En fin de TME, pour quitter Spark, saisir

:q

Données

On utilise les données de YAGO (voir Jeux de données) que vous avez déjà utilisées dans le [TME II-2] Algèbre Spark: Dataset. Rappel :

cd /tmp/BDLE/dataset  # ce dossier aura été préalablement créé.
tar zxvf /Infos/bd/spark/dataset/yago/yagoFacts5M.tgz
cd -

Données réparties sur plusieurs machines Pour faciliter le bon déroulement du TME, chaque étudiant utilise une seule machine contenant toutes les données d'exemple. Par la suite on considérera le cas plus général où les fragments sont répartis sur plusieurs machines. Tous les exercices de ce TME s'appliquent de la même façon sur une ou plusieurs machines.

Exercice 1 : Equi-jointure parallèle et répartie

2015: jointure de RDD

Pour observer plus facilement le traitement parallèle d'une jointure entre deux RDD, on fixe le degré de parallélisme de la jointure à 5.

val j = USERSRDD.join(RATINGSRDD,  5)
j.count

La jointure (appelée job) est traitée en trois étapes (appelées stage) :

Etape 1 : Grouper les USERSRDD selon l'attribut de jointure numU pour former 5 plages de users.

Etape 2 : Grouper les RATINGSRDD selon l'attribut de jointure numU pour former 5 plages de ratings.

Les étapes 1 et 2 sont effectuées pour chaque fragment de USERSRDD et RATINGSRDD. Une tâche traite un fragment. Il y a donc 10 tâches pour l'étape 1 et 20 tâches pour l'étape 2. Le résultat des étapes 1 et 2 est appelé shuffle write. Consulter l'interface web : quelle est la taille totale du shuffle write ? Quelle est la taille moyenne du shuffle write produit par une tâche ? Quelle est la durée moyenne d'une tâche ?

Etape 3 : Jointure entre le résultat de l'étape 1 et celui de l'étape 2. La jointure est calculée indépendamment pour chacune des 5 plages. Cela fait donc 5 tâches de jointure indépendantes qui peuvent être traitées en parallèle. Par exemple, si l'utilisateur 'Alice' est dans la première plage de USERSRDD, alors les notes d'Alice sont dans la première plage de RATINGSRDD.

L'accès au contenu d'une plage pour calculer la jointure s'appelle shuffle read. Le terme shuffle correspond au transfert nécessaire pour rassembler le contenu d'une plage sur une machine. Consulter l'interface web : vérifier que la taille du shuffle read est égale à la somme des shuffle write? Quelle est la durée moyenne d'une tâche de jointure ?

Exercice 2 : Equi-jointure parallèle entre des données déjà fragmentées

Lorsque les données sont déjà fragmentées selon l'attribut de jointure, on peut traiter la jointure directement, sans aucun transfert de données.

2016

Traiter cet exercice avec des données au format DataSet. Le nombre de partitions pour un dataset est fixé à 200 par défaut.

2015

Répartir les données selon la clé numU. On crée 5 partitions pour users et autant pour ratings.

import org.apache.spark.HashPartitioner
val U = USERSRDD.partitionBy(new HashPartitioner(5))
U.setName("Users").persist()
U.count
 
val R = RATINGSRDD.partitionBy(new HashPartitioner(5))
R.setName("Ratings").persist()
R.count

Vérifier que les données sont bien partitionnées : dans l'onglet Storage, cliquer sur la RDD nommée Ratings, et voir ses 5 fragments.

Puisque les données sont bien partitionnées, l'accès aux notes d'un utilisateur précis (par exemple l'utilisateur numéro 1234) nécessite de lire un seul fragment.

R.lookup(1234).take(10)

Vérifier dans le navigateur que le job lookup, a une seule étape (stage) lookup contenant une seule tâche qui lit un seul fragment.

A titre de comparaison, l'accès aux notes de l'utilisateur 1234 contenues dans RATINGSRDD (fragmenté sans critère spécifique) nécessite de lire tous les fragments

RATINGSRDD.lookup(1234).take(10)

Combien de tâches sont nécessaires pour ce lookup ?

Jointure

Calculer la jointure, sans préciser le nombre de tâches à traiter en parallèle

val j2 = R.join(U)
j2.count

Pourquoi est-ce qu'une seule étape (stage) suffit pour calculer cette jointure ? Combien de tâches y a-t-il pour l'étape de jointure ? Vérifier qu'il n'y a aucun shuffle (ni shuffle write ni shuffle read).

plan d'execution d'une jointure

Exercice 3 : Equi-jointure par broadcast join

Cette méthode est généralement plus rapide dans le cas d'une jointure entre une 'grande' collection et une 'petite' collection.

Dans notre exemple, la taille des données de USERS est petite et peut tenir en mémoire sur chaque machine chargée de traiter la jointure.

Equi-jointure sur clé

On suppose que pour chaque note dans Ratings il existe exactement un utilisateur dans Users. C'est le cas simple d'une jointure entre une clé étrangère et une clé primaire. Par exemple, afficher les paires de films (F1,F2) pour lesquelles un utilisateur a mis la note 1 à F1 et 5 à F2.

// on diffuse les USERS sur chaque machine. 
val UsersDiffusion = sc.broadcast(USERSRDD.collect.toMap)
 
// N représente un triplet (movie,rating,date)
val jointureParDiffusion = RATINGSRDD.map{ case( user, N) => (user, N, UsersDiffusion.value(user)) }
 
jointureParDiffusion.count

Vérifier dans le navigateur que le job de la jointure a une seule étape (stage). En combien de tâches indépendante est décomposée cette étape ? Contrairement à la jointure avec l'opérateur join() étudiée ci-dessus, on ne fixe pas le degré de parallélisme, pourquoi ?

Equi-jointure lorsque les attribut n'ont pas le même domaine

On considère le sous ensemble des utilisateurs ayant 18 ans :

val U18 = USERSRDD.filter{ case(numU, (genre, age, profession, ville)) => age == 18}

Certaines notes dans RATINGS ne joignent pas avec U18. Proposer une solution de jointure par diffusion qui tienne compte de ce cas :

val U18Diffusion = sc.broadcast(U18.collect.toMap)
val j18 = RATINGSRDD.flatMap{ case(numU, note) => {
  U18Diffusion.value.get(numU) match {
    case Some(user) => Seq((numU,note,user))
    case None => Seq()
  }
}
}
 
j18.count

Equi-jointure entre des clés étrangères

Proposer une solution encore plus générale capable de calculer la jointure entre 2 attributs non uniques (par exemple 2 clés étrangères). Une valeur de l'attribut de jointure peut correspondre à 0 ou plusieurs éléments dans chacune des 2 collections à joindre.

Exercice 4 : Jointure entre deux Datasets

Lire l'article qui présente la structure de donnés appelée DataFrame. Spark SQL: Relational Data Processing in Spark, publié à SIGMOD 2015,

Plus généralement, on considère la structure de données appellée Dataset représentant une collection typée d'objets. Tous les objets dans un Dataset[T] sont de type T. Un DataFrame est simplement un DataSet contenant des nuplets de type Row.

A partir des exemples illustrant les exercices précédents, étudier les possibilités de jointure avec l'API DataFrame. Existe-il plusieurs façon de calculer une jointure entre deux DataSets? Mettre en évidence le cas d'une jointure écrite en SQL et pour laquelle l'optimiseur décide d'exécuter une jointure par diffusion.

Exercice 5 : Produit Cartesien

Voir diapo 33: Calculer la similarité entre 2 utilisateurs de MovieLens.

  • Définir une fonction qui calcule la similarité entre 2 listes de films : sim(x, y) = cardinal de l'intersection / cardinal de l'union
def sim(l1: Iterable[Int], l2: Iterable[Int]): Double = {
return ...
}

Les méthodes pouvant servir sont toList (pour convertir un Iterable en List), l'intersection intersect, et l'union union de deux listes, toDouble pour convertir un nombre entier en réel. Ne pas mettre de tabulation dans le corps de la fonction sim.

  • Effectuer le produit cartesien entre les utilisateurs et eux mêmes, puis invoquer la méthode sim sur chaque paire produite

Définir la fonction simPearson (voir diapo) qui tient compte des notes attribuées aux films.

La fonction de similarité étant symétrique, proposer une solution pour réduire le coût des transferts. Peut-on éviter de diffuser toutes les données sur toutes les machines ?

site/enseignement/master/bdle/tmes/tmejointure2016.txt · Dernière modification: 18/12/2017 16:02 par hubert