Commit 170f1ebc authored by Drew Webber's avatar Drew Webber
Browse files

Issue #998898 by stefan.r, Damien Tournoud, mradcliffe, mcdruid, sylus,...

Issue #998898 by stefan.r, Damien Tournoud, mradcliffe, mcdruid, sylus, poker10, RoSk0, bzrudi71, Josh Waihi, Stevel, chalet16, chx, cmonnow, webchick, jhedstrom: Make sure that the identifiers are not more the 63 characters on PostgreSQL
parent e3b239c3
Loading
Loading
Loading
Loading
+123 −13
Original line number Diff line number Diff line
@@ -23,6 +23,64 @@ class DatabaseSchema_pgsql extends DatabaseSchema {
   */
  protected $tableInformation = array();

  /**
   * The maximum allowed length for index, primary key and constraint names.
   *
   * Value will usually be set to a 63 chars limit but PostgreSQL allows
   * to higher this value before compiling, so we need to check for that.
   *
   * @var int
   */
  protected $maxIdentifierLength;

  /**
   * Make sure to limit identifiers according to PostgreSQL compiled in length.
   *
   * PostgreSQL allows in standard configuration identifiers no longer than 63
   * chars for table/relation names, indexes, primary keys, and constraints. So
   * we map all identifiers that are too long to drupal_base64hash_tag, where
   * tag is one of:
   *   - idx for indexes
   *   - key for constraints
   *   - pkey for primary keys
   *   - seq for sequences
   *
   * @param string $table_identifier_part
   *   The first argument used to build the identifier string. This usually
   *   refers to a table/relation name.
   * @param string $column_identifier_part
   *   The second argument used to build the identifier string. This usually
   *   refers to one or more column names.
   * @param string $tag
   *   The identifier tag. It can be one of 'idx', 'key', 'pkey' or 'seq'.
   *
   * @return string
   *   The index/constraint/pkey identifier.
   */
  protected function ensureIdentifiersLength($table_identifier_part, $column_identifier_part, $tag) {
    $info = $this->getPrefixInfo($table_identifier_part);
    $table_identifier_part = $info['table'];

    // Filters out potentially empty $column_identifier_part to ensure
    // compatibility with old naming convention (see prefixNonTable()).
    $identifiers = array_filter(array($table_identifier_part, $column_identifier_part, $tag));
    $identifierName = implode('_', $identifiers);

    // Retrieve the max identifier length which is usually 63 characters
    // but can be altered before PostgreSQL is compiled so we need to check.
    if (empty($this->maxIdentifierLength)) {
      $this->maxIdentifierLength = $this->connection->query("SHOW max_identifier_length")->fetchField();
    }

    if (strlen($identifierName) > $this->maxIdentifierLength) {
      $saveIdentifier = 'drupal_' . $this->hashBase64($identifierName) . '_' . $tag;
    }
    else {
      $saveIdentifier = $identifierName;
    }
    return $saveIdentifier;
  }

  /**
   * Fetch the list of blobs and sequences used on a table.
   *
@@ -124,11 +182,11 @@ protected function createTableSql($name, $table) {

    $sql_keys = array();
    if (isset($table['primary key']) && is_array($table['primary key'])) {
      $sql_keys[] = 'PRIMARY KEY (' . implode(', ', $table['primary key']) . ')';
      $sql_keys[] = 'CONSTRAINT ' . $this->ensureIdentifiersLength($name, '', 'pkey') . ' PRIMARY KEY (' . implode(', ', $table['primary key']) . ')';
    }
    if (isset($table['unique keys']) && is_array($table['unique keys'])) {
      foreach ($table['unique keys'] as $key_name => $key) {
        $sql_keys[] = 'CONSTRAINT ' . $this->prefixNonTable($name, $key_name, 'key') . ' UNIQUE (' . implode(', ', $key) . ')';
        $sql_keys[] = 'CONSTRAINT ' . $this->ensureIdentifiersLength($name, $key_name, 'key') . ' UNIQUE (' . implode(', ', $key) . ')';
      }
    }

@@ -328,10 +386,31 @@ function renameTable($table, $new_name) {
    // rename them when renaming the table.
    $indexes = $this->connection->query('SELECT indexname FROM pg_indexes WHERE schemaname = :schema AND tablename = :table', array(':schema' => $old_schema, ':table' => $old_table_name));
    foreach ($indexes as $index) {
      if (preg_match('/^' . preg_quote($old_full_name) . '_(.*)$/', $index->indexname, $matches)) {
      // Get the index type by suffix, e.g. idx/key/pkey
      $index_type = substr($index->indexname, strrpos($index->indexname, '_') + 1);

      // If the index is already rewritten by ensureIdentifiersLength() to not
      // exceed the 63 chars limit of PostgreSQL, we need to take care of that.
      // Example (drupal_Gk7Su_T1jcBHVuvSPeP22_I3Ni4GrVEgTYlIYnBJkro_idx).
      if (strpos($index->indexname, 'drupal_') !== FALSE) {
        preg_match('/^drupal_(.*)_' . preg_quote($index_type) . '/', $index->indexname, $matches);
        $index_name = $matches[1];
        $this->connection->query('ALTER INDEX ' . $index->indexname . ' RENAME TO {' . $new_name . '}_' . $index_name);
      }
      else {
        if ($index_type == 'pkey') {
          // Primary keys do not have a specific name in D7.
          $index_name = '';
        }
        else {
          // Make sure to remove the suffix from index names, because
          // ensureIdentifiersLength() will add the suffix again and thus
          // would result in a wrong index name.
          preg_match('/^' . preg_quote($old_full_name) . '_(.*)_' . preg_quote($index_type) . '/', $index->indexname, $matches);
          $index_name = $matches[1];
        }
      }

      $this->connection->query('ALTER INDEX ' . $index->indexname . ' RENAME TO ' . $this->ensureIdentifiersLength($new_name, $index_name, $index_type));
    }

    // Now rename the table.
@@ -415,8 +494,8 @@ public function fieldSetNoDefault($table, $field) {
  }

  public function indexExists($table, $name) {
    // Details http://www.postgresql.org/docs/8.3/interactive/view-pg-indexes.html
    $index_name = '{' . $table . '}_' . $name . '_idx';
    // Details https://www.postgresql.org/docs/10/view-pg-indexes.html
    $index_name = $this->ensureIdentifiersLength($table, $name, 'idx');
    return (bool) $this->connection->query("SELECT 1 FROM pg_indexes WHERE indexname = '$index_name'")->fetchField();
  }

@@ -429,7 +508,18 @@ public function indexExists($table, $name) {
   *   The name of the constraint (typically 'pkey' or '[constraint]_key').
   */
  protected function constraintExists($table, $name) {
    $constraint_name = '{' . $table . '}_' . $name;
    // ensureIdentifiersLength() expects three parameters, thus we split our
    // constraint name in a proper name and a suffix.
    if ($name == 'pkey') {
      $suffix = $name;
      $name = '';
    }
    else {
      $pos = strrpos($name, '_');
      $suffix = substr($name, $pos + 1);
      $name = substr($name, 0, $pos);
    }
    $constraint_name = $this->ensureIdentifiersLength($table, $name, $suffix);
    return (bool) $this->connection->query("SELECT 1 FROM pg_constraint WHERE conname = '$constraint_name'")->fetchField();
  }

@@ -441,7 +531,7 @@ public function addPrimaryKey($table, $fields) {
      throw new DatabaseSchemaObjectExistsException(t("Cannot add primary key to table @table: primary key already exists.", array('@table' => $table)));
    }

    $this->connection->query('ALTER TABLE {' . $table . '} ADD PRIMARY KEY (' . implode(',', $fields) . ')');
    $this->connection->query('ALTER TABLE {' . $table . '} ADD CONSTRAINT ' . $this->ensureIdentifiersLength($table, '', 'pkey') . ' PRIMARY KEY (' . implode(',', $fields) . ')');
  }

  public function dropPrimaryKey($table) {
@@ -449,7 +539,7 @@ public function dropPrimaryKey($table) {
      return FALSE;
    }

    $this->connection->query('ALTER TABLE {' . $table . '} DROP CONSTRAINT ' . $this->prefixNonTable($table, 'pkey'));
    $this->connection->query('ALTER TABLE {' . $table . '} DROP CONSTRAINT ' . $this->ensureIdentifiersLength($table, '', 'pkey'));
    return TRUE;
  }

@@ -461,7 +551,7 @@ function addUniqueKey($table, $name, $fields) {
      throw new DatabaseSchemaObjectExistsException(t("Cannot add unique key @name to table @table: unique key already exists.", array('@table' => $table, '@name' => $name)));
    }

    $this->connection->query('ALTER TABLE {' . $table . '} ADD CONSTRAINT "' . $this->prefixNonTable($table, $name, 'key') . '" UNIQUE (' . implode(',', $fields) . ')');
    $this->connection->query('ALTER TABLE {' . $table . '} ADD CONSTRAINT "' . $this->ensureIdentifiersLength($table, $name, 'key') . '" UNIQUE (' . implode(',', $fields) . ')');
  }

  public function dropUniqueKey($table, $name) {
@@ -469,7 +559,7 @@ public function dropUniqueKey($table, $name) {
      return FALSE;
    }

    $this->connection->query('ALTER TABLE {' . $table . '} DROP CONSTRAINT "' . $this->prefixNonTable($table, $name, 'key') . '"');
    $this->connection->query('ALTER TABLE {' . $table . '} DROP CONSTRAINT "' . $this->ensureIdentifiersLength($table, $name, 'key') . '"');
    return TRUE;
  }

@@ -489,7 +579,7 @@ public function dropIndex($table, $name) {
      return FALSE;
    }

    $this->connection->query('DROP INDEX ' . $this->prefixNonTable($table, $name, 'idx'));
    $this->connection->query('DROP INDEX ' . $this->ensureIdentifiersLength($table, $name, 'idx'));
    return TRUE;
  }

@@ -580,7 +670,7 @@ public function changeField($table, $field, $field_new, $spec, $new_keys = array
  }

  protected function _createIndexSql($table, $name, $fields) {
    $query = 'CREATE INDEX "' . $this->prefixNonTable($table, $name, 'idx') . '" ON {' . $table . '} (';
    $query = 'CREATE INDEX "' . $this->ensureIdentifiersLength($table, $name, 'idx') . '" ON {' . $table . '} (';
    $query .= $this->_createKeySql($fields) . ')';
    return $query;
  }
@@ -614,4 +704,24 @@ public function getComment($table, $column = NULL) {
      return $this->connection->query('SELECT obj_description(oid, ?) FROM pg_class WHERE relname = ?', array('pg_class', $info['table']))->fetchField();
    }
  }

  /**
   * Calculates a base-64 encoded, PostgreSQL-safe sha-256 hash per PostgreSQL
   * documentation: 4.1. Lexical Structure.
   *
   * @param $data
   *   String to be hashed.
   *
   * @return string
   *   A base-64 encoded sha-256 hash, with + and / replaced with _ and any =
   *   padding characters removed.
   */
  protected function hashBase64($data) {
    // Ensure lowercase as D7's pgsql driver does not quote identifiers
    // consistently, and they are therefore folded to lowercase by PostgreSQL.
    $hash = strtolower(base64_encode(hash('sha256', $data, TRUE)));
    // Modify the hash so it's safe to use in PostgreSQL identifiers.
    return strtr($hash, array('+' => '_', '/' => '_', '=' => ''));
  }

}