summaryrefslogtreecommitdiff
path: root/crocoite/task.py
diff options
context:
space:
mode:
authorLars-Dominik Braun <lars@6xq.net>2018-06-20 11:13:37 +0200
committerLars-Dominik Braun <lars@6xq.net>2018-06-20 11:17:25 +0200
commit7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981 (patch)
tree15d0ca2e0374b7d00a05d5dd5de1e48838e71feb /crocoite/task.py
parent06a06463c0367718b2ed1b2b7f081cff6ca998a0 (diff)
downloadcrocoite-7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981.tar.gz
crocoite-7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981.tar.bz2
crocoite-7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981.zip
Synchronous SiteLoader event handling
Previously a browser crash stalled the entire grab, since events from pychrome were handled asynchronously in a different thread and exceptions were not propagated to the main thread. Now all browser events are stored in a queue and processed by the main thread, allowing us to handle browser crashes gracefully (more or less). This made the following additional changes necessary: - Clear separation between producer (browser) and consumer (WARC, stats, …) - Behavior scripts now yield events as well, instead of accessing the WARC writer - WARC logging was removed (for now) and WARC writer does not require serialization any more
Diffstat (limited to 'crocoite/task.py')
-rw-r--r--crocoite/task.py38
1 files changed, 31 insertions, 7 deletions
diff --git a/crocoite/task.py b/crocoite/task.py
index e93cfde..48fe5d8 100644
--- a/crocoite/task.py
+++ b/crocoite/task.py
@@ -38,10 +38,11 @@ _monkeyPatchSyncTasks ()
from celery import Celery
from celery.utils.log import get_task_logger
-from .browser import ChromeService
-from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit
+from .browser import ChromeService, BrowserCrashed
+from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit, StatsHandler
from . import behavior
from .cli import parseRecursive
+from .warc import WarcHandler
app = Celery ('crocoite.distributed')
app.config_from_object('celeryconfig')
@@ -77,16 +78,35 @@ def archive (self, url, settings, enabledBehaviorNames):
outPath = os.path.join (app.conf.temp_dir, outFile)
fd = open (outPath, 'wb')
+ handler = [StatsHandler (), WarcHandler (fd)]
enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
settings = ControllerSettings (**settings)
- controller = SinglePageController (url, fd, behavior=enabledBehavior, settings=settings)
- ret = controller.run ()
+ try:
+ controller = SinglePageController (url, fd, behavior=enabledBehavior,
+ settings=settings, handler=handler)
+ controller.run ()
+ except BrowserCrashed:
+ # nothing we can do about that
+ logger.error ('browser crashed for {}'.format (url))
os.makedirs (app.conf.finished_dir, exist_ok=True)
outPath = os.path.join (app.conf.finished_dir, outFile)
os.rename (fd.name, outPath)
- return ret
+ return handler[0].stats
+
+from collections import UserDict
+
+class IntegerDict (UserDict):
+ """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """
+ def __add__ (self, b):
+ newdict = self.__class__ (self)
+ for k, v in b.items ():
+ if k in self:
+ newdict[k] += v
+ else:
+ newdict[k] = v
+ return newdict
class DistributedRecursiveController (RecursiveController):
""" Distributed, recursive controller using celery """
@@ -96,6 +116,7 @@ class DistributedRecursiveController (RecursiveController):
recursionPolicy=DepthLimit (0), concurrency=1):
super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy)
self.concurrency = concurrency
+ self.stats = IntegerDict ()
def fetch (self, urls):
def chunksIter (urls):
@@ -104,7 +125,8 @@ class DistributedRecursiveController (RecursiveController):
itemsPerTask = len (urls)//self.concurrency
if itemsPerTask <= 0:
itemsPerTask = len (urls)
- return chain.from_iterable (archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ())
+ result = archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ()
+ self.stats = sum (chain.from_iterable (result), self.stats)
@app.task(bind=True, track_started=True)
def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency):
@@ -115,5 +137,7 @@ def controller (self, url, settings, enabledBehaviorNames, recursive, concurrenc
settings = ControllerSettings (**settings)
controller = DistributedRecursiveController (url, None, behavior=enabledBehavior,
settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency)
- return controller.run ()
+ controller.run ()
+ return dict (controller.stats)
+