Commit c010a3a9 authored by alexpott's avatar alexpott

Issue #2542776 by amateescu, chx, martin107, Crell: Add an Upsert class

parent 0ed610ac
......@@ -786,6 +786,23 @@ public function merge($table, array $options = array()) {
return new $class($this, $table, $options);
}
/**
* Prepares and returns an UPSERT query object.
*
* @param string $table
* The table to use for the upsert query.
* @param array $options
* (optional) An array of options on the query.
*
* @return \Drupal\Core\Database\Query\Upsert
* A new Upsert query object.
*
* @see \Drupal\Core\Database\Query\Upsert
*/
public function upsert($table, array $options = array()) {
$class = $this->getDriverClass('Upsert');
return new $class($this, $table, $options);
}
/**
* Prepares and returns an UPDATE query object.
......
......@@ -55,30 +55,7 @@ public function __toString() {
$query = $comments . 'INSERT INTO {' . $this->table . '} (' . implode(', ', $insert_fields) . ') VALUES ';
$max_placeholder = 0;
$values = array();
if (count($this->insertValues)) {
foreach ($this->insertValues as $insert_values) {
$placeholders = array();
// Default fields aren't really placeholders, but this is the most convenient
// way to handle them.
$placeholders = array_pad($placeholders, count($this->defaultFields), 'default');
$new_placeholder = $max_placeholder + count($insert_values);
for ($i = $max_placeholder; $i < $new_placeholder; ++$i) {
$placeholders[] = ':db_insert_placeholder_' . $i;
}
$max_placeholder = $new_placeholder;
$values[] = '(' . implode(', ', $placeholders) . ')';
}
}
else {
// If there are no values, then this is a default-only query. We still need to handle that.
$placeholders = array_fill(0, count($this->defaultFields), 'default');
$values[] = '(' . implode(', ', $placeholders) . ')';
}
$values = $this->getInsertPlaceholderFragment($this->insertValues, $this->defaultFields);
$query .= implode(', ', $values);
return $query;
......
<?php
/**
* @file
* Contains \Drupal\Core\Database\Driver\mysql\Upsert.
*/
namespace Drupal\Core\Database\Driver\mysql;
use Drupal\Core\Database\Query\Upsert as QueryUpsert;
/**
* Implements the Upsert query for the MySQL database driver.
*/
class Upsert extends QueryUpsert {
/**
* {@inheritdoc}
*/
public function __toString() {
// Create a sanitized comment string to prepend to the query.
$comments = $this->connection->makeComment($this->comments);
// Default fields are always placed first for consistency.
$insert_fields = array_merge($this->defaultFields, $this->insertFields);
$query = $comments . 'INSERT INTO {' . $this->table . '} (' . implode(', ', $insert_fields) . ') VALUES ';
$values = $this->getInsertPlaceholderFragment($this->insertValues, $this->defaultFields);
$query .= implode(', ', $values);
// Updating the unique / primary key is not necessary.
unset($insert_fields[$this->key]);
$update = [];
foreach ($insert_fields as $field) {
$update[] = "$field = VALUES($field)";
}
$query .= ' ON DUPLICATE KEY UPDATE ' . implode(', ', $update);
return $query;
}
}
......@@ -128,30 +128,7 @@ public function __toString() {
$query = $comments . 'INSERT INTO {' . $this->table . '} (' . implode(', ', $insert_fields) . ') VALUES ';
$max_placeholder = 0;
$values = array();
if (count($this->insertValues)) {
foreach ($this->insertValues as $insert_values) {
$placeholders = array();
// Default fields aren't really placeholders, but this is the most convenient
// way to handle them.
$placeholders = array_pad($placeholders, count($this->defaultFields), 'default');
$new_placeholder = $max_placeholder + count($insert_values);
for ($i = $max_placeholder; $i < $new_placeholder; ++$i) {
$placeholders[] = ':db_insert_placeholder_' . $i;
}
$max_placeholder = $new_placeholder;
$values[] = '(' . implode(', ', $placeholders) . ')';
}
}
else {
// If there are no values, then this is a default-only query. We still need to handle that.
$placeholders = array_fill(0, count($this->defaultFields), 'default');
$values[] = '(' . implode(', ', $placeholders) . ')';
}
$values = $this->getInsertPlaceholderFragment($this->insertValues, $this->defaultFields);
$query .= implode(', ', $values);
return $query;
......
......@@ -93,9 +93,17 @@ protected function ensureIdentifiersLength($identifier) {
public function queryTableInformation($table) {
// Generate a key to reference this table's information on.
$key = $this->connection->prefixTables('{' . $table . '}');
if (strpos($key, '.') === FALSE) {
// Take into account that temporary tables are stored in a different schema.
// \Drupal\Core\Database\Connection::generateTemporaryTableName() sets the
// 'db_temporary_' prefix to all temporary tables.
if (strpos($key, '.') === FALSE && strpos($table, 'db_temporary_') === FALSE) {
$key = 'public.' . $key;
}
else {
$schema = $this->connection->query('SELECT nspname FROM pg_namespace WHERE oid = pg_my_temp_schema()')->fetchField();
$key = $schema . '.' . $key;
}
if (!isset($this->tableInformation[$key])) {
// Split the key into schema and table for querying.
......
<?php
/**
* @file
* Contains \Drupal\Core\Database\Driver\pgsql\Upsert.
*/
namespace Drupal\Core\Database\Driver\pgsql;
use Drupal\Core\Database\Query\Upsert as QueryUpsert;
/**
* Implements the Upsert query for the PostgreSQL database driver.
*/
class Upsert extends QueryUpsert {
/**
* {@inheritdoc}
*/
public function execute() {
if (!$this->preExecute()) {
return NULL;
}
// Default options for upsert queries.
$this->queryOptions += array(
'throw_exception' => TRUE,
);
// Default fields are always placed first for consistency.
$insert_fields = array_merge($this->defaultFields, $this->insertFields);
$insert_fields_escaped = array_map(function($f) { return $this->connection->escapeField($f); }, $insert_fields);
$table = $this->connection->escapeTable($this->table);
$unique_key = $this->connection->escapeField($this->key);
// We have to execute multiple queries, therefore we wrap everything in a
// transaction so that it is atomic where possible.
$transaction = $this->connection->startTransaction();
try {
// First, create a temporary table with the same schema as the table we
// are trying to upsert in. This results in the following query:
//
// CREATE TEMP TABLE temp_table AS SELECT * FROM table_name LIMIT 0;
$query = 'SELECT * FROM {' . $table . '} LIMIT 0';
$temp_table = $this->connection->queryTemporary($query, [], $this->queryOptions);
// Second, insert the data in the temporary table.
$insert = $this->connection->insert($temp_table, $this->queryOptions)
->fields($insert_fields);
foreach ($this->insertValues as $insert_values) {
$insert->values($insert_values);
}
$insert->execute();
// Third, lock the table we're upserting into.
$this->connection->query('LOCK TABLE {' . $table . '} IN EXCLUSIVE MODE', [], $this->queryOptions);
// Fourth, update any rows that can be updated. This results in the
// following query:
//
// UPDATE table_name
// SET column1 = temp_table.column1 [, column2 = temp_table.column2, ...]
// FROM temp_table
// WHERE table_name.id = temp_table.id;
$update = [];
foreach ($insert_fields_escaped as $field) {
if ($field !== $unique_key) {
$update[] = "$field = {" . $temp_table . "}.$field";
}
}
$update_query = 'UPDATE {' . $table . '} SET ' . implode(', ', $update);
$update_query .= ' FROM {' . $temp_table . '}';
$update_query .= ' WHERE {' . $temp_table . '}.' . $unique_key . ' = {' . $table . '}.' . $unique_key;
$this->connection->query($update_query, [], $this->queryOptions);
// Fifth, insert the remaining rows. This results in the following query:
//
// INSERT INTO table_name
// SELECT temp_table.primary_key, temp_table.column1 [, temp_table.column2 ...]
// FROM temp_table
// LEFT OUTER JOIN table_name ON (table_name.id = temp_table.id)
// WHERE table_name.id IS NULL;
$select = $this->connection->select($temp_table, 'temp_table', $this->queryOptions)
->fields('temp_table', $insert_fields);
$select->leftJoin($this->table, 'actual_table', 'actual_table.' . $this->key . ' = temp_table.' . $this->key);
$select->isNull('actual_table.' . $this->key);
$this->connection->insert($this->table, $this->queryOptions)
->from($select)
->execute();
}
catch (\Exception $e) {
// One of the queries failed, rollback the whole batch.
$transaction->rollback();
// Rethrow the exception for the calling code.
throw $e;
}
// Re-initialize the values array so that we can re-use this query.
$this->insertValues = array();
// Transaction commits here where $transaction looses scope.
return TRUE;
}
/**
* {@inheritdoc}
*/
public function __toString() {
// Nothing to do.
}
}
<?php
/**
* @file
* Contains \Drupal\Core\Database\Driver\sqlite\Upsert.
*/
namespace Drupal\Core\Database\Driver\sqlite;
use Drupal\Core\Database\Query\Upsert as QueryUpsert;
/**
* Implements the Upsert query for the SQLite database driver.
*/
class Upsert extends QueryUpsert {
/**
* {@inheritdoc}
*/
public function __toString() {
// Create a sanitized comment string to prepend to the query.
$comments = $this->connection->makeComment($this->comments);
// Default fields are always placed first for consistency.
$insert_fields = array_merge($this->defaultFields, $this->insertFields);
$query = $comments . 'INSERT OR REPLACE INTO {' . $this->table . '} (' . implode(', ', $insert_fields) . ') VALUES ';
$values = $this->getInsertPlaceholderFragment($this->insertValues, $this->defaultFields);
$query .= implode(', ', $values);
return $query;
}
}
......@@ -16,43 +16,7 @@
*/
class Insert extends Query {
/**
* The table on which to insert.
*
* @var string
*/
protected $table;
/**
* An array of fields on which to insert.
*
* @var array
*/
protected $insertFields = array();
/**
* An array of fields that should be set to their database-defined defaults.
*
* @var array
*/
protected $defaultFields = array();
/**
* A nested array of values to insert.
*
* $insertValues is an array of arrays. Each sub-array is either an
* associative array whose keys are field names and whose values are field
* values to insert, or a non-associative array of values in the same order
* as $insertFields.
*
* Whether multiple insert sets will be run in a single query or multiple
* queries is left to individual drivers to implement in whatever manner is
* most appropriate. The order of values in each sub-array must match the
* order of fields in $insertFields.
*
* @var array
*/
protected $insertValues = array();
use InsertTrait;
/**
* A SelectQuery object to fetch the rows that should be inserted.
......@@ -79,96 +43,6 @@ public function __construct($connection, $table, array $options = array()) {
$this->table = $table;
}
/**
* Adds a set of field->value pairs to be inserted.
*
* This method may only be called once. Calling it a second time will be
* ignored. To queue up multiple sets of values to be inserted at once,
* use the values() method.
*
* @param $fields
* An array of fields on which to insert. This array may be indexed or
* associative. If indexed, the array is taken to be the list of fields.
* If associative, the keys of the array are taken to be the fields and
* the values are taken to be corresponding values to insert. If a
* $values argument is provided, $fields must be indexed.
* @param $values
* An array of fields to insert into the database. The values must be
* specified in the same order as the $fields array.
*
* @return \Drupal\Core\Database\Query\Insert
* The called object.
*/
public function fields(array $fields, array $values = array()) {
if (empty($this->insertFields)) {
if (empty($values)) {
if (!is_numeric(key($fields))) {
$values = array_values($fields);
$fields = array_keys($fields);
}
}
$this->insertFields = $fields;
if (!empty($values)) {
$this->insertValues[] = $values;
}
}
return $this;
}
/**
* Adds another set of values to the query to be inserted.
*
* If $values is a numeric-keyed array, it will be assumed to be in the same
* order as the original fields() call. If it is associative, it may be
* in any order as long as the keys of the array match the names of the
* fields.
*
* @param $values
* An array of values to add to the query.
*
* @return \Drupal\Core\Database\Query\Insert
* The called object.
*/
public function values(array $values) {
if (is_numeric(key($values))) {
$this->insertValues[] = $values;
}
else {
// Reorder the submitted values to match the fields array.
foreach ($this->insertFields as $key) {
$insert_values[$key] = $values[$key];
}
// For consistency, the values array is always numerically indexed.
$this->insertValues[] = array_values($insert_values);
}
return $this;
}
/**
* Specifies fields for which the database defaults should be used.
*
* If you want to force a given field to use the database-defined default,
* not NULL or undefined, use this method to instruct the database to use
* default values explicitly. In most cases this will not be necessary
* unless you are inserting a row that is all default values, as you cannot
* specify no values in an INSERT query.
*
* Specifying a field both in fields() and in useDefaults() is an error
* and will not execute.
*
* @param $fields
* An array of values for which to use the default values
* specified in the table definition.
*
* @return \Drupal\Core\Database\Query\Insert
* The called object.
*/
public function useDefaults(array $fields) {
$this->defaultFields = $fields;
return $this;
}
/**
* Sets the fromQuery on this InsertQuery object.
*
......@@ -265,13 +139,13 @@ public function __toString() {
/**
* Preprocesses and validates the query.
*
* @return
* @return bool
* TRUE if the validation was successful, FALSE if not.
*
* @throws \Drupal\Core\Database\Query\FieldsOverlapException
* @throws \Drupal\Core\Database\Query\NoFieldsException
*/
public function preExecute() {
protected function preExecute() {
// Confirm that the user did not try to specify an identical
// field and default field.
if (array_intersect($this->insertFields, $this->defaultFields)) {
......
<?php
/**
* @file
* Contains \Drupal\Core\Database\Query\InsertTrait.
*/
namespace Drupal\Core\Database\Query;
/**
* Provides common functionality for INSERT and UPSERT queries.
*
* @ingroup database
*/
trait InsertTrait {
/**
* The table on which to insert.
*
* @var string
*/
protected $table;
/**
* An array of fields on which to insert.
*
* @var array
*/
protected $insertFields = array();
/**
* An array of fields that should be set to their database-defined defaults.
*
* @var array
*/
protected $defaultFields = array();
/**
* A nested array of values to insert.
*
* $insertValues is an array of arrays. Each sub-array is either an
* associative array whose keys are field names and whose values are field
* values to insert, or a non-associative array of values in the same order
* as $insertFields.
*
* Whether multiple insert sets will be run in a single query or multiple
* queries is left to individual drivers to implement in whatever manner is
* most appropriate. The order of values in each sub-array must match the
* order of fields in $insertFields.
*
* @var array
*/
protected $insertValues = array();
/**
* Adds a set of field->value pairs to be inserted.
*
* This method may only be called once. Calling it a second time will be
* ignored. To queue up multiple sets of values to be inserted at once,
* use the values() method.
*
* @param array $fields
* An array of fields on which to insert. This array may be indexed or
* associative. If indexed, the array is taken to be the list of fields.
* If associative, the keys of the array are taken to be the fields and
* the values are taken to be corresponding values to insert. If a
* $values argument is provided, $fields must be indexed.
* @param array $values
* (optional) An array of fields to insert into the database. The values
* must be specified in the same order as the $fields array.
*
* @return $this
* The called object.
*/
public function fields(array $fields, array $values = array()) {
if (empty($this->insertFields)) {
if (empty($values)) {
if (!is_numeric(key($fields))) {
$values = array_values($fields);
$fields = array_keys($fields);
}
}
$this->insertFields = $fields;
if (!empty($values)) {
$this->insertValues[] = $values;
}
}
return $this;
}
/**
* Adds another set of values to the query to be inserted.
*
* If $values is a numeric-keyed array, it will be assumed to be in the same
* order as the original fields() call. If it is associative, it may be
* in any order as long as the keys of the array match the names of the
* fields.
*
* @param array $values
* An array of values to add to the query.
*
* @return $this
* The called object.
*/
public function values(array $values) {
if (is_numeric(key($values))) {
$this->insertValues[] = $values;
}
elseif ($this->insertFields) {
// Reorder the submitted values to match the fields array.
foreach ($this->insertFields as $key) {
$insert_values[$key] = $values[$key];
}
// For consistency, the values array is always numerically indexed.
$this->insertValues[] = array_values($insert_values);
}
return $this;
}
/**
* Specifies fields for which the database defaults should be used.
*
* If you want to force a given field to use the database-defined default,
* not NULL or undefined, use this method to instruct the database to use
* default values explicitly. In most cases this will not be necessary
* unless you are inserting a row that is all default values, as you cannot
* specify no values in an INSERT query.
*
* Specifying a field both in fields() and in useDefaults() is an error
* and will not execute.
*
* @param array $fields
* An array of values for which to use the default values
* specified in the table definition.
*
* @return $this
* The called object.
*/
public function useDefaults(array $fields) {
$this->defaultFields = $fields;
return $this;
}
/**
* Returns the query placeholders for values that will be inserted.
*
* @param array $nested_insert_values
* A nested array of values to insert.
* @param array $default_fields
* An array of fields that should be set to their database-defined defaults.
*
* @return array
* An array of insert placeholders.
*/
protected function getInsertPlaceholderFragment(array $nested_insert_values, array $default_fields) {
$max_placeholder = 0;
$values = array();
if ($nested_insert_values) {
foreach ($nested_insert_values as $insert_values) {
$placeholders = array();
// De