scaling with couchbase, kafka and apache spark

Post on 05-Aug-2015

302 Views

Category:

Software

7 Downloads

Preview:

Click to see full reader

TRANSCRIPT

Scaling with Couchbase, Kafka and Apache Spark

Matt Ingenthron, Sr. Director SDK Engineering and Developer Advocacy

2

About Me

Matt IngenthronWorked on large site scalability problems at previous company…

memcached contributor

Joined Couchbase very early and helped define key parts of system @ingenthr

A Brief Aside: What is Couchbase

©2015 Couchbase Inc. 4

Couchbase provides a complete Data Management solution

4

High availability cache

Key-value store

Document database

Embedded database

Sync management

Multi-purpose capabilities support a broad range of apps and use cases

Enterprises often start with cache, then broaden usage to other apps and use cases

©2015 Couchbase Inc. 5

What makes Couchbase unique?

Performance & scalability

leaderSub millisecond latency with high throughput; memory-centric architecture

Multi-purpose

Simplified administrationEasy to deploy & manage; integrated Admin Console, single-click cluster expansion & rebalance

Cache, key value store, document database, and local/mobile database in single platform

Always-on availability

Data replication across nodes, clusters, and data centers

Enterprises choose Couchbase for several key advantages

24x365

©2015 Couchbase Inc. 6

Couchbase Server Architecture

Single-node type means easier administration and scaling

Single installation

Two major components/processes: Data manager cluster manager

Data manager: C/C++ Layer consolidation of caching and

persistence

Cluster manager: Erlang/OTP Administration UI’s Out-of-band for data requests

©2014 Couchbase Inc.

ACTIVE ACTIVE ACTIVE

REPLICA REPLICA REPLICA

Couchbase Server 1 Couchbase Server 2 Couchbase Server 3

Basic Operation

7

SHARD5

SHARD2

SHARD9

SHARD SHARD SHARD

SHARD4

SHARD7

SHARD8

SHARD SHARD SHARD

SHARD1

SHARD3

SHARD6

SHARD SHARD SHARD

SHARD4

SHARD1

SHARD8

SHARD SHARD SHARD

SHARD6

SHARD3

SHARD2

SHARD SHARD SHARD

SHARD7

SHARD9

SHARD5

SHARD SHARD SHARD

Application has single logical connection to cluster (client object)

• Data is automatically sharded resulting in even document data distribution across cluster

• Each vbucket replicated 1, 2 or 3 times (“peer-to-peer” replication)

• Docs are automatically hashed by the client to a shard’

• Cluster map provides location of which server a shard is on

• Every read/write/update/delete goes to same node for a given key

• Strongly consistent data access (“read your own writes”)

• A single Couchbase node can achieve 100k’s ops/sec so no need to scale reads

©2014 Couchbase Inc.

Add Nodes to Cluster

8

ACTIVE ACTIVE ACTIVE

REPLICA REPLICA REPLICA

Couchbase Server 1 Couchbase Server 2 Couchbase Server 3

ACTIVE ACTIVE

REPLICA REPLICA

Couchbase Server 4 Couchbase Server 5

SHARD5

SHARD2

SHARD SHARD

SHARD4

SHARD SHARD

SHARD1

SHARD3

SHARD SHARD

SHARD4

SHARD1

SHARD8

SHARD SHARD SHARD

SHARD6

SHARD3

SHARD2

SHARD SHARD SHARD

SHARD7

SHARD9

SHARD5

SHARD SHARD SHARD

SHARD7

SHARD

SHARD6

SHARD

SHARD8

SHARD9

SHARD

READ/WRITE/UPDATE

Application has single logical connection to cluster (client object)

Multiple nodes added or removed at once

One-click operation

Incremental movement of active and replica vbuckets and data

Client library updated via cluster map

Fully online operation, no downtime or loss of performance

Couchbase in Big Data

a.k.a. the main program

©2015 Couchbase Inc. 10

Lambda Architecture

1

4

5

DATA

SERVE

QUERY

New Data Stream

Analysis

All DataPrecompute

Views (Map Reduce)

Process Stream

Incremental Views

BatchRecompute

Real-TimeIncrement

Batch Layer

Serving Layer

Speed Layer

2 BATCH

3 SPEED

©2015 Couchbase Inc. 11

Lambda Architecture

New Data Stream

Merged View

All DataPrecompute

Views (Map Reduce)

Process Stream

Incremental Views

Partial Aggregat

e

Partial Aggregat

e

Partial Aggregat

e

Real-Time Data

BatchRecompute

Batch Views

Real-Time Views

Real-TimeIncrement

Merge

Batch Layer

Serving Layer

Speed Layer

Lambda + Couchbase

New Data Stream

Merged View

All DataPrecompute

Views (Map Reduce)

Process Stream

Incremental Views

Partial Aggregat

e

Partial Aggregat

e

Partial Aggregat

e

Real-Time Data

BatchRecompute

Batch Views

Real-Time Views

Real-TimeIncrement

Merge

Batch Layer

Serving Layer

Speed Layer

Lamba + Hadoop + Spark + Storm

New Data Stream

Merged View

All DataPrecompute

Views (Map Reduce)

Process Stream

Incremental Views

Partial Aggregat

e

Partial Aggregat

e

Partial Aggregat

e

Real-Time Data

BatchRecompute

Batch Views

Real-Time Views

Real-TimeIncrement

Merge

Batch Layer

Serving Layer

Speed Layer

Couchbase HadoopConnector (Sqoop)

COMPLEX EVENT PROCESSING

Real TimeREPOSITORY

PERPETUALSTORE

ANALYTICALDB

BUSINESSINTELLIGENCE

MONITORING

CHAT/VOICESYSTEM

BATCHTRACK

REAL-TIMETRACK

DASHBOARD

TRACKING and COLLECTION

ANALYSIS ANDVISUALIZATION

REST FILTER METRICS

Integration at Scale

17

Requirements for data streaming in modern systems…

• Must support high throughput and low latency• Need to handle failures

• Pick up where you left off• Be efficient about resource usage

Data Sync is the Heart of Any Big Data System

Fundamental piece of the architecture- Data Sync maintains Data Redundancy for High

Availability (HA) & Disaster Recovery (DR)- Protect against failures – node, rack, region etc.

- Data Sync maintains Indexes- Indexing is key to building faster access paths to query data- Spatial, Full-text

DCP and Couchbase Server Architecture

Database Change Protocol: DCP

20

OrderingTo build interesting features a streaming protocol needs to have a concept of when operations happened.

Couchbase operation ordering at the node level:

Each mutation is assigned a sequence numberSequence numbers increase monotonically Sequence numbers are assigned on a per VBucket basis

Restart-abilityNeed to handle failures with grace, in particular being efficient about the amount of data being moved around.

Consistency pointsPoints in time for incremental backup, query consistency.

PerformanceRecognize that durability on a distributed system may have different definitions.

Design Goals

21

Version History (Failover Example)

VB UUID Seqno

X 0

VB UUID Seqno

DCP Replication Stream

X 0

Last Seqno: 100 Last Seqno: 0Last Seqno: 100

Active ReplicaActive

100Y

Last Seqno: 0

22

Restartability Example

VB UUID Seqno

X 0

VB UUID Seqno

DCP Replication Stream

X 0

Last Seqno: 80 Last Seqno: 150

Down Active

100Y

Replica

0 <= 80 <= 100

Y 100

23

Restartability Example (With Rollback)

VB UUID Seqno

X 0

VB UUID Seqno

DCP Replication Stream

X 0

Last Seqno: 110 Last Seqno: 150

Down Active

100Y

Replica

0 <= 110 <= 100

Last Seqno: 100

0 <= 100 <= 100

Y 100

Demo

25

Shopper Tracking(click stream)

Lightweight Analytics:• Department shopped• Tech platform• Click tracks by Income

Heavier Analytics, Develop Profiles

26

And at scale…

N1QL Queryand Apache Spark

Introduction & Integration

What do Application Developers Need

NoSQL databases provide Excellent performance and scalability Simplicity for working with data in aggregates A modern toolset for integration with mobile and web applications

Before N1QL, they tend to come with some unexpected baggage Map-Reduce for processing is great for some types of processing,

but is not expressive enough for other kinds Some say “but no joins” or have odd ways of duplicating data to

make it fit the query model

Key Concepts

What is N1QL?

N1QL is a new query language that systematically extends SQL to

document-oriented data

N1QL extends SQL to handle data that is:

Nested: Contains nested objects, arrays

Heterogeneous: Schema-optional, non-uniform

Distributed: Partitioned across a cluster

Language Highlights

SELECT

Standard SELECT pipeline

SELECT, FROM, WHERE, GROUP BY, ORDER BY, LIMIT, OFFSET

JOINs (by keys)

Subqueries

Aggregation

Set operators

UNION, INTERSECT, EXCEPT

EXPLAIN

Metadata access

Couchbase has metadata about documents in addition to the document values itself.

The META() function allows for this kind of access

select META(`travel-sample`) from `travel-sample` limit 1; { "$1": { "cas": 1.60955901476865e+14, "flags": 0, "id": "landmark_25257", "type": "json" } }

Metadata access

Couchbase has metadata about documents in addition to the document values itself.

The META() function allows for this kind of access

select META(`travel-sample`).id from `travel-sample` limit 10; { "id": "landmark_26021" }, { "id": "airport_1421" },…

Indexing

CREATE / DROP INDEX

Two types of indexes

View indexes

GSI indexes (new secondary indexes)

Document-oriented Extensions

Nested Paths—user.profile.email, children[0], children[0:2]

NEST, UNNEST

Ranging—FOR EVERY child IN children

Transformations—SELECT {“name”: first_name || last_name}

Non-uniform IS MISSING

Type checking and conversion

Distributed Direct lookup—USE KEYS

Efficient joins—ON KEYS

Apache Spark

What is Spark?

Apache is a fast and general engine for large-scale data processing.

Spark Components

Spark Core: RDDs, Clustering, Execution, Fault Management

Spark Components

Spark SQL: Work with structured data, distributed SQL querying

Spark Components

Spark Streaming: Build fault-tolerant streaming applications

Spark Components

Mlib: Machine Learning built in

Spark Components

GraphX: Graph processing and graph-parallel computations

Daytona GraySort Performance

Hadoop MR Record Spark Record

Data Size 102.5 TB 100 TB

Elapsed Time 72 mins 23 mins

# Nodes 2100 206

# Cores 50400 physical 6592 virtual

Cluster Disk Throughput 3150 GB/s 618 GB/s

Network Dedicated DC, 10Gbps EC2, 10Gbps

Sort Rate 1.42 TB/min 4.27 TB/min

Sort Rate/Node 0.67 GB/min 20.7 GB/min

Source: http://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.htmlBenchmark: http://sortbenchmark.org/

How does it work?

Resilient Distributed Datatypes paper: https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

RDDCreation

SchedulingDAG

TaskExecution

How does it work?

Resilient Distributed Datatypes paper: https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

RDDCreation

SchedulingDAG

TaskExecution

How does it work?

Resilient Distributed Datatypes paper: https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

RDDCreation

SchedulingDAG

TaskExecution

Spark vs Hadoop

• Spark is RAM while Hadoop is HDFS (disk) bound

• API easier to reason about & to develop against

• Fully compatible with Hadoop Input/Output formats

• Hadoop more mature, Spark ecosystem growing fast

Ecosystem Flexibility

RDBMS

StreamsWeb APIs

DCPKVN1QLViews

BatchingArchived Data

OLTP

Infrastructure Consolidation

StreamsWeb APIs

UserInteraction

51

What does this look like?

Read a sequence of RDDs in, and apply a few transformations…

sc.couchbaseGet[JsonDocument(Seq( "21st_amendment_brewery_cafe-21a_ipa", "aass_brewery-genuine_pilsner")) .map(doc => doc.content().getString("name")) .collect() .foreach(println)

52

Reading a Couchbase Secondary Index…

Query a Couchbase View, subfilter, and then map over them…

// Read the first 10 rows and load their full documentsval beers = sc.couchbaseView(ViewQuery.from("beer", "brewery_beers")) .map(_.id) .couchbaseGet[JsonDocument]() .filter(doc => doc.content().getString("type") == "beer") .cache() // Calculate the mean for all beersprintln(beers .map(doc => doc.content().getDouble("abv").asInstanceOf[Double]) .mean())

Couchbase Connector

Spark Core Automatic Cluster and Resource Management Creating and Persisting RDDs Java APIs in addition to Scala (planned before GA)

Spark SQL Easy JSON handling and querying Tight N1QL Integration (dp2)

Spark Streaming Persisting DStreams DCP source (planned before GA)

Connector Facts

• Current Version: 1.0.0-dp• Beta in July• GA planned for Q3

Code:https://github.com/couchbaselabs/couchbase-spark-connector

Docs until GA: https://github.com/couchbaselabs/couchbase-spark-connector/wiki

Demo #2

Questions

Couchbase Spark Samples:https://github.com/couchbaselabs/couchbase-spark-samples

Couchbase Spark Connector:http://blog.couchbase.com/introducing-the-couchbase-spark-connectorhttps://github.com/couchbaselabs/couchbase-spark-connector

Couchbase Kafka Connector:http://blog.couchbase.com/introducing-the-couchbase-kafka-connectorhttps://github.com/couchbase/couchbase-kafka-connector

Actions!

Matt Ingenthron @ingenthr

Thanks

top related