From 1deb60037ed061c5dd973005b81c106561032ebe Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Sat, 25 Apr 2020 08:14:27 +0200 Subject: Improve lulua-write Introduce composable filters, switch to brotli-compressed tarballs, which has good ratios and fast decompression, reducing I/O significantly. --- .gitignore | 3 + gen.sh | 34 ++++--- lulua/test_text.py | 14 ++- lulua/text.py | 292 +++++++++++++++++++++++++++++++++++++---------------- setup.py | 2 + 5 files changed, 244 insertions(+), 101 deletions(-) diff --git a/.gitignore b/.gitignore index 24ac710..97c835b 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,6 @@ __pycache__ .ninja_* build.ninja _build +corpus/*/*.tar.br +corpus/arwiki/*.bz2 +corpus/osm/*.pbf diff --git a/gen.sh b/gen.sh index 24ba833..2bf35a0 100755 --- a/gen.sh +++ b/gen.sh @@ -47,35 +47,35 @@ rule analyze-heat command = lulua-analyze -l \$layout keyheatmap < \$in > \$out rule write-bbcarabic - command = find \$in -type f | lulua-write bbcarabic \$layout | lulua-analyze combine > \$out + command = find \$in | lulua-write \$layout file brotli tar bbcarabic | lulua-analyze combine > \$out pool = write rule write-aljazeera - command = find \$in -type f | lulua-write aljazeera \$layout | lulua-analyze combine > \$out + command = find \$in | lulua-write \$layout file brotli tar aljazeera | lulua-analyze combine > \$out pool = write rule write-epub - command = find \$in -type f | lulua-write epub \$layout | lulua-analyze combine > \$out + command = find \$in | lulua-write \$layout epub | lulua-analyze combine > \$out pool = write rule write-tanzil - command = echo \$in | lulua-write text \$layout | lulua-analyze combine > \$out + command = find \$in | lulua-write \$layout file text | lulua-analyze combine > \$out pool = write rule write-tei2 - command = find \$in -type f -name '*.xml' | lulua-write tei2 \$layout | lulua-analyze combine > \$out + command = find \$in | lulua-write \$layout file brotli tar xml tei2 | lulua-analyze combine > \$out pool = write rule write-opensubtitles - command = find \$in -type f -name '*.xml' | lulua-write opensubtitles \$layout | lulua-analyze combine > \$out + command = find \$in | lulua-write \$layout file brotli tar xml opensubtitles | lulua-analyze combine > \$out pool = write rule write-arwiki - command = \$wikiextractor -ns 0 --json -o - \$in 2>/dev/null | jq .text | lulua-write json \$layout | lulua-analyze combine > \$out + command = \$wikiextractor -ns 0 --json -o - \$in 2>/dev/null | jq .text | lulua-write \$layout json | lulua-analyze combine > \$out pool = write rule write-osm - command = \$osmconvert --csv='name:ar' \$in | sort -u | lulua-write lines \$layout | lulua-analyze combine > \$out + command = \$osmconvert --csv='name:ar' \$in | sort -u | lulua-write \$layout lines | lulua-analyze combine > \$out pool = write rule combine @@ -152,21 +152,27 @@ build \$reportdir/ar-lulua-w64.zip: zipR \$tempdir/ar-lulua-w64 | $deps EOF +bbcarabicfiles=`find $corpusdir/bbcarabic/ -type f -name '*.tar.br' | tr '\n' ' '` +aljazeerafiles=`find $corpusdir/aljazeera/ -type f -name '*.tar.br' | tr '\n' ' '` +unfiles=`find $corpusdir/un-v1.0-tei/ -type f -name '*.tar.br' | tr '\n' ' '` +opensubtitlesfiles=`find $corpusdir/opensubtitles-2018/ -type f -name '*.tar.br' | tr '\n' ' '` +hindawifiles=`find $corpusdir/hindawi/ -type f -name '*.epub' | tr '\n' ' '` + # targets for every layout for l in $layouts; do cat <= num: + break + self.buf += self.decompressor.process (self.fd.read (self.readchunk)) + if num is not None: + b = 
self.buf[0:num] + self.buf = self.buf[num:] + else: + b = self.buf + self.buf = b'' + return b + + def seekable (self): + return False def close (self): - self.p.wait () - assert self.p.returncode == 0 + self.decompressor = None -def sourceHtml (selectFunc, item): - with LzipFile (item.rstrip ()) as fd: - document = html5lib.parse (fd) - walker = html5lib.getTreeWalker("etree") - stream = walker (document) - s = HTMLSerializer() - yield ''.join (s.serialize(Select (stream, selectFunc))) +def filterTar (fd): + # Python’s tarfile module is painfully slow. We can do better. + pos = 0 + blocksize = 512 + emptyblock = b'\0'*blocksize -def sourceEpub (item): + while True: + # read header + header = fd.read (blocksize) + pos += blocksize + if header == b'' or header == emptyblock: + break + assert header[256:256+8] == b'\0ustar ', (header[256:256+8]) + size = int (header[124:124+12].rstrip (b'\0'), base=8) + + # read body + if size > 0: + yield BytesIO (fd.read (size)) + pos += size + + # align to next 512 byte block + into = pos%blocksize + if into != 0: + pad = blocksize-into + fd.read (pad) + pos += pad + +def filterBrotli (fd): + yield BrotliFile (fd) + +def filterHtml (selectFunc, fd): + document = html5lib.parse (fd) + walker = html5lib.getTreeWalker("etree") + stream = walker (document) + s = HTMLSerializer() + yield ''.join (s.serialize(Select (stream, selectFunc))) + +def filterEpub (item): """ epub reader """ book = epub.read_epub (item.rstrip ()) logging.debug (f'reading ebook {item}') @@ -155,66 +195,95 @@ def sourceEpub (item): s = HTMLSerializer() yield ''.join (s.serialize (stream)) -def sourceText (item): - with LzipFile (item.rstrip ()) as fd: - yield fd.read ().decode ('utf-8') +def filterText (fd): + yield fd.read ().decode ('utf-8') -def sourceLines (item): +def filterLines (item): """ Read items (i.e. lines) as is """ yield item -def sourceJson (item): +def filterJson (item): yield json.loads (item) +def filterFile (item): + with open (item.rstrip (), 'rb') as fd: + yield fd + +def filterXml (fd): + try: + yield xml.dom.minidom.parse (fd) + except Exception: + logging.error (f'invalid xml document {fd}') + def getText (nodelist): + """ Helper to retrieve text from an XML node list """ rc = [] for node in nodelist: if node.nodeType == node.TEXT_NODE: rc.append(node.data) return ''.join(rc) -def sourceTEI2 (item): +def filterTEI2 (doc): """ TEI.2 format used for United Nations parallel corpus """ - with open (item.rstrip (), 'rb') as fd: - try: - out = [] - doc = xml.dom.minidom.parse (fd) - for text in doc.getElementsByTagName ('text'): - for body in text.getElementsByTagName ('body'): - for p in body.getElementsByTagName ('p'): - for s in p.getElementsByTagName ('s'): - out.append (getText (s.childNodes)) - out.append ('') - yield '\n'.join (out) - except Exception: - logging.error (f'invalid xml document {item}') - -def sourceOpenSubtitles (item): + out = [] + for text in doc.getElementsByTagName ('text'): + for body in text.getElementsByTagName ('body'): + for p in body.getElementsByTagName ('p'): + for s in p.getElementsByTagName ('s'): + out.append (getText (s.childNodes)) + out.append ('') + yield '\n'.join (out) + +def filterOpenSubtitles (doc): """ XML-based format used by the (raw!) 
OpenSubtitles dump found here: http://opus.nlpl.eu/OpenSubtitles-v2018.php """ - with open (item.rstrip (), 'rb') as fd: - try: - out = [] - doc = xml.dom.minidom.parse (fd) - for s in doc.getElementsByTagName ('s'): - # strip newlines, which are mostly unintentional due to - # pretty-printed xml structure - out.append (getText (s.childNodes).strip ()) - yield '\n'.join (out) - except Exception as e: - logging.error (f'invalid xml document {item} {e}') - -sources = dict( - aljazeera=partial(sourceHtml, f['aljazeera']), - bbcarabic=partial(sourceHtml, f['bbcarabic']), - text=sourceText, - json=sourceJson, - epub=sourceEpub, - tei2=sourceTEI2, - opensubtitles=sourceOpenSubtitles, - lines=sourceLines, + + out = [] + for s in doc.getElementsByTagName ('s'): + # strip newlines, which are mostly unintentional due to + # pretty-printed xml structure + out.append (getText (s.childNodes).strip ()) + yield '\n'.join (out) + +def filterMediawikiMarkdown (text): + """ + Convert mediawiki to markdown + """ + p = subprocess.Popen (['pandoc', '-f', 'mediawiki', '-t', 'markdown'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + p.stdin.write (text.encode ('utf-8')) + p.stdin.close () + text = p.stdout.read ().decode ('utf-8') + ret = p.wait () + # make sure we’re not leaking fd’s + p.stdout.close () + del p + if ret != 0: + logging.error ('pandoc rejected document') + else: + yield text + +f = dict( + aljazeera=lambda x: x['name'] == 'div' and x['data'].get ((None, 'id')) == 'DynamicContentContainer', + bbcarabic=lambda x: x['name'] == 'div' and x['data'].get ((None, 'property')) == 'articleBody', + ) + +filterAvail = dict( + aljazeera=partial(filterHtml, f['aljazeera']), + bbcarabic=partial(filterHtml, f['bbcarabic']), + text=filterText, + json=filterJson, + epub=filterEpub, + tei2=filterTEI2, + opensubtitles=filterOpenSubtitles, + lines=filterLines, + xml=filterXml, + file=filterFile, + tar=filterTar, + mediawikimarkdown=filterMediawikiMarkdown, + brotli=filterBrotli, ) charMap = { @@ -248,7 +317,14 @@ def mapChars (text, m): """ For all characters in text, replace if found in map m or keep as-is """ return ''.join (map (lambda x: m.get (x, x), text)) -def writeWorker (layout, sourceFunc, inq, outq): +def apply (fs, items): + """ Apply the first function fs[0] to all items, flatten the result and repeat """ + if not fs: + return items + else: + return apply (fs[1:], chain.from_iterable (map (fs[0], items))) + +def writeWorker (layout, funcs, inq, outq, statusq, benchmark): try: keyboard = defaultKeyboards['ibmpc105'] combined = makeCombined (keyboard) @@ -259,8 +335,13 @@ def writeWorker (layout, sourceFunc, inq, outq): if item is None: break - # extract (can be multiple items per source) - for text in sourceFunc (item): + # extract (can be multiple texts per item) + i = 0 + for text in apply (funcs, [item]): + if benchmark: + i += 1 + continue + # map chars; make sure we’re using unix line endings, which is # only one character text = mapChars (text, charMap).replace ('\r\n', '\n') @@ -278,28 +359,41 @@ def writeWorker (layout, sourceFunc, inq, outq): for s in stats: combined[s.name].update (s) - itemsProcessed += 1 - + i += 1 + # only update ocasionally, this is an expensive operation + statusq.put (i) + itemsProcessed += i if itemsProcessed > 0: outq.put (combined) else: outq.put (None) except Exception as e: # async exceptions - outq.put (None) - raise + outq.put (e) + +def statusWorker (statusq): + with tqdm (unit='item', smoothing=0) as bar: + while True: + try: + num = statusq.get (block=True, 
timeout=1) + if num is None: + break + bar.update (n=num) + except queue.Empty: + bar.update (n=0) def write (): """ Extract corpus source file, convert to plain text, map chars and create stats """ parser = argparse.ArgumentParser(description='Import text and create stats.') - parser.add_argument('-v', '--verbose', action='store_true', help='Enable debugging output') + parser.add_argument('--benchmark', action='store_true', help='Benchmark filter, no stats') + parser.add_argument('-j', '--jobs', metavar='NUM', + default=cpu_count (), type=int, help='Number of parallel jobs') parser.add_argument('-k', '--keyboard', metavar='KEYBOARD', default='ibmpc105', help='Physical keyboard name') - parser.add_argument('-j', '--jobs', metavar='NUM', - default=cpu_count (), help='Number of parallel jobs') - parser.add_argument('source', metavar='SOURCE', choices=sources.keys(), help='Data source extractor name') + parser.add_argument('-v', '--verbose', action='store_true', help='Enable debugging output') parser.add_argument('layout', metavar='LAYOUT', help='Keyboard layout name') + parser.add_argument('filter', metavar='FILTER', choices=filterAvail.keys(), nargs='+', help='Data filter') args = parser.parse_args() @@ -310,30 +404,36 @@ def write (): keyboard = defaultKeyboards[args.keyboard] layout = defaultLayouts[args.layout].specialize (keyboard) + filterSel = [filterAvail[x] for x in args.filter] # limit queue sizes to limit memory usage inq = Queue (args.jobs*2) outq = Queue (args.jobs+1) + statusq = Queue (args.jobs+1) logging.info (f'using {args.jobs} workers') workers = [] for i in range (args.jobs): p = Process(target=writeWorker, - args=(layout, sources[args.source], inq, outq), + args=(layout, filterSel, inq, outq, statusq, args.benchmark), daemon=True, name=f'worker-{i}') p.start() workers.append (p) + statusp = Process(target=statusWorker, + args=(statusq,), + daemon=True, + name=f'status') + statusp.start() + try: - with tqdm (unit='item') as bar: - for l in sys.stdin: - inq.put (l) - bar.update (n=1) - - # something is wrong - if not outq.empty (): - return 1 + for l in sys.stdin: + inq.put (l) + + # something is wrong + if not outq.empty (): + break except KeyboardInterrupt: pass @@ -342,12 +442,32 @@ def write (): for w in workers: inq.put (None) item = outq.get () + if isinstance (item, Exception): + raise item if item is not None: pickle.dump (item, sys.stdout.buffer, pickle.HIGHEST_PROTOCOL) assert outq.empty () + + statusq.put (None) + statusp.join () + # and then we can kill them for w in workers: w.join () return 0 +import bz2, sys, json, subprocess +from xml.etree.ElementTree import iterparse +def extractMediawiki (): + parser = argparse.ArgumentParser(description='Extract raw wikitext from mediawiki dump.') + parser.add_argument('file', metavar='FILE', help='bzip2-compressed dump') + args = parser.parse_args() + + with bz2.open (args.file, 'r') as fd: + for event, elem in iterparse (fd, ['start', 'end']): + if event == 'end' and elem.tag == '{http://www.mediawiki.org/xml/export-0.10/}text': + text = ''.join (elem.itertext ()) + json.dump (text, sys.stdout, ensure_ascii=False) + sys.stdout.write ('\n') + diff --git a/setup.py b/setup.py index 4134766..4ec725a 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,7 @@ setup( 'html5lib', 'ebooklib', 'jinja2', + 'brotli', ], entry_points={ 'console_scripts': [ @@ -49,6 +50,7 @@ setup( 'lulua-import = lulua.layout:importFrom', 'lulua-optimize = lulua.optimize:optimize', 'lulua-write = lulua.text:write', + 'lulua-extract-mediawiki = 
lulua.text:extractMediawiki', ], }, package_data = { -- cgit v1.2.3
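
The core of this change is the apply helper in lulua/text.py: the positional FILTER arguments of lulua-write (for example "file brotli tar xml tei2" in the write-tei2 rule of gen.sh) become a chain of single-argument generator functions, each consuming the items yielded by the previous one. Below is a minimal standalone sketch of that composition pattern, assuming only that every filter takes one item and yields zero or more derived items; filterStrip and filterWords are made-up toy filters and do not exist in lulua.

    from itertools import chain

    def apply(fs, items):
        """Apply the first filter to every item, flatten the resulting
        generators and hand the output to the next filter in the chain."""
        if not fs:
            return items
        return apply(fs[1:], chain.from_iterable(map(fs[0], items)))

    # Toy filters in the same style as filterText/filterLines.
    def filterStrip(item):
        yield item.strip()

    def filterWords(item):
        # a single input item may expand into several output items
        yield from item.split()

    stdin = ['  foo bar \n', ' baz \n']              # stands in for sys.stdin
    for text in apply([filterStrip, filterWords], stdin):
        print(text)                                  # foo, bar, baz

Because every stage is lazy, an item only needs to be buffered while the stage that produced it is being consumed; this is what lets the brotli and tar stages process an archive without extracting it to disk first.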
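
filterTar replaces Python's tarfile module with a hand-rolled reader that walks the archive in fixed 512-byte blocks. The following self-contained sketch (standard library only, not lulua code) shows the ustar header fields that reader relies on: the NUL-padded member name at offset 0, the octal size field at offset 124 and the "ustar" magic near offset 257. The archive and its single member doc.txt are throwaway test data.

    import io
    import tarfile

    # Build a tiny ustar archive in memory with the standard library.
    payload = 'مرحبا'.encode('utf-8')
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode='w', format=tarfile.USTAR_FORMAT) as tar:
        info = tarfile.TarInfo(name='doc.txt')
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
    buf.seek(0)

    # Parse the first 512-byte header by hand, as filterTar does.
    blocksize = 512
    header = buf.read(blocksize)
    name = header[0:100].rstrip(b'\0').decode('utf-8')          # NUL-padded name
    size = int(header[124:124 + 12].rstrip(b' \0'), base=8)     # octal, NUL/space padded
    magic = header[257:257 + 5]                                  # b'ustar'
    body = buf.read(size)

    assert magic == b'ustar'
    assert name == 'doc.txt'
    assert body == payload
    print(name, size)

Member data is padded up to the next 512-byte boundary and the archive ends in all-zero blocks, which is what the alignment and termination checks in filterTar handle.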
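
Progress reporting changes shape as well: instead of the main process updating tqdm while feeding stdin, the workers now push item counts onto a status queue and a dedicated status process drains it. A self-contained sketch of that pattern follows, assuming tqdm is available (lulua/text.py already uses it); the worker body is a placeholder and the range(1000) input merely stands in for the file names read from stdin.

    import queue
    from multiprocessing import Process, Queue, cpu_count

    from tqdm import tqdm

    def worker(inq, statusq):
        while True:
            item = inq.get()
            if item is None:
                break
            # ... run the filter chain and update per-worker stats here ...
            statusq.put(1)            # report one finished item

    def statusWorker(statusq):
        with tqdm(unit='item', smoothing=0) as bar:
            while True:
                try:
                    num = statusq.get(block=True, timeout=1)
                    if num is None:
                        break
                    bar.update(n=num)
                except queue.Empty:
                    bar.update(n=0)   # nothing arrived within the timeout

    if __name__ == '__main__':
        inq, statusq = Queue(), Queue()
        workers = [Process(target=worker, args=(inq, statusq), daemon=True)
                   for _ in range(cpu_count())]
        for p in workers:
            p.start()
        statusp = Process(target=statusWorker, args=(statusq,), daemon=True)
        statusp.start()

        for item in range(1000):      # stands in for the file names on stdin
            inq.put(item)
        for _ in workers:
            inq.put(None)             # one sentinel per worker
        for p in workers:
            p.join()

        statusq.put(None)             # tell the status process to finish
        statusp.join()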
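
The new lulua-extract-mediawiki entry point streams the text elements out of the wiki dump with xml.etree's iterparse rather than building the whole tree. What follows is a minimal sketch of that pattern against a tiny in-memory stand-in document; the real tool opens the bzip2-compressed dump with bz2.open and emits one JSON-encoded string per text element. The elem.clear() call is an extra shown for illustration and is not part of the patch.

    import io
    from xml.etree.ElementTree import iterparse

    NS = '{http://www.mediawiki.org/xml/export-0.10/}'

    # Tiny stand-in for the arwiki dump.
    dump = io.BytesIO(
        b'<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.10/">'
        b'<page><title>Example</title>'
        b'<revision><text>Some [[wiki]] markup</text></revision>'
        b'</page></mediawiki>'
    )

    for event, elem in iterparse(dump, ['end']):
        if elem.tag == NS + 'text':
            print(''.join(elem.itertext()))
            elem.clear()   # free the potentially large text once consumed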