Commit bf02c1b1 authored by alexpott

Issue #2309695 by quietone, alexpott, mikeryan, benjy: Add query batching to SqlBase

parent 37c07a73
......@@ -310,13 +310,13 @@ public function next() {
while (!isset($this->currentRow) && $this->getIterator()->valid()) {
$row_data = $this->getIterator()->current() + $this->configuration;
$this->getIterator()->next();
$this->fetchNextRow();
$row = new Row($row_data, $this->migration->getSourcePlugin()->getIds(), $this->migration->getDestinationIds());
// Populate the source key for this row.
$this->currentSourceIds = $row->getSourceIdValues();
// Pick up the existing map row, if any, unless getNextRow() did it.
// Pick up the existing map row, if any, unless fetchNextRow() did it.
if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) {
$row->setIdMap($id_map);
}
......@@ -348,7 +348,14 @@ public function next() {
}
/**
* Checks if the incoming data is newer than what we've previously imported.
* Position the iterator to the following row.
*/
protected function fetchNextRow() {
  // Advance the underlying iterator by exactly one position. next() calls
  // this method rather than advancing the iterator itself; because it is
  // protected, the advance step can be overridden.
$this->getIterator()->next();
}
/**
* Check if the incoming data is newer than what we've previously imported.
*
* @param \Drupal\migrate\Row $row
* The row we're importing.
......
......@@ -5,6 +5,7 @@
use Drupal\Core\Database\Database;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\State\StateInterface;
use Drupal\migrate\MigrateException;
use Drupal\migrate\Plugin\MigrationInterface;
use Drupal\migrate\Plugin\migrate\id_map\Sql;
use Drupal\migrate\Plugin\MigrateIdMapInterface;
......@@ -42,6 +43,22 @@ abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPlugi
*/
protected $state;
/**
* The count of the number of batches run.
*
* @var int
*/
protected $batch = 0;
/**
* Number of records to fetch from the database during each batch.
*
* A value of zero indicates no batching is to be done.
*
* @var int
*/
protected $batchSize = 0;
/**
* {@inheritdoc}
*/
......@@ -160,68 +177,108 @@ protected function prepareQuery() {
* we will take advantage of the PDO-based API to optimize the query up-front.
*/
protected function initializeIterator() {
  // Read the configured batch size only while $batchSize is still zero.
  if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {
    // Valid batch sizes are integers >= 0. Numeric strings are rejected on
    // purpose: is_int() is a strict type check.
    if (is_int($this->configuration['batch_size']) && ($this->configuration['batch_size']) >= 0) {
      $this->batchSize = $this->configuration['batch_size'];
    }
    else {
      throw new MigrateException("batch_size must be greater than or equal to zero");
    }
  }

  // The query only has to be built once. When a batch has already run
  // ($this->batch > 0) the prepared query is reused and only its range is
  // moved forward below.
  if ($this->batch == 0) {
    $this->prepareQuery();

    // The rules for determining what conditions to add to the query are as
    // follows (applying first applicable rule):
    // 1. If the map is joinable, join it. We will want to accept all rows
    //    which are either not in the map, or marked in the map as
    //    NEEDS_UPDATE. Note that if high water fields are in play, we want to
    //    accept all rows above the high water mark in addition to those
    //    selected by the map conditions, so we need to OR them together (but
    //    AND with any existing conditions in the query). So, ultimately the
    //    SQL condition will look like (original conditions) AND (map IS NULL
    //    OR map needs update OR above high water).
    $conditions = $this->query->orConditionGroup();
    $condition_added = FALSE;
    if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) {
      // Build the join to the map table. Because the source key could have
      // multiple fields, we need to build things up.
      $count = 1;
      $map_join = '';
      $delimiter = '';
      foreach ($this->getIds() as $field_name => $field_schema) {
        if (isset($field_schema['alias'])) {
          $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
        }
        $map_join .= "$delimiter$field_name = map.sourceid" . $count++;
        $delimiter = ' AND ';
      }

      $alias = $this->query->leftJoin($this->migration->getIdMap()->getQualifiedMapTableName(), 'map', $map_join);
      $conditions->isNull($alias . '.sourceid1');
      $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
      $condition_added = TRUE;

      // And as long as we have the map table, add its data to the row.
      $n = count($this->getIds());
      for ($count = 1; $count <= $n; $count++) {
        $map_key = 'sourceid' . $count;
        $this->query->addField($alias, $map_key, "migrate_map_$map_key");
      }
      if ($n = count($this->migration->getDestinationIds())) {
        for ($count = 1; $count <= $n; $count++) {
          // The loop header already increments $count; incrementing it here
          // as well would skip every second destination ID column.
          $map_key = 'destid' . $count;
          $this->query->addField($alias, $map_key, "migrate_map_$map_key");
        }
      }
      $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
    }
    // 2. If we are using high water marks, also include rows above the mark.
    //    But, include all rows if the high water mark is not set.
    if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater()) !== '') {
      $high_water_field = $this->getHighWaterField();
      $conditions->condition($high_water_field, $high_water, '>');
      $this->query->orderBy($high_water_field);
    }
    if ($condition_added) {
      $this->query->condition($conditions);
    }
  }

  // Download data in batches for performance: restrict the query to the
  // window belonging to the current batch number.
  if ($this->batchSize > 0) {
    $this->query->range($this->batch * $this->batchSize, $this->batchSize);
  }
  return new \IteratorIterator($this->query->execute());
}
/**
* Position the iterator to the following row.
*/
protected function fetchNextRow() {
  $this->getIterator()->next();

  // Without batching there is never a further result set to load.
  if (!($this->batchSize > 0)) {
    return;
  }

  // An exhausted iterator means either the whole source is consumed or only
  // the current batch is; asking for the next batch is how we find out.
  if (!$this->getIterator()->valid()) {
    $this->fetchNextBatch();
  }
}
return new \IteratorIterator($this->query->execute());
/**
* Prepares query for the next set of data from the source database.
*/
protected function fetchNextBatch() {
  // Advance the batch counter first: initializeIterator() multiplies it by
  // the batch size to compute the offset of the next range query.
$this->batch++;
  // Discard the exhausted iterator so the getIterator() call below cannot
  // hand back the old one.
  // NOTE(review): presumably getIterator() lazily calls initializeIterator()
  // when the property is unset - confirm against the parent class.
unset($this->iterator);
  // Position the new iterator on the first row of the new batch.
$this->getIterator()->rewind();
}
/**
......@@ -249,6 +306,14 @@ protected function mapJoinable() {
if (!$this->getIds()) {
return FALSE;
}
// With batching, we want a later batch to return the same rows that would
// have been returned at the same point within a monolithic query. If we
// join to the map table, the first batch is writing to the map table and
// thus affecting the results of subsequent batches. To be safe, we avoid
// joining to the map table when batching.
if ($this->batchSize > 0) {
return FALSE;
}
$id_map = $this->migration->getIdMap();
if (!$id_map instanceof Sql) {
return FALSE;
......
type: module
name: Migrate query batch Source test
description: 'Provides a database table and records for SQL import with batch testing.'
package: Testing
core: 8.x
dependencies:
- migrate
<?php
namespace Drupal\migrate_query_batch_test\Plugin\migrate\source;
use Drupal\migrate\Plugin\migrate\source\SqlBase;
/**
 * Source plugin used by the SQL query batching tests.
 *
 * Exposes every column of the query_batch_test table, keyed by its integer
 * "id" column.
 *
 * @MigrateSource(
 *   id = "query_batch_test"
 * )
 */
class QueryBatchTest extends SqlBase {

  /**
   * {@inheritdoc}
   */
  public function query() {
    // Select all columns of the test table under the alias "q".
    $select = $this->select('query_batch_test', 'q');
    return $select->fields('q');
  }

  /**
   * {@inheritdoc}
   */
  public function fields() {
    return [
      'id' => $this->t('Id'),
      'data' => $this->t('data'),
    ];
  }

  /**
   * {@inheritdoc}
   */
  public function getIds() {
    $ids = [];
    $ids['id'] = ['type' => 'integer'];
    return $ids;
  }

}
<?php
namespace Drupal\Tests\migrate\Kernel;
use Drupal\KernelTests\KernelTestBase;
use Drupal\migrate\MigrateException;
use Drupal\migrate\Plugin\MigrateIdMapInterface;
use Drupal\migrate\Plugin\MigrationInterface;
use Drupal\Core\Database\Driver\sqlite\Connection;
/**
 * Tests query batching.
 *
 * @covers \Drupal\migrate_query_batch_test\Plugin\migrate\source\QueryBatchTest
 * @group migrate
 */
class QueryBatchTest extends KernelTestBase {

  /**
   * The mocked migration.
   *
   * @var \Drupal\migrate\Plugin\MigrationInterface|\Prophecy\Prophecy\ObjectProphecy
   */
  protected $migration;

  /**
   * {@inheritdoc}
   */
  public static $modules = [
    'migrate',
    'migrate_query_batch_test',
  ];

  /**
   * {@inheritdoc}
   */
  protected function setUp() {
    parent::setUp();

    // Create a mock migration. This will be injected into the source plugin
    // under test.
    $this->migration = $this->prophesize(MigrationInterface::class);

    $this->migration->id()->willReturn(
      $this->randomMachineName(16)
    );
    // Prophesize a useless ID map plugin and an empty set of destination IDs.
    // Calling code can override these prophecies later and set up different
    // behaviors.
    $this->migration->getIdMap()->willReturn(
      $this->prophesize(MigrateIdMapInterface::class)->reveal()
    );
    $this->migration->getDestinationIds()->willReturn([]);
  }

  /**
   * Tests a negative batch size throws an exception.
   */
  public function testBatchSizeNegative() {
    $this->setExpectedException(MigrateException::class, 'batch_size must be greater than or equal to zero');
    $plugin = $this->getPlugin(['batch_size' => -1]);
    // The batch size is validated when the iterator is first initialized.
    $plugin->next();
  }

  /**
   * Tests a non integer batch size throws an exception.
   */
  public function testBatchSizeNonInteger() {
    $this->setExpectedException(MigrateException::class, 'batch_size must be greater than or equal to zero');
    // A numeric string is not an integer and must be rejected as well.
    $plugin = $this->getPlugin(['batch_size' => '1']);
    $plugin->next();
  }

  /**
   * Provides the data sets for testQueryBatch().
   *
   * @return array
   *   A list of data sets. Each data set is a positional list holding the
   *   source data keyed by table name, the expected result rows, the expected
   *   row count, the plugin configuration, the expected batch size and the
   *   expected batch count, in that order.
   */
  public function queryDataProvider() {
    // Define the parameters for building the data array. The first element is
    // the number of source data rows, the second is the batch size to set on
    // the plugin configuration.
    $test_parameters = [
      // Test when batch size is 0.
      [200, 0],
      // Test when rows mod batch size is 0.
      [200, 20],
      // Test when rows mod batch size is > 0.
      [200, 30],
      // Test when batch size = row count.
      [200, 200],
      // Test when batch size > row count.
      [200, 300],
    ];

    $table = 'query_batch_test';
    $tests = [];
    foreach ($test_parameters as $data) {
      list($num_rows, $batch_size) = $data;

      $rows = [];
      for ($i = 0; $i < $num_rows; $i++) {
        $rows[] = [
          'id' => $i,
          'data' => $this->randomString(),
        ];
      }

      // Every data set is a purely positional list. Mixing string and numeric
      // keys would turn the string keys into named arguments when the data
      // set is unpacked into the test method on PHP 8.
      $tests[] = [
        // Source data, keyed by table name.
        [$table => $rows],
        // Expected result rows.
        $rows,
        // Expected row count.
        $num_rows,
        // Plugin configuration array.
        ['batch_size' => $batch_size],
        // Expected batch size.
        $batch_size,
        // Expected batch count.
        self::expectedBatchCount($num_rows, $batch_size),
      ];
    }
    return $tests;
  }

  /**
   * Computes how many batches are needed to read a number of rows.
   *
   * @param int $num_rows
   *   The number of source rows.
   * @param int $batch_size
   *   The batch size; zero means batching is disabled.
   *
   * @return int
   *   Zero when batching is disabled, otherwise the row count divided by the
   *   batch size, rounded up so a remainder gets a batch of its own.
   */
  private static function expectedBatchCount($num_rows, $batch_size) {
    if ($batch_size <= 0) {
      return 0;
    }
    return (int) ceil($num_rows / $batch_size);
  }

  /**
   * Tests query batch size.
   *
   * @param array $source_data
   *   The source data, keyed by table name. Each table is an array containing
   *   the rows in that table.
   * @param array $expected_data
   *   The result rows the plugin is expected to return.
   * @param int $num_rows
   *   How many rows the source plugin is expected to return.
   * @param array $configuration
   *   Configuration for the source plugin specifying the batch size.
   * @param int|null $expected_batch_size
   *   The expected batch size, or NULL to expect the configured batch size.
   * @param int|null $expected_batch_count
   *   The total number of batches, or NULL to derive it from the row count
   *   and the expected batch size.
   *
   * @dataProvider queryDataProvider
   */
  public function testQueryBatch($source_data, $expected_data, $num_rows, $configuration, $expected_batch_size, $expected_batch_count) {
    $plugin = $this->getPlugin($configuration);

    // Since we don't yet inject the database connection, we need to use a
    // reflection hack to set it in the plugin instance.
    $reflector = new \ReflectionObject($plugin);
    $property = $reflector->getProperty('database');
    $property->setAccessible(TRUE);

    $connection = $this->getDatabase($source_data);
    $property->setValue($plugin, $connection);

    // Test the results.
    $i = 0;
    /** @var \Drupal\migrate\Row $row */
    foreach ($plugin as $row) {
      $expected = $expected_data[$i++];
      $actual = $row->getSource();

      foreach ($expected as $key => $value) {
        $this->assertArrayHasKey($key, $actual);
        // Every column is stored as text, so compare string representations.
        $this->assertSame((string) $value, (string) $actual[$key]);
      }
    }

    // Test that all rows were retrieved.
    self::assertSame($num_rows, $i);

    // Test the batch size.
    if (is_null($expected_batch_size)) {
      $expected_batch_size = $configuration['batch_size'];
    }
    $property = $reflector->getProperty('batchSize');
    $property->setAccessible(TRUE);
    self::assertSame($expected_batch_size, $property->getValue($plugin));

    // Test the batch count. The fallback uses the same calculation as the
    // data provider, which also copes with a batch size of zero.
    if (is_null($expected_batch_count)) {
      $expected_batch_count = self::expectedBatchCount($num_rows, $expected_batch_size);
    }
    $property = $reflector->getProperty('batch');
    $property->setAccessible(TRUE);
    self::assertSame($expected_batch_count, $property->getValue($plugin));
  }

  /**
   * Instantiates the source plugin under test.
   *
   * @param array $configuration
   *   The source plugin configuration.
   *
   * @return \Drupal\migrate\Plugin\MigrateSourceInterface|object
   *   The fully configured source plugin.
   */
  protected function getPlugin($configuration) {
    /** @var \Drupal\migrate\Plugin\MigratePluginManager $plugin_manager */
    $plugin_manager = $this->container->get('plugin.manager.migrate.source');
    $plugin = $plugin_manager->createInstance('query_batch_test', $configuration, $this->migration->reveal());

    // Let the mocked migration hand back this plugin as its source plugin.
    $this->migration
      ->getSourcePlugin()
      ->willReturn($plugin);
    return $plugin;
  }

  /**
   * Builds an in-memory SQLite database from a set of source data.
   *
   * @param array $source_data
   *   The source data, keyed by table name. Each table is an array containing
   *   the rows in that table.
   *
   * @return \Drupal\Core\Database\Driver\sqlite\Connection
   *   The SQLite database connection.
   */
  protected function getDatabase(array $source_data) {
    // Create an in-memory SQLite database. Plugins can interact with it like
    // any other database, and it will cease to exist when the connection is
    // closed.
    $connection_options = ['database' => ':memory:'];
    $pdo = Connection::open($connection_options);
    $connection = new Connection($pdo, $connection_options);

    // Create the tables and fill them with data.
    foreach ($source_data as $table => $rows) {
      // Use the biggest row to build the table schema: sort the per-row
      // column counts ascending and take the key of the last entry.
      $counts = array_map('count', $rows);
      asort($counts);
      end($counts);
      $pilot = $rows[key($counts)];

      $connection->schema()
        ->createTable($table, [
          // SQLite uses loose affinity typing, so it's OK for every field to
          // be a text field.
          'fields' => array_map(function () {
            return ['type' => 'text'];
          }, $pilot),
        ]);

      $fields = array_keys($pilot);
      $insert = $connection->insert($table)->fields($fields);
      // Queue every row on the same insert query, then run it once.
      array_walk($rows, [$insert, 'values']);
      $insert->execute();
    }
    return $connection;
  }

}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment