Commit d108039c authored by Ken Mortimer

Resolve #3437982 "Fixing an issue"

parent ad38a0b5
Tags 2.0.1
1 merge request: !23 Resolve #3437982 "Fixing an issue"
Pipeline #343073 passed with warnings
Showing 407 additions and 238 deletions
......@@ -49,7 +49,7 @@ include:
# Docs at https://git.drupalcode.org/project/gitlab_templates/-/blob/1.0.x/includes/include.drupalci.variables.yml
################
variables:
_TARGET_PHP: "8.1"
_TARGET_PHP: "8.3"
# SKIP_ESLINT: '1'
# Broaden test coverage. Current Drupal version is 10.1.x.
# Opt in to testing previous & next minor (Drupal 10.0.x and 10.2.x).
......@@ -57,8 +57,6 @@ variables:
OPT_IN_TEST_NEXT_MINOR: '1'
# Opt in to testing previous major (Drupal 9.5.x).
# OPT_IN_TEST_PREVIOUS_MAJOR: '1'
# Show more log output
_PHPUNIT_EXTRA: '--verbose'
# Convenient, and we have no secrets.
_SHOW_ENVIRONMENT_VARIABLES: '1'
ELASTICSEARCH_URL: 'http://elasticsearch:9200'
......
......@@ -16,13 +16,14 @@
}
},
"require": {
"drupal/core": "^10.1 || ^11",
"drupal/entity": "^1.0",
"softcreatr/jsonpath": "^0.7 || ^0.8",
"php": ">=8.1",
"drupal/core": "^10.1 || ^11"
"softcreatr/jsonpath": "^0.9",
"php": ">=8.3"
},
"require-dev": {
"elasticsearch/elasticsearch": "^7"
"elasticsearch/elasticsearch": "^8",
"dg/bypass-finals": "^1"
},
"suggest": {
"elasticsearch/elasticsearch": "Required for the data_pipelines_elasticsearch module"
......
services:
data_pipelines.commands:
class: Drupal\data_pipelines\Commands\DataPipelinesCommands
arguments: ['@entity_type.manager']
arguments: ['@entity_type.manager', '@database']
tags:
- { name: drush.command }
- { name: drush.command }
\ No newline at end of file
......@@ -5,7 +5,7 @@
* Provides the module install hooks.
*/
use Elasticsearch\Client;
use Elastic\Elasticsearch\Client;
/**
* Implements hook_requirements().
......@@ -16,7 +16,7 @@ function data_pipelines_elasticsearch_requirements($phase): array {
if ($phase == 'runtime') {
// Check for the ElasticSearch library.
$requirements['data_pipelines_elasticsearch']['title'] = t('Data Pipelines ElasticSearch Client');
if (class_exists('\Elasticsearch\Client')) {
if (class_exists(Client::class)) {
$requirements['data_pipelines_elasticsearch'] += [
'value' => Client::VERSION,
];
......
......@@ -15,9 +15,8 @@ use Drupal\data_pipelines\Destination\DatasetDestinationPluginBase;
use Drupal\data_pipelines\Entity\DatasetInterface;
use Drupal\data_pipelines\Entity\DestinationInterface;
use Drupal\data_pipelines_elasticsearch\Exception\CouldNotCreateIndexException;
use Elasticsearch\Client;
use Elasticsearch\ClientBuilder;
use Elasticsearch\Common\Exceptions\ElasticsearchException;
use Elastic\Elasticsearch\Client;
use Elastic\Elasticsearch\ClientBuilder;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
......@@ -64,7 +63,7 @@ class ElasticSearchDestination extends DatasetDestinationPluginBase implements C
/**
* The elasticsearch client.
*
* @var \Elasticsearch\Client
* @var \Elastic\Elasticsearch\Client
*/
protected $client;
......@@ -81,13 +80,10 @@ class ElasticSearchDestination extends DatasetDestinationPluginBase implements C
* The pipeline manager.
* @param \Psr\Log\LoggerInterface $logger
* The logger.
* @param \Elasticsearch\Client|null $client
* The client. Used for testing.
*/
final public function __construct(array $configuration, $plugin_id, $plugin_definition, DatasetPipelinePluginManager $pipeline_manager, LoggerInterface $logger, Client $client = NULL) {
final public function __construct(array $configuration, $plugin_id, $plugin_definition, DatasetPipelinePluginManager $pipeline_manager, LoggerInterface $logger) {
parent::__construct($configuration + $this->defaultConfiguration(), $plugin_id, $plugin_definition);
$this->logger = $logger;
$this->client = $client;
$this->pipelineManager = $pipeline_manager;
}
......@@ -204,18 +200,20 @@ class ElasticSearchDestination extends DatasetDestinationPluginBase implements C
}
/**
* Get an instance of an elasticsearch client.
* A method to get an Elasticsearch client.
*
* @return \Elasticsearch\Client|null
* The elasticsearch client.
* @return \Elastic\Elasticsearch\Client
* The Elasticsearch client.
*
* @throws \Elastic\Elasticsearch\Exception\AuthenticationException
* @throws \Exception
*/
public function getClient(): ?Client {
if (isset($this->client)) {
public function getClient(): Client {
if ($this->client) {
return $this->client;
}
if (empty($this->getUrl())) {
$this->logger->warning("Cluster URL is required to initialise the ElasticSearch client.");
return NULL;
throw new \Exception("Cluster URL is required to initialise the ElasticSearch client");
}
$clientBuilder = ClientBuilder::create()
->setHosts([$this->getUrl()]);
......@@ -228,22 +226,19 @@ class ElasticSearchDestination extends DatasetDestinationPluginBase implements C
}
/**
* Check if the elasticsearch endpoint is available.
* A method to ping the Elasticsearch instance for availability.
*
* @return bool
* TRUE if the server is available, FALSE otherwise.
* The result of the ping.
*/
public function ping(): bool {
if (empty($this->getClient())) {
return FALSE;
}
try {
return $this->getClient()->ping();
return $this->getClient()->ping()->asBool();
}
catch (ElasticsearchException $e) {
catch (\Exception $e) {
Error::logException($this->logger, $e);
return FALSE;
}
return FALSE;
}
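The hunks above follow the elasticsearch-php 8.x response model: endpoint calls now return an Elastic\Elasticsearch\Response\Elasticsearch object rather than a bare array or boolean, so results are unwrapped explicitly. A minimal sketch of that pattern, assuming the ELASTICSEARCH_URL host from the CI configuration and a hypothetical index name:

use Elastic\Elasticsearch\ClientBuilder;

// Build a client against the CI host; credentials are omitted for brevity.
$client = ClientBuilder::create()
  ->setHosts(['http://elasticsearch:9200'])
  ->build();

// exists() and ping() return a response object; asBool() reflects the HTTP status.
if ($client->indices()->exists(['index' => 'foo_my_dataset'])->asBool()) {
  // search() returns the same response type; asArray() decodes the JSON body.
  $hits = $client->search(['index' => 'foo_my_dataset'])->asArray()['hits']['hits'];
}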
/**
......@@ -290,7 +285,7 @@ class ElasticSearchDestination extends DatasetDestinationPluginBase implements C
$index_params['body']['mappings'] = $mappings;
}
try {
if ($this->getClient()->indices()->exists($index_params_base)) {
if ($this->getClient()->indices()->exists($index_params_base)->asBool()) {
return;
}
$this->getClient()->indices()->create($index_params_base + $index_params);
......@@ -304,15 +299,23 @@ class ElasticSearchDestination extends DatasetDestinationPluginBase implements C
/**
* {@inheritdoc}
*/
public function getCurrentCount(DatasetInterface $dataset): int {
public function getLastDelta(DatasetInterface $dataset): int {
$index_id = $this->configuration['prefix'] . $dataset->getMachineName();
$index_params_base = [];
$index_params_base['index'] = $index_id;
$params = [];
$params['index'] = $index_id;
$count = 0;
try {
if ($this->getClient()->indices()->exists($index_params_base)) {
$result = $this->getClient()->count($index_params_base);
$count = $result['count'] ?? 0;
if ($this->getClient()->indices()->exists($params)->asBool()) {
$params['body'] = [
"size" => 1,
"sort" => [
"@delta" => [
"order" => "desc",
],
],
];
$response = $this->getClient()->search($params)->asArray();
$count = (int) $response['hits']['hits'][0]['_source']['@delta'];
}
}
catch (\Exception $e) {
......@@ -372,7 +375,7 @@ class ElasticSearchDestination extends DatasetDestinationPluginBase implements C
'_id' => $dataset->getMachineName() . ':' . $delta,
],
];
$bulk['body'][] = $item->getArrayCopy();
$bulk['body'][] = ['@delta' => $delta] + $item->getArrayCopy();
}
if (count($bulk['body']) > 0) {
$this->getClient()->bulk($bulk);
......
......@@ -4,12 +4,13 @@ declare(strict_types=1);
namespace Drupal\Tests\data_pipelines_elasticsearch\Kernel;
use Drupal\data_pipelines\DatasetPipelinePluginManager;
use Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination;
use Drupal\Tests\data_pipelines\Kernel\DatasetKernelTestBase;
use Drupal\Tests\data_pipelines_elasticsearch\Traits\ElasticsearchTestDestinationTrait;
use Elasticsearch\Client;
use Elasticsearch\Namespaces\IndicesNamespace;
use Drupal\data_pipelines\DatasetPipelinePluginManager;
use Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination;
use Elastic\Elasticsearch\Client;
use Elastic\Elasticsearch\Endpoints\Indices;
use Elastic\Elasticsearch\Response\Elasticsearch;
use Prophecy\Argument;
use Prophecy\PhpUnit\ProphecyTrait;
use Psr\Log\NullLogger;
......@@ -64,8 +65,10 @@ class ElasticsearchDestinationDatasetTest extends DatasetKernelTestBase {
'destinations' => $destination,
'pipeline' => 'elasticsearch_test_pipeline_1',
]);
$indices = $this->prophesize(IndicesNamespace::class);
$indices->exists(Argument::any())->willReturn(FALSE);
$indices = $this->prophesize(Indices::class);
$indices_exists_result = $this->prophesize(Elasticsearch::class);
$indices_exists_result->asBool()->willReturn(FALSE);
$indices->exists(Argument::any())->willReturn($indices_exists_result->reveal());
$params = [
'index' => 'foo_' . $dataset->getMachineName(),
'body' => [
......@@ -101,13 +104,7 @@ class ElasticsearchDestinationDatasetTest extends DatasetKernelTestBase {
$pipeline_manager->hasDefinition(Argument::exact('elasticsearch_test_pipeline_1'))->willReturn(TRUE);
$pipeline_manager->getDefinition(Argument::exact('elasticsearch_test_pipeline_1'))->willReturn(['mappings' => $params['body']['mappings']]);
$configuration = [
'prefix' => 'foo_',
];
$plugin_id = 'foo';
$plugin_definition = [];
$logger = new NullLogger();
$indexer = new ElasticSearchDestination($configuration, $plugin_id, $plugin_definition, $pipeline_manager->reveal(), $logger, $client->reveal());
$indexer = $this->mockIndexer($pipeline_manager->reveal(), new NullLogger(), $client->reveal());
$this->assertTrue($indexer->beginProcessing($dataset));
$this->assertTrue($indexer->processChunk($dataset, iterator_to_array($dataset->getDataIterator())));
}
......@@ -121,8 +118,10 @@ class ElasticsearchDestinationDatasetTest extends DatasetKernelTestBase {
'destinations' => $destination,
'pipeline' => 'elasticsearch_test_pipeline_1',
]);
$indices = $this->prophesize(IndicesNamespace::class);
$indices->exists(Argument::any())->willReturn(TRUE);
$indices = $this->prophesize(Indices::class);
$indices_exists_result = $this->prophesize(Elasticsearch::class);
$indices_exists_result->asBool()->willReturn(TRUE);
$indices->exists(Argument::any())->willReturn($indices_exists_result->reveal());
$client = $this->prophesize(Client::class);
$client->indices()->willReturn($indices->reveal());
......@@ -143,14 +142,8 @@ class ElasticsearchDestinationDatasetTest extends DatasetKernelTestBase {
],
])->willReturn([]);
$configuration = [
'prefix' => 'foo_',
];
$plugin_id = 'foo';
$plugin_definition = [];
$logger = new NullLogger();
$pipeline_manager = $this->prophesize(DatasetPipelinePluginManager::class);
$indexer = new ElasticSearchDestination($configuration, $plugin_id, $plugin_definition, $pipeline_manager->reveal(), $logger, $client->reveal());
$indexer = $this->mockIndexer($pipeline_manager->reveal(), new NullLogger(), $client->reveal());
$this->assertTrue($indexer->processCleanup($dataset, [1, 2]));
}
......
......@@ -4,11 +4,12 @@ declare(strict_types=1);
namespace Drupal\Tests\data_pipelines_elasticsearch\Kernel;
use Drupal\data_pipelines\Destination\ProcessingOperationEnum;
use Drupal\data_pipelines\Entity\DatasetInterface;
use Drupal\Tests\data_pipelines\Kernel\DatasetKernelTestBase;
use Drupal\Tests\data_pipelines\Traits\DatasetTestProcessingTrait;
use Drupal\Tests\data_pipelines_elasticsearch\Traits\ElasticsearchTestDestinationTrait;
use Drupal\data_pipelines\Destination\ProcessingOperationEnum;
use Drupal\data_pipelines\Entity\DatasetInterface;
use Drupal\data_pipelines\Form\DatasetBatchContext;
/**
* Elasticsearch Destination Queue Test.
......@@ -49,7 +50,7 @@ class ElasticsearchDestinationQueueTest extends DatasetKernelTestBase {
/**
* Tests index creation.
*
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::getCurrentCount
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::getLastDelta
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::getProcessingChunkSize
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::beginProcessing
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::createIndex
......@@ -57,10 +58,10 @@ class ElasticsearchDestinationQueueTest extends DatasetKernelTestBase {
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::endProcessing
*/
public function testIndexing() {
$this->assertEquals(0, $this->destinationPlugin->getCurrentCount($this->dataset));
$this->assertEquals(0, $this->destinationPlugin->getLastDelta($this->dataset));
$queue_id = $this->dataset->getProcessingQueueId();
$process_queue = \Drupal::queue($queue_id);
$this->dataset->queueProcessing($process_queue);
$this->dataset->queueProcessing($process_queue, new DatasetBatchContext($this->dataset->getBatchSize()));
// The ES destination indexes items in bulk, so only one process chunk.
$this->assertEquals(3, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
......@@ -68,18 +69,18 @@ class ElasticsearchDestinationQueueTest extends DatasetKernelTestBase {
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
$this->assertTrue($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()]));
$this->assertEquals(2, $this->destinationPlugin->getCurrentCount($this->dataset));
$this->assertTrue($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()])->asBool());
$this->assertEquals(1, $this->destinationPlugin->getLastDelta($this->dataset));
// Set unpublished.
$this->dataset->setUnpublished();
$this->dataset->save();
$this->assertFalse($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()]));
$this->assertFalse($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()])->asBool());
// Set published.
$this->dataset->setPublished();
$this->dataset->save();
$this->dataset->queueProcessing($process_queue);
$this->dataset->queueProcessing($process_queue, new DatasetBatchContext($this->dataset->getBatchSize()));
// The ES destination indexes items in bulk, so only one process chunk.
$this->assertEquals(3, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
......@@ -87,12 +88,12 @@ class ElasticsearchDestinationQueueTest extends DatasetKernelTestBase {
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
$this->assertTrue($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()]));
$this->assertEquals(2, $this->destinationPlugin->getCurrentCount($this->dataset));
$this->assertTrue($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()])->asBool());
$this->assertEquals(1, $this->destinationPlugin->getLastDelta($this->dataset));
// Clean up.
$this->dataset->delete();
$this->assertFalse($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()]));
$this->assertFalse($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()])->asBool());
}
......@@ -103,7 +104,7 @@ class ElasticsearchDestinationQueueTest extends DatasetKernelTestBase {
$queue_id = $this->dataset->getProcessingQueueId();
/** @var \Drupal\Core\Queue\QueueWorkerInterface $queue_worker */
$process_queue = \Drupal::queue($queue_id);
$this->dataset->queueProcessing($process_queue);
$this->dataset->queueProcessing($process_queue, new DatasetBatchContext($this->dataset->getBatchSize()));
// The elastic destination indexes all items at once.
$this->assertEquals(3, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
......@@ -111,7 +112,7 @@ class ElasticsearchDestinationQueueTest extends DatasetKernelTestBase {
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
$this->assertEquals(2, $this->destinationPlugin->getCurrentCount($this->dataset));
$this->assertEquals(1, $this->destinationPlugin->getLastDelta($this->dataset));
// Add some more data to the dataset including some invalid data.
$csv = <<<CSV
should_we,firstname,lastname
......@@ -123,7 +124,7 @@ N,nora,bloggs
CSV;
$this->dataset->csv_text->value = $csv;
$this->dataset->save();
$this->dataset->queueProcessing($process_queue);
$this->dataset->queueProcessing($process_queue, new DatasetBatchContext($this->dataset->getBatchSize()));
// The elastic destination indexes all items at once. Item 3 is invalid, but
// there is still no cleanup item because we want to retain invalid items.
$this->assertEquals(3, $process_queue->numberOfItems());
......@@ -133,7 +134,7 @@ CSV;
ProcessingOperationEnum::END,
]);
// Only 4 valid items are indexed.
$this->assertEquals(4, $this->destinationPlugin->getCurrentCount($this->dataset));
$this->assertEquals(4, $this->destinationPlugin->getLastDelta($this->dataset));
// Remove some data from the dataset.
$csv = <<<CSV
should_we,firstname,lastname
......@@ -143,7 +144,7 @@ N,lara,bloggs
CSV;
$this->dataset->csv_text->value = $csv;
$this->dataset->save();
$this->dataset->queueProcessing($process_queue);
$this->dataset->queueProcessing($process_queue, new DatasetBatchContext($this->dataset->getBatchSize()));
// The elastic destination indexes all items at once. Item 1 is invalid, but
// there is still no cleanup item because we want to retain invalid items.
$this->assertEquals(3, $process_queue->numberOfItems());
......@@ -153,7 +154,7 @@ CSV;
ProcessingOperationEnum::END,
]);
// Three older items and two new items are all indexed.
$this->assertEquals(5, $this->destinationPlugin->getCurrentCount($this->dataset));
$this->assertEquals(4, $this->destinationPlugin->getLastDelta($this->dataset));
}
/**
......@@ -162,11 +163,11 @@ CSV;
public function testRemoveInvalid(): void {
$this->dataset->invalid_values->value = DatasetInterface::INVALID_REMOVE;
$this->dataset->save();
$this->assertEquals(0, $this->destinationPlugin->getCurrentCount($this->dataset));
$this->assertEquals(0, $this->destinationPlugin->getLastDelta($this->dataset));
$queue_id = $this->dataset->getProcessingQueueId();
/** @var \Drupal\Core\Queue\QueueWorkerInterface $queue_worker */
$process_queue = \Drupal::queue($queue_id);
$this->dataset->queueProcessing($process_queue);
$this->dataset->queueProcessing($process_queue, new DatasetBatchContext($this->dataset->getBatchSize()));
// The elastic destination indexes all items at once.
$this->assertEquals(3, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
......@@ -174,7 +175,7 @@ CSV;
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
$this->assertEquals(2, $this->destinationPlugin->getCurrentCount($this->dataset));
$this->assertEquals(1, $this->destinationPlugin->getLastDelta($this->dataset));
// Add some more data to the dataset including some invalid data.
$csv = <<<CSV
should_we,firstname,lastname
......@@ -186,7 +187,7 @@ N,nora,bloggs
CSV;
$this->dataset->csv_text->value = $csv;
$this->dataset->save();
$this->dataset->queueProcessing($process_queue);
$this->dataset->queueProcessing($process_queue, new DatasetBatchContext($this->dataset->getBatchSize()));
// The elastic destination indexes all items at once. Item 3 is invalid.
$this->assertEquals(4, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
......@@ -195,7 +196,7 @@ CSV;
ProcessingOperationEnum::PROCESS_CLEANUP,
ProcessingOperationEnum::END,
]);
$this->assertEquals(4, $this->destinationPlugin->getCurrentCount($this->dataset));
$this->assertEquals(4, $this->destinationPlugin->getLastDelta($this->dataset));
// Remove some data from the dataset.
$csv = <<<CSV
should_we,firstname,lastname
......@@ -205,7 +206,7 @@ N,lara,bloggs
CSV;
$this->dataset->csv_text->value = $csv;
$this->dataset->save();
$this->dataset->queueProcessing($process_queue);
$this->dataset->queueProcessing($process_queue, new DatasetBatchContext($this->dataset->getBatchSize()));
// The elastic destination indexes all items at once. Item 1 is valid.
$this->assertEquals(5, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
......@@ -217,7 +218,7 @@ CSV;
ProcessingOperationEnum::PROCESS_CLEANUP,
ProcessingOperationEnum::END,
]);
$this->assertEquals(2, $this->destinationPlugin->getCurrentCount($this->dataset));
$this->assertEquals(2, $this->destinationPlugin->getLastDelta($this->dataset));
}
}
......@@ -5,13 +5,15 @@ declare(strict_types=1);
namespace Drupal\Tests\data_pipelines_elasticsearch\Kernel;
use Drupal\Core\Messenger\MessengerInterface;
use Drupal\Tests\data_pipelines\Kernel\DatasetKernelTestBase;
use Drupal\Tests\data_pipelines_elasticsearch\Traits\ElasticsearchTestDestinationTrait;
use Drupal\data_pipelines\DatasetData;
use Drupal\data_pipelines\DatasetPipelinePluginManager;
use Drupal\data_pipelines\Entity\DestinationInterface;
use Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination;
use Drupal\Tests\data_pipelines\Kernel\DatasetKernelTestBase;
use Elasticsearch\Client;
use Elasticsearch\Namespaces\IndicesNamespace;
use Elastic\Elasticsearch\Client;
use Elastic\Elasticsearch\Endpoints\Indices;
use Elastic\Elasticsearch\Response\Elasticsearch;
use Prophecy\Argument;
use Prophecy\PhpUnit\ProphecyTrait;
use Psr\Log\NullLogger;
......@@ -27,14 +29,17 @@ use Symfony\Component\ErrorHandler\BufferingLogger;
class ElasticsearchDestinationTest extends DatasetKernelTestBase {
use ProphecyTrait;
use ElasticsearchTestDestinationTrait;
/**
* Tests index creation.
*/
public function testIndexCreation() {
$dataset = $this->createTestDataset();
$indices = $this->prophesize(IndicesNamespace::class);
$indices->exists(Argument::any())->willReturn(FALSE);
$indices = $this->prophesize(Indices::class);
$indices_exists_result = $this->prophesize(Elasticsearch::class);
$indices_exists_result->asBool()->willReturn(FALSE);
$indices->exists(Argument::any())->willReturn($indices_exists_result->reveal());
$indices->create([
'index' => 'foo_' . $dataset->getMachineName(),
......@@ -55,13 +60,7 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
$pipeline_manager->hasDefinition(Argument::exact($dataset->getPipelineId()))->willReturn(TRUE);
$pipeline_manager->getDefinition(Argument::exact($dataset->getPipelineId()))->willReturn([]);
$configuration = [
'prefix' => 'foo_',
];
$plugin_id = 'foo';
$plugin_definition = [];
$logger = new NullLogger();
$indexer = new ElasticSearchDestination($configuration, $plugin_id, $plugin_definition, $pipeline_manager->reveal(), $logger, $client->reveal());
$indexer = $this->mockIndexer($pipeline_manager->reveal(), new NullLogger(), $client->reveal());
$this->assertTrue($indexer->beginProcessing($dataset));
}
......@@ -70,8 +69,10 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
*/
public function testIndexNotCreatedIfExists() {
$dataset = $this->createTestDataset();
$indices = $this->prophesize(IndicesNamespace::class);
$indices->exists(Argument::any())->willReturn(TRUE);
$indices = $this->prophesize(Indices::class);
$indices_exists_result = $this->prophesize(Elasticsearch::class);
$indices_exists_result->asBool()->willReturn(TRUE);
$indices->exists(Argument::any())->willReturn($indices_exists_result->reveal());
$indices->create(Argument::any())->shouldNotBeCalled();
$client = $this->prophesize(Client::class);
$client->indices()->willReturn($indices->reveal());
......@@ -82,13 +83,7 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
$pipeline_manager->hasDefinition(Argument::exact($dataset->getPipelineId()))->willReturn(TRUE);
$pipeline_manager->getDefinition(Argument::exact($dataset->getPipelineId()))->willReturn([]);
$configuration = [
'prefix' => 'foo_',
];
$plugin_id = 'foo';
$plugin_definition = [];
$logger = new NullLogger();
$indexer = new ElasticSearchDestination($configuration, $plugin_id, $plugin_definition, $pipeline_manager->reveal(), $logger, $client->reveal());
$indexer = $this->mockIndexer($pipeline_manager->reveal(), new NullLogger(), $client->reveal());
$this->assertTrue($indexer->beginProcessing($dataset));
}
......@@ -97,7 +92,7 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
*/
public function testIndexExistenceFailure() {
$dataset = $this->createTestDataset();
$indices = $this->prophesize(IndicesNamespace::class);
$indices = $this->prophesize(Indices::class);
$indices->exists(Argument::any())->willThrow(new \Exception('Barf'));
$indices->create(Argument::any())->shouldNotBeCalled();
$client = $this->prophesize(Client::class);
......@@ -107,14 +102,7 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
$pipeline_manager->hasDefinition(Argument::exact($dataset->getPipelineId()))->willReturn(TRUE);
$pipeline_manager->getDefinition(Argument::exact($dataset->getPipelineId()))->willReturn([]);
$configuration = [
'prefix' => 'foo_',
];
$plugin_id = 'foo';
$plugin_definition = [];
$logger = new NullLogger();
$indexer = new ElasticSearchDestination($configuration, $plugin_id, $plugin_definition, $pipeline_manager->reveal(), $logger, $client->reveal());
$indexer = $this->mockIndexer($pipeline_manager->reveal(), new NullLogger(), $client->reveal());
$this->assertFalse($indexer->beginProcessing($dataset));
}
......@@ -123,8 +111,10 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
*/
public function testIndexCreationFailure() {
$dataset = $this->createTestDataset();
$indices = $this->prophesize(IndicesNamespace::class);
$indices->exists(Argument::any())->willReturn(FALSE);
$indices = $this->prophesize(Indices::class);
$indices_exists_result = $this->prophesize(Elasticsearch::class);
$indices_exists_result->asBool()->willReturn(FALSE);
$indices->exists(Argument::any())->willReturn($indices_exists_result->reveal());
$indices->create(Argument::any())->willThrow(new \Exception('Barf'));
$client = $this->prophesize(Client::class);
$client->indices()->willReturn($indices->reveal());
......@@ -133,14 +123,7 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
$pipeline_manager->hasDefinition(Argument::exact($dataset->getPipelineId()))->willReturn(TRUE);
$pipeline_manager->getDefinition(Argument::exact($dataset->getPipelineId()))->willReturn([]);
$configuration = [
'prefix' => 'foo_',
];
$plugin_id = 'foo';
$plugin_definition = [];
$logger = new NullLogger();
$indexer = new ElasticSearchDestination($configuration, $plugin_id, $plugin_definition, $pipeline_manager->reveal(), $logger, $client->reveal());
$indexer = $this->mockIndexer($pipeline_manager->reveal(), new NullLogger(), $client->reveal());
$dataset = $this->createTestDataset();
$this->assertFalse($indexer->beginProcessing($dataset));
}
......@@ -150,8 +133,11 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
*/
public function testIndexing() {
$dataset = $this->createTestDataset();
$indices = $this->prophesize(IndicesNamespace::class);
$indices->exists(Argument::any())->willReturn(TRUE);
$indices = $this->prophesize(Indices::class);
$indices_exists_result = $this->prophesize(Elasticsearch::class);
$indices_exists_result->asBool()->willReturn(TRUE);
$indices->exists(Argument::any())->willReturn($indices_exists_result->reveal());
$client = $this->prophesize(Client::class);
$client->indices()->willReturn($indices->reveal());
......@@ -170,7 +156,7 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
'_id' => $dataset->getMachineName() . ':' . $delta,
],
];
$bulk['body'][] = $data->getArrayCopy();
$bulk['body'][] = ['@delta' => $delta] + $data->getArrayCopy();
if (count($bulk['body']) === 200) {
$client->bulk($bulk)->shouldBeCalled();
$bulk['body'] = [];
......@@ -179,17 +165,8 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
if (count($bulk['body']) > 0) {
$client->bulk($bulk)->shouldBeCalled();
}
if (count($bulk['body']) > 0) {
$client->bulk($bulk)->shouldBeCalled();
}
$configuration = [
'prefix' => 'foo_',
];
$plugin_id = 'foo';
$plugin_definition = [];
$logger = new NullLogger();
$indexer = new ElasticSearchDestination($configuration, $plugin_id, $plugin_definition, $pipeline_manager->reveal(), $logger, $client->reveal());
$indexer = $this->mockIndexer($pipeline_manager->reveal(), new NullLogger(), $client->reveal());
$this->assertTrue($indexer->beginProcessing($dataset));
$this->assertTrue($indexer->processChunk($dataset, iterator_to_array($dataset->getDataIterator())));
}
......@@ -200,7 +177,7 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
public function testDeleting() {
$dataset = $this->createTestDataset();
$destination = $this->prophesize(DestinationInterface::class);
$indices = $this->prophesize(IndicesNamespace::class);
$indices = $this->prophesize(Indices::class);
$client = $this->prophesize(Client::class);
$client->indices()->willReturn($indices->reveal());
......@@ -213,13 +190,7 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
];
$indices->delete($expected)->shouldBeCalled();
$configuration = [
'prefix' => 'foo_',
];
$plugin_id = 'foo';
$plugin_definition = [];
$logger = new NullLogger();
$indexer = new ElasticSearchDestination($configuration, $plugin_id, $plugin_definition, $pipeline_manager->reveal(), $logger, $client->reveal());
$indexer = $this->mockIndexer($pipeline_manager->reveal(), new NullLogger(), $client->reveal());
$this->assertTrue($indexer->deleteDataSet($dataset, $destination->reveal()));
}
......@@ -229,7 +200,7 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
public function testDeletingException() {
$dataset = $this->createTestDataset();
$destination = $this->prophesize(DestinationInterface::class);
$indices = $this->prophesize(IndicesNamespace::class);
$indices = $this->prophesize(Indices::class);
$client = $this->prophesize(Client::class);
$client->indices()->willReturn($indices->reveal());
......@@ -244,13 +215,8 @@ class ElasticsearchDestinationTest extends DatasetKernelTestBase {
$exception = new \Exception('barf');
$indices->delete($expected)->willThrow($exception);
$configuration = [
'prefix' => 'foo_',
];
$plugin_id = 'foo';
$plugin_definition = [];
$logger = new BufferingLogger();
$indexer = new ElasticSearchDestination($configuration, $plugin_id, $plugin_definition, $pipeline_manager->reveal(), $logger, $client->reveal());
$indexer = $this->mockIndexer($pipeline_manager->reveal(), $logger, $client->reveal());
$indexer->setMessenger($messenger->reveal());
$indexer->deleteDataSet($dataset, $destination->reveal());
......
......@@ -4,11 +4,15 @@ declare(strict_types=1);
namespace Drupal\Tests\data_pipelines_elasticsearch\Traits;
use Drupal\data_pipelines\DatasetPipelinePluginManager;
use Drupal\data_pipelines\Destination\DatasetDestinationPluginInterface;
use Drupal\data_pipelines\Entity\Destination;
use Drupal\data_pipelines\Entity\DestinationInterface;
use Elasticsearch\Client;
use Elasticsearch\Common\Exceptions\NoNodesAvailableException;
use Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination;
use Elastic\Elasticsearch\Client;
use Elastic\Transport\Exception\NoNodeAvailableException;
use PHPUnit\Framework\MockObject\MockObject;
use Psr\Log\LoggerInterface;
/**
* Elasticsearch Destination Trait.
......@@ -32,9 +36,9 @@ trait ElasticsearchTestDestinationTrait {
/**
* The Elasticsearch client.
*
* @var \Elasticsearch\Client|null
* @var \Elastic\Elasticsearch\Client
*/
protected ?Client $client;
protected Client $client;
/**
* Creates a test elasticsearch destination.
......@@ -49,10 +53,10 @@ trait ElasticsearchTestDestinationTrait {
'label' => 'Test destination',
'destination' => 'elasticsearch',
'destinationSettings' => [
'url' => '',
'username' => '',
'password' => '',
'prefix' => '',
'url' => getenv('ELASTICSEARCH_URL'),
'username' => 'elastic',
'password' => 'changeme',
'prefix' => 'foo_',
],
]);
$destination->save();
......@@ -65,24 +69,45 @@ trait ElasticsearchTestDestinationTrait {
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::getClient
*/
protected function initializeElasticClient():void {
$this->destination = $this->createElasticsearchDestination([
'destinationSettings' => [
'url' => getenv('ELASTICSEARCH_URL'),
'username' => 'elastic',
'password' => 'changeme',
'prefix' => 'foo_',
],
]);
$this->destination = $this->createElasticsearchDestination();
$this->destinationPlugin = $this->destination->getDestinationPlugin();
$this->client = $this->destinationPlugin->getClient();
$this->assertInstanceOf(Client::class, $this->client);
try {
$this->client->ping();
}
catch (NoNodesAvailableException $exception) {
catch (NoNodeAvailableException) {
$this->markTestSkipped('The test is skipped as no alive nodes found in cluster');
}
}
/**
* A method to mock the indexer required for testing indexing.
*
* @param \Drupal\data_pipelines\DatasetPipelinePluginManager $pipeline_manager
* The mocked pipeline manager.
* @param \Psr\Log\LoggerInterface $logger
* The logger.
* @param \Elastic\Elasticsearch\Client $client
* The mocked Elasticsearch client.
*
* @return \PHPUnit\Framework\MockObject\MockObject
* The mocked indexer.
*/
protected function mockIndexer(DatasetPipelinePluginManager $pipeline_manager, LoggerInterface $logger, Client $client): MockObject {
$configuration = [
'prefix' => 'foo_',
];
$plugin_id = 'foo';
$plugin_definition = [];
$indexer = $this->getMockBuilder(ElasticSearchDestination::class)
->disableOriginalClone()
->onlyMethods(['getClient'])
->setConstructorArgs([$configuration, $plugin_id, $plugin_definition, $pipeline_manager, $logger])
->getMock();
$indexer->method('getClient')->willReturn($client);
return $indexer;
}
}
......@@ -2,6 +2,7 @@
namespace Drupal\data_pipelines\Commands;
use Drupal\Core\Database\Connection;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\data_pipelines\Entity\Dataset;
use Drupal\data_pipelines\Entity\DatasetInterface;
......@@ -22,54 +23,78 @@ class DataPipelinesCommands extends DrushCommands {
*/
protected $entityTypeManager;
/**
* The database connection.
*
* @var \Drupal\Core\Database\Connection
*/
protected $database;
/**
* Creates a DataPipelinesCommand object.
*
* @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
* The entity type manager.
* @param \Drupal\Core\Database\Connection $database
* The database connection.
*/
public function __construct(EntityTypeManagerInterface $entity_type_manager) {
public function __construct(EntityTypeManagerInterface $entity_type_manager, Connection $database) {
parent::__construct();
$this->entityTypeManager = $entity_type_manager;
$this->database = $database;
}
/**
* Provide a list of datasets if argument is not provided.
* Provide a list of datasets if a machine name has not been provided.
*
* @hook interact data-pipelines:reindex
*/
public function datasetReindexInteract(InputInterface $input, OutputInterface $output) {
public function datasetReindexInteract(InputInterface $input, OutputInterface $output): void {
if (!empty($input->getArgument('machine_name'))) {
return;
}
$datasets = $this->entityTypeManager->getStorage('data_pipelines')->loadMultiple();
$choices = [];
$choices = ['all' => 'All'];
foreach ($datasets as $dataset) {
assert($dataset instanceof DatasetInterface);
// Choices do not support integer keys, so do some string manipulation.
$choices[$dataset->getMachineName()] = $dataset->label();
}
$choice = $this->io()->choice(dt('Which dataset would you like to reindex'), $choices);
$input->setArgument('machine_name', $choice);
}
/**
* Rebulid the index for a dataset.
* Delete and reindex dataset(s).
*
* @param string $machine_name
* The data pipelines machine_name to rebuild.
* The dataset machine name.
*
* @command data-pipelines:reindex
*
* @validate-module-enabled data_pipelines
*
* @throws \Exception
*/
public function datasetReindex(string $machine_name) {
$dataset = DataSet::loadByMachineName($machine_name);
assert($dataset instanceof DatasetInterface);
batch_set(DatasetBatchOperations::batchForDataset($dataset));
public function datasetReindex(string $machine_name): void {
$datasets = [];
if ('all' === $machine_name) {
$results = $this->database->select('data_pipelines')
->fields('data_pipelines', ['machine_name'])
->execute()
->fetchAll();
$datasets = array_map(fn($result) => $result->machine_name, $results);
}
else {
$datasets[] = $machine_name;
}
foreach ($datasets as $dataset) {
$dataset = Dataset::loadByMachineName($dataset);
assert($dataset instanceof DatasetInterface);
foreach ($dataset->getDestinations() as $destination) {
$destination->getDestinationPlugin()->deleteDataSet($dataset, $destination);
}
batch_set(DatasetBatchOperations::batchForDataset($dataset));
}
drush_backend_batch_process();
}
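With this change the command accepts either a single dataset machine name or the literal all (offered as the first choice in the interactive prompt); for example, drush data-pipelines:reindex all now deletes every dataset from each of its destinations and re-queues a rebuild batch for each one.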
......
......@@ -85,7 +85,7 @@ abstract class DatasetDestinationPluginBase extends PluginBase implements Datase
/**
* {@inheritdoc}
*/
public function getCurrentCount(DatasetInterface $dataset): int {
public function getLastDelta(DatasetInterface $dataset): int {
return 0;
}
......
......@@ -55,15 +55,15 @@ interface DatasetDestinationPluginInterface extends PluginFormInterface, Configu
public function deleteDataSet(DatasetInterface $dataset, DestinationInterface $destination): bool;
/**
* Gets count of currently processed data.
* Get the last delta of the dataset from the destination.
*
* @param \Drupal\data_pipelines\Entity\DatasetInterface $dataset
* Dataset.
*
* @return int
* Count of currently processed data.
* The last delta.
*/
public function getCurrentCount(DatasetInterface $dataset): int;
public function getLastDelta(DatasetInterface $dataset): int;
/**
* Gets processing chunk size.
......
......@@ -18,6 +18,7 @@ use Drupal\data_pipelines\DatasetPipelinePluginManager;
use Drupal\data_pipelines\Destination\DatasetDestinationPluginInterface;
use Drupal\data_pipelines\Destination\ProcessingOperation;
use Drupal\data_pipelines\Destination\ProcessingOperationEnum;
use Drupal\data_pipelines\Form\DatasetBatchContext;
use Drupal\data_pipelines\Source\DatasetSourceInterface;
use Drupal\data_pipelines\Source\DatasetSourcePluginManager;
use Drupal\data_pipelines\TransformValidDataIterator;
......@@ -262,7 +263,7 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
* @return \Drupal\data_pipelines\DatasetPipelineInterface
* Pipeline.
*/
protected function getPipeline(): DatasetPipelineInterface {
public function getPipeline(): DatasetPipelineInterface {
return \Drupal::service('plugin.manager.data_pipelines_pipeline')->createInstance($this->getPipelineId());
}
......@@ -290,15 +291,15 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
/**
* {@inheritdoc}
*/
public function queueProcessing(QueueInterface $queue, int $from = 0, int $limit = 1000): int {
public function queueProcessing(QueueInterface $queue, DatasetBatchContext $context): void {
if (!$this->isPublished()) {
return 0;
return;
}
if ($from === 0) {
if ($context->getProcessed() === 0) {
$queue->deleteQueue();
}
$destinations = [];
$original_size = [];
$last_deltas = [];
$chunk_sizes = [];
$buffers = [];
foreach ($this->getDestinations() as $destination_entity) {
......@@ -306,25 +307,25 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
$destination_id = $destination_entity->id();
// Keep track of destinations, so we can easily loop them by ID later.
$destinations[$destination_id] = $destination_entity;
// Get the original size so that we can remove outdated records later.
$original_size[$destination_id] = $destination_entity->getDestinationPlugin()->getCurrentCount($this);
// Get the last delta from the dataset.
$last_deltas[$destination_id] = $destination_entity->getDestinationPlugin()->getLastDelta($this);
// Store a keyed array of the chunk size each destination wants to store
// in each batch entry.
$chunk_sizes[$destination_id] = $destination_entity->getDestinationPlugin()->getProcessingChunkSize();
// Buffer dataset rows for each destination, so we can create the queue
// items with the appropriate number of rows (per the chunk size).
$buffers[$destination_id] = [];
if ($from === 0) {
if ($context->getProcessed() === 0) {
// If we're processing from the start, create a queue item for a 'begin'
// operation.
$queue->createItem(new ProcessingOperation(ProcessingOperationEnum::BEGIN, $destination_id));
}
}
$total_count = $count = 0;
$iterator = $this->getDataIterator($from);
$iterator = $this->getDataIterator($context->getProcessed());
$valid = 0;
// Iterate over each row in the dataset.
foreach ($iterator as $delta => $transformed) {
$count++;
$valid++;
foreach ($destinations as $destination_id => $destination) {
assert($destination instanceof DestinationInterface);
// Push this item into the buffer for each destination. Be sure to
......@@ -346,40 +347,44 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
$buffers[$destination_id] = [];
}
}
if ($count === $limit) {
if ($valid === $context->getChunkSize()) {
break;
}
}
// We've processed all available records at this point, so we need to flush
if ($valid < $context->getChunkSize()) {
$context->setFinished();
}
$invalid_deltas = $iterator->getInvalidDeltas();
$invalid = count($invalid_deltas);
$context->incrementValid($valid);
$context->incrementInvalid($invalid);
// We've processed all available records at this point, so we need to flush
// any buffered data.
foreach ($destinations as $destination_id => $destination) {
assert($destination instanceof DestinationInterface);
// There are buffered records for this destination.
if (count($buffers[$destination_id]) > 0) {
if ($buffers[$destination_id]) {
// So we create a queue item for the buffered records and this
// destination.
$queue->createItem(new ProcessingOperation(ProcessingOperationEnum::PROCESS_CHUNK, $destination_id, $buffers[$destination_id]));
}
// Queue cleanup of any invalid items.
if ($this->getInvalidValuesHandling() === self::INVALID_REMOVE && $invalid_deltas = $iterator->getInvalidDeltas()) {
$total_count = $count + count($invalid_deltas);
if ($this->getInvalidValuesHandling() === self::INVALID_REMOVE && $invalid_deltas) {
$queue->createItem(new ProcessingOperation(ProcessingOperationEnum::PROCESS_CLEANUP, $destination_id, $invalid_deltas));
}
// We've also reached the final record, so we queue an 'end' operation.
if ($count < $limit) {
// After processing all the data if the current count is less than the
// previously indexed data count then clear up the remaining data.
if ($count < $original_size[$destination_id] && $this->getInvalidValuesHandling() === self::INVALID_REMOVE) {
// Index IDs start from 0 so removing from the current total count
// till the end will remove the outdated items.
$queue->createItem(new ProcessingOperation(ProcessingOperationEnum::PROCESS_CLEANUP, $destination_id, range($total_count, $original_size[$destination_id])));
if ($context->isFinished()) {
// If this run's last delta is lower than the destination's previous
// last delta, queue cleanup of the stale deltas above it.
if ($context->getLastDelta() < $last_deltas[$destination_id] && $this->getInvalidValuesHandling() === self::INVALID_REMOVE) {
$queue->createItem(new ProcessingOperation(ProcessingOperationEnum::PROCESS_CLEANUP, $destination_id, range($context->getLastDelta() + 1, $last_deltas[$destination_id])));
}
$queue->createItem(new ProcessingOperation(ProcessingOperationEnum::END, $destination_id));
}
}
// Return the number of processed records so that calling code can
// update progress etc.
return $count;
}
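A worked illustration of the delta-based cleanup above, using hypothetical numbers:

// Hypothetical numbers, assuming INVALID_REMOVE handling for the dataset:
// the previous import wrote deltas 0..4, so getLastDelta() reported 4 for
// this destination, while the new import only produced deltas 0..2.
$previous_last_delta = 4;   // $last_deltas[$destination_id]
$current_last_delta = 2;    // $context->getLastDelta()
if ($current_last_delta < $previous_last_delta) {
  // Deltas 3 and 4 are now stale, so a PROCESS_CLEANUP item covering
  // range(3, 4) is queued for the destination before the END operation.
  $stale_deltas = range($current_last_delta + 1, $previous_last_delta); // [3, 4]
}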
/**
......
......@@ -8,6 +8,7 @@ use Drupal\Core\Entity\ContentEntityInterface;
use Drupal\Core\Entity\EntityPublishedInterface;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\StringTranslation\TranslatableMarkup;
use Drupal\data_pipelines\Form\DatasetBatchContext;
use Drupal\data_pipelines\TransformValidDataIterator;
/**
......@@ -196,15 +197,10 @@ interface DatasetInterface extends ContentEntityInterface, EntityPublishedInterf
*
* @param \Drupal\Core\Queue\QueueInterface $queue
* Queue to use.
* @param int $from
* Starting point.
* @param int $limit
* Number of records to queue.
*
* @return int
* Number of queued records.
* @param \Drupal\data_pipelines\Form\DatasetBatchContext $context
* The dataset batch context.
*/
public function queueProcessing(QueueInterface $queue, int $from = 1000, int $limit = 1000): int;
public function queueProcessing(QueueInterface $queue, DatasetBatchContext $context): void;
/**
* Gets the data iterator.
......
<?php
declare(strict_types=1);
namespace Drupal\data_pipelines\Form;
/**
* The context when processing the dataset as part of a batch.
*/
class DatasetBatchContext {
/**
* The batch chunk size.
*
* @var int
*/
protected int $chunkSize;
/**
* Whether the batch is finished.
*
* @var bool
*/
protected bool $finished;
/**
* The number of valid items in the batch.
*
* @var int
*/
protected int $valid;
/**
* The number of invalid items in the batch.
*
* @var int
*/
protected int $invalid;
/**
* The number of items processed in the batch.
*
* @var int
*/
protected int $processed;
/**
* DatasetBatchContext constructor.
*
* @param int $chunk_size
* The batch chunk size.
*/
public function __construct(int $chunk_size) {
$this->valid = 0;
$this->invalid = 0;
$this->processed = 0;
$this->finished = FALSE;
$this->chunkSize = $chunk_size;
}
/**
* A method to get the number of valid items in the batch.
*
* @return int
* The number of valid items.
*/
public function getValid(): int {
return $this->valid;
}
/**
* A method to get the number of invalid items in the batch.
*
* @return int
* The number of invalid items.
*/
public function getInvalid(): int {
return $this->invalid;
}
/**
* A method to get the number of items processed in the batch.
*
* @return int
* The number of items processed.
*/
public function getProcessed(): int {
return $this->processed;
}
/**
* A method to get the last delta of the batch.
*
* This is the number of processed items minus one, since deltas start at 0.
*
* @return int
* The last delta.
*/
public function getLastDelta(): int {
return $this->processed - 1;
}
/**
* A method to get the batch chunk size.
*
* @return int
* The batch chunk size.
*/
public function getChunkSize(): int {
return $this->chunkSize;
}
/**
* A method to increment the number of valid items.
*
* This in turn increments the number of processed items.
*
* @param int $count
* The valid count to increment.
*/
public function incrementValid(int $count = 1): void {
$this->valid += $count;
$this->processed += $count;
}
/**
* A method to increment the number of invalid items.
*
* This in turn increments the number of processed items.
*
* @param int $count
* The invalid count to increment.
*/
public function incrementInvalid(int $count = 1): void {
$this->invalid += $count;
$this->processed += $count;
}
/**
* A method to indicate whether the batch is finished.
*
* @return bool
* Whether the batch is finished.
*/
public function isFinished(): bool {
return $this->finished;
}
/**
* A method to set the batch to finished.
*/
public function setFinished(): void {
$this->finished = TRUE;
}
}
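A minimal sketch of how a caller is expected to drive queueProcessing() with this context; the loop below is an assumed driver, while the actual commit drives the same pattern through the Batch API sandbox in the DatasetBatchOperations hunk that follows:

// One DatasetBatchContext instance persists across repeated
// queueProcessing() calls until the dataset reports it is finished.
$context = new DatasetBatchContext($dataset->getBatchSize());
$queue = \Drupal::queue($dataset->getProcessingQueueId());
while (!$context->isFinished()) {
  // Each pass queues up to getChunkSize() rows and advances the counters.
  $dataset->queueProcessing($queue, $context);
}
// At this point getProcessed() equals getValid() + getInvalid().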
......@@ -93,17 +93,16 @@ class DatasetBatchOperations implements ContainerInjectionInterface {
$context['results']['dataset_id'] = $dataset_id;
$dataset = Dataset::load($dataset_id);
assert($dataset instanceof DatasetInterface);
$chunk_size = $dataset->getBatchSize();
if (!isset($context['sandbox']['from'])) {
$context['sandbox']['from'] = 0;
if (!$context['sandbox']) {
$context['sandbox']['context'] = new DatasetBatchContext($dataset->getBatchSize());
$dataset->resetLogs();
$dataset->setPendingProcessing();
}
$inner_context = &$context['sandbox']['context'];
$context['finished'] = 0;
$indexed = $dataset->queueProcessing(\Drupal::queue($dataset->getProcessingQueueId()), $context['sandbox']['from'], $chunk_size);
$context['sandbox']['from'] += $indexed;
$context['message'] = new TranslatableMarkup('Queueing processing, queued @count records.', ['@count' => $context['sandbox']['from']]);
if ($indexed < $chunk_size) {
$dataset->queueProcessing(\Drupal::queue($dataset->getProcessingQueueId()), $inner_context);
$context['message'] = new TranslatableMarkup('Queueing processing, queued @count records.', ['@count' => $inner_context->getProcessed()]);
if ($inner_context->isFinished()) {
$context['finished'] = 1;
$context['message'] = new TranslatableMarkup('Data is successfully queued for processing.');
}
......
......@@ -7,11 +7,11 @@ namespace Drupal\data_pipelines\Plugin\QueueWorker;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Tests\data_pipelines\Exception\DestinationNotFoundException;
use Drupal\Tests\data_pipelines\Exception\DestinationPluginNotFoundException;
use Drupal\data_pipelines\Destination\DatasetDestinationPluginManager;
use Drupal\data_pipelines\Destination\ProcessingOperation;
use Drupal\data_pipelines\Entity\DatasetInterface;
use Drupal\Tests\data_pipelines\Exception\DestinationNotFoundException;
use Drupal\Tests\data_pipelines\Exception\DestinationPluginNotFoundException;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
......
......@@ -24,7 +24,7 @@ trait FieldValueTrait {
* @return mixed
* The field value.
*/
public static function getFieldValue(ContentEntityInterface $entity, string $field_name, string $main_property_name = NULL) {
public static function getFieldValue(ContentEntityInterface $entity, string $field_name, ?string $main_property_name = NULL) {
/** @var \Drupal\Core\Field\BaseFieldDefinition $field_definition */
$field_definition = $entity->{$field_name}->getFieldDefinition();
$main_property_name = $main_property_name ?? $field_definition->getMainPropertyName();
......
......@@ -125,7 +125,7 @@ class StateDestination extends DatasetDestinationPluginBase implements Container
/**
* {@inheritdoc}
*/
public function getCurrentCount(DatasetInterface $dataset): int {
public function getLastDelta(DatasetInterface $dataset): int {
$data = $this->getData();
return isset($data[$dataset->id()]) ? count($data[$dataset->id()]) : 0;
}
......
......@@ -5,11 +5,11 @@ declare(strict_types=1);
namespace Drupal\Tests\data_pipelines\Functional;
use Drupal\Core\Url;
use Drupal\Tests\BrowserTestBase;
use Drupal\data_pipelines\DatasetData;
use Drupal\data_pipelines\Entity\Dataset;
use Drupal\data_pipelines\Entity\DatasetInterface;
use Drupal\data_pipelines\Entity\Destination;
use Drupal\Tests\BrowserTestBase;
/**
* Defines a test for UI aspects of the module.
......