Commit e0c165c0, authored and committed by Johan den Hollander
Browse files

Issue #3010754 by Murz, Johan den Hollander, azcomi, Siegrist, grahl,...

Issue #3010754 by Murz, Johan den Hollander, azcomi, Siegrist, grahl, styrbaek, owenbush: Command "sapi-fast" is not defined
parent ad4e3c8e
Loading
Loading
Loading
Loading

composer.json

0 → 100644
+13 −0
Original line number Diff line number Diff line
{
  "name": "drupal/search_api_fast",
  "description": "Provides drush commands to index fast with search API (using all your CPU cores).",
  "type": "drupal-module",
  "homepage": "https://www.drupal.org/project/search_api_fast",
  "extra": {
    "drush": {
      "services": {
        "drush.services.yml": "^9"
      }
    }
  }
}

drush.services.yml

0 → 100644
+9 −0
Original line number Diff line number Diff line
# Service definitions for the Drush command file of search_api_fast.
services:
  search_api_fast.commands:
    class: \Drupal\search_api_fast\Commands\SearchApiFastCommands
    # Passed in the same order as the SearchApiFastCommands constructor
    # parameters: entity type manager, module handler, event dispatcher.
    arguments:
      - '@entity_type.manager'
      - '@module_handler'
      - '@event_dispatcher'
    tags:
      # Marks this service as a Drush command file.
      - { name: drush.command }
+28 −12
Original line number Diff line number Diff line
@@ -6,8 +6,10 @@
 */

use Drupal\search_api\SearchApiException;
use Drupal\search_api\Utility\CommandHelper;
use Drupal\search_api_fast\SearchApiFastQueue;
use Drupal\Core\Database\Database;
use Drupal\search_api\Entity\Index;

/**
 * Implements hook_drush_command().
@@ -60,7 +62,11 @@ function search_api_fast_drush_command() {
 */
function drush_search_api_fast_search_api_index_fast($index_name = '', $clear = '') {

  if ($index_name && $indexes = search_api_drush_get_indexes($index_name)) {
  $command_helper = new CommandHelper(\Drupal::entityTypeManager(), \Drupal::moduleHandler(), 'dt');
  $command_helper->setLogger(\Drupal::logger('search_api'));
  $indexes = $command_helper->loadIndexes([$index_name]);

  if ($index_name && $indexes) {

    $index = reset($indexes);

@@ -189,8 +195,12 @@ function drush_search_api_fast_search_api_index_fast_queue($index_name, $worker)
        $queue->deleteItems(array_keys($items));
      }

      $command_helper = new CommandHelper(\Drupal::entityTypeManager(), \Drupal::moduleHandler(), 'dt');
      $command_helper->setLogger(\Drupal::logger('search_api'));
      $indexes = $command_helper->loadIndexes([$index_name]);

      // Get index.
      if ($indexes = search_api_drush_get_indexes($index_name)) {
      if ($indexes) {
        $index = reset($indexes);

        // Index each batch.
@@ -204,7 +214,7 @@ function drush_search_api_fast_search_api_index_fast_queue($index_name, $worker)
            // Clear entity cache.
            // If not done, this becomes a huge memory leak.
            // Still, php GC is crap.
            search_api_fast_reset_entity_cache();
            search_api_fast_reset_entity_cache($index);
          }
          catch (SearchApiException $e) {
            drush_print($e);
@@ -257,21 +267,27 @@ function search_api_fast_get_queue($name) {

/**
 * Reset entity cache.
 *
 * @param Drupal\search_api\Entity\Index $index
 *   The index for which to reset the entity caches.
 */
function search_api_fast_reset_entity_cache() {
  $reset_entity_types = array(
    'taxonomy_term',
    'node',
    'field_collection_item',
    'paragraphs_item',
  );
function search_api_fast_reset_entity_cache(Index $index) {
  $index_datasources = $index->getDatasourceIds();
  $reset_entity_types = array_map(function ($value) {
    if (strpos($value, 'entity:') !== FALSE) {
      return str_replace('entity:', '', $value);
    }
    return NULL;
  }, $index_datasources);

  $valid_types = array_keys(\Drupal::service('entity_type.manager')->getDefinitions());
  foreach ($reset_entity_types as $entity_type) {
    if (isset($valid_types[$entity_type])) {
    if (!is_null($entity_type)) {
      if (array_search($entity_type, $valid_types)) {
        \Drupal::service('entity_type.manager')->getStorage($entity_type)->resetCache();
      }
    }
  }

  gc_collect_cycles();
}
+338 −0
Original line number Diff line number Diff line
<?php

namespace Drupal\search_api_fast\Commands;

use Drush\Commands\DrushCommands;
use Drupal\Core\Database\Database;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Psr\Log\LoggerInterface;
use Drupal\search_api\SearchApiException;
use Drupal\search_api\Utility\CommandHelper;
use Drupal\search_api\Entity\Index;
use Drupal\search_api_fast\SearchApiFastQueue;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Drush\Drush;

/**
 * A Drush commandfile.
 *
 * In addition to this file, you need a drush.services.yml
 * in root of your module, and a composer.json file that provides the name
 * of the services file to use.
 *
 * See these files for an example of injecting Drupal services:
 *   - http://cgit.drupalcode.org/devel/tree/src/Commands/DevelCommands.php
 *   - http://cgit.drupalcode.org/devel/tree/drush.services.yml
 */
class SearchApiFastCommands extends DrushCommands {

  /**
   * The command helper.
   *
   * Used by the commands in this class to load Search API indexes by ID.
   *
   * @var \Drupal\search_api\Utility\CommandHelper
   */
  protected $commandHelper;

  /**
   * The entity type manager passed to the constructor.
   *
   * Used to reset entity storage caches after each indexed batch.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * Number of worker queues/processes to create per indexing run.
   *
   * Read from "search_api_fast.performance:index_workers".
   * NOTE(review): presumably a positive integer - confirm against the
   * module's config schema.
   *
   * @var mixed
   */
  protected $searchApiFastIndexWorkers;

  /**
   * Maximum number of batches a worker claims before it respawns itself.
   *
   * Read from "search_api_fast.performance:max_batches_worker_respawn".
   *
   * @var mixed
   */
  protected $maxBatchesWorkerRespawn;

  /**
   * Number of queue items claimed per batch by a worker.
   *
   * Read from "search_api_fast.performance:worker_batch_size".
   *
   * @var mixed
   */
  protected $workerBatchSize;

  /**
   * Configured drush executable, used when it cannot be taken from $argv.
   *
   * Read from "search_api_fast.performance:drush".
   *
   * @var mixed
   */
  protected $drush;

  /**
   * Builds the command file and caches its performance settings.
   *
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entityTypeManager
   *   The entity type manager; kept for cache resets and handed to the
   *   command helper.
   * @param \Drupal\Core\Extension\ModuleHandlerInterface $moduleHandler
   *   The module handler, handed to the command helper.
   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $eventDispatcher
   *   The event dispatcher, handed to the command helper.
   */
  public function __construct(EntityTypeManagerInterface $entityTypeManager, ModuleHandlerInterface $moduleHandler, EventDispatcherInterface $eventDispatcher) {
    $this->entityTypeManager = $entityTypeManager;
    $this->commandHelper = new CommandHelper($entityTypeManager, $moduleHandler, $eventDispatcher, 'dt');

    // Every tunable lives in the same config object, so look it up once.
    $settings = \Drupal::config('search_api_fast.performance');
    $this->searchApiFastIndexWorkers = $settings->get('index_workers');
    $this->maxBatchesWorkerRespawn = $settings->get('max_batches_worker_respawn');
    $this->workerBatchSize = $settings->get('worker_batch_size');
    $this->drush = $settings->get('drush');
  }

  /**
   * {@inheritdoc}
   *
   * Also forwards the logger to the Search API command helper so both log to
   * the same place.
   */
  public function setLogger(LoggerInterface $logger) {
    $this->commandHelper->setLogger($logger);
    parent::setLogger($logger);
  }

  /**
   * Index everything with use of multiple workers.
   *
   * Distributes the un-indexed items of an index round robin over one queue
   * per worker and spawns a background drush process for every queue that
   * received items. When no (loadable) index is given, the list of indexes is
   * printed instead.
   *
   * @param string $index_name
   *   index ID to run on.
   * @param string $clear
   *   Provide "clear" to clear the index first or "reindex" to reindex.
   *
   * @usage drush sapi-fast [index] clear
   *   Index [index], and clear it first (no reindex)
   * @usage drush sapi-fast [index] reindex
   *   Index [index], and mark for reindex first
   * @usage drush sapi-fast [index]
   *   Index without clearing, just whats left un-indexed.
   *
   * @command search:api-index-fast
   * @aliases sapi-fast,search-api-index-fast
   */
  public function apiIndexFast($index_name = '', $clear = '') {
    // Only ask the storage for an index when an ID was actually given.
    $indexes = $index_name ? $this->commandHelper->loadIndexes([$index_name]) : [];

    if (!$indexes) {
      // Print indexes. Drush::drush() only builds the process object; it has
      // to be run explicitly and its output relayed to the user.
      $process = Drush::drush(Drush::aliasManager()->getSelf(), 'sapi-l');
      $process->run();
      $this->output()->write($process->getOutput());
      return;
    }

    $index = reset($indexes);

    // Bail out if workers for this index are already running. $index_name is
    // known to be the ID of a real, loadable index at this point.
    // Return instead of exit so Drush can finish the command normally.
    if ($this->isAlreadyRunning($index_name)) {
      $this->output()->writeln('Some other process already running my commands... Exit.');
      return;
    }

    // Clear index or mark it for reindexing, when requested.
    if ($clear === 'clear') {
      $index->clear();
    }
    elseif ($clear === 'reindex') {
      $index->reindex();
    }

    // Get to-index items.
    $items = $index->getTrackerInstance()->getRemainingItems();
    if (!$items) {
      return;
    }

    // Check again for already running workers: some time has probably passed
    // while loading the indexable items. The parent pid can't be used for
    // this, because drush effectively detaches the spawned processes.
    if ($this->isAlreadyRunning($index_name)) {
      $this->output()->writeln('Some other process already running my commands... Exit.');
      return;
    }

    // A missing or zero worker setting would leave us without any queue to
    // put items in; always use at least one worker.
    $worker_count = max(1, (int) $this->searchApiFastIndexWorkers);

    // Initiate queues, one for each concurrent process.
    $queues = [];
    for ($worker = 0; $worker < $worker_count; $worker++) {
      $queues[$worker] = $this->getQueue('search_api_fast_index_fast_' . $index_name . '_' . $worker);
      // There is no harm in trying to recreate existing.
      $queues[$worker]->createQueue();
    }

    // Distribute the items over the queues round robin.
    $item_queues = [];
    $position = 0;
    foreach ($items as $item) {
      $item_queues[$position % $worker_count][] = $item;
      $position++;
    }

    // Fill all queues before any worker is started.
    foreach ($item_queues as $queue_index => $queue_items) {
      $queues[$queue_index]->createItems($queue_items);
    }

    // Spawn new process for each queue that received items. Every entry in
    // $item_queues holds at least one item by construction.
    foreach (array_keys($item_queues) as $queue_index) {
      $this->fastIndexQueueInvoke($index_name, $queue_index);
    }
  }

  /**
   * Index everything with use of multiple workers: queue worker.
   *
   * Claims a limited number of batches from the worker's own queue, indexes
   * them and respawns itself while the queue still holds items.
   *
   * @param string $index_name
   *   Index ID.
   * @param string $worker
   *   The worker id, 0 up to (not including) the configured worker count.
   *
   * @command search:api-index-fast-queue
   * @aliases sapi-ifq,search-api-index-fast-queue
   *
   * @return bool
   *   FALSE when indexing a batch raised a Search API exception, TRUE
   *   otherwise (including when there was nothing to do).
   */
  public function apiIndexFastQueue($index_name, $worker) {
    // Worker ids are handed out as 0 .. count - 1, with at least one worker.
    $worker_count = max(1, (int) $this->searchApiFastIndexWorkers);
    $worker_id = filter_var($worker, FILTER_VALIDATE_INT);
    if ($worker_id === FALSE || $worker_id < 0 || $worker_id >= $worker_count) {
      return TRUE;
    }

    // Get queue for this worker.
    $queue = $this->getQueue('search_api_fast_index_fast_' . $index_name . '_' . $worker_id);
    if (!$queue->numberOfItems()) {
      return TRUE;
    }

    // Load the index before touching the queue, so items are not claimed and
    // deleted for an index that cannot be loaded.
    $indexes = $this->commandHelper->loadIndexes([$index_name]);
    if (!$indexes) {
      return TRUE;
    }
    $index = reset($indexes);

    // Create small batches to index; claimed items are removed from the queue
    // right away.
    $item_lists = [];
    while (count($item_lists) < $this->maxBatchesWorkerRespawn && ($claimed = $queue->claimItems($this->workerBatchSize))) {
      $item_lists[] = $claimed;
      $queue->deleteItems(array_keys($claimed));
    }

    // Index each batch.
    foreach ($item_lists as $item_list) {
      try {
        $items = $index->loadItemsMultiple($item_list);
        $index->indexSpecificItems($items);

        // Clear entity cache.
        // If not done, this becomes a huge memory leak.
        $this->resetEntityCache($index);
      }
      catch (SearchApiException $e) {
        // writeln() expects a string, not the exception object itself.
        $this->output()->writeln((string) $e);
        return FALSE;
      }
    }

    // Respawn with a fresh process to keep memory usage in check.
    if ($queue->numberOfItems()) {
      $this->respawn();
    }

    return TRUE;
  }

  /**
   * Tells whether queue workers for an index are already running.
   *
   * Looks through the process list for "search-api-index-fast-queue" commands
   * started for the given index.
   *
   * @param string $index_name
   *   Index name.
   *
   * @return bool
   *   TRUE when at least one matching process was found, FALSE otherwise.
   */
  protected function isAlreadyRunning($index_name) {
    $matches = [];
    // The trailing "grep -v grep" drops the grep process itself from the list.
    exec("ps -ef | grep 'search-api-index-fast-queue " . escapeshellarg($index_name) . "\b' | grep -v grep", $matches);
    return !empty($matches);
  }

  /**
   * Creates the queue object for a given queue name.
   *
   * The returned queue is used in this class to create, claim and delete
   * items several at a time.
   *
   * @param string $name
   *   Queue name.
   *
   * @return \Drupal\search_api_fast\SearchApiFastQueue
   *   Queue object bound to the default database connection.
   */
  protected function getQueue($name) {
    $connection = Database::getConnection();
    return new SearchApiFastQueue($name, $connection);
  }

  /**
   * Spawn new drush process to start worker for indexing stuff.
   *
   * The worker is started in the background (nohup, output discarded), so
   * this method returns immediately.
   *
   * @param string $index_name
   *   Sapi index.
   * @param int $worker
   *   Queue number.
   */
  protected function fastIndexQueueInvoke($index_name, $worker) {
    // Start from the configured drush binary, falling back to "drush" on the
    // PATH when nothing is configured.
    $drush = escapeshellarg($this->drush ?: 'drush');

    // Prefer the binary this very process was started with, when it looks
    // like drush.
    if (php_sapi_name() === 'cli') {
      global $argv;
      if (!empty($argv[0]) && strpos($argv[0], 'drush') !== FALSE) {
        $drush = escapeshellcmd($argv[0]);
      }
    }

    // Escape every argument separately; nothing is passed to the shell raw.
    $command = $drush
      . ' search-api-index-fast-queue '
      . escapeshellarg((string) $index_name)
      . ' '
      . escapeshellarg((string) $worker);

    exec('nohup ' . $command . ' > /dev/null 2>&1 &');
  }

  /**
   * Reset entity cache.
   *
   * Resets the storage cache of every entity type that is used as an
   * "entity:<type>" datasource of the index, then triggers a garbage
   * collection cycle.
   *
   * @param \Drupal\search_api\Entity\Index $index
   *   The index for which to reset the entity caches.
   */
  protected function resetEntityCache(Index $index) {
    $prefix = 'entity:';

    foreach ($index->getDatasourceIds() as $datasource_id) {
      // Only datasource IDs that start with "entity:" map to an entity type.
      if (strpos($datasource_id, $prefix) !== 0) {
        continue;
      }

      $entity_type = (string) substr($datasource_id, strlen($prefix));
      // Ask the manager directly; a position-based lookup would wrongly skip
      // the entity type that happens to be first in the definition list.
      if ($this->entityTypeManager->hasDefinition($entity_type)) {
        $this->entityTypeManager->getStorage($entity_type)->resetCache();
      }
    }

    gc_collect_cycles();
  }

  /**
   * Restarts the current drush command in a fresh background process.
   *
   * Does nothing unless PHP runs on the command line and the script name
   * contains "drush". Otherwise the same command line is started again in the
   * background and the current process exits.
   *
   * Used to tame memory usage vs garbage collection (lack thereof).
   */
  protected function respawn() {
    if (php_sapi_name() !== 'cli') {
      return;
    }

    global $argv;
    if (strpos($argv[0], 'drush') === FALSE) {
      return;
    }

    // Rebuild the command line: the executable first, then each remaining
    // argument escaped on its own.
    $cmd = escapeshellcmd($argv[0]);
    unset($argv[0]);
    $cmd .= ' ' . implode(' ', array_map('escapeshellarg', $argv));

    \Drupal::logger('search_api_fast')->debug(dt('Respawning: :cmd', array(':cmd' => $cmd)));
    exec('nohup ' . $cmd . ' > /dev/null 2>&1 &');
    exit();
  }

}