• 20 heures
  • Difficile

Ce cours est visible gratuitement en ligne.

course.header.alt.is_video

course.header.alt.is_certifying

J'ai tout compris !

Mis à jour le 17/10/2023

Domptez les Resilient Distributed Datasets

Nous avons vu ensemble comment exécuter localement un job Spark, et comment lancer un cluster Spark sur sa machine locale. Mais tout ceci ne nous dit pas comment Spark parvient à distribuer les calculs sur les différents workers. On pourrait penser qu'il n'est pas nécessaire de comprendre comment fonctionnent les rouages internes de Spark pour réaliser des calculs distribués... et on se tromperait lourdement ! Comprendre la logique selon laquelle Spark distribue les calculs est essentiel pour concevoir des applications de manière optimale.

Anatomie d'une application Spark

Je vous propose de décortiquer ligne par ligne l'application Spark qui nous a accompagnés jusqu'à présent :

import sys
from pyspark import SparkContext

sc = SparkContext()
lines = sc.textFile(sys.argv[1])
words = lines.flatMap(lambda line: line.split(' '))
words_with_1 = words.map(lambda word: (word, 1))
word_counts = words_with_1.reduceByKey(lambda count1, count2: count1 + count2)
result = word_counts.collect()

for (word, count) in result:
    print(word, count)

Vous pouvez exécuter cette application ligne par ligne dans Spark Shell pour examiner chacun des objets. Par souci de concision, nous allons uniquement nous intéresser au code source de l'application Python, mais ces explications sont aussi valables pour les applications écrites en Scala ou en Java.

import sys
from pyspark import SparkContext

Il s'agit des imports nécessaires pour exécuter le code qui va suivre. Vous vous souvenez que c'est le driver qui exécute l'application ? Il faut donc que ces modules soient disponibles sur le driver. Notez que si les opérations réalisées dans votre application nécessitent des packages externes, il faudra les installer sur les exécuteurs.

sysest un module standard de Python qui va nous permettre de lire les arguments passés à l'application en ligne de commande.pysparkest un package fourni par Spark que vous pouvez trouver dans le répertoirepython/pyspark. La classeSparkContextest implémentée danspython/pyspark/context.py.

sc = SparkContext()

Cette ligne permet d'instancier un objet de typeSparkContext, qui gère les propriétés globales de votre application, telles que le niveau de logging ou le niveau de parallélisation par défaut. UnSparkContextest également chargé de la lecture de données en provenance d'une variété de sources possibles : un ou plusieurs fichiers contenant du texte, des données au format JSON, des fichiers binaires, etc. C'est d'ailleurs ce qui est fait dans la ligne suivante :

lines = sc.textFile(sys.argv[1])

Ici,sys.argv[1]est le paramètre passé en ligne de commande à notre application. Cette ligne de code crée un objetlinesde typeRDD. Mais qu'est-ce qu'un RDD ? C'est ce qu'on appelle un Resilient Distributed Dataset. On touche là au cœur de Spark ! Vous êtes prêts ? Attention, c'est un peu compliqué.

Pour commencer, sachez que, en l'état, l'objetlinesne contient pas encore les lignes de votre fichier texte. En effet, imaginez que votre fichier fasse 1 To : vous n'auriez (probablement) pas suffisamment de RAM pour le stocker en mémoire. Pour l'instant, le RDD ne contient qu'un pointeur vers le fichier. En fait, chaque executor ne va lire que les données qui vont lui servir, et seulement au moment où il en a besoin. Mais, du coup, à quel moment le RDD va-t-il réaliser les opérations suivantes ? Vous l'aurez deviné, pas immédiatement.

words = lines.flatMap(lambda line: line.split(' '))

Ici, nous transformons les lignes en mots ; c'est un peu comme une fonctionmapsauf qu'à chaque donnée en entrée on peut associer plus d'une donnée en sortie. Par exemple aux deux lignes "Sur mes cahiers" et "Sur mon pupitre" sont associés la liste de mots["Sur", "mes", "cahiers", "Sur", "mon", "pupitre"].

L'objetwordsest lui aussi un RDD : l'opérationflatMapa tranformé un RDD en un autre. On dit que c'est une transformation. Ca a l'air tout bête, dit comme ça, mais c'est important.

words_with_1 = words.map(lambda word: (word, 1))

De la même manière,mapest ici une transformation. A la liste de mots["Sur", "mes", "cahiers", "Sur", "mon", "pupitre"]elle associe la liste constituée de tuples[("Sur", 1), ("mes", 1), ("Sur", 1), ("mon", 1), ("pupitre", 1)]. Comme les objets en sortie demapsont des tuples contenant deux valeurs, ils peuvent être considérés comme des paires(clé, valeur), ce qui nous permet de faire appel à la transformation suivante :

word_counts = words_with_1.reduceByKey(lambda count1, count2: count1 + count2)

L'argument passé àreduceByKeyest une fonction qui prend pour arguments à chaque appel deux valeurs provenant des paires(clé, valeur). Encore une fois,reduceByKeyest une transformation, ce qui signifie que, vous l'aurez deviné,word_countsest un RDD. Attention, ça change avec la ligne suivante :

result = word_counts.collect()

Ici,resultn'est pas un RDD mais une liste de tuples(clé, valeur). On dit quecollectest une action.

Résumons. Les RDD possèdent deux types de méthodes :

  • Les transformations qui donnent en sortie un autre RDD.

  • Les actions qui donnent en sortie... autre chose qu'un RDD.

Pourquoi est-ce que cette distinction est importante ? Parce que les transformations ne sont pas évaluées immédiatement. On dit qu'elles sont évaluées de manière paresseuse ("lazy evaluation"), c'est à dire uniquement lorsqu'on en a besoin. Et on n'a besoin du résultat d'une transformation que lorsqu'on effectue une action. Est-ce que vous avez remarqué que les appels àflatMap,mapetreduceByKeyse font de manière quasi instantanée ? C'est parce que ces appels, en soit, ne font pas grand-chose. Les fonctions passées en argument de ces transformations ne sont appelées qu'au moment de l'appel àcollect.

Calculs distribués sous forme de graphe avec les DAG

Si les RDD sont aussi importants dans Spark, c'est parce qu'ils dictent la manière dont les calculs vont être distribués sur les différentes machines. Ce sont aussi les RDD qui permettent une tolérance aux pannes, sujet épineux comme on a pu le voir en introduction de ce chapitre.

Dans une application Spark, les transformations et les actions réalisées sur les RDD permettent de construire un graphe acyclique orienté (DAG : "directed acyclic graph").dag.svg

Dans un DAG, les nœuds sont les RDD et les résultats. Les connexions entre les nœuds sont soit des transformations, soit des actions. Ces connexions sont orientées car elles ne permettent de passer d'un RDD à un autre que dans un sens. Le graphe est dit acyclique car aucun RDD ne permet de se transformer en lui-même via une série d'actions.

Lorsqu'un nœud devient indisponible, à cause d'une malfonction quelconque, il peut être regénéré à partir de ses nœuds parents. C'est précisément ce qui permet la tolérance aux pannes des applications Spark.

Bien, nous savons maintenant ce que sont les RDD et à quoi ils servent. Mais comment les créer et les manipuler ? Pour ça, nous devons nous intéresser à la librairie standard de Spark.

Lecture de données

Pour concevoir des applications Spark optimales, il est important de bien connaître la librairie standard de Spark. Heureusement, elle est relativement légère et il est possible d'en couvrir les fonctionnalités les plus importantes assez rapidement. Commençons par la lecture de données : elle est la responsabilité duSparkContextqui produit en sortie un RDD.Comme nous l'avons mentionné plus haut, il est possible de lire les données à partir d'une grande variété de sources différentes. La liste complète est disponible dans la documentation (et dans le code source) ; voyons-en quelques unes ici :

  • sc.textFile(path): comme on l'a vu, l'appel à cette méthode crée un RDD à partir des lignes du fichier. A noter que lepathdu fichier peut être un chemin réseau, ce qui nous sera utile par la suite.

  • sc.wholeTextFiles(path): ici,pathpeut désigner un chemin vers un fichier ou un répertoire contenant plusieurs fichiers. Le RDD construit est un ensemble de clés-valeurs ou chaque clé indique le chemin vers un fichier et les valeurs sont le contenu complet des fichiers. Le(s) fichier(s) ne sont donc pas décomposés en lignes.

  • sc.parallelize(iterable): utilisé pour transformer des objets Python (ou Scala) en RDD.parallelizeest principalement utilisé dans le Spark Shell pour déboguer des applications.

Transformations et actions

De quelles transformations et actions disposons-nous grâce à la librairie standard de Spark ? La liste complète est disponible dans la documentation de Spark mais nous allons lister ici les opérations les plus fréquemment utilisées.

Transformations :

  • map(func): applique une fonction à chacune des données.

  • filter(func): permet d'éliminer certaines données. Par exemple, danswordcount.py, pour ne sélectionner que les mots commençant par une majuscule, on peut écrire :words.filter(lambda word: word[0].upper() == word[0]).

  • flatMap(func): tout comme map, mais chacune des données d'entrée peut être transformée en plusieurs données de sortie.

  • sample(withReplacement, fraction, seed): récolte un échantillon aléatoire des données. Cette méthode est utile pour tester un algorithme sur un petit pourcentage de données. L'argumentseedpermet de réaliser des expérience reproductibles. Exemple :.sample(False, 0.01, 42).

  • distinct(): supprime les doublons.

  • groupByKey(): transforme des clés-valeurs (K, V) en (K, W) où W est un object itérable. Par exemple, (K, U) et (K, V) seront transformées en (K, [U, V]).

  • reduceByKey(func): applique une fonction de réduction aux valeurs de chaque clé. Comme on a pu le voir, la fonction de réduction est appelée avec deux arguments. Si ce genre de choses vous intéresse, sachez qu'il est attendu que la fonction de réduction soit commutative et associative, c'est à dire que :func(a, b) = func(b, a)etfunc(func(a, b), c) = func(a, func(b, c)). Par exemple, la fonctionfuncpeut renvoyer le maximum entre deux valeurs.

  • sortByKey(ascending): utilisée pour trier le résultat par clé. Par exemple, si on avait voulu obtenir la liste de tous les mots triés par ordre alphabétique, on aurait écrit :wordcounts = wordcounts.sortByKey().

  • join(rdd): permet de réaliser une jointure, ce qui a le même sens que dans les bases de données relationnelles. À noter qu'il existe également des fonctionsleftOuterJoin,rightOuterJoinetfullOuterJoin. Les jointures sont réalisées sur la clé. Par exemple :

    >>> rdd1 = sc.parallelize([(1, "a1"), (2, "b1")])
    >>> rdd2 = sc.parallelize([(1, "a2"), (2, "b2"), (3, "c2")])
    >>> rdd1.join(rdd2).collect()
    [(1, ('a1', 'a2')), (2, ('b1', 'b2'))]
    >>> rdd1.rightOuterJoin(rdd2).collect()
    [(1, ('a1', 'a2')), (2, ('b1', 'b2')), (3, (None, 'c2'))]

Actions :

  • reduce(func): applique une réduction à l'ensemble des données.

  • collect(): retourne toutes les données contenues dans le RDD sous la forme de liste.

  • count(): retourne le nombre de données contenues dans RDD. Par exemple,words.count()permet de compter le nombre de mots dans l'Iliade (154642).

  • takeOrdered(n, key_func): retourne lesnpremiers éléments du RDD ordonnés selonkey_func. Par exemple, pour obtenir la liste des 10 mots les plus fréquents (donc triés par ordre décroissant de fréquence) on écrit :word_counts.takeOrdered(10, lambda i: -i[1]).

Cycle de vie d'une application

Bien, nous savons maintenant comment traiter et analyser des données avec les actions et les transformations de Spark. Mais tout cela ne nous dit pas comment Spark distribue les calculs sur les différents executors ? En particulier, comment les données sont-elles réparties entre les executors ?

Les données sont découpées en partitions. Chaque partition est assignée à un des executors. Le traitement d'une partition représente une tâche : c'est la plus petite unité de traitement de données que nous allons voir. Un cluster Spark ne peut traiter qu'une tâche à la fois par executor, et en général il y a un executor par cœur de processeur. Par ailleurs, la taille d'une partition doit rester inférieure à la mémoire disponible pour son executor. Le choix du nombre de partitions est donc crucial, puisqu'il détermine le nombre de tâches qui seront réalisées de manière concurrente sur le cluster. Nous nous pencherons plus précisément sur ce point dans le chapitre sur le déboguage et l'optimisation d'applications Spark.

Un ensemble de tâches réalisées en parallèle, une par partition d'un RDD, constitue une étape (stage). Toutes les tâches d'une étape doivent être terminées avant que l'on puisse passer à l'étape suivante. Un job Spark est composé d'une succession d'étapes ; la progression d'un job peut donc être mesurée au nombre d'étapes qui ont été réalisées. Un job Spark est créé pour chaque action qu'on réalise sur un RDD.

spark-stages-rdds.svg

Quelles tâches regroupent-on dans une même étape ? On passe d'une étape à une autre dès lors qu'on doit redistribuer les données entre les nœuds. On dit alors qu'il y a un shuffle. Les shuffle peuvent se produire pour des raisons différentes. Par exemple, lors d'unreduceByKeytoutes les données correspondant à une même clé sont regroupées sur la même partition. Il est important de comprendre quelles actions nécessitent un shuffle car le transfert de données entre différentes machines (par le réseau) est coûteux en temps.

spark-workflow.svg

Persistence

Il n'aura pas échappé aux esprits attentifs que l'on peut réaliser plusieurs actions sur un même RDD. Par exemple, on compte ici le nombre de mots contenus dans un texte et on liste également les dix premiers mots par ordre alphabétique :

# Transformations
all_words = lines.flatMap(lambda line: line.split(' '))
different_words = all_words.distinct()

# Actions
different_words_count = different_words.count()
first_10_words        = different_words.takeOrdered(10)

CommeflatMapetdistinctsont des transformations, elles ne sont évaluées que lors des appels àcountettakeOrdered, qui sont des actions. Cependant, comme il y a deux actions, chacune des transformations sera évaluée deux fois ! Effectivement, le résultat des transformations n'est pas gardé en mémoire. Le stockage des RDD prendrait beaucoup trop de place si on devait conserver le résultat de toutes les transformations. Mais réaliser les transformations deux fois est évidemment inutile et coûteux en temps de calcul. Pour faire en sorte que les transformations ne soient pas réalisées plusieurs fois, il faut faire appel à la méthodepersist():

# Transformations
...
different_words.persist()

# Actions
...

Un RDD sur lequel on aura appelépersist()sera stocké en cache lors de sa première action ; c'est-à-dire, ici, lors de l'appel àcount(). Puis, lors des actions ultérieures (takeOrdered(10)) la valeur stockée en cache est réutilisée, ce qui permet de ne pas refaire les transformationsflatMap()etdistinct().

La persistence d'un RDD peut être réalisée en mémoire ou sur disque. Pour plus de détails, vous pouvez consulter la documentation officielle.

Exemple de certificat de réussite
Exemple de certificat de réussite