From cbcdde65aa667369b0890a042e5b44d6b1e377aa Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Sat, 29 Sep 2018 14:46:09 +0200 Subject: irc: Limit number of processes spawned --- crocoite/cli.py | 3 ++- crocoite/irc.py | 43 +++++++++++++++++++++++-------------------- 2 files changed, 25 insertions(+), 21 deletions(-) (limited to 'crocoite') diff --git a/crocoite/cli.py b/crocoite/cli.py index dadfc45..8473a0d 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -122,7 +122,8 @@ def irc (): nick=s.get ('nick'), channels=[s.get ('channel')], tempdir=s.get ('tempdir'), - destdir=s.get ('destdir')) + destdir=s.get ('destdir'), + processLimit=s.getint ('process_limit')) bot.loop.create_task(bot.connect()) bot.loop.run_forever() diff --git a/crocoite/irc.py b/crocoite/irc.py index fea09b3..d2eda45 100644 --- a/crocoite/irc.py +++ b/crocoite/irc.py @@ -106,15 +106,17 @@ class Job: prettyBytes (stats.get ('bytesRcv', 0))) class Bot(bottom.Client): - __slots__ = ('jobs', 'channels', 'nick', 'tempdir', 'destdir', 'parser') + __slots__ = ('jobs', 'channels', 'nick', 'tempdir', 'destdir', 'parser', 'processLimit') - def __init__ (self, host, port, ssl, nick, channels=[], tempdir=tempfile.gettempdir(), destdir='.'): + def __init__ (self, host, port, ssl, nick, channels=[], + tempdir=tempfile.gettempdir(), destdir='.', processLimit=1): super().__init__ (host=host, port=port, ssl=ssl) self.jobs = {} self.channels = channels self.nick = nick self.tempdir = tempdir self.destdir = destdir + self.processLimit = asyncio.Semaphore (processLimit) self.parser = NonExitingArgumentParser (prog=self.nick + ': ', add_help=False) subparsers = self.parser.add_subparsers(help='Sub-commands') @@ -162,24 +164,25 @@ class Bot(bottom.Client): self.send ('PRIVMSG', target=target, message='{}: {} has been queued as {} with {}'.format ( nick, args.url, j.id, strargs)) - j.process = await asyncio.create_subprocess_exec (*cmdline, stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL) - while True: - data = await j.process.stdout.readline () - if not data: - break - - # job is marked running after the first message is received from it - if j.status == Status.pending: - j.status = Status.running - - data = json.loads (data) - msgid = data.get ('uuid') - if msgid == '24d92d16-770e-4088-b769-4020e127a7ff': - j.stats = data - elif msgid == '5b8498e4-868d-413c-a67e-004516b8452c': - j.rstats = data - code = await j.process.wait () + async with self.processLimit: + j.process = await asyncio.create_subprocess_exec (*cmdline, stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL) + while True: + data = await j.process.stdout.readline () + if not data: + break + + # job is marked running after the first message is received from it + if j.status == Status.pending: + j.status = Status.running + + data = json.loads (data) + msgid = data.get ('uuid') + if msgid == '24d92d16-770e-4088-b769-4020e127a7ff': + j.stats = data + elif msgid == '5b8498e4-868d-413c-a67e-004516b8452c': + j.rstats = data + code = await j.process.wait () if j.status == Status.running: j.status = Status.finished -- cgit v1.2.3