DISRUPTOR Larry Nung

Upload: larry-nung

Post on 20-Jan-2017

TRANSCRIPT

Page 1: Disruptor

DISRUPTOR
Larry Nung

Page 2: Disruptor

AGENDA
Introduction
Related Technologies
Disruptor
LMAX Architecture
Reference
Q & A

Page 3: Disruptor

INTRODUCTION

Page 4: Disruptor

INTRODUCTION
High Performance Inter-Thread Messaging Library
LMAX built its system on the JVM platform, centered on a Business Logic Processor that can handle 6 million orders per second on a single thread.
Very low latency: 100K TPS with 1 ms latency.
Multicasts events to consumers, with a consumer dependency graph.
Pre-allocates memory for events.
Optionally lock-free.

Page 5: Disruptor

RELATED TECHNOLOGIES

Page 6: Disruptor

CAS
Compare-and-swap
An atomic instruction used in multithreading to achieve synchronization.
Diagram: memory value 56; Core1 attempts 56 => 57; Core2 attempts 55 => 58
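One plausible reading of the diagram: the memory cell holds 56, Core1 expects 56 and swaps in 57, while Core2 expects a stale 55 and so its swap to 58 is rejected. The following is a minimal sketch of that behavior in C11, assuming `<stdatomic.h>` is available. The function names `cas` and `increment` are illustrative and are not part of the Disruptor.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Try to replace `expected` with `desired` in *cell.
 * Returns true only if *cell still held `expected` at the moment of the swap. */
bool cas(atomic_long *cell, long expected, long desired)
{
    return atomic_compare_exchange_strong(cell, &expected, desired);
}

/* Lock-free increment built on CAS: retry until no other thread interfered. */
long increment(atomic_long *cell)
{
    long seen = atomic_load(cell);
    while (!atomic_compare_exchange_weak(cell, &seen, seen + 1)) {
        /* On failure `seen` has been refreshed with the current value; retry. */
    }
    return seen + 1;
}
```

The retry loop in `increment` is the usual pattern for lock-free counters: no thread ever blocks, but a thread that loses the race has to try again.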

Page 7: Disruptor

CAS

Page 8: Disruptor

MEMORY BARRIERS
A type of barrier instruction that causes a central processing unit (CPU) or compiler to enforce an ordering constraint on memory operations issued before and after the barrier instruction.
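As an illustration of such an ordering constraint, here is a small C11 sketch that publishes a plain value through a flag using explicit fences. The names `payload`, `ready`, `publish`, and `try_consume` are hypothetical, and this is not how the Disruptor itself is written; it only shows the kind of guarantee a barrier provides.

```c
#include <stdatomic.h>
#include <stdbool.h>

static int payload;          /* plain, non-atomic data */
static atomic_bool ready;    /* flag that announces the payload */

/* Writer: the release fence keeps the payload write from being
 * reordered after the flag store that follows it. */
void publish(int value)
{
    payload = value;
    atomic_thread_fence(memory_order_release);
    atomic_store_explicit(&ready, true, memory_order_relaxed);
}

/* Reader: the acquire fence keeps the payload read from being
 * reordered before the flag load that precedes it. */
bool try_consume(int *out)
{
    if (!atomic_load_explicit(&ready, memory_order_relaxed))
        return false;
    atomic_thread_fence(memory_order_acquire);
    *out = payload;
    return true;
}
```

A reader that observes `ready == true` is therefore guaranteed to see the payload written before the flag was set.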

Page 9: Disruptor

CACHE LINE
Unit of transfer between memory and CPU.
Cache line size is (typically) 64 bytes.

Page 10: Disruptor

FALSE SHARING
When a system participant attempts to periodically access data that will never be altered by another party, but that data shares a cache block with data that is altered, the caching protocol may force the first participant to reload the whole unit despite a lack of logical necessity.

Page 11: Disruptor

FALSE SHARING

struct foo {
    int x;
    int y;
};

static struct foo f;

/* The two following functions are running concurrently: */
int sum_a()
{
    int s = 0;
    int i;
    for (i = 0; i < 1000000; ++i)
        s += f.x;
    return s;
}

void inc_b()
{
    int i;
    for (i = 0; i < 1000000; ++i)
        ++f.y;
}

Page 12: Disruptor

CACHE LINE PADDING
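A minimal sketch of the idea applied to the `struct foo` from the false-sharing slide, assuming a 64-byte cache line and a C11 compiler. The name `padded_foo` and the `CACHE_LINE_SIZE` constant are illustrative. Each field is aligned to its own cache line, so writes to `y` no longer invalidate the line that holds `x`.

```c
#include <stdalign.h>
#include <stddef.h>

#define CACHE_LINE_SIZE 64   /* assumption: the typical size quoted earlier */

/* Each member starts on its own cache line; the compiler inserts the padding. */
struct padded_foo {
    alignas(CACHE_LINE_SIZE) int x;   /* read by sum_a */
    alignas(CACHE_LINE_SIZE) int y;   /* written by inc_b */
};
```

The cost is memory: the struct grows from 8 bytes to two full cache lines, which is usually an acceptable trade for a handful of hot, contended fields.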

Page 13: Disruptor

DISRUPTOR

Page 14: Disruptor

DISRUPTOR ARCHITECTURE
Ring buffer size = 2 ^ n
Index = sequence & (bufferSize - 1)
No comparisons
No branches
Safe
Diagram: ring buffer with Producer and Consumer
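The index formula works because, for a power-of-two size, `bufferSize - 1` is a mask of all the low bits, so the bitwise AND gives the same result as `sequence % bufferSize` without a division or a wrap-around branch. A short sketch in C, with the illustrative helper names `is_power_of_two` and `slot_index`:

```c
#include <stdbool.h>
#include <stdint.h>

/* True when n has exactly one bit set, i.e. n = 2^k. */
bool is_power_of_two(uint64_t n)
{
    return n != 0 && (n & (n - 1)) == 0;
}

/* Map an ever-increasing sequence number onto a ring buffer slot.
 * Only valid when buffer_size is a power of two. */
uint64_t slot_index(uint64_t sequence, uint64_t buffer_size)
{
    return sequence & (buffer_size - 1);
}
```

With a buffer size of 16, sequences 0, 16, and 32 all map to slot 0, and sequence 17 maps to slot 1.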

Page 15: Disruptor

CONSUMER DEPENDENCY GRAPH
Unicast: 1P - 1C
Diagram: P1 -> C1
Three Step Pipeline: 1P – 3C
Diagram: P1 -> C1 -> C2 -> C3

Page 16: Disruptor

CONSUMER DEPENDENCY GRAPH
Sequencer: 3P – 1C
Diagram: P1, P2, P3 -> C1
Multicast: 1P – 3C
Diagram: P1 -> C1, C2, C3

Page 17: Disruptor

CONSUMER DEPENDENCY GRAPH
Diamond: 1P – 3C
Diagram: P1 -> C1, C2 -> C3

Page 18: Disruptor

CONSUMER DEPENDENCY GRAPH

Page 19: Disruptor

WAIT STRATEGY
BlockingWaitStrategy
BusySpinWaitStrategy
SleepingWaitStrategy
TimeoutBlockingWaitStrategy
PhasedBackoffWaitStrategy
YieldingWaitStrategy
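The strategies differ in how a consumer waits for the next sequence to be published, trading latency against CPU usage. The sketch below illustrates the general idea behind busy-spinning and yielding in C. It assumes a POSIX system for `sched_yield`, the function names are hypothetical, and it is not the library's actual implementation of `BusySpinWaitStrategy` or `YieldingWaitStrategy`.

```c
#include <sched.h>        /* sched_yield (POSIX) */
#include <stdatomic.h>
#include <stdint.h>

/* Busy spin: poll the cursor in a tight loop until `target` is published.
 * Lowest latency, but occupies a core while waiting.
 * Returns the highest sequence seen as available. */
int64_t wait_busy_spin(atomic_int_fast64_t *cursor, int64_t target)
{
    int64_t available;
    while ((available = atomic_load_explicit(cursor, memory_order_acquire)) < target) {
        /* keep spinning */
    }
    return available;
}

/* Yielding: spin for a limited number of polls, then give up the CPU
 * between polls so other threads can run. */
int64_t wait_yielding(atomic_int_fast64_t *cursor, int64_t target, int spin_tries)
{
    int64_t available;
    int remaining = spin_tries;
    while ((available = atomic_load_explicit(cursor, memory_order_acquire)) < target) {
        if (remaining > 0)
            --remaining;
        else
            sched_yield();
    }
    return available;
}
```

Both functions return the highest available sequence rather than just `target`, which lets a consumer process everything published so far as one batch.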

Page 20: Disruptor

SETUP DISRUPTOR

Page 21: Disruptor

CUSTOM EVENTHANDLER
...
public class Data
{
    public string Value { get; set; }
}

public class DataEventHandler : IEventHandler<Data>
{
    public string Name { get; private set; }

    public DataEventHandler(string name)
    {
        this.Name = name;
    }

    public void OnEvent(Data data, long sequence, bool endOfBatch)
    {
        Console.WriteLine("Thread = {0}, Handler = {1}, Sequence = {2}, Value = {3}",
            Thread.CurrentThread.ManagedThreadId.ToString(), this.Name, sequence, data.Value);
    }
}
...

Page 22: Disruptor

DSL
Generate disruptor instance with factory method and ring buffer size
Set up consumer dependency graph with Disruptor.HandleEventsWith
Start disruptor with Disruptor.Start
Get ring buffer
Get next ring buffer sequence
Set data
Publish data

Page 23: Disruptor

DSL
...
var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(),
    (int)Math.Pow(2, 4), TaskScheduler.Default);

disruptor.HandleEventsWith(new DataEventHandler("Handler1"));

var ringBuffer = disruptor.Start();

var sequenceNo = ringBuffer.Next();
var data = ringBuffer[sequenceNo];
data.Value = "Hello";
ringBuffer.Publish(sequenceNo);

sequenceNo = ringBuffer.Next();
data = ringBuffer[sequenceNo];
data.Value = "World";
ringBuffer.Publish(sequenceNo);

disruptor.Shutdown();
...

Page 24: Disruptor

NON-DSL
Generate ring buffer with factory method and ring buffer size
Set up consumer dependency graph with EventProcessor and Barrier
Start EventProcessor in another thread
Get next ring buffer sequence
Set data
Publish data

Page 25: Disruptor

NON-DSL
...
var ringBuffer = RingBuffer<Data>.CreateSingleProducer(() => new Data(), (int)Math.Pow(2, 4));
var barrier = ringBuffer.NewBarrier();
var eventProcessor = new BatchEventProcessor<Data>(ringBuffer, barrier,
    new DataEventHandler("Handler1"));

Task.Factory.StartNew(() => eventProcessor.Run());

var sequenceNo = ringBuffer.Next();
var data = ringBuffer[sequenceNo];
data.Value = "Hello";
ringBuffer.Publish(sequenceNo);

sequenceNo = ringBuffer.Next();
data = ringBuffer[sequenceNo];
data.Value = "World";
ringBuffer.Publish(sequenceNo);

eventProcessor.Halt();

Application.DoEvents();
...
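To make the claim, set, and publish cycle more concrete, here is a much simplified single-producer, single-consumer ring in C. It is only a sketch under stated assumptions: the type and function names (`struct ring`, `ring_try_next`, `ring_publish`, `ring_try_consume`) are hypothetical, the event carries an `int` instead of a string, and the real library adds batching, wait strategies, and multi-consumer gating on top of this.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define RING_SIZE 16u                  /* must be a power of two */
#define RING_MASK (RING_SIZE - 1u)

struct event { int value; };

struct ring {
    struct event slots[RING_SIZE];     /* events are pre-allocated once */
    atomic_int_fast64_t cursor;        /* last published sequence, -1 = none */
    atomic_int_fast64_t consumed;      /* last sequence the consumer finished */
    int64_t next;                      /* producer-private: last claimed sequence */
};

void ring_init(struct ring *r)
{
    atomic_init(&r->cursor, -1);
    atomic_init(&r->consumed, -1);
    r->next = -1;
}

/* Claim the next sequence, or return -1 if that would overwrite an
 * event the consumer has not processed yet (ring is full). */
int64_t ring_try_next(struct ring *r)
{
    int64_t candidate = r->next + 1;
    int64_t consumed = atomic_load_explicit(&r->consumed, memory_order_acquire);
    if (candidate - consumed > (int64_t)RING_SIZE)
        return -1;
    r->next = candidate;
    return candidate;
}

/* Slot lookup uses the power-of-two mask instead of a modulo. */
struct event *ring_get(struct ring *r, int64_t sequence)
{
    return &r->slots[(uint64_t)sequence & RING_MASK];
}

/* Make the event at `sequence` visible to the consumer. */
void ring_publish(struct ring *r, int64_t sequence)
{
    atomic_store_explicit(&r->cursor, sequence, memory_order_release);
}

/* Consumer side: copy out the next event if one has been published. */
bool ring_try_consume(struct ring *r, int *out)
{
    int64_t next = atomic_load_explicit(&r->consumed, memory_order_relaxed) + 1;
    if (next > atomic_load_explicit(&r->cursor, memory_order_acquire))
        return false;
    *out = ring_get(r, next)->value;
    atomic_store_explicit(&r->consumed, next, memory_order_release);
    return true;
}
```

The producer mutates the slot in place between `ring_try_next` and `ring_publish`, which mirrors how the C# code sets `data.Value` on an existing `Data` object rather than allocating a new one per message.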

Page 26: Disruptor

UNICAST: 1P - 1C
...
using Disruptor;

namespace ConsoleApplication29
{
    …
    class Program
    {
        static void Main(string[] args)
        {
            var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(),
                (int)Math.Pow(2, 4), TaskScheduler.Default);
            disruptor.HandleEventsWith(new DataEventHandler("Handler1"));
            var ringBuffer = disruptor.Start();
            var idx = 0;
            while (true)
            {
                var sequenceNo = ringBuffer.Next();
                var data = ringBuffer[sequenceNo];
                data.Value = idx++.ToString();
                ringBuffer.Publish(sequenceNo);
                Thread.Sleep(250);
            }
            disruptor.Shutdown();
        }
    }
}
Diagram: P1 -> C1

Page 27: Disruptor

UNICAST: 1P - 1C
...
var ringBuffer = RingBuffer<Data>.CreateSingleProducer(() => new Data(), (int)Math.Pow(2, 4));
var barrier = ringBuffer.NewBarrier();
var eventProcessor = new BatchEventProcessor<Data>(ringBuffer, barrier,
    new DataEventHandler("Handler1"));

Task.Factory.StartNew(() => eventProcessor.Run());
...
eventProcessor.Halt();
...
Diagram: P1 -> Barrier -> C1

Page 28: Disruptor

UNICAST: 1P - 1C

Page 29: Disruptor

THREE STEP PIPELINE: 1P – 3C
...
var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(),
    (int)Math.Pow(2, 4), TaskScheduler.Default);

disruptor.HandleEventsWith(new DataEventHandler("Handler1"))
    .Then(new DataEventHandler("Handler2"))
    .Then(new DataEventHandler("Handler3"));

var ringBuffer = disruptor.Start();
...
disruptor.Shutdown();
…
Diagram: P1 -> C1 -> C2 -> C3

Page 30: Disruptor

THREE STEP PIPELINE: 1P – 3C
...
var ringBuffer = RingBuffer<Data>.CreateSingleProducer(() => new Data(), (int)Math.Pow(2, 4));
var eventProcessor1 = new BatchEventProcessor<Data>(ringBuffer,
    ringBuffer.NewBarrier(), new DataEventHandler("Handler1"));
var eventProcessor2 = new BatchEventProcessor<Data>(ringBuffer,
    ringBuffer.NewBarrier(eventProcessor1.Sequence), new DataEventHandler("Handler2"));
var eventProcessor3 = new BatchEventProcessor<Data>(ringBuffer,
    ringBuffer.NewBarrier(eventProcessor2.Sequence), new DataEventHandler("Handler3"));

Task.Factory.StartNew(() => eventProcessor1.Run());
Task.Factory.StartNew(() => eventProcessor2.Run());
Task.Factory.StartNew(() => eventProcessor3.Run());
...
eventProcessor1.Halt();
eventProcessor2.Halt();
eventProcessor3.Halt();
...
Diagram: P1 -> Barrier -> C1 -> Barrier -> C2 -> Barrier -> C3

Page 31: Disruptor

THREE STEP PIPELINE: 1P – 3C

Page 32: Disruptor

MULTICAST: 1P – 3C
...
var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(),
    (int)Math.Pow(2, 4), TaskScheduler.Default);

disruptor.HandleEventsWith(new DataEventHandler("Handler1"),
    new DataEventHandler("Handler2"),
    new DataEventHandler("Handler3"));

var ringBuffer = disruptor.Start();
...
disruptor.Shutdown();
…
Diagram: P1 -> C1, C2, C3

Page 33: Disruptor

MULTICAST: 1P – 3C
...
var ringBuffer = RingBuffer<Data>.CreateSingleProducer(() => new Data(), (int)Math.Pow(2, 4));
var barrier = ringBuffer.NewBarrier();
var eventProcessor1 = new BatchEventProcessor<Data>(ringBuffer, barrier,
    new DataEventHandler("Handler1"));
var eventProcessor2 = new BatchEventProcessor<Data>(ringBuffer, barrier,
    new DataEventHandler("Handler2"));
var eventProcessor3 = new BatchEventProcessor<Data>(ringBuffer, barrier,
    new DataEventHandler("Handler3"));

Task.Factory.StartNew(() => eventProcessor1.Run());
Task.Factory.StartNew(() => eventProcessor2.Run());
Task.Factory.StartNew(() => eventProcessor3.Run());
...
eventProcessor1.Halt();
eventProcessor2.Halt();
eventProcessor3.Halt();
...
Diagram: P1 -> Barrier -> C1, C2, C3

Page 34: Disruptor

MULTICAST: 1P – 3C

Page 35: Disruptor

DIAMOND: 1P – 3C
...
var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(),
    (int)Math.Pow(2, 4), TaskScheduler.Default);

disruptor.HandleEventsWith(new DataEventHandler("Handler1"),
        new DataEventHandler("Handler2"))
    .Then(new DataEventHandler("Handler3"));

var ringBuffer = disruptor.Start();
...
disruptor.Shutdown();
…
Diagram: P1 -> C1, C2 -> C3

Page 36: Disruptor

DIAMOND: 1P – 3C
...
var ringBuffer = RingBuffer<Data>.CreateSingleProducer(() => new Data(), (int)Math.Pow(2, 4));
var barrier = ringBuffer.NewBarrier();
var eventProcessor1 = new BatchEventProcessor<Data>(ringBuffer, barrier,
    new DataEventHandler("Handler1"));
var eventProcessor2 = new BatchEventProcessor<Data>(ringBuffer, barrier,
    new DataEventHandler("Handler2"));
var eventProcessor3 = new BatchEventProcessor<Data>(ringBuffer,
    ringBuffer.NewBarrier(eventProcessor1.Sequence, eventProcessor2.Sequence),
    new DataEventHandler("Handler3"));

Task.Factory.StartNew(() => eventProcessor1.Run());
Task.Factory.StartNew(() => eventProcessor2.Run());
Task.Factory.StartNew(() => eventProcessor3.Run());
...
eventProcessor1.Halt();
eventProcessor2.Halt();
eventProcessor3.Halt();
...
Diagram: P1 -> Barrier -> C1, C2 -> Barrier -> C3
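In the diamond, the third handler's barrier is created from the sequences of the first two, so it may only advance as far as the slower of them. The sketch below shows that gating rule in C, assuming a barrier can be modeled as the minimum of the upstream sequences capped by the producer's cursor. The function name `min_sequence` and its parameters are illustrative and not part of the library's API.

```c
#include <stddef.h>
#include <stdint.h>

/* Highest sequence a dependent consumer may safely process:
 * the smallest of the upstream consumers' sequences, never beyond
 * what the producer has published (`cursor`). */
int64_t min_sequence(const int64_t *upstream, size_t count, int64_t cursor)
{
    int64_t min = cursor;
    for (size_t i = 0; i < count; ++i) {
        if (upstream[i] < min)
            min = upstream[i];
    }
    return min;
}
```

If Handler1 has finished sequence 7 and Handler2 has finished sequence 4, Handler3 may process up to sequence 4. With no upstream consumers, as for the first barrier in each example, the limit is simply the cursor.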

Page 37: Disruptor

DIAMOND: 1P – 3C

Page 38: Disruptor

LMAX ARCHITECTURE

Page 39: Disruptor

LMAX ARCHITECTURE

Page 40: Disruptor

LMAX ARCHITECTURE

Page 41: Disruptor

LMAX ARCHITECTURE
Journaler: Store all the events in a durable form
Replicator: Use IP multicasting to sync with slave node

Page 42: Disruptor

REFERENCE

Page 43: Disruptor

REFERENCE
The LMAX Architecture
http://martinfowler.com/articles/lmax.html

Compare-and-swap - Wikipedia, the free encyclopedia
https://en.wikipedia.org/wiki/Compare-and-swap

Memory barrier - Wikipedia, the free encyclopedia
https://en.wikipedia.org/wiki/Memory_barrier

Page 44: Disruptor

REFERENCE
Circular buffer - Wikipedia, the free encyclopedia
https://en.wikipedia.org/wiki/Circular_buffer

Disruptor concurrency framework (translation) | 并发编程网 - ifeve.com
http://ifeve.com/disruptor/

[C#][VB.NET].NET 4.0 Barrier Class | Level Up - 點部落
https://dotblogs.com.tw/larrynung/archive/2009/08/22/10182.aspx?fid=945744

Page 45: Disruptor

Q&A

Page 46: Disruptor

QUESTION & ANSWER
