Commit 0f10d21c authored by catch's avatar catch
Browse files

Issue #3116478 by clayfreeman, jungle, mrinalini9, jonathanshaw, neclimdul,...

Issue #3116478 by clayfreeman, jungle, mrinalini9, jonathanshaw, neclimdul, Charlie ChX Negyesi, longwave, alexpott, catch, andypost: Add a way to silently keep an item locked when processing a queue via cron
parent 856ab5fa
......@@ -8,7 +8,9 @@
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Lock\LockBackendInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\DelayableQueueInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\Session\AccountSwitcherInterface;
......@@ -180,6 +182,18 @@ protected function processQueues() {
$queue_worker->processItem($item->data);
$queue->deleteItem($item);
}
catch (DelayedRequeueException $e) {
// The worker requested the task not be immediately re-queued.
// - If the queue doesn't support ::delayItem(), we should leave the
// item's current expiry time alone.
// - If the queue does support ::delayItem(), we should allow the
// queue to update the item's expiry using the requested delay.
if ($queue instanceof DelayableQueueInterface) {
// This queue can handle a custom delay; use the duration provided
// by the exception.
$queue->delayItem($item, $e->getDelay());
}
}
catch (RequeueException $e) {
// The worker requested the task be immediately requeued.
$queue->releaseItem($item);
......
......@@ -11,7 +11,7 @@
*
* @ingroup queue
*/
class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInterface {
class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInterface, DelayableQueueInterface {
use DependencySerializationTrait;
......@@ -89,7 +89,7 @@ protected function doCreateItem($data) {
'data' => serialize($data),
// We cannot rely on REQUEST_TIME because many items might be created
// by a single request which takes longer than 1 second.
'created' => time(),
'created' => \Drupal::time()->getCurrentTime(),
]);
// Return the new serial ID, or FALSE on failure.
return $query->execute();
......@@ -140,7 +140,7 @@ public function claimItem($lease_time = 30) {
// should really expire.
$update = $this->connection->update(static::TABLE_NAME)
->fields([
'expire' => time() + $lease_time,
'expire' => \Drupal::time()->getCurrentTime() + $lease_time,
])
->condition('item_id', $item->item_id)
->condition('expire', 0);
......@@ -171,6 +171,33 @@ public function releaseItem($item) {
}
}
/**
* {@inheritdoc}
*/
public function delayItem($item, int $delay) {
// Only allow a positive delay interval.
if ($delay < 0) {
throw new \InvalidArgumentException('$delay must be non-negative');
}
try {
// Add the delay relative to the current time.
$expire = \Drupal::time()->getCurrentTime() + $delay;
// Update the expiry time of this item.
$update = $this->connection->update(static::TABLE_NAME)
->fields([
'expire' => $expire,
])
->condition('item_id', $item->item_id);
return $update->execute();
}
catch (\Exception $e) {
$this->catchException($e);
// If the table doesn't exist we should consider the item nonexistent.
return TRUE;
}
}
/**
* {@inheritdoc}
*/
......
<?php
namespace Drupal\Core\Queue;
/**
* Delayable queue interface.
*
* Classes implementing this interface allow an item to be released on a delay.
*
* @ingroup queue
*/
interface DelayableQueueInterface extends QueueInterface {
/**
* Delay an item so it runs in the future.
*
* @param object $item
* The item returned by \Drupal\Core\Queue\QueueInterface::claimItem().
* @param int $delay
* A delay before the item's lock should expire (in seconds). Relative to
* the current time, not the item's current expiry.
*
* @throws \InvalidArgumentException
* When a negative $delay is provided; $delay must be non-negative.
*
* @see \Drupal\Core\Queue\QueueInterface::releaseItem()
* To immediately release an item without delay.
*
* @return bool
* TRUE if the item has been updated, FALSE otherwise.
*/
public function delayItem($item, int $delay);
}
<?php
namespace Drupal\Core\Queue;
/**
* Throw this exception to leave an item in the queue until its lock expires.
*
* @see \Drupal\Core\Cron::processQueues()
* For more information about how this exception interacts with Drupal's queue
* processing via the built-in cron service.
* @see \Drupal\Core\Queue\DelayableQueueInterface
* Queues must implement this interface to support custom delay intervals; if
* this interface is missing, any custom delay interval specified for this
* exception will be ignored and the remaining time in the original lease will
* be used as the duration of the delay interval.
* @see \Drupal\Core\Queue\RequeueException
* For use when an item needs to be requeued immediately.
*/
class DelayedRequeueException extends \RuntimeException {
/**
* The interval of time that the item should remain locked (in seconds).
*
* @var int
*/
protected $delay = 0;
/**
* Constructs a DelayedRequeueException.
*
* @param int $delay
* The desired delay interval for this item.
*/
public function __construct(int $delay = 0) {
if ($delay > 0) {
$this->delay = $delay;
}
}
/**
* Get the desired delay interval for this item.
*
* @see self::$delay
* For recommended value usage in a queue processor.
*
* @return int
* The desired delay interval for this item.
*/
public function getDelay(): int {
return $this->delay;
}
}
......@@ -12,6 +12,7 @@
* @ingroup queue
*/
class Memory implements QueueInterface {
/**
* The queue data.
*
......@@ -44,7 +45,7 @@ public function createItem($data) {
$item = new \stdClass();
$item->item_id = $this->idSequence++;
$item->data = $data;
$item->created = time();
$item->created = \Drupal::time()->getCurrentTime();
$item->expire = 0;
$this->queue[$item->item_id] = $item;
return $item->item_id;
......@@ -63,7 +64,7 @@ public function numberOfItems() {
public function claimItem($lease_time = 30) {
foreach ($this->queue as $key => $item) {
if ($item->expire == 0) {
$item->expire = time() + $lease_time;
$item->expire = \Drupal::time()->getCurrentTime() + $lease_time;
$this->queue[$key] = $item;
return $item;
}
......
......@@ -410,6 +410,7 @@ deduplicates
defalt
defaultable
defgroup
delayable
deletable
deletedline
deletee
......
<?php
namespace Drupal\cron_queue_test\Plugin\QueueWorker;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\QueueWorkerBase;
/**
* A queue worker for testing cron exception handling.
*
* @QueueWorker(
* id = "cron_queue_test_database_delay_exception",
* title = @Translation("Database delay exception test"),
* cron = {"time" = 1}
* )
*/
class CronQueueTestDatabaseDelayException extends QueueWorkerBase {
const DELAY_INTERVAL = 100;
/**
* {@inheritdoc}
*/
public function processItem($data) {
throw new DelayedRequeueException(self::DELAY_INTERVAL);
}
}
<?php
namespace Drupal\cron_queue_test\Plugin\QueueWorker;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\QueueWorkerBase;
/**
* A queue worker for testing cron exception handling.
*
* @QueueWorker(
* id = "cron_queue_test_memory_delay_exception",
* title = @Translation("Memory delay exception test"),
* cron = {"time" = 1}
* )
*/
class CronQueueTestMemoryDelayException extends QueueWorkerBase {
/**
* {@inheritdoc}
*/
public function processItem($data) {
// Set the delay to something larger than the original lease.
$cron_time = $this->pluginDefinition['cron']['time'];
throw new DelayedRequeueException($cron_time + 100);
}
}
......@@ -3,7 +3,11 @@
namespace Drupal\Tests\system\Kernel\System;
use Drupal\Core\Database\Database;
use Drupal\Core\Queue\DatabaseQueue;
use Drupal\Core\Queue\Memory;
use Drupal\KernelTests\KernelTestBase;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestDatabaseDelayException;
use Prophecy\Argument;
/**
* Tests the Cron Queue runner.
......@@ -33,6 +37,15 @@ class CronQueueTest extends KernelTestBase {
*/
protected $cron;
/**
* The fake current time used for queue worker / cron testing purposes.
*
* This value should be greater than or equal to zero.
*
* @var int
*/
protected $currentTime = 1000;
/**
* {@inheritdoc}
*/
......@@ -41,6 +54,70 @@ protected function setUp(): void {
$this->connection = Database::getConnection();
$this->cron = \Drupal::service('cron');
$time = $this->prophesize('Drupal\Component\Datetime\TimeInterface');
$time->getCurrentTime()->willReturn($this->currentTime);
$time->getRequestTime()->willReturn($this->currentTime);
\Drupal::getContainer()->set('datetime.time', $time->reveal());
$this->assertEquals($this->currentTime, \Drupal::time()->getCurrentTime());
$this->assertEquals($this->currentTime, \Drupal::time()->getRequestTime());
$realQueueFactory = $this->container->get('queue');
$queue_factory = $this->prophesize(get_class($realQueueFactory));
$database = new DatabaseQueue('cron_queue_test_database_delay_exception', $this->connection);
$memory = new Memory('cron_queue_test_memory_delay_exception');
$queue_factory->get('cron_queue_test_database_delay_exception', Argument::cetera())->willReturn($database);
$queue_factory->get('cron_queue_test_memory_delay_exception', Argument::cetera())->willReturn($memory);
$queue_factory->get(Argument::any(), Argument::cetera())->will(function ($args) use ($realQueueFactory) {
return $realQueueFactory->get($args[0], $args[1] ?? FALSE);
});
$this->container->set('queue', $queue_factory->reveal());
}
/**
* Tests that DelayedRequeueException behaves as expected when running cron.
*/
public function testDelayException() {
$database = $this->container->get('queue')->get('cron_queue_test_database_delay_exception');
$memory = $this->container->get('queue')->get('cron_queue_test_memory_delay_exception');
// Ensure that the queues are of the correct type for this test.
$this->assertInstanceOf('Drupal\Core\Queue\DelayableQueueInterface', $database);
$this->assertNotInstanceOf('Drupal\Core\Queue\DelayableQueueInterface', $memory);
// Get the queue worker plugin manager.
$manager = $this->container->get('plugin.manager.queue_worker');
$definitions = $manager->getDefinitions();
$this->assertNotEmpty($database_lease_time = $definitions['cron_queue_test_database_delay_exception']['cron']['time']);
$this->assertNotEmpty($memory_lease_time = $definitions['cron_queue_test_memory_delay_exception']['cron']['time']);
// Create the necessary test data and run cron.
$database->createItem('test');
$memory->createItem('test');
$this->cron->run();
// Fetch the expiry time for the database queue.
$query = $this->connection->select('queue');
$query->condition('name', 'cron_queue_test_database_delay_exception');
$query->addField('queue', 'expire');
$query->range(0, 1);
$expire = $query->execute()->fetchField();
// Assert that the delay interval is greater than the lease interval. This
// allows us to assume that (if updated) the new expiry time will be greater
// than the initial expiry time. We can then also assume that the new expiry
// time offset will be identical to the delay interval.
$this->assertGreaterThan($database_lease_time, CronQueueTestDatabaseDelayException::DELAY_INTERVAL);
$this->assertGreaterThan($this->currentTime + $database_lease_time, $expire);
$this->assertEquals(CronQueueTestDatabaseDelayException::DELAY_INTERVAL, $expire - $this->currentTime);
// Ensure that the memory queue expiry time is unchanged after the
// DelayedRequeueException has been thrown.
$property = (new \ReflectionClass($memory))->getProperty('queue');
$property->setAccessible(TRUE);
$memory_queue_internal = $property->getValue($memory);
$this->assertEquals($this->currentTime + $memory_lease_time, reset($memory_queue_internal)->expire);
}
/**
......
<?php
namespace Drupal\Tests\Core;
use Drupal\Core\Cron;
use Drupal\Core\KeyValueStore\KeyValueMemoryFactory;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\Memory;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\State\State;
use Drupal\Tests\UnitTestCase;
use Prophecy\Argument;
use Prophecy\Argument\ArgumentsWildcard;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
/**
* Tests the Cron class.
*
* @group Cron
* @coversDefaultClass \Drupal\Core\Cron
*/
class CronTest extends UnitTestCase {
const REQUEUE_COUNT = 3;
/**
* Define the duration of each item claim for this test.
*
* @var int
*/
protected $claimTime = 300;
/**
* An instance of the Cron class for testing.
*
* @var \Drupal\Core\Cron
*/
protected $cron;
/**
* The queue used to store test work items.
*
* @var \Drupal\Core\Queue\QueueInterface
*/
protected $queue;
/**
* The current state of the test in memory.
*
* @var \Drupal\Core\State\State
*/
protected $state;
/**
* {@inheritdoc}
*/
protected function setUp(): void {
parent::setUp();
// @todo Remove in https://www.drupal.org/project/drupal/issues/2932518
//
// This line is currently needed so that watchdog_exception() is available
// when unit testing Drupal\Core\Cron and can safely be removed once that
// class no longer refers to it.
require_once $this->root . '/core/includes/bootstrap.inc';
// Construct a state object used for testing logger assertions.
$this->state = new State(new KeyValueMemoryFactory());
// Create a mock logger to set a flag in the resulting state.
$logger = $this->prophesize('Drupal\Core\Logger\LoggerChannelInterface');
// Safely ignore the cron re-run message when failing to acquire a lock.
//
// We don't need to run regular cron tasks, and we're still implicitly
// testing that queues are being processed.
//
// This argument will need to be updated to match the message text in
// Drupal\Core\Cron::run() should the original text ever be updated.
$logger->warning(Argument::exact('Attempting to re-run cron while it is already running.'))->shouldBeCalled();
// Set a flag to track when a message is logged by adding a callback
// function for each logging method.
foreach (get_class_methods(LoggerInterface::class) as $logger_method) {
$logger->{$logger_method}(Argument::cetera())->will(function () {
\Drupal::state()->set('cron_test.message_logged', TRUE);
});
}
// Create a logger factory to produce the resulting logger.
$logger_factory = $this->prophesize('Drupal\Core\Logger\LoggerChannelFactoryInterface');
$logger_factory->get(Argument::exact('cron'))->willReturn($logger->reveal());
// Create a mock time service.
$time = $this->prophesize('Drupal\Component\Datetime\TimeInterface');
// Build the container using the resulting mock objects.
\Drupal::setContainer(new ContainerBuilder());
\Drupal::getContainer()->set('logger.factory', $logger_factory->reveal());
\Drupal::getContainer()->set('datetime.time', $time->reveal());
\Drupal::getContainer()->set('state', $this->state);
// Create mock objects for constructing the Cron class.
$module_handler = $this->prophesize('Drupal\Core\Extension\ModuleHandlerInterface');
$queue_factory = $this->prophesize('Drupal\Core\Queue\QueueFactory');
$queue_worker_manager = $this->prophesize('Drupal\Core\Queue\QueueWorkerManagerInterface');
$state = $this->prophesize('Drupal\Core\State\StateInterface');
$account_switcher = $this->prophesize('Drupal\Core\Session\AccountSwitcherInterface');
// Create a lock that will always fail when attempting to acquire; we're
// only interested in testing ::processQueues(), not the other stuff.
$lock_backend = $this->prophesize('Drupal\Core\Lock\LockBackendInterface');
$lock_backend->acquire(Argument::exact('cron'), Argument::cetera())->willReturn(FALSE);
// Create a queue worker definition for testing purposes.
$queue_worker = $this->randomMachineName();
$queue_worker_definition = [
'id' => $queue_worker,
'cron' => [
'time' => &$this->claimTime,
],
];
// Create a queue instance for this queue worker.
$this->queue = new Memory($queue_worker);
$queue_factory->get($queue_worker)->willReturn($this->queue);
// Create a mock queue worker plugin instance based on above definition.
$queue_worker_plugin = $this->prophesize('Drupal\Core\Queue\QueueWorkerInterface');
$queue_worker_plugin->processItem('Complete')->willReturn();
$queue_worker_plugin->processItem('Exception')->willThrow(\Exception::class);
$queue_worker_plugin->processItem('DelayedRequeueException')->willThrow(DelayedRequeueException::class);
$queue_worker_plugin->processItem('SuspendQueueException')->willThrow(SuspendQueueException::class);
// 'RequeueException' would normally result in an infinite loop.
//
// This is avoided by throwing RequeueException for the first few calls to
// ::processItem() and then returning void. ::testRequeueException()
// establishes sanity assertions for this case.
$queue_worker_plugin->processItem('RequeueException')->will(function ($args, $mock, $method) {
// Fetch the number of calls to this prophesied method. This value will
// start at zero during the first call.
$method_calls = count($mock->findProphecyMethodCalls($method->getMethodName(), new ArgumentsWildcard($args)));
// Throw the expected exception on the first few calls.
if ($method_calls < self::REQUEUE_COUNT) {
\Drupal::state()->set('cron_test.requeue_count', $method_calls + 1);
throw new RequeueException();
}
});
// Set the mock queue worker manager to return the definition/plugin.
$queue_worker_manager->getDefinitions()->willReturn([$queue_worker => $queue_worker_definition]);
$queue_worker_manager->createInstance($queue_worker)->willReturn($queue_worker_plugin->reveal());
// Construct the Cron class to test.
$this->cron = new Cron($module_handler->reveal(), $lock_backend->reveal(), $queue_factory->reveal(), $state->reveal(), $account_switcher->reveal(), $logger->reveal(), $queue_worker_manager->reveal(), $time->reveal());
}
/**
* Resets the testing state.
*/
protected function resetTestingState() {
$this->queue->deleteQueue();
$this->state->set('cron_test.message_logged', FALSE);
$this->state->set('cron_test.requeue_count', NULL);
}
/**
* Data provider for ::testProcessQueues() method.
*/
public function processQueuesTestData() {
return [
['Complete', 'assertFalse', 0],
['Exception', 'assertTrue', 1],
['DelayedRequeueException', 'assertFalse', 1],
['SuspendQueueException', 'assertTrue', 1],
['RequeueException', 'assertFalse', 0],
];
}
/**
* Tests the ::processQueues() method.
*
* @covers ::processQueues
* @dataProvider processQueuesTestData
*/
public function testProcessQueues($item, $message_logged_assertion, $count_post_run) {
$this->resetTestingState();
$this->queue->createItem($item);
$this->assertFalse($this->state->get('cron_test.message_logged'));
$this->assertEquals(1, $this->queue->numberOfItems());
$this->cron->run();
$this->{$message_logged_assertion}($this->state->get('cron_test.message_logged'));
$this->assertEquals($count_post_run, $this->queue->numberOfItems());
}
/**
* Verify that RequeueException causes an item to be processed multiple times.
*/
public function testRequeueException() {
$this->resetTestingState();
$this->queue->createItem('RequeueException');
$this->cron->run();
// Fetch the number of times this item was requeued.
$actual_requeue_count = $this->state->get('cron_test.requeue_count');
// Make sure the item was requeued at least once.
$this->assertIsInt($actual_requeue_count);
// Ensure that the actual requeue count matches the expected value.
$this->assertEquals(self::REQUEUE_COUNT, $actual_requeue_count);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment