author	Lars-Dominik Braun <lars@6xq.net>	2020-04-25 08:14:27 +0200
committer	Lars-Dominik Braun <lars@6xq.net>	2020-04-25 08:14:27 +0200
commit	1deb60037ed061c5dd973005b81c106561032ebe (patch)
tree	ff8954bdbb59429a46cd9e362f57e8e81e47dfb5 /lulua
parent	528daea04681eeb012c5bd963463ebeabebdc1bd (diff)
Improve lulua-write
Introduce composable filters and switch to brotli-compressed tarballs, which have good compression ratios and fast decompression, reducing I/O significantly.
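The filter chain is the core idea: every filter is a generator that takes one item and yields zero or more derived items, and a list of filters is folded over the input stream. A minimal self-contained sketch of the mechanism (splitLines and stripComments are made-up example filters; apply mirrors the helper added to text.py below):

    from itertools import chain

    def apply (fs, items):
        """ Apply fs[0] to every item, flatten, then recurse with the remaining filters """
        if not fs:
            return items
        return apply (fs[1:], chain.from_iterable (map (fs[0], items)))

    def splitLines (s):
        yield from s.splitlines ()

    def stripComments (line):
        if not line.startswith ('#'):
            yield line

    assert list (apply ([splitLines, stripComments], ['a\n#b\nc'])) == ['a', 'c']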
Diffstat (limited to 'lulua')
-rw-r--r--  lulua/test_text.py   14
-rw-r--r--  lulua/text.py       292
2 files changed, 219 insertions, 87 deletions
diff --git a/lulua/test_text.py b/lulua/test_text.py
index 65aa3a1..cd8ae7a 100644
--- a/lulua/test_text.py
+++ b/lulua/test_text.py
@@ -18,7 +18,10 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
-from .text import charMap, mapChars
+import brotli
+from io import BytesIO
+
+from .text import charMap, mapChars, BrotliFile
def test_map_chars_mapped ():
""" Make sure all chars in the map are mapped correctly """
@@ -46,3 +49,12 @@ def test_map_chars_not_mapped ():
outText = mapChars (inText, charMap)
assert outText == expectText
+def test_brotlifile ():
+ compressed = brotli.compress (b'hello world')
+ for chunk in (1, 2, 3, 1024, None):
+ f = BrotliFile (BytesIO (compressed), chunk)
+ s = f.read (1)
+ assert s == b'h'
+ s = f.read ()
+ assert s == b'ello world'
+
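For reference, a round-trip through the incremental decompression API that BrotliFile (added to text.py below) wraps; the chunked reads mirror what the test above exercises. A minimal sketch, assuming a complete and valid brotli stream:

    import brotli
    from io import BytesIO

    compressed = BytesIO (brotli.compress (b'hello world'))
    dec = brotli.Decompressor ()
    out = b''
    while not dec.is_finished ():
        # any chunk size works; the decompressor buffers partial input
        out += dec.process (compressed.read (3))
    assert out == b'hello world'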
diff --git a/lulua/text.py b/lulua/text.py
index 41be637..77b259e 100644
--- a/lulua/text.py
+++ b/lulua/text.py
@@ -22,17 +22,19 @@
Text/corpus handling tools
"""
-import sys, argparse, pickle, json, logging, xml.dom.minidom
-from io import StringIO
+import sys, argparse, pickle, json, logging, xml.dom.minidom, queue
+from io import StringIO, BytesIO
from functools import partial
+from itertools import chain
from multiprocessing import Process, Queue, cpu_count, current_process
from subprocess import Popen, PIPE
+
from tqdm import tqdm
import ebooklib
from ebooklib import epub
-
import html5lib
from html5lib.filters.base import Filter
+import brotli
from .keyboard import defaultKeyboards
from .layout import defaultLayouts
@@ -108,40 +110,78 @@ class HTMLSerializer(object):
else:
assert False
-f = dict(
- aljazeera=lambda x: x['name'] == 'div' and x['data'].get ((None, 'id')) == 'DynamicContentContainer',
- bbcarabic=lambda x: x['name'] == 'div' and x['data'].get ((None, 'property')) == 'articleBody',
- )
-
-class LzipFile:
- __slots__ = ('p', )
+class BrotliFile:
+ __slots__ = ('decompressor', 'readchunk', 'fd', 'buf')
- def __init__ (self, path):
- self.p = Popen (['/usr/bin/lzip', '-c', '-d', path], stdout=PIPE)
+ def __init__ (self, fd, readchunk=100*1024):
+ self.fd = fd
+ self.readchunk = readchunk
+ self.decompressor = brotli.Decompressor ()
+ self.buf = b''
def __enter__ (self):
return self
def __exit__ (self, exc_type, exc_val, exc_tb):
- self.close ()
- return True
+ self.close ()
+ # return False so exceptions raised inside the with block propagate
+ return False
def read (self, num=None):
- return self.p.stdout.read (num)
+ while not self.decompressor.is_finished ():
+ if num is not None and len (self.buf) >= num:
+ break
+ self.buf += self.decompressor.process (self.fd.read (self.readchunk))
+ if num is not None:
+ b = self.buf[0:num]
+ self.buf = self.buf[num:]
+ else:
+ b = self.buf
+ self.buf = b''
+ return b
+
+ def seekable (self):
+ return False
def close (self):
- self.p.wait ()
- assert self.p.returncode == 0
+ self.decompressor = None
-def sourceHtml (selectFunc, item):
- with LzipFile (item.rstrip ()) as fd:
- document = html5lib.parse (fd)
- walker = html5lib.getTreeWalker("etree")
- stream = walker (document)
- s = HTMLSerializer()
- yield ''.join (s.serialize(Select (stream, selectFunc)))
+def filterTar (fd):
+ # Python’s tarfile module is painfully slow. We can do better.
+ pos = 0
+ blocksize = 512
+ emptyblock = b'\0'*blocksize
-def sourceEpub (item):
+ while True:
+ # read header
+ header = fd.read (blocksize)
+ pos += blocksize
+ if header == b'' or header == emptyblock:
+ break
+ assert header[256:256+8] == b'\0ustar  ', (header[256:256+8])
+ size = int (header[124:124+12].rstrip (b'\0'), base=8)
+
+ # read body
+ if size > 0:
+ yield BytesIO (fd.read (size))
+ pos += size
+
+ # align to next 512 byte block
+ into = pos%blocksize
+ if into != 0:
+ pad = blocksize-into
+ fd.read (pad)
+ pos += pad
+
+def filterBrotli (fd):
+ yield BrotliFile (fd)
+
+def filterHtml (selectFunc, fd):
+ document = html5lib.parse (fd)
+ walker = html5lib.getTreeWalker("etree")
+ stream = walker (document)
+ s = HTMLSerializer()
+ yield ''.join (s.serialize(Select (stream, selectFunc)))
+
+def filterEpub (item):
""" epub reader """
book = epub.read_epub (item.rstrip ())
logging.debug (f'reading ebook {item}')
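The hand-rolled tar reader above leans on two ustar facts: the 12-byte octal size field at offset 124 and 512-byte block alignment, plus the GNU magic b'\0ustar  ' at offset 256. A sanity-check sketch that feeds a stdlib-generated archive through it (assumes lulua.text is importable; GNU_FORMAT is needed because the magic check rejects POSIX/PAX headers):

    import io, tarfile
    from lulua.text import filterTar

    buf = io.BytesIO ()
    with tarfile.open (fileobj=buf, mode='w', format=tarfile.GNU_FORMAT) as t:
        data = b'x' * 600                  # forces 424 bytes of block padding
        info = tarfile.TarInfo ('member')
        info.size = len (data)
        t.addfile (info, io.BytesIO (data))
    buf.seek (0)

    members = [m.read () for m in filterTar (buf)]
    assert members == [data]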
@@ -155,66 +195,95 @@ def sourceEpub (item):
s = HTMLSerializer()
yield ''.join (s.serialize (stream))
-def sourceText (item):
- with LzipFile (item.rstrip ()) as fd:
- yield fd.read ().decode ('utf-8')
+def filterText (fd):
+ yield fd.read ().decode ('utf-8')
-def sourceLines (item):
+def filterLines (item):
""" Read items (i.e. lines) as is """
yield item
-def sourceJson (item):
+def filterJson (item):
yield json.loads (item)
+def filterFile (item):
+ with open (item.rstrip (), 'rb') as fd:
+ yield fd
+
+def filterXml (fd):
+ try:
+ yield xml.dom.minidom.parse (fd)
+ except Exception:
+ logging.error (f'invalid xml document {fd}')
+
def getText (nodelist):
+ """ Helper to retrieve text from an XML node list """
rc = []
for node in nodelist:
if node.nodeType == node.TEXT_NODE:
rc.append(node.data)
return ''.join(rc)
-def sourceTEI2 (item):
+def filterTEI2 (doc):
""" TEI.2 format used for United Nations parallel corpus """
- with open (item.rstrip (), 'rb') as fd:
- try:
- out = []
- doc = xml.dom.minidom.parse (fd)
- for text in doc.getElementsByTagName ('text'):
- for body in text.getElementsByTagName ('body'):
- for p in body.getElementsByTagName ('p'):
- for s in p.getElementsByTagName ('s'):
- out.append (getText (s.childNodes))
- out.append ('')
- yield '\n'.join (out)
- except Exception:
- logging.error (f'invalid xml document {item}')
-
-def sourceOpenSubtitles (item):
+ out = []
+ for text in doc.getElementsByTagName ('text'):
+ for body in text.getElementsByTagName ('body'):
+ for p in body.getElementsByTagName ('p'):
+ for s in p.getElementsByTagName ('s'):
+ out.append (getText (s.childNodes))
+ out.append ('')
+ yield '\n'.join (out)
+
+def filterOpenSubtitles (doc):
"""
XML-based format used by the (raw!) OpenSubtitles dump found here:
http://opus.nlpl.eu/OpenSubtitles-v2018.php
"""
- with open (item.rstrip (), 'rb') as fd:
- try:
- out = []
- doc = xml.dom.minidom.parse (fd)
- for s in doc.getElementsByTagName ('s'):
- # strip newlines, which are mostly unintentional due to
- # pretty-printed xml structure
- out.append (getText (s.childNodes).strip ())
- yield '\n'.join (out)
- except Exception as e:
- logging.error (f'invalid xml document {item} {e}')
-
-sources = dict(
- aljazeera=partial(sourceHtml, f['aljazeera']),
- bbcarabic=partial(sourceHtml, f['bbcarabic']),
- text=sourceText,
- json=sourceJson,
- epub=sourceEpub,
- tei2=sourceTEI2,
- opensubtitles=sourceOpenSubtitles,
- lines=sourceLines,
+
+ out = []
+ for s in doc.getElementsByTagName ('s'):
+ # strip newlines, which are mostly unintentional due to
+ # pretty-printed xml structure
+ out.append (getText (s.childNodes).strip ())
+ yield '\n'.join (out)
+
+def filterMediawikiMarkdown (text):
+ """
+ Convert mediawiki to markdown
+ """
+ p = subprocess.Popen (['pandoc', '-f', 'mediawiki', '-t', 'markdown'],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ p.stdin.write (text.encode ('utf-8'))
+ p.stdin.close ()
+ text = p.stdout.read ().decode ('utf-8')
+ ret = p.wait ()
+ # make sure we’re not leaking fd’s
+ p.stdout.close ()
+ del p
+ if ret != 0:
+ logging.error ('pandoc rejected document')
+ else:
+ yield text
+
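Writing the entire document to pandoc's stdin before draining stdout, as above, can deadlock once the OS pipe buffer fills on a large page; communicate() interleaves both ends. A hedged alternative sketch, not what the commit does (mediawikiToMarkdown is an illustrative name):

    import subprocess

    def mediawikiToMarkdown (text):
        p = subprocess.Popen (['pandoc', '-f', 'mediawiki', '-t', 'markdown'],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        # communicate() writes stdin and reads stdout concurrently, then waits
        out, _ = p.communicate (text.encode ('utf-8'))
        if p.returncode != 0:
            raise ValueError ('pandoc rejected document')
        return out.decode ('utf-8')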
+f = dict(
+ aljazeera=lambda x: x['name'] == 'div' and x['data'].get ((None, 'id')) == 'DynamicContentContainer',
+ bbcarabic=lambda x: x['name'] == 'div' and x['data'].get ((None, 'property')) == 'articleBody',
+ )
+
+filterAvail = dict(
+ aljazeera=partial(filterHtml, f['aljazeera']),
+ bbcarabic=partial(filterHtml, f['bbcarabic']),
+ text=filterText,
+ json=filterJson,
+ epub=filterEpub,
+ tei2=filterTEI2,
+ opensubtitles=filterOpenSubtitles,
+ lines=filterLines,
+ xml=filterXml,
+ file=filterFile,
+ tar=filterTar,
+ mediawikimarkdown=filterMediawikiMarkdown,
+ brotli=filterBrotli,
)
charMap = {
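With every filter registered under a name, a chain composes left to right, outermost container first. A sketch of what a brotli-compressed tarball of plain-text files would presumably go through (path and filter order are illustrative; assumes the module context above):

    filters = [filterAvail[name] for name in ('file', 'brotli', 'tar', 'text')]
    # file opens the path, brotli wraps the fd, tar yields one BytesIO per
    # member, text decodes each member to str
    for text in apply (filters, ['corpus/arwiki.tar.br\n']):
        print (len (text))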
@@ -248,7 +317,14 @@ def mapChars (text, m):
""" For all characters in text, replace if found in map m or keep as-is """
return ''.join (map (lambda x: m.get (x, x), text))
-def writeWorker (layout, sourceFunc, inq, outq):
+def apply (fs, items):
+ """ Apply the first function fs[0] to all items, flatten the result and repeat """
+ if not fs:
+ return items
+ else:
+ return apply (fs[1:], chain.from_iterable (map (fs[0], items)))
+
+def writeWorker (layout, funcs, inq, outq, statusq, benchmark):
try:
keyboard = defaultKeyboards['ibmpc105']
combined = makeCombined (keyboard)
@@ -259,8 +335,13 @@ def writeWorker (layout, sourceFunc, inq, outq):
if item is None:
break
- # extract (can be multiple items per source)
- for text in sourceFunc (item):
+ # extract (can be multiple texts per item)
+ i = 0
+ for text in apply (funcs, [item]):
+ if benchmark:
+ i += 1
+ continue
+
# map chars; make sure we’re using unix line endings, which is
# only one character
text = mapChars (text, charMap).replace ('\r\n', '\n')
@@ -278,28 +359,41 @@ def writeWorker (layout, sourceFunc, inq, outq):
for s in stats:
combined[s.name].update (s)
- itemsProcessed += 1
-
+ i += 1
+ # only update occasionally; this is an expensive operation
+ statusq.put (i)
+ itemsProcessed += i
if itemsProcessed > 0:
outq.put (combined)
else:
outq.put (None)
except Exception as e:
# exceptions cannot propagate out of a worker process; hand them to the parent via outq
- outq.put (None)
- raise
+ outq.put (e)
+
+def statusWorker (statusq):
+ with tqdm (unit='item', smoothing=0) as bar:
+ while True:
+ try:
+ num = statusq.get (block=True, timeout=1)
+ if num is None:
+ break
+ bar.update (n=num)
+ except queue.Empty:
+ bar.update (n=0)
def write ():
""" Extract corpus source file, convert to plain text, map chars and create stats """
parser = argparse.ArgumentParser(description='Import text and create stats.')
- parser.add_argument('-v', '--verbose', action='store_true', help='Enable debugging output')
+ parser.add_argument('--benchmark', action='store_true', help='Benchmark filter, no stats')
+ parser.add_argument('-j', '--jobs', metavar='NUM',
+ default=cpu_count (), type=int, help='Number of parallel jobs')
parser.add_argument('-k', '--keyboard', metavar='KEYBOARD',
default='ibmpc105', help='Physical keyboard name')
- parser.add_argument('-j', '--jobs', metavar='NUM',
- default=cpu_count (), help='Number of parallel jobs')
- parser.add_argument('source', metavar='SOURCE', choices=sources.keys(), help='Data source extractor name')
+ parser.add_argument('-v', '--verbose', action='store_true', help='Enable debugging output')
parser.add_argument('layout', metavar='LAYOUT', help='Keyboard layout name')
+ parser.add_argument('filter', metavar='FILTER', choices=filterAvail.keys(), nargs='+', help='Data filter')
args = parser.parse_args()
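Usage sketch for the status pattern above: one dedicated process owns the tqdm bar, workers only put integer counts, and None is the shutdown sentinel (assumes lulua.text's statusWorker is importable):

    from multiprocessing import Process, Queue
    from lulua.text import statusWorker

    if __name__ == '__main__':
        statusq = Queue ()
        statusp = Process (target=statusWorker, args=(statusq,), daemon=True)
        statusp.start ()
        for _ in range (1000):
            statusq.put (1)      # a worker would do this per processed item
        statusq.put (None)       # sentinel: shut the bar down cleanly
        statusp.join ()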
@@ -310,30 +404,36 @@ def write ():
keyboard = defaultKeyboards[args.keyboard]
layout = defaultLayouts[args.layout].specialize (keyboard)
+ filterSel = [filterAvail[x] for x in args.filter]
# limit queue sizes to limit memory usage
inq = Queue (args.jobs*2)
outq = Queue (args.jobs+1)
+ statusq = Queue (args.jobs+1)
logging.info (f'using {args.jobs} workers')
workers = []
for i in range (args.jobs):
p = Process(target=writeWorker,
- args=(layout, sources[args.source], inq, outq),
+ args=(layout, filterSel, inq, outq, statusq, args.benchmark),
daemon=True,
name=f'worker-{i}')
p.start()
workers.append (p)
+ statusp = Process(target=statusWorker,
+ args=(statusq,),
+ daemon=True,
+ name='status')
+ statusp.start()
+
try:
- with tqdm (unit='item') as bar:
- for l in sys.stdin:
- inq.put (l)
- bar.update (n=1)
-
- # something is wrong
- if not outq.empty ():
- return 1
+ for l in sys.stdin:
+ inq.put (l)
+
+ # a worker produced output early (a result or an exception): stop feeding input
+ if not outq.empty ():
+ break
except KeyboardInterrupt:
pass
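The reworked error handling below ships worker exceptions through the result queue so the parent can re-raise them; a daemon process cannot usefully raise on its own. The pattern in isolation (a minimal sketch, not the module's code):

    from multiprocessing import Process, Queue

    def worker (outq):
        try:
            raise RuntimeError ('boom')
        except Exception as e:
            outq.put (e)         # exceptions pickle fine across processes

    if __name__ == '__main__':
        outq = Queue ()
        Process (target=worker, args=(outq,), daemon=True).start ()
        item = outq.get ()
        if isinstance (item, Exception):
            print ('worker failed:', item)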
@@ -342,12 +442,32 @@ def write ():
for w in workers:
inq.put (None)
item = outq.get ()
+ if isinstance (item, Exception):
+ raise item
if item is not None:
pickle.dump (item, sys.stdout.buffer, pickle.HIGHEST_PROTOCOL)
assert outq.empty ()
+
+ statusq.put (None)
+ statusp.join ()
+
# and then we can kill them
for w in workers:
w.join ()
return 0
+import bz2, sys, json, subprocess
+from xml.etree.ElementTree import iterparse
+def extractMediawiki ():
+ parser = argparse.ArgumentParser(description='Extract raw wikitext from mediawiki dump.')
+ parser.add_argument('file', metavar='FILE', help='bzip2-compressed dump')
+ args = parser.parse_args()
+
+ with bz2.open (args.file, 'r') as fd:
+ for event, elem in iterparse (fd, ['start', 'end']):
+ if event == 'end' and elem.tag == '{http://www.mediawiki.org/xml/export-0.10/}text':
+ text = ''.join (elem.itertext ())
+ json.dump (text, sys.stdout, ensure_ascii=False)
+ sys.stdout.write ('\n')
+
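iterparse keeps the element tree it has built so far; on a multi-gigabyte dump it presumably pays to clear elements once their text has been emitted. A hedged variant of the loop above (dump path illustrative):

    import bz2, json, sys
    from xml.etree.ElementTree import iterparse

    TEXT = '{http://www.mediawiki.org/xml/export-0.10/}text'

    with bz2.open ('dump.xml.bz2', 'r') as fd:
        for event, elem in iterparse (fd, ['end']):
            if elem.tag == TEXT:
                json.dump (''.join (elem.itertext ()), sys.stdout, ensure_ascii=False)
                sys.stdout.write ('\n')
            elem.clear ()        # drop already-processed content to keep memory flat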