summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars-Dominik Braun <lars@6xq.net>2018-06-20 11:13:37 +0200
committerLars-Dominik Braun <lars@6xq.net>2018-06-20 11:17:25 +0200
commit7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981 (patch)
tree15d0ca2e0374b7d00a05d5dd5de1e48838e71feb
parent06a06463c0367718b2ed1b2b7f081cff6ca998a0 (diff)
downloadcrocoite-7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981.tar.gz
crocoite-7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981.tar.bz2
crocoite-7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981.zip
Synchronous SiteLoader event handling
Previously a browser crash stalled the entire grab, since events from pychrome were handled asynchronously in a different thread and exceptions were not propagated to the main thread. Now all browser events are stored in a queue and processed by the main thread, allowing us to handle browser crashes gracefully (more or less).

This made the following additional changes necessary:

- Clear separation between producer (browser) and consumer (WARC, stats, …)
- Behavior scripts now yield events as well, instead of accessing the WARC writer
- WARC logging was removed (for now) and the WARC writer does not require serialization any more
-rw-r--r--contrib/celerycrocoite.py9
-rw-r--r--crocoite/behavior.py124
-rw-r--r--crocoite/browser.py417
-rw-r--r--crocoite/cli.py19
-rw-r--r--crocoite/controller.py260
-rw-r--r--crocoite/task.py38
-rw-r--r--crocoite/warc.py165
7 files changed, 518 insertions, 514 deletions
diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py
index 26c35ce..d0a02e9 100644
--- a/contrib/celerycrocoite.py
+++ b/contrib/celerycrocoite.py
@@ -88,10 +88,9 @@ def checkCompletedJobs (bot, jobs):
if Identifier (channel) not in bot.channels:
continue
try:
- result = handle.get (timeout=0.1)
- stats = result['stats']
- bot.msg (channel, '{}: {} ({}) finished. {} requests, {} failed, {} received.'.format (user, url,
- handle.id, stats['requests'], stats['failed'],
+ stats = handle.get (timeout=0.1)
+ bot.msg (channel, '{}: {} ({}) finished. {} crashed, {} requests, {} failed, {} received.'.format (user, url,
+ handle.id, stats['crashed'], stats['requests'], stats['failed'],
prettyBytes (stats['bytesRcv'])))
delete.add (handle.id)
except celery.exceptions.TimeoutError:
@@ -198,7 +197,7 @@ def archive (bot, trigger):
logBuffer=defaultSettings.logBuffer, idleTimeout=args.idleTimeout,
timeout=args.timeout)
args = dict (url=args.url,
- enabledBehaviorNames=list (behavior.availableNames-blacklistedBehavior),
+ enabledBehaviorNames=list (set (behavior.availableMap.keys())-blacklistedBehavior),
settings=settings, recursive=args.recursive,
concurrency=args.concurrency)
q = bot.memory['crocoite']['q']
diff --git a/crocoite/behavior.py b/crocoite/behavior.py
index 95e8160..fd8fff8 100644
--- a/crocoite/behavior.py
+++ b/crocoite/behavior.py
@@ -22,21 +22,44 @@
Generic and per-site behavior scripts
"""
-import logging
+import logging, time
from io import BytesIO
from urllib.parse import urlsplit
import os.path
import pkg_resources
from base64 import b64decode
+from collections import OrderedDict
+
+from html5lib.serializer import HTMLSerializer
+from warcio.statusandheaders import StatusAndHeaders
+from pychrome.exceptions import TimeoutException
from .util import randomString, packageUrl, getFormattedViewportMetrics
from . import html
from .html import StripAttributeFilter, StripTagFilter, ChromeTreeWalker
-from html5lib.serializer import HTMLSerializer
-from warcio.statusandheaders import StatusAndHeaders
+from .browser import SiteLoader
logger = logging.getLogger(__name__)
+class Script:
+ """ A JavaScript resource """
+ def __init__ (self, path=None, encoding='utf-8'):
+ self.path = path
+ if path:
+ self.data = pkg_resources.resource_string (__name__, os.path.join ('data', path)).decode (encoding)
+
+ def __repr__ (self):
+ return '<Script {}>'.format (self.path)
+
+ def __str__ (self):
+ return self.data
+
+ @classmethod
+ def fromStr (cls, data):
+ s = Script ()
+ s.data = data
+ return s
+
class Behavior:
# unique behavior name
name = None
@@ -51,27 +74,20 @@ class Behavior:
"""
return True
- def loadScript (self, path, encoding='utf-8'):
- return pkg_resources.resource_string (__name__, os.path.join ('data', path)).decode (encoding)
-
- def useScript (self, script, encoding='utf-8'):
- writer = self.loader.writer
- record = writer.create_warc_record (packageUrl ('script'), 'metadata',
- payload=BytesIO (script.encode (encoding)),
- warc_headers_dict={'Content-Type': 'application/javascript; charset={}'.format (encoding)})
- writer.write_record (record)
+ def __repr__ (self):
+ return '<Behavior {}>'.format (self.name)
def onload (self):
""" Before loading the page """
- pass
+ yield from ()
def onstop (self):
""" Before page loading is stopped """
- pass
+ yield from ()
def onfinish (self):
""" After the site has stopped loading """
- pass
+ yield from ()
class HostnameFilter:
""" Limit behavior script to hostname """
@@ -90,15 +106,16 @@ class JsOnload (Behavior):
def __init__ (self, loader):
super ().__init__ (loader)
- self.script = self.loadScript (self.scriptPath)
+ self.script = Script (self.scriptPath)
self.scriptHandle = None
def onload (self):
- self.useScript (self.script)
- self.scriptHandle = self.loader.tab.Page.addScriptToEvaluateOnNewDocument (source=self.script)['identifier']
+ yield self.script
+ self.scriptHandle = self.loader.tab.Page.addScriptToEvaluateOnNewDocument (source=str (self.script))['identifier']
def onstop (self):
self.loader.tab.Page.removeScriptToEvaluateOnNewDocument (identifier=self.scriptHandle)
+ yield from ()
### Generic scripts ###
@@ -110,15 +127,15 @@ class Scroll (JsOnload):
super ().__init__ (loader)
stopVarname = '__' + __package__ + '_stop__'
newStopVarname = randomString ()
- self.script = self.script.replace (stopVarname, newStopVarname)
+ self.script.data = self.script.data.replace (stopVarname, newStopVarname)
self.stopVarname = newStopVarname
def onstop (self):
super ().onstop ()
# removing the script does not stop it if running
- script = '{} = true; window.scrollTo (0, 0);'.format (self.stopVarname)
- self.useScript (script)
- self.loader.tab.Runtime.evaluate (expression=script, returnByValue=True)
+ script = Script.fromStr ('{} = true; window.scrollTo (0, 0);'.format (self.stopVarname))
+ yield script
+ self.loader.tab.Runtime.evaluate (expression=str (script), returnByValue=True)
class EmulateScreenMetrics (Behavior):
name = 'emulateScreenMetrics'
@@ -147,9 +164,16 @@ class EmulateScreenMetrics (Behavior):
for s in sizes:
tab.Emulation.setDeviceMetricsOverride (**s)
# give the browser time to re-eval page and start requests
- l.wait (1)
+ time.sleep (1)
# XXX: this seems to be broken, it does not clear the override
#tab.Emulation.clearDeviceMetricsOverride ()
+ yield from ()
+
+class DomSnapshotEvent:
+ def __init__ (self, url, document, viewport):
+ self.url = url
+ self.document = document
+ self.viewport = viewport
class DomSnapshot (Behavior):
"""
@@ -166,14 +190,13 @@ class DomSnapshot (Behavior):
def __init__ (self, loader):
super ().__init__ (loader)
- self.script = self.loadScript ('canvas-snapshot.js')
+ self.script = Script ('canvas-snapshot.js')
def onfinish (self):
tab = self.loader.tab
- writer = self.loader.writer
- self.useScript (self.script)
- tab.Runtime.evaluate (expression=self.script, returnByValue=True)
+ yield self.script
+ tab.Runtime.evaluate (expression=str (self.script), returnByValue=True)
viewport = getFormattedViewportMetrics (tab)
dom = tab.DOM.getDocument (depth=-1, pierce=True)
@@ -196,13 +219,12 @@ class DomSnapshot (Behavior):
disallowedAttributes = html.eventAttributes
stream = StripAttributeFilter (StripTagFilter (walker, disallowedTags), disallowedAttributes)
serializer = HTMLSerializer ()
- httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')
- record = writer.create_warc_record (doc['documentURL'], 'response',
- payload=BytesIO (serializer.render (stream, 'utf-8')),
- http_headers=httpHeaders,
- warc_headers_dict={'X-DOM-Snapshot': str (True),
- 'X-Chrome-Viewport': viewport})
- writer.write_record (record)
+ yield DomSnapshotEvent (doc['documentURL'], serializer.render (stream, 'utf-8'), viewport)
+
+class ScreenshotEvent:
+ def __init__ (self, yoff, data):
+ self.yoff = yoff
+ self.data = data
class Screenshot (Behavior):
"""
@@ -213,7 +235,6 @@ class Screenshot (Behavior):
def onfinish (self):
tab = self.loader.tab
- writer = self.loader.writer
# see https://github.com/GoogleChrome/puppeteer/blob/230be28b067b521f0577206899db01f0ca7fc0d2/examples/screenshots-longpage.js
# Hardcoded max texture size of 16,384 (crbug.com/770769)
@@ -227,10 +248,7 @@ class Screenshot (Behavior):
height = min (contentSize['height'] - yoff, maxDim)
clip = {'x': 0, 'y': yoff, 'width': width, 'height': height, 'scale': 1}
data = b64decode (tab.Page.captureScreenshot (format='png', clip=clip)['data'])
- url = packageUrl ('screenshot-{}-{}.png'.format (0, yoff))
- record = writer.create_warc_record (url, 'resource',
- payload=BytesIO (data), warc_headers_dict={'Content-Type': 'image/png'})
- writer.write_record (record)
+ yield ScreenshotEvent (yoff, data)
class Click (JsOnload):
""" Generic link clicking """
@@ -238,6 +256,10 @@ class Click (JsOnload):
name = 'click'
scriptPath = 'click.js'
+class ExtractLinksEvent:
+ def __init__ (self, links):
+ self.links = links
+
class ExtractLinks (Behavior):
"""
Extract links from a page using JavaScript
@@ -250,19 +272,33 @@ class ExtractLinks (Behavior):
def __init__ (self, loader):
super ().__init__ (loader)
- self.script = self.loadScript ('extract-links.js')
+ self.script = Script ('extract-links.js')
self.links = None
def onfinish (self):
tab = self.loader.tab
- self.useScript (self.script)
- result = tab.Runtime.evaluate (expression=self.script, returnByValue=True)
- self.links = list (set (result['result']['value']))
+ yield self.script
+ result = tab.Runtime.evaluate (expression=str (self.script), returnByValue=True)
+ yield ExtractLinksEvent (list (set (result['result']['value'])))
+
+class Crash (Behavior):
+ """ Crash the browser. For testing only. Obviously. """
+
+ name = 'crash'
+
+ def onstop (self):
+ try:
+ self.loader.tab.Page.crash (_timeout=1)
+ except TimeoutException:
+ pass
+ yield from ()
# available behavior scripts. Order matters, move those modifying the page
# towards the end of available
generic = [Scroll, EmulateScreenMetrics, Click, ExtractLinks]
perSite = []
available = generic + perSite + [Screenshot, DomSnapshot]
-availableNames = set (map (lambda x: x.name, available))
+#available.append (Crash)
+# order matters, since behavior can modify the page (dom snapshots, for instance)
+availableMap = OrderedDict (map (lambda x: (x.name, x), available))
diff --git a/crocoite/browser.py b/crocoite/browser.py
index a891ce7..57d0dd0 100644
--- a/crocoite/browser.py
+++ b/crocoite/browser.py
@@ -25,6 +25,10 @@ Chrome browser interactions.
import logging
from urllib.parse import urlsplit
from base64 import b64decode
+from http.server import BaseHTTPRequestHandler
+from collections import deque
+from threading import Event
+
import pychrome
class Item:
@@ -37,6 +41,8 @@ class Item:
self.chromeRequest = None
self.chromeResponse = None
self.chromeFinished = None
+ self.isRedirect = False
+ self.failed = False
def __repr__ (self):
return '<Item {}>'.format (self.request['url'])
@@ -47,6 +53,7 @@ class Item:
@property
def response (self):
+ assert not self.failed, "you must not access response if failed is set"
return self.chromeResponse['response']
@property
@@ -73,7 +80,7 @@ class Item:
def body (self):
""" Return response body or None """
try:
- body = self.tab.Network.getResponseBody (requestId=self.id, _timeout=60)
+ body = self.tab.Network.getResponseBody (requestId=self.id, _timeout=10)
rawBody = body['body']
base64Encoded = body['base64Encoded']
if base64Encoded:
@@ -93,7 +100,7 @@ class Item:
return postData.encode ('utf8'), False
elif req.get ('hasPostData', False):
try:
- return b64decode (self.tab.Network.getRequestPostData (requestId=self.id, _timeout=60)['postData']), True
+ return b64decode (self.tab.Network.getRequestPostData (requestId=self.id, _timeout=10)['postData']), True
except (pychrome.exceptions.CallMethodException, pychrome.exceptions.TimeoutException):
raise ValueError ('Cannot fetch request body')
return None, False
@@ -108,6 +115,16 @@ class Item:
def responseHeaders (self):
return self._unfoldHeaders (self.response['headers'])
+ @property
+ def statusText (self):
+ text = self.response.get ('statusText')
+ if text:
+ return text
+ text = BaseHTTPRequestHandler.responses.get (self.response['status'])
+ if text:
+ return text[0]
+ return 'No status text available'
+
@staticmethod
def _unfoldHeaders (headers):
"""
@@ -129,10 +146,21 @@ class Item:
def setFinished (self, finished):
self.chromeFinished = finished
+class BrowserCrashed (Exception):
+ pass
+
class SiteLoader:
"""
Load site in Chrome and monitor network requests
+ Chrome’s raw devtools events are preprocessed here (asynchronously, in a
+ different thread, spawned by pychrome) and put into a deque. There
+ are two reasons for this: First of all, it makes consumer exception
+ handling alot easier (no need to propagate them to the main thread). And
+ secondly, browser crashes must be handled before everything else, as they
+ result in a loss of communication with the browser itself (i.e. we can’t
+ fetch a resource’s body any more).
+
XXX: track popup windows/new tabs and close them
"""
@@ -140,22 +168,22 @@ class SiteLoader:
def __init__ (self, browser, url, logger=logging.getLogger(__name__)):
self.requests = {}
- self.browser = browser
+ self.browser = pychrome.Browser (url=browser)
self.url = url
self.logger = logger
-
- self.tab = browser.new_tab()
+ self.queue = deque ()
+ self.notify = Event ()
def __enter__ (self):
- tab = self.tab
+ tab = self.tab = self.browser.new_tab()
# setup callbacks
tab.Network.requestWillBeSent = self._requestWillBeSent
tab.Network.responseReceived = self._responseReceived
tab.Network.loadingFinished = self._loadingFinished
tab.Network.loadingFailed = self._loadingFailed
tab.Log.entryAdded = self._entryAdded
- #tab.Page.loadEventFired = loadEventFired
tab.Page.javascriptDialogOpening = self._javascriptDialogOpening
+ tab.Inspector.targetCrashed = self._targetCrashed
# start the tab
tab.start()
@@ -164,66 +192,37 @@ class SiteLoader:
tab.Log.enable ()
tab.Network.enable()
tab.Page.enable ()
+ tab.Inspector.enable ()
tab.Network.clearBrowserCache ()
if tab.Network.canClearBrowserCookies ()['result']:
tab.Network.clearBrowserCookies ()
return self
+ def __exit__ (self, exc_type, exc_value, traceback):
+ self.tab.Page.stopLoading ()
+ self.tab.stop ()
+ self.browser.close_tab(self.tab)
+ return False
+
def __len__ (self):
return len (self.requests)
+ def __iter__ (self):
+ return iter (self.queue)
+
def start (self):
self.tab.Page.navigate(url=self.url)
- def wait (self, timeout=1):
- self.tab.wait (timeout)
-
- def waitIdle (self, idleTimeout=1, maxTimeout=60):
- step = 0
- for i in range (0, maxTimeout):
- self.wait (1)
- if len (self) == 0:
- step += 1
- if step > idleTimeout:
- break
- else:
- step = 0
-
- def stop (self):
- """
- Stop loading site
-
- XXX: stop executing scripts
- """
+ # use event to signal presence of new items. This way the controller
+ # can wait for them without polling.
+ def _append (self, item):
+ self.queue.append (item)
+ self.notify.set ()
- tab = self.tab
-
- tab.Page.stopLoading ()
- tab.Network.disable ()
- tab.Page.disable ()
- tab.Log.disable ()
- # XXX: we can’t drain the event queue directly, so insert (yet another) wait
- tab.wait (1)
- tab.Network.requestWillBeSent = None
- tab.Network.responseReceived = None
- tab.Network.loadingFinished = None
- tab.Network.loadingFailed = None
- tab.Page.loadEventFired = None
- tab.Page.javascriptDialogOpening = None
- tab.Log.entryAdded = None
-
- def __exit__ (self, exc_type, exc_value, traceback):
- self.tab.stop ()
- self.browser.close_tab(self.tab)
- return False
-
- # overrideable callbacks
- def loadingFinished (self, item, redirect=False):
- pass
-
- def loadingFailed (self, item):
- pass
+ def _appendleft (self, item):
+ self.queue.appendleft (item)
+ self.notify.set ()
# internal chrome callbacks
def _requestWillBeSent (self, **kwargs):
@@ -244,8 +243,9 @@ class SiteLoader:
item.setResponse (resp)
resp = {'requestId': reqId, 'encodedDataLength': 0, 'timestamp': kwargs['timestamp']}
item.setFinished (resp)
- self.loadingFinished (item, redirect=True)
+ item.isRedirect = True
self.logger.info ('redirected request {} has url {}'.format (reqId, req['url']))
+ self._append (item)
else:
self.logger.warning ('request {} already exists, overwriting.'.format (reqId))
@@ -284,13 +284,14 @@ class SiteLoader:
if url.scheme in self.allowedSchemes:
self.logger.info ('finished {} {}'.format (reqId, req['url']))
item.setFinished (kwargs)
- self.loadingFinished (item)
+ self._append (item)
def _loadingFailed (self, **kwargs):
reqId = kwargs['requestId']
self.logger.warning ('failed {} {}'.format (reqId, kwargs['errorText'], kwargs.get ('blockedReason')))
item = self.requests.pop (reqId, None)
- self.loadingFailed (item)
+ item.failed = True
+ self._append (item)
def _entryAdded (self, **kwargs):
""" Log entry added """
@@ -312,31 +313,10 @@ class SiteLoader:
else:
self.logger.warning ('unknown javascript dialog type {}'.format (t))
-class AccountingSiteLoader (SiteLoader):
- """
- SiteLoader that keeps basic statistics about retrieved pages.
- """
-
- def __init__ (self, browser, url, logger=logging.getLogger(__name__)):
- super ().__init__ (browser, url, logger)
-
- self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0}
-
- def loadingFinished (self, item, redirect=False):
- super ().loadingFinished (item, redirect)
-
- self.stats['finished'] += 1
- self.stats['bytesRcv'] += item.encodedDataLength
-
- def loadingFailed (self, item):
- super ().loadingFailed (item)
-
- self.stats['failed'] += 1
-
- def _requestWillBeSent (self, **kwargs):
- super ()._requestWillBeSent (**kwargs)
-
- self.stats['requests'] += 1
+ def _targetCrashed (self, **kwargs):
+ self.logger.error ('browser crashed')
+ # priority message
+ self._appendleft (BrowserCrashed ())
import subprocess, os, time
from tempfile import mkdtemp
@@ -357,7 +337,6 @@ class ChromeService:
def __enter__ (self):
assert self.p is None
self.userDataDir = mkdtemp ()
- print (self.userDataDir)
args = [self.binary,
'--window-size={},{}'.format (*self.windowSize),
'--user-data-dir={}'.format (self.userDataDir), # use temporory user dir
@@ -413,69 +392,65 @@ class NullService:
### tests ###
import unittest, time
-from http.server import BaseHTTPRequestHandler
+from operator import itemgetter
-class TestHTTPRequestHandler (BaseHTTPRequestHandler):
- encodingTestString = {
- 'latin1': 'äöü',
- 'utf-8': 'äöü',
- 'ISO-8859-1': 'äöü',
- }
- binaryTestData = b'\x00\x01\x02'
- # 1×1 pixel PNG
- imageTestData = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'
- htmlTestData = '<html><body><img src="/image"><img src="/nonexistent"></body></html>'
- alertData = '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><img src="/image"></body></html>'
+class TestItem (Item):
+ """ This should be as close to Item as possible """
+
+ base = 'http://localhost:8000/'
+
+ def __init__ (self, path, status, headers, bodyReceive, bodySend=None):
+ super ().__init__ (tab=None)
+ self.chromeResponse = {'response': {'headers': headers, 'status': status, 'url': self.base + path}}
+ self._body = bodyReceive, False
+ self.bodySend = bodyReceive if not bodySend else bodySend
+ @property
+ def body (self):
+ return self._body
+
+testItems = [
+ TestItem ('binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00\x01\x02'),
+ TestItem ('attachment', 200,
+ {'Content-Type': 'text/plain; charset=utf-8',
+ 'Content-Disposition': 'attachment; filename="attachment.txt"',
+ },
+ 'This is a simple text file with umlauts. ÄÖU.'.encode ('utf8')),
+ TestItem ('encoding/utf8', 200, {'Content-Type': 'text/plain; charset=utf-8'},
+ 'This is a test, äöü μνψκ ¥¥¥¿ýý¡'.encode ('utf8')),
+ TestItem ('encoding/iso88591', 200, {'Content-Type': 'text/plain; charset=ISO-8859-1'},
+ 'This is a test, äöü.'.encode ('utf8'),
+ 'This is a test, äöü.'.encode ('ISO-8859-1')),
+ TestItem ('encoding/latin1', 200, {'Content-Type': 'text/plain; charset=latin1'},
+ 'This is a test, äöü.'.encode ('utf8'),
+ 'This is a test, äöü.'.encode ('latin1')),
+ TestItem ('image', 200, {'Content-Type': 'image/png'},
+ # 1×1 png image
+ b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'),
+ TestItem ('empty', 200, {}, b''),
+ TestItem ('redirect/301/empty', 301, {'Location': '/empty'}, b''),
+ TestItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b''),
+ TestItem ('nonexistent', 404, {}, b''),
+ TestItem ('html', 200, {'Content-Type': 'html'},
+ '<html><body><img src="/image"><img src="/nonexistent"></body></html>'.encode ('utf8')),
+ TestItem ('html/alert', 200, {'Content-Type': 'html'},
+ '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><img src="/image"></body></html>'.encode ('utf8')),
+ ]
+testItemMap = dict ([(item.parsedUrl.path, item) for item in testItems])
+
+class TestHTTPRequestHandler (BaseHTTPRequestHandler):
def do_GET(self):
- path = self.path
- if path.startswith ('/redirect/301'):
- self.send_response(301)
- self.send_header ('Location', path[13:])
+ item = testItemMap.get (self.path)
+ if item:
+ self.send_response (item.response['status'])
+ for k, v in item.response['headers'].items ():
+ self.send_header (k, v)
+ body = item.bodySend
+ self.send_header ('Content-Length', len (body))
self.end_headers()
- elif path == '/empty':
- self.send_response (200)
- self.end_headers ()
- elif path.startswith ('/encoding'):
- # send text data with different encodings
- _, _, encoding = path.split ('/', 3)
- self.send_response (200)
- self.send_header ('Content-Type', 'text/plain; charset={}'.format (encoding))
- self.end_headers ()
- self.wfile.write (self.encodingTestString[encoding].encode (encoding))
- elif path == '/binary':
- # send binary data
- self.send_response (200)
- self.send_header ('Content-Type', 'application/octet-stream')
- self.send_header ('Content-Length', len (self.binaryTestData))
- self.end_headers ()
- self.wfile.write (self.binaryTestData)
- elif path == '/image':
- # send binary data
- self.send_response (200)
- self.send_header ('Content-Type', 'image/png')
- self.end_headers ()
- self.wfile.write (self.imageTestData)
- elif path == '/attachment':
- self.send_response (200)
- self.send_header ('Content-Type', 'text/plain; charset=utf-8')
- self.send_header ('Content-Disposition', 'attachment; filename="attachment.txt"')
- self.end_headers ()
- self.wfile.write (self.encodingTestString['utf-8'].encode ('utf-8'))
- elif path == '/html':
- self.send_response (200)
- self.send_header ('Content-Type', 'text/html; charset=utf-8')
- self.end_headers ()
- self.wfile.write (self.htmlTestData.encode ('utf-8'))
- elif path == '/alert':
- self.send_response (200)
- self.send_header ('Content-Type', 'text/html; charset=utf-8')
- self.end_headers ()
- self.wfile.write (self.alertData.encode ('utf-8'))
- else:
- self.send_response (404)
- self.end_headers ()
-
+ self.wfile.write (body)
+ return
+
def log_message (self, format, *args):
pass
@@ -485,144 +460,82 @@ def startServer ():
httpd = http.server.HTTPServer (("localhost", PORT), TestHTTPRequestHandler)
httpd.serve_forever()
-class TestSiteLoaderAdapter (SiteLoader):
- def __init__ (self, browser, url):
- SiteLoader.__init__ (self, browser, url)
- self.finished = []
-
- def loadingFinished (self, item, redirect=False):
- self.finished.append (item)
-
class TestSiteLoader (unittest.TestCase):
def setUp (self):
from multiprocessing import Process
self.server = Process (target=startServer)
self.server.start ()
- self.baseurl = 'http://localhost:8000/'
+ self.baseurl = 'http://localhost:8000'
self.service = ChromeService ()
- browserUrl = self.service.__enter__ ()
- self.browser = pychrome.Browser(url=browserUrl)
+ self.browser = self.service.__enter__ ()
def buildAdapter (self, path):
- return TestSiteLoaderAdapter (self.browser, '{}{}'.format (self.baseurl, path))
-
- def assertUrls (self, l, expect):
- urls = set (map (lambda x: x.parsedUrl.path, l.finished))
- expect = set (expect)
- self.assertEqual (urls, expect)
-
- def test_wait (self):
- waittime = 2
- with self.buildAdapter ('empty') as l:
+ self.assertTrue (path.startswith ('/'))
+ return SiteLoader (self.browser, '{}{}'.format (self.baseurl, path))
+
+ def assertItems (self, l, items):
+ items = dict ([(i.parsedUrl.path, i) for i in items])
+ timeout = 5
+ while True:
+ if not l.notify.wait (timeout) and len (items) > 0:
+ self.fail ('timeout')
+ if len (l.queue) > 0:
+ item = l.queue.popleft ()
+ if isinstance (item, Exception):
+ raise item
+ self.assertIsNot (item.chromeResponse, None, msg='url={}'.format (item.request['url']))
+ golden = items.pop (item.parsedUrl.path)
+ if not golden:
+ self.fail ('url {} not supposed to be fetched'.format (item.url))
+ self.assertEqual (item.body[0], golden.body[0], msg='body for url={}'.format (item.request['url']))
+ self.assertEqual (item.response['status'], golden.response['status'])
+ for k, v in golden.responseHeaders:
+ actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders)))
+ self.assertIn (v, actual)
+
+ # check queue at least once
+ if not items:
+ break
+
+ def assertLiteralItem (self, item, deps=[]):
+ with self.buildAdapter (item.parsedUrl.path) as l:
l.start ()
- before = time.time ()
- l.wait (waittime)
- after = time.time ()
- self.assertTrue ((after-before) >= waittime)
+ self.assertItems (l, [item] + deps)
def test_empty (self):
- with self.buildAdapter ('empty') as l:
- l.start ()
- l.waitIdle ()
- self.assertEqual (len (l.finished), 1)
+ self.assertLiteralItem (testItemMap['/empty'])
- def test_redirect301 (self):
- with self.buildAdapter ('redirect/301/empty') as l:
- l.start ()
- l.waitIdle ()
- self.assertEqual (len (l.finished), 2)
- self.assertUrls (l, ['/redirect/301/empty', '/empty'])
- for item in l.finished:
- if item.parsedUrl.path == '/empty':
- self.assertEqual (item.response['status'], 200)
- self.assertEqual (item.body[0], b'')
- elif item.parsedUrl.path == '/redirect/301/empty':
- self.assertEqual (item.response['status'], 301)
- else:
- self.fail ('unknown url')
-
- def test_redirect301multi (self):
- with self.buildAdapter ('redirect/301/redirect/301/empty') as l:
- l.start ()
- l.waitIdle ()
- self.assertEqual (len (l.finished), 3)
- self.assertUrls (l, ['/redirect/301/redirect/301/empty', '/redirect/301/empty', '/empty'])
- for item in l.finished:
- if item.parsedUrl.path == '/empty':
- self.assertEqual (item.response['status'], 200)
- self.assertEqual (item.body[0], b'')
- elif item.parsedUrl.path in {'/redirect/301/empty', \
- '/redirect/301/redirect/301/empty'}:
- self.assertEqual (item.response['status'], 301)
- else:
- self.fail ('unknown url')
+ def test_redirect (self):
+ self.assertLiteralItem (testItemMap['/redirect/301/empty'], [testItemMap['/empty']])
+ # chained redirects
+ self.assertLiteralItem (testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']])
def test_encoding (self):
""" Text responses are transformed to UTF-8. Make sure this works
correctly. """
- for encoding, expected in TestHTTPRequestHandler.encodingTestString.items ():
- with self.buildAdapter ('encoding/{}'.format (encoding)) as l:
- l.start ()
- l.waitIdle ()
- self.assertEqual (len (l.finished), 1)
- self.assertUrls (l, ['/encoding/{}'.format (encoding)])
- self.assertEqual (l.finished[0].body[0], expected.encode ('utf8'))
+ for item in {testItemMap['/encoding/utf8'], testItemMap['/encoding/latin1'], testItemMap['/encoding/iso88591']}:
+ self.assertLiteralItem (item)
def test_binary (self):
""" Browser should ignore content it cannot display (i.e. octet-stream) """
- with self.buildAdapter ('binary') as l:
+ with self.buildAdapter ('/binary') as l:
l.start ()
- l.waitIdle ()
- self.assertEqual (len (l.finished), 0)
+ self.assertItems (l, [])
def test_image (self):
""" Images should be displayed inline """
- with self.buildAdapter ('image') as l:
- l.start ()
- l.waitIdle ()
- self.assertEqual (len (l.finished), 1)
- self.assertUrls (l, ['/image'])
- self.assertEqual (l.finished[0].body[0], TestHTTPRequestHandler.imageTestData)
+ self.assertLiteralItem (testItemMap['/image'])
def test_attachment (self):
- """ And downloads won’t work in headless mode """
- with self.buildAdapter ('attachment') as l:
+ """ And downloads won’t work in headless mode, even if it’s just a text file """
+ with self.buildAdapter ('/attachment') as l:
l.start ()
- l.waitIdle ()
- self.assertEqual (len (l.finished), 0)
+ self.assertItems (l, [])
def test_html (self):
- with self.buildAdapter ('html') as l:
- l.start ()
- l.waitIdle ()
- self.assertEqual (len (l.finished), 3)
- self.assertUrls (l, ['/html', '/image', '/nonexistent'])
- for item in l.finished:
- if item.parsedUrl.path == '/html':
- self.assertEqual (item.response['status'], 200)
- self.assertEqual (item.body[0], TestHTTPRequestHandler.htmlTestData.encode ('utf-8'))
- elif item.parsedUrl.path == '/image':
- self.assertEqual (item.response['status'], 200)
- self.assertEqual (item.body[0], TestHTTPRequestHandler.imageTestData)
- elif item.parsedUrl.path == '/nonexistent':
- self.assertEqual (item.response['status'], 404)
- else:
- self.fail ('unknown url')
-
- def test_alert (self):
- with self.buildAdapter ('alert') as l:
- l.start ()
- l.waitIdle ()
- self.assertUrls (l, ['/alert', '/image'])
- for item in l.finished:
- if item.parsedUrl.path == '/alert':
- self.assertEqual (item.response['status'], 200)
- self.assertEqual (item.body[0], TestHTTPRequestHandler.alertData.encode ('utf-8'))
- elif item.parsedUrl.path == '/image':
- self.assertEqual (item.response['status'], 200)
- self.assertEqual (item.body[0], TestHTTPRequestHandler.imageTestData)
- else:
- self.fail ('unknown url')
+ self.assertLiteralItem (testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']])
+ # make sure alerts are dismissed correctly (image won’t load otherwise)
+ self.assertLiteralItem (testItemMap['/html/alert'], [testItemMap['/image']])
def tearDown (self):
self.service.__exit__ (None, None, None)
diff --git a/crocoite/cli.py b/crocoite/cli.py
index f6454da..d631f10 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -26,8 +26,9 @@ import logging, argparse, json, sys
from . import behavior
from .controller import RecursiveController, defaultSettings, \
- ControllerSettings, DepthLimit, PrefixLimit
+ ControllerSettings, DepthLimit, PrefixLimit, StatsHandler
from .browser import NullService, ChromeService
+from .warc import WarcHandler
def parseRecursive (recursive, url):
if recursive is None:
@@ -41,6 +42,7 @@ def parseRecursive (recursive, url):
def main ():
parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
+ parser.add_argument('--debug', help='Enable debug messages', action='store_true')
parser.add_argument('--browser', help='DevTools URL', metavar='URL')
parser.add_argument('--recursive', help='Follow links recursively')
parser.add_argument('--concurrency', '-j', type=int, default=1)
@@ -50,8 +52,8 @@ def main ():
parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES')
parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts',
dest='enabledBehaviorNames',
- default=list (behavior.availableNames),
- choices=list (behavior.availableNames))
+ default=list (behavior.availableMap.keys ()),
+ choices=list (behavior.availableMap.keys ()))
group = parser.add_mutually_exclusive_group (required=True)
group.add_argument('--output', help='WARC filename', metavar='FILE')
group.add_argument('--distributed', help='Use celery worker', action='store_true')
@@ -71,7 +73,8 @@ def main ():
recursive=args.recursive, concurrency=args.concurrency)
r = result.get ()
else:
- logging.basicConfig (level=logging.INFO)
+ level = logging.DEBUG if args.debug else logging.INFO
+ logging.basicConfig (level=level)
try:
recursionPolicy = parseRecursive (args.recursive, args.url)
@@ -84,9 +87,13 @@ def main ():
logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,
timeout=args.timeout)
with open (args.output, 'wb') as fd:
+ handler = [StatsHandler (), WarcHandler (fd)]
+ b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames))
controller = RecursiveController (args.url, fd, settings=settings,
- recursionPolicy=recursionPolicy, service=service)
- r = controller.run ()
+ recursionPolicy=recursionPolicy, service=service,
+ handler=handler, behavior=b)
+ controller.run ()
+ r = handler[0].stats
json.dump (r, sys.stdout)
return True
diff --git a/crocoite/controller.py b/crocoite/controller.py
index bc6f948..cdae268 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -35,99 +35,151 @@ class ControllerSettings:
defaultSettings = ControllerSettings ()
-import logging
-from urllib.parse import urlsplit, urlunsplit
+class EventHandler:
+ """ Abstract base class for event handler """
+
+ # this handler wants to know about exceptions before they are reraised by
+ # the controller
+ acceptException = False
+
+ def push (self, item):
+ raise NotImplementedError ()
+
+from .browser import BrowserCrashed
+
+class StatsHandler (EventHandler):
+ acceptException = True
-import pychrome
+ def __init__ (self):
+ self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0}
+
+ def push (self, item):
+ if isinstance (item, Item):
+ self.stats['requests'] += 1
+ if item.failed:
+ self.stats['failed'] += 1
+ else:
+ self.stats['finished'] += 1
+ self.stats['bytesRcv'] += item.encodedDataLength
+ elif isinstance (item, BrowserCrashed):
+ self.stats['crashed'] += 1
+
+import logging, time
+from urllib.parse import urlsplit, urlunsplit
from . import behavior as cbehavior
-from .browser import ChromeService
-from .warc import WarcLoader, SerializingWARCWriter
+from .browser import ChromeService, SiteLoader, Item
from .util import getFormattedViewportMetrics
-def firstOrNone (it):
- """ Return first item of iterator it or None if empty """
- try:
- return next (it)
- except StopIteration:
- return None
+class ControllerStart:
+ def __init__ (self, payload):
+ self.payload = payload
class SinglePageController:
"""
Archive a single page url to file output.
+
+ Dispatches between producer (site loader and behavior scripts) and consumer
+ (stats, warc writer).
"""
def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \
- logger=logging.getLogger(__name__), settings=defaultSettings):
+ logger=logging.getLogger(__name__), settings=defaultSettings, handler=[]):
self.url = url
self.output = output
self.service = service
self.behavior = behavior
self.settings = settings
self.logger = logger
+ self.handler = handler
+
+ def processItem (self, item):
+ if isinstance (item, Exception):
+ for h in self.handler:
+ if h.acceptException:
+ h.push (item)
+ raise item
+
+ for h in self.handler:
+ h.push (item)
def run (self):
- ret = {'stats': None, 'links': []}
-
- with self.service as browser:
- browser = pychrome.Browser (url=browser)
- writer = SerializingWARCWriter (self.output, gzip=True)
-
- with WarcLoader (browser, self.url, writer,
- logBuffer=self.settings.logBuffer,
- maxBodySize=self.settings.maxBodySize) as l:
- version = l.tab.Browser.getVersion ()
- payload = {
- 'software': __package__,
- 'browser': version['product'],
- 'useragent': version['userAgent'],
- 'viewport': getFormattedViewportMetrics (l.tab),
- }
- warcinfo = writer.create_warcinfo_record (filename=None, info=payload)
- writer.write_record (warcinfo)
-
- # not all behavior scripts are allowed for every URL, filter them
- enabledBehavior = list (filter (lambda x: self.url in x,
- map (lambda x: x (l), self.behavior)))
- linksBehavior = firstOrNone (filter (lambda x: isinstance (x, cbehavior.ExtractLinks),
- enabledBehavior))
-
- for b in enabledBehavior:
- self.logger.debug ('starting onload behavior {}'.format (b.name))
- b.onload ()
- l.start ()
-
- l.waitIdle (self.settings.idleTimeout, self.settings.timeout)
-
- for b in enabledBehavior:
- self.logger.debug ('starting onstop behavior {}'.format (b.name))
- b.onstop ()
-
- # if we stopped due to timeout, wait for remaining assets
- l.waitIdle (2, 60)
- l.stop ()
-
- for b in enabledBehavior:
- self.logger.debug ('starting onfinish behavior {}'.format (b.name))
- b.onfinish ()
-
- ret['stats'] = l.stats
- ret['links'] = linksBehavior.links if linksBehavior else None
- writer.flush ()
- return ret
-
-from collections import UserDict
-
-class IntegerDict (UserDict):
- """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """
- def __add__ (self, b):
- newdict = self.__class__ (self)
- for k, v in b.items ():
- if k in self:
- newdict[k] += v
- else:
- newdict[k] = v
- return newdict
+ def processQueue ():
+ # XXX: this is very ugly code and does not work well. figure out a
+ # better way to impose timeouts and still process all items in the
+ # queue
+ queue = l.queue
+ self.logger.debug ('processing at least {} queue items'.format (len (queue)))
+ while True:
+ now = time.time ()
+ elapsed = now-start
+ maxTimeout = max (min (self.settings.idleTimeout, self.settings.timeout-elapsed), 0)
+ self.logger.debug ('max timeout is {} with elapsed {}'.format (maxTimeout, elapsed))
+ # skip waiting if there is work to do. processes all items in
+ # queue, regardless of timeouts, i.e. you need to make sure the
+ # queue will actually be empty at some point.
+ if len (queue) == 0:
+ if not l.notify.wait (maxTimeout):
+ assert len (queue) == 0, "event must be sent"
+ # timed out
+ self.logger.debug ('timed out after {}'.format (elapsed))
+ break
+ else:
+ l.notify.clear ()
+ # limit number of items processed here, otherwise timeout won’t
+ # be checked frequently. this can happen if the site quickly
+ # loads a lot of items.
+ for i in range (1000):
+ try:
+ item = queue.popleft ()
+ self.logger.debug ('queue pop: {!r}, len now {}'.format (item, len (queue)))
+ except IndexError:
+ break
+ self.processItem (item)
+ if maxTimeout == 0:
+ break
+
+ with self.service as browser, SiteLoader (browser, self.url, logger=self.logger) as l:
+ start = time.time ()
+
+ version = l.tab.Browser.getVersion ()
+ payload = {
+ 'software': __package__,
+ 'browser': version['product'],
+ 'useragent': version['userAgent'],
+ 'viewport': getFormattedViewportMetrics (l.tab),
+ }
+ self.processItem (ControllerStart (payload))
+
+ # not all behavior scripts are allowed for every URL, filter them
+ enabledBehavior = list (filter (lambda x: self.url in x,
+ map (lambda x: x (l), self.behavior)))
+
+ for b in enabledBehavior:
+ self.logger.debug ('starting onload {}'.format (b))
+ # I decided against using the queue here to limit memory
+ # usage (screenshot behavior would put all images into
+ # queue before we could process them)
+ for item in b.onload ():
+ self.processItem (item)
+ l.start ()
+
+ processQueue ()
+
+ for b in enabledBehavior:
+ self.logger.debug ('starting onstop {}'.format (b))
+ for item in b.onstop ():
+ self.processItem (item)
+
+ # if we stopped due to timeout, wait for remaining assets
+ processQueue ()
+
+ for b in enabledBehavior:
+ self.logger.debug ('starting onfinish {}'.format (b))
+ for item in b.onfinish ():
+ self.processItem (item)
+
+ processQueue ()
class RecursionPolicy:
""" Abstract recursion policy """
@@ -172,7 +224,9 @@ def removeFragment (u):
s = urlsplit (u)
return urlunsplit ((s.scheme, s.netloc, s.path, s.query, ''))
-class RecursiveController:
+from .behavior import ExtractLinksEvent
+
+class RecursiveController (EventHandler):
"""
Simple recursive controller
@@ -181,7 +235,7 @@ class RecursiveController:
def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \
logger=logging.getLogger(__name__), settings=defaultSettings,
- recursionPolicy=DepthLimit (0)):
+ recursionPolicy=DepthLimit (0), handler=[]):
self.url = url
self.output = output
self.service = service
@@ -189,37 +243,45 @@ class RecursiveController:
self.settings = settings
self.logger = logger
self.recursionPolicy = recursionPolicy
+ self.handler = handler
+ self.handler.append (self)
def fetch (self, urls):
"""
Overrideable fetch action for URLs. Defaults to sequential
SinglePageController.
"""
- result = []
for u in urls:
- c = SinglePageController (u, self.output, self.service,
- self.behavior, self.logger, self.settings)
- result.append (c.run ())
- return result
+ try:
+ c = SinglePageController (u, self.output, self.service,
+ self.behavior, self.logger, self.settings, self.handler)
+ c.run ()
+ except BrowserCrashed:
+ # this is fine if reported
+ self.logger.error ('browser crashed for {}'.format (u))
def run (self):
- have = set ()
- urls = set ([self.url])
- ret = {'stats': IntegerDict ()}
-
- while urls:
- self.logger.info ('retrieving {} urls'.format (len (urls)))
- result = self.fetch (urls)
-
- have.update (urls)
- urls = set ()
- for r in result:
- ret['stats'] += r['stats']
- urls.update (map (removeFragment, r['links']))
- urls.difference_update (have)
-
- urls = self.recursionPolicy (urls)
- # everything in ret must be serializeable
- ret['stats'] = dict (ret['stats'])
- return ret
+ self.have = set ()
+ self.urls = set ([self.url])
+
+ while self.urls:
+ self.logger.info ('retrieving {} urls'.format (len (self.urls)))
+
+ self.have.update (self.urls)
+ fetchurls = self.urls
+ self.urls = set ()
+
+ # handler appends new urls to self.urls through push()
+ self.fetch (fetchurls)
+
+ # remove urls we have and apply recursion policy
+ self.urls.difference_update (self.have)
+ self.urls = self.recursionPolicy (self.urls)
+
+ def push (self, item):
+ if isinstance (item, ExtractLinksEvent):
+ self.logger.debug ('adding extracted links: {}'.format (item.links))
+ self.urls.update (map (removeFragment, item.links))
+ else:
+ self.logger.debug ('{} got unhandled event {!r}'.format (self, item))
diff --git a/crocoite/task.py b/crocoite/task.py
index e93cfde..48fe5d8 100644
--- a/crocoite/task.py
+++ b/crocoite/task.py
@@ -38,10 +38,11 @@ _monkeyPatchSyncTasks ()
from celery import Celery
from celery.utils.log import get_task_logger
-from .browser import ChromeService
-from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit
+from .browser import ChromeService, BrowserCrashed
+from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit, StatsHandler
from . import behavior
from .cli import parseRecursive
+from .warc import WarcHandler
app = Celery ('crocoite.distributed')
app.config_from_object('celeryconfig')
@@ -77,16 +78,35 @@ def archive (self, url, settings, enabledBehaviorNames):
outPath = os.path.join (app.conf.temp_dir, outFile)
fd = open (outPath, 'wb')
+ handler = [StatsHandler (), WarcHandler (fd)]
enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
settings = ControllerSettings (**settings)
- controller = SinglePageController (url, fd, behavior=enabledBehavior, settings=settings)
- ret = controller.run ()
+ try:
+ controller = SinglePageController (url, fd, behavior=enabledBehavior,
+ settings=settings, handler=handler)
+ controller.run ()
+ except BrowserCrashed:
+ # nothing we can do about that
+ logger.error ('browser crashed for {}'.format (url))
os.makedirs (app.conf.finished_dir, exist_ok=True)
outPath = os.path.join (app.conf.finished_dir, outFile)
os.rename (fd.name, outPath)
- return ret
+ return handler[0].stats
+
+from collections import UserDict
+
+class IntegerDict (UserDict):
+ """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """
+ def __add__ (self, b):
+ newdict = self.__class__ (self)
+ for k, v in b.items ():
+ if k in self:
+ newdict[k] += v
+ else:
+ newdict[k] = v
+ return newdict
class DistributedRecursiveController (RecursiveController):
""" Distributed, recursive controller using celery """
@@ -96,6 +116,7 @@ class DistributedRecursiveController (RecursiveController):
recursionPolicy=DepthLimit (0), concurrency=1):
super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy)
self.concurrency = concurrency
+ self.stats = IntegerDict ()
def fetch (self, urls):
def chunksIter (urls):
@@ -104,7 +125,8 @@ class DistributedRecursiveController (RecursiveController):
itemsPerTask = len (urls)//self.concurrency
if itemsPerTask <= 0:
itemsPerTask = len (urls)
- return chain.from_iterable (archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ())
+ result = archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ()
+ self.stats = sum (chain.from_iterable (result), self.stats)
@app.task(bind=True, track_started=True)
def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency):
@@ -115,5 +137,7 @@ def controller (self, url, settings, enabledBehaviorNames, recursive, concurrenc
settings = ControllerSettings (**settings)
controller = DistributedRecursiveController (url, None, behavior=enabledBehavior,
settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency)
- return controller.run ()
+ controller.run ()
+ return dict (controller.stats)
+
diff --git a/crocoite/warc.py b/crocoite/warc.py
index 3fd65e4..fd8ce8d 100644
--- a/crocoite/warc.py
+++ b/crocoite/warc.py
@@ -24,7 +24,6 @@ Classes writing data to WARC files
import logging
import json
-from http.server import BaseHTTPRequestHandler
from io import BytesIO
from warcio.statusandheaders import StatusAndHeaders
from urllib.parse import urlsplit
@@ -36,95 +35,18 @@ from queue import Queue
from warcio.timeutils import datetime_to_iso_date
from warcio.warcwriter import WARCWriter
-from .browser import AccountingSiteLoader
from .util import packageUrl
-from .controller import defaultSettings
-
-class SerializingWARCWriter (WARCWriter):
- """
- Serializing WARC writer using separate writer thread and queue for
- non-blocking operation
-
- Needs an explicit .flush() before deletion.
- """
-
- def __init__ (self, filebuf, *args, **kwargs):
- WARCWriter.__init__ (self, filebuf, *args, **kwargs)
- self.queue = Queue ()
- self.thread = Thread (target=self._run_writer)
- self.thread.start ()
-
- def flush (self):
- self.queue.put (None)
- self.thread.join ()
- self.queue = None
- self.thread = None
-
- def _run_writer (self):
- while True:
- item = self.queue.get ()
- if not item:
- break
- out, record = item
- WARCWriter._write_warc_record (self, out, record)
-
- def _write_warc_record (self, out, record):
- self.queue.put ((out, record))
-
-class WARCLogHandler (BufferingHandler):
- """
- Buffered log handler, flushing to warcio
- """
+from .controller import defaultSettings, EventHandler, ControllerStart
+from .behavior import Script, DomSnapshotEvent, ScreenshotEvent
+from .browser import Item
- contentType = 'text/plain; charset=utf-8'
-
- def __init__ (self, capacity, warcfile):
- BufferingHandler.__init__ (self, capacity)
- self.warcfile = warcfile
-
- def flush (self):
- self.acquire ()
- try:
- if self.buffer:
- buf = ''
- for record in self.buffer:
- buf += self.format (record)
- buf += '\n'
- # XXX: record type?
- record = self.warcfile.create_warc_record (
- packageUrl ('log'), 'metadata',
- payload=BytesIO (buf.encode ('utf8')),
- warc_headers_dict={'Content-Type': self.contentType})
- self.warcfile.write_record(record)
- self.buffer = []
- finally:
- self.release ()
-
-class WarcLoader (AccountingSiteLoader):
- def __init__ (self, browser, url, writer,
+class WarcHandler (EventHandler):
+ def __init__ (self, fd,
logger=logging.getLogger(__name__),
- logBuffer=defaultSettings.logBuffer,
maxBodySize=defaultSettings.maxBodySize):
- super ().__init__ (browser, url, logger)
- self.writer = writer
+ self.logger = logger
+ self.writer = WARCWriter (fd, gzip=True)
self.maxBodySize = maxBodySize
- self.warcLogger = WARCLogHandler (logBuffer, writer)
- self.logger.addHandler (self.warcLogger)
-
- def __exit__ (self, exc_type, exc_value, traceback):
- self.logger.removeHandler (self.warcLogger)
- self.warcLogger.flush ()
- return super ().__exit__ (exc_type, exc_value, traceback)
-
- @staticmethod
- def getStatusText (response):
- text = response.get ('statusText')
- if text:
- return text
- text = BaseHTTPRequestHandler.responses.get (response['status'])
- if text:
- return text[0]
- return 'No status text available'
def _writeRequest (self, item):
writer = self.writer
@@ -154,13 +76,12 @@ class WarcLoader (AccountingSiteLoader):
return record.rec_headers['WARC-Record-ID']
- def _getBody (self, item, redirect):
+ def _getBody (self, item):
reqId = item.id
- resp = item.response
rawBody = b''
base64Encoded = False
- if redirect:
+ if item.isRedirect:
# redirects reuse the same request, thus we cannot safely retrieve
# the body (i.e getResponseBody may return the new location’s
# body). This is fine.
@@ -173,7 +94,7 @@ class WarcLoader (AccountingSiteLoader):
rawBody, base64Encoded = item.body
return rawBody, base64Encoded
- def _writeResponse (self, item, redirect, concurrentTo, rawBody, base64Encoded):
+ def _writeResponse (self, item, concurrentTo, rawBody, base64Encoded):
writer = self.writer
reqId = item.id
resp = item.response
@@ -192,7 +113,7 @@ class WarcLoader (AccountingSiteLoader):
}
httpHeaders = StatusAndHeaders('{} {}'.format (resp['status'],
- self.getStatusText (resp)), item.responseHeaders,
+ item.statusText), item.responseHeaders,
protocol='HTTP/1.1')
# Content is saved decompressed and decoded, remove these headers
@@ -216,21 +137,63 @@ class WarcLoader (AccountingSiteLoader):
http_headers=httpHeaders)
writer.write_record(record)
- def loadingFinished (self, item, redirect=False):
- super ().loadingFinished (item, redirect)
-
+ def _writeScript (self, item):
writer = self.writer
-
- req = item.request
- reqId = item.id
- resp = item.response
- url = urlsplit (resp['url'])
-
+ encoding = 'utf-8'
+ record = writer.create_warc_record (packageUrl ('script/{}'.format (item.path)), 'metadata',
+ payload=BytesIO (str (item).encode (encoding)),
+ warc_headers_dict={'Content-Type': 'application/javascript; charset={}'.format (encoding)})
+ writer.write_record (record)
+
+ def _writeItem (self, item):
+ if item.failed:
+ # should have been handled by the logger already
+ return
try:
# write neither request nor response if we cannot retrieve the body
- rawBody, base64Encoded = self._getBody (item, redirect)
+ rawBody, base64Encoded = self._getBody (item)
concurrentTo = self._writeRequest (item)
- self._writeResponse (item, redirect, concurrentTo, rawBody, base64Encoded)
+ self._writeResponse (item, concurrentTo, rawBody, base64Encoded)
except ValueError as e:
self.logger.error (e.args[0])
+ def _writeDomSnapshot (self, item):
+ writer = self.writer
+ httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')
+ record = writer.create_warc_record (item.url, 'response',
+ payload=BytesIO (item.document),
+ http_headers=httpHeaders,
+ warc_headers_dict={'X-DOM-Snapshot': str (True),
+ 'X-Chrome-Viewport': item.viewport})
+ writer.write_record (record)
+
+ def _writeScreenshot (self, item):
+ writer = self.writer
+ url = packageUrl ('screenshot-{}-{}.png'.format (0, item.yoff))
+ record = writer.create_warc_record (url, 'resource',
+ payload=BytesIO (item.data), warc_headers_dict={'Content-Type': 'image/png'})
+ writer.write_record (record)
+
+ def _writeControllerStart (self, item):
+ writer = self.writer
+ warcinfo = writer.create_warcinfo_record (filename=None, info=item.payload)
+ writer.write_record (warcinfo)
+
+ route = {Script: _writeScript,
+ Item: _writeItem,
+ DomSnapshotEvent: _writeDomSnapshot,
+ ScreenshotEvent: _writeScreenshot,
+ ControllerStart: _writeControllerStart,
+ }
+
+ def push (self, item):
+ processed = False
+ for k, v in self.route.items ():
+ if isinstance (item, k):
+ v (self, item)
+ processed = True
+ break
+
+ if not processed:
+ self.logger.debug ('unknown event {}'.format (repr (item)))
+