import abc
import asyncio
import enum
import gettext
import logging
import time
from typing import Optional, Sequence, TypeVar, Generic, Iterator, Tuple, Set
from wpull.backport.logging import BraceMessage as __
# Logger shared by everything in this module.
_logger = logging.getLogger(__name__)

# Priorities for the (priority, entry_count, payload) tuples stored in the
# ItemQueue's asyncio.PriorityQueue. PriorityQueue yields the lowest value
# first, so a poison pill (0) is delivered before any pending item (1).
POISON_PRIORITY = 0
ITEM_PRIORITY = 1

# Unique sentinel object that tells a worker loop to exit.
POISON_PILL = object()

# Type of the work items flowing through a pipeline.
WorkItemT = TypeVar('WorkItemT')
class ItemTask(Generic[WorkItemT], metaclass=abc.ABCMeta):
    '''Abstract unit of work applied to each item a worker dequeues.'''

    @abc.abstractmethod
    @asyncio.coroutine
    def process(self, work_item: WorkItemT):
        '''Process a single work item.

        Concrete tasks must override this coroutine.
        '''
class ItemSource(Generic[WorkItemT], metaclass=abc.ABCMeta):
    '''Abstract supplier of work items for a pipeline.'''

    @abc.abstractmethod
    @asyncio.coroutine
    def get_item(self) -> Optional[WorkItemT]:
        '''Return the next work item, or ``None`` if there is none right now.

        Concrete sources must override this coroutine.
        '''
class ItemQueue(Generic[WorkItemT]):
    '''Priority queue connecting a Producer to its Workers.

    ``put_item`` applies back-pressure: it waits until the underlying queue is
    empty before enqueueing. Taking an item (``get``) or finishing one
    (``item_done``) notifies a shared condition so that waiting producers
    re-check.
    '''

    def __init__(self):
        self._queue = asyncio.PriorityQueue()
        # Items added via put_item() and not yet acknowledged via item_done().
        # Poison pills are not counted.
        self._unfinished_items = 0
        self._worker_ready_condition = asyncio.Condition()
        # Monotonic tie-breaker: preserves FIFO order within one priority and
        # ensures the payloads themselves are never compared.
        self._entry_count = 0

    @asyncio.coroutine
    def _wait_for_notification(self):
        '''Block until the worker-ready condition is notified once.

        Condition.wait() re-acquires the lock even when cancelled, so the
        release must sit in ``finally``; otherwise a cancelled waiter leaves
        the lock held and every later acquire() deadlocks.
        '''
        yield from self._worker_ready_condition.acquire()
        try:
            yield from self._worker_ready_condition.wait()
        finally:
            self._worker_ready_condition.release()

    @asyncio.coroutine
    def _notify_waiters(self):
        '''Wake every coroutine blocked on the worker-ready condition.'''
        yield from self._worker_ready_condition.acquire()
        try:
            self._worker_ready_condition.notify_all()
        finally:
            self._worker_ready_condition.release()

    @asyncio.coroutine
    def put_item(self, item: WorkItemT):
        '''Enqueue a work item once the queue has drained.

        Re-checks the queue size after every notification, then counts the
        item as unfinished and enqueues it with ``ITEM_PRIORITY``.
        '''
        while self._queue.qsize() > 0:
            yield from self._wait_for_notification()

        self._unfinished_items += 1
        self._queue.put_nowait((ITEM_PRIORITY, self._entry_count, item))
        self._entry_count += 1

    def put_poison_nowait(self):
        '''Enqueue a poison pill immediately, with ``POISON_PRIORITY``.

        Does not wait for the queue to drain and does not change
        ``unfinished_items``.
        '''
        self._queue.put_nowait((POISON_PRIORITY, self._entry_count, POISON_PILL))
        self._entry_count += 1

    @asyncio.coroutine
    def get(self) -> WorkItemT:
        '''Remove and return the next payload (a work item or POISON_PILL).

        Notifies waiters afterwards, since the queue may now be empty.
        '''
        _priority, _entry_count, item = yield from self._queue.get()
        yield from self._notify_waiters()
        return item

    @asyncio.coroutine
    def item_done(self):
        '''Mark one previously put item as finished and notify waiters.'''
        self._unfinished_items -= 1
        # Internal invariant: more item_done() calls than put_item() calls.
        assert self._unfinished_items >= 0
        yield from self._notify_waiters()

    @property
    def unfinished_items(self) -> int:
        '''Number of items put but not yet marked done.'''
        return self._unfinished_items

    @asyncio.coroutine
    def wait_for_worker(self):
        '''Wait until a worker takes or finishes an item.'''
        yield from self._wait_for_notification()
class Worker(object):
    '''Pulls entries from an ItemQueue and runs every task on each item.'''

    def __init__(self, item_queue: ItemQueue, tasks: Sequence[ItemTask]):
        self._item_queue = item_queue
        self._tasks = tasks
        # Bumped on each process() call so concurrent loops sharing this
        # Worker get distinct ids in debug logs.
        self._worker_id_counter = 0

    @asyncio.coroutine
    def process_one(self, _worker_id=None):
        '''Take one entry from the queue and handle it; return that entry.

        A POISON_PILL is returned as-is: no task runs and item_done() is not
        called. Any other item is passed through every task in order and then
        marked done. If a task raises, the exception propagates and
        item_done() is not called for that item.
        '''
        item = yield from self._item_queue.get()

        # Identity test: the pill is a bare sentinel, and ``==`` would
        # dispatch to the work item's own __eq__.
        if item is POISON_PILL:
            return item

        _logger.debug(__('Worker id {} Processing item {}', _worker_id, item))

        for task in self._tasks:
            yield from task.process(item)

        _logger.debug(__('Worker id {} Processed item {}', _worker_id, item))

        yield from self._item_queue.item_done()

        return item

    @asyncio.coroutine
    def process(self):
        '''Process queue entries until a poison pill is received.'''
        worker_id = self._worker_id_counter
        self._worker_id_counter += 1

        _logger.debug('Worker process id=%s', worker_id)

        while True:
            item = yield from self.process_one(_worker_id=worker_id)

            if item is POISON_PILL:
                _logger.debug('Worker quitting.')
                break
class Producer(object):
    '''Moves items from an ItemSource into an ItemQueue.'''

    def __init__(self, item_source: ItemSource, item_queue: ItemQueue):
        self._item_source = item_source
        self._item_queue = item_queue
        self._running = False

    @asyncio.coroutine
    def process_one(self):
        '''Fetch one item from the source, enqueue it, and return it.

        Returns ``None`` (enqueueing nothing) when the source has no item.
        '''
        _logger.debug('Get item from source')
        item = yield from self._item_source.get_item()

        # get_item() is typed Optional[...]: test for None explicitly so that
        # a falsy but valid item is not consumed from the source and dropped.
        if item is not None:
            yield from self._item_queue.put_item(item)

        return item

    @asyncio.coroutine
    def process(self):
        '''Feed the queue until the work runs out or stop() is called.

        When the source is empty and no items are still unfinished, the
        producer stops itself. When the source is empty but items are still
        in flight, it waits for a worker notification and asks again.
        '''
        self._running = True

        while self._running:
            item = yield from self.process_one()

            if item is not None:
                continue

            if self._item_queue.unfinished_items == 0:
                self.stop()
                break

            yield from self._item_queue.wait_for_worker()

    def stop(self):
        '''Ask the processing loop to exit after its current step.'''
        if self._running:
            _logger.debug('Producer stopping.')
            self._running = False
class PipelineState(enum.Enum):
    '''Lifecycle states of a Pipeline; each value equals its name.'''

    # Idle: process() will start the producer when called.
    stopped = 'stopped'
    # Producer and workers are active.
    running = 'running'
    # stop() was requested; waiting for workers and the producer to finish.
    stopping = 'stopping'
class Pipeline(object):
    '''Runs items from an ItemSource through a sequence of ItemTasks.

    A single Producer task fills a shared ItemQueue while up to
    ``concurrency`` Worker tasks drain it. Concurrency changes are delivered
    to workers as poison pills; a concurrency of 0 pauses processing.
    '''

    def __init__(self, item_source: ItemSource, tasks: Sequence[ItemTask],
                 item_queue: Optional[ItemQueue]=None):
        self._item_queue = item_queue or ItemQueue()
        self._tasks = tasks
        self._producer = Producer(item_source, self._item_queue)
        self._worker = Worker(self._item_queue, tasks)
        self._state = PipelineState.stopped
        self._concurrency = 1
        self._producer_task = None
        self._worker_tasks = set()
        # Set while workers are allowed to run; cleared when concurrency is 0
        # so that the main loop blocks instead of spinning.
        self._unpaused_event = asyncio.Event()
        self.skippable = False

    @property
    def tasks(self):
        '''The tasks each item is passed through, in order.'''
        return self._tasks

    @asyncio.coroutine
    def process(self):
        '''Start the pipeline if it is stopped, and run it until it stops.

        While running, the worker pool is kept topped up to ``concurrency``.
        Afterwards, waits for the workers and the producer to finish.
        Exceptions raised by a worker or by the producer propagate to the
        caller.
        '''
        if self._state == PipelineState.stopped:
            self._state = PipelineState.running
            self._producer_task = asyncio.get_event_loop().create_task(
                self._run_producer_wrapper())
            # Do not unconditionally set the event. With concurrency 0 an
            # already-set event makes Event.wait() return without yielding,
            # so the loop below would spin and starve the event loop.
            self._sync_unpaused_event()

        while self._state == PipelineState.running:
            yield from self._process_one_worker()

        yield from self._shutdown_processing()

    @asyncio.coroutine
    def _process_one_worker(self):
        '''Top up the worker pool, then wait for at least one worker to exit.

        When no workers are allowed (concurrency 0), waits on the unpaused
        event instead.
        '''
        assert self._state == PipelineState.running, self._state

        while len(self._worker_tasks) < self._concurrency:
            _logger.debug('Creating worker')
            worker_task = asyncio.get_event_loop().create_task(
                self._worker.process())
            self._worker_tasks.add(worker_task)

        if self._worker_tasks:
            wait_coroutine = asyncio.wait(
                self._worker_tasks, return_when=asyncio.FIRST_COMPLETED)
            done_tasks = (yield from wait_coroutine)[0]

            _logger.debug('%d worker tasks completed', len(done_tasks))

            for task in done_tasks:
                # Re-raises the exception, if any, that the worker died with.
                task.result()
                self._worker_tasks.remove(task)
        else:
            yield from self._unpaused_event.wait()

    @asyncio.coroutine
    def _shutdown_processing(self):
        '''Wait for remaining workers and the producer, then mark stopped.'''
        assert self._state == PipelineState.stopping

        _logger.debug('Exited workers loop.')

        if self._worker_tasks:
            _logger.debug('Waiting for workers to stop.')
            yield from asyncio.wait(self._worker_tasks)

        _logger.debug('Waiting for producer to stop.')

        self._worker_tasks.clear()

        yield from self._producer_task

        self._state = PipelineState.stopped

    def stop(self):
        '''Request shutdown: stop the producer and poison every worker.

        Also sets the unpaused event so that a main loop parked in
        _process_one_worker() while paused (concurrency 0) wakes, sees the
        non-running state, and proceeds to shutdown.
        '''
        if self._state == PipelineState.running:
            self._state = PipelineState.stopping
            self._producer.stop()
            self._kill_workers()
            self._unpaused_event.set()

    @asyncio.coroutine
    def _run_producer_wrapper(self):
        '''Run the producer, if exception, stop engine.'''
        try:
            yield from self._producer.process()
        except Exception as error:
            if not isinstance(error, StopIteration):
                # Stop the workers so the producer exception will be handled
                # when we finally yield from this coroutine
                _logger.debug('Producer died.', exc_info=True)
                self.stop()
                raise
        else:
            self.stop()

    def _kill_workers(self):
        '''Queue one poison pill per currently tracked worker task.'''
        for dummy in range(len(self._worker_tasks)):
            _logger.debug('Put poison pill.')
            self._item_queue.put_poison_nowait()

    def _sync_unpaused_event(self):
        '''Set the unpaused event iff workers are allowed (concurrency > 0).'''
        if self._concurrency:
            self._unpaused_event.set()
        else:
            self._unpaused_event.clear()

    @property
    def concurrency(self) -> int:
        '''Maximum number of concurrent worker tasks; 0 pauses processing.'''
        return self._concurrency

    @concurrency.setter
    def concurrency(self, new_concurrency: int):
        if new_concurrency < 0:
            raise ValueError('Concurrency cannot be negative')

        change = new_concurrency - self._concurrency
        self._concurrency = new_concurrency

        # When not running, process() will apply the new value on start.
        if self._state != PipelineState.running:
            return

        if change < 0:
            # Each pill makes one worker exit.
            for dummy in range(abs(change)):
                _logger.debug('Put poison pill for less workers.')
                self._item_queue.put_poison_nowait()
        elif change > 0:
            # One worker exiting wakes the main loop, which then spawns
            # workers up to the new limit.
            _logger.debug('Put 1 poison pill to trigger more workers.')
            self._item_queue.put_poison_nowait()

        self._sync_unpaused_event()

    def _warn_discarded_items(self):
        '''Log a warning with the number of unfinished items.'''
        _logger.warning(__(
            gettext.ngettext(
                'Discarding {num} unprocessed item.',
                'Discarding {num} unprocessed items.',
                self._item_queue.unfinished_items
            ),
            num=self._item_queue.unfinished_items
        ))
class PipelineSeries(object):
    '''An ordered group of pipelines with a shared concurrency setting.

    Setting ``concurrency`` forwards the value only to pipelines that are
    both in this series and in the ``concurrency_pipelines`` set.
    '''

    def __init__(self, pipelines: Iterator[Pipeline]):
        self._pipelines = tuple(pipelines)
        self._concurrency = 1
        self._concurrency_pipelines = set()

    @property
    def pipelines(self) -> Tuple[Pipeline, ...]:
        '''The pipelines of the series, in the order given.'''
        return self._pipelines

    @property
    def concurrency(self) -> int:
        '''Concurrency most recently assigned to the series.'''
        return self._concurrency

    @concurrency.setter
    def concurrency(self, new_concurrency: int):
        # Validate before storing, matching Pipeline's rule, so a rejected
        # value never leaves the series recording a concurrency that its
        # pipelines refused.
        if new_concurrency < 0:
            raise ValueError('Concurrency cannot be negative')

        self._concurrency = new_concurrency

        # Apply in series order, skipping pipelines that did not opt in.
        for pipeline in self._pipelines:
            if pipeline in self._concurrency_pipelines:
                pipeline.concurrency = new_concurrency

    @property
    def concurrency_pipelines(self) -> Set[Pipeline]:
        '''Mutable set of pipelines that follow ``concurrency`` changes.'''
        return self._concurrency_pipelines