diff options
Diffstat (limited to 'crocoite/test_tools.py')
-rw-r--r-- | crocoite/test_tools.py | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/crocoite/test_tools.py b/crocoite/test_tools.py new file mode 100644 index 0000000..416b954 --- /dev/null +++ b/crocoite/test_tools.py @@ -0,0 +1,224 @@ +# Copyright (c) 2018 crocoite contributors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from tempfile import NamedTemporaryFile +from operator import itemgetter +from io import BytesIO +import pytest +from warcio.archiveiterator import ArchiveIterator +from warcio.warcwriter import WARCWriter +from warcio.statusandheaders import StatusAndHeaders +from pkg_resources import parse_version + +from .tools import mergeWarc, Errata, FixableErrata + +@pytest.fixture +def writer(): + return WARCWriter (NamedTemporaryFile(), gzip=True) + +def recordsEqual(golden, underTest): + for a, b in zip (golden, underTest): + # record ids are not predictable, so we cannot compare them. Dito for + # dates. Content-* seems to be added when writing to file. + for x in {'WARC-Record-Id', 'WARC-Block-Digest', 'WARC-Date', + 'Content-Length', 'Content-Type'}: + a.rec_headers.remove_header(x) + b.rec_headers.remove_header(x) + aheader = sorted(a.rec_headers.headers, key=itemgetter(0)) + bheader = sorted(b.rec_headers.headers, key=itemgetter(0)) + assert aheader == bheader + assert a.http_headers == b.http_headers + +def makeGolden(writer, records): + # additional warcinfo is written. Content does not matter. + record = writer.create_warc_record ( + '', + 'warcinfo', + payload=b'', + warc_headers_dict={'Content-Type': 'application/json; charset=utf-8'}) + records.insert (0, record) + return records + +def test_unmodified(writer): + """ + Single request/response pair, no revisits + """ + + records = [] + + httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True) + warcHeaders = {} + record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'), + warc_headers_dict=warcHeaders, http_headers=httpHeaders) + records.append (record) + + httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1') + record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'), + warc_headers_dict=warcHeaders, http_headers=httpHeaders) + records.append (record) + + for r in records: + writer.write_record (r) + + output = NamedTemporaryFile() + mergeWarc ([writer.out.name], output) + + output.seek(0) + recordsEqual (makeGolden (writer, records), ArchiveIterator (output)) + +def test_different_payload(writer): + """ + Duplicate URL, but different payload + """ + + records = [] + for i in range (2): + httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True) + warcHeaders = {} + record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'), + warc_headers_dict=warcHeaders, http_headers=httpHeaders) + records.append (record) + + httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1') + record = writer.create_warc_record ('http://example.com/', 'response', + payload=BytesIO(f'data{i}'.encode ('utf8')), + warc_headers_dict=warcHeaders, http_headers=httpHeaders) + records.append (record) + + for r in records: + writer.write_record (r) + + output = NamedTemporaryFile() + mergeWarc ([writer.out.name], output) + + output.seek(0) + recordsEqual (makeGolden (writer, records), ArchiveIterator (output)) + +def makeRevisit(writer, ref, dup): + """ Make revisit record for reference """ + dupHeaders = dup.rec_headers + refHeaders = ref.rec_headers + record = writer.create_revisit_record (dupHeaders.get_header('WARC-Target-URI'), + digest=refHeaders.get_header('WARC-Payload-Digest'), + refers_to_uri=refHeaders.get_header('WARC-Target-URI'), + refers_to_date=refHeaders.get_header('WARC-Date'), + http_headers=dup.http_headers) + record.rec_headers.add_header ('WARC-Refers-To', refHeaders.get_header('WARC-Record-ID')) + record.rec_headers.add_header ('WARC-Truncated', 'length') + return record + +def test_resp_revisit_same_url(writer): + """ + Duplicate record for the same URL, creates a revisit + """ + + records = [] + for i in range (2): + httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True) + warcHeaders = {} + record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'), + warc_headers_dict=warcHeaders, http_headers=httpHeaders) + records.append (record) + + httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1') + record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'), + warc_headers_dict=warcHeaders, http_headers=httpHeaders) + records.append (record) + + for r in records: + writer.write_record (r) + + dup = records.pop () + ref = records[1] + records.append (makeRevisit (writer, ref, dup)) + + output = NamedTemporaryFile() + mergeWarc ([writer.out.name], output) + + output.seek(0) + recordsEqual (makeGolden (writer, records), ArchiveIterator (output)) + +def test_resp_revisit_other_url(writer): + """ + Duplicate record for different URL, creates a revisit + """ + + records = [] + + httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True) + warcHeaders = {} + record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'), + warc_headers_dict=warcHeaders, http_headers=httpHeaders) + records.append (record) + + httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1') + record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'), + warc_headers_dict=warcHeaders, http_headers=httpHeaders) + records.append (record) + + httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True) + warcHeaders = {} + record = writer.create_warc_record ('http://example.com/one', 'request', payload=BytesIO(b'foobar'), + warc_headers_dict=warcHeaders, http_headers=httpHeaders) + records.append (record) + + httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1') + record = writer.create_warc_record ('http://example.com/one', 'response', payload=BytesIO(b'data'), + warc_headers_dict=warcHeaders, http_headers=httpHeaders) + records.append (record) + + for r in records: + writer.write_record (r) + + dup = records.pop () + ref = records[1] + records.append (makeRevisit (writer, ref, dup)) + + output = NamedTemporaryFile() + mergeWarc ([writer.out.name], output) + + output.seek(0) + recordsEqual (makeGolden (writer, records), ArchiveIterator (output)) + +def test_errata_contains(): + """ Test version matching """ + e = Errata('some-uuid', 'description', ['a<1.0']) + assert {'a': parse_version('0.1')} in e + assert {'a': parse_version('1.0')} not in e + assert {'b': parse_version('1.0')} not in e + + e = Errata('some-uuid', 'description', ['a<1.0,>0.1']) + assert {'a': parse_version('0.1')} not in e + assert {'a': parse_version('0.2')} in e + assert {'a': parse_version('1.0')} not in e + + # a AND b + e = Errata('some-uuid', 'description', ['a<1.0', 'b>1.0']) + assert {'a': parse_version('0.1')} not in e + assert {'b': parse_version('1.1')} not in e + assert {'a': parse_version('0.1'), 'b': parse_version('1.1')} in e + +def test_errata_fixable (): + e = Errata('some-uuid', 'description', ['a<1.0', 'b>1.0']) + assert not e.fixable + + e = FixableErrata('some-uuid', 'description', ['a<1.0', 'b>1.0']) + assert e.fixable + |