Source code for wpull.application.app

# encoding=utf-8
'''Application main interface.'''
import enum
from collections import OrderedDict
import gettext
import logging
import platform
import signal

import asyncio

from wpull.backport.logging import StyleAdapter
from wpull.errors import ServerError, ExitStatus, ProtocolError, \
    SSLVerificationError, DNSNotFound, ConnectionRefused, NetworkError, \
    AuthenticationError
from wpull.application.hook import  HookStop, HookableMixin
from wpull.pipeline.pipeline import PipelineSeries

_logger = StyleAdapter(logging.getLogger(__name__))
_ = gettext.gettext


[docs]class ApplicationState(enum.Enum): ready = 'ready' running = 'running' stopping = 'stopping' stopped = 'stopped'
[docs]class Application(HookableMixin): '''Default non-interactive application user interface. This class manages process signals and displaying warnings. ''' ERROR_CODE_MAP = OrderedDict([ (AuthenticationError, ExitStatus.authentication_failure), (ServerError, ExitStatus.server_error), (ProtocolError, ExitStatus.protocol_error), (SSLVerificationError, ExitStatus.ssl_verification_error), (DNSNotFound, ExitStatus.network_failure), (ConnectionRefused, ExitStatus.network_failure), (NetworkError, ExitStatus.network_failure), (OSError, ExitStatus.file_io_error), (IOError, ExitStatus.file_io_error), # ExitStatus.parse_error is handled by the ArgumentParse and is not # needed here. # Anything else is ExitStatus.generic_error. ]) '''Mapping of error types to exit status.''' EXPECTED_EXCEPTIONS = ( ServerError, ProtocolError, SSLVerificationError, DNSNotFound, ConnectionRefused, NetworkError, OSError, IOError, HookStop, StopIteration, SystemExit, KeyboardInterrupt, ) '''Exception classes that are not crashes.'''
[docs] class Event(enum.Enum): pipeline_begin = 'pipeline_begin' pipeline_end = 'pipeline_end'
def __init__(self, pipeline_series: PipelineSeries): super().__init__() self._pipeline_series = pipeline_series self._exit_code = 0 self._current_pipeline = None self._state = ApplicationState.ready self.event_dispatcher.register(self.Event.pipeline_begin) self.event_dispatcher.register(self.Event.pipeline_end) @property def exit_code(self) -> int: return self._exit_code @exit_code.setter def exit_code(self, new_code: int): self._exit_code = new_code
[docs] def setup_signal_handlers(self): '''Setup Ctrl+C and SIGTERM handlers.''' if platform.system() == 'Windows': _logger.warning(_( 'Graceful stopping with Unix signals is not supported ' 'on this OS.' )) return event_loop = asyncio.get_event_loop() graceful_called = False def graceful_stop_callback(): nonlocal graceful_called if graceful_called: forceful_stop_callback() return graceful_called = True _logger.info(_('Stopping once all requests complete...')) _logger.info(_('Interrupt again to force stopping immediately.')) self.stop() def forceful_stop_callback(): _logger.info(_('Forcing immediate stop...')) logging.raiseExceptions = False event_loop.stop() event_loop.add_signal_handler(signal.SIGINT, graceful_stop_callback) event_loop.add_signal_handler(signal.SIGTERM, forceful_stop_callback)
[docs] def stop(self): if self._state == ApplicationState.running: _logger.debug('Application stopping') self._state = ApplicationState.stopping if self._current_pipeline: self._current_pipeline.stop()
[docs] def run_sync(self) -> int: '''Run the application. This function is blocking. Returns: int: The exit status. ''' exit_status = asyncio.get_event_loop().run_until_complete(self.run()) asyncio.get_event_loop().close() return exit_status
@asyncio.coroutine
[docs] def run(self): if self._state != ApplicationState.ready: raise RuntimeError('Application is not ready') self._state = ApplicationState.running for pipeline in self._pipeline_series.pipelines: self._current_pipeline = pipeline if self._state == ApplicationState.stopping and pipeline.skippable: continue self.event_dispatcher.notify(self.Event.pipeline_begin, pipeline) try: yield from pipeline.process() except Exception as error: if isinstance(error, StopIteration): raise is_expected = isinstance(error, self.EXPECTED_EXCEPTIONS) show_traceback = not is_expected if show_traceback: _logger.exception('Fatal exception.') else: try: text = '{}: {}'.format(type(error).__name__, error) except AttributeError: text = str(error) _logger.error(text) self._update_exit_code_from_error(error) if not is_expected: self._print_crash_message() self._print_report_bug_message() break self.event_dispatcher.notify(self.Event.pipeline_end, pipeline) self._current_pipeline = None self._state = ApplicationState.stopping if self._exit_code == ExitStatus.ssl_verification_error: self._print_ssl_error() _logger.info(_('Exiting with status {0}.'), self._exit_code) self._state = ApplicationState.stopped return self._exit_code
def _update_exit_code_from_error(self, error): '''Set the exit code based on the error type. Args: error (:class:`Exception`): An exception instance. ''' for error_type, exit_code in self.ERROR_CODE_MAP.items(): if isinstance(error, error_type): self.update_exit_code(exit_code) break else: self.update_exit_code(ExitStatus.generic_error)
[docs] def update_exit_code(self, code: int): '''Set the exit code if it is serious than before. Args: code: The exit code. ''' if code: if self._exit_code: self._exit_code = min(self._exit_code, code) else: self._exit_code = code
@classmethod def _print_ssl_error(cls): '''Print an invalid SSL certificate warning.''' _logger.info(_('A SSL certificate could not be verified.')) _logger.info(_('To ignore and proceed insecurely, ' 'use ‘--no-check-certificate’.')) @classmethod def _print_crash_message(cls): '''Print crashed message.''' _logger.critical(_('Sorry, Wpull unexpectedly crashed.')) @classmethod def _print_report_bug_message(cls): '''Print report the bug message.''' _logger.critical(_( 'Please report this problem to the authors at Wpull\'s ' 'issue tracker so it may be fixed. ' 'If you know how to program, maybe help us fix it? ' 'Thank you for helping us help you help us all.' ))