summaryrefslogtreecommitdiff
path: root/crocoite/warc.py
diff options
context:
space:
mode:
Diffstat (limited to 'crocoite/warc.py')
-rw-r--r--crocoite/warc.py413
1 files changed, 210 insertions, 203 deletions
diff --git a/crocoite/warc.py b/crocoite/warc.py
index e04bee4..415b487 100644
--- a/crocoite/warc.py
+++ b/crocoite/warc.py
@@ -22,208 +22,131 @@
Classes writing data to WARC files
"""
-import logging
-import json
-from http.server import BaseHTTPRequestHandler
-from base64 import b64decode
+import json, threading
from io import BytesIO
-from warcio.statusandheaders import StatusAndHeaders
-from urllib.parse import urlsplit
-from logging.handlers import BufferingHandler
-import pychrome
from datetime import datetime
-from threading import Thread
-from queue import Queue
+from http.server import BaseHTTPRequestHandler
from warcio.timeutils import datetime_to_iso_date
from warcio.warcwriter import WARCWriter
-
-from .browser import AccountingSiteLoader
-from .util import packageUrl
-from . import defaults
-
-class SerializingWARCWriter (WARCWriter):
- """
- Serializing WARC writer using separate writer thread and queue for
- non-blocking operation
-
- Needs an explicit .flush() before deletion.
- """
-
- def __init__ (self, filebuf, *args, **kwargs):
- WARCWriter.__init__ (self, filebuf, *args, **kwargs)
- self.queue = Queue ()
- self.thread = Thread (target=self._run_writer)
- self.thread.start ()
-
- def flush (self):
- self.queue.put (None)
- self.thread.join ()
- self.queue = None
- self.thread = None
-
- def _run_writer (self):
- while True:
- item = self.queue.get ()
- if not item:
- break
- out, record = item
- WARCWriter._write_warc_record (self, out, record)
-
- def _write_warc_record (self, out, record):
- self.queue.put ((out, record))
-
-class WARCLogHandler (BufferingHandler):
- """
- Buffered log handler, flushing to warcio
- """
-
- contentType = 'text/plain; charset=utf-8'
-
- def __init__ (self, capacity, warcfile):
- BufferingHandler.__init__ (self, capacity)
- self.warcfile = warcfile
-
- def flush (self):
- self.acquire ()
- try:
- if self.buffer:
- buf = ''
- for record in self.buffer:
- buf += self.format (record)
- buf += '\n'
- # XXX: record type?
- record = self.warcfile.create_warc_record (
- packageUrl ('log'), 'metadata',
- payload=BytesIO (buf.encode ('utf8')),
- warc_headers_dict={'Content-Type': self.contentType})
- self.warcfile.write_record(record)
- self.buffer = []
- finally:
- self.release ()
-
-class WarcLoader (AccountingSiteLoader):
- def __init__ (self, browser, url, writer,
- logger=logging.getLogger(__name__),
- logBuffer=defaults.logBuffer,
- maxBodySize=defaults.maxBodySize):
- super ().__init__ (browser, url, logger)
- self.writer = writer
- self.maxBodySize = maxBodySize
- self.warcLogger = WARCLogHandler (logBuffer, writer)
- self.logger.addHandler (self.warcLogger)
-
- def __exit__ (self, exc_type, exc_value, traceback):
- self.logger.removeHandler (self.warcLogger)
- self.warcLogger.flush ()
- return super ().__exit__ (exc_type, exc_value, traceback)
-
- @staticmethod
- def getStatusText (response):
- text = response.get ('statusText')
- if text:
- return text
- text = BaseHTTPRequestHandler.responses.get (response['status'])
- if text:
- return text[0]
- return 'No status text available'
-
- @staticmethod
- def _unfoldHeaders (headers):
+from warcio.statusandheaders import StatusAndHeaders
+from yarl import URL
+
+from .util import StrJsonEncoder
+from .controller import EventHandler, ControllerStart
+from .behavior import Script, DomSnapshotEvent, ScreenshotEvent
+from .browser import RequestResponsePair, UnicodeBody
+
+# the official mimetype for json, according to https://tools.ietf.org/html/rfc8259
+jsonMime = 'application/json'
+# mime for javascript, according to https://tools.ietf.org/html/rfc4329#section-7.2
+jsMime = 'application/javascript'
+
+def makeContentType (mime, charset=None):
+ """ Create value of Content-Type WARC header with optional charset """
+ s = [mime]
+ if charset:
+ s.extend (['; charset=', charset])
+ return ''.join (s)
+
+class WarcHandler (EventHandler):
+ __slots__ = ('logger', 'writer', 'documentRecords', 'log',
+ 'maxLogSize', 'logEncoding', 'warcinfoRecordId')
+
+ def __init__ (self, fd, logger):
+ self.logger = logger
+ self.writer = WARCWriter (fd, gzip=True)
+
+ self.logEncoding = 'utf-8'
+ self.log = BytesIO ()
+ # max log buffer size (bytes)
+ self.maxLogSize = 500*1024
+
+ # maps document urls to WARC record ids, required for DomSnapshotEvent
+ # and ScreenshotEvent
+ self.documentRecords = {}
+ # record id of warcinfo record
+ self.warcinfoRecordId = None
+
+ def __enter__ (self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self._flushLogEntries ()
+
+ def writeRecord (self, url, kind, payload, warc_headers_dict=None, http_headers=None):
"""
- A host may send multiple headers using the same key, which Chrome folds
- into the same item. Separate those.
+ Thin wrapper around writer.create_warc_record and writer.write_record.
+
+ Adds default WARC headers.
"""
- items = []
- for k in headers.keys ():
- for v in headers[k].split ('\n'):
- items.append ((k, v))
- return items
+ assert url is None or isinstance (url, URL)
+
+ d = {}
+ if self.warcinfoRecordId:
+ d['WARC-Warcinfo-ID'] = self.warcinfoRecordId
+ d.update (warc_headers_dict)
+ warc_headers_dict = d
+
+ record = self.writer.create_warc_record (str (url) if url else '',
+ kind,
+ payload=payload,
+ warc_headers_dict=warc_headers_dict,
+ http_headers=http_headers)
+ self.writer.write_record (record)
+
+ return record
def _writeRequest (self, item):
- writer = self.writer
+ logger = self.logger.bind (reqId=item.id)
req = item.request
- resp = item.response
- url = urlsplit (resp['url'])
-
- # overwrite request headers with those actually sent
- newReqHeaders = resp.get ('requestHeaders')
- if newReqHeaders:
- req['headers'] = newReqHeaders
-
- postData = req.get ('postData')
- if postData:
- postData = BytesIO (postData.encode ('utf8'))
- path = url.path
- if url.query:
- path += '?' + url.query
- httpHeaders = StatusAndHeaders('{} {} HTTP/1.1'.format (req['method'], path),
- self._unfoldHeaders (req['headers']), protocol='HTTP/1.1', is_http_request=True)
- initiator = item.initiator
+ url = item.url
+
+ path = url.relative().with_fragment(None)
+ httpHeaders = StatusAndHeaders(f'{req.method} {path} HTTP/1.1',
+ req.headers, protocol='HTTP/1.1', is_http_request=True)
warcHeaders = {
- 'X-Chrome-Initiator': json.dumps (initiator),
- 'WARC-Date': datetime_to_iso_date (datetime.utcfromtimestamp (item.chromeRequest['wallTime'])),
+ # required to correlate request with log entries
+ 'X-Chrome-Request-ID': item.id,
+ 'WARC-Date': datetime_to_iso_date (req.timestamp),
}
- record = writer.create_warc_record(req['url'], 'request',
- payload=postData, http_headers=httpHeaders,
- warc_headers_dict=warcHeaders)
- writer.write_record(record)
+ body = item.request.body
+ if item.request.hasPostData and body is None:
+ # oops, don’t know what went wrong here
+ logger.error ('requestBody missing',
+ uuid='ee9adc58-e723-4595-9feb-312a67ead6a0')
+ warcHeaders['WARC-Truncated'] = 'unspecified'
+ else:
+ body = BytesIO (body)
+ record = self.writeRecord (url, 'request',
+ payload=body, http_headers=httpHeaders,
+ warc_headers_dict=warcHeaders)
return record.rec_headers['WARC-Record-ID']
- def _getBody (self, item, redirect):
+ def _writeResponse (self, item, concurrentTo):
+ # fetch the body
reqId = item.id
- resp = item.response
-
- rawBody = b''
- base64Encoded = False
- if redirect:
- # redirects reuse the same request, thus we cannot safely retrieve
- # the body (i.e getResponseBody may return the new location’s
- # body). This is fine.
- pass
- elif item.encodedDataLength > self.maxBodySize:
- # check body size first, since we’re loading everything into memory
- raise ValueError ('body for {} too large {} vs {}'.format (reqId,
- item.encodedDataLength, self.maxBodySize))
- else:
- try:
- body = self.tab.Network.getResponseBody (requestId=reqId)
- rawBody = body['body']
- base64Encoded = body['base64Encoded']
- if base64Encoded:
- rawBody = b64decode (rawBody)
- else:
- rawBody = rawBody.encode ('utf8')
- except pychrome.exceptions.CallMethodException:
- raise ValueError ('no data for {} {} {}'.format (resp['url'],
- resp['status'], reqId))
- return rawBody, base64Encoded
-
- def _writeResponse (self, item, redirect, concurrentTo, rawBody, base64Encoded):
- writer = self.writer
- reqId = item.id
- resp = item.response
# now the response
+ resp = item.response
warcHeaders = {
'WARC-Concurrent-To': concurrentTo,
- 'WARC-IP-Address': resp.get ('remoteIPAddress', ''),
- 'X-Chrome-Protocol': resp.get ('protocol', ''),
- 'X-Chrome-FromDiskCache': str (resp.get ('fromDiskCache')),
- 'X-Chrome-ConnectionReused': str (resp.get ('connectionReused')),
- 'X-Chrome-Base64Body': str (base64Encoded),
- 'WARC-Date': datetime_to_iso_date (datetime.utcfromtimestamp (
- item.chromeRequest['wallTime']+
- (item.chromeResponse['timestamp']-item.chromeRequest['timestamp']))),
+ # required to correlate request with log entries
+ 'X-Chrome-Request-ID': item.id,
+ 'WARC-Date': datetime_to_iso_date (resp.timestamp),
}
+ # conditional WARC headers
+ if item.remoteIpAddress:
+ warcHeaders['WARC-IP-Address'] = item.remoteIpAddress
-
-
- httpHeaders = StatusAndHeaders('{} {}'.format (resp['status'],
- self.getStatusText (resp)), self._unfoldHeaders (resp['headers']),
- protocol='HTTP/1.1')
+ # HTTP headers
+ statusText = resp.statusText or \
+ BaseHTTPRequestHandler.responses.get (
+ resp.status, ('No status text available', ))[0]
+ httpHeaders = StatusAndHeaders(f'{resp.status} {statusText}',
+ resp.headers, protocol='HTTP/1.1')
# Content is saved decompressed and decoded, remove these headers
blacklistedHeaders = {'transfer-encoding', 'content-encoding'}
@@ -233,34 +156,118 @@ class WarcLoader (AccountingSiteLoader):
# chrome sends nothing but utf8 encoded text. Fortunately HTTP
# headers take precedence over the document’s <meta>, thus we can
# easily override those.
- contentType = resp.get ('mimeType')
- if contentType:
- if not base64Encoded:
- contentType += '; charset=utf-8'
- httpHeaders.replace_header ('content-type', contentType)
-
- httpHeaders.replace_header ('content-length', '{:d}'.format (len (rawBody)))
+ if resp.mimeType:
+ charset = 'utf-8' if isinstance (resp.body, UnicodeBody) else None
+ contentType = makeContentType (resp.mimeType, charset=charset)
+ httpHeaders.replace_header ('Content-Type', contentType)
+
+ # response body
+ body = resp.body
+ if body is None:
+ warcHeaders['WARC-Truncated'] = 'unspecified'
+ else:
+ httpHeaders.replace_header ('Content-Length', str (len (body)))
+ body = BytesIO (body)
- record = writer.create_warc_record(resp['url'], 'response',
- warc_headers_dict=warcHeaders, payload=BytesIO (rawBody),
+ record = self.writeRecord (item.url, 'response',
+ warc_headers_dict=warcHeaders, payload=body,
http_headers=httpHeaders)
- writer.write_record(record)
- def loadingFinished (self, item, redirect=False):
- super ().loadingFinished (item, redirect)
+ if item.resourceType == 'Document':
+ self.documentRecords[item.url] = record.rec_headers.get_header ('WARC-Record-ID')
+ def _writeScript (self, item):
writer = self.writer
+ encoding = 'utf-8'
+ # XXX: yes, we’re leaking information about the user here, but this is
+ # the one and only source URL of the scripts.
+ uri = URL(f'file://{item.abspath}') if item.path else None
+ self.writeRecord (uri, 'resource',
+ payload=BytesIO (str (item).encode (encoding)),
+ warc_headers_dict={
+ 'Content-Type': makeContentType (jsMime, encoding),
+ 'X-Crocoite-Type': 'script',
+ })
+
+ def _writeItem (self, item):
+ assert item.request
+ concurrentTo = self._writeRequest (item)
+ # items that failed loading don’t have a response
+ if item.response:
+ self._writeResponse (item, concurrentTo)
+
+ def _addRefersTo (self, headers, url):
+ refersTo = self.documentRecords.get (url)
+ if refersTo:
+ headers['WARC-Refers-To'] = refersTo
+ else:
+ self.logger.error (f'No document record found for {url}')
+ return headers
- req = item.request
- reqId = item.id
- resp = item.response
- url = urlsplit (resp['url'])
-
- try:
- # write neither request nor response if we cannot retrieve the body
- rawBody, base64Encoded = self._getBody (item, redirect)
- concurrentTo = self._writeRequest (item)
- self._writeResponse (item, redirect, concurrentTo, rawBody, base64Encoded)
- except ValueError as e:
- self.logger.error (e.args[0])
+ def _writeDomSnapshot (self, item):
+ writer = self.writer
+
+ warcHeaders = {
+ 'X-Crocoite-Type': 'dom-snapshot',
+ 'X-Chrome-Viewport': item.viewport,
+ 'Content-Type': makeContentType ('text/html', 'utf-8')
+ }
+
+ self._addRefersTo (warcHeaders, item.url)
+
+ self.writeRecord (item.url, 'conversion',
+ payload=BytesIO (item.document),
+ warc_headers_dict=warcHeaders)
+
+ def _writeScreenshot (self, item):
+ writer = self.writer
+ warcHeaders = {
+ 'Content-Type': makeContentType ('image/png'),
+ 'X-Crocoite-Screenshot-Y-Offset': str (item.yoff),
+ 'X-Crocoite-Type': 'screenshot',
+ }
+ self._addRefersTo (warcHeaders, item.url)
+ self.writeRecord (item.url, 'conversion',
+ payload=BytesIO (item.data), warc_headers_dict=warcHeaders)
+
+ def _writeControllerStart (self, item, encoding='utf-8'):
+ payload = BytesIO (json.dumps (item.payload, indent=2, cls=StrJsonEncoder).encode (encoding))
+
+ writer = self.writer
+ warcinfo = self.writeRecord (None, 'warcinfo',
+ warc_headers_dict={'Content-Type': makeContentType (jsonMime, encoding)},
+ payload=payload)
+ self.warcinfoRecordId = warcinfo.rec_headers['WARC-Record-ID']
+
+ def _flushLogEntries (self):
+ if self.log.tell () > 0:
+ writer = self.writer
+ self.log.seek (0)
+ warcHeaders = {
+ 'Content-Type': makeContentType (jsonMime, self.logEncoding),
+ 'X-Crocoite-Type': 'log',
+ }
+ self.writeRecord (None, 'metadata', payload=self.log,
+ warc_headers_dict=warcHeaders)
+ self.log = BytesIO ()
+
+ def _writeLog (self, item):
+ """ Handle log entries, called by .logger.WarcHandlerConsumer only """
+ self.log.write (item.encode (self.logEncoding))
+ self.log.write (b'\n')
+ if self.log.tell () > self.maxLogSize:
+ self._flushLogEntries ()
+
+ route = {Script: _writeScript,
+ RequestResponsePair: _writeItem,
+ DomSnapshotEvent: _writeDomSnapshot,
+ ScreenshotEvent: _writeScreenshot,
+ ControllerStart: _writeControllerStart,
+ }
+
+ async def push (self, item):
+ for k, v in self.route.items ():
+ if isinstance (item, k):
+ v (self, item)
+ break