diff --git a/core/lib/Drupal/Core/Queue/Batch.php b/core/lib/Drupal/Core/Queue/Batch.php index 2d172d9e1699543fe13bef8ac0a1e3a18b28a229..631f9845f8601211e5c666dd9fed35be0503c28d 100644 --- a/core/lib/Drupal/Core/Queue/Batch.php +++ b/core/lib/Drupal/Core/Queue/Batch.php @@ -30,10 +30,15 @@ class Batch extends DatabaseQueue { * item to be claimed repeatedly until it is deleted. */ public function claimItem($lease_time = 0) { - $item = $this->connection->queryRange('SELECT data, item_id FROM {queue} q WHERE name = :name ORDER BY item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject(); - if ($item) { - $item->data = unserialize($item->data); - return $item; + try { + $item = $this->connection->queryRange('SELECT data, item_id FROM {queue} q WHERE name = :name ORDER BY item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject(); + if ($item) { + $item->data = unserialize($item->data); + return $item; + } + } + catch (\Exception $e) { + $this->catchException($e); } return FALSE; } @@ -49,9 +54,14 @@ public function claimItem($lease_time = 0) { */ public function getAllItems() { $result = array(); - $items = $this->connection->query('SELECT data FROM {queue} q WHERE name = :name ORDER BY item_id ASC', array(':name' => $this->name))->fetchAll(); - foreach ($items as $item) { - $result[] = unserialize($item->data); + try { + $items = $this->connection->query('SELECT data FROM {queue} q WHERE name = :name ORDER BY item_id ASC', array(':name' => $this->name))->fetchAll(); + foreach ($items as $item) { + $result[] = unserialize($item->data); + } + } + catch (\Exception $e) { + $this->catchException($e); } return $result; } diff --git a/core/lib/Drupal/Core/Queue/DatabaseQueue.php b/core/lib/Drupal/Core/Queue/DatabaseQueue.php index 9a9573bc40a3dd07b3aeb06582274af0da0f94a5..78cabe811b516832c4ce863e17f8193c0aa9f75f 100644 --- a/core/lib/Drupal/Core/Queue/DatabaseQueue.php +++ b/core/lib/Drupal/Core/Queue/DatabaseQueue.php @@ -8,6 +8,7 @@ namespace Drupal\Core\Queue; use Drupal\Core\Database\Connection; +use Drupal\Core\Database\SchemaObjectExistsException; use Drupal\Core\DependencyInjection\DependencySerializationTrait; /** @@ -15,10 +16,15 @@ * * @ingroup queue */ -class DatabaseQueue implements ReliableQueueInterface { +class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInterface { use DependencySerializationTrait; + /** + * The database table name. + */ + const TABLE_NAME = 'queue'; + /** * The name of the queue this instance is working with. * @@ -50,7 +56,39 @@ function __construct($name, Connection $connection) { * {@inheritdoc} */ public function createItem($data) { - $query = $this->connection->insert('queue') + $try_again = FALSE; + try { + $id = $this->doCreateItem($data); + } + catch (\Exception $e) { + // If there was an exception, try to create the table. + if (!$try_again = $this->ensureTableExists()) { + // If the exception happened for other reason than the missing table, + // propagate the exception. + throw $e; + } + } + // Now that the table has been created, try again if necessary. + if ($try_again) { + $id = $this->doCreateItem($data); + } + return $id; + } + + /** + * Adds a queue item and store it directly to the queue. + * + * @param $data + * Arbitrary data to be associated with the new task in the queue. + * + * @return + * A unique ID if the item was successfully created and was (best effort) + * added to the queue, otherwise FALSE. We don't guarantee the item was + * committed to disk etc, but as far as we know, the item is now in the + * queue. + */ + protected function doCreateItem($data) { + $query = $this->connection->insert(static::TABLE_NAME) ->fields(array( 'name' => $this->name, 'data' => serialize($data), @@ -66,7 +104,15 @@ public function createItem($data) { * {@inheritdoc} */ public function numberOfItems() { - return $this->connection->query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField(); + try { + return $this->connection->query('SELECT COUNT(item_id) FROM {' . static::TABLE_NAME . '} WHERE name = :name', array(':name' => $this->name)) + ->fetchField(); + } + catch (\Exception $e) { + $this->catchException($e); + // If there is no table there cannot be any items. + return 0; + } } /** @@ -78,7 +124,15 @@ public function claimItem($lease_time = 30) { // until an item is successfully claimed or we are reasonably sure there // are no unclaimed items left. while (TRUE) { - $item = $this->connection->queryRange('SELECT data, created, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject(); + try { + $item = $this->connection->queryRange('SELECT data, created, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject(); + } + catch (\Exception $e) { + $this->catchException($e); + // If the table does not exist there are no items currently available to + // claim. + return FALSE; + } if ($item) { // Try to update the item. Only one thread can succeed in UPDATEing the // same row. We cannot rely on REQUEST_TIME because items might be @@ -86,7 +140,7 @@ public function claimItem($lease_time = 30) { // continue to use REQUEST_TIME instead of the current time(), we steal // time from the lease, and will tend to reset items before the lease // should really expire. - $update = $this->connection->update('queue') + $update = $this->connection->update(static::TABLE_NAME) ->fields(array( 'expire' => time() + $lease_time, )) @@ -109,38 +163,169 @@ public function claimItem($lease_time = 30) { * {@inheritdoc} */ public function releaseItem($item) { - $update = $this->connection->update('queue') - ->fields(array( - 'expire' => 0, - )) - ->condition('item_id', $item->item_id); + try { + $update = $this->connection->update(static::TABLE_NAME) + ->fields(array( + 'expire' => 0, + )) + ->condition('item_id', $item->item_id); return $update->execute(); + } + catch (\Exception $e) { + $this->catchException($e); + // If the table doesn't exist we should consider the item released. + return TRUE; + } } /** * {@inheritdoc} */ public function deleteItem($item) { - $this->connection->delete('queue') - ->condition('item_id', $item->item_id) - ->execute(); + try { + $this->connection->delete(static::TABLE_NAME) + ->condition('item_id', $item->item_id) + ->execute(); + } + catch (\Exception $e) { + $this->catchException($e); + } } /** * {@inheritdoc} */ public function createQueue() { - // All tasks are stored in a single database table (which is created when - // Drupal is first installed) so there is nothing we need to do to create - // a new queue. + // All tasks are stored in a single database table (which is created on + // demand) so there is nothing we need to do to create a new queue. } /** * {@inheritdoc} */ public function deleteQueue() { - $this->connection->delete('queue') - ->condition('name', $this->name) - ->execute(); + try { + $this->connection->delete(static::TABLE_NAME) + ->condition('name', $this->name) + ->execute(); + } + catch (\Exception $e) { + $this->catchException($e); + } } + + /** + * {@inheritdoc} + */ + public function garbageCollection() { + try { + // Clean up the queue for failed batches. + $this->connection->delete(static::TABLE_NAME) + ->condition('created', REQUEST_TIME - 864000, '<') + ->condition('name', 'drupal_batch:%', 'LIKE') + ->execute(); + + // Reset expired items in the default queue implementation table. If that's + // not used, this will simply be a no-op. + $this->connection->update(static::TABLE_NAME) + ->fields(array( + 'expire' => 0, + )) + ->condition('expire', 0, '<>') + ->condition('expire', REQUEST_TIME, '<') + ->execute(); + } + catch (\Exception $e) { + $this->catchException($e); + } + } + + /** + * Check if the table exists and create it if not. + */ + protected function ensureTableExists() { + try { + $database_schema = $this->connection->schema(); + if (!$database_schema->tableExists(static::TABLE_NAME)) { + $schema_definition = $this->schemaDefinition(); + $database_schema->createTable(static::TABLE_NAME, $schema_definition); + return TRUE; + } + } + // If another process has already created the queue table, attempting to + // recreate it will throw an exception. In this case just catch the + // exception and do nothing. + catch (SchemaObjectExistsException $e) { + return TRUE; + } + return FALSE; + } + + /** + * Act on an exception when queue might be stale. + * + * If the table does not yet exist, that's fine, but if the table exists and + * yet the query failed, then the queue is stale and the exception needs to + * propagate. + * + * @param $e + * The exception. + * + * @throws \Exception + * If the table exists the exception passed in is rethrown. + */ + protected function catchException(\Exception $e) { + if ($this->connection->schema()->tableExists(static::TABLE_NAME)) { + throw $e; + } + } + + /** + * Defines the schema for the queue table. + */ + public function schemaDefinition() { + return [ + 'description' => 'Stores items in queues.', + 'fields' => [ + 'item_id' => [ + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ], + 'name' => [ + 'type' => 'varchar_ascii', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ], + 'data' => [ + 'type' => 'blob', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The arbitrary data for the item.', + ], + 'expire' => [ + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the claim lease expires on the item.', + ], + 'created' => [ + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the item was created.', + ], + ], + 'primary key' => ['item_id'], + 'indexes' => [ + 'name_created' => ['name', 'created'], + 'expire' => ['expire'], + ], + ]; + } + } diff --git a/core/lib/Drupal/Core/Queue/QueueGarbageCollectionInterface.php b/core/lib/Drupal/Core/Queue/QueueGarbageCollectionInterface.php new file mode 100644 index 0000000000000000000000000000000000000000..63d440a82b25e50bc2e793aae9a9fafef86e6e7c --- /dev/null +++ b/core/lib/Drupal/Core/Queue/QueueGarbageCollectionInterface.php @@ -0,0 +1,23 @@ +<?php + +/** + * @file + * Contains \Drupal\Core\Queue\QueueGarbageCollectionInterface. + */ + +namespace Drupal\Core\Queue; + +/** + * If the Drupal 'queue' service implements this interface, the + * garbageCollection() method will be called during cron. + * + * @see system_cron() + */ +interface QueueGarbageCollectionInterface { + + /** + * Cleans queues of garbage. + */ + public function garbageCollection(); + +} diff --git a/core/modules/locale/src/Tests/LocaleConfigSubscriberTest.php b/core/modules/locale/src/Tests/LocaleConfigSubscriberTest.php index 376ec44655c68466f5d88cea7659bb620d622647..38e8ad083b6b960986c5828fba5d143c8d7e95e9 100644 --- a/core/modules/locale/src/Tests/LocaleConfigSubscriberTest.php +++ b/core/modules/locale/src/Tests/LocaleConfigSubscriberTest.php @@ -62,7 +62,6 @@ protected function setUp() { $this->setUpDefaultLanguage(); $this->installSchema('locale', ['locales_source', 'locales_target', 'locales_location']); - $this->installSchema('system', ['queue']); $this->setupLanguages(); diff --git a/core/modules/system/src/Tests/KeyValueStore/GarbageCollectionTest.php b/core/modules/system/src/Tests/KeyValueStore/GarbageCollectionTest.php index a1d4467fa3b07f3390b6701e8ff8253ae2dcf984..4a39c6fadcb8ff0368ed98bed329bd950fe49eb5 100644 --- a/core/modules/system/src/Tests/KeyValueStore/GarbageCollectionTest.php +++ b/core/modules/system/src/Tests/KeyValueStore/GarbageCollectionTest.php @@ -30,7 +30,7 @@ protected function setUp() { parent::setUp(); // These additional tables are necessary due to the call to system_cron(). - $this->installSchema('system', array('key_value_expire', 'queue')); + $this->installSchema('system', array('key_value_expire')); } /** diff --git a/core/modules/system/src/Tests/Queue/QueueSerializationTest.php b/core/modules/system/src/Tests/Queue/QueueSerializationTest.php index 638b3f58c70dd6c84149e8b9aaf64eda6dd975f3..4a35cce25927aa5ef239c6c33f39878caeded828 100644 --- a/core/modules/system/src/Tests/Queue/QueueSerializationTest.php +++ b/core/modules/system/src/Tests/Queue/QueueSerializationTest.php @@ -81,7 +81,7 @@ public function submitForm(array &$form, FormStateInterface $form_state) { */ protected function setUp() { parent::setUp(); - $this->installSchema('system', ['key_value_expire', 'sequences', 'queue']); + $this->installSchema('system', ['key_value_expire', 'sequences']); $this->installEntitySchema('user'); $this->queue = \Drupal::service('queue.database')->get('aggregator_refresh'); $test_user = User::create(array( diff --git a/core/modules/system/src/Tests/Queue/QueueTest.php b/core/modules/system/src/Tests/Queue/QueueTest.php index ab1b083dc7d825b197396cdf3b1ce27572d972d5..a1ab45e419c9646f10e70ffd73cde4e8cf01ddb5 100644 --- a/core/modules/system/src/Tests/Queue/QueueTest.php +++ b/core/modules/system/src/Tests/Queue/QueueTest.php @@ -19,18 +19,10 @@ */ class QueueTest extends KernelTestBase { - /** - * The modules to enable. - * - * @var array - */ - public static $modules = array('system'); - /** * Tests the System queue. */ public function testSystemQueue() { - $this->installSchema('system', 'queue'); // Create two queues. $queue1 = new DatabaseQueue($this->randomMachineName(), Database::getConnection()); $queue1->createQueue(); diff --git a/core/modules/system/system.install b/core/modules/system/system.install index 49b5ff35141e12149831bdb26858dc383fe1569d..10118cb8a6bed78165f4a45594cfc05941c9212f 100644 --- a/core/modules/system/system.install +++ b/core/modules/system/system.install @@ -954,49 +954,6 @@ function system_schema() { ), ); - $schema['queue'] = array( - 'description' => 'Stores items in queues.', - 'fields' => array( - 'item_id' => array( - 'type' => 'serial', - 'unsigned' => TRUE, - 'not null' => TRUE, - 'description' => 'Primary Key: Unique item ID.', - ), - 'name' => array( - 'type' => 'varchar_ascii', - 'length' => 255, - 'not null' => TRUE, - 'default' => '', - 'description' => 'The queue name.', - ), - 'data' => array( - 'type' => 'blob', - 'not null' => FALSE, - 'size' => 'big', - 'serialize' => TRUE, - 'description' => 'The arbitrary data for the item.', - ), - 'expire' => array( - 'type' => 'int', - 'not null' => TRUE, - 'default' => 0, - 'description' => 'Timestamp when the claim lease expires on the item.', - ), - 'created' => array( - 'type' => 'int', - 'not null' => TRUE, - 'default' => 0, - 'description' => 'Timestamp when the item was created.', - ), - ), - 'primary key' => array('item_id'), - 'indexes' => array( - 'name_created' => array('name', 'created'), - 'expire' => array('expire'), - ), - ); - $schema['router'] = array( 'description' => 'Maps paths to various callbacks (access, page and title)', 'fields' => array( diff --git a/core/modules/system/system.module b/core/modules/system/system.module index 786c3d67802508a78f24e03318f952727c513ec9..2572a39d435086a0c6d9e84365b146ca9e580e5a 100644 --- a/core/modules/system/system.module +++ b/core/modules/system/system.module @@ -9,6 +9,7 @@ use Drupal\Component\Utility\UrlHelper; use Drupal\Core\Asset\AttachedAssetsInterface; use Drupal\Core\Cache\Cache; +use Drupal\Core\Queue\QueueGarbageCollectionInterface; use Drupal\Core\Database\Query\AlterableInterface; use Drupal\Core\Extension\Extension; use Drupal\Core\Extension\ExtensionDiscovery; @@ -1258,21 +1259,10 @@ function system_cron() { \Drupal::service('keyvalue.expirable.database')->garbageCollection(); } - // Clean up the queue for failed batches. - db_delete('queue') - ->condition('created', REQUEST_TIME - 864000, '<') - ->condition('name', 'drupal_batch:%', 'LIKE') - ->execute(); - - // Reset expired items in the default queue implementation table. If that's - // not used, this will simply be a no-op. - db_update('queue') - ->fields(array( - 'expire' => 0, - )) - ->condition('expire', 0, '<>') - ->condition('expire', REQUEST_TIME, '<') - ->execute(); + // Clean up any garbage in the queue service. + if (\Drupal::service('queue') instanceof QueueGarbageCollectionInterface) { + \Drupal::service('queue')->garbageCollection(); + } // Clean up PHP storage. PhpStorageFactory::get('container')->garbageCollection();