From 824c6e91ae6fee1318e79c3ce1a43f98bc697c7b Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Fri, 4 May 2018 13:00:05 +0200 Subject: Add distributed recursive crawls --- crocoite/task.py | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 56 insertions(+), 3 deletions(-) (limited to 'crocoite/task.py') diff --git a/crocoite/task.py b/crocoite/task.py index 39900a5..52d3b26 100644 --- a/crocoite/task.py +++ b/crocoite/task.py @@ -22,19 +22,37 @@ Celery distributed tasks """ -import os +import os, logging from urllib.parse import urlsplit from datetime import datetime +from operator import attrgetter +from itertools import chain +def _monkeyPatchSyncTasks (): + """ Result sets don’t support the disable_sync_subtasks argument """ + import celery.result + celery.result.assert_will_not_block = lambda: None + +_monkeyPatchSyncTasks () from celery import Celery from celery.utils.log import get_task_logger -from .controller import SinglePageController, ControllerSettings +from .browser import ChromeService +from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit from . 
import behavior app = Celery ('crocoite.distributed') app.config_from_object('celeryconfig') +app.conf.task_routes = { + 'crocoite.task.archive': {'queue': 'crocoite.archive'}, + 'crocoite.task.controller': {'queue': 'crocoite.controller'}, + # .chunks is actually a starmap job + 'celery.starmap': {'queue': 'crocoite.archive'}, + } +app.conf.task_default_queue = 'crocoite.default' +# disable prefetching, since our tasks usually run for a _very_ long time +app.conf.worker_prefetch_multiplier = 1 logger = get_task_logger('crocoite.distributed.archive') @app.task(bind=True, track_started=True) @@ -51,7 +69,7 @@ def archive (self, url, settings, enabledBehaviorNames): parsedUrl = urlsplit (url) outFile = app.conf.warc_filename.format ( - id=self.request.id, + id=self.request.root_id, domain=parsedUrl.hostname.replace ('/', '-'), date=datetime.utcnow ().isoformat (), ) @@ -69,3 +87,38 @@ def archive (self, url, settings, enabledBehaviorNames): return ret +class DistributedRecursiveController (RecursiveController): + """ Distributed, recursive controller using celery """ + + def __init__ (self, url, service=ChromeService (), behavior=behavior.available, \ + logger=logging.getLogger(__name__), settings=defaultSettings, + recursionPolicy=DepthLimit (0), concurrency=1): + super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy) + self.concurrency = concurrency + + def fetch (self, urls): + def chunksIter (urls): + for u in urls: + yield (u, self.settings.toDict (), list (map (attrgetter ('name'), self.behavior))) + itemsPerTask = len (urls)//self.concurrency + if itemsPerTask <= 0: + itemsPerTask = len (urls) + return chain.from_iterable (archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ()) + +@app.task(bind=True, track_started=True) +def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency): + """ Recursive controller """ + + if recursive is None: + recursionPolicy = DepthLimit (0) + elif 
recursive.isdigit (): + recursionPolicy = DepthLimit (int (recursive)) + elif recursive == 'prefix': + recursionPolicy = PrefixLimit (url) + + enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) + settings = ControllerSettings (**settings) + controller = DistributedRecursiveController (url, None, behavior=enabledBehavior, + settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency) + return controller.run () + -- cgit v1.2.3