From 53e4df3fe732417988532e5b3d8b4dc7e781a3df Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Tue, 21 Aug 2018 11:27:05 +0200 Subject: Remove celery and recursion Gonna rewrite that properly. --- README.rst | 61 ------------ contrib/celerycrocoite.py | 229 ---------------------------------------------- crocoite/cli.py | 73 ++++----------- crocoite/controller.py | 121 +----------------------- crocoite/task.py | 146 ----------------------------- setup.py | 3 +- 6 files changed, 24 insertions(+), 609 deletions(-) delete mode 100644 contrib/celerycrocoite.py delete mode 100644 crocoite/task.py diff --git a/README.rst b/README.rst index 7108491..b1fce2c 100644 --- a/README.rst +++ b/README.rst @@ -17,12 +17,10 @@ The following dependencies must be present to run crocoite: - pychrome_ - warcio_ - html5lib_ -- Celery_ (optional) .. _pychrome: https://github.com/fate0/pychrome .. _warcio: https://github.com/webrecorder/warcio .. _html5lib: https://github.com/html5lib/html5lib-python -.. _Celery: http://www.celeryproject.org/ It is recommended to prepare a virtualenv and let pip handle the dependency resolution for Python packages instead: @@ -121,65 +119,6 @@ does not work any more. Secondly it also saves a screenshot of the full page, so even if future browsers cannot render and display the stored HTML a fully rendered version of the website can be replayed instead. -Advanced usage --------------- - -crocoite offers more than just a one-shot command-line interface. - -Distributed crawling -^^^^^^^^^^^^^^^^^^^^ - -Configure using celeryconfig.py - -.. code:: python - - broker_url = 'pyamqp://' - result_backend = 'rpc://' - warc_filename = '{domain}-{date}-{id}.warc.gz' - temp_dir = '/tmp/' - finished_dir = '/tmp/finished' - -Start a Celery worker:: - - celery -A crocoite.task worker -Q crocoite.archive,crocoite.controller --loglevel=info - -Then queue archive job:: - - crocoite-grab --distributed http://example.com - -The worker will create a temporary file named according to ``warc_filename`` in -``/tmp`` while archiving and move it to ``/tmp/finished`` when done. - -IRC bot -^^^^^^^ - -Configure sopel_ (``~/.sopel/default.cfg``) to use the plugin located in -``contrib/celerycrocoite.py`` - -.. code:: ini - - [core] - nick = chromebot - host = irc.efnet.fr - port = 6667 - owner = someone - extra = /path/to/crocoite/contrib - enable = celerycrocoite - channels = #somechannel - -Then start it by running ``sopel``. The bot must be addressed directly (i.e. -``chromebot: ``). The following commands are currently supported: - -a - Archives and all of its resources (images, css, …). A unique UID - (UUID) is assigned to each job. -s - Get status of job with -r - Revoke job with . If it started already the job will be killed. - -.. _sopel: https://sopel.chat/ - Related projects ---------------- diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py deleted file mode 100644 index 3da43d9..0000000 --- a/contrib/celerycrocoite.py +++ /dev/null @@ -1,229 +0,0 @@ -# Copyright (c) 2017 crocoite contributors -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -""" -Module for Sopel IRC bot -""" - -import os, logging, argparse -from sopel.module import nickname_commands, require_chanmsg, thread, example, require_privilege, VOICE -from sopel.tools import Identifier, SopelMemory -import celery, celery.exceptions -from celery.result import AsyncResult -from urllib.parse import urlsplit -from threading import Thread -from queue import Queue -import queue - -from crocoite import behavior, task -from crocoite.controller import defaultSettings - -def prettyTimeDelta (seconds): - """ - Pretty-print seconds to human readable string 1d 1h 1m 1s - """ - seconds = int(seconds) - days, seconds = divmod(seconds, 86400) - hours, seconds = divmod(seconds, 3600) - minutes, seconds = divmod(seconds, 60) - s = [(days, 'd'), (hours, 'h'), (minutes, 'm'), (seconds, 's')] - s = filter (lambda x: x[0] != 0, s) - return ' '.join (map (lambda x: '{}{}'.format (*x), s)) - -def prettyBytes (b): - """ - Pretty-print bytes - """ - prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB'] - while b >= 1024 and len (prefixes) > 1: - b /= 1024 - prefixes.pop (0) - return '{:.1f} {}'.format (b, prefixes[0]) - -def setup (bot): - m = bot.memory['crocoite'] = {} - q = m['q'] = Queue () - t = m['t'] = Thread (target=celeryWorker, args=(bot, q)) - t.start () - -def shutdown (bot): - m = bot.memory['crocoite'] - q = m['q'] - t = m['t'] - q.put_nowait (None) - t.join () - -def isValidUrl (s): - url = urlsplit (s) - if url.scheme and url.netloc and url.scheme in {'http', 'https'}: - return s - raise TypeError () - -def checkCompletedJobs (bot, jobs): - delete = set () - for i, data in jobs.items (): - handle = data['handle'] - trigger = data['trigger'] - args = data['args'] - url = args['url'] - channel = trigger.sender - user = trigger.nick - if Identifier (channel) not in bot.channels: - continue - try: - stats = handle.get (timeout=0.1) - bot.msg (channel, '{}: {} ({}) finished. {} crashed, {} requests, {} failed, {} received.'.format (user, url, - handle.id, stats['crashed'], stats['requests'], stats['failed'], - prettyBytes (stats['bytesRcv']))) - delete.add (handle.id) - except celery.exceptions.TimeoutError: - pass - except Exception as e: - # json serialization does not work well with exceptions. If their class - # names are unique we can still distinguish them. - ename = type (e).__name__ - if ename == 'TaskRevokedError': - bot.msg (channel, '{}: {} ({}) was revoked'.format (user, url, handle.id)) - else: - bot.msg (channel, '{} ({}) failed'.format (user, url, handle.id)) - logging.exception ('{} ({}) failed'.format (url, handle.id)) - delete.add (handle.id) - for d in delete: - del jobs[d] - -def celeryWorker (bot, q): - """ - Serialize celery operations in a single thread. This is a workaround for - https://github.com/celery/celery/issues/4480 - """ - - jobs = {} - - while True: - try: - item = q.get (timeout=1) - except queue.Empty: - checkCompletedJobs (bot, jobs) - continue - - if item is None: - break - action, trigger, args = item - if action == 'a': - handle = task.controller.delay (**args) - j = jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'args': args} - - # pretty-print a few selected args - showargs = { - 'idleTimeout': prettyTimeDelta (args['settings']['idleTimeout']), - 'timeout': prettyTimeDelta (args['settings']['timeout']), - 'maxBodySize': prettyBytes (args['settings']['maxBodySize']), - 'recursive': args['recursive'], - 'concurrency': args['concurrency'], - } - strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ())) - bot.msg (trigger.sender, '{}: {} has been queued as {} with {}'.format (trigger.nick, args['url'], handle.id, strargs)) - elif action == 'status': - if args and args in jobs: - j = jobs[args] - jtrigger = j['trigger'] - handle = j['handle'] - bot.msg (trigger.sender, '{}: {}, queued {}, by {}'.format (handle.id, - handle.status, jtrigger.time, jtrigger.nick)) - else: - bot.msg (trigger.sender, "Job not found.") - elif action == 'revoke': - if args and args in jobs: - j = jobs[args] - handle = j['handle'] - handle.revoke (terminate=True) - # response is handled above - else: - bot.msg (trigger.sender, "Job not found.") - q.task_done () - -class NonExitingArgumentParser (argparse.ArgumentParser): - def exit (self, status=0, message=None): - # should never be called - pass - - def error (self, message): - raise Exception (message) - -archiveparser = NonExitingArgumentParser (prog='a', add_help=False) -archiveparser.add_argument('--timeout', default=1*60*60, type=int, help='Maximum time for archival', metavar='SEC', choices=[60, 1*60*60, 2*60*60]) -archiveparser.add_argument('--idle-timeout', default=10, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC', choices=[1, 10, 20, 30, 60]) -archiveparser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES', choices=[1*1024*1024, 10*1024*1024, defaultSettings.maxBodySize, 100*1024*1024]) -archiveparser.add_argument('--concurrency', default=1, type=int, help='Parallel workers for this job', choices=range (9)) -archiveparser.add_argument('--recursive', help='Enable recursion', choices=['0', '1', '2', '3', 'prefix']) -archiveparser.add_argument('url', help='Website URL', type=isValidUrl) - -@nickname_commands ('a', 'archive') -@require_chanmsg () -@require_privilege (VOICE) -@example ('a http://example.com') -def archive (bot, trigger): - """ - Archive a URL to WARC - """ - - try: - args = archiveparser.parse_args (trigger.group (2).split ()) - except Exception as e: - bot.reply ('{} -- {}'.format (e.args[0], archiveparser.format_usage ())) - return - if not args: - bot.reply ('Sorry, I don’t understand {}'.format (trigger.group (2))) - return - settings = dict (maxBodySize=args.maxBodySize, - logBuffer=defaultSettings.logBuffer, idleTimeout=args.idleTimeout, - timeout=args.timeout) - args = dict (url=args.url, - enabledBehaviorNames=list (behavior.availableMap.keys ()), - settings=settings, recursive=args.recursive, - concurrency=args.concurrency) - q = bot.memory['crocoite']['q'] - q.put_nowait (('a', trigger, args)) - -@nickname_commands ('s', 'status') -@example ('s c251f09e-3c26-481f-96e0-4b5f58bd1170') -@require_chanmsg () -def status (bot, trigger): - """ - Retrieve status for a job - """ - - i = trigger.group(2) - q = bot.memory['crocoite']['q'] - q.put_nowait (('status', trigger, i)) - -@nickname_commands ('r', 'revoke') -@example ('r c251f09e-3c26-481f-96e0-4b5f58bd1170') -@require_privilege (VOICE) -@require_chanmsg () -def revoke (bot, trigger): - """ - Cancel (revoke) a job - """ - - i = trigger.group(2) - q = bot.memory['crocoite']['q'] - q.put_nowait (('revoke', trigger, i)) - diff --git a/crocoite/cli.py b/crocoite/cli.py index 8e225d9..ac7e648 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -25,76 +25,43 @@ Command line interface import argparse, json, sys from . import behavior -from .controller import RecursiveController, defaultSettings, \ - ControllerSettings, DepthLimit, PrefixLimit, StatsHandler +from .controller import SinglePageController, defaultSettings, \ + ControllerSettings, StatsHandler from .browser import NullService, ChromeService from .warc import WarcHandler from .logger import Logger, JsonPrintConsumer, DatetimeConsumer, WarcHandlerConsumer -def parseRecursive (recursive, url): - if recursive is None: - return DepthLimit (0) - elif recursive.isdigit (): - return DepthLimit (int (recursive)) - elif recursive == 'prefix': - return PrefixLimit (url) - else: - raise ValueError ('Unsupported') - -def main (): +def single (): parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.') parser.add_argument('--browser', help='DevTools URL', metavar='URL') - parser.add_argument('--recursive', help='Follow links recursively') - parser.add_argument('--concurrency', '-j', type=int, default=1) parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC') parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC') - parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES') parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES') parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts', dest='enabledBehaviorNames', default=list (behavior.availableMap.keys ()), choices=list (behavior.availableMap.keys ())) - group = parser.add_mutually_exclusive_group (required=True) - group.add_argument('--output', help='WARC filename', metavar='FILE') - group.add_argument('--distributed', help='Use celery worker', action='store_true') - parser.add_argument('url', help='Website URL') + parser.add_argument('url', help='Website URL', metavar='URL') + parser.add_argument('output', help='WARC filename', metavar='FILE') args = parser.parse_args () - if args.distributed: - if args.browser: - parser.error ('--browser is not supported for distributed jobs') - from . import task - settings = dict (maxBodySize=args.maxBodySize, - logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, - timeout=args.timeout) - result = task.controller.delay (url=args.url, settings=settings, - enabledBehaviorNames=args.enabledBehaviorNames, - recursive=args.recursive, concurrency=args.concurrency) - r = result.get () - else: - logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) + logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) - try: - recursionPolicy = parseRecursive (args.recursive, args.url) - except ValueError: - parser.error ('Invalid argument for --recursive') - service = ChromeService () - if args.browser: - service = NullService (args.browser) - settings = ControllerSettings (maxBodySize=args.maxBodySize, - logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, - timeout=args.timeout) - with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler: - logger.connect (WarcHandlerConsumer (warcHandler)) - handler = [StatsHandler (), warcHandler] - b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames)) - controller = RecursiveController (args.url, fd, settings=settings, - recursionPolicy=recursionPolicy, service=service, - handler=handler, behavior=b, logger=logger) - controller.run () - r = handler[0].stats - logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r) + service = ChromeService () + if args.browser: + service = NullService (args.browser) + settings = ControllerSettings (maxBodySize=args.maxBodySize, + idleTimeout=args.idleTimeout, timeout=args.timeout) + with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler: + logger.connect (WarcHandlerConsumer (warcHandler)) + handler = [StatsHandler (), warcHandler] + b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames)) + controller = SinglePageController (args.url, fd, settings=settings, + service=service, handler=handler, behavior=b, logger=logger) + controller.run () + r = handler[0].stats + logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r) return True diff --git a/crocoite/controller.py b/crocoite/controller.py index 178d11c..9dae96f 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -23,16 +23,15 @@ Controller classes, handling actions required for archival """ class ControllerSettings: - __slots__ = ('logBuffer', 'maxBodySize', 'idleTimeout', 'timeout') + __slots__ = ('maxBodySize', 'idleTimeout', 'timeout') - def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10): - self.logBuffer = logBuffer + def __init__ (self, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10): self.maxBodySize = maxBodySize self.idleTimeout = idleTimeout self.timeout = timeout def toDict (self): - return dict (logBuffer=self.logBuffer, maxBodySize=self.maxBodySize, + return dict (maxBodySize=self.maxBodySize, idleTimeout=self.idleTimeout, timeout=self.timeout) defaultSettings = ControllerSettings () @@ -207,117 +206,3 @@ class SinglePageController: processQueue () -class RecursionPolicy: - """ Abstract recursion policy """ - - __slots__ = () - - def __call__ (self, urls): - raise NotImplementedError - -class DepthLimit (RecursionPolicy): - """ - Limit recursion by depth. - - depth==0 means no recursion, depth==1 is the page and outgoing links, … - """ - - __slots__ = ('maxdepth') - - def __init__ (self, maxdepth=0): - self.maxdepth = maxdepth - - def __call__ (self, urls): - if self.maxdepth <= 0: - return {} - else: - self.maxdepth -= 1 - return urls - - def __repr__ (self): - return ''.format (self.maxdepth) - -class PrefixLimit (RecursionPolicy): - """ - Limit recursion by prefix - - i.e. prefix=http://example.com/foo - ignored: http://example.com/bar http://offsite.example/foo - accepted: http://example.com/foobar http://example.com/foo/bar - """ - - __slots__ = ('prefix') - - def __init__ (self, prefix): - self.prefix = prefix - - def __call__ (self, urls): - return set (filter (lambda u: u.startswith (self.prefix), urls)) - -from .behavior import ExtractLinksEvent - -class RecursiveController (EventHandler): - """ - Simple recursive controller - - Visits links acording to recursionPolicy - """ - - __slots__ = ('url', 'output', 'service', 'behavior', 'settings', 'logger', - 'recursionPolicy', 'handler', 'urls', 'have') - - def __init__ (self, url, output, logger, - service=ChromeService (), behavior=cbehavior.available, \ - settings=defaultSettings, \ - recursionPolicy=DepthLimit (0), handler=[]): - self.url = url - self.output = output - self.service = service - self.behavior = behavior - self.settings = settings - self.logger = logger.bind (context=type(self).__name__, url=url) - self.recursionPolicy = recursionPolicy - self.handler = handler - self.handler.append (self) - - def fetch (self, urls): - """ - Overrideable fetch action for URLs. Defaults to sequential - SinglePageController. - """ - for u in urls: - try: - c = SinglePageController (url=u, output=self.output, service=self.service, - behavior=self.behavior, logger=self.logger, - settings=self.settings, handler=self.handler) - c.run () - except BrowserCrashed: - # this is fine if reported - self.logger.error ('browser crashed', uuid='42582cbe-fb83-47ce-b330-d022a1c3b331') - - def run (self): - self.have = set () - self.urls = set ([self.url]) - - while self.urls: - self.logger.info ('recursing', - uuid='5b8498e4-868d-413c-a67e-004516b8452c', - numurls=len (self.urls)) - - self.have.update (self.urls) - fetchurls = self.urls - self.urls = set () - - # handler appends new urls to self.urls through push() - self.fetch (fetchurls) - - # remove urls we have and apply recursion policy - self.urls.difference_update (self.have) - self.urls = self.recursionPolicy (self.urls) - - def push (self, item): - if isinstance (item, ExtractLinksEvent): - self.logger.debug ('extracted links', - uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', links=item.links) - self.urls.update (map (removeFragment, item.links)) - diff --git a/crocoite/task.py b/crocoite/task.py deleted file mode 100644 index 06dd022..0000000 --- a/crocoite/task.py +++ /dev/null @@ -1,146 +0,0 @@ -# Copyright (c) 2017–2018 crocoite contributors -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -""" -Celery distributed tasks -""" - -import os - -from urllib.parse import urlsplit -from datetime import datetime -from operator import attrgetter -from itertools import chain - -def _monkeyPatchSyncTasks (): - """ Result sets don’t support the argument disable_sync_subtasks argument """ - import celery.result - celery.result.assert_will_not_block = lambda: None - -_monkeyPatchSyncTasks () -from celery import Celery -from celery.utils.log import get_task_logger - -from .browser import ChromeService, BrowserCrashed -from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, StatsHandler -from . import behavior -from .cli import parseRecursive -from .warc import WarcHandler - -app = Celery ('crocoite.distributed') -app.config_from_object('celeryconfig') -app.conf.task_routes = { - 'crocoite.task.archive': {'queue': 'crocoite.archive'}, - 'crocoite.task.controller': {'queue': 'crocoite.controller'}, - # .chunks is actually a starmap job - 'celery.starmap': {'queue': 'crocoite.archive'}, - } -app.conf.task_default_queue = 'crocoite.default' -# disable prefetching, since our tasks usually run for a _very_ long time -app.conf.worker_prefetch_multiplier = 1 -logger = get_task_logger('crocoite.distributed.archive') - -@app.task(bind=True, track_started=True) -def archive (self, url, settings, enabledBehaviorNames): - """ - Archive a single URL - - Supports these config keys (celeryconfig): - - warc_filename = '{domain}-{date}-{id}.warc.gz' - temp_dir = '/tmp/' - finished_dir = '/tmp/finished' - """ - - parsedUrl = urlsplit (url) - outFile = app.conf.warc_filename.format ( - id=self.request.root_id, - domain=parsedUrl.hostname.replace ('/', '-'), - date=datetime.utcnow ().isoformat (), - ) - outPath = os.path.join (app.conf.temp_dir, outFile) - fd = open (outPath, 'wb') - - handler = [StatsHandler (), WarcHandler (fd)] - enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) - settings = ControllerSettings (**settings) - try: - c = SinglePageController (url, fd, behavior=enabledBehavior, - settings=settings, handler=handler) - c.run () - except BrowserCrashed: - # nothing we can do about that - logger.error ('browser crashed for {}'.format (url)) - - os.makedirs (app.conf.finished_dir, exist_ok=True) - outPath = os.path.join (app.conf.finished_dir, outFile) - os.rename (fd.name, outPath) - - return handler[0].stats - -from collections import UserDict - -class IntegerDict (UserDict): - """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """ - def __add__ (self, b): - newdict = self.__class__ (self) - for k, v in b.items (): - if k in self: - newdict[k] += v - else: - newdict[k] = v - return newdict - -class DistributedRecursiveController (RecursiveController): - """ Distributed, recursive controller using celery """ - - __slots__ = ('concurrency', 'stats') - - def __init__ (self, url, logger, service=ChromeService (), behavior=behavior.available, \ - settings=defaultSettings, - recursionPolicy=DepthLimit (0), concurrency=1): - super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy) - self.concurrency = concurrency - self.stats = IntegerDict () - - def fetch (self, urls): - def chunksIter (urls): - for u in urls: - yield (u, self.settings.toDict (), list (map (attrgetter ('name'), self.behavior))) - itemsPerTask = len (urls)//self.concurrency - if itemsPerTask <= 0: - itemsPerTask = len (urls) - result = archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get () - self.stats = sum (chain.from_iterable (result), self.stats) - -@app.task(bind=True, track_started=True) -def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency): - """ Recursive controller """ - - logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) - recursionPolicy = parseRecursive (recursive, url) - enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) - settings = ControllerSettings (**settings) - c = DistributedRecursiveController (url, None, logger=logger, behavior=enabledBehavior, - settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency) - c.run () - return dict (c.stats) - - diff --git a/setup.py b/setup.py index 26113cf..e2357ca 100644 --- a/setup.py +++ b/setup.py @@ -13,11 +13,10 @@ setup( 'pychrome', 'warcio', 'html5lib>=0.999999999', - 'Celery', ], entry_points={ 'console_scripts': [ - 'crocoite-grab = crocoite.cli:main', + 'crocoite-grab = crocoite.cli:single', 'crocoite-merge-warc = crocoite.tools:mergeWarc', 'crocoite-extract-screenshot = crocoite.tools:extractScreenshot', ], -- cgit v1.2.3