From 60fe79f2d898757f4f20aa89015e86cd63ef7871 Mon Sep 17 00:00:00 2001
From: Lars-Dominik Braun <lars@6xq.net>
Date: Sat, 3 Nov 2018 13:32:38 +0100
Subject: Switch site loader to async DevTools communication

---
 crocoite/browser.py      | 233 +++++++++++++++++++++++++----------------------
 crocoite/test_browser.py | 232 ++++++++++++++++++++++------------------------
 2 files changed, 236 insertions(+), 229 deletions(-)

diff --git a/crocoite/browser.py b/crocoite/browser.py
index b5ea4e3..515d06b 100644
--- a/crocoite/browser.py
+++ b/crocoite/browser.py
@@ -22,30 +22,32 @@
 Chrome browser interactions.
 """
 
+import asyncio
 from urllib.parse import urlsplit
 from base64 import b64decode
 from collections import deque
 from threading import Event
 from http.server import BaseHTTPRequestHandler
-from .logger import Level
 
-import pychrome
+from .logger import Level
+from .devtools import Browser, TabException
 
 class Item:
     """
     Simple wrapper containing Chrome request and response
     """
 
-    __slots__ = ('tab', 'chromeRequest', 'chromeResponse', 'chromeFinished',
-            'isRedirect', 'failed')
+    __slots__ = ('chromeRequest', 'chromeResponse', 'chromeFinished',
+            'isRedirect', 'failed', 'body', 'requestBody')
 
     def __init__ (self, tab):
-        self.tab = tab
         self.chromeRequest = {}
         self.chromeResponse = {}
         self.chromeFinished = {}
         self.isRedirect = False
         self.failed = False
+        self.body = None
+        self.requestBody = None
 
     def __repr__ (self):
         return '<Item {}>'.format (self.url)
@@ -78,36 +80,6 @@ class Item:
     def parsedUrl (self):
         return urlsplit (self.url)
 
-    @property
-    def body (self):
-        """ Return response body or None """
-        try:
-            body = self.tab.Network.getResponseBody (requestId=self.id, _timeout=10)
-            rawBody = body['body']
-            base64Encoded = body['base64Encoded']
-            if base64Encoded:
-                rawBody = b64decode (rawBody)
-            else:
-                rawBody = rawBody.encode ('utf8')
-            return rawBody, base64Encoded
-        except (pychrome.exceptions.CallMethodException, pychrome.exceptions.TimeoutException):
-            raise ValueError ('Cannot fetch response body')
-
-    @property
-    def requestBody (self):
-        """ Get request/POST body """
-        req = self.request
-        postData = req.get ('postData')
-        if postData:
-            return postData.encode ('utf8'), False
-        elif req.get ('hasPostData', False):
-            try:
-                postData = self.tab.Network.getRequestPostData (requestId=self.id, _timeout=10)['postData']
-                return b64decode (postData), True
-            except (pychrome.exceptions.CallMethodException, pychrome.exceptions.TimeoutException):
-                raise ValueError ('Cannot fetch request body')
-        return None, False
-
     @property
     def requestHeaders (self):
         # the response object may contain refined headers, which were
@@ -153,87 +125,131 @@ class Item:
     def setFinished (self, finished):
         self.chromeFinished = finished
 
-class BrowserCrashed (Exception):
-    pass
+    async def prefetchRequestBody (self, tab):
+        # request body
+        req = self.request
+        postData = req.get ('postData')
+        if postData:
+            self.requestBody = postData.encode ('utf8'), False
+        elif req.get ('hasPostData', False):
+            try:
+                postData = await tab.Network.getRequestPostData (requestId=self.id)
+                postData = postData['postData']
+                self.requestBody = b64decode (postData), True
+            except TabException:
+                self.requestBody = None
+        else:
+            self.requestBody = None, False
+
+    async def prefetchResponseBody (self, tab):
+        # get response body
+        try:
+            body = await tab.Network.getResponseBody (requestId=self.id)
+            rawBody = body['body']
+            base64Encoded = body['base64Encoded']
+            if base64Encoded:
+                rawBody = b64decode (rawBody)
+            else:
+                rawBody = rawBody.encode ('utf8')
+            self.body = rawBody, base64Encoded
+        except TabException:
+            self.body = None
 
 class SiteLoader:
     """
     Load site in Chrome and monitor network requests
 
-    Chrome’s raw devtools events are preprocessed here (asynchronously, in a
-    different thread, spawned by pychrome) and put into a deque. There
-    are two reasons for this: First of all, it makes consumer exception
-    handling alot easier (no need to propagate them to the main thread). And
-    secondly, browser crashes must be handled before everything else, as they
-    result in a loss of communication with the browser itself (i.e. we can’t
-    fetch a resource’s body any more).
-
     XXX: track popup windows/new tabs and close them
     """
 
-    __slots__ = ('requests', 'browser', 'url', 'logger', 'queue', 'notify', 'tab')
+    __slots__ = ('requests', 'browser', 'url', 'logger', 'tab', '_iterRunning')
     allowedSchemes = {'http', 'https'}
 
     def __init__ (self, browser, url, logger):
         self.requests = {}
-        self.browser = pychrome.Browser (url=browser)
+        self.browser = Browser (url=browser)
         self.url = url
         self.logger = logger.bind (context=type (self).__name__, url=url)
-        self.queue = deque ()
-        self.notify = Event ()
+        self._iterRunning = []
 
-    def __enter__ (self):
-        tab = self.tab = self.browser.new_tab()
-        # setup callbacks
-        tab.Network.requestWillBeSent = self._requestWillBeSent
-        tab.Network.responseReceived = self._responseReceived
-        tab.Network.loadingFinished = self._loadingFinished
-        tab.Network.loadingFailed = self._loadingFailed
-        tab.Log.entryAdded = self._entryAdded
-        tab.Page.javascriptDialogOpening = self._javascriptDialogOpening
-        tab.Inspector.targetCrashed = self._targetCrashed
-
-        # start the tab
-        tab.start()
+    async def __aenter__ (self):
+        tab = self.tab = await self.browser.__aenter__ ()
 
         # enable events
-        tab.Log.enable ()
-        tab.Network.enable()
-        tab.Page.enable ()
-        tab.Inspector.enable ()
-        tab.Network.clearBrowserCache ()
-        if tab.Network.canClearBrowserCookies ()['result']:
-            tab.Network.clearBrowserCookies ()
+        await asyncio.gather (*[
+                tab.Log.enable (),
+                tab.Network.enable(),
+                tab.Page.enable (),
+                tab.Inspector.enable (),
+                tab.Network.clearBrowserCache (),
+                ])
+        resp = await tab.Network.canClearBrowserCookies ()
+        if resp['result']:
+            await tab.Network.clearBrowserCookies ()
 
         return self
 
-    def __exit__ (self, exc_type, exc_value, traceback):
-        self.tab.Page.stopLoading ()
-        self.tab.stop ()
-        self.browser.close_tab(self.tab)
+    async def __aexit__ (self, exc_type, exc_value, traceback):
+        for task in self._iterRunning:
+            # ignore any results from stuff we did not end up using anyway
+            if not task.done ():
+                task.cancel ()
+        self._iterRunning = []
+        await self.browser.__aexit__ (exc_type, exc_value, traceback)
+        self.tab = None
         return False
 
     def __len__ (self):
         return len (self.requests)
 
-    def __iter__ (self):
-        return iter (self.queue)
-
-    def start (self):
-        self.tab.Page.navigate(url=self.url)
-
-    # use event to signal presence of new items. This way the controller
-    # can wait for them without polling.
-    def _append (self, item):
-        self.queue.append (item)
-        self.notify.set ()
-
-    def _appendleft (self, item):
-        self.queue.appendleft (item)
-        self.notify.set ()
+    async def __aiter__ (self):
+        """ Retrieve network items """
+        tab = self.tab
+        assert tab is not None
+        handler = {
+                tab.Network.requestWillBeSent: self._requestWillBeSent,
+                tab.Network.responseReceived: self._responseReceived,
+                tab.Network.loadingFinished: self._loadingFinished,
+                tab.Network.loadingFailed: self._loadingFailed,
+                tab.Log.entryAdded: self._entryAdded,
+                tab.Page.javascriptDialogOpening: self._javascriptDialogOpening,
+                #tab.Inspector.targetCrashed: self._targetCrashed,
+                }
+
+        # The implementation is a little advanced. Why? The goal here is to
+        # process events from the tab as quickly as possible (i.e.
+        # asynchronously). We need to make sure that JavaScript dialogs are
+        # handled immediately for instance. Otherwise they stall every
+        # other request. Also, we don’t want to use an unbounded queue,
+        # since the items yielded can get quite big (response body). Thus
+        # we need to block (yield) for every item completed, but not
+        # handled by the consumer (caller).
+        running = self._iterRunning
+        running.append (asyncio.ensure_future (self.tab.get ()))
+        while True:
+            done, pending = await asyncio.wait (running, return_when=asyncio.FIRST_COMPLETED)
+            for t in done:
+                result = t.result ()
+                if result is None:
+                    pass
+                elif isinstance (result, Item):
+                    yield result
+                else:
+                    method, data = result
+                    f = handler.get (method, None)
+                    if f is not None:
+                        task = asyncio.ensure_future (f (**data))
+                        pending.add (task)
+                    pending.add (asyncio.ensure_future (self.tab.get ()))
+
+            running = pending
+            self._iterRunning = running
+
+    async def start (self):
+        await self.tab.Page.navigate(url=self.url)
 
     # internal chrome callbacks
-    def _requestWillBeSent (self, **kwargs):
+    async def _requestWillBeSent (self, **kwargs):
         reqId = kwargs['requestId']
         req = kwargs['request']
         logger = self.logger.bind (reqId=reqId, reqUrl=req['url'])
@@ -242,6 +258,7 @@ class SiteLoader:
         if url.scheme not in self.allowedSchemes:
             return
 
+        ret = None
         item = self.requests.get (reqId)
         if item:
             # redirects never “finish” loading, but yield another requestWillBeSent with this key set
@@ -254,7 +271,9 @@ class SiteLoader:
                 item.setFinished (resp)
                 item.isRedirect = True
                 logger.info ('redirect', uuid='85eaec41-e2a9-49c2-9445-6f19690278b8', target=req['url'])
-                self._append (item)
+                await item.prefetchRequestBody (self.tab)
+                # cannot fetch request body due to race condition (item id reused)
+                ret = item
             else:
                 logger.warning ('request exists', uuid='2c989142-ba00-4791-bb03-c2a14e91a56b')
 
@@ -263,7 +282,9 @@ class SiteLoader:
         self.requests[reqId] = item
         logger.debug ('request', uuid='55c17564-1bd0-4499-8724-fa7aad65478f')
 
-    def _responseReceived (self, **kwargs):
+        return ret
+
+    async def _responseReceived (self, **kwargs):
         reqId = kwargs['requestId']
         item = self.requests.get (reqId)
         if item is None:
@@ -278,7 +299,7 @@ class SiteLoader:
         else:
             logger.warning ('scheme forbidden', uuid='2ea6e5d7-dd3b-4881-b9de-156c1751c666')
 
-    def _loadingFinished (self, **kwargs):
+    async def _loadingFinished (self, **kwargs):
         """
         Item was fully loaded. For some items the request body is not available
         when responseReceived is fired, thus move everything here.
@@ -297,9 +318,10 @@ class SiteLoader:
         if url.scheme in self.allowedSchemes:
             logger.info ('finished', uuid='5a8b4bad-f86a-4fe6-a53e-8da4130d6a02')
             item.setFinished (kwargs)
-            self._append (item)
+            await asyncio.gather (item.prefetchRequestBody (self.tab), item.prefetchResponseBody (self.tab))
+            return item
 
-    def _loadingFailed (self, **kwargs):
+    async def _loadingFailed (self, **kwargs):
         reqId = kwargs['requestId']
         self.logger.warning ('loading failed',
                 uuid='68410f13-6eea-453e-924e-c1af4601748b',
@@ -307,9 +329,9 @@ class SiteLoader:
                 blockedReason=kwargs.get ('blockedReason'))
         item = self.requests.pop (reqId, None)
         item.failed = True
-        self._append (item)
+        return item
 
-    def _entryAdded (self, **kwargs):
+    async def _entryAdded (self, **kwargs):
         """ Log entry added """
         entry = kwargs['entry']
         level = {'verbose': Level.DEBUG, 'info': Level.INFO,
@@ -318,26 +340,22 @@ class SiteLoader:
         entry['uuid'] = 'e62ffb5a-0521-459c-a3d9-1124551934d2'
         self.logger (level, 'console', **entry)
 
-    def _javascriptDialogOpening (self, **kwargs):
+    async def _javascriptDialogOpening (self, **kwargs):
         t = kwargs.get ('type')
         if t in {'alert', 'confirm', 'prompt'}:
             self.logger.info ('js dialog',
                     uuid='d6f07ce2-648e-493b-a1df-f353bed27c84',
                     action='cancel', type=t, message=kwargs.get ('message'))
-            self.tab.Page.handleJavaScriptDialog (accept=False)
+            await self.tab.Page.handleJavaScriptDialog (accept=False)
         elif t == 'beforeunload':
             # we must accept this one, otherwise the page will not unload/close
             self.logger.info ('js dialog',
                     uuid='96399b99-9834-4c8f-bd93-cb9fa2225abd',
                     action='proceed', type=t, message=kwargs.get ('message'))
-            self.tab.Page.handleJavaScriptDialog (accept=True)
-        else:
-            self.logger.warning ('js dialog unknown', uuid='3ef7292e-8595-4e89-b834-0cc6bc40ee38', **kwargs)
-
-    def _targetCrashed (self, **kwargs):
-        self.logger.error ('browser crashed', uuid='6fe2b3be-ff01-4503-b30c-ad6aeea953ef')
-        # priority message
-        self._appendleft (BrowserCrashed ())
+            await self.tab.Page.handleJavaScriptDialog (accept=True)
+        else: # pragma: no cover
+            self.logger.warning ('js dialog unknown',
+                    uuid='3ef7292e-8595-4e89-b834-0cc6bc40ee38', **kwargs)
 
 import subprocess, os, time
 from tempfile import mkdtemp
@@ -397,6 +415,7 @@ class ChromeService:
         self.p.wait ()
         shutil.rmtree (self.userDataDir)
         self.p = None
+        return False
 
 class NullService:
     __slots__ = ('url')
@@ -408,5 +427,5 @@ class NullService:
         return self.url
 
     def __exit__ (self, *exc):
-        pass
+        return False
 
diff --git a/crocoite/test_browser.py b/crocoite/test_browser.py
index 5c7fc69..030ffb1 100644
--- a/crocoite/test_browser.py
+++ b/crocoite/test_browser.py
@@ -18,13 +18,19 @@
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 # THE SOFTWARE.
 
+import logging
+import asyncio
 import pytest
 from operator import itemgetter
+from aiohttp import web
 from http.server import BaseHTTPRequestHandler
-from pychrome.exceptions import TimeoutException
 
-from .browser import Item, SiteLoader, ChromeService, NullService, BrowserCrashed
-from .logger import Logger, Consumer
+from .browser import Item, SiteLoader, ChromeService, NullService
+from .logger import Logger, Consumer, JsonPrintConsumer
+from .devtools import Crashed
+
+# if you want to know what’s going on:
+#logging.basicConfig(level=logging.DEBUG)
 
 class TItem (Item):
     """ This should be as close to Item as possible """
@@ -32,21 +38,14 @@ class TItem (Item):
     __slots__ = ('bodySend', '_body', '_requestBody')
     base = 'http://localhost:8000/'
 
-    def __init__ (self, path, status, headers, bodyReceive, bodySend=None, requestBody=None, failed=False):
+    def __init__ (self, path, status, headers, bodyReceive, bodySend=None, requestBody=None, failed=False, isRedirect=False):
         super ().__init__ (tab=None)
         self.chromeResponse = {'response': {'headers': headers, 'status': status, 'url': self.base + path}}
-        self._body = bodyReceive, False
+        self.body = bodyReceive, False
         self.bodySend = bodyReceive if not bodySend else bodySend
-        self._requestBody = requestBody, False
+        self.requestBody = requestBody, False
         self.failed = failed
-
-    @property
-    def body (self):
-        return self._body
-
-    @property
-    def requestBody (self):
-        return self._requestBody
+        self.isRedirect = isRedirect
 
 testItems = [
     TItem ('binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00\x01\x02', failed=True),
@@ -66,15 +65,15 @@ testItems = [
     TItem ('image', 200, {'Content-Type': 'image/png'},
             # 1×1 png image
             b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'),
-    TItem ('empty', 200, {}, b''),
-    TItem ('redirect/301/empty', 301, {'Location': '/empty'}, b''),
-    TItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b''),
+    TItem ('empty', 200, {'Content-Type': 'text/plain'}, b''),
+    TItem ('redirect/301/empty', 301, {'Location': '/empty'}, b'', isRedirect=True),
+    TItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b'', isRedirect=True),
     TItem ('nonexistent', 404, {}, b''),
-    TItem ('html', 200, {'Content-Type': 'html'},
+    TItem ('html', 200, {'Content-Type': 'text/html'},
             '<html><body><img src="/image"><img src="/nonexistent"></body></html>'.encode ('utf8')),
-    TItem ('html/alert', 200, {'Content-Type': 'html'},
-            '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><img src="/image"></body></html>'.encode ('utf8')),
-    TItem ('html/fetchPost', 200, {'Content-Type': 'html'},
+    TItem ('html/alert', 200, {'Content-Type': 'text/html'},
+            '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><script>document.write(\'<img src="/image">\');</script></body></html>'.encode ('utf8')),
+    TItem ('html/fetchPost', 200, {'Content-Type': 'text/html'},
             r"""<html><body><script>
             let a = fetch("/html/fetchPost/binary", {"method": "POST", "body": "\x00"});
             let b = fetch("/html/fetchPost/form", {"method": "POST", "body": new URLSearchParams({"data": "!"})});
@@ -89,156 +88,145 @@ testItems = [
     ]
 testItemMap = dict ([(item.parsedUrl.path, item) for item in testItems])
 
-class RequestHandler (BaseHTTPRequestHandler):
-    def do_GET(self):
-        item = testItemMap.get (self.path)
-        if item:
-            self.send_response (item.response['status'])
-            for k, v in item.response['headers'].items ():
-                self.send_header (k, v)
-            body = item.bodySend
-            self.send_header ('Content-Length', len (body))
-            self.end_headers()
-            self.wfile.write (body)
-        return
-
-    do_POST = do_GET
-
-    def log_message (self, format, *args):
-        pass
+def itemToResponse (item):
+    async def f (req):
+        headers = item.response['headers']
+        return web.Response(body=item.bodySend, status=item.response['status'],
+                headers=headers)
+    return f
 
 @pytest.fixture
-def http ():
-    def run ():
-        import http.server
-        PORT = 8000
-        httpd = http.server.HTTPServer (("localhost", PORT), RequestHandler)
-        print ('starting http server')
-        httpd.serve_forever()
-
-    from multiprocessing import Process
-    p = Process (target=run)
-    p.start ()
-    yield p
-    p.terminate ()
-    p.join ()
+async def server ():
+    """ Simple HTTP server for testing notifications """
+    import logging
+    logging.basicConfig(level=logging.DEBUG)
+    app = web.Application(debug=True)
+    for item in testItems:
+        app.router.add_route ('*', item.parsedUrl.path, itemToResponse (item))
+    runner = web.AppRunner(app)
+    await runner.setup()
+    site = web.TCPSite(runner, 'localhost', 8080)
+    await site.start()
+    yield app
+    await runner.cleanup ()
 
 class AssertConsumer (Consumer):
     def __call__ (self, **kwargs):
         assert 'uuid' in kwargs
         assert 'msg' in kwargs
         assert 'context' in kwargs
+        return kwargs
 
 @pytest.fixture
 def logger ():
     return Logger (consumer=[AssertConsumer ()])
 
 @pytest.fixture
-def loader (http, logger):
+def loader (server, logger):
     def f (path):
         if path.startswith ('/'):
-            path = 'http://localhost:8000{}'.format (path)
+            path = 'http://localhost:8080{}'.format (path)
         return SiteLoader (browser, path, logger)
-    print ('loader setup')
     with ChromeService () as browser:
         yield f
-    print ('loader teardown')
 
-def itemsLoaded (l, items):
+async def itemsLoaded (l, items):
     items = dict ([(i.parsedUrl.path, i) for i in items])
-    timeout = 5
-    while True:
-        if not l.notify.wait (timeout) and len (items) > 0:
-            assert False, 'timeout'
-        if len (l.queue) > 0:
-            item = l.queue.popleft ()
-            if isinstance (item, Exception):
-                raise item
-            assert item.chromeResponse is not None
-            golden = items.pop (item.parsedUrl.path)
-            if not golden:
-                assert False, 'url {} not supposed to be fetched'.format (item.url)
-            assert item.failed == golden.failed
-            if item.failed:
-                # response will be invalid if request failed
+    async for item in l:
+        assert item.chromeResponse is not None
+        golden = items.pop (item.parsedUrl.path)
+        if not golden:
+            assert False, 'url {} not supposed to be fetched'.format (item.url)
+        assert item.failed == golden.failed
+        if item.failed:
+            # response will be invalid if request failed
+            if not items:
+                break
+            else:
                 continue
+        assert item.isRedirect == golden.isRedirect
+        if golden.isRedirect:
+            assert item.body is None
+        else:
             assert item.body[0] == golden.body[0]
-            assert item.requestBody[0] == golden.requestBody[0]
-            assert item.response['status'] == golden.response['status']
-            assert item.statusText == BaseHTTPRequestHandler.responses.get (item.response['status'])[0]
-            for k, v in golden.responseHeaders:
-                actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders)))
-                assert v in actual
-
-        # check queue at least once
+        assert item.requestBody[0] == golden.requestBody[0]
+        assert item.response['status'] == golden.response['status']
+        assert item.statusText == BaseHTTPRequestHandler.responses.get (item.response['status'])[0]
+        for k, v in golden.responseHeaders:
+            actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders)))
+            assert v in actual
+        
+        # we’re done when everything has been loaded
         if not items:
             break
 
-def literalItem (lf, item, deps=[]):
-    with lf (item.parsedUrl.path) as l:
-        l.start ()
-        itemsLoaded (l, [item] + deps)
+async def literalItem (lf, item, deps=[]):
+    async with lf (item.parsedUrl.path) as l:
+        await l.start ()
+        await asyncio.wait_for (itemsLoaded (l, [item] + deps), timeout=30)
 
-def test_empty (loader):
-    literalItem (loader, testItemMap['/empty'])
+@pytest.mark.asyncio
+async def test_empty (loader):
+    await literalItem (loader, testItemMap['/empty'])
 
-def test_redirect (loader):
-    literalItem (loader, testItemMap['/redirect/301/empty'], [testItemMap['/empty']])
+@pytest.mark.asyncio
+async def test_redirect (loader):
+    await literalItem (loader, testItemMap['/redirect/301/empty'], [testItemMap['/empty']])
     # chained redirects
-    literalItem (loader, testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']])
+    await literalItem (loader, testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']])
 
-def test_encoding (loader):
+@pytest.mark.asyncio
+async def test_encoding (loader):
     """ Text responses are transformed to UTF-8. Make sure this works
     correctly. """
     for item in {testItemMap['/encoding/utf8'], testItemMap['/encoding/latin1'], testItemMap['/encoding/iso88591']}:
-        literalItem (loader, item)
+        await literalItem (loader, item)
 
-def test_binary (loader):
+@pytest.mark.asyncio
+async def test_binary (loader):
     """ Browser should ignore content it cannot display (i.e. octet-stream) """
-    literalItem (loader, testItemMap['/binary'])
+    await literalItem (loader, testItemMap['/binary'])
 
-def test_image (loader):
+@pytest.mark.asyncio
+async def test_image (loader):
     """ Images should be displayed inline """
-    literalItem (loader, testItemMap['/image'])
+    await literalItem (loader, testItemMap['/image'])
 
-def test_attachment (loader):
+@pytest.mark.asyncio
+async def test_attachment (loader):
     """ And downloads won’t work in headless mode, even if it’s just a text file """
-    literalItem (loader, testItemMap['/attachment'])
+    await literalItem (loader, testItemMap['/attachment'])
 
-def test_html (loader):
-    literalItem (loader, testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']])
+@pytest.mark.asyncio
+async def test_html (loader):
+    await literalItem (loader, testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']])
     # make sure alerts are dismissed correctly (image won’t load otherwise)
-    literalItem (loader, testItemMap['/html/alert'], [testItemMap['/image']])
+    await literalItem (loader, testItemMap['/html/alert'], [testItemMap['/image']])
 
-def test_post (loader):
+@pytest.mark.asyncio
+async def test_post (loader):
     """ XHR POST request with binary data"""
-    literalItem (loader, testItemMap['/html/fetchPost'],
+    await literalItem (loader, testItemMap['/html/fetchPost'],
             [testItemMap['/html/fetchPost/binary'],
             testItemMap['/html/fetchPost/binary/large'],
             testItemMap['/html/fetchPost/form'],
             testItemMap['/html/fetchPost/form/large']])
 
-def test_crash (loader):
-    with loader ('/html') as l:
-        l.start ()
-        try:
-            l.tab.Page.crash (_timeout=1)
-        except TimeoutException:
-            pass
-        q = l.queue
-        assert isinstance (q.popleft (), BrowserCrashed)
-
-def test_invalidurl (loader):
-    url = 'http://nonexistent.example/'
-    with loader (url) as l:
-        l.start ()
-
-        q = l.queue
-        if not l.notify.wait (10):
-            assert False, 'timeout'
+@pytest.mark.asyncio
+async def test_crash (loader):
+    async with loader ('/html') as l:
+        await l.start ()
+        with pytest.raises (Crashed):
+            await l.tab.Page.crash ()
 
-        it = q.popleft ()
-        assert it.failed
+@pytest.mark.asyncio
+async def test_invalidurl (loader):
+    url = 'http://nonexistent.example/'
+    async with loader (url) as l:
+        await l.start ()
+        async for it in l:
+            assert it.failed
+            break
 
 def test_nullservice ():
     """ Null service returns the url as is """
-- 
cgit v1.2.3