Commit c1c71a69 authored by webchick's avatar webchick

Issue #2159369 by ParisLiakos: Move aggregator_refresh() and most of...

Issue #2159369 by ParisLiakos: Move aggregator_refresh() and most of ->removeItems() logic to a service.
parent 2edd9168
......@@ -6,7 +6,6 @@
*/
use Drupal\aggregator\FeedInterface;
use Drupal\Component\Plugin\Exception\PluginException;
/**
* Denotes that a feed's items should never expire.
......@@ -163,7 +162,9 @@ function aggregator_cron() {
function aggregator_queue_info() {
$queues['aggregator_feeds'] = array(
'title' => t('Aggregator refresh'),
'worker callback' => 'aggregator_refresh',
'worker callback' => function (FeedInterface $feed) {
$feed->refreshItems();
},
'cron' => array(
'time' => 60,
),
......@@ -171,92 +172,6 @@ function aggregator_queue_info() {
return $queues;
}
/**
* Checks a news feed for new items.
*
* @param \Drupal\aggregator\FeedInterface $feed
* An object describing the feed to be refreshed.
*/
function aggregator_refresh(FeedInterface $feed) {
// Store feed URL to track changes.
$feed_url = $feed->getUrl();
$config = \Drupal::config('aggregator.settings');
// Fetch the feed.
$fetcher_manager = \Drupal::service('plugin.manager.aggregator.fetcher');
try {
$success = $fetcher_manager->createInstance($config->get('fetcher'))->fetch($feed);
}
catch (PluginException $e) {
$success = FALSE;
watchdog_exception('aggregator', $e);
}
// Retrieve processor manager now.
$processor_manager = \Drupal::service('plugin.manager.aggregator.processor');
// Store instances in an array so we dont have to instantiate new objects.
$processor_instances = array();
foreach ($config->get('processors') as $processor) {
try {
$processor_instances[$processor] = $processor_manager->createInstance($processor);
}
catch (PluginException $e) {
watchdog_exception('aggregator', $e);
}
}
// We store the hash of feed data in the database. When refreshing a
// feed we compare stored hash and new hash calculated from downloaded
// data. If both are equal we say that feed is not updated.
$hash = hash('sha256', $feed->source_string);
if ($success && ($feed->getHash() != $hash)) {
// Parse the feed.
$parser_manager = \Drupal::service('plugin.manager.aggregator.parser');
try {
if ($parser_manager->createInstance($config->get('parser'))->parse($feed)) {
if ($feed->getWebsiteUrl()) {
$feed->setWebsiteUrl($feed->getUrl());
}
$feed->setHash($hash);
// Update feed with parsed data.
$feed->save();
// Log if feed URL has changed.
if ($feed->getUrl() != $feed_url) {
watchdog('aggregator', 'Updated URL for feed %title to %url.', array('%title' => $feed->label(), '%url' => $feed->getUrl()));
}
watchdog('aggregator', 'There is new syndicated content from %site.', array('%site' => $feed->label()));
drupal_set_message(t('There is new syndicated content from %site.', array('%site' => $feed->label())));
// If there are items on the feed, let enabled processors process them.
if (!empty($feed->items)) {
foreach ($processor_instances as $instance) {
$instance->process($feed);
}
}
}
}
catch (PluginException $e) {
watchdog_exception('aggregator', $e);
}
}
else {
drupal_set_message(t('There is no new syndicated content from %site.', array('%site' => $feed->label())));
}
// Regardless of successful or not, indicate that this feed has been checked.
$feed->setLastCheckedTime(REQUEST_TIME);
$feed->setQueuedTime(0);
$feed->save();
// Processing is done, call postProcess on enabled processors.
foreach ($processor_instances as $instance) {
$instance->postProcess($feed);
}
}
/**
* Loads an aggregator feed.
*
......
......@@ -8,3 +8,6 @@ services:
plugin.manager.aggregator.processor:
class: Drupal\aggregator\Plugin\AggregatorPluginManager
arguments: [processor, '@container.namespaces', '@cache.cache', '@language_manager', '@module_handler']
aggregator.items.importer:
class: Drupal\aggregator\ItemsImporter
arguments: ['@config.factory', '@plugin.manager.aggregator.fetcher', '@plugin.manager.aggregator.parser', '@plugin.manager.aggregator.processor']
......@@ -121,8 +121,10 @@ protected function buildPageList(array $items, $feed_source = '') {
* If the query token is missing or invalid.
*/
public function feedRefresh(FeedInterface $aggregator_feed) {
// @todo after https://drupal.org/node/1972246 find a new place for it.
aggregator_refresh($aggregator_feed);
$message = $aggregator_feed->refreshItems()
? $this->t('There is new syndicated content from %site.', array('%site' => $aggregator_feed->label()))
: $this->t('There is no new syndicated content from %site.', array('%site' => $aggregator_feed->label()));
drupal_set_message($message);
return $this->redirect('aggregator.admin_overview');
}
......
......@@ -63,10 +63,8 @@ public function label() {
* {@inheritdoc}
*/
public function deleteItems() {
$manager = \Drupal::service('plugin.manager.aggregator.processor');
foreach ($manager->getDefinitions() as $id => $definition) {
$manager->createInstance($id)->delete($this);
}
\Drupal::service('aggregator.items.importer')->delete($this);
// Reset feed.
$this->setLastCheckedTime(0);
$this->setHash('');
......@@ -77,6 +75,20 @@ public function deleteItems() {
return $this;
}
/**
* {@inheritdoc}
*/
public function refreshItems() {
$success = \Drupal::service('aggregator.items.importer')->refresh($this);
// Regardless of successful or not, indicate that it has been checked.
$this->setLastCheckedTime(REQUEST_TIME);
$this->setQueuedTime(0);
$this->save();
return $success;
}
/**
* {@inheritdoc}
*/
......@@ -94,10 +106,7 @@ public static function preCreate(EntityStorageControllerInterface $storage_contr
public static function preDelete(EntityStorageControllerInterface $storage_controller, array $entities) {
foreach ($entities as $entity) {
// Notify processors to delete stored items.
$manager = \Drupal::service('plugin.manager.aggregator.processor');
foreach ($manager->getDefinitions() as $id => $definition) {
$manager->createInstance($id)->delete($entity);
}
\Drupal::service('aggregator.items.importer')->delete($entity);
}
}
......
......@@ -218,9 +218,26 @@ public function setLastModified($modified);
/**
* Deletes all items from a feed.
*
* This will also reset the last checked and modified time of the feed and
* save it.
*
* @return \Drupal\aggregator\FeedInterface
* The class instance that this method is called on.
*
* @see \Drupal\aggregator\ItemsImporterInterface::delete()
*/
public function deleteItems();
/**
* Updates the feed items by triggering the import process.
*
* This will also update the last checked time of the feed and save it.
*
* @return bool
* TRUE if there is new content for the feed FALSE otherwise.
*
* @see \Drupal\aggregator\ItemsImporterInterface::refresh()
*/
public function refreshItems();
}
<?php
/**
* @file
* Contains \Drupal\aggregator\Entity\ItemsImporter.
*/
namespace Drupal\aggregator;
use Drupal\aggregator\Plugin\AggregatorPluginManager;
use Drupal\Component\Plugin\Exception\PluginException;
use Drupal\Core\Config\ConfigFactory;
/**
* Defines an importer of aggregator items.
*/
class ItemsImporter implements ItemsImporterInterface {
/**
* The aggregator fetcher manager.
*
* @var \Drupal\aggregator\Plugin\AggregatorPluginManager
*/
protected $fetcherManager;
/**
* The aggregator processor manager.
*
* @var \Drupal\aggregator\Plugin\AggregatorPluginManager
*/
protected $processorManager;
/**
* The aggregator parser manager.
*
* @var \Drupal\aggregator\Plugin\AggregatorPluginManager
*/
protected $parserManager;
/**
* The aggregator.settings config object.
*
* @var \Drupal\Core\Config\Config
*/
protected $config;
/**
* Constructs an Importer object.
*
* @param \Drupal\Core\Config\ConfigFactory $config_factory
* The factory for configuration objects.
* @param \Drupal\aggregator\Plugin\AggregatorPluginManager $fetcher_manager
* The aggregator fetcher plugin manager.
* @param \Drupal\aggregator\Plugin\AggregatorPluginManager $parser_manager
* The aggregator parser plugin manager.
* @param \Drupal\aggregator\Plugin\AggregatorPluginManager $processor_manager
* The aggregator processor plugin manager.
*/
public function __construct(ConfigFactory $config_factory, AggregatorPluginManager $fetcher_manager, AggregatorPluginManager $parser_manager, AggregatorPluginManager $processor_manager) {
$this->fetcherManager = $fetcher_manager;
$this->processorManager = $processor_manager;
$this->parserManager = $parser_manager;
$this->config = $config_factory->get('aggregator.settings');
}
/**
* {@inheritdoc}
*/
public function delete(FeedInterface $feed) {
foreach ($this->processorManager->getDefinitions() as $id => $definition) {
$this->processorManager->createInstance($id)->delete($feed);
}
}
/**
* {@inheritdoc}
*/
public function refresh(FeedInterface $feed) {
// Store feed URL to track changes.
$feed_url = $feed->getUrl();
// Fetch the feed.
try {
$success = $this->fetcherManager->createInstance($this->config->get('fetcher'))->fetch($feed);
}
catch (PluginException $e) {
$success = FALSE;
watchdog_exception('aggregator', $e);
}
// Store instances in an array so we dont have to instantiate new objects.
$processor_instances = array();
foreach ($this->config->get('processors') as $processor) {
try {
$processor_instances[$processor] = $this->processorManager->createInstance($processor);
}
catch (PluginException $e) {
watchdog_exception('aggregator', $e);
}
}
// We store the hash of feed data in the database. When refreshing a
// feed we compare stored hash and new hash calculated from downloaded
// data. If both are equal we say that feed is not updated.
$hash = hash('sha256', $feed->source_string);
$has_new_content = $success && ($feed->getHash() != $hash);
if ($has_new_content) {
// Parse the feed.
try {
if ($this->parserManager->createInstance($this->config->get('parser'))->parse($feed)) {
if ($feed->getWebsiteUrl()) {
$feed->setWebsiteUrl($feed->getUrl());
}
$feed->setHash($hash);
// Update feed with parsed data.
$feed->save();
// Log if feed URL has changed.
if ($feed->getUrl() != $feed_url) {
watchdog('aggregator', 'Updated URL for feed %title to %url.', array('%title' => $feed->label(), '%url' => $feed->getUrl()));
}
watchdog('aggregator', 'There is new syndicated content from %site.', array('%site' => $feed->label()));
// If there are items on the feed, let enabled processors process them.
if (!empty($feed->items)) {
foreach ($processor_instances as $instance) {
$instance->process($feed);
}
}
}
}
catch (PluginException $e) {
watchdog_exception('aggregator', $e);
}
}
// Processing is done, call postProcess on enabled processors.
foreach ($processor_instances as $instance) {
$instance->postProcess($feed);
}
return $has_new_content;
}
}
<?php
/**
* @file
* Contains \Drupal\aggregator\ItemsImporterInterface.
*/
namespace Drupal\aggregator;
/**
* Provides an interface defining an aggregator items importer.
*/
interface ItemsImporterInterface {
/**
* Updates the feed items by triggering the import process.
*
* This process can be slow and lengthy because it relies on network
* operations. Calling it on performance critical paths should be avoided.
*
* @param \Drupal\aggregator\FeedInterface $feed
* The feed which items should be refreshed.
*
* @return bool
* TRUE if there is new content for the feed FALSE otherwise.
*/
public function refresh(FeedInterface $feed);
/**
* Deletes all imported items from a feed.
*
* @param \Drupal\aggregator\FeedInterface $feed
* The feed that associated items should be deleted from.
*/
public function delete(FeedInterface $feed);
}
......@@ -39,7 +39,7 @@ function setUp() {
*/
function testRSS091Sample() {
$feed = $this->createFeed($this->getRSS091Sample());
aggregator_refresh($feed);
$feed->refreshItems();
$this->drupalGet('aggregator/sources/' . $feed->id());
$this->assertResponse(200, format_string('Feed %name exists.', array('%name' => $feed->label())));
$this->assertText('First example feed item title');
......@@ -61,7 +61,7 @@ function testRSS091Sample() {
*/
function testAtomSample() {
$feed = $this->createFeed($this->getAtomSample());
aggregator_refresh($feed);
$feed->refreshItems();
$this->drupalGet('aggregator/sources/' . $feed->id());
$this->assertResponse(200, format_string('Feed %name exists.', array('%name' => $feed->label())));
$this->assertText('Atom-Powered Robots Run Amok');
......@@ -75,7 +75,7 @@ function testAtomSample() {
*/
function testHtmlEntitiesSample() {
$feed = $this->createFeed($this->getHtmlEntitiesSample());
aggregator_refresh($feed);
$feed->refreshItems();
$this->drupalGet('aggregator/sources/' . $feed->id());
$this->assertResponse(200, format_string('Feed %name exists.', array('%name' => $feed->label())));
$this->assertRaw("Quote&quot; Amp&amp;");
......@@ -89,7 +89,7 @@ function testRedirectFeed() {
$invalid_url = url('aggregator/redirect', array('absolute' => TRUE));
$feed = entity_create('aggregator_feed', array('url' => $invalid_url));
$feed->save();
aggregator_refresh($feed);
$feed->refreshItems();
// Make sure that the feed URL was updated correctly.
$this->assertEqual($feed->getUrl(), url('aggregator/test-feed', array('absolute' => TRUE)));
......
......@@ -50,7 +50,7 @@ function testUpdateFeedItem() {
$fid = db_query("SELECT fid FROM {aggregator_feed} WHERE url = :url", array(':url' => $edit['url']))->fetchField();
$feed = aggregator_feed_load($fid);
aggregator_refresh($feed);
$feed->refreshItems();
$before = db_query('SELECT timestamp FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField();
// Sleep for 3 second.
......@@ -64,7 +64,7 @@ function testUpdateFeedItem() {
'modified' => 0,
))
->execute();
aggregator_refresh($feed);
$feed->refreshItems();
$after = db_query('SELECT timestamp FROM {aggregator_item} WHERE fid = :fid', array(':fid' => $feed->id()))->fetchField();
$this->assertTrue($before === $after, format_string('Publish timestamp of feed item was not updated (!before === !after)', array('!before' => $before, '!after' => $after)));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment