'''PhantomJS page loading and scrolling.'''
import contextlib
import copy
import gettext
import json
import logging
import os
import tempfile
import io
import namedlist
import asyncio
from typing import Callable
from wpull.backport.logging import BraceMessage as __
from wpull.document.html import HTMLReader
from wpull.body import Body
from wpull.driver.phantomjs import PhantomJSDriverParams, PhantomJSDriver
from wpull.namevalue import NameValueRecord
from wpull.pipeline.session import ItemSession
from wpull.processor.rule import ProcessingRule
from wpull.warc.format import WARCRecord
import wpull.url
PhantomJSParams = namedlist.namedtuple(
'PhantomJSParamsType', [
('snapshot_types', ('html', 'pdf')),
('wait_time', 1),
('num_scrolls', 10),
('smart_scroll', True),
('snapshot', True),
('viewport_size', (1200, 1920)),
('paper_size', (2400, 3840)),
('load_time', 900),
('custom_headers', {}),
('page_settings', {}),
]
)
'''PhantomJS parameters
Attributes:
snapshot_type (list): File types. Accepted are html, pdf, png, gif.
wait_time (float): Time between page scrolls.
num_scrolls (int): Maximum number of scrolls.
smart_scroll (bool): Whether to stop scrolling if number of
requests & responses do not change.
snapshot (bool): Whether to take snapshot files.
viewport_size (tuple): Width and height of the page viewport.
paper_size (tuple): Width and height of the paper size.
load_time (float): Maximum time to wait for page load.
custom_headers (dict): Default HTTP headers.
page_settings (dict): Page settings.
'''
_logger = logging.getLogger(__name__)
_ = gettext.gettext
class PhantomJSCrashed(Exception):
    '''Raised when the PhantomJS process finishes with a non-zero exit code.'''
class PhantomJSCoprocessor(object):
    '''PhantomJS coprocessor.

    Runs a PhantomJS session for each successfully fetched HTML page,
    retrying when PhantomJS crashes.

    Args:
        phantomjs_driver_factory: Callback function that accepts a
            ``params`` argument and returns a PhantomJSDriver.
        processing_rule: Processing rule.
        phantomjs_params: PhantomJS parameters (:class:`PhantomJSParams`).
        warc_recorder: WARC recorder, or None to skip WARC output.
        root_path (str): Root directory path for temp files.
    '''
    def __init__(self, phantomjs_driver_factory: Callable[..., PhantomJSDriver],
                 processing_rule: ProcessingRule,
                 phantomjs_params: PhantomJSParams,
                 warc_recorder=None, root_path='.'):
        self._phantomjs_driver_factory = phantomjs_driver_factory
        self._processing_rule = processing_rule
        self._phantomjs_params = phantomjs_params
        self._warc_recorder = warc_recorder
        self._root_path = root_path
        # File writer session of the most recent process() call. Kept for
        # backward compatibility only; process() passes its own session
        # explicitly to _run_driver().
        self._file_writer_session = None

    @asyncio.coroutine
    def process(self, item_session: ItemSession, request, response,
                file_writer_session):
        '''Process PhantomJS.

        Coroutine.

        Only responses with status code 200 that HTMLReader supports are
        processed. PhantomJS is attempted up to ``WPULL_PHANTOMJS_TRIES``
        times in total (environment variable, default 5) when it crashes.
        A page-load timeout stops further attempts immediately.
        '''
        if response.status_code != 200:
            return

        if not HTMLReader.is_supported(request=request, response=response):
            return

        _logger.debug('Starting PhantomJS processing.')

        self._file_writer_session = file_writer_session

        # FIXME: this is a quick hack for crashes. See #137.
        attempts = int(os.environ.get('WPULL_PHANTOMJS_TRIES', 5))

        for dummy in range(attempts):
            try:
                # Pass the session explicitly: this coprocessor instance is
                # shared, and another process() call may overwrite
                # self._file_writer_session while this one is suspended.
                yield from self._run_driver(
                    item_session, request, response,
                    file_writer_session=file_writer_session
                )
            except asyncio.TimeoutError:
                _logger.warning(_('Waiting for page load timed out.'))
                break
            except PhantomJSCrashed as error:
                _logger.exception(__('PhantomJS crashed: {}', error))
            else:
                break
        else:
            # Every attempt crashed (or attempts <= 0).
            _logger.warning(__(
                _('PhantomJS failed to fetch ‘{url}’. I am sorry.'),
                url=request.url_info.url
            ))

    @asyncio.coroutine
    def _run_driver(self, item_session: ItemSession, request, response,
                    file_writer_session=None):
        '''Run one PhantomJS session for the item.

        Coroutine.

        Args:
            file_writer_session: File writer session for snapshot paths.
                Defaults to the one stored by the latest :meth:`process`
                call.
        '''
        if file_writer_session is None:
            file_writer_session = self._file_writer_session

        _logger.debug('Started PhantomJS processing.')

        session = PhantomJSCoprocessorSession(
            self._phantomjs_driver_factory, self._root_path,
            self._processing_rule, file_writer_session,
            request, response,
            item_session, self._phantomjs_params, self._warc_recorder
        )

        # close() removes the session's temporary files even on failure.
        with contextlib.closing(session):
            yield from session.run()

        _logger.debug('Ended PhantomJS processing.')
class PhantomJSCoprocessorSession(object):
    '''PhantomJS coprocessor session.

    Runs PhantomJS once for a single item and, when a WARC recorder is
    given, records the action log and snapshot files. Temporary files
    created by the session are removed by :meth:`close`.
    '''

    # WARC content types for snapshot files, keyed by lower-case extension.
    _SNAPSHOT_CONTENT_TYPES = {
        '.pdf': 'application/pdf',
        '.html': 'text/html',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
    }
    # Used for snapshot extensions missing from the table above.
    _FALLBACK_CONTENT_TYPE = 'application/octet-stream'

    def __init__(self, phantomjs_driver_factory, root_path,
                 processing_rule, file_writer_session,
                 request, response,
                 item_session: ItemSession, params, warc_recorder):
        self._phantomjs_driver_factory = phantomjs_driver_factory
        self._root_path = root_path
        self._processing_rule = processing_rule
        self._file_writer_session = file_writer_session
        self._request = request
        self._response = response
        self._item_session = item_session
        self._params = params
        self._warc_recorder = warc_recorder
        # Temporary paths deleted by close().
        self._temp_filenames = []
        # Set by _add_warc_action_log(); snapshot records point at its ID.
        self._action_warc_record = None

    @asyncio.coroutine
    def run(self):
        '''Run PhantomJS on the item URL and record the results.

        Coroutine.

        Raises:
            asyncio.TimeoutError: PhantomJS did not exit within
                ``params.load_time`` seconds (only when load_time is set).
            PhantomJSCrashed: PhantomJS exited with a non-zero code.
        '''
        scrape_snapshot_path = self._get_temp_path('phantom', suffix='.html')
        action_log_path = self._get_temp_path('phantom-action', suffix='.txt')
        event_log_path = self._get_temp_path('phantom-event', suffix='.txt')

        # An HTML temp snapshot always comes first, followed by the
        # configured snapshot types.
        snapshot_paths = [scrape_snapshot_path]
        snapshot_paths.extend(self._get_snapshot_paths())
        url = self._item_session.url_record.url

        driver_params = PhantomJSDriverParams(
            url=url,
            snapshot_paths=snapshot_paths,
            wait_time=self._params.wait_time,
            num_scrolls=self._params.num_scrolls,
            smart_scroll=self._params.smart_scroll,
            snapshot=self._params.snapshot,
            viewport_size=self._params.viewport_size,
            paper_size=self._params.paper_size,
            event_log_filename=event_log_path,
            action_log_filename=action_log_path,
            custom_headers=self._params.custom_headers,
            page_settings=self._params.page_settings,
        )

        driver = self._phantomjs_driver_factory(params=driver_params)

        _logger.info(__(
            _('PhantomJS fetching ‘{url}’.'),
            url=url
        ))

        with contextlib.closing(driver):
            yield from driver.start()

            # FIXME: we don't account that things might be scrolling and
            # downloading so it might not be a good idea to timeout like
            # this
            if self._params.load_time:
                yield from asyncio.wait_for(
                    driver.process.wait(), self._params.load_time
                )
            else:
                yield from driver.process.wait()

            if driver.process.returncode != 0:
                raise PhantomJSCrashed(
                    'PhantomJS exited with code {}'
                    .format(driver.process.returncode)
                )

        if self._warc_recorder:
            # Write the action log first so snapshot records can reference it.
            self._add_warc_action_log(action_log_path, url)

            for path in snapshot_paths:
                self._add_warc_snapshot(path, url)

        _logger.info(__(
            _('PhantomJS fetched ‘{url}’.'),
            url=url
        ))

    def _get_temp_path(self, hint, suffix='.tmp'):
        '''Create an empty temp file under the root path and return its path.

        The path is registered for removal by :meth:`close`.
        '''
        temp_fd, temp_path = tempfile.mkstemp(
            dir=self._root_path, prefix='tmp-wpull-{}'.format(hint),
            suffix=suffix
        )
        os.close(temp_fd)
        self._temp_filenames.append(temp_path)
        return temp_path

    def _get_snapshot_paths(self, infix='snapshot'):
        '''Yield one output path per configured snapshot type.

        The file writer session supplies the path. When it returns none, a
        temp file is created instead and registered for removal by
        :meth:`close`.
        '''
        for snapshot_type in self._params.snapshot_types or ():
            path = self._file_writer_session.extra_resource_path(
                '.{infix}.{file_type}'
                .format(infix=infix, file_type=snapshot_type)
            )

            if not path:
                temp_fd, temp_path = tempfile.mkstemp(
                    dir=self._root_path, prefix='tmp-phnsh',
                    suffix='.{}'.format(snapshot_type)
                )
                os.close(temp_fd)
                path = temp_path
                self._temp_filenames.append(temp_path)

            yield path

    def _add_warc_action_log(self, path, url):
        '''Add the action log to the WARC file as a JSON metadata record.

        The log file holds one JSON document per line. Blank lines are
        skipped.
        '''
        _logger.debug('Adding action log record.')

        actions = []
        with open(path, 'r', encoding='utf-8', errors='replace') as file:
            for line in file:
                if not line.strip():
                    continue
                actions.append(json.loads(line))

        log_data = json.dumps(
            {'actions': actions},
            indent=4,
        ).encode('utf-8')

        self._action_warc_record = record = WARCRecord()
        record.set_common_fields('metadata', 'application/json')
        record.fields['WARC-Target-URI'] = 'urn:X-wpull:snapshot?url={0}' \
            .format(wpull.url.percent_encode_query_value(url))
        record.block_file = io.BytesIO(log_data)

        self._warc_recorder.set_length_and_maybe_checksums(record)
        self._warc_recorder.write_record(record)

    def _add_warc_snapshot(self, filename, url):
        '''Add the snapshot file to the WARC file as a resource record.

        The content type is chosen from the file extension, ignoring case.
        Unknown extensions are recorded as ``application/octet-stream``
        instead of aborting the whole session.
        '''
        _logger.debug('Adding snapshot record.')

        extension = os.path.splitext(filename)[1].lower()
        content_type = self._SNAPSHOT_CONTENT_TYPES.get(extension)

        if content_type is None:
            content_type = self._FALLBACK_CONTENT_TYPE
            _logger.warning(__(
                'Unknown snapshot file extension {!r}; recording as {}.',
                extension, content_type
            ))

        record = WARCRecord()
        record.set_common_fields('resource', content_type)
        record.fields['WARC-Target-URI'] = 'urn:X-wpull:snapshot?url={0}' \
            .format(wpull.url.percent_encode_query_value(url))

        if self._action_warc_record:
            record.fields['WARC-Concurrent-To'] = \
                self._action_warc_record.fields[WARCRecord.WARC_RECORD_ID]

        with open(filename, 'rb') as in_file:
            record.block_file = in_file

            self._warc_recorder.set_length_and_maybe_checksums(record)
            self._warc_recorder.write_record(record)

    def _scrape_document(self, snapshot_path=None):
        '''Extract links from the DOM snapshot via the processing rule.

        The item session's request/response are replaced with the original
        request and a mock response whose body is the snapshot file.

        Args:
            snapshot_path (str): HTML file to scrape. If omitted, a new
                (empty) temp file is used, as in the original behavior.

        NOTE(review): this method is not called anywhere in the visible
        code; the HTML snapshot written during run() is presumably the
        intended input — confirm before wiring it in.
        '''
        if snapshot_path is None:
            snapshot_path = self._get_temp_path('phantom', '.html')

        mock_response = self._new_mock_response(self._response, snapshot_path)

        self._item_session.request = self._request
        self._item_session.response = mock_response

        try:
            self._processing_rule.scrape_document(self._item_session)
        finally:
            # Release the snapshot file handle even if scraping fails.
            if mock_response.body:
                mock_response.body.close()

    def _new_mock_response(self, response, file_path):
        '''Return a shallow copy of ``response`` backed by ``file_path``.

        The header fields are copied into a new record, so the original
        response's fields object is left unchanged, and Content-Type is set
        to UTF-8 HTML.
        '''
        mock_response = copy.copy(response)

        mock_response.body = Body(open(file_path, 'rb'))
        mock_response.fields = NameValueRecord()

        for name, value in response.fields.get_all():
            mock_response.fields.add(name, value)

        mock_response.fields['Content-Type'] = 'text/html; charset="utf-8"'

        return mock_response

    def close(self):
        '''Clean up: remove the temporary files created by this session.

        Files that no longer exist are ignored.
        '''
        for path in self._temp_filenames:
            with contextlib.suppress(FileNotFoundError):
                os.remove(path)