path: root/crocoite/irc.py
Diffstat (limited to 'crocoite/irc.py')
-rw-r--r--  crocoite/irc.py | 254
1 file changed, 176 insertions(+), 78 deletions(-)
diff --git a/crocoite/irc.py b/crocoite/irc.py
index 99485e4..d9c0634 100644
--- a/crocoite/irc.py
+++ b/crocoite/irc.py
@@ -22,16 +22,19 @@
IRC bot “chromebot”
"""
-import asyncio, argparse, uuid, json, tempfile
+import asyncio, argparse, json, tempfile, time, random, os, shlex
from datetime import datetime
from urllib.parse import urlsplit
-from enum import IntEnum, Enum
+from enum import IntEnum, unique
from collections import defaultdict
from abc import abstractmethod
from functools import wraps
import bottom
import websockets
+from .util import StrJsonEncoder
+from .cli import cookie
+
### helper functions ###
def prettyTimeDelta (seconds):
"""
@@ -53,7 +56,7 @@ def prettyBytes (b):
while b >= 1024 and len (prefixes) > 1:
b /= 1024
prefixes.pop (0)
- return '{:.1f} {}'.format (b, prefixes[0])
+ return f'{b:.1f} {prefixes[0]}'
def isValidUrl (s):
url = urlsplit (s)
@@ -84,13 +87,45 @@ class Status(IntEnum):
aborted = 3
finished = 4
+# see https://arxiv.org/html/0901.4016 on how to build proquints (human
+# pronounceable unique ids)
+toConsonant = 'bdfghjklmnprstvz'
+toVowel = 'aiou'
+
+def u16ToQuint (v):
+ """ Transform a 16 bit unsigned integer into a single quint """
+ assert 0 <= v < 2**16
+ # quints are “big-endian”
+ return ''.join ([
+ toConsonant[(v>>(4+2+4+2))&0xf],
+ toVowel[(v>>(4+2+4))&0x3],
+ toConsonant[(v>>(4+2))&0xf],
+ toVowel[(v>>4)&0x3],
+ toConsonant[(v>>0)&0xf],
+ ])
+
+def uintToQuint (v, length=2):
+ """ Turn any integer into a proquint with fixed length """
+ assert 0 <= v < 2**(length*16)
+
+ return '-'.join (reversed ([u16ToQuint ((v>>(x*16))&0xffff) for x in range (length)]))
+
+def makeJobId ():
+ """ Create job id from time and randomness source """
+ # allocate 48 bits for the time (in milliseconds) and add 16 random bits
+ # at the end (just to be sure) for a total of 64 bits. Should be enough to
+ # avoid collisions.
+ randbits = 16
+ stamp = (int (time.time ()*1000) << randbits) | random.randint (0, 2**randbits-1)
+ return uintToQuint (stamp, 4)
+
class Job:
""" Archival job """
__slots__ = ('id', 'stats', 'rstats', 'started', 'finished', 'nick', 'status', 'process', 'url')
def __init__ (self, url, nick):
- self.id = str (uuid.uuid4 ())
+ self.id = makeJobId ()
self.stats = {}
self.rstats = {}
self.started = datetime.utcnow ()
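
The proquint helpers above turn each 16-bit word into a consonant/vowel pattern, so job ids become pronounceable instead of UUIDs. A minimal sketch of how they behave, assuming the module is importable as crocoite.irc; 0x7f000001 is the reference value from the proquint paper (the IPv4 address 127.0.0.1):

    from crocoite.irc import uintToQuint, makeJobId

    # 127.0.0.1 encodes to the well-known example pair from the paper
    assert uintToQuint (0x7f000001, 2) == 'lusab-babad'

    # job ids are four dash-separated quints: a 48-bit millisecond timestamp
    # in the high bits plus 16 random bits at the end
    print (makeJobId ())

Because both letter tables are in ascending alphabetical order and the timestamp sits in the high bits, the ids also sort roughly by creation time.
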
@@ -104,32 +139,40 @@ class Job:
def formatStatus (self):
stats = self.stats
rstats = self.rstats
- return '{} ({}) {}. {} pages finished, {} pending; {} crashed, {} requests, {} failed, {} received.'.format (
- self.url,
- self.id,
- self.status.name,
- rstats.get ('have', 0),
- rstats.get ('pending', 0),
- stats.get ('crashed', 0),
- stats.get ('requests', 0),
- stats.get ('failed', 0),
- prettyBytes (stats.get ('bytesRcv', 0)))
-
-class NickMode(Enum):
- operator = '@'
- voice = '+'
+ return (f"{self.url} ({self.id}) {self.status.name}. "
+ f"{rstats.get ('have', 0)} pages finished, "
+ f"{rstats.get ('pending', 0)} pending; "
+ f"{stats.get ('crashed', 0)} crashed, "
+ f"{stats.get ('requests', 0)} requests, "
+ f"{stats.get ('failed', 0)} failed, "
+ f"{prettyBytes (stats.get ('bytesRcv', 0))} received.")
+
+@unique
+class NickMode(IntEnum):
+ # the actual numbers don’t matter, but their order must be strictly
+ # increasing (with privilege level)
+ operator = 100
+ voice = 10
@classmethod
def fromMode (cls, mode):
return {'v': cls.voice, 'o': cls.operator}[mode]
+ @classmethod
+ def fromNickPrefix (cls, mode):
+ return {'@': cls.operator, '+': cls.voice}[mode]
+
+ @property
+ def human (self):
+ return {self.operator: 'operator', self.voice: 'voice'}[self]
+
class User:
""" IRC user """
__slots__ = ('name', 'modes')
- def __init__ (self, name, modes=set ()):
+ def __init__ (self, name, modes=None):
self.name = name
- self.modes = modes
+ self.modes = modes or set ()
def __eq__ (self, b):
return self.name == b.name
@@ -138,15 +181,21 @@ class User:
return hash (self.name)
def __repr__ (self):
- return '<User {} {}>'.format (self.name, self.modes)
+ return f'<User {self.name} {self.modes}>'
+
+ def hasPriv (self, p):
+ if p is None:
+ return True
+ else:
+ return self.modes and max (self.modes) >= p
@classmethod
def fromName (cls, name):
""" Get mode and name from NAMES command """
try:
- modes = {NickMode(name[0])}
+ modes = {NickMode.fromNickPrefix (name[0])}
name = name[1:]
- except ValueError:
+ except KeyError:
modes = set ()
return cls (name, modes)
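
Switching NickMode to an IntEnum is what makes hasPriv work: privileges compare numerically, so an operator satisfies any command that only requires voice. A small sketch, assuming crocoite.irc is importable:

    from crocoite.irc import User, NickMode

    op = User ('alice', {NickMode.operator})
    plain = User ('bob')

    assert op.hasPriv (NickMode.voice)         # operator (100) >= voice (10)
    assert not plain.hasPriv (NickMode.voice)  # no channel modes at all
    assert plain.hasPriv (None)                # None means “no privilege required”

    # NAMES prefixes map back onto the same enum
    assert User.fromName ('@alice').modes == {NickMode.operator}
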
@@ -159,7 +208,8 @@ class ReplyContext:
self.user = user
def __call__ (self, message):
- self.client.send ('PRIVMSG', target=self.target, message='{}: {}'.format (self.user.name, message))
+ self.client.send ('PRIVMSG', target=self.target,
+ message=f'{self.user.name}: {message}')
class RefCountEvent:
"""
@@ -200,9 +250,9 @@ class ArgparseBot (bottom.Client):
__slots__ = ('channels', 'nick', 'parser', 'users', '_quit')
- def __init__ (self, host, port, ssl, nick, logger, channels=[], loop=None):
+ def __init__ (self, host, port, ssl, nick, logger, channels=None, loop=None):
super().__init__ (host=host, port=port, ssl=ssl, loop=loop)
- self.channels = channels
+ self.channels = channels or []
self.nick = nick
# map channel -> nick -> user
self.users = defaultdict (dict)
@@ -259,8 +309,13 @@ class ArgparseBot (bottom.Client):
self.send ('JOIN', channel=c)
# no need for NAMES here, server sends this automatically
- async def onNameReply (self, target, channel_type, channel, users, **kwargs):
- self.users[channel] = dict (map (lambda x: (x.name, x), map (User.fromName, users)))
+ async def onNameReply (self, channel, users, **kwargs):
+ # channels may be too big for a single message
+ addusers = dict (map (lambda x: (x.name, x), map (User.fromName, users)))
+ if channel not in self.users:
+ self.users[channel] = addusers
+ else:
+ self.users[channel].update (addusers)
@staticmethod
def parseMode (mode):
@@ -274,7 +329,7 @@ class ArgparseBot (bottom.Client):
ret.append ((action, c))
return ret
- async def onMode (self, nick, user, host, channel, modes, params, **kwargs):
+ async def onMode (self, channel, modes, params, **kwargs):
if channel not in self.channels:
return
@@ -290,7 +345,7 @@ class ArgparseBot (bottom.Client):
# unknown mode, ignore
pass
- async def onPart (self, nick, user, host, message, channel, **kwargs):
+ async def onPart (self, nick, channel, **kwargs):
if channel not in self.channels:
return
@@ -312,23 +367,27 @@ class ArgparseBot (bottom.Client):
async def onMessage (self, nick, target, message, **kwargs):
""" Message received """
- if target in self.channels and message.startswith (self.nick):
+ msgPrefix = self.nick + ':'
+ if target in self.channels and message.startswith (msgPrefix):
user = self.users[target].get (nick, User (nick))
reply = ReplyContext (client=self, target=target, user=user)
- # channel message that starts with our nick
- command = message.split (' ')[1:]
+ # shlex.split supports quoting arguments, which str.split() does not
+ command = shlex.split (message[len (msgPrefix):])
try:
args = self.parser.parse_args (command)
except Exception as e:
- reply ('{} -- {}'.format (e.args[1], e.args[0].format_usage ()))
+ reply (f'{e.args[1]} -- {e.args[0].format_usage ()}')
return
- if not args:
- reply ('Sorry, I don’t understand {}'.format (command))
+ if not args or not hasattr (args, 'func'):
+ reply (f'Sorry, I don’t understand {command}')
return
+ minPriv = getattr (args, 'minPriv', None)
if self._quit.armed and not getattr (args, 'allowOnShutdown', False):
reply ('Sorry, I’m shutting down and cannot accept your request right now.')
+ elif not user.hasPriv (minPriv):
+ reply (f'Sorry, you need the privilege {minPriv.human} to use this command.')
else:
with self._quit:
await args.func (user=user, args=args, reply=reply)
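
With the explicit prefix check and shlex.split, a command is only parsed when the message is addressed to the bot, and quoted arguments survive as single tokens, which matters for the new --cookie option. The following only illustrates the tokenisation (the cookie value is made up, and whether the cookie parser accepts it is a separate question):

    import shlex

    message = 'chromebot: a --cookie "session=abc123; Path=/" https://example.com/'
    msgPrefix = 'chromebot:'
    command = shlex.split (message[len (msgPrefix):])
    print (command)
    # ['a', '--cookie', 'session=abc123; Path=/', 'https://example.com/']
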
@@ -336,23 +395,14 @@ class ArgparseBot (bottom.Client):
async def onDisconnect (self, **kwargs):
""" Auto-reconnect """
self.logger.info ('disconnect', uuid='4c74b2c8-2403-4921-879d-2279ad85db72')
- if not self._quit.armed:
- await asyncio.sleep (10, loop=self.loop)
- self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7')
- await self.connect ()
-
-def voice (func):
- """ Calling user must have voice or ops """
- @wraps (func)
- async def inner (self, *args, **kwargs):
- user = kwargs.get ('user')
- reply = kwargs.get ('reply')
- if not user.modes.intersection ({NickMode.operator, NickMode.voice}):
- reply ('Sorry, you must have voice to use this command.')
- else:
- ret = await func (self, *args, **kwargs)
- return ret
- return inner
+ while True:
+ if not self._quit.armed:
+ await asyncio.sleep (10, loop=self.loop)
+ self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7')
+ try:
+ await self.connect ()
+ finally:
+ break
def jobExists (func):
""" Chromebot job exists """
@@ -363,38 +413,45 @@ def jobExists (func):
reply = kwargs.get ('reply')
j = self.jobs.get (args.id, None)
if not j:
- reply ('Job {} is unknown'.format (args.id))
+ reply (f'Job {args.id} is unknown')
else:
ret = await func (self, job=j, **kwargs)
return ret
return inner
class Chromebot (ArgparseBot):
- __slots__ = ('jobs', 'tempdir', 'destdir', 'processLimit')
+ __slots__ = ('jobs', 'tempdir', 'destdir', 'processLimit', 'blacklist', 'needVoice')
+
+ def __init__ (self, host, port, ssl, nick, logger, channels=None,
+ tempdir=None, destdir='.', processLimit=1,
+ blacklist={}, needVoice=False, loop=None):
+ self.needVoice = needVoice
- def __init__ (self, host, port, ssl, nick, logger, channels=[],
- tempdir=tempfile.gettempdir(), destdir='.', processLimit=1,
- loop=None):
super().__init__ (host=host, port=port, ssl=ssl, nick=nick,
logger=logger, channels=channels, loop=loop)
self.jobs = {}
- self.tempdir = tempdir
+ self.tempdir = tempdir or tempfile.gettempdir()
self.destdir = destdir
self.processLimit = asyncio.Semaphore (processLimit)
+ self.blacklist = blacklist
def getParser (self):
parser = NonExitingArgumentParser (prog=self.nick + ': ', add_help=False)
subparsers = parser.add_subparsers(help='Sub-commands')
archiveparser = subparsers.add_parser('a', help='Archive a site', add_help=False)
- #archiveparser.add_argument('--timeout', default=1*60*60, type=int, help='Maximum time for archival', metavar='SEC', choices=[60, 1*60*60, 2*60*60])
- #archiveparser.add_argument('--idle-timeout', default=10, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC', choices=[1, 10, 20, 30, 60])
- #archiveparser.add_argument('--max-body-size', default=None, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES', choices=[1*1024*1024, 10*1024*1024, 100*1024*1024])
archiveparser.add_argument('--concurrency', '-j', default=1, type=int, help='Parallel workers for this job', choices=range (1, 5))
archiveparser.add_argument('--recursive', '-r', help='Enable recursion', choices=['0', '1', 'prefix'], default='0')
+ archiveparser.add_argument('--insecure', '-k',
+ help='Disable certificate checking', action='store_true')
+ # parse the cookie here, so we can give early feedback without
+ # waiting for the job to crash on invalid arguments.
+ archiveparser.add_argument('--cookie', '-b', type=cookie,
+ help='Add a cookie', action='append', default=[])
archiveparser.add_argument('url', help='Website URL', type=isValidUrl, metavar='URL')
- archiveparser.set_defaults (func=self.handleArchive)
+ archiveparser.set_defaults (func=self.handleArchive,
+ minPriv=NickMode.voice if self.needVoice else None)
statusparser = subparsers.add_parser ('s', help='Get job status', add_help=False)
statusparser.add_argument('id', help='Job id', metavar='UUID')
@@ -402,31 +459,70 @@ class Chromebot (ArgparseBot):
abortparser = subparsers.add_parser ('r', help='Revoke/abort job', add_help=False)
abortparser.add_argument('id', help='Job id', metavar='UUID')
- abortparser.set_defaults (func=self.handleAbort, allowOnShutdown=True)
+ abortparser.set_defaults (func=self.handleAbort, allowOnShutdown=True,
+ minPriv=NickMode.voice if self.needVoice else None)
return parser
- @voice
+ def isBlacklisted (self, url):
+ for k, v in self.blacklist.items():
+ if k.match (url):
+ return v
+ return False
+
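
isBlacklisted only requires the blacklist keys to offer a .match method and maps them to a human-readable reason, so precompiled regexes are the natural choice. A sketch of the lookup with a made-up pattern and reason:

    import re

    blacklist = {
        re.compile (r'^https?://accounts\.example\.com/'): 'login pages are not archived',
    }

    url = 'https://accounts.example.com/signin'
    # same logic as isBlacklisted: first matching pattern wins, otherwise False
    reason = next ((v for k, v in blacklist.items () if k.match (url)), False)
    print (reason)   # 'login pages are not archived'
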
async def handleArchive (self, user, args, reply):
""" Handle the archive command """
- j = Job (args.url, user.name)
- assert j.id not in self.jobs, 'duplicate job id'
+ msg = self.isBlacklisted (args.url)
+ if msg:
+ reply (f'{args.url} cannot be queued: {msg}')
+ return
+
+ # make sure the job id is unique. Since ids are time-based we can just
+ # wait.
+ while True:
+ j = Job (args.url, user.name)
+ if j.id not in self.jobs:
+ break
+ time.sleep (0.01)
self.jobs[j.id] = j
logger = self.logger.bind (job=j.id)
- cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir,
- '--prefix', j.id + '-{host}-{date}-', '--policy',
- args.recursive, '--concurrency', str (args.concurrency),
- self.destdir]
-
showargs = {
'recursive': args.recursive,
'concurrency': args.concurrency,
}
+ if args.insecure:
+ showargs['insecure'] = args.insecure
+ warcinfo = {'chromebot': {
+ 'jobid': j.id,
+ 'user': user.name,
+ 'queued': j.started,
+ 'url': args.url,
+ 'recursive': args.recursive,
+ 'concurrency': args.concurrency,
+ }}
+ grabCmd = ['crocoite-single']
+ # prefix warcinfo with !, so it won’t get expanded
+ grabCmd.extend (['--warcinfo',
+ '!' + json.dumps (warcinfo, cls=StrJsonEncoder)])
+ for v in args.cookie:
+ grabCmd.extend (['--cookie', v.OutputString ()])
+ if args.insecure:
+ grabCmd.append ('--insecure')
+ grabCmd.extend (['{url}', '{dest}'])
+ cmdline = ['crocoite',
+ '--tempdir', self.tempdir,
+ '--recursion', args.recursive,
+ '--concurrency', str (args.concurrency),
+ args.url,
+ os.path.join (self.destdir,
+ j.id + '-{host}-{date}-{seqnum}.warc.gz'),
+ '--'] + grabCmd
+
strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
- reply ('{} has been queued as {} with {}'.format (args.url, j.id, strargs))
+ reply (f'{args.url} has been queued as {j.id} with {strargs}')
logger.info ('queue', user=user.name, url=args.url, cmdline=cmdline,
uuid='36cc34a6-061b-4cc5-84a9-4ab6552c8d75')
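
The queued job now runs the recursive crocoite driver, which executes the single-page grab command given after '--' once per page, expanding the {url} and {dest} placeholders (the '!' prefix protects the warcinfo JSON from that expansion). A rough sketch of the resulting command line; the job id, tempdir and the trimmed warcinfo payload are invented for illustration:

    import json, os

    jobid, tempdir, destdir = 'damuh-babad-lusab-bados', '/tmp', '.'
    url, recursive, concurrency = 'https://example.com/', 'prefix', 2

    warcinfo = {'chromebot': {'jobid': jobid, 'url': url}}
    grabCmd = ['crocoite-single',
               '--warcinfo', '!' + json.dumps (warcinfo),
               '{url}', '{dest}']
    cmdline = ['crocoite',
               '--tempdir', tempdir,
               '--recursion', recursive,
               '--concurrency', str (concurrency),
               url,
               os.path.join (destdir, jobid + '-{host}-{date}-{seqnum}.warc.gz'),
               '--'] + grabCmd
    print (cmdline)
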
@@ -437,7 +533,7 @@ class Chromebot (ArgparseBot):
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
stdin=asyncio.subprocess.DEVNULL,
- start_new_session=True)
+ start_new_session=True, limit=100*1024*1024)
while True:
data = await j.process.stdout.readline ()
if not data:
@@ -477,7 +573,6 @@ class Chromebot (ArgparseBot):
rstats = job.rstats
reply (job.formatStatus ())
- @voice
@jobExists
async def handleAbort (self, user, args, reply, job):
""" Handle abort command """
@@ -541,7 +636,11 @@ class Dashboard:
if not buf:
return
- data = json.loads (buf)
+ try:
+ data = json.loads (buf)
+ except json.decoder.JSONDecodeError:
+ # ignore invalid JSON
+ return
msgid = data['uuid']
if msgid in self.ignoreMsgid:
@@ -554,9 +653,8 @@ class Dashboard:
elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687':
nesteddata = data['data']
nestedmsgid = nesteddata['uuid']
- if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db':
+ if nestedmsgid == 'd1288fbe-8bae-42c8-af8c-f2fa8b41794f':
del nesteddata['command']
- del nesteddata['destfile']
buf = json.dumps (data)
for c in self.clients: