From 53e4df3fe732417988532e5b3d8b4dc7e781a3df Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Tue, 21 Aug 2018 11:27:05 +0200 Subject: Remove celery and recursion Gonna rewrite that properly. --- crocoite/cli.py | 73 +++++++------------------ crocoite/controller.py | 121 +--------------------------------------- crocoite/task.py | 146 ------------------------------------------------- 3 files changed, 23 insertions(+), 317 deletions(-) delete mode 100644 crocoite/task.py (limited to 'crocoite') diff --git a/crocoite/cli.py b/crocoite/cli.py index 8e225d9..ac7e648 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -25,76 +25,43 @@ Command line interface import argparse, json, sys from . import behavior -from .controller import RecursiveController, defaultSettings, \ - ControllerSettings, DepthLimit, PrefixLimit, StatsHandler +from .controller import SinglePageController, defaultSettings, \ + ControllerSettings, StatsHandler from .browser import NullService, ChromeService from .warc import WarcHandler from .logger import Logger, JsonPrintConsumer, DatetimeConsumer, WarcHandlerConsumer -def parseRecursive (recursive, url): - if recursive is None: - return DepthLimit (0) - elif recursive.isdigit (): - return DepthLimit (int (recursive)) - elif recursive == 'prefix': - return PrefixLimit (url) - else: - raise ValueError ('Unsupported') - -def main (): +def single (): parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.') parser.add_argument('--browser', help='DevTools URL', metavar='URL') - parser.add_argument('--recursive', help='Follow links recursively') - parser.add_argument('--concurrency', '-j', type=int, default=1) parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC') parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC') - parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES') parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES') parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts', dest='enabledBehaviorNames', default=list (behavior.availableMap.keys ()), choices=list (behavior.availableMap.keys ())) - group = parser.add_mutually_exclusive_group (required=True) - group.add_argument('--output', help='WARC filename', metavar='FILE') - group.add_argument('--distributed', help='Use celery worker', action='store_true') - parser.add_argument('url', help='Website URL') + parser.add_argument('url', help='Website URL', metavar='URL') + parser.add_argument('output', help='WARC filename', metavar='FILE') args = parser.parse_args () - if args.distributed: - if args.browser: - parser.error ('--browser is not supported for distributed jobs') - from . import task - settings = dict (maxBodySize=args.maxBodySize, - logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, - timeout=args.timeout) - result = task.controller.delay (url=args.url, settings=settings, - enabledBehaviorNames=args.enabledBehaviorNames, - recursive=args.recursive, concurrency=args.concurrency) - r = result.get () - else: - logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) + logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) - try: - recursionPolicy = parseRecursive (args.recursive, args.url) - except ValueError: - parser.error ('Invalid argument for --recursive') - service = ChromeService () - if args.browser: - service = NullService (args.browser) - settings = ControllerSettings (maxBodySize=args.maxBodySize, - logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, - timeout=args.timeout) - with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler: - logger.connect (WarcHandlerConsumer (warcHandler)) - handler = [StatsHandler (), warcHandler] - b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames)) - controller = RecursiveController (args.url, fd, settings=settings, - recursionPolicy=recursionPolicy, service=service, - handler=handler, behavior=b, logger=logger) - controller.run () - r = handler[0].stats - logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r) + service = ChromeService () + if args.browser: + service = NullService (args.browser) + settings = ControllerSettings (maxBodySize=args.maxBodySize, + idleTimeout=args.idleTimeout, timeout=args.timeout) + with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler: + logger.connect (WarcHandlerConsumer (warcHandler)) + handler = [StatsHandler (), warcHandler] + b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames)) + controller = SinglePageController (args.url, fd, settings=settings, + service=service, handler=handler, behavior=b, logger=logger) + controller.run () + r = handler[0].stats + logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r) return True diff --git a/crocoite/controller.py b/crocoite/controller.py index 178d11c..9dae96f 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -23,16 +23,15 @@ Controller classes, handling actions required for archival """ class ControllerSettings: - __slots__ = ('logBuffer', 'maxBodySize', 'idleTimeout', 'timeout') + __slots__ = ('maxBodySize', 'idleTimeout', 'timeout') - def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10): - self.logBuffer = logBuffer + def __init__ (self, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10): self.maxBodySize = maxBodySize self.idleTimeout = idleTimeout self.timeout = timeout def toDict (self): - return dict (logBuffer=self.logBuffer, maxBodySize=self.maxBodySize, + return dict (maxBodySize=self.maxBodySize, idleTimeout=self.idleTimeout, timeout=self.timeout) defaultSettings = ControllerSettings () @@ -207,117 +206,3 @@ class SinglePageController: processQueue () -class RecursionPolicy: - """ Abstract recursion policy """ - - __slots__ = () - - def __call__ (self, urls): - raise NotImplementedError - -class DepthLimit (RecursionPolicy): - """ - Limit recursion by depth. - - depth==0 means no recursion, depth==1 is the page and outgoing links, … - """ - - __slots__ = ('maxdepth') - - def __init__ (self, maxdepth=0): - self.maxdepth = maxdepth - - def __call__ (self, urls): - if self.maxdepth <= 0: - return {} - else: - self.maxdepth -= 1 - return urls - - def __repr__ (self): - return ''.format (self.maxdepth) - -class PrefixLimit (RecursionPolicy): - """ - Limit recursion by prefix - - i.e. prefix=http://example.com/foo - ignored: http://example.com/bar http://offsite.example/foo - accepted: http://example.com/foobar http://example.com/foo/bar - """ - - __slots__ = ('prefix') - - def __init__ (self, prefix): - self.prefix = prefix - - def __call__ (self, urls): - return set (filter (lambda u: u.startswith (self.prefix), urls)) - -from .behavior import ExtractLinksEvent - -class RecursiveController (EventHandler): - """ - Simple recursive controller - - Visits links acording to recursionPolicy - """ - - __slots__ = ('url', 'output', 'service', 'behavior', 'settings', 'logger', - 'recursionPolicy', 'handler', 'urls', 'have') - - def __init__ (self, url, output, logger, - service=ChromeService (), behavior=cbehavior.available, \ - settings=defaultSettings, \ - recursionPolicy=DepthLimit (0), handler=[]): - self.url = url - self.output = output - self.service = service - self.behavior = behavior - self.settings = settings - self.logger = logger.bind (context=type(self).__name__, url=url) - self.recursionPolicy = recursionPolicy - self.handler = handler - self.handler.append (self) - - def fetch (self, urls): - """ - Overrideable fetch action for URLs. Defaults to sequential - SinglePageController. - """ - for u in urls: - try: - c = SinglePageController (url=u, output=self.output, service=self.service, - behavior=self.behavior, logger=self.logger, - settings=self.settings, handler=self.handler) - c.run () - except BrowserCrashed: - # this is fine if reported - self.logger.error ('browser crashed', uuid='42582cbe-fb83-47ce-b330-d022a1c3b331') - - def run (self): - self.have = set () - self.urls = set ([self.url]) - - while self.urls: - self.logger.info ('recursing', - uuid='5b8498e4-868d-413c-a67e-004516b8452c', - numurls=len (self.urls)) - - self.have.update (self.urls) - fetchurls = self.urls - self.urls = set () - - # handler appends new urls to self.urls through push() - self.fetch (fetchurls) - - # remove urls we have and apply recursion policy - self.urls.difference_update (self.have) - self.urls = self.recursionPolicy (self.urls) - - def push (self, item): - if isinstance (item, ExtractLinksEvent): - self.logger.debug ('extracted links', - uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', links=item.links) - self.urls.update (map (removeFragment, item.links)) - diff --git a/crocoite/task.py b/crocoite/task.py deleted file mode 100644 index 06dd022..0000000 --- a/crocoite/task.py +++ /dev/null @@ -1,146 +0,0 @@ -# Copyright (c) 2017–2018 crocoite contributors -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -""" -Celery distributed tasks -""" - -import os - -from urllib.parse import urlsplit -from datetime import datetime -from operator import attrgetter -from itertools import chain - -def _monkeyPatchSyncTasks (): - """ Result sets don’t support the argument disable_sync_subtasks argument """ - import celery.result - celery.result.assert_will_not_block = lambda: None - -_monkeyPatchSyncTasks () -from celery import Celery -from celery.utils.log import get_task_logger - -from .browser import ChromeService, BrowserCrashed -from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, StatsHandler -from . import behavior -from .cli import parseRecursive -from .warc import WarcHandler - -app = Celery ('crocoite.distributed') -app.config_from_object('celeryconfig') -app.conf.task_routes = { - 'crocoite.task.archive': {'queue': 'crocoite.archive'}, - 'crocoite.task.controller': {'queue': 'crocoite.controller'}, - # .chunks is actually a starmap job - 'celery.starmap': {'queue': 'crocoite.archive'}, - } -app.conf.task_default_queue = 'crocoite.default' -# disable prefetching, since our tasks usually run for a _very_ long time -app.conf.worker_prefetch_multiplier = 1 -logger = get_task_logger('crocoite.distributed.archive') - -@app.task(bind=True, track_started=True) -def archive (self, url, settings, enabledBehaviorNames): - """ - Archive a single URL - - Supports these config keys (celeryconfig): - - warc_filename = '{domain}-{date}-{id}.warc.gz' - temp_dir = '/tmp/' - finished_dir = '/tmp/finished' - """ - - parsedUrl = urlsplit (url) - outFile = app.conf.warc_filename.format ( - id=self.request.root_id, - domain=parsedUrl.hostname.replace ('/', '-'), - date=datetime.utcnow ().isoformat (), - ) - outPath = os.path.join (app.conf.temp_dir, outFile) - fd = open (outPath, 'wb') - - handler = [StatsHandler (), WarcHandler (fd)] - enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) - settings = ControllerSettings (**settings) - try: - c = SinglePageController (url, fd, behavior=enabledBehavior, - settings=settings, handler=handler) - c.run () - except BrowserCrashed: - # nothing we can do about that - logger.error ('browser crashed for {}'.format (url)) - - os.makedirs (app.conf.finished_dir, exist_ok=True) - outPath = os.path.join (app.conf.finished_dir, outFile) - os.rename (fd.name, outPath) - - return handler[0].stats - -from collections import UserDict - -class IntegerDict (UserDict): - """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """ - def __add__ (self, b): - newdict = self.__class__ (self) - for k, v in b.items (): - if k in self: - newdict[k] += v - else: - newdict[k] = v - return newdict - -class DistributedRecursiveController (RecursiveController): - """ Distributed, recursive controller using celery """ - - __slots__ = ('concurrency', 'stats') - - def __init__ (self, url, logger, service=ChromeService (), behavior=behavior.available, \ - settings=defaultSettings, - recursionPolicy=DepthLimit (0), concurrency=1): - super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy) - self.concurrency = concurrency - self.stats = IntegerDict () - - def fetch (self, urls): - def chunksIter (urls): - for u in urls: - yield (u, self.settings.toDict (), list (map (attrgetter ('name'), self.behavior))) - itemsPerTask = len (urls)//self.concurrency - if itemsPerTask <= 0: - itemsPerTask = len (urls) - result = archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get () - self.stats = sum (chain.from_iterable (result), self.stats) - -@app.task(bind=True, track_started=True) -def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency): - """ Recursive controller """ - - logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) - recursionPolicy = parseRecursive (recursive, url) - enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) - settings = ControllerSettings (**settings) - c = DistributedRecursiveController (url, None, logger=logger, behavior=enabledBehavior, - settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency) - c.run () - return dict (c.stats) - - -- cgit v1.2.3