Commit 0dd16127 authored by webchick's avatar webchick

#629794 by yched: Fix Scaling issues with batch API. (with tests)

parent e07b9d35
......@@ -6,7 +6,7 @@
* @file
* Batch processing API for processes to run in multiple HTTP requests.
*
* Please note that batches are usually invoked by form submissions, which is
* Note that batches are usually invoked by form submissions, which is
* why the core interaction functions of the batch processing API live in
* form.inc.
*
......@@ -62,8 +62,10 @@ function _batch_page() {
// Add batch-specific CSS.
foreach ($batch['sets'] as $batch_set) {
foreach ($batch_set['css'] as $css) {
drupal_add_css($css);
if (isset($batch_set['css'])) {
foreach ($batch_set['css'] as $css) {
drupal_add_css($css);
}
}
}
......@@ -252,6 +254,12 @@ function _batch_process() {
timer_start('batch_processing');
}
if (empty($current_set['start'])) {
$current_set['start'] = microtime(TRUE);
}
$queue = _batch_queue($current_set);
while (!$current_set['success']) {
// If this is the first time we iterate this batch set in the current
// request, we check if it requires an additional file for functions
......@@ -261,42 +269,49 @@ function _batch_process() {
}
$task_message = '';
// We assume a single pass operation and set the completion level to 1 by
// Assume a single pass operation and set the completion level to 1 by
// default.
$finished = 1;
if ((list($function, $args) = reset($current_set['operations'])) && function_exists($function)) {
// Build the 'context' array, execute the function call, and retrieve the
// user message.
if ($item = $queue->claimItem()) {
list($function, $args) = $item->data;
// Build the 'context' array and execute the function call.
$batch_context = array(
'sandbox' => &$current_set['sandbox'],
'results' => &$current_set['results'],
'finished' => &$finished,
'message' => &$task_message,
);
// Process the current operation.
call_user_func_array($function, array_merge($args, array(&$batch_context)));
}
if ($finished == 1) {
// Make sure this step is not counted twice when computing $current.
$finished = 0;
// Remove the processed operation and clear the sandbox.
array_shift($current_set['operations']);
$current_set['sandbox'] = array();
if ($finished == 1) {
// Make sure this step is not counted twice when computing $current.
$finished = 0;
// Remove the processed operation and clear the sandbox.
$queue->deleteItem($item);
$current_set['count']--;
$current_set['sandbox'] = array();
}
}
// When all operations in the current batch set are completed, browse
// through the remaining sets until we find a set that contains operations.
// Note that _batch_next_set() executes stored form submit handlers in
// remaining batch sets, which can add new sets to the batch.
// through the remaining sets, marking them 'successfully processed'
// along the way, until we find a set that contains operations.
// _batch_next_set() executes form submit handlers stored in 'control'
// sets (see form_execute_handlers()), which can in turn add new sets to
// the batch.
$set_changed = FALSE;
$old_set = $current_set;
while (empty($current_set['operations']) && ($current_set['success'] = TRUE) && _batch_next_set()) {
while (empty($current_set['count']) && ($current_set['success'] = TRUE) && _batch_next_set()) {
$current_set = &_batch_current_set();
$current_set['start'] = microtime(TRUE);
$set_changed = TRUE;
}
// At this point, either $current_set contains operations that need to be
// processed or all sets have been completed.
$queue = _batch_queue($current_set);
// If we are in progressive mode, break processing after 1 second.
if ($batch['progressive'] && timer_read('batch_processing') > 1000) {
......@@ -312,33 +327,31 @@ function _batch_process() {
// Reporting 100% progress will cause the whole batch to be considered
// processed. If processing was paused right after moving to a new set,
// we have to use the info from the new (unprocessed) set.
if ($set_changed && isset($current_set['operations'])) {
if ($set_changed && isset($current_set['queue'])) {
// Processing will continue with a fresh batch set.
$remaining = count($current_set['operations']);
$remaining = $current_set['count'];
$total = $current_set['total'];
$progress_message = $current_set['init_message'];
$task_message = '';
}
else {
// Processing will continue with the current batch set.
$remaining = count($old_set['operations']);
$remaining = $old_set['count'];
$total = $old_set['total'];
$progress_message = $old_set['progress_message'];
}
$current = $total - $remaining + $finished;
$current = $total - $remaining + $finished;
$percentage = _batch_api_percentage($total, $current);
$elapsed = $current_set['elapsed'];
// Estimate remaining with percentage in floating format.
$estimate = $elapsed * ($total - $current) / $current;
$values = array(
'@remaining' => $remaining,
'@total' => $total,
'@current' => floor($current),
'@percentage' => $percentage,
'@elapsed' => format_interval($elapsed / 1000),
'@estimate' => format_interval($estimate / 1000),
// If possible, estimate remaining processing time.
'@estimate' => ($current > 0) ? format_interval(($elapsed * ($total - $current) / $current) / 1000) : '-',
);
$message = strtr($progress_message, $values);
if (!empty($message)) {
......@@ -410,7 +423,7 @@ function _batch_next_set() {
if (isset($current_set['form_submit']) && ($function = $current_set['form_submit']) && function_exists($function)) {
// We use our stored copies of $form and $form_state to account for
// possible alterations by previous form submit handlers.
$function($batch['form'], $batch['form_state']);
$function($batch['form_state']['complete form'], $batch['form_state']);
}
return TRUE;
}
......@@ -426,15 +439,16 @@ function _batch_finished() {
$batch = &batch_get();
// Execute the 'finished' callbacks for each batch set, if defined.
foreach ($batch['sets'] as $key => $batch_set) {
foreach ($batch['sets'] as $batch_set) {
if (isset($batch_set['finished'])) {
// Check if the set requires an additional file for function definitions.
if (isset($batch_set['file']) && is_file($batch_set['file'])) {
include_once DRUPAL_ROOT . '/' . $batch_set['file'];
}
if (function_exists($batch_set['finished'])) {
// Format the elapsed time when batch complete.
$batch_set['finished']($batch_set['success'], $batch_set['results'], $batch_set['operations'], format_interval($batch_set['elapsed'] / 1000));
$queue = _batch_queue($batch_set);
$operations = $queue->getAllItems();
$batch_set['finished']($batch_set['success'], $batch_set['results'], $operations, format_interval($batch_set['elapsed'] / 1000));
}
}
}
......@@ -444,6 +458,11 @@ function _batch_finished() {
db_delete('batch')
->condition('bid', $batch['id'])
->execute();
foreach ($batch['sets'] as $batch_set) {
if ($queue = _batch_queue($batch_set)) {
$queue->deleteQueue();
}
}
}
$_batch = $batch;
$batch = NULL;
......
<?php
// $Id$
/**
* @file
* Queue handlers used by the Batch API.
*
* Those implementations:
* - ensure FIFO ordering,
* - let an item be repeatedly claimed until it is actually deleted (no notion
* of lease time or 'expire' date), to allow multipass operations.
*/
/**
 * Batch queue implementation.
 *
 * Stale items from failed batches are cleaned from the {queue} table on cron
 * using the 'created' date.
 */
class BatchQueue extends SystemQueue {

  /**
   * Returns the oldest remaining item of this queue without leasing it.
   *
   * The item is not marked as claimed and no expiration is recorded, so the
   * same item keeps being returned until it is deleted. This is what lets a
   * multipass operation be picked up again on the next call.
   *
   * @param $lease_time
   *   Not used by this implementation.
   *
   * @return
   *   An object with the properties 'item_id' and 'data' (unserialized), or
   *   FALSE if no item is left for this queue name.
   */
  public function claimItem($lease_time = 0) {
    // Only the first row is ever read, so limit the result set to one row
    // instead of making the database return every remaining operation.
    $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE name = :name ORDER BY item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
    if ($item) {
      $item->data = unserialize($item->data);
      return $item;
    }
    return FALSE;
  }

  /**
   * Retrieve all remaining items in the queue.
   *
   * This is specific to Batch API and is not part of the DrupalQueueInterface.
   *
   * @return
   *   An array holding the unserialized data of every remaining item, ordered
   *   by ascending item ID.
   */
  public function getAllItems() {
    $result = array();
    $items = db_query('SELECT data FROM {queue} q WHERE name = :name ORDER BY item_id ASC', array(':name' => $this->name))->fetchAll();
    foreach ($items as $item) {
      $result[] = unserialize($item->data);
    }
    return $result;
  }
}
/**
 * Batch queue implementation used for non-progressive batches.
 */
class BatchMemoryQueue extends MemoryQueue {

  /**
   * Returns the first item held in memory without removing it.
   *
   * @param $lease_time
   *   Not used by this implementation.
   *
   * @return
   *   The first element of the in-memory queue, or FALSE when it is empty.
   */
  public function claimItem($lease_time = 0) {
    if (empty($this->queue)) {
      return FALSE;
    }
    // reset() rewinds the array and hands back its first element.
    return reset($this->queue);
  }

  /**
   * Retrieve all remaining items in the queue.
   *
   * This is specific to Batch API and is not part of the DrupalQueueInterface.
   *
   * @return
   *   An array with the 'data' property of each queued item, in queue order.
   */
  public function getAllItems() {
    $remaining = array();
    foreach ($this->queue as $queued) {
      $remaining[] = $queued->data;
    }
    return $remaining;
  }
}
This diff is collapsed.
......@@ -338,6 +338,51 @@ function update_fix_d7_requirements() {
db_create_table('date_formats', $schema['date_formats']);
db_create_table('date_format_locale', $schema['date_format_locale']);
// Add the queue table.
$schema['queue'] = array(
'description' => 'Stores items in queues.',
'fields' => array(
'item_id' => array(
'type' => 'serial',
'unsigned' => TRUE,
'not null' => TRUE,
'description' => 'Primary Key: Unique item ID.',
),
'name' => array(
'type' => 'varchar',
'length' => 255,
'not null' => TRUE,
'default' => '',
'description' => 'The queue name.',
),
'data' => array(
'type' => 'text',
'not null' => FALSE,
'size' => 'big',
'serialize' => TRUE,
'description' => 'The arbitrary data for the item.',
),
'expire' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Timestamp when the claim lease expires on the item.',
),
'created' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Timestamp when the item was created.',
),
),
'primary key' => array('item_id'),
'indexes' => array(
'name_created' => array('name', 'created'),
'expire' => array('expire'),
),
);
db_create_table('queue', $schema['queue']);
// Add column for locale context.
if (db_table_exists('locales_source')) {
db_add_field('locales_source', 'context', array('type' => 'varchar', 'length' => 255, 'not null' => TRUE, 'default' => '', 'description' => 'The context this string applies to.'));
......
This diff is collapsed.
<?php
// $Id$
/**
* @file
* Batch callbacks for the Batch API tests.
*/
/**
 * Simple (single pass) batch operation.
 *
 * @param $id
 *   Identifier recorded for this operation.
 * @param $sleep
 *   Number of microseconds to pause, as passed to usleep().
 * @param $context
 *   The batch context array; the ID is appended to $context['results'][1].
 */
function _batch_test_callback_1($id, $sleep, &$context) {
  // Nothing real to do: just burn some time so that the batch takes a couple
  // of iterations.
  usleep($sleep);

  // Leave a trace of the execution.
  batch_test_stack("op 1 id {$id}");

  // Keep a result around for the 'finished' callback.
  $context['results'][1][] = $id;
}
/**
 * Multistep batch operation.
 *
 * Handles at most five elements per call and reports partial completion
 * through $context['finished'] until $total elements have been handled.
 *
 * @param $start
 *   ID of the first element to process.
 * @param $total
 *   Number of elements to process overall.
 * @param $sleep
 *   Number of microseconds to pause per element, as passed to usleep().
 * @param $context
 *   The batch context array. Progress is tracked in $context['sandbox'] and
 *   processed IDs are appended to $context['results'][2].
 */
function _batch_test_callback_2($start, $total, $sleep, &$context) {
  $sandbox = &$context['sandbox'];

  // First pass: set up the progress information.
  if (!isset($sandbox['current'])) {
    $sandbox['current'] = $start;
    $sandbox['count'] = 0;
  }

  // Work through a group of 5 elements (arbitrary value).
  $limit = 5;
  $handled = 0;
  while ($handled < $limit && $sandbox['count'] < $total) {
    // Nothing real to do: just burn some time so that the batch takes a
    // couple of iterations.
    usleep($sleep);

    // Leave a trace of the execution and keep a result around for the
    // 'finished' callback.
    $id = $sandbox['current'] + $handled;
    batch_test_stack("op 2 id {$id}");
    $context['results'][2][] = $id;

    $sandbox['count']++;
    $handled++;
  }
  $sandbox['current'] += $handled;

  // Nothing to report once everything is done: 'finished' is left untouched.
  if ($sandbox['count'] == $total) {
    return;
  }
  // Otherwise tell the batch engine how far along we are.
  $context['finished'] = $sandbox['count'] / $total;
}
/**
 * Batch operation that registers a further batch while it runs.
 */
function _batch_test_nested_batch_callback() {
  batch_test_stack('setting up batch 2');
  $nested_batch = _batch_test_batch_2();
  batch_set($nested_batch);
}
/**
 * Shared implementation behind the per-batch 'finished' callbacks.
 *
 * Builds a summary of the batch results and displays it as a message.
 *
 * @param $batch_id
 *   Identifier of the batch, used in the first line of the summary.
 * @param $success
 *   Whether the batch completed; when FALSE, the first entry of $operations
 *   is reported as the failing operation.
 * @param $results
 *   Results collected by the operations, keyed by operation number.
 * @param $operations
 *   Remaining operations, each an array of callback name and arguments.
 */
function _batch_test_finished_helper($batch_id, $success, $results, $operations) {
  $messages = array("results for batch $batch_id");

  if (!$results) {
    $messages[] = 'none';
  }
  else {
    foreach ($results as $op => $op_results) {
      $messages[] = 'op ' . $op . ': processed ' . count($op_results) . ' elements';
    }
  }

  if (!$success) {
    // A fatal error occurred during the processing.
    $error_operation = reset($operations);
    $replacements = array(
      '@op' => $error_operation[0],
      '@args' => print_r($error_operation[1], TRUE),
    );
    $messages[] = t('An error occurred while processing @op with arguments:<br/>@args', $replacements);
  }

  drupal_set_message(implode('<br />', $messages));
}
/**
 * 'finished' callback for batch 0.
 *
 * Forwards its arguments to _batch_test_finished_helper() with batch ID 0.
 */
function _batch_test_finished_0($success, $results, $operations) {
  $batch_id = 0;
  _batch_test_finished_helper($batch_id, $success, $results, $operations);
}
/**
 * 'finished' callback for batch 1.
 *
 * Forwards its arguments to _batch_test_finished_helper() with batch ID 1.
 */
function _batch_test_finished_1($success, $results, $operations) {
  $batch_id = 1;
  _batch_test_finished_helper($batch_id, $success, $results, $operations);
}
/**
 * 'finished' callback for batch 2.
 *
 * Forwards its arguments to _batch_test_finished_helper() with batch ID 2.
 */
function _batch_test_finished_2($success, $results, $operations) {
  $batch_id = 2;
  _batch_test_finished_helper($batch_id, $success, $results, $operations);
}
/**
 * 'finished' callback for batch 3.
 *
 * Forwards its arguments to _batch_test_finished_helper() with batch ID 3.
 */
function _batch_test_finished_3($success, $results, $operations) {
  $batch_id = 3;
  _batch_test_finished_helper($batch_id, $success, $results, $operations);
}
/**
 * 'finished' callback for batch 4.
 *
 * Forwards its arguments to _batch_test_finished_helper() with batch ID 4.
 */
function _batch_test_finished_4($success, $results, $operations) {
  $batch_id = 4;
  _batch_test_finished_helper($batch_id, $success, $results, $operations);
}
; $Id$
name = "Batch API test"
description = "Support module for Batch API tests."
package = Testing
version = VERSION
core = 7.x
files[] = batch_test.module
files[] = batch_test.callbacks.inc
hidden = TRUE
This diff is collapsed.
......@@ -511,45 +511,6 @@ class FormsElementsTableSelectFunctionalTest extends DrupalWebTestCase {
}
/**
 * Test using drupal_form_submit in a batch.
 */
class FormAPITestCase extends DrupalWebTestCase {

  public static function getInfo() {
    $info = array(
      'name' => 'Drupal Execute and Batch API',
      'description' => 'Tests the compatibility of drupal_form_submit and the Batch API',
      'group' => 'Form API',
    );
    return $info;
  }

  /**
   * Enables the form_test module for this test case.
   */
  function setUp() {
    parent::setUp('form_test');
  }

  /**
   * Check that we can run drupal_form_submit during a batch.
   */
  function testDrupalFormSubmitInBatch() {
    // Seed the variable that the batch is expected to overwrite.
    variable_set('form_test_mock_submit', 'initial_state');

    // Visiting this page sets a batch, which calls drupal_form_submit, which
    // in turn modifies the variable seeded above.
    $this->drupalGet('form_test/drupal_form_submit_batch_api');

    // A successful drupal_form_submit call leaves 'form_submitted' in the
    // variable.
    $stored_value = variable_get('form_test_mock_submit', 'initial_state');
    $this->assertEqual('form_submitted', $stored_value, t('Check drupal_form_submit called submit handlers when running in a batch'));

    // Leave no trace behind.
    variable_del('form_test_mock_submit');
  }
}
/**
* Test the form storage on a multistep form.
*
......
......@@ -54,13 +54,6 @@ function form_test_menu() {
'type' => MENU_CALLBACK,
);
$items['form_test/drupal_form_submit_batch_api'] = array(
'title' => 'BatchAPI Drupal_form_submit tests',
'page callback' => 'form_test_drupal_form_submit_batch_api',
'access arguments' => array('access content'),
'type' => MENU_CALLBACK,
);
$items['form_test/form-storage'] = array(
'title' => 'Form storage test',
'page callback' => 'drupal_get_form',
......@@ -375,67 +368,6 @@ function _form_test_tableselect_js_select_form($form, $form_state, $action) {
return _form_test_tableselect_form_builder($form, $form_state, $options);
}
/**
 * Page callback for the batch/drupal_form_submit interaction test.
 *
 * Without an argument, a batch is set up whose single operation is
 * form_test_batch_callback(), called with the value 'form_submitted'. The
 * batch is then processed with this same path plus '/done' as destination.
 *
 * @param $arg
 *   'done' once the batch has been processed; empty otherwise.
 *
 * @return
 *   The translated string 'Done' when $arg is 'done', nothing otherwise.
 */
function form_test_drupal_form_submit_batch_api($arg = '') {
  if ($arg != 'done') {
    // Not at the end of the batch process yet: set the batch up and run it.
    $operation = array('form_test_batch_callback', array('form_submitted'));
    $batch = array('operations' => array($operation));
    batch_set($batch);
    batch_process('form_test/drupal_form_submit_batch_api/done');
    return;
  }

  // The batch has run, nothing more to do.
  return t('Done');
}
/**
 * Batch operation: submits form_test_mock_form through drupal_form_submit.
 *
 * @param $value
 *   The value submitted for the form's 'test_value' element.
 */
function form_test_batch_callback($value) {
  $state = array(
    'values' => array(
      'test_value' => $value,
    ),
  );
  drupal_form_submit('form_test_mock_form', $state);
}
/**
 * Form builder: a minimal form made of one textfield and a submit button.
 */
function form_test_mock_form($form, $form_state) {
  $text_element = array(
    '#type' => 'textfield',
    '#default_value' => 'initial_state',
  );
  $submit_element = array(
    '#type' => 'submit',
    '#value' => t('Submit'),
  );

  $form['test_value'] = $text_element;
  $form['submit'] = $submit_element;
  return $form;
}
/**
 * Form submission callback.
 *
 * Stores the submitted 'test_value' in the variable 'form_test_mock_submit'.
 */
function form_test_mock_form_submit($form, &$form_state) {
  $submitted_value = $form_state['values']['test_value'];
  variable_set('form_test_mock_submit', $submitted_value);
}
/**
* A multistep form for testing the form storage.
*
......
......@@ -5,11 +5,6 @@
* Implements hook_menu().
*/
function system_test_menu() {
$items['admin/system-test/batch-theme'] = array(
'page callback' => 'system_test_batch_theme',
'access callback' => TRUE,
'type' => MENU_CALLBACK,
);
$items['system-test/sleep/%'] = array(
'page callback' => 'system_test_sleep',
'page arguments' => array(2),
......@@ -102,34 +97,6 @@ function system_test_menu() {
return $items;
}
/**
 * Menu callback; start a new batch for testing the batch progress page theme.
 */
function system_test_batch_theme() {
  // The batch consists of a single operation taking no arguments.
  $operations = array(
    array('system_test_batch_theme_callback', array()),
  );
  $batch = array('operations' => $operations);
  batch_set($batch);

  // Redirect to some page other than this one when the batch is done, to
  // avoid an infinite loop.
  batch_process('node');
}
/**
 * Batch callback function for testing the theme used by a batch.
 *
 * drupalGet() steps through the full progressive batch before handing control
 * back to the test function, so the theme used on the batch processing page
 * cannot be checked by looking at that page directly. The theme in use is
 * therefore saved in a variable, which the thread running the test can load
 * and inspect afterwards.
 */
function system_test_batch_theme_callback() {
  global $theme;
  $theme_in_use = $theme;
  variable_set('system_test_batch_theme_used', $theme_in_use);
}
/**
 * Menu callback; pauses the request for the given number of seconds.
 *
 * @param $seconds
 *   Number of seconds to sleep. The value comes from the request path, so it
 *   is cast to an integer and negative values are treated as 0.
 */
function system_test_sleep($seconds) {
  // sleep() rejects non-numeric and negative input, so normalize the
  // path-supplied value before handing it over.
  sleep(max(0, (int) $seconds));
}
......
......@@ -496,7 +496,9 @@ function system_schema() {
'fields' => array(
'bid' => array(
'description' => 'Primary Key: Unique batch ID.',
'type' => 'serial',
// This is not a serial column, to allow both progressive and
// non-progressive batches. See batch_process().
'type' => 'int',
'unsigned' => TRUE,
'not null' => TRUE,
),
......@@ -2257,50 +2259,7 @@ function system_update_7021() {
* Add the queue tables.
*/
function system_update_7022() {
$schema['queue'] = array(
'description' => 'Stores items in queues.',
'fields' => array(
'item_id' => array(
'type' => 'serial',
'unsigned' => TRUE,
'not null' => TRUE,
'description' => 'Primary Key: Unique item ID.',
),
'name' => array(
'type' => 'varchar',
'length' => 255,
'not null' => TRUE,
'default' => '',
'description' => 'The queue name.',
),
'data' => array(
'type' => 'text',
'not null' => FALSE,
'size' => 'big',
'serialize' => TRUE,
'description' => 'The arbitrary data for the item.',
),
'expire' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Timestamp when the claim lease expires on the item.',
),
'created' => array(
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Timestamp when the item was created.',
),
),
'primary key' => array('item_id'),
'indexes' => array(
'name_created' => array('name', 'created'),
'expire' => array('expire'),
),
);
db_create_table('queue', $schema['queue']);
// Moved to update_fix_d7_requirements().
}
/**
......@@ -2727,6 +2686,13 @@ function system_update_7049() {
}
}
/**
 * Change {batch}.bid column from serial to regular int.
 */
function system_update_7050() {
  // The batch ID is no longer generated by the table itself, so the column
  // becomes a plain unsigned integer.
  $spec = array(
    'description' => 'Primary Key: Unique batch ID.',
    'type' => 'int',
    'unsigned' => TRUE,
    'not null' => TRUE,
  );
  db_change_field('batch', 'bid', 'bid', $spec);
}
/**
* @} End of "defgroup updates-6.x-to-7.x"
* The next series of updates should start at 8000.
......
......@@ -2695,10 +2695,6 @@ function system_cron() {
db_delete('flood')
->condition('expiration', REQUEST_TIME, '<')
->execute();
// Cleanup the batch table.
db_delete('batch')
->condition('timestamp', REQUEST_TIME - 864000, '<')
->execute();
// Remove temporary files that are older than DRUPAL_MAXIMUM_TEMP_FILE_AGE.
// Use separate placeholders for the status to avoid a bug in some versions
......@@ -2722,6 +2718,15 @@ function system_cron() {
cache_clear_all(NULL, $table);
}
// Cleanup the batch table and the queue for failed batches.
db_delete('batch')
->condition('timestamp', REQUEST_TIME - 864000, '<')
->execute();
db_delete('queue')
->condition('created', REQUEST_TIME - 864000, '<')
->condition('name', 'drupal_batch:%', 'LIKE')
->execute();
// Reset expired items in the default queue implementation table. If that's
// not used, this will simply be a no-op.
db_update('queue')
......
......@@ -253,6 +253,79 @@ public function deleteQueue() {
}
}
/**
* Static queue implementation.
*
* This allows "undelayed" variants of processes relying on the Queue
* interface. The queue data resides in memory. It should only be used for
* items that will be queued and dequeued within a given page request.
*/
class MemoryQueue implements DrupalQueueInterface {
/**
* The queue data.
*
* @var array
*/