Skip to content
Snippets Groups Projects
Commit 1ce816c5 authored by Dimitris Bozelos's avatar Dimitris Bozelos
Browse files

Issue #3421759 Added base class for running paged imports via cron

parent 7d82e9a1
No related branches found
No related tags found
No related merge requests found
<?php
namespace Drupal\commerce_sheets\Cron;
// Drupal modules.
use Drupal\entity_sync\Entity\OperationInterface;
use Drupal\entity_sync\Import\ManagerInterface as ImportManagerInterface;
use Drupal\entity_sync\MachineName\Field\Operation as OperationField;
use Drupal\entity_sync\StateManagerInterface;
// Drupal core.
use Drupal\Core\Entity\EntityTypeManagerInterface;
/**
* Base class for services that run list imports during cron.
*/
abstract class ListImport {
/**
* Returns the ID of the operation type that defines the list import.
*
* @return string
* The operation type ID.
*/
abstract protected function getOperationTypeId();
/**
* Constructs a new ListImport object.
*
* @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
* The entity type manager.
* @param \Drupal\entity_sync\Import\ManagerInterface $import_manager
* The Entity Synchronization import manager.
* @param \Drupal\entity_sync\StateManagerInterface $state_manager
* The Entity Synchronization state manager.
*/
public function __construct(
EntityTypeManagerInterface $entity_type_manager,
ImportManagerInterface $import_manager,
StateManagerInterface $state_manager
) {
$this->entityTypeManager = $entity_type_manager;
$this->importManager = $import_manager;
$this->stateManager = $state_manager;
}
/**
* Creates an operation for the list import and runs it.
*/
public function run() {
$import = $this->getImport();
if ($import === NULL) {
return;
}
$operation_type_id = $this->getOperationTypeId();
$storage = $this->entityTypeManager->getStorage('entity_sync_operation');
// Create the operation.
$operation = $storage->create([
'type' => $operation_type_id,
'entity' => $import,
]);
$storage->save($operation);
// Move it to the `running` state.
$state_item = $operation->get(OperationField::STATE)->first();
$run_transition = $state_item->getWorkflow()->getRunTransition();
$state_item->applyTransitionById($run_transition);
$storage->save($operation);
// Run the operation.
$this->entityTypeManager
->getStorage('entity_sync_operation_type')
->loadWithPluginInstantiated($operation_type_id)
->getPlugin()
->runner()
->run($operation, $this->getOperationContext($operation));
}
/**
* Returns the import scheduled to be run.
*
* We first check if there is an import that did not finish in its last run,
* and continue importing that. Otherwise, we check if there is another import
* that has been scheduled but has not started yet, and pick up that one.
*
* @return \Drupal\commerce_sheets\Entity\ImportInterface|null
* The import to run, or `NULL` if no scheduled import was found.
*/
protected function getImport() {
$import = $this->getCurrentImport();
if ($import !== NULL) {
return $import;
}
return $this->getNextImport();
}
/**
* Returns the import that did not finish during the last run, if one exists.
*
* @return \Drupal\commerce_sheets\Entity\ImportInterface|null
* The import entity, or `NULL` if no unfinished scheduled import was found.
*/
protected function getCurrentImport() {
$last_run = $this->stateManager->getLastRun(
$this->getOperationTypeId(),
'import_list'
);
// If we don't have an offset for the next run, the last run must have
// finished i.e. no current import, proceed to the next one.
if (($last_run['next_offset'] ?? NULL) === NULL) {
return NULL;
}
if (empty($last_run['data']['import_id'])) {
throw new \RuntimeException(sprintf(
'No Commerce Sheets import ID found for the last run of Entity Synchronization with ID "%s".',
$this->getOperationTypeId()
));
}
$import = $this->entityTypeManager
->getStorage('commerce_sheets_import')
->load($last_run['data']['import_id']);
if (!$import) {
throw new \RuntimeException(sprintf(
'Commerce Sheets import with ID "%s" was not found as required for continuing the last run of Entity Synchronization with ID "%s".',
$last_run['data']['import_id'],
$this->getOperationTypeId()
));
}
// If we have a next offset and the import is not in the `scheduled` state,
// this must have been by mistake e.g. stuck in `running` state.
// @I Log cases for unscheduled imports that have next offset
if ($import->getState()->getId() !== 'scheduled') {
return NULL;
}
return $import;
}
/**
* Returns the next import scheduled to run, if one exists.
*
* @return \Drupal\commerce_sheets\Entity\ImportInterface|null
* The import entity, or `NULL` if no scheduled import was found.
*/
protected function getNextImport() {
$storage = $this->entityTypeManager->getStorage('commerce_sheets_import');
$ids = $storage
->getQuery()
->accessCheck(FALSE)
->condition('state', 'scheduled')
->range(0, 1)
->sort('created', 'asc')
->execute();
if (!$ids) {
return NULL;
}
return $storage->load(current($ids));
}
/**
* Returns the context to be passed to the operation runner.
*
* @param \Drupal\entity_sync\Entity\OperationInterface $operation
* The operation being run.
*
* @return array
* The operation context.
*/
protected function getOperationContext(OperationInterface $operation) {
return [
'filters' => [
'limit' => 500,
],
'options' => [
'client' => [
'limit' => 100,
'import_entity' => $operation->get(OperationField::ENTITY)->entity,
],
'context' => [
'operation_entity' => $operation,
'state' => [
'manager' => 'entity_sync',
],
],
],
];
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment