Source code for wpull.pipeline.progress

import datetime
import enum
import gettext
import logging
import sys
import time
import itertools
from typing import IO, cast
import http.client
import re

from wpull.application.hook import HookableMixin
from wpull.network.bandwidth import BandwidthMeter
import wpull.string
from wpull.protocol.abstract.request import BaseRequest, BaseResponse
from wpull.protocol.http.request import Response as HTTPResponse
from wpull.protocol.ftp.request import Response as FTPResponse

_ = gettext.gettext


[docs]class Measurement(enum.Enum): integer = 'integer' bytes = 'bytes'
[docs]class Progress(HookableMixin): '''Print file download progress as dots or a bar. Args: bar_style (bool): If True, print as a progress bar. If False, print dots every few seconds. stream: A file object. Default is usually stderr. human_format (true): If True, format sizes in units. Otherwise, output bits only. ''' def __init__(self, stream: IO[str]=sys.stderr): super().__init__() self._stream = stream self.min_value = 0 self.max_value = None self.current_value = 0 self.continue_value = None self.measurement = Measurement.integer self.event_dispatcher.register('update')
[docs] def update(self): self.event_dispatcher.notify('update', self)
[docs] def reset(self): self.min_value = 0 self.max_value = None self.current_value = 0 self.continue_value = None self.measurement = Measurement.integer
[docs]class ProtocolProgress(Progress):
[docs] class State(enum.Enum): idle = 'idle' sending_request = 'sending_request' sending_body = 'sending_body' receiving_response = 'receiving_response' receiving_body = 'receiving_body'
def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._state = self.State.sending_request
[docs] def update_from_begin_request(self, request: BaseRequest): self._state = self.State.sending_request self.reset() self.measurement = Measurement.bytes
[docs] def update_from_begin_response(self, response: BaseResponse): self._state = self.State.receiving_body self._process_response_sizes(response)
[docs] def update_from_end_response(self, response: BaseResponse): self._state = self.State.idle
def _process_response_sizes(self, response: BaseResponse): if hasattr(response, 'fields'): content_length = response.fields.get('Content-Length') elif hasattr(response, 'file_transfer_size'): content_length = response.file_transfer_size else: content_length = None if content_length: try: self.max_value = int(content_length) except ValueError: pass if response.protocol == 'http': response = cast(HTTPResponse, response) if not response.status_code == http.client.PARTIAL_CONTENT: return match = re.search( r'bytes +([0-9]+)-([0-9]+)/([0-9]+)', response.fields.get('Content-Range', '') ) if match: self.continue_value = int(match.group(1)) self.max_value = int(match.group(3)) elif response.protocol == 'ftp': response = cast(FTPResponse, response) if response.restart_value: self.continue_value = response.restart_value
[docs] def update_with_data(self, data): if self._state == self.State.receiving_body: self.current_value += len(data) self.update()
[docs]class ProgressPrinter(ProtocolProgress): def _print(self, *args): '''Convenience function for the print function. This function prints no newline. ''' string = ' '.join([str(arg) for arg in args]) print(string, end='', file=self._stream) def _println(self, *args): '''Convenience function for the print function.''' string = ' '.join([str(arg) for arg in args]) print(string, file=self._stream) def _flush(self): '''Flush the print stream.''' self._stream.flush()
[docs] def update_from_end_response(self, response: BaseResponse): super().update_from_end_response(response) self._println()
[docs]class DotProgress(ProgressPrinter): def __init__(self, *args, draw_interval: float=2.0, **kwargs): super().__init__(*args, **kwargs) self._draw_interval = draw_interval self._last_draw_time = 0
[docs] def update(self): super().update() if self._state != self.State.receiving_body: return time_now = time.time() if time_now - self._last_draw_time > self._draw_interval: self._print_dots() self._flush() self._last_draw_time = time_now
def _print_dots(self): '''Print a dot.''' self._print('.')
[docs]class BarProgress(ProgressPrinter): def __init__(self, *args, draw_interval: float=0.5, bar_width: int=25, human_format: bool=True, **kwargs): super().__init__(*args, **kwargs) self._draw_interval = draw_interval self._bar_width = bar_width self._human_format = human_format self._throbber_index = 0 self._throbber_iter = itertools.cycle( itertools.chain( range(bar_width), reversed(range(1, bar_width - 1)) )) self._bandwidth_meter = BandwidthMeter() self._previous_value = 0 self._last_draw_time = 0 self._start_time = time.time()
[docs] def update(self): super().update() if self._state != self.State.receiving_body: return difference = self.current_value - self._previous_value self._previous_value = self.current_value self._bandwidth_meter.feed(difference) time_now = time.time() if time_now - self._last_draw_time > self._draw_interval or self.current_value == self.max_value: self._print_status() self._flush() self._last_draw_time = time_now
def _print_status(self): '''Print an entire status line including bar and stats.''' self._clear_line() self._print(' ') if self.max_value: self._print_percent() self._print(' ') self._print_bar() else: self._print_throbber() self._print(' ') if self.measurement == Measurement.bytes: self._print_size_downloaded() else: self._print(self.current_value) self._print(' ') self._print_duration() self._print(' ') if self.measurement == Measurement.bytes: self._print_speed() self._flush() def _clear_line(self): '''Print ANSI code to clear the current line.''' self._print('\x1b[1G') self._print('\x1b[2K') def _print_throbber(self): '''Print an indefinite progress bar.''' self._print('[') for position in range(self._bar_width): self._print('O' if position == self._throbber_index else ' ') self._print(']') self._throbber_index = next(self._throbber_iter) def _print_bar(self): '''Print a progress bar.''' self._print('[') for position in range(self._bar_width): position_fraction = position / (self._bar_width - 1) position_bytes = position_fraction * self.max_value if position_bytes < (self.continue_value or 0): self._print('+') elif position_bytes <= (self.continue_value or 0) + self.current_value: self._print('=') else: self._print(' ') self._print(']') def _print_size_downloaded(self): '''Print the bytes downloaded.''' self._print(wpull.string.format_size(self.current_value)) def _print_duration(self): '''Print the elapsed download time.''' duration = int(time.time() - self._start_time) self._print(datetime.timedelta(seconds=duration)) def _print_speed(self): '''Print the current speed.''' if self._bandwidth_meter.num_samples: speed = self._bandwidth_meter.speed() if self._human_format: file_size_str = wpull.string.format_size(speed) else: file_size_str = '{:.1f} b'.format(speed * 8) speed_str = _('{preformatted_file_size}/s').format( preformatted_file_size=file_size_str ) else: speed_str = _('-- B/s') self._print(speed_str) def _print_percent(self): '''Print how much is done in percentage.''' fraction_done = ((self.continue_value or 0 + self.current_value) / self.max_value) self._print('{fraction_done:.1%}'.format(fraction_done=fraction_done))