Leveraging the Command Pattern: Enhancing Drupal with Symfony Messenger.pdf

lussoluca 79 views 49 slides Sep 25, 2024

About This Presentation

Managing user interactions can be complicated. Some operations must be executed immediately, whereas others can be scheduled for the future. Drupal core has batches and queues, but we can do better. We can dispatch commands that can be synchronous and asynchronous, in real-time or delayed.

The Comm...


Slide Content

Leveraging the Command Pattern: Enhancing Drupal with Symfony Messenger
Lusso Luca

About me
Drupal / PHP / Go developer @ SparkFabrik
Drupal contributor (WebProfiler, Monolog, Symfony
Messenger, Search API Typesense) and speaker

Drupal.org: https://www.drupal.org/u/lussoluca
LinkedIn: www.linkedin.com/in/lussoluca
Slack (drupal.slack.com): lussoluca
Mastodon: @[email protected]




@lussoluca

WE ARE A TECH COMPANY OF ENGINEERS,
DEVELOPERS AND DESIGNERS WHO WILL
THINK, DESIGN AND BUILD YOUR CUSTOM APPLICATIONS,
MODERNIZE YOUR LEGACY
AND TAKE YOU TO THE CLOUD NATIVE ERA

4
PROUD OF OUR PARTNERSHIPS
We help Italian businesses bridge the gap with China thanks to our official partnership with Alibaba Cloud
We are a Google Cloud Platform Technology Partner
We are official AWS partners

5
PROUD OF OUR MEMBERSHIPS
We are a Silver Member of the Open Source Security Foundation
We are a supporter of the Cloud Transformation Observatory of the PoliMi
We are a Silver Member of the Cloud Native Computing Foundation
We are a Silver Member of the Linux Foundation Europe

Messenger provides a message bus with the ability to
send messages and then handle them immediately in
your application or send them through transports (e.g.
queues) to be handled later.

Command is a behavioral design pattern that turns a
request into a stand-alone object that contains all
information about the request. This transformation lets
you pass requests as method arguments, and delay or
queue a request’s execution.
Symfony Messenger
➔ https://symfony.com/doc/current/messenger.html
➔ https://refactoring.guru/design-patterns/command
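
The two definitions above can be illustrated with a small, framework-free sketch. It is not Symfony Messenger's implementation: SendWelcomeEmail, SendWelcomeEmailHandler and TinyBus are hypothetical names invented for this example. It only shows a request turned into an object that is either handled immediately or queued for later.

```php
<?php

declare(strict_types=1);

// Hypothetical command: a plain object carrying all the data of the request.
final class SendWelcomeEmail {
  public function __construct(public readonly string $recipient) {}
}

// Hypothetical handler: the code that actually performs the request.
final class SendWelcomeEmailHandler {
  /** @var string[] Recipients handled so far. */
  public array $sent = [];

  public function __invoke(SendWelcomeEmail $command): void {
    $this->sent[] = $command->recipient;
  }
}

// Toy bus: handles a command immediately, or stores it for later execution.
final class TinyBus {
  /** @var array<class-string, callable> */
  private array $handlers = [];
  /** @var object[] */
  private array $queue = [];

  public function register(string $commandClass, callable $handler): void {
    $this->handlers[$commandClass] = $handler;
  }

  public function dispatch(object $command, bool $defer = false): void {
    if ($defer) {
      $this->queue[] = $command;
      return;
    }
    ($this->handlers[$command::class])($command);
  }

  // Runs every queued command in first in, first out order.
  public function consume(): int {
    $count = 0;
    while ($command = array_shift($this->queue)) {
      ($this->handlers[$command::class])($command);
      $count++;
    }
    return $count;
  }
}

$handler = new SendWelcomeEmailHandler();
$bus = new TinyBus();
$bus->register(SendWelcomeEmail::class, $handler);
$bus->dispatch(new SendWelcomeEmail('a@example.com'));
$bus->dispatch(new SendWelcomeEmail('b@example.com'), defer: true);
```

After these calls only the first command has been handled; the second one waits until consume() is called, which is roughly the role a worker plays for an asynchronous transport.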
6

A little bit of history
7




Why are there two projects on drupal.org about Symfony Messenger?

➔ https://www.drupal.org/project/symfony_messenger
➔ https://www.drupal.org/project/sm

lussoluca: creates the project https://www.drupal.org/project/symfony_messenger on 26 May 2023 (based on some code for an internal client)

dpi: pushes an MR with a lot of new code on 6 September 2023

lussoluca: has a lot to do because of the upcoming DrupalCon Lille, and ignores dpi’s MR for too long

dpi: on 13 October 2023 decides to fork the module into a new project: https://www.drupal.org/project/sm

lussoluca: discovers that dpi’s version is way better than his own, deprecates symfony_messenger and starts contributing to sm

8
➔ Its main task is to properly register and configure services in the service container
➔ It leverages most of the power and features provided by the Symfony DependencyInjection component
➔ To work on the sm code, it is important to know these service container features
Sm module is a thin wrapper around Symfony Messenger

9
➔ Tags
➔ Service providers
➔ Compiler passes
➔ Autowiring
➔ Autoconfiguration
➔ Aliases
➔ Service visibility
➔ Abstract services
➔ Abstract service arguments
➔ Named arguments
➔ Calls

Service container deep dive

10
messenger.retry_strategy_locator:
  class: Symfony\Component\DependencyInjection\ServiceLocator
  arguments:
    - []
  tags:
    - { name: 'container.service_locator' }
➔ A service can be tagged to be retrieved and collected later
Tags

11
declare(strict_types=1);

namespace Drupal\webprofiler;

use Drupal\Core\DependencyInjection\ContainerBuilder;
use Drupal\Core\DependencyInjection\ServiceProviderBase;

class WebprofilerServiceProvider extends ServiceProviderBase {

  // Register new services, possibly conditionally.
  public function register(ContainerBuilder $container): void {
    [...]
  }

  // Alter or remove services defined by core or other modules.
  public function alter(ContainerBuilder $container): void {
    [...]
  }

}
➔ Some services cannot be easily defined using yaml, or must be defined conditionally
➔ Some services provided by core or other modules can be removed/altered
Service providers

12
declare(strict_types=1);

namespace Drupal\sm;

use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;

final class SmCompilerPass implements CompilerPassInterface {

  public function process(ContainerBuilder $container): void {
    [...]
  }

}
➔ The service container is “compiled”: all its services are collected and processed to build the final container
Compiler passes

13
services:
  _defaults:
    autowire: true
  example.service:
    class: Drupal\drupalcon2024\ExampleService



final class ExampleService {

  public function __construct(
    private readonly MessageBusInterface $bus,
  ) {
  }

  public function exampleMethod(): void {
    $this
      ->bus
      ->dispatch(new ExampleMessage('Hello, DrupalCon 2024!'));
  }

}
The service will be instantiated in exactly the same way as before, but there is no need to explicitly specify which arguments are required; the interfaces that are declared in the service constructor will be used to discover which services should be injected.

➔ https://www.drupal.org/node/3218156
➔ https://symfony.com/doc/current/service_container/autowiring.html
Autowiring
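
To make the discovery step concrete, here is a minimal sketch of how constructor type hints can drive injection, using PHP reflection. It is an illustration under stated assumptions, not the Symfony or Drupal container: autowire(), BusInterface, InMemoryBus and Greeter are hypothetical names, and the type-to-service map plays the role that aliases play in a real container.

```php
<?php

declare(strict_types=1);

// Hypothetical interface and implementation standing in for a real service.
interface BusInterface {}
final class InMemoryBus implements BusInterface {}

// Hypothetical service whose only dependency is declared by type hint.
final class Greeter {
  public function __construct(public readonly BusInterface $bus) {}
}

/**
 * Builds $class by matching each constructor parameter type to a known service.
 *
 * @param array<class-string, object> $servicesByType
 */
function autowire(string $class, array $servicesByType): object {
  $constructor = (new ReflectionClass($class))->getConstructor();
  $arguments = [];
  foreach ($constructor?->getParameters() ?? [] as $parameter) {
    $type = $parameter->getType();
    if (!$type instanceof ReflectionNamedType || !isset($servicesByType[$type->getName()])) {
      throw new LogicException(sprintf('Cannot autowire parameter $%s of %s.', $parameter->getName(), $class));
    }
    $arguments[] = $servicesByType[$type->getName()];
  }
  return new $class(...$arguments);
}

$greeter = autowire(Greeter::class, [BusInterface::class => new InMemoryBus()]);
```

The sketch fails loudly when a type has no known service, which mirrors why an interface implemented by several services cannot be autowired without extra hints.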

14
# No need to tag example_subscriber with “event_subscriber” tag.
services:
  _defaults:
    autoconfigure: true
  example_subscriber:
    class: Drupal\drupalcon2024\ExampleSubscriber


// In CoreServiceProvider.
$container
  ->registerForAutoconfiguration(EventSubscriberInterface::class)
  ->addTag('event_subscriber');

// ExampleSubscriber is an event_subscriber
final class ExampleSubscriber implements EventSubscriberInterface {

  public static function getSubscribedEvents(): array {
    ...
  }

}

This means that services that implement a specific interface no longer need to be individually tagged; you can just specify autoconfigure: true in the _defaults section of a module's services.yml and all services will be automatically tagged.

In core, services that implement these interfaces are automatically tagged:

➔ MediaLibraryOpenerInterface (tag: media_library.opener)
➔ EventSubscriberInterface (tag: event_subscriber)
➔ LoggerAwareInterface (tag: logger_aware)
➔ QueueFactoryInterface (tag: queue_factory)
Autoconfiguration

15
# From:
services:
  drupalcon2024.example_subscriber:
    class: Drupal\drupalcon2024\EventSubscriber\ExampleSubscriber
    arguments:
      - '@config.factory'
      - '@current_user'
      - '@router.admin_context'
      - '@current_route_match'
      - '@messenger'
    tags:
      - { name: event_subscriber }

# To:
services:
  _defaults:
    autoconfigure: true
    autowire: true

  Drupal\drupalcon2024\EventSubscriber\ExampleSubscriber: ~
➔ Service definitions can be written with a very short syntax
➔ Arguments and tags can be defined in code, without the need to change the yaml definition
➔ Autowire and autoconfigure must be enabled module by module, in the module_name.services.yml file
Autowiring +
autoconfiguration

16
#[AsMessageHandler]
final class ExampleMessageHandler {

  public function __construct(
    #[Autowire(service: 'logger.channel.drupalcon2024')]
    private readonly LoggerInterface $logger
  ) {
  }

}
➔ Some services cannot be autowired, usually because the interface is implemented by multiple different services (like loggers in Drupal)
Autowiring +
autoconfiguration

17
messenger.middleware.send_message:
  class: Symfony\Component\Messenger\Middleware\SendMessageMiddleware
  abstract: true
  arguments:
    $eventDispatcher: '@event_dispatcher'
  calls:
    - [setLogger, ['@logger.channel.sm']]
  public: false
➔ A different technique to inject arguments that cannot be autowired is to use a named argument
➔ Useful when you cannot change the service class implementation
Named arguments

18
class SendMessageMiddleware implements MiddlewareInterface {
  use LoggerAwareTrait;

  public function __construct(
    private SendersLocatorInterface $sendersLocator,
    private ?EventDispatcherInterface $eventDispatcher = null,
    private bool $allowNoSenders = true,
  ) {
  }

}
➔ The key of the named argument in the service definition ($eventDispatcher) must match the name of the constructor parameter
Named arguments

19
cache_tags.invalidator.checksum:
  class: Drupal\Core\Cache\DatabaseCacheTagsChecksum
  arguments: ['@database']
  tags:
    - { name: cache_tags_invalidator }
    - { name: backend_overridable }
Drupal\Core\Cache\CacheTagsChecksumInterface: '@cache_tags.invalidator.checksum'
➔ https://www.drupal.org/node/3323122
➔ Useful for autowiring
Aliases

20
messenger.transport.native_php_serializer:
  class: Symfony\Component\Messenger\Transport\Serialization\PhpSerializer
  autowire: true
  public: false
➔ A private service can be injected as an argument into other services, but it cannot be retrieved directly from the container
Service visibility

21
logger.channel_base:
  abstract: true
  class: Drupal\Core\Logger\LoggerChannel
  factory: ['@logger.factory', 'get']


logger.channel.sm:
  parent: logger.channel_base
  arguments:
    $channel: 'sm'
  public: false
➔ An abstract service cannot be retrieved by itself
➔ Another service must exist that extends it (by using the parent key)
Abstract services

22
messenger.routable_message_bus:
  class: Symfony\Component\Messenger\RoutableMessageBus
  arguments:
    - "!abstract 'message bus locator'"
  public: false
➔ Sometimes an argument is not available in the yaml definition
➔ Its value can only be calculated when the container is compiled, in a compiler pass
Abstract service arguments

23
class MessengerPass implements CompilerPassInterface {

  public function process(ContainerBuilder $container) {
    [...]
    $container
      ->getDefinition('messenger.routable_message_bus')
      ->replaceArgument(
        0,
        ServiceLocatorTagPass::register($container, $buses)
      );
    [...]
  }

}
➔ Use a compiler pass to replace the abstract argument with a real one
Abstract service arguments

24
messenger.middleware.send_message:
  class: Symfony\Component\Messenger\Middleware\SendMessageMiddleware
  abstract: true
  arguments:
    $eventDispatcher: '@event_dispatcher'
  calls:
    - [setLogger, ['@logger.channel.sm']]
  public: false
➔ Useful to call methods after the service class has been created
➔ Usually used when the called method comes from a trait (in this case setLogger is defined in the LoggerAwareTrait trait)
Calls

25
➔ Middleware — code that takes action on all
message types, and has access to the containing
envelope and stamps.
➔ Bus — a series of middleware in a particular order.
There is a default bus, and a default set of
middleware.
➔ Transport — a transport comprises a receiver and
sender. In the case of the Drupal SQL transport, its
sender will serialize the message and store it in the
database. The receiver will listen for messages
ready to be sent, and then unserialize them.
➔ Worker — a command line application responsible
for unserializing messages immediately, or at a
scheduled time in the future.


➔ Message — an arbitrary PHP object, it must be
serialisable.
➔ Message handler — a class that takes action
based on the message it is given. Typically a
message handler is designed to consume one type
of message.
➔ Envelope — an envelope contains a single
message, and it may have many stamps. A
message always has an envelope.
➔ Stamp — a piece of metadata associated with an
envelope. The most common use case is to track
whether a middleware has already operated on the
envelope. Useful when a transport re-runs a
message through the bus after deserialization.
Another useful stamp is one to set the date and
time for when a message should be processed.
Symfony messenger
Source: https://www.previousnext.com.au/blog/symfony-messenger/post-1-introducing-symfony-messenger
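
The relationship between message, envelope and stamp can be sketched in a few lines of plain PHP. TinyEnvelope and TinyDelayStamp are hypothetical stand-ins written for this example, not the Symfony classes; the with() and last() methods only mimic the general shape of an immutable envelope that accumulates metadata.

```php
<?php

declare(strict_types=1);

// Hypothetical stamp: metadata asking for a delay, in milliseconds.
final class TinyDelayStamp {
  public function __construct(public readonly int $delayMs) {}
}

// Hypothetical envelope: wraps exactly one message plus any number of stamps.
final class TinyEnvelope {
  /** @var array<class-string, object[]> Stamps grouped by class. */
  private array $stamps = [];

  public function __construct(public readonly object $message) {}

  // Returns a copy with the extra stamp; the original envelope is unchanged.
  public function with(object $stamp): static {
    $clone = clone $this;
    $clone->stamps[$stamp::class][] = $stamp;
    return $clone;
  }

  // Returns the most recently added stamp of the given class, if any.
  public function last(string $stampClass): ?object {
    $stamps = $this->stamps[$stampClass] ?? [];
    return $stamps === [] ? null : $stamps[array_key_last($stamps)];
  }
}

$envelope = new TinyEnvelope(new stdClass());
$delayed = $envelope->with(new TinyDelayStamp(2000));
```

Because with() returns a copy, a middleware can add its own stamp without affecting code that still holds the earlier envelope.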

26
Message dispatching and handling
Symfony messenger

<?php

declare(strict_types=1);

namespace Drupal\drupalcon2024;

class ExampleMessage {

  public function __construct(
    public string $message,
  ) {}

}

27
➔ Plain Old PHP Object
➔ Must be serializable
➔ If you need to include entities,
use their IDs
An example message

#[AsMessageHandler]
final class ExampleMessageHandler {

  public function __construct(
    #[Autowire(service: 'logger.channel.drupalcon2024')]
    private readonly LoggerInterface $logger
  ) {
  }

  public function __invoke(ExampleMessage $message): void {
    $this
      ->logger
      ->debug(
        'Message received: {message}',
        ['message' => $message->message]
      );
  }

}
28
➔ Defined with the
AsMessageHandler attribute
➔ A single public __invoke method
that takes as argument a
message object
A message handler

<?php
namespace Drupal\drupalcon2024\Controller;

use Drupal\Core\Controller\ControllerBase;
use Drupal\drupalcon2024\ExampleMessage;
use Symfony\Component\Messenger\MessageBusInterface;

final class Drupalcon2024Controller extends ControllerBase {

  public function __construct(
    private readonly MessageBusInterface $bus,
  ) {}

  public function sync(): array {
    $this
      ->bus
      ->dispatch(new ExampleMessage('Hello, DrupalCon 2024!'));

    return [...];
  }

}
29
➔ Just retrieve the bus and
dispatch the message
Dispatch a message

30
➔ A new widget to collect
dispatched messages
Webprofiler integration

31
➔ On the messenger pane,
Webprofiler shows a set of
useful information about the
dispatched message
Webprofiler integration

32
➔ On the log pane we can see all
the messages logged by the
sm module and the messages
logged by our custom handler
Webprofiler integration

33
# messenger.services.yml
parameters:
  sm.routing:
    Drupal\drupalcon2024\ExampleMessage: asynchronous
    # Drupal\my_module\MyMessage: mytransport
    # Drupal\my_module\MyMessage2: [mytransport1, mytransport2]
    # 'Drupal\my_module\*': mytransport
    # '*': mytransport
➔ Not yet included in sm: https://git.drupalcode.org/project/sm/-/merge_requests/23
➔ All messages are sync by default unless configured using the sm.routing parameter
➔ asynchronous is the name of the async transport defined in the sm module, but it can be the name of any other existing transport
Async
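
The routing keys shown above (a class name, a namespace wildcard, or '*') suggest a lookup from the most specific entry to the least specific one. The sketch below illustrates that idea only; resolveTransports() is a hypothetical helper, and the "first match wins" rule is a simplifying assumption of this example, so the actual resolution in Symfony Messenger or sm may differ (for instance by combining several matches).

```php
<?php

declare(strict_types=1);

/**
 * Resolves transport names for a message class from a routing map.
 *
 * Lookup order in this sketch: exact class, namespace wildcards from the most
 * specific to the least specific, then the catch-all '*'.
 *
 * @param array<string, string|string[]> $routing
 * @return string[]
 */
function resolveTransports(string $messageClass, array $routing): array {
  $candidates = [$messageClass];
  $namespace = $messageClass;
  // Strip one namespace segment at a time to build the wildcard keys.
  while (($pos = strrpos($namespace, '\\')) !== false) {
    $namespace = substr($namespace, 0, $pos);
    $candidates[] = $namespace . '\\*';
  }
  $candidates[] = '*';

  foreach ($candidates as $candidate) {
    if (isset($routing[$candidate])) {
      return (array) $routing[$candidate];
    }
  }
  // No route configured: in this sketch that means "handle synchronously".
  return [];
}

$routing = [
  'Drupal\drupalcon2024\ExampleMessage' => 'asynchronous',
  'Drupal\my_module\*' => ['mytransport1', 'mytransport2'],
];
```

An empty result corresponds to the default described above: without a routing entry, the message stays synchronous.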

➔ messenger_messages table is automatically created when the
first async message is dispatched
➔ Messages are processed in a First In-First Out order
Async
34

➔ The sm module provides a custom console command to consume async messages
➔ The consumer process should be managed by a process manager like Supervisor
➔ Every time the code changes, the consumer process must be restarted to pick up the new version
Async
35

# sm.services.yml
parameters:
  sm.transports:
    synchronous:
      dsn: 'sync://'
    asynchronous:
      dsn: 'drupal-db://default'
    failed:
      dsn: 'drupal-db://default?queue_name=failed'
  sm.failure_transport: failed
36
➔ Happens when a handler is not
able to handle a message
➔ Failed messages are not
discarded, they’re sent to a
failure transport
Failures

#[AsMessageHandler]
final class ExampleMessageHandler {

  public function __construct(
    #[Autowire(service: 'logger.channel.drupalcon2024')]
    private readonly LoggerInterface $logger
  ) {
  }

  public function __invoke(ExampleMessage $message): void {
    [...]

    throw new \Exception('This is an example exception');
  }

}
37
➔ Happens when a handler is not
able to handle a message
Failures

38
➔ A failure can be recoverable (e.g. a network issue): RecoverableMessageHandlingException
➔ A failure can be unrecoverable (maybe some business logic issue): UnrecoverableMessageHandlingException
Failures

➔ After the issue is fixed, messages in the failed queue can be
reprocessed with: ./vendor/bin/sm messenger:consume failed
➔ The envelope has the ErrorDetailsStamp attached to retrieve the
exception details
Failures
39

40
➔ A failed async message will be processed multiple times
➔ Default configuration is:
  ◆ max_retries: 3
  ◆ delay: 1000
  ◆ multiplier: 2
  ◆ max_delay: 2
  ◆ jitter: 0.1
➔ If the handler throws a RecoverableMessageHandlingException, the message will be retried indefinitely and the max_retries setting will be ignored
➔ If the handler throws an UnrecoverableMessageHandlingException, the message will not be retried
➔ If, after all retries, the handler still throws an error, the message is moved into the failed queue
Retries
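
How delay and multiplier interact is easier to see with numbers. The function below is a simplified sketch, not the retry strategy shipped with Symfony Messenger or sm: retryDelayMs() is a hypothetical helper, jitter is left out so the result is deterministic, and treating a max delay of 0 as "no cap" is an assumption of this example. The delay is expressed in milliseconds.

```php
<?php

declare(strict_types=1);

/**
 * Delay in milliseconds before retry number $retryCount (0 for the first retry).
 *
 * Each retry waits $multiplier times longer than the previous one.
 * A $maxDelayMs of 0 means "no cap" in this sketch.
 */
function retryDelayMs(int $retryCount, int $delayMs = 1000, float $multiplier = 2.0, int $maxDelayMs = 0): int {
  $delay = (int) round($delayMs * $multiplier ** $retryCount);
  if ($maxDelayMs > 0 && $delay > $maxDelayMs) {
    return $maxDelayMs;
  }
  return $delay;
}

// Waits for three retries with delay 1000 and multiplier 2.
$delays = array_map(fn (int $retry): int => retryDelayMs($retry), [0, 1, 2]);
```

With a delay of 1000 and a multiplier of 2, three retries wait 1000, 2000 and 4000 milliseconds before jitter is applied.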

➔ Retry configuration can be
altered by a transport
Retries
# messenger.services.yml
parameters:
  sm.routing:
    Drupal\drupalcon2024\ExampleMessage: stubborn
  sm.transports:
    synchronous:
      dsn: 'sync://'
    asynchronous:
      dsn: 'drupal-db://default'
    failed:
      dsn: 'drupal-db://default?queue_name=failed'
    stubborn:
      dsn: 'drupal-db://default?queue_name=stubborn'
      retry_strategy:
        max_retries: 5
41

➔ Schedule a message to be sent in
the future
Delayed send
$envelope = new Envelope(
  new AsyncExampleMessage('Hello, DrupalCon 2024!')
);
// DelayStamp takes the delay in milliseconds (2 seconds here).
$delayed = $envelope->with(new DelayStamp(2000));

$this->bus->dispatch($delayed);
42

43
// settings.php
$settings['queue_default'] = 'Drupal\sm\QueueInterceptor\SmLegacyQueueFactory';

// In a controller:
$this
  ->queueFactory
  ->get('example_queue_worker')
  ->createItem('Hello, DrupalCon 2024!');

➔ Override the default queue
factory with
SmLegacyQueueFactory
➔ Send a message to the queue as
usual
➔ Internally sm will convert the
item to a message and dispatch
it to the bus
➔ A message handler will pick up
the message and process it with
a QueueWorker
Queue replacement

44
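namespace Drupal\drupalcon2024\Plugin\QueueWorker;

use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * @QueueWorker(
 *   id = "example_queue_worker",
 *   title = @Translation("Example queue worker"),
 *   cron = {"time" = 60}
 * )
 */
class ExampleQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  public function __construct(
    array $configuration,
    $plugin_id,
    $plugin_definition,
    private readonly LoggerInterface $logger,
  ) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
  }

  // Assumption: the logger channel is the one used by the message handler shown earlier.
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition): static {
    return new static($configuration, $plugin_id, $plugin_definition, $container->get('logger.channel.drupalcon2024'));
  }

  public function processItem(mixed $data): void {
    $this->logger->debug('Message received: {message}', ['message' => $data]);
  }

}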
➔ From a developer perspective
nothing changes
➔ But now the message is handled
by Symfony Messenger, so
failures and retries are managed
automatically
Queue replacement

45
use Symfony\Component\Scheduler\Attribute\AsSchedule;
use Symfony\Component\Scheduler\RecurringMessage;
use Symfony\Component\Scheduler\Schedule;
use Symfony\Component\Scheduler\ScheduleProviderInterface;
// ...

#[AsSchedule('default')]
class DefaultScheduleProvider implements ScheduleProviderInterface
{
    public function getSchedule(): Schedule
    {
        return (new Schedule())->add(
            RecurringMessage::every('2 days', new PendingOrdersMessage())
        );
    }
}
➔ https://www.drupal.org/project/sm_scheduler
➔ cron replacement on steroids
(Fabien Potencier)

Symfony scheduler

46
➔ Symfony mailer is in core
➔ Integration with Symfony Messenger is a work in progress
(https://www.drupal.org/project/mailer_transport/issues/3394123)
➔ The final goal is to be able to send emails using an
async transport provided by Symfony Messenger
Symfony mailer

47
1. Introducing Symfony Messenger integrations with Drupal
2. Symfony Messenger’ message and message handlers, and comparison with @QueueWorker
3. Real-time: Symfony Messenger’ Consume command and prioritised messages
4. Automatic message scheduling and replacing hook_cron
5. Adding real-time processing to QueueWorker plugins
6. Handling emails asynchronously: integrating Symfony Mailer and Messenger
7. Displaying notifications when Symfony Messenger messages are processed
8. Future of Symfony Messenger in Drupal

Resources

Join us for contribution opportunities!
Mentored
Contribution
First Time
Contributor Workshop
General
Contribution
27 September:
09:00 – 18:00
Room 111

24 September: 16:30 - 17:15
Room BoF 4 (121)
25 September: 11:30 - 12:15
Room BoF 4 (121)
27 September: 09:00 - 12:30
Room 111

24-26 September: 9:00 - 18:00
Area 1
27 September: 09 - 18:00
Room 112

#DrupalContributions

Any questions? Thanks!