From 7fdaf9b5748c9e8864a9a3a66616a0108a36ee05 Mon Sep 17 00:00:00 2001 From: Aaron Bauman <aaron@messageagency.com> Date: Thu, 29 Dec 2016 12:55:51 -0500 Subject: [PATCH] by aaronbauman - Rest async push is basically working now --- .../salesforce_mapping.module | 23 +- .../salesforce_push/salesforce_push.install | 10 + .../salesforce_push/salesforce_push.module | 11 +- .../salesforce_push.services.yml | 8 +- .../SalesforcePushQueueProcessor/Rest.php | 74 +++++ modules/salesforce_push/src/PushQueue.php | 253 ++++++++++-------- .../src/PushQueueProcessorPluginManager.php | 19 ++ src/EntityNotFoundException.php | 11 + 8 files changed, 285 insertions(+), 124 deletions(-) create mode 100644 modules/salesforce_push/src/Plugin/SalesforcePushQueueProcessor/Rest.php create mode 100644 modules/salesforce_push/src/PushQueueProcessorPluginManager.php create mode 100644 src/EntityNotFoundException.php diff --git a/modules/salesforce_mapping/salesforce_mapping.module b/modules/salesforce_mapping/salesforce_mapping.module index 373b476c..0b9ca814 100644 --- a/modules/salesforce_mapping/salesforce_mapping.module +++ b/modules/salesforce_mapping/salesforce_mapping.module @@ -13,6 +13,7 @@ use \Drupal\Core\Entity\Display\EntityViewDisplayInterface; // Not sure if we'll actually need these, since entity API seems to provide everything: use Drupal\salesforce_mapping\Entity\SalesforceMapping; use Drupal\salesforce_mapping\Entity\MappedObject; +use Drupal\salesforce\EntityNotFoundException; /** * Define when a data sync should take place for a given mapping. @@ -94,14 +95,14 @@ function salesforce_mapping_entity_operation(EntityInterface $entity) { * The requested mapping or an array of all mappings, indexed by id, if $name * was not specified * - * @throws Exception if no mapping is found + * @throws EntityNotFoundException if no mapping exists with the given name */ function salesforce_mapping_load($name) { $mapping = \Drupal::entityTypeManager() ->getStorage('salesforce_mapping') ->load($name); if (empty($mapping)) { - throw new Exception("No mapping found for $name."); + throw new EntityNotFoundException("No mapping found for $name."); } return $mapping; } @@ -117,16 +118,16 @@ function salesforce_mapping_load($name) { * @return array * An array of \Drupal\salesforce_mapping\Entity\SalesforceMapping objects, indexed by id * - * @throws Exception if not mappings are found + * @throws Exception if not mappings exist with the given properties */ function salesforce_mapping_load_multiple($properties = []) { $mappings = []; $mappings = \Drupal::entityTypeManager() ->getStorage('salesforce_mapping') ->loadByProperties($properties); - //if (empty($mappings)) { - // throw new Exception('No mappings found.'); - //} + if (empty($mappings)) { + throw new EntityNotFoundException('No mappings found.'); + } return $mappings; } @@ -149,16 +150,18 @@ function salesforce_mapping_load_by_drupal($entity_type) { * is necessary, it should be done explicitly by the caller. * * @return array - * An array of \Drupal\salesforce_mapping\Entity\MappedObject objects, indexed by id + * An array of \Drupal\salesforce_mapping\Entity\MappedObject objects, + * indexed by id * - * @throws Exception if not mapped objects are found + * @throws EntityNotFoundException if no mapped objects exist with the given + * properties */ function salesforce_mapped_object_load_multiple($properties = []) { $mappings = \Drupal::entityTypeManager() ->getStorage('salesforce_mapped_object') ->loadByProperties($properties); if (empty($mappings)) { - throw new Exception('No mapped objects found'); + throw new EntityNotFoundException('No mapped objects found'); } return $mappings; } @@ -193,7 +196,7 @@ function salesforce_mapped_object_load_by_sfid($salesforce_id) { /** * Return a unique list of mapped Salesforce object types. * @see salesforce_mapping_load_multiple() - * @throws Exception if no mappings have been created yet. + * @throws EntityNotFoundException if no mappings have been created yet. */ function salesforce_mapping_get_mapped_objects() { $object_types = []; diff --git a/modules/salesforce_push/salesforce_push.install b/modules/salesforce_push/salesforce_push.install index e17fed72..5ab50887 100644 --- a/modules/salesforce_push/salesforce_push.install +++ b/modules/salesforce_push/salesforce_push.install @@ -7,6 +7,8 @@ use Drupal\salesforce_push\PushQueue; */ function salesforce_push_install() { \Drupal::state()->set('salesforce.push_limit', PushQueue::DEFAULT_CRON_PUSH_LIMIT); + \Drupal::state()->set('salesforce.push_queue_processor', PushQueue::DEFAULT_QUEUE_PROCESSOR); + \Drupal::state()->set('salesforce.push_queue_max_fails', PushQueue::DEFAULT_MAX_FAILS); } /** @@ -14,4 +16,12 @@ function salesforce_push_install() { */ function salesforce_push_uninstall() { \Drupal::state()->delete('salesforce.push_limit'); + \Drupal::state()->delete('salesforce.push_queue_processor'); + \Drupal::state()->delete('salesforce.push_queue_max_fails'); } + +function salesforce_push_update_1() { + \Drupal::state()->set('salesforce.push_queue_processor', 'rest'); + \Drupal::state()->set('salesforce.push_queue_max_fails', 10); +} + diff --git a/modules/salesforce_push/salesforce_push.module b/modules/salesforce_push/salesforce_push.module index 7df5978e..c33f1e86 100644 --- a/modules/salesforce_push/salesforce_push.module +++ b/modules/salesforce_push/salesforce_push.module @@ -186,7 +186,7 @@ function salesforce_push_enqueue_async(EntityInterface $entity, SalesforceMappin // for batching. Each queue item is a unique array of entity ids to be // pushed. The async queue worker loads the queue item and works through as // many entities as possible, up to the async limit for this mapping. - \Drupal::service('queue.salesforce')->createItem([ + \Drupal::service('queue.salesforce_push')->createItem([ 'name' => $mapping->id(), 'entity_id' => $entity->id(), 'op' => $op, @@ -197,5 +197,12 @@ function salesforce_push_enqueue_async(EntityInterface $entity, SalesforceMappin * Implements hook_cron(). */ function salesforce_push_cron() { - \Drupal::service('queue.salesforce')->processQueues(); + $queue = \Drupal::service('queue.salesforce_push'); + try { + $queue->processQueues(); + } + catch (Exception $e) { + watchdog_exception('Salesforce Push', $e); + } + $queue->garbageCollection(); } diff --git a/modules/salesforce_push/salesforce_push.services.yml b/modules/salesforce_push/salesforce_push.services.yml index 11fe8b50..f0ca0bb5 100644 --- a/modules/salesforce_push/salesforce_push.services.yml +++ b/modules/salesforce_push/salesforce_push.services.yml @@ -1,4 +1,8 @@ services: - queue.salesforce: + plugin.manager.salesforce_push_queue_processor: + class: Drupal\salesforce_push\PushQueueProcessorPluginManager + arguments: ['@container.namespaces', '@cache.discovery', '@module_handler'] + + queue.salesforce_push: class: Drupal\salesforce_push\PushQueue - arguments: ['@database', '@state'] + arguments: ['@database', '@state', '@plugin.manager.salesforce_push_queue_processor'] diff --git a/modules/salesforce_push/src/Plugin/SalesforcePushQueueProcessor/Rest.php b/modules/salesforce_push/src/Plugin/SalesforcePushQueueProcessor/Rest.php new file mode 100644 index 00000000..82837dd6 --- /dev/null +++ b/modules/salesforce_push/src/Plugin/SalesforcePushQueueProcessor/Rest.php @@ -0,0 +1,74 @@ +<?php + +namespace Drupal\salesforce_push\Plugin\SalesforcePushQueueProcessor; + +use Drupal\Core\Plugin\ContainerFactoryPluginInterface; +use Drupal\Core\Plugin\PluginBase; +use Drupal\Core\Queue\SuspendQueueException; +use Drupal\salesforce\EntityNotFoundException; +use Drupal\salesforce\Rest\RestClient; +use Drupal\salesforce_push\PushQueue; +use Symfony\Component\DependencyInjection\ContainerInterface; + +/** + * Rest queue processor plugin. + * + * @Plugin( + * id = "rest", + * label = @Translation("REST Push Queue Processor") + * ) + */ +class Rest extends PluginBase implements ContainerFactoryPluginInterface { + protected $queue; + protected $client; + public function __construct(array $configuration, $plugin_id, array $plugin_definition, PushQueue $queue, RestClient $client) { + $this->queue = $queue; + $this->client = $client; + } + + /** + * {@inheritdoc} + */ + public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) { + return new static($configuration, $plugin_id, $plugin_definition, + $container->get('queue.salesforce_push'), + $container->get('salesforce.client') + ); + } + + public function process(array $items) { + if (!$this->client->isAuthorized()) { + throw new SuspendQueueException('Salesforce client not authorized.'); + } + foreach ($items as $item) { + try { + $this->processItem($item); + $this->queue->deleteItem($item); + } + catch (\Exception $e) { + $this->queue->failItem($item, $e); + } + } + } + + protected function processItem(\stdClass $item) { + $mapping = salesforce_mapping_load($item->name); + + $entity = \Drupal::entityTypeManager() + ->getStorage($mapping->get('drupal_entity_type')) + ->load($item->entity_id); + if (!$entity) { + throw new EntityNotFoundException(); + } + + salesforce_push_sync_rest($entity, $mapping, $item->op); + \Drupal::logger('Salesforce Push')->notice('Entity %type %id for salesforce mapping %mapping pushed successfully.', + [ + '%type' => $mapping->get('drupal_entity_type'), + '%id' => $item->entity_id, + '%mapping' => $mapping->id(), + ] + ); + } + +} \ No newline at end of file diff --git a/modules/salesforce_push/src/PushQueue.php b/modules/salesforce_push/src/PushQueue.php index f57a3d50..10116516 100644 --- a/modules/salesforce_push/src/PushQueue.php +++ b/modules/salesforce_push/src/PushQueue.php @@ -8,6 +8,8 @@ use Drupal\Core\DependencyInjection\DependencySerializationTrait; use Drupal\Core\Database\Query\Merge; use Drupal\Core\Queue\DatabaseQueue; use Drupal\Core\State\State; +use Drupal\Core\Queue\SuspendQueueException; +use Drupal\Core\Queue\RequeueException; /** * Salesforce push queue. @@ -22,9 +24,16 @@ class PushQueue extends DatabaseQueue { const TABLE_NAME = 'salesforce_push_queue'; const DEFAULT_CRON_PUSH_LIMIT = 200; + + const DEFAULT_QUEUE_PROCESSOR = 'rest'; + + const DEFAULT_MAX_FAILS = 10; + protected $limit; protected $connection; protected $state; + protected $queueManager; + protected $max_fails; /** * Constructs a \Drupal\Core\Queue\DatabaseQueue object. @@ -32,10 +41,14 @@ class PushQueue extends DatabaseQueue { * @param \Drupal\Core\Database\Connection $connection * The Connection object containing the key-value tables. */ - public function __construct(Connection $connection, State $state) { + public function __construct(Connection $connection, State $state, PushQueueProcessorPluginManager $queue_manager) { $this->connection = $connection; $this->state = $state; - $this->limit = $state->get('salesforce.push_limit', self::DEFAULT_CRON_PUSH_LIMIT); + $this->queueManager = $queue_manager; + + $this->limit = $state->get('salesforce.push_limit', static::DEFAULT_CRON_PUSH_LIMIT); + + $this->max_fails = \Drupal::state()->get('salesforce.push_queue_max_fails', static::DEFAULT_MAX_FAILS); } /** @@ -99,19 +112,24 @@ class PushQueue extends DatabaseQueue { } /** - * Claim $n items from the current queue. + * Claim up to $n items from the current queue. + * If queue is empty, return an empty array. * @see DatabaseQueue::claimItem + * @return array $items + * Zero to $n Items indexed by item_id */ - public function claimItems($n, $lease_time = 30) { + public function claimItems($n, $lease_time = 300) { while (TRUE) { try { - $items = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, $n, array(':name' => $this->name))->fetchAllAssoc('entity_id'); + // @TODO: convert items to content entities. + // @see \Drupal::entityQuery() + $items = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name AND failures < :fail_limit ORDER BY created, item_id ASC', 0, $n, array(':name' => $this->name, ':fail_limit' => $this->max_fails))->fetchAllAssoc('item_id'); } catch (\Exception $e) { $this->catchException($e); // If the table does not exist there are no items currently available to // claim. - return FALSE; + return []; } if ($items) { // Try to update the item. Only one thread can succeed in UPDATEing the @@ -133,55 +151,19 @@ class PushQueue extends DatabaseQueue { } else { // No items currently available to claim. - return FALSE; + return []; } } } /** - * {@inheritdoc} + * DO NOT USE THIS FUNCTION. + * Use claimItems() instead. */ - public function claimItem($lease_time = 30) { - // Claim an item by updating its expire fields. If claim is not successful - // another thread may have claimed the item in the meantime. Therefore loop - // until an item is successfully claimed or we are reasonably sure there - // are no unclaimed items left. - while (TRUE) { - try { - $item = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject(); - } - catch (\Exception $e) { - $this->catchException($e); - // If the table does not exist there are no items currently available to - // claim. - return FALSE; - } - if ($item) { - // Try to update the item. Only one thread can succeed in UPDATEing the - // same row. We cannot rely on REQUEST_TIME because items might be - // claimed by a single consumer which runs longer than 1 second. If we - // continue to use REQUEST_TIME instead of the current time(), we steal - // time from the lease, and will tend to reset items before the lease - // should really expire. - $update = $this->connection->update(static::TABLE_NAME) - ->fields(array( - 'expire' => time() + $lease_time, - )) - ->condition('item_id', $item->item_id) - ->condition('expire', 0); - // If there are affected rows, this update succeeded. - if ($update->execute()) { - return $item; - } - } - else { - // No items currently available to claim. - return FALSE; - } - } + public function claimItem($lease_time = NULL) { + throw new Exception('This queue is designed to process multiple items at once. Please use "claimItems" instead.'); } - /** * Defines the schema for the queue table. */ @@ -258,82 +240,133 @@ class PushQueue extends DatabaseQueue { public function processQueues() { $mappings = salesforce_push_load_push_mappings(); $i = 0; + + // [ id => ['id' => $id, 'label' => $label, 'class' => $class, 'provider' => $module_name] + $plugin_name = \Drupal::state()->get('salesforce.push_queue_processor', static::DEFAULT_QUEUE_PROCESSOR); + + $queue_processor = $this->queueManager->createInstance($plugin_name); + foreach ($mappings as $mapping) { + // Set the queue name, which is the mapping id. $this->setName($mapping->id()); - // @TODO: eventually this should work as follows: - // - New plugin type "PushQueueProcessor" - // -- Differs from QueueWorker plugin, because it can choose how to process an entire queue. - // -- Allows SoapQueueProcessor to optimize queries by processing multiple queue items at once. - // -- RestQueueProcessor will still do one-at-a-time. - // - Hand the mapping id (queue name) to the queue processor and let it do its thing + // Iterate through items in this queue until we run out or hit the limit. while (TRUE) { - if ($this->limit && $i++ > $this->limit) { - // Global limit is a hard stop. We're done processing now. - // @TODO some logging about how many items were processed, etc. - return; - } - - $item = $this->claimItem(); - if (!$item) { - // Ran out of items in this queue. Move on to the next one. - break; + // Claim as many items as we can from this queue and advance our counter. If this queue is empty, move to the next mapping. + $items = $this->claimItems($this->limit); + if (empty($items)) { + continue 2; } + // Hand them to the queue processor. try { - $entity = \Drupal::entityTypeManager() - ->getStorage($mapping->get('drupal_entity_type')) - ->load($item->entity_id); - if (!$entity) { - throw new Exception(); - } + $queue_processor->process($items); } - catch (Exception $e) { - // If there was an exception loading the entity, we assume that this queue item is no longer relevant. - \Drupal::logger('Salesforce Push')->notice($e->getMessage() . - ' Exception while loading entity %type %id for salesforce mapping %mapping. Queue item deleted.', - [ - '%type' => $mapping->get('drupal_entity_type'), - '%id' => $item->entity_id, - '%mapping' => $mapping->id(), - ] - ); - $item->delete(); + catch (RequeueException $e) { + // Getting a Requeue here is weird for a group of items, but we'll + // deal with it. + $this->releaseItems($items); + watchdog_exception('Salesforce Push', $e); } + catch (SuspendQueueException $e) { + // Getting a SuspendQueue is more likely, e.g. because of a network + // or authorization error. Move on to the next mapping in this case. + $this->releaseItems($items); + watchdog_exception('Salesforce Push', $e); - try { - salesforce_push_sync_rest($entity, $mapping, $item->op); - $this->deleteItem($item); - \Drupal::logger('Salesforce Push')->notice('Entity %type %id for salesforce mapping %mapping pushed successfully.', - [ - '%type' => $mapping->get('drupal_entity_type'), - '%id' => $item->entity_id, - '%mapping' => $mapping->id(), - ] - ); + continue 2; } - catch (Exception $e) { - $item->failure++; - \Drupal::logger('Salesforce Push')->notice($e->getMessage() . - ' Exception while pushing entity %type %id for salesforce mapping %mapping. Queue item %item failed %fail times.', - [ - '%type' => $mapping->get('drupal_entity_type'), - '%id' => $item->entity_id, - '%mapping' => $mapping->id(), - '%item' => $item->item_id, - '%fail' => $item->failure, - ] - ); - // doCreateItem() doubles as "save" function. - $item->doCreateItem(get_object_vars($item)); - $this->releaseItem($item); - // @TODO: push queue processor plugins will have to implement some error tolerance: - // - If mapped object does not exist, and this is a delete operation, we can delete this queue item. - // - Otherwise, return item to queue and increment failure count. - // - After N failures, move to perma fail table. + catch (\Exception $e) { + // In case of any other kind of exception, log it and leave the item + // in the queue to be processed again later. + // @TODO: this is how Cron.php queue works, but I don't really understand why it doesn't get re-queued. + watchdog_exception('Salesforce Push', $e); + } + finally { + // If we've reached our limit, we're done. Otherwise, continue to next items. + $i += count($items); + if ($i >= $this->limit) { + return $this; + } } } - } + } + return $this; + } + + /** + * Exception handler so that Queue Processors don't have to worry about what + * happens when a queue item fails. + * + * @param Exception $e + * @param stdClass $item + */ + public function failItem(\Exception $e, \stdClass $item) { + // For now we only have special handling for EntityNotFoundException. + // May want to distinguish in the future between network exceptions, etc. + $mapping = salesforce_mapping_load($item->name); + + if ($e instanceof EntityNotFoundException) { + // If there was an exception loading the entity, we assume that this queue item is no longer relevant. + \Drupal::logger('Salesforce Push')->error($e->getMessage() . + ' Exception while loading entity %type %id for salesforce mapping %mapping. Queue item deleted.', + [ + '%type' => $mapping->get('drupal_entity_type'), + '%id' => $item->entity_id, + '%mapping' => $mapping->id(), + ] + ); + $this->deleteItem($item); + return; + } + + $item->failures++; + + $message = $e->getMessage(); + if ($item->failures >= $this->max_fails) { + $message = 'Permanently failed queue item %item failed %fail times. Exception while pushing entity %type %id for salesforce mapping %mapping. ' . $message; + } + else { + $message = 'Queue item %item failed %fail times. Exception while pushing entity %type %id for salesforce mapping %mapping. ' . $message; + } + + \Drupal::logger('Salesforce Push')->error($message, + [ + '%type' => $mapping->get('drupal_entity_type'), + '%id' => $item->entity_id, + '%mapping' => $mapping->id(), + '%item' => $item->item_id, + '%fail' => $item->failures, + ] + ); + + // doCreateItem() doubles as "save" function. + // failed items will remain in queue in case fail params change or they need to be manually retried. + $this->doCreateItem(get_object_vars($item)); + $this->releaseItem($item); + } + + /** + * same as releaseItem, but for multiple items + * @param array $items + * Indexes must be item ids. Values are ignored. Return from claimItems() + * is acceptable. + */ + public function releaseItems(array $items) { + try { + $update = $this->connection->update(static::TABLE_NAME) + ->fields(array( + 'expire' => 0, + )) + ->condition('item_id', array_keys($items), 'IN'); + return $update->execute(); + } + catch (\Exception $e) { + watchdog_exception('Salesforce Push', $e); + $this->catchException($e); + // If the table doesn't exist we should consider the item released. + return TRUE; + } } } diff --git a/modules/salesforce_push/src/PushQueueProcessorPluginManager.php b/modules/salesforce_push/src/PushQueueProcessorPluginManager.php new file mode 100644 index 00000000..53de8744 --- /dev/null +++ b/modules/salesforce_push/src/PushQueueProcessorPluginManager.php @@ -0,0 +1,19 @@ +<?php + +namespace Drupal\salesforce_push; + +use Drupal\Core\Cache\CacheBackendInterface; +use Drupal\Core\Extension\ModuleHandlerInterface; +use Drupal\Core\Plugin\DefaultPluginManager; + +/** + * Plugin type manager for SF push queue processors + */ +class PushQueueProcessorPluginManager extends DefaultPluginManager { + + public function __construct(\Traversable $namespaces, CacheBackendInterface $cache_backend, ModuleHandlerInterface $module_handler) { + parent::__construct('Plugin/SalesforcePushQueueProcessor', $namespaces, $module_handler); + + $this->setCacheBackend($cache_backend, 'salesforce_push_queue_processor'); + } +} diff --git a/src/EntityNotFoundException.php b/src/EntityNotFoundException.php new file mode 100644 index 00000000..aba4d9ee --- /dev/null +++ b/src/EntityNotFoundException.php @@ -0,0 +1,11 @@ +<?php + +namespace Drupal\salesforce; + +/** + * EntityNotFoundException extends Drupal\salesforce\Exception + * Thrown when a load operation returns no results. + */ +class EntityNotFoundException extends Exception { + +} -- GitLab