diff --git a/core/modules/migrate/src/MigrateExecutable.php b/core/modules/migrate/src/MigrateExecutable.php
index 51c256141f0faf9a8e294f67d4c405247dcdb40a..f33781e9fcadfba67ec4e24426b31bd33532b2c2 100644
--- a/core/modules/migrate/src/MigrateExecutable.php
+++ b/core/modules/migrate/src/MigrateExecutable.php
@@ -162,7 +162,7 @@ class MigrateExecutable implements MigrateExecutableInterface {
   /**
    * The source.
    *
-   * @var \Drupal\migrate\Source
+   * @var \Drupal\migrate\Plugin\MigrateSourceInterface
    */
   protected $source;
 
@@ -219,12 +219,16 @@ public function __construct(MigrationInterface $migration, MigrateMessageInterfa
    *
    * Makes sure source is initialized based on migration settings.
    *
-   * @return \Drupal\migrate\Source
+   * @return \Drupal\migrate\Plugin\MigrateSourceInterface
    *   The source.
    */
   protected function getSource() {
     if (!isset($this->source)) {
-      $this->source = new Source($this->migration, $this);
+      $this->source = $this->migration->getSourcePlugin();
+
+      // @TODO, find out how to remove this.
+      // @see https://drupal.org/node/2443617
+      $this->source->migrateExecutable = $this;
     }
     return $this->source;
   }
@@ -256,13 +260,11 @@ public function import() {
     }
     catch (\Exception $e) {
       $this->message->display(
-        $this->t('Migration failed with source plugin exception: !e',
-          array('!e' => $e->getMessage())), 'error');
+        $this->t('Migration failed with source plugin exception: !e', array('!e' => $e->getMessage())), 'error');
       return MigrationInterface::RESULT_FAILED;
     }
 
     $destination = $this->migration->getDestinationPlugin();
-
     while ($source->valid()) {
       $row = $source->current();
       if ($this->sourceIdValues = $row->getSourceIdValues()) {
diff --git a/core/modules/migrate/src/Plugin/MigrateSourceInterface.php b/core/modules/migrate/src/Plugin/MigrateSourceInterface.php
index 5ff3f98c111551665ccc2ece140c1fb72e32a387..d320c4b459ebc6e16b22a5d3916955ae030006b3 100644
--- a/core/modules/migrate/src/Plugin/MigrateSourceInterface.php
+++ b/core/modules/migrate/src/Plugin/MigrateSourceInterface.php
@@ -19,7 +19,7 @@
  *
  * @ingroup migration
  */
-interface MigrateSourceInterface extends \Countable, PluginInspectionInterface {
+interface MigrateSourceInterface extends \Countable, \Iterator, PluginInspectionInterface {
 
   /**
    * Returns available fields on the source.
diff --git a/core/modules/migrate/src/Plugin/migrate/source/EmptySource.php b/core/modules/migrate/src/Plugin/migrate/source/EmptySource.php
index 705e93bb3399c8052ac8bee9405b8d0a06b9d5c9..32a9ab768ab9ec5af6b625f4da346a689f9b81a4 100644
--- a/core/modules/migrate/src/Plugin/migrate/source/EmptySource.php
+++ b/core/modules/migrate/src/Plugin/migrate/source/EmptySource.php
@@ -30,7 +30,7 @@ public function fields() {
   /**
    * {@inheritdoc}
    */
-  public function getIterator() {
+  public function initializeIterator() {
     return new \ArrayIterator(array(array('id' => '')));
   }
 
diff --git a/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php b/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php
index 1d1fa6c523514cdc5440948e7ac3c9dc8ba55afa..6eb87c74e6d172c7335a281b3018c9dc5836bedf 100644
--- a/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php
+++ b/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php
@@ -9,6 +9,8 @@
 
 use Drupal\Core\Plugin\PluginBase;
 use Drupal\migrate\Entity\MigrationInterface;
+use Drupal\migrate\MigrateException;
+use Drupal\migrate\Plugin\MigrateIdMapInterface;
 use Drupal\migrate\Plugin\MigrateSourceInterface;
 use Drupal\migrate\Row;
 
@@ -22,7 +24,7 @@
  *
  * @ingroup migration
  */
-abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface  {
+abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface {
 
   /**
    * @var \Drupal\Core\Extension\ModuleHandlerInterface
@@ -34,14 +36,154 @@ abstract class SourcePluginBase extends PluginBase implements MigrateSourceInter
    */
   protected $migration;
 
+  /**
+   * The name and type of the highwater property in the source.
+   *
+   * @var array
+   *
+   * @see $originalHighwater
+   */
+  protected $highWaterProperty;
+
+  /**
+   * The current row from the query
+   *
+   * @var \Drupal\Migrate\Row
+   */
+  protected $currentRow;
+
+  /**
+   * The primary key of the current row
+   *
+   * @var array
+   */
+  protected $currentSourceIds;
+
+  /**
+   * Number of rows intentionally ignored (prepareRow() returned FALSE)
+   *
+   * @var int
+   */
+  protected $numIgnored = 0;
+
+  /**
+   * Number of rows we've at least looked at.
+   *
+   * @var int
+   */
+  protected $numProcessed = 0;
+
+  /**
+   * The high water mark at the beginning of the import operation.
+   *
+   * If the source has a property for tracking changes (like Drupal ha
+   * node.changed) then this is the highest value of those imported so far.
+   *
+   * @var int
+   */
+  protected $originalHighWater;
+
+  /**
+   * List of source IDs to process.
+   *
+   * @var array
+   */
+  protected $idList = array();
+
+  /**
+   * Whether this instance should cache the source count.
+   *
+   * @var boolean
+   */
+  protected $cacheCounts = FALSE;
+
+  /**
+   * Key to use for caching counts.
+   *
+   * @var string
+   */
+  protected $cacheKey;
+
+  /**
+   * Whether this instance should not attempt to count the source.
+   *
+   * @var boolean
+   */
+  protected $skipCount = FALSE;
+
+  /**
+   * If TRUE, we will maintain hashed source rows to determine whether incoming
+   * data has changed.
+   *
+   * @var bool
+   */
+  protected $trackChanges = FALSE;
+
+  /**
+   * By default, next() will directly read the map row and add it to the data
+   * row. A source plugin implementation may do this itself (in particular, the
+   * SQL source can incorporate the map table into the query) - if so, it should
+   * set this TRUE so we don't duplicate the effort.
+   *
+   * @var bool
+   */
+  protected $mapRowAdded = FALSE;
+
+  /**
+   * @var \Drupal\Core\Cache\CacheBackendInterface
+   */
+  protected $cache;
+
+  /**
+   * @var \Drupal\migrate\Plugin\MigrateIdMapInterface
+   */
+  protected $idMap;
+
+  /**
+   * @var \Iterator
+   */
+  protected $iterator;
+
+  // @TODO, find out how to remove this.
+  // @see https://drupal.org/node/2443617
+  public $migrateExecutable;
+
   /**
    * {@inheritdoc}
    */
   public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration) {
     parent::__construct($configuration, $plugin_id, $plugin_definition);
     $this->migration = $migration;
+
+    // Set up some defaults based on the source configuration.
+    $this->cacheCounts = !empty($configuration['cache_counts']);
+    $this->skipCount = !empty($configuration['skip_count']);
+    $this->cacheKey = !empty($configuration['cache_key']) ? !empty($configuration['cache_key']) : NULL;
+    $this->trackChanges = !empty($configuration['track_changes']) ? $configuration['track_changes'] : FALSE;
+
+    // Pull out the current highwater mark if we have a highwater property.
+    if ($this->highWaterProperty = $this->migration->get('highWaterProperty')) {
+      $this->originalHighWater = $this->migration->getHighWater();
+    }
+
+    if ($id_list = $this->migration->get('idlist')) {
+      $this->idList = $id_list;
+    }
+
+    // Don't allow the use of both highwater and track changes together.
+    if ($this->highWaterProperty && $this->trackChanges) {
+      throw new MigrateException('You should either use a highwater mark or track changes not both. They are both designed to solve the same problem');
+    }
   }
 
+  /**
+   * Initialize the iterator with the source data.
+   *
+   * @return array
+   *   An array of the data for this source.
+   */
+  protected abstract function initializeIterator();
+
   /**
    * Get the module handler.
    *
@@ -59,12 +201,259 @@ protected function getModuleHandler() {
    * {@inheritdoc}
    */
   public function prepareRow(Row $row) {
+
+    $result = TRUE;
     $result_hook = $this->getModuleHandler()->invokeAll('migrate_prepare_row', array($row, $this, $this->migration));
     $result_named_hook = $this->getModuleHandler()->invokeAll('migrate_' . $this->migration->id() . '_prepare_row', array($row, $this, $this->migration));
-    // If any of the hooks returned false, we want to skip the row.
+
+    // We're explicitly skipping this row - keep track in the map table.
     if (($result_hook && in_array(FALSE, $result_hook)) || ($result_named_hook && in_array(FALSE, $result_named_hook))) {
-      return FALSE;
+      // Make sure we replace any previous messages for this item with any
+      // new ones.
+      $id_map = $this->migration->getIdMap();
+      $id_map->delete($this->currentSourceIds, TRUE);
+      $this->migrateExecutable->saveQueuedMessages();
+      $id_map->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_IGNORED, $this->migrateExecutable->rollbackAction);
+      $this->numIgnored++;
+      $this->currentRow = NULL;
+      $this->currentSourceIds = NULL;
+      $result = FALSE;
+    }
+    elseif ($this->trackChanges) {
+      // When tracking changed data, We want to quietly skip (rather than
+      // "ignore") rows with changes. The caller needs to make that decision,
+      // so we need to provide them with the necessary information (before and
+      // after hashes).
+      $row->rehash();
+    }
+    $this->numProcessed++;
+    return $result;
+  }
+
+  /**
+   * Returns the iterator that will yield the row arrays to be processed.
+   *
+   * @return \Iterator
+   */
+  public function getIterator() {
+    if (!isset($this->iterator)) {
+      $this->iterator = $this->initializeIterator();
+    }
+    return $this->iterator;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function current() {
+    return $this->currentRow;
+  }
+
+  /**
+   * Get the iterator key.
+   *
+   * Implementation of Iterator::key - called when entering a loop iteration,
+   * returning the key of the current row. It must be a scalar - we will
+   * serialize to fulfill the requirement, but using getCurrentIds() is
+   * preferable.
+   */
+  public function key() {
+    return serialize($this->currentSourceIds);
+  }
+
+  /**
+   * Whether the iterator is currently valid.
+   *
+   * Implementation of Iterator::valid() - called at the top of the loop,
+   * returning TRUE to process the loop and FALSE to terminate it
+   */
+  public function valid() {
+    return isset($this->currentRow);
+  }
+
+  /**
+   * Rewind the iterator.
+   *
+   * Implementation of Iterator::rewind() - subclasses of MigrateSource should
+   * implement performRewind() to do any class-specific setup for iterating
+   * source records.
+   */
+  public function rewind() {
+    $this->idMap = $this->migration->getIdMap();
+    $this->numProcessed = 0;
+    $this->numIgnored = 0;
+    $this->getIterator()->rewind();
+    $this->next();
+  }
+
+  /**
+   * {@inheritdoc}
+   *
+   * The migration iterates over rows returned by the source plugin. This
+   * method determines the next row which will be processed and imported into
+   * the system.
+   *
+   * The method tracks the source and destination IDs using the ID map plugin.
+   *
+   * This also takes care about highwater support. Highwater allows to reimport
+   * rows from a previous migration run, which got changed in the meantime.
+   * This is done by specifying a highwater field, which is compared with the
+   * last time, the migration got executed (originalHighWater).
+   */
+  public function next() {
+    $this->currentSourceIds = NULL;
+    $this->currentRow = NULL;
+    $source_configuration = $this->migration->get('source');
+
+    // In order to find the next row we want to process, we ask the source
+    // plugin for the next possible row.
+    while (!isset($this->currentRow) && $this->getIterator()->valid()) {
+
+      $row_data = $this->getIterator()->current() + $source_configuration;
+      $this->getIterator()->next();
+      $row = new Row($row_data, $this->migration->getSourcePlugin()->getIds(), $this->migration->get('destinationIds'));
+
+      // Populate the source key for this row.
+      $this->currentSourceIds = $row->getSourceIdValues();
+
+      // Pick up the existing map row, if any, unless getNextRow() did it.
+      if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) {
+        $row->setIdMap($id_map);
+      }
+
+      // In case we have specified an ID list, but the ID given by the source is
+      // not in there, we skip the row.
+      $id_in_the_list = $this->idList && in_array(reset($this->currentSourceIds), $this->idList);
+      if ($this->idList && !$id_in_the_list) {
+        continue;
+      }
+
+      // Preparing the row gives source plugins the chance to skip.
+      if ($this->prepareRow($row) === FALSE) {
+        continue;
+      }
+
+      // Check whether the row needs processing.
+      // 1. Explicitly specified IDs.
+      // 2. This row has not been imported yet.
+      // 3. Explicitly set to update.
+      // 4. The row is newer than the current highwater mark.
+      // 5. If no such property exists then try by checking the hash of the row.
+      if ($id_in_the_list || !$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row) ) {
+        $this->currentRow = $row->freezeSource();
+      }
+    }
+  }
+
+  /**
+   * Check if the incoming data is newer than what we've previously imported.
+   *
+   * @param \Drupal\migrate\Row $row
+   *   The row we're importing.
+   *
+   * @return bool
+   *   TRUE if the highwater value in the row is greater than our current value.
+   */
+  protected function aboveHighwater(Row $row) {
+    return $this->highWaterProperty && $row->getSourceProperty($this->highWaterProperty['name']) > $this->originalHighWater;
+  }
+
+  /**
+   * Check if the incoming row has changed since our last import.
+   *
+   * @param \Drupal\migrate\Row $row
+   *   The row we're importing.
+   *
+   * @return bool
+   *   TRUE if the row has changed otherwise FALSE.
+   */
+  protected function rowChanged(Row $row) {
+    return $this->trackChanges && $row->changed();
+  }
+
+  /**
+   * Getter for currentSourceIds data member.
+   */
+  public function getCurrentIds() {
+    return $this->currentSourceIds;
+  }
+
+  /**
+   * Getter for numIgnored data member.
+   */
+  public function getIgnored() {
+    return $this->numIgnored;
+  }
+
+  /**
+   * Getter for numProcessed data member.
+   */
+  public function getProcessed() {
+    return $this->numProcessed;
+  }
+
+  /**
+   * Reset numIgnored back to 0.
+   */
+  public function resetStats() {
+    $this->numIgnored = 0;
+  }
+
+  /**
+   * Get the source count.
+   *
+   * Return a count of available source records, from the cache if appropriate.
+   * Returns -1 if the source is not countable.
+   *
+   * @param bool $refresh
+   *   Whether or not to refresh the count.
+   *
+   * @return int
+   *   The count.
+   */
+  public function count($refresh = FALSE) {
+    if ($this->skipCount) {
+      return -1;
+    }
+
+    if (!isset($this->cacheKey)) {
+      $this->cacheKey = hash('sha256', $this->getPluginId());
+    }
+
+    // If a refresh is requested, or we're not caching counts, ask the derived
+    // class to get the count from the source.
+    if ($refresh || !$this->cacheCounts) {
+      $count = $this->getIterator()->count();
+      $this->getCache()->set($this->cacheKey, $count, 'cache');
+    }
+    else {
+      // Caching is in play, first try to retrieve a cached count.
+      $cache_object = $this->getCache()->get($this->cacheKey, 'cache');
+      if (is_object($cache_object)) {
+        // Success.
+        $count = $cache_object->data;
+      }
+      else {
+        // No cached count, ask the derived class to count 'em up, and cache
+        // the result.
+        $count = $this->getIterator()->count();
+        $this->getCache()->set($this->cacheKey, $count, 'cache');
+      }
+    }
+    return $count;
+  }
+
+  /**
+   * Get the cache object.
+   *
+   * @return \Drupal\Core\Cache\CacheBackendInterface
+   *   The cache object.
+   */
+  protected function getCache() {
+    if (!isset($this->cache)) {
+      $this->cache = \Drupal::cache('migrate');
     }
+    return $this->cache;
   }
 
 }
diff --git a/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php b/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php
index cdfa4deef186e63da0ab26b5f19717c827190785..7a448f03146ca4c2fff002c3e6a1c76ff6565e30 100644
--- a/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php
+++ b/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php
@@ -112,7 +112,7 @@ protected function prepareQuery() {
    * We could simply execute the query and be functionally correct, but
    * we will take advantage of the PDO-based API to optimize the query up-front.
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     $this->prepareQuery();
     $high_water_property = $this->migration->get('highWaterProperty');
 
@@ -203,18 +203,6 @@ public function count() {
     return $this->query()->countQuery()->execute()->fetchField();
   }
 
-  /**
-   * Returns the iterator that will yield the row arrays to be processed.
-   *
-   * @return \Iterator
-   */
-  public function getIterator() {
-    if (!isset($this->iterator)) {
-      $this->iterator = $this->runQuery();
-    }
-    return $this->iterator;
-  }
-
   /**
    * Check if we can join against the map table.
    *
diff --git a/core/modules/migrate/src/Row.php b/core/modules/migrate/src/Row.php
index 4be97124a0c164a14375d6039359ebb8e6ab1d53..7b3669e4fe83d306aec789797aefa65767cabb01 100644
--- a/core/modules/migrate/src/Row.php
+++ b/core/modules/migrate/src/Row.php
@@ -176,9 +176,12 @@ public function setSourceProperty($property, $data) {
 
   /**
    * Freezes the source.
+   *
+   * @return $this
    */
   public function freezeSource() {
     $this->frozen = TRUE;
+    return $this;
   }
 
   /**
diff --git a/core/modules/migrate/src/Source.php b/core/modules/migrate/src/Source.php
deleted file mode 100644
index ca2fe70507794420842b2e0028c0281ce5c63fff..0000000000000000000000000000000000000000
--- a/core/modules/migrate/src/Source.php
+++ /dev/null
@@ -1,449 +0,0 @@
-<?php
-
-/**
- * @file
- * Contains \Drupal\migrate\Plugin\migrate\source\SourceBase.
- */
-
-namespace Drupal\migrate;
-
-use Drupal\migrate\Entity\MigrationInterface;
-use Drupal\migrate\Plugin\MigrateIdMapInterface;
-
-/**
- * Source is a caching / decision making wrapper around the source plugin.
- *
- * Derived classes are expected to define __toString(), returning a string
- * describing the source and significant options, i.e. the query.
- *
- * @see \Drupal\migrate\MigrateSourceInterface
- */
-class Source implements \Iterator, \Countable {
-
-  /**
-   * The current row from the quey
-   *
-   * @var \Drupal\Migrate\Row
-   */
-  protected $currentRow;
-
-  /**
-   * The primary key of the current row
-   *
-   * @var array
-   */
-  protected $currentIds;
-
-  /**
-   * Number of rows intentionally ignored (prepareRow() returned FALSE)
-   *
-   * @var int
-   */
-  protected $numIgnored = 0;
-
-  /**
-   * Number of rows we've at least looked at.
-   *
-   * @var int
-   */
-  protected $numProcessed = 0;
-
-  /**
-   * The high water mark at the beginning of the import operation.
-   *
-   * @var
-   */
-  protected $originalHighWater = '';
-
-  /**
-   * List of source IDs to process.
-   *
-   * @var array
-   */
-  protected $idList = array();
-
-  /**
-   * Whether this instance should cache the source count.
-   *
-   * @var boolean
-   */
-  protected $cacheCounts = FALSE;
-
-  /**
-   * Key to use for caching counts.
-   *
-   * @var string
-   */
-  protected $cacheKey;
-
-  /**
-   * Whether this instance should not attempt to count the source.
-   *
-   * @var boolean
-   */
-  protected $skipCount = FALSE;
-
-  /**
-   * If TRUE, we will maintain hashed source rows to determine whether incoming
-   * data has changed.
-   *
-   * @var bool
-   */
-  protected $trackChanges = FALSE;
-
-  /**
-   * By default, next() will directly read the map row and add it to the data
-   * row. A source plugin implementation may do this itself (in particular, the
-   * SQL source can incorporate the map table into the query) - if so, it should
-   * set this TRUE so we don't duplicate the effort.
-   *
-   * @var bool
-   */
-  protected $mapRowAdded = FALSE;
-
-  /**
-   * @var array
-   */
-  protected $sourceIds;
-
-  /**
-   * @var \Drupal\Core\Cache\CacheBackendInterface
-   */
-  protected $cache;
-
-  /**
-   * @var \Drupal\migrate\Plugin\MigrateIdMapInterface
-   */
-  protected $idMap;
-
-  /**
-   * @var array
-   */
-  protected $highWaterProperty;
-
-  /**
-   * Getter for currentIds data member.
-   */
-  public function getCurrentIds() {
-    return $this->currentIds;
-  }
-
-  /**
-   * Getter for numIgnored data member.
-   */
-  public function getIgnored() {
-    return $this->numIgnored;
-  }
-
-  /**
-   * Getter for numProcessed data member.
-   */
-  public function getProcessed() {
-    return $this->numProcessed;
-  }
-
-  /**
-   * Reset numIgnored back to 0.
-   */
-  public function resetStats() {
-    $this->numIgnored = 0;
-  }
-
-  /**
-   * Get the source count.
-   *
-   * Return a count of available source records, from the cache if appropriate.
-   * Returns -1 if the source is not countable.
-   *
-   * @param bool $refresh
-   *   Whether or not to refresh the count.
-   *
-   * @return int
-   *   The count.
-   */
-  public function count($refresh = FALSE) {
-    if ($this->skipCount) {
-      return -1;
-    }
-    $source = $this->migration->getSourcePlugin();
-
-    if (!isset($this->cacheKey)) {
-      $this->cacheKey = hash('sha256', (string) $source);
-    }
-
-    // If a refresh is requested, or we're not caching counts, ask the derived
-    // class to get the count from the source.
-    if ($refresh || !$this->cacheCounts) {
-      $count = $source->count();
-      $this->cache->set($this->cacheKey, $count, 'cache');
-    }
-    else {
-      // Caching is in play, first try to retrieve a cached count.
-      $cache_object = $this->cache->get($this->cacheKey, 'cache');
-      if (is_object($cache_object)) {
-        // Success.
-        $count = $cache_object->data;
-      }
-      else {
-        // No cached count, ask the derived class to count 'em up, and cache
-        // the result.
-        $count = $source->count();
-        $this->cache->set($this->cacheKey, $count, 'cache');
-      }
-    }
-    return $count;
-  }
-
-  /**
-   * Class constructor.
-   *
-   * @param \Drupal\migrate\Entity\MigrationInterface $migration
-   *   The migration entity.
-   * @param \Drupal\migrate\MigrateExecutableInterface $migrate_executable
-   *   The migration executable.
-   */
-  public function __construct(MigrationInterface $migration, MigrateExecutableInterface $migrate_executable) {
-    $this->migration = $migration;
-    $this->migrateExecutable = $migrate_executable;
-    $configuration = $migration->get('source');
-    if (!empty($configuration['cache_counts'])) {
-      $this->cacheCounts = TRUE;
-    }
-    if (!empty($configuration['skip_count'])) {
-      $this->skipCount = TRUE;
-    }
-    if (!empty($configuration['cache_key'])) {
-      $this->cacheKey = $configuration['cache_key'];
-    }
-    if (!empty($configuration['track_changes'])) {
-      $this->trackChanges = $configuration['track_changes'];
-    }
-  }
-
-  /**
-   * Get the cache object.
-   *
-   * @return \Drupal\Core\Cache\CacheBackendInterface
-   *   The cache object.
-   */
-  protected function getCache() {
-    if (!isset($this->cache)) {
-      $this->cache = \Drupal::cache('migrate');
-    }
-    return $this->cache;
-  }
-
-  /**
-   * Get the source iterator.
-   *
-   * @return \Iterator
-   *   The source iterator.
-   */
-  protected function getIterator() {
-    if (!isset($this->iterator)) {
-      $this->iterator = $this->migration->getSourcePlugin()->getIterator();
-    }
-    return $this->iterator;
-  }
-
-  /**
-   * {@inheritdoc}
-   */
-  public function current() {
-    return $this->currentRow;
-  }
-
-  /**
-   * Get the iterator key.
-   *
-   * Implementation of Iterator::key - called when entering a loop iteration,
-   * returning the key of the current row. It must be a scalar - we will
-   * serialize to fulfill the requirement, but using getCurrentIds() is
-   * preferable.
-   */
-  public function key() {
-    return serialize($this->currentIds);
-  }
-
-  /**
-   * Whether the iterator is currently valid.
-   *
-   * Implementation of Iterator::valid() - called at the top of the loop,
-   * returning TRUE to process the loop and FALSE to terminate it
-   */
-  public function valid() {
-    return isset($this->currentRow);
-  }
-
-  /**
-   * Rewind the iterator.
-   *
-   * Implementation of Iterator::rewind() - subclasses of MigrateSource should
-   * implement performRewind() to do any class-specific setup for iterating
-   * source records.
-   */
-  public function rewind() {
-    $this->idMap = $this->migration->getIdMap();
-    $this->numProcessed = 0;
-    $this->numIgnored = 0;
-    $this->originalHighWater = $this->migration->getHighWater();
-    $this->highWaterProperty = $this->migration->get('highWaterProperty');
-    if ($id_list = $this->migration->get('idlist')) {
-      $this->idList = $id_list;
-    }
-    $this->getIterator()->rewind();
-    $this->next();
-  }
-
-  /**
-   * {@inheritdoc}
-   */
-  public function next() {
-    $this->currentIds = NULL;
-    $this->currentRow = NULL;
-    $source_configuration = $this->migration->get('source');
-
-    while ($this->getIterator()->valid()) {
-      $row_data = $this->getIterator()->current() + $source_configuration;
-      $this->getIterator()->next();
-      $row = new Row($row_data, $this->migration->getSourcePlugin()->getIds(), $this->migration->get('destinationIds'));
-
-      // Populate the source key for this row.
-      $this->currentIds = $row->getSourceIdValues();
-
-      // Pick up the existing map row, if any, unless getNextRow() did it.
-      if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentIds))) {
-        $row->setIdMap($id_map);
-      }
-
-      // First, determine if this row should be passed to prepareRow(), or
-      // skipped entirely. The rules are:
-      // 1. If there's an explicit idlist, that's all we care about (ignore
-      //    high waters and map rows).
-      $prepared = FALSE;
-      if (!empty($this->idList)) {
-        if (in_array(reset($this->currentIds), $this->idList)) {
-          // In the list, fall through.
-        }
-        else {
-          // Not in the list, skip it.
-          continue;
-        }
-      }
-      // 2. If the row is not in the map (we have never tried to import it
-      //    before), we always want to try it.
-      elseif (!$row->getIdMap()) {
-        // Fall through
-      }
-      // 3. If the row is marked as needing update, pass it.
-      elseif ($row->needsUpdate()) {
-        // Fall through.
-      }
-      // 4. At this point, we have a row which has previously been imported and
-      //    not marked for update. If we're not using high water marks, then we
-      //    will not take this row. Except, if we're looking for changes in the
-      //    data, we need to go through prepareRow() before we can decide to
-      //    skip it.
-      elseif (!empty($this->highWaterProperty['field'])) {
-        if ($this->trackChanges) {
-          if ($this->prepareRow($row) !== FALSE) {
-            if ($row->changed()) {
-              // This is a keeper
-              $this->currentRow = $row;
-              break;
-            }
-            else {
-              // No change, skip it.
-              continue;
-            }
-          }
-          else {
-            // prepareRow() told us to skip it.
-            continue;
-          }
-        }
-        else {
-          // No high water and not tracking changes, skip.
-          continue;
-        }
-      }
-      // 5. The initial high water mark, before anything is migrated, is ''. We
-      //    want to make sure we don't mistakenly skip rows with a high water
-      //    field value of 0, so explicitly handle '' here.
-      elseif ($this->originalHighWater === '') {
-        // Fall through
-      }
-      // 6. So, we are using high water marks. Take the row if its high water
-      //    field value is greater than the saved mark, otherwise skip it.
-      else {
-        // Call prepareRow() here, in case the highWaterField needs preparation.
-        if ($this->prepareRow($row) !== FALSE) {
-          if ($row->getSourceProperty($this->highWaterProperty['name']) > $this->originalHighWater) {
-            $this->currentRow = $row;
-            break;
-          }
-          else {
-            // Skip.
-            continue;
-          }
-        }
-        $prepared = TRUE;
-      }
-
-      // Allow the Migration to prepare this row. prepareRow() can return
-      // boolean FALSE to ignore this row.
-      if (!$prepared) {
-        if ($this->prepareRow($row) !== FALSE) {
-          // Finally, we've got a keeper.
-          $this->currentRow = $row;
-          break;
-        }
-        else {
-          $this->currentRow = NULL;
-        }
-      }
-    }
-    if ($this->currentRow) {
-      $this->currentRow->freezeSource();
-    }
-    else {
-      $this->currentIds = NULL;
-    }
-  }
-
-  /**
-   * Source classes should override this as necessary and manipulate $keep.
-   *
-   * @param \Drupal\migrate\Row $row
-   *   The row object.
-   *
-   * @return bool
-   *   TRUE if we're to process the row otherwise FALSE.
-   */
-  protected function prepareRow(Row $row) {
-    // We're explicitly skipping this row - keep track in the map table.
-    if (($result = $this->migration->getSourcePlugin()->prepareRow($row)) === FALSE) {
-      // Make sure we replace any previous messages for this item with any
-      // new ones.
-      $id_map = $this->migration->getIdMap();
-      $id_map->delete($this->currentIds, TRUE);
-      $this->migrateExecutable->saveQueuedMessages();
-      $id_map->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_IGNORED, $this->migrateExecutable->rollbackAction);
-      $this->numIgnored++;
-      $this->currentRow = NULL;
-      $this->currentIds = NULL;
-    }
-    else {
-      // When tracking changed data, We want to quietly skip (rather than
-      // "ignore") rows with changes. The caller needs to make that decision,
-      // so we need to provide them with the necessary information (before and
-      // after hashes).
-      if ($this->trackChanges) {
-        $row->rehash();
-      }
-    }
-    $this->numProcessed++;
-    return $result;
-  }
-
-}
diff --git a/core/modules/migrate/tests/src/Unit/MigrateExecutableTest.php b/core/modules/migrate/tests/src/Unit/MigrateExecutableTest.php
index 517b6c0480ffb9eb7f3b317576c2b1fe9509f2f1..2a700e9029c1905f0e1ebd0c28d700c6acd60103 100644
--- a/core/modules/migrate/tests/src/Unit/MigrateExecutableTest.php
+++ b/core/modules/migrate/tests/src/Unit/MigrateExecutableTest.php
@@ -61,15 +61,11 @@ protected function setUp() {
    * Tests an import with an incomplete rewinding.
    */
   public function testImportWithFailingRewind() {
-    $iterator = $this->getMock('\Iterator');
     $exception_message = $this->getRandomGenerator()->string();
-    $iterator->expects($this->once())
+    $source = $this->getMock('Drupal\migrate\Plugin\MigrateSourceInterface');
+    $source->expects($this->once())
       ->method('rewind')
       ->will($this->throwException(new \Exception($exception_message)));
-    $source = $this->getMock('Drupal\migrate\Plugin\MigrateSourceInterface');
-    $source->expects($this->any())
-      ->method('getIterator')
-      ->will($this->returnValue($iterator));
 
     $this->migration->expects($this->any())
       ->method('getSourcePlugin')
@@ -500,20 +496,25 @@ public function testProcessRowEmptyPipeline() {
   /**
    * Returns a mock migration source instance.
    *
-   * @return \Drupal\migrate\Source|\PHPUnit_Framework_MockObject_MockObject
+   * @return \Drupal\migrate\Plugin\MigrateSourceInterface|\PHPUnit_Framework_MockObject_MockObject
    */
   protected function getMockSource() {
     $iterator = $this->getMock('\Iterator');
 
-    $source = $this->getMockBuilder('Drupal\migrate\Source')
+    $class = 'Drupal\migrate\Plugin\migrate\source\SourcePluginBase';
+    $source = $this->getMockBuilder($class)
       ->disableOriginalConstructor()
-      ->getMock();
+      ->setMethods(get_class_methods($class))
+      ->getMockForAbstractClass();
     $source->expects($this->any())
       ->method('getIterator')
       ->will($this->returnValue($iterator));
     $source->expects($this->once())
       ->method('rewind')
       ->will($this->returnValue(TRUE));
+    $source->expects($this->any())
+      ->method('initializeIterator')
+      ->will($this->returnValue([]));
     $source->expects($this->any())
       ->method('valid')
       ->will($this->onConsecutiveCalls(TRUE, FALSE));
diff --git a/core/modules/migrate/tests/src/Unit/MigrateSourceTest.php b/core/modules/migrate/tests/src/Unit/MigrateSourceTest.php
new file mode 100644
index 0000000000000000000000000000000000000000..a2389274869e217fcab26b9bf2cd198ed9cad854
--- /dev/null
+++ b/core/modules/migrate/tests/src/Unit/MigrateSourceTest.php
@@ -0,0 +1,245 @@
+<?php
+
+/**
+ * @file
+ * Contains \Drupal\migrate\Unit\MigrateSourceTest
+ */
+
+namespace Drupal\Tests\migrate\Unit;
+
+use Drupal\Core\DependencyInjection\ContainerBuilder;
+use Drupal\migrate\MigrateExecutable;
+use Drupal\migrate\Plugin\MigrateIdMapInterface;
+
+/**
+ * @coversDefaultClass \Drupal\migrate\Plugin\migrate\source\SourcePluginBase
+ * @group migrate
+ */
+class MigrateSourceTest extends MigrateTestCase {
+
+  /**
+   * Override the migration config.
+   *
+   * @var array
+   */
+  protected $defaultMigrationConfiguration = [
+    'id' => 'test_migration',
+    'source' => [],
+  ];
+
+  /**
+   * Test row data.
+   *
+   * @var array
+   */
+  protected $row = ['test_sourceid1' => '1', 'timestamp' => 500];
+
+  /**
+   * Test source ids.
+   *
+   * @var array
+   */
+  protected $sourceIds = ['test_sourceid1' => 'test_sourceid1'];
+
+  /**
+   * The migration entity.
+   *
+   * @var \Drupal\migrate\Entity\Migration
+   */
+  protected $migration;
+
+  /**
+   * The migrate executable.
+   *
+   * @var \Drupal\migrate\MigrateExecutable
+   */
+  protected $executable;
+
+  /**
+   * Get the source plugin to test.
+   *
+   * @param array $configuration
+   *   The source configuration.
+   * @param array $migrate_config
+   *   The migration configuration to be used in parent::getMigration().
+   * @param int $status
+   *   The default status for the new rows to be imported.
+   *
+   * @return \Drupal\migrate\Plugin\MigrateSourceInterface
+   *   A mocked source plugin.
+   */
+  protected function getSource($configuration = [], $migrate_config = [], $status = MigrateIdMapInterface::STATUS_NEEDS_UPDATE) {
+    $this->migrationConfiguration = $this->defaultMigrationConfiguration + $migrate_config;
+    $this->migration = parent::getMigration();
+    $this->executable = $this->getMigrateExecutable($this->migration);
+
+    // Update the idMap for Source so the default is that the row has already
+    // been imported. This allows us to use the highwater mark to decide on the
+    // outcome of whether we choose to import the row.
+    $id_map_array = ['original_hash' => '', 'hash' => '', 'source_row_status' => $status];
+    $this->idMap
+      ->expects($this->any())
+      ->method('getRowBySource')
+      ->willReturn($id_map_array);
+
+    $constructor_args = [$configuration, 'd6_action', [], $this->migration];
+    $methods = ['getModuleHandler', 'fields', 'getIds', '__toString', 'getIterator', 'prepareRow', 'initializeIterator'];
+    $source_plugin = $this->getMock('\Drupal\migrate\Plugin\migrate\source\SourcePluginBase', $methods, $constructor_args);
+
+    $source_plugin
+      ->expects($this->any())
+      ->method('fields')
+      ->willReturn([]);
+    $source_plugin
+      ->expects($this->any())
+      ->method('getIds')
+      ->willReturn([]);
+    $source_plugin
+      ->expects($this->any())
+      ->method('__toString')
+      ->willReturn('');
+    $source_plugin
+      ->expects($this->any())
+      ->method('prepareRow')
+      ->willReturn(empty($migrate_config['prepare_row_false']));
+    $source_plugin
+      ->expects($this->any())
+      ->method('initializeIterator')
+      ->willReturn([]);
+    $iterator = new \ArrayIterator([$this->row]);
+    $source_plugin
+      ->expects($this->any())
+      ->method('getIterator')
+      ->willReturn($iterator);
+
+    $module_handler = $this->getMock('\Drupal\Core\Extension\ModuleHandlerInterface');
+    $source_plugin
+      ->expects($this->any())
+      ->method('getModuleHandler')
+      ->willReturn($module_handler);
+
+    $this->migration
+      ->expects($this->any())
+      ->method('getSourcePlugin')
+      ->willReturn($source_plugin);
+
+    return $this->migration->getSourcePlugin();
+  }
+
+  /**
+   * @expectedException \Drupal\migrate\MigrateException
+   */
+  public function testHighwaterTrackChangesIncompatible() {
+    $source_config = ['track_changes' => TRUE];
+    $migration_config = ['highWaterProperty' => ['name' => 'something']];
+    $this->getSource($source_config, $migration_config);
+  }
+
+  /**
+   * Test that the source count is correct.
+   */
+  public function testCount() {
+
+    $container = new ContainerBuilder();
+    $container->register('cache.migrate', 'Drupal\Core\Cache\NullBackend')
+      ->setArguments(['migrate']);
+    \Drupal::setContainer($container);
+
+    // Test that the basic count works.
+    $source = $this->getSource();
+    $this->assertEquals(1, $source->count());
+
+    // Test caching the count works.
+    $source = $this->getSource(['cache_counts' => TRUE]);
+    $this->assertEquals(1, $source->count());
+
+    // Test the skip argument.
+    $source = $this->getSource(['skip_count' => TRUE]);
+    $this->assertEquals(-1, $source->count());
+  }
+
+  /**
+   * Test that we don't get a row if prepareRow() is false.
+   */
+  public function testPrepareRowFalse() {
+    $source = $this->getSource([], ['prepare_row_false' => TRUE]);
+
+    $source->rewind();
+    $this->assertNull($source->current(), 'No row is available when prepareRow() is false.');
+  }
+
+  /**
+   * Test that the when a source id is in the idList, we don't get a row.
+   */
+  public function testIdInList() {
+    $source = $this->getSource([], ['idlist' => ['test_sourceid1']]);
+    $source->rewind();
+
+    $this->assertNull($source->current(), 'No row is available because id was in idList.');
+  }
+
+  /**
+   * Test that $row->needsUpdate() works as expected.
+   */
+  public function testNextNeedsUpdate() {
+    $source = $this->getSource();
+
+    // $row->needsUpdate() === TRUE so we get a row.
+    $source->rewind();
+    $this->assertTrue(is_a($source->current(), 'Drupal\migrate\Row'), '$row->needsUpdate() is TRUE so we got a row.');
+
+    // Test that we don't get a row when the incoming row is marked as imported.
+    $source = $this->getSource([], [], MigrateIdMapInterface::STATUS_IMPORTED);
+    $source->rewind();
+    $this->assertNull($source->current(), 'Row was already imported, should be NULL');
+  }
+
+  /**
+   * Test that an outdated highwater mark does not cause a row to be imported.
+   */
+  public function testOutdatedHighwater() {
+
+    $source = $this->getSource([], [], MigrateIdMapInterface::STATUS_IMPORTED);
+
+    // Set the originalHighwater to something higher than our timestamp.
+    $this->migration
+      ->expects($this->any())
+      ->method('getHighwater')
+      ->willReturn($this->row['timestamp'] + 1);
+
+    // The current highwater mark is now higher than the row timestamp so no row
+    // is expected.
+    $source->rewind();
+    $this->assertNull($source->current(), 'Original highwater mark is higher than incoming row timestamp.');
+  }
+
+  /**
+   * Test that a highwater mark newer than our saved one imports a row.
+   *
+   * @throws \Exception
+   */
+  public function testNewHighwater() {
+
+    // Set a highwater property field for source. Now we should have a row
+    // because the row timestamp is greater than the current highwater mark.
+    $source = $this->getSource([], ['highWaterProperty' => ['name' => 'timestamp']], MigrateIdMapInterface::STATUS_IMPORTED);
+
+    $source->rewind();
+    $this->assertTrue(is_a($source->current(), 'Drupal\migrate\Row'), 'Incoming row timestamp is greater than current highwater mark so we have a row.');
+  }
+
+  /**
+   * Get a mock executable for the test.
+   *
+   * @param \Drupal\migrate\Entity\MigrationInterface $migration
+   *   The migration entity.
+   *
+   * @return \Drupal\migrate\MigrateExecutable
+   *   The migrate executable.
+   */
+  protected function getMigrateExecutable($migration) {
+    $message = $this->getMock('Drupal\migrate\MigrateMessageInterface');
+    return new MigrateExecutable($migration, $message);
+  }
+
+}
diff --git a/core/modules/migrate/tests/src/Unit/MigrateSqlSourceTestCase.php b/core/modules/migrate/tests/src/Unit/MigrateSqlSourceTestCase.php
index 9319ba8bd77c610ee281577f7f130033f4cf98d6..22863aa27d8934db2337707463636dbcce3afa21 100644
--- a/core/modules/migrate/tests/src/Unit/MigrateSqlSourceTestCase.php
+++ b/core/modules/migrate/tests/src/Unit/MigrateSqlSourceTestCase.php
@@ -7,9 +7,6 @@
 
 namespace Drupal\Tests\migrate\Unit;
 
-use Drupal\Core\Cache\CacheBackendInterface;
-use Drupal\migrate\Source;
-
 /**
  * Base class for Migrate module source unit tests.
  */
@@ -87,13 +84,7 @@ protected function setUp() {
     $migration->expects($this->any())
       ->method('getSourcePlugin')
       ->will($this->returnValue($plugin));
-    $migrateExecutable = $this->getMockBuilder('Drupal\migrate\MigrateExecutable')
-      ->disableOriginalConstructor()
-      ->getMock();
-    $this->source = new TestSource($migration, $migrateExecutable);
-
-    $cache = $this->getMock('Drupal\Core\Cache\CacheBackendInterface');
-    $this->source->setCache($cache);
+    $this->source = $plugin;
   }
 
   /**
@@ -113,9 +104,3 @@ protected function getValue($row, $key) {
   }
 
 }
-
-class TestSource extends Source {
-  public function setCache(CacheBackendInterface $cache) {
-    $this->cache = $cache;
-  }
-}
diff --git a/core/modules/migrate/tests/src/Unit/TestMigrateExecutable.php b/core/modules/migrate/tests/src/Unit/TestMigrateExecutable.php
index 467e022a5134f9f54783f0068b5138cec818f76b..f26bc5687df2c46af4e4970b7d1e6f1ace2f2272 100644
--- a/core/modules/migrate/tests/src/Unit/TestMigrateExecutable.php
+++ b/core/modules/migrate/tests/src/Unit/TestMigrateExecutable.php
@@ -129,7 +129,7 @@ public function maxExecTimeExceeded() {
   /**
    * Allows access to set protected source property.
    *
-   * @param \Drupal\migrate\Source $source
+   * @param \Drupal\migrate\Plugin\MigrateSourceInterface $source
    *   The value to set.
    */
   public function setSource($source) {
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/Variable.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/Variable.php
index d6299997f28cc657d32b6cad97e3c32909ba1d17..32819eb2219c32a2c080f73ed0e0b25e4b0fa449 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/Variable.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/Variable.php
@@ -39,7 +39,7 @@ public function __construct(array $configuration, $plugin_id, $plugin_definition
   /**
    * {@inheritdoc}
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     return new \ArrayIterator(array($this->values()));
   }
 
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Block.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Block.php
index fa4651806b0b53db5c2fffb807e6ae82f9d00711..b27364153da323e88d8e2fcf68d9b4d3b7c38165 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Block.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Block.php
@@ -45,10 +45,10 @@ public function query() {
   /**
    * {@inheritdoc}
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     $this->defaultTheme = $this->variableGet('theme_default', 'Garland');
     $this->adminTheme = $this->variableGet('admin_theme', null);
-    return parent::runQuery();
+    return parent::initializeIterator();
   }
 
   /**
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/CommentVariable.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/CommentVariable.php
index 333e6fa26b0a812478e32e955ad7ec151cba7cc3..8dfb5a8ffb7853d2b384f3a223f629ed8d262d05 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/CommentVariable.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/CommentVariable.php
@@ -19,7 +19,7 @@ class CommentVariable extends DrupalSqlBase {
   /**
    * {@inheritdoc}
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     return new \ArrayIterator($this->getCommentVariables());
   }
 
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ContactSettings.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ContactSettings.php
index ad0dc28f87ddad229c88a22407d834e196925009..8c4076320e03e4af210be76241d77e87255b5d1c 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ContactSettings.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ContactSettings.php
@@ -18,7 +18,7 @@ class ContactSettings extends Variable {
   /**
    * {@inheritdoc}
    */
-  function runQuery() {
+  function initializeIterator() {
     $default_category = $this->select('contact', 'c')
       ->fields('c', array('cid'))
       ->condition('selected', 1)
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/FieldInstancePerFormDisplay.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/FieldInstancePerFormDisplay.php
index 2678b69e07986f212f53d2cd0c1fe8087a295cbf..5bbb04d920379a177ea366dafb6e88a0e9bb6399 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/FieldInstancePerFormDisplay.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/FieldInstancePerFormDisplay.php
@@ -21,7 +21,7 @@ class FieldInstancePerFormDisplay extends DrupalSqlBase {
   /**
    * {@inheritdoc}
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     $rows = array();
     $result = $this->prepareQuery()->execute();
     while ($field_row = $result->fetchAssoc()) {
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/FieldInstancePerViewMode.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/FieldInstancePerViewMode.php
index ae6d60c0e55ccd35df77dd5ff62c644515a6b2bc..a0cf124ca3a26dcc6385bd5374180a94acb9679c 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/FieldInstancePerViewMode.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/FieldInstancePerViewMode.php
@@ -20,7 +20,7 @@ class FieldInstancePerViewMode extends ViewModeBase {
   /**
    * {@inheritdoc}
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     $rows = array();
     $result = $this->prepareQuery()->execute();
     while ($field_row = $result->fetchAssoc()) {
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/File.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/File.php
index f7f30b1cd834bf37d59498eff37483fcf21dcf5d..9401decb62e5d66a78a31bb08d04a973d652efe8 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/File.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/File.php
@@ -62,14 +62,14 @@ public function query() {
   /**
    * {@inheritdoc}
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     $conf_path = isset($this->configuration['conf_path']) ? $this->configuration['conf_path'] : 'sites/default';
     $this->filePath = $this->variableGet('file_directory_path', $conf_path . '/files') . '/';
     $this->tempFilePath = $this->variableGet('file_directory_temp', '/tmp') . '/';
 
     // FILE_DOWNLOADS_PUBLIC == 1 and FILE_DOWNLOADS_PRIVATE == 2.
     $this->isPublic = $this->variableGet('file_downloads', 1) == 1;
-    return parent::runQuery();
+    return parent::initializeIterator();
   }
 
   /**
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Node.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Node.php
index 10ea3b4716e6d0e4dce5cf1de90fad7e7d7b0ed3..d7579fde10c7d424d6ba425570a50e721dd159f5 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Node.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Node.php
@@ -75,9 +75,9 @@ public function query() {
   /**
    * {@inheritdoc}
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     $this->filterDefaultFormat = $this->variableGet('filter_default_format', '1');
-    return parent::runQuery();
+    return parent::initializeIterator();
   }
 
   /**
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/NodeType.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/NodeType.php
index 3b2e44373020bd72ff31972e58e063df560cf9d0..cfab0d41a0bb8f00cbae5c9ff1944034575ad7af 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/NodeType.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/NodeType.php
@@ -88,11 +88,11 @@ public function fields() {
   /**
    * {@inheritdoc}
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     $this->teaserLength = $this->variableGet('teaser_length', 600);
     $this->nodePreview = $this->variableGet('node_preview', 0);
     $this->themeSettings = $this->variableGet('theme_settings', array());
-    return parent::runQuery();
+    return parent::initializeIterator();
   }
 
   /**
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Role.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Role.php
index 5ba763f7560419dbe48f81dd20d3f15ec1a5d5a0..31be2035117b55a3d4811a24f6cff1ae7664dfb9 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Role.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/Role.php
@@ -49,7 +49,7 @@ public function fields() {
   /**
    * {@inheritdoc}
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     $filter_roles = $this->select('filter_formats', 'f')
       ->fields('f', array('format', 'roles'))
       ->execute()
@@ -61,7 +61,7 @@ protected function runQuery() {
         $this->filterPermissions[$rid][] = $format;
       }
     }
-    return parent::runQuery();
+    return parent::initializeIterator();
   }
 
   /**
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UploadInstance.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UploadInstance.php
index 7c9446a3978672c15d36603d5ac4624c18a38048..6d5f891f27cf0bd955747b97d299efae964a24fc 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UploadInstance.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UploadInstance.php
@@ -22,7 +22,7 @@ class UploadInstance extends DrupalSqlBase {
   /**
    * {@inheritdoc}
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     $prefix = 'upload';
     $node_types = $this->getDatabase()->query('SELECT type FROM {node_type}')->fetchCol();
     foreach ($node_types as $node_type) {
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UserPictureFile.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UserPictureFile.php
index e478e1d5234df22af2aa4449adc2a58897b57c8b..375629708e6b5098de6c0aa6e97084c0bb9de51a 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UserPictureFile.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UserPictureFile.php
@@ -46,11 +46,11 @@ public function query() {
   /**
    * {@inheritdoc}
    */
-  public function runQuery() {
+  public function initializeIterator() {
     $conf_path = isset($this->configuration['conf_path']) ? $this->configuration['conf_path'] : 'sites/default';
     $this->filePath = $this->variableGet('file_directory_path', $conf_path . '/files') . '/';
     $this->tempFilePath = $this->variableGet('file_directory_temp', '/tmp') . '/';
-    return parent::runQuery();
+    return parent::initializeIterator();
   }
 
   /**
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UserPictureInstance.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UserPictureInstance.php
index 03402ce64c53b11196847cfb9ffa47ce9a168c47..667a5518ec34f3a86b5e7d1da3ce312813e417a4 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UserPictureInstance.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/UserPictureInstance.php
@@ -23,7 +23,7 @@ class UserPictureInstance extends DrupalSqlBase {
   /**
    * {@inheritdoc}
    */
-  public function runQuery() {
+  public function initializeIterator() {
     return new \ArrayIterator(array(
       array(
         'id' => '',
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ViewMode.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ViewMode.php
index e35053d039037f78a7cf8509a13ec0e699e591a4..a84a16a15b1e83e8a5d470d7e28bfc20914a6571 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ViewMode.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ViewMode.php
@@ -20,7 +20,7 @@ class ViewMode extends ViewModeBase {
   /**
    * {@inheritdoc}
    */
-  protected function runQuery() {
+  protected function initializeIterator() {
     $rows = array();
     $result = $this->prepareQuery()->execute();
     while ($field_row = $result->fetchAssoc()) {
diff --git a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ViewModeBase.php b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ViewModeBase.php
index a32f8ca9aa6ccbcaca14c426bdca0c90d4e96e80..8e0588738432d4b4b5340eb414b2fbaa880e4088 100644
--- a/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ViewModeBase.php
+++ b/core/modules/migrate_drupal/src/Plugin/migrate/source/d6/ViewModeBase.php
@@ -19,7 +19,7 @@ abstract class ViewModeBase extends DrupalSqlBase {
    * {@inheritdoc}
    */
   public function count() {
-    return count($this->runQuery());
+    return count($this->initializeIterator());
   }
 
   /**