Nous avons vu ensemble comment exécuter localement un job Spark, et comment lancer un cluster Spark sur sa machine locale. Mais tout ceci ne nous dit pas comment Spark parvient à distribuer les calculs sur les différents workers. On pourrait penser qu'il n'est pas nécessaire de comprendre comment fonctionnent les rouages internes de Spark pour réaliser des calculs distribués... et on se tromperait lourdement ! Comprendre la logique selon laquelle Spark distribue les calculs est essentiel pour concevoir des applications de manière optimale.
Anatomie d'une application Spark
Je vous propose de décortiquer ligne par ligne l'application Spark qui nous a accompagnés jusqu'à présent :
import sys
from pyspark import SparkContext
sc = SparkContext()
lines = sc.textFile(sys.argv[1])
words = lines.flatMap(lambda line: line.split(' '))
words_with_1 = words.map(lambda word: (word, 1))
word_counts = words_with_1.reduceByKey(lambda count1, count2: count1 + count2)
result = word_counts.collect()
for (word, count) in result:
print(word, count)
Vous pouvez exécuter cette application ligne par ligne dans Spark Shell pour examiner chacun des objets. Par souci de concision, nous allons uniquement nous intéresser au code source de l'application Python, mais ces explications sont aussi valables pour les applications écrites en Scala ou en Java.
import sys
from pyspark import SparkContext
Il s'agit des imports nécessaires pour exécuter le code qui va suivre. Vous vous souvenez que c'est le driver qui exécute l'application ? Il faut donc que ces modules soient disponibles sur le driver. Notez que si les opérations réalisées dans votre application nécessitent des packages externes, il faudra les installer sur les exécuteurs.
sys
est un module standard de Python qui va nous permettre de lire les arguments passés à l'application en ligne de commande.pyspark
est un package fourni par Spark que vous pouvez trouver dans le répertoirepython/pyspark
. La classeSparkContext
est implémentée danspython/pyspark/context.py
.
sc = SparkContext()
Cette ligne permet d'instancier un objet de typeSparkContext
, qui gère les propriétés globales de votre application, telles que le niveau de logging ou le niveau de parallélisation par défaut. UnSparkContext
est également chargé de la lecture de données en provenance d'une variété de sources possibles : un ou plusieurs fichiers contenant du texte, des données au format JSON, des fichiers binaires, etc. C'est d'ailleurs ce qui est fait dans la ligne suivante :
lines = sc.textFile(sys.argv[1])
Ici,sys.argv[1]
est le paramètre passé en ligne de commande à notre application. Cette ligne de code crée un objetlines
de typeRDD
. Mais qu'est-ce qu'un RDD ? C'est ce qu'on appelle un Resilient Distributed Dataset. On touche là au cœur de Spark ! Vous êtes prêts ? Attention, c'est un peu compliqué.
Pour commencer, sachez que, en l'état, l'objetlines
ne contient pas encore les lignes de votre fichier texte. En effet, imaginez que votre fichier fasse 1 To : vous n'auriez (probablement) pas suffisamment de RAM pour le stocker en mémoire. Pour l'instant, le RDD ne contient qu'un pointeur vers le fichier. En fait, chaque executor ne va lire que les données qui vont lui servir, et seulement au moment où il en a besoin. Mais, du coup, à quel moment le RDD va-t-il réaliser les opérations suivantes ? Vous l'aurez deviné, pas immédiatement.
words = lines.flatMap(lambda line: line.split(' '))
Ici, nous transformons les lignes en mots ; c'est un peu comme une fonctionmap
sauf qu'à chaque donnée en entrée on peut associer plus d'une donnée en sortie. Par exemple aux deux lignes "Sur mes cahiers" et "Sur mon pupitre" sont associés la liste de mots["Sur", "mes", "cahiers", "Sur", "mon", "pupitre"]
.
L'objetwords
est lui aussi un RDD : l'opérationflatMap
a tranformé un RDD en un autre. On dit que c'est une transformation. Ca a l'air tout bête, dit comme ça, mais c'est important.
words_with_1 = words.map(lambda word: (word, 1))
De la même manière,map
est ici une transformation. A la liste de mots["Sur", "mes", "cahiers", "Sur", "mon", "pupitre"]
elle associe la liste constituée de tuples[("Sur", 1), ("mes", 1), ("Sur", 1), ("mon", 1), ("pupitre", 1)]
. Comme les objets en sortie demap
sont des tuples contenant deux valeurs, ils peuvent être considérés comme des paires(clé, valeur)
, ce qui nous permet de faire appel à la transformation suivante :
word_counts = words_with_1.reduceByKey(lambda count1, count2: count1 + count2)
L'argument passé àreduceByKey
est une fonction qui prend pour arguments à chaque appel deux valeurs provenant des paires(clé, valeur)
. Encore une fois,reduceByKey
est une transformation, ce qui signifie que, vous l'aurez deviné,word_counts
est un RDD. Attention, ça change avec la ligne suivante :
result = word_counts.collect()
Ici,result
n'est pas un RDD mais une liste de tuples(clé, valeur)
. On dit quecollect
est une action.
Résumons. Les RDD possèdent deux types de méthodes :
Les transformations qui donnent en sortie un autre RDD.
Les actions qui donnent en sortie... autre chose qu'un RDD.
Pourquoi est-ce que cette distinction est importante ? Parce que les transformations ne sont pas évaluées immédiatement. On dit qu'elles sont évaluées de manière paresseuse ("lazy evaluation"), c'est à dire uniquement lorsqu'on en a besoin. Et on n'a besoin du résultat d'une transformation que lorsqu'on effectue une action. Est-ce que vous avez remarqué que les appels àflatMap
,map
etreduceByKey
se font de manière quasi instantanée ? C'est parce que ces appels, en soit, ne font pas grand-chose. Les fonctions passées en argument de ces transformations ne sont appelées qu'au moment de l'appel àcollect
.
Calculs distribués sous forme de graphe avec les DAG
Si les RDD sont aussi importants dans Spark, c'est parce qu'ils dictent la manière dont les calculs vont être distribués sur les différentes machines. Ce sont aussi les RDD qui permettent une tolérance aux pannes, sujet épineux comme on a pu le voir en introduction de ce chapitre.
Dans une application Spark, les transformations et les actions réalisées sur les RDD permettent de construire un graphe acyclique orienté (DAG : "directed acyclic graph").
Dans un DAG, les nœuds sont les RDD et les résultats. Les connexions entre les nœuds sont soit des transformations, soit des actions. Ces connexions sont orientées car elles ne permettent de passer d'un RDD à un autre que dans un sens. Le graphe est dit acyclique car aucun RDD ne permet de se transformer en lui-même via une série d'actions.
Lorsqu'un nœud devient indisponible, à cause d'une malfonction quelconque, il peut être regénéré à partir de ses nœuds parents. C'est précisément ce qui permet la tolérance aux pannes des applications Spark.
Bien, nous savons maintenant ce que sont les RDD et à quoi ils servent. Mais comment les créer et les manipuler ? Pour ça, nous devons nous intéresser à la librairie standard de Spark.
Lecture de données
Pour concevoir des applications Spark optimales, il est important de bien connaître la librairie standard de Spark. Heureusement, elle est relativement légère et il est possible d'en couvrir les fonctionnalités les plus importantes assez rapidement. Commençons par la lecture de données : elle est la responsabilité duSparkContext
qui produit en sortie un RDD.Comme nous l'avons mentionné plus haut, il est possible de lire les données à partir d'une grande variété de sources différentes. La liste complète est disponible dans la documentation (et dans le code source) ; voyons-en quelques unes ici :
sc.textFile(path)
: comme on l'a vu, l'appel à cette méthode crée un RDD à partir des lignes du fichier. A noter que lepath
du fichier peut être un chemin réseau, ce qui nous sera utile par la suite.sc.wholeTextFiles(path)
: ici,path
peut désigner un chemin vers un fichier ou un répertoire contenant plusieurs fichiers. Le RDD construit est un ensemble de clés-valeurs ou chaque clé indique le chemin vers un fichier et les valeurs sont le contenu complet des fichiers. Le(s) fichier(s) ne sont donc pas décomposés en lignes.sc.parallelize(iterable)
: utilisé pour transformer des objets Python (ou Scala) en RDD.parallelize
est principalement utilisé dans le Spark Shell pour déboguer des applications.
Transformations et actions
De quelles transformations et actions disposons-nous grâce à la librairie standard de Spark ? La liste complète est disponible dans la documentation de Spark mais nous allons lister ici les opérations les plus fréquemment utilisées.
Transformations :
map(func)
: applique une fonction à chacune des données.filter(func)
: permet d'éliminer certaines données. Par exemple, danswordcount.py
, pour ne sélectionner que les mots commençant par une majuscule, on peut écrire :words.filter(lambda word: word[0].upper() == word[0])
.flatMap(func)
: tout comme map, mais chacune des données d'entrée peut être transformée en plusieurs données de sortie.sample(withReplacement, fraction, seed)
: récolte un échantillon aléatoire des données. Cette méthode est utile pour tester un algorithme sur un petit pourcentage de données. L'argumentseed
permet de réaliser des expérience reproductibles. Exemple :.sample(False, 0.01, 42)
.distinct()
: supprime les doublons.groupByKey()
: transforme des clés-valeurs (K, V) en (K, W) où W est un object itérable. Par exemple, (K, U) et (K, V) seront transformées en (K, [U, V]).reduceByKey(func)
: applique une fonction de réduction aux valeurs de chaque clé. Comme on a pu le voir, la fonction de réduction est appelée avec deux arguments. Si ce genre de choses vous intéresse, sachez qu'il est attendu que la fonction de réduction soit commutative et associative, c'est à dire que :func(a, b) = func(b, a)
etfunc(func(a, b), c) = func(a, func(b, c))
. Par exemple, la fonctionfunc
peut renvoyer le maximum entre deux valeurs.sortByKey(ascending)
: utilisée pour trier le résultat par clé. Par exemple, si on avait voulu obtenir la liste de tous les mots triés par ordre alphabétique, on aurait écrit :wordcounts = wordcounts.sortByKey()
.join(rdd)
: permet de réaliser une jointure, ce qui a le même sens que dans les bases de données relationnelles. À noter qu'il existe également des fonctionsleftOuterJoin
,rightOuterJoin
etfullOuterJoin
. Les jointures sont réalisées sur la clé. Par exemple :>>> rdd1 = sc.parallelize([(1, "a1"), (2, "b1")]) >>> rdd2 = sc.parallelize([(1, "a2"), (2, "b2"), (3, "c2")]) >>> rdd1.join(rdd2).collect() [(1, ('a1', 'a2')), (2, ('b1', 'b2'))] >>> rdd1.rightOuterJoin(rdd2).collect() [(1, ('a1', 'a2')), (2, ('b1', 'b2')), (3, (None, 'c2'))]
Actions :
reduce(func)
: applique une réduction à l'ensemble des données.collect()
: retourne toutes les données contenues dans le RDD sous la forme de liste.count()
: retourne le nombre de données contenues dans RDD. Par exemple,words.count()
permet de compter le nombre de mots dans l'Iliade (154642).takeOrdered(n, key_func)
: retourne lesn
premiers éléments du RDD ordonnés selonkey_func
. Par exemple, pour obtenir la liste des 10 mots les plus fréquents (donc triés par ordre décroissant de fréquence) on écrit :word_counts.takeOrdered(10, lambda i: -i[1]
).
Cycle de vie d'une application
Bien, nous savons maintenant comment traiter et analyser des données avec les actions et les transformations de Spark. Mais tout cela ne nous dit pas comment Spark distribue les calculs sur les différents executors ? En particulier, comment les données sont-elles réparties entre les executors ?
Les données sont découpées en partitions. Chaque partition est assignée à un des executors. Le traitement d'une partition représente une tâche : c'est la plus petite unité de traitement de données que nous allons voir. Un cluster Spark ne peut traiter qu'une tâche à la fois par executor, et en général il y a un executor par cœur de processeur. Par ailleurs, la taille d'une partition doit rester inférieure à la mémoire disponible pour son executor. Le choix du nombre de partitions est donc crucial, puisqu'il détermine le nombre de tâches qui seront réalisées de manière concurrente sur le cluster. Nous nous pencherons plus précisément sur ce point dans le chapitre sur le déboguage et l'optimisation d'applications Spark.
Un ensemble de tâches réalisées en parallèle, une par partition d'un RDD, constitue une étape (stage). Toutes les tâches d'une étape doivent être terminées avant que l'on puisse passer à l'étape suivante. Un job Spark est composé d'une succession d'étapes ; la progression d'un job peut donc être mesurée au nombre d'étapes qui ont été réalisées. Un job Spark est créé pour chaque action qu'on réalise sur un RDD.
Quelles tâches regroupent-on dans une même étape ? On passe d'une étape à une autre dès lors qu'on doit redistribuer les données entre les nœuds. On dit alors qu'il y a un shuffle. Les shuffle peuvent se produire pour des raisons différentes. Par exemple, lors d'unreduceByKey
toutes les données correspondant à une même clé sont regroupées sur la même partition. Il est important de comprendre quelles actions nécessitent un shuffle car le transfert de données entre différentes machines (par le réseau) est coûteux en temps.
Persistence
Il n'aura pas échappé aux esprits attentifs que l'on peut réaliser plusieurs actions sur un même RDD. Par exemple, on compte ici le nombre de mots contenus dans un texte et on liste également les dix premiers mots par ordre alphabétique :
# Transformations
all_words = lines.flatMap(lambda line: line.split(' '))
different_words = all_words.distinct()
# Actions
different_words_count = different_words.count()
first_10_words = different_words.takeOrdered(10)
CommeflatMap
etdistinct
sont des transformations, elles ne sont évaluées que lors des appels àcount
ettakeOrdered
, qui sont des actions. Cependant, comme il y a deux actions, chacune des transformations sera évaluée deux fois ! Effectivement, le résultat des transformations n'est pas gardé en mémoire. Le stockage des RDD prendrait beaucoup trop de place si on devait conserver le résultat de toutes les transformations. Mais réaliser les transformations deux fois est évidemment inutile et coûteux en temps de calcul. Pour faire en sorte que les transformations ne soient pas réalisées plusieurs fois, il faut faire appel à la méthodepersist()
:
# Transformations
...
different_words.persist()
# Actions
...
Un RDD sur lequel on aura appelépersist()
sera stocké en cache lors de sa première action ; c'est-à-dire, ici, lors de l'appel àcount()
. Puis, lors des actions ultérieures (takeOrdered(10)
) la valeur stockée en cache est réutilisée, ce qui permet de ne pas refaire les transformationsflatMap()
etdistinct()
.
La persistence d'un RDD peut être réalisée en mémoire ou sur disque. Pour plus de détails, vous pouvez consulter la documentation officielle.