summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars-Dominik Braun <lars@6xq.net>2018-09-25 16:37:21 +0200
committerLars-Dominik Braun <lars@6xq.net>2018-09-25 17:01:26 +0200
commitdcf48e1eb4f61fde83e525ddfe0850efbf1d79bd (patch)
tree207c47dbd9d09c97b254411723ee12fc4c4a1680
parent2ef2ed8202bd5249cda78f135d64f5add9a461ea (diff)
downloadcrocoite-dcf48e1eb4f61fde83e525ddfe0850efbf1d79bd.tar.gz
crocoite-dcf48e1eb4f61fde83e525ddfe0850efbf1d79bd.tar.bz2
crocoite-dcf48e1eb4f61fde83e525ddfe0850efbf1d79bd.zip
Parallelize recursive grabs
❤️ asyncio.
-rw-r--r--crocoite/cli.py4
-rw-r--r--crocoite/controller.py18
2 files changed, 17 insertions, 5 deletions
diff --git a/crocoite/cli.py b/crocoite/cli.py
index 6167249..3f5904c 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -85,6 +85,7 @@ def recursive ():
parser.add_argument('--policy', help='Recursion policy', metavar='POLICY')
parser.add_argument('--tempdir', help='Directory for temporary files', metavar='DIR')
parser.add_argument('--prefix', help='Output filename prefix, supports templates {host} and {date}', metavar='FILENAME', default='{host}-{date}-')
+ parser.add_argument('--concurrency', '-j', help='Run at most N jobs', metavar='N', default=1, type=int)
parser.add_argument('url', help='Seed URL', metavar='URL')
parser.add_argument('output', help='Output directory', metavar='DIR')
parser.add_argument('command', help='Fetch command, supports templates {url} and {dest}', metavar='CMD', nargs='*', default=['crocoite-grab', '{url}', '{dest}'])
@@ -99,7 +100,8 @@ def recursive ():
controller = RecursiveController (url=args.url, output=args.output,
command=args.command, logger=logger, policy=policy,
- tempdir=args.tempdir, prefix=args.prefix)
+ tempdir=args.tempdir, prefix=args.prefix,
+ concurrency=args.concurrency)
loop = asyncio.get_event_loop()
loop.run_until_complete(controller.run ())
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 8c5a30c..fffd024 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -290,10 +290,10 @@ class RecursiveController:
"""
__slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
- 'pending', 'stats', 'prefix', 'tempdir')
+ 'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency')
def __init__ (self, url, output, command, logger, prefix='{host}-{date}-',
- tempdir=None, policy=DepthLimit (0)):
+ tempdir=None, policy=DepthLimit (0), concurrency=1):
self.url = url
self.output = output
self.command = command
@@ -301,6 +301,10 @@ class RecursiveController:
self.logger = logger.bind (context=type(self).__name__, seedurl=url)
self.policy = policy
self.tempdir = tempdir
+ # tasks currently running
+ self.running = set ()
+ # max number of tasks running
+ self.concurrency = concurrency
# keep in sync with StatsHandler
self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0}
@@ -350,10 +354,16 @@ class RecursiveController:
while self.pending:
self.logger.info ('recursing',
uuid='5b8498e4-868d-413c-a67e-004516b8452c',
- pending=len (self.pending), have=len (self.have))
+ pending=len (self.pending), have=len (self.have),
+ running=len (self.running))
# since pending is a set this picks a random item, which is fine
u = self.pending.pop ()
self.have.add (u)
- await self.fetch (u)
+ t = asyncio.ensure_future (self.fetch (u))
+ self.running.add (t)
+ if len (self.running) >= self.concurrency or not self.pending:
+ done, pending = await asyncio.wait (self.running,
+ return_when=asyncio.FIRST_COMPLETED)
+ self.running.difference_update (done)