author    Lars-Dominik Braun <lars@6xq.net>  2018-05-04 13:00:05 +0200
committer Lars-Dominik Braun <lars@6xq.net>  2018-05-04 16:00:05 +0200
commit    824c6e91ae6fee1318e79c3ce1a43f98bc697c7b (patch)
tree      c8662f34d26247202f0d8eebc4f37224f16f18ec
parent    48cabe3b0c7e7a760de4d40af55717c024fdc3bf (diff)
Add distributed recursive crawls
-rw-r--r--  crocoite/cli.py         41
-rw-r--r--  crocoite/controller.py  22
-rw-r--r--  crocoite/task.py        59
3 files changed, 91 insertions(+), 31 deletions(-)
diff --git a/crocoite/cli.py b/crocoite/cli.py
index 196162e..15fac12 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -28,15 +28,11 @@ from . import behavior
 from .controller import RecursiveController, defaultSettings, \
         ControllerSettings, DepthLimit, PrefixLimit
 
-def stateCallback (data):
-    result = data['result']
-    if data['status'] == 'PROGRESS':
-        print (data['task_id'], result['step'])
-
 def main ():
     parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
     parser.add_argument('--browser', help='DevTools URL', metavar='URL')
     parser.add_argument('--recursive', help='Follow links recursively')
+    parser.add_argument('--concurrency', '-j', type=int, default=1)
     parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC')
     parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC')
     parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES')
@@ -52,29 +48,28 @@ def main ():
 
     args = parser.parse_args ()
 
-    # prepare args for function
-    distributed = args.distributed
-    if args.recursive is None:
-        recursionPolicy = DepthLimit (0)
-    elif args.recursive.isdigit ():
-        recursionPolicy = DepthLimit (int (args.recursive))
-    elif args.recursive == 'prefix':
-        recursionPolicy = PrefixLimit (args.url)
-    else:
-        parser.error ('Invalid argument for --recursive')
-
-    if distributed:
-        assert args.recursive is None, "Distributed crawls cannot be recursive right now, sorry"
-
-        from .task import archive
+    if args.distributed:
+        if args.browser:
+            parser.error ('--browser is not supported for distributed jobs')
+        from . import task
         settings = dict (maxBodySize=args.maxBodySize,
                 logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,
                 timeout=args.timeout)
-        result = archive.delay (url=args.url, settings=settings,
-                enabledBehaviorNames=args.enabledBehaviorNames)
-        r = result.get (on_message=stateCallback)
+        result = task.controller.delay (url=args.url, settings=settings,
+                enabledBehaviorNames=args.enabledBehaviorNames,
+                recursive=args.recursive, concurrency=args.concurrency)
+        r = result.get ()
     else:
         logging.basicConfig (level=logging.INFO)
+
+        if args.recursive is None:
+            recursionPolicy = DepthLimit (0)
+        elif args.recursive.isdigit ():
+            recursionPolicy = DepthLimit (int (args.recursive))
+        elif args.recursive == 'prefix':
+            recursionPolicy = PrefixLimit (args.url)
+        else:
+            parser.error ('Invalid argument for --recursive')
         settings = ControllerSettings (maxBodySize=args.maxBodySize,
                 logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,
                 timeout=args.timeout)
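
The distributed branch above reduces to queueing one controller task and blocking on its result. For illustration only, the equivalent programmatic call might look as follows; this is a sketch assuming a configured Celery broker, a worker listening on the crocoite.controller queue, and placeholder values for the URL and behavior name:

    from crocoite import task

    # Settings keys mirror ControllerSettings.toDict () in controller.py below.
    settings = dict (maxBodySize=10*1024*1024, logBuffer=1000,
            idleTimeout=2, timeout=10)
    # recursive accepts the same values as --recursive: None, a depth, or 'prefix';
    # 'scroll' is a hypothetical behavior name used here as a placeholder.
    result = task.controller.delay (url='http://example.com/', settings=settings,
            enabledBehaviorNames=['scroll'], recursive='prefix', concurrency=4)
    print (result.get ())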
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 638cb6c..113c139 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -29,6 +29,10 @@ class ControllerSettings:
         self.idleTimeout = idleTimeout
         self.timeout = timeout
 
+    def toDict (self):
+        return dict (logBuffer=self.logBuffer, maxBodySize=self.maxBodySize,
+                idleTimeout=self.idleTimeout, timeout=self.timeout)
+
 defaultSettings = ControllerSettings ()
 
 import logging
@@ -186,6 +190,18 @@ class RecursiveController:
         self.logger = logger
         self.recursionPolicy = recursionPolicy
 
+    def fetch (self, urls):
+        """
+        Overrideable fetch action for URLs. Defaults to sequential
+        SinglePageController.
+        """
+        result = []
+        for u in urls:
+            c = SinglePageController (u, self.output, self.service,
+                    self.behavior, self.logger, self.settings)
+            result.append (c.run ())
+        return result
+
     def run (self):
         have = set ()
         urls = set ([self.url])
@@ -193,11 +209,7 @@ class RecursiveController:
 
         while urls:
            self.logger.info ('retrieving {} urls'.format (len (urls)))
-            result = []
-            for u in urls:
-                c = SinglePageController (u, self.output, self.service,
-                        self.behavior, self.logger, self.settings)
-                result.append (c.run ())
+            result = self.fetch (urls)
 
             have.update (urls)
             urls = set ()
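
With this refactoring, fetch () becomes the single override point: RecursiveController keeps the breadth-first crawl loop, and subclasses decide how a batch of URLs is executed. As a hedged sketch (not part of this commit, and assuming SinglePageController instances tolerate running concurrently against the configured service), an in-process parallel variant could look like this:

    from concurrent.futures import ThreadPoolExecutor

    from crocoite.controller import RecursiveController, SinglePageController

    class ThreadedRecursiveController (RecursiveController):
        """ Hypothetical variant: process a batch of URLs with a thread pool """

        def fetch (self, urls):
            def one (u):
                # Same construction as the default fetch () above.
                c = SinglePageController (u, self.output, self.service,
                        self.behavior, self.logger, self.settings)
                return c.run ()
            with ThreadPoolExecutor (max_workers=4) as pool:
                return list (pool.map (one, urls))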
diff --git a/crocoite/task.py b/crocoite/task.py
index 39900a5..52d3b26 100644
--- a/crocoite/task.py
+++ b/crocoite/task.py
@@ -22,19 +22,37 @@
 Celery distributed tasks
 """
 
-import os
+import os, logging
 from urllib.parse import urlsplit
 from datetime import datetime
+from operator import attrgetter
+from itertools import chain
+def _monkeyPatchSyncTasks ():
+    """ Result sets don’t support the disable_sync_subtasks argument """
+    import celery.result
+    celery.result.assert_will_not_block = lambda: None
+
+_monkeyPatchSyncTasks ()
 
 from celery import Celery
 from celery.utils.log import get_task_logger
 
-from .controller import SinglePageController, ControllerSettings
+from .browser import ChromeService
+from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit
 from . import behavior
 
 app = Celery ('crocoite.distributed')
 app.config_from_object('celeryconfig')
+app.conf.task_routes = {
+        'crocoite.task.archive': {'queue': 'crocoite.archive'},
+        'crocoite.task.controller': {'queue': 'crocoite.controller'},
+        # <method>.chunks is actually a starmap job
+        'celery.starmap': {'queue': 'crocoite.archive'},
+        }
+app.conf.task_default_queue = 'crocoite.default'
+# disable prefetching, since our tasks usually run for a _very_ long time
+app.conf.worker_prefetch_multiplier = 1
 
 logger = get_task_logger('crocoite.distributed.archive')
 
 @app.task(bind=True, track_started=True)
@@ -51,7 +69,7 @@ def archive (self, url, settings, enabledBehaviorNames):
 
     parsedUrl = urlsplit (url)
     outFile = app.conf.warc_filename.format (
-            id=self.request.id,
+            id=self.request.root_id,
             domain=parsedUrl.hostname.replace ('/', '-'),
             date=datetime.utcnow ().isoformat (),
             )
@@ -69,3 +87,38 @@ def archive (self, url, settings, enabledBehaviorNames):
 
     return ret
 
+class DistributedRecursiveController (RecursiveController):
+    """ Distributed, recursive controller using celery """
+
+    def __init__ (self, url, service=ChromeService (), behavior=behavior.available, \
+            logger=logging.getLogger(__name__), settings=defaultSettings,
+            recursionPolicy=DepthLimit (0), concurrency=1):
+        super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy)
+        self.concurrency = concurrency
+
+    def fetch (self, urls):
+        def chunksIter (urls):
+            for u in urls:
+                yield (u, self.settings.toDict (), list (map (attrgetter ('name'), self.behavior)))
+        itemsPerTask = len (urls)//self.concurrency
+        if itemsPerTask <= 0:
+            itemsPerTask = len (urls)
+        return chain.from_iterable (archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ())
+
+@app.task(bind=True, track_started=True)
+def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency):
+    """ Recursive controller """
+
+    if recursive is None:
+        recursionPolicy = DepthLimit (0)
+    elif recursive.isdigit ():
+        recursionPolicy = DepthLimit (int (recursive))
+    elif recursive == 'prefix':
+        recursionPolicy = PrefixLimit (url)
+
+    enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
+    settings = ControllerSettings (**settings)
+    controller = DistributedRecursiveController (url, None, behavior=enabledBehavior,
+            settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency)
+    return controller.run ()
+
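
A note on configuration and chunk sizing: task.py pulls its broker settings and the warc_filename template from a celeryconfig module on the Python path. A minimal sketch of such a file, assuming a Redis broker (any broker and result backend supported by Celery works; only the id, domain and date placeholders are dictated by the format () call in archive above):

    # celeryconfig.py (sketch; the broker and backend URLs are assumptions)
    broker_url = 'redis://localhost:6379/0'
    result_backend = 'redis://localhost:6379/0'
    # custom setting read by crocoite.task.archive
    warc_filename = '{domain}-{date}-{id}.warc.gz'

For chunk sizing in DistributedRecursiveController.fetch: with 10 pending URLs and concurrency 4, itemsPerTask is 10//4 = 2, so archive.chunks () splits the batch into five starmap jobs of two archive calls each. Workers pull these from the crocoite.archive queue, and chain.from_iterable flattens the per-chunk result lists into one result per URL.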