# Copyright (c) 2017–2018 crocoite contributors # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """ IRC bot “chromebot” """ import asyncio, argparse, uuid, json, tempfile from datetime import datetime from urllib.parse import urlsplit from enum import IntEnum, Enum from collections import defaultdict from abc import abstractmethod from functools import wraps import bottom import websockets ### helper functions ### def prettyTimeDelta (seconds): """ Pretty-print seconds to human readable string 1d 1h 1m 1s """ seconds = int(seconds) days, seconds = divmod(seconds, 86400) hours, seconds = divmod(seconds, 3600) minutes, seconds = divmod(seconds, 60) s = [(days, 'd'), (hours, 'h'), (minutes, 'm'), (seconds, 's')] s = filter (lambda x: x[0] != 0, s) return ' '.join (map (lambda x: '{}{}'.format (*x), s)) def prettyBytes (b): """ Pretty-print bytes """ prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB'] while b >= 1024 and len (prefixes) > 1: b /= 1024 prefixes.pop (0) return '{:.1f} {}'.format (b, prefixes[0]) def isValidUrl (s): url = urlsplit (s) if url.scheme and url.netloc and url.scheme in {'http', 'https'}: return s raise TypeError () class NonExitingArgumentParser (argparse.ArgumentParser): """ Argument parser that does not call exit(), suitable for interactive use """ def exit (self, status=0, message=None): # should never be called pass def error (self, message): # if we use subparsers it’s important to return self, so we can show # the correct help raise Exception (self, message) def format_usage (self): return super().format_usage ().replace ('\n', ' ') class Status(IntEnum): """ Job status """ undefined = 0 pending = 1 running = 2 aborted = 3 finished = 4 class Job: """ Archival job """ __slots__ = ('id', 'stats', 'rstats', 'started', 'finished', 'nick', 'status', 'process', 'url') def __init__ (self, url, nick): self.id = str (uuid.uuid4 ()) self.stats = {} self.rstats = {} self.started = datetime.utcnow () self.finished = None self.url = url # user who scheduled this job self.nick = nick self.status = Status.pending self.process = None def formatStatus (self): stats = self.stats rstats = self.rstats return '{} ({}) {}. {} pages finished, {} pending; {} crashed, {} requests, {} failed, {} received.'.format ( self.url, self.id, self.status.name, rstats.get ('have', 0), rstats.get ('pending', 0), stats.get ('crashed', 0), stats.get ('requests', 0), stats.get ('failed', 0), prettyBytes (stats.get ('bytesRcv', 0))) class NickMode(Enum): operator = '@' voice = '+' @classmethod def fromMode (cls, mode): return {'v': cls.voice, 'o': cls.operator}[mode] class User: """ IRC user """ __slots__ = ('name', 'modes') def __init__ (self, name, modes=set ()): self.name = name self.modes = modes def __eq__ (self, b): return self.name == b.name def __hash__ (self): return hash (self.name) def __repr__ (self): return ''.format (self.name, self.modes) @classmethod def fromName (cls, name): """ Get mode and name from NAMES command """ try: modes = {NickMode(name[0])} name = name[1:] except ValueError: modes = set () return cls (name, modes) class ReplyContext: __slots__ = ('client', 'target', 'user') def __init__ (self, client, target, user): self.client = client self.target = target self.user = user def __call__ (self, message): self.client.send ('PRIVMSG', target=self.target, message='{}: {}'.format (self.user.name, message)) class RefCountEvent: """ Ref-counted event that triggers if a) armed and b) refcount drops to zero. Must be used as a context manager. """ __slots__ = ('count', 'event', 'armed') def __init__ (self): self.armed = False self.count = 0 self.event = asyncio.Event () def __enter__ (self): self.count += 1 self.event.clear () def __exit__ (self, exc_type, exc_val, exc_tb): self.count -= 1 if self.armed and self.count == 0: self.event.set () async def wait (self): await self.event.wait () def arm (self): self.armed = True if self.count == 0: self.event.set () class ArgparseBot (bottom.Client): """ Simple IRC bot using argparse Tracks user’s modes, reconnects on disconnect """ __slots__ = ('channels', 'nick', 'parser', 'users', '_quit') def __init__ (self, host, port, ssl, nick, logger, channels=[], loop=None): super().__init__ (host=host, port=port, ssl=ssl, loop=loop) self.channels = channels self.nick = nick # map channel -> nick -> user self.users = defaultdict (dict) self.logger = logger.bind (context=type (self).__name__) self.parser = self.getParser () # bot does not accept new queries in shutdown mode, unless explicitly # permitted by the parser self._quit = RefCountEvent () # register bottom event handler self.on('CLIENT_CONNECT', self.onConnect) self.on('PING', self.onKeepalive) self.on('PRIVMSG', self.onMessage) self.on('CLIENT_DISCONNECT', self.onDisconnect) self.on('RPL_NAMREPLY', self.onNameReply) self.on('CHANNELMODE', self.onMode) self.on('PART', self.onPart) self.on('JOIN', self.onJoin) # XXX: we would like to handle KICK, but bottom does not support that at the moment @abstractmethod def getParser (self): pass def cancel (self): self.logger.info ('cancel', uuid='1eb34aea-a854-4fec-90b2-7f8a3812a9cd') self._quit.arm () async def run (self): await self.connect () await self._quit.wait () self.send ('QUIT', message='Bye.') await self.disconnect () async def onConnect (self, **kwargs): self.logger.info ('connect', nick=self.nick, uuid='01f7b138-ea53-4609-88e9-61f3eca3e7e7') self.send('NICK', nick=self.nick) self.send('USER', user=self.nick, realname='https://github.com/PromyLOPh/crocoite') # Don't try to join channels until the server has # sent the MOTD, or signaled that there's no MOTD. done, pending = await asyncio.wait( [self.wait('RPL_ENDOFMOTD'), self.wait('ERR_NOMOTD')], loop=self.loop, return_when=asyncio.FIRST_COMPLETED) # Cancel whichever waiter's event didn't come in. for future in pending: future.cancel() for c in self.channels: self.logger.info ('join', channel=c, uuid='367063a5-9069-4025-907c-65ba88af8593') self.send ('JOIN', channel=c) # no need for NAMES here, server sends this automatically async def onNameReply (self, target, channel_type, channel, users, **kwargs): self.users[channel] = dict (map (lambda x: (x.name, x), map (User.fromName, users))) @staticmethod def parseMode (mode): """ Parse mode strings like +a, -b, +a-b, -b+a, … """ action = '+' ret = [] for c in mode: if c in {'+', '-'}: action = c else: ret.append ((action, c)) return ret async def onMode (self, nick, user, host, channel, modes, params, **kwargs): if channel not in self.channels: return for (action, mode), nick in zip (self.parseMode (modes), params): try: m = NickMode.fromMode (mode) u = self.users[channel].get (nick, User (nick)) if action == '+': u.modes.add (m) elif action == '-': u.modes.remove (m) except KeyError: # unknown mode, ignore pass async def onPart (self, nick, user, host, message, channel, **kwargs): if channel not in self.channels: return try: self.users[channel].pop (nick) except KeyError: # gone already pass async def onJoin (self, nick, channel, **kwargs): if channel not in self.channels: return self.users[channel][nick] = User (nick) async def onKeepalive (self, message, **kwargs): """ Ping received """ self.send('PONG', message=message) async def onMessage (self, nick, target, message, **kwargs): """ Message received """ if target in self.channels and message.startswith (self.nick): user = self.users[target].get (nick, User (nick)) reply = ReplyContext (client=self, target=target, user=user) # channel message that starts with our nick command = message.split (' ')[1:] try: args = self.parser.parse_args (command) except Exception as e: reply ('{} -- {}'.format (e.args[1], e.args[0].format_usage ())) return if not args: reply ('Sorry, I don’t understand {}'.format (command)) return if self._quit.armed and not getattr (args, 'allowOnShutdown', False): reply ('Sorry, I’m shutting down and cannot accept your request right now.') else: with self._quit: await args.func (user=user, args=args, reply=reply) async def onDisconnect (self, **kwargs): """ Auto-reconnect """ self.logger.info ('disconnect', uuid='4c74b2c8-2403-4921-879d-2279ad85db72') if not self._quit.armed: await asyncio.sleep (10, loop=self.loop) self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7') await self.connect () def voice (func): """ Calling user must have voice or ops """ @wraps (func) async def inner (self, *args, **kwargs): user = kwargs.get ('user') reply = kwargs.get ('reply') if not user.modes.intersection ({NickMode.operator, NickMode.voice}): reply ('Sorry, you must have voice to use this command.') else: ret = await func (self, *args, **kwargs) return ret return inner def jobExists (func): """ Chromebot job exists """ @wraps (func) async def inner (self, **kwargs): # XXX: not sure why it works with **kwargs, but not (user, args, reply) args = kwargs.get ('args') reply = kwargs.get ('reply') j = self.jobs.get (args.id, None) if not j: reply ('Job {} is unknown'.format (args.id)) else: ret = await func (self, job=j, **kwargs) return ret return inner class Chromebot (ArgparseBot): __slots__ = ('jobs', 'tempdir', 'destdir', 'processLimit') def __init__ (self, host, port, ssl, nick, logger, channels=[], tempdir=tempfile.gettempdir(), destdir='.', processLimit=1, loop=None): super().__init__ (host=host, port=port, ssl=ssl, nick=nick, logger=logger, channels=channels, loop=loop) self.jobs = {} self.tempdir = tempdir self.destdir = destdir self.processLimit = asyncio.Semaphore (processLimit) def getParser (self): parser = NonExitingArgumentParser (prog=self.nick + ': ', add_help=False) subparsers = parser.add_subparsers(help='Sub-commands') archiveparser = subparsers.add_parser('a', help='Archive a site', add_help=False) #archiveparser.add_argument('--timeout', default=1*60*60, type=int, help='Maximum time for archival', metavar='SEC', choices=[60, 1*60*60, 2*60*60]) #archiveparser.add_argument('--idle-timeout', default=10, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC', choices=[1, 10, 20, 30, 60]) #archiveparser.add_argument('--max-body-size', default=None, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES', choices=[1*1024*1024, 10*1024*1024, 100*1024*1024]) archiveparser.add_argument('--concurrency', '-j', default=1, type=int, help='Parallel workers for this job', choices=range (1, 5)) archiveparser.add_argument('--recursive', '-r', help='Enable recursion', choices=['0', '1', 'prefix'], default='0') archiveparser.add_argument('url', help='Website URL', type=isValidUrl, metavar='URL') archiveparser.set_defaults (func=self.handleArchive) statusparser = subparsers.add_parser ('s', help='Get job status', add_help=False) statusparser.add_argument('id', help='Job id', metavar='UUID') statusparser.set_defaults (func=self.handleStatus, allowOnShutdown=True) abortparser = subparsers.add_parser ('r', help='Revoke/abort job', add_help=False) abortparser.add_argument('id', help='Job id', metavar='UUID') abortparser.set_defaults (func=self.handleAbort, allowOnShutdown=True) return parser @voice async def handleArchive (self, user, args, reply): """ Handle the archive command """ j = Job (args.url, user.name) assert j.id not in self.jobs, 'duplicate job id' self.jobs[j.id] = j logger = self.logger.bind (job=j.id) cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir, '--prefix', j.id + '-{host}-{date}-', '--policy', args.recursive, '--concurrency', str (args.concurrency), self.destdir] showargs = { 'recursive': args.recursive, 'concurrency': args.concurrency, } strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ())) reply ('{} has been queued as {} with {}'.format (args.url, j.id, strargs)) logger.info ('queue', user=user.name, url=args.url, cmdline=cmdline, uuid='36cc34a6-061b-4cc5-84a9-4ab6552c8d75') async with self.processLimit: if j.status == Status.pending: # job was not aborted j.process = await asyncio.create_subprocess_exec (*cmdline, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL, start_new_session=True) while True: data = await j.process.stdout.readline () if not data: break # job is marked running after the first message is received from it if j.status == Status.pending: logger.info ('start', uuid='46e62d60-f498-4ab0-90e1-d08a073b10fb') j.status = Status.running data = json.loads (data) msgid = data.get ('uuid') if msgid == '24d92d16-770e-4088-b769-4020e127a7ff': j.stats = data elif msgid == '5b8498e4-868d-413c-a67e-004516b8452c': j.rstats = data # forward message, so the dashboard can use it logger.info ('message', uuid='5c0f9a11-dcd8-4182-a60f-54f4d3ab3687', data=data) code = await j.process.wait () if j.status == Status.running: logger.info ('finish', uuid='7b40ffbb-faab-4224-90ed-cd4febd8f7ec') j.status = Status.finished j.finished = datetime.utcnow () stats = j.stats rstats = j.rstats reply (j.formatStatus ()) @jobExists async def handleStatus (self, user, args, reply, job): """ Handle status command """ rstats = job.rstats reply (job.formatStatus ()) @voice @jobExists async def handleAbort (self, user, args, reply, job): """ Handle abort command """ if job.status not in {Status.pending, Status.running}: reply ('This job is not running.') return job.status = Status.aborted self.logger.info ('abort', job=job.id, user=user.name, uuid='865b3b3e-a54a-4a56-a545-f38a37bac295') if job.process and job.process.returncode is None: job.process.terminate () class Dashboard: __slots__ = ('fd', 'clients', 'loop', 'log', 'maxLog', 'pingInterval', 'pingTimeout') # these messages will not be forwarded to the browser ignoreMsgid = { # connect '01f7b138-ea53-4609-88e9-61f3eca3e7e7', # join '367063a5-9069-4025-907c-65ba88af8593', # disconnect '4c74b2c8-2403-4921-879d-2279ad85db72', # reconnect 'c53555cb-e1a4-4b69-b1c9-3320269c19d7', } def __init__ (self, fd, loop, maxLog=1000, pingInterval=30, pingTimeout=10): self.fd = fd self.clients = set () self.loop = loop # log buffer self.log = [] self.maxLog = maxLog self.pingInterval = pingInterval self.pingTimeout = pingTimeout async def client (self, websocket, path): self.clients.add (websocket) try: for l in self.log: buf = json.dumps (l) await websocket.send (buf) while True: try: msg = await asyncio.wait_for (websocket.recv(), timeout=self.pingInterval) except asyncio.TimeoutError: try: pong = await websocket.ping() await asyncio.wait_for (pong, timeout=self.pingTimeout) except asyncio.TimeoutError: break except websockets.exceptions.ConnectionClosed: break finally: self.clients.remove (websocket) def handleStdin (self): buf = self.fd.readline () if not buf: return data = json.loads (buf) msgid = data['uuid'] if msgid in self.ignoreMsgid: return # a few messages may contain sensitive information that we want to hide if msgid == '36cc34a6-061b-4cc5-84a9-4ab6552c8d75': # queue del data['cmdline'] elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687': nesteddata = data['data'] nestedmsgid = nesteddata['uuid'] if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db': del nesteddata['command'] del nesteddata['destfile'] buf = json.dumps (data) for c in self.clients: # XXX can’t await here asyncio.ensure_future (c.send (buf)) self.log.append (data) while len (self.log) > self.maxLog: self.log.pop (0) def run (self, host='localhost', port=6789): self.loop.add_reader (self.fd, self.handleStdin) return websockets.serve(self.client, host, port)