Spark/Cassandra Integration: Theory & Practice
DuyHai DOAN, Technical Advocate
@doanduyhai

Who Am I?
Duy Hai DOAN, Cassandra technical advocate
• talks, meetups, confs
• open-source devs (Achilles, …)
• OSS Cassandra point of contact
☞ [email protected] ☞ @doanduyhai

Datastax
• Founded in April 2010
• We contribute a lot to Apache Cassandra™
• 400+ customers (25 of the Fortune 100), 400+ employees
• Headquarters in the San Francisco Bay Area
• EU headquarters in London, offices in France and Germany
• Datastax Enterprise = OSS Cassandra + extra features

Spark – Cassandra Use Cases
• Load data from various sources
• Analytics (join, aggregate, transform, …)
• Sanitize, validate, normalize, transform data
• Schema migration, data conversion

Spark & Cassandra Presentation
• Spark & its eco-system
• Cassandra Quick Recap

What is Apache Spark?
• Created at UC Berkeley's AMPLab, open source since 2010, Apache project since 2013
• General data processing framework
• Faster than Hadoop, in memory
• One-framework-many-components approach

Spark code example
Setup
val conf = new SparkConf(true)
  .setAppName("basic_example")
  .setMaster("local[3]")
val sc = new SparkContext(conf)
Data-set (can be from text, CSV, JSON, Cassandra, HDFS, …)
val people = List(("jdoe", "John DOE", 33),
                  ("hsue", "Helen SUE", 24),
                  ("rsmith", "Richard Smith", 33))

RDDs
RDD = Resilient Distributed Dataset
val parallelPeople: RDD[(String, String, Int)] = sc.parallelize(people)
val extractAge: RDD[(Int, (String, String, Int))] = parallelPeople
  .map(tuple => (tuple._3, tuple))
val groupByAge: RDD[(Int, Iterable[(String, String, Int)])] = extractAge.groupByKey()
// countByKey() returns a scala.collection.Map, not the default immutable Map
val countByAge: scala.collection.Map[Int, Long] = groupByAge.countByKey()
// Note: groupByAge holds one entry per age, so this returns 1 for every age;
// extractAge.countByKey() gives the number of people per age
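
The map / group / count steps of this slide can be tried without a cluster. The following is a minimal sketch that uses plain Scala collections as a stand-in for RDDs (an assumption made for illustration; `CountByAgeSketch` is a hypothetical name and no Spark API is involved). Here `countByAge` counts the people per age.

```scala
// Plain Scala collections standing in for RDDs (assumption: no Spark on the classpath).
object CountByAgeSketch {
  val people = List(("jdoe", "John DOE", 33),
                    ("hsue", "Helen SUE", 24),
                    ("rsmith", "Richard Smith", 33))

  // Same shape as extractAge on the slide: key each tuple by its age.
  val extractAge: List[(Int, (String, String, Int))] =
    people.map(tuple => (tuple._3, tuple))

  // Local equivalent of groupByKey(): age -> all people of that age.
  val groupByAge: Map[Int, List[(String, String, Int)]] =
    extractAge.groupBy(_._1).map { case (age, pairs) => age -> pairs.map(_._2) }

  // Number of people per age.
  val countByAge: Map[Int, Long] =
    groupByAge.map { case (age, persons) => age -> persons.size.toLong }

  def main(args: Array[String]): Unit = println(countByAge)
}
```

On a real cluster the grouping step is the one that moves data between workers; locally it is just a `groupBy`.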

RDDs
RDD[A] = distributed collection of A
• RDD[Person]
• RDD[(String,Int)], …
RDD[A] split into partitions
Partitions distributed over n workers → parallel computing

Direct transformations
map(f: A => B): RDD[B]
filter(f: A => Boolean): RDD[A]
…

Transformations requiring shuffle
groupByKey(): RDD[(K, Iterable[V])]
reduceByKey(f: (V,V) => V): RDD[(K,V)]
join[W](otherRDD: RDD[(K,W)]): RDD[(K, (V,W))]
…

Actions
collect(): Array[A]
take(number: Int): Array[A]
foreach(f: A => Unit): Unit
…

Partitions transformations
[Diagram: the partitions of an RDD go through map(tuple => (tuple._3, tuple)) (direct transformation), then groupByKey() (shuffle, expensive!), then countByKey() (final action)]
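
To make the difference between a direct transformation and a shuffle concrete, here is a toy model in plain Scala. It is only a sketch: the two partitions, the `shuffle` helper and the hash-modulo placement are assumptions chosen for illustration, not how Spark implements its shuffle.

```scala
// Toy model of partitions, a direct transformation and a shuffle (no Spark involved).
object ShuffleSketch {
  type Person = (String, String, Int)

  // Two hypothetical partitions of the people data-set.
  val partitions: Vector[Vector[Person]] = Vector(
    Vector(("jdoe", "John DOE", 33), ("hsue", "Helen SUE", 24)),
    Vector(("rsmith", "Richard Smith", 33))
  )

  // Direct transformation: applied partition by partition, no data movement.
  val keyed: Vector[Vector[(Int, Person)]] =
    partitions.map(_.map(p => (p._3, p)))

  // Shuffle: every record goes to the partition chosen by hashing its key,
  // so all records sharing a key end up in the same partition.
  def shuffle[K, V](parts: Vector[Vector[(K, V)]], n: Int): Vector[Vector[(K, V)]] = {
    val all = parts.flatten
    Vector.tabulate(n) { i =>
      all.filter { case (k, _) => Math.floorMod(k.hashCode, n) == i }
    }
  }

  val shuffled: Vector[Vector[(Int, Person)]] = shuffle(keyed, 2)

  // After the shuffle, grouping by key is local to each partition.
  val grouped: Vector[Map[Int, Vector[Person]]] =
    shuffled.map(_.groupBy(_._1).map { case (age, kvs) => age -> kvs.map(_._2) })
}
```

The `map` step never looks outside its own partition, while `shuffle` has to read every partition, which is why it is the expensive step in the diagram.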

Spark eco-system
[Diagram of the stack, top to bottom]
• Spark Streaming, MLLib, GraphX, Spark SQL, etc…
• Spark Core Engine (Scala/Java/Python)
• Cluster Manager: Local, Standalone cluster, YARN, Mesos, …
• Persistence

What is Apache Cassandra?
• Created at Facebook, Apache project since 2009
• Distributed NoSQL database
• Eventual consistency (A & P of the CAP theorem)
• Distributed table abstraction

Cassandra data distribution reminder
Random: hash of #partition → token = hash(#p)
Hash: ]-X, X]
X = huge number (2^64/2)
[Diagram: ring of 8 nodes, n1 to n8]

Cassandra token ranges
Murmur3 hash function
A: ]0, X/8]
B: ]X/8, 2X/8]
C: ]2X/8, 3X/8]
D: ]3X/8, 4X/8]
E: ]4X/8, 5X/8]
F: ]5X/8, 6X/8]
G: ]6X/8, 7X/8]
H: ]7X/8, X]
[Diagram: ring of 8 nodes, n1 to n8, each owning one of the token ranges A to H]
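
The range table can be turned into a small lookup function. The sketch below follows the simplification of the slide (tokens in ]0, X], eight equal ranges A to H); `TokenRangeSketch` and `rangeOf` are hypothetical names, and the real partitioner works on the full ]-X, X] interval with Murmur3 tokens.

```scala
// Toy lookup: which of the ranges A..H contains a given token?
object TokenRangeSketch {
  // Simplification from the slide: tokens live in ]0, X], 8 equal ranges.
  val X: Long = Long.MaxValue                      // 2^63 - 1, roughly 2^64 / 2
  val ranges: Vector[Char] = ('A' to 'H').toVector
  val width: Long = X / ranges.size

  // Returns the label of the range ]k*X/8, (k+1)*X/8] that contains the token.
  def rangeOf(token: Long): Char = {
    require(token > 0 && token <= X, "token must be in ]0, X]")
    val idx = ((token - 1) / width).toInt
    // X is not an exact multiple of 8, so the last few tokens are clamped into H.
    ranges(math.min(idx, ranges.size - 1))
  }
}
```

Because the upper bound of each range is inclusive, a token equal to X/8 still belongs to A and the next token belongs to B.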

Linear scalability
[Diagram: ring of 8 nodes, n1 to n8, owning the token ranges A to H; the partitions user_id1 to user_id5 are spread over the ring]

Cassandra Query Language (CQL)
INSERT INTO users(login, name, age) VALUES('jdoe', 'John DOE', 33);
UPDATE users SET age = 34 WHERE login = 'jdoe';
DELETE age FROM users WHERE login = 'jdoe';
SELECT age FROM users WHERE login = 'jdoe';

Spark & Cassandra Connector
• Spark Core API
• SparkSQL/DataFrame
• Spark Streaming

Spark/Cassandra connector architecture
• All Cassandra types supported and converted to Scala types
• Server-side data filtering (SELECT … WHERE …)
• Uses the Java driver underneath
• Scala and Java support. Python support via PySpark (exp.)

Connector architecture – Core API
Cassandra tables exposed as Spark RDDs
Read from and write to Cassandra
Mapping of C* tables and rows to Scala objects
• CassandraRDD and CassandraRow
• Scala case class (object mapper)
• Scala tuples

Spark Core
https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData

Connector architecture – DataFrame
Mapping of Cassandra table to DataFrame
• CassandraSQLContext → org.apache.spark.sql.SQLContext
• CassandraSQLRow → org.apache.spark.sql.catalyst.expressions.Row
• Mapping of Cassandra types to Catalyst types
• CassandraCatalog → Catalog (used by Catalyst Analyzer)

Connector architecture – DataFrame
Mapping of Cassandra table to SchemaRDD
• CassandraSourceRelation
  • extends BaseRelation with InsertableRelation with PrunedFilteredScan
• custom query plan
• push predicates to CQL for early filtering (if possible)
SELECT * FROM user_emails WHERE login = 'jdoe';

Spark SQL
https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData

Connector architecture – Spark Streaming
Streaming data INTO Cassandra table
• trivial setup
• be careful about your Cassandra data model when having an infinite stream !!!
Streaming data OUT of Cassandra tables (CASSANDRA-8844) ?
• notification system (publish/subscribe)
• at-least-once delivery semantics
• work in progress …

Spark Streaming
https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData

Q & A

Spark/Cassandra operations
• Cluster deployment & job lifecycle
• Data locality
• Failure handling
• Cross-region operations

Cluster deployment
Stand-alone cluster
[Diagram: 5 nodes, each running Cassandra (C*) and a Spark worker (SparkW); one node also runs the Spark master (SparkM)]

Cassandra – Spark placement
1 Cassandra process ⟷ 1 Spark worker
[Diagram: a Spark master and four Spark workers, each worker paired with one C* node]

Cassandra – Spark job lifecycle
[Diagram in 5 steps: a Spark client runs the driver program and its Spark context ("Define your business logic here!"), next to a Spark master and four Spark workers, each paired with a C* node. Step markers: 1 next to the Spark context, a single 2, then 3, 4 and 5 each repeated four times, once per worker. Spark executors are shown on the workers from step 4 on.]

Data Locality – Cassandra token ranges
A: ]0, X/8]
B: ]X/8, 2X/8]
C: ]2X/8, 3X/8]
D: ]3X/8, 4X/8]
E: ]4X/8, 5X/8]
F: ]5X/8, 6X/8]
G: ]6X/8, 7X/8]
H: ]7X/8, X]
[Diagram: ring of 8 nodes, n1 to n8, each owning one of the token ranges A to H]

Data Locality – How To
[Diagram: 5 nodes, each running C* and a Spark worker (SparkW), one of them also running the Spark master (SparkM); the Spark partitions of an RDD are shown against the Cassandra token ranges]

Data Locality – How To
Use Murmur3Partitioner
[Diagram: same 5-node cluster, each node running C* and a Spark worker, one of them also running the Spark master]

Read data locality
Read from Cassandra

Read data locality
Spark shuffle operations

Write to Cassandra without data locality
Async batches fan-out writes to Cassandra
Because of shuffle, original data locality is lost

Write to Cassandra with data locality
Write to Cassandra
rdd.repartitionByCassandraReplica("keyspace","table")

Write data locality
• either stream data in Spark layer using repartitionByCassandraReplica()
• or flush data to Cassandra by async batches
• in any case, there will be data movement on network (sorry no magic)
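
The idea behind repartitioning by replica can be illustrated with a toy model: rows are regrouped by the node that owns their partition key, so that each group can then be written locally. This is a sketch under loud assumptions: the four-node ring, `ownerOf` and the hash-modulo placement are hypothetical stand-ins, whereas a real cluster places data with Murmur3 tokens and token ranges.

```scala
// Toy model of "repartition by replica" with plain Scala collections.
object RepartitionByReplicaSketch {
  // Hypothetical ring of 4 nodes.
  val nodes: Vector[String] = Vector("n1", "n2", "n3", "n4")

  // Stand-in for the partitioner (assumption: hash modulo instead of token ranges).
  def ownerOf(partitionKey: String): String =
    nodes(Math.floorMod(partitionKey.hashCode, nodes.size))

  // Regroup rows so that each group can be written by the worker
  // running next to the node that owns those rows.
  def repartitionByReplica[V](rows: Seq[(String, V)]): Map[String, Seq[(String, V)]] =
    rows.groupBy { case (key, _) => ownerOf(key) }
}
```

The regrouping itself still moves rows between workers, which matches the last bullet: the network cost is paid either in Spark or on the Cassandra side.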

Joins with data locality
CREATE TABLE artists(name text, style text, … PRIMARY KEY(name));
CREATE TABLE albums(title text, artist text, year int, … PRIMARY KEY(title));
val join: CassandraJoinRDD[(String,Int), (String,String)] =
  sc.cassandraTable[(String,Int)](KEYSPACE, ALBUMS)
    // Select only useful columns for join and processing
    .select("artist","year")
    .as((_:String, _:Int))
    // Repartition RDDs by "artists" PK, which is "name"
    .repartitionByCassandraReplica(KEYSPACE, ARTISTS)
    // Join with "artists" table, selecting only "name" and "country" columns
    .joinWithCassandraTable[(String,String)](KEYSPACE, ARTISTS, SomeColumns("name","country"))
    .on(SomeColumns("name"))

Joins pipeline with data locality
① LOCAL READ FROM CASSANDRA
② SHUFFLE DATA WITH SPARK
③ REPARTITION TO MAP CASSANDRA REPLICA
④ JOIN WITH DATA LOCALITY
⑤ ANOTHER ROUND OF SHUFFLING
⑥ REPARTITION AGAIN FOR CASSANDRA
⑦ SAVE TO CASSANDRA WITH LOCALITY

Perfect data locality scenario
• read locally from Cassandra
• use operations that do not require shuffle in Spark (map, filter, …)
• repartitionByCassandraReplica()
  → to a table having same partition key as original table
• save back into this Cassandra table
USE CASE: sanitize, validate, normalize, transform data

Failure handling
Stand-alone cluster
[Diagram: 5 nodes, each running C* and a Spark worker (SparkW), one of them also running the Spark master (SparkM)]

Failure handling
What if 1 node down ?
What if 1 node overloaded ?
☞ Spark master will re-assign the job to another worker
[Diagram: same 5-node stand-alone cluster]

Failure handling
Oh no, my data locality !!!

Data Locality Impl
RDD interface (extract)
abstract class RDD[T](…) {
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  protected def getPartitions: Array[Partition]

  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}

CassandraRDD
def getPreferredLocations(split: Partition): IP addresses of the Cassandra replicas that hold the data of this Spark partition
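
The contract of getPreferredLocations can be pictured with a toy model in which each partition covers one token range and remembers the nodes that store it. Everything in this sketch is hypothetical (the `TokenRangePartition` class, the IP addresses, the RF = 2 placement on "owner plus next node"); it only shows the shape of the information handed to the scheduler, not the connector's code.

```scala
// Toy model: one partition per token range, preferred locations = its replicas.
object PreferredLocationsSketch {
  // Hypothetical stand-in for a Spark partition backed by one Cassandra token range.
  final case class TokenRangePartition(index: Int, range: Char, replicas: Seq[String])

  // Hypothetical 4-node ring with RF = 2: a range lives on its owner and on the next node.
  val nodes: Vector[String] = Vector("10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4")

  val partitions: Vector[TokenRangePartition] =
    Vector('A', 'B', 'C', 'D').zipWithIndex.map { case (range, i) =>
      TokenRangePartition(i, range, Seq(nodes(i), nodes((i + 1) % nodes.size)))
    }

  // Same role as getPreferredLocations: tell the scheduler where the data lives.
  def preferredLocations(split: TokenRangePartition): Seq[String] = split.replicas
}
```

With a Spark worker running on every Cassandra node, a task scheduled on one of these addresses reads its token range from the local node.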

Failure handling
If RF > 1 the Spark master chooses the next preferred location, which is a replica 😎
Tune parameters:
① spark.locality.wait
② spark.locality.wait.process
③ spark.locality.wait.node
[Diagram: same 5-node stand-alone cluster]
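
The fallback rule on this slide can be sketched as a one-line selection: take the first preferred location that is still alive. `LocalityFailoverSketch` and `pickLocation` are hypothetical names, and the sketch leaves out timing; in Spark the spark.locality.wait settings control how long the scheduler waits for a local slot before accepting a less local one.

```scala
// Toy model of "choose the next preferred location".
object LocalityFailoverSketch {
  // Returns the first preferred location that is alive.
  // None means no replica is reachable: the task runs elsewhere and locality is lost.
  def pickLocation(preferred: Seq[String], alive: Set[String]): Option[String] =
    preferred.find(alive.contains)
}
```

With RF = 1 the preferred list has a single entry, so losing that node always ends in the `None` case.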

Cross-DC operations
val confDC1 = new SparkConf(true)
  .setAppName("data_migration")
  .setMaster("master_ip")
  .set("spark.cassandra.connection.host", "DC_1_hostnames")
  .set("spark.cassandra.connection.local_dc", "DC_1")
val confDC2 = new SparkConf(true)
  .setAppName("data_migration")
  .setMaster("master_ip")
  .set("spark.cassandra.connection.host", "DC_2_hostnames")
  .set("spark.cassandra.connection.local_dc", "DC_2")
val sc = new SparkContext(confDC1)
// Read from DC_1, write to DC_2 through an explicit connector
sc.cassandraTable[Performer](KEYSPACE,PERFORMERS)
  .map[Performer](???)
  .saveToCassandra(KEYSPACE,PERFORMERS)
  (CassandraConnector(confDC2),implicitly[RowWriterFactory[Performer]])

Cross-cluster operations
val confCluster1 = new SparkConf(true)
  .setAppName("data_migration")
  .setMaster("master_ip")
  .set("spark.cassandra.connection.host", "cluster_1_hostnames")
val confCluster2 = new SparkConf(true)
  .setAppName("data_migration")
  .setMaster("master_ip")
  .set("spark.cassandra.connection.host", "cluster_2_hostnames")
val sc = new SparkContext(confCluster1)
sc.cassandraTable[Performer](KEYSPACE,PERFORMERS)
  .map[Performer](???)
  .saveToCassandra(KEYSPACE,PERFORMERS)
  (CassandraConnector(confCluster2),implicitly[RowWriterFactory[Performer]])

Spark/Cassandra use-case demos
• Data cleaning
• Schema migration
• Analytics

Use Cases
• Load data from various sources
• Analytics (join, aggregate, transform, …)
• Sanitize, validate, normalize, transform data
• Schema migration, data conversion

Data cleaning use-cases
Bug in your application ?
Dirty input data ?
☞ Spark job to clean it up! (perfect data locality)
Sanitize, validate, normalize, transform data
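
A cleaning job of this kind boils down to a per-row function applied with map, which needs no shuffle. The sketch below is an illustration only: the `Performer` fields and the cleaning rules are assumptions, not the schema used in the demo. The partition key (`name`) is left untouched so that a cleaned row still belongs to the same replica.

```scala
// Toy per-row cleaning function, the kind of logic one would pass to map().
object DataCleaningSketch {
  // Hypothetical row shape; the real demo table may differ.
  final case class Performer(name: String, country: Option[String], styles: List[String])

  // Normalize the non-key columns only:
  // - country: trimmed, upper-cased, dropped when empty
  // - styles: trimmed, lower-cased, empty entries and duplicates removed
  def clean(p: Performer): Performer =
    p.copy(
      country = p.country.map(_.trim.toUpperCase).filter(_.nonEmpty),
      styles = p.styles.map(_.trim.toLowerCase).filter(_.nonEmpty).distinct
    )
}
```

Since `clean` looks at one row at a time, it fits the "perfect data locality" scenario: read locally, map, write back to the same partition.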

Data Cleaning
https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData

Schema migration use-cases
Business requirements change with time ?
Current data model no longer relevant ?
☞ Spark job to migrate data !
Schema migration, data conversion

Data Migration
https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData

Analytics use-cases
Given existing tables of performers and albums, I want to:
• count the number of albums released by decade (70’s, 80’s, 90’s, …)
☞ Spark job to compute analytics !
Analytics (join, aggregate, transform, …)
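
The aggregation itself is small. As a minimal sketch with plain Scala collections (an assumption for illustration: `Album` mirrors only the title, artist and year columns, and `decade` is a hypothetical helper), counting albums by decade looks like this:

```scala
// Toy version of the "albums by decade" aggregation, without Spark.
object AlbumsByDecadeSketch {
  final case class Album(title: String, artist: String, year: Int)

  // 1975 -> "1970s": integer division drops the last digit of the year.
  def decade(year: Int): String = s"${year / 10 * 10}s"

  // Group albums by decade and count each group.
  def countByDecade(albums: Seq[Album]): Map[String, Int] =
    albums.groupBy(a => decade(a.year)).map { case (d, group) => d -> group.size }
}
```

In a Spark job the same logic would typically key each album by its decade and then count per key, which is the step that requires a shuffle.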

Analytics pipeline
① Read from production transactional tables
② Perform aggregation with Spark
③ Save back data into dedicated tables for fast visualization
④ Repeat step ①

Analytics
https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData