Symfony Messenger component for CQRS applications

Originally published at my personal website: patryk.it

Hi there! This article is mostly for people who, like me some time ago, are looking for information on how to configure Messenger for applications based on the CQRS architectural pattern. To be honest, Messenger is my favorite Symfony component.

So first: what is CQRS? It's an acronym for Command Query Responsibility Segregation. In simple words, you split the write model from the read model. The approach was first described by Greg Young.

The read model is the part of your application that reads (wow?!) something from a source (e.g. a database, files, etc.). The read model does not mutate the state of the system.

The write model is the part of your app that mutates the state, for example by creating, updating, or deleting resources. It doesn't return anything; it just changes the resources.

I recommend reading Martin Fowler's post on CQRS.

Of course, you can “do CQRS” using separate databases for the write and read models, or even different database systems for each. Remember: that's not necessary in every case. Please treat CQRS as a tool. It's just a hammer, and you decide whether it needs a wooden grip or whether steel is enough.

To the point: in this post, I'd like to describe how I configure the Symfony Messenger component to work with a pure CQRS system.

Some abstraction of Messenger is always welcome

You know, using Symfony components directly is not the best approach you can find. When I start a new project from scratch, I create a thin layer between my application code and infrastructure elements like Symfony Messenger or any other dependency. It's a good habit to invert the dependency (using a small abstraction) in places like this one.

Let's start by adding the new dependency to our application via Composer, simply by running the following command:

composer require symfony/messenger

Command building blocks

In my example, I have a common package with the CQRS building blocks, and it contains three interfaces responsible for commands, like the following:
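A minimal sketch of these three interfaces could look like this. The names Command, CommandHandler, and CommandBus come from this article; the namespace and the dispatch() method name are assumptions made for illustration.

```php
<?php

declare(strict_types=1);

namespace App\Shared\Application\Command; // hypothetical namespace

/** Marker interface: a message that asks the system to change its state. */
interface Command
{
}

/** Marker interface: a service that handles exactly one type of Command. */
interface CommandHandler
{
}

/** The abstraction that application code depends on instead of Messenger. */
interface CommandBus
{
    // Assumed method name. Commands return nothing by design.
    public function dispatch(Command $command): void;
}
```

Keeping the two marker interfaces empty is deliberate: they exist only so that handlers and messages can be recognized by type later, for example when tagging services.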

Command and CommandHandler are just empty marker interfaces. The CommandBus interface is where the dependency inversion lives: thanks to it, we can invert the dependencies in a simple way. On top of that, we have an implementation that uses the Messenger component. Remember: you should NEVER use the Messenger implementation of the CommandBus directly; always depend on the interface.
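Here is a sketch of what the Messenger-backed implementation might look like. It assumes Symfony 5 or newer, a CommandBus interface with a single dispatch(Command) method, and hypothetical namespaces; only the class name MessengerCommandBus is a guess modeled on the MessengerQueryBus name used later in this article.

```php
<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Messenger; // hypothetical namespace

use App\Shared\Application\Command\Command;
use App\Shared\Application\Command\CommandBus;
use Symfony\Component\Messenger\MessageBusInterface;

final class MessengerCommandBus implements CommandBus
{
    private MessageBusInterface $commandBus;

    // The argument name matters: with a bus configured as "command.bus",
    // Symfony autowires "MessageBusInterface $commandBus" to that bus.
    public function __construct(MessageBusInterface $commandBus)
    {
        $this->commandBus = $commandBus;
    }

    public function dispatch(Command $command): void
    {
        // The returned Envelope is ignored on purpose: commands have no result.
        $this->commandBus->dispatch($command);
    }
}
```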

As you can see, the MessageBusInterface parameter is named $commandBus: it points to the dedicated bus that handles your command messages. We'll cover this topic later.

Query building blocks

Yeah! We already have the “C” from CQRS done. It's time to prepare some queries. To do this we need another three interfaces, very similar to the ones above.
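A possible shape for the query-side interfaces is sketched below. Query, QueryHandler, and QueryBus are the names used in this article; the namespace and the ask() method name are assumptions.

```php
<?php

declare(strict_types=1);

namespace App\Shared\Application\Query; // hypothetical namespace

/** Marker interface: a message that asks for data and changes nothing. */
interface Query
{
}

/** Marker interface: a service that answers exactly one type of Query. */
interface QueryHandler
{
}

/** The abstraction that application code depends on instead of Messenger. */
interface QueryBus
{
    /**
     * Assumed method name.
     *
     * @return mixed whatever the handler of the given query returns
     */
    public function ask(Query $query);
}
```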

Query and QueryHandler have no declared methods. QueryBus is similar to CommandBus: it is the point of dependency inversion and the abstraction over Messenger. Another reminder: don't use the Messenger implementation of QueryBus directly, only via the QueryBus interface. Look at the MessageBusInterface parameter: it is named $queryBus because that matches the name of the bus (query.bus).
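A sketch of MessengerQueryBus, assuming a QueryBus interface with a single ask(Query) method and hypothetical namespaces. HandleTrait and its $messageBus property are part of Symfony Messenger; the alias handleQuery follows the pattern shown in the Symfony documentation.

```php
<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Messenger; // hypothetical namespace

use App\Shared\Application\Query\Query;
use App\Shared\Application\Query\QueryBus;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;

final class MessengerQueryBus implements QueryBus
{
    // HandleTrait::handle() dispatches the message and returns the result
    // of its single handler. It fails if there is no handler or more than one.
    use HandleTrait {
        handle as handleQuery;
    }

    // "MessageBusInterface $queryBus" is autowired to the "query.bus" bus.
    public function __construct(MessageBusInterface $queryBus)
    {
        // $messageBus is the property declared by HandleTrait.
        $this->messageBus = $queryBus;
    }

    /**
     * @return mixed
     */
    public function ask(Query $query)
    {
        return $this->handleQuery($query);
    }
}
```

Note that the trait only works for messages handled synchronously, which is one more reason to keep queries on a sync transport.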

Do you see the differences between the QueryBus and CommandBus implementations? In MessengerQueryBus I've used HandleTrait because, by default, buses don't return results, and this trait makes fetching the handler's result easier. I hate that PHP doesn't have generic types; with them, query buses would be much more elegant. Meh, just dreams. In the real world, we just need to declare a `mixed` return type in a docblock.

What about configuration?

Okay, I assume writing this code is easy for every developer reading this post, and you're here because you want something more interesting. Configuring services is not the most exciting activity, but anyway, let's dive into this topic.

First, we need to define the buses and their transports. In the simplest form, you can add these lines to your Messenger YAML configuration.
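A configuration along these lines should do the job. The bus names command.bus and query.bus and the transport names sync and async match the ones used in this article; the file path, the MESSENGER_TRANSPORT_DSN variable (the default from the Symfony recipe), and the interface namespaces in the routing section are assumptions.

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        default_bus: command.bus

        buses:
            command.bus:
                middleware:
                    # Only if you use Doctrine: run every command handler
                    # inside a database transaction.
                    - doctrine_transaction
            query.bus: ~

        transports:
            sync: 'sync://'
            async: '%env(MESSENGER_TRANSPORT_DSN)%'

        routing:
            # Routing also matches interfaces, so every Command goes to the
            # background and every Query is answered within the request.
            'App\Shared\Application\Command\Command': async
            'App\Shared\Application\Query\Query': sync
```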

A few words about the buses

As you can see, we have defined two buses: one for queries and one for commands. If you're using Doctrine, you can add a middleware to the command bus that handles each message in a transaction, so you don't need to do any mess in your command handlers. :) Just assign the doctrine_transaction middleware and that's all. You can also change the default middleware there to allow a bus to dispatch messages that have no handler. That's especially important for event buses, which are not covered in this article, but I want to let you know about it. It could look like:

default_middleware: allow_no_handlers

Messenger transports

Okay. The next lines of the YAML configuration define transports (the ways messages are sent and received). In this example, you can see sync and async as the simplest cases. Sync means the message is handled synchronously within the request-response cycle, and async means it is handled in the background (in simple words). Let's stop here for a moment. If you're a testing freak, it's good to override the async definition for test purposes. You can do that by creating a test configuration like the following one:
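A minimal sketch of such an override, assuming the transport is called async and the project uses the per-environment config directory (newer Symfony versions can use a when@test block in the main file instead):

```yaml
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            # Messages are kept in memory for the duration of the test
            # instead of being sent to a real broker.
            async: 'in-memory://'
```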

Thanks to this, your functional tests dispatch asynchronous messages in memory (instead of to RabbitMQ or something similar), so you can verify the behavior of the application under conditions close to the real ones. That's just a testing tip; for more you can visit this article.

Tag your handlers

So you have configured the buses and transports. It's time to tag your handlers for specific buses using Messenger's mechanisms. That's easy. You can add the following lines to your main services.yaml.
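One way to do it is with _instanceof rules, as in the sketch below. The messenger.message_handler tag and its bus attribute are standard Symfony; the interface namespaces are assumptions, and the handlers are expected to expose an __invoke() method with a type-hinted message argument.

```yaml
# config/services.yaml
services:
    # Merge this with the existing _defaults and resource definitions.
    _instanceof:
        App\Shared\Application\Command\CommandHandler:
            tags:
                - { name: messenger.message_handler, bus: command.bus }

        App\Shared\Application\Query\QueryHandler:
            tags:
                - { name: messenger.message_handler, bus: query.bus }
```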

Thanks to this, all query and command handlers are tagged as Symfony message handlers for their specific buses.

Solving business use-cases in handlers

Finally, we're here! Let's dive into solving problems using business-case handlers.
In this example, we're in the e-commerce context, so we have a use case like: create a shop if the given shop name is unique. This case could be implemented using simple CRUD mechanisms and scaffolded controllers, but WE ARE AMBITIOUS… and that's just an example.

Command + Handler

Let's start by creating a command and its handler. In this case, we're using UUIDs to identify resources, so IDs are strings. This process will be asynchronous.
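Such a command can be a plain immutable object. In this sketch the class name CreateShop, its accessors, and both namespaces are assumptions; the only facts taken from the text are that the ID is a string UUID and that a shop has a name.

```php
<?php

declare(strict_types=1);

namespace App\Shops\Application\Command; // hypothetical namespace

use App\Shared\Application\Command\Command;

final class CreateShop implements Command
{
    private string $shopId;
    private string $name;

    // The ID is generated by the caller, so nothing has to be returned
    // from the (asynchronous) write side.
    public function __construct(string $shopId, string $name)
    {
        $this->shopId = $shopId;
        $this->name = $name;
    }

    public function shopId(): string
    {
        return $this->shopId;
    }

    public function name(): string
    {
        return $this->name;
    }
}
```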

That’s simple, huh? Now it’s time to create a handler of this command.

Suppose that we already have a shop factory class (ShopFactory) and a repository of shops (Shops). They aren't covered in this article. The write model realized by the command and its handler checks whether the given shop name is unique, that is, no other shop exists with this name. If one exists, we throw an exception and abort the shop creation process (here you can read more about raising exceptions).
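The handler could be sketched as follows. ShopFactory and Shops are named in this article but their APIs are not shown, so existsWithName(), create(), and add() are hypothetical method names, as are the CreateShop command class and all namespaces. A built-in DomainException stands in for a dedicated domain exception.

```php
<?php

declare(strict_types=1);

namespace App\Shops\Application\Command; // hypothetical namespace

use App\Shared\Application\Command\CommandHandler;
use App\Shops\Domain\ShopFactory; // hypothetical location
use App\Shops\Domain\Shops;       // hypothetical location

final class CreateShopHandler implements CommandHandler
{
    private Shops $shops;
    private ShopFactory $shopFactory;

    public function __construct(Shops $shops, ShopFactory $shopFactory)
    {
        $this->shops = $shops;
        $this->shopFactory = $shopFactory;
    }

    // Messenger picks the handled message type from this type hint.
    public function __invoke(CreateShop $command): void
    {
        // Hypothetical repository method: is the name already taken?
        if ($this->shops->existsWithName($command->name())) {
            throw new \DomainException(
                sprintf('Shop name "%s" is already taken.', $command->name())
            );
        }

        // Hypothetical factory and repository methods.
        $shop = $this->shopFactory->create($command->shopId(), $command->name());
        $this->shops->add($shop);
    }
}
```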

Query + Handler

As you can see, our write model is done. Let's consider creating a read model for use in controllers, validators, forms, etc. Maybe you remember what you read a few seconds ago: we throw an exception when the given shop name is already taken. We don't want to face users with this exception, so we need to do a pre-check before dispatching the command. And since this command is processed asynchronously, the user would otherwise get a false positive: no warning, no error, no problem, nothing! That's another reason to create a read model and give the user a satisfying experience with clear information about the failed action. Look!
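The query itself can be as small as this sketch. The name ShopExistsWithName is derived from the handler class name mentioned in this article; the accessor and the namespaces are assumptions.

```php
<?php

declare(strict_types=1);

namespace App\Shops\Application\Query; // hypothetical namespace

use App\Shared\Application\Query\Query;

/** Asks: does a shop with the given name already exist? Expected answer: bool. */
final class ShopExistsWithName implements Query
{
    private string $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }

    public function name(): string
    {
        return $this->name;
    }
}
```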

And now look at the query handler. The class name may look strange to you. The DoctrineDBAL prefix means the implementation is provided using a Doctrine DBAL connection. This class should be placed with the other infrastructural parts of your system, in a path similar to src/Shops/Infrastructure/Doctrine/DBAL/Query/DoctrineDBALShopExistsWithNameHandler. Such a long name, and I'm not even a Java dev. 😅
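A sketch of such a handler, assuming Doctrine DBAL 2.11 or newer (for Connection::fetchOne()), a table called shops with a name column, and a ShopExistsWithName query object with a name() accessor. The table, column, and query class are assumptions; the handler class name and path come from the paragraph above.

```php
<?php

declare(strict_types=1);

namespace App\Shops\Infrastructure\Doctrine\DBAL\Query;

use App\Shared\Application\Query\QueryHandler;
use App\Shops\Application\Query\ShopExistsWithName;
use Doctrine\DBAL\Connection;

final class DoctrineDBALShopExistsWithNameHandler implements QueryHandler
{
    private Connection $connection;

    public function __construct(Connection $connection)
    {
        $this->connection = $connection;
    }

    public function __invoke(ShopExistsWithName $query): bool
    {
        // fetchOne() returns the first column of the first row,
        // or false when the result set is empty.
        $result = $this->connection->fetchOne(
            'SELECT 1 FROM shops WHERE name = :name LIMIT 1', // hypothetical schema
            ['name' => $query->name()]
        );

        return false !== $result;
    }
}
```

The read side talks to the database directly and skips the domain model entirely, which is exactly the freedom that separating reads from writes gives you.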

Another way to implement a query handler is to create an abstraction over the connection. I think that's an unnecessary level of abstraction in this case: your query handler would just forward the parameter to it, which is an anti-pattern in terms of modular programming (a shallow method). This way may also create god classes, like repositories that cover n cases. The first solution (from the example above) has the advantage of ONE query for ONE implementation of ONE business-case query.

Yaaay! We have implemented simple CQRS — let’s use it!

Let's presume that we have a controller that uses our newly created functionality. We want to create a shop with a specific name. In the first step, we need to do input validation (the name cannot be empty and must be a string) and then business validation (the shop name is unique).

Maybe I will just show my simple implementation?
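A simplified sketch of such a controller is below. It assumes a JSON request body, the symfony/uid package for UUID generation, a CommandBus with dispatch(), a QueryBus with ask(), and hypothetical CreateShop and ShopExistsWithName message classes. Routing and service registration are left out.

```php
<?php

declare(strict_types=1);

namespace App\Shops\UI\Http; // hypothetical namespace

use App\Shared\Application\Command\CommandBus;
use App\Shared\Application\Query\QueryBus;
use App\Shops\Application\Command\CreateShop;
use App\Shops\Application\Query\ShopExistsWithName;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Uid\Uuid;

final class CreateShopController
{
    private CommandBus $commandBus;
    private QueryBus $queryBus;

    public function __construct(CommandBus $commandBus, QueryBus $queryBus)
    {
        $this->commandBus = $commandBus;
        $this->queryBus = $queryBus;
    }

    public function __invoke(Request $request): JsonResponse
    {
        $payload = json_decode($request->getContent(), true);
        $name = is_array($payload) ? ($payload['name'] ?? null) : null;

        // Input validation: the name must be a non-empty string.
        if (!is_string($name) || '' === trim($name)) {
            return new JsonResponse(
                ['error' => 'Name must be a non-empty string.'],
                Response::HTTP_BAD_REQUEST
            );
        }

        // Business validation: synchronous pre-check against the read model.
        if ($this->shopExistsWithName($name)) {
            return new JsonResponse(
                ['error' => 'Shop name is already taken.'],
                Response::HTTP_CONFLICT
            );
        }

        // The ID is generated up front because the command returns nothing.
        $shopId = Uuid::v4()->toRfc4122();
        $this->commandBus->dispatch(new CreateShop($shopId, $name));

        // 202: the command was accepted, but is processed in the background.
        return new JsonResponse(['id' => $shopId], Response::HTTP_ACCEPTED);
    }

    // Wraps the untyped bus result so the rest of the class gets a strict bool.
    private function shopExistsWithName(string $name): bool
    {
        return (bool) $this->queryBus->ask(new ShopExistsWithName($name));
    }
}
```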

As you can see, I've added a private method for handling the query, just to get a strictly typed result. The query is handled synchronously, so we wait for the response from the read model, but the write model is processed asynchronously, so we can't wait for its result. You can also create a validator that uses our query to check whether the name is unique, and create a dedicated validation constraint for it.

Testing CQRS systems

You may be asking yourself how to test applications based on the CQRS architectural approach. It's not that hard, but of course there are many ways to do it.

Unit testing

With unit tests you can check the domain layer, which is not covered in this article. I mean the building blocks used inside the application layer (use cases), which in our example is represented by the command handlers.

The next thing you can check via unit tests is the part of the infrastructure layer that is not an adapter to the external world (unlike implementations of repositories, HTTP clients, etc.). In this example, you can do that with the Messenger implementations of the buses. They depend on MessageBusInterface, so you can easily replace it with a spy test double to verify that something occurred.
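A sketch of such a test with PHPUnit, assuming Symfony 5 or newer (where dispatch() takes an object) and a MessengerCommandBus class that wraps MessageBusInterface behind a dispatch(Command) method. The test class name and namespaces are assumptions.

```php
<?php

declare(strict_types=1);

namespace App\Tests\Shared\Infrastructure\Messenger; // hypothetical namespace

use App\Shared\Application\Command\Command;
use App\Shared\Infrastructure\Messenger\MessengerCommandBus;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;

final class MessengerCommandBusTest extends TestCase
{
    public function testItForwardsTheCommandToTheMessengerBus(): void
    {
        // Spy test double: records the last message it was asked to dispatch.
        $spy = new class implements MessageBusInterface {
            public ?object $lastDispatched = null;

            public function dispatch(object $message, array $stamps = []): Envelope
            {
                $this->lastDispatched = $message;

                return Envelope::wrap($message, $stamps);
            }
        };
        $command = new class implements Command {
        };

        (new MessengerCommandBus($spy))->dispatch($command);

        self::assertSame($command, $spy->lastDispatched);
    }
}
```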

In this example I've used an anonymous class that implements MessageBusInterface, but in a real-world application you can extract it into a SpyMessageBus with a lastDispatchedCommand(): Command method. The application layer is unit-testable as well.

Integration testing

In a system like this, you should use integration tests for the adapters to the external world. In this example we don't have them, but you could use integration testing to check things like the implementation of a Doctrine repository. For info on what a mother object is, check this article.

That’s an example!

Functional testing

By functional testing, I mean checking whether the whole process works and returns the expected result. These tests do not use third-party adapters like Doctrine, etc.; they should use fake implementations that help us test the application quickly. We can test controllers, CLI commands, and any other place that belongs to the user interface layer of the system.
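A sketch of a functional test for the shop creation endpoint, assuming Symfony 5.3 or newer (for getContainer()), an in-memory async transport in the test environment, and a fake read model that reports the name as free. The /shops route, the JSON payload, and the 202 status code are assumptions about the controller.

```php
<?php

declare(strict_types=1);

namespace App\Tests\Shops\UI\Http; // hypothetical namespace

use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

final class CreateShopTest extends WebTestCase
{
    public function testItAcceptsAShopWithAUniqueName(): void
    {
        $client = static::createClient();

        $client->request(
            'POST',
            '/shops', // hypothetical route
            [],
            [],
            ['CONTENT_TYPE' => 'application/json'],
            json_encode(['name' => 'Acme'])
        );

        self::assertResponseStatusCodeSame(202);

        // With the in-memory transport, dispatched messages can be inspected
        // through the "messenger.transport.<name>" service.
        $transport = static::getContainer()->get('messenger.transport.async');
        self::assertCount(1, $transport->getSent());
    }
}
```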

Of course, you should also verify the response for input in an invalid format, etc. Similarly, we could test the CLI commands.

So…

As you can see, implementing CQRS in Symfony using the Messenger component is pretty easy. At this point, I recommend reading some more articles about the CQRS approach as a whole.

Author of patryk.it / product Engineer with many years of experience in creating and designing web applications. #DDD #TDD freak
