diff options
Diffstat (limited to 'crocoite/cli.py')
-rw-r--r-- | crocoite/cli.py | 361 |
1 files changed, 225 insertions, 136 deletions
diff --git a/crocoite/cli.py b/crocoite/cli.py index 2cbbfa8..04bbb19 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -19,149 +19,238 @@ # THE SOFTWARE. """ -Standalone and Celery command line interface +Command line interface """ -import os, logging, argparse -from io import BytesIO -from datetime import datetime -import pychrome -from urllib.parse import urlsplit - -from celery import Celery -from celery.utils.log import get_task_logger - -from . import behavior, defaults -from .warc import WarcLoader, SerializingWARCWriter -from .browser import ChromeService, NullService -from .util import packageUrl, getFormattedViewportMetrics - -app = Celery ('crocoite.distributed') -app.config_from_object('celeryconfig') -logger = get_task_logger('crocoite.distributed.archive') - -# defaults can be changed below using argparse; track started state, because tasks are usually long-running -@app.task(bind=True, track_started=True) -def archive (self, url, output, browser, logBuffer, maxBodySize, idleTimeout, - timeout, enabledBehaviorNames): - """ - Archive a single URL - - Supports these config keys (celeryconfig): - - warc_filename = '{domain}-{date}-{id}.warc.gz' - temp_dir = '/tmp/' - finished_dir = '/tmp/finished' - """ - - ret = {'stats': None} - - self.update_state (state='PROGRESS', meta={'step': 'start'}) - - service = ChromeService () - if browser: - service = NullService (browser) - - allBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) - - with service as browser: - browser = pychrome.Browser(url=browser) - - if not output: - parsedUrl = urlsplit (url) - outFile = app.conf.warc_filename.format ( - id=self.request.id, - domain=parsedUrl.hostname.replace ('/', '-'), - date=datetime.utcnow ().isoformat (), - ) - outPath = os.path.join (app.conf.temp_dir, outFile) - fd = open (outPath, 'wb') - else: - fd = open (output, 'wb') - writer = SerializingWARCWriter (fd, gzip=True) - - with WarcLoader (browser, url, writer, logBuffer=logBuffer, - maxBodySize=maxBodySize) as l: - version = l.tab.Browser.getVersion () - payload = { - 'software': __package__, - 'browser': version['product'], - 'useragent': version['userAgent'], - 'viewport': getFormattedViewportMetrics (l.tab), - } - warcinfo = writer.create_warcinfo_record (filename=None, info=payload) - writer.write_record (warcinfo) - - # not all behavior scripts are allowed for every URL, filter them - enabledBehavior = list (filter (lambda x: url in x, - map (lambda x: x (l), allBehavior))) - - self.update_state (state='PROGRESS', meta={'step': 'onload'}) - for b in enabledBehavior: - logger.debug ('starting onload behavior {}'.format (b.name)) - b.onload () - l.start () - - self.update_state (state='PROGRESS', meta={'step': 'fetch'}) - l.waitIdle (idleTimeout, timeout) - - self.update_state (state='PROGRESS', meta={'step': 'onstop'}) - for b in enabledBehavior: - logger.debug ('starting onstop behavior {}'.format (b.name)) - b.onstop () - - # if we stopped due to timeout, wait for remaining assets - l.waitIdle (2, 60) - l.stop () - - self.update_state (state='PROGRESS', meta={'step': 'onfinish'}) - for b in enabledBehavior: - logger.debug ('starting onfinish behavior {}'.format (b.name)) - b.onfinish () - - ret['stats'] = l.stats - writer.flush () - if not output: - outPath = os.path.join (app.conf.finished_dir, outFile) - os.rename (fd.name, outPath) +import argparse, sys, signal, asyncio, os, json +from traceback import TracebackException +from enum import IntEnum +from yarl import URL +from http.cookies import SimpleCookie +import pkg_resources +try: + import manhole + manhole.install (patch_fork=False, oneshot_on='USR1') +except ModuleNotFoundError: + pass + +from . import behavior, browser +from .controller import SinglePageController, \ + ControllerSettings, StatsHandler, LogHandler, \ + RecursiveController, DepthLimit, PrefixLimit +from .devtools import Passthrough, Process +from .warc import WarcHandler +from .logger import Logger, JsonPrintConsumer, DatetimeConsumer, \ + WarcHandlerConsumer, Level +from .devtools import Crashed + +def absurl (s): + """ argparse: Absolute URL """ + u = URL (s) + if u.is_absolute (): + return u + raise argparse.ArgumentTypeError ('Must be absolute') + +def cookie (s): + """ argparse: Cookie """ + c = SimpleCookie (s) + # for some reason the constructor does not raise an exception if the cookie + # supplied is invalid. It’ll simply be empty. + if len (c) != 1: + raise argparse.ArgumentTypeError ('Invalid cookie') + # we want a single Morsel + return next (iter (c.values ())) + +def cookiejar (f): + """ argparse: Cookies from file """ + cookies = [] + try: + with open (f, 'r') as fd: + for l in fd: + l = l.lstrip () + if l and not l.startswith ('#'): + cookies.append (cookie (l)) + except FileNotFoundError: + raise argparse.ArgumentTypeError (f'Cookie jar "{f}" does not exist') + return cookies + +class SingleExitStatus(IntEnum): + """ Exit status for single-shot command line """ + Ok = 0 + Fail = 1 + BrowserCrash = 2 + Navigate = 3 + +def single (): + parser = argparse.ArgumentParser(description='crocoite helper tools to fetch individual pages.') + parser.add_argument('--browser', help='DevTools URL', type=absurl, metavar='URL') + parser.add_argument('--timeout', default=1*60*60, type=int, help='Maximum time for archival', metavar='SEC') + parser.add_argument('--idle-timeout', default=30, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC') + parser.add_argument('--behavior', help='Enable behavior script', + dest='enabledBehaviorNames', + default=list (behavior.availableMap.keys ()), + choices=list (behavior.availableMap.keys ()), + metavar='NAME', nargs='*') + parser.add_argument('--warcinfo', help='Add extra information to warcinfo record', + metavar='JSON', type=json.loads) + # re-using curl’s short/long switch names whenever possible + parser.add_argument('-k', '--insecure', + action='store_true', + help='Disable certificate validation') + parser.add_argument ('-b', '--cookie', type=cookie, metavar='SET-COOKIE', + action='append', default=[], help='Cookies in Set-Cookie format.') + parser.add_argument ('-c', '--cookie-jar', dest='cookieJar', + type=cookiejar, metavar='FILE', + default=pkg_resources.resource_filename (__name__, 'data/cookies.txt'), + help='Cookie jar file, read-only.') + parser.add_argument('url', help='Website URL', type=absurl, metavar='URL') + parser.add_argument('output', help='WARC filename', metavar='FILE') + + args = parser.parse_args () + + logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) + + ret = SingleExitStatus.Fail + service = Process () + if args.browser: + service = Passthrough (args.browser) + settings = ControllerSettings ( + idleTimeout=args.idleTimeout, + timeout=args.timeout, + insecure=args.insecure, + cookies=args.cookieJar + args.cookie, + ) + with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler: + logger.connect (WarcHandlerConsumer (warcHandler)) + handler = [StatsHandler (), LogHandler (logger), warcHandler] + b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames)) + controller = SinglePageController (url=args.url, settings=settings, + service=service, handler=handler, behavior=b, logger=logger, + warcinfo=args.warcinfo) + try: + loop = asyncio.get_event_loop() + run = asyncio.ensure_future (controller.run ()) + stop = lambda signum: run.cancel () + loop.add_signal_handler (signal.SIGINT, stop, signal.SIGINT) + loop.add_signal_handler (signal.SIGTERM, stop, signal.SIGTERM) + loop.run_until_complete(run) + loop.close() + ret = SingleExitStatus.Ok + except Crashed: + ret = SingleExitStatus.BrowserCrash + except asyncio.CancelledError: + # don’t log this one + pass + except browser.NavigateError: + ret = SingleExitStatus.Navigate + except Exception as e: + ret = SingleExitStatus.Fail + logger.error ('cli exception', + uuid='7fd69858-ecaa-4225-b213-8ab880aa3cc5', + traceback=list (TracebackException.from_exception (e).format ())) + finally: + r = handler[0].stats + logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r) + logger.info ('exit', context='cli', uuid='9b1bd603-f7cd-4745-895a-5b894a5166f2', status=ret) + return ret -def stateCallback (data): - result = data['result'] - if data['status'] == 'PROGRESS': - print (data['task_id'], result['step']) +def parsePolicy (recursive, url): + if recursive is None: + return DepthLimit (0) + elif recursive.isdigit (): + return DepthLimit (int (recursive)) + elif recursive == 'prefix': + return PrefixLimit (url) + raise argparse.ArgumentTypeError ('Unsupported recursion mode') + +def recursive (): + logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) -def main (): parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.') - parser.add_argument('--browser', help='DevTools URL', metavar='URL') - parser.add_argument('--distributed', help='Use celery worker', action='store_true') - parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC') - parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC') - parser.add_argument('--log-buffer', default=defaults.logBuffer, type=int, dest='logBuffer', metavar='LINES') - parser.add_argument('--max-body-size', default=defaults.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES') - #parser.add_argument('--keep-tab', action='store_true', default=False, dest='keepTab', help='Keep tab open') - parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts', - dest='enabledBehaviorNames', - default=list (behavior.availableNames), - choices=list (behavior.availableNames)) - parser.add_argument('url', help='Website URL') - parser.add_argument('output', help='WARC filename') + parser.add_argument('-j', '--concurrency', + help='Run at most N jobs concurrently', metavar='N', default=1, + type=int) + parser.add_argument('-r', '--recursion', help='Recursion policy', + metavar='POLICY') + parser.add_argument('--tempdir', help='Directory for temporary files', + metavar='DIR') + parser.add_argument('url', help='Seed URL', type=absurl, metavar='URL') + parser.add_argument('output', + help='Output file, supports templates {host}, {date} and {seqnum}', + metavar='FILE') + parser.add_argument('command', + help='Fetch command, supports templates {url} and {dest}', + metavar='CMD', nargs='*', + default=['crocoite-single', '{url}', '{dest}']) + + args = parser.parse_args () + try: + policy = parsePolicy (args.recursion, args.url) + except argparse.ArgumentTypeError as e: + parser.error (str (e)) + + try: + controller = RecursiveController (url=args.url, output=args.output, + command=args.command, logger=logger, policy=policy, + tempdir=args.tempdir, concurrency=args.concurrency) + except ValueError as e: + parser.error (str (e)) + + run = asyncio.ensure_future (controller.run ()) + loop = asyncio.get_event_loop() + stop = lambda signum: run.cancel () + loop.add_signal_handler (signal.SIGINT, stop, signal.SIGINT) + loop.add_signal_handler (signal.SIGTERM, stop, signal.SIGTERM) + try: + loop.run_until_complete(run) + except asyncio.CancelledError: + pass + finally: + loop.close() + + return 0 + +def irc (): + import json, re + from .irc import Chromebot + + logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) + + parser = argparse.ArgumentParser(description='IRC bot.') + parser.add_argument('--config', '-c', help='Config file location', metavar='PATH', default='chromebot.json') args = parser.parse_args () - # prepare args for function - distributed = args.distributed - passArgs = vars (args) - del passArgs['distributed'] - - if distributed: - result = archive.delay (**passArgs) - r = result.get (on_message=stateCallback) - else: - # XXX: local evaluation does not init celery logging? - logging.basicConfig (level=logging.INFO) - r = archive (**passArgs) - print (r['stats']) - - return True + with open (args.config) as fd: + config = json.load (fd) + s = config['irc'] + blacklist = dict (map (lambda x: (re.compile (x[0], re.I), x[1]), config['blacklist'].items ())) + + loop = asyncio.get_event_loop() + bot = Chromebot ( + host=s['host'], + port=s['port'], + ssl=s['ssl'], + nick=s['nick'], + channels=s['channels'], + tempdir=config['tempdir'], + destdir=config['destdir'], + processLimit=config['process_limit'], + logger=logger, + blacklist=blacklist, + needVoice=config['need_voice'], + loop=loop) + stop = lambda signum: bot.cancel () + loop.add_signal_handler (signal.SIGINT, stop, signal.SIGINT) + loop.add_signal_handler (signal.SIGTERM, stop, signal.SIGTERM) + loop.run_until_complete(bot.run ()) + +def dashboard (): + from .irc import Dashboard + + loop = asyncio.get_event_loop() + d = Dashboard (sys.stdin, loop) + loop.run_until_complete(d.run ()) + loop.run_forever() |