Commit 8dc9763d authored by alexpott's avatar alexpott

Issue #2549323 by amateescu, mradcliffe, Damien Tournoud: Improve the Upsert...

Issue #2549323 by amateescu, mradcliffe, Damien Tournoud: Improve the Upsert query implementation for PostgreSQL
parent 98e2b931
......@@ -383,6 +383,22 @@ public function rollbackSavepoint($savepoint_name = 'mimic_implicit_commit') {
$this->rollback($savepoint_name);
}
}
/**
* {@inheritdoc}
*/
public function upsert($table, array $options = array()) {
// Use the (faster) native Upsert implementation for PostgreSQL >= 9.5.
if (version_compare($this->version(), '9.5', '>=')) {
$class = $this->getDriverClass('NativeUpsert');
}
else {
$class = $this->getDriverClass('Upsert');
}
return new $class($this, $table, $options);
}
}
/**
......
<?php
/**
* @file
* Contains \Drupal\Core\Database\Driver\pgsql\NativeUpsert.
*/
namespace Drupal\Core\Database\Driver\pgsql;
use Drupal\Core\Database\Query\Upsert as QueryUpsert;
/**
* Implements the native Upsert query for the PostgreSQL database driver.
*
* @see http://www.postgresql.org/docs/9.5/static/sql-insert.html#SQL-ON-CONFLICT
*/
class NativeUpsert extends QueryUpsert {
/**
* {@inheritdoc}
*/
public function execute() {
if (!$this->preExecute()) {
return NULL;
}
$stmt = $this->connection->prepareQuery((string) $this);
// Fetch the list of blobs and sequences used on that table.
$table_information = $this->connection->schema()->queryTableInformation($this->table);
$max_placeholder = 0;
$blobs = [];
$blob_count = 0;
foreach ($this->insertValues as $insert_values) {
foreach ($this->insertFields as $idx => $field) {
if (isset($table_information->blob_fields[$field])) {
$blobs[$blob_count] = fopen('php://memory', 'a');
fwrite($blobs[$blob_count], $insert_values[$idx]);
rewind($blobs[$blob_count]);
$stmt->bindParam(':db_insert_placeholder_' . $max_placeholder++, $blobs[$blob_count], \PDO::PARAM_LOB);
// Pre-increment is faster in PHP than increment.
++$blob_count;
}
else {
$stmt->bindParam(':db_insert_placeholder_' . $max_placeholder++, $insert_values[$idx]);
}
}
// Check if values for a serial field has been passed.
if (!empty($table_information->serial_fields)) {
foreach ($table_information->serial_fields as $index => $serial_field) {
$serial_key = array_search($serial_field, $this->insertFields);
if ($serial_key !== FALSE) {
$serial_value = $insert_values[$serial_key];
// Sequences must be greater than or equal to 1.
if ($serial_value === NULL || !$serial_value) {
$serial_value = 1;
}
// Set the sequence to the bigger value of either the passed
// value or the max value of the column. It can happen that another
// thread calls nextval() which could lead to a serial number being
// used twice. However, trying to insert a value into a serial
// column should only be done in very rare cases and is not thread
// safe by definition.
$this->connection->query("SELECT setval('" . $table_information->sequences[$index] . "', GREATEST(MAX(" . $serial_field . "), :serial_value)) FROM {" . $this->table . "}", array(':serial_value' => (int)$serial_value));
}
}
}
}
$options = $this->queryOptions;
if (!empty($table_information->sequences)) {
$options['sequence_name'] = $table_information->sequences[0];
}
$this->connection->query($stmt, [], $options);
// Re-initialize the values array so that we can re-use this query.
$this->insertValues = [];
return TRUE;
}
/**
* {@inheritdoc}
*/
public function __toString() {
// Create a sanitized comment string to prepend to the query.
$comments = $this->connection->makeComment($this->comments);
// Default fields are always placed first for consistency.
$insert_fields = array_merge($this->defaultFields, $this->insertFields);
$insert_fields = array_map(function($f) { return $this->connection->escapeField($f); }, $insert_fields);
$query = $comments . 'INSERT INTO {' . $this->table . '} (' . implode(', ', $insert_fields) . ') VALUES ';
$values = $this->getInsertPlaceholderFragment($this->insertValues, $this->defaultFields);
$query .= implode(', ', $values);
// Updating the unique / primary key is not necessary.
unset($insert_fields[$this->key]);
$update = [];
foreach ($insert_fields as $field) {
$update[] = "$field = EXCLUDED.$field";
}
$query .= ' ON CONFLICT (' . $this->connection->escapeField($this->key) . ') DO UPDATE SET ' . implode(', ', $update);
return $query;
}
}
......@@ -29,68 +29,38 @@ public function execute() {
// Default fields are always placed first for consistency.
$insert_fields = array_merge($this->defaultFields, $this->insertFields);
$insert_fields_escaped = array_map(function($f) { return $this->connection->escapeField($f); }, $insert_fields);
$table = $this->connection->escapeTable($this->table);
$unique_key = $this->connection->escapeField($this->key);
// We have to execute multiple queries, therefore we wrap everything in a
// transaction so that it is atomic where possible.
$transaction = $this->connection->startTransaction();
try {
// First, create a temporary table with the same schema as the table we
// are trying to upsert in. This results in the following query:
//
// CREATE TEMP TABLE temp_table AS SELECT * FROM table_name LIMIT 0;
$query = 'SELECT * FROM {' . $table . '} LIMIT 0';
$temp_table = $this->connection->queryTemporary($query, [], $this->queryOptions);
// Second, insert the data in the temporary table.
$insert = $this->connection->insert($temp_table, $this->queryOptions)
->fields($insert_fields);
// First, lock the table we're upserting into.
$this->connection->query('LOCK TABLE {' . $table . '} IN SHARE ROW EXCLUSIVE MODE', [], $this->queryOptions);
// Second, delete all items first so we can do one insert.
$unique_key_position = array_search($this->key, $insert_fields);
$delete_ids = [];
foreach ($this->insertValues as $insert_values) {
$insert->values($insert_values);
$delete_ids[] = $insert_values[$unique_key_position];
}
$insert->execute();
// Third, lock the table we're upserting into.
$this->connection->query('LOCK TABLE {' . $table . '} IN EXCLUSIVE MODE', [], $this->queryOptions);
// Fourth, update any rows that can be updated. This results in the
// following query:
//
// UPDATE table_name
// SET column1 = temp_table.column1 [, column2 = temp_table.column2, ...]
// FROM temp_table
// WHERE table_name.id = temp_table.id;
$update = [];
foreach ($insert_fields_escaped as $field) {
if ($field !== $unique_key) {
$update[] = "$field = {" . $temp_table . "}.$field";
}
// Delete in chunks when a large array is passed.
foreach (array_chunk($delete_ids, 1000) as $delete_ids_chunk) {
$this->connection->delete($this->table, $this->queryOptions)
->condition($this->key, $delete_ids_chunk, 'IN')
->execute();
}
$update_query = 'UPDATE {' . $table . '} SET ' . implode(', ', $update);
$update_query .= ' FROM {' . $temp_table . '}';
$update_query .= ' WHERE {' . $temp_table . '}.' . $unique_key . ' = {' . $table . '}.' . $unique_key;
$this->connection->query($update_query, [], $this->queryOptions);
// Fifth, insert the remaining rows. This results in the following query:
//
// INSERT INTO table_name
// SELECT temp_table.primary_key, temp_table.column1 [, temp_table.column2 ...]
// FROM temp_table
// LEFT OUTER JOIN table_name ON (table_name.id = temp_table.id)
// WHERE table_name.id IS NULL;
$select = $this->connection->select($temp_table, 'temp_table', $this->queryOptions)
->fields('temp_table', $insert_fields);
$select->leftJoin($this->table, 'actual_table', 'actual_table.' . $this->key . ' = temp_table.' . $this->key);
$select->isNull('actual_table.' . $this->key);
$this->connection->insert($this->table, $this->queryOptions)
->from($select)
->execute();
// Third, insert all the values.
$insert = $this->connection->insert($this->table, $this->queryOptions)
->fields($insert_fields);
foreach ($this->insertValues as $insert_values) {
$insert->values($insert_values);
}
$insert->execute();
}
catch (\Exception $e) {
// One of the queries failed, rollback the whole batch.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment