Commit 265ee77e authored by Dries's avatar Dries

Issue #2213451 by andypost, bdone, benjy, penyaskito, chx, claudiu.cristea,...

Issue #2213451 by andypost, bdone, benjy, penyaskito, chx, claudiu.cristea, damiankloip, gregboggs, InternetDevels, jessehs, jhedstrom, marvil07, mikeryan, pcambra, Xano, YesCT: Update Migrate API in core
parent c7592975
<?php
/**
* @file
* Contains \Drupal\migrate\Annotation\MigrateDestination.
*/
namespace Drupal\migrate\Annotation;
use Drupal\Component\Annotation\Plugin;
/**
* Defines a migration destination plugin annotation object.
*
* @Annotation
*/
class MigrateDestination extends Plugin {
/**
* A unique identifier for the process plugin.
*
* @var string
*/
public $id;
/**
* Whether requirements are met.
*
* If TRUE and a 'provider' key is present in the annotation then the
* default destination plugin manager will set this to FALSE if the
* provider (module/theme) doesn't exist.
*
* @var bool
*/
public $requirements_met = TRUE;
/**
* A class to make the plugin derivative aware.
*
* @var string
*
* @see \Drupal\Component\Plugin\Discovery\DerivativeDiscoveryDecorator
*/
public $derivative;
}
<?php
/**
* @file
* Contains \Drupal\migrate\Annotation\MigrateDestination.
*/
namespace Drupal\migrate\Annotation;
use Drupal\Component\Annotation\Plugin;
/**
* Defines a migration destination plugin annotation object.
*
* @Annotation
*/
class MigrateSource extends Plugin {
/**
* A unique identifier for the process plugin.
*
* @var string
*/
public $id;
/**
* Whether requirements are met.
*
* @var bool
*/
public $requirements_met = TRUE;
}
......@@ -10,6 +10,7 @@
use Drupal\Core\Config\Entity\ConfigEntityBase;
use Drupal\migrate\MigrateException;
use Drupal\migrate\Plugin\MigrateIdMapInterface;
use Drupal\migrate\Plugin\RequirementsInterface;
/**
* Defines the Migration entity.
......@@ -22,12 +23,7 @@
* label = @Translation("Migration"),
* module = "migrate",
* controllers = {
* "list" = "Drupal\Core\Config\Entity\DraggableListController",
* "form" = {
* "add" = "Drupal\Core\Entity\EntityFormController",
* "edit" = "Drupal\Core\Entity\EntityFormController",
* "delete" = "Drupal\Core\Entity\EntityFormController"
* }
* "storage" = "Drupal\migrate\MigrationStorageController"
* },
* entity_keys = {
* "id" = "id",
......@@ -36,7 +32,7 @@
* }
* )
*/
class Migration extends ConfigEntityBase implements MigrationInterface {
class Migration extends ConfigEntityBase implements MigrationInterface, RequirementsInterface {
/**
* The migration ID (machine name).
......@@ -82,6 +78,13 @@ class Migration extends ConfigEntityBase implements MigrationInterface {
*/
public $process;
/**
* The cached process plugins.
*
* @var array
*/
protected $processPlugins = array();
/**
* The destination configuration, with at least a 'plugin' key.
*
......@@ -132,7 +135,7 @@ class Migration extends ConfigEntityBase implements MigrationInterface {
*
* @var array
*/
public $destinationIds = array();
public $destinationIds = FALSE;
/**
* Information on the highwater mark.
......@@ -161,16 +164,6 @@ class Migration extends ConfigEntityBase implements MigrationInterface {
*/
public $sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;
/**
* The ratio of the memory limit at which an operation will be interrupted.
*
* Can be overridden by a Migration subclass if one would like to push the
* envelope. Defaults to 0.85.
*
* @var float
*/
protected $memoryThreshold = 0.85;
/**
* @var \Drupal\Core\KeyValueStore\KeyValueStoreInterface
*/
......@@ -182,21 +175,18 @@ class Migration extends ConfigEntityBase implements MigrationInterface {
public $trackLastImported = FALSE;
/**
* The ratio of the time limit at which an operation will be interrupted.
* These migrations must be already executed before this migration can run.
*
* Can be overridden by a Migration subclass if one would like to push the
* envelope. Defaults to 0.9.
*
* @var float
* @var array
*/
public $timeThreshold = 0.90;
protected $requirements = array();
/**
* The time limit when executing the migration.
* These migrations, if ran at all, must be executed before this migration.
*
* @var array
*/
public $limit = array();
public $dependencies = array();
/**
* {@inheritdoc}
......@@ -215,23 +205,25 @@ public function getProcessPlugins(array $process = NULL) {
if (!isset($process)) {
$process = $this->process;
}
$process_plugins = array();
foreach ($this->getProcessNormalized($process) as $property => $configurations) {
$process_plugins[$property] = array();
foreach ($configurations as $configuration) {
if (isset($configuration['source'])) {
$process_plugins[$property][] = \Drupal::service('plugin.manager.migrate.process')->createInstance('get', $configuration, $this);
}
// Get is already handled.
if ($configuration['plugin'] != 'get') {
$process_plugins[$property][] = \Drupal::service('plugin.manager.migrate.process')->createInstance($configuration['plugin'], $configuration, $this);
}
if (!$process_plugins[$property]) {
throw new MigrateException("Invalid process configuration for $property");
$index = serialize($process);
if (!isset($this->processPlugins[$index])) {
foreach ($this->getProcessNormalized($process) as $property => $configurations) {
$this->processPlugins[$index][$property] = array();
foreach ($configurations as $configuration) {
if (isset($configuration['source'])) {
$this->processPlugins[$index][$property][] = \Drupal::service('plugin.manager.migrate.process')->createInstance('get', $configuration, $this);
}
// Get is already handled.
if ($configuration['plugin'] != 'get') {
$this->processPlugins[$index][$property][] = \Drupal::service('plugin.manager.migrate.process')->createInstance($configuration['plugin'], $configuration, $this);
}
if (!$this->processPlugins[$index][$property]) {
throw new MigrateException("Invalid process configuration for $property");
}
}
}
}
return $process_plugins;
return $this->processPlugins[$index];
}
/**
......@@ -283,7 +275,10 @@ public function getIdMap() {
}
/**
* Get the highwater storage object.
*
* @return \Drupal\Core\KeyValueStore\KeyValueStoreInterface
* The storage object.
*/
protected function getHighWaterStorage() {
if (!isset($this->highwaterStorage)) {
......@@ -292,11 +287,56 @@ protected function getHighWaterStorage() {
return $this->highwaterStorage;
}
/**
* {@inheritdoc}
*/
public function getHighwater() {
return $this->getHighWaterStorage()->get($this->id());
}
/**
* {@inheritdoc}
*/
public function saveHighwater($highwater) {
$this->getHighWaterStorage()->set($this->id(), $highwater);
}
/**
* {@inheritdoc}
*/
public function checkRequirements() {
// Check whether the current migration source and destination plugin
// requirements are met or not.
try {
if ($this->getSourcePlugin() instanceof RequirementsInterface && !$this->getSourcePlugin()->checkRequirements()) {
return FALSE;
}
if ($this->getDestinationPlugin() instanceof RequirementsInterface && !$this->getDestinationPlugin()->checkRequirements()) {
return FALSE;
}
/** @var \Drupal\migrate\Entity\MigrationInterface[] $required_migrations */
$required_migrations = \Drupal::entityManager()->getStorageController('migration')->loadMultiple($this->requirements);
// Check if the dependencies are in good shape.
foreach ($required_migrations as $required_migration) {
// If the dependent source migration has no IDs then no mappings can
// be recorded thus it is impossible to see whether the migration ran.
if (!$required_migration->getSourcePlugin()->getIds()) {
return FALSE;
}
// If the dependent migration has not processed any record, it means the
// dependency requirements are not met.
if (!$required_migration->getIdMap()->processedCount()) {
return FALSE;
}
}
}
catch (\Exception $e) {
return FALSE;
}
return TRUE;
}
}
......@@ -84,10 +84,22 @@ public function getDestinationPlugin();
public function getIdMap();
/**
* The current value of the highwater mark.
*
* The highwater mark defines a timestamp stating the time the import was last
* run. If the mark is set, only content with a higher timestamp will be
* imported.
*
* @return int
*/
public function getHighwater();
/**
* Save the new highwater mark.
*
* @param int $highwater
* The highwater timestamp.
*/
public function saveHighwater($highwater);
}
<?php
/**
* @file
* Contains \Drupal\migrate\MigrateBuildDependencyInterface.
*/
namespace Drupal\migrate;
interface MigrateBuildDependencyInterface {
/**
* Builds a dependency tree for the migrations and set their order.
*
* @param \Drupal\migrate\Entity\MigrationInterface[] $migrations
* Array of loaded migrations with their declared dependencies.
* @param array $dynamic_ids
* Keys are dynamic ids (for example node:*) values are a list of loaded
* migration ids (for example node:page, node:article).
*
* @return array
* An array of migrations.
*/
public function buildDependencyMigration(array $migrations, array $dynamic_ids);
}
......@@ -48,7 +48,7 @@ class MigrateException extends \Exception {
* The status of the item for the map table, a MigrateMap::STATUS_*
* constant.
*/
public function __construct($message = null, $code = 0, \Exception $previous = null, $level = MigrationInterface::MESSAGE_ERROR, $status = MigrateIdMapInterface::STATUS_FAILED) {
public function __construct($message = NULL, $code = 0, \Exception $previous = NULL, $level = MigrationInterface::MESSAGE_ERROR, $status = MigrateIdMapInterface::STATUS_FAILED) {
$this->level = $level;
$this->status = $status;
parent::__construct($message);
......@@ -58,6 +58,7 @@ public function __construct($message = null, $code = 0, \Exception $previous = n
* Gets the level.
*
* @return int
* An integer status code. @see Migration::MESSAGE_*
*/
public function getLevel() {
return $this->level;
......@@ -67,6 +68,7 @@ public function getLevel() {
* Gets the status of the current item.
*
* @return int
* An integer status code. @see MigrateMap::STATUS_*
*/
public function getStatus() {
return $this->status;
......
......@@ -84,6 +84,27 @@ class MigrateExecutable {
*/
protected $maxExecTime;
/**
* The ratio of the memory limit at which an operation will be interrupted.
*
* @var float
*/
protected $memoryThreshold = 0.85;
/**
* The ratio of the time limit at which an operation will be interrupted.
*
* @var float
*/
public $timeThreshold = 0.90;
/**
* The time limit when executing the migration.
*
* @var array
*/
public $limit = array();
/**
* The configuration values of the source.
*
......@@ -216,9 +237,14 @@ public function getSource() {
* Performs an import operation - migrate items from source to destination.
*/
public function import() {
// Knock off migration if the requirements haven't been met.
if (!$this->migration->checkRequirements()) {
$this->message->display(
$this->t('Migration @id did not meet the requirements', array('@id' => $this->migration->id())), 'error');
return MigrationInterface::RESULT_FAILED;
}
$return = MigrationInterface::RESULT_COMPLETED;
$source = $this->getSource();
$destination = $this->migration->getDestinationPlugin();
$id_map = $this->migration->getIdMap();
try {
......@@ -227,10 +253,12 @@ public function import() {
catch (\Exception $e) {
$this->message->display(
$this->t('Migration failed with source plugin exception: !e',
array('!e' => $e->getMessage())));
array('!e' => $e->getMessage())), 'error');
return MigrationInterface::RESULT_FAILED;
}
$destination = $this->migration->getDestinationPlugin();
while ($source->valid()) {
$row = $source->current();
if ($this->sourceIdValues = $row->getSourceIdValues()) {
......@@ -250,11 +278,12 @@ public function import() {
if ($save) {
try {
$destination_id_values = $destination->import($row);
// @todo Handle the successful but no ID case like config,
// https://drupal.org/node/2160835.
$destination_id_values = $destination->import($row, $id_map->lookupDestinationId($this->sourceIdValues));
if ($destination_id_values) {
$id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $this->rollbackAction);
// We do not save an idMap entry for config.
if ($destination_id_values !== TRUE) {
$id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $this->rollbackAction);
}
$this->successesSinceFeedback++;
$this->totalSuccesses++;
}
......@@ -270,7 +299,7 @@ public function import() {
catch (MigrateException $e) {
$this->migration->getIdMap()->saveIdMapping($row, array(), $e->getStatus(), $this->rollbackAction);
$this->saveMessage($e->getMessage(), $e->getLevel());
$this->message->display($e->getMessage());
$this->message->display($e->getMessage(), 'error');
}
catch (\Exception $e) {
$this->migration->getIdMap()->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_FAILED, $this->rollbackAction);
......@@ -299,7 +328,7 @@ public function import() {
catch (\Exception $e) {
$this->message->display(
$this->t('Migration failed with source plugin exception: !e',
array('!e' => $e->getMessage())));
array('!e' => $e->getMessage())), 'error');
return MigrationInterface::RESULT_FAILED;
}
}
......@@ -345,13 +374,27 @@ public function processRow(Row $row, array $process = NULL, $value = NULL) {
if (!is_array($value)) {
throw new MigrateException(sprintf('Pipeline failed for destination %s: %s got instead of an array,', $destination, $value));
}
$break = FALSE;
foreach ($value as $scalar_value) {
$new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
try {
$new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
}
catch (MigrateSkipProcessException $e) {
$break = TRUE;
}
}
$value = $new_value;
if ($break) {
break;
}
}
else {
$value = $plugin->transform($value, $this, $row, $destination);
try {
$value = $plugin->transform($value, $this, $row, $destination);
}
catch (MigrateSkipProcessException $e) {
break;
}
$multiple = $multiple || $plugin->multiple();
}
}
......@@ -402,7 +445,7 @@ protected function timeOptionExceeded() {
* The time limit, NULL if no limit or if the units were not in seconds.
*/
public function getTimeLimit() {
$limit = $this->migration->get('limit');
$limit = $this->limit;
if (isset($limit['unit']) && isset($limit['value']) && ($limit['unit'] == 'seconds' || $limit['unit'] == 'second')) {
return $limit['value'];
}
......@@ -488,7 +531,7 @@ protected function checkStatus() {
protected function memoryExceeded() {
$usage = $this->getMemoryUsage();
$pct_memory = $usage / $this->memoryLimit;
if (!$threshold = $this->migration->get('memoryThreshold')) {
if (!$threshold = $this->memoryThreshold) {
return FALSE;
}
if ($pct_memory > $threshold) {
......@@ -570,7 +613,7 @@ protected function formatSize($size) {
* TRUE if the threshold is exceeded, FALSE if not.
*/
protected function maxExecTimeExceeded() {
return $this->maxExecTime && (($this->getTimeElapsed() / $this->maxExecTime) > $this->migration->get('timeThreshold'));
return $this->maxExecTime && (($this->getTimeElapsed() / $this->maxExecTime) > $this->timeThreshold);
}
/**
......@@ -599,7 +642,7 @@ public function handleException(\Exception $exception, $save = TRUE) {
if ($save) {
$this->saveMessage($message);
}
$this->message->display($message);
$this->message->display($message, 'error');
}
/**
......
......@@ -13,15 +13,20 @@
class MigrateMessage implements MigrateMessageInterface {
/**
* Displays a migrate message.
* The map between migrate status and watchdog severity.
*
* @param string $message
* The message to display.
* @param string $type
* The type of message, for example: status or warning.
* @var array
*/
function display($message, $type = 'status') {
drupal_set_message($message, $type);
protected $map = array(
'status' => WATCHDOG_INFO,
'error' => WATCHDOG_ERROR,
);
/**
* {@inheritdoc}
*/