Loading src/Commands/AdvancedQueueCommands.php +12 −0 Original line number Diff line number Diff line Loading @@ -61,6 +61,18 @@ class AdvancedQueueCommands extends DrushCommands { throw new \Exception(dt('Could not find queue "@queue_id".', ['@queue_id' => $queue_id])); } if (extension_loaded('pcntl')) { pcntl_async_signals(true); pcntl_signal(SIGTERM, function () { $this->processor->stop(); }); pcntl_signal(SIGINT, function () { $this->processor->stop(); }); } $start = microtime(TRUE); $num_processed = $this->processor->processQueue($queue); $elapsed = microtime(TRUE) - $start; Loading src/Processor.php +20 −0 Original line number Diff line number Diff line Loading @@ -36,6 +36,13 @@ class Processor implements ProcessorInterface { */ protected $jobTypeManager; /** * Indicates if the processor should stop. * * @var bool */ protected $shouldStop = FALSE; /** * Constructs a new Processor object. * Loading @@ -58,6 +65,9 @@ class Processor implements ProcessorInterface { public function processQueue(QueueInterface $queue) { // Start from a clean slate. $queue->getBackend()->cleanupQueue(); $this->shouldStop = FALSE; // Allow unlimited processing time only on the CLI. $processing_time = $queue->getProcessingTime(); if ($processing_time == 0 && PHP_SAPI != 'cli') { Loading @@ -67,6 +77,9 @@ class Processor implements ProcessorInterface { $num_processed = 0; while (TRUE) { if ($this->shouldStop) { break; } $job = $queue->getBackend()->claimJob(); if (!$job) { // The queue is empty. Stop here. Loading Loading @@ -127,4 +140,11 @@ class Processor implements ProcessorInterface { return $result; } /** * {@inheritdoc} */ public function stop() { $this->shouldStop = TRUE; } } src/ProcessorInterface.php +7 −0 Original line number Diff line number Diff line Loading @@ -36,4 +36,11 @@ interface ProcessorInterface { */ public function processJob(Job $job, QueueInterface $queue); /** * Stops the processing of the queue. * * @return void */ public function stop(); } Loading
src/Commands/AdvancedQueueCommands.php +12 −0 Original line number Diff line number Diff line Loading @@ -61,6 +61,18 @@ class AdvancedQueueCommands extends DrushCommands { throw new \Exception(dt('Could not find queue "@queue_id".', ['@queue_id' => $queue_id])); } if (extension_loaded('pcntl')) { pcntl_async_signals(true); pcntl_signal(SIGTERM, function () { $this->processor->stop(); }); pcntl_signal(SIGINT, function () { $this->processor->stop(); }); } $start = microtime(TRUE); $num_processed = $this->processor->processQueue($queue); $elapsed = microtime(TRUE) - $start; Loading
src/Processor.php +20 −0 Original line number Diff line number Diff line Loading @@ -36,6 +36,13 @@ class Processor implements ProcessorInterface { */ protected $jobTypeManager; /** * Indicates if the processor should stop. * * @var bool */ protected $shouldStop = FALSE; /** * Constructs a new Processor object. * Loading @@ -58,6 +65,9 @@ class Processor implements ProcessorInterface { public function processQueue(QueueInterface $queue) { // Start from a clean slate. $queue->getBackend()->cleanupQueue(); $this->shouldStop = FALSE; // Allow unlimited processing time only on the CLI. $processing_time = $queue->getProcessingTime(); if ($processing_time == 0 && PHP_SAPI != 'cli') { Loading @@ -67,6 +77,9 @@ class Processor implements ProcessorInterface { $num_processed = 0; while (TRUE) { if ($this->shouldStop) { break; } $job = $queue->getBackend()->claimJob(); if (!$job) { // The queue is empty. Stop here. Loading Loading @@ -127,4 +140,11 @@ class Processor implements ProcessorInterface { return $result; } /** * {@inheritdoc} */ public function stop() { $this->shouldStop = TRUE; } }
src/ProcessorInterface.php +7 −0 Original line number Diff line number Diff line Loading @@ -36,4 +36,11 @@ interface ProcessorInterface { */ public function processJob(Job $job, QueueInterface $queue); /** * Stops the processing of the queue. * * @return void */ public function stop(); }