reactive programming in .net - actorbased computing with akka.net
TRANSCRIPT
akka.net
Reactive Manifest und Akka.Net
Wie kommen wir da hin? Reactive Manifesto
event Handling data flow graph processing
System Architecure
zuverlässige skalierbare
elastic
message-driven
resilient
responsive
http://www.reactivemanifesto.org/de
Elasticreact to load
Message-DrivenServices / Components Interaction
Resilientreact to failures
Responsivereact to users
Reactive System
goal
methods
concepts
resilient elastic
Up OutDown In
at first, dry theory…
Carl Hewitt (1973),Actor Model of Computation:Scalable Robust Information Systems
…jetzt.
• Typesafe: Actor-Framework• Scala (JVM) => Java
• ausgereiftes, erprobtes Framework
• Akka.Net• Portierung von Akka (2013)
• Aaron Stannard Roger Johansson
• http://getakka.net
akka.net
Concurreny
Scalability
Fault-tolerance
Simpler
Programming Model
Managed Runtime
Open Source Distribution
…with a single unified
Scale UP
Scale OUT
http://doc.akka.io/docs/akka/current/intro/use-cases.html
PM> Install-Package Akkanuget
using Akka.Actor;
using (var actorSystem = ActorSystem.Create("MyActorSystem")){
// universe}
nur innerhalb dieses Blocks„leben“ Aktoren
var actorSystem = ActorSystem.Create("MyActorSystem"));
// universe
actorSystem.WhenTerminated.Wait(); // wait for termination-taskactorSystem.AwaitTermination(); // deprecated
oder
// universeIActorRef myActor = actorSystem.ActorOf(
Props.Create(() => new MyActor()));
using (var actorSystem = ActorSystem.Create("MyActorSystem")){
}
// talk to the actormyActor.Tell(new SomeMessage("rise up slave..."));
erzeuge AktorSystem
fordere spezifischenAktor an
sprich mit dem erhaltenenAktor
public class SomeMessage{
public SomeMessage(string name){
Name = name;}
public string Name { get; private set; }}
public class MyActor : ReceiveActor{
public MyActor(){
Receive<string>(message => {Sender.Tell(message);
});
Receive<SomeMessage>(message => {// react to message
});}
}
alle Nachrichten sindimmutable
Nachrichten werdenprinzipiell nach ihremTyp unterschieden *
* es geht fein-granularerund abstrakter
Receive<string>(s => Console.WriteLine("Received string: " + s)); //1Receive<int>(i => Console.WriteLine("Received integer: " + i)); //2
IActorRef sender = Sender;
//Receive using FuncsReceive<string>(s =>{
if (s.Length > 5) {Console.WriteLine("1: " + s);return true;
}return false;
});Receive<string>(s => Console.WriteLine("2: " + s));
//predicatesReceive<string>(s => s.Length > 5, s => Console.WriteLine("1: " + s)); //1Receive<string>(s => s.Length > 2, s => Console.WriteLine("2: " + s)); //2Receive<string>(s => Console.WriteLine("3: " + s));
//handler priorityReceive<string>(s => s.Length > 5, s => Console.WriteLine("1: " + s)); //1Receive<string>(s => s.Length > 2, s => Console.WriteLine("2: " + s)); //2Receive<string>(s => Console.WriteLine("3: " + s)); //3
public class MyActor : ActorBase{
public MyActor(){
//nothing todo}
protected override bool Receive(object _message){
//handle different messagesvar message = _message as SomeMessage;if(message != null){
return false;}
return false;}
}
Nachrichten werdenprinzipiell nach ihremTyp unterschieden *
* es geht fein-granularerund abstrakter
using Akka.Actor;
public class MyUntypedActor : UntypedActor{
protected override void OnReceive(object message) {
var msg = message as Messages.InputError; if (msg != null) {
// cool, its for me} else{
Unhandled(message); //from ActorBase}
} }
• anders als beim ReceiveActor müssen nicht behandelteNachrichten selbst als solche markiert werden
IActorRef friend = Context.ActorOf(Props.Create(() => new MyReceiveActor()));sender.Tell(new SomeMessage("Re: " + message.Name));
IActorRef friend = ...friend.Tell(new SomeMessage("Forward: " + message.Name), sender);
IActorRef friend = ...friend.Forward(new SomeMessage("Forward: " + message.Name)); forward
tell + explicit sender
tell
Nicht die da, andere….
event-driven thread
Actor
Behavior
Mailbox
State
Childs
Supervisor-Strategy
ActorRef 1 2 34Transport [Akka]
IActorRef myActor = ...myActor.Tell(new Message("four"));
• IActorRef: handle oder reference auf einen Actor• Nachrichten werden niemals direkt an einen
Actor gesendet• ActorSystem fügt Metadaten (Sender, Empfänger) hinzu• ActorSystem garantiert Ankunft jeder Nachricht *
IActorRef
• Actor Namen sind optional• best practise: named actors
Create
Look up
IActorRef myActor = actorSystem.ActorOf(Props.Create(() => new MyActor()),"myactor1"
);
• adressierbar über ActorPath• Actoren bilden eine Hierarchy
Parent
Children
Sender
ActorSelection randomActor =actorSystem.ActorSelection("akka://ActorSys/user/myactor1");
Props props1 = Props.Create(typeof(MyActor));
Props props2 = Props.Create(() => new MyActor());
Props props3 = Props.Create<MyActor>();
typeof Syntax
lambda Syntax
generic Syntax
• einziger Weg um Parameterzu übergeben
• unsicher
Context.ActorOf(props2);
actorSystem.ActorOf(props2);
aus einem Actor heraus:
direkt im ActorSystem:-> erzeugt HierarchieVerwendung
akka.tcp://MySystem@localhost:9001/user/actorName1
Protocol
ActorSystem
Akka
Path
Akka Remote
Address
/system
Guardians
/a1 /a2top levelactor
/b1
/c1 /cx
/b2
/c2 /cx
akka://ActorSystem/user
akka://ActorSystem/user/a1
akka://ActorSystem/user/a1/b2
akka://ActorSystem/user/a1/*/cx
/user
/
parent
parent
Failure
SupervisionStrategy• Restart, Stop, Resume, Escalate
• 2 Default-Strategien: One-For-One und All-For-One
• erklärtes Ziel ist es Fehler einzugrenzen:
• localizing the failure: fehleranfälligen Code
The critical thing to know here is that *whatever action is
taken on a parent propagates to its children*. If a parent is
halted, all its children halt. If it is restarted, all its children
restart.
protected override SupervisorStrategy SupervisorStrategy() {// immer die gleiche Entscheidungreturn new OneForOneStrategy(
maxNrOfRetries: 10,withinTimeRange: TimeSpan.FromSeconds(30),localOnlyDecider: exception =>{
Console.WriteLine("*** Supervision: Restart");return Directive.Restart;
});
}
Clients[SignalR]
Clients[SignalR]
Clients[SignalR]
GameHub[SignalR]
Bridge-Actor
[Akka]
GameCtrl-Actor
[Akka]
Player-Actor
[Akka]
internetjoin
attack attack
refreshState
sendState
changeHealth
join
attack
Clients[SignalR]
Clients[SignalR]
Clients[SignalR]
GameHub[SignalR]
internet
join
attack
Act
orS
yste
m[A
kka]
public class MvcApplication : HttpApplication {
protected void Application_Start(){
//ASP.Net Stuff: Filters, Routes, ...
//Akka initGameActorSystem.Create();
}
void Application_End(){
GameActorSystem.Shutdown();}
}
public static class GameActorSystem{
private static ActorSystem ActorSystem;private static IGameEventsPusher _gameEventsPusher;
public static void Create(){
_gameEventsPusher = new SignalRGameEventPusher();
ActorSystem = Akka.Actor.ActorSystem.Create("GameSystem");
ActorReferences.GameController = ActorSystem.ActorOf<GameControllerActor>();ActorReferences.SignalRBridge = ActorSystem.ActorOf(
Props.Create(() =>new SignalRBridgeActor(_gameEventsPusher, ActorReferences.GameController)),
"SignalRBridge„);
}
public static void Shutdown(){
ActorSystem.Shutdown();ActorSystem.AwaitTermination(TimeSpan.FromSeconds(1));
}
public static class ActorReferences ...}
class SignalRGameEventPusher : IGameEventsPusher {
private static readonly IHubContext _gameHubContext;
static SignalRGameEventPusher() //static CTOR{
_gameHubContext = GlobalHost.ConnectionManager.GetHubContext<GameHub>();}
public void PlayerJoined(string playerName, int playerHealth){
_gameHubContext.Clients.All.playerJoined(playerName, playerHealth);}
public void UpdatePlayerHealth(string playerName, int playerHealth){
_gameHubContext.Clients.All.updatePlayerHealth(playerName, playerHealth);}
}
GameHub[SignalR]
Act
orS
yste
m[A
kka]
playerJoined
updatePlayerHealth
ClientsClients
Clients
public class GameHub : Hub{
public void JoinGame(string playerName){
GameActorSystem.ActorReferences.SignalRBridge.Tell(new JoinGameMessage(playerName));
}
public void Attack(string playerName){
GameActorSystem.ActorReferences.SignalRBridge.Tell(new AttackPlayerMessage(playerName));
}}
GameHub[SignalR]
join
attack
Act
orS
yste
m[A
kka]
Closed
Open
Deleted
open close
delete