author    Lars-Dominik Braun <lars@6xq.net>    2018-09-25 16:17:03 +0200
committer Lars-Dominik Braun <lars@6xq.net>    2018-09-25 17:00:21 +0200
commit    2ef2ed8202bd5249cda78f135d64f5add9a461ea (patch)
tree      0661b9a2a70e9e7feeda51d4fa9422a38784516a /crocoite
parent    329de53c7c8cea725249089df87d6cd9823f6972 (diff)
Add recursive controller
Simple and sequential.
Diffstat (limited to 'crocoite')
-rw-r--r-- crocoite/cli.py        |  40
-rw-r--r-- crocoite/controller.py | 130
2 files changed, 169 insertions(+), 1 deletion(-)
diff --git a/crocoite/cli.py b/crocoite/cli.py
index 73ddca1..6167249 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -65,3 +65,43 @@ def single ():
     return True
+import asyncio, os
+from .controller import RecursiveController, DepthLimit, PrefixLimit
+
+def parsePolicy (recursive, url):
+    if recursive is None:
+        return DepthLimit (0)
+    elif recursive.isdigit ():
+        return DepthLimit (int (recursive))
+    elif recursive == 'prefix':
+        return PrefixLimit (url)
+    else:
+        raise ValueError ('Unsupported')
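For reference, the mapping implemented above, as a sketch (the URL is just an example):

    parsePolicy (None, 'http://example.com/')      # DepthLimit (0), no recursion
    parsePolicy ('2', 'http://example.com/')       # DepthLimit (2)
    parsePolicy ('prefix', 'http://example.com/')  # PrefixLimit ('http://example.com/')
    parsePolicy ('deep', 'http://example.com/')    # raises ValueError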
+
+def recursive ():
+    logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
+
+    parser = argparse.ArgumentParser(description='Recursively run crocoite-grab.')
+    parser.add_argument('--policy', help='Recursion policy', metavar='POLICY')
+    parser.add_argument('--tempdir', help='Directory for temporary files', metavar='DIR')
+    parser.add_argument('--prefix', help='Output filename prefix, supports templates {host} and {date}', metavar='FILENAME', default='{host}-{date}-')
+    parser.add_argument('url', help='Seed URL', metavar='URL')
+    parser.add_argument('output', help='Output directory', metavar='DIR')
+    parser.add_argument('command', help='Fetch command, supports templates {url} and {dest}', metavar='CMD', nargs='*', default=['crocoite-grab', '{url}', '{dest}'])
+
+    args = parser.parse_args ()
+    try:
+        policy = parsePolicy (args.policy, args.url)
+    except ValueError:
+        parser.error ('Invalid argument for --policy')
+
+    os.makedirs (args.output, exist_ok=True)
+
+    controller = RecursiveController (url=args.url, output=args.output,
+            command=args.command, logger=logger, policy=policy,
+            tempdir=args.tempdir, prefix=args.prefix)
+
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(controller.run ())
+    loop.close()
+
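The console-script name for recursive() is not part of this diff; assuming it is exposed as crocoite-recursive, an invocation matching the defaults above might look like:

    crocoite-recursive --policy prefix http://example.com/foo/ /var/archive \
        crocoite-grab '{url}' '{dest}'

Without --policy the parser falls back to DepthLimit (0), fetching only the seed URL.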
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 01edc44..8c5a30c 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -96,7 +96,7 @@ import time, platform
 from . import behavior as cbehavior
 from .browser import ChromeService, SiteLoader, Item
-from .util import getFormattedViewportMetrics, removeFragment, getRequirements
+from .util import getFormattedViewportMetrics, getRequirements
 
 class ControllerStart:
     __slots__ = ('payload')
@@ -229,3 +229,131 @@ class SinglePageController:
         processQueue ()
+class RecursionPolicy:
+    """ Abstract recursion policy """
+
+    __slots__ = ()
+
+    def __call__ (self, urls):
+        raise NotImplementedError
+
+class DepthLimit (RecursionPolicy):
+    """
+    Limit recursion by depth.
+
+    depth==0 means no recursion, depth==1 is the page and outgoing links, …
+    """
+
+    __slots__ = ('maxdepth')
+
+    def __init__ (self, maxdepth=0):
+        self.maxdepth = maxdepth
+
+    def __call__ (self, urls):
+        if self.maxdepth <= 0:
+            return {}
+        else:
+            self.maxdepth -= 1
+            return urls
+
+    def __repr__ (self):
+        return '<DepthLimit {}>'.format (self.maxdepth)
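Note that __call__ decrements maxdepth once per invocation, i.e. once per crawled page, not once per link level. A minimal sketch of the consequence (URLs hypothetical):

    policy = DepthLimit (2)
    policy ({'http://example.com/a'})  # accepted, maxdepth is now 1
    policy ({'http://example.com/b'})  # accepted, maxdepth is now 0
    policy ({'http://example.com/c'})  # rejected, even if this page is only
                                       # one hop from the seed

For maxdepth values of 0 and 1 this matches true depth limiting; beyond that it effectively caps how many pages have their links followed.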
+
+class PrefixLimit (RecursionPolicy):
+    """
+    Limit recursion by prefix
+
+    i.e. prefix=http://example.com/foo
+    ignored: http://example.com/bar http://offsite.example/foo
+    accepted: http://example.com/foobar http://example.com/foo/bar
+    """
+
+    __slots__ = ('prefix')
+
+    def __init__ (self, prefix):
+        self.prefix = prefix
+
+    def __call__ (self, urls):
+        return set (filter (lambda u: u.startswith (self.prefix), urls))
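The docstring examples as an executable sketch:

    policy = PrefixLimit ('http://example.com/foo')
    policy (['http://example.com/foobar', 'http://example.com/foo/bar',
            'http://example.com/bar', 'http://offsite.example/foo'])
    # -> {'http://example.com/foobar', 'http://example.com/foo/bar'}

Unlike DepthLimit, this policy is stateless: the crawl is bounded by URL namespace rather than by distance from the seed.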
+
+import tempfile, asyncio, json, os
+from datetime import datetime
+from urllib.parse import urlparse
+from .behavior import ExtractLinksEvent
+from .util import removeFragment
+
+class RecursiveController:
+    """
+    Simple recursive controller
+
+    Visits links according to policy
+    """
+
+    __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
+            'pending', 'stats', 'prefix', 'tempdir')
+
+    def __init__ (self, url, output, command, logger, prefix='{host}-{date}-',
+            tempdir=None, policy=DepthLimit (0)):
+        self.url = url
+        self.output = output
+        self.command = command
+        self.prefix = prefix
+        self.logger = logger.bind (context=type(self).__name__, seedurl=url)
+        self.policy = policy
+        self.tempdir = tempdir
+        # keep in sync with StatsHandler
+        self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0}
+
+    async def fetch (self, url):
+        """
+        Overridable fetch action for URLs. Defaults to running self.command
+        (crocoite-grab, a sequential SinglePageController) as a subprocess.
+        """
+
+        def formatCommand (e):
+            return e.format (url=url, dest=dest.name)
+
+        def formatPrefix (p):
+            return p.format (host=urlparse (url).hostname, date=datetime.utcnow ().isoformat ())
+
+        dest = tempfile.NamedTemporaryFile (dir=self.tempdir,
+                prefix=formatPrefix (self.prefix), suffix='.warc.gz',
+                delete=False)
+        destpath = os.path.join (self.output, os.path.basename (dest.name))
+        logger = self.logger.bind (url=url, destfile=destpath)
+        command = list (map (formatCommand, self.command))
+        logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command)
+        process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL)
+        while True:
+            data = await process.stdout.readline ()
+            if not data:
+                break
+            data = json.loads (data)
+            uuid = data.get ('uuid')
+            if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c':
+                links = set (self.policy (map (removeFragment, data.get ('links', []))))
+                links.difference_update (self.have)
+                self.pending.update (links)
+            elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff':
+                for k in self.stats.keys ():
+                    self.stats[k] += data.get (k, 0)
+                logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats)
+        code = await process.wait()
+        # atomically move once finished
+        os.rename (dest.name, destpath)
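fetch () consumes a line-oriented JSON protocol on the child's stdout: each line is one JSON object, dispatched on its uuid field. From the parsing above, the two recognized events are a link report and a stats report; plausible lines (all values invented) would be:

    {"uuid": "8ee5e9c9-1130-4c5c-88ff-718508546e0c", "links": ["http://example.com/a#top"]}
    {"uuid": "24d92d16-770e-4088-b769-4020e127a7ff", "requests": 12, "finished": 11, "failed": 1, "bytesRcv": 48213, "crashed": 0}

Fragments are stripped via removeFragment and URLs already in self.have are discarded before queuing, so pages are not fetched twice. The final os.rename is atomic only if tempdir and the output directory are on the same filesystem.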
+
+    async def run (self):
+        self.have = set ()
+        self.pending = set ([self.url])
+
+        while self.pending:
+            self.logger.info ('recursing',
+                    uuid='5b8498e4-868d-413c-a67e-004516b8452c',
+                    pending=len (self.pending), have=len (self.have))
+
+            # since pending is a set this picks a random item, which is fine
+            u = self.pending.pop ()
+            self.have.add (u)
+            await self.fetch (u)
+
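run () awaits each fetch before popping the next URL, so the crawl is strictly sequential, as the commit message says. Because fetch () is documented as overridable, a subclass could introduce concurrency; a rough sketch (ConcurrentController and the batch size of four are hypothetical, not part of this commit):

    class ConcurrentController (RecursiveController):
        async def run (self):
            self.have = set ()
            self.pending = set ([self.url])
            while self.pending:
                # drain up to four URLs and fetch them concurrently
                batch = [self.pending.pop () for _ in range (min (4, len (self.pending)))]
                self.have.update (batch)
                await asyncio.gather (*map (self.fetch, batch))

A stateful policy such as DepthLimit would then be shared across concurrent fetches, further blurring its depth semantics.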