summary refs log tree commit diff
path: root/crocoite/warc.py
diff options
context:
space:
mode:
Diffstat (limited to 'crocoite/warc.py')
-rw-r--r-- crocoite/warc.py 165
1 files changed, 64 insertions, 101 deletions
diff --git a/crocoite/warc.py b/crocoite/warc.py
index 3fd65e4..fd8ce8d 100644
--- a/crocoite/warc.py
+++ b/crocoite/warc.py
@@ -24,7 +24,6 @@ Classes writing data to WARC files
import logging
import json
-from http.server import BaseHTTPRequestHandler
from io import BytesIO
from warcio.statusandheaders import StatusAndHeaders
from urllib.parse import urlsplit
@@ -36,95 +35,18 @@ from queue import Queue
from warcio.timeutils import datetime_to_iso_date
from warcio.warcwriter import WARCWriter
-from .browser import AccountingSiteLoader
from .util import packageUrl
-from .controller import defaultSettings
-
-class SerializingWARCWriter (WARCWriter):
- """
- Serializing WARC writer using separate writer thread and queue for
- non-blocking operation
-
- Needs an explicit .flush() before deletion.
- """
-
- def __init__ (self, filebuf, *args, **kwargs):
- WARCWriter.__init__ (self, filebuf, *args, **kwargs)
- self.queue = Queue ()
- self.thread = Thread (target=self._run_writer)
- self.thread.start ()
-
- def flush (self):
- self.queue.put (None)
- self.thread.join ()
- self.queue = None
- self.thread = None
-
- def _run_writer (self):
- while True:
- item = self.queue.get ()
- if not item:
- break
- out, record = item
- WARCWriter._write_warc_record (self, out, record)
-
- def _write_warc_record (self, out, record):
- self.queue.put ((out, record))
-
-class WARCLogHandler (BufferingHandler):
- """
- Buffered log handler, flushing to warcio
- """
+from .controller import defaultSettings, EventHandler, ControllerStart
+from .behavior import Script, DomSnapshotEvent, ScreenshotEvent
+from .browser import Item
- contentType = 'text/plain; charset=utf-8'
-
- def __init__ (self, capacity, warcfile):
- BufferingHandler.__init__ (self, capacity)
- self.warcfile = warcfile
-
- def flush (self):
- self.acquire ()
- try:
- if self.buffer:
- buf = ''
- for record in self.buffer:
- buf += self.format (record)
- buf += '\n'
- # XXX: record type?
- record = self.warcfile.create_warc_record (
- packageUrl ('log'), 'metadata',
- payload=BytesIO (buf.encode ('utf8')),
- warc_headers_dict={'Content-Type': self.contentType})
- self.warcfile.write_record(record)
- self.buffer = []
- finally:
- self.release ()
-
-class WarcLoader (AccountingSiteLoader):
- def __init__ (self, browser, url, writer,
+class WarcHandler (EventHandler):
+ def __init__ (self, fd,
logger=logging.getLogger(__name__),
- logBuffer=defaultSettings.logBuffer,
maxBodySize=defaultSettings.maxBodySize):
- super ().__init__ (browser, url, logger)
- self.writer = writer
+ self.logger = logger
+ self.writer = WARCWriter (fd, gzip=True)
self.maxBodySize = maxBodySize
- self.warcLogger = WARCLogHandler (logBuffer, writer)
- self.logger.addHandler (self.warcLogger)
-
- def __exit__ (self, exc_type, exc_value, traceback):
- self.logger.removeHandler (self.warcLogger)
- self.warcLogger.flush ()
- return super ().__exit__ (exc_type, exc_value, traceback)
-
- @staticmethod
- def getStatusText (response):
- text = response.get ('statusText')
- if text:
- return text
- text = BaseHTTPRequestHandler.responses.get (response['status'])
- if text:
- return text[0]
- return 'No status text available'
def _writeRequest (self, item):
writer = self.writer
@@ -154,13 +76,12 @@ class WarcLoader (AccountingSiteLoader):
return record.rec_headers['WARC-Record-ID']
- def _getBody (self, item, redirect):
+ def _getBody (self, item):
reqId = item.id
- resp = item.response
rawBody = b''
base64Encoded = False
- if redirect:
+ if item.isRedirect:
# redirects reuse the same request, thus we cannot safely retrieve
# the body (i.e getResponseBody may return the new location’s
# body). This is fine.
@@ -173,7 +94,7 @@ class WarcLoader (AccountingSiteLoader):
rawBody, base64Encoded = item.body
return rawBody, base64Encoded
- def _writeResponse (self, item, redirect, concurrentTo, rawBody, base64Encoded):
+ def _writeResponse (self, item, concurrentTo, rawBody, base64Encoded):
writer = self.writer
reqId = item.id
resp = item.response
@@ -192,7 +113,7 @@ class WarcLoader (AccountingSiteLoader):
}
httpHeaders = StatusAndHeaders('{} {}'.format (resp['status'],
- self.getStatusText (resp)), item.responseHeaders,
+ item.statusText), item.responseHeaders,
protocol='HTTP/1.1')
# Content is saved decompressed and decoded, remove these headers
@@ -216,21 +137,63 @@ class WarcLoader (AccountingSiteLoader):
http_headers=httpHeaders)
writer.write_record(record)
- def loadingFinished (self, item, redirect=False):
- super ().loadingFinished (item, redirect)
-
+ def _writeScript (self, item):
writer = self.writer
-
- req = item.request
- reqId = item.id
- resp = item.response
- url = urlsplit (resp['url'])
-
+ encoding = 'utf-8'
+ record = writer.create_warc_record (packageUrl ('script/{}'.format (item.path)), 'metadata',
+ payload=BytesIO (str (item).encode (encoding)),
+ warc_headers_dict={'Content-Type': 'application/javascript; charset={}'.format (encoding)})
+ writer.write_record (record)
+
+ def _writeItem (self, item):
+ if item.failed:
+ # should have been handled by the logger already
+ return
try:
# write neither request nor response if we cannot retrieve the body
- rawBody, base64Encoded = self._getBody (item, redirect)
+ rawBody, base64Encoded = self._getBody (item)
concurrentTo = self._writeRequest (item)
- self._writeResponse (item, redirect, concurrentTo, rawBody, base64Encoded)
+ self._writeResponse (item, concurrentTo, rawBody, base64Encoded)
except ValueError as e:
self.logger.error (e.args[0])
+ def _writeDomSnapshot (self, item):
+ writer = self.writer
+ httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')
+ record = writer.create_warc_record (item.url, 'response',
+ payload=BytesIO (item.document),
+ http_headers=httpHeaders,
+ warc_headers_dict={'X-DOM-Snapshot': str (True),
+ 'X-Chrome-Viewport': item.viewport})
+ writer.write_record (record)
+
+ def _writeScreenshot (self, item):
+ writer = self.writer
+ url = packageUrl ('screenshot-{}-{}.png'.format (0, item.yoff))
+ record = writer.create_warc_record (url, 'resource',
+ payload=BytesIO (item.data), warc_headers_dict={'Content-Type': 'image/png'})
+ writer.write_record (record)
+
+ def _writeControllerStart (self, item):
+ writer = self.writer
+ warcinfo = writer.create_warcinfo_record (filename=None, info=item.payload)
+ writer.write_record (warcinfo)
+
+ route = {Script: _writeScript,
+ Item: _writeItem,
+ DomSnapshotEvent: _writeDomSnapshot,
+ ScreenshotEvent: _writeScreenshot,
+ ControllerStart: _writeControllerStart,
+ }
+
+ def push (self, item):
+ processed = False
+ for k, v in self.route.items ():
+ if isinstance (item, k):
+ v (self, item)
+ processed = True
+ break
+
+ if not processed:
+ self.logger.debug ('unknown event {}'.format (repr (item)))
+