Commit 082bf59e authored by alexpott's avatar alexpott

Issue #1524550 by superspring, joachim, David Hernández, marthinal,...

Issue #1524550 by superspring, joachim, David Hernández, marthinal, socketwench, subson | jfinkel: Fixed queue cron workers can't signal a broken queue.
parent d8cdb35f
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
use Drupal\Core\Session\AccountProxyInterface; use Drupal\Core\Session\AccountProxyInterface;
use Drupal\Core\Session\AnonymousUserSession; use Drupal\Core\Session\AnonymousUserSession;
use Drupal\Core\Session\SessionManagerInterface; use Drupal\Core\Session\SessionManagerInterface;
use Drupal\Core\Queue\SuspendQueueException;
/** /**
* The Drupal core Cron service. * The Drupal core Cron service.
...@@ -167,9 +168,19 @@ protected function processQueues() { ...@@ -167,9 +168,19 @@ protected function processQueues() {
call_user_func_array($callback, array($item->data)); call_user_func_array($callback, array($item->data));
$queue->deleteItem($item); $queue->deleteItem($item);
} }
catch (SuspendQueueException $e) {
// If the worker indicates there is a problem with the whole queue,
// release the item and skip to the next queue.
$queue->releaseItem($item);
watchdog_exception('cron', $e);
// Skip to the next queue.
continue 2;
}
catch (\Exception $e) { catch (\Exception $e) {
// In case of exception log it and leave the item in the queue // In case of any other kind of exception, log it and leave the item
// to be processed again later. // in the queue to be processed again later.
watchdog_exception('cron', $e); watchdog_exception('cron', $e);
} }
} }
......
<?php
/**
* @file
* Definition of Drupal\Core\Queue\SuspendQueueException.
*/
namespace Drupal\Core\Queue;
/**
* Exception class to throw to indicate that a cron queue should be skipped.
*
* An implementation of callback_queue_worker() may throw this class of
* exception to indicate that processing of the whole queue should be skipped.
* This should be thrown rather than a normal Exception if the problem
* encountered by the queue worker is such that it can be deduced that workers
* of subsequent items would encounter it too. For example, if a remote site
* that the queue worker depends on appears to be inaccessible.
*/
class SuspendQueueException extends \RuntimeException {}
...@@ -33,7 +33,7 @@ public static function getInfo() { ...@@ -33,7 +33,7 @@ public static function getInfo() {
* Tests that exceptions thrown by workers are handled properly. * Tests that exceptions thrown by workers are handled properly.
*/ */
public function testExceptions() { public function testExceptions() {
// Get the queue to test the normal Exception.
$queue = $this->container->get('queue')->get('cron_queue_test_exception'); $queue = $this->container->get('queue')->get('cron_queue_test_exception');
// Enqueue an item for processing. // Enqueue an item for processing.
...@@ -45,5 +45,27 @@ public function testExceptions() { ...@@ -45,5 +45,27 @@ public function testExceptions() {
// The item should be left in the queue. // The item should be left in the queue.
$this->assertEqual($queue->numberOfItems(), 1, 'Failing item still in the queue after throwing an exception.'); $this->assertEqual($queue->numberOfItems(), 1, 'Failing item still in the queue after throwing an exception.');
// Get the queue to test the specific SuspendQueueException.
$queue = $this->container->get('queue')->get('cron_queue_test_broken_queue');
// Enqueue several item for processing.
$queue->createItem('process');
$queue->createItem('crash');
$queue->createItem('ignored');
// Run cron; the worker for this queue should process as far as the crashing
// item.
$this->cronRun();
// Only one item should have been processed.
$this->assertEqual($queue->numberOfItems(), 2, 'Failing queue stopped processing at the failing item.');
// Check the items remaining in the queue. The item that throws the
// exception gets released by cron, so we can claim it again to check it.
$item = $queue->claimItem();
$this->assertEqual($item->data, 'crash', 'Failing item remains in the queue.');
$item = $queue->claimItem();
$this->assertEqual($item->data, 'ignored', 'Item beyond the failing item remains in the queue.');
} }
} }
...@@ -168,6 +168,13 @@ function hook_queue_info_alter(&$queues) { ...@@ -168,6 +168,13 @@ function hook_queue_info_alter(&$queues) {
* The worker callback may throw an exception to indicate there was a problem. * The worker callback may throw an exception to indicate there was a problem.
* The cron process will log the exception, and leave the item in the queue to * The cron process will log the exception, and leave the item in the queue to
* be processed again later. * be processed again later.
* @throws \Drupal\Core\Queue\SuspendQueueException
* More specifically, a SuspendQueueException should be thrown when the
* callback is aware that the problem will affect all subsequent workers of
* its queue. For example, a callback that makes HTTP requests may find that
* the remote server is not responding. The cron process will behave as with a
* normal Exception, and in addition will not attempt to process further items
* from the current item's queue during the current cron run.
* *
* @see \Drupal\Core\Cron::run() * @see \Drupal\Core\Cron::run()
*/ */
......
...@@ -9,9 +9,30 @@ function cron_queue_test_queue_info() { ...@@ -9,9 +9,30 @@ function cron_queue_test_queue_info() {
'time' => 60, 'time' => 60,
), ),
); );
$queues['cron_queue_test_broken_queue'] = array(
'title' => t('Broken queue test'),
'worker callback' => 'cron_queue_test_broken_queue',
// Only needed if this queue should be processed by cron.
'cron' => array(
'time' => 60,
),
);
return $queues; return $queues;
} }
function cron_queue_test_exception($item) { function cron_queue_test_exception($item) {
throw new Exception('That is not supposed to happen.'); throw new Exception('That is not supposed to happen.');
} }
/**
* Implements callback_queue_worker().
*
* This queue is declared broken if the queue item data is 'crash'.
*/
function cron_queue_test_broken_queue($queue_item_data) {
if ($queue_item_data == 'crash') {
throw new \Drupal\Core\Queue\SuspendQueueException('The queue is broken.');
}
// Do nothing otherwise.
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment