Commit 767cd6e4 authored by John's avatar John Committed by Bojan Živanović
Browse files

Issue #3270631 by johnwebdev: Add signal handling

parent bb7d1a0e
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -61,6 +61,18 @@ class AdvancedQueueCommands extends DrushCommands {
      throw new \Exception(dt('Could not find queue "@queue_id".', ['@queue_id' => $queue_id]));
    }

    if (extension_loaded('pcntl')) {
      pcntl_async_signals(true);

      pcntl_signal(SIGTERM, function () {
        $this->processor->stop();
      });

      pcntl_signal(SIGINT, function () {
        $this->processor->stop();
      });
    }

    $start = microtime(TRUE);
    $num_processed = $this->processor->processQueue($queue);
    $elapsed = microtime(TRUE) - $start;
+20 −0
Original line number Diff line number Diff line
@@ -36,6 +36,13 @@ class Processor implements ProcessorInterface {
   */
  protected $jobTypeManager;

  /**
   * Indicates if the processor should stop.
   *
   * @var bool
   */
  protected $shouldStop = FALSE;

  /**
   * Constructs a new Processor object.
   *
@@ -58,6 +65,9 @@ class Processor implements ProcessorInterface {
  public function processQueue(QueueInterface $queue) {
    // Start from a clean slate.
    $queue->getBackend()->cleanupQueue();

    $this->shouldStop = FALSE;

    // Allow unlimited processing time only on the CLI.
    $processing_time = $queue->getProcessingTime();
    if ($processing_time == 0 && PHP_SAPI != 'cli') {
@@ -67,6 +77,9 @@ class Processor implements ProcessorInterface {
    $num_processed = 0;

    while (TRUE) {
      if ($this->shouldStop) {
        break;
      }
      $job = $queue->getBackend()->claimJob();
      if (!$job) {
        // The queue is empty. Stop here.
@@ -127,4 +140,11 @@ class Processor implements ProcessorInterface {
    return $result;
  }

  /**
   * {@inheritdoc}
   */
  public function stop() {
    $this->shouldStop = TRUE;
  }

}
+7 −0
Original line number Diff line number Diff line
@@ -36,4 +36,11 @@ interface ProcessorInterface {
   */
  public function processJob(Job $job, QueueInterface $queue);

  /**
   * Stops the processing of the queue.
   *
   * @return void
   */
  public function stop();

}