From 2ef2ed8202bd5249cda78f135d64f5add9a461ea Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Tue, 25 Sep 2018 16:17:03 +0200 Subject: Add recursive controller Simple and sequential. --- crocoite/controller.py | 130 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 129 insertions(+), 1 deletion(-) (limited to 'crocoite/controller.py') diff --git a/crocoite/controller.py b/crocoite/controller.py index 01edc44..8c5a30c 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -96,7 +96,7 @@ import time, platform from . import behavior as cbehavior from .browser import ChromeService, SiteLoader, Item -from .util import getFormattedViewportMetrics, removeFragment, getRequirements +from .util import getFormattedViewportMetrics, getRequirements class ControllerStart: __slots__ = ('payload') @@ -229,3 +229,131 @@ class SinglePageController: processQueue () +class RecursionPolicy: + """ Abstract recursion policy """ + + __slots__ = () + + def __call__ (self, urls): + raise NotImplementedError + +class DepthLimit (RecursionPolicy): + """ + Limit recursion by depth. + + depth==0 means no recursion, depth==1 is the page and outgoing links, … + """ + + __slots__ = ('maxdepth') + + def __init__ (self, maxdepth=0): + self.maxdepth = maxdepth + + def __call__ (self, urls): + if self.maxdepth <= 0: + return {} + else: + self.maxdepth -= 1 + return urls + + def __repr__ (self): + return ''.format (self.maxdepth) + +class PrefixLimit (RecursionPolicy): + """ + Limit recursion by prefix + + i.e. prefix=http://example.com/foo + ignored: http://example.com/bar http://offsite.example/foo + accepted: http://example.com/foobar http://example.com/foo/bar + """ + + __slots__ = ('prefix') + + def __init__ (self, prefix): + self.prefix = prefix + + def __call__ (self, urls): + return set (filter (lambda u: u.startswith (self.prefix), urls)) + +import tempfile, asyncio, json, os +from datetime import datetime +from urllib.parse import urlparse +from .behavior import ExtractLinksEvent +from .util import removeFragment + +class RecursiveController: + """ + Simple recursive controller + + Visits links acording to policy + """ + + __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have', + 'pending', 'stats', 'prefix', 'tempdir') + + def __init__ (self, url, output, command, logger, prefix='{host}-{date}-', + tempdir=None, policy=DepthLimit (0)): + self.url = url + self.output = output + self.command = command + self.prefix = prefix + self.logger = logger.bind (context=type(self).__name__, seedurl=url) + self.policy = policy + self.tempdir = tempdir + # keep in sync with StatsHandler + self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0} + + async def fetch (self, url): + """ + Overrideable fetch action for URLs. Defaults to sequential + SinglePageController. + """ + + def formatCommand (e): + return e.format (url=url, dest=dest.name) + + def formatPrefix (p): + return p.format (host=urlparse (url).hostname, date=datetime.utcnow ().isoformat ()) + + dest = tempfile.NamedTemporaryFile (dir=self.tempdir, + prefix=formatPrefix (self.prefix), suffix='.warc.gz', + delete=False) + destpath = os.path.join (self.output, os.path.basename (dest.name)) + logger = self.logger.bind (url=url, destfile=destpath) + command = list (map (formatCommand, self.command)) + logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command) + process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL) + while True: + data = await process.stdout.readline () + if not data: + break + data = json.loads (data) + uuid = data.get ('uuid') + if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c': + links = set (self.policy (map (removeFragment, data.get ('links', [])))) + links.difference_update (self.have) + self.pending.update (links) + elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff': + for k in self.stats.keys (): + self.stats[k] += data.get (k, 0) + logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats) + code = await process.wait() + # atomically move once finished + os.rename (dest.name, destpath) + + async def run (self): + self.have = set () + self.pending = set ([self.url]) + + while self.pending: + self.logger.info ('recursing', + uuid='5b8498e4-868d-413c-a67e-004516b8452c', + pending=len (self.pending), have=len (self.have)) + + # since pending is a set this picks a random item, which is fine + u = self.pending.pop () + self.have.add (u) + await self.fetch (u) + -- cgit v1.2.3