diff options
Diffstat (limited to 'crocoite')
| -rw-r--r-- | crocoite/cli.py | 41 | ||||
| -rw-r--r-- | crocoite/controller.py | 22 | ||||
| -rw-r--r-- | crocoite/task.py | 59 | 
3 files changed, 91 insertions, 31 deletions
| diff --git a/crocoite/cli.py b/crocoite/cli.py index 196162e..15fac12 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -28,15 +28,11 @@ from . import behavior  from .controller import RecursiveController, defaultSettings, \          ControllerSettings, DepthLimit, PrefixLimit -def stateCallback (data): -    result = data['result'] -    if data['status'] == 'PROGRESS': -        print (data['task_id'], result['step']) -  def main ():      parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')      parser.add_argument('--browser', help='DevTools URL', metavar='URL')      parser.add_argument('--recursive', help='Follow links recursively') +    parser.add_argument('--concurrency', '-j', type=int, default=1)      parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC')      parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC')      parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES') @@ -52,29 +48,28 @@ def main ():      args = parser.parse_args () -    # prepare args for function -    distributed = args.distributed -    if args.recursive is None: -        recursionPolicy = DepthLimit (0) -    elif args.recursive.isdigit (): -        recursionPolicy = DepthLimit (int (args.recursive)) -    elif args.recursive == 'prefix': -        recursionPolicy = PrefixLimit (args.url) -    else: -        parser.error ('Invalid argument for --recursive') - -    if distributed: -        assert args.recursive is None, "Distributed crawls cannot be recursive right now, sorry" - -        from .task import archive +    if args.distributed: +        if args.browser: +            parser.error ('--browser is not supported for distributed jobs') +        from . import task
         settings = dict (maxBodySize=args.maxBodySize,                 logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,                 timeout=args.timeout) -        result = archive.delay (url=args.url, settings=settings, -                enabledBehaviorNames=args.enabledBehaviorNames) -        r = result.get (on_message=stateCallback) +        result = task.controller.delay (url=args.url, settings=settings, +                enabledBehaviorNames=args.enabledBehaviorNames, +                recursive=args.recursive, concurrency=args.concurrency) +        r = result.get ()      else:          logging.basicConfig (level=logging.INFO) + +        if args.recursive is None: +            recursionPolicy = DepthLimit (0) +        elif args.recursive.isdigit (): +            recursionPolicy = DepthLimit (int (args.recursive)) +        elif args.recursive == 'prefix': +            recursionPolicy = PrefixLimit (args.url) +        else: +            parser.error ('Invalid argument for --recursive')          settings = ControllerSettings (maxBodySize=args.maxBodySize,                 logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,                 timeout=args.timeout) diff --git a/crocoite/controller.py b/crocoite/controller.py index 638cb6c..113c139 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -29,6 +29,10 @@ class ControllerSettings:          self.idleTimeout = idleTimeout          self.timeout = timeout +    def toDict (self): +        return dict (logBuffer=self.logBuffer, maxBodySize=self.maxBodySize, +                idleTimeout=self.idleTimeout, timeout=self.timeout) +  defaultSettings = ControllerSettings ()  import logging @@ -186,6 +190,18 @@ class RecursiveController:          self.logger = logger          self.recursionPolicy = recursionPolicy +    def fetch (self, urls): +        """ +        Overrideable fetch action for URLs. Defaults to sequential +        SinglePageController. 
+        """ +        result = [] +        for u in urls: +            c = SinglePageController (u, self.output, self.service, +                    self.behavior, self.logger, self.settings) +            result.append (c.run ()) +        return result +      def run (self):          have = set ()          urls = set ([self.url]) @@ -193,11 +209,7 @@          while urls:              self.logger.info ('retrieving {} urls'.format (len (urls))) -            result = [] -            for u in urls: -                c = SinglePageController (u, self.output, self.service, -                        self.behavior, self.logger, self.settings) -                result.append (c.run ()) +            result = self.fetch (urls)              have.update (urls)              urls = set () diff --git a/crocoite/task.py b/crocoite/task.py index 39900a5..52d3b26 100644 --- a/crocoite/task.py +++ b/crocoite/task.py @@ -22,19 +22,37 @@  Celery distributed tasks  """ -import os +import os, logging  from urllib.parse import urlsplit  from datetime import datetime +from operator import attrgetter +from itertools import chain +def _monkeyPatchSyncTasks (): +    """ Result sets don’t support the argument disable_sync_subtasks argument """ +    import celery.result +    celery.result.assert_will_not_block = lambda: None + +_monkeyPatchSyncTasks ()  from celery import Celery  from celery.utils.log import get_task_logger -from .controller import SinglePageController, ControllerSettings +from .browser import ChromeService +from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit +from . import behavior
 app = Celery ('crocoite.distributed')  app.config_from_object('celeryconfig') +app.conf.task_routes = { +        'crocoite.task.archive': {'queue': 'crocoite.archive'}, +        'crocoite.task.controller': {'queue': 'crocoite.controller'}, +        # <method>.chunks is actually a starmap job +        'celery.starmap': {'queue': 'crocoite.archive'}, +        } +app.conf.task_default_queue = 'crocoite.default' +# disable prefetching, since our tasks usually run for a _very_ long time +app.conf.worker_prefetch_multiplier = 1  logger = get_task_logger('crocoite.distributed.archive')  @app.task(bind=True, track_started=True) @@ -51,7 +69,7 @@ def archive (self, url, settings, enabledBehaviorNames):      parsedUrl = urlsplit (url)      outFile = app.conf.warc_filename.format ( -                    id=self.request.id, +                    id=self.request.root_id,                      domain=parsedUrl.hostname.replace ('/', '-'),                      date=datetime.utcnow ().isoformat (),                      ) @@ -69,3 +87,38 @@      return ret +class DistributedRecursiveController (RecursiveController): +    """ Distributed, recursive controller using celery """ + +    def __init__ (self, url, service=ChromeService (), behavior=behavior.available, \ +            logger=logging.getLogger(__name__), settings=defaultSettings, +            recursionPolicy=DepthLimit (0), concurrency=1): +        super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy) +        self.concurrency = concurrency + +    def fetch (self, urls): +        def chunksIter (urls): +            for u in urls: +                yield (u, self.settings.toDict (), list (map (attrgetter ('name'), self.behavior))) +        itemsPerTask = len (urls)//self.concurrency +        if itemsPerTask <= 0: +            itemsPerTask = len (urls) +        return chain.from_iterable (archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ()) + +@app.task(bind=True, track_started=True) +def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency): +    """ Recursive controller """ + +    if recursive is None: +        recursionPolicy = DepthLimit (0) +    elif recursive.isdigit (): +        recursionPolicy = DepthLimit (int (recursive)) +    elif recursive == 'prefix': +        recursionPolicy = PrefixLimit (url) + +    enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) +    settings = ControllerSettings (**settings) +    controller = DistributedRecursiveController (url, None, behavior=enabledBehavior, +            settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency) +    return controller.run () + | 
