Beyond NoSQL with Spark and Solr on Cassandra
TRANSCRIPT
@natbusa | linkedin.com: Natalino Busa
Beyond NoSQL with Spark and Solr on Cassandra
Natalino Busa, Lead Data Architect @ ING
Highly available across multiple data centers; tunable consistency
Very scalable, low latency; masterless architecture: all nodes are equal
Flexible data model (more on this later)
2D key-value store: hashed access on partition keys, sorted on clustering keys
Cassandra
Cassandra: Write replication
Application writes are hashed with Murmur3; the hashes (tokens) are partitioned across the nodes.
Consistent when Nreads + Nwrites > Nreplicas: trade availability for consistency.
Serialization via Paxos (lightweight transactions) since 2.0.
Compaction and repartitioning happen in the background.
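The consistency condition above can be made concrete with a quick sketch (the function name is hypothetical; only the inequality comes from the slide):

```python
def quorum_consistent(n_reads: int, n_writes: int, n_replicas: int) -> bool:
    """Read and write quorums overlap when R + W > N, so every read
    contacts at least one replica that holds the latest write."""
    return n_reads + n_writes > n_replicas

# QUORUM reads and writes at replication factor 3: 2 + 2 > 3 -> consistent
assert quorum_consistent(2, 2, 3)
# consistency level ONE on both sides trades consistency for availability
assert not quorum_consistent(1, 1, 3)
```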
Application writes with replication factor 3: each write is stored on three replica nodes.
Writes also replicate across data centers: DC1 with replication factor 3, DC2 with replication factor 3.
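How writes land on nodes can be sketched as a toy token ring (illustrative only: real Cassandra derives the token from the partition key with Murmur3, and the node names and token values here are made up):

```python
import bisect

# node tokens on the ring (made-up values), sorted by token
ring = [(0, "n1"), (25, "n2"), (50, "n3"), (75, "n4")]
token_values = [t for t, _ in ring]

def replicas(token: int, rf: int = 3) -> list[str]:
    """Find the first node whose token range covers the key's token,
    then walk the ring clockwise until rf replicas are collected."""
    start = bisect.bisect_right(token_values, token) % len(ring)
    return [ring[(start + k) % len(ring)][1] for k in range(rf)]

assert replicas(30) == ["n3", "n4", "n1"]  # token 30 lands between n2 and n3
assert replicas(90) == ["n1", "n2", "n3"]  # wraps around past the last token
```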
Cassandra Queries
Can you do…
filtering: limited
transformations: limited
aggregations: limited
joins: no
Speed comes at a cost… or shouldn’t it?
It’s NoSQL, man.
Cassandra+Solr+Spark Queries
C*: read-optimized; fixed and designed at write time; not flexible, but very fast.
Solr: fast write-time field indexing; more indexes = more flexibility; richer queries; fewer queries planned up front.
Spark: ad-hoc queries; joins and aggregates; user-defined functions; machine learning, advanced stats and analytics.
Cassandra-Spark Connector
Cassandra: Store all the dataSpark: Analyze all the data
DC1: replication factor 3 (transactional); DC2: replication factor 3 + Spark executors (analytics)
Cassandra-Spark Stack
Cassandra: Store all the data
Spark: distributed data processing; executors and workers; cluster resource allocators: Mesos, YARN, or Spark Standalone
Connector: data locality, reduced shuffling, RDDs to and from Cassandra
Partitions in DC2: replication factor 3 + Spark executors (analytics)
Cassandra-Spark Stack
Cassandra-Spark Connector
Cassandra
Spark
Streaming SQL MLlib Graphx
Spark SQL: SQL library on Spark RDDs
MLlib: Spark machine learning library
GraphX: Spark graph library
Spark Streaming: schedules micro-batches on Spark
DC2: replication factor 3 + Spark Executors
Cassandra-Spark Stack: Use cases
Cassandra-Spark Connector
Cassandra
Spark
Streaming SQL MLlib Graphx
● Sanitize, Validate
● Transform and Enrich:
○ Model Scoring, Look-up-tables, UDFs
● Schema migration, Data Conversion
● Analytics: Stats and Model building
● Load/Save to other sources
Cassandra-Spark Stack
Cassandra-Spark Connector
Cassandra
Spark
Streaming SQL MLlib Graphx
Extract Data
Create Models,Enrich, Transform
Fetch from other Sources: Kafka
Fetch from other Sources: DB’s, Files
APIs: REST resources, model scoring
Analytics, Statistics, Data Science, Model Training
Solr
Example: Analyze gowalla check-ins
Check-ins dataset:

 year | month | day | time | uid  | lat      | lon       | ts                       | vid
------+-------+-----+------+------+----------+-----------+--------------------------+--------
 2010 |     9 |  14 |   91 |  853 | 40.73474 | -73.87434 | 2010-09-14 00:01:31+0000 | 917955
 2010 |     9 |  14 |  328 | 4516 | 40.72585 | -73.99289 | 2010-09-14 00:05:28+0000 |  37160
 2010 |     9 |  14 |  344 | 2964 | 40.67621 | -73.98405 | 2010-09-14 00:05:44+0000 | 956870

Venues dataset:

 vid     | name                       | lat      | lon
---------+----------------------------+----------+-----------
  754108 | My Suit NY                 | 40.73474 | -73.87434
  249755 | UA Court Street Stadium 12 | 40.72585 | -73.99289
 6919688 | Sky Asian Bistro           | 40.67621 | -73.98405
Cassandra Tables
Let’s start with a table for check-ins:

CREATE TABLE lbsn.checkins (
  year int,
  month int,
  day int,
  time int,
  ts timestamp,
  uid bigint,
  lat double,
  lon double,
  vid bigint,
  PRIMARY KEY ((year, month), day, time, uid)
) WITH CLUSTERING ORDER BY (day DESC, time DESC);
Cassandra queries: do
This query can be done in C* directly:

/* query a given day */
select * from lbsn.checkins where year=2010 and month=9 and day=13 limit 5;

 year | month | day | time | uid    | lat      | lon       | ts                       | vid
------+-------+-----+------+--------+----------+-----------+--------------------------+---------
 2010 |     9 |  13 |   39 | 114507 | 40.80794 | -73.96409 | 2010-09-13 00:00:39+0000 |   78472
 2010 |     9 |  13 |  138 |  37571 | 40.62563 | -74.25763 | 2010-09-13 00:02:18+0000 | 1663634
 2010 |     9 |  13 |  147 |  11321 | 40.73396 | -73.99301 | 2010-09-13 00:02:27+0000 |   35372
 2010 |     9 |  13 |  597 | 165226 | 40.64388 | -73.78281 | 2010-09-13 00:09:57+0000 |   23261
 2010 |     9 |  13 |  641 |  12719 | 40.72978 | -74.00121 | 2010-09-13 00:10:41+0000 |  758447
Cassandra queries: do
This query can be done in C* directly:

/* range by day */
select * from lbsn.checkins where year=2010 and month=9 and day>13 and day<16 limit 5;

 year | month | day | time | uid    | lat      | lon       | ts                       | vid
------+-------+-----+------+--------+----------+-----------+--------------------------+---------
 2010 |     9 |  14 |   91 |    853 | 40.73474 | -73.87434 | 2010-09-14 00:01:31+0000 |  917955
 2010 |     9 |  14 |  328 |   4516 | 40.72585 | -73.99289 | 2010-09-14 00:05:28+0000 |   37160
 2010 |     9 |  14 |  344 |   2964 | 40.67621 | -73.98405 | 2010-09-14 00:05:44+0000 |  956870
 2010 |     9 |  14 |  359 |  48555 | 40.76068 | -73.98699 | 2010-09-14 00:05:59+0000 | 3026508
 2010 |     9 |  14 |  688 | 189786 | 40.71588 | -74.00663 | 2010-09-14 00:11:28+0000 | 1036251
Cassandra queries: don’t
Two ranges cannot be combined, because of the ordering of the clustering keys:

/* INVALID: range by day and by time */
select * from lbsn.checkins where year=2010 and month=9 and
  day>13 and day<16 and time>61200 and time<64800 limit 5;
InvalidRequest: code=2200 [Invalid query] message="PRIMARY KEY column "time" cannot be restricted (preceding column "day" is restricted by a non-EQ relation)"
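Why this fails: within a partition, rows are stored in a single sort order defined by the clustering keys, so only the first range-restricted clustering column selects a contiguous slice. A minimal Python illustration (the row values are invented, patterned on (day, time)):

```python
# rows in one partition, stored sorted by the clustering keys (day, time)
rows = sorted([(13, 700), (14, 100), (14, 62000), (14, 70000),
               (15, 61500), (16, 200)])

# a range on the first clustering column is one contiguous slice:
day_slice = [r for r in rows if 13 < r[0] < 16]
assert day_slice == rows[1:5]  # positions 1..4, a single sequential run

# rows matching ranges on BOTH columns are scattered inside that slice,
# so Cassandra cannot serve them as one sequential read:
both = [r for r in day_slice if 61200 < r[1] < 64800]
assert both == [(14, 62000), (15, 61500)]  # non-contiguous within the slice
```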
Cassandra + Spark
Cassandra-Spark Connector: extracts the data from Cassandra and creates an RDD.
Spark SQL: takes the resulting RDDs and operates on them.

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

// by converting to a Spark SQL DataFrame
import org.apache.spark.sql.cassandra.CassandraSQLContext
val cc = new CassandraSQLContext(sc)

val checkins = cc.sql("select ts, uid, vid from lbsn.checkins where day<16 and day>13")
checkins.show(10)
Cassandra + Spark
Cassandra-Spark Connector: if the query can be executed directly in Cassandra, it will be; otherwise the connector extracts what is necessary and performs the rest of the query in Spark.

val checkins = cc.sql("select ts, uid, vid from lbsn.checkins where year=2010 and month=9 and vid=57871")
checkins.show(10)

ts                    uid   vid
2010-10-01 01:57:...  1684  57871
Cassandra Tables
Now a table for venues:

CREATE TABLE lbsn.venues (
  vid bigint,
  name text,
  lat double,
  lon double,
  PRIMARY KEY (vid)
);
Cassandra + Spark: joins!
// joining tables
val df_venues = cc.sql("select vid, name from lbsn.venues").as("venues").cache()
val df_checkins = cc.sql("select ts, uid, lat, lon, vid from lbsn.checkins").as("checkins").cache()

val checkins_venues = df_checkins
  .join(df_venues, $"checkins.vid" === $"venues.vid", "inner")
  .select("ts", "uid", "lat", "lon", "checkins.vid", "name")

checkins_venues.show(5)

ts                    uid    lat            lon                 vid     name
2010-07-01 02:47:...  578    40.7490532543  -73.9680397511      11831   United Nations
2010-07-02 18:27:...  991    40.7188502243  -73.99594579149999  818431  OK 218
2010-07-03 02:07:...  34359  40.7348441565  -73.9995288849      123831  Kingswood
2010-07-03 18:58:...  578    40.6838680433  -73.9786720276      105831  Pacific Street St...
2010-07-03 19:53:...  2737   40.6906938667  -73.9956976167      197431  Floyd, NY
Cassandra + Spark: UDF functions

import org.apache.spark.sql.functions.udf

val func = (lat: Double, lon: Double) => clusters.predict(Vectors.dense(lat, lon))
val sqlfunc = udf(func)

// add predictions as an extra column, using a user-defined function
// note that the udf closes over clusters, which is broadcast to the executors
val locs_cid = checkins_venues.withColumn("cluster", sqlfunc(checkins_venues("lat"), checkins_venues("lon")))

locs_cid.show(5)

ts                    uid    lat           lon                 vid      name                 cluster
2010-07-01 02:03:...  10231  40.663200646  -73.984763295       1225113  Thistle Hill Tavern  34
2010-07-01 02:06:...  4907   40.74101965   -73.99416911670001  1078263  Limelight Marketp... 47
2010-07-01 02:10:...  4929   40.747507     -73.989425          1175513  La Rosa Cigars       47
2010-07-01 02:14:...  26851  40.76823395   -73.95315415        164621   David Copperfields   38
2010-07-01 02:17:...  4929   40.74695265   -73.9857679833      141918   J.J. Hat Center      47
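The clusters object above is a pre-trained MLlib clustering model, presumably KMeans (see the clustering blog post under Resources). Its predict boils down to nearest-centroid assignment, sketched here in plain Python with made-up centroid coordinates:

```python
import math

# hypothetical centroids (cluster id -> (lat, lon)); a real model learns these
centroids = {0: (40.75, -73.99), 1: (40.68, -73.98)}

def predict(lat: float, lon: float) -> int:
    """Return the id of the closest centroid, as KMeansModel.predict does."""
    return min(centroids, key=lambda c: math.dist(centroids[c], (lat, lon)))

assert predict(40.7490, -73.9680) == 0  # closer to the first centroid
assert predict(40.6838, -73.9786) == 1  # closer to the second centroid
```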
Cassandra Tables + Solr Search
CREATE TABLE lbsn.venues (
  vid bigint,
  name text,
  lat double,
  lon double,
  PRIMARY KEY (vid)
);
The table
<schema name="venues_search" version="1.5">
<fields>
<field name="vid" type="long" indexed="true" stored="true"/>
<field name="name" type="text" indexed="true" stored="true"/>
<field name="lat" type="double" indexed="true" stored="true"/>
<field name="long" type="double" indexed="true" stored="true"/>
</fields>
<defaultSearchField>name</defaultSearchField>
<uniqueKey>vid</uniqueKey>
</schema>
The search schema
Cassandra Tables + Solr Search
Time to load the schema and start indexing:

curl http://solr.cassandracluster:8983/solr/resource/lbsn.venues/solrconfig.xml \
  --data-binary @solrconfig.xml -H 'Content-type:text/xml; charset=utf-8'
http://solr.cassandracluster:8983/solr/lbsn.venues/select?q=name:book*
{"response":{"numFound":1,"start":0,"docs":[
{
"vid":1301975,
"name":"Alabaster bookshop",
"lat":40.732748217,
"lon":-73.989666667
}]
}}
Caveats: DSE Cassandra vs Community
The Cassandra-Spark Connector works with both.
Some optimizations that reduce Cassandra I/O are not available in the community edition.
Highly available, multi-master Spark: only on DSE.
Caveats: DSE Cassandra vs Community
Cassandra + Solr: DSE allows embedded Solr queries in Cassandra CQL and the Spark connector:

select * from lbsn.venues where solr_query='name:book*'

Cassandra + Solr, or Solandra: two loosely coupled solutions; possible data duplication, little or no integration:
http://solrcluster:8983/solr/lbsn.venues/select?q=name:book*
Cassandra + Spark + Solr Queries
Can you do…
filtering: Yes
transformations: Yes
aggregations: Yes
joins: Yes
Is it the “Holy Grail” ?
Cassandra+Solr+Spark Queries
It’s a performance vs. flexibility tradeoff.
Resources
Spark + Cassandra: Clustering Events
http://www.natalinobusa.com/2015/07/clustering-check-ins-with-spark-and.html
Datastax slide decks
http://www.slideshare.net/planetcassandra/a-cassandra-solr-spark-love-triangle-using-datastax-enterprise
http://www.slideshare.net/doanduyhai/spark-cassandra-connector-api-best-practices-and-usecases
Datastax docs on Search and Analytics
http://docs.datastax.com/en/datastax_enterprise/4.7/datastax_enterprise/anaHome/anaHome.html
http://docs.datastax.com/en/datastax_enterprise/4.7/datastax_enterprise/srch/srchOverview.html
Solr tutorials and slide deck
http://www.slideshare.net/charliejuggler/lucene-solrlondonug-meetup28nov2014-solr-es-performance
http://www.solrtutorial.com/solr-query-syntax.html
Datasets
https://snap.stanford.edu/data/loc-gowalla.html
E. Cho, S. A. Myers, J. Leskovec. Friendship and Mobility: User Movement in Location-Based Social Networks. ACM SIGKDD International Conference on Knowledge Discovery and Data Mining (KDD), 2011.
https://code.google.com/p/locrec/downloads/detail?name=gowalla-dataset.zip
The project is being developed in the context of the SInteliGIS project financed by the Portuguese Foundation for Science and Technology (FCT) through project grant PTDC/EIA-EIA/109840/2009.