paris kafka meetup - how to develop with kafka

Posted on 11-Apr-2017


Paris Apache Kafka Meetup

Florian HUSSONNOIS, Zenika

@fhussonnois

Async, Sync, Batch, Partitioner and Retries

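Properties config = new Properties();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", StringSerializer.class.getName());   // serializers are mandatory
config.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(config);
ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "my_key", "my_value");
producer.send(record);
producer.close();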

The call to send() is asynchronous and returns immediately.

The message is added to a buffer before being sent.

//...
config.put("batch.size", 16384); // maximum size of a batch, in bytes
config.put("linger.ms", 1);      // how long to wait for more records before sending a batch
//...
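To make the interplay of batch.size and linger.ms concrete, here is a toy model in plain Java, not the client's actual accumulator: a batch is handed off when the next record would push it past batch.size bytes or when it is full, and otherwise once linger.ms has elapsed since its first record. The class name ToyBatchAccumulator and the explicit millisecond timestamps passed to append() and tick() are illustrative assumptions chosen to keep the example deterministic.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Kafka's RecordAccumulator) of when a batch is sent.
class ToyBatchAccumulator {
    private final int batchSizeBytes; // plays the role of batch.size
    private final long lingerMs;      // plays the role of linger.ms
    private final List<String> current = new ArrayList<>();
    private int currentBytes = 0;
    private long firstAppendMs = -1;
    final List<List<String>> sentBatches = new ArrayList<>();

    ToyBatchAccumulator(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // Models producer.send(): buffer the record and return immediately.
    void append(String value, long nowMs) {
        int size = value.getBytes(StandardCharsets.UTF_8).length;
        if (!current.isEmpty() && currentBytes + size > batchSizeBytes) {
            sendCurrentBatch(); // record does not fit: ship the current batch first
        }
        if (current.isEmpty()) {
            firstAppendMs = nowMs; // linger time is measured from the first record
        }
        current.add(value);
        currentBytes += size;
        if (currentBytes >= batchSizeBytes) {
            sendCurrentBatch(); // batch is full: no need to wait for linger.ms
        }
    }

    // Called periodically, as a background sender thread would.
    void tick(long nowMs) {
        if (!current.isEmpty() && nowMs - firstAppendMs >= lingerMs) {
            sendCurrentBatch(); // linger.ms elapsed: send even a partial batch
        }
    }

    private void sendCurrentBatch() {
        sentBatches.add(new ArrayList<>(current));
        current.clear();
        currentBytes = 0;
        firstAppendMs = -1;
    }
}
```

With batch.size=10 and linger.ms=5, appending three 4-byte records at t=0, 1 and 2 ships the first two together when the third does not fit; the third leaves alone at t=7, once it has waited 5 ms. A larger linger.ms trades latency for fuller batches.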

List<ProducerRecord<String, String>> batchRecords = new ArrayList<>();
//...
for (ProducerRecord<String, String> record : batchRecords) {
    producer.send(record);
}
producer.flush(); // forces buffered records to be sent and blocks until they have completed
producer.close();

Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(); // blocks until the send completes (synchronous send)
LOG.info("message sent to topic {}, partition {}, offset {}",
        metadata.topic(),
        metadata.partition(),
        metadata.offset());

ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "my_key", "my_value");
Future<RecordMetadata> future = producer.send(record, (metadata, e) -> {
    if (e == null) // no exception: the record was acknowledged
        LOG.info("Message sent to topic {}, partition {}, offset {}",
                metadata.topic(),
                metadata.partition(),
                metadata.offset());
    else
        LOG.error("Damn it!", e);
});
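The difference between blocking on the returned Future and registering a callback can be tried without a broker. The sketch below is a stand-in, not the Kafka client: ToyProducer, its offset counter and its rejection of null values are hypothetical, and a single daemon thread plays the producer's I/O thread. send() returns a CompletableFuture at once; calling join() on it corresponds to the blocking future.get() style, while the callback receives a null error on success, which is the condition a Kafka Callback should test before reading the metadata.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a producer: send() never blocks the caller.
class ToyProducer implements AutoCloseable {
    // Single daemon thread playing the role of the producer's I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-producer-io");
        t.setDaemon(true);
        return t;
    });
    private final AtomicLong nextOffset = new AtomicLong(0);

    // Asynchronous send: the future completes with the offset "assigned" to the record.
    CompletableFuture<Long> send(String value) {
        return CompletableFuture.supplyAsync(() -> {
            if (value == null) {
                throw new IllegalArgumentException("null value rejected"); // simulated failure
            }
            return nextOffset.getAndIncrement();
        }, ioThread);
    }

    // Callback variant: the error argument is null on success. The returned
    // future completes only after the callback has run.
    CompletableFuture<Long> send(String value, BiConsumer<Long, Throwable> callback) {
        return send(value).whenComplete(callback);
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }
}
```

As with future.get() on a real producer, join() here ties the caller to the round trip; the callback style keeps the sending thread free.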

Configuration:
config.put("partitioner.class", DefaultPartitioner.class.getName());

Implementing a Partitioner:
public interface Partitioner { // simplified: the actual interface also extends Configurable and Closeable
    int partition(String topic,
                  Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, Cluster cluster);
}
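For a keyed record, the DefaultPartitioner of 0.10-era clients hashes the serialized key with murmur2, clears the sign bit and takes the result modulo the number of partitions, which is why records with the same key land in the same partition. A custom partition() implementation often starts from the same idea. The stdlib-only sketch below reproduces that computation; the murmur2 routine is written to follow Kafka's Utils.murmur2, but byte-for-byte compatibility is an assumption to check against the client before relying on it, and the class name KeyHashPartitioning is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Sketch of key-hash partitioning in the style of Kafka's DefaultPartitioner.
final class KeyHashPartitioning {
    private KeyHashPartitioning() {}

    // 32-bit murmur2, intended to mirror Kafka's Utils.murmur2.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1 to 3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Same key and same partition count always give the same partition.
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // StringSerializer's default encoding
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions; // clear the sign bit, then modulo
    }
}
```

Because the result depends on numPartitions, adding partitions to a topic changes where existing keys are routed.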

Specifying the target partition directly:
new ProducerRecord<>("my_topic", 0, "my_key", "my_value");

Acknowledgments:
config.put("acks", "all"); // slowest setting: the leader waits until all in-sync replicas (ISR) have the record

The producer can automatically retry messages that failed to send:
config.put("retries", "0"); // disabled

/!\ May produce duplicates (at-least-once delivery)

/!\ May change the order in which messages are published (when max.in.flight.requests.per.connection is greater than 1)
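The two warnings above can be reproduced with a toy simulation in plain Java, with no client involved. Each round sends all pending messages as pipelined requests, which mimics max.in.flight.requests.per.connection greater than 1; a message whose acknowledgment is lost is written twice, and a message that fails before being written is retried behind newer ones. RetrySimulation and its Outcome values are hypothetical names, and only one message misbehaves, on its first attempt only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy simulation (no Kafka) of producer retries against an append-only broker log.
final class RetrySimulation {
    enum Outcome { OK, FAIL_BEFORE_WRITE, ACK_LOST }

    private RetrySimulation() {}

    // Sends `messages` as pipelined requests. Only `flakyMessage` misbehaves,
    // and only on its first attempt; failed messages are retried in the next
    // round, up to `retries` times. Returns the broker log in write order.
    static List<String> run(String flakyMessage, Outcome firstAttempt, int retries, String... messages) {
        List<String> brokerLog = new ArrayList<>();
        List<String> pending = new ArrayList<>(Arrays.asList(messages));
        for (int attempt = 0; attempt <= retries && !pending.isEmpty(); attempt++) {
            List<String> failed = new ArrayList<>();
            for (String msg : pending) {
                Outcome outcome = (attempt == 0 && msg.equals(flakyMessage)) ? firstAttempt : Outcome.OK;
                switch (outcome) {
                    case OK:
                        brokerLog.add(msg);
                        break;
                    case ACK_LOST:
                        brokerLog.add(msg); // written by the broker...
                        failed.add(msg);    // ...but the producer never sees the ack and retries
                        break;
                    case FAIL_BEFORE_WRITE:
                        failed.add(msg);    // not written; retried after newer messages
                        break;
                }
            }
            pending = failed;
        }
        return brokerLog;
    }
}
```

With one retry, ACK_LOST on m1 yields the log [m1, m2, m1] and FAIL_BEFORE_WRITE yields [m2, m1]; with retries at 0, the second case simply loses m1.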

Event Loop, Polling Model, Offset and Group Management

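Properties config = new Properties();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "my_group");
config.put("key.deserializer", StringDeserializer.class.getName());
config.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(config);
consumer.subscribe(Arrays.asList("topic1", "topic2"));
while (true) {
    ConsumerRecords<Object, Object> records = consumer.poll(1000); // waits up to 1000 ms for records
    records.forEach(record ->
        LOG.info("key={}, value={}", record.key(), record.value()));
}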

Event Loop, Polling Model

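Properties config = new Properties();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "my_group");
config.put("key.deserializer", StringDeserializer.class.getName());
config.put("value.deserializer", StringDeserializer.class.getName());
config.put("enable.auto.commit", false);    // disables auto-commit
config.put("auto.commit.interval.ms", 100); // only used when enable.auto.commit=true
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(config);
consumer.subscribe(Arrays.asList("topic1", "topic2"));
while (true) {
    ConsumerRecords<Object, Object> records = consumer.poll(1000);
    records.forEach(record ->
        LOG.info("key={}, value={}", record.key(), record.value()));
    consumer.commitAsync(); // commits offsets after processing, without blocking
}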

while (true) {
    ConsumerRecords<Object, Object> records = consumer.poll(1000);
    consumer.commitSync(); // commit offsets before processing (at-most-once: records are skipped if processing then fails)
    records.forEach(record ->
        LOG.info("key={}, value={}", record.key(), record.value()));
}
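Where the commit happens relative to processing decides what a crash costs. In the toy simulation below, plain Java with no client, a consumer crashes once just before processing a given offset and restarts from its last committed offset: committing before processing skips records (at-most-once), committing after processing replays them (at-least-once). CommitOrderSimulation and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (no Kafka): a consumer polls fixed-size batches from an in-memory
// log, crashes once, and restarts from the last committed offset.
final class CommitOrderSimulation {
    private CommitOrderSimulation() {}

    // Returns every record processed, in order. The crash happens on the first
    // run, right before the record at offset crashAt would be processed.
    static List<String> run(int batchSize, int crashAt, boolean commitBeforeProcessing, String... log) {
        List<String> processed = new ArrayList<>();
        int committed = 0; // offset a restarted consumer resumes from
        int position = 0;  // next offset to poll
        boolean crashed = false;
        while (position < log.length) {
            int end = Math.min(position + batchSize, log.length); // poll() returns [position, end)
            if (commitBeforeProcessing) {
                committed = end; // like commitSync() before the forEach
            }
            boolean crashedNow = false;
            for (int offset = position; offset < end; offset++) {
                if (!crashed && offset == crashAt) {
                    crashed = true;
                    crashedNow = true;
                    break;
                }
                processed.add(log[offset]);
            }
            if (crashedNow) {
                position = committed; // restart: resume from the committed offset
                continue;
            }
            if (!commitBeforeProcessing) {
                committed = end; // like committing after processing
            }
            position = end;
        }
        return processed;
    }
}
```

With the log a, b, c, d, batches of 2 and a crash before offset 1, committing first processes [a, c, d] (b is lost), while committing afterwards processes [a, a, b, c, d] (a is replayed).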

Properties config = new Properties();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "my_group");
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(config);
consumer.subscribe(Arrays.asList("topic1", "topic2"));

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(config);
consumer.subscribe(Arrays.asList("topic1", "topic2"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called before a rebalance: e.g. commit offsets for the partitions being revoked
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called after a rebalance: e.g. seek to offsets stored outside Kafka
    }
});

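Each consumer in a group must send heartbeats to the group coordinator.

Heartbeats are only sent during calls to poll(), the commit methods, etc. (clients before 0.10.1; from 0.10.1 on, heartbeats are sent from a background thread and max.poll.interval.ms bounds the time between two calls to poll()).

A rebalance is triggered when a consumer joins or leaves the group.

The rebalance operation is affected by these parameters:
• session.timeout.ms (30 seconds)
• heartbeat.interval.ms

Processing a message for too long can trigger unwanted rebalances.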
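The rules above (heartbeats, session timeout, rebalance on join or departure) can be modeled in a few lines. The sketch is a toy coordinator, not Kafka's group protocol: ToyCoordinator, its explicit timestamps and its round-robin reassignment are illustrative assumptions. A member that stops heartbeating for longer than the session timeout, for instance because it is stuck processing a slow message, is evicted and its partitions move to the others.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy group coordinator (hypothetical, not Kafka's group protocol).
final class ToyCoordinator {
    private final long sessionTimeoutMs;                              // plays session.timeout.ms
    private final int numPartitions;
    private final Map<String, Long> lastHeartbeat = new TreeMap<>(); // member id -> last heartbeat time
    private Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    int rebalances = 0;

    ToyCoordinator(long sessionTimeoutMs, int numPartitions) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.numPartitions = numPartitions;
    }

    // A new member joining the group triggers a rebalance.
    void join(String member, long nowMs) {
        lastHeartbeat.put(member, nowMs);
        rebalance();
    }

    // Heartbeats from known members only refresh their session.
    void heartbeat(String member, long nowMs) {
        lastHeartbeat.computeIfPresent(member, (m, previous) -> nowMs);
    }

    // Members silent for longer than the session timeout are evicted,
    // which also triggers a rebalance.
    void checkSessions(long nowMs) {
        boolean evicted = lastHeartbeat.values().removeIf(last -> nowMs - last > sessionTimeoutMs);
        if (evicted) {
            rebalance();
        }
    }

    Map<String, List<Integer>> assignment() {
        return assignment;
    }

    // Round-robin reassignment of all partitions over the live members.
    private void rebalance() {
        rebalances++;
        List<String> members = new ArrayList<>(lastHeartbeat.keySet());
        Map<String, List<Integer>> next = new LinkedHashMap<>();
        for (String member : members) {
            next.put(member, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions && !members.isEmpty(); p++) {
            next.get(members.get(p % members.size())).add(p);
        }
        assignment = next;
    }
}
```

With a 30-second timeout and 4 partitions, c1 and c2 first share [0, 2] and [1, 3]; if only c1 keeps heartbeating, a check at t=35 s evicts c2 and c1 takes all four partitions.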

ConsumerRecords<Object, Object> records = consumer.poll(1000);
if (!records.isEmpty()) {
    // Stop fetching from the assigned partitions while the batch is processed
    consumer.pause(consumer.assignment());
    Future<Boolean> future = executorService.submit(() -> {
        records.forEach(record -> LOG.info("key={}, value={}", record.key(), record.value()));
        return true;
    });
    boolean isCompleted = false;
    while (!isCompleted) {
        try {
            isCompleted = future.get(5, TimeUnit.SECONDS); // wait before polling again
        } catch (TimeoutException e) {
            consumer.poll(0); // keeps the consumer alive in the group; paused partitions return no records
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        } catch (CancellationException | ExecutionException e) {
            break;
        }
    }
    consumer.resume(consumer.assignment());
    consumer.commitSync();
}

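The pause, submit and wait-with-timeout loop above is not Kafka-specific. The sketch below isolates it with only java.util.concurrent: the slow work runs on an executor, and each time the bounded wait times out a keep-alive action runs on the calling thread, which is where the slide calls consumer.poll(0) on paused partitions. KeepAliveWhileProcessing and the keepAlive hook are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class KeepAliveWhileProcessing {
    private KeepAliveWhileProcessing() {}

    // Runs task on executor and waits for it in slices of waitMs; after every
    // slice that ends without a result, keepAlive runs on the calling thread.
    static <T> T runWithKeepAlive(ExecutorService executor, Callable<T> task,
                                  long waitMs, Runnable keepAlive)
            throws InterruptedException, ExecutionException {
        Future<T> future = executor.submit(task);
        while (true) {
            try {
                return future.get(waitMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                keepAlive.run(); // task still running: signal liveness, then wait again
            }
        }
    }
}
```

Keeping the keep-alive on the calling thread matters for Kafka, since KafkaConsumer is not safe for multi-threaded access: only the processing moves to the executor.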

Seeking to a specific offset:
consumer.seek(new TopicPartition("my_topic", 0), 42);

consumer.seekToEnd(Collections.singletonList(new TopicPartition("my_topic", 0)));

consumer.seekToBeginning(Collections.singletonList(new TopicPartition("my_topic", 0)));
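What the three seek calls do to the consumer's position can be shown on an in-memory log. ToyPartitionPosition is a hypothetical model that ignores retention and out-of-range offsets; in the real client, seekToBeginning() and seekToEnd() are also evaluated lazily, on the next poll() or position() call. seekToEnd() places the position after the last record, so only records produced afterwards are read.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical) of one partition's log and a consumer's position in it.
final class ToyPartitionPosition {
    private final String[] log; // offsets 0..log.length-1; nothing removed by retention here
    private int position = 0;   // offset of the next record to be returned

    ToyPartitionPosition(String... log) {
        this.log = log;
    }

    void seek(int offset) {
        position = offset;
    }

    void seekToBeginning() {
        position = 0; // earliest offset still in the log
    }

    void seekToEnd() {
        position = log.length; // offset the next produced record will get
    }

    // Returns up to maxRecords records from the current position and advances it.
    List<String> poll(int maxRecords) {
        List<String> out = new ArrayList<>();
        while (out.size() < maxRecords && position < log.length) {
            out.add(log[position++]);
        }
        return out;
    }
}
```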

Manual assignment:
consumer.assign(Arrays.asList(new TopicPartition("my_topic", 0)));

Getting the metrics:
consumer.metrics();

We're hiring! jobs@zenika.com, @ZenikaIT

Next Meetup on
