summaryrefslogtreecommitdiff
path: root/crocoite/test_browser.py
diff options
context:
space:
mode:
Diffstat (limited to 'crocoite/test_browser.py')
-rw-r--r--crocoite/test_browser.py560
1 files changed, 349 insertions, 211 deletions
diff --git a/crocoite/test_browser.py b/crocoite/test_browser.py
index 5c7fc69..7084214 100644
--- a/crocoite/test_browser.py
+++ b/crocoite/test_browser.py
@@ -18,232 +18,370 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
-import pytest
+import asyncio, socket
from operator import itemgetter
from http.server import BaseHTTPRequestHandler
-from pychrome.exceptions import TimeoutException
+from datetime import datetime
-from .browser import Item, SiteLoader, ChromeService, NullService, BrowserCrashed
-from .logger import Logger, Consumer
+from yarl import URL
+from aiohttp import web
+from multidict import CIMultiDict
-class TItem (Item):
- """ This should be as close to Item as possible """
-
- __slots__ = ('bodySend', '_body', '_requestBody')
- base = 'http://localhost:8000/'
-
- def __init__ (self, path, status, headers, bodyReceive, bodySend=None, requestBody=None, failed=False):
- super ().__init__ (tab=None)
- self.chromeResponse = {'response': {'headers': headers, 'status': status, 'url': self.base + path}}
- self._body = bodyReceive, False
- self.bodySend = bodyReceive if not bodySend else bodySend
- self._requestBody = requestBody, False
- self.failed = failed
-
- @property
- def body (self):
- return self._body
-
- @property
- def requestBody (self):
- return self._requestBody
-
-testItems = [
- TItem ('binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00\x01\x02', failed=True),
- TItem ('attachment', 200,
- {'Content-Type': 'text/plain; charset=utf-8',
- 'Content-Disposition': 'attachment; filename="attachment.txt"',
- },
- 'This is a simple text file with umlauts. ÄÖU.'.encode ('utf8'), failed=True),
- TItem ('encoding/utf8', 200, {'Content-Type': 'text/plain; charset=utf-8'},
- 'This is a test, äöü μνψκ ¥¥¥¿ýý¡'.encode ('utf8')),
- TItem ('encoding/iso88591', 200, {'Content-Type': 'text/plain; charset=ISO-8859-1'},
- 'This is a test, äöü.'.encode ('utf8'),
- 'This is a test, äöü.'.encode ('ISO-8859-1')),
- TItem ('encoding/latin1', 200, {'Content-Type': 'text/plain; charset=latin1'},
- 'This is a test, äöü.'.encode ('utf8'),
- 'This is a test, äöü.'.encode ('latin1')),
- TItem ('image', 200, {'Content-Type': 'image/png'},
- # 1×1 png image
- b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'),
- TItem ('empty', 200, {}, b''),
- TItem ('redirect/301/empty', 301, {'Location': '/empty'}, b''),
- TItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b''),
- TItem ('nonexistent', 404, {}, b''),
- TItem ('html', 200, {'Content-Type': 'html'},
- '<html><body><img src="/image"><img src="/nonexistent"></body></html>'.encode ('utf8')),
- TItem ('html/alert', 200, {'Content-Type': 'html'},
- '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><img src="/image"></body></html>'.encode ('utf8')),
- TItem ('html/fetchPost', 200, {'Content-Type': 'html'},
- r"""<html><body><script>
- let a = fetch("/html/fetchPost/binary", {"method": "POST", "body": "\x00"});
- let b = fetch("/html/fetchPost/form", {"method": "POST", "body": new URLSearchParams({"data": "!"})});
- let c = fetch("/html/fetchPost/binary/large", {"method": "POST", "body": "\x00".repeat(100*1024)});
- let d = fetch("/html/fetchPost/form/large", {"method": "POST", "body": new URLSearchParams({"data": "!".repeat(100*1024)})});
- </script></body></html>""".encode ('utf8')),
- TItem ('html/fetchPost/binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=b'\x00'),
- TItem ('html/fetchPost/form', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=b'data=%21'),
- # XXX: these should trigger the need for getRequestPostData, but they don’t. oh well.
- TItem ('html/fetchPost/binary/large', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=(100*1024)*b'\x00'),
- TItem ('html/fetchPost/form/large', 200, {'Content-Type': 'application/octet-stream'}, b'\x00', requestBody=b'data=' + (100*1024)*b'%21'),
- ]
-testItemMap = dict ([(item.parsedUrl.path, item) for item in testItems])
-
-class RequestHandler (BaseHTTPRequestHandler):
- def do_GET(self):
- item = testItemMap.get (self.path)
- if item:
- self.send_response (item.response['status'])
- for k, v in item.response['headers'].items ():
- self.send_header (k, v)
- body = item.bodySend
- self.send_header ('Content-Length', len (body))
- self.end_headers()
- self.wfile.write (body)
- return
-
- do_POST = do_GET
-
- def log_message (self, format, *args):
- pass
+from hypothesis import given
+import hypothesis.strategies as st
+from hypothesis.provisional import domains
+import pytest
-@pytest.fixture
-def http ():
- def run ():
- import http.server
- PORT = 8000
- httpd = http.server.HTTPServer (("localhost", PORT), RequestHandler)
- print ('starting http server')
- httpd.serve_forever()
-
- from multiprocessing import Process
- p = Process (target=run)
- p.start ()
- yield p
- p.terminate ()
- p.join ()
+from .browser import RequestResponsePair, SiteLoader, Request, \
+ UnicodeBody, ReferenceTimestamp, Base64Body, UnicodeBody, Request, \
+ Response, NavigateError, PageIdle, FrameNavigated
+from .logger import Logger, Consumer
+from .devtools import Crashed, Process
+
+# if you want to know what’s going on:
+#import logging
+#logging.basicConfig(level=logging.DEBUG)
class AssertConsumer (Consumer):
def __call__ (self, **kwargs):
assert 'uuid' in kwargs
assert 'msg' in kwargs
assert 'context' in kwargs
+ return kwargs
@pytest.fixture
def logger ():
return Logger (consumer=[AssertConsumer ()])
@pytest.fixture
-def loader (http, logger):
- def f (path):
- if path.startswith ('/'):
- path = 'http://localhost:8000{}'.format (path)
- return SiteLoader (browser, path, logger)
- print ('loader setup')
- with ChromeService () as browser:
- yield f
- print ('loader teardown')
-
-def itemsLoaded (l, items):
- items = dict ([(i.parsedUrl.path, i) for i in items])
- timeout = 5
- while True:
- if not l.notify.wait (timeout) and len (items) > 0:
- assert False, 'timeout'
- if len (l.queue) > 0:
- item = l.queue.popleft ()
- if isinstance (item, Exception):
- raise item
- assert item.chromeResponse is not None
- golden = items.pop (item.parsedUrl.path)
- if not golden:
- assert False, 'url {} not supposed to be fetched'.format (item.url)
- assert item.failed == golden.failed
- if item.failed:
- # response will be invalid if request failed
- continue
- assert item.body[0] == golden.body[0]
- assert item.requestBody[0] == golden.requestBody[0]
- assert item.response['status'] == golden.response['status']
- assert item.statusText == BaseHTTPRequestHandler.responses.get (item.response['status'])[0]
- for k, v in golden.responseHeaders:
- actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders)))
- assert v in actual
-
- # check queue at least once
- if not items:
- break
-
-def literalItem (lf, item, deps=[]):
- with lf (item.parsedUrl.path) as l:
- l.start ()
- itemsLoaded (l, [item] + deps)
-
-def test_empty (loader):
- literalItem (loader, testItemMap['/empty'])
-
-def test_redirect (loader):
- literalItem (loader, testItemMap['/redirect/301/empty'], [testItemMap['/empty']])
- # chained redirects
- literalItem (loader, testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']])
-
-def test_encoding (loader):
- """ Text responses are transformed to UTF-8. Make sure this works
- correctly. """
- for item in {testItemMap['/encoding/utf8'], testItemMap['/encoding/latin1'], testItemMap['/encoding/iso88591']}:
- literalItem (loader, item)
-
-def test_binary (loader):
- """ Browser should ignore content it cannot display (i.e. octet-stream) """
- literalItem (loader, testItemMap['/binary'])
-
-def test_image (loader):
- """ Images should be displayed inline """
- literalItem (loader, testItemMap['/image'])
-
-def test_attachment (loader):
- """ And downloads won’t work in headless mode, even if it’s just a text file """
- literalItem (loader, testItemMap['/attachment'])
-
-def test_html (loader):
- literalItem (loader, testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']])
- # make sure alerts are dismissed correctly (image won’t load otherwise)
- literalItem (loader, testItemMap['/html/alert'], [testItemMap['/image']])
-
-def test_post (loader):
- """ XHR POST request with binary data"""
- literalItem (loader, testItemMap['/html/fetchPost'],
- [testItemMap['/html/fetchPost/binary'],
- testItemMap['/html/fetchPost/binary/large'],
- testItemMap['/html/fetchPost/form'],
- testItemMap['/html/fetchPost/form/large']])
-
-def test_crash (loader):
- with loader ('/html') as l:
- l.start ()
- try:
- l.tab.Page.crash (_timeout=1)
- except TimeoutException:
- pass
- q = l.queue
- assert isinstance (q.popleft (), BrowserCrashed)
-
-def test_invalidurl (loader):
- url = 'http://nonexistent.example/'
- with loader (url) as l:
- l.start ()
-
- q = l.queue
- if not l.notify.wait (10):
- assert False, 'timeout'
-
- it = q.popleft ()
- assert it.failed
-
-def test_nullservice ():
- """ Null service returns the url as is """
-
- url = 'http://localhost:12345'
- with NullService (url) as u:
- assert u == url
+async def loader (logger):
+ async with Process () as browser, SiteLoader (browser, logger) as l:
+ yield l
+
+@pytest.mark.asyncio
+async def test_crash (loader):
+ with pytest.raises (Crashed):
+ await loader.tab.Page.crash ()
+
+@pytest.mark.asyncio
+async def test_invalidurl (loader):
+ host = 'nonexistent.example'
+
+ # make sure the url does *not* resolve (some DNS intercepting ISP’s mess
+ # with this)
+ loop = asyncio.get_event_loop ()
+ try:
+ resolved = await loop.getaddrinfo (host, None)
+ except socket.gaierror:
+ url = URL.build (scheme='http', host=host)
+ with pytest.raises (NavigateError):
+ await loader.navigate (url)
+ else:
+ pytest.skip (f'host {host} resolved to {resolved}')
+
+timestamp = st.one_of (
+ st.integers(min_value=0, max_value=2**32-1),
+ st.floats (min_value=0, max_value=2**32-1),
+ )
+
+@given(timestamp, timestamp, timestamp)
+def test_referencetimestamp (relativeA, absoluteA, relativeB):
+ ts = ReferenceTimestamp (relativeA, absoluteA)
+ absoluteA = datetime.utcfromtimestamp (absoluteA)
+ absoluteB = ts (relativeB)
+ assert (absoluteA < absoluteB and relativeA < relativeB) or \
+ (absoluteA >= absoluteB and relativeA >= relativeB)
+ assert abs ((absoluteB - absoluteA).total_seconds () - (relativeB - relativeA)) < 10e-6
+
+def urls ():
+ """ Build http/https URL """
+ scheme = st.sampled_from (['http', 'https'])
+ # Path must start with a slash
+ pathSt = st.builds (lambda x: '/' + x, st.text ())
+ args = st.fixed_dictionaries ({
+ 'scheme': scheme,
+ 'host': domains (),
+ 'port': st.one_of (st.none (), st.integers (min_value=1, max_value=2**16-1)),
+ 'path': pathSt,
+ 'query_string': st.text (),
+ 'fragment': st.text (),
+ })
+ return st.builds (lambda x: URL.build (**x), args)
+
+def urlsStr ():
+ return st.builds (lambda x: str (x), urls ())
+
+asciiText = st.text (st.characters (min_codepoint=32, max_codepoint=126))
+
+def chromeHeaders ():
+ # token as defined by https://tools.ietf.org/html/rfc7230#section-3.2.6
+ token = st.sampled_from('abcdefghijklmnopqrstuvwxyz0123456789!#$%&\'*+-.^_`|~')
+ # XXX: the value should be asciiText without leading/trailing spaces
+ return st.dictionaries (token, token)
+
+def fixedDicts (fixed, dynamic):
+ return st.builds (lambda x, y: x.update (y), st.fixed_dictionaries (fixed), st.lists (dynamic))
+
+def chromeRequestWillBeSent (reqid, url):
+ methodSt = st.sampled_from (['GET', 'POST', 'PUT', 'DELETE'])
+ return st.fixed_dictionaries ({
+ 'requestId': reqid,
+ 'initiator': st.just ('Test'),
+ 'wallTime': timestamp,
+ 'timestamp': timestamp,
+ 'request': st.fixed_dictionaries ({
+ 'url': url,
+ 'method': methodSt,
+ 'headers': chromeHeaders (),
+ # XXX: postData, hasPostData
+ })
+ })
+
+def chromeResponseReceived (reqid, url):
+ mimeTypeSt = st.one_of (st.none (), st.just ('text/html'))
+ remoteIpAddressSt = st.one_of (st.none (), st.just ('127.0.0.1'))
+ protocolSt = st.one_of (st.none (), st.just ('h2'))
+ statusCodeSt = st.integers (min_value=100, max_value=999)
+ typeSt = st.sampled_from (['Document', 'Stylesheet', 'Image', 'Media',
+ 'Font', 'Script', 'TextTrack', 'XHR', 'Fetch', 'EventSource',
+ 'WebSocket', 'Manifest', 'SignedExchange', 'Ping',
+ 'CSPViolationReport', 'Other'])
+ return st.fixed_dictionaries ({
+ 'requestId': reqid,
+ 'timestamp': timestamp,
+ 'type': typeSt,
+ 'response': st.fixed_dictionaries ({
+ 'url': url,
+ 'requestHeaders': chromeHeaders (), # XXX: make this optional
+ 'headers': chromeHeaders (),
+ 'status': statusCodeSt,
+ 'statusText': asciiText,
+ 'mimeType': mimeTypeSt,
+ 'remoteIPAddress': remoteIpAddressSt,
+ 'protocol': protocolSt,
+ })
+ })
+
+def chromeReqResp ():
+ # XXX: will this gnerated the same url for all testcases?
+ reqid = st.shared (st.text (), 'reqresp')
+ url = st.shared (urlsStr (), 'reqresp')
+ return st.tuples (chromeRequestWillBeSent (reqid, url),
+ chromeResponseReceived (reqid, url))
+
+def requestResponsePair ():
+ def f (creq, cresp, hasPostData, reqBody, respBody):
+ i = RequestResponsePair ()
+ i.fromRequestWillBeSent (creq)
+ i.request.hasPostData = hasPostData
+ if hasPostData:
+ i.request.body = reqBody
+
+ if cresp is not None:
+ i.fromResponseReceived (cresp)
+ if respBody is not None:
+ i.response.body = respBody
+ return i
+
+ bodySt = st.one_of (
+ st.none (),
+ st.builds (UnicodeBody, st.text ()),
+ st.builds (Base64Body.fromBytes, st.binary ())
+ )
+ return st.builds (lambda reqresp, hasPostData, reqBody, respBody:
+ f (reqresp[0], reqresp[1], hasPostData, reqBody, respBody),
+ chromeReqResp (), st.booleans (), bodySt, bodySt)
+
+@given(chromeReqResp ())
+def test_requestResponsePair (creqresp):
+ creq, cresp = creqresp
+
+ item = RequestResponsePair ()
+
+ assert item.id is None
+ assert item.url is None
+ assert item.request is None
+ assert item.response is None
+
+ item.fromRequestWillBeSent (creq)
+
+ assert item.id == creq['requestId']
+ url = URL (creq['request']['url'])
+ assert item.url == url
+ assert item.request is not None
+ assert item.request.timestamp == datetime.utcfromtimestamp (creq['wallTime'])
+ assert set (item.request.headers.keys ()) == set (creq['request']['headers'].keys ())
+ assert item.response is None
+
+ item.fromResponseReceived (cresp)
+
+ # url will not be overwritten
+ assert item.id == creq['requestId'] == cresp['requestId']
+ assert item.url == url
+ assert item.request is not None
+ assert set (item.request.headers.keys ()) == set (cresp['response']['requestHeaders'].keys ())
+ assert item.response is not None
+ assert set (item.response.headers.keys ()) == set (cresp['response']['headers'].keys ())
+ assert (item.response.timestamp - item.request.timestamp).total_seconds () - \
+ (cresp['timestamp'] - creq['timestamp']) < 10e-6
+
+@given(chromeReqResp ())
+def test_requestResponsePair_eq (creqresp):
+ creq, cresp = creqresp
+
+ item = RequestResponsePair ()
+ item2 = RequestResponsePair ()
+ assert item == item
+ assert item == item2
+
+ item.fromRequestWillBeSent (creq)
+ assert item != item2
+ item2.fromRequestWillBeSent (creq)
+ assert item == item
+ assert item == item2
+
+ item.fromResponseReceived (cresp)
+ assert item != item2
+ item2.fromResponseReceived (cresp)
+ assert item == item
+ assert item == item2
+
+ # XXX: test for inequality with different parameters
+
+### Google Chrome integration tests ###
+
+serverUrl = URL.build (scheme='http', host='localhost', port=8080)
+items = [
+ RequestResponsePair (
+ url=serverUrl.with_path ('/encoding/utf-8'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=utf-8')]),
+ body=UnicodeBody ('äöü'), mimeType='text/html')
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/encoding/latin1'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=latin1')]),
+ body=UnicodeBody ('äöü'), mimeType='text/html')
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/encoding/utf-16'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=utf-16')]),
+ body=UnicodeBody ('äöü'), mimeType='text/html')
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/encoding/ISO-8859-1'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=ISO-8859-1')]),
+ body=UnicodeBody ('äöü'), mimeType='text/html')
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/status/200'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/plain')]),
+ body=b'',
+ mimeType='text/plain'),
+ ),
+ # redirects never have a response body
+ RequestResponsePair (
+ url=serverUrl.with_path ('/status/301'),
+ request=Request (method='GET'),
+ response=Response (status=301,
+ headers=CIMultiDict ([('Content-Type', 'text/plain'),
+ ('Location', str (serverUrl.with_path ('/status/301/redirected')))]),
+ body=None,
+ mimeType='text/plain'),
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/image/png'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'image/png')]),
+ body=Base64Body.fromBytes (b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'),
+ mimeType='image/png'),
+ ),
+ RequestResponsePair (
+ url=serverUrl.with_path ('/script/alert'),
+ request=Request (method='GET'),
+ response=Response (status=200, headers=CIMultiDict ([('Content-Type', 'text/html; charset=utf-8')]),
+ body=UnicodeBody ('''<html><body><script>
+window.addEventListener("beforeunload", function (e) {
+ e.returnValue = "bye?";
+ return e.returnValue;
+});
+alert("stopping here");
+if (confirm("are you sure?") || prompt ("42?")) {
+ window.location = "/nonexistent";
+}
+</script></body></html>'''), mimeType='text/html')
+ ),
+ ]
+
+@pytest.mark.asyncio
+# would be nice if we could use hypothesis here somehow
+@pytest.mark.parametrize("golden", items)
+async def test_integration_item (loader, golden):
+ async def f (req):
+ body = golden.response.body
+ contentType = golden.response.headers.get ('content-type', '') if golden.response.headers is not None else ''
+ charsetOff = contentType.find ('charset=')
+ if isinstance (body, UnicodeBody) and charsetOff != -1:
+ encoding = contentType[charsetOff+len ('charset='):]
+ body = golden.response.body.decode ('utf-8').encode (encoding)
+ return web.Response (body=body, status=golden.response.status,
+ headers=golden.response.headers)
+
+ app = web.Application ()
+ app.router.add_route (golden.request.method, golden.url.path, f)
+ runner = web.AppRunner(app)
+ await runner.setup()
+ site = web.TCPSite(runner, serverUrl.host, serverUrl.port)
+ try:
+ await site.start()
+ except Exception as e:
+ pytest.skip (e)
+
+ haveReqResp = False
+ haveNavigated = False
+ try:
+ await loader.navigate (golden.url)
+
+ it = loader.__aiter__ ()
+ while True:
+ try:
+ item = await asyncio.wait_for (it.__anext__ (), timeout=1)
+ except asyncio.TimeoutError:
+ break
+ # XXX: can only check the first req/resp right now (due to redirect)
+ if isinstance (item, RequestResponsePair) and not haveReqResp:
+ # we do not know this in advance
+ item.request.initiator = None
+ item.request.headers = None
+ item.remoteIpAddress = None
+ item.protocol = None
+ item.resourceType = None
+
+ if item.response:
+ assert item.response.statusText is not None
+ item.response.statusText = None
+
+ del item.response.headers['server']
+ del item.response.headers['content-length']
+ del item.response.headers['date']
+ assert item == golden
+ haveReqResp = True
+ elif isinstance (item, FrameNavigated):
+ # XXX: can’t check this, because of the redirect
+ #assert item.url == golden.url
+ haveNavigated = True
+ finally:
+ assert haveReqResp
+ assert haveNavigated
+ await runner.cleanup ()
+
+def test_page_idle ():
+ for v in (True, False):
+ idle = PageIdle (v)
+ assert bool (idle) == v
+