PySpark et Docker : premiers pas

Au cours de mon stage chez Zenika, j’ai pu découvrir de nombreuses nouvelles technologies. L’une d’entre elles fait beaucoup de bruit dans le domaine du Big Data et du Machine Learning. Je veux bien sur parler ici de Spark. Je vais vous présenter rapidement l’outil et vous proposer de lancer un traitement simple en Python à l’aide de PySpark.

Premiers pas avec PySpark sur Docker

Général

Spark est un projet qui a été initialisé par l’AMPLab de l’université de Berkeley. C’est un framework open sources et généraliste permettant le traitement d’importants volumes de données de manière rapide et distribuée. Semblable à Hadoop, il étend cependant ses possibilités de traitements. Si Hadoop se limite au couple de traitement Map/Reduce, Spark fournit davantage de fonctions de manipulation des données donnant des performances jusqu’à 100 fois supérieures. Il est adapté au Machine Learning grâce à ses capacités de traitements de données en mémoire et cela de manière distribuée sur plusieurs noeuds d’un cluster. Pour la gestion du stockage distribué et du clustering, il peut être utilisé seul ou avec d’autres outils comme Hadoop et son système de fichiers HDFS, Cassandra, Mesos ou Amazon S3.

Spark possède plusieurs composants ayant des objectifs différents. Spark Core est le coeur de l’outil. Il contient les RDD (Resilient Distributed Dataset) qui sont les objets de manipulation des données. SparkSQL est une surcouche de Spark Core permettant l’utilisation d’un langage de requétage sur des données structurée. Il ajoute également le langage SQL à travers une interface en ligne de commande et des serveurs ODBC et JDBC. Spark Streaming permet de gérer les données envoyées de manière continue ou sous forme de petits batchs, en temps réel avec les capacités du Spark Core. MLlib est une librairie de Machine Learning qui utilise les capacités de traitements distribués de Spark Core. D’après les développeurs de MLlib, cette librairie serait neuf fois plus rapide dans l’implémentation de l’algorithme ALS qu’Apache Mahout. Aujourd’hui, Spark propose des API pour les langages Scala (langage de développement), Java, Python et R depuis peu. Nous allons voir comment utiliser PySpark.

SparkCore et RDD

Spark Core repose principalement sur les objets de type Resilient Distributed Dataset. Ces objets sont des collections d’objets traitées en parallèle et résistantes aux erreurs. Les RDD possèdent deux types de méthodes : les transformations et les actions. Les transformations sont des méthodes qui vont appliquer une fonction sur chaque ligne d’un RDD pour en fournir un nouveau. Les actions vont appliqués les transformations et retourner les données résultantes. Les méthodes de type traitement sont dites lazy, elles ne sont pas exécutées au moment de l’appel de la méthode mais lorsqu’une action est appelée. Chacune des transformations est donc stockée temporairement jusqu’à ce qu’une méthode action impose leur application sur le jeu de données d’origine. Par défaut tous les RDD sont appliqués à chaque appel. Cependant il est possible d’utiliser la méthode cache qui conserve les données traitées du dernier RDD en mémoire pour les réutiliser plus rapidement plus tard. Il est préférable de le faire après un traitement lourd et lorsque l’on sait que les données vont être réutilisées.

Les différentes méthodes de la classe RDD sont présentes à cette adresse. Dans cette article, je vais utiliser quelques une de ces méthodes. Certaines autres sont également importantes à connaître, n’hésitez pas à aller prendre connaissance de leur existence

Utilisation du conteneur Docker

Je ne décrirai pas ici comment installer Spark sur une machine directement. Je pense qu’il peut être intéressant d’étudier Spark dans Docker car cela permettrait de facilement gérer l’augmentation de la charge. Le principe de cluster s’inscrit bien dans l’utilisation de Docker. Je vous invite donc à récuperer le conteneur présent sur DockerHub avec Spark et PySpark.

Afin de pouvoir réaliser nos traitements à l’intérieur d’un conteneur, nous allons le lancer en montant deux volumes, un pour les fichiers sources et l’autre pour les données. Assurez vous de bien diriger les volumes vers vos propres dossiers.

Jeu de données

Pour cet exemple nous allons travailler sur un jeu de données tiré des crimes perpétrés à Chicago entre 2013 et 2015. Ces données ne m’appartiennent pas et sont distribuées publiquement par la ville de Chicago. Ci-dessous, un exemple des premières lignes du jeu de données. A noter que la première ligne n’est présente qu’à titre indicatif et qu’elle n’est pas présente dans le fichier. Attention si vous utilisez un export du site, certaines colonnes contiennent des virgules qui est le séparateur par défaut dans l’export CSV.

Lien de téléchargement du fichier.

Création d’un script Python

Nous allons créer un script Python, CrimeTreatment.py. Celui-ci est présent en entier à la fin de cet article. Dans ce script, nous avons besoin de nous connecter au contexte de Spark qui nous donnera accès à toutes les méthodes de manipulation et nous liera au cluster Spark.

Une fois le contexte initialisé, nous allons pouvoir lire le fichier et charger nos données.

En réalité, comme nous l’avons vu plus tôt, au moment de l’exécution de cette ligne, les données ne sont pas encore montées en mémoire. Nous initialisons simplement un RDD qui sera la base de nos traitements futurs. C’est ce RDD qui va nous permettre de manipuler nos données. Lorsqu’une action sera exécutée, il chargera le fichier sur le cluster Spark et ainsi les traitements pourront être effectués sur les différents noeuds du cluster.

Traitement

Tout d’abord, nous allons séparer nos données en fonction du séparateur CSV qui ici est la virgule. La fonction map permet de retourner un nouveau RDD en appliquant une fonction sur chacune des lignes du RDD d’origine. Nous appliquons donc une fonction de séparation de chaînes de caractères.

 

Nombre de crimes par an

Notre premier exercice va être de rechercher le nombre de crimes commis chaque année. Le jeu de données que je fournis ne contient que trois années afin de ne pas être trop important. Dans un premier temps, récupérons l’année avec la méthode map sur la 6ème colonne du tableau (index 5) issu du split précédent. Nous allons dans cette même méthode lui associer le chiffre 1. La seconde partie du traitement va nous permettre de compter le nombre de fois où apparaît chaque clé. La clé est toujours le premier élément de la ligne traitée, cela peut être un objet ou une valeur primaire comme ici. Le premier élément est ici l’année. La fonction reduceByKey va réunir les valeurs associées à chaque clé en utilisant la fonction fournie. Dans notre cas, l’une des valeurs est un compteur qui sera additionné à la valeur « 1 » définie plus tôt. Ceci permet de tenir le compte du nombre de crimes. Ce fonctionnement est préférable à l’utilisation de la méthode groupBy qui va imposer l’échange de nombreuses données entre les noeuds du cluster.

La variable nbCrimesByYear est un RDD. Pour en voir le contenu, il faut utiliser une méthode action pour exécuter les traitements. La méthode collect permet de récupérer l’ensemble du RDD. Les résultats sont donc 296 459 crimes en 2013, 264 960 en 2014 et 119 963 en 2015.

 

Nombre de crimes incluant un véhicule

Nous allons maintenant parcourir notre dataset à la recherche des crimes incluant un véhicule. Pour cela, il nous faut filtrer la colonne « type primaire » d’index 1 et vérifier que le terme « VEHICLE » est présent avec la méthode filter. Une fois ce filtre appliqué, une méthode action count ne fournira notre compte. Nous avons 25806 crimes incluant un véhicule.

 

District le plus actif

Ici, le raisonement est le même que le précédent, nous allons d’abord compter les crimes par district puis trier. Pour le établir le compte, j’ai choisi d’extraire la lambda et d’en faire une fonction. Ceci permet de factoriser le code. Cette factorisation peut être étendue à l’ensemble de l’opération de mapping et de réduction. Une fois le compte établi, nous trions les résultats de manière décroissante avec la fonction sortBy. La méthode action sera cette fois-ci la méthode first qui nous permet d’obtenir uniquement la première ligne de résultats. Ici, le district 11 est celui qui as le plus de crimes référencés.

 

Exécution du Script

Afin d’exécuter notre script Python, nous allons appeler la commande spark-submit. Cette commande va communiquer les scripts à un noeud Spark qui va ensuite répartir les traitements sur le cluster. Cette commande possède plusieurs arguments mais nous n’utiliserons ici que l’argument –master qui permet de définir le noeud master. Avec la valeur local[4], on indique que l’on souhaite utiliser seulement le noeud local et que celui-ci doit utiliser 4 cores de notre machine. Ceci permet le lancement de tests en développement.

Script complet

Travailler avec Spark est vraiment très intéressant. Ses capacités sont vraiment poussée et permettent la réalisation de traitement sur une grande échelle. L’API Python n’est pas la plus aboutie mais permet d’avoir un bon aperçu des possibilités de l’outil.

spacer

Laisser un commentaire