© 2018 Dbvisit Software | dbvisit.com
Kafka Connect the Dots
Building Oracle Change Data Capture Pipelines With Kafka
Mike Donovan
CTO
Dbvisit Software
Mike Donovan
Chief Technology Officer, Dbvisit Software
• Multi-platform DBA (Oracle, MSSQL, ...)
• Conference speaker: OOW, RMOUG, dbTech Showcase, Collaborate, nlOUG
• NZOUG member
• Technical Writer and Editor
• Kafka enthusiast
• (new) Oracle ACE
• Old furniture at Dbvisit
Professional not-knower of things
• Real-time Oracle Database Streaming software solutions
• In the Cloud | Hybrid | On-Premises
• New Zealand-based, US office, Asia Sales office, EU office (Prague)
• Unique offering: disaster recovery solutions for Oracle Standard Edition
• Logical replication for moving data wherever and whenever you wish
• Flexible licensing, cost effective pricing models available
• Exceptional growth, 1300+ customers
• Peerless customer support
Dbvisit Software
BEFORE: Many Ad Hoc Pipelines
Stream Data Platform with Kafka
• Distributed
• Fault Tolerant
o Stream Processing
o Data Integration
o Message Store
Quick Recap: what is Kafka?
A scalable, fault tolerant, distributed system where messages are kept in
topics that are partitioned and replicated across multiple nodes.
• Developed at LinkedIn ~2010
• Confluent and the open-source project (NB!)
An open-source publish-subscribe messaging system implemented as a
distributed commit log
Quick Recap: what is Kafka?
• Data is written to Kafka in the form of key-value pair messages (key or
value can be null)
• Each message belongs to a topic
• Messages as a continuous flow (stream) of events
• Producers (writers) decoupled from Consumers (readers)
• A delivery channel/platform (if you like) that crosses systems (data
integration)
Kafka - a log writer/reader
[Figure: Partition 0, Partition 1, Partition 2, each log running from old to new messages]
• Organized by topics
• Sub-categorization by
partitions (log files on
disk)
• Replicated between
nodes for redundancy
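The topic/partition layout above can be sketched as a toy in-memory log. This is only an illustration, not how Kafka is implemented: the `Topic` class and its methods are hypothetical names, and `zlib.crc32` stands in for Kafka's default murmur2 key hashing. It shows that messages with the same key land in the same partition and get increasing offsets.

```python
import zlib


class Topic:
    """Toy in-memory topic: a fixed set of append-only partition logs."""

    def __init__(self, name, num_partitions=3):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key, value):
        """Append a message and return its (partition, offset)."""
        # A stable hash keeps equal keys in the same partition.
        # crc32 is a stand-in here; Kafka's default partitioner uses murmur2.
        partition = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        log = self.partitions[partition]
        log.append((key, value))
        return partition, len(log) - 1

    def read(self, partition, offset):
        """Return every message in a partition from the given offset onward."""
        return self.partitions[partition][offset:]


topic = Topic("alertlog_test")
p1, o1 = topic.append("XE", "Thread 1 advanced to log sequence 1721")
p2, o2 = topic.append("XE", "Checkpoint not complete")
messages = topic.read(p1, 0)
```

Readers keep their own offset and ask for everything after it, which is what lets producers and consumers run independently of each other.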
Making use of Kafka
For what?
• Messaging system
• Data streaming platform
• Data storage
To do what?
• Messaging
• Website Activity
• Tracking Metrics
• Log Aggregation
• Stream Processing
• Event Sourcing
• Commit Log
Kafka - components
Zookeeper
Schema Registry
Kafka
REST Proxy
Kafka Connect
data
What about KSQL and Kafka Streams?
Bridging the Old World and the New...
Indicative use cases:
• Real-time System Monitoring and Alerting (financial trading, fraud detection)
• Real-time Business Intelligence and Analytics
• Kleppmann: Update search indexes, invalidate caches, create snapshots, generate recommendations, copy data into another database
Data Pipelines
Kafka Connect - export/import tool
Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis.
Cluster-able
Kafka Connect - export/import tool
STANDALONE mode
DISTRIBUTED mode
Key Differences:
• Topic Storage
• Rebalancing
• Interaction
Core Processes:
• Connectors
• Workers
• Tasks
Kafka Connect - serious power
• What about topic creation
• Offset management
• SMT (single message transformations)
• Override Kafka settings
Kafka Connect - export/import tool
SINK CONNECTORS
• Cassandra
• Elasticsearch
• Google BigQuery
• Hbase
• HDFS
• JDBC
• Kudu
• MongoDB
• Postgres
• S3
• SAP HANA
• Solr
• Vertica
SOURCE CONNECTORS
• JDBC
• Couchbase
• Vertica
• Blockchain
• Files/Directories
• GitHub
• FTP
• Google PubSub
• MongoDB
• PostgreSQL
• Salesforce
Kafka Connect - export/import tool
“Look ma, no code!” - @RMOFF
Filesource.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/u01/app/oracle/diag/rdbms/xe/XE/trace/alert_XE.log
topic=alertlog_test
Building Oracle CDC Data Pipelines
• Beyond the plumbing...
• Example use cases: caches, materialized views (mviews), pre-computed aggregates, alerts
• Your Oracle data tells a story
o logon
o activity
o sales channel data
The New World of data
• Data centralization
• Real time delivery
• Integration
• Stream data processing
• New data end points/stores
INPUT (source connectors)
Kafka Connect – setting up
Finding your way around the directories
Installing new connectors
• Connector JAR file (Java class)
• Properties file
Properties files (JSON and properties)
Running connect
• Make use of the Confluent CLI!
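On the two configuration formats: standalone workers read a .properties file, while distributed workers take connector definitions as JSON through the Connect REST API (a POST to /connectors with a "name" and a "config" map). A minimal sketch of the mapping between the two, assuming simple key=value lines; the helper name `properties_to_connector_json` is hypothetical and the sample is the file source config used in this deck:

```python
import json


def properties_to_connector_json(text):
    """Turn key=value lines into the {"name": ..., "config": {...}} JSON shape."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The connector name moves to the top level; all other values stay strings.
    name = config.pop("name")
    return json.dumps({"name": name, "config": config}, indent=2)


FILE_SOURCE = """
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/u01/app/oracle/diag/rdbms/xe/XE/trace/alert_XE.log
topic=alertlog_test
"""

payload = properties_to_connector_json(FILE_SOURCE)
print(payload)
```

The printed JSON is the kind of body a distributed worker would accept; sending it (for example with curl) is left out here because host and port depend on the installation.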
Example 1: SOURCE File Connector
Write to a file with the UTL_FILE package. Run the following as SYS (sysdba):
CREATE OR REPLACE DIRECTORY MIKESDIR AS '/home/oracle/';
GRANT READ ON DIRECTORY MIKESDIR TO PUBLIC;
GRANT EXECUTE ON UTL_FILE TO SYSTEM;

DECLARE
  out_file UTL_FILE.FILE_TYPE;
BEGIN
  -- 'a' opens the file in append mode
  out_file := UTL_FILE.FOPEN('MIKESDIR', 'hellotest.txt', 'a');
  UTL_FILE.PUT_LINE(out_file, 'hello world (a message from Oracle)');
  UTL_FILE.FCLOSE(out_file);
END;
/
Example 1: SOURCE File Connector
GOTCHAs:
CONFIG FILE:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/u01/app/oracle/diag/rdbms/xe/XE/trace/alert_XE.log
topic=alertlog_test
Example 1: SOURCE File Connector
DEMO
Example 2: SOURCE File Connector
Read from the Oracle database alert log:
/u01/app/oracle/diag/rdbms/xe/XE/trace/alert_XE.log
GOTCHAs....
CONFIG FILE:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/u01/app/oracle/diag/rdbms/xe/XE/trace/alert_XE.log
topic=alertlog_test
Example 2: SOURCE File Connector
DEMO
Example 2: SOURCE File Connector
SPECIAL BONUS TOPIC: SMT (single message transformations)
SPECIAL BONUS TOPIC: SMTs
CONFIG FILE
transforms=HoistField,InsertSource
transforms.HoistField.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistField.field=alertlog_msg
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=SMT_DATABASE
transforms.InsertSource.static.value=XE
SPECIAL BONUS TOPIC: SMTs
JSON OUTPUT:
Struct{alertlog_msg=Mon Feb 12 15:30:34 2018,SMT_DATABASE=XE}
Struct{alertlog_msg=Thread 1 advanced to log sequence 1721 (LGWR switch),SMT_DATABASE=XE}
Struct{alertlog_msg= Current log# 1 seq# 1721 mem# 0: /u01/app/oracle/fast_recovery_area/XE/onlinelog/o1_mf_1_8x1y15xj_.log,SMT_DATABASE=XE}
Struct{alertlog_msg=Mon Feb 12 15:30:34 2018,SMT_DATABASE=XE}
Struct{alertlog_msg=Archived Log entry 1709 added for thread 1 sequence 1720 ID 0xa0fa1263 dest 1:,SMT_DATABASE=XE}
Struct{alertlog_msg=Mon Feb 12 15:30:53 2018,SMT_DATABASE=XE}
Struct{alertlog_msg=Thread 1 cannot allocate new log, sequence 1722,SMT_DATABASE=XE}
Struct{alertlog_msg=Checkpoint not complete,SMT_DATABASE=XE}
Struct{alertlog_msg= Current log# 1 seq# 1721 mem# 0: /u01/app/oracle/fast_recovery_area/XE/onlinelog/o1_mf_1_8x1y15xj_.log,SMT_DATABASE=XE}
Struct{alertlog_msg=Thread 1 advanced to log sequence 1722 (LGWR switch),SMT_DATABASE=XE}
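To see what the two transforms do to each alert log line, here is a rough Python imitation. It is only a sketch: the function names are hypothetical, plain dicts stand in for Connect's Struct, and the real transforms run inside the Connect worker. HoistField$Value wraps the raw line in a single named field, then InsertField$Value adds a static field.

```python
def hoist_field(value, field):
    """Imitate HoistField$Value: wrap the whole value in a one-field record."""
    return {field: value}


def insert_static_field(record, field, static_value):
    """Imitate InsertField$Value with static.field and static.value set."""
    out = dict(record)
    out[field] = static_value
    return out


def apply_transforms(line):
    """Apply the chain in the configured order: HoistField, then InsertSource."""
    record = hoist_field(line, "alertlog_msg")
    return insert_static_field(record, "SMT_DATABASE", "XE")


def format_struct(record):
    """Render a record the way the output above prints a Struct."""
    body = ",".join(f"{key}={value}" for key, value in record.items())
    return "Struct{" + body + "}"


print(format_struct(apply_transforms("Checkpoint not complete")))
# prints: Struct{alertlog_msg=Checkpoint not complete,SMT_DATABASE=XE}
```

The order in the `transforms` list matters: the static field can only be inserted once the line has been hoisted into a structured record.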
Example 3: SOURCE JDBC Connector
Read from an Oracle database table
MODE
• Incrementing column
• Timestamp column
• BATCH
• Table Whitelist
Example 3: SOURCE JDBC Connector
GOTCHAs:
• Installing the JDBC drivers
CONFIG FILE:
name=test-oracle-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.password=examplePassword
connection.url=jdbc:oracle:thin:@example.oracle.server.com:1521/ExampleServiceName
connection.user=exampleUser
table.whitelist=USERS
mode=incrementing
incrementing.column.name=ID
topic.prefix=test-oracle-jdbc-
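What mode=incrementing amounts to can be sketched as a polling loop that remembers the highest ID seen so far and asks only for rows above it. This is an illustration, not the connector's code: `poll_incrementing` is a hypothetical helper, and an in-memory SQLite database stands in for Oracle so the example runs anywhere.

```python
import sqlite3


def poll_incrementing(conn, table, column, last_seen):
    """Return rows whose incrementing column exceeds last_seen, plus the new mark."""
    # Table and column names come from trusted configuration, not user input.
    query = f"SELECT * FROM {table} WHERE {column} > ? ORDER BY {column} ASC"
    rows = conn.execute(query, (last_seen,)).fetchall()
    if rows:
        last_seen = rows[-1][0]  # assumes the incrementing column comes first
    return rows, last_seen


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE USERS (ID INTEGER PRIMARY KEY, NAME TEXT)")
conn.executemany(
    "INSERT INTO USERS (ID, NAME) VALUES (?, ?)", [(1, "alice"), (2, "bob")]
)

# First poll picks up everything; later polls only see rows with a higher ID.
first, offset = poll_incrementing(conn, "USERS", "ID", last_seen=-1)
conn.execute("INSERT INTO USERS (ID, NAME) VALUES (?, ?)", (3, "carol"))
second, offset = poll_incrementing(conn, "USERS", "ID", offset)
```

Because only the ID is compared, this mode picks up new rows but not updates or deletes of existing rows, which is one reason to look at reading change data from the redo log instead.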
Example 3: SOURCE JDBC Connector
DEMO
Example 4: SOURCE CDC Connector
Read Oracle database change data from
the Oracle redo log
• Redolog scanner applications
Oracle Change Data – delivered to Kafka
INSERT...
into SCOTT.TEST9
metadata
Example 4: SOURCE CDC Connector
GOTCHAs:
CONFIG FILE:
name=dbvisit-replicate
connector.class=com.dbvisit.replicate.kafkaconnect.ReplicateSourceConnector
tasks.max=4
topic.prefix=REP2-
plog.location.uri=file:/home/oracle/RQ-3595/mine
plog.data.flush.size=100
topic.name.transaction.info=TX.META
connector.publish.cdc.format=changerow
connector.publish.transaction.info=true
connector.publish.keys=true
connector.publish.no.schema.evolution=false
connector.catalog.topic.name=replicate-info
Example 4: SOURCE CDC Connector
DEMO
OUTPUT (sink connectors)
Example 5: SINK File Connector
GOTCHAs:
CONFIG FILE:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/home/oracle/mike/test.sink.txt
topics=alertlog_test
Example 5: SINK File Connector
DEMO
Building Oracle CDC Data Pipelines
Elasticsearch
• Storage
• Search
• Analytics
Kibana
• Visualization
• Reports
• Dashboards
Example 6: SINK Elasticsearch Connector
GOTCHAs:
• Key values
CONFIG FILE:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=REP-SOE.CUSTOMERS
connection.url=http://localhost:9200
type.name=kafka-connect
key.ignore=true
topic.index.map=REP-SOE.CUSTOMERS:rep-soe.customers
topic.key.ignore=REP-SOE.CUSTOMERS
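The "key values" gotcha is about how each Kafka record becomes an Elasticsearch document ID. With key.ignore=true the sink builds the ID from the record's topic, partition and offset; otherwise it uses the record key. A small sketch of that rule follows; the function `document_id` is hypothetical, and raising an error for a null key is an assumption about the connector's behaviour, not something stated on the slides.

```python
def document_id(topic, partition, offset, key, key_ignore):
    """Pick the Elasticsearch document ID for one Kafka record."""
    if key_ignore:
        # key.ignore=true: the ID is derived from the record's position in the log.
        return f"{topic}+{partition}+{offset}"
    if key is None:
        # Assumption: without key.ignore, a record needs a usable key.
        raise ValueError("key.ignore=false requires a non-null record key")
    return str(key)


# Two changes to the same customer row, arriving at offsets 41 and 42.
id_a = document_id("REP-SOE.CUSTOMERS", 0, 41, 1001, key_ignore=True)
id_b = document_id("REP-SOE.CUSTOMERS", 0, 42, 1001, key_ignore=True)
id_keyed = document_id("REP-SOE.CUSTOMERS", 0, 42, 1001, key_ignore=False)
```

With key.ignore=true the two changes get different IDs, so each one becomes its own document; keyed IDs would instead overwrite the earlier document for that customer.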
Example 6: SINK Elasticsearch Connector
DEMO
• Elastic queries
• Kibana visualizations
• Slack alerts
Get started with Kafka Connect
• Kafka and Kafka Connect: www.confluent.io
• Download the Confluent Platform (bundled connectors)
• Check out the available community connectors
• Try running it in Docker
Thank you