'''URL table wrappers.'''
from wpull.application.plugin import event_interface, PluginFunctions
from wpull.database.base import BaseURLTable
from wpull.application.hook import HookableMixin, HookDisconnected
from wpull.pipeline.item import Status, URLRecord
from wpull.url import parse_url_or_log, URLInfo
import wpull.application.hook
class URLTableHookWrapper(BaseURLTable, HookableMixin):
    '''URL table wrapper with scripting hooks.

    Every table operation is forwarded to the wrapped ``url_table``.  In
    addition, the wrapper keeps a running counter of queued URLs and
    notifies the ``queued_url`` / ``dequeued_url`` plugin events when
    URLs are added, checked out, or checked back in with an error status.

    Args:
        url_table: URL table.

    Attributes:
        url_table: URL table.
    '''

    def __init__(self, url_table):
        super().__init__()
        self.url_table = url_table
        # Net count maintained by add_many() / check_out() / check_in().
        self._queue_counter = 0

        # Register the two events this wrapper emits.
        self.event_dispatcher.register(PluginFunctions.queued_url)
        self.event_dispatcher.register(PluginFunctions.dequeued_url)

    def queue_count(self) -> int:
        '''Return the number of URLs queued in this session.'''
        return self._queue_counter

    def count(self):
        '''Return ``count()`` of the wrapped table.'''
        return self.url_table.count()

    def get_one(self, url):
        '''Return ``get_one(url)`` of the wrapped table.'''
        return self.url_table.get_one(url)

    def get_all(self):
        '''Return ``get_all()`` of the wrapped table.'''
        return self.url_table.get_all()

    def add_many(self, urls):
        '''Add URLs to the wrapped table and announce the added ones.

        Args:
            urls: Passed unchanged to the wrapped table's ``add_many``.

        Returns:
            tuple: The items yielded by the wrapped table's ``add_many``.
        '''
        # Materialise first so the result can be both iterated here and
        # handed back to the caller.
        added_urls = tuple(self.url_table.add_many(urls))

        for url in added_urls:
            url_info = parse_url_or_log(url)

            # Entries for which parse_url_or_log() gives a falsy result are
            # neither counted nor announced.
            # NOTE(review): check_out() decrements unconditionally, so such
            # entries can skew the counter -- confirm whether intended.
            if url_info:
                self._queue_counter += 1
                self.event_dispatcher.notify(
                    PluginFunctions.queued_url, url_info
                )

        return added_urls

    def check_out(self, filter_status, filter_level=None):
        '''Check out a record from the wrapped table and announce it.

        Args:
            filter_status: Passed to the wrapped table's ``check_out``.
            filter_level: Passed to the wrapped table's ``check_out``.

        Returns:
            The record returned by the wrapped table.
        '''
        url_record = self.url_table.check_out(filter_status, filter_level)

        # Only reached when the wrapped table actually returned a record;
        # an exception from it propagates before the counter is touched.
        self._queue_counter -= 1
        self.event_dispatcher.notify(
            PluginFunctions.dequeued_url, url_record.url_info, url_record
        )

        return url_record

    def check_in(self, url, new_status, increment_try_count=True,
                 url_result=None):
        '''Check a URL back into the wrapped table.

        When ``new_status`` is ``Status.error`` the queue counter is
        incremented and, if the URL parses, ``queued_url`` is notified.

        Args:
            url: URL being checked in.
            new_status: Status to record for the URL.
            increment_try_count: Passed to the wrapped table's ``check_in``.
            url_result: Passed to the wrapped table's ``check_in``.

        Returns:
            The result of the wrapped table's ``check_in``.
        '''
        # Update the table first: the ``queued_url`` hook is documented as
        # firing *after* the URL is in the queue, and a failing check-in
        # must not leave the counter incremented.
        result = self.url_table.check_in(
            url, new_status,
            increment_try_count=increment_try_count,
            url_result=url_result,
        )

        if new_status == Status.error:
            self._queue_counter += 1
            url_info = parse_url_or_log(url)

            if url_info:
                self.event_dispatcher.notify(
                    PluginFunctions.queued_url, url_info
                )

        return result

    def update_one(self, *args, **kwargs):
        '''Forward all arguments to the wrapped table's ``update_one``.'''
        return self.url_table.update_one(*args, **kwargs)

    def release(self):
        '''Return ``release()`` of the wrapped table.'''
        return self.url_table.release()

    def remove_many(self, urls):
        '''Return ``remove_many(urls)`` of the wrapped table.'''
        return self.url_table.remove_many(urls)

    def close(self):
        '''Return ``close()`` of the wrapped table.'''
        return self.url_table.close()

    def add_visits(self, visits):
        '''Return ``add_visits(visits)`` of the wrapped table.'''
        return self.url_table.add_visits(visits)

    def get_revisit_id(self, url, payload_digest):
        '''Return ``get_revisit_id(url, payload_digest)`` of the wrapped table.'''
        return self.url_table.get_revisit_id(url, payload_digest)

    def get_hostnames(self):
        '''Return ``get_hostnames()`` of the wrapped table.'''
        return self.url_table.get_hostnames()

    @staticmethod
    @event_interface(PluginFunctions.queued_url)
    def queued_url(url_info: URLInfo):
        '''Callback fired after an URL was put into the queue.

        Args:
            url_info: The URL that was queued.
        '''

    @staticmethod
    @event_interface(PluginFunctions.dequeued_url)
    def dequeued_url(url_info: URLInfo, record_info: URLRecord):
        '''Callback fired after an URL was retrieved from the queue.

        Args:
            url_info: The URL that was dequeued.
            record_info: The record that was checked out for that URL.
        '''

    def get_root_url_todo_count(self):
        '''Return ``get_root_url_todo_count()`` of the wrapped table.'''
        return self.url_table.get_root_url_todo_count()

    def convert_check_out(self):
        '''Return ``convert_check_out()`` of the wrapped table.'''
        return self.url_table.convert_check_out()

    def convert_check_in(self, file_id: int, status: Status):
        '''Forward to the wrapped table's ``convert_check_in``.

        Args:
            file_id: Passed to the wrapped table.
            status: Passed to the wrapped table.

        Returns:
            The result of the wrapped table's ``convert_check_in``, returned
            for consistency with the other pass-through methods.
        '''
        return self.url_table.convert_check_in(file_id, status)