Source code for wpull.protocol.abstract.stream

'''Abstract stream classes'''
import functools

import asyncio

from typing import Callable

import wpull.util


[docs]def close_stream_on_error(func): '''Decorator to close stream on error.''' @asyncio.coroutine @functools.wraps(func) def wrapper(self, *args, **kwargs): with wpull.util.close_on_error(self.close): return (yield from func(self, *args, **kwargs)) return wrapper
DataEventCallback = Callable[[bytes], None]
[docs]class DataEventDispatcher(object): def __init__(self): self._read_listeners = set() self._write_listeners = set()
[docs] def add_read_listener(self, callback: DataEventCallback): self._read_listeners.add(callback)
[docs] def remove_read_listener(self, callback: DataEventCallback): self._read_listeners.remove(callback)
[docs] def add_write_listener(self, callback: DataEventCallback): self._write_listeners.add(callback)
[docs] def remove_write_listener(self, callback: DataEventCallback): self._write_listeners.remove(callback)
[docs] def notify_read(self, data: bytes): for callback in self._read_listeners: callback(data)
[docs] def notify_write(self, data: bytes): for callback in self._write_listeners: callback(data)