Source code for wpull.protocol.http.client

# encoding=utf-8
'''Basic HTTP Client.'''
import enum
import functools
import gettext
import logging
import warnings

import asyncio

from typing import Optional, Union, IO, Callable

from wpull.application.hook import HookableMixin
from wpull.protocol.abstract.client import BaseClient, BaseSession, DurationTimeout
from wpull.backport.logging import BraceMessage as __
from wpull.body import Body
from wpull.protocol.http.request import Request, Response
from wpull.protocol.http.stream import Stream


_ = gettext.gettext
_logger = logging.getLogger(__name__)


class SessionState(enum.Enum):
    ready = 'ready'
    request_sent = 'request_sent'
    response_received = 'response_received'
    aborted = 'aborted'


class Session(BaseSession):
    '''HTTP request and response session.'''

    class Event(enum.Enum):
        begin_request = 'begin_request'
        request_data = 'request_data'
        end_request = 'end_request'
        begin_response = 'begin_response'
        response_data = 'response_data'
        end_response = 'end_response'

    def __init__(self, stream_factory: Callable[..., Stream]=None, **kwargs):
        super().__init__(**kwargs)

        assert stream_factory
        self._stream_factory = stream_factory

        self._stream = None
        self._request = None
        self._response = None

        self._session_state = SessionState.ready

        self.event_dispatcher.register(self.Event.begin_request)
        self.event_dispatcher.register(self.Event.request_data)
        self.event_dispatcher.register(self.Event.end_request)
        self.event_dispatcher.register(self.Event.begin_response)
        self.event_dispatcher.register(self.Event.response_data)
        self.event_dispatcher.register(self.Event.end_response)

    @asyncio.coroutine
    def start(self, request: Request) -> Response:
        '''Begin an HTTP request.

        Args:
            request: Request information.

        Returns:
            A response populated with the HTTP headers.

        Once the headers are received, call :meth:`download`.

        Coroutine.
        '''
        if self._session_state != SessionState.ready:
            raise RuntimeError('Session already started')

        assert not self._request
        self._request = request

        _logger.debug(__('Client fetch request {0}.', request))

        connection = yield from self._acquire_request_connection(request)
        full_url = connection.proxied and not connection.tunneled

        self._stream = stream = self._stream_factory(connection)

        yield from self._stream.reconnect()

        request.address = connection.address

        self.event_dispatcher.notify(self.Event.begin_request, request)

        write_callback = functools.partial(
            self.event_dispatcher.notify, self.Event.request_data)
        stream.data_event_dispatcher.add_write_listener(write_callback)

        yield from stream.write_request(request, full_url=full_url)

        if request.body:
            assert 'Content-Length' in request.fields

            length = int(request.fields['Content-Length'])
            yield from stream.write_body(request.body, length=length)

        stream.data_event_dispatcher.remove_write_listener(write_callback)
        self.event_dispatcher.notify(self.Event.end_request, request)

        read_callback = functools.partial(
            self.event_dispatcher.notify, self.Event.response_data)
        stream.data_event_dispatcher.add_read_listener(read_callback)

        self._response = response = yield from stream.read_response()
        response.request = request

        self.event_dispatcher.notify(self.Event.begin_response, response)

        self._session_state = SessionState.request_sent

        return response
    @asyncio.coroutine
    def download(
            self,
            file: Union[IO[bytes], asyncio.StreamWriter, None]=None,
            raw: bool=False,
            rewind: bool=True,
            duration_timeout: Optional[float]=None):
        '''Read the response content into file.

        Args:
            file: A file object or asyncio stream.
            raw: Whether chunked transfer encoding should be included.
            rewind: Seek the given file back to its original offset after
                reading is finished.
            duration_timeout: Maximum time in seconds within which the
                entire file must be read.

        Be sure to call :meth:`start` first.

        Coroutine.
        '''
        if self._session_state != SessionState.request_sent:
            raise RuntimeError('Request not sent')

        if rewind and file and hasattr(file, 'seek'):
            original_offset = file.tell()
        else:
            original_offset = None

        if not hasattr(file, 'drain'):
            self._response.body = file

            if not isinstance(file, Body):
                self._response.body = Body(file)

        read_future = self._stream.read_body(
            self._request, self._response, file=file, raw=raw)

        try:
            yield from asyncio.wait_for(
                read_future, timeout=duration_timeout)
        except asyncio.TimeoutError as error:
            raise DurationTimeout(
                'Did not finish reading after {} seconds.'
                .format(duration_timeout)
            ) from error

        self._session_state = SessionState.response_received

        if original_offset is not None:
            file.seek(original_offset)

        self.event_dispatcher.notify(self.Event.end_response, self._response)
        self.recycle()

    def done(self) -> bool:
        '''Return whether the session is complete.

        A session is complete when it has sent a request, read the response
        header and the response body.
        '''
        return self._session_state == SessionState.response_received

    def abort(self):
        super().abort()
        self._session_state = SessionState.aborted

    def recycle(self):
        if not self.done():
            super().abort()
            warnings.warn(_('HTTP session did not complete.'))

        super().recycle()
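

# Example (illustrative sketch, not part of the original module): observing a
# session's per-request events, e.g. to track download progress. This assumes
# the ``event_dispatcher`` provided by ``HookableMixin`` exposes an
# ``add_listener(name, callback)`` method; only ``register`` and ``notify``
# appear in this module, so treat the call below as an assumption.
def _log_response_progress(session: Session):
    '''Attach a listener that logs each chunk of response data.'''
    def on_response_data(data):
        _logger.debug(__('Received {0} bytes.', len(data)))

    session.event_dispatcher.add_listener(
        Session.Event.response_data, on_response_data)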


class Client(BaseClient):
    '''Stateless HTTP/1.1 client.

    The session object is :class:`Session`.
    '''
    def __init__(self, *args, stream_factory=Stream, **kwargs):
        super().__init__(*args, **kwargs)
        self._stream_factory = stream_factory

    def _session_class(self) -> Callable[[], Session]:
        return functools.partial(Session, stream_factory=self._stream_factory)

    def session(self) -> Session:
        return super().session()
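

# Example (illustrative sketch, not part of the original module): the
# start/download flow described in the Session docstrings. It assumes
# ``Client()`` can be constructed with its default connection pool and that
# ``Request`` accepts a URL string; proxy setup and error handling are
# omitted. The output filename and URL are placeholders.
if __name__ == '__main__':
    @asyncio.coroutine
    def _example_fetch(url):
        client = Client()
        session = client.session()

        # Send the request and read the response header.
        response = yield from session.start(Request(url))
        _logger.info(__('Response: {0} {1}',
                        response.status_code, response.reason))

        # Read the response body; ``download`` recycles the session when the
        # body has been fully read.
        with open('example.out', 'wb') as file:
            yield from session.download(file=file)

    asyncio.get_event_loop().run_until_complete(
        _example_fetch('http://example.com/'))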