Hadoop Strata talk: "Uber, Your Hadoop Has Arrived" (Vinoth Chandar)



TRANSCRIPT

"Uber, Your Hadoop Has Arrived" - Vinoth Chandar


Uber’s Mission

“Transportation as reliable as running water, everywhere, for everyone”

400+ Cities, 69 Countries

And growing...

Agenda

Bringing Hadoop To Uber

Hadoop Ecosystem

Challenges Ahead

Data @ Uber : Impact

1. City Ops: data users operating a massive transportation system

2. Analysts & Execs: marketing spend, forecasting

3. Engineers & Data Scientists: fraud detection & discovery

4. Critical business operations: incentive payments, background checks

5. Fending off Uber’s legal/regulatory challenges: “You have to produce this data in X hours”

Data @ Uber : Circa 2014

Diagram (circa 2014):

- Sources: Kafka7 logs, Schemaless databases, RDBMS tables
- Ingestion: an "uploader" into Amazon S3 / EMR; Wall-e ETL into Vertica
- Warehouse: Vertica
- Consumers: applications (incentive payments, machine learning, safety, background checks) and ad hoc SQL (City Ops/DOPS, data scientists)

Pain Point #1: Data Reliability

- Free-form Python/Node objects -> heavily nested JSON
- Word-of-mouth schema communication

Diagram: lots of engineers & lots of services (producers) on one side, lots of City Ops (consumers) on the other, with the data team's $$$$ data pipeline squeezed in between.

Pain Point #2: System Scalability

- Kafka7: heavy topics, no HA
- Wall-e: Celery workers unable to keep up with Kafka/Schemaless
- Vertica queries: more & more raw data piling on

(Chart: growth from H1 2014 into H2 2014 & beyond)

Pain Point #3: Fragile Ingestion Model

- Multiple fetching from sources
- Painful backfills, since projections & transformations live inside the pipelines

Diagram: trips_table1/2/3 each pulled separately from mezzanine into the warehouse, vs. pulling mezzanine once into a shared data pool that feeds all three tables.

Pain Point #4: No Multi-DC Support

- No unified view of data; more complexity pushed onto consumers
- Wasteful use of WAN traffic

Diagram: DC1 and DC2 both feeding a single global warehouse.

Hadoop Data Lake: Pain, Pain Go Away!

- (Pain 1) Schematize all data (old & new): Heatpipe / Schema Service / Paricon

- (Pain 2) All infrastructure shall scale horizontally: Kafka8 & Hadoop; Streamific/Sqoop deliver data to HDFS; Lizzie feeds Vertica, Komondor feeds Hive

- (Pain 3) Store raw data in its nested glory in Hadoop: JSON -> Avro records -> Parquet! (sketch below)

- (Pain 4) Global view of all data: unified tables! Yay!
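To make the Pain 3 bullet concrete, here is a minimal PySpark sketch of enforcing a schema on raw JSON and persisting it as Parquet with the nesting intact. It is not Uber's Heatpipe/Komondor; the trip-event schema, paths, and columns are invented for illustration, and the real pipeline resolves schemas from a schema service.

```python
# Hedged sketch: enforce an explicit schema on raw JSON events and write them out
# as Parquet, preserving the nested structure. All names here are illustrative.
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from pyspark.sql.types import (DoubleType, StringType, StructField, StructType,
                               TimestampType)

spark = SparkSession.builder.appName("json-to-parquet-sketch").getOrCreate()

schema = StructType([
    StructField("trip_id", StringType(), nullable=False),
    StructField("event_time", TimestampType()),
    StructField("rider", StructType([            # nested struct survives into Parquet
        StructField("id", StringType()),
        StructField("city", StringType()),
    ])),
    StructField("fare", DoubleType()),
])

raw = (spark.read
       .schema(schema)                           # schema enforcement at read time
       .json("hdfs:///raw/trip_events/2016/01/02/"))

(raw.withColumn("datestr", to_date("event_time"))  # Hive-style date partition column
    .write
    .mode("append")
    .partitionBy("datestr")
    .parquet("hdfs:///warehouse/trip_events/"))
```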

Uber’s Schema Talk : Tomorrow, 2:40PM

Hadoop Ecosystem: Overview

Diagram (ecosystem overview):

- Sources: Kafka8 logs, Schemaless databases, SOA (RDBMS) tables
- Ingestion: Streamific (JSON/Avro) and Sqoop deliver raw data; Komondor (Spark) lands it in Hive as Parquet flat tables; ETL jobs build modeled tables in Hive
- Vertica: fed by Lizzie ETL (Spark); ad hoc SQL via a web UI
- Hadoop consumers: ad hoc SQL (via the Janus gateway), fraud (Hive), machine learning (Spark), safety apps (Spark), backfill pipelines (Spark)
- Results flow back to Hive, Kafka, and NoSQL stores

Hadoop Ecosystem: Data Ingestion

Diagram (data ingestion): Kafka logs and DB redo logs from DC1 and DC2 flow through Streamific (streaming, duh..) into row-based storage (HBase/SequenceFiles); Komondor (batch) then converts them into columnar Parquet on HDFS.

Hadoop Ecosystem: Streamific

- Long-running service: backfills/catch-up don't hurt
- Low-latency delivery into row-oriented storage: HBase / HDFS append**
- Deployed/monitored the 'uber' way: can run on DCs without YARN, etc.
- Core (HA, checkpointing, ordering, throttling, at-least-once guarantees) + pluggable in/out streams: Akka (peak 900 MB/sec), Helix (300K partitions) - see the at-least-once sketch below
- Diagram: sources in (Kafka, Schemaless, S3), sinks out (HBase, HDFS, Kafka)
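Streamific itself is internal, but the at-least-once checkpointing idea from this slide can be sketched with a plain Kafka consumer: commit the offset only after the record is durably written downstream, so a crash replays data instead of dropping it. The topic, brokers, and sink function below are made up; the client is kafka-python.

```python
# Hedged sketch of at-least-once delivery, not Streamific itself: offsets are
# committed only after a successful downstream write, so failures cause replays
# (possible duplicates) rather than data loss.
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "trip_events",                      # hypothetical topic
    bootstrap_servers=["kafka:9092"],   # hypothetical brokers
    group_id="row-store-writer",
    enable_auto_commit=False,           # we checkpoint manually
    auto_offset_reset="earliest",
)

def write_to_row_store(value: bytes) -> None:
    """Stand-in for an HBase put / HDFS append; raise on failure so the
    offset is never committed and the record gets re-delivered."""
    ...

for message in consumer:
    write_to_row_store(message.value)
    consumer.commit()                   # checkpoint only after the write succeeded
```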

Hadoop Ecosystem: Komondor

The YARN/Spark muscle
- Parquet writing is expensive
- 1->N mapping from raw data to Parquet/Hive tables

Control data quality
- Schema enforcement
- Cleaning JSON
- Hive partitioning

File stitching
- Keeps the NameNode happy & queries performant

Let's "micro batch"?
- HDFS iNotify stability issues

Diagram: Kafka logs and DB changelogs (staged in HBase/HDFS) are loaded either as a full dump into snapshot tables or as new files into incremental tables on HDFS. Partitioning examples: trips by request date, users by join date, Kafka events by event publish date, transaction history by charge date.
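A minimal sketch of the kind of micro-batch step described above (not Komondor itself): enforce Hive-style date partitioning and keep the file count per partition small so the NameNode and query engines stay happy. The paths, table, and column names are assumptions.

```python
# Hedged sketch of a Komondor-style micro batch: read a pending batch of raw
# records, add a Hive date partition column, and write a small number of large
# Parquet files per partition ("file stitching"). All names are illustrative.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = (SparkSession.builder
         .appName("micro-batch-sketch")
         .enableHiveSupport()
         .getOrCreate())

# The real pipeline reads row-oriented data (HBase/SequenceFiles); JSON keeps the sketch simple.
batch = spark.read.json("hdfs:///ingest/trips/pending/")

(batch
 .withColumn("datestr", to_date(col("request_at")))  # hypothetical request-time column
 .repartition("datestr")                             # few, large files per date partition
 .write
 .mode("append")
 .partitionBy("datestr")
 .format("parquet")
 .saveAsTable("rawdata.trips"))                      # hypothetical Hive table
```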

Hadoop Ecosystem: Consuming Data

1. Ad hoc SQL
   a. Gateway service => Janus
      i. Keep bad queries out!
      ii. Choose YARN queues
   b. Hits HiveServer2/Tez

2. Data apps (sketch below)
   a. Spark/SparkSQL via HiveContext
   b. Support for saving results to Hive/Kafka
   c. Monitoring/debugging the 'uber' way

3. Lightweight apps
   a. Python apps hitting the gateway
   b. Fetch small results via WebHDFS
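A minimal sketch of a data app in the shape item 2 describes: query Hive with SparkSQL and save the result back to a Hive table. The slide's HiveContext is the Spark 1.x entry point; enableHiveSupport() is the later equivalent. Table and column names are made up.

```python
# Hedged sketch of a SparkSQL data app: read from Hive, aggregate, write back to Hive.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("data-app-sketch")
         .enableHiveSupport()      # HiveContext equivalent in Spark 2.x+
         .getOrCreate())

daily_city_trips = spark.sql("""
    SELECT datestr, city_id, COUNT(*) AS completed_trips
    FROM rawdata.trips            -- hypothetical raw table
    WHERE status = 'completed'
    GROUP BY datestr, city_id
""")

daily_city_trips.write.mode("overwrite").saveAsTable("modeled.daily_city_trips")
```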

Hadoop Ecosystem: Feeding Data-marts

Vertica
- SparkSQL/Oozie ETL framework to produce flattened tables (sketch below)
  - High volume; simple projections / row-level transforms
- HiveQL to produce well-modelled tables
  - + Complex joins
  - Also lands tables into Hive

Real-time dashboarding
- Batch layer for a lambda architecture
- MemSQL/Riak as the real-time stores
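The flattening step is essentially a projection of nested Parquet fields into flat, relational columns that a warehouse loader can ingest. The framework in the talk (Lizzie) is internal, so the sketch below is just that core projection, with made-up table and column names.

```python
# Hedged sketch of a "flattened table" projection: nested struct fields become flat
# columns suitable for bulk loading into a columnar warehouse such as Vertica.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (SparkSession.builder
         .appName("flatten-sketch")
         .enableHiveSupport()
         .getOrCreate())

trips = spark.table("rawdata.trips")       # hypothetical nested raw table

flat = trips.select(
    col("trip_id"),
    col("datestr"),
    col("rider.id").alias("rider_id"),     # nested field -> flat column
    col("rider.city").alias("rider_city"),
    col("fare"),
)

# Land the flat files where the warehouse bulk loader picks them up.
flat.write.mode("overwrite").parquet("hdfs:///export/vertica/trips_flat/")
```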

Hadoop Ecosystem: Some Numbers

HDFS
- (Total) 4 PB in 1 HDFS cluster
- (Growth) ~3 TB/day and growing

YARN
- 6,300 VCores (total)
- Number of daily jobs: 60K
- Number of compute hours daily: 78K (~3,250 days)

Hadoop Ecosystem: 2015 Wins

1. Hadoop is the source of truth for analytics data
   a. 100% of all analytics

2. Powered critical business operations
   a. Partner incentive payments

3. Unlocked data
   a. Data in Hadoop >> data in Vertica

We (almost) caught up!

Hadoop Ecosystem: 2016 Challenges

1. Interactive SQL at scale
   a. Put the power of data in our City Ops's hands

2. All-Active
   a. Keep data apps working during failovers

3. Fresher data in Hadoop
   a. Trips land in Hive in 6 hrs, but in 1 hr in Vertica

4. Incremental computation
   a. Most jobs run daily off raw tables
   b. Intra-hour jobs to build modeled tables


#1- Interactive SQL at Scale: Motivation

Vertica: fast, but can't cheaply scale.
Hive: powerful and scales reliably, but slowww....

#1- Interactive SQL at Scale: Presto

- Fast (-er than SparkSQL, -errr than Hive-on-Tez)
- Deployed at scale (FB/Netflix)
- Lack of UDF interop (Hive ⇔ Spark UDF interop is great!)
- Out-of-box geo support (ESRI/Magellan)

Other challenges:
- Heavy joins in 100K+ existing queries (Vertica degrades more gracefully)
- Colocation with Hadoop (network isolation)

#1- Interactive SQL at Scale: Spark Notebooks

1. Great for data scientists!
   - Iterative prototyping/exploration

2. Zeppelin/JupyterHub on HDFS
   - Run off Mesos clusters

3. Of course, Spark shell!
   - Pipeline troubleshooting

#1- Interactive SQL at Scale: Plan

1. Get Presto up & running
   - Work off "modelled tables" out of Hive
   - Equivalent of Vertica usage today

2. Presto on raw nested data
   - Raw data in Hive (will be) available at low latency
   - Uber's scalable near-real-time warehouse

3. Support Spark notebook use cases
   - Similar QoS issues hitting HDFS from Mesos


#2- All-Active: Motivation

Today: data from all DCs is replicated to a single global data lake.
- Low availability, SPOF
- Data copied in and out, high SLA
- Assumes unbounded WAN links
- Less operational overhead

#2- All-Active: Plan**

- Same HA as online services (you fail over, with data readily available)
- Maintain N Hadoop lakes?
- Diagram: data is replicated to peer data centers and into global data lakes (solid lines).

#2- All-Active: Challenges

1. Cross-DC replicator design
   - File-level vs record-level replication

2. Policy management
   - Which data is worth replicating?
   - Which fields are PII?

3. Reducing storage footprint
   - 9 copies!! (2 local lakes + 1 global lake = 3 copies, times 3x HDFS replication)
   - Federation across the WAN?

4. Capacity management for failover
   - Degraded mode or hot standby?


#3- Fresher Data in Hadoop: Motivation

1. Uber's business is inherently 'real-time'
   - Uber's City Ops need fresh data to 'debug' Uber

2. All the data is in Hadoop anyway
   - Reduce mindless data movement

3. Leverage the power of existing SQL engines
   - Standard SQL interface & mature join support

#3- Fresher Data: Trips on Hadoop (Today)

Diagram: trip cells stream from Schemaless (dc1, dc2) through Streamific into HBase (tripid => row), with file uploads to Hadoop every ~10 mins. New/updated trip rows reach Vertica via incremental updates in ~1 hr (tunable), where the trips flat table feeds the fact_trip modeled table; the raw trips table in Hive is refreshed only by a full snapshot every 6 hrs.

Snapshot: inefficient & slow
a) Snapshot job => 100s of mappers
b) Reads TBs, writes TBs
c) But, just X GB of actual data per day!!!!

#3- Fresher Data: Modelled Tables In Hadoop

Diagram: the same pipeline, now with the fact_trip modelled table built on top of the raw trips table in Hadoop. Behind the 6 hr raw snapshot (plus a 1-2 hr snapshot to build the modelled table), fact_trip lands in Hadoop at ~7-8+ hrs.

Latency & inefficiency worsen further:
a) Spark/Presto on modelled tables goes from 1-2 hrs to 7-8 hrs!!
b) Resource usage shoots up

#3- Fresher Data: Let’s incrementally update?

Diagram: replace the 6 hr snapshot with incremental updates. New/updated trip rows from HBase are applied to the raw trips table every ~30 mins and rolled into the modelled trips table in another ~30 mins (file uploads still every ~10 mins), so both land within roughly 1 hr, on par with Vertica.

So problem solved, right?
a) Same pattern as the Vertica load
b) Saves a bunch of resources
c) And shrinks down latency

#3- Fresher Data: HDFS/Hive Updates are tedious

Diagram: the same incremental pipeline, except that applying those 30 min incremental batches means updating records that already sit in the raw and modelled trips tables in Hive.

So problem solved, right? Except... Update!

#3- Fresher Data: Trip Updates Problem

Diagram: the raw trips table in Hive, with day-level partitions spanning 2010-2014, 2015/(01-05), 2015/(06-11), 2015/12/(01-31), 2016/01/02, 2016/01/03. The last 1 hr of new/updated trips adds new data to the latest partitions, but also scatters updated data across much older partitions, while most partitions stay unaffected.

#3- Fresher Data: HDFS/Hive Updates are tedious

(Same diagram as the previous slide.)

So problem solved, right? Yes, except... Update!

Good news: solve this & everything becomes ...

#3- Fresher Data: Solving Updates

1. Simple folder/partition re-writing (sketch below)
   - Most commonly used approach in Hadoop land

2. File-level updates
   - Similar to (1), but at the file level

3. Record-level updates
   - Feels like a k-v store on Parquet (and thus more complex)
   - Similar to Kudu/Hive transactions
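A rough sketch of approach 1 under assumed names (a trips table keyed by trip_id, partitioned by datestr, with a commit_ts last-updated column): find the partitions touched by the latest batch of changed rows and rewrite only those, keeping the newest version of each trip. Approaches 2 and 3 refine the same merge at file and record granularity.

```python
# Hedged sketch of partition re-writing: merge a batch of new/updated rows into only
# the date partitions they touch. Table, paths, and columns are assumptions.
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, row_number

spark = (SparkSession.builder
         .appName("partition-rewrite-sketch")
         .enableHiveSupport()
         .getOrCreate())

updates = spark.read.parquet("hdfs:///ingest/trips/updates/latest_batch/")
affected = [r["datestr"] for r in updates.select("datestr").distinct().collect()]

existing = spark.table("rawdata.trips").where(col("datestr").isin(affected))

# Newest version of each trip wins (commit_ts = hypothetical last-updated column).
newest_first = Window.partitionBy("trip_id").orderBy(col("commit_ts").desc())
merged = (existing.unionByName(updates)
          .withColumn("rn", row_number().over(newest_first))
          .where(col("rn") == 1)
          .drop("rn"))

# Overwrite only the partitions present in `merged` (dynamic partition overwrite).
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
merged.write.mode("overwrite").insertInto("rawdata.trips")  # columns must match table order
```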

#3- Fresher Data: Plan

● Pick the file-level update approach
  ○ Establish all the machinery (custom InputFormat, Spark/Presto connectors)
  ○ Get latency down to 15 mins - 1 hour

● Record-level update approach, if needed
  ○ Study numbers from production
  ○ Switch will be transparent to consumers

● In summary,
  ○ Unlocks interactive SQL on the raw "nested" table at low latency


#4- Incremental Computation: Recurring Jobs

● State of the art: consume complete/immutable partitions
  - Determine when a partition/time window is complete
  - Trigger workflows waiting on that partition/time window

● As partition sizes shrink, more incoming data lands in old time buckets
  - With 1-min/10-min partitions, you keep getting new data for old time buckets

● Fundamental tradeoff
  - More latency => more completeness

#4- Incremental Computation: Use Cases

- Different apps => different needs
  a. Hadoop has one mode: “completeness”

- Apps must be able to choose (toy sketch below)
  a. job.trigger.atCompleteness(90)
  b. job.schedule.atInterval(10 mins)

- Closest thing out there: Google Cloud Dataflow

(Chart: apps plotted on completeness vs latency - incentive payments, fraud detection, backfill pipelines, business dashboards/ETL, data science, safety app - with latency needs ranging from < 1 hr to an hour, a day, or days.)
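The job.trigger.atCompleteness / job.schedule.atInterval calls on the slide are an aspiration rather than an existing API. The toy sketch below only illustrates the two trigger semantics (wait for estimated completeness vs. run on a fixed interval); every name in it is invented for the example.

```python
# Toy illustration of the two trigger semantics described on the slide; not a real
# scheduler API. `estimate_completeness` is a hypothetical callback supplied by the
# data platform (e.g. fraction of expected data that has arrived for a time window).
import time

def run_job(name: str) -> None:
    print(f"running {name} at {time.strftime('%X')}")

def trigger_at_completeness(name, window, estimate_completeness,
                            threshold=0.90, poll_secs=60):
    """Favor completeness: wait until the window looks complete enough, then run once."""
    while estimate_completeness(window) < threshold:
        time.sleep(poll_secs)
    run_job(name)

def schedule_at_interval(name, interval_secs=600, runs=3):
    """Favor latency: run every interval and accept whatever data has arrived."""
    for _ in range(runs):
        run_job(name)
        time.sleep(interval_secs)
```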

#4- Incremental Computation: Tailing Tables

● Adding a new style of consuming data
  - Obtain new records loaded into a table since the last run, across partitions
  - Consume in 15-30 min batches

● Favours latency
  - Provides new data quickly
  - Consumer logic is responsible for reconciliation with previous results

● Needs a special marker to denote the consumption point
  - commitTime: for each record, the time at which it was last updated
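A sketch of what tailing could look like for a consumer, assuming (as the slide proposes) that every record carries a commit-time column and that the consumer keeps its own checkpoint; the table, column, and checkpoint helpers are all made up.

```python
# Hedged sketch of tailing a table by commit time: pull only records written since
# the last run, across all partitions, then advance the checkpoint.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (SparkSession.builder
         .appName("table-tailing-sketch")
         .enableHiveSupport()
         .getOrCreate())

def read_checkpoint() -> str:
    # Stand-in: load the last consumed commit time from durable storage.
    return "2016-01-02 14:30:00"

def write_checkpoint(ts: str) -> None:
    # Stand-in: persist the new high-water mark once downstream work succeeds.
    pass

last_commit = read_checkpoint()

# New trips *and* updates to old trips, regardless of which date partition they sit in.
new_records = (spark.table("rawdata.trips")
               .where(col("commit_time") > last_commit))

# ... incremental consumer logic (e.g. refresh a modeled table) goes here ...

max_commit = new_records.agg({"commit_time": "max"}).collect()[0][0]
if max_commit is not None:
    write_checkpoint(str(max_commit))
```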

#4- Incremental Computation: Plan

● Add metadata at record level to enable tailing
  ○ Extra book-keeping to map commitTime to Hive partitions/files
    ■ Avoid disastrous full scans
  ○ Can be combined with granular Hive partitions if needed
    ■ 15-min Hive partitions => ~200K partitions for the trip table

● Open items:
  ○ Late-arrival handling
    ■ Tracking when a time window becomes complete
    ■ Design to (re)trigger workflows
  ○ Incrementally recomputing aggregates


Summary

Today
- Living, breathing data ecosystem
- Catch(-ing) up to the state of the art

Tomorrow
- Push the edges based on Uber's needs
  - Near-real-time warehouse
  - Incremental compute
  - All-Active
- Make every decision (human/machine) data driven

Of course, we are hiring ;)

Thank you!