# encoding=utf-8
'''Miscellaneous functions.'''
import calendar
import codecs
import contextlib
import datetime
import gzip
import os.path
import platform
import re
import sys
import time
import zipfile
import pickle
from itertools import zip_longest
# True when the running interpreter reports itself as PyPy.
IS_PYPY = platform.python_implementation() == 'PyPy'
class ASCIIStreamWriter(codecs.StreamWriter):
    '''A Stream Writer that encodes everything to ASCII.

    By default, the replacement character is a Python backslash sequence.

    Whatever is passed to :meth:`write` is reduced to ASCII and handed to
    the wrapped stream as text.
    '''
    DEFAULT_ERROR = 'backslashreplace'

    def __init__(self, stream, errors=DEFAULT_ERROR):
        '''Wrap ``stream``.

        Args:
            stream: Object with a ``write`` method that receives the
                ASCII-only text.
            errors (str): Codec error handler used by :meth:`write`.
        '''
        codecs.StreamWriter.__init__(self, stream, errors)

    def encode(self, instance, errors=DEFAULT_ERROR):
        '''Return ``instance`` encoded to ASCII using ``errors``.'''
        return instance.encode('ascii', errors)

    def decode(self, instance, errors=DEFAULT_ERROR):
        '''Return ``instance`` decoded from ASCII using ``errors``.'''
        # Previously this called ``encode``, which made it impossible to
        # decode bytes at all.
        return instance.decode('ascii', errors)

    def write(self, instance):
        '''Write ``instance`` to the stream as ASCII-only text.

        Text is first encoded to ASCII so the error handler can replace
        non-ASCII characters; the resulting bytes (or bytes passed in
        directly) are then decoded back before being written.
        '''
        if hasattr(instance, 'encode'):
            instance = instance.encode('ascii', self.errors)

        if hasattr(instance, 'decode'):
            instance = instance.decode('ascii', self.errors)

        self.stream.write(instance)

    def writelines(self, list_instance):
        '''Write every item of ``list_instance`` using :meth:`write`.'''
        for item in list_instance:
            self.write(item)
@contextlib.contextmanager
def reset_file_offset(file):
    '''Reset the file offset back to original position.

    The offset is recorded on entry and restored on exit, including when
    the body of the ``with`` block raises.

    Args:
        file: A file object supporting ``tell`` and ``seek``.
    '''
    offset = file.tell()

    try:
        yield
    finally:
        # Restore the position even if the caller's block failed, so the
        # file is never left at an unexpected offset.
        file.seek(offset)
def peek_file(file, length=4096):
    '''Read up to ``length`` units from ``file`` without moving its offset.

    The read happens inside ``reset_file_offset``, which puts the file
    position back where it was before the data is returned.
    '''
    with reset_file_offset(file):
        preview = file.read(length)
        return preview
def seek_file_end(file):
    '''Move the position of ``file`` to its end.

    A direct seek relative to the end is tried first. If the file object
    rejects that with ``ValueError``, the file is read in 4096-sized
    chunks until nothing more is returned.
    '''
    try:
        # whence=2: offset is relative to the end of the file.
        file.seek(0, 2)
    except ValueError:
        # gzip files don't support seek from end, so consume the rest.
        chunk = file.read(4096)

        while chunk:
            chunk = file.read(4096)
def datetime_str():
    '''Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``.'''
    utc_now = time.gmtime()
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", utc_now)
def parse_iso8601_str(string):
    '''Convert a fixed-format ISO8601 datetime string to a UNIX timestamp.

    .. Note:: Only the exact layout ``%Y-%m-%dT%H:%M:%SZ`` is accepted.
       Use a dedicated date parsing library for anything more general.

    Returns:
        int: A UNIX timestamp.
    '''
    parsed = datetime.datetime.strptime(string, "%Y-%m-%dT%H:%M:%SZ")
    utc_fields = parsed.utctimetuple()
    seconds = calendar.timegm(utc_fields)
    return int(seconds)
def python_version():
    '''Return the running Python version as ``major.minor.patch``.'''
    release = sys.version_info[:3]
    return '.'.join(str(number) for number in release)
def filter_pem(data):
    '''Extract the PEM certificates contained in the given bytes.

    Only lines between a ``-----BEGIN CERTIFICATE-----`` line and the
    matching ``-----END CERTIFICATE-----`` line are kept; a marker must
    make up the whole line to be recognized. Each certificate body is
    re-wrapped with ``rewrap_bytes`` and framed with the markers again.

    Returns:
        ``set`` containing each certificate as bytes
    '''
    assert isinstance(data, bytes), 'Expect bytes. Got {}.'.format(type(data))

    found = set()
    body_lines = []
    inside_block = False

    for row in re.split(br'[\r\n]+', data):
        if row == b'-----BEGIN CERTIFICATE-----':
            # Nested BEGIN markers are not expected.
            assert not inside_block
            inside_block = True
            continue

        if row == b'-----END CERTIFICATE-----':
            # An END marker must close a block that was opened.
            assert inside_block
            inside_block = False

            wrapped = rewrap_bytes(b''.join(body_lines))
            found.add(b'-----BEGIN CERTIFICATE-----\n' +
                      wrapped +
                      b'\n-----END CERTIFICATE-----\n')
            body_lines = []
            continue

        if inside_block:
            body_lines.append(row)

    return found
def rewrap_bytes(data):
    '''Break ``data`` into newline-separated rows of at most 70 bytes.

    Intended to rewrap base64 content.
    '''
    width = 70
    starts = range(0, len(data), width)
    rows = [data[start:start + width] for start in starts]
    return b'\n'.join(rows)
def truncate_file(path):
    '''Empty the file at ``path`` by opening it for binary writing.'''
    # Opening in 'wb' mode discards existing content; nothing is written.
    open(path, 'wb').close()
def get_package_data(filename, mode='rb'):
    '''Return the contents of a real file or a zip file.

    If ``filename`` exists on disk it is read directly. Otherwise the
    path is searched for the first component ending in ``.zip``; that
    prefix is opened as a zip archive and the remainder of the path is
    read as a member of it.

    Args:
        filename (str): Path to a file, or a path that runs through a
            zip archive (for example ``lib/bundle.zip/pkg/data.txt``).
        mode (str): Mode used when opening a real file. It is not used
            for zip members.

    Returns:
        The file contents as returned by ``read``.

    Raises:
        FileNotFoundError: If the file does not exist and no component
            of the path ends in ``.zip``.
    '''
    if os.path.exists(filename):
        with open(filename, mode=mode) as in_file:
            return in_file.read()

    parts = os.path.normpath(filename).split(os.sep)

    for index, part in enumerate(parts):
        if part.endswith('.zip'):
            zip_path = os.sep.join(parts[:index + 1])
            # Member names inside a zip use forward slashes regardless
            # of the platform's path separator.
            member_path = '/'.join(parts[index + 1:])
            break
    else:
        # No archive component found: report a clear error instead of
        # failing later on an unbound local variable.
        raise FileNotFoundError(
            'No such file or zip archive member: {!r}'.format(filename)
        )

    with zipfile.ZipFile(zip_path) as zip_file:
        return zip_file.read(member_path)
def get_package_filename(filename, package_dir=None):
    '''Build the path of a data file that lives in the package directory.

    When ``sys.frozen`` is set (presumably a frozen/bundled application;
    confirm against the packaging setup), the directory is this module's
    directory name placed under ``sys._MEIPASS`` and ``package_dir`` is
    ignored. Otherwise ``package_dir`` is used, falling back to the
    directory containing this module.
    '''
    module_dir = os.path.dirname(__file__)

    if getattr(sys, 'frozen', False):
        base_dir = os.path.join(sys._MEIPASS, os.path.basename(module_dir))
    elif package_dir:
        base_dir = package_dir
    else:
        base_dir = module_dir

    return os.path.join(base_dir, filename)
def is_ascii(text):
    '''Return ``True`` if ``text`` can be strictly encoded as ASCII.'''
    try:
        text.encode('ascii', 'strict')
        return True
    except UnicodeError:
        return False
@contextlib.contextmanager
def close_on_error(close_func):
    '''Context manager that calls ``close_func`` when the block fails.

    Any ``Exception`` other than ``StopIteration`` triggers the call; the
    exception is re-raised in every case.
    '''
    try:
        yield
    except StopIteration:
        # Passed through untouched, without closing.
        raise
    except Exception:
        close_func()
        raise
def get_exception_message(instance):
    '''Return a short description of an exception.

    If ``instance`` has non-empty ``args``, its string form is returned.
    Otherwise the name of its class is used, with the string form as a
    last resort when the name cannot be read.
    '''
    if getattr(instance, 'args', None):
        return str(instance)

    try:
        label = type(instance).__name__
    except AttributeError:
        label = str(instance)

    return label
class PickleStream(object):
    '''Pickle stream helper.

    Wraps a file object so several objects can be pickled to, or
    unpickled from, the same stream one after another.

    .. Warning:: Unpickling can execute arbitrary code. Only load
       streams that come from a trusted source.
    '''
    def __init__(self, filename=None, file=None, mode='rb',
                 protocol=pickle.DEFAULT_PROTOCOL):
        '''Open the stream.

        Args:
            filename (str): Path to open when ``file`` is not given.
            file: An already opened binary file object. Takes precedence
                over ``filename``.
            mode (str): Mode used to open ``filename``.
            protocol (int): Pickle protocol used by :meth:`dump`.
        '''
        # Compare against None rather than testing truthiness: a
        # file-like object may evaluate as false and must still be used.
        if file is not None:
            self._file = file
        else:
            self._file = open(filename, mode)

        self._protocol = protocol

    def dump(self, obj):
        '''Pickle an object.'''
        pickle.dump(obj, self._file, protocol=self._protocol)

    def load(self):
        '''Unpickle an object.'''
        return pickle.load(self._file)

    def iter_load(self):
        '''Unpickle objects.

        Yields objects until the underlying file reports end of file.
        '''
        while True:
            try:
                obj = pickle.load(self._file)
            except EOFError:
                return

            yield obj

    def close(self):
        '''Close stream.'''
        self._file.close()
class GzipPickleStream(PickleStream):
    '''gzip compressed pickle stream.'''
    def __init__(self, filename=None, file=None, mode='rb', **kwargs):
        '''Open the compressed stream.

        Args:
            filename (str): Path of the gzip file to open when ``file``
                is not given.
            file: An already opened binary file object to wrap with gzip.
                Takes precedence over ``filename``.
            mode (str): Mode passed to ``gzip.GzipFile``.
            **kwargs: Forwarded to the ``PickleStream`` initializer.
        '''
        # Compare against None rather than testing truthiness: a
        # file-like object may evaluate as false and must still be used.
        if file is not None:
            self._gzip_file = gzip.GzipFile(fileobj=file, mode=mode)
        else:
            self._gzip_file = gzip.GzipFile(filename, mode=mode)

        super().__init__(file=self._gzip_file, mode=mode, **kwargs)

    def close(self):
        '''Close the gzip stream.

        NOTE(review): a file object passed in as ``file`` is only wrapped
        here and does not appear to be closed by this method; the caller
        presumably remains responsible for it. Confirm against the gzip
        documentation.
        '''
        self._gzip_file.close()
        # The base class was handed the same gzip object in __init__, so
        # this closes it a second time.
        super().close()
def grouper(iterable, n, fillvalue=None):
    '''Collect data into fixed-length chunks or blocks.

    Example: ``grouper('ABCDEFG', 3, 'x')`` yields ABC, DEF, Gxx.
    '''
    # Every position of a chunk pulls from the same iterator, so each
    # tuple consumes ``n`` consecutive items.
    shared = iter(iterable)
    return zip_longest(*(shared for _ in range(n)), fillvalue=fillvalue)