diff options
Diffstat (limited to 'crocoite/browser.py')
-rw-r--r-- | crocoite/browser.py | 644 |
1 files changed, 419 insertions, 225 deletions
diff --git a/crocoite/browser.py b/crocoite/browser.py index 23b251f..3518789 100644 --- a/crocoite/browser.py +++ b/crocoite/browser.py @@ -22,50 +22,251 @@ Chrome browser interactions. """ -import logging -from urllib.parse import urlsplit +import asyncio +from base64 import b64decode, b64encode +from datetime import datetime, timedelta +from http.server import BaseHTTPRequestHandler + +from yarl import URL +from multidict import CIMultiDict + +from .logger import Level +from .devtools import Browser, TabException + +# These two classes’ only purpose is so we can later tell whether a body was +# base64-encoded or a unicode string +class Base64Body (bytes): + def __new__ (cls, value): + return bytes.__new__ (cls, b64decode (value)) + + @classmethod + def fromBytes (cls, b): + """ For testing """ + return cls (b64encode (b)) + +class UnicodeBody (bytes): + def __new__ (cls, value): + if type (value) is not str: + raise TypeError ('expecting unicode string') + + return bytes.__new__ (cls, value.encode ('utf-8')) + +class Request: + __slots__ = ('headers', 'body', 'initiator', 'hasPostData', 'method', 'timestamp') + + def __init__ (self, method=None, headers=None, body=None): + self.headers = headers + self.body = body + self.hasPostData = False + self.initiator = None + # HTTP method + self.method = method + self.timestamp = None -class Item: - """ - Simple wrapper containing Chrome request and response - """ + def __repr__ (self): + return f'Request({self.method!r}, {self.headers!r}, {self.body!r})' + + def __eq__ (self, b): + if b is None: + return False + + if not isinstance (b, Request): + raise TypeError ('Can only compare equality with Request.') + + # do not compare hasPostData (only required to fetch body) and + # timestamp (depends on time) + return self.headers == b.headers and \ + self.body == b.body and \ + self.initiator == b.initiator and \ + self.method == b.method + +class Response: + __slots__ = ('status', 'statusText', 'headers', 'body', 'bytesReceived', + 'timestamp', 'mimeType') + + def __init__ (self, status=None, statusText=None, headers=None, body=None, mimeType=None): + self.status = status + self.statusText = statusText + self.headers = headers + self.body = body + # bytes received over the network (not body size!) + self.bytesReceived = 0 + self.timestamp = None + self.mimeType = mimeType - def __init__ (self): - self.chromeRequest = None - self.chromeResponse = None - self.chromeFinished = None + def __repr__ (self): + return f'Response({self.status!r}, {self.statusText!r}, {self.headers!r}, {self.body!r}, {self.mimeType!r})' + + def __eq__ (self, b): + if b is None: + return False + + if not isinstance (b, Response): + raise TypeError ('Can only compare equality with Response.') + + # do not compare bytesReceived (depends on network), timestamp + # (depends on time) and statusText (does not matter) + return self.status == b.status and \ + self.statusText == b.statusText and \ + self.headers == b.headers and \ + self.body == b.body and \ + self.mimeType == b.mimeType + +class ReferenceTimestamp: + """ Map relative timestamp to absolute timestamp """ + + def __init__ (self, relative, absolute): + self.relative = timedelta (seconds=relative) + self.absolute = datetime.utcfromtimestamp (absolute) + + def __call__ (self, relative): + if not isinstance (relative, timedelta): + relative = timedelta (seconds=relative) + return self.absolute + (relative-self.relative) + +class RequestResponsePair: + __slots__ = ('request', 'response', 'id', 'url', 'remoteIpAddress', + 'protocol', 'resourceType', '_time') + + def __init__ (self, id=None, url=None, request=None, response=None): + self.request = request + self.response = response + self.id = id + self.url = url + self.remoteIpAddress = None + self.protocol = None + self.resourceType = None + self._time = None def __repr__ (self): - return '<Item {}>'.format (self.request['url']) + return f'RequestResponsePair({self.id!r}, {self.url!r}, {self.request!r}, {self.response!r})' + + def __eq__ (self, b): + if not isinstance (b, RequestResponsePair): + raise TypeError (f'Can only compare with {self.__class__.__name__}') + + # do not compare id and _time. These depend on external factors and do + # not influence the request/response *content* + return self.request == b.request and \ + self.response == b.response and \ + self.url == b.url and \ + self.remoteIpAddress == b.remoteIpAddress and \ + self.protocol == b.protocol and \ + self.resourceType == b.resourceType + + def fromRequestWillBeSent (self, req): + """ Set request data from Chrome Network.requestWillBeSent event """ + r = req['request'] + + self.id = req['requestId'] + self.url = URL (r['url']) + self.resourceType = req.get ('type') + self._time = ReferenceTimestamp (req['timestamp'], req['wallTime']) + + assert self.request is None, req + self.request = Request () + self.request.initiator = req['initiator'] + self.request.headers = CIMultiDict (self._unfoldHeaders (r['headers'])) + self.request.hasPostData = r.get ('hasPostData', False) + self.request.method = r['method'] + self.request.timestamp = self._time (req['timestamp']) + if self.request.hasPostData: + postData = r.get ('postData') + if postData is not None: + self.request.body = UnicodeBody (postData) + + def fromResponse (self, r, timestamp=None, resourceType=None): + """ + Set response data from Chrome’s Response object. + + Request must exist. Updates if response was set before. Sometimes + fromResponseReceived is triggered twice by Chrome. No idea why. + """ + assert self.request is not None, (self.request, r) + + if not timestamp: + timestamp = self.request.timestamp - @property - def request (self): - return self.chromeRequest['request'] + self.remoteIpAddress = r.get ('remoteIPAddress') + self.protocol = r.get ('protocol') + if resourceType: + self.resourceType = resourceType - @property - def response (self): - return self.chromeResponse['response'] + # a response may contain updated request headers (i.e. those actually + # sent over the wire) + if 'requestHeaders' in r: + self.request.headers = CIMultiDict (self._unfoldHeaders (r['requestHeaders'])) - @property - def initiator (self): - return self.chromeRequest['initiator'] + self.response = Response () + self.response.headers = CIMultiDict (self._unfoldHeaders (r['headers'])) + self.response.status = r['status'] + self.response.statusText = r['statusText'] + self.response.timestamp = timestamp + self.response.mimeType = r['mimeType'] + + def fromResponseReceived (self, resp): + """ Set response data from Chrome Network.responseReceived """ + return self.fromResponse (resp['response'], + self._time (resp['timestamp']), resp['type']) + + def fromLoadingFinished (self, data): + self.response.bytesReceived = data['encodedDataLength'] + + def fromLoadingFailed (self, data): + self.response = None + + @staticmethod + def _unfoldHeaders (headers): + """ + A host may send multiple headers using the same key, which Chrome folds + into the same item. Separate those. + """ + items = [] + for k in headers.keys (): + for v in headers[k].split ('\n'): + items.append ((k, v)) + return items + + async def prefetchRequestBody (self, tab): + if self.request.hasPostData and self.request.body is None: + try: + postData = await tab.Network.getRequestPostData (requestId=self.id) + self.request.body = UnicodeBody (postData['postData']) + except TabException: + self.request.body = None + + async def prefetchResponseBody (self, tab): + """ Fetch response body """ + try: + body = await tab.Network.getResponseBody (requestId=self.id) + if body['base64Encoded']: + self.response.body = Base64Body (body['body']) + else: + self.response.body = UnicodeBody (body['body']) + except TabException: + self.response.body = None - @property - def id (self): - return self.chromeRequest['requestId'] +class NavigateError (IOError): + pass - @property - def encodedDataLength (self): - return self.chromeFinished['encodedDataLength'] +class PageIdle: + """ Page idle event """ - def setRequest (self, req): - self.chromeRequest = req + __slots__ = ('idle', ) - def setResponse (self, resp): - self.chromeResponse = resp + def __init__ (self, idle): + self.idle = idle - def setFinished (self, finished): - self.chromeFinished = finished + def __bool__ (self): + return self.idle + +class FrameNavigated: + __slots__ = ('id', 'url', 'mimeType') + + def __init__ (self, id, url, mimeType): + self.id = id + self.url = URL (url) + self.mimeType = mimeType class SiteLoader: """ @@ -74,252 +275,245 @@ class SiteLoader: XXX: track popup windows/new tabs and close them """ + __slots__ = ('requests', 'browser', 'logger', 'tab', '_iterRunning', + '_framesLoading', '_rootFrame') allowedSchemes = {'http', 'https'} - def __init__ (self, browser, url, logger=logging.getLogger(__name__)): + def __init__ (self, browser, logger): self.requests = {} - self.browser = browser - self.url = url - self.logger = logger - - self.tab = browser.new_tab() + self.browser = Browser (url=browser) + self.logger = logger.bind (context=type (self).__name__) + self._iterRunning = [] - def __enter__ (self): - tab = self.tab - # setup callbacks - tab.Network.requestWillBeSent = self._requestWillBeSent - tab.Network.responseReceived = self._responseReceived - tab.Network.loadingFinished = self._loadingFinished - tab.Network.loadingFailed = self._loadingFailed - tab.Log.entryAdded = self._entryAdded - #tab.Page.loadEventFired = loadEventFired + self._framesLoading = set () + self._rootFrame = None - # start the tab - tab.start() + async def __aenter__ (self): + tab = self.tab = await self.browser.__aenter__ () # enable events - tab.Log.enable () - tab.Network.enable() - tab.Page.enable () - tab.Network.clearBrowserCache () - if tab.Network.canClearBrowserCookies ()['result']: - tab.Network.clearBrowserCookies () - + await asyncio.gather (*[ + tab.Log.enable (), + tab.Network.enable(), + tab.Page.enable (), + tab.Inspector.enable (), + tab.Network.clearBrowserCache (), + tab.Network.clearBrowserCookies (), + ]) return self + async def __aexit__ (self, exc_type, exc_value, traceback): + for task in self._iterRunning: + # ignore any results from stuff we did not end up using anyway + if not task.done (): + task.cancel () + self._iterRunning = [] + await self.browser.__aexit__ (exc_type, exc_value, traceback) + self.tab = None + return False + def __len__ (self): return len (self.requests) - def start (self): - self.tab.Page.navigate(url=self.url) - - def wait (self, timeout=1): - self.tab.wait (timeout) - - def waitIdle (self, idleTimeout=1, maxTimeout=60): - step = 0 - for i in range (0, maxTimeout): - self.wait (1) - if len (self) == 0: - step += 1 - if step > idleTimeout: - break - else: - step = 0 - - def stop (self): - """ - Stop loading site - - XXX: stop executing scripts - """ - + async def __aiter__ (self): + """ Retrieve network items """ tab = self.tab - - tab.Page.stopLoading () - tab.Network.disable () - tab.Page.disable () - tab.Log.disable () - # XXX: we can’t drain the event queue directly, so insert (yet another) wait - tab.wait (1) - tab.Network.requestWillBeSent = None - tab.Network.responseReceived = None - tab.Network.loadingFinished = None - tab.Network.loadingFailed = None - tab.Page.loadEventFired = None - tab.Log.entryAdded = None - - def __exit__ (self, exc_type, exc_value, traceback): - self.tab.stop () - self.browser.close_tab(self.tab) - return False - - # overrideable callbacks - def loadingFinished (self, item, redirect=False): - pass - - def loadingFailed (self, item): - pass + assert tab is not None + handler = { + tab.Network.requestWillBeSent: self._requestWillBeSent, + tab.Network.responseReceived: self._responseReceived, + tab.Network.loadingFinished: self._loadingFinished, + tab.Network.loadingFailed: self._loadingFailed, + tab.Log.entryAdded: self._entryAdded, + tab.Page.javascriptDialogOpening: self._javascriptDialogOpening, + tab.Page.frameStartedLoading: self._frameStartedLoading, + tab.Page.frameStoppedLoading: self._frameStoppedLoading, + tab.Page.frameNavigated: self._frameNavigated, + } + + # The implementation is a little advanced. Why? The goal here is to + # process events from the tab as quickly as possible (i.e. + # asynchronously). We need to make sure that JavaScript dialogs are + # handled immediately for instance. Otherwise they stall every + # other request. Also, we don’t want to use an unbounded queue, + # since the items yielded can get quite big (response body). Thus + # we need to block (yield) for every item completed, but not + # handled by the consumer (caller). + running = self._iterRunning + tabGetTask = asyncio.ensure_future (self.tab.get ()) + running.append (tabGetTask) + while True: + done, pending = await asyncio.wait (running, return_when=asyncio.FIRST_COMPLETED) + for t in done: + result = t.result () + if result is None: + pass + elif t == tabGetTask: + method, data = result + f = handler.get (method, None) + if f is not None: + task = asyncio.ensure_future (f (**data)) + pending.add (task) + tabGetTask = asyncio.ensure_future (self.tab.get ()) + pending.add (tabGetTask) + else: + yield result + + running = pending + self._iterRunning = running + + async def navigate (self, url): + ret = await self.tab.Page.navigate(url=url) + self.logger.debug ('navigate', + uuid='9d47ded2-951f-4e09-86ee-fd4151e20666', result=ret) + if 'errorText' in ret: + raise NavigateError (ret['errorText']) + self._rootFrame = ret['frameId'] # internal chrome callbacks - def _requestWillBeSent (self, **kwargs): + async def _requestWillBeSent (self, **kwargs): + self.logger.debug ('requestWillBeSent', + uuid='b828d75a-650d-42d2-8c66-14f4547512da', args=kwargs) + reqId = kwargs['requestId'] req = kwargs['request'] + url = URL (req['url']) + logger = self.logger.bind (reqId=reqId, reqUrl=url) - url = urlsplit (req['url']) if url.scheme not in self.allowedSchemes: return + ret = None item = self.requests.get (reqId) if item: # redirects never “finish” loading, but yield another requestWillBeSent with this key set redirectResp = kwargs.get ('redirectResponse') if redirectResp: - # create fake responses - resp = {'requestId': reqId, 'response': redirectResp, 'timestamp': kwargs['timestamp']} - item.setResponse (resp) - resp = {'requestId': reqId, 'encodedDataLength': 0, 'timestamp': kwargs['timestamp']} - item.setFinished (resp) - self.loadingFinished (item, redirect=True) - self.logger.info ('redirected request {} has url {}'.format (reqId, req['url'])) + if item.url != url: + # this happens for unknown reasons. the docs simply state + # it can differ in case of a redirect. Fix it and move on. + logger.warning ('redirect url differs', + uuid='558a7df7-2258-4fe4-b16d-22b6019cc163', + expected=item.url) + redirectResp['url'] = str (item.url) + item.fromResponse (redirectResp) + logger.info ('redirect', uuid='85eaec41-e2a9-49c2-9445-6f19690278b8', target=url) + # XXX: queue this? no need to wait for it + await item.prefetchRequestBody (self.tab) + # cannot fetch response body due to race condition (item id reused) + ret = item else: - self.logger.warn ('request {} already exists, overwriting.'.format (reqId)) + logger.warning ('request exists', uuid='2c989142-ba00-4791-bb03-c2a14e91a56b') - item = Item () - item.setRequest (kwargs) + item = RequestResponsePair () + item.fromRequestWillBeSent (kwargs) self.requests[reqId] = item - def _responseReceived (self, **kwargs): + return ret + + async def _responseReceived (self, **kwargs): + self.logger.debug ('responseReceived', + uuid='ecd67e69-401a-41cb-b4ec-eeb1f1ec6abb', args=kwargs) + reqId = kwargs['requestId'] item = self.requests.get (reqId) if item is None: return resp = kwargs['response'] - url = urlsplit (resp['url']) + url = URL (resp['url']) + logger = self.logger.bind (reqId=reqId, respUrl=url) + if item.url != url: + logger.error ('url mismatch', uuid='7385f45f-0b06-4cbc-81f9-67bcd72ee7d0', respUrl=url) if url.scheme in self.allowedSchemes: - self.logger.info ('response {} {}'.format (reqId, resp['url'])) - item.setResponse (kwargs) + item.fromResponseReceived (kwargs) else: - self.logger.warn ('response: ignoring scheme {}'.format (url.scheme)) + logger.warning ('scheme forbidden', uuid='2ea6e5d7-dd3b-4881-b9de-156c1751c666') - def _loadingFinished (self, **kwargs): + async def _loadingFinished (self, **kwargs): """ Item was fully loaded. For some items the request body is not available when responseReceived is fired, thus move everything here. """ + self.logger.debug ('loadingFinished', + uuid='35479405-a5b5-4395-8c33-d3601d1796b9', args=kwargs) + reqId = kwargs['requestId'] item = self.requests.pop (reqId, None) if item is None: # we never recorded this request (blacklisted scheme, for example) return + if not item.response: + # chrome failed to send us a responseReceived event for this item, + # so we can’t record it (missing request/response headers) + self.logger.error ('response missing', + uuid='fac3ab96-3f9b-4c5a-95c7-f83b675cdcb9', requestId=item.id) + return + req = item.request - resp = item.response - assert req['url'] == resp['url'], 'req and resp urls are not the same {} vs {}'.format (req['url'], resp['url']) - url = urlsplit (resp['url']) - if url.scheme in self.allowedSchemes: - self.logger.info ('finished {} {}'.format (reqId, req['url'])) - item.setFinished (kwargs) - self.loadingFinished (item) + if item.url.scheme in self.allowedSchemes: + item.fromLoadingFinished (kwargs) + # XXX queue both + await asyncio.gather (item.prefetchRequestBody (self.tab), item.prefetchResponseBody (self.tab)) + return item + + async def _loadingFailed (self, **kwargs): + self.logger.info ('loadingFailed', + uuid='4a944e85-5fae-4aa6-9e7c-e578b29392e4', args=kwargs) - def _loadingFailed (self, **kwargs): reqId = kwargs['requestId'] - self.logger.warn ('failed {} {}'.format (reqId, kwargs['errorText'], kwargs.get ('blockedReason'))) + logger = self.logger.bind (reqId=reqId) item = self.requests.pop (reqId, None) - self.loadingFailed (item) + if item is not None: + item.fromLoadingFailed (kwargs) + return item - def _entryAdded (self, **kwargs): + async def _entryAdded (self, **kwargs): """ Log entry added """ entry = kwargs['entry'] - level = {'verbose': logging.DEBUG, 'info': logging.INFO, - 'warning': logging.WARNING, - 'error': logging.ERROR}[entry['level']] - self.logger.log (level, 'console: {}: {}'.format (entry['source'], entry['text']), extra=entry) - -class AccountingSiteLoader (SiteLoader): - """ - SiteLoader that keeps basic statistics about retrieved pages. - """ - - def __init__ (self, browser, url, logger=logging.getLogger(__name__)): - super ().__init__ (browser, url, logger) - - self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0} - - def loadingFinished (self, item, redirect=False): - super ().loadingFinished (item, redirect) - - self.stats['finished'] += 1 - self.stats['bytesRcv'] += item.encodedDataLength - - def loadingFailed (self, item): - super ().loadingFailed (item) - - self.stats['failed'] += 1 - - def _requestWillBeSent (self, **kwargs): - super ()._requestWillBeSent (**kwargs) - - self.stats['requests'] += 1 - -import subprocess -from tempfile import mkdtemp -from contextlib import contextmanager -import socket, shutil - -@contextmanager -def ChromeService (binary='google-chrome-stable', host='localhost', port=9222, windowSize=(1920, 1080)): - """ - Start Chrome with socket activation (i.e. pass listening socket). Polling - is not required with this method, since reads will block until Chrome is - ready. - """ - while True: - s = socket.socket () - s.setsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - s.bind ((host, port)) - break - except OSError: - # try different port - if port < 65000: - port += 1 - else: - raise - s.listen (10) - userDataDir = mkdtemp () - args = [binary, - '--window-size={},{}'.format (*windowSize), - '--user-data-dir={}'.format (userDataDir), # use temporory user dir - '--no-default-browser-check', - '--no-first-run', # don’t show first run screen - '--disable-breakpad', # no error reports - '--disable-extensions', - '--disable-infobars', - '--disable-notifications', # no libnotify - '--headless', - '--disable-gpu', - '--hide-scrollbars', # hide scrollbars on screenshots - '--mute-audio', # don’t play any audio - '--remote-debugging-socket-fd={}'.format (s.fileno ()), - '--homepage=about:blank', - 'about:blank'] - # start new session, so ^C does not affect subprocess - p = subprocess.Popen (args, pass_fds=[s.fileno()], start_new_session=True, - stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL) - - # must be wrapped in try-finally, otherwise code in __exit__/finally is not - # executed - try: - yield 'http://{}:{}'.format (host, port) - finally: - p.terminate () - p.wait () - shutil.rmtree (userDataDir) - -@contextmanager -def NullService (url): - yield url - + level = {'verbose': Level.DEBUG, 'info': Level.INFO, + 'warning': Level.WARNING, + 'error': Level.ERROR}.get (entry.pop ('level'), Level.INFO) + entry['uuid'] = 'e62ffb5a-0521-459c-a3d9-1124551934d2' + self.logger (level, 'console', **entry) + + async def _javascriptDialogOpening (self, **kwargs): + t = kwargs.get ('type') + if t in {'alert', 'confirm', 'prompt'}: + self.logger.info ('js dialog', + uuid='d6f07ce2-648e-493b-a1df-f353bed27c84', + action='cancel', type=t, message=kwargs.get ('message')) + await self.tab.Page.handleJavaScriptDialog (accept=False) + elif t == 'beforeunload': + # we must accept this one, otherwise the page will not unload/close + self.logger.info ('js dialog', + uuid='96399b99-9834-4c8f-bd93-cb9fa2225abd', + action='proceed', type=t, message=kwargs.get ('message')) + await self.tab.Page.handleJavaScriptDialog (accept=True) + else: # pragma: no cover + self.logger.warning ('js dialog unknown', + uuid='3ef7292e-8595-4e89-b834-0cc6bc40ee38', **kwargs) + + async def _frameStartedLoading (self, **kwargs): + self.logger.debug ('frameStartedLoading', + uuid='bbeb39c0-3304-4221-918e-f26bd443c566', args=kwargs) + + self._framesLoading.add (kwargs['frameId']) + return PageIdle (False) + + async def _frameStoppedLoading (self, **kwargs): + self.logger.debug ('frameStoppedLoading', + uuid='fcbe8110-511c-4cbb-ac2b-f61a5782c5a0', args=kwargs) + + self._framesLoading.remove (kwargs['frameId']) + if not self._framesLoading: + return PageIdle (True) + + async def _frameNavigated (self, **kwargs): + self.logger.debug ('frameNavigated', + uuid='0e876f7d-7129-4612-8632-686f42ac6e1f', args=kwargs) + frame = kwargs['frame'] + if self._rootFrame == frame['id']: + assert frame.get ('parentId', None) is None, "root frame must not have a parent" + return FrameNavigated (frame['id'], frame['url'], frame['mimeType']) |