diff options
| author | Lars-Dominik Braun <lars@6xq.net> | 2018-08-21 11:27:05 +0200 | 
|---|---|---|
| committer | Lars-Dominik Braun <lars@6xq.net> | 2018-08-21 13:19:47 +0200 | 
| commit | 53e4df3fe732417988532e5b3d8b4dc7e781a3df (patch) | |
| tree | 2ed52af2b575afcb0165e03eebf6d4f4d30f965e /crocoite | |
| parent | 8e5ac24c85ca9388410b2afda9a05fa4a3d9bf92 (diff) | |
| download | crocoite-53e4df3fe732417988532e5b3d8b4dc7e781a3df.tar.gz crocoite-53e4df3fe732417988532e5b3d8b4dc7e781a3df.tar.bz2 crocoite-53e4df3fe732417988532e5b3d8b4dc7e781a3df.zip | |
Remove celery and recursion
Gonna rewrite that properly.
Diffstat (limited to 'crocoite')
| -rw-r--r-- | crocoite/cli.py | 73 | ||||
| -rw-r--r-- | crocoite/controller.py | 121 | ||||
| -rw-r--r-- | crocoite/task.py | 146 | 
3 files changed, 23 insertions, 317 deletions
| diff --git a/crocoite/cli.py b/crocoite/cli.py index 8e225d9..ac7e648 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -25,76 +25,43 @@ Command line interface  import argparse, json, sys  from . import behavior -from .controller import RecursiveController, defaultSettings, \ -        ControllerSettings, DepthLimit, PrefixLimit, StatsHandler +from .controller import SinglePageController, defaultSettings, \ +        ControllerSettings, StatsHandler  from .browser import NullService, ChromeService  from .warc import WarcHandler  from .logger import Logger, JsonPrintConsumer, DatetimeConsumer, WarcHandlerConsumer -def parseRecursive (recursive, url): -    if recursive is None: -        return DepthLimit (0) -    elif recursive.isdigit (): -        return DepthLimit (int (recursive)) -    elif recursive == 'prefix': -        return PrefixLimit (url) -    else: -        raise ValueError ('Unsupported') - -def main (): +def single ():      parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')      parser.add_argument('--browser', help='DevTools URL', metavar='URL') -    parser.add_argument('--recursive', help='Follow links recursively') -    parser.add_argument('--concurrency', '-j', type=int, default=1)      parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC')      parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC') -    parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES')      parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES')      parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts',              dest='enabledBehaviorNames',              default=list (behavior.availableMap.keys ()),              choices=list (behavior.availableMap.keys ())) -    group = parser.add_mutually_exclusive_group (required=True) -    group.add_argument('--output', help='WARC filename', metavar='FILE') -    group.add_argument('--distributed', help='Use celery worker', action='store_true') -    parser.add_argument('url', help='Website URL') +    parser.add_argument('url', help='Website URL', metavar='URL') +    parser.add_argument('output', help='WARC filename', metavar='FILE')      args = parser.parse_args () -    if args.distributed: -        if args.browser: -            parser.error ('--browser is not supported for distributed jobs') -        from . import task -        settings = dict (maxBodySize=args.maxBodySize, -                logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, -                timeout=args.timeout) -        result = task.controller.delay (url=args.url, settings=settings, -                enabledBehaviorNames=args.enabledBehaviorNames, -                recursive=args.recursive, concurrency=args.concurrency) -        r = result.get () -    else: -        logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) +    logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) -        try: -            recursionPolicy = parseRecursive (args.recursive, args.url) -        except ValueError: -            parser.error ('Invalid argument for --recursive') -        service = ChromeService () -        if args.browser: -            service = NullService (args.browser) -        settings = ControllerSettings (maxBodySize=args.maxBodySize, -                logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, -                timeout=args.timeout) -        with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler: -            logger.connect (WarcHandlerConsumer (warcHandler)) -            handler = [StatsHandler (), warcHandler] -            b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames)) -            controller = RecursiveController (args.url, fd, settings=settings, -                    recursionPolicy=recursionPolicy, service=service, -                    handler=handler, behavior=b, logger=logger) -            controller.run () -            r = handler[0].stats -            logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r) +    service = ChromeService () +    if args.browser: +        service = NullService (args.browser) +    settings = ControllerSettings (maxBodySize=args.maxBodySize, +            idleTimeout=args.idleTimeout, timeout=args.timeout) +    with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler: +        logger.connect (WarcHandlerConsumer (warcHandler)) +        handler = [StatsHandler (), warcHandler] +        b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames)) +        controller = SinglePageController (args.url, fd, settings=settings, +                service=service, handler=handler, behavior=b, logger=logger) +        controller.run () +        r = handler[0].stats +        logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r)      return True diff --git a/crocoite/controller.py b/crocoite/controller.py index 178d11c..9dae96f 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -23,16 +23,15 @@ Controller classes, handling actions required for archival  """  class ControllerSettings: -    __slots__ = ('logBuffer', 'maxBodySize', 'idleTimeout', 'timeout') +    __slots__ = ('maxBodySize', 'idleTimeout', 'timeout') -    def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10): -        self.logBuffer = logBuffer +    def __init__ (self, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10):          self.maxBodySize = maxBodySize          self.idleTimeout = idleTimeout          self.timeout = timeout      def toDict (self): -        return dict (logBuffer=self.logBuffer, maxBodySize=self.maxBodySize, +        return dict (maxBodySize=self.maxBodySize,                  idleTimeout=self.idleTimeout, timeout=self.timeout)  defaultSettings = ControllerSettings () @@ -207,117 +206,3 @@ class SinglePageController:              processQueue () -class RecursionPolicy: -    """ Abstract recursion policy """ - -    __slots__ = () - -    def __call__ (self, urls): -        raise NotImplementedError - -class DepthLimit (RecursionPolicy): -    """ -    Limit recursion by depth. -     -    depth==0 means no recursion, depth==1 is the page and outgoing links, … -    """ - -    __slots__ = ('maxdepth') - -    def __init__ (self, maxdepth=0): -        self.maxdepth = maxdepth - -    def __call__ (self, urls): -        if self.maxdepth <= 0: -            return {} -        else: -            self.maxdepth -= 1 -            return urls - -    def __repr__ (self): -        return '<DepthLimit {}>'.format (self.maxdepth) - -class PrefixLimit (RecursionPolicy): -    """ -    Limit recursion by prefix -     -    i.e. prefix=http://example.com/foo -    ignored: http://example.com/bar http://offsite.example/foo -    accepted: http://example.com/foobar http://example.com/foo/bar -    """ - -    __slots__ = ('prefix') - -    def __init__ (self, prefix): -        self.prefix = prefix - -    def __call__ (self, urls): -        return set (filter (lambda u: u.startswith (self.prefix), urls)) - -from .behavior import ExtractLinksEvent - -class RecursiveController (EventHandler): -    """ -    Simple recursive controller - -    Visits links acording to recursionPolicy -    """ - -    __slots__ = ('url', 'output', 'service', 'behavior', 'settings', 'logger', -            'recursionPolicy', 'handler', 'urls', 'have') - -    def __init__ (self, url, output, logger, -            service=ChromeService (), behavior=cbehavior.available, \ -            settings=defaultSettings, \ -            recursionPolicy=DepthLimit (0), handler=[]): -        self.url = url -        self.output = output -        self.service = service -        self.behavior = behavior -        self.settings = settings -        self.logger = logger.bind (context=type(self).__name__, url=url) -        self.recursionPolicy = recursionPolicy -        self.handler = handler -        self.handler.append (self) - -    def fetch (self, urls): -        """ -        Overrideable fetch action for URLs. Defaults to sequential -        SinglePageController. -        """ -        for u in urls: -            try: -                c = SinglePageController (url=u, output=self.output, service=self.service, -                        behavior=self.behavior, logger=self.logger, -                        settings=self.settings, handler=self.handler) -                c.run () -            except BrowserCrashed: -                # this is fine if reported -                self.logger.error ('browser crashed', uuid='42582cbe-fb83-47ce-b330-d022a1c3b331') - -    def run (self): -        self.have = set () -        self.urls = set ([self.url]) - -        while self.urls: -            self.logger.info ('recursing', -                    uuid='5b8498e4-868d-413c-a67e-004516b8452c', -                    numurls=len (self.urls)) - -            self.have.update (self.urls) -            fetchurls = self.urls -            self.urls = set () - -            # handler appends new urls to self.urls through push() -            self.fetch (fetchurls) - -            # remove urls we have and apply recursion policy -            self.urls.difference_update (self.have) -            self.urls = self.recursionPolicy (self.urls) - -    def push (self, item): -        if isinstance (item, ExtractLinksEvent): -            self.logger.debug ('extracted links', -                    uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', links=item.links) -            self.urls.update (map (removeFragment, item.links)) - diff --git a/crocoite/task.py b/crocoite/task.py deleted file mode 100644 index 06dd022..0000000 --- a/crocoite/task.py +++ /dev/null @@ -1,146 +0,0 @@ -# Copyright (c) 2017–2018 crocoite contributors -#  -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -#  -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -#  -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -""" -Celery distributed tasks -""" - -import os - -from urllib.parse import urlsplit -from datetime import datetime -from operator import attrgetter -from itertools import chain - -def _monkeyPatchSyncTasks (): -    """ Result sets don’t support the argument disable_sync_subtasks argument """ -    import celery.result -    celery.result.assert_will_not_block = lambda: None - -_monkeyPatchSyncTasks () -from celery import Celery -from celery.utils.log import get_task_logger - -from .browser import ChromeService, BrowserCrashed -from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, StatsHandler -from . import behavior -from .cli import parseRecursive -from .warc import WarcHandler - -app = Celery ('crocoite.distributed') -app.config_from_object('celeryconfig') -app.conf.task_routes = { -        'crocoite.task.archive': {'queue': 'crocoite.archive'}, -        'crocoite.task.controller': {'queue': 'crocoite.controller'}, -        # <method>.chunks is actually a starmap job -        'celery.starmap': {'queue': 'crocoite.archive'}, -        } -app.conf.task_default_queue = 'crocoite.default' -# disable prefetching, since our tasks usually run for a _very_ long time -app.conf.worker_prefetch_multiplier = 1 -logger = get_task_logger('crocoite.distributed.archive') - -@app.task(bind=True, track_started=True) -def archive (self, url, settings, enabledBehaviorNames): -    """ -    Archive a single URL - -    Supports these config keys (celeryconfig): - -    warc_filename = '{domain}-{date}-{id}.warc.gz' -    temp_dir = '/tmp/' -    finished_dir = '/tmp/finished' -    """ - -    parsedUrl = urlsplit (url) -    outFile = app.conf.warc_filename.format ( -                    id=self.request.root_id, -                    domain=parsedUrl.hostname.replace ('/', '-'), -                    date=datetime.utcnow ().isoformat (), -                    ) -    outPath = os.path.join (app.conf.temp_dir, outFile) -    fd = open (outPath, 'wb') - -    handler = [StatsHandler (), WarcHandler (fd)] -    enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) -    settings = ControllerSettings (**settings) -    try: -        c = SinglePageController (url, fd, behavior=enabledBehavior, -                settings=settings, handler=handler) -        c.run () -    except BrowserCrashed: -        # nothing we can do about that -        logger.error ('browser crashed for {}'.format (url)) - -    os.makedirs (app.conf.finished_dir, exist_ok=True) -    outPath = os.path.join (app.conf.finished_dir, outFile) -    os.rename (fd.name, outPath) - -    return handler[0].stats - -from collections import UserDict - -class IntegerDict (UserDict): -    """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """ -    def __add__ (self, b): -        newdict = self.__class__ (self) -        for k, v in b.items (): -            if k in self: -                newdict[k] += v -            else: -                newdict[k] = v -        return newdict - -class DistributedRecursiveController (RecursiveController): -    """ Distributed, recursive controller using celery """ - -    __slots__ = ('concurrency', 'stats') - -    def __init__ (self, url, logger, service=ChromeService (), behavior=behavior.available, \ -            settings=defaultSettings, -            recursionPolicy=DepthLimit (0), concurrency=1): -        super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy) -        self.concurrency = concurrency -        self.stats = IntegerDict () - -    def fetch (self, urls): -        def chunksIter (urls): -            for u in urls: -                yield (u, self.settings.toDict (), list (map (attrgetter ('name'), self.behavior))) -        itemsPerTask = len (urls)//self.concurrency -        if itemsPerTask <= 0: -            itemsPerTask = len (urls) -        result = archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get () -        self.stats = sum (chain.from_iterable (result), self.stats) - -@app.task(bind=True, track_started=True) -def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency): -    """ Recursive controller """ - -    logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) -    recursionPolicy = parseRecursive (recursive, url) -    enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) -    settings = ControllerSettings (**settings) -    c = DistributedRecursiveController (url, None, logger=logger, behavior=enabledBehavior, -            settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency) -    c.run () -    return dict (c.stats) - - | 
