Implementation for the CommandBus, the QueryBus and the EventBus in PHP 7 and using PSR-11.
- Installation
- Introduction
- What is a Message Bus?
- What are its benefits?
- 1. CommandBus
- 2. QueryBus
- 3. EventBus
- 3.1 - Usage
- 3.1.1 - Create an Event
- 3.1.2 - Create an EventHandler
- 3.1.3 - (Optional) Set the EventHandler's Priority
- 3.1.4 - Register the EventHandler
- 3.1.5 - Setting up the EventBusMiddleware
- 3.1.6 - Registering the remaining EventBus classes
- 3.1.7 - Running the EventBus
- 3.1.8 - (Optional) Running the EventBus as a Queue
- 3.2 - Predefined Middlewares
- 3.3 - Custom Middlewares
- 3.1 - Usage
- 4 - Serializers
- Contribute
- Support
- Authors
- License
Use Composer to install the package:
$ composer require nilportugues/messagebus
The idea of a message bus is that you create message objects that represent what you want your application to do. Then, you toss it into the bus and the bus makes sure that the message object gets to where it needs to go.
Easy right? Keep reading!
A Message Bus is a pipe of Messages. This implementation takes care of 3 types of messages, Commands, Queries and Events. While all look similar at first, their intent is different.
- Command: its intent is about expressing an order to the system and modifies its current state. User expects no response.
- Event: its intent is to express something that has already happened in the system, and record it. User expects no response.
- Query: its intent is about expressing a question to the system. User expects a response.
From this classification, one can spot that Command and Events can work together very well.
Given the nature of the message, implementing an interface, you may write behaviours that wrap the message to log, process or modify the response using the Decorator pattern. These are called Middleware.
For instance:
- Implementing task-based user-interfaces you can map concepts to Commands, Queries and Events easily.
- It allows you to easily write a logging system to know what's going, whether its a Command, a Query or an Event. It's possible.
To wrap-up, its benefits are:
- Encourages separation of concerns.
- Encourages single responsibility design.
<?php
use NilPortugues\MessageBus\CommandBus\Contracts\Command;
final class RegisterUser implements Command
{
private $username;
private $password;
private $emailAddress;
public function __construct(string $username, string $password, string $emailAddress)
{
$this->username = $username;
$this->password = $password;
$this->emailAddress = $emailAddress;
}
//getters...
}
The Command Handler must implement the CommandHandler
interface and implement the __invoke
method.
For instance:
<?php
use NilPortugues\MessageBus\CommandBus\Contracts\CommandHandler;
use NilPortugues\MessageBus\CommandBus\Contracts\Command;
final class RegisterUserHandler implements CommandHandler
{
private $userRepository;
public function __construct($userRepository)
{
$this->userRepository = $userRepository;
}
public function __invoke(RegisterUser $command)
{
$user = new User(
$command->getUsername(),
$command->getPassword(),
$command->getEmail(),
);
$this->userRepository->save($user);
}
}
I'm assuming you're using some kind Service Container. Now it's time to register your CommandHandler.
For instance, in a Psr\Container
compliant Service Container, we can do this as follows:
<?php
//...your other registered classes
$container['RegisterUserHandler'] = function($container) {
return new RegisterUserHandler($container->get('UserRepository');
};
The Command Bus Middleware requires two classes to be injected. First one is the command translator, and second one the handler resolver.
CommandTranslator
Classes implementing this interface will provide the FQN for the Handler class given a Command.
This package provides an implementation, NilPortugues\MessageBus\CommandBus\Translator\AppendStrategy
which basically appends the word Handler
to the provided Command
class.
For custom strategies, you may write your own implementing the NilPortugues\MessageBus\CommandBus\Contracts\CommandTranslator
interface.
CommandHandlerResolver
Classes implementing this interface will be resolving the class for the instance required based on the output of the CommandTranslator used.
This package provides an implementation, NilPortugues\MessageBus\CommandBus\Resolver\PsrContainerResolver
, that expects any Service Container implementing the Psr\Container
interface.
For Symfony 2 and 3 framework users up to version 3.2, you should use Symfony Container: NilPortugues\MessageBus\CommandBus\Resolver\SymfonyContainerResolver
. For Symfony 3.3 and up use the PSR-11 ContainerResolver class.
The minimum set up to get the Command Bus working is:
<?php
//...your other registered classes
$container['CommandTranslator'] = function($container) {
return new \NilPortugues\MessageBus\CommandBus\Translator\AppendStrategy('Handler');
};
$container['CommandHandlerResolver'] = function($container) {
return new \NilPortugues\MessageBus\CommandBus\Resolver\PsrContainerResolver($container);
};
$container['CommandBusMiddleware'] = function($container) {
return new \NilPortugues\MessageBus\CommandBus\CommandBusMiddleware(
$container->get('CommandTranslator'),
$container->get('CommandHandlerResolver'),
);
};
$container['CommandBus'] = function($container) {
return new \NilPortugues\MessageBus\CommandBus\CommandBus([
$container->get('CommandBusMiddleware'),
]);
};
If for instance, we want to log everything happening in the Command Bus, we'll add to the middleware list the logger middleware. This will wrap the Command Bus, being able to log before and after it ran, and if there was an error.
<?php
//...your other registered classes
$container['LoggerCommandBusMiddleware'] = function($container) {
return new \NilPortugues\MessageBus\CommandBus\LoggerCommandBusMiddleware(
$container->get('Monolog')
);
};
//Update the CommandBus with the LoggerCommandBusMiddleware
$container['CommandBus'] = function($container) {
return new \NilPortugues\MessageBus\CommandBus\CommandBus([
$container->get('LoggerCommandBusMiddleware'),
$container->get('CommandBusMiddleware'),
]);
};
Finally, to make use of the CommandBus, all you need to do is run this code:
<?php
$commandBus = $container->get('CommandBus');
$command = new RegisterUser('MyUsername', 'MySecretPassword', '[email protected]');
$commandBus($command);
TransactionalCommandBusMiddleware
- Class:
NilPortugues\MessageBus\CommandBus\TransactionalCommandBusMiddleware
- Class construct method expects a PDO connection. It will wrap all the underlying middleware calls with
beginTransaction-commit
androllback
if any kind of exception is thrown.
LoggerQueryBusMiddleware
- Class:
NilPortugues\MessageBus\CommandBus\LoggerCommandBusMiddleware
- Class construct method expects a PSR3 Logger implementation.
In order to write custom middleware a new class implementing the NilPortugues\MessageBus\CommandBus\Contracts\CommandBusMiddleware
interface is required.
<?php
use NilPortugues\MessageBus\QueryBus\Contracts\Query;
final class GetUser implements Query
{
private $userId;
public function __construct(string $userId)
{
$this->userId = $userId;
}
public function getUserId() : string
{
return $this->userId;
}
}
The Query Handler must implement the QueryHandler
interface and implement the __invoke
method.
For instance:
<?php
use NilPortugues\MessageBus\QueryBus\Contracts\Query;
use NilPortugues\MessageBus\QueryBus\Contracts\QueryHandler;
use NilPortugues\MessageBus\QueryBus\Contracts\QueryResponse;
final class GetUserHandler implements QueryHandler
{
private $userRepository;
public function __construct($userRepository)
{
$this->userRepository = $userRepository;
}
public function __invoke(GetUser $query) : QueryResponse
{
$userId = $query->getUserId();
$user = $this->userRepository->find($userId);
return new UserQueryResponse($user);
}
}
Response queries are generic responses.
If you take into account section 2.1.2, you'll see the UserQueryResponse has a $user injected into the constructor. This has been done in order to reuse the QueryResponse in other scenarios such as fetching an updated user.
<?php
class UserQueryResponse implements QueryResponse
{
public function __construct(User $user)
{
//fetch the relevant properties from the User object
}
//..getters. No setters required!
}
I'm assuming you're using some kind Service Container. Now it's time to register your QueryHandler.
For instance, in a Psr\Container
compliant Service Container, we can do this as follows:
<?php
//...your other registered classes
$container['GetUserHandler'] = function($container) {
return new GetUserHandler($container->get('UserRepository'));
};
The Query Bus Middleware requires two classes to be injected. First one is the query translator, and second one the handler resolver.
QueryTranslator
Classes implementing this interface will provide the FQN for the Handler class given a Query.
This package provides an implementation, NilPortugues\MessageBus\QueryBus\Translator\AppendStrategy
which basically appends the word Handler
to the provided Query
class.
For custom strategies, you may write your own implementing the NilPortugues\MessageBus\QueryBus\Contracts\QueryTranslator
interface.
QueryHandlerResolver
Classes implementing this interface will be resolving the class for the instance required based on the output of the QueryTranslator used.
This package provides an implementation, NilPortugues\MessageBus\QueryBus\Resolver\PsrContainerResolver
, that expects any Service Container implementing the Psr\Container
interface.
For Symfony 2 and 3 framework users up to version 3.2, you should use Symfony Container: NilPortugues\MessageBus\QueryBus\Resolver\SymfonyContainerResolver
. For Symfony 3.3 and up use the PSR-11 ContainerResolver class.
The minimum set up to get the Query Bus working is:
<?php
//...your other registered classes
$container['QueryTranslator'] = function($container) {
return new \NilPortugues\MessageBus\QueryBus\Translator\AppendStrategy('Handler');
};
$container['QueryHandlerResolver'] = function($container) {
return new \NilPortugues\MessageBus\QueryBus\Resolver\PsrContainerResolver($container);
};
$container['QueryBusMiddleware'] = function($container) {
return new \NilPortugues\MessageBus\QueryBus\QueryBusMiddleware(
$container->get('QueryTranslator'),
$container->get('QueryHandlerResolver'),
);
};
$container['QueryBus'] = function($container) {
return new \NilPortugues\MessageBus\QueryBus\QueryBus([
$container->get('QueryBusMiddleware'),
]);
};
If for instance, we want to log everything happening in the Query Bus, we'll add to the middleware list the logger middleware. This will wrap the Query Bus, being able to log before and after it ran, and if there was an error.
<?php
//...your other registered classes
$container['LoggerQueryBusMiddleware'] = function($container) {
return new \NilPortugues\MessageBus\QueryBus\LoggerQueryBusMiddleware(
$container->get('Monolog')
);
};
//Update the QueryBus with the LoggerQueryBusMiddleware
$container['QueryBus'] = function($container) {
return new \NilPortugues\MessageBus\QueryBus\QueryBus([
$container->get('LoggerQueryBusMiddleware'),
$container->get('QueryBusMiddleware'),
]);
};
Finally, to make use of the QueryBus, all you need to do is run this code:
<?php
$queryBus = $container->get('QueryBus');
$query = new GetUser(1):
$userQueryResponse = $queryBus($query);
CacheQueryBusMiddleware
- Class:
NilPortugues\MessageBus\QueryBus\CacheQueryBusMiddleware
- Class construct method expects a Serializer (see below), a PSR6 Caching implementation and queue name.
LoggerQueryBusMiddleware
- Class:
NilPortugues\MessageBus\QueryBus\LoggerQueryBusMiddleware
- Class construct method expects a PSR3 Logger implementation.
In order to write custom middleware a new class implementing the NilPortugues\MessageBus\QueryBus\Contracts\QueryBusMiddleware
interface is required.
We'll be creating an Event. Due to the nature of events, an event may be mapped to one or more Event Handlers.
<?php
use NilPortugues\MessageBus\EventBus\Contracts\Event;
final class UserRegistered implements Event
{
private $userId;
private $email;
public function __construct(string $userId, string $email)
{
$this->userId = $userId;
$this->email = $email;
}
public function getUserId() : string
{
return $this->userId;
}
public function getEmail() : string
{
return $this->email;
}
}
To illustrate the power of eventing, we'll map the previous event UserRegistered
to two EventHandlers.
First Event Handler
First event handler we'll create assumes we've got an email service to send a welcome email.
<?php
use NilPortugues\MessageBus\EventBus\Contracts\Event;
use NilPortugues\MessageBus\EventBus\Contracts\EventHandler;
final class SendWelcomeEmailHandler implements EventHandler
{
private $emailProvider;
public function __construct($emailProvider)
{
$this->emailProvider = $emailProvider;
}
public function __invoke(UserRegistered $event)
{
$this->guard($event);
$this->emailProvider->send('welcome_email', $event->getEmail());
}
public static function subscribedTo() : string
{
return UserRegistered::class;
}
}
Second Event Handler
Second event handler we'll create the relationships in our database for user friends and user credits.
<?php
use NilPortugues\MessageBus\EventBus\Contracts\Event;
use NilPortugues\MessageBus\EventBus\Contracts\EventHandler;
final class SetupUserAccountHandler implements EventHandler
{
private $userFriendsRepository;
private $userCreditsRepository;
public function __construct($userFriendsRepository, $userCreditsRepository)
{
$this->userFriendsRepository = $userFriendsRepository;
$this->userCreditsRepository = $userCreditsRepository;
}
public function __invoke(UserRegistered $event)
{
$this->userFriendsRepository->add(new UserFriendsCollection($event->getUserId(), []));
$this->userCreditsRepository->add(new UserCredits($event->getUserId(), new Credits(0));
}
public static function subscribedTo() : string
{
return UserRegistered::class;
}
}
Sometimes you want or must make sure an action precedes another one.
By default, all events have a priority, this being set by the EventHandlerPriority::LOW_PRIORITY
constant value.
To implement your priority order in your classes implement the EventHandler
interface, must implement another interface, the EventHandlerPriority
.
For instance, if we would like SendWelcomeEmailHandler
to happen after SetupUserAccountHandler
, we should give the first less priority, or the latter more.
SetupUserAccountHandler should go first when dispatching event UserRegistered
<?php
use NilPortugues\MessageBus\EventBus\Contracts\Event;
use NilPortugues\MessageBus\EventBus\Contracts\EventHandler;
use NilPortugues\MessageBus\EventBus\Contracts\EventHandlerPriority;
final class SetupUserAccountHandler implements EventHandler, EventHandlerPriority
{
//same as before...
public static function priority() : int
{
return EventHandlerPriority::MAX_PRIORITY;
}
}
SendWelcomeEmailHandler should go second when dispatching event UserRegistered
Notice how a good idea is to set up relative ordering by subtracting to the MAX_PRIORITY order.
<?php
use NilPortugues\MessageBus\EventBus\Contracts\Event;
use NilPortugues\MessageBus\EventBus\Contracts\EventHandler;
use NilPortugues\MessageBus\EventBus\Contracts\EventHandlerPriority;
final class SendWelcomeEmailHandler implements EventHandler, EventHandlerPriority
{
//same as before...
public static function priority() : int
{
return EventHandlerPriority::MAX_PRIORITY - 1;
}
}
I'm assuming you're using some kind Service Container. Now it's time to register your Event Handlers.
For instance, in a Psr\Container
compliant Service Container, we can do this as follows:
<?php
//...your other registered classes
$container['UserFriendRepository'] = function($container) {
return []; //your repository
};
$container['UserCreditsRepository'] = function($container) {
return []; //your repository
};
$container['EmailProvider'] = function($container) {
return []; //your email provider
};
$container['SetupUserAccountHandler'] = function($container) {
return new SetupUserAccountHandler(
$container->get('UserFriendRepository'),
$container->get('UserCreditsRepository')
);
};
$container['SendWelcomeEmailHandler'] = function($container) {
return new SendWelcomeEmailHandler($container->get('EmailProvider');
};
The Event Bus Middleware requires two classes to be injected. First one is the Event translator, and second one the handler resolver.
EventTranslator
Takes care of registering the EventHandlers subscribed to an Event.
Its implementation can be found at: NilPortugues\MessageBus\EventBus\Translator\EventFullyQualifiedClassNameStrategy
.
EventHandlerResolver
Classes implementing this interface will be resolving the class for the instance required based on the output of the EventTranslator used.
This package provides an implementation, NilPortugues\MessageBus\EventBus\Resolver\PsrContainerResolver
, that expects any Service Container implementing the Psr\Container
interface.
For Symfony 2 and 3 framework users up to version 3.2, you should use Symfony Container: NilPortugues\MessageBus\EventBus\Resolver\SymfonyContainerResolver
. For Symfony 3.3 and up use the PSR-11 ContainerResolver class.
The minimum set up to get the Event Bus working is:
<?php
//...your other registered classes
$container['EventTranslator'] = function($container) {
$handlers = [
SendWelcomeEmailHandler::class,
SetupUserAccountHandler::class,
];
return new \NilPortugues\MessageBus\EventBus\Translator\EventFullyQualifiedClassNameStrategy($handlers);
};
$container['EventHandlerResolver'] = function($container) {
return new \NilPortugues\MessageBus\EventBus\Resolver\PsrContainerResolver($container);
};
$container['EventBusMiddleware'] = function($container) {
return new \NilPortugues\MessageBus\EventBus\EventBusMiddleware(
$container->get('EventTranslator'),
$container->get('EventHandlerResolver'),
);
};
$container['EventBus'] = function($container) {
return new \NilPortugues\MessageBus\EventBus\EventBus([
$container->get('EventBusMiddleware'),
]);
};
If for instance, we want to log everything happening in the Event Bus, we'll add to the middleware list the logger middleware. This will wrap the Event Bus, being able to log before and after it ran, and if there was an error.
<?php
//...your other registered classes
$container['LoggerEventBusMiddleware'] = function($container) {
return new \NilPortugues\MessageBus\EventBus\LoggerEventBusMiddleware(
$container->get('Monolog')
);
};
//Update the EventBus with the LoggerEventBusMiddleware
$container['EventBus'] = function($container) {
return new \NilPortugues\MessageBus\EventBus\EventBus([
$container->get('LoggerEventBusMiddleware'),
$container->get('EventBusMiddleware'),
]);
};
Finally, to make use of the EventBus, all you need to do is run this code:
<?php
$eventBus = $container->get('EventBus');
$Event = new GetUser(1):
$userEventResponse = $eventBus($Event);
Save your users time and load your pages faster! Go asynchronous using a queue.
To do so, you'll have to require an additional package: EventBus Queue. This extension can be downloaded using composer:
composer require nilportugues/eventbus-queue
Documentation and installation guide can be found in its repository.
TransactionalEventBusMiddleware
- Class:
NilPortugues\MessageBus\EventBus\TransactionalEventBusMiddleware
- Class construct method expects a PDO connection. It will wrap all the underlying middleware calls with beginTransaction-commit and rollback if any kind of exception is thrown.
LoggerEventBusMiddleware
- Class:
NilPortugues\MessageBus\EventBus\LoggerEventBusMiddleware
- Class construct method expects a PSR3 Logger implementation.
ProducerEventBusMiddleware
- Class:
NilPortugues\MessageBus\EventBusQueue\ProducerEventBusMiddleware
- Adds events to an Event Queue. Required running
composer require nilportugues/eventbus-queue
first.
In order to write custom middleware a new class implementing the NilPortugues\MessageBus\EventBus\Contracts\EventBusMiddleware
interface is required.
Serializers are to be used mainly all the <Name>ProducerEventBusMiddleware
classes. You may also find this in Cache classes.
Choose one or another depending on your needs.
For caching, this is the best option.
In the EventBus use this if your Consumer is written in PHP and will share the same code base as the object serialized.
- Pros: Fastest serialization possible.
- Cons: Consumer must be written in PHP and classes must be available or unserialize will fail.
Not recommended for caching.
In the EventBus use this if your Consumer is written in PHP but your consumers may be written in many languages.
- Pros: If consumer is PHP, the data will be restored and if sharing the same code base as the object serialized, an object can be obtained on unserialize. If not, you may be able to fetch data manually as regular JSON.
- Cons : If fetching data from the data-store as JSON it will hold references to the PHP data-structure, but does not interfere consuming data.
Doesn't work for caching.
In the EventBus use this if your Consumer is written in PHP but you're not consuming data currently.
- Pros: JSON can be reused to consume this data in the future as its supported everywhere.
- Cons: You'll have to write your consumer to read the JSON structure.
Contributions to the package are always welcome!
- Report any bugs or issues you find on the issue tracker.
- You can grab the source code at the package's Git repository.
Get in touch with me using one of the following means:
- Emailing me at [email protected]
- Opening an Issue
The code base is licensed under the MIT license.