본문 바로가기
  • AI (Artificial Intelligence)
Fundamental/Technical

Event-sourced game implementation example

by 로샤스 2021. 4. 12.

Part 1/3: Getting started

Ref. scalac.io/blog/event-sourced-game-implementation-example-part-1-3-getting-started/

Hi, in this post series we’ll create a really simple (yet complete) event-sourced game. It won’t be anything spectacular thus its rules are as simple as:

  • game creator specifies players taking part
  • each player, one after another, has an opportunity to roll the dice
  • each player’s opportunity to roll is time limited, if player won’t roll within the limit, his opportunity is gone
  • winners are all players who share the highest rolled number

With these simple rules it’s barely playable but who cares, at least it’s event-sourced!

Full source code is available on GitHub.

Let’s go…

1. Architecture overview

We’ll split our application into three separate modules: game, webappp and statistics.

Game is our core module, it’s where all business logic lives. It has REST API to handle commands and it publishes events to RabbitMQ.

Webapp is an example user interface client consuming our game module. It’ll be created using Play Framework, AngularJS and WebSocket.

Additionaly, we’ll create statistics module that’ll collect data from game’s events and expose simple REST API to get calculated dice rolls statistics.

In this part we’ll take a look at game module followed by webapp and statistics in 2nd and 3rd part of this series.

1. Domain – game’s heart

We’ll begin with implementing main portion of game’s logic – our domain. We’ll do it using event sourcing technique.

Basic idea behind event sourcing is quite simple: an aggregate root (our game) receives (non-persistent) commands.

As a result it generates persistent events that are saved to an event store (this is what Akka persistence takes care of). Right after saving, events are applied to the aggregate root, changing it’s state – it’s the only way to modify it.

Having this in mind, we can restore any past state of an aggregate just by subsequently applying its saved events. On the other hand, it means that if we want to recreate the current state of an aggregate we need to take all of it’s saved events and apply them ony by one. And yes, you are right – this may be inefficient.

Here’s where snapshots come into play – they are state dumps taking into account events up to a certain point in the event stream. If we have a snapshot we only need to reapply the events that happened after it’s creation to obtain the current state. Usually it’s a good idea to use them but here we’ll leave them out for clarity.

Summing up, here are the ingredients that we’ll use:

  • game – contains game rules logic, an aggregate root
  • commands – game is capable of handling them (“Roll the dice” is an example of command)
  • violations – if command being handled breaks game’s rules (for example if player tries to roll while it’s not his turn) we indicate a violation
  • events – they are generated by game when something happens (for example: “Dice has been rolled” or “Game has finished”) – once applied, they change game’s state

Game

Our game is an event-driven aggregate root. It’s immutable and most of its public methods return new copy of itself with some changes. Applied changes are reflected in generated events added to returned game.

Here’s how our game’s interface looks:

sealed trait Game {
def handleCommand(command: GameCommand): Either[GameRulesViolation, Game]
def applyEvent: PartialFunction[GameEvent, Game]
}
 
case class UninitializedGame(...) extends Game {
def start(players: Seq[PlayerId]): Either[GameRulesViolation, Game]
}
 
case class RunningGame(...) extends Game {
def roll(player: PlayerId): Either[GameRulesViolation, Game]
def tickCountdown(): Game
}
 
case class FinishedGame(...) extends Game {}

view rawGame.scala hosted with ❤ by GitHub

[game.scala]

As you can see, game can be in one of 3 states:

  • uninitialized – right after creation
  • running – once started
  • finished – after all players roll (or timeout)

Commands

There are just two commands that our game needs to handle:

sealed trait GameCommand
case class StartGame(players: Seq[PlayerId]) extends GameCommand
case class RollDice(player: PlayerId) extends GameCommand

view rawGame.scala hosted with ❤ by GitHub

[command.scala]

I decided to make Game trait responsible for handling these commands, thats what handleCommand method does. It simply dispatches commands to the corresponding methods (either start or roll). If a command cannot be applied in the current state (for example RollDice in UninitializedGame) it returns the suitable violation.

Some may prefer to have handleCommandimplemented in each subclass and not in the Game trait. Either ways is fine.

def handleCommand(command: GameCommand): Either[GameRulesViolation, Game] = command match {
case StartGame(players) => this match {
case ug: UninitializedGame => ug.start(players)
case _ => GameAlreadyStartedViolation
}
case RollDice(player) => this match {
case rg: RunningGame => rg.roll(player)
case _ => GameNotRunningViolation
}
}

view rawHandleCommand.scala hosted with ❤ by GitHub

[game.scala]

Apart from commands we’ll have a method to update the current turn countdown (tickCountdown). It’ll take care of updating the time left in a player’s turn as well as the turn timeout. We’ll execute this method periodically as time passes.

Violations

You might have already noticed that most public methods of a game returnEither[GameRulesViolation, Game].

We’ll use the following violations to indicate failure:

sealed trait GameRulesViolation
case object NotEnoughPlayersViolation extends GameRulesViolation
case object NotCurrentPlayerViolation extends GameRulesViolation
case object GameAlreadyStartedViolation extends GameRulesViolation
case object GameNotRunningViolation extends GameRulesViolation

view rawGameViolations.scala hosted with ❤ by GitHub

[violation.scala]

Here’s quick explanation:

  • NotEnoughPlayersViolation – returned by start if there are not enough players to start a game (ie, less than 2)
  • NotCurrentPlayerViolation – returned by roll if the player who tries to roll is not the one who’s turn it is
  • GameAlreadyStartedViolation – created when you try to pass a Start command to game that’s already started
  • GameNotRunningViolation – indicates that a player tried to roll before the game was started

Events

If a command result is a valid game (not a violation), all events generated during the command’s processing will be added to the returned game’s uncommittedEvents field. You can later publish, persist or use them in any other way.

Events that we’ll use are:

case class GameStarted(..)
case class TurnCountdownUpdated(..)
case class TurnChanged(...)
case class TurnTimedOut(...)
case class DiceRolled(..)
case class GameFinished(..)

view rawEvents.scala hosted with ❤ by GitHub

[event.scala]

I hope they are self explanatory.

Appropriate events are generated by Game. Here’s a simplified version of the roll method, that generates the event:

def roll(player: PlayerId): Either[GameRulesViolation, Game] = {
if (turn.currentPlayer == player) {
val rolledNumber = randomBetween(1, 6)
applyEvent(DiceRolled(id, rolledNumber))
} else {
NotCurrentPlayerViolation
}
}

view rawRoll.scala hosted with ❤ by GitHub

[game.scala]

We can see that if everything is fine (it’s the currentPlayer that rolls) we generate one event: DiceRolled. From the roll method we return the result of applying this event to the current game’s state.

What does the applyEvent method do then? Here’s example for DiceRolledevent:

override def applyEvent = {
case ev @ DiceRolled(_, rolledNumber) =>
copy(rolledNumbers = rolledNumbers + (turn.currentPlayer -> rolledNumber),
uncommittedEvents = uncommittedEvents :+ ev)
// ...
}

view rawApplyEvent.scala hosted with ❤ by GitHub

[game.scala]

A new copy of the game is created with an updated state (here we add the rolled number to the rolledNumbers map). We also add the new event to uncommitedEvents so that the caller of roll knows what events were generated.

All state changes happen as a result of events being applied. If we want to change the game state we need to generate an event and apply it.

applyEvent is overridden in each of game subclasses and each of them handles different set of events. For example DiceRolled event would be invalid in uninitialized state, thus it’s not handled there.

The boon of this approach is that having a list of past events we can easily restore the current game state, just by applying them one by one.

def applyEvents(events: E*): T =
events.foldLeft(this)(_ applyEvent _)

view rawApplyEvents.scala hosted with ❤ by GitHub

[AggregateRoot.scala]

This method is defined in the AggregateRoot trait that Game extends.

Let’s get back to tickCountdown for a moment. Although it doesn’t handle commands, it too will generate some events.

def tickCountdown(): Game = {
val countdownUpdated = TurnCountdownUpdated(id, turn.secondsLeft - 1)
if (turn.secondsLeft <= 1) {
val timedOut = TurnTimedOut(id)
nextPlayerOpt match {
case Some(nextPlayer) =>
applyEvents(countdownUpdated, timedOut, TurnChanged(id, Turn(nextPlayer, turnTimeoutSeconds)))
case None =>
applyEvents(countdownUpdated, timedOut, GameFinished(id, bestPlayers))
}
} else applyEvent(countdownUpdated)
}

view rawTickCountdown.scala hosted with ❤ by GitHub

[game.scala]

2. Running games – time for Akka

Now that we have our game logic, we can bring it to life. We’ll use Akka to run games and persist events using Akka persistence.

Key part of Akka persistence is PersistentActor. It’s role is to manage event-driven state.

When PersistentActor is created, first thing that it does is state recovery from its previously saved events (or snapshots). If we create an actor that had some events saved before, all these events are given before any other message is handled (incoming messages are cached internally). Only after all past events were processed PersistentActor can handle regular messages.

Where do the events come from then? Akka persistence gives us (amongst other things) a persist method which we can use to save events to the journal. Journals are pluggable and there’s quite a bit of them available (including MongoDB, Cassandra, Kafka). The default one writes to the local filesystem – it’s fine for our simple project.

Knowing this, there are several things we’ll need to implement in our GameActor:

  • persistenceId – is the unique identifier of our persistent actor, used by the persistence mechanism to associate the stored events with our actor; here we’ll use the gameId (which happens to be a UUID)
  • receiveCommand – handles regular messages received by the actor; we’ll implement this to handle game commands as well as self-sent message to update the time
  • receiveRecover – messages that we’ll handle here are either previously saved game events from which we’ll rebuild most recent state or RecoveryCompleted which tells us that state recovery has finished

GameActor will be responsible for managing a running game’s state (stored in it’s game field) – feeding it commands and ticking the time.

class GameActor(id: GameId) extends PersistentActor {
override val persistenceId = id.value
 
var game: Game = Game.create(id)
 
override def receiveCommand = {
case command: GameCommand => handleResult(game.handleCommand(command))
// ...
}
 
def handleResult(result: Either[GameRulesViolation, Game]) = result match {
case Right(updatedGame) =>
sender() ! CommandAccepted
handleChanges(updatedGame)
case Left(violation) =>
sender() ! CommandRejected(violation)
}
 
def handleChanges(updatedGame: Game) =
updatedGame.uncommittedEvents.foreach {
persist(_) { ev =>
game = game.applyEvent(ev).markCommitted
publishEvent(ev)
// ...
}
}
 
def publishEvent(event: GameEvent) = {
system.eventStream.publish(event)
}
 
override def receiveRecover = {
case ev: GameEvent =>
game = game.applyEvent(ev)
// ...
}
}

view rawGamingActor.scala hosted with ❤ by GitHub

[GameActor.scala]

What’s going on here?

  1. A GameCommand is received
  2. It’s passed to the handleCommand method
  3. We inform the sender about a command’s success or failure (CommandAccepted and CommandRejected) A note here: Using CQRS it’s not always immediately possible to tell if something went wrong. We’re lucky that our game is simple enough for this not to be an issue. If we weren’t we could just acknowledge that we started processing a command that might be completed at some later point in time.
  4. If the command succeeded we persist all new events (remember uncommittedEvents?)
  5. After each event is persisted (second argument list in persist is a callback) we apply it to current game state (the one from before command). Also we mark new state as commited, that is, remove all events from uncommittedEvents. We don’t want them anymore (if we left them, by next command processing, we wouldn’t know which events are “fresh” and which ones remained from previous commands)
  6. We publish an event to the ActorSystem’s default eventStream
  7. We perform other actions depending on what event was applied. Omitted here for brevity.

GameActor will also take care of updating the game’s time (remember the tickCountdown method?).

We’ll use the built-in scheduler for that. It will call game.tickCountdownevery second. Once it does it will handle the changes the same way it does in case of a command.

Here’s related code:

var tickCancellable: Option[Cancellable] = None
 
override def receiveCommand = {
// ...
case TickCountdown => game match {
case rg: RunningGame => handleChanges(rg.tickCountdown())
case _ =>
log.warning("Game is not running, cannot update countdown")
cancelCountdownTick()
}
}
 
def scheduleCountdownTick() = {
val cancellable =
system.scheduler.schedule(1.second, 1.seconds, self, TickCountdown)
tickCancellable = Some(cancellable)
}
 
def cancelCountdownTick() = {
tickCancellable.foreach(_.cancel())
tickCancellable = None
}
 
override def receiveRecover = {
// ...
case RecoveryCompleted =>
if (game.isRunning)
scheduleCountdownTick()
}

view rawGamingActorTick.scala hosted with ❤ by GitHub

[GameActor.scala]

3. REST API

To create new games and pass commands to existing ones, we’ll expose a REST API. We’ll use spray-can as a server with the following routes:

  • POST /game – create new game
  • POST /game/:id/start – start previously created game
  • POST /game/:id/roll/:player – roll request from given player

I realize last two are not very RESTful, but for our simple example let’s just stick with them.

We’ll use the actor-per-request pattern, creating one actor to handle each request.

We handle the /game url by creating a new actor as shown below.

val gameRoute =
(pathPrefix("game") & post) {
pathEndOrSingleSlash(handleCreate) // ~ other routes not shown here
}
 
def handleCreate: Route = { ctx =>
actorRefFactory.actorOf(CreateGameRequestActor.props(ctx, gameManager))
}

view rawRoutes.scala hosted with ❤ by GitHub

[GameApi.scala]

The request-handling actor is defined as follows

class CreateGameRequestActor extends Actor {
 
gameManager ! GameManager.CreateGame
 
context.setReceiveTimeout(timeout)
 
override def receive = {
case GameManager.GameCreated(id) =>
ctx.complete(StatusCodes.Created, CreateGameResponseData(id.value))
context stop self
case ReceiveTimeout =>
ctx.complete(StatusCodes.RequestTimeout)
context stop self
}
 
}

view rawRequestActor.scala hosted with ❤ by GitHub

[CreateGameRequestActor.scala]

As you can see all commands go through a GameManager. It’s responsible for creating new GameActors and passing commands to existing ones.

class GameManager extends Actor {
 
override def receive = {
case CreateGame =>
val id = GameId.createRandom
context.actorOf(GameActor.props(id), id.value)
sender() ! GameCreated(id)
case SendCommand(gameId, command) =>
context.child(gameId.value) match {
case Some(game) => game forward command
case None => sender() ! GameDoesNotExist
}
}
 
}

view rawGameManager.scala hosted with ❤ by GitHub

[GameManager.scala]

SendCommand is send by instances of the second per-request actor we have – GameCommandRequestActor – created to handle the start and roll routes.

Once a command is processed, the actor handling the request is stopped.

4. Feeding the Rabbit

We already know how to take some actions in our games, but how do we know what’s actually happening in them? Let’s get back to our events.

For now events from our games just go to the eventStream and get forgotten. That’s not very useful. We’d prefer them to be published for consumption by external clients. There are many possibilities: from exposing a REST API for fetching recent events (polling), through a custom socket-based pub/sub implementation, to full-blown message queue systems. We’ll gear towards the latter and use RabbitMQ to publish our events.

I’ll use the Reactive rabbit library to easily bind a stream (of game events) to a rabbit exchange.

Our events will be sent to a headers exchange and will have gameId and typeheaders (which we can route on in our queues).

Here’s how we create an exchange and bind a publisher to it:

val connection = Connection()
val exchange = Exchange(exchangeName, Headers, durable = false)
 
connection.exchangeDeclare(exchange) onComplete {
case Success(_) =>
Source[GameEvent](EventPublisherActor.props)
.map(toMessage)
.to(Sink(connection.publish(exchange = exchangeName, "")))
.run()(ActorFlowMaterializer())
case Failure(ex) =>
log.error("Cannot create exchange", ex)
sys.exit(1)
}
 
def toMessage(event: GameEvent) = {
val serialized = compact(render(Extraction.decompose(event)))
Message(
body = ByteString(serialized),
contentType = Some(MediaType.JSON_UTF_8),
contentEncoding = Some("UTF-8"),
headers = Map(
"gameId" -> event.id.value,
"type" -> event.getClass.getSimpleName))
}

view rawConnectToMQ.scala hosted with ❤ by GitHub

[Boot.scala]

Our publisher is an actor that catches game events from ActorSystem’s eventStream and publishes them (onNext) in accordance to requested demand.

class EventPublisherActor extends ActorPublisher[GameEvent] {
 
var eventCache: List[GameEvent] = Nil
 
context.system.eventStream.subscribe(self, classOf[GameEvent])
 
override def receive = {
case Request(n) =>
while (isActive && totalDemand > 0 && eventCache.nonEmpty) {
val (head :: tail) = eventCache
onNext(head)
eventCache = tail
}
case event: GameEvent =>
if (isActive && totalDemand > 0)
onNext(event)
else
eventCache :+= event
}
 
}

view rawEventPublisher.scala hosted with ❤ by GitHub

[EventPublisherActor.scala]

Once we pass an event to onNext Reactive rabbit takes care of moving it to RabbitMQ.

5. Running

Now that we have all the building blocks ready it’s time to wire everything up and get it running.

First, let’s start RabbitMQ. I prefer to use a docker container to avoid the tedium of setting it up from scratch.

$ docker run -d -p 5672:5672 -p 15672:15672 dockerfile/rabbitmq

view rawdocker.sh hosted with ❤ by GitHub

Before we start creating games, we’ll bind a queue to game_events exchange so that we can see passing events:

  1. Enter rabbit console. If you used docker above it’s: https://localhost:15672/ (default login/pass: guest/guest)
  2. Go to Queues and in “Add a new queue” section enter queue name, for example “all_games_events”.
  3. Click on newly created queue name and in “Bindings” section in “From exchange” field type “game_events” (leave remaining fields empty). This will bind our queue to all events from all games.

Later, when there’ll be messages going to this queue, you can read them in the Get messages section of the queue view. Just set how many messages you want to see in the Messages field and hit Get Message(s). We are now ready to run the game application:

$ sbt "project game" run

view rawsbt.sh hosted with ❤ by GitHub

Let’s try creating a new game using the REST API we have:

$ curl -X POST http://127.0.0.1:8081/game
 
{"id":"6aa60d73-e9b1-4a3d-a7cc-0a9fd586974d"}

view rawcurl1.sh hosted with ❤ by GitHub

The game has been created and we got it’s id in response. Great! Let’s start it:

$ curl -H "Content-Type: application/json" --data '{"players": ["Player1", "Player2"]}' http://127.0.0.1:8081/game/6aa60d73-e9b1-4a3d-a7cc-0a9fd586974d/start
 
The request has been accepted for processing, but the processing has not been completed

view rawcurl2.sh hosted with ❤ by GitHub

We just started a game with two players: “Player1” and “Player2”. The game should be running and generating events now.

Let’s check that… Go back to rabbit console and get some messages in queue overview. Here’s what I got:

// headers:
gameId: 6aa60d73-e9b1-4a3d-a7cc-0a9fd586974d
type: GameStarted
 
// payload:
{"id":"6aa60d73-e9b1-4a3d-a7cc-0a9fd586974d","players":[{"value":"Player1"},{"value":"Player2"}],"initialTurn":{"currentPlayer":"Player1","secondsLeft":15}}

view rawresponse1.sh hosted with ❤ by GitHub

// headers
gameId: 6aa60d73-e9b1-4a3d-a7cc-0a9fd586974d
type: TurnCountdownUpdated
 
// payload:
{"id":"6aa60d73-e9b1-4a3d-a7cc-0a9fd586974d","secondsLeft":14}

view rawresponse2.sh hosted with ❤ by GitHub

We can clearly see that the game has indeed started and is running. Voila! The turn countdown is being updated every second, and if you wait long enough without any action you’ll surely see the TurnTimedOut event in a few seconds.

Let’s try rolling the dice:

$ curl -X POST http://127.0.0.1:8081/game/6aa60d73-e9b1-4a3d-a7cc-0a9fd586974d/roll/Player1
 
The request has been accepted for processing, but the processing has not been completed.

view rawcurl3.sh hosted with ❤ by GitHub

Good! We’ve just rolled the dice. DiceRolled event should be generated.

Let’s check our validation, what will happen if we try to roll as a player who is not supposed to roll at the moment?

$ curl -X POST http://127.0.0.1:8081/game/6aa60d73-e9b1-4a3d-a7cc-0a9fd586974d/roll/Player3
 
{"message":"Not this player's turn"}

view rawresponse3.sh hosted with ❤ by GitHub

That’s it. Everything works fine.

For the impatient there is also a web interface. Just sbt "project webapp" run and point your browser to https://localhost:9000/ to play around with it.

Summary

I realize this post didn’t cover all aspects in detail, I tried to focus on most important ones and I hope it puts some light on most of them. My goal was to give you a basic idea of how a CQRS/ES-based application could look like.

As a reminder you can access the full source code at Github.

In the next part, we’ll create a simple web application. We’ll see how to combine REST API calls with event listening to create a decent user experience.

Stay tuned!

Other parts:

Do you like this post? Want to stay updated? Follow us on Twitter or subscribe to our Feed.

'Fundamental > Technical ' 카테고리의 다른 글

Webhook Data Format  (0) 2021.04.12
The Logic Apps Webhook Action and the Correlation Identifier Pattern  (0) 2021.04.12
What Is an Event-Driven Architecture?  (0) 2021.04.12
Facebook uses Memcache Clusters  (0) 2021.02.22
Sonatype - Nexus  (0) 2020.12.21

댓글