Commit 260f9367 authored by alexpott's avatar alexpott

Issue #2427335 by benjy, chx, dawehner: Combine legacy Source class into SourcePluginBase

parent 08ce5a70
......@@ -162,7 +162,7 @@ class MigrateExecutable implements MigrateExecutableInterface {
/**
* The source.
*
* @var \Drupal\migrate\Source
* @var \Drupal\migrate\Plugin\MigrateSourceInterface
*/
protected $source;
......@@ -219,12 +219,16 @@ public function __construct(MigrationInterface $migration, MigrateMessageInterfa
*
* Makes sure source is initialized based on migration settings.
*
* @return \Drupal\migrate\Source
* @return \Drupal\migrate\Plugin\MigrateSourceInterface
* The source.
*/
protected function getSource() {
if (!isset($this->source)) {
$this->source = new Source($this->migration, $this);
$this->source = $this->migration->getSourcePlugin();
// @TODO, find out how to remove this.
// @see https://drupal.org/node/2443617
$this->source->migrateExecutable = $this;
}
return $this->source;
}
......@@ -256,13 +260,11 @@ public function import() {
}
catch (\Exception $e) {
$this->message->display(
$this->t('Migration failed with source plugin exception: !e',
array('!e' => $e->getMessage())), 'error');
$this->t('Migration failed with source plugin exception: !e', array('!e' => $e->getMessage())), 'error');
return MigrationInterface::RESULT_FAILED;
}
$destination = $this->migration->getDestinationPlugin();
while ($source->valid()) {
$row = $source->current();
if ($this->sourceIdValues = $row->getSourceIdValues()) {
......
......@@ -19,7 +19,7 @@
*
* @ingroup migration
*/
interface MigrateSourceInterface extends \Countable, PluginInspectionInterface {
interface MigrateSourceInterface extends \Countable, \Iterator, PluginInspectionInterface {
/**
* Returns available fields on the source.
......
......@@ -30,7 +30,7 @@ public function fields() {
/**
* {@inheritdoc}
*/
public function getIterator() {
public function initializeIterator() {
return new \ArrayIterator(array(array('id' => '')));
}
......
......@@ -9,6 +9,8 @@
use Drupal\Core\Plugin\PluginBase;
use Drupal\migrate\Entity\MigrationInterface;
use Drupal\migrate\MigrateException;
use Drupal\migrate\Plugin\MigrateIdMapInterface;
use Drupal\migrate\Plugin\MigrateSourceInterface;
use Drupal\migrate\Row;
......@@ -22,7 +24,7 @@
*
* @ingroup migration
*/
abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface {
abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface {
/**
* @var \Drupal\Core\Extension\ModuleHandlerInterface
......@@ -34,14 +36,154 @@ abstract class SourcePluginBase extends PluginBase implements MigrateSourceInter
*/
protected $migration;
/**
* The name and type of the highwater property in the source.
*
* @var array
*
* @see $originalHighwater
*/
protected $highWaterProperty;
/**
* The current row from the query
*
* @var \Drupal\Migrate\Row
*/
protected $currentRow;
/**
* The primary key of the current row
*
* @var array
*/
protected $currentSourceIds;
/**
* Number of rows intentionally ignored (prepareRow() returned FALSE)
*
* @var int
*/
protected $numIgnored = 0;
/**
* Number of rows we've at least looked at.
*
* @var int
*/
protected $numProcessed = 0;
/**
* The high water mark at the beginning of the import operation.
*
* If the source has a property for tracking changes (like Drupal ha
* node.changed) then this is the highest value of those imported so far.
*
* @var int
*/
protected $originalHighWater;
/**
* List of source IDs to process.
*
* @var array
*/
protected $idList = array();
/**
* Whether this instance should cache the source count.
*
* @var boolean
*/
protected $cacheCounts = FALSE;
/**
* Key to use for caching counts.
*
* @var string
*/
protected $cacheKey;
/**
* Whether this instance should not attempt to count the source.
*
* @var boolean
*/
protected $skipCount = FALSE;
/**
* If TRUE, we will maintain hashed source rows to determine whether incoming
* data has changed.
*
* @var bool
*/
protected $trackChanges = FALSE;
/**
* By default, next() will directly read the map row and add it to the data
* row. A source plugin implementation may do this itself (in particular, the
* SQL source can incorporate the map table into the query) - if so, it should
* set this TRUE so we don't duplicate the effort.
*
* @var bool
*/
protected $mapRowAdded = FALSE;
/**
* @var \Drupal\Core\Cache\CacheBackendInterface
*/
protected $cache;
/**
* @var \Drupal\migrate\Plugin\MigrateIdMapInterface
*/
protected $idMap;
/**
* @var \Iterator
*/
protected $iterator;
// @TODO, find out how to remove this.
// @see https://drupal.org/node/2443617
public $migrateExecutable;
/**
* {@inheritdoc}
*/
public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration) {
parent::__construct($configuration, $plugin_id, $plugin_definition);
$this->migration = $migration;
// Set up some defaults based on the source configuration.
$this->cacheCounts = !empty($configuration['cache_counts']);
$this->skipCount = !empty($configuration['skip_count']);
$this->cacheKey = !empty($configuration['cache_key']) ? !empty($configuration['cache_key']) : NULL;
$this->trackChanges = !empty($configuration['track_changes']) ? $configuration['track_changes'] : FALSE;
// Pull out the current highwater mark if we have a highwater property.
if ($this->highWaterProperty = $this->migration->get('highWaterProperty')) {
$this->originalHighWater = $this->migration->getHighWater();
}
if ($id_list = $this->migration->get('idlist')) {
$this->idList = $id_list;
}
// Don't allow the use of both highwater and track changes together.
if ($this->highWaterProperty && $this->trackChanges) {
throw new MigrateException('You should either use a highwater mark or track changes not both. They are both designed to solve the same problem');
}
}
/**
* Initialize the iterator with the source data.
*
* @return array
* An array of the data for this source.
*/
protected abstract function initializeIterator();
/**
* Get the module handler.
*
......@@ -59,12 +201,259 @@ protected function getModuleHandler() {
* {@inheritdoc}
*/
public function prepareRow(Row $row) {
$result = TRUE;
$result_hook = $this->getModuleHandler()->invokeAll('migrate_prepare_row', array($row, $this, $this->migration));
$result_named_hook = $this->getModuleHandler()->invokeAll('migrate_' . $this->migration->id() . '_prepare_row', array($row, $this, $this->migration));
// If any of the hooks returned false, we want to skip the row.
// We're explicitly skipping this row - keep track in the map table.
if (($result_hook && in_array(FALSE, $result_hook)) || ($result_named_hook && in_array(FALSE, $result_named_hook))) {
return FALSE;
// Make sure we replace any previous messages for this item with any
// new ones.
$id_map = $this->migration->getIdMap();
$id_map->delete($this->currentSourceIds, TRUE);
$this->migrateExecutable->saveQueuedMessages();
$id_map->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_IGNORED, $this->migrateExecutable->rollbackAction);
$this->numIgnored++;
$this->currentRow = NULL;
$this->currentSourceIds = NULL;
$result = FALSE;
}
elseif ($this->trackChanges) {
// When tracking changed data, We want to quietly skip (rather than
// "ignore") rows with changes. The caller needs to make that decision,
// so we need to provide them with the necessary information (before and
// after hashes).
$row->rehash();
}
$this->numProcessed++;
return $result;
}
/**
* Returns the iterator that will yield the row arrays to be processed.
*
* @return \Iterator
*/
public function getIterator() {
if (!isset($this->iterator)) {
$this->iterator = $this->initializeIterator();
}
return $this->iterator;
}
/**
* {@inheritdoc}
*/
public function current() {
return $this->currentRow;
}
/**
* Get the iterator key.
*
* Implementation of Iterator::key - called when entering a loop iteration,
* returning the key of the current row. It must be a scalar - we will
* serialize to fulfill the requirement, but using getCurrentIds() is
* preferable.
*/
public function key() {
return serialize($this->currentSourceIds);
}
/**
* Whether the iterator is currently valid.
*
* Implementation of Iterator::valid() - called at the top of the loop,
* returning TRUE to process the loop and FALSE to terminate it
*/
public function valid() {
return isset($this->currentRow);
}
/**
* Rewind the iterator.
*
* Implementation of Iterator::rewind() - subclasses of MigrateSource should
* implement performRewind() to do any class-specific setup for iterating
* source records.
*/
public function rewind() {
$this->idMap = $this->migration->getIdMap();
$this->numProcessed = 0;
$this->numIgnored = 0;
$this->getIterator()->rewind();
$this->next();
}
/**
* {@inheritdoc}
*
* The migration iterates over rows returned by the source plugin. This
* method determines the next row which will be processed and imported into
* the system.
*
* The method tracks the source and destination IDs using the ID map plugin.
*
* This also takes care about highwater support. Highwater allows to reimport
* rows from a previous migration run, which got changed in the meantime.
* This is done by specifying a highwater field, which is compared with the
* last time, the migration got executed (originalHighWater).
*/
public function next() {
$this->currentSourceIds = NULL;
$this->currentRow = NULL;
$source_configuration = $this->migration->get('source');
// In order to find the next row we want to process, we ask the source
// plugin for the next possible row.
while (!isset($this->currentRow) && $this->getIterator()->valid()) {
$row_data = $this->getIterator()->current() + $source_configuration;
$this->getIterator()->next();
$row = new Row($row_data, $this->migration->getSourcePlugin()->getIds(), $this->migration->get('destinationIds'));
// Populate the source key for this row.
$this->currentSourceIds = $row->getSourceIdValues();
// Pick up the existing map row, if any, unless getNextRow() did it.
if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) {
$row->setIdMap($id_map);
}
// In case we have specified an ID list, but the ID given by the source is
// not in there, we skip the row.
$id_in_the_list = $this->idList && in_array(reset($this->currentSourceIds), $this->idList);
if ($this->idList && !$id_in_the_list) {
continue;
}
// Preparing the row gives source plugins the chance to skip.
if ($this->prepareRow($row) === FALSE) {
continue;
}
// Check whether the row needs processing.
// 1. Explicitly specified IDs.
// 2. This row has not been imported yet.
// 3. Explicitly set to update.
// 4. The row is newer than the current highwater mark.
// 5. If no such property exists then try by checking the hash of the row.
if ($id_in_the_list || !$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row) ) {
$this->currentRow = $row->freezeSource();
}
}
}
/**
* Check if the incoming data is newer than what we've previously imported.
*
* @param \Drupal\migrate\Row $row
* The row we're importing.
*
* @return bool
* TRUE if the highwater value in the row is greater than our current value.
*/
protected function aboveHighwater(Row $row) {
return $this->highWaterProperty && $row->getSourceProperty($this->highWaterProperty['name']) > $this->originalHighWater;
}
/**
* Check if the incoming row has changed since our last import.
*
* @param \Drupal\migrate\Row $row
* The row we're importing.
*
* @return bool
* TRUE if the row has changed otherwise FALSE.
*/
protected function rowChanged(Row $row) {
return $this->trackChanges && $row->changed();
}
/**
* Getter for currentSourceIds data member.
*/
public function getCurrentIds() {
return $this->currentSourceIds;
}
/**
* Getter for numIgnored data member.
*/
public function getIgnored() {
return $this->numIgnored;
}
/**
* Getter for numProcessed data member.
*/
public function getProcessed() {
return $this->numProcessed;
}
/**
* Reset numIgnored back to 0.
*/
public function resetStats() {
$this->numIgnored = 0;
}
/**
* Get the source count.
*
* Return a count of available source records, from the cache if appropriate.
* Returns -1 if the source is not countable.
*
* @param bool $refresh
* Whether or not to refresh the count.
*
* @return int
* The count.
*/
public function count($refresh = FALSE) {
if ($this->skipCount) {
return -1;
}
if (!isset($this->cacheKey)) {
$this->cacheKey = hash('sha256', $this->getPluginId());
}
// If a refresh is requested, or we're not caching counts, ask the derived
// class to get the count from the source.
if ($refresh || !$this->cacheCounts) {
$count = $this->getIterator()->count();
$this->getCache()->set($this->cacheKey, $count, 'cache');
}
else {
// Caching is in play, first try to retrieve a cached count.
$cache_object = $this->getCache()->get($this->cacheKey, 'cache');
if (is_object($cache_object)) {
// Success.
$count = $cache_object->data;
}
else {
// No cached count, ask the derived class to count 'em up, and cache
// the result.
$count = $this->getIterator()->count();
$this->getCache()->set($this->cacheKey, $count, 'cache');
}
}
return $count;
}
/**
* Get the cache object.
*
* @return \Drupal\Core\Cache\CacheBackendInterface
* The cache object.
*/
protected function getCache() {
if (!isset($this->cache)) {
$this->cache = \Drupal::cache('migrate');
}
return $this->cache;
}
}
......@@ -112,7 +112,7 @@ protected function prepareQuery() {
* We could simply execute the query and be functionally correct, but
* we will take advantage of the PDO-based API to optimize the query up-front.
*/
protected function runQuery() {
protected function initializeIterator() {
$this->prepareQuery();
$high_water_property = $this->migration->get('highWaterProperty');
......@@ -203,18 +203,6 @@ public function count() {
return $this->query()->countQuery()->execute()->fetchField();
}
/**
* Returns the iterator that will yield the row arrays to be processed.
*
* @return \Iterator
*/
public function getIterator() {
if (!isset($this->iterator)) {
$this->iterator = $this->runQuery();
}
return $this->iterator;
}
/**
* Check if we can join against the map table.
*
......
......@@ -176,9 +176,12 @@ public function setSourceProperty($property, $data) {
/**
* Freezes the source.
*
* @return $this
*/
public function freezeSource() {
$this->frozen = TRUE;
return $this;
}
/**
......
<?php
/**
* @file
* Contains \Drupal\migrate\Plugin\migrate\source\SourceBase.
*/
namespace Drupal\migrate;
use Drupal\migrate\Entity\MigrationInterface;
use Drupal\migrate\Plugin\MigrateIdMapInterface;
/**
* Source is a caching / decision making wrapper around the source plugin.
*
* Derived classes are expected to define __toString(), returning a string
* describing the source and significant options, i.e. the query.
*
* @see \Drupal\migrate\MigrateSourceInterface
*/
class Source implements \Iterator, \Countable {
/**
* The current row from the quey
*
* @var \Drupal\Migrate\Row
*/
protected $currentRow;
/**
* The primary key of the current row
*
* @var array
*/
protected $currentIds;
/**
* Number of rows intentionally ignored (prepareRow() returned FALSE)
*
* @var int
*/
protected $numIgnored = 0;
/**
* Number of rows we've at least looked at.
*
* @var int
*/
protected $numProcessed = 0;
/**
* The high water mark at the beginning of the import operation.
*
* @var
*/
protected $originalHighWater = '';
/**
* List of source IDs to process.
*
* @var array
*/
protected $idList = array();
/**
* Whether this instance should cache the source count.
*
* @var boolean
*/
protected $cacheCounts = FALSE;
/**
* Key to use for caching counts.
*
* @var string
*/
protected $cacheKey;
/**
* Whether this instance should not attempt to count the source.
*
* @var boolean
*/
protected $skipCount = FALSE;
/**