diff options
author | Lars-Dominik Braun <lars@6xq.net> | 2018-09-25 16:17:03 +0200 |
---|---|---|
committer | Lars-Dominik Braun <lars@6xq.net> | 2018-09-25 17:00:21 +0200 |
commit | 2ef2ed8202bd5249cda78f135d64f5add9a461ea (patch) | |
tree | 0661b9a2a70e9e7feeda51d4fa9422a38784516a /crocoite | |
parent | 329de53c7c8cea725249089df87d6cd9823f6972 (diff) | |
download | crocoite-2ef2ed8202bd5249cda78f135d64f5add9a461ea.tar.gz crocoite-2ef2ed8202bd5249cda78f135d64f5add9a461ea.tar.bz2 crocoite-2ef2ed8202bd5249cda78f135d64f5add9a461ea.zip |
Add recursive controller
Simple and sequential.
Diffstat (limited to 'crocoite')
-rw-r--r-- | crocoite/cli.py | 40 | ||||
-rw-r--r-- | crocoite/controller.py | 130 |
2 files changed, 169 insertions, 1 deletions
diff --git a/crocoite/cli.py b/crocoite/cli.py index 73ddca1..6167249 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -65,3 +65,43 @@ def single (): return True +import asyncio, os +from .controller import RecursiveController, DepthLimit, PrefixLimit + +def parsePolicy (recursive, url): + if recursive is None: + return DepthLimit (0) + elif recursive.isdigit (): + return DepthLimit (int (recursive)) + elif recursive == 'prefix': + return PrefixLimit (url) + else: + raise ValueError ('Unsupported') + +def recursive (): + logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()]) + + parser = argparse.ArgumentParser(description='Recursively run crocoite-grab.') + parser.add_argument('--policy', help='Recursion policy', metavar='POLICY') + parser.add_argument('--tempdir', help='Directory for temporary files', metavar='DIR') + parser.add_argument('--prefix', help='Output filename prefix, supports templates {host} and {date}', metavar='FILENAME', default='{host}-{date}-') + parser.add_argument('url', help='Seed URL', metavar='URL') + parser.add_argument('output', help='Output directory', metavar='DIR') + parser.add_argument('command', help='Fetch command, supports templates {url} and {dest}', metavar='CMD', nargs='*', default=['crocoite-grab', '{url}', '{dest}']) + + args = parser.parse_args () + try: + policy = parsePolicy (args.policy, args.url) + except ValueError: + parser.error ('Invalid argument for --policy') + + os.makedirs (args.output, exist_ok=True) + + controller = RecursiveController (url=args.url, output=args.output, + command=args.command, logger=logger, policy=policy, + tempdir=args.tempdir, prefix=args.prefix) + + loop = asyncio.get_event_loop() + loop.run_until_complete(controller.run ()) + loop.close() + diff --git a/crocoite/controller.py b/crocoite/controller.py index 01edc44..8c5a30c 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -96,7 +96,7 @@ import time, platform from . import behavior as cbehavior from .browser import ChromeService, SiteLoader, Item -from .util import getFormattedViewportMetrics, removeFragment, getRequirements +from .util import getFormattedViewportMetrics, getRequirements class ControllerStart: __slots__ = ('payload') @@ -229,3 +229,131 @@ class SinglePageController: processQueue () +class RecursionPolicy: + """ Abstract recursion policy """ + + __slots__ = () + + def __call__ (self, urls): + raise NotImplementedError + +class DepthLimit (RecursionPolicy): + """ + Limit recursion by depth. + + depth==0 means no recursion, depth==1 is the page and outgoing links, … + """ + + __slots__ = ('maxdepth') + + def __init__ (self, maxdepth=0): + self.maxdepth = maxdepth + + def __call__ (self, urls): + if self.maxdepth <= 0: + return {} + else: + self.maxdepth -= 1 + return urls + + def __repr__ (self): + return '<DepthLimit {}>'.format (self.maxdepth) + +class PrefixLimit (RecursionPolicy): + """ + Limit recursion by prefix + + i.e. prefix=http://example.com/foo + ignored: http://example.com/bar http://offsite.example/foo + accepted: http://example.com/foobar http://example.com/foo/bar + """ + + __slots__ = ('prefix') + + def __init__ (self, prefix): + self.prefix = prefix + + def __call__ (self, urls): + return set (filter (lambda u: u.startswith (self.prefix), urls)) + +import tempfile, asyncio, json, os +from datetime import datetime +from urllib.parse import urlparse +from .behavior import ExtractLinksEvent +from .util import removeFragment + +class RecursiveController: + """ + Simple recursive controller + + Visits links acording to policy + """ + + __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have', + 'pending', 'stats', 'prefix', 'tempdir') + + def __init__ (self, url, output, command, logger, prefix='{host}-{date}-', + tempdir=None, policy=DepthLimit (0)): + self.url = url + self.output = output + self.command = command + self.prefix = prefix + self.logger = logger.bind (context=type(self).__name__, seedurl=url) + self.policy = policy + self.tempdir = tempdir + # keep in sync with StatsHandler + self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0} + + async def fetch (self, url): + """ + Overrideable fetch action for URLs. Defaults to sequential + SinglePageController. + """ + + def formatCommand (e): + return e.format (url=url, dest=dest.name) + + def formatPrefix (p): + return p.format (host=urlparse (url).hostname, date=datetime.utcnow ().isoformat ()) + + dest = tempfile.NamedTemporaryFile (dir=self.tempdir, + prefix=formatPrefix (self.prefix), suffix='.warc.gz', + delete=False) + destpath = os.path.join (self.output, os.path.basename (dest.name)) + logger = self.logger.bind (url=url, destfile=destpath) + command = list (map (formatCommand, self.command)) + logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command) + process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL) + while True: + data = await process.stdout.readline () + if not data: + break + data = json.loads (data) + uuid = data.get ('uuid') + if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c': + links = set (self.policy (map (removeFragment, data.get ('links', [])))) + links.difference_update (self.have) + self.pending.update (links) + elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff': + for k in self.stats.keys (): + self.stats[k] += data.get (k, 0) + logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats) + code = await process.wait() + # atomically move once finished + os.rename (dest.name, destpath) + + async def run (self): + self.have = set () + self.pending = set ([self.url]) + + while self.pending: + self.logger.info ('recursing', + uuid='5b8498e4-868d-413c-a67e-004516b8452c', + pending=len (self.pending), have=len (self.have)) + + # since pending is a set this picks a random item, which is fine + u = self.pending.pop () + self.have.add (u) + await self.fetch (u) + |