Data in Motion: Streaming Static Data Efficiently
![Page 1: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/1.jpg)
MANCHESTER LONDON NEW YORK
![Page 2: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/2.jpg)
Martin Zapletal @zapletal_martin #ScalaDays
Data in Motion: Streaming Static Data Efficiently in Akka Persistence (and elsewhere)
@cakesolutions
![Page 3: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/3.jpg)
Data at scale
● Reactive
● Real time, asynchronous and message driven
● Elastic and scalable
● Resilient and fault tolerant
![Page 4: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/4.jpg)
Streams
![Page 5: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/5.jpg)
Akka Persistence
[Diagram: numbered elements 1 to 5 and the events persistence_id1, event 1 to event 4]
![Page 6: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/6.jpg)
class AccountActor(protected[this] val passivationTimeout: Duration) extends PersistentActor {

  override val persistenceId: String = extractId(self.path.name)

  override def receiveCommand: Receive = active(initialState)

  private def active(balance: State): Receive = {
    case command: AccountCommand => command match {
      case cmd: UpdateBalanceCommand =>
        cmd.validate().fold({ balanceUpdated =>
          persist(balanceUpdated) { persisted =>
            val updatedState = balance.update(persisted)
            sender() ! updatedState
            context.become(active(updatedState))
          }
        }, processValidationErrors)

      ...
    }
  }
}
![Page 11: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/11.jpg)
case cmd: UpdateGroupBalanceCommand =>
  cmd.validate().fold({ groupBalanceUpdated =>
    persist(Tagged(groupBalanceUpdated, Set("tag1"))) { persisted =>
      sender() ! groupBalanceUpdated
    }
  }, processValidationErrors)
![Page 13: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/13.jpg)
override def receiveRecover: Receive = {
  var state: State = initialState

  {
    case balanceUpdated: BalanceUpdatedEvent =>
      state = state.update(balanceUpdated)

    case RecoveryCompleted =>
      context.become(active(state))
  }
}
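The recovery handler above rebuilds the actor state by applying every stored event in order. Stripped of the actor machinery, that is a left fold over the journal. The sketch below uses plain Scala collections; the `State` and `BalanceUpdatedEvent` definitions are simplified stand-ins, and the rule that an update adds the event value to the balance is an assumption made for illustration only.

```scala
// Hypothetical, simplified stand-ins for the types used on the slide.
final case class BalanceUpdatedEvent(value: Double)

final case class State(balance: Double) {
  // Assumption: an update adds the event's value to the current balance.
  def update(event: BalanceUpdatedEvent): State = copy(balance = balance + event.value)
}

object RecoverySketch {
  val initialState: State = State(0.0)

  // Replaying the journal is a left fold of the stored events over the initial state.
  def recover(journal: Seq[BalanceUpdatedEvent]): State =
    journal.foldLeft(initialState)((state, event) => state.update(event))

  def main(args: Array[String]): Unit = {
    val journal = Seq(BalanceUpdatedEvent(10.0), BalanceUpdatedEvent(-2.5), BalanceUpdatedEvent(5.0))
    println(recover(journal)) // prints State(12.5)
  }
}
```

Because the fold is deterministic, replaying the same events always yields the same state, which is what lets the actor resume after a restart.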
![Page 15: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/15.jpg)
Log data structure

| Offset | Inserted value |
|---|---|
| 0 | 0 |
| 1 | 5 |
| 2 | 10 |
| 3 | 1 |
| 4 | 5 |
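A log in this sense is an append-only sequence in which each entry is addressed by its offset. The following is a minimal in-memory sketch, not the storage layout of any particular journal; the `Log` class and its `append` and `readFrom` methods are hypothetical names chosen for the example.

```scala
// A minimal append-only log: entries are only ever added at the end,
// and each entry is addressed by its offset (its position in the log).
final case class Log[A](entries: Vector[A] = Vector.empty[A]) {
  // Appending returns the new log together with the offset assigned to the value.
  def append(value: A): (Log[A], Long) =
    (copy(entries = entries :+ value), entries.size.toLong)

  // A reader pulls everything from a given offset onwards.
  def readFrom(offset: Long): Vector[A] = entries.drop(offset.toInt)
}

object LogSketch {
  // The values from the slide, inserted in order.
  val log: Log[Int] =
    Vector(0, 5, 10, 1, 5).foldLeft(Log[Int]()) { (l, v) => l.append(v)._1 }

  def main(args: Array[String]): Unit = {
    println(log.readFrom(0)) // the whole log, in insertion order
    println(log.readFrom(2)) // only the entries at offsets 2, 3 and 4
  }
}
```

A reader only needs to remember the last offset it has seen in order to resume, which is the property the pull-based queries later in the deck rely on.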
![Page 16: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/16.jpg)
Akka Persistence Cassandra
● Purely pull
● Event (log) data

| persistence_id | partition_nr | events |
|---|---|---|
| 0 | 0 | event 0, event 1, event 2 |
| 0 | 1 | event 100, event 101, event 102 |
| 1 | 0 | event 0, event 1, event 2 |
![Page 17: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/17.jpg)
Akka Persistence Query
● eventsByPersistenceId, allPersistenceIds, eventsByTag
[Diagram: numbered elements 1 to 5 and the events persistence_id1, event 1 to event 4]
![Page 18: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/18.jpg)
implicit val system: ActorSystem = ...
implicit val materializer: Materializer = ...

lazy val queries: CassandraReadJournal =
  PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal]("cassandra-query-journal")

queries
  .eventsByPersistenceId(persistenceId, 0, Long.MaxValue)
  .runForeach(println)
![Page 21: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/21.jpg)
EventEnvelope(1,persistenceId,1,GroupBalanceUpdatedEvent(9248.0))
EventEnvelope(2,persistenceId,2,BalanceUpdatedEvent(4355.0))
EventEnvelope(3,persistenceId,3,BalanceUpdatedEvent(5245.0))
EventEnvelope(4,persistenceId,4,BalanceUpdatedEvent(4631.0))
EventEnvelope(5,persistenceId,5,BalanceUpdatedEvent(973.0))
...
![Page 22: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/22.jpg)
implicit val system: ActorSystem = ...
implicit val materializer: Materializer = ...

lazy val queries: CassandraReadJournal =
  PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal]("cassandra-query-journal")

queries
  .allPersistenceIds()
  .runForeach(println)
![Page 24: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/24.jpg)
persistenceId5
persistenceId2
persistenceId4
persistenceId1
persistenceId4
...
![Page 25: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/25.jpg)
implicit val system: ActorSystem = ...
implicit val materializer: Materializer = ...

lazy val queries: CassandraReadJournal =
  PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal]("cassandra-query-journal")

queries
  .eventsByTag("tag1", 0)
  .runForeach(println)
![Page 27: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/27.jpg)
implicit val system: ActorSystem = ...
implicit val materializer: Materializer = ...

lazy val queries: CassandraReadJournal =
  PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal]("cassandra-query-journal")

val transform = Flow[EventEnvelope]
  .collect { case EventEnvelope(_, _, _, BalanceUpdatedEvent(value)) => value }
  .scan(new CircularFifoQueue[Double](5)) { (s, d) => s.add(d); s }

val g = RunnableGraph.fromGraph {
  GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import akka.stream.scaladsl.GraphDSL.Implicits._

    queries.eventsByPersistenceId(persistenceId, 0, Long.MaxValue) ~> transform ~> kafkaSink
    ClosedShape
  }
}

g.run()
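The `scan` stage above keeps a window of the five most recent balances. Note that the slide mutates one `CircularFifoQueue` in place, so every element emitted downstream is the same queue instance. The sketch below shows the same sliding-window idea with immutable values and plain Scala collections instead of a stream; `push` and `windows` are hypothetical helper names, and `scanLeft` stands in for the stream's `scan`.

```scala
object SlidingWindowSketch {
  // Keep at most `size` most recent values; returns a new immutable window each time.
  def push(window: Vector[Double], value: Double, size: Int): Vector[Double] =
    (window :+ value).takeRight(size)

  // scanLeft emits the initial accumulator and then the accumulator after every element.
  def windows(values: Seq[Double], size: Int): Seq[Vector[Double]] =
    values.scanLeft(Vector.empty[Double])((w, v) => push(w, v, size))

  def main(args: Array[String]): Unit =
    windows(Seq(1.0, 2.0, 3.0), size = 2).foreach(println)
}
```

With immutable windows each emitted element is a stable snapshot, which matters if a downstream stage buffers or processes elements asynchronously.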
![Page 30: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/30.jpg)
public class AccountEntity extends PersistentEntity<AccountCommand, AccountEvent, State> {

  @Override
  public Behavior initialBehavior(Optional<State> snapshotState) {

    BehaviorBuilder b = newBehaviorBuilder(snapshotState.orElse(initialState));

    b.setCommandHandler(UpdateBalanceCommand.class, (cmd, ctx) -> {
      if (!validate(cmd)) {
        ctx.invalidCommand("...");
        return ctx.done();
      } else {
        return ctx.thenPersist(
          new BalanceUpdatedEvent(cmd.value),
          () -> ctx.reply(Done.getInstance()));
      }
    });

    b.setEventHandler(BalanceUpdatedEvent.class, evt -> state.update(evt));

    return b.build();
  }
}
![Page 33: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/33.jpg)
public class AccountEventProcessor extends CassandraReadSideProcessor<AccountEvent> {
  AccountEventProcessor state = ...

  @Override
  public AggregateEventTag<AccountEvent> aggregateTag() {
    return Tag1.INSTANCE;
  }

  @Override
  public CompletionStage<Optional<UUID>> prepare(CassandraSession session) {
    return prepareCreateTables(session).thenCompose(a -> … // Prepare tables, statements, etc.
  }

  @Override
  public EventHandlers defineEventHandlers(EventHandlersBuilder builder) {
    builder.setEventHandler(AccountEvent.class, this::processAccountEvent);
    return builder.build();
  }

  private CompletionStage<List<BoundStatement>> processAccountEvent(AccountEvent event, UUID offset) {
    BoundStatement bindWriteAnalytics = writeAnalytics.bind();
    // Values are set on the bound statement, not on the prepared statement.
    bindWriteAnalytics.setString("entity_id", event.id);
    ...
    return completedStatements(Arrays.asList(bindWriteAnalytics));
  }
}
![Page 36: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/36.jpg)
Streaming static data
● Turning database into a stream
![Page 37: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/37.jpg)
Pulling data from a log
[Diagram sequence: a log holding the values 0, 5 and 10, later extended with 15, and the values pulled from it]
![Page 43: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/43.jpg)
Actor publisher

private[query] abstract class QueryActorPublisher[MessageType, State: ClassTag](
    refreshInterval: Option[FiniteDuration])
  extends ActorPublisher[MessageType] {

  protected def initialState: Future[State]
  protected def initialQuery(initialState: State): Future[Action]
  protected def requestNext(state: State, resultSet: ResultSet): Future[Action]
  protected def requestNextFinished(state: State, resultSet: ResultSet): Future[Action]
  protected def updateState(state: State, row: Row): (Option[MessageType], State)
  protected def completionCondition(state: State): Boolean

  private[this] def nextBehavior(...): Receive = {
    if (shouldFetchMore(...)) {
      listenableFutureToFuture(resultSet.fetchMoreResults()).map(FetchedResultSet).pipeTo(self)
      awaiting(resultSet, state, finished)
    } else if (shouldIdle(...)) {
      idle(resultSet, state, finished)
    } else if (shouldComplete(...)) {
      onCompleteThenStop()
      Actor.emptyBehavior
    } else if (shouldRequestMore(...)) {
      if (finished) requestNextFinished(state, resultSet).pipeTo(self)
      else requestNext(state, resultSet).pipeTo(self)
      awaiting(resultSet, state, finished)
    } else {
      idle(resultSet, state, finished)
    }
  }
}
![Page 46: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/46.jpg)
[State machine diagram. Labels: initialQuery, initialNewResultSet, initialFinished, request, newResultSet, fetchedResultSet, finished, continue, shouldFetchMore, shouldIdle, shouldTerminate, shouldRequestMore, Cancel, SubscriptionTimeout]
● Red transitions: deliver buffer and update internal state (progress)
● Blue transitions: asynchronous database query
![Page 47: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/47.jpg)
Events by persistence id

SELECT * FROM ${tableName}
  WHERE persistence_id = ? AND partition_nr = ?
    AND sequence_nr >= ? AND sequence_nr <= ?

| persistence_id | partition_nr | events |
|---|---|---|
| 0 | 0 | event 0, event 1, event 2 |
| 0 | 1 | event 100, event 101, event 102 |
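Because the events of one persistence id are spread over several partitions, the query above has to be issued once per partition, moving to the next partition when the current one is exhausted. The sketch below imitates that with an in-memory map. The `Journal` type, the `Event` class and the stopping rule (partitions are numbered contiguously from 0, and reading stops at the first missing one) are assumptions for illustration, not the behaviour of the Cassandra plugin.

```scala
object EventsByPersistenceIdSketch {
  final case class Event(sequenceNr: Long, payload: String)

  // Hypothetical in-memory stand-in for the journal table,
  // keyed by (persistence_id, partition_nr) as in the diagram.
  type Journal = Map[(String, Long), Vector[Event]]

  // Mirrors the WHERE clause of the query for a single partition.
  def queryPartition(journal: Journal, persistenceId: String, partitionNr: Long,
                     from: Long, to: Long): Vector[Event] =
    journal.getOrElse((persistenceId, partitionNr), Vector.empty[Event])
      .filter(e => e.sequenceNr >= from && e.sequenceNr <= to)

  // Assumption: partitions are contiguous from 0, so stop at the first missing partition.
  def eventsByPersistenceId(journal: Journal, persistenceId: String,
                            from: Long, to: Long): Vector[Event] =
    Iterator.iterate(0L)(_ + 1)
      .takeWhile(pnr => journal.contains((persistenceId, pnr)))
      .flatMap(pnr => queryPartition(journal, persistenceId, pnr, from, to))
      .toVector

  // The layout from the slide.
  val journal: Journal = Map(
    ("0", 0L) -> Vector(Event(0L, "event 0"), Event(1L, "event 1"), Event(2L, "event 2")),
    ("0", 1L) -> Vector(Event(100L, "event 100"), Event(101L, "event 101"), Event(102L, "event 102")),
    ("1", 0L) -> Vector(Event(0L, "event 0"), Event(1L, "event 1"), Event(2L, "event 2"))
  )
}
```

For example, `eventsByPersistenceId(journal, "0", 1L, 101L)` reads partition 0 and then partition 1 and returns the events with sequence numbers 1, 2, 100 and 101.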
![Page 54: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/54.jpg)
private[query] class EventsByPersistenceIdPublisher(...)
  extends QueryActorPublisher[PersistentRepr, EventsByPersistenceIdState](...) {

  override protected def initialState: Future[EventsByPersistenceIdState] = {
    ...
    EventsByPersistenceIdState(initialFromSequenceNr, 0, currentPnr)
  }

  override protected def updateState(
      state: EventsByPersistenceIdState,
      row: Row): (Option[PersistentRepr], EventsByPersistenceIdState) = {
    val event = extractEvent(row)
    val partitionNr = row.getLong("partition_nr") + 1

    (Some(event), EventsByPersistenceIdState(event.sequenceNr + 1, state.count + 1, partitionNr))
  }
}
![Page 56: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/56.jpg)
All persistence ids

SELECT DISTINCT persistence_id, partition_nr FROM $tableName

| persistence_id | partition_nr | events |
|---|---|---|
| 0 | 0 | event 0, event 1, event 2 |
| 0 | 1 | event 100, event 101, event 102 |
| 1 | 0 | event 0, event 1, event 2 |
![Page 60: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/60.jpg)
private[query] class AllPersistenceIdsPublisher(...)
  extends QueryActorPublisher[String, AllPersistenceIdsState](...) {

  override protected def initialState: Future[AllPersistenceIdsState] =
    Future.successful(AllPersistenceIdsState(Set.empty))

  override protected def updateState(
      state: AllPersistenceIdsState,
      row: Row): (Option[String], AllPersistenceIdsState) = {

    val event = row.getString("persistence_id")

    if (state.knownPersistenceIds.contains(event)) {
      (None, state)
    } else {
      (Some(event), state.copy(knownPersistenceIds = state.knownPersistenceIds + event))
    }
  }
}
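The `updateState` function above returns an optional element to emit together with the next state, so a batch of rows can be processed by threading the state through a fold. The sketch below shows that with plain Scala collections. A `String` stands in for the driver's `Row`, and `IdsState` and `deliver` are hypothetical names. Deduplication is needed because a `DISTINCT` over `(persistence_id, partition_nr)` returns one row per partition, so an id that spans several partitions appears more than once.

```scala
object AllPersistenceIdsSketch {
  final case class IdsState(knownPersistenceIds: Set[String])

  // Same shape as updateState on the slide, with a plain String standing in for a Row.
  def updateState(state: IdsState, persistenceId: String): (Option[String], IdsState) =
    if (state.knownPersistenceIds.contains(persistenceId)) (None, state)
    else (Some(persistenceId),
          state.copy(knownPersistenceIds = state.knownPersistenceIds + persistenceId))

  // Thread the state through a batch of rows and collect only the emitted elements.
  def deliver(state: IdsState, rows: Seq[String]): (Vector[String], IdsState) =
    rows.foldLeft((Vector.empty[String], state)) { case ((out, s), row) =>
      val (emitted, next) = updateState(s, row)
      (out ++ emitted, next)
    }

  def main(args: Array[String]): Unit = {
    // Rows as returned for the partitions (0, 0), (0, 1) and (1, 0).
    val (first, state1) = deliver(IdsState(Set.empty), Seq("0", "0", "1"))
    println(first) // each id once
    // A later poll only emits ids that have not been seen before.
    val (second, _) = deliver(state1, Seq("1", "2"))
    println(second)
  }
}
```

Keeping the set of known ids in the state is what allows repeated polls of the same table to behave like a stream that emits each persistence id once.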
![Page 62: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/62.jpg)
Events by tag

| persistence_id | partition_nr | events |
|---|---|---|
| 0 | 0 | event 0; event 1, tag 1; event 2, tag 1 |
| 0 | 1 | event 100, tag 1; event 101; event 102 |
| 1 | 0 | event 0; event 1; event 2, tag 1 |
![Page 63: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/63.jpg)
0 0
0 1
event 1,tag 1
event 100,tag 1
event 101 event 102
event 2,tag 1
1 0 event 0 event 1
event 0
event 2,tag 1
![Page 64: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/64.jpg)
[Diagram: animation frame of the events-by-tag diagram.]
![Page 65: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/65.jpg)
[Diagram: animation frame of the events-by-tag diagram.]
![Page 66: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/66.jpg)
[Diagram: animation frame of the events-by-tag diagram.]
![Page 67: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/67.jpg)
[Diagram: animation frame of the events-by-tag diagram.]
![Page 68: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/68.jpg)
[Diagram: animation frame of the events-by-tag diagram.]
![Page 69: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/69.jpg)
[Diagram: animation frame of the events-by-tag diagram.]
![Page 70: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/70.jpg)
[Diagram: animation frame of the events-by-tag diagram.]
![Page 71: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/71.jpg)
[Diagram: animation frame of the events-by-tag diagram.]
![Page 72: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/72.jpg)
Events by tag
[Diagram: journal rows 0 0, 0 1 and 1 0, rows tag 1 1/1/2016 and tag 1 1/2/2016, and the events Id 0 event 1, Id 1 event 2, Id 0 event 100 and Id 0 event 2.]
SELECT * FROM $eventsByTagViewName$tagId
  WHERE tag$tagId = ? AND timebucket = ? AND timestamp > ? AND timestamp <= ?
  ORDER BY timestamp ASC
  LIMIT ?
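The query reads one timebucket at a time, so a reader has to step through the buckets that cover the requested time range. A minimal sketch of that stepping follows, assuming day-sized buckets in UTC formatted as yyyyMMdd; the bucket size, the format and the names TimeBucketSketch, bucketOf and bucketsBetween are assumptions for illustration.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object TimeBucketSketch {
  // Assumed bucket format: one bucket per UTC day.
  private val dayFormat = DateTimeFormatter.ofPattern("yyyyMMdd")

  // Day bucket that a timestamp (epoch milliseconds) falls into.
  def bucketOf(epochMillis: Long): String =
    Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate.format(dayFormat)

  // All day buckets, in order, that must be queried to cover [fromMillis, toMillis].
  def bucketsBetween(fromMillis: Long, toMillis: Long): Vector[String] = {
    val first = Instant.ofEpochMilli(fromMillis).atZone(ZoneOffset.UTC).toLocalDate
    val last = Instant.ofEpochMilli(toMillis).atZone(ZoneOffset.UTC).toLocalDate
    Iterator
      .iterate(first)(_.plusDays(1))
      .takeWhile(!_.isAfter(last))
      .map(_.format(dayFormat))
      .toVector
  }
}
```

Each bucket returned here would be bound to the timebucket parameter of one query, with the timestamp bounds narrowing the range inside the first and last bucket.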
![Page 73: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/73.jpg)
[Diagram: animation frame of the events-by-tag diagram with the tag 1 rows for 1/1/2016 and 1/2/2016.]
![Page 74: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/74.jpg)
[Diagram: animation frame of the events-by-tag diagram with the tag 1 rows for 1/1/2016 and 1/2/2016.]
![Page 75: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/75.jpg)
[Diagram: animation frame of the events-by-tag diagram with the tag 1 rows for 1/1/2016 and 1/2/2016.]
![Page 76: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/76.jpg)
[Diagram: animation frame of the events-by-tag diagram with the tag 1 rows for 1/1/2016 and 1/2/2016.]
![Page 77: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/77.jpg)
[Diagram: consistency models: Strong Serializable, Linearizable, Serializable, Sequential, RR, SI, Causal, PRAM, MR, MW, RYW, WFR, EC, CS, MAW, RC, P-CI.]
![Page 78: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/78.jpg)
![Page 79: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/79.jpg)
![Page 80: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/80.jpg)
[Diagram: journal rows 0 0, 0 1 and 1 0 with events 0, 1, 2, 100, 101 and 102 (some with tag 1), and rows tag 1 1/1/2016 and tag 1 1/2/2016.]
![Page 81: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/81.jpg)
[Diagram: the events-by-tag diagram with the event Id 0 event 1 and a persistence_id to seq table (0: 1, 1: ...).]
![Page 82: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/82.jpg)
[Diagram: the events-by-tag diagram with the events Id 0 event 100 and Id 0 event 1 and a persistence_id to seq table (0: ?, 1: ...).]
![Page 83: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/83.jpg)
[Diagram: the events-by-tag diagram with the events Id 0 event 100, Id 0 event 2 and Id 0 event 1 and a persistence_id to seq table (0: ?, 1: ...).]
![Page 84: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/84.jpg)
def replay(): Unit = {
  val backtracking = isBacktracking
  val limit =
    if (backtracking) maxBufferSize
    else maxBufferSize - buf.size
  val toOffs =
    if (backtracking && abortDeadline.isEmpty) highestOffset
    else UUIDs.endOf(System.currentTimeMillis() - eventualConsistencyDelayMillis)
  context.actorOf(EventsByTagFetcher.props(tag, currTimeBucket, currOffset, toOffs, limit,
    backtracking, self, session, preparedSelect, seqNumbers, settings))
  context.become(replaying(limit))
}

def replaying(limit: Int): Receive = {
  case env @ UUIDPersistentRepr(offs, _) =>
    // Deliver buffer
  case ReplayDone(count, seqN, highest) =>
    // Request more
  case ReplayAborted(seqN, pid, expectedSeqNr, gotSeqNr) =>
    // Causality violation, wait and retry.
    // Only applicable if all events for persistence_id are tagged
  case ReplayFailed(cause) =>
    // Failure
  case _: Request =>
    // Deliver buffer
  case Continue =>
    // Do nothing
  case Cancel =>
    // Stop
}
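The ReplayAborted case hinges on detecting a gap between the expected and the received sequence number for a persistence id. A minimal sketch of such a check follows, assuming sequence numbers start at 1 and every event of a persistence id carries the tag; CausalityCheckSketch, Aborted and check are hypothetical names and not the plugin's implementation.

```scala
object CausalityCheckSketch {
  // Mirrors the information carried by ReplayAborted on the slide.
  final case class Aborted(persistenceId: String, expectedSeqNr: Long, gotSeqNr: Long)

  // Walks a batch of (persistenceId, sequenceNr) pairs in tag order.
  // seqNumbers holds the last sequence number already delivered per persistence id.
  // Returns the updated map, or the first gap that was found.
  def check(
      batch: Seq[(String, Long)],
      seqNumbers: Map[String, Long]): Either[Aborted, Map[String, Long]] =
    batch.foldLeft[Either[Aborted, Map[String, Long]]](Right(seqNumbers)) {
      case (Right(seen), (pid, seqNr)) =>
        // Assumption: the first event of a persistence id has sequence number 1.
        val expected = seen.getOrElse(pid, 0L) + 1
        if (seqNr == expected) Right(seen.updated(pid, seqNr))
        else Left(Aborted(pid, expected, seqNr))
      case (aborted, _) =>
        // A gap was already found; ignore the rest of the batch.
        aborted
    }
}
```

On a Left result a caller could wait and retry the same range, which is what the comment on the ReplayAborted case describes.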
![Page 85: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/85.jpg)
![Page 86: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/86.jpg)
Akka Persistence Cassandra Replay
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)
    (replayCallback: (PersistentRepr) => Unit): Future[Unit] = Future {
  new MessageIterator(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(msg => {
    replayCallback(msg)
  })
}

class MessageIterator(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)
    extends Iterator[PersistentRepr] {
  private val initialFromSequenceNr =
    math.max(highestDeletedSequenceNumber(persistenceId) + 1, fromSequenceNr)
  private val iter = new RowIterator(persistenceId, initialFromSequenceNr, toSequenceNr)
  private var mcnt = 0L
  private var c: PersistentRepr = null
  private var n: PersistentRepr = PersistentRepr(Undefined)

  fetch()

  def hasNext: Boolean = ...
  def next(): PersistentRepr = …
  ...
}
![Page 87: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/87.jpg)
![Page 88: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/88.jpg)
![Page 89: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/89.jpg)
class RowIterator(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long) extends Iterator[Row] {
  var currentPnr = partitionNr(fromSequenceNr)
  var currentSnr = fromSequenceNr
  var fromSnr = fromSequenceNr
  var toSnr = toSequenceNr
  var iter = newIter()

  def newIter() =
    session.execute(preparedSelectMessages.bind(persistenceId, currentPnr, fromSnr, toSnr)).iterator

  final def hasNext: Boolean = {
    if (iter.hasNext) {
      // More rows available in the current partition
      true
    } else if (!inUse) {
      // inUse is not defined on the slide
      false
    } else {
      // Current partition exhausted, continue with the next one
      currentPnr += 1
      fromSnr = currentSnr
      iter = newIter()
      hasNext
    }
  }

  def next(): Row = {
    val row = iter.next()
    currentSnr = row.getLong("sequence_nr")
    row
  }
}
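The partition walk in RowIterator can be illustrated with an in-memory journal. This is a simplified sketch, assuming sequence numbers start at 1, a fixed number of sequence numbers per partition, and that the walk stops at the first partition without matching rows (the slide uses an inUse flag instead); PartitionWalkSketch, the partitionNr formula and replay are assumptions for illustration.

```scala
object PartitionWalkSketch {
  // Assumption: partition 0 holds sequence numbers 1..targetPartitionSize, and so on.
  def partitionNr(sequenceNr: Long, targetPartitionSize: Long): Long =
    (sequenceNr - 1L) / targetPartitionSize

  // journal: partition number -> sequence numbers stored in that partition (ascending).
  // Starts in the partition containing fromSequenceNr and moves to the next partition
  // whenever the current one is exhausted.
  def replay(
      journal: Map[Long, Vector[Long]],
      fromSequenceNr: Long,
      toSequenceNr: Long,
      targetPartitionSize: Long): Vector[Long] = {
    @annotation.tailrec
    def loop(pnr: Long, acc: Vector[Long]): Vector[Long] = {
      val rows = journal
        .getOrElse(pnr, Vector.empty)
        .filter(s => s >= fromSequenceNr && s <= toSequenceNr)
      if (rows.isEmpty) acc else loop(pnr + 1, acc ++ rows)
    }
    loop(partitionNr(fromSequenceNr, targetPartitionSize), Vector.empty)
  }
}
```

Each call to loop corresponds to one query against a single partition, which is why a replay across many partitions issues many sequential, blocking reads in the iterator-based version.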
![Page 90: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/90.jpg)
![Page 91: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/91.jpg)
![Page 92: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/92.jpg)
Non blocking asynchronous replay
private[this] val queries: CassandraReadJournal =
  new CassandraReadJournal(
    extendedActorSystem,
    context.system.settings.config.getConfig("cassandra-query-journal"))

override def asyncReplayMessages(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long,
    max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] =
  queries
    .eventsByPersistenceId(
      persistenceId,
      fromSequenceNr,
      toSequenceNr,
      max,
      replayMaxResultSize,
      None,
      "asyncReplayMessages")
    .runForeach(replayCallback)
    .map(_ => ())
![Page 93: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/93.jpg)
![Page 94: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/94.jpg)
Benchmarks
[Charts: REPLAY, STRONG SCALING and WEAK SCALING, comparing blocking and asynchronous replay. Vertical axes: Time (ms). Horizontal axes: Actors; Threads, Actors; Threads.]
![Page 95: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/95.jpg)
my-dispatcher {
  type = "Dispatcher"
  executor = "thread-pool-executor"

  thread-pool-executor {
    fixed-pool-size = $fixedPoolSize
  }
  throughput = $throughput
}

my-dispatcher {
  type = "Dispatcher"
  executor = "fork-join-executor"

  fork-join-executor {
    parallelism-min = $parallelismMin
    parallelism-max = $parallelismMax
    parallelism-factor = $parallelismFactor
  }

  throughput = $throughput
}
![Page 96: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/96.jpg)
cassandra-journal {
  plugin-dispatcher = $pluginDispatcher
  replay-dispatcher = $replayDispatcher
  max-result-size = $resultSize
  max-result-size-replay = $resultSizeReplay
  target-partition-size = $partitionSize
}

cassandra-query-journal {
  plugin-dispatcher = $queryPluginDispatcher
  max-buffer-size = $bufferSize
  max-result-size-query = $resultSizeReplay
}
![Page 97: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/97.jpg)
Alternative architecture
[Diagram: event logs keyed by node_id 0 and 1, holding persistence_id 0 events 0 to 3, persistence_id 1 event 0 and persistence_id 2 event 0.]
![Page 98: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/98.jpg)
persistence_id 0, event 0
persistence_id 0, event 1
persistence_id 1, event 0
persistence_id 2, event 0
persistence_id 0, event 2
persistence_id 0, event 3
![Page 99: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/99.jpg)
[Diagram: tables tag 1 0 and allIds with the events Id 0 event 1 and Id 2 event 1, and journal rows 0 1 and 0 0 with event 0 and event 1.]
![Page 100: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/100.jpg)
[Diagram: tables tag 1 0 and allIds with the events Id 0 event 1 and Id 2 event 1, and journal rows 0 1 and 0 0 with event 0 and event 1.]
val boundStatements = statementGroup(eventsByPersistenceId, eventsByTag, allPersistenceIds)

Future.sequence(boundStatements).flatMap { stmts =>
  val batch = new BatchStatement().setConsistencyLevel(...).setRetryPolicy(...)
  stmts.foreach(batch.add)
  session.underlying().flatMap(_.executeAsync(batch))
}
![Page 101: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/101.jpg)
![Page 102: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/102.jpg)
val eventsByPersistenceIdStatement = statementGroup(eventsByPersistenceIdStatement)
val boundStatements = statementGroup(eventsByTagStatement, allPersistenceIdsStatement)
...
session.underlying().flatMap { s =>
  val ebpResult = s.executeAsync(eventsByPersistenceIdStatement)
  val batchResult = s.executeAsync(batch)
  ...
}
[Diagram: tables tag 1 0 and allIds with the events Id 0 event 1 and Id 2 event 1, and journal rows 0 1 and 0 0 with event 0 and event 1.]
![Page 103: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/103.jpg)
![Page 104: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/104.jpg)
Event time processing
● Ingestion time, processing time, event time
![Page 105: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/105.jpg)
![Page 106: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/106.jpg)
Ordering
[Diagram: records shown in the order 1, 0, 2.]
KEY TIME VALUE
1 12:34:57 1
2 12:34:58 2
0 12:34:56 0
![Page 107: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/107.jpg)
[Diagram: records shown in the order 0, 1, 2.]
KEY TIME VALUE
1 12:34:57 1
2 12:34:58 2
0 12:34:56 0
![Page 108: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/108.jpg)
Distributed causal stream merging
[Diagram: event logs keyed by node_id 0 and 1, holding Id 0 events 0 to 3, Id 1 event 0 and Id 2 event 0.]
![Page 109: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/109.jpg)
[Diagram: animation frame of the node logs; merged stream so far: Id 0 event 0.]
![Page 110: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/110.jpg)
[Diagram: animation frame of the node logs; merged stream so far: Id 0 event 0.]
![Page 111: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/111.jpg)
[Diagram: animation frame of the node logs; merged stream so far: Id 0 event 0. Table of persistence_id to seq: 0: 0, 1: ..., 2: ...]
![Page 112: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/112.jpg)
[Diagram: animation frame of the node logs; merged stream so far: Id 0 event 0, Id 0 event 1. Table of persistence_id to seq: 0: 1, 1: ..., 2: ...]
![Page 113: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/113.jpg)
[Diagram: animation frame of the node logs and the merged stream. Table of persistence_id to seq: 0: 2, 1: 0, 2: 0.]
![Page 114: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/114.jpg)
[Diagram: node logs and the merged stream Id 0 event 0, Id 0 event 1, Id 2 event 0, Id 0 event 2, Id 0 event 3, Id 1 event 0. Table of persistence_id to seq: 0: 3, 1: 0, 2: 0.]
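The merging shown in these frames can be sketched as a buffer plus a table of the next expected sequence number per persistence id. This is a minimal in-memory sketch, assuming sequence numbers start at 0 as in the diagrams and that events arrive as a single interleaved sequence; CausalMergeSketch, Event and merge are hypothetical names, and the arrival order in the usage below is invented for illustration.

```scala
object CausalMergeSketch {
  final case class Event(persistenceId: Int, seq: Long)

  // Emits events so that, per persistence id, sequence numbers are delivered in order.
  // Events that arrive too early are buffered until their predecessors have been emitted.
  def merge(arrivals: Seq[Event]): Vector[Event] = {
    // Repeatedly emits any buffered event whose seq is the next expected one.
    @annotation.tailrec
    def drain(
        out: Vector[Event],
        next: Map[Int, Long],
        buffer: Set[Event]): (Vector[Event], Map[Int, Long], Set[Event]) =
      buffer.find(e => e.seq == next.getOrElse(e.persistenceId, 0L)) match {
        case Some(e) => drain(out :+ e, next.updated(e.persistenceId, e.seq + 1), buffer - e)
        case None    => (out, next, buffer)
      }

    val (merged, _, _) =
      arrivals.foldLeft((Vector.empty[Event], Map.empty[Int, Long], Set.empty[Event])) {
        case ((acc, next, buffer), event) => drain(acc, next, buffer + event)
      }
    merged
  }
}
```

For example, merging the arrivals (0,1), (1,0), (0,0), (0,3), (2,0), (0,2) holds back Id 0 event 1 until event 0 has arrived and Id 0 event 3 until event 2 has arrived. Ordering across different persistence ids is left to arrival order.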
![Page 115: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/115.jpg)
Replay
[Diagram: node logs and merged stream, with journal row 0 0 holding Id 0 event 0 and Id 0 event 1.]
![Page 116: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/116.jpg)
[Diagram: animation frame of the replay diagram.]
![Page 117: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/117.jpg)
[Diagram: animation frame of the replay diagram.]
![Page 118: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/118.jpg)
[Diagram: animation frame of the replay diagram. Table of persistence_id to seq: 0: 2.]
![Page 119: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/119.jpg)
[Diagram: animation frame of the replay diagram. Table of persistence_id to seq: 0: 2. Table of stream_id to seq: 0: 1, 1: 2.]
![Page 120: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/120.jpg)
Exactly once delivery
![Page 121: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/121.jpg)
Id 0,event 0
Id 0,event 1
Id 2,event 0
Id 0,event 2
Id 0,event 3
Id 1,event 0
![Page 122: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/122.jpg)
![Page 123: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/123.jpg)
Id 0,event 0
Id 0,event 1
Id 2,event 0
Id 0,event 2
Id 0,event 3
Id 1,event 0
Id 0,event 0
Id 0,event 1
Id 2,event 0
Id 0,event 3
Id 1,event 0
ACK ACK ACK ACK ACK
![Page 124: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/124.jpg)
![Page 125: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/125.jpg)
![Page 126: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/126.jpg)
Exactly once delivery
● Durable offset
0 1 2 3 4
![Page 127: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/127.jpg)
0 1 2 3 4
![Page 128: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/128.jpg)
10 2 3 4
![Page 129: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/129.jpg)
10 3 42
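One way to read the durable offset idea: the consumer stores the offset of the last processed event and ignores redelivered events at or below it. The sketch below assumes an in-memory Store standing in for durable storage and assumes the offset and the effect are updated together; DurableOffsetSketch, Store and consume are hypothetical names.

```scala
object DurableOffsetSketch {
  // Stand-in for durable storage. In a real system the offset and the effect
  // would need to be written atomically, or the effect made idempotent.
  final class Store {
    var offset: Long = -1L                      // offset of the last processed event
    var applied: Vector[String] = Vector.empty  // effects applied so far
  }

  // Applies each delivered (offset, payload) pair at most once.
  def consume(deliveries: Seq[(Long, String)], store: Store): Unit =
    deliveries.foreach { case (offset, payload) =>
      if (offset > store.offset) {
        store.applied = store.applied :+ payload
        store.offset = offset
      }
    }
}
```

With at-least-once delivery upstream, a redelivery that starts from an earlier offset after a restart is filtered out by the stored offset, so every payload is applied exactly once.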
![Page 130: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/130.jpg)
[Diagram: event logs keyed by node_id 0 and 1 (Id 0 events 0 to 3, Id 1 event 0, Id 2 event 0), the merged stream, and the tables tag 1 0, allIds and journal rows 0 1 and 0 0.]
![Page 131: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/131.jpg)
Akka Analytics
val conf = new SparkConf()
  .setAppName("...")
  .setMaster("...")
  .set("spark.cassandra.connection.host", "...")

val sc = new SparkContext(conf)

implicit val ordering = new Ordering[(String, Double)] {
  override def compare(x: (String, Double), y: (String, Double)): Int =
    implicitly[Ordering[Double]].compare(x._2, y._2)
}

sc.eventTable()
  .cache()
  .flatMap {
    case (JournalKey(persistenceId, _, _), BalanceUpdatedEvent(change)) =>
      (persistenceId -> change) :: Nil
    case _ => Nil
  }
  .reduceByKey(_ + _)
  .top(100)
  .foreach(println)

sc.stop()
![Page 132: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/132.jpg)
val conf = new SparkConf()
  .setAppName("...")
  .setMaster("...")
  .set("spark.cassandra.connection.host", "...")

val sc = new StreamingContext(conf, Seconds(5))

implicit val ordering = new Ordering[(String, Double)] {
  override def compare(x: (String, Double), y: (String, Double)): Int =
    implicitly[Ordering[Double]].compare(x._2, y._2)
}

sc.eventTable()
  .cache()
  .flatMap {
    case (JournalKey(persistenceId, _, _), BalanceUpdatedEvent(change)) =>
      (persistenceId -> change) :: Nil
    case _ => Nil
  }
  .reduceByKey(_ + _)
  .top(100)
  .foreach(println)

sc.stop()
![Page 133: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/133.jpg)
[Diagram: data pipeline with the labels internet, services, devices, social, apps, Kafka, Stream processing, Stream consumer, Search, Apps, Services, Databases, Batch and Serialisation.]
![Page 134: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/134.jpg)
Distributed systems
[Diagram: User, Mobile and System connected to several Microservices backed by CQRS/ES, Relational and NoSQL stores.]
![Page 135: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/135.jpg)
[Diagram: Client 1, Client 2 and Client 3, each with Input data and Model devices, sending Update (ΔP) to Parameter devices and receiving P.]
![Page 136: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/136.jpg)
Challenges
● All the solved problems
  ○ Exactly once delivery
  ○ Consistency
  ○ Availability
  ○ Fault tolerance
  ○ Cross service invariants and consistency
  ○ Transactions
  ○ Automated deployment and configuration management
  ○ Serialization, versioning, compatibility
  ○ Automated elasticity
  ○ No downtime version upgrades
  ○ Graceful shutdown of nodes
  ○ Distributed system verification, logging, tracing, monitoring, debugging
  ○ Split brains
  ○ ...
![Page 137: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/137.jpg)
Conclusion
● From request, response, synchronous, mutable state
● To streams, asynchronous messaging
● Production ready distributed systems
![Page 138: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/138.jpg)
Questions
MANCHESTER LONDON NEW YORK
![Page 139: Data in Motion: Streaming Static Data Efficiently 2](https://reader033.vdocuments.us/reader033/viewer/2022051521/586f78351a28ab10258b6aab/html5/thumbnails/139.jpg)
@zapletal_martin @cakesolutions
347 708 1518
We are hiring
http://www.cakesolutions.net/careers