MigrateExecutable.php 18 KB
Newer Older
1 2 3 4 5 6 7 8 9
<?php

/**
 * @file
 * Contains \Drupal\migrate\MigrateExecutable.
 */

namespace Drupal\migrate;

10
use Drupal\Core\Utility\Error;
11
use Drupal\Core\StringTranslation\StringTranslationTrait;
12 13 14 15 16 17 18
use Drupal\migrate\Entity\MigrationInterface;
use Drupal\migrate\Plugin\MigrateIdMapInterface;

/**
 * Defines a migrate executable class.
 */
class MigrateExecutable {
19
  use StringTranslationTrait;
20 21

  /**
22
   * The configuration of the migration to do.
23
   *
24
   * @var \Drupal\migrate\Entity\Migration
25 26 27 28 29 30 31 32
   */
  protected $migration;

  /**
   * The number of successfully imported rows since feedback was given.
   *
   * @var int
   */
33
  protected $successesSinceFeedback;
34 35 36 37 38 39

  /**
   * The number of rows that were successfully processed.
   *
   * @var int
   */
40
  protected $totalSuccesses;
41 42 43 44 45 46 47 48 49

  /**
   * Status of one row.
   *
   * The value is a MigrateIdMapInterface::STATUS_* constant, for example:
   * STATUS_IMPORTED.
   *
   * @var int
   */
50
  protected $sourceRowStatus;
51 52 53 54 55 56 57 58

  /**
   * The number of rows processed.
   *
   * The total attempted, whether or not they were successful.
   *
   * @var int
   */
59
  protected $totalProcessed;
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82

  /**
   * The queued messages not yet saved.
   *
   * Each element in the array is an array with two keys:
   * - 'message': The message string.
   * - 'level': The level, a MigrationInterface::MESSAGE_* constant.
   *
   * @var array
   */
  protected $queuedMessages = array();

  /**
   * The options that can be set when executing the migration.
   *
   * Values can be set for:
   * - 'limit': Sets a time limit.
   *
   * @var array
   */
  protected $options;

  /**
83
   * The PHP max_execution_time.
84
   *
85 86 87 88
   * @var int
   */
  protected $maxExecTime;

89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
  /**
   * The ratio of the memory limit at which an operation will be interrupted.
   *
   * @var float
   */
  protected $memoryThreshold = 0.85;

  /**
   * The ratio of the time limit at which an operation will be interrupted.
   *
   * @var float
   */
  public $timeThreshold = 0.90;

  /**
   * The time limit when executing the migration.
   *
   * @var array
   */
  public $limit = array();

110 111
  /**
   * The configuration values of the source.
112
   *
113
   * @var array
114
   */
115 116 117 118 119 120 121 122
  protected $sourceIdValues;

  /**
   * The number of rows processed since feedback was given.
   *
   * @var int
   */
  protected $processedSinceFeedback = 0;
123 124 125 126 127 128 129 130 131

  /**
   * The PHP memory_limit expressed in bytes.
   *
   * @var int
   */
  protected $memoryLimit;

  /**
132
   * The rollback action to be saved for the current row.
133 134 135
   *
   * @var int
   */
136
  public $rollbackAction;
137 138

  /**
139
   * An array of counts. Initially used for cache hit/miss tracking.
140
   *
141 142
   * @var array
   */
143
  protected $counts = array();
144 145

  /**
146 147 148
   * The maximum number of items to pass in a single call during a rollback.
   *
   * For use in bulkRollback(). Can be overridden in derived class constructor.
149
   *
150 151
   * @var int
   */
152
  protected $rollbackBatchSize = 50;
153 154

  /**
155
   * The object currently being constructed.
156
   *
157
   * @var \stdClass
158
   */
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
  protected $destinationValues;

  /**
   * The source.
   *
   * @var \Drupal\migrate\Source
   */
  protected $source;

  /**
   * The current data row retrieved from the source.
   *
   * @var \stdClass
   */
  protected $sourceValues;
174 175

  /**
176 177 178 179 180 181
   * Constructs a MigrateExecutable and verifies and sets the memory limit.
   *
   * @param \Drupal\migrate\Entity\MigrationInterface $migration
   *   The migration to run.
   * @param \Drupal\migrate\MigrateMessageInterface $message
   *   The message to record.
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
   *
   * @throws \Drupal\migrate\MigrateException
   */
  public function __construct(MigrationInterface $migration, MigrateMessageInterface $message) {
    $this->migration = $migration;
    $this->message = $message;
    $this->migration->getIdMap()->setMessage($message);
    // Record the memory limit in bytes
    $limit = trim(ini_get('memory_limit'));
    if ($limit == '-1') {
      $this->memoryLimit = PHP_INT_MAX;
    }
    else {
      if (!is_numeric($limit)) {
        $last = strtolower(substr($limit, -1));
        switch ($last) {
          case 'g':
            $limit *= 1024;
          case 'm':
            $limit *= 1024;
          case 'k':
            $limit *= 1024;
            break;
          default:
            throw new MigrateException($this->t('Invalid PHP memory_limit !limit',
              array('!limit' => $limit)));
        }
      }
      $this->memoryLimit = $limit;
    }
212 213
    // Record the maximum execution time limit.
    $this->maxExecTime = ini_get('max_execution_time');
214 215 216
  }

  /**
217 218 219 220
   * Returns the source.
   *
   * Makes sure source is initialized based on migration settings.
   *
221
   * @return \Drupal\migrate\Source
222
   *   The source.
223 224 225 226 227 228 229 230 231
   */
  public function getSource() {
    if (!isset($this->source)) {
      $this->source = new Source($this->migration, $this);
    }
    return $this->source;
  }

  /**
232
   * Performs an import operation - migrate items from source to destination.
233 234
   */
  public function import() {
235 236 237 238 239 240
    // Knock off migration if the requirements haven't been met.
    if (!$this->migration->checkRequirements()) {
      $this->message->display(
        $this->t('Migration @id did not meet the requirements', array('@id' => $this->migration->id())), 'error');
      return MigrationInterface::RESULT_FAILED;
    }
241 242 243 244 245 246 247 248 249 250
    $return = MigrationInterface::RESULT_COMPLETED;
    $source = $this->getSource();
    $id_map = $this->migration->getIdMap();

    try {
      $source->rewind();
    }
    catch (\Exception $e) {
      $this->message->display(
        $this->t('Migration failed with source plugin exception: !e',
251
          array('!e' => $e->getMessage())), 'error');
252 253
      return MigrationInterface::RESULT_FAILED;
    }
254

255 256
    $destination = $this->migration->getDestinationPlugin();

257 258 259 260
    while ($source->valid()) {
      $row = $source->current();
      if ($this->sourceIdValues = $row->getSourceIdValues()) {
        // Wipe old messages, and save any new messages.
261
        $id_map->delete($this->sourceIdValues, TRUE);
262 263 264 265
        $this->saveQueuedMessages();
      }

      try {
266 267
        $this->processRow($row);
        $save = TRUE;
268
      }
269 270 271
      catch (MigrateSkipRowException $e) {
        $id_map->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_IGNORED, $this->rollbackAction);
        $save = FALSE;
272
      }
273 274 275

      if ($save) {
        try {
276
          $destination_id_values = $destination->import($row, $id_map->lookupDestinationId($this->sourceIdValues));
277
          if ($destination_id_values) {
278 279 280 281
            // We do not save an idMap entry for config.
            if ($destination_id_values !== TRUE) {
              $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $this->rollbackAction);
            }
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296
            $this->successesSinceFeedback++;
            $this->totalSuccesses++;
          }
          else {
            $id_map->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_FAILED, $this->rollbackAction);
            if (!$id_map->messageCount()) {
              $message = $this->t('New object was not saved, no error provided');
              $this->saveMessage($message);
              $this->message->display($message);
            }
          }
        }
        catch (MigrateException $e) {
          $this->migration->getIdMap()->saveIdMapping($row, array(), $e->getStatus(), $this->rollbackAction);
          $this->saveMessage($e->getMessage(), $e->getLevel());
297
          $this->message->display($e->getMessage(), 'error');
298 299 300 301 302
        }
        catch (\Exception $e) {
          $this->migration->getIdMap()->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_FAILED, $this->rollbackAction);
          $this->handleException($e);
        }
303
      }
304 305
      $this->totalProcessed++;
      $this->processedSinceFeedback++;
306 307 308 309 310 311
      if ($highwater_property = $this->migration->get('highwaterProperty')) {
        $this->migration->saveHighwater($row->getSourceProperty($highwater_property['name']));
      }

      // Reset row properties.
      unset($sourceValues, $destinationValues);
312
      $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;
313 314 315 316 317 318 319 320 321 322 323 324 325

      if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
        break;
      }
      if ($this->timeOptionExceeded()) {
        break;
      }
      try {
        $source->next();
      }
      catch (\Exception $e) {
        $this->message->display(
          $this->t('Migration failed with source plugin exception: !e',
326
            array('!e' => $e->getMessage())), 'error');
327 328 329 330 331 332 333 334 335
        return MigrationInterface::RESULT_FAILED;
      }
    }

    /**
     * @TODO uncomment this
     */
    #$this->progressMessage($return);

336
    $this->migration->setMigrationResult($return);
337 338 339 340
    return $return;
  }

  /**
341 342 343
   * Processes a row.
   *
   * @param \Drupal\migrate\Row $row
344 345
   *   The $row to be processed.
   * @param array $process
346 347
   *   (optional) A process pipeline configuration. If not set, the top level
   *   process configuration in the migration entity is used.
348
   * @param mixed $value
349
   *   (optional) Initial value of the pipeline for the first destination.
350 351 352 353 354
   *   Usually setting this is not necessary as $process typically starts with
   *   a 'get'. This is useful only when the $process contains a single
   *   destination and needs to access a value outside of the source. See
   *   \Drupal\migrate\Plugin\migrate\process\Iterator::transformKey for an
   *   example.
355 356
   *
   * @throws \Drupal\migrate\MigrateException
357 358 359
   */
  public function processRow(Row $row, array $process = NULL, $value = NULL) {
    foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) {
360
      $multiple = FALSE;
361
      /** @var $plugin \Drupal\migrate\Plugin\MigrateProcessInterface */
362
      foreach ($plugins as $plugin) {
363 364 365 366 367 368 369 370 371 372
        $definition = $plugin->getPluginDefinition();
        // Many plugins expect a scalar value but the current value of the
        // pipeline might be multiple scalars (this is set by the previous
        // plugin) and in this case the current value needs to be iterated
        // and each scalar separately transformed.
        if ($multiple && !$definition['handle_multiples']) {
          $new_value = array();
          if (!is_array($value)) {
            throw new MigrateException(sprintf('Pipeline failed for destination %s: %s got instead of an array,', $destination, $value));
          }
373
          $break = FALSE;
374
          foreach ($value as $scalar_value) {
375 376 377 378 379 380
            try {
              $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
            }
            catch (MigrateSkipProcessException $e) {
              $break = TRUE;
            }
381 382
          }
          $value = $new_value;
383 384 385
          if ($break) {
            break;
          }
386 387
        }
        else {
388 389 390 391 392 393
          try {
            $value = $plugin->transform($value, $this, $row, $destination);
          }
          catch (MigrateSkipProcessException $e) {
            break;
          }
394 395
          $multiple = $multiple || $plugin->multiple();
        }
396
      }
397 398 399 400
      // No plugins means do not set.
      if ($plugins) {
        $row->setDestinationProperty($destination, $value);
      }
401 402 403 404 405 406
      // Reset the value.
      $value = NULL;
    }
  }

  /**
407
   * Fetches the key array for the current source record.
408 409
   *
   * @return array
410
   *   The current source IDs.
411 412 413 414 415 416
   */
  protected function currentSourceIds() {
    return $this->getSource()->getCurrentIds();
  }

  /**
417
   * Tests whether we've exceeded the designated time limit.
418
   *
419 420
   * @return bool
   *   TRUE if the threshold is exceeded, FALSE if not.
421 422
   */
  protected function timeOptionExceeded() {
423
    // If there is no time limit, then it is not exceeded.
424 425 426
    if (!$time_limit = $this->getTimeLimit()) {
      return FALSE;
    }
427 428
    // Calculate if the time limit is exceeded.
    $time_elapsed = $this->getTimeElapsed();
429 430 431 432 433 434 435 436
    if ($time_elapsed >= $time_limit) {
      return TRUE;
    }
    else {
      return FALSE;
    }
  }

437 438 439 440 441 442
  /**
   * Returns the time limit.
   *
   * @return null|int
   *   The time limit, NULL if no limit or if the units were not in seconds.
   */
443
  public function getTimeLimit() {
444
    $limit = $this->limit;
445 446
    if (isset($limit['unit']) && isset($limit['value']) && ($limit['unit'] == 'seconds' || $limit['unit'] == 'second')) {
      return $limit['value'];
447 448 449 450 451 452 453
    }
    else {
      return NULL;
    }
  }

  /**
454
   * Passes messages through to the map class.
455 456
   *
   * @param string $message
457
   *   The message to record.
458
   * @param int $level
459
   *   (optional) Message severity (defaults to MESSAGE_ERROR).
460 461 462 463 464 465
   */
  public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
    $this->migration->getIdMap()->saveMessage($this->sourceIdValues, $message, $level);
  }

  /**
466
   * Queues messages to be later saved through the map class.
467 468
   *
   * @param string $message
469
   *   The message to record.
470
   * @param int $level
471
   *   (optional) Message severity (defaults to MESSAGE_ERROR).
472 473 474 475 476 477
   */
  public function queueMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
    $this->queuedMessages[] = array('message' => $message, 'level' => $level);
  }

  /**
478
   * Saves any messages we've queued up to the message table.
479 480 481 482 483 484 485 486 487
   */
  public function saveQueuedMessages() {
    foreach ($this->queuedMessages as $queued_message) {
      $this->saveMessage($queued_message['message'], $queued_message['level']);
    }
    $this->queuedMessages = array();
  }

  /**
488 489 490
   * Checks for exceptional conditions, and display feedback.
   *
   * Standard top-of-loop stuff, common between rollback and import.
491 492 493 494 495
   */
  protected function checkStatus() {
    if ($this->memoryExceeded()) {
      return MigrationInterface::RESULT_INCOMPLETE;
    }
496
    if ($this->maxExecTimeExceeded()) {
497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519
      return MigrationInterface::RESULT_INCOMPLETE;
    }
    /*
     * @TODO uncomment this
    if ($this->getStatus() == MigrationInterface::STATUS_STOPPING) {
      return MigrationBase::RESULT_STOPPED;
    }
    */
    // If feedback is requested, produce a progress message at the proper time
    /*
     * @TODO uncomment this
    if (isset($this->feedback)) {
      if (($this->feedback_unit == 'seconds' && time() - $this->lastfeedback >= $this->feedback) ||
          ($this->feedback_unit == 'items' && $this->processed_since_feedback >= $this->feedback)) {
        $this->progressMessage(MigrationInterface::RESULT_INCOMPLETE);
      }
    }
    */

    return MigrationInterface::RESULT_COMPLETED;
  }

  /**
520 521 522
   * Tests whether we've exceeded the desired memory threshold.
   *
   * If so, output a message.
523
   *
524 525
   * @return bool
   *   TRUE if the threshold is exceeded, otherwise FALSE.
526 527
   */
  protected function memoryExceeded() {
528
    $usage = $this->getMemoryUsage();
529
    $pct_memory = $usage / $this->memoryLimit;
530
    if (!$threshold = $this->memoryThreshold) {
531 532 533
      return FALSE;
    }
    if ($pct_memory > $threshold) {
534
      $this->message->display(
535
        $this->t('Memory usage is !usage (!pct% of limit !limit), reclaiming memory.',
536
          array('!pct' => round($pct_memory*100),
537 538
                '!usage' => $this->formatSize($usage),
                '!limit' => $this->formatSize($this->memoryLimit))),
539
        'warning');
540 541
      $usage = $this->attemptMemoryReclaim();
      $pct_memory = $usage / $this->memoryLimit;
542 543
      // Use a lower threshold - we don't want to be in a situation where we keep
      // coming back here and trimming a tiny amount
544
      if ($pct_memory > (0.90 * $threshold)) {
545 546 547
        $this->message->display(
          $this->t('Memory usage is now !usage (!pct% of limit !limit), not enough reclaimed, starting new batch',
            array('!pct' => round($pct_memory*100),
548 549
                  '!usage' => $this->formatSize($usage),
                  '!limit' => $this->formatSize($this->memoryLimit))),
550 551 552 553 554 555 556
          'warning');
        return TRUE;
      }
      else {
        $this->message->display(
          $this->t('Memory usage is now !usage (!pct% of limit !limit), reclaimed enough, continuing',
            array('!pct' => round($pct_memory*100),
557 558
                  '!usage' => $this->formatSize($usage),
                  '!limit' => $this->formatSize($this->memoryLimit))),
559 560 561 562 563 564 565 566 567 568
          'warning');
        return FALSE;
      }
    }
    else {
      return FALSE;
    }
  }

  /**
569
   * Returns the memory usage so far.
570
   *
571 572
   * @return int
   *   The memory usage.
573
   */
574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589
  protected function getMemoryUsage() {
    return memory_get_usage();
  }

  /**
   * Tries to reclaim memory.
   *
   * @return int
   *   The memory usage after reclaim.
   */
  protected function attemptMemoryReclaim() {
    // First, try resetting Drupal's static storage - this frequently releases
    // plenty of memory to continue.
    drupal_static_reset();
    // @TODO: explore resetting the container.
    return memory_get_usage();
590 591 592
  }

  /**
593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611
   * Generates a string representation for the given byte count.
   *
   * @param int $size
   *   A size in bytes.
   *
   * @return string
   *   A translated string representation of the size.
   */
  protected function formatSize($size) {
    return format_size($size);
  }

  /**
   * Tests whether we're approaching the PHP maximum execution time limit.
   *
   * @return bool
   *   TRUE if the threshold is exceeded, FALSE if not.
   */
  protected function maxExecTimeExceeded() {
612
    return $this->maxExecTime && (($this->getTimeElapsed() / $this->maxExecTime) > $this->timeThreshold);
613 614 615 616 617 618 619 620 621 622 623 624 625 626 627
  }

  /**
   * Returns the time elapsed.
   *
   * This allows a test to set a fake elapsed time.
   */
  protected function getTimeElapsed() {
    return time() - REQUEST_TIME;
  }

  /**
   * Takes an Exception object and both saves and displays it.
   *
   * Pulls in additional information on the location triggering the exception.
628 629
   *
   * @param \Exception $exception
630 631 632 633
   *   Object representing the exception.
   * @param bool $save
   *   (optional) Whether to save the message in the migration's mapping table.
   *   Set to FALSE in contexts where this doesn't make sense.
634
   */
635
  public function handleException(\Exception $exception, $save = TRUE) {
636
    $result = Error::decodeException($exception);
637 638 639 640
    $message = $result['!message'] . ' (' . $result['%file'] . ':' . $result['%line'] . ')';
    if ($save) {
      $this->saveMessage($message);
    }
641
    $this->message->display($message, 'error');
642 643 644
  }

}