Spark Cassandra Connector: Past, Present and Future
TRANSCRIPT
Brian Hess, Sr. Product Manager, Analytics
DataStax
The Past: Hadoop and C*
Hadoop integration with C* required a bit of knowledge and was generally not very easy.
MapReduce Code
public static class ReducerToCassandra
    extends Reducer<Text, IntWritable, Map<String, ByteBuffer>, List<ByteBuffer>> {

  private Map<String, ByteBuffer> keys;
  private ByteBuffer key;

  protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
      throws IOException, InterruptedException {
    keys = new LinkedHashMap<String, ByteBuffer>();
  }

  public void reduce(Text word, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values)
      sum += val.get();
    keys.put("word", ByteBufferUtil.bytes(word.toString()));
    context.write(keys, getBindVariables(word, sum));
  }

  private List<ByteBuffer> getBindVariables(Text word, int sum) {
    List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
    variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));
    return variables;
  }
}
Hadoop Interfaces are … difficult
© 2015. All Rights Reserved.
https://github.com/apache/cassandra/blob/trunk/examples/hadoop_cql3_word_count/src/WordCount.java
Even simple integration with a Hadoop cluster took a lot of experience to get right.
Well at least you have Pig built in, right?

moredata = load 'cql://cql3ks/compmore' USING CqlNativeStorage;
insertformat = FOREACH moredata GENERATE
    TOTUPLE(TOTUPLE('a',x), TOTUPLE('b',y), TOTUPLE('c',z)),
    TOTUPLE(data);
STORE insertformat INTO 'cql://cql3ks/compotable?output_query=UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING CqlNativeStorage;

Even simple integration with a Hadoop cluster took a lot of experience to get right.
Spark Offers a New Path
• Core Libraries for ML/Streaming
• No need for HDFS/Hadoop
• Easy integration with other Data Sources
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
RDD API
df.groupBy("age").count().show()
Dataframes API
head(filter(df, df$waiting < 50))
R API
SELECT name FROM people
SQL API
Enter The Spark Cassandra Connector
First Public Release at the Spark Summit in June 2014
"If you write a Spark application that needs access to Cassandra, this library is for you" - Piotr Kołaczkowski
https://github.com/datastax/spark-cassandra-connector
Open Source Software
1394 Commits, 28 Contributors
Why do we even want a Distributed Analytics tool?
• Generating Reports
• Direct Analytics on our data
• Cassandra Maintenance
  • Making new views
  • Changing partition keys
• Streaming
• Machine Learning
• ETL Data between different sources
We have small questions and big questions, and they need to work in different ways.

How many shoes did Marty buy? (Marty's Purchase History: one small slice of the BIG DATA)

How many shoes were sold last year compared to this year, grouped by demographic? (All Shoe Data, read part by part)
When we actually want to work with large amounts of data we break it into parts
Distributed FS/databases already do this for us.

[Diagram: Node1 through Node4, each holding part of the shoe data]
Spark describes underlying large multi-machine sets of data using The RDD (Resilient Distributed Dataset)
[Diagram: one RDD whose Spark Partitions line up with the part of the shoe data held on each of Node1 through Node4]
In Cassandra this distribution is mapped out by token ranges
[Diagram: token ranges 1-10000, 10001-20000, 20001-30000 and 30001-40000 assigned to Node1 through Node4, each holding its part of the shoe data]
This distribution is key to how Cassandra handles OLTP Requests
How many shoes did Marty buy?

SELECT amount FROM orders WHERE customer = martyID

martyId -> Token -> 3470
Token 3470 falls in Node1's range (1-10000), so Cassandra looks up Marty's data on that node.

[Diagram: the same token-range ring as above, Node1 through Node4]
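The routing idea above can be sketched in plain Scala. This is a toy model using the made-up ranges from the diagram, not real Murmur3 tokens:

```scala
// Toy model of Cassandra's token-based routing, using the made-up ranges from
// the diagram above (real clusters hash partition keys with Murmur3 into a
// much larger token space).
object TokenLookup {
  // Each node owns one inclusive token range.
  val ring = Seq(
    (1L, 10000L, "Node1"),
    (10001L, 20000L, "Node2"),
    (20001L, 30000L, "Node3"),
    (30001L, 40000L, "Node4"))

  // A token falls inside exactly one range, which names the node to ask.
  def ownerOf(token: Long): Option[String] =
    ring.collectFirst { case (lo, hi, node) if token >= lo && token <= hi => node }

  def main(args: Array[String]): Unit = {
    // martyId hashes to token 3470, which lands in Node1's range.
    println(ownerOf(3470L))  // Some(Node1)
  }
}
```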
The Connector Maps Cassandra Tokens to Spark Partitions
sc.cassandraTable("keyspace","tablename")

[Diagram: the CassandraRDD splits each node's token range into smaller Spark partitions: Node1's 1-10000 becomes 00001-02500, 02501-05000, 05001-07500 and 07501-10000, and so on for each node up to 37501-40000]
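The splitting itself can be sketched with the same toy numbers. This is not the connector's real internals (which size splits by estimated data per token range); the bounds and split count are illustration values from the diagram:

```scala
// Toy sketch: splitting each node's token range into smaller contiguous
// ranges, the way CassandraRDD turns token ranges into Spark partitions.
object TokenSplit {
  // Split an inclusive token range [lo, hi] into n contiguous sub-ranges.
  def split(lo: Long, hi: Long, n: Int): Seq[(Long, Long)] = {
    val width = (hi - lo + 1) / n
    (0 until n).map { i =>
      val start = lo + i * width
      val end = if (i == n - 1) hi else start + width - 1
      (start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    // Node1 owns tokens 1..10000; split into 4 Spark-partition-sized ranges.
    println(split(1L, 10000L, 4))
    // Vector((1,2500), (2501,5000), (5001,7500), (7501,10000))
  }
}
```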
This allows for Node Local operations!
sc.cassandraTable("keyspace","tablename")

[Diagram: the Spark partitions for each token range are scheduled on the node that owns that range, so each partition reads its data locally]
Under the Hood the Spark Cassandra Connector Uses the Java Driver to pull Information from C*
Check out my videos on Datastax Academy For a Deep Dive!
https://academy.datastax.com/tutorials
https://academy.datastax.com/demos/how-spark-cassandra-connector-reads-data
https://academy.datastax.com/demos/how-spark-cassandra-connector-writes-data
https://academy.datastax.com/demos/how-spark-works-dsestandalone-mode
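Conceptually, each Spark partition's read is just a CQL token-range scan issued through the Java Driver. A hedged sketch of that idea (the contact point, table, and token bounds here are placeholders; the real connector computes bounds from the cluster's ring and pages results):

```scala
// Sketch only: what one Spark partition's read looks like as a raw Java Driver
// call. Assumes a reachable node at 127.0.0.1 and the important.letters table.
import com.datastax.driver.core.Cluster

object TokenRangeScan {
  def main(args: Array[String]): Unit = {
    val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
    val session = cluster.connect()
    // token() turns the partition key into its position on the ring, so a
    // contiguous slice of the ring can be read with an ordinary CQL query.
    val rs = session.execute(
      "SELECT * FROM important.letters " +
        "WHERE token(mailbox) > ? AND token(mailbox) <= ?",
      Long.box(0L), Long.box(2500L))
    println(rs.all().size() + " rows in this token slice")
    cluster.close()
  }
}
```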
The Present: Capabilities and Features
Official Releases for Spark 1.0 - 1.4
Milestone Release for 1.5
Read Cassandra Data into RDDs
Write RDDs into Cassandra

CREATE TABLE important.letters (
  mailbox int,
  touser text,
  fromuser text,
  body text,
  PRIMARY KEY ((mailbox), touser, fromuser));

case class Letter(mailbox: Int, body: String, fromuser: String, touser: String)

Reading: sc.cassandraTable[Letter]("important","letters") returns an RDD[Letter]

Writing: rdd.saveToCassandra("important","letters")

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
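Putting the read and write calls together, a minimal application might look like the following. This is a sketch: it assumes a local Spark master and a Cassandra node at 127.0.0.1 with the important.letters table already created.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

case class Letter(mailbox: Int, body: String, fromuser: String, touser: String)

object LettersApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("letters")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)

    // Read: every row of important.letters, mapped onto the case class
    val letters = sc.cassandraTable[Letter]("important", "letters")
    println(letters.count())

    // Write: an RDD of case class instances saved back to the same table
    val reply = sc.parallelize(Seq(Letter(1, "Great Scott!", "doc", "marty")))
    reply.saveToCassandra("important", "letters")

    sc.stop()
  }
}
```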
Ability to push down relevant filters to the C* Server

CREATE TABLE important.letters (
  mailbox int,
  touser text,
  fromuser text,
  body text,
  PRIMARY KEY ((mailbox), touser, fromuser));

[Diagram: Partition for Mailbox 1 and Partition for Mailbox 2, rows ordered by touser]

mailbox: 1, touser: doc, fromuser: marty, body: What happens to us in the future?
mailbox: 1, touser: lorraine, fromuser: marty, body: Calvin? Wh… Why do you keep calling me Calvin
mailbox: 2, touser: marty, fromuser: doc, body: It's your kids, Marty. Something gotta be done about your kids!

sc.cassandraTable("important", "letters")
  .select("body")
  .where("touser > ?", "einstein")
  .collect

Select lets us request only certain columns from C*.
Where lets us pass in the CQL predicates that are allowed.
Only the data we specifically request is pulled from C*.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md
Java API Support

All functionality introduced in the Scala API is also available in the Java API.

Reading:

JavaRDD<String> bodiesRDD = javaFunctions(sc)
    .cassandraTable("important", "letters", mapColumnTo(String.class))
    .select("body");

Writing:

javaFunctions(rdd)
    .writerBuilder("important", "letters", mapToRow(Letter.class))
    .saveToCassandra();

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md
But what if you want to work with brand new Dataframes?
Full Dataframes Support: org.apache.spark.sql.cassandra

Dataframes (aka SchemaRDDs) provide a new and more generic API for working with RDDs.

Reading:

val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "important", "table" -> "letters"))
  .load()

or, in SQL:

CREATE TABLE letters
USING org.apache.spark.sql.cassandra
OPTIONS (keyspace "important", table "letters")

Writing:

df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "important", "table" -> "letters"))
  .save()

or, in SQL:

CREATE TABLE letters_copy
USING org.apache.spark.sql.cassandra
OPTIONS (keyspace "important", table "letters_copy")

INSERT INTO TABLE letters_copy SELECT * FROM letters;

Backed by CassandraRDD, so we can prune columns and push down predicates!

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
Integrated Pushdown of Predicates to C* in Dataframes

There is no need for special functions when using Dataframes, since the pushdown is done by the Catalyst optimizer.

CREATE TABLE important.letters (
  mailbox int,
  touser text,
  fromuser text,
  body text,
  PRIMARY KEY ((mailbox), touser, fromuser));

scala> df.filter("touser > 'einstein'").explain
== Physical Plan ==
Filter (touser#1 > einstein)
 PhysicalRDD [mailbox#0,touser#1,fromuser#2,body#3], MapPartitionsRDD[6] at explain at <console>:59

Predicates are automatically checked against C* rules for pushing down. Valid predicates will be applied as if you did a .where on a CassandraRDD.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
Pyspark and Dataframes Also Supported

Dataframes in PySpark run native code, with no need for Python <-> Java serialization.

sqlContext.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="kv", keyspace="test")\
    .load().show()

You can tell it's Python because of my need to escape line ends. Pure Python in PySpark. PySpark Dataframes!

SparkR Also Works with Cassandra Dataframes!

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md
Repartition by Cassandra Replica

Repartition any RDD to get Data Locality to C*!

mailboxesToCheck
  .repartitionByCassandraReplica("important", "letters", 10)

[Diagram: Spark partitions (labelled 1955, 1985, 2015) start out on different nodes than their respective C* data; after the repartition they are located with it]

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md
JoinWithCassandraTable pulls specific Partition Keys From Cassandra

mailboxesToCheck
  .repartitionByCassandraReplica("important", "letters", 10)
  .joinWithCassandraTable("important","letters")

CREATE TABLE important.letters (
  mailbox int,
  touser text,
  fromuser text,
  body text,
  PRIMARY KEY ((mailbox), touser, fromuser));

[Diagram: several thousand mailbox keys spread across Node1 through Node4]

Repartition places our keys local to the data they will retrieve. The join then retrieves the rows in parallel.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md
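For completeness, here is a hedged sketch of how mailboxesToCheck might be built in the first place (the key values are made up): the connector only needs an RDD whose elements line up with the table's partition key, which a Tuple1 per mailbox id provides.

```scala
// Sketch (assumes a running SparkContext `sc` and the important.letters table):
// an RDD of Tuple1(mailbox) values acts as the list of partition keys to fetch.
import com.datastax.spark.connector._

val mailboxesToCheck = sc.parallelize(Seq(13234, 8765, 2341, 43211).map(Tuple1(_)))

val fetched = mailboxesToCheck
  .repartitionByCassandraReplica("important", "letters", 10)
  .joinWithCassandraTable("important", "letters")
```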
Manual Driver Sessions are available!
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md
import com.datastax.spark.connector.cql.CassandraConnector
CassandraConnector(conf).withSessionDo { session =>
  session.execute("CREATE KEYSPACE test2 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
  session.execute("CREATE TABLE test2.words (word text PRIMARY KEY, count int)")
}
Any Connections Made through CassandraConnector will use a Connection pool (even remotely!)

CassandraConnector(conf).withSessionDo { }

This gains a handle on a running Cluster object made with configuration conf. Multiple threads/executor cores in the same Executor JVM will end up using the same connection.

[Diagram: Executor Threads 1-3 inside one Executor JVM sharing a single Cassandra connection pool]

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md
Cassandra Connector can be used in Closures, and Prepared Statements will be Cached as well

rdd.mapPartitions { it =>
  CassandraConnector(conf).withSessionDo { session =>
    val ps = session.prepare(query)  // returns the cached statement if this query was already prepared
    it.map(row => session.execute(ps.bind(row))).toList.iterator  // materialize before the session closes
  }
}

A reference to an already created prepared statement will be used if available.

[Diagram: Executor threads sharing the Executor JVM's connection pool and prepared statement cache]

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md
What is the Future of the Spark Cassandra Connector?
You!
The more people contribute to the project, the better it will become! We welcome any contributions, or just send us a letter on the mailing list!
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/FAQ.md#can-i-contribute-to-the-spark-cassandra-connector
Spark Packages!
http://spark-packages.org/package/datastax/spark-cassandra-connector
Update Even Faster to New Spark Versions
We'll be testing against Spark Release Candidates in the future so that we can have a compatible Spark Cassandra Connector out the moment an official Spark Release is ready!
Even better Dataframes
Automatic integration of repartitionByCassandraReplica and joinWithCassandraTable
Make it so that any joins against Cassandra tables are automatically detected and, if possible, converted to JoinWithCassandraTable calls. No need to manually determine when you should or shouldn't use the method.

Create Cassandra Tables from Dataframes Automatically
Currently all tables need to have been created in C* prior to saving. We'd like it if users could specify what kind of key they would like on their C* table and have it automatically generated on data frame writes.
Improve Spark-Cassandra-Stress
https://github.com/datastax/spark-cassandra-stress
Open source tool which lets you test the maximum throughput of your cluster with Spark and C*.

Includes:
• Write Tests
• Read Tests
• Streaming Tests
Thank you