разработка серверов и серверных приложений лекция №4
DESCRIPTION
В данной лекции рассмотрена минимальная реализация акторной модели, включающая - отправку сообщений, - создание новых акторов и смену поведения для приема следующего сообщения. Исходный код реализации выложен на https://github.com/hwdtech/HWdTech.DS. Код на C#. При разработке использовались библиотеки: Autofac, NuUnit, MoqTRANSCRIPT
Глава 5. Первая реализация 118
Концепция 119
IMessage 120
public interface IMessage{
string Target { get; }}
MessageBus 121
public class MessageBus{ public static void Send(IMessage message) {…}
public static void Join( string address, Actor actor ) { … }}
Actor 122
public abstract class Actor{ public Actor(string address) { … } public Actor() { … }
public void Become(Action<IMessage> handler) { … } public void Receive(IMessage message) { … } public abstract void Handle(IMessage message); }
Фасад для IoC 123
public class DIContainer{ IContainer container;
public T Resolve<T>() { return container.Resolve<T>(); }}
Фасад для IoC 124
Файл: RouterImpl/IoCRegistration.csusing Autofac;
namespace HWdTech.DS.Internals.Implementation{ class RouterRegistrationModule : Module { protected override void Load(ContainerBuilder builder) { IRouter router = new RouterImpl();
builder.RegisterInstance(router).As<IRouter>(); } }}
Фасад для IoC 125
public class DIContainer{ public DIContainer() { ContainerBuilder builder = new ContainerBuilder(); Assembly[] assemblies = AppDomain.CurrentDomain.GetAssemblies(); builder.RegisterAssemblyModules(assemblies); this.container = builder.Build(); AppDomain.CurrentDomain.AssemblyLoad += AssemblyLoadEventHandler; } void AssemblyLoadEventHandler(object sender, AssemblyLoadEventArgs args) { LoadAssembly(args.LoadedAssembly); } LoadAssembly(Assembly assembly) { ContainerBuilder builder = new ContainerBuilder(); builder.RegisterAssemblyModules(assembly); builder.Update(container); }}
Реализация MessageBus.Join 126
static IRouter router;
public static void Join(string address, Actor actor){ if (null == router) { router = Singleton<DIContainer>.Instance.Resolve<IRouter>(); }
router.RegisterOrReplace(address, actor.Receive);}
Реализация MessageBus.Send 127
static IThreadPool threadPool;
public static void Send(IMessage message){ if (null == threadPool) { threadPool = Singleton<DIContainer>.Instance.Resolve<IThreadPool>(); } threadPool.StartTask(HandleMessage, message);}static void HandleMessage(object o){ IMessage message = (IMessage) o; if (null == router) { router = Singleton<DIContainer>.Instance.Resolve<IRouter>(); } router.Send(message);}
Ping-Pong Test 128
[Test]public void PingPongTest(){ ThreadPoolImpl threadPool = new ThreadPoolImpl(); RouterImpl router = new RouterImpl();
Mock<IMessage> pingMessage = repository.Create<IMessage>(); pingMessage.SetupGet(m => m.Target).Returns("ping");
Actor pongActor = new PongActor(pingMessage.Object);
Mock<IMessage> pongMessage = repository.Create<IMessage>(); pongMessage.SetupGet(m => m.Target).Returns("pong");
ManualResetEvent waitSignal = new ManualResetEvent(false); PingActor pingActor = new PingActor(pongMessage.Object, waitSignal);
Assert.True(waitSignal.WaitOne(1000)); pingActor.Verify();}
Ping-Pong Test: Ping Actor 129
class PingActor : Actor{ int gotMessages; IMessage pongMessage; ManualResetEvent signal; const int numberOfResponsesShoulgGet = 5; public PingActor(IMessage pongMessage, ManualResetEvent signal) : base("ping") { this.signal = signal; this.pongMessage = pongMessage; MessageBus.Send(pongMessage); } public override void Handle(IMessage message) { ++gotMessages; if (numberOfResponsesShoulgGet > gotMessages) { MessageBus.Send(pongMessage); } else { signal.Set(); } } public void Verify() { Assert.AreEqual(numberOfResponsesShoulgGet, gotMessages); } }
Ping-Pong Test: Pong Actor 130
class PongActor : Actor { IMessage pingMessage;
public PongActor(IMessage pingMessage) : base("pong") { this.pingMessage = pingMessage; }
public override void Handle(IMessage message) { MessageBus.Send(pingMessage); } }
Фасад для ThreadPool 131
public interface IThreadPool { void StartTask(Action<object> task, object arg); }
public class ThreadPoolImpl : IThreadPool{ public void StartTask(Action<object> action, object args = null) { System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(action), args); }}
Пример теста для ThreadPool 132
[Test] public void ThreadPoolShouldRunTasksWithArgument() { System.Threading.ManualResetEvent canContinue = new System.Threading.ManualResetEvent(false);
ThreadPoolImpl pool = new ThreadPoolImpl();
object o = new object(); bool gotArgument = false;
pool.StartTask((obj) => { gotArgument = object.ReferenceEquals(obj, o); canContinue.Set(); }, o);
Assert.True(canContinue.WaitOne(1000)); Assert.True(gotArgument); }
Фасад для роутера 133
public interface IRouter { void Send(IMessage message); IRouter RegisterOrReplace(string channel, Action<IMessage> handler); }
Фасад для роутера 134
public class RouterImpl: IRouter { public RouterImpl() { routerImpl = new ConcurrentDictionary<string, Action<IMessage>>(); } public void Send(IMessage message) { Action<IMessage> handler; if (routerImpl.TryGetValue(message.Target, out handler)) { handler(message); } else { //ToDo: адресат неизвестен - надо разрешить с помощью внешнего сервиса. } } public IRouter RegisterOrReplace(string channel, Action<IMessage> handler) { routerImpl.AddOrUpdate(channel, handler, (key, oldValue) => { return handler; }); return this; } ConcurrentDictionary<string, Action<IMessage>> routerImpl; }