Commit 4ef77a59 authored by Dries's avatar Dries
Browse files

- Patch #391340 by chx, dww, neclimdul, Crell, alex_b, et al: job queue API.

The queue system allows placing items in a queue and processing them later.  The system tries to ensure that only one consumer can process an item.

Before a queue can be used it needs to be created by DrupalQueueInterface::createQueue().

Items can be added to the queue by passing an arbitrary data object to DrupalQueueInterface::createItem().

To process an item, call DrupalQueueInterface::claimItem() and specify how long you want to have a lease for working on that item. When finished processing, the item needs to be deleted by calling DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be made available again by the DrapalQueueInterface implementation once the lease expires. Another consumer will then be able to receive it when calling DrupalQueueInterface::claimItem().

The $item object used by the DrupalQueueInterface can contain arbitrary metadata depending on the implementation. Systems using the interface should only rely on the data property which will contain the information passed to DrupalQueueInterface::createItem(). The full queue item returned by DrupalQueueInterface::createItem() needs to be passed to DrupalQueueInterface::deleteItem() once processing is completed.

While the queue system makes a best effort to preserve order in messages, due to the pluggable nature of the queue, there is no guarantee that items will be delivered on claim in the order they were sent. For example, some implementations like beanstalkd or others with distributed back-ends like Amazon SQS will be managing jobs for a large set of producers and consumers where a strict FIFO ordering will likely not be preserved.

The system also makes no guarantees about a task only being executed once: callers that have non-idempotent tasks either need to live with the possiblity of the task being invoked multiple times in cases where a claim lease expires, or need to implement their own transactions to make their tasks idempotent.
parent 65e0a3f2
......@@ -6,6 +6,7 @@ version = VERSION
core = 7.x
files[] = system.module
files[] = system.admin.inc
files[] = system.queue.inc
files[] = image.gd.inc
files[] = system.install
required = TRUE
......@@ -1052,6 +1052,67 @@ function system_schema() {
'primary key' => array('mlid'),
);
$schema['queue'] = array(
'description' => 'Stores items in queues.',
'fields' => array(
'item_id' => array(
'type' => 'serial',
'unsigned' => TRUE,
'not null' => TRUE,
'description' => 'Primary Key: Unique item ID.',
),
'name' => array(
'type' => 'varchar',
'length' => 255,
'not null' => TRUE,
'default' => '',
'description' => 'The queue name.',
),
'consumer_id' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'The ID of the dequeuing consumer.',
),
'data' => array(
'type' => 'text',
'not null' => FALSE,
'size' => 'big',
'serialize' => TRUE,
'description' => 'The arbitrary data for the item.',
),
'expire' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Timestamp when the claim lease expires on the item.',
),
'created' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Timestamp when the item was created.',
),
),
'primary key' => array('item_id'),
'indexes' => array(
'consumer_queue' => array('consumer_id', 'name', 'created'),
'consumer_expire' => array('consumer_id', 'expire'),
),
);
$schema['queue_consumer_id'] = array(
'description' => 'Stores queue consumer IDs, used to auto-increment the consumer ID so that a unique consumer ID is used.',
'fields' => array(
'consumer_id' => array(
'type' => 'serial',
'not null' => TRUE,
'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.',
),
),
'primary key' => array('consumer_id'),
);
$schema['registry'] = array(
'description' => "Each record is a function, class, or interface name and the file it is in.",
'fields' => array(
......@@ -3298,6 +3359,76 @@ function system_update_7021() {
return $ret;
}
/**
* Add the queue tables.
*/
function system_update_7022() {
$schema['queue'] = array(
'description' => 'Stores items in queues.',
'fields' => array(
'item_id' => array(
'type' => 'serial',
'unsigned' => TRUE,
'not null' => TRUE,
'description' => 'Primary Key: Unique item ID.',
),
'name' => array(
'type' => 'varchar',
'length' => 255,
'not null' => TRUE,
'default' => '',
'description' => 'The queue name.',
),
'consumer_id' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'The ID of the dequeuing consumer.',
),
'data' => array(
'type' => 'text',
'not null' => FALSE,
'size' => 'big',
'serialize' => TRUE,
'description' => 'The arbitrary data for the item.',
),
'expire' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Timestamp when the claim lease expires on the item.',
),
'created' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Timestamp when the item was created.',
),
),
'primary key' => array('item_id'),
'indexes' => array(
'consumer_queue' => array('consumer_id', 'name', 'created'),
'consumer_expire' => array('consumer_id', 'expire'),
),
);
$schema['queue_consumer_id'] = array(
'description' => 'Stores queue consumer IDs, used to auto-incrament the consumer ID so that a unique consumer ID is used.',
'fields' => array(
'consumer_id' => array(
'type' => 'serial',
'not null' => TRUE,
'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.',
),
),
'primary key' => array('consumer_id'),
);
db_create_table($ret, 'queue', $schema['queue']);
db_create_table($ret, 'queue_consumer_id', $schema['queue_consumer_id']);
return $ret;
}
/**
* @} End of "defgroup updates-6.x-to-7.x"
* The next series of updates should start at 8000.
......
......@@ -1587,6 +1587,16 @@ function system_cron() {
foreach ($cache_tables as $table) {
cache_clear_all(NULL, $table);
}
// Reset expired items in the default queue implementation table. If that's
// not used, this will simply be a no-op.
db_update('queue')
->fields(array(
'consumer_id' => 0,
'expire' => 0,
))
->condition('expire', REQUEST_TIME, '<')
->execute();
}
/**
......
<?php
// $Id$
/**
* @file
* Queue functionality.
*/
/**
* @defgroup queue Queue operations
* @{
* The queue system allows placing items in a queue and processing them later.
* The system tries to ensure that only one consumer can process an item.
*
* Before a queue can be used it needs to be created by
* DrupalQueueInterface::createQueue().
*
* Items can be added to the queue by passing an arbitrary data object to
* DrupalQueueInterface::createItem().
*
* To process an item, call DrupalQueueInterface::claimItem() and specify how
* long you want to have a lease for working on that item. When finished
* processing, the item needs to be deleted by calling
* DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
* made available again by the DrapalQueueInterface implementation once the
* lease expires. Another consumer will then be able to receive it when calling
* DrupalQueueInterface::claimItem().
*
* The $item object used by the DrupalQueueInterface can contain arbitrary
* metadata depending on the implementation. Systems using the interface should
* only rely on the data property which will contain the information passed to
* DrupalQueueInterface::createItem(). The full queue item returned by
* DrupalQueueInterface::createItem() needs to be passed to
* DrupalQueueInterface::deleteItem() once processing is completed.
*
* While the queue system makes a best effort to preserve order in messages,
* due to the pluggable nature of the queue, there is no guarantee that items
* will be delivered on claim in the order they were sent. For example, some
* implementations like beanstalkd or others with distributed back-ends like
* Amazon SQS will be managing jobs for a large set of producers and consumers
* where a strict FIFO ordering will likely not be preserved.
*
* The system also makes no guarantees about a task only being executed once:
* callers that have non-idempotent tasks either need to live with the
* possiblity of the task being invoked multiple times in cases where a claim
* lease expires, or need to implement their own transactions to make their
* tasks idempotent.
*/
/**
* Factory class for interacting with queues.
*/
class DrupalQueue {
/**
* Get a queue object for a given name.
*
* @param $name
* Arbitrary string. The name of the queue to work with.
* @return
* The queue object for a given name.
*/
public static function get($name) {
static $queues;
if (!isset($queues[$name])) {
$class = variable_get('queue_module_'. $name, 'System') . 'Queue';
$queues[$name] = new $class($name);
}
return $queues[$name];
}
}
interface DrupalQueueInterface {
/**
* Start working with a queue.
*
* @param $name
* Arbitrary string. The name of the queue to work with.
*/
public function __construct($name);
/**
* Add a queue item and store it directly to the queue.
*
* @param $data
* Arbitrary data to be associated with the new task in the queue.
* @return
* TRUE if the item was successfully created and was (best effort) added
* to the queue, otherwise FALSE. We don't guarantee the item was
* committed to disk, that your disk wasn't hit by a meteor, etc, but as
* far as we know, the item is now in the queue.
*/
public function createItem($data);
/**
* Retrieve the number of items in the queue.
*
* This is intended to provide a "best guess" count of the number of items in
* the queue. Depending on the implementation and the setup, the accuracy of
* the results of this function may vary.
*
* e.g. On a busy system with a large number of consumers and items, the
* result might only be valid for a fraction of a second and not provide an
* accurate representation.
*
* @return
* An integer estimate of the number of items in the queue.
*/
public function numberOfItems();
/**
* Claim an item in the queue for processing.
*
* @param $lease_time
* How long the processing is expected to take in seconds, defaults to an
* hour. After this lease expires, the item will be reset and another
* consumer can claim the item. For idempotent tasks (which can be run
* multiple times without side effects), shorter lease times would result
* in lower latency in case a consumer fails. For tasks that should not be
* run more than once (non-idempotent), a larger lease time will make it
* more rare for a given task to run multiple times in cases of failure,
* at the cost of higher latency.
* @return
* On success we return an item object. If the queue is unable to claim an
* item it returns false. This implies a best effort to retrieve an item
* and either the queue is empty or there is some other non-recoverable
* problem.
*/
public function claimItem($lease_time = 3600);
/**
* Delete a finished item from the queue.
*
* @param $item
* The item returned by claimItem().
*/
public function deleteItem($item);
/**
* Create a queue.
*
* Called during installation and should be used to perform any necessary
* initialization operations. This should not be confused with the
* constructor for these objects, which is called every time an object is
* instantiated to operate on a queue. This operation is only needed the
* first time a given queue is going to be initialized (for example, to make
* a new database table or directory to hold tasks for the queue -- it
* depends on the queue implementation if this is necessary at all).
*/
public function createQueue();
/**
* Delete a queue and every item in the queue.
*/
public function deleteQueue();
}
/**
* Default queue implementation.
*/
class SystemQueue implements DrupalQueueInterface {
/**
* Our internal consumer ID for this queue instance.
*
* This is created lazily when we start consuming items with claimItem().
*
* @var integer
*/
protected $consumerId;
/**
* The name of the queue this instance is working with.
*
* @var string
*/
protected $name;
public function __construct($name) {
$this->name = $name;
}
public function createItem($data) {
$record = new stdClass();
$record->name = $this->name;
$record->data = $data;
$record->consumer_id = 0;
// We cannot rely on REQUEST_TIME because many items might be created by a
// single request which takes longer than 1 second.
$record->created = time();
return drupal_write_record('queue', $record) !== FALSE;
}
public function numberOfItems() {
return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
}
public function claimItem($lease_time = 30) {
if (!isset($this->consumerId)) {
$this->consumerId = db_insert('queue_consumer_id')
->useDefaults(array('consumer_id'))
->execute();
}
// Claim an item by updating its consumer_id and expire fields. If claim
// is not successful another thread may have claimed the item in the
// meantime. Therefore loop until an item is successfully claimed or we are
// reasonably sure there are no unclaimed items left.
while (TRUE) {
$item = db_query_range('SELECT data, item_id FROM {queue} q WHERE consumer_id = 0 AND name = :name ORDER BY created ASC', array(':name' => $this->name), 0, 1)->fetchObject();
if ($item) {
// Try to mark the item as ours. We cannot rely on REQUEST_TIME
// because items might be claimed by a single consumer which runs
// longer than 1 second. If we continue to use REQUEST_TIME instead of
// the current time(), we steal time from the lease, and will tend to
// reset items before the lease should really expire.
$update = db_update('queue')
->fields(array(
'consumer_id' => $this->consumerId,
'expire' => time() + $lease_time,
))
->condition('item_id', $item->item_id)
->condition('consumer_id', 0);
// If there are affected rows, this update succeeded.
if ($update->execute()) {
$item->data = unserialize($item->data);
return $item;
}
}
else {
// No items currently available to claim.
return FALSE;
}
}
}
public function deleteItem($item) {
db_delete('queue')
->condition('item_id', $item->item_id)
->execute();
}
public function createQueue() {
// All tasks are stored in a single database table (which is created when
// Drupal is first installed) so there is nothing we need to do to create
// a new queue.
}
public function deleteQueue() {
db_delete('queue')
->condition('name', $this->name)
->execute();
}
}
/**
* @} End of "defgroup queue".
*/
......@@ -895,3 +895,95 @@ class SystemThemeFunctionalTest extends DrupalWebTestCase {
$this->assertRaw('themes/garland', t('Site default theme used on the add content page.'));
}
}
/**
* Test the basic queue functionality.
*/
class QueueTestCase extends DrupalWebTestCase {
public static function getInfo() {
return array(
'name' => t('Queue functionality'),
'description' => t('Queues and dequeues a set of items to check the basic queue functionality.'),
'group' => t('System'),
);
}
/**
* Queues and dequeues a set of items to check the basic queue functionality.
*/
function testQueue() {
// Create two queues.
$queue1 = DrupalQueue::get($this->randomName());
$queue1->createQueue();
$queue2 = DrupalQueue::get($this->randomName());
$queue2->createQueue();
// Create four items.
$data = array();
for ($i = 0; $i < 4; $i++) {
$data[] = array($this->randomName() => $this->randomName());
}
// Queue items 1 and 2 in the queue1.
$queue1->createItem($data[0]);
$queue1->createItem($data[1]);
// Retrieve two items from queue1.
$items = array();
$new_items = array();
$items[] = $item = $queue1->claimItem();
$new_items[] = $item->data;
$items[] = $item = $queue1->claimItem();
$new_items[] = $item->data;
// First two dequeued items should match the first two items we queued.
$this->assertEqual($this->queueScore($data, $new_items), 2, t('Two items matched'));
// Add two more items.
$queue1->createItem($data[2]);
$queue1->createItem($data[3]);
$this->assertTrue($queue1->numberOfItems(), t('Queue 1 is not empty after adding items.'));
$this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty while Queue 1 has items'));
$items[] = $item = $queue1->claimItem();
$new_items[] = $item->data;
$items[] = $item = $queue1->claimItem();
$new_items[] = $item->data;
// All dequeued items should match the items we queued exactly once,
// therefore the score must be exactly 4.
$this->assertEqual($this->queueScore($data, $new_items), 4, t('Four items matched'));
// There should be no duplicate items.
$this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched'));
// Delete all items from queue1.
foreach ($items as $item) {
$queue1->deleteItem($item);
}
// Check that both queues are empty.
$this->assertFalse($queue1->numberOfItems(), t('Queue 1 is empty'));
$this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty'));
}
/**
* This function returns the number of equal items in two arrays.
*/
function queueScore($items, $new_items) {
$score = 0;
foreach ($items as $item) {
foreach ($new_items as $new_item) {
if ($item === $new_item) {
$score++;
}
}
}
return $score;
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment