diff options
Diffstat (limited to 'crocoite/irc.py')
-rw-r--r-- | crocoite/irc.py | 254 |
1 files changed, 176 insertions, 78 deletions
diff --git a/crocoite/irc.py b/crocoite/irc.py index 99485e4..d9c0634 100644 --- a/crocoite/irc.py +++ b/crocoite/irc.py @@ -22,16 +22,19 @@ IRC bot “chromebot” """ -import asyncio, argparse, uuid, json, tempfile +import asyncio, argparse, json, tempfile, time, random, os, shlex from datetime import datetime from urllib.parse import urlsplit -from enum import IntEnum, Enum +from enum import IntEnum, unique from collections import defaultdict from abc import abstractmethod from functools import wraps import bottom import websockets +from .util import StrJsonEncoder +from .cli import cookie + ### helper functions ### def prettyTimeDelta (seconds): """ @@ -53,7 +56,7 @@ def prettyBytes (b): while b >= 1024 and len (prefixes) > 1: b /= 1024 prefixes.pop (0) - return '{:.1f} {}'.format (b, prefixes[0]) + return f'{b:.1f} {prefixes[0]}' def isValidUrl (s): url = urlsplit (s) @@ -84,13 +87,45 @@ class Status(IntEnum): aborted = 3 finished = 4 +# see https://arxiv.org/html/0901.4016 on how to build proquints (human +# pronouncable unique ids) +toConsonant = 'bdfghjklmnprstvz' +toVowel = 'aiou' + +def u16ToQuint (v): + """ Transform a 16 bit unsigned integer into a single quint """ + assert 0 <= v < 2**16 + # quints are “big-endian” + return ''.join ([ + toConsonant[(v>>(4+2+4+2))&0xf], + toVowel[(v>>(4+2+4))&0x3], + toConsonant[(v>>(4+2))&0xf], + toVowel[(v>>4)&0x3], + toConsonant[(v>>0)&0xf], + ]) + +def uintToQuint (v, length=2): + """ Turn any integer into a proquint with fixed length """ + assert 0 <= v < 2**(length*16) + + return '-'.join (reversed ([u16ToQuint ((v>>(x*16))&0xffff) for x in range (length)])) + +def makeJobId (): + """ Create job id from time and randomness source """ + # allocate 48 bits for the time (in milliseconds) and add 16 random bits + # at the end (just to be sure) for a total of 64 bits. Should be enough to + # avoid collisions. + randbits = 16 + stamp = (int (time.time ()*1000) << randbits) | random.randint (0, 2**randbits-1) + return uintToQuint (stamp, 4) + class Job: """ Archival job """ __slots__ = ('id', 'stats', 'rstats', 'started', 'finished', 'nick', 'status', 'process', 'url') def __init__ (self, url, nick): - self.id = str (uuid.uuid4 ()) + self.id = makeJobId () self.stats = {} self.rstats = {} self.started = datetime.utcnow () @@ -104,32 +139,40 @@ class Job: def formatStatus (self): stats = self.stats rstats = self.rstats - return '{} ({}) {}. {} pages finished, {} pending; {} crashed, {} requests, {} failed, {} received.'.format ( - self.url, - self.id, - self.status.name, - rstats.get ('have', 0), - rstats.get ('pending', 0), - stats.get ('crashed', 0), - stats.get ('requests', 0), - stats.get ('failed', 0), - prettyBytes (stats.get ('bytesRcv', 0))) - -class NickMode(Enum): - operator = '@' - voice = '+' + return (f"{self.url} ({self.id}) {self.status.name}. " + f"{rstats.get ('have', 0)} pages finished, " + f"{rstats.get ('pending', 0)} pending; " + f"{stats.get ('crashed', 0)} crashed, " + f"{stats.get ('requests', 0)} requests, " + f"{stats.get ('failed', 0)} failed, " + f"{prettyBytes (stats.get ('bytesRcv', 0))} received.") + +@unique +class NickMode(IntEnum): + # the actual numbers don’t matter, but their order must be strictly + # increasing (with priviledge level) + operator = 100 + voice = 10 @classmethod def fromMode (cls, mode): return {'v': cls.voice, 'o': cls.operator}[mode] + @classmethod + def fromNickPrefix (cls, mode): + return {'@': cls.operator, '+': cls.voice}[mode] + + @property + def human (self): + return {self.operator: 'operator', self.voice: 'voice'}[self] + class User: """ IRC user """ __slots__ = ('name', 'modes') - def __init__ (self, name, modes=set ()): + def __init__ (self, name, modes=None): self.name = name - self.modes = modes + self.modes = modes or set () def __eq__ (self, b): return self.name == b.name @@ -138,15 +181,21 @@ class User: return hash (self.name) def __repr__ (self): - return '<User {} {}>'.format (self.name, self.modes) + return f'<User {self.name} {self.modes}>' + + def hasPriv (self, p): + if p is None: + return True + else: + return self.modes and max (self.modes) >= p @classmethod def fromName (cls, name): """ Get mode and name from NAMES command """ try: - modes = {NickMode(name[0])} + modes = {NickMode.fromNickPrefix (name[0])} name = name[1:] - except ValueError: + except KeyError: modes = set () return cls (name, modes) @@ -159,7 +208,8 @@ class ReplyContext: self.user = user def __call__ (self, message): - self.client.send ('PRIVMSG', target=self.target, message='{}: {}'.format (self.user.name, message)) + self.client.send ('PRIVMSG', target=self.target, + message=f'{self.user.name}: {message}') class RefCountEvent: """ @@ -200,9 +250,9 @@ class ArgparseBot (bottom.Client): __slots__ = ('channels', 'nick', 'parser', 'users', '_quit') - def __init__ (self, host, port, ssl, nick, logger, channels=[], loop=None): + def __init__ (self, host, port, ssl, nick, logger, channels=None, loop=None): super().__init__ (host=host, port=port, ssl=ssl, loop=loop) - self.channels = channels + self.channels = channels or [] self.nick = nick # map channel -> nick -> user self.users = defaultdict (dict) @@ -259,8 +309,13 @@ class ArgparseBot (bottom.Client): self.send ('JOIN', channel=c) # no need for NAMES here, server sends this automatically - async def onNameReply (self, target, channel_type, channel, users, **kwargs): - self.users[channel] = dict (map (lambda x: (x.name, x), map (User.fromName, users))) + async def onNameReply (self, channel, users, **kwargs): + # channels may be too big for a single message + addusers = dict (map (lambda x: (x.name, x), map (User.fromName, users))) + if channel not in self.users: + self.users[channel] = addusers + else: + self.users[channel].update (addusers) @staticmethod def parseMode (mode): @@ -274,7 +329,7 @@ class ArgparseBot (bottom.Client): ret.append ((action, c)) return ret - async def onMode (self, nick, user, host, channel, modes, params, **kwargs): + async def onMode (self, channel, modes, params, **kwargs): if channel not in self.channels: return @@ -290,7 +345,7 @@ class ArgparseBot (bottom.Client): # unknown mode, ignore pass - async def onPart (self, nick, user, host, message, channel, **kwargs): + async def onPart (self, nick, channel, **kwargs): if channel not in self.channels: return @@ -312,23 +367,27 @@ class ArgparseBot (bottom.Client): async def onMessage (self, nick, target, message, **kwargs): """ Message received """ - if target in self.channels and message.startswith (self.nick): + msgPrefix = self.nick + ':' + if target in self.channels and message.startswith (msgPrefix): user = self.users[target].get (nick, User (nick)) reply = ReplyContext (client=self, target=target, user=user) - # channel message that starts with our nick - command = message.split (' ')[1:] + # shlex.split supports quoting arguments, which str.split() does not + command = shlex.split (message[len (msgPrefix):]) try: args = self.parser.parse_args (command) except Exception as e: - reply ('{} -- {}'.format (e.args[1], e.args[0].format_usage ())) + reply (f'{e.args[1]} -- {e.args[0].format_usage ()}') return - if not args: - reply ('Sorry, I don’t understand {}'.format (command)) + if not args or not hasattr (args, 'func'): + reply (f'Sorry, I don’t understand {command}') return + minPriv = getattr (args, 'minPriv', None) if self._quit.armed and not getattr (args, 'allowOnShutdown', False): reply ('Sorry, I’m shutting down and cannot accept your request right now.') + elif not user.hasPriv (minPriv): + reply (f'Sorry, you need the privilege {minPriv.human} to use this command.') else: with self._quit: await args.func (user=user, args=args, reply=reply) @@ -336,23 +395,14 @@ class ArgparseBot (bottom.Client): async def onDisconnect (self, **kwargs): """ Auto-reconnect """ self.logger.info ('disconnect', uuid='4c74b2c8-2403-4921-879d-2279ad85db72') - if not self._quit.armed: - await asyncio.sleep (10, loop=self.loop) - self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7') - await self.connect () - -def voice (func): - """ Calling user must have voice or ops """ - @wraps (func) - async def inner (self, *args, **kwargs): - user = kwargs.get ('user') - reply = kwargs.get ('reply') - if not user.modes.intersection ({NickMode.operator, NickMode.voice}): - reply ('Sorry, you must have voice to use this command.') - else: - ret = await func (self, *args, **kwargs) - return ret - return inner + while True: + if not self._quit.armed: + await asyncio.sleep (10, loop=self.loop) + self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7') + try: + await self.connect () + finally: + break def jobExists (func): """ Chromebot job exists """ @@ -363,38 +413,45 @@ def jobExists (func): reply = kwargs.get ('reply') j = self.jobs.get (args.id, None) if not j: - reply ('Job {} is unknown'.format (args.id)) + reply (f'Job {args.id} is unknown') else: ret = await func (self, job=j, **kwargs) return ret return inner class Chromebot (ArgparseBot): - __slots__ = ('jobs', 'tempdir', 'destdir', 'processLimit') + __slots__ = ('jobs', 'tempdir', 'destdir', 'processLimit', 'blacklist', 'needVoice') + + def __init__ (self, host, port, ssl, nick, logger, channels=None, + tempdir=None, destdir='.', processLimit=1, + blacklist={}, needVoice=False, loop=None): + self.needVoice = needVoice - def __init__ (self, host, port, ssl, nick, logger, channels=[], - tempdir=tempfile.gettempdir(), destdir='.', processLimit=1, - loop=None): super().__init__ (host=host, port=port, ssl=ssl, nick=nick, logger=logger, channels=channels, loop=loop) self.jobs = {} - self.tempdir = tempdir + self.tempdir = tempdir or tempfile.gettempdir() self.destdir = destdir self.processLimit = asyncio.Semaphore (processLimit) + self.blacklist = blacklist def getParser (self): parser = NonExitingArgumentParser (prog=self.nick + ': ', add_help=False) subparsers = parser.add_subparsers(help='Sub-commands') archiveparser = subparsers.add_parser('a', help='Archive a site', add_help=False) - #archiveparser.add_argument('--timeout', default=1*60*60, type=int, help='Maximum time for archival', metavar='SEC', choices=[60, 1*60*60, 2*60*60]) - #archiveparser.add_argument('--idle-timeout', default=10, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC', choices=[1, 10, 20, 30, 60]) - #archiveparser.add_argument('--max-body-size', default=None, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES', choices=[1*1024*1024, 10*1024*1024, 100*1024*1024]) archiveparser.add_argument('--concurrency', '-j', default=1, type=int, help='Parallel workers for this job', choices=range (1, 5)) archiveparser.add_argument('--recursive', '-r', help='Enable recursion', choices=['0', '1', 'prefix'], default='0') + archiveparser.add_argument('--insecure', '-k', + help='Disable certificate checking', action='store_true') + # parsing the cookie here, so we can give an early feedback without + # waiting for the job to crash on invalid arguments. + archiveparser.add_argument('--cookie', '-b', type=cookie, + help='Add a cookie', action='append', default=[]) archiveparser.add_argument('url', help='Website URL', type=isValidUrl, metavar='URL') - archiveparser.set_defaults (func=self.handleArchive) + archiveparser.set_defaults (func=self.handleArchive, + minPriv=NickMode.voice if self.needVoice else None) statusparser = subparsers.add_parser ('s', help='Get job status', add_help=False) statusparser.add_argument('id', help='Job id', metavar='UUID') @@ -402,31 +459,70 @@ class Chromebot (ArgparseBot): abortparser = subparsers.add_parser ('r', help='Revoke/abort job', add_help=False) abortparser.add_argument('id', help='Job id', metavar='UUID') - abortparser.set_defaults (func=self.handleAbort, allowOnShutdown=True) + abortparser.set_defaults (func=self.handleAbort, allowOnShutdown=True, + minPriv=NickMode.voice if self.needVoice else None) return parser - @voice + def isBlacklisted (self, url): + for k, v in self.blacklist.items(): + if k.match (url): + return v + return False + async def handleArchive (self, user, args, reply): """ Handle the archive command """ - j = Job (args.url, user.name) - assert j.id not in self.jobs, 'duplicate job id' + msg = self.isBlacklisted (args.url) + if msg: + reply (f'{args.url} cannot be queued: {msg}') + return + + # make sure the job id is unique. Since ids are time-based we can just + # wait. + while True: + j = Job (args.url, user.name) + if j.id not in self.jobs: + break + time.sleep (0.01) self.jobs[j.id] = j logger = self.logger.bind (job=j.id) - cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir, - '--prefix', j.id + '-{host}-{date}-', '--policy', - args.recursive, '--concurrency', str (args.concurrency), - self.destdir] - showargs = { 'recursive': args.recursive, 'concurrency': args.concurrency, } + if args.insecure: + showargs['insecure'] = args.insecure + warcinfo = {'chromebot': { + 'jobid': j.id, + 'user': user.name, + 'queued': j.started, + 'url': args.url, + 'recursive': args.recursive, + 'concurrency': args.concurrency, + }} + grabCmd = ['crocoite-single'] + # prefix warcinfo with !, so it won’t get expanded + grabCmd.extend (['--warcinfo', + '!' + json.dumps (warcinfo, cls=StrJsonEncoder)]) + for v in args.cookie: + grabCmd.extend (['--cookie', v.OutputString ()]) + if args.insecure: + grabCmd.append ('--insecure') + grabCmd.extend (['{url}', '{dest}']) + cmdline = ['crocoite', + '--tempdir', self.tempdir, + '--recursion', args.recursive, + '--concurrency', str (args.concurrency), + args.url, + os.path.join (self.destdir, + j.id + '-{host}-{date}-{seqnum}.warc.gz'), + '--'] + grabCmd + strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ())) - reply ('{} has been queued as {} with {}'.format (args.url, j.id, strargs)) + reply (f'{args.url} has been queued as {j.id} with {strargs}') logger.info ('queue', user=user.name, url=args.url, cmdline=cmdline, uuid='36cc34a6-061b-4cc5-84a9-4ab6552c8d75') @@ -437,7 +533,7 @@ class Chromebot (ArgparseBot): stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL, - start_new_session=True) + start_new_session=True, limit=100*1024*1024) while True: data = await j.process.stdout.readline () if not data: @@ -477,7 +573,6 @@ class Chromebot (ArgparseBot): rstats = job.rstats reply (job.formatStatus ()) - @voice @jobExists async def handleAbort (self, user, args, reply, job): """ Handle abort command """ @@ -541,7 +636,11 @@ class Dashboard: if not buf: return - data = json.loads (buf) + try: + data = json.loads (buf) + except json.decoder.JSONDecodeError: + # ignore invalid + return msgid = data['uuid'] if msgid in self.ignoreMsgid: @@ -554,9 +653,8 @@ class Dashboard: elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687': nesteddata = data['data'] nestedmsgid = nesteddata['uuid'] - if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db': + if nestedmsgid == 'd1288fbe-8bae-42c8-af8c-f2fa8b41794f': del nesteddata['command'] - del nesteddata['destfile'] buf = json.dumps (data) for c in self.clients: |