Real Time Streams at Scale with Kafka: Couchbase Connect 2015


TRANSCRIPT

REAL TIME STREAMS AT SCALE WITH KAFKA
Ewen Cheslack-Postava, Engineer @ Confluent
David Maier, Principal Solutions Engineer


Agenda

- Welcome!
- Introduction into Kafka
- Couchbase-Specific Use Cases
- Behind the Scenes
- An Example Producer and Consumer
- Demo


Welcome!

Ewen Cheslack-Postava, Engineer

Introduction into Kafka
The Confluent Platform

Confluent

Mission: Make this a practical reality everywhere

Product:

- Stream processing integration for Kafka
- Connectors for streaming data flow for common systems
- Monitor end-to-end data flow
- Schemas and metadata management
- First release: Confluent Platform 1.0 (http://confluent.io)

Couchbase-Specific Use Cases


Couchbase & Kafka Use Cases

- Couchbase as the Master Database: follow the changes that are happening in the Bucket by updating data somewhere else
- Triggers/Event Handling: handle events like deletions/expirations externally (e.g. expiration and replicated session tokens)
- Real-Time Data Integration: extract from Couchbase, transform, and load data in real time
- Real-Time Data Processing: extract from a Bucket, process in real time, and load into another Bucket
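All four patterns reduce to reacting to DCP events by type. A minimal dispatch sketch, assuming the MutationMessage and RemoveMessage classes of the core library's DCP message hierarchy (EventDispatcher itself is hypothetical):

import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.core.message.dcp.RemoveMessage;

// Hypothetical dispatcher: reacts to DCP events depending on their type.
public class EventDispatcher {

    public void dispatch(DCPRequest dcp) {
        if (dcp instanceof RemoveMessage) {
            // Deletion or expiration: e.g. invalidate a replicated session token externally
            System.out.println("Removed key: " + ((RemoveMessage) dcp).key());
        } else if (dcp instanceof MutationMessage) {
            // Creation or update: e.g. propagate the document to another system
            System.out.println("Mutated key: " + ((MutationMessage) dcp).key());
        }
    }
}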

Behind the Scenes of the Couchbase Connector


DCP

- Database Change Protocol
- Since Couchbase Server 3.x, the internal de facto standard to handle changes within a Bucket
- Clients: Intra-Cluster Replication, Indexing, XDCR
- Mutation: an event which is raised in case of a creation, update, or delete
- Each mutation that occurs in a vBucket has a sequence number
- The core of the 2.x Java SDK can consume DCP streams
- Important: not yet officially exposed, but used to implement the Connectors that are provided by Couchbase
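The Receiver example below hands every DCP message to an IHandler callback. The slides reference IHandler and ReceiverEnv without showing them; a minimal IHandler might look like this (an assumption, not an official API), and ReceiverEnv is presumably a core environment with DCP enabled:

import com.couchbase.client.core.message.dcp.DCPRequest;

// Hypothetical callback interface used by the Receiver below.
public interface IHandler {
    void handle(DCPRequest request);
}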


A DCP Receiver Example (without Kafka)

import java.util.concurrent.TimeUnit;

import com.couchbase.client.core.CouchbaseCore;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.message.cluster.*; // SeedNodesRequest, OpenBucketRequest, GetClusterConfig*
import com.couchbase.client.core.message.dcp.*;     // DCPRequest, OpenConnectionRequest, StreamRequest*
import rx.Observable;

public class Receiver {

    // Constants and fields implied by the code on the slides; the values of
    // CONNECT_TIMEOUT and STREAM_NAME are assumptions.
    private static final int CONNECT_TIMEOUT = 10;        // seconds
    private static final String STREAM_NAME = "receiver"; // DCP connection name

    private final ReceiverEnv env;
    private final CouchbaseCore core;
    private final String[] nodes;
    private final String bucket;
    private final String password;
    private final IHandler handler;
    private boolean connected = false;

    /**
     * Initialize the receiver by passing an environment.
     *
     * @param nodes    the Couchbase bootstrap nodes
     * @param bucket   the name of the Bucket to stream from
     * @param password the Bucket password
     * @param handler  the callback which handles each DCP message
     * @param env      the environment (DCP must be enabled)
     */
    public Receiver(String[] nodes, String bucket, String password, IHandler handler, ReceiverEnv env) {
        this.env = env;
        this.core = new CouchbaseCore(env);
        this.nodes = nodes;
        this.bucket = bucket;
        this.password = password;
        this.handler = handler;
    }


    /**
     * Connect to the Couchbase Cluster.
     */
    public void connect() {
        // This sets up the bootstrap nodes
        core.send(new SeedNodesRequest(nodes))
            .timeout(CONNECT_TIMEOUT, TimeUnit.SECONDS)
            .toBlocking()
            .single();

        // Now open a bucket connection
        core.send(new OpenBucketRequest(bucket, password))
            .timeout(CONNECT_TIMEOUT, TimeUnit.SECONDS)
            .toBlocking()
            .single();

        connected = true;
    }


    /**
     * Retrieve the number of vBuckets of the Bucket.
     *
     * @return an Observable which emits the vBucket count
     */
    private Observable<Integer> numOfVBuckets() {
        return core.<GetClusterConfigResponse>send(new GetClusterConfigRequest())
            .map(cfg -> {
                CouchbaseBucketConfig bucketCfg = (CouchbaseBucketConfig) cfg.config().bucketConfig(bucket);
                return bucketCfg.numberOfPartitions();
            });
    }


    /**
     * Request the streams for all vBuckets.
     *
     * @return an Observable which merges all per-vBucket DCP streams into one
     */
    private Observable<DCPRequest> requestStreams(int numOfVBuckets) {
        return Observable.merge(                   // Merge the streams into one stream
            Observable.range(0, numOfVBuckets)     // For each vBucket
                .flatMap(vBucket -> core.<StreamRequestResponse>send(
                        new StreamRequestRequest(vBucket.shortValue(), bucket))) // Request a stream
                .map(response -> response.stream()) // Return the stream as an Observable of DCPRequest
        );
    }


    /**
     * Open the DCP streams and handle them by using the passed handler.
     */
    public void stream() {
        core.send(new OpenConnectionRequest(STREAM_NAME, bucket))
            .toList()
            .flatMap(resp -> numOfVBuckets())        // Send a cluster config request and map the result to the number of vBuckets
            .flatMap(count -> requestStreams(count)) // Stream by taking the number of vBuckets into account
            .toBlocking()
            .forEach(dcp -> handler.handle(dcp));    // Now handle every result of the stream here
    }
}
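Wiring the Receiver together might then look like the following sketch; the node address, Bucket name, and credentials are placeholders, and the lambda relies on the hypothetical IHandler interface above:

// Hypothetical usage of the Receiver; all connection details are placeholders.
public class ReceiverMain {

    public static void main(String[] args) {
        ReceiverEnv env = new ReceiverEnv(); // assumed: a core environment with DCP enabled
        Receiver receiver = new Receiver(
                new String[]{"127.0.0.1"},   // bootstrap node(s)
                "default",                   // Bucket name
                "",                          // Bucket password
                dcp -> System.out.println("DCP event: " + dcp), // log every DCP message
                env);

        receiver.connect(); // bootstrap and open the Bucket
        receiver.stream();  // blocks, dispatching each DCP message to the handler
    }
}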

An Example Producer and Consumer: Connecting Couchbase via Kafka to an Application


A Kafka Producer Example

public class CouchbaseProducer implements IProducer {

    ...

    /**
     * The constructor which takes all connection details as arguments.
     */
    public CouchbaseProducer(String couchbaseNode, String bucket, String password, String zookeeper, String topic) {
        DefaultCouchbaseKafkaEnvironment.Builder envBuilder = (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment
                .builder()
                .kafkaFilterClass(MutationMessageFilter.class.getName())
                .kafkaValueSerializerClass(MutationMessageEncoder.class.getName())
                .dcpEnabled(true);

        // The environment is optional.
        // By default com.couchbase.kafka.coder.JsonEncoder is used for the value serialization.
        innerConnector = CouchbaseKafkaConnector.create(envBuilder.build(), couchbaseNode, bucket, password, zookeeper, topic);
    }

    @Override
    public void produce() {
        innerConnector.run();
    }

    ...
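Running the producer could then be as simple as the following sketch; the addresses, Bucket, and topic name are placeholders rather than values from the demo:

// Hypothetical wiring: stream DCP events from the "default" Bucket into Kafka.
public class ProducerMain {

    public static void main(String[] args) {
        IProducer producer = new CouchbaseProducer(
                "127.0.0.1",      // Couchbase node
                "default",        // Bucket name
                "",               // Bucket password
                "127.0.0.1:2181", // ZooKeeper connect string used by the connector
                "couchbase");     // target Kafka topic

        producer.produce(); // runs the connector and blocks while streaming
    }
}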


A Kafka Consumer Example

public class LogConsumer extends BaseConsumer {

    /**
     * Consume single-threaded by using the derived executor.
     */
    @Override
    public void consume() {
        List<KafkaStream<byte[], byte[]>> streamList = this.streams.get(this.topic);

        for (final KafkaStream<byte[], byte[]> s : streamList) {
            ConsumerIterator<byte[], byte[]> it = s.iterator();

            while (it.hasNext()) {
                System.out.println("msg = " + new String(it.next().message()));
            }
        }

        shutdown();
    }

    ...
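LogConsumer relies on a BaseConsumer that is not shown on the slides. With the Kafka 0.8-era high-level consumer API it could be set up roughly like this (a sketch under that assumption; the property values are placeholders):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

// Hypothetical base class: creates the high-level consumer streams
// which LogConsumer.consume() iterates over.
public abstract class BaseConsumer {

    protected final String topic;
    protected final ConsumerConnector connector;
    protected final Map<String, List<KafkaStream<byte[], byte[]>>> streams;

    protected BaseConsumer(String zookeeper, String groupId, String topic) {
        this.topic = topic;

        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);

        this.connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // One stream per topic is enough for the single-threaded LogConsumer
        this.streams = connector.createMessageStreams(Collections.singletonMap(topic, 1));
    }

    public abstract void consume();

    protected void shutdown() {
        connector.shutdown();
    }
}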

Demo


- Create a Kafka Topic
- Use a Couchbase Producer in order to consume the DCP stream
- Filter on Mutation Messages
- Encode Messages
- Consume by logging to Stdout
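The demo steps could be wired together roughly as follows; the topic is assumed to exist already (created with the Kafka topic tool), and the LogConsumer constructor is assumed to match the BaseConsumer sketch above:

// Hypothetical end-to-end demo: the producer pushes filtered and encoded DCP
// mutations into Kafka, and LogConsumer prints each message to stdout.
public class DemoMain {

    public static void main(String[] args) {
        IProducer producer = new CouchbaseProducer(
                "127.0.0.1", "default", "", "127.0.0.1:2181", "couchbase");
        new Thread(producer::produce).start(); // stream DCP events in the background

        LogConsumer consumer = new LogConsumer("127.0.0.1:2181", "demo-group", "couchbase");
        consumer.consume(); // logs each message to stdout, then shuts down
    }
}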

Thank you! Q&A