
Page 1: Scaling with Couchbase, Kafka and Apache Spark

Scaling with Couchbase, Kafka and Apache Spark

Matt Ingenthron, Sr. Director SDK Engineering and Developer Advocacy

Page 2: Scaling with Couchbase, Kafka and Apache Spark


About Me

Matt Ingenthron

Worked on large site scalability problems at a previous company…

memcached contributor

Joined Couchbase very early and helped define key parts of the system

@ingenthr

Page 3: Scaling with Couchbase, Kafka and Apache Spark

A Brief Aside: What is Couchbase?

Page 4: Scaling with Couchbase, Kafka and Apache Spark


Couchbase provides a complete Data Management solution


High availability cache

Key-value store

Document database

Embedded database

Sync management

Multi-purpose capabilities support a broad range of apps and use cases

Enterprises often start with cache, then broaden usage to other apps and use cases

Page 5: Scaling with Couchbase, Kafka and Apache Spark


What makes Couchbase unique?

Enterprises choose Couchbase for several key advantages:

Performance & scalability leader: sub-millisecond latency with high throughput; memory-centric architecture

Multi-purpose: cache, key-value store, document database, and local/mobile database in a single platform

Simplified administration: easy to deploy & manage; integrated Admin Console, single-click cluster expansion & rebalance

Always-on availability (24x365): data replication across nodes, clusters, and data centers

Page 6: Scaling with Couchbase, Kafka and Apache Spark


Couchbase Server Architecture

Single-node type means easier administration and scaling

Single installation

Two major components/processes: the data manager and the cluster manager

Data manager: C/C++ layer; consolidates caching and persistence

Cluster manager: Erlang/OTP; administration UIs; out-of-band for data requests

Page 7: Scaling with Couchbase, Kafka and Apache Spark

Basic Operation

[Diagram: active and replica shards (vBuckets) distributed evenly across Couchbase Server 1, 2, and 3]

Application has a single logical connection to the cluster (client object)

• Data is automatically sharded, resulting in even document distribution across the cluster

• Each vBucket is replicated 1, 2, or 3 times ("peer-to-peer" replication)

• Docs are automatically hashed by the client to a shard (a sketch of this key-to-shard mapping follows this list)

• The cluster map provides the location of the server a shard lives on

• Every read/write/update/delete goes to the same node for a given key

• Strongly consistent data access ("read your own writes")

• A single Couchbase node can achieve hundreds of thousands of ops/sec, so there is no need to scale reads separately
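A minimal Scala sketch of the key-to-shard idea described above: hash the document key, take it modulo the number of vBuckets, then look up the owning server in the cluster map. Couchbase clients use a CRC32-based hash; the exact details differ, and ClusterMap, vBucketOf, and serverForKey here are illustrative names, not the SDK's API.

import java.util.zip.CRC32

// vBucket id -> owning server, as distributed by the cluster map
case class ClusterMap(numVBuckets: Int, vBucketToServer: Vector[String])

object KeyRouting {
  def vBucketOf(key: String, numVBuckets: Int): Int = {
    val crc = new CRC32()
    crc.update(key.getBytes("UTF-8"))
    (crc.getValue % numVBuckets).toInt      // same key always maps to the same vBucket
  }

  def serverForKey(key: String, map: ClusterMap): String =
    map.vBucketToServer(vBucketOf(key, map.numVBuckets))

  def main(args: Array[String]): Unit = {
    // 1024 vBuckets spread round-robin over a three-node cluster (illustrative)
    val map = ClusterMap(1024, Vector.tabulate(1024)(i => s"couchbase-server-${i % 3 + 1}"))
    println(serverForKey("user::1234", map))  // every read/write for this key hits this node
  }
}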

Page 8: Scaling with Couchbase, Kafka and Apache Spark

Add Nodes to Cluster

[Diagram: Couchbase Server 4 and 5 are added to the three-node cluster; active and replica shards are incrementally rebalanced across all five servers while reads, writes, and updates continue]

Application has a single logical connection to the cluster (client object)

• Multiple nodes can be added or removed at once

• One-click operation

• Incremental movement of active and replica vBuckets and data

• Client library updated via the cluster map

• Fully online operation, no downtime or loss of performance

Page 9: Scaling with Couchbase, Kafka and Apache Spark

Couchbase in Big Data

a.k.a. the main program

Page 10: Scaling with Couchbase, Kafka and Apache Spark

Lambda Architecture

[Diagram: (1) new data arrives as a stream and feeds both layers; (2) the batch layer recomputes views over all data (MapReduce); (3) the speed layer processes the stream into incremental views; (4) the serving layer serves the views; (5) queries read from the serving layer]

Page 11: Scaling with Couchbase, Kafka and Apache Spark

Lambda Architecture

[Diagram: the batch layer recomputes batch views (precomputed via MapReduce) from all data; the speed layer incrementally updates real-time views from the new data stream; the serving layer merges partial aggregates from both into a merged view that answers queries]
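The merge step is the serving layer's whole job: combine a complete but slightly stale batch view with a small real-time delta at query time. A minimal Scala sketch of that idea follows; the per-key click counts, the view contents, and the ServingLayer object are illustrative assumptions, not anything from the talk.

object ServingLayer {
  type View = Map[String, Long]   // e.g. page -> click count (a simple "partial aggregate")

  // Merge a precomputed batch view with the speed layer's incremental view
  def mergedView(batchView: View, realTimeView: View): View =
    (batchView.keySet ++ realTimeView.keySet).map { k =>
      k -> (batchView.getOrElse(k, 0L) + realTimeView.getOrElse(k, 0L))
    }.toMap

  def main(args: Array[String]): Unit = {
    val batch    = Map("home" -> 10000L, "checkout" -> 1200L)  // recomputed from all data
    val realTime = Map("home" -> 42L, "deals" -> 7L)           // incremented from the stream
    println(mergedView(batch, realTime))                       // queries read the merged view
  }
}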

Page 12: Scaling with Couchbase, Kafka and Apache Spark

Lambda + Couchbase

[Diagram: the same Lambda Architecture as above, annotated to show where Couchbase fits alongside the batch views, real-time views, and merged view]

Page 13: Scaling with Couchbase, Kafka and Apache Spark

Lambda + Hadoop + Spark + Storm

[Diagram: the same Lambda Architecture mapped onto Hadoop, Spark, and Storm, with the Couchbase Hadoop Connector (Sqoop) moving data between Couchbase and the batch layer]

Page 14: Scaling with Couchbase, Kafka and Apache Spark

[Diagram: a reference architecture with a batch track and a real-time track: complex event processing, a real-time repository, a perpetual store, an analytical DB, business intelligence, monitoring, a chat/voice system, and a dashboard]

Page 15: Scaling with Couchbase, Kafka and Apache Spark

[Diagram: tracking and collection feeding analysis and visualization through REST, filter, and metrics components]

Page 16: Scaling with Couchbase, Kafka and Apache Spark

Integration at Scale

Page 17: Scaling with Couchbase, Kafka and Apache Spark

Requirements for data streaming in modern systems…

• Must support high throughput and low latency
• Need to handle failures
• Pick up where you left off (a sketch of offset-based resumption follows this list)
• Be efficient about resource usage
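To make "pick up where you left off" concrete, here is a minimal Scala sketch using the plain Kafka consumer API, not the Couchbase Kafka connector linked at the end of this deck: committed offsets are the durable marker a restarted consumer resumes from. The broker address, topic name, and group id are assumptions for illustration.

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object ResumableConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // assumed broker address
    props.put("group.id", "clickstream-processors")    // offsets are tracked per group
    props.put("enable.auto.commit", "false")           // commit only after processing
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(java.util.Arrays.asList("clickstream"))

    while (true) {
      val records = consumer.poll(1000L)                // fetch a batch
      for (record <- records.asScala) {
        // process record.value() ...
      }
      consumer.commitSync()                             // durable progress: restart resumes here
    }
  }
}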

Page 18: Scaling with Couchbase, Kafka and Apache Spark

Data Sync is the Heart of Any Big Data System

Fundamental piece of the architecture:
- Data sync maintains data redundancy for High Availability (HA) & Disaster Recovery (DR)
  - Protects against failures: node, rack, region, etc.
- Data sync maintains indexes
  - Indexing is key to building faster access paths to query data
  - Spatial, full-text

DCP and Couchbase Server Architecture

Page 19: Scaling with Couchbase, Kafka and Apache Spark

Database Change Protocol: DCP

Page 20: Scaling with Couchbase, Kafka and Apache Spark

Design Goals

Ordering: to build interesting features, a streaming protocol needs a concept of when operations happened. Couchbase orders operations at the node level:
- Each mutation is assigned a sequence number
- Sequence numbers increase monotonically
- Sequence numbers are assigned on a per-vBucket basis

Restartability: need to handle failures with grace, in particular being efficient about the amount of data moved around.

Consistency points: points in time for incremental backup and query consistency.

Performance: recognize that durability on a distributed system may have different definitions.

Page 21: Scaling with Couchbase, Kafka and Apache Spark

Version History (Failover Example)

[Diagram: a DCP replication stream from an active vBucket (last seqno 100) to a replica (last seqno 0); the version history begins with the entry (VB UUID X, seqno 0), and when the replica takes over as active after a failover, a new entry (VB UUID Y, seqno 100) is added]

Page 22: Scaling with Couchbase, Kafka and Apache Spark

Restartability Example

[Diagram: a node that went down at last seqno 80 reconnects to the active vBucket (last seqno 150); the version history contains (X, 0) and (Y, 100); since 0 <= 80 <= 100, the DCP stream resumes from seqno 80 without rollback]

Page 23: Scaling with Couchbase, Kafka and Apache Spark

Restartability Example (With Rollback)

[Diagram: a node that went down at last seqno 110 reconnects to the active vBucket (last seqno 150); the version history contains (X, 0) and (Y, 100); since 0 <= 110 <= 100 does not hold, the consumer first rolls back to seqno 100 (where 0 <= 100 <= 100 holds) and then resumes]
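A minimal Scala sketch of the restart decision in the two examples above, assuming a simplified failover log of (VB UUID, seqno) entries. FailoverEntry, resumeSeqno, and the string UUIDs are illustrative names only, not the DCP wire protocol.

case class FailoverEntry(vbUuid: String, seqno: Long)

object DcpRestart {
  // failoverLog is ordered newest first, e.g. List(("Y",100), ("X",0)) as in the slides
  def resumeSeqno(consumerUuid: String, consumerSeqno: Long,
                  failoverLog: List[FailoverEntry]): Option[Long] = {
    val idx = failoverLog.indexWhere(_.vbUuid == consumerUuid)
    if (idx < 0) None                                    // unknown history: full resync needed
    else {
      val upper =                                        // seqno at which a newer UUID took over
        if (idx == 0) Long.MaxValue else failoverLog(idx - 1).seqno
      if (consumerSeqno <= upper) Some(consumerSeqno)    // within known history: resume as-is
      else Some(upper)                                   // ahead of it: roll back, then resume
    }
  }

  def main(args: Array[String]): Unit = {
    val log = List(FailoverEntry("Y", 100), FailoverEntry("X", 0))
    println(resumeSeqno("X", 80, log))   // Some(80): 0 <= 80 <= 100, no rollback
    println(resumeSeqno("X", 110, log))  // Some(100): 110 > 100, roll back to 100
  }
}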

Page 24: Scaling with Couchbase, Kafka and Apache Spark

Demo

Page 25: Scaling with Couchbase, Kafka and Apache Spark

Shopper Tracking (click stream)

Lightweight analytics:
• Department shopped
• Tech platform
• Click tracks by income

Heavier analytics, develop profiles

Page 26: Scaling with Couchbase, Kafka and Apache Spark


And at scale…

Page 27: Scaling with Couchbase, Kafka and Apache Spark

N1QL Query and Apache Spark

Introduction & Integration

Page 28: Scaling with Couchbase, Kafka and Apache Spark

What do Application Developers Need?

NoSQL databases provide:
- Excellent performance and scalability
- Simplicity for working with data in aggregates
- A modern toolset for integration with mobile and web applications

Before N1QL, they tended to come with some unexpected baggage:
- Map-Reduce is great for some types of processing, but is not expressive enough for other kinds
- Some say "but no joins" or have odd ways of duplicating data to make it fit the query model

Page 29: Scaling with Couchbase, Kafka and Apache Spark

Key Concepts

Page 30: Scaling with Couchbase, Kafka and Apache Spark

What is N1QL?

N1QL is a new query language that systematically extends SQL to document-oriented data

N1QL extends SQL to handle data that is:

Nested: Contains nested objects, arrays

Heterogeneous: Schema-optional, non-uniform

Distributed: Partitioned across a cluster

Page 31: Scaling with Couchbase, Kafka and Apache Spark

Language Highlights

Page 32: Scaling with Couchbase, Kafka and Apache Spark

SELECT

Standard SELECT pipeline

SELECT, FROM, WHERE, GROUP BY, ORDER BY, LIMIT, OFFSET

JOINs (by keys)

Subqueries

Aggregation

Set operators

UNION, INTERSECT, EXCEPT

EXPLAIN

Page 33: Scaling with Couchbase, Kafka and Apache Spark

Metadata access

Couchbase has metadata about documents in addition to the document values themselves.

The META() function allows for this kind of access

select META(`travel-sample`) from `travel-sample` limit 1;

{
  "$1": {
    "cas": 1.60955901476865e+14,
    "flags": 0,
    "id": "landmark_25257",
    "type": "json"
  }
}

Page 34: Scaling with Couchbase, Kafka and Apache Spark

Metadata access

Couchbase has metadata about documents in addition to the document values themselves.

The META() function allows for this kind of access

select META(`travel-sample`).id from `travel-sample` limit 10;

{ "id": "landmark_26021" },
{ "id": "airport_1421" },
…

Page 35: Scaling with Couchbase, Kafka and Apache Spark

Indexing

CREATE / DROP INDEX

Two types of indexes

View indexes

GSI indexes (new secondary indexes)

Page 36: Scaling with Couchbase, Kafka and Apache Spark

Document-oriented Extensions

Nested paths: user.profile.email, children[0], children[0:2]

NEST, UNNEST

Ranging: FOR EVERY child IN children

Transformations: SELECT {"name": first_name || last_name}

Non-uniform: IS MISSING

Type checking and conversion

Distributed: direct lookup with USE KEYS

Efficient joins with ON KEYS

Page 37: Scaling with Couchbase, Kafka and Apache Spark

Apache Spark

Page 38: Scaling with Couchbase, Kafka and Apache Spark

What is Spark?

Apache Spark is a fast and general engine for large-scale data processing.

Page 39: Scaling with Couchbase, Kafka and Apache Spark

Spark Components

Spark Core: RDDs, Clustering, Execution, Fault Management

Page 40: Scaling with Couchbase, Kafka and Apache Spark

Spark Components

Spark SQL: Work with structured data, distributed SQL querying

Page 41: Scaling with Couchbase, Kafka and Apache Spark

Spark Components

Spark Streaming: Build fault-tolerant streaming applications

Page 42: Scaling with Couchbase, Kafka and Apache Spark

Spark Components

MLlib: machine learning built in

Page 43: Scaling with Couchbase, Kafka and Apache Spark

Spark Components

GraphX: Graph processing and graph-parallel computations

Page 44: Scaling with Couchbase, Kafka and Apache Spark

Daytona GraySort Performance

                          Hadoop MR Record       Spark Record
Data Size                 102.5 TB               100 TB
Elapsed Time              72 mins                23 mins
# Nodes                   2100                   206
# Cores                   50400 physical         6592 virtual
Cluster Disk Throughput   3150 GB/s              618 GB/s
Network                   Dedicated DC, 10Gbps   EC2, 10Gbps
Sort Rate                 1.42 TB/min            4.27 TB/min
Sort Rate/Node            0.67 GB/min            20.7 GB/min

Source: http://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html
Benchmark: http://sortbenchmark.org/

Page 45: Scaling with Couchbase, Kafka and Apache Spark

How does it work?

Resilient Distributed Datasets paper: https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

RDD Creation → Scheduling DAG → Task Execution
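A minimal, self-contained Scala sketch of those three stages (an assumed example, not from the deck): transformations only describe the RDD lineage; the DAG is scheduled and tasks actually execute when an action such as count() runs.

import org.apache.spark.{SparkConf, SparkContext}

object RddLifecycle {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-lifecycle").setMaster("local[*]"))

    val numbers = sc.parallelize(1 to 1000000)      // RDD creation
    val evens   = numbers.filter(_ % 2 == 0)        // lazy transformation: nothing runs yet
    val squared = evens.map(n => n.toLong * n)      // still just lineage

    println(squared.count())                        // action: DAG scheduled, tasks executed

    sc.stop()
  }
}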


Page 48: Scaling with Couchbase, Kafka and Apache Spark

Spark vs Hadoop

• Spark is RAM-bound while Hadoop is HDFS (disk) bound

• API easier to reason about & to develop against

• Fully compatible with Hadoop Input/Output formats

• Hadoop more mature, Spark ecosystem growing fast

Page 49: Scaling with Couchbase, Kafka and Apache Spark

Ecosystem Flexibility

[Diagram: data flowing between ecosystem components (RDBMS, streams, web APIs, batching, archived data, OLTP) and Couchbase interfaces (DCP, KV, N1QL, views)]

Page 50: Scaling with Couchbase, Kafka and Apache Spark

Infrastructure Consolidation

[Diagram: streams, web APIs, and user interaction consolidated onto a shared data infrastructure]

Page 51: Scaling with Couchbase, Kafka and Apache Spark


What does this look like?

Read a sequence of documents into an RDD and apply a few transformations…

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.spark._

sc.couchbaseGet[JsonDocument](Seq(
    "21st_amendment_brewery_cafe-21a_ipa",
    "aass_brewery-genuine_pilsner"))
  .map(doc => doc.content().getString("name"))
  .collect()
  .foreach(println)

Page 52: Scaling with Couchbase, Kafka and Apache Spark


Reading a Couchbase Secondary Index…

Query a Couchbase view, filter the results, and then map over them…

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.view.ViewQuery
import com.couchbase.spark._

// Read the view rows and load their full documents
val beers = sc.couchbaseView(ViewQuery.from("beer", "brewery_beers"))
  .map(_.id)
  .couchbaseGet[JsonDocument]()
  .filter(doc => doc.content().getString("type") == "beer")
  .cache()

// Calculate the mean ABV across all beers
println(beers
  .map(doc => doc.content().getDouble("abv").asInstanceOf[Double])
  .mean())

Page 53: Scaling with Couchbase, Kafka and Apache Spark

Couchbase Connector

Spark Core:
- Automatic cluster and resource management
- Creating and persisting RDDs (a write-back sketch follows this list)
- Java APIs in addition to Scala (planned before GA)

Spark SQL:
- Easy JSON handling and querying
- Tight N1QL integration (dp2)

Spark Streaming:
- Persisting DStreams
- DCP source (planned before GA)
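A minimal Scala sketch of "creating and persisting RDDs" through the connector, assuming the saveToCouchbase() helper for RDDs of Couchbase documents and the com.couchbase.* SparkConf bucket property (check the connector wiki linked on the next slide for the exact 1.0 API). The bucket name, document IDs, and the "touched" field are illustrative.

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.spark._
import org.apache.spark.{SparkConf, SparkContext}

object WriteBack {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("write-back")
      .setMaster("local[*]")
      .set("com.couchbase.bucket.travel-sample", "")   // assumed: bucket configured via SparkConf
    val sc = new SparkContext(conf)

    sc.parallelize(Seq("airline_10", "airline_10123"))
      .map(id => JsonDocument.create(id, JsonObject.create().put("touched", true)))
      .saveToCouchbase()                               // persist documents back to the bucket

    sc.stop()
  }
}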

Page 54: Scaling with Couchbase, Kafka and Apache Spark

Connector Facts

• Current version: 1.0.0-dp
• Beta in July
• GA planned for Q3

Code: https://github.com/couchbaselabs/couchbase-spark-connector

Docs until GA: https://github.com/couchbaselabs/couchbase-spark-connector/wiki

Page 55: Scaling with Couchbase, Kafka and Apache Spark

Demo #2

Page 56: Scaling with Couchbase, Kafka and Apache Spark

Questions

Page 57: Scaling with Couchbase, Kafka and Apache Spark

Couchbase Spark Samples: https://github.com/couchbaselabs/couchbase-spark-samples

Couchbase Spark Connector:
http://blog.couchbase.com/introducing-the-couchbase-spark-connector
https://github.com/couchbaselabs/couchbase-spark-connector

Couchbase Kafka Connector:
http://blog.couchbase.com/introducing-the-couchbase-kafka-connector
https://github.com/couchbase/couchbase-kafka-connector

Actions!

Page 58: Scaling with Couchbase, Kafka and Apache Spark

Matt Ingenthron @ingenthr

Thanks