Source code for wpull.application.tasks.plugin

import asyncio
import gettext
import inspect
import logging
import os
import re
from configparser import ConfigParser

from typing import cast
from yapsy.IPluginLocator import IPluginLocator
from yapsy.PluginFileLocator import PluginFileAnalyzerMathingRegex, \
    PluginFileLocator
from yapsy.PluginInfo import PluginInfo
from yapsy.PluginManager import PluginManager

from wpull.application.hook import HookableMixin
from wpull.application.plugin import WpullPlugin
from wpull.backport.logging import BraceMessage as __
from wpull.pipeline.pipeline import ItemTask, PipelineSeries
from wpull.pipeline.app import AppSession
from wpull.util import get_package_filename

# Module-level logger, named after this module via ``__name__``.
_logger = logging.getLogger(__name__)
# Short alias for ``gettext.gettext``; the log messages in this module are
# passed through it before being formatted.
_ = gettext.gettext


class PluginLocator(IPluginLocator):
    """Plugin locator that finds ``*.plugin.py`` files.

    Candidates come from two sources: every regular file named
    ``<something>.plugin.py`` directly inside one of the given directories
    (no recursion), plus every explicitly given path, which is accepted
    regardless of its filename.

    Args:
        directories: Iterable of directory paths to scan.
        paths: Iterable of plugin file paths to include unconditionally.
    """

    def __init__(self, directories, paths):
        self._directories = directories
        self._paths = paths

    def locatePlugins(self):
        """Collect the plugin candidates.

        Returns:
            tuple: ``(candidates, count)`` where ``candidates`` is a list of
            ``(path, path, plugin_info)`` 3-tuples and ``count`` is the
            length of that list. The same path is used for the first two
            members because there is no separate info file.

        Raises:
            OSError: If one of the directories cannot be listed.
        """
        candidates = []

        for directory in self._directories:
            # os.listdir() returns entries in arbitrary order; sort them so
            # that the discovery order is the same on every platform.
            for filename in sorted(os.listdir(directory)):
                path = os.path.join(directory, filename)

                if os.path.isfile(path) and self._is_plugin_filename(filename):
                    info = self._plugin_info_from_path(path)
                    candidates.append((path, path, info))

        # Explicit paths are trusted: no filename or existence check.
        for filename in self._paths:
            info = self._plugin_info_from_path(filename)
            candidates.append((filename, filename, info))

        return candidates, len(candidates)

    @classmethod
    def _is_plugin_filename(cls, filename):
        """Return whether ``filename`` has the form ``<name>.plugin.py``.

        The whole filename must match, so leftovers such as
        ``foo.plugin.pyc`` or ``foo.plugin.py.bak`` are rejected.
        """
        return re.fullmatch(r'.+\.plugin\.py', filename) is not None

    @classmethod
    def _plugin_info_from_path(cls, path):
        """Build a ``PluginInfo`` for the plugin file at ``path``.

        The plugin name is the file's base name with every non-word
        character replaced by an underscore
        (``foo.plugin.py`` -> ``foo_plugin_py``).
        """
        name = re.sub(r'\W', '_', os.path.basename(path))
        info = PluginInfo(name, path)
        return info

    def gatherCorePluginInfo(self, directory, filename):
        """Describe the plugin file ``filename`` located in ``directory``.

        Returns:
            tuple: ``(info, config)`` where ``info`` is the ``PluginInfo``
            for the joined path and ``config`` is a ``ConfigParser`` holding
            a ``Core`` section with the plugin's ``Name`` and ``Module``.
        """
        info = self._plugin_info_from_path(os.path.join(directory, filename))

        config = ConfigParser()
        config.add_section("Core")
        config.set("Core", "Name", info.name)
        config.set("Core", "Module", info.path)

        return info, config
class PluginSetupTask(ItemTask[AppSession]):
    """Pipeline task that loads plugins and connects them to hookable objects."""

    @asyncio.coroutine
    def process(self, session: AppSession):
        """Discover, announce, activate and connect the plugins.

        A plugin manager is created on ``session.plugin_manager``. It is fed
        by a ``PluginLocator`` that scans the package's
        ``application/plugins`` directory and, when given, the script named
        by ``session.args.plugin_script``.

        Args:
            session: The application session to set up.
        """
        self._debug_log_registered_hooks(session)

        builtin_dir = get_package_filename(
            os.path.join('application', 'plugins')
        )
        user_script = session.args.plugin_script
        script_paths = [user_script] if user_script else []

        session.plugin_manager = PluginManager(
            plugin_locator=PluginLocator([builtin_dir], script_paths)
        )
        session.plugin_manager.collectPlugins()

        for found in session.plugin_manager.getAllPlugins():
            # Plugins under the package's own directory are announced at
            # debug level; all others at info level.
            if found.path.startswith(builtin_dir):
                announce = _logger.debug
            else:
                announce = _logger.info

            announce(__(
                _('Found plugin {name} from {filename}.'),
                filename=found.path, name=found.name
            ))

            plugin = found.plugin_object
            plugin.app_session = session

            if not plugin.should_activate():
                continue

            session.plugin_manager.activatePluginByName(found.name)
            self._connect_plugin_hooks(session, plugin)

    @classmethod
    def _get_candidate_instances(cls, session: AppSession):
        """Return an iterator over the session's hookable objects.

        The pool consists of every value in ``session.factory.instance_map``
        followed by the tasks of every pipeline in the factory's
        ``PipelineSeries``; only ``HookableMixin`` instances are yielded.
        """
        pool = list(session.factory.instance_map.values())
        series = cast(PipelineSeries, session.factory['PipelineSeries'])

        for pipeline in series.pipelines:
            pool += pipeline.tasks

        return (
            candidate for candidate in pool
            if isinstance(candidate, HookableMixin)
        )

    @classmethod
    def _connect_plugin_hooks(cls, session: AppSession,
                              plugin_object: WpullPlugin):
        """Call ``connect_plugin(plugin_object)`` on every hookable object."""
        for hookable in cls._get_candidate_instances(session):
            hookable.connect_plugin(plugin_object)

        # TODO: raise an error (e.g. RuntimeError) when a plugin function
        # could not be attached to any plugin hook/event.

    @classmethod
    def _debug_log_registered_hooks(cls, session: AppSession):
        """Log, at debug level, each entry of every hookable object's
        event dispatcher and hook dispatcher."""
        for hookable in cls._get_candidate_instances(session):
            for event in hookable.event_dispatcher:
                _logger.debug('Instance %s registered event %s',
                              hookable, event)

            for hook in hookable.hook_dispatcher:
                _logger.debug('Instance %s registered hook %s',
                              hookable, hook)