Big SQL v3.0 - Manchester Meetup - 12-Aug-2014
© 2014 IBM Corporation
Big SQL 3.0
Data warehouse-grade SQL on Hadoop
Scott C. Gray (sgray@us.ibm.com), Big SQL Architect, IBM InfoSphere BigInsights
12-Aug-2014
Please Note
IBM’s statements regarding its plans, directions, and intent are subject to change
or withdrawal without notice at IBM’s sole discretion.
Information regarding potential future products is intended to outline our general
product direction and it should not be relied on in making a purchasing decision.
The information mentioned regarding potential future products is not a
commitment, promise, or legal obligation to deliver any material, code or
functionality. Information about potential future products may not be incorporated
into any contract. The development, release, and timing of any future features or functionality described for our products remains at our sole discretion.
Performance is based on measurements and projections using standard IBM
benchmarks in a controlled environment. The actual throughput or performance
that any user will experience will vary depending upon many factors, including
considerations such as the amount of multiprogramming in the user’s job stream,
the I/O configuration, the storage configuration, and the workload processed. Therefore, no assurance can be given that an individual user will achieve results
similar to those stated here.
Agenda
Brief introduction to Hadoop
Why SQL on Hadoop?
SQL-on-Hadoop landscape
What is Hive?
Big SQL 3.0
• What is it?
• SQL capabilities
• Architecture
• Application portability and integration
• Enterprise capabilities
• Performance
Conclusion
What is Hadoop?
Hadoop is not a single piece of software; you can't install "hadoop"
It is an ecosystem of software packages that work together
• Hadoop Core (API's)
• HDFS (File system)
• MapReduce (Data processing framework)
• Hive (SQL access)
• HBase (NoSQL database)
• Sqoop (Data movement)
• Oozie (Job workflow)
• … there is a LOT of "Hadoop" software
However, there is one common component they all build on: HDFS…
• *Not exactly 100% true but 99.999% true
The Hadoop Filesystem (HDFS)
Driving principles
• Files are stored across the entire cluster
• Programs are brought to the data, not the data to the program
Distributed file system (DFS) stores blocks across the whole cluster
• Blocks of a single file are distributed across the cluster
• A given block is typically replicated as well for resiliency
• Just like a regular file system, the contents of a file are up to the application
[Figure: a logical file divided into blocks 1-4; each block is replicated on several nodes of the cluster]
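The block-splitting and replication idea above can be sketched in miniature. This is a toy placement routine, not HDFS's actual policy (real HDFS uses large blocks, e.g. 128 MB, and rack-aware placement); the block size, node names, and round-robin assignment here are illustrative assumptions:

```python
# Toy sketch of HDFS-style block placement (illustrative only).
BLOCK_SIZE = 8          # bytes per block, tiny for demonstration
REPLICATION = 3         # copies kept of each block

def split_into_blocks(data, block_size=BLOCK_SIZE):
    """Divide a file's bytes into fixed-size blocks."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]

def place_blocks(blocks, nodes, replication=REPLICATION):
    """Assign each block to `replication` distinct nodes, round-robin."""
    placement = {}
    for b in range(len(blocks)):
        placement[b] = [nodes[(b + r) % len(nodes)] for r in range(replication)]
    return placement

blocks = split_into_blocks(b"10110100101001001110011111100101")
layout = place_blocks(blocks, nodes=["node1", "node2", "node3", "node4"])
# Every block lives on 3 of the 4 nodes, so losing any single node
# still leaves at least two copies of every block.
```

The point of the sketch: resiliency falls out of replication, and (next slide) parallelism falls out of knowing where each block lives.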
Data processing on Hadoop
Hadoop (HDFS) doesn't dictate file content/structure
• It is just a filesystem!
• It looks and smells almost exactly like the filesystem on your laptop
• Except, you can ask it "where does each block of my file live?"
The entire Hadoop ecosystem is built around that question!
• Parallelize work by sending your programs to the data
• Each copy processes a given block of the file
• Other nodes may be chosen to aggregate together computed results
[Figure: a logical file divided into splits; App(Read) instances on the nodes holding each split read in parallel, and an App(Compute) node aggregates the result]
Hadoop MapReduce
MapReduce is a way of writing parallel processing programs
• Leverages the design of the HDFS filesystem
Programs are written in two pieces: Map and Reduce
Programs are submitted to the MapReduce job scheduler (JobTracker)
• The scheduler looks at the blocks of input needed for the job (the "splits")
• For each split, tries to schedule the processing on a host holding the split
• Hosts are chosen based upon available processing resources
Program is shipped to a host and given a split to process
Output of the program is written back to HDFS
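The Map and Reduce pieces can be sketched with the canonical word-count example (not from the deck; the shuffle is simulated here with a plain dictionary rather than the framework's sort/copy/merge machinery):

```python
from collections import defaultdict

def map_fn(split):
    """Mapper: parse one split of text and emit <key, value> pairs."""
    for word in split.split():
        yield (word, 1)

def reduce_fn(key, values):
    """Reducer: aggregate all of the values observed for one key."""
    return (key, sum(values))

# The framework's shuffle, simulated: group mapper output by key.
splits = ["the quick brown fox", "the lazy dog the end"]
grouped = defaultdict(list)
for split in splits:                 # each mapper processes one split
    for key, value in map_fn(split):
        grouped[key].append(value)

counts = dict(reduce_fn(k, vs) for k, vs in grouped.items())
# counts["the"] == 3
```

In real MapReduce the two loops run on different machines: mappers on the nodes holding the splits, reducers wherever the scheduler places them.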
MapReduce - Mappers
Mappers
• Small program (typically), distributed across the cluster, local to data
• Handed a portion of the input data (called a split)
• Each mapper parses, filters, or transforms its input
• Produces grouped <key,value> pairs
[Figure: the map phase - each split of the logical input file is processed by a mapper, which sorts its grouped output; reducers later copy and merge that output and write logical output files to DFS]
MapReduce – The Shuffle
The shuffle is transparently orchestrated by MapReduce
The output of each mapper is locally grouped together by key
One node is chosen to process data for each unique key
[Figure: the shuffle - each mapper's sorted output is copied to the reducers and merged, so that every reducer receives all of the values for the keys it owns]
MapReduce - Reduce
Reducers
• Small programs (typically) that aggregate all of the values for the key that they are responsible for
• Each reducer writes output to its own file
[Figure: the reduce phase - each reducer merges its input, aggregates the values for its keys, and writes its own logical output file to DFS]
Why SQL for Hadoop?
Hadoop is designed for any data
• Doesn't impose any structure
• Extremely flexible
At lowest levels is API based
• Requires strong programming
expertise
• Steep learning curve
• Even simple operations can be tedious
Yet many, if not most, use cases deal with structured data!
• e.g. aging old warehouse data into a queryable archive
Why not use SQL where its strengths shine?
• Familiar widely used syntax
• Separation of what you want vs. how to get it
• Robust ecosystem of tools
[Figure: three BigInsights use cases - (1) pre-processing hub: BigInsights as the landing zone for all data, feeding the data warehouse; (2) query-able archive, integrated with the data warehouse via information integration; (3) exploratory analysis, where BigInsights can combine warehouse data with unstructured information, with Streams for real-time processing]
SQL-on-Hadoop landscape
The SQL-on-Hadoop landscape is changing rapidly!
They all have their different strengths and weaknesses
Many, including Big SQL, draw their basic designs from Hive…
Then along came Hive
Hive was the first SQL interface for Hadoop data
• De facto standard for SQL on Hadoop
• Ships with all major Hadoop distributions
SQL queries are executed using MapReduce (today)
Hive introduced several important concepts/components…
Hive tables
In most cases, a table is simply a directory (on HDFS) full of files
Hive doesn't dictate the content/structure of these files
• It is designed to work with existing user data
• In fact, there is no such thing as a "Hive table"
• In Hive, Java classes define a "table"
/biginsights/hive/warehouse/myschema.db/mytable/
file1
file2
…
CREATE TABLE my_strange_table (
c1 string,
c2 timestamp,
c3 double
)
ROW FORMAT SERDE "com.myco.MyStrangeSerDe"
WITH SERDEPROPERTIES ( "timestamp.format" = "mm/dd/yyyy" )
INPUTFORMAT "com.myco.MyStrangeInputFormat"
OUTPUTFORMAT "com.myco.MyStrangeOutputFormat"
InputFormat and SerDe
InputFormat – Hadoop concept
• Defines a java class that can read from a particular data source
– E.g. file format, database connection, region servers, web servers, etc.
• Each InputFormat produces its own record format as output
• Responsible for determining splits: how to break up the data from the data source so that work can be split up between mappers
• Each table defines an InputFormat (in the catalog) that understands the
table’s file structure
SerDe (Serializer/Deserializer) – Hive concept
• A class written to interpret the records produced by an InputFormat
• Responsible for converting that record to a row (and back)
• A row is a clearly defined Hive definition (an array of values)
[Figure: data file → InputFormat (records) → SerDe (rows)]
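The two-stage pipeline in the figure can be sketched as follows. This is a hypothetical miniature, not Hive's actual classes: the "InputFormat" produces raw records from the data source, and the "SerDe" deserializes each record into a row (an array of values):

```python
def text_input_format(data: bytes):
    """InputFormat sketch: read a data source and produce records.
    Here, each newline-terminated line is one record."""
    for record in data.decode("utf-8").splitlines():
        yield record

def delimited_serde(record: str, delim: str = "|"):
    """SerDe sketch: deserialize one record into a row (array of values)."""
    return record.split(delim)

data = b"1|Alice|2014-08-12\n2|Bob|2014-08-13\n"
rows = [delimited_serde(rec) for rec in text_input_format(data)]
# rows == [['1', 'Alice', '2014-08-12'], ['2', 'Bob', '2014-08-13']]
```

Swapping either stage - a different InputFormat for a different file layout, or a different SerDe for a different record encoding - is exactly the flexibility the `ROW FORMAT SERDE` / `INPUTFORMAT` clauses above expose.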
Hive tables (cont.)
For many common file formats Hive provides a simplified syntax
This just pre-selects combinations of classes and configurations
create table users
(
id int,
office_id int
)
row format delimited
fields terminated by '|'
stored as textfile
create table users
(
id int,
office_id int
)
row format serde 'org.apache.hive…LazySimpleSerde'
with serdeproperties ( 'field.delim' = '|' )
inputformat 'org.apache.hadoop.mapred.TextInputFormat'
outputformat 'org.apache.hadoop.mapred.TextOutputFormat'
Hive partitioned tables
Most table types can be partitioned
Partitioning is on one or more columns
Each unique value becomes a partition
Query predicates can be used to eliminate scanned partitions
CREATE TABLE demo.sales (
part_id int,
part_name string,
qty int,
cost double
)
PARTITIONED BY (
state char(2)
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|';
[Directory layout: /biginsights/hive/warehouse/demo.db/sales/ contains one subdirectory per partition - state=NJ, state=AR, state=CA, state=NY - each holding data files such as data1.csv, data2.csv]

select * from demo.sales
where state in ('NJ', 'CA');
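Because each partition is just a subdirectory named `<column>=<value>`, partition elimination amounts to filtering directory names against the query predicate before any data file is opened. A sketch (the paths are from the slide's example; the pruning logic is an illustrative assumption, not Hive's code):

```python
# Partition elimination sketch: prune state=... subdirectories using the
# query predicate before scanning any data files.
partitions = [
    "/biginsights/hive/warehouse/demo.db/sales/state=NJ",
    "/biginsights/hive/warehouse/demo.db/sales/state=AR",
    "/biginsights/hive/warehouse/demo.db/sales/state=CA",
    "/biginsights/hive/warehouse/demo.db/sales/state=NY",
]

def prune(partitions, column, wanted):
    """Keep only partitions whose <column>=<value> suffix is in `wanted`."""
    kept = []
    for path in partitions:
        value = path.rsplit(column + "=", 1)[1]
        if value in wanted:
            kept.append(path)
    return kept

# For: select * from demo.sales where state in ('NJ', 'CA');
scanned = prune(partitions, "state", {"NJ", "CA"})
# Only the state=NJ and state=CA directories are ever scanned.
```

Half of the table's files are never touched, which is the entire payoff of partitioning on a commonly filtered column.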
Hive MetaStore
Hive maintains a centralized database of metadata
• Typically stored in a traditional RDBMS
Contains table definitions
• Location (directory on HDFS)
• Column names and types
• Partition information
• Classes used to read/write the table
• Etc.
Security
• Groups, roles, permissions
Query processing in Hive
Up until version 0.13 (just released) Hive used MapReduce for query processing
• They are moving to a new framework called "Tez"
It is useful to understand query processing in MR to understand
how Big SQL tackles the same problem…
Hive – Joins in MapReduce
For joins, MR is used to group data together at the same reducer based upon the join key
• Mappers read blocks from each “table” in the join
• The <key> is the value of the join key, the <value> is the record to be joined
• Reducer receives a mix of records from each table with the same join key
• Reducers produce the results of the join
[Figure: mappers read blocks of employees and depts; records are shuffled on dept_id so that the reducers for dept 1, dept 2, and dept 3 each join the rows for one department]

select e.fname, e.lname, d.dept_name
from employees e, depts d
where e.salary > 30000
  and d.dept_id = e.dept_id
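The reduce-side join above can be sketched as: mappers tag each record with its source table and emit the join key, the shuffle groups by key, and each reducer cross-products the employee and department rows sharing that key. A toy sketch with made-up rows, not Hive's implementation:

```python
from collections import defaultdict

# (fname, lname, salary, dept_id) and (dept_id, dept_name) - sample rows.
employees = [("Scott", "Gray", 45000, 10),
             ("Ann",   "Lee",  25000, 20)]
depts = [(10, "Engineering"), (20, "Sales")]

# Map phase: emit <join_key, record> pairs, tagged by source table.
grouped = defaultdict(lambda: {"emp": [], "dept": []})
for fname, lname, salary, dept_id in employees:
    if salary > 30000:                      # predicate applied map-side
        grouped[dept_id]["emp"].append((fname, lname))
for dept_id, dept_name in depts:
    grouped[dept_id]["dept"].append(dept_name)

# Reduce phase: each "reducer" holds all records for one dept_id and
# produces the joined rows.
results = [(fname, lname, dept_name)
           for sides in grouped.values()
           for fname, lname in sides["emp"]
           for dept_name in sides["dept"]]
# results == [('Scott', 'Gray', 'Engineering')]
```

Note that the salary filter runs in the mappers, so non-qualifying rows never cross the network - the same push-filters-early instinct Big SQL's optimizer applies later in the deck.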
N-way Joins in MapReduce
For N-way joins involving different join keys, multiple jobs are used
[Figure: job 1 - mappers read employees and depts, and reducers keyed on dept_id join them, writing temp files; job 2 - mappers read the temp files and emp_phones, and reducers keyed on emp_id produce the final results]

select e.fname, e.lname, d.dept_name, p.phone_type, p.phone_number
from employees e, depts d, emp_phones p
where e.salary > 30000
  and d.dept_id = e.dept_id
  and p.emp_id = e.emp_id
Agenda
Brief introduction to Hadoop
Why SQL on Hadoop?
What is Hive?
SQL-on-Hadoop landscape
Big SQL 3.0
• What is it?
• SQL capabilities
• Architecture
• Application portability and integration
• Enterprise capabilities
• Performance
Conclusion
Big SQL and BigInsights 3.0
Big SQL is a standard component in all IBM BigInsights versions
• Seamlessly integrated with other BigInsights components
• Expands IBM's continued focus on large scale, high performance analytics
[Figure: BigInsights editions, increasing in breadth of capabilities and enterprise class, all built on Apache Hadoop]
Standard Edition:
- Spreadsheet-style tool
- Web console
- Dashboards
- Pre-built applications
- Eclipse tooling
- RDBMS connectivity
- Big SQL
- Monitoring and alerts
- Platform enhancements
- . . .
Enterprise Edition adds:
- Accelerators
- GPFS - FPO
- Adaptive MapReduce
- Text analytics
- Enterprise Integration
- Big R
- InfoSphere Streams*
- Watson Explorer*
- Cognos BI*
- Data Click*
- . . .
* Limited use license
Quick Start Edition: free, non-production; same features as Standard Edition plus text analytics and Big R
Big SQL 3.0 – At a glance
Application Portability & Integration
Data shared with Hadoop ecosystem
Comprehensive file format support
Superior enablement of IBM software
Enhanced by Third Party software
Performance
Modern MPP runtime
Powerful SQL query rewriter
Cost based optimizer
Optimized for concurrent user throughput
Results not constrained by memory
Federation
Distributed requests to multiple data
sources within a single SQL statement
Main data sources supported:
DB2 LUW, DB2/z, Teradata, Oracle,
Netezza
Enterprise Features
Advanced security/auditing
Resource and workload management
Self tuning memory management
Comprehensive monitoring
Rich SQL
Comprehensive SQL support
IBM SQL PL compatibility
Open processing
As with Hive, Big SQL applies SQL to your existing data
• No proprietary storage format
A "table" is simply a view on your Hadoop data
Table definitions shared with Hive
• The Hive Metastore catalogs table definitions
• Reading/writing data logic is shared with Hive
• Definitions can be shared across the Hadoop ecosystem
Sometimes SQL isn't the answer!
• Use the right tool for the right job
[Figure: Hive, Pig, Sqoop, and Big SQL all access the Hadoop cluster through the Hive APIs and share table definitions via the Hive Metastore]
Creating tables in Big SQL
Big SQL syntax is derived from Hive's syntax with extensions
create hadoop table users
(
id int not null primary key,
office_id int null,
fname varchar(30) not null,
lname varchar(30) not null,
salary timestamp(3) null,
constraint fk_ofc foreign key (office_id)
references office (office_id)
)
row format delimited
fields terminated by '|'
stored as textfile;
The HADOOP keyword
• Big SQL requires the HADOOP keyword, which identifies the table as living on HDFS
• Big SQL also has internal traditional RDBMS table support
  - Stored only at the head node; does not live on HDFS
  - Supports full ACID capabilities
  - Not usable for "big" data
Nullability indicators
• Enforced on read
• Used by the query optimizer for smarter rewrites
Constraints
• Unenforced
• Useful as documentation and to drive query builders
• Used by the query optimizer for smarter rewrites
Table types
Big SQL supports many of the "standard" Hadoop storage formats
• Text delimited
• Text delimited sequence files
• Binary delimited sequence files
• Parquet
• RC
• ORC
• Avro
Each has different features/advantages/disadvantages
Custom file formats may be supported as well via custom java classes
Populating Big SQL tables
There are a number of ways to populate tables
Tables can be defined against existing data
• All validation is performed at query time
Rows can be directly inserted into tables
• Data validation is performed and data is converted to the storage format
• Only suitable for testing
• Produces one physical data file per call to INSERT
create external hadoop table csv_data
(
c1 int not null primary key,
c2 varchar(20) null
)
row format delimited fields terminated by ','
stored as textfile
location '/user/bob/csv_data'
insert into t1 values (5, 'foo'), (6, 'bar'), (7, 'baz')
Populating Big SQL tables (cont.)
Tables can be populated from other tables
Tables can be created from other tables
• Great way to convert between storage types or partition data
insert into top_sellers
select employee_id, rank() over (order by sales)
from (
select employee_id, sum(sales) sales
from product_sales
group by employee_id
)
limit 10;
create hadoop table partitioned_sales
partitioned by (dept_id int not null)
stored as rcfile
as
select emp_id, prod_id, qty, cost, dept_id
from sales
Populating Big SQL tables (cont.)
The LOAD HADOOP statement is used to populate Hadoop tables from an external data source
• Statement runs on the cluster – cannot access data at the client
• Nodes of the cluster ingest data in parallel
• Performs data validation during load
• Performs data conversion (to storage format) during load
Supports the following sources of data
• Any valid Hadoop URL (e.g. hdfs://, sftp://, etc.)
• JDBC data sources (e.g. Oracle, DB2, Netezza, etc.)
Loading from URL
Data may be loaded from delimited files read via any valid URL
• If no URI scheme is provided, HDFS is assumed:
Example loading via SFTP:
Just remember LOAD HADOOP executes on the cluster
• So file:// will be local to the node chosen to run the statement
LOAD HADOOP USING FILE URL '/user/biadmin/mydir/abc.csv'
WITH SOURCE PROPERTIES(
  'field.delimiter'=',',
  'date.time.format'='yyyy-MM-dd-HH.mm.ss.S')
LOAD HADOOP USING FILE URL
  'sftp://biadmin.biadmin@myserver.abc.com:22/home/biadmin/mydir'
LOAD HADOOP USING FILE URL 'file:///path/to/myfile/file.csv'
Loading from JDBC data source
A JDBC URL may be used to load directly from external data source
• Tested internally against Oracle, Teradata, DB2, and Netezza
It supports many options to partition the extraction of data
• Providing a table and partitioning column
• Providing a query and a WHERE clause to use for partitioning
Example usage:
LOAD USING JDBC
CONNECTION URL 'jdbc:db2://myhost:50000/SAMPLE'
WITH PARAMETERS (
user = 'myuser',
password='mypassword'
)
FROM TABLE STAFF WHERE "dept=66 and job='Sales'"
INTO TABLE staff_sales
PARTITION ( dept=66 , job='Sales')
APPEND WITH LOAD PROPERTIES (bigsql.load.num.map.tasks = 1) ;
SQL capabilities
Leverage IBM's rich SQL heritage, expertise, and technology
• Modern SQL:2011 capabilities
• DB2 compatible SQL PL support
– SQL bodied functions
– SQL bodied stored procedures
– Robust error handling
– Application logic/security encapsulation
– Flow of control logic
The same SQL you use on your data warehouse should run with
few or no modifications
SQL capability highlights
Full support for subqueries
• In SELECT, FROM, WHERE and HAVING
• Correlated and uncorrelated
• Equality, non-equality subqueries
• EXISTS, NOT EXISTS, IN, ANY, SOME, etc.
All standard join operations
• Standard and ANSI join syntax
• Inner, outer, and full outer joins
• Equality, non-equality, cross join support
• Multi-value join (WHERE (c1, c2) = …)
• UNION, INTERSECT, EXCEPT
SELECT
s_name,
count(*) AS numwait
FROM
supplier,
lineitem l1,
orders,
nation
WHERE
s_suppkey = l1.l_suppkey
AND o_orderkey = l1.l_orderkey
AND o_orderstatus = 'F'
AND l1.l_receiptdate > l1.l_commitdate
AND EXISTS (
SELECT
*
FROM
lineitem l2
WHERE
l2.l_orderkey = l1.l_orderkey
AND l2.l_suppkey <> l1.l_suppkey
)
AND NOT EXISTS (
SELECT
*
FROM
lineitem l3
WHERE
l3.l_orderkey = l1.l_orderkey
AND l3.l_suppkey <> l1.l_suppkey
AND l3.l_receiptdate >
l3.l_commitdate
)
AND s_nationkey = n_nationkey
AND n_name = ':1'
GROUP BY
s_name
ORDER BY
numwait desc,
s_name;
SQL capability highlights (cont.)
Extensive analytic capabilities
• Grouping sets with CUBE and ROLLUP
• Standard OLAP operations
• Analytic aggregates
LEAD LAG RANK DENSE_RANK
ROW_NUMBER RATIO_TO_REPORT FIRST_VALUE LAST_VALUE
CORRELATION COVARIANCE STDDEV VARIANCE
REGR_AVGX REGR_AVGY REGR_COUNT REGR_INTERCEPT
REGR_ICPT REGR_R2 REGR_SLOPE REGR_SXX
REGR_SXY REGR_SYY WIDTH_BUCKET VAR_SAMP
VAR_POP STDDEV_POP STDDEV_SAMP COVAR_SAMP
COVAR_POP NTILE
Architected for performance
Architected from the ground up for low latency and high throughput
MapReduce replaced with a modern MPP architecture
• Compiler and runtime are native code (not java)
• Big SQL worker daemons live directly on cluster
– Continuously running (no startup latency)
– Processing happens locally at the data
• Message passing allows data to flow directly
between nodes
Operations occur in memory with the ability
to spill to disk
• Supports aggregations and sorts larger than
available RAM
[Figure: SQL-based applications connect through the IBM Data Server Client to Big SQL's SQL MPP runtime within InfoSphere BigInsights, which reads data sources in Parquet, CSV, Seq, RC, Avro, ORC, JSON, and custom formats]
Big SQL 3.0 – Architecture
[Figure: Big SQL 3.0 architecture. A management node hosts the Big SQL master node, the Big SQL scheduler, the DDL FMP, and UDF FMPs, alongside the database service, the Hive metastore, and the Hive server. Each compute node hosts a Big SQL worker node with native and Java I/O FMPs, UDF FMPs, and temp data, alongside the HDFS data node, the MapReduce task tracker, and other services. (FMP = fenced mode process)]
Scheduler Service
The Scheduler is the main RDBMS↔Hadoop service interface
Interfaces with the Hive metastore for table metadata
• The compiler asks it for "hadoop" metadata, such as partitioning columns
Acts like the MapReduce job tracker for Big SQL
• Big SQL provides query predicates for the scheduler to perform partition elimination
• Determines splits for each "table" involved in the query
• Schedules splits on available Big SQL nodes (favoring scheduling local to the data)
• Serves work (splits) to the I/O engines
• Coordinates "commits" after INSERTs
DDL Processing
Big SQL's DDL is heavily derived from Hive's DDL, with extensions, e.g.
• Referential constraints
• SQL standard identifier conventions
The database engine doesn't understand the DDL statements natively
Statements are forwarded to the DDL FMP
• The statement is decomposed into native database DDL to define the relational structure of the table
• The statement is decomposed into Hive DDL to populate the Hive metastore
• The scheduler is notified of changes
create hadoop table users
(
  id int not null primary key,
  office_id int null,
  fname varchar(30) not null,
  lname varchar(30) not null,
  salary timestamp(3) null,
  constraint fk_ofc foreign key (office_id)
    references office (office_id)
)
row format delimited
fields terminated by '|'
stored as textfile;
I/O Processing
Native I/O FMP
• The high-speed interface for common file formats
• Delimited, Parquet, RC, Avro, and SequenceFile
Java I/O FMP
• Handles all other formats via the standard Hadoop/Hive APIs
Both perform multi-threaded direct I/O on local data
The database engine understands storage format capabilities
• The projection list is pushed into the I/O format
• Predicates are pushed as close to the data as possible (into the storage format, if possible)
• Predicates that cannot be pushed down are evaluated within the database engine
The database engine is only aware of which nodes need to read
• The scheduler directs the readers to their portion of work
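Projection and predicate pushdown, as described above, mean the reader is handed the needed columns and a filter, so unneeded rows and columns are discarded at scan time rather than inside the engine. An illustrative sketch only (the reader interface and sample data are assumptions, not Big SQL's API):

```python
def scan(rows, projection, pushed_predicate=None):
    """Reader sketch: apply the pushed-down predicate and the projection
    list while scanning, so only needed data reaches the database engine."""
    for row in rows:
        if pushed_predicate is None or pushed_predicate(row):
            yield {col: row[col] for col in projection}

data = [
    {"id": 1, "state": "NJ", "qty": 7, "cost": 9.5},
    {"id": 2, "state": "CA", "qty": 3, "cost": 1.0},
    {"id": 3, "state": "NY", "qty": 8, "cost": 4.2},
]

# SELECT id, qty FROM t WHERE qty > 5
# Only two columns and two rows ever leave the reader.
out = list(scan(data, projection=["id", "qty"],
                pushed_predicate=lambda r: r["qty"] > 5))
# out == [{'id': 1, 'qty': 7}, {'id': 3, 'qty': 8}]
```

Columnar formats like Parquet and ORC make this especially cheap, since skipped columns need not even be decoded from disk.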
Resource management
Big SQL doesn't run in isolation
Nodes tend to be shared with a variety of Hadoop services
• Task tracker
• Data node
• HBase region servers
• MapReduce jobs
• etc.
Big SQL can be constrained to limit its footprint on the cluster
• % of CPU utilization
• % of memory utilization
Resources are automatically adjusted based upon workload
• Always fitting within constraints
• Self-tuning memory manager that re-distributes resources across
components dynamically
• Default WLM concurrency control for heavy queries
Performance
Query rewrites
• Exhaustive query rewrite capabilities
• Leverages additional metadata such as constraints and nullability
Optimization
• Statistics- and heuristic-driven query optimization
• Query optimizer based upon decades of IBM RDBMS experience
Tools and metrics
• Highly detailed explain plans and query diagnostic tools
• Extensive number of available performance metrics
SELECT ITEM_DESC, SUM(QUANTITY_SOLD),
       AVG(PRICE), AVG(COST)
FROM PERIOD, DAILY_SALES, PRODUCT, STORE
WHERE
  PERIOD.PERKEY=DAILY_SALES.PERKEY AND
  PRODUCT.PRODKEY=DAILY_SALES.PRODKEY AND
  STORE.STOREKEY=DAILY_SALES.STOREKEY AND
  CALENDAR_DATE BETWEEN
    '01/01/2012' AND '04/28/2012' AND
  STORE_NUMBER='03' AND
  CATEGORY=72
GROUP BY ITEM_DESC
[Figure: query transformation applies dozens of query transformations, and access plan generation then weighs hundreds or thousands of access plan options - e.g. alternative join orders over Store, Product, Period, and Daily Sales using NLJOIN, HSJOIN, and ZZJOIN operators]
Application portability and integration
Big SQL 3.0 adopts IBM's standard Data Server Client Drivers
• Robust, standards compliant ODBC, JDBC, and .NET drivers
• Same driver used for DB2 LUW, DB2/z and Informix
• Expands support to numerous languages (Python, Ruby, Perl, etc.)
Putting the story together….
• Big SQL shares a common SQL dialect with DB2
• Big SQL shares the same client drivers with DB2
Compatible SQL + compatible drivers = portable application
Query federation
Data never lives in isolation
• Whether used as a landing zone or a queryable archive, it is desirable to query data across Hadoop and active data warehouses
Big SQL provides the ability to query heterogeneous systems
• Join Hadoop to other relational databases
• Query optimizer understands capabilities of external system
– Including available statistics
• As much work as possible is pushed to each system to process
[Figure: the Big SQL head node distributes work across compute nodes, each running Big SQL alongside the task tracker and data node]
Enterprise security
Users may be authenticated via
• Operating system
• Lightweight directory access protocol (LDAP)
• Kerberos
User authorization mechanisms include
• Full GRANT/REVOKE based security
• Group and role based hierarchical security
• Object level, column level, or row level (fine-grained) access controls
Auditing
• You may define audit policies and track user activity
Transport layer security (TLS)
• Protect integrity and confidentiality of data between the client and Big SQL
Monitoring
Comprehensive runtime monitoring infrastructure that helps answer the question: what is going on in my system?
• SQL interfaces to the monitoring data via table functions
• Ability to drill down into more granular metrics for problem determination and/or detailed performance analysis
• Runtime statistics collected during the execution of the section for a (SQL) access plan
• Support for event monitors to track specific types of operations and activities
• Protect against and discover unknown/suspicious behaviour by monitoring data access via the Audit facility
[Figure: Big SQL 3.0 worker threads collect monitoring data locally in connection control blocks and push it up incrementally; a monitor query extracts the data directly at the reporting level (e.g. service class)]
Performance, Benchmarking, Benchmarketing
Performance matters to customers
Benchmarking appeals to engineers and drives product innovation
Benchmarketing is used to convey performance in a memorable
and appealing way
SQL over Hadoop is in the “Wild West” of Benchmarketing
• 100x claims! Compared to what? Conforming to what rules?
The TPC (Transaction Processing Performance Council) is the
grand-daddy of all multi-vendor SQL-oriented organizations
• Formed in August, 1988
• TPC-H and TPC-DS are the most relevant to SQL over Hadoop
– R/W nature of workload not suitable for HDFS
Big Data Benchmarking Community (BDBC) formed
Power and Performance of Standard SQL
Everyone loves performance numbers, but that's not the whole story
• How much work do you have to do to achieve those numbers?
A portion of our internal performance numbers is based upon read-only
versions of TPC benchmarks
Big SQL is capable of executing
• All 22 TPC-H queries without modification
• All 99 TPC-DS queries without modification
SELECT s_name, count(*) AS numwait
FROM supplier, lineitem l1, orders, nation
WHERE s_suppkey = l1.l_suppkeyAND o_orderkey = l1.l_orderkey
AND o_orderstatus = 'F'
AND l1.l_receiptdate > l1.l_commitdate
AND EXISTS (
SELECT *FROM lineitem l2
WHERE l2.l_orderkey = l1.l_orderkey
AND l2.l_suppkey <> l1.l_suppkey)
AND NOT EXISTS (
SELECT *FROM lineitem l3
WHERE l3.l_orderkey = l1.l_orderkey
AND l3.l_suppkey <> l1.l_suppkey
AND l3.l_receiptdate > l3.l_commitdate)AND s_nationkey = n_nationkey
AND n_name = ':1'
GROUP BY s_name
ORDER BY numwait desc, s_name
JOIN
(SELECT s_name, l_orderkey, l_suppkey
FROM orders o
JOIN(SELECT s_name, l_orderkey, l_suppkey
FROM nation n
JOIN supplier s
ON s.s_nationkey = n.n_nationkeyAND n.n_name = 'INDONESIA'
JOIN lineitem l
ON s.s_suppkey = l.l_suppkey
WHERE l.l_receiptdate > l.l_commitdate) l1
ON o.o_orderkey = l1.l_orderkeyAND o.o_orderstatus = 'F') l2
ON l2.l_orderkey = t1.l_orderkey) a
WHERE (count_suppkey > 1) or ((count_suppkey=1)
AND (l_suppkey <> max_suppkey))) l3
ON l3.l_orderkey = t2.l_orderkey) bWHERE (count_suppkey is null)
OR ((count_suppkey=1) AND (l_suppkey = max_suppkey))) c
GROUP BY s_name
ORDER BY numwait DESC, s_name
JOIN
(SELECT s_name, l_orderkey, l_suppkey
FROM orders o
JOIN(SELECT s_name, l_orderkey, l_suppkey
FROM nation n
JOIN supplier s
ON s.s_nationkey = n.n_nationkeyAND n.n_name = 'INDONESIA'
JOIN lineitem l
ON s.s_suppkey = l.l_suppkey
WHERE l.l_receiptdate > l.l_commitdate) l1
ON o.o_orderkey = l1.l_orderkeyAND o.o_orderstatus = 'F') l2
ON l2.l_orderkey = t1.l_orderkey) a
WHERE (count_suppkey > 1) or ((count_suppkey=1)
AND (l_suppkey <> max_suppkey))) l3
ON l3.l_orderkey = t2.l_orderkey) bWHERE (count_suppkey is null)
OR ((count_suppkey=1) AND (l_suppkey = max_suppkey))) c
GROUP BY s_name
ORDER BY numwait DESC, s_name
SELECT s_name, count(1) AS numwait
FROM
(SELECT s_name FROM(SELECT s_name, t2.l_orderkey, l_suppkey,
count_suppkey, max_suppkey
FROM
(SELECT l_orderkey,
count(distinct l_suppkey) as count_suppkey, max(l_suppkey) as max_suppkey
FROM lineitem
WHERE l_receiptdate > l_commitdate
GROUP BY l_orderkey) t2
RIGHT OUTER JOIN(SELECT s_name, l_orderkey, l_suppkey
FROM
(SELECT s_name, t1.l_orderkey, l_suppkey,
count_suppkey, max_suppkey
FROM (SELECT l_orderkey,
count(distinct l_suppkey) as count_suppkey,
max(l_suppkey) as max_suppkey
FROM lineitem
GROUP BY l_orderkey) t1
SELECT s_name, count(1) AS numwait
FROM
(SELECT s_name FROM(SELECT s_name, t2.l_orderkey, l_suppkey,
count_suppkey, max_suppkey
FROM
(SELECT l_orderkey,
count(distinct l_suppkey) as count_suppkey, max(l_suppkey) as max_suppkey
FROM lineitem
WHERE l_receiptdate > l_commitdate
GROUP BY l_orderkey) t2
RIGHT OUTER JOIN(SELECT s_name, l_orderkey, l_suppkey
FROM
(SELECT s_name, t1.l_orderkey, l_suppkey,
count_suppkey, max_suppkey
FROM (SELECT l_orderkey,
count(distinct l_suppkey) as count_suppkey,
max(l_suppkey) as max_suppkey
FROM lineitem
GROUP BY l_orderkey) t1
Original QueryRe-written for Hive
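The essence of the Hive rewrite is decorrelating the NOT EXISTS subquery into a per-order aggregate plus an outer join. A minimal sketch of that idea using Python's sqlite3 on a toy lineitem table (the table contents and simplified integer "dates" are made up for illustration; this is not the full Q21):

```python
import sqlite3

# Toy lineitem rows: a supplier row is "late" when l_receiptdate > l_commitdate.
con = sqlite3.connect(":memory:")
con.execute("""CREATE TABLE lineitem
               (l_orderkey INT, l_suppkey INT, l_receiptdate INT, l_commitdate INT)""")
con.executemany("INSERT INTO lineitem VALUES (?,?,?,?)", [
    (1, 10, 5, 3),   # order 1: supplier 10 late
    (1, 11, 2, 3),   # order 1: supplier 11 on time
    (2, 10, 5, 3),   # order 2: supplier 10 late
    (2, 11, 6, 3),   # order 2: supplier 11 late too
    (3, 12, 1, 3),   # order 3: nobody late
])

# 1) Correlated NOT EXISTS, as in the original query: late rows whose
#    order has no *other* late supplier.
r_exists = con.execute("""
    SELECT l1.l_orderkey, l1.l_suppkey
    FROM lineitem l1
    WHERE l1.l_receiptdate > l1.l_commitdate
      AND NOT EXISTS (
        SELECT * FROM lineitem l3
        WHERE l3.l_orderkey = l1.l_orderkey
          AND l3.l_suppkey <> l1.l_suppkey
          AND l3.l_receiptdate > l3.l_commitdate)
    ORDER BY 1, 2""").fetchall()

# 2) Decorrelated form, mirroring the Hive rewrite: pre-aggregate the late
#    rows per order, outer-join, and keep a row only when it is the sole
#    late supplier of its order.
r_rewrite = con.execute("""
    SELECT l1.l_orderkey, l1.l_suppkey
    FROM lineitem l1
    LEFT OUTER JOIN (
        SELECT l_orderkey,
               count(DISTINCT l_suppkey) AS count_suppkey,
               max(l_suppkey)            AS max_suppkey
        FROM lineitem
        WHERE l_receiptdate > l_commitdate
        GROUP BY l_orderkey) t2
      ON t2.l_orderkey = l1.l_orderkey
    WHERE l1.l_receiptdate > l1.l_commitdate
      AND (count_suppkey IS NULL
           OR (count_suppkey = 1 AND l1.l_suppkey = max_suppkey))
    ORDER BY 1, 2""").fetchall()

print(r_exists, r_rewrite)   # → [(1, 10)] [(1, 10)]
```

Both formulations return the same rows; the point of Big SQL's claim above is that its optimizer performs this kind of decorrelation internally, so the user never has to hand-write the second form.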
Comparing Big SQL and Hive 0.12 for Ad-Hoc Queries
*Based on IBM internal tests comparing IBM InfoSphere BigInsights 3.0 Big SQL with Hive 0.12 executing the "1TB Classic BI Workload" in a controlled laboratory environment. The 1TB Classic BI Workload is a workload derived from the TPC-H Benchmark Standard, running at 1TB scale factor. It is materially equivalent with the exception that no update functions are performed. TPC Benchmark and TPC-H are trademarks of the Transaction Processing Performance Council (TPC).
Configuration: Cluster of 9 System x3650HD servers, each with 64GB RAM and 9x2TB HDDs, running Red Hat Linux 6.3. Results may not be typical and will vary based on actual workload, configuration, applications, queries and other variables in a production environment. Results as of April 22, 2014.
Big SQL is up to 41x faster than Hive 0.12
Comparing Big SQL and Hive 0.12 for Decision Support Queries
* Based on IBM internal tests comparing IBM InfoSphere BigInsights 3.0 Big SQL with Hive 0.12 executing the "1TB Modern BI Workload" in a controlled laboratory environment. The 1TB Modern BI Workload is a workload derived from the TPC-DS Benchmark Standard, running at 1TB scale factor. It is materially equivalent with the exception that no updates are performed, and only 43 out of 99 queries are executed. The test measured sequential query execution of all 43 queries for which Hive syntax was publicly available. TPC Benchmark and TPC-DS are trademarks of the Transaction Processing Performance Council (TPC).
Configuration: Cluster of 9 System x3650HD servers, each with 64GB RAM and 9x2TB HDDs, running Red Hat Linux 6.3. Results may not be typical and will vary based on actual workload, configuration, applications, queries and other variables in a production environment. Results as of April 22, 2014.
Big SQL 10x faster than Hive 0.12 (total workload elapsed time)
How many times faster is Big SQL than Hive 0.12?
* Based on IBM internal tests comparing IBM InfoSphere BigInsights 3.0 Big SQL with Hive 0.12 executing the "1TB Modern BI Workload" in a controlled laboratory environment. The 1TB Modern BI Workload is a workload derived from the TPC-DS Benchmark Standard, running at 1TB scale factor. It is materially equivalent with the exception that no updates are performed, and only 43 out of 99 queries are executed. The test measured sequential query execution of all 43 queries for which Hive syntax was publicly available. TPC Benchmark and TPC-DS are trademarks of the Transaction Processing Performance Council (TPC).
Configuration: Cluster of 9 System x3650HD servers, each with 64GB RAM and 9x2TB HDDs, running Red Hat Linux 6.3. Results may not be typical and will vary based on actual workload, configuration, applications, queries and other variables in a production environment. Results as of April 22, 2014.
Queries sorted by speed up ratio (worst to best)
Max Speedup 74x
Avg Speedup 20x
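Speedup ratios like the max/avg figures above are simple elapsed-time quotients, computed per query and then aggregated. A quick sketch with made-up timings (these are not the benchmark's actual numbers):

```python
# Hypothetical per-query elapsed times in seconds (illustrative only).
hive_times   = [1200.0, 900.0, 3000.0]
bigsql_times = [60.0, 45.0, 40.0]

# Per-query speedup = Hive elapsed time / Big SQL elapsed time.
speedups = [h / b for h, b in zip(hive_times, bigsql_times)]

max_speedup = max(speedups)                  # best single-query ratio
avg_speedup = sum(speedups) / len(speedups)  # mean of per-query ratios

# Sorted worst-to-best, as in the chart above.
print(sorted(speedups), max_speedup, round(avg_speedup, 1))
# → [20.0, 20.0, 75.0] 75.0 38.3
```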
BigInsights Big SQL 3.0: Summary
Big SQL provides rich, robust, standards-based SQL support for data stored in BigInsights
• Uses IBM common client ODBC/JDBC drivers
Big SQL fully integrates with SQL applications and tools
• Existing queries run with no or few modifications*
• Existing JDBC and ODBC compliant tools can be leveraged
Big SQL provides faster and more reliable performance
• Big SQL uses more efficient access paths to the data
• Queries processed by Big SQL no longer need to use MapReduce
• Big SQL is optimized to more efficiently move data over the network
Big SQL provides enterprise-grade data management
• Security, Auditing, workload management …
Conclusion
Today, it seems, performance numbers are the name of the game
But in reality there is so much more…
• How rich is the SQL?
• How difficult is it to (re-)use your existing SQL?
• How secure is your data?
• Is your data still open for other uses on Hadoop?
• Can your queries span your enterprise?
• Can other Hadoop workloads co-exist in harmony?
• …
With Big SQL 3.0, performance doesn't mean compromise
Questions?
Useful links
• Big SQL technology preview (cloud demo): http://ibm.biz/bigsqlpreview
• BigInsights Quickstart Edition (includes Big SQL): http://ibm.co/quickstart
– Tutorial: https://developer.ibm.com/hadoop/videos/getting-started-biginsights-quick-start-edition-single-node/
– Labs:
http://www.ibm.com/support/knowledgecenter/SSPT3X_3.0.0/com.ibm.swg.im.infosphere.biginsights.tut.doc/doc/tut_intro_bigsql.html
BACKUP SLIDES
Statistics
Big SQL utilizes Hive statistics collection with some extensions:
• Additional support for column groups, histograms and frequent values
• Automatic determination of which partitions require statistics collection vs. explicit specification
• Partitioned tables: added table-level versions of NDV, Min, Max, Null count, Average column length
• Hive catalogs as well as database engine catalogs are also populated
• We are restructuring the relevant code for submission back to Hive
Capability to fabricate statistics when none are available at compile time
Table statistics
• Cardinality (row count)
• Number of files
• Total file size

Column statistics
• Minimum value (all types)
• Maximum value (all types)
• Cardinality (non-nulls)
• Distribution (number of distinct values, NDV)
• Number of null values
• Average length of the column value (all types)
• Histogram (number of buckets configurable)
• Frequent values (MFV) (number configurable)

Column group statistics
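To make the per-column statistics listed above concrete, here is a small sketch in Python of how such statistics can be computed over a column's values. This illustrates the kinds of numbers involved (NDV, null count, MFV, etc.), not Big SQL's actual collection code; the function name and output format are invented for the example:

```python
from collections import Counter

def column_stats(values, num_frequent=3):
    """Compute the kinds of per-column statistics listed above: min/max,
    cardinality of non-nulls, NDV, null count, average value length,
    and most-frequent values (MFV)."""
    non_null = [v for v in values if v is not None]
    counts = Counter(non_null)
    return {
        "min": min(non_null) if non_null else None,
        "max": max(non_null) if non_null else None,
        "non_null_count": len(non_null),
        "ndv": len(counts),                       # number of distinct values
        "null_count": len(values) - len(non_null),
        "avg_length": (sum(len(str(v)) for v in non_null) / len(non_null))
                      if non_null else 0.0,
        "frequent_values": counts.most_common(num_frequent),
    }

stats = column_stats(["US", "US", "DE", None, "FR", "US", None])
print(stats["ndv"], stats["null_count"], stats["frequent_values"][0])
# → 3 2 ('US', 3)
```

An optimizer uses exactly these inputs to estimate selectivities: for instance, a predicate `col = 'US'` on this column would be estimated from the MFV list rather than the uniform 1/NDV assumption.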