![Page 1: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/1.jpg)
So you think you can Stream?
Vida Ha (@femineer)
Prakash Chockalingam (@prakash573)
![Page 2: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/2.jpg)
Who am I?
● Solutions Architect @ Databricks
● Formerly Square, Reporting & Analytics
● Formerly Google, Mobile Web Search & Speech Recognition
![Page 3: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/3.jpg)
Who am I?
● Solutions Architect @ Databricks
● Formerly Netflix, Personalization Infrastructure
![Page 4: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/4.jpg)
About Databricks
Founded by creators of Spark in 2013
Cloud enterprise data platform
- Managed Spark clusters
- Interactive data science
- Production pipelines
- Data governance, security, …
![Page 5: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/5.jpg)
Sign up for the Databricks Community Edition
Learn Apache Spark for free
● Mini 6GB cluster for learning Spark
● Interactive notebooks and dashboards
● Online learning resources
● Public environment to share your work
https://databricks.com/try-databricks
![Page 6: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/6.jpg)
Agenda
• Introduction to Spark Streaming
• Lifecycle of a Spark Streaming app
• Aggregations and best practices
• Operationalization tips
• Key benefits of Spark Streaming
![Page 7: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/7.jpg)
What is Spark Streaming?
![Page 8: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/8.jpg)
Spark Streaming
![Page 9: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/9.jpg)
![Page 10: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/10.jpg)
How does it work?
● Receivers receive data streams and chop them into batches.
● Spark processes the batches and pushes out the results.
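To make the "chop into batches" step concrete, here is a minimal plain-Scala sketch (no Spark involved). The `Event` type, the `toBatches` helper and the millisecond timestamps are assumptions made up for this illustration; Spark's receivers do this internally, and unlike this sketch Spark also produces a batch for intervals in which nothing arrived.

```scala
object MicroBatching {
  // A record tagged with its (non-negative) arrival time in milliseconds.
  // Hypothetical type, used only for this sketch.
  final case class Event(arrivalMs: Long, line: String)

  // Assign each event to batch index arrivalMs / batchIntervalMs and
  // return the non-empty batches in time order.
  def toBatches(events: Seq[Event], batchIntervalMs: Long): Seq[(Long, Seq[String])] =
    events
      .groupBy(e => e.arrivalMs / batchIntervalMs)
      .toSeq
      .sortBy(_._1)
      .map { case (batchIndex, es) => (batchIndex, es.sortBy(_.arrivalMs).map(_.line)) }
}
```

With a 1000 ms batch interval, events arriving at 100, 900, 1200 and 3100 ms land in batches 0, 0, 1 and 3.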
![Page 11: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/11.jpg)
Word Count
val context = new StreamingContext(conf, Seconds(1))  // entry point; Seconds(1) is the batch interval
val lines = context.socketTextStream(...)             // DStream: represents a data stream
![Page 12: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/12.jpg)
Word Count
val context = new StreamingContext(conf, Seconds(1))
val lines = context.socketTextStream(...)
val words = lines.flatMap(_.split(" "))  // transformations: transform data to create new DStreams
![Page 13: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/13.jpg)
Word Count
val context = new StreamingContext(conf, Seconds(1))
val lines = context.socketTextStream(...)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()  // print the DStream contents on screen
context.start()     // start the streaming job
![Page 14: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/14.jpg)
Lifecycle of a streaming app
![Page 15: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/15.jpg)
Execution in any Spark Application
● User code runs in the driver process (Spark Driver)
● Driver launches executors in the cluster (YARN / Mesos / Spark Standalone)
● Tasks are sent to executors for processing data
[Diagram: one Spark Driver and three Spark Executors inside the cluster]
![Page 16: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/16.jpg)
Execution in Spark Streaming: Receiving data
● Driver runs receivers as long-running tasks
● Receiver divides the data stream into blocks and keeps them in memory
● Blocks are also replicated to another executor
[Diagram: Driver and two Executors; one executor runs the Receiver, which is fed by the data stream; both executors hold Data Blocks]
![Page 17: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/17.jpg)
Execution in Spark Streaming: Processing data
● Every batch interval, the driver launches tasks to process the blocks
[Diagram: Driver and two Executors holding Data Blocks, one of them running the Receiver; results flow from each executor to a data store]
![Page 18: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/18.jpg)
End-to-end view
val t1 = ssc.socketStream("…")
val t2 = ssc.socketStream("…")
val t = t1.union(t2).map(…)
t.saveAsHadoopFiles(…)
t.map(…).foreachRDD(…)
t.filter(…).foreachRDD(…)
[Diagram: end-to-end view. The streaming app code (labelled "YOU write this") defines a DStreamGraph of input DStreams and output operations. Every interval, the Spark Streaming JobScheduler + JobGenerator turn it into a DAG of RDDs (BlockRDDs plus RDD actions / Spark jobs); the Spark DAGScheduler turns that into a DAG of stages (Stage 1, Stage 2, Stage 3); and the Spark TaskScheduler runs the resulting tasks on the executors.]
![Page 19: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/19.jpg)
Aggregations
![Page 20: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/20.jpg)
Word count over a time window
val wordCounts = wordStream.reduceByKeyAndWindow(
  (x: Int, y: Int) => x + y, windowSize, slidingInterval)  // reduces over a time window
![Page 21: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/21.jpg)
Word count over a time window
Scenario: Word count for the last 30 minutes
How to optimize for good performance?
● Increase batch interval, if possible
● Incremental aggregations with inverse reduce function
val wordCounts = wordStream.reduceByKeyAndWindow(
  (x: Int, y: Int) => x + y, (x: Int, y: Int) => x - y, windowSize, slidingInterval)
● Checkpointing
wordStream.checkpoint(checkpointInterval)
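Why the inverse reduce function helps: instead of re-reducing every batch in the 30-minute window on each slide, the new window can be derived from the previous one by adding the batch that enters and subtracting the batch that leaves. The sketch below models this on plain Scala maps; `IncrementalWindow`, `fullWindow` and `slide` are names invented for this illustration, not Spark APIs, and dropping keys whose count reaches zero is a choice made here.

```scala
object IncrementalWindow {
  type Counts = Map[String, Int]

  // Merge two count maps with f, treating a missing key as 0.
  private def combine(a: Counts, b: Counts)(f: (Int, Int) => Int): Counts =
    (a.keySet ++ b.keySet).map(k => k -> f(a.getOrElse(k, 0), b.getOrElse(k, 0))).toMap

  // Naive approach: re-reduce every batch that is inside the window.
  def fullWindow(batches: Seq[Counts]): Counts =
    batches.foldLeft(Map.empty[String, Int])((acc, b) => combine(acc, b)(_ + _))

  // Incremental approach: previous window + entering batch - leaving batch.
  // Keys whose count drops to 0 are removed so the result does not grow forever.
  def slide(previous: Counts, entering: Counts, leaving: Counts): Counts =
    combine(combine(previous, entering)(_ + _), leaving)(_ - _).filter(_._2 != 0)
}
```

Both approaches give the same counts; the incremental one only touches two batches per slide, however many batches the window spans.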
![Page 22: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/22.jpg)
Stateful: Global Aggregations
Scenario: Maintain a global state based on the input events coming in. Ex: Word count from beginning of time.
updateStateByKey (Spark 1.5 and before)
● Processing time per batch is proportional to the size of the state.
mapWithState (Spark 1.6+)
● Processing time per batch is proportional to the size of the batch.
![Page 23: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/23.jpg)
Stateful: Global Aggregations
![Page 24: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/24.jpg)
Stateful: Global Aggregations
Key features of mapWithState:
● An initial state - Read from somewhere as an RDD
● # of partitions for the state - If you have a good estimate of the size of the state, you can specify the # of partitions.
● Partitioner - Default: Hash partitioner. If you have a good understanding of the key space, then you can provide a custom partitioner.
● Timeout - Keys whose values are not updated within the specified timeout period will be removed from the state.
![Page 25: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/25.jpg)
Stateful: Global Aggregations (Word count)
val stateSpec = StateSpec.function(updateState _)
  .initialState(initialRDD)
  .numPartitions(100)
  .partitioner(MyPartitioner())
  .timeout(Minutes(120))
val wordCountState = wordStream.mapWithState(stateSpec)
![Page 26: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/26.jpg)
Stateful: Global Aggregations (Word count)
def updateState(batchTime: Time,     // current batch time
                key: String,         // a word in the input stream
                value: Option[Int],  // current value (= 1)
                state: State[Long])  // counts so far for the word
    : Option[(String, Long)]         // the word and its new count
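The slide shows only the signature of the update function. As a hedged illustration of what its body typically does, here is a plain-Scala model: Spark's `State[Long]` is replaced by an `Option[Long]` lookup into a mutable map, and `updateCount` / `applyBatch` are names made up for this sketch. In a real Spark job the same logic would read the old count with `state.getOption()` and store the new one with `state.update(...)`.

```scala
import scala.collection.mutable

object WordCountState {
  // Model of the update function: add the incoming value (if any)
  // to the count so far (if any) and return the word with its new count.
  def updateCount(key: String, value: Option[Int], previous: Option[Long]): (String, Long) =
    (key, previous.getOrElse(0L) + value.getOrElse(0))

  // Apply one batch of (word, 1) records to the global state.
  // Only keys present in the batch are touched, which is why the cost
  // follows the batch size rather than the size of the whole state.
  def applyBatch(state: mutable.Map[String, Long], batch: Seq[(String, Int)]): Seq[(String, Long)] =
    batch.map { case (word, one) =>
      val updated = updateCount(word, Some(one), state.get(word))
      state(word) = updated._2
      updated
    }
}
```

Feeding the batch `("a", 1), ("b", 1), ("a", 1)` into an empty state emits `("a", 1), ("b", 1), ("a", 2)`; a following batch `("b", 1)` emits `("b", 2)` without touching the entry for "a".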
![Page 27: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/27.jpg)
Operationalization
![Page 28: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/28.jpg)
Operationalization
Three main themes
● Checkpointing
● Achieving good throughput
● Debugging a streaming job
![Page 29: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/29.jpg)
Checkpoint
Two types of checkpointing:
● Checkpointing Data
● Checkpointing Metadata
![Page 30: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/30.jpg)
Checkpoint Data
● Checkpointing DStreams
• Primarily needed to cut long lineage on past batches (updateStateByKey/reduceByKeyAndWindow).
• Example: wordStream.checkpoint(checkpointInterval)
![Page 31: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/31.jpg)
Checkpoint Metadata
● Checkpointing Metadata
• All the configuration, DStream operations and incomplete batches are checkpointed.
• Required for failure recovery if the driver process crashes.
• Example:
– streamingContext.checkpoint(directory)
– StreamingContext.getActiveOrCreate(directory, ...)
![Page 32: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/32.jpg)
Achieving good throughput
context.socketStream(...).map(...).filter(...).saveAsHadoopFiles(...)
Problem: There will be 1 receiver, which receives all the data and stores it in its executor, so all the processing happens on that executor. Adding more nodes doesn’t help.
![Page 33: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/33.jpg)
Achieving good throughput
Solution: Increase the # of receivers and union them.
● Each receiver runs in 1 executor. Having 5 receivers will ensure that the data gets received in parallel in 5 executors.
● Data gets distributed across 5 executors, so all the subsequent Spark map/filter operations will be distributed.
val numStreams = 5
val inputStreams = (1 to numStreams).map(i => context.socketStream(...))
val fullStream = context.union(inputStreams)
fullStream.map(...).filter(...).saveAsHadoopFiles(...)
![Page 34: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/34.jpg)
Achieving good throughput
● In the case of direct (receiverless) streams, like the Kafka direct stream, set the appropriate # of partitions in Kafka.
● Each Kafka partition gets mapped to a Spark partition.
● More partitions in Kafka = More parallelism in Spark
![Page 35: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/35.jpg)
Achieving good throughput
● Provide the right # of partitions based on your cluster size for operations causing shuffles.
words.map(x => (x, 1)).reduceByKey(_ + _, 100)  // 100 = # of partitions
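What the partition count controls can be sketched without Spark. The model below assumes keys are placed by hash code modulo the partition count, which is how Spark's default hash partitioner is understood to behave. `ShufflePartitions`, `partitionFor` and `keysPerPartition` are names invented for this illustration.

```scala
object ShufflePartitions {
  // Non-negative modulo, so negative hash codes still map into [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Count how many of the given keys land in each partition.
  def keysPerPartition(keys: Seq[String], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionFor(_, numPartitions)).map { case (p, ks) => p -> ks.size }
}
```

Too few partitions leave cores idle during the shuffle stage; too many produce tiny tasks whose scheduling overhead dominates, so the number is usually tuned against the cores available in the cluster.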
![Page 36: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/36.jpg)
Debugging a Streaming application
Streaming tab in Spark UI
![Page 37: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/37.jpg)
Debugging a Streaming application
Processing Time
● Make sure that the processing time < batch interval
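Why this matters: if a batch takes longer to process than the batch interval, the next batch is already waiting when it finishes, and the wait grows with every batch. A small plain-Scala model of that backlog follows, assuming batches are processed one at a time and that batch i becomes ready at the end of its interval. `BatchBacklog` and `schedulingDelays` are names made up for this sketch.

```scala
object BatchBacklog {
  // For each batch, how long it waits between becoming ready and starting.
  // Batch i becomes ready at (i + 1) * batchIntervalMs; batches run sequentially.
  def schedulingDelays(batchIntervalMs: Long, processingTimesMs: Seq[Long]): Seq[Long] = {
    var previousFinish = 0L
    processingTimesMs.zipWithIndex.map { case (processingMs, i) =>
      val readyAt = (i + 1) * batchIntervalMs
      val startAt = math.max(readyAt, previousFinish)
      previousFinish = startAt + processingMs
      startAt - readyAt
    }
  }
}
```

With a 1000 ms interval, batches that each take 800 ms never wait, while batches that each take 1500 ms wait 0, 500, 1000, 1500 ms and so on, growing without bound.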
![Page 38: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/38.jpg)
Debugging a Streaming application
![Page 39: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/39.jpg)
Debugging a Streaming application
Batch Details Page:
● Input to the batch
● Jobs that were run as part of the processing for the batch
![Page 40: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/40.jpg)
Debugging a Streaming application
Job Details Page
● DAG Visualization
● Stages of a Spark job
![Page 41: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/41.jpg)
Debugging a Streaming application
Task Details Page
Ensure that the tasks are executed on multiple executors (nodes) in your cluster to have enough parallelism while processing. If you have a single receiver, sometimes only one executor might be doing all the work even though you have more than one executor in your cluster.
![Page 42: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/42.jpg)
Key benefits of Spark Streaming
![Page 43: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/43.jpg)
Dynamic Load Balancing
![Page 44: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/44.jpg)
Fast failure and Straggler recovery
![Page 45: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/45.jpg)
Combine Batch and Stream Processing
Join data streams with static data sets
val dataset = sparkContext.hadoopFile("file")
…
kafkaStream.transform { batchRdd =>
  batchRdd.join(dataset).filter(...)
}
![Page 46: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/46.jpg)
Combine ML and Stream Processing
Learn models offline, apply them online
val model = KMeans.train(dataset, …)
kafkaStream.map { event =>
  model.predict(event.feature)
}
![Page 47: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/47.jpg)
Combine SQL and Stream Processing
inputStream.foreachRDD { rdd =>
  val df = sqlContext.createDataFrame(rdd)
  df.select(...).where(...).groupBy(...)
}
![Page 48: So you think you can stream.pptx](https://reader030.vdocuments.us/reader030/viewer/2022020203/58a45bc21a28abb8288b460d/html5/thumbnails/48.jpg)
Thank you.