summaryrefslogtreecommitdiff
path: root/crocoite
diff options
context:
space:
mode:
authorLars-Dominik Braun <lars@6xq.net>2018-11-03 13:32:38 +0100
committerLars-Dominik Braun <lars@6xq.net>2018-11-06 16:54:39 +0100
commit60fe79f2d898757f4f20aa89015e86cd63ef7871 (patch)
treee36e42b54dd4e67787930fc2a5918b010ec5523e /crocoite
parent89d5e6bcf4e3a2f6e0ed0e222e15cc80604f7351 (diff)
downloadcrocoite-60fe79f2d898757f4f20aa89015e86cd63ef7871.tar.gz
crocoite-60fe79f2d898757f4f20aa89015e86cd63ef7871.tar.bz2
crocoite-60fe79f2d898757f4f20aa89015e86cd63ef7871.zip
Switch site loader to async DevTools communication
Diffstat (limited to 'crocoite')
-rw-r--r--crocoite/browser.py233
-rw-r--r--crocoite/test_browser.py232
2 files changed, 236 insertions, 229 deletions
diff --git a/crocoite/browser.py b/crocoite/browser.py
index b5ea4e3..515d06b 100644
--- a/crocoite/browser.py
+++ b/crocoite/browser.py
@@ -22,30 +22,32 @@
Chrome browser interactions.
"""
+import asyncio
from urllib.parse import urlsplit
from base64 import b64decode
from collections import deque
from threading import Event
from http.server import BaseHTTPRequestHandler
-from .logger import Level
-import pychrome
+from .logger import Level
+from .devtools import Browser, TabException
class Item:
"""
Simple wrapper containing Chrome request and response
"""
- __slots__ = ('tab', 'chromeRequest', 'chromeResponse', 'chromeFinished',
- 'isRedirect', 'failed')
+ __slots__ = ('chromeRequest', 'chromeResponse', 'chromeFinished',
+ 'isRedirect', 'failed', 'body', 'requestBody')
def __init__ (self, tab):
- self.tab = tab
self.chromeRequest = {}
self.chromeResponse = {}
self.chromeFinished = {}
self.isRedirect = False
self.failed = False
+ self.body = None
+ self.requestBody = None
def __repr__ (self):
return '<Item {}>'.format (self.url)
@@ -79,36 +81,6 @@ class Item:
return urlsplit (self.url)
@property
- def body (self):
- """ Return response body or None """
- try:
- body = self.tab.Network.getResponseBody (requestId=self.id, _timeout=10)
- rawBody = body['body']
- base64Encoded = body['base64Encoded']
- if base64Encoded:
- rawBody = b64decode (rawBody)
- else:
- rawBody = rawBody.encode ('utf8')
- return rawBody, base64Encoded
- except (pychrome.exceptions.CallMethodException, pychrome.exceptions.TimeoutException):
- raise ValueError ('Cannot fetch response body')
-
- @property
- def requestBody (self):
- """ Get request/POST body """
- req = self.request
- postData = req.get ('postData')
- if postData:
- return postData.encode ('utf8'), False
- elif req.get ('hasPostData', False):
- try:
- postData = self.tab.Network.getRequestPostData (requestId=self.id, _timeout=10)['postData']
- return b64decode (postData), True
- except (pychrome.exceptions.CallMethodException, pychrome.exceptions.TimeoutException):
- raise ValueError ('Cannot fetch request body')
- return None, False
-
- @property
def requestHeaders (self):
# the response object may contain refined headers, which were
# *actually* sent over the wire
@@ -153,87 +125,131 @@ class Item:
def setFinished (self, finished):
self.chromeFinished = finished
-class BrowserCrashed (Exception):
- pass
+ async def prefetchRequestBody (self, tab):
+ # request body
+ req = self.request
+ postData = req.get ('postData')
+ if postData:
+ self.requestBody = postData.encode ('utf8'), False
+ elif req.get ('hasPostData', False):
+ try:
+ postData = await tab.Network.getRequestPostData (requestId=self.id)
+ postData = postData['postData']
+ self.requestBody = b64decode (postData), True
+ except TabException:
+ self.requestBody = None
+ else:
+ self.requestBody = None, False
+
+ async def prefetchResponseBody (self, tab):
+ # get response body
+ try:
+ body = await tab.Network.getResponseBody (requestId=self.id)
+ rawBody = body['body']
+ base64Encoded = body['base64Encoded']
+ if base64Encoded:
+ rawBody = b64decode (rawBody)
+ else:
+ rawBody = rawBody.encode ('utf8')
+ self.body = rawBody, base64Encoded
+ except TabException:
+ self.body = None
class SiteLoader:
"""
Load site in Chrome and monitor network requests
- Chrome’s raw devtools events are preprocessed here (asynchronously, in a
- different thread, spawned by pychrome) and put into a deque. There
- are two reasons for this: First of all, it makes consumer exception
- handling alot easier (no need to propagate them to the main thread). And
- secondly, browser crashes must be handled before everything else, as they
- result in a loss of communication with the browser itself (i.e. we can’t
- fetch a resource’s body any more).
-
XXX: track popup windows/new tabs and close them
"""
- __slots__ = ('requests', 'browser', 'url', 'logger', 'queue', 'notify', 'tab')
+ __slots__ = ('requests', 'browser', 'url', 'logger', 'tab', '_iterRunning')
allowedSchemes = {'http', 'https'}
def __init__ (self, browser, url, logger):
self.requests = {}
- self.browser = pychrome.Browser (url=browser)
+ self.browser = Browser (url=browser)
self.url = url
self.logger = logger.bind (context=type (self).__name__, url=url)
- self.queue = deque ()
- self.notify = Event ()
+ self._iterRunning = []
- def __enter__ (self):
- tab = self.tab = self.browser.new_tab()
- # setup callbacks
- tab.Network.requestWillBeSent = self._requestWillBeSent
- tab.Network.responseReceived = self._responseReceived
- tab.Network.loadingFinished = self._loadingFinished
- tab.Network.loadingFailed = self._loadingFailed
- tab.Log.entryAdded = self._entryAdded
- tab.Page.javascriptDialogOpening = self._javascriptDialogOpening
- tab.Inspector.targetCrashed = self._targetCrashed
-
- # start the tab
- tab.start()
+ async def __aenter__ (self):
+ tab = self.tab = await self.browser.__aenter__ ()
# enable events
- tab.Log.enable ()
- tab.Network.enable()
- tab.Page.enable ()
- tab.Inspector.enable ()
- tab.Network.clearBrowserCache ()
- if tab.Network.canClearBrowserCookies ()['result']:
- tab.Network.clearBrowserCookies ()
+ await asyncio.gather (*[
+ tab.Log.enable (),
+ tab.Network.enable(),
+ tab.Page.enable (),
+ tab.Inspector.enable (),
+ tab.Network.clearBrowserCache (),
+ ])
+ resp = await tab.Network.canClearBrowserCookies ()
+ if resp['result']:
+ await tab.Network.clearBrowserCookies ()
return self
- def __exit__ (self, exc_type, exc_value, traceback):
- self.tab.Page.stopLoading ()
- self.tab.stop ()
- self.browser.close_tab(self.tab)
+ async def __aexit__ (self, exc_type, exc_value, traceback):
+ for task in self._iterRunning:
+ # ignore any results from stuff we did not end up using anyway
+ if not task.done ():
+ task.cancel ()
+ self._iterRunning = []
+ await self.browser.__aexit__ (exc_type, exc_value, traceback)
+ self.tab = None
return False
def __len__ (self):
return len (self.requests)
- def __iter__ (self):
- return iter (self.queue)
-
- def start (self):
- self.tab.Page.navigate(url=self.url)
-
- # use event to signal presence of new items. This way the controller
- # can wait for them without polling.
- def _append (self, item):
- self.queue.append (item)
- self.notify.set ()
-
- def _appendleft (self, item):
- self.queue.appendleft (item)
- self.notify.set ()
+ async def __aiter__ (self):
+ """ Retrieve network items """
+ tab = self.tab
+ assert tab is not None
+ handler = {
+ tab.Network.requestWillBeSent: self._requestWillBeSent,
+ tab.Network.responseReceived: self._responseReceived,
+ tab.Network.loadingFinished: self._loadingFinished,
+ tab.Network.loadingFailed: self._loadingFailed,
+ tab.Log.entryAdded: self._entryAdded,
+ tab.Page.javascriptDialogOpening: self._javascriptDialogOpening,
+ #tab.Inspector.targetCrashed: self._targetCrashed,
+ }
+
+ # The implementation is a little advanced. Why? The goal here is to
+ # process events from the tab as quickly as possible (i.e.
+ # asynchronously). We need to make sure that JavaScript dialogs are
+ # handled immediately for instance. Otherwise they stall every
+ # other request. Also, we don’t want to use an unbounded queue,
+ # since the items yielded can get quite big (response body). Thus
+ # we need to block (yield) for every item completed, but not
+ # handled by the consumer (caller).
+ running = self._iterRunning
+ running.append (asyncio.ensure_future (self.tab.get ()))
+ while True:
+ done, pending = await asyncio.wait (running, return_when=asyncio.FIRST_COMPLETED)
+ for t in done:
+ result = t.result ()
+ if result is None:
+ pass
+ elif isinstance (result, Item):
+ yield result
+ else:
+ method, data = result
+ f = handler.get (method, None)
+ if f is not None:
+ task = asyncio.ensure_future (f (**data))
+ pending.add (task)
+ pending.add (asyncio.ensure_future (self.tab.get ()))
+
+ running = pending
+ self._iterRunning = running
+
+ async def start (self):
+ await self.tab.Page.navigate(url=self.url)
# internal chrome callbacks
- def _requestWillBeSent (self, **kwargs):
+ async def _requestWillBeSent (self, **kwargs):
reqId = kwargs['requestId']
req = kwargs['request']
logger = self.logger.bind (reqId=reqId, reqUrl=req['url'])
@@ -242,6 +258,7 @@ class SiteLoader:
if url.scheme not in self.allowedSchemes:
return
+ ret = None
item = self.requests.get (reqId)
if item:
# redirects never “finish” loading, but yield another requestWillBeSent with this key set
@@ -254,7 +271,9 @@ class SiteLoader:
item.setFinished (resp)
item.isRedirect = True
logger.info ('redirect', uuid='85eaec41-e2a9-49c2-9445-6f19690278b8', target=req['url'])
- self._append (item)
+ await item.prefetchRequestBody (self.tab)
+ # cannot fetch request body due to race condition (item id reused)
+ ret = item
else:
logger.warning ('request exists', uuid='2c989142-ba00-4791-bb03-c2a14e91a56b')
@@ -263,7 +282,9 @@ class SiteLoader:
self.requests[reqId] = item
logger.debug ('request', uuid='55c17564-1bd0-4499-8724-fa7aad65478f')
- def _responseReceived (self, **kwargs):
+ return ret
+
+ async def _responseReceived (self, **kwargs):
reqId = kwargs['requestId']
item = self.requests.get (reqId)
if item is None:
@@ -278,7 +299,7 @@ class SiteLoader:
else:
logger.warning ('scheme forbidden', uuid='2ea6e5d7-dd3b-4881-b9de-156c1751c666')
- def _loadingFinished (self, **kwargs):
+ async def _loadingFinished (self, **kwargs):
"""
Item was fully loaded. For some items the request body is not available
when responseReceived is fired, thus move everything here.
@@ -297,9 +318,10 @@ class SiteLoader:
if url.scheme in self.allowedSchemes:
logger.info ('finished', uuid='5a8b4bad-f86a-4fe6-a53e-8da4130d6a02')
item.setFinished (kwargs)
- self._append (item)
+ await asyncio.gather (item.prefetchRequestBody (self.tab), item.prefetchResponseBody (self.tab))
+ return item
- def _loadingFailed (self, **kwargs):
+ async def _loadingFailed (self, **kwargs):
reqId = kwargs['requestId']
self.logger.warning ('loading failed',
uuid='68410f13-6eea-453e-924e-c1af4601748b',
@@ -307,9 +329,9 @@ class SiteLoader:
blockedReason=kwargs.get ('blockedReason'))
item = self.requests.pop (reqId, None)
item.failed = True
- self._append (item)
+ return item
- def _entryAdded (self, **kwargs):
+ async def _entryAdded (self, **kwargs):
""" Log entry added """
entry = kwargs['entry']
level = {'verbose': Level.DEBUG, 'info': Level.INFO,
@@ -318,26 +340,22 @@ class SiteLoader:
entry['uuid'] = 'e62ffb5a-0521-459c-a3d9-1124551934d2'
self.logger (level, 'console', **entry)
- def _javascriptDialogOpening (self, **kwargs):
+ async def _javascriptDialogOpening (self, **kwargs):
t = kwargs.get ('type')
if t in {'alert', 'confirm', 'prompt'}:
self.logger.info ('js dialog',
uuid='d6f07ce2-648e-493b-a1df-f353bed27c84',
action='cancel', type=t, message=kwargs.get ('message'))
- self.tab.Page.handleJavaScriptDialog (accept=False)
+ await self.tab.Page.handleJavaScriptDialog (accept=False)
elif t == 'beforeunload':
# we must accept this one, otherwise the page will not unload/close
self.logger.info ('js dialog',
uuid='96399b99-9834-4c8f-bd93-cb9fa2225abd',
action='proceed', type=t, message=kwargs.get ('message'))
- self.tab.Page.handleJavaScriptDialog (accept=True)
- else:
- self.logger.warning ('js dialog unknown', uuid='3ef7292e-8595-4e89-b834-0cc6bc40ee38', **kwargs)
-
- def _targetCrashed (self, **kwargs):
- self.logger.error ('browser crashed', uuid='6fe2b3be-ff01-4503-b30c-ad6aeea953ef')
- # priority message
- self._appendleft (BrowserCrashed ())
+ await self.tab.Page.handleJavaScriptDialog (accept=True)
+ else: # pragma: no cover
+ self.logger.warning ('js dialog unknown',
+ uuid='3ef7292e-8595-4e89-b834-0cc6bc40ee38', **kwargs)
import subprocess, os, time
from tempfile import mkdtemp
@@ -397,6 +415,7 @@ class ChromeService:
self.p.wait ()
shutil.rmtree (self.userDataDir)
self.p = None
+ return False
class NullService:
__slots__ = ('url')
@@ -408,5 +427,5 @@ class NullService:
return self.url
def __exit__ (self, *exc):
- pass
+ return False
diff --git a/crocoite/test_browser.py b/crocoite/test_browser.py
index 5c7fc69..030ffb1 100644
--- a/crocoite/test_browser.py
+++ b/crocoite/test_browser.py
@@ -18,13 +18,19 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
+import logging
+import asyncio
import pytest
from operator import itemgetter
+from aiohttp import web
from http.server import BaseHTTPRequestHandler
-from pychrome.exceptions import TimeoutException
-from .browser import Item, SiteLoader, ChromeService, NullService, BrowserCrashed
-from .logger import Logger, Consumer
+from .browser import Item, SiteLoader, ChromeService, NullService
+from .logger import Logger, Consumer, JsonPrintConsumer
+from .devtools import Crashed
+
+# if you want to know what’s going on:
+#logging.basicConfig(level=logging.DEBUG)
class TItem (Item):
""" This should be as close to Item as possible """
@@ -32,21 +38,14 @@ class TItem (Item):
__slots__ = ('bodySend', '_body', '_requestBody')
base = 'http://localhost:8000/'
- def __init__ (self, path, status, headers, bodyReceive, bodySend=None, requestBody=None, failed=False):
+ def __init__ (self, path, status, headers, bodyReceive, bodySend=None, requestBody=None, failed=False, isRedirect=False):
super ().__init__ (tab=None)
self.chromeResponse = {'response': {'headers': headers, 'status': status, 'url': self.base + path}}
- self._body = bodyReceive, False
+ self.body = bodyReceive, False
self.bodySend = bodyReceive if not bodySend else bodySend
- self._requestBody = requestBody, False
+ self.requestBody = requestBody, False
self.failed = failed
-
- @property
- def body (self):
- return self._body
-
- @property
- def requestBody (self):
- return self._requestBody
+ self.isRedirect = isRedirect
testItems = [
TItem ('binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00\x01\x02', failed=True),
@@ -66,15 +65,15 @@ testItems = [
TItem ('image', 200, {'Content-Type': 'image/png'},
# 1×1 png image
b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'),
- TItem ('empty', 200, {}, b''),
- TItem ('redirect/301/empty', 301, {'Location': '/empty'}, b''),
- TItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b''),
+ TItem ('empty', 200, {'Content-Type': 'text/plain'}, b''),
+ TItem ('redirect/301/empty', 301, {'Location': '/empty'}, b'', isRedirect=True),
+ TItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b'', isRedirect=True),
TItem ('nonexistent', 404, {}, b''),
- TItem ('html', 200, {'Content-Type': 'html'},
+ TItem ('html', 200, {'Content-Type': 'text/html'},
'<html><body><img src="/image"><img src="/nonexistent"></body></html>'.encode ('utf8')),
- TItem ('html/alert', 200, {'Content-Type': 'html'},
- '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><img src="/image"></body></html>'.encode ('utf8')),
- TItem ('html/fetchPost', 200, {'Content-Type': 'html'},
+ TItem ('html/alert', 200, {'Content-Type': 'text/html'},
+ '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><script>document.write(\'<img src="/image">\');</script></body></html>'.encode ('utf8')),
+ TItem ('html/fetchPost', 200, {'Content-Type': 'text/html'},
r"""<html><body><script>
let a = fetch("/html/fetchPost/binary", {"method": "POST", "body": "\x00"});
let b = fetch("/html/fetchPost/form", {"method": "POST", "body": new URLSearchParams({"data": "!"})});
@@ -89,156 +88,145 @@ testItems = [
]
testItemMap = dict ([(item.parsedUrl.path, item) for item in testItems])
-class RequestHandler (BaseHTTPRequestHandler):
- def do_GET(self):
- item = testItemMap.get (self.path)
- if item:
- self.send_response (item.response['status'])
- for k, v in item.response['headers'].items ():
- self.send_header (k, v)
- body = item.bodySend
- self.send_header ('Content-Length', len (body))
- self.end_headers()
- self.wfile.write (body)
- return
-
- do_POST = do_GET
-
- def log_message (self, format, *args):
- pass
+def itemToResponse (item):
+ async def f (req):
+ headers = item.response['headers']
+ return web.Response(body=item.bodySend, status=item.response['status'],
+ headers=headers)
+ return f
@pytest.fixture
-def http ():
- def run ():
- import http.server
- PORT = 8000
- httpd = http.server.HTTPServer (("localhost", PORT), RequestHandler)
- print ('starting http server')
- httpd.serve_forever()
-
- from multiprocessing import Process
- p = Process (target=run)
- p.start ()
- yield p
- p.terminate ()
- p.join ()
+async def server ():
+ """ Simple HTTP server for testing notifications """
+ import logging
+ logging.basicConfig(level=logging.DEBUG)
+ app = web.Application(debug=True)
+ for item in testItems:
+ app.router.add_route ('*', item.parsedUrl.path, itemToResponse (item))
+ runner = web.AppRunner(app)
+ await runner.setup()
+ site = web.TCPSite(runner, 'localhost', 8080)
+ await site.start()
+ yield app
+ await runner.cleanup ()
class AssertConsumer (Consumer):
def __call__ (self, **kwargs):
assert 'uuid' in kwargs
assert 'msg' in kwargs
assert 'context' in kwargs
+ return kwargs
@pytest.fixture
def logger ():
return Logger (consumer=[AssertConsumer ()])
@pytest.fixture
-def loader (http, logger):
+def loader (server, logger):
def f (path):
if path.startswith ('/'):
- path = 'http://localhost:8000{}'.format (path)
+ path = 'http://localhost:8080{}'.format (path)
return SiteLoader (browser, path, logger)
- print ('loader setup')
with ChromeService () as browser:
yield f
- print ('loader teardown')
-def itemsLoaded (l, items):
+async def itemsLoaded (l, items):
items = dict ([(i.parsedUrl.path, i) for i in items])
- timeout = 5
- while True:
- if not l.notify.wait (timeout) and len (items) > 0:
- assert False, 'timeout'
- if len (l.queue) > 0:
- item = l.queue.popleft ()
- if isinstance (item, Exception):
- raise item
- assert item.chromeResponse is not None
- golden = items.pop (item.parsedUrl.path)
- if not golden:
- assert False, 'url {} not supposed to be fetched'.format (item.url)
- assert item.failed == golden.failed
- if item.failed:
- # response will be invalid if request failed
+ async for item in l:
+ assert item.chromeResponse is not None
+ golden = items.pop (item.parsedUrl.path)
+ if not golden:
+ assert False, 'url {} not supposed to be fetched'.format (item.url)
+ assert item.failed == golden.failed
+ if item.failed:
+ # response will be invalid if request failed
+ if not items:
+ break
+ else:
continue
+ assert item.isRedirect == golden.isRedirect
+ if golden.isRedirect:
+ assert item.body is None
+ else:
assert item.body[0] == golden.body[0]
- assert item.requestBody[0] == golden.requestBody[0]
- assert item.response['status'] == golden.response['status']
- assert item.statusText == BaseHTTPRequestHandler.responses.get (item.response['status'])[0]
- for k, v in golden.responseHeaders:
- actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders)))
- assert v in actual
-
- # check queue at least once
+ assert item.requestBody[0] == golden.requestBody[0]
+ assert item.response['status'] == golden.response['status']
+ assert item.statusText == BaseHTTPRequestHandler.responses.get (item.response['status'])[0]
+ for k, v in golden.responseHeaders:
+ actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders)))
+ assert v in actual
+
+ # we’re done when everything has been loaded
if not items:
break
-def literalItem (lf, item, deps=[]):
- with lf (item.parsedUrl.path) as l:
- l.start ()
- itemsLoaded (l, [item] + deps)
+async def literalItem (lf, item, deps=[]):
+ async with lf (item.parsedUrl.path) as l:
+ await l.start ()
+ await asyncio.wait_for (itemsLoaded (l, [item] + deps), timeout=30)
-def test_empty (loader):
- literalItem (loader, testItemMap['/empty'])
+@pytest.mark.asyncio
+async def test_empty (loader):
+ await literalItem (loader, testItemMap['/empty'])
-def test_redirect (loader):
- literalItem (loader, testItemMap['/redirect/301/empty'], [testItemMap['/empty']])
+@pytest.mark.asyncio
+async def test_redirect (loader):
+ await literalItem (loader, testItemMap['/redirect/301/empty'], [testItemMap['/empty']])
# chained redirects
- literalItem (loader, testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']])
+ await literalItem (loader, testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']])
-def test_encoding (loader):
+@pytest.mark.asyncio
+async def test_encoding (loader):
""" Text responses are transformed to UTF-8. Make sure this works
correctly. """
for item in {testItemMap['/encoding/utf8'], testItemMap['/encoding/latin1'], testItemMap['/encoding/iso88591']}:
- literalItem (loader, item)
+ await literalItem (loader, item)
-def test_binary (loader):
+@pytest.mark.asyncio
+async def test_binary (loader):
""" Browser should ignore content it cannot display (i.e. octet-stream) """
- literalItem (loader, testItemMap['/binary'])
+ await literalItem (loader, testItemMap['/binary'])
-def test_image (loader):
+@pytest.mark.asyncio
+async def test_image (loader):
""" Images should be displayed inline """
- literalItem (loader, testItemMap['/image'])
+ await literalItem (loader, testItemMap['/image'])
-def test_attachment (loader):
+@pytest.mark.asyncio
+async def test_attachment (loader):
""" And downloads won’t work in headless mode, even if it’s just a text file """
- literalItem (loader, testItemMap['/attachment'])
+ await literalItem (loader, testItemMap['/attachment'])
-def test_html (loader):
- literalItem (loader, testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']])
+@pytest.mark.asyncio
+async def test_html (loader):
+ await literalItem (loader, testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']])
# make sure alerts are dismissed correctly (image won’t load otherwise)
- literalItem (loader, testItemMap['/html/alert'], [testItemMap['/image']])
+ await literalItem (loader, testItemMap['/html/alert'], [testItemMap['/image']])
-def test_post (loader):
+@pytest.mark.asyncio
+async def test_post (loader):
""" XHR POST request with binary data"""
- literalItem (loader, testItemMap['/html/fetchPost'],
+ await literalItem (loader, testItemMap['/html/fetchPost'],
[testItemMap['/html/fetchPost/binary'],
testItemMap['/html/fetchPost/binary/large'],
testItemMap['/html/fetchPost/form'],
testItemMap['/html/fetchPost/form/large']])
-def test_crash (loader):
- with loader ('/html') as l:
- l.start ()
- try:
- l.tab.Page.crash (_timeout=1)
- except TimeoutException:
- pass
- q = l.queue
- assert isinstance (q.popleft (), BrowserCrashed)
-
-def test_invalidurl (loader):
- url = 'http://nonexistent.example/'
- with loader (url) as l:
- l.start ()
-
- q = l.queue
- if not l.notify.wait (10):
- assert False, 'timeout'
+@pytest.mark.asyncio
+async def test_crash (loader):
+ async with loader ('/html') as l:
+ await l.start ()
+ with pytest.raises (Crashed):
+ await l.tab.Page.crash ()
- it = q.popleft ()
- assert it.failed
+@pytest.mark.asyncio
+async def test_invalidurl (loader):
+ url = 'http://nonexistent.example/'
+ async with loader (url) as l:
+ await l.start ()
+ async for it in l:
+ assert it.failed
+ break
def test_nullservice ():
""" Null service returns the url as is """