Escape From Hadoop: Ultra Fast Data Analysis with Spark & Cassandra
DESCRIPTION
We present the basic functionality of the official DataStax spark-cassandra connector: how to load Cassandra tables as Spark RDDs and how to save Spark RDDs to Cassandra.

TRANSCRIPT
Escape From Hadoop: Ultra Fast Data Analysis with Apache Cassandra & Spark
slides by Kurt Russell Spitzer
presented by Piotr Kołaczkowski, DataStax
Why escape from Hadoop?
Hadoop:
Many Moving Pieces
Map Reduce
Lots of Overhead
Single Points of Failure
And there is a way out!
Spark provides a simple and efficient framework for distributed computations:
Node Roles: 2
In-Memory Caching: Yes!
Fault Tolerance: Yes!
Great Abstraction for Datasets? RDD!
[Diagram: a Spark Master coordinating several Spark Workers]
Resilient Distributed Dataset
Spark Executor
Spark is Compatible with HDFS, JDBC, Parquet, CSVs, ….
AND
APACHE CASSANDRA
Apache Cassandra is a Linearly Scaling and Fault Tolerant noSQL Database
Linearly Scaling: the power of the database increases linearly with the number of machines. 2x machines = 2x throughput.
http://techblog.netflix.com/2011/11/benchmarking-cassandra-scalability-on.html
Fault Tolerant: Nodes down != Database Down. Datacenter down != Database Down.
Apache Cassandra Architecture is Very Simple
Node Roles: 1
Replication: Tunable
Consistency: Tunable
[Diagram: clients connecting to a ring of C* nodes]
DataStax OSS Connector: Spark to Cassandra
https://github.com/datastax/spark-cassandra-connector
[Diagram: a Cassandra Keyspace/Table mapped to a Spark RDD[CassandraRow] or RDD[Tuples]]
Bundled and Supported with DSE 4.5!
By the numbers:
● 370 commits
● 17 branches
● 10 releases
● 11 contributors
● 168 issues (65 open)
● 98 pull requests (6 open)
Spark Cassandra Connector uses the DataStax Java Driver to Read from and Write to C*
Each executor maintains a connection to the C* cluster.
[Diagram: the full token range of the cluster split across Spark Executors, each reading its slice via the DataStax Java Driver: Tokens 1-1000, Tokens 1001-2000, Tokens …]
RDDs are read into different splits based on token ranges.
Co-locate Spark and C* for Best Performance
Running Spark Workers on the same nodes as your C* cluster will save network hops when reading and writing.
[Diagram: a Spark Master, with Spark Workers co-located on the C* nodes]
Setting up C* and Spark
DSE ≥ 4.5.0: just start your nodes with
dse cassandra -k
Apache Cassandra: follow the excellent guide by Al Tobey:
http://tobert.github.io/post/2014-07-15-installing-cassandra-spark-stack.html
We need a Distributed System For Analytics and Batch Jobs
But it doesn’t have to be complicated!
Even count needs to be distributed
You could make this easier by adding yet another technology to your Hadoop stack (Hive, Pig, Impala), or we could just do one-liners in the Spark shell.
Ask me to write a Map Reduce for word count, I dare you.
Basics: Getting a Table and Counting
CREATE KEYSPACE newyork WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
USE newyork;
CREATE TABLE presidentlocations ( time int, location text, PRIMARY KEY (time) );
INSERT INTO presidentlocations (time, location) VALUES ( 1, 'White House' );
INSERT INTO presidentlocations (time, location) VALUES ( 2, 'White House' );
INSERT INTO presidentlocations (time, location) VALUES ( 3, 'White House' );
INSERT INTO presidentlocations (time, location) VALUES ( 4, 'White House' );
INSERT INTO presidentlocations (time, location) VALUES ( 5, 'Air Force 1' );
INSERT INTO presidentlocations (time, location) VALUES ( 6, 'Air Force 1' );
INSERT INTO presidentlocations (time, location) VALUES ( 7, 'Air Force 1' );
INSERT INTO presidentlocations (time, location) VALUES ( 8, 'NYC' );
INSERT INTO presidentlocations (time, location) VALUES ( 9, 'NYC' );
INSERT INTO presidentlocations (time, location) VALUES ( 10, 'NYC' );
scala> sc.cassandraTable("newyork","presidentlocations").count
res3: Long = 10
Basics: take() and toArray

scala> sc.cassandraTable("newyork","presidentlocations").take(1)
res2: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{time: 9, location: NYC})

scala> sc.cassandraTable("newyork","presidentlocations").toArray
res3: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{time: 9, location: NYC}, CassandraRow{time: 3, location: White House}, …, CassandraRow{time: 6, location: Air Force 1})
Basics: Getting Row Values out of a CassandraRow
scala> sc.cassandraTable("newyork","presidentlocations").first.get[Int]("time")
res5: Int = 9

A CassandraRow object provides typed getters:
get[Int], get[String], get[List[...]], …, get[Any]
Got null? Use get[Option[Int]].
Supported types: http://www.datastax.com/documentation/datastax_enterprise/4.5/datastax_enterprise/spark/sparkSupportedTypes.html
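Since get[Int] throws when the column holds a null, get[Option[Int]] is the safe variant. A minimal sketch, assuming the presidentlocations table above (whose rows here happen to have no nulls):

```scala
// Sketch: null-safe column access with Option.
val row = sc.cassandraTable("newyork", "presidentlocations").first

row.get[Option[Int]]("time") match {
  case Some(t) => println(s"time = $t")    // the column had a value
  case None    => println("time was null") // get[Int] would have thrown here
}
```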
Copy A Table
Say we want to restructure our table or add a new column?
CREATE TABLE characterlocations (
  time int,
  character text,
  location text,
  PRIMARY KEY (time, character)
);

scala> sc.cassandraTable("newyork","presidentlocations")
         .map( row => (row.get[Int]("time"), "president", row.get[String]("location")))
         .saveToCassandra("newyork","characterlocations")

cqlsh:newyork> SELECT * FROM characterlocations;

 time | character | location
------+-----------+-------------
    5 | president | Air Force 1
   10 | president | NYC
  …
Filter a Table
What if we want to filter based on a non-clustering-key column?

scala> sc.cassandraTable("newyork","presidentlocations").filter( _.getInt("time") > 7 ).toArray
res9: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{time: 9, location: NYC}, CassandraRow{time: 10, location: NYC}, CassandraRow{time: 8, location: NYC})
Backfill a Table with a Different Key!
If we actually want quick access to timelines, we need a C* table with a different structure.

CREATE TABLE timelines (
  time int,
  character text,
  location text,
  PRIMARY KEY ((character), time)
);

sc.cassandraTable("newyork","characterlocations")
  .saveToCassandra("newyork","timelines")

cqlsh:newyork> select * from timelines;

 character | time | location
-----------+------+-------------
 president |    1 | White House
 president |    2 | White House
 president |    3 | White House
 president |    4 | White House
 president |    5 | Air Force 1
 president |    6 | Air Force 1
 president |    7 | Air Force 1
 president |    8 | NYC
 president |    9 | NYC
 president |   10 | NYC
Import a CSV
I have some data in another source which I could really use in my Cassandra table.

sc.textFile("file:///home/pkolaczk/ReallyImportantDocuments/PlisskenLocations.csv")
  .map(_.split(","))
  .map(line => (line(0), line(1), line(2)))
  .saveToCassandra("newyork","timelines", SomeColumns("character", "time", "location"))

cqlsh:newyork> select * from timelines where character = 'plissken';

 character | time | location
-----------+------+-----------------
  plissken |    1 | Federal Reserve
  plissken |    2 | Federal Reserve
  plissken |    3 | Federal Reserve
  plissken |    4 | Court
  plissken |    5 | Court
  plissken |    6 | Court
  plissken |    7 | Court
  plissken |    8 | Stealth Glider
  plissken |    9 | NYC
  plissken |   10 | NYC
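A slightly more defensive variant of the same import, sketched under the same schema: it parses the time column to Int up front and silently drops malformed lines, instead of relying on the connector's string-to-number conversion. The file path is the same one used above; the Timeline case class is introduced here just for illustration.

```scala
// Sketch: typed CSV import (same file and table as above; Timeline is hypothetical).
case class Timeline(character: String, time: Int, location: String)

sc.textFile("file:///home/pkolaczk/ReallyImportantDocuments/PlisskenLocations.csv")
  .map(_.split(",").map(_.trim))
  .collect { case Array(character, time, location) if time.forall(_.isDigit) =>
    Timeline(character, time.toInt, location) // malformed lines are dropped
  }
  .saveToCassandra("newyork", "timelines")
```

Here collect(PartialFunction) is the RDD variant that filters and maps in one pass, not the collect that pulls results to the driver.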
Perform a Join with MySQL
Maybe a little more than one line …

import java.sql._
import org.apache.spark.rdd.JdbcRDD

Class.forName("com.mysql.jdbc.Driver").newInstance()

val quotes = new JdbcRDD(
  sc,
  getConnection = () => DriverManager.getConnection("jdbc:mysql://localhost/escape_from_ny?user=root"),
  sql = "SELECT * FROM quotes WHERE ? <= ID AND ID <= ?",
  lowerBound = 0,
  upperBound = 100,
  numPartitions = 5,
  mapRow = (r: ResultSet) => (r.getInt(2), r.getString(3)))

quotes: org.apache.spark.rdd.JdbcRDD[(Int, String)] = JdbcRDD[9] at JdbcRDD at <console>:23
val locations = sc.cassandraTable("newyork","timelines")
  .filter(_.getString("character") == "plissken")
  .map(row => (row.getInt("time"), row.getString("location")))

quotes.join(locations)
  .take(1)
  .foreach(println)

(5, (Bob Hauk: There was an accident. About an hour ago, a small jet went down inside New York City. The President was on board. Snake Plissken: The president of what?, Court))
Easy Objects with Case Classes
We have the technology to make this even easier!

case class TimelineRow(character: String, time: Int, location: String)

sc.cassandraTable[TimelineRow]("newyork","timelines")
  .filter(_.character == "plissken")
  .filter(_.time == 8)
  .toArray

res13: Array[TimelineRow] = Array(TimelineRow(plissken,8,Stealth Glider))
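Case classes work in the write direction too: the connector maps fields to columns by name. A sketch reusing the TimelineRow class above (the "cabbie" rows are made up for illustration):

```scala
// Sketch: writing case class instances back to Cassandra by field name.
val extras = sc.parallelize(Seq(
  TimelineRow("cabbie", 5, "Broadway"),
  TimelineRow("cabbie", 6, "Broadway")))

extras.saveToCassandra("newyork", "timelines")
```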
A Map Reduce for Word Count …
scala> sc.cassandraTable("newyork","presidentlocations")
         .map(_.getString("location"))
         .flatMap(_.split(" "))
         .map((_,1))
         .reduceByKey(_ + _)
         .toArray

res17: Array[(String, Int)] = Array((1,3), (House,4), (NYC,3), (Force,3), (White,4), (Air,3))

[Diagram: "white house" → getString → split(" ") → (white,1), (house,1) → reduceByKey(_ + _) → (house,2), …]
Selected RDD transformations
● min(), max(), count()
● reduce[T](f: (T, T) ⇒ T): T
● fold[T](zeroValue: T)(op: (T, T) ⇒ T): T
● aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
● flatMap[U](func: (T) ⇒ TraversableOnce[U]): RDD[U]
● mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean): RDD[U]
● sortBy[K](f: (T) ⇒ K, ascending: Boolean = true)
● groupBy[K](f: (T) ⇒ K): RDD[(K, Iterable[T])]
● intersection(other: RDD[T]): RDD[T]
● union(other: RDD[T]): RDD[T]
● subtract(other: RDD[T]): RDD[T]
● zip[U](other: RDD[U]): RDD[(T, U)]
● keyBy[K](f: (T) ⇒ K): RDD[(K, T)]
● sample(withReplacement: Boolean, fraction: Double)
RDD can do even more...
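A couple of these combined on the timelines table, as a sketch (assuming the TimelineRow case class from the earlier slide):

```scala
// Sketch: chaining a few of the transformations listed above.
val rows = sc.cassandraTable[TimelineRow]("newyork", "timelines")

// Each character's visits, in time order: sortBy then groupBy.
val visits = rows.sortBy(_.time).groupBy(_.character)

// Pair every row with its character key, e.g. for a later join.
val keyed = rows.keyBy(_.character)

// Number of rows per character, via the classic map + reduceByKey.
val counts = rows.map(r => (r.character, 1)).reduceByKey(_ + _)
```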
How Fast is it?
● Reading big data from Cassandra: Spark ~2x faster than Hadoop
● Minimum latency (1 node, vnodes disabled, tiny data): Spark 0.7 s, Hadoop ~20 s
● Minimum latency (1 node, vnodes enabled): Spark 1 s, Hadoop ~8 minutes
● In-memory processing: up to 100x faster than Hadoop (source: https://amplab.cs.berkeley.edu/benchmark/)
In-memory Processing
val rdd = sc.cassandraTable("newyork","presidentlocations")
  .filter(...)
  .map(...)
  .cache

rdd.first // slow: loads data from Cassandra and keeps it in memory
rdd.first // fast: doesn't read from Cassandra, reads from memory

Call cache or persist(storageLevel) to store RDD data in memory.
Multiple StorageLevels are available:
● MEMORY_ONLY
● MEMORY_ONLY_SER
● MEMORY_AND_DISK
● MEMORY_AND_DISK_SER
● DISK_ONLY
Replicated variants are also available: just append _2 to the constant name.
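cache is shorthand for persist(StorageLevel.MEMORY_ONLY); the other levels are chosen explicitly. A sketch against the timelines table from earlier:

```scala
import org.apache.spark.storage.StorageLevel

// Sketch: keep a filtered table cached, spilling to disk in serialized
// form under memory pressure instead of recomputing from Cassandra.
val plissken = sc.cassandraTable("newyork", "timelines")
  .filter(_.getString("character") == "plissken")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

plissken.count // first action materializes and caches the RDD
plissken.count // later actions read from the cache
```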
Fault Tolerance
[Diagram: token ranges replicated across Node 1, Node 2, and Node 3 with Replication Factor = 2; lineage: cassandraTable → CassandraRDD → map → MappedRDD → filter → FilteredRDD. If a node is lost, the lost RDD partitions are recomputed from the lineage using replicas on the surviving nodes.]
Standalone App Example
https://github.com/RussellSpitzer/spark-cassandra-csv
[Diagram: a CSV with header "Car, Model, Color" and rows such as "Dodge, Caravan, Red", "Ford, F150, Black", "Toyota, Prius, Green", read into an RDD[CassandraRow] via a column mapping and saved to a Cassandra FavoriteCars table]
Useful modules / projects
● Java API – for diehard Java developers
● Python API – for those allergic to static types
● Shark – Hive QL on Spark (discontinued)
● Spark SQL – new SQL engine based on the Catalyst query planner
● Spark Streaming – micro-batch streaming framework
● MLlib – machine learning library
● GraphX – efficient representation and processing of graph data
We're hiring!
http://www.datastax.com/company/careers
Thanks for listening!
Questions?
There is plenty more we can do with Spark but …