summary refs log tree commit diff
path: root/crocoite/test_behavior.py
diff options
context:
space:
mode:
Diffstat (limited to 'crocoite/test_behavior.py')
-rw-r--r-- crocoite/test_behavior.py 186
1 files changed, 177 insertions, 9 deletions
diff --git a/crocoite/test_behavior.py b/crocoite/test_behavior.py
index 280b35b..1efea08 100644
--- a/crocoite/test_behavior.py
+++ b/crocoite/test_behavior.py
@@ -18,19 +18,24 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
-import asyncio, os, yaml, re
-from urllib.parse import urlparse
+import asyncio, os, yaml, re, math, struct
from functools import partial
+from operator import attrgetter
+
import pytest
+from yarl import URL
+from aiohttp import web
import pkg_resources
from .logger import Logger
from .devtools import Process
-from .behavior import Scroll, Behavior
-from .controller import SinglePageController
+from .behavior import Scroll, Behavior, ExtractLinks, ExtractLinksEvent, Crash, \
+ Screenshot, ScreenshotEvent, DomSnapshot, DomSnapshotEvent, mapOrIgnore
+from .controller import SinglePageController, EventHandler, ControllerSettings
+from .devtools import Crashed
with pkg_resources.resource_stream (__name__, os.path.join ('data', 'click.yaml')) as fd:
- sites = list (yaml.load_all (fd))
+ sites = list (yaml.safe_load_all (fd))
clickParam = []
for o in sites:
for s in o['selector']:
@@ -67,7 +72,7 @@ class ClickTester (Behavior):
# assert any (map (lambda x: x['type'] == 'click', listeners)), listeners
return
- yield
+ yield # pragma: no cover
@pytest.mark.parametrize("url,selector", clickParam)
@pytest.mark.asyncio
@@ -77,8 +82,10 @@ async def test_click_selectors (url, selector):
Make sure the CSS selector exists on an example url
"""
logger = Logger ()
+ settings = ControllerSettings (idleTimeout=5, timeout=10)
# Some selectors are loaded dynamically and require scrolling
controller = SinglePageController (url=url, logger=logger,
+ settings=settings,
service=Process (),
behavior=[Scroll, partial(ClickTester, selector=selector)])
await controller.run ()
@@ -87,12 +94,173 @@ matchParam = []
for o in sites:
for s in o['selector']:
for u in s.get ('urls', []):
- matchParam.append ((o['match'], u))
+ matchParam.append ((o['match'], URL (u)))
@pytest.mark.parametrize("match,url", matchParam)
@pytest.mark.asyncio
async def test_click_match (match, url):
""" Test urls must match """
- host = urlparse (url).netloc
- assert re.match (match, host, re.I)
+ # keep this aligned with click.js
+ # url is a yarl URL (built in the matchParam loop); only its host is matched
+ # against the site pattern, ignoring case
+ assert re.match (match, url.host, re.I)
+
+
+class AccumHandler (EventHandler):
+    """
+    Test adapter that accumulates all incoming items
+
+    Every item handed to push() is appended to .data in arrival order, so a
+    test can inspect what was emitted after the controller has finished.
+    """
+
+    # one-element tuple; ('data') without the trailing comma is merely the
+    # string 'data'
+    __slots__ = ('data', )
+
+    def __init__ (self):
+        super().__init__ ()
+        self.data = []
+
+    async def push (self, item):
+        """ Store item as-is, nothing is filtered or transformed """
+        self.data.append (item)
+
+async def simpleServer (url, response):
+    """
+    Start a throw-away HTTP server for a single test
+
+    A GET request for url.path is answered with response (status 200,
+    text/html, utf-8). The server listens on url.host and url.port.
+
+    :param url: URL whose host, port and path describe the endpoint
+    :param response: Body sent for every request to that endpoint
+    :return: The running web.AppRunner; the caller has to await its
+        cleanup() when done
+    """
+    async def f (req):
+        return web.Response (body=response, status=200, content_type='text/html', charset='utf-8')
+
+    app = web.Application ()
+    app.router.add_route ('GET', url.path, f)
+    runner = web.AppRunner(app)
+    await runner.setup()
+    try:
+        site = web.TCPSite(runner, url.host, url.port)
+        await site.start()
+    except Exception:
+        # the caller never receives the runner if starting the site fails
+        # (e.g. port already in use), so it must be released here
+        await runner.cleanup ()
+        raise
+    return runner
+
+@pytest.mark.asyncio
+async def test_extract_links ():
+    """
+    ExtractLinks reports the links of a locally served page
+
+    Relative, absolute, fragment and image map links are expected as absolute
+    URLs. Links with display: none (directly or through a parent) and links
+    inside an HTML comment must not show up, whereas visibility: hidden links
+    are currently still reported.
+    """
+
+    url = URL.build (scheme='http', host='localhost', port=8080)
+    runner = await simpleServer (url, """<html><head></head>
+ <body>
+ <div>
+ <a href="/relative">foo</a>
+ <a href="http://example.com/absolute/">foo</a>
+ <a href="https://example.com/absolute/secure">foo</a>
+ <a href="#anchor">foo</a>
+ <a href="http://neue_preise_f%c3%bcr_zahnimplantate_k%c3%b6nnten_sie_%c3%bcberraschen">foo</a>
+
+ <a href="/hidden/visibility" style="visibility: hidden">foo</a>
+ <a href="/hidden/display" style="display: none">foo</a>
+ <div style="display: none">
+ <a href="/hidden/display/insidediv">foo</a>
+ </div>
+ <!--<a href="/hidden/comment">foo</a>-->
+
+ <p><img src="shapes.png" usemap="#shapes">
+ <map name="shapes"><area shape=rect coords="50,50,100,100" href="/map/rect"></map></p>
+ </div>
+ </body></html>""")
+
+    try:
+        handler = AccumHandler ()
+        logger = Logger ()
+        controller = SinglePageController (url=url, logger=logger,
+                service=Process (), behavior=[ExtractLinks], handler=[handler])
+        await controller.run ()
+
+        # links may arrive spread over several events, collect all of them
+        links = []
+        for d in handler.data:
+            if isinstance (d, ExtractLinksEvent):
+                links.extend (d.links)
+        # order of discovery is irrelevant, thus compare sorted lists
+        assert sorted (links) == sorted ([
+            url.with_path ('/relative'),
+            url.with_fragment ('anchor'),
+            URL ('http://neue_preise_f%C3%BCr_zahnimplantate_k%C3%B6nnten_sie_%C3%BCberraschen'),
+            URL ('http://example.com/absolute/'),
+            URL ('https://example.com/absolute/secure'),
+            url.with_path ('/hidden/visibility'), # XXX: shall we ignore these as well?
+            url.with_path ('/map/rect'),
+            ])
+    finally:
+        # shut down the local server even if an assertion failed
+        await runner.cleanup ()
+
+@pytest.mark.asyncio
+async def test_crash ():
+    """
+    Running the Crash behavior must surface as Crashed from the controller
+    """
+
+    target = URL.build (scheme='http', host='localhost', port=8080)
+    server = await simpleServer (target, '<html></html>')
+
+    try:
+        ctrl = SinglePageController (url=target, logger=Logger (),
+                service=Process (), behavior=[Crash])
+        with pytest.raises (Crashed):
+            await ctrl.run ()
+    finally:
+        # stop the local server no matter how the run ended
+        await server.cleanup ()
+
+@pytest.mark.asyncio
+async def test_screenshot ():
+    """
+    Screenshot must emit PNG images no taller than Screenshot.maxDim whose
+    heights add up to the page height (or the canvas height, if that is
+    larger). Image content is deliberately not looked at.
+    """
+    # PNG ident is fixed, IHDR is always the first chunk
+    pngHeader = b'\x89PNG\r\n\x1a\n\x00\x00\x00\x0dIHDR'
+    # ceil(0) == 0, so starting with 1
+    heights = (1, Screenshot.maxDim, Screenshot.maxDim+1, Screenshot.maxDim*2+Screenshot.maxDim//2)
+    for expectHeight in heights:
+        url = URL.build (scheme='http', host='localhost', port=8080)
+        page = f'<html><body style="margin: 0; padding: 0;"><div style="height: {expectHeight}"></div></body></html>'
+        runner = await simpleServer (url, page)
+
+        try:
+            handler = AccumHandler ()
+            controller = SinglePageController (url=url, logger=Logger (),
+                    service=Process (), behavior=[Screenshot], handler=[handler])
+            await controller.run ()
+
+            screenshots = [x for x in handler.data if isinstance (x, ScreenshotEvent)]
+            assert len (screenshots) == math.ceil (expectHeight/Screenshot.maxDim)
+            totalHeight = 0
+            for shot in screenshots:
+                assert shot.url == url
+                assert shot.data.startswith (pngHeader)
+                # width and height are read as two big-endian 32 bit integers
+                _, height = struct.unpack ('>II', shot.data[16:24])
+                assert height <= Screenshot.maxDim
+                totalHeight += height
+            # screenshot height is at least canvas height (XXX: get hardcoded
+            # value from devtools.Process)
+            assert totalHeight == max (expectHeight, 1080)
+        finally:
+            await runner.cleanup ()
+
+@pytest.mark.asyncio
+async def test_dom_snapshot ():
+    """
+    Behavior plug-in works, <canvas> is replaced by static image, <script> is
+    stripped. Actual conversion from Chrome DOM to HTML is validated by module
+    .test_html
+    """
+
+    url = URL.build (scheme='http', host='localhost', port=8080)
+    # plain literal, nothing is interpolated into the page
+    runner = await simpleServer (url, '<html><body><p>ÄÖÜäöü</p><script>alert("yes");</script><canvas id="canvas" width="1" height="1">Alternate text.</canvas></body></html>')
+
+    try:
+        handler = AccumHandler ()
+        logger = Logger ()
+        controller = SinglePageController (url=url, logger=logger,
+                service=Process (), behavior=[DomSnapshot], handler=[handler])
+        await controller.run ()
+
+        snapshots = list (filter (lambda x: isinstance (x, DomSnapshotEvent), handler.data))
+        assert len (snapshots) == 1
+        doc = snapshots[0].document
+        # the document is compared as bytes; the image data between prefix and
+        # suffix is not checked
+        assert doc.startswith ('<HTML><HEAD><meta charset=utf-8></HEAD><BODY><P>ÄÖÜäöü</P><IMG id=canvas width=1 height=1 src="data:image/png;base64,'.encode ('utf-8'))
+        assert doc.endswith ('></BODY></HTML>'.encode ('utf-8'))
+    finally:
+        # shut down the local server even if an assertion failed
+        await runner.cleanup ()
+
+def test_mapOrIgnore ():
+    """ Inputs for which the callback raises must be missing from the result """
+    def fail (x):
+        # only values from 50 upwards are mapped, everything below raises
+        if x >= 50:
+            return x+1
+        raise Exception ()
+
+    expected = list (range (51, 101))
+    result = list (mapOrIgnore (fail, range (100)))
+    assert result == expected