From c6532d8c5c844ede64db6ab51e314cd50655a270 Mon Sep 17 00:00:00 2001
From: Alex Pott <alex.a.pott@googlemail.com>
Date: Fri, 17 Mar 2023 15:47:34 +0000
Subject: [PATCH] Issue #3265086 by mondrake, alexpott, pwolanin, VladimirAus,
 daffie, mfb, catch: Implement statement classes using \Iterator to fix memory
 usage regression and prevent rewinding

---
 .../Core/Database/StatementInterface.php      |   6 +-
 .../Core/Database/StatementIteratorTrait.php  | 145 ++++++
 .../Core/Database/StatementPrefetch.php       |   7 +
 .../Database/StatementPrefetchIterator.php    | 413 ++++++++++++++++++
 .../Drupal/Core/Database/StatementWrapper.php |   7 +
 .../Database/StatementWrapperIterator.php     | 303 +++++++++++++
 .../src/Driver/Database/mysql/Connection.php  |   4 +-
 .../src/Driver/Database/pgsql/Connection.php  |   4 +-
 .../src/Driver/Database/sqlite/Statement.php  |   8 +-
 .../KernelTests/Core/Database/FetchTest.php   |  69 +++
 .../Core/Database/StatementTest.php           | 113 +++++
 .../Tests/Core/Database/ConnectionTest.php    |  29 +-
 12 files changed, 1096 insertions(+), 12 deletions(-)
 create mode 100644 core/lib/Drupal/Core/Database/StatementIteratorTrait.php
 create mode 100644 core/lib/Drupal/Core/Database/StatementPrefetchIterator.php
 create mode 100644 core/lib/Drupal/Core/Database/StatementWrapperIterator.php

diff --git a/core/lib/Drupal/Core/Database/StatementInterface.php b/core/lib/Drupal/Core/Database/StatementInterface.php
index 09a6b94f5054..b4adf3742d9d 100644
--- a/core/lib/Drupal/Core/Database/StatementInterface.php
+++ b/core/lib/Drupal/Core/Database/StatementInterface.php
@@ -7,12 +7,12 @@
 /**
  * Represents a prepared statement.
  *
- * Child implementations should either extend StatementWrapper:
+ * Child implementations should either extend StatementWrapperIterator:
  * @code
- * class Drupal\mymodule\Driver\Database\mydriver\Statement extends Drupal\Core\Database\StatementWrapper {}
+ * class Drupal\mymodule\Driver\Database\mydriver\Statement extends Drupal\Core\Database\StatementWrapperIterator {}
  * @endcode
  * or define their own class. If defining their own class, they will also have
- * to implement either the Iterator or IteratorAggregate interface before
+ * to implement either the \Iterator or \IteratorAggregate interface before
  * Drupal\Core\Database\StatementInterface:
  * @code
  * class Drupal\mymodule\Driver\Database\mydriver\Statement implements Iterator, Drupal\Core\Database\StatementInterface {}
diff --git a/core/lib/Drupal/Core/Database/StatementIteratorTrait.php b/core/lib/Drupal/Core/Database/StatementIteratorTrait.php
new file mode 100644
index 000000000000..c4071545caa1
--- /dev/null
+++ b/core/lib/Drupal/Core/Database/StatementIteratorTrait.php
@@ -0,0 +1,145 @@
+<?php
+
+namespace Drupal\Core\Database;
+
+/**
+ * StatementInterface iterator trait.
+ *
+ * Implements the methods required by StatementInterface objects that implement
+ * the \Iterator interface.
+ */
+trait StatementIteratorTrait {
+
+  /**
+   * Traces if rows can be fetched from the resultset.
+   */
+  private bool $isResultsetIterable = FALSE;
+
+  /**
+   * The current row, retrieved in the current fetch format.
+   */
+  private mixed $resultsetRow = NULL;
+
+  /**
+   * The key of the current row.
+   *
+   * This keeps the index of rows fetched from the underlying statement. It is
+   * set to -1 when no rows have been fetched yet.
+   */
+  private int $resultsetKey = -1;
+
+  /**
+   * Informs the iterator whether rows can be fetched from the resultset.
+   *
+   * @param bool $valid
+   *   The result of the execution of the client statement.
+   */
+  protected function markResultsetIterable(bool $valid): void {
+    $this->isResultsetIterable = $valid;
+    $this->resultsetRow = NULL;
+    if ($valid === TRUE) {
+      $this->resultsetKey = -1;
+    }
+  }
+
+  /**
+   * Sets the current resultset row for the iterator, and increments the key.
+   *
+   * @param mixed $row
+   *   The last row fetched from the client statement.
+   */
+  protected function setResultsetCurrentRow(mixed $row): void {
+    $this->resultsetRow = $row;
+    $this->resultsetKey++;
+  }
+
+  /**
+   * Returns the row index of the current element in the resultset.
+   *
+   * @return int
+   *   The row index of the current element in the resultset.
+   */
+  protected function getResultsetCurrentRowIndex(): int {
+    return $this->resultsetKey;
+  }
+
+  /**
+   * Informs the iterator that no more rows can be fetched from the resultset.
+   */
+  protected function markResultsetFetchingComplete(): void {
+    $this->markResultsetIterable(FALSE);
+  }
+
+  /**
+   * Returns the current element.
+   *
+   * @see https://www.php.net/manual/en/iterator.current.php
+   *
+   * @internal This method should not be called directly.
+   */
+  public function current(): mixed {
+    return $this->resultsetRow;
+  }
+
+  /**
+   * Returns the key of the current element.
+   *
+   * @see https://www.php.net/manual/en/iterator.key.php
+   *
+   * @internal This method should not be called directly.
+   */
+  public function key(): mixed {
+    return $this->resultsetKey;
+  }
+
+  /**
+   * Rewinds back to the first element of the Iterator.
+   *
+   * This is the first method called when starting a foreach loop. It will not
+   * be executed after foreach loops.
+   *
+   * @see https://www.php.net/manual/en/iterator.rewind.php
+   *
+   * @internal This method should not be called directly.
+   */
+  public function rewind(): void {
+    // Nothing to do: our DatabaseStatement can't be rewound. Error out when
+    // attempted.
+    // @todo convert the error to an exception in Drupal 11.
+    if ($this->resultsetKey >= 0) {
+      trigger_error('Attempted rewinding a StatementInterface object when fetching has already started. Refactor your code to avoid rewinding statement objects.', E_USER_WARNING);
+      $this->markResultsetIterable(FALSE);
+    }
+  }
+
+  /**
+   * Moves the current position to the next element.
+   *
+   * This method is called after each foreach loop.
+   *
+   * @see https://www.php.net/manual/en/iterator.next.php
+   *
+   * @internal This method should not be called directly.
+   */
+  public function next(): void {
+    $this->fetch();
+  }
+
+  /**
+   * Checks if current position is valid.
+   *
+   * This method is called after ::rewind() and ::next() to check if the
+   * current position is valid.
+   *
+   * @see https://www.php.net/manual/en/iterator.valid.php
+   *
+   * @internal This method should not be called directly.
+   */
+  public function valid(): bool {
+    if ($this->isResultsetIterable && $this->resultsetKey === -1) {
+      $this->fetch();
+    }
+    return $this->isResultsetIterable;
+  }
+
+}
diff --git a/core/lib/Drupal/Core/Database/StatementPrefetch.php b/core/lib/Drupal/Core/Database/StatementPrefetch.php
index 74946cdb1e78..0e3b98d228c0 100644
--- a/core/lib/Drupal/Core/Database/StatementPrefetch.php
+++ b/core/lib/Drupal/Core/Database/StatementPrefetch.php
@@ -2,6 +2,8 @@
 
 namespace Drupal\Core\Database;
 
+@trigger_error('\Drupal\Core\Database\StatementPrefetch is deprecated in drupal:10.1.0 and is removed from drupal:11.0.0. Use \Drupal\Core\Database\StatementPrefetchIterator instead. See https://www.drupal.org/node/3265938', E_USER_DEPRECATED);
+
 use Drupal\Core\Database\Event\StatementExecutionEndEvent;
 use Drupal\Core\Database\Event\StatementExecutionStartEvent;
 
@@ -11,6 +13,11 @@
  * This class behaves very similar to a StatementWrapper of a \PDOStatement
  * but as it always fetches every row it is possible to manipulate those
  * results.
+ *
+ * @deprecated in drupal:10.1.0 and is removed from drupal:11.0.0. Use
+ *   \Drupal\Core\Database\StatementPrefetchIterator instead.
+ *
+ * @see https://www.drupal.org/node/3265938
  */
 class StatementPrefetch implements \Iterator, StatementInterface {
 
diff --git a/core/lib/Drupal/Core/Database/StatementPrefetchIterator.php b/core/lib/Drupal/Core/Database/StatementPrefetchIterator.php
new file mode 100644
index 000000000000..37227095f2ee
--- /dev/null
+++ b/core/lib/Drupal/Core/Database/StatementPrefetchIterator.php
@@ -0,0 +1,413 @@
+<?php
+
+namespace Drupal\Core\Database;
+
+use Drupal\Core\Database\Event\StatementExecutionEndEvent;
+use Drupal\Core\Database\Event\StatementExecutionStartEvent;
+
+/**
+ * An implementation of StatementInterface that prefetches all data.
+ *
+ * This class behaves very similar to a StatementWrapperIterator of a
+ * \PDOStatement but as it always fetches every row it is possible to
+ * manipulate those results.
+ *
+ * @todo use fully StatementIteratorTrait and remove \Iterator methods
+ *   implementations.
+ */
+class StatementPrefetchIterator implements \Iterator, StatementInterface {
+
+  use StatementIteratorTrait;
+
+  /**
+   * Main data store.
+   *
+   * The resultset is stored as a \PDO::FETCH_ASSOC array.
+   */
+  protected array $data = [];
+
+  /**
+   * The list of column names in this result set.
+   *
+   * @var string[]
+   */
+  protected ?array $columnNames = NULL;
+
+  /**
+   * The number of rows matched by the last query.
+   */
+  protected ?int $rowCount = NULL;
+
+  /**
+   * Holds the default fetch style.
+   */
+  protected int $defaultFetchStyle = \PDO::FETCH_OBJ;
+
+  /**
+   * Holds fetch options.
+   *
+   * @var string[]
+   */
+  protected array $fetchOptions = [
+    'class' => 'stdClass',
+    'constructor_args' => [],
+    'object' => NULL,
+    'column' => 0,
+  ];
+
+  /**
+   * Constructs a StatementPrefetchIterator object.
+   *
+   * @param object $clientConnection
+   *   Client database connection object, for example \PDO.
+   * @param \Drupal\Core\Database\Connection $connection
+   *   The database connection.
+   * @param string $queryString
+   *   The query string.
+   * @param array $driverOptions
+   *   Driver-specific options.
+   * @param bool $rowCountEnabled
+   *   (optional) Enables counting the rows matched. Defaults to FALSE.
+   */
+  public function __construct(
+    protected readonly object $clientConnection,
+    protected readonly Connection $connection,
+    protected string $queryString,
+    protected array $driverOptions = [],
+    protected readonly bool $rowCountEnabled = FALSE,
+  ) {
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getConnectionTarget(): string {
+    return $this->connection->getTarget();
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function execute($args = [], $options = []) {
+    if (isset($options['fetch'])) {
+      if (is_string($options['fetch'])) {
+        // Default to an object. Note: db fields will be added to the object
+        // before the constructor is run. If you need to assign fields after
+        // the constructor is run. See https://www.drupal.org/node/315092.
+        $this->setFetchMode(\PDO::FETCH_CLASS, $options['fetch']);
+      }
+      else {
+        $this->setFetchMode($options['fetch']);
+      }
+    }
+
+    if ($this->connection->isEventEnabled(StatementExecutionStartEvent::class)) {
+      $startEvent = new StatementExecutionStartEvent(
+        spl_object_id($this),
+        $this->connection->getKey(),
+        $this->connection->getTarget(),
+        $this->getQueryString(),
+        $args ?? [],
+        $this->connection->findCallerFromDebugBacktrace()
+      );
+      $this->connection->dispatchEvent($startEvent);
+    }
+
+    // Prepare the query.
+    $statement = $this->getStatement($this->queryString, $args);
+    if (!$statement) {
+      $this->throwPDOException();
+    }
+
+    $return = $statement->execute($args);
+    if (!$return) {
+      $this->throwPDOException();
+    }
+
+    // Fetch all the data from the reply, in order to release any lock as soon
+    // as possible.
+    $this->data = $statement->fetchAll(\PDO::FETCH_ASSOC);
+    $this->rowCount = $this->rowCountEnabled ? $statement->rowCount() : NULL;
+    // Destroy the statement as soon as possible. See the documentation of
+    // \Drupal\sqlite\Driver\Database\sqlite\Statement for an explanation.
+    unset($statement);
+    $this->markResultsetIterable($return);
+
+    $this->columnNames = count($this->data) > 0 ? array_keys($this->data[0]) : [];
+
+    if (isset($startEvent) && $this->connection->isEventEnabled(StatementExecutionEndEvent::class)) {
+      $this->connection->dispatchEvent(new StatementExecutionEndEvent(
+        $startEvent->statementObjectId,
+        $startEvent->key,
+        $startEvent->target,
+        $startEvent->queryString,
+        $startEvent->args,
+        $startEvent->caller,
+        $startEvent->time
+      ));
+    }
+
+    return $return;
+  }
+
+  /**
+   * Throw a PDO Exception based on the last PDO error.
+   */
+  protected function throwPDOException(): void {
+    $error_info = $this->connection->errorInfo();
+    // We rebuild a message formatted in the same way as PDO.
+    $exception = new \PDOException("SQLSTATE[" . $error_info[0] . "]: General error " . $error_info[1] . ": " . $error_info[2]);
+    $exception->errorInfo = $error_info;
+    throw $exception;
+  }
+
+  /**
+   * Grab a PDOStatement object from a given query and its arguments.
+   *
+   * Some drivers (including SQLite) will need to perform some preparation
+   * themselves to get the statement right.
+   *
+   * @param $query
+   *   The query.
+   * @param array|null $args
+   *   An array of arguments. This can be NULL.
+   *
+   * @return object
+   *   A PDOStatement object.
+   */
+  protected function getStatement(string $query, ?array &$args = []): object {
+    return $this->connection->prepare($query, $this->driverOptions);
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getQueryString() {
+    return $this->queryString;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function setFetchMode($mode, $a1 = NULL, $a2 = []) {
+    $this->defaultFetchStyle = $mode;
+    switch ($mode) {
+      case \PDO::FETCH_CLASS:
+        $this->fetchOptions['class'] = $a1;
+        if ($a2) {
+          $this->fetchOptions['constructor_args'] = $a2;
+        }
+        break;
+
+      case \PDO::FETCH_COLUMN:
+        $this->fetchOptions['column'] = $a1;
+        break;
+
+      case \PDO::FETCH_INTO:
+        $this->fetchOptions['object'] = $a1;
+        break;
+    }
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function rowCount() {
+    // SELECT query should not use the method.
+    if ($this->rowCountEnabled) {
+      return $this->rowCount;
+    }
+    else {
+      throw new RowCountException();
+    }
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetch($fetch_style = NULL, $cursor_orientation = \PDO::FETCH_ORI_NEXT, $cursor_offset = NULL) {
+    $currentKey = $this->getResultsetCurrentRowIndex();
+
+    // We can remove the current record from the prefetched data, before
+    // moving to the next record.
+    unset($this->data[$currentKey]);
+    $currentKey++;
+    if (!isset($this->data[$currentKey])) {
+      $this->markResultsetFetchingComplete();
+      return FALSE;
+    }
+
+    // Now, format the next prefetched record according to the required fetch
+    // style.
+    $rowAssoc = $this->data[$currentKey];
+    switch ($fetch_style ?? $this->defaultFetchStyle) {
+      case \PDO::FETCH_ASSOC:
+        $row = $rowAssoc;
+        break;
+
+      case \PDO::FETCH_BOTH:
+        // \PDO::FETCH_BOTH returns an array indexed by both the column name
+        // and the column number.
+        $row = $rowAssoc + array_values($rowAssoc);
+        break;
+
+      case \PDO::FETCH_NUM:
+        $row = array_values($rowAssoc);
+        break;
+
+      case \PDO::FETCH_LAZY:
+        // We do not do lazy as everything is fetched already. Fallback to
+        // \PDO::FETCH_OBJ.
+      case \PDO::FETCH_OBJ:
+        $row = (object) $rowAssoc;
+        break;
+
+      case \PDO::FETCH_CLASS | \PDO::FETCH_CLASSTYPE:
+        $class_name = array_shift($rowAssoc);
+        // Deliberate no break.
+      case \PDO::FETCH_CLASS:
+        if (!isset($class_name)) {
+          $class_name = $this->fetchOptions['class'];
+        }
+        if (count($this->fetchOptions['constructor_args'])) {
+          $reflector = new \ReflectionClass($class_name);
+          $result = $reflector->newInstanceArgs($this->fetchOptions['constructor_args']);
+        }
+        else {
+          $result = new $class_name();
+        }
+        foreach ($rowAssoc as $k => $v) {
+          $result->$k = $v;
+        }
+        $row = $result;
+        break;
+
+      case \PDO::FETCH_INTO:
+        foreach ($rowAssoc as $k => $v) {
+          $this->fetchOptions['object']->$k = $v;
+        }
+        $row = $this->fetchOptions['object'];
+        break;
+
+      case \PDO::FETCH_COLUMN:
+        if (isset($this->columnNames[$this->fetchOptions['column']])) {
+          $row = $rowAssoc[$this->columnNames[$this->fetchOptions['column']]];
+        }
+        else {
+          return FALSE;
+        }
+        break;
+
+    }
+    // @todo in Drupal 11, throw an exception if $row is undefined at this
+    //   point.
+    if (!isset($row)) {
+      return FALSE;
+    }
+    $this->setResultsetCurrentRow($row);
+    return $row;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchColumn($index = 0) {
+    if ($row = $this->fetch(\PDO::FETCH_ASSOC)) {
+      return $row[$this->columnNames[$index]];
+    }
+    return FALSE;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchField($index = 0) {
+    return $this->fetchColumn($index);
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchObject(string $class_name = NULL, array $constructor_arguments = NULL) {
+    if (!isset($class_name)) {
+      return $this->fetch(\PDO::FETCH_OBJ);
+    }
+    $this->fetchOptions = [
+      'class' => $class_name,
+      'constructor_args' => $constructor_arguments,
+    ];
+    return $this->fetch(\PDO::FETCH_CLASS);
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchAssoc() {
+    return $this->fetch(\PDO::FETCH_ASSOC);
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchAll($mode = NULL, $column_index = NULL, $constructor_arguments = NULL) {
+    $fetchStyle = $mode ?? $this->defaultFetchStyle;
+    if (isset($column_index)) {
+      $this->fetchOptions['column'] = $column_index;
+    }
+    if (isset($constructor_arguments)) {
+      $this->fetchOptions['constructor_args'] = $constructor_arguments;
+    }
+
+    $result = [];
+    while ($row = $this->fetch($fetchStyle)) {
+      $result[] = $row;
+    }
+    return $result;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchCol($index = 0) {
+    if (isset($this->columnNames[$index])) {
+      $result = [];
+      while ($row = $this->fetch(\PDO::FETCH_ASSOC)) {
+        $result[] = $row[$this->columnNames[$index]];
+      }
+      return $result;
+    }
+    return [];
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchAllKeyed($key_index = 0, $value_index = 1) {
+    if (!isset($this->columnNames[$key_index]) || !isset($this->columnNames[$value_index])) {
+      return [];
+    }
+
+    $key = $this->columnNames[$key_index];
+    $value = $this->columnNames[$value_index];
+
+    $result = [];
+    while ($row = $this->fetch(\PDO::FETCH_ASSOC)) {
+      $result[$row[$key]] = $row[$value];
+    }
+    return $result;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchAllAssoc($key, $fetch_style = NULL) {
+    $fetchStyle = $fetch_style ?? $this->defaultFetchStyle;
+
+    $result = [];
+    while ($row = $this->fetch($fetchStyle)) {
+      $result[$this->data[$this->getResultsetCurrentRowIndex()][$key]] = $row;
+    }
+    return $result;
+  }
+
+}
diff --git a/core/lib/Drupal/Core/Database/StatementWrapper.php b/core/lib/Drupal/Core/Database/StatementWrapper.php
index 8b70f3d6d38a..d8e7b1c39883 100644
--- a/core/lib/Drupal/Core/Database/StatementWrapper.php
+++ b/core/lib/Drupal/Core/Database/StatementWrapper.php
@@ -7,8 +7,15 @@
 
 // cSpell:ignore maxlen driverdata INOUT
 
+@trigger_error('\Drupal\Core\Database\StatementWrapper is deprecated in drupal:10.1.0 and is removed from drupal:11.0.0. Use \Drupal\Core\Database\StatementWrapperIterator instead. See https://www.drupal.org/node/3265938', E_USER_DEPRECATED);
+
 /**
  * Implementation of StatementInterface encapsulating PDOStatement.
+ *
+ * @deprecated in drupal:10.1.0 and is removed from drupal:11.0.0. Use
+ *   \Drupal\Core\Database\StatementWrapperIterator instead.
+ *
+ * @see https://www.drupal.org/node/3265938
  */
 class StatementWrapper implements \IteratorAggregate, StatementInterface {
 
diff --git a/core/lib/Drupal/Core/Database/StatementWrapperIterator.php b/core/lib/Drupal/Core/Database/StatementWrapperIterator.php
new file mode 100644
index 000000000000..f334d76adab5
--- /dev/null
+++ b/core/lib/Drupal/Core/Database/StatementWrapperIterator.php
@@ -0,0 +1,303 @@
+<?php
+
+namespace Drupal\Core\Database;
+
+use Drupal\Core\Database\Event\StatementExecutionEndEvent;
+use Drupal\Core\Database\Event\StatementExecutionStartEvent;
+
+// cSpell:ignore maxlen driverdata INOUT
+
+/**
+ * StatementInterface iterator implementation.
+ *
+ * This class is meant to be generic enough for any type of database clients,
+ * even if all Drupal core database drivers currently use PDO clients. We
+ * implement \Iterator instead of \IteratorAggregate to allow iteration to be
+ * kept in sync with the underlying database resultset cursor. PDO is not able
+ * to execute a database operation while a cursor is open on the result of an
+ * earlier select query, so Drupal by default uses buffered queries setting
+ * \PDO::MYSQL_ATTR_USE_BUFFERED_QUERY to TRUE on the connection. This forces
+ * the query to return all the results in a buffer local to the client library,
+ * potentially leading to memory issues in case of large datasets being
+ * returned by a query. Other database clients, however, could allow
+ * multithread queries, or developers could disable buffered queries in PDO:
+ * in that case, this class prevents the resultset to be entirely fetched in
+ * PHP memory (that an \IteratorAggregate implementation would force) and
+ * therefore optimize memory usage while iterating the resultset.
+ */
+class StatementWrapperIterator implements \Iterator, StatementInterface {
+
+  use StatementIteratorTrait;
+
+  /**
+   * The client database Statement object.
+   *
+   * For a \PDO client connection, this will be a \PDOStatement object.
+   */
+  protected object $clientStatement;
+
+  /**
+   * Constructs a StatementWrapperIterator object.
+   *
+   * @param \Drupal\Core\Database\Connection $connection
+   *   Drupal database connection object.
+   * @param object $clientConnection
+   *   Client database connection object, for example \PDO.
+   * @param string $query
+   *   The SQL query string.
+   * @param array $options
+   *   Array of query options.
+   * @param bool $rowCountEnabled
+   *   (optional) Enables counting the rows matched. Defaults to FALSE.
+   */
+  public function __construct(
+    protected readonly Connection $connection,
+    object $clientConnection,
+    string $query,
+    array $options,
+    protected readonly bool $rowCountEnabled = FALSE,
+  ) {
+    $this->clientStatement = $clientConnection->prepare($query, $options);
+    $this->setFetchMode(\PDO::FETCH_OBJ);
+  }
+
+  /**
+   * Returns the client-level database statement object.
+   *
+   * This method should normally be used only within database driver code.
+   *
+   * @return object
+   *   The client-level database statement, for example \PDOStatement.
+   */
+  public function getClientStatement(): object {
+    return $this->clientStatement;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getConnectionTarget(): string {
+    return $this->connection->getTarget();
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function execute($args = [], $options = []) {
+    if (isset($options['fetch'])) {
+      if (is_string($options['fetch'])) {
+        // \PDO::FETCH_PROPS_LATE tells __construct() to run before properties
+        // are added to the object.
+        $this->setFetchMode(\PDO::FETCH_CLASS | \PDO::FETCH_PROPS_LATE, $options['fetch']);
+      }
+      else {
+        $this->setFetchMode($options['fetch']);
+      }
+    }
+
+    if ($this->connection->isEventEnabled(StatementExecutionStartEvent::class)) {
+      $startEvent = new StatementExecutionStartEvent(
+        spl_object_id($this),
+        $this->connection->getKey(),
+        $this->connection->getTarget(),
+        $this->getQueryString(),
+        $args ?? [],
+        $this->connection->findCallerFromDebugBacktrace()
+      );
+      $this->connection->dispatchEvent($startEvent);
+    }
+
+    $return = $this->clientStatement->execute($args);
+    $this->markResultsetIterable($return);
+
+    if (isset($startEvent) && $this->connection->isEventEnabled(StatementExecutionEndEvent::class)) {
+      $this->connection->dispatchEvent(new StatementExecutionEndEvent(
+        $startEvent->statementObjectId,
+        $startEvent->key,
+        $startEvent->target,
+        $startEvent->queryString,
+        $startEvent->args,
+        $startEvent->caller,
+        $startEvent->time
+      ));
+    }
+
+    return $return;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getQueryString() {
+    return $this->clientStatement->queryString;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchCol($index = 0) {
+    return $this->fetchAll(\PDO::FETCH_COLUMN, $index);
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchAllAssoc($key, $fetch = NULL) {
+    if (isset($fetch)) {
+      if (is_string($fetch)) {
+        $this->setFetchMode(\PDO::FETCH_CLASS, $fetch);
+      }
+      else {
+        $this->setFetchMode($fetch);
+      }
+    }
+
+    // Return early if the statement was already fully traversed.
+    if (!$this->isResultsetIterable) {
+      return [];
+    }
+
+    // Once the foreach loop is completed, the resultset is marked so not to
+    // allow more fetching.
+    $return = [];
+    foreach ($this as $record) {
+      $recordKey = is_object($record) ? $record->$key : $record[$key];
+      $return[$recordKey] = $record;
+    }
+
+    return $return;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchAllKeyed($key_index = 0, $value_index = 1) {
+    $this->setFetchMode(\PDO::FETCH_NUM);
+
+    // Return early if the statement was already fully traversed.
+    if (!$this->isResultsetIterable) {
+      return [];
+    }
+
+    // Once the foreach loop is completed, the resultset is marked so not to
+    // allow more fetching.
+    $return = [];
+    foreach ($this as $record) {
+      $return[$record[$key_index]] = $record[$value_index];
+    }
+    return $return;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchField($index = 0) {
+    // Call \PDOStatement::fetchColumn to fetch the field.
+    $column = $this->clientStatement->fetchColumn($index);
+
+    if ($column === FALSE) {
+      $this->markResultsetFetchingComplete();
+      return FALSE;
+    }
+
+    $this->setResultsetCurrentRow($column);
+    return $column;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchAssoc() {
+    return $this->fetch(\PDO::FETCH_ASSOC);
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchObject(string $class_name = NULL, array $constructor_arguments = NULL) {
+    if ($class_name) {
+      $row = $this->clientStatement->fetchObject($class_name, $constructor_arguments);
+    }
+    else {
+      $row = $this->clientStatement->fetchObject();
+    }
+
+    if ($row === FALSE) {
+      $this->markResultsetFetchingComplete();
+      return FALSE;
+    }
+
+    $this->setResultsetCurrentRow($row);
+    return $row;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function rowCount() {
+    // SELECT query should not use the method.
+    if ($this->rowCountEnabled) {
+      return $this->clientStatement->rowCount();
+    }
+    else {
+      throw new RowCountException();
+    }
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function setFetchMode($mode, $a1 = NULL, $a2 = []) {
+    // Call \PDOStatement::setFetchMode to set fetch mode.
+    // \PDOStatement is picky about the number of arguments in some cases so we
+    // need to be pass the exact number of arguments we where given.
+    return match(func_num_args()) {
+      1 => $this->clientStatement->setFetchMode($mode),
+      2 => $this->clientStatement->setFetchMode($mode, $a1),
+      default => $this->clientStatement->setFetchMode($mode, $a1, $a2),
+    };
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetch($mode = NULL, $cursor_orientation = NULL, $cursor_offset = NULL) {
+    // Call \PDOStatement::fetchAll to fetch all rows.
+    // \PDOStatement is picky about the number of arguments in some cases so we
+    // need to pass the exact number of arguments we were given.
+    $row = match(func_num_args()) {
+      0 => $this->clientStatement->fetch(),
+      1 => $this->clientStatement->fetch($mode),
+      2 => $this->clientStatement->fetch($mode, $cursor_orientation),
+      default => $this->clientStatement->fetch($mode, $cursor_orientation, $cursor_offset),
+    };
+
+    if ($row === FALSE) {
+      $this->markResultsetFetchingComplete();
+      return FALSE;
+    }
+
+    $this->setResultsetCurrentRow($row);
+    return $row;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function fetchAll($mode = NULL, $column_index = NULL, $constructor_arguments = NULL) {
+    // Call \PDOStatement::fetchAll to fetch all rows.
+    // \PDOStatement is picky about the number of arguments in some cases so we
+    // need to be pass the exact number of arguments we where given.
+    $return = match(func_num_args()) {
+      0 => $this->clientStatement->fetchAll(),
+      1 => $this->clientStatement->fetchAll($mode),
+      2 => $this->clientStatement->fetchAll($mode, $column_index),
+      default => $this->clientStatement->fetchAll($mode, $column_index, $constructor_arguments),
+    };
+
+    $this->markResultsetFetchingComplete();
+
+    return $return;
+  }
+
+}
diff --git a/core/modules/mysql/src/Driver/Database/mysql/Connection.php b/core/modules/mysql/src/Driver/Database/mysql/Connection.php
index 5062d5ebabbc..7c9fab2836d1 100644
--- a/core/modules/mysql/src/Driver/Database/mysql/Connection.php
+++ b/core/modules/mysql/src/Driver/Database/mysql/Connection.php
@@ -4,7 +4,7 @@
 
 use Drupal\Core\Database\DatabaseAccessDeniedException;
 use Drupal\Core\Database\DatabaseExceptionWrapper;
-use Drupal\Core\Database\StatementWrapper;
+use Drupal\Core\Database\StatementWrapperIterator;
 use Drupal\Core\Database\Database;
 use Drupal\Core\Database\DatabaseNotFoundException;
 use Drupal\Core\Database\DatabaseException;
@@ -50,7 +50,7 @@ class Connection extends DatabaseConnection implements SupportsTemporaryTablesIn
   /**
    * {@inheritdoc}
    */
-  protected $statementWrapperClass = StatementWrapper::class;
+  protected $statementWrapperClass = StatementWrapperIterator::class;
 
   /**
    * Flag to indicate if the cleanup function in __destruct() should run.
diff --git a/core/modules/pgsql/src/Driver/Database/pgsql/Connection.php b/core/modules/pgsql/src/Driver/Database/pgsql/Connection.php
index 8417ed831be9..a458b5d4e4e6 100644
--- a/core/modules/pgsql/src/Driver/Database/pgsql/Connection.php
+++ b/core/modules/pgsql/src/Driver/Database/pgsql/Connection.php
@@ -7,7 +7,7 @@
 use Drupal\Core\Database\DatabaseAccessDeniedException;
 use Drupal\Core\Database\DatabaseNotFoundException;
 use Drupal\Core\Database\StatementInterface;
-use Drupal\Core\Database\StatementWrapper;
+use Drupal\Core\Database\StatementWrapperIterator;
 use Drupal\Core\Database\SupportsTemporaryTablesInterface;
 
 // cSpell:ignore ilike nextval
@@ -43,7 +43,7 @@ class Connection extends DatabaseConnection implements SupportsTemporaryTablesIn
   /**
    * {@inheritdoc}
    */
-  protected $statementWrapperClass = StatementWrapper::class;
+  protected $statementWrapperClass = StatementWrapperIterator::class;
 
   /**
    * A map of condition operators to PostgreSQL operators.
diff --git a/core/modules/sqlite/src/Driver/Database/sqlite/Statement.php b/core/modules/sqlite/src/Driver/Database/sqlite/Statement.php
index 8151108ca536..a3842b213a94 100644
--- a/core/modules/sqlite/src/Driver/Database/sqlite/Statement.php
+++ b/core/modules/sqlite/src/Driver/Database/sqlite/Statement.php
@@ -2,8 +2,8 @@
 
 namespace Drupal\sqlite\Driver\Database\sqlite;
 
-use Drupal\Core\Database\StatementPrefetch;
 use Drupal\Core\Database\StatementInterface;
+use Drupal\Core\Database\StatementPrefetchIterator;
 
 /**
  * SQLite implementation of \Drupal\Core\Database\Statement.
@@ -14,7 +14,7 @@
  * user-space mock of PDOStatement that buffers all the data and doesn't
  * have those limitations.
  */
-class Statement extends StatementPrefetch implements StatementInterface {
+class Statement extends StatementPrefetchIterator implements StatementInterface {
 
   /**
    * {@inheritdoc}
@@ -26,7 +26,7 @@ class Statement extends StatementPrefetch implements StatementInterface {
    *
    * See http://bugs.php.net/bug.php?id=45259 for more details.
    */
-  protected function getStatement($query, &$args = []) {
+  protected function getStatement(string $query, ?array &$args = []): object {
     if (is_array($args) && !empty($args)) {
       // Check if $args is a simple numeric array.
       if (range(0, count($args) - 1) === array_keys($args)) {
@@ -79,7 +79,7 @@ protected function getStatement($query, &$args = []) {
       }
     }
 
-    return $this->pdoConnection->prepare($query);
+    return $this->clientConnection->prepare($query);
   }
 
   /**
diff --git a/core/tests/Drupal/KernelTests/Core/Database/FetchTest.php b/core/tests/Drupal/KernelTests/Core/Database/FetchTest.php
index 9368954cd2ec..8704425e2aae 100644
--- a/core/tests/Drupal/KernelTests/Core/Database/FetchTest.php
+++ b/core/tests/Drupal/KernelTests/Core/Database/FetchTest.php
@@ -180,6 +180,75 @@ public function testQueryFetchCol() {
     }
   }
 
+  /**
+   * Tests ::fetchAllAssoc().
+   */
+  public function testQueryFetchAllAssoc(): void {
+    $expected_result = [
+      "Singer" => [
+        "id" => "2",
+        "name" => "George",
+        "age" => "27",
+        "job" => "Singer",
+      ],
+      "Drummer" => [
+        "id" => "3",
+        "name" => "Ringo",
+        "age" => "28",
+        "job" => "Drummer",
+      ],
+    ];
+
+    $statement = $this->connection->query('SELECT * FROM {test} WHERE [age] > :age', [':age' => 26]);
+    $result = $statement->fetchAllAssoc('job', \PDO::FETCH_ASSOC);
+    $this->assertSame($expected_result, $result);
+
+    $statement = $this->connection->query('SELECT * FROM {test} WHERE [age] > :age', [':age' => 26]);
+    $result = $statement->fetchAllAssoc('job', \PDO::FETCH_OBJ);
+    $this->assertEquals((object) $expected_result['Singer'], $result['Singer']);
+    $this->assertEquals((object) $expected_result['Drummer'], $result['Drummer']);
+  }
+
+  /**
+   * Tests ::fetchField().
+   */
+  public function testQueryFetchField(): void {
+    $this->connection->insert('test')
+      ->fields([
+        'name' => 'Foo',
+        'age' => 0,
+        'job' => 'Dummy',
+      ])
+      ->execute();
+
+    $this->connection->insert('test')
+      ->fields([
+        'name' => 'Kurt',
+        'age' => 27,
+        'job' => 'Singer',
+      ])
+      ->execute();
+
+    $expectedResults = ['25', '27', '28', '26', '0', '27'];
+
+    $statement = $this->connection->select('test')
+      ->fields('test', ['age'])
+      ->orderBy('id')
+      ->execute();
+
+    $actualResults = [];
+    while (TRUE) {
+      $result = $statement->fetchField();
+      if ($result === FALSE) {
+        break;
+      }
+      $this->assertIsNumeric($result);
+      $actualResults[] = $result;
+    }
+
+    $this->assertSame($expectedResults, $actualResults);
+  }
+
   /**
    * Tests that rowCount() throws exception on SELECT query.
    */
diff --git a/core/tests/Drupal/KernelTests/Core/Database/StatementTest.php b/core/tests/Drupal/KernelTests/Core/Database/StatementTest.php
index eeb230466385..6f4f9449f271 100644
--- a/core/tests/Drupal/KernelTests/Core/Database/StatementTest.php
+++ b/core/tests/Drupal/KernelTests/Core/Database/StatementTest.php
@@ -43,4 +43,117 @@ public function testRepeatedInsertStatementReuse() {
     $this->assertSame('31', $this->connection->query('SELECT [age] FROM {test} WHERE [name] = :name', [':name' => 'Curly'])->fetchField());
   }
 
+  /**
+   * Tests statement fetching after a full traversal.
+   */
+  public function testIteratedStatementFetch(): void {
+    $statement = $this->connection->query('SELECT * FROM {test}');
+
+    foreach ($statement as $row) {
+      $this->assertNotNull($row);
+    }
+
+    $this->assertSame([], $statement->fetchAll());
+    $this->assertSame([], $statement->fetchAllKeyed());
+    $this->assertSame([], $statement->fetchAllAssoc('age'));
+    $this->assertSame([], $statement->fetchCol());
+
+    $this->assertFalse($statement->fetch());
+    $this->assertFalse($statement->fetchObject());
+    $this->assertFalse($statement->fetchAssoc());
+    $this->assertFalse($statement->fetchField());
+  }
+
+  /**
+   * Tests statement rewinding.
+   */
+  public function testStatementRewind(): void {
+    $statement = $this->connection->query('SELECT * FROM {test}');
+
+    foreach ($statement as $row) {
+      $this->assertNotNull($row);
+    }
+
+    // Trying to iterate through the same statement again should fail.
+    $this->expectError();
+    $this->expectErrorMessage('Attempted rewinding a StatementInterface object when fetching has already started. Refactor your code to avoid rewinding statement objects.');
+    foreach ($statement as $row) {
+      $this->assertNotNull($row);
+    }
+  }
+
+  /**
+   * Tests empty statement rewinding.
+   */
+  public function testEmptyStatementRewind(): void {
+    $statement = $this->connection->query('SELECT * FROM {test} WHERE 1 = 0');
+
+    $count = 0;
+
+    foreach ($statement as $row) {
+      $count++;
+    }
+    foreach ($statement as $row) {
+      $count++;
+    }
+
+    $this->assertEquals(0, $count);
+  }
+
+  /**
+   * Tests counting a statement twice.
+   *
+   * We need to use iterator_count() and not assertCount() since the latter
+   * would rewind the statement twice anyway.
+   */
+  public function testStatementCountTwice(): void {
+    $statement = $this->connection->query('SELECT * FROM {test}');
+    $rowCount = iterator_count($statement);
+    $this->assertSame(4, $rowCount);
+
+    $this->expectError();
+    $this->expectErrorMessage('Attempted rewinding a StatementInterface object when fetching has already started. Refactor your code to avoid rewinding statement objects.');
+    $rowCount = iterator_count($statement);
+  }
+
+  /**
+   * Tests statement with no results counting twice.
+   *
+   * We need to use iterator_count() and not assertCount() since the latter
+   * would rewind the statement twice anyway.
+   */
+  public function testEmptyStatementCountTwice(): void {
+    $statement = $this->connection->query('SELECT * FROM {test} WHERE 1 = 0');
+    $rowCount = iterator_count($statement);
+    $this->assertSame(0, $rowCount);
+    $rowCount = iterator_count($statement);
+    $this->assertSame(0, $rowCount);
+  }
+
+  /**
+   * Tests a follow-on iteration on a statement using foreach.
+   */
+  public function testStatementForeachTwice(): void {
+    $statement = $this->connection->query('SELECT * FROM {test}');
+
+    $count = 0;
+    foreach ($statement as $row) {
+      $count++;
+      $this->assertNotNull($row);
+      if ($count > 2) {
+        break;
+      }
+    }
+    $this->assertSame(3, $count);
+
+    // Restart iterating through the same statement. The foreach loop will try
+    // rewinding the statement which should fail, and the counter should not be
+    // increased.
+    $this->expectError();
+    $this->expectErrorMessage('Attempted rewinding a StatementInterface object when fetching has already started. Refactor your code to avoid rewinding statement objects.');
+    foreach ($statement as $row) {
+      // No-op.
+    }
+  }
+
 }
diff --git a/core/tests/Drupal/Tests/Core/Database/ConnectionTest.php b/core/tests/Drupal/Tests/Core/Database/ConnectionTest.php
index f0d0bc766066..45e8f49e4c51 100644
--- a/core/tests/Drupal/Tests/Core/Database/ConnectionTest.php
+++ b/core/tests/Drupal/Tests/Core/Database/ConnectionTest.php
@@ -4,6 +4,7 @@
 
 use Composer\Autoload\ClassLoader;
 use Drupal\Core\Database\Database;
+use Drupal\Core\Database\StatementPrefetch;
 use Drupal\Tests\Core\Database\Stub\StubConnection;
 use Drupal\Tests\Core\Database\Stub\StubPDO;
 use Drupal\Tests\UnitTestCase;
@@ -662,7 +663,7 @@ public function testFindCallerFromDebugBacktrace() {
     $result = $connection->findCallerFromDebugBacktrace();
     $this->assertSame([
       'file' => __FILE__,
-      'line' => 662,
+      'line' => 663,
       'function' => 'testFindCallerFromDebugBacktrace',
       'class' => 'Drupal\Tests\Core\Database\ConnectionTest',
       'type' => '->',
@@ -866,4 +867,30 @@ public function providerMockedBacktrace(): array {
     ];
   }
 
+  /**
+   * Tests deprecation of the StatementWrapper class.
+   *
+   * @group legacy
+   */
+  public function testStatementWrapperDeprecation() {
+    $this->expectDeprecation('\\Drupal\\Core\\Database\\StatementWrapper is deprecated in drupal:10.1.0 and is removed from drupal:11.0.0. Use \\Drupal\\Core\\Database\\StatementWrapperIterator instead. See https://www.drupal.org/node/3265938');
+    $mock_pdo = $this->createMock(StubPDO::class);
+    $connection = new StubConnection($mock_pdo, []);
+    $this->expectError();
+    $connection->prepareStatement('boing', []);
+  }
+
+  /**
+   * Tests deprecation of the StatementPrefetch class.
+   *
+   * @group legacy
+   */
+  public function testStatementPrefetchDeprecation() {
+    $this->expectDeprecation('\\Drupal\\Core\\Database\\StatementPrefetch is deprecated in drupal:10.1.0 and is removed from drupal:11.0.0. Use \Drupal\Core\Database\StatementPrefetchIterator instead. See https://www.drupal.org/node/3265938');
+    $mockPdo = $this->createMock(StubPDO::class);
+    $mockConnection = new StubConnection($mockPdo, []);
+    $statement = new StatementPrefetch($mockPdo, $mockConnection, '');
+    $this->assertInstanceOf(StatementPrefetch::class, $statement);
+  }
+
 }
-- 
GitLab