# Source code for wpull.protocol.ftp.command

'''FTP service control.'''
import logging


import asyncio

from typing import Sequence, Tuple, Callable, IO
from typing import Union

from wpull.errors import ProtocolError
from wpull.network.connection import Connection
from wpull.protocol.ftp.request import Command, Reply
from wpull.protocol.ftp.stream import DataStream
from wpull.protocol.ftp.util import ReplyCodes, FTPServerError
import wpull.protocol.ftp.util


_logger = logging.getLogger(__name__)


class Commander(object):
    '''Helper class that performs typical FTP routines.

    Each coroutine writes commands to, and reads replies from, the control
    stream given at construction and validates the reply code.

    Args:
        data_stream (:class:`.ftp.stream.ControlStream`): The control
            stream. Despite the parameter name, the object is stored and
            used as the control stream; the name is kept so existing
            callers keep working.
    '''
    def __init__(self, data_stream):
        self._control_stream = data_stream

    @classmethod
    def raise_if_not_match(cls, action: str,
                           expected_code: Union[int, Sequence[int]],
                           reply: Reply):
        '''Raise FTPServerError if not expected reply code.

        Args:
            action: Label to use in the exception message.
            expected_code: Expected 3 digit code, or a sequence of
                acceptable codes.
            reply: Reply from the server.

        Raises:
            FTPServerError: If the reply code is not one of the expected
                codes. The reply code is passed as the second argument.
        '''
        # Normalise to a collection so a single membership test suffices.
        if isinstance(expected_code, int):
            expected_codes = (expected_code,)
        else:
            expected_codes = expected_code

        if reply.code not in expected_codes:
            raise FTPServerError(
                'Failed action {action}: {reply_code} {reply_text}'
                .format(action=action, reply_code=reply.code,
                        reply_text=ascii(reply.text)
                        ),
                reply.code
            )

    @asyncio.coroutine
    def read_welcome_message(self):
        '''Read the welcome message.

        Coroutine.

        Raises:
            FTPServerError: If the reply is not "service ready for new
                user".
        '''
        reply = yield from self._control_stream.read_reply()

        self.raise_if_not_match(
            'Server ready', ReplyCodes.service_ready_for_new_user, reply)

    @asyncio.coroutine
    def login(self, username: str='anonymous', password: str='-wpull-lib@'):
        '''Log in.

        Sends ``USER`` and, unless the server already reports the user as
        logged in, follows up with ``PASS``.

        Args:
            username: The user name sent with ``USER``.
            password: The password sent with ``PASS``.

        Coroutine.

        Raises:
            FTPServerError: If either step gets an unexpected reply code.
        '''
        yield from self._control_stream.write_command(
            Command('USER', username))

        reply = yield from self._control_stream.read_reply()

        # The server accepted the user name alone; no password needed.
        if reply.code == ReplyCodes.user_logged_in_proceed:
            return

        self.raise_if_not_match(
            'Login username', ReplyCodes.user_name_okay_need_password, reply)

        yield from self._control_stream.write_command(
            Command('PASS', password))

        reply = yield from self._control_stream.read_reply()

        self.raise_if_not_match(
            'Login password', ReplyCodes.user_logged_in_proceed, reply)

    @asyncio.coroutine
    def passive_mode(self) -> Tuple[str, int]:
        '''Enable passive mode.

        Returns:
            The address (IP address, port) of the passive port.

        Coroutine.

        Raises:
            FTPServerError: If the reply is not "entering passive mode".
            ProtocolError: If the address in the reply text cannot be
                parsed.
        '''
        yield from self._control_stream.write_command(Command('PASV'))

        reply = yield from self._control_stream.read_reply()

        self.raise_if_not_match(
            'Passive mode', ReplyCodes.entering_passive_mode, reply)

        try:
            return wpull.protocol.ftp.util.parse_address(reply.text)
        except ValueError as error:
            # Surface parse failures as a protocol-level error.
            raise ProtocolError(str(error)) from error

    @asyncio.coroutine
    def setup_data_stream(
            self,
            connection_factory: Callable[[tuple], Connection],
            data_stream_factory: Callable[[Connection], DataStream]=DataStream
    ) -> DataStream:
        '''Create and setup a data stream.

        This function will set up passive and binary mode and handle
        connecting to the data connection.

        Args:
            connection_factory: A coroutine callback that returns a
                connection. It is called with the passive mode address.
            data_stream_factory: A callback that returns a data stream.
                It is called with the connected connection.

        Coroutine.

        Returns:
            DataStream

        Raises:
            FTPServerError: If binary or passive mode is refused.
        '''
        yield from self._control_stream.write_command(Command('TYPE', 'I'))
        reply = yield from self._control_stream.read_reply()

        self.raise_if_not_match('Binary mode', ReplyCodes.command_okay, reply)

        address = yield from self.passive_mode()

        connection = yield from connection_factory(address)

        # TODO: unit test for following line for connections that have
        # the same port over time but within pool cleaning intervals
        connection.reset()

        yield from connection.connect()

        data_stream = data_stream_factory(connection)

        return data_stream

    @asyncio.coroutine
    def begin_stream(self, command: Command) -> Reply:
        '''Start sending content on the data stream.

        Args:
            command: A command that tells the server to send data over the
                data connection.

        Coroutine.

        Returns:
            The begin reply.

        Raises:
            FTPServerError: If the reply is neither "about to open data
                connection" nor "data connection already open".
        '''
        yield from self._control_stream.write_command(command)
        reply = yield from self._control_stream.read_reply()

        self.raise_if_not_match(
            'Begin stream',
            (
                ReplyCodes.file_status_okay_about_to_open_data_connection,
                ReplyCodes.data_connection_already_open_transfer_starting,
            ),
            reply
        )

        return reply

    @asyncio.coroutine
    def read_stream(self, file: IO, data_stream: DataStream) -> Reply:
        '''Read from the data stream.

        The data stream is closed when this coroutine finishes, whether it
        succeeds or raises.

        Args:
            file: A destination file object or a stream writer.
            data_stream: The stream of which to read from.

        Coroutine.

        Returns:
            Reply: The final reply.

        Raises:
            FTPServerError: If the final reply is not "closing data
                connection".
        '''
        try:
            yield from data_stream.read_file(file=file)

            reply = yield from self._control_stream.read_reply()

            self.raise_if_not_match(
                'End stream',
                ReplyCodes.closing_data_connection,
                reply
            )
        finally:
            # Close on every exit path so a failed read or an unexpected
            # final reply does not leave the data stream open.
            data_stream.close()

        return reply

    @asyncio.coroutine
    def size(self, filename: str) -> Union[int, None]:
        '''Get size of file.

        Args:
            filename: The name sent with the ``SIZE`` command.

        Coroutine.

        Returns:
            The size parsed from the reply text, or None if the reply text
            is not an integer.

        Raises:
            FTPServerError: If the reply is not "file status".
        '''
        yield from self._control_stream.write_command(
            Command('SIZE', filename))

        reply = yield from self._control_stream.read_reply()

        self.raise_if_not_match('File size', ReplyCodes.file_status, reply)

        try:
            return int(reply.text.strip())
        except ValueError:
            # Unparsable size text is treated as "size unknown".
            return None

    @asyncio.coroutine
    def restart(self, offset: int):
        '''Send restart command.

        Args:
            offset: The offset sent with the ``REST`` command.

        Coroutine.

        Raises:
            FTPServerError: If the reply is not "requested file action
                pending further information".
        '''
        yield from self._control_stream.write_command(
            Command('REST', str(offset)))

        reply = yield from self._control_stream.read_reply()

        self.raise_if_not_match(
            'Restart',
            ReplyCodes.requested_file_action_pending_further_information,
            reply
        )