Effective Spark on Multi-Tenant Clusters


Uploaded by: DataWorks Summit/Hadoop Summit

Posted on: 16-Apr-2017


© Cloudera, Inc. All rights reserved.

Effective Spark on Multi-Tenant Clusters
Kostas Sakellis

Me

• Spark Tech Lead Manager at Cloudera
• Contributed to Apache Spark
• Previously, stint on Cloudera Manager

Challenges

• Predictable execution time of Spark jobs
• Prevent starvation
• Optimal cluster utilization
• Secure data access
• Configuration management

Spark on YARN

Why YARN?

• Spark supports pluggable cluster managers
  • local, Standalone, YARN, and Mesos
• YARN contains a proper resource manager
  • Enables multi-platform jobs
• Spark on YARN is mature, with an active community

Running an application

spark-submit --master yarn-cluster \
  --executor-memory 2g \
  --num-executors 3 \
  --executor-cores 2 \
  --class <your-class> <your-jar>

System Architecture

[Diagram: a YARN cluster with a Resource Manager and Node Managers on host-a, Host-b, and Host-c.mydomain.com. The Node Managers host containers running the App Master, Drivers, and executors (Exec1, Exec2, Exec3).]

Gotchas

• Ensure compatible YARN configuration
  • yarn.nodemanager.resource.[memory-mb|cpu-vcores]
  • yarn.scheduler.maximum-allocation-[vcores|mb]
  • ...
• Remember overhead memory
  • spark.yarn.executor.memoryOverhead
  • Default of 10% of executor memory (minimum 384 MB) since Spark 1.4
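As a rough illustration of the overhead rule, the sketch below estimates the YARN container size a single executor would request and checks it against a scheduler limit. It assumes the default overhead of max(384 MB, 10% of executor memory) and ignores YARN rounding the request up to its allocation increment. The class and method names are hypothetical, and the limit you pass in stands for whatever yarn.scheduler.maximum-allocation-mb is set to on your cluster.

```java
/** Hypothetical helper: estimates the YARN container one executor will request. */
final class ExecutorContainerSizing {
    // Assumed defaults for spark.yarn.executor.memoryOverhead (Spark 1.4+).
    static final long MIN_OVERHEAD_MB = 384;
    static final double OVERHEAD_FRACTION = 0.10;

    /** Overhead added on top of --executor-memory, in MB. */
    static long overheadMb(long executorMemoryMb) {
        return Math.max(MIN_OVERHEAD_MB, (long) (executorMemoryMb * OVERHEAD_FRACTION));
    }

    /** Executor heap plus overhead, before any rounding by YARN. */
    static long containerMb(long executorMemoryMb) {
        return executorMemoryMb + overheadMb(executorMemoryMb);
    }

    /** True if the request stays within yarn.scheduler.maximum-allocation-mb. */
    static boolean fits(long executorMemoryMb, long maxAllocationMb) {
        return containerMb(executorMemoryMb) <= maxAllocationMb;
    }

    public static void main(String[] args) {
        long executorMemoryMb = 2048; // corresponds to --executor-memory 2g
        System.out.println("container MB: " + containerMb(executorMemoryMb));
    }
}
```

With --executor-memory 2g the estimate is 2048 + 384 = 2432 MB, so a scheduler maximum of exactly 2048 MB would reject the request even though the heap itself fits.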

Otherwise…

Container [pid=63375,containerID=container_1388158490598_0001_01_000003] is running beyond physical memory limits. Current usage: 2.1 GB of 2 GB physical memory used; 2.8 GB of 4.2 GB virtual memory used. Killing container. [...]

System Architecture

[Diagram: the same YARN cluster (Resource Manager, Node Managers on host-a, Host-b, and Host-c.mydomain.com), now running additional Driver and executor (Exec1, Exec2, Exec3) containers on the Node Managers.]

How do we share a common resource?

Courtesy of: https://radioglobalistic.files.wordpress.com/2011/02/lagos-traffic.jpg

Resource Management

• YARN can create resource queues
  • Priorities can be set per queue
• Preemption is also available
  • Fixed in Spark 1.6 (SPARK-8167)
  • yarn.scheduler.fair.preemption

Running an application

spark-submit --master yarn-cluster \
  --queue my-special-queue \
  --executor-memory 2g \
  --num-executors 3 \
  --executor-cores 2 \
  --class <your-class> <your-jar>

How about locality?

Courtesy of: https://radioglobalistic.files.wordpress.com/2011/02/lagos-traffic.jpg
Courtesy of: https://blog.voxbone.com/wp-content/uploads/2015/07/think-global-act-local.jpg

Task Scheduling

[Diagram: in the Driver, the Spark Context submits jobs to the DAG Scheduler, which breaks them into stages; the Task Scheduler dispatches tasks to cores on the Executors, which also handle shuffle data.]

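Locality

[Diagram: a Resource Manager and Node Managers on host-a, Host-b, and Host-c.mydomain.com, each Node Manager host also storing HDFS blocks. Blocks of hdfs://x and hdfs://y are spread over the three hosts as (x:B1, x:B2, y:B1, y:B3), (x:B3, x:B2, y:B2, y:B3), and (x:B3, x:B1, y:B1, y:B2). The Driver and executors Exec1 and Exec2 run on these hosts.]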

Spark creates executors before executing code!

Underutilized Clusters

Courtesy of: http://media.nbclosangeles.com/images/1200*675/60-freeway-repair-dec16-2-empty.JPG

Dynamic Allocation

• Spark applications scale the number of executors based on load
  • Removes the need for --num-executors
  • Idle executors get killed
• First supported in CDH 5.4
• Ideal for:
  • Long ETL jobs with large shuffles
  • Shell applications: Hive and Spark shell
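To make "scale the number of executors based on load" concrete, here is a simplified sketch of how a target executor count could be derived from the task backlog. It is an illustration under stated assumptions, not Spark's actual implementation: it assumes one core per task, ignores the gradual ramp-up and the idle timeouts, and the class and method names are hypothetical.

```java
/** Hypothetical sketch: derives a target executor count from the task backlog. */
final class ExecutorTarget {
    /**
     * @param pendingAndRunningTasks tasks that currently need a core
     * @param coresPerExecutor       value of --executor-cores (assumes 1 core per task)
     * @param minExecutors           lower bound (spark.dynamicAllocation.minExecutors)
     * @param maxExecutors           upper bound (spark.dynamicAllocation.maxExecutors)
     */
    static int target(int pendingAndRunningTasks, int coresPerExecutor,
                      int minExecutors, int maxExecutors) {
        // Ceiling division: enough executors to give every task a core.
        int needed = (pendingAndRunningTasks + coresPerExecutor - 1) / coresPerExecutor;
        // Clamp to the configured bounds.
        return Math.max(minExecutors, Math.min(maxExecutors, needed));
    }

    public static void main(String[] args) {
        System.out.println(target(10, 2, 0, 100));
    }
}
```

An empty backlog drives the target down to the minimum, which is why idle executors are eventually released.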

Task Scheduling

[Diagram: the Driver (Spark Context, DAG Scheduler, Task Scheduler) turns jobs into stages and dispatches tasks to executors Exec1, Exec2, and Exec3, which run under the Node Managers on host-a, host-b, and host-c.mydomain.com; the Resource Manager (RM) is shown alongside.]

Dynamic Allocation Configuration

• Many knobs
  • spark.dynamicAllocation.enabled
  • spark.dynamicAllocation.[min|max|initial]Executors
  • spark.dynamicAllocation.executorIdleTimeout
  • spark.dynamicAllocation.cachedExecutorIdleTimeout
  • ...
• --num-executors will disable dynamic allocation
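The knobs above are ordinary Spark properties, so they can be passed as --conf flags. The sketch below assembles a plausible set and renders it as spark-submit arguments. The helper class is hypothetical and the bounds 1 and 20 are illustrative values only; the sketch also sets spark.shuffle.service.enabled, since dynamic allocation on YARN relies on the external shuffle service, and it deliberately leaves out --num-executors.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: renders dynamic allocation settings as spark-submit flags. */
final class DynamicAllocationConf {
    static Map<String, String> settings(int minExecutors, int maxExecutors) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.dynamicAllocation.enabled", "true");
        // Dynamic allocation on YARN relies on the external shuffle service.
        conf.put("spark.shuffle.service.enabled", "true");
        conf.put("spark.dynamicAllocation.minExecutors", Integer.toString(minExecutors));
        conf.put("spark.dynamicAllocation.maxExecutors", Integer.toString(maxExecutors));
        // 60s matches the documented default; shown only to make the knob visible.
        conf.put("spark.dynamicAllocation.executorIdleTimeout", "60s");
        return conf;
    }

    /** Joins the settings into "--conf key=value" pairs, in insertion order. */
    static String toSubmitArgs(Map<String, String> conf) {
        StringJoiner args = new StringJoiner(" ");
        for (Map.Entry<String, String> e : conf.entrySet()) {
            args.add("--conf").add(e.getKey() + "=" + e.getValue());
        }
        return args.toString();
    }

    public static void main(String[] args) {
        System.out.println("spark-submit " + toSubmitArgs(settings(1, 20)));
    }
}
```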

Dynamic Allocation Limitations

• Still required to specify cores
  • --executor-cores
• Memory
  • --executor-memory
  • Includes JVM overhead
• Caching
  • spark.dynamicAllocation.cachedExecutorIdleTimeout

The Future of Dynamic Allocation

• Only "task size" needed: --task-size
• Eliminates
  • --executor-cores
  • --num-executors
  • --executor-memory
• Leads to better cluster utilization

Dynamic Allocation respects Locality!

Security, oh no!

Courtesy of: https://www.iti.illinois.edu/sites/default/files/Cybersecurity_image.jpg

Security

• Shared resources -> shared data
• Security has many facets
  • Encryption
  • Authentication
  • Authorization
• Encryption is interesting for multi-tenant clusters

Encryption

Who’s looking at the data?

Data Flow in Spark

[Diagram: the data channels between Spark Submit, the Driver, and the Executors: control plane, file distribution, shuffle blocks, the UI, and spilled/shuffle blocks written to disk.]

Prior to Spark 1.6

• Different channel, different method
  • Control plane: SSL
  • File distribution: SSL
  • Shuffle blocks: SASL encryption
  • User UI / REST API: no encryption
  • Spilled/shuffle blocks: use eCryptfs (or equivalent)

What is wrong with SSL?

Why not SSL?

• SSL can be hard to set up
  • Need certificates readable on every node
  • Sharing certificates is not as secure
  • Hard to have per-user certificates

Spark 1.6

• Standardize around a common transport library
  • Replaces Akka RPC (SPARK-6028)
  • Replaces HTTP file service (SPARK-11140)
  • Uses Netty transport library with SASL encryption
• But...
  • Web UI still has no encryption
  • Shuffle / spilled blocks still require FS-level encryption
  • SASL in the JVM is restricted to 3DES: not very strong, and slow

Spark 2.0

• REPL class distribution using transport lib (SPARK-11563)
• HTTPS support for Web UI (SPARK-2750)
• Encrypting spilled blocks is almost available (SPARK-5682)
  • Depends on the third-party Chimera library for encryption
  • Work is being done to add Chimera to Apache Commons
• Future:
  • Use Chimera to encrypt over-the-wire data

Gateways: launching Spark Application

Courtesy of: http://www.gottardo2016.ch/sites/default/files/styles/hero/public/parallax_story_8_tunnelsystem.jpg?itok=p2Mtg5be

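Spark Gateway

[Diagram: the client (Bob) connects over SSH to gateway-a.mydomain.com, which holds the client configs and the Spark install. The gateway talks over random ports to a cluster with a Resource Manager and Node Managers (Host-b and Host-c.mydomain.com), where Drivers and executors (Exec1, Exec2) run.]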

Gateway Considerations

• Gateway hosts are actively managed by administrators
  • Updates to client configurations and Spark installs
• Users need to tunnel into the network
  • Difficult to put users behind a firewall
• YARN allows different Spark versions
  • spark.yarn.jar or spark.yarn.archive
  • Shared Spark services make this difficult

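Shared Services

[Diagram: the same gateway setup (Bob connecting over SSH to gateway-a.mydomain.com with client configs and a Spark install, random ports into the cluster, Drivers and executors on the Node Managers of Host-b and Host-c.mydomain.com), with shared services added: components labeled "SS" on the Node Manager hosts and a History Service.]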

Alternative

Livy: an open-source, Apache-licensed REST web service that manages long-running Spark contexts in your cluster

Livy Architecture

[Diagram: a client talks over HTTP to the REST server, which works through the cluster manager of the managed cluster to run Spark contexts (Context 1, Context 2), each with its own Driver and Executors.]

Case 1: Spark Application JAR Submission

• Enables Spark applications to be submitted without needing a Spark installation
  • Basically a wrapper around spark-submit

% curl -XPOST localhost:8998/batches -H 'Content-Type: application/json' -d '{ "file": "<path_to_file>", "className": "com.foo.bar..", ... }'
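The same batch submission can be made from code instead of curl. The sketch below only builds the HTTP request with the JDK's java.net.http API (Java 11 or later) and does not send it. The helper class, the jar path, the class name, and the queue name in main are placeholders, and the JSON is assembled by hand on the assumption that none of the values need escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper: builds (but does not send) a Livy batch submission. */
final class LivyBatchRequest {
    /** Minimal JSON body; assumes the values need no JSON escaping. */
    static String body(String file, String className, String queue) {
        return String.format(
            "{\"file\": \"%s\", \"className\": \"%s\", \"queue\": \"%s\"}",
            file, className, queue);
    }

    /** POST <livyUrl>/batches with a JSON payload. */
    static HttpRequest build(String livyUrl, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(livyUrl + "/batches"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
            .build();
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own jar, class, and queue.
        HttpRequest request = build(
            "http://localhost:8998",
            body("hdfs:///apps/example.jar", "com.example.Main", "my-special-queue"));
        System.out.println(request.method() + " " + request.uri());
        // To send it: HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    }
}
```

Because the client only needs HTTP access to the REST server, no Spark installation or client configuration is required on the submitting machine.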

How do you retrieve results?

Case 2: Fine grained Job submission

• Programmatic submission of Spark jobs to a long-running application
  • A thin Java (and Scala) client is available for easier integration
  • Provides automatic serialization/deserialization
• Enables web/mobile applications to use Spark as a backend

Case 2: Example

// Create Livy client
LivyClient client = new LivyClientBuilder(false)
    .setURI(new URI("<uri>"))
    .setAll(<config>)
    .build();

// JobHandle allows monitoring of jobs
JobHandle<Long> handle = client.submit(new YourJob());

// Block until results are returned
Long result = handle.get(TIMEOUT, TimeUnit.SECONDS);

// Close connections
client.stop();

Case 2: Example

private static class YourJob implements Job<Long> {
    @Override
    public Long call(JobContext jc) {
        // Distribute a small list and count its elements on the cluster
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> rdd = jc.sc().parallelize(list);
        return rdd.count();
    }
}

// Job interface to implement
public interface Job<T> extends Serializable {
    T call(JobContext jc) throws Exception;
}

Contributions Welcome!

• http://livy.io/
• Code: https://github.com/cloudera/livy
• JIRA: https://issues.cloudera.org/browse/LIVY
• Users: http://groups.google.com/a/cloudera.org/group/livy-user
• Dev: http://groups.google.com/a/cloudera.org/group/livy-dev

Thank you