Real-time Insights, Powered by Reactive Programming

Posted on 11-Apr-2017

Jay Phelps | @_jayphelps

Let's talk about why

Time is money

Fortune 500:

127+ million person-hours lost per year

$500k to $1+ million per hour: average cost of a critical failure

https://devops.com/real-cost-downtime/

Every second counts

Not just outages...

debugging, testing, and even Information Security too

improving Developer Experience

Jay Phelps, Senior Software Engineer (@_jayphelps)

debugging, testing, and InfoSec

InfoSec

“preventing unauthorized access, use, or disruption of information”

stopping hackers

InfoSec

I'm on it.

We can block exploits using our gateway proxy

...we need to know it's working...

...we want to watch attackers try...

...in real-time

We need real-time insights for debugging, testing, and InfoSec

Logging is the answer

LOG ALL THE THINGS

One little problem...

We have millions of devices...

We have thousands of servers...

We need to process them in real-time

“Netflix is a log-generating company that happens to stream movies” - Adrian Cockcroft

Massive amounts of streaming logs

Reactive Extensions

Rx

The best ideas from the Observer pattern, the Iterator pattern, and functional programming

“lodash for events”

High-level intro

[1, 2, 3]
  .map(value => value * 10)
  .forEach(value => {
    console.log(value);
  });
// 10
// 20
// 30

import { Observable } from "rxjs"; // RxJS 5: loads Observable with all operators attached

Observable.of(1, 2, 3)
  .map(value => value * 10)
  .forEach(value => {
    console.log(value);
  });

Observable.of(1, 2, 3)
  .map(value => value * 10)
  .subscribe(value => {
    console.log(value);
  });
// 10
// 20
// 30

Array is a collection

Observable is a collection that arrives over time

Represents a stream
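To make "a collection that arrives over time" concrete, here is a minimal, library-free sketch of the idea behind an Observable. `MiniObservable` is a hypothetical teaching class, not the RxJS implementation. It has no error channel and no unsubscription, and it supports only `of`, `map`, and `filter`.

```javascript
// Hypothetical teaching class: NOT the RxJS Observable, just the core idea.
class MiniObservable {
  // `producer` is called once per subscriber and pushes values into it
  constructor(producer) {
    this.producer = producer;
  }

  // Emit each argument synchronously, then complete
  static of(...values) {
    return new MiniObservable(observer => {
      values.forEach(value => observer.next(value));
      observer.complete();
    });
  }

  // A new observable whose values are transformed by `fn`
  map(fn) {
    return new MiniObservable(observer =>
      this.subscribe({
        next: value => observer.next(fn(value)),
        complete: () => observer.complete(),
      })
    );
  }

  // A new observable that only forwards values passing `predicate`
  filter(predicate) {
    return new MiniObservable(observer =>
      this.subscribe({
        next: value => { if (predicate(value)) observer.next(value); },
        complete: () => observer.complete(),
      })
    );
  }

  // Nothing runs until someone subscribes (observables are lazy)
  subscribe(nextOrObserver) {
    const observer = typeof nextOrObserver === "function"
      ? { next: nextOrObserver, complete: () => {} }
      : { complete: () => {}, ...nextOrObserver };
    this.producer(observer);
  }
}

const received = [];
MiniObservable.of(1, 2, 3)
  .map(value => value * 10)
  .subscribe(value => received.push(value));
console.log(received); // [ 10, 20, 30 ]
```

To get the "over time" behavior, replace `of` with a producer that calls `observer.next` from an event listener or socket callback. RxJS adds the unsubscription, error handling, and scheduling that this sketch leaves out.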

button.addEventListener('click', event => {
  console.log('you clicked!');
});

Observable.fromEvent(button, 'click')
  .subscribe(event => {
    console.log('you clicked!');
  });

Observable.fromEvent(button, 'click')
  .debounceTime(500)
  .subscribe(event => {
    console.log('you clicked!');
  });

[Animated marble diagram: clicks flowing through .debounceTime(500), with the 500 ms windows marked]
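The diagram reduces to one rule: an event is emitted only after 500 ms pass with no newer event. The sketch below replays that rule over a list of hypothetical click timestamps (in ms) instead of using real timers, so its result is deterministic. `debounceTimestamps` is an illustrative helper, not an RxJS API.

```javascript
// Illustrative helper: which events would debounceTime(dueTime) emit, and when?
// `timestamps` must be sorted ascending (ms since subscription).
function debounceTimestamps(timestamps, dueTime) {
  const emissions = [];
  timestamps.forEach((t, i) => {
    const next = timestamps[i + 1];
    // An event survives only if no newer event arrives within dueTime
    if (next === undefined || next - t >= dueTime) {
      emissions.push({ eventAt: t, emittedAt: t + dueTime });
    }
  });
  return emissions;
}

// Two bursts of clicks followed by a single click
const clicks = [0, 100, 200, 1000, 1100, 2000];
console.log(debounceTimestamps(clicks, 500));
// keeps the clicks at 200, 1100 and 2000 ms, each emitted 500 ms later
```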

Stream of logs

Log message in JSON:

{
  "path": "/something",
  "status": 500
}

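import { webSocket } from "rxjs/observable/dom/webSocket";

// Keep a reference to the Observable itself; .subscribe() returns a Subscription
let streamOfLogs = webSocket("ws://logs.netflix.com");

streamOfLogs.subscribe(msg => {
  console.log(msg);
});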

streamOfLogs
  .filter(msg => msg.status !== 200)
  .subscribe(msg => {
    console.log(msg);
  });

[Diagram: messages with statuses 500, 200, 404, and 301 flowing through .filter(msg => msg.status !== 200); the 200 is filtered out]

Learning Rx isn't easy

It's worth it!

Case Study

Scaling high volume logs in real-time

We're going to log JSON

Server:

{
  "path": "/something",
  "status": 200
}

Device:

{
  "event": "StartPlay",
  "device": "XBox360"
}

Stream them all through a single pipeline

How do you find and process the logs you want?

Write a custom job in Java?

Job: a unit of work

Server log message:

{
  "path": "/something",
  "status": 500,
  "duration": 135,
  // etc...
}

streamOfLogs
  .filter(event -> event.get("status") != 200)
  .map(event ->
    new JSONObject()
      .put("path", event.get("path"))
      .put("status", event.get("status"))
  );

Powerful, but... not ideal

Query Language

SELECT path, status WHERE status != 200

Just-in-time (JIT) Compilation

SELECT path, status, duration WHERE status != 200

streamOfLogs
  .filter(event -> event.get("status") != 200)
  .map(event ->
    new JSONObject()
      .put("path", event.get("path"))
      .put("status", event.get("status"))
  );

SELECT path, status, duration WHERE status != 200
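To illustrate turning a query into job code, here is a hypothetical `compileQuery`. It parses only the tiny `SELECT ... WHERE field (== | !=) value` shape used on these slides and returns a plain function that filters and projects a batch of log events. It builds closures rather than generating bytecode. It shows the idea of compiling a query once and reusing it, not how the actual JIT compiler works.

```javascript
// Hypothetical mini query compiler. Supports only:
//   SELECT f1, f2, ... WHERE field (== | !=) value
function compileQuery(query) {
  const match = /^SELECT\s+(.+?)\s+WHERE\s+(\w+)\s*(==|!=)\s*(.+)$/i.exec(query.trim());
  if (!match) throw new Error(`Unsupported query: ${query}`);
  const [, fieldList, whereField, op, rawValue] = match;
  const fields = fieldList.split(",").map(f => f.trim());

  // A quoted value such as "API" is a string; anything else is parsed as a number
  const value = /^".*"$/.test(rawValue) ? rawValue.slice(1, -1) : Number(rawValue);

  // "Compile" once: build the predicate and projection up front
  const predicate = op === "=="
    ? event => event[whereField] === value
    : event => event[whereField] !== value;
  const project = event => Object.fromEntries(fields.map(f => [f, event[f]]));

  // The resulting job can be applied to every incoming batch
  return events => events.filter(predicate).map(project);
}

const errorsOnly = compileQuery("SELECT path, status, duration WHERE status != 200");
console.log(errorsOnly([
  { path: "/something", status: 200, duration: 12 },
  { path: "/something", status: 500, duration: 135 },
]));
// [ { path: '/something', status: 500, duration: 135 } ]
```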

What's next?

8+ million messages per second at peak

[Chart: messages per second over time]

How do we scale this?

Distribute work via autoscaling

[Diagram: Load balancing a job. Source A splits its traffic 50% / 50% between two instances of server B]
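Round-robin assignment is one simple way to get the 50% / 50% split in the diagram. The sketch below is a hypothetical illustration of that policy only. The slides do not say which balancing strategy the platform actually uses.

```javascript
// Hypothetical round-robin balancer: message i goes to worker i % workerCount.
function roundRobin(messages, workerCount) {
  const workers = Array.from({ length: workerCount }, () => []);
  messages.forEach((message, i) => {
    workers[i % workerCount].push(message);
  });
  return workers;
}

const [serverB1, serverB2] = roundRobin([1, 2, 3, 4, 5, 6], 2);
console.log(serverB1, serverB2); // [ 1, 3, 5 ] [ 2, 4, 6 ]
```

Round-robin ignores how busy each worker is. A real autoscaled job may prefer least-loaded routing, and that choice ties directly into the backpressure problem that follows.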

Chain jobs together

SELECT path, status WHERE source == "API"

[Diagram: Chain jobs together. Job A feeds jobs B and C]

High-volume, distributed systems have a problem...

Backpressure

“pressure opposed to the desired flow of gases in confined places such as a pipe” - wikipedia.org

[Diagram: Server A → Server B → Server C, with links labeled 100 rps and 75 rps]

The 25 rps difference piles up:

60 sec × 25 rps = 1,500 requests per minute!

60 sec × 60 min × 25 rps = 90,000 requests per hour!
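Here is the same arithmetic as a small function. It assumes, as the slide's numbers imply, that 100 rps arrive while only 75 rps are processed. `backlogAfter` is an illustrative name.

```javascript
// Requests left unprocessed after `seconds`, if arrivals outpace processing.
function backlogAfter(seconds, arrivalRps, processedRps) {
  const excessRps = Math.max(0, arrivalRps - processedRps); // no backlog if we keep up
  return excessRps * seconds;
}

console.log(backlogAfter(60, 100, 75));      // 1500  (one minute)
console.log(backlogAfter(60 * 60, 100, 75)); // 90000 (one hour)
```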

Buffer or Drop

onBackpressureBuffer

streamOfLogs
  .onBackpressureBuffer()
  .subscribe(value -> {
    // something expensive
    Thread.sleep(100);
  });

[Diagram: values 1 2 3 4 5 arrive at .onBackpressureBuffer(); 1 and 2 have reached the subscriber while [3, 4, 5] wait in the buffer]
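To see why buffering keeps every value but lets memory grow, here is a deterministic simulation. A producer emits a value every 40 ms into a consumer that needs 100 ms per value, mirroring the `Thread.sleep(100)` above. `simulateBuffer` is a hypothetical helper written for illustration. It models the buffering strategy, not RxJava's `onBackpressureBuffer` internals.

```javascript
// Hypothetical simulation of the "buffer" strategy with a slow consumer.
// Value i arrives at i * arrivalInterval ms; each value takes processingTime ms.
function simulateBuffer(values, arrivalInterval, processingTime) {
  const queue = [];      // values waiting in the buffer
  const started = [];    // values in the order the consumer starts them
  let busyUntil = 0;     // when the consumer finishes its current value
  let maxQueue = 0;

  values.forEach((value, i) => {
    const arrival = i * arrivalInterval;
    // Let the consumer pick up buffered values it could start before this arrival
    while (queue.length > 0 && busyUntil <= arrival) {
      started.push(queue.shift());
      busyUntil += processingTime;
    }
    if (busyUntil <= arrival) {
      started.push(value);              // consumer idle: handle immediately
      busyUntil = arrival + processingTime;
    } else {
      queue.push(value);                // consumer busy: buffer it
      maxQueue = Math.max(maxQueue, queue.length);
    }
  });

  // Drain whatever is still buffered after the producer stops
  while (queue.length > 0) {
    started.push(queue.shift());
    busyUntil += processingTime;
  }
  return { started, maxQueue, finishedAt: busyUntil };
}

const buffered = simulateBuffer([1, 2, 3, 4, 5], 40, 100);
console.log(buffered);
// every value is handled; at most 3 values (3, 4, 5) wait in the buffer at once
```

With a producer that never stops, `maxQueue` grows without limit. That cost is what you pay for not losing data.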

onBackpressureDrop

streamOfLogs
  .onBackpressureDrop()
  .subscribe(value -> {
    // something expensive
    Thread.sleep(100);
  });

[Diagram: values 1 2 3 4 5 arrive at .onBackpressureDrop(); only 1 and 4 reach the busy subscriber, and the rest are dropped]
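The drop strategy can be simulated in the same deterministic way. `simulateDrop` is a hypothetical helper. It treats "the consumer is still busy" as "no outstanding demand," which simplifies the request-based mechanism RxJava actually uses. With values arriving every 40 ms and 100 ms of work per value, it reproduces the diagram's outcome.

```javascript
// Hypothetical simulation of the "drop" strategy with a slow consumer.
// Value i arrives at i * arrivalInterval ms; each value takes processingTime ms.
function simulateDrop(values, arrivalInterval, processingTime) {
  const handled = [];
  const dropped = [];
  let busyUntil = 0; // when the consumer finishes its current value

  values.forEach((value, i) => {
    const arrival = i * arrivalInterval;
    if (arrival >= busyUntil) {
      handled.push(value);               // consumer free: take it
      busyUntil = arrival + processingTime;
    } else {
      dropped.push(value);               // consumer busy: discard, never buffer
    }
  });
  return { handled, dropped };
}

const dropResult = simulateDrop([1, 2, 3, 4, 5], 40, 100);
console.log(dropResult); // { handled: [ 1, 4 ], dropped: [ 2, 3, 5 ] }
```

Memory stays flat no matter how fast the producer is, at the cost of losing values. This is the trade-off behind "Job authors choose which".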

Job authors choose which strategy to use

Mantis: a low-latency, high-throughput stream-processing job platform

700+ jobs running 24/7

8+ million events/second

Autoscaling

http://techblog.netflix.com

We need somewhere to submit and view query results

Query Builder UI

SELECT path, status WHERE status != 200

Can be extremely high-volume

100k+ rps

We simply can’t render that fast!

Even if we could, it would be bad UX

Performance solutions are often driven by UX

So what do we do?

UI virtualization, a.k.a. virtual table?

[Diagram: Not virtualized vs. virtualized]
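The core of a virtual table is computing which rows are actually on screen and rendering only those. Here is a minimal sketch, assuming a fixed row height. `visibleRows` and its `overscan` option are hypothetical names, not the API of any particular virtualization library.

```javascript
// Hypothetical helper: which rows of a fixed-row-height table are on screen?
// Returns a half-open range [start, end) of row indexes to render.
function visibleRows({ scrollTop, viewportHeight, rowHeight, totalRows, overscan = 0 }) {
  const first = Math.floor(scrollTop / rowHeight);
  const last = Math.ceil((scrollTop + viewportHeight) / rowHeight);
  return {
    start: Math.max(0, first - overscan),      // a few extra rows above...
    end: Math.min(totalRows, last + overscan), // ...and below, to reduce flicker while scrolling
  };
}

// 100,000 results, but only about 20 rows are actually rendered
const range = visibleRows({ scrollTop: 1000, viewportHeight: 400, rowHeight: 20, totalRows: 100000 });
console.log(range); // { start: 50, end: 70 }
```

Virtualization bounds the rendering cost per frame, but every new row still has to pass through the data model and trigger a re-render. That is why it does not solve the update rate on its own.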

We still can’t update that virtual table 100k times per second

This is also backpressure

Buffer or Drop

Buffer for 1 second

getWebSocket()
  .bufferTime(1000)

Buffer size is unbounded
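Here is a library-free sketch of what `bufferTime(1000)` produces, replayed over hypothetical timestamped log events (sorted by `time`, in ms since subscription). It also shows why the buffer size is unbounded: nothing caps how many events land in a single window. `bufferByTime` is an illustrative helper, not an RxJS API.

```javascript
// Illustrative helper: group timestamped events into fixed time windows,
// similar to what bufferTime(windowMs) emits for a live stream.
// Assumes `events` is sorted by `time`.
function bufferByTime(events, windowMs) {
  if (events.length === 0) return [];
  const lastWindow = Math.floor(events[events.length - 1].time / windowMs);
  // One array per window, including empty ones (bufferTime emits [] for quiet windows too)
  const windows = Array.from({ length: lastWindow + 1 }, () => []);
  for (const { time, value } of events) {
    windows[Math.floor(time / windowMs)].push(value);
  }
  return windows;
}

const logs = [
  { time: 0, value: "a" },
  { time: 300, value: "b" },
  { time: 2900, value: "c" },
];
console.log(bufferByTime(logs, 1000)); // [ [ 'a', 'b' ], [], [ 'c' ] ]
```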

See what your users actually do

Users want a sample

Batch sampling

Drop after reaching a certain threshold

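import { Observable, BehaviorSubject } from "rxjs"; // RxJS 5

// getWebSocket() is the helper from the slides (not shown) returning the log stream
let buffer = getWebSocket()
  .bufferTime(1000);

let gate = new BehaviorSubject(true);
let batchSize = 50;
let batchSizeCounter = 0;

let results = gate
  // gate open: pass the 1-second batches through; gate closed: emit nothing
  .switchMap(enabled => enabled ? buffer : Observable.never())
  .do(batch => {
    batchSizeCounter += batch.length;

    if (batchSizeCounter >= batchSize) {
      // truncate the array so exactly batchSize items are emitted in total
      batch.length -= batchSizeCounter - batchSize;
      // close the gate, pausing the stream (gate.next(true) resumes it)
      gate.next(false);
      batchSizeCounter = 0;
    }
  });

https://goo.gl/DMOqBA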
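The gate-and-counter logic can be tested without RxJS or a socket. `createBatchSampler` below is a hypothetical, synchronous stand-in. `push` plays the role of one `bufferTime(1000)` emission, the `open` flag plays the role of the `BehaviorSubject` gate, and `resume()` corresponds to `gate.next(true)`.

```javascript
// Hypothetical, synchronous model of the batch-sampling gate.
function createBatchSampler(batchSize, onBatch) {
  let open = true;  // stands in for the BehaviorSubject gate
  let count = 0;    // stands in for batchSizeCounter
  return {
    // Called once per buffered window (e.g. every 1000 ms)
    push(batch) {
      if (!open) return;                 // gate closed: the window is ignored
      count += batch.length;
      if (count >= batchSize) {
        // keep only enough items to reach exactly batchSize in total
        batch = batch.slice(0, batch.length - (count - batchSize));
        open = false;                    // close the gate, pausing the stream
        count = 0;
      }
      onBatch(batch);
    },
    resume() { open = true; },           // like gate.next(true)
    get isOpen() { return open; },
  };
}

const lengths = [];
const sampler = createBatchSampler(50, batch => lengths.push(batch.length));
const items = n => Array.from({ length: n }, (_, i) => i);
sampler.push(items(30)); // 30 forwarded (30 of 50 so far)
sampler.push(items(30)); // truncated to 20, then the gate closes
sampler.push(items(30)); // ignored: gate is closed
sampler.resume();
sampler.push(items(10)); // 10 forwarded after resuming
console.log(lengths); // [ 30, 20, 10 ]
```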

Works for low-volume queries too

Raven

Raven: Sweet Christmas

improved debugging, testing, and InfoSec

Netflix loves Rx

Rx is powerful and cross-platform

Thanks!

@_jayphelps
