Commit cf074bfd authored by dpi's avatar dpi

Added a message queue service.

Moved transmission logic from message queue item entity to the service.
Added a message queue service test.

Fixed #14
parent e08a878c
...@@ -4,10 +4,13 @@ services: ...@@ -4,10 +4,13 @@ services:
parent: default_plugin_manager parent: default_plugin_manager
courier.manager: courier.manager:
class: Drupal\courier\Service\CourierManager class: Drupal\courier\Service\CourierManager
arguments: ['@entity_type.manager', '@config.factory', '@logger.factory', '@plugin.manager.identity_channel'] arguments: ['@entity_type.manager', '@config.factory', '@logger.factory', '@plugin.manager.identity_channel', '@courier.manager.message_queue']
courier.manager.global_template_collection: courier.manager.global_template_collection:
class: Drupal\courier\Service\GlobalTemplateCollectionManager class: Drupal\courier\Service\GlobalTemplateCollectionManager
arguments: ['@keyvalue', '@courier.manager'] arguments: ['@keyvalue', '@courier.manager']
courier.manager.message_queue:
class: Drupal\courier\Service\MessageQueueManager
arguments: ['@logger.factory', '@plugin.manager.identity_channel']
courier.paramconverter.channel: courier.paramconverter.channel:
class: Drupal\courier\ParamConverter\CourierChannelConverter class: Drupal\courier\ParamConverter\CourierChannelConverter
arguments: ['@entity_type.manager'] arguments: ['@entity_type.manager']
......
<?php <?php
/**
* @file
* Contains \Drupal\courier\Entity\MessageQueueItem.
*/
namespace Drupal\courier\Entity; namespace Drupal\courier\Entity;
use Drupal\Core\Entity\ContentEntityBase; use Drupal\Core\Entity\ContentEntityBase;
...@@ -17,7 +12,7 @@ use Drupal\Core\Entity\EntityStorageInterface; ...@@ -17,7 +12,7 @@ use Drupal\Core\Entity\EntityStorageInterface;
use Drupal\courier\ChannelInterface; use Drupal\courier\ChannelInterface;
/** /**
* Defines a courier_template_collection entity. * Defines a Message queue item entity.
* *
* @ContentEntityType( * @ContentEntityType(
* id = "courier_message_queue_item", * id = "courier_message_queue_item",
...@@ -102,58 +97,6 @@ class MessageQueueItem extends ContentEntityBase implements MessageQueueItemInte ...@@ -102,58 +97,6 @@ class MessageQueueItem extends ContentEntityBase implements MessageQueueItemInte
return $this; return $this;
} }
/**
* {@inheritdoc}
*/
public function sendMessage() {
$options = $this->getOptions();
$channel_options = array_key_exists('channels', $options) ? $options['channels'] : [];
unset($options['channels']);
/** @var \Drupal\courier\Service\IdentityChannelManagerInterface $icm */
$icm = \Drupal::service('plugin.manager.identity_channel');
// Instead of iterating over messages, get the identity' channel preferences
// again. This ensures preference order is up to date since significant time
// may have passed since adding to queue.
$messages = [];
foreach ($icm->getChannelsForIdentity($this->getIdentity()) as $channel) {
if ($message = $this->getMessage($channel)) {
$messages[] = $message;
}
}
/** @var ChannelInterface[] $messages */
foreach ($messages as $message) {
$message_options = $options;
// Transform options based on channel.
$channel = $message->getEntityTypeId();
if (array_key_exists($channel, $channel_options)) {
$message_options = array_merge($message_options, $channel_options[$channel]);
}
$t_args = [
'@channel' => $channel,
'@identity' => $this->getIdentity()->label(),
];
try {
$message::sendMessages([$message], $message_options);
\Drupal::logger('courier')->info('Successfully sent @channel to @identity', $t_args);
return $message;
}
catch (\Exception $e) {
$t_args['@exception'] = $e->getMessage();
\Drupal::logger('courier')->warning('Failed to send @channel to @identity: @exception', $t_args);
continue;
}
break;
}
return FALSE;
}
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
......
...@@ -107,16 +107,4 @@ interface MessageQueueItemInterface extends ContentEntityInterface { ...@@ -107,16 +107,4 @@ interface MessageQueueItemInterface extends ContentEntityInterface {
*/ */
public function setCreatedTime($timestamp); public function setCreatedTime($timestamp);
/**
* Attempts to send the messages on the entity.
*
* Attempts will halt as soon as a message is sent successfully.
*
* This entity can be deleted as soon as the message is sent.
*
* @return \Drupal\courier\ChannelInterface|FALSE
* The message that was sent, or FALSE if all messages failed to send.
*/
public function sendMessage();
} }
...@@ -28,9 +28,11 @@ class MessageWorker extends QueueWorkerBase { ...@@ -28,9 +28,11 @@ class MessageWorker extends QueueWorkerBase {
* - integer $id: ID of a courier_message_queue_item entity. * - integer $id: ID of a courier_message_queue_item entity.
*/ */
public function processItem($data) { public function processItem($data) {
if ($message_queue = MessageQueueItem::load($data['id'])) { $message_queue = MessageQueueItem::load($data['id']);
$message_queue->sendMessage(); if ($message_queue) {
$message_queue->delete(); /** @var \Drupal\courier\Service\MessageQueueManagerInterface $service */
$service = \Drupal::service('courier.manager.message_queue');
$service->sendMessage($message_queue);
} }
} }
......
...@@ -43,6 +43,13 @@ class CourierManager implements CourierManagerInterface { ...@@ -43,6 +43,13 @@ class CourierManager implements CourierManagerInterface {
*/ */
protected $identityChannelManager; protected $identityChannelManager;
/**
* The message queue service
*
* @var \Drupal\courier\Service\MessageQueueManagerInterface
*/
protected $messageQueue;
/** /**
* Constructs the Courier Manager. * Constructs the Courier Manager.
* *
...@@ -54,12 +61,15 @@ class CourierManager implements CourierManagerInterface { ...@@ -54,12 +61,15 @@ class CourierManager implements CourierManagerInterface {
* The logger factory service. * The logger factory service.
* @param \Drupal\courier\Service\IdentityChannelManagerInterface $identity_channel_manager * @param \Drupal\courier\Service\IdentityChannelManagerInterface $identity_channel_manager
* The identity channel manager. * The identity channel manager.
* @param \Drupal\courier\Service\MessageQueueManagerInterface $message_queue
* The message queue service.
*/ */
public function __construct(EntityTypeManagerInterface $entity_type_manager, ConfigFactoryInterface $config_factory, LoggerChannelFactoryInterface $logger_factory, IdentityChannelManagerInterface $identity_channel_manager) { public function __construct(EntityTypeManagerInterface $entity_type_manager, ConfigFactoryInterface $config_factory, LoggerChannelFactoryInterface $logger_factory, IdentityChannelManagerInterface $identity_channel_manager, MessageQueueManagerInterface $message_queue) {
$this->entityTypeManager = $entity_type_manager; $this->entityTypeManager = $entity_type_manager;
$this->configFactory = $config_factory; $this->configFactory = $config_factory;
$this->logger = $logger_factory->get('courier'); $this->logger = $logger_factory->get('courier');
$this->identityChannelManager = $identity_channel_manager; $this->identityChannelManager = $identity_channel_manager;
$this->messageQueue = $message_queue;
} }
/** /**
...@@ -157,7 +167,8 @@ class CourierManager implements CourierManagerInterface { ...@@ -157,7 +167,8 @@ class CourierManager implements CourierManagerInterface {
if ($message_queue->getMessages()) { if ($message_queue->getMessages()) {
if ($this->getSkipQueue()) { if ($this->getSkipQueue()) {
$message_queue->sendMessage(); $this->messageQueue
->sendMessage($message_queue);
} }
else { else {
$message_queue->save(); $message_queue->save();
......
<?php
namespace Drupal\courier\Service;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\courier\MessageQueueItemInterface;
/**
* The message queue manager.
*/
class MessageQueueManager implements MessageQueueManagerInterface {
/**
* The logger for the Courier channel.
*
* @var \Psr\Log\LoggerInterface
*/
protected $logger;
/**
* The identity channel manager.
*
* @var \Drupal\courier\Service\IdentityChannelManager
*/
protected $identityChannelManager;
/**
* Constructs a message queue manager.
*
* @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
* The logger factory service.
* @param \Drupal\courier\Service\IdentityChannelManagerInterface $identity_channel_manager
* The identity channel manager.
*/
function __construct(LoggerChannelFactoryInterface $logger_factory, IdentityChannelManagerInterface $identity_channel_manager) {
$this->logger = $logger_factory->get('courier');
$this->identityChannelManager = $identity_channel_manager;
}
/**
* {@inheritdoc}
*/
public function sendMessage(MessageQueueItemInterface $mqi) {
$options = $mqi->getOptions();
$channel_options = array_key_exists('channels', $options) ? $options['channels'] : [];
unset($options['channels']);
// Instead of iterating over messages, get the identity' channel preferences
// again. This ensures preference order is up to date since significant time
// may have passed since adding to queue.
$channels = $this->identityChannelManager
->getChannelsForIdentity($mqi->getIdentity());
$messages = [];
foreach ($channels as $channel) {
if ($message = $mqi->getMessage($channel)) {
$messages[] = $message;
}
}
/** @var \Drupal\courier\ChannelInterface[] $messages */
foreach ($messages as $message) {
$message_options = $options;
// Transform options based on channel.
$channel = $message->getEntityTypeId();
if (array_key_exists($channel, $channel_options)) {
$message_options = array_merge($message_options, $channel_options[$channel]);
}
$t_args = [
'@channel' => $channel,
'@identity' => $mqi->getIdentity()->label(),
];
try {
$message::sendMessages([$message], $message_options);
$this->logger
->info('Successfully sent @channel to @identity', $t_args);
$mqi->delete();
return $message;
}
catch (\Exception $e) {
$t_args['@exception'] = $e->getMessage();
$this->logger
->warning('Failed to send @channel to @identity: @exception', $t_args);
continue;
}
break;
}
return FALSE;
}
}
<?php
namespace Drupal\courier\Service;
use Drupal\courier\MessageQueueItemInterface;
/**
* Interface for message queue manager.
*
* Notice: this service is internal to Courier. It should not be called outside of
* the core module.
*/
interface MessageQueueManagerInterface {
/**
* Attempts to send the messages in the message queue item.
*
* Attempts will halt as soon as a message is sent successfully, then the
* message queue item will be deleted.
*
* @param \Drupal\courier\MessageQueueItemInterface $mqi
* A message queue item.
*
* @return \Drupal\courier\ChannelInterface|FALSE
* The message that was sent, or FALSE if all messages failed to send.
*/
public function sendMessage(MessageQueueItemInterface $mqi);
}
...@@ -16,7 +16,7 @@ class CourierManagerTest extends CourierKernelTestBase { ...@@ -16,7 +16,7 @@ class CourierManagerTest extends CourierKernelTestBase {
use AssertMailTrait; use AssertMailTrait;
public static $modules = ['courier_test_message', 'user', 'system', 'dblog']; public static $modules = ['courier_test_message', 'user', 'system'];
/** /**
* @var \Drupal\courier\Service\CourierManagerInterface * @var \Drupal\courier\Service\CourierManagerInterface
...@@ -35,7 +35,6 @@ class CourierManagerTest extends CourierKernelTestBase { ...@@ -35,7 +35,6 @@ class CourierManagerTest extends CourierKernelTestBase {
$this->installEntitySchema('courier_test_message'); $this->installEntitySchema('courier_test_message');
$this->installEntitySchema('user'); $this->installEntitySchema('user');
$this->courierManager = $this->container->get('courier.manager'); $this->courierManager = $this->container->get('courier.manager');
$this->installSchema('dblog', ['watchdog']);
} }
/** /**
......
<?php
namespace Drupal\Tests\courier\Kernel;
use Drupal\courier\Entity\MessageQueueItem;
use Drupal\courier_test_message\Entity\TestMessage;
use Drupal\user\Entity\User;
/**
* Tests message queue manager.
*
* @group courier
*/
class CourierMessageQueueManagerTest extends CourierKernelTestBase {
public static $modules = ['courier_test_message', 'user'];
/**
* @var \Drupal\courier\Service\MessageQueueManagerInterface
*/
protected $messageQueue;
/**
* {@inheritdoc}
*/
protected function setUp() {
parent::setUp();
$this->installConfig(['courier']);
$this->installEntitySchema('courier_message_queue_item');
$this->installEntitySchema('courier_test_message');
$this->installEntitySchema('user');
$this->messageQueue = $this->container->get('courier.manager.message_queue');
$this->config('courier.settings')
->set('skip_queue', TRUE)
->set('channel_preferences', ['user' => ['courier_test_message']])
->save();
}
/**
* Test message queue send.
*/
public function testSendMessage() {
$identity = User::create(['uid' => 1, 'name' => $this->randomMachineName()]);
$message = TestMessage::create()
->setMessage($this->randomString());
$mqi = MessageQueueItem::create()
->setIdentity($identity)
->addMessage($message);
$result = $this->messageQueue->sendMessage($mqi);
$this->assertTrue($message === $result);
$this->assertEquals(1, count(\Drupal::state()->get('courier_test_message.messages', [])));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment