SQLBits XI - ETL with Hadoop

Jan Pieter Posthuma – Inter Access

ETL with Hadoop and MapReduce

Uploaded by: jan-pieter-posthuma

Posted on: 19-Jun-2015


DESCRIPTION

My SQLBits XI presentation about Hadoop, MapReduce and Hive

TRANSCRIPT

Page 1: SQLBits XI - ETL with Hadoop

Jan Pieter Posthuma – Inter Access

ETL with Hadoop and MapReduce

Page 2: SQLBits XI - ETL with Hadoop

2

Introduction

Jan Pieter Posthuma – Technical Lead Microsoft BI and Big Data consultant

Inter Access – a local consultancy firm in the Netherlands

Architect role on multiple projects: Analysis Services, Reporting Services, PerformancePoint Services, Big Data, HDInsight, Cloud BI

http://twitter.com/jppp

http://linkedin.com/jpposthuma

[email protected]

Page 3: SQLBits XI - ETL with Hadoop

3

Expectations

What to cover:
– Simple ETL, so simple sources
– A different way to achieve the result

What not to cover:
– Big Data best practices
– Deep Hadoop internals

Page 4: SQLBits XI - ETL with Hadoop

4

Agenda

– Hadoop
– HDFS
– Map/Reduce (demo)
– Hive and Pig (demo)
– Polybase

Page 5: SQLBits XI - ETL with Hadoop

5

Hadoop

Hadoop is a collection of software for creating data-intensive, distributed clusters running on commodity hardware.

– Widely accepted by database vendors as a solution for unstructured data
– Microsoft partners with Hortonworks and delivers their Hortonworks Data Platform (HDP) as Microsoft HDInsight
– Available on-premises and as an Azure service
– The Hortonworks Data Platform (HDP) is 100% open source!

Page 6: SQLBits XI - ETL with Hadoop

6

Hadoop

[Diagram: the Microsoft Big Data solution — big data sources (raw, unstructured: crawlers, bots, devices, sensors) are fast-loaded into HDInsight on Windows Azure or Windows Server and summarized onward; source systems (ERP, CRM, LOB apps, Azure Marketplace) feed enterprise ETL with SSIS, DQS and MDS; SQL Server StreamInsight handles streaming data, alerts and notifications; everything is integrated and enriched into SQL Server Parallel Data Warehouse and SQL Server FTDW data marts, with historical data kept beyond the active window; SQL Server Analysis Services and Reporting Services deliver business insights, interactive reports and performance scorecards — turning data into insights and value.]

1. Data warehousing: storing and analysis of structured data
2. Map/Reduce: storing and processing of unstructured data
3. Streaming: predictive maintenance, a.k.a. real-time data processing
4. Business analytics: interaction with data

CREATE EXTERNAL TABLE Customer
WITH (LOCATION = 'hdfs://10.13.12.14:5000/user/Hadoop/Customer',
      FORMAT_OPTIONS (FIELDS_TERMINATOR = ','))
AS SELECT * FROM DimCustomer

Page 7: SQLBits XI - ETL with Hadoop

7

Hadoop

HDFS – distributed, fault-tolerant file system
MapReduce – framework for writing/executing distributed, fault-tolerant algorithms
Hive & Pig – SQL-like declarative languages
Sqoop/PolyBase – package for moving data between HDFS and relational DB systems
+ Others…

[Diagram: the Hadoop stack — HDFS at the bottom, Map/Reduce on top, Hive & Pig and Sqoop/PolyBase above, with Avro (serialization), HBase and ZooKeeper alongside; ETL tools, BI reporting and RDBMSs connect to the stack.]

Page 8: SQLBits XI - ETL with Hadoop

8

HDFS

[Diagram: a large file of 6,440 MB is split into blocks — Block 1 through Block 100 of 64 MB each, plus Block 101 of 40 MB — which are colour-coded and spread across the cluster. Example block size = 64 MB.]

Files are composed of a set of blocks:
• Typically 64 MB in size
• Each block is stored as a separate file in the local file system (e.g. NTFS)

Page 9: SQLBits XI - ETL with Hadoop

9

HDFS

[Diagram: a NameNode, with a BackupNode holding namespace backups, coordinates five DataNodes (heartbeat, balancing, replication, etc.); the DataNodes write their blocks to local disk.]

HDFS was designed with the expectation that failures (both hardware and software) would occur frequently

Page 10: SQLBits XI - ETL with Hadoop

10

Map/Reduce

Programming framework (library and runtime) for analyzing data sets stored in HDFS

The MR framework provides all the “glue” and coordinates the execution of the map and reduce jobs on the cluster:
– Fault tolerant
– Scalable

Map function:

var map = function(key, value, context) {}

Reduce function:

var reduce = function(key, values, context) {}

Hive & Pig Sqoo

p / Poly base

Map/Reduce
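As an illustration of these signatures, here is a minimal word-count sketch in the same JavaScript style (the input is assumed to be plain lines of text; all names are purely illustrative):

var map = function (key, value, context) {
    // split each input line into words and emit <word, 1> per occurrence
    var words = value.split(' ');
    for (var i = 0; i < words.length; i++) {
        if (words[i] != '') {
            context.write(words[i], 1);
        }
    }
};

var reduce = function (key, values, context) {
    // values is an iterator over all counts emitted for this word
    var sum = 0;
    while (values.hasNext()) {
        sum += parseInt(values.next());
    }
    context.write(key, sum);
};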

Page 11: SQLBits XI - ETL with Hadoop

11

Map/Reduce<keyA, valuea><keyB, valueb><keyC, valuec>…

<keyA, valuea><keyB, valueb><keyC, valuec>…

<keyA, valuea><keyB, valueb><keyC, valuec>…

<keyA, valuea><keyB, valueb><keyC, valuec>…

Output

Reducer

<keyA, list(valuea, valueb, valuec, …)>

Reducer

<keyB, list(valuea, valueb, valuec, …)>

Reducer

<keyC, list(valuea, valueb, valuec, …)>

Sort and

groupbykey

DataNode

DataNode

DataNode

Mapper<keyi, valuei>

Mapper<keyi, valuei>

Mapper<keyi, valuei>

Mapper<keyi, valuei>

Page 12: SQLBits XI - ETL with Hadoop

12

Demo

Weather info: Need daily max and min temperature per station

var map = function (key, value, context) {
    if (value[0] != '#') {
        var allValues = value.split(',');
        if (allValues[7].trim() != '') {
            context.write(allValues[0] + '-' + allValues[1],
                allValues[0] + ',' + allValues[1] + ',' + allValues[7]);
        }
    }
};

Output <key, value>:

<“210-19510101”, “210,19510101,-4”>

<“210-19510101”, “210,19510101,1”>

Input sample (header line plus two data rows):

# STN,YYYYMMDD,HH, DD,FH, FF,FX,  T,T10,TD,SQ,  Q,DR,RH,   P,VV,  N,  U,WW,IX, M, R, S, O, Y
210,19510101, 1,200, , 93, ,-4, , , , , , ,9947, , 8, , 5, , , , , ,
210,19510101, 2,190, ,108, , 1, , , , , , ,9937, , 8, , 5, , 0, 0, 0, 0, 0

Page 13: SQLBits XI - ETL with Hadoop

13

Demo (cont.)

var reduce = function (key, values, context) {
    var mMax = -9999;
    var mMin = 9999;
    var mKey = key.split('-');
    while (values.hasNext()) {
        var mValues = values.next().split(',');
        var mTemp = parseFloat(mValues[2]);  // parse to a number so max/min are compared numerically
        mMax = mTemp > mMax ? mTemp : mMax;
        mMin = mTemp < mMin ? mTemp : mMin;
    }
    context.write(key.trim(),
        mKey[0].toString() + '\t' +
        mKey[1].toString() + '\t' +
        mMax.toString() + '\t' +
        mMin.toString());
};

Reduce Input <key, values:=list(value1, …, valuen)>:<“210-19510101”, {“210,19510101,-4”, “210,19510101,1”}>

Map Output <key, value>:<“210-19510101”, “210,19510101,-4”><“210-19510101”, “210,19510101,1”>

Page 14: SQLBits XI - ETL with Hadoop

Demo

Page 15: SQLBits XI - ETL with Hadoop

15

Hive and Pig

Query: Find the sourceIP address that generated the most adRevenue, along with its average pageRank.

Rankings (
    pageURL STRING,
    pageRank INT,
    avgDuration INT
);

UserVisits (
    sourceIP STRING,
    destURL STRING,
    visitDate DATE,
    adRevenue FLOAT,
    ..  // fields omitted
);

A join is required. A possible HiveQL formulation is sketched below; the equivalent hand-written MapReduce job follows it.

package edu.brown.cs.mapreduce.benchmarks;

import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.fs.*;
import edu.brown.cs.mapreduce.BenchmarkBase;

public class Benchmark3 extends Configured implements Tool {

    public static String getTypeString(int type) {
        if (type == 1) { return ("UserVisits"); }
        else if (type == 2) { return ("Rankings"); }
        return ("INVALID");
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
     */
    public int run(String[] args) throws Exception {
        BenchmarkBase base = new BenchmarkBase(this.getConf(), this.getClass(), args);
        Date startTime = new Date();
        System.out.println("Job started: " + startTime);

        // Phase #1
        // -------------------------------------------
        JobConf p1_job = base.getJobConf();
        p1_job.setJobName(p1_job.getJobName() + ".Phase1");
        Path p1_output = new Path(base.getOutputPath().toString() + "/phase1");
        FileOutputFormat.setOutputPath(p1_job, p1_output);

        // Make sure we have our properties
        String required[] = { BenchmarkBase.PROPERTY_START_DATE, BenchmarkBase.PROPERTY_STOP_DATE };
        for (String req : required) {
            if (!base.getOptions().containsKey(req)) {
                System.err.println("ERROR: The property '" + req + "' is not set");
                System.exit(1);
            }
        } // FOR
        p1_job.setInputFormat(base.getSequenceFile() ? SequenceFileInputFormat.class : KeyValueTextInputFormat.class);
        if (base.getSequenceFile()) p1_job.setOutputFormat(SequenceFileOutputFormat.class);
        p1_job.setOutputKeyClass(Text.class);
        p1_job.setOutputValueClass(Text.class);
        p1_job.setMapperClass(base.getTupleData() ? edu.brown.cs.mapreduce.benchmarks.benchmark3.phase1.TupleWritableMap.class
                                                  : edu.brown.cs.mapreduce.benchmarks.benchmark3.phase1.TextMap.class);
        p1_job.setReducerClass(base.getTupleData() ? edu.brown.cs.mapreduce.benchmarks.benchmark3.phase1.TupleWritableReduce.class
                                                   : edu.brown.cs.mapreduce.benchmarks.benchmark3.phase1.TextReduce.class);
        p1_job.setCompressMapOutput(base.getCompress());

        // Phase #2
        // -------------------------------------------
        JobConf p2_job = base.getJobConf();
        p2_job.setJobName(p2_job.getJobName() + ".Phase2");
        p2_job.setInputFormat(base.getSequenceFile() ? SequenceFileInputFormat.class : KeyValueTextInputFormat.class);
        if (base.getSequenceFile()) p2_job.setOutputFormat(SequenceFileOutputFormat.class);
        p2_job.setOutputKeyClass(Text.class);
        p2_job.setOutputValueClass(Text.class);
        p2_job.setMapperClass(IdentityMapper.class);
        p2_job.setReducerClass(base.getTupleData() ? edu.brown.cs.mapreduce.benchmarks.benchmark3.phase2.TupleWritableReduce.class
                                                   : edu.brown.cs.mapreduce.benchmarks.benchmark3.phase2.TextReduce.class);
        p2_job.setCompressMapOutput(base.getCompress());

        // Phase #3
        // -------------------------------------------
        JobConf p3_job = base.getJobConf();
        p3_job.setJobName(p3_job.getJobName() + ".Phase3");
        p3_job.setNumReduceTasks(1);
        p3_job.setInputFormat(base.getSequenceFile() ? SequenceFileInputFormat.class : KeyValueTextInputFormat.class);
        p3_job.setOutputKeyClass(Text.class);
        p3_job.setOutputValueClass(Text.class);
        //p3_job.setMapperClass(Phase3Map.class);
        p3_job.setMapperClass(IdentityMapper.class);
        p3_job.setReducerClass(base.getTupleData() ? edu.brown.cs.mapreduce.benchmarks.benchmark3.phase3.TupleWritableReduce.class
                                                   : edu.brown.cs.mapreduce.benchmarks.benchmark3.phase3.TextReduce.class);

        // Execute #1
        base.runJob(p1_job);

        // Execute #2
        Path p2_output = new Path(base.getOutputPath().toString() + "/phase2");
        FileOutputFormat.setOutputPath(p2_job, p2_output);
        FileInputFormat.setInputPaths(p2_job, p1_output);
        base.runJob(p2_job);

        // Execute #3
        Path p3_output = new Path(base.getOutputPath().toString() + "/phase3");
        FileOutputFormat.setOutputPath(p3_job, p3_output);
        FileInputFormat.setInputPaths(p3_job, p2_output);
        base.runJob(p3_job);

        // There does need to be a combine
        if (base.getCombine()) base.runCombine();

        return 0;
    }
}

Page 16: SQLBits XI - ETL with Hadoop

16

Hive and Pig

The principle is the same: easy data retrieval.
– Both use MapReduce underneath
– Different founders: Facebook (Hive) and Yahoo (Pig)
– Different languages: SQL-like (Hive) and more procedural (Pig)
– Both can store data in tables, which are stored as HDFS file(s)
– Extra language options to use the benefits of Hadoop:
  – Partition by statement
  – Map/Reduce statement

‘Of the 150k jobs Facebook runs daily, only 500 are MapReduce jobs. The rest is HiveQL.’

A short sketch of both styles follows below.
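To make the difference in style concrete, here is a hedged sketch of the same toy aggregation (maximum temperature per station; relation, path and column names are assumptions) in both languages.

HiveQL (declarative):

SELECT station, MAX(temp) AS max_temp
FROM weather
GROUP BY station;

Pig Latin (procedural, step by step):

weather = LOAD '/data/weather' USING PigStorage(',')
          AS (station:chararray, obsdate:chararray, temp:int);
by_stn  = GROUP weather BY station;
maxima  = FOREACH by_stn GENERATE group AS station, MAX(weather.temp) AS max_temp;
STORE maxima INTO '/data/weather_max';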

Page 17: SQLBits XI - ETL with Hadoop

17

Hive

Query 1: SELECT count_big(*) FROM lineitem

Query 2: SELECT max(l_quantity) FROM lineitem
         WHERE l_orderkey > 1000 AND l_orderkey < 100000
         GROUP BY l_linestatus

[Chart: query runtime in seconds — Query 1: Hive 1318, PDW 252; Query 2: Hive 1397, PDW 279.]

Page 18: SQLBits XI - ETL with Hadoop

18

Demo

– Use the same data file as in the previous demo
– But now we ‘query’ the file directly (a possible Hive version is sketched below)
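A minimal sketch of what this could look like in Hive — the table name, column list and HDFS location are assumptions; only the first eight columns of the file are declared, and header lines (starting with '#') and empty temperatures are filtered out, just as the map function did:

CREATE EXTERNAL TABLE weather (
    stn STRING, yyyymmdd STRING, hh STRING, dd STRING,
    fh STRING, ff STRING, fx STRING, t STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/hadoop/weather/';

SELECT stn, yyyymmdd,
       MAX(CAST(TRIM(t) AS INT)) AS max_temp,
       MIN(CAST(TRIM(t) AS INT)) AS min_temp
FROM weather
WHERE stn NOT LIKE '#%' AND TRIM(t) <> ''
GROUP BY stn, yyyymmdd;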

Page 19: SQLBits XI - ETL with Hadoop

Demo

Page 20: SQLBits XI - ETL with Hadoop

20

Polybase

– PDW v2 introduces external tables to represent HDFS data
– PDW queries can now span HDFS and PDW data
– The Hadoop cluster is not part of the appliance

[Diagram: unstructured data from social apps, sensor & RFID, mobile apps and web apps lands in HDFS; structured data lives in relational databases (RDBMS); an enhanced PDW query engine spans both through T-SQL.]

Page 21: SQLBits XI - ETL with Hadoop

Polybase

21

[Diagram: the PDW appliance — a Control Node (SQL Server) plus a series of Compute Nodes (SQL Server) — alongside a Hadoop cluster consisting of a NameNode (HDFS) and many DataNodes. “This is PDW!”]

Page 22: SQLBits XI - ETL with Hadoop

22

PDW ↔ Hadoop

1. Retrieve data from HDFS with a PDW query
   – Seamlessly join structured and semi-structured data
2. Import data from HDFS to PDW
   – Parallelized CREATE TABLE AS SELECT (CTAS)
   – External tables as the source
   – PDW table, either replicated or distributed, as destination
3. Export data from PDW to HDFS
   – Parallelized CREATE EXTERNAL TABLE AS SELECT (CETAS)
   – External table as the destination; creates a set of HDFS files

-- 1. Query HDFS data directly from PDW
SELECT Username
FROM ClickStream c, User u
WHERE c.UserID = u.ID AND c.URL = 'www.bing.com';

-- 2. Import HDFS data into PDW (CTAS)
CREATE TABLE ClickStreamInPDW
WITH (DISTRIBUTION = HASH(URL))
AS SELECT URL, EventDate, UserID FROM ClickStream;

-- 3. Export PDW data to HDFS (CETAS)
CREATE EXTERNAL TABLE ClickStream2 (URL, EventDate, UserID)
WITH (LOCATION = 'hdfs://MyHadoop:5000/joe',
      FORMAT_OPTIONS (...))
AS SELECT URL, EventDate, UserID FROM ClickStreamInPDW;
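These statements assume ClickStream has already been declared as an external table over an HDFS file. A hedged sketch in the same style as the earlier Customer example — the column types, delimiter and path are assumptions:

CREATE EXTERNAL TABLE ClickStream (URL varchar(512), EventDate datetime, UserID int)
WITH (LOCATION = 'hdfs://MyHadoop:5000/joe/clickstream.txt',   -- assumed file path
      FORMAT_OPTIONS (FIELDS_TERMINATOR = '|'));               -- assumed delimiter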

Page 23: SQLBits XI - ETL with Hadoop

23

Recap

– Hadoop is the next big thing for DWH/BI
– Not a replacement, but a new dimension
– Many ways to integrate its data

What’s next?
– Polybase combined with (custom) Map/Reduce?
– An HDInsight appliance?
– Polybase for SQL Server vNext?

Page 24: SQLBits XI - ETL with Hadoop

24

References

Microsoft Big Data (HDInsight): http://www.microsoft.com/bigdata

Microsoft HDInsight on Azure (3-month free trial): http://www.windowsazure.com

Hortonworks Data Platform sandbox (VMware): http://hortonworks.com/download/

Page 25: SQLBits XI - ETL with Hadoop

Q&A

Page 26: SQLBits XI - ETL with Hadoop

Coming up…

Speaker             Title                                                       Room
Alberto Ferrari     DAX Query Engine Internals                                  Theatre
Wesley Backelant    An introduction to the wonderful world of OData             Exhibition B
Bob Duffy           Windows Azure For SQL folk                                  Suite 3
Dejan Sarka         Excel 2013 Analytics                                        Suite 1
Mladen Prajdić      From SQL Traces to Extended Events. The next big switch.    Suite 2
Sandip Pani         New Analytic Functions in SQL Server 2012                   Suite 4

#SQLBITS