Loading core/lib/Drupal/Core/Cron.php +5 −3 Original line number Diff line number Diff line Loading @@ -200,11 +200,13 @@ protected function processQueues() { $queue->releaseItem($item); } catch (SuspendQueueException $e) { // If the worker indicates there is a problem with the whole queue, // release the item and skip to the next queue. // If the worker indicates the whole queue should be skipped, // release the item and go to the next queue. $queue->releaseItem($item); watchdog_exception('cron', $e); $this->logger->debug('A worker for @queue queue suspended further processing of the queue.', [ '@queue' => $queue_name, ]); // Skip to the next queue. continue 2; Loading core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestException.php +6 −1 Original line number Diff line number Diff line Loading @@ -6,13 +6,18 @@ /** * @QueueWorker( * id = "cron_queue_test_exception", * id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException::PLUGIN_ID, * title = @Translation("Exception test"), * cron = {"time" = 1} * ) */ class CronQueueTestException extends QueueWorkerBase { /** * The plugin ID. */ public const PLUGIN_ID = 'cron_queue_test_exception'; /** * {@inheritdoc} */ Loading core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestRequeueException.php +6 −1 Original line number Diff line number Diff line Loading @@ -7,13 +7,18 @@ /** * @QueueWorker( * id = "cron_queue_test_requeue_exception", * id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException::PLUGIN_ID, * title = @Translation("RequeueException test"), * cron = {"time" = 60} * ) */ class CronQueueTestRequeueException extends QueueWorkerBase { /** * The plugin ID. */ public const PLUGIN_ID = 'cron_queue_test_requeue_exception'; /** * {@inheritdoc} */ Loading core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestBrokenQueue.php→core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestSuspendQueue.php +9 −4 Original line number Diff line number Diff line Loading @@ -7,18 +7,23 @@ /** * @QueueWorker( * id = "cron_queue_test_broken_queue", * title = @Translation("Broken queue test"), * id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue::PLUGIN_ID, * title = @Translation("Suspend queue test"), * cron = {"time" = 60} * ) */ class CronQueueTestBrokenQueue extends QueueWorkerBase { class CronQueueTestSuspendQueue extends QueueWorkerBase { /** * The plugin ID. */ public const PLUGIN_ID = 'cron_queue_test_suspend'; /** * {@inheritdoc} */ public function processItem($data) { if ($data == 'crash') { if ($data === 'suspend') { throw new SuspendQueueException('The queue is broken.'); } // Do nothing otherwise. Loading core/modules/system/tests/src/Kernel/System/CronQueueTest.php +89 −11 Original line number Diff line number Diff line Loading @@ -3,12 +3,18 @@ namespace Drupal\Tests\system\Kernel\System; use Drupal\Core\Database\Database; use Drupal\Core\DependencyInjection\ContainerBuilder; use Drupal\Core\Logger\RfcLogLevel; use Drupal\Core\Queue\DatabaseQueue; use Drupal\Core\Queue\Memory; use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException; use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException; use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue; use Drupal\Core\Queue\QueueWorkerManagerInterface; use Drupal\KernelTests\KernelTestBase; use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestDatabaseDelayException; use Prophecy\Argument; use Psr\Log\LoggerInterface; /** * Tests the Cron Queue runner. Loading Loading @@ -47,10 +53,19 @@ class CronQueueTest extends KernelTestBase { */ protected $currentTime = 1000; /** * A logger for testing. * * @var \PHPUnit\Framework\MockObject\MockObject|\Psr\Log\LoggerInterface */ protected $logger; /** * {@inheritdoc} */ protected function setUp(): void { // Setup logger before register() is called. $this->logger = $this->createMock(LoggerInterface::class); parent::setUp(); $this->connection = Database::getConnection(); Loading Loading @@ -152,11 +167,31 @@ public function testLeaseTime() { } /** * Tests that exceptions thrown by workers are handled properly. * Tests that non-queue exceptions thrown by workers are handled properly. * * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException */ public function testExceptions() { public function testUncaughtExceptions() { $this->logger->expects($this->atLeast(2)) ->method('log') ->withConsecutive( [ $this->equalTo(RfcLogLevel::ERROR), $this->equalTo('%type: @message in %function (line %line of %file).'), $this->callback(function ($args) { return $args['@message'] === 'That is not supposed to happen.' && $args['exception'] instanceof \Exception; }), ], [ $this->equalTo(RfcLogLevel::INFO), $this->equalTo('Cron run completed.'), $this->anything(), ], ); // Get the queue to test the normal Exception. $queue = $this->container->get('queue')->get('cron_queue_test_exception'); $queue = $this->container->get('queue')->get(CronQueueTestException::PLUGIN_ID); // Enqueue an item for processing. $queue->createItem([$this->randomMachineName() => $this->randomMachineName()]); Loading @@ -181,31 +216,64 @@ public function testExceptions() { $this->cron->run(); $this->assertEquals(2, \Drupal::state()->get('cron_queue_test_exception')); $this->assertEquals(0, $queue->numberOfItems(), 'Item was processed and removed from the queue.'); } /** * Tests suspend queue exception is handled properly. * * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue * @covers \Drupal\Core\Queue\SuspendQueueException */ public function testSuspendQueueException(): void { $this->logger->expects($this->atLeast(2)) ->method('log') ->withConsecutive( [ $this->equalTo(RfcLogLevel::DEBUG), $this->equalTo('A worker for @queue queue suspended further processing of the queue.'), $this->callback(function ($args) { return $args['@queue'] === CronQueueTestSuspendQueue::PLUGIN_ID; }), ], [ $this->equalTo(RfcLogLevel::INFO), $this->equalTo('Cron run completed.'), $this->anything(), ], ); // Get the queue to test the specific SuspendQueueException. $queue = $this->container->get('queue')->get('cron_queue_test_broken_queue'); $queue = \Drupal::queue(CronQueueTestSuspendQueue::PLUGIN_ID); // Enqueue several item for processing. $queue->createItem('process'); $queue->createItem('crash'); $queue->createItem('suspend'); $queue->createItem('ignored'); // Run cron; the worker for this queue should process as far as the crashing // item. // Run cron; the worker for this queue should process as far as the // suspending item. $this->cron->run(); // Only one item should have been processed. $this->assertEquals(2, $queue->numberOfItems(), 'Failing queue stopped processing at the failing item.'); $this->assertEquals(2, $queue->numberOfItems(), 'Suspended queue stopped processing at the suspending item.'); // Check the items remaining in the queue. The item that throws the // exception gets released by cron, so we can claim it again to check it. $item = $queue->claimItem(); $this->assertEquals('crash', $item->data, 'Failing item remains in the queue.'); $this->assertEquals('suspend', $item->data, 'Suspending item remains in the queue.'); $item = $queue->claimItem(); $this->assertEquals('ignored', $item->data, 'Item beyond the failing item remains in the queue.'); $this->assertEquals('ignored', $item->data, 'Item beyond the suspending item remains in the queue.'); } /** * Tests requeue exception is handled properly. * * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException * @covers \Drupal\Core\Queue\RequeueException */ public function testRequeueException(): void { // Test the requeueing functionality. $queue = $this->container->get('queue')->get('cron_queue_test_requeue_exception'); $queue = $this->container->get('queue')->get(CronQueueTestRequeueException::PLUGIN_ID); $queue->createItem([]); $this->cron->run(); Loading Loading @@ -261,4 +329,14 @@ public function testQueueWorkerManagerSafeguard(): void { $this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']); } /** * {@inheritdoc} */ public function register(ContainerBuilder $container) { parent::register($container); $container->register('test_logger', get_class($this->logger)) ->addTag('logger'); $container->set('test_logger', $this->logger); } } Loading
core/lib/Drupal/Core/Cron.php +5 −3 Original line number Diff line number Diff line Loading @@ -200,11 +200,13 @@ protected function processQueues() { $queue->releaseItem($item); } catch (SuspendQueueException $e) { // If the worker indicates there is a problem with the whole queue, // release the item and skip to the next queue. // If the worker indicates the whole queue should be skipped, // release the item and go to the next queue. $queue->releaseItem($item); watchdog_exception('cron', $e); $this->logger->debug('A worker for @queue queue suspended further processing of the queue.', [ '@queue' => $queue_name, ]); // Skip to the next queue. continue 2; Loading
core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestException.php +6 −1 Original line number Diff line number Diff line Loading @@ -6,13 +6,18 @@ /** * @QueueWorker( * id = "cron_queue_test_exception", * id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException::PLUGIN_ID, * title = @Translation("Exception test"), * cron = {"time" = 1} * ) */ class CronQueueTestException extends QueueWorkerBase { /** * The plugin ID. */ public const PLUGIN_ID = 'cron_queue_test_exception'; /** * {@inheritdoc} */ Loading
core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestRequeueException.php +6 −1 Original line number Diff line number Diff line Loading @@ -7,13 +7,18 @@ /** * @QueueWorker( * id = "cron_queue_test_requeue_exception", * id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException::PLUGIN_ID, * title = @Translation("RequeueException test"), * cron = {"time" = 60} * ) */ class CronQueueTestRequeueException extends QueueWorkerBase { /** * The plugin ID. */ public const PLUGIN_ID = 'cron_queue_test_requeue_exception'; /** * {@inheritdoc} */ Loading
core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestBrokenQueue.php→core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestSuspendQueue.php +9 −4 Original line number Diff line number Diff line Loading @@ -7,18 +7,23 @@ /** * @QueueWorker( * id = "cron_queue_test_broken_queue", * title = @Translation("Broken queue test"), * id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue::PLUGIN_ID, * title = @Translation("Suspend queue test"), * cron = {"time" = 60} * ) */ class CronQueueTestBrokenQueue extends QueueWorkerBase { class CronQueueTestSuspendQueue extends QueueWorkerBase { /** * The plugin ID. */ public const PLUGIN_ID = 'cron_queue_test_suspend'; /** * {@inheritdoc} */ public function processItem($data) { if ($data == 'crash') { if ($data === 'suspend') { throw new SuspendQueueException('The queue is broken.'); } // Do nothing otherwise. Loading
core/modules/system/tests/src/Kernel/System/CronQueueTest.php +89 −11 Original line number Diff line number Diff line Loading @@ -3,12 +3,18 @@ namespace Drupal\Tests\system\Kernel\System; use Drupal\Core\Database\Database; use Drupal\Core\DependencyInjection\ContainerBuilder; use Drupal\Core\Logger\RfcLogLevel; use Drupal\Core\Queue\DatabaseQueue; use Drupal\Core\Queue\Memory; use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException; use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException; use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue; use Drupal\Core\Queue\QueueWorkerManagerInterface; use Drupal\KernelTests\KernelTestBase; use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestDatabaseDelayException; use Prophecy\Argument; use Psr\Log\LoggerInterface; /** * Tests the Cron Queue runner. Loading Loading @@ -47,10 +53,19 @@ class CronQueueTest extends KernelTestBase { */ protected $currentTime = 1000; /** * A logger for testing. * * @var \PHPUnit\Framework\MockObject\MockObject|\Psr\Log\LoggerInterface */ protected $logger; /** * {@inheritdoc} */ protected function setUp(): void { // Setup logger before register() is called. $this->logger = $this->createMock(LoggerInterface::class); parent::setUp(); $this->connection = Database::getConnection(); Loading Loading @@ -152,11 +167,31 @@ public function testLeaseTime() { } /** * Tests that exceptions thrown by workers are handled properly. * Tests that non-queue exceptions thrown by workers are handled properly. * * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException */ public function testExceptions() { public function testUncaughtExceptions() { $this->logger->expects($this->atLeast(2)) ->method('log') ->withConsecutive( [ $this->equalTo(RfcLogLevel::ERROR), $this->equalTo('%type: @message in %function (line %line of %file).'), $this->callback(function ($args) { return $args['@message'] === 'That is not supposed to happen.' && $args['exception'] instanceof \Exception; }), ], [ $this->equalTo(RfcLogLevel::INFO), $this->equalTo('Cron run completed.'), $this->anything(), ], ); // Get the queue to test the normal Exception. $queue = $this->container->get('queue')->get('cron_queue_test_exception'); $queue = $this->container->get('queue')->get(CronQueueTestException::PLUGIN_ID); // Enqueue an item for processing. $queue->createItem([$this->randomMachineName() => $this->randomMachineName()]); Loading @@ -181,31 +216,64 @@ public function testExceptions() { $this->cron->run(); $this->assertEquals(2, \Drupal::state()->get('cron_queue_test_exception')); $this->assertEquals(0, $queue->numberOfItems(), 'Item was processed and removed from the queue.'); } /** * Tests suspend queue exception is handled properly. * * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue * @covers \Drupal\Core\Queue\SuspendQueueException */ public function testSuspendQueueException(): void { $this->logger->expects($this->atLeast(2)) ->method('log') ->withConsecutive( [ $this->equalTo(RfcLogLevel::DEBUG), $this->equalTo('A worker for @queue queue suspended further processing of the queue.'), $this->callback(function ($args) { return $args['@queue'] === CronQueueTestSuspendQueue::PLUGIN_ID; }), ], [ $this->equalTo(RfcLogLevel::INFO), $this->equalTo('Cron run completed.'), $this->anything(), ], ); // Get the queue to test the specific SuspendQueueException. $queue = $this->container->get('queue')->get('cron_queue_test_broken_queue'); $queue = \Drupal::queue(CronQueueTestSuspendQueue::PLUGIN_ID); // Enqueue several item for processing. $queue->createItem('process'); $queue->createItem('crash'); $queue->createItem('suspend'); $queue->createItem('ignored'); // Run cron; the worker for this queue should process as far as the crashing // item. // Run cron; the worker for this queue should process as far as the // suspending item. $this->cron->run(); // Only one item should have been processed. $this->assertEquals(2, $queue->numberOfItems(), 'Failing queue stopped processing at the failing item.'); $this->assertEquals(2, $queue->numberOfItems(), 'Suspended queue stopped processing at the suspending item.'); // Check the items remaining in the queue. The item that throws the // exception gets released by cron, so we can claim it again to check it. $item = $queue->claimItem(); $this->assertEquals('crash', $item->data, 'Failing item remains in the queue.'); $this->assertEquals('suspend', $item->data, 'Suspending item remains in the queue.'); $item = $queue->claimItem(); $this->assertEquals('ignored', $item->data, 'Item beyond the failing item remains in the queue.'); $this->assertEquals('ignored', $item->data, 'Item beyond the suspending item remains in the queue.'); } /** * Tests requeue exception is handled properly. * * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException * @covers \Drupal\Core\Queue\RequeueException */ public function testRequeueException(): void { // Test the requeueing functionality. $queue = $this->container->get('queue')->get('cron_queue_test_requeue_exception'); $queue = $this->container->get('queue')->get(CronQueueTestRequeueException::PLUGIN_ID); $queue->createItem([]); $this->cron->run(); Loading Loading @@ -261,4 +329,14 @@ public function testQueueWorkerManagerSafeguard(): void { $this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']); } /** * {@inheritdoc} */ public function register(ContainerBuilder $container) { parent::register($container); $container->register('test_logger', get_class($this->logger)) ->addTag('logger'); $container->set('test_logger', $this->logger); } }