summaryrefslogtreecommitdiff
path: root/crocoite/irc.py
diff options
context:
space:
mode:
authorLars-Dominik Braun <lars@6xq.net>2018-10-14 12:41:37 +0200
committerLars-Dominik Braun <lars@6xq.net>2018-10-14 12:43:39 +0200
commit958563a3602780b48599c27acf212139c2e6904d (patch)
treebc3b3e1209691c320122b4586972e275563cc867 /crocoite/irc.py
parent07994fb6b72b0c84d2ee2c69e5afdb204d33d5e6 (diff)
downloadcrocoite-958563a3602780b48599c27acf212139c2e6904d.tar.gz
crocoite-958563a3602780b48599c27acf212139c2e6904d.tar.bz2
crocoite-958563a3602780b48599c27acf212139c2e6904d.zip
irc: Add PoC dashboard
Using websockets, vue and bulma.
Diffstat (limited to 'crocoite/irc.py')
-rw-r--r--crocoite/irc.py109
1 files changed, 99 insertions, 10 deletions
diff --git a/crocoite/irc.py b/crocoite/irc.py
index 7d1a96d..095c55f 100644
--- a/crocoite/irc.py
+++ b/crocoite/irc.py
@@ -29,6 +29,7 @@ from enum import IntEnum, Enum
from collections import defaultdict
from abc import abstractmethod
from functools import wraps
+from io import BytesIO
import bottom
### helper functions ###
@@ -205,7 +206,7 @@ class ArgparseBot (bottom.Client):
self.nick = nick
# map channel -> nick -> user
self.users = defaultdict (dict)
- self.logger = logger
+ self.logger = logger.bind (context=type (self).__name__)
self.parser = self.getParser ()
# bot does not accept new queries in shutdown mode, unless explicitly
@@ -238,7 +239,7 @@ class ArgparseBot (bottom.Client):
await self.disconnect ()
async def onConnect (self, **kwargs):
- self.logger.info ('connect', nick=self.nick)
+ self.logger.info ('connect', nick=self.nick, uuid='01f7b138-ea53-4609-88e9-61f3eca3e7e7')
self.send('NICK', nick=self.nick)
self.send('USER', user=self.nick, realname='https://github.com/PromyLOPh/crocoite')
@@ -254,7 +255,7 @@ class ArgparseBot (bottom.Client):
future.cancel()
for c in self.channels:
- self.logger.info ('join', channel=c)
+ self.logger.info ('join', channel=c, uuid='367063a5-9069-4025-907c-65ba88af8593')
self.send ('JOIN', channel=c)
# no need for NAMES here, server sends this automatically
@@ -334,10 +335,10 @@ class ArgparseBot (bottom.Client):
async def onDisconnect (**kwargs):
""" Auto-reconnect """
- self.logger.info ('disconnect')
+ self.logger.info ('disconnect', uuid='4c74b2c8-2403-4921-879d-2279ad85db72')
if not self._quit.armed:
await asynio.sleep (10, loop=self.loop)
- self.logger.info ('reconnect')
+ self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7')
await self.connect ()
def voice (func):
@@ -413,7 +414,7 @@ class Chromebot (ArgparseBot):
assert j.id not in self.jobs, 'duplicate job id'
self.jobs[j.id] = j
- logger = self.logger.bind (id=j.id, user=user.name, url=args.url)
+ logger = self.logger.bind (job=j.id)
cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir,
'--prefix', j.id + '-{host}-{date}-', '--policy',
@@ -426,7 +427,8 @@ class Chromebot (ArgparseBot):
}
strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
reply ('{} has been queued as {} with {}'.format (args.url, j.id, strargs))
- logger.info ('queue', cmdline=cmdline)
+ logger.info ('queue', user=user.name, url=args.url, cmdline=cmdline,
+ uuid='36cc34a6-061b-4cc5-84a9-4ab6552c8d75')
async with self.processLimit:
if j.status == Status.pending:
@@ -443,7 +445,7 @@ class Chromebot (ArgparseBot):
# job is marked running after the first message is received from it
if j.status == Status.pending:
- logger.info ('start')
+ logger.info ('start', uuid='46e62d60-f498-4ab0-90e1-d08a073b10fb')
j.status = Status.running
data = json.loads (data)
@@ -452,10 +454,15 @@ class Chromebot (ArgparseBot):
j.stats = data
elif msgid == '5b8498e4-868d-413c-a67e-004516b8452c':
j.rstats = data
+
+ # forward message, so the dashboard can use it
+ logger.info ('message',
+ uuid='5c0f9a11-dcd8-4182-a60f-54f4d3ab3687',
+ data=data)
code = await j.process.wait ()
if j.status == Status.running:
- logger.info ('finish')
+ logger.info ('finish', uuid='7b40ffbb-faab-4224-90ed-cd4febd8f7ec')
j.status = Status.finished
j.finished = datetime.utcnow ()
@@ -480,7 +487,89 @@ class Chromebot (ArgparseBot):
return
job.status = Status.aborted
- self.logger.info ('abort', id=job.id, user=user.name)
+ self.logger.info ('abort', job=job.id, user=user.name,
+ uuid='865b3b3e-a54a-4a56-a545-f38a37bac295')
if job.process and job.process.returncode is None:
job.process.terminate ()
+import websockets
+
+class Dashboard:
+ __slots__ = ('fd', 'clients', 'loop', 'log', 'maxLog', 'pingInterval', 'pingTimeout')
+ # these messages will not be forwarded to the browser
+ ignoreMsgid = {
+ # connect
+ '01f7b138-ea53-4609-88e9-61f3eca3e7e7',
+ # join
+ '367063a5-9069-4025-907c-65ba88af8593',
+ # disconnect
+ '4c74b2c8-2403-4921-879d-2279ad85db72',
+ # reconnect
+ 'c53555cb-e1a4-4b69-b1c9-3320269c19d7',
+ }
+
+ def __init__ (self, fd, loop, maxLog=1000, pingInterval=30, pingTimeout=10):
+ self.fd = fd
+ self.clients = set ()
+ self.loop = loop
+ # log buffer
+ self.log = []
+ self.maxLog = maxLog
+ self.pingInterval = pingInterval
+ self.pingTimeout = pingTimeout
+
+ async def client (self, websocket, path):
+ self.clients.add (websocket)
+ try:
+ for l in self.log:
+ buf = json.dumps (l)
+ await websocket.send (buf)
+ while True:
+ try:
+ msg = await asyncio.wait_for (websocket.recv(), timeout=self.pingInterval)
+ except asyncio.TimeoutError:
+ try:
+ pong = await websocket.ping()
+ await asyncio.wait_for (pong, timeout=self.pingTimeout)
+ except asyncio.TimeoutError:
+ break
+ except websockets.exceptions.ConnectionClosed:
+ break
+ finally:
+ self.clients.remove (websocket)
+
+ def handleStdin (self):
+ buf = self.fd.readline ()
+ if not buf:
+ return
+
+ data = json.loads (buf)
+ msgid = data['uuid']
+
+ if msgid in self.ignoreMsgid:
+ return
+
+ # a few messages may contain sensitive information that we want to hide
+ if msgid == '36cc34a6-061b-4cc5-84a9-4ab6552c8d75':
+ # queue
+ del data['cmdline']
+ elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687':
+ nesteddata = data['data']
+ nestedmsgid = nesteddata['uuid']
+ if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db':
+ del nesteddata['command']
+ del nesteddata['destfile']
+
+ buf = json.dumps (data)
+ for c in self.clients:
+ # XXX can’t await here
+ asyncio.ensure_future (c.send (buf))
+
+ self.log.append (data)
+ while len (self.log) > self.maxLog:
+ self.log.pop (0)
+
+ def run (self, host='localhost', port=6789):
+ self.loop.add_reader (self.fd, self.handleStdin)
+ return websockets.serve(self.client, host, port)
+