From 824c6e91ae6fee1318e79c3ce1a43f98bc697c7b Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Fri, 4 May 2018 13:00:05 +0200 Subject: Add distributed recursive crawls --- crocoite/cli.py | 41 +++++++++++++++-------------------- crocoite/controller.py | 22 ++++++++++++++----- crocoite/task.py | 59 +++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 91 insertions(+), 31 deletions(-) diff --git a/crocoite/cli.py b/crocoite/cli.py index 196162e..15fac12 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -28,15 +28,11 @@ from . import behavior from .controller import RecursiveController, defaultSettings, \ ControllerSettings, DepthLimit, PrefixLimit -def stateCallback (data): - result = data['result'] - if data['status'] == 'PROGRESS': - print (data['task_id'], result['step']) - def main (): parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.') parser.add_argument('--browser', help='DevTools URL', metavar='URL') parser.add_argument('--recursive', help='Follow links recursively') + parser.add_argument('--concurrency', '-j', type=int, default=1) parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC') parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC') parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES') @@ -52,29 +48,28 @@ def main (): args = parser.parse_args () - # prepare args for function - distributed = args.distributed - if args.recursive is None: - recursionPolicy = DepthLimit (0) - elif args.recursive.isdigit (): - recursionPolicy = DepthLimit (int (args.recursive)) - elif args.recursive == 'prefix': - recursionPolicy = PrefixLimit (args.url) - else: - parser.error ('Invalid argument for --recursive') - - if distributed: - assert args.recursive is None, "Distributed crawls cannot be recursive right now, sorry" - - from .task import archive + if args.distributed: + if args.browser: + parser.error ('--browser is not supported for distributed jobs') + from . import task settings = dict (maxBodySize=args.maxBodySize, logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, timeout=args.timeout) - result = archive.delay (url=args.url, settings=settings, - enabledBehaviorNames=args.enabledBehaviorNames) - r = result.get (on_message=stateCallback) + result = task.controller.delay (url=args.url, settings=settings, + enabledBehaviorNames=args.enabledBehaviorNames, + recursive=args.recursive, concurrency=args.concurrency) + r = result.get () else: logging.basicConfig (level=logging.INFO) + + if args.recursive is None: + recursionPolicy = DepthLimit (0) + elif args.recursive.isdigit (): + recursionPolicy = DepthLimit (int (args.recursive)) + elif args.recursive == 'prefix': + recursionPolicy = PrefixLimit (args.url) + else: + parser.error ('Invalid argument for --recursive') settings = ControllerSettings (maxBodySize=args.maxBodySize, logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, timeout=args.timeout) diff --git a/crocoite/controller.py b/crocoite/controller.py index 638cb6c..113c139 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -29,6 +29,10 @@ class ControllerSettings: self.idleTimeout = idleTimeout self.timeout = timeout + def toDict (self): + return dict (logBuffer=self.logBuffer, maxBodySize=self.maxBodySize, + idleTimeout=self.idleTimeout, timeout=self.timeout) + defaultSettings = ControllerSettings () import logging @@ -186,6 +190,18 @@ class RecursiveController: self.logger = logger self.recursionPolicy = recursionPolicy + def fetch (self, urls): + """ + Overrideable fetch action for URLs. Defaults to sequential + SinglePageController. + """ + result = [] + for u in urls: + c = SinglePageController (u, self.output, self.service, + self.behavior, self.logger, self.settings) + result.append (c.run ()) + return result + def run (self): have = set () urls = set ([self.url]) @@ -193,11 +209,7 @@ class RecursiveController: while urls: self.logger.info ('retrieving {} urls'.format (len (urls))) - result = [] - for u in urls: - c = SinglePageController (u, self.output, self.service, - self.behavior, self.logger, self.settings) - result.append (c.run ()) + result = self.fetch (urls) have.update (urls) urls = set () diff --git a/crocoite/task.py b/crocoite/task.py index 39900a5..52d3b26 100644 --- a/crocoite/task.py +++ b/crocoite/task.py @@ -22,19 +22,37 @@ Celery distributed tasks """ -import os +import os, logging from urllib.parse import urlsplit from datetime import datetime +from operator import attrgetter +from itertools import chain +def _monkeyPatchSyncTasks (): + """ Result sets don’t support the argument disable_sync_subtasks argument """ + import celery.result + celery.result.assert_will_not_block = lambda: None + +_monkeyPatchSyncTasks () from celery import Celery from celery.utils.log import get_task_logger -from .controller import SinglePageController, ControllerSettings +from .browser import ChromeService +from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit from . import behavior app = Celery ('crocoite.distributed') app.config_from_object('celeryconfig') +app.conf.task_routes = { + 'crocoite.task.archive': {'queue': 'crocoite.archive'}, + 'crocoite.task.controller': {'queue': 'crocoite.controller'}, + # .chunks is actually a starmap job + 'celery.starmap': {'queue': 'crocoite.archive'}, + } +app.conf.task_default_queue = 'crocoite.default' +# disable prefetching, since our tasks usually run for a _very_ long time +app.conf.worker_prefetch_multiplier = 1 logger = get_task_logger('crocoite.distributed.archive') @app.task(bind=True, track_started=True) @@ -51,7 +69,7 @@ def archive (self, url, settings, enabledBehaviorNames): parsedUrl = urlsplit (url) outFile = app.conf.warc_filename.format ( - id=self.request.id, + id=self.request.root_id, domain=parsedUrl.hostname.replace ('/', '-'), date=datetime.utcnow ().isoformat (), ) @@ -69,3 +87,38 @@ def archive (self, url, settings, enabledBehaviorNames): return ret +class DistributedRecursiveController (RecursiveController): + """ Distributed, recursive controller using celery """ + + def __init__ (self, url, service=ChromeService (), behavior=behavior.available, \ + logger=logging.getLogger(__name__), settings=defaultSettings, + recursionPolicy=DepthLimit (0), concurrency=1): + super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy) + self.concurrency = concurrency + + def fetch (self, urls): + def chunksIter (urls): + for u in urls: + yield (u, self.settings.toDict (), list (map (attrgetter ('name'), self.behavior))) + itemsPerTask = len (urls)//self.concurrency + if itemsPerTask <= 0: + itemsPerTask = len (urls) + return chain.from_iterable (archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ()) + +@app.task(bind=True, track_started=True) +def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency): + """ Recursive controller """ + + if recursive is None: + recursionPolicy = DepthLimit (0) + elif recursive.isdigit (): + recursionPolicy = DepthLimit (int (recursive)) + elif recursive == 'prefix': + recursionPolicy = PrefixLimit (url) + + enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) + settings = ControllerSettings (**settings) + controller = DistributedRecursiveController (url, None, behavior=enabledBehavior, + settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency) + return controller.run () + -- cgit v1.2.3