Commit 9e43704a authored by catch's avatar catch

Issue #1468244 by amateescu, aspilicious: Convert DrupalQueue system to PSR-0.

parent d4442020
......@@ -5264,7 +5264,7 @@ function drupal_cron_run() {
// Make sure every queue exists. There is no harm in trying to recreate an
// existing queue.
foreach ($queues as $queue_name => $info) {
DrupalQueue::get($queue_name)->createQueue();
queue($queue_name)->createQueue();
}
// Register shutdown callback.
drupal_register_shutdown_function('drupal_cron_cleanup');
......@@ -5294,7 +5294,7 @@ function drupal_cron_run() {
foreach ($queues as $queue_name => $info) {
$function = $info['worker callback'];
$end = time() + (isset($info['time']) ? $info['time'] : 15);
$queue = DrupalQueue::get($queue_name);
$queue = queue($queue_name);
while (time() < $end && ($item = $queue->claimItem())) {
$function($item->data);
$queue->deleteItem($item);
......@@ -7863,3 +7863,97 @@ function drupal_get_filetransfer_info() {
}
return $info;
}
/**
* @defgroup queue Queue operations
* @{
* Queue items to allow later processing.
*
* The queue system allows placing items in a queue and processing them later.
* The system tries to ensure that only one consumer can process an item.
*
* Before a queue can be used it needs to be created by
* Drupal\Core\Queue\QueueInterface::createQueue().
*
* Items can be added to the queue by passing an arbitrary data object to
* Drupal\Core\Queue\QueueInterface::createItem().
*
* To process an item, call Drupal\Core\Queue\QueueInterface::claimItem() and
* specify how long you want to have a lease for working on that item.
* When finished processing, the item needs to be deleted by calling
* Drupal\Core\Queue\QueueInterface::deleteItem(). If the consumer dies, the
* item will be made available again by the Drupal\Core\Queue\QueueInterface
* implementation once the lease expires. Another consumer will then be able to
* receive it when calling Drupal\Core\Queue\QueueInterface::claimItem().
* Due to this, the processing code should be aware that an item might be handed
* over for processing more than once.
*
* The $item object used by the Drupal\Core\Queue\QueueInterface can contain
* arbitrary metadata depending on the implementation. Systems using the
* interface should only rely on the data property which will contain the
* information passed to Drupal\Core\Queue\QueueInterface::createItem().
* The full queue item returned by Drupal\Core\Queue\QueueInterface::claimItem()
* needs to be passed to Drupal\Core\Queue\QueueInterface::deleteItem() once
* processing is completed.
*
* There are two kinds of queue backends available: reliable, which preserves
* the order of messages and guarantees that every item will be executed at
* least once. The non-reliable kind only does a best effort to preserve order
* in messages and to execute them at least once but there is a small chance
* that some items get lost. For example, some distributed back-ends like
* Amazon SQS will be managing jobs for a large set of producers and consumers
* where a strict FIFO ordering will likely not be preserved. Another example
* would be an in-memory queue backend which might lose items if it crashes.
* However, such a backend would be able to deal with significantly more writes
* than a reliable queue and for many tasks this is more important. See
* aggregator_cron() for an example of how to effectively utilize a
* non-reliable queue. Another example is doing Twitter statistics -- the small
* possibility of losing a few items is insignificant next to power of the
* queue being able to keep up with writes. As described in the processing
* section, regardless of the queue being reliable or not, the processing code
* should be aware that an item might be handed over for processing more than
* once (because the processing code might time out before it finishes).
*/
/**
* Instantiates and statically caches the correct class for a queue.
*
* The following variables can be set by variable_set or $conf overrides:
* - queue_class_$name: the class to be used for the queue $name.
* - queue_default_class: the class to use when queue_class_$name is not
* defined. Defaults to Drupal\Core\Queue\System, a reliable backend using
* SQL.
* - queue_default_reliable_class: the class to use when queue_class_$name is
* not defined and the queue_default_class is not reliable. Defaults to
* Drupal\Core\Queue\System.
*
* @param string $name
* The name of the queue to work with.
* @param bool $reliable
* TRUE if the ordering of items and guaranteeing every item executes at
* least once is important, FALSE if scalability is the main concern. Defaults
* to FALSE.
*
* @return Drupal\Core\Queue\QueueInterface
* The queue object for a given name.
*
* @see Drupal\Core\Queue\QueueInterface
*/
function queue($name, $reliable = FALSE) {
static $queues;
if (!isset($queues[$name])) {
$class = variable_get('queue_class_' . $name, NULL);
if ($class && $reliable && in_array('Drupal\Core\Queue\ReliableQueueInterface', class_implements($class))) {
$class = variable_get('queue_default_reliable_class', 'Drupal\Core\Queue\System');
}
elseif (!$class) {
$class = variable_get('queue_default_class', 'Drupal\Core\Queue\System');
}
$queues[$name] = new $class($name);
}
return $queues[$name];
}
/**
* @} End of "defgroup queue".
*/
......@@ -4641,8 +4641,9 @@ function &batch_get() {
/**
* Populates a job queue with the operations of a batch set.
*
* Depending on whether the batch is progressive or not, the BatchQueue or
* BatchMemoryQueue handler classes will be used.
* Depending on whether the batch is progressive or not, the
* Drupal\Core\Queue\Batch or Drupal\Core\Queue\BatchMemory handler classes will
* be used.
*
* @param $batch
* The batch array.
......@@ -4659,7 +4660,7 @@ function _batch_populate_queue(&$batch, $set_id) {
$batch_set += array(
'queue' => array(
'name' => 'drupal_batch:' . $batch['id'] . ':' . $set_id,
'class' => $batch['progressive'] ? 'BatchQueue' : 'BatchMemoryQueue',
'class' => $batch['progressive'] ? 'Drupal\Core\Queue\Batch' : 'Drupal\Core\Queue\BatchMemory',
),
);
......@@ -4685,12 +4686,8 @@ function _batch_populate_queue(&$batch, $set_id) {
function _batch_queue($batch_set) {
static $queues;
// The class autoloader is not available when running update.php, so make
// sure the files are manually included.
if (!isset($queues)) {
$queues = array();
require_once DRUPAL_ROOT . '/core/modules/system/system.queue.inc';
require_once DRUPAL_ROOT . '/core/includes/batch.queue.inc';
}
if (isset($batch_set['queue'])) {
......
......@@ -2,28 +2,30 @@
/**
* @file
* Queue handlers used by the Batch API.
*
* These implementations:
* - Ensure FIFO ordering.
* - Allow an item to be repeatedly claimed until it is actually deleted (no
* notion of lease time or 'expire' date), to allow multipass operations.
* Definition of Drupal\Core\Queue\Batch.
*/
namespace Drupal\Core\Queue;
/**
* Defines a batch queue.
* Defines a batch queue handler used by the Batch API.
*
* This implementation:
* - Ensures FIFO ordering.
* - Allows an item to be repeatedly claimed until it is actually deleted (no
* notion of lease time or 'expire' date), to allow multipass operations.
*
* Stale items from failed batches are cleaned from the {queue} table on cron
* using the 'created' date.
*/
class BatchQueue extends SystemQueue {
class Batch extends System {
/**
* Overrides SystemQueue::claimItem().
* Overrides Drupal\Core\Queue\System::claimItem().
*
* Unlike SystemQueue::claimItem(), this method provides a default lease
* time of 0 (no expiration) instead of 30. This allows the item to be
* claimed repeatedly until it is deleted.
* Unlike Drupal\Core\Queue\System::claimItem(), this method provides a
* default lease time of 0 (no expiration) instead of 30. This allows the item
* to be claimed repeatedly until it is deleted.
*/
public function claimItem($lease_time = 0) {
$item = db_query_range('SELECT data, item_id FROM {queue} q WHERE name = :name ORDER BY item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
......@@ -37,7 +39,11 @@ public function claimItem($lease_time = 0) {
/**
* Retrieves all remaining items in the queue.
*
* This is specific to Batch API and is not part of the DrupalQueueInterface.
* This is specific to Batch API and is not part of the
* Drupal\Core\Queue\QueueInterface.
*
* @return array
* An array of queue items.
*/
public function getAllItems() {
$result = array();
......@@ -48,37 +54,3 @@ public function getAllItems() {
return $result;
}
}
/**
* Defines a batch queue for non-progressive batches.
*/
class BatchMemoryQueue extends MemoryQueue {
/**
* Overrides MemoryQueue::claimItem().
*
* Unlike MemoryQueue::claimItem(), this method provides a default lease
* time of 0 (no expiration) instead of 30. This allows the item to be
* claimed repeatedly until it is deleted.
*/
public function claimItem($lease_time = 0) {
if (!empty($this->queue)) {
reset($this->queue);
return current($this->queue);
}
return FALSE;
}
/**
* Retrieves all remaining items in the queue.
*
* This is specific to Batch API and is not part of the DrupalQueueInterface.
*/
public function getAllItems() {
$result = array();
foreach ($this->queue as $item) {
$result[] = $item->data;
}
return $result;
}
}
<?php
/**
* @file
* Definition of Drupal\Core\Queue\BatchMemory.
*/
namespace Drupal\Core\Queue;
/**
* Defines a batch queue handler used by the Batch API for non-progressive
* batches.
*
* This implementation:
* - Ensures FIFO ordering.
* - Allows an item to be repeatedly claimed until it is actually deleted (no
* notion of lease time or 'expire' date), to allow multipass operations.
*/
class BatchMemory extends Memory {
/**
* Overrides Drupal\Core\Queue\Memory::claimItem().
*
* Unlike Drupal\Core\Queue\Memory::claimItem(), this method provides a
* default lease time of 0 (no expiration) instead of 30. This allows the item
* to be claimed repeatedly until it is deleted.
*/
public function claimItem($lease_time = 0) {
if (!empty($this->queue)) {
reset($this->queue);
return current($this->queue);
}
return FALSE;
}
/**
* Retrieves all remaining items in the queue.
*
* This is specific to Batch API and is not part of the
* Drupal\Core\Queue\QueueInterface.
*
* @return array
* An array of queue items.
*/
public function getAllItems() {
$result = array();
foreach ($this->queue as $item) {
$result[] = $item->data;
}
return $result;
}
}
<?php
/**
* @file
* Definition of Drupal\Core\Queue\Memory.
*/
namespace Drupal\Core\Queue;
use stdClass;
/**
* Static queue implementation.
*
* This allows "undelayed" variants of processes relying on the Queue
* interface. The queue data resides in memory. It should only be used for
* items that will be queued and dequeued within a given page request.
*/
class Memory implements QueueInterface {
/**
* The queue data.
*
* @var array
*/
protected $queue;
/**
* Counter for item ids.
*
* @var int
*/
protected $idSequence;
/**
* Implements Drupal\Core\Queue\QueueInterface::__construct().
*/
public function __construct($name) {
$this->queue = array();
$this->idSequence = 0;
}
/**
* Implements Drupal\Core\Queue\QueueInterface::createItem().
*/
public function createItem($data) {
$item = new stdClass();
$item->item_id = $this->idSequence++;
$item->data = $data;
$item->created = time();
$item->expire = 0;
$this->queue[$item->item_id] = $item;
}
/**
* Implements Drupal\Core\Queue\QueueInterface::numberOfItems().
*/
public function numberOfItems() {
return count($this->queue);
}
/**
* Implements Drupal\Core\Queue\QueueInterface::claimItem().
*/
public function claimItem($lease_time = 30) {
foreach ($this->queue as $key => $item) {
if ($item->expire == 0) {
$item->expire = time() + $lease_time;
$this->queue[$key] = $item;
return $item;
}
}
return FALSE;
}
/**
* Implements Drupal\Core\Queue\QueueInterface::deleteItem().
*/
public function deleteItem($item) {
unset($this->queue[$item->item_id]);
}
/**
* Implements Drupal\Core\Queue\QueueInterface::releaseItem().
*/
public function releaseItem($item) {
if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
$this->queue[$item->item_id]->expire = 0;
return TRUE;
}
return FALSE;
}
/**
* Implements Drupal\Core\Queue\QueueInterface::createQueue().
*/
public function createQueue() {
// Nothing needed here.
}
/**
* Implements Drupal\Core\Queue\QueueInterface::deleteQueue().
*/
public function deleteQueue() {
$this->queue = array();
$this->idSequence = 0;
}
}
<?php
/**
* @file
* Definition of Drupal\Core\Queue\QueueInterface.
*/
namespace Drupal\Core\Queue;
/**
* Interface for a queue.
*
* Classes implementing this interface will do a best effort to preserve order
* in messages and to execute them at least once.
*/
interface QueueInterface {
/**
* Start working with a queue.
*
* @param $name
* Arbitrary string. The name of the queue to work with.
*/
public function __construct($name);
/**
* Adds a queue item and store it directly to the queue.
*
* @param $data
* Arbitrary data to be associated with the new task in the queue.
*
* @return
* TRUE if the item was successfully created and was (best effort) added
* to the queue, otherwise FALSE. We don't guarantee the item was
* committed to disk etc, but as far as we know, the item is now in the
* queue.
*/
public function createItem($data);
/**
* Retrieves the number of items in the queue.
*
* This is intended to provide a "best guess" count of the number of items in
* the queue. Depending on the implementation and the setup, the accuracy of
* the results of this function may vary.
*
* e.g. On a busy system with a large number of consumers and items, the
* result might only be valid for a fraction of a second and not provide an
* accurate representation.
*
* @return
* An integer estimate of the number of items in the queue.
*/
public function numberOfItems();
/**
* Claims an item in the queue for processing.
*
* @param $lease_time
* How long the processing is expected to take in seconds, defaults to an
* hour. After this lease expires, the item will be reset and another
* consumer can claim the item. For idempotent tasks (which can be run
* multiple times without side effects), shorter lease times would result
* in lower latency in case a consumer fails. For tasks that should not be
* run more than once (non-idempotent), a larger lease time will make it
* more rare for a given task to run multiple times in cases of failure,
* at the cost of higher latency.
*
* @return
* On success we return an item object. If the queue is unable to claim an
* item it returns false. This implies a best effort to retrieve an item
* and either the queue is empty or there is some other non-recoverable
* problem.
*/
public function claimItem($lease_time = 3600);
/**
* Deletes a finished item from the queue.
*
* @param $item
* The item returned by Drupal\Core\Queue\QueueInterface::claimItem().
*/
public function deleteItem($item);
/**
* Releases an item that the worker could not process.
*
* Another worker can come in and process it before the timeout expires.
*
* @param $item
* The item returned by Drupal\Core\Queue\QueueInterface::claimItem().
*
* @return boolean
* TRUE if the item has been released, FALSE otherwise.
*/
public function releaseItem($item);
/**
* Creates a queue.
*
* Called during installation and should be used to perform any necessary
* initialization operations. This should not be confused with the
* constructor for these objects, which is called every time an object is
* instantiated to operate on a queue. This operation is only needed the
* first time a given queue is going to be initialized (for example, to make
* a new database table or directory to hold tasks for the queue -- it
* depends on the queue implementation if this is necessary at all).
*/
public function createQueue();
/**
* Deletes a queue and every item in the queue.
*/
public function deleteQueue();
}
<?php
/**
* @file
* Definition of Drupal\Core\Queue\ReliableQueueInterface.
*/
namespace Drupal\Core\Queue;
/**
* Reliable queue interface.
*
* Classes implementing this interface preserve the order of messages and
* guarantee that every item will be executed at least once.
*/
interface ReliableQueueInterface extends QueueInterface {
}
<?php
/**
* @file
* Definition of Drupal\Core\Queue\System.
*/
namespace Drupal\Core\Queue;
/**
* Default queue implementation.
*/
class System implements ReliableQueueInterface {
/**
* The name of the queue this instance is working with.
*
* @var string
*/
protected $name;
/**
* Implements Drupal\Core\Queue\QueueInterface::__construct().
*/
public function __construct($name) {
$this->name = $name;
}
/**
* Implements Drupal\Core\Queue\QueueInterface::createItem().
*/
public function createItem($data) {
// During a Drupal 6.x to 8.x update, drupal_get_schema() does not contain
// the queue table yet, so we cannot rely on drupal_write_record().
$query = db_insert('queue')
->fields(array(
'name' => $this->name,
'data' => serialize($data),
// We cannot rely on REQUEST_TIME because many items might be created
// by a single request which takes longer than 1 second.
'created' => time(),
));
return (bool) $query->execute();
}
/**
* Implements Drupal\Core\Queue\QueueInterface::numberOfItems().
*/
public function numberOfItems() {
return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
}
/**
* Implements Drupal\Core\Queue\QueueInterface::claimItem().
*/
public function claimItem($lease_time = 30) {
// Claim an item by updating its expire fields. If claim is not successful
// another thread may have claimed the item in the meantime. Therefore loop
// until an item is successfully claimed or we are reasonably sure there
// are no unclaimed items left.
while (TRUE) {
$item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
if ($item) {
// Try to update the item. Only one thread can succeed in UPDATEing the
// same row. We cannot rely on REQUEST_TIME because items might be
// claimed by a single consumer which runs longer than 1 second. If we
// continue to use REQUEST_TIME instead of the current time(), we steal
// time from the lease, and will tend to reset items before the lease
// should really expire.
$update = db_update('queue')
->fields(array(
'expire' => time() + $lease_time,
))
->condition('item_id', $item->item_id)
->condition('expire', 0);
// If there are affected rows, this update succeeded.
if ($update->execute()) {
$item->data = unserialize($item->data);
return $item;
}
}
else {
// No items currently available to claim.
return FALSE;
}
}
}
/**
* Implements Drupal\Core\Queue\QueueInterface::releaseItem().
*/
public function releaseItem($item) {
$update = db_update('queue')
->fields(array(
'expire' => 0,
))
->condition('item_id', $item->item_id);
return $update->execute();
}
/**
* Implements Drupal\Core\Queue\QueueInterface::deleteItem().
*/
public function deleteItem($item) {
db_delete('queue')
->condition('item_id', $item->item_id)
->execute();
}
/**
* Implements Drupal\Core\Queue\QueueInterface::createQueue().
*/
public function createQueue() {
// All tasks are stored in a single database table (which is created when
// Drupal is first installed) so there is nothing we need to do to create
// a new queue.
}
/**
* Implements Drupal\Core\Queue\QueueInterface::deleteQueue().
*/
public function deleteQueue() {
db_delete('queue')
->condition('name', $this->name)
->execute();
}
}
......@@ -313,7 +313,7 @@ function aggregator_cron() {
':time' => REQUEST_TIME,
':never' => AGGREGATOR_CLEAR_NEVER
));
$queue = DrupalQueue::get('aggregator_feeds');
$queue = queue('aggregator_feeds');
foreach ($result as $feed) {
if ($queue->createItem($feed)) {
// Add timestamp to avoid queueing item more than once.
......
......@@ -29,6 +29,7 @@ files[] = tests/module.test
files[] = tests/pager.test
files[] = tests/password.test
files[] = tests/path.test
files[] = tests/queue.test
files[] = tests/registry.test
files[] = tests/schema.test
files[] = tests/session.test
......
<?php
use Drupal\Core\Queue\Memory;
use Drupal\Core\Queue\System;
/**
* Tests the basic queue functionality.
*/
class QueueTestCase extends DrupalWebTestCase {
public static function getInfo() {
return array(
'name' => 'Queue functionality',
'description' => 'Queues and dequeues a set of items to check the basic queue functionality.',
'group' => 'Queue',
);
}
/**
* Tests the System queue.
*/
public function testSystemQueue() {
// Create two queues.
$queue1 = new System($this->randomName());
$queue1->createQueue();
$queue2 = new System($this->randomName());
$queue2->createQueue();
$this->queueTest($queue1, $queue2);
}
/**
* Tests the Memory queue.
*/
public function testMemoryQueue() {
// Create two queues.
$queue1 = new Memory($this->randomName());
$queue1->createQueue();
$queue2 = new Memory($this->randomName());
$queue2->createQueue();
$this->queueTest($queue1, $queue2);
}
/**
* Queues and dequeues a set of items to check the basic queue functionality.
*
* @param Drupal\Core\Queue\QueueInterface $queue1
* An instantiated queue object.
* @param Drupal\Core\Queue\QueueInterface $queue2
* An instantiated queue object.
*/
protected function queueTest($queue1, $queue2) {
// Create four items.
$data = array();
for ($i = 0; $i < 4; $i++) {
$data[] = array($this->randomName() => $this->randomName());
}