spring for apache kafka

17
Unless otherwise indicated, these slides are © 2013-2016 Pivotal Software, Inc. and licensed under a Creative Commons Attribution-NonCommercial license: http://creativecommons.org/licenses/by-nc/3.0/ Spring for Apache Kafka Gary P. Russell @gprussell

Upload: spring-io

Post on 06-Jan-2017

407 views

Category:

Technology


0 download

TRANSCRIPT

Page 1: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

Spring for Apache KafkaGary P. Russell

@gprussell

Page 2: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

Spring for Apache Kafka• Brings the familiar Spring programming model to Apache Kafka

• KafkaTemplate• MessageListenerContainer• @KafkaListener Annotation

http://projects.spring.io/spring-kafka/

2

Page 3: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

Spring for Apache Kafkaspring-kafka: • 1.0.2.RELEASE (0.9.x.x client)• 1.1.0.BUILD-SNAPSHOT (0.10.x.x client) - milestone soon• https://github.com/spring-projects/spring-kafka

spring-integration-kafka:• inbound/outbound channel adapters• 2.0.1.RELEASE (based on spring-kafka) • 1.3.1.RELEASE (uses 0.8.x.x client directly)• https://github.com/spring-projects/spring-integration-kafka

3

Page 4: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

ProducerFactory

4

@Beanpublic ProducerFactory<String, String> producerFactory() {

Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,

this.brokerAddress);// ...props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

StringSerializer.class);return new DefaultKafkaProducerFactory<>(props);

}

Page 5: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

KafkaTemplate

5

@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {

return new KafkaTemplate<>(producerFactory());}

ListenableFuture<SendResult<K, V>> send(String topic, V data);ListenableFuture<SendResult<K, V>> send(String topic,

int partition, V data);

etc.

Page 6: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

ConsumerFactory

6

@Beanpublic ConsumerFactory<String, String> consumerFactory() {

Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

this.brokerAddress);props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");// ...props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);

}

Page 7: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

@KafkaListener

7

@KafkaListener(id = "foo", topics = “${my.topic}")public void listen1(String foo) {

// ...}

@KafkaListener(id = "bar", topicPattern = “${topic.root}\.*”)public void listen2(@Payload String foo,

@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

// ...}

Page 8: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

@KafkaListener - Explicit Partition Assignment

8

@KafkaListener(topicPartitions = @TopicPartition(topic = "${kafka.topic}", partitionOffsets = {

@PartitionOffset(partition = "0", initialOffset = "0"),@PartitionOffset(partition = "1", initialOffset = "0"),

}))public void listen(String foo) {// ...

}

Page 9: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

@KafkaListener - Container Factory

9

@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String>

kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =

new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setMessageConverter(

new StringJsonMessageConverter());return factory;

}

Page 10: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

Listener Containers

10

@Beanpublic KafkaMessageListenerContainer<String, String> container(

ConsumerFactory<String, String> consumerFactory,ConfigProperties config) {

ContainerProperties containerProperties = new ContainerProperties(config.getTopic());

containerProperties.setMessageListener(listener());return new KafkaMessageListenerContainer<>(consumerFactory,

containerProperties);}

Page 11: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

MessageListener

11

public class Listener implements MessageListener<String, String> {

@Overridepublic void onMessage(ConsumerRecord<String, String> record) {

// ...}

}

Page 12: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

AcknowledgingMessageListener

12

public static class AckListener implements AcknowledgingMessageListener<String, String> {

@Overridepublic void onMessage(ConsumerRecord<String, String> record,

Acknowledgment acknowledgment) {// ...acknowledgment.acknowledge();

}

}

Page 13: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

Listener Adapters• Add behavior to listeners

• Retry adapter• Filtering adapter

• Also configurable when using @KafkaListener

13

Page 14: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

Spring for Apache Kafka Test Support• Embedded Kafka Server - JUnit @Rule

• Creates topics with configurable partitions• Utility methods to wait for partition assignment• Hamcrest Matchers• AssertJ Conditions

14

Page 15: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

Some Caveats• Polling, Pause, Resume, auto commit problems

• Let the container manage the commits (or manual)

15

Page 16: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

Spring for Apache Kafka

DEMO

16

Page 17: Spring For Apache Kafka

Unless o therw ise ind ica ted , these s l ides are © 2013-2016 P ivo ta l So f tware , Inc . and l i censed under a Creat ive Commons At t r ibu t ion-NonCommerc ia l l i cense: h t tp : / / c rea t ivecommons.org / l i censes /by-nc /3 .0 /

Learn More. Stay Connected.

Contribute!!

https://github.com/spring-projects/spring-kafka/CONTRIBUTING.adoc

@springcentralspring.io/blog

@pivotalpivotal.io/blog

@pivotalcfhttp://engineering.pivotal.io