diff options
author | Lars-Dominik Braun <lars@6xq.net> | 2017-12-19 10:52:23 +0100 |
---|---|---|
committer | Lars-Dominik Braun <lars@6xq.net> | 2017-12-19 10:52:23 +0100 |
commit | d20528cf1f5ee85162c449d9c74c2c862ab3d4ff (patch) | |
tree | d78cab05988dc2aa2161cd522a7c4781d487c0b6 /crocoite | |
parent | 8e939f8922815bd917f4dd750aa5f8a17a8f750c (diff) | |
download | crocoite-d20528cf1f5ee85162c449d9c74c2c862ab3d4ff.tar.gz crocoite-d20528cf1f5ee85162c449d9c74c2c862ab3d4ff.tar.bz2 crocoite-d20528cf1f5ee85162c449d9c74c2c862ab3d4ff.zip |
Serialize WARC writing
Logger and SiteWriter both access .write_record() concurrently, which
can corrupt WARC files. Move the writer to its own thread and decouple
it with a queue. Since we’re probably I/O-bound this may speed up
writeback as well.
Diffstat (limited to 'crocoite')
-rw-r--r-- | crocoite/cli.py | 6 | ||||
-rw-r--r-- | crocoite/warc.py | 35 |
2 files changed, 38 insertions, 3 deletions
diff --git a/crocoite/cli.py b/crocoite/cli.py index c085326..ac58bd1 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -28,7 +28,6 @@ from datetime import datetime from base64 import b64decode import pychrome from urllib.parse import urlsplit -from warcio.warcwriter import WARCWriter from warcio.statusandheaders import StatusAndHeaders from html5lib.serializer import HTMLSerializer @@ -36,7 +35,7 @@ from celery import Celery from celery.utils.log import get_task_logger from . import html, packageData, packageUrl -from .warc import WarcLoader +from .warc import WarcLoader, SerializingWARCWriter from .html import StripAttributeFilter, StripTagFilter, ChromeTreeWalker from .browser import ChromeService, NullService @@ -187,7 +186,7 @@ def archive (self, url, output, onload, onsnapshot, browser, fd = open (outPath, 'wb') else: fd = open (output, 'wb') - writer = WARCWriter (fd, gzip=True) + writer = SerializingWARCWriter (fd, gzip=True) with WarcLoader (browser, url, writer, logBuffer=logBuffer, maxBodySize=maxBodySize) as l: @@ -230,6 +229,7 @@ def archive (self, url, output, onload, onsnapshot, browser, if screenshot: self.update_state (state='PROGRESS', meta={'step': 'screenshot'}) writeScreenshot (l.tab, writer) + writer.flush () if not output: outPath = os.path.join (app.conf.finished_dir, outFile) os.rename (fd.name, outPath) diff --git a/crocoite/warc.py b/crocoite/warc.py index 8e443fd..92ae601 100644 --- a/crocoite/warc.py +++ b/crocoite/warc.py @@ -34,7 +34,42 @@ from urllib.parse import urlsplit from logging.handlers import BufferingHandler import pychrome from datetime import datetime +from threading import Thread +from queue import Queue + from warcio.timeutils import datetime_to_iso_date +from warcio.warcwriter import WARCWriter + +class SerializingWARCWriter (WARCWriter): + """ + Serializing WARC writer using separate writer thread and queue for + non-blocking operation + + Needs an explicit .flush() before deletion. + """ + + def __init__ (self, filebuf, *args, **kwargs): + WARCWriter.__init__ (self, filebuf, *args, **kwargs) + self.queue = Queue () + self.thread = Thread (target=self._run_writer) + self.thread.start () + + def flush (self): + self.queue.put (None) + self.thread.join () + self.queue = None + self.thread = None + + def _run_writer (self): + while True: + item = self.queue.get () + if not item: + break + out, record = item + WARCWriter._write_warc_record (self, out, record) + + def _write_warc_record (self, out, record): + self.queue.put ((out, record)) class WARCLogHandler (BufferingHandler): """ |