From 7730e0d64ec895091a0dd7eb0e3c6ce2ed02d981 Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Wed, 20 Jun 2018 11:13:37 +0200 Subject: Synchronous SiteLoader event handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously a browser crash stalled the entire grab, since events from pychrome were handled asynchronously in a different thread and exceptions were not propagated to the main thread. Now all browser events are stored in a queue and processed by the main thread, allowing us to handle browser crashes gracefully (more or less). This made the following additional changes necessary: - Clear separation between producer (browser) and consumer (WARC, stats, …) - Behavior scripts now yield events as well, instead of accessing the WARC writer - WARC logging was removed (for now) and WARC writer does not require serialization any more --- contrib/celerycrocoite.py | 9 +- crocoite/behavior.py | 124 +++++++++----- crocoite/browser.py | 417 ++++++++++++++++++---------------------------- crocoite/cli.py | 19 ++- crocoite/controller.py | 260 ++++++++++++++++++----------- crocoite/task.py | 38 ++++- crocoite/warc.py | 165 +++++++----------- 7 files changed, 518 insertions(+), 514 deletions(-) diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py index 26c35ce..d0a02e9 100644 --- a/contrib/celerycrocoite.py +++ b/contrib/celerycrocoite.py @@ -88,10 +88,9 @@ def checkCompletedJobs (bot, jobs): if Identifier (channel) not in bot.channels: continue try: - result = handle.get (timeout=0.1) - stats = result['stats'] - bot.msg (channel, '{}: {} ({}) finished. {} requests, {} failed, {} received.'.format (user, url, - handle.id, stats['requests'], stats['failed'], + stats = handle.get (timeout=0.1) + bot.msg (channel, '{}: {} ({}) finished. {} crashed, {} requests, {} failed, {} received.'.format (user, url, + handle.id, stats['crashed'], stats['requests'], stats['failed'], prettyBytes (stats['bytesRcv']))) delete.add (handle.id) except celery.exceptions.TimeoutError: @@ -198,7 +197,7 @@ def archive (bot, trigger): logBuffer=defaultSettings.logBuffer, idleTimeout=args.idleTimeout, timeout=args.timeout) args = dict (url=args.url, - enabledBehaviorNames=list (behavior.availableNames-blacklistedBehavior), + enabledBehaviorNames=list (set (behavior.availableMap.keys())-blacklistedBehavior), settings=settings, recursive=args.recursive, concurrency=args.concurrency) q = bot.memory['crocoite']['q'] diff --git a/crocoite/behavior.py b/crocoite/behavior.py index 95e8160..fd8fff8 100644 --- a/crocoite/behavior.py +++ b/crocoite/behavior.py @@ -22,21 +22,44 @@ Generic and per-site behavior scripts """ -import logging +import logging, time from io import BytesIO from urllib.parse import urlsplit import os.path import pkg_resources from base64 import b64decode +from collections import OrderedDict + +from html5lib.serializer import HTMLSerializer +from warcio.statusandheaders import StatusAndHeaders +from pychrome.exceptions import TimeoutException from .util import randomString, packageUrl, getFormattedViewportMetrics from . import html from .html import StripAttributeFilter, StripTagFilter, ChromeTreeWalker -from html5lib.serializer import HTMLSerializer -from warcio.statusandheaders import StatusAndHeaders +from .browser import SiteLoader logger = logging.getLogger(__name__) +class Script: + """ A JavaScript resource """ + def __init__ (self, path=None, encoding='utf-8'): + self.path = path + if path: + self.data = pkg_resources.resource_string (__name__, os.path.join ('data', path)).decode (encoding) + + def __repr__ (self): + return '' +class TestItem (Item): + """ This should be as close to Item as possible """ + + base = 'http://localhost:8000/' + + def __init__ (self, path, status, headers, bodyReceive, bodySend=None): + super ().__init__ (tab=None) + self.chromeResponse = {'response': {'headers': headers, 'status': status, 'url': self.base + path}} + self._body = bodyReceive, False + self.bodySend = bodyReceive if not bodySend else bodySend + @property + def body (self): + return self._body + +testItems = [ + TestItem ('binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00\x01\x02'), + TestItem ('attachment', 200, + {'Content-Type': 'text/plain; charset=utf-8', + 'Content-Disposition': 'attachment; filename="attachment.txt"', + }, + 'This is a simple text file with umlauts. ÄÖU.'.encode ('utf8')), + TestItem ('encoding/utf8', 200, {'Content-Type': 'text/plain; charset=utf-8'}, + 'This is a test, äöü μνψκ ¥¥¥¿ýý¡'.encode ('utf8')), + TestItem ('encoding/iso88591', 200, {'Content-Type': 'text/plain; charset=ISO-8859-1'}, + 'This is a test, äöü.'.encode ('utf8'), + 'This is a test, äöü.'.encode ('ISO-8859-1')), + TestItem ('encoding/latin1', 200, {'Content-Type': 'text/plain; charset=latin1'}, + 'This is a test, äöü.'.encode ('utf8'), + 'This is a test, äöü.'.encode ('latin1')), + TestItem ('image', 200, {'Content-Type': 'image/png'}, + # 1×1 png image + b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'), + TestItem ('empty', 200, {}, b''), + TestItem ('redirect/301/empty', 301, {'Location': '/empty'}, b''), + TestItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b''), + TestItem ('nonexistent', 404, {}, b''), + TestItem ('html', 200, {'Content-Type': 'html'}, + ''.encode ('utf8')), + TestItem ('html/alert', 200, {'Content-Type': 'html'}, + ''.encode ('utf8')), + ] +testItemMap = dict ([(item.parsedUrl.path, item) for item in testItems]) + +class TestHTTPRequestHandler (BaseHTTPRequestHandler): def do_GET(self): - path = self.path - if path.startswith ('/redirect/301'): - self.send_response(301) - self.send_header ('Location', path[13:]) + item = testItemMap.get (self.path) + if item: + self.send_response (item.response['status']) + for k, v in item.response['headers'].items (): + self.send_header (k, v) + body = item.bodySend + self.send_header ('Content-Length', len (body)) self.end_headers() - elif path == '/empty': - self.send_response (200) - self.end_headers () - elif path.startswith ('/encoding'): - # send text data with different encodings - _, _, encoding = path.split ('/', 3) - self.send_response (200) - self.send_header ('Content-Type', 'text/plain; charset={}'.format (encoding)) - self.end_headers () - self.wfile.write (self.encodingTestString[encoding].encode (encoding)) - elif path == '/binary': - # send binary data - self.send_response (200) - self.send_header ('Content-Type', 'application/octet-stream') - self.send_header ('Content-Length', len (self.binaryTestData)) - self.end_headers () - self.wfile.write (self.binaryTestData) - elif path == '/image': - # send binary data - self.send_response (200) - self.send_header ('Content-Type', 'image/png') - self.end_headers () - self.wfile.write (self.imageTestData) - elif path == '/attachment': - self.send_response (200) - self.send_header ('Content-Type', 'text/plain; charset=utf-8') - self.send_header ('Content-Disposition', 'attachment; filename="attachment.txt"') - self.end_headers () - self.wfile.write (self.encodingTestString['utf-8'].encode ('utf-8')) - elif path == '/html': - self.send_response (200) - self.send_header ('Content-Type', 'text/html; charset=utf-8') - self.end_headers () - self.wfile.write (self.htmlTestData.encode ('utf-8')) - elif path == '/alert': - self.send_response (200) - self.send_header ('Content-Type', 'text/html; charset=utf-8') - self.end_headers () - self.wfile.write (self.alertData.encode ('utf-8')) - else: - self.send_response (404) - self.end_headers () - + self.wfile.write (body) + return + def log_message (self, format, *args): pass @@ -485,144 +460,82 @@ def startServer (): httpd = http.server.HTTPServer (("localhost", PORT), TestHTTPRequestHandler) httpd.serve_forever() -class TestSiteLoaderAdapter (SiteLoader): - def __init__ (self, browser, url): - SiteLoader.__init__ (self, browser, url) - self.finished = [] - - def loadingFinished (self, item, redirect=False): - self.finished.append (item) - class TestSiteLoader (unittest.TestCase): def setUp (self): from multiprocessing import Process self.server = Process (target=startServer) self.server.start () - self.baseurl = 'http://localhost:8000/' + self.baseurl = 'http://localhost:8000' self.service = ChromeService () - browserUrl = self.service.__enter__ () - self.browser = pychrome.Browser(url=browserUrl) + self.browser = self.service.__enter__ () def buildAdapter (self, path): - return TestSiteLoaderAdapter (self.browser, '{}{}'.format (self.baseurl, path)) - - def assertUrls (self, l, expect): - urls = set (map (lambda x: x.parsedUrl.path, l.finished)) - expect = set (expect) - self.assertEqual (urls, expect) - - def test_wait (self): - waittime = 2 - with self.buildAdapter ('empty') as l: + self.assertTrue (path.startswith ('/')) + return SiteLoader (self.browser, '{}{}'.format (self.baseurl, path)) + + def assertItems (self, l, items): + items = dict ([(i.parsedUrl.path, i) for i in items]) + timeout = 5 + while True: + if not l.notify.wait (timeout) and len (items) > 0: + self.fail ('timeout') + if len (l.queue) > 0: + item = l.queue.popleft () + if isinstance (item, Exception): + raise item + self.assertIsNot (item.chromeResponse, None, msg='url={}'.format (item.request['url'])) + golden = items.pop (item.parsedUrl.path) + if not golden: + self.fail ('url {} not supposed to be fetched'.format (item.url)) + self.assertEqual (item.body[0], golden.body[0], msg='body for url={}'.format (item.request['url'])) + self.assertEqual (item.response['status'], golden.response['status']) + for k, v in golden.responseHeaders: + actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders))) + self.assertIn (v, actual) + + # check queue at least once + if not items: + break + + def assertLiteralItem (self, item, deps=[]): + with self.buildAdapter (item.parsedUrl.path) as l: l.start () - before = time.time () - l.wait (waittime) - after = time.time () - self.assertTrue ((after-before) >= waittime) + self.assertItems (l, [item] + deps) def test_empty (self): - with self.buildAdapter ('empty') as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 1) + self.assertLiteralItem (testItemMap['/empty']) - def test_redirect301 (self): - with self.buildAdapter ('redirect/301/empty') as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 2) - self.assertUrls (l, ['/redirect/301/empty', '/empty']) - for item in l.finished: - if item.parsedUrl.path == '/empty': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], b'') - elif item.parsedUrl.path == '/redirect/301/empty': - self.assertEqual (item.response['status'], 301) - else: - self.fail ('unknown url') - - def test_redirect301multi (self): - with self.buildAdapter ('redirect/301/redirect/301/empty') as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 3) - self.assertUrls (l, ['/redirect/301/redirect/301/empty', '/redirect/301/empty', '/empty']) - for item in l.finished: - if item.parsedUrl.path == '/empty': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], b'') - elif item.parsedUrl.path in {'/redirect/301/empty', \ - '/redirect/301/redirect/301/empty'}: - self.assertEqual (item.response['status'], 301) - else: - self.fail ('unknown url') + def test_redirect (self): + self.assertLiteralItem (testItemMap['/redirect/301/empty'], [testItemMap['/empty']]) + # chained redirects + self.assertLiteralItem (testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']]) def test_encoding (self): """ Text responses are transformed to UTF-8. Make sure this works correctly. """ - for encoding, expected in TestHTTPRequestHandler.encodingTestString.items (): - with self.buildAdapter ('encoding/{}'.format (encoding)) as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 1) - self.assertUrls (l, ['/encoding/{}'.format (encoding)]) - self.assertEqual (l.finished[0].body[0], expected.encode ('utf8')) + for item in {testItemMap['/encoding/utf8'], testItemMap['/encoding/latin1'], testItemMap['/encoding/iso88591']}: + self.assertLiteralItem (item) def test_binary (self): """ Browser should ignore content it cannot display (i.e. octet-stream) """ - with self.buildAdapter ('binary') as l: + with self.buildAdapter ('/binary') as l: l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 0) + self.assertItems (l, []) def test_image (self): """ Images should be displayed inline """ - with self.buildAdapter ('image') as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 1) - self.assertUrls (l, ['/image']) - self.assertEqual (l.finished[0].body[0], TestHTTPRequestHandler.imageTestData) + self.assertLiteralItem (testItemMap['/image']) def test_attachment (self): - """ And downloads won’t work in headless mode """ - with self.buildAdapter ('attachment') as l: + """ And downloads won’t work in headless mode, even if it’s just a text file """ + with self.buildAdapter ('/attachment') as l: l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 0) + self.assertItems (l, []) def test_html (self): - with self.buildAdapter ('html') as l: - l.start () - l.waitIdle () - self.assertEqual (len (l.finished), 3) - self.assertUrls (l, ['/html', '/image', '/nonexistent']) - for item in l.finished: - if item.parsedUrl.path == '/html': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], TestHTTPRequestHandler.htmlTestData.encode ('utf-8')) - elif item.parsedUrl.path == '/image': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], TestHTTPRequestHandler.imageTestData) - elif item.parsedUrl.path == '/nonexistent': - self.assertEqual (item.response['status'], 404) - else: - self.fail ('unknown url') - - def test_alert (self): - with self.buildAdapter ('alert') as l: - l.start () - l.waitIdle () - self.assertUrls (l, ['/alert', '/image']) - for item in l.finished: - if item.parsedUrl.path == '/alert': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], TestHTTPRequestHandler.alertData.encode ('utf-8')) - elif item.parsedUrl.path == '/image': - self.assertEqual (item.response['status'], 200) - self.assertEqual (item.body[0], TestHTTPRequestHandler.imageTestData) - else: - self.fail ('unknown url') + self.assertLiteralItem (testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']]) + # make sure alerts are dismissed correctly (image won’t load otherwise) + self.assertLiteralItem (testItemMap['/html/alert'], [testItemMap['/image']]) def tearDown (self): self.service.__exit__ (None, None, None) diff --git a/crocoite/cli.py b/crocoite/cli.py index f6454da..d631f10 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -26,8 +26,9 @@ import logging, argparse, json, sys from . import behavior from .controller import RecursiveController, defaultSettings, \ - ControllerSettings, DepthLimit, PrefixLimit + ControllerSettings, DepthLimit, PrefixLimit, StatsHandler from .browser import NullService, ChromeService +from .warc import WarcHandler def parseRecursive (recursive, url): if recursive is None: @@ -41,6 +42,7 @@ def parseRecursive (recursive, url): def main (): parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.') + parser.add_argument('--debug', help='Enable debug messages', action='store_true') parser.add_argument('--browser', help='DevTools URL', metavar='URL') parser.add_argument('--recursive', help='Follow links recursively') parser.add_argument('--concurrency', '-j', type=int, default=1) @@ -50,8 +52,8 @@ def main (): parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES') parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts', dest='enabledBehaviorNames', - default=list (behavior.availableNames), - choices=list (behavior.availableNames)) + default=list (behavior.availableMap.keys ()), + choices=list (behavior.availableMap.keys ())) group = parser.add_mutually_exclusive_group (required=True) group.add_argument('--output', help='WARC filename', metavar='FILE') group.add_argument('--distributed', help='Use celery worker', action='store_true') @@ -71,7 +73,8 @@ def main (): recursive=args.recursive, concurrency=args.concurrency) r = result.get () else: - logging.basicConfig (level=logging.INFO) + level = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig (level=level) try: recursionPolicy = parseRecursive (args.recursive, args.url) @@ -84,9 +87,13 @@ def main (): logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, timeout=args.timeout) with open (args.output, 'wb') as fd: + handler = [StatsHandler (), WarcHandler (fd)] + b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames)) controller = RecursiveController (args.url, fd, settings=settings, - recursionPolicy=recursionPolicy, service=service) - r = controller.run () + recursionPolicy=recursionPolicy, service=service, + handler=handler, behavior=b) + controller.run () + r = handler[0].stats json.dump (r, sys.stdout) return True diff --git a/crocoite/controller.py b/crocoite/controller.py index bc6f948..cdae268 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -35,99 +35,151 @@ class ControllerSettings: defaultSettings = ControllerSettings () -import logging -from urllib.parse import urlsplit, urlunsplit +class EventHandler: + """ Abstract base class for event handler """ + + # this handler wants to know about exceptions before they are reraised by + # the controller + acceptException = False + + def push (self, item): + raise NotImplementedError () + +from .browser import BrowserCrashed + +class StatsHandler (EventHandler): + acceptException = True -import pychrome + def __init__ (self): + self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0} + + def push (self, item): + if isinstance (item, Item): + self.stats['requests'] += 1 + if item.failed: + self.stats['failed'] += 1 + else: + self.stats['finished'] += 1 + self.stats['bytesRcv'] += item.encodedDataLength + elif isinstance (item, BrowserCrashed): + self.stats['crashed'] += 1 + +import logging, time +from urllib.parse import urlsplit, urlunsplit from . import behavior as cbehavior -from .browser import ChromeService -from .warc import WarcLoader, SerializingWARCWriter +from .browser import ChromeService, SiteLoader, Item from .util import getFormattedViewportMetrics -def firstOrNone (it): - """ Return first item of iterator it or None if empty """ - try: - return next (it) - except StopIteration: - return None +class ControllerStart: + def __init__ (self, payload): + self.payload = payload class SinglePageController: """ Archive a single page url to file output. + + Dispatches between producer (site loader and behavior scripts) and consumer + (stats, warc writer). """ def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \ - logger=logging.getLogger(__name__), settings=defaultSettings): + logger=logging.getLogger(__name__), settings=defaultSettings, handler=[]): self.url = url self.output = output self.service = service self.behavior = behavior self.settings = settings self.logger = logger + self.handler = handler + + def processItem (self, item): + if isinstance (item, Exception): + for h in self.handler: + if h.acceptException: + h.push (item) + raise item + + for h in self.handler: + h.push (item) def run (self): - ret = {'stats': None, 'links': []} - - with self.service as browser: - browser = pychrome.Browser (url=browser) - writer = SerializingWARCWriter (self.output, gzip=True) - - with WarcLoader (browser, self.url, writer, - logBuffer=self.settings.logBuffer, - maxBodySize=self.settings.maxBodySize) as l: - version = l.tab.Browser.getVersion () - payload = { - 'software': __package__, - 'browser': version['product'], - 'useragent': version['userAgent'], - 'viewport': getFormattedViewportMetrics (l.tab), - } - warcinfo = writer.create_warcinfo_record (filename=None, info=payload) - writer.write_record (warcinfo) - - # not all behavior scripts are allowed for every URL, filter them - enabledBehavior = list (filter (lambda x: self.url in x, - map (lambda x: x (l), self.behavior))) - linksBehavior = firstOrNone (filter (lambda x: isinstance (x, cbehavior.ExtractLinks), - enabledBehavior)) - - for b in enabledBehavior: - self.logger.debug ('starting onload behavior {}'.format (b.name)) - b.onload () - l.start () - - l.waitIdle (self.settings.idleTimeout, self.settings.timeout) - - for b in enabledBehavior: - self.logger.debug ('starting onstop behavior {}'.format (b.name)) - b.onstop () - - # if we stopped due to timeout, wait for remaining assets - l.waitIdle (2, 60) - l.stop () - - for b in enabledBehavior: - self.logger.debug ('starting onfinish behavior {}'.format (b.name)) - b.onfinish () - - ret['stats'] = l.stats - ret['links'] = linksBehavior.links if linksBehavior else None - writer.flush () - return ret - -from collections import UserDict - -class IntegerDict (UserDict): - """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """ - def __add__ (self, b): - newdict = self.__class__ (self) - for k, v in b.items (): - if k in self: - newdict[k] += v - else: - newdict[k] = v - return newdict + def processQueue (): + # XXX: this is very ugly code and does not work well. figure out a + # better way to impose timeouts and still process all items in the + # queue + queue = l.queue + self.logger.debug ('processing at least {} queue items'.format (len (queue))) + while True: + now = time.time () + elapsed = now-start + maxTimeout = max (min (self.settings.idleTimeout, self.settings.timeout-elapsed), 0) + self.logger.debug ('max timeout is {} with elapsed {}'.format (maxTimeout, elapsed)) + # skip waiting if there is work to do. processes all items in + # queue, regardless of timeouts, i.e. you need to make sure the + # queue will actually be empty at some point. + if len (queue) == 0: + if not l.notify.wait (maxTimeout): + assert len (queue) == 0, "event must be sent" + # timed out + self.logger.debug ('timed out after {}'.format (elapsed)) + break + else: + l.notify.clear () + # limit number of items processed here, otherwise timeout won’t + # be checked frequently. this can happen if the site quickly + # loads a lot of items. + for i in range (1000): + try: + item = queue.popleft () + self.logger.debug ('queue pop: {!r}, len now {}'.format (item, len (queue))) + except IndexError: + break + self.processItem (item) + if maxTimeout == 0: + break + + with self.service as browser, SiteLoader (browser, self.url, logger=self.logger) as l: + start = time.time () + + version = l.tab.Browser.getVersion () + payload = { + 'software': __package__, + 'browser': version['product'], + 'useragent': version['userAgent'], + 'viewport': getFormattedViewportMetrics (l.tab), + } + self.processItem (ControllerStart (payload)) + + # not all behavior scripts are allowed for every URL, filter them + enabledBehavior = list (filter (lambda x: self.url in x, + map (lambda x: x (l), self.behavior))) + + for b in enabledBehavior: + self.logger.debug ('starting onload {}'.format (b)) + # I decided against using the queue here to limit memory + # usage (screenshot behavior would put all images into + # queue before we could process them) + for item in b.onload (): + self.processItem (item) + l.start () + + processQueue () + + for b in enabledBehavior: + self.logger.debug ('starting onstop {}'.format (b)) + for item in b.onstop (): + self.processItem (item) + + # if we stopped due to timeout, wait for remaining assets + processQueue () + + for b in enabledBehavior: + self.logger.debug ('starting onfinish {}'.format (b)) + for item in b.onfinish (): + self.processItem (item) + + processQueue () class RecursionPolicy: """ Abstract recursion policy """ @@ -172,7 +224,9 @@ def removeFragment (u): s = urlsplit (u) return urlunsplit ((s.scheme, s.netloc, s.path, s.query, '')) -class RecursiveController: +from .behavior import ExtractLinksEvent + +class RecursiveController (EventHandler): """ Simple recursive controller @@ -181,7 +235,7 @@ class RecursiveController: def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \ logger=logging.getLogger(__name__), settings=defaultSettings, - recursionPolicy=DepthLimit (0)): + recursionPolicy=DepthLimit (0), handler=[]): self.url = url self.output = output self.service = service @@ -189,37 +243,45 @@ class RecursiveController: self.settings = settings self.logger = logger self.recursionPolicy = recursionPolicy + self.handler = handler + self.handler.append (self) def fetch (self, urls): """ Overrideable fetch action for URLs. Defaults to sequential SinglePageController. """ - result = [] for u in urls: - c = SinglePageController (u, self.output, self.service, - self.behavior, self.logger, self.settings) - result.append (c.run ()) - return result + try: + c = SinglePageController (u, self.output, self.service, + self.behavior, self.logger, self.settings, self.handler) + c.run () + except BrowserCrashed: + # this is fine if reported + self.logger.error ('browser crashed for {}'.format (u)) def run (self): - have = set () - urls = set ([self.url]) - ret = {'stats': IntegerDict ()} - - while urls: - self.logger.info ('retrieving {} urls'.format (len (urls))) - result = self.fetch (urls) - - have.update (urls) - urls = set () - for r in result: - ret['stats'] += r['stats'] - urls.update (map (removeFragment, r['links'])) - urls.difference_update (have) - - urls = self.recursionPolicy (urls) - # everything in ret must be serializeable - ret['stats'] = dict (ret['stats']) - return ret + self.have = set () + self.urls = set ([self.url]) + + while self.urls: + self.logger.info ('retrieving {} urls'.format (len (self.urls))) + + self.have.update (self.urls) + fetchurls = self.urls + self.urls = set () + + # handler appends new urls to self.urls through push() + self.fetch (fetchurls) + + # remove urls we have and apply recursion policy + self.urls.difference_update (self.have) + self.urls = self.recursionPolicy (self.urls) + + def push (self, item): + if isinstance (item, ExtractLinksEvent): + self.logger.debug ('adding extracted links: {}'.format (item.links)) + self.urls.update (map (removeFragment, item.links)) + else: + self.logger.debug ('{} got unhandled event {!r}'.format (self, item)) diff --git a/crocoite/task.py b/crocoite/task.py index e93cfde..48fe5d8 100644 --- a/crocoite/task.py +++ b/crocoite/task.py @@ -38,10 +38,11 @@ _monkeyPatchSyncTasks () from celery import Celery from celery.utils.log import get_task_logger -from .browser import ChromeService -from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit +from .browser import ChromeService, BrowserCrashed +from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, PrefixLimit, StatsHandler from . import behavior from .cli import parseRecursive +from .warc import WarcHandler app = Celery ('crocoite.distributed') app.config_from_object('celeryconfig') @@ -77,16 +78,35 @@ def archive (self, url, settings, enabledBehaviorNames): outPath = os.path.join (app.conf.temp_dir, outFile) fd = open (outPath, 'wb') + handler = [StatsHandler (), WarcHandler (fd)] enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) settings = ControllerSettings (**settings) - controller = SinglePageController (url, fd, behavior=enabledBehavior, settings=settings) - ret = controller.run () + try: + controller = SinglePageController (url, fd, behavior=enabledBehavior, + settings=settings, handler=handler) + controller.run () + except BrowserCrashed: + # nothing we can do about that + logger.error ('browser crashed for {}'.format (url)) os.makedirs (app.conf.finished_dir, exist_ok=True) outPath = os.path.join (app.conf.finished_dir, outFile) os.rename (fd.name, outPath) - return ret + return handler[0].stats + +from collections import UserDict + +class IntegerDict (UserDict): + """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """ + def __add__ (self, b): + newdict = self.__class__ (self) + for k, v in b.items (): + if k in self: + newdict[k] += v + else: + newdict[k] = v + return newdict class DistributedRecursiveController (RecursiveController): """ Distributed, recursive controller using celery """ @@ -96,6 +116,7 @@ class DistributedRecursiveController (RecursiveController): recursionPolicy=DepthLimit (0), concurrency=1): super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy) self.concurrency = concurrency + self.stats = IntegerDict () def fetch (self, urls): def chunksIter (urls): @@ -104,7 +125,8 @@ class DistributedRecursiveController (RecursiveController): itemsPerTask = len (urls)//self.concurrency if itemsPerTask <= 0: itemsPerTask = len (urls) - return chain.from_iterable (archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ()) + result = archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get () + self.stats = sum (chain.from_iterable (result), self.stats) @app.task(bind=True, track_started=True) def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency): @@ -115,5 +137,7 @@ def controller (self, url, settings, enabledBehaviorNames, recursive, concurrenc settings = ControllerSettings (**settings) controller = DistributedRecursiveController (url, None, behavior=enabledBehavior, settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency) - return controller.run () + controller.run () + return dict (controller.stats) + diff --git a/crocoite/warc.py b/crocoite/warc.py index 3fd65e4..fd8ce8d 100644 --- a/crocoite/warc.py +++ b/crocoite/warc.py @@ -24,7 +24,6 @@ Classes writing data to WARC files import logging import json -from http.server import BaseHTTPRequestHandler from io import BytesIO from warcio.statusandheaders import StatusAndHeaders from urllib.parse import urlsplit @@ -36,95 +35,18 @@ from queue import Queue from warcio.timeutils import datetime_to_iso_date from warcio.warcwriter import WARCWriter -from .browser import AccountingSiteLoader from .util import packageUrl -from .controller import defaultSettings - -class SerializingWARCWriter (WARCWriter): - """ - Serializing WARC writer using separate writer thread and queue for - non-blocking operation - - Needs an explicit .flush() before deletion. - """ - - def __init__ (self, filebuf, *args, **kwargs): - WARCWriter.__init__ (self, filebuf, *args, **kwargs) - self.queue = Queue () - self.thread = Thread (target=self._run_writer) - self.thread.start () - - def flush (self): - self.queue.put (None) - self.thread.join () - self.queue = None - self.thread = None - - def _run_writer (self): - while True: - item = self.queue.get () - if not item: - break - out, record = item - WARCWriter._write_warc_record (self, out, record) - - def _write_warc_record (self, out, record): - self.queue.put ((out, record)) - -class WARCLogHandler (BufferingHandler): - """ - Buffered log handler, flushing to warcio - """ +from .controller import defaultSettings, EventHandler, ControllerStart +from .behavior import Script, DomSnapshotEvent, ScreenshotEvent +from .browser import Item - contentType = 'text/plain; charset=utf-8' - - def __init__ (self, capacity, warcfile): - BufferingHandler.__init__ (self, capacity) - self.warcfile = warcfile - - def flush (self): - self.acquire () - try: - if self.buffer: - buf = '' - for record in self.buffer: - buf += self.format (record) - buf += '\n' - # XXX: record type? - record = self.warcfile.create_warc_record ( - packageUrl ('log'), 'metadata', - payload=BytesIO (buf.encode ('utf8')), - warc_headers_dict={'Content-Type': self.contentType}) - self.warcfile.write_record(record) - self.buffer = [] - finally: - self.release () - -class WarcLoader (AccountingSiteLoader): - def __init__ (self, browser, url, writer, +class WarcHandler (EventHandler): + def __init__ (self, fd, logger=logging.getLogger(__name__), - logBuffer=defaultSettings.logBuffer, maxBodySize=defaultSettings.maxBodySize): - super ().__init__ (browser, url, logger) - self.writer = writer + self.logger = logger + self.writer = WARCWriter (fd, gzip=True) self.maxBodySize = maxBodySize - self.warcLogger = WARCLogHandler (logBuffer, writer) - self.logger.addHandler (self.warcLogger) - - def __exit__ (self, exc_type, exc_value, traceback): - self.logger.removeHandler (self.warcLogger) - self.warcLogger.flush () - return super ().__exit__ (exc_type, exc_value, traceback) - - @staticmethod - def getStatusText (response): - text = response.get ('statusText') - if text: - return text - text = BaseHTTPRequestHandler.responses.get (response['status']) - if text: - return text[0] - return 'No status text available' def _writeRequest (self, item): writer = self.writer @@ -154,13 +76,12 @@ class WarcLoader (AccountingSiteLoader): return record.rec_headers['WARC-Record-ID'] - def _getBody (self, item, redirect): + def _getBody (self, item): reqId = item.id - resp = item.response rawBody = b'' base64Encoded = False - if redirect: + if item.isRedirect: # redirects reuse the same request, thus we cannot safely retrieve # the body (i.e getResponseBody may return the new location’s # body). This is fine. @@ -173,7 +94,7 @@ class WarcLoader (AccountingSiteLoader): rawBody, base64Encoded = item.body return rawBody, base64Encoded - def _writeResponse (self, item, redirect, concurrentTo, rawBody, base64Encoded): + def _writeResponse (self, item, concurrentTo, rawBody, base64Encoded): writer = self.writer reqId = item.id resp = item.response @@ -192,7 +113,7 @@ class WarcLoader (AccountingSiteLoader): } httpHeaders = StatusAndHeaders('{} {}'.format (resp['status'], - self.getStatusText (resp)), item.responseHeaders, + item.statusText), item.responseHeaders, protocol='HTTP/1.1') # Content is saved decompressed and decoded, remove these headers @@ -216,21 +137,63 @@ class WarcLoader (AccountingSiteLoader): http_headers=httpHeaders) writer.write_record(record) - def loadingFinished (self, item, redirect=False): - super ().loadingFinished (item, redirect) - + def _writeScript (self, item): writer = self.writer - - req = item.request - reqId = item.id - resp = item.response - url = urlsplit (resp['url']) - + encoding = 'utf-8' + record = writer.create_warc_record (packageUrl ('script/{}'.format (item.path)), 'metadata', + payload=BytesIO (str (item).encode (encoding)), + warc_headers_dict={'Content-Type': 'application/javascript; charset={}'.format (encoding)}) + writer.write_record (record) + + def _writeItem (self, item): + if item.failed: + # should have been handled by the logger already + return try: # write neither request nor response if we cannot retrieve the body - rawBody, base64Encoded = self._getBody (item, redirect) + rawBody, base64Encoded = self._getBody (item) concurrentTo = self._writeRequest (item) - self._writeResponse (item, redirect, concurrentTo, rawBody, base64Encoded) + self._writeResponse (item, concurrentTo, rawBody, base64Encoded) except ValueError as e: self.logger.error (e.args[0]) + def _writeDomSnapshot (self, item): + writer = self.writer + httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1') + record = writer.create_warc_record (item.url, 'response', + payload=BytesIO (item.document), + http_headers=httpHeaders, + warc_headers_dict={'X-DOM-Snapshot': str (True), + 'X-Chrome-Viewport': item.viewport}) + writer.write_record (record) + + def _writeScreenshot (self, item): + writer = self.writer + url = packageUrl ('screenshot-{}-{}.png'.format (0, item.yoff)) + record = writer.create_warc_record (url, 'resource', + payload=BytesIO (item.data), warc_headers_dict={'Content-Type': 'image/png'}) + writer.write_record (record) + + def _writeControllerStart (self, item): + writer = self.writer + warcinfo = writer.create_warcinfo_record (filename=None, info=item.payload) + writer.write_record (warcinfo) + + route = {Script: _writeScript, + Item: _writeItem, + DomSnapshotEvent: _writeDomSnapshot, + ScreenshotEvent: _writeScreenshot, + ControllerStart: _writeControllerStart, + } + + def push (self, item): + processed = False + for k, v in self.route.items (): + if isinstance (item, k): + v (self, item) + processed = True + break + + if not processed: + self.logger.debug ('unknown event {}'.format (repr (item))) + -- cgit v1.2.3