diff options
author | Lars-Dominik Braun <lars@6xq.net> | 2018-06-20 11:13:37 +0200 |
---|---|---|
committer | Lars-Dominik Braun <lars@6xq.net> | 2018-06-20 11:17:25 +0200 |
commit | 7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981 (patch) | |
tree | 15d0ca2e0374b7d00a05d5dd5de1e48838e71feb | |
parent | 06a06463c0367718b2ed1b2b7f081cff6ca998a0 (diff) | |
download | crocoite-7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981.tar.gz crocoite-7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981.tar.bz2 crocoite-7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981.zip |
Synchronous SiteLoader event handling
Previously a browser crash stalled the entire grab, since events from
pychrome were handled asynchronously in a different thread and
exceptions were not propagated to the main thread.
Now all browser events are stored in a queue and processed by the main
thread, allowing us to handle browser crashes gracefully (more or less).
This made the following additional changes necessary:
- Clear separation between producer (browser) and consumer (WARC, stats,
…)
- Behavior scripts now yield events as well, instead of accessing the
WARC writer
- WARC logging was removed (for now) and the WARC writer no longer requires
serialization
-rw-r--r-- | contrib/celerycrocoite.py | 9 | ||||
-rw-r--r-- | crocoite/behavior.py | 124 | ||||
-rw-r--r-- | crocoite/browser.py | 417 | ||||
-rw-r--r-- | crocoite/cli.py | 19 | ||||
-rw-r--r-- | crocoite/controller.py | 260 | ||||
-rw-r--r-- | crocoite/task.py | 38 | ||||
-rw-r--r-- | crocoite/warc.py | 165 |
7 files changed, 518 insertions, 514 deletions
diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py index 26c35ce..d0a02e9 100644 --- a/contrib/celerycrocoite.py +++ b/contrib/celerycrocoite.py @@ -88,10 +88,9 @@ def checkCompletedJobs (bot, jobs): if Identifier (channel) not in bot.channels: continue try: - result = handle.get (timeout=0.1) - stats = result['stats'] - bot.msg (channel, '{}: {} ({}) finished. {} requests, {} failed, {} received.'.format (user, url, - handle.id, stats['requests'], stats['failed'], + stats = handle.get (timeout=0.1) + bot.msg (channel, '{}: {} ({}) finished. {} crashed, {} requests, {} failed, {} received.'.format (user, url, + handle.id, stats['crashed'], stats['requests'], stats['failed'], prettyBytes (stats['bytesRcv']))) delete.add (handle.id) except celery.exceptions.TimeoutError: @@ -198,7 +197,7 @@ def archive (bot, trigger): logBuffer=defaultSettings.logBuffer, idleTimeout=args.idleTimeout, timeout=args.timeout) args = dict (url=args.url, - enabledBehaviorNames=list (behavior.availableNames-blacklistedBehavior), + enabledBehaviorNames=list (set (behavior.availableMap.keys())-blacklistedBehavior), settings=settings, recursive=args.recursive, concurrency=args.concurrency) q = bot.memory['crocoite']['q'] diff --git a/crocoite/behavior.py b/crocoite/behavior.py index 95e8160..fd8fff8 100644 --- a/crocoite/behavior.py +++ b/crocoite/behavior.py @@ -22,21 +22,44 @@ Generic and per-site behavior scripts """ -import logging +import logging, time from io import BytesIO from urllib.parse import urlsplit import os.path import pkg_resources from base64 import b64decode +from collections import OrderedDict + +from html5lib.serializer import HTMLSerializer +from warcio.statusandheaders import StatusAndHeaders +from pychrome.exceptions import TimeoutException from .util import randomString, packageUrl, getFormattedViewportMetrics from . 
import html from .html import StripAttributeFilter, StripTagFilter, ChromeTreeWalker -from html5lib.serializer import HTMLSerializer -from warcio.statusandheaders import StatusAndHeaders +from .browser import SiteLoader logger = logging.getLogger(__name__) +class Script: + """ A JavaScript resource """ + def __init__ (self, path=None, encoding='utf-8'): + self.path = path + if path: + self.data = pkg_resources.resource_string (__name__, os.path.join ('data', path)).decode (encoding) + + def __repr__ (self): + return '<Script {}>'.format (self.path) + + def __str__ (self): + return self.data + + @classmethod + def fromStr (cls, data): + s = Script () + s.data = data + return s + class Behavior: # unique behavior name name = None @@ -51,27 +74,20 @@ class Behavior: """ return True - def loadScript (self, path, encoding='utf-8'): - return pkg_resources.resource_string (__name__, os.path.join ('data', path)).decode (encoding) - - def useScript (self, script, encoding='utf-8'): - writer = self.loader.writer - record = writer.create_warc_record (packageUrl ('script'), 'metadata', - payload=BytesIO (script.encode (encoding)), - warc_headers_dict={'Content-Type': 'application/javascript; charset={}'.format (encoding)}) - writer.write_record (record) + def __repr__ (self): + return '<Behavior {}>'.format (self.name) def onload (self): """ Before loading the page """ - pass + yield from () def onstop (self): """ Before page loading is stopped """ - pass + yield from () def onfinish (self): """ After the site has stopped loading """ - pass + yield from () class HostnameFilter: """ Limit behavior script to hostname """ @@ -90,15 +106,16 @@ class JsOnload (Behavior): def __init__ (self, loader): super ().__init__ (loader) - self.script = self.loadScript (self.scriptPath) + self.script = Script (self.scriptPath) self.scriptHandle = None def onload (self): - self.useScript (self.script) - self.scriptHandle = self.loader.tab.Page.addScriptToEvaluateOnNewDocument 
(source=self.script)['identifier'] + yield self.script + self.scriptHandle = self.loader.tab.Page.addScriptToEvaluateOnNewDocument (source=str (self.script))['identifier'] def onstop (self): self.loader.tab.Page.removeScriptToEvaluateOnNewDocument (identifier=self.scriptHandle) + yield from () ### Generic scripts ### @@ -110,15 +127,15 @@ class Scroll (JsOnload): super ().__init__ (loader) stopVarname = '__' + __package__ + '_stop__' newStopVarname = randomString () - self.script = self.script.replace (stopVarname, newStopVarname) + self.script.data = self.script.data.replace (stopVarname, newStopVarname) self.stopVarname = newStopVarname def onstop (self): super ().onstop () # removing the script does not stop it if running - script = '{} = true; window.scrollTo (0, 0);'.format (self.stopVarname) - self.useScript (script) - self.loader.tab.Runtime.evaluate (expression=script, returnByValue=True) + script = Script.fromStr ('{} = true; window.scrollTo (0, 0);'.format (self.stopVarname)) + yield script + self.loader.tab.Runtime.evaluate (expression=str (script), returnByValue=True) class EmulateScreenMetrics (Behavior): name = 'emulateScreenMetrics' @@ -147,9 +164,16 @@ class EmulateScreenMetrics (Behavior): for s in sizes: tab.Emulation.setDeviceMetricsOverride (**s) # give the browser time to re-eval page and start requests - l.wait (1) + time.sleep (1) # XXX: this seems to be broken, it does not clear the override #tab.Emulation.clearDeviceMetricsOverride () + yield from () + +class DomSnapshotEvent: + def __init__ (self, url, document, viewport): + self.url = url + self.document = document + self.viewport = viewport class DomSnapshot (Behavior): """ @@ -166,14 +190,13 @@ class DomSnapshot (Behavior): def __init__ (self, loader): super ().__init__ (loader) - self.script = self.loadScript ('canvas-snapshot.js') + self.script = Script ('canvas-snapshot.js') def onfinish (self): tab = self.loader.tab - writer = self.loader.writer - self.useScript (self.script) - 
tab.Runtime.evaluate (expression=self.script, returnByValue=True) + yield self.script + tab.Runtime.evaluate (expression=str (self.script), returnByValue=True) viewport = getFormattedViewportMetrics (tab) dom = tab.DOM.getDocument (depth=-1, pierce=True) @@ -196,13 +219,12 @@ class DomSnapshot (Behavior): disallowedAttributes = html.eventAttributes stream = StripAttributeFilter (StripTagFilter (walker, disallowedTags), disallowedAttributes) serializer = HTMLSerializer () - httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1') - record = writer.create_warc_record (doc['documentURL'], 'response', - payload=BytesIO (serializer.render (stream, 'utf-8')), - http_headers=httpHeaders, - warc_headers_dict={'X-DOM-Snapshot': str (True), - 'X-Chrome-Viewport': viewport}) - writer.write_record (record) + yield DomSnapshotEvent (doc['documentURL'], serializer.render (stream, 'utf-8'), viewport) + +class ScreenshotEvent: + def __init__ (self, yoff, data): + self.yoff = yoff + self.data = data class Screenshot (Behavior): """ @@ -213,7 +235,6 @@ class Screenshot (Behavior): def onfinish (self): tab = self.loader.tab - writer = self.loader.writer # see https://github.com/GoogleChrome/puppeteer/blob/230be28b067b521f0577206899db01f0ca7fc0d2/examples/screenshots-longpage.js # Hardcoded max texture size of 16,384 (crbug.com/770769) @@ -227,10 +248,7 @@ class Screenshot (Behavior): height = min (contentSize['height'] - yoff, maxDim) clip = {'x': 0, 'y': yoff, 'width': width, 'height': height, 'scale': 1} data = b64decode (tab.Page.captureScreenshot (format='png', clip=clip)['data']) - url = packageUrl ('screenshot-{}-{}.png'.format (0, yoff)) - record = writer.create_warc_record (url, 'resource', - payload=BytesIO (data), warc_headers_dict={'Content-Type': 'image/png'}) - writer.write_record (record) + yield ScreenshotEvent (yoff, data) class Click (JsOnload): """ Generic link clicking """ @@ -238,6 +256,10 @@ class Click (JsOnload): name = 'click' scriptPath = 'click.js' 
+class ExtractLinksEvent: + def __init__ (self, links): + self.links = links + class ExtractLinks (Behavior): """ Extract links from a page using JavaScript @@ -250,19 +272,33 @@ class ExtractLinks (Behavior): def __init__ (self, loader): super ().__init__ (loader) - self.script = self.loadScript ('extract-links.js') + self.script = Script ('extract-links.js') self.links = None def onfinish (self): tab = self.loader.tab - self.useScript (self.script) - result = tab.Runtime.evaluate (expression=self.script, returnByValue=True) - self.links = list (set (result['result']['value'])) + yield self.script + result = tab.Runtime.evaluate (expression=str (self.script), returnByValue=True) + yield ExtractLinksEvent (list (set (result['result']['value']))) + +class Crash (Behavior): + """ Crash the browser. For testing only. Obviously. """ + + name = 'crash' + + def onstop (self): + try: + self.loader.tab.Page.crash (_timeout=1) + except TimeoutException: + pass + yield from () # available behavior scripts. Order matters, move those modifying the page # towards the end of available generic = [Scroll, EmulateScreenMetrics, Click, ExtractLinks] perSite = [] available = generic + perSite + [Screenshot, DomSnapshot] -availableNames = set (map (lambda x: x.name, available)) +#available.append (Crash) +# order matters, since behavior can modify the page (dom snapshots, for instance) +availableMap = OrderedDict (map (lambda x: (x.name, x), available)) diff --git a/crocoite/browser.py b/crocoite/browser.py index a891ce7..57d0dd0 100644 --- a/crocoite/browser.py +++ b/crocoite/browser.py @@ -25,6 +25,10 @@ Chrome browser interactions. 
import logging from urllib.parse import urlsplit from base64 import b64decode +from http.server import BaseHTTPRequestHandler +from collections import deque +from threading import Event + import pychrome class Item: @@ -37,6 +41,8 @@ class Item: self.chromeRequest = None self.chromeResponse = None self.chromeFinished = None + self.isRedirect = False + self.failed = False def __repr__ (self): return '<Item {}>'.format (self.request['url']) @@ -47,6 +53,7 @@ class Item: @property def response (self): + assert not self.failed, "you must not access response if failed is set" return self.chromeResponse['response'] @property @@ -73,7 +80,7 @@ class Item: def body (self): """ Return response body or None """ try: - body = self.tab.Network.getResponseBody (requestId=self.id, _timeout=60) + body = self.tab.Network.getResponseBody (requestId=self.id, _timeout=10) rawBody = body['body'] base64Encoded = body['base64Encoded'] if base64Encoded: @@ -93,7 +100,7 @@ class Item: return postData.encode ('utf8'), False elif req.get ('hasPostData', False): try: - return b64decode (self.tab.Network.getRequestPostData (requestId=self.id, _timeout=60)['postData']), True + return b64decode (self.tab.Network.getRequestPostData (requestId=self.id, _timeout=10)['postData']), True except (pychrome.exceptions.CallMethodException, pychrome.exceptions.TimeoutException): raise ValueError ('Cannot fetch request body') return None, False @@ -108,6 +115,16 @@ class Item: def responseHeaders (self): return self._unfoldHeaders (self.response['headers']) + @property + def statusText (self): + text = self.response.get ('statusText') + if text: + return text + text = BaseHTTPRequestHandler.responses.get (self.response['status']) + if text: + return text[0] + return 'No status text available' + @staticmethod def _unfoldHeaders (headers): """ @@ -129,10 +146,21 @@ class Item: def setFinished (self, finished): self.chromeFinished = finished +class BrowserCrashed (Exception): + pass + class SiteLoader: """ 
Load site in Chrome and monitor network requests + Chrome’s raw devtools events are preprocessed here (asynchronously, in a + different thread, spawned by pychrome) and put into a deque. There + are two reasons for this: First of all, it makes consumer exception + handling alot easier (no need to propagate them to the main thread). And + secondly, browser crashes must be handled before everything else, as they + result in a loss of communication with the browser itself (i.e. we can’t + fetch a resource’s body any more). + XXX: track popup windows/new tabs and close them """ @@ -140,22 +168,22 @@ class SiteLoader: def __init__ (self, browser, url, logger=logging.getLogger(__name__)): self.requests = {} - self.browser = browser + self.browser = pychrome.Browser (url=browser) self.url = url self.logger = logger - - self.tab = browser.new_tab() + self.queue = deque () + self.notify = Event () def __enter__ (self): - tab = self.tab + tab = self.tab = self.browser.new_tab() # setup callbacks tab.Network.requestWillBeSent = self._requestWillBeSent tab.Network.responseReceived = self._responseReceived tab.Network.loadingFinished = self._loadingFinished tab.Network.loadingFailed = self._loadingFailed tab.Log.entryAdded = self._entryAdded - #tab.Page.loadEventFired = loadEventFired tab.Page.javascriptDialogOpening = self._javascriptDialogOpening + tab.Inspector.targetCrashed = self._targetCrashed # start the tab tab.start() @@ -164,66 +192,37 @@ class SiteLoader: tab.Log.enable () tab.Network.enable() tab.Page.enable () + tab.Inspector.enable () tab.Network.clearBrowserCache () if tab.Network.canClearBrowserCookies ()['result']: tab.Network.clearBrowserCookies () return self + def __exit__ (self, exc_type, exc_value, traceback): + self.tab.Page.stopLoading () + self.tab.stop () + self.browser.close_tab(self.tab) + return False + def __len__ (self): return len (self.requests) + def __iter__ (self): + return iter (self.queue) + def start (self): 
self.tab.Page.navigate(url=self.url) - def wait (self, timeout=1): - self.tab.wait (timeout) - - def waitIdle (self, idleTimeout=1, maxTimeout=60): - step = 0 - for i in range (0, maxTimeout): - self.wait (1) - if len (self) == 0: - step += 1 - if step > idleTimeout: - break - else: - step = 0 - - def stop (self): - """ - Stop loading site - - XXX: stop executing scripts - """ + # use event to signal presence of new items. This way the controller + # can wait for them without polling. + def _append (self, item): + self.queue.append (item) + self.notify.set () - tab = self.tab - - tab.Page.stopLoading () - tab.Network.disable () - tab.Page.disable () - tab.Log.disable () - # XXX: we can’t drain the event queue directly, so insert (yet another) wait - tab.wait (1) - tab.Network.requestWillBeSent = None - tab.Network.responseReceived = None - tab.Network.loadingFinished = None - tab.Network.loadingFailed = None - tab.Page.loadEventFired = None - tab.Page.javascriptDialogOpening = None - tab.Log.entryAdded = None - - def __exit__ (self, exc_type, exc_value, traceback): - self.tab.stop () - self.browser.close_tab(self.tab) - return False - - # overrideable callbacks - def loadingFinished (self, item, redirect=False): - pass - - def loadingFailed (self, item): - pass + def _appendleft (self, item): + self.queue.appendleft (item) + self.notify.set () # internal chrome callbacks def _requestWillBeSent (self, **kwargs): @@ -244,8 +243,9 @@ class SiteLoader: item.setResponse (resp) resp = {'requestId': reqId, 'encodedDataLength': 0, 'timestamp': kwargs['timestamp']} item.setFinished (resp) - self.loadingFinished (item, redirect=True) + item.isRedirect = True self.logger.info ('redirected request {} has url {}'.format (reqId, req['url'])) + self._append (item) else: self.logger.warning ('request {} already exists, overwriting.'.format (reqId)) @@ -284,13 +284,14 @@ class SiteLoader: if url.scheme in self.allowedSchemes: self.logger.info ('finished {} {}'.format (reqId, 
req['url'])) item.setFinished (kwargs) - self.loadingFinished (item) + self._append (item) def _loadingFailed (self, **kwargs): reqId = kwargs['requestId'] self.logger.warning ('failed {} {}'.format (reqId, kwargs['errorText'], kwargs.get ('blockedReason'))) item = self.requests.pop (reqId, None) - self.loadingFailed (item) + item.failed = True + self._append (item) def _entryAdded (self, **kwargs): """ Log entry added """ @@ -312,31 +313,10 @@ class SiteLoader: else: self.logger.warning ('unknown javascript dialog type {}'.format (t)) -class AccountingSiteLoader (SiteLoader): - """ - SiteLoader that keeps basic statistics about retrieved pages. - """ - - def __init__ (self, browser, url, logger=logging.getLogger(__name__)): - super ().__init__ (browser, url, logger) - - self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0} - - def loadingFinished (self, item, redirect=False): - super ().loadingFinished (item, redirect) - - self.stats['finished'] += 1 - self.stats['bytesRcv'] += item.encodedDataLength - - def loadingFailed (self, item): - super ().loadingFailed (item) - - self.stats['failed'] += 1 - - def _requestWillBeSent (self, **kwargs): - super ()._requestWillBeSent (**kwargs) - - self.stats['requests'] += 1 + def _targetCrashed (self, **kwargs): + self.logger.error ('browser crashed') + # priority message + self._appendleft (BrowserCrashed ()) import subprocess, os, time from tempfile import mkdtemp @@ -357,7 +337,6 @@ class ChromeService: def __enter__ (self): assert self.p is None self.userDataDir = mkdtemp () - print (self.userDataDir) args = [self.binary, '--window-size={},{}'.format (*self.windowSize), '--user-data-dir={}'.format (self.userDataDir), # use temporory user dir @@ -413,69 +392,65 @@ class NullService: ### tests ### import unittest, time -from http.server import BaseHTTPRequestHandler +from operator import itemgetter -class TestHTTPRequestHandler (BaseHTTPRequestHandler): - encodingTestString = { - 'latin1': 'äöü', - 
'utf-8': 'äöü', - 'ISO-8859-1': 'äöü', - } - binaryTestData = b'\x00\x01\x02' - # 1×1 pixel PNG - imageTestData = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82' - htmlTestData = '<html><body><img src="/image"><img src="/nonexistent"></body></html>' - alertData = '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><img src="/image"></body></html>' +class TestItem (Item): + """ This should be as close to Item as possible """ + + base = 'http://localhost:8000/' + + def __init__ (self, path, status, headers, bodyReceive, bodySend=None): + super ().__init__ (tab=None) + self.chromeResponse = {'response': {'headers': headers, 'status': status, 'url': self.base + path}} + self._body = bodyReceive, False + self.bodySend = bodyReceive if not bodySend else bodySend + @property + def body (self): + return self._body + +testItems = [ + TestItem ('binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00\x01\x02'), + TestItem ('attachment', 200, + {'Content-Type': 'text/plain; charset=utf-8', + 'Content-Disposition': 'attachment; filename="attachment.txt"', + }, + 'This is a simple text file with umlauts. 
ÄÖU.'.encode ('utf8')), + TestItem ('encoding/utf8', 200, {'Content-Type': 'text/plain; charset=utf-8'}, + 'This is a test, äöü μνψκ ¥¥¥¿ýý¡'.encode ('utf8')), + TestItem ('encoding/iso88591', 200, {'Content-Type': 'text/plain; charset=ISO-8859-1'}, + 'This is a test, äöü.'.encode ('utf8'), + 'This is a test, äöü.'.encode ('ISO-8859-1')), + TestItem ('encoding/latin1', 200, {'Content-Type': 'text/plain; charset=latin1'}, + 'This is a test, äöü.'.encode ('utf8'), + 'This is a test, äöü.'.encode ('latin1')), + TestItem ('image', 200, {'Content-Type': 'image/png'}, + # 1×1 png image + b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'), + TestItem ('empty', 200, {}, b''), + TestItem ('redirect/301/empty', 301, {'Location': '/empty'}, b''), + TestItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b''), + TestItem ('nonexistent', 404, {}, b''), + TestItem ('html', 200, {'Content-Type': 'html'}, + '<html><body><img src="/image"><img src="/nonexistent"></body></html>'.encode ('utf8')), + TestItem ('html/alert', 200, {'Content-Type': 'html'}, + '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><img src="/image"></body></html>'.encode ('utf8')), + ] +testItemMap = dict ([(item.parsedUrl.path, item) for item in testItems]) + +class TestHTTPRequestHandler (BaseHTTPRequestHandler): def do_GET(self): - path = self.path - if path.startswith ('/redirect/301'): - self.send_response(301) - self.send_header ('Location', path[13:]) + item = testItemMap.get (self.path) + if item: + self.send_response (item.response['status']) + for k, v in item.response['headers'].items (): + self.send_header (k, v) + body = item.bodySend + 
self.send_header ('Content-Length', len (body)) self.end_headers() - elif path == '/empty': - self.send_response (200) - self.end_headers () - elif path.startswith ('/encoding'): - # send text data with different encodings - _, _, encoding = path.split ('/', 3) - self.send_response (200) - self.send_header ('Content-Type', 'text/plain; charset={}'.format (encoding)) - self.end_headers () - self.wfile.write (self.encodingTestString[encoding].encode (encoding)) - elif path == '/binary': - # send binary data - self.send_response (200) - self.send_header ('Content-Type', 'application/octet-stream') - self.send_header ('Content-Length', len (self.binaryTestData)) - self.end_headers () - self.wfile.write (self.binaryTestData) - elif path == '/image': - # send binary data - self.send_response (200) - self.send_header ('Content-Type', 'image/png') - self.end_headers () - self.wfile.write (self.imageTestData) - elif path == '/attachment': - self.send_response (200) - self.send_header ('Content-Type', 'text/plain; charset=utf-8') - self.send_header ('Content-Disposition', 'attachment; filename="attachment.txt"') - self.end_headers () - self.wfile.write (self.encodingTestString['utf-8'].encode ('utf-8')) - elif path == '/html': - self.send_response (200) - self.send_header ('Content-Type', 'text/html; charset=utf-8') - self.end_headers () - self.wfile.write (self.htmlTestData.encode ('utf-8')) - elif path == '/alert': - self.send_response (200) - self.send_header ('Content-Type', 'text/html; charset=utf-8') - self.end_headers () - self.wfile.write (self.alertData.encode ('utf-8')) - else: - self.send_response (404) - self.end_headers () - + self.wfile.write (body) + return + def log_message (self, format, *args): pass @@ -485,144 +460,82 @@ def startServer (): httpd = http.server.HTTPServer (("localhost", PORT), TestHTTPRequestHandler) httpd.serve_forever() -class TestSiteLoaderAdapter (SiteLoader): - def __init__ (self, browser, url): - SiteLoader.__init__ (self, browser, 
url) - self.finished = [] - - def loadingFinished (self, item, redirect=False): - self.finished.append (item) - class TestSiteLoader (unittest.TestCase): def setUp (self): from multiprocessing import Process self.server = Process (target=startServer) self.server.start () - self.baseurl = 'http://localhost:8000/' + self.baseurl = 'http://localhost:8000' self.service = ChromeService () - browserUrl = self.service.__enter__ () - self.browser = pychrome.Browser(url=browserUrl) + self.browser = self.service.__enter__ () def buildAdapter (self, path): - return TestSiteLoaderAdapter (self.browser, '{}{}'.format (self.baseurl, path)) - - def assertUrls (self, l, expect): - urls = set (map (lambda x: x.parsedUrl.path, l.finished)) - expect = set (expect) - self.assertEqual (urls, expect) - - def test_wait (self): - waittime = 2 - with self.buildAdapter ('empty') as l: + self.assertTrue (path.startswith ('/')) + return SiteLoader (self.browser, '{}{}'.format (self.baseurl, path)) + + def assertItems (self, l, items): + items = dict ([(i.parsedUrl.path, i) for i in items]) + timeout = 5 + while True: + if not l.notify.wait (timeout) and len (items) > 0: + self.fail ('timeout') + if len (l.queue) > 0: + item = l.queue.popleft () + if isinstance (item, Exception): + raise item + self.assertIsNot (item.chromeResponse, None, msg='url={}'.format (item.request['url'])) + golden = items.pop (item.parsedUrl.path) + if not golden: + self.fail ('url {} not supposed to be fetched'.format (item.url)) + self.assertEqual (item.body[0], golden.body[0], msg='body for url={}'.format (item.request['url'])) + self.assertEqual (item.response['status'], golden.response['status']) + for k, v in golden.responseHeaders: + actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders))) + self.assertIn (v, actual) + + # check queue at least once + if not items: + break + + def assertLiteralItem (self, item, deps=[]): + with self.buildAdapter (item.parsedUrl.path) as l: l.start 
() - before = time.time () - l.wait (waittime) - after = time.time () - self.assertTrue ((after-before) >= waittime) + self.assertItems (l, [item] + deps) def test_empty (self): - with self.buildAdapter ('empty') as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 1) + self.assertLiteralItem (testItemMap['/empty']) - def test_redirect301 (self): - with self.buildAdapter ('redirect/301/empty') as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 2) - self.assertUrls (l, ['/redirect/301/empty', '/empty']) - for item in l.finished: - if item.parsedUrl.path == '/empty': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], b'') - elif item.parsedUrl.path == '/redirect/301/empty': - self.assertEqual (item.response['status'], 301) - else: - self.fail ('unknown url') - - def test_redirect301multi (self): - with self.buildAdapter ('redirect/301/redirect/301/empty') as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 3) - self.assertUrls (l, ['/redirect/301/redirect/301/empty', '/redirect/301/empty', '/empty']) - for item in l.finished: - if item.parsedUrl.path == '/empty': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], b'') - elif item.parsedUrl.path in {'/redirect/301/empty', \ - '/redirect/301/redirect/301/empty'}: - self.assertEqual (item.response['status'], 301) - else: - self.fail ('unknown url') + def test_redirect (self): + self.assertLiteralItem (testItemMap['/redirect/301/empty'], [testItemMap['/empty']]) + # chained redirects + self.assertLiteralItem (testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']]) def test_encoding (self): """ Text responses are transformed to UTF-8. Make sure this works correctly. 
""" - for encoding, expected in TestHTTPRequestHandler.encodingTestString.items (): - with self.buildAdapter ('encoding/{}'.format (encoding)) as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 1) - self.assertUrls (l, ['/encoding/{}'.format (encoding)]) - self.assertEqual (l.finished[0].body[0], expected.encode ('utf8')) + for item in {testItemMap['/encoding/utf8'], testItemMap['/encoding/latin1'], testItemMap['/encoding/iso88591']}: + self.assertLiteralItem (item) def test_binary (self): """ Browser should ignore content it cannot display (i.e. octet-stream) """ - with self.buildAdapter ('binary') as l: + with self.buildAdapter ('/binary') as l: l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 0) + self.assertItems (l, []) def test_image (self): """ Images should be displayed inline """ - with self.buildAdapter ('image') as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 1) - self.assertUrls (l, ['/image']) - self.assertEqual (l.finished[0].body[0], TestHTTPRequestHandler.imageTestData) + self.assertLiteralItem (testItemMap['/image']) def test_attachment (self): - """ And downloads won’t work in headless mode """ - with self.buildAdapter ('attachment') as l: + """ And downloads won’t work in headless mode, even if it’s just a text file """ + with self.buildAdapter ('/attachment') as l: l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 0) + self.assertItems (l, []) def test_html (self): - with self.buildAdapter ('html') as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 3) - self.assertUrls (l, ['/html', '/image', '/nonexistent']) - for item in l.finished: - if item.parsedUrl.path == '/html': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], TestHTTPRequestHandler.htmlTestData.encode ('utf-8')) - elif item.parsedUrl.path == '/image': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], 
TestHTTPRequestHandler.imageTestData) - elif item.parsedUrl.path == '/nonexistent': - self.assertEqual (item.response['status'], 404) - else: - self.fail ('unknown url') - - def test_alert (self): - with self.buildAdapter ('alert') as l: - l.start () - l.waitIdle () - self.assertUrls (l, ['/alert', '/image']) - for item in l.finished: - if item.parsedUrl.path == '/alert': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], TestHTTPRequestHandler.alertData.encode ('utf-8')) - elif item.parsedUrl.path == '/image': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], TestHTTPRequestHandler.imageTestData) - else: - self.fail ('unknown url') + self.assertLiteralItem (testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']]) + # make sure alerts are dismissed correctly (image won’t load otherwise) + self.assertLiteralItem (testItemMap['/html/alert'], [testItemMap['/image']]) def tearDown (self): self.service.__exit__ (None, None, None) diff --git a/crocoite/cli.py b/crocoite/cli.py index f6454da..d631f10 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -26,8 +26,9 @@ import logging, argparse, json, sys from . 
import behavior from .controller import RecursiveController, defaultSettings, \ - ControllerSettings, DepthLimit, PrefixLimit + ControllerSettings, DepthLimit, PrefixLimit, StatsHandler from .browser import NullService, ChromeService +from .warc import WarcHandler def parseRecursive (recursive, url): if recursive is None: @@ -41,6 +42,7 @@ def parseRecursive (recursive, url): def main (): parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.') + parser.add_argument('--debug', help='Enable debug messages', action='store_true') parser.add_argument('--browser', help='DevTools URL', metavar='URL') parser.add_argument('--recursive', help='Follow links recursively') parser.add_argument('--concurrency', '-j', type=int, default=1) @@ -50,8 +52,8 @@ def main (): parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES') parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts', dest='enabledBehaviorNames', - default=list (behavior.availableNames), - choices=list (behavior.availableNames)) + default=list (behavior.availableMap.keys ()), + choices=list (behavior.availableMap.keys ())) group = parser.add_mutually_exclusive_group (required=True) group.add_argument('--output', help='WARC filename', metavar='FILE') group.add_argument('--distributed', help='Use celery worker', action='store_true') @@ -71,7 +73,8 @@ def main (): recursive=args.recursive, concurrency=args.concurrency) r = result.get () else: - logging.basicConfig (level=logging.INFO) + level = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig (level=level) try: recursionPolicy = parseRecursive (args.recursive, args.url) @@ -84,9 +87,13 @@ def main (): logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, timeout=args.timeout) with open (args.output, 'wb') as fd: + handler = [StatsHandler (), WarcHandler (fd)] + b = list (map (lambda x: 
behavior.availableMap[x], args.enabledBehaviorNames)) controller = RecursiveController (args.url, fd, settings=settings, - recursionPolicy=recursionPolicy, service=service) - r = controller.run () + recursionPolicy=recursionPolicy, service=service, + handler=handler, behavior=b) + controller.run () + r = handler[0].stats json.dump (r, sys.stdout) return True diff --git a/crocoite/controller.py b/crocoite/controller.py index bc6f948..cdae268 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -35,99 +35,151 @@ class ControllerSettings: defaultSettings = ControllerSettings () -import logging -from urllib.parse import urlsplit, urlunsplit +class EventHandler: + """ Abstract base class for event handler """ + + # this handler wants to know about exceptions before they are reraised by + # the controller + acceptException = False + + def push (self, item): + raise NotImplementedError () + +from .browser import BrowserCrashed + +class StatsHandler (EventHandler): + acceptException = True -import pychrome + def __init__ (self): + self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0} + + def push (self, item): + if isinstance (item, Item): + self.stats['requests'] += 1 + if item.failed: + self.stats['failed'] += 1 + else: + self.stats['finished'] += 1 + self.stats['bytesRcv'] += item.encodedDataLength + elif isinstance (item, BrowserCrashed): + self.stats['crashed'] += 1 + +import logging, time +from urllib.parse import urlsplit, urlunsplit from . 
import behavior as cbehavior -from .browser import ChromeService -from .warc import WarcLoader, SerializingWARCWriter +from .browser import ChromeService, SiteLoader, Item from .util import getFormattedViewportMetrics -def firstOrNone (it): - """ Return first item of iterator it or None if empty """ - try: - return next (it) - except StopIteration: - return None +class ControllerStart: + def __init__ (self, payload): + self.payload = payload class SinglePageController: """ Archive a single page url to file output. + + Dispatches between producer (site loader and behavior scripts) and consumer + (stats, warc writer). """ def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \ - logger=logging.getLogger(__name__), settings=defaultSettings): + logger=logging.getLogger(__name__), settings=defaultSettings, handler=[]): self.url = url self.output = output self.service = service self.behavior = behavior self.settings = settings self.logger = logger + self.handler = handler + + def processItem (self, item): + if isinstance (item, Exception): + for h in self.handler: + if h.acceptException: + h.push (item) + raise item + + for h in self.handler: + h.push (item) def run (self): - ret = {'stats': None, 'links': []} - - with self.service as browser: - browser = pychrome.Browser (url=browser) - writer = SerializingWARCWriter (self.output, gzip=True) - - with WarcLoader (browser, self.url, writer, - logBuffer=self.settings.logBuffer, - maxBodySize=self.settings.maxBodySize) as l: - version = l.tab.Browser.getVersion () - payload = { - 'software': __package__, - 'browser': version['product'], - 'useragent': version['userAgent'], - 'viewport': getFormattedViewportMetrics (l.tab), - } - warcinfo = writer.create_warcinfo_record (filename=None, info=payload) - writer.write_record (warcinfo) - - # not all behavior scripts are allowed for every URL, filter them - enabledBehavior = list (filter (lambda x: self.url in x, - map (lambda x: x (l), 
self.behavior))) - linksBehavior = firstOrNone (filter (lambda x: isinstance (x, cbehavior.ExtractLinks), - enabledBehavior)) - - for b in enabledBehavior: - self.logger.debug ('starting onload behavior {}'.format (b.name)) - b.onload () - l.start () - - l.waitIdle (self.settings.idleTimeout, self.settings.timeout) - - for b in enabledBehavior: - self.logger.debug ('starting onstop behavior {}'.format (b.name)) - b.onstop () - - # if we stopped due to timeout, wait for remaining assets - l.waitIdle (2, 60) - l.stop () - - for b in enabledBehavior: - self.logger.debug ('starting onfinish behavior {}'.format (b.name)) - b.onfinish () - - ret['stats'] = l.stats - ret['links'] = linksBehavior.links if linksBehavior else None - writer.flush () - return ret - -from collections import UserDict - -class IntegerDict (UserDict): - """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """ - def __add__ (self, b): - newdict = self.__class__ (self) - for k, v in b.items (): - if k in self: - newdict[k] += v - else: - newdict[k] = v - return newdict + def processQueue (): + # XXX: this is very ugly code and does not work well. figure out a + # better way to impose timeouts and still process all items in the + # queue + queue = l.queue + self.logger.debug ('processing at least {} queue items'.format (len (queue))) + while True: + now = time.time () + elapsed = now-start + maxTimeout = max (min (self.settings.idleTimeout, self.settings.timeout-elapsed), 0) + self.logger.debug ('max timeout is {} with elapsed {}'.format (maxTimeout, elapsed)) + # skip waiting if there is work to do. processes all items in + # queue, regardless of timeouts, i.e. you need to make sure the + # queue will actually be empty at some point. 
+ if len (queue) == 0: + if not l.notify.wait (maxTimeout): + assert len (queue) == 0, "event must be sent" + # timed out + self.logger.debug ('timed out after {}'.format (elapsed)) + break + else: + l.notify.clear () + # limit number of items processed here, otherwise timeout won’t + # be checked frequently. this can happen if the site quickly + # loads a lot of items. + for i in range (1000): + try: + item = queue.popleft () + self.logger.debug ('queue pop: {!r}, len now {}'.format (item, len (queue))) + except IndexError: + break + self.processItem (item) + if maxTimeout == 0: + break + + with self.service as browser, SiteLoader (browser, self.url, logger=self.logger) as l: + start = time.time () + + version = l.tab.Browser.getVersion () + payload = { + 'software': __package__, + 'browser': version['product'], + 'useragent': version['userAgent'], + 'viewport': getFormattedViewportMetrics (l.tab), + } + self.processItem (ControllerStart (payload)) + + # not all behavior scripts are allowed for every URL, filter them + enabledBehavior = list (filter (lambda x: self.url in x, + map (lambda x: x (l), self.behavior))) + + for b in enabledBehavior: + self.logger.debug ('starting onload {}'.format (b)) + # I decided against using the queue here to limit memory + # usage (screenshot behavior would put all images into + # queue before we could process them) + for item in b.onload (): + self.processItem (item) + l.start () + + processQueue () + + for b in enabledBehavior: + self.logger.debug ('starting onstop {}'.format (b)) + for item in b.onstop (): + self.processItem (item) + + # if we stopped due to timeout, wait for remaining assets + processQueue () + + for b in enabledBehavior: + self.logger.debug ('starting onfinish {}'.format (b)) + for item in b.onfinish (): + self.processItem (item) + + processQueue () class RecursionPolicy: """ Abstract recursion policy """ @@ -172,7 +224,9 @@ def removeFragment (u): s = urlsplit (u) return urlunsplit ((s.scheme, s.netloc, 
s.path, s.query, '')) -class RecursiveController: +from .behavior import ExtractLinksEvent + +class RecursiveController (EventHandler): """ Simple recursive controller @@ -181,7 +235,7 @@ class RecursiveController: def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \ logger=logging.getLogger(__name__), settings=defaultSettings, - recursionPolicy=DepthLimit (0)): + recursionPolicy=DepthLimit (0), handler=[]): self.url = url self.output = output self.service = service @@ -189,37 +243,45 @@ class RecursiveController: self.settings = settings self.logger = logger self.recursionPolicy = recursionPolicy + self.handler = handler + self.handler.append (self) def fetch (self, urls): """ Overrideable fetch action for URLs. Defaults to sequential SinglePageController. """ - result = [] for u in urls: - c = SinglePageController (u, self.output, self.service, - self.behavior, self.logger, self.settings) - result.append (c.run ()) - return result + try: + c = SinglePageController (u, self.output, self.service, + self.behavior, self.logger, self.settings, self.handler) + c.run () + except BrowserCrashed: + # this is fine if reported + self.logger.error ('browser crashed for {}'.format (u)) def run (self): - have = set () - urls = set ([self.url]) - ret = {'stats': IntegerDict ()} - - while urls: - self.logger.info ('retrieving {} urls'.format (len (urls))) - result = self.fetch (urls) - - have.update (urls) - urls = set () - for r in result: - ret['stats'] += r['stats'] - urls.update (map (removeFragment, r['links'])) - urls.difference_update (have) - - urls = self.recursionPolicy (urls) - # everything in ret must be serializeable - ret['stats'] = dict (ret['stats']) - return ret + self.have = set () + self.urls = set ([self.url]) + + while self.urls: + self.logger.info ('retrieving {} urls'.format (len (self.urls))) + + self.have.update (self.urls) + fetchurls = self.urls + self.urls = set () + + # handler appends new urls to self.urls 
through push() + self.fetch (fetchurls) + + # remove urls we have and apply recursion policy + self.urls.difference_update (self.have) + self.urls = self.recursionPolicy (self.urls) + + def push (self, item): + if isinstance (item, ExtractLinksEvent): + self.logger.debug ('adding extracted links: {}'.format (item.links)) + self.urls.update (map (removeFragment, item.links)) + else: + self.logger.debug ('{} got unhandled event {!r}'.format (self, item)) diff --git a/crocoite/task.py b/crocoite/task.py index e93cfde..48fe5d8 100644 --- a/crocoite/task.py +++ b/crocoite/task.py @@ -38,10 +38,11 @@ _monkeyPatchSyncTasks () from celery import Celery from celery.utils.log import get_task_logger -from .browser import ChromeService -from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit +from .browser import ChromeService, BrowserCrashed +from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit, StatsHandler from . 
import behavior from .cli import parseRecursive +from .warc import WarcHandler app = Celery ('crocoite.distributed') app.config_from_object('celeryconfig') @@ -77,16 +78,35 @@ def archive (self, url, settings, enabledBehaviorNames): outPath = os.path.join (app.conf.temp_dir, outFile) fd = open (outPath, 'wb') + handler = [StatsHandler (), WarcHandler (fd)] enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) settings = ControllerSettings (**settings) - controller = SinglePageController (url, fd, behavior=enabledBehavior, settings=settings) - ret = controller.run () + try: + controller = SinglePageController (url, fd, behavior=enabledBehavior, + settings=settings, handler=handler) + controller.run () + except BrowserCrashed: + # nothing we can do about that + logger.error ('browser crashed for {}'.format (url)) os.makedirs (app.conf.finished_dir, exist_ok=True) outPath = os.path.join (app.conf.finished_dir, outFile) os.rename (fd.name, outPath) - return ret + return handler[0].stats + +from collections import UserDict + +class IntegerDict (UserDict): + """ Dict with dict/dict per-item arithmetic propagation, i.e. 
{1: 2}+{1: 1}={1: 3} """ + def __add__ (self, b): + newdict = self.__class__ (self) + for k, v in b.items (): + if k in self: + newdict[k] += v + else: + newdict[k] = v + return newdict class DistributedRecursiveController (RecursiveController): """ Distributed, recursive controller using celery """ @@ -96,6 +116,7 @@ class DistributedRecursiveController (RecursiveController): recursionPolicy=DepthLimit (0), concurrency=1): super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy) self.concurrency = concurrency + self.stats = IntegerDict () def fetch (self, urls): def chunksIter (urls): @@ -104,7 +125,8 @@ class DistributedRecursiveController (RecursiveController): itemsPerTask = len (urls)//self.concurrency if itemsPerTask <= 0: itemsPerTask = len (urls) - return chain.from_iterable (archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ()) + result = archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get () + self.stats = sum (chain.from_iterable (result), self.stats) @app.task(bind=True, track_started=True) def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency): @@ -115,5 +137,7 @@ def controller (self, url, settings, enabledBehaviorNames, recursive, concurrenc settings = ControllerSettings (**settings) controller = DistributedRecursiveController (url, None, behavior=enabledBehavior, settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency) - return controller.run () + controller.run () + return dict (controller.stats) + diff --git a/crocoite/warc.py b/crocoite/warc.py index 3fd65e4..fd8ce8d 100644 --- a/crocoite/warc.py +++ b/crocoite/warc.py @@ -24,7 +24,6 @@ Classes writing data to WARC files import logging import json -from http.server import BaseHTTPRequestHandler from io import BytesIO from warcio.statusandheaders import StatusAndHeaders from urllib.parse import urlsplit @@ -36,95 +35,18 @@ from queue import Queue from warcio.timeutils import 
datetime_to_iso_date from warcio.warcwriter import WARCWriter -from .browser import AccountingSiteLoader from .util import packageUrl -from .controller import defaultSettings - -class SerializingWARCWriter (WARCWriter): - """ - Serializing WARC writer using separate writer thread and queue for - non-blocking operation - - Needs an explicit .flush() before deletion. - """ - - def __init__ (self, filebuf, *args, **kwargs): - WARCWriter.__init__ (self, filebuf, *args, **kwargs) - self.queue = Queue () - self.thread = Thread (target=self._run_writer) - self.thread.start () - - def flush (self): - self.queue.put (None) - self.thread.join () - self.queue = None - self.thread = None - - def _run_writer (self): - while True: - item = self.queue.get () - if not item: - break - out, record = item - WARCWriter._write_warc_record (self, out, record) - - def _write_warc_record (self, out, record): - self.queue.put ((out, record)) - -class WARCLogHandler (BufferingHandler): - """ - Buffered log handler, flushing to warcio - """ +from .controller import defaultSettings, EventHandler, ControllerStart +from .behavior import Script, DomSnapshotEvent, ScreenshotEvent +from .browser import Item - contentType = 'text/plain; charset=utf-8' - - def __init__ (self, capacity, warcfile): - BufferingHandler.__init__ (self, capacity) - self.warcfile = warcfile - - def flush (self): - self.acquire () - try: - if self.buffer: - buf = '' - for record in self.buffer: - buf += self.format (record) - buf += '\n' - # XXX: record type? 
- record = self.warcfile.create_warc_record ( - packageUrl ('log'), 'metadata', - payload=BytesIO (buf.encode ('utf8')), - warc_headers_dict={'Content-Type': self.contentType}) - self.warcfile.write_record(record) - self.buffer = [] - finally: - self.release () - -class WarcLoader (AccountingSiteLoader): - def __init__ (self, browser, url, writer, +class WarcHandler (EventHandler): + def __init__ (self, fd, logger=logging.getLogger(__name__), - logBuffer=defaultSettings.logBuffer, maxBodySize=defaultSettings.maxBodySize): - super ().__init__ (browser, url, logger) - self.writer = writer + self.logger = logger + self.writer = WARCWriter (fd, gzip=True) self.maxBodySize = maxBodySize - self.warcLogger = WARCLogHandler (logBuffer, writer) - self.logger.addHandler (self.warcLogger) - - def __exit__ (self, exc_type, exc_value, traceback): - self.logger.removeHandler (self.warcLogger) - self.warcLogger.flush () - return super ().__exit__ (exc_type, exc_value, traceback) - - @staticmethod - def getStatusText (response): - text = response.get ('statusText') - if text: - return text - text = BaseHTTPRequestHandler.responses.get (response['status']) - if text: - return text[0] - return 'No status text available' def _writeRequest (self, item): writer = self.writer @@ -154,13 +76,12 @@ class WarcLoader (AccountingSiteLoader): return record.rec_headers['WARC-Record-ID'] - def _getBody (self, item, redirect): + def _getBody (self, item): reqId = item.id - resp = item.response rawBody = b'' base64Encoded = False - if redirect: + if item.isRedirect: # redirects reuse the same request, thus we cannot safely retrieve # the body (i.e getResponseBody may return the new location’s # body). This is fine. 
@@ -173,7 +94,7 @@ class WarcLoader (AccountingSiteLoader): rawBody, base64Encoded = item.body return rawBody, base64Encoded - def _writeResponse (self, item, redirect, concurrentTo, rawBody, base64Encoded): + def _writeResponse (self, item, concurrentTo, rawBody, base64Encoded): writer = self.writer reqId = item.id resp = item.response @@ -192,7 +113,7 @@ class WarcLoader (AccountingSiteLoader): } httpHeaders = StatusAndHeaders('{} {}'.format (resp['status'], - self.getStatusText (resp)), item.responseHeaders, + item.statusText), item.responseHeaders, protocol='HTTP/1.1') # Content is saved decompressed and decoded, remove these headers @@ -216,21 +137,63 @@ class WarcLoader (AccountingSiteLoader): http_headers=httpHeaders) writer.write_record(record) - def loadingFinished (self, item, redirect=False): - super ().loadingFinished (item, redirect) - + def _writeScript (self, item): writer = self.writer - - req = item.request - reqId = item.id - resp = item.response - url = urlsplit (resp['url']) - + encoding = 'utf-8' + record = writer.create_warc_record (packageUrl ('script/{}'.format (item.path)), 'metadata', + payload=BytesIO (str (item).encode (encoding)), + warc_headers_dict={'Content-Type': 'application/javascript; charset={}'.format (encoding)}) + writer.write_record (record) + + def _writeItem (self, item): + if item.failed: + # should have been handled by the logger already + return try: # write neither request nor response if we cannot retrieve the body - rawBody, base64Encoded = self._getBody (item, redirect) + rawBody, base64Encoded = self._getBody (item) concurrentTo = self._writeRequest (item) - self._writeResponse (item, redirect, concurrentTo, rawBody, base64Encoded) + self._writeResponse (item, concurrentTo, rawBody, base64Encoded) except ValueError as e: self.logger.error (e.args[0]) + def _writeDomSnapshot (self, item): + writer = self.writer + httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1') + record = writer.create_warc_record 
(item.url, 'response', + payload=BytesIO (item.document), + http_headers=httpHeaders, + warc_headers_dict={'X-DOM-Snapshot': str (True), + 'X-Chrome-Viewport': item.viewport}) + writer.write_record (record) + + def _writeScreenshot (self, item): + writer = self.writer + url = packageUrl ('screenshot-{}-{}.png'.format (0, item.yoff)) + record = writer.create_warc_record (url, 'resource', + payload=BytesIO (item.data), warc_headers_dict={'Content-Type': 'image/png'}) + writer.write_record (record) + + def _writeControllerStart (self, item): + writer = self.writer + warcinfo = writer.create_warcinfo_record (filename=None, info=item.payload) + writer.write_record (warcinfo) + + route = {Script: _writeScript, + Item: _writeItem, + DomSnapshotEvent: _writeDomSnapshot, + ScreenshotEvent: _writeScreenshot, + ControllerStart: _writeControllerStart, + } + + def push (self, item): + processed = False + for k, v in self.route.items (): + if isinstance (item, k): + v (self, item) + processed = True + break + + if not processed: + self.logger.debug ('unknown event {}'.format (repr (item))) + |