
Data Pipelines

Key concepts

  • DataSet - content entity that stores the association between a source plugin and a pipeline.
  • Pipeline - handles validation and transformation for indexing. Made up of validation and transform plugins.
  • Validation - handled by core's validation API.
  • Transforms - plugins that either handle transforming an entire record or a single field. Very similar to migrate process plugins.

Source plugins

Source plugins form the bundle of the dataset content entity. You can add a new source plugin just as you would any other plugin. A source plugin needs to implement two methods: extractDataFromDataSet and buildFieldDefinitions. The field definition should return the field used to store the data, e.g. for the CSV File source this is a file field. The extract data method then handles getting the data out of that field.
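As an illustration, a new source plugin might look roughly like the sketch below. The namespace, annotation name, base class, and method signatures are assumptions made for illustration; the module's own source plugins (such as the CSV File source) are the authoritative reference.

<?php

namespace Drupal\my_module\Plugin\DataPipelines\Source;

use Drupal\Core\Field\BaseFieldDefinition;

/**
 * Hypothetical annotation; check an existing source plugin for the real one.
 *
 * @DataPipelinesSource(
 *   id = "my_source",
 *   label = @Translation("My source"),
 * )
 */
class MySource extends SourcePluginBase {

  /**
   * Returns the field definition(s) used to store the raw data.
   */
  public function buildFieldDefinitions() {
    // The CSV File source uses a file field; a plain text field is shown
    // here purely as an example.
    return [
      'data' => BaseFieldDefinition::create('string_long')
        ->setLabel(t('Raw data')),
    ];
  }

  /**
   * Extracts records from the stored field.
   */
  public function extractDataFromDataSet($dataset) {
    // Assumed shape: yield one associative array per record.
    foreach (explode(PHP_EOL, $dataset->get('data')->value) as $line) {
      yield ['value' => $line];
    }
  }

}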

Transform plugins

Transform plugins can take a record or a field and apply a transform. They should extend TransformPluginBase and implement one or both of doTransformField or doTransformRecord. Both of these methods take a DatasetData object as a parameter, and should operate on it and return the transformed version. The doTransformField method also takes the field name; the field value can be accessed from the passed DatasetData object. Use the fields and records properties in the annotation to signify which sorts of transform the plugin supports.
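As a sketch, a field transform that upper-cases a value might look roughly like the following. The namespace, annotation name, parameter order, and the array-style access to field values are assumptions; the module's bundled transforms (such as map and concat) show the real signatures.

<?php

namespace Drupal\my_module\Plugin\DataPipelines\Transform;

/**
 * Hypothetical annotation; fields/records flag which kinds of transform
 * this plugin supports.
 *
 * @DataPipelinesTransform(
 *   id = "uppercase",
 *   label = @Translation("Uppercase"),
 *   fields = TRUE,
 *   records = FALSE,
 * )
 */
class Uppercase extends TransformPluginBase {

  /**
   * Transforms a single field and returns the transformed DatasetData.
   */
  protected function doTransformField($field_name, $data) {
    // Array-style access to the field value is assumed here.
    $data[$field_name] = mb_strtoupper((string) $data[$field_name]);
    return $data;
  }

}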

Pipeline plugins

Pipelines are defined in YML. They consist of an ID and label, as well as a series of optional field and record validations and transforms. Pipelines should be listed in a file in the root of your module named {yourmodule}.data_pipelines.yml. E.g. if your module machine name is whizbang, then the file will be modules/whizbang/whizbang.data_pipelines.yml. Here's a sample definition:

a_pipeline:
  label: 'A data pipeline'
  transforms:
    field:
      some_field:
        - plugin: map
          map:
            Y: true
            N: false
    record:
      - plugin: concat
        fields:
          - lastname
          - firstname
        as: full_name
        separator: ', '
  validations:
    record:
      ItemCount:
        expectedCount: 3
    field:
      firstname:
        NotBlank:
          message: 'Firstname is required'
        Length:
          min: 3
          minMessage: 'Firstname should be at least 3 characters'

The top level keys are:

  • label
  • transforms
  • validations

The transforms key has two sub-keys as follows:

  • field
  • record

The field key is a series of named field transforms, keyed by the field names under which the data can be accessed in the DatasetData object. The value for each field is a sequence of transform plugins and their configuration, so you can define multiple transforms for the same field, as the snippet below shows. Each entry in the sequence must contain at least the plugin key; any other keys are treated as configuration for the plugin. Refer to each plugin's defaultConfiguration entry for the possible configuration values.
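For example, the following runs two transforms over the same field in order. The map plugin is taken from the sample above; trim is a hypothetical plugin ID used only to show the sequence structure.

a_pipeline:
  transforms:
    field:
      some_field:
        - plugin: trim
        - plugin: map
          map:
            Y: true
            N: false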

The record key is a sequence of transform plugins and their configuration. As with field transforms, you can define multiple transforms. Each entry must contain the plugin key, and all other keys are configuration for the plugin.

The validations key has two sub-keys as well:

  • field
  • record

The field key is a series of field validations. The keys are field names, and the values are a map of validation plugins, keyed by validation constraint plugin ID. These are just Drupal's built-in validation constraints. The values in the map are the configuration options for the plugin, if any. If the plugin has no configuration, pass { } - an empty map.

The record key is a map of validation plugins. Again the keys are the validation plugin IDs whilst the values are the configuration options.
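For instance, a constraint used with no options is given an empty map, while configurable constraints list their options, as in this fragment (NotBlank and ItemCount are the constraints from the sample above):

validations:
  field:
    firstname:
      NotBlank: { }
  record:
    ItemCount:
      expectedCount: 3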

Setting a destination

Each pipeline needs a destination, which can be set through the UI at /admin/config/content/dataset_destinations. By default the module provides a JSON file destination, which allows a directory path to be specified. Other modules may provide additional destinations. For example the Elasticsearch sub-module provides support for pushing the processed data to a search index.

Processing flow

Each dataset has its own processing queue. This queue is defined using a plugin deriver, DatasetQueueWorkerDeriver. When a dataset is saved or deleted, a processing task is added to the queue. Deletion is handled in the background, whilst the form submission handler for creating or updating a dataset entity triggers a batch task that processes it immediately after the entity is saved. This allows for large datasets that may take time to process, as the processing occurs outside the normal save workflow.

When a dataset entity is saved, its status is set to Pending validation. Then a batch operation performs the validation and if it is valid, moves it to Pending indexing. The next batch operation performs the indexing and moves it to Indexed.

When the dataset is saved, the base Drupal entity validations are applied, but the dataset itself is not validated. This is because dataset validation could be an expensive operation in the case of a large dataset. For this reason it occurs in background processing.

Managing datasets

Datasets can be managed from /admin/content/datasets.

API

Getting data from the dataset

The data can be accessed with $dataset->getDataIterator(). This takes care of calling the source plugin to extract the data and applying any transform plugins. The result is an iterator, and each item it yields is a DatasetData object. Fetching all results in one go is not recommended; instead you should use a destination plugin to push the data to e.g. an Elasticsearch index or a file. Fetching all data in one pass may not be possible in a single request for large datasets, which is why processing occurs via the batch and queue APIs.
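A minimal sketch of consuming the iterator, assuming the dataset entity type ID is dataset and that DatasetData allows array-style access to field values (both assumptions for illustration):

<?php

// Load a dataset entity and iterate over its processed data.
$dataset_id = 1; // Example ID.
$dataset = \Drupal::entityTypeManager()
  ->getStorage('dataset')
  ->load($dataset_id);

foreach ($dataset->getDataIterator() as $record) {
  // Each $record is a DatasetData object produced by the source plugin and
  // run through the pipeline's transforms.
  $full_name = $record['full_name'];
  // For large datasets, prefer pushing records via a destination plugin
  // (batch/queue processing) rather than looping in a single request.
}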