setting up a big data platform at kelkoo
TRANSCRIPT
Setting up a Big Data Platform at Kelkoo
Fabrice dos Santos, 1st of Sep. 2015
Kelkoo DataPlatform / Big Data?
• “Football is a simple game. Twenty-two men chase a ball for 90 minutes and at the end, the Germans always win” (Gary Lineker)
• And why do they win?
– Because they use big data! The German team partnered with German software giant SAP AG to create a custom match analysis tool that collects and analyzes massive amounts of player performance data.
• Big data is a broad term for data sets so large or complex that traditional data processing applications are inadequate. Challenges include analysis, capture, data curation, search, sharing, storage, transfer, visualization, and information privacy.
• http://blogs.wsj.com/cio/2014/07/10/germanys-12th-man-at-the-world-cup-big-data/
• https://www.youtube.com/watch?v=JX5NLUViIMc
• http://www.lesechos.fr/idees-debats/cercle/cercle-111048-le-monde-nouveau-du-big-data-1047390.php
Kelkoo DataPlatform transitioning ::: AGENDA & GOAL
• Flume: data collection and aggregation
• HDFS: distributed storage
– Name node / Datanodes
– HDFS inputs: logs
– HDFS outputs: reports
• Spark on Yarn: distributed processing
– ResourceManager / NodeManagers
– Spark applications
• Hive / SparkSQL: query data, read and analyse
• GOAL
– Give you the core concepts of the Hadoop platform @ Kelkoo
– Understand the dataflow
– Start getting used to the vocabulary
1/ Kelkoo DataPlatform :: Flume
• Flume agent: routes the data
• HDFS: stores the data
– Name node / Datanodes
– HDFS inputs: logs
– HDFS outputs: reports
• Spark on Yarn: data analysis and processing
– ResourceManager / NodeManagers
• Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.
Flume :: Core concepts
[Diagram: an external client (ECS, KLS, etc.) pushes events to the FLUME AGENT (kelkoo_a1); inside the agent, a Source (rImp, rLead) receives the events and hands them to a Channel (cImp, cLead); a Sink (sImp, sLead) polls the Channel, forwards the events, and writes them to HDFS.]
• Event: the unit of data transported by Flume: [ Header (timestamp, hostname …) | Body (data) ]
• Client (ECS, KLS): the point of origin of events; delivers them to a Flume agent
• Flume agent (kelkooFlume_a?), a JVM process:
– Source: consumes events and hands them over to the channel
– Channel: buffers incoming events until a sink drains them for further transport => reliability
– Sink: removes events from a channel and transmits them to the next agent (an HDFS sink here)
• The channel periodically writes a backup checkpoint to disk (/opt/kookel/data/kelkooFlume) => recoverability
• HDFS storage: the terminal repository
Flume @ Kelkoo
• Source type
– We use Avro, a data serialization format: a compact and fast binary data format
– Other types of sources: memory, exec (tail -f /opt/… )
• Flume has a transactional design: each event is treated as a transaction
– Events are removed from a channel only after they are stored in the channel of the next agent or in the terminal repository; the channel thus keeps a queue of current events until the storage confirmation is received
• Distributed and scalable system: 4 agents at Kelkoo, installed on 2 servers, to spread the load
• Channel type: file
– The File Channel is Flume’s persistent channel.
– It writes all events to disk: no data loss on process or machine shutdown or crash.
– The File Channel ensures that events committed into the channel are removed only when a sink takes them and commits the transaction, even if the machine or agent crashed and was restarted.
– Designed to be highly concurrent and to handle several sources and sinks at the same time.
Flume monitoring
• Each agent exposes a JSON metrics servlet
– http://hadoop-server:34545/metrics
– Returns a JSON output easily manageable for monitoring purposes, using a simple shell script with the jq extension
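The servlet output lends itself to a small automated check. A minimal sketch, assuming the servlet returns Flume’s standard JSON metrics format; the channel name (cImp), sample values and the 80% threshold are illustrative, and an inline sample payload stands in for the curl call shown in the comment:

```shell
#!/bin/sh
# In production the JSON would come from the agent's metrics servlet, e.g.:
#   metrics=$(curl -s http://hadoop-server:34545/metrics)
# Here a sample payload in Flume's standard JSON metrics format stands in.
metrics='{"CHANNEL.cImp":{"ChannelFillPercentage":"73.5","ChannelSize":"735000","Type":"CHANNEL"}}'

# Extract ChannelFillPercentage for the channel (sed used instead of jq for portability).
fill=$(printf '%s' "$metrics" | sed -n 's/.*"ChannelFillPercentage":"\([0-9.]*\)".*/\1/p')

# Alert when the file channel fills faster than the sink drains it.
threshold=80
if [ "$(printf '%s\n' "$fill $threshold" | awk '{print ($1 > $2)}')" = "1" ]; then
  echo "CRITICAL: channel cImp at ${fill}%"
else
  echo "OK: channel cImp at ${fill}%"
fi
```

The same extraction is a one-liner with jq (`jq '."CHANNEL.cImp".ChannelFillPercentage'`) when it is installed on the monitoring host.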
2/ Kelkoo DataPlatform :: HDFS distributed storage
::: HDFS definition
• HDFS (HADOOP DISTRIBUTED FILESYSTEM) is a highly scalable, distributed file system, meant to store large amounts of data
• Based on GoogleFS
• Appears as a single disk: it abstracts the physical architecture, so we can manipulate files as if we were on a single disk
HDFS Daemons Overview
• 2 types of processes:
– Namenode (must always be running), e.g. dc1-kdp-prod-hadoop-00, with a Standby NameNode on dc1-kdp-prod-hadoop-01:
• Stores the metadata about files and blocks on the filesystem, and manages namespaces
– Maps a file name to a set of blocks
– Maps a block to a set of Datanodes
• Redirects clients for reads/writes to the appropriate Datanode
– Datanodes (e.g. dc1-kdp-prod-hadoop-06 … dc1-kdp-prod-hadoop-n), running on several machines:
• Store the data in the local filesystem (ext4 at Kelkoo)
• Periodically report to the Namenode the list of blocks they host, and send heartbeats to the Namenode
• Serve data and metadata to clients
HDFS files and blocks example
• A file “toto.txt” is managed by the Namenode (dc1-kdp-prod-hadoop-00) and stored by the Datanodes
– The file is split into blocks: Block #1 + Block #2
– When a file is read, the client asks the Namenode which Datanodes host the blocks, then reads the data from them
– Blocks are replicated (default is 3): the SAME BLOCK lives on multiple machines, which ensures robustness and availability
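The block/replica arithmetic can be sketched in a few lines; the 300 MB file is illustrative, and the 128 MB block size and replication factor of 3 are common Hadoop 2 defaults (both are configurable):

```shell
#!/bin/sh
# Back-of-the-envelope view of HDFS block layout (numbers illustrative).
file_mb=300      # size of the file being stored
block_mb=128     # dfs.blocksize, Hadoop 2 default
replication=3    # dfs.replication, default

# Number of blocks = ceiling(file size / block size)
blocks=$(( (file_mb + block_mb - 1) / block_mb ))

# Raw bytes stored across the cluster = file size x replication factor
raw_mb=$(( file_mb * replication ))

echo "blocks=$blocks raw_storage=${raw_mb}MB"
```

So a 300 MB file occupies 3 blocks, and the cluster holds 900 MB of raw data for it: losing any one Datanode never loses a block.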
HDFS High Availability with Hadoop 2
[Diagram: a Zookeeper service (3 Zookeeper instances) monitors both Namenodes through a ZKFC on each; the Active NameNode (dc1-kdp-prod-hadoop-00) writes edits through its QJM to three JournalNodes (shared edits), and the Standby NameNode (dc1-kdp-prod-hadoop-01) reads them back.]
• Namenodes: one active and one standby Namenode; the standby takes over if the active Namenode goes down (avoids a SPOF)
• Zookeeper: high availability of the processes
– Zookeeper server: keeps a copy of the state of the entire system and persists this information in local log files
– ZooKeeper Failover Controller (ZKFC): monitors a NameNode and triggers a failover when the Active NameNode is unavailable
• Quorum Journal Manager (QJM) & JournalNodes: high availability of the data
– Instead of storing HDFS edit logs in a single location (NFS), store them in several remote locations => the JournalNodes
– The Active Namenode writes edits to the JournalNodes
– The QJM (a feature of the Namenode) ensures that we “reach the quorum”, i.e. that the journal log is written to a majority of the JournalNodes
– The Standby Namenode reads the edits
• On the server, the configuration is written in /etc/hadoop/hdfs-site.xml
HDFS User Interface
• Interacting with HDFS using filesystem shell commands (as kookel)
– All commands are in the Hadoop doc
– hdfs dfs -<command> <options>
– hdfs dfs -du -s -h /user/kookel/logs/flume/
• Commands for HDFS administration
– hdfs dfsadmin -report -live | grep --color dc1-kdp-prod-hadoop-10.prod.dc1.kelkoo.net -A1
Name: 10.76.99.60:50010 (dc1-kdp-prod-hadoop-10.prod.dc1.kelkoo.net)
Hostname: dc1-kdp-prod-hadoop-10.prod.dc1.kelkoo.net
Decommission Status : Normal
– hdfs dfsadmin -getDatanodeInfo dc1-kdp-prod-hadoop-06.prod.dc1.kelkoo.net:50020
• Command for High Availability admin
– hdfs haadmin -failover nn1 nn2
• Web interface:
– http://dc1-kdp-prod-hadoop-00.prod.dc1.kelkoo.net:50070/dfsclusterhealth.html
– http://dc1-kdp-prod-hadoop-00.prod.dc1.kelkoo.net:50070/dfshealth.html
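The dfsadmin report above can feed a liveness check. A minimal sketch, where an inline sample (in the same format as the report output shown above; the second hostname is invented for illustration) stands in for the real `hdfs dfsadmin -report -live` call:

```shell
#!/bin/sh
# On the cluster the report would come from:
#   report=$(hdfs dfsadmin -report -live)
# Sample report output standing in (second hostname is illustrative).
report='Name: 10.76.99.60:50010 (dc1-kdp-prod-hadoop-10.prod.dc1.kelkoo.net)
Hostname: dc1-kdp-prod-hadoop-10.prod.dc1.kelkoo.net
Decommission Status : Normal
Name: 10.76.99.61:50010 (dc1-kdp-prod-hadoop-11.prod.dc1.kelkoo.net)
Hostname: dc1-kdp-prod-hadoop-11.prod.dc1.kelkoo.net
Decommission Status : Normal'

# Count reporting datanodes and flag any being decommissioned.
live=$(printf '%s\n' "$report" | grep -c '^Hostname:')
decom=$(printf '%s\n' "$report" | grep -c 'Decommission Status : Decommissioned')

echo "live_datanodes=$live decommissioned=$decom"
```

A Nagios-style wrapper would then compare `live_datanodes` against the expected cluster size and alert on any drop.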
3/ Spark on Yarn
• HDFS = distributed storage
• Spark = distributed processing
• Apache Spark is an open-source data analytics cluster computing framework. Spark fits into the Hadoop open-source community, building on top of the Hadoop Distributed File System
Hadoop Yarn Cluster :: overview
• YARN? Yet Another Resource Negotiator
• 2 types of processes:
– ResourceManager (dc1-kdp-prod-hadoop-02):
• Arbitrates the available cluster resources
• Helps manage the distributed applications running on the YARN system
• Orchestrates the division of resources (compute, memory, bandwidth, etc.) among the underlying NodeManagers
– NodeManagers (e.g. dc1-kdp-prod-hadoop-06 … NodeManager n):
• Take instructions from the ResourceManager
• Monitor container resource usage (CPU, memory, disk)
• Report resource status to the ResourceManager/Scheduler
Spark on Yarn :: spark application lifecycle
[Diagram: the CLIENT asks the Yarn Resource Manager (dc1-kdp-prod-hadoop-02) to start the Spark Application Master inside a YARN container; the Application Master then requests further YARN containers, each running a Spark Executor that executes Spark Tasks.]
Spark on Yarn ::: spark application lifecycle
• Key concepts:
– Application: may be a single job or a sequence of jobs; KDP Spark applications are mainly launched via Azkaban
• sparkAppRunner is a component that allows running a sparkApp on Yarn at Kelkoo
– Application Master:
• One per application; negotiates resources with YARN
• Runs inside a container
• Requests more hosts/containers to run the Spark application tasks
– Container @kelkoo => /d0/yarn/local/nm-local-dir/usercache/kookel/appcache/application_1441098196522_0213/container_1441098196522_0213_02_000006
– Spark Executor: a single JVM instance on a node that serves a single Spark application
– Spark Task: a unit of work on a partition of a distributed dataset
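As an illustration of what this lifecycle boils down to at submit time, here is a hypothetical YARN-mode launch. The class name, jar path and sizing flags are invented for illustration (at Kelkoo the launch actually goes through Azkaban/sparkAppRunner), and the sketch only prints the command instead of submitting it:

```shell
#!/bin/sh
# Hypothetical spark-submit for a YARN-cluster launch (Spark 1.x syntax).
# Class, jar path and sizing values are invented; dry run only.
cmd="spark-submit \
  --master yarn-cluster \
  --num-executors 4 \
  --executor-memory 2g \
  --class com.example.ReportJob \
  /opt/kookel/apps/report-job.jar"

# Dry run: print the command instead of submitting to the cluster.
echo "$cmd"
```

With `--master yarn-cluster`, the ResourceManager starts the Application Master in a container, and the 4 executors each become a Spark Executor JVM in its own YARN container.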
Yarn Cluster focus
• The Yarn Resource Manager manages applications
• Yarn commands (installed on all servers running in the Yarn cluster)
– Useful to monitor applications from the Resource Manager; commands are invoked by the bin/yarn script
• yarn application -status application_1428487296152_99148
• yarn application -kill application_1428487296152_99148
• Yarn REST API (more or less like the yarn script, but more complete)
– XML output: curl --compressed -H "Accept: application/xml" -X GET http://hadoop-server:8088/ws/v1/cluster/apps/application_1428487296152_99610
– JSON output: curl --compressed -H "Accept: application/json" -X GET http://hadoop-server:8088/ws/v1/cluster/apps/application_1428487296152_99610
– Cluster metrics: http://dc1-kdp-prod-hadoop-02.prod.dc1.kelkoo.net:8088/ws/v1/cluster/metrics
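The JSON output lends itself to automated checks on application state. A minimal sketch, with an inline sample payload (trimmed to the fields of interest, following the ResourceManager “app” response format) standing in for the curl call:

```shell
#!/bin/sh
# On the cluster the JSON would come from:
#   json=$(curl --compressed -s -H "Accept: application/json" \
#     http://hadoop-server:8088/ws/v1/cluster/apps/application_1428487296152_99610)
# Trimmed sample payload standing in for the real response.
json='{"app":{"id":"application_1428487296152_99610","state":"FINISHED","finalStatus":"SUCCEEDED"}}'

# Extract finalStatus (sed used instead of jq for portability).
status=$(printf '%s' "$json" | sed -n 's/.*"finalStatus":"\([A-Z]*\)".*/\1/p')

if [ "$status" = "SUCCEEDED" ]; then
  echo "application succeeded"
else
  echo "application needs attention: $status"
fi
```

The same pattern covers the “monitor failed spark applications” item below: alert whenever `finalStatus` is `FAILED` or `KILLED`.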
HDFS & YARN :: don’t mix things up
• Masters:
– YARN: ResourceManager => resource allocation for applications in containers
– HDFS: NameNode => handles metadata, maps files to blocks and blocks to Datanodes
• Workers:
– YARN: NodeManagers => manage containers
– HDFS: Datanodes => store the blocks
• On-disk layout:
– YARN container application data: /d0/yarn/local, /d1/yarn/local
– HDFS data directories: /d0, /d1
Kelkoo DataPlatform :: Accessing data
• Hive: data warehouse infrastructure
– Kelkoo => hiveMetastoreSchema
– A system for managing and querying structured data, built on top of Hadoop
– Provides a simple query language called Hive QL, which is based on SQL
– Hive holds persistent data
• The Hive service sits between the Hive metastore DB and the HDFS files (e.g. consistency_key_metrics)
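To make the “based on SQL” point concrete, here is a hypothetical Hive QL query; the table (lead_logs) and its columns are invented for illustration, and the sketch only writes the query to a file rather than running it against the metastore:

```shell
#!/bin/sh
# Write a hypothetical Hive QL query to a file (table/columns are illustrative).
cat > /tmp/daily_clicks.hql <<'EOF'
-- Hive QL: SQL-like syntax over files stored in HDFS
SELECT merchant_id, COUNT(*) AS clicks
FROM lead_logs
WHERE dt = '2015-09-01'
GROUP BY merchant_id;
EOF

# On the cluster this would run as:  hive -f /tmp/daily_clicks.hql
echo "query written to /tmp/daily_clicks.hql"
```

Hive translates such a query into distributed jobs over the underlying HDFS files, using the metastore to map the table name to its files and schema.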
Monitoring
• Checking DataPlatform input and output
– INPUT: missing logs
– OUTPUT: missing or failed reports
• Monitoring HDFS:
– Check all Datanodes are live (HDFS) and HA is running fine (Zookeeper etc.)
– Monitor capacity
• Flume monitoring
– Check Flume is up
– Monitor the Flume channel on Grafana
• Yarn
– Monitor failed Spark applications
– Check all NodeManagers are live (YARN)
– Monitor allocated resources in containers
• Azkaban
– Monitor failed processes
• Monitoring tools: Nagios, Grafana
Wrap up: what you must remember
• Why big data: to analyse large amounts of data
• FLUME:
– Aggregates and streams data into HDFS
– Transactional mode: client -> agent (source, channel, sink) -> HDFS storage
– Recoverability, reliability and scalability
• HDFS cluster, a high performance distributed filesystem
– NameNode (master), Datanodes (data)
– HDFS High Availability with Zookeeper and JournalNodes
– HDFS files and blocks; blocks are replicated (3 by default)
• Spark on Yarn
– Yarn ResourceManager (master), NodeManagers (workers)
– Yarn is used to run Spark applications in distributed mode
• Hive metastore
– Turns HDFS files into structured data
Further reading
• Sources:
– The “Bible”: http://hadoop.apache.org/docs/current/
– http://fr.slideshare.net/martyhall/hadoop-tutorial-hdfs-part-1-overview
– https://www.facebook.com/BigDataHyderabad
– HA:
• http://blog.cloudera.com/blog/2012/10/quorum-based-journaling-in-cdh4-1/
• http://hortonworks.com/blog/namenode-high-availability-in-hdp-2-0/
• http://www-01.ibm.com/support/knowledgecenter/SSPT3X_3.0.0/com.ibm.swg.im.infosphere.biginsights.install.doc/doc/bi_install_large_cluster_qjm.html
– Spark & YARN:
• http://fr.slideshare.net/AdamKawa/apache-hadoop-yarn-simply-explained
• http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/
• http://hortonworks.com/blog/apache-hadoop-yarn-concepts-and-applications/
• http://hortonworks.com/blog/apache-hadoop-yarn-resourcemanager/
• https://www.youtube.com/watch?v=nu16zSDw0BA
– HIVE: http://www.slideshare.net/athusoo/hive-apachecon-2008-presentation/
– FLUME:
• http://blog.cloudera.com/blog/2011/12/apache-flume-architecture-of-flume-ng-2/
• http://www.ibm.com/developerworks/library/bd-flumews/
• http://www.slideshare.net/getindata/apache-flume-37675297
• Filechannel data: http://grokbase.com/t/flume/user/128qcby5nx/filechannel-data-directory-usage
– Code: https://www.codatlas.com/github.com/apache