SQL on Hadoop in Taiwan
http://2014.hadoopcon.org/wp/?p=10
Masahiro Nakagawa, Sep 13, 2014
Hadoop Meetup in Taiwan
SQL on Hadoop: a Perspective of a Cloud-based, Managed Service Provider
Today’s agenda
> Self introduction > Why SQL? > Hive > Presto > Conclusion
Who are you?
> Masahiro Nakagawa > github/twitter: @repeatedly
> Treasure Data, Inc. > Senior Software Engineer > Fluentd / td-agent developer
> I love OSS :) > D language - Phobos committer > Fluentd - main maintainer > MessagePack / RPC - D and Python (RPC only) > The organizer of Presto Source Code Reading > etc…
Do you love SQL?
Why do we love SQL?
> Easy to understand what we are doing > declarative language > common interface for data manipulation
> There are many users > SQL is not the best, but it's better than uncommon interfaces
We want to use SQL in the Hadoop world
SQL Players on Hadoop
> Batch (latency: minutes to hours): Hive, Spark SQL
> Short batch / low latency (latency: seconds to minutes): Presto, Impala, Drill, HAWQ, Actian, etc.
> Stream (latency: immediate): Norikra, StreamSQL
> Batch and short batch are a red ocean; stream may be a blue ocean?
(On the slides, color indicates a commercial product.)
3 query engines at Treasure Data
> Hive (batch) > for ETL and scheduled reporting
> Presto (short batch / low latency) > for ad hoc queries
> Pig > Not SQL > There aren’t as many users… ;(
Today’s talk: Hive and Presto

Hive
https://hive.apache.org/
What’s Hive?
> Needs no explanation ;)
> Most popular project in the ecosystem > HiveQL and MapReduce
> Writing MapReduce code is hard > Hive is evolving rapidly thanks to the Stinger initiative
> Vectorized Processing > Query optimization with statistics > Tez instead of MapReduce > etc…
Apache Tez
http://tez.apache.org/
> Low level framework for YARN applications > Next generation query engine > Provides a good IR for Hive, Pig and more
> Task and DAG based pipelining > Spark uses a similar DAG model
(Diagram: a task is a Processor with Inputs and Outputs; tasks compose into a DAG.)
Hive on MR vs. Hive on Tez
MapReduce Tez
http://www.slideshare.net/Hadoop_Summit/w-235phall1pandey/9
(Diagram: on MapReduce the query runs as a chain of MR jobs, each writing intermediate results to HDFS between one job's reduce and the next job's map; on Tez the same query is a single DAG of map and reduce tasks.)
Tez can avoid unnecessary HDFS writes.
SELECT g1.x, g1.avg, g2.cnt FROM (SELECT a.x, AVG(a.y) AS avg FROM a GROUP BY a.x) g1 JOIN (SELECT b.x, COUNT(b.y) AS cnt FROM b GROUP BY b.x) g2 ON (g1.x = g2.x) ORDER BY avg;
(Plan: GROUP a BY a.x and GROUP b BY b.x feed JOIN (a, b), followed by ORDER BY; with Tez, the GROUP BY a.x and the JOIN can be fused into one stage instead of separate MR jobs.)
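As a rough illustration (not Presto or Hive code), the query above can be traced in plain Python, with each step playing the role of one DAG vertex; the tables `a` and `b` and their rows are toy data invented here:

```python
from collections import defaultdict

# Toy rows standing in for tables a(x, y) and b(x, y).
a = [("k1", 10), ("k1", 20), ("k2", 30)]
b = [("k1", 1), ("k2", 2), ("k2", 3)]

# Vertex 1: GROUP a BY a.x, computing AVG(a.y).
sums = defaultdict(lambda: [0, 0])               # x -> [sum, count]
for x, y in a:
    sums[x][0] += y
    sums[x][1] += 1
g1 = {x: s / n for x, (s, n) in sums.items()}    # x -> avg

# Vertex 2: GROUP b BY b.x, computing COUNT(b.y).
g2 = defaultdict(int)
for x, _ in b:
    g2[x] += 1

# Vertex 3: JOIN g1 and g2 ON x, then ORDER BY avg.
joined = [(x, g1[x], g2[x]) for x in g1 if x in g2]
result = sorted(joined, key=lambda row: row[1])
print(result)  # [('k1', 15.0, 1), ('k2', 30.0, 2)]
```

The point of the DAG view is that vertices 1 and 2 are independent and can run in parallel before the join.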
Why still use MapReduce?
> The emphasis is on stability / reliability
> Speed is important but not most important > Can use a MPP query engine for short batch
> Tez/Spark are immature > Hard to manage in a multi-tenant env > Different failure models > We are now testing Tez for Hive
• No code change needed for Hive. Spark is hard… • Disabling Tez is easy. Just remove ‘set hive.execution.engine=tez;’
Presto
http://prestodb.io/
What’s Presto?
A distributed SQL query engine for interactive data analysis against GBs to PBs of data.
Presto’s history
> 2012 Fall: Project started at Facebook > Designed for interactive query
with speed of commercial data warehouse
> and scalability to the size of Facebook > 2013 Winter: Open sourced! > 30+ contributors in 6 months
> including people outside of Facebook
What problems does it solve?
> We couldn’t visualize data in HDFS directly using dashboards or BI tools > because Hive is too slow (not interactive) > or ODBC connectivity is unavailable/unstable
> We needed to store daily-batch results in an interactive DB (PostgreSQL, Redshift, etc.) for quick responses > interactive DBs cost more and are less scalable
> Some data are not stored in HDFS > we need to copy the data into HDFS to analyze it
(Diagram: a batch analysis platform, HDFS plus Hive running daily/hourly batches, feeds its results to PostgreSQL etc. on a separate visualization platform, which commercial BI tools and dashboards query interactively.)
Problems with the two-platform setup:
✓ Less scalable ✓ Extra cost
✓ More work to manage 2 platforms
✓ Can’t query against “live” data directly
(Diagram: a single data analysis platform. HDFS plus Hive still runs the daily/hourly batches, while Presto serves interactive queries directly against HDFS and, via connectors, Cassandra, MySQL and commercial DBs: SQL on any data sets.)
Commercial BI tools work on top of it:
✓ IBM Cognos ✓ Tableau ✓ ...
dashboard on chart.io: https://chartio.com/
What can Presto do?
> Query interactively (in milliseconds to minutes) > MapReduce and Hive are still necessary for ETL
> Query using commercial BI tools or dashboards > Reliable ODBC/JDBC connectivity
> Query across multiple data sources such as Hive, HBase, Cassandra, or even commercial DBs > Plugin mechanism
> Integrate batch analysis + visualizationinto a single data analysis platform
Presto’s deployment> Facebook
> Multiple geographical regions > scaled to 1,000 nodes > actively used by 1,000+ employees > processing 1PB/day
> Netflix, Dropbox, Treasure Data, Airbnb, Qubole, LINE, GREE, Scaleout, etc
> Presto as a Service > Treasure Data, Qubole
Distributed architecture
(Diagram: a Client talks to the Coordinator; the Coordinator finds Workers through a Discovery Service; a Connector Plugin sits between Presto and the Storage / Metadata.)
1. Client sends a query using HTTP
2. Coordinator builds a query plan > the connector plugin provides metadata (table schema, etc.)
3. Coordinator sends tasks to workers
4. Workers read data through the connector plugin
5. Workers run tasks in memory and in parallel
6. Client gets the result from a worker
What are connectors?
> Access to storage and metadata > provide table schemas to coordinators > provide table rows to workers
> Connectors are pluggable in Presto > written in Java
> Implementations: > Hive connector > Cassandra connector > MySQL connector through JDBC (prerelease) > Or your own connector
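Presto's real connector SPI is a set of Java interfaces; purely as an illustrative sketch, the two roles described above (metadata for the coordinator, rows for the workers) might look like this in Python, with a made-up `ToyConnector` class and toy data:

```python
# Illustrative sketch only: not Presto's actual Java SPI.
class ToyConnector:
    """Serves table metadata to the coordinator and rows to workers."""

    def __init__(self, tables):
        self._tables = tables  # name -> (schema, rows)

    # Called on the coordinator while planning a query.
    def get_table_schema(self, name):
        return self._tables[name][0]

    # Called on a worker while executing a table-scan split.
    def scan(self, name):
        yield from self._tables[name][1]

conn = ToyConnector({
    "impressions": (("name varchar", "time bigint"),
                    [("ad1", 1), ("ad2", 2)]),
})
print(conn.get_table_schema("impressions"))
print(list(conn.scan("impressions")))
```

Because both roles hide the underlying store behind the same small interface, swapping HDFS for Cassandra is a matter of plugging in a different implementation.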
(Diagram: with the Hive connector, workers read from HDFS and the Hive Metastore; with the Cassandra connector, they read from Cassandra; in every case the Discovery Service is used to find servers in a cluster. Multiple connectors, Hive, Cassandra and others, can serve a single query across different data sources.)
Distributed architecture
> 3 types of servers: > coordinator, worker, discovery service
> Gets data/metadata through connector plugins > Presto is NOT a database > Presto provides SQL on top of existing data stores
> Client protocol is HTTP + JSON > Language bindings: Ruby, Python, PHP, Java (JDBC), R, Node.js...
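A minimal sketch of that HTTP + JSON protocol, using only the standard library: the client POSTs the SQL text to `/v1/statement` and then follows `nextUri` links until none remains. The endpoint and `X-Presto-User` header follow Presto's REST API; the host name is a placeholder:

```python
import json
import urllib.request

def statement_request(host, sql, user="example-user"):
    # First request of a query: the body is just the SQL text.
    return urllib.request.Request(
        f"http://{host}/v1/statement",
        data=sql.encode(),
        headers={"X-Presto-User": user},
        method="POST",
    )

def run_query(host, sql):
    # Poll the coordinator/workers, accumulating rows from each page.
    rows = []
    req = statement_request(host, sql)
    while req is not None:
        with urllib.request.urlopen(req) as resp:
            body = json.load(resp)
        rows.extend(body.get("data", []))
        next_uri = body.get("nextUri")
        req = urllib.request.Request(next_uri) if next_uri else None
    return rows

# Usage (against a real cluster): run_query("presto.example.com:8080", "SELECT 1")
```

This paging design is why results stream back to the client as soon as a worker produces them.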
Query Execution
Presto’s execution model
> Presto is NOT MapReduce > Use its own execution engine
> Presto’s query plan is based on DAG > more like Apache Tez / Spark or
traditional MPP databases > Impala and Drill use a similar model
How does a query run?
> Coordinator > SQL Parser > Query Planner > Execution planner
> Workers > Task execution scheduler
Pipeline: SQL → SQL Parser → AST → Logical Planner (with the Optimizer, using table schema metadata from the connector) → Logical Query Plan → Distributed Planner → Distributed Query Plan → Execution Planner (using the node list from the Discovery Service / NodeManager) → Execution Plan

Today's talk focuses on the Query Planner.

Query Planner
SELECT name, count(*) AS c FROM impressions GROUP BY name
Table schema:
impressions (name varchar, time bigint)

Logical query plan:
Table scan (name:varchar) → GROUP BY (name, count(*)) → Output (name, c)

Distributed query plan:
Table scan → Partial aggr → Sink | Exchange → Final aggr → Sink | Exchange → Output
Query Planner - Stages
Stage-2: Table scan → Partial aggr → Sink (pipelined aggregation)
(inter-worker data transfer)
Stage-1: Exchange → Final aggr → Sink
(inter-worker data transfer)
Stage-0: Exchange → Output
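The partial-aggr / final-aggr split above can be sketched in a few lines of Python: each worker computes partial counts over its own splits, the exchange hands them to the final aggregation, which merges them (two simulated workers and toy rows, invented for illustration):

```python
from collections import Counter

# Rows of the impressions table, pre-assigned to two simulated workers.
worker_splits = [
    ["ad1", "ad2", "ad1"],   # worker 1's splits
    ["ad2", "ad2", "ad3"],   # worker 2's splits
]

# Stage-2 on each worker: table scan + partial aggregation.
partials = [Counter(split) for split in worker_splits]

# Exchange + Stage-1: merge the partial counts into final counts.
final = Counter()
for p in partials:
    final.update(p)

# Stage-0: output (name, c), like the GROUP BY query above.
print(sorted(final.items()))  # [('ad1', 2), ('ad2', 3), ('ad3', 1)]
```

The win is that only the small per-worker count tables cross the network, never the raw scanned rows.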
Execution Planner
+ Node list ✓ 2 workers
(Diagram: the stage pipelines are instantiated per worker. Worker 1 and Worker 2 each run Table scan → Partial aggr → Sink and Exchange → Final aggr → Sink, and a single Exchange → Output collects the result.)
Execution Planner - Tasks
> 1 task / worker / stage ✓ All tasks run in parallel
(Diagram: Worker 1 and Worker 2 each run one task per stage.)
Execution Planner - Splits
(Diagram: a split is the unit of work within a task.)
> Table scan: many splits / task = many threads / worker
> Intermediate stages: 1 split / task = 1 thread / worker
> Output: 1 split / worker = 1 thread / worker
All stages are pipelined ✓ No wait time ✓ No fault-tolerance
MapReduce vs. Presto
MapReduce: map → disk → reduce → disk → map → disk → reduce
✓ Writes data to disk ✓ Waits between stages
Presto: task → task → task with memory-to-memory data transfer
✓ No disk IO ✓ Data chunk must fit in memory
Query Execution
> SQL is converted into stages, tasks and splits > All tasks run in parallel
> No wait time between stages (pipelined) > If one task fails, all tasks fail at once (the query fails)
> Memory-to-memory data transfer > No disk IO > If aggregated data doesn’t fit in memory, the query fails
• Note: the query dies but the worker doesn’t; memory consumption of all queries is fully managed
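The pipelined, no-wait execution can be illustrated with Python generators, a loose analogy rather than Presto's actual code: each stage pulls rows from the previous one the moment they are produced, with nothing materialized to disk between stages:

```python
# Each "stage" is a generator: rows stream scan -> filter -> project
# without being written out between stages, loosely like Presto's
# memory-to-memory exchanges (and unlike MapReduce's disk barriers).

def table_scan(rows):
    for row in rows:            # source stage
        yield row

def filter_stage(rows):
    for name, clicks in rows:   # runs as soon as scan yields a row
        if clicks > 0:
            yield (name, clicks)

def project_stage(rows):
    for name, _clicks in rows:  # keep only the name column
        yield name

rows = [("ad1", 3), ("ad2", 0), ("ad3", 7)]
pipeline = project_stage(filter_stage(table_scan(rows)))
print(list(pipeline))  # ['ad1', 'ad3']
```

The same structure also shows the trade-off from the slide: if any stage raised, the whole chain would die, and everything in flight lives in memory.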
Why choose Presto?
> The ease of operations > Easy to deploy. Just drop a jar > Easy to extend its functionalities
• Pluggable and DI based loose coupling > Doesn’t crash when a query fails
> Standard SQL syntax > Important for existing DB/DWH users > HiveQL is for MapReduce, not MPP DB
Our customer use cases (Hive vs. Presto)
> Online Ad
> Hive: scheduled reporting for customers, once every hour
> Presto: checking ad-network performance, delivery logic optimization in realtime
> Web/Social
> Hive: scheduled reporting for management, computing KPIs
> Presto: aggregation for user support, measuring the effect of user campaigns
> Retail
> Hive: scheduled reporting for website, PoS and touch panel data (hard deadlines!)
> Presto: ad-hoc queries for basket analysis, aggregating data for product development
Conclusion
Batch summary
> MapReduce-based Hive is still the default choice > Stable & Lots of shared experience and knowledge
> Hive with Tez is for Hadoop users > No code change needed > HDP includes Tez by default
> Spark and Spark SQL are a good alternative > Can’t reuse Hadoop knowledge > Mainly for in-memory processing for now
Short batch summary
> Presto is a good default choice > easy to manage and has useful features
> Need faster queries? Try Impala
> for HDFS and HBase > CDH includes Impala by default
> If you are a challenger, check out Drill > The project’s goal is ambitious > The status is developer preview
Stream summary
> Fluentd and Norikra > Fluentd is for robust log collection > Norikra is for SQL based CEP
!
> StreamSQL > for Spark users > Current status is POC
Lastly…
> Use different engines for different requirements
> Hadoop/Spark for batch jobs > MapReduce won't die for the time being
> MPP query engines for interactive queries > These engines will be integrated into one system in the future > Batch engines now use DAG pipelines > Short batch will support task recovery
The differences will become minimal.
Enjoy SQL!
Check: treasuredata.com
Cloud service for the entire data pipeline, including Presto