Manchester Hadoop Meetup: Cassandra Spark Internals

Post on 14-Aug-2015

TRANSCRIPT

©2013 DataStax Confidential. Do not distribute without consent.

Christopher Batey (@chbatey)

Manchester Hadoop and Big Data Meetup

Who am I?
• Maintainer of Stubbed Cassandra
• Other OS projects: akka-persistence, wiremock
• Advocate for Apache Cassandra
• Part-time consultant

Agenda
• Why - running Spark + C*
• How - Spark partitions are built up
• Example - KillrWeather

OLTP, OLAP, Batch

Weather data streaming
[Diagram: incoming weather events, Apache Kafka producer, consumer, NodeGuardian, dashboard]

Run this yourself
• https://github.com/killrweather/killrweather

The details

Pop quiz!
• Spark RDD
• Spark partition
• Spark worker
• Spark task
• Cassandra row
• Cassandra partition
• Cassandra token range

Spark architecture

org.apache.spark.rdd.RDD
• Resilient Distributed Dataset (RDD)
• Created through transformations on data (map, filter, ...) or on other RDDs
• Immutable
• Partitioned
• Reusable

RDD Operations
• Transformations
  - Similar to the Scala collections API
  - Produce new RDDs
  - filter, flatMap, map, distinct, groupBy, union, zip, reduceByKey, subtract
• Actions
  - Require materialization of the records to generate a value
  - collect: Array[T], count, fold, reduce, ...
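The split between transformations and actions can be illustrated with a toy, single-process model. This is only a sketch: `ToyRDD` is a hypothetical class written for this example and is not the Spark API. It shows transformations returning new immutable objects without computing anything, and actions forcing the records to be materialized.

```python
from functools import reduce


class ToyRDD:
    """A toy stand-in for an RDD (hypothetical, not the Spark API)."""

    def __init__(self, partitions, ops=()):
        # Data is held as immutable tuples, one per partition.
        self._partitions = tuple(tuple(p) for p in partitions)
        self._ops = tuple(ops)  # pending transformations, applied lazily

    # Transformations: return a new ToyRDD and compute nothing yet.
    def map(self, fn):
        return ToyRDD(self._partitions, self._ops + (("map", fn),))

    def filter(self, pred):
        return ToyRDD(self._partitions, self._ops + (("filter", pred),))

    def _compute(self, partition):
        rows = iter(partition)
        for kind, fn in self._ops:
            rows = map(fn, rows) if kind == "map" else filter(fn, rows)
        return list(rows)

    # Actions: materialize the records to produce a value.
    def collect(self):
        return [row for p in self._partitions for row in self._compute(p)]

    def count(self):
        return len(self.collect())

    def reduce(self, fn):
        return reduce(fn, self.collect())


numbers = ToyRDD([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
evens_squared = numbers.filter(lambda n: n % 2 == 0).map(lambda n: n * n)
print(evens_squared.collect())  # [4, 16, 36, 64]
print(numbers.count())          # 9, the original is unchanged
```

Nothing runs until `collect`, `count` or `reduce` is called, which is the property that lets Spark plan work per partition before executing it.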

Spark RDDs Represent a Large Amount of Data Partitioned into Chunks
[Diagram: an RDD made of chunks 1-9, distributed across Worker 1 to Worker 4]

Cassandra table
CREATE TABLE daily_aggregate_precip (
    weather_station text,
    year int,
    month int,
    day int,
    precipitation counter,
    PRIMARY KEY ((weather_station), year, month, day)
)

PRIMARY KEY ((weather_station), year, month, day)
• Partition key: weather_station
• Clustering columns: year, month, day

Cassandra Data is Distributed By Token Range
[Diagram: a token ring running from 0 through 500 to 999, shared between Node 1 to Node 4; shown once without vnodes and once with vnodes]
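A small sketch of token-range ownership on the toy 0-999 ring from the slides. The function names, the even spacing without vnodes, and the random token choice with vnodes are assumptions made for illustration; a real cluster uses a partitioner with a far larger token space.

```python
import bisect
import random

TOKEN_SPACE = 1000  # toy ring 0-999, as on the slides


def build_ring(nodes, tokens_per_node, seed=42):
    """Return a sorted list of (token, node) pairs."""
    if tokens_per_node == 1:
        # Without vnodes: one evenly spaced token per node.
        step = TOKEN_SPACE // len(nodes)
        return [(i * step, node) for i, node in enumerate(nodes)]
    # With vnodes: each node holds several scattered tokens.
    rng = random.Random(seed)
    tokens = rng.sample(range(TOKEN_SPACE), len(nodes) * tokens_per_node)
    return sorted((t, nodes[i % len(nodes)]) for i, t in enumerate(tokens))


def owner(ring, token):
    """A node owns the range (previous token, its own token], wrapping at the end."""
    tokens = [t for t, _ in ring]
    idx = bisect.bisect_left(tokens, token % TOKEN_SPACE)
    return ring[idx % len(ring)][1]


nodes = ["Node 1", "Node 2", "Node 3", "Node 4"]
ring = build_ring(nodes, tokens_per_node=1)
print(ring)              # [(0, 'Node 1'), (250, 'Node 2'), (500, 'Node 3'), (750, 'Node 4')]
print(owner(ring, 100))  # Node 2
print(owner(ring, 800))  # Node 1 (wraps around the ring)

vring = build_ring(nodes, tokens_per_node=8)
print(len(vring))        # 32 small ranges instead of 4 large ones
```

With vnodes each node owns many small ranges, which is why the connector later has to combine several token ranges into one Spark partition.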

Replication strategy
• NetworkTopologyStrategy
  - Every Cassandra node knows its DC and rack
  - Replicas won’t be put on the same rack unless replication factor > number of racks
  - Unfortunately Cassandra can’t create servers and racks on the fly to fix this :(

Replication
[Diagram: a client WRITE at CL = 1 across DC1 and DC2, each with RF 3; nodes labelled C and RC]
We have replication!

Pop quiz!
• Spark RDD
• Spark partition
• Spark worker
• Spark task
• Cassandra row
• Cassandra partition
• Cassandra token range

Goals
• Spark partitions made up of token ranges on the same node
• Tasks to be executed on workers co-located with that node
• Same(ish) amount of data in each Spark partition

[Diagram: Node 1 owns token ranges 0-50, 120-220, 300-500 and 780-830]

• spark.cassandra.input.split.size_in_mb 64
• system.size_estimates (# partitions & mean size)
• tokens per Spark partition

The Connector Uses Information on the Node to Make Spark Partitions
[Animation: the connector takes Node 1's token ranges (0-50, 120-220, 300-500, 780-830) and assigns them to Spark partitions 1 to 4. The large range 300-500 is split into 300-400 and 400-500, and the small ranges 0-50 and 780-830 end up together in partition 4.]
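The animation can be approximated with a short sketch that splits oversized token ranges and packs small ones together. This is a toy model, not the connector's actual algorithm: it assumes data is spread evenly over tokens, so the width of a range stands in for the size estimate, and `split_range`, `group_ranges` and `max_tokens` are names invented for this example.

```python
def split_range(start, end, max_tokens):
    """Cut one token range into pieces of at most max_tokens tokens."""
    pieces = []
    while end - start > max_tokens:
        pieces.append((start, start + max_tokens))
        start += max_tokens
    pieces.append((start, end))
    return pieces


def group_ranges(ranges, max_tokens):
    """Group one node's token ranges into Spark partitions of similar size."""
    pieces = [p for s, e in ranges for p in split_range(s, e, max_tokens)]
    pieces.sort(key=lambda r: r[0] - r[1])  # widest first; the sort is stable
    partitions, current, size = [], [], 0
    for start, end in pieces:
        width = end - start
        if current and size + width > max_tokens:
            partitions.append(current)
            current, size = [], 0
        current.append((start, end))
        size += width
    if current:
        partitions.append(current)
    return partitions


node1 = [(0, 50), (120, 220), (300, 500), (780, 830)]
for number, token_ranges in enumerate(group_ranges(node1, max_tokens=100), start=1):
    print(number, token_ranges)
# 1 [(120, 220)]
# 2 [(300, 400)]
# 3 [(400, 500)]
# 4 [(0, 50), (780, 830)]
```

In the real connector the target is expressed in megabytes (spark.cassandra.input.split.size_in_mb) and derived from system.size_estimates rather than from token counts.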

Key classes
• CassandraTableScanRDD, CassandraRDD
  - getPreferredLocations
• CassandraTableRowReaderProvider
  - DataSizeEstimates - goes to C*
• CassandraPartitioner
  - Gets ring information from the driver
• CassandraPartition
  - endpoints
  - tokenRanges

Data is Retrieved Using the DataStax Java Driver
• spark.cassandra.input.fetch.size_in_rows 50 (shown as spark.cassandra.input.page.row.size 50 on the later frames)
• Spark partition 4 on Node 1 covers token ranges 780-830 and 0-50, one query per range:

SELECT * FROM keyspace.table WHERE token(pk) > 780 and token(pk) <= 830
SELECT * FROM keyspace.table WHERE token(pk) > 0 and token(pk) <= 50

[Animation: the results of each query arrive in pages of 50 CQL rows]
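As a rough sketch of the read path, the snippet below builds one token-range query per range in a Spark partition and then walks a result set in pages. The helper names are made up for this example, and the paging is simulated over an in-memory list; in practice the Java driver does the paging against Cassandra.

```python
def range_queries(table, pk, token_ranges):
    """One CQL query per token range held by a Spark partition."""
    return [
        f"SELECT * FROM {table} WHERE token({pk}) > {start} AND token({pk}) <= {end}"
        for start, end in token_ranges
    ]


def pages(rows, fetch_size):
    """Yield rows in chunks of fetch_size, the way a paged result arrives."""
    for i in range(0, len(rows), fetch_size):
        yield rows[i:i + fetch_size]


queries = range_queries("keyspace.table", "pk", [(780, 830), (0, 50)])
for query in queries:
    print(query)

fake_rows = list(range(120))  # stand-in for the rows one query returns
page_sizes = [len(page) for page in pages(fake_rows, fetch_size=50)]
print(page_sizes)  # [50, 50, 20]
```

A smaller fetch size keeps less data in memory per task at the cost of more round trips.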

Paging

Other bits and bobs
• LocalNodeFirstLoadBalancingPolicy

Then we’re into Spark land
• Spark partitions are made up of C* partitions that exist on the same node
• C* connector tells Spark which workers to use via information from the C* driver

RDD -> C* Table
[Diagram: RDD chunks 1-9 mapped onto Node 1 to Node 4]

The Spark Cassandra Connector saveToCassandra method can be called on almost all RDDs

rdd.saveToCassandra("Keyspace","Table")

[Animation: one Spark partition being written to Node 1 through the Java driver]

Rows waiting to be written (partition key first):
1,1,1  1,2,1  2,1,1  3,8,1  3,2,1  3,4,1  3,5,1  3,1,1  1,4,1  5,4,1  2,4,1  8,4,1  9,4,1  11,4,  3,9,1

spark.cassandra.output.batch.grouping.key partition
spark.cassandra.output.batch.size.rows 4
spark.cassandra.output.batch.grouping.buffer.size 3
spark.cassandra.output.concurrent.writes 2

• Rows are grouped into batches by partition key (PK=1, PK=2, PK=3)
• The PK=3 batch is sent as soon as it holds 4 rows
• Only 3 batches are buffered at once, so when a row for PK=5 arrives the PK=1 batch is sent to make room; the same happens to the PK=2 batch when a row for PK=8 arrives
• With 2 writes in flight, the next batch waits until a write is acknowledged
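The grouping behaviour in the animation can be modelled in a few lines. This is a simplified sketch based only on what the frames show. It is not the connector's code, it ignores spark.cassandra.output.concurrent.writes, and the rule of evicting the largest buffered batch when the buffer is full is an assumption read off the animation. The truncated row on the slides is left out of the input.

```python
def group_into_batches(rows, batch_size_rows=4, buffer_size=3):
    """Group rows by partition key (first field) into batches, in send order."""
    buffered = {}  # partition key -> rows waiting to be sent
    sent = []      # batches in the order they would be handed to the driver
    for row in rows:
        key = row[0]
        if key not in buffered and len(buffered) == buffer_size:
            # Buffer full: send the largest batch to make room (assumed rule).
            largest = max(buffered, key=lambda k: len(buffered[k]))
            sent.append(buffered.pop(largest))
        buffered.setdefault(key, []).append(row)
        if len(buffered[key]) == batch_size_rows:
            sent.append(buffered.pop(key))  # batch is full, send it
    sent.extend(buffered.values())  # flush whatever is left at the end
    return sent


rows = [
    (1, 1, 1), (1, 2, 1), (2, 1, 1), (3, 8, 1), (3, 2, 1), (3, 4, 1), (3, 5, 1),
    (3, 1, 1), (1, 4, 1), (5, 4, 1), (2, 4, 1), (8, 4, 1), (3, 9, 1),
]
batches = group_into_batches(rows)
for batch in batches:
    print("PK=%d" % batch[0][0], batch)
# PK=3 [(3, 8, 1), (3, 2, 1), (3, 4, 1), (3, 5, 1)]
# PK=1 [(1, 1, 1), (1, 2, 1), (1, 4, 1)]
# PK=2 [(2, 1, 1), (2, 4, 1)]
# PK=3 [(3, 1, 1), (3, 9, 1)]
# PK=5 [(5, 4, 1)]
# PK=8 [(8, 4, 1)]
```

Every batch holds rows for a single partition key, so each one can be served by a single replica set.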

Example

Weather Station Analysis
• Weather station collects data
• Cassandra stores in sequence
• Spark rolls up data into new tables

Windsor, California
July 1, 2014
High: 73.4F
Low: 51.4F

raw_weather_data
CREATE TABLE raw_weather_data (
    weather_station text,     // Composite of Air Force Datsav3 station number and NCDC WBAN number
    year int,                 // Year collected
    month int,                // Month collected
    day int,                  // Day collected
    hour int,                 // Hour collected
    temperature double,       // Air temperature (degrees Celsius)
    dewpoint double,          // Dew point temperature (degrees Celsius)
    pressure double,          // Sea level pressure (hectopascals)
    wind_direction int,       // Wind direction in degrees. 0-359
    wind_speed double,        // Wind speed (meters per second)
    sky_condition int,        // Total cloud cover (coded, see format documentation)
    sky_condition_text text,  // Non-coded sky conditions
    one_hour_precip double,   // One-hour accumulated liquid precipitation (millimeters)
    six_hour_precip double,   // Six-hour accumulated liquid precipitation (millimeters)
    PRIMARY KEY ((weather_station), year, month, day, hour)
) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);

The CLUSTERING ORDER BY clause reverses the data in the storage engine.

Primary key relationship

PRIMARY KEY ((weather_station), year, month, day, hour)
WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);

• Partition key: weather_station
• Clustering columns: year, month, day, hour

[Diagram: partition 10010:99999 laid out as one row of clustered cells]

 partition key | clustering columns : column | value
---------------+-----------------------------+-------
 10010:99999   | 2005:12:1:7:temp            |  -5.6
 10010:99999   | 2005:12:1:8:temp            |  -5.1
 10010:99999   | 2005:12:1:9:temp            |  -4.9
 10010:99999   | 2005:12:1:10:temp           |  -5.3

Data Locality
weather_station='10010:99999' ?
[Diagram: a 1000 node cluster with one node marked "You are here!"]

Query patterns
• Range queries
• “Slice” operation on disk

SELECT weather_station, hour, temperature
FROM raw_weather_data
WHERE weather_station='10010:99999'
  AND year = 2005 AND month = 12 AND day = 1
  AND hour >= 7 AND hour <= 10;

Single seek on disk
[Diagram: partition 10010:99999 with cells 2005:12:1:12 and 2005:12:1:11 (values -5.4 and -4.9 on the slide) followed by the selected slice 2005:12:1:7 (-5.6), 2005:12:1:8 (-5.1), 2005:12:1:9 (-4.9), 2005:12:1:10 (-5.3)]

Partition key for locality

Query patterns
• Range queries
• “Slice” operation on disk

Programmers like this
Sorted by event_time

 weather_station | hour         | temperature
-----------------+--------------+-------------
 10010:99999     | 2005:12:1:7  |        -5.6
 10010:99999     | 2005:12:1:8  |        -5.1
 10010:99999     | 2005:12:1:9  |        -4.9
 10010:99999     | 2005:12:1:10 |        -5.3

SELECT weather_station, hour, temperature
FROM raw_weather_data
WHERE weather_station='10010:99999'
  AND year = 2005 AND month = 12 AND day = 1
  AND hour >= 7 AND hour <= 10;
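Why a range query on clustering columns is a single contiguous read can be shown with a sorted list and a binary search. This is only an analogy for the storage layout, not Cassandra code. The rows for hours 7 to 10 come from the slides, the values for hours 11 and 12 are as they appear to read on the slide, and the sketch sorts ascending for simplicity while the table itself uses DESC order.

```python
import bisect

# One partition (weather_station = '10010:99999'), kept sorted by the
# clustering columns (year, month, day, hour).
partition = [
    ((2005, 12, 1, 7), -5.6),
    ((2005, 12, 1, 8), -5.1),
    ((2005, 12, 1, 9), -4.9),
    ((2005, 12, 1, 10), -5.3),
    ((2005, 12, 1, 11), -4.9),
    ((2005, 12, 1, 12), -5.4),
]


def slice_rows(rows, low, high):
    """Return the contiguous run of rows with low <= clustering key <= high."""
    keys = [key for key, _ in rows]
    start = bisect.bisect_left(keys, low)
    stop = bisect.bisect_right(keys, high)
    return rows[start:stop]


selected = slice_rows(partition, (2005, 12, 1, 7), (2005, 12, 1, 10))
for (year, month, day, hour), temperature in selected:
    print(hour, temperature)
# 7 -5.6
# 8 -5.1
# 9 -4.9
# 10 -5.3
```

Because the rows are already sorted, the query needs one seek to the start of the slice followed by a sequential read.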

weather_station
CREATE TABLE weather_station (
    id text PRIMARY KEY,  // Composite of Air Force Datsav3 station number and NCDC WBAN number
    name text,            // Name of reporting station
    country_code text,    // 2 letter ISO Country ID
    state_code text,      // 2 letter state code for US stations
    call_sign text,       // International station call sign
    lat double,           // Latitude in decimal degrees
    long double,          // Longitude in decimal degrees
    elevation double      // Elevation in meters
);

Lookup table

daily_aggregate_temperature
CREATE TABLE daily_aggregate_temperature (
    weather_station text,
    year int,
    month int,
    day int,
    high double,
    low double,
    mean double,
    variance double,
    stdev double,
    PRIMARY KEY ((weather_station), year, month, day)
) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC);

SELECT high, low
FROM daily_aggregate_temperature
WHERE weather_station='010010:99999'
  AND year=2005 AND month=12 AND day=3;

 high | low
------+------
  1.8 | -1.5
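The columns of daily_aggregate_temperature can be derived from a day's hourly readings along these lines. This is a sketch in plain Python rather than the Spark job itself. The function name is invented here, and using the population variance and standard deviation is an assumption, since the slides do not say which form is stored.

```python
import statistics


def daily_aggregate(temperatures):
    """Roll hourly temperatures for one station and day up into one row."""
    return {
        "high": max(temperatures),
        "low": min(temperatures),
        "mean": statistics.mean(temperatures),
        "variance": statistics.pvariance(temperatures),  # population form (assumed)
        "stdev": statistics.pstdev(temperatures),
    }


# Hourly readings for hours 7 to 10 from the raw_weather_data example.
aggregate = daily_aggregate([-5.6, -5.1, -4.9, -5.3])
print(aggregate["high"], aggregate["low"])  # -4.9 -5.6
print(round(aggregate["mean"], 3))          # -5.225
```

Storing the result in its own table means the dashboard query is a single-partition read instead of a scan over the raw data.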

daily_aggregate_precip
CREATE TABLE daily_aggregate_precip (
    weather_station text,
    year int,
    month int,
    day int,
    precipitation counter,
    PRIMARY KEY ((weather_station), year, month, day)
) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC);

SELECT precipitation
FROM daily_aggregate_precip
WHERE weather_station='010010:99999'
  AND year=2005 AND month=12
  AND day>=1 AND day <= 7;

[Chart: precipitation for days 1 to 7, y-axis 0 to 40; value labels 17, 26, 20, 33, 12, 0]

Weather Station Stream Analysis
• Weather station collects data
• Data processed in stream
• Data stored in Cassandra

Windsor, California
Today
Rainfall total: 1.2cm
High: 73.4F
Low: 51.4F

Incoming data from Kafka
725030:14732,2008,01,01,00,5.0,-3.9,1020.4,270,4.6,2,0.0,0.0
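A sketch of turning one incoming line into a typed record. The field order is an assumption: the line has 13 comma-separated values, which line up with the raw_weather_data columns in declaration order if sky_condition_text is left out. The `RawWeatherData` type and `parse_line` function are names made up for this example.

```python
from typing import NamedTuple


class RawWeatherData(NamedTuple):
    weather_station: str
    year: int
    month: int
    day: int
    hour: int
    temperature: float
    dewpoint: float
    pressure: float
    wind_direction: int
    wind_speed: float
    sky_condition: int
    one_hour_precip: float
    six_hour_precip: float


def parse_line(line):
    """Split a comma-separated reading and convert each field (assumed order)."""
    f = line.strip().split(",")
    return RawWeatherData(
        f[0], int(f[1]), int(f[2]), int(f[3]), int(f[4]),
        float(f[5]), float(f[6]), float(f[7]),
        int(f[8]), float(f[9]), int(f[10]),
        float(f[11]), float(f[12]),
    )


record = parse_line("725030:14732,2008,01,01,00,5.0,-3.9,1020.4,270,4.6,2,0.0,0.0")
print(record.weather_station, record.year, record.hour, record.temperature)
# 725030:14732 2008 0 5.0
```

Parsing into named fields early keeps the later save and aggregation steps independent of the wire format.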

Creating a Stream

Saving the raw data

Building an aggregate
CREATE TABLE daily_aggregate_precip (
    weather_station text,
    year int,
    month int,
    day int,
    precipitation counter,
    PRIMARY KEY ((weather_station), year, month, day)
) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC);

CQL Counter
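The appeal of a counter column for a streaming aggregate is that each event only adds an increment and never needs to read the current total first. The sketch below mimics that with an in-memory `Counter` keyed by the table's primary key. It is an illustration, not how Cassandra stores counters; the `record_precipitation` helper and the integer amounts are assumptions.

```python
from collections import Counter

# Key: (weather_station, year, month, day), mirroring the table's primary key.
daily_precip = Counter()


def record_precipitation(totals, station, year, month, day, amount):
    """Apply one increment; no read of the existing total is needed."""
    totals[(station, year, month, day)] += amount


# Hypothetical hourly increments arriving from the stream.
record_precipitation(daily_precip, "725030:14732", 2008, 1, 1, 3)
record_precipitation(daily_precip, "725030:14732", 2008, 1, 1, 5)
record_precipitation(daily_precip, "725030:14732", 2008, 1, 2, 2)

print(daily_precip[("725030:14732", 2008, 1, 1)])  # 8
print(daily_precip[("725030:14732", 2008, 1, 2)])  # 2
```

Increments commute, so the order in which stream batches arrive does not change the daily total.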

Batch job on the fly?

(count: 24, mean: 14.428150, stdev: 7.092196, max: 28.034969, min: 0.675863)

(count: 11242, mean: 8.921956, stdev: 7.428311, max: 29.997986, min: -2.200000)

Weather data streaming
[Diagram: load generator or data import, Apache Kafka producer, consumer, NodeGuardian, dashboard]

Summary
• Cassandra - always-on operational database
• Spark
  - Batch analytics
  - Stream processing and saving back to Cassandra

Thanks for listening
• Follow me on Twitter: @chbatey
• Cassandra + fault tolerance posts aplenty: http://christopher-batey.blogspot.co.uk/
• Cassandra resources: http://planetcassandra.org/
