openbus documentation - read the docs · pdf fileopenbus documentation, release 1 an open...

23
Openbus Documentation Release 1 Produban February 17, 2014

Upload: dodang

Post on 07-Mar-2018

228 views

Category:

Documents


2 download

TRANSCRIPT

Page 1: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus DocumentationRelease 1

Produban

February 17, 2014

Page 2: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT
Page 3: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Contents

i

Page 4: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

ii

Page 5: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

An open source architecture able to process the massive amount of events that occur in a banking IT Infraestructure.

Contents:

Contents 1

Page 6: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

2 Contents

Page 7: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

CHAPTER 1

Introduction

The objective of Openbus is to define an architecture able to process the massive amount of events that occur in abanking IT Infraestructure. Those events are of different types, from a variety of sources and with different formats.

Depending on the nature of events, we will be processing them in a batch-oriented or near-realtime fashion.

To achieve this flexibility and big capability, we have defined Openbus as a concrete implementation of the so calledLambda Architecture for Big Data systems [Marz].

Lambda Architecture defines three main layers for the processing of data streams: Batch layer, Speed layer and Servinglayer.

In our case, Openbus is comprised of a set of technologies that interact between them to implement these layers:

• Apache Kafka: this is our data stream. Different systems will be generating messages in Kafka topics.

• HDFS: this is where our “master dataset” is stored.

• MapReduce: This is how our batch layer recomputes batch views. Mapreduce is also used for a batch ETLprocess that dumps data from Kafka to HDFS.

• Apache Storm: This is what we use as our speed layer. Events are consumed from Kafka and processed into“realtime views”

• HBase: This is where we store the “realtime views” generated by Storm topologies

1.1 Use Cases

Some use cases where openbus could be applied are:

• Web analytics

• Social Network Analysis

• Security Information and Event Management

3

Page 8: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

4 Chapter 1. Introduction

Page 9: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

CHAPTER 2

Architecture

2.1 High level architecture

2.2 Data stream: Apache Kafka

We use Apache kafka as a central hub for collecting different types of events.

Multiple systems will be publishing events into Kafka topics.

At the moment we are using Avro format for all the published events.

2.2.1 Introduction

Kafka is designed as a unified platform for handling all the real-time data feeds a large company might have.

Kafka is run as a cluster of broker servers. Messages are stored in categories called topics. Those messages arepublished by producers and consumed and further processed by consumers.

It uses a custom TCP Protocol for communication between clients and servers.

A Kafka topic is splitted into one or more partitions. Partitions are distributed over the servers in the Kafka cluster.Each partition is replicated across a configurable number of servers.

2.2.2 Producing

Kafka uses a custom TCP based protocol to expose its API. Apart from a JVM client maintained in its own codebase,there are client libraries for the following languages:

• Python

• Go

• C

• C++

• Clojure

• Ruby

• NodeJS

• Storm

5

Page 10: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

• Scala DSL

• JRuby

Becoming a publisher in Kafka is not very difficult. You will need a partial list of your Kafka brokers (it doesn’t haveto be exhaustive, since the client uses those endpoints to query about the topic leaders) and a topic name.

This is an example of a very simple Kafka producer with Java:

import java.util.Date;import java.util.Properties;import java.util.Random;

import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;

public class BasicProducer {

Producer<String, String> producer;

public BasicProducer(String brokerList, boolean requiredAcks) {Properties props = new Properties();props.put("metadata.broker.list", brokerList);props.put("request.required.acks", requiredAcks ? "1" : "0");props.put("serializer.class", "kafka.serializer.StringEncoder");

producer = new Producer<String, String>(new ProducerConfig(props));}

public void sendMessage(String topic, String key, String value){KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, key, value);producer.send(data);

}

/*** Creates a simulated random log event and sends it to a kafka topic.

** @param topic topic where the message will be sent

*/public void sendRandomLogEvent(String topic){

//Build random IP messageRandom rnd = new Random();long runtime = new Date().getTime();String ip = "192.168.2." + rnd.nextInt(255);String msg = runtime + ", www.example.com, "+ ip;

//Send the message to the brokerthis.sendMessage(topic, ip, msg);

}}

2.2.3 Avro

As previously said, we are using the Avro data format to serialize all the data events we produce in our Kafka datastream.

This means that prior to send a message into Kafka, we are serializing it in Avro format, using a concrete Avroschema. This schema is embedded in the data we send to Kafka, so every future consumer of the message will be able

6 Chapter 2. Architecture

Page 11: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

to deserialize it.

In the Openbus code you can find AvroSerializer and AvroDeserialzer Java lasses, that can be of great help when usingproducing or consuming Avro messages from Kafka.

This is the current Avro schema we are using for Log messages:

{"type": "record","name": "ApacheLog","namespace": "openbus.schema","doc": "Apache Log Event","fields": [{"name": "host", "type": "string"},{"name": "log", "type": "string"},{"name": "user", "type": "string"},{"name": "datetime", "type": "string"},{"name": "request", "type": "string"},{"name": "status", "type": "string"},{"name": "size", "type": "string"},{"name": "referer", "type": "string"},{"name": "userAgent", "type": "string"},{"name": "session", "type": "string"},{"name": "responseTime", "type": "string"}

]}

An example of producing Avro messages into Kafka is our AvroProducer class:

public class AvroProducer {

private Producer<byte[], byte[]> producer;private AvroSerializer serializer;private String topic;

public AvroProducer(String brokerList, String topic, String avroSchemaPath, String[] fields) {

this.topic=topic;this.serializer = new AvroSerializer(ClassLoader.class.getResourceAsStream(avroSchemaPath), fields );

Properties props = new Properties();props.put("metadata.broker.list", brokerList);this.producer = new kafka.javaapi.producer.Producer<>(new ProducerConfig(props));

}

/*** Send a message

* @param values Array of Avro field values to be sent to kafka

*/public void send(Object[] values) {

Message message = new Message(serializer.serialize(values));//producer.send(new KeyedMessage<byte[], byte[]>(topic, message.buffer().array()));

producer.send(new KeyedMessage<byte[], byte[]>(topic, serializer.serialize(values)));}

/*** closes producer

*/public void close() {

2.2. Data stream: Apache Kafka 7

Page 12: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

producer.close();}

}

2.2.4 Consuming

2.3 Batch Layer: Hadoop

2.3.1 All Data: HDFS

2.3.2 Generating Batch views: Mapreduce

2.4 Speed Layer: Storm

2.4.1 Introduction

Storm is a distributed, reliable, fault-tolerant system for processing streams of data.

The work is delegated to different types of components that are each responsible for a simple specific processing task.

The input stream of a Storm cluster is handled by a components called Spout and Bolts.

In Storm execution units are Spouts and Bolts that unfold in typologies. Spouts are the data sources, the Bolts are theprocessors.

Spouts and connections between and among Bolts Bolts have settings to indicate how the stream is divided betweenthe “threads” of execution. In terminology Storm each “thread” is a separate instance of a parallel Bolt.

2.4.2 Use Cases Storm

Processing of Strems

RPC (Remote Procedure Call) distributed

Continuous Computation

8 Chapter 2. Architecture

Page 13: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

2.4.3 Concepts

Tuples: An ordered list of elements

Stream: Stream of tuples

Spout: Producer Stream

Bolt: Processor and creator of new Streams

Topologies: Map Spouts and Bolts

2.4. Speed Layer: Storm 9

Page 14: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

2.4.4 Fisic diagram

2.4.5 Components of a Storm topology

Shuffle Grouping:

Sending the bolts tuples is random and uniform. Valid for atomic operations

Local o Shuffle:

If the destination Bolt has one or more tasks in the same work process tuples are preferably sent to these workers

Fiels Grouping:

The stream is divided by the specified fields in the cluster. An example would be if the tuples have the user field alltuples with the same usuaio always go to the same task

10 Chapter 2. Architecture

Page 15: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

All Grouping:

Tuples are sent to all tasks of bolts

Direct Grouping:

The producer decides which task tuple consumer receive this tuple

Global Grouping:

Send all tuples intancias to a single destination

Custom Grouping:

Lets implement a custom grouping

2.4.6 Trident

Trident is a high-level abstraction for doing realtime computing on top of Storm. Trident has joins, aggregations,grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processingon top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason aboutTrident topologies.

2.4.7 Example Topology

2.4.8 Generating Realtime views: Storm Topologies

Based topology: Kafka -> Storm -> HBase

2.4. Speed Layer: Storm 11

Page 16: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

stream = topology.newStream("spout", openbusBrokerSpout.getPartitionedTridentSpout());stream = stream.each(new Fields("bytes"), new AvroLogDecoder(), new Fields(fieldsWebLog));stream = stream.each(new Fields(fieldsWebLog), new WebServerLogFilter());

stream.each(new Fields("request", "datetime"), new DatePartition(), new Fields("cq", "cf")).groupBy(new Fields("request", "cq", "cf")).persistentAggregate(stateRequest, new Count(), new Fields("count")).newValuesStream().each(new Fields("request", "cq", "cf", "count"), new LogFilter());

stream.each(new Fields("user", "datetime"), new DatePartition(), new Fields("cq", "cf")).groupBy(new Fields("user", "cq", "cf")).persistentAggregate(stateUser, new Count(), new Fields("count")).newValuesStream().each(new Fields("user", "cq", "cf", "count"), new LogFilter());

stream.each(new Fields("session", "datetime"), new DatePartition(), new Fields("cq", "cf")).groupBy(new Fields("session", "cq", "cf")).persistentAggregate(stateSession, new Count(), new Fields("count")).newValuesStream().each(new Fields("session", "cq", "cf", "count"), new LogFilter());

return topology.build();

Optional HDFS and OpenTSDB

if (Constant.YES.equals(conf.get(Conf.PROP_OPENTSDB_USE))) {LOG.info("OpenTSDB: " + conf.get(Conf.PROP_OPENTSDB_USE));stream.groupBy(new Fields(fieldsWebLog)).aggregate(new Fields(fieldsWebLog), new WebServerLog2TSDB(), new Fields("count"));

}

if (Constant.YES.equals(conf.get(Conf.PROP_HDFS_USE))) {LOG.info("HDFS: " + conf.get(Conf.PROP_HDFS_USE));stream.each(new Fields(fieldsWebLog), new HDFSPersistence(), new Fields("result"));

}

2.5 Serving Layer

2.5.1 HBase

Hbase is the default NoSql database supplied with Hadoop. These are its main features

• Distributed

• Column-oriented

• High availability

• High performance

• Data volumen in order of TeraBytes and PetaBytes

• Horizontal scalability with the addition of nodes in a cluster

• Random read/write

Persistent states in HBase with Trident

12 Chapter 2. Architecture

Page 17: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

@SuppressWarnings("rawtypes")TridentConfig configRequest = new TridentConfig(

(String)conf.get(Conf.PROP_HBASE_TABLE_REQUEST),(String)conf.get(Conf.PROP_HBASE_ROWID_REQUEST));

@SuppressWarnings("unchecked")StateFactory stateRequest = HBaseAggregateState.transactional(configRequest);

@SuppressWarnings("rawtypes")TridentConfig configUser = new TridentConfig(

(String)conf.get(Conf.PROP_HBASE_TABLE_USER),(String)conf.get(Conf.PROP_HBASE_ROWID_REQUEST));

@SuppressWarnings("unchecked")StateFactory stateUser = HBaseAggregateState.transactional(configUser);

@SuppressWarnings("rawtypes")TridentConfig configSession = new TridentConfig(

(String)conf.get(Conf.PROP_HBASE_TABLE_SESSION),(String)conf.get(Conf.PROP_HBASE_ROWID_SESSION));

@SuppressWarnings("unchecked")StateFactory stateSession = HBaseAggregateState.transactional(configSession);

Queries: in HBase in Openbus

#!/bin/bash

# ./hbaseReqRows.sh wslog_request daily:20131105

TABLE=$1DATE=$2

exec hbase shell << EOFscan ’${TABLE}’ , {COLUMNS => [’${DATE}’]}EOF

2.5. Serving Layer 13

Page 18: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

14 Chapter 2. Architecture

Page 19: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

CHAPTER 3

Installation

Deploying the Openbus architecture in your environment involves the following steps:

• Install dependencies

• Build Openbus code

• Run examples

We have tested Openbus in a Red Hat Enterprise Linux 6.4 environment

3.1 Installing dependencies

• Install Hadoop

• Install Kafka

• Install Storm

• Install Camus

3.2 Building openbus

Clone the project from github:

#> git clone https://github.com/Produban/openbus.git

Build the project using maven:

#> cd openbus#> mvn compile

15

Page 20: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

16 Chapter 3. Installation

Page 21: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

CHAPTER 4

Running examples

4.1 Submitting events to the Kafka broker

Launch javaAvroKafka sample:

#> cd $javaAvroKafkaHome#> java -jar avroKafka-1.0-SNAPSHOT-shaded.jar wslog 50 2 3 3 -90

Arguments are kafka topic, number of requests, number of users, number of user sessions, number of session requests,date simulation offset (0 for today).

4.2 Running batch ETL processes from Kafka to Hadoop

Launch Camus ETL:

#> cd $camusHome#> hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P <camus.properties>

where <camus.properties> is a file path pointing to camus configuration as described inhttps://github.com/Produban/camus under configuration section.

4.3 Running real time analysis with Storm topologies

Launch Openbus Topology:

#> cd $openBusRealTimeHome#> storm jar target/openbus-realtime-0.0.1-shaded.jar com.produban.openbus.processor.topology.OpenbusProcessorTopology openbus -zookepperHost vmlbcnimbusl01:2181 -topic wslog -staticHost vmlbcbrokerl01,vmlbcbrokerl02

Arguments are topology, kafka topic, zookeeper host and kafka broker list.

4.4 Visualizing data

View Hits per Day/Month/Week:

#> cd $openBusRealTimeHome/hbase/queryscripts#> ./hitsPer<Period>.sh <date> [requestId]

17

Page 22: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Openbus Documentation, Release 1

where <Period> can be Day, Month or Week. First arguments is the date in format “yyyyMMdd” for a day of a year,“yyyyMM” for a month of a year and “yyyyWW” for a week of a year. Second argument is optional for filtering withan specific request.

View Users per Day/Month/Week:

#> cd $openBusRealTimeHome/hbase/queryscripts#> ./usersPer<Period>.sh <date> [userId]

where <Period> can be Day, Month or Week. First arguments is the date in format “yyyyMMdd” for a day of a year,“yyyyMM” for a month of a year and “yyyyWW” for a week of a year. Second argument is optional for filtering withan specific user.

View Sessions per Day/Month/Week:

#> cd $openBusRealTimeHome/hbase/queryscripts#> ./sessionsPer<Period>.sh <date> [userId]

where <Period> can be Day, Month or Week. First arguments is the date in format “yyyyMMdd” for a day of a year,“yyyyMM” for a month of a year and “yyyyWW” for a week of a year. Second argument is optional for filtering withan specific session.

18 Chapter 4. Running examples

Page 23: Openbus Documentation - Read the Docs · PDF fileOpenbus Documentation, Release 1 An open source architecture able to process the massive amount of events that occur in a banking IT

Bibliography

[Marz] Nathan Marz, James Warren. “Big Data. Principles and best practices of scalable realtime data systems”Manning MEAP

19