Commit a8ded3af authored by Zoltan Attila Horvath's avatar Zoltan Attila Horvath Committed by Zoltan Attila Horvath
Browse files

Issue #3307033 by huzooka: Better memory management for environments with LOT of migrations

parent d5c7a773
Loading
Loading
Loading
Loading
+1 −2
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@ services:
      - '@keyvalue'
      - '@account_switcher'
      - '@entity_type.manager'
      - '@?plugin.manager.migration'
      - '@?smart_migrate_cli.plugin.manager.migration.simple'
    tags:
      -  { name: drush.command }

@@ -17,6 +17,5 @@ services:
      - '@account_switcher'
      - '@entity_type.manager'
      - '@?smart_migrate_cli.plugin.manager.migration.simple'
      - '@?plugin.manager.migration'
    tags:
      -  { name: drush.command }
+12 −18
Original line number Diff line number Diff line
@@ -73,7 +73,7 @@ class MigrateRunnerCommands extends OriginalMigrateRunnerCommands {
   *   The account switcher.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entityTypeManager
   *   The entity type manager.
   * @param \Drupal\migrate\Plugin\MigrationPluginManagerInterface|null $migrationPluginManager
   * @param \Drupal\smart_migrate_cli\SimpleMigrationManager|null $migrationPluginManager
   *   The migration plugin manager service.
   */
  public function __construct(DateFormatter $dateFormatter, KeyValueFactoryInterface $keyValueFactory, AccountSwitcherInterface $accountSwitcher, EntityTypeManagerInterface $entityTypeManager, ?MigrationPluginManagerInterface $migrationPluginManager = NULL) {
@@ -89,11 +89,14 @@ class MigrateRunnerCommands extends OriginalMigrateRunnerCommands {
    $invoked = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 2)[1]['function'] ?? NULL;
    $order_forced = func_get_args()[2] ?? FALSE;
    $is_import_operation = $invoked === 'import' || $order_forced;
    $parent_grouped_list = $is_import_operation
      ? $this->getSortedMigrationListForImport($migrationIds, $tags)
      : parent::getMigrationList($migrationIds, $tags);
    $grouped_list = $this->doGetGroupedMigrationList($migrationIds, $tags);
    if ($is_import_operation) {
      ksort($grouped_list);
    }

    $grouped_list = $this->filterMigrationList($grouped_list);

    return $this->filterMigrationList($parent_grouped_list, $is_import_operation);
    return $this->orderMigrationList($grouped_list, $is_import_operation);
  }

  /**
@@ -249,30 +252,21 @@ class MigrateRunnerCommands extends OriginalMigrateRunnerCommands {
  /**
   * {@inheritdoc}
   */
  protected function executeMigration(MigrationInterface $initial, string $migrationId, array $userData): void {
  protected function executeMigration(MigrationInterface $migration, string $migrationId, array $userData): void {
    // We have to drop stale entity caches - some base fields (e.g. provided by
    // Content Translation) might be installed after the default translations
    // are imported. If we don't drop cached entities and try to import
    // entity translations, the entity storage will return a stale entity
    // without the values of the base fields which were installed later.
    // @todo Consider killing risky caches during migration instead.
    if ($initial->getDestinationPlugin() instanceof EntityContentBase) {
      $entity_type_id = explode(':', $initial->getDestinationConfiguration()['plugin'])[1] ?? NULL;
    if ($migration->getDestinationPlugin() instanceof EntityContentBase) {
      $entity_type_id = explode(':', $migration->getDestinationConfiguration()['plugin'])[1] ?? NULL;
      // Book destination plugin does not correspond to an entity type ID.
      if ($entity_type_id) {
        $this->entityTypeManager->getStorage($entity_type_id)->resetCache();
      }
    }
    // Base fields might change by the previous migrations. For instance
    // Content Translation installs base fields during migration - and these
    // new base fields aren't reflected in the destination migration plugin
    // instances (e.g. the injected entity storage objects aren't up-to-date,
    // they still miss the installed base fields).
    // That's why it is a better idea to do the same what Drupal core's
    // MigrateUpgradeImportBatch does: it instantiates the migration instances
    // right before it executes them.
    $migration = $this->migrationPluginManager->createInstance($migrationId);
    assert($migration instanceof MigrationInterface);
    $migration = $this->getFreshMigrationInstance($migration->id());

    static::prepareMigration($migration);

+30 −33
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@ use Drupal\Core\Session\UserSession;
use Drupal\Core\Site\Settings;
use Drupal\Core\State\StateInterface;
use Drupal\migrate\Plugin\MigrationInterface;
use Drupal\migrate\Plugin\RequirementsInterface;
use Drupal\smart_migrate_cli\SimpleMigrationManager;
use Drupal\smart_migrate_cli\Traits\SmartMigrateHelperTrait;
use Drupal\smart_migrate_cli\Traits\SmartMigrationConfigurationTrait;
use Drush\Commands\DrushCommands;
@@ -101,21 +101,28 @@ class SmartMigrateCliCommands extends DrushCommands implements SiteAliasManagerA
   */
  const PROGRESS_BAR_FORMAT = ' [%bar%] %percent:3s%% | %current:4s%/%max:-4s% | %message%';

  /**
   * Key-value storage key where original states are stored.
   *
   * @const string
   */
  const ORIGINAL_STATES_STORAGE_KEY = 'smart_migrate_cli.original_states';

  /**
   * The logger section used by SmartMigrateHelper's getNextUnblockedMigration.
   *
   * Yepp, bad design.
   *
   * @var \Symfony\Component\Console\Output\OutputInterface
   * @var \Symfony\Component\Console\Output\OutputInterface|null
   */
  private $loggerSection;
  private ?OutputInterface $loggerSection;

  /**
   * Storage for empty "placeholder" sections around progress bars.
   *
   * @var \Symfony\Component\Console\Output\OutputInterface[]
   */
  private $barPlaceholderSections = [];
  private array $barPlaceholderSections = [];

  /**
   * Error messages recorded during multi-thread migration.
@@ -127,7 +134,7 @@ class SmartMigrateCliCommands extends DrushCommands implements SiteAliasManagerA
   *
   * @var array[][]
   */
  private $errorOutputs = [];
  private array $errorOutputs = [];

  /**
   * The progress bars and their metadata.
@@ -138,7 +145,7 @@ class SmartMigrateCliCommands extends DrushCommands implements SiteAliasManagerA
   *
   * @todo Provide our own object for these things.
   */
  private $progressBars = [];
  private array $progressBars = [];

  /**
   * The scheduled subprocesses.
@@ -147,35 +154,35 @@ class SmartMigrateCliCommands extends DrushCommands implements SiteAliasManagerA
   *
   * @todo Provide our own object for these things too.
   */
  private $activeProcesses = [];
  private array $activeProcesses = [];

  /**
   * List of the migration plugin instances to execute.
   *
   * @var \Drupal\migrate\Plugin\MigrationInterface[]
   */
  private $list = [];
  private array $list = [];

  /**
   * List of the migration plugin IDs which should be executed.
   *
   * @var string[]
   */
  private $migrationsToProcess = [];
  private array $migrationsToProcess = [];

  /**
   * List of the already processed migrations' plugin IDs.
   *
   * @var string[]
   */
  private $migrationsProcessed = [];
  private array $migrationsProcessed = [];

  /**
   * The state service.
   *
   * @var \Drupal\Core\State\StateInterface
   */
  protected $state;
  protected StateInterface $state;

  /**
   * Account switcher.
@@ -191,19 +198,12 @@ class SmartMigrateCliCommands extends DrushCommands implements SiteAliasManagerA
   */
  protected EntityTypeManagerInterface $entityTypeManager;

  /**
   * Migration plugin manager service.
   *
   * @var \Drupal\migrate\Plugin\MigratePluginManagerInterface|null
   */
  protected $migrationPluginManager;

  /**
   * Light-weight migration plugin manager service optimized for multi-thread.
   *
   * @var \Drupal\smart_migrate_cli\SimpleMigrationManager
   * @var \Drupal\smart_migrate_cli\SimpleMigrationManager|null
   */
  protected $simpleMigrationManager;
  protected SimpleMigrationManager $migrationPluginManager;

  /**
   * Constructs a new SmartMigrateCliCommands instance.
@@ -214,18 +214,15 @@ class SmartMigrateCliCommands extends DrushCommands implements SiteAliasManagerA
   *   The account switcher service.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity_type manager.
   * @param \Drupal\smart_migrate_cli\SimpleMigrationManager|null $simple_migration_plugin_manager
   * @param \Drupal\smart_migrate_cli\SimpleMigrationManager $simple_migration_plugin_manager
   *   The light-weight migration plugin manager.
   * @param \Drupal\migrate\Plugin\MigratePluginManagerInterface|null $migration_plugin_manager
   *   The migration plugin manager, if available.
   */
  public function __construct(StateInterface $state, AccountSwitcherInterface $account_switcher, EntityTypeManagerInterface $entity_type_manager, $simple_migration_plugin_manager = NULL, $migration_plugin_manager = NULL) {
  public function __construct(StateInterface $state, AccountSwitcherInterface $account_switcher, EntityTypeManagerInterface $entity_type_manager, SimpleMigrationManager $simple_migration_plugin_manager = NULL) {
    parent::__construct();
    $this->state = $state;
    $this->accountSwitcher = $account_switcher;
    $this->entityTypeManager = $entity_type_manager;
    $this->migrationPluginManager = $migration_plugin_manager;
    $this->simpleMigrationManager = $simple_migration_plugin_manager;
    $this->migrationPluginManager = $simple_migration_plugin_manager;
    $this->loggerSection = $this->output();
  }

@@ -293,7 +290,7 @@ class SmartMigrateCliCommands extends DrushCommands implements SiteAliasManagerA
      $threads
    ));

    $list = $this->getMigrationList($this->input()->getOption('tag'), TRUE);
    $list = $this->getMigrationList($this->input()->getOption('tag'));
    if (empty($list) || empty($list = reset($list))) {
      $this->logger()->error(sprintf(
        "No migrations found with the given tag %s.",
@@ -317,8 +314,8 @@ class SmartMigrateCliCommands extends DrushCommands implements SiteAliasManagerA
      );
    $this->initializeMultiThreadProcessing($list, $run_at_the_end);

    // Register our SIGNIT and SIGTERM signals. If the main process gets one of
    // these, we have stop all the subprocesses too.
    // Register our SIGINT and SIGTERM signals. If the main process gets one of
    // these, we have to stop all the subprocesses too.
    if (extension_loaded('pcntl')) {
      pcntl_signal(SIGTERM, static::stopSignalClosure());
      pcntl_signal(SIGINT, static::stopSignalClosure());
@@ -414,7 +411,7 @@ class SmartMigrateCliCommands extends DrushCommands implements SiteAliasManagerA
   */
  public function singleImport(): int {
    $migration_id = getenv(self::SMC_MIGRATION_PLUGIN_ID_ENV);
    $migrations = $migration_id ? $this->simpleMigrationManager->createInstances([$migration_id]) : [];
    $migrations = $migration_id ? $this->migrationPluginManager->createInstances([$migration_id]) : [];
    if (count($migrations) !== 1) {
      $this->logger()->error('Too many, or no migrations found.');
      $return = self::EXIT_FAILURE;
@@ -424,7 +421,6 @@ class SmartMigrateCliCommands extends DrushCommands implements SiteAliasManagerA
    $verbosity = $this->output()->getVerbosity();
    if (
      !isset($return) &&
      $migration instanceof RequirementsInterface &&
      ($requirement_message = $this->getMigrationRequirementsMessages($migration))
    ) {
      $this->logger()->error($requirement_message);
@@ -468,8 +464,9 @@ class SmartMigrateCliCommands extends DrushCommands implements SiteAliasManagerA
   *   List of the discovered and executable migrations.
   */
  protected function getMigrationList(string $tag): array {
    $sorted_list = $this->getSortedMigrationListForImport(NULL, $tag);
    return $this->filterMigrationList($sorted_list);
    $grouped_list = $this->doGetGroupedMigrationList(NULL, $tag);
    $grouped_list = $this->filterMigrationList($grouped_list);
    return $this->orderMigrationList($grouped_list, TRUE);
  }

  /**
+95 −40
Original line number Diff line number Diff line
@@ -64,12 +64,37 @@ class SimpleMigrationManager extends MigrationPluginManager {
   * {@inheritdoc}
   */
  public function buildDependencyMigration(array $migrations, array $dynamic_ids) {
    // If caller is the manager's createInstances method, we don't build the
    // dependency graph. Smart Migrate CLI will do this separately.
    $is_createinstances_call = array_reduce(
      debug_backtrace(0, 2),
      function (bool $carry, array $stack) {
        if ($carry) {
          return $carry;
        }
        $class = $stack['class'] ?? NULL;
        return $class &&
          in_array(MigrationPluginManagerInterface::class, class_implements($class)) &&
          ($stack['function'] ?? NULL) === 'createInstances';
      },
      FALSE
    );
    if ($is_createinstances_call) {
      return $migrations;
    }

    return is_callable([$this->currentPluginManager, 'buildDependencyMigration'])
      ? $this->currentPluginManager->buildDependencyMigration($migrations, $dynamic_ids)
      : parent::buildDependencyMigration($migrations, $dynamic_ids);
  }

  /**
   * Checks if requirements for the given migration plugin instance are met.
   *
   * This is an optimized version of the requirements check built into
   * migrations: this performs the same check, but operates on a clone, which is
   * removed from PHP memory after the check is done.
   *
   * @param \Drupal\migrate\Plugin\MigrationInterface $migration
   *   The migration plugin instance to check.
   * @param bool $check_dependencies
@@ -80,39 +105,76 @@ class SimpleMigrationManager extends MigrationPluginManager {
   *   Whether requirements are met or not.
   */
  public function migrationRequirementsMet(MigrationInterface $migration, bool $check_dependencies = TRUE): bool {
    $clone = clone $migration;
    // Check whether the current migration source and destination plugin
    // requirements are met or not.
    try {
      if ($migration->getSourcePlugin() instanceof RequirementsInterface) {
        $migration->getSourcePlugin()->checkRequirements();
      if ($clone->getSourcePlugin() instanceof RequirementsInterface) {
        $clone->getSourcePlugin()->checkRequirements();
      }
      if ($clone->getDestinationPlugin() instanceof RequirementsInterface) {
        $clone->getDestinationPlugin()->checkRequirements();
      }

      if ($check_dependencies && !empty($requirements = $clone->getRequirements())) {
        // Requirements need to be checked.
        $required_migrations = $this->createInstances($requirements);

        if (empty(array_diff($requirements, array_keys($required_migrations)))) {
          // Check if the dependencies are in good shape.
          foreach ($required_migrations as $required_migration) {
            if (!$required_migration->allRowsProcessed()) {
              $return = FALSE;
              break 1;
            }
      if ($migration->getDestinationPlugin() instanceof RequirementsInterface) {
        $migration->getDestinationPlugin()->checkRequirements();
          }
        }
    catch (RequirementsException $e) {
      return FALSE;
        else {
          $return = FALSE;
        }
      }

    if (!$check_dependencies || empty($requirements = $migration->getRequirements())) {
      // Requirements don't need to be checked.
      return TRUE;
      $return = $return ?? TRUE;
    }
    catch (RequirementsException $e) {
      $return = FALSE;
    }

    $required_migrations = $this->createInstances($requirements);
    unset($clone);
    unset($required_migrations);
    unset($requirements);

    if (!empty(array_diff($requirements, array_keys($required_migrations)))) {
      return FALSE;
    return $return;
  }

  /**
   * Returns the IDs of the given migration's unmet migration dependencies.
   *
   * @param \Drupal\migrate\Plugin\MigrationInterface $migration
   *   The migration plugin instance to check.
   */
  public function getMissingRequirements(MigrationInterface $migration) {
    $clone = clone $migration;
    $requirements = $clone->getRequirements();
    if (!empty($requirements)) {
      // Requirements need to be checked.
      $required_migrations = $this->createInstances($requirements);
      $missing_migrations = array_diff($requirements, array_keys($required_migrations));

      // Check if the dependencies are in good shape.
      foreach ($required_migrations as $required_migration) {
        if (!$required_migration->allRowsProcessed()) {
        return FALSE;
          $missing_migrations[] = $required_migration->id();
        }
      }
    }

    return TRUE;
    unset($clone);
    unset($requirements);
    unset($required_migrations);
    unset($required_migration);

    return $missing_migrations ?? [];
  }

  /**
@@ -126,37 +188,30 @@ class SimpleMigrationManager extends MigrationPluginManager {
   *   ready for being executed.
   */
  public function getMigrationRequirementsMessages(MigrationInterface $migration): ?string {
    $message = NULL;
    $migration_clone = clone $migration;
    // Check whether the current migration source and destination plugin
    // requirements are met or not.
    try {
      $source = $migration->getSourcePlugin();
      if ($source instanceof RequirementsInterface) {
        $source->checkRequirements();
      }
      $destination = $migration->getDestinationPlugin();
      if ($destination instanceof RequirementsInterface) {
        $destination->checkRequirements();
      if ($migration_clone->getSourcePlugin() instanceof RequirementsInterface) {
        $migration_clone->getSourcePlugin()->checkRequirements();
      }
      if ($migration_clone->getDestinationPlugin() instanceof RequirementsInterface) {
        $migration_clone->getDestinationPlugin()->checkRequirements();
      }
    catch (RequirementsException $e) {
      return $e->getMessage();
    }

    if (!empty($requirements = $migration->getRequirements())) {
      $required_migrations = $this->createInstances($requirements);
      $missing_migrations = array_diff($requirements, array_keys($required_migrations));

      // Check if the dependencies are in good shape.
      foreach ($required_migrations as $migration_id => $required_migration) {
        if (!$required_migration->allRowsProcessed()) {
          $missing_migrations[] = $migration_id;
      if (!empty($missing_migration_ids = $this->getMissingRequirements($migration_clone))) {
        $message = sprintf("Missing migrations %s.", implode(', ', $missing_migration_ids));
      }
    }
    catch (RequirementsException $e) {
      $message = $e->getMessage();
    }

    return !empty($missing_migrations)
      ? sprintf("Missing migrations %s.", implode(', ', $missing_migrations))
      : NULL;
    unset($migration_clone);
    unset($missing_migration_ids);

    return $message;
  }

}
+21 −0
Original line number Diff line number Diff line
@@ -4,8 +4,11 @@ declare(strict_types = 1);

namespace Drupal\smart_migrate_cli;

use Drupal\migrate\MigrateException;
use Drupal\migrate\MigrateMessageInterface;
use Drupal\migrate\MigrateSkipRowException;
use Drupal\migrate\Plugin\MigrationInterface;
use Drupal\migrate\Row;
use Drush\Drupal\Migrate\MigrateExecutable as DrushMigrateExecutable;
use Symfony\Component\Console\Output\OutputInterface;

@@ -50,4 +53,22 @@ class SmartMigrateExecutable extends DrushMigrateExecutable {
    return FALSE;
  }

  /**
   * {@inheritdoc}
   */
  protected function processPipeline(Row $row, string $destination, array $plugins, $value) {
    try {
      parent::processPipeline($row, $destination, $plugins, $value);
    }
    catch (MigrateException | MigrateSkipRowException $handled_exception) {
      throw $handled_exception;
    }
    catch (\Exception $unhandled_exception) {
      throw new MigrateException(sprintf(
        "Processing destination property threw the following exception: %s",
        $unhandled_exception->getMessage()
      ));
    }
  }

}
Loading