10th of Sept., 2015 ASD'2015@Tanger 1
Architectural and Technologies Advances for Data Warehouse Systems
Rim Moussa
CS Dept., Engineering School of Carthage
University of [email protected]
Conférence Magrébine sur les Avancées des Systèmes Décisionnels @Tanger, Kingdom of Morocco. 2015
Outline
●Data Warehouse Systems
»Typical architecture
»Issues & solutions
●Assessment of large-scale systems
»Requirements
»Decision Support Systems' Benchmarks
●Classical Relational systems
●SQL-on-Hadoop systems
●Lambda Architecture
Data Warehouse System
DWS: Issues & solutions
Assessment of Large-Scale systems
High availability: continuity of service despite node failures; includes data recovery and query/job recovery
Cost management: cost on-premises? cost at a CSP (cloud service provider)?
High performance: response time for CRUD and OLTP; processing time for OLAP
Elasticity: automatic provisioning and release of resources; for storage, bucket split or merge
Scalability: system performance for an n-times higher load on n-times the hardware capacity
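The scalability requirement above is commonly quantified as a scale-up ratio. The sketch below is a minimal illustration, not a benchmark procedure: the function name and the timings are hypothetical, and the ratio is the textbook definition (time on the small configuration divided by time on the n-times configuration).

```python
def scaleup(time_base: float, time_scaled: float) -> float:
    """Scale-up ratio: time for load L on capacity C divided by
    time for load n*L on capacity n*C. The ideal value is 1.0."""
    if time_scaled <= 0:
        raise ValueError("time_scaled must be positive")
    return time_base / time_scaled


# Hypothetical measurements: 120 s for the base load on the base cluster,
# 150 s for 4x the load on 4x the hardware.
ratio = scaleup(120.0, 150.0)
print(f"scale-up = {ratio:.2f}")
```

A ratio below 1.0 means the system loses some efficiency as load and hardware grow together.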
Decision Support Systems Benchmarks
●Non-TPC benchmarks
»APB-1
  ●No scale factor
●TPC benchmarks
»TPC-H
  ●Scale factor
  ●Snowflake schema, 22 business questions and refresh functions
»TPC-DS
  ●Scale factor
  ●Data marts
  ●About a hundred business questions
»TPC-DI
  ●18 transformations; historical and incremental loads are scale factored
●Variants of TPC DSS benchmarks
»Star Schema benchmark: turning TPC-H benchmark into a star schema, with a workload of 13 business questions
»TPC-H*d: turning TPC-H benchmark into a multi-dimensional benchmark (MDX workload for cubes and queries)
System Categorization
Classical, Columnar, MapReduce, Dataflow, Array DB, Graph DB, Doc. DB, ...
Classical DWS
●Distributed DWS apply fragmentation schemes
»Horizontal partitioning: parallel processing
  ●Hash partitioning
  ●Range partitioning
  ●List partitioning
  ●Derived horizontal partitioning
  ●Any combination
»Vertical partitioning
  ●Load only needed data into memory, which saves a lot of I/O
  ●Examples: MonetDB, VectorWise
»Both
»Most adopted partitioning scheme:
  »Replicate dimension tables
  »Partition fact table
●Parallel query processing
»Supported by the DBMS (DB links)
»Middleware
  ●Refer to the Distributed Databases book, by M. T. Özsu and P. Valduriez
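The horizontal partitioning schemes listed above can be sketched as row-to-node routing functions. This is an illustrative sketch only: the function names, the boundaries and the region-to-node mapping are assumptions, not taken from any particular DBMS.

```python
from bisect import bisect_right


def hash_partition(key: int, n_nodes: int) -> int:
    """Hash partitioning: spread keys evenly across nodes."""
    return key % n_nodes


def range_partition(value: int, boundaries: list) -> int:
    """Range partitioning: node i holds values below boundaries[i];
    the last node holds everything at or above the last boundary."""
    return bisect_right(boundaries, value)


def list_partition(value: str, node_of: dict, default_node: int) -> int:
    """List partitioning: explicit value-to-node assignment."""
    return node_of.get(value, default_node)


def derived_partition(foreign_key: int, n_nodes: int) -> int:
    """Derived horizontal partitioning: a child row follows the node of
    the parent row it references, so parent-child joins stay local."""
    return hash_partition(foreign_key, n_nodes)


# Hypothetical usage with 4 nodes.
print(hash_partition(10, 4))                    # order 10 -> node 2
print(derived_partition(10, 4))                 # its line items -> node 2
print(range_partition(1997, [1995, 2000]))      # year 1997 -> node 1
print(list_partition("AFRICA", {"AFRICA": 0, "EUROPE": 1}, 3))
```

Because `derived_partition` reuses the parent's placement, a join between the two tables on that key needs no data movement between nodes.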
Performance Tuning
●Query rewrite
●Cost-based optimization and rule-based optimization
»Execute unary operations prior to binary operations
  ●Filters and projections before joins
»Stats
●Vectorization
»CPU processes a batch of tuples rather than a single tuple
●Co-located joins through derived horizontal partitioning
●Redundant pre-computed data
»Examples: materialized views, derived attributes, OLAP indexes, data synopses (histograms, data sampling, ...)
»Cons: calculation cost, refresh cost, queries on stale data, storage overhead
»Pros: fast query processing
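The rule "filters and projections before joins" can be illustrated on toy rows. The sketch below is an assumption-laden example: the `hash_join` helper and the sample rows are hypothetical, and it only shows that filtering first yields the same answer with a smaller join output.

```python
def hash_join(left, right, left_key, right_key):
    """Simple in-memory hash join on left[left_key] == right[right_key]."""
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)
    return [{**l, **r} for l in left for r in index.get(l[left_key], [])]


part = [
    {"p_partkey": 1, "p_size": 9},
    {"p_partkey": 2, "p_size": 50},
    {"p_partkey": 3, "p_size": 14},
]
partsupp = [
    {"ps_partkey": 1, "ps_suppkey": 10},
    {"ps_partkey": 2, "ps_suppkey": 10},
    {"ps_partkey": 2, "ps_suppkey": 11},
    {"ps_partkey": 3, "ps_suppkey": 12},
]
wanted_sizes = {9, 14}

# Plan A: join everything, then filter.
joined_all = hash_join(partsupp, part, "ps_partkey", "p_partkey")
late_filter = [r for r in joined_all if r["p_size"] in wanted_sizes]

# Plan B: filter the unary input first, then join.
small_part = [p for p in part if p["p_size"] in wanted_sizes]
early_filter = hash_join(partsupp, small_part, "ps_partkey", "p_partkey")

print(len(joined_all), len(early_filter))
print(late_filter == early_filter)
```

Both plans return the same rows; plan B simply never materializes the join rows that the filter would discard.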
NoSQL and the V's
●Big challenges related to:
»Volume: management of huge volumes of data?
»Velocity: how fast can huge volumes of data be processed?
»Variety: dealing with unstructured, semi-structured, and relational data?
●NoSQL solutions
»Adopted by Google, Facebook, Amazon, …
»Dynamic horizontal scaling, i.e. scale-out
»Nodes are added without bringing the cluster down
»Shared-nothing architecture
»Independent computing & storage nodes interconnected via a high-speed network
»Distributed programming framework: MapReduce (Google)
»Apache Hadoop is open source
Apache Hadoop Ecosystem
●Ganglia: monitoring system for clusters and grids
●Sqoop: tool designed for efficiently transferring bulk data between Apache Hadoop and structured data stores (RDBMS)
●Hama: distributed engine for massive scientific computations such as matrix, graph and network algorithms (BSP)
●HCatalog: table management layer exposing Hive metadata to other Hadoop applications
●Mahout: scalable machine learning library
●Ambari: software for provisioning and monitoring Apache Hadoop clusters
●Flume: distributed service for efficiently collecting, aggregating, and moving large amounts of log data
●Giraph: iterative graph processing system
●Drill: low-latency SQL query engine for Hadoop
●Oozie and Tez: workflow automation
●HDFS: distributed file system
●MapReduce: parallel data processing
●Pig Latin: data flow scripting language
●HBase: distributed, columnar, non-relational database
●Hive: data warehouse infrastructure + HQL
●ZooKeeper: centralized service providing distributed synchronization
Hadoop Distributed File System (1/2)
●Distributed File Systems
»Network File System (Sun Microsystems, 1984), ...
»Google File System (Google, 2000)
●Target: large-scale distributed data-intensive systems
»big data, I/O-bound applications
●Key properties
»High throughput
  »Large blocks (256 MB, ...) versus the common KB-range blocks (8 KB, ...)
»Scalability
  »Yahoo requirements for HDFS in 2006 were:
  ●storage capacity: 10 PB
  ●number of nodes: 10,000 (1 TB each)
  ●number of concurrent clients: 100,000, ...
»High availability
  »Achieved through block replication
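The effect of large blocks and block replication can be checked with simple arithmetic. The sketch below is illustrative only: the helper names are hypothetical, and the replication factor of 3 is an assumption used for the example, not a figure from the slide.

```python
MB = 1024 ** 2
TB = 1024 ** 4


def block_count(file_size: int, block_size: int) -> int:
    """Number of blocks needed to store a file (ceiling division)."""
    return -(-file_size // block_size)


def raw_storage(file_size: int, replication: int) -> int:
    """Bytes consumed on the cluster when every block is replicated."""
    return file_size * replication


file_size = 1 * TB
print(block_count(file_size, 256 * MB))     # block entries with 256 MB blocks
print(block_count(file_size, 8 * 1024))     # block entries with 8 KB blocks
print(raw_storage(file_size, 3) // TB)      # TB used with 3 replicas (assumed)
```

Fewer, larger blocks mean far less metadata to track per file, while replication multiplies the raw capacity that the cluster must provide.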
Hadoop Distributed File System (2/2)
[Figure: HDFS architecture. Components: HDFS Client, NameNode, Secondary NameNode (namespace backup), DataNodes. Edge labels: metadata (file name, replicas, each block location, ...); heartbeats, balancing, replication, ...; read; write.]
»The HDFS client asks the NameNode for metadata, then reads and writes file blocks directly on the DataNodes.
»DataNodes communicate with each other to pipeline file reads and writes.
Hadoop 0|1.x versus Hadoop YARN
[Figure: Hadoop 0|1.x architecture next to Hadoop YARN architecture]
»Static resource allocation deficiencies
» Job Tracker manages cluster resources and monitors MR Jobs
Hadoop YARN: Job processing
»The Application Master manages the application's lifecycle and negotiates resources from the Resource Manager
»The Node Manager manages processes on the node
»The Resource Manager is responsible for allocating resources to running applications
»A Container (YARN Child) performs MR tasks and has its own CPU and RAM attributes
Word Count Example (1/3)
Word Count Example (2/3)
Word Count Example (3/3)
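The word count job can be sketched in plain Python as three phases: map, shuffle, reduce. This is a single-process illustration under stated assumptions, not the Hadoop code from the slides; the function names and the lower-casing of words are choices made for this example.

```python
from collections import defaultdict


def map_phase(line):
    """Mapper: emit a (word, 1) pair for every word of an input line."""
    for word in line.split():
        yield word.lower(), 1


def shuffle(pairs):
    """Shuffle: group all values emitted for the same key."""
    groups = defaultdict(list)
    for word, count in pairs:
        groups[word].append(count)
    return groups


def reduce_phase(word, counts):
    """Reducer: sum the counts collected for one word."""
    return word, sum(counts)


def word_count(lines):
    pairs = (pair for line in lines for pair in map_phase(line))
    return dict(reduce_phase(w, c) for w, c in shuffle(pairs).items())


print(word_count(["big data", "Big warehouse data"]))
```

In a real cluster the mappers and reducers run on different nodes and the shuffle moves data over the network; here the three phases only mirror that structure.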
MR Job Tuning
●I/O
»Data Block Size
» Can be set for each file
●Parallelism
»Input Split --> Number of mappers
»Number of Reducers
●Communication cost
»Data Compression during shuffle
●Resource Management
»Each Node has different computing and memory capacities
»Mapper & Reducer allocated resources
» might be different in Hadoop YARN
»Resource on-request
●Code
»Implement combiners (local reducers)
» lower data transfer cost
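The benefit of a combiner (local reducer) can be seen by counting the records that cross the shuffle. The sketch below is a toy simulation with hypothetical input splits; it does not use the Hadoop API.

```python
from collections import Counter


def shuffle_without_combiner(splits):
    """Every (word, 1) pair emitted by a mapper is sent to the reducers."""
    return [(word, 1) for split in splits for word in split]


def shuffle_with_combiner(splits):
    """Each mapper pre-aggregates its own split before the shuffle."""
    records = []
    for split in splits:
        records.extend(Counter(split).items())
    return records


def reduce_counts(records):
    """Reducer side: sum the partial counts per word."""
    totals = Counter()
    for word, count in records:
        totals[word] += count
    return dict(totals)


# Two hypothetical input splits, one per mapper.
splits = [["a", "b", "a", "a"], ["b", "b", "a"]]
plain = shuffle_without_combiner(splits)
combined = shuffle_with_combiner(splits)
print(len(plain), len(combined))
print(reduce_counts(plain) == reduce_counts(combined))
```

The final counts are identical, but fewer records are transferred when each mapper combines locally. This works because summation is associative and commutative.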
Apache Pig Latin
●Google Sawzall (R. Pike et al. 2005)
●High-level parallel data flow language
»Open-source MapReduce code
»Basic operators: boolean ops, arithmetic ops, cast ops, ...
»Relational operators: filtering, projection, join, group, sort, cross, ..
»Aggregation functions: avg, max, count, sum, ..
»Load/Store functions
»Piggybank.jar: open-source repository of UDFs
●Apache Oozie then Apache Tez
»Open-source workflow/coordination service to manage data processing jobs for Apache Hadoop
»A Pig script is translated into a series of MapReduce jobs which form a DAG (Directed Acyclic Graph)
»A data flow (data move) is an edge
»Each application logic is a vertex
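A DAG of jobs can be scheduled in "waves": every job whose inputs are ready runs in the same wave. The sketch below is a generic illustration of that idea; the job names and the dependency map are hypothetical and do not reflect how Pig, Oozie or Tez actually compile or schedule a script.

```python
def execution_waves(dependencies):
    """Group jobs into waves; a job runs once all its parents are done.
    `dependencies` maps each job (vertex) to the jobs it reads from (edges)."""
    remaining = {job: set(parents) for job, parents in dependencies.items()}
    waves = []
    while remaining:
        ready = sorted(job for job, parents in remaining.items() if not parents)
        if not ready:
            raise ValueError("cycle detected: not a DAG")
        waves.append(ready)
        for job in ready:
            del remaining[job]
        for parents in remaining.values():
            parents.difference_update(ready)
    return waves


# Hypothetical job graph for a filter/join/group/order data flow.
jobs = {
    "filter_supplier": [],
    "filter_part": [],
    "join_part_partsupp": ["filter_part"],
    "join_supplier": ["join_part_partsupp", "filter_supplier"],
    "group_count": ["join_supplier"],
    "order_store": ["group_count"],
}
for wave in execution_waves(jobs):
    print(wave)
```

Jobs listed in the same wave have no edge between them, so a workflow manager could run them in parallel.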
TPC-H Benchmark - Relational Data Schema
Q16 in Pig Latin (1/2)
<Q16> The Parts/Supplier Relationship Query counts the number of suppliers who can supply parts that satisfy a particular customer's requirements. The customer is interested in parts of eight different sizes as long as they are not of a given type, not of a given brand, and not from a supplier who has had complaints registered at the Better Business Bureau.
SELECT p_brand, p_type, p_size, count(distinct ps_suppkey) AS supplier_cnt
FROM partsupp, part
WHERE p_partkey = ps_partkey
AND p_brand <> '[BRAND]'
AND p_type NOT LIKE '[TYPE]%'
AND p_size IN ([SIZE1], [SIZE2], [SIZE3], [SIZE4], [SIZE5], [SIZE6], [SIZE7], [SIZE8])
AND ps_suppkey NOT IN (SELECT s_suppkey FROM supplier
    WHERE s_comment LIKE '%Customer%Complaints%')
GROUP BY p_brand, p_type, p_size
ORDER BY supplier_cnt DESC, p_brand, p_type, p_size;
Q16 in Pig Latin (2/2)
-- Suppliers with no complaints
supplier = LOAD 'TPCH/supplier.tbl' USING PigStorage('|') AS (s_suppkey:int, s_name:chararray, s_address:chararray, s_nationkey:int, s_phone:chararray, s_acctbal:double, s_comment:chararray);
supplier_pb = FILTER supplier BY NOT (s_comment matches '.*Customer.*Complaints.*');
suppkeys_pb = FOREACH supplier_pb GENERATE s_suppkey;
-- Parts size in 49, 14, 23, 45, 19, 3, 36, 9
part = LOAD 'TPCH/part.tbl' USING PigStorage('|') AS (...);
parts = FILTER part BY (p_brand != 'Brand#45') AND NOT (p_type matches 'MEDIUM POLISHED.*') AND (p_size IN (49, 14, 23, 45, 19, 3, 36, 9));
-- Join partsupp, selected parts, selected suppliers
partsupp = LOAD 'TPCH/partsupp.tbl' USING PigStorage('|') AS (...);
part_partsupp = JOIN partsupp BY ps_partkey, parts BY p_partkey;
not_pb_supp = JOIN part_partsupp BY ps_suppkey, suppkeys_pb BY s_suppkey;
selected = FOREACH not_pb_supp GENERATE ps_suppkey, p_brand, p_type, p_size;
grouped = GROUP selected BY (p_brand, p_type, p_size);
-- Count each supplier once per group, matching count(distinct ps_suppkey) in the SQL text
count_supp = FOREACH grouped {
    suppkeys = DISTINCT selected.ps_suppkey;
    GENERATE FLATTEN(group), COUNT(suppkeys) AS supplier_cnt;
};
result = ORDER count_supp BY supplier_cnt DESC, p_brand, p_type, p_size;
STORE result INTO 'OUTPUT_PATH/tpch_query16';
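The logic of Q16 (filter parts, exclude suppliers with complaints, count distinct suppliers per group, sort) can be traced on a handful of rows. The sketch below is a plain-Python illustration with made-up rows and a reduced column set; the brand, type and size values reuse those of the Pig script, and the function name `q16` is hypothetical.

```python
import re
from collections import defaultdict

SIZES = {49, 14, 23, 45, 19, 3, 36, 9}


def q16(part, partsupp, supplier):
    # Suppliers whose comment matches '%Customer%Complaints%'.
    complained = {key for key, comment in supplier
                  if re.search(r"Customer.*Complaints", comment)}
    # Parts that satisfy the brand, type and size predicates.
    wanted = {key: (brand, ptype, size)
              for key, brand, ptype, size in part
              if brand != "Brand#45"
              and not ptype.startswith("MEDIUM POLISHED")
              and size in SIZES}
    # Join with partsupp and collect DISTINCT suppliers per group.
    groups = defaultdict(set)
    for ps_partkey, ps_suppkey in partsupp:
        if ps_partkey in wanted and ps_suppkey not in complained:
            groups[wanted[ps_partkey]].add(ps_suppkey)
    rows = [(b, t, s, len(keys)) for (b, t, s), keys in groups.items()]
    # ORDER BY supplier_cnt DESC, p_brand, p_type, p_size
    return sorted(rows, key=lambda r: (-r[3], r[0], r[1], r[2]))


supplier = [(1, "ok"), (2, "Customer sent Complaints"), (3, "fine")]
part = [
    (100, "Brand#11", "SMALL BRUSHED", 9),
    (101, "Brand#45", "SMALL BRUSHED", 9),
    (102, "Brand#11", "MEDIUM POLISHED TIN", 14),
    (103, "Brand#11", "SMALL BRUSHED", 7),
    (104, "Brand#11", "SMALL BRUSHED", 9),
    (105, "Brand#12", "LARGE PLATED", 3),
]
partsupp = [(100, 1), (100, 2), (100, 3), (101, 1), (102, 1),
            (103, 1), (104, 1), (105, 3)]
for row in q16(part, partsupp, supplier):
    print(row)
```

Supplier 1 supplies both parts 100 and 104, which fall in the same group, so it must be counted once; this is why the query asks for a distinct count.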
Extensions
●HDFS
»Quantcast File System: erasure codes rather than replication
»Spark: Resilient Distributed Dataset --> in-memory data storage
●MapReduce for iterative jobs
»Projects addressing iterative jobs for Hadoop 1.x: Peregrine, HaLoop, ..
●OLAP
»Join operations are very expensive
  ●CoHadoop implements data colocation --> most efficient way to perform large-scale joins in parallel
»Indexes:
  ●Hadoop++ (20× faster than Hadoop) and HAIL (65× faster than Hadoop), by J. Dittrich et al.
  ●Indexing data is stored in HDFS blocks read by Mappers
»The workflow manager orchestrates jobs, and should implement advanced optimization techniques (metadata about data flows, parallel DAG processing)
Beyond MapReduce
●SQL-on-Hadoop
»Apache Hive - HiveQL
»Apache Drill (Google BigQuery)
»Apache Spark - SparkSQL: real-time, in-memory, parallelized processing of Hadoop data
»IBM BigSQL
»Oracle Big Data SQL
»Cloudera Impala
●Apache Hadoop YARN
»Hadoop YARN is open to integrating new frameworks for parallel data processing: YARN applications!
●Other computational models
»BSP (Bulk Synchronous Parallelism): Apache Hama, Pregel (Apache Giraph)
»Multi-level serving trees: Dremel (Google BigQuery), Apache Drill, Cloudera Impala
Retrospective Analytics
●Traditional architectures
»DWS are deployed as part of an OLAP system separated from the OLTP system
»Data propagates down to the OLAP system, but typically after some lag
»Others use Pig Latin/Hadoop, but still introduce a substantial delay
»This is sufficient for retrospective analytics, but not for situations that require real-time analytics
Lambda Architecture? (1/5)
●By Nathan Marz
●An architecture for Big Data processing at scale, made up of 3 layers
●Batch Layer: Parallel DWS, Hadoop, ..
●Speed Layer: Storm, Spark, ...
●Serving Layer: new software for querying both Batch and Speed views
●Data System
»Store everything
»Data is time-based
»Data is immutable
●Query = function (all data)
λ-architecture: big picture (2/5)
λ-architecture: Batch Layer (3/5)
●What does it do?
»Focuses on the ingestion and storage of large quantities of data and the calculation of views from that data
»Storage of an immutable, append-only, constantly expanding master copy of the system's data
»Computation of views, i.e. derivative data for consumption by the serving layer
●Technologies
»Batch system: Apache Hadoop/MapReduce, Apache Pig Latin, Apache Spark
»Batch view databases: ElephantDB, SploutSQL
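The batch layer idea (an append-only master dataset, with views recomputed from all of it) can be sketched in a few lines. This is a toy, in-memory illustration: the `PageView` record, the `ingest` helper and the page-view counting view are hypothetical examples, not part of any of the listed technologies.

```python
from collections import Counter
from typing import NamedTuple


class PageView(NamedTuple):
    """An immutable, time-stamped fact."""
    timestamp: int
    url: str


master_dataset = []  # append-only: events are never updated or deleted


def ingest(event: PageView) -> None:
    master_dataset.append(event)


def compute_batch_view(events) -> dict:
    """Batch view = function(all data): page views per URL."""
    return dict(Counter(event.url for event in events))


ingest(PageView(1, "/home"))
ingest(PageView(2, "/cart"))
ingest(PageView(3, "/home"))
batch_view = compute_batch_view(master_dataset)
print(batch_view)
```

Because the view is always recomputed from the raw events, a bug in the view logic can be fixed by rerunning the computation; the master dataset itself is never altered.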
λ-architecture: Speed Layer (4/5)
●What does it do?
»Processes streams of raw data into views and deploys those views on the Serving Layer
»Stream processing
»Continuous computation
●Technologies
»Apache Storm, Apache Spark Streaming
»Speed Layer views
»The views need to be stored in a database that supports random writes; they are read/write
λ-architecture: Serving Layer (5/5)
●What does it do?
»Focuses on serving up views of the data as quickly as possible
»Queries the batch & real-time views and merges them
»Should meet requirements of scalability and fault tolerance
●Technologies
»Read-only: ElephantDB, Druid
»Read & write: Riak, Cassandra, Redis
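Merging the batch and real-time views at query time can be sketched as follows. This is a minimal, in-memory illustration: the events, the `batch_horizon` cut-off and the helper names are assumptions made for the example, and the view is a simple count per URL.

```python
from collections import Counter


def count_by_url(events):
    """View function: number of page views per URL."""
    return Counter(url for _, url in events)


def merged_query(batch_view, realtime_view, url):
    """Serving layer: combine the batch view with the real-time view."""
    return batch_view.get(url, 0) + realtime_view.get(url, 0)


# Hypothetical (timestamp, url) events; the batch layer has processed
# everything up to batch_horizon, the speed layer covers the rest.
events = [(1, "/home"), (2, "/cart"), (3, "/home"), (4, "/home"), (5, "/checkout")]
batch_horizon = 3
batch_view = count_by_url(e for e in events if e[0] <= batch_horizon)
realtime_view = count_by_url(e for e in events if e[0] > batch_horizon)

print(merged_query(batch_view, realtime_view, "/home"))
print(merged_query(batch_view, realtime_view, "/home") == count_by_url(events)["/home"])
```

The merged answer equals the result of applying the view function to all data, which is the property the architecture is built around. Counts merge by addition; other aggregates need their own merge rule.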
Speaker research projects
●Past projects
»Benchmarking of Apache Pig Latin using TPC-H benchmark
»TPC-H*d: a multi-dimensional OLAP benchmark
»AutoMDB: a framework for automated design of a multi-dim DB
»OLAP*: supporting effectively parallel OLAP workload
»DBaaS Expert: comparison and scoring of DBaaS offers
●Current projects
»Distributed Approximate Query Processing
»Implementation of TPC-DI
»Lambda architecture for OLAP
Thank you!
Q & A