From 1deb60037ed061c5dd973005b81c106561032ebe Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Sat, 25 Apr 2020 08:14:27 +0200 Subject: Improve lulua-write Introduce composable filters, switch to brotli-compressed tarballs, which has good ratios and fast decompression, reducing I/O significantly. --- lulua/test_text.py | 14 ++- lulua/text.py | 292 +++++++++++++++++++++++++++++++++++++---------------- 2 files changed, 219 insertions(+), 87 deletions(-) (limited to 'lulua') diff --git a/lulua/test_text.py b/lulua/test_text.py index 65aa3a1..cd8ae7a 100644 --- a/lulua/test_text.py +++ b/lulua/test_text.py @@ -18,7 +18,10 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -from .text import charMap, mapChars +import brotli +from io import BytesIO + +from .text import charMap, mapChars, BrotliFile def test_map_chars_mapped (): """ Make sure all chars in the map are mapped correctly """ @@ -46,3 +49,12 @@ def test_map_chars_not_mapped (): outText = mapChars (inText, charMap) assert outText == expectText +def test_brotlifile (): + compressed = brotli.compress (b'hello world') + for chunk in (1, 2, 3, 1024, None): + f = BrotliFile (BytesIO (compressed), chunk) + s = f.read (1) + assert s == b'h' + s = f.read () + assert s == b'ello world' + diff --git a/lulua/text.py b/lulua/text.py index 41be637..77b259e 100644 --- a/lulua/text.py +++ b/lulua/text.py @@ -22,17 +22,19 @@ Text/corpus handling tools """ -import sys, argparse, pickle, json, logging, xml.dom.minidom -from io import StringIO +import sys, argparse, pickle, json, logging, xml.dom.minidom, queue +from io import StringIO, BytesIO from functools import partial +from itertools import chain from multiprocessing import Process, Queue, cpu_count, current_process from subprocess import Popen, PIPE + from tqdm import tqdm import ebooklib from ebooklib import epub - import html5lib from html5lib.filters.base import Filter +import brotli from .keyboard import defaultKeyboards from .layout import defaultLayouts @@ -108,40 +110,78 @@ class HTMLSerializer(object): else: assert False -f = dict( - aljazeera=lambda x: x['name'] == 'div' and x['data'].get ((None, 'id')) == 'DynamicContentContainer', - bbcarabic=lambda x: x['name'] == 'div' and x['data'].get ((None, 'property')) == 'articleBody', - ) - -class LzipFile: - __slots__ = ('p', ) +class BrotliFile: + __slots__ = ('decompressor', 'readchunk', 'fd', 'buf') - def __init__ (self, path): - self.p = Popen (['/usr/bin/lzip', '-c', '-d', path], stdout=PIPE) + def __init__ (self, fd, readchunk=100*1024): + self.fd = fd + self.readchunk = readchunk + self.decompressor = brotli.Decompressor () + self.buf = b'' def __enter__ (self): return self def __exit__ (self, exc_type, exc_val, exc_tb): - self.close () return True def read (self, num=None): - return self.p.stdout.read (num) + while not self.decompressor.is_finished (): + if num is not None and len (self.buf) >= num: + break + self.buf += self.decompressor.process (self.fd.read (self.readchunk)) + if num is not None: + b = self.buf[0:num] + self.buf = self.buf[num:] + else: + b = self.buf + self.buf = b'' + return b + + def seekable (self): + return False def close (self): - self.p.wait () - assert self.p.returncode == 0 + self.decompressor = None -def sourceHtml (selectFunc, item): - with LzipFile (item.rstrip ()) as fd: - document = html5lib.parse (fd) - walker = html5lib.getTreeWalker("etree") - stream = walker (document) - s = HTMLSerializer() - yield ''.join (s.serialize(Select (stream, selectFunc))) +def filterTar (fd): + # Python’s tarfile module is painfully slow. We can do better. + pos = 0 + blocksize = 512 + emptyblock = b'\0'*blocksize -def sourceEpub (item): + while True: + # read header + header = fd.read (blocksize) + pos += blocksize + if header == b'' or header == emptyblock: + break + assert header[256:256+8] == b'\0ustar ', (header[256:256+8]) + size = int (header[124:124+12].rstrip (b'\0'), base=8) + + # read body + if size > 0: + yield BytesIO (fd.read (size)) + pos += size + + # align to next 512 byte block + into = pos%blocksize + if into != 0: + pad = blocksize-into + fd.read (pad) + pos += pad + +def filterBrotli (fd): + yield BrotliFile (fd) + +def filterHtml (selectFunc, fd): + document = html5lib.parse (fd) + walker = html5lib.getTreeWalker("etree") + stream = walker (document) + s = HTMLSerializer() + yield ''.join (s.serialize(Select (stream, selectFunc))) + +def filterEpub (item): """ epub reader """ book = epub.read_epub (item.rstrip ()) logging.debug (f'reading ebook {item}') @@ -155,66 +195,95 @@ def sourceEpub (item): s = HTMLSerializer() yield ''.join (s.serialize (stream)) -def sourceText (item): - with LzipFile (item.rstrip ()) as fd: - yield fd.read ().decode ('utf-8') +def filterText (fd): + yield fd.read ().decode ('utf-8') -def sourceLines (item): +def filterLines (item): """ Read items (i.e. lines) as is """ yield item -def sourceJson (item): +def filterJson (item): yield json.loads (item) +def filterFile (item): + with open (item.rstrip (), 'rb') as fd: + yield fd + +def filterXml (fd): + try: + yield xml.dom.minidom.parse (fd) + except Exception: + logging.error (f'invalid xml document {fd}') + def getText (nodelist): + """ Helper to retrieve text from an XML node list """ rc = [] for node in nodelist: if node.nodeType == node.TEXT_NODE: rc.append(node.data) return ''.join(rc) -def sourceTEI2 (item): +def filterTEI2 (doc): """ TEI.2 format used for United Nations parallel corpus """ - with open (item.rstrip (), 'rb') as fd: - try: - out = [] - doc = xml.dom.minidom.parse (fd) - for text in doc.getElementsByTagName ('text'): - for body in text.getElementsByTagName ('body'): - for p in body.getElementsByTagName ('p'): - for s in p.getElementsByTagName ('s'): - out.append (getText (s.childNodes)) - out.append ('') - yield '\n'.join (out) - except Exception: - logging.error (f'invalid xml document {item}') - -def sourceOpenSubtitles (item): + out = [] + for text in doc.getElementsByTagName ('text'): + for body in text.getElementsByTagName ('body'): + for p in body.getElementsByTagName ('p'): + for s in p.getElementsByTagName ('s'): + out.append (getText (s.childNodes)) + out.append ('') + yield '\n'.join (out) + +def filterOpenSubtitles (doc): """ XML-based format used by the (raw!) OpenSubtitles dump found here: http://opus.nlpl.eu/OpenSubtitles-v2018.php """ - with open (item.rstrip (), 'rb') as fd: - try: - out = [] - doc = xml.dom.minidom.parse (fd) - for s in doc.getElementsByTagName ('s'): - # strip newlines, which are mostly unintentional due to - # pretty-printed xml structure - out.append (getText (s.childNodes).strip ()) - yield '\n'.join (out) - except Exception as e: - logging.error (f'invalid xml document {item} {e}') - -sources = dict( - aljazeera=partial(sourceHtml, f['aljazeera']), - bbcarabic=partial(sourceHtml, f['bbcarabic']), - text=sourceText, - json=sourceJson, - epub=sourceEpub, - tei2=sourceTEI2, - opensubtitles=sourceOpenSubtitles, - lines=sourceLines, + + out = [] + for s in doc.getElementsByTagName ('s'): + # strip newlines, which are mostly unintentional due to + # pretty-printed xml structure + out.append (getText (s.childNodes).strip ()) + yield '\n'.join (out) + +def filterMediawikiMarkdown (text): + """ + Convert mediawiki to markdown + """ + p = subprocess.Popen (['pandoc', '-f', 'mediawiki', '-t', 'markdown'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + p.stdin.write (text.encode ('utf-8')) + p.stdin.close () + text = p.stdout.read ().decode ('utf-8') + ret = p.wait () + # make sure we’re not leaking fd’s + p.stdout.close () + del p + if ret != 0: + logging.error ('pandoc rejected document') + else: + yield text + +f = dict( + aljazeera=lambda x: x['name'] == 'div' and x['data'].get ((None, 'id')) == 'DynamicContentContainer', + bbcarabic=lambda x: x['name'] == 'div' and x['data'].get ((None, 'property')) == 'articleBody', + ) + +filterAvail = dict( + aljazeera=partial(filterHtml, f['aljazeera']), + bbcarabic=partial(filterHtml, f['bbcarabic']), + text=filterText, + json=filterJson, + epub=filterEpub, + tei2=filterTEI2, + opensubtitles=filterOpenSubtitles, + lines=filterLines, + xml=filterXml, + file=filterFile, + tar=filterTar, + mediawikimarkdown=filterMediawikiMarkdown, + brotli=filterBrotli, ) charMap = { @@ -248,7 +317,14 @@ def mapChars (text, m): """ For all characters in text, replace if found in map m or keep as-is """ return ''.join (map (lambda x: m.get (x, x), text)) -def writeWorker (layout, sourceFunc, inq, outq): +def apply (fs, items): + """ Apply the first function fs[0] to all items, flatten the result and repeat """ + if not fs: + return items + else: + return apply (fs[1:], chain.from_iterable (map (fs[0], items))) + +def writeWorker (layout, funcs, inq, outq, statusq, benchmark): try: keyboard = defaultKeyboards['ibmpc105'] combined = makeCombined (keyboard) @@ -259,8 +335,13 @@ def writeWorker (layout, sourceFunc, inq, outq): if item is None: break - # extract (can be multiple items per source) - for text in sourceFunc (item): + # extract (can be multiple texts per item) + i = 0 + for text in apply (funcs, [item]): + if benchmark: + i += 1 + continue + # map chars; make sure we’re using unix line endings, which is # only one character text = mapChars (text, charMap).replace ('\r\n', '\n') @@ -278,28 +359,41 @@ def writeWorker (layout, sourceFunc, inq, outq): for s in stats: combined[s.name].update (s) - itemsProcessed += 1 - + i += 1 + # only update ocasionally, this is an expensive operation + statusq.put (i) + itemsProcessed += i if itemsProcessed > 0: outq.put (combined) else: outq.put (None) except Exception as e: # async exceptions - outq.put (None) - raise + outq.put (e) + +def statusWorker (statusq): + with tqdm (unit='item', smoothing=0) as bar: + while True: + try: + num = statusq.get (block=True, timeout=1) + if num is None: + break + bar.update (n=num) + except queue.Empty: + bar.update (n=0) def write (): """ Extract corpus source file, convert to plain text, map chars and create stats """ parser = argparse.ArgumentParser(description='Import text and create stats.') - parser.add_argument('-v', '--verbose', action='store_true', help='Enable debugging output') + parser.add_argument('--benchmark', action='store_true', help='Benchmark filter, no stats') + parser.add_argument('-j', '--jobs', metavar='NUM', + default=cpu_count (), type=int, help='Number of parallel jobs') parser.add_argument('-k', '--keyboard', metavar='KEYBOARD', default='ibmpc105', help='Physical keyboard name') - parser.add_argument('-j', '--jobs', metavar='NUM', - default=cpu_count (), help='Number of parallel jobs') - parser.add_argument('source', metavar='SOURCE', choices=sources.keys(), help='Data source extractor name') + parser.add_argument('-v', '--verbose', action='store_true', help='Enable debugging output') parser.add_argument('layout', metavar='LAYOUT', help='Keyboard layout name') + parser.add_argument('filter', metavar='FILTER', choices=filterAvail.keys(), nargs='+', help='Data filter') args = parser.parse_args() @@ -310,30 +404,36 @@ def write (): keyboard = defaultKeyboards[args.keyboard] layout = defaultLayouts[args.layout].specialize (keyboard) + filterSel = [filterAvail[x] for x in args.filter] # limit queue sizes to limit memory usage inq = Queue (args.jobs*2) outq = Queue (args.jobs+1) + statusq = Queue (args.jobs+1) logging.info (f'using {args.jobs} workers') workers = [] for i in range (args.jobs): p = Process(target=writeWorker, - args=(layout, sources[args.source], inq, outq), + args=(layout, filterSel, inq, outq, statusq, args.benchmark), daemon=True, name=f'worker-{i}') p.start() workers.append (p) + statusp = Process(target=statusWorker, + args=(statusq,), + daemon=True, + name=f'status') + statusp.start() + try: - with tqdm (unit='item') as bar: - for l in sys.stdin: - inq.put (l) - bar.update (n=1) - - # something is wrong - if not outq.empty (): - return 1 + for l in sys.stdin: + inq.put (l) + + # something is wrong + if not outq.empty (): + break except KeyboardInterrupt: pass @@ -342,12 +442,32 @@ def write (): for w in workers: inq.put (None) item = outq.get () + if isinstance (item, Exception): + raise item if item is not None: pickle.dump (item, sys.stdout.buffer, pickle.HIGHEST_PROTOCOL) assert outq.empty () + + statusq.put (None) + statusp.join () + # and then we can kill them for w in workers: w.join () return 0 +import bz2, sys, json, subprocess +from xml.etree.ElementTree import iterparse +def extractMediawiki (): + parser = argparse.ArgumentParser(description='Extract raw wikitext from mediawiki dump.') + parser.add_argument('file', metavar='FILE', help='bzip2-compressed dump') + args = parser.parse_args() + + with bz2.open (args.file, 'r') as fd: + for event, elem in iterparse (fd, ['start', 'end']): + if event == 'end' and elem.tag == '{http://www.mediawiki.org/xml/export-0.10/}text': + text = ''.join (elem.itertext ()) + json.dump (text, sys.stdout, ensure_ascii=False) + sys.stdout.write ('\n') + -- cgit v1.2.3