Cassandra Spark Connector
Posted on 17-Jul-2015
@doanduyhai
Cassandra Spark Connector DuyHai DOAN, Technical Advocate
@doanduyhai
Who Am I ?!
2
Duy Hai DOAN, Cassandra technical advocate
• talks, meetups, confs
• open-source devs (Achilles, …)
• OSS Cassandra point of contact
☞ duy_hai.doan@datastax.com ☞ @doanduyhai
@doanduyhai
Datastax!
3
• Founded in April 2010
• We contribute a lot to Apache Cassandra™
• 400+ customers (25 of the Fortune 100), 200+ employees
• Headquartered in the San Francisco Bay Area
• EU headquarters in London, offices in France and Germany
• Datastax Enterprise = OSS Cassandra + extra features
Spark/C* Connector Architecture!
Token ranges reminder!
Stand-alone cluster deployment!
Data locality!
@doanduyhai
Spark eco-system!
5
[Diagram: Spark Core Engine (Scala/Java/Python), with Spark Streaming, MLlib, GraphX and Spark SQL on top; cluster managers: Local, Standalone cluster, YARN, Mesos, …; persistence layer underneath]
@doanduyhai
Data distribution!
Random distribution: token = hash(#partition_key)
Token range: ]-X, X] where X is a huge number (2^64 / 2)
[Ring diagram: 8 nodes, n1 … n8]
7
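The hash-to-token mapping above can be sketched in plain Scala. The hash below is a deterministic stand-in, not Cassandra's Murmur3, and the names are illustrative:

```scala
// Sketch: a partition key is hashed to a token in ]-X, X].
// The hash is a stand-in (NOT Murmur3); only the shape of the
// mapping matters: same key -> same token -> same owning node.
object TokenDemo {
  // Upper bound of the token range ]-X, X]: a huge number (2^64 / 2)
  val X: Long = Long.MaxValue

  // Deterministic stand-in hash of the partition key
  def token(partitionKey: String): Long =
    partitionKey.foldLeft(1125899906842597L)((h, c) => 31 * h + c.toLong)
}
```

Because the mapping is deterministic, any node can compute which node owns a given key without coordination.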
@doanduyhai
Normal token ranges!
A: ]0, X/8]
B: ]X/8, 2X/8]
C: ]2X/8, 3X/8]
D: ]3X/8, 4X/8]
E: ]4X/8, 5X/8]
F: ]5X/8, 6X/8]
G: ]6X/8, 7X/8]
H: ]7X/8, X]
[Ring diagram: nodes n1 … n8, each owning one range A … H]
8
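A minimal sketch of this even split, assuming X = 2^63 - 1 (Murmur3's upper bound) and using BigInt to avoid overflow; TokenRange and split are illustrative names, not connector API:

```scala
// Sketch: split the token space ]0, X] into n contiguous
// half-open ranges, one per node, as on the slide.
object RangeSplit {
  case class TokenRange(start: BigInt, end: BigInt) // half-open: ]start, end]

  def split(x: BigInt, n: Int): Seq[TokenRange] =
    (0 until n).map(i => TokenRange(x * i / n, x * (i + 1) / n))
}
```

split(BigInt(2).pow(63) - 1, 8) reproduces the eight ranges A … H above.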
@doanduyhai
Cassandra Query Language (CQL)!
9
INSERT INTO users(login, name, age) VALUES('jdoe', 'John DOE', 33);
UPDATE users SET age = 34 WHERE login = 'jdoe';
DELETE age FROM users WHERE login = 'jdoe';
SELECT age FROM users WHERE login = 'jdoe';
@doanduyhai
Why Spark on Cassandra ?!
10
For Spark:
• fast disk access
• structured data (columnar format)
• multi data-center !!!
For Cassandra:
• cross-table operations (JOIN, UNION, etc.)
• real-time/batch processing
• complex analytics (e.g. machine learning)
@doanduyhai
Connector objectives!
11
Data locality
Resources-efficient, performant
Fluent & friendly API
Object mapper
@doanduyhai
Cluster deployment!
12
[Diagram: 5 nodes, each running C* + a Spark Worker; one node also hosts the Spark Master]
Stand-alone cluster
@doanduyhai
Cluster deployment!
13
Cassandra – Spark placement
[Diagram: Driver Program connects to the Spark Master; four Spark Workers, each running an Executor and co-located with a C* node]
@doanduyhai
Connector architecture – Core API!
14
Cassandra tables exposed as Spark RDDs
Read from and write to Cassandra
Mapping of C* tables and rows to Scala objects
• CassandraRow
• case class (object mapper)
• Scala tuples
@doanduyhai
Connector architecture – Spark SQL !
15
Mapping of C* table to SchemaRDD
• custom query plan
• CassandraRDD → SchemaRDD
• push predicates to CQL
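"Push predicates to CQL" can be sketched as splitting a query's filters into those that go into the CQL SELECT and those Spark must evaluate itself. The Filter type and the pushability rule (equality on a pushable column) are simplified assumptions, not the connector's actual planner:

```scala
// Sketch: partition query filters into CQL-pushable vs Spark-side.
// Real pushdown rules are richer (clustering columns, token ranges, ...).
object PredicatePushdown {
  case class Filter(column: String, op: String, value: Any)

  def split(filters: Seq[Filter], pushable: Set[String]): (Seq[Filter], Seq[Filter]) =
    filters.partition(f => pushable.contains(f.column) && f.op == "=")
}
```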
@doanduyhai
Connector architecture – Spark Streaming !
16
Streaming data INTO Cassandra table
• trivial setup
• be careful about your Cassandra data model !!!
Streaming data OUT of Cassandra table
• fetch all data from the table
• send each row as a DStream
@doanduyhai
Connector architecture!
17
All Cassandra types supported and converted to Scala types
Server-side data selection (SELECT … WHERE …)
Uses the Java driver underneath
Scala and Java support
@doanduyhai
Data Locality!
18
[Diagram: each Spark RDD partition mapped to the Cassandra token ranges of its local node (legend: Spark partition RDD, Cassandra token ranges)]
@doanduyhai
Data Locality!
19
[Diagram: C* + Spark Worker on every node, Spark Master on one node]
Use Murmur3Partitioner
@doanduyhai
Data locality!
20
Data locality is preserved when reading from/writing to Cassandra
Data locality is lost during Spark shuffle operations
@doanduyhai
Data Locality!
21
Remember the RDD interface?
abstract class RDD[T](…) {
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  protected def getPartitions: Array[Partition]

  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}
@doanduyhai
Data Locality!
22
getPartitions:
1. fetch all token ranges and their corresponding nodes from C* (describe_ring method)
2. group token ranges together so that 1 Spark partition = n token ranges belonging to the same node
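Step 2 can be sketched as a plain groupBy; the TokenRange and CassandraPartition types below are simplified stand-ins for the connector's internal classes:

```scala
// Sketch: group (token range -> node) pairs so each Spark partition
// holds only ranges owned by one node, giving data locality a chance.
object PartitionGrouping {
  case class TokenRange(start: Long, end: Long, node: String)
  case class CassandraPartition(index: Int, node: String, ranges: Seq[TokenRange])

  def group(ranges: Seq[TokenRange]): Seq[CassandraPartition] =
    ranges.groupBy(_.node).toSeq.sortBy(_._1).zipWithIndex.map {
      case ((node, rs), i) => CassandraPartition(i, node, rs)
    }
}
```

getPreferredLocations can then simply return the node of the given partition.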
@doanduyhai
Data Locality!
23
getPreferredLocations(split: Partition): returns the Cassandra node IP corresponding to this Spark partition
compute(split: Partition, context: TaskContext): reads from/writes to Cassandra
Q & A
Connector API & Usage!
Resources handling!
Connector API!
Live demo!
@doanduyhai
Resources Handling!
26
Open connections to C* cluster
Connections pooled (using ref counting) on each executor
Scala Loan Pattern
connector.withSessionDo { session =>
  session.execute("SELECT xxx FROM yyy").all()
}
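The Loan Pattern itself can be sketched without the driver; Session here is a hypothetical stand-in that only tracks whether it was released:

```scala
// Sketch of the Scala Loan Pattern: the caller borrows a session,
// the wrapper guarantees release even if the body throws.
object LoanPattern {
  class Session { var closed = false; def close(): Unit = closed = true }

  def withSessionDo[T](body: Session => T): T = {
    val session = new Session
    try body(session) finally session.close()
  }
}
```

In the connector the borrowed session additionally comes from a reference-counted pool, so repeated calls on the same executor reuse the underlying connection.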
@doanduyhai
Connector API!
27
Connecting to Cassandra
// Import Cassandra-specific functions on SparkContext and RDD objects
import com.datastax.driver.spark._

// Spark connection options
val conf = new SparkConf(true)
  .setMaster("spark://192.168.123.10:7077")
  .setAppName("cassandra-demo")
  .set("cassandra.connection.host", "192.168.123.10") // initial contact
  .set("cassandra.username", "cassandra")
  .set("cassandra.password", "cassandra")

val sc = new SparkContext(conf)
@doanduyhai
Connector API!
28
Preparing test data
CREATE TABLE test.words (word text PRIMARY KEY, count int);

INSERT INTO test.words (word, count) VALUES ('bar', 30);
INSERT INTO test.words (word, count) VALUES ('foo', 20);
@doanduyhai
Connector API!
29
Reading from Cassandra
// Use table as RDD
val rdd = sc.cassandraTable("test", "words")
// rdd: CassandraRDD[CassandraRow] = CassandraRDD[0]

rdd.toArray.foreach(println)
// CassandraRow[word: bar, count: 30]
// CassandraRow[word: foo, count: 20]

rdd.columnNames  // Stream(word, count)
rdd.size         // 2

val firstRow = rdd.first
// firstRow: CassandraRow = CassandraRow[word: bar, count: 30]

firstRow.getInt("count")  // Int = 30
@doanduyhai
Connector API!
30
Writing data to Cassandra
val newRdd = sc.parallelize(Seq(("cat", 40), ("fox", 50)))
// newRdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2]

newRdd.saveToCassandra("test", "words", Seq("word", "count"))
SELECT * FROM test.words;

 word | count
------+-------
  bar |    30
  foo |    20
  cat |    40
  fox |    50
Demo
https://github.com/doanduyhai/Cassandra-Spark-Demo
Q & A
DSE features!
@doanduyhai
Use Cases!
34
Load data from various sources
Analytics (join, aggregate, transform, …)
Sanitize, validate, normalize data
Schema migration, Data conversion
@doanduyhai
Without DSE!
35
[Diagram: 5 nodes, each running C* + a Spark Worker; a single Spark Master with no failover]
@doanduyhai
With DSE!
36
[Diagram: 5 nodes, each running C* + a Spark Worker; Spark Master plus a spare master]
Master state in C*
Spare master for H/A
@doanduyhai
Spark/Cassandra integration!
37
Master state • saved in Cassandra
Integration
• packaging
• start-up script (dse -k)
• fine tuning for resources (CPU, memory, …)
• more to come …
@doanduyhai
Multi-DC with Spark!
38
Workload segregation with virtual DC
[Diagram: two virtual data centers inside the same physical DC: Production (Live) with nodes n1 … n8, and Analytics with Spark with nodes n1 … n5, linked by async replication]
Q & A
Thank You @doanduyhai
duy_hai.doan@datastax.com
https://academy.datastax.com/