Loading src/Commands/AdvancedQueueCommands.php +12 −1 Original line number Diff line number Diff line Loading @@ -48,12 +48,18 @@ class AdvancedQueueCommands extends DrushCommands { * * @param string $queue_id * The queue ID. * @param array $options * The options passed to this drush function. * * @throws \Exception * * @command advancedqueue:queue:process * @option timeout The maximum execution time of the script. Be warned that this is a rough estimate as the time is only checked between two items. * @usage advancedqueue:queue:process queuename --timeout=60 * Create a daemon-esque process for 60 seconds to process the * {queuename} queue. After this, the process will complete. */ public function process($queue_id) { public function process($queue_id, array $options = ['timeout' => 90]) { $queue_storage = $this->entityTypeManager->getStorage('advancedqueue_queue'); /** @var \Drupal\advancedqueue\Entity\QueueInterface $queue */ $queue = $queue_storage->load($queue_id); Loading @@ -73,6 +79,11 @@ class AdvancedQueueCommands extends DrushCommands { }); } // Set the processing time for this Drush command. Note: it is up to // Processor implementations to handle this. See the default // \Drupal\advancedqueue\Processor class for an example of this. $queue->setProcessingTime((int) $options['timeout']); $start = microtime(TRUE); $num_processed = $this->processor->processQueue($queue); $elapsed = microtime(TRUE) - $start; Loading src/Processor.php +9 −7 Original line number Diff line number Diff line Loading @@ -96,18 +96,20 @@ class Processor implements ProcessorInterface { if ($this->shouldStop) { break; } $job = $queue->getBackend()->claimJob(); if (!$job) { // The queue is empty. Stop here. break; } $this->processJob($job, $queue); $num_processed++; if ($processing_time && $this->time->getCurrentTime() >= $expected_end) { // Time limit reached. Stop here. break; } $job = $queue->getBackend()->claimJob(); if (!$job) { // No item processed in that round, let the CPU rest. sleep(1); continue; } $this->processJob($job, $queue); $num_processed++; } return $num_processed; Loading tests/modules/advancedqueue_test/src/Plugin/AdvancedQueue/JobType/Retry.php +1 −1 Original line number Diff line number Diff line Loading @@ -13,7 +13,7 @@ use Drupal\advancedqueue\Plugin\AdvancedQueue\JobType\JobTypeBase; * id = "retry", * label = @Translation("Retry"), * max_retries = 1, * retry_delay = 1, * retry_delay = 5, * ) */ class Retry extends JobTypeBase { Loading tests/src/Kernel/ProcessorTest.php +3 −2 Original line number Diff line number Diff line Loading @@ -108,6 +108,7 @@ class ProcessorTest extends KernelTestBase { * @dataProvider retryJobProvider */ public function testRetry(Job $job) { $this->queue->setProcessingTime(2); $this->queue->enqueueJob($job); // Confirm that the job has been requeued. Loading @@ -121,7 +122,7 @@ class ProcessorTest extends KernelTestBase { $this->assertEquals(0, $num_processed); // Confirm that the job was re-processed, and left after the $retry_limit. sleep(1); sleep(5); $num_processed = $this->processor->processQueue($this->queue); $this->assertEquals(1, $num_processed); $counts = $this->queue->getBackend()->countJobs(); Loading Loading @@ -151,7 +152,7 @@ class ProcessorTest extends KernelTestBase { 'expected_state' => Job::STATE_FAILURE, 'expected_message' => '', 'max_retries' => '1', 'retry_delay' => 1, 'retry_delay' => 5, ]); return [[$first_job], [$second_job]]; Loading Loading
src/Commands/AdvancedQueueCommands.php +12 −1 Original line number Diff line number Diff line Loading @@ -48,12 +48,18 @@ class AdvancedQueueCommands extends DrushCommands { * * @param string $queue_id * The queue ID. * @param array $options * The options passed to this drush function. * * @throws \Exception * * @command advancedqueue:queue:process * @option timeout The maximum execution time of the script. Be warned that this is a rough estimate as the time is only checked between two items. * @usage advancedqueue:queue:process queuename --timeout=60 * Create a daemon-esque process for 60 seconds to process the * {queuename} queue. After this, the process will complete. */ public function process($queue_id) { public function process($queue_id, array $options = ['timeout' => 90]) { $queue_storage = $this->entityTypeManager->getStorage('advancedqueue_queue'); /** @var \Drupal\advancedqueue\Entity\QueueInterface $queue */ $queue = $queue_storage->load($queue_id); Loading @@ -73,6 +79,11 @@ class AdvancedQueueCommands extends DrushCommands { }); } // Set the processing time for this Drush command. Note: it is up to // Processor implementations to handle this. See the default // \Drupal\advancedqueue\Processor class for an example of this. $queue->setProcessingTime((int) $options['timeout']); $start = microtime(TRUE); $num_processed = $this->processor->processQueue($queue); $elapsed = microtime(TRUE) - $start; Loading
src/Processor.php +9 −7 Original line number Diff line number Diff line Loading @@ -96,18 +96,20 @@ class Processor implements ProcessorInterface { if ($this->shouldStop) { break; } $job = $queue->getBackend()->claimJob(); if (!$job) { // The queue is empty. Stop here. break; } $this->processJob($job, $queue); $num_processed++; if ($processing_time && $this->time->getCurrentTime() >= $expected_end) { // Time limit reached. Stop here. break; } $job = $queue->getBackend()->claimJob(); if (!$job) { // No item processed in that round, let the CPU rest. sleep(1); continue; } $this->processJob($job, $queue); $num_processed++; } return $num_processed; Loading
tests/modules/advancedqueue_test/src/Plugin/AdvancedQueue/JobType/Retry.php +1 −1 Original line number Diff line number Diff line Loading @@ -13,7 +13,7 @@ use Drupal\advancedqueue\Plugin\AdvancedQueue\JobType\JobTypeBase; * id = "retry", * label = @Translation("Retry"), * max_retries = 1, * retry_delay = 1, * retry_delay = 5, * ) */ class Retry extends JobTypeBase { Loading
tests/src/Kernel/ProcessorTest.php +3 −2 Original line number Diff line number Diff line Loading @@ -108,6 +108,7 @@ class ProcessorTest extends KernelTestBase { * @dataProvider retryJobProvider */ public function testRetry(Job $job) { $this->queue->setProcessingTime(2); $this->queue->enqueueJob($job); // Confirm that the job has been requeued. Loading @@ -121,7 +122,7 @@ class ProcessorTest extends KernelTestBase { $this->assertEquals(0, $num_processed); // Confirm that the job was re-processed, and left after the $retry_limit. sleep(1); sleep(5); $num_processed = $this->processor->processQueue($this->queue); $this->assertEquals(1, $num_processed); $counts = $this->queue->getBackend()->countJobs(); Loading Loading @@ -151,7 +152,7 @@ class ProcessorTest extends KernelTestBase { 'expected_state' => Job::STATE_FAILURE, 'expected_message' => '', 'max_retries' => '1', 'retry_delay' => 1, 'retry_delay' => 5, ]); return [[$first_job], [$second_job]]; Loading