Verified Commit 5436d804 authored by Jibran Ijaz's avatar Jibran Ijaz Committed by Jibran Ijaz
Browse files

Issue #3263017 by jibran: Allow adding explicit field mapping support to the elastic index

parent 77f8c998
Loading
Loading
Loading
Loading
+5 −1
Original line number Diff line number Diff line
@@ -47,7 +47,11 @@ final class DatasetPipelinePluginManager extends DefaultPluginManager {
   */
  public function processDefinition(&$definition, $plugin_id) {
    parent::processDefinition($definition, $plugin_id);
    $definition += ['validations' => [], 'transforms' => []];
    $definition += [
      'validations' => [],
      'transforms' => [],
      'mappings' => [],
    ];
    $definition['validations'] += [
      'record' => [],
      'field' => [],
+21 −7
Original line number Diff line number Diff line
@@ -9,6 +9,7 @@ use Drupal\Core\Config\ImmutableConfig;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Logger\LoggerChannelInterface;
use Drupal\data_pipelines\DatasetData;
use Drupal\data_pipelines\DatasetPipelinePluginManager;
use Drupal\data_pipelines\Entity\DatasetInterface;
use Drupal\elasticsearch_connector\ClusterManager;
use Drupal\elasticsearch_connector\ElasticSearch\ClientManager;
@@ -90,12 +91,20 @@ final class DatasetElasticsearchIndexer implements DatasetIndexerInterface {
   * {@inheritdoc}
   */
  public function indexDataSet(DatasetInterface $dataset): bool {
    try {
    $mappings = [];
    $pipeline_plugin_id = $dataset->getPipelineId();
    $pipeline_manager = \Drupal::service('plugin.manager.data_pipelines_pipeline');
    assert($pipeline_manager instanceof DatasetPipelinePluginManager);
    if ($pipeline_manager->hasDefinition($pipeline_plugin_id)) {
      $pipeline_definition = $pipeline_manager->getDefinition($pipeline_plugin_id);
      $mappings = $pipeline_definition['mappings'] ?? [];
    }
    $index_params = [];
    $index_params_base = [];
    $index_name = $this->config->get('index_name_prefix') . $dataset->getMachineName();
      $this->createIndex($index_name);
    $index_params_base['index'] = $index_name;
    try {
      $this->createIndex($index_name, $mappings);
      foreach ($dataset->data() as $delta => $data) {
        $index_params['id'] = $dataset->getMachineName() . ':' . $delta;
        assert($data instanceof DatasetData);
@@ -118,11 +127,13 @@ final class DatasetElasticsearchIndexer implements DatasetIndexerInterface {
   *
   * @param string $name
   *   The name of the index.
   * @param array $mappings
   *   The mapping information of the index.
   *
   * @throws \Drupal\data_pipelines\Index\CouldNotCreateIndexException
   *   When the index can't be created.
   */
  protected function createIndex(string $name): void {
  protected function createIndex(string $name, array $mappings): void {
    try {
      $index_params = [];
      $index_params_base = [];
@@ -134,6 +145,9 @@ final class DatasetElasticsearchIndexer implements DatasetIndexerInterface {
          'codec' => self::INDEX_CODEC,
        ],
      ];
      if (!empty($mappings)) {
        $index_params['body']['mappings'] = $mappings;
      }
      if ($this->client->indices()->exists($index_params_base)) {
        return;
      }
+42 −0
Original line number Diff line number Diff line
@@ -46,3 +46,45 @@ invalid_pipeline_2:
        - plugin: concat
    record:
      - plugin: map

test_pipeline_2:
  label: 'Test pipeline 2'
  mappings:
    properties:
      firstname:
        type: text
        index: false
      lastname:
        type: text
        index: false
      full_name:
        type: text
  transforms:
    field:
      should_we:
        - plugin: map
          map:
            Y: true
            N: false
    record:
      - plugin: concat
        fields:
          - lastname
          - firstname
        as: full_name
        separator: ', '
      - plugin: remove
        fields:
          - lastname
          - firstname
  validations:
    record:
      ItemCount:
        expectedCount: 3
    field:
      firstname:
        NotBlank:
          message: 'Firstname is required'
        Length:
          min: 3
          minMessage: 'Firstname should be at least 3 characters'
+1 −0
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@ class DatasetEntityTest extends DatasetKernelTestBase {
      'test_pipeline_1' => 'Test pipeline 1',
      'invalid_pipeline_1' => 'Invalid pipeline 1',
      'invalid_pipeline_2' => 'Pipeline with record transform for fields and view versa',
      'test_pipeline_2' => 'Test pipeline 2',
    ], Dataset::getPipelines($dataset->getFieldDefinition('pipeline'), $dataset));
  }

+42 −0
Original line number Diff line number Diff line
@@ -67,6 +67,48 @@ class DatasetElasticsearchIndexerTest extends DatasetKernelTestBase {
    $indexer->indexDataSet($dataset);
  }

  /**
   * Tests index creation with mapping.
   */
  public function testIndexCreationWithMapping() {
    $indices = $this->prophesize(IndicesNamespace::class);
    $indices->exists(Argument::any())->willReturn(FALSE);
    $dataset = $this->createTestDataset(['pipeline' => 'test_pipeline_2']);

    \Drupal::configFactory()->getEditable('data_pipelines.settings')->set('index_name_prefix', 'foo_')->save();
    $indices->create([
      'index' => 'foo_' . $dataset->getMachineName(),
      'body' => [
        'settings' => [
          'number_of_shards' => DatasetElasticsearchIndexer::INDEX_SHARDS,
          'number_of_replicas' => DatasetElasticsearchIndexer::INDEX_REPLICAS,
          'codec' => DatasetElasticsearchIndexer::INDEX_CODEC,
        ],
        'mappings' => [
          'properties' => [
            'firstname' => [
              'type' => 'text',
              'index' => FALSE,
            ],
            'lastname' => [
              'type' => 'text',
              'index' => FALSE,
            ],
            'full_name' => [
              'type' => 'text',
            ],
          ],
        ],
      ],
    ])->shouldBeCalled();
    $client = $this->prophesize(Client::class);
    $client->indices()->willReturn($indices->reveal());
    $client->CheckResponseAck(Argument::any())->willReturn(TRUE);

    $indexer = new DatasetElasticsearchIndexer($client->reveal(), $this->prophesize(LoggerChannelInterface::class)->reveal(), \Drupal::config('data_pipelines.settings'));
    $indexer->indexDataSet($dataset);
  }

  /**
   * Tests index not created if exists.
   */