Newer
Older
Dries Buytaert
committed
<?php
namespace Drupal\migrate;
use Drupal\Component\Utility\Bytes;
catch
committed
use Drupal\Core\Utility\Error;
Alex Pott
committed
use Drupal\Core\StringTranslation\StringTranslationTrait;
Angie Byron
committed
use Drupal\migrate\Event\MigrateEvents;
use Drupal\migrate\Event\MigrateImportEvent;
use Drupal\migrate\Event\MigratePostRowSaveEvent;
use Drupal\migrate\Event\MigratePreRowSaveEvent;
Angie Byron
committed
use Drupal\migrate\Event\MigrateRollbackEvent;
use Drupal\migrate\Event\MigrateRowDeleteEvent;
use Drupal\migrate\Exception\RequirementsException;
Dries Buytaert
committed
use Drupal\migrate\Plugin\MigrateIdMapInterface;
use Drupal\migrate\Plugin\MigrationInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
Dries Buytaert
committed
/**
* Defines a migrate executable class.
*/
class MigrateExecutable implements MigrateExecutableInterface {
Alex Pott
committed
use StringTranslationTrait;
Dries Buytaert
committed
/**
 * The configuration of the migration to do.
 *
 * @var \Drupal\migrate\Plugin\MigrationInterface
 */
protected $migration;

/**
 * Status of one row.
 *
 * The value is a MigrateIdMapInterface::STATUS_* constant, for example:
 * STATUS_IMPORTED. It is passed to the ID map when a row is saved, so it
 * starts out as STATUS_IMPORTED rather than NULL for the first row.
 *
 * @var int
 */
protected $sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;

/**
 * The ratio of the memory limit at which an operation will be interrupted.
 *
 * @var float
 */
protected $memoryThreshold = 0.85;

/**
 * The PHP memory_limit expressed in bytes.
 *
 * @var int
 */
protected $memoryLimit;

/**
 * The source ID values of the row currently being processed.
 *
 * Assigned from Row::getSourceIdValues() during import and used as the key
 * when messages are saved to the ID map.
 *
 * @var array
 */
protected $sourceIdValues;

/**
 * An array of counts. Initially used for cache hit/miss tracking.
 *
 * @var array
 */
protected $counts = [];

/**
 * The source.
 *
 * @var \Drupal\migrate\Plugin\MigrateSourceInterface
 */
protected $source;

/**
 * The event dispatcher.
 *
 * @var \Symfony\Contracts\EventDispatcher\EventDispatcherInterface
 */
protected $eventDispatcher;

/**
 * Migration message service.
 *
 * @todo https://www.drupal.org/node/2822663 Make this protected.
 *
 * @var \Drupal\migrate\MigrateMessageInterface
 */
public $message;
Dries Buytaert
committed
/**
 * Constructs a MigrateExecutable and verifies and sets the memory limit.
 *
 * @param \Drupal\migrate\Plugin\MigrationInterface $migration
 *   The migration to run.
 * @param \Drupal\migrate\MigrateMessageInterface|null $message
 *   (optional) The migrate message service. A new MigrateMessage is created
 *   when omitted.
 * @param \Symfony\Contracts\EventDispatcher\EventDispatcherInterface|null $event_dispatcher
 *   (optional) The event dispatcher. When omitted, getEventDispatcher()
 *   fetches it from the container on first use.
 */
public function __construct(MigrationInterface $migration, ?MigrateMessageInterface $message = NULL, ?EventDispatcherInterface $event_dispatcher = NULL) {
  $this->migration = $migration;
  $this->message = $message ?? new MigrateMessage();
  // The ID map reports through the same message service as this executable.
  $this->getIdMap()->setMessage($this->message);
  $this->eventDispatcher = $event_dispatcher;

  // Record the memory limit in bytes. A value of -1 means "no limit", which
  // is represented here by the largest integer.
  $limit = trim(ini_get('memory_limit'));
  if ($limit == '-1') {
    $this->memoryLimit = PHP_INT_MAX;
  }
  else {
    $this->memoryLimit = Bytes::toNumber($limit);
  }
}
/**
 * Returns the source.
 *
 * The source plugin is requested from the migration the first time this is
 * called and reused on later calls.
 *
 * @return \Drupal\migrate\Plugin\MigrateSourceInterface
 *   The source.
 */
protected function getSource() {
  if (isset($this->source)) {
    return $this->source;
  }
  $this->source = $this->migration->getSourcePlugin();
  return $this->source;
}
Angie Byron
committed
/**
 * Gets the event dispatcher.
 *
 * Falls back to the 'event_dispatcher' container service when none was
 * passed to the constructor.
 *
 * @return \Symfony\Contracts\EventDispatcher\EventDispatcherInterface
 *   The event dispatcher.
 */
protected function getEventDispatcher() {
  if ($this->eventDispatcher) {
    return $this->eventDispatcher;
  }
  $this->eventDispatcher = \Drupal::service('event_dispatcher');
  return $this->eventDispatcher;
}
Dries Buytaert
committed
/**
 * {@inheritdoc}
 */
public function import() {
  // Only begin the import operation if the migration is currently idle.
  if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
    $this->message->display($this->t('Migration @id is busy with another operation: @status',
      [
        '@id' => $this->migration->id(),
        '@status' => $this->t($this->migration->getStatusLabel()),
      ]), 'error');
    return MigrationInterface::RESULT_FAILED;
  }
  $this->getEventDispatcher()->dispatch(new MigrateImportEvent($this->migration, $this->message), MigrateEvents::PRE_IMPORT);

  // Knock off migration if the requirements haven't been met.
  try {
    $this->migration->checkRequirements();
  }
  catch (RequirementsException $e) {
    $this->message->display(
      $this->t(
        'Migration @id did not meet the requirements. @message',
        [
          '@id' => $this->migration->id(),
          '@message' => $e->getMessage(),
        ]
      ),
      'error'
    );
    return MigrationInterface::RESULT_FAILED;
  }

  $this->migration->setStatus(MigrationInterface::STATUS_IMPORTING);
  $source = $this->getSource();

  try {
    $source->rewind();
  }
  catch (\Exception $e) {
    return $this->failOnSourceException($e);
  }

  // Get the process pipeline. If building it fails, the failure is recorded
  // against the current source row and no rows are processed.
  $pipeline = FALSE;
  if ($source->valid()) {
    try {
      $pipeline = $this->migration->getProcessPlugins();
    }
    catch (MigrateException $e) {
      $row = $source->current();
      $this->sourceIdValues = $row->getSourceIdValues();
      $this->getIdMap()->saveIdMapping($row, [], $e->getStatus());
      $this->saveMessage($e->getMessage(), $e->getLevel());
    }
  }

  $return = MigrationInterface::RESULT_COMPLETED;
  if ($pipeline) {
    $id_map = $this->getIdMap();
    $destination = $this->migration->getDestinationPlugin();
    while ($source->valid()) {
      $row = $source->current();
      $this->sourceIdValues = $row->getSourceIdValues();

      // Run every destination property through its process pipeline. $save
      // records whether the row is still eligible for the destination.
      try {
        foreach ($pipeline as $destination_property_name => $plugins) {
          $this->processPipeline($row, $destination_property_name, $plugins, NULL);
        }
        $save = TRUE;
      }
      catch (MigrateException $e) {
        $this->getIdMap()->saveIdMapping($row, [], $e->getStatus());
        $msg = sprintf("%s:%s:%s", $this->migration->getPluginId(), $destination_property_name, $e->getMessage());
        $this->saveMessage($msg, $e->getLevel());
        $save = FALSE;
      }
      catch (MigrateSkipRowException $e) {
        if ($e->getSaveToMap()) {
          $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
        }
        if ($message = trim($e->getMessage())) {
          $msg = sprintf("%s:%s: %s", $this->migration->getPluginId(), $destination_property_name, $message);
          $this->saveMessage($msg, MigrationInterface::MESSAGE_INFORMATIONAL);
        }
        $save = FALSE;
      }

      if ($save) {
        try {
          $this->getEventDispatcher()
            ->dispatch(new MigratePreRowSaveEvent($this->migration, $this->message, $row), MigrateEvents::PRE_ROW_SAVE);
          // Reuse previously mapped destination IDs, if any, so an existing
          // destination can be passed to the destination plugin.
          $destination_ids = $id_map->lookupDestinationIds($this->sourceIdValues);
          $destination_id_values = $destination_ids ? reset($destination_ids) : [];
          $destination_id_values = $destination->import($row, $destination_id_values);
          $this->getEventDispatcher()
            ->dispatch(new MigratePostRowSaveEvent($this->migration, $this->message, $row, $destination_id_values), MigrateEvents::POST_ROW_SAVE);
          if ($destination_id_values) {
            // We do not save an idMap entry for config.
            if ($destination_id_values !== TRUE) {
              $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $destination->rollbackAction());
            }
          }
          else {
            $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
            if (!$id_map->messageCount()) {
              $message = $this->t('New object was not saved, no error provided');
              $this->saveMessage($message);
              $this->message->display($message);
            }
          }
        }
        catch (MigrateException $e) {
          $this->getIdMap()->saveIdMapping($row, [], $e->getStatus());
          $this->saveMessage($e->getMessage(), $e->getLevel());
        }
        catch (\Exception $e) {
          $this->getIdMap()
            ->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
          $this->handleException($e);
        }
      }

      $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;

      // Check for memory exhaustion.
      if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
        break;
      }

      // If anyone has requested we stop, return the requested result.
      if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
        $return = $this->migration->getInterruptionResult();
        $this->migration->clearInterruptionResult();
        break;
      }

      try {
        $source->next();
      }
      catch (\Exception $e) {
        return $this->failOnSourceException($e);
      }
    }
  }

  $this->getEventDispatcher()->dispatch(new MigrateImportEvent($this->migration, $this->message), MigrateEvents::POST_IMPORT);
  $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
  return $return;
}

/**
 * Reports a source plugin failure and returns the migration to idle.
 *
 * Shared by the rewind() and next() failure paths of import().
 *
 * @param \Exception $e
 *   The exception thrown by the source plugin.
 *
 * @return int
 *   MigrationInterface::RESULT_FAILED, for the caller to return.
 */
private function failOnSourceException(\Exception $e) {
  $this->message->display(
    $this->t('Migration failed with source plugin exception: @e in @file line @line', [
      '@e' => $e->getMessage(),
      '@file' => $e->getFile(),
      '@line' => $e->getLine(),
    ]), 'error');
  $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
  return MigrationInterface::RESULT_FAILED;
}
Angie Byron
committed
/**
 * {@inheritdoc}
 */
public function rollback() {
  // A rollback may only start while the migration is idle.
  if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
    $busy_args = [
      '@id' => $this->migration->id(),
      '@status' => $this->t($this->migration->getStatusLabel()),
    ];
    $this->message->display($this->t('Migration @id is busy with another operation: @status', $busy_args), 'error');
    return MigrationInterface::RESULT_FAILED;
  }

  // Let listeners know the rollback is about to start.
  $dispatcher = $this->getEventDispatcher();
  $dispatcher->dispatch(new MigrateRollbackEvent($this->migration), MigrateEvents::PRE_ROLLBACK);

  // Start from "completed"; the loop below overwrites this when the rollback
  // is interrupted.
  $return = MigrationInterface::RESULT_COMPLETED;

  $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK);
  $id_map = $this->getIdMap();
  $destination = $this->migration->getDestinationPlugin();

  // Walk the map row by row and try to roll each one back.
  for ($id_map->rewind(); $id_map->valid();) {
    $destination_key = $id_map->currentDestination();
    if (!$destination_key) {
      // No destination key: the import probably failed, so the map row can
      // be dropped without touching the destination.
      $source_key = $id_map->currentSource();
      $id_map->delete($source_key);
    }
    else {
      $map_row = $id_map->getRowByDestination($destination_key);
      $preserve = isset($map_row['rollback_action']) && $map_row['rollback_action'] != MigrateIdMapInterface::ROLLBACK_DELETE;
      if (!$preserve) {
        $dispatcher->dispatch(new MigrateRowDeleteEvent($this->migration, $destination_key), MigrateEvents::PRE_ROW_DELETE);
        $destination->rollback($destination_key);
        $dispatcher->dispatch(new MigrateRowDeleteEvent($this->migration, $destination_key), MigrateEvents::POST_ROW_DELETE);
      }
      // This row is finished with, so take it out of the map.
      $id_map->deleteDestination($destination_key);
    }
    $id_map->next();

    // Stop early on memory exhaustion.
    $return = $this->checkStatus();
    if ($return != MigrationInterface::RESULT_COMPLETED) {
      break;
    }

    // Stop early when a stop was requested, reporting the requested result.
    if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
      $return = $this->migration->getInterruptionResult();
      $this->migration->clearInterruptionResult();
      break;
    }
  }

  // Tell listeners the rollback attempt is over.
  $dispatcher->dispatch(new MigrateRollbackEvent($this->migration), MigrateEvents::POST_ROLLBACK);
  $this->migration->setStatus(MigrationInterface::STATUS_IDLE);

  return $return;
}
/**
 * Gets the ID map plugin of the migration being executed.
 *
 * @return \Drupal\migrate\Plugin\MigrateIdMapInterface
 *   The ID map.
 */
protected function getIdMap() {
  $id_map = $this->migration->getIdMap();
  return $id_map;
}
Dries Buytaert
committed
/**
 * {@inheritdoc}
 */
public function processRow(Row $row, ?array $process = NULL, $value = NULL) {
  // Each destination property has its own pipeline of process plugins; every
  // pipeline starts from the same initial $value.
  foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) {
    $this->processPipeline($row, $destination, $plugins, $value);
  }
}
/**
 * Runs a process pipeline.
 *
 * Feeds $value through each plugin in turn and stores the final result on
 * the row as the destination property.
 *
 * @param \Drupal\migrate\Row $row
 *   The $row to be processed.
 * @param string $destination
 *   The destination property name.
 * @param array $plugins
 *   The process pipeline plugins.
 * @param mixed $value
 *   (optional) Initial value of the pipeline for the destination.
 *
 * @see \Drupal\migrate\MigrateExecutableInterface::processRow
 *
 * @throws \Drupal\migrate\MigrateException
 *   When a plugin that needs per-item iteration receives a non-array value,
 *   or when a plugin's transform() throws a MigrateException. In the latter
 *   case the message is prefixed with the plugin ID and the original level,
 *   status and exception are passed along.
 */
protected function processPipeline(Row $row, string $destination, array $plugins, $value) {
  $multiple = FALSE;
  /** @var \Drupal\migrate\Plugin\MigrateProcessInterface $plugin */
  foreach ($plugins as $plugin) {
    $definition = $plugin->getPluginDefinition();
    // Many plugins expect a scalar value but the current value of the
    // pipeline might be multiple scalars (this is set by the previous plugin)
    // and in this case the current value needs to be iterated and each scalar
    // separately transformed.
    if ($multiple && !$definition['handle_multiples']) {
      $new_value = [];
      if (!is_array($value)) {
        throw new MigrateException(sprintf('Pipeline failed at %s plugin for destination %s: %s received instead of an array,', $plugin->getPluginId(), $destination, $value));
      }
      $break = FALSE;
      foreach ($value as $scalar_value) {
        try {
          $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
        }
        catch (MigrateSkipProcessException $e) {
          // Keep processing the remaining items with this plugin, but stop
          // the pipeline once they are done.
          $new_value[] = NULL;
          $break = TRUE;
        }
        catch (MigrateException $e) {
          // Prepend the process plugin id to the message, keeping the level
          // and status the plugin asked for.
          $message = sprintf("%s: %s", $plugin->getPluginId(), $e->getMessage());
          throw new MigrateException($message, $e->getCode(), $e, $e->getLevel(), $e->getStatus());
        }
      }
      $value = $new_value;
      if ($break) {
        break;
      }
    }
    else {
      try {
        $value = $plugin->transform($value, $this, $row, $destination);
      }
      catch (MigrateSkipProcessException $e) {
        $value = NULL;
        break;
      }
      catch (MigrateException $e) {
        // Prepend the process plugin id to the message, keeping the level
        // and status the plugin asked for.
        $message = sprintf("%s: %s", $plugin->getPluginId(), $e->getMessage());
        throw new MigrateException($message, $e->getCode(), $e, $e->getLevel(), $e->getStatus());
      }
      // Remember whether this plugin produced multiple values, so the next
      // plugin can be applied per item.
      $multiple = $plugin->multiple();
    }
  }
  // Ensure all values, including nulls, are migrated.
  if ($plugins) {
    if (isset($value)) {
      $row->setDestinationProperty($destination, $value);
    }
    else {
      $row->setEmptyDestinationProperty($destination);
    }
  }
}
/**
 * Fetches the key array for the current source record.
 *
 * @return array
 *   The current source IDs, as reported by the source plugin.
 */
protected function currentSourceIds() {
  $source = $this->getSource();
  return $source->getCurrentIds();
}
/**
 * {@inheritdoc}
 */
public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
  // Messages are stored in the ID map, keyed by the current row's source IDs.
  $id_map = $this->getIdMap();
  $id_map->saveMessage($this->sourceIdValues, $message, $level);
}
catch
committed
/**
 * Takes an Exception object and both saves and displays it.
 *
 * The message is extended with the file and line that triggered the
 * exception.
 *
 * @param \Exception $exception
 *   Object representing the exception.
 * @param bool $save
 *   (optional) Whether to save the message in the migration's mapping table.
 *   Set to FALSE in contexts where this doesn't make sense.
 */
protected function handleException(\Exception $exception, $save = TRUE) {
  $decoded = Error::decodeException($exception);
  $message = sprintf('%s (%s:%s)', $decoded['@message'], $decoded['%file'], $decoded['%line']);

  if ($save) {
    $this->saveMessage($message);
  }

  $this->message->display($message, 'error');
}
Angie Byron
committed
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
/**
 * Checks for exceptional conditions, and display feedback.
 *
 * @return int
 *   MigrationInterface::RESULT_INCOMPLETE when the memory threshold has been
 *   exceeded, otherwise MigrationInterface::RESULT_COMPLETED.
 */
protected function checkStatus() {
  return $this->memoryExceeded()
    ? MigrationInterface::RESULT_INCOMPLETE
    : MigrationInterface::RESULT_COMPLETED;
}
/**
 * Tests whether we've exceeded the desired memory threshold.
 *
 * If so, output a message and try to reclaim memory; the threshold only
 * counts as exceeded when not enough could be reclaimed.
 *
 * @return bool
 *   TRUE if the threshold is exceeded, otherwise FALSE. Always FALSE when no
 *   threshold is set or the memory limit is not a positive number.
 */
protected function memoryExceeded() {
  $usage = $this->getMemoryUsage();
  $threshold = $this->memoryThreshold;
  if (!$threshold) {
    return FALSE;
  }
  // Without a positive limit there is no ratio to compute, and dividing by
  // zero throws a DivisionByZeroError on PHP 8.
  if ($this->memoryLimit <= 0) {
    return FALSE;
  }

  // Builds the placeholder values shared by all three messages below.
  $placeholders = function ($usage, $pct_memory) {
    return [
      '@pct' => round($pct_memory * 100),
      '@usage' => $this->formatSize($usage),
      '@limit' => $this->formatSize($this->memoryLimit),
    ];
  };

  $pct_memory = $usage / $this->memoryLimit;
  if ($pct_memory <= $threshold) {
    return FALSE;
  }

  $this->message->display(
    $this->t('Memory usage is @usage (@pct% of limit @limit), reclaiming memory.', $placeholders($usage, $pct_memory)),
    'warning'
  );
  $usage = $this->attemptMemoryReclaim();
  $pct_memory = $usage / $this->memoryLimit;

  // Use a lower threshold - we don't want to be in a situation where we keep
  // coming back here and trimming a tiny amount.
  if ($pct_memory > (0.90 * $threshold)) {
    $this->message->display(
      $this->t('Memory usage is now @usage (@pct% of limit @limit), not enough reclaimed, starting new batch', $placeholders($usage, $pct_memory)),
      'warning'
    );
    return TRUE;
  }

  $this->message->display(
    $this->t('Memory usage is now @usage (@pct% of limit @limit), reclaimed enough, continuing', $placeholders($usage, $pct_memory)),
    'warning'
  );
  return FALSE;
}
/**
 * Returns the memory usage so far.
 *
 * @return int
 *   The memory usage, as reported by memory_get_usage().
 */
protected function getMemoryUsage() {
  $bytes = memory_get_usage();
  return $bytes;
}
/**
 * Tries to reclaim memory.
 *
 * @return int
 *   The memory usage after reclaim.
 */
protected function attemptMemoryReclaim() {
  // Resetting Drupal's static storage comes first - this frequently releases
  // plenty of memory to continue.
  drupal_static_reset();

  // Entity storage can blow up with caches, so clear it out.
  $entity_memory_cache = \Drupal::service('entity.memory_cache');
  $entity_memory_cache->deleteAll();

  // @todo Explore resetting the container.

  // Run garbage collector to further reduce memory.
  gc_collect_cycles();

  $bytes_after = memory_get_usage();
  return $bytes_after;
}
/**
 * Generates a string representation for the given byte count.
 *
 * @param int $size
 *   A size in bytes.
 *
 * @return string
 *   A translated string representation of the size.
 */
protected function formatSize($size) {
  $formatted = format_size($size);
  return $formatted;
}