cassandra spark connector
TRANSCRIPT
@doanduyhai
Cassandra Spark Connector
DuyHai DOAN, Technical Advocate

Who Am I?
Duy Hai DOAN, Cassandra technical advocate
• talks, meetups, confs
• open-source devs (Achilles, …)
• OSS Cassandra point of contact
☞ [email protected] ☞ @doanduyhai

Datastax
• Founded in April 2010
• We contribute a lot to Apache Cassandra™
• 400+ customers (25 of the Fortune 100), 200+ employees
• Headquartered in the San Francisco Bay Area
• EU headquarters in London, offices in France and Germany
• Datastax Enterprise = OSS Cassandra + extra features

Spark/C* Connector Architecture
Token ranges reminder
Stand-alone cluster deployment
Data locality

Spark eco-system
Cluster Manager: Local, Standalone cluster, YARN, Mesos
Spark Core Engine (Scala/Java/Python)
Spark Streaming, MLlib, GraphX, Spark SQL
Persistence
…

Data distribution
Random: hash of #partition → token = hash(#p)
Hash: ]-X, X]
X = huge number (2^64/2)
[Figure: ring of eight nodes, n1 to n8]

Normal token ranges
A: ]0, X/8]
B: ]X/8, 2X/8]
C: ]2X/8, 3X/8]
D: ]3X/8, 4X/8]
E: ]4X/8, 5X/8]
F: ]5X/8, 6X/8]
G: ]6X/8, 7X/8]
H: ]7X/8, X]
[Figure: ring of eight nodes, n1 to n8, each owning one of the token ranges A to H]
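
The token-range slides can be illustrated with a small sketch. It assumes the simplified layout from the slide (eight equal ranges over ]0, X], each half-open on the left) and takes the token as already computed; the hash function itself is not implemented here. The names `TokenRing`, `TokenRange`, `normalRanges` and `ownerOf` are hypothetical and are not part of Cassandra or the connector.

```scala
object TokenRing {
  // X from the slide: a huge number, 2^64 / 2.
  val X: BigInt = BigInt(2).pow(64) / 2

  // A range ]start, end] owned by one node (start excluded, end included).
  final case class TokenRange(start: BigInt, end: BigInt, node: String) {
    def contains(token: BigInt): Boolean = token > start && token <= end
  }

  // Split ]0, X] into equal ranges, one per node, in the order given.
  def normalRanges(nodes: Seq[String]): Seq[TokenRange] = {
    val n = BigInt(nodes.size)
    nodes.zipWithIndex.map { case (node, i) =>
      TokenRange(X * BigInt(i) / n, X * BigInt(i + 1) / n, node)
    }
  }

  // Find the node whose range contains the token, if any.
  def ownerOf(token: BigInt, ranges: Seq[TokenRange]): Option[String] =
    ranges.find(_.contains(token)).map(_.node)
}
```

With eight nodes named n1 to n8, a token equal to X/8 falls in range A (node n1), while X/8 + 1 already belongs to range B (node n2).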

Cassandra Query Language (CQL)
INSERT INTO users(login, name, age) VALUES('jdoe', 'John DOE', 33);
UPDATE users SET age = 34 WHERE login = 'jdoe';
DELETE age FROM users WHERE login = 'jdoe';
SELECT age FROM users WHERE login = 'jdoe';

Why Spark on Cassandra?
For Spark
• Fast disk access
• Structured data (columnar format)
• Multi data-center !!!
For Cassandra
• Cross-table operations (JOIN, UNION, etc.)
• Real-time/batch processing
• Complex analytics (e.g. machine learning)

Connector objectives
• Data locality
• Resources-efficient, performant
• Fluent & friendly API
• Object mapper

Cluster deployment
Stand-alone cluster
[Figure: five nodes, each running C* and a Spark worker (SparkW); the first node also runs the Spark master (SparkM)]

Cluster deployment
Cassandra – Spark placement
[Figure: Driver Program, Spark Master, and four Spark Workers, each with an Executor and a C* node]

Connector architecture – Core API
Cassandra tables exposed as Spark RDDs
Read from and write to Cassandra
Mapping of C* tables and rows to Scala objects
• CassandraRow
• case class (object mapper)
• Scala tuples
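
To make the three mapping styles concrete, here is a minimal sketch in plain Scala. It does not use the connector: `Row` is a hypothetical stand-in for a generic row such as CassandraRow, and `WordCount`, `toCaseClass` and `toTuple` are illustrative names, built around the word/count columns of the deck's test table.

```scala
object RowMapping {
  // Hypothetical generic row: column name -> value, read through typed getters.
  final case class Row(columns: Map[String, Any]) {
    def getString(name: String): String = columns(name).asInstanceOf[String]
    def getInt(name: String): Int = columns(name).asInstanceOf[Int]
  }

  // Case-class mapping: one field per column.
  final case class WordCount(word: String, count: Int)

  def toCaseClass(row: Row): WordCount =
    WordCount(row.getString("word"), row.getInt("count"))

  // Tuple mapping: columns in a fixed order.
  def toTuple(row: Row): (String, Int) =
    (row.getString("word"), row.getInt("count"))
}
```

The generic row keeps column access dynamic, while the case class and tuple forms give compile-time types at the cost of fixing the column set up front.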

Connector architecture – Spark SQL
Mapping of C* table to SchemaRDD
• custom query plan
• CassandraRDD → SchemaRDD
• push predicates to CQL

Connector architecture – Spark Streaming
Streaming data INTO Cassandra table
• trivial setup
• be careful about your Cassandra data model !!!
Streaming data OUT of Cassandra table
• fetch all data from table
• send each row as a DStream

Connector architecture
All Cassandra types supported and converted to Scala types
Server-side data selection (SELECT … WHERE …)
Uses the Java driver underneath
Scala and Java support

Data Locality
[Figure: five nodes, each running C* and a Spark worker (one also runs the Spark master); the Spark partitions of the RDD line up with the Cassandra token ranges]

Data Locality
Use Murmur3Partitioner

Data locality
Read/Write from/to Cassandra
Spark shuffle operations

Data Locality
Remember the RDD interface?
abstract class RDD[T](…) {
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  protected def getPartitions: Array[Partition]

  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}

Data Locality
getPartitions:
1. fetch all token ranges and their corresponding nodes from C* (describe_ring method)
2. group token ranges together so that 1 Spark partition = n token ranges belonging to the same node
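
The grouping in step 2 can be sketched in plain Scala. This is an illustration only, not the connector's code: it assumes the ring description has already been fetched as a list of ranges with their owning node, and `TokenRange`, `SparkPartition`, `group` and the `rangesPerPartition` parameter (the "n" of the slide) are hypothetical names.

```scala
object PartitionGrouping {
  // One token range and the node that owns it (as returned by a ring description).
  final case class TokenRange(start: Long, end: Long, node: String)

  // One Spark partition: several token ranges that all live on the same node.
  final case class SparkPartition(index: Int, node: String, ranges: Seq[TokenRange])

  def group(ring: Seq[TokenRange], rangesPerPartition: Int): Seq[SparkPartition] = {
    require(rangesPerPartition > 0, "rangesPerPartition must be positive")
    // Group by owning node first, so a partition never mixes nodes.
    val byNode = ring.groupBy(_.node).toSeq.sortBy(_._1)
    // Then cut each node's ranges into chunks of at most rangesPerPartition.
    val chunks = byNode.flatMap { case (node, ranges) =>
      ranges.sortBy(_.start).grouped(rangesPerPartition).map(chunk => (node, chunk)).toSeq
    }
    chunks.zipWithIndex.map { case ((node, chunk), i) => SparkPartition(i, node, chunk) }
  }
}
```

Because every partition carries a single node name, that name can later be handed to the scheduler as the preferred location for the task.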

Data Locality
def getPreferredLocations(split: Partition): Cassandra node IP corresponding to this Spark partition
def compute(split: Partition, context: TaskContext): read from Cassandra/write to Cassandra
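
As a rough sketch of how these two methods cooperate, the following plain-Scala model mimics the shape of the RDD contract without depending on Spark. `MiniRDD` is a simplified stand-in (it drops the TaskContext argument), and `InMemoryCassandraRDD` reads from an in-memory map instead of a real Cassandra node; both names are hypothetical.

```scala
object LocalitySketch {
  // A partition that remembers which Cassandra node holds its data.
  final case class Partition(index: Int, nodeIp: String)

  // Simplified stand-in for Spark's RDD contract (not the real class).
  trait MiniRDD[T] {
    def getPartitions: Array[Partition]
    def getPreferredLocations(split: Partition): Seq[String] = Nil
    def compute(split: Partition): Iterator[T]
  }

  // Data is keyed by node IP; each node becomes one partition.
  final class InMemoryCassandraRDD(dataByNode: Map[String, Seq[String]])
      extends MiniRDD[String] {

    def getPartitions: Array[Partition] =
      dataByNode.keys.toSeq.sorted.zipWithIndex
        .map { case (ip, i) => Partition(i, ip) }
        .toArray

    // Tell the scheduler where the data lives so the task can run next to it.
    override def getPreferredLocations(split: Partition): Seq[String] =
      Seq(split.nodeIp)

    // Read only the rows held by the partition's node.
    def compute(split: Partition): Iterator[String] =
      dataByNode(split.nodeIp).iterator
  }
}
```

The default of `Nil` means "no preference"; overriding it with the node IP is what lets a scheduler place the task on the worker colocated with that node.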

Q & A

Connector API & Usage
Resources handling
Connector API
Live demo

Resources Handling
Open connections to C* cluster
Connections pooled (using ref counting) on each executor
Scala Loan Pattern
connector.withSessionDo {
  session => session.execute("SELECT xxx FROM yyy").all()
}
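
The combination of reference counting and the loan pattern can be sketched as follows. This is a simplified model under stated assumptions, not the connector's implementation: `FakeSession` stands in for a driver session, `RefCountedConnector` is a hypothetical class, and the real connector's pooling policy may differ (for example in when it actually closes connections).

```scala
object LoanPatternSketch {
  // Hypothetical stand-in for a driver session.
  final class FakeSession {
    var closed = false
    def execute(query: String): String = {
      require(!closed, "session is closed")
      s"ran: $query"
    }
    def close(): Unit = { closed = true }
  }

  // Shares one session between callers and closes it when the last one is done.
  final class RefCountedConnector(open: () => FakeSession) {
    private var session: Option[FakeSession] = None
    private var refCount = 0

    private def acquire(): FakeSession = synchronized {
      if (session.isEmpty) session = Some(open())
      refCount += 1
      session.get
    }

    private def release(): Unit = synchronized {
      refCount -= 1
      if (refCount == 0) {
        session.foreach(_.close())
        session = None
      }
    }

    // Loan pattern: the caller borrows the session and never closes it itself.
    def withSessionDo[T](body: FakeSession => T): T = {
      val s = acquire()
      try body(s) finally release()
    }
  }
}
```

The caller only ever sees the session inside the block, so cleanup cannot be forgotten, and nested or concurrent borrowers reuse the same underlying session until the count drops back to zero.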

Connector API
Connecting to Cassandra
import org.apache.spark.{SparkConf, SparkContext}
// Import Cassandra-specific functions on SparkContext and RDD objects
import com.datastax.driver.spark._

// Spark connection options
val conf = new SparkConf(true)
  .setMaster("spark://192.168.123.10:7077")
  .setAppName("cassandra.demo")
  .set("cassandra.connection.host", "192.168.123.10") // initial contact
  .set("cassandra.username", "cassandra")
  .set("cassandra.password", "cassandra")

val sc = new SparkContext(conf)

Connector API
Preparing test data
CREATE TABLE test.words (word text PRIMARY KEY, count int);

INSERT INTO test.words (word, count) VALUES ('bar', 30);
INSERT INTO test.words (word, count) VALUES ('foo', 20);

Connector API
Reading from Cassandra
// Use table as RDD
val rdd = sc.cassandraTable("test", "words")
// rdd: CassandraRDD[CassandraRow] = CassandraRDD[0]

rdd.toArray.foreach(println)
// CassandraRow[word: bar, count: 30]
// CassandraRow[word: foo, count: 20]

rdd.columnNames    // Stream(word, count)
rdd.size           // 2

val firstRow = rdd.first
// firstRow: CassandraRow = CassandraRow[word: bar, count: 30]

firstRow.getInt("count")  // Int = 30

Connector API
Writing data to Cassandra
val newRdd = sc.parallelize(Seq(("cat", 40), ("fox", 50)))
// newRdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2]

newRdd.saveToCassandra("test", "words", Seq("word", "count"))

SELECT * FROM test.words;

 word | count
------+-------
  bar |    30
  foo |    20
  cat |    40
  fox |    50
Demo
https://github.com/doanduyhai/Cassandra-Spark-Demo

Q & A

DSE features

Use Cases
Load data from various sources
Analytics (join, aggregate, transform, …)
Sanitize, validate, normalize data
Schema migration, Data conversion

Without DSE
[Figure: five nodes, each running C* and a Spark worker (SparkW); the first node also runs the Spark master (SparkM)]

With DSE
Master state in C*
Spare master for H/A
[Figure: same five-node cluster; one Spark worker is marked SparkW*]

Spark/Cassandra integration
Master state
• saved in Cassandra
Integration
• packaging
• start-up script (dse -k)
• fine tuning for resources (CPU, memory …)
• more to come …

Multi-DC with Spark
Workload segregation with virtual DC
[Figure: two rings in the same physical DC, a Production (Live) ring of eight nodes (n1 to n8) and an Analytics with Spark ring of five nodes (n1 to n5), linked by async replication]

Q & A