PySpark Cassandra
Analytics with Cassandra and PySpark
(Meetup presentation; source: files.meetup.com/16413072)
Frens Jan Rumph
• Database and processing architect at Target Holding
• Contact me at: [email protected]
Target Holding
• Machine Learning Company
• Time series:
  – prediction, search, anomaly detection, ...
• Text:
  – search, matching (e.g. jobs and resumes), ...
• Markets:
  – media
  – human resources
  – infrastructure (energy, waterworks, ...)
  – health
PySpark Cassandra
• Technology background
  – Cassandra, Spark and PySpark
• PySpark Cassandra
  – Introduction
  – Features and use cases
  – Getting started
  – Operators and examples
Technology background
Cassandra, Spark and PySpark
Cassandra
• Distributed database
• Originated at Facebook
• Roots in Amazon Dynamo
Cassandra Query Language
Main 'user interface' of Cassandra, with a SQL feel (tables with rows)
• DML
  – INSERT INTO ..., SELECT FROM ..., UPDATE ..., DELETE FROM ...
• DDL
  – CREATE TABLE ..., CREATE INDEX ...
• Column types:
  – numbers, strings, etc.
  – collections (lists, sets and maps)
  – counters
Distribution and replication
• Distributed map of ordered maps
  – under the hood some updates in C* 3
• Consistent hashing
• Replication along the ring
  – keys usually 'placed on the ring' through hashing
Image by DataStax
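The ring placement can be sketched in a few lines of plain Python. This is a toy illustration, not how Cassandra computes tokens (Cassandra defaults to the Murmur3 partitioner and uses vnodes); the node names and the MD5-based hash below are assumptions made for the sketch:

```python
import hashlib
from bisect import bisect_right

RING_SIZE = 2 ** 64

def token(value: str) -> int:
    # Hash a value onto the ring. Cassandra defaults to Murmur3;
    # MD5 is used here only to keep the sketch stdlib-only.
    return int.from_bytes(hashlib.md5(value.encode()).digest()[:8], 'big') % RING_SIZE

def build_ring(nodes):
    # Each node owns the arc of the ring ending at its token.
    return sorted((token(n), n) for n in nodes)

def replicas(ring, key, rf=3):
    # Walk clockwise from the key's position, collecting rf distinct nodes.
    tokens = [t for t, _ in ring]
    start = bisect_right(tokens, token(key)) % len(ring)
    return [ring[(start + i) % len(ring)][1] for i in range(min(rf, len(ring)))]

ring = build_ring(['cas1', 'cas2', 'cas3', 'cas4'])  # hypothetical node names
owners = replicas(ring, 'some-partition-key', rf=3)
```

Replication 'along the ring' is then just continuing the clockwise walk past the owning node until `rf` distinct replicas are collected.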
Local data structures and 2i
• Memtables
• SSTables
  – ordered within a partition on clustering columns
• Various caches
• Various indices
• Materialized views
  – maintained manually before C* 3.0
  – similar to normal tables
• Secondary indices
  – scatter-gather model
  – 'normal' or 'search'
Spark
• Distributed data processing engine
• 'Doesn't touch disk if it doesn't have to'
• Layers on top of data sources
  – HDFS, Cassandra, Elasticsearch, JDBC, ...
Resilient Distributed Dataset
• Partitioned (and distributed) collection of rows
• Part of a computational graph
  – an RDD has lineage back to its 'source' RDD(s)
• DataFrame / Dataset with stronger typing and declarative querying layered on top
Transformations and actions
• Narrow transformations are 'data-local'
  – map, filter, ...
• Wide transformations aren't
  – join, sort, reduce, group, ...
• Actions to
  – read results to the driver
  – write results to disk, a database, ...
Image by Apache
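A toy, non-Spark sketch of the distinction: a narrow transformation runs inside each partition independently, while a wide one must first bring rows with the same key together (the shuffle). The partition contents below are made up for illustration:

```python
from collections import defaultdict

# Two partitions of (key, value) rows; contents are made up.
partitions = [[('a', 1), ('b', 2)], [('a', 3), ('c', 4)]]

# Narrow: map transforms each partition independently -- no data movement.
mapped = [[(k, v * 10) for k, v in part] for part in partitions]

# Wide: grouping by key needs rows from *all* partitions with the same key,
# so data must be redistributed first (the shuffle).
grouped = defaultdict(list)
for part in mapped:
    for k, v in part:
        grouped[k].append(v)
```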
Distribution / topology
• Driver: where your app 'lives'
• Cluster manager: coordinates resources (standalone, Mesos or YARN)
• Executors: where the work happens
Image by Apache
PySpark
• Wrapper around the Java APIs
  – JVM for data shipping when working with Python RDDs
  – 'query language' when working with DataFrames
• CPython interpreters as (extra) executors
  – essentially the multiprocessing model, but distributed
  – CPython executors forked per job (not per application)
Pickle
• Object serialization shipped with Python
• Pickle used for messaging between CPython interpreters and the JVM
• cPickle / cloudpickle in CPython
• Py4J in the JVM
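A minimal round-trip with the stdlib pickle module illustrates the mechanism (PySpark additionally batches rows and uses cloudpickle for shipping closures):

```python
import pickle

# Round-trip a row-like record: serialize to bytes, deserialize back.
row = {'key': 'x', 'val': 4.2, 'tags': ['a', 'b', 'c']}
blob = pickle.dumps(row, protocol=pickle.HIGHEST_PROTOCOL)
restored = pickle.loads(blob)
```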
PySpark Cassandra
PySpark Cassandra
• Developed at Target Holding
  – we use a lot of Python
  – and Cassandra
  – Spark as an option for processing
• Built on the Spark Cassandra Connector
  – DataStax provides the Spark Cassandra Connector
  – the Python + Cassandra link was missing
  – hence PySpark Cassandra
Features and use cases
PySpark Cassandra
Features
• Distributed C* table scanning into RDDs
• Writing RDDs and DStreams to C*
• Joining RDDs and DStreams with C* tables
Use cases
• Perform bulk 'queries' you normally can't
  – C* doesn't do group by or join
  – or it takes a prohibitive amount of time
  – or just because it's easy once it's set up
• Data wrangling ('cooking' features, etc.)
• As a stream processing platform
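For example, a per-key aggregation that CQL cannot express across partitions becomes trivial once the rows are in an RDD. The sketch below does it in plain Python over hypothetical rows; in PySpark the same shape would be a map + reduceByKey:

```python
from collections import defaultdict

# Hypothetical rows as they might come back from a C* table scan.
rows = [
    {'sensor': 's1', 'value': 3.0},
    {'sensor': 's2', 'value': 1.0},
    {'sensor': 's1', 'value': 5.0},
]

# Per-key aggregation -- the 'group by' CQL does not offer across partitions.
totals = defaultdict(float)
for r in rows:
    totals[r['sensor']] += r['value']

# In PySpark this would be roughly:
#   sc.cassandraTable('ks', 'table') \
#     .map(lambda r: (r['sensor'], r['value'])) \
#     .reduceByKey(operator.add)
```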
Use cases at Target Holding
PySpark Cassandra
Media metadata processing
• We disambiguated close to 90 million 'authorships' using names, contact details and publication keywords
  – in order to build analytical applications for a large publisher of scientific journals
• Spark / PySpark Cassandra for data wrangling
Earthquake monitoring
• We are building a monitoring system for use with many low-cost vibration sensors
• Spark / PySpark Cassandra for
  – enriching the event stream
  – saving the event stream
  – bulk processing
  – anomaly detection
Processing time series data
• We collect time series data in various fields
  – energy (electricity and gas usage)
  – music (tracking online music and video portals)
• Spark / PySpark Cassandra for
  – data wrangling
  – rolling up data
  – bulk forecasting
  – anomaly detection
Getting started
PySpark Cassandra
Getting started
• 'Homepage': github.com/TargetHolding/pyspark-cassandra
• Available on: spark-packages.org/package/TargetHolding/pyspark-cassandra
• Also read: github.com/datastax/spark-cassandra-connector
Compatibility
• Spark 1.5 and 1.6
  – (older versions were supported in the past)
• Cassandra 2.1.5, 2.2 and 3
• Python 2.7 and 3
High-level overview
• Read from and write to C* using Spark as a co-located distributed processing platform
Image by DataStax
Software setup
[Layer diagram] An application sits on top of two cooperating stacks:
• Python side: Python part of PySpark Cassandra → PySpark → CPython
• JVM side: Scala part of PySpark Cassandra → DataStax Spark Cassandra Connector → Spark → JVM
• Both sides talk to Cassandra (which runs in its own JVM)
Submit script

spark-submit \
  --packages TargetHolding/pyspark-cassandra:0.3.5 \
  --conf spark.cassandra.connection.host=cas1,cas2,cas3 \
  --master spark://spark-master:7077 \
  yourscript.py
import ...
conf = SparkConf()
sc = CassandraSparkContext(conf=conf)
# your script
PySpark shell

IPYTHON_OPTS=notebook \
PYSPARK_DRIVER_PYTHON=ipython \
pyspark \
  --packages TargetHolding/pyspark-cassandra:0.3.5 \
  --conf ... ...
import pyspark_cassandra
Operators and examples
PySpark Cassandra
Operators
• Scan
• Project (select)
• Filter (where)
• Limit, etc.
• Count
• 'Spanning'
• Join
• Save
Scan
• cassandraTable() to scan C*
  – determine basic token ranges
  – group them into partitions, taking size and location into account
  – execute (concurrent) CQL queries against C*
rows = sc.cassandraTable('keyspace', 'table')
Scan
• Basically executing this query many times:

SELECT columns
FROM keyspace.table
WHERE token(pk) > ? AND token(pk) < ? [filter]
ORDER BY ...
LIMIT ...
ALLOW FILTERING
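The splitting into token ranges can be sketched as follows. This is a toy illustration: the real connector derives ranges from the cluster's token metadata rather than from even slices of the ring:

```python
# Full Murmur3Partitioner token range.
RING_MIN, RING_MAX = -2 ** 63, 2 ** 63 - 1

def token_ranges(splits):
    # Divide the ring into `splits` contiguous (lo, hi] ranges.
    step = (RING_MAX - RING_MIN) // splits
    bounds = [RING_MIN + i * step for i in range(splits)] + [RING_MAX]
    return list(zip(bounds, bounds[1:]))

ranges = token_ranges(4)
queries = [
    f"SELECT * FROM ks.table WHERE token(pk) > {lo} AND token(pk) <= {hi}"
    for lo, hi in ranges
]
```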
Scan
• Quite tunable if necessary

sc.cassandraTable('keyspace', 'table',
    row_format=RowFormat.DICT,  # ROW, DICT or TUPLE
    split_count=1000,           # number of partitions (splits)
    split_size=100000,          # size of a partition
    fetch_size=1000,            # query page size
    consistency_level='ALL',
    metrics_enabled=True)
Project / select
• To make things go a little faster, select only the columns you need.
  – This saves in communication: C* ↔ Spark JVM ↔ CPython
sc.cassandraTable(...).select('col1', 'col2', ...)
Types

CQL        Python
ascii      unicode string
bigint     long
blob       bytearray
boolean    boolean
counter    int, long
decimal    decimal
double     float
float      float
inet       str
int        int
list       list
set        set
text       unicode string
timestamp  datetime.datetime
timeuuid   uuid.UUID
uuid       uuid.UUID
varchar    unicode string
varint     long
UDT        pyspark_cassandra.UDT
Key by primary key
• Cassandra RDDs can be keyed by primary key
  – yielding an RDD of key-value pairs
  – keying by partition key not yet supported
sc.cassandraTable(...).by_primary_key()
Filter / where
• Clauses on primary keys, clustering columns or secondary indices can be pushed down
  – if a where with ALLOW FILTERING works in CQL
• Otherwise resort to RDD.filter or DF.filter

sc.cassandraTable(...).where(
    'col2 > ?', datetime.now() - timedelta(days=14)
)
Combine with 2i
• With the Cassandra Lucene index:

sc.cassandraTable(...).where('lucene = ?', '''{
    filter: {
        field: "loc",
        type: "geo_bbox",
        min_latitude: 53.217,
        min_longitude: 6.521,
        max_latitude: 53.219,
        max_longitude: 6.523
    }
}''')
Limit, take and first
• limit() the number of rows per query
  – there are at least as many queries as there are token ranges
• take(n) at most n rows from the RDD
  – applies limit to make it just a tad faster
sc.cassandraTable(...).limit(1)...
sc.cassandraTable(...).take(3)
sc.cassandraTable(...).first()
Push down count
• cassandraCount() pushes count(*) queries down to C*
  – counting in partitions, then reducing
• Use it when all you want is to count records in C*
  – doesn't force caching
sc.cassandraTable(...).cassandraCount()
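Conceptually the total is just a reduction over per-token-range partial counts (the numbers below are made up):

```python
# Hypothetical partial count(*) results, one per token-range query.
partial_counts = [1200, 950, 1103, 47]

# The reduce step: sum the partials into one total on the driver.
total = sum(partial_counts)
```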
Spanning
• Wide rows in C* are retrieved in order, are consecutive and don't cross partition boundaries
• spanBy() is like groupBy() for wide rows
sc.cassandraTable(...).spanBy('doc_id')
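Because rows of one partition arrive ordered and consecutive, spanning needs only a single pass and no shuffle. A plain-Python analogue with itertools.groupby (over hypothetical rows) shows the same idea:

```python
from itertools import groupby

# Hypothetical wide rows: ordered, and rows of one partition are consecutive.
rows = [
    {'doc_id': 1, 'line': 'a'},
    {'doc_id': 1, 'line': 'b'},
    {'doc_id': 2, 'line': 'c'},
]

# A single ordered pass groups them -- no shuffle needed; this is the
# property spanBy() exploits.
spans = {k: [r['line'] for r in g]
         for k, g in groupby(rows, key=lambda r: r['doc_id'])}
```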
Save
• Save any PySpark RDD to C*
  – as long as it consists of dicts, tuples or Rows
rdd.saveToCassandra('keyspace', 'table', ...)
Save

rows = [dict(
        key = k,
        stamp = datetime.now(),
        val = random() * 10,
        tags = ['a', 'b', 'c'],
        options = dict(foo='bar', baz='qux'),
    ) for k in ('x', 'y', 'z')]

rdd = sc.parallelize(rows)
rdd.saveToCassandra('keyspace', 'table')
Save

rdd.saveToCassandra(...,
    columns=('col1', 'col2'),   # the columns to save / how to interpret tuple elements
    row_format=RowFormat.DICT,  # RDD format hint
    keyed=True,                 # whether the RDD consists of key-value pairs
)
Save

rdd.saveToCassandra(...,
    batch_size=16*1024,             # max. size of a batch
    batch_buffer_size=1000,         # max. pending batches
    batch_grouping_key='partition', # how batches are formed: any / replicaset / partition
    consistency_level='LOCAL_ONE',
    parallelism_level=8,            # max. batches in flight
    throughput_mibps=MAX_LONG,      # max. MB/s
    ttl=timedelta(days=3),          # TTL
    metrics_enabled=False)
Save DStream
• Just like saving an RDD!
• But then for every micro-batch in the DStream

dstream \
    .map(lambda e: e[1]) \
    .filter(lambda v: v > 3) \
    .saveToCassandra('keyspace', 'table', ...)
Join
• Join (inner) any RDD with a C* table
• No outer joins supported

rdd.joinWithCassandraTable('keyspace', 'table') \
    .on('id') \
    .select('col1', 'col2')
Join
• A query per PK in the left RDD for the row(s) in the joined table
• Somewhat similar to a hash join
• Usual caveats of skew, 'shuffle' overhead, etc.
• (repartitionByCassandraReplica not yet supported)
Future work
PySpark Cassandra
Future work
• Main engine:
  – build on DataFrames/Datasets more?
  – build on the native Python Cassandra driver?
  – use marshal or ... instead of pickle?
• More features:
  – repartitioning
  – multi-cluster support
  – expose the C* session to Python
• Suggestions?
We're hiring
http://www.target-holding.nl/organisatie/vacatures
data scientists, backend and database engineers, web developers, ...
Q&A