From 7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981 Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Wed, 20 Jun 2018 11:13:37 +0200 Subject: Synchronous SiteLoader event handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously a browser crash stalled the entire grab, since events from pychrome were handled asynchronously in a different thread and exceptions were not propagated to the main thread. Now all browser events are stored in a queue and processed by the main thread, allowing us to handle browser crashes gracefully (more or less). This made the following additional changes necessary: - Clear separation between producer (browser) and consumer (WARC, stats, …) - Behavior scripts now yield events as well, instead of accessing the WARC writer - WARC logging was removed (for now) and WARC writer does not require serialization any more --- crocoite/warc.py | 165 +++++++++++++++++++++---------------------------------- 1 file changed, 64 insertions(+), 101 deletions(-) (limited to 'crocoite/warc.py') diff --git a/crocoite/warc.py b/crocoite/warc.py index 3fd65e4..fd8ce8d 100644 --- a/crocoite/warc.py +++ b/crocoite/warc.py @@ -24,7 +24,6 @@ Classes writing data to WARC files import logging import json -from http.server import BaseHTTPRequestHandler from io import BytesIO from warcio.statusandheaders import StatusAndHeaders from urllib.parse import urlsplit @@ -36,95 +35,18 @@ from queue import Queue from warcio.timeutils import datetime_to_iso_date from warcio.warcwriter import WARCWriter -from .browser import AccountingSiteLoader from .util import packageUrl -from .controller import defaultSettings - -class SerializingWARCWriter (WARCWriter): - """ - Serializing WARC writer using separate writer thread and queue for - non-blocking operation - - Needs an explicit .flush() before deletion. - """ - - def __init__ (self, filebuf, *args, **kwargs): - WARCWriter.__init__ (self, filebuf, *args, **kwargs) - self.queue = Queue () - self.thread = Thread (target=self._run_writer) - self.thread.start () - - def flush (self): - self.queue.put (None) - self.thread.join () - self.queue = None - self.thread = None - - def _run_writer (self): - while True: - item = self.queue.get () - if not item: - break - out, record = item - WARCWriter._write_warc_record (self, out, record) - - def _write_warc_record (self, out, record): - self.queue.put ((out, record)) - -class WARCLogHandler (BufferingHandler): - """ - Buffered log handler, flushing to warcio - """ +from .controller import defaultSettings, EventHandler, ControllerStart +from .behavior import Script, DomSnapshotEvent, ScreenshotEvent +from .browser import Item - contentType = 'text/plain; charset=utf-8' - - def __init__ (self, capacity, warcfile): - BufferingHandler.__init__ (self, capacity) - self.warcfile = warcfile - - def flush (self): - self.acquire () - try: - if self.buffer: - buf = '' - for record in self.buffer: - buf += self.format (record) - buf += '\n' - # XXX: record type? - record = self.warcfile.create_warc_record ( - packageUrl ('log'), 'metadata', - payload=BytesIO (buf.encode ('utf8')), - warc_headers_dict={'Content-Type': self.contentType}) - self.warcfile.write_record(record) - self.buffer = [] - finally: - self.release () - -class WarcLoader (AccountingSiteLoader): - def __init__ (self, browser, url, writer, +class WarcHandler (EventHandler): + def __init__ (self, fd, logger=logging.getLogger(__name__), - logBuffer=defaultSettings.logBuffer, maxBodySize=defaultSettings.maxBodySize): - super ().__init__ (browser, url, logger) - self.writer = writer + self.logger = logger + self.writer = WARCWriter (fd, gzip=True) self.maxBodySize = maxBodySize - self.warcLogger = WARCLogHandler (logBuffer, writer) - self.logger.addHandler (self.warcLogger) - - def __exit__ (self, exc_type, exc_value, traceback): - self.logger.removeHandler (self.warcLogger) - self.warcLogger.flush () - return super ().__exit__ (exc_type, exc_value, traceback) - - @staticmethod - def getStatusText (response): - text = response.get ('statusText') - if text: - return text - text = BaseHTTPRequestHandler.responses.get (response['status']) - if text: - return text[0] - return 'No status text available' def _writeRequest (self, item): writer = self.writer @@ -154,13 +76,12 @@ class WarcLoader (AccountingSiteLoader): return record.rec_headers['WARC-Record-ID'] - def _getBody (self, item, redirect): + def _getBody (self, item): reqId = item.id - resp = item.response rawBody = b'' base64Encoded = False - if redirect: + if item.isRedirect: # redirects reuse the same request, thus we cannot safely retrieve # the body (i.e getResponseBody may return the new location’s # body). This is fine. @@ -173,7 +94,7 @@ class WarcLoader (AccountingSiteLoader): rawBody, base64Encoded = item.body return rawBody, base64Encoded - def _writeResponse (self, item, redirect, concurrentTo, rawBody, base64Encoded): + def _writeResponse (self, item, concurrentTo, rawBody, base64Encoded): writer = self.writer reqId = item.id resp = item.response @@ -192,7 +113,7 @@ class WarcLoader (AccountingSiteLoader): } httpHeaders = StatusAndHeaders('{} {}'.format (resp['status'], - self.getStatusText (resp)), item.responseHeaders, + item.statusText), item.responseHeaders, protocol='HTTP/1.1') # Content is saved decompressed and decoded, remove these headers @@ -216,21 +137,63 @@ class WarcLoader (AccountingSiteLoader): http_headers=httpHeaders) writer.write_record(record) - def loadingFinished (self, item, redirect=False): - super ().loadingFinished (item, redirect) - + def _writeScript (self, item): writer = self.writer - - req = item.request - reqId = item.id - resp = item.response - url = urlsplit (resp['url']) - + encoding = 'utf-8' + record = writer.create_warc_record (packageUrl ('script/{}'.format (item.path)), 'metadata', + payload=BytesIO (str (item).encode (encoding)), + warc_headers_dict={'Content-Type': 'application/javascript; charset={}'.format (encoding)}) + writer.write_record (record) + + def _writeItem (self, item): + if item.failed: + # should have been handled by the logger already + return try: # write neither request nor response if we cannot retrieve the body - rawBody, base64Encoded = self._getBody (item, redirect) + rawBody, base64Encoded = self._getBody (item) concurrentTo = self._writeRequest (item) - self._writeResponse (item, redirect, concurrentTo, rawBody, base64Encoded) + self._writeResponse (item, concurrentTo, rawBody, base64Encoded) except ValueError as e: self.logger.error (e.args[0]) + def _writeDomSnapshot (self, item): + writer = self.writer + httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1') + record = writer.create_warc_record (item.url, 'response', + payload=BytesIO (item.document), + http_headers=httpHeaders, + warc_headers_dict={'X-DOM-Snapshot': str (True), + 'X-Chrome-Viewport': item.viewport}) + writer.write_record (record) + + def _writeScreenshot (self, item): + writer = self.writer + url = packageUrl ('screenshot-{}-{}.png'.format (0, item.yoff)) + record = writer.create_warc_record (url, 'resource', + payload=BytesIO (item.data), warc_headers_dict={'Content-Type': 'image/png'}) + writer.write_record (record) + + def _writeControllerStart (self, item): + writer = self.writer + warcinfo = writer.create_warcinfo_record (filename=None, info=item.payload) + writer.write_record (warcinfo) + + route = {Script: _writeScript, + Item: _writeItem, + DomSnapshotEvent: _writeDomSnapshot, + ScreenshotEvent: _writeScreenshot, + ControllerStart: _writeControllerStart, + } + + def push (self, item): + processed = False + for k, v in self.route.items (): + if isinstance (item, k): + v (self, item) + processed = True + break + + if not processed: + self.logger.debug ('unknown event {}'.format (repr (item))) + -- cgit v1.2.3