summary | refs | log | tree | commit | diff
path: root/crocoite/test_warc.py
diff options
context:
space:
mode:
Diffstat (limited to 'crocoite/test_warc.py')
-rw-r--r-- crocoite/test_warc.py 145
1 files changed, 51 insertions, 94 deletions
diff --git a/crocoite/test_warc.py b/crocoite/test_warc.py
index 7f2635b..954e8c8 100644
--- a/crocoite/test_warc.py
+++ b/crocoite/test_warc.py
@@ -24,6 +24,7 @@ from operator import itemgetter
from warcio.archiveiterator import ArchiveIterator
from yarl import URL
+from multidict import CIMultiDict
from hypothesis import given, reproduce_failure
import hypothesis.strategies as st
import pytest
@@ -32,7 +33,8 @@ from .warc import WarcHandler
from .logger import Logger, WarcHandlerConsumer
from .controller import ControllerStart
from .behavior import Script, ScreenshotEvent, DomSnapshotEvent
-from .browser import Item
+from .browser import RequestResponsePair, Base64Body, UnicodeBody
+from .test_browser import requestResponsePair, urls
def test_log ():
logger = Logger ()
@@ -66,50 +68,6 @@ def test_log ():
data = json.loads (l.strip ())
assert data == golden.pop (0)
-def hostname ():
- # XXX: find a better way to generate hostnames
- return st.text (alphabet=st.sampled_from('abcdefghijklmnopqrstuvwxyz0123456789-'), min_size=1, max_size=253)
-
-def urls ():
- """ Build http/https URL """
- scheme = st.one_of (st.just ('http'), st.just ('https'))
- # Path must start with a slash
- pathSt = st.builds (lambda x: '/' + x, st.text ())
- args = st.fixed_dictionaries ({
- 'scheme': scheme,
- 'host': hostname (),
- 'port': st.one_of (st.none (), st.integers (min_value=1, max_value=2**16-1)),
- 'path': pathSt,
- 'query_string': st.text (),
- 'fragment': st.text (),
- })
- return st.builds (lambda x: URL.build (**x), args)
-
-def item ():
- def f (url, requestBody, body, mimeType):
- i = Item ()
- # XXX: we really need some level of abstraction. Testing is a nightmare.
- i.setRequest ({'requestId': 'myid', 'initiator': 'Test', 'wallTime': 0, 'timestamp': 1, 'request': {'url': str (url), 'method': 'GET', 'headers': {'None': 'None'}}})
- i.setResponse ({'requestId': 'myid', 'timestamp': 2, 'type': 'Document', 'response': {'url': str (url), 'requestHeaders': {'foo': 'bar', 'Set-Cookie': 'line1\nline2'}, 'headers': {'Response': 'Headers', 'Content-Length': '12345'}, 'status': 200}})
- if mimeType is not None:
- i.chromeResponse['response']['mimeType'] = 'text/html'
- i.requestBody = requestBody
- i.body = body
- return i
-
- def failedItem (url):
- i = Item ()
- i.setRequest ({'requestId': 'myid', 'initiator': 'Test', 'wallTime': 0, 'timestamp': 1, 'request': {'url': str (url), 'method': 'GET', 'headers': {'None': 'None'}}})
- i.failed = True
- return i
-
- bodySt = st.one_of (st.none (), st.tuples (st.one_of (st.none (), st.binary ()), st.booleans ()))
- mimeTypeSt = st.one_of (st.none (), st.just ('text/html'))
- return st.one_of (
- st.builds (failedItem, urls ()),
- st.builds (f, urls (), bodySt, bodySt, mimeTypeSt),
- )
-
def jsonObject ():
""" JSON-encodable objects """
return st.dictionaries (st.text (), st.one_of (st.integers (), st.text ()))
@@ -123,7 +81,7 @@ def event ():
st.builds (Script.fromStr, st.text (), st.one_of(st.none (), st.text ())),
st.builds (ScreenshotEvent, urls (), st.integers (), st.binary ()),
st.builds (DomSnapshotEvent, urls (), st.builds (lambda x: x.encode ('utf-8'), st.text ()), viewport()),
- item (),
+ requestResponsePair (),
)
@given (st.lists (event ()))
@@ -136,7 +94,7 @@ def test_push (golden):
# null logger
logger = Logger ()
- with NamedTemporaryFile() as fd:
+ with open('/tmp/test.warc.gz', 'w+b') as fd:
with WarcHandler (fd, logger) as handler:
for g in golden:
handler.push (g)
@@ -191,10 +149,7 @@ def test_push (golden):
assert headers['X-DOM-Snapshot'] == 'True'
assert rec.raw_stream.read () == g.document
- elif isinstance (g, Item):
- if g.failed:
- continue
-
+ elif isinstance (g, RequestResponsePair):
rec = next (it)
# request
@@ -204,54 +159,56 @@ def test_push (golden):
assert URL (headers['warc-target-uri']) == g.url
assert headers['x-chrome-request-id'] == g.id
- assert sorted (rec.http_headers.headers, key=itemgetter (0)) == sorted (g.requestHeaders, key=itemgetter (0))
- if g.requestBody:
- if g.requestBody[0] is None:
- assert not rec.raw_stream.read ()
+ assert CIMultiDict (rec.http_headers.headers) == g.request.headers
+ if g.request.hasPostData:
+ if g.request.body is not None:
+ assert rec.raw_stream.read () == g.request.body
+ assert str (headers['x-chrome-base64body'] or False) == str (isinstance (g.request.body, Base64Body)), (headers['x-chrome-base64body'], g.request.body)
else:
- assert rec.raw_stream.read () == g.requestBody[0], g.requestBody
- assert str (headers['x-chrome-base64body'] or False) == str (g.requestBody[1]), (headers['x-chrome-base64body'], g.requestBody)
+ # body fetch failed
+ assert headers['warc-truncated'] == 'unspecified'
+ assert not rec.raw_stream.read ()
else:
- # body fetch failed
- assert headers['warc-truncated'] == 'unspecified'
+ assert not rec.raw_stream.read ()
# response
- rec = next (it)
- headers = rec.rec_headers
- httpheaders = rec.http_headers
- assert headers['warc-type'] == 'response'
- checkWarcinfoId (headers)
- assert URL (headers['warc-target-uri']) == g.url
- assert headers['x-chrome-request-id'] == g.id
-
- # these are checked separately
- blacklistedHeaders = {'content-type', 'content-length'}
- sortedHeaders = lambda l: sorted (filter (lambda x: x[0].lower() not in blacklistedHeaders, l), key=itemgetter (0))
- assert sortedHeaders (httpheaders.headers) == sortedHeaders (g.responseHeaders)
-
- expectedContentType = g.response.get ('mimeType')
- if expectedContentType is not None:
- assert httpheaders['content-type'].startswith (expectedContentType)
-
- if g.body:
- if g.body[0] is None:
- assert not rec.raw_stream.read ()
- #assert httpheaders['content-length'] == '0'
+ if g.response:
+ rec = next (it)
+ headers = rec.rec_headers
+ httpheaders = rec.http_headers
+ assert headers['warc-type'] == 'response'
+ checkWarcinfoId (headers)
+ assert URL (headers['warc-target-uri']) == g.url
+ assert headers['x-chrome-request-id'] == g.id
+
+ # these are checked separately
+ filteredHeaders = CIMultiDict (httpheaders.headers)
+ for b in {'content-type', 'content-length'}:
+ if b in g.response.headers:
+ g.response.headers.popall (b)
+ if b in filteredHeaders:
+ filteredHeaders.popall (b)
+ assert filteredHeaders == g.response.headers
+
+ expectedContentType = g.response.mimeType
+ if expectedContentType is not None:
+ assert httpheaders['content-type'].startswith (expectedContentType)
+
+ if g.response.body is not None:
+ assert rec.raw_stream.read () == g.response.body
+ assert str (headers['x-chrome-base64body'] or False) == str (isinstance (g.response.body, Base64Body))
+ assert httpheaders['content-length'] == str (len (g.response.body))
+ # body is never truncated if it exists
+ assert headers['warc-truncated'] is None
+
+ # unencoded strings are converted to utf8
+ if isinstance (g.response.body, UnicodeBody) and httpheaders['content-type'] is not None:
+ assert httpheaders['content-type'].endswith ('; charset=utf-8')
else:
- assert rec.raw_stream.read () == g.body[0]
- assert str (headers['x-chrome-base64body'] or False) == str (g.body[1])
- assert httpheaders['content-length'] == str (len (g.body[0]))
-
- # body is never truncated if it exists
- assert headers['warc-truncated'] is None
-
- # unencoded strings are converted to utf8
- if not g.body[1] and httpheaders['content-type'] is not None:
- assert httpheaders['content-type'].endswith ('; charset=utf-8')
- else:
- # body fetch failed
- assert headers['warc-truncated'] == 'unspecified'
- # content-length header should be kept intact
+ # body fetch failed
+ assert headers['warc-truncated'] == 'unspecified'
+ assert not rec.raw_stream.read ()
+ # content-length header should be kept intact
else:
assert False, f"invalid golden type {type(g)}" # pragma: no cover