blinkdb
DESCRIPTION
Introduction to BlinkDB: Queries with Bounded Errors and Bounded Response Times on Very Large Data
TRANSCRIPT
Goal: Solve Big Data!
How do we achieve the best performance?
Scanning 100 TB on 1,000 machines:
- Hard disks: ½–1 hour
- Memory: 1–5 minutes
- ?: 1 second
Evolved to… evolves to?
Better and faster frameworks?
If we cannot do better than in-memory, then what?
Can we use Approximate Computing?
Can you tolerate errors? Well, it depends on the scenario, right…
Overview of the Big Data Space
Massive log batch processing
Can we use Approximate Computing? Answer: YES / NO
Streaming data processing
Can we use Approximate Computing? Answer: MAYBE
Exploratory Data Analysis
Exploratory / Interactive Data Processing
-- Getting a sense of the data (Data Scientists)
-- Debugging (SREs / DevOps)
Can we use Approximate Computing? Answer: YES!
1) BlinkDB: Queries with Bounded Errors and Bounded Response Times on Very Large Data.
2) Blink and It's Done: Interactive Queries on Very Large Data.
3) A General Bootstrap Performance Diagnostic.
4) Knowing When You're Wrong: Building Fast and Reliable Approximate Query Processing Systems.
Sameer Agarwal, Ariel Kleiner, Henry Milner, Barzan Mozafari, Ameet Talwalkar, Michael Jordan, Samuel Madden, Ion Stoica
Our Goal
Support interactive SQL-like aggregate queries over massive sets of data
Our Goal
Support interactive SQL-like aggregate queries over massive sets of data
blinkdb> SELECT AVG(jobtime)
         FROM very_big_log
AVG, COUNT, SUM, STDEV, PERCENTILE, etc.
Our Goal
Support interactive SQL-like aggregate queries over massive sets of data
blinkdb> SELECT AVG(jobtime)
         FROM very_big_log
         WHERE src = 'hadoop'
FILTERS, GROUP BY clauses
Our Goal
Support interactive SQL-like aggregate queries over massive sets of data
blinkdb> SELECT AVG(jobtime)
         FROM very_big_log
         LEFT OUTER JOIN logs2
         ON very_big_log.id = logs2.id
         WHERE src = 'hadoop'
JOINS, Nested Queries, etc.
Our Goal
Support interactive SQL-like aggregate queries over massive sets of data
blinkdb> SELECT my_function(jobtime)
         FROM very_big_log
         LEFT OUTER JOIN logs2
         ON very_big_log.id = logs2.id
         WHERE src = 'hadoop'
ML Primitives, User-Defined Functions
Our Goal
Support interactive SQL-like aggregate queries over massive sets of data
blinkdb> SELECT my_function(jobtime)
         FROM very_big_log
         WHERE src = 'hadoop'
         ERROR WITHIN 10% AT CONFIDENCE 95%
Our Goal
Support interactive SQL-like aggregate queries over massive sets of data
blinkdb> SELECT my_function(jobtime)
         FROM very_big_log
         WHERE src = 'hadoop'
         WITHIN 5 SECONDS
Query Execution on Samples (Exploration Query)
What is the average buffering ratio in the table?

ID  City      Buff Ratio
1   NYC       0.78
2   NYC       0.13
3   Berkeley  0.25
4   NYC       0.19
5   NYC       0.11
6   Berkeley  0.09
7   NYC       0.18
8   NYC       0.15
9   Berkeley  0.13
10  Berkeley  0.49
11  NYC       0.19
12  Berkeley  0.10

0.2325 (Precise)
Query Execution on Samples
What is the average buffering ratio in the table?
A uniform sample of the table above (sampling rate 1/4):

ID  City      Buff Ratio  Sampling Rate
2   NYC       0.13        1/4
3   Berkeley  0.25        1/4
4   NYC       0.19        1/4

Uniform Sample: 0.19 | 0.2325 (Precise)
Query Execution on Samples
What is the average buffering ratio in the table?
The same uniform sample (sampling rate 1/4), now with an error bar:

ID  City      Buff Ratio  Sampling Rate
2   NYC       0.13        1/4
3   Berkeley  0.25        1/4
4   NYC       0.19        1/4

Uniform Sample: 0.19 +/- 0.05 | 0.2325 (Precise)
Query Execution on Samples
What is the average buffering ratio in the table?
A larger uniform sample (sampling rate 1/2):

ID  City      Buff Ratio  Sampling Rate
2   NYC       0.13        1/2
3   Berkeley  0.25        1/2
4   NYC       0.19        1/2
6   Berkeley  0.09        1/2
7   NYC       0.18        1/2
10  Berkeley  0.49        1/2

Uniform Sample: 0.22 +/- 0.02 | 0.2325 (Precise)
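The estimates above can be sketched in a few lines of Python (an illustrative sketch, not BlinkDB code; the `ratios` list mirrors the hypothetical table above): draw a uniform sample without replacement, compute the sample mean, and attach a CLT-based 95% error bar.

```python
import math
import random

# Per-row buffering ratios from the example table above.
ratios = [0.78, 0.13, 0.25, 0.19, 0.11, 0.09,
          0.18, 0.15, 0.13, 0.49, 0.19, 0.10]

def approx_avg(data, fraction, seed=0):
    """Estimate AVG on a uniform sample, with a ~95% CLT error bar."""
    rng = random.Random(seed)
    k = max(2, int(len(data) * fraction))
    sample = rng.sample(data, k)              # uniform, without replacement
    mean = sum(sample) / k
    var = sum((x - mean) ** 2 for x in sample) / (k - 1)
    half_width = 1.96 * math.sqrt(var / k)    # 95% confidence half-width
    return mean, half_width

est, err = approx_avg(ratios, 0.5)
print(f"{est:.2f} +/- {err:.2f} (precise: {sum(ratios) / len(ratios):.4f})")
```

Doubling the sampling fraction shrinks the error bar roughly by a factor of sqrt(2), which is exactly the speed/accuracy trade-off the next slides discuss.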
Speed/Accuracy Trade-off
[Figure: error vs. execution time (sample size). Executing on the entire dataset takes ~30 minutes; interactive queries need ~2 seconds.]
Speed/Accuracy Trade-off
[Figure: the same error vs. execution time (sample size) curve, with a floor of pre-existing noise.]
Where do you want to be on the curve?
Sampling vs. No Sampling on 100 Machines
[Figure: query response time (seconds) vs. fraction of the full data (10 TB), from 1 down to 10^-5. Response time drops from ~1,020 s on the full data to 102, 18, 13, 10, and 8 s on successively smaller samples.]
~10x speedup, as response time is dominated by I/O
Sampling vs. No Sampling
[Figure: the same chart with error bars: ~1,020 s on the full data, then 103, 18, 13, 10, and 8 s on samples with errors of 0.02%, 0.07%, 1.1%, 3.4%, and 11% respectively.]
Okay, so you can tolerate errors…
What are some of the fundamental challenges?
What types of samples should we create? (We cannot sample everything.) This boils down to: what is our assumption about the nature of the future query workload?
Usual assumption: future queries are SIMILAR to past queries.
What is similarity? (Choosing the wrong notion carries a heavy penalty: under- / over-fitting.)
Workload Taxonomy
Predictable QCS (Query Column Set)
• Fits the model of exploratory queries well. (Queries are usually distinct, but most will use the same columns.)
• What kinds of videos are popular in a region? Answering this requires looking at data from thousands of videos and hundreds of geographical regions, yet the column sets are fixed: "video title" (for grouping) and "viewer location" (for filtering).
• Backed by empirical evidence from Conviva & Facebook. This is the key reason for BlinkDB's efficiency. (There is lots of related work in database theory.)
BlinkDB Overview
What is BlinkDB?
A framework built on Shark and Spark that…
- creates and maintains a variety of offline samples from the underlying data;
- returns fast, approximate answers with error bars by executing queries on samples of the data (using a runtime Error-Latency Profile for sample selection);
- verifies the correctness of the error bars that it returns at runtime.
1) Sample Creation
Building Samples for Queries
• Uniform sampling vs. stratified sampling.
• Uniform sampling is inefficient for queries that compute aggregates per group:
- We could simply miss an under-represented group entirely.
- We care equally about the error of each query, yet uniform sampling assigns more sample rows to the groups that are more heavily represented.
• Solution: make the sample size assignment deterministic rather than random. This can be achieved with stratified sampling.
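The contrast above can be demonstrated with a small sketch (illustrative Python, not BlinkDB code; the skewed two-city table is invented): a uniform sample can miss the rare group entirely, while a stratified sample deterministically keeps up to a cap of rows per group.

```python
import random
from collections import Counter, defaultdict

# Hypothetical skewed table: one heavily represented group, one rare group.
gen = random.Random(42)
rows = [("NYC", gen.random()) for _ in range(10_000)] + \
       [("Berkeley", gen.random()) for _ in range(10)]

def uniform_sample(rows, k, seed=0):
    # Uniform sampling: group representation is random; a rare group
    # may not appear in the sample at all.
    return random.Random(seed).sample(rows, k)

def stratified_sample(rows, cap, seed=0):
    # Stratified sampling: a deterministic number of rows per group
    # (up to `cap`), so every group is guaranteed representation.
    rng = random.Random(seed)
    groups = defaultdict(list)
    for city, ratio in rows:
        groups[city].append((city, ratio))
    out = []
    for members in groups.values():
        out.extend(rng.sample(members, min(cap, len(members))))
    return out

print(Counter(city for city, _ in uniform_sample(rows, 100)))
print(Counter(city for city, _ in stratified_sample(rows, 100)))
```

With the stratified variant, a GROUP BY over `city` always has rows for Berkeley to aggregate; with the uniform variant, Berkeley's expected share of a 100-row sample is well under one row.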
Some Terminology…
QCS (Query Column Set): the set of columns appearing in a query's filtering and grouping clauses (WHERE, GROUP BY, HAVING).
QCS to Sample On
What QCS to sample on?
• Formulate this as an optimization problem, where the three major factors to consider are the "sparsity" of the data, the "workload characteristics", and the "storage cost of samples".
• Sparsity: define a sparsity function as the number of groups whose size in T is less than some number M.
What QCS to sample on? (contd.)
• Workload: a query with QCS q_j has some unknown probability p_j. The best estimate of p_j is the past frequency of queries with QCS q_j.
• Storage cost: assume a simple formulation where K is the same for each group. For a set of columns ϕ, the storage cost is |S(ϕ, K)|.
Goal: maximize the weighted sum of coverage, where the "coverage" of a query q_i given a sample is defined as the probability that a given value x for the columns is also present among the rows of S(ϕ_i, K).
Optimization Problem
Optimize the following MILP, where m ranges over all possible QCSs, j indexes over all queries, and i over all column sets.
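As a loose illustration of the trade-off being optimized (a greedy knapsack-style stand-in, NOT the paper's MILP; all candidate QCSs, frequencies, costs, and coverage numbers are invented): pick QCSs by frequency-weighted coverage per unit of storage, under a storage budget.

```python
# Candidate QCS -> (past query frequency p_j, storage cost of its
# stratified sample, coverage gain if we build it). All numbers invented.
candidates = {
    ("city",):           (0.50, 3.0, 0.6),
    ("city", "browser"): (0.30, 5.0, 0.8),
    ("os",):             (0.20, 2.0, 0.4),
}

def choose_qcs(candidates, budget):
    """Greedily select QCSs maximizing frequency-weighted coverage
    per unit of storage, subject to a total storage budget."""
    chosen, used = [], 0.0
    ranked = sorted(candidates.items(),
                    key=lambda kv: kv[1][0] * kv[1][2] / kv[1][1],
                    reverse=True)
    for qcs, (freq, cost, cov) in ranked:
        if used + cost <= budget:
            chosen.append(qcs)
            used += cost
    return chosen

print(choose_qcs(candidates, budget=5.0))
```

The real formulation solves this globally (a MILP can trade a cheap, low-value sample for one expensive, high-value one, which a greedy pass cannot), but the objective shape is the same: weighted coverage subject to storage.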
How to sample ?
Given a known QCS…
• Compute the sample count for a group:
- K = min( n′ / |D(ϕ)| , |T_x| )
• Take samples as follows: for each group x, sample K rows uniformly at random without replacement, forming the sample S_x.
• The entire sample S(ϕ, K) is the disjoint union of the S_x:
- If |T_x| > K, we answer based on K random tuples; otherwise we can provide an exact answer.
• For the aggregate functions AVG, SUM, COUNT and QUANTILE, K directly determines the error.
Sharing a QCS
• Multiple queries with different time bounds t and sample sizes n will share the same QCS. We need to select an appropriate subset of the stored sample for each.
• We need a storage technique that allows such subsets to be identified at runtime.
Storage Technique
• The rows of a stratified sample S(ϕ, K) are stored sequentially according to the order of the columns in ϕ.
• When S_x is spread over multiple HDFS blocks, each block contains a random subset of S_x.
• It is then enough to read any subset of the blocks comprising S_x, as long as these blocks together contain the minimum number of needed records.
[Diagram: B_ij = data block (HDFS)]
Storage Requirement
For a table with 1 billion (10^9) tuples and a column set with a Zipf (heavy-tailed) distribution with an exponent of 1.5, it turns out that the storage required by the sample S(ϕ, K) is only 2.4% of the original table for K = 10^4, 5.2% for K = 10^5, and 11.4% for K = 10^6. This is also consistent with real-world data from Conviva & Facebook.
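A back-of-the-envelope check of the claim above (illustrative Python; the group count of 10^6 is an assumption the slide does not state, so the result only needs to land in the same ballpark as the quoted percentages):

```python
def storage_fraction(n_groups, total_rows, k, s=1.5):
    """Fraction of the table kept by a per-group cap of k rows, when
    group sizes follow a Zipf-like power law with exponent s."""
    # Size of the r-th largest group ~ 1 / r^s, scaled to total_rows.
    weights = [1.0 / (r ** s) for r in range(1, n_groups + 1)]
    scale = total_rows / sum(weights)
    # The stratified sample keeps at most k rows from each group.
    sampled = sum(min(w * scale, k) for w in weights)
    return sampled / total_rows

for k in (10**4, 10**5, 10**6):
    frac = storage_fraction(1_000_000, 1_000_000_000, k)
    print(f"K={k:>7}: {frac:.1%} of the table")
```

The key effect is visible either way: capping each group at K rows means the few giant head groups, which dominate the table, contribute almost nothing to the sample, so total sample storage grows far more slowly than K.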
What is BlinkDB?
A framework built on Shark and Spark that…
- creates and maintains a variety of samples from the underlying data;
- returns fast, approximate answers by executing queries on samples of the data, with error bars;
- verifies the correctness of the error bars that it returns at runtime.
2) BlinkDB Runtime
Selecting a Sample
• If BlinkDB finds one or more stratified samples on a set of columns ϕ_i such that our query's column set q ⊆ ϕ_i, we pick the ϕ_i with the smallest number of columns.
• If no such QCS samples exist, we run q on in-memory subsets of all samples maintained by the system, and out of these select those with high selectivity.
- Selectivity = (number of rows selected by q) / (number of rows read by q)
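The selection rule above can be sketched as follows (an illustrative sketch; the sample catalog, its QCS-keyed representation, and the column names are all assumptions, not BlinkDB's actual data structures):

```python
# Assumed catalog: each stored stratified sample keyed by its QCS,
# represented as a frozenset of column names.
samples = {
    frozenset({"city"}): "S_city",
    frozenset({"city", "browser"}): "S_city_browser",
    frozenset({"os"}): "S_os",
}

def pick_sample(query_columns, samples):
    """Prefer the stratified sample whose QCS covers the query's columns
    and has the fewest columns; return None when no sample covers the
    query (BlinkDB would then probe in-memory subsets of all samples
    and rank them by selectivity)."""
    covering = [qcs for qcs in samples if query_columns <= qcs]
    if covering:
        return samples[min(covering, key=len)]
    return None

def selectivity(rows_selected, rows_read):
    # Higher is better: more of what we read is actually useful to q.
    return rows_selected / rows_read

print(pick_sample(frozenset({"city"}), samples))
```

A query on `city` matches both `S_city` and `S_city_browser`; the fewest-columns rule picks `S_city`, whose strata line up most tightly with the query.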
Selecting the Right Sample Size
• Construct an ELP (Error-Latency Profile) that characterizes the rate at which the error decreases (and the time increases) with increasing sample size, by running the query on smaller samples.
• The scaling rate depends on the query structure (JOINs, GROUP BYs), the physical data placement, and the underlying data distribution.
Error Profile
• Given Q's error constraints, the idea is to predict the size of the smallest sample that satisfies those constraints.
• The variance and closed-form aggregate functions are estimated using standard closed-form formulas.
• BlinkDB also estimates the query selectivity and input data distribution by running the query on smaller subsamples.
• The number of rows needed is then calculated from these statistical error estimates.
Latency Profile
• Given Q's time constraints, the idea is to predict the maximum sample size that we can run the query on within those constraints.
• The value of n depends on the input data, the physical placement on disk, the query structure, and the available resources. As a simplification, BlinkDB simply predicts n by assuming that latency scales linearly with input size.
• For very small in-memory samples, BlinkDB first runs a few smaller samples until performance begins to grow linearly, and then estimates the linear scaling constants.
Correcting Bias
• Running a query on a non-uniform sample introduces a certain amount of statistical bias, and with it subtle inaccuracies.
• Solution: BlinkDB periodically replaces its samples using a low-priority background task that re-samples the original data (e.g., daily); the fresh samples are then used by the system.
Error Estimation
Closed-form aggregate functions
- Central Limit Theorem
- Applicable to AVG, COUNT, SUM, VARIANCE and STDEV

Generalized aggregate functions
- Statistical bootstrap
- Applicable to complex and nested queries, UDFs, joins, etc.
- Very computationally expensive.
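The bootstrap idea in miniature (an illustrative sketch, not BlinkDB's implementation; the data and the trimmed-mean aggregate are invented): resample the sample with replacement, re-run the aggregate each time, and use the spread of the estimates as the error. The cost is plain to see: 100 resamples means 100 extra executions of the aggregate, which is why the slide calls it very computationally expensive.

```python
import random
import statistics

def bootstrap_error(sample, agg, n_resamples=100, seed=0):
    """Bootstrap standard error of an arbitrary aggregate `agg`."""
    rng = random.Random(seed)
    k = len(sample)
    estimates = [agg([rng.choice(sample) for _ in range(k)])
                 for _ in range(n_resamples)]
    return statistics.stdev(estimates)

gen = random.Random(1)
data = [gen.uniform(0, 1) for _ in range(500)]

# Works even for aggregates with no CLT closed form, e.g. a trimmed mean
# that drops the 25 smallest and 25 largest values:
trimmed_mean = lambda xs: statistics.mean(sorted(xs)[25:-25])
print(bootstrap_error(data, trimmed_mean))
```

For AVG the bootstrap and the CLT closed form agree; the bootstrap earns its cost only on the aggregates (UDFs, nested queries) where no closed form exists.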
But we are not done yet…
• Statistical methods like the CLT and the bootstrap operate under a set of assumptions about the query and the data.
• We need correctness verifiers!
Kleiner's Diagnostics [KDD 13]
[Figure: error vs. sample size — more data gives higher accuracy; with 300 data points, the diagnostic reaches 97% accuracy.]
But 300 data points ≈ 30K queries for the bootstrap!
So, in an approximate QP system:
• One query estimates the answer.
• A hundred queries on resamples of the data compute the error.
• Tens of thousands of queries verify whether this error is correct.
• BAD PERFORMANCE!
• Solution: a single-pass execution framework.
BlinkDB Implementation
BlinkDB Architecture
[Diagram: a command-line shell and Thrift/JDBC clients talk to a driver consisting of a SQL parser, query optimizer, physical plan, UDFs, and execution layer, with a Metastore alongside, running on Hadoop/Spark/Presto over Hadoop storage (e.g., HDFS, HBase, Presto).]
Implementation Changes
• Additions to the query language parser.
• The parser can trigger a sample creation and maintenance module.
• A sample selection module re-writes the query and assigns it an appropriately sized sample.
• An uncertainty module modifies all pre-existing aggregation functions to return error bars and confidence intervals.
• A module periodically re-samples the original data, creating new samples which are then used by the system (handling correlation + workload changes).
BlinkDB Evaluation
BlinkDB vs. No Sampling
[Figure (log scale): query response times with 2.5 TB served from cache and 7.5 TB from disk.]
Scaling BlinkDB
Each query operates on 100N GB of data.
Response Time and Error Bounds…
[Figure: 20 Conviva queries, averaged over 10 runs.]
Play with BlinkDB! https://github.com/sameeragarwal/blinkdb
Take-Aways…
• For now, the only way to escape the memory performance barrier is to use approximate computing.
• It has a huge role to play in exploratory data analysis.
• BlinkDB provides a framework for AQP that returns error bars and verifies them.
• Great performance on real-world workloads.
Personal takeaway: take a STATISTICS class!
Credits
These slides are derived from Sameer Agarwal's presentation: http://goo.gl/cvVb1X
Questions? THANK YOU!