'''Utils'''
import re
from typing import Tuple, List, Union, Iterable
from wpull.errors import ServerError
import datetime
from wpull.protocol.ftp.ls.listing import FileEntry
[docs]class ReplyCodes(object):
command_okay = 200
syntax_error_command_unrecognized = 500
syntax_error_in_parameters_or_arguments = 501
command_not_implemented_superfluous_at_this_site = 202
command_not_implemented = 502
bad_sequence_of_commands = 503
command_not_implemented_for_that_parameter = 504
restart_marker_reply = 110
system_status_or_system_help_reply = 211
directory_status = 212
file_status = 213
help_message = 214
name_system_type = 215
service_ready_in_nnn_minutes = 120
service_ready_for_new_user = 220
service_closing_control_connection = 221
service_not_available_closing_control_connection = 421
data_connection_already_open_transfer_starting = 125
data_connection_open_no_transfer_in_progress = 225
cant_open_data_connection = 425
closing_data_connection = 226
connection_closed_transfer_aborted = 426
entering_passive_mode = 227
user_logged_in_proceed = 230
not_logged_in = 530
user_name_okay_need_password = 331
need_account_for_login = 332
need_account_for_storing_files = 532
file_status_okay_about_to_open_data_connection = 150
requested_file_action_okay_completed = 250
pathname_created = 257
requested_file_action_pending_further_information = 350
requested_file_action_not_taken = 450
requested_action_not_taken_file_unavailable = 550
requested_action_aborted_local_error_in_processing = 451
requested_action_aborted_page_type_unknown = 551
requested_action_not_taken_insufficient_storage_space = 452
requested_file_action_aborted = 552
requested_action_not_taken_file_name_not_allowed = 553
[docs]class FTPServerError(ServerError):
@property
def reply_code(self):
'''Return reply code.'''
if len(self.args) >= 2 and isinstance(self.args[1], int):
return self.args[1]
[docs]def parse_address(text: str) -> Tuple[str, int]:
'''Parse PASV address.'''
match = re.search(
r'\('
r'(\d{1,3})\s*,'
r'\s*(\d{1,3})\s*,'
r'\s*(\d{1,3})\s*,'
r'\s*(\d{1,3})\s*,'
r'\s*(\d{1,3})\s*,'
r'\s*(\d{1,3})\s*'
r'\)',
text)
if match:
return (
'{0}.{1}.{2}.{3}'.format(int(match.group(1)),
int(match.group(2)),
int(match.group(3)),
int(match.group(4))
),
int(match.group(5)) << 8 | int(match.group(6))
)
else:
raise ValueError('No address found')
[docs]def reply_code_tuple(code: int) -> Tuple[int, int, int]:
'''Return the reply code as a tuple.
Args:
code: The reply code.
Returns:
Each item in the tuple is the digit.
'''
return code // 100, code // 10 % 10, code % 10
[docs]def parse_machine_listing(text: str, convert: bool=True, strict: bool=True) -> \
List[dict]:
'''Parse machine listing.
Args:
text: The listing.
convert: Convert sizes and dates.
strict: Method of handling errors. ``True`` will raise
``ValueError``. ``False`` will ignore rows with errors.
Returns:
list: A list of dict of the facts defined in RFC 3659.
The key names must be lowercase. The filename uses the key
``name``.
'''
# TODO: this function should be moved into the 'ls' package
listing = []
for line in text.splitlines(False):
facts = line.split(';')
row = {}
filename = None
for fact in facts:
name, sep, value = fact.partition('=')
if sep:
name = name.strip().lower()
value = value.strip().lower()
if convert:
try:
value = convert_machine_list_value(name, value)
except ValueError:
if strict:
raise
row[name] = value
else:
if name[0:1] == ' ':
# Is a filename
filename = name[1:]
else:
name = name.strip().lower()
row[name] = ''
if filename:
row['name'] = filename
listing.append(row)
elif strict:
raise ValueError('Missing filename.')
return listing
[docs]def convert_machine_list_value(name: str, value: str) -> \
Union[datetime.datetime, str, int]:
'''Convert sizes and time values.
Size will be ``int`` while time value will be :class:`datetime.datetime`.
'''
if name == 'modify':
return convert_machine_list_time_val(value)
elif name == 'size':
return int(value)
else:
return value
[docs]def convert_machine_list_time_val(text: str) -> datetime.datetime:
'''Convert RFC 3659 time-val to datetime objects.'''
# TODO: implement fractional seconds
text = text[:14]
if len(text) != 14:
raise ValueError('Time value not 14 chars')
year = int(text[0:4])
month = int(text[4:6])
day = int(text[6:8])
hour = int(text[8:10])
minute = int(text[10:12])
second = int(text[12:14])
return datetime.datetime(year, month, day, hour, minute, second,
tzinfo=datetime.timezone.utc)
[docs]def machine_listings_to_file_entries(listings: Iterable[dict]) -> \
Iterable[FileEntry]:
'''Convert results from parsing machine listings to FileEntry list.'''
for listing in listings:
yield FileEntry(
listing['name'],
type=listing.get('type'),
size=listing.get('size'),
date=listing.get('modify')
)