Real Time Streams at Scale with Kafka: Couchbase Connect 2015
TRANSCRIPT
REAL TIME STREAMS AT SCALE WITH KAFKA
Ewen Cheslack-Postava, Engineer @ Confluent
David Maier, Principal Solutions Engineer
©2015 Couchbase Inc. 2
Agenda
Welcome!
Introduction to Kafka
Couchbase-specific Use Cases
Behind the Scenes
An Example Producer and Consumer
Demo
Confluent
Mission: Make this a practical reality everywhere
Product:
Stream processing integration for Kafka
Connectors for streaming data flow for common systems
Monitor end-to-end data flow
Schemas and metadata management
First release: Confluent Platform 1.0 – http://confluent.io
Couchbase & Kafka Use Cases
Couchbase as the Master Database: Follow the changes that are happening in the Bucket by updating data somewhere else
Triggers/Event Handling: Handle events like deletions/expirations externally (e.g. expiration & replicated session tokens)
Real-time Data Integration: Extract from Couchbase, transform, and load data in real time
Real-time Data Processing: Extract from a Bucket, process in real time, and load into another Bucket
DCP
Database Change Protocol
Since Couchbase Server 3.x the internal de-facto standard for handling changes within a Bucket
Clients: Intra-Cluster Replication, Indexing, XDCR
Mutation: Event which is raised in case of a creation, update, or deletion
Each mutation that occurs in a vBucket has a sequence number
The core of the 2.x Java SDK can consume DCP streams
Important: Not yet officially exposed, but used to implement Connectors that are provided by Couchbase
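Every document key is deterministically assigned to one vBucket, which is why per-vBucket sequence numbers give a total order per partition. As a self-contained illustration (the class and method names below are ours, not part of the SDK), Couchbase's key-to-vBucket mapping is roughly a CRC32 hash of the key reduced to the number of vBuckets:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class VBucketMapper {

    // Map a document key to its vBucket id, mirroring Couchbase's
    // CRC32-based partitioning scheme (1024 vBuckets by default).
    public static int vBucketOf(String key, int numVBuckets) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        // Take the upper half of the 32-bit digest, then reduce it
        // modulo the number of vBuckets.
        return (int) ((crc.getValue() >> 16) & 0x7fff) % numVBuckets;
    }

    public static void main(String[] args) {
        System.out.println("user::42 -> vBucket " + vBucketOf("user::42", 1024));
    }
}
```

Because the mapping depends only on the key, the same document always produces mutations in the same vBucket stream.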
A DCP Receiver Example (without Kafka)…
public class Receiver {
…
/**
 * Initialize the receiver by passing an environment
 *
 * @param nodes the seed nodes of the cluster
 * @param bucket the name of the Bucket
 * @param password the Bucket password
 * @param handler the handler for incoming DCP messages
 * @param env the receiver environment
 */
public Receiver(String[] nodes, String bucket, String password, IHandler handler, ReceiverEnv env) {
this.env = env;
this.core = new CouchbaseCore(env);
this.nodes = nodes;
this.bucket = bucket;
this.password = password;
this.handler = handler;
}
A DCP Receiver Example (without Kafka)
/**
 * Connect to the Couchbase Cluster
 */
public void connect()
{
//This sets up the bootstrap nodes
core.send(new SeedNodesRequest(nodes))
.timeout(CONNECT_TIMEOUT, TimeUnit.SECONDS)
.toBlocking()
.single();
//Now open a bucket connection
core.send(new OpenBucketRequest(bucket, password))
.timeout(CONNECT_TIMEOUT, TimeUnit.SECONDS)
.toBlocking()
.single();
connected = true;
}
A DCP Receiver Example (without Kafka)
/**
 * Retrieve the number of vBuckets of the Bucket
 * @return the number of vBuckets as an Observable
 */
private Observable<Integer> numOfVBuckets()
{
return core.<GetClusterConfigResponse>send(new GetClusterConfigRequest())
.map(cfg -> {
CouchbaseBucketConfig bucketCfg = (CouchbaseBucketConfig) cfg.config().bucketConfig(bucket);
return bucketCfg.numberOfPartitions();
} );
}
A DCP Receiver Example (without Kafka)
/**
 * Request the streams for all vBuckets
 * @return the merged stream of DCP requests
 */
private Observable<DCPRequest> requestStreams(int numOfVBuckets)
{
return Observable.merge( //Merge the streams to one stream
Observable.range(0, numOfVBuckets) //For each vBucket
.flatMap(vBucket -> core.<StreamRequestResponse>send(new StreamRequestRequest(vBucket.shortValue(), bucket))) //Request a stream
.map(response -> response.stream()) //Return the stream as Observable of DCPRequest
);
}
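The RxJava pipeline above — range over all vBucket ids, flatMap each id to its per-partition stream, merge everything into one stream — follows a pattern that can be illustrated with plain java.util.stream. The event strings and the helper below are simplified stand-ins, not the SDK's types:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class MergeSketch {

    // Stand-in for a per-vBucket stream of change events.
    static Stream<String> streamFor(int vBucket) {
        return Stream.of(vBucket + ":create", vBucket + ":update");
    }

    // One event stream per vBucket, flattened into a single merged stream,
    // analogous to Observable.merge over Observable.range(0, numOfVBuckets).
    public static List<String> mergedEvents(int numOfVBuckets) {
        return IntStream.range(0, numOfVBuckets)
                .boxed()
                .flatMap(MergeSketch::streamFor)
                .collect(Collectors.toList());
    }
}
```

The important difference is that RxJava streams are push-based and unbounded, so the merged DCP stream keeps emitting as long as mutations arrive.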
A DCP Receiver Example (without Kafka)
/**
 * Open the DCP streams and handle them by using the passed handler
 */
public void stream()
{
core.send(new OpenConnectionRequest(STREAM_NAME, bucket))
.toList()
.flatMap(resp -> numOfVBuckets()) //Send a cluster config request and map the result to the number of vBuckets
.flatMap(count -> requestStreams(count)) //Stream by taking the number of vBuckets into account
.toBlocking()
.forEach(dcp -> handler.handle(dcp)); //Now handle every result of the stream here
}
…
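The Receiver hands every DCP message to the IHandler passed into its constructor, but the handler itself is not shown on the slides. A minimal self-contained sketch of what it might look like — the ChangeEvent class below is an illustrative stand-in; a real handler would receive the SDK's DCPRequest subtypes for mutations and removals:

```java
import java.util.ArrayList;
import java.util.List;

public class HandlerSketch {

    // Simplified stand-in for the SDK's DCP message hierarchy.
    static class ChangeEvent {
        final String key;
        final boolean deletion;

        ChangeEvent(String key, boolean deletion) {
            this.key = key;
            this.deletion = deletion;
        }
    }

    // Mirrors the IHandler callback invoked from Receiver.stream().
    interface IHandler {
        void handle(ChangeEvent event);
    }

    // A handler that collects the keys of mutations, skipping deletions.
    static class CollectingHandler implements IHandler {
        final List<String> mutatedKeys = new ArrayList<>();

        @Override
        public void handle(ChangeEvent event) {
            if (!event.deletion) {
                mutatedKeys.add(event.key);
            }
        }
    }
}
```

In the Kafka producer shown next, this filtering role is played by the configured filter class instead of a hand-written handler.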
A Kafka Producer Example
public class CouchbaseProducer implements IProducer {
...
/**
* The constructor which takes all connection details as arguments
*/
public CouchbaseProducer(String couchbaseNode, String bucket, String password, String zookeeper, String topic) {
DefaultCouchbaseKafkaEnvironment.Builder envBuilder = (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment
.builder()
.kafkaFilterClass(MutationMessageFilter.class.getName())
.kafkaValueSerializerClass(MutationMessageEncoder.class.getName())
.dcpEnabled(true);
//-- The environment is optional
//By default com.couchbase.kafka.coder.JsonEncoder is used for the value serialization
innerConnector = CouchbaseKafkaConnector.create(envBuilder.build(), couchbaseNode, bucket, password, zookeeper, topic);
}
}
@Override
public void produce() {
innerConnector.run();
}
...
A Kafka Consumer Example
public class LogConsumer extends BaseConsumer {
…
/**
* Consume single threaded by using the derived executor
*/
@Override
public void consume()
{
List<KafkaStream<byte[], byte[]>> streamList = this.streams.get(this.topic);
for (final KafkaStream<byte[], byte[]> s : streamList) {
ConsumerIterator<byte[], byte[]> it = s.iterator();
while (it.hasNext()) {
System.out.println("msg = " + new String(it.next().message()));
}
}
shutdown();
}
...
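BaseConsumer is not shown on the slides, but the KafkaStream/ConsumerIterator API it exposes belongs to Kafka's 0.8-era high-level consumer, which bootstraps from ZooKeeper rather than from a broker list. A plausible configuration sketch (the class name is ours and the property values are examples):

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Build the configuration the 0.8 high-level consumer expects;
    // it discovers brokers via ZooKeeper and tracks offsets per group.
    public static Properties consumerProps(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);   // e.g. "localhost:2181"
        props.put("group.id", groupId);              // consumer group for offset tracking
        props.put("auto.offset.reset", "smallest");  // start from the earliest available offset
        return props;
    }
}
```

The group id matters here: consumers sharing a group split the topic's partitions among themselves, which is how the consumer side scales out.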
Demo
Create a Kafka Topic
Use a Couchbase Producer in order to consume the DCP stream
Filter on Mutation Messages
Encode Messages
Consume by logging to Stdout