Loading src/Thread/MigrationImportThread.php +156 −61 Original line number Diff line number Diff line Loading @@ -6,9 +6,11 @@ namespace Drupal\smart_migrate_cli\Thread; use Consolidation\SiteAlias\SiteAliasInterface; use Consolidation\SiteProcess\SiteProcess; use Drupal\migrate\Plugin\migrate\id_map\Sql; use Drupal\migrate\Plugin\MigrateIdMapInterface; use Drupal\migrate\Plugin\MigrationInterface; use Drupal\smart_migrate_cli\SmartMigrateCli; use Drupal\smart_migrate_cli\Utility\MigrationSourceUtility; use Drush\Drush; use Symfony\Component\Console\Helper\ProgressBar; use Symfony\Component\Console\Output\OutputInterface; Loading Loading @@ -71,6 +73,27 @@ class MigrationImportThread implements MigrationThreadInterface { */ protected int $sourceCount = 0; /** * Number of items to process. * * @var int */ protected int $itemsToProcessCount = 0; /** * Changes are tracked. * * @var bool */ protected bool $trackChanges = FALSE; /** * High water is tracked. * * @var bool */ protected bool $highwater = FALSE; /** * Number of the processed items since the previous update. * Loading @@ -86,18 +109,27 @@ class MigrationImportThread implements MigrationThreadInterface { protected int $currentProgress = 0; /** * IDs of the follow-up tasks, if any. * Number of the previously processed items. * * @var string[] * @var int */ protected array $followUps = []; protected int $initialIdMapCount = 0; /** * Errors as string, if any. * Whether last import timestamp is tracked. * * @var string[][] * @var bool */ protected array $errors = []; protected bool $trackLastImported = FALSE; /** * The most recently imported item's import timestamp. * * (... if last import is tracked). * * @var int */ protected int $originalLastImported = 0; /** * Constructs a MigrationThread instance. Loading Loading @@ -168,10 +200,21 @@ class MigrationImportThread implements MigrationThreadInterface { ->enableOutput(); $this->migrationId = $migration->id(); $this->migrationIdMap = (clone $migration)->getIdMap(); // Source count can be -1. $this->sourceCount = (clone $migration)->getSourcePlugin()->count(); $this->errors = []; $this->followUps = []; $this->source = (clone $migration)->getSourcePlugin(); $this->sourceCount = $this->source->count(); $this->trackChanges = (bool) ($migration->getSourceConfiguration()['track_changes'] ?? FALSE); $this->highwater = array_key_exists('high_water_property', $migration->getSourceConfiguration()); $this->itemsToProcessCount = MigrationSourceUtility::getNumberOfItemsToImportProcessFromPieces( $this->source, $this->migrationIdMap, $this->getMigrationId(), $this->trackChanges, $this->highwater ); $this->trackLastImported = $migration->getTrackLastImported(); if ($this->trackChanges) { $this->initialIdMapCount = $this->migrationIdMap->processedCount() - $this->migrationIdMap->updateCount(); } } /** Loading @@ -182,20 +225,23 @@ class MigrationImportThread implements MigrationThreadInterface { throw new \LogicException('Cannot start an already active migration thread.'); } $source_count = max(0, $this->sourceCount); // Source count can be -1. $processed_count = $this->sourceCount >= 0 ? $this->migrationIdMap->processedCount() $this->originalLastImported = $this->trackLastImported ? time() : 0; $this->previousProgress = $processed_count; $this->currentProgress = $processed_count; $format = $source_count > 0 ? SmartMigrateCli::PROGRESS_BAR_FORMAT_ID : SmartMigrateCli::PROGRESS_BAR_FORMAT_ID_NOMAX; if (!$this->trackLastImported) { assert($this->migrationIdMap instanceof Sql); $this->migrationIdMap->getDatabase()->update($this->migrationIdMap->mapTableName()) ->fields(['last_imported' => 1]) ->execute(); } $format = $this->itemsToProcessCount >= 0 ? SmartMigrateCli::PROGRESS_BAR_FORMAT_ID : SmartMigrateCli::PROGRESS_BAR_FORMAT_ID_NOMAX; $this->resetBar(); $this->bar->setMessage("<info>{$this->migrationId}</info>"); $this->bar->setFormat($format); $this->bar->start($source_count); $this->bar->start(max(0, $this->itemsToProcessCount)); $this->bar->setProgress($this->currentProgress); $this->process->start(); Loading Loading @@ -226,33 +272,44 @@ class MigrationImportThread implements MigrationThreadInterface { $this->process = NULL; $this->migrationId = NULL; $this->sourceCount = 0; $this->sourceConfig = []; $this->previousProgress = 0; $this->currentProgress = 0; $this->migrationIdMap = NULL; $this->errors = []; $this->followUps = []; $this->itemsToProcessCount = 0; $this->initialIdMapCount = 0; $this->resetBar(); } /** * {@inheritdoc} */ public function reportProgress(): int { if ($this->isIdle()) { return 0; } $progress = $this->updateProgress(); if ($this->isFinished()) { public function getReport(): ThreadReportInterface { // We cannot pause a subprocess. Because of this, we always risk race // conditions, because we cannot determine the various states of a process // in the same moment. It might happen that the import process still works // on the last item when we ask for a progress update (and get know how many // items are processed so far), and after that, when we check whether the // subprocess is terminated, we get a TRUE result. // So to prevent any race condition, we check "isFinished" first and store // its return value in a local variable. Of course, it still can happen that // the process is finished after we checked its termination, but in this // case, we will be able to properly finish the job when the scheduler // asks for a new report next time (then we will report "finished" status // with 0 items processed since the last report). $is_finished = $this->isFinished() && !($is_idle = $this->isIdle()); $progress = !$is_idle ? $this->updateProgress() : 0; $fully_processed = !$is_idle && $this->isFullyProcessed(); if ($is_finished) { $process_output = array_filter(explode("\n", $this->process->getOutput())); $last = array_pop($process_output); $message = $last ? json_decode($last, TRUE) : []; $this->followUps = array_combine( $follow_ups = array_combine( $message['follow_ups'] ?? [], $message['follow_ups'] ?? [], ); $this->errors = array_reduce( $errors = array_reduce( array_filter(explode("\n", $this->process->getErrorOutput())), function (array $carry, string $error) { $key = md5($error); Loading @@ -265,58 +322,96 @@ class MigrationImportThread implements MigrationThreadInterface { }, [] ); // Reset the ID map plugin's last_import column if needed: if last import // is not tracked, we write '1' into this column before we start the // subprocess. if (!$this->trackLastImported) { assert($this->migrationIdMap instanceof Sql); $this->migrationIdMap->getDatabase()->update($this->migrationIdMap->mapTableName()) ->fields(['last_imported' => 0]) ->execute(); } return $progress; // If there are no errors, and the current migration was configured to // track changes, and the current progress does not match the initially // calculated number of the items to process, we assume that the source // plugin just skipped some previously imported unchanged rows. if ( !$errors && $this->trackChanges && $this->currentProgress !== $this->itemsToProcessCount && $this->sourceCount <= $this->migrationIdMap->processedCount() ) { $progress += $this->initialIdMapCount; } /** * {@inheritdoc} */ public function getErrors(): ?array { return $this->isFinished() && !empty($this->errors) ? $this->errors : NULL; } /** * {@inheritdoc} */ public function getFollowUps(): array { return $this->followUps; return new ThreadReport( $progress ?? 0, $is_finished ?? FALSE, $fully_processed ?? FALSE, $errors ?? NULL, $follow_ups ?? NULL, $is_finished ? $this->process->getExitCode() : NULL ); } /** * Updates the progress bar of the thread, and returns the progress. * Updates the progress bar of the thread and returns the progress. * * @return int * The number of the processed items since the last update. */ protected function updateProgress(): int { $this->previousProgress = $this->currentProgress; $this->currentProgress = $this->migrationIdMap->processedCount(); $this->currentProgress = $this->getCurrentProgress(); $this->bar->setProgress($this->currentProgress); return $this->currentProgress - $this->previousProgress; } /** * {@inheritdoc} * Returns the actual number of the processed items. * * @return int * The actual number of the processed items. */ public function isFullyProcessed(): bool { // If the source is uncountable, we have no way of knowing if it is // complete, so assume that it is. return $this->sourceCount < 0 ? $this->process->isTerminated() : $this->process->isTerminated() && $this->sourceCount <= $this->currentProgress; protected function getCurrentProgress(): int { assert($this->migrationIdMap instanceof Sql); $current_progress_q = $this->migrationIdMap->getDatabase()->select($this->migrationIdMap->mapTableName(), 'm') ->fields('m', ['last_imported']); if ($this->trackLastImported) { $current_progress_q->condition('m.last_imported', $this->originalLastImported, '>='); } else { $current_progress_q->condition('m.last_imported', 1, '<>'); } return (int) $current_progress_q->countQuery()->execute()->fetchField(); } /** * {@inheritdoc} * Determines whether the current task is fully processed. * * @return bool * Whether the current task is fully processed. */ public function exitCode(): ?int { return isset($this->process) ? $this->process->getExitCode() : NULL; protected function isFullyProcessed(): bool { if (!$this->process->isTerminated()) { return FALSE; } // If the source is uncountable, we have no way of knowing if the migration // is complete, so assume that it is. if ($this->sourceCount < 0) { return TRUE; } if ($this->itemsToProcessCount < 0) { return TRUE; } // Task is fully process if the number of items to process matches current // progress... return $this->itemsToProcessCount <= $this->currentProgress || // ...or if we are tracking changes, the source count matches the process // count (this slightly simplifies the situation). $this->trackChanges && $this->sourceCount <= $this->migrationIdMap->processedCount(); } /** Loading src/Thread/MigrationRollbackRemovedThread.php +0 −2 Original line number Diff line number Diff line Loading @@ -41,8 +41,6 @@ class MigrationRollbackRemovedThread extends MigrationImportThread { $removed_items_id_map = (clone $migration)->set('idMap', ['plugin' => $this->migrationIdMap->getPluginId() . '_rollback_removed'])->getIdMap(); $removed_items_id_map->rewind(); $this->sourceCount = iterator_count($removed_items_id_map); $this->errors = []; $this->followUps = []; unset($removed_items_id_map); } Loading src/Thread/MigrationThreadInterface.php +5 −36 Original line number Diff line number Diff line Loading @@ -7,7 +7,9 @@ namespace Drupal\smart_migrate_cli\Thread; use Drupal\migrate\Plugin\MigrationInterface; /** * Interface fot migration threads. * Interface of migration threads.. * * @internal */ interface MigrationThreadInterface { Loading Loading @@ -35,39 +37,6 @@ interface MigrationThreadInterface { */ public function isFinished(): bool; /** * Whether the subprocess processed all its items. * * @return bool * Whether all migration rows are processed. */ public function isFullyProcessed(): bool; /** * Returns errors of the process. * * @return string[][]|null * The errors reported during the subprocess. NULL if the process did not * finish yet or if no errors were captured. */ public function getErrors(): ?array; /** * Returns the IDs of the follow-up tasks, if any. * * @return string[] * The IDs of the follow-up tasks, if any. */ public function getFollowUps(): array; /** * Returns the exit code of the subprocess, if it exists and already finished. * * @return int|null * The exit code of the subprocess, if it exists and already finished. */ public function exitCode(): ?int; /** * Returns the migration plugin ID of the thread, if any. * Loading Loading @@ -101,8 +70,8 @@ interface MigrationThreadInterface { public function reset(): void; /** * Updates progress and returns the number of the processed items. * Returns report about the thread and its subprocess. */ public function reportProgress(): int; public function getReport(): ThreadReportInterface; } src/Thread/ThreadReport.php 0 → 100644 +123 −0 Original line number Diff line number Diff line <?php declare(strict_types = 1); namespace Drupal\smart_migrate_cli\Thread; /** * Simple report object about a thread's task and subprocess.. * * @internal */ class ThreadReport implements ThreadReportInterface { /** * The progress of the actual task since the last report. * * @var int */ protected int $progress; /** * Whether the task is finished or not. * * @var bool */ protected bool $processFinished; /** * Whether the task is fully done or not. * * @var bool */ protected bool $taskIsFullyProcessed; /** * Errors captured during the execution of the task. * * @var array|null */ protected ?array $errors; /** * IDs of the follow-up tasks, if any. * * @var array|null */ protected ?array $followUps; /** * Exit code of the task's subprocess. * * @var int|null */ protected ?int $exitCode; /** * Constructs a ThreadReport instance. * * @param int $progress * The progress of the actual task since the last report. * @param bool $process_finished * Whether the task is finished or not. * @param bool $fully_processed * Whether the task is fully done or not. * @param array|null $errors * Errors captured during the execution of the task. * @param array|null $follow_ups * IDs of the follow-up tasks, if any. * @param int|null $exit_code * Exit code of the task's subprocess. */ public function __construct(int $progress, bool $process_finished = FALSE, bool $fully_processed = FALSE, ?array $errors = NULL, ?array $follow_ups = NULL, ?int $exit_code = NULL) { $this->progress = $progress; $this->processFinished = $process_finished; $this->taskIsFullyProcessed = $fully_processed; $this->errors = $errors; $this->followUps = $follow_ups; $this->exitCode = $exit_code; } /** * {@inheritdoc} */ public function getProgress(): int { return $this->progress; } /** * {@inheritdoc} */ public function processIsFinished(): bool { return $this->processFinished; } /** * {@inheritdoc} */ public function taskIsFullyProcessed(): bool { return $this->taskIsFullyProcessed; } /** * {@inheritdoc} */ public function getProcessErrors(): ?array { return $this->errors; } /** * {@inheritdoc} */ public function getFollowUpTasks(): ?array { return $this->followUps; } /** * {@inheritdoc} */ public function getProcessExistCode(): ?int { return $this->exitCode; } } src/Thread/ThreadReportInterface.php 0 → 100644 +63 −0 Original line number Diff line number Diff line <?php declare(strict_types = 1); namespace Drupal\smart_migrate_cli\Thread; /** * Interface of thread reports.. * * @internal */ interface ThreadReportInterface { /** * Updates progress and returns the number of the processed items. * * @return int * Processed items since the last report. */ public function getProgress(): int; /** * Whether the underlying process is finished. * * @return bool * Whether the subprocess is finished (terminated). */ public function processIsFinished(): bool; /** * Whether the underlying process is fully processed. * * @return bool * Whether the task of the subprocess is fully finished. */ public function taskIsFullyProcessed(): bool; /** * Returns the errors of the subprocess. * * @return array|null * Errors happened during the subprocess as array. */ public function getProcessErrors(): ?array; /** * Returns the follow-up tasks. * * @return array|null * IDs of the follow up tasks which should be executed after * the successfully finished subprocess. */ public function getFollowUpTasks(): ?array; /** * Returns the exit code of the subprocess if it is finished. * * @return int|null * Exit code of the subprocess if it is terminated. */ public function getProcessExistCode(): ?int; } Loading
src/Thread/MigrationImportThread.php +156 −61 Original line number Diff line number Diff line Loading @@ -6,9 +6,11 @@ namespace Drupal\smart_migrate_cli\Thread; use Consolidation\SiteAlias\SiteAliasInterface; use Consolidation\SiteProcess\SiteProcess; use Drupal\migrate\Plugin\migrate\id_map\Sql; use Drupal\migrate\Plugin\MigrateIdMapInterface; use Drupal\migrate\Plugin\MigrationInterface; use Drupal\smart_migrate_cli\SmartMigrateCli; use Drupal\smart_migrate_cli\Utility\MigrationSourceUtility; use Drush\Drush; use Symfony\Component\Console\Helper\ProgressBar; use Symfony\Component\Console\Output\OutputInterface; Loading Loading @@ -71,6 +73,27 @@ class MigrationImportThread implements MigrationThreadInterface { */ protected int $sourceCount = 0; /** * Number of items to process. * * @var int */ protected int $itemsToProcessCount = 0; /** * Changes are tracked. * * @var bool */ protected bool $trackChanges = FALSE; /** * High water is tracked. * * @var bool */ protected bool $highwater = FALSE; /** * Number of the processed items since the previous update. * Loading @@ -86,18 +109,27 @@ class MigrationImportThread implements MigrationThreadInterface { protected int $currentProgress = 0; /** * IDs of the follow-up tasks, if any. * Number of the previously processed items. * * @var string[] * @var int */ protected array $followUps = []; protected int $initialIdMapCount = 0; /** * Errors as string, if any. * Whether last import timestamp is tracked. * * @var string[][] * @var bool */ protected array $errors = []; protected bool $trackLastImported = FALSE; /** * The most recently imported item's import timestamp. * * (... if last import is tracked). * * @var int */ protected int $originalLastImported = 0; /** * Constructs a MigrationThread instance. Loading Loading @@ -168,10 +200,21 @@ class MigrationImportThread implements MigrationThreadInterface { ->enableOutput(); $this->migrationId = $migration->id(); $this->migrationIdMap = (clone $migration)->getIdMap(); // Source count can be -1. $this->sourceCount = (clone $migration)->getSourcePlugin()->count(); $this->errors = []; $this->followUps = []; $this->source = (clone $migration)->getSourcePlugin(); $this->sourceCount = $this->source->count(); $this->trackChanges = (bool) ($migration->getSourceConfiguration()['track_changes'] ?? FALSE); $this->highwater = array_key_exists('high_water_property', $migration->getSourceConfiguration()); $this->itemsToProcessCount = MigrationSourceUtility::getNumberOfItemsToImportProcessFromPieces( $this->source, $this->migrationIdMap, $this->getMigrationId(), $this->trackChanges, $this->highwater ); $this->trackLastImported = $migration->getTrackLastImported(); if ($this->trackChanges) { $this->initialIdMapCount = $this->migrationIdMap->processedCount() - $this->migrationIdMap->updateCount(); } } /** Loading @@ -182,20 +225,23 @@ class MigrationImportThread implements MigrationThreadInterface { throw new \LogicException('Cannot start an already active migration thread.'); } $source_count = max(0, $this->sourceCount); // Source count can be -1. $processed_count = $this->sourceCount >= 0 ? $this->migrationIdMap->processedCount() $this->originalLastImported = $this->trackLastImported ? time() : 0; $this->previousProgress = $processed_count; $this->currentProgress = $processed_count; $format = $source_count > 0 ? SmartMigrateCli::PROGRESS_BAR_FORMAT_ID : SmartMigrateCli::PROGRESS_BAR_FORMAT_ID_NOMAX; if (!$this->trackLastImported) { assert($this->migrationIdMap instanceof Sql); $this->migrationIdMap->getDatabase()->update($this->migrationIdMap->mapTableName()) ->fields(['last_imported' => 1]) ->execute(); } $format = $this->itemsToProcessCount >= 0 ? SmartMigrateCli::PROGRESS_BAR_FORMAT_ID : SmartMigrateCli::PROGRESS_BAR_FORMAT_ID_NOMAX; $this->resetBar(); $this->bar->setMessage("<info>{$this->migrationId}</info>"); $this->bar->setFormat($format); $this->bar->start($source_count); $this->bar->start(max(0, $this->itemsToProcessCount)); $this->bar->setProgress($this->currentProgress); $this->process->start(); Loading Loading @@ -226,33 +272,44 @@ class MigrationImportThread implements MigrationThreadInterface { $this->process = NULL; $this->migrationId = NULL; $this->sourceCount = 0; $this->sourceConfig = []; $this->previousProgress = 0; $this->currentProgress = 0; $this->migrationIdMap = NULL; $this->errors = []; $this->followUps = []; $this->itemsToProcessCount = 0; $this->initialIdMapCount = 0; $this->resetBar(); } /** * {@inheritdoc} */ public function reportProgress(): int { if ($this->isIdle()) { return 0; } $progress = $this->updateProgress(); if ($this->isFinished()) { public function getReport(): ThreadReportInterface { // We cannot pause a subprocess. Because of this, we always risk race // conditions, because we cannot determine the various states of a process // in the same moment. It might happen that the import process still works // on the last item when we ask for a progress update (and get know how many // items are processed so far), and after that, when we check whether the // subprocess is terminated, we get a TRUE result. // So to prevent any race condition, we check "isFinished" first and store // its return value in a local variable. Of course, it still can happen that // the process is finished after we checked its termination, but in this // case, we will be able to properly finish the job when the scheduler // asks for a new report next time (then we will report "finished" status // with 0 items processed since the last report). $is_finished = $this->isFinished() && !($is_idle = $this->isIdle()); $progress = !$is_idle ? $this->updateProgress() : 0; $fully_processed = !$is_idle && $this->isFullyProcessed(); if ($is_finished) { $process_output = array_filter(explode("\n", $this->process->getOutput())); $last = array_pop($process_output); $message = $last ? json_decode($last, TRUE) : []; $this->followUps = array_combine( $follow_ups = array_combine( $message['follow_ups'] ?? [], $message['follow_ups'] ?? [], ); $this->errors = array_reduce( $errors = array_reduce( array_filter(explode("\n", $this->process->getErrorOutput())), function (array $carry, string $error) { $key = md5($error); Loading @@ -265,58 +322,96 @@ class MigrationImportThread implements MigrationThreadInterface { }, [] ); // Reset the ID map plugin's last_import column if needed: if last import // is not tracked, we write '1' into this column before we start the // subprocess. if (!$this->trackLastImported) { assert($this->migrationIdMap instanceof Sql); $this->migrationIdMap->getDatabase()->update($this->migrationIdMap->mapTableName()) ->fields(['last_imported' => 0]) ->execute(); } return $progress; // If there are no errors, and the current migration was configured to // track changes, and the current progress does not match the initially // calculated number of the items to process, we assume that the source // plugin just skipped some previously imported unchanged rows. if ( !$errors && $this->trackChanges && $this->currentProgress !== $this->itemsToProcessCount && $this->sourceCount <= $this->migrationIdMap->processedCount() ) { $progress += $this->initialIdMapCount; } /** * {@inheritdoc} */ public function getErrors(): ?array { return $this->isFinished() && !empty($this->errors) ? $this->errors : NULL; } /** * {@inheritdoc} */ public function getFollowUps(): array { return $this->followUps; return new ThreadReport( $progress ?? 0, $is_finished ?? FALSE, $fully_processed ?? FALSE, $errors ?? NULL, $follow_ups ?? NULL, $is_finished ? $this->process->getExitCode() : NULL ); } /** * Updates the progress bar of the thread, and returns the progress. * Updates the progress bar of the thread and returns the progress. * * @return int * The number of the processed items since the last update. */ protected function updateProgress(): int { $this->previousProgress = $this->currentProgress; $this->currentProgress = $this->migrationIdMap->processedCount(); $this->currentProgress = $this->getCurrentProgress(); $this->bar->setProgress($this->currentProgress); return $this->currentProgress - $this->previousProgress; } /** * {@inheritdoc} * Returns the actual number of the processed items. * * @return int * The actual number of the processed items. */ public function isFullyProcessed(): bool { // If the source is uncountable, we have no way of knowing if it is // complete, so assume that it is. return $this->sourceCount < 0 ? $this->process->isTerminated() : $this->process->isTerminated() && $this->sourceCount <= $this->currentProgress; protected function getCurrentProgress(): int { assert($this->migrationIdMap instanceof Sql); $current_progress_q = $this->migrationIdMap->getDatabase()->select($this->migrationIdMap->mapTableName(), 'm') ->fields('m', ['last_imported']); if ($this->trackLastImported) { $current_progress_q->condition('m.last_imported', $this->originalLastImported, '>='); } else { $current_progress_q->condition('m.last_imported', 1, '<>'); } return (int) $current_progress_q->countQuery()->execute()->fetchField(); } /** * {@inheritdoc} * Determines whether the current task is fully processed. * * @return bool * Whether the current task is fully processed. */ public function exitCode(): ?int { return isset($this->process) ? $this->process->getExitCode() : NULL; protected function isFullyProcessed(): bool { if (!$this->process->isTerminated()) { return FALSE; } // If the source is uncountable, we have no way of knowing if the migration // is complete, so assume that it is. if ($this->sourceCount < 0) { return TRUE; } if ($this->itemsToProcessCount < 0) { return TRUE; } // Task is fully process if the number of items to process matches current // progress... return $this->itemsToProcessCount <= $this->currentProgress || // ...or if we are tracking changes, the source count matches the process // count (this slightly simplifies the situation). $this->trackChanges && $this->sourceCount <= $this->migrationIdMap->processedCount(); } /** Loading
src/Thread/MigrationRollbackRemovedThread.php +0 −2 Original line number Diff line number Diff line Loading @@ -41,8 +41,6 @@ class MigrationRollbackRemovedThread extends MigrationImportThread { $removed_items_id_map = (clone $migration)->set('idMap', ['plugin' => $this->migrationIdMap->getPluginId() . '_rollback_removed'])->getIdMap(); $removed_items_id_map->rewind(); $this->sourceCount = iterator_count($removed_items_id_map); $this->errors = []; $this->followUps = []; unset($removed_items_id_map); } Loading
src/Thread/MigrationThreadInterface.php +5 −36 Original line number Diff line number Diff line Loading @@ -7,7 +7,9 @@ namespace Drupal\smart_migrate_cli\Thread; use Drupal\migrate\Plugin\MigrationInterface; /** * Interface fot migration threads. * Interface of migration threads.. * * @internal */ interface MigrationThreadInterface { Loading Loading @@ -35,39 +37,6 @@ interface MigrationThreadInterface { */ public function isFinished(): bool; /** * Whether the subprocess processed all its items. * * @return bool * Whether all migration rows are processed. */ public function isFullyProcessed(): bool; /** * Returns errors of the process. * * @return string[][]|null * The errors reported during the subprocess. NULL if the process did not * finish yet or if no errors were captured. */ public function getErrors(): ?array; /** * Returns the IDs of the follow-up tasks, if any. * * @return string[] * The IDs of the follow-up tasks, if any. */ public function getFollowUps(): array; /** * Returns the exit code of the subprocess, if it exists and already finished. * * @return int|null * The exit code of the subprocess, if it exists and already finished. */ public function exitCode(): ?int; /** * Returns the migration plugin ID of the thread, if any. * Loading Loading @@ -101,8 +70,8 @@ interface MigrationThreadInterface { public function reset(): void; /** * Updates progress and returns the number of the processed items. * Returns report about the thread and its subprocess. */ public function reportProgress(): int; public function getReport(): ThreadReportInterface; }
src/Thread/ThreadReport.php 0 → 100644 +123 −0 Original line number Diff line number Diff line <?php declare(strict_types = 1); namespace Drupal\smart_migrate_cli\Thread; /** * Simple report object about a thread's task and subprocess.. * * @internal */ class ThreadReport implements ThreadReportInterface { /** * The progress of the actual task since the last report. * * @var int */ protected int $progress; /** * Whether the task is finished or not. * * @var bool */ protected bool $processFinished; /** * Whether the task is fully done or not. * * @var bool */ protected bool $taskIsFullyProcessed; /** * Errors captured during the execution of the task. * * @var array|null */ protected ?array $errors; /** * IDs of the follow-up tasks, if any. * * @var array|null */ protected ?array $followUps; /** * Exit code of the task's subprocess. * * @var int|null */ protected ?int $exitCode; /** * Constructs a ThreadReport instance. * * @param int $progress * The progress of the actual task since the last report. * @param bool $process_finished * Whether the task is finished or not. * @param bool $fully_processed * Whether the task is fully done or not. * @param array|null $errors * Errors captured during the execution of the task. * @param array|null $follow_ups * IDs of the follow-up tasks, if any. * @param int|null $exit_code * Exit code of the task's subprocess. */ public function __construct(int $progress, bool $process_finished = FALSE, bool $fully_processed = FALSE, ?array $errors = NULL, ?array $follow_ups = NULL, ?int $exit_code = NULL) { $this->progress = $progress; $this->processFinished = $process_finished; $this->taskIsFullyProcessed = $fully_processed; $this->errors = $errors; $this->followUps = $follow_ups; $this->exitCode = $exit_code; } /** * {@inheritdoc} */ public function getProgress(): int { return $this->progress; } /** * {@inheritdoc} */ public function processIsFinished(): bool { return $this->processFinished; } /** * {@inheritdoc} */ public function taskIsFullyProcessed(): bool { return $this->taskIsFullyProcessed; } /** * {@inheritdoc} */ public function getProcessErrors(): ?array { return $this->errors; } /** * {@inheritdoc} */ public function getFollowUpTasks(): ?array { return $this->followUps; } /** * {@inheritdoc} */ public function getProcessExistCode(): ?int { return $this->exitCode; } }
src/Thread/ThreadReportInterface.php 0 → 100644 +63 −0 Original line number Diff line number Diff line <?php declare(strict_types = 1); namespace Drupal\smart_migrate_cli\Thread; /** * Interface of thread reports.. * * @internal */ interface ThreadReportInterface { /** * Updates progress and returns the number of the processed items. * * @return int * Processed items since the last report. */ public function getProgress(): int; /** * Whether the underlying process is finished. * * @return bool * Whether the subprocess is finished (terminated). */ public function processIsFinished(): bool; /** * Whether the underlying process is fully processed. * * @return bool * Whether the task of the subprocess is fully finished. */ public function taskIsFullyProcessed(): bool; /** * Returns the errors of the subprocess. * * @return array|null * Errors happened during the subprocess as array. */ public function getProcessErrors(): ?array; /** * Returns the follow-up tasks. * * @return array|null * IDs of the follow up tasks which should be executed after * the successfully finished subprocess. */ public function getFollowUpTasks(): ?array; /** * Returns the exit code of the subprocess if it is finished. * * @return int|null * Exit code of the subprocess if it is terminated. */ public function getProcessExistCode(): ?int; }