diff options
Diffstat (limited to 'crocoite/test_browser.py')
-rw-r--r-- | crocoite/test_browser.py | 531 |
1 files changed, 312 insertions, 219 deletions
diff --git a/crocoite/test_browser.py b/crocoite/test_browser.py index 8008855..4bf2c64 100644 --- a/crocoite/test_browser.py +++ b/crocoite/test_browser.py @@ -19,103 +19,28 @@ # THE SOFTWARE. import asyncio, socket -import pytest from operator import itemgetter -from aiohttp import web from http.server import BaseHTTPRequestHandler +from datetime import datetime + +from yarl import URL +from aiohttp import web +from multidict import CIMultiDict -from .browser import Item, SiteLoader, VarChangeEvent +from hypothesis import given +import hypothesis.strategies as st +import pytest + +from .browser import RequestResponsePair, SiteLoader, VarChangeEvent, Request, \ + UnicodeBody, ReferenceTimestamp, Base64Body, UnicodeBody, Request, \ + Response from .logger import Logger, Consumer from .devtools import Crashed, Process # if you want to know what’s going on: +#import logging #logging.basicConfig(level=logging.DEBUG) -class TItem (Item): - """ This should be as close to Item as possible """ - - __slots__ = ('bodySend', '_body', '_requestBody') - base = 'http://localhost:8000/' - - def __init__ (self, path, status, headers, bodyReceive, bodySend=None, requestBody=None, failed=False, isRedirect=False): - super ().__init__ () - self.chromeResponse = {'response': {'headers': headers, 'status': status, 'url': self.base + path}} - self.body = bodyReceive, False - self.bodySend = bodyReceive if not bodySend else bodySend - self.requestBody = requestBody, False - self.failed = failed - self.isRedirect = isRedirect - -testItems = [ - TItem ('binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00\x01\x02', failed=True), - TItem ('attachment', 200, - {'Content-Type': 'text/plain; charset=utf-8', - 'Content-Disposition': 'attachment; filename="attachment.txt"', - }, - 'This is a simple text file with umlauts. ÄÖU.'.encode ('utf8'), failed=True), - TItem ('encoding/utf8', 200, {'Content-Type': 'text/plain; charset=utf-8'}, - 'This is a test, äöü μνψκ ¥¥¥¿ýý¡'.encode ('utf8')), - TItem ('encoding/iso88591', 200, {'Content-Type': 'text/plain; charset=ISO-8859-1'}, - 'This is a test, äöü.'.encode ('utf8'), - 'This is a test, äöü.'.encode ('ISO-8859-1')), - TItem ('encoding/latin1', 200, {'Content-Type': 'text/plain; charset=latin1'}, - 'This is a test, äöü.'.encode ('utf8'), - 'This is a test, äöü.'.encode ('latin1')), - TItem ('image', 200, {'Content-Type': 'image/png'}, - # 1×1 png image - b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'), - TItem ('empty', 200, {'Content-Type': 'text/plain'}, b''), - TItem ('headers/duplicate', 200, [('Content-Type', 'text/plain'), ('Duplicate', '1'), ('Duplicate', '2')], b''), - TItem ('headers/fetch/req', 200, {'Content-Type': 'text/plain'}, b''), - TItem ('headers/fetch/html', 200, {'Content-Type': 'text/html'}, - r"""<html><body><script> - let h = new Headers([["custom", "1"]]); - fetch("/headers/fetch/req", {"method": "GET", "headers": h}).then(x => console.log("done")); - </script></body></html>""".encode ('utf8')), - TItem ('redirect/301/empty', 301, {'Location': '/empty'}, b'', isRedirect=True), - TItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b'', isRedirect=True), - TItem ('nonexistent', 404, {}, b''), - TItem ('html', 200, {'Content-Type': 'text/html'}, - '<html><body><img src="/image"><img src="/nonexistent"></body></html>'.encode ('utf8')), - TItem ('html/alert', 200, {'Content-Type': 'text/html'}, - '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><script>document.write(\'<img src="/image">\');</script></body></html>'.encode ('utf8')), - TItem ('html/fetchPost', 200, {'Content-Type': 'text/html'}, - r"""<html><body><script> - let a = fetch("/html/fetchPost/binary", {"method": "POST", "body": "\x00"}); - let b = fetch("/html/fetchPost/form", {"method": "POST", "body": new URLSearchParams({"data": "!"})}); - let c = fetch("/html/fetchPost/binary/large", {"method": "POST", "body": "\x00".repeat(100*1024)}); - let d = fetch("/html/fetchPost/form/large", {"method": "POST", "body": new URLSearchParams({"data": "!".repeat(100*1024)})}); - </script></body></html>""".encode ('utf8')), - TItem ('html/fetchPost/binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=b'\x00'), - TItem ('html/fetchPost/form', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=b'data=%21'), - # XXX: these should trigger the need for getRequestPostData, but they don’t. oh well. - TItem ('html/fetchPost/binary/large', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=(100*1024)*b'\x00'), - TItem ('html/fetchPost/form/large', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=b'data=' + (100*1024)*b'%21'), - ] -testItemMap = dict ([(item.url.path, item) for item in testItems]) - -def itemToResponse (item): - async def f (req): - headers = item.response['headers'] - return web.Response(body=item.bodySend, status=item.response['status'], - headers=headers) - return f - -@pytest.fixture -async def server (): - """ Simple HTTP server for testing notifications """ - import logging - logging.basicConfig(level=logging.DEBUG) - app = web.Application() - for item in testItems: - app.router.add_route ('*', item.url.path, itemToResponse (item)) - runner = web.AppRunner(app) - await runner.setup() - site = web.TCPSite(runner, 'localhost', 8080) - await site.start() - yield app - await runner.cleanup () - class AssertConsumer (Consumer): def __call__ (self, **kwargs): assert 'uuid' in kwargs @@ -128,134 +53,14 @@ def logger (): return Logger (consumer=[AssertConsumer ()]) @pytest.fixture -async def loader (server, logger): - def f (path): - if path.startswith ('/'): - path = 'http://localhost:8080{}'.format (path) - return SiteLoader (browser, path, logger) - async with Process () as browser: - yield f - -async def itemsLoaded (l, items): - items = dict ([(i.url.path, i) for i in items]) - async for item in l: - assert item.chromeResponse is not None - golden = items.pop (item.url.path) - if not golden: - assert False, f'url {item.url} not supposed to be fetched' - assert item.failed == golden.failed - if item.failed: - # response will be invalid if request failed - if not items: - break - else: - continue - assert item.isRedirect == golden.isRedirect - if golden.isRedirect: - assert item.body is None - else: - assert item.body[0] == golden.body[0] - assert item.requestBody[0] == golden.requestBody[0] - assert item.response['status'] == golden.response['status'] - assert item.statusText == BaseHTTPRequestHandler.responses.get (item.response['status'])[0] - for k, v in golden.responseHeaders: - actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders))) - assert v in actual - - # we’re done when everything has been loaded - if not items: - break - -async def literalItem (lf, item, deps=[]): - async with lf (item.url.path) as l: - await l.start () - await asyncio.wait_for (itemsLoaded (l, [item] + deps), timeout=30) - -@pytest.mark.asyncio -async def test_empty (loader): - await literalItem (loader, testItemMap['/empty']) - -@pytest.mark.asyncio -async def test_headers_duplicate (loader): - """ - Some headers, like Set-Cookie can be present multiple times. Chrome - separates these with a newline. - """ - async with loader ('/headers/duplicate') as l: - await l.start () - async for it in l: - if it.url.path == '/headers/duplicate': - assert not it.failed - dup = list (filter (lambda x: x[0] == 'Duplicate', it.responseHeaders)) - assert len(dup) == 2 - assert list(sorted(map(itemgetter(1), dup))) == ['1', '2'] - break - -@pytest.mark.asyncio -async def test_headers_req (loader): - """ - Custom request headers. JavaScript’s Headers() does not support duplicate - headers, so we can’t generate those. - """ - async with loader ('/headers/fetch/html') as l: - await l.start () - async for it in l: - if it.url.path == '/headers/fetch/req': - assert not it.failed - dup = list (filter (lambda x: x[0] == 'custom', it.requestHeaders)) - assert len(dup) == 1 - assert list(sorted(map(itemgetter(1), dup))) == ['1'] - break - -@pytest.mark.asyncio -async def test_redirect (loader): - await literalItem (loader, testItemMap['/redirect/301/empty'], [testItemMap['/empty']]) - # chained redirects - await literalItem (loader, testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']]) - -@pytest.mark.asyncio -async def test_encoding (loader): - """ Text responses are transformed to UTF-8. Make sure this works - correctly. """ - for item in {testItemMap['/encoding/utf8'], testItemMap['/encoding/latin1'], testItemMap['/encoding/iso88591']}: - await literalItem (loader, item) - -@pytest.mark.asyncio -async def test_binary (loader): - """ Browser should ignore content it cannot display (i.e. octet-stream) """ - await literalItem (loader, testItemMap['/binary']) - -@pytest.mark.asyncio -async def test_image (loader): - """ Images should be displayed inline """ - await literalItem (loader, testItemMap['/image']) - -@pytest.mark.asyncio -async def test_attachment (loader): - """ And downloads won’t work in headless mode, even if it’s just a text file """ - await literalItem (loader, testItemMap['/attachment']) - -@pytest.mark.asyncio -async def test_html (loader): - await literalItem (loader, testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']]) - # make sure alerts are dismissed correctly (image won’t load otherwise) - await literalItem (loader, testItemMap['/html/alert'], [testItemMap['/image']]) - -@pytest.mark.asyncio -async def test_post (loader): - """ XHR POST request with binary data""" - await literalItem (loader, testItemMap['/html/fetchPost'], - [testItemMap['/html/fetchPost/binary'], - testItemMap['/html/fetchPost/binary/large'], - testItemMap['/html/fetchPost/form'], - testItemMap['/html/fetchPost/form/large']]) +async def loader (logger): + async with Process () as browser, SiteLoader (browser, logger) as l: + yield l @pytest.mark.asyncio async def test_crash (loader): - async with loader ('/html') as l: - await l.start () - with pytest.raises (Crashed): - await l.tab.Page.crash () + with pytest.raises (Crashed): + await loader.tab.Page.crash () @pytest.mark.asyncio async def test_invalidurl (loader): @@ -267,15 +72,16 @@ async def test_invalidurl (loader): try: resolved = await loop.getaddrinfo (host, None) except socket.gaierror: - async with loader (f'http://{host}/') as l: - await l.start () - async for it in l: - assert it.failed - break + url = URL.build (scheme='http', host=host) + await loader.navigate (url) + async for it in loader: + assert it.request is not None + assert it.url == url + assert it.response is None + break else: pytest.skip (f'host {host} resolved to {resolved}') - @pytest.mark.asyncio async def test_varchangeevent (): e = VarChangeEvent (True) @@ -299,3 +105,290 @@ async def test_varchangeevent (): assert ret == False assert e.get () == ret +timestamp = st.one_of ( + st.integers(min_value=0, max_value=2**32-1), + st.floats (min_value=0, max_value=2**32-1), + ) + +@given(timestamp, timestamp, timestamp) +def test_referencetimestamp (relativeA, absoluteA, relativeB): + ts = ReferenceTimestamp (relativeA, absoluteA) + absoluteA = datetime.utcfromtimestamp (absoluteA) + absoluteB = ts (relativeB) + assert (absoluteA < absoluteB and relativeA < relativeB) or \ + (absoluteA >= absoluteB and relativeA >= relativeB) + assert abs ((absoluteB - absoluteA).total_seconds () - (relativeB - relativeA)) < 10e-6 + +def hostname (): + # XXX: find a better way to generate hostnames + return st.text (alphabet=st.sampled_from('abcdefghijklmnopqrstuvwxyz0123456789-'), min_size=1, max_size=253) + +def urls (): + """ Build http/https URL """ + scheme = st.sampled_from (['http', 'https']) + # Path must start with a slash + pathSt = st.builds (lambda x: '/' + x, st.text ()) + args = st.fixed_dictionaries ({ + 'scheme': scheme, + 'host': hostname (), + 'port': st.one_of (st.none (), st.integers (min_value=1, max_value=2**16-1)), + 'path': pathSt, + 'query_string': st.text (), + 'fragment': st.text (), + }) + return st.builds (lambda x: URL.build (**x), args) + +def urlsStr (): + return st.builds (lambda x: str (x), urls ()) + +asciiText = st.text (st.characters (min_codepoint=32, max_codepoint=126)) + +def chromeHeaders (): + # token as defined by https://tools.ietf.org/html/rfc7230#section-3.2.6 + token = st.sampled_from('abcdefghijklmnopqrstuvwxyz0123456789!#$%&\'*+-.^_`|~') + # XXX: the value should be asciiText without leading/trailing spaces + return st.dictionaries (token, token) + +def fixedDicts (fixed, dynamic): + return st.builds (lambda x, y: x.update (y), st.fixed_dictionaries (fixed), st.lists (dynamic)) + +def chromeRequestWillBeSent (reqid, url): + methodSt = st.sampled_from (['GET', 'POST', 'PUT', 'DELETE']) + return st.fixed_dictionaries ({ + 'requestId': reqid, + 'initiator': st.just ('Test'), + 'wallTime': timestamp, + 'timestamp': timestamp, + 'request': st.fixed_dictionaries ({ + 'url': url, + 'method': methodSt, + 'headers': chromeHeaders (), + # XXX: postData, hasPostData + }) + }) + +def chromeResponseReceived (reqid, url): + mimeTypeSt = st.one_of (st.none (), st.just ('text/html')) + remoteIpAddressSt = st.one_of (st.none (), st.just ('127.0.0.1')) + protocolSt = st.one_of (st.none (), st.just ('h2')) + statusCodeSt = st.integers (min_value=100, max_value=999) + typeSt = st.sampled_from (['Document', 'Stylesheet', 'Image', 'Media', + 'Font', 'Script', 'TextTrack', 'XHR', 'Fetch', 'EventSource', + 'WebSocket', 'Manifest', 'SignedExchange', 'Ping', + 'CSPViolationReport', 'Other']) + return st.fixed_dictionaries ({ + 'requestId': reqid, + 'timestamp': timestamp, + 'type': typeSt, + 'response': st.fixed_dictionaries ({ + 'url': url, + 'requestHeaders': chromeHeaders (), # XXX: make this optional + 'headers': chromeHeaders (), + 'status': statusCodeSt, + 'statusText': asciiText, + 'mimeType': mimeTypeSt, + 'remoteIPAddress': remoteIpAddressSt, + 'protocol': protocolSt, + }) + }) + +def chromeReqResp (): + # XXX: will this gnerated the same url for all testcases? + reqid = st.shared (st.text (), 'reqresp') + url = st.shared (urlsStr (), 'reqresp') + return st.tuples (chromeRequestWillBeSent (reqid, url), + chromeResponseReceived (reqid, url)) + +def requestResponsePair (): + def f (creq, cresp, hasPostData, reqBody, respBody): + i = RequestResponsePair () + i.fromRequestWillBeSent (creq) + i.request.hasPostData = hasPostData + if hasPostData: + i.request.body = reqBody + + if cresp is not None: + i.fromResponseReceived (cresp) + if respBody is not None: + i.response.body = respBody + return i + + bodySt = st.one_of ( + st.none (), + st.builds (UnicodeBody, st.text ()), + st.builds (Base64Body.fromBytes, st.binary ()) + ) + return st.builds (lambda reqresp, hasPostData, reqBody, respBody: + f (reqresp[0], reqresp[1], hasPostData, reqBody, respBody), + chromeReqResp (), st.booleans (), bodySt, bodySt) + +@given(chromeReqResp ()) +def test_requestResponsePair (creqresp): + creq, cresp = creqresp + + item = RequestResponsePair () + + assert item.id is None + assert item.url is None + assert item.request is None + assert item.response is None + + item.fromRequestWillBeSent (creq) + + assert item.id == creq['requestId'] + url = URL (creq['request']['url']) + assert item.url == url + assert item.request is not None + assert item.request.timestamp == datetime.utcfromtimestamp (creq['wallTime']) + assert set (item.request.headers.keys ()) == set (creq['request']['headers'].keys ()) + assert item.response is None + + item.fromResponseReceived (cresp) + + # url will not be overwritten + assert item.id == creq['requestId'] == cresp['requestId'] + assert item.url == url + assert item.request is not None + assert set (item.request.headers.keys ()) == set (cresp['response']['requestHeaders'].keys ()) + assert item.response is not None + assert set (item.response.headers.keys ()) == set (cresp['response']['headers'].keys ()) + assert (item.response.timestamp - item.request.timestamp).total_seconds () - \ + (cresp['timestamp'] - creq['timestamp']) < 10e-6 + +@given(chromeReqResp ()) +def test_requestResponsePair_eq (creqresp): + creq, cresp = creqresp + + item = RequestResponsePair () + item2 = RequestResponsePair () + assert item == item + assert item == item2 + + item.fromRequestWillBeSent (creq) + assert item != item2 + item2.fromRequestWillBeSent (creq) + assert item == item + assert item == item2 + + item.fromResponseReceived (cresp) + assert item != item2 + item2.fromResponseReceived (cresp) + assert item == item + assert item == item2 + + # XXX: test for inequality with different parameters + +### Google Chrome integration tests ### + +serverUrl = URL.build (scheme='http', host='localhost', port=8080) +items = [ + RequestResponsePair ( + url=serverUrl.with_path ('/encoding/utf-8'), + request=Request (method='GET'), + response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=utf-8')]), + body=UnicodeBody ('äöü'), mimeType='text/html') + ), + RequestResponsePair ( + url=serverUrl.with_path ('/encoding/latin1'), + request=Request (method='GET'), + response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=latin1')]), + body=UnicodeBody ('äöü'), mimeType='text/html') + ), + RequestResponsePair ( + url=serverUrl.with_path ('/encoding/utf-16'), + request=Request (method='GET'), + response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=utf-16')]), + body=UnicodeBody ('äöü'), mimeType='text/html') + ), + RequestResponsePair ( + url=serverUrl.with_path ('/encoding/ISO-8859-1'), + request=Request (method='GET'), + response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=ISO-8859-1')]), + body=UnicodeBody ('äöü'), mimeType='text/html') + ), + RequestResponsePair ( + url=serverUrl.with_path ('/status/200'), + request=Request (method='GET'), + response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/plain')]), + body=b'', + mimeType='text/plain'), + ), + # redirects never have a response body + RequestResponsePair ( + url=serverUrl.with_path ('/status/301'), + request=Request (method='GET'), + response=Response (status=301, + headers=CIMultiDict ([('Content-Type', 'text/plain'), + ('Location', str (serverUrl.with_path ('/status/301/redirected')))]), + body=None, + mimeType='text/plain'), + ), + RequestResponsePair ( + url=serverUrl.with_path ('/image/png'), + request=Request (method='GET'), + response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'image/png')]), + body=Base64Body.fromBytes (b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'), + mimeType='image/png'), + ), + RequestResponsePair ( + url=serverUrl.with_path ('/script/alert'), + request=Request (method='GET'), + response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=utf-8')]), + body=UnicodeBody ('''<html><body><script> +window.addEventListener("beforeunload", function (e) { + e.returnValue = "bye?"; + return e.returnValue; +}); +alert("stopping here"); +if (confirm("are you sure?") || prompt ("42?")) { + window.location = "/nonexistent"; +} +</script></body></html>'''), mimeType='text/html') + ), + ] + +@pytest.mark.asyncio +# would be nice if we could use hypothesis here somehow +@pytest.mark.parametrize("golden", items) +async def test_integration_item (loader, golden): + async def f (req): + body = golden.response.body + contentType = golden.response.headers.get ('content-type', '') if golden.response.headers is not None else '' + charsetOff = contentType.find ('charset=') + if isinstance (body, UnicodeBody) and charsetOff != -1: + encoding = contentType[charsetOff+len ('charset='):] + body = golden.response.body.decode ('utf-8').encode (encoding) + return web.Response (body=body, status=golden.response.status, + headers=golden.response.headers) + + app = web.Application () + app.router.add_route (golden.request.method, golden.url.path, f) + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, serverUrl.host, serverUrl.port) + await site.start() + + try: + await loader.navigate (golden.url) + + it = loader.__aiter__ () + item = await it.__anext__ () + + # we do not know this in advance + item.request.initiator = None + item.request.headers = None + item.remoteIpAddress = None + item.protocol = None + item.resourceType = None + + if item.response: + assert item.response.statusText is not None + item.response.statusText = None + + del item.response.headers['server'] + del item.response.headers['content-length'] + del item.response.headers['date'] + assert item == golden + finally: + await runner.cleanup () + |