diff --git a/core/modules/comment/tests/src/Unit/Migrate/d6/CommentSourceWithHighWaterTest.php b/core/modules/comment/tests/src/Unit/Migrate/d6/CommentSourceWithHighWaterTest.php
index 0908adfa1aa94ecf0104535eca5b692e7ae59376..b2cbaa7bb6909b8154c22d3f5ddd8ecb878a8a3b 100644
--- a/core/modules/comment/tests/src/Unit/Migrate/d6/CommentSourceWithHighWaterTest.php
+++ b/core/modules/comment/tests/src/Unit/Migrate/d6/CommentSourceWithHighWaterTest.php
@@ -15,7 +15,7 @@ class CommentSourceWithHighWaterTest extends CommentTestBase {
    * {@inheritdoc}
    */
   protected function setUp() {
-    $this->migrationConfiguration['highWaterProperty']['field'] = 'timestamp';
+    $this->migrationConfiguration['source']['high_water_property']['name'] = 'timestamp';
     array_shift($this->expectedResults);
     parent::setUp();
   }
diff --git a/core/modules/filter/tests/src/Unit/Plugin/migrate/source/d6/FilterFormatTest.php b/core/modules/filter/tests/src/Unit/Plugin/migrate/source/d6/FilterFormatTest.php
index 018c0332307cad180891fab6a9a406c0a2a433f4..6b28613215e923ddfa0e8da00908553898dc0fe4 100644
--- a/core/modules/filter/tests/src/Unit/Plugin/migrate/source/d6/FilterFormatTest.php
+++ b/core/modules/filter/tests/src/Unit/Plugin/migrate/source/d6/FilterFormatTest.php
@@ -15,7 +15,6 @@ class FilterFormatTest extends MigrateSqlSourceTestCase {
 
   protected $migrationConfiguration = array(
     'id' => 'test',
-    'highWaterProperty' => array('field' => 'test'),
     'source' => array(
       'plugin' => 'd6_filter_formats',
     ),
diff --git a/core/modules/migrate/migrate.services.yml b/core/modules/migrate/migrate.services.yml
index c9a645701995305245aeece6f35ba6da219356b3..d00cb4f2245aa3cc76cc8f13d6c199396dd03bef 100644
--- a/core/modules/migrate/migrate.services.yml
+++ b/core/modules/migrate/migrate.services.yml
@@ -1,4 +1,8 @@
 services:
+  migrate.plugin_event_subscriber:
+    class: Drupal\migrate\Plugin\PluginEventSubscriber
+    tags:
+      - { name: event_subscriber }
   cache.migrate:
     class: Drupal\Core\Cache\CacheBackendInterface
     tags:
diff --git a/core/modules/migrate/src/Event/ImportAwareInterface.php b/core/modules/migrate/src/Event/ImportAwareInterface.php
new file mode 100644
index 0000000000000000000000000000000000000000..7e2936fdcca3ba072cca26751915fb2ee76af573
--- /dev/null
+++ b/core/modules/migrate/src/Event/ImportAwareInterface.php
@@ -0,0 +1,26 @@
+<?php
+
+namespace Drupal\migrate\Event;
+
+/**
+ * Interface for plugins that react to pre- or post-import events.
+ */
+interface ImportAwareInterface {
+
+  /**
+   * Performs pre-import tasks.
+   *
+   * @param \Drupal\migrate\Event\MigrateImportEvent $event
+   *   The pre-import event object.
+   */
+  public function preImport(MigrateImportEvent $event);
+
+  /**
+   * Performs post-import tasks.
+   *
+   * @param \Drupal\migrate\Event\MigrateImportEvent $event
+   *   The post-import event object.
+   */
+  public function postImport(MigrateImportEvent $event);
+
+}
diff --git a/core/modules/migrate/src/Event/RollbackAwareInterface.php b/core/modules/migrate/src/Event/RollbackAwareInterface.php
new file mode 100644
index 0000000000000000000000000000000000000000..bbc8fe6c06ffe399f2c10215d37409bf7a319620
--- /dev/null
+++ b/core/modules/migrate/src/Event/RollbackAwareInterface.php
@@ -0,0 +1,26 @@
+<?php
+
+namespace Drupal\migrate\Event;
+
+/**
+ * Interface for plugins that react to pre- or post-rollback events.
+ */
+interface RollbackAwareInterface {
+
+  /**
+   * Performs pre-rollback tasks.
+   *
+   * @param \Drupal\migrate\Event\MigrateRollbackEvent $event
+   *   The pre-rollback event object.
+   */
+  public function preRollback(MigrateRollbackEvent $event);
+
+  /**
+   * Performs post-rollback tasks.
+   *
+   * @param \Drupal\migrate\Event\MigrateRollbackEvent $event
+   *   The post-rollback event object.
+   */
+  public function postRollback(MigrateRollbackEvent $event);
+
+}
diff --git a/core/modules/migrate/src/MigrateExecutable.php b/core/modules/migrate/src/MigrateExecutable.php
index 50bf0181c96a5821f036f773014497f0d753b471..c51b278ad091c9220b605072f7991af1bdd67bb1 100644
--- a/core/modules/migrate/src/MigrateExecutable.php
+++ b/core/modules/migrate/src/MigrateExecutable.php
@@ -262,9 +262,6 @@ public function import() {
           $this->handleException($e);
         }
       }
-      if ($high_water_property = $this->migration->getHighWaterProperty()) {
-        $this->migration->saveHighWater($row->getSourceProperty($high_water_property['name']));
-      }
 
       // Reset row properties.
       unset($sourceValues, $destinationValues);
@@ -354,10 +351,6 @@ public function rollback() {
         break;
       }
     }
-    // If rollback completed successfully, reset the high water mark.
-    if ($return == MigrationInterface::RESULT_COMPLETED) {
-      $this->migration->saveHighWater(NULL);
-    }
 
     // Notify modules that rollback attempt was complete.
     $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROLLBACK, new MigrateRollbackEvent($this->migration));
diff --git a/core/modules/migrate/src/Plugin/MigrateSourceInterface.php b/core/modules/migrate/src/Plugin/MigrateSourceInterface.php
index 04c74cb5a692a7c5d5f6fb8fddd836549454473d..32d2b983bd8c8ce356e50fae1f137c8fcf78bd10 100644
--- a/core/modules/migrate/src/Plugin/MigrateSourceInterface.php
+++ b/core/modules/migrate/src/Plugin/MigrateSourceInterface.php
@@ -1,6 +1,7 @@
 <?php
 
 namespace Drupal\migrate\Plugin;
+
 use Drupal\Component\Plugin\PluginInspectionInterface;
 use Drupal\migrate\Row;
 
diff --git a/core/modules/migrate/src/Plugin/Migration.php b/core/modules/migrate/src/Plugin/Migration.php
index 03aefc70d32d12a34f5dc3d64cafc76c1bc6fa50..2de870e1739a505b38a59dc68f939355942af85b 100644
--- a/core/modules/migrate/src/Plugin/Migration.php
+++ b/core/modules/migrate/src/Plugin/Migration.php
@@ -125,15 +125,6 @@ class Migration extends PluginBase implements MigrationInterface, RequirementsIn
    */
   protected $destinationIds = [];
 
-  /**
-   * Information on the property used as the high watermark.
-   *
-   * Array of 'name' & (optional) db 'alias' properties used for high watermark.
-   *
-   * @var array
-   */
-  protected $highWaterProperty;
-
   /**
    * Indicate whether the primary system of record for this migration is the
    * source, or the destination (Drupal). In the source case, migration of
@@ -154,11 +145,6 @@ class Migration extends PluginBase implements MigrationInterface, RequirementsIn
    */
   protected $sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;
 
-  /**
-   * @var \Drupal\Core\KeyValueStore\KeyValueStoreInterface
-   */
-  protected $highWaterStorage;
-
   /**
    * Track time of last import if TRUE.
    *
@@ -443,33 +429,6 @@ public function getIdMap() {
     return $this->idMapPlugin;
   }
 
-  /**
-   * Get the high water storage object.
-   *
-   * @return \Drupal\Core\KeyValueStore\KeyValueStoreInterface
-   *   The storage object.
-   */
-  protected function getHighWaterStorage() {
-    if (!isset($this->highWaterStorage)) {
-      $this->highWaterStorage = \Drupal::keyValue('migrate:high_water');
-    }
-    return $this->highWaterStorage;
-  }
-
-  /**
-   * {@inheritdoc}
-   */
-  public function getHighWater() {
-    return $this->getHighWaterStorage()->get($this->id());
-  }
-
-  /**
-   * {@inheritdoc}
-   */
-  public function saveHighWater($high_water) {
-    $this->getHighWaterStorage()->set($this->id(), $high_water);
-  }
-
   /**
    * {@inheritdoc}
    */
@@ -722,13 +681,6 @@ public function getSourceConfiguration() {
     return $this->source;
   }
 
-  /**
-   * {@inheritdoc}
-   */
-  public function getHighWaterProperty() {
-    return $this->highWaterProperty;
-  }
-
   /**
    * {@inheritdoc}
    */
diff --git a/core/modules/migrate/src/Plugin/MigrationInterface.php b/core/modules/migrate/src/Plugin/MigrationInterface.php
index 18d2c666302fedb35c8d4a7fed95222fd56ef653..cf9ab2535ecd889aa93d90b1472961255217f8de 100644
--- a/core/modules/migrate/src/Plugin/MigrationInterface.php
+++ b/core/modules/migrate/src/Plugin/MigrationInterface.php
@@ -152,26 +152,6 @@ public function getDestinationPlugin($stub_being_requested = FALSE);
    */
   public function getIdMap();
 
-  /**
-   * The current value of the high water mark.
-   *
-   * The high water mark defines a timestamp stating the time the import was last
-   * run. If the mark is set, only content with a higher timestamp will be
-   * imported.
-   *
-   * @return int
-   *   A Unix timestamp representing the high water mark.
-   */
-  public function getHighWater();
-
-  /**
-   * Save the new high water mark.
-   *
-   * @param int $high_water
-   *   The high water timestamp.
-   */
-  public function saveHighWater($high_water);
-
   /**
    * Check if all source rows from this migration have been processed.
    *
@@ -343,18 +323,6 @@ public function getDestinationConfiguration();
    */
   public function getSourceConfiguration();
 
-  /**
-   * Get information on the property used as the high watermark.
-   *
-   * Array of 'name' & (optional) db 'alias' properties used for high watermark.
-   *
-   * @see Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
-   *
-   * @return array
-   *   The property used as the high watermark.
-   */
-  public function getHighWaterProperty();
-
   /**
    * If true, track time of last import.
    *
diff --git a/core/modules/migrate/src/Plugin/PluginEventSubscriber.php b/core/modules/migrate/src/Plugin/PluginEventSubscriber.php
new file mode 100644
index 0000000000000000000000000000000000000000..d503638269be0d9fee8e41a722dc0d5f4beef1c9
--- /dev/null
+++ b/core/modules/migrate/src/Plugin/PluginEventSubscriber.php
@@ -0,0 +1,94 @@
+<?php
+
+namespace Drupal\migrate\Plugin;
+
+use Drupal\migrate\Event\ImportAwareInterface;
+use Drupal\migrate\Event\MigrateEvents;
+use Drupal\migrate\Event\MigrateImportEvent;
+use Drupal\migrate\Event\MigrateRollbackEvent;
+use Drupal\migrate\Event\RollbackAwareInterface;
+use Symfony\Component\EventDispatcher\EventSubscriberInterface;
+
+/**
+ * Event subscriber to forward Migrate events to source and destination plugins.
+ */
+class PluginEventSubscriber implements EventSubscriberInterface {
+
+  /**
+   * Tries to invoke event handling methods on source and destination plugins.
+   *
+   * @param string $method
+   *   The method to invoke.
+   * @param \Drupal\migrate\Event\MigrateImportEvent|\Drupal\migrate\Event\MigrateRollbackEvent $event
+   *   The event that has triggered the invocation.
+   * @param string $plugin_interface
+   *   The interface which plugins must implement in order to be invoked.
+   */
+  protected function invoke($method, $event, $plugin_interface) {
+    $migration = $event->getMigration();
+
+    $source = $migration->getSourcePlugin();
+    if ($source instanceof $plugin_interface) {
+      call_user_func([$source, $method], $event);
+    }
+
+    $destination = $migration->getDestinationPlugin();
+    if ($destination instanceof $plugin_interface) {
+      call_user_func([$destination, $method], $event);
+    }
+  }
+
+  /**
+   * Forwards pre-import events to the source and destination plugins.
+   *
+   * @param \Drupal\migrate\Event\MigrateImportEvent $event
+   *   The import event.
+   */
+  public function preImport(MigrateImportEvent $event) {
+    $this->invoke('preImport', $event, ImportAwareInterface::class);
+  }
+
+  /**
+   * Forwards post-import events to the source and destination plugins.
+   *
+   * @param \Drupal\migrate\Event\MigrateImportEvent $event
+   *   The import event.
+   */
+  public function postImport(MigrateImportEvent $event) {
+    $this->invoke('postImport', $event, ImportAwareInterface::class);
+  }
+
+  /**
+   * Forwards pre-rollback events to the source and destination plugins.
+   *
+   * @param \Drupal\migrate\Event\MigrateRollbackEvent $event
+   *   The rollback event.
+   */
+  public function preRollback(MigrateRollbackEvent $event) {
+    $this->invoke('preRollback', $event, RollbackAwareInterface::class);
+  }
+
+  /**
+   * Forwards post-rollback events to the source and destination plugins.
+   *
+   * @param \Drupal\migrate\Event\MigrateRollbackEvent $event
+   *   The rollback event.
+   */
+  public function postRollback(MigrateRollbackEvent $event) {
+    $this->invoke('postRollback', $event, RollbackAwareInterface::class);
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public static function getSubscribedEvents() {
+    $events = [];
+    $events[MigrateEvents::PRE_IMPORT][] = ['preImport'];
+    $events[MigrateEvents::POST_IMPORT][] = ['postImport'];
+    $events[MigrateEvents::PRE_ROLLBACK][] = ['preRollback'];
+    $events[MigrateEvents::POST_ROLLBACK][] = ['postRollback'];
+
+    return $events;
+  }
+
+}
diff --git a/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php b/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php
index 6df555e4604dc6cb1adb6ec40e67e5f0aac39f2a..f393dc0223e5aa36010b2046ed6d834579631ac6 100644
--- a/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php
+++ b/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php
@@ -3,6 +3,8 @@
 namespace Drupal\migrate\Plugin\migrate\source;
 
 use Drupal\Core\Plugin\PluginBase;
+use Drupal\migrate\Event\MigrateRollbackEvent;
+use Drupal\migrate\Event\RollbackAwareInterface;
 use Drupal\migrate\Plugin\MigrationInterface;
 use Drupal\migrate\MigrateException;
 use Drupal\migrate\MigrateSkipRowException;
@@ -20,7 +22,7 @@
  *
  * @ingroup migration
  */
-abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface {
+abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface, RollbackAwareInterface {
 
   /**
    * The module handler service.
@@ -36,15 +38,6 @@ abstract class SourcePluginBase extends PluginBase implements MigrateSourceInter
    */
   protected $migration;
 
-  /**
-   * The name and type of the highwater property in the source.
-   *
-   * @var array
-   *
-   * @see $originalHighwater
-   */
-  protected $highWaterProperty;
-
   /**
    * The current row from the query.
    *
@@ -59,10 +52,27 @@ abstract class SourcePluginBase extends PluginBase implements MigrateSourceInter
    */
   protected $currentSourceIds;
 
+  /**
+   * Information on the property used as the high-water mark.
+   *
+   * Array of 'name' and (optional) db 'alias' properties used for high-water
+   * mark.
+   *
+   * @var array
+   */
+  protected $highWaterProperty = [];
+
+  /**
+   * The key-value storage for the high-water value.
+   *
+   * @var \Drupal\Core\KeyValueStore\KeyValueStoreInterface
+   */
+  protected $highWaterStorage;
+
   /**
    * The high water mark at the beginning of the import operation.
    *
-   * If the source has a property for tracking changes (like Drupal ha
+   * If the source has a property for tracking changes (like Drupal has
    * node.changed) then this is the highest value of those imported so far.
    *
    * @var int
@@ -146,10 +156,11 @@ public function __construct(array $configuration, $plugin_id, $plugin_definition
     $this->cacheKey = !empty($configuration['cache_key']) ? $configuration['cache_key'] : NULL;
     $this->trackChanges = !empty($configuration['track_changes']) ? $configuration['track_changes'] : FALSE;
     $this->idMap = $this->migration->getIdMap();
+    $this->highWaterProperty = !empty($configuration['high_water_property']) ? $configuration['high_water_property'] : FALSE;
 
     // Pull out the current highwater mark if we have a highwater property.
-    if ($this->highWaterProperty = $this->migration->getHighWaterProperty()) {
-      $this->originalHighWater = $this->migration->getHighWater();
+    if ($this->highWaterProperty) {
+      $this->originalHighWater = $this->getHighWater();
     }
 
     // Don't allow the use of both highwater and track changes together.
@@ -324,6 +335,10 @@ public function next() {
       if (!$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row)) {
         $this->currentRow = $row->freezeSource();
       }
+
+      if ($this->getHighWaterProperty()) {
+        $this->saveHighWater($row->getSourceProperty($this->highWaterProperty['name']));
+      }
     }
   }
 
@@ -337,7 +352,7 @@ public function next() {
    *   TRUE if the highwater value in the row is greater than our current value.
    */
   protected function aboveHighwater(Row $row) {
-    return $this->highWaterProperty && $row->getSourceProperty($this->highWaterProperty['name']) > $this->originalHighWater;
+    return $this->getHighWaterProperty() && $row->getSourceProperty($this->highWaterProperty['name']) > $this->originalHighWater;
   }
 
   /**
@@ -417,4 +432,90 @@ protected function getCache() {
     return $this->cache;
   }
 
+  /**
+   * Get the high water storage object.
+   *
+   * @return \Drupal\Core\KeyValueStore\KeyValueStoreInterface
+   *   The storage object.
+   */
+  protected function getHighWaterStorage() {
+    if (!isset($this->highWaterStorage)) {
+      $this->highWaterStorage = \Drupal::keyValue('migrate:high_water');
+    }
+    return $this->highWaterStorage;
+  }
+
+  /**
+   * The current value of the high water mark.
+   *
+   * The high water mark defines a timestamp stating the time the import was last
+   * run. If the mark is set, only content with a higher timestamp will be
+   * imported.
+   *
+   * @return int|null
+   *   A Unix timestamp representing the high water mark, or NULL if no high
+   *   water mark has been stored.
+   */
+  protected function getHighWater() {
+    return $this->getHighWaterStorage()->get($this->migration->id());
+  }
+
+  /**
+   * Save the new high water mark.
+   *
+   * @param int $high_water
+   *   The high water timestamp.
+   */
+  protected function saveHighWater($high_water) {
+    $this->getHighWaterStorage()->set($this->migration->id(), $high_water);
+  }
+
+  /**
+   * Get information on the property used as the high watermark.
+   *
+   * Array of 'name' & (optional) db 'alias' properties used for high watermark.
+   *
+   * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
+   *
+   * @return array
+   *   The property used as the high watermark.
+   */
+  protected function getHighWaterProperty() {
+    return $this->highWaterProperty;
+  }
+
+  /**
+   * Get the name of the field used as the high watermark.
+   *
+   * The name of the field qualified with an alias if available.
+   *
+   * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
+   *
+   * @return string|null
+   *   The name of the field for the high water mark, or NULL if not set.
+   */
+  protected function getHighWaterField() {
+    if (!empty($this->highWaterProperty['name'])) {
+      return !empty($this->highWaterProperty['alias']) ?
+        $this->highWaterProperty['alias'] . '.' . $this->highWaterProperty['name'] :
+        $this->highWaterProperty['name'];
+    }
+    return NULL;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function preRollback(MigrateRollbackEvent $event) {
+    // Nothing to do in this implementation.
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function postRollback(MigrateRollbackEvent $event) {
+    // Reset the high-water mark.
+    $this->saveHighWater(NULL);
+  }
+
 }
diff --git a/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php b/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php
index 14aa1583314bc2eec9a980ed741b6f2482f40f9d..0eac141ec171a9e5fb7c5276ac2041a620793ca0 100644
--- a/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php
+++ b/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php
@@ -161,7 +161,6 @@ protected function prepareQuery() {
    */
   protected function initializeIterator() {
     $this->prepareQuery();
-    $high_water_property = $this->migration->getHighWaterProperty();
 
     // Get the key values, for potential use in joining to the map table.
     $keys = array();
@@ -213,15 +212,10 @@ protected function initializeIterator() {
     }
     // 2. If we are using high water marks, also include rows above the mark.
     //    But, include all rows if the high water mark is not set.
-    if (isset($high_water_property['name']) && ($high_water = $this->migration->getHighWater()) !== '') {
-      if (isset($high_water_property['alias'])) {
-        $high_water = $high_water_property['alias'] . '.' . $high_water_property['name'];
-      }
-      else {
-        $high_water = $high_water_property['name'];
-      }
-      $conditions->condition($high_water, $high_water, '>');
-      $condition_added = TRUE;
+    if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater()) !== '') {
+      $high_water_field = $this->getHighWaterField();
+      $conditions->condition($high_water_field, $high_water, '>');
+      $this->query->orderBy($high_water_field);
     }
     if ($condition_added) {
       $this->query->condition($conditions);
diff --git a/core/modules/migrate/tests/modules/migrate_high_water_test/config/install/node.type.sql_import_node.yml b/core/modules/migrate/tests/modules/migrate_high_water_test/config/install/node.type.sql_import_node.yml
new file mode 100644
index 0000000000000000000000000000000000000000..55c0888e6cf46a4cb635a10891232d7a1f1c19f0
--- /dev/null
+++ b/core/modules/migrate/tests/modules/migrate_high_water_test/config/install/node.type.sql_import_node.yml
@@ -0,0 +1,9 @@
+langcode: en
+status: true
+name: High Water import node
+type: high_water_import_node
+description: ''
+help: ''
+new_revision: false
+preview_mode: 1
+display_submitted: true
diff --git a/core/modules/migrate/tests/modules/migrate_high_water_test/migrate_sql_test.info.yml b/core/modules/migrate/tests/modules/migrate_high_water_test/migrate_sql_test.info.yml
new file mode 100644
index 0000000000000000000000000000000000000000..23cd53c6b7ab8fa73dd2a4bb05e8ebf200baf87f
--- /dev/null
+++ b/core/modules/migrate/tests/modules/migrate_high_water_test/migrate_sql_test.info.yml
@@ -0,0 +1,7 @@
+type: module
+name: Migrate SQL Source test
+description: 'Provides a database table and records for SQL import testing.'
+package: Testing
+core: 8.x
+dependencies:
+  - migrate
diff --git a/core/modules/migrate/tests/modules/migrate_high_water_test/migration_templates/migrate.migration.high_water_test.yml b/core/modules/migrate/tests/modules/migrate_high_water_test/migration_templates/migrate.migration.high_water_test.yml
new file mode 100644
index 0000000000000000000000000000000000000000..6a86577acb10ef0bdd6c8a8235094b64d6151b36
--- /dev/null
+++ b/core/modules/migrate/tests/modules/migrate_high_water_test/migration_templates/migrate.migration.high_water_test.yml
@@ -0,0 +1,16 @@
+id: high_water_test
+label: High water test.
+source:
+  plugin: high_water_test
+  high_water_property:
+    name: changed
+destination:
+  plugin: entity:node
+migration_tags:
+  test: test
+process:
+  changed: changed
+  title: title
+  type:
+    plugin: default_value
+    default_value: high_water_import_node
diff --git a/core/modules/migrate/tests/modules/migrate_high_water_test/src/Plugin/migrate/source/HighWaterTest.php b/core/modules/migrate/tests/modules/migrate_high_water_test/src/Plugin/migrate/source/HighWaterTest.php
new file mode 100644
index 0000000000000000000000000000000000000000..c4af3362b636588ba29b7774295259d910c74292
--- /dev/null
+++ b/core/modules/migrate/tests/modules/migrate_high_water_test/src/Plugin/migrate/source/HighWaterTest.php
@@ -0,0 +1,50 @@
+<?php
+
+namespace Drupal\migrate_sql_test\Plugin\migrate\source;
+
+use Drupal\migrate\Plugin\migrate\source\SqlBase;
+
+/**
+ * Source plugin for migration high water tests.
+ *
+ * @MigrateSource(
+ *   id = "high_water_test"
+ * )
+ */
+class HighWaterTest extends SqlBase {
+
+  /**
+   * {@inheritdoc}
+   */
+  public function query() {
+    $query = $this
+      ->select('high_water_node', 'm')
+      ->fields('m', array_keys($this->fields()));
+    return $query;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fields() {
+    $fields = [
+      'id' => $this->t('Id'),
+      'title' => $this->t('Title'),
+      'changed' => $this->t('Changed'),
+    ];
+
+    return $fields;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getIds() {
+    return [
+      'id' => [
+        'type' => 'integer',
+      ],
+    ];
+  }
+
+}
diff --git a/core/modules/migrate/tests/src/Kernel/HighWaterTest.php b/core/modules/migrate/tests/src/Kernel/HighWaterTest.php
new file mode 100644
index 0000000000000000000000000000000000000000..0b6d3688430a2001f8a63235ba6f89b9369a0256
--- /dev/null
+++ b/core/modules/migrate/tests/src/Kernel/HighWaterTest.php
@@ -0,0 +1,177 @@
+<?php
+
+namespace Drupal\Tests\migrate\Kernel;
+
+/**
+ * Tests migration high water property.
+ *
+ * @group migrate
+ */
+class HighWaterTest extends MigrateTestBase {
+
+  /**
+   * {@inheritdoc}
+   */
+  public static $modules = [
+    'system',
+    'user',
+    'node',
+    'migrate',
+    'migrate_sql_test',
+    'field',
+  ];
+
+  /**
+   * {@inheritdoc}
+   */
+  protected function setUp() {
+    parent::setUp();
+    // Create source test table.
+    $this->sourceDatabase->schema()->createTable('high_water_node', [
+      'fields' => [
+        'id' => [
+          'description' => 'Serial',
+          'type' => 'serial',
+          'unsigned' => TRUE,
+          'not null' => TRUE,
+        ],
+        'changed' => [
+          'description' => 'Highwater',
+          'type' => 'int',
+          'unsigned' => TRUE,
+        ],
+        'title' => [
+          'description' => 'Title',
+          'type' => 'varchar',
+          'length' => 128,
+          'not null' => TRUE,
+          'default' => '',
+        ],
+      ],
+      'primary key' => [
+        'id',
+      ],
+      'description' => 'Contains nodes to import',
+    ]);
+
+    // Add 3 items to source table.
+    $this->sourceDatabase->insert('high_water_node')
+      ->fields([
+        'title',
+        'changed',
+      ])
+      ->values([
+        'title' => 'Item 1',
+        'changed' => 1,
+      ])
+      ->values([
+        'title' => 'Item 2',
+        'changed' => 2,
+      ])
+      ->values([
+        'title' => 'Item 3',
+        'changed' => 3,
+      ])
+      ->execute();
+
+    $this->installEntitySchema('node');
+    $this->installEntitySchema('user');
+    $this->installSchema('node', 'node_access');
+
+    $this->executeMigration('high_water_test');
+  }
+
+  /**
+   * Tests high water property of SqlBase.
+   */
+  public function testHighWater() {
+    // Assert all of the nodes have been imported.
+    $this->assertNodeExists('Item 1');
+    $this->assertNodeExists('Item 2');
+    $this->assertNodeExists('Item 3');
+
+    // Update Item 1 setting its high_water_property to value that is below
+    // current high water mark.
+    $this->sourceDatabase->update('high_water_node')
+      ->fields([
+        'title' => 'Item 1 updated',
+        'changed' => 2,
+      ])
+      ->condition('title', 'Item 1')
+      ->execute();
+
+    // Update Item 2 setting its high_water_property to value equal to
+    // current high water mark.
+    $this->sourceDatabase->update('high_water_node')
+      ->fields([
+        'title' => 'Item 2 updated',
+        'changed' => 3,
+      ])
+      ->condition('title', 'Item 2')
+      ->execute();
+
+    // Update Item 3 setting its high_water_property to value that is above
+    // current high water mark.
+    $this->sourceDatabase->update('high_water_node')
+      ->fields([
+        'title' => 'Item 3 updated',
+        'changed' => 4,
+      ])
+      ->condition('title', 'Item 3')
+      ->execute();
+
+    // Execute migration again.
+    $this->executeMigration('high_water_test');
+
+    // Item with lower highwater should not be updated.
+    $this->assertNodeExists('Item 1');
+    $this->assertNodeDoesNotExist('Item 1 updated');
+
+    // Item with equal highwater should not be updated.
+    $this->assertNodeExists('Item 2');
+    $this->assertNodeDoesNotExist('Item 2 updated');
+
+    // Item with greater highwater should be updated.
+    $this->assertNodeExists('Item 3 updated');
+    $this->assertNodeDoesNotExist('Item 3');
+  }
+
+  /**
+   * Assert that node with given title exists.
+   *
+   * @param string $title
+   *   Title of the node.
+   */
+  protected function assertNodeExists($title) {
+    self::assertTrue($this->nodeExists($title));
+  }
+
+  /**
+   * Assert that node with given title does not exist.
+   *
+   * @param string $title
+   *   Title of the node.
+   */
+  protected function assertNodeDoesNotExist($title) {
+    self::assertFalse($this->nodeExists($title));
+  }
+
+  /**
+   * Checks if node with given title exists.
+   *
+   * @param string $title
+   *   Title of the node.
+   *
+   * @return bool
+   */
+  protected function nodeExists($title) {
+    $query = \Drupal::entityQuery('node');
+    $result = $query
+      ->condition('title', $title)
+      ->range(0, 1)
+      ->execute();
+
+    return !empty($result);
+  }
+
+}
diff --git a/core/modules/migrate/tests/src/Unit/MigrateExecutableTest.php b/core/modules/migrate/tests/src/Unit/MigrateExecutableTest.php
index b253d070999e6e5b6e33f501e26831dafaace8e8..84c97f2a262c19d306e47add241be3bdfdab0eba 100644
--- a/core/modules/migrate/tests/src/Unit/MigrateExecutableTest.php
+++ b/core/modules/migrate/tests/src/Unit/MigrateExecutableTest.php
@@ -114,9 +114,9 @@ public function testImportWithValidRow() {
       ->with($row, array('test'))
       ->will($this->returnValue(array('id' => 'test')));
 
-    $this->migration->expects($this->once())
+    $this->migration
       ->method('getDestinationPlugin')
-      ->will($this->returnValue($destination));
+      ->willReturn($destination);
 
     $this->assertSame(MigrationInterface::RESULT_COMPLETED, $this->executable->import());
   }
@@ -156,9 +156,9 @@ public function testImportWithValidRowWithoutDestinationId() {
       ->with($row, array('test'))
       ->will($this->returnValue(TRUE));
 
-    $this->migration->expects($this->once())
+    $this->migration
       ->method('getDestinationPlugin')
-      ->will($this->returnValue($destination));
+      ->willReturn($destination);
 
     $this->idMap->expects($this->never())
       ->method('saveIdMapping');
@@ -196,9 +196,9 @@ public function testImportWithValidRowNoDestinationValues() {
       ->with($row, array('test'))
       ->will($this->returnValue(array()));
 
-    $this->migration->expects($this->once())
+    $this->migration
       ->method('getDestinationPlugin')
-      ->will($this->returnValue($destination));
+      ->willReturn($destination);
 
     $this->idMap->expects($this->once())
       ->method('saveIdMapping')
@@ -256,9 +256,9 @@ public function testImportWithValidRowWithDestinationMigrateException() {
       ->with($row, array('test'))
       ->will($this->throwException(new MigrateException($exception_message)));
 
-    $this->migration->expects($this->once())
+    $this->migration
       ->method('getDestinationPlugin')
-      ->will($this->returnValue($destination));
+      ->willReturn($destination);
 
     $this->idMap->expects($this->once())
       ->method('saveIdMapping')
@@ -306,7 +306,7 @@ public function testImportWithValidRowWithProcesMigrateException() {
     $destination->expects($this->never())
       ->method('import');
 
-    $this->migration->expects($this->once())
+    $this->migration
       ->method('getDestinationPlugin')
       ->willReturn($destination);
 
@@ -354,9 +354,9 @@ public function testImportWithValidRowWithException() {
       ->with($row, array('test'))
       ->will($this->throwException(new \Exception($exception_message)));
 
-    $this->migration->expects($this->once())
+    $this->migration
       ->method('getDestinationPlugin')
-      ->will($this->returnValue($destination));
+      ->willReturn($destination);
 
     $this->idMap->expects($this->once())
       ->method('saveIdMapping')
diff --git a/core/modules/migrate/tests/src/Unit/MigrateSourceTest.php b/core/modules/migrate/tests/src/Unit/MigrateSourceTest.php
index 2b7f12f6d1b493e42dd55f7ddd36b20615c6c01d..8039aaa1be471ef389b543b8754533a727b5e5c4 100644
--- a/core/modules/migrate/tests/src/Unit/MigrateSourceTest.php
+++ b/core/modules/migrate/tests/src/Unit/MigrateSourceTest.php
@@ -10,6 +10,8 @@
 use Drupal\Core\Cache\CacheBackendInterface;
 use Drupal\Core\DependencyInjection\ContainerBuilder;
 use Drupal\Core\Extension\ModuleHandlerInterface;
+use Drupal\Core\KeyValueStore\KeyValueFactoryInterface;
+use Drupal\Core\KeyValueStore\KeyValueStoreInterface;
 use Drupal\migrate\MigrateExecutable;
 use Drupal\migrate\MigrateSkipRowException;
 use Drupal\migrate\Plugin\migrate\source\SourcePluginBase;
@@ -75,7 +77,21 @@ class MigrateSourceTest extends MigrateTestCase {
    * @return \Drupal\migrate\Plugin\MigrateSourceInterface
    *   A mocked source plugin.
    */
-  protected function getSource($configuration = [], $migrate_config = [], $status = MigrateIdMapInterface::STATUS_NEEDS_UPDATE) {
+  protected function getSource($configuration = [], $migrate_config = [], $status = MigrateIdMapInterface::STATUS_NEEDS_UPDATE, $high_water_value = NULL) {
+    $container = new ContainerBuilder();
+    \Drupal::setContainer($container);
+
+    $key_value = $this->getMock(KeyValueStoreInterface::class);
+
+    $key_value_factory = $this->getMock(KeyValueFactoryInterface::class);
+    $key_value_factory
+      ->method('get')
+      ->with('migrate:high_water')
+      ->willReturn($key_value);
+    $container->set('keyvalue', $key_value_factory);
+
+    $container->set('cache.migrate', $this->getMock(CacheBackendInterface::class));
+
     $this->migrationConfiguration = $this->defaultMigrationConfiguration + $migrate_config;
     $this->migration = parent::getMigration();
     $this->executable = $this->getMigrateExecutable($this->migration);
@@ -90,47 +106,45 @@ protected function getSource($configuration = [], $migrate_config = [], $status
       ->willReturn($id_map_array);
 
     $constructor_args = [$configuration, 'd6_action', [], $this->migration];
-    $methods = ['getModuleHandler', 'fields', 'getIds', '__toString', 'getIterator', 'prepareRow', 'initializeIterator', 'calculateDependencies'];
-    $source_plugin = $this->getMock('\Drupal\migrate\Plugin\migrate\source\SourcePluginBase', $methods, $constructor_args);
+    $methods = ['getModuleHandler', 'fields', 'getIds', '__toString', 'prepareRow', 'initializeIterator'];
+    $source_plugin = $this->getMock(SourcePluginBase::class, $methods, $constructor_args);
 
     $source_plugin
-      ->expects($this->any())
       ->method('fields')
       ->willReturn([]);
     $source_plugin
-      ->expects($this->any())
       ->method('getIds')
       ->willReturn([]);
     $source_plugin
-      ->expects($this->any())
       ->method('__toString')
       ->willReturn('');
     $source_plugin
-      ->expects($this->any())
       ->method('prepareRow')
       ->willReturn(empty($migrate_config['prepare_row_false']));
+
+    $rows = [$this->row];
+    if (isset($configuration['high_water_property']) && isset($high_water_value)) {
+      $property = $configuration['high_water_property']['name'];
+      $rows = array_filter($rows, function (array $row) use ($property, $high_water_value) {
+        return $row[$property] >= $high_water_value;
+      });
+    }
+    $iterator = new \ArrayIterator($rows);
+
     $source_plugin
-      ->expects($this->any())
       ->method('initializeIterator')
-      ->willReturn([]);
-    $iterator = new \ArrayIterator([$this->row]);
-    $source_plugin
-      ->expects($this->any())
-      ->method('getIterator')
       ->willReturn($iterator);
 
-    $module_handler = $this->getMock('\Drupal\Core\Extension\ModuleHandlerInterface');
+    $module_handler = $this->getMock(ModuleHandlerInterface::class);
     $source_plugin
-      ->expects($this->any())
       ->method('getModuleHandler')
       ->willReturn($module_handler);
 
     $this->migration
-      ->expects($this->any())
       ->method('getSourcePlugin')
       ->willReturn($source_plugin);
 
-    return $this->migration->getSourcePlugin();
+    return $source_plugin;
   }
 
   /**
@@ -138,9 +152,8 @@ protected function getSource($configuration = [], $migrate_config = [], $status
    * @expectedException \Drupal\migrate\MigrateException
    */
   public function testHighwaterTrackChangesIncompatible() {
-    $source_config = ['track_changes' => TRUE];
-    $migration_config = ['highWaterProperty' => ['name' => 'something']];
-    $this->getSource($source_config, $migration_config);
+    $source_config = ['track_changes' => TRUE, 'high_water_property' => ['name' => 'something']];
+    $this->getSource($source_config);
   }
 
   /**
@@ -219,14 +232,12 @@ public function testNextNeedsUpdate() {
    * Test that an outdated highwater mark does not cause a row to be imported.
    */
   public function testOutdatedHighwater() {
-
-    $source = $this->getSource([], [], MigrateIdMapInterface::STATUS_IMPORTED);
-
-    // Set the originalHighwater to something higher than our timestamp.
-    $this->migration
-      ->expects($this->any())
-      ->method('getHighwater')
-      ->willReturn($this->row['timestamp'] + 1);
+    $configuration = [
+      'high_water_property' => [
+        'name' => 'timestamp',
+      ],
+    ];
+    $source = $this->getSource($configuration, [], MigrateIdMapInterface::STATUS_IMPORTED, $this->row['timestamp'] + 1);
 
     // The current highwater mark is now higher than the row timestamp so no row
     // is expected.
@@ -240,13 +251,17 @@ public function testOutdatedHighwater() {
    * @throws \Exception
    */
   public function testNewHighwater() {
-
+    $configuration = [
+      'high_water_property' => [
+        'name' => 'timestamp',
+      ],
+    ];
     // Set a highwater property field for source. Now we should have a row
     // because the row timestamp is greater than the current highwater mark.
-    $source = $this->getSource([], ['highWaterProperty' => ['name' => 'timestamp']], MigrateIdMapInterface::STATUS_IMPORTED);
+    $source = $this->getSource($configuration, [], MigrateIdMapInterface::STATUS_IMPORTED, $this->row['timestamp'] - 1);
 
     $source->rewind();
-    $this->assertTrue(is_a($source->current(), 'Drupal\migrate\Row'), 'Incoming row timestamp is greater than current highwater mark so we have a row.');
+    $this->assertInstanceOf(Row::class, $source->current(), 'Incoming row timestamp is greater than current highwater mark so we have a row.');
   }
 
   /**
diff --git a/core/modules/migrate/tests/src/Unit/MigrateSqlSourceTestCase.php b/core/modules/migrate/tests/src/Unit/MigrateSqlSourceTestCase.php
index 764e14f9d152d4852dc6d640d7a220c9e0e7ff70..d5d641dcfa9ded880b8d102722fcdc483ceea449 100644
--- a/core/modules/migrate/tests/src/Unit/MigrateSqlSourceTestCase.php
+++ b/core/modules/migrate/tests/src/Unit/MigrateSqlSourceTestCase.php
@@ -3,6 +3,10 @@
 namespace Drupal\Tests\migrate\Unit;
 
 use Drupal\Core\Database\Query\SelectInterface;
+use Drupal\Core\DependencyInjection\ContainerBuilder;
+use Drupal\Core\DependencyInjection\ContainerNotInitializedException;
+use Drupal\Core\KeyValueStore\KeyValueFactoryInterface;
+use Drupal\Core\KeyValueStore\KeyValueStoreInterface;
 
 /**
  * Base class for Migrate module source unit tests.
@@ -44,9 +48,9 @@ abstract class MigrateSqlSourceTestCase extends MigrateTestCase {
    * Once the migration is run, we save a mark of the migrated sources, so the
    * migration can run again and update only new sources or changed sources.
    *
-   * @var string
+   * @var mixed
    */
-  const ORIGINAL_HIGH_WATER = '';
+  const ORIGINAL_HIGH_WATER = NULL;
 
   /**
    * Expected results after the source parsing.
@@ -77,6 +81,27 @@ protected function setUp() {
     $state = $this->getMock('Drupal\Core\State\StateInterface');
     $entity_manager = $this->getMock('Drupal\Core\Entity\EntityManagerInterface');
 
+    // Mock a key-value store to return high-water values.
+    $key_value = $this->getMock(KeyValueStoreInterface::class);
+
+    // SourcePluginBase does not yet support full dependency injection so we
+    // need to make sure that \Drupal::keyValue() works as expected by mocking
+    // the keyvalue service.
+    $key_value_factory = $this->getMock(KeyValueFactoryInterface::class);
+    $key_value_factory
+      ->method('get')
+      ->with('migrate:high_water')
+      ->willReturn($key_value);
+
+    try {
+      $container = \Drupal::getContainer();
+    }
+    catch (ContainerNotInitializedException $e) {
+      $container = new ContainerBuilder();
+    }
+    $container->set('keyvalue', $key_value_factory);
+    \Drupal::setContainer($container);
+
     $migration = $this->getMigration();
     $migration->expects($this->any())
       ->method('getHighWater')
diff --git a/core/modules/migrate/tests/src/Unit/MigrateTestCase.php b/core/modules/migrate/tests/src/Unit/MigrateTestCase.php
index b16fef2c85eae534e99fadf59e367168c162c18a..19db21a6ff04c068ea4ef8294eb47551f1bbfa54 100644
--- a/core/modules/migrate/tests/src/Unit/MigrateTestCase.php
+++ b/core/modules/migrate/tests/src/Unit/MigrateTestCase.php
@@ -80,7 +80,7 @@ protected function getMigration() {
 
     $migration->method('getHighWaterProperty')
       ->willReturnCallback(function () use ($configuration) {
-        return isset($configuration['highWaterProperty']) ? $configuration['highWaterProperty'] : '';
+        return isset($configuration['high_water_property']) ? $configuration['high_water_property'] : '';
       });
 
     $migration->method('set')
diff --git a/core/modules/path/tests/src/Unit/Migrate/d6/UrlAliasTest.php b/core/modules/path/tests/src/Unit/Migrate/d6/UrlAliasTest.php
index c5d28f5eaab053d3ad416f25c768d05b634a7f42..fa00d60c9825fd0d3116d81218f8267ae4c7ab11 100644
--- a/core/modules/path/tests/src/Unit/Migrate/d6/UrlAliasTest.php
+++ b/core/modules/path/tests/src/Unit/Migrate/d6/UrlAliasTest.php
@@ -15,7 +15,6 @@ class UrlAliasTest extends UrlAliasTestBase {
 
   protected $migrationConfiguration = array(
     'id' => 'test',
-    'highWaterProperty' => array('field' => 'test'),
     'source' => array(
       'plugin' => 'd6_url_alias',
     ),
diff --git a/core/modules/taxonomy/tests/src/Unit/Migrate/TermTestBase.php b/core/modules/taxonomy/tests/src/Unit/Migrate/TermTestBase.php
index 3fb7faeb693ce1b78792fab052a3856ccf7a8704..e0ff6252acfcb7a90269eb0e3c769355d197b94c 100644
--- a/core/modules/taxonomy/tests/src/Unit/Migrate/TermTestBase.php
+++ b/core/modules/taxonomy/tests/src/Unit/Migrate/TermTestBase.php
@@ -13,7 +13,6 @@ abstract class TermTestBase extends MigrateSqlSourceTestCase {
 
   protected $migrationConfiguration = array(
     'id' => 'test',
-    'highWaterProperty' => array('field' => 'test'),
     'source' => array(
       'plugin' => 'd6_taxonomy_term',
     ),