Apache: Big Data - Starting with Apache Spark, Best Practices

TRANSCRIPT

Page 1: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 2: Apache: Big Data - Starting with Apache Spark, Best Practices

Starting with Apache Spark, Best Practices and Learning from the Field
Felix Cheung, Principal Engineer + Spark Committer
Spark @ Microsoft

Page 3: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 4: Apache: Big Data - Starting with Apache Spark, Best Practices

Best Practices
Enterprise Solutions

Page 5: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 6: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 7: Apache: Big Data - Starting with Apache Spark, Best Practices

Resilient - Fault tolerant

Page 8: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 9: Apache: Big Data - Starting with Apache Spark, Best Practices

19,500+ commits

Page 10: Apache: Big Data - Starting with Apache Spark, Best Practices

Tungsten
AMPLab becoming RISELab
• Drizzle – low-latency execution, 3.5x lower than Spark Streaming
• Ernest – performance prediction, automatically choose the optimal resource config on the cloud

Page 11: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 12: Apache: Big Data - Starting with Apache Spark, Best Practices

Deployment
Scheduler
Resource Manager (aka Cluster Manager)
- Spark History Server, Spark UI

Spark Core

Page 13: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 14: Apache: Big Data - Starting with Apache Spark, Best Practices

Parallelization, Partition
Transformation
Action
Shuffle

Page 15: Apache: Big Data - Starting with Apache Spark, Best Practices

Doing multiple things at the same time

Page 16: Apache: Big Data - Starting with Apache Spark, Best Practices

A unit of parallelization

Page 17: Apache: Big Data - Starting with Apache Spark, Best Practices

Manipulating data - immutable
"Narrow"
"Wide"

Page 18: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 19: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 20: Apache: Big Data - Starting with Apache Spark, Best Practices

Processing: sorting, serialize/deserialize, compression
Transfer: disk IO, network bandwidth/latency
Take up memory, or spill to disk for intermediate results ("shuffle file")

Page 21: Apache: Big Data - Starting with Apache Spark, Best Practices

Materialize results
Execute the chain of transformations that leads to output – lazy evaluation
count
collect -> take
write
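As an illustration (not from the slides), a minimal Scala sketch of the idea - transformations only build a plan, and the action at the end triggers execution; the input path and column names are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("lazy-eval-sketch").getOrCreate()
import spark.implicits._

val events = spark.read.parquet("events.parquet")   // just a plan, nothing runs yet
val errors = events.filter($"status" === "error")   // narrow transformation
val perHost = errors.groupBy($"host").count()       // wide transformation - shuffle
perHost.show()                                       // action - materializes results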

Page 22: Apache: Big Data - Starting with Apache Spark, Best Practices

DataFrame
Dataset
Data source
Execution engine - Catalyst

SQL

Page 23: Apache: Big Data - Starting with Apache Spark, Best Practices

Execution Plan
Predicate Pushdown

Page 24: Apache: Big Data - Starting with Apache Spark, Best Practices

Strong typing
Optimized execution

Page 25: Apache: Big Data - Starting with Apache Spark, Best Practices

Dataset[Row]

Partition = set of Rows

Page 26: Apache: Big Data - Starting with Apache Spark, Best Practices

"format" - Parquet, CSV, JSON, or Cassandra, HBase

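For example, a sketch of reading a few built-in formats (paths are hypothetical); external sources such as Cassandra or HBase need their own connector packages:

val parquetDF = spark.read.parquet("data/events.parquet")                            // default format
val csvDF = spark.read.format("csv").option("header", "true").load("data/events.csv")
val jsonDF = spark.read.format("json").load("data/events.json")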
Page 27: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 28: Apache: Big Data - Starting with Apache Spark, Best Practices

Ability to process expressions as early in the plan as possible

Page 29: Apache: Big Data - Starting with Apache Spark, Best Practices

spark.read.jdbc(jdbcUrl, "food", connectionProperties)

// with pushdown
spark.read.jdbc(jdbcUrl, "food", connectionProperties)
  .select("hotdog", "pizza", "sushi")

Page 30: Apache: Big Data - Starting with Apache Spark, Best Practices

Discretized Streams (DStreams)
Receiver DStream
Direct DStream
Basic and Advanced Sources

Streaming

Page 31: Apache: Big Data - Starting with Apache Spark, Best Practices

Source
Reliability
Receiver + Write Ahead Log (WAL)
Checkpointing

Page 32: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 33: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 34: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 35: Apache: Big Data - Starting with Apache Spark, Best Practices

https://databricks.com/wp-content/uploads/2015/01/blog-ha-52.png

Page 36: Apache: Big Data - Starting with Apache Spark, Best Practices

Only for reliable messaging sources that support reading from a position
Stronger fault tolerance, exactly-once*
No receiver/WAL – less resource usage, lower overhead
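A hedged Scala sketch of a Direct DStream over Kafka, assuming the spark-streaming-kafka-0-10 integration and a hypothetical broker/topic/group:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer

val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "enable.auto.commit" -> (false: java.lang.Boolean))  // commit offsets manually

// reads from stored offsets directly - no receiver, no WAL
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("events"), kafkaParams))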

Page 37: Apache: Big Data - Starting with Apache Spark, Best Practices

Saving to reliable storage to recover from failure
1. Metadata checkpointing - StreamingContext.checkpoint()
2. Data checkpointing - dstream.checkpoint()
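A sketch of both calls; the checkpoint directory, interval, and stateDStream are hypothetical:

// 1. Metadata checkpointing - lets the driver recover the streaming DAG
ssc.checkpoint("hdfs:///checkpoints/myapp")

// 2. Data checkpointing - truncates the lineage of stateful DStreams
stateDStream.checkpoint(Seconds(30))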

Page 38: Apache: Big Data - Starting with Apache Spark, Best Practices

ML Pipeline
Transformer
Estimator
Evaluator

Machine Learning

Page 39: Apache: Big Data - Starting with Apache Spark, Best Practices

DataFrame-based - leverage optimizations and support transformations
A sequence of algorithms - PipelineStages

Page 40: Apache: Big Data - Starting with Apache Spark, Best Practices

(Pipeline diagram: DataFrame -> Transformer -> Estimator -> Transformer; feature engineering, modeling)

Page 41: Apache: Big Data - Starting with Apache Spark, Best Practices

Feature transformer - takes a DataFrame and its Columns and appends one or more new Columns

Page 42: Apache: Big Data - Starting with Apache Spark, Best Practices

StopWordsRemover
Binarizer
SQLTransformer
VectorAssembler

Page 43: Apache: Big Data - Starting with Apache Spark, Best Practices

Estimators

An algorithm: DataFrame -> Model
A Model is a Transformer

LinearRegression
KMeans
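A hedged end-to-end sketch: two feature Transformers feeding an Estimator inside a Pipeline. trainingDF/testDF and their "text"/"label" columns are hypothetical:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{Tokenizer, HashingTF}
import org.apache.spark.ml.classification.LogisticRegression

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(trainingDF)        // Estimator: DataFrame -> Model
val predictions = model.transform(testDF)   // the Model is itself a Transformer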

Page 44: Apache: Big Data - Starting with Apache Spark, Best Practices

Evaluator

Metric to measure Model performance on held-out test data

Page 45: Apache: Big Data - Starting with Apache Spark, Best Practices

Evaluator

MulticlassClassificationEvaluator
BinaryClassificationEvaluator
RegressionEvaluator
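For example, scoring the predictions from the Pipeline sketch above on held-out data (column names assumed to be the MLlib defaults):

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")

val auc = evaluator.evaluate(predictions)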

Page 46: Apache: Big Data - Starting with Apache Spark, Best Practices

MLWriter/MLReader

Pipeline persistence
Include transformers, estimators, Params
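A persistence sketch (the model path is hypothetical):

import org.apache.spark.ml.PipelineModel

model.write.overwrite().save("hdfs:///models/text-pipeline")
val reloaded = PipelineModel.load("hdfs:///models/text-pipeline")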

Page 47: Apache: Big Data - Starting with Apache Spark, Best Practices

Graph
Pregel
Graph Algorithms
Graph Queries

Graph

Page 48: Apache: Big Data - Starting with Apache Spark, Best Practices

Directed multigraph with user properties on edges and vertices

(Example graph: vertices SEA, NYC, LAX)

Page 49: Apache: Big Data - Starting with Apache Spark, Best Practices

PageRank
ConnectedComponents

ranks = tripGraph.pageRank(resetProbability=0.15, maxIter=5)

Page 50: Apache: Big Data - Starting with Apache Spark, Best Practices

DataFrame-based
Simplify loading graph data, wrangling
Support Graph Queries

Page 51: Apache: Big Data - Starting with Apache Spark, Best Practices

Pattern matching
Mix pattern with SQL syntax

motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a); !(c)-[]->(a)").filter("a.id = 'MIA'")

Page 52: Apache: Big Data - Starting with Apache Spark, Best Practices

Structured Streaming Model
Source
Sink
StreamingQuery

Structured Streaming

Page 53: Apache: Big Data - Starting with Apache Spark, Best Practices

Extending the same DataFrame API to include incremental execution of unbounded input

Reliability, correctness / exactly-once - checkpointing (2.1: JSON format)
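The checkpoint location is supplied as a sink option when the query is started; a sketch with a hypothetical streaming DataFrame eventsDF and hypothetical paths:

val query = eventsDF.writeStream
  .format("parquet")
  .option("path", "hdfs:///output/events")
  .option("checkpointLocation", "hdfs:///checkpoints/events")  // offsets + state, JSON format in 2.1
  .outputMode("append")
  .start()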

Page 54: Apache: Big Data - Starting with Apache Spark, Best Practices

Stream as Unbounded Input

https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

Page 55: Apache: Big Data - Starting with Apache Spark, Best Practices

Watermark (2.1) - handling of late data
Streaming ETL, joining static data, partitioning, windowing
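A watermark sketch: a windowed count that tolerates events arriving up to 10 minutes late (eventsDF and its columns are hypothetical):

import org.apache.spark.sql.functions.{window, col}

val windowedCounts = eventsDF
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "1 hour"), col("word"))
  .count()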

Page 56: Apache: Big Data - Starting with Apache Spark, Best Practices

FileStreamSource
KafkaSource
MemoryStream (not for production)
TextSocketSource
MQTT

Page 57: Apache: Big Data - Starting with Apache Spark, Best Practices

FileStreamSink (new formats in 2.1)
ConsoleSink
ForeachSink (Scala only)
MemorySink – as Temp View

Page 58: Apache: Big Data - Starting with Apache Spark, Best Practices

staticDF = (spark
  .read
  .schema(jsonSchema)
  .json(inputPath))

Page 59: Apache: Big Data - Starting with Apache Spark, Best Practices

# Take a list of files as a stream
streamingDF = (spark
  .readStream
  .schema(jsonSchema)
  .option("maxFilesPerTrigger", 1)
  .json(inputPath))

Page 60: Apache: Big Data - Starting with Apache Spark, Best Practices

streamingCountsDF = (streamingDF
  .groupBy(streamingDF.word, window(streamingDF.time, "1 hour"))
  .count())

Page 61: Apache: Big Data - Starting with Apache Spark, Best Practices

query = (streamingCountsDF
  .writeStream
  .format("memory")
  .queryName("word_counts")
  .outputMode("complete")
  .start())

spark.sql("select count from word_counts order by time")

Page 62: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 63: Apache: Big Data - Starting with Apache Spark, Best Practices

How much data goes in affects how much work it takes

Page 64: Apache: Big Data - Starting with Apache Spark, Best Practices

Size does matter!
CSV or JSON is "simple" but also tends to be big
JSON -> Parquet (compressed): 7x faster
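Converting once up front usually pays for itself; a sketch with hypothetical paths:

spark.read.json("logs/raw/")
  .write
  .mode("overwrite")
  .parquet("logs/parquet/")   // snappy-compressed Parquet by default in Spark 2.0+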

Page 65: Apache: Big Data - Starting with Apache Spark, Best Practices

Format also does matter

Recommended format - Parquet
Default data source/format
• VectorizedReader
• Better dictionary decoding

Page 66: Apache: Big Data - Starting with Apache Spark, Best Practices

Parquet Columnar Format

Column chunk co-located
Metadata and headers for skipping

Page 67: Apache: Big Data - Starting with Apache Spark, Best Practices

Recommend Parquet

Page 68: Apache: Big Data - Starting with Apache Spark, Best Practices

Compression is a factor

gzip <100MB/s vs snappy 500MB/s
Tradeoffs: faster or smaller?
Spark 2.0+ defaults to snappy
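The codec is a config choice; a sketch for Parquet output (gzip for smaller files, snappy for speed):

spark.conf.set("spark.sql.parquet.compression.codec", "gzip")    // smaller, slower
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")  // faster, default in 2.0+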

Page 69: Apache: Big Data - Starting with Apache Spark, Best Practices

Sidenote: Table Partitioning

Store data into groups by partitioning columns
Encoded path structure matches Hive
table/event_date=2017-02-01
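Writing with partitionBy produces exactly that path layout; a sketch with a hypothetical DataFrame and output path:

eventsDF.write
  .partitionBy("event_date")
  .parquet("warehouse/table")   // -> table/event_date=2017-02-01/part-...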

Page 70: Apache: Big Data - Starting with Apache Spark, Best Practices

Spark UI - Timeline view

https://databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html

Page 71: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 72: Apache: Big Data - Starting with Apache Spark, Best Practices

Spark UI - DAG view

https://databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html

Page 73: Apache: Big Data - Starting with Apache Spark, Best Practices

Executor tab

Page 74: Apache: Big Data - Starting with Apache Spark, Best Practices

SQL tab

Page 75: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 76: Apache: Big Data - Starting with Apache Spark, Best Practices

Understanding Queries

explain() is your friend, but it could be hard to understand at times

== Parsed Logical Plan ==
Aggregate [count(1) AS count#79L]
+- Sort [speed_y#49 ASC], true
   +- Join Inner, (speed_x#48 = speed_y#49)
      :- Project [speed#2 AS speed_x#48, dist#3]
      :  +- LogicalRDD [speed#2, dist#3]
      +- Project [speed#18 AS speed_y#49, dist#19]
         +- LogicalRDD [speed#18, dist#19]

Page 77: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 78: Apache: Big Data - Starting with Apache Spark, Best Practices

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)], output=[count#79L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#83L])
      +- *Project
         +- *Sort [speed_y#49 ASC], true, 0
            +- Exchange rangepartitioning(speed_y#49 ASC, 200)
               +- *Project [speed_y#49]
                  +- *SortMergeJoin [speed_x#48], [speed_y#49], Inner
                     :- *Sort [speed_x#48 ASC], false, 0
                     :  +- Exchange hashpartitioning(speed_x#48, 200)
                     :     +- *Project [speed#2 AS speed_x#48]
                     :        +- *Filter isnotnull(speed#2)
                     :           +- Scan ExistingRDD[speed#2,dist#3]
                     +- *Sort [speed_y#49 ASC], false, 0
                        +- Exchange hashpartitioning(speed_y#49, 200)
                           +- *Project [speed#18 AS speed_y#49]

Page 79: Apache: Big Data - Starting with Apache Spark, Best Practices

UDF

Write your own custom transforms
But... Catalyst can't see through it (yet?!)
Always prefer to use builtin transforms as much as possible

Page 80: Apache: Big Data - Starting with Apache Spark, Best Practices

UDF vs Builtin Example

Remember Predicate Pushdown?

val isSeattle = udf { (s: String) => s == "Seattle" }
cities.where(isSeattle('name))

*Filter UDF(name#2)
+- *FileScan parquet [id#128L,name#2] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/b/cities.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>

Page 81: Apache: Big Data - Starting with Apache Spark, Best Practices

UDF vs Builtin Example

cities.where('name === "Seattle")

*Project [id#128L, name#2]
+- *Filter (isnotnull(name#2) && (name#2 = Seattle))
   +- *FileScan parquet [id#128L,name#2] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/b/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Seattle)], ReadSchema: struct<id:bigint,name:string>

Page 82: Apache: Big Data - Starting with Apache Spark, Best Practices

UDF in Python

Avoid!
Why? Pickling, transfer, extra memory to run Python interpreter
- Hard to debug errors!

from pyspark.sql.types import IntegerType
sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
sqlContext.sql("SELECT stringLengthInt('test')").take(1)

Page 83: Apache: Big Data - Starting with Apache Spark, Best Practices

Going for Performance

Stored in compressed Parquet
Partitioned table
Predicate Pushdown
Avoid UDF

Page 84: Apache: Big Data - Starting with Apache Spark, Best Practices

Shuffling for Join

Can be very expensive

Page 85: Apache: Big Data - Starting with Apache Spark, Best Practices

Optimizing for Join

Partition!
Narrow transform if left and right partitioned with same scheme

Page 86: Apache: Big Data - Starting with Apache Spark, Best Practices

Optimizing for Join
Broadcast Join (aka Map-Side Join in Hadoop)

Smaller table against large table - avoid shuffling large table

Default 10MB auto broadcast
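The 10MB default is spark.sql.autoBroadcastJoinThreshold; a small table can also be broadcast explicitly with a hint - a sketch with hypothetical DataFrames:

import org.apache.spark.sql.functions.broadcast

// force a broadcast (map-side) join of the small dimension table
val joined = largeFactDF.join(broadcast(smallDimDF), Seq("id"))

// or adjust the automatic threshold (in bytes)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)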

Page 87: Apache: Big Data - Starting with Apache Spark, Best Practices

BroadcastHashJoin

left.join(right, Seq("id"), "leftanti").explain

== Physical Plan ==
*BroadcastHashJoin [id#50], [id#60], LeftAnti, BuildRight
:- LocalTableScan [id#50, left#51]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id#60]

Page 88: Apache: Big Data - Starting with Apache Spark, Best Practices

Repartition

To numPartitions or by Columns
Increase parallelism – will shuffle

coalesce() – combine partitions in place
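A sketch (the DataFrame and column names are hypothetical):

import org.apache.spark.sql.functions.col

val byUser = eventsDF.repartition(200, col("user_id"))  // shuffle into 200 partitions by column
val wider  = eventsDF.repartition(400)                  // more parallelism - full shuffle
val fewer  = eventsDF.coalesce(8)                       // combine partitions in place - no full shuffle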

Page 89: Apache: Big Data - Starting with Apache Spark, Best Practices

Cache

cache() or persist()
Flush least-recently-used (LRU) - make sure there is enough memory!
MEMORY_AND_DISK to avoid expensive recompute (but spill to disk is slow)
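A sketch of an explicit storage level (the DataFrame is hypothetical):

import org.apache.spark.storage.StorageLevel

val hot = eventsDF.persist(StorageLevel.MEMORY_AND_DISK)  // spill to disk instead of recomputing
hot.count()      // first action materializes the cache
// ... reuse hot in later queries ...
hot.unpersist()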

Page 90: Apache: Big Data - Starting with Apache Spark, Best Practices

Streaming

Use Structured Streaming (2.1+)
If not...
If reliable messaging (Kafka), use Direct DStream

Page 91: Apache: Big Data - Starting with Apache Spark, Best Practices

Metadata - Config
Position from streaming source (aka offset) - could get duplicates! (at-least-once)
Pending batches

Page 92: Apache: Big Data - Starting with Apache Spark, Best Practices

Persist stateful transformations - data lost if not saved
Cut short execution that could grow indefinitely

Page 93: Apache: Big Data - Starting with Apache Spark, Best Practices

Direct DStream

Checkpoint also stores offsets

Turn off auto commit - commit when in a good state, for exactly-once
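With the Kafka 0.10 direct stream, offsets can be committed back once the batch output is safely stored; a hedged sketch (stream as created in the earlier Direct DStream sketch):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch and persist the results ...
  // commit only when in a good state, for exactly-once output
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}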

Page 94: Apache: Big Data - Starting with Apache Spark, Best Practices

Checkpointing
Stream/ML/Graph/SQL
- more efficient for indefinite/iterative computation
- recovery
Generally not versioning-safe

Use a reliable distributed file system (caution on "object store")

Page 95: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 96: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 97: Apache: Big Data - Starting with Apache Spark, Best Practices

(Architecture diagram - components: WebLog source, Hadoop/HDFS, Hive + Hive Metastore, Spark SQL, External Data Source, BI Tools, Front End; hourly batch)

Page 98: Apache: Big Data - Starting with Apache Spark, Best Practices

(Architecture diagram - components: Kafka, Spark Streaming, Spark ML, HDFS, Front End; near-real-time, end-to-end roundtrip 8-20 sec; offline analysis)

Page 99: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 100: Apache: Big Data - Starting with Apache Spark, Best Practices

(Architecture diagram - components: Spark SQL, RDBMS, Hive, SQL Appliance, BI Tools)

Page 101: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 102: Apache: Big Data - Starting with Apache Spark, Best Practices

(Architecture diagram - components: Message Bus, Kafka, Spark Streaming, Spark ML, Spark SQL, Storage, Data Lake, Hive Metastore, External Data Source, Visualization, BI Tools)

Page 103: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 104: Apache: Big Data - Starting with Apache Spark, Best Practices

(Architecture diagram - components: Kafka, Flume, Spark Streaming, HDFS, Hive, Spark SQL, Presto, SQL, Visualization, BI Tools, Data Science Notebook)

Page 105: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 106: Apache: Big Data - Starting with Apache Spark, Best Practices

(Architecture diagram - components: Message Bus, Spark Streaming, Storage, SQL, Spark SQL, Data Factory)

Page 107: Apache: Big Data - Starting with Apache Spark, Best Practices

Moving

Page 108: Apache: Big Data - Starting with Apache Spark, Best Practices
Page 109: Apache: Big Data - Starting with Apache Spark, Best Practices

https://www.linkedin.com/in/felix-cheung-b4067510
https://github.com/felixcheung