diff options
Diffstat (limited to 'crocoite')
| -rw-r--r-- | crocoite/cli.py | 8 | ||||
| -rw-r--r-- | crocoite/controller.py | 18 | ||||
| -rw-r--r-- | crocoite/irc.py | 109 | 
3 files changed, 119 insertions, 16 deletions
diff --git a/crocoite/cli.py b/crocoite/cli.py
index 913db7c..6365c78 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -142,3 +142,11 @@ def irc ():
     loop.add_signal_handler (signal.SIGTERM, stop, signal.SIGTERM)
     loop.run_until_complete(bot.run ())
+def dashboard ():
+    from .irc import Dashboard
+
+    loop = asyncio.get_event_loop()
+    d = Dashboard (sys.stdin, loop)
+    loop.run_until_complete(d.run ())
+    loop.run_forever()
+
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 8916726..45d9442 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -337,9 +337,9 @@ class RecursiveController:
                 prefix=formatPrefix (self.prefix), suffix='.warc.gz',
                 delete=False)
         destpath = os.path.join (self.output, os.path.basename (dest.name))
-        logger = self.logger.bind (url=url, destfile=destpath)
+        logger = self.logger.bind (url=url)
         command = list (map (formatCommand, self.command))
-        logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command)
+        logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath)
         process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
                 stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
                 start_new_session=True)
@@ -370,22 +370,28 @@ class RecursiveController:
         self._quit = True
     async def run (self):
-        self.have = set ()
-        self.pending = set ([self.url])
-
-        while self.pending and not self._quit:
+        def log ():
             self.logger.info ('recursing',
                     uuid='5b8498e4-868d-413c-a67e-004516b8452c',
                     pending=len (self.pending), have=len (self.have),
                     running=len (self.running))
+        self.have = set ()
+        self.pending = set ([self.url])
+
+        while self.pending:
             # since pending is a set this picks a random item, which is fine
             u = self.pending.pop ()
             self.have.add (u)
             t = asyncio.ensure_future (self.fetch (u))
             self.running.add (t)
+
+            log ()
+
             if len (self.running) >= self.concurrency or not self.pending:
                 done, pending = await asyncio.wait (self.running,
                         return_when=asyncio.FIRST_COMPLETED)
                 self.running.difference_update (done)
+        log ()
+
diff --git a/crocoite/irc.py b/crocoite/irc.py
index 7d1a96d..095c55f 100644
--- a/crocoite/irc.py
+++ b/crocoite/irc.py
@@ -29,6 +29,7 @@ from enum import IntEnum, Enum
 from collections import defaultdict
 from abc import abstractmethod
 from functools import wraps
+from io import BytesIO
 import bottom
 ### helper functions ###
@@ -205,7 +206,7 @@ class ArgparseBot (bottom.Client):
         self.nick = nick
         # map channel -> nick -> user
         self.users = defaultdict (dict)
-        self.logger = logger
+        self.logger = logger.bind (context=type (self).__name__)
         self.parser = self.getParser ()
         # bot does not accept new queries in shutdown mode, unless explicitly
@@ -238,7 +239,7 @@ class ArgparseBot (bottom.Client):
         await self.disconnect ()
     async def onConnect (self, **kwargs):
-        self.logger.info ('connect', nick=self.nick)
+        self.logger.info ('connect', nick=self.nick, uuid='01f7b138-ea53-4609-88e9-61f3eca3e7e7')
         self.send('NICK', nick=self.nick)
         self.send('USER', user=self.nick, realname='https://github.com/PromyLOPh/crocoite')
@@ -254,7 +255,7 @@ class ArgparseBot (bottom.Client):
             future.cancel()
         for c in self.channels:
-            self.logger.info ('join', channel=c)
+            self.logger.info ('join', channel=c, uuid='367063a5-9069-4025-907c-65ba88af8593')
             self.send ('JOIN', channel=c)
             # no need for NAMES here, server sends this automatically
@@ -334,10 +335,10 @@ class ArgparseBot (bottom.Client):
     async def onDisconnect (**kwargs):
         """ Auto-reconnect """
-        self.logger.info ('disconnect')
+        self.logger.info ('disconnect', uuid='4c74b2c8-2403-4921-879d-2279ad85db72')
         if not self._quit.armed:
             await asynio.sleep (10, loop=self.loop)
-            self.logger.info ('reconnect')
+            self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7')
             await self.connect ()
 def voice (func):
@@ -413,7 +414,7 @@ class Chromebot (ArgparseBot):
         assert j.id not in self.jobs, 'duplicate job id'
         self.jobs[j.id] = j
-        logger = self.logger.bind (id=j.id, user=user.name, url=args.url)
+        logger = self.logger.bind (job=j.id)
         cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir,
                 '--prefix', j.id + '-{host}-{date}-', '--policy',
@@ -426,7 +427,8 @@ class Chromebot (ArgparseBot):
                 }
         strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
         reply ('{} has been queued as {} with {}'.format (args.url, j.id, strargs))
-        logger.info ('queue', cmdline=cmdline)
+        logger.info ('queue', user=user.name, url=args.url, cmdline=cmdline,
+                uuid='36cc34a6-061b-4cc5-84a9-4ab6552c8d75')
         async with self.processLimit:
             if j.status == Status.pending:
@@ -443,7 +445,7 @@ class Chromebot (ArgparseBot):
                     # job is marked running after the first message is received from it
                     if j.status == Status.pending:
-                        logger.info ('start')
+                        logger.info ('start', uuid='46e62d60-f498-4ab0-90e1-d08a073b10fb')
                         j.status = Status.running
                     data = json.loads (data)
@@ -452,10 +454,15 @@ class Chromebot (ArgparseBot):
                         j.stats = data
                     elif msgid == '5b8498e4-868d-413c-a67e-004516b8452c':
                         j.rstats = data
+
+                    # forward message, so the dashboard can use it
+                    logger.info ('message',
+                            uuid='5c0f9a11-dcd8-4182-a60f-54f4d3ab3687',
+                            data=data)
                 code = await j.process.wait ()
         if j.status == Status.running:
-            logger.info ('finish')
+            logger.info ('finish', uuid='7b40ffbb-faab-4224-90ed-cd4febd8f7ec')
             j.status = Status.finished
         j.finished = datetime.utcnow ()
@@ -480,7 +487,89 @@ class Chromebot (ArgparseBot):
             return
         job.status = Status.aborted
-        self.logger.info ('abort', id=job.id, user=user.name)
+        self.logger.info ('abort', job=job.id, user=user.name,
+                uuid='865b3b3e-a54a-4a56-a545-f38a37bac295')
         if job.process and job.process.returncode is None:
             job.process.terminate ()
+import websockets
+
+class Dashboard:
+    __slots__ = ('fd', 'clients', 'loop', 'log', 'maxLog', 'pingInterval', 'pingTimeout')
+    # these messages will not be forwarded to the browser
+    ignoreMsgid = {
+            # connect
+            '01f7b138-ea53-4609-88e9-61f3eca3e7e7',
+            # join
+            '367063a5-9069-4025-907c-65ba88af8593',
+            # disconnect
+            '4c74b2c8-2403-4921-879d-2279ad85db72',
+            # reconnect
+            'c53555cb-e1a4-4b69-b1c9-3320269c19d7',
+            }
+
+    def __init__ (self, fd, loop, maxLog=1000, pingInterval=30, pingTimeout=10):
+        self.fd = fd
+        self.clients = set ()
+        self.loop = loop
+        # log buffer
+        self.log = []
+        self.maxLog = maxLog
+        self.pingInterval = pingInterval
+        self.pingTimeout = pingTimeout
+
+    async def client (self, websocket, path):
+        self.clients.add (websocket)
+        try:
+            for l in self.log:
+                buf = json.dumps (l)
+                await websocket.send (buf)
+            while True:
+                try:
+                    msg = await asyncio.wait_for (websocket.recv(), timeout=self.pingInterval)
+                except asyncio.TimeoutError:
+                    try:
+                        pong = await websocket.ping()
+                        await asyncio.wait_for (pong, timeout=self.pingTimeout)
+                    except asyncio.TimeoutError:
+                        break
+                except websockets.exceptions.ConnectionClosed:
+                    break
+        finally:
+            self.clients.remove (websocket)
+
+    def handleStdin (self):
+        buf = self.fd.readline ()
+        if not buf:
+            return
+
+        data = json.loads (buf)
+        msgid = data['uuid']
+
+        if msgid in self.ignoreMsgid:
+            return
+
+        # a few messages may contain sensitive information that we want to hide
+        if msgid == '36cc34a6-061b-4cc5-84a9-4ab6552c8d75':
+            # queue
+            del data['cmdline']
+        elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687':
+            nesteddata = data['data']
+            nestedmsgid = nesteddata['uuid']
+            if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db':
+                del nesteddata['command']
+                del nesteddata['destfile']
+            
+        buf = json.dumps (data)
+        for c in self.clients:
+            # XXX can’t await here
+            asyncio.ensure_future (c.send (buf))
+
+        self.log.append (data)
+        while len (self.log) > self.maxLog:
+            self.log.pop (0)
+
+    def run (self, host='localhost', port=6789):
+        self.loop.add_reader (self.fd, self.handleStdin)
+        return websockets.serve(self.client, host, port)
+
