summaryrefslogtreecommitdiff
path: root/crocoite/controller.py
diff options
context:
space:
mode:
authorLars-Dominik Braun <lars@6xq.net>2018-09-25 16:17:03 +0200
committerLars-Dominik Braun <lars@6xq.net>2018-09-25 17:00:21 +0200
commit2ef2ed8202bd5249cda78f135d64f5add9a461ea (patch)
tree0661b9a2a70e9e7feeda51d4fa9422a38784516a /crocoite/controller.py
parent329de53c7c8cea725249089df87d6cd9823f6972 (diff)
downloadcrocoite-2ef2ed8202bd5249cda78f135d64f5add9a461ea.tar.gz
crocoite-2ef2ed8202bd5249cda78f135d64f5add9a461ea.tar.bz2
crocoite-2ef2ed8202bd5249cda78f135d64f5add9a461ea.zip
Add recursive controller
Simple and sequential.
Diffstat (limited to 'crocoite/controller.py')
-rw-r--r--crocoite/controller.py130
1 files changed, 129 insertions, 1 deletions
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 01edc44..8c5a30c 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -96,7 +96,7 @@ import time, platform
from . import behavior as cbehavior
from .browser import ChromeService, SiteLoader, Item
-from .util import getFormattedViewportMetrics, removeFragment, getRequirements
+from .util import getFormattedViewportMetrics, getRequirements
class ControllerStart:
__slots__ = ('payload')
@@ -229,3 +229,131 @@ class SinglePageController:
processQueue ()
+class RecursionPolicy:
+ """ Abstract recursion policy """
+
+ __slots__ = ()
+
+ def __call__ (self, urls):
+ raise NotImplementedError
+
+class DepthLimit (RecursionPolicy):
+ """
+ Limit recursion by depth.
+
+ depth==0 means no recursion, depth==1 is the page and outgoing links, …
+ """
+
+ __slots__ = ('maxdepth')
+
+ def __init__ (self, maxdepth=0):
+ self.maxdepth = maxdepth
+
+ def __call__ (self, urls):
+ if self.maxdepth <= 0:
+ return {}
+ else:
+ self.maxdepth -= 1
+ return urls
+
+ def __repr__ (self):
+ return '<DepthLimit {}>'.format (self.maxdepth)
+
+class PrefixLimit (RecursionPolicy):
+ """
+ Limit recursion by prefix
+
+ i.e. prefix=http://example.com/foo
+ ignored: http://example.com/bar http://offsite.example/foo
+ accepted: http://example.com/foobar http://example.com/foo/bar
+ """
+
+ __slots__ = ('prefix')
+
+ def __init__ (self, prefix):
+ self.prefix = prefix
+
+ def __call__ (self, urls):
+ return set (filter (lambda u: u.startswith (self.prefix), urls))
+
+import tempfile, asyncio, json, os
+from datetime import datetime
+from urllib.parse import urlparse
+from .behavior import ExtractLinksEvent
+from .util import removeFragment
+
+class RecursiveController:
+ """
+ Simple recursive controller
+
+ Visits links acording to policy
+ """
+
+ __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
+ 'pending', 'stats', 'prefix', 'tempdir')
+
+ def __init__ (self, url, output, command, logger, prefix='{host}-{date}-',
+ tempdir=None, policy=DepthLimit (0)):
+ self.url = url
+ self.output = output
+ self.command = command
+ self.prefix = prefix
+ self.logger = logger.bind (context=type(self).__name__, seedurl=url)
+ self.policy = policy
+ self.tempdir = tempdir
+ # keep in sync with StatsHandler
+ self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0}
+
+ async def fetch (self, url):
+ """
+ Overrideable fetch action for URLs. Defaults to sequential
+ SinglePageController.
+ """
+
+ def formatCommand (e):
+ return e.format (url=url, dest=dest.name)
+
+ def formatPrefix (p):
+ return p.format (host=urlparse (url).hostname, date=datetime.utcnow ().isoformat ())
+
+ dest = tempfile.NamedTemporaryFile (dir=self.tempdir,
+ prefix=formatPrefix (self.prefix), suffix='.warc.gz',
+ delete=False)
+ destpath = os.path.join (self.output, os.path.basename (dest.name))
+ logger = self.logger.bind (url=url, destfile=destpath)
+ command = list (map (formatCommand, self.command))
+ logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command)
+ process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL)
+ while True:
+ data = await process.stdout.readline ()
+ if not data:
+ break
+ data = json.loads (data)
+ uuid = data.get ('uuid')
+ if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c':
+ links = set (self.policy (map (removeFragment, data.get ('links', []))))
+ links.difference_update (self.have)
+ self.pending.update (links)
+ elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff':
+ for k in self.stats.keys ():
+ self.stats[k] += data.get (k, 0)
+ logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats)
+ code = await process.wait()
+ # atomically move once finished
+ os.rename (dest.name, destpath)
+
+ async def run (self):
+ self.have = set ()
+ self.pending = set ([self.url])
+
+ while self.pending:
+ self.logger.info ('recursing',
+ uuid='5b8498e4-868d-413c-a67e-004516b8452c',
+ pending=len (self.pending), have=len (self.have))
+
+ # since pending is a set this picks a random item, which is fine
+ u = self.pending.pop ()
+ self.have.add (u)
+ await self.fetch (u)
+