From b4669705fa8e581c17bbe0ca0c7cf4fadbd3deb8 Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Tue, 18 Jun 2019 13:41:53 +0200 Subject: Fix idle state tracking race condition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #16. Expose SiteLoader’s page idle changes through events and move state tracking into controller event handler. Relies on tracking time instead of asyncio event, which is more reliable. --- crocoite/browser.py | 49 ++++++++++--------------- crocoite/controller.py | 87 ++++++++++++++++++++++++++++----------------- crocoite/test_browser.py | 38 +++++++------------- crocoite/test_controller.py | 40 +++++++++++++++++++-- 4 files changed, 121 insertions(+), 93 deletions(-) diff --git a/crocoite/browser.py b/crocoite/browser.py index 10eaaff..577e77a 100644 --- a/crocoite/browser.py +++ b/crocoite/browser.py @@ -246,31 +246,17 @@ class RequestResponsePair: except TabException: self.response.body = None -class VarChangeEvent: - """ Notify when variable is changed """ - - __slots__ = ('_value', 'event') - - def __init__ (self, value): - self._value = value - self.event = asyncio.Event() +class NavigateError (IOError): + pass - def set (self, value): - if value != self._value: - self._value = value - # unblock waiting threads - self.event.set () - self.event.clear () +class PageIdle: + """ Page idle event """ - def get (self): - return self._value + def __init__ (self, idle): + self.idle = idle - async def wait (self): - await self.event.wait () - return self._value - -class NavigateError (IOError): - pass + def __bool__ (self): + return self.idle class SiteLoader: """ @@ -280,7 +266,7 @@ class SiteLoader: """ __slots__ = ('requests', 'browser', 'logger', 'tab', '_iterRunning', - 'idle', '_framesLoading') + '_framesLoading') allowedSchemes = {'http', 'https'} def __init__ (self, browser, logger): @@ -289,7 +275,6 @@ class SiteLoader: self.logger = logger.bind (context=type (self).__name__) self._iterRunning = [] - self.idle = VarChangeEvent (True) self._framesLoading = set () async def __aenter__ (self): @@ -343,22 +328,24 @@ class SiteLoader: # we need to block (yield) for every item completed, but not # handled by the consumer (caller). running = self._iterRunning - running.append (asyncio.ensure_future (self.tab.get ())) + tabGetTask = asyncio.ensure_future (self.tab.get ()) + running.append (tabGetTask) while True: done, pending = await asyncio.wait (running, return_when=asyncio.FIRST_COMPLETED) for t in done: result = t.result () if result is None: pass - elif isinstance (result, RequestResponsePair): - yield result - else: + elif t == tabGetTask: method, data = result f = handler.get (method, None) if f is not None: task = asyncio.ensure_future (f (**data)) pending.add (task) - pending.add (asyncio.ensure_future (self.tab.get ())) + tabGetTask = asyncio.ensure_future (self.tab.get ()) + pending.add (tabGetTask) + else: + yield result running = pending self._iterRunning = running @@ -492,7 +479,7 @@ class SiteLoader: uuid='bbeb39c0-3304-4221-918e-f26bd443c566', args=kwargs) self._framesLoading.add (kwargs['frameId']) - self.idle.set (False) + return PageIdle (False) async def _frameStoppedLoading (self, **kwargs): self.logger.debug ('frameStoppedLoading', @@ -500,5 +487,5 @@ class SiteLoader: self._framesLoading.remove (kwargs['frameId']) if not self._framesLoading: - self.idle.set (True) + return PageIdle (True) diff --git a/crocoite/controller.py b/crocoite/controller.py index b531491..08482af 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -29,7 +29,7 @@ from operator import attrgetter from yarl import URL from . import behavior as cbehavior -from .browser import SiteLoader, RequestResponsePair +from .browser import SiteLoader, RequestResponsePair, PageIdle from .util import getFormattedViewportMetrics, getSoftwareInfo from .behavior import ExtractLinksEvent @@ -95,6 +95,40 @@ class ControllerStart: def __init__ (self, payload): self.payload = payload +class IdleStateTracker (EventHandler): + """ Track SiteLoader’s idle state by listening to PageIdle events """ + + __slots__ = ('_idle', '_loop', '_idleSince') + + def __init__ (self, loop): + self._idle = True + self._loop = loop + + self._idleSince = self._loop.time () + + def push (self, item): + if isinstance (item, PageIdle): + self._idle = bool (item) + if self._idle: + self._idleSince = self._loop.time () + + async def wait (self, timeout): + """ Wait until page has been idle for at least timeout seconds. If the + page has been idle before calling this function it may return + immediately. """ + + assert timeout > 0 + while True: + if self._idle: + now = self._loop.time () + sleep = timeout-(now-self._idleSince) + if sleep <= 0: + break + else: + # not idle, check again after timeout expires + sleep = timeout + await asyncio.sleep (sleep) + class SinglePageController: """ Archive a single page url. @@ -128,10 +162,12 @@ class SinglePageController: async for item in l: self.processItem (item) + idle = IdleStateTracker (asyncio.get_event_loop ()) + self.handler.append (idle) + async with self.service as browser, SiteLoader (browser, logger=logger) as l: handle = asyncio.ensure_future (processQueue ()) - - start = time.time () + timeoutProc = asyncio.ensure_future (asyncio.sleep (self.settings.timeout)) # not all behavior scripts are allowed for every URL, filter them enabledBehavior = list (filter (lambda x: self.url in x, @@ -162,30 +198,17 @@ class SinglePageController: async for item in b.onload (): self.processItem (item) - # wait until the browser has a) been idle for at least - # settings.idleTimeout or b) settings.timeout is exceeded - timeoutProc = asyncio.ensure_future (asyncio.sleep (self.settings.timeout)) - # the browser might have changed to idle from .navigate until here - # due to awaits inbetween. Thus, idleProc may never be triggered. - idleTimeout = None if not l.idle.get() else self.settings.idleTimeout + idleProc = asyncio.ensure_future (idle.wait (self.settings.idleTimeout)) while True: - idleProc = asyncio.ensure_future (l.idle.wait ()) try: finished, pending = await asyncio.wait([idleProc, timeoutProc, handle], - return_when=asyncio.FIRST_COMPLETED, timeout=idleTimeout) + return_when=asyncio.FIRST_COMPLETED) except asyncio.CancelledError: idleProc.cancel () timeoutProc.cancel () break - if not finished: - # idle timeout - logger.debug ('idle timeout', - uuid='90702590-94c4-44ef-9b37-02a16de444c3') - idleProc.cancel () - timeoutProc.cancel () - break - elif handle in finished: + if handle in finished: # something went wrong while processing the data logger.error ('fetch failed', uuid='43a0686a-a3a9-4214-9acd-43f6976f8ff3') @@ -201,16 +224,12 @@ class SinglePageController: timeoutProc.result () break elif idleProc in finished: - # idle state change - isIdle = idleProc.result () - logger.debug ('idle state', - uuid='e3eaff79-7b56-4d17-aa42-d32fa1ec268b', - idle=isIdle) - if isIdle: - # browser is idle, start the clock - idleTimeout = self.settings.idleTimeout - else: - idleTimeout = None + # idle timeout + logger.debug ('idle timeout', + uuid='90702590-94c4-44ef-9b37-02a16de444c3') + idleProc.result () + timeoutProc.cancel () + break for b in enabledBehavior: async for item in b.onstop (): @@ -223,10 +242,12 @@ class SinglePageController: async for item in b.onfinish (): self.processItem (item) - # wait until loads from behavior scripts are done - await asyncio.sleep (1) - if not l.idle.get (): - while not await l.idle.wait (): pass + # wait until loads from behavior scripts are done and browser is + # idle for at least 1 second + try: + await asyncio.wait_for (idle.wait (1), timeout=1) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass if handle.done (): handle.result () diff --git a/crocoite/test_browser.py b/crocoite/test_browser.py index 6341bd6..713c434 100644 --- a/crocoite/test_browser.py +++ b/crocoite/test_browser.py @@ -32,9 +32,9 @@ import hypothesis.strategies as st from hypothesis.provisional import domains import pytest -from .browser import RequestResponsePair, SiteLoader, VarChangeEvent, Request, \ +from .browser import RequestResponsePair, SiteLoader, Request, \ UnicodeBody, ReferenceTimestamp, Base64Body, UnicodeBody, Request, \ - Response, NavigateError + Response, NavigateError, PageIdle from .logger import Logger, Consumer from .devtools import Crashed, Process @@ -79,29 +79,6 @@ async def test_invalidurl (loader): else: pytest.skip (f'host {host} resolved to {resolved}') -@pytest.mark.asyncio -async def test_varchangeevent (): - e = VarChangeEvent (True) - assert e.get () == True - - # no change at all - w = asyncio.ensure_future (e.wait ()) - finished, pending = await asyncio.wait ([w], timeout=0.1) - assert not finished and pending - - # no change - e.set (True) - finished, pending = await asyncio.wait ([w], timeout=0.1) - assert not finished and pending - - # changed - e.set (False) - await asyncio.sleep (0.1) # XXX: is there a yield() ? - assert w.done () - ret = w.result () - assert ret == False - assert e.get () == ret - timestamp = st.one_of ( st.integers(min_value=0, max_value=2**32-1), st.floats (min_value=0, max_value=2**32-1), @@ -365,7 +342,10 @@ async def test_integration_item (loader, golden): await loader.navigate (golden.url) it = loader.__aiter__ () - item = await it.__anext__ () + while True: + item = await it.__anext__ () + if isinstance (item, RequestResponsePair): + break # we do not know this in advance item.request.initiator = None @@ -385,3 +365,9 @@ async def test_integration_item (loader, golden): finally: await runner.cleanup () +def test_page_idle (): + for v in (True, False): + idle = PageIdle (v) + assert bool (idle) == v + + diff --git a/crocoite/test_controller.py b/crocoite/test_controller.py index 6f92e23..fa478a1 100644 --- a/crocoite/test_controller.py +++ b/crocoite/test_controller.py @@ -26,7 +26,9 @@ from aiohttp import web import pytest from .logger import Logger -from .controller import ControllerSettings, SinglePageController, SetEntry +from .controller import ControllerSettings, SinglePageController, SetEntry, \ + IdleStateTracker +from .browser import PageIdle from .devtools import Process from .test_browser import loader @@ -63,10 +65,13 @@ window.setInterval (function () { fetch('/').then (function (e) { console.log (e # hard-coded asyncio.sleep calls in there right now. # XXX fix this before = loop.time () - await asyncio.wait_for (controller.run (), settings.timeout*2) + await asyncio.wait_for (controller.run (), timeout=settings.timeout*2) after = loop.time () - assert after-before >= settings.timeout + assert after-before >= settings.timeout, (settings.timeout*2, after-before) finally: + # give the browser some time to close before interrupting the + # connection by destroying the HTTP server + await asyncio.sleep (1) await runner.cleanup () @pytest.mark.asyncio @@ -117,3 +122,32 @@ def test_set_entry (): assert a != c assert hash (a) != hash (c) +@pytest.mark.asyncio +async def test_idle_state_tracker (): + # default is idle + loop = asyncio.get_event_loop () + idle = IdleStateTracker (loop) + assert idle._idle + + # idle change + idle.push (PageIdle (False)) + assert not idle._idle + + # nothing happens for other objects + idle.push ({}) + assert not idle._idle + + # no state change -> wait does not return + with pytest.raises (asyncio.TimeoutError): + await asyncio.wait_for (idle.wait (0.1), timeout=1) + + # wait at least timeout + delta = 0.2 + timeout = 1 + idle.push (PageIdle (True)) + assert idle._idle + start = loop.time () + await idle.wait (timeout) + end = loop.time () + assert (timeout-delta) < (end-start) < (timeout+delta) + -- cgit v1.2.3