Jusqu'à présent, on a vu avec Spark des outils qui permettent de réaliser facilement des opérations bas niveau de manière distribuée. Ces briques élémentaires nous permettent de réécrire des algorithmes entiers pour les distribuer sur des clusters de calcul. Cependant il peut être fastidieux de réécrire des algorithmes complexes pour lesquels il existe déjà des implémentations populaires mais non distribuées. MLlib est une librairie de machine learning pour Spark qui résout ce problème. De la même manière, on pourrait réécrire des méthodes de traitement de données structurées mais les concepteurs de Spark ont pensé à tout et ont créé pour ça Spark SQL.
Spark SQL
Commençons par parler de Spark SQL, qui va nous permettre d'introduire un nouveau type d'objet : les DataFrames. Il s'agit d'un type très similaire aux RDD, à la différence qu'il permet de stocker de manière distribuée des données structurées, là où les RDD nous permettent de stocker des données non structurées. Pas de panique, on va voir qu'en pratique les différences sont mineures.
Quel est l'intérêt de stocker des données structurées de manière distribuées ? Cela va nous permettre d'analyser nos données à l'aide de requêtes similaires à des requêtes SQL — souvenez-vous que "SQL" signifie Structured Query Language. Structurer des données massives va donc vous permettre de mettre à disposition de vos utilisateurs des données qu'ils pourront exploiter comme s'ils effectuaient des requêtes sur une base de données relationnelle classique, et ils n'auront pour ça pas à apprendre un nouveau langage de requête.
Lecture de données
Pour comprendre Spark SQL et les DataFrames, le mieux est encore de travailler sur un exemple. Nous allons analyser une base de données contenant un échantillon de la population mondiale.
Commençons par télécharger nos données :
$ wget https://s3-eu-west-1.amazonaws.com/course.oc-static.com/courses/4297166/agents.json
$ head -5 agents.json
{"id":227417393,"longitude":100.85840672174572,"latitude":33.15219798270325,"country_name":"China","sex":"Male"}
{"id":6821129477,"longitude":-72.43795260265814,"latitude":19.325567983697297,"country_name":"Haiti","sex":"Female"}
{"id":2078667700,"longitude":80.85636526088884,"latitude":23.645271492037235,"country_name":"India","sex":"Female"}
{"id":477556555,"longitude":93.33604038078953,"latitude":33.45864668881662,"country_name":"China","sex":"Female"}
{"id":1379059984,"longitude":80.7728698035823,"latitude":28.816938290678692,"country_name":"India","sex":"Female"}
Nous allons faire des expériences avec Spark SQL dans un Spark Shell :
$ ./code/spark-2.3.1-bin-hadoop2.7/bin/pyspark
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
Using Python version 3.6.5 (default, Apr 1 2018 05:46:30)
SparkSession available as 'spark'.
Peut-être aviez-vous déjà remarqué ce message lors du lancement d'un Spark Shell :SparkSession available as 'spark'
? Effectivement, un objetspark
de typepyspark.sql.session.SparkSession
est disponible dans le shell :
>>> spark
<pyspark.sql.session.SparkSession object at 0x7fdbb735ea50>
Cet objet nous permet d'instancier des objets de typeDataFrame
, de la même manière que leSparkContext
nous permettait d'instancier desRDD
:
>>> agents = spark.read.json("agents.json")
>>> agents
DataFrame[country_name: string, id: bigint, latitude: double, longitude: double, sex: string]
On remarque que le schéma des données a été automatiquement inféré, et nous disposons donc maintenant d'une interface structurée vers le contenu de notre fichier. Mais attention ! Les données ne sont pas chargées en mémoire pour autant. Comme pour les RDD, les données ne seront lues que lorsque ça sera nécessaire, en suivant le même principe d'évaluation paresseuse.
Écrire des requêtes
Continuons notre exemple précédent pour analyser le contenu de notre base d'agents :
>>> french_agents = agents.filter(agents.country_name == "France")
>>> french_agents
DataFrame[country_name: string, id: bigint, latitude: double, longitude: double, sex: string]
L'opérationfilter
renvoie un objet de typeDataFrame
, tout comme les transformations sur les objets de typeRDD
renvoient un objet de typeRDD
. De fait, l'opérationfilter
ne fait pas grand chose, en l'état, puisqu'elle n'a pas encore été évaluée. Pour l'évaluer, il faut réaliser une action :
>>> french_agents.count()
94
La récupération du premier objet duDataFrame
constitue aussi une action et déclenche donc l'évaluation defilter
:
>>> agent = french_agents.first()
>>> agent
Row(country_name=u'France', id=5130782577, latitude=-0.21142875508479517, longitude=-0.003950214433749498, sex=u'Female')
On peut voir que les objets retournés par leDataFrame
sont de typeRow
. On accède aux valeurs des colonnes de cet objet comme à des attributs standards :
>>> print(agent.country_name, agent.id)
France 5130782577
En réalité, l'API des DataFrames est conçue comme un ORM ("Object-Relational Mapping"), et nous pouvons réaliser des requêtes SQL en enchaînant des appels de méthodes :
>>> agents.filter(agents.country_name == "France").filter(agents.latitude < 0).count()
41
La même opération peut être réalisée à l'aide d'opérateurs booléens :
>>> agents.filter((agents.country_name == "France") & (agents.latitude < 0)).count()
41
Le contenu d'un DataFrame peut être visualisé à l'aide de la méthodeshow()
, qui est aussi une action :
>>> agents.limit(5).show()
+------------+----------+------------------+------------------+------+
|country_name| id| latitude| longitude| sex|
+------------+----------+------------------+------------------+------+
| China| 227417393| 33.15219798270325|100.85840672174572| Male|
| Haiti|6821129477|19.325567983697297|-72.43795260265814|Female|
| India|2078667700|23.645271492037235| 80.85636526088884|Female|
| China| 477556555| 33.45864668881662| 93.33604038078953|Female|
| India|1379059984|28.816938290678692| 80.7728698035823|Female|
+------------+----------+------------------+------------------+------+
On peut également réaliser des requêtes SQL sans passer par l'ORM en créant une vue temporaire associée à un DataFrame :
>>> agents.createTempView("agents_table")
>>> spark.sql("SELECT * FROM agents_table ORDER BY id DESC LIMIT 10").show()
+-----------------+----------+-------------------+-------------------+------+
| country_name| id| latitude| longitude| sex|
+-----------------+----------+-------------------+-------------------+------+
| French Polynesia|7170821229|-15.004219445056265|-140.01650828107668| Male|
| Cabo Verde|7167692449| 16.00676587564149| -23.90898775675409| Male|
| Suriname|7166451460| 4.008871704322331| -55.97275746253122|Female|
| Suriname|7166235088| 3.96442417744574|-56.077562332679605|Female|
| Macau|7166034642| 21.944944804684596| 114.02447154998114|Female|
| Montenegro|7164357515| 42.32131745506727| 19.168822000529843| Male|
|Equatorial Guinea|7163867872| 3.651402073464487| 9.913739020397387|Female|
| Bhutan|7163256789| 27.419739555133912| 90.29001406759927|Female|
| Bhutan|7163004645| 27.281480489455422| 90.17405662751794| Male|
| Bhutan|7162877973| 27.37149433886258| 90.38882928596311| Male|
+-----------------+----------+-------------------+-------------------+------+
La vueagents_table
ainsi créée est temporaire ; vous pouvez la remplacer à l'aide de la méthodecreateOrReplaceTempView
. Pour plus de précisions concernant la gestion de vues temporaires ou globales, n'hésitez pas à consulter la documentation officielle.
Il faut bien comprendre que tous les principes essentiels qu'on a vu sur les RDD dans le chapitre précédent s'appliquent de la même manière aux DataFrames. Par exemple, on peut stocker en cache un DataFrame à l'aide de la méthodepersist
:
>>> agents.persist()
DataFrame[country_name: string, id: bigint, latitude: double, longitude: double, sex: string]
Et les RDD ne sont jamais loin puisque le RDD associé à un DataFrame est accessible via la propriétérdd
:
>>> agents.rdd.filter(lambda row: row.country_name == "France").count()
94
Il est donc très aisé de convertir un DataFrame en RDD. Par ailleurs, unSparkSession
peut convertir un RDD contenant des objets de typeRow
en DataFrame :
>>> from pyspark.sql import Row
>>> rdd = sc.parallelize([Row(name="Alice"), Row(name="Bob")])
>>> spark.createDataFrame(rdd)
DataFrame[name: string]
Pourquoi ce long laïus préambulatoire sur les DataFrames ? Et bien tout d'abord, les DataFrames permettent de réaliser des requêtes sur des données structurées. Ceci vous permet de mettre à disposition de vos équipes de business intelligence ou de data scientists une interface dotée d'un langage de requêtes dont ils sont déjà familiers. Ensuite, les DataFrames sont fortement utilisées dans Spark pour faire du machine learning, comme on va le voir maintenant.
Spark ML
Savez-vous ce qu'est le machine learning ? On est censé parler d'"apprentissage automatique" en français mais cette appellation n'a jamais été très populaire... Le machine learning est la science que pratiquent les data scientists : d'un point de vue très formel, le machine learning permet de réaliser des prédictions sur des données à partir d'autres données, ce qui constitue, vous en conviendrez, une définition assez aride. Pour être plus concret, voyons quelques exemples de problèmes de machine learning :
Estimer la fréquentation d'un lieu touristique le mois prochain en fonction de sa fréquentation des dix dernières années.
Décrire le contenu d'une image à partir des valeurs des pixels de l'image et du contenu d'un million d'autres images.
Détecter les groupes d'utilisateurs les plus soudés d'un réseau social.
Ces exemples sont des problèmes typiques auxquels sont confrontés les data scientists. Pour en savoir plus, je vous conseille de consulter l'excellent cours Initiez-vous au machine learning. Prenez votre temps je ne bouge pas d'ici :)
Vous n'avez pas forcément besoin de devenir un data scientist aguerri pour réaliser des calculs distribués avec Spark. Cependant, si vous êtes amenés à travailler en tant que data architect (ce qu'on vous souhaite !) vous aurez à collaborer avec des data scientists et à comprendre les enjeux du machine learning. En particulier, les deux problèmes les plus fréquents rencontrés par les data scientists concernent :
La rapidité d'exécution de leurs calculs.
Le volume de données qu'ils peuvent utiliser pour réaliser l'apprentissage de leurs algorithmes.
En effet, la réalisation de prédictions sur des données nécessite (presque toujours) de calculer les paramètres d'un modèle. Par exemple, si l'on cherche à créer un modèle de prédiction de la météo, il faut mesurer l'impact quantitatif d'un vent du nord sur l'hygrométrie. Pour distinguer des images de chatons d'images de chiots, il faut déterminer quels sont les attributs visuels qui différencient les deux espèces. Et pour apprendre ces modèles il faut des données, beaucoup de données. Plus il y a de paramètres dans le modèle, plus il faut de données pour réaliser son apprentissage. Et les data scientists ont tendance à créer des modèles complexes nécessitant beaucoup de paramètres. Ils ne font pas ça exprès pour vous embêter, c'est juste que la réalité c'est complexe et difficile à modéliser.
Par ailleurs, même si un modèle nécessite relativement peu de paramètres, il sera d'autant plus réaliste que beaucoup de données auront été utilisées pour le créer. Par exemple, pour ajuster précisément les paramètres de votre modèle météorologique, vous avez besoin d'un historique le plus ancien possible des relevés de température et d'hygrométrie.
Bref, l'augmentation du nombre de paramètres et la nécessité d'ajuster avec précision les paramètres d'un modèles sont deux phénomènes qui concourent à utiliser le plus de données possible pour réaliser l'apprentissage d'un modèle. Le problème, pour les data scientists, c'est que ces besoins en données excèdent rapidement les ressources en calcul et en mémoire d'une unique machine. Et votre rôle, en tant que data architect, est de permettre malgré tout la réalisation de ces calculs complexes... Vous l'aurez deviné, en mettant à disposition une infrastructure de calcul distribuée !
Mais trève de bavardages, rentrons dans le vif du sujet avec l'apprentissage d'un modèle de classification.
Classification automatique de textes
Nous allons créer un modèle de classification de textes capable de prédire quel est le sujet d'une conversation. Enfin, pas n'importe quelle conversation, restons simples. Nous allons utiliser un jeu de données public contenant des conversations ayant eu lieu dans des newsgroups Usenet (à chanter sur l'air de "Je vous parle d'un temps que les moins de vingt ans ne peuvent pas connaaaaaaaaîtreuuuuux").
La documentation relative au jeu de données est donnée ici. Chacun des 18821 messages appartient à un newsgroup, parmi 20 possibles. Chaque newsgroup parle d'un sujet différent : il y a des sujets légers comme les voitures, le baseball, ou le hockey, ou beaucoup plus sérieux, comme le Moyen Orient, la religion ou Windows.
Vous êtes prêts ? C'est parti. Commençons, par télécharger les données d'apprentissage et de test :
$ wget http://ana.cachopo.org/datasets-for-single-label-text-categorization/20ng-train-all-terms.txt
$ wget http://ana.cachopo.org/datasets-for-single-label-text-categorization/20ng-test-all-terms.txt
La différence entre les données d'apprentissage et de test, c'est que les premières peuvent être utilisées pour l'apprentissage du modèle, tandis que les secondes ne peuvent servir que pour son évaluation. S'il s'agissait d'un modèle météo, on aurait dans les données d'apprentissage toutes les données météo du passé, tandis que les données de test concerneraient la journée en cours.
Nous allons charger ces données sous forme de RDD et les convertir en DataFrames, qui est le format recommandé lors de l'utilisation de Spark ML :
from pyspark.sql import Row
def load_dataframe(path):
rdd = sc.textFile(path)\
.map(lambda line: line.split())\
.map(lambda words: Row(label=words[0], words=words[1:]))
return spark.createDataFrame(rdd)
train_data = load_dataframe("20ng-train-all-terms.txt")
test_data = load_dataframe("20ng-test-all-terms.txt")
Dans le jeu de données téléchargé, chaque message est un ensemble de mots séparés par des espaces sur une seule ligne. La ponctuation a déjà été supprimée et tous les mots sont en minuscule. Le premier mot indique le newsgroup auquel appartient le message. On dit que c'est le label du message (le but étant, rappelons-le, de prédire les labels des messages de test).
Nous allons commencer par créer une représentation de nos messages sous la forme d'un sac de mots (bag of words). Un sac de mots est une représentation d'un texte sous la forme d'un histogramme indiquant le nombre d'occurences de chaque mot. On appelle ça un "sac" parce que l'ordre des mots est perdu dans cette représentation. C'est en fait la même chose qu'un nuage de mots. A titre d'exemple, voilà le nuage de mots de l'Iliade :
Réalisé avec WordItOut
Dans un sac de mots, un texte est représenté par un tableau d'entiers de longueur égale à celle du dictionnaire, donc potentiellement de très grande taille. Par exemple, admettons que la langue anglaise contienne 100000 mots. Si je cherche à représenter sous la forme d'un sac de mots anglais la phrase "hello world" il me faudra stocker un vecteur de taille 100000 ne contenant que des zéros, sauf à deux emplacements correspondant à "hello" et "world", où on stockera des 1.
Avec Spark ML, cette opération se fait à l'aide d'un CountVectorizer
:
from pyspark.ml.feature import CountVectorizer
vectorizer = CountVectorizer(inputCol="words", outputCol="bag_of_words")
Cet objetvectorizer
est unEstimator
: lorsque l'on appelle sa méthodefit
sur unDataFrame
il produit unTransformer
:
vectorizer_transformer = vectorizer.fit(train_data)
Le rôle d'unTransformer
est... de transformer les données :
train_bag_of_words = vectorizer_transformer.transform(train_data)
test_bag_of_words = vectorizer_transformer.transform(test_data)
Dans le cas présent,vectorizer_transformer
transforme unDataFrame
contenant une colonnewords
en un autreDataFrame
contenant une colonnebag_of_words
. Comme vous l'avez peut-être deviné, lorsqu'il est appliqué à unDataFrame
la méthodetransform
est évaluée de manière paresseuse, comme pour les transformations réalisées sur les RDD.
Nous avons maintenant dans la colonnebag_of_words
des DataFrames d'apprentissage et de test une représentation sous forme de vecteur d'entiers de nos textes. Il nous reste à apprendre un classifieur. Nous allons utiliser le classifieur NaiveBayes
qui a l'avantage d'être multi-classes : ça signifie que ce modèle peut être utilisé pour différencier plus de deux classes différentes, comme c'est le cas ici. Pour d'autres modèles, il n'est possible de réaliser leur apprentissage que sur des jeux de données contenant deux classes.
Cependant, pour réaliser l'apprentissage du classifieurNaiveBayes
, nous avons besoin de labels sous la forme de nombres flottants, et non de chaînes de caractères comme c'est le cas pour l'instant. En effet, la colonnelabel
contient pour l'instant les noms des newsgroups auxquels appartiennent les messages :
>>> train_data.select("label").distinct().sort("label").show(truncate=False)
+------------------------+
|label |
+------------------------+
|alt.atheism |
|comp.graphics |
|comp.os.ms-windows.misc |
|comp.sys.ibm.pc.hardware|
|comp.sys.mac.hardware |
|comp.windows.x |
|misc.forsale |
|rec.autos |
|rec.motorcycles |
|rec.sport.baseball |
|rec.sport.hockey |
|sci.crypt |
|sci.electronics |
|sci.med |
|sci.space |
|soc.religion.christian |
|talk.politics.guns |
|talk.politics.mideast |
|talk.politics.misc |
|talk.religion.misc |
+------------------------+
On pourrait réaliser une associationlabel --> label_index
à la main en récupérant la liste des labels, mais Spark ML fournit unEstimator
qui fait précisément cela :
from pyspark.ml.feature import StringIndexer
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
label_indexer_transformer = label_indexer.fit(train_bag_of_words)
train_bag_of_words = label_indexer_transformer.transform(train_bag_of_words)
test_bag_of_words = label_indexer_transformer.transform(test_bag_of_words)
Nous pouvons maintenant réaliser l'apprentissage de notre modèle de classification :
from pyspark.ml.classification import NaiveBayes
classifier = NaiveBayes(
labelCol="label_index", featuresCol="bag_of_words", predictionCol="label_index_predicted"
)
classifier_transformer = classifier.fit(train_bag_of_words)
Appliquons ce modèle à nos données de test :
test_predicted = classifier_transformer.transform(test_bag_of_words)
Examinons le résultat :
>>> test_predicted.select("label_index", "label_index_predicted").limit(10).show()
+-----------+---------------------+
|label_index|label_index_predicted|
+-----------+---------------------+
| 17.0| 17.0|
| 17.0| 17.0|
| 17.0| 17.0|
| 17.0| 17.0|
| 17.0| 17.0|
| 17.0| 17.0|
| 17.0| 19.0|
| 17.0| 17.0|
| 17.0| 17.0|
| 17.0| 17.0|
+-----------+---------------------+
C'est pas si mal ! Sur les dix premiers messages notre classifieur ne s'est trompé qu'une fois.
La précision de notre modèle peut être évaluée facilement à l'aide de la classe MulticlassClassificationEvaluator
:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="label_index_predicted", metricName="accuracy")
accuracy = evaluator.evaluate(test_predicted)
print("Accuracy = {:.2f}".format(accuracy))
On obtient une précision de 0.80, ce qui signifie que 80% de nos données sont correctement classifiées. Pas mal ! D'après la page à partir de laquelle nous avons téléchargé les données, la meilleure précision obtenue est de 0.8284.
Récapitulons le code que nous avons rédigé sous la forme d'une application Spark :
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession.builder.getOrCreate()
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, StringIndexer
from pyspark.sql import Row
def load_dataframe(path):
rdd = sc.textFile(path)\
.map(lambda line: line.split())\
.map(lambda words: Row(label=words[0], words=words[1:]))
return spark.createDataFrame(rdd)
# Load train and test data
train_data = load_dataframe("20ng-train-all-terms.txt")
test_data = load_dataframe("20ng-test-all-terms.txt")
# Learn the vocabulary of our training data
vectorizer = CountVectorizer(inputCol="words", outputCol="bag_of_words")
vectorizer_transformer = vectorizer.fit(train_data)
# Create bags of words for train and test data
train_bag_of_words = vectorizer_transformer.transform(train_data)
test_bag_of_words = vectorizer_transformer.transform(test_data)
# Convert string labels to floats
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
label_indexer_transformer = label_indexer.fit(train_bag_of_words)
train_bag_of_words = label_indexer_transformer.transform(train_bag_of_words)
test_bag_of_words = label_indexer_transformer.transform(test_bag_of_words)
# Learn multiclass classifier on training data
classifier = NaiveBayes(
labelCol="label_index", featuresCol="bag_of_words", predictionCol="label_index_predicted"
)
classifier_transformer = classifier.fit(train_bag_of_words)
# Predict labels on test data
test_predicted = classifier_transformer.transform(test_bag_of_words)
# Classifier evaluation
evaluator = MulticlassClassificationEvaluator(
labelCol="label_index", predictionCol="label_index_predicted", metricName="accuracy"
)
accuracy = evaluator.evaluate(test_predicted)
print("Accuracy = {:.2f}".format(accuracy))
Comme vous l'avez peut-être remarqué, cette application est très verbeuse. A quatre reprises nous créons unEstimator
sur lequel nous appelons la méthodefit
qui retourne unTransformer
, sur lequel nous appliquons à son tour la méthodetransform
. Cette succession d'opérations est classique en machine learning. Pour simplifier le code des applications, Spark ML propose de formaliser cet enchaînement sous la forme d'unPipeline
.
Simplifiez-vous la vie avec les Pipelines
Plutôt que d'instancier chaqueEstimator
, d'appeler la méthodefit
puistransform
séparément, nous allons instancier un Pipeline
dont le rôle va être d'appliquer une série d'Estimator à nos données. UnPipeline
s'instancie de la manière suivante :
from pyspark.ml import Pipeline
pipeline = Pipeline([estimator1, estimator2, transformerr1, estimator3, ...])
Lorsqu'on appelle la méthodefit
sur unPipeline
, les Estimators sont changés en Transformers, et les Transformers ne changent pas.
Bref, sans plus tarder, voyons comment unPipeline
permet de simplifier notre code :
vectorizer = CountVectorizer(inputCol="words", outputCol="bag_of_words")
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
classifier = NaiveBayes(
labelCol="label_index", featuresCol="bag_of_words", predictionCol="label_index_predicted",
)
pipeline = Pipeline(stages=[vectorizer, label_indexer, classifier])
pipeline_model = pipeline.fit(train_data)
test_predicted = pipeline_model.transform(test_data)
Plus simple, n'est-ce pas ? L'intérêt de cette formalisation est qu'elle nous permet d'ajouter facilement des étapes au Pipeline. L'application réécrite à l'aide de pipelines peut être consultée (et améliorée !) sur le dépôt de code github consacré au cours. A titre d'exercice, ajoutez une pondération TF-IDF à notre représentation de sac de mots. Il se pourrait bien que vous battiez le record de précision pour notre jeu de données...
Un dernier mot sur les RDD...
À l'heure où nous parlons, l'API de Spark ML est en cours de réécriture. L'API à base de DataFrames que nous avons vue est la version moderne, tandis qu'il existe une version plus ancienne qui opère, elle, sur des RDD. Comme indiqué dans la documentation de Spark l'API à base de RDD est vouée à disparaître dans Spark 3.0, et aucune nouvelle fonctionnalité n'y sera ajoutée d'ici là. Cependant il existe encore des fonctionnalités qui ne sont pas présentes dans l'API à base de DataFrames. Par exemple, la réduction de dimension par PCA (Principal Component Analysis) et la classification par SVM (Support Vector Machine), pour ne citer qu'eux, opèrent encore sur des RDD.
Les fonctionnalités à base de RDD sont disponibles dans le modulepyspark.mllib
et non danspyspark.ml
comme on l'a vu jusqu'à présent. Ceci peut vous aider à savoir sur quel type de données opèrent les fonctions que vous utilisez.
Nous n'allons pas couvrir l'API de Spark MLlib dans ce cours, même si nous vous encourageons à consulter sa documentation. Toutes les notions que nous avons vues ensemble dans les chapitres précédents sur les RDD de ce cours restent valables dans Spark MLlib. Vous devez seulement savoir que les classifieurs de Spark MLlib opèrent sur des RDD contenant des objets de type LabeledPoint
.
UnLabeledPoint
est un objet qui contient un label, sous la forme d'un flottant, et un vecteur d'attributs. Pour reprendre l'exemple précédent sur la classification de textes, voici comment convertir un DataFrame contenant unlabel_index
et unbag_of_words
en un RDD de LabeledPoints :
from pyspark.mllib.classification import LabeledPoint
dataframe.rdd.map(lambda row: LabeledPoint(row.label_index, row.bag_of_words.toArray()))
Avec un label et des données, unLabeledPoint
contient tout ce qui est nécessaire à un classifieur pour réaliser son apprentissage.