Twitter's Real Time Stack: Processing Billions of Events Using Distributed Log and Heron
Karthik Ramasamy, Twitter, @karthikz


TRANSCRIPT

Page 1: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

Twitter Real Time Stack: Processing Billions of Events Using Distributed Log and Heron

Karthik Ramasamy
Twitter
@karthikz

Page 2: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

2

Page 3: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

3

Value of Data: It's Contextual

[Figure: "Value of Data to Decision-Making" vs. time, showing the information half-life in decision-making. Traditional "batch" business intelligence operates on data that is months to days old (historical, reactive), while time-critical decisions move toward hours, minutes, and seconds (actionable, predictive, preventive) as data approaches real time.]

[1] Courtesy Michael Franklin, BIRTE, 2015.

Page 4: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

4

What is Real-Time? It's contextual.

Real time (OLTP): low latency, < 1 ms (e.g., financial trading)
Approximate real time: 10 ms - 1 sec (e.g., ad impressions count, hashtag trends)
Near real time: latency sensitive, < 500 ms; deterministic workflows (e.g., fanout of Tweets, search for Tweets)
Batch: high throughput, > 1 hour; ad hoc queries (e.g., monthly active users, relevance for ads)

Page 5: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

5

Why Real Time?

Real time trends: emerging breakout trends on Twitter (in the form of #hashtags)
Real time conversations: real time sports conversations related to a topic (a recent goal or touchdown)
Real time recommendations: real time product recommendations based on your behavior and profile
Real time search: real time search of Tweets

ANALYZING BILLIONS OF EVENTS IN REAL TIME IS A CHALLENGE!

Page 6: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

6

Real Time Analytics

Streaming: analyze data as it is being produced
Interactive: store data and provide results instantly when a query is posed

Page 7: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

7

Real Time Use Cases

Online services (tens of ms): transaction log, queues, RPCs
Near real time (hundreds of ms): change propagation, streaming analytics
Data for batch analytics (seconds to minutes): log aggregation, client events

Page 8: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

8

Real Time Stack

Components: many moving parts. Twitter's real time stack is built around Scribe, Event Bus, DistributedLog (dlog), and Heron.

Page 9: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

9

Scribe

Open source log aggregation: originally from Facebook; Twitter made significant enhancements for real time event aggregation.
High throughput and scale: delivers 125M messages/min and provides tight SLAs on data reliability.
Runs on every machine: simple, very reliable, and uses memory and CPU efficiently.

Page 10: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

Event Bus & Distributed Log: Next Generation Messaging

Page 11: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

11

Twitter Messaging (before Event Bus)

[Diagram: several messaging systems in use side by side: Kestrel queues serving core business logic (tweets, fanouts, ...) and deferred RPC, Scribe feeding HDFS, and BookKeeper, MySQL (Gizzard), and Kafka backing database and search pipelines.]

Page 12: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

12

Kestrel Limitations

Adding subscribers is expensive
Scales poorly as the number of queues increases
Durability is hard to achieve
Read-behind degrades performance: too many random I/Os
Cross-DC replication

Page 13: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

13

Kafka Limitations

Relies on the file system page cache
Performance degrades when subscribers fall behind: too much random I/O

Page 14: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

14

Rethinking Messaging

Durable writes, intra-cluster and geo-replication
Scale resources independently; cost efficiency
Unified stack with tradeoffs for various workloads
Multi-tenancy
Ease of manageability

Page 15: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

15

Event Bus

Designed to meet those requirements:
Durable writes, intra-cluster and geo-replication
Scale resources independently; cost efficiency
Unified stack with tradeoffs for various workloads
Multi-tenancy
Ease of manageability

Page 16: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

16

Event Bus: Pub-Sub

[Diagram: publishers write through a write proxy and subscribers read through a read proxy; both proxies sit on top of Distributed Log, with a metadata store coordinating them.]

Page 17: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

17

Distributed Log

[Diagram: the same write proxy / read proxy layering, with Distributed Log providing the durable, replicated log and the metadata store underneath the publisher and subscriber paths.]
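The proxy layering is easiest to understand against the log abstraction underneath it: publishers append records that receive monotonically increasing sequence numbers, and subscribers replay the stream from any position at their own pace. The Java sketch below models only that idea; it is illustrative and is not the DistributedLog or Event Bus API (all class and method names here are made up for this example).

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of a pub-sub log (not the DistributedLog API):
// writers append records and get back a sequence number; readers
// replay the stream from any previously observed position.
final class IllustrativeLog {
    private final List<byte[]> records = new ArrayList<>();

    // Append a record and return its sequence number (its position in the log).
    public synchronized long append(byte[] payload) {
        records.add(payload);
        return records.size() - 1;
    }

    // Read up to maxRecords starting at fromSequence; the reader tracks
    // its own position, so it can fall behind and catch up later.
    public synchronized List<byte[]> read(long fromSequence, int maxRecords) {
        int from = (int) Math.min(fromSequence, records.size());
        int to = (int) Math.min(from + maxRecords, records.size());
        return new ArrayList<>(records.subList(from, to));
    }
}

public class PubSubSketch {
    public static void main(String[] args) {
        IllustrativeLog log = new IllustrativeLog();

        // Publisher side (in Event Bus this goes through the write proxy).
        long seq = 0;
        for (String event : new String[] {"tweet-1", "tweet-2", "tweet-3"}) {
            seq = log.append(event.getBytes());
        }
        System.out.println("last sequence written: " + seq);

        // Subscriber side (in Event Bus this goes through the read proxy):
        // replay everything from sequence 0 at the reader's own pace.
        long cursor = 0;
        for (byte[] payload : log.read(cursor, 100)) {
            System.out.println("read: " + new String(payload));
            cursor++;
        }
    }
}
```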

Page 18: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

18

Distributed Log @Twitter

1. Manhattan key-value store
2. Durable deferred RPC
3. Real time search indexing
4. Pub-sub system
5. Globally replicated log

Page 19: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

19

Distributed Log @Twitter

400 TB/day in
10 PB/day out
2 trillion events/day processed
100 ms latency
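For a sense of scale, 2 trillion events per day works out to an average of roughly 23 million events per second (2 x 10^12 / 86,400 seconds).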

Page 20: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

Algorithms: Mining Streaming Data

Page 21: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

Twitter Heron: Next Generation Streaming Engine

Page 22: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

22

Twitter Heron: A Better Storm

Container-based architecture
Separate monitoring and scheduling
Simplified execution model
Much better performance

Page 23: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

23

Twitter Heron Design Goals

Fully API compatible with Storm: directed acyclic graph of topologies, spouts, and bolts
Use of mainstream languages: C++, Java, and Python
Task isolation: ease of debuggability, isolation, and profiling
Support for back pressure: topologies should be self-adjusting
Batching of tuples: amortizing the cost of transferring tuples (see the sketch below)
Efficiency: reduce resource consumption
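To make the batching goal concrete, the sketch below shows one common way to amortize per-transfer cost: buffer tuples and flush them as a group once the batch is full or a small delay has elapsed. This is an illustrative Java sketch only, not Heron's internal batching code; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative tuple batcher (not Heron internals): collects tuples and
// hands them to a sink in groups, flushing on size or elapsed time.
public class TupleBatcher<T> {
    private final int maxBatchSize;
    private final long maxDelayMillis;
    private final Consumer<List<T>> sink;      // e.g. one network send for many tuples
    private final List<T> buffer = new ArrayList<>();
    private long oldestBufferedAt = -1;

    public TupleBatcher(int maxBatchSize, long maxDelayMillis, Consumer<List<T>> sink) {
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMillis = maxDelayMillis;
        this.sink = sink;
    }

    public synchronized void add(T tuple) {
        if (buffer.isEmpty()) {
            oldestBufferedAt = System.currentTimeMillis();
        }
        buffer.add(tuple);
        maybeFlush();
    }

    // Call periodically so half-full batches are not held forever.
    public synchronized void tick() {
        maybeFlush();
    }

    private void maybeFlush() {
        boolean full = buffer.size() >= maxBatchSize;
        boolean stale = !buffer.isEmpty()
                && System.currentTimeMillis() - oldestBufferedAt >= maxDelayMillis;
        if (full || stale) {
            sink.accept(new ArrayList<>(buffer));  // one transfer amortized over the batch
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        TupleBatcher<String> batcher =
            new TupleBatcher<>(3, 10, batch -> System.out.println("flush " + batch));
        batcher.add("a");
        batcher.add("b");
        batcher.add("c");   // triggers a flush of [a, b, c]
    }
}
```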

Page 24: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

24

Twitter Heron

Guaranteed message passing
Horizontal scalability
Robust fault tolerance
Concise code: focus on logic

Page 25: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

25

Heron Terminology

Topology: a directed acyclic graph whose vertices are computation and whose edges are streams of data tuples
Spouts: sources of data tuples for the topology (examples: Kafka, Kestrel, MySQL, Postgres)
Bolts: process incoming tuples and emit outgoing tuples (examples: filtering, aggregation, join, any function)

A minimal spout and bolt sketch follows below.
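Because Heron is API compatible with Storm, spouts and bolts are written against the Storm interfaces. The sketch below uses the backtype.storm API of that era; RandomWordSpout and CountBolt are hypothetical placeholder classes for illustration, not code from the talk.

```java
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Placeholder spout: emits a random word as a one-field tuple.
class RandomWordSpout extends BaseRichSpout {
    private static final String[] WORDS = {"heron", "storm", "tweet", "stream"};
    private SpoutOutputCollector collector;
    private Random random;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        collector.emit(new Values(WORDS[random.nextInt(WORDS.length)]));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

// Placeholder bolt: counts occurrences of each word it receives.
class CountBolt extends BaseRichBolt {
    private final Map<String, Long> counts = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        long count = counts.merge(word, 1L, Long::sum);
        collector.emit(new Values(word, count));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```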

Page 26: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

26

Heron Topology

[Diagram: an example topology in which two spouts (Spout 1, Spout 2) feed a network of bolts (Bolt 1 through Bolt 5).]

Page 27: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

27

Stream Groupings

1. Shuffle grouping: random distribution of tuples
2. Fields grouping: group tuples by a field or multiple fields
3. All grouping: replicate tuples to all tasks
4. Global grouping: send the entire stream to one task

A wiring sketch follows below.
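Wiring a topology together with these groupings uses the Storm-compatible TopologyBuilder API. The sketch below reuses the hypothetical RandomWordSpout and CountBolt from the earlier example; the topology name and worker count are assumptions for illustration.

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Two spout tasks producing word tuples.
        builder.setSpout("words", new RandomWordSpout(), 2);

        // Fields grouping: tuples with the same "word" always reach the same
        // CountBolt task, so each task owns a consistent slice of the counts.
        builder.setBolt("counter", new CountBolt(), 4)
               .fieldsGrouping("words", new Fields("word"));
        // For comparison: .shuffleGrouping("words") distributes tuples randomly,
        // .allGrouping("words") replicates every tuple to every task, and
        // .globalGrouping("words") sends the entire stream to a single task.

        Config conf = new Config();
        conf.setNumWorkers(2);  // maps to containers when the topology runs on Heron
        StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
    }
}
```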

Page 28: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

28

Heron Architecture: High Level

[Diagram: topologies are submitted to a scheduler, which runs Topology 1 through Topology N as independent jobs.]

Page 29: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

29

Heron Architecture: Topology

[Diagram: a Topology Master coordinates with a ZooKeeper cluster that stores the logical plan, physical plan, and execution state. Each container runs a Stream Manager, a Metrics Manager, and several Heron instances (I1-I4); the Stream Managers sync the physical plan with the Topology Master.]

Page 30: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

30

Heron Stream Manager: Back Pressure

[Diagram: a container whose Stream Manager connects spout S1 and bolts B2, B3, and B4.]

Page 31: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

31

Stream Manager: Back Pressure

[Diagram: four containers, each with a Stream Manager hosting S1, B2, B3, and B4. When a bolt such as B3 falls behind, its local Stream Manager signals the other Stream Managers to slow down the flow of tuples.]

Page 32: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

32

Heron Stream Manager: Spout Back Pressure

[Diagram: in response, each Stream Manager clamps down on its local spouts (S1), so no new tuples enter the topology until the slow component catches up.]

A conceptual sketch of this behavior follows below.
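The effect of back pressure can be illustrated with a bounded buffer: when the downstream consumer stops draining it, the producer blocks instead of accumulating unbounded in-flight tuples. The Java sketch below is only a conceptual illustration of that behavior, not Heron's Stream Manager protocol.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Conceptual back-pressure demo (not Heron's protocol): a fast producer
// feeding a slow consumer through a bounded queue. When the queue fills,
// put() blocks, which throttles the producer to the consumer's rate.
public class BackPressureSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> channel = new ArrayBlockingQueue<>(8);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    channel.put(i);  // blocks when the consumer falls behind
                    System.out.println("emitted tuple " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    Integer tuple = channel.take();
                    TimeUnit.MILLISECONDS.sleep(10);  // simulate a slow bolt
                    System.out.println("processed tuple " + tuple);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```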

Page 33: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

33

Heron Use Cases

Real time ETL
Real time BI
Spam detection
Real time trends
Real time ML
Real time ops

Page 34: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

34

Heron: Sample Topologies

Page 35: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

35

Heron @Twitter

Topologies range from 1 stage to 10 stages
3x reduction in cores and memory
Heron has been in production for 2 years

Page 36: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

36

Heron Performance: Experiment Settings

Components            Expt #1    Expt #2    Expt #3
Spout                 25         100        200
Bolt                  25         100        200
# Heron containers    25         100        200
# Storm workers       25         100        200

Page 37: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

37

Heron Performance: At Most Once Semantics

[Charts: throughput (million tuples/min) and number of cores used, at spout parallelism of 25, 100, and 200, comparing Heron (paper) with Heron (master). Annotated improvements: 5-6x in throughput and 1.4-1.6x in CPU usage.]

Page 38: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

38

Heron Performance: CPU Usage

[Chart: Heron (paper) vs. Heron (master) at spout parallelism of 25, 100, and 200, with a 4-5x difference annotated.]

Page 39: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

39

Heron @Twitter

More than 400 real time jobs
500 billion events/day processed
25-200 ms latency

Page 40: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

Tying It All Together

Page 41: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

41

Combining Batch and Real Time: Lambda Architecture

[Diagram: new data is fed to both a batch path and a real time path, and a client consumes the combined results.]

Page 42: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

42

Lambda Architecture - The Good

[Diagram: a Scribe collection pipeline and Event Bus feed a Heron analytics pipeline, which produces the results.]

Page 43: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

43

Lambda Architecture - The Bad

Have to fix everything (maybe twice)!
How much duct tape is required?
Have to write everything twice!
Subtle differences in semantics
What about graphs, ML, SQL, etc.?

Page 44: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

44

Summingbird to the Rescue

[Diagram: a single Summingbird program is compiled both to Scalding/MapReduce jobs that read from HDFS and write a batch key-value result store, and to a Heron topology that reads from a message broker and writes an online key-value result store; a client merges results from both stores.]
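The appeal of Summingbird is writing the business logic once and executing it on both the batch and the streaming path. Summingbird itself is a Scala library built around a Platform abstraction; the Java sketch below is only an illustration of the underlying idea, with hypothetical names, showing one pure function reused by a batch job and a streaming update.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: one pure piece of logic shared by a batch job and a
// streaming job, the idea Summingbird generalizes with its Platform
// abstraction (Scalding for batch, Storm/Heron for streaming).
public class SharedWordCountLogic {

    // The single definition of the computation: tokenize and count.
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    // "Batch" path: run over the full historical dataset at once.
    static Map<String, Long> batchJob(List<String> allHistoricalLines) {
        return countWords(allHistoricalLines);
    }

    // "Streaming" path: fold each small batch of fresh events into a running store.
    static void streamingUpdate(Map<String, Long> onlineStore, List<String> newLines) {
        countWords(newLines).forEach((word, n) -> onlineStore.merge(word, n, Long::sum));
    }

    public static void main(String[] args) {
        Map<String, Long> batchResult = batchJob(Arrays.asList("to be or not to be"));
        Map<String, Long> onlineStore = new HashMap<>();
        streamingUpdate(onlineStore, Arrays.asList("to stream or to batch"));
        System.out.println("batch: " + batchResult);
        System.out.println("online: " + onlineStore);
    }
}
```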

Page 45: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

45

Curious to Learn More?

Twitter Heron: Stream Processing at Scale
Sanjeev Kulkarni, Nikunj Bhagat, Maosong Fu, Vikas Kedigehalli, Christopher Kellogg, Sailesh Mittal, Jignesh M. Patel, Karthik Ramasamy, Siddarth Taneja. Twitter, Inc. and University of Wisconsin - Madison. SIGMOD 2015. http://dx.doi.org/10.1145/2723372.2723374

Abstract (condensed): Storm long served as the main platform for real-time analytics at Twitter, but as the scale, diversity, and number of use cases grew, many of its limitations became apparent: Twitter needed a system that scales better, is easier to debug and manage, and performs better, all while running on shared cluster infrastructure. After considering alternatives, Twitter built a new real-time stream data processing system, Heron. The paper presents Heron's design and implementation, shares experiences from running it in production as the de facto stream processing engine inside Twitter, and provides empirical evidence of its efficiency and scalability.

Storm @Twitter
Ankit Toshniwal, Siddarth Taneja, Amit Shukla, Karthik Ramasamy, Jignesh M. Patel, Sanjeev Kulkarni, Jason Jackson, Krishna Gade, Maosong Fu, Jake Donham, Nikunj Bhagat, Sailesh Mittal, Dmitriy Ryaboy. Twitter, Inc. and University of Wisconsin - Madison. SIGMOD 2014. http://dx.doi.org/10.1145/2588555.2595641

Abstract (condensed): Storm is a real-time, fault-tolerant, distributed stream data processing system used to run critical computations at Twitter at scale and in real time. The paper describes Storm's architecture, its methods for distributed scale-out and fault tolerance, how queries (topologies) are executed, operational stories from running Storm at Twitter, and an empirical evaluation of its resilience to machine failures. Storm is designed to be scalable, resilient, extensible, efficient, and easy to administer.

Page 46: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

46

Interested in Heron?

Heron is open source: https://github.com/twitter/heron
http://heronstreaming.io
Contributions are welcome!
Follow us: @heronstreaming

Page 47: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

47

Interested in DistributedLog?

DistributedLog is open source: https://github.com/twitter/distributedlog
http://distributedlog.io
Contributions are welcome!
Follow us: @distributedlog

Page 48: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

48

WHAT, WHY, WHERE, WHEN, WHO, HOW

Any Questions?

Page 49: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

49

Get in Touch: @karthikz

Page 50: Twitter's Real Time Stack - Processing Billions of Events Using Distributed Log and Heron

THANKS FOR ATTENDING!!!