# Source code for wpull.network.dns

# encoding=utf-8
'''DNS resolution.'''
import datetime
import enum
import itertools
import logging
import random
import socket
import functools
import asyncio

import dns.resolver
import dns.exception
import dns.rdatatype
import dns.rrset
from typing import List, Sequence, Optional, Iterable, NamedTuple

from wpull.application.plugin import PluginFunctions, hook_interface, \
    event_interface
from wpull.backport.logging import BraceMessage as __
from wpull.cache import FIFOCache
from wpull.errors import DNSNotFound, NetworkError
from wpull.application.hook import HookableMixin, HookDisconnected
import wpull.util
import wpull.application.hook


# Module-level logger; the resolver reports lookups, cache hits and resolved
# addresses through it at debug level.
_logger = logging.getLogger(__name__)


# Socket address of a resolved host.
#
# ip_address: the address in text form.
# family: the socket address family (the resolver stores socket.AF_INET or
#     socket.AF_INET6 here).
# flow_info, scope_id: filled in by the resolver only for IPv6 addresses;
#     None for every other family.
#
# NOTE(review): the tuple's type name is '_AddressInfo' although it is bound
# to the name AddressInfo, so '_AddressInfo' is what shows up in its repr.
AddressInfo = NamedTuple(
    '_AddressInfo', [
        ('ip_address', str),
        ('family', int),
        ('flow_info', Optional[int]),
        ('scope_id', Optional[int])
    ])
'''Socket address.'''

# Plain record that DNSInfo (below) subclasses.
#
# fetch_date: when the records were obtained; Resolver.resolve() stores the
#     naive datetime.utcnow() value taken just before it issues the query.
# resource_records: the record sets from the answer section of the DNS
#     response.
_DNSInfo = NamedTuple(
    '_DNSInfo', [
        ('fetch_date', datetime.datetime),
        ('resource_records', List[dns.rrset.RRset])
    ])


class DNSInfo(_DNSInfo):
    '''DNS resource records together with the date they were fetched.'''
    __slots__ = ()

    def to_text_format(self):
        '''Format the detached DNS information as text.

        Returns:
            str: The fetch date formatted as ``YYYYMMDDHHMMSS`` on the first
            line, followed by the text form of each resource record, one
            per line. The text does not end with a newline.
        '''
        lines = [self.fetch_date.strftime('%Y%m%d%H%M%S')]
        lines.extend(record.to_text() for record in self.resource_records)
        return '\n'.join(lines)
class ResolveResult(object):
    '''DNS resolution information.

    Args:
        address_infos: The resolved socket addresses. The list is stored by
            reference and is reordered in place by :meth:`shuffle` and
            :meth:`rotate`.
        dns_infos: The DNS resource records the addresses were taken from,
            if any.
    '''
    def __init__(self, address_infos: List[AddressInfo],
                 dns_infos: Optional[List[DNSInfo]]=None):
        self._address_infos = address_infos
        self._dns_infos = dns_infos

    @property
    def addresses(self) -> Sequence[AddressInfo]:
        '''The socket addresses.'''
        return self._address_infos

    @property
    def dns_infos(self) -> Optional[List[DNSInfo]]:
        '''The DNS resource records, or ``None`` if none were supplied.'''
        return self._dns_infos

    @property
    def first_ipv4(self) -> Optional[AddressInfo]:
        '''The first IPv4 address, or ``None`` if there is none.'''
        return next(
            (info for info in self._address_infos
             if info.family == socket.AF_INET),
            None)

    @property
    def first_ipv6(self) -> Optional[AddressInfo]:
        '''The first IPv6 address, or ``None`` if there is none.'''
        return next(
            (info for info in self._address_infos
             if info.family == socket.AF_INET6),
            None)

    def shuffle(self):
        '''Shuffle the addresses in place.'''
        random.shuffle(self._address_infos)

    def rotate(self):
        '''Move the first address to the last position.

        Does nothing when there are no addresses.
        '''
        # Popping from an empty list raises IndexError; an empty result has
        # nothing to rotate, so treat it as a no-op instead.
        if not self._address_infos:
            return

        item = self._address_infos.pop(0)
        self._address_infos.append(item)
@enum.unique
class IPFamilyPreference(enum.Enum):
    '''IPv4 and IPv6 preferences used when resolving a hostname.'''

    # No restriction on the address family.
    any = 'any'

    # The restricted preferences use the socket module's address family
    # constants as their values.
    ipv4_only = socket.AF_INET
    ipv6_only = socket.AF_INET6
class Resolver(HookableMixin):
    '''Asynchronous resolver with cache and timeout.

    Args:
        family: IPv4 or IPv6 preference.
        timeout: A time in seconds used for timing-out requests. If not
            specified, this class relies on the underlying libraries.
        bind_address: An IP address to bind DNS requests if possible.
        cache: Cache to store results of any query. ``None`` disables
            caching.
        rotate: If result is cached rotates the results, otherwise, shuffle
            the results.
    '''
    def __init__(
            self,
            family: IPFamilyPreference=IPFamilyPreference.any,
            timeout: Optional[float]=None,
            bind_address: Optional[str]=None,
            cache: Optional[FIFOCache]=None,
            rotate: bool=False):
        super().__init__()
        assert family in IPFamilyPreference, \
            'Unknown family {}.'.format(family)

        self._family = family
        self._timeout = timeout
        self._bind_address = bind_address
        self._cache = cache
        self._rotate = rotate

        self._dns_resolver = dns.resolver.Resolver()

        # When set to False, resolve() skips the dnspython queries and goes
        # straight to the system resolver.
        self.dns_python_enabled = True

        if timeout:
            self._dns_resolver.timeout = timeout

        self.hook_dispatcher.register(PluginFunctions.resolve_dns)
        self.event_dispatcher.register(PluginFunctions.resolve_dns_result)

    @classmethod
    def new_cache(cls) -> FIFOCache:
        '''Return a default cache'''
        return FIFOCache(max_items=100, time_to_live=3600)

    @asyncio.coroutine
    def resolve(self, host: str) -> ResolveResult:
        '''Resolve hostname.

        The ``resolve_dns`` hook may substitute the hostname first. A cached
        result is returned if there is one; otherwise DNS is queried once
        per allowed address family, and if that yields no address the system
        resolver is asked instead. The ``resolve_dns_result`` event is
        notified for every freshly resolved result.

        Args:
            host: Hostname.

        Returns:
            Resolved IP addresses.

        Raises:
            DNSNotFound if the hostname could not be resolved or
            NetworkError if there was an error connecting to DNS servers.

        Coroutine.
        '''
        _logger.debug(__('Lookup address {0}.', host))

        try:
            host = self.hook_dispatcher.call(
                PluginFunctions.resolve_dns, host) or host
        except HookDisconnected:
            pass

        cache_key = (host, self._family)

        # Compare against None instead of relying on truthiness: a cache
        # that is a sized container is falsy while it is empty, so a
        # truthiness test would never let the first entry be stored.
        use_cache = self._cache is not None

        if use_cache and cache_key in self._cache:
            resolve_result = self._cache[cache_key]
            _logger.debug(__('Return by cache {0}.', resolve_result))

            if self._rotate:
                resolve_result.rotate()

            return resolve_result

        address_infos = []
        dns_infos = []

        if not self.dns_python_enabled:
            families = ()
        elif self._family == IPFamilyPreference.any:
            families = (socket.AF_INET, socket.AF_INET6)
        elif self._family == IPFamilyPreference.ipv4_only:
            families = (socket.AF_INET, )
        else:
            families = (socket.AF_INET6, )

        for family in families:
            datetime_now = datetime.datetime.utcnow()

            try:
                answer = yield from self._query_dns(host, family)
            except DNSNotFound:
                # No record for this family; another family or the system
                # resolver may still succeed.
                continue
            else:
                dns_infos.append(DNSInfo(datetime_now, answer.response.answer))
                address_infos.extend(self._convert_dns_answer(answer))

        if not address_infos:
            # Maybe the address is defined in hosts file or mDNS

            if self._family == IPFamilyPreference.any:
                family = socket.AF_UNSPEC
            elif self._family == IPFamilyPreference.ipv4_only:
                family = socket.AF_INET
            else:
                family = socket.AF_INET6

            results = yield from self._getaddrinfo(host, family)
            address_infos.extend(self._convert_addrinfo(results))

        _logger.debug(__('Resolved addresses: {0}.', address_infos))

        resolve_result = ResolveResult(address_infos, dns_infos)

        if use_cache:
            self._cache[cache_key] = resolve_result

        self.event_dispatcher.notify(
            PluginFunctions.resolve_dns_result, host, resolve_result)

        if self._rotate:
            resolve_result.shuffle()

        return resolve_result

    @asyncio.coroutine
    def _query_dns(self, host: str, family: int=socket.AF_INET) \
            -> dns.resolver.Answer:
        '''Query DNS using Python.

        The blocking dnspython query is run in the event loop's default
        executor.

        Raises:
            DNSNotFound if the name does not exist or has no record of the
            requested type, NetworkError for any other dnspython error.

        Coroutine.
        '''
        record_type = {socket.AF_INET: 'A', socket.AF_INET6: 'AAAA'}[family]

        event_loop = asyncio.get_event_loop()
        query = functools.partial(
            self._dns_resolver.query, host, record_type,
            source=self._bind_address)

        try:
            answer = yield from event_loop.run_in_executor(None, query)
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as error:
            # dnspython doesn't raise an instance with a message, so use the
            # class name instead.
            raise DNSNotFound(
                'DNS resolution failed: {error}'
                .format(error=wpull.util.get_exception_message(error))
            ) from error
        except dns.exception.DNSException as error:
            raise NetworkError(
                'DNS resolution error: {error}'
                .format(error=wpull.util.get_exception_message(error))
            ) from error
        else:
            return answer

    @asyncio.coroutine
    def _getaddrinfo(self, host: str, family: int=socket.AF_UNSPEC) \
            -> List[tuple]:
        '''Query DNS using system resolver.

        Raises:
            DNSNotFound if the system resolver reports that the name could
            not be resolved, NetworkError for any other socket error or
            when the configured timeout elapses.

        Coroutine.
        '''
        event_loop = asyncio.get_event_loop()
        query = event_loop.getaddrinfo(host, 0, family=family,
                                       proto=socket.IPPROTO_TCP)

        if self._timeout:
            query = asyncio.wait_for(query, self._timeout)

        try:
            results = yield from query
        except socket.error as error:
            # Look the error codes up defensively: the socket module only
            # exposes the EAI_* constants the platform defines (obsolete
            # ones such as EAI_NODATA may be absent), and an AttributeError
            # raised here would mask the real resolution error.
            not_found_codes = tuple(
                getattr(socket, name)
                for name in ('EAI_FAIL', 'EAI_NODATA', 'EAI_NONAME')
                if hasattr(socket, name))

            if error.errno in not_found_codes:
                raise DNSNotFound(
                    'DNS resolution failed: {error}'.format(error=error)
                ) from error
            else:
                raise NetworkError(
                    'DNS resolution error: {error}'.format(error=error)
                ) from error
        except asyncio.TimeoutError as error:
            raise NetworkError('DNS resolve timed out.') from error
        else:
            return results

    @classmethod
    def _convert_dns_answer(cls, answer: dns.resolver.Answer) \
            -> Iterable[AddressInfo]:
        '''Convert the DNS answer to address info.

        This is a generator: nothing, including the record type check, runs
        until it is iterated.
        '''
        assert answer.rdtype in (dns.rdatatype.A, dns.rdatatype.AAAA)

        if answer.rdtype == dns.rdatatype.A:
            family = socket.AF_INET
        else:
            family = socket.AF_INET6

        for record in answer:
            ip_address = record.to_text()

            if family == socket.AF_INET6:
                flow_info, control_id = cls._get_ipv6_info(ip_address)
            else:
                flow_info = control_id = None

            yield AddressInfo(ip_address, family, flow_info, control_id)

    @classmethod
    def _convert_addrinfo(cls, results: List[tuple]) -> Iterable[AddressInfo]:
        '''Convert the result list to address info.

        Each result is read as a ``getaddrinfo`` style tuple: the family is
        item 0 and the socket address is item 4.
        '''
        for result in results:
            family = result[0]
            address = result[4]
            ip_address = address[0]

            if family == socket.AF_INET6:
                flow_info = address[2]
                control_id = address[3]
            else:
                flow_info = None
                control_id = None

            yield AddressInfo(ip_address, family, flow_info, control_id)

    @classmethod
    def _get_ipv6_info(cls, ip_address: str) -> tuple:
        '''Extract the flow info and control id.

        The address is passed through ``socket.getaddrinfo`` with
        ``AI_NUMERICHOST`` and the two values are read from the first
        returned socket address.
        '''
        results = socket.getaddrinfo(
            ip_address, 0, proto=socket.IPPROTO_TCP,
            flags=socket.AI_NUMERICHOST)

        flow_info = results[0][4][2]
        control_id = results[0][4][3]

        return flow_info, control_id

    @staticmethod
    @hook_interface(PluginFunctions.resolve_dns)
    def resolve_dns(host: str) -> str:
        '''Resolve the hostname to an IP address.

        Args:
            host: The hostname.

        This callback is to override the DNS lookup.

        It is useful when the server is no longer available to the public.
        Typically, large infrastructures will change the DNS settings to
        make clients no longer hit the front-ends, but rather go towards
        a static HTTP server with a "We've been acqui-hired!" page. In these
        cases, the original servers may still be online.

        Returns:
            str, None: ``None`` to use the original behavior or a string
            containing an IP address or an alternate hostname.
        '''
        return host

    @staticmethod
    @event_interface(PluginFunctions.resolve_dns_result)
    def resolve_dns_result(host: str, result: ResolveResult):
        '''Callback when a DNS resolution has been made.'''