hadoop spark introduction-20150130

Spark Introduction Ziv Huang 2015/01/30

Upload: xuan-chao-huang

Post on 19-Jul-2015


TRANSCRIPT

Page 1: Hadoop Spark Introduction-20150130

Spark Introduction Ziv Huang 2015/01/30

Page 2: Hadoop Spark Introduction-20150130

Purpose of this introduction

Help you form a picture of Spark: its architecture, data flow, job scheduling, and programming model.

The goal is an overview, not every technical detail!

Page 3: Hadoop Spark Introduction-20150130

Outline

What is Spark

Architecture

Application Workflow

Job Scheduling

Submitting Applications

Programming Guide

Page 5: Hadoop Spark Introduction-20150130

What is Spark

A fast and general engine for large-scale data processing (within Hadoop, it can replace MapReduce, as long as its stability is guaranteed and its APIs are comprehensive enough).

Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk. Spark has an advanced DAG execution engine that supports cyclic data flow and in-memory computing.

Write applications quickly in Java, Scala or Python. Spark offers over 80 high-level operators that make it easy to build parallel apps, and you can use it interactively from the Scala and Python shells.

Page 6: Hadoop Spark Introduction-20150130

What is Spark

Combine SQL, streaming, and complex analytics. Spark powers a stack of high-level tools including Spark SQL, MLlib for machine learning, GraphX, and Spark Streaming. You can combine these libraries seamlessly in the same application.

Spark runs on Hadoop YARN, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3 (Amazon Simple Storage Service).

Page 8: Hadoop Spark Introduction-20150130

Architecture

[Figure: a Spark driver, a cluster manager, and three Spark workers, each worker on an HDFS data node. Steps shown: 1. launch job; 2. allocate resource; 3. assign task & return result.]

Components: Driver (your main program), Cluster manager, and Spark workers

Page 9: Hadoop Spark Introduction-20150130

Architecture

[Figure: a Spark driver, a cluster manager, and three Spark workers, each worker on an HDFS data node. Steps shown: 1. launch job; 2. allocate resource; 3. assign task & return result.]

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in the driver program (your main program).

Note: The driver and the cluster manager can be on the same or different machines, and can be inside or outside the HDFS cluster.

Page 10: Hadoop Spark Introduction-20150130

Architecture

[Figure: a Spark driver, a cluster manager, and three Spark workers, each worker on an HDFS data node. Steps shown: 1. launch job; 2. allocate resource; 3. assign task & return result.]

To run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager or Mesos/YARN), which allocate resources across applications.

The cluster manager is the Spark standalone master, the YARN resource manager, or the Mesos master.

Page 11: Hadoop Spark Introduction-20150130

Architecture

[Figure: a Spark driver, a cluster manager, and three Spark workers, each worker on an HDFS data node. Steps shown: 1. launch job; 2. allocate resource; 3. assign task & return result.]

1. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.

2. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors.

3. Finally, SparkContext sends tasks for the executors to run.

[Figure labels: the SparkContext in the driver; one executor per worker, each holding the application jar and running several tasks.]

Page 12: Hadoop Spark Introduction-20150130

Architecture : Notes

Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads.

Pros: Applications are isolated from each other, on both the scheduling side (each driver schedules its own tasks) and the executor side (tasks from different applications run in different JVMs).

Cons: Data cannot be shared across different Spark applications (instances of SparkContext) without writing it to an external storage system.

Page 13: Hadoop Spark Introduction-20150130

Architecture : Notes

Spark is agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these communicate with each other, it is relatively easy to run it even on a cluster manager that also supports other applications (e.g. Mesos/YARN).

Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network. If you’d like to send requests to the cluster remotely, it’s better to open an RPC to the driver and have it submit operations from nearby than to run a driver far away from the worker nodes.

Page 15: Hadoop Spark Introduction-20150130

Application Workflow (an example)

[Figure: example data flow from Input to Output.]

Depending on the driver program, there could be many stages in an application.

Page 16: Hadoop Spark Introduction-20150130

Application Workflow (an example)

These are RDDs.

RDD (Resilient Distributed Dataset):

a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel

created by starting with a file in the Hadoop file system, or an existing Scala collection in the driver program, and transforming it

Page 17: Hadoop Spark Introduction-20150130

Application Workflow (an example)

Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations.

RDDs automatically recover from node failures.

Page 18: Hadoop Spark Introduction-20150130

RDDs support only two types of operations: Transformations and Actions

Application Workflow (an example)

Page 19: Hadoop Spark Introduction-20150130

Application Workflow (an example)

RDDs support only two types of operations: Transformations and Actions

Transformations: create a new dataset from an existing one

Actions: return a value to the driver program after running a computation on the dataset

Page 20: Hadoop Spark Introduction-20150130

Application Workflow (an example)

• Laziness: Transformations are computed only when an action requires a result to be returned to the driver program. This design lets Spark run more efficiently, for example by returning only the result of a reduce to the driver rather than the larger mapped dataset.

• You may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.
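Java's own streams behave in a loosely similar way, which makes laziness easy to try without a cluster. The sketch below is only an analogy in plain Java, not Spark code; the class name LazinessSketch and the `touched` list are made up for illustration. The `map` step plays the role of a transformation and `reduce` the role of an action.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Plain-Java analogy for lazy transformations; this is NOT Spark code.
class LazinessSketch {
    // Records every element that the "transformation" actually touches.
    static final List<String> touched = new ArrayList<>();

    static Stream<Integer> lineLengths(List<String> lines) {
        // Like a transformation: describes the work but runs nothing yet.
        return lines.stream().map(s -> {
            touched.add(s);
            return s.length();
        });
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("spark", "is", "lazy");
        Stream<Integer> lengths = lineLengths(lines);
        System.out.println("touched before action: " + touched.size()); // 0

        // Like an action: forces the pipeline to run and returns a value.
        int total = lengths.reduce(0, Integer::sum);
        System.out.println("total length: " + total);                   // 11
        System.out.println("touched after action: " + touched.size());  // 3
    }
}
```

Nothing is computed until `reduce` is called, which is the same ordering Spark follows between transformations and actions.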

Page 22: Hadoop Spark Introduction-20150130

Job Scheduling - Scheduling Across Applications

Standalone mode:

By default, applications run in FIFO (first-in-first-out) order, and each application will try to use all available resources.

You can control the amount of resources an application uses by setting the following parameters:

Parameter               Description
spark.cores.max         Maximum number of cores that an application uses
spark.executor.memory   The amount of memory an executor can use

• YARN provides a fair scheduler to arrange resources among applications; however, there is NO such fair scheduler available in Spark standalone mode.
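To see why the core cap matters under FIFO, here is a rough plain-Java sketch of first-come-first-served core allocation, with and without a limit in the spirit of spark.cores.max. It is a toy model, not Spark code: the class FifoCoresSketch and its `grant` method are invented for illustration, and the real standalone master is more involved.

```java
import java.util.Arrays;

// Toy model of FIFO core allocation across applications; NOT Spark code.
class FifoCoresSketch {
    // Grants cores to applications in arrival order.
    // coresMax <= 0 models "no cap": an application takes whatever is free.
    static int[] grant(int totalCores, int numApps, int coresMax) {
        int[] granted = new int[numApps];
        int free = totalCores;
        for (int app = 0; app < numApps; app++) {
            int wanted = coresMax > 0 ? coresMax : free;
            granted[app] = Math.min(wanted, free);
            free -= granted[app];
        }
        return granted;
    }

    public static void main(String[] args) {
        // 16 cores, 3 applications, no cap: the first application takes everything.
        System.out.println(Arrays.toString(grant(16, 3, 0))); // [16, 0, 0]
        // Same cluster with a cap of 4 cores per application.
        System.out.println(Arrays.toString(grant(16, 3, 4))); // [4, 4, 4]
    }
}
```

Without a cap, the second and third applications wait until the first one finishes; with a cap, all three run side by side.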

Page 23: Hadoop Spark Introduction-20150130

Job Scheduling - Scheduling Across Applications

Mesos:

To use static partitioning on Mesos, set the following parameters:

Parameter               Description
spark.mesos.coarse      Set to true to use static resource partitioning
spark.cores.max         Limits each application’s resource share, as in
                        standalone mode
spark.executor.memory   Controls the executor memory

Page 24: Hadoop Spark Introduction-20150130

Job Scheduling - Scheduling Across Applications

YARN:

Set the following parameters:

Parameter           Description
--num-executors     Option to the Spark YARN client; controls how many
                    executors it will allocate on the cluster
--executor-cores    Limits the number of cores an executor can use
--executor-memory   Controls the executor memory

• Note that none of the modes currently provides memory sharing across applications.

• In future releases, in-memory storage systems such as Tachyon will provide another approach to share RDDs.

Page 25: Hadoop Spark Introduction-20150130

Some advantages of using YARN

YARN allows you to dynamically share and centrally configure the same pool of cluster resources between all frameworks that run on YARN. You can throw your entire cluster at a MapReduce job, then use some of it on an Impala query and the rest on a Spark application, without any changes in configuration.

You can take advantage of all the features of YARN schedulers for categorizing, isolating, and prioritizing workloads.

Spark standalone mode requires each application to run an executor on every node in the cluster - with YARN, you choose the number of executors to use.

YARN is the only cluster manager for Spark that supports security and Kerberized clusters.

(Kerberos: a computer network authentication protocol that works on the basis of 'tickets' to allow nodes communicating over a non-secure network to prove their identity to one another in a secure manner.)

Page 26: Hadoop Spark Introduction-20150130

Job Scheduling - Within an Application

By default, Spark’s scheduler runs jobs (actions) in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases). The first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. As a result, later jobs may be delayed significantly!

Spark also provides a fair scheduler, which assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources.

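To enable the fair scheduler, set the spark.scheduler.mode property to FAIR when configuring the SparkContext:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")  // the default is FIFO
val sc = new SparkContext(conf)

Note: you need this only if your application runs more than one job at the same time (for example, jobs submitted from separate threads).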
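The difference between FIFO and round-robin ordering can be sketched in a few lines of plain Java. This is a toy model, not Spark's scheduler: the class JobOrderSketch and both method names are invented for illustration, and it assumes a single task slot so that the launch order is easy to read.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of task launch order within one application; NOT Spark's scheduler.
class JobOrderSketch {
    // FIFO: launch all tasks of job 0, then all tasks of job 1, and so on.
    static List<Integer> fifo(int[] tasksPerJob) {
        List<Integer> order = new ArrayList<>();
        for (int job = 0; job < tasksPerJob.length; job++) {
            for (int t = 0; t < tasksPerJob[job]; t++) {
                order.add(job);
            }
        }
        return order;
    }

    // Round robin: take one task from each job that still has tasks, in turn.
    static List<Integer> roundRobin(int[] tasksPerJob) {
        int[] remaining = tasksPerJob.clone();
        List<Integer> order = new ArrayList<>();
        boolean launched = true;
        while (launched) {
            launched = false;
            for (int job = 0; job < remaining.length; job++) {
                if (remaining[job] > 0) {
                    remaining[job]--;
                    order.add(job);
                    launched = true;
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        int[] tasksPerJob = {4, 2}; // job 0 is long, job 1 is short
        System.out.println("FIFO:        " + fifo(tasksPerJob));       // [0, 0, 0, 0, 1, 1]
        System.out.println("Round robin: " + roundRobin(tasksPerJob)); // [0, 1, 0, 1, 0, 0]
    }
}
```

Under FIFO the short job 1 launches nothing until job 0 has launched all of its tasks; under round robin it finishes after four launches.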

Page 27: Hadoop Spark Introduction-20150130

Job Scheduling - Scheduler Pools

The fair scheduler supports grouping jobs into pools, and setting different scheduling options (e.g. weight) for each pool.

Pools can be used to allocate resources equally among all users regardless of how many concurrent jobs they have.

Each pool supports three properties:

schedulingMode

weight

minShare

See the next page for detailed descriptions.

Page 28: Hadoop Spark Introduction-20150130

Job Scheduling - Scheduler Pools

Property         Description
schedulingMode   FIFO or FAIR.
weight           Controls the pool’s share of the cluster relative to
                 other pools. By default, all pools have a weight of 1.
                 If you give a specific pool a weight of 2, for example,
                 it will get 2x the resources of other active pools.
                 Setting a high weight such as 1000 also makes it
                 possible to implement priority between pools; in
                 essence, the weight-1000 pool will always get to launch
                 tasks first whenever it has jobs active.
minShare         Apart from an overall weight, each pool can be given a
                 minimum share (as a number of CPU cores) that the
                 administrator would like it to have. The fair scheduler
                 always attempts to meet all active pools’ minimum
                 shares before redistributing extra resources according
                 to the weights. The minShare property can therefore be
                 another way to ensure that a pool can always get up to
                 a certain number of resources (e.g. 10 cores) quickly
                 without giving it a high priority for the rest of the
                 cluster. By default, each pool’s minShare is 0.
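A small worked example may help with how minShare and weight interact. The plain-Java sketch below is an idealized calculation, not Spark's actual scheduling algorithm: it assumes all pools are active, that the cluster has at least as many cores as the sum of the minimum shares, and that cores can be split fractionally. The class PoolShareSketch and its `shares` method are invented for illustration.

```java
import java.util.Arrays;

// Idealized fair-share arithmetic for scheduler pools; NOT Spark's actual algorithm.
class PoolShareSketch {
    // Returns the cores each pool ends up with: its minShare first,
    // then a slice of the leftover cores proportional to its weight.
    static double[] shares(int totalCores, int[] minShare, int[] weight) {
        int reserved = 0;
        int totalWeight = 0;
        for (int i = 0; i < minShare.length; i++) {
            reserved += minShare[i];
            totalWeight += weight[i];
        }
        double leftover = Math.max(0, totalCores - reserved);
        double[] result = new double[minShare.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = minShare[i] + leftover * weight[i] / totalWeight;
        }
        return result;
    }

    public static void main(String[] args) {
        // Two hypothetical pools on an 11-core cluster:
        // pool A has minShare 2 and weight 1, pool B has minShare 3 and weight 2.
        double[] s = shares(11, new int[] {2, 3}, new int[] {1, 2});
        System.out.println(Arrays.toString(s)); // [4.0, 7.0]
    }
}
```

Here 5 cores go to the minimum shares and the remaining 6 are split 1:2, so pool A gets 2 + 2 = 4 cores and pool B gets 3 + 4 = 7.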

Page 29: Hadoop Spark Introduction-20150130

Job Scheduling - Scheduler Pools

The pool properties can be set by creating an XML file and setting the spark.scheduler.allocation.file property in your SparkConf:

conf.set("spark.scheduler.allocation.file", "/path/to/file")

An example of the XML file format:

<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

Page 31: Hadoop Spark Introduction-20150130

Submitting Applications

Once a user application is bundled, it can be launched using the

bin/spark-submit script.

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

See the next page for a detailed description of each argument.

This is the standard way to submit applications to Spark!

Page 32: Hadoop Spark Introduction-20150130

Submitting Applications

Argument                Description
--class                 The entry point for your application
                        (e.g. org.apache.spark.examples.SparkPi)
--master                The master URL for the cluster
                        (e.g. spark://23.195.26.187:7077)
--deploy-mode           Whether to deploy your driver on the worker nodes
                        (cluster) or locally as an external client (client)
                        (default: client)
--conf                  Arbitrary Spark configuration property in key=value
                        format. For values that contain spaces, wrap
                        "key=value" in quotes
application-jar         Path to a bundled jar including your application and
                        all dependencies. The URL must be globally visible
                        inside of your cluster, for instance, an hdfs:// path
                        or a file:// path that is present on all nodes.
application-arguments   Arguments passed to the main method of your main
                        class, if any
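The argument order can be easy to get wrong when the command is assembled by a program. The plain-Java sketch below builds the argument list in the order described above; it only prints the command and does not run it. The class SubmitCommandSketch and its `build` method are invented for illustration, and the jar path and the application argument "100" are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Assembles a spark-submit argument list in the order the template requires.
class SubmitCommandSketch {
    static List<String> build(String mainClass, String master, String deployMode,
                              String appJar, String... appArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add("./bin/spark-submit");
        cmd.addAll(Arrays.asList("--class", mainClass));
        cmd.addAll(Arrays.asList("--master", master));
        cmd.addAll(Arrays.asList("--deploy-mode", deployMode));
        cmd.add(appJar);                    // spark-submit options go before the jar
        cmd.addAll(Arrays.asList(appArgs)); // everything after the jar goes to main()
        return cmd;
    }

    public static void main(String[] args) {
        // The jar path and the argument "100" are placeholders, not real values.
        List<String> cmd = build("org.apache.spark.examples.SparkPi",
                "spark://23.195.26.187:7077", "client",
                "/path/to/examples.jar", "100");
        System.out.println(String.join(" ", cmd));
    }
}
```

A list like this could be handed to java.lang.ProcessBuilder if you want to launch spark-submit from your own program.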

Page 33: Hadoop Spark Introduction-20150130

Submitting Applications

A note on deploy mode:

Question: When to use client mode?
Answer: When you want to submit your application from a gateway machine that is physically co-located with your worker machines (e.g. the master node in a standalone EC2 cluster).

Question: When to use cluster mode?
Answer: When your application is submitted from a machine far from the worker machines (e.g. locally on your laptop).

In both cases the goal is to minimize network latency between the driver and the executors!

For more information, see

http://spark.apache.org/docs/1.2.0/submitting-applications.html

Page 35: Hadoop Spark Introduction-20150130

Programming Guide - Linking with Spark

Spark 1.2.0 works with Java 6 and higher. If you are using Java 8, Spark supports lambda expressions for concisely writing functions; otherwise you can use the classes in the org.apache.spark.api.java.function package.

To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.2.0

If you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

Page 36: Hadoop Spark Introduction-20150130

Programming Guide - Linking with Spark

I recommend using Maven for dependency management; it will save you a lot of time.

Page 37: Hadoop Spark Introduction-20150130

Programming Guide

The first thing a Spark program must do is to create a JavaSparkContext object, which tells Spark how to access a cluster.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

Next, create RDDs. This can be done by

parallelizing an existing collection in your driver program:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

or referencing a dataset in a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat:

JavaRDD<String> distFile = sc.textFile("data.txt");

The textFile method takes a URI for the file (either a local path on the machine, or an hdfs://, s3n://, etc. URI) and reads it as a collection of lines.

Page 38: Hadoop Spark Introduction-20150130

Programming Guide

Next, use RDD operations:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file.

The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness.

“reduce” is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.
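The "local reduction" idea can be imitated in plain Java without Spark. In the sketch below, each inner list stands in for the lines held by one machine; the class LocalReduceSketch and its method names are invented for illustration, and everything runs in a single JVM.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy of map + reduce over partitions; NOT Spark code.
class LocalReduceSketch {
    // Each "machine" maps its own lines to lengths and reduces them locally.
    static int partialSum(List<String> partition) {
        int sum = 0;
        for (String line : partition) {
            sum += line.length();
        }
        return sum;
    }

    // The "driver" only has to combine one number per partition.
    static int totalLength(List<List<String>> partitions) {
        int total = 0;
        for (List<String> partition : partitions) {
            total += partialSum(partition);
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("ab", "cde"),  // partition 0 -> 5
                Arrays.asList("f"),          // partition 1 -> 1
                Arrays.asList("ghij"));      // partition 2 -> 4
        System.out.println(totalLength(partitions)); // 10
    }
}
```

Only three integers reach the combining step, regardless of how many lines each partition holds; that is the point of reducing locally before returning to the driver.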

Page 39: Hadoop Spark Introduction-20150130

Programming Guide

If we also want to use lineLengths again later, we could add

lineLengths.persist();

before calling “reduce”, which would cause lineLengths to be saved in memory after the first time it is computed.
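The effect of persisting can be pictured with a memoized value in plain Java. This is only an analogy, not Spark code: the class PersistSketch, the `persisted` wrapper, and the `computations` counter are invented for illustration.

```java
import java.util.function.Supplier;

// Plain-Java analogy for persist(): compute once, then reuse; NOT Spark code.
class PersistSketch {
    static int computations = 0;

    // Stands in for an expensive, lazily evaluated dataset.
    static int[] computeLengths() {
        computations++;
        return new int[] {5, 2, 4};
    }

    // Wraps a supplier so that its result is kept after the first call.
    static <T> Supplier<T> persisted(Supplier<T> source) {
        return new Supplier<T>() {
            private T cached;
            private boolean done;

            @Override
            public T get() {
                if (!done) {
                    cached = source.get();
                    done = true;
                }
                return cached;
            }
        };
    }

    public static void main(String[] args) {
        Supplier<int[]> lengths = persisted(PersistSketch::computeLengths);
        int sum = 0;
        int max = 0;
        for (int n : lengths.get()) sum += n;               // first use computes
        for (int n : lengths.get()) max = Math.max(max, n); // second use reuses
        System.out.println(sum + " " + max + " computations=" + computations);
    }
}
```

Without the wrapper, each use would recompute the data, which is what happens to an RDD that is used by two actions and never persisted.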

For the full programming guide, see

http://spark.apache.org/docs/1.2.0/programming-guide.html

Page 40: Hadoop Spark Introduction-20150130

Issues I encountered in using Spark

When there are more than (approx.) 120 input sequenceFiles, the Spark task may hang (forever, or for several minutes before it fails). This is a bug; the workaround is to use “coalesce”, which circumvents the bug and, in addition, improves the computation performance.

If you want to run Spark on YARN, your native libs must be placed in the right place; see the discussion here:

http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-spark-job-on-yarn-with-jni-lib-td15146.html

On YARN, I find no way to retrieve job status.

In standalone mode, I grab job status by parsing the HTML of the web UI. This is not good, but I have found no other way.

Page 41: Hadoop Spark Introduction-20150130

Issues I encountered in using Spark

Stability and stress tests have not been conducted yet.

I don’t know what will happen if we leave Spark running for 2 weeks, 3 weeks, or even longer.

I don’t know what will happen if we run a task with data >= 100G, or run 10 consecutive tasks with each handling 10G of data.

HA: you may run many Spark masters (one active and the others standby) by using ZooKeeper for failover, but how do you get job status if the active master goes down?

Page 42: Hadoop Spark Introduction-20150130

Thank you!
