Commit 6377fc1f authored by catch's avatar catch

Issue #2667294 by chx: Allow for peaceful requeueing

parent 2be16d8a
......@@ -9,6 +9,7 @@
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\State\StateInterface;
use Drupal\Core\Lock\LockBackendInterface;
use Drupal\Core\Queue\QueueFactory;
......@@ -168,6 +169,10 @@ protected function processQueues() {
$queue_worker->processItem($item->data);
$queue->deleteItem($item);
}
catch (RequeueException $e) {
// The worker requested the task be immediately requeued.
$queue->releaseItem($item);
}
catch (SuspendQueueException $e) {
// If the worker indicates there is a problem with the whole queue,
// release the item and skip to the next queue.
......
......@@ -26,6 +26,9 @@ interface QueueWorkerInterface extends PluginInspectionInterface {
* The data that was passed to
* \Drupal\Core\Queue\QueueInterface::createItem() when the item was queued.
*
* @throws \Drupal\Core\Queue\RequeueException
* Processing is not yet finished. This will allow another process to claim
* the item immediately.
* @throws \Exception
* A QueueWorker plugin may throw an exception to indicate there was a
* problem. The cron process will log the exception, and leave the item in
......
<?php
/**
* @file
* Contains \Drupal\Core\Queue\RequeueException.
*/
namespace Drupal\Core\Queue;
/**
* Throw this exception to release the item allowing it to be processed again.
*/
class RequeueException extends \RuntimeException {}
......@@ -61,5 +61,13 @@ public function testExceptions() {
$this->assertEqual($item->data, 'crash', 'Failing item remains in the queue.');
$item = $queue->claimItem();
$this->assertEqual($item->data, 'ignored', 'Item beyond the failing item remains in the queue.');
// Test the requeueing functionality.
$queue = $this->container->get('queue')->get('cron_queue_test_requeue_exception');
$queue->createItem([]);
$this->cronRun();
$this->assertEqual(\Drupal::state()->get('cron_queue_test_requeue_exception'), 2);
$this->assertFalse($queue->numberOfItems());
}
}
<?php
/**
* @file
* Contains \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException.
*/
namespace Drupal\cron_queue_test\Plugin\QueueWorker;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Queue\RequeueException;
/**
* @QueueWorker(
* id = "cron_queue_test_requeue_exception",
* title = @Translation("RequeueException test"),
* cron = {"time" = 60}
* )
*/
class CronQueueTestRequeueException extends QueueWorkerBase {
/**
* {@inheritdoc}
*/
public function processItem($data) {
$state = \Drupal::state();
if (!$state->get('cron_queue_test_requeue_exception')) {
$state->set('cron_queue_test_requeue_exception', 1);
throw new RequeueException('I am not done yet!');
}
else {
$state->set('cron_queue_test_requeue_exception', 2);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment