Skip to content
Snippets Groups Projects
Commit 5c85a925 authored by Jibran Ijaz's avatar Jibran Ijaz
Browse files

Issue #3406111 by jibran, mortim07: Allow unpublishing a pipeline

parent e848db21
No related branches found
Tags 1.0.0-alpha24
1 merge request!19Issue #3406111 by jibran: Allow unpublishing a pipeline
Pipeline #78418 passed
......@@ -334,3 +334,35 @@ function data_pipelines_update_8016(): void {
$entity_definition_update_manager->updateFieldStorageDefinition($field_storage_definition);
}
/**
* Install published field.
*/
function data_pipelines_update_8017(): void {
$entity_definition_update_manager = \Drupal::entityDefinitionUpdateManager();
// Update the entity keys.
$entity_type = $entity_definition_update_manager->getEntityType('data_pipelines');
$keys = $entity_type->getKeys();
$keys['published'] = 'published';
$entity_type->set('entity_keys', $keys);
$entity_definition_update_manager->updateEntityType($entity_type);
// Update the field storage.
$field_storage_definition = BaseFieldDefinition::create('boolean')
->setLabel(new TranslatableMarkup('Published'))
->setDescription(new TranslatableMarkup('Unpublishing the pipeline will remove all the destination data.'))
->setRevisionable(FALSE)
->setTranslatable(FALSE)
->setDefaultValue(TRUE)
->setInitialValue(TRUE)
->setDisplayOptions('view', [
'type' => 'boolean',
'format' => 'yes-no',
'weight' => -4,
])
->setDisplayOptions('form', [
'type' => 'options_select',
'weight' => 50,
]);
$entity_definition_update_manager->installFieldStorageDefinition('published', 'data_pipelines', 'data_pipelines', $field_storage_definition);
}
......@@ -70,6 +70,30 @@ class ElasticsearchDestinationQueueTest extends DatasetKernelTestBase {
]);
$this->assertTrue($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()]));
$this->assertEquals(2, $this->destinationPlugin->getCurrentCount($this->dataset));
// Set unpublished.
$this->dataset->setUnpublished();
$this->dataset->save();
$this->assertFalse($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()]));
// Set published.
$this->dataset->setPublished();
$this->dataset->save();
$this->dataset->queueProcessing($process_queue);
// The ES destination index items in bulk so only one process chunk.
$this->assertEquals(3, $process_queue->numberOfItems());
$this->processQueueWithWait($queue_id, $this->destination->id(), [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
$this->assertTrue($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()]));
$this->assertEquals(2, $this->destinationPlugin->getCurrentCount($this->dataset));
// Clean up.
$this->dataset->delete();
$this->assertFalse($this->client->indices()->exists(['index' => 'foo_' . $this->dataset->getMachineName()]));
}
/**
......
......@@ -5,6 +5,7 @@ declare(strict_types=1);
namespace Drupal\data_pipelines\Entity;
use Drupal\Core\Entity\ContentEntityBase;
use Drupal\Core\Entity\EntityPublishedTrait;
use Drupal\Core\Entity\EntityStorageInterface;
use Drupal\Core\Entity\EntityTypeInterface;
use Drupal\Core\Field\BaseFieldDefinition;
......@@ -47,6 +48,7 @@ use Drupal\data_pipelines\TransformValidDataIterator;
* "bundle" = "source",
* "uuid" = "uuid",
* "label" = "name",
* "published" = "published",
* },
* handlers = {
* "storage" = \Drupal\data_pipelines\EntityHandlers\DatasetStorage::class,
......@@ -77,11 +79,14 @@ use Drupal\data_pipelines\TransformValidDataIterator;
*/
class Dataset extends ContentEntityBase implements DatasetInterface {
use EntityPublishedTrait;
/**
* {@inheritdoc}
*/
public static function baseFieldDefinitions(EntityTypeInterface $entity_type): array {
$fields = parent::baseFieldDefinitions($entity_type);
$fields['name'] = BaseFieldDefinition::create('string')
->setLabel(new TranslatableMarkup('Name'))
->setRequired(TRUE)
......@@ -115,6 +120,25 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
->setDisplayOptions('form', [
'type' => 'options_select',
]);
// Add the published field.
$fields += static::publishedBaseFieldDefinitions($entity_type);
$fields['published']
->setDescription(new TranslatableMarkup('Unpublishing the pipeline will remove all the destination data.'))
->setRevisionable(FALSE)
->setTranslatable(FALSE)
->setInitialValue(TRUE)
->setDisplayOptions('view', [
'type' => 'boolean',
'format' => 'yes-no',
'label' => 'above',
'weight' => -4,
])
->setDisplayOptions('form', [
'type' => 'options_select',
'weight' => 50,
]);
$fields['status'] = BaseFieldDefinition::create('list_string')
->setLabel(new TranslatableMarkup('Status'))
->setDefaultValue(self::STATUS_INITIAL)
......@@ -131,6 +155,7 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
'weight' => -4,
])
->setRequired(TRUE);
$fields['log'] = BaseFieldDefinition::create('string_long')
->setLabel(new TranslatableMarkup('Log'))
->setDescription(new TranslatableMarkup('Log entries.'))
......@@ -266,6 +291,9 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
* {@inheritdoc}
*/
public function queueProcessing(QueueInterface $queue, int $from = 0, int $limit = 1000): int {
if (!$this->isPublished()) {
return 0;
}
if ($from === 0) {
$queue->deleteQueue();
}
......@@ -528,6 +556,11 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
*/
public function postSave(EntityStorageInterface $storage, $update = TRUE) {
parent::postSave($storage, $update);
if (!$this->isPublished()) {
foreach ($this->getDestinations() as $destination) {
$destination->getDestinationPlugin()->deleteDataSet($this, $destination);
}
}
\Drupal::service('plugin.manager.queue_worker')->clearCachedDefinitions();
}
......
......@@ -5,6 +5,7 @@ declare(strict_types=1);
namespace Drupal\data_pipelines\Entity;
use Drupal\Core\Entity\ContentEntityInterface;
use Drupal\Core\Entity\EntityPublishedInterface;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\StringTranslation\TranslatableMarkup;
use Drupal\data_pipelines\TransformValidDataIterator;
......@@ -12,7 +13,7 @@ use Drupal\data_pipelines\TransformValidDataIterator;
/**
* Defines an interface for dataset entities.
*/
interface DatasetInterface extends ContentEntityInterface {
interface DatasetInterface extends ContentEntityInterface, EntityPublishedInterface {
const STATUS_PENDING_VALIDATION = 'pending_validation';
const STATUS_PENDING_PROCESSING = 'pending_processing';
......
......@@ -69,6 +69,9 @@ class DatasetBatchOperations implements ContainerInjectionInterface {
* Batch.
*/
public static function batchForDataset(DatasetInterface $dataset, bool $saving = FALSE): array {
if (!$dataset->isPublished()) {
return [];
}
$batch_builder = new BatchBuilder();
$batch_builder
->setTitle(new TranslatableMarkup('Validating and indexing dataset %name', [
......
......@@ -152,7 +152,14 @@ class DatasetForm extends ContentEntityForm {
$result = $this->save($form, $form_state);
$dataset = $this->entity;
assert($dataset instanceof DatasetInterface);
batch_set(DatasetBatchOperations::batchForDataset($dataset));
if ($dataset->isPublished()) {
batch_set(DatasetBatchOperations::batchForDataset($dataset));
$form_state->setRedirectUrl($dataset->toUrl('collection'));
}
else {
$this->messenger()->addWarning($this->t('Unpublished dataset cannot be indexed.'));
$form_state->setRedirectUrl($dataset->toUrl('edit'));
}
return $result;
}
......
......@@ -39,8 +39,14 @@ class DatasetProcessForm extends ContentEntityConfirmFormBase {
parent::submitForm($form, $form_state);
$dataset = $this->entity;
assert($dataset instanceof DatasetInterface);
batch_set(DatasetBatchOperations::batchForDataset($dataset));
$form_state->setRedirect('entity.data_pipelines.collection');
if ($dataset->isPublished()) {
batch_set(DatasetBatchOperations::batchForDataset($dataset));
$form_state->setRedirectUrl($dataset->toUrl('collection'));
}
else {
$this->messenger()->addWarning($this->t('Unpublished dataset cannot be indexed.'));
$form_state->setRedirectUrl($dataset->toUrl('process'));
}
}
}
......@@ -69,6 +69,28 @@ class DestinationQueueTest extends DatasetKernelTestBase {
$this->assertTrue($expected['should_we']);
$this->assertEquals('bloggs, joe', $expected['full_name']);
// Set unpublished.
$dataset->setUnpublished();
$dataset->save();
$data = \Drupal::state()->get('data_pipelines_test');
$this->assertEmpty($data);
// Set published.
$dataset->setPublished();
$dataset->save();
$dataset->queueProcessing($process_queue);
// The state destination wants items one at a time.
$this->assertEquals(4, $process_queue->numberOfItems());
$this->processQueue($queue_id, $destinationId, [
ProcessingOperationEnum::BEGIN,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::PROCESS_CHUNK,
ProcessingOperationEnum::END,
]);
$data = \Drupal::state()->get('data_pipelines_test');
$this->assertEquals([$dataset->id() => iterator_to_array($dataset->getDataIterator())], $data);
// Clean up.
$dataset->delete();
$data = \Drupal::state()->get('data_pipelines_test');
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment