From 958563a3602780b48599c27acf212139c2e6904d Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Sun, 14 Oct 2018 12:41:37 +0200 Subject: irc: Add PoC dashboard Using websockets, vue and bulma. --- contrib/dashboard.css | 7 +++ contrib/dashboard.html | 22 +++++++++ contrib/dashboard.js | 127 +++++++++++++++++++++++++++++++++++++++++++++++++ crocoite/cli.py | 8 ++++ crocoite/controller.py | 18 ++++--- crocoite/irc.py | 109 ++++++++++++++++++++++++++++++++++++++---- setup.py | 2 + 7 files changed, 277 insertions(+), 16 deletions(-) create mode 100644 contrib/dashboard.css create mode 100644 contrib/dashboard.html create mode 100644 contrib/dashboard.js diff --git a/contrib/dashboard.css b/contrib/dashboard.css new file mode 100644 index 0000000..469db78 --- /dev/null +++ b/contrib/dashboard.css @@ -0,0 +1,7 @@ +.jid { + font-family: Consolas, mono; +} +.job .urls { + max-height: 10em; + overflow-y: scroll; +} diff --git a/contrib/dashboard.html b/contrib/dashboard.html new file mode 100644 index 0000000..cc09d50 --- /dev/null +++ b/contrib/dashboard.html @@ -0,0 +1,22 @@ + + + + + + chromebot dashboard + + + + + + + +
+

chromebot dashboard

+
+ +
+
+ + + diff --git a/contrib/dashboard.js b/contrib/dashboard.js new file mode 100644 index 0000000..eb34d43 --- /dev/null +++ b/contrib/dashboard.js @@ -0,0 +1,127 @@ +/* configuration */ +let socket = "ws://localhost:6789/", + urllogMax = 100; + +function formatSize (bytes) { + let prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB']; + while (bytes >= 1024 && prefixes.length > 1) { + bytes /= 1024; + prefixes.shift (); + } + return bytes.toFixed (1) + ' ' + prefixes[0]; +} + +class Job { + constructor (id, url, user, queued) { + this.id = id; + this.url = url; + this.user = user; + this.status = undefined; + this.stats = {'pending': 0, 'have': 0, 'running': 0, + 'requests': 0, 'finished': 0, 'failed': 0, + 'bytesRcv': 0, 'crashed': 0, 'ignored': 0}; + this.urllog = []; + this.queued = queued; + this.started = undefined; + this.finished = undefined; + this.aborted = undefined; + } + + addUrl (url) { + if (this.urllog.push (url) > urllogMax) { + this.urllog.shift (); + } + } +} + +let jobs = {}; +/* list of ignored job ids, i.e. those the user deleted from the dashboard */ +let ignored = []; +let ws = new WebSocket(socket); +ws.onmessage = function (event) { + var msg = JSON.parse (event.data); + let msgdate = new Date (Date.parse (msg.date)); + var j = undefined; + console.log (msg); + if (msg.job) { + if (ignored.includes (msg.job)) { + console.log ("job ignored", msg.job); + return; + } + j = jobs[msg.job]; + if (j === undefined) { + j = new Job (msg.job, 'unknown', '', new Date ()); + Vue.set (jobs, msg.job, j); + } + } + if (msg.uuid == '36cc34a6-061b-4cc5-84a9-4ab6552c8d75') { + j = new Job (msg.job, msg.url, msg.user, msgdate); + /* jobs[msg.job] = j does not work with vue, see + https://vuejs.org/v2/guide/list.html#Object-Change-Detection-Caveats + */ + Vue.set (jobs, msg.job, j); + j.status = 'pending'; + } else if (msg.uuid == '46e62d60-f498-4ab0-90e1-d08a073b10fb') { + j.status = 'running'; + j.started = msgdate; + } else if (msg.uuid == '7b40ffbb-faab-4224-90ed-cd4febd8f7ec') { + j.status = 'finished'; + j.finished = msgdate; + } else if (msg.uuid == '865b3b3e-a54a-4a56-a545-f38a37bac295') { + j.status = 'aborted'; + j.aborted = msgdate; + } else if (msg.uuid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687') { + /* forwarded job message */ + let rmsg = msg.data; + if (rmsg.uuid == '24d92d16-770e-4088-b769-4020e127a7ff') { + /* job status */ + Object.assign (j.stats, rmsg); + } else if (rmsg.uuid == '5b8498e4-868d-413c-a67e-004516b8452c') { + /* recursion status */ + Object.assign (j.stats, rmsg); + } else if (rmsg.uuid == '1680f384-744c-4b8a-815b-7346e632e8db') { + /* fetch */ + j.addUrl (rmsg.url); + } + } +}; +ws.onopen = function (event) { +}; +ws.onerror = function (event) { +}; + +Vue.component('job-item', { + props: ['job', 'jobs', 'ignored'], + template: '', + methods: { + del: function (id) { + Vue.delete(this.jobs, id); + this.ignored.push (id); + } + } +}); +Vue.component('job-status', { + props: ['job'], + template: 'queued on {{ job.queued.toLocaleString() }}aborted on {{ job.aborted.toLocaleString() }}running since {{ job.started.toLocaleString() }}finished since {{ job.finished.toLocaleString() }}' +}); +Vue.component('job-stats', { + props: ['job'], + template: '
  • {{ job.stats.have }} have
  • {{ job.stats.running }} running
  • {{ job.stats.pending }} pending
  • {{ job.stats.requests }} requests
' +}); +Vue.component('job-urls', { + props: ['job'], + template: '' +}); +Vue.component('filesize', { + props: ['value'], + template: '{{ fvalue }}', + computed: { fvalue: function () { return formatSize (this.value); } } +}); + +let app = new Vue({ + el: '#app', + data: { + jobs: jobs, + } +}); + diff --git a/crocoite/cli.py b/crocoite/cli.py index 913db7c..6365c78 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -142,3 +142,11 @@ def irc (): loop.add_signal_handler (signal.SIGTERM, stop, signal.SIGTERM) loop.run_until_complete(bot.run ()) +def dashboard (): + from .irc import Dashboard + + loop = asyncio.get_event_loop() + d = Dashboard (sys.stdin, loop) + loop.run_until_complete(d.run ()) + loop.run_forever() + diff --git a/crocoite/controller.py b/crocoite/controller.py index 8916726..45d9442 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -337,9 +337,9 @@ class RecursiveController: prefix=formatPrefix (self.prefix), suffix='.warc.gz', delete=False) destpath = os.path.join (self.output, os.path.basename (dest.name)) - logger = self.logger.bind (url=url, destfile=destpath) + logger = self.logger.bind (url=url) command = list (map (formatCommand, self.command)) - logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command) + logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath) process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL, start_new_session=True) @@ -370,22 +370,28 @@ class RecursiveController: self._quit = True async def run (self): - self.have = set () - self.pending = set ([self.url]) - - while self.pending and not self._quit: + def log (): self.logger.info ('recursing', uuid='5b8498e4-868d-413c-a67e-004516b8452c', pending=len (self.pending), have=len (self.have), running=len (self.running)) + self.have = set () + self.pending = set ([self.url]) + + while self.pending: # since pending is a set this picks a random item, which is fine u = self.pending.pop () self.have.add (u) t = asyncio.ensure_future (self.fetch (u)) self.running.add (t) + + log () + if len (self.running) >= self.concurrency or not self.pending: done, pending = await asyncio.wait (self.running, return_when=asyncio.FIRST_COMPLETED) self.running.difference_update (done) + log () + diff --git a/crocoite/irc.py b/crocoite/irc.py index 7d1a96d..095c55f 100644 --- a/crocoite/irc.py +++ b/crocoite/irc.py @@ -29,6 +29,7 @@ from enum import IntEnum, Enum from collections import defaultdict from abc import abstractmethod from functools import wraps +from io import BytesIO import bottom ### helper functions ### @@ -205,7 +206,7 @@ class ArgparseBot (bottom.Client): self.nick = nick # map channel -> nick -> user self.users = defaultdict (dict) - self.logger = logger + self.logger = logger.bind (context=type (self).__name__) self.parser = self.getParser () # bot does not accept new queries in shutdown mode, unless explicitly @@ -238,7 +239,7 @@ class ArgparseBot (bottom.Client): await self.disconnect () async def onConnect (self, **kwargs): - self.logger.info ('connect', nick=self.nick) + self.logger.info ('connect', nick=self.nick, uuid='01f7b138-ea53-4609-88e9-61f3eca3e7e7') self.send('NICK', nick=self.nick) self.send('USER', user=self.nick, realname='https://github.com/PromyLOPh/crocoite') @@ -254,7 +255,7 @@ class ArgparseBot (bottom.Client): future.cancel() for c in self.channels: - self.logger.info ('join', channel=c) + self.logger.info ('join', channel=c, uuid='367063a5-9069-4025-907c-65ba88af8593') self.send ('JOIN', channel=c) # no need for NAMES here, server sends this automatically @@ -334,10 +335,10 @@ class ArgparseBot (bottom.Client): async def onDisconnect (**kwargs): """ Auto-reconnect """ - self.logger.info ('disconnect') + self.logger.info ('disconnect', uuid='4c74b2c8-2403-4921-879d-2279ad85db72') if not self._quit.armed: await asynio.sleep (10, loop=self.loop) - self.logger.info ('reconnect') + self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7') await self.connect () def voice (func): @@ -413,7 +414,7 @@ class Chromebot (ArgparseBot): assert j.id not in self.jobs, 'duplicate job id' self.jobs[j.id] = j - logger = self.logger.bind (id=j.id, user=user.name, url=args.url) + logger = self.logger.bind (job=j.id) cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir, '--prefix', j.id + '-{host}-{date}-', '--policy', @@ -426,7 +427,8 @@ class Chromebot (ArgparseBot): } strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ())) reply ('{} has been queued as {} with {}'.format (args.url, j.id, strargs)) - logger.info ('queue', cmdline=cmdline) + logger.info ('queue', user=user.name, url=args.url, cmdline=cmdline, + uuid='36cc34a6-061b-4cc5-84a9-4ab6552c8d75') async with self.processLimit: if j.status == Status.pending: @@ -443,7 +445,7 @@ class Chromebot (ArgparseBot): # job is marked running after the first message is received from it if j.status == Status.pending: - logger.info ('start') + logger.info ('start', uuid='46e62d60-f498-4ab0-90e1-d08a073b10fb') j.status = Status.running data = json.loads (data) @@ -452,10 +454,15 @@ class Chromebot (ArgparseBot): j.stats = data elif msgid == '5b8498e4-868d-413c-a67e-004516b8452c': j.rstats = data + + # forward message, so the dashboard can use it + logger.info ('message', + uuid='5c0f9a11-dcd8-4182-a60f-54f4d3ab3687', + data=data) code = await j.process.wait () if j.status == Status.running: - logger.info ('finish') + logger.info ('finish', uuid='7b40ffbb-faab-4224-90ed-cd4febd8f7ec') j.status = Status.finished j.finished = datetime.utcnow () @@ -480,7 +487,89 @@ class Chromebot (ArgparseBot): return job.status = Status.aborted - self.logger.info ('abort', id=job.id, user=user.name) + self.logger.info ('abort', job=job.id, user=user.name, + uuid='865b3b3e-a54a-4a56-a545-f38a37bac295') if job.process and job.process.returncode is None: job.process.terminate () +import websockets + +class Dashboard: + __slots__ = ('fd', 'clients', 'loop', 'log', 'maxLog', 'pingInterval', 'pingTimeout') + # these messages will not be forwarded to the browser + ignoreMsgid = { + # connect + '01f7b138-ea53-4609-88e9-61f3eca3e7e7', + # join + '367063a5-9069-4025-907c-65ba88af8593', + # disconnect + '4c74b2c8-2403-4921-879d-2279ad85db72', + # reconnect + 'c53555cb-e1a4-4b69-b1c9-3320269c19d7', + } + + def __init__ (self, fd, loop, maxLog=1000, pingInterval=30, pingTimeout=10): + self.fd = fd + self.clients = set () + self.loop = loop + # log buffer + self.log = [] + self.maxLog = maxLog + self.pingInterval = pingInterval + self.pingTimeout = pingTimeout + + async def client (self, websocket, path): + self.clients.add (websocket) + try: + for l in self.log: + buf = json.dumps (l) + await websocket.send (buf) + while True: + try: + msg = await asyncio.wait_for (websocket.recv(), timeout=self.pingInterval) + except asyncio.TimeoutError: + try: + pong = await websocket.ping() + await asyncio.wait_for (pong, timeout=self.pingTimeout) + except asyncio.TimeoutError: + break + except websockets.exceptions.ConnectionClosed: + break + finally: + self.clients.remove (websocket) + + def handleStdin (self): + buf = self.fd.readline () + if not buf: + return + + data = json.loads (buf) + msgid = data['uuid'] + + if msgid in self.ignoreMsgid: + return + + # a few messages may contain sensitive information that we want to hide + if msgid == '36cc34a6-061b-4cc5-84a9-4ab6552c8d75': + # queue + del data['cmdline'] + elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687': + nesteddata = data['data'] + nestedmsgid = nesteddata['uuid'] + if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db': + del nesteddata['command'] + del nesteddata['destfile'] + + buf = json.dumps (data) + for c in self.clients: + # XXX can’t await here + asyncio.ensure_future (c.send (buf)) + + self.log.append (data) + while len (self.log) > self.maxLog: + self.log.pop (0) + + def run (self, host='localhost', port=6789): + self.loop.add_reader (self.fd, self.handleStdin) + return websockets.serve(self.client, host, port) + diff --git a/setup.py b/setup.py index 0ca55f8..0debaf4 100644 --- a/setup.py +++ b/setup.py @@ -15,12 +15,14 @@ setup( 'html5lib>=0.999999999', 'bottom', 'pytz', + 'websockets', ], entry_points={ 'console_scripts': [ 'crocoite-grab = crocoite.cli:single', 'crocoite-recursive = crocoite.cli:recursive', 'crocoite-irc = crocoite.cli:irc', + 'crocoite-irc-dashboard = crocoite.cli:dashboard', 'crocoite-merge-warc = crocoite.tools:mergeWarc', 'crocoite-extract-screenshot = crocoite.tools:extractScreenshot', ], -- cgit v1.2.3