Paris Kafka Meetup - How to develop with Kafka


TRANSCRIPT

  • Paris Apache Kafka Meetup

    Florian HUSSONNOIS, Zenika

    @fhussonnois

  • Async, Sync, Batch, Partitioner and Retries

  • Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("key.serializer", StringSerializer.class.getName());   // serializers are required
    config.put("value.serializer", StringSerializer.class.getName());

    KafkaProducer<String, String> producer = new KafkaProducer<>(config);
    ProducerRecord<String, String> record =
        new ProducerRecord<>("my_topic", "my_key", "my_value");

    producer.send(record);
    producer.close();

  • The call to the send() method is asynchronous and returns immediately.
    The message is added to a buffer before being sent.

    //...
    config.put("batch.size", 16384); // maximum size of a batch
    config.put("linger.ms", 1);      // latency between each message transmission
    //...

  • List<ProducerRecord<String, String>> batchRecords = new ArrayList<>();
    //...
    for (ProducerRecord<String, String> record : batchRecords) {
        producer.send(record);
    }
    producer.flush(); // Forces the messages to be sent and blocks until they have completed
    producer.close();

  • Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get(); // BLOCK

    LOG.info("message sent to topic {}, partition {}, offset {}",
        metadata.topic(),
        metadata.partition(),
        metadata.offset());

  • ProducerRecord<String, String> record =
        new ProducerRecord<>("my_topic", "my_key", "my_value");

    Future<RecordMetadata> future = producer.send(record, (metadata, e) -> {
        if (e == null)
            LOG.info("Message sent to topic {}, partition {}, offset {}",
                metadata.topic(),
                metadata.partition(),
                metadata.offset());
        else
            LOG.error("Damn it!", e);
    });

  • Configuration
    config.put("partitioner.class", DefaultPartitioner.class.getName());

    Implement a Partitioner (a sketch follows below)
    public interface Partitioner {
        int partition(String topic,
                      Object key, byte[] keyBytes,
                      Object value, byte[] valueBytes,
                      Cluster cluster);
    }

    Specify the target partition directly
    new ProducerRecord<>("my_topic", 0, "my_key", "my_value");
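    A custom Partitioner could look like the following sketch (not from the slides; the class name SimpleKeyPartitioner is illustrative, and the hashing mirrors what the default partitioner does):

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    public class SimpleKeyPartitioner implements Partitioner {

        @Override
        public void configure(Map<String, ?> configs) { /* no configuration needed */ }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            if (keyBytes == null) {
                return 0; // send all keyless records to partition 0
            }
            // hash the key bytes and map the result onto the available partitions
            return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
        }

        @Override
        public void close() { /* nothing to release */ }
    }

    It is then registered through the same configuration key:
    config.put("partitioner.class", SimpleKeyPartitioner.class.getName());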

  • Acknowledgments
    config.put("acks", "all"); // slower: messages are replicated by all in-sync replicas (ISR)

    The producer can automatically retry messages that failed (a combined configuration sketch follows below):
    config.put("retries", "0"); // disabled

    /!\ Retries may produce duplicates (at-least-once delivery)
    /!\ Retries may change the publication order of messages
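    Putting the reliability settings together, a producer configuration might look like this sketch (the values are illustrative, not recommendations from the slides; max.in.flight.requests.per.connection is the setting that keeps retries from reordering messages):

    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("key.serializer", StringSerializer.class.getName());
    config.put("value.serializer", StringSerializer.class.getName());

    config.put("acks", "all");  // wait for all in-sync replicas
    config.put("retries", 3);   // retry transient failures (may produce duplicates)
    config.put("max.in.flight.requests.per.connection", 1); // preserve ordering when retrying

    KafkaProducer<String, String> producer = new KafkaProducer<>(config);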

  • Event Loop, Polling Model, Offset and Group Management

  • Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
    consumer.subscribe(Arrays.asList("topic1", "topic2"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        records.forEach(record ->
            LOG.info("key={}, value={}", record.key(), record.value()));
    }

    Event Loop, Polling Model

  • Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("enable.auto.commit", false); // disable auto-commit
    config.put("auto.commit.interval.ms", 100);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
    consumer.subscribe(Arrays.asList("topic1", "topic2"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        records.forEach(record ->
            LOG.info("key={}, value={}", record.key(), record.value()));
        consumer.commitAsync();
    }

  • while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        consumer.commitSync(); // Commit offsets before processing messages.
        records.forEach(record ->
            LOG.info("key={}, value={}", record.key(), record.value()));
    }

  • Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("group.id", "my_group");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
    consumer.subscribe(Arrays.asList("topic1", "topic2"));

  • KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
    consumer.subscribe(Arrays.asList("topic1", "topic2"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // do some stuff
        }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // do some stuff
        }
    });
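    A common use of this listener, sketched here (not from the slides), is to commit processed offsets before partitions are taken away:

    consumer.subscribe(Arrays.asList("topic1", "topic2"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // commit what has been processed so far before losing ownership
            consumer.commitSync();
        }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // processing resumes from the last committed offsets
            LOG.info("Assigned partitions: {}", partitions);
        }
    });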

  • Each consumer in a group must notify the coordinator.
    This is only possible during a call to the poll, commit, etc. methods.

    A rebalance is triggered when a consumer joins or leaves a group.

    The rebalance operation is affected by the parameters:
    session.timeout.ms (30 seconds by default) and heartbeat.interval.ms (see the sketch below).

    An unwanted rebalance can be triggered when processing a message takes too long.
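    For example, a consumer that needs more time per batch could raise the session timeout; the values below are illustrative only:

    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("group.id", "my_group");

    // allow up to 60 s between polls before the coordinator evicts the consumer
    config.put("session.timeout.ms", 60000);
    // heartbeats must be sent well within the session timeout (typically ~1/3 of it)
    config.put("heartbeat.interval.ms", 20000);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);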

  • ConsumerRecords<String, String> records = consumer.poll(1000);
    if (!records.isEmpty()) {
        consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));

        Future<Boolean> future = executorService.submit(() -> {
            records.forEach(record ->
                LOG.info("key={}, value={}", record.key(), record.value()));
            return true;
        });

        Boolean isCompleted = false;
        while (!isCompleted) {
            try {
                isCompleted = future.get(5, TimeUnit.SECONDS); // Wait before polling
            } catch (TimeoutException e) {
                consumer.poll(0); // heart-beat
            } catch (CancellationException | ExecutionException | InterruptedException e) {
                break;
            }
        }

        consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));
        consumer.commitSync();
    }

    ExecutorService

  • Seek to a specific offset
    consumer.seek(new TopicPartition("my_topic", 0), 42);
    consumer.seekToEnd(new TopicPartition("my_topic", 0));
    consumer.seekToBeginning(new TopicPartition("my_topic", 0));

    Manual assignment
    consumer.assign(Arrays.asList(new TopicPartition("my_topic", 0)));

    Get the metrics
    consumer.metrics();
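    Combining manual assignment and seeking, a consumer that re-reads one partition from the start could be written as in this sketch (topic and partition are illustrative; the API signatures follow those shown on the slide):

    TopicPartition partition = new TopicPartition("my_topic", 0);

    consumer.assign(Arrays.asList(partition)); // no group management, no rebalance
    consumer.seekToBeginning(partition);       // restart from the first available offset

    ConsumerRecords<String, String> records = consumer.poll(1000);
    records.forEach(record ->
        LOG.info("offset={}, key={}, value={}",
            record.offset(), record.key(), record.value()));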

  • We are hiring! jobs@zenika.com  @ZenikaIT

    Next Meetup on

    mailto:jobs@zenika.com