Interogation
La littérature (exemple ici) nous indique que les DataFrame Spark sont bien plus rapide que les RDD.
Intuitivement une surcouche (DataFrame est une surcouche de RDD) est moins rapide que son entité mère.
DataFrame utilise l’optimiseur Catalyze afin d’optimiser, compiler, ré-organiser… les actions du DataFrame et peux ainsi améliorer les performance par rapport à RDD.
Cette optimisation est-elle toujours viable ?
Je me suis posé la question sur des domaines non adaptés à la structure des DataFrame.
Le DataFrame est une matrice : structure très adapté à la description de mesure multi-variables (et donc aux traitement de statistique et de machine learning)
Que se passe-t-il sur des traitements non orienté matrice ?
Domaine de test
Je me suis donc penché sur une source non structurée : un ensemble de simple fichier texte (une dizaine de romans bien épais dans le style « guerre et paix ») pour valider (ou non) l’optimisation de DataFrame par rapport à RDD.
Intuitivement RDD semble plus proche (car orienté données plus brute que DataFrame) de ce type de donné et plus simple à manipuler : on peut abuser des fonctions lambda avec RDD tandis que DataFrame pousse à utiliser les méthodes de son API.
Jeux de test
Il suffit de télécharger un gros livre, roman, encyclopédie… en format texte (UTF8) sur Internet : il en existe des centaines en domaine public.
Source
Préparation : import, context, variables d’environnement..
import os, time os.environ['SPARK_HOME'] = '/home/lde/spark-2.0.0-bin-hadoop2.7' os.environ['PYSPARK_PYTHON']='python3' from pyspark import SparkContext from pyspark.sql import SQLContext from pyspark.sql.functions import length from pyspark.sql.functions import split, explode sc = SparkContext("local", "Simple App") sqlContext = SQLContext(sc) dataFile = "./data_book/*.txt"
RDD
textFile = sc.textFile(dataFile) start=time.time() rd1=textFile.flatMap(lambda x: x.split(" ")) \ .map(lambda word : (word.lower(),1)) \ .reduceByKey(lambda a,b : a+b) \ .filter(lambda v : v[1]>50) \ .filter(lambda v : len(v[0])>5) \ .coun() stop=time.time() print("RDD : ",int(1000*(stop-start)))
DataFrame
df=sqlContext.read.text(dataFile) start=time.time() df1=(df .select(explode(split(df.value," ")).alias("word")) .groupBy("word") .count() .withColumnRenamed("count","wordCount") .filter(length("word")>5) ).cache() df2=(df1 .filter(df1.wordCount>50) .orderBy(df1.word.desc()) .count() )
Le source du DataFrame contient la création de 2 Dataframe alors que la création du RDD n'en demande qu'un.
Je rédigerai un article sur ce point ainsi que sur les problème de nommage de colonnes générées qui sont, à mon sens, assez maladroit et facteur de bug.
Bench
On lance (boucle) 10 itérations de test. Les temps sont exprimés en millisecondes.
| 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | RDD | 10320 | 7320 | 7184 | 7387 | 7509 | 7146 | 7186 | 6836 | 7533 | 7496 | DataFrame | 17737 | 9671 | 7982 | 7767 | 7585 | 7667 | 7185 | 7002 | 7092 | 6693 |
Remarques :
- La première itération est bien plus longe à traiter que les suivante
- Lié à l'environnement de test ?
- Source éxécuté via l'EDI Spyder
- Problématique sachant qu'en BigData le but est de lancer 1 fois un (gros) processus et non de l'itérer (hormis Spark Streaming mais je ne vais pas parler d'un domaine que je ne maîtrise pas encore)
- Lié à l'environnement de test ?
- Le DataFrame donne des résultats moins performants que le RDD (pour ce cas précis)
Conclusion
- Le DataFrame n'est peut être pas adapté à tous les type de données, en particulier les données non structurées.
- Je me suis peut être fourvoyé dans mon bench : en effet je suis un "junior" en Spark
- Réflexion :
- Tester un modèle hydride
- RDD pour l'ETL des données non structurées, puis DataFrame pour la partie traitement métier.
- Tester un modèle hydride
N'hésitez pas à commenter, proposer des alternatives à ce code 🙂