Spark Streaming: Best Practices
Prakash Chockalingam (@prakash573)
Who am I?
● Solutions Architect / Product Manager at Databricks
● Formerly Netflix, Personalization Infrastructure
● Formerly Yahoo!, Personalized Ad Targeting
About Databricks
Founded by the creators of Spark in 2013
Cloud enterprise data platform:
● Managed Spark clusters
● Interactive data science
● Production pipelines
● Data governance, security, …
Agenda
● Introduction to Spark Streaming
● Lifecycle of a Spark Streaming app
● Aggregations and best practices
● Operationalization tips
● Key benefits of Spark Streaming
What is Spark Streaming?
How does it work?
● Receivers receive data streams and chop them into batches.
● Spark processes the batches and pushes out the results.
Word Count
val context = new StreamingContext(conf, Seconds(1))  // Entry point; Seconds(1) is the batch interval
val lines = context.socketTextStream(...)             // DStream: represents a data stream
Word Count
val context = new StreamingContext(conf, Seconds(1))
val lines = context.socketTextStream(...)
val words = lines.flatMap(_.split(" "))  // Transformations: transform data to create new DStreams
Word Count
val context = new StreamingContext(conf, Seconds(1))
val lines = context.socketTextStream(...)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()  // Print the DStream contents on screen
context.start()     // Start the streaming job
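For reference, a minimal self-contained version of this word count app. It is a sketch, not from the slides: the local[2] master, the app name, and the localhost:9999 socket source are assumed for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    // local[2]: at least two threads, so the receiver and the batch
    // processing don't starve each other (assumed local setup).
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
    val context = new StreamingContext(conf, Seconds(1))

    val lines = context.socketTextStream("localhost", 9999)  // assumed source
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    context.start()
    context.awaitTermination()  // keep the driver alive while the job runs
  }
}
```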
Lifecycle of a streaming app
Execution in any Spark Application
Execution in Spark Streaming: Receiving data
Execution in Spark Streaming: Processing data
End-to-end view
You write the DStream program:

t1 = ssc.socketStream(...)
t2 = ssc.socketStream(...)
t = t1.union(t2).map(...)
t.saveAsHadoopFiles(...)
t.map(...).foreach(...)
t.filter(...).foreach(...)

[Diagram: the input DStreams and output operations above form a DStreamGraph. Every batch interval, the Spark Streaming JobScheduler + JobGenerator turn the DStreamGraph into a DAG of RDDs (RDD actions / Spark jobs over BlockRDDs), the Spark DAGScheduler turns each DAG of RDDs into a DAG of stages, and the Spark TaskScheduler turns the stages into tasks that run on the executors.]
Aggregations
Word count over a time window
val wordCounts = wordStream.reduceByKeyAndWindow(
  (x: Int, y: Int) => x + y,  // reduce function applied over the time window
  windowSize, slidingInterval)

wordStream is the parent DStream; the counts are recomputed over the window size every sliding interval.
Word count over a time window
Scenario: word count for the last 30 minutes. How to optimize for good performance?

● Increase the batch interval, if possible
● Incremental aggregations with an inverse reduce function:
val wordCounts = wordStream.reduceByKeyAndWindow(
  (x: Int, y: Int) => x + y,
  (x: Int, y: Int) => x - y,
  windowSize, slidingInterval)
● Checkpointing: wordStream.checkpoint(checkpointInterval) (both techniques are combined in the sketch below)
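Putting those together: a sketch of the incrementally aggregated 30-minute count. The 1-minute slide and the 10-minute checkpoint interval are assumed values (the docs suggest checkpointing every 5 to 10 sliding intervals), and wordStream is the DStream from the earlier slides.

```scala
import org.apache.spark.streaming.Minutes

val windowSize = Minutes(30)      // "last 30 minutes" from the scenario
val slidingInterval = Minutes(1)  // assumed slide

// The inverse function lets Spark subtract the batch that just left the
// window instead of re-reducing the entire 30 minutes on every slide.
// Note: using an inverse reduce requires checkpointing to be enabled
// (context.checkpoint(directory) must be set).
val wordCounts = wordStream.reduceByKeyAndWindow(
  (x: Int, y: Int) => x + y,  // add counts entering the window
  (x: Int, y: Int) => x - y,  // remove counts leaving the window
  windowSize,
  slidingInterval)

wordCounts.checkpoint(Minutes(10))  // cut the lineage periodically
```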
Stateful: Global Aggregations
Scenario: maintain a global state based on the input events coming in. Example: word count from the beginning of time.

updateStateByKey (Spark 1.5 and before)
● Performance is proportional to the size of the state (see the sketch below).

mapWithState (Spark 1.6+)
● Performance is proportional to the size of the batch.
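For contrast, a minimal sketch of the running word count with the older updateStateByKey API; the update function is applied to every key in the state on every batch, which is why its cost grows with the state rather than the batch. wordStream is the DStream from the earlier slides.

```scala
// Requires a checkpoint directory: context.checkpoint(directory) must be set.
val totalCounts = wordStream.updateStateByKey[Long] {
  (newValues: Seq[Int], runningCount: Option[Long]) =>
    // Fold this batch's occurrences into the running total for the key.
    Some(runningCount.getOrElse(0L) + newValues.sum)
}
```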
Stateful: Global Aggregations
Key features of mapWithState:
● An initial state: read from somewhere as an RDD
● # of partitions for the state: if you have a good estimate of the size of the state, you can specify the # of partitions
● Partitioner: default is the hash partitioner. If you have a good understanding of the key space, you can provide a custom partitioner
● Timeout: keys whose values are not updated within the specified timeout period will be removed from the state
Stateful: Global Aggregations (Word count)
val stateSpec = StateSpec.function(updateState _)
  .initialState(initialRDD)
  .numPartitions(100)
  .partitioner(MyPartitioner())
  .timeout(Minutes(120))
val wordCountState = wordStream.mapWithState(stateSpec)
Stateful: Global Aggregations (Word count)
def updateState(batchTime: Time,     // current batch time
                key: String,         // a word in the input stream
                value: Option[Int],  // current value (= 1)
                state: State[Long])  // counts so far for the word
  : Option[(String, Long)]           // the word and its new count
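The slide gives only the signature; here is a minimal sketch of a matching body. The timeout branch assumes the 120-minute timeout configured in the StateSpec above: when a key expires, the function is called one last time with no value and the state entry is removed automatically.

```scala
def updateState(batchTime: Time,
                key: String,
                value: Option[Int],
                state: State[Long]): Option[(String, Long)] = {
  if (state.isTimingOut()) {
    // Key expired: no state update allowed here; just emit the final count.
    Some((key, state.getOption.getOrElse(0L)))
  } else {
    val newCount = state.getOption.getOrElse(0L) + value.getOrElse(0)
    state.update(newCount)  // persist the running count
    Some((key, newCount))   // emit the word and its new count
  }
}
```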
Operationalization
Checkpoint
Two types of checkpointing:
● Checkpointing Data
● Checkpointing Metadata
Checkpoint Data
● Checkpointing DStreams
○ Primarily needed to cut long lineage on past batches (updateStateByKey/reduceByKeyAndWindow).
○ Example: wordStream.checkpoint(checkpointInterval)
Checkpoint Metadata
● Checkpointing Metadata
○ All the configuration, DStream operations, and incomplete batches are checkpointed.
○ Required for failure recovery if the driver process crashes (see the recovery sketch below).
○ Example: streamingContext.checkpoint(directory)
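A minimal sketch of driver-failure recovery built on metadata checkpointing. The checkpoint path is a placeholder; the key point is StreamingContext.getOrCreate, which rebuilds the context from the checkpoint if one exists and otherwise calls the factory function.

```scala
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))
  // ... define DStreams and output operations here ...
  ssc.checkpoint("hdfs://.../checkpoints")  // placeholder directory
  ssc
}

// On a clean start this calls createContext(); after a driver crash it
// restores the configuration, DStream operations, and incomplete batches
// from the checkpoint directory.
val context = StreamingContext.getOrCreate("hdfs://.../checkpoints", createContext _)
context.start()
context.awaitTermination()
```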
Achieving good throughput
context.socketStream(...)
  .map(...)
  .filter(...)
  .saveAsHadoopFile(...)

Problem: there is only one receiver; it receives all the data, stores it on its executor, and all the processing happens on that executor. Adding more nodes doesn't help.
Achieving good throughput
Solution: increase the # of receivers and union them.
● Each receiver runs on one executor. Having 5 receivers ensures that the data gets received in parallel on 5 executors.
● Data gets distributed across 5 executors, so all the subsequent Spark map/filter operations are distributed as well.
val numStreams = 5
val inputStreams = (1 to numStreams).map(i => context.socketStream(...))
val fullStream = context.union(inputStreams)
fullStream.map(...).filter(...).saveAsHadoopFile(...)
Achieving good throughput
● In the case of direct receivers (like Kafka), set the appropriate # of partitions in Kafka.
● Each Kafka partition gets mapped to a Spark partition.
● More partitions in Kafka = more parallelism in Spark (see the sketch below).
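A sketch of creating such a direct stream with the Spark 1.6-era spark-streaming-kafka connector; the broker list and topic name are placeholders.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("events")  // a topic with, say, 50 partitions

// No receiver involved: every batch, each Kafka partition of "events"
// becomes exactly one Spark partition, so 50 Kafka partitions give
// 50-way parallelism in the first stage.
val directStream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder](context, kafkaParams, topics)
```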
Achieving good throughput
● For operations that cause shuffles, provide the right # of partitions based on your cluster size.
words.map(x => (x, 1)).reduceByKey(_ + _, 100)  // 100 = # of partitions
Debugging a Streaming application
Streaming tab in Spark UI
Debugging a Streaming application
Processing Time
● Make sure that the processing time < batch interval
Debugging a Streaming application
Batch Details Page:
● Input to the batch
● Jobs that were run as part of the processing for the batch
Debugging a Streaming application
Job Details Page
● DAG Visualization
● Stages of a Spark job
Debugging a Streaming application
Task Details Page
Ensure that tasks are executed on multiple executors (nodes) in your cluster to have enough parallelism while processing. If you have a single receiver, sometimes only one executor might be doing all the work even though you have more than one executor in your cluster.
Key benefits of Spark streaming
Dynamic Load Balancing
Fast failure and Straggler recovery
Combine Batch and Stream Processing
Join data streams with static data sets
val dataset = sparkContext.hadoopFile("file")
…
kafkaStream.transform { batchRdd =>
  batchRdd.join(dataset).filter(...)
}
Combine ML and Stream Processing
Learn models offline, apply them online
val model = KMeans.train(dataset, …)
kafkaStream.map { event =>
  model.predict(event.feature)
}
Combine SQL and Stream Processing
inputStream.foreachRDD { rdd =>
  val df = sqlContext.createDataFrame(rdd)
  df.select(...).where(...).groupBy(...)
}
Thank you.