'''Listing parser.'''
import re
import itertools
import namedlist
from wpull.protocol.ftp.ls.date import parse_datetime
import wpull.protocol.ftp.ls.date
# Record type produced by the listing parsers for one parsed row.
# The generated class is named 'FileEntryType' but is bound to the
# module-level name ``FileEntry``.
# Only ``name`` is listed bare; every other field is given as a
# ``(field, None)`` pair.
# NOTE(review): presumably namedlist treats each pair as (name, default),
# making those fields optional and defaulting to None -- confirm against
# the namedlist documentation.
FileEntry = namedlist.namedtuple(
    'FileEntryType',
    [
        'name',
        ('type', None),
        ('size', None),
        ('date', None),
        ('dest', None),
        ('perm', None)
    ])
'''A row in a listing.
Attributes:
name (str): Filename.
type (str, None): ``file``, ``dir``, ``symlink``, ``other``, ``None``
size (int, None): Size of file.
date (:class:`datetime.datetime`, None): A datetime object in UTC.
dest (str, None): Destination filename for symlinks.
perm (int, None): Unix permissions expressed as an integer.
'''
class ListingError(ValueError):
    '''Raised when a listing cannot be parsed.

    Derives from :class:`ValueError`, so handlers written for
    ``ValueError`` also catch it.
    '''
class UnknownListingError(ListingError):
    '''Raised when the style of a listing could not be determined.'''
class LineParser(object):
    '''Parse individual lines in a listing.

    Attributes:
        type (str, None): Listing style used by :meth:`parse` to choose a
            parser (``msdos``, ``unix`` or ``nlst``). Set by
            :meth:`guess_type`.
        date_format: Passed through to ``parse_datetime`` as its
            ``date_format`` argument.
        is_day_period: Passed through to ``parse_datetime`` as its
            ``is_day_period`` argument.
    '''
    def __init__(self):
        self.type = None
        self.date_format = None
        self.is_day_period = None

    def guess_type(self, sample_lines):
        '''Guess the type of listing from a sample of lines.

        The guess is stored in :attr:`type` and also returned.

        Args:
            sample_lines: Iterable of listing lines (str).

        Returns:
            str: The value returned by :func:`guess_listing_type`.
        '''
        self.type = guess_listing_type(sample_lines)
        return self.type

    def parse(self, lines):
        '''Parse the lines using the parser selected by :attr:`type`.

        Args:
            lines: Iterable of listing lines (str).

        Returns:
            A generator of :class:`FileEntry`. The lines are parsed
            lazily, so parse errors surface while iterating.

        Raises:
            UnknownListingError: Immediately, if :attr:`type` is not
                ``msdos``, ``unix`` or ``nlst``.
        '''
        if self.type == 'msdos':
            return self.parse_msdos(lines)
        elif self.type == 'unix':
            return self.parse_unix(lines)
        elif self.type == 'nlst':
            return self.parse_nlst(lines)
        else:
            raise UnknownListingError('Unsupported listing type.')

    def parse_datetime(self, text):
        '''Parse datetime from line of text.

        Delegates to the module-level ``parse_datetime`` with this
        parser's ``date_format`` and ``is_day_period`` settings.

        Returns:
            The result of ``parse_datetime``. Callers in this class use
            it as ``(datetime, start_index, end_index)`` where the
            indexes locate the date within ``text``.
        '''
        return parse_datetime(text, date_format=self.date_format,
                              is_day_period=self.is_day_period)

    def parse_nlst(self, lines):
        '''Parse lines from a NLST format.

        Each line is taken verbatim as the filename.
        '''
        for line in lines:
            yield FileEntry(line)

    def parse_msdos(self, lines):
        '''Parse lines from a MS-DOS format.

        Each line is expected to hold, separated by whitespace: a date,
        a time, either ``<DIR>`` or a file size, and the filename.

        Yields:
            FileEntry: With name, type (``dir`` or ``file``), size
            (``None`` for directories) and date filled in.
        '''
        for line in lines:
            # Split off only the three leading columns so that a filename
            # containing whitespace stays intact in the last field.
            fields = line.split(None, 3)

            date_str = fields[0]
            time_str = fields[1]

            datetime_str = '{} {}'.format(date_str, time_str)

            file_datetime = self.parse_datetime(datetime_str)[0]

            if fields[2] == '<DIR>':
                file_size = None
                file_type = 'dir'
            else:
                file_size = parse_int(fields[2])
                file_type = 'file'

            # When the split limit is reached the remainder keeps its
            # trailing whitespace; drop it.
            filename = fields[3].rstrip()

            yield FileEntry(filename, file_type, file_size, file_datetime)

    def parse_unix(self, lines):
        '''Parse listings from a Unix ls command format.

        Yields:
            FileEntry: With name, type, size, date, symlink destination
            (symlinks only) and permissions filled in.

        Raises:
            ListingError: If no field starts with a file type character
                or no date can be found in the line.
        '''
        # This method uses some Filezilla parsing algorithms

        for line in lines:
            original_line = line
            fields = line.split(' ')
            field_start = 0

            # Search for the permissions field by checking the file type
            for field in fields:
                after_perm_index = field_start + len(field)
                # The next field begins after the single space separator.
                field_start = after_perm_index + 1

                if not field:
                    continue

                # If the filesystem goes corrupt, it may show ? instead
                # but I don't really care in that situation.
                if field[0] in 'bcdlps-':
                    if field[0] == 'd':
                        file_type = 'dir'
                    elif field[0] == '-':
                        file_type = 'file'
                    elif field[0] == 'l':
                        file_type = 'symlink'
                    else:
                        file_type = 'other'

                    perms = parse_unix_perm(field[1:])

                    break
            else:
                raise ListingError('Failed to parse file type.')

            # Keep only the text that follows the permissions field.
            line = line[after_perm_index:]

            # We look for the position of the date and use the integer
            # before it as the file size.
            # We look for the position of the time and use the text
            # after it as the filename

            while line:
                try:
                    datetime_obj, start_index, end_index = \
                        self.parse_datetime(line)
                except ValueError:
                    # Drop four leading characters and retry on the rest.
                    line = line[4:]
                else:
                    break
            else:
                raise ListingError(
                    'Could not parse a date from {}'.format(
                        repr(original_line)))

            # The size is the last space-separated token before the date.
            file_size = int(line[:start_index].rstrip().rpartition(' ')[-1])

            filename = line[end_index:].strip()

            if file_type == 'symlink':
                filename, sep, symlink_dest = filename.partition(' -> ')
            else:
                symlink_dest = None

            yield FileEntry(filename, file_type, file_size, datetime_obj,
                            symlink_dest, perm=perms)
def guess_listing_type(lines, threshold=100):
    '''Guess the style of directory listing.

    Every non-empty line can vote for any of the styles it resembles.
    Scanning stops early once the leading style has more than
    ``threshold`` votes. On a tie the first of ``unix``, ``msdos``,
    ``nlst`` wins.

    Args:
        lines: Iterable of listing lines (str).
        threshold (int): Vote count above which scanning stops.

    Returns:
        str: ``unix``, ``msdos``, ``nlst``, ``unknown``.
    '''
    tally = dict.fromkeys(('unix', 'msdos', 'nlst'), 0)

    for entry in lines:
        if entry:
            # Permission triads hint at a Unix ``ls`` listing.
            if re.search(r'---|r--|rw-|rwx', entry):
                tally['unix'] += 1

            # A directory marker or two early digits hint at MS-DOS.
            if '<DIR>' in entry or re.search(r'^.{0,4}\d\d', entry):
                tally['msdos'] += 1

            # A line without any space is a bare name, as in NLST.
            if ' ' not in entry:
                tally['nlst'] += 1

            if max(tally.values()) > threshold:
                break

    best = max(tally, key=tally.get)

    return best if tally[best] else 'unknown'
# Translation table that deletes the space and comma grouping characters.
NUM_GROUPER_TABLE = str.maketrans('', '', ' ,')


def parse_int(text):
    '''Convert text to an integer, ignoring digit grouping characters.

    Spaces and commas are removed before conversion.

    Args:
        text (str): Number text such as ``1,234,567``.

    Returns:
        int: The parsed value.

    Raises:
        ValueError: If the remaining text is not a valid integer.
    '''
    return int(text.translate(NUM_GROUPER_TABLE))
def parse_unix_perm(text):
    '''Parse a Unix permission string and return integer value.

    Only the first nine characters are examined, so a trailing marker
    after the permission bits (for example ``.`` or ``+``) does not
    discard the permissions.

    Args:
        text (str): The permission characters without the leading file
            type character, e.g. ``rwxr-xr-x``.

    Returns:
        int: The read/write/execute bits of the three triads, e.g.
        ``0o755``. ``0`` if ``text`` has fewer than nine characters.
    '''
    # Based on ftp-ls.c symperms
    if len(text) < 9:
        return 0

    perms = 0

    for triad_start in range(0, 9, 3):
        read_flag, write_flag, exec_flag = text[triad_start:triad_start + 3]

        perms <<= 3

        if read_flag == 'r':
            perms |= 1 << 2

        if write_flag == 'w':
            perms |= 1 << 1

        # Lowercase ``s`` and ``t`` mean the special bit is set together
        # with execute; the uppercase forms mean execute is absent.
        if exec_flag in ('x', 's', 't'):
            perms |= 1

    return perms
[docs]class ListingParser(LineParser):
'''Listing parser.
Args:
text (str): A text listing.
file: A file object in text mode containing the listing.
'''
def __init__(self, text=None, file=None):
super().__init__()
self._text = text
self._file = file