NYC ML Meetup Talk, July 2015: "A peek under the hood at Ufora" (Braxton McKee, Founder & CEO)


TRANSCRIPT

NYC Machine Learning MeetUp

"A peek under the hood at Ufora"

Braxton McKee, CEO
braxton@ufora.com

Braxton McKee, Founder & CEO
Keith Kegley, President
Eyal Goldwerger, Exec. Chairman
Ben Beinecke, Head of Strategy
Ronen Hilewicz, VP Engineering
Tom Peters, Engineer
Steven Benario, Head of Product
Jay Moolenaar, Head of Sales
Alex Tsannes, Engineer
Ross Goodwin, Engineer
Amichai Levy, Engineer
Tony Jebara, Scientific Advisor & Head of ML Lab at Columbia

Why should I have to write a different program if I have 1000 rows or 1 billion?

The options, with their advantages and disadvantages:

1. Write in single-threaded Python, R, or equivalent.
   Advantage: shortest time-to-code.
   Disadvantage: slow; can't handle scale.

2. Code a parallel version by hand.
   Advantage: maximal performance.
   Disadvantage: lots of code, hard to get right (race conditions).

3. Code against APIs that implement particular patterns (MapReduce).
   Advantage: only have to implement a few patterns.
   Disadvantage: can be hard to fit a problem into the given pattern, or it's really inefficient.

4. Figure out how to automatically scale up programs written in Python, R, etc.
   Advantage: users can implement lots of algorithms very quickly without a lot of code.
   Disadvantage: tough to implement; can be hard to make optimal decisions without knowing what the computation will do.

Business Vision: Empower data scientists to run any computation on any data quickly and easily by automating the engineering.

Technical Vision: Create a VM that can automatically scale any algorithm across a large number of machines without direct supervision by the user.

What is Ufora?

A data-processing platform that automatically parallelizes user programs and executes them across a cluster of machines.

Use Cases:
• Data processing and cleaning
• Large-scale machine learning
• Modeling and simulation

Design Goal: completely separate "what" from "how".

Key Components:
• Implicit parallelism at the language level
• JIT compilation
• Fault tolerance
• Automatic co-location of data and compute

What do we give up?
• Restrict mutability of data structures
• Restrict side-effects
• Emphasize a "functional" programming style
• Some features of host languages won't work

def isPrime(p):
    x = 2
    while x*x <= p:
        if p % x == 0:
            return 0
        x = x + 1
    return 1

def filter(v, f):
    if len(v) == 0:
        return []
    if len(v) == 1:
        return v if f(v[0]) else []
    mid = len(v) / 2
    return filter(v[:mid], f) + filter(v[mid:], f)

print filter(range(100000000), isPrime)

A nice example: isPrime is naturally sequential (because of the loop), while filter is naturally parallel (divide and conquer).

Adaptive parallelism: the runtime splits filter(v, isPrime) over the 100M integers adaptively, first into 0-50M and 50M-100M, then across four cores as 0-25M, 25M-50M, 50M-75M, and 75M-100M.

Key Components:
• Implicit parallelism
• JIT compilation
• Fault tolerance
• Automatic co-location of data and compute

Our solution: react dynamically as the program runs.

• Watch running threads to see what blocks of data they're accessing.
• Detect when blocks of data absolutely have to be on the same machine.
• Build a statistical model of correlations between block accesses.
• Place data to minimize the expected future number of machine-boundary crossings.
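The talk doesn't show how these correlation statistics are kept, so here is a rough, hypothetical sketch (the class and method names are invented for illustration): a scheduler could count how often pairs of blocks are touched by the same thread in the same window, and use those counts as a co-location signal.

```python
from collections import defaultdict

class CoAccessModel:
    """Hypothetical sketch: count how often pairs of data blocks are
    touched by one thread within a short window, so a scheduler can
    co-locate blocks that are frequently accessed together."""

    def __init__(self):
        self.pair_counts = defaultdict(int)

    def record_accesses(self, blocks_touched):
        # blocks_touched: the blocks one thread accessed in one window
        blocks = sorted(set(blocks_touched))
        for i in range(len(blocks)):
            for j in range(i + 1, len(blocks)):
                self.pair_counts[(blocks[i], blocks[j])] += 1

    def affinity(self, a, b):
        # Higher means the blocks should probably live together.
        return self.pair_counts[tuple(sorted((a, b)))]

model = CoAccessModel()
model.record_accesses([4, 5])    # a thread straddled blocks 4 and 5
model.record_accesses([4, 5])
model.record_accesses([1, 16])   # a thread touched both ends of the vector
print(model.affinity(4, 5))      # 2
print(model.affinity(1, 16))     # 1
```

A real scheduler would decay old counts and weight by bytes moved, but the core signal is just this pairwise co-access frequency.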

A Simple Example

User writes:

v = range(0, 2*10**9)

To the user, 'v' is now a big contiguous array:

[0.0, 1.0, 2.0, 3.0, 4.0, ..., 1999999999.0]

[Figure: 'v' drawn as 16 numbered blocks]

What's going on under the hood?

"v" is actually a pointer to a bunch of blocks of data. In this example, each block is 1 GB; the first block contains the first 125,000,000 numbers.

What if the dataset is bigger than one machine can hold? Say we have 16 GB of data on machines with 8 GB of RAM.

Simple case: just stripe the data linearly, placing the first 4 GB on Machine 1.

[Figure: Machine 1 holds blocks 1-4, Machine 2 holds blocks 5-8, Machine 3 holds blocks 9-12, Machine 4 holds blocks 13-16]

What happens when we start using the data?

User writes:

for x in v:
    # some complicated state machine

Now the computation wants to scan sequentially over the dataset. Computation starts on Machine 1, and when it exhausts the data on one machine, the runtime moves it to the next.

But real access patterns are more complex!

User writes:

res = 0
def f(x, y):
    ...  # some function

for i in xrange(0, len(v) - 10):
    res = res + f(v[i], v[i+10])

Now the computation is looking at all pairs v[i] and v[i+10].

At first, everything is OK, since v[i] and v[i+10] are close to each other in the data. But when the computation reaches the end of block 4 (on Machine 1), v[i] and v[i+10] aren't on the same machine! When v[i+10] needs data in block 5 on Machine 2, the runtime has to move the computation to the other machine. But then we increment 'i' and have to go back to the first machine. Now the computation is alternating between accessing data on Machine 1 and Machine 2, and every time we have to move the computation, we're hitting the network. This is really slow!

Solution: replicate blocks so that they overlap. Data can live on two different machines at the same time because it's immutable!

[Figure: blocks 5, 9, and 13 are each replicated onto the previous machine, so each machine's range overlaps the next]
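As an illustrative sketch of the overlap idea (the function and exact layout are hypothetical, not Ufora's actual placement code): stripe blocks linearly, then give each machine a replica of the next machine's first block, so a small sliding window like (v[i], v[i+10]) is always fully resident on some machine.

```python
def place_with_overlap(n_blocks, n_machines):
    """Hypothetical sketch: stripe blocks across machines, then replicate
    each boundary block onto the previous machine. Safe because the
    blocks are immutable."""
    per = n_blocks // n_machines
    placement = {m: list(range(m * per, (m + 1) * per))
                 for m in range(n_machines)}
    for m in range(n_machines - 1):
        # Copy of the next machine's first block.
        placement[m].append((m + 1) * per)
    return placement

# 16 blocks on 4 machines: machine 0 gets blocks 0-3 plus a copy of block 4,
# machine 1 gets blocks 4-7 plus a copy of block 8, and so on.
print(place_with_overlap(16, 4))
```

With this layout, any pair of adjacent blocks is co-resident somewhere, which is exactly what the v[i]/v[i+10] pattern needs.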

Now v[i] and v[i+10] are always available together on some machine.

What about a different access pattern?

Imagine the user writes:

res = 0
def f(x, y):
    ...  # some function

for i in xrange(0, len(v)):
    res = res + f(v[i], v[-i])

Now the computation will jump back and forth between the beginning and the end of the vector.

[Figure: f(v[i], v[-i]) alternates between v[1], v[2], v[3], ... at the front of the vector and v[-1], v[-2], v[-3], ... at the back]

But these blocks (block 1 on Machine 1 and block 16 on Machine 4) are on separate machines, which we know is slow.

Under this layout, the computation will have to move between machines 2 billion times!

But if we lay the data out differently, we'll only have to move a few times.

[Figure: an alternative layout that places blocks from the beginning and end of the vector together on the same machines]
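To see how big the difference is, here is a small hypothetical simulation (names and block sizes are invented; 16 tiny blocks on 4 machines so it runs instantly) that counts machine moves for the f(v[i], v[-i]) pattern under linear striping versus a layout that pairs blocks from opposite ends:

```python
def crossings(block_to_machine, n_blocks, block_size):
    """Count machine moves for the access pattern f(v[i], v[-i-1])
    over a vector of n_blocks * block_size elements."""
    n = n_blocks * block_size
    moves = 0
    here = None
    for i in range(n // 2):
        for idx in (i, n - 1 - i):       # front element, then back element
            m = block_to_machine[idx // block_size]
            if here is not None and m != here:
                moves += 1
            here = m
    return moves

# 16 blocks of 100 elements on 4 machines.
linear = {b: b // 4 for b in range(16)}               # blocks 0-3 on M0, ...
paired = {b: min(b, 15 - b) // 2 for b in range(16)}  # ends paired together

print(crossings(linear, 16, 100))  # 1599: nearly every access is a move
print(crossings(paired, 16, 100))  # 3: only a handful of moves
```

Scaled up to the 2-billion-element vector of the example, the striped layout moves the computation on essentially every access, while the paired layout moves it only a few times.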

There are lots of different access patterns, and each one has a different optimal layout. Let's look at a couple that show up in ML applications.

Example: Simple Dot Product

def dot(v1, v2):
    return sum(0, len(v1), lambda i: v1[i] * v2[i])
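Note that the sum(start, stop, fn) here is the platform's divide-and-conquer reduction primitive, not Python's builtin sum. A plain-Python stand-in (sequential, but with the same independent-halves structure that lets a runtime evaluate the two sides in parallel) might look like:

```python
def par_sum(a, b, f):
    """Hypothetical stand-in for the platform's sum(a, b, f) primitive:
    divide-and-conquer reduction of f(a) + ... + f(b-1). The two
    recursive halves share no state, which is what makes them
    parallelizable."""
    if a >= b:
        return 0
    if a + 1 == b:
        return f(a)
    mid = (a + b) // 2
    return par_sum(a, mid, f) + par_sum(mid, b, f)

def dot(v1, v2):
    return par_sum(0, len(v1), lambda i: v1[i] * v2[i])

print(dot([1, 2, 3], [4, 5, 6]))  # 32
```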

[Figure: the dot product's index range is split recursively and distributed across machines M1 through M4]

Example: Linear Regression

def range(a, b, f):
    if a >= b:
        return []
    if a + 1 >= b:
        return [f(a)]
    mid = (a + b) / 2
    return range(a, mid, f) + range(mid, b, f)

def computeXtX(columns):
    return range(0, len(columns), lambda c1:
        range(0, len(columns), lambda c2:
            dot(columns[c1], columns[c2])))

Basically computing a covariance matrix.

[Figure: 100 columns, each 1 GB; the 4950 column pairs needed for XtX are distributed across machines M1 through M4]

Example: Decision Tree

• Tries to learn a function of Y given a set of X's in N dimensions.
• Common algorithm for both GBM and random forest.
• Complex parallelism pattern.
• Has both static data (that sits still) and rapidly moving data.

[Figure: example decision tree with internal nodes X0 > 10, X9 > 1.2, X33 > -5.0, and X65 > .111, and leaves Y=0, Y=1, Y=10, Y=20, Y=30]

How do we pick split points?

• Scan over the data for each column (e.g. X0 against Y).
• Build a histogram.
• Pick the "X" point that maximizes separation of Y.

Pick the best rule over all the columns (e.g. X9 > 1.2), and divide the dataset in half according to that rule.
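As a hedged sketch of histogram-based split selection for a single column (the equal-width binning and the variance-based score are illustrative choices, not necessarily what Ufora's tree builder uses), one might write:

```python
def best_split(x, y, n_bins=4):
    """Hypothetical sketch: bucket x into equal-width bins, track
    sum/count/sum-of-squares of y per bin, and pick the bin boundary
    that minimizes within-half squared error (i.e. maximizes
    separation of Y)."""
    lo, hi = min(x), max(x)
    width = (hi - lo) / n_bins or 1.0   # guard a constant column
    counts = [0.0] * n_bins
    sums = [0.0] * n_bins
    sq_sums = [0.0] * n_bins
    for xi, yi in zip(x, y):
        b = min(int((xi - lo) / width), n_bins - 1)
        counts[b] += 1
        sums[b] += yi
        sq_sums[b] += yi * yi

    def sse(c, s, sq):  # sum of squared error around the mean
        return sq - s * s / c if c else 0.0

    best = None
    for cut in range(1, n_bins):
        cl, sl, ql = sum(counts[:cut]), sum(sums[:cut]), sum(sq_sums[:cut])
        cr, sr, qr = sum(counts[cut:]), sum(sums[cut:]), sum(sq_sums[cut:])
        score = sse(cl, sl, ql) + sse(cr, sr, qr)
        if best is None or score < best[0]:
            best = (score, lo + cut * width)
    return best[1]

# Two well-separated clusters of Y values along one X column.
x = [0.0, 1.0, 2.0, 3.0, 10.0, 11.0, 12.0, 13.0]
y = [0.0, 0.0, 0.0, 0.0, 20.0, 20.0, 20.0, 20.0]
print(best_split(x, y))  # a threshold between the two clusters
```

The reason histograms matter here is that they need only one scan per column and produce a tiny summary, so each machine can summarize its resident blocks locally and only the histograms move over the network.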

And recurse!

[Figure: the X9 > 1.2 node splits further on X0 > 0.5 and X33 > 7.1]

And compute averages over Y over all the leaf datasets.

[Figure: the leaves under X0 > 0.5 and X33 > 7.1 take values such as Y = 20 and Y = 30]

We don't actually make a full copy of the data at each split. Instead, we can track "Active Indices," which contain the set of row indices in a given subset. This is much smaller than the whole set.
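A minimal sketch of the active-indices idea (the function name is hypothetical): each split partitions only the list of row indices, never the underlying immutable columns.

```python
def split_indices(column, active, threshold):
    """Hypothetical sketch of 'active indices': instead of copying the
    (huge, immutable) dataset at each tree split, keep only the row
    indices belonging to each side of the rule."""
    left = [i for i in active if column[i] <= threshold]
    right = [i for i in active if column[i] > threshold]
    return left, right

x9 = [0.5, 2.0, 1.0, 3.0, 0.1]
active = [0, 1, 2, 3, 4]          # root node: all rows are active
left, right = split_indices(x9, active, 1.2)   # the rule X9 > 1.2
print(left)   # [0, 2, 4]
print(right)  # [1, 3]
```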

[Figure: the 100 columns of X and the Y vector stay put across machines M1 through M4 (static data), while the much smaller Active Indices sets move between machines as the tree is built]

How is this implemented?

Backend in C++. Codegen using LLVM. Language bindings in Python.

Python code is translated to Ufora IR, then to FORA code, which runs as threads over data inside a Ufora Worker.

Each Ufora Worker contains:
• JIT compiler (LLVM)
• Thread Scheduler
• Data Scheduler, coordinated by a Global Scheduler
• Data backplane to the other machines
• Local disk

The End

• We're hiring.
• We give out trial licenses for compelling projects.
• Contact me: braxton@ufora.com
