event sourcing
DESCRIPTION
event sourcing made easy, presented at community events http://goo.gl/VguQiTRANSCRIPT
Event Sourcing
Stop fussing with infrastructure.Start solving business problems.
DefectId Description Status Priority ChangedBy
1 Application hangs when resuming from Hibernate
1 1 3
2 Changing permissions to owner and removing yourself …
1 2 2
3 The link text should be sentence case on the Home page
2 3 1
4 Switching back and forth between the tabs causes …
3 2 4
5 Double clicking on the tab causes the editing cursor …
2 2 5
StatusId Name1 Open
2 Fixed
3 Closed
PriorityId Name
1 P1
2 P2
3 P3
UserId Name
1 Dave Hoerster
2 Craig Oaks
3 Anthony Mastrean
4 Dennis Kozora
5 Chris Geihsler
DefectId Description Status Priority ChangedBy
2 Changing permissions to owner and removing … Open P1 Anthony
DefectId Description Status Priority ChangedBy
2 Changing permissions to owner and removing … Fixed P1 Chris
DefectId Description Status Priority ChangedBy
2 Changing permissions to owner and removing … Open P1 Anthony
EntityId Type CurrentVersion
2 Defect 1
EntityId Version EventType Data
2 1 DefectOpened Byte[]
Entity Table Event Table
EntityId Type CurrentVersion
2 Defect 2
EntityId Version EventType Data
2 1 DefectOpened Byte[]
2 2 DefectFixed Byte[]
Entity Table Event Table
EntityId Type CurrentVersion
2 Defect 3
EntityId Version EventType Data
2 1 DefectOpened Byte[]
2 2 DefectFixed Byte[]
2 3 DefectOpened Byte[]
Entity Table Event Table
class EventStore{ void Add(int entityId, int expectedVersion, IEnumerable<DomainEvent> events)
List<DomainEvent> GetAllEvents<TEntity>(int entityId)}
class MarkDefectAsFixedCommand{ int DefectId;}
class MarkDefectAsFixedCommandHandler : ICommandHandler<MarkDefectAsFixed>{ void Handle(MarkDefectAsFixedCommand command) { var defect = // TODO get from the event store defect.MarkAsFixed(); repository.Save(defect); }}
class Defect : Entity{ bool isFixed;
void MarkAsFixed() { if(isFixed) throw new InvalidOperationException(); isFixed = true; Raise(new DefectWasFixedEvent(this.EntityId)); }}
class DefectWasFixedEvent : DomainEvent{ int DefectId;}
abstract class Entity{ int EntityId; int CurrentVersion; List<DomainEvent> unsavedEvents;
List<DomainEvent> GetUnsavedEvents() { return unsavedEvents; }
void Raise(DomainEvent event) { unsavedEvents.Add(event); }
void ReplayEvents(IEnumerable<DomainEvent> events) { foreach(var event in events) Handle(event) }}
class Repository{ void Save<TEntity>(TEntity entity) { var unsavedEvents = entity.GetUnsavedEvents(); eventStore.Add(entity.EntityId, entity.CurrentVersion, unsavedEvents); }
TEntity GetById<TEntity>(int id) { var eventsToReplay = eventStore.GetAllEvents<TEntity>(id) var entity = new TEntity(); entity.ReplayEvents(eventsToReplay); return entity; }}
class EventStore{ void Add(int entityId, int expectedVersion, IEnumerable<DomainEvent> events) { Begin version = SELECT CurrentVersion FROM Entity WHERE EntityId = @(entityId)
if version is null INSERT INTO Entity CurrentVersion = 0 end
if(version != @(expectedVersion) raise concurrency problem
foreach event insert event with incremented version number
update entity with last version number End Transaction } List<DomainEvent> GetAllEvents<TEntity>(int entityId) { SELECT * FROM Events WHERE EntityId = @(entityId) ORDER BY Version }}
class MarkDefectAsFixedCommand{ int DefectId;}
class MarkDefectAsFixedCommandHandler : ICommandHandler<MarkDefectAsFixed>{ void Handle(MarkDefectAsFixedCommand command) { var defect = repository.GetById<Defect>(command.DefectId); defect.MarkAsFixed(); repository.Save(defect); }}
class Repository{ void Save<TEntity>(TEntity entity) { ... }
TEntity GetById<TEntity>(int entityId) { var eventsToReplay = eventStore.GetAllEvents<TEntity>(entityId); // TODO rebuild entity from those events }}
class EventStore{ void Add(int entityId, int expectedVersion, IEnumerable<DomainEvent> events) {
... } List<DomainEvent> GetAllEvents<TEntity>(int entityId) { SELECT * FROM Events WHERE EntityId = @(entityId) ORDER BY Version }}
class Repository{ void Save<TEntity>(TEntity entity) { ... }
TEntity GetById<TEntity>(int entityId) { var eventsToReplay = eventStore.GetAllEvents<TEntity>(entityId); var entity = new TEntity(); entity.ReplayEvents(eventsToReplay); return entity; }}
abstract class Entity{ int EntityId; int CurrentVersion; List<DomainEvent> unsavedEvents;
List<DomainEvent> GetUnsavedEvents() { return unsavedEvents; }
void Raise(DomainEvent event) { unsavedEvents.Add(event); }
void ReplayEvents(IEnumerable<DomainEvent> events) { foreach(var event in events) Handle(event) }}
class Defect : Entity{ bool isFixed; int defectId;
void MarkAsFixed() { if(isFixed) throw new InvalidOperationException(); isFixed = true; Raise(new DefectWasFixedEvent(defectId)); }
void OnDefectWasFixed(DefectWasFixedEvent event) { isFixed = true; }}
class Defect : Entity{ bool isFixed; int defectId;
void MarkAsFixed() { if(isFixed) throw new InvalidOperationException(); Raise(new DefectWasFixedEvent(defectId)); }
void OnDefectWasFixed(DefectWasFixedEvent event) { isFixed = true; }}
abstract class Entity{ List<DomainEvent> unsavedEvents; List<DomainEvent> GetUnsavedEvents() { return unsavedEvents; } void Handle(DomainEvent event) { // event handler retrieval/invocation } void Raise(DomainEvent event) { Handle(event); unsavedEvents.Add(event); }
void ReplayEvents(IEnumerable<DomainEvent> events) { foreach(var event in events) Handle(event) }}
Queries
class DefectListQueryHandler : IQueryHandler<DefectListQueryRequest>{ IEnumerable Query(DefectListQueryRequest request) {
return SELECT * FROM DefectList }}
DefectId Description Status Priority
2 Changing permissions to owner and removing … Open P1
DefectList Table
class Repository{ void Save<TEntity>(TEntity entity) { var unsavedEvents = entity.GetUnsavedEvents(); eventStore.Add(entity.EntityId, entity.CurrentVersion, unsavedEvents); // publish events “externally” }
TEntity GetById<TEntity>(int id) { ... }}
class DefectWasFixedDenormalizer : IDenormalizer<DefectWasFixedEvent>{ void Denormalize(DefectWasFixedEvent event) {
UPDATEDefectList
SETStatus = ‘Fixed’ //The actual string, not a
lookup!WHERE
DefectId = @(event.DefectId) }}
Snapshots
6
5
4
3
2
1
7
5
snap
4
3
2
1
6
class Defect : Entity, ISnapshotable{ bool isFixed; int fixedByUserId;
DefectSnapshot ISnapshotable.TakeSnapshot() { return new DefectSnapshot { IsFixed = isFixed, FixedByUserId = fixedByUserId }; }
void ISnapshotable.RebuildFromSnapshot(DefectSnapshot snapshot) { isFixed = snapshot.IsFixed; fixedByUserId = snapshot.FixedByUserId; }
...}
class Repository{ void Save<TEntity>(TEntity entity) { ... }
TEntity GetById<TEntity>(int entityId) { var entity = new TEntity();
var currentSnapshot = eventStore.GetCurrentSnapshot<TEntity>(entityId); entity.RebuildFromSnapshot(currentSnapshot);
var eventsAfterSnapshot = eventStore.GetEventsAfter<TEntity>( entityId, currentSnapshot.EntityVersion); entity.ReplayEvents(eventsToReplay);
return entity; }}
Event Versioning
class MarkDefectAsFixedCommand{ int DefectId; int FixedByUserId;}
class MarkDefectAsFixedCommandHandler : ICommandHandler<MarkDefectAsFixed>{ void Handle(MarkDefectAsFixedCommand command) { var defect = repository.GetById<Defect>(command.DefectId); defect.MarkAsFixed(command.FixedByUserId); repository.Save(defect); }}
class Defect : Entity{ bool isFixed; int fixedByUserId;
void MarkAsFixed(int fixedByUserId) { if(isFixed) throw new InvalidOperationException(); Raise(new DefectWasFixedEvent_v2(defectId, fixedByUserId)); }
void OnDefectWasFixed(DefectWasFixedEvent_v2 event) { isFixed = true; fixedByUserId = event.FixedByUserId; }}
class DefectWasFixedEvent_v2 : DomainEvent{ int DefectId; int FixedByUserId;}
abstract class Entity{ ...
DomainEvent ConvertEventToCurrentVersion(DomainEvent oldEvent) { // find and execute any converters that exist for this event type }
void ReplayEvents(IEnumerable<DomainEvent> events) { foreach(var event in events) Handle(ConvertEventToCurrentVersion(event)) }}
class MarkAsFixedDomainEventConverter_v1_to_v2 : IDomainEventConverter<MarkAsFixedDomainEvent, MarkAsFixedDomainEvent_v2>{ MarkAsFixedDomainEvent_v2 Convert(MarkAsFixedDomainEvent oldEvent) { return new DefectWasFixedEvent_v2(oldEvent.DefectId, UnknownUserId); }}
True Separation of Domain Model from Schema!
class Defect : Entity{ DefectState currentState;
void MarkAsFixed(int fixedByUserId) { Raise(new DefectWasFixedEvent_v2(defectId, fixedByUserId)); }
void OnDefectWasFixed(DefectWasFixedEvent_v2 event) { currentState.PromoteToFixed(fixedByUserId); }}
class DefectState{ void PromoteToFixed(int fixedByUserId) { ... }}
Testing
[TestClass]class DefectTests : EntityTest<Defect>{ [TestMethod] void MarkAsFixedRaisesEvent() { var defect = new Defect(); defect.MarkAsFixed(); AssertEventIsRaised<DefectWasFixedEvent>(defect); } [TestMethod] [ExpectedException(typeof(InvalidOperationException)] void CannotMarkADefectFixedTwice() { var defect = new Defect(); defect.MarkAsFixed(); defect.MarkAsFixed(); } }
abstract class EntityTest<TEntity>{ void AssertEventIsRaised<TDomainEvent>(TEntity entity) { Assert.IsNotEmpty(entity.GetUnsavedEvents().OfType<TDomainEvent>()); }}
Concurrency
class EventStore{ void Add(int entityId, int expectedVersion, IEnumerable<DomainEvent> events) { Begin version = SELECT CurrentVersion FROM Entity WHERE EntityId = @(entityId)
if version is null INSERT INTO Entity CurrentVersion = 0 end
if(version != @(expectedVersion) raise concurrency problem
foreach event insert event with incremented version number
update entity with last version number End Transaction } List<DomainEvent> GetAllEvents<TEntity>(int entityId) { SELECT * FROM Events WHERE EntityId = @(entityId) ORDER BY Version }}
DefectId Description Status Priority ChangedBy
2 Changing permissions to owner and removing … Open P1 Anthony
DefectId Description Status Priority ChangedBy
2 Changing permissions to owner and removing … Fixed P1 Chris
DefectId Description Status Priority ChangedBy
2 Changing permissions to owner and removing … Open P2 Anthony
DefectId Description Status Priority ChangedBy
2 Changing permissions to owner and removing … Fixed P1 Chris
DefectId Description Status Priority ChangedBy
2 Changing permissions to owner and removing … Open P2 Anthony
EntityId Version EventType Data
2 1 DefectOpened Byte[]
Event Table
EntityId Version EventType Data
2 1 DefectOpened Byte[]
EntityId Version EventType Data
2 1 DefectOpened Byte[]
2 2 DefectFixed Byte[]
2 2 DefectReprioritized Byte[]
Event Table
EntityId Version EventType Data
2 1 DefectOpened Byte[]
EntityId Version EventType Data
2 1 DefectOpened Byte[]
2 2 DefectFixed Byte[]
2 2 DefectReprioritized Byte[]
EntityId Version EventType Data
2 1 DefectOpened Byte[]
2 2 DefectFixed Byte[]
2 3 DefectReprioritized Byte[]
Event Table
EntityId Version EventType Data
2 1 DefectOpened Byte[]
Event Table
EntityId Version EventType Data
2 1 DefectOpened Byte[]
EntityId Version EventType Data
2 1 DefectOpened Byte[]
2 2 DefectClosed Byte[]
2 2 DefectFixed Byte[]
Event Table
EntityId Version EventType Data
2 1 DefectOpened Byte[]
EntityId Version EventType Data
2 1 DefectOpened Byte[]
2 2 DefectClosed Byte[]
2 2 DefectFixed Byte[]
Event Table
Contraindications
More InformationGreg YoungJonathan OliverUdi DahanCQRS InfoNCQRS Framework