From 7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981 Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Wed, 20 Jun 2018 11:13:37 +0200 Subject: Synchronous SiteLoader event handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously a browser crash stalled the entire grab, since events from pychrome were handled asynchronously in a different thread and exceptions were not propagated to the main thread. Now all browser events are stored in a queue and processed by the main thread, allowing us to handle browser crashes gracefully (more or less). This made the following additional changes necessary: - Clear separation between producer (browser) and consumer (WARC, stats, …) - Behavior scripts now yield events as well, instead of accessing the WARC writer - WARC logging was removed (for now) and WARC writer does not require serialization any more --- crocoite/task.py | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) (limited to 'crocoite/task.py') diff --git a/crocoite/task.py b/crocoite/task.py index e93cfde..48fe5d8 100644 --- a/crocoite/task.py +++ b/crocoite/task.py @@ -38,10 +38,11 @@ _monkeyPatchSyncTasks () from celery import Celery from celery.utils.log import get_task_logger -from .browser import ChromeService -from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit +from .browser import ChromeService, BrowserCrashed +from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit, StatsHandler from . import behavior from .cli import parseRecursive +from .warc import WarcHandler app = Celery ('crocoite.distributed') app.config_from_object('celeryconfig') @@ -77,16 +78,35 @@ def archive (self, url, settings, enabledBehaviorNames): outPath = os.path.join (app.conf.temp_dir, outFile) fd = open (outPath, 'wb') + handler = [StatsHandler (), WarcHandler (fd)] enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) settings = ControllerSettings (**settings) - controller = SinglePageController (url, fd, behavior=enabledBehavior, settings=settings) - ret = controller.run () + try: + controller = SinglePageController (url, fd, behavior=enabledBehavior, + settings=settings, handler=handler) + controller.run () + except BrowserCrashed: + # nothing we can do about that + logger.error ('browser crashed for {}'.format (url)) os.makedirs (app.conf.finished_dir, exist_ok=True) outPath = os.path.join (app.conf.finished_dir, outFile) os.rename (fd.name, outPath) - return ret + return handler[0].stats + +from collections import UserDict + +class IntegerDict (UserDict): + """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """ + def __add__ (self, b): + newdict = self.__class__ (self) + for k, v in b.items (): + if k in self: + newdict[k] += v + else: + newdict[k] = v + return newdict class DistributedRecursiveController (RecursiveController): """ Distributed, recursive controller using celery """ @@ -96,6 +116,7 @@ class DistributedRecursiveController (RecursiveController): recursionPolicy=DepthLimit (0), concurrency=1): super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy) self.concurrency = concurrency + self.stats = IntegerDict () def fetch (self, urls): def chunksIter (urls): @@ -104,7 +125,8 @@ class DistributedRecursiveController (RecursiveController): itemsPerTask = len (urls)//self.concurrency if itemsPerTask <= 0: itemsPerTask = len (urls) - return chain.from_iterable (archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ()) + result = archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get () + self.stats = sum (chain.from_iterable (result), self.stats) @app.task(bind=True, track_started=True) def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency): @@ -115,5 +137,7 @@ def controller (self, url, settings, enabledBehaviorNames, recursive, concurrenc settings = ControllerSettings (**settings) controller = DistributedRecursiveController (url, None, behavior=enabledBehavior, settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency) - return controller.run () + controller.run () + return dict (controller.stats) + -- cgit v1.2.3