diff options
Diffstat (limited to 'crocoite/warc.py')
-rw-r--r-- | crocoite/warc.py | 413 |
1 files changed, 210 insertions, 203 deletions
diff --git a/crocoite/warc.py b/crocoite/warc.py index e04bee4..415b487 100644 --- a/crocoite/warc.py +++ b/crocoite/warc.py @@ -22,208 +22,131 @@ Classes writing data to WARC files """ -import logging -import json -from http.server import BaseHTTPRequestHandler -from base64 import b64decode +import json, threading from io import BytesIO -from warcio.statusandheaders import StatusAndHeaders -from urllib.parse import urlsplit -from logging.handlers import BufferingHandler -import pychrome from datetime import datetime -from threading import Thread -from queue import Queue +from http.server import BaseHTTPRequestHandler from warcio.timeutils import datetime_to_iso_date from warcio.warcwriter import WARCWriter - -from .browser import AccountingSiteLoader -from .util import packageUrl -from . import defaults - -class SerializingWARCWriter (WARCWriter): - """ - Serializing WARC writer using separate writer thread and queue for - non-blocking operation - - Needs an explicit .flush() before deletion. - """ - - def __init__ (self, filebuf, *args, **kwargs): - WARCWriter.__init__ (self, filebuf, *args, **kwargs) - self.queue = Queue () - self.thread = Thread (target=self._run_writer) - self.thread.start () - - def flush (self): - self.queue.put (None) - self.thread.join () - self.queue = None - self.thread = None - - def _run_writer (self): - while True: - item = self.queue.get () - if not item: - break - out, record = item - WARCWriter._write_warc_record (self, out, record) - - def _write_warc_record (self, out, record): - self.queue.put ((out, record)) - -class WARCLogHandler (BufferingHandler): - """ - Buffered log handler, flushing to warcio - """ - - contentType = 'text/plain; charset=utf-8' - - def __init__ (self, capacity, warcfile): - BufferingHandler.__init__ (self, capacity) - self.warcfile = warcfile - - def flush (self): - self.acquire () - try: - if self.buffer: - buf = '' - for record in self.buffer: - buf += self.format (record) - buf += '\n' - # XXX: record type? - record = self.warcfile.create_warc_record ( - packageUrl ('log'), 'metadata', - payload=BytesIO (buf.encode ('utf8')), - warc_headers_dict={'Content-Type': self.contentType}) - self.warcfile.write_record(record) - self.buffer = [] - finally: - self.release () - -class WarcLoader (AccountingSiteLoader): - def __init__ (self, browser, url, writer, - logger=logging.getLogger(__name__), - logBuffer=defaults.logBuffer, - maxBodySize=defaults.maxBodySize): - super ().__init__ (browser, url, logger) - self.writer = writer - self.maxBodySize = maxBodySize - self.warcLogger = WARCLogHandler (logBuffer, writer) - self.logger.addHandler (self.warcLogger) - - def __exit__ (self, exc_type, exc_value, traceback): - self.logger.removeHandler (self.warcLogger) - self.warcLogger.flush () - return super ().__exit__ (exc_type, exc_value, traceback) - - @staticmethod - def getStatusText (response): - text = response.get ('statusText') - if text: - return text - text = BaseHTTPRequestHandler.responses.get (response['status']) - if text: - return text[0] - return 'No status text available' - - @staticmethod - def _unfoldHeaders (headers): +from warcio.statusandheaders import StatusAndHeaders +from yarl import URL + +from .util import StrJsonEncoder +from .controller import EventHandler, ControllerStart +from .behavior import Script, DomSnapshotEvent, ScreenshotEvent +from .browser import RequestResponsePair, UnicodeBody + +# the official mimetype for json, according to https://tools.ietf.org/html/rfc8259 +jsonMime = 'application/json' +# mime for javascript, according to https://tools.ietf.org/html/rfc4329#section-7.2 +jsMime = 'application/javascript' + +def makeContentType (mime, charset=None): + """ Create value of Content-Type WARC header with optional charset """ + s = [mime] + if charset: + s.extend (['; charset=', charset]) + return ''.join (s) + +class WarcHandler (EventHandler): + __slots__ = ('logger', 'writer', 'documentRecords', 'log', + 'maxLogSize', 'logEncoding', 'warcinfoRecordId') + + def __init__ (self, fd, logger): + self.logger = logger + self.writer = WARCWriter (fd, gzip=True) + + self.logEncoding = 'utf-8' + self.log = BytesIO () + # max log buffer size (bytes) + self.maxLogSize = 500*1024 + + # maps document urls to WARC record ids, required for DomSnapshotEvent + # and ScreenshotEvent + self.documentRecords = {} + # record id of warcinfo record + self.warcinfoRecordId = None + + def __enter__ (self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._flushLogEntries () + + def writeRecord (self, url, kind, payload, warc_headers_dict=None, http_headers=None): """ - A host may send multiple headers using the same key, which Chrome folds - into the same item. Separate those. + Thin wrapper around writer.create_warc_record and writer.write_record. + + Adds default WARC headers. """ - items = [] - for k in headers.keys (): - for v in headers[k].split ('\n'): - items.append ((k, v)) - return items + assert url is None or isinstance (url, URL) + + d = {} + if self.warcinfoRecordId: + d['WARC-Warcinfo-ID'] = self.warcinfoRecordId + d.update (warc_headers_dict) + warc_headers_dict = d + + record = self.writer.create_warc_record (str (url) if url else '', + kind, + payload=payload, + warc_headers_dict=warc_headers_dict, + http_headers=http_headers) + self.writer.write_record (record) + + return record def _writeRequest (self, item): - writer = self.writer + logger = self.logger.bind (reqId=item.id) req = item.request - resp = item.response - url = urlsplit (resp['url']) - - # overwrite request headers with those actually sent - newReqHeaders = resp.get ('requestHeaders') - if newReqHeaders: - req['headers'] = newReqHeaders - - postData = req.get ('postData') - if postData: - postData = BytesIO (postData.encode ('utf8')) - path = url.path - if url.query: - path += '?' + url.query - httpHeaders = StatusAndHeaders('{} {} HTTP/1.1'.format (req['method'], path), - self._unfoldHeaders (req['headers']), protocol='HTTP/1.1', is_http_request=True) - initiator = item.initiator + url = item.url + + path = url.relative().with_fragment(None) + httpHeaders = StatusAndHeaders(f'{req.method} {path} HTTP/1.1', + req.headers, protocol='HTTP/1.1', is_http_request=True) warcHeaders = { - 'X-Chrome-Initiator': json.dumps (initiator), - 'WARC-Date': datetime_to_iso_date (datetime.utcfromtimestamp (item.chromeRequest['wallTime'])), + # required to correlate request with log entries + 'X-Chrome-Request-ID': item.id, + 'WARC-Date': datetime_to_iso_date (req.timestamp), } - record = writer.create_warc_record(req['url'], 'request', - payload=postData, http_headers=httpHeaders, - warc_headers_dict=warcHeaders) - writer.write_record(record) + body = item.request.body + if item.request.hasPostData and body is None: + # oops, don’t know what went wrong here + logger.error ('requestBody missing', + uuid='ee9adc58-e723-4595-9feb-312a67ead6a0') + warcHeaders['WARC-Truncated'] = 'unspecified' + else: + body = BytesIO (body) + record = self.writeRecord (url, 'request', + payload=body, http_headers=httpHeaders, + warc_headers_dict=warcHeaders) return record.rec_headers['WARC-Record-ID'] - def _getBody (self, item, redirect): + def _writeResponse (self, item, concurrentTo): + # fetch the body reqId = item.id - resp = item.response - - rawBody = b'' - base64Encoded = False - if redirect: - # redirects reuse the same request, thus we cannot safely retrieve - # the body (i.e getResponseBody may return the new location’s - # body). This is fine. - pass - elif item.encodedDataLength > self.maxBodySize: - # check body size first, since we’re loading everything into memory - raise ValueError ('body for {} too large {} vs {}'.format (reqId, - item.encodedDataLength, self.maxBodySize)) - else: - try: - body = self.tab.Network.getResponseBody (requestId=reqId) - rawBody = body['body'] - base64Encoded = body['base64Encoded'] - if base64Encoded: - rawBody = b64decode (rawBody) - else: - rawBody = rawBody.encode ('utf8') - except pychrome.exceptions.CallMethodException: - raise ValueError ('no data for {} {} {}'.format (resp['url'], - resp['status'], reqId)) - return rawBody, base64Encoded - - def _writeResponse (self, item, redirect, concurrentTo, rawBody, base64Encoded): - writer = self.writer - reqId = item.id - resp = item.response # now the response + resp = item.response warcHeaders = { 'WARC-Concurrent-To': concurrentTo, - 'WARC-IP-Address': resp.get ('remoteIPAddress', ''), - 'X-Chrome-Protocol': resp.get ('protocol', ''), - 'X-Chrome-FromDiskCache': str (resp.get ('fromDiskCache')), - 'X-Chrome-ConnectionReused': str (resp.get ('connectionReused')), - 'X-Chrome-Base64Body': str (base64Encoded), - 'WARC-Date': datetime_to_iso_date (datetime.utcfromtimestamp ( - item.chromeRequest['wallTime']+ - (item.chromeResponse['timestamp']-item.chromeRequest['timestamp']))), + # required to correlate request with log entries + 'X-Chrome-Request-ID': item.id, + 'WARC-Date': datetime_to_iso_date (resp.timestamp), } + # conditional WARC headers + if item.remoteIpAddress: + warcHeaders['WARC-IP-Address'] = item.remoteIpAddress - - - httpHeaders = StatusAndHeaders('{} {}'.format (resp['status'], - self.getStatusText (resp)), self._unfoldHeaders (resp['headers']), - protocol='HTTP/1.1') + # HTTP headers + statusText = resp.statusText or \ + BaseHTTPRequestHandler.responses.get ( + resp.status, ('No status text available', ))[0] + httpHeaders = StatusAndHeaders(f'{resp.status} {statusText}', + resp.headers, protocol='HTTP/1.1') # Content is saved decompressed and decoded, remove these headers blacklistedHeaders = {'transfer-encoding', 'content-encoding'} @@ -233,34 +156,118 @@ class WarcLoader (AccountingSiteLoader): # chrome sends nothing but utf8 encoded text. Fortunately HTTP # headers take precedence over the document’s <meta>, thus we can # easily override those. - contentType = resp.get ('mimeType') - if contentType: - if not base64Encoded: - contentType += '; charset=utf-8' - httpHeaders.replace_header ('content-type', contentType) - - httpHeaders.replace_header ('content-length', '{:d}'.format (len (rawBody))) + if resp.mimeType: + charset = 'utf-8' if isinstance (resp.body, UnicodeBody) else None + contentType = makeContentType (resp.mimeType, charset=charset) + httpHeaders.replace_header ('Content-Type', contentType) + + # response body + body = resp.body + if body is None: + warcHeaders['WARC-Truncated'] = 'unspecified' + else: + httpHeaders.replace_header ('Content-Length', str (len (body))) + body = BytesIO (body) - record = writer.create_warc_record(resp['url'], 'response', - warc_headers_dict=warcHeaders, payload=BytesIO (rawBody), + record = self.writeRecord (item.url, 'response', + warc_headers_dict=warcHeaders, payload=body, http_headers=httpHeaders) - writer.write_record(record) - def loadingFinished (self, item, redirect=False): - super ().loadingFinished (item, redirect) + if item.resourceType == 'Document': + self.documentRecords[item.url] = record.rec_headers.get_header ('WARC-Record-ID') + def _writeScript (self, item): writer = self.writer + encoding = 'utf-8' + # XXX: yes, we’re leaking information about the user here, but this is + # the one and only source URL of the scripts. + uri = URL(f'file://{item.abspath}') if item.path else None + self.writeRecord (uri, 'resource', + payload=BytesIO (str (item).encode (encoding)), + warc_headers_dict={ + 'Content-Type': makeContentType (jsMime, encoding), + 'X-Crocoite-Type': 'script', + }) + + def _writeItem (self, item): + assert item.request + concurrentTo = self._writeRequest (item) + # items that failed loading don’t have a response + if item.response: + self._writeResponse (item, concurrentTo) + + def _addRefersTo (self, headers, url): + refersTo = self.documentRecords.get (url) + if refersTo: + headers['WARC-Refers-To'] = refersTo + else: + self.logger.error (f'No document record found for {url}') + return headers - req = item.request - reqId = item.id - resp = item.response - url = urlsplit (resp['url']) - - try: - # write neither request nor response if we cannot retrieve the body - rawBody, base64Encoded = self._getBody (item, redirect) - concurrentTo = self._writeRequest (item) - self._writeResponse (item, redirect, concurrentTo, rawBody, base64Encoded) - except ValueError as e: - self.logger.error (e.args[0]) + def _writeDomSnapshot (self, item): + writer = self.writer + + warcHeaders = { + 'X-Crocoite-Type': 'dom-snapshot', + 'X-Chrome-Viewport': item.viewport, + 'Content-Type': makeContentType ('text/html', 'utf-8') + } + + self._addRefersTo (warcHeaders, item.url) + + self.writeRecord (item.url, 'conversion', + payload=BytesIO (item.document), + warc_headers_dict=warcHeaders) + + def _writeScreenshot (self, item): + writer = self.writer + warcHeaders = { + 'Content-Type': makeContentType ('image/png'), + 'X-Crocoite-Screenshot-Y-Offset': str (item.yoff), + 'X-Crocoite-Type': 'screenshot', + } + self._addRefersTo (warcHeaders, item.url) + self.writeRecord (item.url, 'conversion', + payload=BytesIO (item.data), warc_headers_dict=warcHeaders) + + def _writeControllerStart (self, item, encoding='utf-8'): + payload = BytesIO (json.dumps (item.payload, indent=2, cls=StrJsonEncoder).encode (encoding)) + + writer = self.writer + warcinfo = self.writeRecord (None, 'warcinfo', + warc_headers_dict={'Content-Type': makeContentType (jsonMime, encoding)}, + payload=payload) + self.warcinfoRecordId = warcinfo.rec_headers['WARC-Record-ID'] + + def _flushLogEntries (self): + if self.log.tell () > 0: + writer = self.writer + self.log.seek (0) + warcHeaders = { + 'Content-Type': makeContentType (jsonMime, self.logEncoding), + 'X-Crocoite-Type': 'log', + } + self.writeRecord (None, 'metadata', payload=self.log, + warc_headers_dict=warcHeaders) + self.log = BytesIO () + + def _writeLog (self, item): + """ Handle log entries, called by .logger.WarcHandlerConsumer only """ + self.log.write (item.encode (self.logEncoding)) + self.log.write (b'\n') + if self.log.tell () > self.maxLogSize: + self._flushLogEntries () + + route = {Script: _writeScript, + RequestResponsePair: _writeItem, + DomSnapshotEvent: _writeDomSnapshot, + ScreenshotEvent: _writeScreenshot, + ControllerStart: _writeControllerStart, + } + + async def push (self, item): + for k, v in self.route.items (): + if isinstance (item, k): + v (self, item) + break |