Table des matières

[TME II-1] Introduction à Spark (Algèbre RDD)

Remarque générale : Le cours ne peut être self-contained –> consulter la documentation en ligne de Spark.

Pour l'aide sur l'utilisation de Spark voir ici

Exercice 1

Copier le fichier

 /Infos/bd/spark/bdle/2015/data/wordcount.txt.bz2

sur votre espace personnel. Lancer le spark-shell en mode local (voir Doc) en suivant les instructions fournies puis charger le fichier

 wordcount.txt 

au moyen de la méthode textFile() invoquée à partir de la variable context comme suit :

 val data = sc.textFile("<le_chemin_dans_votre_espace_perso>/wordcount.txt")

Manipulation de RDDs sous forme de paires clé-valeur

  1. Structurer le contenu de data de sorte à obtenir une RDD de tableaux de chaînes de caractères. Cette RDD devra être stockée dans une nouvelle variable nommée list.
  2. Afficher ensuite les 100 premiers éléments de la 3e colonne de list.
  3. Transformer le contenu de list en une liste de paires (‘mot’, nb) où mot correspond à la première colonne de list et nb sa troisième colonne.
  4. Grouper les paires par ‘mot’ et additionner leur nombre nb.
  5. Reprendre les questions 3 et 4 en calculant ‘mot’ différemment : désormais, ‘mot’ doit correspondre au préfixe du premier sous-élément de chaque élément de list, çad, pour en.d, mot doit être en, pour fr.d, mot doit être fr, etc. Comparer les résultats avec ceux obtenus précédemment.

Remarque pour partitionner une chaîne de caractères en utilisant le point (.) comme délimiteur à l'aide de la méthode split(), il faut protéger le point avec \, i.e split(“\\.”)

Réponse

//1.	Structurer le contenu de data de sorte à obtenir un tableau de tableaux de chaines de caractères. Ce dernier devra être stocké  dans une nouvelle variable nommée list.
 
val list = data.map(_.split(" "))
 
 
//2.	Afficher ensuite les 100 premiers éléments de la 3e colonne de list.
 
val q12 = list.map(x=>x(2))
 
//3.	Transformer le contenu de list en une liste de paires (‘mot’, nb) où mot correspond à la première colonne de list et nb sa troisième colonne. 
 
val q13 = list.map(x=>(x(0),x(2).toInt))
 
//4.	Grouper les paires par ‘mot’ et additionner leur nombre nb.
 
val q14 = q13.reduceByKey((x,y)=>x+y) 
//ou bien q13.reduceByKey(_+_)
 
//5.	Reprendre les questions 3 et 4 en calculant ‘mot’ différemment : désormais, ‘mot’ doit correspondre au préfixe du premier sous-élément de chaque élément de list, çad, pour en.d, mot doit être en, pour fr.d, mot doit être fr, etc.
 
val q13bis = list.map(x=>(x(0),x(2).toInt)).map(x=>(x._1.split("\\.")(0), x._2))

Exercice 2

Cet exercice s’intéresse à la formulation de jointures simples en Scala. Pour cet exercice, utiliser le jeux de données films (cf Jeux de données) qui contient les fichiers :

Comme d’habitude, commencez par formater les données afin de pouvoir les traiter.

Pour information :

Pour plus d'informations, voir movielens.

Préparation des données

Charger et Transformer en tableau de String chaque ligne des fichiers ratings.dat, users.dat et movies.dat dans les variables notes, utilis, et films respectivement en effectuant les conversions de type nécessaires tel que indiqué dans le schéma suivant :

Interrogation des données

Exprimer en Scala les requêtes suivantes

  1. le nombre de notes (ratings) réalisées par chaque utilisateur identifié par son UserID
  2. le nombre de notes (ratings) réalisées par chaque localisation donnée par le Zip-code
  3. le nombre de notes (ratings) réalisées par chaque groupe de genres de film. (voir Remarque ci-dessous)
  4. les 10 utilisateurs ayant noté le plus de films.
  5. les films ayant reçu le moins de notes
  6. les utilisateurs n’ayant noté aucun film

Remarque : on considère dans un premier temps qu’un genre est représenté par le groupe de genres donné par la chaine de caractères de l’attribut Genres. Dans un second temps, on considère les genres « atomiques » qui sont séparés par des barres « | ». Pour ce faire, déplier les données de films et les stocker dans une valeur intermédiaire films_bis qui contient un nuplet de fims avec un genre à la fois. Par exemple, pour le nuplet (1,Toy Story (1995),Animation|Children's|Comedy) de films, il existe trois nuplets dans films_bis : (1,Toy Story (1995),Animation), (1,Toy Story (1995), Children's) et (1,Toy Story (1995), Comedy). Indice: pour construire films_bis, il est possible d’imbriquer une fonction map à l’intérieur d’une autre (cf. question 2 de l’exercice 3).

Réponse

//a.	le nombre de notes (ratings) réalisées par chaque utilisateur identifié par son UserID
val q2a  = notes.map{case(userId,movieId,rating,ts)=>(userId,1)}.reduceByKey(_+_)
 
//b.	le nombre de notes (ratings) réalisées par chaque localisation donnée par le Zip-code
val q2b = utilis.map{case(userId,gender,age,occup,zipcode)=>(userId,zipcode)} join(notes.map{case(userId,movieId,rating,ts)=>(userId,1)}) map{case (userId,(zipcode, nb))=>(zipcode, 1)} reduceByKey(_+_)
 
 
//c.	le nombre de notes (ratings) réalisées par chaque genre de film
//jointure en notes et films 
val q2c = notes.map(x=>(x(1),1)).join(films.map(x=>(x(0),x(2)))).map{case (movieID, (genre,nb))=>(genre, 1)}.reduceByKey(_+_)
 
 
//d.	les 10 utilisateurs ayant noté le plus de films.
val q2d = notes.map(x=>(x(0),1)).reduceByKey(_+_).takeOrdered(10)
 
//e.	Les films ayant reçu le moins de notes
//f.	Les utilisateurs n’ayant noté aucun film
 
 
 
//genre de films
val films_bis = films.map(x=>(x._1,x._2,x._3.split("\\|"))).flatMap{case(a,b,l)=>l.map(x=>(a,b,x))}

Exercice Subsidiaire : reprendre les questions précédentes en utilisant l'API Dataset

Pour utiliser les Dataset, récupérer le fichier suivant:

cp /Infos/bd/spark/tme-dataset-etudiant.scala <votre repertoire de travail>
emacs tme-dataset-etudiant.scala &