binary studio academy: concurrency in c# 5.0
TRANSCRIPT
Concurrency in C# 5.0Binary Studio Academy
binary-studio.com
Contents
1. Concurrency
2. Multithreading Programming
3. Thread Synchronization4. Parallel Programming5. Collections
6. Asynchronous Programming
7. Cancellation
8. Async Synchronization
9. Exceptions
Concurrency
Asynchronous
Asynchronous
OK
First Name
Last Name
Birthdate Program Storage
Asynchronous
OK
Salary
PercentProgram
Loop
Asynchronous
OK
Salary
PercentProgram
Loop
Multithreading Programming
Main thread
Thread_1 Thread_2 Thread_3
Thread_4 Thread_5 Thread_6Thread_7
Thread_8 Thread_9
Thread
Program
Thread:- Culture- Principals- Priority- Execution Context
Demo
ThreadPool
Program
ThreadPool(10)
Thread1 Thread2
Demo
BackgroundWorker
- DoWork (ReportProgress())- ProgressChanged- RunWorkerCompleted
Demo
Timers
System.Threading.Timer:- callback based- need to dispose
System.Timers.Timer:- event based- thread safety
Timers in depth
Demo
Types Of Threads
● Main Thread (UI Thread)
○ Entry point
○ Sync
● Worker Thread
○ Independent flow
○ Async
Thread Synchronization
Thread Synchronization Primitives
● Lock● Mutex● AutoResetEvent● Semaphore
Demo
Parallel Programming
Data Parallelism
1
2
3
4
5
6
7
Process:y =x * x
1
4
9
16
25
36
49
Task Parallelism
1
2
3
4
5
6
7
Process:Console.WriteLine(collection.Max())
Process:Console.WriteLine(collection.Min())
Process:Console.WriteLine(collection.Average())
Process:Console.WriteLine(collection.Sum())
Task
PLINQ
collection.AsParallel().Where(c => c > 0).ForAll()
(from c in collection.AsParallel()where c > 0select c).ForAll()
Demo
Collections
Immutable CollectionsThread Safe collectionsBlocking CollectionsAsynchronous collections
Immutable Collections
● ImmutableStack<T>● ImmutableQueue<T>● ImmutableList<T>● ImmutableSet<T>● ImmutableDictionary<T, K>
var list = ImmutableList<int>.Empty;list = list.Insert(0, 1);list = list.Add(2);list = list.RemoveAt(0);Console.WriteLine(list[0]);
Thread Safe Collections
● ConcurrentStack<T>● ConcurrentQueue<T>● ConcurrentBag<T>
ConcurrentDictionary<int, string> dictionary = new ConcurrentDictionary<int, string>();
dictionary.AddOrUpdate(k, key => value,(key, oldvalue) => value); // if key exist
Blocking Collections
new BlockingCollection<T>(); // first-in-first-outnew BlockingCollection<T>(new ConcurrentStack<T>()); // last-in-last-outnew BlockingCollection<T>(new ConcurrentBag<T>()); // unordered
Thread1:_bc.Add(1);_bc.Add(2);_blockingQueue.CompleteAdding();
Thread2:foreach(var item in _bc.GetConsumingEnumerable())
Console.WriteLine(item);
Asynchronous Collections
BufferBlock<T>
AsyncProducerConsumerQueue<T>AsyncCollection<T> Nito.AsyncEx
Demo
Asynchronous Programming
OK
First Name
Last Name
Birthdate Program Storage
Asynchronous Programming Model (APM)
● BeginMethod
● Callback
● IAsyncResult
● IAsyncState
● EndMethod
FileStream fs = new FileStream(path, FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 8, FileOptions.Asynchronous);string content = "A quick brown fox jumps over the lazy dog";byte[] data = Encoding.Unicode.GetBytes(content);fs.BeginWrite(data, 0, data.Length, OnWriteCompleted, fs);
private static void OnWriteCompleted(IAsyncResult asyncResult) { FileStream fs = (FileStream)asyncResult.AsyncState; fs.EndWrite(asyncResult); fs.Close();}
Event-based Asynchronous Pattern (EAP)
var wc = new WebClient();wc.DownloadStringCompleted += (sender, args) =>{ if (args.Cancelled)
Console.WriteLine ("Canceled"); else if (args.Error != null)
Console.WriteLine ("Exception: " + args.Error.Message); else {
Console.WriteLine (args.Result.Length + " chars were downloaded");
// We could update the UI from here... }};wc.DownloadStringAsync (new Uri ("http://www.linqpad.net")); // Start it
● Event
● EventArgs
● ObjectState
Task-based Asynchronous Pattern (TAP)
public Task<T> MethodNameAsync(…, CancellationToken cancellationToken, IProgress<T> progress);
● Task, Task<T>
● CancellationToken
● IProgress
● syntax support (async/await)
Create task
● new Task(() => return 42).Start()
● TaskFactory.Start(() => return 42)
● Task.Run(() => return 42)
public async Task Method()
{
await Task.Delay(500);
return Task.Run(() => Console.WriteLine(“Async”))
.ContinueWith((t) => Console.WriteLine(“Task finished”));
}
Task Helpers
● Task.WhenAll()
● Task.WhenAny()
● Task.WaitAll()
● Task.WhenAny()
● Task.Delay()
● Task.Run()
● Task.Yield()
ConfigureAwait
Public async void Click(object sender, EventArgs args)
{
var str = await service.DownloadString(Url).ConfigureAwait(false);
UxTextBlock.Text = str; // Runtime Error
}
Public async void Click(object sender, EventArgs args)
{
var str = await service.DownloadString(Url);
UxTextBlock.Text = str;
}
Task Completion Source
EAP
APM
TAP
SetResultSetException
Task Completion Source
public Task UseBackgroundWorker() { var tcs = new TaskCompletionSource<double>(); var backgroundWorker = new BackgroundWorker(); backgroundWorker.DoWork += (o, e) => { Thread.Sleep(2000); e.Result = 42; }; backgroundWorker.RunWorkerCompleted += (o, e) => { if (e.Error != null) { tcs.SetException(e.Error); } else { tcs.SetResult((double)e.Result); } }; return tcs.Task; }
Cancellation
CancellationTokenSource
CancellationTokenSource.Cancel()
CancellationToken
Cancellation.Token.IfCancellationRequested
CancellationToken.ThrowIfCancellationRequest
Demo
Async Synchronization
Lock
Task t;lock{
t = httpClient.GetAsync();}var result = await t;
Async lock
SemaphoreSlim _mutex = new SemaphoreSlim(1);…public async Task Method(){
await _mutex.WaitAsync();try{
var result = await httpClient.GetAsync(Url);}finally{
_mutext.Release();}}
AsyncLock _mutex = new AsyncLock();…public async Task Method(){
using (await _mutex.LockAsync()){
var result = await httpClient.GetAsync(Url);}}
Nito.AsyncEx
Exceptions
Wait()
WaitAll()
WaitAny()
Result
AggregationException
Aggregation Exception
Task task1 = Task.Factory.StartNew(() => LongOperationAsync()); try { task1.Wait(); } catch (AggregateException ae) { ae.Handle((x) => { if (x is MyException) { Console.WriteLine("HandleException."); return true; } return false; }); }
The end