Skip to content
Snippets Groups Projects
Commit 3fcee3e2 authored by Aaron Bauman's avatar Aaron Bauman
Browse files

- push cron via REST is basically working now

parent 23b84d91
No related branches found
No related tags found
No related merge requests found
......@@ -100,9 +100,9 @@ function salesforce_mapping_load($name) {
$mapping = \Drupal::entityTypeManager()
->getStorage('salesforce_mapping')
->load($name);
//if (empty($mapping)) {
// throw new Exception("No mapping found for $name.");
//}
if (empty($mapping)) {
throw new Exception("No mapping found for $name.");
}
return $mapping;
}
......
<?php
/**
* @file
* Install/uninstall tasks for the Salesforce Push module.
*/
use Drupal\salesforce_push\PushQueue;
/**
* Implements hook_install().
*/
function salesforce_push_install() {
\Drupal::state()->set('salesforce.push_limit', 50);
\Drupal::state()->set('salesforce.push_limit', PushQueue::DEFAULT_CRON_PUSH_LIMIT);
}
/**
......
......@@ -197,5 +197,5 @@ function salesforce_push_enqueue_async(EntityInterface $entity, SalesforceMappin
* Implements hook_cron().
*/
function salesforce_push_cron() {
\Drupal::service('queue.salesforce')->processQueue();
\Drupal::service('queue.salesforce')->processQueues();
}
services:
queue.salesforce:
class: Drupal\salesforce_push\PushQueue
arguments: ['@database']
arguments: ['@database', '@state']
......@@ -2,11 +2,12 @@
namespace Drupal\salesforce_push;
use Drupal\Core\Queue\DatabaseQueue;
use Drupal\Core\Database\Connection;
use Drupal\Core\Database\SchemaObjectExistsException;
use Drupal\Core\DependencyInjection\DependencySerializationTrait;
use Drupal\Core\Database\Query\Merge;
use Drupal\Core\Queue\DatabaseQueue;
use Drupal\Core\State\State;
/**
* Salesforce push queue.
......@@ -20,18 +21,34 @@ class PushQueue extends DatabaseQueue {
*/
const TABLE_NAME = 'salesforce_push_queue';
const DEFAULT_CRON_PUSH_LIMIT = 200;
protected $limit;
protected $connection;
protected $state;
/**
* Constructs a \Drupal\Core\Queue\DatabaseQueue object.
*
* @param \Drupal\Core\Database\Connection $connection
* The Connection object containing the key-value tables.
*/
function __construct(Connection $connection) {
public function __construct(Connection $connection, State $state) {
$this->connection = $connection;
$this->state = $state;
$this->limit = $state->get('salesforce.push_limit', self::DEFAULT_CRON_PUSH_LIMIT);
}
/**
* Parent class DatabaseQueue relies heavily on $this->name, so it's best to
* just set the value appropriately.
*
* @param string $name
*
* @return $this
*/
public function setName($name) {
$this->name = $name;
return $this;
}
/**
......@@ -47,6 +64,7 @@ class PushQueue extends DatabaseQueue {
* On success, Drupal\Core\Database\Query\Merge::STATUS_INSERT or Drupal\Core\Database\Query\Merge::STATUS_UPDATE
*
* @throws Exception if the required indexes are not provided.
*
* @TODO convert $data to a proper class and make sure that's what we get for this argument.
*/
protected function doCreateItem($data) {
......@@ -87,7 +105,7 @@ class PushQueue extends DatabaseQueue {
public function claimItems($n, $lease_time = 30) {
while (TRUE) {
try {
$items = $this->connection->queryRange('SELECT name, entity_id, op, created, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, $n, array(':name' => $this->name))->fetchAllAssoc('entity_id');
$items = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, $n, array(':name' => $this->name))->fetchAllAssoc('entity_id');
}
catch (\Exception $e) {
$this->catchException($e);
......@@ -120,6 +138,50 @@ class PushQueue extends DatabaseQueue {
}
}
/**
* {@inheritdoc}
*/
public function claimItem($lease_time = 30) {
// Claim an item by updating its expire fields. If claim is not successful
// another thread may have claimed the item in the meantime. Therefore loop
// until an item is successfully claimed or we are reasonably sure there
// are no unclaimed items left.
while (TRUE) {
try {
$item = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
}
catch (\Exception $e) {
$this->catchException($e);
// If the table does not exist there are no items currently available to
// claim.
return FALSE;
}
if ($item) {
// Try to update the item. Only one thread can succeed in UPDATEing the
// same row. We cannot rely on REQUEST_TIME because items might be
// claimed by a single consumer which runs longer than 1 second. If we
// continue to use REQUEST_TIME instead of the current time(), we steal
// time from the lease, and will tend to reset items before the lease
// should really expire.
$update = $this->connection->update(static::TABLE_NAME)
->fields(array(
'expire' => time() + $lease_time,
))
->condition('item_id', $item->item_id)
->condition('expire', 0);
// If there are affected rows, this update succeeded.
if ($update->execute()) {
return $item;
}
}
else {
// No items currently available to claim.
return FALSE;
}
}
}
/**
* Defines the schema for the queue table.
*/
......@@ -153,6 +215,12 @@ class PushQueue extends DatabaseQueue {
'default' => '',
'description' => 'The operation which triggered this push',
],
'failures' => [
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Number of failed push attempts for this queue item.',
],
'expire' => [
'type' => 'int',
'not null' => TRUE,
......@@ -189,26 +257,80 @@ class PushQueue extends DatabaseQueue {
*/
public function processQueues() {
$mappings = salesforce_push_load_push_mappings();
$i = 0;
foreach ($mappings as $mapping) {
// @TODO: Implement a global limit for REST async. Limit per mapping doesn't make sense here since we're doing one entry at a time.
$this->setName($mapping->id());
// @TODO this is where we would be branching for SOAP vs REST async push. How to encapsulate this? Delegate to queue worker?
while ($item = $this->claimItem()) {
// @TODO: eventually this should work as follows:
// - New plugin type "PushQueueProcessor"
// -- Differs from QueueWorker plugin, because it can choose how to process an entire queue.
// -- Allows SoapQueueProcessor to optimize queries by processing multiple queue items at once.
// -- RestQueueProcessor will still do one-at-a-time.
// - Hand the mapping id (queue name) to the queue processor and let it do its thing
while (TRUE) {
if ($this->limit && $i++ > $this->limit) {
// Global limit is a hard stop. We're done processing now.
// @TODO some logging about how many items were processed, etc.
return;
}
$item = $this->claimItem();
if (!$item) {
// Ran out of items in this queue. Move on to the next one.
break;
}
try {
$entity = \Drupal::entityTypeManager()
->getStorage($mapping->get('drupal_entity_type'))
->load($item->entity_id);
if (!$entity) {
throw new Exception();
}
}
catch (Exception $e) {
// If there was an exception loading the entity, we assume that this queue item is no longer relevant.
\Drupal::logger('Salesforce Push')->notice($e->getMessage() .
' Exception while loading entity %type %id for salesforce mapping %mapping. Queue item deleted.',
[
'%type' => $mapping->get('drupal_entity_type'),
'%id' => $item->entity_id,
'%mapping' => $mapping->id(),
]
);
$item->delete();
}
// @TODO this doesn't feel right. Where should this go?
try {
salesforce_push_sync_rest($entity, $mapping, $item->op);
$this->deleteItem($item);
\Drupal::logger('Salesforce Push')->notice('Entity %type %id for salesforce mapping %mapping pushed successfully.',
[
'%type' => $mapping->get('drupal_entity_type'),
'%id' => $item->entity_id,
'%mapping' => $mapping->id(),
]
);
}
catch (Exception $e) {
// @TODO on Exception, mapped object was unable to be created or updated, and operation was not undertaken.
// If mapped does not exist, and this is a delete operation, we can delete this queue item.
// Otherwise, return item to queue and increment failure count.
// After N failures, move to perma fail table.
$item->failure++;
\Drupal::logger('Salesforce Push')->notice($e->getMessage() .
' Exception while pushing entity %type %id for salesforce mapping %mapping. Queue item %item failed %fail times.',
[
'%type' => $mapping->get('drupal_entity_type'),
'%id' => $item->entity_id,
'%mapping' => $mapping->id(),
'%item' => $item->item_id,
'%fail' => $item->failure,
]
);
// doCreateItem() doubles as "save" function.
$item->doCreateItem(get_object_vars($item));
$this->releaseItem($item);
// @TODO: push queue processor plugins will have to implement some error tolerance:
// - If mapped object does not exist, and this is a delete operation, we can delete this queue item.
// - Otherwise, return item to queue and increment failure count.
// - After N failures, move to perma fail table.
}
}
}
......
......@@ -46,7 +46,7 @@ class RestResponse extends Response {
*/
function __get($key) {
if (!property_exists($this, $key)) {
throw new Exception("Undefined property $key");
throw new \Exception("Undefined property $key");
}
return $this->$key;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment