diff options
-rw-r--r-- | index.rst | 1 | ||||
-rw-r--r-- | internals.rst | 6 | ||||
-rwxr-xr-x | tools/convertFileDs.py | 167 | ||||
-rw-r--r-- | tools/eumel.py | 145 | ||||
-rwxr-xr-x | tools/extractAll.sh | 21 | ||||
-rwxr-xr-x | tools/extractArchive.py | 104 | ||||
-rwxr-xr-x | tools/linearizeDisk.py | 49 |
7 files changed, 493 insertions, 0 deletions
@@ -88,6 +88,7 @@ __ disks/grundpaket/03_eumel0.img .. _qemu: http://www.qemu.org/ .. include:: hardware.rst +.. include:: internals.rst .. include:: artifacts.rst Bibliography diff --git a/internals.rst b/internals.rst new file mode 100644 index 0000000..98658de --- /dev/null +++ b/internals.rst @@ -0,0 +1,6 @@ +Internals +--------- + +For the disk archive format see `<tools/extractArchive.py>`_ and EUMEL packet *basic archive* [source86]_. + +Documentation for the dataspace FILE can be found in `<tools/convertFileDs.py>`_ and EUMEL packet *file handling*. diff --git a/tools/convertFileDs.py b/tools/convertFileDs.py new file mode 100755 index 0000000..c4037db --- /dev/null +++ b/tools/convertFileDs.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 + +""" +Convert EUMEL FILE dataspace into a plain text file. + +Since there are no “files” in EUMEL we’re dealing with the editor’s in-memory +datastructure here. See EUMEL packet “file handling”. +""" + +import struct, copy +from collections import namedtuple +from eumel import Dataspace, DataspaceTypeMismatch + +Segment = namedtuple ('Segment', ['succ', 'pred', 'end']) +Sequence = namedtuple ('Sequence', ['index', 'segmentbegin', 'segmentend', 'lineno', 'lines']) +Atom = namedtuple ('Atom', ['seg', 'type', 'line']) + +class Chain: + """ + A chain is a cyclic datastructure, pointing to segments. Segments contain + one or more rows, which in turn reference a single line’s text. + """ + def __init__ (self, sequence, rows): + self.lineno = sequence.lineno + # current atom + self.pos = sequence.index + # current segment + self.segpos = sequence.segmentbegin + self.rows = rows + + def next (self): + atom = self.rows[self.segpos] + if self.pos == atom.seg.end: + # move to next segment + self.pos = atom.seg.succ + self.segpos = atom.seg.succ + else: + # just use the next atom in this segment + self.pos += 1 + self.lineno += 1 + + def prev (self): + # backwards is a little more involved: seg.pred points to the *first* segment row + logging.debug ('prev at pos {} seg {} line {}'.format (self.pos, self.segpos, self.lineno)) + if self.pos == self.segpos: + # get previous segment + atom = self.rows[self.segpos] + self.segpos = atom.seg.pred + atom = self.rows[self.segpos] + self.pos = atom.seg.end + else: + self.pos -= 1 + self.lineno -= 1 + + def first (self): + """ + Seek to first line + """ + while self.lineno > 1: + self.prev () + + @property + def atom (self): + """ + Get atom at current position + """ + return self.rows[self.pos] + +class FileDataspace (Dataspace): + """ + EUMEL’s FILE datatype + """ + + TYPE = 1003 + + def __init__ (self, fd): + Dataspace.__init__ (self, fd) + + # header of the BOUND LIST (aka TYPE FILE) + self.used = self.parseSequence () + self.parseInt (2) + self.parseSequence () + self.parseSequence () + self.parseInt (7) + assert self.fd.tell () == 0x38 + + rows = self.parseRows () + + self.parseHeap () + + self.text = self.reconstructText (rows) + + def parseSegment (self): + return Segment (*self.parseInt (3)) + + def parseSequence (self): + return Sequence (*self.parseInt (5)) + + def parseRows (self): + rows = [] + # read lines + while True: + # check data + data = self.fd.read (24) + if data == 24*b'\xff': + break + self.skip (-24) + # and parse it + seg = self.parseSegment () + rowtype = self.parseInt () + text = self.parseText () + rows.append (Atom (seg, rowtype, text)) + logging.debug ('got row {} {}'.format (len (rows)-1, rows[-1])) + return rows + + def reconstructText (self, rows): + # XXX: use + logging.debug ('Used first {}, last {}, starts at line {}, {} lines in total'.format (self.used.segmentbegin, self.used.segmentend, self.used.lineno, self.used.lines)) + chain = Chain (self.used, rows) + chain.first () + firstrow = chain.pos + lines = [] + visited = set () + while True: + if chain.pos in visited: + logging.warning ('Row {} already has been used'.format (chain.pos)) + visited.add (chain.pos) + + r = chain.atom + lbytes = bytes (r.line) + lbytesStripped = lbytes.rstrip (b'\xff') + if len (lbytes) != len (lbytesStripped): + logging.warning ('Line {} length incorrect. Is {}, should be {}, fixing. {}'.format (chain.lineno, r.line.length, len (lbytesStripped), lbytes)) + lbytes = lbytesStripped + lines.append (lbytes) + chain.next () + + # chains are cyclic + if chain.pos == firstrow: + break + return codecs.decode (b'\n'.join (lines), 'eumel', 'replace') + +if __name__ == '__main__': + import sys, os, codecs, logging + import argparse, sys + + parser = argparse.ArgumentParser(description='Convert EUMEL FILE dataspace into plain text file.') + parser.add_argument ('-v', '--verbose', help='Enable debugging messages', action='store_true') + parser.add_argument ('file', help='Input file') + args = parser.parse_args () + + if args.verbose: + logging.basicConfig (level=logging.DEBUG) + else: + logging.basicConfig (level=logging.WARNING) + + with open (args.file, 'rb') as fd: + try: + ds = FileDataspace (fd) + linecount = len (ds.text.splitlines ()) + if linecount != ds.used.lines: + logging.warning ('Got {} lines, but should have been {}'.format (linecount, ds.used.lines)) + print (ds.text) + except DataspaceTypeMismatch: + logging.error ('Not a text file, cannot convert') + sys.exit (1) + diff --git a/tools/eumel.py b/tools/eumel.py new file mode 100644 index 0000000..0434b35 --- /dev/null +++ b/tools/eumel.py @@ -0,0 +1,145 @@ +""" +EUMEL utility functions, including: + +""" + +import logging +import codecs + +# EUMEL character map. See “Benutzerhandbuch 1.7”, page 107. +# map eumel character to unicode codepoint +eumel2unicodemap = dict ( + [(10, '\n'), (13, '\r')] + + # first part is same as ascii + [(i, chr (i)) for i in range (32, 126)] + + [(126, '~')] + + [(214, 'Ä'), (215, 'Ö'), (216, 'Ü'), (217, 'ä'), (218, 'ö'), (219, 'ü'), (220, 'k'), (221, '-'), (222, '#'), (223, ' ')] + + [(251, 'ß')]) + +def decode (input, errors='strict'): + ret = [] + pos = 0 + for pos in range (len (input)): + c = input[pos] + m = eumel2unicodemap.get (c, None) + if m: + ret.append (m) + else: + if errors == 'strict': + raise UnicodeError ('unknown char {}'.format (c)) + elif errors == 'ignore': + pass + elif errors == 'replace': + ret.append ('\uFFFD') + else: + break + return (''.join (ret), pos) + +def lookup (name): + if name == 'eumel': + return codecs.CodecInfo(None, decode) + return None + +codecs.register (lookup) + +# Dataspace utilities +import struct, os + +class DataspaceTypeMismatch (ValueError): + pass + +class Dataspace: + # Expected type + TYPE = None + + def __init__ (self, fd): + self.fd = fd + self.lastaddr, self.firstaddr, self.type, _ = self._parseHeader () + if self.TYPE is not None and self.type != self.TYPE: + raise DataspaceTypeMismatch () + self.heap = {} + + def _parseHeader (self): + """ + :return: (last heap address, first heap address, dataspace type, unknown) + """ + buf = self.fd.read (8) + return struct.unpack ('<HHHH', buf) + + def parseText (self): + """ + Parse TEXT datatype, which can either be embedded (up to 13? chars) or in the heap (i.e. address) + """ + buf = self.fd.read (16) + address, length = struct.unpack ('<HB', buf[:3]) + if length <= 13: + r = buf[3:3+length] + else: + length, = struct.unpack ('<H', buf[3:5]) + r = HeapReference (self.heap, address, length) + return r + + def parseInt (self, count=1): + if count == 1: + return struct.unpack ('<H', self.fd.read (1*intsize))[0] + else: + return [self.parseInt () for i in range (count)] + + def parseHeap (self): + heapaddr = self.firstaddr + maxaddr = 2**(intsize*8)-1 + while True: + head = self.fd.read (2) + # XXX: not sure how to find its offset + if head == b'\xff\xff': + continue + if not head or len (head) < 2: + break + length, = struct.unpack ('<H', head) + self.heap[heapaddr] = self.fd.read (length) + logging.debug ('got heap entry {:x} = ({}) {}'.format (heapaddr, length, self.heap[heapaddr])) + heapaddr = (heapaddr+2+length) % maxaddr + + def skip (self, n): + self.fd.seek (n, os.SEEK_CUR) + + def seek (self, pos): + self.fd.seek (pos, os.SEEK_SET) + +class HeapReference: + def __init__ (self, heap, address, length): + self.heap = heap + self.address = address + self.length = length + self._item = None + + def __bytes__ (self): + return self.item[:self.length] + + def __len__ (self): + return self.length + + def __getitem__ (self, key): + return self.item[key] + + def __repr__ (self): + return '<HeapReference to {:x} length {}>'.format (self.address, self.length) + + @property + def item (self): + if self._item: + return self._item + elif self.address in self.heap: + self._item = self.heap[self.address] + return self._item + else: + raise HeapReferenceUnresolved (self.address, self.length) + +class HeapReferenceUnresolved (Exception): + def __init__ (self, address, length): + Exception.__init__ (self, 'addr: {:x}, len: {}'.format (address, length)) + +# Machine constants +intsize = 2 +pagesize = 512 + diff --git a/tools/extractAll.sh b/tools/extractAll.sh new file mode 100755 index 0000000..6139475 --- /dev/null +++ b/tools/extractAll.sh @@ -0,0 +1,21 @@ +#!/bin/sh + +root=`dirname "$0"` +root=`realpath "$root"` + +while read -r F; do + base=`basename "$F"` + linear=`mktemp` + destdir="${base}.extracted" + echo "Extracting $F to $destdir" + $root/linearizeDisk.py "$F" "$linear" + $root/extractArchive.py -o "$destdir" "$linear" + pushd "$destdir" || continue + for G in ./*; do + echo "Converting $G to ${G}.txt" + $root/convertFileDs.py "$G" > "${G}.txt" || rm "${G}.txt" + done + popd + rm "$linear" +done + diff --git a/tools/extractArchive.py b/tools/extractArchive.py new file mode 100755 index 0000000..2e66879 --- /dev/null +++ b/tools/extractArchive.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 + +""" +Extract linearized (see linearizeDisk.py) EUMEL archive disk. +""" + +import struct, sys, io, logging +import codecs +from eumel import Dataspace + +def take (it, n): + for i in range (n): + yield next (it) + +def parseEntry (blocks): + while True: + header = next (blocks) + unknown1, unknown2, length, unknown3 = struct.unpack ('<HHHH', header[:8]) + logging.debug ('Got dataspace with {} blocks'.format (length)) + yield b''.join (take (blocks, length)) + +def readBlocks (fd): + while True: + buf = fd.read (512) + if not buf: + break + yield buf + +class FileHeaderDataspace (Dataspace): + TYPE = 0 + + def __init__ (self, fd): + Dataspace.__init__ (self, fd) + self.name = self.parseText () + self.mtime = self.parseText () + self.seek (0x40) + self.parseHeap () + +if __name__ == '__main__': + import argparse, sys, codecs, os + from datetime import datetime + from io import BytesIO + from eumel import pagesize + + parser = argparse.ArgumentParser(description='Extract EUMEL disk archive.') + parser.add_argument ('-f', '--force', help='Overwrite existing files', action='store_true') + parser.add_argument ('-o', '--output', help='Output directory, defaults to archive name') + parser.add_argument ('-v', '--verbose', help='Enable debugging messages', action='store_true') + parser.add_argument ('file', help='Input file') + args = parser.parse_args () + + if args.verbose: + logging.basicConfig (level=logging.DEBUG) + else: + logging.basicConfig (level=logging.INFO) + + with open (args.file, 'rb') as infd: + entries = parseEntry (readBlocks (infd)) + + # first entry is always disk info + diskinfo = FileHeaderDataspace (BytesIO (next (entries))) + if not args.output: + args.output = codecs.decode (diskinfo.name, 'eumel', 'replace') + logging.debug ('Using disk name {} as output directory'.format (args.output)) + + # create output dir + try: + os.makedirs (args.output) + except FileExistsError: + pass + + while True: + # file header dataspace + fileheader = FileHeaderDataspace (BytesIO (next (entries))) + filename = codecs.decode (fileheader.name, 'eumel', 'replace').replace ('/', '-') + if len (filename) == 0: + logging.debug ('Filename was empty, i.e. last item in archive. I’m done') + break + try: + mtime = datetime.strptime (codecs.decode (fileheader.mtime, 'eumel', 'replace'), '%d.%m.%y') + except ValueError as e: + logging.warning ('Cannot parse date of file {}, {}'.format (filename, e)) + mtime = datetime.now () + logging.debug ('Got file {}, last modified {}'.format (filename, mtime)) + + # actual file contents + e = next (entries) + + # quirks: if the first page starts with a magic sequence, skip it. + # Not sure what it is used for. + if e.startswith (2*b'\x30\x00\x00\x00'): + logging.debug ('skipping quirks') + e = e[pagesize:] + + outfile = os.path.join (args.output, filename) + if os.path.exists (outfile) and not args.force: + logging.info ('File {} exists, skipping'.format (outfile)) + continue + logging.info ('Extracting {} bytes to file {}'.format (len (e), outfile)) + with open (outfile, 'wb') as outfd: + outfd.write (e) + stamp = mtime.timestamp () + os.utime (outfile, (stamp, stamp)) + diff --git a/tools/linearizeDisk.py b/tools/linearizeDisk.py new file mode 100755 index 0000000..55f4b06 --- /dev/null +++ b/tools/linearizeDisk.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 + +""" +For some reason blocks in the bitsavers images are not in linear order, but +shuffled. Not sure why and if other disks are affected as well, but this script +reorders them. +""" + +import os, logging +from itertools import chain + +def linearBlocks (fd): + fd.seek (0, os.SEEK_END) + size = fd.tell () + logging.debug ('File size is {} bytes'.format (size)) + + blockSize = 512 + blocksPerChunk = 15 + chunkSize = blockSize*blocksPerChunk + chunks = size//chunkSize + skip = 1 + if size%chunkSize != 0: + logging.warning ('File size {} is not multiple of chunk size {}'.format (size, chunkSize)) + + # first even then odd chunks + for j in chain (range (0, chunks, 2), range (1, chunks, 2)): + pos = j*chunkSize + logging.debug ('Seeking to {} for chunk {} and reading {} blocks @ {} bytes'.format (pos, j, blocksPerChunk, blockSize)) + fd.seek (pos, os.SEEK_SET) + for i in range (blocksPerChunk): + yield fd.read (blockSize) + +if __name__ == '__main__': + import argparse, sys + + parser = argparse.ArgumentParser(description='Reorder EUMEL archive disk’s blocks.') + parser.add_argument ('-v', '--verbose', help='Enable debugging messages', action='store_true') + parser.add_argument ('input', help='Input file') + parser.add_argument ('output', help='Out file') + args = parser.parse_args () + if args.verbose: + logging.basicConfig (level=logging.DEBUG) + else: + logging.basicConfig (level=logging.WARNING) + + with open (args.input, 'rb') as infd, open (args.output, 'wb') as outfd: + for b in linearBlocks (infd): + outfd.write (b) + |