
Page 1: Streaming using Kafka Flink & Elasticsearch

Keira Zhou

May 11, 2016

Page 2: Streaming using Kafka Flink & Elasticsearch

§ Install on your laptop

§ Kafka 0.9

§ Flink 1.0.2

§ Elasticsearch 2.3.2

Page 3: Streaming using Kafka Flink & Elasticsearch

§ Create a topic

bin/kafka-topics.sh \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic viper_test

Page 4: Streaming using Kafka Flink & Elasticsearch

§ Create an index

curl -XPUT 'http://localhost:9200/viper-test/' -d '{
    "settings" : {
        "index" : {
            "number_of_shards" : 1,
            "number_of_replicas" : 0
        }
    }
}'

§ Put the mapping of a doc type within the index ("IP" matches the field name the Flink job writes on Page 7)

curl -XPUT 'localhost:9200/viper-test/_mapping/viper-log' -d '{
    "properties": {
        "IP": { "type": "string", "index": "not_analyzed" },
        "info": { "type": "string" }
    }
}'

Page 5: Streaming using Kafka Flink & Elasticsearch

§ More info:
§ https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html

§ Maven dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>1.0.2</version>
</dependency>

§ Example Java code

public static DataStream<String> readFromKafka(StreamExecutionEnvironment env) {
    // set up the execution environment
    env.enableCheckpointing(5000);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");

    // consume the topic created on Page 3
    DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer09<>("viper_test", new SimpleStringSchema(), properties));
    return stream;
}
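§ As a quick sanity check, a minimal driver along these lines (a sketch, assuming the readFromKafka helper above; the job name is arbitrary) prints whatever arrives from Kafka before the Elasticsearch sink is added on the next pages:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> stream = readFromKafka(env);
    stream.print(); // write each consumed Kafka record to stdout
    env.execute("Kafka read check"); // assumed job name, arbitrary
}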

Page 6: Streaming using Kafka Flink & Elasticsearch

§ More info:
§ https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html

§ Maven dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>

§ Example Java code

§ Next page…

Page 7: Streaming using Kafka Flink & Elasticsearch

§ Example Java code

public static void writeElastic(DataStream<String> input) {
    Map<String, String> config = new HashMap<>();
    // This instructs the sink to emit after every element, otherwise they would be buffered
    config.put("bulk.flush.max.actions", "1");
    config.put("cluster.name", "es_keira");

    try {
        // Add elasticsearch hosts on startup
        List<InetSocketAddress> transports = new ArrayList<>();
        transports.add(new InetSocketAddress("127.0.0.1", 9300)); // port is 9300 not 9200 for ES TransportClient

        ElasticsearchSinkFunction<String> indexLog = new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                String[] logContent = element.trim().split("\t");
                Map<String, String> esJson = new HashMap<>();
                esJson.put("IP", logContent[0]);
                esJson.put("info", logContent[1]);
                return Requests
                        .indexRequest()
                        .index("viper-test")
                        .type("viper-log")
                        .source(esJson);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        };

        ElasticsearchSink esSink = new ElasticsearchSink(config, transports, indexLog);
        input.addSink(esSink);
    } catch (Exception e) {
        System.out.println(e);
    }
}

Page 8: Streaming using Kafka Flink & Elasticsearch

§ https://github.com/keiraqz/KafkaFlinkElastic/blob/master/src/main/java/viper/KafkaFlinkElastic.java
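§ The linked file ties the two helpers together; a minimal driver in that spirit (a sketch assuming the readFromKafka and writeElastic methods from Pages 5 and 7, not a copy of the repo code) looks like:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // read tab-separated log lines from the viper_test Kafka topic (Page 5) ...
    DataStream<String> logs = readFromKafka(env);

    // ... and index each line into the viper-test/viper-log Elasticsearch type (Page 7)
    writeElastic(logs);

    env.execute("Kafka -> Flink -> Elasticsearch"); // assumed job name, arbitrary
}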

Page 9: Streaming using Kafka Flink & Elasticsearch

§ Start your Flink program in your IDE

§ Start the Kafka console producer CLI

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic viper_test

§ In the producer terminal, type (it's tab-separated):

10.20.30.40	test

§ Afterwards, query Elasticsearch:

curl 'localhost:9200/viper-test/viper-log/_search?pretty'

§ You should see:

§ { "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "failed" : 0 }, "hits" : { "total" : 1, "max_score" : 1.0,

"hits" : [ { "_index" : "viper-test", "_type" : "viper-log", "_id" : ”SOMETHING", "_score" : 1.0, "_source" : { "IP" : "10.20.30.40", "info" : "test" } } ]

}}

Page 10: Streaming using Kafka Flink & Elasticsearch

§ https://github.com/keiraqz/KafkaFlinkElastic