Verified Commit 7a35abe9 authored by alexpott's avatar alexpott
Browse files

Issue #3230541 by cliddell, jday, yogeshmpawar, neclimdul, cmlara, Charlie ChX...

Issue #3230541 by cliddell, jday, yogeshmpawar, neclimdul, cmlara, Charlie ChX Negyesi: Queue items only reserved by cron for 1 second

(cherry picked from commit 0fc50fa5)
parent fc89b10f
......@@ -52,13 +52,15 @@ class QueueWorker extends Plugin {
public $title;
/**
* An associative array containing an optional key.
* An associative array containing optional keys.
*
* This property is optional and it does not need to be declared.
*
* Available keys:
* - time (optional): How much time Drupal cron should spend on calling this
* worker in seconds. Defaults to 15.
* - lease_time: (optional) How long the lease is for a queue item when
* called from cron in seconds. Defaults to 30.
*
* @var array
*/
......
......@@ -174,10 +174,10 @@ protected function processQueues() {
$this->queueFactory->get($queue_name)->createQueue();
$queue_worker = $this->queueManager->createInstance($queue_name);
$end = time() + ($info['cron']['time'] ?? 15);
$end = $this->time->getCurrentTime() + $info['cron']['time'] ?? static::DEFAULT_QUEUE_CRON_TIME;
$queue = $this->queueFactory->get($queue_name);
$lease_time = isset($info['cron']['time']) ?: NULL;
while (time() < $end && ($item = $queue->claimItem($lease_time))) {
$lease_time = $info['cron']['lease_time'] ?? static::DEFAULT_QUEUE_CRON_LEASE_TIME;
while ($this->time->getCurrentTime() < $end && ($item = $queue->claimItem($lease_time))) {
try {
$queue_worker->processItem($item->data);
$queue->deleteItem($item);
......
......@@ -9,6 +9,16 @@
*/
interface CronInterface {
/**
* The default time cron should execute each queue in seconds.
*/
public const DEFAULT_QUEUE_CRON_TIME = 15;
/**
* The default lease time a queue item should get when called from cron.
*/
public const DEFAULT_QUEUE_CRON_LEASE_TIME = 30;
/**
* Executes a cron run.
*
......
......@@ -241,7 +241,7 @@ public function garbageCollection() {
try {
// Clean up the queue for failed batches.
$this->connection->delete(static::TABLE_NAME)
->condition('created', REQUEST_TIME - 864000, '<')
->condition('created', \Drupal::time()->getRequestTime() - 864000, '<')
->condition('name', 'drupal_batch:%', 'LIKE')
->execute();
......@@ -252,7 +252,7 @@ public function garbageCollection() {
'expire' => 0,
])
->condition('expire', 0, '<>')
->condition('expire', REQUEST_TIME, '<')
->condition('expire', \Drupal::time()->getRequestTime(), '<')
->execute();
}
catch (\Exception $e) {
......
......@@ -34,20 +34,6 @@ public function __construct(\Traversable $namespaces, CacheBackendInterface $cac
$this->alterInfo('queue_info');
}
/**
* {@inheritdoc}
*/
public function processDefinition(&$definition, $plugin_id) {
parent::processDefinition($definition, $plugin_id);
// Assign a default time if a cron is specified.
if (isset($definition['cron'])) {
$definition['cron'] += [
'time' => 15,
];
}
}
/**
* {@inheritdoc}
*
......
......@@ -11,7 +11,10 @@
* @QueueWorker(
* id = "cron_queue_test_database_delay_exception",
* title = @Translation("Database delay exception test"),
* cron = {"time" = 1}
* cron = {
* "time" = 1,
* "lease_time" = 2
* }
* )
*/
class CronQueueTestDatabaseDelayException extends QueueWorkerBase {
......
<?php
namespace Drupal\cron_queue_test\Plugin\QueueWorker;
use Drupal\Core\Queue\QueueWorkerBase;
/**
* @QueueWorker(
* id = "cron_queue_test_lease_time",
* title = @Translation("Lease time test"),
* cron = {
* "time" = 5,
* "lease_time" = 2
* }
* )
*/
class CronQueueTestLeaseTime extends QueueWorkerBase {
/**
* {@inheritdoc}
*/
public function processItem($data) {
$state = \Drupal::state();
$count = $state->get('cron_queue_test_lease_time', 0);
$count++;
$state->set('cron_queue_test_lease_time', $count);
throw new \Exception('Leave me queued and leased!');
}
}
......@@ -11,7 +11,10 @@
* @QueueWorker(
* id = "cron_queue_test_memory_delay_exception",
* title = @Translation("Memory delay exception test"),
* cron = {"time" = 1}
* cron = {
* "time" = 1,
* "lease_time" = 2
* }
* )
*/
class CronQueueTestMemoryDelayException extends QueueWorkerBase {
......
......@@ -89,8 +89,8 @@ public function testDelayException() {
// Get the queue worker plugin manager.
$manager = $this->container->get('plugin.manager.queue_worker');
$definitions = $manager->getDefinitions();
$this->assertNotEmpty($database_lease_time = $definitions['cron_queue_test_database_delay_exception']['cron']['time']);
$this->assertNotEmpty($memory_lease_time = $definitions['cron_queue_test_memory_delay_exception']['cron']['time']);
$this->assertNotEmpty($database_lease_time = $definitions['cron_queue_test_database_delay_exception']['cron']['lease_time']);
$this->assertNotEmpty($memory_lease_time = $definitions['cron_queue_test_memory_delay_exception']['cron']['lease_time']);
// Create the necessary test data and run cron.
$database->createItem('test');
......@@ -120,6 +120,30 @@ public function testDelayException() {
$this->assertEquals($this->currentTime + $memory_lease_time, reset($memory_queue_internal)->expire);
}
/**
* Tests that leases are expiring correctly, also within the same request.
*/
public function testLeaseTime() {
$queue = $this->container->get('queue')->get('cron_queue_test_lease_time');
$queue->createItem([$this->randomMachineName() => $this->randomMachineName()]);
$this->cron->run();
static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time'));
$this->cron->run();
static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time'));
// Set the expiration time to 3 seconds ago, so the lease should
// automatically expire.
\Drupal::database()
->update(DatabaseQueue::TABLE_NAME)
->fields(['expire' => $this->currentTime - 3])
->execute();
$this->cron->run();
static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time'));
$this->cron->run();
static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time'));
}
/**
* Tests that exceptions thrown by workers are handled properly.
*/
......@@ -145,7 +169,7 @@ public function testExceptions() {
// @see \Drupal\Core\Cron::processQueues()
$this->connection->update('queue')
->condition('name', 'cron_queue_test_exception')
->fields(['expire' => REQUEST_TIME - 1])
->fields(['expire' => \Drupal::time()->getRequestTime() - 1])
->execute();
$this->cron->run();
$this->assertEquals(2, \Drupal::state()->get('cron_queue_test_exception'));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment