summaryrefslogtreecommitdiff
path: root/crocoite
diff options
context:
space:
mode:
authorLars-Dominik Braun <lars@6xq.net>2018-12-22 10:28:11 +0100
committerLars-Dominik Braun <lars@6xq.net>2018-12-22 10:28:11 +0100
commit0b83e60c0b637ee4c9b7f2299dba8742e6b8fc5a (patch)
treeb31e4cd4c491850ccfe235eb4d1de4452065b3c1 /crocoite
parent0e176f24e93f02a266470d1a8924f020a3e2ddd5 (diff)
downloadcrocoite-0b83e60c0b637ee4c9b7f2299dba8742e6b8fc5a.tar.gz
crocoite-0b83e60c0b637ee4c9b7f2299dba8742e6b8fc5a.tar.bz2
crocoite-0b83e60c0b637ee4c9b7f2299dba8742e6b8fc5a.zip
Switch -recursive to asyncio’s .cancel()
RecursiveController used a custom .cancel() method before. Instead we can simply cancel .run() and handle the CancelledError inside run() and fetch().
Diffstat (limited to 'crocoite')
-rw-r--r--crocoite/cli.py5
-rw-r--r--crocoite/controller.py108
2 files changed, 58 insertions, 55 deletions
diff --git a/crocoite/cli.py b/crocoite/cli.py
index b0ad53a..f9ef52c 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -120,11 +120,12 @@ def recursive ():
tempdir=args.tempdir, prefix=args.prefix,
concurrency=args.concurrency)
+ run = asyncio.ensure_future (controller.run ())
loop = asyncio.get_event_loop()
- stop = lambda signum: controller.cancel ()
+ stop = lambda signum: run.cancel ()
loop.add_signal_handler (signal.SIGINT, stop, signal.SIGINT)
loop.add_signal_handler (signal.SIGTERM, stop, signal.SIGTERM)
- loop.run_until_complete(controller.run ())
+ loop.run_until_complete(run)
loop.close()
return 0
diff --git a/crocoite/controller.py b/crocoite/controller.py
index bf0d852..435f979 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -274,7 +274,7 @@ class RecursiveController:
"""
__slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
- 'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency', '_quit')
+ 'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency')
SCHEME_WHITELIST = {'http', 'https'}
@@ -293,8 +293,6 @@ class RecursiveController:
self.concurrency = concurrency
# keep in sync with StatsHandler
self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0, 'ignored': 0}
- # initiate graceful shutdown
- self._quit = False
async def fetch (self, url):
"""
@@ -327,38 +325,35 @@ class RecursiveController:
destpath = os.path.join (self.output, os.path.basename (dest.name))
command = list (map (formatCommand, self.command))
logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath)
- process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
- start_new_session=True)
- while True:
- data = await process.stdout.readline ()
- if not data:
- break
- data = json.loads (data)
- uuid = data.get ('uuid')
- if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c':
- links = set (self.policy (map (lambda x: x.with_fragment(None), data.get ('links', []))))
- links.difference_update (self.have)
- self.pending.update (links)
- elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff':
- for k in self.stats.keys ():
- self.stats[k] += data.get (k, 0)
+ try:
+ process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
+ start_new_session=True)
+ while True:
+ data = await process.stdout.readline ()
+ if not data:
+ break
+ data = json.loads (data)
+ uuid = data.get ('uuid')
+ if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c':
+ links = set (self.policy (map (lambda x: x.with_fragment(None), data.get ('links', []))))
+ links.difference_update (self.have)
+ self.pending.update (links)
+ elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff':
+ for k in self.stats.keys ():
+ self.stats[k] += data.get (k, 0)
+ logStats ()
+ except asyncio.CancelledError:
+ # graceful cancellation
+ process.send_signal (signal.SIGINT)
+ finally:
+ code = await process.wait()
+ if code == 0:
+ # atomically move once finished
+ os.rename (dest.name, destpath)
+ else:
+ self.stats['crashed'] += 1
logStats ()
- code = await process.wait()
- if code == 0:
- # atomically move once finished
- os.rename (dest.name, destpath)
- else:
- self.stats['crashed'] += 1
- logStats ()
-
- def cancel (self):
- """ Gracefully cancel this job, waiting for existing workers to shut down """
- self.logger.info ('cancel',
- uuid='d58154c8-ec27-40f2-ab9e-e25c1b21cd88',
- pending=len (self.pending), have=len (self.have),
- running=len (self.running))
- self._quit = True
async def run (self):
def log ():
@@ -367,24 +362,31 @@ class RecursiveController:
pending=len (self.pending), have=len (self.have),
running=len (self.running))
- self.have = set ()
- self.pending = set ([self.url])
-
- while self.pending and not self._quit:
- # since pending is a set this picks a random item, which is fine
- u = self.pending.pop ()
- self.have.add (u)
- t = asyncio.ensure_future (self.fetch (u))
- self.running.add (t)
-
+ try:
+ self.have = set ()
+ self.pending = set ([self.url])
+
+ while self.pending:
+ # since pending is a set this picks a random item, which is fine
+ u = self.pending.pop ()
+ self.have.add (u)
+ t = asyncio.ensure_future (self.fetch (u))
+ self.running.add (t)
+
+ log ()
+
+ if len (self.running) >= self.concurrency or not self.pending:
+ done, pending = await asyncio.wait (self.running,
+ return_when=asyncio.FIRST_COMPLETED)
+ self.running.difference_update (done)
+ except asyncio.CancelledError:
+ self.logger.info ('cancel',
+ uuid='d58154c8-ec27-40f2-ab9e-e25c1b21cd88',
+ pending=len (self.pending), have=len (self.have),
+ running=len (self.running))
+ finally:
+ done = await asyncio.gather (*self.running,
+ return_exceptions=True)
+ self.running = set ()
log ()
- if len (self.running) >= self.concurrency or not self.pending:
- done, pending = await asyncio.wait (self.running,
- return_when=asyncio.FIRST_COMPLETED)
- self.running.difference_update (done)
-
- done = asyncio.gather (*self.running)
- self.running = set ()
- log ()
-