diff options
author | Lars-Dominik Braun <lars@6xq.net> | 2018-06-20 21:01:33 +0200 |
---|---|---|
committer | Lars-Dominik Braun <lars@6xq.net> | 2018-06-20 21:01:55 +0200 |
commit | d77c58fc80a2fe62180796bdc28e8ed69bd46715 (patch) | |
tree | 71d1dc36ed680c19129de15daa2cb105a430784b | |
parent | 4302c6735d2985ed76e4a2d4a3319c7ef2c7ca84 (diff) | |
download | crocoite-d77c58fc80a2fe62180796bdc28e8ed69bd46715.tar.gz crocoite-d77c58fc80a2fe62180796bdc28e8ed69bd46715.tar.bz2 crocoite-d77c58fc80a2fe62180796bdc28e8ed69bd46715.zip |
Move tests to pytest
It just seems a little nicer than plain old unittest
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | .travis.yml | 2 | ||||
-rw-r--r-- | crocoite/browser.py | 162 | ||||
-rw-r--r-- | crocoite/test_browser.py | 177 | ||||
-rw-r--r-- | setup.cfg | 2 | ||||
-rw-r--r-- | setup.py | 2 |
6 files changed, 183 insertions, 163 deletions
@@ -1,3 +1,4 @@ __pycache__ *.sw? *.egg-info/ +.pytest_cache/ diff --git a/.travis.yml b/.travis.yml index e09e518..ce2d61f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,7 +8,7 @@ python: install: - pip install . script: - - python -m unittest crocoite.browser + - pytest --cov=crocoite crocoite addons: chrome: stable sudo: required diff --git a/crocoite/browser.py b/crocoite/browser.py index f583c9b..3b9f7ab 100644 --- a/crocoite/browser.py +++ b/crocoite/browser.py @@ -25,7 +25,6 @@ Chrome browser interactions. import logging from urllib.parse import urlsplit from base64 import b64decode -from http.server import BaseHTTPRequestHandler from collections import deque from threading import Event @@ -397,164 +396,3 @@ class NullService: def __exit__ (self, *exc): pass -### tests ### - -import unittest, time -from operator import itemgetter - -class TestItem (Item): - """ This should be as close to Item as possible """ - - __slots__ = ('bodySend', '_body') - base = 'http://localhost:8000/' - - def __init__ (self, path, status, headers, bodyReceive, bodySend=None): - super ().__init__ (tab=None) - self.chromeResponse = {'response': {'headers': headers, 'status': status, 'url': self.base + path}} - self._body = bodyReceive, False - self.bodySend = bodyReceive if not bodySend else bodySend - - @property - def body (self): - return self._body - -testItems = [ - TestItem ('binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00\x01\x02'), - TestItem ('attachment', 200, - {'Content-Type': 'text/plain; charset=utf-8', - 'Content-Disposition': 'attachment; filename="attachment.txt"', - }, - 'This is a simple text file with umlauts. ÄÖU.'.encode ('utf8')), - TestItem ('encoding/utf8', 200, {'Content-Type': 'text/plain; charset=utf-8'}, - 'This is a test, äöü μνψκ ¥¥¥¿ýý¡'.encode ('utf8')), - TestItem ('encoding/iso88591', 200, {'Content-Type': 'text/plain; charset=ISO-8859-1'}, - 'This is a test, äöü.'.encode ('utf8'), - 'This is a test, äöü.'.encode ('ISO-8859-1')), - TestItem ('encoding/latin1', 200, {'Content-Type': 'text/plain; charset=latin1'}, - 'This is a test, äöü.'.encode ('utf8'), - 'This is a test, äöü.'.encode ('latin1')), - TestItem ('image', 200, {'Content-Type': 'image/png'}, - # 1×1 png image - b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'), - TestItem ('empty', 200, {}, b''), - TestItem ('redirect/301/empty', 301, {'Location': '/empty'}, b''), - TestItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b''), - TestItem ('nonexistent', 404, {}, b''), - TestItem ('html', 200, {'Content-Type': 'html'}, - '<html><body><img src="/image"><img src="/nonexistent"></body></html>'.encode ('utf8')), - TestItem ('html/alert', 200, {'Content-Type': 'html'}, - '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><img src="/image"></body></html>'.encode ('utf8')), - ] -testItemMap = dict ([(item.parsedUrl.path, item) for item in testItems]) - -class TestHTTPRequestHandler (BaseHTTPRequestHandler): - def do_GET(self): - item = testItemMap.get (self.path) - if item: - self.send_response (item.response['status']) - for k, v in item.response['headers'].items (): - self.send_header (k, v) - body = item.bodySend - self.send_header ('Content-Length', len (body)) - self.end_headers() - self.wfile.write (body) - return - - def log_message (self, format, *args): - pass - -def startServer (): - import http.server - PORT = 8000 - httpd = http.server.HTTPServer (("localhost", PORT), TestHTTPRequestHandler) - httpd.serve_forever() - -class TestSiteLoader (unittest.TestCase): - __slots__ = ('server', 'baseurl', 'service', 'browser') - - def setUp (self): - from multiprocessing import Process - self.server = Process (target=startServer) - self.server.start () - self.baseurl = 'http://localhost:8000' - self.service = ChromeService () - self.browser = self.service.__enter__ () - - def buildAdapter (self, path): - self.assertTrue (path.startswith ('/')) - return SiteLoader (self.browser, '{}{}'.format (self.baseurl, path)) - - def assertItems (self, l, items): - items = dict ([(i.parsedUrl.path, i) for i in items]) - timeout = 5 - while True: - if not l.notify.wait (timeout) and len (items) > 0: - self.fail ('timeout') - if len (l.queue) > 0: - item = l.queue.popleft () - if isinstance (item, Exception): - raise item - self.assertIsNot (item.chromeResponse, None, msg='url={}'.format (item.request['url'])) - golden = items.pop (item.parsedUrl.path) - if not golden: - self.fail ('url {} not supposed to be fetched'.format (item.url)) - self.assertEqual (item.body[0], golden.body[0], msg='body for url={}'.format (item.request['url'])) - self.assertEqual (item.response['status'], golden.response['status']) - for k, v in golden.responseHeaders: - actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders))) - self.assertIn (v, actual) - - # check queue at least once - if not items: - break - - def assertLiteralItem (self, item, deps=[]): - with self.buildAdapter (item.parsedUrl.path) as l: - l.start () - self.assertItems (l, [item] + deps) - - def test_empty (self): - self.assertLiteralItem (testItemMap['/empty']) - - def test_redirect (self): - self.assertLiteralItem (testItemMap['/redirect/301/empty'], [testItemMap['/empty']]) - # chained redirects - self.assertLiteralItem (testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']]) - - def test_encoding (self): - """ Text responses are transformed to UTF-8. Make sure this works - correctly. """ - for item in {testItemMap['/encoding/utf8'], testItemMap['/encoding/latin1'], testItemMap['/encoding/iso88591']}: - self.assertLiteralItem (item) - - def test_binary (self): - """ Browser should ignore content it cannot display (i.e. octet-stream) """ - with self.buildAdapter ('/binary') as l: - l.start () - self.assertItems (l, []) - - def test_image (self): - """ Images should be displayed inline """ - self.assertLiteralItem (testItemMap['/image']) - - def test_attachment (self): - """ And downloads won’t work in headless mode, even if it’s just a text file """ - with self.buildAdapter ('/attachment') as l: - l.start () - self.assertItems (l, []) - - def test_html (self): - self.assertLiteralItem (testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']]) - # make sure alerts are dismissed correctly (image won’t load otherwise) - self.assertLiteralItem (testItemMap['/html/alert'], [testItemMap['/image']]) - - def tearDown (self): - self.service.__exit__ (None, None, None) - self.server.terminate () - self.server.join () - -if __name__ == '__main__': - import sys - if sys.argv[1] == 'server': - startServer () - diff --git a/crocoite/test_browser.py b/crocoite/test_browser.py new file mode 100644 index 0000000..cf7be39 --- /dev/null +++ b/crocoite/test_browser.py @@ -0,0 +1,177 @@ +# Copyright (c) 2017 crocoite contributors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import pytest +from operator import itemgetter +from http.server import BaseHTTPRequestHandler + +from .browser import Item, SiteLoader, ChromeService + +class TItem (Item): + """ This should be as close to Item as possible """ + + __slots__ = ('bodySend', '_body') + base = 'http://localhost:8000/' + + def __init__ (self, path, status, headers, bodyReceive, bodySend=None): + super ().__init__ (tab=None) + self.chromeResponse = {'response': {'headers': headers, 'status': status, 'url': self.base + path}} + self._body = bodyReceive, False + self.bodySend = bodyReceive if not bodySend else bodySend + + @property + def body (self): + return self._body + +testItems = [ + TItem ('binary', 200, {'Content-Type': 'application/octet-stream'}, b'\x00\x01\x02'), + TItem ('attachment', 200, + {'Content-Type': 'text/plain; charset=utf-8', + 'Content-Disposition': 'attachment; filename="attachment.txt"', + }, + 'This is a simple text file with umlauts. ÄÖU.'.encode ('utf8')), + TItem ('encoding/utf8', 200, {'Content-Type': 'text/plain; charset=utf-8'}, + 'This is a test, äöü μνψκ ¥¥¥¿ýý¡'.encode ('utf8')), + TItem ('encoding/iso88591', 200, {'Content-Type': 'text/plain; charset=ISO-8859-1'}, + 'This is a test, äöü.'.encode ('utf8'), + 'This is a test, äöü.'.encode ('ISO-8859-1')), + TItem ('encoding/latin1', 200, {'Content-Type': 'text/plain; charset=latin1'}, + 'This is a test, äöü.'.encode ('utf8'), + 'This is a test, äöü.'.encode ('latin1')), + TItem ('image', 200, {'Content-Type': 'image/png'}, + # 1×1 png image + b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00\x00\x00\x00:~\x9bU\x00\x00\x00\nIDAT\x08\x1dc\xf8\x0f\x00\x01\x01\x01\x006_g\x80\x00\x00\x00\x00IEND\xaeB`\x82'), + TItem ('empty', 200, {}, b''), + TItem ('redirect/301/empty', 301, {'Location': '/empty'}, b''), + TItem ('redirect/301/redirect/301/empty', 301, {'Location': '/redirect/301/empty'}, b''), + TItem ('nonexistent', 404, {}, b''), + TItem ('html', 200, {'Content-Type': 'html'}, + '<html><body><img src="/image"><img src="/nonexistent"></body></html>'.encode ('utf8')), + TItem ('html/alert', 200, {'Content-Type': 'html'}, + '<html><body><script>window.addEventListener("beforeunload", function (e) { e.returnValue = "bye?"; return e.returnValue; }); alert("stopping here"); if (confirm("are you sure?") || prompt ("42?")) { window.location = "/nonexistent"; }</script><img src="/image"></body></html>'.encode ('utf8')), + ] +testItemMap = dict ([(item.parsedUrl.path, item) for item in testItems]) + +class RequestHandler (BaseHTTPRequestHandler): + def do_GET(self): + item = testItemMap.get (self.path) + if item: + self.send_response (item.response['status']) + for k, v in item.response['headers'].items (): + self.send_header (k, v) + body = item.bodySend + self.send_header ('Content-Length', len (body)) + self.end_headers() + self.wfile.write (body) + return + + def log_message (self, format, *args): + pass + +@pytest.fixture +def http (): + def run (): + import http.server + PORT = 8000 + httpd = http.server.HTTPServer (("localhost", PORT), RequestHandler) + print ('starting http server') + httpd.serve_forever() + + from multiprocessing import Process + p = Process (target=run) + p.start () + yield p + p.terminate () + p.join () + +@pytest.fixture +def loader (http): + def f (path): + assert path.startswith ('/') + return SiteLoader (browser, 'http://localhost:8000{}'.format (path)) + print ('loader setup') + with ChromeService () as browser: + yield f + print ('loader teardown') + +def itemsLoaded (l, items): + items = dict ([(i.parsedUrl.path, i) for i in items]) + timeout = 5 + while True: + if not l.notify.wait (timeout) and len (items) > 0: + assert False, 'timeout' + if len (l.queue) > 0: + item = l.queue.popleft () + if isinstance (item, Exception): + raise item + assert item.chromeResponse is not None + golden = items.pop (item.parsedUrl.path) + if not golden: + assert False, 'url {} not supposed to be fetched'.format (item.url) + assert item.body[0] == golden.body[0] + assert item.response['status'] == golden.response['status'] + for k, v in golden.responseHeaders: + actual = list (map (itemgetter (1), filter (lambda x: x[0] == k, item.responseHeaders))) + assert v in actual + + # check queue at least once + if not items: + break + +def literalItem (lf, item, deps=[]): + with lf (item.parsedUrl.path) as l: + l.start () + itemsLoaded (l, [item] + deps) + +def test_empty (loader): + literalItem (loader, testItemMap['/empty']) + +def test_redirect (loader): + literalItem (loader, testItemMap['/redirect/301/empty'], [testItemMap['/empty']]) + # chained redirects + literalItem (loader, testItemMap['/redirect/301/redirect/301/empty'], [testItemMap['/redirect/301/empty'], testItemMap['/empty']]) + +def test_encoding (loader): + """ Text responses are transformed to UTF-8. Make sure this works + correctly. """ + for item in {testItemMap['/encoding/utf8'], testItemMap['/encoding/latin1'], testItemMap['/encoding/iso88591']}: + literalItem (loader, item) + +def test_binary (loader): + """ Browser should ignore content it cannot display (i.e. octet-stream) """ + with loader ('/binary') as l: + l.start () + itemsLoaded (l, []) + +def test_image (loader): + """ Images should be displayed inline """ + literalItem (loader, testItemMap['/image']) + +def test_attachment (loader): + """ And downloads won’t work in headless mode, even if it’s just a text file """ + with loader ('/attachment') as l: + l.start () + itemsLoaded (l, []) + +def test_html (loader): + literalItem (loader, testItemMap['/html'], [testItemMap['/image'], testItemMap['/nonexistent']]) + # make sure alerts are dismissed correctly (image won’t load otherwise) + literalItem (loader, testItemMap['/html/alert'], [testItemMap['/image']]) + diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..b7e4789 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[aliases] +test=pytest @@ -25,4 +25,6 @@ setup( package_data={ 'crocoite': ['data/*'], }, + setup_requires=["pytest-runner"], + tests_require=["pytest"], ) |