Source code for wpull.urlfilter

# encoding=utf-8
'''URL filters.'''
import abc
import fnmatch
import re

from typing import List, Iterator

from wpull.pipeline.item import URLRecord
from wpull.url import URLInfo, schemes_similar, is_subdir


class BaseURLFilter(object, metaclass=abc.ABCMeta):
    '''Base class for URL filters.

    The Processor uses filters to determine whether a URL
    should be downloaded.
    '''
    @abc.abstractmethod
    def test(self, url_info: URLInfo, url_record: URLRecord) -> bool:
        '''Return whether the URL should be downloaded.

        Args:
            url_info: URL to be tested.
            url_record: Fetch metadata about the URL.

        Returns:
            If True, the filter passed and the URL should be downloaded.
        '''
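
# Illustrative sketch (not part of the original module): a concrete filter
# only needs to subclass BaseURLFilter and implement ``test()``. The class
# name below is hypothetical, and ``url_info.query`` is assumed to exist for
# the purpose of the example.
#
#     class NoQueryStringFilter(BaseURLFilter):
#         '''Reject URLs that carry a query string.'''
#         def test(self, url_info, url_record):
#             return not url_info.query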


class DemuxURLFilter(BaseURLFilter):
    '''Puts multiple url filters into one.'''
    def __init__(self, url_filters: Iterator[BaseURLFilter]):
        self._url_filters = url_filters

    @property
    def url_filters(self) -> Iterator[BaseURLFilter]:
        return self._url_filters

    def test(self, url_info, url_table_record):
        return self.test_info(url_info, url_table_record)['verdict']

    def test_info(self, url_info, url_table_record) -> dict:
        '''Returns info about which filters passed or failed.

        Returns:
            dict: A dict containing the keys:

            * ``verdict`` (bool): Whether all the tests passed.
            * ``passed`` (set): A set of URLFilters that passed.
            * ``failed`` (set): A set of URLFilters that failed.
            * ``map`` (dict): A mapping from URLFilter class name (str) to
              the verdict (bool).
        '''
        passed = set()
        failed = set()
        test_dict = dict()

        for url_filter in self._url_filters:
            result = url_filter.test(url_info, url_table_record)
            test_dict[url_filter.__class__.__name__] = result

            if result:
                passed.add(url_filter)
            else:
                failed.add(url_filter)

        info = {
            'verdict': len(failed) == 0,
            'passed': passed,
            'failed': failed,
            'map': test_dict,
        }

        return info
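
# Illustrative usage sketch (not part of the original module), assuming a
# ``url_info`` and a ``url_record`` are already available from the pipeline:
#
#     demux = DemuxURLFilter([SchemeFilter(), TriesFilter(max_tries=3)])
#     info = demux.test_info(url_info, url_record)
#     if not info['verdict']:
#         rejected_by = [f.__class__.__name__ for f in info['failed']]
#
# ``test()`` simply returns the ``verdict`` entry, so the demux filter passes
# only when every wrapped filter passes.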


class SchemeFilter(BaseURLFilter):
    '''Allow the URL if its scheme is in the allowed list.'''
    def __init__(self, allowed=('http', 'https', 'ftp')):
        self._allowed = allowed

    def test(self, url_info, url_table_record):
        return url_info.scheme in self._allowed


class HTTPSOnlyFilter(BaseURLFilter):
    '''Allow URL if the URL is HTTPS.'''
    def test(self, url_info, url_table_record):
        return url_info.scheme == 'https'


class FollowFTPFilter(BaseURLFilter):
    '''Follow links to FTP URLs.'''
    def __init__(self, follow=False):
        self._follow = follow

    def test(self, url_info, url_table_record):
        if url_info.scheme == 'ftp':
            if url_table_record.parent_url and \
                    url_table_record.parent_url_info.scheme in ('http', 'https'):
                return self._follow
            else:
                return True
        else:
            return True


class BackwardDomainFilter(BaseURLFilter):
    '''Return whether the hostname matches a list of hostname suffixes.'''
    def __init__(self, accepted=None, rejected=None):
        self._accepted = accepted
        self._rejected = rejected

    def test(self, url_info, url_table_record):
        test_domain = url_info.hostname

        if self._accepted and not self.match(self._accepted, test_domain):
            return False

        if self._rejected and self.match(self._rejected, test_domain):
            return False

        return True

    @classmethod
    def match(cls, domain_list, test_domain):
        if not test_domain:
            return False

        for domain in domain_list:
            if test_domain.endswith(domain):
                return True
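
# Illustrative sketch (not part of the original module): matching is a plain
# string suffix test with ``str.endswith``, so ``'example.com'`` accepts both
# the bare domain and any subdomain (and, since no leading dot is required,
# also hostnames such as ``notexample.com``):
#
#     BackwardDomainFilter.match(['example.com'], 'www.example.com')  # True
#     BackwardDomainFilter.match(['example.com'], 'example.org')      # falsy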


class HostnameFilter(BaseURLFilter):
    '''Return whether the hostname matches exactly in a list.'''
    def __init__(self, accepted=None, rejected=None):
        self._accepted = accepted
        self._rejected = rejected

    def test(self, url_info, url_table_record):
        test_domain = url_info.hostname

        if self._accepted and test_domain not in self._accepted:
            return False

        if self._rejected and test_domain in self._rejected:
            return False

        return True


class RecursiveFilter(BaseURLFilter):
    '''Return ``True`` if recursion is used.'''
    def __init__(self, enabled=False, page_requisites=False):
        self._enabled = enabled
        self._page_requisites = page_requisites

    def test(self, url_info, url_table_record):
        if url_table_record.level == 0:
            return True

        if url_table_record.inline_level:
            if self._page_requisites:
                return True
        else:
            if self._enabled:
                return True


class LevelFilter(BaseURLFilter):
    '''Allow URLs up to a level of recursion.'''
    def __init__(self, max_depth, inline_max_depth=5):
        self._depth = max_depth
        self._inline_max_depth = inline_max_depth

    def test(self, url_info, url_table_record):
        if self._inline_max_depth and url_table_record.inline_level and \
                url_table_record.inline_level > self._inline_max_depth:
            return False

        if self._depth:
            if url_table_record.inline_level:
                # Allow exceeding the level to allow fetching html pages
                # with frames, for example, but no more than that
                return url_table_record.level <= self._depth + 2
            else:
                return url_table_record.level <= self._depth
        else:
            return True
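
# Illustrative sketch (not part of the original module), assuming
# ``url_table_record.level`` counts link depth and ``inline_level`` is set
# only for page requisites:
#
#     level_filter = LevelFilter(max_depth=2)
#     # level == 3, inline_level is None -> rejected (3 > 2)
#     # level == 3, inline_level == 1    -> allowed (3 <= 2 + 2)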


class TriesFilter(BaseURLFilter):
    '''Allow URLs that have been attempted up to a limit of tries.'''
    def __init__(self, max_tries):
        self._tries = max_tries

    def test(self, url_info, url_table_record):
        if self._tries:
            return url_table_record.try_count < self._tries
        else:
            return True


class ParentFilter(BaseURLFilter):
    '''Filter URLs that descend up parent paths.'''
    def test(self, url_info, url_table_record):
        if url_table_record.inline_level:
            return True

        if url_table_record.root_url:
            top_url_info = URLInfo.parse(url_table_record.root_url)
        else:
            top_url_info = url_info

        if schemes_similar(url_info.scheme, top_url_info.scheme) \
                and url_info.hostname == top_url_info.hostname \
                and (
                    url_info.scheme != top_url_info.scheme or
                    url_info.port == top_url_info.port):
            return is_subdir(top_url_info.path, url_info.path,
                             trailing_slash=True)

        return True
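
# Illustrative sketch (not part of the original module), assuming the usual
# ``is_subdir`` semantics where the first argument is the base directory.
# With a hypothetical root URL of ``http://example.com/blog/``:
#
#     parent_filter = ParentFilter()
#     # url path '/blog/post1' -> is_subdir('/blog/', '/blog/post1', ...) -> pass
#     # url path '/about'      -> climbs above the parent path           -> reject
#
# Inline (page-requisite) records always pass regardless of path.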


class SpanHostsFilter(BaseURLFilter):
    '''Filter URLs that go to other hostnames.'''
    def __init__(self, hostnames, enabled=False,
                 page_requisites=False, linked_pages=False):
        self._hostnames = hostnames
        self._enabled = enabled
        self._page_requisites = page_requisites
        self._linked_pages = linked_pages

    def test(self, url_info, url_table_record):
        if self._enabled:
            return True

        if url_info.hostname in self._hostnames:
            return True

        if self._page_requisites and url_table_record.inline_level:
            return True

        if self._linked_pages and url_table_record.parent_url_info \
                and url_table_record.parent_url_info.hostname in \
                self._hostnames:
            return True
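
# Illustrative sketch (not part of the original module); the hostnames below
# are hypothetical:
#
#     span_filter = SpanHostsFilter(['example.com'], page_requisites=True)
#     # inline image on cdn.example.net (inline_level set) -> allowed
#     # plain link to another host, not inline             -> falsy (rejected)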


class RegexFilter(BaseURLFilter):
    '''Filter URLs that match a regular expression.'''
    def __init__(self, accepted=None, rejected=None):
        self._accepted = accepted
        self._rejected = rejected

    def test(self, url_info, url_table_record):
        if self._accepted and not re.search(self._accepted, url_info.url):
            return False

        if self._rejected and re.search(self._rejected, url_info.url):
            return False

        return True


class DirectoryFilter(BaseURLFilter):
    '''Filter URLs that match a directory path part.'''
    def __init__(self, accepted=None, rejected=None):
        self._accepted = accepted
        self._rejected = rejected

    def test(self, url_info, url_table_record):
        if self._accepted and not self._is_accepted(url_info):
            return False

        if self._rejected and self._is_rejected(url_info):
            return False

        return True

    def _is_accepted(self, url_info):
        for dirname in self._accepted:
            if is_subdir(dirname, url_info.path, wildcards=True):
                return True

    def _is_rejected(self, url_info):
        for dirname in self._rejected:
            if is_subdir(dirname, url_info.path, wildcards=True):
                return True


class BackwardFilenameFilter(BaseURLFilter):
    '''Filter URLs that match the filename suffixes.'''
    def __init__(self, accepted=None, rejected=None):
        self._accepted = accepted
        self._rejected = rejected

    def test(self, url_info, url_table_record):
        test_filename = url_info.path.rsplit('/', 1)[-1]

        if not test_filename:
            return True

        if self._accepted:
            if self._rejected:
                return self.match(self._accepted, test_filename) \
                    and not self.match(self._rejected, test_filename)
            else:
                return self.match(self._accepted, test_filename)

        elif self._rejected and self.match(self._rejected, test_filename):
            return False

        return True

    @classmethod
    def match(cls, suffix_list, test_filename):
        if not test_filename:
            return False

        for suffix in suffix_list:
            match = re.search(fnmatch.translate(suffix), test_filename)

            if match:
                return True
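
# Illustrative sketch (not part of the original module): suffixes are run
# through ``fnmatch.translate``, so shell-style wildcards are honoured:
#
#     BackwardFilenameFilter.match(['*.jpg', '*.png'], 'photo.jpg')  # True
#     BackwardFilenameFilter.match(['*.jpg'], 'photo.jpeg')          # falsy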