Source code for wpull.application.tasks.database
import codecs
import gettext
import itertools
import asyncio
import logging
import sys
from wpull.backport.logging import BraceMessage as __
from wpull.database.base import AddURLInfo
from wpull.database.sqltable import GenericSQLURLTable
from wpull.pipeline.app import AppSession
from wpull.pipeline.pipeline import ItemTask
import wpull.util
import wpull.url
_ = gettext.gettext
_logger = logging.getLogger(__name__)
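

# DatabaseSetupTask chooses the URL table implementation for the run: when a
# database URI is supplied it registers GenericSQLURLTable (which is handed
# that URI), otherwise it uses the default implementation with the on-disk
# database path. The implementation is then wrapped as 'URLTable' and any
# items still marked in-progress in the database are released.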
class DatabaseSetupTask(ItemTask[AppSession]):
    @asyncio.coroutine
    def process(self, session: AppSession):
        if session.args.database_uri:
            session.factory.class_map[
                'URLTableImplementation'] = GenericSQLURLTable
            url_table_impl = session.factory.new(
                'URLTableImplementation', session.args.database_uri)
        else:
            url_table_impl = session.factory.new(
                'URLTableImplementation', path=session.args.database)

        url_table = session.factory.new('URLTable', url_table_impl)

        # TODO: add a test for this
        _logger.debug(_('Releasing any in-progress items in database.'))
        url_table.release()
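

# InputURLTask seeds the URL table with the user's starting URLs: those given
# on the command line (session.args.urls) plus any read from
# session.args.input_file, either as plain lines or, when
# session.args.force_html is set, as links scraped out of an HTML document.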
class InputURLTask(ItemTask[AppSession]):
    @asyncio.coroutine
    def process(self, session: AppSession):
        url_table = session.factory['URLTable']
        url_count = 0
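
        # Assumption: wpull.util.grouper follows the itertools "grouper"
        # recipe, yielding fixed-size batches padded with None, which is why
        # the generator below skips falsy entries before calling add_many().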
        for batch in wpull.util.grouper(self._read_input_urls(session), 1000):
            urls = url_table.add_many(AddURLInfo(url_info.url, None, None)
                                      for url_info in batch if url_info)
            # TODO: attach hook for notifying progress
            url_count += len(urls)

        # TODO: check if database is empty
        # TODO: add a test for this
        # if not url_count:
        #     raise ValueError(_('No URLs found in input file.'))

    @classmethod
    def _read_input_urls(cls, session: AppSession, default_scheme='http'):
        '''Read the URLs provided by the user.'''
        url_string_iter = session.args.urls or ()
        # FIXME: url rewriter isn't created yet
        url_rewriter = session.factory.get('URLRewriter')

        if session.args.input_file:
            if session.args.force_html:
                lines = cls._input_file_as_html_links(session)
            else:
                lines = cls._input_file_as_lines(session)

            url_string_iter = itertools.chain(url_string_iter, lines)

        base_url = session.args.base

        for url_string in url_string_iter:
            _logger.debug(__('Parsing URL {0}', url_string))

            if base_url:
                url_string = wpull.url.urljoin(base_url, url_string)

            url_info = wpull.url.URLInfo.parse(
                url_string, default_scheme=default_scheme)

            _logger.debug(__('Parsed URL {0}', url_info))

            if url_rewriter:
                # TODO: this logic should be a hook
                url_info = url_rewriter.rewrite(url_info)
                _logger.debug(__('Rewritten URL {0}', url_info))

            yield url_info
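
    # Rough sketch, not part of wpull itself: _read_input_urls() runs every
    # entry through wpull.url.URLInfo.parse(), so a scheme-less line such as
    # 'example.com/page' is expected to come back with the default scheme
    # applied, i.e. URLInfo.parse('example.com/page', default_scheme='http').url
    # should give something like 'http://example.com/page'.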

    @classmethod
    def _input_file_as_lines(cls, session: AppSession):
        '''Read lines from input file and return them.'''
        if session.args.input_file == sys.stdin:
            input_file = session.args.input_file
        else:
            reader = codecs.getreader(session.args.local_encoding or 'utf-8')
            input_file = reader(session.args.input_file)

        return input_file

    @classmethod
    def _input_file_as_html_links(cls, session: AppSession):
        '''Read input file as HTML and return the links.'''
        scrape_result = session.factory['HTMLScraper'].scrape_file(
            session.args.input_file,
            encoding=session.args.local_encoding or 'utf-8'
        )

        for context in scrape_result.link_contexts:
            yield context.link