Source code for wpull.protocol.ftp.client

'''FTP client.'''
import enum
import io
import logging
import weakref
import functools


import asyncio

from typing import IO, Tuple
from typing import Optional

from wpull.application.hook import HookableMixin
from wpull.protocol.abstract.client import BaseClient, BaseSession, DurationTimeout
from wpull.body import Body
from wpull.errors import ProtocolError, AuthenticationError
from wpull.protocol.ftp.command import Commander
from wpull.protocol.ftp.ls.listing import ListingError, ListingParser
from wpull.protocol.ftp.request import Response, Command, ListingResponse, \
    Request
from wpull.protocol.ftp.stream import ControlStream
from wpull.protocol.ftp.util import FTPServerError, ReplyCodes
import wpull.protocol.ftp.util


_logger = logging.getLogger(__name__)


[docs]class SessionState(enum.Enum): ready = 'ready' file_request_sent = 'file_request_sent' directory_request_sent = 'directory_request_sent' response_received = 'response_received' aborted = 'aborted'
[docs]class Session(BaseSession):
[docs] class Event(enum.Enum): begin_control = 'begin_control' control_send_data = 'control_send_data' control_receive_data = 'control_receive_data' end_control = 'end_control' begin_transfer = 'begin_transfer' transfer_send_data = 'transfer_send_data' transfer_receive_data = 'transfer_receive_data' end_transfer = 'end_transfer'
def __init__(self, login_table: weakref.WeakKeyDictionary, **kwargs): self._login_table = login_table super().__init__(**kwargs) self._control_connection = None self._control_stream = None self._commander = None self._request = None self._response = None self._data_stream = None self._data_connection = None self._listing_type = None self._session_state = SessionState.ready self.event_dispatcher.register(self.Event.begin_control) self.event_dispatcher.register(self.Event.control_send_data) self.event_dispatcher.register(self.Event.control_receive_data) self.event_dispatcher.register(self.Event.end_control) self.event_dispatcher.register(self.Event.begin_transfer) self.event_dispatcher.register(self.Event.transfer_send_data) self.event_dispatcher.register(self.Event.transfer_receive_data) self.event_dispatcher.register(self.Event.end_transfer) @asyncio.coroutine def _init_stream(self): '''Create streams and commander. Coroutine. ''' assert not self._control_connection self._control_connection = yield from self._acquire_request_connection(self._request) self._control_stream = ControlStream(self._control_connection) self._commander = Commander(self._control_stream) read_callback = functools.partial(self.event_dispatcher.notify, self.Event.control_receive_data) self._control_stream.data_event_dispatcher.add_read_listener(read_callback) write_callback = functools.partial(self.event_dispatcher.notify, self.Event.control_send_data) self._control_stream.data_event_dispatcher.add_write_listener(write_callback) @asyncio.coroutine def _log_in(self): '''Connect and login. Coroutine. ''' username = self._request.url_info.username or self._request.username or 'anonymous' password = self._request.url_info.password or self._request.password or '-wpull@' cached_login = self._login_table.get(self._control_connection) if cached_login and cached_login == (username, password): _logger.debug('Reusing existing login.') return try: yield from self._commander.login(username, password) except FTPServerError as error: raise AuthenticationError('Login error: {}'.format(error)) \ from error self._login_table[self._control_connection] = (username, password) @asyncio.coroutine
[docs] def start(self, request: Request) -> Response: '''Start a file or directory listing download. Args: request: Request. Returns: A Response populated with the initial data connection reply. Once the response is received, call :meth:`download`. Coroutine. ''' if self._session_state != SessionState.ready: raise RuntimeError('Session not ready') response = Response() yield from self._prepare_fetch(request, response) response.file_transfer_size = yield from self._fetch_size(request) if request.restart_value: try: yield from self._commander.restart(request.restart_value) response.restart_value = request.restart_value except FTPServerError: _logger.debug('Could not restart file.', exc_info=1) yield from self._open_data_stream() command = Command('RETR', request.file_path) yield from self._begin_stream(command) self._session_state = SessionState.file_request_sent return response
@asyncio.coroutine
[docs] def start_listing(self, request: Request) -> ListingResponse: '''Fetch a file listing. Args: request: Request. Returns: A listing response populated with the initial data connection reply. Once the response is received, call :meth:`download_listing`. Coroutine. ''' if self._session_state != SessionState.ready: raise RuntimeError('Session not ready') response = ListingResponse() yield from self._prepare_fetch(request, response) yield from self._open_data_stream() mlsd_command = Command('MLSD', self._request.file_path) list_command = Command('LIST', self._request.file_path) try: yield from self._begin_stream(mlsd_command) self._listing_type = 'mlsd' except FTPServerError as error: if error.reply_code in (ReplyCodes.syntax_error_command_unrecognized, ReplyCodes.command_not_implemented): self._listing_type = None else: raise if not self._listing_type: # This code not in exception handler to avoid incorrect # exception chaining yield from self._begin_stream(list_command) self._listing_type = 'list' _logger.debug('Listing type is %s', self._listing_type) self._session_state = SessionState.directory_request_sent return response
@asyncio.coroutine def _prepare_fetch(self, request: Request, response: Response): '''Prepare for a fetch. Coroutine. ''' self._request = request self._response = response yield from self._init_stream() connection_closed = self._control_connection.closed() if connection_closed: self._login_table.pop(self._control_connection, None) yield from self._control_stream.reconnect() request.address = self._control_connection.address connection_reused = not connection_closed self.event_dispatcher.notify(self.Event.begin_control, request, connection_reused=connection_reused) if connection_closed: yield from self._commander.read_welcome_message() yield from self._log_in() self._response.request = request @asyncio.coroutine def _begin_stream(self, command: Command): '''Start data stream transfer.''' begin_reply = yield from self._commander.begin_stream(command) self._response.reply = begin_reply self.event_dispatcher.notify(self.Event.begin_transfer, self._response) @asyncio.coroutine
[docs] def download(self, file: Optional[IO]=None, rewind: bool=True, duration_timeout: Optional[float]=None) -> Response: '''Read the response content into file. Args: file: A file object or asyncio stream. rewind: Seek the given file back to its original offset after reading is finished. duration_timeout: Maximum time in seconds of which the entire file must be read. Returns: A Response populated with the final data connection reply. Be sure to call :meth:`start` first. Coroutine. ''' if self._session_state != SessionState.file_request_sent: raise RuntimeError('File request not sent') if rewind and file and hasattr(file, 'seek'): original_offset = file.tell() else: original_offset = None if not hasattr(file, 'drain'): self._response.body = file if not isinstance(file, Body): self._response.body = Body(file) read_future = self._commander.read_stream(file, self._data_stream) try: reply = yield from \ asyncio.wait_for(read_future, timeout=duration_timeout) except asyncio.TimeoutError as error: raise DurationTimeout( 'Did not finish reading after {} seconds.' .format(duration_timeout) ) from error self._response.reply = reply if original_offset is not None: file.seek(original_offset) self.event_dispatcher.notify(self.Event.end_transfer, self._response) self._session_state = SessionState.response_received return self._response
@asyncio.coroutine
[docs] def download_listing(self, file: Optional[IO], duration_timeout: Optional[float]=None) -> \ ListingResponse: '''Read file listings. Args: file: A file object or asyncio stream. duration_timeout: Maximum time in seconds of which the entire file must be read. Returns: A Response populated the file listings Be sure to call :meth:`start_file_listing` first. Coroutine. ''' if self._session_state != SessionState.directory_request_sent: raise RuntimeError('File request not sent') self._session_state = SessionState.file_request_sent yield from self.download(file=file, rewind=False, duration_timeout=duration_timeout) try: if self._response.body.tell() == 0: listings = () elif self._listing_type == 'mlsd': self._response.body.seek(0) machine_listings = wpull.protocol.ftp.util.parse_machine_listing( self._response.body.read().decode('utf-8', errors='surrogateescape'), convert=True, strict=False ) listings = list( wpull.protocol.ftp.util.machine_listings_to_file_entries( machine_listings )) else: self._response.body.seek(0) file = io.TextIOWrapper(self._response.body, encoding='utf-8', errors='surrogateescape') listing_parser = ListingParser(file=file) listings = list(listing_parser.parse_input()) _logger.debug('Listing detected as %s', listing_parser.type) # We don't want the file to be closed when exiting this function file.detach() except (ListingError, ValueError) as error: raise ProtocolError(*error.args) from error self._response.files = listings self._response.body.seek(0) self._session_state = SessionState.response_received return self._response
@asyncio.coroutine def _open_data_stream(self): '''Open the data stream connection. Coroutine. ''' @asyncio.coroutine def connection_factory(address: Tuple[int, int]): self._data_connection = yield from self._acquire_connection(address[0], address[1]) return self._data_connection self._data_stream = yield from self._commander.setup_data_stream( connection_factory ) self._response.data_address = self._data_connection.address read_callback = functools.partial(self.event_dispatcher.notify, self.Event.transfer_receive_data) self._data_stream.data_event_dispatcher.add_read_listener(read_callback) write_callback = functools.partial(self.event_dispatcher.notify, self.Event.transfer_send_data) self._data_stream.data_event_dispatcher.add_write_listener(write_callback) @asyncio.coroutine def _fetch_size(self, request: Request) -> int: '''Return size of file. Coroutine. ''' try: size = yield from self._commander.size(request.file_path) return size except FTPServerError: return
[docs] def abort(self): super().abort() self._close_data_connection() if self._control_connection: self._login_table.pop(self._control_connection, None)
[docs] def recycle(self): super().recycle() self._close_data_connection() if self._control_connection: self.event_dispatcher.notify( self.Event.end_control, self._response, connection_closed=self._control_connection.closed() )
def _close_data_connection(self): if self._data_connection: # self._data_connection.close() # self._connection_pool.no_wait_release(self._data_connection) self._data_connection = None if self._data_stream: self._data_stream = None
[docs]class Client(BaseClient): '''FTP Client. The session object is :class:`Session`. ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._login_table = weakref.WeakKeyDictionary() def _session_class(self) -> Session: return functools.partial(Session, login_table=self._login_table)
[docs] def session(self) -> Session: return super().session()