Commit eda4f056 authored by Jibran Ijaz's avatar Jibran Ijaz Committed by Adam Bramley
Browse files

Issue #3257923 by jibran: Queue the items before re-index

parent 6b1bed21
Loading
Loading
Loading
Loading
+44 −5
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@ use Drupal\Core\Queue\RequeueException;
use Drupal\Core\StringTranslation\TranslatableMarkup;
use Drupal\data_pipelines\Entity\Dataset;
use Drupal\data_pipelines\Entity\DatasetInterface;
use Drupal\data_pipelines\Index\IndexingRequest;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\Validator\ConstraintViolation;

@@ -101,12 +102,15 @@ class DatasetBatchOperations implements ContainerInjectionInterface {
   *
   * @param \Drupal\data_pipelines\Entity\DatasetInterface $dataset
   *   Dataset.
   * @param bool $saving
   *   TRUE if Dataset is being saved FALSE otherwise.
   *
   * @return array
   *   Batch.
   */
  public static function batchForDataset(DatasetInterface $dataset): array {
    return (new BatchBuilder())
  public static function batchForDataset(DatasetInterface $dataset, bool $saving = FALSE): array {
    $batch_builder = new BatchBuilder();
    $batch_builder
      ->setTitle(new TranslatableMarkup('Validating and indexing dataset %name', [
        '%name' => $dataset->label(),
      ]))
@@ -114,9 +118,17 @@ class DatasetBatchOperations implements ContainerInjectionInterface {
      ->setProgressive(TRUE)
      ->setFinishCallback([self::class, 'finished'])
      ->setProgressMessage(new TranslatableMarkup('Step @current of @total'))
      ->addOperation([self::class, 'operationValidate'], [$dataset->id()])
      ->addOperation([self::class, 'operationIndex'], [$dataset->id()])
      ->toArray();
      ->addOperation([self::class, 'operationValidate'], [$dataset->id()]);
    // After saving the Dataset, it is already queued in
    // \Drupal\data_pipelines\EntityHandlers\DatasetStorage::save() so we can
    // skip this step here.
    if (!$saving) {
      $batch_builder
        ->addOperation([self::class, 'operationQueueItem'], [$dataset->id()]);
    }
    $batch_builder
      ->addOperation([self::class, 'operationIndex'], [$dataset->id()]);
    return $batch_builder->toArray();
  }

  /**
@@ -141,6 +153,33 @@ class DatasetBatchOperations implements ContainerInjectionInterface {
    $context['message'] = new TranslatableMarkup('Validation complete');
  }

  /**
   * Batch callback.
   */
  public static function operationQueueItem(int $dataset_id, &$context): void {
    $dataset = Dataset::load($dataset_id);
    assert($dataset instanceof DatasetInterface);
    self::factory()->queue($dataset);
    $context['message'] = new TranslatableMarkup('Data is successfully queued for indexing.');
  }

  /**
   * Queue a dataset for indexing.
   *
   * @param \Drupal\data_pipelines\Entity\DatasetInterface $dataset
   *   Dataset.
   *
   * @return int
   *   Number items.
   */
  protected function queue(DatasetInterface $dataset): int {
    $queue = $this->queueFactory->get('data_pipelines_index:' . $dataset->id());
    // Clear any previous operations.
    $queue->deleteQueue();
    $queue->createItem(IndexingRequest::forDataSet($dataset));
    return $queue->numberOfItems();
  }

  /**
   * Index a dataset.
   *
+1 −1
Original line number Diff line number Diff line
@@ -80,7 +80,7 @@ class DatasetForm extends ContentEntityForm {
        ]);
        break;
    }
    batch_set(DatasetBatchOperations::batchForDataset($dataset));
    batch_set(DatasetBatchOperations::batchForDataset($dataset, TRUE));
    $form_state->setRedirectUrl($dataset->toUrl('collection'));
    return $result;
  }
+19 −0
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ class DatasetBatchOperationsTest extends DatasetKernelTestBase {
    $batch = DatasetBatchOperations::batchForDataset($dataset);
    $this->assertEquals([
      [[DatasetBatchOperations::class, 'operationValidate'], [$dataset->id()]],
      [[DatasetBatchOperations::class, 'operationQueueItem'], [$dataset->id()]],
      [[DatasetBatchOperations::class, 'operationIndex'], [$dataset->id()]],
    ], $batch['operations']);
  }
@@ -70,6 +71,24 @@ CSV;
    $this->assertEquals('Skipped indexing dataset, validation errors exist.', (string) $context['message']);
  }

  /**
   * Tests queue operation.
   */
  public function testQueuingValid(): void {
    $dataset = $this->createTestDataset([
      'status' => DatasetInterface::STATUS_PENDING_INDEX,
    ]);
    $queue = \Drupal::queue('data_pipelines_index:' . $dataset->id());
    // Clear any previous operations.
    $queue->deleteQueue();
    $context = [
      'results' => [],
    ];
    DatasetBatchOperations::operationQueueItem((int) $dataset->id(), $context);
    $this->assertEquals('Data is successfully queued for indexing.', (string) $context['message']);
    $this->assertEquals(1, $queue->numberOfItems());
  }

  /**
   * Tests index operation.
   */