kafka streams - power without weight (jeeconf)
TRANSCRIPT
KStreamBuilder builder =new KStreamBuilder();
KStreamBuilder builder =new KStreamBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
KStreamBuilder builder =new KStreamBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
KTable<String, Long> wordCounts = textLines.flatMapValues(value -> toWords(value)).groupBy((key, word) -> word).count("Counts");
KStreamBuilder builder =new KStreamBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
KTable<String, Long> wordCounts = textLines.flatMapValues(value -> toWords(value)).groupBy((key, word) -> word).count("Counts");
wordCounts.toStream().to(outputTopic);
KStreamBuilder builder = new KStreamBuilder();
KStreamBuilder builder = new KStreamBuilder();
StateStoreSupplier store = Stores.create(”Store").persistent().build();
KStreamBuilder builder = new KStreamBuilder();
builder.addSource("SOURCE", inputTopic)
KStreamBuilder builder = new KStreamBuilder();
builder.addSource("SOURCE", inputTopic)
.addProcessor("WORDS_COUNT",new ProcessorSupplier(store.name()),
@Override public void process(byte[] key, String value) {Optional<Long> count =
Optional.ofNullable(stateStore.get(value));Long incrementedCount = count.orElse(0L) + 1;stateStore.put(value, incrementedCount);processorContext.forward(value, incrementedCount);processorContext.commit();
}
KStreamBuilder builder = new KStreamBuilder();
builder.addSource("SOURCE", inputTopic)
.addProcessor("WORDS_COUNT",new ProcessorSupplier(store.name()),
.addStateStore(wordCountsStore, "WORDS_COUNT")
KStreamBuilder builder = new KStreamBuilder();
builder.addSource("SOURCE", inputTopic)
.addProcessor("WORDS_COUNT",new ProcessorSupplier(store.name()),
.addStateStore(wordCountsStore, "WORDS_COUNT")
.addSink("OUTPUT", outputTopic, "WORDS_COUNT");
•elasticity and scalability
•Local/Remote state in stream processing