Openbus Documentation, Release 1
Produban
February 17, 2014
An open source architecture able to process the massive amount of events that occur in a banking IT infrastructure.
CHAPTER 1
Introduction
The objective of Openbus is to define an architecture able to process the massive amount of events that occur in a banking IT infrastructure. Those events are of different types, come from a variety of sources and arrive in different formats.
Depending on the nature of the events, we process them in a batch-oriented or near-realtime fashion.
To achieve this flexibility and capacity, we have defined Openbus as a concrete implementation of the so-called Lambda Architecture for Big Data systems [Marz].
The Lambda Architecture defines three main layers for the processing of data streams: the Batch layer, the Speed layer and the Serving layer.
In our case, Openbus comprises a set of technologies that interact to implement these layers:
• Apache Kafka: this is our data stream. Different systems will be generating messages in Kafka topics.
• HDFS: this is where our “master dataset” is stored.
• MapReduce: this is how our batch layer recomputes batch views. MapReduce is also used for a batch ETL process that dumps data from Kafka to HDFS.
• Apache Storm: this is what we use as our speed layer. Events are consumed from Kafka and processed into “realtime views”.
• HBase: this is where we store the “realtime views” generated by Storm topologies.
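To make the relationship between the layers more concrete, here is a minimal sketch in plain Java of how a query could combine a batch view with a realtime view. It does not use Kafka, Hadoop, Storm or HBase. The class LambdaQuerySketch, its methods and the choice of summing per-key counts are assumptions made for illustration, not Openbus code.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: names and the "sum of counts" merge rule are assumptions.
class LambdaQuerySketch {

    // Batch view: counts recomputed periodically from the master dataset.
    private final Map<String, Long> batchView = new HashMap<>();
    // Realtime view: counts for events the batch layer has not absorbed yet.
    private final Map<String, Long> realtimeView = new HashMap<>();

    // Stores a count produced by a batch recomputation.
    void putBatchCount(String key, long count) {
        batchView.put(key, count);
    }

    // Called by the speed layer for every new event.
    void recordRealtimeEvent(String key) {
        realtimeView.merge(key, 1L, Long::sum);
    }

    // Serving-side query: adds the counts of both views for the key.
    long query(String key) {
        return batchView.getOrDefault(key, 0L) + realtimeView.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        LambdaQuerySketch views = new LambdaQuerySketch();
        views.putBatchCount("/index.html", 100L);
        views.recordRealtimeEvent("/index.html");
        views.recordRealtimeEvent("/index.html");
        System.out.println(views.query("/index.html")); // prints 102
    }
}
```

In a real deployment the realtime view would be trimmed once a batch run has covered the same events, so that they are not counted twice; the sketch leaves that step out.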
1.1 Use Cases
Some use cases where Openbus could be applied are:
• Web analytics
• Social Network Analysis
• Security Information and Event Management
CHAPTER 2
Architecture
2.1 High level architecture
2.2 Data stream: Apache Kafka
We use Apache Kafka as a central hub for collecting different types of events.
Multiple systems will be publishing events into Kafka topics.
At the moment we are using the Avro format for all the published events.
2.2.1 Introduction
Kafka is designed as a unified platform for handling all the real-time data feeds a large company might have.
Kafka runs as a cluster of broker servers. Messages are stored in categories called topics. Those messages are published by producers, then consumed and further processed by consumers.
Clients and servers communicate over a custom TCP protocol.
A Kafka topic is split into one or more partitions. Partitions are distributed over the servers in the Kafka cluster. Each partition is replicated across a configurable number of servers.
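As an illustration of how messages can be spread over partitions, the following sketch maps a message key to a partition index by hashing it. This is a simplified model and an assumption about the partitioning strategy, not Kafka's own code; the class PartitionSketch and the method partitionFor are hypothetical names.

```java
// Simplified illustration of key-based partition selection (hypothetical names).
class PartitionSketch {

    // Maps a message key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        // Clear the sign bit so that negative hash codes still give a valid index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 4;
        for (String key : new String[] {"192.168.2.10", "192.168.2.11", "192.168.2.10"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

With a scheme like this, all messages that share a key land in the same partition, which keeps them in order relative to each other.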
2.2.2 Producing
Kafka uses a custom TCP-based protocol to expose its API. Apart from a JVM client maintained in its own codebase, there are client libraries for the following languages:
• Python
• Go
• C
• C++
• Clojure
• Ruby
• NodeJS
• Storm
• Scala DSL
• JRuby
Becoming a publisher in Kafka is not very difficult. You will need a partial list of your Kafka brokers (it doesn’t have to be exhaustive, since the client uses those endpoints to query about the topic leaders) and a topic name.
This is an example of a very simple Kafka producer in Java:
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BasicProducer {

    Producer<String, String> producer;

    public BasicProducer(String brokerList, boolean requiredAcks) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("request.required.acks", requiredAcks ? "1" : "0");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    public void sendMessage(String topic, String key, String value) {
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, key, value);
        producer.send(data);
    }

    /**
     * Creates a simulated random log event and sends it to a kafka topic.
     *
     * @param topic topic where the message will be sent
     */
    public void sendRandomLogEvent(String topic) {
        // Build random IP message
        Random rnd = new Random();
        long runtime = new Date().getTime();
        String ip = "192.168.2." + rnd.nextInt(255);
        String msg = runtime + ", www.example.com, " + ip;

        // Send the message to the broker
        this.sendMessage(topic, ip, msg);
    }
}
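The simulated event above is a plain comma-separated string of the form "timestamp, host, ip". As a small sketch of the consuming side, the helper below splits such a string back into its three parts. LogEventParser is a hypothetical class written for this example and is not part of Openbus; the sample timestamp is made up.

```java
// Hypothetical consumer-side helper: splits the "timestamp, host, ip" string
// built by sendRandomLogEvent into its three parts.
class LogEventParser {

    // Returns {timestamp, host, ip}; rejects messages with another shape.
    static String[] parse(String message) {
        String[] parts = message.split(",\\s*");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected 3 fields, got " + parts.length);
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] event = parse("1392595200000, www.example.com, 192.168.2.17");
        long timestamp = Long.parseLong(event[0]);
        System.out.println(timestamp + " " + event[1] + " " + event[2]);
    }
}
```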
2.2.3 Avro
As previously said, we use the Avro data format to serialize all the data events we produce in our Kafka data stream.
This means that prior to sending a message into Kafka, we serialize it in Avro format, using a concrete Avro schema. This schema is embedded in the data we send to Kafka, so every future consumer of the message will be able to deserialize it.
In the Openbus code you can find the AvroSerializer and AvroDeserialzer Java classes, which can be of great help when producing or consuming Avro messages from Kafka.
This is the current Avro schema we are using for Log messages:
{
  "type": "record",
  "name": "ApacheLog",
  "namespace": "openbus.schema",
  "doc": "Apache Log Event",
  "fields": [
    {"name": "host", "type": "string"},
    {"name": "log", "type": "string"},
    {"name": "user", "type": "string"},
    {"name": "datetime", "type": "string"},
    {"name": "request", "type": "string"},
    {"name": "status", "type": "string"},
    {"name": "size", "type": "string"},
    {"name": "referer", "type": "string"},
    {"name": "userAgent", "type": "string"},
    {"name": "session", "type": "string"},
    {"name": "responseTime", "type": "string"}
  ]
}
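Since every field in this schema is a string, an event can be thought of as an ordered list of eleven string values. The sketch below pairs an array of values with the schema's field names by position and checks the array length. It uses neither the Avro library nor the Openbus AvroSerializer; ApacheLogRecordSketch and toRecord are hypothetical names, and positional pairing is an assumption about how an array-based API would line values up with fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: does not perform Avro serialization.
class ApacheLogRecordSketch {

    // Field names copied from the ApacheLog schema, in declaration order.
    static final String[] FIELDS = {
        "host", "log", "user", "datetime", "request", "status",
        "size", "referer", "userAgent", "session", "responseTime"
    };

    // Pairs each schema field with the value at the same position.
    static Map<String, String> toRecord(Object[] values) {
        if (values.length != FIELDS.length) {
            throw new IllegalArgumentException(
                "Expected " + FIELDS.length + " values, got " + values.length);
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            // All schema fields are strings, so each value is converted.
            record.put(FIELDS[i], String.valueOf(values[i]));
        }
        return record;
    }

    public static void main(String[] args) {
        Object[] values = {"10.0.0.1", "-", "alice", "05/Nov/2013:10:00:00",
            "/index.html", 200, 512, "-", "curl", "s1", 35};
        System.out.println(toRecord(values));
    }
}
```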
An example of producing Avro messages into Kafka is our AvroProducer class:
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// AvroSerializer is the Openbus helper class; import it from the Openbus code.
public class AvroProducer {

    private Producer<byte[], byte[]> producer;
    private AvroSerializer serializer;
    private String topic;

    public AvroProducer(String brokerList, String topic, String avroSchemaPath, String[] fields) {
        this.topic = topic;
        this.serializer = new AvroSerializer(ClassLoader.class.getResourceAsStream(avroSchemaPath), fields);

        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        this.producer = new Producer<>(new ProducerConfig(props));
    }

    /**
     * Send a message
     *
     * @param values Array of Avro field values to be sent to kafka
     */
    public void send(Object[] values) {
        // Serialize the values once and publish the resulting bytes
        producer.send(new KeyedMessage<byte[], byte[]>(topic, serializer.serialize(values)));
    }

    /**
     * closes producer
     */
    public void close() {
        producer.close();
    }
}
2.2.4 Consuming
2.3 Batch Layer: Hadoop
2.3.1 All Data: HDFS
2.3.2 Generating Batch views: Mapreduce
2.4 Speed Layer: Storm
2.4.1 Introduction
Storm is a distributed, reliable, fault-tolerant system for processing streams of data.
The work is delegated to different types of components that are each responsible for a simple specific processing task.
The input stream of a Storm cluster is handled by components called Spouts and Bolts.
In Storm, the execution units are Spouts and Bolts, which are deployed in topologies. Spouts are the data sources; Bolts are the processors.
The connections between Spouts and Bolts, and among Bolts, have settings that indicate how the stream is divided between the “threads” of execution. In Storm terminology, each “thread” is a separate parallel instance of a Bolt.
2.4.2 Storm Use Cases
Stream processing
Distributed RPC (Remote Procedure Call)
Continuous computation
2.4.3 Concepts
Tuple: An ordered list of elements
Stream: A sequence of tuples
Spout: A producer of Streams
Bolt: A processor of Streams that can create new Streams
Topology: A map of connected Spouts and Bolts
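The concepts above can be sketched in a few lines of plain Java. This toy model does not use the Storm API: the Spout and Bolt interfaces, the run method and the sample data are all hypothetical, and the "topology" is just one spout followed by a linear chain of bolts executed in a single thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of tuples, streams, spouts, bolts and a topology (not the Storm API).
class MiniTopologySketch {

    // Spout: produces a stream, that is, a list of tuples.
    interface Spout {
        List<List<Object>> emitAll();
    }

    // Bolt: consumes one tuple and emits zero or more new tuples.
    interface Bolt {
        List<List<Object>> process(List<Object> tuple);
    }

    // "Topology": feeds the spout's stream through each bolt in order.
    static List<List<Object>> run(Spout spout, List<Bolt> bolts) {
        List<List<Object>> stream = spout.emitAll();
        for (Bolt bolt : bolts) {
            List<List<Object>> next = new ArrayList<>();
            for (List<Object> tuple : stream) {
                next.addAll(bolt.process(tuple));
            }
            stream = next;
        }
        return stream;
    }

    // Example wiring: lines -> split into words -> upper case.
    static List<List<Object>> demo() {
        Spout lines = () -> Arrays.asList(
                Arrays.<Object>asList("get post"),
                Arrays.<Object>asList("put"));
        Bolt splitWords = tuple -> {
            List<List<Object>> out = new ArrayList<>();
            for (String word : ((String) tuple.get(0)).split(" ")) {
                out.add(Arrays.<Object>asList(word));
            }
            return out;
        };
        Bolt upperCase = tuple ->
                Arrays.asList(Arrays.<Object>asList(((String) tuple.get(0)).toUpperCase()));
        return run(lines, Arrays.asList(splitWords, upperCase));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[GET], [POST], [PUT]]
    }
}
```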
2.4.4 Physical diagram
2.4.5 Components of a Storm topology
Shuffle Grouping:
Tuples are distributed to the Bolt’s tasks randomly and uniformly. Valid for atomic operations.
Local or Shuffle Grouping:
If the destination Bolt has one or more tasks in the same worker process, tuples are preferably sent to those tasks.
Fields Grouping:
The stream is divided by the fields specified in the grouping. For example, if the tuples have a user field, all tuples with the same user always go to the same task.
All Grouping:
Tuples are sent to all tasks of the Bolt.
Direct Grouping:
The producer of a tuple decides which task of the consumer receives it.
Global Grouping:
All tuples are sent to a single destination task.
Custom Grouping:
Lets you implement a custom grouping.
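The groupings listed above differ only in how they choose the destination task or tasks for a tuple. The following plain-Java sketch shows one possible selection rule for four of them. It is not Storm's implementation; GroupingSketch and its methods are hypothetical, and hashing the field values is an assumption about how a fields grouping can keep equal values on the same task.

```java
import java.util.Arrays;
import java.util.Random;

// Illustrative task selection for several groupings (not Storm's code).
class GroupingSketch {

    // Shuffle grouping: a uniformly random task index.
    static int shuffle(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    // Fields grouping: equal field values always map to the same task index.
    static int fields(int numTasks, Object... fieldValues) {
        return Math.floorMod(Arrays.hashCode(fieldValues), numTasks);
    }

    // All grouping: every task receives a copy of the tuple.
    static int[] all(int numTasks) {
        int[] tasks = new int[numTasks];
        for (int i = 0; i < numTasks; i++) {
            tasks[i] = i;
        }
        return tasks;
    }

    // Global grouping: always the same single task (here, index 0).
    static int global() {
        return 0;
    }

    public static void main(String[] args) {
        int numTasks = 4;
        System.out.println("alice -> task " + fields(numTasks, "alice"));
        System.out.println("alice -> task " + fields(numTasks, "alice"));
        System.out.println("all   -> " + Arrays.toString(all(numTasks)));
    }
}
```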
2.4.6 Trident
Trident is a high-level abstraction for doing realtime computing on top of Storm. Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.
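One way to obtain exactly-once updates on top of a store is to keep the id of the last applied batch next to each value and to ignore a batch that has already been applied. The sketch below shows that idea with an in-memory map. It is not the Trident API: TransactionalCountSketch and its methods are hypothetical, and the sketch assumes that a replayed batch carries the same transaction id and the same contents as the original, and that batches are applied in order.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an idempotent, transaction-id based counter (hypothetical names).
class TransactionalCountSketch {

    private static final class Entry {
        long count;
        long lastTxId = -1; // no batch applied yet
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Adds the batch's partial count to the key, at most once per txId.
    void applyBatch(long txId, String key, long partialCount) {
        Entry entry = store.computeIfAbsent(key, k -> new Entry());
        if (entry.lastTxId == txId) {
            return; // replay of a batch that was already applied
        }
        entry.count += partialCount;
        entry.lastTxId = txId;
    }

    // Current count for the key, or 0 if the key was never updated.
    long get(String key) {
        Entry entry = store.get(key);
        return entry == null ? 0 : entry.count;
    }

    public static void main(String[] args) {
        TransactionalCountSketch state = new TransactionalCountSketch();
        state.applyBatch(1, "/index.html", 3);
        state.applyBatch(1, "/index.html", 3); // replayed batch, ignored
        state.applyBatch(2, "/index.html", 2);
        System.out.println(state.get("/index.html")); // prints 5
    }
}
```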
2.4.7 Example Topology
2.4.8 Generating Realtime views: Storm Topologies
Base topology: Kafka -> Storm -> HBase
stream = topology.newStream("spout", openbusBrokerSpout.getPartitionedTridentSpout());
stream = stream.each(new Fields("bytes"), new AvroLogDecoder(), new Fields(fieldsWebLog));
stream = stream.each(new Fields(fieldsWebLog), new WebServerLogFilter());

stream.each(new Fields("request", "datetime"), new DatePartition(), new Fields("cq", "cf"))
    .groupBy(new Fields("request", "cq", "cf"))
    .persistentAggregate(stateRequest, new Count(), new Fields("count"))
    .newValuesStream()
    .each(new Fields("request", "cq", "cf", "count"), new LogFilter());

stream.each(new Fields("user", "datetime"), new DatePartition(), new Fields("cq", "cf"))
    .groupBy(new Fields("user", "cq", "cf"))
    .persistentAggregate(stateUser, new Count(), new Fields("count"))
    .newValuesStream()
    .each(new Fields("user", "cq", "cf", "count"), new LogFilter());

stream.each(new Fields("session", "datetime"), new DatePartition(), new Fields("cq", "cf"))
    .groupBy(new Fields("session", "cq", "cf"))
    .persistentAggregate(stateSession, new Count(), new Fields("count"))
    .newValuesStream()
    .each(new Fields("session", "cq", "cf", "count"), new LogFilter());

return topology.build();
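Each of the three branches above groups the stream by three fields and counts the tuples in every group. As a rough in-memory equivalent of that groupBy plus Count step, the sketch below counts rows per (request, cq, cf) combination. It does not use Trident; GroupCountSketch is a hypothetical name, and the sample values for cq and cf are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory analogue of grouping by three fields and counting (not Trident).
class GroupCountSketch {

    // Each row is {request, cq, cf}; the result maps each group to its row count.
    static Map<List<String>, Long> countByGroup(List<String[]> rows) {
        Map<List<String>, Long> counts = new LinkedHashMap<>();
        for (String[] row : rows) {
            // Copy the row into a list so it can serve as a value-based map key.
            List<String> group = new ArrayList<>(Arrays.asList(row));
            counts.merge(group, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"/index.html", "20131105", "daily"},
                new String[] {"/index.html", "20131105", "daily"},
                new String[] {"/about", "20131105", "daily"});
        System.out.println(countByGroup(rows));
    }
}
```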
Optional HDFS and OpenTSDB
if (Constant.YES.equals(conf.get(Conf.PROP_OPENTSDB_USE))) {
    LOG.info("OpenTSDB: " + conf.get(Conf.PROP_OPENTSDB_USE));
    stream.groupBy(new Fields(fieldsWebLog))
        .aggregate(new Fields(fieldsWebLog), new WebServerLog2TSDB(), new Fields("count"));
}

if (Constant.YES.equals(conf.get(Conf.PROP_HDFS_USE))) {
    LOG.info("HDFS: " + conf.get(Conf.PROP_HDFS_USE));
    stream.each(new Fields(fieldsWebLog), new HDFSPersistence(), new Fields("result"));
}
2.5 Serving Layer
2.5.1 HBase
HBase is the NoSQL database commonly used with Hadoop. These are its main features:
• Distributed
• Column-oriented
• High availability
• High performance
• Data volumes on the order of terabytes and petabytes
• Horizontal scalability with the addition of nodes in a cluster
• Random read/write
Persistent states in HBase with Trident
@SuppressWarnings("rawtypes")
TridentConfig configRequest = new TridentConfig(
    (String) conf.get(Conf.PROP_HBASE_TABLE_REQUEST),
    (String) conf.get(Conf.PROP_HBASE_ROWID_REQUEST));
@SuppressWarnings("unchecked")
StateFactory stateRequest = HBaseAggregateState.transactional(configRequest);

@SuppressWarnings("rawtypes")
TridentConfig configUser = new TridentConfig(
    (String) conf.get(Conf.PROP_HBASE_TABLE_USER),
    (String) conf.get(Conf.PROP_HBASE_ROWID_REQUEST));
@SuppressWarnings("unchecked")
StateFactory stateUser = HBaseAggregateState.transactional(configUser);

@SuppressWarnings("rawtypes")
TridentConfig configSession = new TridentConfig(
    (String) conf.get(Conf.PROP_HBASE_TABLE_SESSION),
    (String) conf.get(Conf.PROP_HBASE_ROWID_SESSION));
@SuppressWarnings("unchecked")
StateFactory stateSession = HBaseAggregateState.transactional(configSession);
Querying HBase in Openbus
#!/bin/bash
# ./hbaseReqRows.sh wslog_request daily:20131105

TABLE=$1
DATE=$2

exec hbase shell << EOF
scan '${TABLE}', {COLUMNS => ['${DATE}']}
EOF
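The scan above restricts the output to a single column, written as family:qualifier (for example daily:20131105). A toy way to picture this is a table held as nested sorted maps, from row key to column to value. The sketch below is such a model in plain Java. It is not the HBase client API; ColumnScanSketch and its methods are hypothetical, and using the request as row key is an assumption made for the example.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of a table: row key -> "family:qualifier" -> value (not the HBase API).
class ColumnScanSketch {

    private final TreeMap<String, TreeMap<String, String>> rows = new TreeMap<>();

    // Stores one cell under the given row, column family and qualifier.
    void put(String rowKey, String family, String qualifier, String value) {
        rows.computeIfAbsent(rowKey, k -> new TreeMap<>())
            .put(family + ":" + qualifier, value);
    }

    // Returns, for every row that has the requested column, its value.
    Map<String, String> scanColumn(String column) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, TreeMap<String, String>> row : rows.entrySet()) {
            String value = row.getValue().get(column);
            if (value != null) {
                result.put(row.getKey(), value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ColumnScanSketch table = new ColumnScanSketch();
        table.put("/index.html", "daily", "20131105", "42");
        table.put("/index.html", "daily", "20131106", "7");
        table.put("/about", "daily", "20131106", "1");
        System.out.println(table.scanColumn("daily:20131105")); // prints {/index.html=42}
    }
}
```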
CHAPTER 3
Installation
Deploying the Openbus architecture in your environment involves the following steps:
• Install dependencies
• Build Openbus code
• Run examples
We have tested Openbus in a Red Hat Enterprise Linux 6.4 environment.
3.1 Installing dependencies
• Install Hadoop
• Install Kafka
• Install Storm
• Install Camus
3.2 Building Openbus
Clone the project from GitHub:
#> git clone https://github.com/Produban/openbus.git
Build the project using Maven:
#> cd openbus
#> mvn compile
CHAPTER 4
Running examples
4.1 Submitting events to the Kafka broker
Launch the javaAvroKafka sample:
#> cd $javaAvroKafkaHome
#> java -jar avroKafka-1.0-SNAPSHOT-shaded.jar wslog 50 2 3 3 -90
The arguments are, in order: Kafka topic, number of requests, number of users, number of user sessions, number of session requests, and date simulation offset (0 for today).
4.2 Running batch ETL processes from Kafka to Hadoop
Launch Camus ETL:
#> cd $camusHome
#> hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P <camus.properties>
where <camus.properties> is the path to the Camus configuration file, as described in the configuration section of https://github.com/Produban/camus.
4.3 Running real time analysis with Storm topologies
Launch Openbus Topology:
#> cd $openBusRealTimeHome
#> storm jar target/openbus-realtime-0.0.1-shaded.jar com.produban.openbus.processor.topology.OpenbusProcessorTopology openbus -zookepperHost vmlbcnimbusl01:2181 -topic wslog -staticHost vmlbcbrokerl01,vmlbcbrokerl02
The arguments are the topology name, the ZooKeeper host (-zookepperHost), the Kafka topic (-topic) and the Kafka broker list (-staticHost).
4.4 Visualizing data
View Hits per Day/Month/Week:
#> cd $openBusRealTimeHome/hbase/queryscripts
#> ./hitsPer<Period>.sh <date> [requestId]
where <Period> can be Day, Month or Week. The first argument is the date, in the format “yyyyMMdd” for a day of a year, “yyyyMM” for a month of a year and “yyyyWW” for a week of a year. The second argument is optional and filters by a specific request.
View Users per Day/Month/Week:
#> cd $openBusRealTimeHome/hbase/queryscripts
#> ./usersPer<Period>.sh <date> [userId]
where <Period> can be Day, Month or Week. The first argument is the date, in the format “yyyyMMdd” for a day of a year, “yyyyMM” for a month of a year and “yyyyWW” for a week of a year. The second argument is optional and filters by a specific user.
View Sessions per Day/Month/Week:
#> cd $openBusRealTimeHome/hbase/queryscripts
#> ./sessionsPer<Period>.sh <date> [sessionId]
where <Period> can be Day, Month or Week. The first argument is the date, in the format “yyyyMMdd” for a day of a year, “yyyyMM” for a month of a year and “yyyyWW” for a week of a year. The second argument is optional and filters by a specific session.
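The three date formats accepted by these scripts can be produced from a calendar date as in the following sketch. PeriodArgumentSketch is a hypothetical helper, not part of Openbus. The week format assumes ISO-8601 week numbering, which is an assumption: the scripts may number weeks differently.

```java
import java.time.LocalDate;
import java.time.temporal.IsoFields;
import java.util.Locale;

// Hypothetical helper that builds the <date> argument for each period.
class PeriodArgumentSketch {

    // "yyyyMMdd": a day of a year.
    static String day(LocalDate date) {
        return String.format(Locale.ROOT, "%04d%02d%02d",
                date.getYear(), date.getMonthValue(), date.getDayOfMonth());
    }

    // "yyyyMM": a month of a year.
    static String month(LocalDate date) {
        return String.format(Locale.ROOT, "%04d%02d",
                date.getYear(), date.getMonthValue());
    }

    // "yyyyWW": a week of a year, assuming ISO-8601 week numbering.
    static String week(LocalDate date) {
        return String.format(Locale.ROOT, "%04d%02d",
                date.get(IsoFields.WEEK_BASED_YEAR),
                date.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR));
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2013, 11, 5);
        System.out.println(day(date));   // prints 20131105
        System.out.println(month(date)); // prints 201311
        System.out.println(week(date));  // prints 201345
    }
}
```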
Bibliography
[Marz] Nathan Marz, James Warren. “Big Data. Principles and best practices of scalable realtime data systems”. Manning MEAP.