Schema.php 31.8 KB
Newer Older
Crell's avatar
Crell committed
1 2 3 4
<?php

namespace Drupal\Core\Database\Driver\pgsql;

5
use Drupal\Component\Utility\Unicode;
Crell's avatar
Crell committed
6 7 8 9 10
use Drupal\Core\Database\SchemaObjectExistsException;
use Drupal\Core\Database\SchemaObjectDoesNotExistException;
use Drupal\Core\Database\Schema as DatabaseSchema;

/**
11
 * @addtogroup schemaapi
Crell's avatar
Crell committed
12 13 14
 * @{
 */

15 16 17
/**
 * PostgreSQL implementation of \Drupal\Core\Database\Schema.
 */
Crell's avatar
Crell committed
18 19 20 21 22 23 24 25
class Schema extends DatabaseSchema {

  /**
   * A cache of information about blob columns and sequences of tables.
   *
   * This is collected by DatabaseConnection_pgsql->queryTableInformation(),
   * by introspecting the database.
   *
26
   * @see \Drupal\Core\Database\Driver\pgsql\Schema::queryTableInformation()
Crell's avatar
Crell committed
27 28 29 30
   * @var array
   */
  protected $tableInformation = array();

31 32 33 34 35 36
  /**
   * The maximum allowed length for index, primary key and constraint names.
   *
   * Value will usually be set to a 63 chars limit but PostgreSQL allows
   * to higher this value before compiling, so we need to check for that.
   *
37
   * @var int
38 39 40 41 42 43
   */
  protected $maxIdentifierLength;

  /**
   * Make sure to limit identifiers according to PostgreSQL compiled in length.
   *
44 45 46 47
   * PostgreSQL allows in standard configuration no longer identifiers than 63
   * chars for table/relation names, indexes, primary keys, and constraints. So
   * we map all identifiers that are too long to drupal_base64hash_tag, where
   * tag is one of:
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
   *   - idx for indexes
   *   - key for constraints
   *   - pkey for primary keys
   *
   * @param $identifiers
   *   The arguments to build the identifier string
   * @return
   *   The index/constraint/pkey identifier
   */
  protected function ensureIdentifiersLength($identifier) {
    $args = func_get_args();
    $info = $this->getPrefixInfo($identifier);
    $args[0] = $info['table'];
    $identifierName = implode('__', $args);

    // Retrieve the max identifier length which is usually 63 characters
    // but can be altered before PostgreSQL is compiled so we need to check.
    $this->maxIdentifierLength = $this->connection->query("SHOW max_identifier_length")->fetchField();

    if (strlen($identifierName) > $this->maxIdentifierLength) {
68
      $saveIdentifier = '"drupal_' . $this->hashBase64($identifierName) . '_' . $args[2] . '"';
69 70 71 72 73 74 75
    }
    else {
      $saveIdentifier = $identifierName;
    }
    return $saveIdentifier;
  }

Crell's avatar
Crell committed
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
  /**
   * Fetch the list of blobs and sequences used on a table.
   *
   * We introspect the database to collect the information required by insert
   * and update queries.
   *
   * @param $table_name
   *   The non-prefixed name of the table.
   * @return
   *   An object with two member variables:
   *     - 'blob_fields' that lists all the blob fields in the table.
   *     - 'sequences' that lists the sequences used in that table.
   */
  public function queryTableInformation($table) {
    // Generate a key to reference this table's information on.
    $key = $this->connection->prefixTables('{' . $table . '}');
92 93 94 95 96

    // Take into account that temporary tables are stored in a different schema.
    // \Drupal\Core\Database\Connection::generateTemporaryTableName() sets the
    // 'db_temporary_' prefix to all temporary tables.
    if (strpos($key, '.') === FALSE && strpos($table, 'db_temporary_') === FALSE) {
Crell's avatar
Crell committed
97 98
      $key = 'public.' . $key;
    }
99 100 101 102
    else {
      $schema = $this->connection->query('SELECT nspname FROM pg_namespace WHERE oid = pg_my_temp_schema()')->fetchField();
      $key = $schema . '.' . $key;
    }
Crell's avatar
Crell committed
103 104 105 106 107 108 109 110 111

    if (!isset($this->tableInformation[$key])) {
      // Split the key into schema and table for querying.
      list($schema, $table_name) = explode('.', $key);
      $table_information = (object) array(
        'blob_fields' => array(),
        'sequences' => array(),
      );
      // Don't use {} around information_schema.columns table.
112 113 114
      $this->connection->addSavepoint();

      try {
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
        // Check if the table information exists in the PostgreSQL metadata.
        $table_information_exists = (bool) $this->connection->query("SELECT 1 FROM pg_class WHERE relname = :table", array(':table' => $table_name))->fetchField();

        // If the table information does not yet exist in the PostgreSQL
        // metadata, then return the default table information here, so that it
        // will not be cached.
        if (!$table_information_exists) {
          $this->connection->releaseSavepoint();
          return $table_information;
        }
        else {
          $result = $this->connection->query("SELECT column_name, data_type, column_default FROM information_schema.columns WHERE table_schema = :schema AND table_name = :table AND (data_type = 'bytea' OR (numeric_precision IS NOT NULL AND column_default LIKE :default))", array(
            ':schema' => $schema,
            ':table' => $table_name,
            ':default' => '%nextval%',
          ));
        }
132 133 134 135 136 137 138
      }
      catch (\Exception $e) {
        $this->connection->rollbackSavepoint();
        throw $e;
      }
      $this->connection->releaseSavepoint();

Crell's avatar
Crell committed
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
      foreach ($result as $column) {
        if ($column->data_type == 'bytea') {
          $table_information->blob_fields[$column->column_name] = TRUE;
        }
        elseif (preg_match("/nextval\('([^']+)'/", $column->column_default, $matches)) {
          // We must know of any sequences in the table structure to help us
          // return the last insert id. If there is more than 1 sequences the
          // first one (index 0 of the sequences array) will be used.
          $table_information->sequences[] = $matches[1];
          $table_information->serial_fields[] = $column->column_name;
        }
      }
      $this->tableInformation[$key] = $table_information;
    }
    return $this->tableInformation[$key];
  }

156 157 158 159 160 161 162 163 164 165 166 167 168 169
  /**
   * Resets information about table blobs, sequences and serial fields.
   *
   * @param $table
   *   The non-prefixed name of the table.
   */
  protected function resetTableInformation($table) {
    $key = $this->connection->prefixTables('{' . $table . '}');
    if (strpos($key, '.') === FALSE) {
      $key = 'public.' . $key;
    }
    unset($this->tableInformation[$key]);
  }

Crell's avatar
Crell committed
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
  /**
   * Fetch the list of CHECK constraints used on a field.
   *
   * We introspect the database to collect the information required by field
   * alteration.
   *
   * @param $table
   *   The non-prefixed name of the table.
   * @param $field
   *   The name of the field.
   * @return
   *   An array of all the checks for the field.
   */
  public function queryFieldInformation($table, $field) {
    $prefixInfo = $this->getPrefixInfo($table, TRUE);

    // Split the key into schema and table for querying.
    $schema = $prefixInfo['schema'];
    $table_name = $prefixInfo['table'];

190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
    $this->connection->addSavepoint();

    try {
      $checks = $this->connection->query("SELECT conname FROM pg_class cl INNER JOIN pg_constraint co ON co.conrelid = cl.oid INNER JOIN pg_attribute attr ON attr.attrelid = cl.oid AND attr.attnum = ANY (co.conkey) INNER JOIN pg_namespace ns ON cl.relnamespace = ns.oid WHERE co.contype = 'c' AND ns.nspname = :schema AND cl.relname = :table AND attr.attname = :column", array(
        ':schema' => $schema,
        ':table' => $table_name,
        ':column' => $field,
      ));
    }
    catch (\Exception $e) {
      $this->connection->rollbackSavepoint();
      throw $e;
    }

    $this->connection->releaseSavepoint();

Crell's avatar
Crell committed
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
    $field_information = $checks->fetchCol();

    return $field_information;
  }

  /**
   * Generate SQL to create a new table from a Drupal schema definition.
   *
   * @param $name
   *   The name of the table to create.
   * @param $table
   *   A Schema API table definition array.
   * @return
   *   An array of SQL statements to create the table.
   */
  protected function createTableSql($name, $table) {
    $sql_fields = array();
    foreach ($table['fields'] as $field_name => $field) {
      $sql_fields[] = $this->createFieldSql($field_name, $this->processField($field));
    }

    $sql_keys = array();
    if (isset($table['primary key']) && is_array($table['primary key'])) {
229
      $sql_keys[] = 'CONSTRAINT ' . $this->ensureIdentifiersLength($name, '', 'pkey') . ' PRIMARY KEY (' . $this->createPrimaryKeySql($table['primary key']) . ')';
Crell's avatar
Crell committed
230 231 232
    }
    if (isset($table['unique keys']) && is_array($table['unique keys'])) {
      foreach ($table['unique keys'] as $key_name => $key) {
233
        $sql_keys[] = 'CONSTRAINT ' . $this->ensureIdentifiersLength($name, $key_name, 'key') . ' UNIQUE (' . implode(', ', $key) . ')';
Crell's avatar
Crell committed
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
      }
    }

    $sql = "CREATE TABLE {" . $name . "} (\n\t";
    $sql .= implode(",\n\t", $sql_fields);
    if (count($sql_keys) > 0) {
      $sql .= ",\n\t";
    }
    $sql .= implode(",\n\t", $sql_keys);
    $sql .= "\n)";
    $statements[] = $sql;

    if (isset($table['indexes']) && is_array($table['indexes'])) {
      foreach ($table['indexes'] as $key_name => $key) {
        $statements[] = $this->_createIndexSql($name, $key_name, $key);
      }
    }

    // Add table comment.
    if (!empty($table['description'])) {
      $statements[] = 'COMMENT ON TABLE {' . $name . '} IS ' . $this->prepareComment($table['description']);
    }

    // Add column comments.
    foreach ($table['fields'] as $field_name => $field) {
      if (!empty($field['description'])) {
        $statements[] = 'COMMENT ON COLUMN {' . $name . '}.' . $field_name . ' IS ' . $this->prepareComment($field['description']);
      }
    }

    return $statements;
  }

  /**
   * Create an SQL string for a field to be used in table creation or
   * alteration.
   *
   * Before passing a field out of a schema definition into this
   * function it has to be processed by _db_process_field().
   *
   * @param $name
   *    Name of the field.
   * @param $spec
   *    The field specification, as per the schema data structure format.
   */
  protected function createFieldSql($name, $spec) {
280 281
    // The PostgreSQL server converts names into lowercase, unless quoted.
    $sql = '"' . $name . '" ' . $spec['pgsql_type'];
Crell's avatar
Crell committed
282 283 284 285 286

    if (isset($spec['type']) && $spec['type'] == 'serial') {
      unset($spec['not null']);
    }

287
    if (in_array($spec['pgsql_type'], array('varchar', 'character')) && isset($spec['length'])) {
Crell's avatar
Crell committed
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
      $sql .= '(' . $spec['length'] . ')';
    }
    elseif (isset($spec['precision']) && isset($spec['scale'])) {
      $sql .= '(' . $spec['precision'] . ', ' . $spec['scale'] . ')';
    }

    if (!empty($spec['unsigned'])) {
      $sql .= " CHECK ($name >= 0)";
    }

    if (isset($spec['not null'])) {
      if ($spec['not null']) {
        $sql .= ' NOT NULL';
      }
      else {
        $sql .= ' NULL';
      }
    }
306 307
    if (array_key_exists('default', $spec)) {
      $default = $this->escapeDefaultValue($spec['default']);
Crell's avatar
Crell committed
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
      $sql .= " default $default";
    }

    return $sql;
  }

  /**
   * Set database-engine specific properties for a field.
   *
   * @param $field
   *   A field description array, as specified in the schema documentation.
   */
  protected function processField($field) {
    if (!isset($field['size'])) {
      $field['size'] = 'normal';
    }

    // Set the correct database-engine specific datatype.
    // In case one is already provided, force it to lowercase.
    if (isset($field['pgsql_type'])) {
328
      $field['pgsql_type'] = Unicode::strtolower($field['pgsql_type']);
Crell's avatar
Crell committed
329 330 331 332 333 334 335
    }
    else {
      $map = $this->getFieldTypeMap();
      $field['pgsql_type'] = $map[$field['type'] . ':' . $field['size']];
    }

    if (!empty($field['unsigned'])) {
336
      // Unsigned datatypes are not supported in PostgreSQL 9.1. In MySQL,
Crell's avatar
Crell committed
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369
      // they are used to ensure a positive number is inserted and it also
      // doubles the maximum integer size that can be stored in a field.
      // The PostgreSQL schema in Drupal creates a check constraint
      // to ensure that a value inserted is >= 0. To provide the extra
      // integer capacity, here, we bump up the column field size.
      if (!isset($map)) {
        $map = $this->getFieldTypeMap();
      }
      switch ($field['pgsql_type']) {
        case 'smallint':
          $field['pgsql_type'] = $map['int:medium'];
          break;
        case 'int' :
          $field['pgsql_type'] = $map['int:big'];
          break;
      }
    }
    if (isset($field['type']) && $field['type'] == 'serial') {
      unset($field['not null']);
    }
    return $field;
  }

  /**
   * This maps a generic data type in combination with its data size
   * to the engine-specific data type.
   */
  function getFieldTypeMap() {
    // Put :normal last so it gets preserved by array_flip. This makes
    // it much easier for modules (such as schema.module) to map
    // database types back into schema types.
    // $map does not use drupal_static as its value never changes.
    static $map = array(
370 371
      'varchar_ascii:normal' => 'varchar',

Crell's avatar
Crell committed
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
      'varchar:normal' => 'varchar',
      'char:normal' => 'character',

      'text:tiny' => 'text',
      'text:small' => 'text',
      'text:medium' => 'text',
      'text:big' => 'text',
      'text:normal' => 'text',

      'int:tiny' => 'smallint',
      'int:small' => 'smallint',
      'int:medium' => 'int',
      'int:big' => 'bigint',
      'int:normal' => 'int',

      'float:tiny' => 'real',
      'float:small' => 'real',
      'float:medium' => 'real',
      'float:big' => 'double precision',
      'float:normal' => 'real',

      'numeric:normal' => 'numeric',

      'blob:big' => 'bytea',
      'blob:normal' => 'bytea',

      'serial:tiny' => 'serial',
      'serial:small' => 'serial',
      'serial:medium' => 'serial',
      'serial:big' => 'bigserial',
      'serial:normal' => 'serial',
      );
    return $map;
  }

  protected function _createKeySql($fields) {
    $return = array();
    foreach ($fields as $field) {
      if (is_array($field)) {
        $return[] = 'substr(' . $field[0] . ', 1, ' . $field[1] . ')';
      }
      else {
        $return[] = '"' . $field . '"';
      }
    }
    return implode(', ', $return);
  }

420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
  /**
   * Create the SQL expression for primary keys.
   *
   * Postgresql does not support key length. It does support fillfactor, but
   * that requires a separate database lookup for each column in the key. The
   * key length defined in the schema is ignored.
   */
  protected function createPrimaryKeySql($fields) {
    $return = array();
    foreach ($fields as $field) {
      if (is_array($field)) {
        $return[] = '"' . $field[0] . '"';
      }
      else {
        $return[] = '"' . $field . '"';
      }
    }
    return implode(', ', $return);
  }

440 441 442 443 444 445 446 447 448
  /**
   * {@inheritdoc}
   */
  public function tableExists($table) {
    $prefixInfo = $this->getPrefixInfo($table, TRUE);

    return (bool) $this->connection->query("SELECT 1 FROM pg_tables WHERE schemaname = :schema AND tablename = :table", array(':schema' => $prefixInfo['schema'], ':table' => $prefixInfo['table']))->fetchField();
  }

Crell's avatar
Crell committed
449 450
  function renameTable($table, $new_name) {
    if (!$this->tableExists($table)) {
451
      throw new SchemaObjectDoesNotExistException(t("Cannot rename @table to @table_new: table @table doesn't exist.", array('@table' => $table, '@table_new' => $new_name)));
Crell's avatar
Crell committed
452 453
    }
    if ($this->tableExists($new_name)) {
454
      throw new SchemaObjectExistsException(t("Cannot rename @table to @table_new: table @table_new already exists.", array('@table' => $table, '@table_new' => $new_name)));
Crell's avatar
Crell committed
455 456 457 458 459 460 461 462 463
    }

    // Get the schema and tablename for the old table.
    $old_full_name = $this->connection->prefixTables('{' . $table . '}');
    list($old_schema, $old_table_name) = strpos($old_full_name, '.') ? explode('.', $old_full_name) : array('public', $old_full_name);

    // Index names and constraint names are global in PostgreSQL, so we need to
    // rename them when renaming the table.
    $indexes = $this->connection->query('SELECT indexname FROM pg_indexes WHERE schemaname = :schema AND tablename = :table', array(':schema' => $old_schema, ':table' => $old_table_name));
464

Crell's avatar
Crell committed
465
    foreach ($indexes as $index) {
466 467 468 469 470 471 472 473
      // Get the index type by suffix, e.g. idx/key/pkey
      $index_type = substr($index->indexname, strrpos($index->indexname, '_') + 1);

      // If the index is already rewritten by ensureIdentifiersLength() to not
      // exceed the 63 chars limit of PostgreSQL, we need to take care of that.
      // Example (drupal_Gk7Su_T1jcBHVuvSPeP22_I3Ni4GrVEgTYlIYnBJkro_idx).
      if (strpos($index->indexname, 'drupal_') !== FALSE) {
        preg_match('/^drupal_(.*)_' . preg_quote($index_type) . '/', $index->indexname, $matches);
Crell's avatar
Crell committed
474 475
        $index_name = $matches[1];
      }
476 477 478 479 480 481 482
      else {
        // Make sure to remove the suffix from index names, because
        // $this->ensureIdentifiersLength() will add the suffix again and thus
        // would result in a wrong index name.
        preg_match('/^' . preg_quote($old_full_name) . '__(.*)__' . preg_quote($index_type) . '/', $index->indexname, $matches);
        $index_name = $matches[1];
      }
483
      $this->connection->query('ALTER INDEX "' . $index->indexname . '" RENAME TO ' . $this->ensureIdentifiersLength($new_name, $index_name, $index_type) . '');
Crell's avatar
Crell committed
484 485 486 487
    }

    // Ensure the new table name does not include schema syntax.
    $prefixInfo = $this->getPrefixInfo($new_name);
488 489 490 491 492 493 494 495 496 497 498

    // Rename sequences if there's a serial fields.
    $info = $this->queryTableInformation($table);
    if (!empty($info->serial_fields)) {
      foreach ($info->serial_fields as $field) {
        $old_sequence = $this->prefixNonTable($table, $field, 'seq');
        $new_sequence = $this->prefixNonTable($new_name, $field, 'seq');
        $this->connection->query('ALTER SEQUENCE ' . $old_sequence . ' RENAME TO ' . $new_sequence);
      }
    }
    // Now rename the table.
Crell's avatar
Crell committed
499
    $this->connection->query('ALTER TABLE {' . $table . '} RENAME TO ' . $prefixInfo['table']);
500
    $this->resetTableInformation($table);
Crell's avatar
Crell committed
501 502 503 504 505 506 507 508
  }

  public function dropTable($table) {
    if (!$this->tableExists($table)) {
      return FALSE;
    }

    $this->connection->query('DROP TABLE {' . $table . '}');
509
    $this->resetTableInformation($table);
Crell's avatar
Crell committed
510 511 512 513 514
    return TRUE;
  }

  public function addField($table, $field, $spec, $new_keys = array()) {
    if (!$this->tableExists($table)) {
515
      throw new SchemaObjectDoesNotExistException(t("Cannot add field @table.@field: table doesn't exist.", array('@field' => $field, '@table' => $table)));
Crell's avatar
Crell committed
516 517
    }
    if ($this->fieldExists($table, $field)) {
518
      throw new SchemaObjectExistsException(t("Cannot add field @table.@field: field already exists.", array('@field' => $field, '@table' => $table)));
Crell's avatar
Crell committed
519 520 521 522 523 524 525 526 527 528 529 530 531 532 533
    }

    $fixnull = FALSE;
    if (!empty($spec['not null']) && !isset($spec['default'])) {
      $fixnull = TRUE;
      $spec['not null'] = FALSE;
    }
    $query = 'ALTER TABLE {' . $table . '} ADD COLUMN ';
    $query .= $this->createFieldSql($field, $this->processField($spec));
    $this->connection->query($query);
    if (isset($spec['initial'])) {
      $this->connection->update($table)
        ->fields(array($field => $spec['initial']))
        ->execute();
    }
534 535 536 537 538
    if (isset($spec['initial_from_field'])) {
      $this->connection->update($table)
        ->expression($field, $spec['initial_from_field'])
        ->execute();
    }
Crell's avatar
Crell committed
539 540 541 542 543 544 545 546 547 548
    if ($fixnull) {
      $this->connection->query("ALTER TABLE {" . $table . "} ALTER $field SET NOT NULL");
    }
    if (isset($new_keys)) {
      $this->_createKeys($table, $new_keys);
    }
    // Add column comment.
    if (!empty($spec['description'])) {
      $this->connection->query('COMMENT ON COLUMN {' . $table . '}.' . $field . ' IS ' . $this->prepareComment($spec['description']));
    }
549
    $this->resetTableInformation($table);
Crell's avatar
Crell committed
550 551 552 553 554 555 556 557
  }

  public function dropField($table, $field) {
    if (!$this->fieldExists($table, $field)) {
      return FALSE;
    }

    $this->connection->query('ALTER TABLE {' . $table . '} DROP COLUMN "' . $field . '"');
558
    $this->resetTableInformation($table);
Crell's avatar
Crell committed
559 560 561 562 563
    return TRUE;
  }

  public function fieldSetDefault($table, $field, $default) {
    if (!$this->fieldExists($table, $field)) {
564
      throw new SchemaObjectDoesNotExistException(t("Cannot set default value of field @table.@field: field doesn't exist.", array('@table' => $table, '@field' => $field)));
Crell's avatar
Crell committed
565 566
    }

567
    $default = $this->escapeDefaultValue($default);
Crell's avatar
Crell committed
568 569 570 571 572 573

    $this->connection->query('ALTER TABLE {' . $table . '} ALTER COLUMN "' . $field . '" SET DEFAULT ' . $default);
  }

  public function fieldSetNoDefault($table, $field) {
    if (!$this->fieldExists($table, $field)) {
574
      throw new SchemaObjectDoesNotExistException(t("Cannot remove default value of field @table.@field: field doesn't exist.", array('@table' => $table, '@field' => $field)));
Crell's avatar
Crell committed
575 576 577 578 579 580
    }

    $this->connection->query('ALTER TABLE {' . $table . '} ALTER COLUMN "' . $field . '" DROP DEFAULT');
  }

  public function indexExists($table, $name) {
581
    // Details http://www.postgresql.org/docs/9.1/interactive/view-pg-indexes.html
582
    $index_name = $this->ensureIdentifiersLength($table, $name, 'idx');
583 584 585
    // Remove leading and trailing quotes because the index name is in a WHERE
    // clause and not used as an identifier.
    $index_name = str_replace('"', '', $index_name);
Crell's avatar
Crell committed
586 587 588 589 590 591
    return (bool) $this->connection->query("SELECT 1 FROM pg_indexes WHERE indexname = '$index_name'")->fetchField();
  }

  /**
   * Helper function: check if a constraint (PK, FK, UK) exists.
   *
592
   * @param string $table
Crell's avatar
Crell committed
593
   *   The name of the table.
594 595 596 597 598
   * @param string $name
   *   The name of the constraint (typically 'pkey' or '[constraint]__key').
   *
   * @return bool
   *   TRUE if the constraint exists, FALSE otherwise.
Crell's avatar
Crell committed
599
   */
600
  public function constraintExists($table, $name) {
601 602 603 604 605 606 607 608 609 610 611 612 613
    // ::ensureIdentifiersLength() expects three parameters, although not
    // explicitly stated in its signature, thus we split our constraint name in
    // a proper name and a suffix.
    if ($name == 'pkey') {
      $suffix = $name;
      $name = '';
    }
    else {
      $pos = strrpos($name, '__');
      $suffix = substr($name, $pos + 2);
      $name = substr($name, 0, $pos);
    }
    $constraint_name = $this->ensureIdentifiersLength($table, $name, $suffix);
614 615 616
    // Remove leading and trailing quotes because the index name is in a WHERE
    // clause and not used as an identifier.
    $constraint_name = str_replace('"', '', $constraint_name);
Crell's avatar
Crell committed
617 618 619 620 621
    return (bool) $this->connection->query("SELECT 1 FROM pg_constraint WHERE conname = '$constraint_name'")->fetchField();
  }

  public function addPrimaryKey($table, $fields) {
    if (!$this->tableExists($table)) {
622
      throw new SchemaObjectDoesNotExistException(t("Cannot add primary key to table @table: table doesn't exist.", array('@table' => $table)));
Crell's avatar
Crell committed
623 624
    }
    if ($this->constraintExists($table, 'pkey')) {
625
      throw new SchemaObjectExistsException(t("Cannot add primary key to table @table: primary key already exists.", array('@table' => $table)));
Crell's avatar
Crell committed
626 627
    }

628
    $this->connection->query('ALTER TABLE {' . $table . '} ADD CONSTRAINT ' . $this->ensureIdentifiersLength($table, '', 'pkey') . ' PRIMARY KEY (' . $this->createPrimaryKeySql($fields) . ')');
629
    $this->resetTableInformation($table);
Crell's avatar
Crell committed
630 631 632 633 634 635 636
  }

  public function dropPrimaryKey($table) {
    if (!$this->constraintExists($table, 'pkey')) {
      return FALSE;
    }

637
    $this->connection->query('ALTER TABLE {' . $table . '} DROP CONSTRAINT ' . $this->ensureIdentifiersLength($table, '', 'pkey'));
638
    $this->resetTableInformation($table);
Crell's avatar
Crell committed
639 640 641 642 643
    return TRUE;
  }

  function addUniqueKey($table, $name, $fields) {
    if (!$this->tableExists($table)) {
644
      throw new SchemaObjectDoesNotExistException(t("Cannot add unique key @name to table @table: table doesn't exist.", array('@table' => $table, '@name' => $name)));
Crell's avatar
Crell committed
645
    }
646
    if ($this->constraintExists($table, $name . '__key')) {
647
      throw new SchemaObjectExistsException(t("Cannot add unique key @name to table @table: unique key already exists.", array('@table' => $table, '@name' => $name)));
Crell's avatar
Crell committed
648 649
    }

650
    $this->connection->query('ALTER TABLE {' . $table . '} ADD CONSTRAINT ' . $this->ensureIdentifiersLength($table, $name, 'key') . ' UNIQUE (' . implode(',', $fields) . ')');
651
    $this->resetTableInformation($table);
Crell's avatar
Crell committed
652 653 654
  }

  public function dropUniqueKey($table, $name) {
655
    if (!$this->constraintExists($table, $name . '__key')) {
Crell's avatar
Crell committed
656 657 658
      return FALSE;
    }

659
    $this->connection->query('ALTER TABLE {' . $table . '} DROP CONSTRAINT ' . $this->ensureIdentifiersLength($table, $name, 'key'));
660
    $this->resetTableInformation($table);
Crell's avatar
Crell committed
661 662 663
    return TRUE;
  }

664 665 666 667
  /**
   * {@inheritdoc}
   */
  public function addIndex($table, $name, $fields, array $spec) {
Crell's avatar
Crell committed
668
    if (!$this->tableExists($table)) {
669
      throw new SchemaObjectDoesNotExistException(t("Cannot add index @name to table @table: table doesn't exist.", array('@table' => $table, '@name' => $name)));
Crell's avatar
Crell committed
670 671
    }
    if ($this->indexExists($table, $name)) {
672
      throw new SchemaObjectExistsException(t("Cannot add index @name to table @table: index already exists.", array('@table' => $table, '@name' => $name)));
Crell's avatar
Crell committed
673 674 675
    }

    $this->connection->query($this->_createIndexSql($table, $name, $fields));
676
    $this->resetTableInformation($table);
Crell's avatar
Crell committed
677 678 679 680 681 682 683
  }

  public function dropIndex($table, $name) {
    if (!$this->indexExists($table, $name)) {
      return FALSE;
    }

684
    $this->connection->query('DROP INDEX ' . $this->ensureIdentifiersLength($table, $name, 'idx'));
685
    $this->resetTableInformation($table);
Crell's avatar
Crell committed
686 687 688 689 690
    return TRUE;
  }

  public function changeField($table, $field, $field_new, $spec, $new_keys = array()) {
    if (!$this->fieldExists($table, $field)) {
691
      throw new SchemaObjectDoesNotExistException(t("Cannot change the definition of field @table.@name: field doesn't exist.", array('@table' => $table, '@name' => $field)));
Crell's avatar
Crell committed
692 693
    }
    if (($field != $field_new) && $this->fieldExists($table, $field_new)) {
694
      throw new SchemaObjectExistsException(t("Cannot rename field @table.@name to @name_new: target field already exists.", array('@table' => $table, '@name' => $field, '@name_new' => $field_new)));
Crell's avatar
Crell committed
695 696 697 698
    }

    $spec = $this->processField($spec);

699 700 701 702 703
    // Type 'serial' is known to PostgreSQL, but only during table creation,
    // not when altering. Because of that, we create it here as an 'int'. After
    // we create it we manually re-apply the sequence.
    if (in_array($spec['pgsql_type'], array('serial', 'bigserial'))) {
      $field_def = 'int';
Crell's avatar
Crell committed
704 705
    }
    else {
706
      $field_def = $spec['pgsql_type'];
Crell's avatar
Crell committed
707 708 709
    }

    if (in_array($spec['pgsql_type'], array('varchar', 'character', 'text')) && isset($spec['length'])) {
710
      $field_def .= '(' . $spec['length'] . ')';
Crell's avatar
Crell committed
711 712
    }
    elseif (isset($spec['precision']) && isset($spec['scale'])) {
713
      $field_def .= '(' . $spec['precision'] . ', ' . $spec['scale'] . ')';
Crell's avatar
Crell committed
714 715 716 717 718 719 720 721 722 723 724 725
    }

    // Remove old check constraints.
    $field_info = $this->queryFieldInformation($table, $field);

    foreach ($field_info as $check) {
      $this->connection->query('ALTER TABLE {' . $table . '} DROP CONSTRAINT "' . $check . '"');
    }

    // Remove old default.
    $this->fieldSetNoDefault($table, $field);

726 727 728 729
    // Convert field type.
    // Usually, we do this via a simple typecast 'USING fieldname::type'. But
    // the typecast does not work for conversions to bytea.
    // @see http://www.postgresql.org/docs/current/static/datatype-binary.html
730 731
    $table_information = $this->queryTableInformation($table);
    $is_bytea = !empty($table_information->blob_fields[$field]);
732
    if ($spec['pgsql_type'] != 'bytea') {
733 734 735 736 737 738
      if ($is_bytea) {
        $this->connection->query('ALTER TABLE {' . $table . '} ALTER "' . $field . '" TYPE ' . $field_def . ' USING convert_from("' . $field . '"' . ", 'UTF8')");
      }
      else {
        $this->connection->query('ALTER TABLE {' . $table . '} ALTER "' . $field . '" TYPE ' . $field_def . ' USING "' . $field . '"::' . $field_def);
      }
739 740 741
    }
    else {
      // Do not attempt to convert a field that is bytea already.
742
      if (!$is_bytea) {
743 744 745
        // Convert to a bytea type by using the SQL replace() function to
        // convert any single backslashes in the field content to double
        // backslashes ('\' to '\\').
746
        $this->connection->query('ALTER TABLE {' . $table . '} ALTER "' . $field . '" TYPE ' . $field_def . ' USING decode(replace("' . $field . '"' . ", E'\\\\', E'\\\\\\\\'), 'escape');");
747 748
      }
    }
Crell's avatar
Crell committed
749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768

    if (isset($spec['not null'])) {
      if ($spec['not null']) {
        $nullaction = 'SET NOT NULL';
      }
      else {
        $nullaction = 'DROP NOT NULL';
      }
      $this->connection->query('ALTER TABLE {' . $table . '} ALTER "' . $field . '" ' . $nullaction);
    }

    if (in_array($spec['pgsql_type'], array('serial', 'bigserial'))) {
      // Type "serial" is known to PostgreSQL, but *only* during table creation,
      // not when altering. Because of that, the sequence needs to be created
      // and initialized by hand.
      $seq = "{" . $table . "}_" . $field_new . "_seq";
      $this->connection->query("CREATE SEQUENCE " . $seq);
      // Set sequence to maximal field value to not conflict with existing
      // entries.
      $this->connection->query("SELECT setval('" . $seq . "', MAX(\"" . $field . '")) FROM {' . $table . "}");
769
      $this->connection->query('ALTER TABLE {' . $table . '} ALTER ' . $field . ' SET DEFAULT nextval(' . $this->connection->quote($seq) . ')');
Crell's avatar
Crell committed
770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794
    }

    // Rename the column if necessary.
    if ($field != $field_new) {
      $this->connection->query('ALTER TABLE {' . $table . '} RENAME "' . $field . '" TO "' . $field_new . '"');
    }

    // Add unsigned check if necessary.
    if (!empty($spec['unsigned'])) {
      $this->connection->query('ALTER TABLE {' . $table . '} ADD CHECK ("' . $field_new . '" >= 0)');
    }

    // Add default if necessary.
    if (isset($spec['default'])) {
      $this->fieldSetDefault($table, $field_new, $spec['default']);
    }

    // Change description if necessary.
    if (!empty($spec['description'])) {
      $this->connection->query('COMMENT ON COLUMN {' . $table . '}."' . $field_new . '" IS ' . $this->prepareComment($spec['description']));
    }

    if (isset($new_keys)) {
      $this->_createKeys($table, $new_keys);
    }
795
    $this->resetTableInformation($table);
Crell's avatar
Crell committed
796 797 798
  }

  protected function _createIndexSql($table, $name, $fields) {
799
    $query = 'CREATE INDEX ' . $this->ensureIdentifiersLength($table, $name, 'idx') . ' ON {' . $table . '} (';
Crell's avatar
Crell committed
800 801 802 803 804 805 806 807 808 809 810 811 812 813 814
    $query .= $this->_createKeySql($fields) . ')';
    return $query;
  }

  protected function _createKeys($table, $new_keys) {
    if (isset($new_keys['primary key'])) {
      $this->addPrimaryKey($table, $new_keys['primary key']);
    }
    if (isset($new_keys['unique keys'])) {
      foreach ($new_keys['unique keys'] as $name => $fields) {
        $this->addUniqueKey($table, $name, $fields);
      }
    }
    if (isset($new_keys['indexes'])) {
      foreach ($new_keys['indexes'] as $name => $fields) {
815 816 817 818
        // Even though $new_keys is not a full schema it still has 'indexes' and
        // so is a partial schema. Technically addIndex() doesn't do anything
        // with it so passing an empty array would work as well.
        $this->addIndex($table, $name, $fields, $new_keys);
Crell's avatar
Crell committed
819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835
      }
    }
  }

  /**
   * Retrieve a table or column comment.
   */
  public function getComment($table, $column = NULL) {
    $info = $this->getPrefixInfo($table);
    // Don't use {} around pg_class, pg_attribute tables.
    if (isset($column)) {
      return $this->connection->query('SELECT col_description(oid, attnum) FROM pg_class, pg_attribute WHERE attrelid = oid AND relname = ? AND attname = ?', array($info['table'], $column))->fetchField();
    }
    else {
      return $this->connection->query('SELECT obj_description(oid, ?) FROM pg_class WHERE relname = ?', array('pg_class', $info['table']))->fetchField();
    }
  }
836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851

  /**
   * Calculates a base-64 encoded, PostgreSQL-safe sha-256 hash per PostgreSQL
   * documentation: 4.1. Lexical Structure.
   *
   * @param $data
   *   String to be hashed.
   * @return string
   *   A base-64 encoded sha-256 hash, with + and / replaced with _ and any =
   *   padding characters removed.
   */
  protected function hashBase64($data) {
    $hash = base64_encode(hash('sha256', $data, TRUE));
    // Modify the hash so it's safe to use in PostgreSQL identifiers.
    return strtr($hash, array('+' => '_', '/' => '_', '=' => ''));
  }
852

Crell's avatar
Crell committed
853 854 855
}

/**
856
 * @} End of "addtogroup schemaapi".
Crell's avatar
Crell committed
857
 */