Découvrir le Framework Apache Spark

La notion d’exécution distribuée a le « vent en poupe » ces dernières années, avec l’avènement de Hadoop et de son éco système. Apache Hadoop est composé de plusieurs briques, permettant de paralléliser des traitements lourds, de requêter des données volumineuses, de distribuer le stockage et plus encore.

Tous les projets n’ont pas forcément besoin de monter tout cet écosystème, pour distribuer l’exécution de leur programme. Dans ce sens là, Spark peut être perçu comme un concurrent au mammouth qu’est Hadoop.

Spark se veut être un moteur rapide, distribué, et facilement scalable, pour le traitement de gros volume de données. Spark est hébergé par la fondation Apache, comme Hadoop, il est d’ailleurs intégré dans certaines distributions d’Hadoop, afin de remplacer la brique Map Reduce.

Introduction

Le but de cet article, est de vous initier, et vous donner envie de découvrir ou d’approfondir l’outil Spark. Au même titre qu’Hadoop, c’est un outil puissant, mais à utiliser à bon escient, car certains traitements ne se prêtent pas à la distribution.

Premier exemple

Pour ce premier exemple, je me suis allégrement basé sur un exemple fourni sur le site officiel de Spark (https://spark.apache.org/examples.html).

L’exemple « word count » me semblait être un parfait candidat pour se rendre compte de l’intérêt d’un tel outil dans nos architectures.

En effet, même si un tel traitement n’a fondamentalement que peu d’utilité, il nécessite une gestion de la distribution robuste, afin de permettre de distribuer le travail, et d’agréger les résultats.

Le principe de l’exemple est de parcourir un fichier volumineux, constitué d’une succession de mots, et d’afficher au final le nombre d’itérations de chacun des mots. Lâchez-vous sur la taille du fichier, tout l’intérêt de la distribution est dans le traitement de fichiers volumineux !

JavaRDD<String> file = spark.textFile(« hdfs://… »);
JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String s) { return Arrays.asList(s.split( » « )); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
counts.saveAsTextFile(« hdfs://… »);

Bon, à l’heure du java 8 et des lambda expressions je me suis permis de reprendre ce code pour le simplifier,  et afficher le résultat sur la sortie standard :

sparkContext.textFile(« file://myFile.txt »)
      .flatMap(
            s -> Arrays.asList(s.split( » « )))
      .mapToPair(
            s -> new Tuple2<>(s, 1))
      .reduceByKey((a, b) -> a + b)

      .foreach(t -> System.out.println(t._1() +  »  » + t._2()));

Le principe du code est assez simple, et le nom des méthodes plutôt parlant, je ne me lancerais donc pas dans une longue explication du code. Bon allez, ok, puisque vous insistez :

  • On split la liste à partir du caractère espace;
  • On forme des « tuples » constitués pour la clé d’un mot et pour la valeur d’un compteur pour son nombre d’apparition (1 initialement);
  • On réduit le résultat de la map en remplaçant la valeur du Tuple par l’addition du nombre d’apparition d’un même mot;
  • Pour chaque tuple restant, après le passage de la méthode « reduceByKey », on affiche le mot et le nombre d’apparitions.

Pour les aficionados de Java 8, cette écriture vous rappelle quelque chose je suppose…

Quelques remarques

Vous avez vu apparaître la notion de RDD (de JavaRDD dans mon exemple pour être précis). Cet objet fait partie du cœur de Spark, il signifie « Resilient Distributed DataSet ». Les données contenues dans cet objet sont distribuées à travers le cluster Spark, et non sur une unique machine, mais on y reviendra plus tard …

Spark propose 2 méthodes pour la phase de Reduce :

  • Reduce : réduit 2 à 2 les éléments indépendamment de leur type, en leur appliquant une fonction;
  • ReduceByKey : ne travaille que sur des JavaPairRDD (une liste de tuples clé/valeur), et regroupe les valeurs ayant la même clé en leur appliquant une fonction.

Pour exécuter le code ci-dessus sur votre petite machine, il faudra créer au préalable un SparkContext de cette manière-là :

SparkConf conf = new SparkConf(true)
            .setMaster(« local »)
            .setAppName(« spongebob »);
JavaSparkContext sparkContext = new JavaSparkContext(conf);

Pour faire simple, vous venez de créer un contexte Spark, qui s’exécutera sur une unique machine, et sur un seul thread (mettre local[n] avec le nombre de thread souhaité).

Java 8 fait ça très bien

Alors oui, en Java 8, on peut faire aussi bien, sinon même plus clair, en utilisant les Stream et les Collectors  (2 nouveautés ultra pratiques de Java 8):

File file = new File(« myFile.txt »);

Files.lines(Paths.get(file.toURI()), Charset.defaultCharset())
      .parallel()
      .flatMap(line -> Arrays.stream(line.split( » « )))
      .collect(Collectors.groupingBy(String::toString, Collectors.counting()))
      .forEach((s, a) -> System.out.println(s +  »  » + a));

On remarquera, au passage, l’utilisation de « parallel() » permettant d’optimiser l’exécution du code en le rendant  multi thread.

En l’état, sans mise en place d’un cluster Spark, le code natif de java 8 écrase, en terme de performance, l’exécution avec Spark. Si on distribue, ça va être une toute autre histoire…

Mise en cluster

Le gros, pour ne pas dire le seul, intérêt d’un outil comme Spark, c’est la possibilité de distribuer l’exécution d’une tache :

  • Plusieurs machines vont travailler en combinant leur puissance de calcul, pour résoudre votre problème le plus rapidement possible.

Il faut donc créer un cluster, composé d’un master ou Cluster Manager, ainsi que de Workers (les petites mains qui vont réaliser le travail).

Ci-dessous, un schéma tiré du site officiel de Spark :

cluster-overview

Le Driver Program est l’application qui crée l’objet Spark Context. Le jar de cette application (uber Jar) sera envoyé dans le cluster pour que les « workers » puissent travailler.

Pour information, il y a 3 moyens de gérer son Cluster acceptés par Spark :

  • Spark : cluster manager intrinsèque à Spark s’appuyant sur Akka;
  • Mesos : un gestionnaire de cluster de la fondation Apache;
  • Yarn : gestionnaire de cluster Hadoop.

Pour en revenir à notre petit code, pour l’exécuter sur un mode cluster, il vous faudra remplacer le mot clé local, dans la création de votre contexte Spark, par le nom de votre master et son port d’écoute (si des informations sur le contexte Spark sont définies en paramètre du spark-submit, elles surchargent celles définies dans le code directement).

Voici les quelques commandes nécessaires pour le démarrage d’un « master », l’inscription d’un « worker », et finalement le lancement de notre code sur le cluster nouvellement créé.

  • spark-class org.apache.spark.deploy.master.Master
  • spark-class org.apache.spark.deploy.worker.Worker spark://ITEM-81859.emea.msad.sopra:7077
  • spark-submit –master spark://ITEM-81859.emea.msad.sopra:7077 –class fr.perso.Launcher –executor-memory 500M –total-executor-cores 4 D:/dev/java/workspaces/perso/spark/target/spark-1.0-SNAPSHOT.jar

Et en bonus, en vous connectant sur le port 8080 (port par défaut) sur votre master, vous aurez la surprise de découvrir une interface d’administration de votre cluster !

Maintenant, il vous suffit d’ajouter des workers, pour voir diminuer le temps de traitement de votre tâche.

Une petite remarque pour les plus sceptiques :

Si le fichier sur lequel vous comptez les mots est trop petit, le simple fait de distribuer son traitement pourra être plus long que le traitement lui-même sur votre machine en local. Dans ce cas, un outil de Map Reduce distribué ne vous apportera pas grand-chose !

Si vous tournez sous Windows…

Lors de vos développements, vous êtes peut être amené à travailler sous Windows. Lors de l’élaboration des exemples j’ai rencontré quelques problèmes lors de la mise en place du cluster et du test du code. Ces conseils ne couvrent que la partie «Stand Alone» de Spark, je n’ai pas déployé sur Windows un véritable cluster Spark.

Voici quelques pièges à éviter :

Lors du build des sources de spark, vous rencontrerez un problème avec le Scala Style :
« LDAModel.scala fails scalastyle »
Ci-joint le JIRA avec, en attendant la version 1.4, la résolution du problème : https://issues.apache.org/jira/browse/SPARK-6532

Si vous récupérez les exemples de code de ce blog ou sur internet et que vous les exécutez en l’état sous windows vous allez avoir l’erreur suivante :

ERROR Shell: Failed to locate the winutils binary in the hadoop binary path

En effet, le code de Spark référence un composant Hadoop, même si vous n’utilisez aucune fonctionnalité dans votre code.

Pour le résoudre, suivez la solution exposée ici : http://qnalist.com/questions/4994960/run-spark-unit-test-on-windows-7

Conclusion

Avec ce petit exemple, j’espère que vous avez pu apercevoir la puissance d’un tel outil. La documentation officielle est bien fournie, et la communauté des utilisateurs est très active. Cet outil, bien que s’adaptant parfaitement à un écosystème Hadoop, peut être aussi vu comme une alternative au rouleau compresseur qu’est Hadoop.

En espérant vous avoir donné envie de découvrir cet outil !

Bon code à tous…

Sources:

Learning Spark chez O’REILLY

https://spark.apache.org/

Le source des exemples ci-dessous :

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Created by fleroy on 08/04/2015.
*/
public class Launcher implements Serializable {

   public static void main(String[] args) throws IOException {

      System.setProperty(« hadoop.home.dir », « C:/ winutils »);

      SparkConf conf = new SparkConf(true)
            .setMaster(« local »)
            .setAppName(« spongebob »);
      JavaSparkContext sparkContext = new JavaSparkContext(conf);

      long startTimeSpark = System.currentTimeMillis();

      final JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = sparkContext.textFile(
            « file://myFile.txt »).flatMap(s -> Arrays.asList(s.split( » « ))).mapToPair(
            s -> new Tuple2<>(s, 1)).cache().reduceByKey((a, b) -> a + b);
      //.foreach(t -> System.out.println(t._1() +  »  » + t._2()));

stringIntegerJavaPairRDD.foreach(t -> System.out.println(t._1() +  »  » + t._2()));

      long stopTimeSpark = System.currentTimeMillis();
      long elapsedTimeSpark = stopTimeSpark – startTimeSpark;
      System.out.println(« en spark :  » + elapsedTimeSpark);

      // ####################### En pure Java 8 #######################

File file = new File(« myFile.txt »);

      long startTime = System.currentTimeMillis();

      Files.lines(Paths.get(file.toURI()), Charset.defaultCharset())
            .flatMap(line -> Arrays.stream(line.split( » « )))
            .collect(Collectors.groupingBy(String::toString, Collectors.counting()))
            .forEach((s, a) -> System.out.println(s +  »  » + a));

      long stopTime = System.currentTimeMillis();
      long elapsedTime = stopTime – startTime;
      System.out.println(« en full java 8 :  » + elapsedTime);

      startTime = System.currentTimeMillis();

      Files.lines(Paths.get(file.toURI()), Charset.defaultCharset())
            .parallel()
            .flatMap(line -> Arrays.stream(line.split( » « )))
            .collect(Collectors.groupingBy(String::toString, Collectors.counting()))
            .forEach((s, a) -> System.out.println(s +  »  » + a));

      stopTime = System.currentTimeMillis();
      elapsedTime = stopTime – startTime;
      System.out.println(« en full java 8 mode parallel :  » + elapsedTime);

   }

}

Leave a Reply

Your email address will not be published. Required fields are marked *