Newer
Older

Aaron Bauman
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
<?php
namespace Drupal\salesforce_pull\Controller;
use Drupal\Component\Datetime\Time;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Controller\ControllerBase;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\State\StateInterface;
use Drupal\salesforce\Event\SalesforceEvents;
use Drupal\salesforce\Event\SalesforceNoticeEvent;
use Drupal\salesforce\SFID;
use Drupal\salesforce_mapping\Entity\SalesforceMappingInterface;
use Drupal\salesforce_pull\DeleteHandler;
use Drupal\salesforce_pull\QueueHandler;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\HttpFoundation\RedirectResponse;
use Symfony\Component\HttpFoundation\RequestStack;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
/**
* Push controller.
*/
class PullController extends ControllerBase {
const DEFAULT_TIME_LIMIT = 30;
/**
* Pull queue handler service.
*
* @var \Drupal\salesforce_pull\QueueHandler
*/
protected $queueHandler;
/**
* Pull delete handler service.
*
* @var \Drupal\salesforce_pull\DeleteHandler
*/
protected $deleteHandler;
/**
* Mapping storage.
*
* @var \Drupal\Core\Entity\EntityStorageInterface
*/
protected $mappingStorage;
/**
* Queue factory service.
*
* @var \Drupal\Core\Queue\QueueFactory
*/
protected $queueService;
/**
* Queue worker manager.
*
* @var \Drupal\Core\Queue\QueueWorkerManagerInterface
*/
protected $queueWorkerManager;
/**
* Event dispatcher.
*
* @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
*/
protected $eventDispatcher;
/**
* Time.
*
* @var \Drupal\Component\Datetime\Time
*/
protected $time;
/**
* Current Request.
*
* @var \Symfony\Component\HttpFoundation\Request
*/
protected $request;
/**
* PushController constructor.
*
* @throws \Drupal\Component\Plugin\Exception\InvalidPluginDefinitionException
* @throws \Drupal\Component\Plugin\Exception\PluginNotFoundException
*/
public function __construct(QueueHandler $queueHandler, DeleteHandler $deleteHandler, EntityTypeManagerInterface $etm, ConfigFactoryInterface $configFactory, StateInterface $stateService, QueueFactory $queueService, QueueWorkerManagerInterface $queueWorkerManager, EventDispatcherInterface $eventDispatcher, Time $time, RequestStack $requestStack) {

Aaron Bauman
committed
$this->queueHandler = $queueHandler;
$this->deleteHandler = $deleteHandler;
$this->mappingStorage = $etm->getStorage('salesforce_mapping');
$this->configFactory = $configFactory;
$this->stateService = $stateService;

Aaron Bauman
committed
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
$this->queueService = $queueService;
$this->queueWorkerManager = $queueWorkerManager;
$this->eventDispatcher = $eventDispatcher;
$this->time = $time;
$this->request = $requestStack->getCurrentRequest();
}
/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container) {
return new static(
$container->get('salesforce_pull.queue_handler'),
$container->get('salesforce_pull.delete_handler'),
$container->get('entity_type.manager'),
$container->get('config.factory'),
$container->get('state'),
$container->get('queue'),
$container->get('plugin.manager.queue_worker'),
$container->get('event_dispatcher'),
$container->get('datetime.time'),
$container->get('request_stack')
);
}
/**
* Page callback to process push queue for a given mapping.
*/
public function endpoint(SalesforceMappingInterface $salesforce_mapping = NULL, $key = NULL, $id = NULL) {
// If standalone for this mapping is disabled, and global standalone is
// disabled, then "Access Denied" for this mapping.
if ($key != $this->stateService->get('system.cron_key')) {

Aaron Bauman
committed
throw new AccessDeniedHttpException();
}
$global_standalone = $this->config('salesforce.settings')->get('standalone');
if (!$salesforce_mapping && !$global_standalone) {
throw new AccessDeniedHttpException();
}
if ($salesforce_mapping && !$salesforce_mapping->doesPullStandalone() && !$global_standalone) {
throw new AccessDeniedHttpException();
}
if ($id) {
try {
$id = new SFID($id);
}
catch (\Exception $e) {
throw new AccessDeniedHttpException();
}
}
$this->populateQueue($salesforce_mapping, $id);
$this->processQueue();
if ($this->request->get('destination')) {
return new RedirectResponse($this->request->get('destination'));
}
return new Response('', 204);
}
/**
* Helper method to populate queue, optionally by mapping or a single record.
*/

Aaron Bauman
committed
protected function populateQueue(SalesforceMappingInterface $mapping = NULL, SFID $id = NULL) {
$mappings = [];
if ($id) {
return $this->queueHandler->getSingleUpdatedRecord($mapping, $id, TRUE);
}
if ($mapping != NULL) {
$mappings[] = $mapping;
}
else {

Edith F
committed
$mappings = $this->mappingStorage->loadByProperties(["pull_standalone" => TRUE]);

Aaron Bauman
committed
}
foreach ($mappings as $mapping) {
$this->queueHandler->getUpdatedRecordsForMapping($mapping);
}
}
/**
* Helper method to get queue processing time limit.
*/

Aaron Bauman
committed
protected function getTimeLimit() {
return self::DEFAULT_TIME_LIMIT;
}

Aaron Bauman
committed
protected function processQueue() {

Aaron Bauman
committed
$worker = $this->queueWorkerManager->createInstance(QueueHandler::PULL_QUEUE_NAME);
$end = time() + $this->getTimeLimit();
$queue = $this->queueService->get(QueueHandler::PULL_QUEUE_NAME);
$count = 0;
while ((!$this->getTimeLimit() || time() < $end) && ($item = $queue->claimItem())) {
try {
$this->eventDispatcher->dispatch(SalesforceEvents::NOTICE, new SalesforceNoticeEvent(NULL, 'Processing item @id from @name queue.', ['@name' => QueueHandler::PULL_QUEUE_NAME, '@id' => $item->item_id]));
$worker->processItem($item->data);
$queue->deleteItem($item);
$count++;
}
catch (RequeueException $e) {
// The worker requested the task to be immediately requeued.
$queue->releaseItem($item);
}
catch (SuspendQueueException $e) {
// If the worker indicates there is a problem with the whole queue,
// release the item.
$queue->releaseItem($item);
throw new \Exception($e->getMessage());
}
}
$elapsed = microtime(TRUE) - $start;
$this->eventDispatcher->dispatch(SalesforceEvents::NOTICE, new SalesforceNoticeEvent(NULL, 'Processed @count items from the @name queue in @elapsed sec.', [
'@count' => $count,
'@name' => QueueHandler::PULL_QUEUE_NAME,
'@elapsed' => round($elapsed, 2),
]));

Aaron Bauman
committed
}