From 3e69f8b34a48ffa4df4805c53aeaba144d91c6bc Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Thu, 11 Oct 2018 17:03:31 +0200 Subject: recursive: Gracefully shut down on SIGINT/TERM --- crocoite/cli.py | 5 ++++- crocoite/controller.py | 17 ++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) (limited to 'crocoite') diff --git a/crocoite/cli.py b/crocoite/cli.py index 0319dc9..55ff4a1 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -22,7 +22,7 @@ Command line interface """ -import argparse, json, sys +import argparse, json, sys, signal from . import behavior from .controller import SinglePageController, defaultSettings, \ @@ -104,6 +104,9 @@ def recursive (): concurrency=args.concurrency) loop = asyncio.get_event_loop() + stop = lambda signum: controller.cancel () + loop.add_signal_handler (signal.SIGINT, stop, signal.SIGINT) + loop.add_signal_handler (signal.SIGTERM, stop, signal.SIGTERM) loop.run_until_complete(controller.run ()) loop.close() diff --git a/crocoite/controller.py b/crocoite/controller.py index ee05b04..8916726 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -292,7 +292,7 @@ class RecursiveController: """ __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have', - 'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency') + 'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency', '_quit') SCHEME_WHITELIST = {'http', 'https'} @@ -311,6 +311,8 @@ class RecursiveController: self.concurrency = concurrency # keep in sync with StatsHandler self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0, 'ignored': 0} + # initiate graceful shutdown + self._quit = False async def fetch (self, url): """ @@ -339,7 +341,8 @@ class RecursiveController: command = list (map (formatCommand, self.command)) logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command) process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL) + stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL, + start_new_session=True) while True: data = await process.stdout.readline () if not data: @@ -358,11 +361,19 @@ class RecursiveController: # atomically move once finished os.rename (dest.name, destpath) + def cancel (self): + """ Gracefully cancel this job, waiting for existing workers to shut down """ + self.logger.info ('cancel', + uuid='d58154c8-ec27-40f2-ab9e-e25c1b21cd88', + pending=len (self.pending), have=len (self.have), + running=len (self.running)) + self._quit = True + async def run (self): self.have = set () self.pending = set ([self.url]) - while self.pending: + while self.pending and not self._quit: self.logger.info ('recursing', uuid='5b8498e4-868d-413c-a67e-004516b8452c', pending=len (self.pending), have=len (self.have), -- cgit v1.2.3