L'objectif de ce TME est de comprendre les différentes façons de traiter une jointure dans un environnement d'exécution parallèle et réparti.
Pour 2017: Aller vers le TME Données réparties et jointure parallèle
Editer le fichier de TME
cd <répertoire de travail> # aller dans votre répertoire de tme cp /Infos/bd/spark/tme-dataset-etudiant.scala . emacs tme-dataset-etudiant.scala &
source /Infos/bd/spark-config
Vérifier l'alias pour le spark-shell
alias spark-shell
Cela doit afficher : alias spark-shell='spark-shell –driver-memory 10G –properties-file /Infos/bd/spark/prop.txt' Puis lancer le spark-shell
spark-shell
seulement si l'étape précédente n'a pas été faite, lancer spark-shell en indiquant le fichier contenant la configuration des propriétés concernant ce TME.
spark-shell --driver-memory 8G --properties-file /Infos/bd/spark/prop.txt
Puis quand l'invite scala> est prête :
sc.setLogLevel("ERROR")
Vérifier les valeurs des paramètres configurés pour ce TME:
sc.getConf.getOption("spark.sql.shuffle.partitions")
Cela signifie que les données seront réparties sur 8 partitions lors d'un shuffle.
sc.getConf.getOption("spark.sql.autoBroadcastJoinThreshold")
La valeur retournée -1
signifie que toutes les jointures se font par hachage (aucune jointure par broadcast).
Ouvrir l'interface Web dans le navigateur local http://localhost:4040
On manipule les données avec l'API Spark en Scala. Dans votre navigateur, ouvrir 2 onglets pour l'API des RDD et celle des DataSet
En fin de TME, pour quitter Spark, saisir
:q
On utilise les données de YAGO (voir Jeux de données) que vous avez déjà utilisées dans le [TME II-2] Algèbre Spark: Dataset. Rappel :
cd /tmp/BDLE/dataset # ce dossier aura été préalablement créé. tar zxvf /Infos/bd/spark/dataset/yago/yagoFacts5M.tgz cd -
Données réparties sur plusieurs machines Pour faciliter le bon déroulement du TME, chaque étudiant utilise une seule machine contenant toutes les données d'exemple. Par la suite on considérera le cas plus général où les fragments sont répartis sur plusieurs machines. Tous les exercices de ce TME s'appliquent de la même façon sur une ou plusieurs machines.
Pour observer plus facilement le traitement parallèle d'une jointure entre deux RDD, on fixe le degré de parallélisme de la jointure à 5.
val j = USERSRDD.join(RATINGSRDD, 5) j.count
La jointure (appelée job) est traitée en trois étapes (appelées stage) :
Etape 1 : Grouper les USERSRDD selon l'attribut de jointure numU pour former 5 plages de users.
Etape 2 : Grouper les RATINGSRDD selon l'attribut de jointure numU pour former 5 plages de ratings.
Les étapes 1 et 2 sont effectuées pour chaque fragment de USERSRDD et RATINGSRDD. Une tâche traite un fragment. Il y a donc 10 tâches pour l'étape 1 et 20 tâches pour l'étape 2. Le résultat des étapes 1 et 2 est appelé shuffle write. Consulter l'interface web : quelle est la taille totale du shuffle write ? Quelle est la taille moyenne du shuffle write produit par une tâche ? Quelle est la durée moyenne d'une tâche ?
Etape 3 : Jointure entre le résultat de l'étape 1 et celui de l'étape 2. La jointure est calculée indépendamment pour chacune des 5 plages. Cela fait donc 5 tâches de jointure indépendantes qui peuvent être traitées en parallèle. Par exemple, si l'utilisateur 'Alice' est dans la première plage de USERSRDD, alors les notes d'Alice sont dans la première plage de RATINGSRDD.
L'accès au contenu d'une plage pour calculer la jointure s'appelle shuffle read. Le terme shuffle correspond au transfert nécessaire pour rassembler le contenu d'une plage sur une machine. Consulter l'interface web : vérifier que la taille du shuffle read est égale à la somme des shuffle write? Quelle est la durée moyenne d'une tâche de jointure ?
Lorsque les données sont déjà fragmentées selon l'attribut de jointure, on peut traiter la jointure directement, sans aucun transfert de données.
Traiter cet exercice avec des données au format DataSet. Le nombre de partitions pour un dataset est fixé à 200 par défaut.
Répartir les données selon la clé numU. On crée 5 partitions pour users et autant pour ratings.
import org.apache.spark.HashPartitioner val U = USERSRDD.partitionBy(new HashPartitioner(5)) U.setName("Users").persist() U.count val R = RATINGSRDD.partitionBy(new HashPartitioner(5)) R.setName("Ratings").persist() R.count
Vérifier que les données sont bien partitionnées : dans l'onglet Storage, cliquer sur la RDD nommée Ratings, et voir ses 5 fragments.
Puisque les données sont bien partitionnées, l'accès aux notes d'un utilisateur précis (par exemple l'utilisateur numéro 1234) nécessite de lire un seul fragment.
R.lookup(1234).take(10)
Vérifier dans le navigateur que le job lookup, a une seule étape (stage) lookup contenant une seule tâche qui lit un seul fragment.
A titre de comparaison, l'accès aux notes de l'utilisateur 1234 contenues dans RATINGSRDD (fragmenté sans critère spécifique) nécessite de lire tous les fragments
RATINGSRDD.lookup(1234).take(10)
Combien de tâches sont nécessaires pour ce lookup ?
Calculer la jointure, sans préciser le nombre de tâches à traiter en parallèle
val j2 = R.join(U) j2.count
Pourquoi est-ce qu'une seule étape (stage) suffit pour calculer cette jointure ? Combien de tâches y a-t-il pour l'étape de jointure ? Vérifier qu'il n'y a aucun shuffle (ni shuffle write ni shuffle read).
Cette méthode est généralement plus rapide dans le cas d'une jointure entre une 'grande' collection et une 'petite' collection.
Dans notre exemple, la taille des données de USERS est petite et peut tenir en mémoire sur chaque machine chargée de traiter la jointure.
On suppose que pour chaque note dans Ratings il existe exactement un utilisateur dans Users. C'est le cas simple d'une jointure entre une clé étrangère et une clé primaire. Par exemple, afficher les paires de films (F1,F2) pour lesquelles un utilisateur a mis la note 1 à F1 et 5 à F2.
// on diffuse les USERS sur chaque machine. val UsersDiffusion = sc.broadcast(USERSRDD.collect.toMap) // N représente un triplet (movie,rating,date) val jointureParDiffusion = RATINGSRDD.map{ case( user, N) => (user, N, UsersDiffusion.value(user)) } jointureParDiffusion.count
Vérifier dans le navigateur que le job de la jointure a une seule étape (stage). En combien de tâches indépendante est décomposée cette étape ? Contrairement à la jointure avec l'opérateur join() étudiée ci-dessus, on ne fixe pas le degré de parallélisme, pourquoi ?
On considère le sous ensemble des utilisateurs ayant 18 ans :
val U18 = USERSRDD.filter{ case(numU, (genre, age, profession, ville)) => age == 18}
Certaines notes dans RATINGS ne joignent pas avec U18. Proposer une solution de jointure par diffusion qui tienne compte de ce cas :
val U18Diffusion = sc.broadcast(U18.collect.toMap) val j18 = RATINGSRDD.flatMap{ case(numU, note) => { U18Diffusion.value.get(numU) match { case Some(user) => Seq((numU,note,user)) case None => Seq() } } } j18.count
Proposer une solution encore plus générale capable de calculer la jointure entre 2 attributs non uniques (par exemple 2 clés étrangères). Une valeur de l'attribut de jointure peut correspondre à 0 ou plusieurs éléments dans chacune des 2 collections à joindre.
Lire l'article qui présente la structure de donnés appelée DataFrame. Spark SQL: Relational Data Processing in Spark, publié à SIGMOD 2015,
Plus généralement, on considère la structure de données appellée Dataset représentant une collection typée d'objets. Tous les objets dans un Dataset[T] sont de type T. Un DataFrame est simplement un DataSet contenant des nuplets de type Row.
A partir des exemples illustrant les exercices précédents, étudier les possibilités de jointure avec l'API DataFrame. Existe-il plusieurs façon de calculer une jointure entre deux DataSets? Mettre en évidence le cas d'une jointure écrite en SQL et pour laquelle l'optimiseur décide d'exécuter une jointure par diffusion.
Voir diapo 33: Calculer la similarité entre 2 utilisateurs de MovieLens.
def sim(l1: Iterable[Int], l2: Iterable[Int]): Double = { return ... }
Les méthodes pouvant servir sont toList
(pour convertir un Iterable en List), l'intersection intersect
, et l'union union
de deux listes, toDouble
pour convertir un nombre entier en réel.
Ne pas mettre de tabulation dans le corps de la fonction sim.
Définir la fonction simPearson (voir diapo) qui tient compte des notes attribuées aux films.
La fonction de similarité étant symétrique, proposer une solution pour réduire le coût des transferts. Peut-on éviter de diffuser toutes les données sur toutes les machines ?