summaryrefslogtreecommitdiff
path: root/crocoite/task.py
diff options
context:
space:
mode:
authorLars-Dominik Braun <lars@6xq.net>2018-05-04 13:00:05 +0200
committerLars-Dominik Braun <lars@6xq.net>2018-05-04 16:00:05 +0200
commit824c6e91ae6fee1318e79c3ce1a43f98bc697c7b (patch)
treec8662f34d26247202f0d8eebc4f37224f16f18ec /crocoite/task.py
parent48cabe3b0c7e7a760de4d40af55717c024fdc3bf (diff)
downloadcrocoite-824c6e91ae6fee1318e79c3ce1a43f98bc697c7b.tar.gz
crocoite-824c6e91ae6fee1318e79c3ce1a43f98bc697c7b.tar.bz2
crocoite-824c6e91ae6fee1318e79c3ce1a43f98bc697c7b.zip
Add distributed recursive crawls
Diffstat (limited to 'crocoite/task.py')
-rw-r--r--crocoite/task.py59
1 files changed, 56 insertions, 3 deletions
diff --git a/crocoite/task.py b/crocoite/task.py
index 39900a5..52d3b26 100644
--- a/crocoite/task.py
+++ b/crocoite/task.py
@@ -22,19 +22,37 @@
Celery distributed tasks
"""
-import os
+import os, logging
from urllib.parse import urlsplit
from datetime import datetime
+from operator import attrgetter
+from itertools import chain
+def _monkeyPatchSyncTasks ():
+ """ Result sets don’t support the disable_sync_subtasks argument """
+ import celery.result
+ celery.result.assert_will_not_block = lambda: None
+
+_monkeyPatchSyncTasks ()
from celery import Celery
from celery.utils.log import get_task_logger
-from .controller import SinglePageController, ControllerSettings
+from .browser import ChromeService
+from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit
from . import behavior
app = Celery ('crocoite.distributed')
app.config_from_object('celeryconfig')
+app.conf.task_routes = {
+ 'crocoite.task.archive': {'queue': 'crocoite.archive'},
+ 'crocoite.task.controller': {'queue': 'crocoite.controller'},
+ # <method>.chunks is actually a starmap job
+ 'celery.starmap': {'queue': 'crocoite.archive'},
+ }
+app.conf.task_default_queue = 'crocoite.default'
+# disable prefetching, since our tasks usually run for a _very_ long time
+app.conf.worker_prefetch_multiplier = 1
logger = get_task_logger('crocoite.distributed.archive')
@app.task(bind=True, track_started=True)
@@ -51,7 +69,7 @@ def archive (self, url, settings, enabledBehaviorNames):
parsedUrl = urlsplit (url)
outFile = app.conf.warc_filename.format (
- id=self.request.id,
+ id=self.request.root_id,
domain=parsedUrl.hostname.replace ('/', '-'),
date=datetime.utcnow ().isoformat (),
)
@@ -69,3 +87,38 @@ def archive (self, url, settings, enabledBehaviorNames):
return ret
+class DistributedRecursiveController (RecursiveController):
+ """ Distributed, recursive controller using celery """
+
+ def __init__ (self, url, service=ChromeService (), behavior=behavior.available, \
+ logger=logging.getLogger(__name__), settings=defaultSettings,
+ recursionPolicy=DepthLimit (0), concurrency=1):
+ super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy)
+ self.concurrency = concurrency
+
+ def fetch (self, urls):
+ def chunksIter (urls):
+ for u in urls:
+ yield (u, self.settings.toDict (), list (map (attrgetter ('name'), self.behavior)))
+ itemsPerTask = len (urls)//self.concurrency
+ if itemsPerTask <= 0:
+ itemsPerTask = len (urls)
+ return chain.from_iterable (archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ())
+
+@app.task(bind=True, track_started=True)
+def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency):
+ """ Recursive controller """
+
+ if recursive is None:
+ recursionPolicy = DepthLimit (0)
+ elif recursive.isdigit ():
+ recursionPolicy = DepthLimit (int (recursive))
+ elif recursive == 'prefix':
+ recursionPolicy = PrefixLimit (url)
+
+ enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
+ settings = ControllerSettings (**settings)
+ controller = DistributedRecursiveController (url, None, behavior=enabledBehavior,
+ settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency)
+ return controller.run ()
+