Akka Cluster overview at 010dev
Dr. Roland Kuhn (@rolandkuhn)
Distributed by Design
Wednesday, 3 April 2013

Outline
• actors
• remote actors
• rise of the cluster
• when the cluster grows up
• adding types

What is an Actor?
• Akka's unit of computation is called an Actor
• Actors are purely reactive components:
  – a mailbox
  – behavior & state
  – scheduled to run when sent a message
• Each actor has a parent, handling its failures

[Diagram: an Actor encapsulates Behavior and State and is run by an event-driven Thread]

Define Actor

// Define the message(s) the Actor should be able to respond to
public class Greeting implements Serializable {
  public final String who;
  public Greeting(String who) { this.who = who; }
}

// Define the Actor class
public class GreetingActor extends UntypedActor {
  LoggingAdapter log = Logging.getLogger(getContext().system(), this);
  int counter = 0;

  // Define the Actor's behavior
  public void onReceive(Object message) {
    if (message instanceof Greeting) {
      counter++;
      log.info("Hello #" + counter + " " + ((Greeting) message).who);
    }
  }
}

Create Actor

// Create an Actor system
ActorSystem system = ActorSystem.create("MySystem");

// Create the Actor: Props is the Actor configuration, "greeter" gives it a name.
// You get an ActorRef back.
ActorRef greeter = system.actorOf(new Props(GreetingActor.class), "greeter");

Actors can form hierarchies

// From outside: create the top-level actor Foo below the Guardian System Actor
system.actorOf(new Props(Foo.class), "Foo");

// From inside Foo: create the child actor A
getContext().actorOf(new Props(A.class), "A");

[Diagram: actor tree with the Guardian System Actor at the root, Foo and Bar below it, and child actors named A, B, C, D and E further down]

Name resolution - like a file-system

[Diagram: the same actor tree, with paths resolved downwards from the Guardian System Actor]
/Foo
/Foo/A
/Foo/A/B
/Foo/A/D

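The file-system analogy can be sketched without Akka at all. The following is a minimal illustration, assuming a hypothetical ActorNode class (not Akka's ActorRef or ActorPath) that stores named children and resolves absolute paths segment by segment from the root.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for an actor in the hierarchy; this is not Akka's API.
final class ActorNode {
    final String name;
    final ActorNode parent;
    final Map<String, ActorNode> children = new LinkedHashMap<>();

    ActorNode(String name, ActorNode parent) {
        this.name = name;
        this.parent = parent;
    }

    // Create (or return) a named child, much like creating a directory.
    ActorNode child(String childName) {
        return children.computeIfAbsent(childName, n -> new ActorNode(n, this));
    }

    // Build the absolute path by walking up to the root, which has an empty path.
    String path() {
        if (parent == null) return "";
        return parent.path() + "/" + name;
    }

    // Resolve an absolute path such as "/Foo/A/B" from the root; null if a segment is missing.
    static ActorNode resolve(ActorNode root, String path) {
        ActorNode current = root;
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) continue; // skip the empty segment before the leading slash
            current = current.children.get(segment);
            if (current == null) return null;
        }
        return current;
    }
}

final class ActorPathDemo {
    public static void main(String[] args) {
        ActorNode guardian = new ActorNode("", null);
        ActorNode a = guardian.child("Foo").child("A");
        a.child("B");
        a.child("D");
        System.out.println(ActorNode.resolve(guardian, "/Foo/A/D").path()); // prints /Foo/A/D
    }
}
```

Each lookup touches only one level of the tree at a time, which is why a parent can own and supervise everything below its own path.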
Send Message

// Send the message
greeter.tell(new Greeting("Charlie Parker"), null);

Full example

public class Greeting implements Serializable {
  public final String who;
  public Greeting(String who) { this.who = who; }
}

public class GreetingActor extends UntypedActor {
  LoggingAdapter log = Logging.getLogger(getContext().system(), this);
  int counter = 0;

  public void onReceive(Object message) {
    if (message instanceof Greeting) {
      counter++;
      log.info("Hello #" + counter + " " + ((Greeting) message).who);
    }
  }
}

ActorSystem system = ActorSystem.create("MySystem");
ActorRef greeter = system.actorOf(new Props(GreetingActor.class), "greeter");
greeter.tell(new Greeting("Charlie Parker"), null);

Distributable by Design

[Diagram: Error Kernel, with the actor hierarchy spread across Node 1 and Node 2]

Create Actor
ActorRef greeter = system.actorOf(new Props(GreetingActor.class), "greeter");

Remote Deployment

Just feed the ActorSystem with this configuration:

akka {
  actor {
    # Configure a Remote Provider
    provider = "akka.remote.RemoteActorRefProvider"
    deployment {
      # For the Greeter actor
      /greeter {
        # Define Remote Path: protocol, actor system, hostname, port
        # (quoted, because an unquoted // starts a comment in the config syntax)
        remote = "akka://MySystem@machine1:2552"
      }
    }
  }
}

Zero code changes

Remote Lookup
ActorRef greeter = system.actorFor("akka://MySystem@machine1:2552/user/greeter");

Can you see the problem?
Fixed Addresses
ActorRef greeter = system.actorFor("akka://MySystem@machine1:2552/user/greeter");

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    deployment {
      /greeter {
        remote = "akka://MySystem@machine1:2552"
      }
    }
  }
}

Akka Cluster
Akka Cluster 2.1
• Gossip-based Cluster Membership
• Failure Detector
• Cluster DeathWatch
• Cluster-Aware Routers
Cluster is an experimental preview in 2.1

Cluster Membership
• Node ring à la Riak / Dynamo
• Gossip protocol for state dissemination
• Vector Clocks to detect convergence

Node ring with gossiping members
[Diagram: ten Member Nodes arranged in a ring, exchanging gossip]

Vector Clock
• Vector Clocks are used to:
  - Generate a partial ordering of events in a distributed system
  - Detect causality violations
• We use Vector Clocks to reconcile and merge differences in cluster state

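To make the partial ordering concrete, here is a minimal vector clock sketch. It is an illustration only: SimpleVectorClock and its method names are assumptions made for this example and are not Akka's internal implementation. Two clocks are CONCURRENT when each has seen an event the other has not, which is exactly the case where cluster state has to be merged.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative vector clock; not Akka's internal VectorClock class.
final class SimpleVectorClock {
    enum Order { BEFORE, AFTER, SAME, CONCURRENT }

    private final Map<String, Long> versions = new HashMap<>();

    // Record a local event on the given node by incrementing that node's counter.
    SimpleVectorClock tick(String node) {
        SimpleVectorClock next = copy();
        next.versions.merge(node, 1L, Long::sum);
        return next;
    }

    // Merge two clocks by taking the per-node maximum.
    SimpleVectorClock merge(SimpleVectorClock other) {
        SimpleVectorClock merged = copy();
        other.versions.forEach((node, v) -> merged.versions.merge(node, v, Long::max));
        return merged;
    }

    // Partial order: CONCURRENT means neither clock has seen all of the other's events.
    Order compare(SimpleVectorClock other) {
        Set<String> nodes = new HashSet<>(versions.keySet());
        nodes.addAll(other.versions.keySet());
        boolean less = false;
        boolean greater = false;
        for (String node : nodes) {
            long mine = versions.getOrDefault(node, 0L);
            long theirs = other.versions.getOrDefault(node, 0L);
            if (mine < theirs) less = true;
            if (mine > theirs) greater = true;
        }
        if (less && greater) return Order.CONCURRENT;
        if (less) return Order.BEFORE;
        if (greater) return Order.AFTER;
        return Order.SAME;
    }

    private SimpleVectorClock copy() {
        SimpleVectorClock c = new SimpleVectorClock();
        c.versions.putAll(versions);
        return c;
    }
}
```

For example, if two nodes both start from the same clock and each records one local event, compare reports CONCURRENT, and merging in either direction yields clocks that compare as SAME.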
Gossiping Protocol
Used for:
– Cluster Membership
– Configuration data
– Leader Determination
– Partitioning data
– Naming Service

Push/Pull Gossip
(future optimization in 2.3+)
• Push
  – sender only sends versions (Vector Clock)
• Pull
  – receiver only asks for information for which it has an outdated version
• Partly biased
  – send fraction of gossip to nodes with older state

Cluster Convergence
• When each Node has seen the same Vector Clock
• unreachable nodes will fail this
• mark nodes DOWN to proceed
  – manual Ops intervention
  – automatic action

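The convergence rule can be sketched as a simple check. This is a simplified model under stated assumptions: MembershipView and its methods are hypothetical names, and the sketch ignores that in a real cluster marking a node DOWN is itself a state change that has to be gossiped. It only shows why one unreachable node blocks convergence until it is marked DOWN.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative model of one membership view; names are assumptions, not Akka's API.
final class MembershipView {
    private final Map<String, Boolean> downByNode = new HashMap<>(); // node -> marked DOWN?
    private final Set<String> seenBy = new HashSet<>();              // nodes that have seen the current version

    void addMember(String node) { downByNode.put(node, false); }

    // A node acknowledges, via gossip, that it has seen the current version.
    void markSeen(String node) { seenBy.add(node); }

    // Manual Ops intervention or an automatic action marks a node DOWN.
    void markDown(String node) { downByNode.put(node, true); }

    // Converged when every member that is not DOWN has seen the current version.
    boolean converged() {
        for (Map.Entry<String, Boolean> entry : downByNode.entrySet()) {
            boolean down = entry.getValue();
            if (!down && !seenBy.contains(entry.getKey())) return false;
        }
        return true;
    }
}
```

With members a, b and c where only a and b have acknowledged the current version, converged() stays false until c either acknowledges it or is marked DOWN.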
Member States
• JOINING
• UP
• LEAVING
• EXITING
• DOWN
• REMOVED

Leader
• Any node can be the leader
• Just takes the role of being a leader
• Is deterministically recognized by all nodes
  – always the first member in the sorted membership ring

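Because the leader is just the first member of a sorted set, no election protocol is needed once the membership has converged. A minimal sketch, assuming members are identified by plain address strings and sorted lexicographically (the actual ordering used by Akka is not shown in the slides, and LeaderSketch is a hypothetical name):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.TreeSet;

final class LeaderSketch {
    // Every node sorts the same membership set the same way, so all nodes pick the same first element.
    static Optional<String> leaderOf(Collection<String> memberAddresses) {
        TreeSet<String> ring = new TreeSet<>(memberAddresses);
        return ring.isEmpty() ? Optional.empty() : Optional.of(ring.first());
    }

    public static void main(String[] args) {
        // The order in which a node learned about the members does not matter.
        System.out.println(leaderOf(Arrays.asList("host3:2552", "host1:2551", "host2:2551")));
    }
}
```

The design consequence is that leadership changes simply by a member joining or leaving the front of the ring; the leader holds no special state of its own.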
Cluster Events

public class Listener extends UntypedActor {
  public void onReceive(Object message) {
    if (message instanceof MemberUp) {
      MemberUp mUp = (MemberUp) message;
      getContext().actorFor(mUp.address() + "/user/greeter").tell(
        new Greeting("Charlie Parker"), getSelf());
    }
  }
}

ActorRef listener = system.actorOf(new Props(Listener.class), "listener");

Cluster.get(system).subscribe(listener, MemberEvent.class);

Phi Accrual Failure Detector
• B monitors A
• Sample inter-arrival time to expect next beat
• B measures continuum of deadness of A
[Diagram: A sends regular messages to B]
http://ddg.jaist.ac.jp/pub/HDY+04.pdf

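The "continuum of deadness" is a number, phi, that grows the longer the next heartbeat is overdue. The sketch below is a deliberate simplification and not Akka's detector: it assumes exponentially distributed inter-arrival times (the referenced paper models them with a normal distribution), so that phi reduces to elapsed / (mean * ln 10). SimplePhiDetector and its parameters are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified phi accrual detector run by B to monitor A (exponential model, an assumption).
final class SimplePhiDetector {
    private final Deque<Long> intervals = new ArrayDeque<>();
    private final int maxSamples;
    private long lastHeartbeatMillis = -1;

    SimplePhiDetector(int maxSamples) { this.maxSamples = maxSamples; }

    // Called whenever a heartbeat from A arrives; samples the inter-arrival time.
    void heartbeat(long nowMillis) {
        if (lastHeartbeatMillis >= 0) {
            intervals.addLast(nowMillis - lastHeartbeatMillis);
            if (intervals.size() > maxSamples) intervals.removeFirst();
        }
        lastHeartbeatMillis = nowMillis;
    }

    // phi = -log10(P(next heartbeat arrives later than now)).
    // With P = exp(-elapsed / mean) this becomes elapsed / (mean * ln 10).
    double phi(long nowMillis) {
        if (intervals.isEmpty()) return 0.0; // not enough samples to judge yet
        double mean = intervals.stream().mapToLong(Long::longValue).average().orElse(1.0);
        mean = Math.max(mean, 1.0); // avoid dividing by zero for back-to-back heartbeats
        double elapsed = nowMillis - lastHeartbeatMillis;
        return elapsed / (mean * Math.log(10.0));
    }
}
```

A caller would compare phi against a threshold of its own choosing: a low threshold reacts quickly but risks false suspicions, a high one is more conservative.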
Selective Failure Detection
[Diagram: ring of ten Member Nodes, with Heartbeat links drawn between some of them]

Cluster DeathWatch
• Triggered by marking node «A» DOWN
  – Tell parents of their lost children on «A»
  – Kill all children of actors on «A»
  – Send Terminated for actors on «A»

Enable clustering

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
    ...
  }
  extensions = ["akka.cluster.Cluster"]
  cluster {
    seed-nodes = [
      "akka://[email protected]:2551",
      "akka://[email protected]:2552"
    ]
  }
}

Load Balancing

Routers

ActorRef routerActor = getContext().actorOf(
  new Props(ExampleActor.class).withRouter(new RoundRobinRouter(nrOfInstances)));

…or from config
akka.actor.deployment {
  /path/to/actor {
    router = round-robin
    nr-of-instances = 5
  }
}

Configure a clustered router
akka.actor.deployment {
  /statsService/workerRouter {
    router = consistent-hashing
    nr-of-instances = 100

    cluster {
      enabled = on
      max-nr-of-instances-per-node = 3
      allow-local-routees = on
    }
  }
}

Multi Node Testing

object MultiNodeSampleConfig extends MultiNodeConfig {
  val node1 = role("node1")
  val node2 = role("node2")
}

"A MultiNodeSample" must {

  "wait for all nodes to enter a barrier" in {
    enterBarrier("startup")
  }

}

Multi Node Testing

runOn(node2) {
  system.actorOf(Props(new Actor {
    def receive = {
      case "ping" => sender ! "pong"
    }
  }), "ponger")
}

enterBarrier("deployed")

runOn(node1) {
  val ponger = system.actorFor(node(node2) / "user" / "ponger")
  ponger ! "ping"
  expectMsg("pong")
}

enterBarrier("finished")

… when the Cluster grows up
Adaptive Load Balancing
• Metrics collected and spread
  – Heap memory
  – CPU, system load
• Adaptive Router
  – Biased random with weights based on capacity

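"Biased random with weights based on capacity" can be illustrated with a plain weighted random pick. This is a sketch under assumptions: WeightedNodeSelector is a hypothetical class, and capacity is taken to be any non-negative number derived from the gossiped metrics (for instance the fraction of free heap); how Akka actually combines metrics into weights is not covered in the slides.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

// Illustrative weighted random selection; not Akka's adaptive router.
final class WeightedNodeSelector {
    private final Map<String, Double> capacityByNode = new LinkedHashMap<>();
    private final Random random;

    WeightedNodeSelector(Random random) { this.random = random; }

    // Update a node's weight from gossiped metrics; negative values are clamped to zero.
    void updateCapacity(String node, double capacity) {
        capacityByNode.put(node, Math.max(0.0, capacity));
    }

    // Pick a node with probability proportional to its capacity; null if no capacity is known.
    String pick() {
        double total = 0.0;
        for (double c : capacityByNode.values()) total += c;
        if (total <= 0.0) return null;
        double point = random.nextDouble() * total;
        String last = null;
        for (Map.Entry<String, Double> entry : capacityByNode.entrySet()) {
            if (entry.getValue() <= 0.0) continue; // nodes without spare capacity are never picked
            last = entry.getKey();
            point -= entry.getValue();
            if (point < 0.0) return last;
        }
        return last; // guards against floating-point rounding at the upper edge
    }
}
```

A node with three times the capacity of another receives roughly three times as many messages, while an overloaded node with zero capacity receives none until its metrics recover.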
One tree to rule them all
• One Actor tree per node
• Cluster tree is mapped to local sub-trees

The Magic Sauce
• User code only sees cluster://... names
• ActorRef becomes repointable
  – local
  – remote
• Can now move actors around transparently
  – Actor encapsulation makes it possible

What does this enable?
• Actor migration
• Actor replication
• Automatic cluster partitioning
  – later also based on runtime metrics
• Node fail-over
  – first for stateless actors
  – later for stateful actors using event sourcing
➾ Fault Tolerance & Distribution

Typed Channels
Experimental in 2.2

The Problem

trait Command
case class CommandOne(param: String) extends Command

someActor ! CommandOne

The Vision

someActor <-!- CommandOne("msg")

because the other does not compile

But How?
• ActorRef must know about message types
  – Actor type must be parameterized
• Message type is verified against that

And the replies?

val f: Future[Response] =
  someActor <-?- CommandOne("hello")

because the compiler knows

And How This?
• ActorRef must know reply types
  – Actor must be parameterized with them
• Reply types are extracted at call site

No Type Pollution
• Generic Filter/Transform Actors
  – accept management commands
  – pass on generic other type
• Using just one type is not enough!
• Need to use type unions and allow multiple possible reply types for one input

The Implementation
• Tagged type union with :+:[(In, Out), ChannelList] <: ChannelList
• Value class ChannelRef[…](val a: ActorRef)
• Actor mixin Channels[…]
• WrappedMessage[…, LUB](val m: LUB)
• ops desugar to tell/ask after type check
Actors Do Compose

msg -?-> firstActor -?-> secondActor -!-> client

msg -?-> someService -*-> (_ map httpOk) -!-> client

Process wiring from the outside

How to Declare it?

class OpinionatedEcho extends Actor
    with Channels[TNil, (String, String) :+: TNil] {

  channel[String] { (str, sender) ⇒ sender <-!- str }
  // or
  channel[String] {
    case ("hello", sender) ⇒ sender <-!- "world"
    case (x, sender)       ⇒ sender <-!- s"dunno: $x"
  }
}

"sender" will accept only String messages

The Result:
Type-Safe Composability of Actor Systems

get it and learn more
http://akka.io
http://typesafe.com
http://letitcrash.com