Introduction to CQRS + Event Sourcing: Part 2

In the last article, I covered the basics of CQRS + Event Sourcing. This time I want to continue and take a closer look at Event Sourcing (ES).

In the example that I uploaded with my previous article, the magic of Event Sourcing was hidden behind the IRepository abstraction and its two methods, IRepository.Save() and IRepository.GetById<>().
To show in more detail what is happening, I decided to walk through saving an aggregate to the Event Store and loading it back, using the Lokad IDDD Sample project by Rinat Abdulin. His application services call the Event Store directly, without additional abstractions, so everything is easy to follow. An application service is analogous to a CommandHandler, except that it handles all the commands of one aggregate. This approach is very convenient, and we have also switched to it in one project.

ApplicationService


The IApplicationService interface is extremely simple.
public interface IApplicationService
{
   void Execute(ICommand cmd);
}

We pass any command to the Execute method and rely on the service to route it to the right handler.

Since Rinat's example has only one aggregate, Customer, there is also only one service, CustomerApplicationService. For that reason there is no need to put any logic into a base class. In my opinion, that is a great solution for an example.

The Execute method hands the command over to whichever When overload matches its type. Thanks to dynamic, the implementation of Execute is very simple, and you do not need to walk over all the methods with reflection.
public void Execute(ICommand cmd)
{
   // pass command to a specific method named when
   // that can handle the command
   ((dynamic)this).When((dynamic)cmd);
}
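To see the dispatch in isolation, here is a minimal sketch. The commands RenameThing and DeleteThing and the ThingApplicationService class are invented for this illustration and are not part of Rinat's sample; only the Execute body follows the code above.

```csharp
using System;
using System.Collections.Generic;

public interface ICommand { }

// Hypothetical commands, invented for this sketch.
public sealed class RenameThing : ICommand
{
    public string NewName { get; set; }
}

public sealed class DeleteThing : ICommand { }

// Hypothetical service: one When overload per command type.
public sealed class ThingApplicationService
{
    // Records which handler ran, so the routing can be observed.
    public readonly List<string> Log = new List<string>();

    public void Execute(ICommand cmd)
    {
        // Both casts to dynamic postpone overload resolution until run time,
        // where it is performed against the actual type of cmd.
        ((dynamic)this).When((dynamic)cmd);
    }

    public void When(RenameThing c)
    {
        Log.Add("renamed to " + c.NewName);
    }

    public void When(DeleteThing c)
    {
        Log.Add("deleted");
    }
}
```

Calling Execute(new RenameThing { NewName = "A" }) ends up in the first When overload, and Execute(new DeleteThing()) in the second. The price of this convenience is that a command without a matching When overload is only detected at run time, as a RuntimeBinderException, and not by the compiler.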

Let's start with the create command - CreateCustomer.
[Serializable]
public sealed class CreateCustomer : ICommand
{
   public CustomerId Id { get; set; }
   public string Name { get; set; }
   public Currency Currency { get; set; }

   public override string ToString()
   {
       return string.Format("Create {0} named '{1}' with {2}", Id, Name, Currency);
   }
}

In a real project you will most likely have a message queue between the UI and the ApplicationService, but for the example Rinat limited himself to sending the command directly to the application service object (see the ApplicationServer class).
Once the CreateCustomer command reaches the Execute method, it is routed to the matching When method.
public void When(CreateCustomer c)
{
   Update(c.Id, a => a.Create(c.Id, c.Name, c.Currency, _pricingService, DateTime.UtcNow));
}

We pass the Update method the identifier of the aggregate and an action that calls the aggregate's state-changing method. In my opinion, it would be better not to have a Create method on the aggregate at all, but to add another constructor instead, because calling Create in this context looks a bit strange. We are supposedly creating an aggregate, yet we pass Create as a method that changes the state of an existing one. With a constructor, this oddity would not arise.

Let's return to the Update method. It has the following tasks: 1) load all events for the current aggregate, 2) create the aggregate and restore its state from the loaded events, 3) perform the passed execute action on the aggregate, and 4) save the new events, if any.
void Update(CustomerId id, Action<Customer> execute)
{
   // Load event stream from the store
   EventStream stream = _eventStore.LoadEventStream(id);
   // create new Customer aggregate from the history
   Customer customer = new Customer(stream.Events);
   // execute delegated action
   execute(customer);
   // append resulting changes to the stream
   _eventStore.AppendToStream(id, stream.Version, customer.Changes);
}
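The four steps can be tried out end to end with a small in-memory sketch. Everything here is an assumption made for illustration: InMemoryEventStore, the Counter aggregate and the CounterIncremented event are not part of the Lokad sample, and the stream version is simply the number of events stored so far.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IEvent { }

// Hypothetical event, invented for this sketch.
public sealed class CounterIncremented : IEvent
{
    public int By { get; set; }
}

// Hypothetical in-memory store; Version is the number of events in the stream.
public sealed class InMemoryEventStore
{
    readonly Dictionary<string, List<IEvent>> _streams = new Dictionary<string, List<IEvent>>();

    public (long Version, List<IEvent> Events) Load(string id)
    {
        var events = _streams.TryGetValue(id, out var s) ? s.ToList() : new List<IEvent>();
        return (events.Count, events);
    }

    public void Append(string id, long expectedVersion, ICollection<IEvent> events)
    {
        if (events.Count == 0) return;
        if (!_streams.TryGetValue(id, out var s)) _streams[id] = s = new List<IEvent>();
        // Optimistic concurrency: refuse to append if the stream moved on since it was loaded.
        if (s.Count != expectedVersion)
            throw new InvalidOperationException("Stream was changed concurrently");
        s.AddRange(events);
    }
}

// Hypothetical aggregate that rebuilds itself from its history.
public sealed class Counter
{
    public readonly List<IEvent> Changes = new List<IEvent>();
    public int Total { get; private set; }

    public Counter(IEnumerable<IEvent> history)
    {
        foreach (var e in history) Mutate(e);
    }

    public void Increment(int by)
    {
        var e = new CounterIncremented { By = by };
        Mutate(e);
        Changes.Add(e);
    }

    void Mutate(IEvent e)
    {
        if (e is CounterIncremented i) Total += i.By;
    }
}

public sealed class CounterService
{
    readonly InMemoryEventStore _store;
    public CounterService(InMemoryEventStore store) { _store = store; }

    public void Update(string id, Action<Counter> execute)
    {
        var stream = _store.Load(id);                        // 1) load the events
        var counter = new Counter(stream.Events);            // 2) rebuild the aggregate
        execute(counter);                                    // 3) run the action
        _store.Append(id, stream.Version, counter.Changes);  // 4) save the new events
    }
}
```

Two consecutive calls such as Update("c-1", c => c.Increment(2)) and Update("c-1", c => c.Increment(3)) leave two events in the stream, and a Counter rebuilt from them has a Total of 5. The version passed to Append is the one read in step 1, which is what lets the store detect a concurrent writer.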


State recovery


In the example that I showed in the last article, the state of the aggregate was stored in private fields of the aggregate class. This is not very convenient if you want to add snapshots, because you have to extract the state somehow every time or resort to reflection. Rinat has a much more convenient approach: the state lives in a separate class, CustomerState. This moves the projection methods out of the aggregate and makes it much easier to save and load snapshots, although the example does not include them.
As you can see, the list of the aggregate's events is passed to its constructor so that, as you might guess, it can restore its state.
The aggregate in turn delegates state recovery to the CustomerState class.
/// <summary>
/// Aggregate state, which is separate from this class in order to ensure,
/// that we modify it ONLY by passing events.
/// </summary>
readonly CustomerState _state;

public Customer(IEnumerable<IEvent> events)
{
   _state = new CustomerState(events);
}

Here is the code of the CustomerState class almost in full; I have only removed some of the When projection methods.
/// <summary>
/// This is the state of the customer aggregate.
/// It can be mutated only by passing events to it.
/// </summary>
public class CustomerState
{
   public string Name { get; private set; }
   public bool Created { get; private set; }
   public CustomerId Id { get; private set; }
   public bool ConsumptionLocked { get; private set; }
   public bool ManualBilling { get; private set; }
   public Currency Currency { get; private set; }
   public CurrencyAmount Balance { get; private set; }

   public int MaxTransactionId { get; private set; }

   public CustomerState(IEnumerable<IEvent> events)
   {
       foreach (var e in events)
       {
           Mutate(e);
       }
   }
...
   public void When(CustomerCreated e)
   {
       Created = true;
       Name = e.Name;
       Id = e.Id;
       Currency = e.Currency;
       Balance = new CurrencyAmount(0, e.Currency);
   }

   public void When(CustomerRenamed e)
   {
       Name = e.Name;
   }

   public void Mutate(IEvent e)
   {
       // .NET magic to call one of the 'When' handlers with
       // matching signature
       ((dynamic) this).When((dynamic)e);
   }
}

As you can see, the constructor iterates over the passed events and hands each of them to the Mutate method, which in turn dispatches it to the matching projection method.
Note that all properties have private setters, which guarantees that the state can be changed from the outside only by passing in the corresponding event.

The state has been restored, so now we can try to change it. Let's go back to the state-changing method. Since I started with the CreateCustomer command, we will look at the Create method on the aggregate.
public void Create(CustomerId id, string name, Currency currency, IPricingService service, DateTime utc)
{
   if (_state.Created)
       throw new InvalidOperationException("Customer was already created");
   Apply(new CustomerCreated
       {
           Created = utc,
           Name = name,
           Id = id,
           Currency = currency
       });

   var bonus = service.GetWelcomeBonus(currency);
   if (bonus.Amount > 0)
       AddPayment("Welcome bonus", bonus, utc);
}

This is the place to check our business rules, since we have access to the current state of the aggregate. We have a business rule that a Customer can be created only once (there is also a technical limitation here), so when Create is called on an aggregate that has already been created, we throw an exception.
If all business rules are satisfied, we pass the CustomerCreated event to the Apply method. The Apply method is very simple: it passes the event to the state object and adds it to the list of current changes.
public readonly IList<IEvent> Changes = new List<IEvent>();
readonly CustomerState _state;

void Apply(IEvent e)
{
   // pass each event to modify current in-memory state
   _state.Mutate(e);
   // append event to change list for further persistence
   Changes.Add(e);
}

In most cases, one operation on an aggregate produces one event. The Create method, however, may produce two.
After we have passed the CustomerCreated event to the Apply method, we can add a welcome bonus to the new customer, provided the business rule holds: the bonus amount must be greater than zero. In that case the aggregate's AddPayment method is called, which performs no checks and simply raises the CustomerPaymentAdded event.
public void AddPayment(string name, CurrencyAmount amount, DateTime utc)
{
   Apply(new CustomerPaymentAdded()
       {
           Id = _state.Id,
           Payment = amount,
           NewBalance = _state.Balance + amount,
           PaymentName = name,
           Transaction = _state.MaxTransactionId + 1,
           TimeUtc = utc
       });
}

Now the new events have to be saved and published to the read model. I suspect that this happens in the following line.
// append resulting changes to the stream
_eventStore.AppendToStream(id, stream.Version, customer.Changes);

Let's check ...
public void AppendToStream(IIdentity id, long originalVersion, ICollection<IEvent> events)
{
   if (events.Count == 0)
       return;
   var name = IdentityToString(id);
   var data = SerializeEvent(events.ToArray());
   try
   {
       _appendOnlyStore.Append(name, data, originalVersion);
   }
   catch(AppendOnlyStoreConcurrencyException e)
   {
       // load server events
       var server = LoadEventStream(id, 0, int.MaxValue);
       // throw a real problem
       throw OptimisticConcurrencyException.Create(server.Version, e.ExpectedStreamVersion, id, server.Events);
   }

   // technically there should be a parallel process that queries new changes
   // from the event store and sends them via messages (avoiding 2PC problem).
   // however, for demo purposes, we'll just send them to the console from here
   Console.ForegroundColor = ConsoleColor.DarkGreen;
   foreach (var @event in events)
   {
       Console.WriteLine("  {0} r{1} Event: {2}", id,originalVersion, @event);
   }
   Console.ForegroundColor = ConsoleColor.DarkGray;
}

Well, almost. The events are serialized and saved to the append-only store (we are never going to delete or modify them). But instead of sending them on to the read model (the presentation layer), Rinat simply prints them to the console.
For an example, however, this is enough.
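The comment in AppendToStream mentions a separate process that queries the store for new changes and sends them on as messages. A minimal sketch of that idea might look as follows. AppendOnlyLog and EventPublisher are hypothetical names invented here, records are plain strings, and the checkpoint is kept in memory; a real publisher would have to persist its checkpoint and run in the background.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical append-only log: records are only ever added at the end.
public sealed class AppendOnlyLog
{
    readonly List<string> _records = new List<string>();

    public void Append(string record)
    {
        _records.Add(record);
    }

    // Returns the records stored after the given position.
    public IReadOnlyList<string> ReadAfter(int position)
    {
        return _records.GetRange(position, _records.Count - position);
    }
}

// Hypothetical publisher that forwards records it has not sent yet.
public sealed class EventPublisher
{
    readonly AppendOnlyLog _log;
    readonly Action<string> _send;
    int _checkpoint; // number of records already published

    public EventPublisher(AppendOnlyLog log, Action<string> send)
    {
        _log = log;
        _send = send;
    }

    // Publishes everything stored since the previous call
    // and returns the number of records sent.
    public int PublishPending()
    {
        var pending = _log.ReadAfter(_checkpoint);
        foreach (var record in pending)
        {
            _send(record);
            // Advance only after a successful send, so a failed record is retried.
            _checkpoint++;
        }
        return pending.Count;
    }
}
```

Because the writer only appends to the store and the publisher only reads from it, saving an event and sending a message never have to happen in one distributed transaction. The trade-off in this sketch is that a record may be sent again if the process stops between the send and the checkpoint update, so the receiving side should tolerate duplicates.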
If you want to see how this works with a message queue, have a look at the example on GitHub from my previous article. I changed it a bit and also moved part of the Event Sourcing infrastructure into the solution. With this example, I want to show how you can add snapshots.

Snapshots


Snapshots capture the intermediate state of an aggregate. They let us avoid loading the aggregate's entire event stream and load only the events that occurred after the last snapshot was taken.
So let's look at the implementation.
The UserCommandHandler class has an Update method very similar to Rinat's, but mine also saves and loads snapshots. We take a snapshot every 100 versions.
private const int SnapshotInterval = 100;

First, we load the snapshot from the repository, then we load the stream of events that occurred after that snapshot was taken. Then we pass both to the constructor of the aggregate.
private void Update(string userId, Action<UserAR> updateAction)
{
   var snapshot = _snapshotRepository.Load(userId);
   var startVersion = snapshot == null ? 0 : snapshot.StreamVersion + 1;
   var stream = _eventStore.OpenStream(userId, startVersion, int.MaxValue);
   var user = new UserAR(snapshot, stream);
   updateAction(user);
   var originalVersion = stream.GetVersion();
   _eventStore.AppendToStream(userId, originalVersion, user.Changes);
   var newVersion = originalVersion + 1;
   if (newVersion % SnapshotInterval == 0)
   {
       _snapshotRepository.Save(new Snapshot(userId, newVersion,user.State));
   }
}

In the constructor, we take the state from the snapshot, or create a new state if there is no snapshot. We then replay all the missing events on that state, and as a result we get the current version of the aggregate.
public UserAR(Snapshot snapshot, TransitionStream stream)
{
   _state = snapshot != null ? (UserState) snapshot.Payload : new UserState();
   foreach (var transition in stream.Read())
   {
       foreach (var @event in transition.Events)
       {
           _state.Mutate((IEvent) @event.Data);
       }
   }
}

After the action has been performed on the aggregate, we check whether the new version is a multiple of the snapshot interval, and if so, we save a new snapshot to the repository. To read the state of the aggregate from UserCommandHandler, we had to expose its State property with an internal getter.
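The idea can be checked in isolation with a small sketch. BalanceState and SnapshotDemo are hypothetical names invented for this illustration, events are reduced to plain integer deposits, and, unlike the stream above, the version here simply counts the events applied so far.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical state: a balance plus the number of events applied so far.
public sealed class BalanceState
{
    public int Balance { get; private set; }
    public int Version { get; private set; }

    public BalanceState() { }

    public BalanceState(int balance, int version)
    {
        Balance = balance;
        Version = version;
    }

    // An "event" in this sketch is just a deposit amount.
    public void Mutate(int deposit)
    {
        Balance += deposit;
        Version++;
    }
}

public static class SnapshotDemo
{
    public const int SnapshotInterval = 100;

    // Restores the state from an optional snapshot, replaying only the events
    // that the snapshot has not seen yet.
    public static BalanceState Restore(BalanceState snapshot, IReadOnlyList<int> allEvents)
    {
        var state = snapshot != null
            ? new BalanceState(snapshot.Balance, snapshot.Version) // copy, keep the snapshot intact
            : new BalanceState();
        foreach (var deposit in allEvents.Skip(state.Version))
        {
            state.Mutate(deposit);
        }
        return state;
    }

    public static bool ShouldSnapshot(int version)
    {
        return version % SnapshotInterval == 0;
    }
}
```

Restoring from a snapshot taken at version 200 and then replaying the remaining events gives the same balance and version as replaying the whole stream from the beginning; the only difference is how many events have to be read and applied.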

That's all. We now have snapshots, which let us restore the state of an aggregate much faster when it has a lot of events.

Feedback


If you are interested in CQRS + ES, please feel free to ask questions in the comments. You can also write to me on Skype if you do not have an account on Habr. Recently, a friend from Chelyabinsk contacted me on Skype, and thanks to his questions I got a lot of ideas for the next article. Since I now have more free time, I plan to write articles more often.