Source code for wpull.pipeline.session

import asyncio
import gettext
import logging

from typing import Optional

from wpull.database.base import AddURLInfo, NotFound
from wpull.pipeline.app import AppSession
from wpull.pipeline.item import URLRecord, Status, URLResult, URLProperties, \
    URLData, LinkType
from wpull.pipeline.pipeline import ItemSource
from wpull.backport.logging import BraceMessage as __
from wpull.protocol.abstract.request import URLPropertyMixin, \
    ProtocolResponseMixin, BaseResponse, BaseRequest
from wpull.url import parse_url_or_log

_logger = logging.getLogger(__name__)
_ = gettext.gettext


class ItemSession(object):
    '''Item for a URL that needs to be processed.

    An item session wraps one checked-out :class:`URLRecord` together with
    the application session. It records the outcome of processing the URL
    in the URL table and queues newly discovered URLs, which are written to
    the table in batches.

    Args:
        app_session: The application session. Its ``factory['URLTable']``
            entry is used for every URL table operation.
        url_record: The record of the URL being processed.
    '''

    # Number of queued URLs at which :meth:`add_url` writes the pending
    # batch to the URL table.
    _ADD_URL_BATCH_SIZE = 1000

    def __init__(self, app_session: AppSession, url_record: URLRecord):
        self.app_session = app_session
        self.url_record = url_record
        self._processed = False
        self._try_count_incremented = False
        self._add_url_batch = []
        self._request = None
        self._response = None

    @property
    def _url_table(self):
        '''Return the URL table from the application session factory.'''
        return self.app_session.factory['URLTable']

    @property
    def is_virtual(self) -> bool:
        '''Return whether the item is virtual. Always False here.'''
        return False

    @property
    def is_processed(self) -> bool:
        '''Return whether the item has been processed.'''
        return self._processed

    @property
    def request(self) -> BaseRequest:
        '''The request attached to this item (None until one is set).'''
        return self._request

    @request.setter
    def request(self, request: BaseRequest):
        self._request = request

    @property
    def response(self) -> BaseResponse:
        '''The response attached to this item (None until one is set).'''
        return self._response

    @response.setter
    def response(self, response: BaseResponse):
        self._response = response

    def skip(self):
        '''Mark the item as processed without download.

        The URL is checked in to the URL table with ``Status.skipped``.
        '''
        _logger.debug(__(_('Skipping ‘{url}’.'), url=self.url_record.url))
        self._url_table.check_in(self.url_record.url, Status.skipped)

        self._processed = True

    def set_status(self, status: Status, increment_try_count: bool=True,
                   filename: Optional[str]=None):
        '''Mark the item with the given status.

        Args:
            status: a value from :class:`Status`.
            increment_try_count: if True, increment the ``try_count``
                value
            filename: value stored on the :class:`URLResult` that is
                checked in along with the status.

        The method asserts that no earlier call on this item has already
        incremented the try count.
        '''
        url = self.url_record.url
        assert not self._try_count_incremented, (url, status)

        if increment_try_count:
            self._try_count_incremented = True

        _logger.debug(__('Marking URL {0} status {1}.', url, status))

        url_result = URLResult()
        url_result.filename = filename

        self._url_table.check_in(
            url,
            status,
            increment_try_count=increment_try_count,
            url_result=url_result,
        )

        self._processed = True

    def add_url(self, url: str,
                url_properites: Optional[URLProperties]=None,
                url_data: Optional[URLData]=None):
        '''Queue a URL to be added to the URL table.

        Args:
            url: The URL to add. If :func:`parse_url_or_log` returns a
                falsy value for it, the URL is silently dropped here.
            url_properites: Properties for the URL; a fresh
                :class:`URLProperties` is used when omitted. (The
                misspelled parameter name is kept so that existing
                keyword callers continue to work.)
            url_data: Data for the URL; a fresh :class:`URLData` is used
                when omitted.

        URLs are accumulated and written to the table once the pending
        batch reaches ``_ADD_URL_BATCH_SIZE`` entries. Call :meth:`finish`
        to write whatever remains.
        '''
        url_info = parse_url_or_log(url)

        if not url_info:
            return

        url_properties = url_properites or URLProperties()
        url_data = url_data or URLData()

        self._add_url_batch.append(
            AddURLInfo(url, url_properties, url_data)
        )

        if len(self._add_url_batch) >= self._ADD_URL_BATCH_SIZE:
            self._flush_add_url_batch()

    def add_child_url(self, url: str, inline: bool=False,
                      link_type: Optional[LinkType]=None,
                      post_data: Optional[str]=None,
                      level: Optional[int]=None,
                      replace: bool=False):
        '''Add links scraped from the document with automatic values.

        Args:
            url: A full URL. (It can't be a relative path.)
            inline: Whether the URL is an embedded object.
            link_type: Expected link type.
            post_data: URL encoded form data. The request will be made
                using POST. (Don't use this to upload files.)
            level: The child depth of this URL.
            replace: Whether to replace the existing entry in the database
                table so it will be redownloaded again.

        This function provides values automatically for:

        * ``inline``
        * ``level``
        * ``parent``: The referring page.
        * ``root``

        See also :meth:`add_url`.
        '''
        parent = self.url_record

        url_properties = URLProperties()

        if level is None:
            url_properties.level = parent.level + 1
        else:
            url_properties.level = level

        # Non-inline children are stored with ``None`` here, whereas
        # child_url_record() uses ``0`` for the same case.
        if inline:
            url_properties.inline_level = (parent.inline_level or 0) + 1
        else:
            url_properties.inline_level = None

        url_properties.parent_url = parent.url
        url_properties.root_url = parent.root_url or parent.url
        url_properties.link_type = link_type

        url_data = URLData()
        url_data.post_data = post_data

        if replace:
            # The removal is applied to the table immediately, while the
            # re-add below goes through the batched add_url() path.
            self._url_table.remove_many([url])

        self.add_url(url, url_properties, url_data)

    def child_url_record(self, url: str, inline: bool=False,
                         link_type: Optional[LinkType]=None,
                         post_data: Optional[str]=None,
                         level: Optional[int]=None):
        '''Return a child URLRecord.

        This function is useful for testing filters before adding to table.

        The arguments have the same meaning as in :meth:`add_child_url`.
        The returned record has status ``Status.todo`` and a try count of
        zero; nothing is written to the URL table.
        '''
        parent = self.url_record

        url_record = URLRecord()
        url_record.url = url
        url_record.status = Status.todo
        url_record.try_count = 0

        if level is None:
            url_record.level = parent.level + 1
        else:
            url_record.level = level

        url_record.root_url = parent.root_url or parent.url
        url_record.parent_url = parent.url

        # Non-inline children get ``0`` here, whereas add_child_url()
        # stores ``None`` for the same case.
        if inline:
            url_record.inline_level = (parent.inline_level or 0) + 1
        else:
            url_record.inline_level = 0

        url_record.link_type = link_type
        url_record.post_data = post_data

        return url_record

    def finish(self):
        '''Write any URLs still queued by :meth:`add_url` to the table.'''
        self._flush_add_url_batch()

    def update_record_value(self, **kwargs):
        '''Update fields of the URL record in the table and in memory.

        Args:
            **kwargs: Field names and their new values. They are passed to
                the URL table's ``update_one`` for this item's URL and then
                set as attributes on :attr:`url_record`.
        '''
        self._url_table.update_one(self.url_record.url, **kwargs)

        for key, value in kwargs.items():
            setattr(self.url_record, key, value)

    def _flush_add_url_batch(self):
        '''Write the pending batch of URLs to the table and empty it.'''
        if not self._add_url_batch:
            # Nothing was queued; avoid a pointless table operation.
            return

        self._url_table.add_many(self._add_url_batch)
        self._add_url_batch.clear()
class URLItemSource(ItemSource[ItemSession]):
    '''Item source that produces item sessions from the URL table.

    Args:
        app_session: The application session. Its ``factory['URLTable']``
            entry is the table that records are checked out from.
    '''

    def __init__(self, app_session: AppSession):
        self._app_session = app_session

    @asyncio.coroutine
    def get_item(self) -> Optional[ItemSession]:
        '''Check out the next URL record and wrap it in an item session.

        Records with ``Status.todo`` are tried first; if the table raises
        :class:`NotFound` for those, records with ``Status.error`` are
        tried.

        Returns:
            A new :class:`ItemSession`, or ``None`` when the table raises
            :class:`NotFound` for both statuses.
        '''
        # Statuses in the order in which they are attempted.
        for status in (Status.todo, Status.error):
            try:
                checked_out = \
                    self._app_session.factory['URLTable'].check_out(status)
            except NotFound:
                continue
            else:
                return ItemSession(self._app_session, checked_out)

        return None