Data Pipelines
Key concepts
- DataSet - a content entity that stores the association between a source plugin and a pipeline.
- Pipeline - handles validation and transformation for indexing. Made up of validation and transform plugins.
- Validation - handled by core's validation API.
- Transforms - plugins that handle transforming either an entire record or a single field. Very similar to migrate process plugins.
Source plugins
Source plugins form the bundle of the dataset content entity.
You can add a new source plugin just as you would any other plugin.
A source plugin needs to implement two methods: `extractDataFromDataSet` and `buildFieldDefinitions`.
`buildFieldDefinitions` should return the field used to store the data, e.g. for the CSV File source this is a file field. `extractDataFromDataSet` then deals with getting the data out of that field.
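As a rough sketch only, a source plugin might look something like the following. The annotation name, base-class namespace, and method signatures here are assumptions for illustration; mirror the module's existing CSV File source for the real interface.

```php
<?php

namespace Drupal\whizbang\Plugin\DataPipelines\Source;

use Drupal\Core\Field\BaseFieldDefinition;
// Assumed namespace for the source plugin base class.
use Drupal\data_pipelines\Plugin\DataPipelinesSourceBase;

/**
 * Hypothetical source plugin that reads records from an uploaded JSON file.
 *
 * The @DataPipelinesSource annotation name is assumed for illustration.
 *
 * @DataPipelinesSource(
 *   id = "json_file",
 *   label = @Translation("JSON file"),
 * )
 */
class JsonFile extends DataPipelinesSourceBase {

  /**
   * Returns the field used to store the data; a file field in this case.
   */
  public function buildFieldDefinitions(): array {
    return [
      'json_file' => BaseFieldDefinition::create('file')
        ->setLabel(t('JSON file'))
        ->setSetting('file_extensions', 'json'),
    ];
  }

  /**
   * Gets the data out of the stored file, yielding one record at a time.
   */
  public function extractDataFromDataSet($dataset): \Generator {
    /** @var \Drupal\file\FileInterface $file */
    $file = $dataset->get('json_file')->entity;
    $rows = json_decode(file_get_contents($file->getFileUri()), TRUE) ?? [];
    foreach ($rows as $row) {
      yield $row;
    }
  }

}
```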
Transform plugins
Transform plugins take either a whole record or a single field and apply a transform to it.
They should extend `TransformPluginBase` and implement one or both of `doTransformField` or `doTransformRecord`.
Both of these methods take a `DatasetData` object as a parameter, and should operate on it and return the transformed version. The `doTransformField` method also takes the field name; the field value can be accessed from the passed `DatasetData` object.
Use the `fields` and `records` properties in the annotation to signify which sorts of transforms the plugin supports.
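As an illustrative sketch, a field transform could look like the following. The annotation name, namespaces, parameter order, and the way values are read from and written to the `DatasetData` object are assumptions; check the module's bundled transforms for the exact signatures.

```php
<?php

namespace Drupal\whizbang\Plugin\DataPipelines\Transform;

// Assumed namespaces for the base class and the value object.
use Drupal\data_pipelines\DatasetData;
use Drupal\data_pipelines\Plugin\TransformPluginBase;

/**
 * Hypothetical transform that trims whitespace from a single field.
 *
 * The @Transform annotation name is assumed; the fields/records flags
 * signify which sorts of transforms the plugin supports.
 *
 * @Transform(
 *   id = "trim",
 *   label = @Translation("Trim"),
 *   fields = TRUE,
 *   records = FALSE,
 * )
 */
class Trim extends TransformPluginBase {

  /**
   * Operates on the passed DatasetData object and returns the transformed
   * version. Array-style access to the field value is an assumption here.
   */
  protected function doTransformField(string $field_name, DatasetData $data): DatasetData {
    $data[$field_name] = trim((string) $data[$field_name]);
    return $data;
  }

}
```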
Pipeline plugins
Pipelines are defined in YML. They consist of an ID and label, as well as a
series of optional field and record validations and transforms.
Pipelines should be listed in a file in the root of your module named `{yourmodule}.data_pipelines.yml`. E.g. if your module machine name is `whizbang`, then the file will be `modules/whizbang/whizbang.data_pipelines.yml`.
Here's a sample definition:
```yaml
a_pipeline:
  label: 'A data pipeline'
  transforms:
    field:
      some_field:
        - plugin: map
          map:
            Y: true
            N: false
    record:
      - plugin: concat
        fields:
          - lastname
          - firstname
        as: full_name
        separator: ', '
  validations:
    record:
      ItemCount:
        expectedCount: 3
    field:
      firstname:
        NotBlank:
          message: 'Firstname is required'
        Length:
          min: 3
          minMessage: 'Firstname should be at least 3 characters'
```
The top level keys are:
- label
- transforms
- validations
The `transforms` key has two sub-keys as follows:
- field
- record
The `field` key is a series of named field transforms, keyed by the field names by which data can be accessed in the `DatasetData` object.
The value is a sequence of transform plugins and their configuration. You can
define multiple transforms for the same field.
Each entry in the sequence must contain at least the `plugin` key. Any other keys are considered configuration for the plugin.
Refer to each plugin's `defaultConfiguration()` method for the possible configuration values.
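For instance, transforms for the same field are simply listed in sequence. Here the `map` plugin from the sample pipeline above is chained after a hypothetical `trim` plugin that needs nothing beyond the `plugin` key:

```yaml
transforms:
  field:
    status:
      # Hypothetical plugin with no configuration: only the plugin key.
      - plugin: trim
      # Any other keys are passed to the plugin as configuration.
      - plugin: map
        map:
          Y: true
          N: false
```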
The `record` key is a sequence of transform plugins and their configuration. As with field transforms, you can define multiple transforms.
Each entry must contain the `plugin` key, and all other keys are configuration for the plugin.
The `validations` key also has two sub-keys:
- field
- record
The `field` key is a series of field validations. The keys are field names, and the values are a map of validation plugins. The keys of that map are validation constraint plugin IDs; these just use Drupal's built-in validation API. The values of the map are the configuration options for each plugin, if any.
If the plugin has no configuration, pass `{ }`, an empty map.
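For example, the core `NotBlank` constraint can be applied without any options, while `Length` takes configuration (the field name is illustrative):

```yaml
validations:
  field:
    lastname:
      # No options for this constraint, so pass an empty map.
      NotBlank: {  }
      # Constraint options are given as the map values.
      Length:
        max: 255
        maxMessage: 'Lastname should be at most 255 characters'
```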
The `record` key is a map of validation plugins. Again, the keys are the validation plugin IDs whilst the values are the configuration options.
Setting a destination
Each pipeline needs a destination, which can be set through the UI at `/admin/config/content/dataset_destinations`.
By default the module provides a JSON file destination, which allows a directory path to be specified. Other modules may provide additional destinations; for example, the Elasticsearch sub-module provides support for pushing the processed data to a search index.
Processing flow
Each dataset has its own processing queue. This queue is defined using a plugin deriver, `DatasetQueueWorkerDeriver`.
When a dataset is saved or deleted, a processing task is added to the queue. Deletion is handled in the background, whilst the form submission handler for creating or updating a dataset entity triggers a batch task that processes it immediately after the entity is saved. This allows for large datasets that may take time to process, as the task occurs outside the normal save workflow.
When a dataset entity is saved, its status is set to Pending validation. A batch operation then performs the validation and, if it is valid, moves it to Pending indexing. The next batch operation performs the indexing and moves it to Indexed.
When the dataset is saved, the base Drupal entity validations are applied, but the dataset itself is not validated. This is because dataset validation could be an expensive operation for a large dataset; for this reason it occurs in background processing.
Managing datasets
Datasets can be managed from `/admin/content/datasets`.
API
Getting data from the dataset
The data can be accessed with `$dataset->getDataIterator()`. This will take care of calling the source plugin to extract the data and of applying any transform plugins. The result is an iterator, and each item it returns is a `DatasetData` object.
Fetching all results in one go is not recommended; instead, you should use a destination plugin to push the data to e.g. an Elasticsearch index or a file. Fetching all data in one pass may not be possible in a single request for large datasets, which is why processing occurs via the batch and queue APIs.
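As an illustration only (the `dataset` entity type ID and the load call are assumptions), streaming the records one at a time might look like this:

```php
<?php

// Load a dataset entity; the 'dataset' entity type ID is assumed here.
$dataset = \Drupal::entityTypeManager()
  ->getStorage('dataset')
  ->load(1);

// Stream the records rather than collecting them all in memory.
foreach ($dataset->getDataIterator() as $record) {
  // Each $record is a DatasetData object containing the transformed values.
  // Hand it off to a destination (e.g. a search index or a file) here.
}
```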