event sourcing

64
Event Sourcing: Stop fussing with infrastructure. Start solving business problems.

Upload: anthony-mastrean

Post on 11-May-2015

1.563 views

Category:

Technology


0 download

DESCRIPTION

event sourcing made easy, presented at community events http://goo.gl/VguQi

TRANSCRIPT

Page 1: Event sourcing

Event Sourcing

Stop fussing with infrastructure. Start solving business problems.

Page 2: Event sourcing
Page 3: Event sourcing
Page 4: Event sourcing
Page 5: Event sourcing

DefectId Description Status Priority ChangedBy

1 Application hangs when resuming from Hibernate

1 1 3

2 Changing permissions to owner and removing yourself …

1 2 2

3 The link text should be sentence case on the Home page

2 3 1

4 Switching back and forth between the tabs causes …

3 2 4

5 Double clicking on the tab causes the editing cursor …

2 2 5

StatusId Name

1 Open

2 Fixed

3 Closed

PriorityId Name

1 P1

2 P2

3 P3

UserId Name

1 Dave Hoerster

2 Craig Oaks

3 Anthony Mastrean

4 Dennis Kozora

5 Chris Geihsler

Page 6: Event sourcing
Page 7: Event sourcing

DefectId Description Status Priority ChangedBy

2 Changing permissions to owner and removing … Open P1 Anthony

Page 8: Event sourcing

DefectId Description Status Priority ChangedBy

2 Changing permissions to owner and removing … Fixed P1 Chris

Page 9: Event sourcing

DefectId Description Status Priority ChangedBy

2 Changing permissions to owner and removing … Open P1 Anthony

Page 10: Event sourcing

EntityId Type CurrentVersion

2 Defect 1

EntityId Version EventType Data

2 1 DefectOpened Byte[]

Entity Table Event Table

Page 11: Event sourcing

EntityId Type CurrentVersion

2 Defect 2

EntityId Version EventType Data

2 1 DefectOpened Byte[]

2 2 DefectFixed Byte[]

Entity Table Event Table

Page 12: Event sourcing

EntityId Type CurrentVersion

2 Defect 3

EntityId Version EventType Data

2 1 DefectOpened Byte[]

2 2 DefectFixed Byte[]

2 3 DefectOpened Byte[]

Entity Table Event Table

Page 13: Event sourcing

// Event store API sketch: only the two method signatures are shown, without bodies.
// Add takes an entity id, the version the caller expects, and a sequence of domain events
// (presumably the events to append - confirm against the full implementation).
class EventStore{ void Add(int entityId, int expectedVersion, IEnumerable<DomainEvent> events)

// GetAllEvents takes an entity id and returns a list of domain events; TEntity names the entity type.
List<DomainEvent> GetAllEvents<TEntity>(int entityId)}

Page 14: Event sourcing
Page 15: Event sourcing
Page 16: Event sourcing

// Command carrying the id of a single defect.
class MarkDefectAsFixedCommand
{
    // Id of the defect this command refers to.
    int DefectId;
}

// Handles MarkDefectAsFixedCommand: marks a defect as fixed, then saves it through the repository.
// `repository` is not declared in this snippet.
class MarkDefectAsFixedCommandHandler : ICommandHandler<MarkDefectAsFixedCommand>
{
    void Handle(MarkDefectAsFixedCommand command)
    {
        // Placeholder: loading is not implemented yet, so this throws until the TODO is resolved.
        Defect defect = null; // TODO get from the event store

        defect.MarkAsFixed();
        repository.Save(defect);
    }
}

Page 17: Event sourcing

// Defect entity that tracks whether it has been fixed.
class Defect : Entity
{
    bool isFixed;

    // Marks the defect as fixed and raises a DefectWasFixedEvent carrying this entity's id.
    // Throws InvalidOperationException when the defect is already fixed.
    void MarkAsFixed()
    {
        if (isFixed)
        {
            throw new InvalidOperationException();
        }

        isFixed = true;

        var fixedEvent = new DefectWasFixedEvent(this.EntityId);
        Raise(fixedEvent);
    }
}

// Domain event carrying the id of a defect that was fixed.
class DefectWasFixedEvent : DomainEvent
{
    // Id of the fixed defect; public so code outside the event can read it.
    public int DefectId;

    // Kept so the parameterless construction the original class allowed still works.
    public DefectWasFixedEvent()
    {
    }

    // Creates the event for the given defect id (Defect.MarkAsFixed constructs it this way).
    public DefectWasFixedEvent(int defectId)
    {
        DefectId = defectId;
    }
}

Page 18: Event sourcing

// Base class for entities that collect raised domain events and can replay stored ones.
// Handle is called by ReplayEvents but is not defined in this snippet.
abstract class Entity
{
    int EntityId;
    int CurrentVersion;

    // Initialised eagerly so Raise works on a freshly constructed entity.
    List<DomainEvent> unsavedEvents = new List<DomainEvent>();

    // Returns the list of events recorded by Raise.
    List<DomainEvent> GetUnsavedEvents()
    {
        return unsavedEvents;
    }

    // Records the event in the unsaved list. `event` is a C# keyword, hence the verbatim @event.
    void Raise(DomainEvent @event)
    {
        unsavedEvents.Add(@event);
    }

    // Passes each event, in enumeration order, to Handle.
    void ReplayEvents(IEnumerable<DomainEvent> events)
    {
        foreach (var domainEvent in events)
        {
            Handle(domainEvent);
        }
    }
}

Page 19: Event sourcing

// Saves entities to, and rebuilds them from, the event store.
// `eventStore` is not declared in this snippet.
class Repository
{
    // Hands the entity's unsaved events to the event store together with its id and current version.
    void Save<TEntity>(TEntity entity) where TEntity : Entity
    {
        var unsavedEvents = entity.GetUnsavedEvents();
        eventStore.Add(entity.EntityId, entity.CurrentVersion, unsavedEvents);
    }

    // Creates a new TEntity and replays every stored event for the id onto it.
    // The new() constraint is required by `new TEntity()`.
    TEntity GetById<TEntity>(int id) where TEntity : Entity, new()
    {
        var eventsToReplay = eventStore.GetAllEvents<TEntity>(id);

        var entity = new TEntity();
        entity.ReplayEvents(eventsToReplay);
        return entity;
    }
}

Page 20: Event sourcing

// Event store sketch. The method bodies are pseudo-code (SQL-like steps), not compilable C#.
class EventStore
{
    // Appends events for an entity inside one transaction.
    // Raises a concurrency problem when the stored version differs from expectedVersion.
    void Add(int entityId, int expectedVersion, IEnumerable<DomainEvent> events)
    {
        Begin Transaction
            version = SELECT CurrentVersion FROM Entity WHERE EntityId = @(entityId)

            if version is null
                INSERT INTO Entity CurrentVersion = 0
                version = 0   // keep the local in step with the inserted row so a first save is not a conflict
            end

            if (version != @(expectedVersion))
                raise concurrency problem

            foreach event
                insert event with incremented version number

            update entity with last version number
        End Transaction
    }

    // Returns the entity's events ordered by version.
    List<DomainEvent> GetAllEvents<TEntity>(int entityId)
    {
        SELECT * FROM Events WHERE EntityId = @(entityId) ORDER BY Version
    }
}

Page 21: Event sourcing
Page 22: Event sourcing

// Command carrying the id of a single defect.
class MarkDefectAsFixedCommand
{
    // Id of the defect this command refers to.
    int DefectId;
}

// Handles MarkDefectAsFixedCommand: loads the defect by id, marks it as fixed, and saves it.
// `repository` is not declared in this snippet.
class MarkDefectAsFixedCommandHandler : ICommandHandler<MarkDefectAsFixedCommand>
{
    void Handle(MarkDefectAsFixedCommand command)
    {
        var defect = repository.GetById<Defect>(command.DefectId);
        defect.MarkAsFixed();
        repository.Save(defect);
    }
}

Page 23: Event sourcing

// Repository sketch: Save is elided, and GetById is unfinished in this snippet.
// `eventStore` is not declared in this snippet.
class Repository
{
    void Save<TEntity>(TEntity entity) { ... }

    // Fetches all stored events for the entity; rebuilding the entity is still a TODO,
    // so nothing is returned yet.
    TEntity GetById<TEntity>(int entityId)
    {
        var eventsToReplay = eventStore.GetAllEvents<TEntity>(entityId);
        // TODO rebuild entity from those events
    }
}

Page 24: Event sourcing

// Event store sketch: Add is elided here; GetAllEvents is a pseudo-code SQL query.
class EventStore
{
    void Add(int entityId, int expectedVersion, IEnumerable<DomainEvent> events)
    {
        ...
    }

    // Returns the entity's events ordered by version.
    List<DomainEvent> GetAllEvents<TEntity>(int entityId)
    {
        SELECT * FROM Events WHERE EntityId = @(entityId) ORDER BY Version
    }
}

Page 25: Event sourcing

// Repository sketch: Save is elided; GetById rebuilds an entity from its stored events.
// `eventStore` is not declared in this snippet.
class Repository
{
    void Save<TEntity>(TEntity entity) { ... }

    // Creates a new TEntity and replays every stored event for the id onto it.
    // The constraints are required by `new TEntity()` and `entity.ReplayEvents(...)`.
    TEntity GetById<TEntity>(int entityId) where TEntity : Entity, new()
    {
        var eventsToReplay = eventStore.GetAllEvents<TEntity>(entityId);

        var entity = new TEntity();
        entity.ReplayEvents(eventsToReplay);
        return entity;
    }
}

Page 26: Event sourcing

// Base class for entities that collect raised domain events and can replay stored ones.
// Handle is called by ReplayEvents but is not defined in this snippet.
abstract class Entity
{
    int EntityId;
    int CurrentVersion;

    // Initialised eagerly so Raise works on a freshly constructed entity.
    List<DomainEvent> unsavedEvents = new List<DomainEvent>();

    // Returns the list of events recorded by Raise.
    List<DomainEvent> GetUnsavedEvents()
    {
        return unsavedEvents;
    }

    // Records the event in the unsaved list. `event` is a C# keyword, hence the verbatim @event.
    void Raise(DomainEvent @event)
    {
        unsavedEvents.Add(@event);
    }

    // Passes each event, in enumeration order, to Handle.
    void ReplayEvents(IEnumerable<DomainEvent> events)
    {
        foreach (var domainEvent in events)
        {
            Handle(domainEvent);
        }
    }
}

Page 27: Event sourcing

// Defect entity: sets its own state in MarkAsFixed and again in the event handler.
class Defect : Entity
{
    bool isFixed;
    int defectId;

    // Marks the defect as fixed and raises DefectWasFixedEvent.
    // Throws InvalidOperationException when the defect is already fixed.
    void MarkAsFixed()
    {
        if (isFixed)
        {
            throw new InvalidOperationException();
        }

        isFixed = true;
        Raise(new DefectWasFixedEvent(defectId));
    }

    // Applies a DefectWasFixedEvent. `event` is a C# keyword, hence the verbatim @event.
    void OnDefectWasFixed(DefectWasFixedEvent @event)
    {
        isFixed = true;
    }
}

Page 28: Event sourcing

// Defect entity: MarkAsFixed only validates and raises the event; the handler changes state.
class Defect : Entity
{
    bool isFixed;
    int defectId;

    // Raises DefectWasFixedEvent. Throws InvalidOperationException when already fixed.
    void MarkAsFixed()
    {
        if (isFixed)
        {
            throw new InvalidOperationException();
        }

        Raise(new DefectWasFixedEvent(defectId));
    }

    // Applies a DefectWasFixedEvent. `event` is a C# keyword, hence the verbatim @event.
    void OnDefectWasFixed(DefectWasFixedEvent @event)
    {
        isFixed = true;
    }
}

Page 29: Event sourcing

// Base class for entities: raised events are handled immediately and then kept as unsaved.
abstract class Entity
{
    // Initialised eagerly so Raise works on a freshly constructed entity.
    List<DomainEvent> unsavedEvents = new List<DomainEvent>();

    // Returns the list of events recorded by Raise.
    List<DomainEvent> GetUnsavedEvents()
    {
        return unsavedEvents;
    }

    // Placeholder for dispatching an event to its handler.
    // `event` is a C# keyword, hence the verbatim @event.
    void Handle(DomainEvent @event)
    {
        // event handler retrieval/invocation
    }

    // Handles the event first, then records it as unsaved.
    void Raise(DomainEvent @event)
    {
        Handle(@event);
        unsavedEvents.Add(@event);
    }

    // Passes each event, in enumeration order, to Handle.
    void ReplayEvents(IEnumerable<DomainEvent> events)
    {
        foreach (var domainEvent in events)
        {
            Handle(domainEvent);
        }
    }
}

Page 30: Event sourcing
Page 31: Event sourcing

Queries

Page 32: Event sourcing
Page 33: Event sourcing
Page 34: Event sourcing
Page 35: Event sourcing

// Query handler sketch: the body is a pseudo-code SQL query against the DefectList table.
class DefectListQueryHandler : IQueryHandler<DefectListQueryRequest>
{
    // Returns every row of DefectList; the request is not used in this snippet.
    IEnumerable Query(DefectListQueryRequest request)
    {
        return SELECT * FROM DefectList
    }
}

DefectId Description Status Priority

2 Changing permissions to owner and removing … Open P1

DefectList Table

Page 36: Event sourcing

// Repository sketch: Save stores the unsaved events; GetById is elided here.
// `eventStore` is not declared in this snippet.
class Repository
{
    // Hands the entity's unsaved events to the event store together with its id and current version.
    void Save<TEntity>(TEntity entity) where TEntity : Entity
    {
        var unsavedEvents = entity.GetUnsavedEvents();
        eventStore.Add(entity.EntityId, entity.CurrentVersion, unsavedEvents);
        // publish events "externally"
    }

    TEntity GetById<TEntity>(int id) { ... }
}

Page 37: Event sourcing

// Denormalizer sketch: the body is a pseudo-code SQL statement, not compilable C#.
class DefectWasFixedDenormalizer : IDenormalizer<DefectWasFixedEvent>
{
    // Sets the DefectList row for the event's defect to the literal status 'Fixed'.
    // `event` is a C# keyword, hence the verbatim @event parameter.
    void Denormalize(DefectWasFixedEvent @event)
    {
        UPDATE DefectList
        SET Status = 'Fixed' // The actual string, not a lookup!
        WHERE DefectId = @(event.DefectId)
    }
}

Page 38: Event sourcing
Page 39: Event sourcing

Snapshots

Page 40: Event sourcing

6

5

4

3

2

1

7

Page 41: Event sourcing

5

snap

4

3

2

1

6

Page 42: Event sourcing

// Defect entity that can copy its state into, and restore it from, a DefectSnapshot.
class Defect : Entity, ISnapshotable
{
    bool isFixed;
    int fixedByUserId;

    // Copies the current field values into a new snapshot.
    DefectSnapshot ISnapshotable.TakeSnapshot()
    {
        var snapshot = new DefectSnapshot();
        snapshot.IsFixed = isFixed;
        snapshot.FixedByUserId = fixedByUserId;
        return snapshot;
    }

    // Overwrites the fields with the values held in the snapshot.
    void ISnapshotable.RebuildFromSnapshot(DefectSnapshot snapshot)
    {
        isFixed = snapshot.IsFixed;
        fixedByUserId = snapshot.FixedByUserId;
    }

    ...
}

Page 43: Event sourcing

// Repository sketch: Save is elided; GetById rebuilds from a snapshot plus the later events.
// `eventStore` is not declared in this snippet.
class Repository
{
    void Save<TEntity>(TEntity entity) { ... }

    // Creates a new TEntity, restores it from the current snapshot, then replays the events
    // recorded after the snapshot's entity version.
    // The constraints are required by `new TEntity()`, RebuildFromSnapshot and ReplayEvents.
    TEntity GetById<TEntity>(int entityId) where TEntity : Entity, ISnapshotable, new()
    {
        var entity = new TEntity();

        var currentSnapshot = eventStore.GetCurrentSnapshot<TEntity>(entityId);
        entity.RebuildFromSnapshot(currentSnapshot);

        var eventsAfterSnapshot = eventStore.GetEventsAfter<TEntity>(
            entityId, currentSnapshot.EntityVersion);
        // Replay the events fetched above; the original referenced an undefined variable here.
        entity.ReplayEvents(eventsAfterSnapshot);

        return entity;
    }
}

Page 44: Event sourcing

Event Versioning

Page 45: Event sourcing

// Command carrying the id of a defect and the id of the user who fixed it.
class MarkDefectAsFixedCommand
{
    // Id of the defect this command refers to.
    int DefectId;

    // Id of the user recorded as having fixed the defect.
    int FixedByUserId;
}

// Handles MarkDefectAsFixedCommand: loads the defect by id, marks it as fixed by the
// command's user, and saves it. `repository` is not declared in this snippet.
class MarkDefectAsFixedCommandHandler : ICommandHandler<MarkDefectAsFixedCommand>
{
    void Handle(MarkDefectAsFixedCommand command)
    {
        var defect = repository.GetById<Defect>(command.DefectId);
        defect.MarkAsFixed(command.FixedByUserId);
        repository.Save(defect);
    }
}

Page 46: Event sourcing

// Defect entity that records which user fixed it, using the v2 fixed event.
class Defect : Entity
{
    bool isFixed;
    int fixedByUserId;

    // Declared here because MarkAsFixed reads it; the original snippet used it without declaring it.
    int defectId;

    // Raises DefectWasFixedEvent_v2 for this defect and the given user.
    // Throws InvalidOperationException when the defect is already fixed.
    void MarkAsFixed(int fixedByUserId)
    {
        if (isFixed)
        {
            throw new InvalidOperationException();
        }

        Raise(new DefectWasFixedEvent_v2(defectId, fixedByUserId));
    }

    // Applies the event: marks the defect fixed and stores the user id from the event.
    // `event` is a C# keyword, hence the verbatim @event.
    void OnDefectWasFixed(DefectWasFixedEvent_v2 @event)
    {
        isFixed = true;
        fixedByUserId = @event.FixedByUserId;
    }
}

// Version 2 of the defect-fixed event: carries the defect id and the fixing user's id.
class DefectWasFixedEvent_v2 : DomainEvent
{
    // Public so code outside the event (handlers, converters) can read the values.
    public int DefectId;
    public int FixedByUserId;

    // Kept so the parameterless construction the original class allowed still works.
    public DefectWasFixedEvent_v2()
    {
    }

    // Creates the event for the given defect and user (Defect.MarkAsFixed constructs it this way).
    public DefectWasFixedEvent_v2(int defectId, int fixedByUserId)
    {
        DefectId = defectId;
        FixedByUserId = fixedByUserId;
    }
}

Page 47: Event sourcing

// Entity sketch (other members elided): replayed events are converted to their current
// version before being handled.
abstract class Entity
{
    ...

    // Placeholder for upgrading an old event to its current version.
    DomainEvent ConvertEventToCurrentVersion(DomainEvent oldEvent)
    {
        // find and execute any converters that exist for this event type
        // NOTE(review): no converter lookup exists in this snippet; the event is returned
        // unchanged so the method has a defined result - confirm the intended default.
        return oldEvent;
    }

    // Converts each event to its current version, then passes it to Handle.
    void ReplayEvents(IEnumerable<DomainEvent> events)
    {
        foreach (var storedEvent in events)
        {
            Handle(ConvertEventToCurrentVersion(storedEvent));
        }
    }
}

// Converts a v1 DefectWasFixedEvent into DefectWasFixedEvent_v2.
// The v1 event has no user id, so the v2 event is created with UnknownUserId
// (UnknownUserId is not declared in this snippet).
class MarkAsFixedDomainEventConverter_v1_to_v2
    : IDomainEventConverter<DefectWasFixedEvent, DefectWasFixedEvent_v2>
{
    DefectWasFixedEvent_v2 Convert(DefectWasFixedEvent oldEvent)
    {
        return new DefectWasFixedEvent_v2(oldEvent.DefectId, UnknownUserId);
    }
}

Page 48: Event sourcing

True Separation of Domain Model from Schema!

Page 49: Event sourcing

// Defect entity that delegates its state to a DefectState object.
// `defectId` is not declared in this snippet.
class Defect : Entity
{
    DefectState currentState;

    // Raises DefectWasFixedEvent_v2 for this defect and the given user.
    void MarkAsFixed(int fixedByUserId)
    {
        Raise(new DefectWasFixedEvent_v2(defectId, fixedByUserId));
    }

    // Promotes the state to fixed using the user id carried by the event.
    // `event` is a C# keyword, hence the verbatim @event.
    void OnDefectWasFixed(DefectWasFixedEvent_v2 @event)
    {
        currentState.PromoteToFixed(@event.FixedByUserId);
    }
}

// State object for a defect; the body of PromoteToFixed is elided in this snippet.
class DefectState
{
    void PromoteToFixed(int fixedByUserId)
    {
        ...
    }
}

Page 50: Event sourcing

Testing

Page 51: Event sourcing

// Tests for Defect: fixing raises an event, and fixing twice is rejected.
[TestClass]
class DefectTests : EntityTest<Defect>
{
    // Marking a new defect as fixed must leave a DefectWasFixedEvent among its unsaved events.
    [TestMethod]
    void MarkAsFixedRaisesEvent()
    {
        var defect = new Defect();

        defect.MarkAsFixed();

        AssertEventIsRaised<DefectWasFixedEvent>(defect);
    }

    // The second MarkAsFixed call is expected to throw InvalidOperationException.
    [TestMethod]
    [ExpectedException(typeof(InvalidOperationException))]
    void CannotMarkADefectFixedTwice()
    {
        var defect = new Defect();

        defect.MarkAsFixed();
        defect.MarkAsFixed();
    }
}

// Base class for entity tests. TEntity is constrained to Entity because the helper
// calls GetUnsavedEvents on it.
abstract class EntityTest<TEntity> where TEntity : Entity
{
    // Asserts that the entity has at least one unsaved event of type TDomainEvent.
    void AssertEventIsRaised<TDomainEvent>(TEntity entity)
    {
        var matchingEvents = entity.GetUnsavedEvents().OfType<TDomainEvent>();
        Assert.IsNotEmpty(matchingEvents);
    }
}

Page 52: Event sourcing

Concurrency

Page 53: Event sourcing

// Event store sketch. The method bodies are pseudo-code (SQL-like steps), not compilable C#.
class EventStore
{
    // Appends events for an entity inside one transaction.
    // Raises a concurrency problem when the stored version differs from expectedVersion.
    void Add(int entityId, int expectedVersion, IEnumerable<DomainEvent> events)
    {
        Begin Transaction
            version = SELECT CurrentVersion FROM Entity WHERE EntityId = @(entityId)

            if version is null
                INSERT INTO Entity CurrentVersion = 0
                version = 0   // keep the local in step with the inserted row so a first save is not a conflict
            end

            if (version != @(expectedVersion))
                raise concurrency problem

            foreach event
                insert event with incremented version number

            update entity with last version number
        End Transaction
    }

    // Returns the entity's events ordered by version.
    List<DomainEvent> GetAllEvents<TEntity>(int entityId)
    {
        SELECT * FROM Events WHERE EntityId = @(entityId) ORDER BY Version
    }
}

Page 54: Event sourcing

DefectId Description Status Priority ChangedBy

2 Changing permissions to owner and removing … Open P1 Anthony

Page 55: Event sourcing

DefectId Description Status Priority ChangedBy

2 Changing permissions to owner and removing … Fixed P1 Chris

DefectId Description Status Priority ChangedBy

2 Changing permissions to owner and removing … Open P2 Anthony

Page 56: Event sourcing

DefectId Description Status Priority ChangedBy

2 Changing permissions to owner and removing … Fixed P1 Chris

DefectId Description Status Priority ChangedBy

2 Changing permissions to owner and removing … Open P2 Anthony

Page 57: Event sourcing

EntityId Version EventType Data

2 1 DefectOpened Byte[]

Event Table

Page 58: Event sourcing

EntityId Version EventType Data

2 1 DefectOpened Byte[]

EntityId Version EventType Data

2 1 DefectOpened Byte[]

2 2 DefectFixed Byte[]

2 2 DefectReprioritized Byte[]

Event Table

Page 59: Event sourcing

EntityId Version EventType Data

2 1 DefectOpened Byte[]

EntityId Version EventType Data

2 1 DefectOpened Byte[]

2 2 DefectFixed Byte[]

2 2 DefectReprioritized Byte[]

EntityId Version EventType Data

2 1 DefectOpened Byte[]

2 2 DefectFixed Byte[]

2 3 DefectReprioritized Byte[]

Event Table

Page 60: Event sourcing

EntityId Version EventType Data

2 1 DefectOpened Byte[]

Event Table

Page 61: Event sourcing

EntityId Version EventType Data

2 1 DefectOpened Byte[]

EntityId Version EventType Data

2 1 DefectOpened Byte[]

2 2 DefectClosed Byte[]

2 2 DefectFixed Byte[]

Event Table

Page 62: Event sourcing

EntityId Version EventType Data

2 1 DefectOpened Byte[]

EntityId Version EventType Data

2 1 DefectOpened Byte[]

2 2 DefectClosed Byte[]

2 2 DefectFixed Byte[]

Event Table

Page 63: Event sourcing

Contraindications