From d20528cf1f5ee85162c449d9c74c2c862ab3d4ff Mon Sep 17 00:00:00 2001
From: Lars-Dominik Braun
Date: Tue, 19 Dec 2017 10:52:23 +0100
Subject: Serialize WARC writing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Logger and SiteWriter both access .write_record() concurrently, which can
corrupt WARC files. Move the writer to its own thread and decouple it with a
queue. Since we’re probably I/O-bound this may speed up writeback as well.
---
Review changes: flush() is now idempotent, the stop marker is compared by
identity and the methods are documented. The line counts below are updated
accordingly; the post-image blob id in the index line still refers to the
original revision.

 crocoite/warc.py | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 60 insertions(+)

(limited to 'crocoite/warc.py')

diff --git a/crocoite/warc.py b/crocoite/warc.py
index 8e443fd..92ae601 100644
--- a/crocoite/warc.py
+++ b/crocoite/warc.py
@@ -34,7 +34,67 @@ from urllib.parse import urlsplit
 from logging.handlers import BufferingHandler
 import pychrome
 from datetime import datetime
+from threading import Thread
+from queue import Queue
+
 from warcio.timeutils import datetime_to_iso_date
+from warcio.warcwriter import WARCWriter
+
+class SerializingWARCWriter (WARCWriter):
+    """
+    Serializing WARC writer using separate writer thread and queue for
+    non-blocking operation
+
+    Records passed to ._write_warc_record() are queued and written by a single
+    background thread, so concurrent callers cannot interleave their output.
+
+    Needs an explicit .flush() before deletion. Writing after .flush() is not
+    supported.
+    """
+
+    def __init__ (self, filebuf, *args, **kwargs):
+        """
+        Set up the underlying WARCWriter and start the writer thread
+
+        All arguments are passed through to WARCWriter unchanged.
+        """
+        WARCWriter.__init__ (self, filebuf, *args, **kwargs)
+        self.queue = Queue ()
+        self.thread = Thread (target=self._run_writer)
+        self.thread.start ()
+
+    def flush (self):
+        """
+        Write out all queued records and stop the writer thread
+
+        Blocks until the queue is drained. Calling it again is a no-op.
+        """
+        if self.thread is None:
+            # already flushed, nothing left to wait for
+            return
+        # None is the stop marker for _run_writer
+        self.queue.put (None)
+        self.thread.join ()
+        self.queue = None
+        self.thread = None
+
+    def _run_writer (self):
+        """
+        Writer thread main loop, writes queued records in FIFO order until the
+        stop marker (None) is received
+        """
+        while True:
+            item = self.queue.get ()
+            if item is None:
+                break
+            out, record = item
+            WARCWriter._write_warc_record (self, out, record)
+
+    def _write_warc_record (self, out, record):
+        """
+        Queue record for the writer thread instead of writing it immediately
+        """
+        self.queue.put ((out, record))
 
 class WARCLogHandler (BufferingHandler):
     """
-- 
cgit v1.2.3