From aeab124ac9f1e3e88f6a8ae246c90b8094e94223 Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Tue, 6 Nov 2018 16:53:16 +0100 Subject: Switch single mode to asyncio This is a direct port to asyncio without any design changes. These need to happen in further refinements. Fixes issue #1. --- crocoite/behavior.py | 89 +++++++++++++------------ crocoite/cli.py | 13 ++-- crocoite/controller.py | 178 +++++++++++++++++++++---------------------------- crocoite/util.py | 4 +- crocoite/warc.py | 32 +++------ 5 files changed, 141 insertions(+), 175 deletions(-) (limited to 'crocoite') diff --git a/crocoite/behavior.py b/crocoite/behavior.py index 119248e..1a4aac1 100644 --- a/crocoite/behavior.py +++ b/crocoite/behavior.py @@ -22,19 +22,19 @@ Generic and per-site behavior scripts """ -import time +import asyncio from urllib.parse import urlsplit import os.path -import pkg_resources from base64 import b64decode from collections import OrderedDict +import pkg_resources from html5lib.serializer import HTMLSerializer -from pychrome.exceptions import TimeoutException from .util import randomString, getFormattedViewportMetrics, removeFragment from . import html from .html import StripAttributeFilter, StripTagFilter, ChromeTreeWalker +from .devtools import Crashed class Script: """ A JavaScript resource """ @@ -78,17 +78,21 @@ class Behavior: def __repr__ (self): return ''.format (self.name) - def onload (self): + async def onload (self): """ Before loading the page """ - yield from () + # this is a dirty hack to make this function an async generator + return + yield - def onstop (self): + async def onstop (self): """ Before page loading is stopped """ - yield from () + return + yield - def onfinish (self): + async def onfinish (self): """ After the site has stopped loading """ - yield from () + return + yield class HostnameFilter: """ Limit behavior script to hostname """ @@ -112,20 +116,21 @@ class JsOnload (Behavior): self.script = Script (self.scriptPath) self.scriptHandle = None - def onload (self): + async def onload (self): yield self.script - result = self.loader.tab.Page.addScriptToEvaluateOnNewDocument (source=str (self.script)) + result = await self.loader.tab.Page.addScriptToEvaluateOnNewDocument (source=str (self.script)) self.scriptHandle = result['identifier'] - def onstop (self): + async def onstop (self): if self.scriptHandle: - self.loader.tab.Page.removeScriptToEvaluateOnNewDocument (identifier=self.scriptHandle) - yield from () + await self.loader.tab.Page.removeScriptToEvaluateOnNewDocument (identifier=self.scriptHandle) + return + yield ### Generic scripts ### class Scroll (JsOnload): - __slots__ = ('stopVarname') + __slots__ = ('stopVarname', ) name = 'scroll' scriptPath = 'scroll.js' @@ -137,17 +142,17 @@ class Scroll (JsOnload): self.script.data = self.script.data.replace (stopVarname, newStopVarname) self.stopVarname = newStopVarname - def onstop (self): + async def onstop (self): super ().onstop () # removing the script does not stop it if running script = Script.fromStr ('{} = true; window.scrollTo (0, 0);'.format (self.stopVarname)) yield script - self.loader.tab.Runtime.evaluate (expression=str (script), returnByValue=True) + await self.loader.tab.Runtime.evaluate (expression=str (script), returnByValue=True) class EmulateScreenMetrics (Behavior): name = 'emulateScreenMetrics' - def onstop (self): + async def onstop (self): """ Emulate different screen sizes, causing the site to fetch assets (img srcset and css, for example) for different screen resolutions. @@ -169,12 +174,14 @@ class EmulateScreenMetrics (Behavior): l = self.loader tab = l.tab for s in sizes: - tab.Emulation.setDeviceMetricsOverride (**s) + await tab.Emulation.setDeviceMetricsOverride (**s) # give the browser time to re-eval page and start requests - time.sleep (1) + # XXX: should wait until loader is not busy any more + await asyncio.sleep (1) # XXX: this seems to be broken, it does not clear the override #tab.Emulation.clearDeviceMetricsOverride () - yield from () + return + yield class DomSnapshotEvent: __slots__ = ('url', 'document', 'viewport') @@ -195,7 +202,7 @@ class DomSnapshot (Behavior): can’t handle that though. """ - __slots__ = ('script') + __slots__ = ('script', ) name = 'domSnapshot' @@ -203,14 +210,14 @@ class DomSnapshot (Behavior): super ().__init__ (loader, logger) self.script = Script ('canvas-snapshot.js') - def onfinish (self): + async def onfinish (self): tab = self.loader.tab yield self.script - tab.Runtime.evaluate (expression=str (self.script), returnByValue=True) + await tab.Runtime.evaluate (expression=str (self.script), returnByValue=True) - viewport = getFormattedViewportMetrics (tab) - dom = tab.DOM.getDocument (depth=-1, pierce=True) + viewport = await getFormattedViewportMetrics (tab) + dom = await tab.DOM.getDocument (depth=-1, pierce=True) haveUrls = set () for doc in ChromeTreeWalker (dom['root']).split (): rawUrl = doc['documentURL'] @@ -247,10 +254,10 @@ class Screenshot (Behavior): name = 'screenshot' - def onfinish (self): + async def onfinish (self): tab = self.loader.tab - tree = tab.Page.getFrameTree () + tree = await tab.Page.getFrameTree () try: url = removeFragment (tree['frameTree']['frame']['url']) except KeyError: @@ -260,7 +267,7 @@ class Screenshot (Behavior): # see https://github.com/GoogleChrome/puppeteer/blob/230be28b067b521f0577206899db01f0ca7fc0d2/examples/screenshots-longpage.js # Hardcoded max texture size of 16,384 (crbug.com/770769) maxDim = 16*1024 - metrics = tab.Page.getLayoutMetrics () + metrics = await tab.Page.getLayoutMetrics () contentSize = metrics['contentSize'] width = min (contentSize['width'], maxDim) # we’re ignoring horizontal scroll intentionally. Most horizontal @@ -268,7 +275,8 @@ class Screenshot (Behavior): for yoff in range (0, contentSize['height'], maxDim): height = min (contentSize['height'] - yoff, maxDim) clip = {'x': 0, 'y': yoff, 'width': width, 'height': height, 'scale': 1} - data = b64decode (tab.Page.captureScreenshot (format='png', clip=clip)['data']) + ret = await tab.Page.captureScreenshot (format='png', clip=clip) + data = b64decode (ret['data']) yield ScreenshotEvent (url, yoff, data) class Click (JsOnload): @@ -278,7 +286,7 @@ class Click (JsOnload): scriptPath = 'click.js' class ExtractLinksEvent: - __slots__ = ('links') + __slots__ = ('links', ) def __init__ (self, links): self.links = links @@ -291,7 +299,7 @@ class ExtractLinks (Behavior): manually resolve relative links. """ - __slots__ = ('script') + __slots__ = ('script', ) name = 'extractLinks' @@ -299,10 +307,10 @@ class ExtractLinks (Behavior): super ().__init__ (loader, logger) self.script = Script ('extract-links.js') - def onfinish (self): + async def onfinish (self): tab = self.loader.tab yield self.script - result = tab.Runtime.evaluate (expression=str (self.script), returnByValue=True) + result = await tab.Runtime.evaluate (expression=str (self.script), returnByValue=True) yield ExtractLinksEvent (list (set (result['result']['value']))) class Crash (Behavior): @@ -310,18 +318,17 @@ class Crash (Behavior): name = 'crash' - def onstop (self): + async def onstop (self): try: - self.loader.tab.Page.crash (_timeout=1) - except TimeoutException: + await self.loader.tab.Page.crash () + except Crashed: pass - yield from () + return + yield # available behavior scripts. Order matters, move those modifying the page # towards the end of available -generic = [Scroll, EmulateScreenMetrics, Click, ExtractLinks] -perSite = [] -available = generic + perSite + [Screenshot, DomSnapshot] +available = [Scroll, Click, ExtractLinks, Screenshot, EmulateScreenMetrics, DomSnapshot] #available.append (Crash) # order matters, since behavior can modify the page (dom snapshots, for instance) availableMap = OrderedDict (map (lambda x: (x.name, x), available)) diff --git a/crocoite/cli.py b/crocoite/cli.py index ca38bca..c5dee35 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -28,9 +28,10 @@ from enum import IntEnum from . import behavior from .controller import SinglePageController, defaultSettings, \ ControllerSettings, StatsHandler, LogHandler -from .browser import NullService, ChromeService, BrowserCrashed +from .browser import NullService, ChromeService from .warc import WarcHandler from .logger import Logger, JsonPrintConsumer, DatetimeConsumer, WarcHandlerConsumer +from .devtools import Crashed class SingleExitStatus(IntEnum): """ Exit status for single-shot command line """ @@ -43,7 +44,6 @@ def single (): parser.add_argument('--browser', help='DevTools URL', metavar='URL') parser.add_argument('--timeout', default=1*60*60, type=int, help='Maximum time for archival', metavar='SEC') parser.add_argument('--idle-timeout', default=30, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC') - parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES') parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts', dest='enabledBehaviorNames', default=list (behavior.availableMap.keys ()), @@ -59,8 +59,7 @@ def single (): service = ChromeService () if args.browser: service = NullService (args.browser) - settings = ControllerSettings (maxBodySize=args.maxBodySize, - idleTimeout=args.idleTimeout, timeout=args.timeout) + settings = ControllerSettings (idleTimeout=args.idleTimeout, timeout=args.timeout) with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler: logger.connect (WarcHandlerConsumer (warcHandler)) handler = [StatsHandler (), LogHandler (logger), warcHandler] @@ -68,9 +67,11 @@ def single (): controller = SinglePageController (args.url, fd, settings=settings, service=service, handler=handler, behavior=b, logger=logger) try: - controller.run () + loop = asyncio.get_event_loop() + loop.run_until_complete(controller.run ()) + loop.close() ret = SingleExitStatus.Ok - except BrowserCrashed: + except Crashed: ret = SingleExitStatus.BrowserCrash finally: r = handler[0].stats diff --git a/crocoite/controller.py b/crocoite/controller.py index fc5b62b..dd32331 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -23,16 +23,14 @@ Controller classes, handling actions required for archival """ class ControllerSettings: - __slots__ = ('maxBodySize', 'idleTimeout', 'timeout') + __slots__ = ('idleTimeout', 'timeout') - def __init__ (self, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10): - self.maxBodySize = maxBodySize + def __init__ (self, idleTimeout=2, timeout=10): self.idleTimeout = idleTimeout self.timeout = timeout def toDict (self): - return dict (maxBodySize=self.maxBodySize, - idleTimeout=self.idleTimeout, timeout=self.timeout) + return dict (idleTimeout=self.idleTimeout, timeout=self.timeout) defaultSettings = ControllerSettings () @@ -48,8 +46,6 @@ class EventHandler: def push (self, item): raise NotImplementedError () -from .browser import BrowserCrashed - class StatsHandler (EventHandler): __slots__ = ('stats') @@ -124,108 +120,84 @@ class SinglePageController: self.handler = handler def processItem (self, item): - if isinstance (item, Exception): - for h in self.handler: - if h.acceptException: - h.push (item) - raise item - for h in self.handler: h.push (item) - def run (self): + async def run (self): logger = self.logger - def processQueue (idleTimeout=self.settings.idleTimeout): - # XXX: this is very ugly code and does not work well. figure out a - # better way to impose timeouts and still process all items in the - # queue - queue = l.queue - logger.debug ('process queue', - uuid='dafbf76b-a37e-44db-a021-efb5593b81f8', - queuelen=len (queue)) - while True: - now = time.time () - elapsed = now-start - maxTimeout = max (min (idleTimeout, self.settings.timeout-elapsed), 0) - logger.debug ('timeout status', - uuid='49550447-37e3-49ff-9a73-34da1c3e5984', - maxTimeout=maxTimeout, elapsed=elapsed) - # skip waiting if there is work to do. processes all items in - # queue, regardless of timeouts, i.e. you need to make sure the - # queue will actually be empty at some point. - if len (queue) == 0: - if not l.notify.wait (maxTimeout): - assert len (queue) == 0, "event must be sent" - # timed out - logger.debug ('timeout', - uuid='6a7e0083-7c1a-45ba-b1ed-dbc4f26697c6', - elapsed=elapsed) + async def processQueue (): + async for item in l: + self.processItem (item) + + with self.service as browser: + async with SiteLoader (browser, self.url, logger=logger) as l: + handle = asyncio.ensure_future (processQueue ()) + + start = time.time () + + version = await l.tab.Browser.getVersion () + payload = { + 'software': { + 'platform': platform.platform (), + 'python': { + 'implementation': platform.python_implementation(), + 'version': platform.python_version (), + 'build': platform.python_build () + }, + 'self': getRequirements (__package__) + }, + 'browser': { + 'product': version['product'], + 'useragent': version['userAgent'], + 'viewport': await getFormattedViewportMetrics (l.tab), + }, + } + self.processItem (ControllerStart (payload)) + + # not all behavior scripts are allowed for every URL, filter them + enabledBehavior = list (filter (lambda x: self.url in x, + map (lambda x: x (l, logger), self.behavior))) + + for b in enabledBehavior: + async for item in b.onload (): + self.processItem (item) + await l.start () + + # XXX: this does not detect idle changes properly + idleSince = None + while True: + now = time.time() + runtime = now-start + if runtime >= self.settings.timeout or (idleSince and now-idleSince > self.settings.idleTimeout): break + if len (l) == 0: + if idleSince is None: + idleSince = time.time () else: - l.notify.clear () - # limit number of items processed here, otherwise timeout won’t - # be checked frequently. this can happen if the site quickly - # loads a lot of items. - for i in range (1000): - try: - item = queue.popleft () - logger.debug ('queue pop', - uuid='adc96bfa-026d-4092-b732-4a022a1a92ca', - item=item, queuelen=len (queue)) - except IndexError: - break - self.processItem (item) - if maxTimeout == 0: - break - - with self.service as browser, SiteLoader (browser, self.url, logger=logger) as l: - start = time.time () - - version = l.tab.Browser.getVersion () - payload = { - 'software': { - 'platform': platform.platform (), - 'python': { - 'implementation': platform.python_implementation(), - 'version': platform.python_version (), - 'build': platform.python_build () - }, - 'self': getRequirements (__package__) - }, - 'browser': { - 'product': version['product'], - 'useragent': version['userAgent'], - 'viewport': getFormattedViewportMetrics (l.tab), - }, - } - self.processItem (ControllerStart (payload)) - - # not all behavior scripts are allowed for every URL, filter them - enabledBehavior = list (filter (lambda x: self.url in x, - map (lambda x: x (l, logger), self.behavior))) - - for b in enabledBehavior: - # I decided against using the queue here to limit memory - # usage (screenshot behavior would put all images into - # queue before we could process them) - for item in b.onload (): - self.processItem (item) - l.start () - - processQueue () - - for b in enabledBehavior: - for item in b.onstop (): - self.processItem (item) - - # if we stopped due to timeout, wait for remaining assets - processQueue (1) - - for b in enabledBehavior: - for item in b.onfinish (): - self.processItem (item) - - processQueue (1) + idleSince = None + await asyncio.sleep (1) + await l.tab.Page.stopLoading () + + for b in enabledBehavior: + async for item in b.onstop (): + self.processItem (item) + + await asyncio.sleep (1) + + for b in enabledBehavior: + async for item in b.onfinish (): + self.processItem (item) + + # drain the queue XXX detect idle properly + i = 0 + while len (l) and i < 20: + i += 1 + await asyncio.sleep (1) + + if handle.done (): + handle.result () + else: + handle.cancel () class RecursionPolicy: """ Abstract recursion policy """ diff --git a/crocoite/util.py b/crocoite/util.py index 3a62533..daa60db 100644 --- a/crocoite/util.py +++ b/crocoite/util.py @@ -37,8 +37,8 @@ def packageUrl (path): """ return 'urn:' + __package__ + ':' + path -def getFormattedViewportMetrics (tab): - layoutMetrics = tab.Page.getLayoutMetrics () +async def getFormattedViewportMetrics (tab): + layoutMetrics = await tab.Page.getLayoutMetrics () # XXX: I’m not entirely sure which one we should use here return '{}x{}'.format (layoutMetrics['layoutViewport']['clientWidth'], layoutMetrics['layoutViewport']['clientHeight']) diff --git a/crocoite/warc.py b/crocoite/warc.py index 9b97e75..c1cbff2 100644 --- a/crocoite/warc.py +++ b/crocoite/warc.py @@ -37,15 +37,13 @@ from .behavior import Script, DomSnapshotEvent, ScreenshotEvent from .browser import Item class WarcHandler (EventHandler): - __slots__ = ('logger', 'writer', 'maxBodySize', 'documentRecords', 'log', + __slots__ = ('logger', 'writer', 'documentRecords', 'log', 'maxLogSize', 'logEncoding', 'warcinfoRecordId') def __init__ (self, fd, - logger, - maxBodySize=defaultSettings.maxBodySize): + logger): self.logger = logger self.writer = WARCWriter (fd, gzip=True) - self.maxBodySize = maxBodySize self.logEncoding = 'utf-8' self.log = BytesIO () @@ -101,16 +99,13 @@ class WarcHandler (EventHandler): 'X-Chrome-Request-ID': item.id, 'WARC-Date': datetime_to_iso_date (datetime.utcfromtimestamp (item.chromeRequest['wallTime'])), } - try: - bodyTruncated = None + + if item.requestBody is not None: payload, payloadBase64Encoded = item.requestBody - except ValueError: + else: # oops, don’t know what went wrong here - bodyTruncated = 'unspecified' logger.error ('requestBody missing', uuid='ee9adc58-e723-4595-9feb-312a67ead6a0') - - if bodyTruncated: - warcHeaders['WARC-Truncated'] = bodyTruncated + warcHeaders['WARC-Truncated'] = 'unspecified' payload = None if payload: @@ -127,22 +122,13 @@ class WarcHandler (EventHandler): rawBody = None base64Encoded = False bodyTruncated = None - if item.isRedirect: + if item.isRedirect or item.body is None: # redirects reuse the same request, thus we cannot safely retrieve # the body (i.e getResponseBody may return the new location’s - # body). + # body). No body available means we failed to retrieve it. bodyTruncated = 'unspecified' - elif item.encodedDataLength > self.maxBodySize: - bodyTruncated = 'length' - # check body size first, since we’re loading everything into memory - self.logger.error ('body for {} too large {} vs {}'.format (reqId, - item.encodedDataLength, self.maxBodySize)) else: - try: - rawBody, base64Encoded = item.body - except ValueError: - # oops, don’t know what went wrong here - bodyTruncated = 'unspecified' + rawBody, base64Encoded = item.body # now the response resp = item.response -- cgit v1.2.3