From aeab124ac9f1e3e88f6a8ae246c90b8094e94223 Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Tue, 6 Nov 2018 16:53:16 +0100 Subject: Switch single mode to asyncio This is a direct port to asyncio without any design changes. These need to happen in further refinements. Fixes issue #1. --- crocoite/controller.py | 178 +++++++++++++++++++++---------------------------- 1 file changed, 75 insertions(+), 103 deletions(-) (limited to 'crocoite/controller.py') diff --git a/crocoite/controller.py b/crocoite/controller.py index fc5b62b..dd32331 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -23,16 +23,14 @@ Controller classes, handling actions required for archival """ class ControllerSettings: - __slots__ = ('maxBodySize', 'idleTimeout', 'timeout') + __slots__ = ('idleTimeout', 'timeout') - def __init__ (self, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10): - self.maxBodySize = maxBodySize + def __init__ (self, idleTimeout=2, timeout=10): self.idleTimeout = idleTimeout self.timeout = timeout def toDict (self): - return dict (maxBodySize=self.maxBodySize, - idleTimeout=self.idleTimeout, timeout=self.timeout) + return dict (idleTimeout=self.idleTimeout, timeout=self.timeout) defaultSettings = ControllerSettings () @@ -48,8 +46,6 @@ class EventHandler: def push (self, item): raise NotImplementedError () -from .browser import BrowserCrashed - class StatsHandler (EventHandler): __slots__ = ('stats') @@ -124,108 +120,84 @@ class SinglePageController: self.handler = handler def processItem (self, item): - if isinstance (item, Exception): - for h in self.handler: - if h.acceptException: - h.push (item) - raise item - for h in self.handler: h.push (item) - def run (self): + async def run (self): logger = self.logger - def processQueue (idleTimeout=self.settings.idleTimeout): - # XXX: this is very ugly code and does not work well. figure out a - # better way to impose timeouts and still process all items in the - # queue - queue = l.queue - logger.debug ('process queue', - uuid='dafbf76b-a37e-44db-a021-efb5593b81f8', - queuelen=len (queue)) - while True: - now = time.time () - elapsed = now-start - maxTimeout = max (min (idleTimeout, self.settings.timeout-elapsed), 0) - logger.debug ('timeout status', - uuid='49550447-37e3-49ff-9a73-34da1c3e5984', - maxTimeout=maxTimeout, elapsed=elapsed) - # skip waiting if there is work to do. processes all items in - # queue, regardless of timeouts, i.e. you need to make sure the - # queue will actually be empty at some point. - if len (queue) == 0: - if not l.notify.wait (maxTimeout): - assert len (queue) == 0, "event must be sent" - # timed out - logger.debug ('timeout', - uuid='6a7e0083-7c1a-45ba-b1ed-dbc4f26697c6', - elapsed=elapsed) + async def processQueue (): + async for item in l: + self.processItem (item) + + with self.service as browser: + async with SiteLoader (browser, self.url, logger=logger) as l: + handle = asyncio.ensure_future (processQueue ()) + + start = time.time () + + version = await l.tab.Browser.getVersion () + payload = { + 'software': { + 'platform': platform.platform (), + 'python': { + 'implementation': platform.python_implementation(), + 'version': platform.python_version (), + 'build': platform.python_build () + }, + 'self': getRequirements (__package__) + }, + 'browser': { + 'product': version['product'], + 'useragent': version['userAgent'], + 'viewport': await getFormattedViewportMetrics (l.tab), + }, + } + self.processItem (ControllerStart (payload)) + + # not all behavior scripts are allowed for every URL, filter them + enabledBehavior = list (filter (lambda x: self.url in x, + map (lambda x: x (l, logger), self.behavior))) + + for b in enabledBehavior: + async for item in b.onload (): + self.processItem (item) + await l.start () + + # XXX: this does not detect idle changes properly + idleSince = None + while True: + now = time.time() + runtime = now-start + if runtime >= self.settings.timeout or (idleSince and now-idleSince > self.settings.idleTimeout): break + if len (l) == 0: + if idleSince is None: + idleSince = time.time () else: - l.notify.clear () - # limit number of items processed here, otherwise timeout won’t - # be checked frequently. this can happen if the site quickly - # loads a lot of items. - for i in range (1000): - try: - item = queue.popleft () - logger.debug ('queue pop', - uuid='adc96bfa-026d-4092-b732-4a022a1a92ca', - item=item, queuelen=len (queue)) - except IndexError: - break - self.processItem (item) - if maxTimeout == 0: - break - - with self.service as browser, SiteLoader (browser, self.url, logger=logger) as l: - start = time.time () - - version = l.tab.Browser.getVersion () - payload = { - 'software': { - 'platform': platform.platform (), - 'python': { - 'implementation': platform.python_implementation(), - 'version': platform.python_version (), - 'build': platform.python_build () - }, - 'self': getRequirements (__package__) - }, - 'browser': { - 'product': version['product'], - 'useragent': version['userAgent'], - 'viewport': getFormattedViewportMetrics (l.tab), - }, - } - self.processItem (ControllerStart (payload)) - - # not all behavior scripts are allowed for every URL, filter them - enabledBehavior = list (filter (lambda x: self.url in x, - map (lambda x: x (l, logger), self.behavior))) - - for b in enabledBehavior: - # I decided against using the queue here to limit memory - # usage (screenshot behavior would put all images into - # queue before we could process them) - for item in b.onload (): - self.processItem (item) - l.start () - - processQueue () - - for b in enabledBehavior: - for item in b.onstop (): - self.processItem (item) - - # if we stopped due to timeout, wait for remaining assets - processQueue (1) - - for b in enabledBehavior: - for item in b.onfinish (): - self.processItem (item) - - processQueue (1) + idleSince = None + await asyncio.sleep (1) + await l.tab.Page.stopLoading () + + for b in enabledBehavior: + async for item in b.onstop (): + self.processItem (item) + + await asyncio.sleep (1) + + for b in enabledBehavior: + async for item in b.onfinish (): + self.processItem (item) + + # drain the queue XXX detect idle properly + i = 0 + while len (l) and i < 20: + i += 1 + await asyncio.sleep (1) + + if handle.done (): + handle.result () + else: + handle.cancel () class RecursionPolicy: """ Abstract recursion policy """ -- cgit v1.2.3