From 0fc50fa51dc91bef7c98292a4e8eb8b1091e6bb1 Mon Sep 17 00:00:00 2001 From: Alex Pott <alex.a.pott@googlemail.com> Date: Tue, 3 May 2022 13:01:23 +0100 Subject: [PATCH] Issue #3230541 by cliddell, jday, yogeshmpawar, neclimdul, cmlara, Charlie ChX Negyesi: Queue items only reserved by cron for 1 second --- .../Drupal/Core/Annotation/QueueWorker.php | 4 ++- core/lib/Drupal/Core/Cron.php | 6 ++-- core/lib/Drupal/Core/CronInterface.php | 10 +++++++ core/lib/Drupal/Core/Queue/DatabaseQueue.php | 4 +-- .../Drupal/Core/Queue/QueueWorkerManager.php | 14 --------- .../CronQueueTestDatabaseDelayException.php | 5 +++- .../QueueWorker/CronQueueTestLeaseTime.php | 30 +++++++++++++++++++ .../CronQueueTestMemoryDelayException.php | 5 +++- .../tests/src/Kernel/System/CronQueueTest.php | 30 +++++++++++++++++-- 9 files changed, 83 insertions(+), 25 deletions(-) create mode 100644 core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestLeaseTime.php diff --git a/core/lib/Drupal/Core/Annotation/QueueWorker.php b/core/lib/Drupal/Core/Annotation/QueueWorker.php index 354642a72c60..a8375e6d3bb8 100644 --- a/core/lib/Drupal/Core/Annotation/QueueWorker.php +++ b/core/lib/Drupal/Core/Annotation/QueueWorker.php @@ -52,13 +52,15 @@ class QueueWorker extends Plugin { public $title; /** - * An associative array containing an optional key. + * An associative array containing optional keys. * * This property is optional and it does not need to be declared. * * Available keys: * - time (optional): How much time Drupal cron should spend on calling this * worker in seconds. Defaults to 15. + * - lease_time: (optional) How long the lease is for a queue item when + * called from cron in seconds. Defaults to 30. * * @var array */ diff --git a/core/lib/Drupal/Core/Cron.php b/core/lib/Drupal/Core/Cron.php index 5464f7238d0e..b8d8b8e1a4eb 100644 --- a/core/lib/Drupal/Core/Cron.php +++ b/core/lib/Drupal/Core/Cron.php @@ -174,10 +174,10 @@ protected function processQueues() { $this->queueFactory->get($queue_name)->createQueue(); $queue_worker = $this->queueManager->createInstance($queue_name); - $end = time() + ($info['cron']['time'] ?? 15); + $end = $this->time->getCurrentTime() + $info['cron']['time'] ?? static::DEFAULT_QUEUE_CRON_TIME; $queue = $this->queueFactory->get($queue_name); - $lease_time = isset($info['cron']['time']) ?: NULL; - while (time() < $end && ($item = $queue->claimItem($lease_time))) { + $lease_time = $info['cron']['lease_time'] ?? static::DEFAULT_QUEUE_CRON_LEASE_TIME; + while ($this->time->getCurrentTime() < $end && ($item = $queue->claimItem($lease_time))) { try { $queue_worker->processItem($item->data); $queue->deleteItem($item); diff --git a/core/lib/Drupal/Core/CronInterface.php b/core/lib/Drupal/Core/CronInterface.php index 98c3f37c6b71..22a93f91ab50 100644 --- a/core/lib/Drupal/Core/CronInterface.php +++ b/core/lib/Drupal/Core/CronInterface.php @@ -9,6 +9,16 @@ */ interface CronInterface { + /** + * The default time cron should execute each queue in seconds. + */ + public const DEFAULT_QUEUE_CRON_TIME = 15; + + /** + * The default lease time a queue item should get when called from cron. + */ + public const DEFAULT_QUEUE_CRON_LEASE_TIME = 30; + /** * Executes a cron run. * diff --git a/core/lib/Drupal/Core/Queue/DatabaseQueue.php b/core/lib/Drupal/Core/Queue/DatabaseQueue.php index 32a87feb323e..2af4360dcab2 100644 --- a/core/lib/Drupal/Core/Queue/DatabaseQueue.php +++ b/core/lib/Drupal/Core/Queue/DatabaseQueue.php @@ -241,7 +241,7 @@ public function garbageCollection() { try { // Clean up the queue for failed batches. $this->connection->delete(static::TABLE_NAME) - ->condition('created', REQUEST_TIME - 864000, '<') + ->condition('created', \Drupal::time()->getRequestTime() - 864000, '<') ->condition('name', 'drupal_batch:%', 'LIKE') ->execute(); @@ -252,7 +252,7 @@ public function garbageCollection() { 'expire' => 0, ]) ->condition('expire', 0, '<>') - ->condition('expire', REQUEST_TIME, '<') + ->condition('expire', \Drupal::time()->getRequestTime(), '<') ->execute(); } catch (\Exception $e) { diff --git a/core/lib/Drupal/Core/Queue/QueueWorkerManager.php b/core/lib/Drupal/Core/Queue/QueueWorkerManager.php index b1aa40f51c97..c54135709fa0 100644 --- a/core/lib/Drupal/Core/Queue/QueueWorkerManager.php +++ b/core/lib/Drupal/Core/Queue/QueueWorkerManager.php @@ -34,20 +34,6 @@ public function __construct(\Traversable $namespaces, CacheBackendInterface $cac $this->alterInfo('queue_info'); } - /** - * {@inheritdoc} - */ - public function processDefinition(&$definition, $plugin_id) { - parent::processDefinition($definition, $plugin_id); - - // Assign a default time if a cron is specified. - if (isset($definition['cron'])) { - $definition['cron'] += [ - 'time' => 15, - ]; - } - } - /** * {@inheritdoc} * diff --git a/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestDatabaseDelayException.php b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestDatabaseDelayException.php index 93e4a1e2cb4b..b9f3dbfe25a1 100644 --- a/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestDatabaseDelayException.php +++ b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestDatabaseDelayException.php @@ -11,7 +11,10 @@ * @QueueWorker( * id = "cron_queue_test_database_delay_exception", * title = @Translation("Database delay exception test"), - * cron = {"time" = 1} + * cron = { + * "time" = 1, + * "lease_time" = 2 + * } * ) */ class CronQueueTestDatabaseDelayException extends QueueWorkerBase { diff --git a/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestLeaseTime.php b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestLeaseTime.php new file mode 100644 index 000000000000..eab32490f2fd --- /dev/null +++ b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestLeaseTime.php @@ -0,0 +1,30 @@ +<?php + +namespace Drupal\cron_queue_test\Plugin\QueueWorker; + +use Drupal\Core\Queue\QueueWorkerBase; + +/** + * @QueueWorker( + * id = "cron_queue_test_lease_time", + * title = @Translation("Lease time test"), + * cron = { + * "time" = 5, + * "lease_time" = 2 + * } + * ) + */ +class CronQueueTestLeaseTime extends QueueWorkerBase { + + /** + * {@inheritdoc} + */ + public function processItem($data) { + $state = \Drupal::state(); + $count = $state->get('cron_queue_test_lease_time', 0); + $count++; + $state->set('cron_queue_test_lease_time', $count); + throw new \Exception('Leave me queued and leased!'); + } + +} diff --git a/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestMemoryDelayException.php b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestMemoryDelayException.php index 6548868608d8..08f2c92ccbde 100644 --- a/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestMemoryDelayException.php +++ b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestMemoryDelayException.php @@ -11,7 +11,10 @@ * @QueueWorker( * id = "cron_queue_test_memory_delay_exception", * title = @Translation("Memory delay exception test"), - * cron = {"time" = 1} + * cron = { + * "time" = 1, + * "lease_time" = 2 + * } * ) */ class CronQueueTestMemoryDelayException extends QueueWorkerBase { diff --git a/core/modules/system/tests/src/Kernel/System/CronQueueTest.php b/core/modules/system/tests/src/Kernel/System/CronQueueTest.php index 7eef7520602d..940e1a529283 100644 --- a/core/modules/system/tests/src/Kernel/System/CronQueueTest.php +++ b/core/modules/system/tests/src/Kernel/System/CronQueueTest.php @@ -89,8 +89,8 @@ public function testDelayException() { // Get the queue worker plugin manager. $manager = $this->container->get('plugin.manager.queue_worker'); $definitions = $manager->getDefinitions(); - $this->assertNotEmpty($database_lease_time = $definitions['cron_queue_test_database_delay_exception']['cron']['time']); - $this->assertNotEmpty($memory_lease_time = $definitions['cron_queue_test_memory_delay_exception']['cron']['time']); + $this->assertNotEmpty($database_lease_time = $definitions['cron_queue_test_database_delay_exception']['cron']['lease_time']); + $this->assertNotEmpty($memory_lease_time = $definitions['cron_queue_test_memory_delay_exception']['cron']['lease_time']); // Create the necessary test data and run cron. $database->createItem('test'); @@ -120,6 +120,30 @@ public function testDelayException() { $this->assertEquals($this->currentTime + $memory_lease_time, reset($memory_queue_internal)->expire); } + /** + * Tests that leases are expiring correctly, also within the same request. + */ + public function testLeaseTime() { + $queue = $this->container->get('queue')->get('cron_queue_test_lease_time'); + $queue->createItem([$this->randomMachineName() => $this->randomMachineName()]); + $this->cron->run(); + static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time')); + $this->cron->run(); + static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time')); + + // Set the expiration time to 3 seconds ago, so the lease should + // automatically expire. + \Drupal::database() + ->update(DatabaseQueue::TABLE_NAME) + ->fields(['expire' => $this->currentTime - 3]) + ->execute(); + + $this->cron->run(); + static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time')); + $this->cron->run(); + static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time')); + } + /** * Tests that exceptions thrown by workers are handled properly. */ @@ -145,7 +169,7 @@ public function testExceptions() { // @see \Drupal\Core\Cron::processQueues() $this->connection->update('queue') ->condition('name', 'cron_queue_test_exception') - ->fields(['expire' => REQUEST_TIME - 1]) + ->fields(['expire' => \Drupal::time()->getRequestTime() - 1]) ->execute(); $this->cron->run(); $this->assertEquals(2, \Drupal::state()->get('cron_queue_test_exception')); -- GitLab