-rw-r--r--   README.rst                  61
-rw-r--r--   contrib/celerycrocoite.py  229
-rw-r--r--   crocoite/cli.py             73
-rw-r--r--   crocoite/controller.py     121
-rw-r--r--   crocoite/task.py           146
-rw-r--r--   setup.py                     3
6 files changed, 24 insertions, 609 deletions
diff --git a/README.rst b/README.rst
@@ -17,12 +17,10 @@ The following dependencies must be present to run crocoite:
 - pychrome_
 - warcio_
 - html5lib_
-- Celery_ (optional)
 
 .. _pychrome: https://github.com/fate0/pychrome
 .. _warcio: https://github.com/webrecorder/warcio
 .. _html5lib: https://github.com/html5lib/html5lib-python
-.. _Celery: http://www.celeryproject.org/
 
 It is recommended to prepare a virtualenv and let pip handle the dependency
 resolution for Python packages instead:
@@ -121,65 +119,6 @@ does not work any more. Secondly it also saves a screenshot of the full page,
 so even if future browsers cannot render and display the stored HTML a fully
 rendered version of the website can be replayed instead.
 
-Advanced usage
---------------
-
-crocoite offers more than just a one-shot command-line interface.
-
-Distributed crawling
-^^^^^^^^^^^^^^^^^^^^
-
-Configure using celeryconfig.py
-
-.. code:: python
-
-    broker_url = 'pyamqp://'
-    result_backend = 'rpc://'
-    warc_filename = '{domain}-{date}-{id}.warc.gz'
-    temp_dir = '/tmp/'
-    finished_dir = '/tmp/finished'
-
-Start a Celery worker::
-
-    celery -A crocoite.task worker -Q crocoite.archive,crocoite.controller --loglevel=info
-
-Then queue archive job::
-
-    crocoite-grab --distributed http://example.com
-
-The worker will create a temporary file named according to ``warc_filename`` in
-``/tmp`` while archiving and move it to ``/tmp/finished`` when done.
-
-IRC bot
-^^^^^^^
-
-Configure sopel_ (``~/.sopel/default.cfg``) to use the plugin located in
-``contrib/celerycrocoite.py``
-
-.. code:: ini
-
-    [core]
-    nick = chromebot
-    host = irc.efnet.fr
-    port = 6667
-    owner = someone
-    extra = /path/to/crocoite/contrib
-    enable = celerycrocoite
-    channels = #somechannel
-
-Then start it by running ``sopel``. The bot must be addressed directly (i.e.
-``chromebot: <command>``). The following commands are currently supported:
-
-a <url>
-    Archives <url> and all of its resources (images, css, …). A unique UID
-    (UUID) is assigned to each job.
-s <uuid>
-    Get status of job with <uuid>
-r <uuid>
-    Revoke job with <uuid>. If it started already the job will be killed.
-
-.. _sopel: https://sopel.chat/
-
 Related projects
 ----------------
diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py
deleted file mode 100644
index 3da43d9..0000000
--- a/contrib/celerycrocoite.py
+++ /dev/null
@@ -1,229 +0,0 @@
-# Copyright (c) 2017 crocoite contributors
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
- -""" -Module for Sopel IRC bot -""" - -import os, logging, argparse -from sopel.module import nickname_commands, require_chanmsg, thread, example, require_privilege, VOICE -from sopel.tools import Identifier, SopelMemory -import celery, celery.exceptions -from celery.result import AsyncResult -from urllib.parse import urlsplit -from threading import Thread -from queue import Queue -import queue - -from crocoite import behavior, task -from crocoite.controller import defaultSettings - -def prettyTimeDelta (seconds): -    """ -    Pretty-print seconds to human readable string 1d 1h 1m 1s -    """ -    seconds = int(seconds) -    days, seconds = divmod(seconds, 86400) -    hours, seconds = divmod(seconds, 3600) -    minutes, seconds = divmod(seconds, 60) -    s = [(days, 'd'), (hours, 'h'), (minutes, 'm'), (seconds, 's')] -    s = filter (lambda x: x[0] != 0, s) -    return ' '.join (map (lambda x: '{}{}'.format (*x), s)) - -def prettyBytes (b): -    """ -    Pretty-print bytes -    """ -    prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB'] -    while b >= 1024 and len (prefixes) > 1: -        b /= 1024 -        prefixes.pop (0) -    return '{:.1f} {}'.format (b, prefixes[0]) - -def setup (bot): -    m = bot.memory['crocoite'] = {} -    q = m['q'] = Queue () -    t = m['t'] = Thread (target=celeryWorker, args=(bot, q)) -    t.start () - -def shutdown (bot): -    m = bot.memory['crocoite'] -    q = m['q'] -    t = m['t'] -    q.put_nowait (None) -    t.join () - -def isValidUrl (s): -    url = urlsplit (s) -    if url.scheme and url.netloc and url.scheme in {'http', 'https'}: -        return s -    raise TypeError () - -def checkCompletedJobs (bot, jobs): -    delete = set () -    for i, data in jobs.items (): -        handle = data['handle'] -        trigger = data['trigger'] -        args = data['args'] -        url = args['url'] -        channel = trigger.sender -        user = trigger.nick -        if Identifier (channel) not in bot.channels: -            continue -        try: -            stats = handle.get (timeout=0.1) -            bot.msg (channel, '{}: {} ({}) finished. {} crashed, {} requests, {} failed, {} received.'.format (user, url, -                    handle.id, stats['crashed'], stats['requests'], stats['failed'], -                    prettyBytes (stats['bytesRcv']))) -            delete.add (handle.id) -        except celery.exceptions.TimeoutError: -            pass -        except Exception as e: -            # json serialization does not work well with exceptions. If their class -            # names are unique we can still distinguish them. -            ename = type (e).__name__ -            if ename == 'TaskRevokedError': -                bot.msg (channel, '{}: {} ({}) was revoked'.format (user, url, handle.id)) -            else: -                bot.msg (channel, '{} ({}) failed'.format (user, url, handle.id)) -                logging.exception ('{} ({}) failed'.format (url, handle.id)) -            delete.add (handle.id) -    for d in delete: -        del jobs[d] - -def celeryWorker (bot, q): -    """ -    Serialize celery operations in a single thread. 
This is a workaround for -    https://github.com/celery/celery/issues/4480 -    """ - -    jobs = {} - -    while True: -        try: -            item = q.get (timeout=1) -        except queue.Empty: -            checkCompletedJobs (bot, jobs) -            continue - -        if item is None: -            break -        action, trigger, args = item -        if action == 'a': -            handle = task.controller.delay (**args) -            j = jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'args': args} - -            # pretty-print a few selected args -            showargs = { -                    'idleTimeout': prettyTimeDelta (args['settings']['idleTimeout']), -                    'timeout': prettyTimeDelta (args['settings']['timeout']), -                    'maxBodySize': prettyBytes (args['settings']['maxBodySize']), -                    'recursive': args['recursive'], -                    'concurrency': args['concurrency'], -                    } -            strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ())) -            bot.msg (trigger.sender, '{}: {} has been queued as {} with {}'.format (trigger.nick, args['url'], handle.id, strargs)) -        elif action == 'status': -            if args and args in jobs: -                j = jobs[args] -                jtrigger = j['trigger'] -                handle = j['handle'] -                bot.msg (trigger.sender, '{}: {}, queued {}, by {}'.format (handle.id, -                        handle.status, jtrigger.time, jtrigger.nick)) -            else: -                bot.msg (trigger.sender, "Job not found.") -        elif action == 'revoke': -            if args and args in jobs: -                j = jobs[args] -                handle = j['handle'] -                handle.revoke (terminate=True) -                # response is handled above -            else: -                bot.msg (trigger.sender, "Job not found.") -        q.task_done () - -class NonExitingArgumentParser (argparse.ArgumentParser): -    def exit (self, status=0, message=None): -        # should never be called -        pass - -    def error (self, message): -        raise Exception (message) - -archiveparser = NonExitingArgumentParser (prog='a', add_help=False) -archiveparser.add_argument('--timeout', default=1*60*60, type=int, help='Maximum time for archival', metavar='SEC', choices=[60, 1*60*60, 2*60*60]) -archiveparser.add_argument('--idle-timeout', default=10, type=int, help='Maximum idle seconds (i.e. 
no requests)', dest='idleTimeout', metavar='SEC', choices=[1, 10, 20, 30, 60]) -archiveparser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES', choices=[1*1024*1024, 10*1024*1024, defaultSettings.maxBodySize, 100*1024*1024]) -archiveparser.add_argument('--concurrency', default=1, type=int, help='Parallel workers for this job', choices=range (9)) -archiveparser.add_argument('--recursive', help='Enable recursion', choices=['0', '1', '2', '3', 'prefix']) -archiveparser.add_argument('url', help='Website URL', type=isValidUrl) - -@nickname_commands ('a', 'archive') -@require_chanmsg () -@require_privilege (VOICE) -@example ('a http://example.com') -def archive (bot, trigger): -    """ -    Archive a URL to WARC -    """ - -    try: -        args = archiveparser.parse_args (trigger.group (2).split ()) -    except Exception as e: -        bot.reply ('{} -- {}'.format (e.args[0], archiveparser.format_usage ())) -        return -    if not args: -        bot.reply ('Sorry, I don’t understand {}'.format (trigger.group (2))) -        return -    settings = dict (maxBodySize=args.maxBodySize, -            logBuffer=defaultSettings.logBuffer, idleTimeout=args.idleTimeout, -            timeout=args.timeout) -    args = dict (url=args.url, -            enabledBehaviorNames=list (behavior.availableMap.keys ()), -            settings=settings, recursive=args.recursive, -            concurrency=args.concurrency) -    q = bot.memory['crocoite']['q'] -    q.put_nowait (('a', trigger, args)) - -@nickname_commands ('s', 'status') -@example ('s c251f09e-3c26-481f-96e0-4b5f58bd1170') -@require_chanmsg () -def status (bot, trigger): -    """ -    Retrieve status for a job -    """ - -    i = trigger.group(2) -    q = bot.memory['crocoite']['q'] -    q.put_nowait (('status', trigger, i)) - -@nickname_commands ('r', 'revoke') -@example ('r c251f09e-3c26-481f-96e0-4b5f58bd1170') -@require_privilege (VOICE) -@require_chanmsg () -def revoke (bot, trigger): -    """ -    Cancel (revoke) a job -    """ - -    i = trigger.group(2) -    q = bot.memory['crocoite']['q'] -    q.put_nowait (('revoke', trigger, i)) - diff --git a/crocoite/cli.py b/crocoite/cli.py index 8e225d9..ac7e648 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -25,76 +25,43 @@ Command line interface  import argparse, json, sys  from . 
-from .controller import RecursiveController, defaultSettings, \
-        ControllerSettings, DepthLimit, PrefixLimit, StatsHandler
+from .controller import SinglePageController, defaultSettings, \
+        ControllerSettings, StatsHandler
 from .browser import NullService, ChromeService
 from .warc import WarcHandler
 from .logger import Logger, JsonPrintConsumer, DatetimeConsumer, WarcHandlerConsumer
 
-def parseRecursive (recursive, url):
-    if recursive is None:
-        return DepthLimit (0)
-    elif recursive.isdigit ():
-        return DepthLimit (int (recursive))
-    elif recursive == 'prefix':
-        return PrefixLimit (url)
-    else:
-        raise ValueError ('Unsupported')
-
-def main ():
+def single ():
     parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
     parser.add_argument('--browser', help='DevTools URL', metavar='URL')
-    parser.add_argument('--recursive', help='Follow links recursively')
-    parser.add_argument('--concurrency', '-j', type=int, default=1)
     parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC')
     parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC')
-    parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES')
     parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES')
     parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts',
             dest='enabledBehaviorNames',
             default=list (behavior.availableMap.keys ()),
             choices=list (behavior.availableMap.keys ()))
-    group = parser.add_mutually_exclusive_group (required=True)
-    group.add_argument('--output', help='WARC filename', metavar='FILE')
-    group.add_argument('--distributed', help='Use celery worker', action='store_true')
-    parser.add_argument('url', help='Website URL')
+    parser.add_argument('url', help='Website URL', metavar='URL')
+    parser.add_argument('output', help='WARC filename', metavar='FILE')
     args = parser.parse_args ()
 
-    if args.distributed:
-        if args.browser:
-            parser.error ('--browser is not supported for distributed jobs')
-        from . import task
-        settings = dict (maxBodySize=args.maxBodySize,
-                logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,
-                timeout=args.timeout)
-        result = task.controller.delay (url=args.url, settings=settings,
-                enabledBehaviorNames=args.enabledBehaviorNames,
-                recursive=args.recursive, concurrency=args.concurrency)
-        r = result.get ()
-    else:
-        logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
+    logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
 
-        try:
-            recursionPolicy = parseRecursive (args.recursive, args.url)
-        except ValueError:
-            parser.error ('Invalid argument for --recursive')
-        service = ChromeService ()
-        if args.browser:
-            service = NullService (args.browser)
-        settings = ControllerSettings (maxBodySize=args.maxBodySize,
-                logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,
-                timeout=args.timeout)
-        with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler:
-            logger.connect (WarcHandlerConsumer (warcHandler))
-            handler = [StatsHandler (), warcHandler]
-            b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames))
-            controller = RecursiveController (args.url, fd, settings=settings,
-                    recursionPolicy=recursionPolicy, service=service,
-                    handler=handler, behavior=b, logger=logger)
-            controller.run ()
-            r = handler[0].stats
-            logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r)
+    service = ChromeService ()
+    if args.browser:
+        service = NullService (args.browser)
+    settings = ControllerSettings (maxBodySize=args.maxBodySize,
+            idleTimeout=args.idleTimeout, timeout=args.timeout)
+    with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler:
+        logger.connect (WarcHandlerConsumer (warcHandler))
+        handler = [StatsHandler (), warcHandler]
+        b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames))
+        controller = SinglePageController (args.url, fd, settings=settings,
+                service=service, handler=handler, behavior=b, logger=logger)
+        controller.run ()
+        r = handler[0].stats
+        logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r)
 
     return True
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 178d11c..9dae96f 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -23,16 +23,15 @@ Controller classes, handling actions required for archival
 """
 
 class ControllerSettings:
-    __slots__ = ('logBuffer', 'maxBodySize', 'idleTimeout', 'timeout')
+    __slots__ = ('maxBodySize', 'idleTimeout', 'timeout')
 
-    def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10):
-        self.logBuffer = logBuffer
+    def __init__ (self, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10):
         self.maxBodySize = maxBodySize
         self.idleTimeout = idleTimeout
         self.timeout = timeout
 
     def toDict (self):
-        return dict (logBuffer=self.logBuffer, maxBodySize=self.maxBodySize,
+        return dict (maxBodySize=self.maxBodySize,
                 idleTimeout=self.idleTimeout, timeout=self.timeout)
 
 defaultSettings = ControllerSettings ()
@@ -207,117 +206,3 @@ class SinglePageController:
             processQueue ()
 
 
-class RecursionPolicy:
-    """ Abstract recursion policy """
-
-    __slots__ = ()
-
-    def __call__ (self, urls):
-        raise NotImplementedError
-
-class DepthLimit (RecursionPolicy):
-    """
-    Limit recursion by depth.
-
-    depth==0 means no recursion, depth==1 is the page and outgoing links, …
-    """
-
-    __slots__ = ('maxdepth')
-
-    def __init__ (self, maxdepth=0):
-        self.maxdepth = maxdepth
-
-    def __call__ (self, urls):
-        if self.maxdepth <= 0:
-            return {}
-        else:
-            self.maxdepth -= 1
-            return urls
-
-    def __repr__ (self):
-        return '<DepthLimit {}>'.format (self.maxdepth)
-
-class PrefixLimit (RecursionPolicy):
-    """
-    Limit recursion by prefix
-
-    i.e. prefix=http://example.com/foo
-    ignored: http://example.com/bar http://offsite.example/foo
-    accepted: http://example.com/foobar http://example.com/foo/bar
-    """
-
-    __slots__ = ('prefix')
-
-    def __init__ (self, prefix):
-        self.prefix = prefix
-
-    def __call__ (self, urls):
-        return set (filter (lambda u: u.startswith (self.prefix), urls))
-
-from .behavior import ExtractLinksEvent
-
-class RecursiveController (EventHandler):
-    """
-    Simple recursive controller
-
-    Visits links acording to recursionPolicy
-    """
-
-    __slots__ = ('url', 'output', 'service', 'behavior', 'settings', 'logger',
-            'recursionPolicy', 'handler', 'urls', 'have')
-
-    def __init__ (self, url, output, logger,
-            service=ChromeService (), behavior=cbehavior.available, \
-            settings=defaultSettings, \
-            recursionPolicy=DepthLimit (0), handler=[]):
-        self.url = url
-        self.output = output
-        self.service = service
-        self.behavior = behavior
-        self.settings = settings
-        self.logger = logger.bind (context=type(self).__name__, url=url)
-        self.recursionPolicy = recursionPolicy
-        self.handler = handler
-        self.handler.append (self)
-
-    def fetch (self, urls):
-        """
-        Overrideable fetch action for URLs. Defaults to sequential
-        SinglePageController.
-        """
-        for u in urls:
-            try:
-                c = SinglePageController (url=u, output=self.output, service=self.service,
-                        behavior=self.behavior, logger=self.logger,
-                        settings=self.settings, handler=self.handler)
-                c.run ()
-            except BrowserCrashed:
-                # this is fine if reported
-                self.logger.error ('browser crashed', uuid='42582cbe-fb83-47ce-b330-d022a1c3b331')
-
-    def run (self):
-        self.have = set ()
-        self.urls = set ([self.url])
-
-        while self.urls:
-            self.logger.info ('recursing',
-                    uuid='5b8498e4-868d-413c-a67e-004516b8452c',
-                    numurls=len (self.urls))
-
-            self.have.update (self.urls)
-            fetchurls = self.urls
-            self.urls = set ()
-
-            # handler appends new urls to self.urls through push()
-            self.fetch (fetchurls)
-
-            # remove urls we have and apply recursion policy
-            self.urls.difference_update (self.have)
-            self.urls = self.recursionPolicy (self.urls)
-
-    def push (self, item):
-        if isinstance (item, ExtractLinksEvent):
-            self.logger.debug ('extracted links',
-                    uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', links=item.links)
-            self.urls.update (map (removeFragment, item.links))
-
diff --git a/crocoite/task.py b/crocoite/task.py
deleted file mode 100644
index 06dd022..0000000
--- a/crocoite/task.py
+++ /dev/null
@@ -1,146 +0,0 @@
-# Copyright (c) 2017–2018 crocoite contributors
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-Celery distributed tasks
-"""
-
-import os
-
-from urllib.parse import urlsplit
-from datetime import datetime
-from operator import attrgetter
-from itertools import chain
-
-def _monkeyPatchSyncTasks ():
-    """ Result sets don’t support the argument disable_sync_subtasks argument """
-    import celery.result
-    celery.result.assert_will_not_block = lambda: None
-
-_monkeyPatchSyncTasks ()
-from celery import Celery
-from celery.utils.log import get_task_logger
-
-from .browser import ChromeService, BrowserCrashed
-from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, StatsHandler
-from . import behavior
-from .cli import parseRecursive
-from .warc import WarcHandler
-
-app = Celery ('crocoite.distributed')
-app.config_from_object('celeryconfig')
-app.conf.task_routes = {
-        'crocoite.task.archive': {'queue': 'crocoite.archive'},
-        'crocoite.task.controller': {'queue': 'crocoite.controller'},
-        # <method>.chunks is actually a starmap job
-        'celery.starmap': {'queue': 'crocoite.archive'},
-        }
-app.conf.task_default_queue = 'crocoite.default'
-# disable prefetching, since our tasks usually run for a _very_ long time
-app.conf.worker_prefetch_multiplier = 1
-logger = get_task_logger('crocoite.distributed.archive')
-
-@app.task(bind=True, track_started=True)
-def archive (self, url, settings, enabledBehaviorNames):
-    """
-    Archive a single URL
-
-    Supports these config keys (celeryconfig):
-
-    warc_filename = '{domain}-{date}-{id}.warc.gz'
-    temp_dir = '/tmp/'
-    finished_dir = '/tmp/finished'
-    """
-
-    parsedUrl = urlsplit (url)
-    outFile = app.conf.warc_filename.format (
-                    id=self.request.root_id,
-                    domain=parsedUrl.hostname.replace ('/', '-'),
-                    date=datetime.utcnow ().isoformat (),
-                    )
-    outPath = os.path.join (app.conf.temp_dir, outFile)
-    fd = open (outPath, 'wb')
-
-    handler = [StatsHandler (), WarcHandler (fd)]
-    enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
-    settings = ControllerSettings (**settings)
-    try:
-        c = SinglePageController (url, fd, behavior=enabledBehavior,
-                settings=settings, handler=handler)
-        c.run ()
-    except BrowserCrashed:
-        # nothing we can do about that
-        logger.error ('browser crashed for {}'.format (url))
-
-    os.makedirs (app.conf.finished_dir, exist_ok=True)
-    outPath = os.path.join (app.conf.finished_dir, outFile)
-    os.rename (fd.name, outPath)
-
-    return handler[0].stats
-
-from collections import UserDict
-
-class IntegerDict (UserDict):
-    """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """
-    def __add__ (self, b):
-        newdict = self.__class__ (self)
-        for k, v in b.items ():
-            if k in self:
-                newdict[k] += v
-            else:
-                newdict[k] = v
-        return newdict
-
-class DistributedRecursiveController (RecursiveController):
-    """ Distributed, recursive controller using celery """
-
-    __slots__ = ('concurrency', 'stats')
-
-    def __init__ (self, url, logger, service=ChromeService (), behavior=behavior.available, \
-            settings=defaultSettings,
-            recursionPolicy=DepthLimit (0), concurrency=1):
-        super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy)
-        self.concurrency = concurrency
-        self.stats = IntegerDict ()
-
-    def fetch (self, urls):
-        def chunksIter (urls):
-            for u in urls:
-                yield (u, self.settings.toDict (), list (map (attrgetter ('name'), self.behavior)))
-        itemsPerTask = len (urls)//self.concurrency
-        if itemsPerTask <= 0:
-            itemsPerTask = len (urls)
-        result = archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ()
-        self.stats = sum (chain.from_iterable (result), self.stats)
-
-@app.task(bind=True, track_started=True)
-def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency):
-    """ Recursive controller """
-
-    logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
-    recursionPolicy = parseRecursive (recursive, url)
-    enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
-    settings = ControllerSettings (**settings)
-    c = DistributedRecursiveController (url, None, logger=logger, behavior=enabledBehavior,
-            settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency)
-    c.run ()
-    return dict (c.stats)
-
-
diff --git a/setup.py b/setup.py
@@ -13,11 +13,10 @@ setup(
         'pychrome',
         'warcio',
         'html5lib>=0.999999999',
-        'Celery',
     ],
     entry_points={
    'console_scripts': [
-            'crocoite-grab = crocoite.cli:main',
+            'crocoite-grab = crocoite.cli:single',
             'crocoite-merge-warc = crocoite.tools:mergeWarc',
             'crocoite-extract-screenshot = crocoite.tools:extractScreenshot',
             ],
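Note: with this change the ``crocoite-grab`` entry point maps to ``crocoite.cli:single`` and archives exactly one page, taking the URL and the output WARC file as positional arguments; the remaining options of the new ``single()`` parser above are unchanged. A minimal sketch of an invocation after this commit — the timeout values are arbitrary and ``example.com`` is only a placeholder — would be::

    crocoite-grab --timeout 60 --idle-timeout 2 http://example.com/ example.com.warc.gz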
