SqlBase.php 9.1 KB
Newer Older
1
2
3
4
5
<?php

namespace Drupal\migrate\Plugin\migrate\source;

use Drupal\Core\Database\Database;
6
7
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\State\StateInterface;
8
use Drupal\migrate\Plugin\MigrationInterface;
9
use Drupal\migrate\Plugin\migrate\id_map\Sql;
10
use Drupal\migrate\Plugin\MigrateIdMapInterface;
11
use Symfony\Component\DependencyInjection\ContainerInterface;
12
13
14

/**
 * Sources whose data may be fetched via DBTNG.
15
 *
16
17
18
19
20
 * By default, an existing database connection with key 'migrate' and target
 * 'default' is used. These may be overridden with explicit 'key' and/or
 * 'target' configuration keys. In addition, if the configuration key 'database'
 * is present, it is used as a database connection information array to define
 * the connection.
21
 */
22
abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPluginInterface {
23
24

  /**
25
26
   * The query string.
   *
27
28
29
30
31
   * @var \Drupal\Core\Database\Query\SelectInterface
   */
  protected $query;

  /**
32
33
   * The database object.
   *
34
35
36
37
   * @var \Drupal\Core\Database\Connection
   */
  protected $database;

38
39
40
41
42
43
44
  /**
   * State service for retrieving database info.
   *
   * @var \Drupal\Core\State\StateInterface
   */
  protected $state;

45
46
47
  /**
   * {@inheritdoc}
   */
48
  public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
49
    parent::__construct($configuration, $plugin_id, $plugin_definition, $migration);
50
51
52
53
54
55
56
57
58
59
60
61
62
63
    $this->state = $state;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $migration,
      $container->get('state')
    );
64
65
66
  }

  /**
67
   * Prints the query string when the object is used as a string.
68
69
70
   *
   * @return string
   *   The query string.
71
   */
72
  public function __toString() {
73
    return (string) $this->query();
74
75
76
  }

  /**
77
   * Gets the database connection object.
78
   *
79
   * @return \Drupal\Core\Database\Connection
80
   *   The database connection.
81
82
   */
  public function getDatabase() {
83
    if (!isset($this->database)) {
84
85
86
87
      // See if the database info is in state - if not, fallback to
      // configuration.
      if (isset($this->configuration['database_state_key'])) {
        $this->database = $this->setUpDatabase($this->state->get($this->configuration['database_state_key']));
88
      }
89
90
91
      elseif (($fallback_state_key = $this->state->get('migrate.fallback_state_key'))) {
        $this->database = $this->setUpDatabase($this->state->get($fallback_state_key));
      }
92
      else {
93
        $this->database = $this->setUpDatabase($this->configuration);
94
      }
95
96
97
98
    }
    return $this->database;
  }

99
  /**
100
101
102
   * Gets a connection to the referenced database.
   *
   * This method will add the database connection if necessary.
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
   *
   * @param array $database_info
   *   Configuration for the source database connection. The keys are:
   *    'key' - The database connection key.
   *    'target' - The database connection target.
   *    'database' - Database configuration array as accepted by
   *      Database::addConnectionInfo.
   *
   * @return \Drupal\Core\Database\Connection
   *   The connection to use for this plugin's queries.
   */
  protected function setUpDatabase(array $database_info) {
    if (isset($database_info['key'])) {
      $key = $database_info['key'];
    }
    else {
      $key = 'migrate';
    }
    if (isset($database_info['target'])) {
      $target = $database_info['target'];
    }
    else {
      $target = 'default';
    }
    if (isset($database_info['database'])) {
      Database::addConnectionInfo($key, $target, $database_info['database']);
    }
    return Database::getConnection($target, $key);
  }

133
134
135
  /**
   * Wrapper for database select.
   */
136
137
138
139
140
141
  protected function select($table, $alias = NULL, array $options = array()) {
    $options['fetch'] = \PDO::FETCH_ASSOC;
    return $this->getDatabase()->select($table, $alias, $options);
  }

  /**
142
   * Adds tags and metadata to the query.
143
   *
144
145
   * @return \Drupal\Core\Database\Query\SelectInterface
   *   The query with additional tags and metadata.
146
   */
147
  protected function prepareQuery() {
148
149
150
151
    $this->query = clone $this->query();
    $this->query->addTag('migrate');
    $this->query->addTag('migrate_' . $this->migration->id());
    $this->query->addMetaData('migration', $this->migration);
152
153
154
155
156
157
158
159
160
161

    return $this->query;
  }

  /**
   * Implementation of MigrateSource::performRewind().
   *
   * We could simply execute the query and be functionally correct, but
   * we will take advantage of the PDO-based API to optimize the query up-front.
   */
162
  protected function initializeIterator() {
163
    $this->prepareQuery();
164

165
    // Get the key values, for potential use in joining to the map table.
166
167
168
    $keys = array();

    // The rules for determining what conditions to add to the query are as
169
    // follows (applying first applicable rule):
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
    // 1. If the map is joinable, join it. We will want to accept all rows
    //    which are either not in the map, or marked in the map as NEEDS_UPDATE.
    //    Note that if high water fields are in play, we want to accept all rows
    //    above the high water mark in addition to those selected by the map
    //    conditions, so we need to OR them together (but AND with any existing
    //    conditions in the query). So, ultimately the SQL condition will look
    //    like (original conditions) AND (map IS NULL OR map needs update
    //      OR above high water).
    $conditions = $this->query->orConditionGroup();
    $condition_added = FALSE;
    if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) {
      // Build the join to the map table. Because the source key could have
      // multiple fields, we need to build things up.
      $count = 1;
      $map_join = '';
      $delimiter = '';
      foreach ($this->getIds() as $field_name => $field_schema) {
        if (isset($field_schema['alias'])) {
188
          $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
189
        }
190
191
192
        $map_join .= "$delimiter$field_name = map.sourceid" . $count++;
        $delimiter = ' AND ';
      }
193

194
195
196
197
      $alias = $this->query->leftJoin($this->migration->getIdMap()->getQualifiedMapTableName(), 'map', $map_join);
      $conditions->isNull($alias . '.sourceid1');
      $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
      $condition_added = TRUE;
198

199
200
201
202
203
204
      // And as long as we have the map table, add its data to the row.
      $n = count($this->getIds());
      for ($count = 1; $count <= $n; $count++) {
        $map_key = 'sourceid' . $count;
        $this->query->addField($alias, $map_key, "migrate_map_$map_key");
      }
205
      if ($n = count($this->migration->getDestinationIds())) {
206
        for ($count = 1; $count <= $n; $count++) {
207
          $map_key = 'destid' . $count++;
208
209
210
          $this->query->addField($alias, $map_key, "migrate_map_$map_key");
        }
      }
211
212
213
214
      $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
    }
    // 2. If we are using high water marks, also include rows above the mark.
    //    But, include all rows if the high water mark is not set.
215
216
217
218
    if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater()) !== '') {
      $high_water_field = $this->getHighWaterField();
      $conditions->condition($high_water_field, $high_water, '>');
      $this->query->orderBy($high_water_field);
219
220
221
    }
    if ($condition_added) {
      $this->query->condition($conditions);
222
223
224
225
226
227
228
229
    }

    return new \IteratorIterator($this->query->execute());
  }

  /**
   * @return \Drupal\Core\Database\Query\SelectInterface
   */
230
  abstract public function query();
231
232
233
234
235
236
237
238

  /**
   * {@inheritdoc}
   */
  public function count() {
    return $this->query()->countQuery()->execute()->fetchField();
  }

239
  /**
240
   * Checks if we can join against the map table.
241
242
243
244
245
246
247
   *
   * This function specifically catches issues when we're migrating with
   * unique sets of credentials for the source and destination database.
   *
   * @return bool
   *   TRUE if we can join against the map table otherwise FALSE.
   */
248
249
250
251
252
253
254
255
256
257
  protected function mapJoinable() {
    if (!$this->getIds()) {
      return FALSE;
    }
    $id_map = $this->migration->getIdMap();
    if (!$id_map instanceof Sql) {
      return FALSE;
    }
    $id_map_database_options = $id_map->getDatabase()->getConnectionOptions();
    $source_database_options = $this->getDatabase()->getConnectionOptions();
258
259
260
261
262
263
264
265
266

    // Special handling for sqlite which deals with files.
    if ($id_map_database_options['driver'] === 'sqlite' &&
      $source_database_options['driver'] === 'sqlite' &&
      $id_map_database_options['database'] != $source_database_options['database']
    ) {
      return FALSE;
    }

267
    foreach (array('username', 'password', 'host', 'port', 'namespace', 'driver') as $key) {
268
269
270
271
      if (isset($source_database_options[$key])) {
        if ($id_map_database_options[$key] != $source_database_options[$key]) {
          return FALSE;
        }
272
273
274
275
      }
    }
    return TRUE;
  }
276

277
}