manchester hadoop meetup: cassandra spark internals

©2013 DataStax Confidential. Do not distribute without consent. @chbatey Christopher Batey Manchester Hadoop and Big Data Meetup

Upload: christopher-batey

Post on 14-Aug-2015

TRANSCRIPT

Page 1: Manchester Hadoop Meetup: Cassandra Spark internals

©2013 DataStax Confidential. Do not distribute without consent.

@chbatey Christopher Batey

Manchester Hadoop and Big Data Meetup

Page 2: Manchester Hadoop Meetup: Cassandra Spark internals

Who am I?
• Maintainer of Stubbed Cassandra
• Other open-source projects: akka-persistence, wiremock
• Advocate for Apache Cassandra
• Part-time consultant

Page 3: Manchester Hadoop Meetup: Cassandra Spark internals

Agenda
• Why - running Spark + C*
• How - Spark partitions are built up
• Example - KillrWeather

Page 4: Manchester Hadoop Meetup: Cassandra Spark internals

OLTP OLAP Batch

Page 5: Manchester Hadoop Meetup: Cassandra Spark internals

Weather data streaming

[Diagram labels: Incoming weather events, Apache Kafka, Producer, Consumer, NodeGuardian, Dashboard]

Page 6: Manchester Hadoop Meetup: Cassandra Spark internals

Page 7: Manchester Hadoop Meetup: Cassandra Spark internals

Page 8: Manchester Hadoop Meetup: Cassandra Spark internals

Run this yourself
• https://github.com/killrweather/killrweather

Page 9: Manchester Hadoop Meetup: Cassandra Spark internals

The details

Page 10: Manchester Hadoop Meetup: Cassandra Spark internals

Pop quiz!
• Spark RDD
• Spark partition
• Spark worker
• Spark task
• Cassandra row
• Cassandra partition
• Cassandra token range

Page 11: Manchester Hadoop Meetup: Cassandra Spark internals

Spark architecture

Page 12: Manchester Hadoop Meetup: Cassandra Spark internals

org.apache.spark.rdd.RDD
• Resilient Distributed Dataset (RDD)
• Created through transformations on data (map, filter, ...) or other RDDs
• Immutable
• Partitioned
• Reusable

Page 13: Manchester Hadoop Meetup: Cassandra Spark internals

RDD Operations
• Transformations - similar to the Scala collections API
  • Produce new RDDs
  • filter, flatMap, map, distinct, groupBy, union, zip, reduceByKey, subtract
• Actions
  • Require materialization of the records to generate a value
  • collect: Array[T], count, fold, reduce, ...
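The split between lazy transformations and eager actions can be illustrated with a toy, single-process stand-in. This is only a sketch in plain Python, not Spark's implementation: `TinyRDD` and its methods are hypothetical names chosen to mirror the operations listed on the slide.

```python
import functools


class TinyRDD:
    """Toy stand-in for an RDD: fixed partitions plus a lazy pipeline of steps."""

    def __init__(self, partitions, pipeline=()):
        self._partitions = partitions   # list of lists, never mutated
        self._pipeline = pipeline       # tuple of functions: iterator -> iterator

    # Transformations: build a new TinyRDD, compute nothing yet.
    def map(self, f):
        step = lambda it: (f(x) for x in it)
        return TinyRDD(self._partitions, self._pipeline + (step,))

    def filter(self, pred):
        step = lambda it: (x for x in it if pred(x))
        return TinyRDD(self._partitions, self._pipeline + (step,))

    def _compute(self, partition):
        it = iter(partition)
        for step in self._pipeline:
            it = step(it)
        return it

    # Actions: run the pipeline over every partition and produce a value.
    def collect(self):
        return [x for p in self._partitions for x in self._compute(p)]

    def count(self):
        return sum(1 for p in self._partitions for _ in self._compute(p))

    def reduce(self, f):
        return functools.reduce(f, self.collect())


rdd = TinyRDD([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
evens_squared = rdd.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
print(evens_squared.collect())
```

Nothing runs until `collect`, `count` or `reduce` is called, and the original `rdd` is left unchanged, which is the immutability property from the previous slide.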

Page 14: Manchester Hadoop Meetup: Cassandra Spark internals

Spark RDDs Represent a Large Amount of Data Partitioned into Chunks

[Diagram: an RDD made of partitions 1-9, shown alongside Workers 1-4]

Page 15: Manchester Hadoop Meetup: Cassandra Spark internals

Spark RDDs Represent a Large Amount of Data Partitioned into Chunks

[Diagram: the RDD's partitions 1-9 distributed across Workers 1-4]

Page 16: Manchester Hadoop Meetup: Cassandra Spark internals

Cassandra table

CREATE TABLE daily_aggregate_precip (
    weather_station text,
    year int,
    month int,
    day int,
    precipitation counter,
    PRIMARY KEY ((weather_station), year, month, day)
)

PRIMARY KEY ((weather_station), year, month, day)

Partition Key: weather_station
Clustering Columns: year, month, day

Page 17: Manchester Hadoop Meetup: Cassandra Spark internals

Cassandra Data is Distributed By Token Range

Page 18: Manchester Hadoop Meetup: Cassandra Spark internals

Cassandra Data is Distributed By Token Range

0

500

999

Page 19: Manchester Hadoop Meetup: Cassandra Spark internals

Cassandra Data is Distributed By Token Range

0

500

Node 1

Node 2

Node 3

Node 4

Page 20: Manchester Hadoop Meetup: Cassandra Spark internals

Cassandra Data is Distributed By Token Range

0

500

Node 1

Node 2

Node 3

Node 4

Without vnodes

Page 21: Manchester Hadoop Meetup: Cassandra Spark internals

Cassandra Data is Distributed By Token Range

0

500

Node 1

Node 2

Node 3

Node 4

With vnodes
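Token ownership on the toy 0-999 ring from these slides can be sketched as follows. This is an illustration under assumptions, not Cassandra's code: `build_ring` and `owner` are hypothetical helpers, the evenly spaced tokens for the no-vnode case and the random tokens for the vnode case are made up, and a node is taken to own the range ending at each of its tokens.

```python
import bisect
import random
from collections import Counter

RING_SIZE = 1000  # toy ring of tokens 0..999, as on the slides


def build_ring(nodes, tokens_per_node, seed=42):
    """Return a sorted list of (token, node) pairs.

    tokens_per_node == 1 models a cluster without vnodes (evenly spaced tokens);
    a larger value models vnodes (many small, randomly placed ranges per node).
    """
    if tokens_per_node == 1:
        step = RING_SIZE // len(nodes)
        ring = [(i * step, node) for i, node in enumerate(nodes)]
    else:
        rng = random.Random(seed)
        tokens = rng.sample(range(RING_SIZE), len(nodes) * tokens_per_node)
        ring = [(t, nodes[i % len(nodes)]) for i, t in enumerate(tokens)]
    return sorted(ring)


def owner(ring, token):
    """Owner is the node holding the first ring token >= token, wrapping around."""
    tokens = [t for t, _ in ring]
    idx = bisect.bisect_left(tokens, token) % len(ring)
    return ring[idx][1]


nodes = ["Node 1", "Node 2", "Node 3", "Node 4"]
without_vnodes = build_ring(nodes, tokens_per_node=1)
with_vnodes = build_ring(nodes, tokens_per_node=8)

print(owner(without_vnodes, 100))
print(Counter(node for _, node in with_vnodes))
```

Without vnodes each node owns one large contiguous range; with vnodes each node owns several small ranges scattered around the ring, which is what the connector later has to group into Spark partitions.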

Page 22: Manchester Hadoop Meetup: Cassandra Spark internals

Replication strategy
• NetworkTopology
  - Every Cassandra node knows its DC and rack
  - Replicas won’t be put on the same rack unless Replication Factor > # of racks
  - Unfortunately Cassandra can’t create servers and racks on the fly to fix this :(

Page 23: Manchester Hadoop Meetup: Cassandra Spark internals

Replication

[Diagram labels: client, DC1 (RF3), DC2 (RF3), C, RC, WRITE CL = 1]

We have replication!

Page 24: Manchester Hadoop Meetup: Cassandra Spark internals

Pop quiz!
• Spark RDD
• Spark partition
• Spark worker
• Spark task
• Cassandra row
• Cassandra partition
• Cassandra token range

Page 25: Manchester Hadoop Meetup: Cassandra Spark internals

Goals
• Spark partitions made up of token ranges on the same node
• Tasks to be executed on workers co-located with that node
• Same(ish) amount of data in each Spark partition

Page 26: Manchester Hadoop Meetup: Cassandra Spark internals

The Connector Uses Information on the Node to Make Spark Partitions

Node 1 token ranges: 0-50, 120-220, 300-500, 780-830

• spark.cassandra.input.split.size_in_mb 64
• system.size_estimates (# partitions & mean size)
• tokens per Spark partition

Pages 27-38: Manchester Hadoop Meetup: Cassandra Spark internals

The Connector Uses Information on the Node to Make Spark Partitions

[Animation: Node 1 owns token ranges 0-50, 120-220, 300-500 and 780-830. Range 120-220 goes into Spark partition 1. Range 300-500 is split into 300-400 (Spark partition 2) and 400-500 (Spark partition 3). Ranges 780-830 and 0-50 are grouped together into Spark partition 4.]
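The split-and-group behaviour in the animation can be approximated with a small greedy routine. This is a sketch, not the connector's algorithm: `build_spark_partitions` is a hypothetical function, data is assumed to be spread evenly over tokens, and `tokens_per_split` stands in for whatever the connector derives from spark.cassandra.input.split.size_in_mb and system.size_estimates.

```python
def build_spark_partitions(token_ranges, tokens_per_split):
    """Split oversized token ranges, then pack ranges into Spark partitions.

    token_ranges: list of (start, end) tuples owned by one node.
    tokens_per_split: assumed number of tokens that hold one split's worth of data.
    """
    # 1. Cut any range that is bigger than one Spark partition.
    pieces = []
    for start, end in token_ranges:
        while end - start > tokens_per_split:
            pieces.append((start, start + tokens_per_split))
            start += tokens_per_split
        pieces.append((start, end))

    # 2. Pack consecutive pieces together without exceeding the target size.
    partitions, current, current_tokens = [], [], 0
    for start, end in pieces:
        size = end - start
        if current and current_tokens + size > tokens_per_split:
            partitions.append(current)
            current, current_tokens = [], 0
        current.append((start, end))
        current_tokens += size
    if current:
        partitions.append(current)
    return partitions


# Node 1's ranges from the slides, in the order the diagram lists them.
node_1_ranges = [(120, 220), (300, 500), (780, 830), (0, 50)]
spark_partitions = build_spark_partitions(node_1_ranges, tokens_per_split=100)
for number, ranges in enumerate(spark_partitions, start=1):
    print(number, ranges)
```

With an assumed 100 tokens per split this reproduces the four Spark partitions from the slides: one large range is cut in two and the two small ranges share a partition.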

Page 39: Manchester Hadoop Meetup: Cassandra Spark internals

Key classes
• CassandraTableScanRDD, CassandraRDD
  - getPreferredLocations
• CassandraTableRowReaderProvider
  - DataSizeEstimates - goes to C*
• CassandraPartitioner
  - Gets ring information from the driver
• CassandraPartition
  - endpoints
  - tokenRanges

Pages 40-52: Manchester Hadoop Meetup: Cassandra Spark internals

Data is Retrieved Using the DataStax Java Driver

spark.cassandra.input.fetch.size_in_rows 50
(pages 50-52 show this setting as spark.cassandra.input.page.row.size 50)

Node 1, Spark partition 4: token ranges 780-830 and 0-50

SELECT * FROM keyspace.table WHERE token(pk) > 780 and token(pk) <= 830

SELECT * FROM keyspace.table WHERE token(pk) > 0 and token(pk) <= 50

[Animation: each query's results arrive in pages of 50 CQL rows; further pages of 50 CQL rows are fetched as the earlier ones are consumed, first for range 780-830 and then for range 0-50.]
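The query-per-token-range and page-by-page fetching shown above can be sketched without a cluster. The code below is illustrative only: `token_range_query` and `fetch_in_pages` are hypothetical helpers, and the 230-row result set is invented to show how a fetch size of 50 splits it up.

```python
from itertools import islice


def token_range_query(keyspace, table, pk, start, end):
    """Build the CQL for one token range, in the shape shown on the slides."""
    return (
        f"SELECT * FROM {keyspace}.{table} "
        f"WHERE token({pk}) > {start} AND token({pk}) <= {end}"
    )


def fetch_in_pages(rows, fetch_size):
    """Yield lists of at most fetch_size rows, mimicking driver-side paging."""
    it = iter(rows)
    while True:
        page = list(islice(it, fetch_size))
        if not page:
            return
        yield page


# Spark partition 4 from the slides covers two token ranges on Node 1.
queries = [
    token_range_query("keyspace", "table", "pk", start, end)
    for start, end in [(780, 830), (0, 50)]
]

# Made-up result set for one of the queries.
fake_rows = [{"pk": i} for i in range(230)]
page_sizes = [len(page) for page in fetch_in_pages(fake_rows, fetch_size=50)]
print(queries[0])
print(page_sizes)
```

Only one page per query needs to be held in memory at a time, which is why the fetch size matters for executor memory.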

Page 53: Manchester Hadoop Meetup: Cassandra Spark internals

Paging

Page 54: Manchester Hadoop Meetup: Cassandra Spark internals

Other bits and bobs
• LocalNodeFirstLoadBalancingPolicy

Page 55: Manchester Hadoop Meetup: Cassandra Spark internals

Then we’re into Spark land
• Spark partitions are made up of C* partitions that exist on the same node
• C* connector tells Spark which workers to use via information from the C* driver

Page 56: Manchester Hadoop Meetup: Cassandra Spark internals

RDD -> C* Table

Page 57: Manchester Hadoop Meetup: Cassandra Spark internals

The Spark Cassandra Connector saveToCassandra method can be called on almost all RDDs

rdd.saveToCassandra("Keyspace","Table")

[Diagram: RDD partitions 1-9 distributed across Nodes 1-4]

Pages 58-77: Manchester Hadoop Meetup: Cassandra Spark internals

[Diagram labels: Node 1, 1, Java Driver]

spark.cassandra.output.batch.grouping.key partition
spark.cassandra.output.batch.size.rows 4
spark.cassandra.output.batch.grouping.buffer.size 3
spark.cassandra.output.concurrent.writes 2

Rows to write (partition key first), as extracted:
1,1,1  1,2,1  2,1,1  3,8,1  3,2,1  3,4,1  3,5,1  3,1,1  1,4,1  5,4,1  2,4,1  8,4,1  9,4,1  11,4,  3,9,1

[Animation: rows are grouped into batches by partition key (PK=1, PK=2, PK=3). The PK=3 batch is sent once it holds four rows (3,8,1 3,2,1 3,4,1 3,5,1). With three batches buffered, the row 5,4,1 for the new key PK=5 pushes out the PK=1 batch (1,1,1 1,2,1 1,4,1). Later the row 8,4,1 waits for "Write Acknowledged" before the PK=2 batch (2,1,1 2,4,1) is sent and PK=8 takes its place in the buffer.]
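The grouping behaviour in the animation can be mimicked with a short routine. This is a sketch under assumptions rather than the connector's batching code: `group_into_batches` is a hypothetical function, the "send the largest pending batch when the buffer is full" rule is inferred from the slides, and spark.cassandra.output.concurrent.writes (waiting for acknowledgements) is not modelled.

```python
def group_into_batches(rows, batch_size_rows=4, buffer_size=3):
    """Group rows by partition key and return batches in the order they are sent.

    rows: tuples whose first field is the partition key.
    batch_size_rows: a batch is sent as soon as it holds this many rows.
    buffer_size: maximum number of partition keys buffered at once.
    """
    buffer = {}   # partition key -> rows waiting to be sent (insertion ordered)
    sent = []

    for row in rows:
        pk = row[0]
        if pk not in buffer and len(buffer) == buffer_size:
            # Buffer full: make room by sending the largest pending batch
            # (assumed policy; ties go to the oldest batch).
            largest = max(buffer, key=lambda key: len(buffer[key]))
            sent.append(buffer.pop(largest))
        buffer.setdefault(pk, []).append(row)
        if len(buffer[pk]) == batch_size_rows:
            sent.append(buffer.pop(pk))   # batch is full: send it

    sent.extend(buffer.values())          # flush the leftovers at the end
    return sent


rows = [
    (1, 1, 1), (1, 2, 1), (2, 1, 1), (3, 8, 1), (3, 2, 1), (3, 4, 1), (3, 5, 1),
    (3, 1, 1), (1, 4, 1), (5, 4, 1), (2, 4, 1), (8, 4, 1), (9, 4, 1), (3, 9, 1),
]
batches = group_into_batches(rows)
for batch in batches:
    print(batch)
```

With the slide settings (4 rows per batch, 3 buffered keys) the first three batches sent are the PK=3, PK=1 and PK=2 batches, matching the order in the animation. The truncated row `11,4,` from the slides is left out because its third value is not recoverable.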

Page 78: Manchester Hadoop Meetup: Cassandra Spark internals

Example

Page 79: Manchester Hadoop Meetup: Cassandra Spark internals

Page 80: Manchester Hadoop Meetup: Cassandra Spark internals

Weather Station Analysis
• Weather station collects data
• Cassandra stores in sequence
• Spark rolls up data into new tables

Windsor, California
July 1, 2014
High: 73.4F
Low: 51.4F

Page 81: Manchester Hadoop Meetup: Cassandra Spark internals

raw_weather_data

CREATE TABLE raw_weather_data (
    weather_station text,       // Composite of Air Force Datsav3 station number and NCDC WBAN number
    year int,                   // Year collected
    month int,                  // Month collected
    day int,                    // Day collected
    hour int,                   // Hour collected
    temperature double,         // Air temperature (degrees Celsius)
    dewpoint double,            // Dew point temperature (degrees Celsius)
    pressure double,            // Sea level pressure (hectopascals)
    wind_direction int,         // Wind direction in degrees. 0-359
    wind_speed double,          // Wind speed (meters per second)
    sky_condition int,          // Total cloud cover (coded, see format documentation)
    sky_condition_text text,    // Non-coded sky conditions
    one_hour_precip double,     // One-hour accumulated liquid precipitation (millimeters)
    six_hour_precip double,     // Six-hour accumulated liquid precipitation (millimeters)
    PRIMARY KEY ((weather_station), year, month, day, hour)
) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);

Reverses data in the storage engine.

Page 82: Manchester Hadoop Meetup: Cassandra Spark internals

Pages 83-87: Manchester Hadoop Meetup: Cassandra Spark internals

Primary key relationship

PRIMARY KEY ((weather_station), year, month, day, hour)
WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);

Partition Key: weather_station
Clustering Columns: year, month, day, hour

Partition 10010:99999

 2005:12:1:7:temp | 2005:12:1:8:temp | 2005:12:1:9:temp | 2005:12:1:10:temp
------------------+------------------+------------------+-------------------
             -5.6 |             -5.1 |             -4.9 |              -5.3

Page 88: Manchester Hadoop Meetup: Cassandra Spark internals

Data Locality

weather_station='10010:99999' ?

1000 Node Cluster

You are here!

Page 89: Manchester Hadoop Meetup: Cassandra Spark internals

Query patterns
• Range queries
• “Slice” operation on disk

SELECT weather_station, hour, temperature
FROM raw_weather_data
WHERE weather_station='10010:99999'
  AND year = 2005 AND month = 12 AND day = 1
  AND hour >= 7 AND hour <= 10;

Single seek on disk
Partition key for locality

Partition 10010:99999

 clustering key | temperature
----------------+-------------
   2005:12:1:12 |        -5.4
   2005:12:1:11 |        -4.9
   2005:12:1:10 |        -5.3
    2005:12:1:9 |        -4.9
    2005:12:1:8 |        -5.1
    2005:12:1:7 |        -5.6

Page 90: Manchester Hadoop Meetup: Cassandra Spark internals

Query patterns
• Range queries
• “Slice” operation on disk

SELECT weather_station, hour, temperature
FROM raw_weather_data
WHERE weather_station='10010:99999'
  AND year = 2005 AND month = 12 AND day = 1
  AND hour >= 7 AND hour <= 10;

Programmers like this
Sorted by event_time

 weather_station | hour         | temperature
-----------------+--------------+-------------
     10010:99999 | 2005:12:1:7  |        -5.6
     10010:99999 | 2005:12:1:8  |        -5.1
     10010:99999 | 2005:12:1:9  |        -4.9
     10010:99999 | 2005:12:1:10 |        -5.3
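Why a range query inside one partition is cheap can be shown with a sorted list and a binary search. This is a conceptual sketch, not Cassandra's storage engine: `slice_rows` is a hypothetical helper, the rows are kept in ascending order for simplicity (the table on the slides uses DESC clustering order), and the temperatures are the ones read off the slides.

```python
import bisect

# One partition (weather_station = '10010:99999'), sorted by clustering key
# (year, month, day, hour). Values as read from the slides.
partition = [
    ((2005, 12, 1, 7), -5.6),
    ((2005, 12, 1, 8), -5.1),
    ((2005, 12, 1, 9), -4.9),
    ((2005, 12, 1, 10), -5.3),
    ((2005, 12, 1, 11), -4.9),
    ((2005, 12, 1, 12), -5.4),
]


def slice_rows(rows, lower, upper):
    """Return the contiguous run of rows with lower <= clustering key <= upper."""
    keys = [key for key, _ in rows]
    lo = bisect.bisect_left(keys, lower)    # one seek to the start of the slice
    hi = bisect.bisect_right(keys, upper)   # then a sequential read up to here
    return rows[lo:hi]


hours_7_to_10 = slice_rows(partition, (2005, 12, 1, 7), (2005, 12, 1, 10))
print(hours_7_to_10)
```

Because rows in a partition are stored sorted by the clustering columns, the query result is one contiguous slice rather than a scatter of lookups.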

Page 91: Manchester Hadoop Meetup: Cassandra Spark internals

weather_station

CREATE TABLE weather_station (
    id text PRIMARY KEY,    // Composite of Air Force Datsav3 station number and NCDC WBAN number
    name text,              // Name of reporting station
    country_code text,      // 2 letter ISO Country ID
    state_code text,        // 2 letter state code for US stations
    call_sign text,         // International station call sign
    lat double,             // Latitude in decimal degrees
    long double,            // Longitude in decimal degrees
    elevation double        // Elevation in meters
);

Lookup table

Page 92: Manchester Hadoop Meetup: Cassandra Spark internals

daily_aggregate_temperature

CREATE TABLE daily_aggregate_temperature (
    weather_station text,
    year int,
    month int,
    day int,
    high double,
    low double,
    mean double,
    variance double,
    stdev double,
    PRIMARY KEY ((weather_station), year, month, day)
) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC);

SELECT high, low FROM daily_aggregate_temperature
WHERE weather_station='010010:99999' AND year=2005 AND month=12 AND day=3;

 high | low
------+------
  1.8 | -1.5

Page 93: Manchester Hadoop Meetup: Cassandra Spark internals

daily_aggregate_precip

CREATE TABLE daily_aggregate_precip (
    weather_station text,
    year int,
    month int,
    day int,
    precipitation counter,
    PRIMARY KEY ((weather_station), year, month, day)
) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC);

SELECT precipitation FROM daily_aggregate_precip
WHERE weather_station='010010:99999' AND year=2005 AND month=12 AND day>=1 AND day <= 7;

[Bar chart: precipitation for days 1-7, vertical axis 0-40; values as extracted: 17, 26, 20, 33, 12, 0]

Page 94: Manchester Hadoop Meetup: Cassandra Spark internals

Weather Station Stream Analysis
• Weather station collects data
• Data processed in stream
• Data stored in Cassandra

Windsor, California
Today
Rainfall total: 1.2cm
High: 73.4F
Low: 51.4F

Page 95: Manchester Hadoop Meetup: Cassandra Spark internals

Incoming data from Kafka

725030:14732,2008,01,01,00,5.0,-3.9,1020.4,270,4.6,2,0.0,0.0
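One way to turn such a line into a typed record is sketched below. The field mapping is an assumption: the sample has 13 comma-separated values, so they are matched to the raw_weather_data columns in order with sky_condition_text left out. `RawWeather` and `parse_line` are hypothetical names, not part of KillrWeather.

```python
from typing import NamedTuple


class RawWeather(NamedTuple):
    weather_station: str
    year: int
    month: int
    day: int
    hour: int
    temperature: float
    dewpoint: float
    pressure: float
    wind_direction: int
    wind_speed: float
    sky_condition: int
    one_hour_precip: float
    six_hour_precip: float


def parse_line(line: str) -> RawWeather:
    """Parse one comma-separated weather event (assumed 13-field layout)."""
    fields = line.strip().split(",")
    if len(fields) != 13:
        raise ValueError(f"expected 13 fields, got {len(fields)}")
    return RawWeather(
        fields[0],
        int(fields[1]), int(fields[2]), int(fields[3]), int(fields[4]),
        float(fields[5]), float(fields[6]), float(fields[7]),
        int(fields[8]), float(fields[9]), int(fields[10]),
        float(fields[11]), float(fields[12]),
    )


record = parse_line("725030:14732,2008,01,01,00,5.0,-3.9,1020.4,270,4.6,2,0.0,0.0")
print(record)
```

The first five fields line up with the primary key columns (weather_station, year, month, day, hour), so a parsed record maps directly onto one row of raw_weather_data.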

Page 96: Manchester Hadoop Meetup: Cassandra Spark internals

Creating a Stream

Page 97: Manchester Hadoop Meetup: Cassandra Spark internals

Saving the raw data

Page 98: Manchester Hadoop Meetup: Cassandra Spark internals

Building an aggregate

CREATE TABLE daily_aggregate_precip (
    weather_station text,
    year int,
    month int,
    day int,
    precipitation counter,
    PRIMARY KEY ((weather_station), year, month, day)
) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC);

CQL Counter
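The effect of incrementing a counter per (station, year, month, day) can be sketched in memory. This is an illustration, not the KillrWeather job: `daily_precip_totals` is a hypothetical function, the readings are made up, and integer amounts are used because CQL counters hold integers.

```python
from collections import Counter


def daily_precip_totals(hourly_readings):
    """Sum hourly precipitation per (station, year, month, day).

    Each addition plays the role of one counter increment, i.e. roughly:
      UPDATE daily_aggregate_precip SET precipitation = precipitation + ?
      WHERE weather_station = ? AND year = ? AND month = ? AND day = ?
    """
    totals = Counter()
    for station, year, month, day, _hour, precip in hourly_readings:
        totals[(station, year, month, day)] += precip
    return totals


# Made-up hourly readings: (station, year, month, day, hour, precipitation).
readings = [
    ("725030:14732", 2008, 1, 1, 0, 3),
    ("725030:14732", 2008, 1, 1, 1, 4),
    ("725030:14732", 2008, 1, 2, 0, 5),
]
totals = daily_precip_totals(readings)
print(totals)
```

Because increments commute, events for the same day can arrive in any order from the stream and still produce the same daily total.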

Page 99: Manchester Hadoop Meetup: Cassandra Spark internals

Batch job on the fly?

(count: 24, mean: 14.428150, stdev: 7.092196, max: 28.034969, min: 0.675863)

(count: 11242, mean: 8.921956, stdev: 7.428311, max: 29.997986, min: -2.200000)
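Summaries of this shape (count, mean, stdev, max, min) can be computed in a few lines. The sketch below is plain Python rather than the Spark job behind the slide; `summarize` is a hypothetical helper, the input values are invented, and the use of the population standard deviation is an assumption about how the slide's figures were produced.

```python
import statistics


def summarize(values):
    """Return count, mean, population stdev, max and min for a list of numbers."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stdev": statistics.pstdev(values),   # population stdev (assumed)
        "max": max(values),
        "min": min(values),
    }


# Invented temperatures for illustration.
summary = summarize([1.0, 2.0, 3.0, 4.0])
print(
    "(count: {count}, mean: {mean:.6f}, stdev: {stdev:.6f}, "
    "max: {max:.6f}, min: {min:.6f})".format(**summary)
)
```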

Page 100: Manchester Hadoop Meetup: Cassandra Spark internals

Weather data streaming

[Diagram labels: Load Generator or Data import, Apache Kafka, Producer, Consumer, NodeGuardian, Dashboard]

Page 101: Manchester Hadoop Meetup: Cassandra Spark internals

Summary
• Cassandra - always-on operational database
• Spark
  - Batch analytics
  - Stream processing and saving back to Cassandra

Page 102: Manchester Hadoop Meetup: Cassandra Spark internals

Thanks for listening
• Follow me on Twitter @chbatey
• Cassandra + fault tolerance posts aplenty:
  • http://christopher-batey.blogspot.co.uk/
• Cassandra resources: http://planetcassandra.org/