Big Data Warsaw
TRANSCRIPT
Stream Processing with Apache Flink
Maximilian Michels, Flink PMC member, [email protected], @stadtlegende
Apache Flink
▪ A distributed open-source data analysis framework
▪ True streaming at its core
▪ Streaming & Batch API
[Diagram: Flink processes both historic data (HDFS, JDBC, ...) and event logs (Kafka, RabbitMQ, ...) for ETL, graph processing, machine learning, and relational workloads, with low latency, windowing, and aggregations.]
Flink Community
▪ Top 5 Apache Big Data project in the Apache Software Foundation
▪ 500+ messages/month on the mailing list
▪ 8400+ commits
▪ 1500+ pull requests merged
▪ 950+ stars
▪ 510+ forks
Use Case: Log File Analysis
▪ Load log files from a distributed file system
▪ Process them, sessionize according to the user id
▪ Write a view to the database or dump the data for further processing
• Process • Analyze • Aggregate (see the sketch below)
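A minimal DataSet sketch of this kind of log job, not from the talk: it assumes a hypothetical log format where the user id is the first whitespace-separated field and hypothetical HDFS paths, and it does a simple per-user aggregation rather than full sessionization.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// load log files from a distributed file system (hypothetical path)
DataSet<String> lines = env.readTextFile("hdfs:///logs/access.log");

// parse each line and count events per user id
DataSet<Tuple2<String, Integer>> eventsPerUser = lines
    .map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String line) {
            String userId = line.split("\\s+")[0]; // assumed: user id is the first field
            return new Tuple2<>(userId, 1);
        }
    })
    .groupBy(0)
    .sum(1);

// dump the aggregated view for further processing (hypothetical path)
eventsPerUser.writeAsCsv("hdfs:///output/events-per-user");
env.execute("Log file analysis");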
Use Case: Tweet Impressions
Continuous Stream of Tweets (each with a timestamp)
▪ How do we measure the importance of Tweets?
  • Total number of views
  • Views within a time period
▪ We need to process and aggregate Tweets!
[Diagram: Max, Marie, Jonas, and Tim are tweeting.]
Use Case: Tweet Impressions
[Diagram: impression events from Max, Marie, Jonas, and Tim are aggregated into impression counts for the last minute, last hour, and last day, then written to an output.]
More at: http://data-artisans.com/extending-the-yahoo-streaming-benchmark/
Why Stream Processing?
▪ Most problems have a streaming nature
▪ Stream processing gives lower latency
▪ Data volumes are more easily tamed
▪ More predictable resource consumption
[Diagram: an event stream split into batch processing (solved) and event-based processing.]
Challenges in Streaming
▪ Latency
▪ Throughput
▪ Fault-Tolerance
▪ Correctness
  • Elements may be out-of-order
  • Elements may be processed more than once
Windows
▪ A grouping of records according to time, count, or session (see the sketch below), e.g.
  • Count: the last 100 records
  • Session: all records for user X
  • Time: all records of the last 2 minutes
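A rough sketch of how these window types look in the DataStream API (not from the talk), assuming the Tweet stream used later in the deck, keyed by user id:

// assuming a DataStream<Tweet> named tweets
KeyedStream<Tweet, String> keyed = tweets.keyBy(tweet -> tweet.userId);

// Count window: the last 100 records per key
keyed.countWindow(100).sum("impressions");

// Time window: all records of the last 2 minutes
keyed.timeWindow(Time.minutes(2)).sum("impressions");

// Session windows group records by gaps of inactivity per key
// (session window assigners became available in later Flink releases)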
Event Time
▪ Processing time: when data is processed
▪ Ingestion time: when data is loaded
▪ Event time: when data is generated
▪ Almost always, the three are different
▪ Event time helps to process out-of-order elements or to replay elements as they occurred (see the snippet below)
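Which notion of time the windows use is configured on the environment; a minimal sketch (not from the slides):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// use the timestamp embedded in each event instead of the machine clock
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);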
Event Time & Watermarks
▪ Elements arrive: how do we know what time it is?
▪ Processing time: take the hardware clock
▪ Event time: watermarks
▪ Watermarks are timestamps
▪ No elements later than the timestamp are expected to arrive (see the sketch below)
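A sketch of assigning timestamps and watermarks to the Tweet stream used later in the deck, assuming Flink's BoundedOutOfOrdernessTimestampExtractor (available in later 1.x releases) and a hypothetical timestamp field on Tweet:

DataStream<Tweet> withTimestamps = tweets.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Tweet>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(Tweet tweet) {
            // hypothetical field: when the tweet was created (event time)
            return tweet.timestamp;
        }
    });
// The emitted watermark trails the largest timestamp seen so far by 10 seconds;
// elements older than the watermark are considered late.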
From Program to Execution

case class Path(from: Long, to: Long)

val tc = edges.iterate(10) { paths: DataSet[Path] =>
  val next = paths
    .join(edges).where("to").equalTo("from") { (path, edge) => Path(path.from, edge.to) }
    .union(paths)
    .distinct()
  next
}
[Diagram: the program is translated into a dataflow graph during pre-flight on the client (type extraction stack, cost-based optimizer), scheduled by the master (task scheduling, recovery metadata, deploying operators, tracking intermediate results), and executed on the workers (memory manager, out-of-core algorithms, batch & streaming, state & checkpoints). The example plan joins orders.tbl and lineitem.tbl with a hash-partitioned hybrid hash join (build/probe) followed by a sorted GroupReduce.]
Flink Applications
▪ Streaming topologies
▪ Heavy batch jobs
▪ Machine learning at scale
▪ Graph processing at scale
E.g.: Non-Native Iterations
[Diagram: the client drives the loop, issuing one MapReduce job per step (Step, Step, Step, ...).]

for (int i = 0; i < maxIterations; i++) {
    // Execute MapReduce job
}
Iterative Processing in Flink
▪ Built-in iterations and delta iterations
▪ Executes machine learning and graph algorithms efficiently (see the sketch below)
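A minimal sketch of the built-in bulk iteration in the Java DataSet API (a toy example, not from the talk): the iteration body runs entirely on the cluster and is fed back into itself for a fixed number of rounds, with no per-round client round-trips.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// start the iteration with an initial data set and at most 10 rounds
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10);

// iteration body: executed once per round
DataSet<Integer> body = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) {
        return value + 1;
    }
});

// close the loop: the body's result becomes the next round's input
DataSet<Integer> result = initial.closeWith(body);
result.print();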
E.g.: Non-Native Streaming
[Diagram: the stream is discretized into small batches, issuing one job per batch (Job, Job, Job, ...).]

while (true) {
    // get next few records
    // issue batch job
}
Pipelining
Basic building block to "keep data moving"
• Low latency
• Operators push data forward
• Data shipping as buffers, not tuple-wise
• Natural handling of back-pressure
Flink Engine
1. Execute everything as streams
2. Iterative (cyclic) dataflows
3. Mutable state in operators
4. Operate on managed memory
5. Special code paths for batch
6. HA mode – no single point of failure
7. Checkpointing of operator state (see the snippet below)
State + Computation
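Checkpointing is switched on per job; a minimal sketch, assuming the env created in the API Structure slide later in the deck:

// draw a consistent snapshot of all operator state every 5 seconds
env.enableCheckpointing(5000);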
Flink Eco System
Gel
ly
Tabl
e
ML
SAM
OA
DataSet (Java/Scala/Python) DataStream
Had
oop
M/R
Local Cluster Yarn
Dat
aflo
w
Dat
aflo
w
MRQ
L
Tabl
e
Casc
adin
g
Streaming dataflow runtime
Stor
m
Zepp
elin
Flink Eco System
Gel
ly
Tabl
e
ML
SAM
OA
DataSet (Java/Scala/Python) DataStream
Had
oop
M/R
Local Cluster Yarn
Dat
aflo
w
Dat
aflo
w
MRQ
L
Tabl
e
Casc
adin
g
Streaming dataflow runtime
Stor
m
Zepp
elin
HDFS
HBase
Kafka
RabbitMQ
Flume
HCatalog
JDBC
API Structure

// Create Environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Add Source
DataStream<Type> source = env.addSource(…);

// Perform transformations
DataStream<Type2> trans = source
    .keyBy("field")
    .map(…)
    .timeWindow(...);

// Add Sink
trans.addSink(…);

// Execute!
env.execute();
Hourly Impressions

// read from Kafka TweetImpressions topic
DataStream<Tweet> tweets = env.addSource(new FlinkKafkaConsumer<>(...));

// sum impressions per tweet over one-hour windows
DataStream<Tweet> summaryStream = tweets
    .filter(tweet -> tweet.tweetId != null)
    .keyBy(tweet -> tweet.tweetId)
    .window(TumblingTimeWindows.of(Time.hours(1)))
    .sum("impressions");

// output to Kafka
summaryStream.addSink(new FlinkKafkaProducer<Tweet>(...));

class Tweet {
    String tweetId;
    String userId;
    String text;
    long impressions;
}
Up-to-date Daily Impressions

// read from Kafka TweetImpressions topic
DataStream<Tweet> tweets = env.addSource(new FlinkKafkaConsumer<>(...));

// sum impressions per tweet over a sliding one-day window, updated every minute
DataStream<Tweet> summaryStream = tweets
    .filter(tweet -> tweet.tweetId != null)
    .keyBy(tweet -> tweet.tweetId)
    .window(SlidingTimeWindows.of(Time.days(1), Time.minutes(1)))
    .sum("impressions");

// output to database or Kafka
summaryStream.addSink(new FlinkKafkaProducer<Tweet>(...));

class Tweet {
    String tweetId;
    String userId;
    String text;
    long impressions;
}
Hourly Impression Summary

DataStream<Summary> summaryStream = tweets
    .keyBy(tweet -> tweet.tweetId)
    .window(TumblingTimeWindows.of(Time.hours(1)))
    .apply(new WindowFunction<Tweet, Summary, String, TimeWindow>() {
        @Override
        public void apply(String tweetId, TimeWindow window,
                          Iterable<Tweet> impressions, Collector<Summary> out) {
            long count = 0;
            Tweet tweet = null;
            for (Tweet val : impressions) {
                tweet = val;
                count++;
            }
            // output summary
            out.collect(new Summary(tweet, count, window.getStart(), window.getEnd()));
        }
    });

class Tweet {
    String tweetId;
    String userId;
    String text;
}

class Summary {
    Tweet tweet;
    long impressions;
    long beginTime;
    long endTime;
}
Apache Flink
▪ A powerful framework with a stream processor at its core
▪ Features
  • True streaming with great batch support
  • Easy-to-use APIs, library ecosystem
  • Fault-tolerant and consistent
  • Low latency, high throughput
  • Growing community

I ♥ Flink, do you?
▪ More information on flink.apache.org
▪ Flink Training at data-artisans.com
▪ Subscribe to the mailing lists
▪ Follow @ApacheFlink
▪ Next: 1.0.0 release
▪ Soon: Stream SQL, Mesos, Dynamic scaling