diff --git a/modules/salesforce_mapping/config/schema/salesforce_mapping.schema.yml b/modules/salesforce_mapping/config/schema/salesforce_mapping.schema.yml index a03833c265e95137b07325e17ba7096dee86b726..414327afd1a1ea901b310cdf29dd5e28d76760b9 100644 --- a/modules/salesforce_mapping/config/schema/salesforce_mapping.schema.yml +++ b/modules/salesforce_mapping/config/schema/salesforce_mapping.schema.yml @@ -25,6 +25,12 @@ salesforce_mapping.salesforce_mapping.*: key: type: string label: 'Upsert Key' + async: + type: boolean + label: 'Push async' + async_limit: + type: integer + label: 'Async limit' pull_trigger_date: type: string label: 'Pull Trigger Date Field' diff --git a/modules/salesforce_mapping/src/Controller/MappedObjectController.php b/modules/salesforce_mapping/src/Controller/MappedObjectController.php index 105fa5d1faf5f89b691132bbe4cbbe70b053ee05..b0770f8e60b2787029af9bf2696b1be99eb49ca1 100644 --- a/modules/salesforce_mapping/src/Controller/MappedObjectController.php +++ b/modules/salesforce_mapping/src/Controller/MappedObjectController.php @@ -63,8 +63,8 @@ class MappedObjectController extends ControllerBase { } // If an existing mapping was not found, return a new stub instead. return new MappedObject([ - 'entity_id' => [LanguageInterface::LANGCODE_DEFAULT => $entity->id()], - 'entity_type_id' => [LanguageInterface::LANGCODE_DEFAULT => $entity->getEntityTypeId()], + 'entity_id' => $entity->id(), + 'entity_type_id' => $entity->getEntityTypeId(), ]); } diff --git a/modules/salesforce_mapping/src/Entity/SalesforceMapping.php b/modules/salesforce_mapping/src/Entity/SalesforceMapping.php index 9c052ef88bcd30bd578b23ccf7f5474ecba02bd7..f05d49a028917aa55fae6179c5e0d97acd5c3539 100644 --- a/modules/salesforce_mapping/src/Entity/SalesforceMapping.php +++ b/modules/salesforce_mapping/src/Entity/SalesforceMapping.php @@ -43,6 +43,8 @@ use Drupal\salesforce\Exception; * "status", * "type", * "key", + * "async", + * "async_limit", * "pull_trigger_date", * "sync_triggers", * "salesforce_object_type", @@ -102,6 +104,33 @@ class SalesforceMapping extends ConfigEntityBase implements SalesforceMappingInt */ protected $status = TRUE; + /** + * @TODO what does "locked" mean? + * + * @var bool + */ + protected $locked = FALSE; + + /** + * Whether to push asychronously (during dron) or immediately on entity CRUD. + * + * @var bool + */ + protected $async = FALSE; + + /** + * Max number of items for async push for this mapping + * + * @var int + */ + protected $async_limit = 200; + + /** + * The Salesforce field to use for determining whether or not to pull. + * + * @var string + */ + protected $pull_trigger_date = 'LastModifiedDate'; /** * The drupal entity type to which this mapping points. @@ -162,6 +191,15 @@ class SalesforceMapping extends ConfigEntityBase implements SalesforceMappingInt return $this->$key; } + /** + * Return the queue name for this mapping. + * + * @return string + */ + public function getQueue() { + return 'salesforce_push_' . $this->id(); + } + /** * Save the entity. * @@ -263,6 +301,10 @@ class SalesforceMapping extends ConfigEntityBase implements SalesforceMappingInt ); } + public function doesPush() { + return $this->checkTriggers([SALESFORCE_MAPPING_SYNC_DRUPAL_CREATE, SALESFORCE_MAPPING_SYNC_DRUPAL_UPDATE, SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE]); + } + /** * @return bool * TRUE if this mapping uses any of the given $triggers, otherwise FALSE. diff --git a/modules/salesforce_mapping/src/Entity/SalesforceMappingInterface.php b/modules/salesforce_mapping/src/Entity/SalesforceMappingInterface.php index 44441c1cd4f18bff7c56810ca3d8e7d4db6ae7b1..99aff0cdfcc481adb877c28cbbc1801b56074b09 100644 --- a/modules/salesforce_mapping/src/Entity/SalesforceMappingInterface.php +++ b/modules/salesforce_mapping/src/Entity/SalesforceMappingInterface.php @@ -8,4 +8,8 @@ namespace Drupal\salesforce_mapping\Entity; interface SalesforceMappingInterface { // Placeholder interface. // @TODO figure out what to abstract out of SalesforceMapping + + public function __construct(array $values = [], $entity_type); + + public function __get($key); } diff --git a/modules/salesforce_mapping/src/Form/SalesforceMappingFormCrudBase.php b/modules/salesforce_mapping/src/Form/SalesforceMappingFormCrudBase.php index 1b45ea1aafb5dc616e3897a2fa48fbdbb55a0d39..f86d18791a15cb022426f308a4679e8801f90f28 100644 --- a/modules/salesforce_mapping/src/Form/SalesforceMappingFormCrudBase.php +++ b/modules/salesforce_mapping/src/Form/SalesforceMappingFormCrudBase.php @@ -70,7 +70,7 @@ abstract class SalesforceMappingFormCrudBase extends SalesforceMappingFormBase { '#type' => 'select', '#description' => $this->t('Select a Drupal entity type to map to a Salesforce object.'), '#options' => $entity_types, - '#default_value' => $mapping->get('drupal_entity_type'), + '#default_value' => $mapping->drupal_entity_type, '#required' => TRUE, '#empty_option' => $this->t('- Select -'), ]; @@ -100,7 +100,7 @@ abstract class SalesforceMappingFormCrudBase extends SalesforceMappingFormBase { ]; foreach ($bundle_info as $key => $info) { $form['drupal_entity']['drupal_bundle'][$entity_type]['#options'][$key] = $info['label']; - if ($key == $mapping->get('drupal_bundle')) { + if ($key == $mapping->drupal_bundle) { $form['drupal_entity']['drupal_bundle'][$entity_type]['#default_value'] = $key; } } @@ -118,8 +118,8 @@ abstract class SalesforceMappingFormCrudBase extends SalesforceMappingFormBase { if (!empty($form_state->getValues()) && !empty($form_state->getValue('salesforce_object_type'))) { $salesforce_object_type = $form_state->getValue('salesforce_object_type'); } - elseif ($mapping->get('salesforce_object_type')) { - $salesforce_object_type = $mapping->get('salesforce_object_type'); + elseif ($mapping->salesforce_object_type) { + $salesforce_object_type = $mapping->salesforce_object_type; } $form['salesforce_object']['salesforce_object_type'] = [ '#title' => $this->t('Salesforce Object'), @@ -142,7 +142,7 @@ abstract class SalesforceMappingFormCrudBase extends SalesforceMappingFormBase { if ($salesforce_object_type) { // Check for custom record types. - $salesforce_record_type = $mapping->get('salesforce_record_type'); + $salesforce_record_type = $mapping->salesforce_record_type; $salesforce_record_type_options = $this->get_salesforce_record_type_options($salesforce_object_type, $form_state); if (count($salesforce_record_type_options) > 1) { // There are multiple record types for this object type, so the user @@ -172,10 +172,8 @@ abstract class SalesforceMappingFormCrudBase extends SalesforceMappingFormBase { '#type' => 'select', '#title' => t('Date field to trigger pull'), '#description' => t('Select a date field to base pull triggers on. (Default of "Last Modified Date" is usually appropriate).'), - '#required' => $this->entity->get('salesforce_object_type'), - '#default_value' => $this->entity->get('pull_trigger_date') - ? $this->entity->get('pull_trigger_date') - : 'LastModifiedDate', + '#required' => $mapping->salesforce_object_type, + '#default_value' => $mapping->pull_trigger_date, '#options' => $this->get_pull_trigger_options($salesforce_object_type), ]; @@ -183,7 +181,8 @@ abstract class SalesforceMappingFormCrudBase extends SalesforceMappingFormBase { $trigger_options = $this->get_sync_trigger_options(); $form['sync_triggers'] = [ '#title' => t('Action triggers'), - '#type' => 'container', + '#type' => 'details', + '#open' => TRUE, '#tree' => TRUE, '#description' => t('Select which actions on Drupal entities and Salesforce objects should trigger a synchronization. These settings are used by the @@ -195,17 +194,70 @@ abstract class SalesforceMappingFormCrudBase extends SalesforceMappingFormBase { $form['sync_triggers'][$option] = [ '#title' => $label, '#type' => 'checkbox', - '#default_value' => @$mapping->get('sync_triggers')[$option], + '#default_value' => !empty($mapping->sync_triggers[$option]), ]; } - // Hide all the hidden stuff in here. - foreach (['weight', 'status', 'locked', 'type'] as $el) { - $form[$el] = [ - '#type' => 'value', - '#value' => $mapping->get($el), - ]; - } + $form['queue'] = [ + '#title' => 'Queue Settings', + '#type' => 'details', + '#open' => TRUE, + '#tree' => FALSE, + ]; + + $form['queue']['async'] = [ + '#title' => t('Enable queue'), + '#type' => 'checkbox', + '#description' => t('When enabled, enqueue changes and push to Salesforce asynchronously during cron. When disabled, push changes immediately upon entity CRUD.'), + '#default_value' => $mapping->async, + ]; + + $form['queue']['async_limit'] = [ + '#title' => t('Push limit'), + '#type' => 'number', + '#min' => 1, + '#description' => t('Enter the maximum number of items to process for this mapping during cron.'), + '#default_value' => $mapping->async_limit, + '#states' => [ + 'visible' => [ + ':input#edit-async' => ['checked' => TRUE], + ], + ], + ]; + + $form['queue']['weight'] = [ + '#title' => t('Weight'), + '#type' => 'select', + '#options' => array_combine(range(-50,50), range(-50,50)), + '#description' => t('Not yet in use. During cron, mapping weight determines in which order items will be pushed. Lesser weight items will be pushed before greater weight items.'), + '#default_value' => $mapping->weight, + '#states' => [ + 'visible' => [ + ':input#edit-async' => ['checked' => TRUE], + ], + ], + ]; + + $form['meta'] = [ + '#type' => 'details', + '#open' => TRUE, + '#tree' => FALSE, + '#title' => t('Additional properties'), + ]; + + $form['meta']['status'] = [ + '#title' => t('Status'), + '#type' => 'checkbox', + '#description' => t('Not yet in use.'), + '#default_value' => $mapping->status, + ]; + + $form['meta']['locked'] = [ + '#title' => t('Locked'), + '#type' => 'checkbox', + '#description' => t('Not yet in use.'), + '#default_value' => $mapping->locked, + ]; return $form; } diff --git a/modules/salesforce_push/salesforce_push.module b/modules/salesforce_push/salesforce_push.module index 5305f530500f8d809f39a2eafecc1bcd1c5d000d..9ba8bee096d0e24da5c431e65207e08e95e9f612 100644 --- a/modules/salesforce_push/salesforce_push.module +++ b/modules/salesforce_push/salesforce_push.module @@ -34,7 +34,7 @@ function salesforce_push_entity_delete(EntityInterface $entity) { salesforce_push_entity_crud($entity, SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE); } -function salesforce_push_salesforce_push_entity_allowed(EntityInterface $entity, $op) { +function salesforce_push_salesforce_push_entity_allowed(EntityInterface $entity, SalesforceMappingInterface $mapping, $op) { // Don't allow mapped objects or mappings to be pushed! // @TODO can we implement this instead with a validation constraint? This is fugly. if ($entity instanceof MappedObjectInterface @@ -58,29 +58,22 @@ function salesforce_push_salesforce_push_entity_allowed(EntityInterface $entity, * to be a very long ways away. https://www.drupal.org/node/2551893 */ function salesforce_push_entity_crud(EntityInterface $entity, $op) { - - try { - $mappings = salesforce_mapping_load_by_drupal($entity->getEntityTypeId()); - } - catch (Exception $e) { - // No mappings found for this entity. We're done. - return; - } - + $mappings = salesforce_push_load_push_mappings($entity->getEntityTypeId()); foreach ($mappings as $mapping) { $mapped_objects = []; $mapped_object = FALSE; - if (!$mapping->checkTriggers([$op])) { - continue; - } // @TODO decide whether this hook is worth moving to Events framework, and how. Should subscribers throw an exception to prevent entity sync? Return false, like so? Something else entirely? - foreach (\Drupal::moduleHandler()->invokeAll('salesforce_push_entity_allowed', array($entity, $op, $mapping)) as $value) { + foreach (\Drupal::moduleHandler()->invokeAll('salesforce_push_entity_allowed', array($entity, $mapping, $op)) as $value) { if ($value === FALSE) { continue 2; } } - // @TODO batch vs. real-time logic goes here. + if ($mapping->async) { + // Enqueue + salesforce_push_enqueue_async($entity, $mapping, $op); + return; + } try { $mapped_object = salesforce_push_sync_rest($entity, $mapping, $op); @@ -91,6 +84,35 @@ function salesforce_push_entity_crud(EntityInterface $entity, $op) { } } +/** + * Helper function to load only those mappings which have at least one push + * trigger enabled. + * + * @param $entity_type_id + * (optional) filter by mapping drupal entity type. + * + * @return array + */ +function salesforce_push_load_push_mappings($entity_type_id = NULL) { + $push_mappings = []; + try { + $properties = empty($entity_type_id) + ? [] + : ["drupal_entity_type" => $entity_type_id]; + $mappings = salesforce_mapping_load_multiple($properties); + foreach ($mappings as $key => $mapping) { + if (!$mapping->doesPush()) { + continue; + } + $push_mappings[$key] = $mapping; + } + } + catch (Exception $e) { + // No mappings found. + } + return $push_mappings; +} + /** * Worker function to do actual push to Salesforce. * @@ -152,113 +174,28 @@ function salesforce_push_sync_rest(EntityInterface $entity, SalesforceMappingInt } } +/** + * Worker function to insert a new queue item into the async push queue for the + * given mapping. + * @param EntityInterface $entity + * @param SalesforceMappingInterface $mapping + * @param string $op + */ +function salesforce_push_enqueue_async(EntityInterface $entity, SalesforceMappingInterface $mapping, $op) { + // Each mapping has its own queue, so that like entries can be easily grouped + // for batching. Each queue item is a unique array of entity ids to be + // pushed. The async queue worker loads the queue item and works through as + // many entities as possible, up to the async limit for this mapping. + \Drupal::service('queue.salesforce')->createItem([ + 'name' => $mapping->id(), + 'entity_id' => $entity->id(), + 'op' => $op, + ]); +} + /** * Implements hook_cron(). */ function salesforce_push_cron() { - // @TODO haven't started work on this queue yet. - return; - - $sfapi = salesforce_get_api(); - if (!$sfapi->isAuthorized()) { - return; - } - - $maps = salesforce_mapping_load_multiple(); - $entity_ids = []; - // General approach to processing the queue: - // For each map, sorted by weight - // -- For each operation, delete, create, then update - // -- -- Build a list of sf params and send it - - $queue = \Drupal::queue('salesforce_push'); - // @TODO SOAP put limit is 200 per op. Why does this default to 50? - $limit = \Drupal::state()->get('salesforce.push_limit', 50); - $use_soap = \Drupal::moduleHandler()->moduleExists('salesforce_soap'); - for ($delta = 0; $delta < $limit; $delta++) { - $item = $queue->claimItem(); - // We do this after the "for()" so that when we reach the limit, we don't - // incidentally claim a queue license on an item we aren't going to process. - if (!$item) { - break; - } - $mapping = salesforce_mapping_load($item->data['mapping_id']); - - // Duplicate entity in the queue. - if ($item->data['entity_type'] == $entity_type && $item->data['entity_id'] == $entity_id) { - $queue->deleteItem($item); - continue; - } - - $entity_type = $item->data['entity_type']; - $entity_id = $item->data['entity_id']; - $entity = entity_load($entity_type, $entity_id); - $mapped_object = salesforce_mapped_object_load_by_entity($entity); - if ($use_soap) { - if ($item->data['trigger'] == SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE && $mapped_object) { - $delete_list[$delta] = $mapped_object->salesforce_id; - } - else { - $wrapper = entity_metadata_wrapper($item->data['entity_type'], $entity); - $params = salesforce_push_map_params($mapping, $wrapper, $key_field, $key_value); - - $synced_entities[$delta] = [ - 'entity_wrapper' => $wrapper, - 'mapped_objects' => $mapped_object, - ]; - - $sobject = new stdClass(); - $sobject->type = $mapping->salesforce_object_type; - foreach ($params as $key => $value) { - $sobject->fields[$key] = $value; - } - - if ($mapped_object && $mapped_object->salesforce_id) { - $sobject->Id = $mapped_object->salesforce_id; - $update_list[$delta] = $sobject; - } - else { - if ($key_field && $key_value) { - $upsert_list[$key_field][$delta] = $sobject; - } - else { - $create_list[$delta] = $sobject; - } - } - } - } - else { - salesforce_push_sync_rest($entity_type, $entity, $mapping, $item->data['trigger']); - } - - // Remove item from queue. - $queue->deleteItem($item); - } - - // Use soap API to batch process records. - if ($use_soap) { - module_load_include('inc', 'salesforce_soap'); - $soap = new SalesforceSoapPartner($sfapi); - if (!empty($delete_list)) { - $results = $soap->delete($delete_list); - salesforce_push_process_soap_results('Delete', $results, $synced_entities); - } - - if (!empty($create_list)) { - $results = $soap->create($create_list); - salesforce_push_process_soap_results('Create', $results, $synced_entities); - } - - if (!empty($update_list)) { - $results = $soap->update($update_list); - salesforce_push_process_soap_results('Update', $results, $synced_entities); - } - - if (!empty($upsert_list)) { - foreach ($upsert_list as $key => $upsert_item) { - $results = $soap->upsert($key, $upsert_item); - salesforce_push_process_soap_results('Upsert', $results, $synced_entities); - } - } - } + \Drupal::service('queue.salesforce')->processQueue(); } diff --git a/modules/salesforce_push/salesforce_push.services.yml b/modules/salesforce_push/salesforce_push.services.yml index 4135018023091e35f01700696a00ad17dd095a1e..4f717e369ad6bef33e87cd7421ceba32955d0f89 100644 --- a/modules/salesforce_push/salesforce_push.services.yml +++ b/modules/salesforce_push/salesforce_push.services.yml @@ -1,4 +1,4 @@ services: - plugin.manager.salesforce_push: - class: Drupal\salesforce_push\SalesforcePushPluginManager - arguments: ['@container.namespaces', '@cache.default', '@module_handler'] + queue.salesforce: + class: Drupal\salesforce_push\PushQueue + arguments: ['@database'] diff --git a/modules/salesforce_push/src/PushQueue.php b/modules/salesforce_push/src/PushQueue.php new file mode 100644 index 0000000000000000000000000000000000000000..18523226b03bb423b2e212e12f20c24204c862d0 --- /dev/null +++ b/modules/salesforce_push/src/PushQueue.php @@ -0,0 +1,217 @@ +<?php + +namespace Drupal\salesforce_push; + +use Drupal\Core\Queue\DatabaseQueue; +use Drupal\Core\Database\Connection; +use Drupal\Core\Database\SchemaObjectExistsException; +use Drupal\Core\DependencyInjection\DependencySerializationTrait; +use Drupal\Core\Database\Query\Merge; + +/** + * Salesforce push queue. + * + * @ingroup queue + */ +class PushQueue extends DatabaseQueue { + + /** + * The database table name. + */ + const TABLE_NAME = 'salesforce_push_queue'; + + /** + * Constructs a \Drupal\Core\Queue\DatabaseQueue object. + * + * @param \Drupal\Core\Database\Connection $connection + * The Connection object containing the key-value tables. + */ + function __construct(Connection $connection) { + $this->connection = $connection; + } + + public function setName($name) { + $this->name = $name; + } + + /** + * Adds a queue item and store it directly to the queue. + * + * @param array $data + * Data array with the following key-value pairs: + * * 'name': the name of the salesforce mapping for this entity + * * 'entity_id': the entity id being mapped / pushed + * * 'op': the operation which triggered this push. + * + * @return + * On success, Drupal\Core\Database\Query\Merge::STATUS_INSERT or Drupal\Core\Database\Query\Merge::STATUS_UPDATE + * + * @throws Exception if the required indexes are not provided. + * @TODO convert $data to a proper class and make sure that's what we get for this argument. + */ + protected function doCreateItem($data) { + if (empty($data['name']) + || empty($data['entity_id']) + || empty($data['op'])) { + throw new Exception('Salesforce push queue data values are required for "name", "entity_id" and "op"'); + } + $this->name = $data['name']; + $time = time(); + $query = $this->connection->merge(static::TABLE_NAME) + ->key(array('name' => $this->name, 'entity_id' => $data['entity_id'])) + ->fields(array( + 'name' => $this->name, + 'entity_id' => $data['entity_id'], + 'op' => $data['op'], + 'updated' => $time, + )); + + // Return Merge::STATUS_INSERT or Merge::STATUS_UPDATE + $ret = $query->execute(); + + // Drupal still doesn't support now() https://www.drupal.org/node/215821 + // 9 years. + if ($ret == Merge::STATUS_INSERT) { + $this->connection->merge(static::TABLE_NAME) + ->key(array('name' => $this->name, 'entity_id' => $data['entity_id'])) + ->fields(['created' => $time]) + ->execute(); + } + return $ret; + } + + /** + * Claim $n items from the current queue. + * @see DatabaseQueue::claimItem + */ + public function claimItems($n, $lease_time = 30) { + while (TRUE) { + try { + $items = $this->connection->queryRange('SELECT name, entity_id, op, created, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, $n, array(':name' => $this->name))->fetchAllAssoc('entity_id'); + } + catch (\Exception $e) { + $this->catchException($e); + // If the table does not exist there are no items currently available to + // claim. + return FALSE; + } + if ($items) { + // Try to update the item. Only one thread can succeed in UPDATEing the + // same row. We cannot rely on REQUEST_TIME because items might be + // claimed by a single consumer which runs longer than 1 second. If we + // continue to use REQUEST_TIME instead of the current time(), we steal + // time from the lease, and will tend to reset items before the lease + // should really expire. + $update = $this->connection->update(static::TABLE_NAME) + ->fields(array( + 'expire' => time() + $lease_time, + )) + ->condition('item_id', array_keys($items), 'IN') + ->condition('expire', 0); + // If there are affected rows, this update succeeded. + if ($update->execute()) { + return $items; + } + } + else { + // No items currently available to claim. + return FALSE; + } + } + } + + /** + * Defines the schema for the queue table. + */ + public function schemaDefinition() { + return [ + 'description' => 'Drupal entities to push to Salesforce.', + 'fields' => [ + 'item_id' => [ + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ], + 'name' => [ + 'type' => 'varchar_ascii', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The salesforce mapping id', + ], + 'entity_id' => [ + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The entity id', + ], + 'op' => [ + 'type' => 'varchar_ascii', + 'length' => 16, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The operation which triggered this push', + ], + 'expire' => [ + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the claim lease expires on the item.', + ], + 'created' => [ + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the item was created.', + ], + 'updated' => [ + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the item was created.', + ], + ], + 'primary key' => ['item_id'], + 'unique keys' => [ + 'name_entity_id' => ['name', 'entity_id'], + ], + 'indexes' => [ + 'entity_id' => ['entity_id'], + 'name_created' => ['name', 'created'], + 'expire' => ['expire'], + ], + ]; + } + + /** + * Process Salesforce queues + */ + public function processQueues() { + $mappings = salesforce_push_load_push_mappings(); + + foreach ($mappings as $mapping) { + // @TODO: Implement a global limit for REST async. Limit per mapping doesn't make sense here since we're doing one entry at a time. + $this->setName($mapping->id()); + + // @TODO this is where we would be branching for SOAP vs REST async push. How to encapsulate this? Delegate to queue worker? + while ($item = $this->claimItem()) { + try { + $entity = \Drupal::entityTypeManager() + ->getStorage($mapping->get('drupal_entity_type')) + ->load($item->entity_id); + + // @TODO this doesn't feel right. Where should this go? + salesforce_push_sync_rest($entity, $mapping, $item->op); + } + catch (Exception $e) { + // @TODO on Exception, mapped object was unable to be created or updated, and operation was not undertaken. + // If mapped does not exist, and this is a delete operation, we can delete this queue item. + // Otherwise, return item to queue and increment failure count. + // After N failures, move to perma fail table. + } + } + } + } + +} diff --git a/salesforce.api.php b/salesforce.api.php index 16d823e53a4793e40c110c0ebe3cdbd4ca21718a..66b577f362e1a66052c4a86c605fb23d841710ba 100644 --- a/salesforce.api.php +++ b/salesforce.api.php @@ -47,18 +47,22 @@ function hook_salesforce_push_params_alter(&$params, $mapping, $entity_wrapper) /** * Prevent push to SF for an entity. * - * @param string $entity_type + * @param EntityInterface $entity * The type of entity the push is for. - * @param object $entity - * The entity object the push is for. - * @param ing $sf_sync_trigger - * Constant for the Drupal operation that triggered the sync. + * @param SalesforceMappingInterface $mapping + * The mapping being used for this push. + * @param string $operation + * Constant for the Drupal operation that triggered the sync. + * One of: + * SALESFORCE_MAPPING_SYNC_DRUPAL_CREATE + * SALESFORCE_MAPPING_SYNC_DRUPAL_UPDATE + * SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE * * @return bool * FALSE if the entity should not be synced to Salesforce for the * $sf_sync_trigger operation. */ -function hook_salesforce_push_entity_allowed($entity_type, $entity, $sf_sync_trigger) { +function hook_salesforce_push_entity_allowed(EntityInterface $entity, SalesforceMappingInterface $mapping, $operation) { }