andreas ulbrich ([email protected]) sde – microsoft robotics

41
CCR - Concurrency and Coordination Runtime Andreas Ulbrich ([email protected]) SDE – Microsoft Robotics

Upload: miguel-burrough

Post on 16-Dec-2015

223 views

Category:

Documents


3 download

TRANSCRIPT

Page 1: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CCR -Concurrency and CoordinationRuntime

Andreas Ulbrich ([email protected])SDE – Microsoft Robotics

Page 2: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Overview Why CCR? Hello, World! Message-Based Coordination CCR Examples

Asynchronous Programming Model (APM) User Interfaces

Error Handling

Page 3: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Why CCR? Concurrency

Process many task simultaneously Scalability, Responsiveness Leverage parallelism

Coordination Exercise control Orchestrate asynchronous operations Handle (partial) failures

Runtime Scheduler, Extensibility

Page 4: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CCR Programming Model Asynchronous message passing (in-

process) No explicit threads, locks, semaphores!

Task scheduled based on message availability Data-dependency scheduler Models concurrency

Coordination primitives (join, choice, …) Composition of data-driven components

Iterative tasks Express sequential control flow of async.

tasks

Page 5: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Hello, World!

var queue = new DispatcherQueue();var port = new Port<string>(); port.Post("Hello, World!"); Arbiter.Activate(queue, Arbiter.Receive( false, port, message => Console.WriteLine(message) ));

Port: channel for sending and receiving messages

Post: sends a message

Task: delegate that handles the message (message is consumed)

Receive arbiter: coordination primitive

Task queue: dispatcher schedules queues RR, task must be activated on a queue

Port on which to receive the message

Not persistent: handles only one message

Page 6: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

DispatcherPort

DispatcherQueues

ThreadsArbiter

HandlerArbiter is attached to port

Arbiter is activated on queue

Dispatcher schedules items from its queues round-robin to run in its threads.

Post places a message on the port

Arbiter checks whether it can consume the message.

Creates work item frommessage and handler

Enqueue work item

Thread calls handler with message as arg.

Scheduler picks next work item to execute

Page 7: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

HandlerHandler

DispatcherPort

DispatcherQueues

ThreadsArbiter

Handler

There can be many of everything

Page 8: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Using Messages for Coordination Message triggers an operation

PortSet<Operation1, Operation2> Message carries parameter for operation Decouples sender from actual implementation

Message signals completion of operation Port<Result>, PortSet<Result, Exception>, … Port is stand-in for actual result Port can be send to other tasks

Coordinate tasks with messages

Page 9: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Coordination Primitives

Single-Port Primitives

Task-Scheduling(non port-specific)

Multi-Port Primitives

Multi-Port Receiver

Choice (Logical OR)

Join (Logical AND)

Multi-Item Receiver

Single Item Receiver

FromHandler

FromIterator Handler

Interleave(Reader/Writer)

Page 10: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Choice Executes at most one of its child-tasks

PortSet<string, Exception> resultPort = …

Arbiter.Activate(queue,Arbiter.Choice(

resultPort,result => Console.WriteLine("result: " +

result),exception => Console.WriteLine("exception“)

));

Handler if string received

Handler if exception received

Page 11: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Joined Receive Executes when all of its branches can execute

Coordinate on completion of concurrent tasks Atomic consumption to prevent deadlocks

Arbiter.Activate(queue,Arbiter.JoinedReceive<string, string>(false,

resultPort1, resultPort2,(result1, result2) =>

{Console.WriteLine(“done”);

})

);

Ports on which to receive messages

Handler receives both results as arguments

Page 12: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Multi-Item/Multi-Port Receive Executes on reception of multiple items of the same type

Scatter/Gather, coordinate completion of parallel tasks Atomic consumption to prevent deadlocks

Arbiter.Activate(queue,Arbiter.MultiplePortReceive(

false,resultPorts,results =>{

Console.WriteLine(results.Length);}

));

Array of ports on which to receive messages

Handler receives array of results as argument

Page 13: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Overview Why CCR? Hello, World! Message-Based Coordination CCR Examples

Asynchronous Programming Model (APM) User Interfaces

Error Handling

Page 14: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Asynchronous Programming BCL: Asynchronous versions for many

operations BeginOperation, EndOperation pair Callback when operation completed/failed

BeginRead(buffer, offset, count, callback, state) Returns IAsyncResult (moniker for pending

result) Also passed to callback

EndRead(asyncResult) Returns result of operation

Page 15: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

APM Gotchas Callback maybe called from any thread,

e.g. Calling thread (synchronous completion) Thread pool (potential to starve) Depends on implementation of Begin/End

Coordination is clumsy Sequential (continuation passing, nested

delegates) Recurrence (ping pong between callbacks) Scatter/Gather, Partial failure

Page 16: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

APM with CCR Use CCR to

Model concurrency of application Coordinate asynchronous operations

Getting out of APM is easy In the callback post IAsyncResult to a CCR

port

Page 17: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Asynchronous ReadIEnumerator<ITask> CcrReadFileAsync(string file){ var resultPort = new Port<IAsyncResult>(); using (var fs = new FileStream(file,…,FileOptions.Asynchronous)) { var buf = new byte[fs.Length]; fs.BeginRead(buf, 0, buf.Length, resultPort.Post, null); IAsyncResult result = null; yield return Arbiter.Receive(false, resultPort, ar => { result = ar; }); try { fs.EndRead(result); ProcessData(buf); } catch { // handle exception } }}

Iterative task: models sequential control flow1)Begin read operation2)“wait” for result of read3)Process read data

123

Use port to coordinate on completion of async. operation

Callback: simply post IAsyncResult as a message

Yield receiver task,Task is activated by dispatcher,Dispatcher calls MoveNext() when task complete.Does not block thread!

Page 18: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Asynchronous ReadIEnumerator<ITask> CcrReadFileAsync(string file){ var resultPort = new Port<IAsyncResult>(); using (var fs = new FileStream(file,…,FileOptions.Asynchronous)) { var buf = new byte[fs.Length]; fs.BeginRead(buf, 0, buf.Length, resultPort.Post, null); yield return (Receiver)resultPort; var result = (IAsyncResult)resultPort; try { fs.EndRead(result); ProcessData(buf); } catch { // handle exception } }}

Simplified notation

Page 19: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Activating Iterative Tasks Directly

queue.Enqueue(new IterativeTask(“test.txt”, CcrReadFileAsync)

);

Yielded from an iterative task yield return new IterativeTask(…);

As task conditioned by an arbiter Arbiter.Activate(queue,

Arbiter.ReceiveWithIterator(false, port, CcrReadFileAsync)

)

Page 20: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Benefits of Iterative Tasks Simplified code

Access to locals, no need to pass async. state

Sequential control flow, e.g. loops No nesting, ping/pong of callbacks

Ability to use using, try/finally Simplifies resource management

Page 21: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Example: Stream Copy Block-copy large file from input stream to

output stream

Sequential control flow with repeated async. operations while (input not empty):

read block from inputwrite block to output

Page 22: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CCR Stream Wrapperspublic static PortSet<int, Exception> Read( Stream stream, byte[] buffer, int offset, int count){ var resultPort = new PortSet<int, Exception>(); stream.BeginRead(buffer, offset, count, asyncResult => { try { resultPort.Post(stream.EndRead(asyncResult)); } catch (Exception e) { resultPort.Post(e); } }, null); return resultPort;}

Port set: collection of ports, one for each possible result of the operation

Post result or exception

Page 23: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CopyStreamstatic IEnumerator<ITask> CopyStream(Stream source, Stream dest){ try { var buffer = new byte[blocksize]; int read = 0; do { Exception exception = null; yield return Arbiter.Choice( StreamAdapter.Read(source, buffer, 0, buffer.Length), r => { read = r; }, e => { exception = e; } ); if (exception == null) { // write to dest

Choice arbiter: executes only one of its branches depending on the received message Port set on

which to receive message

Handler if int is received Handler if

Exception is received

Page 24: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CopyStreamstatic IEnumerator<ITask> CopyStream(Stream source, Stream dest){ try { var buffer = new byte[blocksize]; int read = 0; do { var readResult = StreamAdapter.Read( source, buffer, 0, buffer.Length); yield return (Choice)readResult; var exception = (Exception)readResult; if (exception == null) { // write to dest read = (int)readResult;

Simplified notation

Page 25: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CopyStream See the complete sample in Visual

Studio

Page 26: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CopyStream II Read and write are sequential Modify to exploit parallelism

Read the next block while writing Have to coordinate

“wait” untilread succeeds and write succeeds

orread fails

orwrite fails

Receivers

Join

Choice

Page 27: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CopyStream IIwhile (write > 0){ var writeResult = StreamAdapter.Write(dest, bufferB, 0, write); if (read > 0) { // read new bytes and write existing buffer readResult = StreamAdapter.Read(source, …);

yield return Arbiter.Choice( Arbiter.JoinedReceive<int, EmptyValue>(false, readResult, writeResult, (r, s) => { read = r; } ), Arbiter.Receive<Exception>(false, readResult, e => { exception = e; }), Arbiter.Receive<Exception>(false, writeResult, e => { exception = e; }) );

Choice arbiter: only one of its branches will execute

Receiver branches for exceptions

Join branch: receives int on readResult and EmptyValue on writeResultJoin handler delegate

gets both messages are parameters

Page 28: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CopyStream II See the complete sample in Visual

Studio

Page 29: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

User Interfaces Concurrency desirable for

responsivenes UI often single threaded or has thread

affinity WinForms, WPF Manual thread management, cross-thread

calls

Use CCR to decouple UI from application logic Adapter for handling cross thread calls

messages

Page 30: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

User Interface - Adapters

CCR to UI adapter(WPF or

WinForms)

Application

Logic(CCR Tasks)

UI sends message totrigger application logic

Application logic sendsmessage to indicate completion or status

CCR adapter handlesmessage in UI context

Page 31: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Exception Handling Concurrent, async: try/catch wont do the job Method 1: explicit handling

Exception caught at origin and posted as message for coordination

See CopyStream sample Method 2: Causalities

Captures all exception thrown by tasks that are executed as result of the same cause (transitive)

Nested exception handling in concurrent, asynchronous programs!

Page 32: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Causalitiesvoid ExecuteWithCausality(){ var causality = new Causality("my causality"); Dispatcher.AddCausality(causality); var port = new Port<int>(); port.Post(42); Arbiter.Activate(queue, Arbiter.Receive(false, port, HandleMessage) ); Arbiter.Activate(queue, Arbiter.Receive(false, (Port<Exception>)causality.ExceptionPort, Console.WriteLine ) );}void HandleMessage(int message){ throw new Exception("Catch me if you can!");}

Create a new causality and add it to the active causalities of the current task

Send a message and activate a receiver. All active causalities travel with the message to the tasks that handle the message.

Handle the exception thrown within the causality.

Page 33: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

DispatcherQueue Manage queue of tasks

Tasks are picked round-robin from queues in scheduler

Parameters Execution policy, queue length, scheduling

rate Use separate queues to

Separate tasks with significantly different arrival rates

Enforce scheduling policies

Page 34: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Dispatchers Manage a thread pool

Default: 1 per core, or 2 on single core Parameters

Number of threads, STA/MTA, Priority, Background

Use separate dispatchers Interop with legacy components (COM) Isolate “uncooperative” components Separate tasks with significantly different

lengths

Page 35: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Summary Asynchronous message passing Easy coordination of concurrent,

asynchronous task Compositional Arbiters Sequential control flow with iterators

Exploits parallelism Easy integration into .NET applications

Page 36: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Where to get it? Microsoft CCR and DSS Toolkit 2008

http://microsoft.com/ccrdss

Microsoft Robotics Developer Studio 2008 http://microsoft.com/robotics Express Version (free for non-commercial use) Standard Version (free for academic institutions)

Internal on \\products

Page 37: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CCR based Components Derive from CcrServiceBase

Stores reference to dispatcher queue Overloads for Activate, TimeoutPort, etc

Model operations as messages PortSet<SuccessResultType, Exception> for

response

Expose operations port Activate persistent receivers for operation

messages

Page 38: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CCR based Components (cont) Protect state with Interleave arbiter

TearDownReceiverGroup Terminates Interleave

ExclusiveReceiverGroup Message handlers that modify state

ConcurrentReceiverGroup Message handlers that read state

Efficient reader/writer lock (writer biased) Interleave protection spans iterative tasks!

Page 39: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

CCR based Components (cont)

public CcrComponent(DispatcherQueue queue) : base(queue){ Activate( Arbiter.Interleave( new TeardownReceiverGroup( Arbiter.Receive<StopOperation>(false, _operationsPort, StopHandler)), new ExclusiveReceiverGroup( Arbiter.ReceiveWithIterator<WriteOperation>(true, _operationsPort, WriteHandler)), new ConcurrentReceiverGroup( Arbiter.Receive<ReadOperation>(true, _operationsPort, ReadHandler)) ) );}

Page 40: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Flowing Context Culture, user principal, … Explicit

Make it part of your message Dispatcher

Useful if set of context is limited E.g. one dispatcher per culture

Custom receivers Capture context on post Set it on execute

Page 41: Andreas Ulbrich (anulb@microsoft.com) SDE – Microsoft Robotics

Performance Intel Xeon 5150 (2x 2.66 GHz), 4 GByte Persisted signal item receiver latency

Send message schedule task run task 1.67 µs

Iterator latency Spawn schedule run task 2.1 µs