diff options
Diffstat (limited to 'crocoite')
-rw-r--r-- | crocoite/cli.py | 8 | ||||
-rw-r--r-- | crocoite/controller.py | 18 | ||||
-rw-r--r-- | crocoite/irc.py | 109 |
3 files changed, 119 insertions, 16 deletions
diff --git a/crocoite/cli.py b/crocoite/cli.py index 913db7c..6365c78 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -142,3 +142,11 @@ def irc (): loop.add_signal_handler (signal.SIGTERM, stop, signal.SIGTERM) loop.run_until_complete(bot.run ()) +def dashboard (): + from .irc import Dashboard + + loop = asyncio.get_event_loop() + d = Dashboard (sys.stdin, loop) + loop.run_until_complete(d.run ()) + loop.run_forever() + diff --git a/crocoite/controller.py b/crocoite/controller.py index 8916726..45d9442 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -337,9 +337,9 @@ class RecursiveController: prefix=formatPrefix (self.prefix), suffix='.warc.gz', delete=False) destpath = os.path.join (self.output, os.path.basename (dest.name)) - logger = self.logger.bind (url=url, destfile=destpath) + logger = self.logger.bind (url=url) command = list (map (formatCommand, self.command)) - logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command) + logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath) process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL, start_new_session=True) @@ -370,22 +370,28 @@ class RecursiveController: self._quit = True async def run (self): - self.have = set () - self.pending = set ([self.url]) - - while self.pending and not self._quit: + def log (): self.logger.info ('recursing', uuid='5b8498e4-868d-413c-a67e-004516b8452c', pending=len (self.pending), have=len (self.have), running=len (self.running)) + self.have = set () + self.pending = set ([self.url]) + + while self.pending: # since pending is a set this picks a random item, which is fine u = self.pending.pop () self.have.add (u) t = asyncio.ensure_future (self.fetch (u)) self.running.add (t) + + log () + if len (self.running) >= self.concurrency or not self.pending: done, pending = await asyncio.wait (self.running, return_when=asyncio.FIRST_COMPLETED) self.running.difference_update (done) + log () + diff --git a/crocoite/irc.py b/crocoite/irc.py index 7d1a96d..095c55f 100644 --- a/crocoite/irc.py +++ b/crocoite/irc.py @@ -29,6 +29,7 @@ from enum import IntEnum, Enum from collections import defaultdict from abc import abstractmethod from functools import wraps +from io import BytesIO import bottom ### helper functions ### @@ -205,7 +206,7 @@ class ArgparseBot (bottom.Client): self.nick = nick # map channel -> nick -> user self.users = defaultdict (dict) - self.logger = logger + self.logger = logger.bind (context=type (self).__name__) self.parser = self.getParser () # bot does not accept new queries in shutdown mode, unless explicitly @@ -238,7 +239,7 @@ class ArgparseBot (bottom.Client): await self.disconnect () async def onConnect (self, **kwargs): - self.logger.info ('connect', nick=self.nick) + self.logger.info ('connect', nick=self.nick, uuid='01f7b138-ea53-4609-88e9-61f3eca3e7e7') self.send('NICK', nick=self.nick) self.send('USER', user=self.nick, realname='https://github.com/PromyLOPh/crocoite') @@ -254,7 +255,7 @@ class ArgparseBot (bottom.Client): future.cancel() for c in self.channels: - self.logger.info ('join', channel=c) + self.logger.info ('join', channel=c, uuid='367063a5-9069-4025-907c-65ba88af8593') self.send ('JOIN', channel=c) # no need for NAMES here, server sends this automatically @@ -334,10 +335,10 @@ class ArgparseBot (bottom.Client): async def onDisconnect (**kwargs): """ Auto-reconnect """ - self.logger.info ('disconnect') + self.logger.info ('disconnect', uuid='4c74b2c8-2403-4921-879d-2279ad85db72') if not self._quit.armed: await asynio.sleep (10, loop=self.loop) - self.logger.info ('reconnect') + self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7') await self.connect () def voice (func): @@ -413,7 +414,7 @@ class Chromebot (ArgparseBot): assert j.id not in self.jobs, 'duplicate job id' self.jobs[j.id] = j - logger = self.logger.bind (id=j.id, user=user.name, url=args.url) + logger = self.logger.bind (job=j.id) cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir, '--prefix', j.id + '-{host}-{date}-', '--policy', @@ -426,7 +427,8 @@ class Chromebot (ArgparseBot): } strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ())) reply ('{} has been queued as {} with {}'.format (args.url, j.id, strargs)) - logger.info ('queue', cmdline=cmdline) + logger.info ('queue', user=user.name, url=args.url, cmdline=cmdline, + uuid='36cc34a6-061b-4cc5-84a9-4ab6552c8d75') async with self.processLimit: if j.status == Status.pending: @@ -443,7 +445,7 @@ class Chromebot (ArgparseBot): # job is marked running after the first message is received from it if j.status == Status.pending: - logger.info ('start') + logger.info ('start', uuid='46e62d60-f498-4ab0-90e1-d08a073b10fb') j.status = Status.running data = json.loads (data) @@ -452,10 +454,15 @@ class Chromebot (ArgparseBot): j.stats = data elif msgid == '5b8498e4-868d-413c-a67e-004516b8452c': j.rstats = data + + # forward message, so the dashboard can use it + logger.info ('message', + uuid='5c0f9a11-dcd8-4182-a60f-54f4d3ab3687', + data=data) code = await j.process.wait () if j.status == Status.running: - logger.info ('finish') + logger.info ('finish', uuid='7b40ffbb-faab-4224-90ed-cd4febd8f7ec') j.status = Status.finished j.finished = datetime.utcnow () @@ -480,7 +487,89 @@ class Chromebot (ArgparseBot): return job.status = Status.aborted - self.logger.info ('abort', id=job.id, user=user.name) + self.logger.info ('abort', job=job.id, user=user.name, + uuid='865b3b3e-a54a-4a56-a545-f38a37bac295') if job.process and job.process.returncode is None: job.process.terminate () +import websockets + +class Dashboard: + __slots__ = ('fd', 'clients', 'loop', 'log', 'maxLog', 'pingInterval', 'pingTimeout') + # these messages will not be forwarded to the browser + ignoreMsgid = { + # connect + '01f7b138-ea53-4609-88e9-61f3eca3e7e7', + # join + '367063a5-9069-4025-907c-65ba88af8593', + # disconnect + '4c74b2c8-2403-4921-879d-2279ad85db72', + # reconnect + 'c53555cb-e1a4-4b69-b1c9-3320269c19d7', + } + + def __init__ (self, fd, loop, maxLog=1000, pingInterval=30, pingTimeout=10): + self.fd = fd + self.clients = set () + self.loop = loop + # log buffer + self.log = [] + self.maxLog = maxLog + self.pingInterval = pingInterval + self.pingTimeout = pingTimeout + + async def client (self, websocket, path): + self.clients.add (websocket) + try: + for l in self.log: + buf = json.dumps (l) + await websocket.send (buf) + while True: + try: + msg = await asyncio.wait_for (websocket.recv(), timeout=self.pingInterval) + except asyncio.TimeoutError: + try: + pong = await websocket.ping() + await asyncio.wait_for (pong, timeout=self.pingTimeout) + except asyncio.TimeoutError: + break + except websockets.exceptions.ConnectionClosed: + break + finally: + self.clients.remove (websocket) + + def handleStdin (self): + buf = self.fd.readline () + if not buf: + return + + data = json.loads (buf) + msgid = data['uuid'] + + if msgid in self.ignoreMsgid: + return + + # a few messages may contain sensitive information that we want to hide + if msgid == '36cc34a6-061b-4cc5-84a9-4ab6552c8d75': + # queue + del data['cmdline'] + elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687': + nesteddata = data['data'] + nestedmsgid = nesteddata['uuid'] + if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db': + del nesteddata['command'] + del nesteddata['destfile'] + + buf = json.dumps (data) + for c in self.clients: + # XXX can’t await here + asyncio.ensure_future (c.send (buf)) + + self.log.append (data) + while len (self.log) > self.maxLog: + self.log.pop (0) + + def run (self, host='localhost', port=6789): + self.loop.add_reader (self.fd, self.handleStdin) + return websockets.serve(self.client, host, port) + |