Commit e848db21 authored by Jibran Ijaz


Issue #3405873 by jibran, tinny, larowlan, mortim07, nterbogt: Unable to delete outdated records from the pipelines
parent 57fd6cea
Tags: 1.0.0-alpha23
1 merge request: !14 Unable to delete outdated records from the pipelines
Pipeline #75967 passed
Showing with 524 additions and 97 deletions
@@ -61,50 +61,26 @@ variables:
_PHPUNIT_EXTRA: '--verbose'
# Convenient, and we have no secrets.
_SHOW_ENVIRONMENT_VARIABLES: '1'
ELASTICSEARCH_URL: 'http://elasticsearch:9200'
.with-elasticsearch: &with-elasticsearch
- name: elasticsearch:7.17.15
alias: elasticsearch
command:
- /bin/bash
- -c
- echo -Xms256m >> /usr/share/elasticsearch/config/jvm.options && echo -Xmx256m >> /usr/share/elasticsearch/config/jvm.options && /usr/local/bin/docker-entrypoint.sh elasticsearch -Ediscovery.type=single-node
###################################################################################
#
# *
# /(
# ((((,
# /(((((((
# ((((((((((*
# ,(((((((((((((((
# ,(((((((((((((((((((
# ((((((((((((((((((((((((*
# *(((((((((((((((((((((((((((((
# ((((((((((((((((((((((((((((((((((*
# *(((((((((((((((((( .((((((((((((((((((
# ((((((((((((((((((. /(((((((((((((((((*
# /((((((((((((((((( .(((((((((((((((((,
# ,(((((((((((((((((( ((((((((((((((((((
# .(((((((((((((((((((( .(((((((((((((((((
# ((((((((((((((((((((((( ((((((((((((((((/
# (((((((((((((((((((((((((((/ ,(((((((((((((((*
# .((((((((((((((/ /(((((((((((((. ,(((((((((((((((
# *(((((((((((((( ,(((((((((((((/ *((((((((((((((.
# ((((((((((((((, /(((((((((((((. ((((((((((((((,
# (((((((((((((/ ,(((((((((((((* ,(((((((((((((,
# *((((((((((((( .((((((((((((((( ,(((((((((((((
# ((((((((((((/ /((((((((((((((((((. ,((((((((((((/
# ((((((((((((( *(((((((((((((((((((((((* *((((((((((((
# ((((((((((((( ,(((((((((((((..((((((((((((( *((((((((((((
# ((((((((((((, /((((((((((((* /((((((((((((/ ((((((((((((
# ((((((((((((( /((((((((((((/ (((((((((((((* ((((((((((((
# (((((((((((((/ /(((((((((((( ,((((((((((((, *((((((((((((
# (((((((((((((( *(((((((((((/ *((((((((((((. ((((((((((((/
# *((((((((((((((((((((((((((, /(((((((((((((((((((((((((
# ((((((((((((((((((((((((( ((((((((((((((((((((((((,
# .(((((((((((((((((((((((/ ,(((((((((((((((((((((((
# ((((((((((((((((((((((/ ,(((((((((((((((((((((/
# *((((((((((((((((((((( (((((((((((((((((((((,
# ,(((((((((((((((((((((, ((((((((((((((((((((/
# ,(((((((((((((((((((((* /((((((((((((((((((((
# ((((((((((((((((((((((, ,/((((((((((((((((((((,
# ,(((((((((((((((((((((((((((((((((((((((((((((((((((
# .(((((((((((((((((((((((((((((((((((((((((((((
# .((((((((((((((((((((((((((((((((((((,.
# .,(((((((((((((((((((((((((.
#
###################################################################################
phpunit:
extends: .phpunit-base
services:
- !reference [.with-database]
- !reference [.with-chrome]
- !reference [.with-elasticsearch]
phpunit (next minor):
extends: .phpunit-base
services:
- !reference [.with-database]
- !reference [.with-chrome]
- !reference [.with-elasticsearch]
@@ -319,3 +319,18 @@ function data_pipelines_update_8015(): void {
$entity_definition_update_manager->installFieldStorageDefinition('invalid_values', 'data_pipelines', 'data_pipelines', $field_storage_definition);
}
/**
* Update fields for invalid handling.
*/
function data_pipelines_update_8016(): void {
$entity_definition_update_manager = \Drupal::entityDefinitionUpdateManager();
$field_storage_definition = \Drupal::entityDefinitionUpdateManager()->getFieldStorageDefinition('invalid_values', 'data_pipelines');
$field_storage_definition
->setSetting('allowed_values', [
DatasetInterface::INVALID_RETAIN => new TranslatableMarkup('Retain records that were previously valid'),
DatasetInterface::INVALID_REMOVE => new TranslatableMarkup('Remove records that are now invalid and outdated'),
]);
$entity_definition_update_manager->updateFieldStorageDefinition($field_storage_definition);
}
@@ -302,6 +302,28 @@ class ElasticSearchDestination extends DatasetDestinationPluginBase implements C
}
}
/**
* {@inheritdoc}
*/
public function getCurrentCount(DatasetInterface $dataset): int {
$index_id = $this->configuration['prefix'] . $dataset->getMachineName();
$index_params_base = [];
$index_params_base['index'] = $index_id;
$count = 0;
try {
if ($this->getClient()->indices()->exists($index_params_base)) {
$result = $this->getClient()->count($index_params_base);
$count = $result['count'] ?? 0;
}
}
catch (\Exception $e) {
$this->logger->error("Failed to get a current count due to @message", [
'@message' => $e->getMessage(),
]);
}
return $count;
}
/**
* {@inheritdoc}
*/
......
@@ -5,10 +5,9 @@ declare(strict_types=1);
namespace Drupal\Tests\data_pipelines_elasticsearch\Kernel;
use Drupal\data_pipelines\DatasetPipelinePluginManager;
use Drupal\data_pipelines\Entity\Destination;
use Drupal\data_pipelines\Entity\DestinationInterface;
use Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination;
use Drupal\Tests\data_pipelines\Kernel\DatasetKernelTestBase;
use Drupal\Tests\data_pipelines_elasticsearch\Traits\ElasticsearchTestDestinationTrait;
use Elasticsearch\Client;
use Elasticsearch\Namespaces\IndicesNamespace;
use Prophecy\Argument;
@@ -23,6 +22,8 @@ use Psr\Log\NullLogger;
class ElasticsearchDestinationDatasetTest extends DatasetKernelTestBase {
use ProphecyTrait;
use ElasticsearchTestDestinationTrait;
/**
* {@inheritdoc}
*/
@@ -31,29 +32,6 @@ class ElasticsearchDestinationDatasetTest extends DatasetKernelTestBase {
'data_pipelines_elasticsearch_test',
];
/**
* Creates a test elasticsearch destination.
*
* @return \Drupal\data_pipelines\Entity\DestinationInterface
* The test destination.
*/
protected function createElasticsearchDestination(array $values = []): DestinationInterface {
$destinationId = $values['destinationId'] ?? $this->randomMachineName();
$destination = Destination::create($values + [
'id' => $destinationId,
'label' => 'Test destination',
'destination' => 'elasticsearch',
'destinationSettings' => [
'url' => '',
'username' => '',
'password' => '',
'prefix' => '',
],
]);
$destination->save();
return $destination;
}
/**
* Tests elasticsearch destinations.
*/
......
<?php
declare(strict_types = 1);
namespace Drupal\Tests\data_pipelines_elasticsearch\Kernel;
use Drupal\data_pipelines\Destination\ProcessingOperationEnum;
use Drupal\data_pipelines\Entity\DatasetInterface;
use Drupal\Tests\data_pipelines\Kernel\DatasetKernelTestBase;
use Drupal\Tests\data_pipelines\Traits\DatasetTestProcessingTrait;
use Drupal\Tests\data_pipelines_elasticsearch\Traits\ElasticsearchTestDestinationTrait;
/**
* Elasticsearch Destination Queue Test.
*
* @group data_pipelines
*
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination
*/
class ElasticsearchDestinationQueueTest extends DatasetKernelTestBase {
use DatasetTestProcessingTrait;
use ElasticsearchTestDestinationTrait;
/**
* The data set entity.
*
* @var \Drupal\data_pipelines\Entity\DatasetInterface
*/
protected DatasetInterface $dataset;
/**
* {@inheritdoc}
*/
protected static $modules = [
'data_pipelines_elasticsearch',
'data_pipelines_elasticsearch_test',
];
/**
* {@inheritdoc}
*/
public function setUp() : void {
parent::setUp();
$this->initializeElasticClient();
$this->dataset = $this->createTestDataset(['destinations' => [$this->destination]]);
}
/**
* Tests index creation.
*
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::getCurrentCount
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::getProcessingChunkSize
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::beginProcessing
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::createIndex
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::processChunk
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::endProcessing
*/
public function testIndexing() {
$this->assertEquals(0, $this->destinationPlugin->getCurrentCount($this->dataset));
$queue_id = $this->dataset->getProcessingQueueId();
$process_queue = \Drupal::queue($queue_id);
$this->dataset->queueProcessing($process_queue);
// The ES destination indexes items in bulk, so there is only one process chunk.
$this->assertEquals(3, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
$this->assertTrue($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()]));
$this->assertEquals(2, $this->destinationPlugin->getCurrentCount($this->dataset));
}
/**
* Tests that invalid records are retained.
*/
public function testRetainInvalid(): void {
$queue_id = $this->dataset->getProcessingQueueId();
/** @var \Drupal\Core\Queue\QueueWorkerInterface $queue_worker */
$process_queue = \Drupal::queue($queue_id);
$this->dataset->queueProcessing($process_queue);
// The elastic destination indexes all items at once.
$this->assertEquals(3, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
$this->assertEquals(2, $this->destinationPlugin->getCurrentCount($this->dataset));
// Add some more data to the dataset, including some invalid data.
$csv = <<<CSV
should_we,firstname,lastname
Y,joe,bloggs
N,betty,bloggs
N,li,bloggs
Y,nick,bloggs
N,nora,bloggs
CSV;
$this->dataset->csv_text->value = $csv;
$this->dataset->save();
$this->dataset->queueProcessing($process_queue);
// The elastic destination indexes all items at once. Item 3 is invalid,
// but there is no clean-up item because we want to retain invalid records.
$this->assertEquals(3, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
// Only 4 valid items are indexed.
$this->assertEquals(4, $this->destinationPlugin->getCurrentCount($this->dataset));
// Remove some data from the dataset.
$csv = <<<CSV
should_we,firstname,lastname
N,li,bloggs
Y,mike,bloggs
N,lara,bloggs
CSV;
$this->dataset->csv_text->value = $csv;
$this->dataset->save();
$this->dataset->queueProcessing($process_queue);
// The elastic destination indexes all items at once. Item 1 is invalid,
// but there is no clean-up item because we want to retain invalid records.
$this->assertEquals(3, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
// The three older items and the two new items are all indexed.
$this->assertEquals(5, $this->destinationPlugin->getCurrentCount($this->dataset));
}
/**
* Tests that invalid records are removed.
*/
public function testRemoveInvalid(): void {
$this->dataset->invalid_values->value = DatasetInterface::INVALID_REMOVE;
$this->dataset->save();
$this->assertEquals(0, $this->destinationPlugin->getCurrentCount($this->dataset));
$queue_id = $this->dataset->getProcessingQueueId();
/** @var \Drupal\Core\Queue\QueueWorkerInterface $queue_worker */
$process_queue = \Drupal::queue($queue_id);
$this->dataset->queueProcessing($process_queue);
// The elastic destination indexes all items at once.
$this->assertEquals(3, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
$this->assertEquals(2, $this->destinationPlugin->getCurrentCount($this->dataset));
// Add some more data to the dataset, including some invalid data.
$csv = <<<CSV
should_we,firstname,lastname
Y,joe,bloggs
N,betty,bloggs
N,li,bloggs
Y,nick,bloggs
N,nora,bloggs
CSV;
$this->dataset->csv_text->value = $csv;
$this->dataset->save();
$this->dataset->queueProcessing($process_queue);
// The elastic destination indexes all items at once. Item 3 is invalid.
$this->assertEquals(4, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::PROCESS_CLEANUP,
ProcessingOperationEnum::END,
]);
$this->assertEquals(4, $this->destinationPlugin->getCurrentCount($this->dataset));
// Remove some data from the dataset.
$csv = <<<CSV
should_we,firstname,lastname
N,li,bloggs
Y,mike,bloggs
N,lara,bloggs
CSV;
$this->dataset->csv_text->value = $csv;
$this->dataset->save();
$this->dataset->queueProcessing($process_queue);
// The elastic destination indexes all items at once. Item 1 is valid.
$this->assertEquals(5, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
// One clean-up to remove the invalid records.
ProcessingOperationEnum::PROCESS_CLEANUP,
// A second clean-up to remove the outdated records.
ProcessingOperationEnum::PROCESS_CLEANUP,
ProcessingOperationEnum::END,
]);
$this->assertEquals(2, $this->destinationPlugin->getCurrentCount($this->dataset));
}
}
<?php
declare(strict_types=1);
namespace Drupal\Tests\data_pipelines_elasticsearch\Traits;
use Drupal\data_pipelines\Destination\DatasetDestinationPluginInterface;
use Drupal\data_pipelines\Entity\Destination;
use Drupal\data_pipelines\Entity\DestinationInterface;
use Elasticsearch\Client;
use Elasticsearch\Common\Exceptions\NoNodesAvailableException;
/**
* Elasticsearch Destination Trait.
*/
trait ElasticsearchTestDestinationTrait {
/**
* The destination entity.
*
* @var \Drupal\data_pipelines\Entity\DestinationInterface
*/
protected DestinationInterface $destination;
/**
* The destination plugin instance.
*
* @var \Drupal\data_pipelines\Destination\DatasetDestinationPluginInterface
*/
protected DatasetDestinationPluginInterface $destinationPlugin;
/**
* The Elasticsearch client.
*
* @var \Elasticsearch\Client|null
*/
protected ?Client $client;
/**
* Creates a test elasticsearch destination.
*
* @return \Drupal\data_pipelines\Entity\DestinationInterface
* The test destination.
*/
protected function createElasticsearchDestination(array $values = []): DestinationInterface {
$destinationId = $values['destinationId'] ?? $this->randomMachineName();
$destination = Destination::create($values + [
'id' => $destinationId,
'label' => 'Test destination',
'destination' => 'elasticsearch',
'destinationSettings' => [
'url' => '',
'username' => '',
'password' => '',
'prefix' => '',
],
]);
$destination->save();
return $destination;
}
/**
* Initializes the Elasticsearch client.
*
* @covers \Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination::getClient
*/
protected function initializeElasticClient():void {
$this->destination = $this->createElasticsearchDestination([
'destinationSettings' => [
'url' => getenv('ELASTICSEARCH_URL'),
'username' => 'elastic',
'password' => 'changeme',
'prefix' => 'foo_',
],
]);
$this->destinationPlugin = $this->destination->getDestinationPlugin();
$this->client = $this->destinationPlugin->getClient();
$this->assertInstanceOf(Client::class, $this->client);
try {
$this->client->ping();
}
catch (NoNodesAvailableException $exception) {
$this->markTestSkipped('The test is skipped as no alive nodes were found in the cluster.');
}
}
}
@@ -82,6 +82,13 @@ abstract class DatasetDestinationPluginBase extends PluginBase implements Datase
return TRUE;
}
/**
* {@inheritdoc}
*/
public function getCurrentCount(DatasetInterface $dataset): int {
return 0;
}
/**
* {@inheritdoc}
*/
......
@@ -54,6 +54,17 @@ interface DatasetDestinationPluginInterface extends PluginFormInterface, Configu
*/
public function deleteDataSet(DatasetInterface $dataset, DestinationInterface $destination): bool;
/**
* Gets the number of records currently stored for the dataset.
*
* @param \Drupal\data_pipelines\Entity\DatasetInterface $dataset
* The dataset.
*
* @return int
* The number of records currently stored in the destination.
*/
public function getCurrentCount(DatasetInterface $dataset): int;
/**
* Gets processing chunk size.
*
......
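For third-party destination plugins, the new getCurrentCount() method is the hook into the outdated-record clean-up: a plugin reports how many records it currently holds for the dataset. Below is a minimal sketch of an override, assuming a hypothetical in-memory destination; the my_module namespace, InMemoryDestination class name, and $records property are illustrative only, the DatasetDestinationPluginBase import path is assumed from the DatasetDestinationPluginInterface import visible in the test trait later in this diff, and the plugin annotation plus other inherited methods are omitted.
<?php
declare(strict_types=1);
namespace Drupal\my_module\Plugin\DatasetDestination;
use Drupal\data_pipelines\Destination\DatasetDestinationPluginBase;
use Drupal\data_pipelines\Entity\DatasetInterface;
/**
 * Hypothetical destination that keeps records in memory, keyed by dataset ID.
 */
class InMemoryDestination extends DatasetDestinationPluginBase {
  /**
   * Records keyed by dataset ID, then by delta.
   *
   * @var array
   */
  protected array $records = [];
  /**
   * {@inheritdoc}
   */
  public function getCurrentCount(DatasetInterface $dataset): int {
    // Report how many records are currently stored for this dataset so that
    // Dataset::queueProcessing() can queue a clean-up of outdated records.
    return count($this->records[$dataset->id()] ?? []);
  }
}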
@@ -179,7 +179,7 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
->setDescription(new TranslatableMarkup('Select the approach to be used for invalid values that were previously valid.'))
->setSetting('allowed_values', [
DatasetInterface::INVALID_RETAIN => new TranslatableMarkup('Retain records that were previously valid'),
DatasetInterface::INVALID_REMOVE => new TranslatableMarkup('Remove records that are now invalid'),
DatasetInterface::INVALID_REMOVE => new TranslatableMarkup('Remove records that are now invalid and outdated'),
])
->setRequired(TRUE)
->setInitialValue(DatasetInterface::INVALID_RETAIN)
@@ -270,17 +270,20 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
$queue->deleteQueue();
}
$destinations = [];
$original_size = [];
$chunk_sizes = [];
$buffers = [];
foreach ($this->getDestinations() as $destination_entity) {
assert($destination_entity instanceof DestinationInterface);
$destination_id = $destination_entity->id();
// Keep track of destinations so we can easily loop them by ID later.
// Keep track of destinations, so we can easily loop them by ID later.
$destinations[$destination_id] = $destination_entity;
// Get the original size so that we can remove outdated records later.
$original_size[$destination_id] = $destination_entity->getDestinationPlugin()->getCurrentCount($this);
// Store a keyed array of the chunk size each destination wants to store
// in each batch entry.
$chunk_sizes[$destination_id] = $destination_entity->getDestinationPlugin()->getProcessingChunkSize();
// Buffer dataset rows for each destination so we can create the queue
// Buffer dataset rows for each destination, so we can create the queue
// items with the appropriate number of rows (per the chunk size).
$buffers[$destination_id] = [];
if ($from === 0) {
@@ -289,7 +292,7 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
$queue->createItem(new ProcessingOperation(ProcessingOperationEnum::BEGIN, $destination_id));
}
}
$count = 0;
$total_count = $count = 0;
$iterator = $this->getDataIterator($from);
// Iterate over each row in the dataset.
foreach ($iterator as $delta => $transformed) {
@@ -331,10 +334,18 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
}
// Queue cleanup of any invalid items.
if ($this->getInvalidValuesHandling() === self::INVALID_REMOVE && $invalid_deltas = $iterator->getInvalidDeltas()) {
$total_count = $count + count($invalid_deltas);
$queue->createItem(new ProcessingOperation(ProcessingOperationEnum::PROCESS_CLEANUP, $destination_id, $invalid_deltas));
}
// We've also reached the final record, so we queue an 'end' operation.
if ($count < $limit) {
// After processing all the data, if the current count is less than the
// previously indexed count, clean up the remaining records.
if ($count < $original_size[$destination_id] && $this->getInvalidValuesHandling() === self::INVALID_REMOVE) {
// Index IDs start from 0, so removing from the current total count
// through to the end removes the outdated items.
$queue->createItem(new ProcessingOperation(ProcessingOperationEnum::PROCESS_CLEANUP, $destination_id, range($total_count, $original_size[$destination_id])));
}
$queue->createItem(new ProcessingOperation(ProcessingOperationEnum::END, $destination_id));
}
}
......
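Two consequences of the hunk above are worth spelling out. First, because DatasetDestinationPluginBase::getCurrentCount() defaults to 0, the $count < $original_size guard never fires for destinations that do not report a real count, so they never receive the extra clean-up item. Second, for destinations that do report a count, the clean-up deltas are simply the tail of the previous index. A small standalone sketch of that calculation follows; it is plain PHP, not part of the module, the function name is illustrative, and the INVALID_REMOVE handling check is left out for brevity.
<?php
declare(strict_types=1);
/**
 * Mirrors the outdated-record clean-up queued in Dataset::queueProcessing().
 *
 * @param int $originalSize
 *   Records previously indexed by the destination (deltas are 0-based).
 * @param int $count
 *   Valid rows queued in the current run.
 * @param int $totalCount
 *   Valid plus invalid rows in the current run, as computed where the
 *   invalid clean-up is queued.
 */
function outdatedCleanupDeltas(int $originalSize, int $count, int $totalCount): array {
  // Same guard as the commit: only clean up when fewer rows were queued this
  // run than the destination already holds.
  if ($count >= $originalSize) {
    return [];
  }
  // Same as range($total_count, $original_size) above; note that PHP's
  // range() is inclusive of both bounds.
  return range($totalCount, $originalSize);
}
// Previous run indexed 5 records; the new CSV yields 2 valid and 1 invalid
// row, so $count = 2 and $total_count = 3.
var_dump(outdatedCleanupDeltas(5, 2, 3)); // Deltas 3, 4, 5.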
@@ -122,6 +122,14 @@ class StateDestination extends DatasetDestinationPluginBase implements Container
];
}
/**
* {@inheritdoc}
*/
public function getCurrentCount(DatasetInterface $dataset): int {
$data = $this->getData();
return isset($data[$dataset->id()]) ? count($data[$dataset->id()]) : 0;
}
/**
* {@inheritdoc}
*/
......
@@ -8,7 +8,7 @@ use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\data_pipelines\Destination\ProcessingOperation;
use Drupal\data_pipelines\Destination\ProcessingOperationEnum;
use Drupal\data_pipelines\Entity\DatasetInterface;
use Drupal\data_pipelines\Plugin\QueueWorker\DestinationWorker;
use Drupal\Tests\data_pipelines\Traits\DatasetTestProcessingTrait;
/**
* Defines a class for testing queue indexing.
@@ -19,6 +19,7 @@ use Drupal\data_pipelines\Plugin\QueueWorker\DestinationWorker;
* @covers \Drupal\data_pipelines\Plugin\QueueWorker\DestinationWorker
*/
class DestinationQueueTest extends DatasetKernelTestBase {
use DatasetTestProcessingTrait;
/**
* Tests queue derivation.
@@ -48,32 +49,16 @@ class DestinationQueueTest extends DatasetKernelTestBase {
$destinationId = $this->randomMachineName();
$dataset = $this->createTestDataset(['destinationId' => $destinationId]);
$queue_id = $dataset->getProcessingQueueId();
$queue_manager = \Drupal::service('plugin.manager.queue_worker');
$process_queue = \Drupal::queue($queue_id);
$dataset->queueProcessing($process_queue);
// The state destination wants items one at a time.
$this->assertEquals(4, $process_queue->numberOfItems());
assert($queue_manager instanceof QueueWorkerManagerInterface);
$this->assertTrue($queue_manager->hasDefinition($queue_id));
$instance = $queue_manager->createInstance($queue_id);
assert($instance instanceof DestinationWorker);
$begin = $process_queue->claimItem()->data;
assert($begin instanceof ProcessingOperation);
$instance->processItem($begin);
$this->assertEquals(ProcessingOperationEnum::BEGIN, $begin->getOperation());
$this->assertEquals($destinationId, $begin->getDestinationId());
$next = $process_queue->claimItem()->data;
assert($next instanceof ProcessingOperation);
$this->assertEquals($destinationId, $next->getDestinationId());
$instance->processItem($next);
$next = $process_queue->claimItem()->data;
assert($next instanceof ProcessingOperation);
$this->assertEquals($destinationId, $next->getDestinationId());
$instance->processItem($next);
$end = $process_queue->claimItem()->data;
assert($end instanceof ProcessingOperation);
$this->assertEquals($destinationId, $end->getDestinationId());
$this->assertEquals(ProcessingOperationEnum::END, $end->getOperation());
$this->processQueue($queue_id, $destinationId, [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
$data = \Drupal::state()->get('data_pipelines_test');
$this->assertEquals([$dataset->id() => iterator_to_array($dataset->getDataIterator())], $data);
@@ -140,4 +125,74 @@
$this->assertEquals([0, 1], $cleanup->getProcessingData());
}
/**
* Tests outdated data removal.
*/
public function testOutdatedCleanup(): void {
$dataset = $this->createTestDataset([
'invalid_values' => DatasetInterface::INVALID_REMOVE,
]);
$destination = $dataset->getDestinations()[0];
$destination_plugin = $destination->getDestinationPlugin();
$this->assertEquals(0, $destination_plugin->getCurrentCount($dataset));
$queue_id = $dataset->getProcessingQueueId();
/** @var \Drupal\Core\Queue\QueueWorkerInterface $queue_worker */
$process_queue = \Drupal::queue($queue_id);
$dataset->queueProcessing($process_queue);
// The state destination wants items one at a time.
$this->assertEquals(4, $process_queue->numberOfItems());
$this->processQueue($queue_id, $destination->id(), [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
$this->assertEquals(2, $destination_plugin->getCurrentCount($dataset));
// Add some more data to the dataset, including some invalid data.
$csv = <<<CSV
should_we,firstname,lastname
Y,joe,bloggs
N,betty,bloggs
N,li,bloggs
Y,nick,bloggs
N,nora,bloggs
CSV;
$dataset->csv_text->value = $csv;
$dataset->save();
$dataset->queueProcessing($process_queue);
// The state destination wants items one at a time. Item 3 is invalid.
$this->assertEquals(7, $process_queue->numberOfItems());
$this->processQueue($queue_id, $destination->id(), [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::PROCESS_CLEANUP,
ProcessingOperationEnum::END,
]);
$this->assertEquals(4, $destination_plugin->getCurrentCount($dataset));
// Remove some data from the dataset.
$csv = <<<CSV
should_we,firstname,lastname
N,li,bloggs
Y,nick,bloggs
N,nora,bloggs
CSV;
$dataset->csv_text->value = $csv;
$dataset->save();
$dataset->queueProcessing($process_queue);
// The state destination wants items one at a time. Item 1 is valid.
$this->assertEquals(6, $process_queue->numberOfItems());
$this->processQueue($queue_id, $destination->id(), [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::PROCESS_CLEANUP,
ProcessingOperationEnum::PROCESS_CLEANUP,
ProcessingOperationEnum::END,
]);
$this->assertEquals(2, $destination_plugin->getCurrentCount($dataset));
}
}
<?php
declare(strict_types=1);
namespace Drupal\Tests\data_pipelines\Traits;
use Drupal\data_pipelines\Destination\ProcessingOperation;
/**
* Dataset Processing Trait.
*/
trait DatasetTestProcessingTrait {
/**
* Processes the data pipeline queue.
*
* @param string $queue_id
* The queue ID.
* @param string $destination_id
* The destination ID.
* @param array $operations
* The array of all the queued operations.
*/
protected function processQueue(string $queue_id, string $destination_id, array $operations): void {
/** @var \Drupal\Core\Queue\QueueWorkerInterface $queue_worker */
$queue_worker = \Drupal::service('plugin.manager.queue_worker')->createInstance($queue_id);
$process_queue = \Drupal::queue($queue_id);
while ($process_queue->numberOfItems()) {
$item = $process_queue->claimItem();
$data = $item->data;
assert($data instanceof ProcessingOperation);
$this->assertEquals(current($operations), $data->getOperation());
$this->assertEquals($destination_id, $data->getDestinationId());
next($operations);
$queue_worker->processItem($data);
$process_queue->deleteItem($item);
}
}
/**
* Processes the data pipeline queue, then waits for the search backend to catch up.
*
* @param string $queue_id
* The queue ID.
* @param string $destination_id
* The destination ID.
* @param array $operations
* The array of all the queued operations.
*/
protected function processQueueWithWait(string $queue_id, string $destination_id, array $operations): void {
$this->processQueue($queue_id, $destination_id, $operations);
// Give the Elasticsearch/OpenSearch container time to finish indexing.
sleep(1);
}
}