summaryrefslogtreecommitdiff
path: root/crocoite/warc.py
diff options
context:
space:
mode:
authorLars-Dominik Braun <lars@6xq.net>2017-12-19 10:52:23 +0100
committerLars-Dominik Braun <lars@6xq.net>2017-12-19 10:52:23 +0100
commitd20528cf1f5ee85162c449d9c74c2c862ab3d4ff (patch)
treed78cab05988dc2aa2161cd522a7c4781d487c0b6 /crocoite/warc.py
parent8e939f8922815bd917f4dd750aa5f8a17a8f750c (diff)
downloadcrocoite-d20528cf1f5ee85162c449d9c74c2c862ab3d4ff.tar.gz
crocoite-d20528cf1f5ee85162c449d9c74c2c862ab3d4ff.tar.bz2
crocoite-d20528cf1f5ee85162c449d9c74c2c862ab3d4ff.zip
Serialize WARC writing
Logger and SiteWriter both access .write_record() concurrently, which can corrupt WARC files. Move the writer to its own thread and decouple it with a queue. Since we’re probably I/O-bound this may speed up writeback as well.
Diffstat (limited to 'crocoite/warc.py')
-rw-r--r--crocoite/warc.py35
1 files changed, 35 insertions, 0 deletions
diff --git a/crocoite/warc.py b/crocoite/warc.py
index 8e443fd..92ae601 100644
--- a/crocoite/warc.py
+++ b/crocoite/warc.py
@@ -34,7 +34,42 @@ from urllib.parse import urlsplit
from logging.handlers import BufferingHandler
import pychrome
from datetime import datetime
+from threading import Thread
+from queue import Queue
+
from warcio.timeutils import datetime_to_iso_date
+from warcio.warcwriter import WARCWriter
+
+class SerializingWARCWriter (WARCWriter):
+ """
+ Serializing WARC writer using separate writer thread and queue for
+ non-blocking operation
+
+ Needs an explicit .flush() before deletion.
+ """
+
+ def __init__ (self, filebuf, *args, **kwargs):
+ WARCWriter.__init__ (self, filebuf, *args, **kwargs)
+ self.queue = Queue ()
+ self.thread = Thread (target=self._run_writer)
+ self.thread.start ()
+
+ def flush (self):
+ self.queue.put (None)
+ self.thread.join ()
+ self.queue = None
+ self.thread = None
+
+ def _run_writer (self):
+ while True:
+ item = self.queue.get ()
+ if not item:
+ break
+ out, record = item
+ WARCWriter._write_warc_record (self, out, record)
+
+ def _write_warc_record (self, out, record):
+ self.queue.put ((out, record))
class WARCLogHandler (BufferingHandler):
"""