# encoding=utf-8
'''Document content post-processing.'''
import abc
import codecs
import gettext
import io
import logging
import os.path
import shutil

import wpull.string
import wpull.url
import wpull.util
from wpull.backport.logging import BraceMessage as __
from wpull.database.base import NotFound
from wpull.document.htmlparse.element import Comment, Element, Doctype
from wpull.pipeline.item import Status
from wpull.scraper.css import CSSScraper
from wpull.scraper.html import HTMLScraper
from wpull.url import URLInfo
# Shorthand for translating user-facing log messages.
_ = gettext.gettext
_logger = logging.getLogger(__name__)
# Snipped from lxml.html.defs:
# HTML "void" elements that never take a closing tag; legacy names such as
# 'isindex' and 'basefont' are kept so old documents round-trip correctly.
empty_tags = frozenset([
    'area', 'base', 'basefont', 'br', 'col', 'frame', 'hr',
    'img', 'input', 'isindex', 'link', 'meta', 'param'])
class BaseDocumentConverter(object, metaclass=abc.ABCMeta):
    '''Base class for classes that convert links within a document.'''
    @abc.abstractmethod
    def convert(self, input_filename, output_filename, base_url=None):
        '''Convert links in the document, writing the result to a new file.

        Args:
            input_filename (str): Path of the document to read.
            output_filename (str): Path where the converted document
                is written.
            base_url (str): URL of the document itself, used to resolve
                relative links before lookup.
        '''
        pass
class BatchDocumentConverter(object):
    '''Convert all documents in URL table.

    Args:
        html_parser: HTML parser instance passed to :class:`HTMLConverter`.
        element_walker: Element walker instance passed to
            :class:`HTMLConverter`.
        url_table: An instance of :class:`.database.URLTable`.
        backup (bool): Whether back up files are created.
    '''
    def __init__(self, html_parser, element_walker, url_table, backup=False):
        self._url_table = url_table
        self._backup_enabled = backup
        self._html_converter = HTMLConverter(html_parser, element_walker,
                                             url_table)
        self._css_converter = CSSConverter(url_table)

    def convert_all(self):
        '''Convert all links in URL table.'''
        for url_record in self._url_table.get_all():
            # Only completed downloads have a file on disk to rewrite.
            if url_record.status != Status.done:
                continue

            self.convert_by_record(url_record)

    def convert_by_record(self, url_record):
        '''Convert the file belonging to the given URL Record.'''
        filename = url_record.filename

        if not os.path.exists(filename):
            return

        if url_record.link_type:
            if url_record.link_type not in ('css', 'html'):
                # Only CSS and HTML documents contain links to rewrite.
                return
            else:
                link_type = url_record.link_type
        else:
            # No link type was recorded; sniff the file content instead.
            with open(filename, 'rb') as in_file:
                if HTMLScraper.is_supported(
                        file=in_file, url_info=url_record.url_info):
                    link_type = 'html'
                elif CSSScraper.is_supported(
                        file=in_file, url_info=url_record.url_info):
                    link_type = 'css'
                else:
                    link_type = None

        if not link_type:
            # BUG FIX: previously an undetectable file type fell through to
            # the 'Unknown link type' exception below (after creating a
            # backup), aborting the whole batch. Skip such files instead.
            return

        _logger.info(__(
            _('Converting links in file ‘{filename}’ (type={type}).'),
            filename=filename, type=link_type
        ))

        if self._backup_enabled:
            shutil.copy2(filename, filename + '.orig')

        temp_filename = filename + '-new'

        if link_type == 'css':
            self._css_converter.convert(
                filename, temp_filename, base_url=url_record.url)
        elif link_type == 'html':
            self._html_converter.convert(
                filename, temp_filename, base_url=url_record.url)
        else:
            # Defensive guard; unreachable after the early return above.
            raise Exception('Unknown link type.')

        # Replace the original with the converted copy.
        os.remove(filename)
        os.rename(temp_filename, filename)
class HTMLConverter(HTMLScraper, BaseDocumentConverter):
    '''HTML converter.'''
    def __init__(self, html_parser, element_walker, url_table):
        super().__init__(html_parser, element_walker)
        self._url_table = url_table
        self._css_converter = CSSConverter(url_table)
        # Per-document state; populated by :meth:`convert`.
        self._out_file = None
        self._css_already_done = None
        self._base_url = None
        self._encoding = None

    def convert(self, input_filename, output_filename, base_url=None):
        '''Rewrite links in the HTML document to local filenames.

        Args:
            input_filename (str): Path of the HTML document to read.
            output_filename (str): Path where the converted document
                is written.
            base_url (str): URL of the document, used to resolve
                relative links.
        '''
        self._css_already_done = set()
        self._base_url = base_url

        # First pass: detect the document encoding from a large prefix.
        with open(input_filename, 'rb') as in_file:
            encoding = wpull.string.detect_encoding(
                in_file.peek(1048576), is_html=True
            )

        # Second pass: fetch the doctype so we can emit XHTML-style
        # self-closing tags if appropriate.
        with open(input_filename, 'rb') as in_file:
            try:
                doctype = self._html_parser.parse_doctype(in_file,
                                                          encoding=encoding)
                is_xhtml = doctype and 'XHTML' in doctype
            except AttributeError:
                # using html5lib
                is_xhtml = False
                doctype = None

        # Third pass: stream elements through and re-serialize them.
        with open(input_filename, 'rb') as in_file:
            with open(output_filename, 'wb') as bin_out_file:
                elements = self.iter_elements(in_file, encoding=encoding)
                out_file = io.TextIOWrapper(bin_out_file, encoding=encoding)

                if doctype:
                    out_file.write(doctype)
                    out_file.write('\r\n')

                self._out_file = out_file
                self._encoding = encoding

                for element in elements:
                    if isinstance(element, Comment):
                        out_file.write(
                            '<!--{0}-->'.format(element.text)
                        )
                    elif isinstance(element, Element):
                        if element.end:
                            # Void elements get no closing tag.
                            if element.tag not in empty_tags:
                                self._out_file.write('</{0}>'
                                                     .format(element.tag))

                            if element.tail:
                                self._out_file.write(element.tail)
                        else:
                            self._convert_element(element, is_xhtml=is_xhtml)
                    elif isinstance(element, Doctype):
                        # A doctype may also appear mid-stream; refresh
                        # the XHTML flag from it.
                        doctype = element.text
                        is_xhtml = doctype and 'XHTML' in doctype

                self._out_file.close()
                self._out_file = None

    def _convert_element(self, element, is_xhtml=False):
        '''Serialize a start tag, rewriting any link attributes/CSS text.'''
        self._out_file.write('<')
        self._out_file.write(element.tag)

        new_text = element.text
        unfilled_value = object()
        # Sentinel marks attributes whose value was not rewritten.
        new_attribs = dict(((name, unfilled_value) for name in element.attrib))

        for link_info in self._element_walker.iter_links_element(element):
            new_value = None

            if link_info.value_type == 'plain':
                new_value = self._convert_plain(link_info)
            elif link_info.value_type == 'css':
                if link_info.attrib:
                    new_value = self._convert_css_attrib(link_info)
                else:
                    text = self._convert_css_text(link_info)

                    if text:
                        new_text = text

            if new_value and link_info.attrib:
                # An attribute may yield several links; collect them all.
                if new_attribs[link_info.attrib] == unfilled_value:
                    new_attribs[link_info.attrib] = [new_value]
                else:
                    new_attribs[link_info.attrib].append(new_value)

        for name in new_attribs:
            if new_attribs[name] == unfilled_value:
                value = element.attrib[name]
            else:
                value = ' '.join(new_attribs[name])

            self._out_file.write(' {0}="{1}"'.format(name, value))

        if is_xhtml and element.tag in empty_tags:
            self._out_file.write('/')

        self._out_file.write('>')

        if element.tag not in empty_tags:
            if new_text:
                self._out_file.write(new_text)

    def _convert_plain(self, link_info):
        '''Return the rewritten URL for a plain link attribute.'''
        base_url = self._base_url

        if link_info.base_link:
            # A <base href> (or similar) overrides/extends the document URL.
            if self._base_url:
                base_url = wpull.url.urljoin(
                    self._base_url, link_info.base_link
                )
            else:
                base_url = link_info.base_link

        if base_url:
            url = wpull.url.urljoin(base_url, link_info.link)
        else:
            url = link_info.link

        url_info = URLInfo.parse(url, encoding=self._encoding)
        new_url = self._get_new_url(url_info)

        return new_url

    def _convert_css_attrib(self, link_info):
        '''Rewrite links inside an inline ``style`` attribute (once).'''
        done_key = (link_info.element, link_info.attrib)

        if done_key in self._css_already_done:
            return

        text = wpull.string.to_str(
            link_info.element.attrib.get(link_info.attrib)
        )
        new_value = self._css_converter.convert_text(
            text, base_url=self._base_url
        )
        self._css_already_done.add(done_key)

        return new_value

    def _convert_css_text(self, link_info):
        '''Rewrite links inside a ``<style>`` element's text (once).'''
        # BUG FIX: the original tested ``link_info.element in ...`` while
        # only ever adding ``id(link_info.element)``, so the dedup check
        # never matched and the element could be converted repeatedly.
        if id(link_info.element) in self._css_already_done:
            return

        text = wpull.string.to_str(link_info.element.text)
        new_text = self._css_converter.convert_text(
            text, base_url=self._base_url
        )
        self._css_already_done.add(id(link_info.element))

        return new_text

    def _get_new_url(self, url_info):
        '''Return the local filename for the URL if downloaded, else the URL.'''
        try:
            url_record = self._url_table.get_one(url_info.url)
        except NotFound:
            url_record = None

        if url_record \
                and url_record.status == Status.done and url_record.filename:
            new_url = url_record.filename
        else:
            new_url = url_info.url

        return new_url
class CSSConverter(CSSScraper, BaseDocumentConverter):
    '''CSS converter.'''
    def __init__(self, url_table):
        super().__init__()
        self._url_table = url_table

    def convert(self, input_filename, output_filename, base_url=None):
        '''Rewrite links in the CSS file to local filenames.

        Args:
            input_filename (str): Path of the CSS file to read.
            output_filename (str): Path where the converted file is written.
            base_url (str): URL of the document, used to resolve
                relative links.
        '''
        with open(input_filename, 'rb') as in_file, \
                open(output_filename, 'wb') as out_file:
            # Peek (without consuming) to detect the encoding; requires
            # ``wpull.util`` which this module now imports explicitly.
            encoding = wpull.string.detect_encoding(
                wpull.util.peek_file(in_file))
            out_stream = codecs.getwriter(encoding)(out_file)

            for text, is_link in self.iter_processed_text(in_file, encoding):
                if is_link:
                    out_stream.write(self.get_new_url(text, base_url))
                else:
                    out_stream.write(text)

    def convert_text(self, text, base_url=None):
        '''Return the CSS text with its links rewritten.'''
        text_list = []

        for text, is_link in self.iter_processed_text(io.StringIO(text)):
            if is_link:
                text_list.append(self.get_new_url(text, base_url))
            else:
                text_list.append(text)

        return ''.join(text_list)

    def get_new_url(self, url, base_url=None):
        '''Return the local filename for the URL if downloaded, else the URL.'''
        if base_url:
            url = wpull.url.urljoin(base_url, url)

        try:
            url_record = self._url_table.get_one(url)
        except NotFound:
            url_record = None

        if url_record \
                and url_record.status == Status.done and url_record.filename:
            new_url = url_record.filename
        else:
            new_url = url

        return new_url
# TODO: add javascript conversion