Loading src/DatasetPipelinePluginManager.php +5 −1 Original line number Diff line number Diff line Loading @@ -47,7 +47,11 @@ final class DatasetPipelinePluginManager extends DefaultPluginManager { */ public function processDefinition(&$definition, $plugin_id) { parent::processDefinition($definition, $plugin_id); $definition += ['validations' => [], 'transforms' => []]; $definition += [ 'validations' => [], 'transforms' => [], 'mappings' => [], ]; $definition['validations'] += [ 'record' => [], 'field' => [], Loading src/Index/DatasetElasticsearchIndexer.php +21 −7 Original line number Diff line number Diff line Loading @@ -9,6 +9,7 @@ use Drupal\Core\Config\ImmutableConfig; use Drupal\Core\Entity\EntityTypeManagerInterface; use Drupal\Core\Logger\LoggerChannelInterface; use Drupal\data_pipelines\DatasetData; use Drupal\data_pipelines\DatasetPipelinePluginManager; use Drupal\data_pipelines\Entity\DatasetInterface; use Drupal\elasticsearch_connector\ClusterManager; use Drupal\elasticsearch_connector\ElasticSearch\ClientManager; Loading Loading @@ -90,12 +91,20 @@ final class DatasetElasticsearchIndexer implements DatasetIndexerInterface { * {@inheritdoc} */ public function indexDataSet(DatasetInterface $dataset): bool { try { $mappings = []; $pipeline_plugin_id = $dataset->getPipelineId(); $pipeline_manager = \Drupal::service('plugin.manager.data_pipelines_pipeline'); assert($pipeline_manager instanceof DatasetPipelinePluginManager); if ($pipeline_manager->hasDefinition($pipeline_plugin_id)) { $pipeline_definition = $pipeline_manager->getDefinition($pipeline_plugin_id); $mappings = $pipeline_definition['mappings'] ?? []; } $index_params = []; $index_params_base = []; $index_name = $this->config->get('index_name_prefix') . $dataset->getMachineName(); $this->createIndex($index_name); $index_params_base['index'] = $index_name; try { $this->createIndex($index_name, $mappings); foreach ($dataset->data() as $delta => $data) { $index_params['id'] = $dataset->getMachineName() . ':' . $delta; assert($data instanceof DatasetData); Loading @@ -118,11 +127,13 @@ final class DatasetElasticsearchIndexer implements DatasetIndexerInterface { * * @param string $name * The name of the index. * @param array $mappings * The mapping information of the index. * * @throws \Drupal\data_pipelines\Index\CouldNotCreateIndexException * When the index can't be created. */ protected function createIndex(string $name): void { protected function createIndex(string $name, array $mappings): void { try { $index_params = []; $index_params_base = []; Loading @@ -134,6 +145,9 @@ final class DatasetElasticsearchIndexer implements DatasetIndexerInterface { 'codec' => self::INDEX_CODEC, ], ]; if (!empty($mappings)) { $index_params['body']['mappings'] = $mappings; } if ($this->client->indices()->exists($index_params_base)) { return; } Loading tests/modules/data_pipelines_test/data_pipelines_test.data_pipelines.yml +42 −0 Original line number Diff line number Diff line Loading @@ -46,3 +46,45 @@ invalid_pipeline_2: - plugin: concat record: - plugin: map test_pipeline_2: label: 'Test pipeline 2' mappings: properties: firstname: type: text index: false lastname: type: text index: false full_name: type: text transforms: field: should_we: - plugin: map map: Y: true N: false record: - plugin: concat fields: - lastname - firstname as: full_name separator: ', ' - plugin: remove fields: - lastname - firstname validations: record: ItemCount: expectedCount: 3 field: firstname: NotBlank: message: 'Firstname is required' Length: min: 3 minMessage: 'Firstname should be at least 3 characters' tests/src/Kernel/DatasetEntityTest.php +1 −0 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ class DatasetEntityTest extends DatasetKernelTestBase { 'test_pipeline_1' => 'Test pipeline 1', 'invalid_pipeline_1' => 'Invalid pipeline 1', 'invalid_pipeline_2' => 'Pipeline with record transform for fields and view versa', 'test_pipeline_2' => 'Test pipeline 2', ], Dataset::getPipelines($dataset->getFieldDefinition('pipeline'), $dataset)); } Loading tests/src/Kernel/Index/DatasetElasticsearchIndexerTest.php +42 −0 Original line number Diff line number Diff line Loading @@ -67,6 +67,48 @@ class DatasetElasticsearchIndexerTest extends DatasetKernelTestBase { $indexer->indexDataSet($dataset); } /** * Tests index creation with mapping. */ public function testIndexCreationWithMapping() { $indices = $this->prophesize(IndicesNamespace::class); $indices->exists(Argument::any())->willReturn(FALSE); $dataset = $this->createTestDataset(['pipeline' => 'test_pipeline_2']); \Drupal::configFactory()->getEditable('data_pipelines.settings')->set('index_name_prefix', 'foo_')->save(); $indices->create([ 'index' => 'foo_' . $dataset->getMachineName(), 'body' => [ 'settings' => [ 'number_of_shards' => DatasetElasticsearchIndexer::INDEX_SHARDS, 'number_of_replicas' => DatasetElasticsearchIndexer::INDEX_REPLICAS, 'codec' => DatasetElasticsearchIndexer::INDEX_CODEC, ], 'mappings' => [ 'properties' => [ 'firstname' => [ 'type' => 'text', 'index' => FALSE, ], 'lastname' => [ 'type' => 'text', 'index' => FALSE, ], 'full_name' => [ 'type' => 'text', ], ], ], ], ])->shouldBeCalled(); $client = $this->prophesize(Client::class); $client->indices()->willReturn($indices->reveal()); $client->CheckResponseAck(Argument::any())->willReturn(TRUE); $indexer = new DatasetElasticsearchIndexer($client->reveal(), $this->prophesize(LoggerChannelInterface::class)->reveal(), \Drupal::config('data_pipelines.settings')); $indexer->indexDataSet($dataset); } /** * Tests index not created if exists. */ Loading Loading
src/DatasetPipelinePluginManager.php +5 −1 Original line number Diff line number Diff line Loading @@ -47,7 +47,11 @@ final class DatasetPipelinePluginManager extends DefaultPluginManager { */ public function processDefinition(&$definition, $plugin_id) { parent::processDefinition($definition, $plugin_id); $definition += ['validations' => [], 'transforms' => []]; $definition += [ 'validations' => [], 'transforms' => [], 'mappings' => [], ]; $definition['validations'] += [ 'record' => [], 'field' => [], Loading
src/Index/DatasetElasticsearchIndexer.php +21 −7 Original line number Diff line number Diff line Loading @@ -9,6 +9,7 @@ use Drupal\Core\Config\ImmutableConfig; use Drupal\Core\Entity\EntityTypeManagerInterface; use Drupal\Core\Logger\LoggerChannelInterface; use Drupal\data_pipelines\DatasetData; use Drupal\data_pipelines\DatasetPipelinePluginManager; use Drupal\data_pipelines\Entity\DatasetInterface; use Drupal\elasticsearch_connector\ClusterManager; use Drupal\elasticsearch_connector\ElasticSearch\ClientManager; Loading Loading @@ -90,12 +91,20 @@ final class DatasetElasticsearchIndexer implements DatasetIndexerInterface { * {@inheritdoc} */ public function indexDataSet(DatasetInterface $dataset): bool { try { $mappings = []; $pipeline_plugin_id = $dataset->getPipelineId(); $pipeline_manager = \Drupal::service('plugin.manager.data_pipelines_pipeline'); assert($pipeline_manager instanceof DatasetPipelinePluginManager); if ($pipeline_manager->hasDefinition($pipeline_plugin_id)) { $pipeline_definition = $pipeline_manager->getDefinition($pipeline_plugin_id); $mappings = $pipeline_definition['mappings'] ?? []; } $index_params = []; $index_params_base = []; $index_name = $this->config->get('index_name_prefix') . $dataset->getMachineName(); $this->createIndex($index_name); $index_params_base['index'] = $index_name; try { $this->createIndex($index_name, $mappings); foreach ($dataset->data() as $delta => $data) { $index_params['id'] = $dataset->getMachineName() . ':' . $delta; assert($data instanceof DatasetData); Loading @@ -118,11 +127,13 @@ final class DatasetElasticsearchIndexer implements DatasetIndexerInterface { * * @param string $name * The name of the index. * @param array $mappings * The mapping information of the index. * * @throws \Drupal\data_pipelines\Index\CouldNotCreateIndexException * When the index can't be created. */ protected function createIndex(string $name): void { protected function createIndex(string $name, array $mappings): void { try { $index_params = []; $index_params_base = []; Loading @@ -134,6 +145,9 @@ final class DatasetElasticsearchIndexer implements DatasetIndexerInterface { 'codec' => self::INDEX_CODEC, ], ]; if (!empty($mappings)) { $index_params['body']['mappings'] = $mappings; } if ($this->client->indices()->exists($index_params_base)) { return; } Loading
tests/modules/data_pipelines_test/data_pipelines_test.data_pipelines.yml +42 −0 Original line number Diff line number Diff line Loading @@ -46,3 +46,45 @@ invalid_pipeline_2: - plugin: concat record: - plugin: map test_pipeline_2: label: 'Test pipeline 2' mappings: properties: firstname: type: text index: false lastname: type: text index: false full_name: type: text transforms: field: should_we: - plugin: map map: Y: true N: false record: - plugin: concat fields: - lastname - firstname as: full_name separator: ', ' - plugin: remove fields: - lastname - firstname validations: record: ItemCount: expectedCount: 3 field: firstname: NotBlank: message: 'Firstname is required' Length: min: 3 minMessage: 'Firstname should be at least 3 characters'
tests/src/Kernel/DatasetEntityTest.php +1 −0 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ class DatasetEntityTest extends DatasetKernelTestBase { 'test_pipeline_1' => 'Test pipeline 1', 'invalid_pipeline_1' => 'Invalid pipeline 1', 'invalid_pipeline_2' => 'Pipeline with record transform for fields and view versa', 'test_pipeline_2' => 'Test pipeline 2', ], Dataset::getPipelines($dataset->getFieldDefinition('pipeline'), $dataset)); } Loading
tests/src/Kernel/Index/DatasetElasticsearchIndexerTest.php +42 −0 Original line number Diff line number Diff line Loading @@ -67,6 +67,48 @@ class DatasetElasticsearchIndexerTest extends DatasetKernelTestBase { $indexer->indexDataSet($dataset); } /** * Tests index creation with mapping. */ public function testIndexCreationWithMapping() { $indices = $this->prophesize(IndicesNamespace::class); $indices->exists(Argument::any())->willReturn(FALSE); $dataset = $this->createTestDataset(['pipeline' => 'test_pipeline_2']); \Drupal::configFactory()->getEditable('data_pipelines.settings')->set('index_name_prefix', 'foo_')->save(); $indices->create([ 'index' => 'foo_' . $dataset->getMachineName(), 'body' => [ 'settings' => [ 'number_of_shards' => DatasetElasticsearchIndexer::INDEX_SHARDS, 'number_of_replicas' => DatasetElasticsearchIndexer::INDEX_REPLICAS, 'codec' => DatasetElasticsearchIndexer::INDEX_CODEC, ], 'mappings' => [ 'properties' => [ 'firstname' => [ 'type' => 'text', 'index' => FALSE, ], 'lastname' => [ 'type' => 'text', 'index' => FALSE, ], 'full_name' => [ 'type' => 'text', ], ], ], ], ])->shouldBeCalled(); $client = $this->prophesize(Client::class); $client->indices()->willReturn($indices->reveal()); $client->CheckResponseAck(Argument::any())->willReturn(TRUE); $indexer = new DatasetElasticsearchIndexer($client->reveal(), $this->prophesize(LoggerChannelInterface::class)->reveal(), \Drupal::config('data_pipelines.settings')); $indexer->indexDataSet($dataset); } /** * Tests index not created if exists. */ Loading