Hands-on with Twitter Heron

TRANSCRIPT

Page 1: Hands-on with Twitter Heron

Twitter Heron

KARTHIK RAMASAMY @KARTHIKZ

#TwitterHeron

Page 2: Hands-on with Twitter Heron

TALK OUTLINE

I. HERON OVERVIEW

II. HERON PERFORMANCE

III. HERON BACKPRESSURE

IV. HERON LOAD SHEDDING

V. CONCLUSION

Page 3: Hands-on with Twitter Heron

HERON OVERVIEW

Page 4: Hands-on with Twitter Heron

STORM/HERON TERMINOLOGY

TOPOLOGY

Directed acyclic graph

Vertices = computation, edges = streams of data tuples

SPOUTS

Sources of data tuples for the topology

Examples: Kafka/Kestrel/MySQL/Postgres

BOLTS

Process incoming tuples and emit outgoing tuples

Examples: filtering/aggregation/join/arbitrary function
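To make the terminology concrete, here is a minimal Java sketch of a spout and a bolt. It assumes the classic Storm API (backtype.storm) that Heron supports for compatibility; RandomSentenceSpout and SplitSentenceBolt are illustrative names, not the bundled examples.

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// A spout is a source of data tuples; this one emits random sentences.
class RandomSentenceSpout extends BaseRichSpout {
  private SpoutOutputCollector collector;
  private final String[] sentences = {"heron processes tuples", "bolts consume streams"};
  private final Random random = new Random();

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void nextTuple() {
    // Called in a loop by the framework; emit one sentence per call.
    collector.emit(new Values(sentences[random.nextInt(sentences.length)]));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
  }
}

// A bolt processes incoming tuples and emits outgoing tuples;
// this one splits each sentence into words (an arbitrary function).
class SplitSentenceBolt extends BaseRichBolt {
  private OutputCollector collector;

  @Override
  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple tuple) {
    for (String word : tuple.getStringByField("sentence").split(" ")) {
      collector.emit(new Values(word));
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}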

Page 5: Hands-on with Twitter Heron

STORM/HERON TOPOLOGY

[Topology diagram: a DAG with spouts SPOUT 1 and SPOUT 2 and bolts BOLT 1 through BOLT 5]

Page 6: Hands-on with Twitter Heron

WORD COUNT TOPOLOGY

LOGICAL PLAN

TWEET SPOUT (live stream of Tweets) → PARSE TWEET BOLT → WORD COUNT BOLT

Page 7: Hands-on with Twitter Heron

WORD COUNT TOPOLOGY

TWEET SPOUT TASKS → PARSE TWEET BOLT TASKS → WORD COUNT BOLT TASKS

When a parse tweet bolt task emits a tuple, which word count bolt task should it send it to?

Page 8: Hands-on with Twitter Heron

STREAM GROUPINGS

SHUFFLE GROUPING: random distribution of tuples

FIELDS GROUPING: group tuples by one or more fields

ALL GROUPING: replicates tuples to all tasks

GLOBAL GROUPING: sends the entire stream to one task

Page 9: Hands-on with Twitter Heron

WORD COUNT TOPOLOGY

TWEET SPOUT TASKS → (SHUFFLE GROUPING) → PARSE TWEET BOLT TASKS → (FIELDS GROUPING) → WORD COUNT BOLT TASKS
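In code, these grouping choices are declared when the topology is wired together. A minimal Java sketch against the Storm-compatible API (TweetSpout, ParseTweetBolt, and WordCountBolt are assumed classes along the lines of the sketch on Page 4, and the parallelism values are illustrative):

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    // Spout tasks emit the live stream of tweets.
    builder.setSpout("tweet-spout", new TweetSpout(), 2);

    // Shuffle grouping: tuples are randomly distributed
    // across the parse bolt tasks, balancing the load.
    builder.setBolt("parse-tweet-bolt", new ParseTweetBolt(), 4)
           .shuffleGrouping("tweet-spout");

    // Fields grouping on "word": every tuple carrying the same
    // word goes to the same counting task, so counts stay correct.
    builder.setBolt("word-count-bolt", new WordCountBolt(), 4)
           .fieldsGrouping("parse-tweet-bolt", new Fields("word"));

    // Run locally for a quick test; on a cluster the jar is
    // handed to the heron CLI instead (see Page 53).
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("word-count", new Config(), builder.createTopology());
  }
}

Because Heron is fully API-compatible with Storm, the same wiring code runs unchanged on Heron.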

Page 10: Hands-on with Twitter Heron

WHY HERON?

PERFORMANCE PREDICTABILITY

EASE OF MANAGEABILITY

IMPROVE DEVELOPER PRODUCTIVITY

Page 11: Hands-on with Twitter Heron

HERON DESIGN DECISIONS

FULLY API COMPATIBLE WITH STORM

Directed acyclic graph

Topologies, spouts and bolts

USE OF MAINSTREAM LANGUAGES

C++/Java/Python

TASK ISOLATION

Ease of debuggability/resource isolation/profiling

Page 12: Hands-on with Twitter Heron

HERON ARCHITECTURE

[Architecture diagram: topologies (Topology 1, Topology 2, Topology 3, ..., Topology N) are submitted to a central Scheduler]

Page 13: Hands-on with Twitter Heron

TOPOLOGY ARCHITECTURE

[Diagram: a Topology Master coordinates the topology; the logical plan, physical plan, and execution state are kept in a ZK CLUSTER. Each CONTAINER runs a Stream Manager, a Metrics Manager, and instances I1-I4, and syncs the physical plan with the Topology Master.]

Page 14: Hands-on with Twitter Heron

HERON SAMPLE TOPOLOGIES

Page 15: Hands-on with Twitter Heron

HERON @TWITTER

Large amounts of data produced every day

Large cluster; several hundred topologies deployed

Several billion messages every day

Topologies range from 1 stage to 10 stages

3x reduction in cores and memory

Heron has been in production for 2 years

Page 16: Hands-on with Twitter Heron

HERON USE CASES

REAL-TIME ETL

REAL-TIME BI

SPAM DETECTION

REAL-TIME TRENDS

REAL-TIME ML

REAL-TIME MEDIA

REAL-TIME OPS

Page 17: Hands-on with Twitter Heron

COMPONENTIZING HERON

Page 18: Hands-on with Twitter Heron

HETEROGENEOUS ECOSYSTEM

Unmanaged Schedulers

Managed Schedulers

State Managers

Job Uploaders

Page 19: Hands-on with Twitter Heron

LOOKING BACK: MICROKERNELS

Page 20: Hands-on with Twitter Heron

HERON: MICROSTREAM ENGINE

[Layered diagram: HARDWARE at the bottom, BASIC INTER/INTRA IPC above it, and the core components (Topology Master, Stream Manager, Instance, Metrics Manager) on top; a SCHEDULER SPI and a STATE MANAGER SPI plug the core into external systems, and the Metrics Manager exports to Scribe and Graphite]

Page 21: Hands-on with Twitter Heron

HERON: MICROSTREAM ENGINE

PLUG AND PLAY COMPONENTS

As the environment changes, the core does not change

MULTI-LANGUAGE INSTANCES

Support multiple language APIs with native instances

MULTIPLE PROCESSING SEMANTICS

Efficient stream managers for each semantics

EASE OF DEVELOPMENT

Faster development of components with few dependencies

Page 22: Hands-on with Twitter Heron

HERON ENVIRONMENTS

Laptop/Server: Local Scheduler, Local State Manager, Local Uploader

Cluster/Production: Aurora Scheduler, Zookeeper State Manager, Packer Uploader

Cluster/Testing: Mesos/Aurora Scheduler, Zookeeper State Manager, HDFS Uploader

Page 23: Hands-on with Twitter Heron

HERON RESOURCE USAGE

Page 24: Hands-on with Twitter Heron

HERON RESOURCE USAGE

[Pipeline diagram: Event Spout (60-100M/min) → Filter (8-12M/min) → Flat-Map (40-60M/min) → Aggregate (1 sec cache) → Output (25-42M/min) → Redis]

Page 25: Hands-on with Twitter Heron

RESOURCE CONSUMPTION

        Cores Requested   Cores Used   Memory Requested (GB)   Memory Used (GB)
Redis   24                2-4          48                      N/A
Heron   120               30-50        200                     180

Page 26: Hands-on with Twitter Heron

RESOURCE CONSUMPTION

[Pie chart: resources split across Spout Instances, Bolt Instances, and Heron Overhead (segments: 84%, 9%, 7%)]

Page 27: Hands-on with Twitter Heron

PROFILING SPOUTS

[Pie chart: time spent in Deserialize, Parse/Filter, Mapping, Kafka Iterator, Kafka Fetch, and Rest (segments: 63%, 16%, 7%, 6%, 6%, 2%)]

Page 28: Hands-on with Twitter Heron

PROFILING BOLTS

[Pie chart: time spent in Write Data, Serialize, Deserialize, Aggregation, Data Transport, and Rest (segments: 68%, 19%, 5%, 4%, 2%, 2%)]

Page 29: Hands-on with Twitter Heron

RESOURCE CONSUMPTION - BREAKDOWN

[Pie chart: Fetching Data, User Logic, Heron Usage, and Writing Data (segments: 61%, 21%, 11%, 8%)]

Page 30: Hands-on with Twitter Heron

HERON BACKPRESSURE

Page 31: Hands-on with Twitter Heron

STRAGGLERS

BAD HOST

EXECUTION SKEW

INADEQUATE PROVISIONING

Stragglers are the norm in multi-tenant distributed systems

Page 32: Hands-on with Twitter Heron

APPROACHES TO HANDLE STRAGGLERS

SENDERS TO STRAGGLER DROP DATA

DETECT STRAGGLERS AND RESCHEDULE THEM

SENDERS SLOW DOWN TO THE SPEED OF THE STRAGGLER

Page 33: Hands-on with Twitter Heron

DROP DATA STRATEGY

UNPREDICTABLE

AFFECTS ACCURACY

POOR VISIBILITY

Page 34: Hands-on with Twitter Heron

SLOW DOWN SENDER STRATEGY

PROVIDES PREDICTABILITY

PROCESSES DATA AT MAXIMUM RATE

REDUCES RECOVERY TIMES

HANDLES TEMPORARY SPIKES

This strategy is known as back pressure.
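The intuition behind the slow-down-sender strategy can be sketched with a bounded queue: when the consumer (the straggler) falls behind, the buffer fills and the producer blocks, so the pipeline settles at the straggler's pace instead of dropping data. A minimal, self-contained Java analogy (a conceptual sketch only, not Heron's implementation, which propagates back pressure between Stream Managers):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackPressureDemo {
  public static void main(String[] args) throws InterruptedException {
    // Bounded buffer between a fast producer and a slow consumer.
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100);

    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; ; i++) {
          // put() blocks when the queue is full: the sender is
          // transparently slowed to the consumer's rate.
          queue.put(i);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    Thread consumer = new Thread(() -> {
      try {
        while (true) {
          int value = queue.take();
          Thread.sleep(10); // the "straggler": processes slowly
          if (value % 1000 == 0) {
            System.out.println("processed " + value);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    producer.join(); // runs until interrupted
  }
}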

Page 35: Hands-on with Twitter Heron

SLOW DOWN SENDER STRATEGY

[Topology diagram: spout S1 and bolts B2, B3, B4, with back pressure propagating from a slow bolt back to S1]

Page 36: Hands-on with Twitter Heron

SLOW DOWN SENDER STRATEGY

[Diagram: four containers, each running a Stream Manager with instances S1, B2, B3, B4; when an instance lags, its Stream Manager signals the other Stream Managers, which throttle their local S1 spouts]

Page 37: Hands-on with Twitter Heron

BACK PRESSURE IN PRACTICE

[Production graph: read pointer vs. write pointer diverging under sustained back pressure; the fix was to manually restart the container]

Page 38: Hands-on with Twitter Heron

BACK PRESSURE IN PRACTICE

IN MOST SCENARIOS BACK PRESSURE RECOVERS

Without any manual intervention

SOMETIMES USERS PREFER DROPPING DATA

They care only about the latest data

SUSTAINED BACK PRESSURE

Irrecoverable GC cycles

Bad or faulty hosts

Page 39: Hands-on with Twitter Heron

DETECTING BAD/FAULTY HOSTS

Page 40: Hands-on with Twitter Heron

HERON LOAD SHEDDING

Page 41: Hands-on with Twitter Heron

LOAD SHEDDING

SAMPLING-BASED APPROACHES

Down-sample the incoming stream and scale up the results

Easy to reason about if the sampling is uniform

Hard to achieve uniformity across distributed spouts

DROP-BASED APPROACHES

Simply drop older data

Spouts take a lag threshold and a lag adjustment value

Works well in practice
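A hedged sketch of the drop-based idea in Java: when the spout's lag exceeds the threshold, it jumps the read position forward by the adjustment value, discarding the oldest data. QueueReader and its lag() and skip() methods are hypothetical stand-ins for a real source such as a Kafka consumer; Heron's actual spout configuration differs.

// Minimal sketch of drop-based load shedding inside a spout's
// read loop. QueueReader is a hypothetical source interface.
public class LoadSheddingSpout {
  private final QueueReader reader;
  private final long lagThreshold;   // e.g. 5,000,000 messages
  private final long lagAdjustment;  // e.g. 1,000,000 messages

  public LoadSheddingSpout(QueueReader reader, long lagThreshold, long lagAdjustment) {
    this.reader = reader;
    this.lagThreshold = lagThreshold;
    this.lagAdjustment = lagAdjustment;
  }

  public String nextTuple() {
    // If we have fallen too far behind the head of the stream,
    // drop the oldest data by jumping the read position forward.
    if (reader.lag() > lagThreshold) {
      reader.skip(lagAdjustment);
    }
    return reader.next();
  }
}

// Hypothetical reader interface, included so the sketch is self-contained.
interface QueueReader {
  long lag();          // messages between read position and stream head
  void skip(long n);   // advance the read position by n messages
  String next();       // read the next message
}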

Page 42: Hands-on with Twitter Heron

CURIOUS TO LEARN MORE…

1. Twitter Heron: Stream Processing at Scale. Sanjeev Kulkarni, Nikunj Bhagat, Maosong Fu, Vikas Kedigehalli, Christopher Kellogg, Sailesh Mittal, Jignesh M. Patel, Karthik Ramasamy, Siddarth Taneja. SIGMOD '15, Melbourne, Australia. http://dx.doi.org/10.1145/2723372.2723374

2. Storm @Twitter. Ankit Toshniwal, Siddarth Taneja, Amit Shukla, Karthik Ramasamy, Jignesh M. Patel, Sanjeev Kulkarni, Jason Jackson, Krishna Gade, Maosong Fu, Jake Donham, Nikunj Bhagat, Sailesh Mittal, Dmitriy Ryaboy. SIGMOD '14, Snowbird, Utah. http://dx.doi.org/10.1145/2588555.2595641

3. Streaming@Twitter. Maosong Fu, Sailesh Mittal, Vikas Kedigehalli, Karthik Ramasamy, Michael Barry, Andrew Jorgensen, Christopher Kellogg, Neng Lu, Bill Graham, Jingwei Wu. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, 2015.

Page 43: Hands-on with Twitter Heron

INTERESTED IN HERON?

HERON IS OPEN SOURCE

CONTRIBUTIONS ARE WELCOME! https://github.com/twitter/heron

http://heronstreaming.io

FOLLOW US @HERONSTREAMING

Page 44: Hands-on with Twitter Heron

#ThankYou FOR LISTENING

Page 45: Hands-on with Twitter Heron

QUESTIONS and ANSWERS

Go ahead. Ask away.

Page 46: Hands-on with Twitter Heron

Heron Hands-On

NENG LU, PANKAJ GUPTA, MARK LI, ZUYU ZHANG, TAISHI NOJIMA

#TwitterHeron

Page 47: Hands-on with Twitter Heron

AGENDA OUTLINE

I. INSTALL HERON

II. LAUNCH TOPOLOGIES

III. EXPLORE UI

IV. HERON STARTER

V. CONCLUSION

Page 48: Hands-on with Twitter Heron

INSTALL HERON

Page 49: Hands-on with Twitter Heron

HERON PREREQS

JAVA 7 OR ABOVE

PYTHON 2.7

LINUX PLATFORMS: INSTALL LIBUNWIND

Page 50: Hands-on with Twitter Heron

INSTALL HERON CLIENT

STEP 1: DOWNLOAD CLIENT BINARIES

https://github.com/twitter/heron/releases/tag/0.14.1

heron-client-install-0.14.1-<platform>.sh

STEP 2: INSTALL CLIENT BINARIES

chmod +x heron-client-install-0.14.1-<platform>.sh

./heron-client-install-0.14.1-<platform>.sh --user

Page 51: Hands-on with Twitter Heron

INSTALL HERON TOOLS

STEP 1: DOWNLOAD TOOLS BINARIES

https://github.com/twitter/heron/releases/tag/0.14.1

heron-tools-install-0.14.1-<platform>.sh

STEP 2: INSTALL TOOLS BINARIES

chmod +x heron-tools-install-0.14.1-<platform>.sh

./heron-tools-install-0.14.1-<platform>.sh --user

Page 52: Hands-on with Twitter Heron

LAUNCH HERON TOOLS

STEP 1: LAUNCH HERON TRACKER

heron-tracker

STEP 2: LAUNCH HERON UI

heron-ui

Page 53: Hands-on with Twitter Heron

LAUNCH EXAMPLE TOPOLOGIES

STEP 1: SUBMIT TOPOLOGY

heron submit local ~/.heron/examples/heron-examples.jar \
  com.twitter.heron.examples.ExclamationTopology \
  ExclamationTopology

STEP 2: SUBMIT ANOTHER TOPOLOGY

heron submit local ~/.heron/examples/heron-examples.jar \
  com.twitter.heron.examples.AckingTopology \
  AckingTopology

STEP 3: SUBMIT A THIRD TOPOLOGY

heron submit local ~/.heron/examples/heron-examples.jar \
  com.twitter.heron.examples.MultiStageAckingTopology \
  MultiStageAckingTopology
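For reference, a submittable topology is just a jar whose main class builds the topology and hands it to the submitter; heron submit then launches it under the given name. A minimal sketch using the Storm-compatible API (TestWordSpout is assumed from the Storm testing package; the ExclamationBolt here is a simplified illustration, not the exact code of the bundled example):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyExclamationTopology {

  // Appends "!!!" to every incoming word.
  static class ExclamationBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      collector.emit(new Values(tuple.getString(0) + "!!!"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }
  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("word", new TestWordSpout(), 2);
    builder.setBolt("exclaim", new ExclamationBolt(), 2).shuffleGrouping("word");

    Config conf = new Config();
    conf.setNumWorkers(2); // in Heron, workers map onto containers

    // The trailing argument of the submit command arrives as args[0]
    // and is used as the topology name.
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
  }
}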

Page 54: Hands-on with Twitter Heron

EXPLORE UI

STEP 1: LOGICAL PLAN AND PHYSICAL PLAN

STEP 2: EXPLORE DASHBOARD

STEP 3: VIEW METRICS

Page 55: Hands-on with Twitter Heron

EXPLORE UI

STEP 4: VIEW PROCESS LOGS

STEP 5: VIEW EXCEPTIONS

STEP 6: EXPLORE OTHER FEATURES

Page 56: Hands-on with Twitter Heron

HERON STARTER

SEVERAL STARTER EXAMPLES

https://github.com/kramasamy/heron-starter

INTERESTING EXERCISES

Trending Twitter words

Trending Twitter hashtags

Find top retweeters for a given hashtag

Page 57: Hands-on with Twitter Heron

QUESTIONS ON HERON?

ANY QUESTIONS/CLARIFICATIONS? https://groups.google.com/forum/#!forum/heron-users

http://heronstreaming.io

FOR UPDATES, FOLLOW @HERONSTREAMING

Page 58: Hands-on with Twitter Heron

#ThankYou FOR TRYING