diff options
Diffstat (limited to 'crocoite/test_warc.py')
-rw-r--r-- | crocoite/test_warc.py | 145 |
1 files changed, 51 insertions, 94 deletions
diff --git a/crocoite/test_warc.py b/crocoite/test_warc.py index 7f2635b..954e8c8 100644 --- a/crocoite/test_warc.py +++ b/crocoite/test_warc.py @@ -24,6 +24,7 @@ from operator import itemgetter from warcio.archiveiterator import ArchiveIterator from yarl import URL +from multidict import CIMultiDict from hypothesis import given, reproduce_failure import hypothesis.strategies as st import pytest @@ -32,7 +33,8 @@ from .warc import WarcHandler from .logger import Logger, WarcHandlerConsumer from .controller import ControllerStart from .behavior import Script, ScreenshotEvent, DomSnapshotEvent -from .browser import Item +from .browser import RequestResponsePair, Base64Body, UnicodeBody +from .test_browser import requestResponsePair, urls def test_log (): logger = Logger () @@ -66,50 +68,6 @@ def test_log (): data = json.loads (l.strip ()) assert data == golden.pop (0) -def hostname (): - # XXX: find a better way to generate hostnames - return st.text (alphabet=st.sampled_from('abcdefghijklmnopqrstuvwxyz0123456789-'), min_size=1, max_size=253) - -def urls (): - """ Build http/https URL """ - scheme = st.one_of (st.just ('http'), st.just ('https')) - # Path must start with a slash - pathSt = st.builds (lambda x: '/' + x, st.text ()) - args = st.fixed_dictionaries ({ - 'scheme': scheme, - 'host': hostname (), - 'port': st.one_of (st.none (), st.integers (min_value=1, max_value=2**16-1)), - 'path': pathSt, - 'query_string': st.text (), - 'fragment': st.text (), - }) - return st.builds (lambda x: URL.build (**x), args) - -def item (): - def f (url, requestBody, body, mimeType): - i = Item () - # XXX: we really need some level of abstraction. Testing is a nightmare. - i.setRequest ({'requestId': 'myid', 'initiator': 'Test', 'wallTime': 0, 'timestamp': 1, 'request': {'url': str (url), 'method': 'GET', 'headers': {'None': 'None'}}}) - i.setResponse ({'requestId': 'myid', 'timestamp': 2, 'type': 'Document', 'response': {'url': str (url), 'requestHeaders': {'foo': 'bar', 'Set-Cookie': 'line1\nline2'}, 'headers': {'Response': 'Headers', 'Content-Length': '12345'}, 'status': 200}}) - if mimeType is not None: - i.chromeResponse['response']['mimeType'] = 'text/html' - i.requestBody = requestBody - i.body = body - return i - - def failedItem (url): - i = Item () - i.setRequest ({'requestId': 'myid', 'initiator': 'Test', 'wallTime': 0, 'timestamp': 1, 'request': {'url': str (url), 'method': 'GET', 'headers': {'None': 'None'}}}) - i.failed = True - return i - - bodySt = st.one_of (st.none (), st.tuples (st.one_of (st.none (), st.binary ()), st.booleans ())) - mimeTypeSt = st.one_of (st.none (), st.just ('text/html')) - return st.one_of ( - st.builds (failedItem, urls ()), - st.builds (f, urls (), bodySt, bodySt, mimeTypeSt), - ) - def jsonObject (): """ JSON-encodable objects """ return st.dictionaries (st.text (), st.one_of (st.integers (), st.text ())) @@ -123,7 +81,7 @@ def event (): st.builds (Script.fromStr, st.text (), st.one_of(st.none (), st.text ())), st.builds (ScreenshotEvent, urls (), st.integers (), st.binary ()), st.builds (DomSnapshotEvent, urls (), st.builds (lambda x: x.encode ('utf-8'), st.text ()), viewport()), - item (), + requestResponsePair (), ) @given (st.lists (event ())) @@ -136,7 +94,7 @@ def test_push (golden): # null logger logger = Logger () - with NamedTemporaryFile() as fd: + with open('/tmp/test.warc.gz', 'w+b') as fd: with WarcHandler (fd, logger) as handler: for g in golden: handler.push (g) @@ -191,10 +149,7 @@ def test_push (golden): assert headers['X-DOM-Snapshot'] == 'True' assert rec.raw_stream.read () == g.document - elif isinstance (g, Item): - if g.failed: - continue - + elif isinstance (g, RequestResponsePair): rec = next (it) # request @@ -204,54 +159,56 @@ def test_push (golden): assert URL (headers['warc-target-uri']) == g.url assert headers['x-chrome-request-id'] == g.id - assert sorted (rec.http_headers.headers, key=itemgetter (0)) == sorted (g.requestHeaders, key=itemgetter (0)) - if g.requestBody: - if g.requestBody[0] is None: - assert not rec.raw_stream.read () + assert CIMultiDict (rec.http_headers.headers) == g.request.headers + if g.request.hasPostData: + if g.request.body is not None: + assert rec.raw_stream.read () == g.request.body + assert str (headers['x-chrome-base64body'] or False) == str (isinstance (g.request.body, Base64Body)), (headers['x-chrome-base64body'], g.request.body) else: - assert rec.raw_stream.read () == g.requestBody[0], g.requestBody - assert str (headers['x-chrome-base64body'] or False) == str (g.requestBody[1]), (headers['x-chrome-base64body'], g.requestBody) + # body fetch failed + assert headers['warc-truncated'] == 'unspecified' + assert not rec.raw_stream.read () else: - # body fetch failed - assert headers['warc-truncated'] == 'unspecified' + assert not rec.raw_stream.read () # response - rec = next (it) - headers = rec.rec_headers - httpheaders = rec.http_headers - assert headers['warc-type'] == 'response' - checkWarcinfoId (headers) - assert URL (headers['warc-target-uri']) == g.url - assert headers['x-chrome-request-id'] == g.id - - # these are checked separately - blacklistedHeaders = {'content-type', 'content-length'} - sortedHeaders = lambda l: sorted (filter (lambda x: x[0].lower() not in blacklistedHeaders, l), key=itemgetter (0)) - assert sortedHeaders (httpheaders.headers) == sortedHeaders (g.responseHeaders) - - expectedContentType = g.response.get ('mimeType') - if expectedContentType is not None: - assert httpheaders['content-type'].startswith (expectedContentType) - - if g.body: - if g.body[0] is None: - assert not rec.raw_stream.read () - #assert httpheaders['content-length'] == '0' + if g.response: + rec = next (it) + headers = rec.rec_headers + httpheaders = rec.http_headers + assert headers['warc-type'] == 'response' + checkWarcinfoId (headers) + assert URL (headers['warc-target-uri']) == g.url + assert headers['x-chrome-request-id'] == g.id + + # these are checked separately + filteredHeaders = CIMultiDict (httpheaders.headers) + for b in {'content-type', 'content-length'}: + if b in g.response.headers: + g.response.headers.popall (b) + if b in filteredHeaders: + filteredHeaders.popall (b) + assert filteredHeaders == g.response.headers + + expectedContentType = g.response.mimeType + if expectedContentType is not None: + assert httpheaders['content-type'].startswith (expectedContentType) + + if g.response.body is not None: + assert rec.raw_stream.read () == g.response.body + assert str (headers['x-chrome-base64body'] or False) == str (isinstance (g.response.body, Base64Body)) + assert httpheaders['content-length'] == str (len (g.response.body)) + # body is never truncated if it exists + assert headers['warc-truncated'] is None + + # unencoded strings are converted to utf8 + if isinstance (g.response.body, UnicodeBody) and httpheaders['content-type'] is not None: + assert httpheaders['content-type'].endswith ('; charset=utf-8') else: - assert rec.raw_stream.read () == g.body[0] - assert str (headers['x-chrome-base64body'] or False) == str (g.body[1]) - assert httpheaders['content-length'] == str (len (g.body[0])) - - # body is never truncated if it exists - assert headers['warc-truncated'] is None - - # unencoded strings are converted to utf8 - if not g.body[1] and httpheaders['content-type'] is not None: - assert httpheaders['content-type'].endswith ('; charset=utf-8') - else: - # body fetch failed - assert headers['warc-truncated'] == 'unspecified' - # content-length header should be kept intact + # body fetch failed + assert headers['warc-truncated'] == 'unspecified' + assert not rec.raw_stream.read () + # content-length header should be kept intact else: assert False, f"invalid golden type {type(g)}" # pragma: no cover |