summaryrefslogtreecommitdiff
path: root/crocoite/test_browser.py
diff options
context:
space:
mode:
authorLars-Dominik Braun <lars@6xq.net>2019-01-03 19:34:17 +0100
committerLars-Dominik Braun <lars@6xq.net>2019-01-03 19:37:27 +0100
commit9d7974e3e7e8a4575ea61cb33a30fa291d12ae38 (patch)
tree5311396c0d74eaa35e1eff1e1641c0bd157cde25 /crocoite/test_browser.py
parentad4e119bf1a55c84dc7c6260588ed7db9e7199c6 (diff)
downloadcrocoite-9d7974e3e7e8a4575ea61cb33a30fa291d12ae38.tar.gz
crocoite-9d7974e3e7e8a4575ea61cb33a30fa291d12ae38.tar.bz2
crocoite-9d7974e3e7e8a4575ea61cb33a30fa291d12ae38.zip
browser: Turn Item into RequestResponsePair
Previously Item was just a simple wrapper around Chrome’s Network.* events. This turned out to be quite nasty when testing, so its replacement, RequestResponsePair, does some level of abstraction. This makes testing alot easier, since we now can simply instantiate it without building a proper DevTools event. Should come without any functional changes.
Diffstat (limited to 'crocoite/test_browser.py')
-rw-r--r--crocoite/test_browser.py531
1 files changed, 312 insertions, 219 deletions
diff --git a/crocoite/test_browser.py b/crocoite/test_browser.py
index 8008855..4bf2c64 100644
--- a/crocoite/test_browser.py
+++ b/crocoite/test_browser.py
@@ -19,103 +19,28 @@
# THE SOFTWARE.
import asyncio, socket
-import pytest
from operator import itemgetter
-from aiohttp import web
from http.server import BaseHTTPRequestHandler
+from datetime import datetime
+
+from yarl import URL
+from aiohttp import web
+from multidict import CIMultiDict
-from .browser import Item, SiteLoader, VarChangeEvent
+from hypothesis import given
+import hypothesis.strategies as st
+import pytest
+
+from .browser import RequestResponsePair, SiteLoader, VarChangeEvent, Request, \
+ UnicodeBody, ReferenceTimestamp, Base64Body, UnicodeBody, Request, \
+ Response
from .logger import Logger, Consumer
from .devtools import Crashed, Process
# if you want to know what’s going on:
+#import logging
#logging.basicConfig(level=logging.DEBUG)
-class TItem (Item):
- """ This should be as close to Item as possible """
-
- __slots__ = ('bodySend', '_body', '_requestBody')
- base = 'http://localhost:8000/'
-
- def __init__ (self, path, status, headers, bodyReceive, bodySend=None, requestBody=None, failed=False, isRedirect=False):
- super ().__init__ ()
- self.chromeResponse = {'response': {'headers': headers, 'status': status, 'url': self.base + path}}
- self.body = bodyReceive, False
- self.bodySend = bodyReceive if not bodySend else bodySend
- self.requestBody = requestBody, False
- self.failed = failed
- self.isRedirect = isRedirect
-
-testItems = [
- TItem ('binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00\x01\x02', failed=True),
- TItem ('attachment', 200,
- {'Content-Type': 'text/plain; charset=utf-8',
- 'Content-Disposition': 'attachment; filename="attachment.txt"',
- },
- 'This is a simple text file with umlauts. ÄÖU.'.encode ('utf8'), failed=True),
- TItem ('encoding/utf8', 200, {'Content-Type': 'text/plain; charset=utf-8'},
- 'This is a test, äöü μνψκ ¥¥¥¿ýý¡'.encode ('utf8')),
- TItem ('encoding/iso88591', 200, {'Content-Type': 'text/plain; charset=ISO-8859-1'},
- 'This is a test, äöü.'.encode ('utf8'),
- 'This is a test, äöü.'.encode ('ISO-8859-1')),
- TItem ('encoding/latin1', 200, {'Content-Type': 'text/plain; charset=latin1'},
- 'This is a test, äöü.'.encode ('utf8'),
- 'This is a test, äöü.'.encode ('latin1')),
- TItem ('image', 200, {'Content-Type': 'image/png'},
- # 1×1 png image
- b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'),
- TItem ('empty', 200, {'Content-Type': 'text/plain'}, b''),
- TItem ('headers/duplicate', 200, [('Content-Type', 'text/plain'), ('Duplicate', '1'), ('Duplicate', '2')], b''),
- TItem ('headers/fetch/req', 200, {'Content-Type': 'text/plain'}, b''),
- TItem ('headers/fetch/html', 200, {'Content-Type': 'text/html'},
- r"""<html><body><script>
- let h = new Headers([["custom", "1"]]);
- fetch("/headers/fetch/req", {"method": "GET", "headers": h}).then(x => console.log("done"));
- </script></body></html>""".encode ('utf8')),
- TItem ('redirect/301/empty', 301, {'Location': '/empty'}, b'', isRedirect=True),
- TItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b'', isRedirect=True),
- TItem ('nonexistent', 404, {}, b''),
- TItem ('html', 200, {'Content-Type': 'text/html'},
- '<html><body><img src="/image"><img src="/nonexistent"></body></html>'.encode ('utf8')),
- TItem ('html/alert', 200, {'Content-Type': 'text/html'},
- '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><script>document.write(\'<img src="/image">\');</script></body></html>'.encode ('utf8')),
- TItem ('html/fetchPost', 200, {'Content-Type': 'text/html'},
- r"""<html><body><script>
- let a = fetch("/html/fetchPost/binary", {"method": "POST", "body": "\x00"});
- let b = fetch("/html/fetchPost/form", {"method": "POST", "body": new URLSearchParams({"data": "!"})});
- let c = fetch("/html/fetchPost/binary/large", {"method": "POST", "body": "\x00".repeat(100*1024)});
- let d = fetch("/html/fetchPost/form/large", {"method": "POST", "body": new URLSearchParams({"data": "!".repeat(100*1024)})});
- </script></body></html>""".encode ('utf8')),
- TItem ('html/fetchPost/binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=b'\x00'),
- TItem ('html/fetchPost/form', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=b'data=%21'),
- # XXX: these should trigger the need for getRequestPostData, but they don’t. oh well.
- TItem ('html/fetchPost/binary/large', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=(100*1024)*b'\x00'),
- TItem ('html/fetchPost/form/large', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=b'data=' + (100*1024)*b'%21'),
- ]
-testItemMap = dict ([(item.url.path, item) for item in testItems])
-
-def itemToResponse (item):
- async def f (req):
- headers = item.response['headers']
- return web.Response(body=item.bodySend, status=item.response['status'],
- headers=headers)
- return f
-
-@pytest.fixture
-async def server ():
- """ Simple HTTP server for testing notifications """
- import logging
- logging.basicConfig(level=logging.DEBUG)
- app = web.Application()
- for item in testItems:
- app.router.add_route ('*', item.url.path, itemToResponse (item))
- runner = web.AppRunner(app)
- await runner.setup()
- site = web.TCPSite(runner, 'localhost', 8080)
- await site.start()
- yield app
- await runner.cleanup ()
-
class AssertConsumer (Consumer):
def __call__ (self, **kwargs):
assert 'uuid' in kwargs
@@ -128,134 +53,14 @@ def logger ():
return Logger (consumer=[AssertConsumer ()])
@pytest.fixture
-async def loader (server, logger):
- def f (path):
- if path.startswith ('/'):
- path = 'http://localhost:8080{}'.format (path)
- return SiteLoader (browser, path, logger)
- async with Process () as browser:
- yield f
-
-async def itemsLoaded (l, items):
- items = dict ([(i.url.path, i) for i in items])
- async for item in l:
- assert item.chromeResponse is not None
- golden = items.pop (item.url.path)
- if not golden:
- assert False, f'url {item.url} not supposed to be fetched'
- assert item.failed == golden.failed
- if item.failed:
- # response will be invalid if request failed
- if not items:
- break
- else:
- continue
- assert item.isRedirect == golden.isRedirect
- if golden.isRedirect:
- assert item.body is None
- else:
- assert item.body[0] == golden.body[0]
- assert item.requestBody[0] == golden.requestBody[0]
- assert item.response['status'] == golden.response['status']
- assert item.statusText == BaseHTTPRequestHandler.responses.get (item.response['status'])[0]
- for k, v in golden.responseHeaders:
- actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders)))
- assert v in actual
-
- # we’re done when everything has been loaded
- if not items:
- break
-
-async def literalItem (lf, item, deps=[]):
- async with lf (item.url.path) as l:
- await l.start ()
- await asyncio.wait_for (itemsLoaded (l, [item] + deps), timeout=30)
-
-@pytest.mark.asyncio
-async def test_empty (loader):
- await literalItem (loader, testItemMap['/empty'])
-
-@pytest.mark.asyncio
-async def test_headers_duplicate (loader):
- """
- Some headers, like Set-Cookie can be present multiple times. Chrome
- separates these with a newline.
- """
- async with loader ('/headers/duplicate') as l:
- await l.start ()
- async for it in l:
- if it.url.path == '/headers/duplicate':
- assert not it.failed
- dup = list (filter (lambda x: x[0] == 'Duplicate', it.responseHeaders))
- assert len(dup) == 2
- assert list(sorted(map(itemgetter(1), dup))) == ['1', '2']
- break
-
-@pytest.mark.asyncio
-async def test_headers_req (loader):
- """
- Custom request headers. JavaScript’s Headers() does not support duplicate
- headers, so we can’t generate those.
- """
- async with loader ('/headers/fetch/html') as l:
- await l.start ()
- async for it in l:
- if it.url.path == '/headers/fetch/req':
- assert not it.failed
- dup = list (filter (lambda x: x[0] == 'custom', it.requestHeaders))
- assert len(dup) == 1
- assert list(sorted(map(itemgetter(1), dup))) == ['1']
- break
-
-@pytest.mark.asyncio
-async def test_redirect (loader):
- await literalItem (loader, testItemMap['/redirect/301/empty'], [testItemMap['/empty']])
- # chained redirects
- await literalItem (loader, testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']])
-
-@pytest.mark.asyncio
-async def test_encoding (loader):
- """ Text responses are transformed to UTF-8. Make sure this works
- correctly. """
- for item in {testItemMap['/encoding/utf8'], testItemMap['/encoding/latin1'], testItemMap['/encoding/iso88591']}:
- await literalItem (loader, item)
-
-@pytest.mark.asyncio
-async def test_binary (loader):
- """ Browser should ignore content it cannot display (i.e. octet-stream) """
- await literalItem (loader, testItemMap['/binary'])
-
-@pytest.mark.asyncio
-async def test_image (loader):
- """ Images should be displayed inline """
- await literalItem (loader, testItemMap['/image'])
-
-@pytest.mark.asyncio
-async def test_attachment (loader):
- """ And downloads won’t work in headless mode, even if it’s just a text file """
- await literalItem (loader, testItemMap['/attachment'])
-
-@pytest.mark.asyncio
-async def test_html (loader):
- await literalItem (loader, testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']])
- # make sure alerts are dismissed correctly (image won’t load otherwise)
- await literalItem (loader, testItemMap['/html/alert'], [testItemMap['/image']])
-
-@pytest.mark.asyncio
-async def test_post (loader):
- """ XHR POST request with binary data"""
- await literalItem (loader, testItemMap['/html/fetchPost'],
- [testItemMap['/html/fetchPost/binary'],
- testItemMap['/html/fetchPost/binary/large'],
- testItemMap['/html/fetchPost/form'],
- testItemMap['/html/fetchPost/form/large']])
+async def loader (logger):
+ async with Process () as browser, SiteLoader (browser, logger) as l:
+ yield l
@pytest.mark.asyncio
async def test_crash (loader):
- async with loader ('/html') as l:
- await l.start ()
- with pytest.raises (Crashed):
- await l.tab.Page.crash ()
+ with pytest.raises (Crashed):
+ await loader.tab.Page.crash ()
@pytest.mark.asyncio
async def test_invalidurl (loader):
@@ -267,15 +72,16 @@ async def test_invalidurl (loader):
try:
resolved = await loop.getaddrinfo (host, None)
except socket.gaierror:
- async with loader (f'http://{host}/') as l:
- await l.start ()
- async for it in l:
- assert it.failed
- break
+ url = URL.build (scheme='http', host=host)
+ await loader.navigate (url)
+ async for it in loader:
+ assert it.request is not None
+ assert it.url == url
+ assert it.response is None
+ break
else:
pytest.skip (f'host {host} resolved to {resolved}')
-
@pytest.mark.asyncio
async def test_varchangeevent ():
e = VarChangeEvent (True)
@@ -299,3 +105,290 @@ async def test_varchangeevent ():
assert ret == False
assert e.get () == ret
+timestamp = st.one_of (
+ st.integers(min_value=0, max_value=2**32-1),
+ st.floats (min_value=0, max_value=2**32-1),
+ )
+
+@given(timestamp, timestamp, timestamp)
+def test_referencetimestamp (relativeA, absoluteA, relativeB):
+ ts = ReferenceTimestamp (relativeA, absoluteA)
+ absoluteA = datetime.utcfromtimestamp (absoluteA)
+ absoluteB = ts (relativeB)
+ assert (absoluteA < absoluteB and relativeA < relativeB) or \
+ (absoluteA >= absoluteB and relativeA >= relativeB)
+ assert abs ((absoluteB - absoluteA).total_seconds () - (relativeB - relativeA)) < 10e-6
+
+def hostname ():
+ # XXX: find a better way to generate hostnames
+ return st.text (alphabet=st.sampled_from('abcdefghijklmnopqrstuvwxyz0123456789-'), min_size=1, max_size=253)
+
+def urls ():
+ """ Build http/https URL """
+ scheme = st.sampled_from (['http', 'https'])
+ # Path must start with a slash
+ pathSt = st.builds (lambda x: '/' + x, st.text ())
+ args = st.fixed_dictionaries ({
+ 'scheme': scheme,
+ 'host': hostname (),
+ 'port': st.one_of (st.none (), st.integers (min_value=1, max_value=2**16-1)),
+ 'path': pathSt,
+ 'query_string': st.text (),
+ 'fragment': st.text (),
+ })
+ return st.builds (lambda x: URL.build (**x), args)
+
+def urlsStr ():
+ return st.builds (lambda x: str (x), urls ())
+
+asciiText = st.text (st.characters (min_codepoint=32, max_codepoint=126))
+
+def chromeHeaders ():
+ # token as defined by https://tools.ietf.org/html/rfc7230#section-3.2.6
+ token = st.sampled_from('abcdefghijklmnopqrstuvwxyz0123456789!#$%&\'*+-.^_`|~')
+ # XXX: the value should be asciiText without leading/trailing spaces
+ return st.dictionaries (token, token)
+
+def fixedDicts (fixed, dynamic):
+ return st.builds (lambda x, y: x.update (y), st.fixed_dictionaries (fixed), st.lists (dynamic))
+
+def chromeRequestWillBeSent (reqid, url):
+ methodSt = st.sampled_from (['GET', 'POST', 'PUT', 'DELETE'])
+ return st.fixed_dictionaries ({
+ 'requestId': reqid,
+ 'initiator': st.just ('Test'),
+ 'wallTime': timestamp,
+ 'timestamp': timestamp,
+ 'request': st.fixed_dictionaries ({
+ 'url': url,
+ 'method': methodSt,
+ 'headers': chromeHeaders (),
+ # XXX: postData, hasPostData
+ })
+ })
+
+def chromeResponseReceived (reqid, url):
+ mimeTypeSt = st.one_of (st.none (), st.just ('text/html'))
+ remoteIpAddressSt = st.one_of (st.none (), st.just ('127.0.0.1'))
+ protocolSt = st.one_of (st.none (), st.just ('h2'))
+ statusCodeSt = st.integers (min_value=100, max_value=999)
+ typeSt = st.sampled_from (['Document', 'Stylesheet', 'Image', 'Media',
+ 'Font', 'Script', 'TextTrack', 'XHR', 'Fetch', 'EventSource',
+ 'WebSocket', 'Manifest', 'SignedExchange', 'Ping',
+ 'CSPViolationReport', 'Other'])
+ return st.fixed_dictionaries ({
+ 'requestId': reqid,
+ 'timestamp': timestamp,
+ 'type': typeSt,
+ 'response': st.fixed_dictionaries ({
+ 'url': url,
+ 'requestHeaders': chromeHeaders (), # XXX: make this optional
+ 'headers': chromeHeaders (),
+ 'status': statusCodeSt,
+ 'statusText': asciiText,
+ 'mimeType': mimeTypeSt,
+ 'remoteIPAddress': remoteIpAddressSt,
+ 'protocol': protocolSt,
+ })
+ })
+
+def chromeReqResp ():
+ # XXX: will this gnerated the same url for all testcases?
+ reqid = st.shared (st.text (), 'reqresp')
+ url = st.shared (urlsStr (), 'reqresp')
+ return st.tuples (chromeRequestWillBeSent (reqid, url),
+ chromeResponseReceived (reqid, url))
+
+def requestResponsePair ():
+ def f (creq, cresp, hasPostData, reqBody, respBody):
+ i = RequestResponsePair ()
+ i.fromRequestWillBeSent (creq)
+ i.request.hasPostData = hasPostData
+ if hasPostData:
+ i.request.body = reqBody
+
+ if cresp is not None:
+ i.fromResponseReceived (cresp)
+ if respBody is not None:
+ i.response.body = respBody
+ return i
+
+ bodySt = st.one_of (
+ st.none (),
+ st.builds (UnicodeBody, st.text ()),
+ st.builds (Base64Body.fromBytes, st.binary ())
+ )
+ return st.builds (lambda reqresp, hasPostData, reqBody, respBody:
+ f (reqresp[0], reqresp[1], hasPostData, reqBody, respBody),
+ chromeReqResp (), st.booleans (), bodySt, bodySt)
+
+@given(chromeReqResp ())
+def test_requestResponsePair (creqresp):
+ creq, cresp = creqresp
+
+ item = RequestResponsePair ()
+
+ assert item.id is None
+ assert item.url is None
+ assert item.request is None
+ assert item.response is None
+
+ item.fromRequestWillBeSent (creq)
+
+ assert item.id == creq['requestId']
+ url = URL (creq['request']['url'])
+ assert item.url == url
+ assert item.request is not None
+ assert item.request.timestamp == datetime.utcfromtimestamp (creq['wallTime'])
+ assert set (item.request.headers.keys ()) == set (creq['request']['headers'].keys ())
+ assert item.response is None
+
+ item.fromResponseReceived (cresp)
+
+ # url will not be overwritten
+ assert item.id == creq['requestId'] == cresp['requestId']
+ assert item.url == url
+ assert item.request is not None
+ assert set (item.request.headers.keys ()) == set (cresp['response']['requestHeaders'].keys ())
+ assert item.response is not None
+ assert set (item.response.headers.keys ()) == set (cresp['response']['headers'].keys ())
+ assert (item.response.timestamp - item.request.timestamp).total_seconds () - \
+ (cresp['timestamp'] - creq['timestamp']) < 10e-6
+
+@given(chromeReqResp ())
+def test_requestResponsePair_eq (creqresp):
+ creq, cresp = creqresp
+
+ item = RequestResponsePair ()
+ item2 = RequestResponsePair ()
+ assert item == item
+ assert item == item2
+
+ item.fromRequestWillBeSent (creq)
+ assert item != item2
+ item2.fromRequestWillBeSent (creq)
+ assert item == item
+ assert item == item2
+
+ item.fromResponseReceived (cresp)
+ assert item != item2
+ item2.fromResponseReceived (cresp)
+ assert item == item
+ assert item == item2
+
+ # XXX: test for inequality with different parameters
+
+### Google Chrome integration tests ###
+
+serverUrl = URL.build (scheme='http', host='localhost', port=8080)
+items = [
+ RequestResponsePair (
+ url=serverUrl.with_path ('/encoding/utf-8'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=utf-8')]),
+ body=UnicodeBody ('äöü'), mimeType='text/html')
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/encoding/latin1'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=latin1')]),
+ body=UnicodeBody ('äöü'), mimeType='text/html')
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/encoding/utf-16'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=utf-16')]),
+ body=UnicodeBody ('äöü'), mimeType='text/html')
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/encoding/ISO-8859-1'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=ISO-8859-1')]),
+ body=UnicodeBody ('äöü'), mimeType='text/html')
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/status/200'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/plain')]),
+ body=b'',
+ mimeType='text/plain'),
+ ),
+ # redirects never have a response body
+ RequestResponsePair (
+ url=serverUrl.with_path ('/status/301'),
+ request=Request (method='GET'),
+ response=Response (status=301,
+ headers=CIMultiDict ([('Content-Type', 'text/plain'),
+ ('Location', str (serverUrl.with_path ('/status/301/redirected')))]),
+ body=None,
+ mimeType='text/plain'),
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/image/png'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'image/png')]),
+ body=Base64Body.fromBytes (b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'),
+ mimeType='image/png'),
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/script/alert'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=utf-8')]),
+ body=UnicodeBody ('''<html><body><script>
+window.addEventListener("beforeunload", function (e) {
+ e.returnValue = "bye?";
+ return e.returnValue;
+});
+alert("stopping here");
+if (confirm("are you sure?") || prompt ("42?")) {
+ window.location = "/nonexistent";
+}
+</script></body></html>'''), mimeType='text/html')
+ ),
+ ]
+
+@pytest.mark.asyncio
+# would be nice if we could use hypothesis here somehow
+@pytest.mark.parametrize("golden", items)
+async def test_integration_item (loader, golden):
+ async def f (req):
+ body = golden.response.body
+ contentType = golden.response.headers.get ('content-type', '') if golden.response.headers is not None else ''
+ charsetOff = contentType.find ('charset=')
+ if isinstance (body, UnicodeBody) and charsetOff != -1:
+ encoding = contentType[charsetOff+len ('charset='):]
+ body = golden.response.body.decode ('utf-8').encode (encoding)
+ return web.Response (body=body, status=golden.response.status,
+ headers=golden.response.headers)
+
+ app = web.Application ()
+ app.router.add_route (golden.request.method, golden.url.path, f)
+ runner = web.AppRunner(app)
+ await runner.setup()
+ site = web.TCPSite(runner, serverUrl.host, serverUrl.port)
+ await site.start()
+
+ try:
+ await loader.navigate (golden.url)
+
+ it = loader.__aiter__ ()
+ item = await it.__anext__ ()
+
+ # we do not know this in advance
+ item.request.initiator = None
+ item.request.headers = None
+ item.remoteIpAddress = None
+ item.protocol = None
+ item.resourceType = None
+
+ if item.response:
+ assert item.response.statusText is not None
+ item.response.statusText = None
+
+ del item.response.headers['server']
+ del item.response.headers['content-length']
+ del item.response.headers['date']
+ assert item == golden
+ finally:
+ await runner.cleanup ()
+