Commit 5ca0a749 authored by Zoltan Attila Horvath's avatar Zoltan Attila Horvath Committed by Zoltan Attila Horvath
Browse files

Issue #3314634 by huzooka: Make multi thread import compatible with change...

Issue #3314634 by huzooka: Make multi thread import compatible with change tracking and high water properties
parent 0302ebe2
Loading
Loading
Loading
Loading
+156 −61
Original line number Diff line number Diff line
@@ -6,9 +6,11 @@ namespace Drupal\smart_migrate_cli\Thread;

use Consolidation\SiteAlias\SiteAliasInterface;
use Consolidation\SiteProcess\SiteProcess;
use Drupal\migrate\Plugin\migrate\id_map\Sql;
use Drupal\migrate\Plugin\MigrateIdMapInterface;
use Drupal\migrate\Plugin\MigrationInterface;
use Drupal\smart_migrate_cli\SmartMigrateCli;
use Drupal\smart_migrate_cli\Utility\MigrationSourceUtility;
use Drush\Drush;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Output\OutputInterface;
@@ -71,6 +73,27 @@ class MigrationImportThread implements MigrationThreadInterface {
   */
  protected int $sourceCount = 0;

  /**
   * Number of items to process.
   *
   * @var int
   */
  protected int $itemsToProcessCount = 0;

  /**
   * Changes are tracked.
   *
   * @var bool
   */
  protected bool $trackChanges = FALSE;

  /**
   * High water is tracked.
   *
   * @var bool
   */
  protected bool $highwater = FALSE;

  /**
   * Number of the processed items since the previous update.
   *
@@ -86,18 +109,27 @@ class MigrationImportThread implements MigrationThreadInterface {
  protected int $currentProgress = 0;

  /**
   * IDs of the follow-up tasks, if any.
   * Number of the previously processed items.
   *
   * @var string[]
   * @var int
   */
  protected array $followUps = [];
  protected int $initialIdMapCount = 0;

  /**
   * Errors as string, if any.
   * Whether last import timestamp is tracked.
   *
   * @var string[][]
   * @var bool
   */
  protected array $errors = [];
  protected bool $trackLastImported = FALSE;

  /**
   * The most recently imported item's import timestamp.
   *
   * (... if last import is tracked).
   *
   * @var int
   */
  protected int $originalLastImported = 0;

  /**
   * Constructs a MigrationThread instance.
@@ -168,10 +200,21 @@ class MigrationImportThread implements MigrationThreadInterface {
      ->enableOutput();
    $this->migrationId = $migration->id();
    $this->migrationIdMap = (clone $migration)->getIdMap();
    // Source count can be -1.
    $this->sourceCount = (clone $migration)->getSourcePlugin()->count();
    $this->errors = [];
    $this->followUps = [];
    $this->source = (clone $migration)->getSourcePlugin();
    $this->sourceCount = $this->source->count();
    $this->trackChanges = (bool) ($migration->getSourceConfiguration()['track_changes'] ?? FALSE);
    $this->highwater = array_key_exists('high_water_property', $migration->getSourceConfiguration());
    $this->itemsToProcessCount = MigrationSourceUtility::getNumberOfItemsToImportProcessFromPieces(
      $this->source,
      $this->migrationIdMap,
      $this->getMigrationId(),
      $this->trackChanges,
      $this->highwater
    );
    $this->trackLastImported = $migration->getTrackLastImported();
    if ($this->trackChanges) {
      $this->initialIdMapCount = $this->migrationIdMap->processedCount() - $this->migrationIdMap->updateCount();
    }
  }

  /**
@@ -182,20 +225,23 @@ class MigrationImportThread implements MigrationThreadInterface {
      throw new \LogicException('Cannot start an already active migration thread.');
    }

    $source_count = max(0, $this->sourceCount);
    // Source count can be -1.
    $processed_count = $this->sourceCount >= 0
      ? $this->migrationIdMap->processedCount()
    $this->originalLastImported = $this->trackLastImported
      ? time()
      : 0;
    $this->previousProgress = $processed_count;
    $this->currentProgress = $processed_count;

    $format = $source_count > 0 ? SmartMigrateCli::PROGRESS_BAR_FORMAT_ID : SmartMigrateCli::PROGRESS_BAR_FORMAT_ID_NOMAX;
    if (!$this->trackLastImported) {
      assert($this->migrationIdMap instanceof Sql);
      $this->migrationIdMap->getDatabase()->update($this->migrationIdMap->mapTableName())
        ->fields(['last_imported' => 1])
        ->execute();
    }

    $format = $this->itemsToProcessCount >= 0 ? SmartMigrateCli::PROGRESS_BAR_FORMAT_ID : SmartMigrateCli::PROGRESS_BAR_FORMAT_ID_NOMAX;

    $this->resetBar();
    $this->bar->setMessage("<info>{$this->migrationId}</info>");
    $this->bar->setFormat($format);
    $this->bar->start($source_count);
    $this->bar->start(max(0, $this->itemsToProcessCount));
    $this->bar->setProgress($this->currentProgress);

    $this->process->start();
@@ -226,33 +272,44 @@ class MigrationImportThread implements MigrationThreadInterface {
    $this->process = NULL;
    $this->migrationId = NULL;
    $this->sourceCount = 0;
    $this->sourceConfig = [];
    $this->previousProgress = 0;
    $this->currentProgress = 0;
    $this->migrationIdMap = NULL;
    $this->errors = [];
    $this->followUps = [];
    $this->itemsToProcessCount = 0;
    $this->initialIdMapCount = 0;
    $this->resetBar();
  }

  /**
   * {@inheritdoc}
   */
  public function reportProgress(): int {
    if ($this->isIdle()) {
      return 0;
    }

    $progress = $this->updateProgress();

    if ($this->isFinished()) {
  public function getReport(): ThreadReportInterface {
    // We cannot pause a subprocess. Because of this, we always risk race
    // conditions, because we cannot determine the various states of a process
    // in the same moment. It might happen that the import process still works
    // on the last item when we ask for a progress update (and get know how many
    // items are processed so far), and after that, when we check whether the
    // subprocess is terminated, we get a TRUE result.
    // So to prevent any race condition, we check "isFinished" first and store
    // its return value in a local variable. Of course, it still can happen that
    // the process is finished after we checked its termination, but in this
    // case, we will be able to properly finish the job when the scheduler
    // asks for a new report next time (then we will report "finished" status
    // with 0 items processed since the last report).
    $is_finished = $this->isFinished() && !($is_idle = $this->isIdle());
    $progress = !$is_idle ? $this->updateProgress() : 0;
    $fully_processed = !$is_idle && $this->isFullyProcessed();

    if ($is_finished) {
      $process_output = array_filter(explode("\n", $this->process->getOutput()));
      $last = array_pop($process_output);
      $message = $last ? json_decode($last, TRUE) : [];
      $this->followUps = array_combine(
      $follow_ups = array_combine(
        $message['follow_ups'] ?? [],
        $message['follow_ups'] ?? [],
      );
      $this->errors = array_reduce(
      $errors = array_reduce(
        array_filter(explode("\n", $this->process->getErrorOutput())),
        function (array $carry, string $error) {
          $key = md5($error);
@@ -265,58 +322,96 @@ class MigrationImportThread implements MigrationThreadInterface {
        },
        []
      );
      // Reset the ID map plugin's last_import column if needed: if last import
      // is not tracked, we write '1' into this column before we start the
      // subprocess.
      if (!$this->trackLastImported) {
        assert($this->migrationIdMap instanceof Sql);
        $this->migrationIdMap->getDatabase()->update($this->migrationIdMap->mapTableName())
          ->fields(['last_imported' => 0])
          ->execute();
      }

    return $progress;
      // If there are no errors, and the current migration was configured to
      // track changes, and the current progress does not match the initially
      // calculated number of the items to process, we assume that the source
      // plugin just skipped some previously imported unchanged rows.
      if (
        !$errors &&
        $this->trackChanges &&
        $this->currentProgress !== $this->itemsToProcessCount &&
        $this->sourceCount <= $this->migrationIdMap->processedCount()
      ) {
        $progress += $this->initialIdMapCount;
      }

  /**
   * {@inheritdoc}
   */
  public function getErrors(): ?array {
    return $this->isFinished() && !empty($this->errors)
      ? $this->errors
      : NULL;
    }

  /**
   * {@inheritdoc}
   */
  public function getFollowUps(): array {
    return $this->followUps;
    return new ThreadReport(
      $progress ?? 0,
      $is_finished ?? FALSE,
      $fully_processed ?? FALSE,
      $errors ?? NULL,
      $follow_ups ?? NULL,
      $is_finished ? $this->process->getExitCode() : NULL
    );
  }

  /**
   * Updates the progress bar of the thread, and returns the progress.
   * Updates the progress bar of the thread and returns the progress.
   *
   * @return int
   *   The number of the processed items since the last update.
   */
  protected function updateProgress(): int {
    $this->previousProgress = $this->currentProgress;
    $this->currentProgress = $this->migrationIdMap->processedCount();
    $this->currentProgress = $this->getCurrentProgress();
    $this->bar->setProgress($this->currentProgress);
    return $this->currentProgress - $this->previousProgress;
  }

  /**
   * {@inheritdoc}
   * Returns the actual number of the processed items.
   *
   * @return int
   *   The actual number of the processed items.
   */
  public function isFullyProcessed(): bool {
    // If the source is uncountable, we have no way of knowing if it is
    // complete, so assume that it is.
    return $this->sourceCount < 0
      ? $this->process->isTerminated()
      : $this->process->isTerminated() && $this->sourceCount <= $this->currentProgress;
  protected function getCurrentProgress(): int {
    assert($this->migrationIdMap instanceof Sql);
    $current_progress_q = $this->migrationIdMap->getDatabase()->select($this->migrationIdMap->mapTableName(), 'm')
      ->fields('m', ['last_imported']);
    if ($this->trackLastImported) {
      $current_progress_q->condition('m.last_imported', $this->originalLastImported, '>=');
    }
    else {
      $current_progress_q->condition('m.last_imported', 1, '<>');
    }
    return (int) $current_progress_q->countQuery()->execute()->fetchField();
  }

  /**
   * {@inheritdoc}
   * Determines whether the current task is fully processed.
   *
   * @return bool
   *   Whether the current task is fully processed.
   */
  public function exitCode(): ?int {
    return isset($this->process)
      ? $this->process->getExitCode()
      : NULL;
  protected function isFullyProcessed(): bool {
    if (!$this->process->isTerminated()) {
      return FALSE;
    }
    // If the source is uncountable, we have no way of knowing if the migration
    // is complete, so assume that it is.
    if ($this->sourceCount < 0) {
      return TRUE;
    }
    if ($this->itemsToProcessCount < 0) {
      return TRUE;
    }
    // Task is fully process if the number of items to process matches current
    // progress...
    return $this->itemsToProcessCount <= $this->currentProgress ||
      // ...or if we are tracking changes, the source count matches the process
      // count (this slightly simplifies the situation).
      $this->trackChanges && $this->sourceCount <= $this->migrationIdMap->processedCount();
  }

  /**
+0 −2
Original line number Diff line number Diff line
@@ -41,8 +41,6 @@ class MigrationRollbackRemovedThread extends MigrationImportThread {
    $removed_items_id_map = (clone $migration)->set('idMap', ['plugin' => $this->migrationIdMap->getPluginId() . '_rollback_removed'])->getIdMap();
    $removed_items_id_map->rewind();
    $this->sourceCount = iterator_count($removed_items_id_map);
    $this->errors = [];
    $this->followUps = [];
    unset($removed_items_id_map);
  }

+5 −36
Original line number Diff line number Diff line
@@ -7,7 +7,9 @@ namespace Drupal\smart_migrate_cli\Thread;
use Drupal\migrate\Plugin\MigrationInterface;

/**
 * Interface fot migration threads.
 * Interface of migration threads..
 *
 * @internal
 */
interface MigrationThreadInterface {

@@ -35,39 +37,6 @@ interface MigrationThreadInterface {
   */
  public function isFinished(): bool;

  /**
   * Whether the subprocess processed all its items.
   *
   * @return bool
   *   Whether all migration rows are processed.
   */
  public function isFullyProcessed(): bool;

  /**
   * Returns errors of the process.
   *
   * @return string[][]|null
   *   The errors reported during the subprocess. NULL if the process did not
   *   finish yet or if no errors were captured.
   */
  public function getErrors(): ?array;

  /**
   * Returns the IDs of the follow-up tasks, if any.
   *
   * @return string[]
   *   The IDs of the follow-up tasks, if any.
   */
  public function getFollowUps(): array;

  /**
   * Returns the exit code of the subprocess, if it exists and already finished.
   *
   * @return int|null
   *   The exit code of the subprocess, if it exists and already finished.
   */
  public function exitCode(): ?int;

  /**
   * Returns the migration plugin ID of the thread, if any.
   *
@@ -101,8 +70,8 @@ interface MigrationThreadInterface {
  public function reset(): void;

  /**
   * Updates progress and returns the number of the processed items.
   * Returns report about the thread and its subprocess.
   */
  public function reportProgress(): int;
  public function getReport(): ThreadReportInterface;

}
+123 −0
Original line number Diff line number Diff line
<?php

declare(strict_types = 1);

namespace Drupal\smart_migrate_cli\Thread;

/**
 * Simple report object about a thread's task and subprocess..
 *
 * @internal
 */
class ThreadReport implements ThreadReportInterface {

  /**
   * The progress of the actual task since the last report.
   *
   * @var int
   */
  protected int $progress;

  /**
   * Whether the task is finished or not.
   *
   * @var bool
   */
  protected bool $processFinished;

  /**
   * Whether the task is fully done or not.
   *
   * @var bool
   */
  protected bool $taskIsFullyProcessed;

  /**
   * Errors captured during the execution of the task.
   *
   * @var array|null
   */
  protected ?array $errors;

  /**
   * IDs of the follow-up tasks, if any.
   *
   * @var array|null
   */
  protected ?array $followUps;

  /**
   * Exit code of the task's subprocess.
   *
   * @var int|null
   */
  protected ?int $exitCode;

  /**
   * Constructs a ThreadReport instance.
   *
   * @param int $progress
   *   The progress of the actual task since the last report.
   * @param bool $process_finished
   *   Whether the task is finished or not.
   * @param bool $fully_processed
   *   Whether the task is fully done or not.
   * @param array|null $errors
   *   Errors captured during the execution of the task.
   * @param array|null $follow_ups
   *   IDs of the follow-up tasks, if any.
   * @param int|null $exit_code
   *   Exit code of the task's subprocess.
   */
  public function __construct(int $progress, bool $process_finished = FALSE, bool $fully_processed = FALSE, ?array $errors = NULL, ?array $follow_ups = NULL, ?int $exit_code = NULL) {
    $this->progress = $progress;
    $this->processFinished = $process_finished;
    $this->taskIsFullyProcessed = $fully_processed;
    $this->errors = $errors;
    $this->followUps = $follow_ups;
    $this->exitCode = $exit_code;
  }

  /**
   * {@inheritdoc}
   */
  public function getProgress(): int {
    return $this->progress;
  }

  /**
   * {@inheritdoc}
   */
  public function processIsFinished(): bool {
    return $this->processFinished;
  }

  /**
   * {@inheritdoc}
   */
  public function taskIsFullyProcessed(): bool {
    return $this->taskIsFullyProcessed;
  }

  /**
   * {@inheritdoc}
   */
  public function getProcessErrors(): ?array {
    return $this->errors;
  }

  /**
   * {@inheritdoc}
   */
  public function getFollowUpTasks(): ?array {
    return $this->followUps;
  }

  /**
   * {@inheritdoc}
   */
  public function getProcessExistCode(): ?int {
    return $this->exitCode;
  }

}
+63 −0
Original line number Diff line number Diff line
<?php

declare(strict_types = 1);

namespace Drupal\smart_migrate_cli\Thread;

/**
 * Interface of thread reports..
 *
 * @internal
 */
interface ThreadReportInterface {

  /**
   * Updates progress and returns the number of the processed items.
   *
   * @return int
   *   Processed items since the last report.
   */
  public function getProgress(): int;

  /**
   * Whether the underlying process is finished.
   *
   * @return bool
   *   Whether the subprocess is finished (terminated).
   */
  public function processIsFinished(): bool;

  /**
   * Whether the underlying process is fully processed.
   *
   * @return bool
   *   Whether the task of the subprocess is fully finished.
   */
  public function taskIsFullyProcessed(): bool;

  /**
   * Returns the errors of the subprocess.
   *
   * @return array|null
   *   Errors happened during the subprocess as array.
   */
  public function getProcessErrors(): ?array;

  /**
   * Returns the follow-up tasks.
   *
   * @return array|null
   *   IDs of the follow up tasks which should be executed after
   *   the successfully finished subprocess.
   */
  public function getFollowUpTasks(): ?array;

  /**
   * Returns the exit code of the subprocess if it is finished.
   *
   * @return int|null
   *   Exit code of the subprocess if it is terminated.
   */
  public function getProcessExistCode(): ?int;

}
Loading