Skip to content
Snippets Groups Projects
Commit 281bd09c authored by Lidia Matei's avatar Lidia Matei Committed by Yas Naoi
Browse files

Issue #3247232 by XLD, yas, Xiaohua Guan: Add CloudClusterWorkerService

parent 6055515e
No related branches found
No related tags found
No related merge requests found
......@@ -32,172 +32,8 @@ function cloud_cluster_worker_help($route_name, RouteMatchInterface $route_match
* Implements hook_cron().
*/
function cloud_cluster_worker_cron(): void {
cloud_cluster_worker_sync_resources();
cloud_cluster_worker_receive_operations();
}
/**
* Synchronize cloud entity resources.
*/
function cloud_cluster_worker_sync_resources(): void {
$logger = \Drupal::logger('cloud_cluster_worker');
$logger->info(t('Starting execution of cloud_cluster_worker_sync_resources()'));
if (!cloud_cluster_worker_is_configured()) {
$logger->warning(t('The cloud cluster worker is not configured.'));
return;
}
$entity_type_definitions = \Drupal::entityTypeManager()->getDefinitions();
$resources_queue = \Drupal::service('queue')->get('cloud_cluster_worker_sync_resources_queue');
$serializer = \Drupal::service('serializer');
$manager = \Drupal::service('plugin.manager.cloud_config_plugin');
$definitions = $manager->getDefinitions();
foreach ($definitions ?: [] as $key => $definition) {
// Ignore definitions like "aws_cloud.cloud_config:1" whose base_plugin
// is undefined. And because the value of $definition['base_plugin'] is
// empty string, the "empty" function cannot be used in the condition.
if (!isset($definition['base_plugin'])) {
continue;
}
$configs = $manager->createInstance($key)->loadConfigEntities();
$cloud_name = substr($key, 0, strlen($key) - strlen('.cloud_config'));
// Skip cloud cluster.
if ($cloud_name === 'cloud_cluster') {
continue;
}
foreach ($configs ?: [] as $config) {
if ($config->isRemote()) {
continue;
}
$resources_queue->createItem([
'cloud_context' => $config->getCloudContext(),
'resource_jsons' => [$serializer->serialize($config, 'json')],
'entity_type' => 'cloud_config',
]);
// Loop for entity types, such as aws_cloud_instance, aws_cloud_image.
foreach ($entity_type_definitions ?: [] as $entity_type_key => $entity_type_definition) {
if (strpos($entity_type_key, $cloud_name) !== 0) {
continue;
}
$resources = \Drupal::entityTypeManager()
->getStorage($entity_type_key)
->loadByProperties([
'cloud_context' => $config->getCloudContext(),
]);
$resource_jsons = [];
foreach ($resources ?: [] as $resource) {
$resource_jsons[] = $serializer->serialize($resource, 'json');
}
$resources_queue->createItem([
'cloud_context' => $config->getCloudContext(),
'resource_jsons' => $resource_jsons,
'entity_type' => $entity_type_key,
]);
}
}
}
}
/**
* Receive operations from the master site.
*/
function cloud_cluster_worker_receive_operations(): void {
$logger = \Drupal::logger('cloud_cluster_worker');
$logger->info(t('Starting execution of cloud_cluster_worker_receive_operations()'));
if (!cloud_cluster_worker_is_configured()) {
$logger->warning(t('The cloud cluster worker is not configured.'));
return;
}
$config = \Drupal::service('config.factory')->get('cloud_cluster_worker.settings');
$logger = \Drupal::logger('cloud_cluster_worker');
try {
$headers = [
'Authorization' => 'Bearer ' . base64_encode($config->get('cloud_cluster_worker_token')),
'Content-Type' => 'application/json',
];
$endpoint_url = $config->get('cloud_cluster_worker_api_endpoint');
// Remove the '/' if the endpoint url ends with '/'.
if (substr($endpoint_url, -1) === '/') {
$endpoint_url = substr($endpoint_url, 0, strlen($endpoint_url) - 1);
}
$cloud_cluster_worker_name = $config->get('cloud_cluster_worker_id');
$response = \Drupal::service('http_client')->post(
$endpoint_url . '/cloud_cluster/operation',
[
'body' => json_encode([
'cloud_cluster_worker_name' => $cloud_cluster_worker_name,
]),
'headers' => $headers,
]
);
if ($response->getStatusCode() !== 200) {
$logger->error(!empty($output['error']) ? $output['error'] : t('Unknown error.'));
return;
}
$output = json_decode($response->getBody()->getContents(), TRUE);
$logger->debug(json_encode($output));
foreach ($output ?: [] as $item) {
$cloud_context = $item['cloud_context'];
if (($pos = strrpos($cloud_context, '_')) !== FALSE) {
$cloud_context = substr($cloud_context, 0, $pos);
}
// Run operation.
$service_name = $item['service_name'];
$service = \Drupal::service($service_name);
$service->setCloudContext($cloud_context);
$method_name = lcfirst($item['operation']);
if (!method_exists($service, $method_name)) {
$logger->error(t('The method @method_name does not exist.', [
'@method_name' => $method_name,
]));
continue;
}
call_user_func_array(
[$service, $method_name],
$item['params']
);
if (empty($item['update_entities_method'])) {
continue;
}
$update_entities_method = $item['update_entities_method'];
if (!method_exists($service, $update_entities_method)) {
continue;
}
// Update entities.
call_user_func_array(
[$service, $update_entities_method],
$item['update_entities_method_params']
);
}
}
catch (\Exception $error) {
$logger->error($error->getMessage());
}
\Drupal::service('cloud_cluster_worker')->syncResources();
\Drupal::service('cloud_cluster_worker')->receiveOperations();
}
/**
......@@ -231,7 +67,7 @@ function cloud_cluster_worker_entity_presave(EntityInterface $entity) {
return;
}
\Drupal::service('cloud_cluster_worker.sync_resources')->send(
\Drupal::service('cloud_cluster_worker')->send(
$entity->getCloudContext(),
$entity_type_id,
[\Drupal::service('serializer')->serialize($entity, 'json')],
......
services:
cloud_cluster_worker.sync_resources:
class: Drupal\cloud_cluster_worker\Service\SyncResourcesService
arguments: ['@logger.factory', '@config.factory', '@http_client']
cloud_cluster_worker:
class: Drupal\cloud_cluster_worker\Service\CloudClusterWorkerService
arguments: ['@entity_type.manager', '@config.factory', '@http_client', '@queue', '@serializer', '@plugin.manager.cloud_config_plugin', '@logger.factory']
......@@ -2,7 +2,7 @@
namespace Drupal\cloud_cluster_worker\Plugin\QueueWorker;
use Drupal\cloud_cluster_worker\Service\SyncResourcesService;
use Drupal\cloud_cluster_worker\Service\CloudClusterWorkerServiceInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Symfony\Component\DependencyInjection\ContainerInterface;
......@@ -21,9 +21,9 @@ class CloudClusterWorkerSyncResourcesQueueWorker extends QueueWorkerBase impleme
/**
* The resources synchronization service.
*
* @var \Drupal\cloud_cluster_worker\Service\SyncResourcesService
* @var \Drupal\cloud_cluster_worker\Service\CloudClusterWorkerService
*/
protected $syncResources;
protected $clusterWorker;
/**
* Constructs a new LocaleTranslation object.
......@@ -34,18 +34,18 @@ class CloudClusterWorkerSyncResourcesQueueWorker extends QueueWorkerBase impleme
* The plugin_id for the plugin instance.
* @param array $plugin_definition
* The plugin implementation definition.
* @param \Drupal\cloud_cluster_worker\Service\SyncResourcesService $sync_resources
* @param \Drupal\cloud_cluster_worker\Service\CloudClusterWorkerServiceInterface $cluster_worker
* The resources synchronization service.
*/
public function __construct(
array $configuration,
$plugin_id,
array $plugin_definition,
SyncResourcesService $sync_resources
CloudClusterWorkerServiceInterface $cluster_worker
) {
parent::__construct($configuration, $plugin_id, $plugin_definition);
$this->syncResources = $sync_resources;
$this->clusterWorker = $cluster_worker;
}
/**
......@@ -56,7 +56,7 @@ class CloudClusterWorkerSyncResourcesQueueWorker extends QueueWorkerBase impleme
$configuration,
$plugin_id,
$plugin_definition,
$container->get('cloud_cluster_worker.sync_resources')
$container->get('cloud_cluster_worker')
);
}
......@@ -64,7 +64,7 @@ class CloudClusterWorkerSyncResourcesQueueWorker extends QueueWorkerBase impleme
* {@inheritdoc}
*/
public function processItem($data): void {
$this->syncResources->send($data['cloud_context'], $data['entity_type'], $data['resource_jsons']);
$this->clusterWorker->sendSyncRequest($data['cloud_context'], $data['entity_type'], $data['resource_jsons']);
}
}
<?php
namespace Drupal\cloud_cluster_worker\Service;
use Drupal\cloud\Plugin\cloud\config\CloudConfigPluginManagerInterface;
use Drupal\cloud\Service\CloudServiceBase;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Queue\QueueFactory;
use GuzzleHttp\ClientInterface;
use Symfony\Component\Serializer\Serializer;
/**
* Provides the service for the cloud_cluster_worker.
*/
class CloudClusterWorkerService extends CloudServiceBase implements CloudClusterWorkerServiceInterface {
/**
* Drupal\Core\Entity\EntityTypeManagerInterface definition.
*
* @var \Drupal\Core\Entity\EntityTypeManagerInterface
*/
protected $entityTypeManager;
/**
* The config factory.
*
* Subclasses should use the self::config() method, which may be overridden to
* address specific needs when loading config, rather than this property
* directly. See \Drupal\Core\Form\ConfigFormBase::config() for an example of
* this.
*
* @var \Drupal\Core\Config\ConfigFactoryInterface
*/
protected $configFactory;
/**
* Guzzle client.
*
* @var \GuzzleHttp\Client
*/
protected $httpClient;
/**
* The queue factory.
*
* @var \Drupal\Core\Queue\QueueFactory
*/
protected $queueFactory;
/**
* The serializer object.
*
* @var \Symfony\Component\Serializer\Serializer
*/
protected $serializer;
/**
* The cloud service provider plugin manager (CloudConfigPluginManager).
*
* @var \Drupal\cloud\Plugin\cloud\config\CloudConfigPluginManagerInterface
*/
protected $cloudConfigPluginManager;
/**
* A logger instance.
*
* @var \Psr\Log\LoggerInterface
*/
protected $logger;
/**
* Constructs a new CloudClusterWorkerService object.
*
* @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
* An entity type manager instance.
* @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
* A configuration factory.
* @param \GuzzleHttp\ClientInterface $http_client
* The Guzzle Http client.
* @param \Drupal\Core\Queue\QueueFactory $queue_factory
* The queue factory.
* @param \Symfony\Component\Serializer\Serializer $serializer
* The serializer object.
* @param \Drupal\cloud\Plugin\cloud\config\CloudConfigPluginManagerInterface $cloud_config_plugin_manager
* The cloud service provider plugin manager (CloudConfigPluginManager).
* @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
* A logger factory instance.
*/
public function __construct(
EntityTypeManagerInterface $entity_type_manager,
ConfigFactoryInterface $config_factory,
ClientInterface $http_client,
QueueFactory $queue_factory,
Serializer $serializer,
CloudConfigPluginManagerInterface $cloud_config_plugin_manager,
LoggerChannelFactoryInterface $logger_factory
) {
// The parent constructor takes care of $this->messenger object.
parent::__construct();
// Set up the entity type manager for querying entities.
$this->entityTypeManager = $entity_type_manager;
$this->configFactory = $config_factory;
$this->httpClient = $http_client;
$this->queueFactory = $queue_factory;
$this->serializer = $serializer;
$this->cloudConfigPluginManager = $cloud_config_plugin_manager;
$this->logger = $logger_factory->get('cloud_cluster_worker');
}
/**
* {@inheritdoc}
*/
public function receiveOperations(): void {
$this->logger->info($this->t('Starting execution of CloudClusterWorkerService->receiveOperations()'));
if (!cloud_cluster_worker_is_configured()) {
$this->logger->warning($this->t('The cloud cluster worker is not configured.'));
return;
}
$config = $this->configFactory->get('cloud_cluster_worker.settings');
try {
$headers = [
'Authorization' => 'Bearer ' . base64_encode($config->get('cloud_cluster_worker_token')),
'Content-Type' => 'application/json',
];
$endpoint_url = $config->get('cloud_cluster_worker_api_endpoint');
// Remove the '/' if the endpoint url ends with '/'.
if (substr($endpoint_url, -1) === '/') {
$endpoint_url = substr($endpoint_url, 0, -1);
}
$cloud_cluster_worker_name = $config->get('cloud_cluster_worker_id');
$response = $this->httpClient->post(
$endpoint_url . '/cloud_cluster/operation',
[
'body' => json_encode([
'cloud_cluster_worker_name' => $cloud_cluster_worker_name,
], JSON_THROW_ON_ERROR),
'headers' => $headers,
]
);
if ($response->getStatusCode() !== 200) {
$this->logger->error(!empty($output['error']) ? $output['error'] : $this->t('Unknown error.'));
return;
}
$output = json_decode($response->getBody()
->getContents(), TRUE, 512, JSON_THROW_ON_ERROR);
foreach ($output ?: [] as $item) {
$cloud_context = $item['cloud_context'];
if (($pos = strrpos($cloud_context, '_')) !== FALSE) {
$cloud_context = substr($cloud_context, 0, $pos);
}
// Run operation.
$service_name = $item['service_name'];
$service = \Drupal::service($service_name);
$service->setCloudContext($cloud_context);
$method_name = lcfirst($item['operation']);
if (!method_exists($service, $method_name)) {
$this->logger->error($this->t('The method @method_name does not exist.', [
'@method_name' => $method_name,
]));
continue;
}
call_user_func_array(
[$service, $method_name],
$item['params']
);
if (empty($item['update_entities_method'])) {
continue;
}
$update_entities_method = $item['update_entities_method'];
if (!method_exists($service, $update_entities_method)) {
continue;
}
// Update entities.
call_user_func_array(
[$service, $update_entities_method],
$item['update_entities_method_params']
);
}
}
catch (\Exception $e) {
$this->logger->error($e->getMessage());
}
}
/**
* {@inheritdoc}
*/
public function syncResources(): void {
$this->logger->info($this->t('Starting execution of CloudClusterWorkerService->syncResources().'));
if (!cloud_cluster_worker_is_configured()) {
$this->logger->warning($this->t('The cloud cluster worker is not configured.'));
return;
}
$entity_type_definitions = $this->entityTypeManager->getDefinitions();
$resources_queue = $this->queueFactory->get('cloud_cluster_worker_sync_resources_queue');
$definitions = $this->cloudConfigPluginManager->getDefinitions();
foreach ($definitions ?: [] as $key => $definition) {
// Ignore definitions like "aws_cloud.cloud_config:1" whose base_plugin
// is undefined. And because the value of $definition['base_plugin'] is
// empty string, the "empty" function cannot be used in the condition.
if (!isset($definition['base_plugin'])) {
continue;
}
$configs = $this->cloudConfigPluginManager->createInstance($key)->loadConfigEntities();
$cloud_name = substr($key, 0, strlen($key) - strlen('.cloud_config'));
// Skip cloud cluster.
if ($cloud_name === 'cloud_cluster') {
continue;
}
foreach ($configs ?: [] as $config) {
if ($config->isRemote()) {
continue;
}
$resources_queue->createItem([
'cloud_context' => $config->getCloudContext(),
'resource_jsons' => [$this->serializer->serialize($config, 'json')],
'entity_type' => 'cloud_config',
]);
// Loop for entity types, such as aws_cloud_instance, aws_cloud_image.
foreach ($entity_type_definitions ?: [] as $entity_type_key => $entity_type_definition) {
if (strpos($entity_type_key, $cloud_name) !== 0) {
continue;
}
$resources = $this->entityTypeManager
->getStorage($entity_type_key)
->loadByProperties([
'cloud_context' => $config->getCloudContext(),
]);
$resource_jsons = [];
foreach ($resources ?: [] as $resource) {
$resource_jsons[] = $this->serializer->serialize($resource, 'json');
}
$resources_queue->createItem([
'cloud_context' => $config->getCloudContext(),
'resource_jsons' => $resource_jsons,
'entity_type' => $entity_type_key,
]);
}
}
}
}
/**
* {@inheritdoc}
*/
public function sendSyncRequest($cloud_context, $entity_type, array $resource_jsons, $clear = TRUE): void {
$config = $this->configFactory
->get('cloud_cluster_worker.settings');
try {
$headers = [
'Authorization' => 'Bearer ' . base64_encode($config->get('cloud_cluster_worker_token')),
'Content-Type' => 'application/json',
];
$endpoint_url = $config->get('cloud_cluster_worker_api_endpoint');
if (empty($endpoint_url)) {
return;
}
// Remove the '/' if the endpoint url ends with '/'.
if (substr($endpoint_url, -1) === '/') {
$endpoint_url = substr($endpoint_url, 0, -1);
}
$cloud_cluster_worker_name = $config->get('cloud_cluster_worker_id');
$response = $this->httpClient->post(
$endpoint_url . self::API_PATH,
[
'body' => json_encode([
'cloud_cluster_worker_name' => $cloud_cluster_worker_name,
'cloud_context' => $cloud_context . '_' . substr($cloud_cluster_worker_name, -self::CLOUD_CONTEXT_UUID_LENGTH),
'entity_type' => $entity_type,
'resource_jsons' => $resource_jsons,
'clear' => $clear,
], JSON_THROW_ON_ERROR),
'headers' => $headers,
]
);
$output = json_decode($response->getBody()
->getContents(), TRUE, 512, JSON_THROW_ON_ERROR);
if ($response->getStatusCode() !== 200) {
$this->logger->error($output['error'] ?? $this->t('Unknown error.'));
}
}
catch (\Exception $error) {
$this->logger->error($error->getMessage());
}
}
}
......@@ -3,14 +3,24 @@
namespace Drupal\cloud_cluster_worker\Service;
/**
* Provides resources synchronization service interface.
* Interface for the CloudClusterWorkerService.
*/
interface SyncResourcesServiceInterface {
interface CloudClusterWorkerServiceInterface {
const API_PATH = '/cloud_cluster/sync';
const CLOUD_CONTEXT_UUID_LENGTH = 12;
/**
* Receive operations from the master site.
*/
public function receiveOperations(): void;
/**
* Synchronize entity resources.
*/
public function syncResources(): void;
/**
* Send synchronization request.
*
......@@ -23,6 +33,6 @@ interface SyncResourcesServiceInterface {
* @param bool $clear
* Whether clear other resources not included in $resource_jsons or not.
*/
public function send($cloud_context, $entity_type, array $resource_jsons, $clear = TRUE): void;
public function sendSyncRequest($cloud_context, $entity_type, array $resource_jsons, $clear = TRUE): void;
}
<?php
namespace Drupal\cloud_cluster_worker\Service;
use Drupal\cloud\Service\CloudServiceBase;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use GuzzleHttp\ClientInterface;
/**
* Provides resources synchronization service.
*/
class SyncResourcesService extends CloudServiceBase implements SyncResourcesServiceInterface {
/**
* A logger instance.
*
* @var \Psr\Log\LoggerInterface
*/
protected $logger;
/**
* Stores the configuration factory.
*
* @var \Drupal\Core\Config\ConfigFactoryInterface
*/
protected $configFactory;
/**
* Guzzle Http Client.
*
* @var \GuzzleHttp\ClientInterface
*/
protected $httpClient;
/**
* Constructs a new SyncResourcesService object.
*
* @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
* A logger factory instance.
* @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
* A configuration factory.
* @param \GuzzleHttp\ClientInterface $http_client
* The Guzzle Http client.
*/
public function __construct(
LoggerChannelFactoryInterface $logger_factory,
ConfigFactoryInterface $config_factory,
ClientInterface $http_client
) {
// The parent constructor takes care of $this->messenger object.
parent::__construct();
$this->logger = $logger_factory->get('cloud_cluster_worker');
$this->configFactory = $config_factory;
$this->httpClient = $http_client;
}
/**
* {@inheritdoc}
*/
public function send($cloud_context, $entity_type, array $resource_jsons, $clear = TRUE): void {
$config = $this->configFactory
->get('cloud_cluster_worker.settings');
try {
$headers = [
'Authorization' => 'Bearer ' . base64_encode($config->get('cloud_cluster_worker_token')),
'Content-Type' => 'application/json',
];
$endpoint_url = $config->get('cloud_cluster_worker_api_endpoint');
if (empty($endpoint_url)) {
return;
}
// Remove the '/' if the endpoint url ends with '/'.
if (substr($endpoint_url, -1) === '/') {
$endpoint_url = substr($endpoint_url, 0, -1);
}
$cloud_cluster_worker_name = $config->get('cloud_cluster_worker_id');
$response = $this->httpClient->post(
$endpoint_url . self::API_PATH,
[
'body' => json_encode([
'cloud_cluster_worker_name' => $cloud_cluster_worker_name,
'cloud_context' => $cloud_context . '_' . substr($cloud_cluster_worker_name, -self::CLOUD_CONTEXT_UUID_LENGTH),
'entity_type' => $entity_type,
'resource_jsons' => $resource_jsons,
'clear' => $clear,
]),
'headers' => $headers,
]
);
$output = json_decode($response->getBody()->getContents(), TRUE);
if ($response->getStatusCode() !== 200) {
$this->logger->error($output['error'] ?? $this->t('Unknown error.'));
}
}
catch (\Exception $error) {
$this->logger->error($error->getMessage());
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment