Commit ccae3c04 authored by catch's avatar catch
Browse files

Issue #2867001 by dpi, acbramley, SpadXIII, mstrelan, smustgrave, neclimdul:...

Issue #2867001 by dpi, acbramley, SpadXIII, mstrelan, smustgrave, neclimdul: Dont treat suspending of a queue as erroneous
parent 6acfec45
Loading
Loading
Loading
Loading
+5 −3
Original line number Diff line number Diff line
@@ -200,11 +200,13 @@ protected function processQueues() {
            $queue->releaseItem($item);
          }
          catch (SuspendQueueException $e) {
            // If the worker indicates there is a problem with the whole queue,
            // release the item and skip to the next queue.
            // If the worker indicates the whole queue should be skipped,
            // release the item and go to the next queue.
            $queue->releaseItem($item);

            watchdog_exception('cron', $e);
            $this->logger->debug('A worker for @queue queue suspended further processing of the queue.', [
              '@queue' => $queue_name,
            ]);

            // Skip to the next queue.
            continue 2;
+6 −1
Original line number Diff line number Diff line
@@ -6,13 +6,18 @@

/**
 * @QueueWorker(
 *   id = "cron_queue_test_exception",
 *   id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException::PLUGIN_ID,
 *   title = @Translation("Exception test"),
 *   cron = {"time" = 1}
 * )
 */
class CronQueueTestException extends QueueWorkerBase {

  /**
   * The plugin ID.
   */
  public const PLUGIN_ID = 'cron_queue_test_exception';

  /**
   * {@inheritdoc}
   */
+6 −1
Original line number Diff line number Diff line
@@ -7,13 +7,18 @@

/**
 * @QueueWorker(
 *   id = "cron_queue_test_requeue_exception",
 *   id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException::PLUGIN_ID,
 *   title = @Translation("RequeueException test"),
 *   cron = {"time" = 60}
 * )
 */
class CronQueueTestRequeueException extends QueueWorkerBase {

  /**
   * The plugin ID.
   */
  public const PLUGIN_ID = 'cron_queue_test_requeue_exception';

  /**
   * {@inheritdoc}
   */
+9 −4
Original line number Diff line number Diff line
@@ -7,18 +7,23 @@

/**
 * @QueueWorker(
 *   id = "cron_queue_test_broken_queue",
 *   title = @Translation("Broken queue test"),
 *   id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue::PLUGIN_ID,
 *   title = @Translation("Suspend queue test"),
 *   cron = {"time" = 60}
 * )
 */
class CronQueueTestBrokenQueue extends QueueWorkerBase {
class CronQueueTestSuspendQueue extends QueueWorkerBase {

  /**
   * The plugin ID.
   */
  public const PLUGIN_ID = 'cron_queue_test_suspend';

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    if ($data == 'crash') {
    if ($data === 'suspend') {
      throw new SuspendQueueException('The queue is broken.');
    }
    // Do nothing otherwise.
+89 −11
Original line number Diff line number Diff line
@@ -3,12 +3,18 @@
namespace Drupal\Tests\system\Kernel\System;

use Drupal\Core\Database\Database;
use Drupal\Core\DependencyInjection\ContainerBuilder;
use Drupal\Core\Logger\RfcLogLevel;
use Drupal\Core\Queue\DatabaseQueue;
use Drupal\Core\Queue\Memory;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\KernelTests\KernelTestBase;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestDatabaseDelayException;
use Prophecy\Argument;
use Psr\Log\LoggerInterface;

/**
 * Tests the Cron Queue runner.
@@ -47,10 +53,19 @@ class CronQueueTest extends KernelTestBase {
   */
  protected $currentTime = 1000;

  /**
   * A logger for testing.
   *
   * @var \PHPUnit\Framework\MockObject\MockObject|\Psr\Log\LoggerInterface
   */
  protected $logger;

  /**
   * {@inheritdoc}
   */
  protected function setUp(): void {
    // Setup logger before register() is called.
    $this->logger = $this->createMock(LoggerInterface::class);
    parent::setUp();

    $this->connection = Database::getConnection();
@@ -152,11 +167,31 @@ public function testLeaseTime() {
  }

  /**
   * Tests that exceptions thrown by workers are handled properly.
   * Tests that non-queue exceptions thrown by workers are handled properly.
   *
   * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException
   */
  public function testExceptions() {
  public function testUncaughtExceptions() {
    $this->logger->expects($this->atLeast(2))
      ->method('log')
      ->withConsecutive(
        [
          $this->equalTo(RfcLogLevel::ERROR),
          $this->equalTo('%type: @message in %function (line %line of %file).'),
          $this->callback(function ($args) {
            return $args['@message'] === 'That is not supposed to happen.' &&
              $args['exception'] instanceof \Exception;
          }),
        ],
        [
          $this->equalTo(RfcLogLevel::INFO),
          $this->equalTo('Cron run completed.'),
          $this->anything(),
        ],
      );

    // Get the queue to test the normal Exception.
    $queue = $this->container->get('queue')->get('cron_queue_test_exception');
    $queue = $this->container->get('queue')->get(CronQueueTestException::PLUGIN_ID);

    // Enqueue an item for processing.
    $queue->createItem([$this->randomMachineName() => $this->randomMachineName()]);
@@ -181,31 +216,64 @@ public function testExceptions() {
    $this->cron->run();
    $this->assertEquals(2, \Drupal::state()->get('cron_queue_test_exception'));
    $this->assertEquals(0, $queue->numberOfItems(), 'Item was processed and removed from the queue.');
  }

  /**
   * Tests suspend queue exception is handled properly.
   *
   * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue
   * @covers \Drupal\Core\Queue\SuspendQueueException
   */
  public function testSuspendQueueException(): void {
    $this->logger->expects($this->atLeast(2))
      ->method('log')
      ->withConsecutive(
        [
          $this->equalTo(RfcLogLevel::DEBUG),
          $this->equalTo('A worker for @queue queue suspended further processing of the queue.'),
          $this->callback(function ($args) {
            return $args['@queue'] === CronQueueTestSuspendQueue::PLUGIN_ID;
          }),
        ],
        [
          $this->equalTo(RfcLogLevel::INFO),
          $this->equalTo('Cron run completed.'),
          $this->anything(),
        ],
      );

    // Get the queue to test the specific SuspendQueueException.
    $queue = $this->container->get('queue')->get('cron_queue_test_broken_queue');
    $queue = \Drupal::queue(CronQueueTestSuspendQueue::PLUGIN_ID);

    // Enqueue several item for processing.
    $queue->createItem('process');
    $queue->createItem('crash');
    $queue->createItem('suspend');
    $queue->createItem('ignored');

    // Run cron; the worker for this queue should process as far as the crashing
    // item.
    // Run cron; the worker for this queue should process as far as the
    // suspending item.
    $this->cron->run();

    // Only one item should have been processed.
    $this->assertEquals(2, $queue->numberOfItems(), 'Failing queue stopped processing at the failing item.');
    $this->assertEquals(2, $queue->numberOfItems(), 'Suspended queue stopped processing at the suspending item.');

    // Check the items remaining in the queue. The item that throws the
    // exception gets released by cron, so we can claim it again to check it.
    $item = $queue->claimItem();
    $this->assertEquals('crash', $item->data, 'Failing item remains in the queue.');
    $this->assertEquals('suspend', $item->data, 'Suspending item remains in the queue.');
    $item = $queue->claimItem();
    $this->assertEquals('ignored', $item->data, 'Item beyond the failing item remains in the queue.');
    $this->assertEquals('ignored', $item->data, 'Item beyond the suspending item remains in the queue.');
  }

  /**
   * Tests requeue exception is handled properly.
   *
   * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException
   * @covers \Drupal\Core\Queue\RequeueException
   */
  public function testRequeueException(): void {
    // Test the requeueing functionality.
    $queue = $this->container->get('queue')->get('cron_queue_test_requeue_exception');
    $queue = $this->container->get('queue')->get(CronQueueTestRequeueException::PLUGIN_ID);
    $queue->createItem([]);
    $this->cron->run();

@@ -261,4 +329,14 @@ public function testQueueWorkerManagerSafeguard(): void {
    $this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']);
  }

  /**
   * {@inheritdoc}
   */
  public function register(ContainerBuilder $container) {
    parent::register($container);
    $container->register('test_logger', get_class($this->logger))
      ->addTag('logger');
    $container->set('test_logger', $this->logger);
  }

}