# Source code for wpull.protocol.abstract.client

'''Client abstractions'''
import abc
import asyncio
import contextlib
import enum
import logging

from typing import Optional, Callable, TypeVar, Generic

from wpull.application.hook import HookableMixin
from wpull.errors import NetworkTimedOut
from wpull.network.pool import ConnectionPool

# Module-level logger, named after this module via __name__.
_logger = logging.getLogger(__name__)


@contextlib.contextmanager
def dummy_context_manager():
    '''Context manager that does nothing.

    Entering it binds ``None`` to the ``as`` target; leaving it performs
    no cleanup and does not suppress exceptions raised in the body.
    '''
    yield None
class DurationTimeout(NetworkTimedOut):
    '''Download did not complete within specified time.'''
class BaseSession(HookableMixin, metaclass=abc.ABCMeta):
    '''Base session.

    A session borrows connections from a connection pool, remembers every
    connection it acquired, and hands them back to the pool when it ends.
    It is used as a context manager: entering fires ``begin_session`` and
    leaving fires ``end_session`` on ``self.event_dispatcher`` (presumably
    provided by ``HookableMixin`` -- confirm against that class).
    '''

    class SessionEvent(enum.Enum):
        '''Events registered on the session's event dispatcher.'''
        begin_session = 'begin_session'
        end_session = 'end_session'

    def __init__(self, connection_pool):
        '''
        Args:
            connection_pool: Pool from which connections are acquired and
                to which they are released.
        '''
        super().__init__()
        self._connection_pool = connection_pool
        # Every connection acquired through this session, so that abort()
        # and recycle() can act on all of them.
        self._connections = set()

        self.event_dispatcher.register(self.SessionEvent.begin_session)
        self.event_dispatcher.register(self.SessionEvent.end_session)

    def __enter__(self):
        '''Notify ``begin_session`` listeners and return this session.'''
        self.event_dispatcher.notify(self.SessionEvent.begin_session)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''Close or recycle connections and notify ``end_session``.

        Any exception other than ``StopIteration`` is treated as an error:
        the connections are closed via :meth:`abort` first. The exception
        itself is not suppressed (nothing truthy is returned).
        '''
        if exc_val and not isinstance(exc_val, StopIteration):
            _logger.debug('Early close session.')
            error = True
            self.abort()
        else:
            error = False

        # abort() only closes the connections; they still have to be
        # released to the pool, so recycle() runs on both paths.
        self.recycle()

        self.event_dispatcher.notify(
            self.SessionEvent.end_session, error=error
        )

    def abort(self):
        '''Terminate early and close any connections.'''
        for connection in self._connections:
            connection.close()

    def recycle(self):
        '''Clean up and return connections back to the pool.

        Connections should be kept alive if supported.
        '''
        for connection in self._connections:
            self._connection_pool.no_wait_release(connection)

        # Cleared after the loop: mutating the set while iterating over it
        # would raise RuntimeError.
        self._connections.clear()

    # NOTE(review): ``asyncio.coroutine`` was deprecated in Python 3.8 and
    # removed in 3.11; kept here because callers use ``yield from``.
    @asyncio.coroutine
    def _acquire_request_connection(self, request):
        '''Return a connection suitable for the given request.

        Host, port and scheme are read from ``request.url_info``. SSL is
        used when the scheme is ``https``; tunnelling is requested for
        every scheme other than plain ``http``.
        '''
        host = request.url_info.hostname
        port = request.url_info.port
        use_ssl = request.url_info.scheme == 'https'
        tunnel = request.url_info.scheme != 'http'

        connection = yield from self._acquire_connection(
            host, port, use_ssl, tunnel
        )

        return connection

    @asyncio.coroutine
    def _acquire_connection(self, host, port, use_ssl=False, tunnel=True):
        '''Return a connection and track it in this session.

        If the pool exposes ``acquire_proxy`` it is used and receives the
        ``tunnel`` flag; otherwise the plain ``acquire`` is used and
        ``tunnel`` is ignored.
        '''
        if hasattr(self._connection_pool, 'acquire_proxy'):
            connection = yield from \
                self._connection_pool.acquire_proxy(
                    host, port, use_ssl, tunnel=tunnel
                )
        else:
            connection = yield from \
                self._connection_pool.acquire(host, port, use_ssl)

        self._connections.add(connection)

        return connection
# Type variable for the session type that a BaseClient is parameterized
# with and that BaseClient.session() returns.
SessionT = TypeVar('SessionT')
class BaseClient(Generic[SessionT], HookableMixin, metaclass=abc.ABCMeta):
    '''Base client.

    A client owns a connection pool and creates sessions bound to it.
    Subclasses supply the concrete session class via
    :meth:`_session_class`.
    '''

    class ClientEvent(enum.Enum):
        '''Events registered on the client's event dispatcher.'''
        new_session = 'new_session'

    def __init__(self, connection_pool: Optional[ConnectionPool]=None):
        '''
        Args:
            connection_pool: Connection pool. If ``None``, a new
                ``ConnectionPool()`` is created.
        '''
        super().__init__()

        if connection_pool is not None:
            self._connection_pool = connection_pool
        else:
            self._connection_pool = ConnectionPool()

        self.event_dispatcher.register(self.ClientEvent.new_session)

    @abc.abstractmethod
    def _session_class(self) -> Callable[[], SessionT]:
        '''Return session class.

        The returned callable is invoked by :meth:`session` with a
        ``connection_pool`` keyword argument.
        '''
        return BaseSession  # return something for code checkers

    def session(self) -> SessionT:
        '''Return a new session.

        The session is constructed with this client's connection pool and
        ``new_session`` listeners are notified with it before it is
        returned.
        '''
        session = self._session_class()(
            connection_pool=self._connection_pool,
        )
        self.event_dispatcher.notify(self.ClientEvent.new_session, session)
        return session

    def close(self):
        '''Close the connection pool.'''
        _logger.debug('Client closing.')
        self._connection_pool.close()