Commit 2d0782d9 authored by catch's avatar catch
Browse files

Issue #3198868 by dpi, acbramley, larowlan: Add delay to queue suspend

parent 4dbf3f46
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -213,3 +213,9 @@ parameters:
    maxAge: false
    # Sets the Access-Control-Allow-Credentials header.
    supportsCredentials: false

  queue.config:
    # The maximum number of seconds to wait if a queue is temporarily suspended.
    # This is not applicable when a queue is suspended but does not specify
    # how long to wait before attempting to resume.
    suspendMaximumWait: 30
+3 −1
Original line number Diff line number Diff line
@@ -56,6 +56,8 @@ parameters:
    maxAge: false
    supportsCredentials: false
  tempstore.expire: 604800
  queue.config:
    suspendMaximumWait: 30.0
services:
  # Simple cache contexts, directly derived from the request context.
  cache_context.ip:
@@ -390,7 +392,7 @@ services:
    arguments: [ '@service_container' ]
  cron:
    class: Drupal\Core\Cron
    arguments: ['@module_handler', '@lock', '@queue', '@state', '@account_switcher', '@logger.channel.cron', '@plugin.manager.queue_worker', '@datetime.time']
    arguments: ['@module_handler', '@lock', '@queue', '@state', '@account_switcher', '@logger.channel.cron', '@plugin.manager.queue_worker', '@datetime.time', '%queue.config%']
    lazy: true
  Drupal\Core\CronInterface: '@cron'
  diff.formatter:
+148 −52
Original line number Diff line number Diff line
@@ -9,6 +9,8 @@
use Drupal\Core\Lock\LockBackendInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\DelayableQueueInterface;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\RequeueException;
@@ -80,6 +82,13 @@ class Cron implements CronInterface {
   */
  protected $time;

  /**
   * The queue config.
   *
   * @var array
   */
  protected array $queueConfig;

  /**
   * Constructs a cron object.
   *
@@ -97,10 +106,12 @@ class Cron implements CronInterface {
   *   A logger instance.
   * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $queue_manager
   *   The queue plugin manager.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   * @param \Drupal\Component\Datetime\TimeInterface|null $time
   *   The time service.
   * @param mixed[]|null $queue_config
   *   Queue configuration from the service container.
   */
  public function __construct(ModuleHandlerInterface $module_handler, LockBackendInterface $lock, QueueFactory $queue_factory, StateInterface $state, AccountSwitcherInterface $account_switcher, LoggerInterface $logger, QueueWorkerManagerInterface $queue_manager, TimeInterface $time = NULL) {
  public function __construct(ModuleHandlerInterface $module_handler, LockBackendInterface $lock, QueueFactory $queue_factory, StateInterface $state, AccountSwitcherInterface $account_switcher, LoggerInterface $logger, QueueWorkerManagerInterface $queue_manager, TimeInterface $time = NULL, ?array $queue_config = NULL) {
    $this->moduleHandler = $module_handler;
    $this->lock = $lock;
    $this->queueFactory = $queue_factory;
@@ -108,7 +119,18 @@ public function __construct(ModuleHandlerInterface $module_handler, LockBackendI
    $this->accountSwitcher = $account_switcher;
    $this->logger = $logger;
    $this->queueManager = $queue_manager;
    $this->time = $time ?: \Drupal::service('datetime.time');
    if (!isset($time)) {
      @trigger_error('Calling ' . __METHOD__ . '() without the $time argument is deprecated in drupal:10.1.0 and will be required in drupal:11.0.0. See https://www.drupal.org/node/3343743', E_USER_DEPRECATED);
      $time = \Drupal::service('datetime.time');
    }
    $this->time = $time;
    if (!isset($queue_config)) {
      @trigger_error('Calling ' . __METHOD__ . '() without the $queue_config argument is deprecated in drupal:10.1.0 and will be required in drupal:11.0.0. See https://www.drupal.org/node/3343743', E_USER_DEPRECATED);
      $queue_config = \Drupal::getContainer()->getParameter('queue.config');
    }
    $this->queueConfig = $queue_config + [
      'suspendMaximumWait' => 30.0,
    ];
  }

  /**
@@ -167,20 +189,86 @@ protected function setCronLastTime() {
   * Processes cron queues.
   */
  protected function processQueues() {
    // Grab the defined cron queues.
    foreach ($this->queueManager->getDefinitions() as $queue_name => $info) {
      if (isset($info['cron'])) {
    $max_wait = (float) $this->queueConfig['suspendMaximumWait'];

    $queues = array_filter(
      array_values($this->queueManager->getDefinitions()),
      function (array $queueInfo) {
        return isset($queueInfo['cron']);
      }
    );

    // Build a stack of queues to work on.
    /** @var array<array{process_from: int<0, max>, queue: \Drupal\Core\Queue\QueueInterface, worker: \Drupal\Core\Queue\QueueWorkerInterface}> $queues */
    $queues = array_map(function (array $queue_info) {
      $queue_name = $queue_info['id'];
      $queue = $this->queueFactory->get($queue_name);
      // Make sure every queue exists. There is no harm in trying to recreate
      // an existing queue.
        $this->queueFactory->get($queue_name)->createQueue();
      $queue->createQueue();
      $worker = $this->queueManager->createInstance($queue_name);
      return [
        // Set process_from to zero so each queue is always processed
        // immediately for the first time. This process_from timestamp will
        // change if a queue throws a delayable SuspendQueueException.
        'process_from' => 0,
        'queue' => $queue,
        'worker' => $worker,
      ];
    }, $queues);

    // Work through stack of queues, re-adding to the stack when a delay is
    // necessary.
    while ($item = array_shift($queues)) {
      [
        'queue' => $queue,
        'worker' => $worker,
        'process_from' => $process_from,
      ] = $item;

      // Each queue will be processed immediately when it is reached for the
      // first time, as zero > currentTime will never be true.
      if ($process_from > $this->time->getCurrentMicroTime()) {
        $this->usleep(round($process_from - $this->time->getCurrentMicroTime(), 3) * 1000000);
      }

        $queue_worker = $this->queueManager->createInstance($queue_name);
        $end = $this->time->getCurrentTime() + $info['cron']['time'];
        $queue = $this->queueFactory->get($queue_name);
        $lease_time = $info['cron']['time'];
      try {
        $this->processQueue($queue, $worker);
      }
      catch (SuspendQueueException $e) {
        // Return to this queue after processing other queues if the delay is
        // within the threshold.
        if ($e->isDelayable() && ($e->getDelay() < $max_wait)) {
          $item['process_from'] = $this->time->getCurrentMicroTime() + $e->getDelay();
          // Place this queue back in the stack for processing later.
          array_push($queues, $item);
        }
      }

      // Reorder the queue by next 'process_from' timestamp.
      usort($queues, function (array $queueA, array $queueB) {
        return $queueA['process_from'] <=> $queueB['process_from'];
      });
    }
  }

  /**
   * Processes a cron queue.
   *
   * @param \Drupal\Core\Queue\QueueInterface $queue
   *   The queue.
   * @param \Drupal\Core\Queue\QueueWorkerInterface $worker
   *   The queue worker.
   *
   * @throws \Drupal\Core\Queue\SuspendQueueException
   *   If the queue was suspended.
   */
  protected function processQueue(QueueInterface $queue, QueueWorkerInterface $worker) {
    $lease_time = $worker->getPluginDefinition()['cron']['time'];
    $end = $this->time->getCurrentTime() + $lease_time;
    while ($this->time->getCurrentTime() < $end && ($item = $queue->claimItem($lease_time))) {
      try {
            $queue_worker->processItem($item->data);
        $worker->processItem($item->data);
        $queue->deleteItem($item);
      }
      catch (DelayedRequeueException $e) {
@@ -195,21 +283,21 @@ protected function processQueues() {
          $queue->delayItem($item, $e->getDelay());
        }
      }
          catch (RequeueException $e) {
      catch (RequeueException) {
        // The worker requested the task be immediately requeued.
        $queue->releaseItem($item);
      }
      catch (SuspendQueueException $e) {
            // If the worker indicates the whole queue should be skipped,
            // release the item and go to the next queue.
        // If the worker indicates the whole queue should be skipped, release
        // the item and go to the next queue.
        $queue->releaseItem($item);

        $this->logger->debug('A worker for @queue queue suspended further processing of the queue.', [
              '@queue' => $queue_name,
          '@queue' => $worker->getPluginId(),
        ]);

        // Skip to the next queue.
            continue 2;
        throw $e;
      }
      catch (\Exception $e) {
        // In case of any other kind of exception, log it and leave the item
@@ -218,8 +306,6 @@ protected function processQueues() {
      }
    }
  }
    }
  }

  /**
   * Invokes any cron handlers implementing hook_cron.
@@ -266,4 +352,14 @@ protected function invokeCronHandlers() {
    }
  }

  /**
   * Delay execution in microseconds.
   *
   * @param int $microseconds
   *   Halt time in microseconds.
   */
  protected function usleep(int $microseconds): void {
    usleep($microseconds);
  }

}
+51 −1
Original line number Diff line number Diff line
@@ -12,4 +12,54 @@
 * that workers of subsequent items would encounter it too. For example, if a
 * remote site that the queue worker depends on appears to be inaccessible.
 */
class SuspendQueueException extends \RuntimeException {}
class SuspendQueueException extends \RuntimeException {

  /**
   * Seconds to wait before resuming the queue, or NULL if unknown.
   *
   * @var float|null
   */
  protected $delay = NULL;

  /**
   * Constructs a SuspendQueueException.
   *
   * @param string $message
   *   The error message.
   * @param int $code
   *   The error code.
   * @param \Throwable|null $previous
   *   The previous throwable used for the exception chaining.
   * @param float|null $delay
   *   If the time for when the queue will be ready to resume processing is
   *   known, pass an interval in seconds. Otherwise NULL if the time to resume
   *   processing the queue is not known.
   */
  public function __construct(string $message = '', int $code = 0, \Throwable $previous = NULL, ?float $delay = NULL) {
    parent::__construct($message, $code, $previous);
    $this->delay = $delay;
  }

  /**
   * Get the desired delay interval for this item.
   *
   * @return float|null
   *   If the time for when the queue will be ready to resume processing is
   *   known, pass an interval in seconds. Otherwise NULL if the time to resume
   *   processing the queue is not known.
   */
  public function getDelay(): ?float {
    return $this->delay;
  }

  /**
   * Determine whether the next time the queue should be checked is known.
   *
   * @return bool
   *   Whether the time to resume processing the queue is known.
   */
  public function isDelayable(): bool {
    return isset($this->delay);
  }

}
+1 −0
Original line number Diff line number Diff line
@@ -73,6 +73,7 @@ protected function setUp(): void {

    $time = $this->prophesize('Drupal\Component\Datetime\TimeInterface');
    $time->getCurrentTime()->willReturn($this->currentTime);
    $time->getCurrentMicroTime()->willReturn(100.0);
    $time->getRequestTime()->willReturn($this->currentTime);
    \Drupal::getContainer()->set('datetime.time', $time->reveal());
    $this->assertEquals($this->currentTime, \Drupal::time()->getCurrentTime());
Loading