summaryrefslogtreecommitdiff
path: root/crocoite
diff options
context:
space:
mode:
Diffstat (limited to 'crocoite')
-rw-r--r--crocoite/cli.py8
-rw-r--r--crocoite/controller.py18
-rw-r--r--crocoite/irc.py109
3 files changed, 119 insertions, 16 deletions
diff --git a/crocoite/cli.py b/crocoite/cli.py
index 913db7c..6365c78 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -142,3 +142,11 @@ def irc ():
loop.add_signal_handler (signal.SIGTERM, stop, signal.SIGTERM)
loop.run_until_complete(bot.run ())
+def dashboard ():
+ from .irc import Dashboard
+
+ loop = asyncio.get_event_loop()
+ d = Dashboard (sys.stdin, loop)
+ loop.run_until_complete(d.run ())
+ loop.run_forever()
+
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 8916726..45d9442 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -337,9 +337,9 @@ class RecursiveController:
prefix=formatPrefix (self.prefix), suffix='.warc.gz',
delete=False)
destpath = os.path.join (self.output, os.path.basename (dest.name))
- logger = self.logger.bind (url=url, destfile=destpath)
+ logger = self.logger.bind (url=url)
command = list (map (formatCommand, self.command))
- logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command)
+ logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath)
process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
start_new_session=True)
@@ -370,22 +370,28 @@ class RecursiveController:
self._quit = True
async def run (self):
- self.have = set ()
- self.pending = set ([self.url])
-
- while self.pending and not self._quit:
+ def log ():
self.logger.info ('recursing',
uuid='5b8498e4-868d-413c-a67e-004516b8452c',
pending=len (self.pending), have=len (self.have),
running=len (self.running))
+ self.have = set ()
+ self.pending = set ([self.url])
+
+ while self.pending:
# since pending is a set this picks a random item, which is fine
u = self.pending.pop ()
self.have.add (u)
t = asyncio.ensure_future (self.fetch (u))
self.running.add (t)
+
+ log ()
+
if len (self.running) >= self.concurrency or not self.pending:
done, pending = await asyncio.wait (self.running,
return_when=asyncio.FIRST_COMPLETED)
self.running.difference_update (done)
+ log ()
+
diff --git a/crocoite/irc.py b/crocoite/irc.py
index 7d1a96d..095c55f 100644
--- a/crocoite/irc.py
+++ b/crocoite/irc.py
@@ -29,6 +29,7 @@ from enum import IntEnum, Enum
from collections import defaultdict
from abc import abstractmethod
from functools import wraps
+from io import BytesIO
import bottom
### helper functions ###
@@ -205,7 +206,7 @@ class ArgparseBot (bottom.Client):
self.nick = nick
# map channel -> nick -> user
self.users = defaultdict (dict)
- self.logger = logger
+ self.logger = logger.bind (context=type (self).__name__)
self.parser = self.getParser ()
# bot does not accept new queries in shutdown mode, unless explicitly
@@ -238,7 +239,7 @@ class ArgparseBot (bottom.Client):
await self.disconnect ()
async def onConnect (self, **kwargs):
- self.logger.info ('connect', nick=self.nick)
+ self.logger.info ('connect', nick=self.nick, uuid='01f7b138-ea53-4609-88e9-61f3eca3e7e7')
self.send('NICK', nick=self.nick)
self.send('USER', user=self.nick, realname='https://github.com/PromyLOPh/crocoite')
@@ -254,7 +255,7 @@ class ArgparseBot (bottom.Client):
future.cancel()
for c in self.channels:
- self.logger.info ('join', channel=c)
+ self.logger.info ('join', channel=c, uuid='367063a5-9069-4025-907c-65ba88af8593')
self.send ('JOIN', channel=c)
# no need for NAMES here, server sends this automatically
@@ -334,10 +335,10 @@ class ArgparseBot (bottom.Client):
async def onDisconnect (**kwargs):
""" Auto-reconnect """
- self.logger.info ('disconnect')
+ self.logger.info ('disconnect', uuid='4c74b2c8-2403-4921-879d-2279ad85db72')
if not self._quit.armed:
await asynio.sleep (10, loop=self.loop)
- self.logger.info ('reconnect')
+ self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7')
await self.connect ()
def voice (func):
@@ -413,7 +414,7 @@ class Chromebot (ArgparseBot):
assert j.id not in self.jobs, 'duplicate job id'
self.jobs[j.id] = j
- logger = self.logger.bind (id=j.id, user=user.name, url=args.url)
+ logger = self.logger.bind (job=j.id)
cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir,
'--prefix', j.id + '-{host}-{date}-', '--policy',
@@ -426,7 +427,8 @@ class Chromebot (ArgparseBot):
}
strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
reply ('{} has been queued as {} with {}'.format (args.url, j.id, strargs))
- logger.info ('queue', cmdline=cmdline)
+ logger.info ('queue', user=user.name, url=args.url, cmdline=cmdline,
+ uuid='36cc34a6-061b-4cc5-84a9-4ab6552c8d75')
async with self.processLimit:
if j.status == Status.pending:
@@ -443,7 +445,7 @@ class Chromebot (ArgparseBot):
# job is marked running after the first message is received from it
if j.status == Status.pending:
- logger.info ('start')
+ logger.info ('start', uuid='46e62d60-f498-4ab0-90e1-d08a073b10fb')
j.status = Status.running
data = json.loads (data)
@@ -452,10 +454,15 @@ class Chromebot (ArgparseBot):
j.stats = data
elif msgid == '5b8498e4-868d-413c-a67e-004516b8452c':
j.rstats = data
+
+ # forward message, so the dashboard can use it
+ logger.info ('message',
+ uuid='5c0f9a11-dcd8-4182-a60f-54f4d3ab3687',
+ data=data)
code = await j.process.wait ()
if j.status == Status.running:
- logger.info ('finish')
+ logger.info ('finish', uuid='7b40ffbb-faab-4224-90ed-cd4febd8f7ec')
j.status = Status.finished
j.finished = datetime.utcnow ()
@@ -480,7 +487,89 @@ class Chromebot (ArgparseBot):
return
job.status = Status.aborted
- self.logger.info ('abort', id=job.id, user=user.name)
+ self.logger.info ('abort', job=job.id, user=user.name,
+ uuid='865b3b3e-a54a-4a56-a545-f38a37bac295')
if job.process and job.process.returncode is None:
job.process.terminate ()
+import websockets
+
+class Dashboard:
+ __slots__ = ('fd', 'clients', 'loop', 'log', 'maxLog', 'pingInterval', 'pingTimeout')
+ # these messages will not be forwarded to the browser
+ ignoreMsgid = {
+ # connect
+ '01f7b138-ea53-4609-88e9-61f3eca3e7e7',
+ # join
+ '367063a5-9069-4025-907c-65ba88af8593',
+ # disconnect
+ '4c74b2c8-2403-4921-879d-2279ad85db72',
+ # reconnect
+ 'c53555cb-e1a4-4b69-b1c9-3320269c19d7',
+ }
+
+ def __init__ (self, fd, loop, maxLog=1000, pingInterval=30, pingTimeout=10):
+ self.fd = fd
+ self.clients = set ()
+ self.loop = loop
+ # log buffer
+ self.log = []
+ self.maxLog = maxLog
+ self.pingInterval = pingInterval
+ self.pingTimeout = pingTimeout
+
+ async def client (self, websocket, path):
+ self.clients.add (websocket)
+ try:
+ for l in self.log:
+ buf = json.dumps (l)
+ await websocket.send (buf)
+ while True:
+ try:
+ msg = await asyncio.wait_for (websocket.recv(), timeout=self.pingInterval)
+ except asyncio.TimeoutError:
+ try:
+ pong = await websocket.ping()
+ await asyncio.wait_for (pong, timeout=self.pingTimeout)
+ except asyncio.TimeoutError:
+ break
+ except websockets.exceptions.ConnectionClosed:
+ break
+ finally:
+ self.clients.remove (websocket)
+
+ def handleStdin (self):
+ buf = self.fd.readline ()
+ if not buf:
+ return
+
+ data = json.loads (buf)
+ msgid = data['uuid']
+
+ if msgid in self.ignoreMsgid:
+ return
+
+ # a few messages may contain sensitive information that we want to hide
+ if msgid == '36cc34a6-061b-4cc5-84a9-4ab6552c8d75':
+ # queue
+ del data['cmdline']
+ elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687':
+ nesteddata = data['data']
+ nestedmsgid = nesteddata['uuid']
+ if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db':
+ del nesteddata['command']
+ del nesteddata['destfile']
+
+ buf = json.dumps (data)
+ for c in self.clients:
+ # XXX can’t await here
+ asyncio.ensure_future (c.send (buf))
+
+ self.log.append (data)
+ while len (self.log) > self.maxLog:
+ self.log.pop (0)
+
+ def run (self, host='localhost', port=6789):
+ self.loop.add_reader (self.fd, self.handleStdin)
+ return websockets.serve(self.client, host, port)
+