
Page 1: A Brave new object store world

© 2017 IBM Corporation

Deep dive into high performance analytics with

Apache Spark and Object Storage

Effi Ofer

[email protected]

In collaboration with Elliot Kolodner, Gil Vernik, Kalman Meth, Maya Anderson, and Michael Factor from IBM Research, as well as Francesco Pace and Pietro Michiardi from Eurecom

Mar 2017

A brave new object store world

Page 2: A Brave new object store world



Data - the natural resource of the 21st century

With the rise of cloud, mobility, IoT, social and analytics, data growth is accelerating. This confluence of technologies has amplified the data explosion, creating incredible growth-on-growth for unstructured data. New data sources are added daily, resulting in a valuable data ecosystem for every business.

90% of all data was created in the last 2 years¹

75 billion Internet-connected devices by 2020²

[Chart: projected growth of data, in exabytes, 2009–2020³]

Sources:

1. Science Daily, Big Data, for better or worse: 90% of world’s data generated over last two years, 2013

2. Business Insider, Morgan Stanley: 75 Billion Devices Will Be Connected to The Internet of Things By 2020, 2013

3. Digital Universe of Opportunities: Rich Data & The Increasing Value of the Internet of Things,

EMC Digital Universe with Research & Analysis by IDC, April 2014


Page 3: A Brave new object store world


Extracting insights from data is critical

[Diagram: historic traffic data combined with live sensor data]

– What are “normal” traffic conditions at morning rush hour?

– Are current traffic conditions anomalous?

– How has the traffic flow changed over time?

– How will the flow change if a traffic lane is removed?

Page 4: A Brave new object store world


Agenda

– Introduction to storing data: Object Store and IBM COS

– Introduction to analyzing data: Apache Spark

– Deep dive into Spark and storage

Page 5: A Brave new object store world


Object Storage

▪ High capacity, low cost storage

▪ Examples:

– IBM Cloud Object Storage (IBM COS)

– Azure Blob Storage

– OpenStack Swift

– Amazon S3

▪ Objects stored in flat address space

▪ An object encapsulates data and metadata

▪ Whole object creation; no update in-place

▪ Accessed through RESTful HTTP

– Ideal for mobile, natural for cloud

Page 6: A Brave new object store world


IBM Cloud Object Storage: designed for cloud-scale data

Addresses unstructured data storage from petabyte to exabyte with reliability, security, availability and disaster recovery.

What makes IBM COS extremely durable, even at large scale:

– Shared nothing architecture, with strong consistency for data

– Scalable namespace mapping with no centralized metadata

– Highly reliable and available without replication using erasure coding

– Distributed Rebuilder

– Distributed collection and storage of statistics needed for management

– APIs for integration with external management applications

– Support for “lazy” handling of disk failures

– Automated network installation

Page 7: A Brave new object store world

Traditional Storage makes Big Data even bigger: multiple copies, more storage, more overhead

With traditional storage, a single 1 TB object is replicated three times. Three complete copies of the object, plus overhead, are distributed and maintained in separate locations in case of failure or disaster: 1.2 TB each in D.C., Dallas and San Jose, resulting in 3.6 TB of total storage consumed. Traditional storage requires 3.6 TB of raw storage capacity for 1 TB of usable data.

IBM Cloud Object Storage is built for cloud-scale data: just as reliable, less complex and more cost-efficient than traditional storage

With IBM Cloud Object Storage there is no need to store replicated data in different systems. A single TB of object storage is encrypted and sliced, but never replicated. Slices are distributed geographically for durability and availability: 0.56 TB each in D.C., Dallas and San Jose. You can lose some number of slices due to failure or disaster and still quickly recover 100% of your data. Our object storage requires only 1.7 TB of raw storage capacity for 1 TB of usable storage: less than half the storage and 70% lower TCO.
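The slide's figures can be sanity-checked with simple arithmetic; the sketch below just re-derives the raw-to-usable ratios from the per-site numbers given above (1.2 TB per replica site, 0.56 TB per slice site):

```python
# Re-derive the slide's raw-storage figures for 1 TB of usable data.
# Traditional storage: three full replicas, ~1.2 TB per site with overhead.
traditional_raw = 3 * 1.2   # TB across D.C., Dallas, San Jose

# IBM COS: erasure-coded slices, ~0.56 TB per site, never replicated.
cos_raw = 3 * 0.56          # TB across the same three sites

print(round(traditional_raw, 1))            # 3.6
print(round(cos_raw, 2))                    # 1.68 (the slide rounds to 1.7)
print(round(traditional_raw / cos_raw, 1))  # 2.1, i.e. less than half the storage
```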

Page 8: A Brave new object store world


IBM Cloud Object Storage delivers industry leading flexibility, scalability and simplicity

On-Premise
• Single tenant
• Design specific to needs
• Total control of system
• Local to on-premise compute

Dedicated
• No datacenter space required
• Single tenant
• Flexible configuration options
• OPEX vs CAPEX

Hybrid
Same as on-premise plus the following:
• Economic benefits of more dispersed sites (i.e., 3 rather than 2)
• On-premise storage replicated to the cloud
• Ability to add capacity to an on-premise deployment when there is no more data center space available

Public (Regional and Cross Regional)
IBM managed options provide full management, monthly billing
• Usage-based pricing
• Elastic capacity
• No datacenter space required
• Fully managed
• Data local to in-cloud compute
• Immediate worldwide footprint
• OPEX vs CAPEX

Page 9: A Brave new object store world


IBM Cloud Object Storage on Bluemix: Cross Regional support

IBM Cloud Object Storage is available on Bluemix. Slices are distributed geographically for durability and availability: 0.56 TB each in D.C., Dallas and San Jose, for 1.7 TB of raw storage per 1 TB of usable data.

Page 10: A Brave new object store world


Agenda

– Introduction to storing data: Object Store and IBM COS

– Introduction to analyzing data: Apache Spark

– Deep dive into Spark and storage

Page 11: A Brave new object store world


Driving value from data: data analytics with Apache Spark

Spark is an open source in-memory application framework for distributed data processing and iterative analysis on massive data volumes.

[Diagram: a Spark driver dispatching work to executors, each executor running multiple tasks]

Page 12: A Brave new object store world


Spark enables analytics over many different data sources

• Spark Core: general compute engine; handles distributed task dispatching, scheduling and basic I/O functions
• Spark SQL: executes SQL statements
• Spark Streaming: performs streaming analytics using micro-batches
• MLlib (machine learning): common machine learning and statistical algorithms
• GraphX (graph): distributed graph processing framework

A large variety of data sources and formats can be supported, both on-premise and cloud: BigInsights (HDFS), Cloudant, dashDB, Object Storage, SQL DB and many others, spanning IBM Cloud, other clouds, IBM on-prem and other on-prem systems.

Page 13: A Brave new object store world


Agenda

– Introduction to storing data: Object Store and IBM COS

– Introduction to analyzing data: Apache Spark

– Deep dive into Spark and storage

Page 14: A Brave new object store world


Connecting Spark to Storage

▪ Spark interacts with its storage system using the Hadoop Filesystem interface

▪ A connector is implemented for each storage system, such as

– HDFS (Hadoop Distributed File System)

– Object storage such as IBM COS, S3, OpenStack Swift

▪ Example of reading and writing from HDFS:

data = sc.textFile("hdfs://vault1/inputObjectName")
data.saveAsTextFile("hdfs://vault1/outputObjectName")

▪ Example of reading and writing from IBM Cloud Object Storage:

data = sc.textFile("s3d://vault1.service/inputObjectName")
data.saveAsTextFile("s3d://vault1.service/outputObjectName")

[Diagram: Spark on top of the Hadoop Filesystem interface, with storage connectors beneath]
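The URI scheme is what selects the connector. A minimal sketch of that dispatch (the mapping below is illustrative; real deployments resolve it via fs.&lt;scheme&gt;.impl entries in the Hadoop configuration, and the Stocator class name is an assumption):

```python
from urllib.parse import urlparse

# Illustrative scheme-to-connector table; in Hadoop this lives in the
# configuration as fs.<scheme>.impl entries.
CONNECTORS = {
    "hdfs": "org.apache.hadoop.hdfs.DistributedFileSystem",
    "s3a": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "s3d": "com.ibm.stocator.fs.ObjectStoreFileSystem",  # assumed Stocator class
}

def connector_for(uri):
    """Return the connector class that would handle this storage URI."""
    return CONNECTORS.get(urlparse(uri).scheme)

print(connector_for("s3d://vault1.service/inputObjectName"))
```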

Page 15: A Brave new object store world


Using HDFS with co-located storage and compute has its pain points…

▪ Need to scale compute with storage – poor match to explosive data growth

▪ Cloud and on-prem experiences differ – cloud services run on object storage

…which object storage as the persistent storage layer addresses:

▪ Scale storage independently – directly use data stored for other reasons

▪ Same cloud and on-prem experience

[Diagram: a traditional deployment, with Spark co-located on HDFS nodes, versus a deployment with IBM Cloud Object Storage, with Spark nodes reading directly from COS]

Page 16: A Brave new object store world


How Spark writes to HDFS

The Spark driver and executor recursively create the directories for the task temporary, job temporary and final output (steps 1–2).

The task writes its task temporary file (step 3).

At task commit, the executor lists the task temporary directory and renames the file it finds to its job temporary name (steps 4–5).

At job commit, the driver recursively lists the job temporary directories and renames the files it finds to their final names (steps 6–7).

Finally, the driver writes the _SUCCESS object (step 8).

1. Spark driver: make directories recursively:

hdfs://res/data.txt/_temporary/0

2. Spark executor: make directories recursively:

hdfs://res/data.txt/_temporary/0/_temporary/attempt_201702221313_0000_m_000001_1

3. Spark executor: write task temporary object

hdfs://res/data.txt/_temporary/0/_temporary/attempt_201702221313_0000_m_000001_1/part-00001

4. Spark executor: list directory:

hdfs://res/data.txt/_temporary/0/_temporary/attempt_201702221313_0000_m_000001_1

5. Spark executor: rename task temporary object to job temporary object:

hdfs://res/data.txt/_temporary/0/task_201702221313_0000_m_000001/part-00001

6. Spark driver: list job temporary directories recursively:

hdfs://res/data.txt/_temporary/0/task_201702221313_0000_m_000001

7. Spark driver: rename job temporary object to final name:

hdfs://res/data.txt/part-00001

8. Spark driver: write _SUCCESS object

hdfs://res/data.txt/_SUCCESS

Simple code that writes a single file to storage:

val data = Array(1)

val distData = sc.parallelize(data)

val finalData = distData.coalesce(1)

finalData.saveAsTextFile("hdfs://vault1/data.txt")
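The rename choreography above can be simulated on a local filesystem. This is a deliberately simplified sketch of the FileOutputCommitter version 1 flow (directory names are shortened relative to the real attempt naming scheme, and the real committer also cleans up the _temporary tree, which this sketch skips):

```python
import tempfile
from pathlib import Path

def run_job(out):
    """Mimic committer v1: write under _temporary, rename twice, mark success."""
    out = Path(out)
    attempt = out / "_temporary/0/_temporary/attempt_0"   # steps 1-2
    attempt.mkdir(parents=True)
    (attempt / "part-00000").write_text("1\n")            # step 3: task temp file
    task = out / "_temporary/0/task_0"
    task.mkdir(parents=True)
    for f in attempt.iterdir():                           # steps 4-5: task commit
        f.rename(task / f.name)
    for f in task.iterdir():                              # steps 6-7: job commit
        f.rename(out / f.name)
    (out / "_SUCCESS").write_text("")                     # step 8

base = Path(tempfile.mkdtemp()) / "data.txt"
run_job(base)
print(sorted(p.name for p in base.iterdir() if p.is_file()))  # ['_SUCCESS', 'part-00000']
```

On HDFS each rename is a cheap metadata operation, which is why this design is tolerable there; the next slide shows what the same flow costs on an object store.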

Page 17: A Brave new object store world


How Spark writes to Object Storage

File operations are translated to RESTful calls:

• HEAD

• GET

• PUT

• COPY

• DELETE

1. Spark driver: make directories recursively:

s3a://vault1/data.txt/_temporary/0

2. Spark executor: make directories recursively:

s3a://vault1/data.txt/_temporary/0/_temporary/attempt_201702221313_0000_m_000001_1

3. Spark executor: write task temporary object:

s3a://vault1/data.txt/_temporary/0/_temporary/attempt_201702221313_0000_m_000001_1/part-00001

4. Spark executor: list directory:

s3a://vault1/data.txt/_temporary/0/_temporary/attempt_201702221313_0000_m_000001_1

5. Spark executor: rename task temporary object to job temporary object:

s3a://vault1/data.txt/_temporary/0/task_201702221313_0000_m_000001/part-00001

6. Spark driver: list job temporary directories recursively:

s3a://vault1/data.txt/_temporary/0/task_201702221313_0000_m_000001

7. Spark driver: rename job temporary object to final name:

s3a://vault1/data.txt/part-00001

8. Spark driver: write _SUCCESS object:

s3a://vault1/data.txt/_SUCCESS

Simple code that writes a single file to storage:

val data = Array(1)

val distData = sc.parallelize(data)

val finalData = distData.coalesce(1)

finalData.saveAsTextFile("s3a://vault1/data.txt")

Page 18: A Brave new object store world


So what is going on here? What is written to Storage?

▪ Output to storage using the Hadoop FileOutputCommitter

▪ Each task execution

– Attempts to write its own task temporary file

– At task commit, renames the task temporary file to a job temporary file

– Task commit is done by the executors, so it occurs in parallel.

▪ When all of the tasks of a job complete, the driver

– Calls the output committer to do job commit

– Renames the job temporary files to their final names

– Job commit occurs in the driver after all of the tasks have committed, so it does not benefit from parallelism

Page 19: A Brave new object store world


Why such complexity?

Avoids incomplete results being misinterpreted as complete

Can it be simplified?

Output committer version 2

– Task temporary files are renamed to their final names at task commit

– Job commit is largely reduced to the writing of the _SUCCESS object

– However, as of Hadoop 2.7.3, this algorithm is not yet default

https://issues.apache.org/jira/browse/MAPREDUCE-6336
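Until it becomes the default, version 2 must be requested explicitly. A configuration sketch in pyspark (the property name is the one introduced by the MAPREDUCE-6336 work; the spark.hadoop. prefix forwards it into the Hadoop configuration):

```python
from pyspark import SparkConf

# Ask Hadoop's FileOutputCommitter for algorithm version 2 (rename at task
# commit rather than at job commit). Requires a Hadoop version that ships
# the v2 algorithm (2.7+) on the classpath.
conf = SparkConf().set(
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
```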

Can it be improved even further?

Page 20: A Brave new object store world


Introducing Stocator

▪ A fast object store connector for Apache Spark that takes advantage of object store semantics

▪ Available at https://github.com/SparkTC/stocator

▪ Marks output as made by Stocator

– The driver is responsible for creating a ‘directory’ to hold the output dataset

– Stocator makes use of this ‘directory’ as a marker that it wrote the output

– This ‘directory’ is a zero-byte object with the name of the dataset

▪ Avoids renames

– When asked to create a temporary object, Stocator recognizes the pattern of the name and writes the object directly to its final name

– For example,

<dataset-name>/_temporary/0/_temporary/attempt_<job-timestamp>_0000_m_000000_<attempt-number>/part-<part-number>

becomes

<dataset-name>/part-<part-number>_attempt_<job-timestamp>_0000_m_000000_<attempt-number>

▪ When all tasks are done, write the _SUCCESS object

[Diagram: Stocator plugged in beneath the Hadoop Filesystem interface]
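The rename-avoidance step above can be sketched as a pure name transformation: detect the temporary-path pattern and compute the final object name directly. The regex below only illustrates the pattern shown on this slide, not Stocator's actual implementation:

```python
import re

# Pattern of a task temporary object name, per the slide:
# <dataset>/_temporary/<job-id>/_temporary/attempt_<...>/part-<n>
TEMP = re.compile(
    r"^(?P<dataset>.+?)/_temporary/\d+/_temporary/"
    r"(?P<attempt>attempt_\d+_\d+_m_\d+_\d+)/"
    r"(?P<part>part-\d+)$"
)

def final_name(path):
    """Rewrite a temporary object name to its final name; pass others through."""
    m = TEMP.match(path)
    if not m:
        return path  # not a temporary object: write as requested
    return "{dataset}/{part}_{attempt}".format(**m.groupdict())

print(final_name(
    "data.txt/_temporary/0/_temporary/attempt_201702231115_0001_m_000000_1000/part-00000"
))
# data.txt/part-00000_attempt_201702231115_0001_m_000000_1000
```

Because the attempt identifier is kept in the final name, a reader can later tell apart outputs of different execution attempts (see the backup slide on reading).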

Page 21: A Brave new object store world


Writing to Object Storage using Stocator

1. Spark driver: create a ‘directory’:

s3d://vault1.service/data.txt

2. Spark driver: get container:

s3d://vault1.service/data.txt

3. Spark executor: create output object:

s3d://vault1.service/data.txt/part-00000-attempt_201702231115_0001_m_000000_1000

4. Spark driver: write _SUCCESS object:

s3d://vault1.service/data.txt/_SUCCESS

Simple code that writes a single file to storage:

val data = Array(1)

val distData = sc.parallelize(data)

val finalData = distData.coalesce(1)

finalData.saveAsTextFile("s3d://vault1.service/data.txt")

Page 22: A Brave new object store world


Wordcount using Stocator

First, provide the credentials for the object store where the data resides:
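The original slide shows this as a screenshot, which is not reproduced here. A hedged pyspark sketch of such configuration: the endpoint and key values are placeholders, and exact property names vary by connector and version (the fs.s3a.* names below are the standard Hadoop S3A keys; Stocator uses analogous keys under its own scheme):

```python
# Assumes an existing pyspark SparkContext `sc`; credentials are supplied
# through the underlying Hadoop configuration before any read or write.
hconf = sc._jsc.hadoopConfiguration()
hconf.set("fs.s3a.endpoint", "https://s3.example.com")  # placeholder endpoint
hconf.set("fs.s3a.access.key", "ACCESS_KEY")            # placeholder credential
hconf.set("fs.s3a.secret.key", "SECRET_KEY")            # placeholder credential
```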

Page 23: A Brave new object store world


Wordcount using Stocator

And now write your code to access and manipulate the data:
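The slide's code screenshot is not reproduced; the program is the classic Spark wordcount, whose input would come from an object-store path such as s3d://vault1.service/inputObjectName (per the earlier connector slide). So the transformation chain can be followed, and executed, without a cluster, the sketch below runs the same chain on a tiny local stand-in for an RDD; with a real SparkContext the first line would be sc.textFile(...) instead:

```python
class LocalRDD:
    """Minimal local stand-in exposing the RDD methods wordcount uses."""
    def __init__(self, lines):
        self.lines = list(lines)
    def flatMap(self, f):
        return LocalRDD(x for line in self.lines for x in f(line))
    def map(self, f):
        return LocalRDD(f(line) for line in self.lines)
    def reduceByKey(self, f):
        acc = {}
        for k, v in self.lines:
            acc[k] = f(acc[k], v) if k in acc else v
        return LocalRDD(acc.items())
    def collect(self):
        return list(self.lines)

data = LocalRDD(["a brave new world", "a new store"])
counts = (data.flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))
print(sorted(counts.collect()))
# [('a', 2), ('brave', 1), ('new', 2), ('store', 1), ('world', 1)]
```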

Page 24: A Brave new object store world


Wordcount using Stocator

Finally, this is our output:

Page 25: A Brave new object store world


Performance evaluations

▪ Spark cluster

– Spark 2.0.1

• 3 x Dual Intel Xeon E5-2690 (12 Hyper-threaded Cores, 2.60 GHz)

• 256 GB RAM, 1 x 10Gbps, 1 x 1TB SATA

▪ Object store

– IBM Cloud Object Storage cluster

Workload    #Jobs   #Stages   Input size                  Output size
Copy        1       1         50 GB                       50 GB
Read        1       1         50 GB                       --
Teragen     1       1         --                          50 GB
Wordcount   1       2         50 GB                       1.6 MB
Terasort    2       4         50 GB                       50 GB
TPC-DS      112     179       50 GB (raw), 15 GB Parquet  --

Page 26: A Brave new object store world


Comparing Stocator to base Hadoop swift and s3a

[Chart: run time in seconds (0–800) for Teragen, Copy, Terasort, Wordcount, Read (50GB), Read (500GB) and TPC-DS, comparing Stocator, Hadoop Swift and S3a. Stocator’s speedups: 18x, 10x, 9x, 2x, 1x, 1x, 1x*]

Stocator is

• much faster for write workloads

• about the same for read workloads

* Comparing Stocator to s3a

Page 27: A Brave new object store world


Comparing Stocator to s3a with non-default features

[Chart: run time in seconds (0–800) for Teragen, Copy, Terasort, Wordcount, Read (50 GB), Read (500 GB) and TPC-DS, comparing Stocator, S3a, S3a CV2 (commit version 2) and S3a CV2+FU (commit version 2 with fast upload). Stocator’s speedups: 1.5x, 1.3x, 1.3x, 1.1x, 1x, 1x, 1x*]

* Comparing stocator to s3a commit version 2 + fast upload

Page 28: A Brave new object store world


Comparing REST operations

[Chart: number of RESTful operations (0–45,000) for Teragen, Copy, Terasort, Wordcount, Read (50GB), Read (500GB) and TPC-DS, comparing Stocator, Hadoop Swift and S3a. Stocator issues fewer operations: 33x, 25x, 24x, 25x, 2x, 2x, 2x*]

Stocator has a lower impact on the object store than s3a

• Reduced cost

• Reduced overhead

* Comparing stocator to s3a

Page 29: A Brave new object store world


Questions?

Page 30: A Brave new object store world


Backup Slides

Page 31: A Brave new object store world


Reading a dataset from Stocator

▪ Confirm that the dataset was produced by Stocator

– Using the metadata from the initial ‘directory’

▪ Confirm that the _SUCCESS object exists

▪ List the object parts belonging to the dataset

– Using GET container RESTful call

▪ Are there multiple objects from different execution attempts?

– Choose the one that has the most data

– This is correct given:

• the fail-stop assumption (i.e. a Spark server executes correctly until it halts)

• all successful execution attempts write the same output

• there are no in-place updates in an object store

• at least one attempt succeeded (evident by the _SUCCESS object)
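The attempt-resolution rule above can be sketched as a small function over a container listing: for each part number, keep the attempt whose object has the most data. The listing format, (object name, size) pairs as a GET-container call would return, and the attempt identifiers are illustrative:

```python
import re

def resolve_attempts(listing):
    """Given (name, size) pairs, keep the largest object per part number."""
    best = {}
    for name, size in listing:
        m = re.search(r"(part-\d+)", name)
        if not m:
            continue  # e.g. the dataset marker or _SUCCESS object
        part = m.group(1)
        if part not in best or size > best[part][1]:
            best[part] = (name, size)
    return sorted(name for name, _ in best.values())

listing = [
    ("data.txt/part-00000_attempt_201702231115_0001_m_000000_0", 10),   # failed attempt
    ("data.txt/part-00000_attempt_201702231115_0001_m_000000_1", 400),  # successful retry
    ("data.txt/part-00001_attempt_201702231115_0001_m_000001_0", 400),
]
print(resolve_attempts(listing))
```

Under the fail-stop assumption, a partial object from a halted attempt can only be a prefix of the full output, so "most data" safely identifies a successful attempt.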

Page 32: A Brave new object store world


Additional optimizations

▪ Streaming of output

– Normally, the length of an object is a parameter to a PUT operation

– But this means Spark needs to cache the entire object prior to issuing the PUT

– Hadoop swift and s3a by default cache the object in a local file system

– Stocator leverages HTTP chunked transfer encoding

• The object is sent in chunks (64 KB in Stocator)

• No need to know the final object length before issuing the PUT

– Similar to s3a fast upload

• Minimum size of 5 MB per part

▪ Avoid a HEAD operation just before a GET

– Often the HEAD is used just to confirm that the object exists and determine its size

– However, GET also returns the object metadata

– In many cases Stocator is able to avoid the extra HEAD call before a GET

▪ Cache the results of HEAD operations

– Spark assumes the input dataset is immutable
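The chunked-transfer idea can be shown with the wire framing itself: each chunk is its size in hex, CRLF, the data, CRLF, with a zero-length chunk terminating the body (per RFC 7230), so the total object length never needs to be known up front. A small sketch (a 5-byte chunk size is used here only to keep the example readable; the slide states Stocator uses 64 KB):

```python
import io

def chunked_body(stream, chunk_size=64 * 1024):
    """Yield an HTTP chunked-transfer framing of `stream`, chunk by chunk."""
    while True:
        piece = stream.read(chunk_size)
        if not piece:
            break
        yield b"%x\r\n" % len(piece) + piece + b"\r\n"
    yield b"0\r\n\r\n"  # terminating zero-length chunk

frames = list(chunked_body(io.BytesIO(b"hello world"), chunk_size=5))
print(frames)
# [b'5\r\nhello\r\n', b'5\r\n worl\r\n', b'1\r\nd\r\n', b'0\r\n\r\n']
```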