Kafka Connect
TRANSCRIPT
Agenda
- Me and DataMountaineer
- Data Integration
- Kafka Connect
- Demo
Andrew Stevenson
Lead Mountain Goat at DataMountaineer
Solution Architect
- Fast Data
- Big Data, as long as it’s Fast
Contributed to
- Kafka Connect, Sqoop, Kite SDK
Kafka Connect Connectors
- 20+ connectors
DataStax Certified Cassandra Sink
Professional Services
- Implementations, architecture reviews
DevOps & Tooling
Connector Support
Partners
Data Integration
Loading and unloading data should be easy
But it takes too long (certainly on Hadoop)
Why? Enterprise pipelines must consider:
- Delivery semantics
- Offset management
- Serialization / de-serialization
- Partitioning / scalability
- Fault tolerance / failover
- Data model integration
- CI/CD
- Metrics / monitoring
Which results in?
Multiple technologies:
- Bash wrappers on Sqoop
- Oozie XML
- Custom Java/Scala/C#
- Third party
Multiple teams hand-roll similar solutions
Lack of separation of concerns:
- Extract/loading ends up domain specific
What we really care about…
DOMAIN SPECIFIC TRANSFORMATIONS
Focus on adding value
Kafka Connect?
✓ Delivery semantics
✓ Offset management
✓ Serialization / de-serialization
✓ Partitioning / scalability
✓ Fault tolerance / fail-over
✓ Data model integration
✓ Metrics
Out of the Box – ONE FRAMEWORK
Lets you focus on domain logic
Kafka Connect
“a common framework facilitating data streams between Kafka and other systems”
Ease of use
deploy flows via configuration files with no code necessary
Out of the box & Community
Configurations are key-value mappings:
- name: the connector’s unique name
- connector.class: the connector’s class
- tasks.max: maximum number of tasks to create
- topics: list of topics (for sinks)
Config Example

name = kudu-sink
connector.class = KuduSinkConnector
tasks.max = 1
topics = kudu_test
connect.kudu.master = quickstart
connect.kudu.sink.kcql = INSERT INTO KuduTable SELECT * FROM kudu_test
KCQL is a SQL-like syntax allowing streamlined configuration of Kafka sink connectors, and then some more.
Example:
Project fields, rename or ignore them and further customise in plain text
INSERT INTO transactions SELECT field1 AS column1, field2 AS column2 FROM TransactionTopic;
INSERT INTO audits SELECT * FROM AuditsTopic;
INSERT INTO logs SELECT * FROM LogsTopic AUTOEVOLVE;
INSERT INTO invoices SELECT * FROM InvoiceTopic PK invoiceID;
KCQL

{ "sensor_id": "01", "temperature": 52.7943, "ts": 1484648810 }
{ "sensor_id": "02", "temperature": 28.8597, "ts": 1484648810 }

INSERT INTO sensor_ringbuffer
SELECT sensor_id, temperature, ts
FROM coap_sensor_topic
WITHFORMAT JSON
STOREAS RING_BUFFER

INSERT INTO sensor_reliabletopic
SELECT sensor_id, temperature, ts
FROM coap_sensor_topic
WITHFORMAT AVRO
STOREAS RELIABLE_TOPIC
Deeper into Connect
- Modes
- Workers
- Connectors
- Tasks
- Converters
Modes
Standalone
- single node, for testing, one-off imports/exports
Distributed
- 1 or more workers on 1 or more servers
- form a cluster/consumer group
Interaction
Standalone
- properties file at start-up
Distributed
- REST API / ./cli (see the examples below)
- create: ./cli create conn < conn.conf
- start: ./cli start conn < conn.conf
- stop: ./cli stop conn
- restart: ./cli restart conn
- pause: ./cli pause conn
- remove: ./cli rm conn
- status: ./cli ps
- plugins: ./cli plugins
- validate: ./cli validate < conn.conf
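For reference, here is what those two interaction styles can look like on the command line. A minimal sketch: the script paths, the default port 8083 and the file names assume a stock Kafka/Confluent install and the Kudu example config from earlier.

# Standalone: worker and connector properties files are given at start-up
bin/connect-standalone.sh worker.properties kudu-sink.properties

# Distributed: POST the connector config as JSON to any worker's REST API
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "kudu-sink",
  "config": {
    "connector.class": "KuduSinkConnector",
    "tasks.max": "1",
    "topics": "kudu_test",
    "connect.kudu.master": "quickstart",
    "connect.kudu.sink.kcql": "INSERT INTO KuduTable SELECT * FROM kudu_test"
  }
}'

# Check the connector and its tasks
curl http://localhost:8083/connectors/kudu-sink/status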
Workers
[Diagram: three workers, each configured with group.id = cluster1, sit on top of Kafka and together form one cluster (a Kafka consumer group)]
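As a sketch, the properties file of one such distributed worker might look like this (hostnames and topic names are illustrative):

bootstrap.servers = broker1:9092
# workers that share a group.id form one Connect cluster
group.id = cluster1
key.converter = org.apache.kafka.connect.json.JsonConverter
value.converter = org.apache.kafka.connect.json.JsonConverter
# connector configs, source offsets and task status all live in Kafka topics
config.storage.topic = connect-configs
offset.storage.topic = connect-offsets
status.storage.topic = connect-status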
Connectors
Define the how:
- which plugin to use, must be on the CLASSPATH
- break up the work into tasks
- multiple per cluster, each with a unique name
- fault tolerant
Happy flow
[Diagram: a connector config C1 is PUT to the REST API of one worker; it is written to the config topic, and the coordinator distributes tasks C1 T1, C1 T2 and C1 T3 across workers W1, W2 and W3]
Unhappy flow
[Diagram: worker W1 fails; the RebalanceListener triggers a rebalance and connector C1’s tasks C1 T1, C1 T2 and C1 T3 are redistributed across the remaining workers W2 and W3]
Connector API

class CoherenceSinkConnector extends SinkConnector {
  override def taskClass(): Class[_ <: Task]
  override def start(props: util.Map[String, String]): Unit
  override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]]
  override def stop(): Unit
}
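As an illustration of that API, here is a minimal, hypothetical sink connector (ExampleSinkConnector and ExampleSinkTask are made-up names, not from the talk; the task is sketched after the Task API below). The interesting method is taskConfigs: whatever list of maps it returns determines how many tasks are started and what configuration each one sees.

import java.util
import scala.collection.JavaConverters._
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.sink.SinkConnector

class ExampleSinkConnector extends SinkConnector {
  private var configProps: util.Map[String, String] = _

  // remember the connector-level config handed over by the worker
  override def start(props: util.Map[String, String]): Unit =
    configProps = props

  override def taskClass(): Class[_ <: Task] = classOf[ExampleSinkTask]

  // one map per task; here every task gets a full copy of the config,
  // and the framework's consumer group spreads the partitions over them
  override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] =
    List.fill(maxTasks)(new util.HashMap[String, String](configProps): util.Map[String, String]).asJava

  override def stop(): Unit = ()
  override def config(): ConfigDef = new ConfigDef()
  override def version(): String = "0.1.0"
}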
Tasks
Perform the actual work:
- loading / unloading
- single threaded
Used to scale:
- more tasks, more parallelism
- managed as a Kafka consumer group
- if (tasks > partitions) => idle tasks
Contain no state, it’s in Kafka:
- started
- stopped
- restarted
- paused
Task API

class CoherenceSinkTask extends SinkTask {
  override def start(props: util.Map[String, String]): Unit
  override def stop(): Unit
  override def flush(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit
  override def put(records: util.Collection[SinkRecord]): Unit
}
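And the matching hypothetical task: put receives batches of SinkRecords to write to the target system, and flush is the hook to make sure everything is persisted before Connect commits the offsets. A sketch only; the actual write to the target system is stubbed out.

import java.util
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.connect.sink.{SinkRecord, SinkTask}

class ExampleSinkTask extends SinkTask {
  // open connections to the target system here
  override def start(props: util.Map[String, String]): Unit = ()

  // called repeatedly with batches of records consumed from Kafka
  override def put(records: util.Collection[SinkRecord]): Unit =
    records.asScala.foreach { r =>
      // a real task would buffer these and write them to the target system
      println(s"${r.topic()}-${r.kafkaPartition()}@${r.kafkaOffset()}: ${r.value()}")
    }

  // called before offsets are committed back to Kafka
  override def flush(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = ()

  override def stop(): Unit = ()
  override def version(): String = "0.1.0"
}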
Converters
JsonConverter
- ships with Kafka
AvroConverter
- ships with Confluent
- integrates with the Schema Registry
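Which converter a worker uses is plain configuration again; a sketch, with an illustrative Schema Registry URL:

key.converter = io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url = http://localhost:8081
value.converter = io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url = http://localhost:8081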
kafka-connect-blockchain
kafka-connect-bloomberg
kafka-connect-cassandra
kafka-connect-coap
kafka-connect-druid
kafka-connect-elastic
kafka-connect-ftp
kafka-connect-hazelcast
kafka-connect-hbase
kafka-connect-influxdb
kafka-connect-jms
kafka-connect-kudu
kafka-connect-mongodb
kafka-connect-mqtt
kafka-connect-redis
kafka-connect-rethink
kafka-connect-voltdb
kafka-connect-yahoo
Source: https://github.com/datamountaineer/stream-reactor
Integration Tests: http://coyote.landoop.com/connect/
Connect UI
Schema UI
Topic UI
Dockers
Cloudera CSD
Kafka Connect
- Scalable
- Fault tolerant
- Common framework
- Does the hard work for you
DEMO