Hadoop Strata talk - Uber, your Hadoop has arrived
TRANSCRIPT
Uber’s Mission
“Transportation as reliable as running water,
everywhere, for everyone”
400+ Cities 69 Countries
And growing...
Data @ Uber : Impact
1. City OPS
   ○ Data Users Operating a massive transportation system
2. Analysts & Execs
   ○ Marketing Spend, Forecasting
3. Engineers & Data Scientists
   ○ Fraud Detection & Discovery
4. Critical Business Operations
   ○ Incentive Payments/Background Checks
5. Fending Off Uber’s Legal/Regulatory Challenges
   ○ “You have to produce this data in X hours”
Data @ Uber : Circa 2014
[Architecture diagram. Sources: Kafka7 logs, Schemaless databases, RDBMS tables. Pipeline: uploader, Amazon S3, EMR, Wall-e ETL. Warehouse: Vertica. Consumers: applications (Incentive Payments, Machine Learning, Safety, Background Checks) and ad hoc SQL (City Ops/DOPS, Data Scientists).]
Pain Point #1: Data Reliability
- Free-form Python/Node objects -> heavily nested JSON
- Word-of-mouth schema communication
[Diagram: Producers (lots of engineers & lots of services) -> Data Team ($$$$ data pipeline) -> Consumers (lots of City OPS)]
Pain Point #2: System Scalability
- Kafka7: Heavy Topics/No HA
- Wall-e: Celery workers unable to keep up with Kafka/Schemaless
- Vertica Queries: More & More Raw Data piling on
[Figure panels: H1 2014; H2 2014 & beyond]
Pain Point #3: Fragile Ingestion Model
- Multiple fetching from sources
- Painful Backfills, since projections & transformation are in pipelines
[Diagram, shown twice for comparison: mezzanine feeding trips_table1, trips_table2, trips_table3 in the Warehouse, VS the same flow with a "DataPool?"]
Pain Point #4: No Multi-DC Support
- No Unified view of data, More complexity from consumer
- Wasteful use of WAN traffic
[Diagram: DC1, DC2, Global Warehouse]
Hadoop Data Lake: Pain, Pain Go Away!
- (Pain 1) Schematize All Data (old & new)
  - Heatpipe/Schema Service/Paricon
- (Pain 2) All Infrastructure Shall Scale Horizontally
  - Kafka8 & Hadoop
  - Streamific/Sqoop (Deliver data to HDFS)
  - Lizzie (Feed Vertica)/Komondor (Feed Hive)
- (Pain 3) Store raw data in nested glory in Hadoop
  - JSON -> Avro records -> Parquet!
- (Pain 4) Global View Of All Data
  - Unified tables! Yay!
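To make "Schematize All Data" concrete, here is a minimal sketch of checking free-form JSON events against a declared schema before they enter the pipeline. The schema layout, field names, and the `validate` helper are illustrative assumptions; they are not the Heatpipe or Schema Service API, which the slides do not show.

```python
# Hypothetical schema for illustration: field name -> (expected type, required?)
TRIP_EVENT_SCHEMA = {
    "trip_id": (str, True),
    "city_id": (int, True),
    "fare": (float, False),
}


def validate(record, schema):
    """Return a list of violations; an empty list means the record conforms."""
    errors = []
    for name, (expected_type, required) in schema.items():
        if name not in record:
            if required:
                errors.append(f"missing required field: {name}")
            continue
        if not isinstance(record[name], expected_type):
            errors.append(
                f"{name}: expected {expected_type.__name__}, "
                f"got {type(record[name]).__name__}"
            )
    # Fields nobody declared are exactly the "word-of-mouth" schema problem.
    for name in record:
        if name not in schema:
            errors.append(f"unknown field: {name}")
    return errors


good = validate({"trip_id": "t1", "city_id": 5}, TRIP_EVENT_SCHEMA)
bad = validate({"trip_id": "t1", "surge": 1.2}, TRIP_EVENT_SCHEMA)
```

Rejecting or quarantining records at the producer side is one way to keep nested JSON from drifting; a real deployment would more likely use Avro schemas, as the Pain 3 bullet suggests.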
Hadoop Ecosystem: Overview
[Architecture diagram. Sources: Kafka8 logs, Schemaless databases, SOA tables. Ingestion: Streamific, Sqoop (JSON, Avro). Hadoop: Hive (Parquet) holding flat tables and modeled tables, Komondor (Spark), ETL of modeled tables (Hive). Consumers: Janus, ad hoc SQL (web UI), Fraud (Hive), Machine Learning (Spark), Safety Apps (Spark), Backfill Pipelines (Spark). Outputs: Lizzie ETL (Spark) into Vertica; results written back to Hive, Kafka, NoSQL.]
Hadoop Ecosystem: Data Ingestion
[Diagram: Kafka logs and DB redo logs from DC1 and DC2 -> Streamific (streaming, duh..) -> row-based storage (HBase/SequenceFiles) on HDFS -> Komondor (batch) -> columnar storage (Parquet) on HDFS]
Hadoop Ecosystem: Streamific
Long-running service
- Backfills/Catch-up don’t hurt sources
Low-latency delivery into row-oriented storage
- HBase/HDFS Append**
Deployed/Monitored the ‘uber’ way
- Can run on DCs without YARN etc
Core (HA, Checkpointing, Ordering, Throttling, At-least-once guarantees) + Pluggable In/Out streams
- Akka (Peak 900MB/sec), Helix (300K partitions)
[Diagram: pluggable streams shown around the core: Kafka, Schema-less, S3, HBase, HDFS]
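The combination of checkpointing and at-least-once guarantees can be sketched as follows. This is a toy model, not Streamific's implementation: the `CheckpointedStream` class, the list standing in for a source partition, and the flaky sink are all assumptions made for illustration. The key idea is that the checkpoint advances only after a whole batch has been written, so a failure causes a replay (and possible duplicates) rather than data loss.

```python
class CheckpointedStream:
    """Toy in/out stream pair with a checkpoint (hypothetical, for illustration)."""

    def __init__(self, source, sink):
        self.source = source    # list of records, stand-in for one source partition
        self.sink = sink        # callable that writes one record downstream
        self.checkpoint = 0     # offset of the first record not yet confirmed

    def run(self, batch_size=2):
        while self.checkpoint < len(self.source):
            batch = self.source[self.checkpoint:self.checkpoint + batch_size]
            for record in batch:
                self.sink(record)           # may raise mid-batch
            self.checkpoint += len(batch)   # advance only after the full batch


delivered = []
calls = {"n": 0}


def flaky_sink(record):
    """Sink that fails exactly once, on its fourth call."""
    calls["n"] += 1
    if calls["n"] == 4:
        raise IOError("simulated sink failure")
    delivered.append(record)


stream = CheckpointedStream(["a", "b", "c", "d"], flaky_sink)
try:
    stream.run()
except IOError:
    stream.run()    # restart resumes from the last checkpoint
```

After the restart, record `c` has been delivered twice and nothing is missing, which is the at-least-once contract: downstream consumers need to tolerate or deduplicate replays.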
Hadoop Ecosystem: Komondor
The YARN/Spark Muscle
- Parquet writing is expensive
- 1->N mapping from raw to parquet/Hive table
Control Data Quality
- Schema Enforcement
- Cleaning JSON
- Hive Partitioning
File Stitching
- Keeps NN happy & queries performant
Let’s “micro batch”?
- HDFS iNotify stability issues
[Diagram: Kafka logs and DB changelogs, landed in HBase/HDFS, become Hive tables either by full dump (snapshot tables) or from new files (incremental tables)]
Snapshot tables
- Trips (partitioned by request date)
- User (partitioned by join date)
Incremental tables
- Kafka events (partitioned by event publish date)
- Transaction history (partitioned by charge date)
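File stitching, as described on this slide, amounts to merging many small files into fewer large ones so the NameNode tracks fewer objects and queries open fewer files. A minimal sketch of the planning step, assuming a simple greedy grouping by size (the function name and the greedy policy are illustrative assumptions, not Komondor's actual logic):

```python
def plan_stitching(file_sizes, target_bytes):
    """Greedily group (name, size) pairs into batches of at most target_bytes.

    A single file larger than target_bytes ends up in a batch by itself.
    """
    groups, current, current_size = [], [], 0
    for name, size in file_sizes:
        # Close the current batch if adding this file would overshoot the target.
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        groups.append(current)
    return groups


small_files = [("a", 40), ("b", 50), ("c", 30), ("d", 100), ("e", 10)]
plan = plan_stitching(small_files, target_bytes=100)
```

Each resulting group would then be rewritten as one output file. In practice the target would be set near the HDFS block size, and a smarter bin-packing strategy could reduce the number of undersized batches.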
Hadoop Ecosystem: Consuming Data
1. Adhoc SQL
   a. Gateway service => Janus
      i. Keep bad queries out!
      ii. Choose YARN queues
   b. Hits HiveServer2/Tez
2. Data Apps
   a. Spark/SparkSQL via HiveContext
   b. Support for saving results to Hive/Kafka
   c. Monitoring/Debugging the ‘uber’ way
3. Lightweight Apps
   a. Python apps hitting gateway
   b. Fetch small results via WebHDFS
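The two gateway duties listed for Janus (keeping bad queries out and choosing YARN queues) could look roughly like the sketch below. Everything here is hypothetical: the slides do not describe Janus's rules, so the "must filter on a partition column" check, the `datestr` column name, and the queue names are assumptions chosen only to illustrate the idea.

```python
# Hypothetical team -> YARN queue mapping (names are illustrative only).
QUEUE_BY_TEAM = {"city_ops": "adhoc", "data_science": "research"}


def admit(sql, team, partition_column="datestr"):
    """Return the YARN queue for a query, or raise ValueError to reject it.

    Assumed rule: a query must filter on the partition column, so it cannot
    trigger a scan of every partition in the table.
    """
    normalized = " ".join(sql.lower().split())
    parts = normalized.split(" where ", 1)
    if len(parts) < 2 or partition_column not in parts[1]:
        raise ValueError(f"query must filter on {partition_column}")
    return QUEUE_BY_TEAM.get(team, "default")


queue = admit("SELECT * FROM trips WHERE datestr = '2016-01-02'", "city_ops")
```

A string check like this is far cruder than parsing the query plan, but it shows where such a policy sits: in front of HiveServer2, before any cluster resources are allocated.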
Hadoop Ecosystem: Feeding Data-marts
Vertica
- SparkSQL/Oozie ETL framework to produce flattened tables
  - High Volume
  - Simple projections/row-level xforms
- HiveQL to produce well-modelled tables
  - + Complex joins
  - Also lands tables into Hive
Real-time Dashboarding
- Batch layer for lambda architecture
- Memsql/Riak as the real-time stores
Hadoop Ecosystem: Some Numbers
HDFS
- (Total) 4 PB in 1 HDFS Cluster
- (Growth) ~3 TB/day and growing
YARN
- 6300 VCores (total)
- Number of daily jobs - 60K
- Number of compute hours daily - 78K (3250 days)
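The "78K (3250 days)" figure is a unit conversion, and it can be checked directly. The utilization estimate in the second half is an added back-of-the-envelope calculation that assumes "compute hours" means vcore-hours; the slide does not say so.

```python
vcores_total = 6300
compute_hours_per_day = 78_000

# 78K hours of compute expressed in days, as on the slide.
compute_days = compute_hours_per_day / 24

# Assumption: compute hours are vcore-hours, so capacity is vcores * 24 per day.
vcore_hours_available = vcores_total * 24
utilization = compute_hours_per_day / vcore_hours_available

print(compute_days)             # 3250.0
print(round(utilization, 2))    # 0.52
```

Under that assumption the cluster runs at roughly half of its theoretical vcore capacity on an average day.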
Hadoop Ecosystem: 2015 Wins
1. Hadoop is source-of-truth for analytics data
   a. 100% of all analytics
2. Powered Critical Business Operations
   a. Partner Incentive Payments
3. Unlocked Data
   a. Data in Hadoop >> Data in Vertica
We (almost) caught up!
Hadoop Ecosystem: 2016 Challenges
1. Interactive SQL at Scale
   a. Put the power of data in our City Ops’s hands
2. All-Active
   a. Keep data apps working during failovers
3. Fresher Data in Hadoop
   a. Trips in Hive land in 6 hrs, but 1 hr in Vertica
4. Incremental Computation
   a. Most jobs run daily off raw tables
   b. Intra-hour jobs to build modeled tables
#1- Interactive SQL at Scale: Motivation
Vertica: Fast, but can’t cheaply scale
Hive: Powerful, scales reliably, but slowww….
#1- Interactive SQL at Scale: Presto
Fast
- (-er than SparkSQL, -errr than Hive-on-Tez)
Deployed at Scale
- (FB/Netflix)
Lack of UDF Interop
- Hive ⇔ Spark UDF interop is great!
Out-of-box Geo support
- ESRI/Magellan
Other Challenges:
- Heavy joins in 100K+ existing queries
  - Vertica degrades more gracefully
- Colocation With Hadoop
  - Network isolation
#1- Interactive SQL at Scale: Spark Notebooks
1. Great for data scientists!
   - Iterative prototyping/exploration
2. Zeppelin/JupyterHub on HDFS
   - Run off Mesos clusters
3. Of course, Spark Shell!
   - Pipeline troubleshooting
#1- Interactive SQL at Scale: Plan
1. Get Presto up & running
   - Work off “modelled tables” out of Hive
   - Equivalent of Vertica usage today
2. Presto on raw nested data
   - Raw data in Hive (will be) available at low latency
   - Uber’s scalable near real-time warehouse
3. Support Spark Notebook use cases
   - Similar QoS issues hitting HDFS from Mesos
#2- All-Active: Motivation
Low availability, SPOF
Data From All DCs replicated to single global data lake
Data copied in-out, high SLA
Assumes unbounded WAN links
Less Operational overhead
#2- All-Active: Plan**
Same HA as online services (you fail over, with data readily available)
Maintain N Hadoop Lakes?
[Diagram caption: Data is replicated to peer data centers and into global data lakes (solid lines).]
#2- All-Active: Challenges
1. Cross-DC replicator design
   - File Level vs Record Level Replication
2. Policy Management
   - Which data is worth replicating
   - Which fields are PII?
3. Reducing Storage Footprint
   - 9 copies!! (2 Local Lakes + 1 Global Lake = 3 * 3 times from HDFS)
   - Federation across WAN?
4. Capacity Management for Failover
   - Degraded mode or hot standby?
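The "9 copies" figure in item 3 follows from multiplying the number of lakes by the HDFS replication factor. A small sketch of that arithmetic; the `physical_bytes` helper and the 1 TB example are additions for illustration only:

```python
local_lakes = 2
global_lakes = 1
hdfs_replication = 3    # the "3 times from HDFS" on the slide

copies_per_byte = (local_lakes + global_lakes) * hdfs_replication


def physical_bytes(logical_bytes):
    """Physical storage consumed when every lake keeps a fully replicated copy."""
    return logical_bytes * copies_per_byte


one_tb = 10**12
print(copies_per_byte)                    # 9
print(physical_bytes(one_tb) / one_tb)    # 9.0
```

This is why the policy question in item 2 matters: every dataset excluded from replication saves its size nine times over.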
#3- Fresher Data in Hadoop: Motivation
1. Uber’s business is inherently ‘real-time’
   - Uber’s City Ops need fresh data to ‘debug’ Uber.
2. All the data is in Hadoop anyway
   - Reduce mindless data movement
3. Leverage power of existing SQL engines
   - Standard SQL interface & mature join support
Vertica
#3- Fresher Data: Trips on Hadoop (Today)
[Diagram: Schemaless (dc1) and Schemaless (dc2) stream cells via Streamific into HBase in Hadoop as rows (tripid => row). Changes to HBase (new/updated trip rows) reach Vertica by incremental update (10 mins, file uploads; 1 hr, tunable; ~1 hr overall) and reach the Hive trips (raw table) by a 6 hr snapshot. Table labels: trips (flat table), fact_trip (modeled table).]
Snapshot: Inefficient & Slow
a) Snapshot job => 100s of Mappers
b) Reads TBs, Writes TBs
c) But, just X GB actual data per day!!!!
#3- Fresher Data: Modelled Tables In Hadoop
[Diagram: Schemaless (dc1) and Schemaless (dc2) stream cells via Streamific into HBase in Hadoop as rows (tripid => row), with 10 min file uploads. Changes to HBase (new/updated trip rows) reach the Hive trips (raw table) by a 6 hr snapshot, then the Hive fact_trip (modelled table) by a 1-2 hr snapshot, ~7-8+ hrs overall. Vertica is shown alongside with its own fact_trip (modelled table).]
Latency & Inefficiency worsen further
a) Spark/Presto on modelled tables goes from 1-2 hrs to 7-8 hrs!!
b) Resource usage shoots up
#3- Fresher Data: Let’s incrementally update?
[Diagram: Schemaless (dc1) and Schemaless (dc2) stream cells via Streamific into HBase in Hadoop as rows (tripid => row), with 10 min file uploads. Changes to HBase (new/updated trip rows) reach the Hive trips (raw table) by a 30 min incremental update, and the Hive trips (modelled table) by another 30 min incremental update. Latency labels: < 1 hr, ~1 hr.]
So Problem Solved, right?
a) Same pattern as Vertica load
b) Saves a bunch of resources
c) And shrinks down latency.
#3- Fresher Data: HDFS/Hive Updates are tedious
[Diagram: Schemaless (dc1) and Schemaless (dc2) stream cells via Streamific into HBase in Hadoop as rows (tripid => row), with 10 min file uploads. Changes to HBase (new/updated trip rows) reach the Hive trips (raw table) and the Hive trips (modelled table) by 30 min incremental updates. The write into Hive is marked "Update!".]
So Problem Solved, right?
Except: Update!
#3- Fresher Data: Trip Updates Problem
Raw Trips Table in Hive
[Diagram: day-level partitions labelled 2010-2014, 2015/(01-05), 2015/(06-11), 2015/12/(01-31), 2016/01/02, 2016/01/03. An incremental update carrying new/updated trips from the last 1 hr touches several partitions. Legend: New Data, Updated Data, Unaffected Data.]
#3- Fresher Data: HDFS/Hive Updates are tedious
[Diagram: Schemaless (dc1) and Schemaless (dc2) stream cells via Streamific into HBase in Hadoop as rows (tripid => row), with 10 min file uploads. Changes to HBase (new/updated trip rows) reach the Hive trips (raw table) and the Hive trips (modelled table) by 30 min incremental updates. The write into Hive is marked "Update!".]
So Problem Solved, right?
Yes, except… Update!
Good News: Solve this & everything becomes
#3- Fresher Data: Solving Updates
1. Simple Folder/Partition Re-writing
   - Most commonly used approach in Hadoop land
2. File Level Updates
   - Similar to 1, but at file level
3. Record Level Updates
   - Feels like a k-v store on Parquet (and thus more complex)
   - Similar to Kudu/Hive transactions
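The file-level approach (option 2) can be sketched with in-memory stand-ins for files: only the files that contain an updated record are rewritten, and everything else is reused untouched. The data layout, the function name, and the way new records get their own file are assumptions made for illustration; the slides do not specify the mechanism.

```python
def apply_file_level_updates(files, updates):
    """Apply record updates by rewriting only the affected files.

    files:   {file_name: {record_key: row}}  (stand-in for files in a partition)
    updates: {record_key: row}
    Returns (new_files, rewritten_file_names).
    """
    key_to_file = {key: name for name, rows in files.items() for key in rows}
    touched, inserts = {}, {}
    for key, row in updates.items():
        name = key_to_file.get(key)
        if name is None:
            inserts[key] = row                      # brand-new record
        else:
            touched.setdefault(name, {})[key] = row

    new_files = dict(files)                         # untouched files are reused as-is
    for name, changed in touched.items():
        new_files[name] = {**files[name], **changed}    # rewrite the whole file
    if inserts:
        new_files[f"new-{len(files)}"] = inserts    # new records land in a new file
    return new_files, set(touched)


files = {"f1": {"t1": 10, "t2": 20}, "f2": {"t3": 30}}
new_files, rewritten = apply_file_level_updates(files, {"t2": 25, "t9": 90})
```

Compared with rewriting the whole partition, the cost scales with the number of files touched rather than the partition size. The lookup from record key to file is the piece a real system has to maintain efficiently.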
#3- Fresher Data: Plan
● Pick File Level Update approach
  ○ Establish all the machinery (Custom InputFormat, Spark/Presto Connectors)
  ○ Get latency down to 15 mins - 1 hour
● Record Level Update approach, if needed
  ○ Study numbers from production
  ○ Switch will be transparent to consumers
● In Summary,
  ○ Unlocks interactive SQL on raw “nested” table at low latency
#4- Incremental Computation: Recurring Jobs
● State of the art: Consume Complete/Immutable Partitions
  - Determine when partition/time window is complete
  - Trigger workflows waiting on that partition/time window
● As partition size shrinks, more incoming data lands in old time buckets
  - With 1-min/10-min partitions, keep getting new data for old time buckets
● Fundamental tradeoff
  - More Latency => More Completeness
#4- Incremental Computation: Use Cases
- Diff apps => Diff needs
  a. Hadoop has one mode: “completeness”
- Apps must be able to choose
  a. job.trigger.atCompleteness(90)
  b. job.schedule.atInterval(10 mins)
- Closest thing out there - Google Cloud Dataflow
[Chart: Completeness vs. Latency. Scale labels: Days, Day, Hour, < 1 hr. Apps plotted: Incentive payments, Fraud Detection, Backfill Pipelines, Business Dashboards/ETL, Data Science, Safety App.]
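The two trigger styles named on this slide, `job.trigger.atCompleteness(90)` and `job.schedule.atInterval(10 mins)`, are proposed API shapes rather than an existing library. A minimal sketch of the decision each one implies, with all function and parameter names being hypothetical:

```python
def should_run(mode, *, completeness_pct=0.0, minutes_since_last_run=0.0,
               threshold_pct=90.0, interval_minutes=10.0):
    """Decide whether a job should fire under one of two trigger styles.

    "completeness": wait until enough of the time window's data has arrived.
    "interval":     run on a fixed cadence, whatever has arrived so far.
    """
    if mode == "completeness":
        return completeness_pct >= threshold_pct
    if mode == "interval":
        return minutes_since_last_run >= interval_minutes
    raise ValueError(f"unknown trigger mode: {mode}")


# An incentive-payments style job waits for completeness ...
payments_ready = should_run("completeness", completeness_pct=85.0)
# ... while a dashboard style job fires on schedule with partial data.
dashboard_ready = should_run("interval", minutes_since_last_run=12.0)
```

The point of exposing both is the tradeoff on the chart: jobs that favour completeness accept latency, and jobs that favour latency accept partial data and must reconcile later.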
#4- Incremental Computation: Tailing Tables
● Adding a new style of consuming data
  - Obtain new records loaded into table, since last run, across partitions
  - Consume in 15-30 min batches
● Favours Latency
  - Providing new data quickly
  - Consumer logic responsible for reconciliation with previous results
● Need a special marker to denote consumption point
  - commitTime: For each record, the time at which it was last updated
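Tailing a table with a `commitTime` marker can be sketched as a filter plus a saved consumption point. The record layout and the `tail` function are illustrative assumptions; a naive scan like this is exactly what the record-to-file bookkeeping in the plan is meant to avoid at scale.

```python
def tail(table, last_commit_time):
    """Return records committed after last_commit_time and the new marker.

    Records are matched on commitTime regardless of which partition they
    live in, so late updates to old partitions are picked up too.
    """
    new_records = [r for r in table if r["commitTime"] > last_commit_time]
    new_marker = max((r["commitTime"] for r in new_records),
                     default=last_commit_time)
    return new_records, new_marker


table = [
    {"trip_id": "t1", "partition": "2016-01-02", "commitTime": 100},
    {"trip_id": "t2", "partition": "2015-12-30", "commitTime": 130},  # late update
    {"trip_id": "t3", "partition": "2016-01-03", "commitTime": 160},
]

batch, marker = tail(table, last_commit_time=120)
next_batch, next_marker = tail(table, last_commit_time=marker)
```

The first call returns `t2` and `t3` even though `t2` sits in an older partition. The second call returns nothing and leaves the marker unchanged, so the consumer can poll safely every 15-30 minutes.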
#4- Incremental Computation: Plan
● Add Metadata at Record-level to enable tailing
  ○ Extra book-keeping to map commitTime to Hive Partitions/Files
    ■ Avoid disastrous full scans
  ○ Can be combined with granular Hive partitions if needed
    ■ 15 min Hive Partitions => ~200K partitions for trip table
● Open Items:
  ○ Late arrival handling
    ■ Tracking when a time-window becomes complete
    ■ Design to (re)trigger workflows
  ○ Incrementally Recomputing aggregates
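The "~200K partitions" estimate for 15-minute Hive partitions can be sanity-checked with simple arithmetic. The slide gives only the total; the conversion to years of history below is an added calculation, not a figure from the talk.

```python
minutes_per_partition = 15
partitions_per_day = 24 * 60 // minutes_per_partition    # 96

estimated_partitions = 200_000
days_of_history = estimated_partitions / partitions_per_day
years_of_history = days_of_history / 365

print(partitions_per_day)            # 96
print(round(years_of_history, 1))    # 5.7
```

Roughly 5.7 years of 15-minute buckets gives 200K partitions, which is in line with trip data going back to around 2010. Day-level partitioning over the same span would need only about 2,100 partitions, which is why granular partitions are listed as optional.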
Summary
Today
- Living, breathing data ecosystem
- Catch(-ing) up to the state of the art
Tomorrow
- Push edges based on Uber’s needs
  - Near Real-time Warehouse
  - Incremental Compute
  - All-Active
- Make Every Decision (Human/Machine) data driven
Of course, we are hiring ;)