From 2ef2ed8202bd5249cda78f135d64f5add9a461ea Mon Sep 17 00:00:00 2001
From: Lars-Dominik Braun <lars@6xq.net>
Date: Tue, 25 Sep 2018 16:17:03 +0200
Subject: Add recursive controller

Simple and sequential: RecursiveController spawns one fetch subprocess
per URL and feeds newly discovered links back into the pending set,
subject to a pluggable recursion policy (depth- or prefix-based).
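
Example invocation, using the new crocoite-recursive entry point with
the prefix policy and the default fetch command (crocoite-grab):

    crocoite-recursive --policy prefix http://example.com/foo/ out/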
---
 crocoite/cli.py        |  41 ++++++++++++++++
 crocoite/controller.py | 136 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 setup.py               |   1 +
 3 files changed, 177 insertions(+), 1 deletion(-)

diff --git a/crocoite/cli.py b/crocoite/cli.py
index 73ddca1..6167249 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -65,3 +65,44 @@ def single ():
 
     return True
 
+import asyncio, os
+from .controller import RecursiveController, DepthLimit, PrefixLimit
+
+def parsePolicy (recursive, url):
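+    """ Map the --policy argument to a RecursionPolicy: digits select DepthLimit, 'prefix' selects PrefixLimit """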
+    if recursive is None:
+        return DepthLimit (0)
+    elif recursive.isdigit ():
+        return DepthLimit (int (recursive))
+    elif recursive == 'prefix':
+        return PrefixLimit (url)
+    else:
+        raise ValueError ('Unsupported recursion policy')
+
+def recursive ():
+    logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
+
+    parser = argparse.ArgumentParser(description='Recursively run crocoite-grab.')
+    parser.add_argument('--policy', help="Recursion policy: maximum depth (a number) or 'prefix'", metavar='POLICY')
+    parser.add_argument('--tempdir', help='Directory for temporary files', metavar='DIR')
+    parser.add_argument('--prefix', help='Output filename prefix, supports templates {host} and {date}', metavar='FILENAME', default='{host}-{date}-')
+    parser.add_argument('url', help='Seed URL', metavar='URL')
+    parser.add_argument('output', help='Output directory', metavar='DIR')
+    parser.add_argument('command', help='Fetch command, supports templates {url} and {dest}', metavar='CMD', nargs='*', default=['crocoite-grab', '{url}', '{dest}'])
+
+    args = parser.parse_args ()
+    try:
+        policy = parsePolicy (args.policy, args.url)
+    except ValueError:
+        parser.error ('Invalid argument for --policy')
+
+    os.makedirs (args.output, exist_ok=True)
+
+    controller = RecursiveController (url=args.url, output=args.output,
+            command=args.command, logger=logger, policy=policy,
+            tempdir=args.tempdir, prefix=args.prefix)
+
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(controller.run ())
+    loop.close()
+
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 01edc44..8c5a30c 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -96,7 +96,7 @@ import time, platform
 
 from . import behavior as cbehavior
 from .browser import ChromeService, SiteLoader, Item
-from .util import getFormattedViewportMetrics, removeFragment, getRequirements
+from .util import getFormattedViewportMetrics, getRequirements
 
 class ControllerStart:
     __slots__ = ('payload')
@@ -229,3 +229,137 @@ class SinglePageController:
 
             processQueue ()
 
+class RecursionPolicy:
+    """ Abstract recursion policy """
+
+    __slots__ = ()
+
+    def __call__ (self, urls):
+        raise NotImplementedError
+
+class DepthLimit (RecursionPolicy):
+    """
+    Limit recursion by depth.
+
+    depth==0 means no recursion, depth==1 is the page and outgoing links, …
+    """
+
+    __slots__ = ('maxdepth', )
+
+    def __init__ (self, maxdepth=0):
+        self.maxdepth = maxdepth
+
+    def __call__ (self, urls):
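+        # each call (one per fetched page) consumes one level, so with
+        # branching this is closer to a page budget than a strict depth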
+        if self.maxdepth <= 0:
+            return set ()
+        else:
+            self.maxdepth -= 1
+            return urls
+
+    def __repr__ (self):
+        return '<DepthLimit {}>'.format (self.maxdepth)
+
+class PrefixLimit (RecursionPolicy):
+    """
+    Limit recursion by prefix.
+
+    e.g. prefix=http://example.com/foo
+    ignored: http://example.com/bar http://offsite.example/foo
+    accepted: http://example.com/foobar http://example.com/foo/bar
+    """
+
+    __slots__ = ('prefix', )
+
+    def __init__ (self, prefix):
+        self.prefix = prefix
+
+    def __call__ (self, urls):
+        return set (filter (lambda u: u.startswith (self.prefix), urls))
+
+import tempfile, asyncio, json, os
+from datetime import datetime
+from urllib.parse import urlparse
+from .behavior import ExtractLinksEvent
+from .util import removeFragment
+
+class RecursiveController:
+    """
+    Simple recursive controller
+
+    Visits links according to the given recursion policy.
+    """
+
+    __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
+            'pending', 'stats', 'prefix', 'tempdir')
+
+    def __init__ (self, url, output, command, logger, prefix='{host}-{date}-',
+            tempdir=None, policy=DepthLimit (0)):
+        self.url = url
+        self.output = output
+        self.command = command
+        self.prefix = prefix
+        self.logger = logger.bind (context=type(self).__name__, seedurl=url)
+        self.policy = policy
+        self.tempdir = tempdir
+        # keep in sync with StatsHandler
+        self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0}
+
+    async def fetch (self, url):
+        """
+        Overridable fetch action for a single URL. The default runs the
+        configured fetch command (crocoite-grab) as a subprocess.
+        """
+
+        def formatCommand (e):
+            return e.format (url=url, dest=dest.name)
+
+        def formatPrefix (p):
+            return p.format (host=urlparse (url).hostname, date=datetime.utcnow ().isoformat ())
+
+        dest = tempfile.NamedTemporaryFile (dir=self.tempdir,
+                prefix=formatPrefix (self.prefix), suffix='.warc.gz',
+                delete=False)
+        destpath = os.path.join (self.output, os.path.basename (dest.name))
+        logger = self.logger.bind (url=url, destfile=destpath)
+        command = list (map (formatCommand, self.command))
+        logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command)
+        process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL)
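+        # the fetch command logs one JSON object per line on stdout;
+        # watch for known event uuids (extracted links, page statistics)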
+        while True:
+            data = await process.stdout.readline ()
+            if not data:
+                break
+            data = json.loads (data)
+            uuid = data.get ('uuid')
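+            # links extracted from the page, cf. ExtractLinksEvent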
+            if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c':
+                links = set (self.policy (map (removeFragment, data.get ('links', []))))
+                links.difference_update (self.have)
+                self.pending.update (links)
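+            # per-page statistics; keep keys in sync with StatsHandler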
+            elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff':
+                for k in self.stats.keys ():
+                    self.stats[k] += data.get (k, 0)
+                logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats)
+        await process.wait ()
+        # move once finished; os.rename is atomic, but requires tempdir
+        # and output to be on the same filesystem
+        os.rename (dest.name, destpath)
+
+    async def run (self):
+        self.have = set ()
+        self.pending = set ([self.url])
+
+        while self.pending:
+            self.logger.info ('recursing',
+                    uuid='5b8498e4-868d-413c-a67e-004516b8452c',
+                    pending=len (self.pending), have=len (self.have))
+
+            # since pending is a set this picks a random item, which is fine
+            u = self.pending.pop ()
+            self.have.add (u)
+            await self.fetch (u)
+
diff --git a/setup.py b/setup.py
index e2357ca..6f6b255 100644
--- a/setup.py
+++ b/setup.py
@@ -17,6 +17,7 @@ setup(
     entry_points={
     'console_scripts': [
             'crocoite-grab = crocoite.cli:single',
+            'crocoite-recursive = crocoite.cli:recursive',
             'crocoite-merge-warc = crocoite.tools:mergeWarc',
             'crocoite-extract-screenshot = crocoite.tools:extractScreenshot',
             ],
-- 
cgit v1.2.3