From 082bf59e86c1471fc12a1031e7413197eb743e23 Mon Sep 17 00:00:00 2001 From: Alex Pott <alex.a.pott@googlemail.com> Date: Fri, 23 May 2014 14:46:49 +0100 Subject: [PATCH] =?UTF-8?q?Issue=20#1524550=20by=20superspring,=20joachim,?= =?UTF-8?q?=20David=20Hern=C3=A1ndez,=20marthinal,=20socketwench,=20subson?= =?UTF-8?q?=20|=20jfinkel:=20Fixed=20queue=20cron=20workers=20can't=20sign?= =?UTF-8?q?al=20a=20broken=20queue.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/lib/Drupal/Core/Cron.php | 15 ++++++++++-- .../Core/Queue/SuspendQueueException.php | 20 ++++++++++++++++ .../system/Tests/System/CronQueueTest.php | 24 ++++++++++++++++++- core/modules/system/system.api.php | 7 ++++++ .../cron_queue_test/cron_queue_test.module | 21 ++++++++++++++++ 5 files changed, 84 insertions(+), 3 deletions(-) create mode 100644 core/lib/Drupal/Core/Queue/SuspendQueueException.php diff --git a/core/lib/Drupal/Core/Cron.php b/core/lib/Drupal/Core/Cron.php index 046216e9efa3..b639f5c43636 100644 --- a/core/lib/Drupal/Core/Cron.php +++ b/core/lib/Drupal/Core/Cron.php @@ -14,6 +14,7 @@ use Drupal\Core\Session\AccountProxyInterface; use Drupal\Core\Session\AnonymousUserSession; use Drupal\Core\Session\SessionManagerInterface; +use Drupal\Core\Queue\SuspendQueueException; /** * The Drupal core Cron service. @@ -167,9 +168,19 @@ protected function processQueues() { call_user_func_array($callback, array($item->data)); $queue->deleteItem($item); } + catch (SuspendQueueException $e) { + // If the worker indicates there is a problem with the whole queue, + // release the item and skip to the next queue. + $queue->releaseItem($item); + + watchdog_exception('cron', $e); + + // Skip to the next queue. + continue 2; + } catch (\Exception $e) { - // In case of exception log it and leave the item in the queue - // to be processed again later. + // In case of any other kind of exception, log it and leave the item + // in the queue to be processed again later. watchdog_exception('cron', $e); } } diff --git a/core/lib/Drupal/Core/Queue/SuspendQueueException.php b/core/lib/Drupal/Core/Queue/SuspendQueueException.php new file mode 100644 index 000000000000..50c3c132cb79 --- /dev/null +++ b/core/lib/Drupal/Core/Queue/SuspendQueueException.php @@ -0,0 +1,20 @@ +<?php + +/** + * @file + * Definition of Drupal\Core\Queue\SuspendQueueException. + */ + +namespace Drupal\Core\Queue; + +/** + * Exception class to throw to indicate that a cron queue should be skipped. + * + * An implementation of callback_queue_worker() may throw this class of + * exception to indicate that processing of the whole queue should be skipped. + * This should be thrown rather than a normal Exception if the problem + * encountered by the queue worker is such that it can be deduced that workers + * of subsequent items would encounter it too. For example, if a remote site + * that the queue worker depends on appears to be inaccessible. + */ +class SuspendQueueException extends \RuntimeException {} diff --git a/core/modules/system/lib/Drupal/system/Tests/System/CronQueueTest.php b/core/modules/system/lib/Drupal/system/Tests/System/CronQueueTest.php index 2bbd872b11b1..389e880aa684 100644 --- a/core/modules/system/lib/Drupal/system/Tests/System/CronQueueTest.php +++ b/core/modules/system/lib/Drupal/system/Tests/System/CronQueueTest.php @@ -33,7 +33,7 @@ public static function getInfo() { * Tests that exceptions thrown by workers are handled properly. */ public function testExceptions() { - + // Get the queue to test the normal Exception. $queue = $this->container->get('queue')->get('cron_queue_test_exception'); // Enqueue an item for processing. @@ -45,5 +45,27 @@ public function testExceptions() { // The item should be left in the queue. $this->assertEqual($queue->numberOfItems(), 1, 'Failing item still in the queue after throwing an exception.'); + + // Get the queue to test the specific SuspendQueueException. + $queue = $this->container->get('queue')->get('cron_queue_test_broken_queue'); + + // Enqueue several item for processing. + $queue->createItem('process'); + $queue->createItem('crash'); + $queue->createItem('ignored'); + + // Run cron; the worker for this queue should process as far as the crashing + // item. + $this->cronRun(); + + // Only one item should have been processed. + $this->assertEqual($queue->numberOfItems(), 2, 'Failing queue stopped processing at the failing item.'); + + // Check the items remaining in the queue. The item that throws the + // exception gets released by cron, so we can claim it again to check it. + $item = $queue->claimItem(); + $this->assertEqual($item->data, 'crash', 'Failing item remains in the queue.'); + $item = $queue->claimItem(); + $this->assertEqual($item->data, 'ignored', 'Item beyond the failing item remains in the queue.'); } } diff --git a/core/modules/system/system.api.php b/core/modules/system/system.api.php index 7574ed98009f..c7c8a498937e 100644 --- a/core/modules/system/system.api.php +++ b/core/modules/system/system.api.php @@ -168,6 +168,13 @@ function hook_queue_info_alter(&$queues) { * The worker callback may throw an exception to indicate there was a problem. * The cron process will log the exception, and leave the item in the queue to * be processed again later. + * @throws \Drupal\Core\Queue\SuspendQueueException + * More specifically, a SuspendQueueException should be thrown when the + * callback is aware that the problem will affect all subsequent workers of + * its queue. For example, a callback that makes HTTP requests may find that + * the remote server is not responding. The cron process will behave as with a + * normal Exception, and in addition will not attempt to process further items + * from the current item's queue during the current cron run. * * @see \Drupal\Core\Cron::run() */ diff --git a/core/modules/system/tests/modules/cron_queue_test/cron_queue_test.module b/core/modules/system/tests/modules/cron_queue_test/cron_queue_test.module index b88892feaf66..c46fdb903c62 100644 --- a/core/modules/system/tests/modules/cron_queue_test/cron_queue_test.module +++ b/core/modules/system/tests/modules/cron_queue_test/cron_queue_test.module @@ -9,9 +9,30 @@ function cron_queue_test_queue_info() { 'time' => 60, ), ); + $queues['cron_queue_test_broken_queue'] = array( + 'title' => t('Broken queue test'), + 'worker callback' => 'cron_queue_test_broken_queue', + // Only needed if this queue should be processed by cron. + 'cron' => array( + 'time' => 60, + ), + ); + return $queues; } function cron_queue_test_exception($item) { throw new Exception('That is not supposed to happen.'); } + +/** + * Implements callback_queue_worker(). + * + * This queue is declared broken if the queue item data is 'crash'. + */ +function cron_queue_test_broken_queue($queue_item_data) { + if ($queue_item_data == 'crash') { + throw new \Drupal\Core\Queue\SuspendQueueException('The queue is broken.'); + } + // Do nothing otherwise. +} -- GitLab