Commit ac0cee8f authored by alexpott's avatar alexpott

Issue #2038275 by tim.plunkett, mr.baileys, Sam Hermans: Convert...

Issue #2038275 by tim.plunkett, mr.baileys, Sam Hermans: Convert hook_queue_info() to plugin system.
parent 829715b7
......@@ -162,7 +162,7 @@ services:
arguments: ['@typed_data_manager']
cron:
class: Drupal\Core\Cron
arguments: ['@module_handler', '@lock', '@queue', '@state', '@current_user', '@session_manager', '@logger.channel.cron']
arguments: ['@module_handler', '@lock', '@queue', '@state', '@current_user', '@session_manager', '@logger.channel.cron', '@plugin.manager.queue_worker']
diff.formatter:
class: Drupal\Core\Diff\DiffFormatter
arguments: ['@config.factory']
......@@ -353,6 +353,9 @@ services:
plugin.manager.display_variant:
class: Drupal\Core\Display\VariantManager
parent: default_plugin_manager
plugin.manager.queue_worker:
class: Drupal\Core\Queue\QueueWorkerManager
parent: default_plugin_manager
plugin.cache_clearer:
class: Drupal\Core\Plugin\CachedDiscoveryClearer
paramconverter.menu_link:
......
<?php
/**
* @file
* Contains \Drupal\Core\Annotation\QueueWorker.
*/
namespace Drupal\Core\Annotation;
use Drupal\Component\Annotation\Plugin;
/**
* Declare queue workers that need to be run periodically.
*
* While there can be only one hook_cron() process running at the same time,
* there can be any number of processes defined here running. Because of
* this, long running tasks are much better suited for this API. Items queued
* in hook_cron() might be processed in the same cron run if there are not many
* items in the queue, otherwise it might take several requests, which can be
* run in parallel.
*
* You can create queues, add items to them, claim them, etc. without using a
* QueueWorker plugin if you want, however, you need to take care of processing
* the items in the queue in that case. See \Drupal\Core\Cron for an example.
*
* Plugin Namespace: Plugin\QueueWorker
*
* For a working example, see
* \Drupal\aggregator\Plugin\QueueWorker\AggregatorRefresh.
*
* @see \Drupal\Core\Queue\QueueWorkerInterface
* @see \Drupal\Core\Queue\QueueWorkerBase
* @see \Drupal\Core\Queue\QueueWorkerManager
* @see plugin_api
*
* @Annotation
*/
class QueueWorker extends Plugin {
/**
* The plugin ID.
*
* @var string
*/
public $id;
/**
* The human-readable title of the plugin.
*
* @ingroup plugin_translatable
*
* @var \Drupal\Core\Annotation\Translation
*/
public $title;
/**
* An associative array containing the optional key:
* - time: (optional) How much time Drupal cron should spend on calling
* this worker in seconds. Defaults to 15.
*
* @var array (optional)
*/
public $cron;
}
......@@ -8,6 +8,7 @@
namespace Drupal\Core;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\State\StateInterface;
use Drupal\Core\Lock\LockBackendInterface;
use Drupal\Core\Queue\QueueFactory;
......@@ -71,6 +72,13 @@ class Cron implements CronInterface {
*/
protected $logger;
/**
* The queue plugin manager.
*
* @var \Drupal\Core\Queue\QueueWorkerManagerInterface
*/
protected $queueManager;
/**
* Constructs a cron object.
*
......@@ -88,8 +96,10 @@ class Cron implements CronInterface {
* The session manager.
* @param \Psr\Log\LoggerInterface $logger
* A logger instance.
* @param \Drupal\Core\Queue\QueueWorkerManagerInterface
* The queue plugin manager.
*/
public function __construct(ModuleHandlerInterface $module_handler, LockBackendInterface $lock, QueueFactory $queue_factory, StateInterface $state, AccountProxyInterface $current_user, SessionManagerInterface $session_manager, LoggerInterface $logger) {
public function __construct(ModuleHandlerInterface $module_handler, LockBackendInterface $lock, QueueFactory $queue_factory, StateInterface $state, AccountProxyInterface $current_user, SessionManagerInterface $session_manager, LoggerInterface $logger, QueueWorkerManagerInterface $queue_manager) {
$this->moduleHandler = $module_handler;
$this->lock = $lock;
$this->queueFactory = $queue_factory;
......@@ -97,6 +107,7 @@ public function __construct(ModuleHandlerInterface $module_handler, LockBackendI
$this->currentUser = $current_user;
$this->sessionManager = $session_manager;
$this->logger = $logger;
$this->queueManager = $queue_manager;
}
/**
......@@ -162,21 +173,18 @@ protected function setCronLastTime() {
*/
protected function processQueues() {
// Grab the defined cron queues.
$queues = $this->moduleHandler->invokeAll('queue_info');
$this->moduleHandler->alter('queue_info', $queues);
foreach ($queues as $queue_name => $info) {
foreach ($this->queueManager->getDefinitions() as $queue_name => $info) {
if (isset($info['cron'])) {
// Make sure every queue exists. There is no harm in trying to recreate
// an existing queue.
$this->queueFactory->get($queue_name)->createQueue();
$callback = $info['worker callback'];
$queue_worker = $this->queueManager->createInstance($queue_name);
$end = time() + (isset($info['cron']['time']) ? $info['cron']['time'] : 15);
$queue = $this->queueFactory->get($queue_name);
while (time() < $end && ($item = $queue->claimItem())) {
try {
call_user_func_array($callback, array($item->data));
$queue_worker->processItem($item->data);
$queue->deleteItem($item);
}
catch (SuspendQueueException $e) {
......
......@@ -50,7 +50,7 @@ function __construct(Settings $settings) {
* least once is important, FALSE if scalability is the main concern. Defaults
* to FALSE.
*
* @return \Drupal\Core\QueueStore\QueueInterface
* @return \Drupal\Core\Queue\QueueInterface
* A queue implementation for the given name.
*/
public function get($name, $reliable = FALSE) {
......
<?php
/**
* @file
* Contains \Drupal\Core\Queue\QueueWorkerBase.
*/
namespace Drupal\Core\Queue;
use Drupal\Component\Plugin\PluginBase;
/**
* Provides a base implementation for a QueueWorker plugin.
*
* @see \Drupal\Core\Queue\QueueWorkerInterface
* @see \Drupal\Core\Queue\QueueWorkerManager
* @see \Drupal\Core\Annotation\QueueWorker
* @see plugin_api
*/
abstract class QueueWorkerBase extends PluginBase implements QueueWorkerInterface {
}
<?php
/**
* @file
* Contains \Drupal\Core\Queue\QueueWorkerInterface.
*/
namespace Drupal\Core\Queue;
use Drupal\Component\Plugin\PluginInspectionInterface;
/**
* Defines an interface for a QueueWorker plugin.
*
* @see \Drupal\Core\Queue\QueueWorkerBase
* @see \Drupal\Core\Queue\QueueWorkerManager
* @see \Drupal\Core\Annotation\QueueWorker
* @see plugin_api
*/
interface QueueWorkerInterface extends PluginInspectionInterface {
/**
* Works on a single queue item.
*
* @param mixed $data
* The data that was passed to
* \Drupal\Core\Queue\QueueInterface::createItem() when the item was queued.
*
* @throws \Exception
* A QueueWorker plugin may throw an exception to indicate there was a
* problem. The cron process will log the exception, and leave the item in
* the queue to be processed again later.
* @throws \Drupal\Core\Queue\SuspendQueueException
* More specifically, a SuspendQueueException should be thrown when a
* QueueWorker plugin is aware that the problem will affect all subsequent
* workers of its queue. For example, a callback that makes HTTP requests
* may find that the remote server is not responding. The cron process will
* behave as with a normal Exception, and in addition will not attempt to
* process further items from the current item's queue during the current
* cron run.
*
* @see \Drupal\Core\Cron::processQueues()
*/
public function processItem($data);
}
<?php
/**
* @file
* Contains \Drupal\Core\Queue\QueueWorkerManager.
*/
namespace Drupal\Core\Queue;
use Drupal\Core\Cache\CacheBackendInterface;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Plugin\DefaultPluginManager;
/**
* Defines the queue worker manager.
*
* @see \Drupal\Core\Queue\QueueWorkerInterface
* @see \Drupal\Core\Queue\QueueWorkerBase
* @see \Drupal\Core\Annotation\QueueWorker
* @see plugin_api
*/
class QueueWorkerManager extends DefaultPluginManager implements QueueWorkerManagerInterface {
/**
* Constructs an QueueWorkerManager object.
*
* @param \Traversable $namespaces
* An object that implements \Traversable which contains the root paths
* keyed by the corresponding namespace to look for plugin implementations.
* @param \Drupal\Core\Cache\CacheBackendInterface $cache_backend
* Cache backend instance to use.
* @param \Drupal\Core\Extension\ModuleHandlerInterface $module_handler
* The module handler.
*/
public function __construct(\Traversable $namespaces, CacheBackendInterface $cache_backend, ModuleHandlerInterface $module_handler) {
parent::__construct('Plugin/QueueWorker', $namespaces, $module_handler, 'Drupal\Core\Queue\QueueWorkerInterface', 'Drupal\Core\Annotation\QueueWorker');
$this->setCacheBackend($cache_backend, 'queue_plugins');
$this->alterInfo('queue_info');
}
/**
* {@inheritdoc}
*/
public function processDefinition(&$definition, $plugin_id) {
parent::processDefinition($definition, $plugin_id);
// Assign a default time if a cron is specified.
if (isset($definition['cron'])) {
$definition['cron'] += [
'time' => 15,
];
}
}
/**
* {@inheritdoc}
*
* @return \Drupal\Core\Queue\QueueWorkerInterface
*/
public function createInstance($plugin_id, array $configuration = []) {
return parent::createInstance($plugin_id, $configuration);
}
}
<?php
/**
* @file
* Contains \Drupal\Core\Queue\QueueWorkerManagerInterface.
*/
namespace Drupal\Core\Queue;
use Drupal\Component\Plugin\PluginManagerInterface;
/**
* Provides an interface for a queue worker manager.
*/
interface QueueWorkerManagerInterface extends PluginManagerInterface {
}
......@@ -6,7 +6,6 @@
*/
use Drupal\aggregator\Entity\Feed;
use Drupal\aggregator\FeedInterface;
use Drupal\Component\Utility\Xss;
use Drupal\Core\Routing\RouteMatchInterface;
......@@ -161,22 +160,6 @@ function aggregator_cron() {
}
}
/**
* Implements hook_queue_info().
*/
function aggregator_queue_info() {
$queues['aggregator_feeds'] = array(
'title' => t('Aggregator refresh'),
'worker callback' => function (FeedInterface $feed) {
$feed->refreshItems();
},
'cron' => array(
'time' => 60,
),
);
return $queues;
}
/**
* Renders the HTML content safely, as allowed.
*
......
<?php
/**
* @file
* Contains \Drupal\aggregator\Plugin\QueueWorker\AggregatorRefresh.
*/
namespace Drupal\aggregator\Plugin\QueueWorker;
use Drupal\aggregator\FeedInterface;
use Drupal\Core\Queue\QueueWorkerBase;
/**
* @QueueWorker(
* id = "aggregator_feeds",
* title = @Translation("Aggregator refresh"),
* cron = {"time" = 60}
* )
*/
class AggregatorRefresh extends QueueWorkerBase {
/**
* {@inheritdoc}
*/
public function processItem($data) {
if ($data instanceof FeedInterface) {
$data->refreshItems();
}
}
}
......@@ -21,11 +21,11 @@ public function testCron() {
$this->createSampleNodes();
$feed = $this->createFeed();
$this->cronRun();
$this->assertEqual(5, db_query('SELECT COUNT(*) FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField(), 'Expected number of items in database.');
$this->assertEqual(5, db_query('SELECT COUNT(*) FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField());
$this->deleteFeedItems($feed);
$this->assertEqual(0, db_query('SELECT COUNT(*) FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField(), 'Expected number of items in database.');
$this->assertEqual(0, db_query('SELECT COUNT(*) FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField());
$this->cronRun();
$this->assertEqual(5, db_query('SELECT COUNT(*) FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField(), 'Expected number of items in database.');
$this->assertEqual(5, db_query('SELECT COUNT(*) FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField());
// Test feed locking when queued for update.
$this->deleteFeedItems($feed);
......@@ -36,7 +36,7 @@ public function testCron() {
))
->execute();
$this->cronRun();
$this->assertEqual(0, db_query('SELECT COUNT(*) FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField(), 'Expected number of items in database.');
$this->assertEqual(0, db_query('SELECT COUNT(*) FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField());
db_update('aggregator_feed')
->condition('fid', $feed->id())
->fields(array(
......@@ -44,6 +44,6 @@ public function testCron() {
))
->execute();
$this->cronRun();
$this->assertEqual(5, db_query('SELECT COUNT(*) FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField(), 'Expected number of items in database.');
$this->assertEqual(5, db_query('SELECT COUNT(*) FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField());
}
}
......@@ -361,88 +361,18 @@ function locale_themes_uninstalled($themes) {
/**
* Implements hook_cron().
*
* @see locale_queue_info()
* @see \Drupal\locale\Plugin\QueueWorker\LocaleTranslation
*/
function locale_cron() {
// Update translations only when an update frequency was set by the admin
// and a translatable language was set.
// Update tasks are added to the queue here but processed by Drupal's cron
// using the cron worker defined in locale_queue_info().
// Update tasks are added to the queue here but processed by Drupal's cron.
if ($frequency = \Drupal::config('locale.settings')->get('translation.update_interval_days') && locale_translatable_language_list()) {
module_load_include('translation.inc', 'locale');
locale_cron_fill_queue();
}
}
/**
* Implements hook_queue_info().
*/
function locale_queue_info() {
$queues['locale_translation'] = array(
'title' => t('Update translations'),
'worker callback' => 'locale_translation_worker',
'cron' => array(
'time' => 30,
),
);
return $queues;
}
/**
* Callback: Executes interface translation queue tasks.
*
* The translation update functions executed here are batch operations which
* are also used in translation update batches. The batch functions may need to
* be executed multiple times to complete their task, typically this is the
* translation import function. When a batch function is not finished, a new
* queue task is created and added to the end of the queue. The batch context
* data is needed to continue the batch task is stored in the queue with the
* queue data.
*
* @param array $data
* Queue data array containing:
* - Function name.
* - Array of function arguments. Optionally contains the batch context data.
*
* @see locale_queue_info()
*/
function locale_translation_worker($data) {
module_load_include('batch.inc', 'locale');
list($function, $args) = $data;
// We execute batch operation functions here to check, download and import the
// translation files. Batch functions use a context variable as last argument
// which is passed by reference. When a batch operation is called for the
// first time a default batch context is created. When called iterative
// (usually the batch import function) the batch context is passed through via
// the queue and is part of the $data.
$last = count($args) - 1;
if (!is_array($args[$last]) || !isset($args[$last]['finished'])) {
$batch_context = array(
'sandbox' => array(),
'results' => array(),
'finished' => 1,
'message' => '',
);
}
else {
$batch_context = $args[$last];
unset ($args[$last]);
}
$args = array_merge($args, array(&$batch_context));
// Call the batch operation function.
call_user_func_array($function, $args);
// If the batch operation is not finished we create a new queue task to
// continue the task. This is typically the translation import task.
if ($batch_context['finished'] < 1) {
unset($batch_context['strings']);
$queue = \Drupal::queue('locale_translation', TRUE);
$queue->createItem(array($function, $args));
}
}
/**
* Imports translations when new modules or themes are installed.
*
......
<?php
/**
* @file
* Contains \Drupal\locale\Plugin\QueueWorker\LocaleTranslation.
*/
namespace Drupal\locale\Plugin\QueueWorker;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
* Executes interface translation queue tasks.
*
* @QueueWorker(
* id = "locale_translation",
* title = @Translation("Update translations"),
* cron = {"time" = 30}
* )
*/
class LocaleTranslation extends QueueWorkerBase implements ContainerFactoryPluginInterface {
/**
* The module handler.
*
* @var \Drupal\Core\Extension\ModuleHandlerInterface
*/
protected $moduleHandler;
/**
* The queue object.
*
* @var \Drupal\Core\Queue\QueueInterface
*/
protected $queue;
/**
* Constructs a new LocaleTranslation object.
*
* @param array $configuration
* A configuration array containing information about the plugin instance.
* @param string $plugin_id
* The plugin_id for the plugin instance.
* @param array $plugin_definition
* The plugin implementation definition.
* @param \Drupal\Core\Extension\ModuleHandlerInterface $module_handler
* The module handler.
* @param \Drupal\Core\Queue\QueueInterface $queue
* The queue object.
*/
public function __construct(array $configuration, $plugin_id, array $plugin_definition, ModuleHandlerInterface $module_handler, QueueInterface $queue) {
parent::__construct($configuration, $plugin_id, $plugin_definition);
$this->moduleHandler = $module_handler;
$this->queue = $queue;
}
/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
return new static(
$configuration,
$plugin_id,
$plugin_definition,
$container->get('module_handler'),
$container->get('queue')->get('locale_translation', TRUE)
);
}
/**
* {@inheritdoc}
*
* The translation update functions executed here are batch operations which
* are also used in translation update batches. The batch functions may need
* to be executed multiple times to complete their task, typically this is the
* translation import function. When a batch function is not finished, a new
* queue task is created and added to the end of the queue. The batch context
* data is needed to continue the batch task is stored in the queue with the
* queue data.
*/
public function processItem($data) {
$this->moduleHandler->loadInclude('locale', 'batch.inc');
list($function, $args) = $data;
// We execute batch operation functions here to check, download and import
// the translation files. Batch functions use a context variable as last
// argument which is passed by reference. When a batch operation is called
// for the first time a default batch context is created. When called
// iterative (usually the batch import function) the batch context is passed
// through via the queue and is part of the $data.
$last = count($args) - 1;
if (!is_array($args[$last]) || !isset($args[$last]['finished'])) {
$batch_context = [
'sandbox' => [],
'results' => [],
'finished' => 1,
'message' => '',
];
}
else {
$batch_context = $args[$last];
unset ($args[$last]);
}
$args = array_merge($args, [&$batch_context]);
// Call the batch operation function.
call_user_func_array($function, $args);
// If the batch operation is not finished we create a new queue task to
// continue the task. This is typically the translation import task.
if ($batch_context['finished'] < 1) {
unset($batch_context['strings']);
$this->queue->createItem([$function, $args]);
}
}
}
......@@ -62,8 +62,8 @@ function hook_hook_info() {
* Long-running tasks and tasks that could time out, such as retrieving remote
* data, sending email, and intensive file tasks, should use the queue API
* instead of executing the tasks directly. To do this, first define one or
* more queues via hook_queue_info(). Then, add items that need to be
* processed to the defined queues.
* more queues via a \Drupal\Core\Annotation\QueueWorker plugin. Then, add items
* that need to be processed to the defined queues.
*/
function hook_cron() {
// Short-running operation example, not using a queue:
......@@ -98,46 +98,6 @@ function hook_data_type_info_alter(&$data_types) {
$data_types['email']['class'] = '\Drupal\mymodule\Type\Email';
}
/**
* Declare queues holding items that need to be run periodically.
*
* While there can be only one hook_cron() process running at the same time,
* there can be any number of processes defined here running. Because of
* this, long running tasks are much better suited for this API. Items queued
* in hook_cron() might be processed in the same cron run if there are not many
* items in the queue, otherwise it might take several requests, which can be
* run in parallel.
*
* You can create queues, add items to them, claim them, etc without declaring
* the queue in this hook if you want, however, you need to take care of
* processing the items in the queue in that case.
*
* @return
* An associative array where the key is the queue name and the value is
* again an associative array. Possible keys are:
* - 'worker callback': A PHP callable to call that is an implementation of
* callback_queue_worker().
* - 'cron': (optional) An associative array containing the optional key:
* - 'time': (optional) How much time Drupal cron should spend on calling
* this worker in seconds. Defaults to 15.
* If the cron key is not defined, the queue will not be processed by cron,
* and must be processed by other means.
*
* @see hook_cron()
* @see hook_queue_info_alter()
*/
function hook_queue_info() {
$queues['aggregator_feeds'] = array(
'title' => t('Aggregator refresh'),
'worker callback' => array('Drupal\my_module\MyClass', 'aggregatorRefresh'),
// Only needed if this queue should be processed by cron.
'cron' => array(
'time' => 60,
),
);
return $queues;
}
/**
* Alter cron queue information before cron runs.
*
......@@ -147,7 +107,8 @@ function hook_queue_info() {
* @param array $queues
* An array of cron queue information.
*
* @see hook_queue_info()
* @see \Drupal\Core\QueueWorker\QueueWorkerInterface
* @see \Drupal\Core\Annotation\QueueWorker
* @see \Drupal\Core\Cron
*/
function hook_queue_info_alter(&$queues) {
......@@ -156,35 +117,6 @@ function hook_queue_info_alter(&$queues) {
$queues['aggregator_feeds']['cron']['time'] = 90;
}
/**
* Work on a single queue item.
*
* Callback for hook_queue_info().
*
* @param $queue_item_data
* The data that was passed to \Drupal\Core\Queue\QueueInterface::createItem()
* when the item was queued.
*
* @throws \Exception
* The worker callback may throw an exception to indicate there was a problem.
* The cron process will log the exception, and leave the item in the queue to
* be processed again later.
* @throws \Drupal\Core\Queue\SuspendQueueException
* More specifically, a SuspendQueueException should be thrown when the
* callback is aware that the problem will affect all subsequent workers of
* its queue. For example, a callback that makes HTTP requests may find that
* the remote server is not responding. The cron process will behave as with a
* normal Exception, and in addition will not attempt to process further items
* from the current item's queue during the current cron run.
*
* @see \Drupal\Core\Cron::run()
*/
function callback_queue_worker($queue_item_data) {
$node = node_load($queue_item_data);
$node->title = 'Updated title';
$node->save();
}
/**
* Allows modules to declare their own Form API element types and specify their