summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars-Dominik Braun <lars@6xq.net>2018-10-14 12:41:37 +0200
committerLars-Dominik Braun <lars@6xq.net>2018-10-14 12:43:39 +0200
commit958563a3602780b48599c27acf212139c2e6904d (patch)
treebc3b3e1209691c320122b4586972e275563cc867
parent07994fb6b72b0c84d2ee2c69e5afdb204d33d5e6 (diff)
downloadcrocoite-958563a3602780b48599c27acf212139c2e6904d.tar.gz
crocoite-958563a3602780b48599c27acf212139c2e6904d.tar.bz2
crocoite-958563a3602780b48599c27acf212139c2e6904d.zip
irc: Add PoC dashboard
Using websockets, vue and bulma.
-rw-r--r--contrib/dashboard.css7
-rw-r--r--contrib/dashboard.html22
-rw-r--r--contrib/dashboard.js127
-rw-r--r--crocoite/cli.py8
-rw-r--r--crocoite/controller.py18
-rw-r--r--crocoite/irc.py109
-rw-r--r--setup.py2
7 files changed, 277 insertions, 16 deletions
diff --git a/contrib/dashboard.css b/contrib/dashboard.css
new file mode 100644
index 0000000..469db78
--- /dev/null
+++ b/contrib/dashboard.css
@@ -0,0 +1,7 @@
+.jid {
+ font-family: Consolas, mono;
+}
+.job .urls {
+ max-height: 10em;
+ overflow-y: scroll;
+}
diff --git a/contrib/dashboard.html b/contrib/dashboard.html
new file mode 100644
index 0000000..cc09d50
--- /dev/null
+++ b/contrib/dashboard.html
@@ -0,0 +1,22 @@
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <meta charset="utf-8">
+ <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
+ <title>chromebot dashboard</title>
+ <!--<script src="https://cdn.jsdelivr.net/npm/vue/dist/vue.js"></script>-->
+ <script src="https://cdn.jsdelivr.net/npm/vue@2/dist/vue.min.js"></script>
+ <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.7/css/bulma.min.css">
+ <link rel="stylesheet" href="dashboard.css">
+ </head>
+ <body>
+ <noscript>Please enable JavaScript.</noscript>
+ <section id="app" class="section">
+ <h1 class="title">chromebot dashboard</h1>
+ <div class="jobs">
+ <job-item v-for="j in jobs" v-bind:job="j" v-bind:jobs="jobs" v-bind:ignored="ignored" v-bind:key="j.id"></job-item>
+ </div>
+ </section>
+ <script src="dashboard.js"></script>
+ </body>
+</html>
diff --git a/contrib/dashboard.js b/contrib/dashboard.js
new file mode 100644
index 0000000..eb34d43
--- /dev/null
+++ b/contrib/dashboard.js
@@ -0,0 +1,127 @@
+/* configuration */
+let socket = "ws://localhost:6789/",
+ urllogMax = 100;
+
+function formatSize (bytes) {
+ let prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB'];
+ while (bytes >= 1024 && prefixes.length > 1) {
+ bytes /= 1024;
+ prefixes.shift ();
+ }
+ return bytes.toFixed (1) + ' ' + prefixes[0];
+}
+
+class Job {
+ constructor (id, url, user, queued) {
+ this.id = id;
+ this.url = url;
+ this.user = user;
+ this.status = undefined;
+ this.stats = {'pending': 0, 'have': 0, 'running': 0,
+ 'requests': 0, 'finished': 0, 'failed': 0,
+ 'bytesRcv': 0, 'crashed': 0, 'ignored': 0};
+ this.urllog = [];
+ this.queued = queued;
+ this.started = undefined;
+ this.finished = undefined;
+ this.aborted = undefined;
+ }
+
+ addUrl (url) {
+ if (this.urllog.push (url) > urllogMax) {
+ this.urllog.shift ();
+ }
+ }
+}
+
+let jobs = {};
+/* list of ignored job ids, i.e. those the user deleted from the dashboard */
+let ignored = [];
+let ws = new WebSocket(socket);
+ws.onmessage = function (event) {
+ var msg = JSON.parse (event.data);
+ let msgdate = new Date (Date.parse (msg.date));
+ var j = undefined;
+ console.log (msg);
+ if (msg.job) {
+ if (ignored.includes (msg.job)) {
+ console.log ("job ignored", msg.job);
+ return;
+ }
+ j = jobs[msg.job];
+ if (j === undefined) {
+ j = new Job (msg.job, 'unknown', '<unknown>', new Date ());
+ Vue.set (jobs, msg.job, j);
+ }
+ }
+ if (msg.uuid == '36cc34a6-061b-4cc5-84a9-4ab6552c8d75') {
+ j = new Job (msg.job, msg.url, msg.user, msgdate);
+ /* jobs[msg.job] = j does not work with vue, see
+ https://vuejs.org/v2/guide/list.html#Object-Change-Detection-Caveats
+ */
+ Vue.set (jobs, msg.job, j);
+ j.status = 'pending';
+ } else if (msg.uuid == '46e62d60-f498-4ab0-90e1-d08a073b10fb') {
+ j.status = 'running';
+ j.started = msgdate;
+ } else if (msg.uuid == '7b40ffbb-faab-4224-90ed-cd4febd8f7ec') {
+ j.status = 'finished';
+ j.finished = msgdate;
+ } else if (msg.uuid == '865b3b3e-a54a-4a56-a545-f38a37bac295') {
+ j.status = 'aborted';
+ j.aborted = msgdate;
+ } else if (msg.uuid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687') {
+ /* forwarded job message */
+ let rmsg = msg.data;
+ if (rmsg.uuid == '24d92d16-770e-4088-b769-4020e127a7ff') {
+ /* job status */
+ Object.assign (j.stats, rmsg);
+ } else if (rmsg.uuid == '5b8498e4-868d-413c-a67e-004516b8452c') {
+ /* recursion status */
+ Object.assign (j.stats, rmsg);
+ } else if (rmsg.uuid == '1680f384-744c-4b8a-815b-7346e632e8db') {
+ /* fetch */
+ j.addUrl (rmsg.url);
+ }
+ }
+};
+ws.onopen = function (event) {
+};
+ws.onerror = function (event) {
+};
+
+Vue.component('job-item', {
+ props: ['job', 'jobs', 'ignored'],
+ template: '<div class="job box" :id="job.id"><ul class="columns"><li class="jid column is-narrow"><a :href="\'#\' + job.id">{{ job.id }}</a></li><li class="url column"><a :href="job.url">{{ job.url }}</a></li><li class="status column is-narrow"><job-status v-bind:job="job"></job-status></li><li class="column is-narrow"><a class="delete" v-on:click="del(job.id)"></a></li></ul><job-stats v-bind:job="job"></job-stats></div>',
+ methods: {
+ del: function (id) {
+ Vue.delete(this.jobs, id);
+ this.ignored.push (id);
+ }
+ }
+});
+Vue.component('job-status', {
+ props: ['job'],
+ template: '<span v-if="job.status == \'pending\'">queued on {{ job.queued.toLocaleString() }}</span><span v-else-if="job.status == \'aborted\'">aborted on {{ job.aborted.toLocaleString() }}</span><span v-else-if="job.status == \'running\'">running since {{ job.started.toLocaleString() }}</span><span v-else-if="job.status == \'finished\'">finished since {{ job.finished.toLocaleString() }}</span>'
+});
+Vue.component('job-stats', {
+ props: ['job'],
+ template: '<div><progress class="progress is-info" :value="job.stats.have" :max="job.stats.have+job.stats.pending+job.stats.running"></progress><ul class="stats columns"><li class="column">{{ job.stats.have }} <small>have</small></li><li class="column">{{ job.stats.running }} <small>running</small></li><li class="column">{{ job.stats.pending }} <small>pending</small></li><li class="column">{{ job.stats.requests }} <small>requests</small><li class="column"><filesize v-bind:value="job.stats.bytesRcv"></filesize></li></ul><job-urls v-bind:job="job"></job-urls></div>'
+});
+Vue.component('job-urls', {
+ props: ['job'],
+ template: '<ul class="urls"><li v-for="u in job.urllog">{{ u }}</li></ul>'
+});
+Vue.component('filesize', {
+ props: ['value'],
+ template: '<span class="filesize">{{ fvalue }}</span>',
+ computed: { fvalue: function () { return formatSize (this.value); } }
+});
+
+let app = new Vue({
+ el: '#app',
+ data: {
+ jobs: jobs,
+ }
+});
+
diff --git a/crocoite/cli.py b/crocoite/cli.py
index 913db7c..6365c78 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -142,3 +142,11 @@ def irc ():
loop.add_signal_handler (signal.SIGTERM, stop, signal.SIGTERM)
loop.run_until_complete(bot.run ())
+def dashboard ():
+ from .irc import Dashboard
+
+ loop = asyncio.get_event_loop()
+ d = Dashboard (sys.stdin, loop)
+ loop.run_until_complete(d.run ())
+ loop.run_forever()
+
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 8916726..45d9442 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -337,9 +337,9 @@ class RecursiveController:
prefix=formatPrefix (self.prefix), suffix='.warc.gz',
delete=False)
destpath = os.path.join (self.output, os.path.basename (dest.name))
- logger = self.logger.bind (url=url, destfile=destpath)
+ logger = self.logger.bind (url=url)
command = list (map (formatCommand, self.command))
- logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command)
+ logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath)
process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
start_new_session=True)
@@ -370,22 +370,28 @@ class RecursiveController:
self._quit = True
async def run (self):
- self.have = set ()
- self.pending = set ([self.url])
-
- while self.pending and not self._quit:
+ def log ():
self.logger.info ('recursing',
uuid='5b8498e4-868d-413c-a67e-004516b8452c',
pending=len (self.pending), have=len (self.have),
running=len (self.running))
+ self.have = set ()
+ self.pending = set ([self.url])
+
+ while self.pending:
# since pending is a set this picks a random item, which is fine
u = self.pending.pop ()
self.have.add (u)
t = asyncio.ensure_future (self.fetch (u))
self.running.add (t)
+
+ log ()
+
if len (self.running) >= self.concurrency or not self.pending:
done, pending = await asyncio.wait (self.running,
return_when=asyncio.FIRST_COMPLETED)
self.running.difference_update (done)
+ log ()
+
diff --git a/crocoite/irc.py b/crocoite/irc.py
index 7d1a96d..095c55f 100644
--- a/crocoite/irc.py
+++ b/crocoite/irc.py
@@ -29,6 +29,7 @@ from enum import IntEnum, Enum
from collections import defaultdict
from abc import abstractmethod
from functools import wraps
+from io import BytesIO
import bottom
### helper functions ###
@@ -205,7 +206,7 @@ class ArgparseBot (bottom.Client):
self.nick = nick
# map channel -> nick -> user
self.users = defaultdict (dict)
- self.logger = logger
+ self.logger = logger.bind (context=type (self).__name__)
self.parser = self.getParser ()
# bot does not accept new queries in shutdown mode, unless explicitly
@@ -238,7 +239,7 @@ class ArgparseBot (bottom.Client):
await self.disconnect ()
async def onConnect (self, **kwargs):
- self.logger.info ('connect', nick=self.nick)
+ self.logger.info ('connect', nick=self.nick, uuid='01f7b138-ea53-4609-88e9-61f3eca3e7e7')
self.send('NICK', nick=self.nick)
self.send('USER', user=self.nick, realname='https://github.com/PromyLOPh/crocoite')
@@ -254,7 +255,7 @@ class ArgparseBot (bottom.Client):
future.cancel()
for c in self.channels:
- self.logger.info ('join', channel=c)
+ self.logger.info ('join', channel=c, uuid='367063a5-9069-4025-907c-65ba88af8593')
self.send ('JOIN', channel=c)
# no need for NAMES here, server sends this automatically
@@ -334,10 +335,10 @@ class ArgparseBot (bottom.Client):
async def onDisconnect (**kwargs):
""" Auto-reconnect """
- self.logger.info ('disconnect')
+ self.logger.info ('disconnect', uuid='4c74b2c8-2403-4921-879d-2279ad85db72')
if not self._quit.armed:
await asynio.sleep (10, loop=self.loop)
- self.logger.info ('reconnect')
+ self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7')
await self.connect ()
def voice (func):
@@ -413,7 +414,7 @@ class Chromebot (ArgparseBot):
assert j.id not in self.jobs, 'duplicate job id'
self.jobs[j.id] = j
- logger = self.logger.bind (id=j.id, user=user.name, url=args.url)
+ logger = self.logger.bind (job=j.id)
cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir,
'--prefix', j.id + '-{host}-{date}-', '--policy',
@@ -426,7 +427,8 @@ class Chromebot (ArgparseBot):
}
strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
reply ('{} has been queued as {} with {}'.format (args.url, j.id, strargs))
- logger.info ('queue', cmdline=cmdline)
+ logger.info ('queue', user=user.name, url=args.url, cmdline=cmdline,
+ uuid='36cc34a6-061b-4cc5-84a9-4ab6552c8d75')
async with self.processLimit:
if j.status == Status.pending:
@@ -443,7 +445,7 @@ class Chromebot (ArgparseBot):
# job is marked running after the first message is received from it
if j.status == Status.pending:
- logger.info ('start')
+ logger.info ('start', uuid='46e62d60-f498-4ab0-90e1-d08a073b10fb')
j.status = Status.running
data = json.loads (data)
@@ -452,10 +454,15 @@ class Chromebot (ArgparseBot):
j.stats = data
elif msgid == '5b8498e4-868d-413c-a67e-004516b8452c':
j.rstats = data
+
+ # forward message, so the dashboard can use it
+ logger.info ('message',
+ uuid='5c0f9a11-dcd8-4182-a60f-54f4d3ab3687',
+ data=data)
code = await j.process.wait ()
if j.status == Status.running:
- logger.info ('finish')
+ logger.info ('finish', uuid='7b40ffbb-faab-4224-90ed-cd4febd8f7ec')
j.status = Status.finished
j.finished = datetime.utcnow ()
@@ -480,7 +487,89 @@ class Chromebot (ArgparseBot):
return
job.status = Status.aborted
- self.logger.info ('abort', id=job.id, user=user.name)
+ self.logger.info ('abort', job=job.id, user=user.name,
+ uuid='865b3b3e-a54a-4a56-a545-f38a37bac295')
if job.process and job.process.returncode is None:
job.process.terminate ()
+import websockets
+
+class Dashboard:
+ __slots__ = ('fd', 'clients', 'loop', 'log', 'maxLog', 'pingInterval', 'pingTimeout')
+ # these messages will not be forwarded to the browser
+ ignoreMsgid = {
+ # connect
+ '01f7b138-ea53-4609-88e9-61f3eca3e7e7',
+ # join
+ '367063a5-9069-4025-907c-65ba88af8593',
+ # disconnect
+ '4c74b2c8-2403-4921-879d-2279ad85db72',
+ # reconnect
+ 'c53555cb-e1a4-4b69-b1c9-3320269c19d7',
+ }
+
+ def __init__ (self, fd, loop, maxLog=1000, pingInterval=30, pingTimeout=10):
+ self.fd = fd
+ self.clients = set ()
+ self.loop = loop
+ # log buffer
+ self.log = []
+ self.maxLog = maxLog
+ self.pingInterval = pingInterval
+ self.pingTimeout = pingTimeout
+
+ async def client (self, websocket, path):
+ self.clients.add (websocket)
+ try:
+ for l in self.log:
+ buf = json.dumps (l)
+ await websocket.send (buf)
+ while True:
+ try:
+ msg = await asyncio.wait_for (websocket.recv(), timeout=self.pingInterval)
+ except asyncio.TimeoutError:
+ try:
+ pong = await websocket.ping()
+ await asyncio.wait_for (pong, timeout=self.pingTimeout)
+ except asyncio.TimeoutError:
+ break
+ except websockets.exceptions.ConnectionClosed:
+ break
+ finally:
+ self.clients.remove (websocket)
+
+ def handleStdin (self):
+ buf = self.fd.readline ()
+ if not buf:
+ return
+
+ data = json.loads (buf)
+ msgid = data['uuid']
+
+ if msgid in self.ignoreMsgid:
+ return
+
+ # a few messages may contain sensitive information that we want to hide
+ if msgid == '36cc34a6-061b-4cc5-84a9-4ab6552c8d75':
+ # queue
+ del data['cmdline']
+ elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687':
+ nesteddata = data['data']
+ nestedmsgid = nesteddata['uuid']
+ if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db':
+ del nesteddata['command']
+ del nesteddata['destfile']
+
+ buf = json.dumps (data)
+ for c in self.clients:
+ # XXX can’t await here
+ asyncio.ensure_future (c.send (buf))
+
+ self.log.append (data)
+ while len (self.log) > self.maxLog:
+ self.log.pop (0)
+
+ def run (self, host='localhost', port=6789):
+ self.loop.add_reader (self.fd, self.handleStdin)
+ return websockets.serve(self.client, host, port)
+
diff --git a/setup.py b/setup.py
index 0ca55f8..0debaf4 100644
--- a/setup.py
+++ b/setup.py
@@ -15,12 +15,14 @@ setup(
'html5lib>=0.999999999',
'bottom',
'pytz',
+ 'websockets',
],
entry_points={
'console_scripts': [
'crocoite-grab = crocoite.cli:single',
'crocoite-recursive = crocoite.cli:recursive',
'crocoite-irc = crocoite.cli:irc',
+ 'crocoite-irc-dashboard = crocoite.cli:dashboard',
'crocoite-merge-warc = crocoite.tools:mergeWarc',
'crocoite-extract-screenshot = crocoite.tools:extractScreenshot',
],