From 7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981 Mon Sep 17 00:00:00 2001
From: Lars-Dominik Braun
Date: Wed, 20 Jun 2018 11:13:37 +0200
Subject: Synchronous SiteLoader event handling
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously a browser crash stalled the entire grab, since events from
pychrome were handled asynchronously in a different thread and
exceptions were not propagated to the main thread. Now all browser
events are stored in a queue and processed by the main thread, allowing
us to handle browser crashes gracefully (more or less).

This made the following additional changes necessary:

- Clear separation between producer (browser) and consumer (WARC,
  stats, …)
- Behavior scripts now yield events as well, instead of accessing the
  WARC writer
- WARC logging was removed (for now) and WARC writer does not require
  serialization any more
---
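Notes on the new interface: consumers subclass EventHandler and receive
every item the main thread pops off the SiteLoader queue via push ().
The following sketch is illustrative only and not part of this patch;
PrintHandler, the URL and the output filename are invented, everything
else is the API introduced below.

    from crocoite.controller import EventHandler, SinglePageController, StatsHandler

    class PrintHandler (EventHandler):
        """ Illustrative consumer: print every item the controller dispatches """

        # also receive exceptions (e.g. BrowserCrashed) before the
        # controller re-raises them
        acceptException = True

        def push (self, item):
            print ('event: {!r}'.format (item))

    # url and output filename are placeholders; the default service
    # (ChromeService ()) expects a local Chrome
    stats = StatsHandler ()
    c = SinglePageController ('http://example.com/', 'example.warc.gz',
            handler=[stats, PrintHandler ()])
    c.run ()
    # counters: requests, finished, failed, bytesRcv, crashed
    print (stats.stats)

Since processItem () pushes exceptions to handlers with acceptException
set before re-raising them, StatsHandler still counts a BrowserCrashed
that the caller (e.g. RecursiveController.fetch) catches afterwards.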
""" def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \ - logger=logging.getLogger(__name__), settings=defaultSettings): + logger=logging.getLogger(__name__), settings=defaultSettings, handler=[]): self.url = url self.output = output self.service = service self.behavior = behavior self.settings = settings self.logger = logger + self.handler = handler + + def processItem (self, item): + if isinstance (item, Exception): + for h in self.handler: + if h.acceptException: + h.push (item) + raise item + + for h in self.handler: + h.push (item) def run (self): - ret = {'stats': None, 'links': []} - - with self.service as browser: - browser = pychrome.Browser (url=browser) - writer = SerializingWARCWriter (self.output, gzip=True) - - with WarcLoader (browser, self.url, writer, - logBuffer=self.settings.logBuffer, - maxBodySize=self.settings.maxBodySize) as l: - version = l.tab.Browser.getVersion () - payload = { - 'software': __package__, - 'browser': version['product'], - 'useragent': version['userAgent'], - 'viewport': getFormattedViewportMetrics (l.tab), - } - warcinfo = writer.create_warcinfo_record (filename=None, info=payload) - writer.write_record (warcinfo) - - # not all behavior scripts are allowed for every URL, filter them - enabledBehavior = list (filter (lambda x: self.url in x, - map (lambda x: x (l), self.behavior))) - linksBehavior = firstOrNone (filter (lambda x: isinstance (x, cbehavior.ExtractLinks), - enabledBehavior)) - - for b in enabledBehavior: - self.logger.debug ('starting onload behavior {}'.format (b.name)) - b.onload () - l.start () - - l.waitIdle (self.settings.idleTimeout, self.settings.timeout) - - for b in enabledBehavior: - self.logger.debug ('starting onstop behavior {}'.format (b.name)) - b.onstop () - - # if we stopped due to timeout, wait for remaining assets - l.waitIdle (2, 60) - l.stop () - - for b in enabledBehavior: - self.logger.debug ('starting onfinish behavior {}'.format (b.name)) - b.onfinish () - - ret['stats'] = l.stats - ret['links'] = linksBehavior.links if linksBehavior else None - writer.flush () - return ret - -from collections import UserDict - -class IntegerDict (UserDict): - """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """ - def __add__ (self, b): - newdict = self.__class__ (self) - for k, v in b.items (): - if k in self: - newdict[k] += v - else: - newdict[k] = v - return newdict + def processQueue (): + # XXX: this is very ugly code and does not work well. figure out a + # better way to impose timeouts and still process all items in the + # queue + queue = l.queue + self.logger.debug ('processing at least {} queue items'.format (len (queue))) + while True: + now = time.time () + elapsed = now-start + maxTimeout = max (min (self.settings.idleTimeout, self.settings.timeout-elapsed), 0) + self.logger.debug ('max timeout is {} with elapsed {}'.format (maxTimeout, elapsed)) + # skip waiting if there is work to do. processes all items in + # queue, regardless of timeouts, i.e. you need to make sure the + # queue will actually be empty at some point. + if len (queue) == 0: + if not l.notify.wait (maxTimeout): + assert len (queue) == 0, "event must be sent" + # timed out + self.logger.debug ('timed out after {}'.format (elapsed)) + break + else: + l.notify.clear () + # limit number of items processed here, otherwise timeout won’t + # be checked frequently. this can happen if the site quickly + # loads a lot of items. 
+                for i in range (1000):
+                    try:
+                        item = queue.popleft ()
+                        self.logger.debug ('queue pop: {!r}, len now {}'.format (item, len (queue)))
+                    except IndexError:
+                        break
+                    self.processItem (item)
+                if maxTimeout == 0:
+                    break
+
+        with self.service as browser, SiteLoader (browser, self.url, logger=self.logger) as l:
+            start = time.time ()
+
+            version = l.tab.Browser.getVersion ()
+            payload = {
+                'software': __package__,
+                'browser': version['product'],
+                'useragent': version['userAgent'],
+                'viewport': getFormattedViewportMetrics (l.tab),
+                }
+            self.processItem (ControllerStart (payload))
+
+            # not all behavior scripts are allowed for every URL, filter them
+            enabledBehavior = list (filter (lambda x: self.url in x,
+                    map (lambda x: x (l), self.behavior)))
+
+            for b in enabledBehavior:
+                self.logger.debug ('starting onload {}'.format (b))
+                # I decided against using the queue here to limit memory
+                # usage (screenshot behavior would put all images into
+                # queue before we could process them)
+                for item in b.onload ():
+                    self.processItem (item)
+            l.start ()
+
+            processQueue ()
+
+            for b in enabledBehavior:
+                self.logger.debug ('starting onstop {}'.format (b))
+                for item in b.onstop ():
+                    self.processItem (item)
+
+            # if we stopped due to timeout, wait for remaining assets
+            processQueue ()
+
+            for b in enabledBehavior:
+                self.logger.debug ('starting onfinish {}'.format (b))
+                for item in b.onfinish ():
+                    self.processItem (item)
+
+            processQueue ()
 
 class RecursionPolicy:
     """ Abstract recursion policy """
@@ -172,7 +224,9 @@ def removeFragment (u):
     s = urlsplit (u)
     return urlunsplit ((s.scheme, s.netloc, s.path, s.query, ''))
 
-class RecursiveController:
+from .behavior import ExtractLinksEvent
+
+class RecursiveController (EventHandler):
     """
     Simple recursive controller
 
@@ -181,7 +235,7 @@ class RecursiveController:
     def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \
             logger=logging.getLogger(__name__), settings=defaultSettings,
-            recursionPolicy=DepthLimit (0)):
+            recursionPolicy=DepthLimit (0), handler=[]):
         self.url = url
         self.output = output
         self.service = service
@@ -189,37 +243,45 @@ class RecursiveController:
         self.settings = settings
         self.logger = logger
         self.recursionPolicy = recursionPolicy
+        self.handler = handler
+        self.handler.append (self)
 
     def fetch (self, urls):
         """
        Overrideable fetch action for URLs. Defaults to sequential
        SinglePageController.
""" - result = [] for u in urls: - c = SinglePageController (u, self.output, self.service, - self.behavior, self.logger, self.settings) - result.append (c.run ()) - return result + try: + c = SinglePageController (u, self.output, self.service, + self.behavior, self.logger, self.settings, self.handler) + c.run () + except BrowserCrashed: + # this is fine if reported + self.logger.error ('browser crashed for {}'.format (u)) def run (self): - have = set () - urls = set ([self.url]) - ret = {'stats': IntegerDict ()} - - while urls: - self.logger.info ('retrieving {} urls'.format (len (urls))) - result = self.fetch (urls) - - have.update (urls) - urls = set () - for r in result: - ret['stats'] += r['stats'] - urls.update (map (removeFragment, r['links'])) - urls.difference_update (have) - - urls = self.recursionPolicy (urls) - # everything in ret must be serializeable - ret['stats'] = dict (ret['stats']) - return ret + self.have = set () + self.urls = set ([self.url]) + + while self.urls: + self.logger.info ('retrieving {} urls'.format (len (self.urls))) + + self.have.update (self.urls) + fetchurls = self.urls + self.urls = set () + + # handler appends new urls to self.urls through push() + self.fetch (fetchurls) + + # remove urls we have and apply recursion policy + self.urls.difference_update (self.have) + self.urls = self.recursionPolicy (self.urls) + + def push (self, item): + if isinstance (item, ExtractLinksEvent): + self.logger.debug ('adding extracted links: {}'.format (item.links)) + self.urls.update (map (removeFragment, item.links)) + else: + self.logger.debug ('{} got unhandled event {!r}'.format (self, item)) -- cgit v1.2.3