Spark DataFrame vs RDD : petit bench appliqué à la recherche d’occurrences de mots.

Interogation

La littérature (exemple ici) nous indique que les DataFrame Spark sont bien plus rapide que les RDD.

Intuitivement une surcouche (DataFrame est une surcouche de RDD) est moins rapide que son entité mère.

DataFrame utilise l’optimiseur Catalyze afin d’optimiser, compiler, ré-organiser… les actions du DataFrame et peux ainsi améliorer les performance par rapport à RDD.

Cette optimisation est-elle toujours viable ?

Je me suis posé la question sur des domaines non adaptés à la structure des DataFrame.

Le DataFrame est une matrice : structure très adapté à la description de mesure multi-variables (et donc aux traitement de statistique et de machine learning)

Que se passe-t-il sur des traitements non orienté matrice ?

Domaine de test

Je me suis donc penché sur une source non structurée : un ensemble de simple fichier texte (une dizaine de romans bien épais dans le style « guerre et paix ») pour valider (ou non) l’optimisation de DataFrame par rapport à RDD.

Intuitivement RDD semble plus proche (car orienté données plus brute que DataFrame) de ce type de donné et plus simple à manipuler : on peut abuser des fonctions lambda avec RDD tandis que DataFrame pousse à utiliser les méthodes de son API.

Jeux de test

Il suffit de télécharger un gros livre, roman, encyclopédie… en format texte (UTF8) sur Internet : il en existe des centaines en domaine public.

Source

Préparation : import, context, variables d’environnement..


import os, time


os.environ['SPARK_HOME'] = '/home/lde/spark-2.0.0-bin-hadoop2.7'
os.environ['PYSPARK_PYTHON']='python3'


from pyspark import SparkContext
from pyspark.sql import SQLContext
from  pyspark.sql.functions import length
from pyspark.sql.functions import split, explode


sc = SparkContext("local", "Simple App")
sqlContext = SQLContext(sc)
dataFile = "./data_book/*.txt"

RDD


textFile = sc.textFile(dataFile)
start=time.time()
rd1=textFile.flatMap(lambda x: x.split(" ")) \
          .map(lambda word : (word.lower(),1)) \
          .reduceByKey(lambda a,b : a+b) \
          .filter(lambda v : v[1]>50) \
          .filter(lambda v : len(v[0])>5) \
          .coun()
stop=time.time()
print("RDD : ",int(1000*(stop-start)))

DataFrame


df=sqlContext.read.text(dataFile) 
start=time.time()
df1=(df
      .select(explode(split(df.value," ")).alias("word"))
      .groupBy("word")
      .count()    
      .withColumnRenamed("count","wordCount")
      .filter(length("word")>5)
    ).cache()
df2=(df1
      .filter(df1.wordCount>50)
      .orderBy(df1.word.desc())
      .count()
)

Le source du DataFrame contient la création de 2 Dataframe alors que la création du RDD n'en demande qu'un.

Je rédigerai un article sur ce point ainsi que sur les problème de nommage de colonnes générées qui sont, à mon sens, assez maladroit et facteur de bug.

Bench

On lance (boucle) 10 itérations de test. Les temps sont exprimés en millisecondes.

          |  1    |  2   |  3   |  4   |  5   |  6   |  7   |  8   |  9   |  10  | 
RDD       | 10320 | 7320 | 7184 | 7387 | 7509 | 7146 | 7186 | 6836 | 7533 | 7496 |
DataFrame | 17737 | 9671 | 7982 | 7767 | 7585 | 7667 | 7185 | 7002 | 7092 | 6693 |

Remarques :

  • La première itération est bien plus longe à traiter que les suivante
    • Lié à l'environnement de test ?
      • Source éxécuté via l'EDI Spyder
    • Problématique sachant qu'en BigData le but est de lancer 1 fois un (gros) processus et non de l'itérer (hormis Spark Streaming mais je ne vais pas parler d'un domaine que je ne maîtrise pas encore)
  • Le DataFrame donne des résultats moins performants que le RDD (pour ce cas précis)

Conclusion

  • Le DataFrame n'est peut être pas adapté à tous les type de données, en particulier les données non structurées.
  • Je me suis peut être fourvoyé dans mon bench : en effet je suis un "junior" en Spark
  • Réflexion :
    • Tester un modèle hydride
      • RDD pour l'ETL des données non structurées, puis DataFrame pour la partie traitement métier.

N'hésitez pas à commenter, proposer des alternatives à ce code 🙂

Biographe :