spark - alexis seigneurin (français)
TRANSCRIPT
Alexis Seigneurin@aseigneurin @ippontech
Spark
● Traitement de larges volumes de données● Traitement distribué (commodity hardware)● Ecrit en Scala, bindings Java et Python
Histoire
● 2009 : AMPLab de l'Université de Berkeley● Juin 2013 : "Top-level project" de la
fondation Apache● Mai 2014 : version 1.0.0● Actuellement : version 1.2.0
Use cases
● Analyse de logs● Traitement de fichiers texte● Analytics● Recherche distribuée (Google, avant)● Détection de fraude● Recommendation (articles, produits...)
Proximité avec Hadoop
● Mêmes use cases● Même modèle de
développement : MapReduce
● Intégration dans l'écosystème
Plus simple qu’Hadoop
● API plus simple à prendre en main● Modèle MapReduce "relâché"● Spark Shell : traitement interactif
Plus rapide qu’Hadoop
Spark officially sets a new record in large-scale sorting (5 novembre 2014)
● Tri de 100 To de données● Hadoop MR : 72 minutes
○ Avec 2100 noeuds (50400 cores)
● Spark : 23 minutes○ Avec 206 noeuds (6592 cores)
● Resilient Distributed Dataset● Abstraction, collection traitée en parallèle● Tolérant à la panne● Manipulation de tuples :
○ Clé - Valeur○ Tuples indépendants les uns des autres
RDD
Sources
● Fichier sur HDFS● Fichier local● Collection en mémoire● Amazon S3● Base NoSQL● ...● Ou une implémentation custom de
InputFormat
Transformations
● Manipule un RDD, retourne un autre RDD● Lazy !● Exemples :
○ map() : une valeur → une valeur○ mapToPair() : une valeur → un tuple○ filter() : filtre les valeurs/tuples○ groupByKey() : regroupe la valeurs par clés○ reduceByKey() : aggrège les valeurs par clés○ join(), cogroup()... : jointure entre deux RDD
Actions finales
● Ne retournent pas un RDD● Exemples :
○ count() : compte les valeurs/tuples○ saveAsHadoopFile() : sauve les résultats au
format Hadoop○ foreach() : exécute une fonction sur chaque
valeur/tuple○ collect() : récupère les valeurs dans une liste
(List<T>)
● Arbres de Paris : fichier CSV en Open Data● Comptage d’arbres par espèce
Spark - Exemple
geom_x_y;circonfere;adresse;hauteurenm;espece;varieteouc;dateplanta48.8648454814, 2.3094155344;140.0;COURS ALBERT 1ER;10.0;Aesculus hippocastanum;;48.8782668139, 2.29806967519;100.0;PLACE DES TERNES;15.0;Tilia platyphyllos;;48.889306184, 2.30400164126;38.0;BOULEVARD MALESHERBES;0.0;Platanus x hispanica;;48.8599934405, 2.29504883623;65.0;QUAI BRANLY;10.0;Paulownia tomentosa;;1996-02-29...
Spark - ExempleJavaSparkContext sc = new JavaSparkContext("local", "arbres");
sc.textFile("data/arbresalignementparis2010.csv") .filter(line -> !line.startsWith("geom")) .map(line -> line.split(";")) .mapToPair(fields -> new Tuple2<String, Integer>(fields[4], 1)) .reduceByKey((x, y) -> x + y) .sortByKey() .foreach(t -> System.out.println(t._1 + " : " + t._2));
[... ; … ; …]
[... ; … ; …]
[... ; … ; …]
[... ; … ; …]
[... ; … ; …]
[... ; … ; …]
u
m
k
m
a
a
textFile mapToPairmap
reduceByKey
foreach
1
1
1
1
1
u
m
k
1
2
1
2a
...
...
...
...
filter
...
...
sortByKey
a
m
2
1
2
1u
...
...
...
...
...
...
geom;...
1 k
Spark - ExempleAcacia dealbata : 2
Acer acerifolius : 39
Acer buergerianum : 14
Acer campestre : 452
...
Topologie & Terminologie
● Un master / des workers○ (+ un master en standby)
● On soumet une application● Exécution pilotée par un driver
Spark en cluster
Plusieurs options
● YARN● Mesos● Standalone
○ Workers démarrés individuellement○ Workers démarrés par le master
MapReduce● Spark (API)● Traitement distribué● Tolérant à la panne
Stockage● HDFS, base NoSQL...● Stockage distribué● Tolérant à la panne
Stockage & traitements
Colocation données & traitement
● “Data locality”● Traiter la donnée là où elle se trouve● Eviter les network I/Os
Colocation données & traitement
Spark Worker
HDFS Datanode
Spark Worker
HDFS Datanode
Spark Worker
HDFS Datanode
Spark Master
HDFS Namenode
HDFS Namenode (Standby)
SparkMaster
(Standby)
Démo$ $SPARK_HOME/sbin/start-master.sh
$ $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://MBP-de-Alexis:7077 --cores 2 --memory 2G
$ mvn clean package$ $SPARK_HOME/bin/spark-submit --master spark://MBP-de-Alexis:7077 --class com.seigneurin.spark.WikipediaMapReduceByKey --deploy-mode cluster target/pres-spark-0.0.1-SNAPSHOT.jar
● Exploitation d’un RDD en SQL● Moteur d’exécution SQL : convertit les
requêtes en instructions de base
Spark SQL
Spark SQL
Préalable :
● Disposer de données tabulaires● Décrire le schéma → SchemaRDD
Description de schéma :
● Description programmatique des données● Inférence de schéma par réflexion (POJO)
JavaRDD<Row> rdd = trees.map(fields -> Row.create( Float.parseFloat(fields[3]), fields[4]));
● Création de données tabulaires (type Row)
Spark SQL - Exemple
---------------------------------------
| 10.0 | Aesculus hippocastanum |
| 15.0 | Tilia platyphyllos |
| 0.0 | Platanus x hispanica |
| 10.0 | Paulownia tomentosa |
| ... | ... |
Spark SQL - Exemple
List<StructField> fields = new ArrayList<StructField>();fields.add(DataType.createStructField("hauteurenm", DataType.FloatType, false));fields.add(DataType.createStructField("espece", DataType.StringType, false));
StructType schema = DataType.createStructType(fields);
JavaSchemaRDD schemaRDD = sqlContext.applySchema(rdd, schema);schemaRDD.registerTempTable("tree");
---------------------------------------
| hauteurenm | espece |
---------------------------------------
| 10.0 | Aesculus hippocastanum |
| 15.0 | Tilia platyphyllos |
| 0.0 | Platanus x hispanica |
| 10.0 | Paulownia tomentosa |
| ... | ... |
● Description du schéma
● Comptage d’arbres par espèce
Spark SQL - Exemple
sqlContext.sql("SELECT espece, COUNT(*) FROM tree WHERE espece <> '' GROUP BY espece ORDER BY espece") .foreach(row -> System.out.println(row.getString(0)+" : "+row.getLong(1)));
Acacia dealbata : 2
Acer acerifolius : 39
Acer buergerianum : 14
Acer campestre : 452
...
Window operations
● Fenêtre glissante● Réutilise des données d'autres fenêtres● Initialisé avec window length et slide interval
Sources
● Socket● Kafka● Flume● HDFS● MQ (ZeroMQ...)● Twitter● ...● Ou une implémentation custom de Receiver
Démo de Spark Streaming
● Consommation de Tweets #Android○ Twitter4J
● Détection de la langue du Tweet○ Language Detection
● Indexation dans Elasticsearch● Analyse dans Kibana 4
$ curl -X DELETE localhost:9200$ curl -X PUT localhost:9200/spark/_mapping/tweets '{ "tweets": { "properties": { "user": {"type": "string","index": "not_analyzed"}, "text": {"type": "string"}, "createdAt": {"type": "date","format": "date_time"}, "language": {"type": "string","index": "not_analyzed"} } }}'
● Lancer ElasticSearch
Démo
● Lancer Kibana -> http://localhost:5601● Lancer le traitement
@aseigneurin
aseigneurin.github.io
@ippontech
blog.ippon.fr