Activator and Reactive at Play NYC Meetup
DESCRIPTION
Slides from the presentation given at the PlayNYC and GiltTech meetup in New York on the 24th of September.
TRANSCRIPT
Henrik Engstrom Software Engineer
@h3nk3
What Constitutes a Reactive Application
Reactive Patterns Applied
Activator UI (JavaScript, Knockout, CSS3, black magic…)
Play Application (Play 2.3.x, Akka 2.3.x, sbt 0.13.x)
sbt Server
websockets
sbt protocol
Async Communication: Websockets, Akka Actors
Other Clients (Eclipse, Terminal, etc.)
DEMO
Reactive Applications
The Four Reactive Traits
http://reactivemanifesto.org/
Message-Driven
The Foundation of being Reactive
Akka Actors in 5 minutes or so…
ACTORS
import akka.actor._

class GreetingActor extends Actor with ActorLogging {
  import GreetingActor._

  def receive = {
    case User(name) => log.info(s"Hello there ${name}")
  }
}

object GreetingActor {
  case class User(name: String)
  def props: Props = Props(classOf[GreetingActor])
}
MAILBOXES
THREADS
STATE
CONTAINERS
val system = ActorSystem("MySystem")
val greeter: ActorRef = system.actorOf(GreetingActor.props, "greeter")
MESSAGES
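greeter ! GreetingActor.User("PlayNYC")
// or, passing the sender explicitly (tell takes the message and a sender in Akka 2.3.x):
greeter.tell(GreetingActor.User("PlayNYC"), ActorRef.noSender)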
import akka.actor._

object Sample extends App {
  val system = ActorSystem("MySystem")
  val greeter = system.actorOf(GreetingActor.props, "greeter")
  greeter ! GreetingActor.User("PlayNYC")
  Thread.sleep(1000) // wait a little before shutting down
  system.shutdown()
}

class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case GreetingActor.User(name) => log.info(s"Hello there ${name}")
  }
}

object GreetingActor {
  case class User(name: String)
  def props: Props = Props(classOf[GreetingActor])
}
he-mbp:sample he$ scalac Sample.scala
he-mbp:sample he$ scala Sample
[INFO] [09/15/2014 16:57:19.879] [MySystem-akka.actor.default-dispatcher-4] [akka://MySystem/user/greeter] Hello there PlayNYC
he-mbp:sample he$
Resilient
Responsive in the Face of Failure
SUPERVISION
import akka.actor.{Actor, ActorLogging, OneForOneStrategy}
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

class SupervisorActor extends Actor with ActorLogging {
  // Decide per exception type how a failing child is handled
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: SomeDbException      => Resume
      case _: SomeOtherDbException => Restart
      case _: Exception            => Escalate
    }

  val dbActor = context.actorOf(DbActor.props)

  def receive = {
    case op: SomeDbOperation => dbActor forward op
  }
}
BULKHEAD
CIRCUIT BREAKER
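The slide names the pattern without showing code. As a rough illustration only, here is a minimal stand-alone sketch in plain Scala of what a circuit breaker does: after a number of consecutive failures it stops calling the protected resource and fails fast, then lets a call through again once a reset timeout has passed. The class `SimpleCircuitBreaker` and its parameters are hypothetical names invented for this sketch; it is not the speaker's code and not Akka's own `akka.pattern.CircuitBreaker`, which would be the natural choice in an Akka application.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical, simplified circuit breaker for illustration.
// `now` is injectable so the timeout can be exercised without sleeping.
final class SimpleCircuitBreaker(
    maxFailures: Int,
    resetTimeoutMillis: Long,
    now: () => Long = () => System.currentTimeMillis()) {

  private var failures = 0
  private var openedAt: Option[Long] = None

  // Open means: the failure limit was hit and the reset timeout has not yet elapsed.
  def isOpen: Boolean = synchronized {
    openedAt.exists(t => now() - t < resetTimeoutMillis)
  }

  // Runs `body` unless the breaker is open, in which case it fails fast
  // without evaluating `body` at all.
  def call[A](body: => A): Try[A] = synchronized {
    if (isOpen) Failure(new IllegalStateException("circuit open: failing fast"))
    else {
      val result = Try(body)
      result match {
        case Success(_) =>
          failures = 0
          openedAt = None
        case Failure(_) =>
          failures += 1
          // At or above the limit: (re)open and restart the reset timer.
          if (failures >= maxFailures) openedAt = Some(now())
      }
      result
    }
  }
}
```

Once the reset timeout has elapsed, the next call is attempted: a success closes the breaker, a failure opens it again. That gives the protected resource (a database, for example) time to recover instead of being hit by every incoming request.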
Elastic
Responsive in the Face of Changing Load
Proxy
Play App 1, Play App 2, … Play App n
SOME CHALLENGES
• Each user has state (location)
• Horizontal scaling => sticky sessions
• How do we route to the right node?
• Node crashes
• Proxy is a SPOF
Play App
Akka Cluster Node (×4)

Akka Cluster Node (×4)
Shard Coord.
Shard Region (×4)
Supported HTTP commands (Play routes file)

GET   /                      controller.Application.index
PUT   /create/user/:userId   c.Application.createUser(userId: String)
POST  /update/user/:userId   c.Application.updateUser(userId: String)
package controller

import play.api.mvc._

object Application extends Controller {
  val userController = AppGlobal.system.actorOf(UserController.props)

  def index = Action {
    Ok("Node is up")
  }

  def createUser(userId: String) = Action {
    userController ! UserController.CreateUser(userId)
    Ok("new user is created")
  }

  // The request must be bound explicitly to read its query string
  def updateUser(userId: String) = Action { request =>
    userController ! UserController.UpdateUser(userId, request.getQueryString("msg"))
    Ok("user is updated")
  }
}
object AppGlobal extends GlobalSettings {
  private[this] var _system: Option[ActorSystem] = None
  def system = _system.getOrElse(throw new RuntimeException("…"))

  override def onStart(app: Application) = {
    _system = Some(ActorSystem("ClusterSystem"))
    _system.foreach(createShard(_))
  }

  override def onStop(app: Application) = {
    // Use the Option directly so stopping is safe even if start never ran
    _system.foreach(_.shutdown())
  }

  private def createShard(system: ActorSystem) = {
    ClusterSharding(system).start(
      typeName = User.shardName,
      entryProps = Some(User.props),
      idExtractor = User.idExtractor,
      shardResolver = User.shardResolver
    )
  }
}
object User {
  val shardName = "users"
  def props: Props = Props(classOf[User])

  trait UserMessage { def userId: String }
  case class CreateUser(userId: String) extends UserMessage
  case class UpdateUser(userId: String, text: String) extends UserMessage

  // Entry id and the message to deliver to that entry
  val idExtractor: ShardRegion.IdExtractor = {
    case um: UserMessage => (um.userId, um)
  }

  // Shard id derived from the user id
  val shardResolver: ShardRegion.ShardResolver = {
    case um: UserMessage => (Math.abs(um.userId.hashCode) % 20).toString
  }
}
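The shard resolver on this slide turns a user ID into a shard name with plain integer arithmetic, so the same user always lands in the same shard. The following stand-alone sketch, with no Akka dependency, reproduces that arithmetic. `ShardMath` and its method names are hypothetical, introduced only for this illustration. It also shows one edge case worth knowing about: `Math.abs(Int.MinValue)` is still negative, so the slide's expression can produce a negative shard name for a hash of exactly that value. The second method is one possible way to keep the result within 0 to 19.

```scala
// Hypothetical helper for illustration; mirrors the arithmetic of the slide's shardResolver.
object ShardMath {
  val NumberOfShards = 20 // the modulus used on the slide

  // Same expression as on the slide, applied to a user ID.
  def slideShardId(userId: String): String =
    (Math.abs(userId.hashCode) % NumberOfShards).toString

  // Variant that stays within 0 until NumberOfShards for every Int hash,
  // including Int.MinValue, whose absolute value overflows.
  def nonNegativeShardId(hash: Int): String = {
    val m = hash % NumberOfShards
    (if (m < 0) m + NumberOfShards else m).toString
  }
}
```

Either way, the mapping is deterministic, which is what lets any node route a message for a given user to the node currently hosting that user's shard.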
class User extends Actor with ActorLogging {
  import User._ // brings CreateUser and UpdateUser into scope

  var currentMessage: Option[String] = None

  def receive = {
    case CreateUser(userId) =>
      currentMessage = Some(s"User created: ${userId}")
    case UpdateUser(userId, text) =>
      log.info(s"User ${userId} now has msg ${text}")
      currentMessage = Some(text)
  }
}
akka {
  actor.provider = "akka.cluster.ClusterActorRefProvider"

  remote.netty.tcp {
    port = 0
    hostname = "127.0.0.1"
  }

  remote.log-remote-lifecycle-events = on

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"
    ]
    auto-down-unreachable-after = 30s
  }
}
Play App
Akka Cluster Node (×4)
Responsive
Always Available - Interactive - (near) Real-Time
package controllers

import play.api._
import play.api.mvc._
import play.api.libs.json._
import play.api.Play.current
import actors.WebSocketActor

object Application extends Controller {
  def index = Action {
    Ok(views.html.index("Your new application is ready."))
  }

  def socket = WebSocket.acceptWithActor[JsValue, JsValue] { request => out =>
    WebSocketActor.props(out)
  }
}
import akka.actor._
import play.api.libs.json._
import scala.concurrent.duration._

object WebSocketActor {
  def props(out: ActorRef) = Props(new WebSocketActor(out))
  case object Again
}

class WebSocketActor(out: ActorRef) extends Actor {
  import WebSocketActor._
  import context.dispatcher // execution context required by the scheduler

  var user: Option[String] = None

  def receive = {
    case json: JsValue =>
      val name = (json \ "name").as[String]
      user = Some(name)
      reply(name)
      // Send the greeting once more after three seconds
      context.system.scheduler.scheduleOnce(3.seconds, self, Again)
    case Again =>
      user.foreach(reply(_))
  }

  def reply(name: String) = {
    out ! Json.obj("message" -> s"Hello there ${name}")
  }
}
# routes file
GET   /         controllers.Application.index
GET   /socket   controllers.Application.socket

> var ws = new WebSocket("ws://localhost:9000/socket");
> ws.onmessage = function(msg) { console.log(">", msg); };
> ws.send(JSON.stringify({"name": "PlayNYC"}));

> MessageEvent {ports: Array[0], data: "{"message":"Hello there PlayNYC"}", source: null, lastEventId: "", origin: "ws://localhost:9000"…}

> MessageEvent {ports: Array[0], data: "{"message":"Hello there PlayNYC"}", source: null, lastEventId: "", origin: "ws://localhost:9000"…}
import play.api.libs.json._
import play.api.mvc.WebSocket.FrameFormatter

implicit val inEventFormat = Json.format[InEvent]
implicit val outEventFormat = Json.format[OutEvent]

// Play provides default FrameFormatter for String or JsValue
implicit val inEFF = FrameFormatter.jsonFrame[InEvent]
implicit val outEFF = FrameFormatter.jsonFrame[OutEvent]

def socket = WebSocket.acceptWithActor[InEvent, OutEvent] { request => out =>
  WebSocketActor.props(out)
}
CoordActor
TwActor, FBActor, WActor
Twitter, Facebook, weather.com
Entry Actor
Request, Response
RESOURCES

Activator - get it now!
http://typesafe.com/platform/getstarted

Awesome Example Code by Nilanjan Raychaudhuri
https://github.com/nraychaudhuri/scaladays2014/tree/master/play-akka-sharding

Play Websockets
https://www.playframework.com/documentation/2.3.x/ScalaWebSockets

Akka Cluster Sharding
http://doc.akka.io/docs/akka/2.3.6/contrib/cluster-sharding.html
Reactive Applications
The Four Reactive Traits
http://reactivemanifesto.org/
©Typesafe 2014 – All Rights Reserved All images in this presentation are from www.morguefile.com