Commit 76882b6f authored by Dries's avatar Dries
Browse files

- Patch #602306 by chx: simplify the default queue implementation.

parent 820b3535
......@@ -1226,12 +1226,6 @@ function system_schema() {
'default' => '',
'description' => 'The queue name.',
),
'consumer_id' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'The ID of the dequeuing consumer.',
),
'data' => array(
'type' => 'text',
'not null' => FALSE,
......@@ -1254,21 +1248,9 @@ function system_schema() {
),
'primary key' => array('item_id'),
'indexes' => array(
'consumer_queue' => array('consumer_id', 'name', 'created'),
'consumer_expire' => array('consumer_id', 'expire'),
),
);
$schema['queue_consumer_id'] = array(
'description' => 'Stores queue consumer IDs, used to auto-increment the consumer ID so that a unique consumer ID is used.',
'fields' => array(
'consumer_id' => array(
'type' => 'serial',
'not null' => TRUE,
'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.',
),
'name_created' => array('name', 'created'),
'expire' => array('expire'),
),
'primary key' => array('consumer_id'),
);
$schema['registry'] = array(
......@@ -2253,12 +2235,6 @@ function system_update_7022() {
'default' => '',
'description' => 'The queue name.',
),
'consumer_id' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'The ID of the dequeuing consumer.',
),
'data' => array(
'type' => 'text',
'not null' => FALSE,
......@@ -2281,25 +2257,12 @@ function system_update_7022() {
),
'primary key' => array('item_id'),
'indexes' => array(
'consumer_queue' => array('consumer_id', 'name', 'created'),
'consumer_expire' => array('consumer_id', 'expire'),
),
);
$schema['queue_consumer_id'] = array(
'description' => 'Stores queue consumer IDs, used to auto-incrament the consumer ID so that a unique consumer ID is used.',
'fields' => array(
'consumer_id' => array(
'type' => 'serial',
'not null' => TRUE,
'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.',
),
'name_created' => array('name', 'created'),
'expire' => array('expire'),
),
'primary key' => array('consumer_id'),
);
db_create_table('queue', $schema['queue']);
db_create_table('queue_consumer_id', $schema['queue_consumer_id']);
}
/**
......
......@@ -2426,7 +2426,6 @@ function system_cron() {
// not used, this will simply be a no-op.
db_update('queue')
->fields(array(
'consumer_id' => 0,
'expire' => 0,
))
->condition('expire', REQUEST_TIME, '<')
......
......@@ -182,7 +182,6 @@ public function createItem($data) {
$record = new stdClass();
$record->name = $this->name;
$record->data = $data;
$record->consumer_id = 0;
// We cannot rely on REQUEST_TIME because many items might be created by a
// single request which takes longer than 1 second.
$record->created = time();
......@@ -194,30 +193,25 @@ public function numberOfItems() {
}
public function claimItem($lease_time = 30) {
if (!isset($this->consumerId)) {
$this->consumerId = db_insert('queue_consumer_id')
->useDefaults(array('consumer_id'))
->execute();
}
// Claim an item by updating its consumer_id and expire fields. If claim
// is not successful another thread may have claimed the item in the
// meantime. Therefore loop until an item is successfully claimed or we are
// reasonably sure there are no unclaimed items left.
// Claim an item by updating its expire fields. If claim is not successful
// another thread may have claimed the item in the meantime. Therefore loop
// until an item is successfully claimed or we are reasonably sure there
// are no unclaimed items left.
while (TRUE) {
$item = db_query_range('SELECT data, item_id FROM {queue} q WHERE consumer_id = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
$item = db_query_range('SELECT data, item_id FROM {queue} q WHERE name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
if ($item) {
// Try to mark the item as ours. We cannot rely on REQUEST_TIME
// because items might be claimed by a single consumer which runs
// longer than 1 second. If we continue to use REQUEST_TIME instead of
// the current time(), we steal time from the lease, and will tend to
// reset items before the lease should really expire.
// Try to update the item. Only one thread can succeed in UPDATEing the
// same row. We cannot rely on REQUEST_TIME because items might be
// claimed by a single consumer which runs longer than 1 second. If we
// continue to use REQUEST_TIME instead of the current time(), we steal
// time from the lease, and will tend to reset items before the lease
// should really expire.
$update = db_update('queue')
->fields(array(
'consumer_id' => $this->consumerId,
'expire' => time() + $lease_time,
))
->condition('item_id', $item->item_id)
->condition('consumer_id', 0);
->condition('expire', 0);
// If there are affected rows, this update succeeded.
if ($update->execute()) {
$item->data = unserialize($item->data);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment