paris kafka meetup - how to develop with kafka

20
Paris Apache Kafka Meetup Florian HUSSONNOIS Zenika @fhussonnois

Upload: florian-hussonnois

Post on 11-Apr-2017

190 views

Category:

Software


0 download

TRANSCRIPT

Page 1: Paris Kafka Meetup - How to develop with Kafka

Paris Apache Kafka Meetup

Florian HUSSONNOISZenika

@fhussonnois

Page 2: Paris Kafka Meetup - How to develop with Kafka

Async, Sync, Batch, Partitioner et Retries

Page 3: Paris Kafka Meetup - How to develop with Kafka

Properties config = new Properties();

config.put("bootstrap.servers", "localhost:9092");

KafkaProducer<String, String> producer = new KafkaProducer(config);

ProducerRecord record = new ProducerRecord("my_topic", "my_key", "my_value");

producer.send(record);

producer.close();

Page 4: Paris Kafka Meetup - How to develop with Kafka

L’appel à la méthode send()est asynchrone et retourne immédiatement

Le message est ajouté à un buffer avant d’être envoyé

//...

config.put("batch.size", 16384);

config.put("linger.ms", 1);

//... Latence entre chaque transmission de messages

Taille maximum d’un batch

Page 5: Paris Kafka Meetup - How to develop with Kafka

List<ProducerRecord> batchRecords = new ArrayList<>();

//...

for(ProducerRecord record : batchRecords)

producer.send(record);

producer.flush();

producer.close(); Force l’envoi des messages et bloque jusqu’à leurcomplétion

Page 6: Paris Kafka Meetup - How to develop with Kafka

Future<RecordMetadata> future = producer.send(record);

RecordMetadata metadata = future.get(); // BLOCK

LOG.info("message sent to topic {}, partition {}, offset {}",

metadata.topic(),

metadata.partition(),

metadata.offset());

Page 7: Paris Kafka Meetup - How to develop with Kafka

ProducerRecord record = new ProducerRecord("my_topic", "my_key", "my_value");

Future<RecordMetadata> future = producer.send(record, (metadata, e) -> {

if(e != null)

LOG.info("Message sent to topic {}, partition {}, offset {}",

metadata.topic(),

metadata.partition(),

metadata.offset());

else

LOG.error("Damn it!", e);

});

Page 8: Paris Kafka Meetup - How to develop with Kafka

Configurationconfig.put("partitioner.class", DefaultPartitioner.class.getName()

Implémenter un Partitionerpublic interface Partitioner {

int partition(String topic,

Object key, byte[] keyBytes,

Object value, byte[] valueBytes, Cluster cluster);

}

Spécifier directement la partition ciblenew ProducerRecord("my_topic", 0, "my_key", "my_value");

Page 9: Paris Kafka Meetup - How to develop with Kafka

Acknowledgmentsconfig.put("ack", "all "); // plus lent, messages répliqués par tous les ISR

Le Producer peut rejouer automatiquement les messages en erreursconfig.put("retries", "0 "); // désactivé

/!\ Peut provoquer des doublons (At-Least Once)

/!\ Peut changer l’ordre de publication des messages

Page 10: Paris Kafka Meetup - How to develop with Kafka

Event Loop, Polling Model, Offset et Group Management

Page 11: Paris Kafka Meetup - How to develop with Kafka

Properties config = new Properties();

config.put("bootstrap.servers", "localhost:9092");

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(config);

consumer.subscribe(Arrays.asList("topic1, topic2"));

while(true) {

ConsumerRecords<Object, Object> records = consumer.poll(1000);

records.forEach(record ->

LOG.info("key={}, value={}", record.key(), record.value()));

}

Event Loop, Polling Model

Page 12: Paris Kafka Meetup - How to develop with Kafka

Properties config = new Properties();

config.put("bootstrap.servers", "localhost:9092");

config.put("enable.auto.commit", false); // désactive auto-commit

config.put("auto.commit.interval.ms", 100);

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(config);

consumer.subscribe(Arrays.asList("topic1, topic2"));

while(true) {

ConsumerRecords<Object, Object> records = consumer.poll(1000);

records.forEach(record ->

LOG.info("key={}, value={}", record.key(), record.value()));

consumer.commitAsync();

}

}

Page 13: Paris Kafka Meetup - How to develop with Kafka

while(true) {

ConsumerRecords<Object, Object> records = consumer.poll(1000);

consumer.commitSync(); // Commit offsets before processing messages.

records.forEach(record ->

LOG.info("key={}, value={}", record.key(), record.value()));

}

Page 14: Paris Kafka Meetup - How to develop with Kafka
Page 15: Paris Kafka Meetup - How to develop with Kafka

Properties config = new Properties();

config.put("bootstrap.servers", "localhost:9092");

config.put("group.id", "my_group");

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(config);

consumer.subscribe(Arrays.asList("topic1, topic2"));

Page 16: Paris Kafka Meetup - How to develop with Kafka

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(config);

consumer.subscribe(Arrays.asList("topic1, topic2"), new

ConsumerRebalanceListener() {

@Override

public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

//do some stuff

}

@Override

public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

//do some stuff

}

});

Page 17: Paris Kafka Meetup - How to develop with Kafka

Chaque consumer d’un groupe doit notifier le coordinateur

Uniquement possible sur un appel aux méthodes poll, commit, etc.

Déclenché si un consumer rejoint ou quitte un groupe

L’opération de « rebalance » est impactée par les paramètres : • session.timeout.ms (30 secondes)• heartbeat.interval.ms

Rebalance intempestif en cas de traitement d’un message trop long

Page 18: Paris Kafka Meetup - How to develop with Kafka

ConsumerRecords<Object, Object> records = consumer.poll(1000);

if( ! records.isEmpty() ) {

consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));Future<Boolean> future = executorService.submit(() -> {

records.forEach(record -> LOG.info("key={}, value={}", record.key(), record.value()));

return true;

});

Boolean isCompleted = false;

while(!isCompleted) {

try {

isCompleted = future.get(5, TimeUnit.SECONDS); // Wait before polling

} catch (TimeoutException e) {

consumer.poll(0); // heart-beat

} catch (CancellationException |ExecutionException | InterruptedException e) {

break;

}

}

consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));consumer.commitSync();

}

ExecutorService

Page 19: Paris Kafka Meetup - How to develop with Kafka

Se positionner à un offset spécifiqueconsumer.seek(new TopicPartition("my_topic", 0), 42);

consumer.seekToEnd(new TopicPartition("my_topic", 0));

consumer.seekToBeginning(new TopicPartition("my_topic", 0));

Assignements manuelconsumer.assign(Arrays.asList(new TopicPartition("my_topic", 0)));

Obtenir les métriquesconsumer.metrics();

Page 20: Paris Kafka Meetup - How to develop with Kafka

Nous recrutons ! [email protected]@ZenikaIT

Prochain Meetup le