Commit abf0b95c authored by catch's avatar catch
Browse files

Issue #2966607 by Wim Leers, Berdir, claudiu.cristea, dww, Fabianx,...

Issue #2966607 by Wim Leers, Berdir, claudiu.cristea, dww, Fabianx, effulgentsia, bendev, mondrake, Grayle, neclimdul, markgifford, larowlan: Invalidating 'node_list' and other broad cache tags early in a transaction severely increases lock wait time and probability of deadlock

(cherry picked from commit dbca960d4374a735bff9d2a4e28fcc4531b32e91)
parent 9286e7ce
......@@ -18,6 +18,23 @@
*/
interface CacheTagsChecksumInterface {
/**
* The invalid checksum returned if a database transaction is in progress.
*
* Every cache backend SHOULD detect this and not write cache items that have
* this checksum. Not detecting this would not yield incorrect cache reads,
* but would be a useless write.
*
* While a database transaction is in progress, cache tag invalidations are
* delayed to occur just before the commit, to allow:
* - deadlock potential to be minimized, since semaphores to avoid concurrent
* writes can be acquired for the shortest period possible
* - Non-database-based implementations of this service can delay tag
* invalidations until the transaction is committed to avoid
* race conditions.
*/
const INVALID_CHECKSUM_WHILE_IN_TRANSACTION = -1;
/**
* Returns the sum total of validations for a given set of tags.
*
......
<?php
namespace Drupal\Core\Cache;
/**
* A trait for cache tag checksum implementations.
*
* Handles delayed cache tag invalidations.
*/
trait CacheTagsChecksumTrait {

  /**
   * A list of tags that have already been invalidated in this request.
   *
   * Used to prevent the invalidation of the same cache tag multiple times.
   * Keyed by cache tag; the value is always TRUE.
   *
   * @var bool[]
   */
  protected $invalidatedTags = [];

  /**
   * The set of cache tags whose invalidation is delayed.
   *
   * Populated while a database transaction is in progress and emptied again
   * by ::rootTransactionEndCallback().
   *
   * @var string[]
   */
  protected $delayedTags = [];

  /**
   * Contains already loaded tag invalidation counts from the storage.
   *
   * Keyed by cache tag.
   *
   * @var int[]
   */
  protected $tagCache = [];

  /**
   * Callback to be invoked when the root database transaction ends.
   *
   * On success, executes all delayed tag invalidations. On failure (rollback
   * or failed commit) the delayed invalidations are dropped.
   *
   * @param bool $success
   *   Whether or not the transaction was successful.
   */
  public function rootTransactionEndCallback($success) {
    if ($success) {
      $this->doInvalidateTags($this->delayedTags);
    }
    else {
      // The delayed invalidations never reached the storage. Forget that these
      // tags were marked as invalidated, otherwise ::invalidateTags() would
      // silently skip a later invalidation of the same tag in this request.
      foreach ($this->delayedTags as $tag) {
        unset($this->invalidatedTags[$tag]);
      }
    }
    $this->delayedTags = [];
  }

  /**
   * Implements \Drupal\Core\Cache\CacheTagsChecksumInterface::invalidateTags()
   */
  public function invalidateTags(array $tags) {
    // Only invalidate tags once per request unless they are written again.
    foreach ($tags as $key => $tag) {
      if (isset($this->invalidatedTags[$tag])) {
        unset($tags[$key]);
      }
      else {
        $this->invalidatedTags[$tag] = TRUE;
        // Drop the statically cached count so it is re-read from the storage.
        unset($this->tagCache[$tag]);
      }
    }
    if (!$tags) {
      return;
    }

    $in_transaction = $this->getDatabaseConnection()->inTransaction();
    if ($in_transaction) {
      // Register the end-of-transaction callback only once: an empty delayed
      // list means no callback is pending for the current transaction.
      if (empty($this->delayedTags)) {
        $this->getDatabaseConnection()->addRootTransactionEndCallback([$this, 'rootTransactionEndCallback']);
      }
      $this->delayedTags = Cache::mergeTags($this->delayedTags, $tags);
    }
    else {
      $this->doInvalidateTags($tags);
    }
  }

  /**
   * Implements \Drupal\Core\Cache\CacheTagsChecksumInterface::getCurrentChecksum()
   */
  public function getCurrentChecksum(array $tags) {
    // Any cache writes in this request containing cache tags whose invalidation
    // has been delayed due to an in-progress transaction must not be read by
    // any other request, so use a nonsensical checksum which will cause any
    // written cache items to be ignored.
    if (!empty(array_intersect($tags, $this->delayedTags))) {
      return CacheTagsChecksumInterface::INVALID_CHECKSUM_WHILE_IN_TRANSACTION;
    }

    // Remove tags that were already invalidated during this request from the
    // static caches so that another invalidation can occur later in the same
    // request. Without that, written cache items would not be invalidated
    // correctly.
    foreach ($tags as $tag) {
      unset($this->invalidatedTags[$tag]);
    }
    return $this->calculateChecksum($tags);
  }

  /**
   * Implements \Drupal\Core\Cache\CacheTagsChecksumInterface::isValid()
   */
  public function isValid($checksum, array $tags) {
    // Any cache reads in this request involving cache tags whose invalidation
    // has been delayed due to an in-progress transaction are not allowed to use
    // data stored in cache; it must be assumed to be stale. This forces those
    // results to be computed instead. Together with the logic in
    // ::getCurrentChecksum(), it also prevents that computed data from being
    // written to the cache.
    if (!empty(array_intersect($tags, $this->delayedTags))) {
      return FALSE;
    }
    // Loose comparison is kept on purpose: the stored checksum and the
    // calculated one are not guaranteed to have the same scalar type.
    return $checksum == $this->calculateChecksum($tags);
  }

  /**
   * Calculates the current checksum for a given set of tags.
   *
   * @param string[] $tags
   *   The array of tags to calculate the checksum for.
   *
   * @return int
   *   The calculated checksum.
   */
  protected function calculateChecksum(array $tags) {
    $checksum = 0;

    // Only ask the storage about tags that are not statically cached yet.
    $query_tags = array_diff($tags, array_keys($this->tagCache));
    if ($query_tags) {
      $tag_invalidations = $this->getTagInvalidationCounts($query_tags);
      $this->tagCache += $tag_invalidations;
      // Fill the static cache with a zero count for tags not found in the
      // storage, so they are not queried again.
      $this->tagCache += array_fill_keys(array_diff($query_tags, array_keys($tag_invalidations)), 0);
    }

    foreach ($tags as $tag) {
      $checksum += $this->tagCache[$tag];
    }
    return $checksum;
  }

  /**
   * Implements \Drupal\Core\Cache\CacheTagsChecksumInterface::reset()
   */
  public function reset() {
    $this->tagCache = [];
    $this->invalidatedTags = [];
  }

  /**
   * Fetches invalidation counts for cache tags.
   *
   * @param string[] $tags
   *   The list of tags to fetch invalidations for.
   *
   * @return int[]
   *   List of invalidation counts keyed by the respective cache tag.
   */
  abstract protected function getTagInvalidationCounts(array $tags);

  /**
   * Returns the database connection.
   *
   * @return \Drupal\Core\Database\Connection
   *   The database connection.
   */
  abstract protected function getDatabaseConnection();

  /**
   * Marks cache items with any of the specified tags as invalid.
   *
   * @param string[] $tags
   *   The set of tags for which to invalidate cache items.
   */
  abstract protected function doInvalidateTags(array $tags);

}
......@@ -236,6 +236,11 @@ protected function doSetMultiple(array $items) {
'checksum' => $this->checksumProvider->getCurrentChecksum($item['tags']),
];
// Avoid useless writes.
if ($fields['checksum'] === CacheTagsChecksumInterface::INVALID_CHECKSUM_WHILE_IN_TRANSACTION) {
continue;
}
if (!is_string($item['data'])) {
$fields['data'] = serialize($item['data']);
$fields['serialized'] = 1;
......@@ -247,6 +252,11 @@ protected function doSetMultiple(array $items) {
$values[] = $fields;
}
// If all $items were useless writes, we may end up with zero writes.
if (empty($values)) {
return;
}
// Use an upsert query which is atomic and optimized for multiple-row
// merges.
$query = $this->connection
......
......@@ -10,6 +10,8 @@
*/
class DatabaseCacheTagsChecksum implements CacheTagsChecksumInterface, CacheTagsInvalidatorInterface {
use CacheTagsChecksumTrait;
/**
* The database connection.
*
......@@ -17,22 +19,6 @@ class DatabaseCacheTagsChecksum implements CacheTagsChecksumInterface, CacheTags
*/
protected $connection;
/**
* Contains already loaded cache invalidations from the database.
*
* @var array
*/
protected $tagCache = [];
/**
* A list of tags that have already been invalidated in this request.
*
* Used to prevent the invalidation of the same cache tag multiple times.
*
* @var array
*/
protected $invalidatedTags = [];
/**
* Constructs a DatabaseCacheTagsChecksum object.
*
......@@ -46,15 +32,9 @@ public function __construct(Connection $connection) {
/**
* {@inheritdoc}
*/
public function invalidateTags(array $tags) {
protected function doInvalidateTags(array $tags) {
try {
foreach ($tags as $tag) {
// Only invalidate tags once per request unless they are written again.
if (isset($this->invalidatedTags[$tag])) {
continue;
}
$this->invalidatedTags[$tag] = TRUE;
unset($this->tagCache[$tag]);
$this->connection->merge('cachetags')
->insertFields(['invalidations' => 1])
->expression('invalidations', 'invalidations + 1')
......@@ -75,67 +55,18 @@ public function invalidateTags(array $tags) {
/**
* {@inheritdoc}
*/
public function getCurrentChecksum(array $tags) {
// Remove tags that were already invalidated during this request from the
// static caches so that another invalidation can occur later in the same
// request. Without that, written cache items would not be invalidated
// correctly.
foreach ($tags as $tag) {
unset($this->invalidatedTags[$tag]);
protected function getTagInvalidationCounts(array $tags) {
try {
return $this->connection->query('SELECT tag, invalidations FROM {cachetags} WHERE tag IN ( :tags[] )', [':tags[]' => $tags])
->fetchAllKeyed();
}
return $this->calculateChecksum($tags);
}
/**
* {@inheritdoc}
*/
public function isValid($checksum, array $tags) {
return $checksum == $this->calculateChecksum($tags);
}
/**
* Calculates the current checksum for a given set of tags.
*
* @param array $tags
* The array of tags to calculate the checksum for.
*
* @return int
* The calculated checksum.
*/
protected function calculateChecksum(array $tags) {
$checksum = 0;
$query_tags = array_diff($tags, array_keys($this->tagCache));
if ($query_tags) {
$db_tags = [];
try {
$db_tags = $this->connection->query('SELECT tag, invalidations FROM {cachetags} WHERE tag IN ( :tags[] )', [':tags[]' => $query_tags])
->fetchAllKeyed();
$this->tagCache += $db_tags;
}
catch (\Exception $e) {
// If the table does not exist yet, create.
if (!$this->ensureTableExists()) {
$this->catchException($e);
}
catch (\Exception $e) {
// If the table does not exist yet, create.
if (!$this->ensureTableExists()) {
$this->catchException($e);
}
// Fill static cache with empty objects for tags not found in the database.
$this->tagCache += array_fill_keys(array_diff($query_tags, array_keys($db_tags)), 0);
}
foreach ($tags as $tag) {
$checksum += $this->tagCache[$tag];
}
return $checksum;
}
/**
* {@inheritdoc}
*/
public function reset() {
$this->tagCache = [];
$this->invalidatedTags = [];
return [];
}
/**
......@@ -207,4 +138,11 @@ protected function catchException(\Exception $e) {
}
}
/**
* {@inheritdoc}
*/
public function getDatabaseConnection() {
// Hands CacheTagsChecksumTrait the connection stored on this object; the
// trait uses it to check whether a transaction is in progress and to register
// its root transaction end callback.
return $this->connection;
}
}
......@@ -154,6 +154,13 @@ abstract class Connection {
*/
protected $escapedAliases = [];
/**
* Post-root (non-nested) transaction commit callbacks.
*
* @var callable[]
*/
protected $rootTransactionEndCallbacks = [];
/**
* Constructs a Connection object.
*
......@@ -1133,6 +1140,14 @@ public function rollBack($savepoint_name = 'drupal_transaction') {
$rolled_back_other_active_savepoints = TRUE;
}
}
// Notify the callbacks about the rollback.
$callbacks = $this->rootTransactionEndCallbacks;
$this->rootTransactionEndCallbacks = [];
foreach ($callbacks as $callback) {
call_user_func($callback, FALSE);
}
$this->connection->rollBack();
if ($rolled_back_other_active_savepoints) {
throw new TransactionOutOfOrderException();
......@@ -1202,7 +1217,37 @@ public function popTransaction($name) {
}
/**
* Internal function: commit all the transaction layers that can commit.
* Adds a root transaction end callback.
*
* These callbacks are invoked immediately after the transaction has been
* committed.
*
* It can for example be used to avoid deadlocks on write-heavy tables that
* do not need to be part of the transaction, like cache tag invalidations.
*
* Another use case is that services using alternative backends like Redis and
* Memcache cache implementations can replicate the transaction-behavior of
* the database cache backend and avoid race conditions.
*
* An argument is passed to the callbacks that indicates whether the
* transaction was successful or not.
*
* @param callable $callback
* The callback to invoke.
*
* @see \Drupal\Core\Database\Connection::doCommit()
*/
public function addRootTransactionEndCallback(callable $callback) {
  // Callbacks are tied to the root transaction, so refuse to register one
  // when no transaction layer is currently open.
  $has_active_transaction = !empty($this->transactionLayers);
  if (!$has_active_transaction) {
    throw new \LogicException('Root transaction end callbacks can only be added when there is an active transaction.');
  }

  // Queue the callback; it is invoked with the transaction outcome once the
  // root transaction ends.
  $this->rootTransactionEndCallbacks[] = $callback;
}
/**
* Commit all the transaction layers that can commit.
*
* @internal
*/
protected function popCommittableTransactions() {
// Commit all the committable layers.
......@@ -1215,9 +1260,7 @@ protected function popCommittableTransactions() {
// If there are no more layers left then we should commit.
unset($this->transactionLayers[$name]);
if (empty($this->transactionLayers)) {
if (!$this->connection->commit()) {
throw new TransactionCommitFailedException();
}
$this->doCommit();
}
else {
$this->query('RELEASE SAVEPOINT ' . $name);
......@@ -1225,6 +1268,26 @@ protected function popCommittableTransactions() {
}
}
/**
* Do the actual commit, invoke post-commit callbacks.
*
* @internal
*/
protected function doCommit() {
  // Commit first; the outcome is forwarded to every registered callback.
  $committed = $this->connection->commit();

  if ($this->rootTransactionEndCallbacks) {
    // Take a copy and clear the property before invoking anything, so the
    // registered list is already empty while the callbacks run.
    $pending_callbacks = $this->rootTransactionEndCallbacks;
    $this->rootTransactionEndCallbacks = [];
    foreach ($pending_callbacks as $pending_callback) {
      call_user_func($pending_callback, $committed);
    }
  }

  // Callbacks have been notified of the failure; now surface it to the caller.
  if ($committed) {
    return;
  }
  throw new TransactionCommitFailedException();
}
/**
* Runs a limited-range query on this database object.
*
......
......@@ -7,7 +7,6 @@
use Drupal\Core\Database\Database;
use Drupal\Core\Database\DatabaseNotFoundException;
use Drupal\Core\Database\TransactionCommitFailedException;
use Drupal\Core\Database\DatabaseException;
use Drupal\Core\Database\Connection as DatabaseConnection;
use Drupal\Component\Utility\Unicode;
......@@ -634,9 +633,7 @@ protected function popCommittableTransactions() {
// If there are no more layers left then we should commit.
unset($this->transactionLayers[$name]);
if (empty($this->transactionLayers)) {
if (!$this->connection->commit()) {
throw new TransactionCommitFailedException();
}
$this->doCommit();
}
else {
// Attempt to release this savepoint in the standard way.
......@@ -657,7 +654,7 @@ protected function popCommittableTransactions() {
$this->transactionLayers = [];
// We also have to explain to PDO that the transaction stack has
// been cleaned-up.
$this->connection->commit();
$this->doCommit();
}
else {
throw $e;
......
name: 'Database Statement Monitoring Test'
type: module
description: 'Support module for Database layer tests that need to monitor executed database statements.'
core: 8.x
package: Testing
version: VERSION
<?php
namespace Drupal\database_statement_monitoring_test;
/**
* Trait for Connection classes that can store logged statements.
*/
trait LoggedStatementsTrait {

  /**
   * Logged statements.
   *
   * Starts out empty so that ::getLoggedStatements() returns an array even
   * before any query has been logged or ::resetLoggedStatements() was called.
   *
   * @var string[]
   */
  protected $loggedStatements = [];

  /**
   * {@inheritdoc}
   */
  public function query($query, array $args = [], $options = []) {
    // Log the query if it is a string, can receive statement objects e.g
    // in the pgsql driver. These are hard to log as the table name has already
    // been replaced.
    if (is_string($query)) {
      // Flatten array arguments (multi-value placeholders) to a comma-separated
      // string so they can be substituted into the statement text.
      $stringified_args = array_map(function ($v) {
        return is_array($v) ? implode(',', $v) : $v;
      }, $args);
      // strtr() substitutes all placeholders in a single pass, trying the
      // longest key first. Unlike a sequential str_replace(), a placeholder
      // that is a prefix of another one (":p_1" vs ":p_10") cannot corrupt it,
      // and substituted values are never scanned for placeholders again.
      $this->loggedStatements[] = strtr($query, $stringified_args);
    }
    return parent::query($query, $args, $options);
  }

  /**
   * Resets logged statements.
   *
   * @return $this
   */
  public function resetLoggedStatements() {
    $this->loggedStatements = [];
    return $this;
  }

  /**
   * {@inheritdoc}
   */
  public function getDriverClass($class) {
    // Override because the base class uses reflection to determine namespace
    // based on object, which would break.
    $namespace = (new \ReflectionClass(get_parent_class($this)))->getNamespaceName();
    $driver_class = $namespace . '\\' . $class;
    // Fall back to the unqualified name when the parent's namespace has no
    // such class.
    return class_exists($driver_class) ? $driver_class : $class;
  }

  /**
   * Returns the executed queries.
   *
   * @return string[]
   *   The logged statements, with placeholders replaced by their arguments.
   */
  public function getLoggedStatements() {
    return $this->loggedStatements;
  }

}
<?php
namespace Drupal\database_statement_monitoring_test\mysql;
use Drupal\Core\Database\Driver\mysql\Connection as BaseConnection;
use Drupal\database_statement_monitoring_test\LoggedStatementsTrait;
/**
* MySQL Connection class that can log executed queries.
*/
class Connection extends BaseConnection {
// Adds statement logging on top of the MySQL driver connection: the trait
// overrides query() to record string queries and provides
// getLoggedStatements() / resetLoggedStatements() to inspect and clear them.
use LoggedStatementsTrait;
}
<?php
namespace Drupal\database_statement_monitoring_test\pgsql;
use Drupal\Core\Database\Driver\pgsql\Connection as BaseConnection;
use Drupal\database_statement_monitoring_test\LoggedStatementsTrait;
/**
* PostgreSQL Connection class that can log executed queries.
*/
class Connection extends BaseConnection {
// Adds statement logging on top of the PostgreSQL driver connection: the
// trait overrides query() to record string queries and provides
// getLoggedStatements() / resetLoggedStatements() to inspect and clear them.
use LoggedStatementsTrait;
}
<