Big Data Lambda Architecture - Streaming Layer Hands-On
Big Data Pipeline: Lambda Architecture - Streaming (Real-Time) Layer with Apache Kafka, Apache Hadoop, Apache Spark, and Apache Cassandra on the Amazon Web Services Cloud Platform
[Architecture diagram: Big Data pipeline - Ingest (clickstream data, Apache web logs, log/data files, TCP sockets via Apache Kafka), Store (S3, HDFS, Apache Cassandra), Process (Spark Streaming, Spark SQL, interactive queries on a Spark cluster), Visualize (AngularJS web app)]
Big Data Streaming (Real-Time) Layer Pipeline
Install Kafka - 3 Node Cluster on AWS
3 EC2 instances for the Kafka cluster
Repeat the following commands on all 3 EC2 instances of the Kafka cluster:
cat /etc/*-release
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer
java -version
mkdir kafka
cd kafka
wget http://download.nextag.com/apache/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
tar -zxvf kafka_2.11-0.10.0.0.tgz
cd kafka_2.11-0.10.0.0
ZooKeeper ==> 172.31.48.208 (private IP) / 52.91.1.93 (public IP)
Kafka-datanode1 ==> 172.31.63.203 (private IP) / 54.173.215.211 (public IP)
Kafka-datanode2 ==> 172.31.9.25 (private IP) / 54.226.29.194 (public IP)
Modify config/server.properties for kafka-datanode1 & kafka-datanode2: listeners binds to each broker's private IP, advertised.listeners is the public IP that external clients connect to, and zookeeper.connect points at the ZooKeeper node.
Kafka-datanode1 (set the following properties in config/server.properties)
ubuntu@ip-172-31-63-203:~/kafka/kafka_2.11-0.10.0.0$ vi config/server.properties
broker.id=1
listeners=PLAINTEXT://172.31.63.203:9092
advertised.listeners=PLAINTEXT://54.173.215.211:9092
zookeeper.connect=52.91.1.93:2181
Kafka-datanode2 (set the following properties in config/server.properties)
ubuntu@ip-172-31-9-25:~/kafka/kafka_2.11-0.10.0.0$ vi config/server.properties
broker.id=2
listeners=PLAINTEXT://172.31.9.25:9092
advertised.listeners=PLAINTEXT://54.226.29.194:9092
zookeeper.connect=52.91.1.93:2181
Launch ZooKeeper / Kafka-datanode1 / Kafka-datanode2
1) Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
2) Start the Kafka server on Kafka-datanode1
bin/kafka-server-start.sh config/server.properties
3) Start the Kafka server on Kafka-datanode2
bin/kafka-server-start.sh config/server.properties
4) Create the topic & start a console consumer
bin/kafka-topics.sh --zookeeper 52.91.1.93:2181 --create --topic data --partitions 1 --replication-factor 2
bin/kafka-console-consumer.sh --zookeeper 52.91.1.93:2181 --topic data --from-beginning
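Optionally, before wiring up the Java producer, the topic can be smoke-tested with the console producer bundled with Kafka, using the public broker addresses configured above; anything typed here should appear in the console consumer started in step 4:
bin/kafka-console-producer.sh --broker-list 54.173.215.211:9092,54.226.29.194:9092 --topic data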
Java - Kafka Producer Sample Application

package com.himanshu;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DataProducer {

    public static void main(String[] args) {
        // Topic the invoice records are published to
        String topicName = "data";

        // Producer configuration (public addresses of the two Kafka datanodes)
        Properties props = new Properties();
        props.put("bootstrap.servers", "54.173.215.211:9092,54.226.29.194:9092");
        // Wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // If a request fails, the producer can automatically retry
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Small delay so requests can be batched together
        props.put("linger.ms", 1);
        // Total memory available to the producer for buffering
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        // CSV file with the sample invoice data
        String csvFile = "/Users/himanshu/Documents/workspace/KafkaProducer/src/com/himanshu/invoice.txt";
        BufferedReader br = null;
        String lineInvoice;

        try {
            br = new BufferedReader(new FileReader(csvFile));
            // Send each CSV line as one message to the "data" topic
            while ((lineInvoice = br.readLine()) != null) {
                producer.send(new ProducerRecord<String, String>(topicName, lineInvoice));
                System.out.println("Message sent successfully....");
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            producer.close();
            if (br != null) {
                try { br.close(); } catch (IOException e) { e.printStackTrace(); }
            }
        }
    }
}
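The deck runs the producer from an Eclipse workspace; as a rough sketch (the output directory and source path below are illustrative assumptions), it can also be compiled and launched from the Kafka installation directory so the kafka-clients and logging jars shipped in libs/ are on the classpath:
cd ~/kafka/kafka_2.11-0.10.0.0
javac -cp "libs/kafka-clients-0.10.0.0.jar" -d /tmp/producer-classes /path/to/src/com/himanshu/DataProducer.java
java -cp "/tmp/producer-classes:libs/*" com.himanshu.DataProducer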
Sample data which we will be sending to the Kafka server from the Java Kafka producer (CSV file)
Messages received on Kafka-datanode1
Real-Time Streaming with Kafka, Apache Spark, and Apache Cassandra
Launch the Kafka cluster (ZooKeeper / Kafka-datanode1 / Kafka-datanode2)
Execute the Python Spark / Kafka job
Sample data which we will be sending to the Kafka server from the Java Kafka producer (CSV file)
Python Spark Job Processing Data from AWS Kafka Cluster
Python Spark Job Processing Data from AWS Kafka Cluster & Processed Data Stored in AWS Cassandra Cluster
Apache Spark UI
Python Spark Streaming Application
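The deck shows the Python Spark Streaming application only as screenshots. The sketch below is a minimal reconstruction of such a job, assuming Spark 1.6/2.0 with the spark-streaming-kafka-0-8 package and the DataStax Cassandra Python driver; the keyspace "lambda", table "invoice", the Cassandra node address, and the CSV column layout are illustrative assumptions, not taken from the slides.

# Minimal sketch of a Python Spark Streaming job for this pipeline (assumptions noted above)
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from cassandra.cluster import Cluster   # DataStax Python driver


def save_partition(records):
    # Open one Cassandra session per partition; keyspace/table names are hypothetical
    session = Cluster(["<cassandra-node-private-ip>"]).connect("lambda")
    for rec in records:
        session.execute(
            "INSERT INTO invoice (invoice_id, amount) VALUES (%s, %s)",
            (rec[0], float(rec[1])))
    session.shutdown()


if __name__ == "__main__":
    conf = SparkConf().setAppName("KafkaSparkCassandraStreaming")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)   # 10-second micro-batches

    # Receiver-based stream: ZooKeeper quorum, consumer group, topic -> receiver threads
    kafka_stream = KafkaUtils.createStream(
        ssc, "52.91.1.93:2181", "spark-streaming-consumer", {"data": 1})

    # Each Kafka message value is one CSV invoice line sent by the Java producer
    lines = kafka_stream.map(lambda msg: msg[1])
    records = lines.map(lambda line: line.split(","))
    records.foreachRDD(lambda rdd: rdd.foreachPartition(save_partition))

    ssc.start()
    ssc.awaitTermination()

Such a job would typically be launched with something like spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 streaming_job.py, with the package coordinates adjusted to the Spark and Scala versions actually in use.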