scaling python to cpus and gpus


Page 1: Scaling Python to CPUs and GPUs

Scaling Python to GPUs and CPUs

Stanford Stats 285

October 30, 2017

Travis E. Oliphant

President, Chief Data Scientist, Co-founder Anaconda, Inc.


Page 2: Scaling Python to CPUs and GPUs

My Background

• MS/BS degrees in Elec. Comp. Engineering

• PhD from Mayo Clinic in Biomedical Engineering (Ultrasound and MRI)

• Creator and Developer of SciPy (1998-2009)

• Professor at BYU (2001-2007): Inverse Problems

• Creator and Developer of NumPy (2005-2012)

• Started Numba (2012)

• Founder of NumFOCUS / PyData

• Python Software Foundation Director (2012)

• Co-founder of Continuum Analytics => Anaconda, Inc.

• CEO (2012) => Chief Data Scientist (2017)

Page 3: Scaling Python to CPUs and GPUs

Empower domain experts with high-level tools that exploit modern hardware

Array Oriented Computing

Page 4: Scaling Python to CPUs and GPUs

Why Array-oriented computing

• Express domain knowledge directly in arrays (tensors, matrices, vectors): easier to teach programming in the domain

• Can take advantage of parallelism and accelerators

• Array expressions

[Figure: object-oriented layout, where each Object stores its own Attr1, Attr2, Attr3, versus array-oriented layout, where Attr1, Attr2, and Attr3 are each stored as one array across Object1 through Object6]
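The figure contrasts two memory layouts. A minimal sketch in Python/NumPy, with made-up attribute names and values: the object-oriented layout is a list of per-object records, while the array-oriented layout keeps one NumPy array per attribute, so an expression over a whole attribute runs as a single array operation instead of an interpreted loop.

```python
import numpy as np

# Object-oriented layout: one record per object, attributes stored together.
objects = [
    {"attr1": float(i), "attr2": 2.0 * i, "attr3": i % 3}
    for i in range(6)
]

# Array-oriented layout: one contiguous array per attribute.
attr1 = np.array([obj["attr1"] for obj in objects])
attr2 = np.array([obj["attr2"] for obj in objects])
attr3 = np.array([obj["attr3"] for obj in objects])

# Object-at-a-time: an interpreted loop touches each record.
total_loop = sum(obj["attr1"] * obj["attr2"] for obj in objects)

# Array expressions: whole columns at once, looping in compiled code.
total_array = float(np.dot(attr1, attr2))
selected = attr1[attr3 == 0]          # attr1 of the objects whose attr3 is 0

print(total_loop, total_array)        # 110.0 110.0
print(selected.tolist())              # [0.0, 3.0]
```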

Page 5: Scaling Python to CPUs and GPUs

• Today’s vector machines (and vector co-processors, or GPUs) were made for array-oriented computing.

• The software stack has just not caught up, which is unfortunate because APL came out in 1963.

• There is a reason Fortran remains popular.

More reasons for array-oriented computing

Page 6: Scaling Python to CPUs and GPUs

Python, and in particular PyData, is Growing

Page 7: Scaling Python to CPUs and GPUs


Python’s Scientific Stack

Page 8: Scaling Python to CPUs and GPUs


Python’s Scientific Stack

Page 9: Scaling Python to CPUs and GPUs


Bokeh

Python’s Scientific Stack

Page 10: Scaling Python to CPUs and GPUs


Bokeh

Python’s Scientific Stack

Page 11: Scaling Python to CPUs and GPUs

Python’s Scientific Ecosystem (and many, many more)

Bokeh

Page 12: Scaling Python to CPUs and GPUs


Page 13: Scaling Python to CPUs and GPUs

Bringing Technology Together

Page 14: Scaling Python to CPUs and GPUs

Data Science Workflow

[Figure: workflow diagram with the labels Getting Data, New Data, Understand Data, Notebooks, Exploratory Data Analysis and Viz, Models, Understand World, Data Products, Reports, Microservices, Dashboards, Applications, Decisions and Actions]

Page 15: Scaling Python to CPUs and GPUs


Page 16: Scaling Python to CPUs and GPUs

Machine Learning Explosion


Scikit-Learn

Tensorflow

Keras

XGBoost

theano

lasagne

caffe/caffe2

torch

mxnet / minpy

neon

CNTK

DAAL

Chainer

Dynet

Apache Singa

Shogun

https://github.com/josephmisiti/awesome-machine-learning#python-general-purpose

http://deeplearning.net/software_links/

http://scikit-learn.org/stable/related_projects.html

Page 17: Scaling Python to CPUs and GPUs


A Representation of Packages for ML

© 2017 Anaconda, Inc. - Confidential & Proprietary

Page 18: Scaling Python to CPUs and GPUs

NumPy and Packages that Depend on It

Page 19: Scaling Python to CPUs and GPUs

pandas Depends on NumPy (and other packages depend on pandas)

Page 20: Scaling Python to CPUs and GPUs

Caffe Depends on pandas and NumPy

Page 21: Scaling Python to CPUs and GPUs

Embrace Innovation Without Anarchy


From http://www.slideshare.net/RevolutionAnalytics/r-at-microsoft

Reproducibility

Page 22: Scaling Python to CPUs and GPUs

• Conda: a cross-platform and language-agnostic package and environment manager

• Conda Forge: a community-led collection of recipes, build infrastructure, and packages for conda

• Conda Environments: custom isolated software sandboxes that allow easy reproducibility and sharing of data-science work

• Anaconda Project: reproducible, executable project directories

Page 23: Scaling Python to CPUs and GPUs

“conda – package everything”

• Language independent

• Platform independent

• No special privileges required

• No VMs or containers

• Enables:
  – Reproducibility
  – Collaboration
  – Scaling

[Figure: Conda Sandboxing Technology. Environments A, B, and C are managed side by side by conda, holding different combinations of Python v2.7 / v3.4, NumPy v1.10 / v1.11, Pandas v0.16 / v0.18, Jupyter, and R with R Essentials]

Page 24: Scaling Python to CPUs and GPUs

$ anaconda-project run plot --show

conda install tensorflow

Page 25: Scaling Python to CPUs and GPUs

Basic Conda Usage

Install a package: conda install sympy

List all installed packages: conda list

Search for packages: conda search llvm

Create a new environment: conda create -n py3k python=3

Remove a package: conda remove nose

Get help: conda install --help

Page 26: Scaling Python to CPUs and GPUs

Advanced Conda Usage

Install a package in an environment: conda install -n py3k sympy

Update all packages: conda update --all

Export list of packages: conda list --export > packages.txt

Install packages from an export: conda install --file packages.txt

See package history: conda list --revisions

Revert to a revision: conda install --revision 23

Remove unused packages and cached tarballs: conda clean -pt

Page 27: Scaling Python to CPUs and GPUs


Development Deployment

Conda eases rapid deployment

Page 28: Scaling Python to CPUs and GPUs

NumPy


Page 29: Scaling Python to CPUs and GPUs

Without NumPy

functions.py

from math import sin, pi

def sinc(x):
    if x == 0:
        return 1.0
    else:
        pix = pi*x
        return sin(pix)/pix

def step(x):
    if x > 0:
        return 1.0
    elif x < 0:
        return 0.0
    else:
        return 0.5

>>> import functions as f
>>> xval = [x/3.0 for x in range(-10,10)]
>>> yval1 = [f.sinc(x) for x in xval]
>>> yval2 = [f.step(x) for x in xval]

Python is a great language but needed a way to operate quickly and cleanly over multi-dimensional arrays.

Page 30: Scaling Python to CPUs and GPUs

With NumPy

functions2.py

from numpy import sin, pi
from numpy import vectorize
import functions as f

vsinc = vectorize(f.sinc)

def sinc(x):
    pix = pi*x
    val = sin(pix)/pix    # 0/0 gives nan (with a RuntimeWarning) where x == 0
    val[x==0] = 1.0       # ...so those entries are patched afterwards
    return val

vstep = vectorize(f.step)

def step(x):
    y = x*0.0
    y[x>0] = 1
    y[x==0] = 0.5
    return y

>>> import functions2 as f
>>> from numpy import *
>>> x = r_[-10:10]/3.0
>>> y1 = f.sinc(x)
>>> y2 = f.step(x)

Offers N-D array, element-by-element functions, and basic random numbers, linear algebra, and FFT capability for Python

http://numpy.org

Fiscally sponsored by NumFOCUS
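NumPy also ships ready-made ufuncs for both functions: np.sinc computes the normalized sinc sin(πx)/(πx) with the x == 0 case already handled, and np.heaviside (NumPy 1.13 and later) takes the value to use at zero as its second argument. A quick check against the slide's definitions, copied here as standalone functions rather than imported from functions2.py:

```python
import numpy as np

def sinc(x):
    # The slide's version; the 0/0 at x == 0 is silenced, then patched.
    with np.errstate(invalid="ignore", divide="ignore"):
        pix = np.pi * x
        val = np.sin(pix) / pix
    val[x == 0] = 1.0
    return val

def step(x):
    y = x * 0.0
    y[x > 0] = 1
    y[x == 0] = 0.5
    return y

x = np.r_[-10:10] / 3.0

print(np.allclose(sinc(x), np.sinc(x)))                 # True
print(np.array_equal(step(x), np.heaviside(x, 0.5)))    # True
```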

Page 31: Scaling Python to CPUs and GPUs

NumPy: an Array Extension of Python

• Data: the array object
  – slicing and shaping
  – data-type map to bytes

• Fast Math (ufuncs):
  – vectorization
  – broadcasting
  – aggregations
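A short sketch of the three ufunc capabilities listed above, using an arbitrary 3×4 example array:

```python
import numpy as np

a = np.arange(12, dtype=np.float64).reshape(3, 4)   # shape (3, 4)

# Vectorization: one ufunc call applies element-wise, no Python loop.
b = np.sqrt(a)

# Broadcasting: a (4,) row is stretched across all 3 rows of a (3, 4) array.
col_means = a.mean(axis=0)        # shape (4,)
centered = a - col_means          # (3, 4) - (4,) -> (3, 4)

# Aggregations: reduce along an axis or over the whole array.
row_sums = a.sum(axis=1)          # shape (3,)
grand_total = a.sum()

print(centered.tolist())          # [[-4.0, -4.0, -4.0, -4.0], [0.0, 0.0, 0.0, 0.0], [4.0, 4.0, 4.0, 4.0]]
print(row_sums.tolist(), grand_total)   # [6.0, 22.0, 38.0] 66.0
```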

Page 32: Scaling Python to CPUs and GPUs

NumPy Array

Key Attributes
• dtype
• shape
• ndim
• strides
• data
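These attributes can be inspected directly. A small sketch, assuming a C-ordered float64 array (8 bytes per element), which is why stepping to the next row of a 3×4 array costs 4 × 8 = 32 bytes:

```python
import numpy as np

x = np.zeros((3, 4), dtype=np.float64)

print(x.dtype)     # float64
print(x.shape)     # (3, 4)
print(x.ndim)      # 2
print(x.strides)   # (32, 8): bytes to the next row / next column
print(type(x.data).__name__)   # memoryview over the underlying buffer

# A transpose is a new view: same buffer, swapped shape and strides.
t = x.T
print(t.shape, t.strides)        # (4, 3) (8, 32)
print(np.shares_memory(x, t))    # True
```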

Page 33: Scaling Python to CPUs and GPUs

NumPy Examples

[Slide image: examples on a 2d array and a 3d array; printed results include [439 472 477], [217 205 261 222 245 238], and 9.98330639789 2.96677717122]

Page 34: Scaling Python to CPUs and GPUs

NumPy Slicing (Selection)

>>> a[0,3:5]
array([3, 4])

>>> a[4:,4:]
array([[44, 45],
       [54, 55]])

>>> a[:,2]
array([ 2, 12, 22, 32, 42, 52])

>>> a[2::2,::2]
array([[20, 22, 24],
       [40, 42, 44]])
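The slide does not show how `a` was built; its outputs are consistent with a 6×6 array in which a[i, j] == 10*i + j. A minimal sketch, assuming that construction, reproduces every selection and shows that basic slices are views into the same memory:

```python
import numpy as np

# Assumed construction: row index in the tens digit, column index in the ones.
a = np.arange(6)[:, np.newaxis] * 10 + np.arange(6)

print(a[0, 3:5].tolist())      # [3, 4]
print(a[4:, 4:].tolist())      # [[44, 45], [54, 55]]
print(a[:, 2].tolist())        # [2, 12, 22, 32, 42, 52]
print(a[2::2, ::2].tolist())   # [[20, 22, 24], [40, 42, 44]]

# Basic slices are views: writing through one changes a.
view = a[4:, 4:]
view[0, 0] = -1
print(a[4, 4])                 # -1
```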

Page 35: Scaling Python to CPUs and GPUs

Summary

• Provides a foundational N-dimensional array composed of homogeneous elements of a particular “dtype”

• The set of element dtypes is extensive (but difficult to extend)

• Arrays can be sliced and diced with simple syntax for easy manipulation and selection.

• Provides fast and powerful math, statistics, and linear algebra functions that operate over arrays.

• Utilities for sorting, reading, and writing data are also provided.

Page 36: Scaling Python to CPUs and GPUs

Scaling Up and Out with Numba and Dask


Page 37: Scaling Python to CPUs and GPUs

Scale Up vs Scale Out

[Figure: Scale Up (Bigger Nodes) on one axis and Scale Out (More Nodes) on the other. Big Memory & Many Cores / GPU Box: Numba. Many commodity nodes in a cluster: Dask. Best of Both (e.g. GPU Cluster): Dask with Numba.]

Page 38: Scaling Python to CPUs and GPUs

Development

Name     Latest Release   Number of Releases   GitHub Stars   Contributors   Downloads (trailing 12 months)
numba    0.35             92                   2647           74             1.8m
dask     0.15.4           38                   2001           112            1.2m
dask-ml  0.3.1            6                    104            4

Numba: http://numba.pydata.org  http://github.com/numba

Dask: http://dask.pydata.org  http://github.com/dask

Dask-ml: http://dask-ml.readthedocs.io/en/latest/index.html  http://github.com/dask/dask-ml

Page 39: Scaling Python to CPUs and GPUs

Numba


Page 40: Scaling Python to CPUs and GPUs

Numba (compile Python to CPUs and GPUs)

conda install numba

[Figure: Python → Numba (Parsing Frontend) → Intermediate Representation (IR) → LLVM (Code Generation Backend) → x86, ARM, PTX]

Page 41: Scaling Python to CPUs and GPUs

Image Processing

from numba import jit

@jit('void(f8[:,:],f8[:,:],f8[:,:])')
def filter(image, filt, output):
    M, N = image.shape
    m, n = filt.shape
    for i in range(m//2, M-m//2):
        for j in range(n//2, N-n//2):
            result = 0.0
            for k in range(m):
                for l in range(n):
                    result += image[i+k-m//2, j+l-n//2]*filt[k, l]
            output[i, j] = result

~1500x speed-up
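A usage sketch for the filter on this slide, with assumptions labeled in the code: Numba's `njit` is used instead of the explicit signature (so it compiles for whatever float64 arrays it receives), the import falls back to plain (slow) Python when Numba is not installed, and the function is renamed `filter2d` to avoid shadowing Python's built-in `filter`. The NumPy slice-sum reference is an addition for checking, not from the slide.

```python
import numpy as np

try:
    from numba import njit
except ImportError:
    # Assumption: without Numba, run the same code as ordinary (slow) Python.
    def njit(func):
        return func

@njit
def filter2d(image, filt, output):
    M, N = image.shape
    m, n = filt.shape
    for i in range(m // 2, M - m // 2):
        for j in range(n // 2, N - n // 2):
            result = 0.0
            for k in range(m):
                for l in range(n):
                    result += image[i + k - m // 2, j + l - n // 2] * filt[k, l]
            output[i, j] = result

def filter2d_numpy(image, filt):
    # Same correlation as a sum of shifted slices; border stays zero.
    M, N = image.shape
    m, n = filt.shape
    out = np.zeros_like(image)
    acc = np.zeros((M - 2 * (m // 2), N - 2 * (n // 2)))
    for k in range(m):
        for l in range(n):
            acc += filt[k, l] * image[k:k + acc.shape[0], l:l + acc.shape[1]]
    out[m // 2:M - m // 2, n // 2:N - n // 2] = acc
    return out

rng = np.random.default_rng(0)
image = rng.random((64, 64))
filt = np.ones((3, 3)) / 9.0          # 3x3 box blur
output = np.zeros_like(image)
filter2d(image, filt, output)
print(np.allclose(output, filter2d_numpy(image, filt)))   # True
```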

Page 42: Scaling Python to CPUs and GPUs

Numba Features

Works with, and does not replace, the standard Python interpreter (all of your existing Python libraries are still available)

Page 43: Scaling Python to CPUs and GPUs

Example: Filter an array


Page 44: Scaling Python to CPUs and GPUs

Example: Filter an array

[Slide image: annotated code with callouts: Numba decorator (nopython=True not required); Array Allocation; Looping over ndarray x as an iterator; Using numpy math functions; Returning a slice of the array. Result: 2.7x Speedup over NumPy!]

Page 45: Scaling Python to CPUs and GPUs

NumPy UFuncs and GUFuncs

NumPy ufuncs (and gufuncs) are functions that operate “element-wise” (or “sub-dimension-wise”) across an array without an explicit loop.

This implicit loop (which is in machine code) is at the core of why NumPy is fast. Dispatch is done internally to a particular code segment based on the type of the array. It is a very powerful abstraction in the PyData stack.

Making new fast ufuncs used to be possible only in C, which was painful!

With numba.vectorize and numba.guvectorize it is now easy!

The inner secrets of NumPy are now at your fingertips, so you can make your own magic!
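The ufunc machinery described above is visible from Python. A small sketch using NumPy's built-in `np.add` (no Numba involved): every ufunc is an `np.ufunc` instance, its `.types` attribute lists the type-specific inner loops that dispatch chooses between, and the same object provides `reduce`, `accumulate`, and `outer`:

```python
import numpy as np

print(isinstance(np.add, np.ufunc))   # True
print(np.add.nin, np.add.nout)        # 2 1

# The type-specific inner loops NumPy dispatches to ('dd->d' is float64).
print("dd->d" in np.add.types)        # True

x = np.arange(1, 6)                   # [1 2 3 4 5]
print(np.add.reduce(x))               # 15
print(np.add.accumulate(x).tolist())  # [1, 3, 6, 10, 15]
print(np.multiply.outer([1, 2], [10, 20, 30]).tolist())  # [[10, 20, 30], [20, 40, 60]]
```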

Page 46: Scaling Python to CPUs and GPUs

Simple Ufunc

import numpy as np
from numba import vectorize

@vectorize
def dot2(a, b, x, y):
    return a*x + b*y

>>> a, b, x, y = np.random.randn(4, 1000)
>>> z = a * x + b * y
>>> z2 = dot2(a, b, x, y)  # faster

Faster (especially) as N grows because it does not create temporaries. NumPy creates temporary arrays for intermediate results. Numba creates a fast machine-code kernel from the Python template and calls it for every element in the arrays.
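To see where the temporaries come from: `a * x + b * y` allocates one array for `a * x`, one for `b * y`, and one for their sum. In plain NumPy, the `out=` argument that every ufunc accepts lets you reuse preallocated buffers instead. This is a manual alternative shown for comparison, not what Numba does internally:

```python
import numpy as np

rng = np.random.default_rng(42)
a, b, x, y = rng.standard_normal((4, 1000))

# Straightforward expression: three new arrays are allocated.
z = a * x + b * y

# Same result with two preallocated buffers reused via out=.
z2 = np.empty_like(a)
tmp = np.empty_like(a)
np.multiply(a, x, out=z2)     # z2 = a * x
np.multiply(b, y, out=tmp)    # tmp = b * y
np.add(z2, tmp, out=z2)       # z2 += tmp, in place

print(np.allclose(z, z2))     # True
```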

Page 47: Scaling Python to CPUs and GPUs

Generalized Ufunc

import numpy as np
from numba import guvectorize

@guvectorize(['void(f8[:], f8[:], f8[:])'], '(n),(n)->()')
def dot2(a, b, c):
    c[0] = a[0]*b[0] + a[1]*b[1]

>>> a, b = np.random.randn(10000, 2), np.random.randn(10000, 2)
>>> z1 = np.einsum('ij,ij->i', a, b)
>>> z2 = dot2(a, b)  # applies the kernel to each row (the last dimension)

This can create quite a bit of computation with very little code.

Numba creates a fast machine-code kernel from the Python template and calls it for every sub-array (here, every row) of the inputs.

3.8x faster
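What this gufunc computes is a row-wise dot product: each length-2 row of `a` is combined with the matching row of `b`, giving one number per row. Two more NumPy formulations of the same result, useful for checking a Numba gufunc against the slide's einsum baseline:

```python
import numpy as np

rng = np.random.default_rng(0)
a = rng.standard_normal((10000, 2))
b = rng.standard_normal((10000, 2))

z1 = np.einsum('ij,ij->i', a, b)                 # the slide's einsum baseline
z2 = (a * b).sum(axis=1)                          # multiply, then reduce the last axis
z3 = a[:, 0] * b[:, 0] + a[:, 1] * b[:, 1]        # the kernel body, vectorized

print(z1.shape)                                   # (10000,)
print(np.allclose(z1, z2), np.allclose(z1, z3))   # True True
```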

Page 48: Scaling Python to CPUs and GPUs

Example: Making a windowed compute filter

Perform a computation on a finite window of the input.

For a linear system, this is a FIR filter, which is what np.convolve or sp.signal.lfilter can do.

But what if you want some arbitrary computation, like a windowed median filter?
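For an arbitrary windowed computation in plain NumPy, `numpy.lib.stride_tricks.sliding_window_view` (NumPy 1.20 and later) exposes every window as a view without copying, so any reduction that takes an `axis` argument, such as `np.median`, applies per window. A minimal sketch of a "valid"-mode windowed median; this is a swapped-in NumPy technique, not the hand-coded or ufunc versions from the following slides:

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def windowed_median(x, width):
    # Each row of `windows` is one length-`width` window of x (a view, no copy).
    windows = sliding_window_view(x, width)
    return np.median(windows, axis=-1)

def windowed_median_loop(x, width):
    # Reference: the same computation as an explicit Python loop.
    return np.array([np.median(x[i:i + width]) for i in range(len(x) - width + 1)])

x = np.array([1.0, 9.0, 2.0, 8.0, 3.0, 7.0, 4.0])
print(windowed_median(x, 3).tolist())   # [2.0, 8.0, 3.0, 7.0, 4.0]
```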

Page 49: Scaling Python to CPUs and GPUs

Example: Making a windowed compute filter

Hand-coded implementation

Build a ufunc for the kernel, which is faster for large arrays!

This can now run easily on a GPU with target='cuda' and on many cores with target='parallel'.

Array-oriented!

Page 50: Scaling Python to CPUs and GPUs


Example: Making a windowed compute filter

Page 51: Scaling Python to CPUs and GPUs

How to Use Numba

1. Create a realistic benchmark test case. (Do not use your unit tests as a benchmark!)

2. Run a profiler on your benchmark. (cProfile is a good choice.)

3. Identify hotspots that could potentially be compiled by Numba with a little refactoring. (See the online documentation.)

4. Apply @numba.jit, @numba.vectorize, and @numba.guvectorize as needed to critical functions. (Small rewrites may be needed to work around Numba limitations.)

5. Re-run the benchmark to check whether there was a performance improvement.

6. Use target=parallel to get access to multiple cores (or target=cuda if you have a GPU).
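Steps 2 and 3 can be sketched with the standard library's cProfile and pstats. The workload below (`slow_sum_of_squares`, `benchmark`) is hypothetical, standing in for a realistic benchmark; whichever function dominates cumulative time is the candidate for `@numba.jit`:

```python
import cProfile
import io
import pstats

def slow_sum_of_squares(n):
    # Hypothetical hotspot: a pure-Python numeric loop.
    total = 0
    for i in range(n):
        total += i * i
    return total

def benchmark():
    # Hypothetical benchmark: call the hotspot a few times.
    return [slow_sum_of_squares(100_000) for _ in range(5)]

profiler = cProfile.Profile()
profiler.enable()
results = benchmark()
profiler.disable()

# Collect the five most expensive entries, sorted by cumulative time.
stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream).sort_stats("cumulative")
stats.print_stats(5)
report = stream.getvalue()
print(report)
```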

Page 52: Scaling Python to CPUs and GPUs

How Numba works

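[Figure: Numba compilation pipeline. A function decorated with @jit (def do_math(a,b): ...) is called as >>> do_math(x, y). The Python function (bytecode) goes through Bytecode Analysis; together with the Function Arguments it feeds Type Inference, producing Numba IR. After Rewrite IR and Lowering, the LLVM IR is compiled by the LLVM JIT into Machine Code, which is cached and executed.]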

Page 53: Scaling Python to CPUs and GPUs

7 things about Numba you may not know

1. Numba is 100% Open Source

2. Numba + Jupyter = Rapid CUDA Prototyping

3. Numba can compile for the CPU and the GPU at the same time

4. Numba makes array processing easy with @(gu)vectorize

5. Numba comes with a CUDA Simulator

6. You can send Numba functions over the network

7. Numba developers are working on a GPU DataFrame (pygdf)

Page 54: Scaling Python to CPUs and GPUs

Numba is quite popular!

A post on the Numba mailing list reports experiments by a SciPy author who got a 2x speed-up by removing their Cython type annotations and wrapping the function with numba.jit (with a few minor changes needed to the code).

With Numba’s ahead-of-time compilation, one can use Numba to create a library that you ship to others (who then don’t need to have Numba installed). This is not as clean as it could be.

SciPy (and NumPy) would look very different if Numba had existed 16 years ago when SciPy was getting started, and the PyPy crowd would be happier.

Page 55: Scaling Python to CPUs and GPUs

Releasing the GIL

Only nopython mode functions can release the GIL

Page 56: Scaling Python to CPUs and GPUs

Releasing the GIL

2.8x speedup with 4 cores
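A minimal sketch of the pattern behind multi-core speedups like this one: a nopython-mode function compiled with `nogil=True`, called from a thread pool on separate chunks of an array. The fallback decorator (used only when Numba is missing) keeps the code runnable but does not release the GIL, so no speed-up should be expected in that case; no timings are claimed here.

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor

try:
    from numba import njit
except ImportError:
    # Assumption: without Numba, accept the same decorator arguments and do nothing.
    def njit(*args, **kwargs):
        if args and callable(args[0]):
            return args[0]
        return lambda func: func

@njit(nogil=True)
def sum_of_squares(chunk):
    # With Numba, this loop runs as machine code with the GIL released.
    total = 0.0
    for value in chunk:
        total += value * value
    return total

def parallel_sum_of_squares(x, n_threads=4):
    chunks = np.array_split(x, n_threads)
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        partials = list(pool.map(sum_of_squares, chunks))
    return sum(partials)

x = np.arange(200_000, dtype=np.float64)
print(np.isclose(parallel_sum_of_squares(x), np.dot(x, x)))   # True
```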

Page 57: Scaling Python to CPUs and GPUs

CUDA Python (in open-source Numba!)

CUDA development using Python syntax for optimal performance!

10-20x faster than CPU

You have to understand CUDA at least a little: writing kernels that launch in parallel on the GPU

Page 58: Scaling Python to CPUs and GPUs

Classic Example

from numba import jit

@jit
def mandel(x, y, max_iters):
    c = complex(x, y)
    z = 0j
    for i in range(max_iters):
        z = z*z + c
        if z.real * z.real + z.imag * z.imag >= 4:
            return 255 * i // max_iters
    return 255

Mandelbrot

Page 59: Scaling Python to CPUs and GPUs

The Basics

Mandelbrot

CPython                        1x
NumPy array-wide operations    13x
Numba (CPU)                    120x
Numba (NVIDIA Tesla K20c)      2100x
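The "NumPy array-wide operations" row refers to evaluating the whole grid at once rather than point by point. The slide does not show that code, so this is a minimal sketch of one common way to write it, assuming it should match the scalar `mandel` from the previous slide: a boolean mask tracks the points that have not escaped, and each point records the first iteration at which |z|² ≥ 4.

```python
import numpy as np

def mandel(x, y, max_iters):
    # Scalar reference, as on the previous slide (without @jit).
    c = complex(x, y)
    z = 0j
    for i in range(max_iters):
        z = z * z + c
        if z.real * z.real + z.imag * z.imag >= 4:
            return 255 * i // max_iters
    return 255

def mandel_numpy(xs, ys, max_iters):
    # Evaluate every grid point at once with whole-array operations.
    c = xs[np.newaxis, :] + 1j * ys[:, np.newaxis]   # shape (len(ys), len(xs))
    z = np.zeros_like(c)
    result = np.full(c.shape, 255, dtype=np.int64)
    active = np.ones(c.shape, dtype=bool)            # points that have not escaped yet
    for i in range(max_iters):
        z[active] = z[active] * z[active] + c[active]
        escaped = active & (z.real * z.real + z.imag * z.imag >= 4)
        result[escaped] = 255 * i // max_iters
        active &= ~escaped
    return result

xs = np.linspace(-2.0, 1.0, 30)
ys = np.linspace(-1.0, 1.0, 20)
image = mandel_numpy(xs, ys, 20)
```

Unlike the scalar loop, every iteration here touches the whole (masked) grid, which is one reason a compiled per-point loop can beat it.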

Page 60: Scaling Python to CPUs and GPUs

Other topics

CUDA Python: write general GPU kernels with Python

Device Arrays: manage memory transfer from host to GPU

Streaming: manage asynchronous and parallel GPU compute streams

CUDA Simulator in Python: to help debug your kernels

HSA Support: early support for HSA-based GPUs and APUs

Pyculib: access to cuFFT, cuBLAS, cuSPARSE, cuRAND, CUDA Sorting

https://github.com/ContinuumIO/gtc2017-numba

Page 61: Scaling Python to CPUs and GPUs

Dask


Page 62: Scaling Python to CPUs and GPUs

• Designed to parallelize the Python ecosystem
  – Handles complex algorithms
  – Co-developed with Pandas/SKLearn/Jupyter teams
  – Familiar APIs for Python users

• Scales
  – Scales from multicore to 1000-node clusters
  – Resilient, responsive, and real-time

Page 63: Scaling Python to CPUs and GPUs

• Parallelizes NumPy, Pandas, SKLearn
  – Satisfies a subset of these APIs
  – Uses these libraries internally
  – Co-developed with these teams

• Task scheduler supports custom algorithms
  – Parallelize existing code
  – Build novel real-time systems
  – Arbitrary task graphs with data dependencies
  – Same scalability

Page 64: Scaling Python to CPUs and GPUs

Demo video

• High level: Scaling Pandas
  – Same Pandas look and feel
  – Uses Pandas under the hood
  – Scales nicely onto many machines

• Low level: Arbitrary task scheduling
  – Parallelize normal Python code
  – Build custom algorithms
  – React in real time

• Demo deployed with dask-kubernetes on Google Compute Engine
  – github.com/dask/dask-kubernetes

• YouTube link
  – https://www.youtube.com/watch?v=ods97a5Pzw0&

Page 65: Scaling Python to CPUs and GPUs

Why do people choose Dask?

• Familiar with Python:
  – Drop-in NumPy/Pandas/SKLearn APIs
  – Native memory environment
  – Easy debugging and diagnostics

• Have complex problems:
  – Parallelize existing code without expensive rewrites
  – Sophisticated algorithms and systems
  – Real-time response to small data

• Scales up and down:
  – Scales to 1000-node clusters
  – Also runs cheaply on a laptop

# import pandas as pd
import dask.dataframe as dd

Page 66: Scaling Python to CPUs and GPUs

Dask

• Started as part of Blaze in early 2014.

• General parallel programming engine

• Flexible and therefore highly suited for
  – Commodity Clusters
  – Advanced Algorithms

• Wide community adoption and use

conda install -c conda-forge dask

pip install dask[complete] distributed --upgrade

Page 67: Scaling Python to CPUs and GPUs

[Figure labels: Big Data, Small Data, Numba]

Page 68: Scaling Python to CPUs and GPUs

Dask: From User Interaction to Execution


delayed
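Underneath the collections and `dask.delayed`, Dask represents work as a task graph. Its documented graph specification is a plain dict mapping keys to either values or task tuples whose first element is a callable and whose remaining elements are arguments, which may be other keys. The tiny recursive executor below is a toy written only to illustrate that specification; it is not Dask's scheduler, which adds ordering heuristics, parallelism, and memory management, and it only handles string keys.

```python
from operator import add, mul

def inc(i):
    return i + 1

# A graph in Dask's dict-of-tasks style: keys map to values or (func, *args) tuples.
graph = {
    "x": 1,
    "y": (inc, "x"),           # y = inc(x)
    "z": (add, "y", 10),       # z = add(y, 10)
    "w": (mul, "y", "z"),      # w = mul(y, z), depends on both y and z
}

def is_task(obj):
    return isinstance(obj, tuple) and len(obj) > 0 and callable(obj[0])

def get(graph, key, cache=None):
    # Toy executor: evaluate `key`, computing each dependency only once.
    cache = {} if cache is None else cache
    if key in cache:
        return cache[key]
    value = _evaluate(graph, graph[key], cache)
    cache[key] = value
    return value

def _evaluate(graph, expr, cache):
    if is_task(expr):
        func, *args = expr
        return func(*(_evaluate(graph, arg, cache) for arg in args))
    if isinstance(expr, list):
        return [_evaluate(graph, item, cache) for item in expr]
    if isinstance(expr, str) and expr in graph:
        return get(graph, expr, cache)   # a reference to another key
    return expr                          # a literal argument

print(get(graph, "w"))   # 24
```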

Page 70: Scaling Python to CPUs and GPUs

Overview of Dask

Dask is a Python parallel computing library that is:

• Familiar: Implements parallel NumPy and Pandas objects

• Fast: Optimized for demanding numerical applications

• Flexible: for sophisticated and messy algorithms

• Scales up: Runs resiliently on clusters of 100s of machines

• Scales down: Pragmatic in a single process on a laptop

• Interactive: Responsive and fast for interactive data science

Dask complements the rest of Anaconda. It was developed with NumPy, Pandas, and scikit-learn developers.

Page 71: Scaling Python to CPUs and GPUs

Dask Collections: Familiar Expressions and API

• Dask array (mimics NumPy): x.T - x.mean(axis=0)

• Dask dataframe (mimics Pandas): df.groupby(df.index).value.mean()

• Dask bag (collection of data): b.map(json.loads).foldby(...)

• Dask delayed (wraps custom code): def load(filename): / def clean(data): / def analyze(result):

Page 72: Scaling Python to CPUs and GPUs

>>> import pandas as pd
>>> df = pd.read_csv('iris.csv')
>>> df.head()
   sepal_length  sepal_width  petal_length  petal_width      species
0           5.1          3.5           1.4          0.2  Iris-setosa
1           4.9          3.0           1.4          0.2  Iris-setosa
2           4.7          3.2           1.3          0.2  Iris-setosa
3           4.6          3.1           1.5          0.2  Iris-setosa
4           5.0          3.6           1.4          0.2  Iris-setosa
>>> max_sepal_length_setosa = df[df.species == 'Iris-setosa'].sepal_length.max()
>>> max_sepal_length_setosa
5.7999999999999998

>>> import dask.dataframe as dd
>>> ddf = dd.read_csv('*.csv')
>>> ddf.head()
   sepal_length  sepal_width  petal_length  petal_width      species
0           5.1          3.5           1.4          0.2  Iris-setosa
1           4.9          3.0           1.4          0.2  Iris-setosa
2           4.7          3.2           1.3          0.2  Iris-setosa
3           4.6          3.1           1.5          0.2  Iris-setosa
4           5.0          3.6           1.4          0.2  Iris-setosa
>>> d_max_sepal_length_setosa = ddf[ddf.species == 'Iris-setosa'].sepal_length.max()
>>> d_max_sepal_length_setosa.compute()
5.7999999999999998

Dask DataFrame is like Pandas

Page 73: Scaling Python to CPUs and GPUs

New Spark/Hadoop clusters

• Create and provision a Spark/Hadoop cluster with a few simple steps

• Work on the cloud or with your existing in-house servers

Dask Graphs: Example Machine Learning Pipeline


Page 74: Scaling Python to CPUs and GPUs

Example 1: Using Dask DataFrames on a cluster with CSV data


• Built from Pandas DataFrames

• Match Pandas interface

• Access data from HDFS, S3, local, etc.

• Fast, low latency

• Responsive user interface

Page 75: Scaling Python to CPUs and GPUs

75

>>> import numpy as np
>>> np_ones = np.ones((5000, 1000))
>>> np_ones
array([[ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       ...,
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.]])
>>> np_y = np.log(np_ones + 1)[:5].sum(axis=1)
>>> np_y
array([ 693.14718056,  693.14718056,  693.14718056,  693.14718056,
        693.14718056])

>>> import dask.array as da
>>> da_ones = da.ones((5000000, 1000000),
...                   chunks=(1000, 1000))
>>> # da_ones is lazy: calling da_ones.compute() here would try to build a ~40 TB array
>>> da_y = da.log(da_ones + 1)[:5].sum(axis=1)
>>> np_da_y = np.array(da_y)  # fits in memory: 5 values, each 1,000,000 * log(2) ≈ 693147.18

# If result doesn’t fit in memory
>>> da_y.to_hdf5('myfile.hdf5', 'result')

Dask Array is like NumPy

Page 76: Scaling Python to CPUs and GPUs

Example 3: Using Dask Arrays with global temperature data

• Built from NumPy n-dimensional arrays

• Matches NumPy interface (subset)

• Solve medium-large problems

• Complex algorithms

Page 77: Scaling Python to CPUs and GPUs

Dask Schedulers: Distributed Scheduler


Page 78: Scaling Python to CPUs and GPUs

Cluster Architecture Diagram

[Figure: a Client Machine, a Head Node, and three Compute Nodes]

Page 79: Scaling Python to CPUs and GPUs

Using Anaconda and Dask on your Cluster

• Single machine with multiple threads or processes

• On a cluster with SSH (dcluster)

• Resource management: YARN (knit), SGE, Slurm

• On the cloud with Amazon EC2 (dec2)

• On a cluster with Anaconda for cluster management
  – Manage multiple conda environments and packages on bare-metal or cloud-based clusters

Page 80: Scaling Python to CPUs and GPUs

Dask Protocol

• The scheduler, workers, and clients pass messages between each other. Semantically, these messages encode commands, status updates, and data, like the following:
  – Please compute the function sum on the data x and store in y
  – The computation y has been completed
  – Be advised that a new worker named alice is available for use
  – Here is the data for the keys 'x' and 'y'

In practice we represent these messages with dictionaries/mappings

Page 81: Scaling Python to CPUs and GPUs

Dask Protocol

• The protocol is a combination of:
  – msgpack for instructions and headers
  – pickle/cloudpickle for arbitrary Python objects
  – byte strings (with optional compression) for large data; LZ4 is preferred over Snappy, and either will be tried for messages above 1000 B

• The protocol is extensible by registering serializers.

http://distributed.readthedocs.io/en/latest/protocol.html
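The header/frames split can be sketched with the standard library. This is not Dask's implementation: `json` stands in for msgpack, `zlib` stands in for LZ4/Snappy, the 1000-byte threshold is taken from the slide, and the message layout (one header frame plus one payload frame) is a simplification. It only shows the idea that control messages stay in a language-neutral format while Python objects travel as opaque pickled bytes that a scheduler-like process never needs to unpickle.

```python
import json
import pickle
import zlib

COMPRESSION_THRESHOLD = 1000  # bytes, per the slide

def dumps(message, payload):
    # Control message (a dict) -> language-neutral header; payload -> opaque bytes.
    frame = pickle.dumps(payload)
    compressed = False
    if len(frame) > COMPRESSION_THRESHOLD:
        smaller = zlib.compress(frame)
        if len(smaller) < len(frame):   # keep compression only if it helps
            frame, compressed = smaller, True
    header = dict(message, compression="zlib" if compressed else None)
    return [json.dumps(header).encode("utf-8"), frame]

def read_header(frames):
    # What a scheduler-like process does: parse the header, never unpickle.
    return json.loads(frames[0].decode("utf-8"))

def loads(frames):
    # What a worker-like process does: parse the header, then unpickle the payload.
    header = read_header(frames)
    frame = frames[1]
    if header["compression"] == "zlib":
        frame = zlib.decompress(frame)
    return header, pickle.loads(frame)

frames = dumps({"op": "compute", "key": "y"}, list(range(1000)))
print(read_header(frames))
header, data = loads(frames)
print(header["op"], len(data))   # compute 1000
```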

Page 82: Scaling Python to CPUs and GPUs

Dask Protocol - Scheduler

However, the scheduler never uses the language-specific serialization and instead only deals with MsgPack. If the client sends a pickled function up to the scheduler, the scheduler will not unpack the function but will instead keep it as bytes. Eventually those bytes will be sent to a worker, which will then unpack the bytes into a proper Python function. Because the scheduler never unpacks language-specific serialized bytes, it may be written in a different language.

• The scheduler is protected from unpickling unsafe code.

• The scheduler can be run under PyPy for improved performance. This is only useful for larger clusters.

• We could conceivably implement workers and clients for other languages (like R or Julia) and reuse the Python scheduler. The worker and client code is fairly simple and much easier to reimplement than the scheduler, which is complex.

• The scheduler might some day be rewritten in more heavily optimized C or Go.

Page 83: Scaling Python to CPUs and GPUs

Dask Scheduler

• Scheduling arbitrary graphs is hard.
  – Optimal graph scheduling is NP-hard
  – Scalable scheduling requires linear-time solutions

• Fortunately dask does well with a lot of heuristics
  – … and a lot of monitoring and data about sizes
  – … and how long functions take.

Page 84: Scaling Python to CPUs and GPUs

Scheduler Visualization with Bokeh


Page 85: Scaling Python to CPUs and GPUs

What makes Dask different? Let’s look at some pictures of directed graphs

Page 86: Scaling Python to CPUs and GPUs
Page 87: Scaling Python to CPUs and GPUs
Page 88: Scaling Python to CPUs and GPUs

Most Parallel Framework Architectures

User API → High Level Representation (Logical Plan) → Low Level Representation (Physical Plan) → Task scheduler for execution

Page 89: Scaling Python to CPUs and GPUs

SQL Database Architecture

SELECT avg(value)
FROM accounts
INNER JOIN customers ON …
WHERE name = 'Alice'

Page 90: Scaling Python to CPUs and GPUs

SQL Database Architecture

SELECT avg(value)
FROM accounts
WHERE name = 'Alice'
INNER JOIN customers ON …

Optimize

Page 91: Scaling Python to CPUs and GPUs

Spark Architecture

df.join(df2, …)
  .select(…)
  .filter(…)

Optimize

Page 92: Scaling Python to CPUs and GPUs

Large Matrix Architecture

(A' * A) \ A' * b

Optimize
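The expression is MATLAB-style notation for least squares via the normal equations, x = (AᵀA)⁻¹Aᵀb, where `\` means "solve" rather than an explicit inverse. A small NumPy sketch of the same computation on random example data, checked against `np.linalg.lstsq`:

```python
import numpy as np

rng = np.random.default_rng(0)
A = rng.standard_normal((100, 3))      # tall matrix: 100 observations, 3 unknowns
b = rng.standard_normal(100)

# (A' * A) \ A' * b : solve (A^T A) x = A^T b instead of forming an inverse.
x_normal = np.linalg.solve(A.T @ A, A.T @ b)

# Reference solution from an SVD-based least-squares solver.
x_lstsq, *_ = np.linalg.lstsq(A, b, rcond=None)

print(np.allclose(x_normal, x_lstsq))   # True
```

Forming AᵀA squares the condition number of A, which is why orthogonal-factorization or SVD-based solvers are usually preferred when A is ill-conditioned; for this small random A the two agree.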

Page 93: Scaling Python to CPUs and GPUs

Dask Architecture

Page 94: Scaling Python to CPUs and GPUs

Dask Architecture

accts = dd.read_parquet(…)
accts = accts[accts.name == 'Alice']
df = dd.merge(accts, customers)
df.value.mean().compute()

Page 95: Scaling Python to CPUs and GPUs

Dask Architecture

u, s, v = da.linalg.svd(X)
Y = u.dot(da.diag(s)).dot(v.T)
da.linalg.norm(X - Y)

Page 96: Scaling Python to CPUs and GPUs

Dask Architecture

import dask
from operator import add

# f and g stand for user-defined functions
for i in range(256):
    x = dask.delayed(f)(i)
    y = dask.delayed(g)(x)
    z = dask.delayed(add)(x, y)

Page 97: Scaling Python to CPUs and GPUs

Dask Architecture

from dask.distributed import Client, as_completed

async def func():
    client = await Client(asynchronous=True)
    futures = client.map(…)
    async for f in as_completed(…):
        result = await f

Page 98: Scaling Python to CPUs and GPUs

Dask Architecture

Your own system here

Page 99: Scaling Python to CPUs and GPUs

By dropping the high level representation

Costs
• Lose specialization
• Lose opportunities for high level optimization

Benefits
• Become generalists
• More flexibility for new domains and algorithms
• Access to smarter algorithms
• Better task scheduling: resource constraints, GPUs, multiple clients, async real-time, etc.

Page 100: Scaling Python to CPUs and GPUs

Ten Reasons People Choose Dask

Page 101: Scaling Python to CPUs and GPUs

Scalable Pandas DataFrames

• Same API

import dask.dataframe as dd
df = dd.read_parquet('s3://bucket/accounts/2017')
df.groupby(df.name).value.mean().compute()

• Efficient Timeseries Operations

df.loc['2017-01-01']              # Uses the Pandas index…
df.value.rolling(10).std()        # for efficient…
df.value.resample('10m').mean()   # operations.

• Co-developed with Pandas and by the Pandas developer community

Page 102: Scaling Python to CPUs and GPUs

Scalable NumPy Arrays

• Same API

import dask.array as da
x = da.from_array(my_hdf5_file)
y = x.dot(x.T)

• Applications
  – Atmospheric science
  – Satellite imagery
  – Biomedical imagery
  – Optimization algorithms: check out dask-glm

Page 103: Scaling Python to CPUs and GPUs

Parallelize Scikit-Learn/Joblib

• Scikit-Learn parallelizes with Joblib

estimator = RandomForestClassifier(…, n_jobs=8)
estimator.fit(train_data, train_labels)

• Joblib can use Dask

from sklearn.externals.joblib import parallel_backend

with parallel_backend('dask', scheduler='…'):
    estimator.fit(train_data, train_labels)

https://pythonhosted.org/joblib/

http://distributed.readthedocs.io/en/latest/joblib.html

Joblib

Thread pool

Page 104: Scaling Python to CPUs and GPUs


Joblib

Dask

Page 105: Scaling Python to CPUs and GPUs

Many Other Libraries in Anaconda

• Scikit-Image uses dask to break down images and speed up algorithms with overlapping regions

• Geopandas can use Dask to partition data spatially and accelerate spatial joins

Page 106: Scaling Python to CPUs and GPUs

Dask Scales Up
• Thousand-node clusters
• Cloud computing
• Supercomputers
• Gigabyte/s bandwidth
• 200 microsecond task overhead

Dask Scales Down (the median cluster size is one)
• Can run in a single Python thread pool
• Almost no performance penalty (microseconds)
• Lightweight
• Few dependencies
• Easy install

Page 107: Scaling Python to CPUs and GPUs

Parallelize Web Backends

• Web servers process thousands of small computations asynchronously for web pages or REST endpoints

• Dask provides dynamic, heterogeneous computation
  – Supports small data
  – 10ms roundtrip times
  – Dynamic scaling for different loads
  – Supports asynchronous Python (like Go)

async def serve(request):
    future = dask_client.submit(process, request)
    result = await future
    return result
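The same submit-then-await shape can be tried without a cluster using only the standard library. In this sketch, a `concurrent.futures` thread pool stands in for `dask_client`, `asyncio.wrap_future` makes its future awaitable, and `process` and the request strings are hypothetical; with Dask, the submit call would go to the distributed client instead.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)   # stand-in for dask_client

def process(request):
    # Hypothetical request handler doing some work off the event loop.
    return request.upper()

async def serve(request):
    future = executor.submit(process, request)     # like dask_client.submit(...)
    result = await asyncio.wrap_future(future)     # await without blocking the loop
    return result

async def main():
    # Many small requests handled concurrently; gather keeps their order.
    return await asyncio.gather(*(serve(f"req-{i}") for i in range(5)))

print(asyncio.run(main()))   # ['REQ-0', 'REQ-1', 'REQ-2', 'REQ-3', 'REQ-4']
```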

Page 108: Scaling Python to CPUs and GPUs

Debugging support
• Clean Python tracebacks when user code breaks
• Connect to remote workers with IPython sessions for advanced debugging

Page 109: Scaling Python to CPUs and GPUs

Resource constraints
• Define limited hardware resources for workers
• Specify resource constraints when submitting tasks

$ dask-worker … --resources GPU=2
$ dask-worker … --resources GPU=2
$ dask-worker … --resources special-db=1

future = client.submit(my_function, resources={'GPU': 1})

• Used for GPUs, big-memory machines, special hardware, database connections, I/O machines, etc.

Page 110: Scaling Python to CPUs and GPUs

Collaboration
• Many users can share the same cluster simultaneously
• Define public datasets
• Repeated computation and data use is shared among everyone

df = dd.read_parquet(…).persist()
client.publish_dataset(accounts=df)

df = client.get_dataset('accounts')

Page 111: Scaling Python to CPUs and GPUs

Beautiful Diagnostic Dashboards

• Fast responsive dashboards

• Provide users performance insight

• Powered by Bokeh

Page 112: Scaling Python to CPUs and GPUs

Some Reasons not to Choose Dask

Page 113: Scaling Python to CPUs and GPUs

Dask’s limitations

• Dask is not a SQL database. It does Pandas well, but won’t optimize complex queries.

• Dask is not MPI. It is very fast, but does leave some performance on the table: 200us task overhead and a couple of copies in the network stack.

• Dask is not a JVM technology. It’s a Python library (although Julia bindings are available).

• Dask is not always necessary. You may not need parallelism.

Page 114: Scaling Python to CPUs and GPUs

dask.pydata.org

conda install dask

Page 115: Scaling Python to CPUs and GPUs


Scalable Machine Learning (ML)

Dask-ml: organized work for general scalable machine learning using dask

First release last week!

Organizes work done over the past year into a single home

A single place to discuss scalable machine learning with Python

https://tomaugspurger.github.io/scalable-ml-01

https://tomaugspurger.github.io/scalable-ml-02

https://github.com/dask/dask-ml/releases/tag/v0.2.3

conda install -c conda-forge dask-ml

https://tomaugspurger.github.io/dask-ml-announce