diff --git a/core/assets/scaffold/files/default.services.yml b/core/assets/scaffold/files/default.services.yml index b4d27e05a56f101ca0204d2553e0e5c7f643260f..eb530088efafbdd97b26debce9cffbe11d6c1583 100644 --- a/core/assets/scaffold/files/default.services.yml +++ b/core/assets/scaffold/files/default.services.yml @@ -213,3 +213,9 @@ parameters: maxAge: false # Sets the Access-Control-Allow-Credentials header. supportsCredentials: false + + queue.config: + # The maximum number of seconds to wait if a queue is temporarily suspended. + # This is not applicable when a queue is suspended but does not specify + # how long to wait before attempting to resume. + suspendMaximumWait: 30 diff --git a/core/core.services.yml b/core/core.services.yml index c60a0ddccaf9e3af76c88e8ad3252318eb39e9cb..1c5d787c572734d0610901890e64ee1c4d7eb979 100644 --- a/core/core.services.yml +++ b/core/core.services.yml @@ -56,6 +56,8 @@ parameters: maxAge: false supportsCredentials: false tempstore.expire: 604800 + queue.config: + suspendMaximumWait: 30.0 services: # Simple cache contexts, directly derived from the request context. cache_context.ip: @@ -390,7 +392,7 @@ services: arguments: [ '@service_container' ] cron: class: Drupal\Core\Cron - arguments: ['@module_handler', '@lock', '@queue', '@state', '@account_switcher', '@logger.channel.cron', '@plugin.manager.queue_worker', '@datetime.time'] + arguments: ['@module_handler', '@lock', '@queue', '@state', '@account_switcher', '@logger.channel.cron', '@plugin.manager.queue_worker', '@datetime.time', '%queue.config%'] lazy: true Drupal\Core\CronInterface: '@cron' diff.formatter: diff --git a/core/lib/Drupal/Core/Cron.php b/core/lib/Drupal/Core/Cron.php index d8e4243207084783de7dd0dc6df92b6d25e8fd7c..0dad9211313c892421077a212204fcaae4184ebd 100644 --- a/core/lib/Drupal/Core/Cron.php +++ b/core/lib/Drupal/Core/Cron.php @@ -9,6 +9,8 @@ use Drupal\Core\Lock\LockBackendInterface; use Drupal\Core\Queue\QueueFactory; use Drupal\Core\Queue\DelayableQueueInterface; +use Drupal\Core\Queue\QueueInterface; +use Drupal\Core\Queue\QueueWorkerInterface; use Drupal\Core\Queue\QueueWorkerManagerInterface; use Drupal\Core\Queue\DelayedRequeueException; use Drupal\Core\Queue\RequeueException; @@ -80,6 +82,13 @@ class Cron implements CronInterface { */ protected $time; + /** + * The queue config. + * + * @var array + */ + protected array $queueConfig; + /** * Constructs a cron object. * @@ -97,10 +106,12 @@ class Cron implements CronInterface { * A logger instance. * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $queue_manager * The queue plugin manager. - * @param \Drupal\Component\Datetime\TimeInterface $time + * @param \Drupal\Component\Datetime\TimeInterface|null $time * The time service. + * @param mixed[]|null $queue_config + * Queue configuration from the service container. */ - public function __construct(ModuleHandlerInterface $module_handler, LockBackendInterface $lock, QueueFactory $queue_factory, StateInterface $state, AccountSwitcherInterface $account_switcher, LoggerInterface $logger, QueueWorkerManagerInterface $queue_manager, TimeInterface $time = NULL) { + public function __construct(ModuleHandlerInterface $module_handler, LockBackendInterface $lock, QueueFactory $queue_factory, StateInterface $state, AccountSwitcherInterface $account_switcher, LoggerInterface $logger, QueueWorkerManagerInterface $queue_manager, TimeInterface $time = NULL, ?array $queue_config = NULL) { $this->moduleHandler = $module_handler; $this->lock = $lock; $this->queueFactory = $queue_factory; @@ -108,7 +119,18 @@ public function __construct(ModuleHandlerInterface $module_handler, LockBackendI $this->accountSwitcher = $account_switcher; $this->logger = $logger; $this->queueManager = $queue_manager; - $this->time = $time ?: \Drupal::service('datetime.time'); + if (!isset($time)) { + @trigger_error('Calling ' . __METHOD__ . '() without the $time argument is deprecated in drupal:10.1.0 and will be required in drupal:11.0.0. See https://www.drupal.org/node/3343743', E_USER_DEPRECATED); + $time = \Drupal::service('datetime.time'); + } + $this->time = $time; + if (!isset($queue_config)) { + @trigger_error('Calling ' . __METHOD__ . '() without the $queue_config argument is deprecated in drupal:10.1.0 and will be required in drupal:11.0.0. See https://www.drupal.org/node/3343743', E_USER_DEPRECATED); + $queue_config = \Drupal::getContainer()->getParameter('queue.config'); + } + $this->queueConfig = $queue_config + [ + 'suspendMaximumWait' => 30.0, + ]; } /** @@ -167,57 +189,121 @@ protected function setCronLastTime() { * Processes cron queues. */ protected function processQueues() { - // Grab the defined cron queues. - foreach ($this->queueManager->getDefinitions() as $queue_name => $info) { - if (isset($info['cron'])) { - // Make sure every queue exists. There is no harm in trying to recreate - // an existing queue. - $this->queueFactory->get($queue_name)->createQueue(); - - $queue_worker = $this->queueManager->createInstance($queue_name); - $end = $this->time->getCurrentTime() + $info['cron']['time']; - $queue = $this->queueFactory->get($queue_name); - $lease_time = $info['cron']['time']; - while ($this->time->getCurrentTime() < $end && ($item = $queue->claimItem($lease_time))) { - try { - $queue_worker->processItem($item->data); - $queue->deleteItem($item); - } - catch (DelayedRequeueException $e) { - // The worker requested the task not be immediately re-queued. - // - If the queue doesn't support ::delayItem(), we should leave the - // item's current expiry time alone. - // - If the queue does support ::delayItem(), we should allow the - // queue to update the item's expiry using the requested delay. - if ($queue instanceof DelayableQueueInterface) { - // This queue can handle a custom delay; use the duration provided - // by the exception. - $queue->delayItem($item, $e->getDelay()); - } - } - catch (RequeueException $e) { - // The worker requested the task be immediately requeued. - $queue->releaseItem($item); - } - catch (SuspendQueueException $e) { - // If the worker indicates the whole queue should be skipped, - // release the item and go to the next queue. - $queue->releaseItem($item); - - $this->logger->debug('A worker for @queue queue suspended further processing of the queue.', [ - '@queue' => $queue_name, - ]); - - // Skip to the next queue. - continue 2; - } - catch (\Exception $e) { - // In case of any other kind of exception, log it and leave the item - // in the queue to be processed again later. - watchdog_exception('cron', $e); - } + $max_wait = (float) $this->queueConfig['suspendMaximumWait']; + + $queues = array_filter( + array_values($this->queueManager->getDefinitions()), + function (array $queueInfo) { + return isset($queueInfo['cron']); + } + ); + + // Build a stack of queues to work on. + /** @var array<array{process_from: int<0, max>, queue: \Drupal\Core\Queue\QueueInterface, worker: \Drupal\Core\Queue\QueueWorkerInterface}> $queues */ + $queues = array_map(function (array $queue_info) { + $queue_name = $queue_info['id']; + $queue = $this->queueFactory->get($queue_name); + // Make sure every queue exists. There is no harm in trying to recreate + // an existing queue. + $queue->createQueue(); + $worker = $this->queueManager->createInstance($queue_name); + return [ + // Set process_from to zero so each queue is always processed + // immediately for the first time. This process_from timestamp will + // change if a queue throws a delayable SuspendQueueException. + 'process_from' => 0, + 'queue' => $queue, + 'worker' => $worker, + ]; + }, $queues); + + // Work through stack of queues, re-adding to the stack when a delay is + // necessary. + while ($item = array_shift($queues)) { + [ + 'queue' => $queue, + 'worker' => $worker, + 'process_from' => $process_from, + ] = $item; + + // Each queue will be processed immediately when it is reached for the + // first time, as zero > currentTime will never be true. + if ($process_from > $this->time->getCurrentMicroTime()) { + $this->usleep(round($process_from - $this->time->getCurrentMicroTime(), 3) * 1000000); + } + + try { + $this->processQueue($queue, $worker); + } + catch (SuspendQueueException $e) { + // Return to this queue after processing other queues if the delay is + // within the threshold. + if ($e->isDelayable() && ($e->getDelay() < $max_wait)) { + $item['process_from'] = $this->time->getCurrentMicroTime() + $e->getDelay(); + // Place this queue back in the stack for processing later. + array_push($queues, $item); } } + + // Reorder the queue by next 'process_from' timestamp. + usort($queues, function (array $queueA, array $queueB) { + return $queueA['process_from'] <=> $queueB['process_from']; + }); + } + } + + /** + * Processes a cron queue. + * + * @param \Drupal\Core\Queue\QueueInterface $queue + * The queue. + * @param \Drupal\Core\Queue\QueueWorkerInterface $worker + * The queue worker. + * + * @throws \Drupal\Core\Queue\SuspendQueueException + * If the queue was suspended. + */ + protected function processQueue(QueueInterface $queue, QueueWorkerInterface $worker) { + $lease_time = $worker->getPluginDefinition()['cron']['time']; + $end = $this->time->getCurrentTime() + $lease_time; + while ($this->time->getCurrentTime() < $end && ($item = $queue->claimItem($lease_time))) { + try { + $worker->processItem($item->data); + $queue->deleteItem($item); + } + catch (DelayedRequeueException $e) { + // The worker requested the task not be immediately re-queued. + // - If the queue doesn't support ::delayItem(), we should leave the + // item's current expiry time alone. + // - If the queue does support ::delayItem(), we should allow the + // queue to update the item's expiry using the requested delay. + if ($queue instanceof DelayableQueueInterface) { + // This queue can handle a custom delay; use the duration provided + // by the exception. + $queue->delayItem($item, $e->getDelay()); + } + } + catch (RequeueException) { + // The worker requested the task be immediately requeued. + $queue->releaseItem($item); + } + catch (SuspendQueueException $e) { + // If the worker indicates the whole queue should be skipped, release + // the item and go to the next queue. + $queue->releaseItem($item); + + $this->logger->debug('A worker for @queue queue suspended further processing of the queue.', [ + '@queue' => $worker->getPluginId(), + ]); + + // Skip to the next queue. + throw $e; + } + catch (\Exception $e) { + // In case of any other kind of exception, log it and leave the item + // in the queue to be processed again later. + watchdog_exception('cron', $e); + } } } @@ -266,4 +352,14 @@ protected function invokeCronHandlers() { } } + /** + * Delay execution in microseconds. + * + * @param int $microseconds + * Halt time in microseconds. + */ + protected function usleep(int $microseconds): void { + usleep($microseconds); + } + } diff --git a/core/lib/Drupal/Core/Queue/SuspendQueueException.php b/core/lib/Drupal/Core/Queue/SuspendQueueException.php index 66877c6fe111eb3881e8c2d2b09f0c360ecf38a0..3322c2af4e4977e50e7281df37e1c43bfdaf1f86 100644 --- a/core/lib/Drupal/Core/Queue/SuspendQueueException.php +++ b/core/lib/Drupal/Core/Queue/SuspendQueueException.php @@ -12,4 +12,54 @@ * that workers of subsequent items would encounter it too. For example, if a * remote site that the queue worker depends on appears to be inaccessible. */ -class SuspendQueueException extends \RuntimeException {} +class SuspendQueueException extends \RuntimeException { + + /** + * Seconds to wait before resuming the queue, or NULL if unknown. + * + * @var float|null + */ + protected $delay = NULL; + + /** + * Constructs a SuspendQueueException. + * + * @param string $message + * The error message. + * @param int $code + * The error code. + * @param \Throwable|null $previous + * The previous throwable used for the exception chaining. + * @param float|null $delay + * If the time for when the queue will be ready to resume processing is + * known, pass an interval in seconds. Otherwise NULL if the time to resume + * processing the queue is not known. + */ + public function __construct(string $message = '', int $code = 0, \Throwable $previous = NULL, ?float $delay = NULL) { + parent::__construct($message, $code, $previous); + $this->delay = $delay; + } + + /** + * Get the desired delay interval for this item. + * + * @return float|null + * If the time for when the queue will be ready to resume processing is + * known, pass an interval in seconds. Otherwise NULL if the time to resume + * processing the queue is not known. + */ + public function getDelay(): ?float { + return $this->delay; + } + + /** + * Determine whether the next time the queue should be checked is known. + * + * @return bool + * Whether the time to resume processing the queue is known. + */ + public function isDelayable(): bool { + return isset($this->delay); + } + +} diff --git a/core/modules/system/tests/src/Kernel/System/CronQueueTest.php b/core/modules/system/tests/src/Kernel/System/CronQueueTest.php index f177842f2114ad6fb10ef7ee9d2934eee1de33f7..3935454feca7f2af3bb87ffa79967075b06a0266 100644 --- a/core/modules/system/tests/src/Kernel/System/CronQueueTest.php +++ b/core/modules/system/tests/src/Kernel/System/CronQueueTest.php @@ -73,6 +73,7 @@ protected function setUp(): void { $time = $this->prophesize('Drupal\Component\Datetime\TimeInterface'); $time->getCurrentTime()->willReturn($this->currentTime); + $time->getCurrentMicroTime()->willReturn(100.0); $time->getRequestTime()->willReturn($this->currentTime); \Drupal::getContainer()->set('datetime.time', $time->reveal()); $this->assertEquals($this->currentTime, \Drupal::time()->getCurrentTime()); diff --git a/core/tests/Drupal/Tests/Core/Cron/CronSuspendQueueDelayTest.php b/core/tests/Drupal/Tests/Core/Cron/CronSuspendQueueDelayTest.php new file mode 100644 index 0000000000000000000000000000000000000000..f74ef7676eff0a1da45c0c45b62ca08880e2a5d9 --- /dev/null +++ b/core/tests/Drupal/Tests/Core/Cron/CronSuspendQueueDelayTest.php @@ -0,0 +1,445 @@ +<?php + +namespace Drupal\Tests\Core\Cron; + +use Drupal\Component\Datetime\TimeInterface; +use Drupal\Core\Config\ConfigFactoryInterface; +use Drupal\Core\Config\ImmutableConfig; +use Drupal\Core\Cron; +use Drupal\Core\DependencyInjection\ContainerBuilder; +use Drupal\Core\Extension\ModuleHandlerInterface; +use Drupal\Core\Lock\LockBackendInterface; +use Drupal\Core\Queue\QueueFactory; +use Drupal\Core\Queue\QueueInterface; +use Drupal\Core\Queue\QueueWorkerInterface; +use Drupal\Core\Queue\QueueWorkerManagerInterface; +use Drupal\Core\Queue\SuspendQueueException; +use Drupal\Core\Session\AccountSwitcherInterface; +use Drupal\Core\State\StateInterface; +use Drupal\Tests\UnitTestCase; +use Psr\Log\LoggerInterface; + +/** + * Test Cron handling of suspended queues with a delay. + * + * @group Cron + * @covers \Drupal\Core\Queue\SuspendQueueException + * @coversDefaultClass \Drupal\Core\Cron + */ +final class CronSuspendQueueDelayTest extends UnitTestCase { + + /** + * Constructor arguments for \Drupal\Core\Cron. + * + * @var object[]|\PHPUnit\Framework\MockObject\MockObject[] + */ + protected $cronConstructorArguments; + + /** + * A worker for testing. + * + * @var \Drupal\Core\Queue\QueueWorkerInterface|\PHPUnit\Framework\MockObject\MockObject + */ + protected $workerA; + + /** + * A worker for testing. + * + * @var \Drupal\Core\Queue\QueueWorkerInterface|\PHPUnit\Framework\MockObject\MockObject + */ + protected $workerB; + + /** + * {@inheritdoc} + */ + protected function setUp(): void { + parent::setUp(); + $lock = $this->createMock(LockBackendInterface::class); + $lock->expects($this->any()) + ->method('acquire') + ->willReturn(TRUE); + $this->cronConstructorArguments = [ + 'module_handler' => $this->createMock(ModuleHandlerInterface::class), + 'lock' => $lock, + 'queue_factory' => $this->createMock(QueueFactory::class), + 'state' => $this->createMock(StateInterface::class), + 'account_switcher' => $this->createMock(AccountSwitcherInterface::class), + 'logger' => $this->createMock(LoggerInterface::class), + 'queue_manager' => $this->createMock(QueueWorkerManagerInterface::class), + 'time' => $this->createMock(TimeInterface::class), + 'queue_config' => [], + ]; + + // Capture logs to watchdog_exception(). + $config = $this->createMock(ImmutableConfig::class); + $config->expects($this->any()) + ->method('get') + ->with('logging') + ->willReturn(0); + $configFactory = $this->createMock(ConfigFactoryInterface::class); + $configFactory->expects($this->any()) + ->method('get') + ->with('system.cron') + ->willReturn($config); + $container = new ContainerBuilder(); + $container->set('config.factory', $configFactory); + \Drupal::setContainer($container); + + $this->workerA = $this->createMock(QueueWorkerInterface::class); + $this->workerA->expects($this->any()) + ->method('getPluginDefinition') + ->willReturn([ + 'cron' => [ + 'time' => 300, + ], + ]); + + $this->workerB = $this->createMock(QueueWorkerInterface::class); + $this->workerB->expects($this->any()) + ->method('getPluginDefinition') + ->willReturn([ + 'cron' => [ + 'time' => 300, + ], + ]); + } + + /** + * Tests a queue is reprocessed again after other queues. + * + * Two queues are created: + * - test_worker_a. + * - test_worker_b. + * + * Queues and items are processed: + * - test_worker_a: + * - item throws SuspendQueueException with 2.0 delay. + * - test_worker_b: + * - item executes normally. + * - test_worker_a: + * - item throws SuspendQueueException with 3.0 delay. + * - test_worker_a: + * - no items remaining, quits. + */ + public function testSuspendQueue(): void { + [ + 'queue_factory' => $queueFactory, + 'queue_manager' => $queueManager, + 'time' => $time, + ] = $this->cronConstructorArguments; + + $cron = $this->getMockBuilder(Cron::class) + ->onlyMethods(['usleep']) + ->setConstructorArgs($this->cronConstructorArguments) + ->getMock(); + + $cron->expects($this->exactly(2)) + ->method('usleep') + ->withConsecutive( + [$this->equalTo(2000000)], + [$this->equalTo(3000000)], + ); + + $queueManager->expects($this->once()) + ->method('getDefinitions') + ->willReturn([ + 'test_worker_a' => [ + 'id' => 'test_worker_a', + 'cron' => ['time' => 300], + ], + 'test_worker_b' => [ + 'id' => 'test_worker_b', + 'cron' => ['time' => 300], + ], + ]); + + $queueA = $this->createMock(QueueInterface::class); + $queueB = $this->createMock(QueueInterface::class); + $queueFactory->expects($this->exactly(2)) + ->method('get') + ->willReturnMap([ + ['test_worker_a', FALSE, $queueA], + ['test_worker_b', FALSE, $queueB], + ]); + + // Expect this queue to be processed twice. + $queueA->expects($this->exactly(3)) + ->method('claimItem') + ->willReturnOnConsecutiveCalls( + // First run will suspend for 2 seconds. + (object) ['data' => 'test_data_a1'], + // Second run will suspend for 3 seconds. + (object) ['data' => 'test_data_a2'], + // This will terminate the queue normally. + FALSE, + ); + // Expect this queue to be processed once. + $queueB->expects($this->exactly(2)) + ->method('claimItem') + ->willReturnOnConsecutiveCalls( + (object) ['data' => 'test_data_b1'], + // This will terminate the queue normally. + FALSE, + ); + + $queueManager->expects($this->any()) + ->method('createInstance') + ->willReturnMap([ + ['test_worker_a', [], $this->workerA], + ['test_worker_b', [], $this->workerB], + ]); + + $this->workerA->expects($this->exactly(2)) + ->method('processItem') + ->with($this->anything()) + ->willReturnOnConsecutiveCalls( + $this->throwException(new SuspendQueueException('', 0, NULL, 2.0)), + $this->throwException(new SuspendQueueException('', 0, NULL, 3.0)) + ); + $this->workerB->expects($this->once()) + ->method('processItem') + ->with('test_data_b1'); + + $time->expects($this->any()) + ->method('getCurrentTime') + ->willReturn(60); + + $cron->run(); + } + + /** + * Tests queues may be re-processed by whether delay exceeds threshold. + * + * Cron will pause and reprocess a queue after a delay if a worker throws + * a SuspendQueueException with a delay time not exceeding the maximum wait + * config. + * + * @param float $threshold + * The configured threshold. + * @param float $suspendQueueDelay + * An interval in seconds a worker will suspend the queue. + * @param bool $expectQueueDelay + * Whether to expect cron to sleep and re-process the queue. + * + * @dataProvider providerSuspendQueueThreshold + */ + public function testSuspendQueueThreshold(float $threshold, float $suspendQueueDelay, bool $expectQueueDelay): void { + $this->cronConstructorArguments['queue_config'] = [ + 'suspendMaximumWait' => $threshold, + ]; + [ + 'queue_factory' => $queueFactory, + 'queue_manager' => $queueManager, + ] = $this->cronConstructorArguments; + + $cron = $this->getMockBuilder(Cron::class) + ->onlyMethods(['usleep']) + ->setConstructorArgs($this->cronConstructorArguments) + ->getMock(); + + $cron->expects($expectQueueDelay ? $this->once() : $this->never()) + ->method('usleep'); + + $queueManager->expects($this->once()) + ->method('getDefinitions') + ->willReturn([ + 'test_worker' => [ + 'id' => 'test_worker', + 'cron' => 300, + ], + ]); + + $queue = $this->createMock(QueueInterface::class); + $queueFactory->expects($this->once()) + ->method('get') + ->willReturnMap([ + ['test_worker', FALSE, $queue], + ]); + $queue->expects($this->exactly($expectQueueDelay ? 2 : 1)) + ->method('claimItem') + ->willReturnOnConsecutiveCalls( + (object) ['data' => 'test_data'], + FALSE, + ); + + $queueManager->expects($this->exactly(1)) + ->method('createInstance') + ->with('test_worker') + ->willReturn($this->workerA); + + $this->workerA->expects($this->once()) + ->method('processItem') + ->with($this->anything()) + ->willReturnOnConsecutiveCalls( + $this->throwException(new SuspendQueueException('', 0, NULL, $suspendQueueDelay)), + ); + + $cron->run(); + } + + /** + * Data for testing. + * + * @return array + * Scenarios for testing. + */ + public function providerSuspendQueueThreshold(): array { + $scenarios = []; + $scenarios['cron will wait for the queue, and rerun'] = [ + 15.0, + 10.0, + TRUE, + ]; + $scenarios['cron will not wait for the queue, and exit'] = [ + 15.0, + 20.0, + FALSE, + ]; + return $scenarios; + } + + /** + * Tests queues are executed in order. + * + * If multiple queues are delayed, they must execute in order of time. + */ + public function testSuspendQueueOrder(): void { + [ + 'queue_factory' => $queueFactory, + 'queue_manager' => $queueManager, + 'time' => $time, + ] = $this->cronConstructorArguments; + + $cron = $this->getMockBuilder(Cron::class) + ->onlyMethods(['usleep']) + ->setConstructorArgs($this->cronConstructorArguments) + ->getMock(); + + $cron->expects($this->any()) + ->method('usleep'); + + $queueManager->expects($this->once()) + ->method('getDefinitions') + ->willReturn([ + 'test_worker_a' => [ + 'id' => 'test_worker_a', + 'cron' => ['time' => 300], + ], + 'test_worker_b' => [ + 'id' => 'test_worker_b', + 'cron' => ['time' => 300], + ], + 'test_worker_c' => [ + 'id' => 'test_worker_c', + 'cron' => ['time' => 300], + ], + 'test_worker_d' => [ + 'id' => 'test_worker_d', + 'cron' => ['time' => 300], + ], + ]); + + $queueA = $this->createMock(QueueInterface::class); + $queueB = $this->createMock(QueueInterface::class); + $queueC = $this->createMock(QueueInterface::class); + $queueD = $this->createMock(QueueInterface::class); + $queueFactory->expects($this->exactly(4)) + ->method('get') + ->willReturnMap([ + ['test_worker_a', FALSE, $queueA], + ['test_worker_b', FALSE, $queueB], + ['test_worker_c', FALSE, $queueC], + ['test_worker_d', FALSE, $queueD], + ]); + + $queueA->expects($this->any()) + ->method('claimItem') + ->willReturnOnConsecutiveCalls( + (object) ['data' => 'test_data_from_queue_a'], + FALSE, + ); + $queueB->expects($this->any()) + ->method('claimItem') + ->willReturnOnConsecutiveCalls( + (object) ['data' => 'test_data_from_queue_b'], + (object) ['data' => 'test_data_from_queue_b'], + FALSE, + ); + $queueC->expects($this->any()) + ->method('claimItem') + ->willReturnOnConsecutiveCalls( + (object) ['data' => 'test_data_from_queue_c'], + (object) ['data' => 'test_data_from_queue_c'], + FALSE, + ); + $queueD->expects($this->any()) + ->method('claimItem') + ->willReturnOnConsecutiveCalls( + (object) ['data' => 'test_data_from_queue_d'], + FALSE, + ); + + // Recycle the same worker for all queues to test order sanely: + $queueManager->expects($this->any()) + ->method('createInstance') + ->willReturnMap([ + ['test_worker_a', [], $this->workerA], + ['test_worker_b', [], $this->workerA], + ['test_worker_c', [], $this->workerA], + ['test_worker_d', [], $this->workerA], + ]); + + $this->workerA->expects($this->exactly(6)) + ->method('processItem') + ->withConsecutive( + // All queues are executed in sequence of definition: + [$this->equalTo('test_data_from_queue_a')], + [$this->equalTo('test_data_from_queue_b')], + [$this->equalTo('test_data_from_queue_c')], + [$this->equalTo('test_data_from_queue_d')], + // Queue C is executed again, and before queue B. + [$this->equalTo('test_data_from_queue_c')], + // Queue B is executed again, after queue C since its delay was longer. + [$this->equalTo('test_data_from_queue_b')], + ) + ->willReturnOnConsecutiveCalls( + NULL, + $this->throwException(new SuspendQueueException('', 0, NULL, 16.0)), + $this->throwException(new SuspendQueueException('', 0, NULL, 8.0)), + NULL, + NULL, + NULL, + ); + + $currentTime = 60; + $time->expects($this->any()) + ->method('getCurrentTime') + ->willReturnCallback(function () use (&$currentTime): int { + return (int) $currentTime; + }); + $time->expects($this->any()) + ->method('getCurrentMicroTime') + ->willReturnCallback(function () use (&$currentTime): float { + return (float) $currentTime; + }); + + $cron->expects($this->exactly(2)) + ->method('usleep') + ->withConsecutive( + // Expect to wait for 8 seconds. + [ + $this->callback(function (int $microseconds) use (&$currentTime) { + // Accelerate time by 4 seconds. + $currentTime += 4; + return $microseconds === 8000000; + }), + ], + // SuspendQueueException requests to delay by 16 seconds, but 4 seconds + // have passed above, so there are just 12 seconds remaining: + [$this->equalTo(12000000)], + ); + + $cron->run(); + } + +} diff --git a/core/tests/Drupal/Tests/Core/CronTest.php b/core/tests/Drupal/Tests/Core/CronTest.php index 52c2d09a78d6073fac90ea7719e706968e8f4083..bc968777a539de47ba55a961c06265879063ce94 100644 --- a/core/tests/Drupal/Tests/Core/CronTest.php +++ b/core/tests/Drupal/Tests/Core/CronTest.php @@ -102,6 +102,12 @@ protected function setUp(): void { $queue_worker_manager = $this->prophesize('Drupal\Core\Queue\QueueWorkerManagerInterface'); $state = $this->prophesize('Drupal\Core\State\StateInterface'); $account_switcher = $this->prophesize('Drupal\Core\Session\AccountSwitcherInterface'); + $queueConfig = [ + 'suspendMaximumWait' => 30.0, + ]; + + // Create a lock that will always fail when attempting to acquire; we're + // only interested in testing ::processQueues(), not the other stuff. $lock_backend = $this->prophesize('Drupal\Core\Lock\LockBackendInterface'); $lock_backend->acquire('cron', Argument::cetera())->willReturn(TRUE); $lock_backend->release('cron')->shouldBeCalled(); @@ -121,6 +127,8 @@ protected function setUp(): void { // Create a mock queue worker plugin instance based on above definition. $queue_worker_plugin = $this->prophesize('Drupal\Core\Queue\QueueWorkerInterface'); + $queue_worker_plugin->getPluginId()->willReturn($queue_worker); + $queue_worker_plugin->getPluginDefinition()->willReturn($queue_worker_definition); $queue_worker_plugin->processItem('Complete')->willReturn(); $queue_worker_plugin->processItem('Exception')->willThrow(\Exception::class); $queue_worker_plugin->processItem('DelayedRequeueException')->willThrow(DelayedRequeueException::class); @@ -147,7 +155,7 @@ protected function setUp(): void { $queue_worker_manager->createInstance($queue_worker)->willReturn($queue_worker_plugin->reveal()); // Construct the Cron class to test. - $this->cron = new Cron($module_handler->reveal(), $lock_backend->reveal(), $queue_factory->reveal(), $state->reveal(), $account_switcher->reveal(), $logger->reveal(), $queue_worker_manager->reveal(), $time->reveal()); + $this->cron = new Cron($module_handler->reveal(), $lock_backend->reveal(), $queue_factory->reveal(), $state->reveal(), $account_switcher->reveal(), $logger->reveal(), $queue_worker_manager->reveal(), $time->reveal(), $queueConfig); } /** diff --git a/sites/default/default.services.yml b/sites/default/default.services.yml index b4d27e05a56f101ca0204d2553e0e5c7f643260f..eb530088efafbdd97b26debce9cffbe11d6c1583 100644 --- a/sites/default/default.services.yml +++ b/sites/default/default.services.yml @@ -213,3 +213,9 @@ parameters: maxAge: false # Sets the Access-Control-Allow-Credentials header. supportsCredentials: false + + queue.config: + # The maximum number of seconds to wait if a queue is temporarily suspended. + # This is not applicable when a queue is suspended but does not specify + # how long to wait before attempting to resume. + suspendMaximumWait: 30