<?php

/**
 * @file
 * Contains \Drupal\migrate\MigrateExecutable.
 */

namespace Drupal\migrate;

use Drupal\Core\Utility\Error;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Drupal\migrate\Entity\MigrationInterface;
use Drupal\migrate\Event\MigrateEvents;
use Drupal\migrate\Event\MigrateImportEvent;
use Drupal\migrate\Event\MigratePostRowSaveEvent;
use Drupal\migrate\Event\MigratePreRowSaveEvent;
use Drupal\migrate\Exception\RequirementsException;
use Drupal\migrate\Plugin\MigrateIdMapInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

/**
 * Defines a migrate executable class.
 *
 * Drives a single migration: iterates the rows of the source plugin, runs each
 * row through the process pipeline, hands it to the destination plugin and
 * records the outcome (and any messages) in the migration's ID map.
 */
class MigrateExecutable implements MigrateExecutableInterface {
  use StringTranslationTrait;

  /**
   * The configuration of the migration to do.
   *
   * @var \Drupal\migrate\Entity\MigrationInterface
   */
  protected $migration;

  /**
   * The message handler used to display messages while the migration runs.
   *
   * Declared public because it used to be created dynamically by the
   * constructor, which made it publicly accessible; narrowing the visibility
   * could break existing callers.
   *
   * @var \Drupal\migrate\MigrateMessageInterface
   */
  public $message;

  /**
   * Status of one row.
   *
   * The value is a MigrateIdMapInterface::STATUS_* constant, for example:
   * STATUS_IMPORTED. It starts out as STATUS_IMPORTED so that the first row
   * is saved with the same status that import() resets to after every row.
   *
   * @var int
   */
  protected $sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;

  /**
   * The queued messages not yet saved.
   *
   * Each element in the array is an array with two keys:
   * - 'message': The message string.
   * - 'level': The level, a MigrationInterface::MESSAGE_* constant.
   *
   * @var array
   */
  protected $queuedMessages = array();

  /**
   * The source ID values of the row currently being imported.
   *
   * Set by import() from Row::getSourceIdValues() and used as the key when
   * saving messages and looking up destination IDs.
   *
   * @var array
   */
  protected $sourceIdValues;

  /**
   * The rollback action to be saved for the current row.
   *
   * @var int
   */
  public $rollbackAction;

  /**
   * An array of counts. Initially used for cache hit/miss tracking.
   *
   * @var array
   */
  protected $counts = array();

  /**
   * The maximum number of items to pass in a single call during a rollback.
   *
   * For use in bulkRollback(). Can be overridden in derived class constructor.
   *
   * @var int
   */
  protected $rollbackBatchSize = 50;

  /**
   * The object currently being constructed.
   *
   * @var \stdClass
   */
  protected $destinationValues;

  /**
   * The source.
   *
   * @var \Drupal\migrate\Plugin\MigrateSourceInterface
   */
  protected $source;

  /**
   * The current data row retrieved from the source.
   *
   * @var \stdClass
   */
  protected $sourceValues;

  /**
   * The event dispatcher.
   *
   * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
   */
  protected $eventDispatcher;

  /**
   * Constructs a MigrateExecutable.
   *
   * Besides storing its arguments, this registers the message handler on the
   * migration's ID map.
   *
   * @param \Drupal\migrate\Entity\MigrationInterface $migration
   *   The migration to run.
   * @param \Drupal\migrate\MigrateMessageInterface $message
   *   The message to record.
   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
   *   (optional) The event dispatcher. When omitted, the 'event_dispatcher'
   *   service is fetched lazily by getEventDispatcher().
   */
  public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, EventDispatcherInterface $event_dispatcher = NULL) {
    $this->migration = $migration;
    $this->message = $message;
    $this->migration->getIdMap()->setMessage($message);
    $this->eventDispatcher = $event_dispatcher;
  }

  /**
   * Returns the source.
   *
   * Makes sure source is initialized based on migration settings.
   *
   * @return \Drupal\migrate\Plugin\MigrateSourceInterface
   *   The source.
   */
  protected function getSource() {
    if (!isset($this->source)) {
      $this->source = $this->migration->getSourcePlugin();

      // @TODO, find out how to remove this.
      // @see https://www.drupal.org/node/2443617
      $this->source->migrateExecutable = $this;
    }
    return $this->source;
  }

  /**
   * Gets the event dispatcher.
   *
   * Falls back to the 'event_dispatcher' service when none was injected
   * through the constructor.
   *
   * @return \Symfony\Component\EventDispatcher\EventDispatcherInterface
   *   The event dispatcher.
   */
  protected function getEventDispatcher() {
    if (!$this->eventDispatcher) {
      $this->eventDispatcher = \Drupal::service('event_dispatcher');
    }
    return $this->eventDispatcher;
  }

  /**
   * {@inheritdoc}
   */
  public function import() {
    $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_IMPORT, new MigrateImportEvent($this->migration));

    // Knock off migration if the requirements haven't been met. Note that
    // every early RESULT_FAILED return in this method skips both
    // setMigrationResult() and the POST_IMPORT event.
    try {
      $this->migration->checkRequirements();
    }
    catch (RequirementsException $e) {
      $this->message->display(
        $this->t('Migration @id did not meet the requirements. @message @requirements', array(
          '@id' => $this->migration->id(),
          '@message' => $e->getMessage(),
          '@requirements' => $e->getRequirementsString(),
        )), 'error');
      return MigrationInterface::RESULT_FAILED;
    }

    $return = MigrationInterface::RESULT_COMPLETED;
    $source = $this->getSource();
    $id_map = $this->migration->getIdMap();

    try {
      $source->rewind();
    }
    catch (\Exception $e) {
      // The exception text comes from the source plugin, so use the escaping
      // '@' placeholder, consistent with the requirements message above.
      $this->message->display(
        $this->t('Migration failed with source plugin exception: @e', array('@e' => $e->getMessage())), 'error');
      return MigrationInterface::RESULT_FAILED;
    }

    $destination = $this->migration->getDestinationPlugin();
    while ($source->valid()) {
      $row = $source->current();
      if ($this->sourceIdValues = $row->getSourceIdValues()) {
        // Wipe old messages, and save any new messages.
        $id_map->delete($this->sourceIdValues, TRUE);
        $this->saveQueuedMessages();
      }

      // Run the process pipeline; a skipped row is recorded as ignored and is
      // not handed to the destination.
      try {
        $this->processRow($row);
        $save = TRUE;
      }
      catch (MigrateSkipRowException $e) {
        $id_map->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_IGNORED, $this->rollbackAction);
        $save = FALSE;
      }

      if ($save) {
        try {
          $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROW_SAVE, new MigratePreRowSaveEvent($this->migration, $row));
          $destination_id_values = $destination->import($row, $id_map->lookupDestinationId($this->sourceIdValues));
          $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROW_SAVE, new MigratePostRowSaveEvent($this->migration, $row, $destination_id_values));
          if ($destination_id_values) {
            // We do not save an idMap entry for config.
            if ($destination_id_values !== TRUE) {
              $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $this->rollbackAction);
            }
          }
          else {
            // The destination reported no IDs: mark the row as failed and
            // make sure at least one message explains the failure.
            $id_map->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_FAILED, $this->rollbackAction);
            if (!$id_map->messageCount()) {
              $message = $this->t('New object was not saved, no error provided');
              $this->saveMessage($message);
              $this->message->display($message);
            }
          }
        }
        catch (MigrateException $e) {
          // A MigrateException carries its own row status and message level.
          $this->migration->getIdMap()->saveIdMapping($row, array(), $e->getStatus(), $this->rollbackAction);
          $this->saveMessage($e->getMessage(), $e->getLevel());
          $this->message->display($e->getMessage(), 'error');
        }
        catch (\Exception $e) {
          $this->migration->getIdMap()->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_FAILED, $this->rollbackAction);
          $this->handleException($e);
        }
      }
      if ($high_water_property = $this->migration->get('highWaterProperty')) {
        $this->migration->saveHighWater($row->getSourceProperty($high_water_property['name']));
      }

      // Reset row properties. These are object properties, so they have to be
      // cleared through $this; unsetting same-named local variables would be
      // a no-op.
      $this->sourceValues = NULL;
      $this->destinationValues = NULL;
      $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;

      try {
        $source->next();
      }
      catch (\Exception $e) {
        $this->message->display(
          $this->t('Migration failed with source plugin exception: @e',
            array('@e' => $e->getMessage())), 'error');
        return MigrationInterface::RESULT_FAILED;
      }
    }

    $this->migration->setMigrationResult($return);
    $this->getEventDispatcher()->dispatch(MigrateEvents::POST_IMPORT, new MigrateImportEvent($this->migration));
    return $return;
  }

  /**
   * {@inheritdoc}
   */
  public function processRow(Row $row, array $process = NULL, $value = NULL) {
    foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) {
      // Whether the current pipeline value is a list of scalars that must be
      // transformed one by one.
      $multiple = FALSE;
      /** @var $plugin \Drupal\migrate\Plugin\MigrateProcessInterface */
      foreach ($plugins as $plugin) {
        $definition = $plugin->getPluginDefinition();
        // Many plugins expect a scalar value but the current value of the
        // pipeline might be multiple scalars (this is set by the previous
        // plugin) and in this case the current value needs to be iterated
        // and each scalar separately transformed.
        if ($multiple && !$definition['handle_multiples']) {
          $new_value = array();
          if (!is_array($value)) {
            throw new MigrateException(sprintf('Pipeline failed for destination %s: %s got instead of an array,', $destination, $value));
          }
          $break = FALSE;
          foreach ($value as $scalar_value) {
            try {
              $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
            }
            catch (MigrateSkipProcessException $e) {
              // Keep transforming the remaining scalars, but stop the
              // pipeline for this destination afterwards.
              $break = TRUE;
            }
          }
          $value = $new_value;
          if ($break) {
            break;
          }
        }
        else {
          try {
            $value = $plugin->transform($value, $this, $row, $destination);
          }
          catch (MigrateSkipProcessException $e) {
            // Stop the pipeline; the value produced so far is still set on
            // the row below.
            break;
          }
          $multiple = $multiple || $plugin->multiple();
        }
      }
      // No plugins means do not set.
      if ($plugins) {
        $row->setDestinationProperty($destination, $value);
      }
      // Reset the value.
      $value = NULL;
    }
  }

  /**
   * Fetches the key array for the current source record.
   *
   * @return array
   *   The current source IDs.
   */
  protected function currentSourceIds() {
    return $this->getSource()->getCurrentIds();
  }

  /**
   * {@inheritdoc}
   */
  public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
    $this->migration->getIdMap()->saveMessage($this->sourceIdValues, $message, $level);
  }

  /**
   * {@inheritdoc}
   */
  public function queueMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
    $this->queuedMessages[] = array('message' => $message, 'level' => $level);
  }

  /**
   * {@inheritdoc}
   */
  public function saveQueuedMessages() {
    foreach ($this->queuedMessages as $queued_message) {
      $this->saveMessage($queued_message['message'], $queued_message['level']);
    }
    $this->queuedMessages = array();
  }

  /**
   * Takes an Exception object and both saves and displays it.
   *
   * Pulls in additional information on the location triggering the exception.
   *
   * @param \Exception $exception
   *   Object representing the exception.
   * @param bool $save
   *   (optional) Whether to save the message in the migration's mapping table.
   *   Set to FALSE in contexts where this doesn't make sense.
   */
  protected function handleException(\Exception $exception, $save = TRUE) {
    $result = Error::decodeException($exception);
    // Append the file and line reported by the decoded exception.
    $message = $result['!message'] . ' (' . $result['%file'] . ':' . $result['%line'] . ')';
    if ($save) {
      $this->saveMessage($message);
    }
    $this->message->display($message, 'error');
  }

}