diff --git a/core/lib/Drupal/Core/Database/Driver/pgsql/Connection.php b/core/lib/Drupal/Core/Database/Driver/pgsql/Connection.php index ac0184b21662b3461295bf5d11dab657cecd2208..ff38a22c5b84604b0ba42a2bd07696c54ab2834f 100644 --- a/core/lib/Drupal/Core/Database/Driver/pgsql/Connection.php +++ b/core/lib/Drupal/Core/Database/Driver/pgsql/Connection.php @@ -383,6 +383,22 @@ public function rollbackSavepoint($savepoint_name = 'mimic_implicit_commit') { $this->rollback($savepoint_name); } } + + /** + * {@inheritdoc} + */ + public function upsert($table, array $options = array()) { + // Use the (faster) native Upsert implementation for PostgreSQL >= 9.5. + if (version_compare($this->version(), '9.5', '>=')) { + $class = $this->getDriverClass('NativeUpsert'); + } + else { + $class = $this->getDriverClass('Upsert'); + } + + return new $class($this, $table, $options); + } + } /** diff --git a/core/lib/Drupal/Core/Database/Driver/pgsql/NativeUpsert.php b/core/lib/Drupal/Core/Database/Driver/pgsql/NativeUpsert.php new file mode 100644 index 0000000000000000000000000000000000000000..d1c6d11200ea5e667e2bc25f8e84d4a22c3229fb --- /dev/null +++ b/core/lib/Drupal/Core/Database/Driver/pgsql/NativeUpsert.php @@ -0,0 +1,116 @@ +<?php + +/** + * @file + * Contains \Drupal\Core\Database\Driver\pgsql\NativeUpsert. + */ + +namespace Drupal\Core\Database\Driver\pgsql; + +use Drupal\Core\Database\Query\Upsert as QueryUpsert; + +/** + * Implements the native Upsert query for the PostgreSQL database driver. + * + * @see http://www.postgresql.org/docs/9.5/static/sql-insert.html#SQL-ON-CONFLICT + */ +class NativeUpsert extends QueryUpsert { + + /** + * {@inheritdoc} + */ + public function execute() { + if (!$this->preExecute()) { + return NULL; + } + + $stmt = $this->connection->prepareQuery((string) $this); + + // Fetch the list of blobs and sequences used on that table. + $table_information = $this->connection->schema()->queryTableInformation($this->table); + + $max_placeholder = 0; + $blobs = []; + $blob_count = 0; + foreach ($this->insertValues as $insert_values) { + foreach ($this->insertFields as $idx => $field) { + if (isset($table_information->blob_fields[$field])) { + $blobs[$blob_count] = fopen('php://memory', 'a'); + fwrite($blobs[$blob_count], $insert_values[$idx]); + rewind($blobs[$blob_count]); + + $stmt->bindParam(':db_insert_placeholder_' . $max_placeholder++, $blobs[$blob_count], \PDO::PARAM_LOB); + + // Pre-increment is faster in PHP than increment. + ++$blob_count; + } + else { + $stmt->bindParam(':db_insert_placeholder_' . $max_placeholder++, $insert_values[$idx]); + } + } + // Check if values for a serial field has been passed. + if (!empty($table_information->serial_fields)) { + foreach ($table_information->serial_fields as $index => $serial_field) { + $serial_key = array_search($serial_field, $this->insertFields); + if ($serial_key !== FALSE) { + $serial_value = $insert_values[$serial_key]; + + // Sequences must be greater than or equal to 1. + if ($serial_value === NULL || !$serial_value) { + $serial_value = 1; + } + // Set the sequence to the bigger value of either the passed + // value or the max value of the column. It can happen that another + // thread calls nextval() which could lead to a serial number being + // used twice. However, trying to insert a value into a serial + // column should only be done in very rare cases and is not thread + // safe by definition. + $this->connection->query("SELECT setval('" . $table_information->sequences[$index] . "', GREATEST(MAX(" . $serial_field . "), :serial_value)) FROM {" . $this->table . "}", array(':serial_value' => (int)$serial_value)); + } + } + } + } + + $options = $this->queryOptions; + if (!empty($table_information->sequences)) { + $options['sequence_name'] = $table_information->sequences[0]; + } + + $this->connection->query($stmt, [], $options); + + // Re-initialize the values array so that we can re-use this query. + $this->insertValues = []; + + return TRUE; + } + + /** + * {@inheritdoc} + */ + public function __toString() { + // Create a sanitized comment string to prepend to the query. + $comments = $this->connection->makeComment($this->comments); + + // Default fields are always placed first for consistency. + $insert_fields = array_merge($this->defaultFields, $this->insertFields); + $insert_fields = array_map(function($f) { return $this->connection->escapeField($f); }, $insert_fields); + + $query = $comments . 'INSERT INTO {' . $this->table . '} (' . implode(', ', $insert_fields) . ') VALUES '; + + $values = $this->getInsertPlaceholderFragment($this->insertValues, $this->defaultFields); + $query .= implode(', ', $values); + + // Updating the unique / primary key is not necessary. + unset($insert_fields[$this->key]); + + $update = []; + foreach ($insert_fields as $field) { + $update[] = "$field = EXCLUDED.$field"; + } + + $query .= ' ON CONFLICT (' . $this->connection->escapeField($this->key) . ') DO UPDATE SET ' . implode(', ', $update); + + return $query; + } + +} diff --git a/core/lib/Drupal/Core/Database/Driver/pgsql/Upsert.php b/core/lib/Drupal/Core/Database/Driver/pgsql/Upsert.php index a0a9de8d8ec463df727e5f608e5ed8f07443349c..e23092d875b878d6a54f296fc0bf73ecab1b1f16 100644 --- a/core/lib/Drupal/Core/Database/Driver/pgsql/Upsert.php +++ b/core/lib/Drupal/Core/Database/Driver/pgsql/Upsert.php @@ -29,68 +29,38 @@ public function execute() { // Default fields are always placed first for consistency. $insert_fields = array_merge($this->defaultFields, $this->insertFields); - $insert_fields_escaped = array_map(function($f) { return $this->connection->escapeField($f); }, $insert_fields); $table = $this->connection->escapeTable($this->table); - $unique_key = $this->connection->escapeField($this->key); // We have to execute multiple queries, therefore we wrap everything in a // transaction so that it is atomic where possible. $transaction = $this->connection->startTransaction(); try { - // First, create a temporary table with the same schema as the table we - // are trying to upsert in. This results in the following query: - // - // CREATE TEMP TABLE temp_table AS SELECT * FROM table_name LIMIT 0; - $query = 'SELECT * FROM {' . $table . '} LIMIT 0'; - $temp_table = $this->connection->queryTemporary($query, [], $this->queryOptions); - - // Second, insert the data in the temporary table. - $insert = $this->connection->insert($temp_table, $this->queryOptions) - ->fields($insert_fields); + // First, lock the table we're upserting into. + $this->connection->query('LOCK TABLE {' . $table . '} IN SHARE ROW EXCLUSIVE MODE', [], $this->queryOptions); + + // Second, delete all items first so we can do one insert. + $unique_key_position = array_search($this->key, $insert_fields); + $delete_ids = []; foreach ($this->insertValues as $insert_values) { - $insert->values($insert_values); + $delete_ids[] = $insert_values[$unique_key_position]; } - $insert->execute(); - // Third, lock the table we're upserting into. - $this->connection->query('LOCK TABLE {' . $table . '} IN EXCLUSIVE MODE', [], $this->queryOptions); - - // Fourth, update any rows that can be updated. This results in the - // following query: - // - // UPDATE table_name - // SET column1 = temp_table.column1 [, column2 = temp_table.column2, ...] - // FROM temp_table - // WHERE table_name.id = temp_table.id; - $update = []; - foreach ($insert_fields_escaped as $field) { - if ($field !== $unique_key) { - $update[] = "$field = {" . $temp_table . "}.$field"; - } + // Delete in chunks when a large array is passed. + foreach (array_chunk($delete_ids, 1000) as $delete_ids_chunk) { + $this->connection->delete($this->table, $this->queryOptions) + ->condition($this->key, $delete_ids_chunk, 'IN') + ->execute(); } - $update_query = 'UPDATE {' . $table . '} SET ' . implode(', ', $update); - $update_query .= ' FROM {' . $temp_table . '}'; - $update_query .= ' WHERE {' . $temp_table . '}.' . $unique_key . ' = {' . $table . '}.' . $unique_key; - $this->connection->query($update_query, [], $this->queryOptions); - - // Fifth, insert the remaining rows. This results in the following query: - // - // INSERT INTO table_name - // SELECT temp_table.primary_key, temp_table.column1 [, temp_table.column2 ...] - // FROM temp_table - // LEFT OUTER JOIN table_name ON (table_name.id = temp_table.id) - // WHERE table_name.id IS NULL; - $select = $this->connection->select($temp_table, 'temp_table', $this->queryOptions) - ->fields('temp_table', $insert_fields); - $select->leftJoin($this->table, 'actual_table', 'actual_table.' . $this->key . ' = temp_table.' . $this->key); - $select->isNull('actual_table.' . $this->key); - - $this->connection->insert($this->table, $this->queryOptions) - ->from($select) - ->execute(); + // Third, insert all the values. + $insert = $this->connection->insert($this->table, $this->queryOptions) + ->fields($insert_fields); + foreach ($this->insertValues as $insert_values) { + $insert->values($insert_values); + } + $insert->execute(); } catch (\Exception $e) { // One of the queries failed, rollback the whole batch.