Commit 0308eb77 authored by alexpott's avatar alexpott

Issue #2485385 by phenaproxima, quietone, Berdir, blazey, hussainweb,...

Issue #2485385 by phenaproxima, quietone, Berdir, blazey, hussainweb, mikeryan, benjy, alexpott: Move highwater field support to the source plugin, and do not expose its internals on MigrationInterface
parent 0c29f756
......@@ -15,7 +15,7 @@ class CommentSourceWithHighWaterTest extends CommentTestBase {
* {@inheritdoc}
*/
protected function setUp() {
$this->migrationConfiguration['highWaterProperty']['field'] = 'timestamp';
$this->migrationConfiguration['source']['high_water_property']['name'] = 'timestamp';
array_shift($this->expectedResults);
parent::setUp();
}
......
......@@ -15,7 +15,6 @@ class FilterFormatTest extends MigrateSqlSourceTestCase {
protected $migrationConfiguration = array(
'id' => 'test',
'highWaterProperty' => array('field' => 'test'),
'source' => array(
'plugin' => 'd6_filter_formats',
),
......
services:
migrate.plugin_event_subscriber:
class: Drupal\migrate\Plugin\PluginEventSubscriber
tags:
- { name: event_subscriber }
cache.migrate:
class: Drupal\Core\Cache\CacheBackendInterface
tags:
......
<?php
namespace Drupal\migrate\Event;
/**
* Interface for plugins that react to pre- or post-import events.
*/
interface ImportAwareInterface {
/**
* Performs pre-import tasks.
*
* @param \Drupal\migrate\Event\MigrateImportEvent $event
* The pre-import event object.
*/
public function preImport(MigrateImportEvent $event);
/**
* Performs post-import tasks.
*
* @param \Drupal\migrate\Event\MigrateImportEvent $event
* The post-import event object.
*/
public function postImport(MigrateImportEvent $event);
}
<?php
namespace Drupal\migrate\Event;
/**
* Interface for plugins that react to pre- or post-rollback events.
*/
interface RollbackAwareInterface {
/**
* Performs pre-rollback tasks.
*
* @param \Drupal\migrate\Event\MigrateRollbackEvent $event
* The pre-rollback event object.
*/
public function preRollback(MigrateRollbackEvent $event);
/**
* Performs post-rollback tasks.
*
* @param \Drupal\migrate\Event\MigrateRollbackEvent $event
* The post-rollback event object.
*/
public function postRollback(MigrateRollbackEvent $event);
}
......@@ -262,9 +262,6 @@ public function import() {
$this->handleException($e);
}
}
if ($high_water_property = $this->migration->getHighWaterProperty()) {
$this->migration->saveHighWater($row->getSourceProperty($high_water_property['name']));
}
// Reset row properties.
unset($sourceValues, $destinationValues);
......@@ -354,10 +351,6 @@ public function rollback() {
break;
}
}
// If rollback completed successfully, reset the high water mark.
if ($return == MigrationInterface::RESULT_COMPLETED) {
$this->migration->saveHighWater(NULL);
}
// Notify modules that rollback attempt was complete.
$this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROLLBACK, new MigrateRollbackEvent($this->migration));
......
<?php
namespace Drupal\migrate\Plugin;
use Drupal\Component\Plugin\PluginInspectionInterface;
use Drupal\migrate\Row;
......
......@@ -125,15 +125,6 @@ class Migration extends PluginBase implements MigrationInterface, RequirementsIn
*/
protected $destinationIds = [];
/**
* Information on the property used as the high watermark.
*
* Array of 'name' & (optional) db 'alias' properties used for high watermark.
*
* @var array
*/
protected $highWaterProperty;
/**
* Indicate whether the primary system of record for this migration is the
* source, or the destination (Drupal). In the source case, migration of
......@@ -154,11 +145,6 @@ class Migration extends PluginBase implements MigrationInterface, RequirementsIn
*/
protected $sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;
/**
* @var \Drupal\Core\KeyValueStore\KeyValueStoreInterface
*/
protected $highWaterStorage;
/**
* Track time of last import if TRUE.
*
......@@ -443,33 +429,6 @@ public function getIdMap() {
return $this->idMapPlugin;
}
/**
* Get the high water storage object.
*
* @return \Drupal\Core\KeyValueStore\KeyValueStoreInterface
* The storage object.
*/
protected function getHighWaterStorage() {
if (!isset($this->highWaterStorage)) {
$this->highWaterStorage = \Drupal::keyValue('migrate:high_water');
}
return $this->highWaterStorage;
}
/**
* {@inheritdoc}
*/
public function getHighWater() {
return $this->getHighWaterStorage()->get($this->id());
}
/**
* {@inheritdoc}
*/
public function saveHighWater($high_water) {
$this->getHighWaterStorage()->set($this->id(), $high_water);
}
/**
* {@inheritdoc}
*/
......@@ -722,13 +681,6 @@ public function getSourceConfiguration() {
return $this->source;
}
/**
* {@inheritdoc}
*/
public function getHighWaterProperty() {
return $this->highWaterProperty;
}
/**
* {@inheritdoc}
*/
......
......@@ -152,26 +152,6 @@ public function getDestinationPlugin($stub_being_requested = FALSE);
*/
public function getIdMap();
/**
* The current value of the high water mark.
*
* The high water mark defines a timestamp stating the time the import was last
* run. If the mark is set, only content with a higher timestamp will be
* imported.
*
* @return int
* A Unix timestamp representing the high water mark.
*/
public function getHighWater();
/**
* Save the new high water mark.
*
* @param int $high_water
* The high water timestamp.
*/
public function saveHighWater($high_water);
/**
* Check if all source rows from this migration have been processed.
*
......@@ -343,18 +323,6 @@ public function getDestinationConfiguration();
*/
public function getSourceConfiguration();
/**
* Get information on the property used as the high watermark.
*
* Array of 'name' & (optional) db 'alias' properties used for high watermark.
*
* @see Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
*
* @return array
* The property used as the high watermark.
*/
public function getHighWaterProperty();
/**
* If true, track time of last import.
*
......
<?php
namespace Drupal\migrate\Plugin;
use Drupal\migrate\Event\ImportAwareInterface;
use Drupal\migrate\Event\MigrateEvents;
use Drupal\migrate\Event\MigrateImportEvent;
use Drupal\migrate\Event\MigrateRollbackEvent;
use Drupal\migrate\Event\RollbackAwareInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
/**
* Event subscriber to forward Migrate events to source and destination plugins.
*/
class PluginEventSubscriber implements EventSubscriberInterface {
/**
* Tries to invoke event handling methods on source and destination plugins.
*
* @param string $method
* The method to invoke.
* @param \Drupal\migrate\Event\MigrateImportEvent|\Drupal\migrate\Event\MigrateRollbackEvent $event
* The event that has triggered the invocation.
* @param string $plugin_interface
* The interface which plugins must implement in order to be invoked.
*/
protected function invoke($method, $event, $plugin_interface) {
$migration = $event->getMigration();
$source = $migration->getSourcePlugin();
if ($source instanceof $plugin_interface) {
call_user_func([$source, $method], $event);
}
$destination = $migration->getDestinationPlugin();
if ($destination instanceof $plugin_interface) {
call_user_func([$destination, $method], $event);
}
}
/**
* Forwards pre-import events to the source and destination plugins.
*
* @param \Drupal\migrate\Event\MigrateImportEvent $event
* The import event.
*/
public function preImport(MigrateImportEvent $event) {
$this->invoke('preImport', $event, ImportAwareInterface::class);
}
/**
* Forwards post-import events to the source and destination plugins.
*
* @param \Drupal\migrate\Event\MigrateImportEvent $event
* The import event.
*/
public function postImport(MigrateImportEvent $event) {
$this->invoke('postImport', $event, ImportAwareInterface::class);
}
/**
* Forwards pre-rollback events to the source and destination plugins.
*
* @param \Drupal\migrate\Event\MigrateRollbackEvent $event
* The rollback event.
*/
public function preRollback(MigrateRollbackEvent $event) {
$this->invoke('preRollback', $event, RollbackAwareInterface::class);
}
/**
* Forwards post-rollback events to the source and destination plugins.
*
* @param \Drupal\migrate\Event\MigrateRollbackEvent $event
* The rollback event.
*/
public function postRollback(MigrateRollbackEvent $event) {
$this->invoke('postRollback', $event, RollbackAwareInterface::class);
}
/**
* {@inheritdoc}
*/
public static function getSubscribedEvents() {
$events = [];
$events[MigrateEvents::PRE_IMPORT][] = ['preImport'];
$events[MigrateEvents::POST_IMPORT][] = ['postImport'];
$events[MigrateEvents::PRE_ROLLBACK][] = ['preRollback'];
$events[MigrateEvents::POST_ROLLBACK][] = ['postRollback'];
return $events;
}
}
......@@ -3,6 +3,8 @@
namespace Drupal\migrate\Plugin\migrate\source;
use Drupal\Core\Plugin\PluginBase;
use Drupal\migrate\Event\MigrateRollbackEvent;
use Drupal\migrate\Event\RollbackAwareInterface;
use Drupal\migrate\Plugin\MigrationInterface;
use Drupal\migrate\MigrateException;
use Drupal\migrate\MigrateSkipRowException;
......@@ -20,7 +22,7 @@
*
* @ingroup migration
*/
abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface {
abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface, RollbackAwareInterface {
/**
* The module handler service.
......@@ -36,15 +38,6 @@ abstract class SourcePluginBase extends PluginBase implements MigrateSourceInter
*/
protected $migration;
/**
* The name and type of the highwater property in the source.
*
* @var array
*
* @see $originalHighwater
*/
protected $highWaterProperty;
/**
* The current row from the query.
*
......@@ -59,10 +52,27 @@ abstract class SourcePluginBase extends PluginBase implements MigrateSourceInter
*/
protected $currentSourceIds;
/**
* Information on the property used as the high-water mark.
*
* Array of 'name' and (optional) db 'alias' properties used for high-water
* mark.
*
* @var array
*/
protected $highWaterProperty = [];
/**
* The key-value storage for the high-water value.
*
* @var \Drupal\Core\KeyValueStore\KeyValueStoreInterface
*/
protected $highWaterStorage;
/**
* The high water mark at the beginning of the import operation.
*
* If the source has a property for tracking changes (like Drupal ha
* If the source has a property for tracking changes (like Drupal has
* node.changed) then this is the highest value of those imported so far.
*
* @var int
......@@ -146,10 +156,11 @@ public function __construct(array $configuration, $plugin_id, $plugin_definition
$this->cacheKey = !empty($configuration['cache_key']) ? $configuration['cache_key'] : NULL;
$this->trackChanges = !empty($configuration['track_changes']) ? $configuration['track_changes'] : FALSE;
$this->idMap = $this->migration->getIdMap();
$this->highWaterProperty = !empty($configuration['high_water_property']) ? $configuration['high_water_property'] : FALSE;
// Pull out the current highwater mark if we have a highwater property.
if ($this->highWaterProperty = $this->migration->getHighWaterProperty()) {
$this->originalHighWater = $this->migration->getHighWater();
if ($this->highWaterProperty) {
$this->originalHighWater = $this->getHighWater();
}
// Don't allow the use of both highwater and track changes together.
......@@ -324,6 +335,10 @@ public function next() {
if (!$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row)) {
$this->currentRow = $row->freezeSource();
}
if ($this->getHighWaterProperty()) {
$this->saveHighWater($row->getSourceProperty($this->highWaterProperty['name']));
}
}
}
......@@ -337,7 +352,7 @@ public function next() {
* TRUE if the highwater value in the row is greater than our current value.
*/
protected function aboveHighwater(Row $row) {
return $this->highWaterProperty && $row->getSourceProperty($this->highWaterProperty['name']) > $this->originalHighWater;
return $this->getHighWaterProperty() && $row->getSourceProperty($this->highWaterProperty['name']) > $this->originalHighWater;
}
/**
......@@ -417,4 +432,90 @@ protected function getCache() {
return $this->cache;
}
/**
* Get the high water storage object.
*
* @return \Drupal\Core\KeyValueStore\KeyValueStoreInterface
* The storage object.
*/
protected function getHighWaterStorage() {
if (!isset($this->highWaterStorage)) {
$this->highWaterStorage = \Drupal::keyValue('migrate:high_water');
}
return $this->highWaterStorage;
}
/**
* The current value of the high water mark.
*
* The high water mark defines a timestamp stating the time the import was last
* run. If the mark is set, only content with a higher timestamp will be
* imported.
*
* @return int|null
* A Unix timestamp representing the high water mark, or NULL if no high
* water mark has been stored.
*/
protected function getHighWater() {
return $this->getHighWaterStorage()->get($this->migration->id());
}
/**
* Save the new high water mark.
*
* @param int $high_water
* The high water timestamp.
*/
protected function saveHighWater($high_water) {
$this->getHighWaterStorage()->set($this->migration->id(), $high_water);
}
/**
* Get information on the property used as the high watermark.
*
* Array of 'name' & (optional) db 'alias' properties used for high watermark.
*
* @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
*
* @return array
* The property used as the high watermark.
*/
protected function getHighWaterProperty() {
return $this->highWaterProperty;
}
/**
* Get the name of the field used as the high watermark.
*
* The name of the field qualified with an alias if available.
*
* @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
*
* @return string|null
* The name of the field for the high water mark, or NULL if not set.
*/
protected function getHighWaterField() {
if (!empty($this->highWaterProperty['name'])) {
return !empty($this->highWaterProperty['alias']) ?
$this->highWaterProperty['alias'] . '.' . $this->highWaterProperty['name'] :
$this->highWaterProperty['name'];
}
return NULL;
}
/**
* {@inheritdoc}
*/
public function preRollback(MigrateRollbackEvent $event) {
// Nothing to do in this implementation.
}
/**
* {@inheritdoc}
*/
public function postRollback(MigrateRollbackEvent $event) {
// Reset the high-water mark.
$this->saveHighWater(NULL);
}
}
......@@ -161,7 +161,6 @@ protected function prepareQuery() {
*/
protected function initializeIterator() {
$this->prepareQuery();
$high_water_property = $this->migration->getHighWaterProperty();
// Get the key values, for potential use in joining to the map table.
$keys = array();
......@@ -213,15 +212,10 @@ protected function initializeIterator() {
}
// 2. If we are using high water marks, also include rows above the mark.
// But, include all rows if the high water mark is not set.
if (isset($high_water_property['name']) && ($high_water = $this->migration->getHighWater()) !== '') {
if (isset($high_water_property['alias'])) {
$high_water = $high_water_property['alias'] . '.' . $high_water_property['name'];
}
else {
$high_water = $high_water_property['name'];
}
$conditions->condition($high_water, $high_water, '>');
$condition_added = TRUE;
if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater()) !== '') {
$high_water_field = $this->getHighWaterField();
$conditions->condition($high_water_field, $high_water, '>');
$this->query->orderBy($high_water_field);
}
if ($condition_added) {
$this->query->condition($conditions);
......
langcode: en
status: true
name: High Water import node
type: high_water_import_node
description: ''
help: ''
new_revision: false
preview_mode: 1
display_submitted: true
type: module
name: Migrate SQL Source test
description: 'Provides a database table and records for SQL import testing.'
package: Testing
core: 8.x
dependencies:
- migrate
id: high_water_test
label: High water test.
source:
plugin: high_water_test
high_water_property:
name: changed
destination:
plugin: entity:node
migration_tags:
test: test
process:
changed: changed
title: title
type:
plugin: default_value
default_value: high_water_import_node
<?php
namespace Drupal\migrate_sql_test\Plugin\migrate\source;
use Drupal\migrate\Plugin\migrate\source\SqlBase;
/**
* Source plugin for migration high water tests.
*
* @MigrateSource(
* id = "high_water_test"
* )
*/
class HighWaterTest extends SqlBase {
/**
* {@inheritdoc}
*/
public function query() {
$query = $this
->select('high_water_node', 'm')
->fields('m', array_keys($this->fields()));
return $query;
}
/**
* {@inheritdoc}
*/
public function fields() {
$fields = [
'id' => $this->t('Id'),
'title' => $this->t('Title'),
'changed' => $this->t('Changed'),
];
return $fields;
}
/**
* {@inheritdoc}
*/
public function getIds() {
return [
'id' => [
'type' => 'integer',
],
];
}
}
<?php
namespace Drupal\Tests\migrate\Kernel;
/**
* Tests migration high water property.
*
* @group migrate
*/
class HighWaterTest extends MigrateTestBase {
/**
* {@inheritdoc}
*/
public static $modules = [
'system',
'user',
'node',
'migrate',
'migrate_sql_test',
'field',
];
/**
* {@inheritdoc}
*/
protected function setUp() {
parent::setUp();
// Create source test table.
$this->sourceDatabase->schema()->createTable('high_water_node', [
'fields' => [
'id' => [