diff options
Diffstat (limited to 'crocoite')
| -rw-r--r-- | crocoite/cli.py | 4 | ||||
| -rw-r--r-- | crocoite/controller.py | 18 | 
2 files changed, 17 insertions, 5 deletions
| diff --git a/crocoite/cli.py b/crocoite/cli.py index 6167249..3f5904c 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -85,6 +85,7 @@ def recursive ():      parser.add_argument('--policy', help='Recursion policy', metavar='POLICY')      parser.add_argument('--tempdir', help='Directory for temporary files', metavar='DIR')      parser.add_argument('--prefix', help='Output filename prefix, supports templates {host} and {date}', metavar='FILENAME', default='{host}-{date}-') +    parser.add_argument('--concurrency', '-j', help='Run at most N jobs', metavar='N', default=1, type=int)      parser.add_argument('url', help='Seed URL', metavar='URL')      parser.add_argument('output', help='Output directory', metavar='DIR')      parser.add_argument('command', help='Fetch command, supports templates {url} and {dest}', metavar='CMD', nargs='*', default=['crocoite-grab', '{url}', '{dest}']) @@ -99,7 +100,8 @@ def recursive ():      controller = RecursiveController (url=args.url, output=args.output,              command=args.command, logger=logger, policy=policy, -            tempdir=args.tempdir, prefix=args.prefix) +            tempdir=args.tempdir, prefix=args.prefix, +            concurrency=args.concurrency)      loop = asyncio.get_event_loop()      loop.run_until_complete(controller.run ()) diff --git a/crocoite/controller.py b/crocoite/controller.py index 8c5a30c..fffd024 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -290,10 +290,10 @@ class RecursiveController:      """      __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have', -            'pending', 'stats', 'prefix', 'tempdir') +            'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency')      def __init__ (self, url, output, command, logger, prefix='{host}-{date}-', -            tempdir=None, policy=DepthLimit (0)): +            tempdir=None, policy=DepthLimit (0), concurrency=1):          self.url = url          self.output = output          self.command = command @@ -301,6 +301,10 @@ class RecursiveController:          self.logger = logger.bind (context=type(self).__name__, seedurl=url)          self.policy = policy          self.tempdir = tempdir +        # tasks currently running +        self.running = set () +        # max number of tasks running +        self.concurrency = concurrency          # keep in sync with StatsHandler          self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0} @@ -350,10 +354,16 @@ class RecursiveController:          while self.pending:              self.logger.info ('recursing',                      uuid='5b8498e4-868d-413c-a67e-004516b8452c', -                    pending=len (self.pending), have=len (self.have)) +                    pending=len (self.pending), have=len (self.have), +                    running=len (self.running))              # since pending is a set this picks a random item, which is fine              u = self.pending.pop ()              self.have.add (u) -            await self.fetch (u) +            t = asyncio.ensure_future (self.fetch (u)) +            self.running.add (t) +            if len (self.running) >= self.concurrency or not self.pending: +                done, pending = await asyncio.wait (self.running, +                        return_when=asyncio.FIRST_COMPLETED) +                self.running.difference_update (done) | 
