Skip to content
Snippets Groups Projects
Commit 2d0782d9 authored by catch's avatar catch
Browse files

Issue #3198868 by dpi, acbramley, larowlan: Add delay to queue suspend

parent 4dbf3f46
No related branches found
No related tags found
28 merge requests!54479.5.x SF update,!5014Issue #3071143: Table Render Array Example Is Incorrect,!4868Issue #1428520: Improve menu parent link selection,!4289Issue #1344552 by marcingy, Niklas Fiekas, Ravi.J, aleevas, Eduardo Morales...,!4114Issue #2707291: Disable body-level scrolling when a dialog is open as a modal,!4100Issue #3249600: Add support for PHP 8.1 Enums as allowed values for list_* data types,!3630Issue #2815301 by Chi, DanielVeza, kostyashupenko, smustgrave: Allow to create...,!3600Issue #3344629: Passing null to parameter #1 ($haystack) of type string is deprecated,!2378Issue #2875033: Optimize joins and table selection in SQL entity query implementation,!2334Issue #3228209: Add hasRole() method to AccountInterface,!2062Issue #3246454: Add weekly granularity to views date sort,!1591Issue #3199697: Add JSON:API Translation experimental module,!1484Exposed filters get values from URL when Ajax is on,!1255Issue #3238922: Refactor (if feasible) uses of the jQuery serialize function to use vanillaJS,!1162Issue #3100350: Unable to save '/' root path alias,!1105Issue #3025039: New non translatable field on translatable content throws error,!1073issue #3191727: Focus states on mobile second level navigation items fixed,!10223132456: Fix issue where views instances are emptied before an ajax request is complete,!925Issue #2339235: Remove taxonomy hard dependency on node module,!877Issue #2708101: Default value for link text is not saved,!872Draft: Issue #3221319: Race condition when creating menu links and editing content deletes menu links,!844Resolve #3036010 "Updaters",!617Issue #3043725: Provide a Entity Handler for user cancelation,!579Issue #2230909: Simple decimals fail to pass validation,!560Move callback classRemove outside of the loop,!555Issue #3202493,!485Sets the autocomplete attribute for username/password input field on login form.,!30Issue #3182188: Updates composer usage to point at ./vendor/bin/composer
......@@ -213,3 +213,9 @@ parameters:
maxAge: false
# Sets the Access-Control-Allow-Credentials header.
supportsCredentials: false
queue.config:
# The maximum number of seconds to wait if a queue is temporarily suspended.
# This is not applicable when a queue is suspended but does not specify
# how long to wait before attempting to resume.
suspendMaximumWait: 30
......@@ -56,6 +56,8 @@ parameters:
maxAge: false
supportsCredentials: false
tempstore.expire: 604800
queue.config:
suspendMaximumWait: 30.0
services:
# Simple cache contexts, directly derived from the request context.
cache_context.ip:
......@@ -390,7 +392,7 @@ services:
arguments: [ '@service_container' ]
cron:
class: Drupal\Core\Cron
arguments: ['@module_handler', '@lock', '@queue', '@state', '@account_switcher', '@logger.channel.cron', '@plugin.manager.queue_worker', '@datetime.time']
arguments: ['@module_handler', '@lock', '@queue', '@state', '@account_switcher', '@logger.channel.cron', '@plugin.manager.queue_worker', '@datetime.time', '%queue.config%']
lazy: true
Drupal\Core\CronInterface: '@cron'
diff.formatter:
......
......@@ -9,6 +9,8 @@
use Drupal\Core\Lock\LockBackendInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\DelayableQueueInterface;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\RequeueException;
......@@ -80,6 +82,13 @@ class Cron implements CronInterface {
*/
protected $time;
/**
* The queue config.
*
* @var array
*/
protected array $queueConfig;
/**
* Constructs a cron object.
*
......@@ -97,10 +106,12 @@ class Cron implements CronInterface {
* A logger instance.
* @param \Drupal\Core\Queue\QueueWorkerManagerInterface $queue_manager
* The queue plugin manager.
* @param \Drupal\Component\Datetime\TimeInterface $time
* @param \Drupal\Component\Datetime\TimeInterface|null $time
* The time service.
* @param mixed[]|null $queue_config
* Queue configuration from the service container.
*/
public function __construct(ModuleHandlerInterface $module_handler, LockBackendInterface $lock, QueueFactory $queue_factory, StateInterface $state, AccountSwitcherInterface $account_switcher, LoggerInterface $logger, QueueWorkerManagerInterface $queue_manager, TimeInterface $time = NULL) {
public function __construct(ModuleHandlerInterface $module_handler, LockBackendInterface $lock, QueueFactory $queue_factory, StateInterface $state, AccountSwitcherInterface $account_switcher, LoggerInterface $logger, QueueWorkerManagerInterface $queue_manager, TimeInterface $time = NULL, ?array $queue_config = NULL) {
$this->moduleHandler = $module_handler;
$this->lock = $lock;
$this->queueFactory = $queue_factory;
......@@ -108,7 +119,18 @@ public function __construct(ModuleHandlerInterface $module_handler, LockBackendI
$this->accountSwitcher = $account_switcher;
$this->logger = $logger;
$this->queueManager = $queue_manager;
$this->time = $time ?: \Drupal::service('datetime.time');
if (!isset($time)) {
@trigger_error('Calling ' . __METHOD__ . '() without the $time argument is deprecated in drupal:10.1.0 and will be required in drupal:11.0.0. See https://www.drupal.org/node/3343743', E_USER_DEPRECATED);
$time = \Drupal::service('datetime.time');
}
$this->time = $time;
if (!isset($queue_config)) {
@trigger_error('Calling ' . __METHOD__ . '() without the $queue_config argument is deprecated in drupal:10.1.0 and will be required in drupal:11.0.0. See https://www.drupal.org/node/3343743', E_USER_DEPRECATED);
$queue_config = \Drupal::getContainer()->getParameter('queue.config');
}
$this->queueConfig = $queue_config + [
'suspendMaximumWait' => 30.0,
];
}
/**
......@@ -167,57 +189,121 @@ protected function setCronLastTime() {
* Processes cron queues.
*/
protected function processQueues() {
// Grab the defined cron queues.
foreach ($this->queueManager->getDefinitions() as $queue_name => $info) {
if (isset($info['cron'])) {
// Make sure every queue exists. There is no harm in trying to recreate
// an existing queue.
$this->queueFactory->get($queue_name)->createQueue();
$queue_worker = $this->queueManager->createInstance($queue_name);
$end = $this->time->getCurrentTime() + $info['cron']['time'];
$queue = $this->queueFactory->get($queue_name);
$lease_time = $info['cron']['time'];
while ($this->time->getCurrentTime() < $end && ($item = $queue->claimItem($lease_time))) {
try {
$queue_worker->processItem($item->data);
$queue->deleteItem($item);
}
catch (DelayedRequeueException $e) {
// The worker requested the task not be immediately re-queued.
// - If the queue doesn't support ::delayItem(), we should leave the
// item's current expiry time alone.
// - If the queue does support ::delayItem(), we should allow the
// queue to update the item's expiry using the requested delay.
if ($queue instanceof DelayableQueueInterface) {
// This queue can handle a custom delay; use the duration provided
// by the exception.
$queue->delayItem($item, $e->getDelay());
}
}
catch (RequeueException $e) {
// The worker requested the task be immediately requeued.
$queue->releaseItem($item);
}
catch (SuspendQueueException $e) {
// If the worker indicates the whole queue should be skipped,
// release the item and go to the next queue.
$queue->releaseItem($item);
$this->logger->debug('A worker for @queue queue suspended further processing of the queue.', [
'@queue' => $queue_name,
]);
// Skip to the next queue.
continue 2;
}
catch (\Exception $e) {
// In case of any other kind of exception, log it and leave the item
// in the queue to be processed again later.
watchdog_exception('cron', $e);
}
$max_wait = (float) $this->queueConfig['suspendMaximumWait'];
$queues = array_filter(
array_values($this->queueManager->getDefinitions()),
function (array $queueInfo) {
return isset($queueInfo['cron']);
}
);
// Build a stack of queues to work on.
/** @var array<array{process_from: int<0, max>, queue: \Drupal\Core\Queue\QueueInterface, worker: \Drupal\Core\Queue\QueueWorkerInterface}> $queues */
$queues = array_map(function (array $queue_info) {
$queue_name = $queue_info['id'];
$queue = $this->queueFactory->get($queue_name);
// Make sure every queue exists. There is no harm in trying to recreate
// an existing queue.
$queue->createQueue();
$worker = $this->queueManager->createInstance($queue_name);
return [
// Set process_from to zero so each queue is always processed
// immediately for the first time. This process_from timestamp will
// change if a queue throws a delayable SuspendQueueException.
'process_from' => 0,
'queue' => $queue,
'worker' => $worker,
];
}, $queues);
// Work through stack of queues, re-adding to the stack when a delay is
// necessary.
while ($item = array_shift($queues)) {
[
'queue' => $queue,
'worker' => $worker,
'process_from' => $process_from,
] = $item;
// Each queue will be processed immediately when it is reached for the
// first time, as zero > currentTime will never be true.
if ($process_from > $this->time->getCurrentMicroTime()) {
$this->usleep(round($process_from - $this->time->getCurrentMicroTime(), 3) * 1000000);
}
try {
$this->processQueue($queue, $worker);
}
catch (SuspendQueueException $e) {
// Return to this queue after processing other queues if the delay is
// within the threshold.
if ($e->isDelayable() && ($e->getDelay() < $max_wait)) {
$item['process_from'] = $this->time->getCurrentMicroTime() + $e->getDelay();
// Place this queue back in the stack for processing later.
array_push($queues, $item);
}
}
// Reorder the queue by next 'process_from' timestamp.
usort($queues, function (array $queueA, array $queueB) {
return $queueA['process_from'] <=> $queueB['process_from'];
});
}
}
/**
* Processes a cron queue.
*
* @param \Drupal\Core\Queue\QueueInterface $queue
* The queue.
* @param \Drupal\Core\Queue\QueueWorkerInterface $worker
* The queue worker.
*
* @throws \Drupal\Core\Queue\SuspendQueueException
* If the queue was suspended.
*/
protected function processQueue(QueueInterface $queue, QueueWorkerInterface $worker) {
$lease_time = $worker->getPluginDefinition()['cron']['time'];
$end = $this->time->getCurrentTime() + $lease_time;
while ($this->time->getCurrentTime() < $end && ($item = $queue->claimItem($lease_time))) {
try {
$worker->processItem($item->data);
$queue->deleteItem($item);
}
catch (DelayedRequeueException $e) {
// The worker requested the task not be immediately re-queued.
// - If the queue doesn't support ::delayItem(), we should leave the
// item's current expiry time alone.
// - If the queue does support ::delayItem(), we should allow the
// queue to update the item's expiry using the requested delay.
if ($queue instanceof DelayableQueueInterface) {
// This queue can handle a custom delay; use the duration provided
// by the exception.
$queue->delayItem($item, $e->getDelay());
}
}
catch (RequeueException) {
// The worker requested the task be immediately requeued.
$queue->releaseItem($item);
}
catch (SuspendQueueException $e) {
// If the worker indicates the whole queue should be skipped, release
// the item and go to the next queue.
$queue->releaseItem($item);
$this->logger->debug('A worker for @queue queue suspended further processing of the queue.', [
'@queue' => $worker->getPluginId(),
]);
// Skip to the next queue.
throw $e;
}
catch (\Exception $e) {
// In case of any other kind of exception, log it and leave the item
// in the queue to be processed again later.
watchdog_exception('cron', $e);
}
}
}
......@@ -266,4 +352,14 @@ protected function invokeCronHandlers() {
}
}
/**
* Delay execution in microseconds.
*
* @param int $microseconds
* Halt time in microseconds.
*/
protected function usleep(int $microseconds): void {
usleep($microseconds);
}
}
......@@ -12,4 +12,54 @@
* that workers of subsequent items would encounter it too. For example, if a
* remote site that the queue worker depends on appears to be inaccessible.
*/
class SuspendQueueException extends \RuntimeException {}
class SuspendQueueException extends \RuntimeException {
/**
* Seconds to wait before resuming the queue, or NULL if unknown.
*
* @var float|null
*/
protected $delay = NULL;
/**
* Constructs a SuspendQueueException.
*
* @param string $message
* The error message.
* @param int $code
* The error code.
* @param \Throwable|null $previous
* The previous throwable used for the exception chaining.
* @param float|null $delay
* If the time for when the queue will be ready to resume processing is
* known, pass an interval in seconds. Otherwise NULL if the time to resume
* processing the queue is not known.
*/
public function __construct(string $message = '', int $code = 0, \Throwable $previous = NULL, ?float $delay = NULL) {
parent::__construct($message, $code, $previous);
$this->delay = $delay;
}
/**
* Get the desired delay interval for this item.
*
* @return float|null
* If the time for when the queue will be ready to resume processing is
* known, pass an interval in seconds. Otherwise NULL if the time to resume
* processing the queue is not known.
*/
public function getDelay(): ?float {
return $this->delay;
}
/**
* Determine whether the next time the queue should be checked is known.
*
* @return bool
* Whether the time to resume processing the queue is known.
*/
public function isDelayable(): bool {
return isset($this->delay);
}
}
......@@ -73,6 +73,7 @@ protected function setUp(): void {
$time = $this->prophesize('Drupal\Component\Datetime\TimeInterface');
$time->getCurrentTime()->willReturn($this->currentTime);
$time->getCurrentMicroTime()->willReturn(100.0);
$time->getRequestTime()->willReturn($this->currentTime);
\Drupal::getContainer()->set('datetime.time', $time->reveal());
$this->assertEquals($this->currentTime, \Drupal::time()->getCurrentTime());
......
<?php
namespace Drupal\Tests\Core\Cron;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Config\ImmutableConfig;
use Drupal\Core\Cron;
use Drupal\Core\DependencyInjection\ContainerBuilder;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Lock\LockBackendInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\Session\AccountSwitcherInterface;
use Drupal\Core\State\StateInterface;
use Drupal\Tests\UnitTestCase;
use Psr\Log\LoggerInterface;
/**
* Test Cron handling of suspended queues with a delay.
*
* @group Cron
* @covers \Drupal\Core\Queue\SuspendQueueException
* @coversDefaultClass \Drupal\Core\Cron
*/
final class CronSuspendQueueDelayTest extends UnitTestCase {
/**
* Constructor arguments for \Drupal\Core\Cron.
*
* @var object[]|\PHPUnit\Framework\MockObject\MockObject[]
*/
protected $cronConstructorArguments;
/**
* A worker for testing.
*
* @var \Drupal\Core\Queue\QueueWorkerInterface|\PHPUnit\Framework\MockObject\MockObject
*/
protected $workerA;
/**
* A worker for testing.
*
* @var \Drupal\Core\Queue\QueueWorkerInterface|\PHPUnit\Framework\MockObject\MockObject
*/
protected $workerB;
/**
* {@inheritdoc}
*/
protected function setUp(): void {
parent::setUp();
$lock = $this->createMock(LockBackendInterface::class);
$lock->expects($this->any())
->method('acquire')
->willReturn(TRUE);
$this->cronConstructorArguments = [
'module_handler' => $this->createMock(ModuleHandlerInterface::class),
'lock' => $lock,
'queue_factory' => $this->createMock(QueueFactory::class),
'state' => $this->createMock(StateInterface::class),
'account_switcher' => $this->createMock(AccountSwitcherInterface::class),
'logger' => $this->createMock(LoggerInterface::class),
'queue_manager' => $this->createMock(QueueWorkerManagerInterface::class),
'time' => $this->createMock(TimeInterface::class),
'queue_config' => [],
];
// Capture logs to watchdog_exception().
$config = $this->createMock(ImmutableConfig::class);
$config->expects($this->any())
->method('get')
->with('logging')
->willReturn(0);
$configFactory = $this->createMock(ConfigFactoryInterface::class);
$configFactory->expects($this->any())
->method('get')
->with('system.cron')
->willReturn($config);
$container = new ContainerBuilder();
$container->set('config.factory', $configFactory);
\Drupal::setContainer($container);
$this->workerA = $this->createMock(QueueWorkerInterface::class);
$this->workerA->expects($this->any())
->method('getPluginDefinition')
->willReturn([
'cron' => [
'time' => 300,
],
]);
$this->workerB = $this->createMock(QueueWorkerInterface::class);
$this->workerB->expects($this->any())
->method('getPluginDefinition')
->willReturn([
'cron' => [
'time' => 300,
],
]);
}
/**
* Tests a queue is reprocessed again after other queues.
*
* Two queues are created:
* - test_worker_a.
* - test_worker_b.
*
* Queues and items are processed:
* - test_worker_a:
* - item throws SuspendQueueException with 2.0 delay.
* - test_worker_b:
* - item executes normally.
* - test_worker_a:
* - item throws SuspendQueueException with 3.0 delay.
* - test_worker_a:
* - no items remaining, quits.
*/
public function testSuspendQueue(): void {
[
'queue_factory' => $queueFactory,
'queue_manager' => $queueManager,
'time' => $time,
] = $this->cronConstructorArguments;
$cron = $this->getMockBuilder(Cron::class)
->onlyMethods(['usleep'])
->setConstructorArgs($this->cronConstructorArguments)
->getMock();
$cron->expects($this->exactly(2))
->method('usleep')
->withConsecutive(
[$this->equalTo(2000000)],
[$this->equalTo(3000000)],
);
$queueManager->expects($this->once())
->method('getDefinitions')
->willReturn([
'test_worker_a' => [
'id' => 'test_worker_a',
'cron' => ['time' => 300],
],
'test_worker_b' => [
'id' => 'test_worker_b',
'cron' => ['time' => 300],
],
]);
$queueA = $this->createMock(QueueInterface::class);
$queueB = $this->createMock(QueueInterface::class);
$queueFactory->expects($this->exactly(2))
->method('get')
->willReturnMap([
['test_worker_a', FALSE, $queueA],
['test_worker_b', FALSE, $queueB],
]);
// Expect this queue to be processed twice.
$queueA->expects($this->exactly(3))
->method('claimItem')
->willReturnOnConsecutiveCalls(
// First run will suspend for 2 seconds.
(object) ['data' => 'test_data_a1'],
// Second run will suspend for 3 seconds.
(object) ['data' => 'test_data_a2'],
// This will terminate the queue normally.
FALSE,
);
// Expect this queue to be processed once.
$queueB->expects($this->exactly(2))
->method('claimItem')
->willReturnOnConsecutiveCalls(
(object) ['data' => 'test_data_b1'],
// This will terminate the queue normally.
FALSE,
);
$queueManager->expects($this->any())
->method('createInstance')
->willReturnMap([
['test_worker_a', [], $this->workerA],
['test_worker_b', [], $this->workerB],
]);
$this->workerA->expects($this->exactly(2))
->method('processItem')
->with($this->anything())
->willReturnOnConsecutiveCalls(
$this->throwException(new SuspendQueueException('', 0, NULL, 2.0)),
$this->throwException(new SuspendQueueException('', 0, NULL, 3.0))
);
$this->workerB->expects($this->once())
->method('processItem')
->with('test_data_b1');
$time->expects($this->any())
->method('getCurrentTime')
->willReturn(60);
$cron->run();
}
/**
* Tests queues may be re-processed by whether delay exceeds threshold.
*
* Cron will pause and reprocess a queue after a delay if a worker throws
* a SuspendQueueException with a delay time not exceeding the maximum wait
* config.
*
* @param float $threshold
* The configured threshold.
* @param float $suspendQueueDelay
* An interval in seconds a worker will suspend the queue.
* @param bool $expectQueueDelay
* Whether to expect cron to sleep and re-process the queue.
*
* @dataProvider providerSuspendQueueThreshold
*/
public function testSuspendQueueThreshold(float $threshold, float $suspendQueueDelay, bool $expectQueueDelay): void {
$this->cronConstructorArguments['queue_config'] = [
'suspendMaximumWait' => $threshold,
];
[
'queue_factory' => $queueFactory,
'queue_manager' => $queueManager,
] = $this->cronConstructorArguments;
$cron = $this->getMockBuilder(Cron::class)
->onlyMethods(['usleep'])
->setConstructorArgs($this->cronConstructorArguments)
->getMock();
$cron->expects($expectQueueDelay ? $this->once() : $this->never())
->method('usleep');
$queueManager->expects($this->once())
->method('getDefinitions')
->willReturn([
'test_worker' => [
'id' => 'test_worker',
'cron' => 300,
],
]);
$queue = $this->createMock(QueueInterface::class);
$queueFactory->expects($this->once())
->method('get')
->willReturnMap([
['test_worker', FALSE, $queue],
]);
$queue->expects($this->exactly($expectQueueDelay ? 2 : 1))
->method('claimItem')
->willReturnOnConsecutiveCalls(
(object) ['data' => 'test_data'],
FALSE,
);
$queueManager->expects($this->exactly(1))
->method('createInstance')
->with('test_worker')
->willReturn($this->workerA);
$this->workerA->expects($this->once())
->method('processItem')
->with($this->anything())
->willReturnOnConsecutiveCalls(
$this->throwException(new SuspendQueueException('', 0, NULL, $suspendQueueDelay)),
);
$cron->run();
}
/**
* Data for testing.
*
* @return array
* Scenarios for testing.
*/
public function providerSuspendQueueThreshold(): array {
$scenarios = [];
$scenarios['cron will wait for the queue, and rerun'] = [
15.0,
10.0,
TRUE,
];
$scenarios['cron will not wait for the queue, and exit'] = [
15.0,
20.0,
FALSE,
];
return $scenarios;
}
/**
* Tests queues are executed in order.
*
* If multiple queues are delayed, they must execute in order of time.
*/
public function testSuspendQueueOrder(): void {
[
'queue_factory' => $queueFactory,
'queue_manager' => $queueManager,
'time' => $time,
] = $this->cronConstructorArguments;
$cron = $this->getMockBuilder(Cron::class)
->onlyMethods(['usleep'])
->setConstructorArgs($this->cronConstructorArguments)
->getMock();
$cron->expects($this->any())
->method('usleep');
$queueManager->expects($this->once())
->method('getDefinitions')
->willReturn([
'test_worker_a' => [
'id' => 'test_worker_a',
'cron' => ['time' => 300],
],
'test_worker_b' => [
'id' => 'test_worker_b',
'cron' => ['time' => 300],
],
'test_worker_c' => [
'id' => 'test_worker_c',
'cron' => ['time' => 300],
],
'test_worker_d' => [
'id' => 'test_worker_d',
'cron' => ['time' => 300],
],
]);
$queueA = $this->createMock(QueueInterface::class);
$queueB = $this->createMock(QueueInterface::class);
$queueC = $this->createMock(QueueInterface::class);
$queueD = $this->createMock(QueueInterface::class);
$queueFactory->expects($this->exactly(4))
->method('get')
->willReturnMap([
['test_worker_a', FALSE, $queueA],
['test_worker_b', FALSE, $queueB],
['test_worker_c', FALSE, $queueC],
['test_worker_d', FALSE, $queueD],
]);
$queueA->expects($this->any())
->method('claimItem')
->willReturnOnConsecutiveCalls(
(object) ['data' => 'test_data_from_queue_a'],
FALSE,
);
$queueB->expects($this->any())
->method('claimItem')
->willReturnOnConsecutiveCalls(
(object) ['data' => 'test_data_from_queue_b'],
(object) ['data' => 'test_data_from_queue_b'],
FALSE,
);
$queueC->expects($this->any())
->method('claimItem')
->willReturnOnConsecutiveCalls(
(object) ['data' => 'test_data_from_queue_c'],
(object) ['data' => 'test_data_from_queue_c'],
FALSE,
);
$queueD->expects($this->any())
->method('claimItem')
->willReturnOnConsecutiveCalls(
(object) ['data' => 'test_data_from_queue_d'],
FALSE,
);
// Recycle the same worker for all queues to test order sanely:
$queueManager->expects($this->any())
->method('createInstance')
->willReturnMap([
['test_worker_a', [], $this->workerA],
['test_worker_b', [], $this->workerA],
['test_worker_c', [], $this->workerA],
['test_worker_d', [], $this->workerA],
]);
$this->workerA->expects($this->exactly(6))
->method('processItem')
->withConsecutive(
// All queues are executed in sequence of definition:
[$this->equalTo('test_data_from_queue_a')],
[$this->equalTo('test_data_from_queue_b')],
[$this->equalTo('test_data_from_queue_c')],
[$this->equalTo('test_data_from_queue_d')],
// Queue C is executed again, and before queue B.
[$this->equalTo('test_data_from_queue_c')],
// Queue B is executed again, after queue C since its delay was longer.
[$this->equalTo('test_data_from_queue_b')],
)
->willReturnOnConsecutiveCalls(
NULL,
$this->throwException(new SuspendQueueException('', 0, NULL, 16.0)),
$this->throwException(new SuspendQueueException('', 0, NULL, 8.0)),
NULL,
NULL,
NULL,
);
$currentTime = 60;
$time->expects($this->any())
->method('getCurrentTime')
->willReturnCallback(function () use (&$currentTime): int {
return (int) $currentTime;
});
$time->expects($this->any())
->method('getCurrentMicroTime')
->willReturnCallback(function () use (&$currentTime): float {
return (float) $currentTime;
});
$cron->expects($this->exactly(2))
->method('usleep')
->withConsecutive(
// Expect to wait for 8 seconds.
[
$this->callback(function (int $microseconds) use (&$currentTime) {
// Accelerate time by 4 seconds.
$currentTime += 4;
return $microseconds === 8000000;
}),
],
// SuspendQueueException requests to delay by 16 seconds, but 4 seconds
// have passed above, so there are just 12 seconds remaining:
[$this->equalTo(12000000)],
);
$cron->run();
}
}
......@@ -102,6 +102,12 @@ protected function setUp(): void {
$queue_worker_manager = $this->prophesize('Drupal\Core\Queue\QueueWorkerManagerInterface');
$state = $this->prophesize('Drupal\Core\State\StateInterface');
$account_switcher = $this->prophesize('Drupal\Core\Session\AccountSwitcherInterface');
$queueConfig = [
'suspendMaximumWait' => 30.0,
];
// Create a lock that will always fail when attempting to acquire; we're
// only interested in testing ::processQueues(), not the other stuff.
$lock_backend = $this->prophesize('Drupal\Core\Lock\LockBackendInterface');
$lock_backend->acquire('cron', Argument::cetera())->willReturn(TRUE);
$lock_backend->release('cron')->shouldBeCalled();
......@@ -121,6 +127,8 @@ protected function setUp(): void {
// Create a mock queue worker plugin instance based on above definition.
$queue_worker_plugin = $this->prophesize('Drupal\Core\Queue\QueueWorkerInterface');
$queue_worker_plugin->getPluginId()->willReturn($queue_worker);
$queue_worker_plugin->getPluginDefinition()->willReturn($queue_worker_definition);
$queue_worker_plugin->processItem('Complete')->willReturn();
$queue_worker_plugin->processItem('Exception')->willThrow(\Exception::class);
$queue_worker_plugin->processItem('DelayedRequeueException')->willThrow(DelayedRequeueException::class);
......@@ -147,7 +155,7 @@ protected function setUp(): void {
$queue_worker_manager->createInstance($queue_worker)->willReturn($queue_worker_plugin->reveal());
// Construct the Cron class to test.
$this->cron = new Cron($module_handler->reveal(), $lock_backend->reveal(), $queue_factory->reveal(), $state->reveal(), $account_switcher->reveal(), $logger->reveal(), $queue_worker_manager->reveal(), $time->reveal());
$this->cron = new Cron($module_handler->reveal(), $lock_backend->reveal(), $queue_factory->reveal(), $state->reveal(), $account_switcher->reveal(), $logger->reveal(), $queue_worker_manager->reveal(), $time->reveal(), $queueConfig);
}
/**
......
......@@ -213,3 +213,9 @@ parameters:
maxAge: false
# Sets the Access-Control-Allow-Credentials header.
supportsCredentials: false
queue.config:
# The maximum number of seconds to wait if a queue is temporarily suspended.
# This is not applicable when a queue is suspended but does not specify
# how long to wait before attempting to resume.
suspendMaximumWait: 30
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment