Commit 1c1bcc41 authored by mikeryan, committed by heddn

Issue #3090853 by mikeryan, heddn: Option for bulk inserting of rows with table destination

parent c32dfc41
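The new behaviour is opt-in through a batch_size key in the table destination configuration; the default of 1 keeps the previous one-insert-per-row behaviour, and updates (rows that already have destination IDs) are never batched. As a rough sketch, not taken from this commit, a migration definition could enable it like this (the table name and id field are illustrative placeholders):

'destination' => [
  'plugin' => 'table',
  // Placeholder table name, for illustration only.
  'table_name' => 'my_destination_table',
  'id_fields' => [
    'data' => ['type' => 'string'],
  ],
  // Accumulate up to 100 rows and insert them in a single query.
  'batch_size' => 100,
],

Buffered rows are flushed whenever the buffer reaches batch_size, again in postImport(), and once more in __destruct() as a safety net for stub migrations.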
......@@ -5,6 +5,8 @@ namespace Drupal\migrate_plus\Plugin\migrate\destination;
use Drupal\Core\Database\Connection;
use Drupal\Core\Database\Database;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\migrate\Event\ImportAwareInterface;
use Drupal\migrate\Event\MigrateImportEvent;
use Drupal\migrate\MigrateException;
use Drupal\migrate\MigrateSkipProcessException;
use Drupal\migrate\Plugin\MigrationInterface;
......@@ -21,7 +23,7 @@ use Symfony\Component\DependencyInjection\ContainerInterface;
* id = "table"
* )
*/
class Table extends DestinationBase implements ContainerFactoryPluginInterface, ImportAwareInterface {
/**
* The name of the destination table.
......@@ -51,6 +53,27 @@ class Table extends DestinationBase implements ContainerFactoryPluginInterface {
*/
protected $dbConnection;
/**
* Maximum number of rows to insert in one query.
*
* @var int
*/
protected $batchSize;
/**
* Rows accumulated row-by-row for the next bulk insert query.
*
* @var array
*/
protected $rowsToInsert = [];
/**
* The highest ID seen or created so far on this table.
*
* @var int
*/
protected $lastId = 0;
/**
* Constructs a new Table.
*
......@@ -71,6 +94,7 @@ class Table extends DestinationBase implements ContainerFactoryPluginInterface {
$this->tableName = $configuration['table_name'];
$this->idFields = $configuration['id_fields'];
$this->fields = isset($configuration['fields']) ? $configuration['fields'] : [];
$this->batchSize = isset($configuration['batch_size']) ? $configuration['batch_size'] : 1;
$this->supportsRollback = TRUE;
}
......@@ -79,7 +103,6 @@ class Table extends DestinationBase implements ContainerFactoryPluginInterface {
*/
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
$db_key = !empty($configuration['database_key']) ? $configuration['database_key'] : NULL;
return new static(
$configuration,
$plugin_id,
......@@ -110,6 +133,9 @@ class Table extends DestinationBase implements ContainerFactoryPluginInterface {
* {@inheritdoc}
*/
public function import(Row $row, array $old_destination_id_values = []) {
// Skip batching (if configured) for updates.
$batch_inserts = ($this->batchSize > 1 && empty($old_destination_id_values));
$ids = [];
foreach ($this->idFields as $field => $fieldInfo) {
if ($row->hasDestinationProperty($field)) {
......@@ -118,16 +144,51 @@ class Table extends DestinationBase implements ContainerFactoryPluginInterface {
elseif (!$row->hasDestinationProperty($field) && empty($fieldInfo['use_auto_increment'])) {
throw new MigrateSkipProcessException('All the id fields are required for a table migration.');
}
// When batching, we do the auto-incrementing ourselves.
elseif ($batch_inserts && $fieldInfo['use_auto_increment']) {
if (count($this->rowsToInsert) === 0) {
// Get the highest existing ID, so we will create IDs above it.
$this->lastId = $this->dbConnection->query("SELECT MAX($field) AS MaxId FROM {{$this->tableName}}")
->fetchField();
if (!$this->lastId) {
$this->lastId = 0;
}
}
$id = ++$this->lastId;
$ids[$field] = $id;
$row->setDestinationProperty($field, $id);
}
}
// When batching, make sure we have the same properties in the same order
// every time.
if ($batch_inserts) {
$destination_properties = array_keys($this->migration->getProcess());
$destination_properties = array_merge($destination_properties,
array_keys($this->idFields));
sort($destination_properties);
$destination_values = $row->getDestination();
foreach ($destination_properties as $property_name) {
$values[$property_name] = $destination_values[$property_name] ?? NULL;
}
}
else {
$values = $row->getDestination();
}
if ($this->fields) {
$values = array_intersect_key($values, $this->fields);
}
if ($batch_inserts) {
$this->rowsToInsert[] = $values;
if (count($this->rowsToInsert) >= $this->batchSize) {
$this->flushInserts();
}
$status = TRUE;
}
// Row contains empty id field with use_auto_increment enabled.
elseif (count($ids) < count($this->idFields)) {
$status = $id = $this->dbConnection->insert($this->tableName)
->fields($values)
->execute();
......@@ -140,11 +201,10 @@ class Table extends DestinationBase implements ContainerFactoryPluginInterface {
}
else {
$status = $this->dbConnection->merge($this->tableName)
->keys($ids)
->fields($values)
->execute();
}
return $status ? $ids : NULL;
}
......@@ -159,4 +219,43 @@ class Table extends DestinationBase implements ContainerFactoryPluginInterface {
$delete->execute();
}
/**
* Execute the insert query and reset everything.
*/
public function flushInserts() {
if (count($this->rowsToInsert) > 0) {
$batch_query = $this->dbConnection->insert($this->tableName)
->fields(array_keys($this->rowsToInsert[0]));
foreach ($this->rowsToInsert as $row) {
$batch_query->values(array_values($row));
}
// Empty the queue first, so if the statement throws an error we don't
// end up here trying to execute the same statement (plus one row).
$this->rowsToInsert = [];
$batch_query->execute();
}
}
/**
* {@inheritDoc}
*/
public function preImport(MigrateImportEvent $event) {
}
/**
* {@inheritDoc}
*/
public function postImport(MigrateImportEvent $event) {
// At the conclusion of a given migration, make sure batched inserts go in.
$this->flushInserts();
}
/**
* Make absolutely sure batched inserts are processed (especially for stubs).
*/
public function __destruct() {
// At the conclusion of a given migration, make sure batched inserts go in.
$this->flushInserts();
}
}
<?php
namespace Drupal\Tests\migrate_plus\Kernel;
/**
* Verifies all tests pass with batching enabled, uneven batches.
*
* @group migrate
*/
class MigrateTableBatchTest extends MigrateTableTest {
/**
* The batch size to configure (a size of 1 disables batching).
*
* @var int
*/
protected $batchSize = 2;
}
<?php
namespace Drupal\Tests\migrate_plus\Kernel;
/**
* Verifies all tests pass with batching enabled, even batches.
*
* @group migrate
*/
class MigrateTableEvenBatchTest extends MigrateTableTest {
/**
* The batch size to configure (a size of 1 disables batching).
*
* @var int
*/
protected $batchSize = 3;
}
<?php
namespace Drupal\Tests\migrate_plus\Kernel;
/**
* Verifies all tests pass with batching enabled, uneven batches.
*
* @group migrate
*/
class MigrateTableIncrementBatchTest extends MigrateTableIncrementTest {
/**
* The batch size to configure.
*
* @var int
*/
protected $batchSize = 2;
}
<?php
namespace Drupal\Tests\migrate_plus\Kernel;
/**
* Verifies all tests pass with batching enabled, even batches.
*
* @group migrate
*/
class MigrateTableIncrementEvenBatchTest extends MigrateTableIncrementTest {
/**
* The batch size to configure.
*
* @var int
*/
protected $batchSize = 3;
}
......@@ -2,7 +2,6 @@
namespace Drupal\Tests\migrate_plus\Kernel;
use Drupal\Core\Database\Database;
use Drupal\migrate\MigrateExecutable;
use Drupal\Tests\migrate\Kernel\MigrateTestBase;
......
......@@ -3,6 +3,8 @@
namespace Drupal\Tests\migrate_plus\Kernel;
use Drupal\migrate\MigrateExecutable;
use Drupal\migrate\Plugin\MigrateIdMapInterface;
use Drupal\migrate\Row;
use Drupal\Tests\migrate\Kernel\MigrateTestBase;
/**
......@@ -22,6 +24,13 @@ class MigrateTableTest extends MigrateTestBase {
*/
protected $connection;
/**
* The batch size to configure (a size of 1 disables batching).
*
* @var int
*/
protected $batchSize = 1;
/**
* {@inheritdoc}
*/
......@@ -95,6 +104,56 @@ class MigrateTableTest extends MigrateTestBase {
parent::tearDown();
}
/**
* Provides a minimally valid migration definition with some source data.
*
* @return array
* The migration definition.
*/
public function tableDestinationMigration() {
return [
'dummy table' => [
[
'id' => 'migration_table_test',
'migration_tags' => ['Testing'],
'source' => [
'plugin' => 'embedded_data',
'data_rows' => [
[
'data' => 'dummy1 value1',
'data2' => 'dummy2 value1',
],
[
'data' => 'dummy1 value2',
'data2' => 'dummy2 value2',
],
[
'data' => 'dummy1 value3',
'data2' => 'dummy2 value3',
],
],
'ids' => [
'data' => ['type' => 'string'],
],
],
'destination' => [
'plugin' => 'table',
'table_name' => static::DEST_TABLE_NAME,
'id_fields' => [
'data' => [
'type' => 'string',
],
],
],
'process' => [
'data' => 'data',
'data2' => 'data2',
],
],
],
];
}
/**
* Tests table migration.
*/
......@@ -108,6 +167,7 @@ class MigrateTableTest extends MigrateTestBase {
'id_fields' => [
'data' => ['type' => 'string'],
],
'ignore_map' => TRUE,
],
'destination' => [
'plugin' => 'table',
......@@ -115,6 +175,7 @@ class MigrateTableTest extends MigrateTestBase {
'id_fields' => [
'data' => ['type' => 'string'],
],
'batch_size' => $this->batchSize,
],
'process' => [
'data' => 'data',
......@@ -148,4 +209,28 @@ class MigrateTableTest extends MigrateTestBase {
$this->assertEquals(0, count($values));
}
/**
* Tests table update.
*
* @dataProvider tableDestinationMigration
*/
public function testTableUpdate(array $definition) {
// Make sure migration overwrites the original data for the first row.
$original_values = [
'data' => 'dummy value',
'data2' => 'original value 2',
'data3' => 'original value 3',
];
$this->connection->insert(static::DEST_TABLE_NAME)
->fields($original_values)
->execute();
/** @var \Drupal\migrate\Plugin\MigrationInterface $migration */
$migration = \Drupal::service('plugin.manager.migration')
->createStubMigration($definition);
$migration->getIdMap()->saveIdMapping(new Row($original_values,
['data' => 'dummy value']), ['data' => 'dummy value'], MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
$this->testTableMigration();
}
}