Skip to content
Snippets Groups Projects
Forked from project / search_api_opensearch
19 commits behind, 14 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
BackendClient.php 13.19 KiB
<?php

namespace Drupal\search_api_opensearch\SearchAPI;

use Drupal\Component\Utility\NestedArray;
use Drupal\Core\DependencyInjection\DependencySerializationTrait;
use Drupal\Core\Utility\Error;
use Drupal\search_api\IndexInterface;
use Drupal\search_api\Query\QueryInterface;
use Drupal\search_api\Query\ResultSetInterface;
use Drupal\search_api\SearchApiException;
use Drupal\search_api\Utility\FieldsHelperInterface;
use Drupal\search_api_opensearch\Analyser\AnalyserInterface;
use Drupal\search_api_opensearch\Analyser\AnalyserManager;
use Drupal\search_api_opensearch\Event\AlterSettingsEvent;
use Drupal\search_api_opensearch\Event\IndexCreatedEvent;
use Drupal\search_api_opensearch\SearchAPI\Query\QueryParamBuilder;
use Drupal\search_api_opensearch\SearchAPI\Query\QueryResultParser;
use OpenSearch\Client;
use OpenSearch\Common\Exceptions\OpenSearchException;
use Psr\Log\LoggerInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

/**
 * Provides an OpenSearch Search API client.
 */
class BackendClient implements BackendClientInterface {

  use DependencySerializationTrait {
    __sleep as traitSleep;
  }

  /**
   * {@inheritdoc}
   */
  public function isAvailable() {
    // Any failure while pinging the cluster is logged and reported as
    // "not available" instead of being propagated to the caller.
    try {
      $reachable = $this->client->ping();
    }
    catch (\Exception $exception) {
      $context = Error::decodeException($exception);
      $this->logger->error('%type: @message in %function (line %line of %file).', $context);
      $reachable = FALSE;
    }

    return $reachable;
  }

  /**
   * Constructs a new BackendClient.
   *
   * @param \Drupal\search_api_opensearch\SearchAPI\Query\QueryParamBuilder $queryParamBuilder
   *   The query param builder.
   * @param \Drupal\search_api_opensearch\SearchAPI\Query\QueryResultParser $resultParser
   *   The query result parser.
   * @param \Drupal\search_api_opensearch\SearchAPI\DeleteParamBuilder $deleteParamBuilder
   *   The delete param builder.
   * @param \Drupal\search_api_opensearch\SearchAPI\IndexParamBuilder $indexParamBuilder
   *   The index param builder.
   * @param \Drupal\search_api\Utility\FieldsHelperInterface $fieldsHelper
   *   The fields helper.
   * @param \Drupal\search_api_opensearch\SearchAPI\FieldMapper $fieldParamsBuilder
   *   The field mapper.
   * @param \Psr\Log\LoggerInterface $logger
   *   The logger.
   * @param \OpenSearch\Client $client
   *   The OpenSearch client.
   * @param \Drupal\search_api_opensearch\Analyser\AnalyserManager $analyserManager
   *   Analyser manager.
   * @param \Symfony\Contracts\EventDispatcher\EventDispatcherInterface $eventDispatcher
   *   The event dispatcher.
   * @param array $settings
   *   The settings.
   */
  public function __construct(
    protected QueryParamBuilder $queryParamBuilder,
    protected QueryResultParser $resultParser,
    protected DeleteParamBuilder $deleteParamBuilder,
    protected IndexParamBuilder $indexParamBuilder,
    protected FieldsHelperInterface $fieldsHelper,
    protected FieldMapper $fieldParamsBuilder,
    protected LoggerInterface $logger,
    protected Client $client,
    protected AnalyserManager $analyserManager,
    protected EventDispatcherInterface $eventDispatcher,
    protected array $settings = [],
  ) {
    // Intentionally empty: every argument is a promoted constructor property,
    // so PHP assigns each one to the matching protected property on $this.
  }

  /**
   * {@inheritdoc}
   */
  public function indexItems(IndexInterface $index, array $items): array {
    // Nothing to send to the cluster.
    if (!$items) {
      return [];
    }

    $indexId = $this->getIndexId($index);
    $bulkParams = $this->indexParamBuilder->buildIndexParams($indexId, $index, $items);

    try {
      $response = $this->client->bulk($bulkParams);
      $hasErrors = !empty($response['errors']);
      if ($hasErrors) {
        // Log every failed bulk entry, then reject the batch as a whole.
        foreach ($response['items'] as $item) {
          // Skip entries without a status or with a non-error status.
          if (empty($item['index']['status']) || $item['index']['status'] < 400) {
            continue;
          }
          $context = [
            '%reason' => $item['index']['error']['reason'],
            '%caused_by' => $item['index']['error']['caused_by']['reason'] ?? '',
            '%id' => $item['index']['_id'],
            '%code' => $item['index']['status'],
          ];
          $this->logger->error('%reason %caused_by for id: %id. Status code: %code', $context);
        }
        throw new SearchApiException('An error occurred indexing items.');
      }
    }
    catch (OpenSearchException $e) {
      // Wrap client-level failures so callers only deal with Search API
      // exceptions; the original exception is kept as "previous".
      throw new SearchApiException(sprintf('%s when indexing items in index %s.', $e->getMessage(), $indexId), 0, $e);
    }

    // All items were accepted: report every submitted item ID as indexed.
    return array_keys($items);
  }

  /**
   * {@inheritdoc}
   */
  public function deleteItems(IndexInterface $index, array $item_ids): void {
    // Nothing to delete.
    if (empty($item_ids)) {
      return;
    }

    $indexId = $this->getIndexId($index);
    $params = $this->deleteParamBuilder->buildDeleteParams($indexId, $item_ids);
    try {
      $response = $this->client->bulk($params);
    }
    catch (OpenSearchException $e) {
      throw new SearchApiException(sprintf('An error occurred deleting items from the index %s.', $indexId), 0, $e);
    }

    // A bulk request can succeed as a whole while individual operations fail.
    // Such failures used to be dropped silently; log them so they can be
    // diagnosed, but keep deletion best-effort and do not throw.
    if (!is_array($response) || empty($response['errors'])) {
      return;
    }
    foreach ($response['items'] ?? [] as $item) {
      if (!is_array($item)) {
        continue;
      }
      // Each bulk response entry is keyed by its action name, so iterate the
      // entry instead of hard-coding the key.
      foreach ($item as $result) {
        if (!is_array($result) || empty($result['error'])) {
          continue;
        }
        $error = $result['error'];
        $this->logger->error('Failed to delete item %id from index %index: %reason (status code %code).', [
          '%id' => $result['_id'] ?? '',
          '%index' => $indexId,
          '%reason' => is_array($error) ? ($error['reason'] ?? '') : (string) $error,
          '%code' => $result['status'] ?? '',
        ]);
      }
    }
  }

  /**
   * {@inheritdoc}
   */
  public function search(QueryInterface $query): ResultSetInterface {
    $resultSet = $query->getResults();
    $index = $query->getIndex();
    $indexId = $this->getIndexId($index);

    // Check the index exists. indexExists() is used rather than calling the
    // client directly so that a client failure (e.g. the cluster being
    // unreachable) surfaces as a SearchApiException instead of a raw
    // OpenSearch exception.
    if (!$this->indexExists($index)) {
      $this->logger->warning('Index "%index" does not exist.', ["%index" => $indexId]);
      // Return the query's untouched result set.
      return $resultSet;
    }

    // Build OpenSearch query.
    $params = $this->queryParamBuilder->buildQueryParams($indexId, $query, $this->settings);

    // When set to true the search response will always track the number of
    // hits that match the query accurately.
    $params['track_total_hits'] = TRUE;

    try {
      // Do search and convert the raw response into a Search API result set.
      $response = $this->client->search($params);
      return $this->resultParser->parseResult($query, $response);
    }
    catch (OpenSearchException $e) {
      throw new SearchApiException(sprintf('Error querying index %s', $indexId), 0, $e);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function removeIndex($index): void {
    // Removing an index that is not present is a no-op.
    $exists = $this->indexExists($index);
    if ($exists === FALSE) {
      return;
    }

    $indexId = $this->getIndexId($index);
    // The index name is passed as a single-element list.
    $deleteParams = ['index' => [$indexId]];
    try {
      $this->client->indices()->delete($deleteParams);
    }
    catch (OpenSearchException $exception) {
      $message = sprintf('An error occurred removing the index %s.', $indexId);
      throw new SearchApiException($message, 0, $exception);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function addIndex(IndexInterface $index): void {
    $indexId = $this->getIndexId($index);

    // An existing index is left untouched.
    $alreadyPresent = $this->indexExists($index);
    if ($alreadyPresent) {
      return;
    }

    try {
      $createParams = ['index' => $indexId];
      $this->client->indices()->create($createParams);

      // Push settings first, then the field mapping, then notify listeners
      // that the index now exists.
      $this->updateSettings($index);
      $this->updateFieldMapping($index);
      $this->eventDispatcher->dispatch(new IndexCreatedEvent($index));
    }
    catch (OpenSearchException $exception) {
      $message = sprintf('An error occurred creating the index %s.', $indexId);
      throw new SearchApiException($message, 0, $exception);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function updateIndex(IndexInterface $index): void {
    // A missing index is simply created; nothing else to reconcile.
    if (!$this->indexExists($index)) {
      $this->addIndex($index);
      return;
    }

    // Decide up front whether existing mappings conflict with the new ones,
    // because the settings/mapping update below changes what is stored.
    $mustClear = $this->indexNeedsClearing($index);
    if ($mustClear) {
      $index->clear();
    }

    $this->updateSettings($index);
    $this->updateFieldMapping($index);

    // When the index was not cleared, reindex() is called on it instead.
    if (!$mustClear) {
      $index->reindex();
    }
  }

  /**
   * Updates the field mappings for an index.
   *
   * @param \Drupal\search_api\IndexInterface $index
   *   The index.
   *
   * @throws \Drupal\search_api\SearchApiException
   *   Thrown when an underlying OpenSearch error occurs.
   */
  public function updateFieldMapping(IndexInterface $index): void {
    $indexId = $this->getIndexId($index);

    try {
      // Build the mapping request from the index's fields and send it.
      $mappingParams = $this->fieldParamsBuilder->mapFieldParams($indexId, $index);
      $indices = $this->client->indices();
      $indices->putMapping($mappingParams);
    }
    catch (OpenSearchException $exception) {
      $message = sprintf('An error occurred updating field mappings for index %s.', $indexId);
      throw new SearchApiException($message, 0, $exception);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function clearIndex(IndexInterface $index, ?string $datasource_id = NULL): void {
    // The index is cleared by dropping it and creating it again.
    // NOTE(review): $datasource_id is not used here, so the whole index is
    // recreated even when a single datasource is requested — confirm this is
    // intended.
    $this->removeIndex($index);
    $this->addIndex($index);
  }

  /**
   * {@inheritdoc}
   */
  public function indexExists(IndexInterface $index): bool {
    $indexId = $this->getIndexId($index);
    $lookup = ['index' => $indexId];

    try {
      $exists = $this->client->indices()->exists($lookup);
    }
    catch (OpenSearchException $exception) {
      // Surface client failures as Search API exceptions, keeping the cause.
      $message = sprintf('An error occurred checking if the index %s exists.', $indexId);
      throw new SearchApiException($message, 0, $exception);
    }

    return $exists;
  }

  /**
   * Gets the index ID.
   *
   * @param \Drupal\search_api\IndexInterface $index
   *   The index.
   *
   * @return string
   *   The index ID.
   */
  public function getIndexId(IndexInterface $index) {
    // The prefix is optional: the constructor defaults $settings to an empty
    // array, so fall back to an empty prefix instead of triggering an
    // "Undefined array key" warning.
    return ($this->settings['prefix'] ?? '') . $index->id();
  }

  /**
   * {@inheritdoc}
   *
   * Make sure that the client does not get serialized.
   */
  public function __sleep() {
    $vars = $this->traitSleep();
    // array_search() returns FALSE when 'client' is not in the list (for
    // example if the trait has already removed it). Using FALSE as an array
    // offset resolves to 0, which would drop an unrelated property from
    // serialization, so only unset when the entry was actually found.
    $position = array_search('client', $vars, TRUE);
    if ($position !== FALSE) {
      unset($vars[$position]);
    }
    return $vars;
  }

  /**
   * Updates index settings.
   *
   * @param \Drupal\search_api\IndexInterface $index_param
   *   Index.
   */
  public function updateSettings(IndexInterface $index_param): void {
    $indexId = $this->getIndexId($index_param);
    $params = $this->fieldParamsBuilder->mapFieldParams($indexId, $index_param);

    // Collect the distinct analyzers referenced by the mapped fields, keyed
    // by analyzer ID with their (optional) settings as value.
    $analyzers = array_reduce($params['body']['properties'] ?? [], static function (array $carry, array $field_definition) {
      if (isset($field_definition['analyzer'])) {
        $carry[$field_definition['analyzer']] = $field_definition['analyzer_settings'] ?? [];
      }
      return $carry;
    }, []);

    // Merge the index settings contributed by each analyser plugin.
    $settings = [];
    foreach ($analyzers as $analyzer_id => $configuration) {
      $analyser = $this->analyserManager->createInstance($analyzer_id, $configuration);
      assert($analyser instanceof AnalyserInterface);
      $settings = NestedArray::mergeDeep($settings, $analyser->getSettings());
    }

    $backendConfig = $index_param->getServerInstance()->getBackendConfig();

    // Give subscribers a chance to alter the settings before they are pushed.
    $event = new AlterSettingsEvent($settings, $backendConfig);
    $this->eventDispatcher->dispatch($event);
    $settings = $event->getSettings();

    if (!$settings) {
      // Nothing to push.
      return;
    }

    // Use a dedicated variable for the request parameters rather than
    // overwriting the typed $index_param argument with an array.
    $indexParams = [
      'index' => $indexId,
    ];
    try {
      // The index is closed while the settings are applied.
      $this->client->indices()->close($indexParams);
      $this->client->indices()->putSettings($indexParams + [
        'body' => $settings,
      ]);
    }
    catch (OpenSearchException $e) {
      throw new SearchApiException(sprintf('An error occurred updating settings for index %s.', $indexId), 0, $e);
    }
    finally {
      // Always try to reopen the index, even when the update failed. A
      // failure to reopen is wrapped too, so that callers only ever see
      // SearchApiException from this method.
      try {
        $this->client->indices()->open($indexParams);
      }
      catch (OpenSearchException $e) {
        throw new SearchApiException(sprintf('An error occurred reopening index %s after updating settings.', $indexId), 0, $e);
      }
    }
  }

  /**
   * Determine whether the index needs clearing.
   *
   * OpenSearch does not allow changing existing field
   * mappings with data but does allow adding new fields.
   * OpenSearch has dynamic mappings which means it will automatically
   * add types to fields based on the indexed data if no mapping
   * is explicitly provided. It will also override the mapping
   * if it is wrong, e.g. if you provide a string type but
   * the indexed data for the field is a float it will override the
   * mapping. In order to make sure the index is not cleared for minor
   * changes, it is important to make sure the types for fields are correct
   * and that any custom fields are explicitly mapped.
   */
  private function indexNeedsClearing(IndexInterface $index): bool {
    // Resolve the index ID once; it is needed for the request and for reading
    // the response.
    $indexId = $this->getIndexId($index);

    try {
      $openSearchMapping = $this->client->indices()->getMapping([
        'index' => $indexId,
      ]);
    }
    catch (OpenSearchException $e) {
      // Keep the error contract consistent with the other methods of this
      // class: client failures are reported as SearchApiException.
      throw new SearchApiException(sprintf('An error occurred retrieving field mappings for index %s.', $indexId), 0, $e);
    }
    $drupalMapping = $this->fieldParamsBuilder->mapFieldParams($indexId, $index);

    // If mappings have yet to be set no need to clear.
    if (!isset($openSearchMapping[$indexId]['mappings']['properties'])) {
      return FALSE;
    }
    // Recursively check for differences in the mappings between the
    // $openSearchMapping and $drupalMapping arrays. Before comparing,
    // convert $drupalMapping to a json format returned as an
    // associative array, so it is in the same format as the openSearchMapping.
    // For example the putMappings method on the OS client expects an empty
    // object but getMapping returns an empty array. Ensure float values are
    // preserved as json_encode doesn't preserve them by default.
    return $this->mappingsHaveDifferences(
      $openSearchMapping[$indexId]['mappings']['properties'],
      json_decode(
        json_encode($drupalMapping['body']['properties'], JSON_PRESERVE_ZERO_FRACTION),
        TRUE
      )
    );
  }

  /**
   * Recursively diff an associative array to find differences.
   *
   * If any difference is found bail early.
   */
  private function mappingsHaveDifferences(array $array1, array $array2): bool {
    // Only the keys of $array1 are walked: entries that exist solely in
    // $array2 are not treated as differences.
    foreach ($array1 as $key => $expected) {
      if (!is_array($expected)) {
        // Scalar leaf: the key must exist and hold an identical value.
        if (!array_key_exists($key, $array2) || $array2[$key] !== $expected) {
          return TRUE;
        }
        continue;
      }

      // Nested structure: the counterpart must be an array as well, and must
      // not differ anywhere below this level.
      $counterpart = $array2[$key] ?? NULL;
      if (!is_array($counterpart) || $this->mappingsHaveDifferences($expected, $counterpart)) {
        return TRUE;
      }
    }

    return FALSE;
  }
}