'''FTP Streams'''
import logging
import asyncio
from typing import IO, Union
from wpull.network.connection import Connection
from wpull.protocol.abstract.stream import close_stream_on_error, \
DataEventDispatcher
from wpull.errors import NetworkError
from wpull.protocol.ftp.request import Reply, Command
_logger = logging.getLogger(__name__)
class DataStream(object):
    '''Stream class for a data connection.

    Data read from the connection can optionally be copied into a file
    object or writer stream, and every chunk read is reported to the
    stream's data event dispatcher.

    Args:
        connection: Connection.
    '''

    def __init__(self, connection: Connection):
        self._connection = connection
        self._data_event_dispatcher = DataEventDispatcher()

    @property
    def data_event_dispatcher(self) -> DataEventDispatcher:
        '''Dispatcher that is notified of each chunk read by this stream.'''
        return self._data_event_dispatcher

    def close(self):
        '''Close connection.'''
        self._connection.close()

    def closed(self) -> bool:
        '''Return whether the connection is closed.'''
        return self._connection.closed()

    @asyncio.coroutine
    @close_stream_on_error
    def read_file(self, file: Union[IO, asyncio.StreamWriter]=None):
        '''Read from connection to file.

        Chunks are requested from the connection (4096 bytes per read)
        until a read returns no data. Each chunk is written to ``file``
        when one is given, and the data event dispatcher is notified of
        every chunk regardless.

        Args:
            file: A file object or a writer stream. If ``None``, the data
                is read and reported to the dispatcher but not stored.

        Coroutine.
        '''
        # Compare against None explicitly: a file-like object may be falsy
        # (e.g. it defines ``__len__`` or ``__bool__``) and must still
        # receive the data.
        has_file = file is not None

        # An object exposing ``drain`` is treated as a writer stream whose
        # buffer has to be drained after each write.
        file_is_async = has_file and hasattr(file, 'drain')

        while True:
            data = yield from self._connection.read(4096)

            # An empty read ends the transfer.
            if not data:
                break

            if has_file:
                file.write(data)

                if file_is_async:
                    yield from file.drain()

            self._data_event_dispatcher.notify_read(data)

    # TODO: def write_file()
class ControlStream(object):
    '''Stream class for a control connection.

    Commands are written to the connection and replies are read back
    from it; the bytes going in either direction are reported to the
    stream's data event dispatcher.

    Args:
        connection: Connection.
    '''

    def __init__(self, connection: Connection):
        self._connection = connection
        self._data_event_dispatcher = DataEventDispatcher()

    @property
    def data_event_dispatcher(self):
        '''Dispatcher that is notified of bytes written and read.'''
        return self._data_event_dispatcher

    def close(self):
        '''Close the connection.'''
        self._connection.close()

    def closed(self) -> bool:
        '''Return whether the connection is closed.'''
        return self._connection.closed()

    @asyncio.coroutine
    def reconnect(self):
        '''Connect the stream if needed.

        Nothing is done when the connection is not closed; otherwise the
        connection is reset and connected again.

        Coroutine.
        '''
        if not self._connection.closed():
            return

        self._connection.reset()
        yield from self._connection.connect()

    @asyncio.coroutine
    @close_stream_on_error
    def write_command(self, command: Command):
        '''Write a command to the stream.

        The command is serialized with ``to_bytes()``, written to the
        connection, and then reported to the data event dispatcher.

        Args:
            command: The command.

        Coroutine.
        '''
        _logger.debug('Write command.')

        payload = command.to_bytes()

        yield from self._connection.write(payload)

        self._data_event_dispatcher.notify_write(payload)

    @asyncio.coroutine
    @close_stream_on_error
    def read_reply(self) -> Reply:
        '''Read a reply from the stream.

        Lines are read and fed to the reply parser one at a time until
        the reply has a code.

        Returns:
            .ftp.request.Reply: The reply

        Raises:
            NetworkError: A line was read that does not end with a
                newline.

        Coroutine.
        '''
        _logger.debug('Read reply')

        reply = Reply()

        while True:
            received = yield from self._connection.readline()

            # A line lacking its newline terminator (including an empty
            # read) is reported as a closed connection.
            if not received.endswith(b'\n'):
                raise NetworkError('Connection closed.')

            self._data_event_dispatcher.notify_read(received)
            reply.parse(received)

            # The reply is complete once the parser has set its code.
            if reply.code is not None:
                return reply