disruptor
DISRUPTOR
Larry Nung
AGENDA
Introduction
Related Technologies
Disruptor
LMAX Architecture
Reference
Q & A
INTRODUCTION
A high-performance inter-thread messaging library.
LMAX built its system on the JVM platform, centered on a Business Logic Processor that can handle 6 million orders per second on a single thread with very low latency: 100K TPS with 1 ms latency.
Multicasts events to consumers, with a consumer dependency graph.
Pre-allocates memory for events.
Optionally lock-free.
RELATED TECHNOLOGIES
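CAS
Compare-and-swap: an atomic instruction used in multithreading to achieve synchronization.
Example: the memory location holds 56. Core1 issues CAS 56 => 57 and Core2 issues CAS 55 => 58. Core1's expected value matches, so its swap succeeds. Core2's expected value (55) does not match, so its swap fails.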
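The CAS slide can be made concrete with C11 atomics. The following is only a minimal sketch: the helper names cas_long and increment are made up for illustration and are not part of any Disruptor API. It assumes a compiler with <stdatomic.h> support.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical helper: replace *cell with `desired` only if it still holds
   `expected`. Returns true on success; on failure the cell is unchanged. */
static bool cas_long(atomic_long *cell, long expected, long desired)
{
    assert(cell != NULL);
    return atomic_compare_exchange_strong(cell, &expected, desired);
}

/* Hypothetical helper: lock-free increment built from a CAS retry loop.
   Returns the value this call stored. */
static long increment(atomic_long *cell)
{
    assert(cell != NULL);
    long seen = atomic_load(cell);
    /* On failure `seen` is refreshed with the current value, so just retry. */
    while (!atomic_compare_exchange_weak(cell, &seen, seen + 1)) {
    }
    return seen + 1;
}
```

With the slide's numbers, a cell holding 56 accepts cas_long(&cell, 56, 57) and rejects cas_long(&cell, 55, 58). The retry loop in increment is the usual way to build a lock-free counter on top of CAS.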
MEMORY BARRIERS
A type of barrier instruction that causes a central processing unit (CPU) or compiler to enforce an ordering constraint on memory operations issued before and after the barrier instruction.
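As a rough illustration of such an ordering constraint, the sketch below uses C11 release and acquire operations to publish a plain value through a flag. The names payload, ready, publish and try_consume are hypothetical. This shows the general technique only and is not taken from the Disruptor source.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

static int payload;        /* plain, non-atomic data */
static atomic_bool ready;  /* publication flag; zero-initialised to false */

/* Writer side: store the data, then set the flag with release ordering. */
static void publish(int value)
{
    payload = value;
    /* Release: the write to payload may not be reordered after this store. */
    atomic_store_explicit(&ready, true, memory_order_release);
}

/* Reader side: returns true and fills *out once a value has been published. */
static bool try_consume(int *out)
{
    assert(out != NULL);
    /* Acquire: the read of payload may not be reordered before this load. */
    if (!atomic_load_explicit(&ready, memory_order_acquire))
        return false;
    *out = payload;
    return true;
}
```

A reader on another thread that sees ready as true is then guaranteed to see the payload written before it. Without the barrier pair, the compiler or CPU would be free to reorder the two accesses.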
CACHE LINE
Unit of transfer between memory and the CPU cache.
Cache line size is (typically) 64 bytes.
FALSE SHARING
When a system participant attempts to periodically access data that will never be altered by another party, but that data shares a cache block with data that is altered, the caching protocol may force the first participant to reload the whole unit despite a lack of logical necessity.
FALSE SHARING
struct foo {
    int x;
    int y;
};

static struct foo f;

/* The two following functions are running concurrently: */

int sum_a() {
    int s = 0;
    int i;
    for (i = 0; i < 1000000; ++i)
        s += f.x;
    return s;
}

void inc_b() {
    int i;
    for (i = 0; i < 1000000; ++i)
        ++f.y;
}
CACHE LINE PADDING
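One possible way to apply padding to the struct foo example above is to give each field its own cache line. The sketch below assumes a 64-byte line (typical, not universal). The names CACHE_LINE_SIZE and padded_foo are made up for illustration.

```c
#include <assert.h>
#include <stdalign.h>
#include <stddef.h>

#define CACHE_LINE_SIZE 64  /* assumption: typical size, check the target CPU */

/* Each field is aligned to its own cache line, so a writer of y no longer
   invalidates the line that a reader of x keeps in its cache. */
struct padded_foo {
    alignas(CACHE_LINE_SIZE) int x;  /* read by sum_a() */
    alignas(CACHE_LINE_SIZE) int y;  /* written by inc_b() */
};

/* Compile-time check that the two fields cannot share a line. */
static_assert(offsetof(struct padded_foo, y) - offsetof(struct padded_foo, x)
                  >= CACHE_LINE_SIZE,
              "x and y must live on different cache lines");
```

The cost is memory: the struct grows from two ints to two full cache lines. That trade is usually only worthwhile for fields that are heavily contended across cores.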
DISRUPTOR
DISRUPTOR ARCHITECTURE
Ring buffer size = 2 ^ n
Index = sequence & (bufferSize - 1)
No comparisons, no branches, safe.
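The index formula can be sketched in a few lines of C. The function names slot_index and is_power_of_two are hypothetical. The mask only equals the modulo when the buffer size is a power of two, which is why the slide requires size = 2 ^ n.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* True when n has exactly one bit set, i.e. n = 2^k for some k >= 0. */
static bool is_power_of_two(uint64_t n)
{
    return n != 0 && (n & (n - 1)) == 0;
}

/* Map an ever-increasing sequence number onto a ring buffer slot.
   For power-of-two sizes this equals sequence % buffer_size, but needs
   neither a division nor a wrap-around comparison. */
static uint64_t slot_index(uint64_t sequence, uint64_t buffer_size)
{
    assert(is_power_of_two(buffer_size));
    return sequence & (buffer_size - 1);
}
```

For a buffer of 16 slots, sequence 16 wraps back to slot 0 and sequence 35 lands in slot 3.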
[Diagram: ring buffer with a Producer and a Consumer]
CONSUMER DEPENDENCY GRAPH
Unicast: 1P - 1C (P1 -> C1)
Three Step Pipeline: 1P – 3C (P1 -> C1 -> C2 -> C3)
Sequencer: 3P – 1C (P1, P2, P3 -> C1)
Multicast: 1P – 3C (P1 -> C1, C2, C3)
Diamond: 1P – 3C (P1 -> C1, C2 -> C3)
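A dependency graph boils down to one rule: a consumer may only process sequences that every consumer it depends on has already finished. The sketch below illustrates that rule under simplifying assumptions. It works on a plain snapshot of upstream sequence numbers, whereas a real implementation reads them atomically. The name min_sequence is hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Highest sequence a dependent consumer may safely process: the minimum of
   the sequences its upstream consumers have completed. */
static int64_t min_sequence(const int64_t *upstream, size_t count)
{
    assert(upstream != NULL && count > 0);
    int64_t lowest = upstream[0];
    for (size_t i = 1; i < count; ++i) {
        if (upstream[i] < lowest)
            lowest = upstream[i];
    }
    return lowest;
}
```

In the diamond layout, if C1 has finished sequence 7 and C2 has finished sequence 4, C3 may process up to sequence 4. In a pipeline each stage has a single upstream, so the rule reduces to following the previous stage.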
WAIT STRATEGY
BlockingWaitStrategy
BusySpinWaitStrategy
SleepingWaitStrategy
TimeoutBlockingWaitStrategy
PhasedBackoffWaitStrategy
YieldingWaitStrategy
SETUP DISRUPTOR
CUSTOM EVENTHANDLER
...
public class Data
{
    public string Value { get; set; }
}

public class DataEventHandler : IEventHandler<Data>
{
    public string Name { get; private set; }

    public DataEventHandler(string name)
    {
        this.Name = name;
    }

    public void OnEvent(Data data, long sequence, bool endOfBatch)
    {
        Console.WriteLine("Thread = {0}, Handler = {1}, Sequence = {2}, Value = {3}",
            Thread.CurrentThread.ManagedThreadId.ToString(), this.Name, sequence, data.Value);
    }
}
...
DSL
Generate disruptor instance with factory method and ring buffer size
Set up consumer dependency graph with Disruptor.HandleEventsWith
Start disruptor with Disruptor.Start
Get ring buffer
Get next ring buffer sequence
Set data
Publish data
DSL
...
var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(), (int)Math.Pow(2,4), TaskScheduler.Default);

disruptor.HandleEventsWith(new DataEventHandler("Handler1"));

var ringBuffer = disruptor.Start();

var sequenceNo = ringBuffer.Next();
var data = ringBuffer[sequenceNo];
data.Value = "Hello";
ringBuffer.Publish(sequenceNo);

sequenceNo = ringBuffer.Next();
data = ringBuffer[sequenceNo];
data.Value = "World";
ringBuffer.Publish(sequenceNo);

disruptor.Shutdown();
...
NON-DSL
Generate ring buffer with factory method and ring buffer size
Set up consumer dependency graph with EventProcessor and Barrier
Start EventProcessor in another thread
Get next ring buffer sequence
Set data
Publish data
NON-DSL
...
var ringBuffer = RingBuffer<Data>.CreateSingleProducer(() => new Data(), (int)Math.Pow(2, 4));
var barrier = ringBuffer.NewBarrier();
var eventProcessor = new BatchEventProcessor<Data>(ringBuffer, barrier, new DataEventHandler("Handler1"));

Task.Factory.StartNew(() => eventProcessor.Run());

var sequenceNo = ringBuffer.Next();
var data = ringBuffer[sequenceNo];
data.Value = "Hello";
ringBuffer.Publish(sequenceNo);

sequenceNo = ringBuffer.Next();
data = ringBuffer[sequenceNo];
data.Value = "World";
ringBuffer.Publish(sequenceNo);

eventProcessor.Halt();
Application.DoEvents();
...
UNICAST: 1P - 1C (DSL)
...
using Disruptor;

namespace ConsoleApplication29
{
    …
    class Program
    {
        static void Main(string[] args)
        {
            var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(), (int)Math.Pow(2,4), TaskScheduler.Default);
            disruptor.HandleEventsWith(new DataEventHandler("Handler1"));
            var ringBuffer = disruptor.Start();

            var idx = 0;
            while (true)
            {
                // Publish one event every 250 ms.
                var sequenceNo = ringBuffer.Next();
                var data = ringBuffer[sequenceNo];
                data.Value = idx++.ToString();
                ringBuffer.Publish(sequenceNo);
                Thread.Sleep(250);
            }

            // Not reached: the loop above never exits.
            disruptor.Shutdown();
        }
    }
}
P1 -> C1
UNICAST: 1P - 1C (NON-DSL)
...
var ringBuffer = RingBuffer<Data>.CreateSingleProducer(() => new Data(), (int)Math.Pow(2, 4));
var barrier = ringBuffer.NewBarrier();
var eventProcessor = new BatchEventProcessor<Data>(ringBuffer, barrier, new DataEventHandler("Handler1"));

Task.Factory.StartNew(() => eventProcessor.Run());
...
eventProcessor.Halt();
...
P1 -> Barrier -> C1
THREE STEP PIPELINE: 1P – 3C (DSL)
...
var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(), (int)Math.Pow(2,4), TaskScheduler.Default);

disruptor.HandleEventsWith(new DataEventHandler("Handler1"))
    .Then(new DataEventHandler("Handler2"))
    .Then(new DataEventHandler("Handler3"));

var ringBuffer = disruptor.Start();
...
disruptor.Shutdown();
…
P1 -> C1 -> C2 -> C3
THREE STEP PIPELINE: 1P – 3C (NON-DSL)
...
var ringBuffer = RingBuffer<Data>.CreateSingleProducer(() => new Data(), (int)Math.Pow(2, 4));

var eventProcessor1 = new BatchEventProcessor<Data>(ringBuffer,
    ringBuffer.NewBarrier(), new DataEventHandler("Handler1"));
var eventProcessor2 = new BatchEventProcessor<Data>(ringBuffer,
    ringBuffer.NewBarrier(eventProcessor1.Sequence), new DataEventHandler("Handler2"));
var eventProcessor3 = new BatchEventProcessor<Data>(ringBuffer,
    ringBuffer.NewBarrier(eventProcessor2.Sequence), new DataEventHandler("Handler3"));

Task.Factory.StartNew(() => eventProcessor1.Run());
Task.Factory.StartNew(() => eventProcessor2.Run());
Task.Factory.StartNew(() => eventProcessor3.Run());
...
eventProcessor1.Halt();
eventProcessor2.Halt();
eventProcessor3.Halt();
...
P1 -> Barrier -> C1 -> Barrier -> C2 -> Barrier -> C3
MULTICAST: 1P – 3C (DSL)
...
var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(), (int)Math.Pow(2,4), TaskScheduler.Default);

disruptor.HandleEventsWith(new DataEventHandler("Handler1"),
    new DataEventHandler("Handler2"),
    new DataEventHandler("Handler3"));

var ringBuffer = disruptor.Start();
...
disruptor.Shutdown();
…
P1 -> C1, C2, C3
MULTICAST: 1P – 3C (NON-DSL)
...
var ringBuffer = RingBuffer<Data>.CreateSingleProducer(() => new Data(), (int)Math.Pow(2, 4));
var barrier = ringBuffer.NewBarrier();

var eventProcessor1 = new BatchEventProcessor<Data>(ringBuffer, barrier, new DataEventHandler("Handler1"));
var eventProcessor2 = new BatchEventProcessor<Data>(ringBuffer, barrier, new DataEventHandler("Handler2"));
var eventProcessor3 = new BatchEventProcessor<Data>(ringBuffer, barrier, new DataEventHandler("Handler3"));

Task.Factory.StartNew(() => eventProcessor1.Run());
Task.Factory.StartNew(() => eventProcessor2.Run());
Task.Factory.StartNew(() => eventProcessor3.Run());
...
eventProcessor1.Halt();
eventProcessor2.Halt();
eventProcessor3.Halt();
...
P1 -> Barrier -> C1, C2, C3
DIAMOND: 1P – 3C (DSL)
...
var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(), (int)Math.Pow(2,4), TaskScheduler.Default);

disruptor.HandleEventsWith(new DataEventHandler("Handler1"), new DataEventHandler("Handler2"))
    .Then(new DataEventHandler("Handler3"));

var ringBuffer = disruptor.Start();
...
disruptor.Shutdown();
…
P1 -> C1, C2 -> C3
DIAMOND: 1P – 3C (NON-DSL)
...
var ringBuffer = RingBuffer<Data>.CreateSingleProducer(() => new Data(), (int)Math.Pow(2, 4));
var barrier = ringBuffer.NewBarrier();

var eventProcessor1 = new BatchEventProcessor<Data>(ringBuffer, barrier, new DataEventHandler("Handler1"));
var eventProcessor2 = new BatchEventProcessor<Data>(ringBuffer, barrier, new DataEventHandler("Handler2"));
var eventProcessor3 = new BatchEventProcessor<Data>(ringBuffer,
    ringBuffer.NewBarrier(eventProcessor1.Sequence, eventProcessor2.Sequence), new DataEventHandler("Handler3"));

Task.Factory.StartNew(() => eventProcessor1.Run());
Task.Factory.StartNew(() => eventProcessor2.Run());
Task.Factory.StartNew(() => eventProcessor3.Run());
...
eventProcessor1.Halt();
eventProcessor2.Halt();
eventProcessor3.Halt();
...
P1 -> Barrier -> C1, C2 -> Barrier -> C3
LMAX ARCHITECTURE
Journaler: stores all the events in a durable form.
Replicator: uses IP multicasting to sync with the slave node.
REFERENCE
The LMAX Architecture
http://martinfowler.com/articles/lmax.html
Compare-and-swap - Wikipedia, the free encyclopedia
https://en.wikipedia.org/wiki/Compare-and-swap
Memory barrier - Wikipedia, the free encyclopedia
https://en.wikipedia.org/wiki/Memory_barrier
Circular buffer - Wikipedia, the free encyclopedia
https://en.wikipedia.org/wiki/Circular_buffer
Disruptor concurrency framework (translated articles) | Concurrent Programming Network - ifeve.com
http://ifeve.com/disruptor/
[C#][VB.NET].NET 4.0 Barrier Class | Level Up - 點部落
https://dotblogs.com.tw/larrynung/archive/2009/08/22/10182.aspx?fid=945744
QUESTION & ANSWER