Post on 26-Apr-2020
Best Practices for Stream Processing with GridGain and Apache Ignite and Kafka
Alexey Kukushkin, Professional Services
Rob Meyer, Outbound Product Management
Agenda
• Why we need Kafka/Confluent-Ignite/GridGain integration
• Ignite/GridGain Kafka/Confluent Connectors
• Deployment, monitoring and management
• Integration Examples
• Performance and scalability tuning
• Q&A
Why we need Kafka/Confluent-Ignite/GridGain integration
Apache Kafka and Confluent
A distributed streaming platform:
• Publish/subscribe
• Scalable
• Fault-tolerant
• Real-time
• Persistent
• Written mostly in Scala
GridGain In-Memory Computing Platform
[Diagram: GridGain In-Memory Computing Platform. Recoverable labels: Existing Applications; New Applications, Analytics; Streaming, Machine Learning; In-Memory Data Grid; In-Memory Database; Streaming Analytics; RDBMS, NoSQL, Hadoop.]
• Built on Apache Ignite
  • Comprehensive platform that supports all projects
  • No rip and replace
  • In-memory speed, petabyte scale
  • Enables HTAP, streaming analytics and continuous learning
• What GridGain adds
  • Production-ready releases
  • Enterprise-grade security, deployment and management
  • Global support and services
  • Proven for mission-critical apps
Continuous Learning
[Diagram: GridGain platform architecture. Recoverable labels: Streaming Analytics, Machine and Deep Learning; Stream Ingestion, Stream Processing, Machine and Deep Learning, Analytics, Decision Automation; Compute, Transactions, Continuous Learning (Machine, Deep Learning), SQL, Streaming, Services, Messaging; Spark (DataFrame, RDD, HDFS), ODBC/JDBC, Java, .NET, …, R (1), Python (1); Memory-Centric Storage (SQL, NoSQL); Native Persistence, 3rd party Persistence (RDBMS, NoSQL, Hadoop); Kafka, Camel, Spark, Storm, JMS, MQTT, …]
(1) R and Python developers currently invoke Java classes. Direct R and Python support is planned.
Ignite/GridGain & Kafka Integration
• Kafka is commonly used as a messaging backbone in a heterogeneous system
• Add Ignite/GridGain to a Kafka-based system
https://www.imcsummit.org/2018/eu/session/embracing-service-consumption-shift-banking
Ignite and GridGain Kafka Connectors
Developing Kafka Consumers & Producers
You can develop Kafka integration for any system using the Kafka Producer and Consumer APIs, but you need to solve problems like:
• How to use each API of every producer and consumer
• How Kafka will understand your data
• How data will be converted between producers and consumers
• How to scale the producer-to-consumer flow
• How to recover from a failure
• …and many more
GridGain-Kafka Connector: Out-of-the-box Integration
• Addresses all the integration challenges using best practices
• Does not need any coding even in the most complex integrations
• Developed by GridGain/Ignite Community with help from Confluent to ensure both Ignite and Kafka best practices
• Based on Kafka Connect and Ignite APIs
• Kafka Connect API encourages design for scalability, failover and data schema
• GridGain Source Connector uses Ignite Continuous Queries
• GridGain Sink Connector uses Ignite Data Streamer
Kafka Source and Sink Connectors
Kafka Connect Server Types
In general, there are 4 separate clusters in a Kafka Connect infrastructure:
• Kafka cluster
  • cluster nodes called Brokers
• Kafka Connect cluster
  • cluster nodes called Workers
• Source and Sink GridGain/Ignite clusters
  • Server Nodes
GridGain Connector Features
Two connectors independent from each other:
• GridGain Source Connector
  • streams data from GridGain into Kafka
  • uses Ignite continuous queries
• GridGain Sink Connector
  • streams data from Kafka into GridGain
  • uses Ignite data streamers
GridGain Source Connector: Scalability
Scales by assigning multiple source partitions to Kafka Connect tasks. For GridGain Source Connector:
• Partition = Cache
• Record = Cache Entry
[Figure: Kafka Source Connector Model]
GridGain Source Connector: Rebalancing and Failover
Rebalancing: re-assignment of Kafka Connectors and Tasks to Workers when
• A Worker joins or leaves the cluster
• A cache is added or removed
Failover: resuming operation after a failure
• How to resume after a failure or rebalancing without losing the cache updates that occurred while the Kafka Connect Worker node was down?
Source Offset: position in the source stream. Kafka Connect:
• provides persistent and distributed source offset storage
• automatically saves the last committed offset
• allows resuming from the last offset without losing data.
Problem: caches have no offsets!
GridGain Source Connector: Failover Policies
• None: no source offset saved, start listening to current data after restart
  • Cons: updates that occurred during downtime are lost ("at least once" data delivery guarantee is violated)
  • Pros: fastest
• Full Snapshot: no source offset saved, always pull all data from the cache upon startup
  • Cons:
    • Slow, not applicable for big caches
    • Duplicate data ("exactly once" data delivery guarantee is violated)
  • Pros: no data is lost
• Backlog: resume from the last committed source offset
  • Kafka Backlog cache in Ignite
    • Key: incremental offset
    • Value: cache name and serialized cache entries
  • Kafka Backlog service in Ignite
    • Runs continuous queries pulling data from source caches into Backlog
  • Source Connector gets data from Backlog starting from the last committed offset
  • Cons
    • Intrusive: GridGain cluster impact
    • Complex configuration: need to estimate amount of memory for Backlog
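The Backlog policy can be pictured with a small in-memory model. The sketch below is plain Python rather than GridGain code: the `Backlog` class and its method names are hypothetical, and a dictionary stands in for the Backlog cache. It only illustrates the idea from the slide, an incremental offset as the key, the cache name and entry as the value, and a connector that resumes after the last committed offset.

```python
from itertools import count


class Backlog:
    """Toy model of the Backlog cache: offset -> (cache name, key, value)."""

    def __init__(self):
        self._next_offset = count(1)  # incremental offsets used as keys
        self._entries = {}

    def append(self, cache_name, key, value):
        """Record one cache update, as the Backlog service would for each change."""
        offset = next(self._next_offset)
        self._entries[offset] = (cache_name, key, value)
        return offset

    def read_after(self, committed_offset):
        """Return (offset, entry) pairs newer than the last committed offset."""
        return [(o, e) for o, e in sorted(self._entries.items()) if o > committed_offset]


backlog = Backlog()
backlog.append("QUOTES", 1, 1.0)
backlog.append("TRADES", 1, "IBM")
committed = 2                      # the connector committed offset 2, then went down
backlog.append("QUOTES", 2, 2.0)   # update made while the worker was down
missed = backlog.read_after(committed)
```

After the restart, `missed` holds only the update made during the downtime, so nothing is lost and nothing already delivered is re-sent. The model also shows where the memory cost comes from: every update stays in the Backlog until it is read.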
GridGain Source Connector: Dynamic Reconfiguration
The Connector monitors the list of available caches and re-configures itself if a cache is added or removed. Use the cacheWhitelist and cacheBlacklist properties to define from which caches to pull data.
GridGain Source Connector: Initial Data Load
Use the shallLoadInitialData configuration property to specify whether you want the Connector to load the data that is already in the cache by the time the Connector starts.
GridGain Sink Connector
• Sink Connectors are inherently scalable since consuming data from a Kafka topic is scalable
• Sink Connectors inherently support failover thanks to the Kafka Connect framework auto-committing offsets of the pushed data.
GridGain Connector Data Schema
Both Source and Sink GridGain Connectors support data schema.
• Allows GridGain Connectors to understand data with attached schema from other Kafka producers and consumers
• Source Connector attaches Kafka schema built from Ignite Binary objects
• Sink Connector converts Kafka records to Ignite Binary objects using Kafka schema
Limitations:
• Ignite Annotations are not supported
• Ignite CHAR converted to Kafka SHORT (same for arrays)
• Ignite UUID and CLASS converted to Kafka STRING (same for arrays)
Ignite Connector Features
• Ignite Source Connector
  • pushes data from Ignite into Kafka
  • uses Ignite Events
  • must enable EVT_CACHE_OBJECT_PUT, which negatively impacts cluster performance
• Ignite Sink Connector
  • pulls data from Kafka into Ignite
  • uses Ignite data streamer
Apache Ignite vs. GridGain Connectors
Feature | Apache Ignite Connector | GridGain Connector
Scalability | Limited: Source connector is not parallel; Sink connector is parallel | Source connector creates a task per cache; Sink connector is parallel
Failover | NO: Source data is lost during connector restart or rebalancing | YES: Source connector can be configured to resume from the last committed offset
Preserving source data schema | NO | YES
Handling multiple caches | NO | YES: Connector can be configured to handle any number of caches
Dynamic Reconfiguration | NO | YES: Source connector detects added or removed caches and re-configures itself
Initial Data Load | NO | YES
Handling data removals | YES | YES
Serialization and Deserialization of data | YES | YES
Filtering | Limited: Only source connector supports a filter | YES: Both source and sink connectors support filters
Transformations | Kafka SMTs | Kafka SMTs
DevOps | Some free-text error logging | Health Model defined
Support | Apache Ignite Community | Supported by GridGain, certified by Confluent
Packaging | Uber JAR | Connector Package
Deployment | Plugin PATH on all Kafka Connect workers | Plugin PATH on all Kafka Connect workers. CLASSPATH on all GridGain nodes.
Kafka API Version | 0.10 | 2.0
Source API | Ignite events | Ignite continuous queries
Sink API | Ignite data streamer | Ignite data streamer
Deployment, monitoring and management
GridGain Connector Deployment
1. Prepare Connector Package
2. Register Connector with Kafka
3. Register Connector with GridGain
Prepare GridGain Connector Package
1. GridGain-Kafka Connector is part of GridGain Enterprise and Ultimate 8.5.3 (to be released at the end of October 2018)
2. The connector is in $GRIDGAIN_HOME/integration/gridgain-kafka-connect
  • (GRIDGAIN_HOME environment variable points to the root GridGain installation directory)
3. Pull missing connector dependencies into the package:
cd $GRIDGAIN_HOME/integration/gridgain-kafka-connect
./copy-dependencies.sh
Register GridGain Connector with Kafka
For every Kafka Connect Worker:
1. Copy the GridGain Connector package directory to where you want Kafka Connectors to be located, for example, into the /opt/kafka/connect directory
2. Edit the Kafka Connect Worker configuration (kafka-connect-standalone.properties or kafka-connect-distributed.properties) to register the connector on the plugin path:
plugin.path=/opt/kafka/connect/gridgain-kafka-connect
Register GridGain Connector with GridGain
This assumes the GridGain version is 8.5.3. On every GridGain server node, copy the JARs below into the $GRIDGAIN_HOME/libs/user directory. Get the Kafka JARs from the Kafka Connect workers:
• gridgain-kafka-connect-8.5.3.jar
• connect-api-2.0.0.jar
• kafka-clients-2.0.0.jar
Ignite Connector Deployment
1. Prepare Connector Package
2. Register Connector with Kafka
Prepare Ignite Connector Package
This assumes the Ignite version is 2.6. Create a directory containing the JARs below (find the JARs in the $IGNITE_HOME/libs sub-directories):
• ignite-kafka-connect-0.10.0.1.jar
• ignite-core-2.6.0.jar
• ignite-spring-2.6.0.jar
• cache-api-1.0.0.jar
• spring-aop-4.3.16.RELEASE.jar
• spring-beans-4.3.16.RELEASE.jar
• spring-context-4.3.16.RELEASE.jar
• spring-core-4.3.16.RELEASE.jar
• spring-expression-4.3.16.RELEASE.jar
• commons-logging-1.1.1.jar
Register Ignite Connector with Kafka
For every Kafka Connect Worker:
1. Copy the Ignite Connector package directory to where you want Kafka Connectors to be located, for example, into the /opt/kafka/connect directory
2. Edit the Kafka Connect Worker configuration (kafka-connect-standalone.properties or kafka-connect-distributed.properties) to register the connector on the plugin path:
plugin.path=/opt/kafka/connect/ignite-kafka-connect
Monitoring: GridGain Connector
Well-defined Health Model:
• Numeric Event ID uniquely identifies a specific problem
• Event severity
• Problem description and recovery action are available at https://docs.gridgain.com/docs/certified-kafka-connector-monitoring
Configure your monitoring system to detect the Event ID in the logs and, optionally, run automated recovery as defined in the Health Model.
• Sample structured log entry (# is used as a delimiter):
09-10-2018 19:57:35 # ERROR # 15000 # Spring XML configuration path is invalid: /invalid/path/ignite.xml
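A monitoring script could split such an entry into its fields before matching on the Event ID. The following is a minimal sketch that assumes every entry has exactly the four `#`-delimited fields of the sample above. The `HealthEvent` class and `parse_health_event` function are hypothetical names, and the day-first date order is an assumption, since the sample date is ambiguous.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class HealthEvent:
    timestamp: datetime
    severity: str
    event_id: int
    message: str


def parse_health_event(line):
    """Split a '#'-delimited connector log line into its four fields."""
    # maxsplit=3 keeps any '#' inside the message text intact.
    timestamp, severity, event_id, message = (
        part.strip() for part in line.split("#", 3)
    )
    return HealthEvent(
        # Assumption: day-month-year order; adjust if the logs are month-first.
        timestamp=datetime.strptime(timestamp, "%d-%m-%Y %H:%M:%S"),
        severity=severity,
        event_id=int(event_id),
        message=message,
    )


sample = (
    "09-10-2018 19:57:35 # ERROR # 15000 # "
    "Spring XML configuration path is invalid: /invalid/path/ignite.xml"
)
event = parse_health_event(sample)
```

With the entry parsed, an alert rule can key on `event.event_id` and `event.severity` instead of free-text matching, which is the point of having a numeric Event ID in the Health Model.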
Monitoring: Ignite Connector
No Health Model is defined.
1. Run negative tests
2. Check Kafka and Ignite logs output
3. Configure your monitoring system to detect corresponding text patterns in the logs
Integration Examples
Propagating RDBMS updates into GridGain
Ignite/GridGain has a 3rd Party Persistence feature (Cache Store) that allows:
• Propagating cache changes to external storage like an RDBMS
• Automatically copying data from external storage to Ignite upon accessing data missing in Ignite
What if you want to propagate an external storage change to Ignite at the moment of the change?
- 3rd Party Persistence cannot do that!
Use Kafka to achieve that without writing a single line of code!
Assumptions
• For simplicity we will run everything on the same host
  • In distributed mode GridGain nodes, Kafka Connect workers and Kafka brokers are running on different hosts
• GridGain 8.5.3 cluster with GRIDGAIN_HOME variable set on the nodes
• Kafka 2.0 cluster with KAFKA_HOME variable set on all brokers
1. Run DB Server
We will use H2 Database in this demo. We will use /tmp/gridgain-h2-connect as a work directory.
• Download H2 and set the H2_HOME environment variable.
• Run H2 Server:
java -cp $H2_HOME/bin/h2*.jar org.h2.tools.Server -webPort 18082 -tcpPort 19092
TCP server running at tcp://172.25.4.74:19092 (only local connections)
PG server running at pg://172.25.4.74:5435 (only local connections)
Web Console server running at http://172.25.4.74:18082 (only local connections)
• In the opened H2 Web Console specify JDBC URL: jdbc:h2:/tmp/gridgain-h2-connect/marketdata
• Press Connect
2. Create DB Tables and Add Some Data
In H2 Web Console execute:
• CREATE TABLE IF NOT EXISTS QUOTES (id int, date_time timestamp, price double, PRIMARY KEY (id));
• CREATE TABLE IF NOT EXISTS TRADES (id int, symbol varchar, PRIMARY KEY (id));
• INSERT INTO TRADES (id, symbol) VALUES (1, 'IBM');
• INSERT INTO QUOTES (id, date_time, price) VALUES (1, CURRENT_TIMESTAMP(), 1.0);
3. Start GridGain Cluster (Single-node)
$GRIDGAIN_HOME/bin/ignite.sh /tmp/gridgain-h2-connect/ignite-server.xml
[15:41:15] Ignite node started OK (id=b9963f9a)
[15:41:15] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, offheap=3.2GB, heap=1.0GB]
[15:41:15] ^-- Node [id=B9963F9A-8F1E-4177-9743-F129414EB133, clusterState=ACTIVE]
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  <property name="discoverySpi">
    <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
      <property name="ipFinder">
        <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
          <property name="addresses">
            <list>
              <value>127.0.0.1:47500</value>
            </list>
          </property>
        </bean>
      </property>
    </bean>
  </property>
</bean>
4. Deploy Source and Sink Connectors
• Download the Confluent JDBC Connector package from https://www.confluent.io/connector/kafka-connect-jdbc/
• Unzip the Confluent JDBC Connector package into /tmp/gridgain-h2-connect/confluentinc-kafka-connect-jdbc
• Copy the GridGain Connector package from $GRIDGAIN_HOME/integration/gridgain-kafka-connect into /tmp/gridgain-h2-connect/gridgain-kafka-connect
• Copy the kafka-connect-standalone.properties Kafka worker configuration file from $KAFKA_HOME/config into /tmp/gridgain-h2-connect and set the plugin path property:
plugin.path=/tmp/gridgain-h2-connect/confluentinc-kafka-connect-jdbc-5.1.0-SNAPSHOT,/tmp/gridgain-h2-connect/gridgain-gridgain-kafka-connect-8.7.0-SNAPSHOT
5. Start Kafka Cluster (Single-broker)
• Configure Zookeeper with /tmp/gridgain-h2-connect/zookeeper.properties:
dataDir=/tmp/gridgain-h2-connect/zookeeper
clientPort=2181
• Start Zookeeper:
$KAFKA_HOME/bin/zookeeper-server-start.sh /tmp/gridgain-h2-connect/zookeeper.properties
• Configure Kafka broker: copy the default $KAFKA_HOME/config/server.properties to /tmp/gridgain-h2-connect/kafka-server.properties and customize it:
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/gridgain-h2-connect/kafka-logs
zookeeper.connect=localhost:2181
• Start Kafka broker:
$KAFKA_HOME/bin/kafka-server-start.sh /tmp/gridgain-h2-connect/kafka-server.properties
[2018-10-10 16:11:21,573] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-10 16:11:21,573] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-10 16:11:21,574] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
6. Configure Source JDBC Connector
/tmp/gridgain-h2-connect/kafka-connect-h2-source.properties:
name=h2-marketdata-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:h2:tcp://localhost:19092//tmp/gridgain-h2-connect/marketdata
table.whitelist=quotes,trades
mode=timestamp+incrementing
timestamp.column.name=date_time
incrementing.column.name=id
topic.prefix=h2-
7. Configure Sink GridGain Connector
/tmp/gridgain-h2-connect/kafka-connect-gridgain-sink.properties:
name=gridgain-marketdata-sink
topics=h2-QUOTES,h2-TRADES
tasks.max=10
connector.class=org.gridgain.kafka.sink.IgniteSinkConnector
igniteCfg=/tmp/gridgain-h2-connect/ignite-client-sink.xml
topicPrefix=h2-
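The two configurations meet through topic names: the JDBC source prepends its topic prefix to each table name, and the sink is configured with the same prefix so that the QUOTES and TRADES caches appear in GridGain. The sketch below mirrors that naming in plain Python. Both function names are hypothetical, and passing through a topic that lacks the prefix is an assumption, not documented connector behaviour.

```python
def source_topic(topic_prefix, table):
    """JDBC source side: the topic name is the prefix followed by the table name."""
    return topic_prefix + table


def sink_cache(topic_prefix, topic):
    """Sink side: strip the configured prefix to obtain the cache name."""
    if not topic.startswith(topic_prefix):
        return topic  # assumption: topics without the prefix keep their name
    return topic[len(topic_prefix):]


tables = ["QUOTES", "TRADES"]
topics = [source_topic("h2-", t) for t in tables]
caches = [sink_cache("h2-", t) for t in topics]
```

Here `topics` matches the `topics` property of the sink configuration and `caches` matches the cache names seen later in the demo, which is a quick way to check that the prefix is spelled identically in both files.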
8. Start Kafka-Connect Cluster (Single-worker)
$KAFKA_HOME/bin/connect-standalone.sh \
/tmp/gridgain-h2-connect/kafka-connect-standalone.properties \
/tmp/gridgain-h2-connect/kafka-connect-h2-source.properties \
/tmp/gridgain-h2-connect/kafka-connect-gridgain-sink.properties
[2018-10-10 16:52:21,618] INFO Created connector h2-marketdata-source (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-10-10 16:52:22,254] INFO Created connector gridgain-marketdata-sink (org.apache.kafka.connect.cli.ConnectStandalone:104)
9. See Caches Created in GridGain
Open the GridGain Web Console Monitoring Dashboard at https://console.gridgain.com/monitoring/dashboard and see that the GridGain Sink Connector created the QUOTES and TRADES caches.
10. See Initial H2 Data in GridGain
Open the GridGain Web Console Queries page and run Scan queries for QUOTES and TRADES.
11. Update H2 Tables
In H2 Web Console execute:
• INSERT INTO TRADES (id, symbol) VALUES (2, 'INTL');
• INSERT INTO QUOTES (id, date_time, price) VALUES (2, CURRENT_TIMESTAMP(), 2.0);
12. See Realtime H2 Data in GridGain
Open the GridGain Web Console Queries page and run Scan queries for QUOTES and TRADES.
Performance and scalability tuning
Disable Processing of Updates
For performance reasons, the Sink Connector does not update existing cache entries by default. Set the shallProcessUpdates configuration setting to true to make the Sink Connector update existing entries.
Disable Dynamic Schema
The Source Connector caches key and value schemas.
• The schemas are created as the first cache entry is pulled and re-used for all subsequent entries.
This works only if the schemas never change.
• Set isSchemaDynamic to true to support schema changes.
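The trade-off behind this setting can be shown with a toy schema cache. This is an illustrative Python sketch, not the connector's implementation: the `SchemaCache` class is hypothetical, a schema is reduced to a field-to-type-name mapping, and rebuilding the schema for every entry in dynamic mode is an assumption about how schema changes might be supported.

```python
class SchemaCache:
    """Builds a schema from the first entry and re-uses it unless dynamic."""

    def __init__(self, dynamic=False):
        self.dynamic = dynamic
        self._cached = None
        self.builds = 0  # counts how often a schema had to be built

    def schema_for(self, entry):
        if self.dynamic or self._cached is None:
            self._cached = {name: type(value).__name__ for name, value in entry.items()}
            self.builds += 1
        return self._cached


first = {"id": 1}
changed = {"id": 2, "symbol": "IBM"}  # a field was added later

static = SchemaCache(dynamic=False)
static.schema_for(first)
stale = static.schema_for(changed)    # still the schema of the first entry

dynamic = SchemaCache(dynamic=True)
dynamic.schema_for(first)
fresh = dynamic.schema_for(changed)   # rebuilt, so the new field is present
```

The static cache builds one schema and misses the added field, while the dynamic one pays for a rebuild on each entry, which is why the dynamic mode is presented as a cost to enable only when schemas really change.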
Consider Disabling Schema
The Source Connector does not generate schemas if the isSchemaless configuration setting is true. Disabling schemas significantly improves performance.
Carefully Choose Failover Policy
• Can allow losing data? Use None.
• Caches are small (e.g. reference data caches)? Use Full Snapshot.
• Otherwise use Backlog.
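The three questions above form a simple decision procedure, sketched here in Python. The function name and its boolean parameters are hypothetical, and the returned strings are the policy names used on the slides, not necessarily the literal values the connector configuration expects.

```python
def choose_failover_policy(can_lose_data, caches_are_small):
    """Apply the slide's checklist in order and return the policy name."""
    if can_lose_data:
        return "None"           # fastest, but downtime updates are lost
    if caches_are_small:
        return "Full Snapshot"  # no loss, but everything is re-sent on startup
    return "Backlog"            # resumes from the last committed offset


reference_data = choose_failover_policy(can_lose_data=False, caches_are_small=True)
market_data = choose_failover_policy(can_lose_data=False, caches_are_small=False)
```

The order of the checks matters: tolerance for data loss is asked first because it selects the cheapest policy regardless of cache size.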
Plan Kafka Connect Backlog Capacity
Only the Backlog failover policy supports both "at least once" and "exactly once" delivery guarantees. The GridGain Source Connector creates the Backlog in the "kafka-connect" memory region, which requires capacity planning to avoid losing data by eviction (unless persistence is enabled). Consider the worst case scenario:
• Maximum Kafka Connect worker downtime allowed in your system
• Peak traffic
Multiply peak traffic by max downtime to estimate the "kafka-connect" data region size.
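As a worked example of that multiplication, the sketch below expresses peak traffic as updates per second times an average entry size. The function name, the `headroom` factor and all workload numbers are hypothetical and only illustrate the arithmetic; real sizing should use measured entry sizes, including serialization overhead.

```python
def backlog_region_bytes(peak_entries_per_sec, avg_entry_bytes, max_downtime_sec, headroom=1.0):
    """Estimate the data region size as peak traffic multiplied by max downtime."""
    peak_bytes_per_sec = peak_entries_per_sec * avg_entry_bytes
    return int(peak_bytes_per_sec * max_downtime_sec * headroom)


# Hypothetical workload: 5,000 updates/s at peak, 1 KiB per entry,
# and at most 10 minutes (600 s) of Kafka Connect worker downtime.
size = backlog_region_bytes(5_000, 1_024, 600)
```

For these made-up figures the estimate is 3,072,000,000 bytes, roughly 3 GB. Passing a `headroom` above 1.0 adds a safety margin on top of the worst-case estimate.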
Q&A
Thank you!