diff --git a/modules/salesforce_mapping/salesforce_mapping.module b/modules/salesforce_mapping/salesforce_mapping.module index fd583dd53cd452f9638a97f6dd6ce0e44532a16f..373b476c5181120e9c683ae5018765f93a8625ec 100644 --- a/modules/salesforce_mapping/salesforce_mapping.module +++ b/modules/salesforce_mapping/salesforce_mapping.module @@ -100,9 +100,9 @@ function salesforce_mapping_load($name) { $mapping = \Drupal::entityTypeManager() ->getStorage('salesforce_mapping') ->load($name); - //if (empty($mapping)) { - // throw new Exception("No mapping found for $name."); - //} + if (empty($mapping)) { + throw new Exception("No mapping found for $name."); + } return $mapping; } diff --git a/modules/salesforce_push/salesforce_push.install b/modules/salesforce_push/salesforce_push.install index 8b80a7fb06f23382b232dd5382759a668505bd08..e17fed7210f4da02b41b848c372814b1fcb36cd7 100644 --- a/modules/salesforce_push/salesforce_push.install +++ b/modules/salesforce_push/salesforce_push.install @@ -1,14 +1,12 @@ <?php -/** - * @file - * Install/uninstall tasks for the Salesforce Push module. - */ + +use Drupal\salesforce_push\PushQueue; /** * Implements hook_install(). */ function salesforce_push_install() { - \Drupal::state()->set('salesforce.push_limit', 50); + \Drupal::state()->set('salesforce.push_limit', PushQueue::DEFAULT_CRON_PUSH_LIMIT); } /** diff --git a/modules/salesforce_push/salesforce_push.module b/modules/salesforce_push/salesforce_push.module index 609d656a67cac259c64194b102e06b45247497d7..7df5978eb9a3038ab2767d51df24df1c0ede7608 100644 --- a/modules/salesforce_push/salesforce_push.module +++ b/modules/salesforce_push/salesforce_push.module @@ -197,5 +197,5 @@ function salesforce_push_enqueue_async(EntityInterface $entity, SalesforceMappin * Implements hook_cron(). */ function salesforce_push_cron() { - \Drupal::service('queue.salesforce')->processQueue(); + \Drupal::service('queue.salesforce')->processQueues(); } diff --git a/modules/salesforce_push/salesforce_push.services.yml b/modules/salesforce_push/salesforce_push.services.yml index 4f717e369ad6bef33e87cd7421ceba32955d0f89..11fe8b5049f6d1db6d9a8aaf39f250bd20b923d2 100644 --- a/modules/salesforce_push/salesforce_push.services.yml +++ b/modules/salesforce_push/salesforce_push.services.yml @@ -1,4 +1,4 @@ services: queue.salesforce: class: Drupal\salesforce_push\PushQueue - arguments: ['@database'] + arguments: ['@database', '@state'] diff --git a/modules/salesforce_push/src/PushQueue.php b/modules/salesforce_push/src/PushQueue.php index 18523226b03bb423b2e212e12f20c24204c862d0..f57a3d50b941418e01a447541f3a2f249f8584bb 100644 --- a/modules/salesforce_push/src/PushQueue.php +++ b/modules/salesforce_push/src/PushQueue.php @@ -2,11 +2,12 @@ namespace Drupal\salesforce_push; -use Drupal\Core\Queue\DatabaseQueue; use Drupal\Core\Database\Connection; use Drupal\Core\Database\SchemaObjectExistsException; use Drupal\Core\DependencyInjection\DependencySerializationTrait; use Drupal\Core\Database\Query\Merge; +use Drupal\Core\Queue\DatabaseQueue; +use Drupal\Core\State\State; /** * Salesforce push queue. @@ -20,18 +21,34 @@ class PushQueue extends DatabaseQueue { */ const TABLE_NAME = 'salesforce_push_queue'; + const DEFAULT_CRON_PUSH_LIMIT = 200; + protected $limit; + protected $connection; + protected $state; + /** * Constructs a \Drupal\Core\Queue\DatabaseQueue object. * * @param \Drupal\Core\Database\Connection $connection * The Connection object containing the key-value tables. */ - function __construct(Connection $connection) { + public function __construct(Connection $connection, State $state) { $this->connection = $connection; + $this->state = $state; + $this->limit = $state->get('salesforce.push_limit', self::DEFAULT_CRON_PUSH_LIMIT); } + /** + * Parent class DatabaseQueue relies heavily on $this->name, so it's best to + * just set the value appropriately. + * + * @param string $name + * + * @return $this + */ public function setName($name) { $this->name = $name; + return $this; } /** @@ -47,6 +64,7 @@ class PushQueue extends DatabaseQueue { * On success, Drupal\Core\Database\Query\Merge::STATUS_INSERT or Drupal\Core\Database\Query\Merge::STATUS_UPDATE * * @throws Exception if the required indexes are not provided. + * * @TODO convert $data to a proper class and make sure that's what we get for this argument. */ protected function doCreateItem($data) { @@ -87,7 +105,7 @@ class PushQueue extends DatabaseQueue { public function claimItems($n, $lease_time = 30) { while (TRUE) { try { - $items = $this->connection->queryRange('SELECT name, entity_id, op, created, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, $n, array(':name' => $this->name))->fetchAllAssoc('entity_id'); + $items = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, $n, array(':name' => $this->name))->fetchAllAssoc('entity_id'); } catch (\Exception $e) { $this->catchException($e); @@ -120,6 +138,50 @@ class PushQueue extends DatabaseQueue { } } + /** + * {@inheritdoc} + */ + public function claimItem($lease_time = 30) { + // Claim an item by updating its expire fields. If claim is not successful + // another thread may have claimed the item in the meantime. Therefore loop + // until an item is successfully claimed or we are reasonably sure there + // are no unclaimed items left. + while (TRUE) { + try { + $item = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject(); + } + catch (\Exception $e) { + $this->catchException($e); + // If the table does not exist there are no items currently available to + // claim. + return FALSE; + } + if ($item) { + // Try to update the item. Only one thread can succeed in UPDATEing the + // same row. We cannot rely on REQUEST_TIME because items might be + // claimed by a single consumer which runs longer than 1 second. If we + // continue to use REQUEST_TIME instead of the current time(), we steal + // time from the lease, and will tend to reset items before the lease + // should really expire. + $update = $this->connection->update(static::TABLE_NAME) + ->fields(array( + 'expire' => time() + $lease_time, + )) + ->condition('item_id', $item->item_id) + ->condition('expire', 0); + // If there are affected rows, this update succeeded. + if ($update->execute()) { + return $item; + } + } + else { + // No items currently available to claim. + return FALSE; + } + } + } + + /** * Defines the schema for the queue table. */ @@ -153,6 +215,12 @@ class PushQueue extends DatabaseQueue { 'default' => '', 'description' => 'The operation which triggered this push', ], + 'failures' => [ + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Number of failed push attempts for this queue item.', + ], 'expire' => [ 'type' => 'int', 'not null' => TRUE, @@ -189,26 +257,80 @@ class PushQueue extends DatabaseQueue { */ public function processQueues() { $mappings = salesforce_push_load_push_mappings(); - + $i = 0; foreach ($mappings as $mapping) { - // @TODO: Implement a global limit for REST async. Limit per mapping doesn't make sense here since we're doing one entry at a time. $this->setName($mapping->id()); - // @TODO this is where we would be branching for SOAP vs REST async push. How to encapsulate this? Delegate to queue worker? - while ($item = $this->claimItem()) { + // @TODO: eventually this should work as follows: + // - New plugin type "PushQueueProcessor" + // -- Differs from QueueWorker plugin, because it can choose how to process an entire queue. + // -- Allows SoapQueueProcessor to optimize queries by processing multiple queue items at once. + // -- RestQueueProcessor will still do one-at-a-time. + // - Hand the mapping id (queue name) to the queue processor and let it do its thing + while (TRUE) { + if ($this->limit && $i++ > $this->limit) { + // Global limit is a hard stop. We're done processing now. + // @TODO some logging about how many items were processed, etc. + return; + } + + $item = $this->claimItem(); + if (!$item) { + // Ran out of items in this queue. Move on to the next one. + break; + } + try { $entity = \Drupal::entityTypeManager() ->getStorage($mapping->get('drupal_entity_type')) ->load($item->entity_id); + if (!$entity) { + throw new Exception(); + } + } + catch (Exception $e) { + // If there was an exception loading the entity, we assume that this queue item is no longer relevant. + \Drupal::logger('Salesforce Push')->notice($e->getMessage() . + ' Exception while loading entity %type %id for salesforce mapping %mapping. Queue item deleted.', + [ + '%type' => $mapping->get('drupal_entity_type'), + '%id' => $item->entity_id, + '%mapping' => $mapping->id(), + ] + ); + $item->delete(); + } - // @TODO this doesn't feel right. Where should this go? + try { salesforce_push_sync_rest($entity, $mapping, $item->op); + $this->deleteItem($item); + \Drupal::logger('Salesforce Push')->notice('Entity %type %id for salesforce mapping %mapping pushed successfully.', + [ + '%type' => $mapping->get('drupal_entity_type'), + '%id' => $item->entity_id, + '%mapping' => $mapping->id(), + ] + ); } catch (Exception $e) { - // @TODO on Exception, mapped object was unable to be created or updated, and operation was not undertaken. - // If mapped does not exist, and this is a delete operation, we can delete this queue item. - // Otherwise, return item to queue and increment failure count. - // After N failures, move to perma fail table. + $item->failure++; + \Drupal::logger('Salesforce Push')->notice($e->getMessage() . + ' Exception while pushing entity %type %id for salesforce mapping %mapping. Queue item %item failed %fail times.', + [ + '%type' => $mapping->get('drupal_entity_type'), + '%id' => $item->entity_id, + '%mapping' => $mapping->id(), + '%item' => $item->item_id, + '%fail' => $item->failure, + ] + ); + // doCreateItem() doubles as "save" function. + $item->doCreateItem(get_object_vars($item)); + $this->releaseItem($item); + // @TODO: push queue processor plugins will have to implement some error tolerance: + // - If mapped object does not exist, and this is a delete operation, we can delete this queue item. + // - Otherwise, return item to queue and increment failure count. + // - After N failures, move to perma fail table. } } } diff --git a/src/Rest/RestResponse.php b/src/Rest/RestResponse.php index 389d5c2fdfb210714b5a682637e6e8347e456379..e5594609c38d6b1c289a2221fbf884b3b4a3c596 100644 --- a/src/Rest/RestResponse.php +++ b/src/Rest/RestResponse.php @@ -46,7 +46,7 @@ class RestResponse extends Response { */ function __get($key) { if (!property_exists($this, $key)) { - throw new Exception("Undefined property $key"); + throw new \Exception("Undefined property $key"); } return $this->$key; }