Unless otherwise indicated, these slides are © 2013-2016 Pivotal Software, Inc. and licensed under a Creative Commons Attribution-NonCommercial license: http://creativecommons.org/licenses/by-nc/3.0/
Spring for Apache Kafka
Gary P. Russell
@gprussell
Spring for Apache Kafka
• Brings the familiar Spring programming model to Apache Kafka
• KafkaTemplate
• MessageListenerContainer
• @KafkaListener Annotation
http://projects.spring.io/spring-kafka/
Spring for Apache Kafka
spring-kafka:
• 1.0.2.RELEASE (0.9.x.x client)
• 1.1.0.BUILD-SNAPSHOT (0.10.x.x client) - milestone soon
• https://github.com/spring-projects/spring-kafka
spring-integration-kafka:
• inbound/outbound channel adapters
• 2.0.1.RELEASE (based on spring-kafka)
• 1.3.1.RELEASE (uses 0.8.x.x client directly)
• https://github.com/spring-projects/spring-integration-kafka
ProducerFactory
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // ...
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}
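The `ProducerConfig` constants are plain string property names, so the map handed to the factory can be pictured without any Kafka classes. The sketch below builds an equivalent map using only the JDK. The class name `ProducerPropsSketch`, the method name, and the broker address `localhost:9092` are illustrative assumptions, not part of spring-kafka or of the original slides.

```java
import java.util.HashMap;
import java.util.Map;

class ProducerPropsSketch {

    // Builds a property map equivalent to the one passed to the producer factory.
    // The serializer is given by class name, which Kafka also accepts for these keys.
    static Map<String, Object> producerProps(String brokerAddress) {
        Map<String, Object> props = new HashMap<>();
        // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", brokerAddress);
        // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumed placeholder address for a local broker.
        producerProps("localhost:9092")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Using the constants, as the slide does, is usually preferable because a mistyped key then fails at compile time rather than being silently ignored.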
KafkaTemplate
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data);
etc.
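Because the `send` methods return a future, the caller can register a callback instead of blocking until the broker responds. The sketch below illustrates that calling pattern with the JDK's `CompletableFuture` as a stand-in for Spring's `ListenableFuture`. The `send` method here is a hypothetical simulation that does not contact a broker, and the topic name `s1p.topic` is assumed.

```java
import java.util.concurrent.CompletableFuture;

class AsyncSendSketch {

    // Hypothetical stand-in for an asynchronous send: completes on another
    // thread with a string describing what was "sent".
    static CompletableFuture<String> send(String topic, String data) {
        return CompletableFuture.supplyAsync(() -> topic + ":" + data);
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = send("s1p.topic", "hello");

        // React to success or failure without blocking the sending thread.
        CompletableFuture<String> handled = future.whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("send failed: " + ex);
            } else {
                System.out.println("sent " + result);
            }
        });

        // Wait only so that this demo does not exit before the callback runs.
        handled.join();
    }
}
```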
ConsumerFactory
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
    // ...
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
@KafkaListener
@KafkaListener(id = "foo", topics = "${my.topic}")
public void listen1(String foo) {
    // ...
}

// The pattern is a regular expression: the root, a literal dot, then any suffix.
@KafkaListener(id = "bar", topicPattern = "${topic.root}\\..*")
public void listen2(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // ...
}
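`topicPattern` is treated as a regular expression once property placeholders have been resolved. The JDK-only sketch below shows which topic names such a pattern would select, assuming the intent is "every topic under a root prefix" and that `topic.root` resolves to the hypothetical value `orders`. The topic names and the class `TopicPatternSketch` are made up for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class TopicPatternSketch {

    // Returns the topics whose full name matches the regular expression.
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "orders.created", "orders.cancelled", "orders", "payments.created");
        // "orders\\..*" means: the root, a literal dot, then any suffix.
        System.out.println(matching("orders\\..*", topics));
        // prints [orders.created, orders.cancelled]
    }
}
```

Note that the bare root topic `orders` is not selected, because the pattern requires the dot separator.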
@KafkaListener - Explicit Partition Assignment
@KafkaListener(topicPartitions = @TopicPartition(topic = "${kafka.topic}",
        partitionOffsets = {
            @PartitionOffset(partition = "0", initialOffset = "0"),
            @PartitionOffset(partition = "1", initialOffset = "0"),
        }))
public void listen(String foo) {
    // ...
}
@KafkaListener - Container Factory
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}
Listener Containers
@Bean
public KafkaMessageListenerContainer<String, String> container(
        ConsumerFactory<String, String> consumerFactory,
        ConfigProperties config) {
    ContainerProperties containerProperties = new ContainerProperties(config.getTopic());
    containerProperties.setMessageListener(listener());
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}
MessageListener
public class Listener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        // ...
    }

}
AcknowledgingMessageListener
public static class AckListener implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record,
            Acknowledgment acknowledgment) {
        // ...
        acknowledgment.acknowledge();
    }

}
Listener Adapters
• Add behavior to listeners
• Retry adapter
• Filtering adapter
• Also configurable when using @KafkaListener
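The adapter idea is a decorator: each adapter wraps a listener and adds one behavior before delegating. The JDK-only sketch below illustrates that shape with a filtering wrapper and a retrying wrapper. The `Listener` interface and the `filtering` and `retrying` helpers are hypothetical names invented for this example; they are not the spring-kafka adapter classes, whose actual configuration options are not shown on the slide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class ListenerAdapterSketch {

    // Hypothetical single-method listener; NOT the spring-kafka MessageListener.
    interface Listener {
        void onMessage(String record);
    }

    // Filtering adapter: skips records for which the predicate returns true.
    static Listener filtering(Listener delegate, Predicate<String> discard) {
        return record -> {
            if (!discard.test(record)) {
                delegate.onMessage(record);
            }
        };
    }

    // Retry adapter: invokes the delegate up to maxAttempts times and
    // rethrows the last failure if every attempt fails.
    static Listener retrying(Listener delegate, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        return record -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    delegate.onMessage(record);
                    return;
                } catch (RuntimeException e) {
                    last = e;
                }
            }
            throw last;
        };
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        int[] calls = {0};
        // Fails on the first invocation only, to exercise the retry path.
        Listener flaky = record -> {
            if (++calls[0] == 1) {
                throw new IllegalStateException("transient failure");
            }
            received.add(record);
        };
        Listener listener = filtering(retrying(flaky, 3), r -> r.startsWith("ignore"));
        listener.onMessage("ignore-me"); // filtered out, delegate never called
        listener.onMessage("hello");     // fails once, succeeds on the retry
        System.out.println(received + " after " + calls[0] + " calls");
        // prints [hello] after 2 calls
    }
}
```

Wrapping order matters in this sketch: filtering outside retrying means discarded records never reach the retry logic.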
Spring for Apache Kafka Test Support
• Embedded Kafka Server - JUnit @Rule
• Creates topics with configurable partitions
• Utility methods to wait for partition assignment
• Hamcrest Matchers
• AssertJ Conditions
Some Caveats
• Polling, Pause, Resume, auto commit problems
• Let the container manage the commits (or manual)
Spring for Apache Kafka
DEMO
Learn More. Stay Connected.
Contribute!!
https://github.com/spring-projects/spring-kafka/CONTRIBUTING.adoc
@springcentral
spring.io/blog
@pivotal
pivotal.io/blog
@pivotalcf
http://engineering.pivotal.io