Diffstat (limited to 'crocoite')
 crocoite/cli.py        |  40 +
 crocoite/controller.py | 130 +-
 2 files changed, 169 insertions(+), 1 deletion(-)
diff --git a/crocoite/cli.py b/crocoite/cli.py
index 73ddca1..6167249 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -65,3 +65,43 @@ def single ():
     return True
 
 
+import asyncio, os
+from .controller import RecursiveController, DepthLimit, PrefixLimit
+
+def parsePolicy (recursive, url):
+    if recursive is None:
+        return DepthLimit (0)
+    elif recursive.isdigit ():
+        return DepthLimit (int (recursive))
+    elif recursive == 'prefix':
+        return PrefixLimit (url)
+    else:
+        raise ValueError ('Unsupported')
+
+def recursive ():
+    logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
+
+    parser = argparse.ArgumentParser(description='Recursively run crocoite-grab.')
+    parser.add_argument('--policy', help='Recursion policy', metavar='POLICY')
+    parser.add_argument('--tempdir', help='Directory for temporary files', metavar='DIR')
+    parser.add_argument('--prefix', help='Output filename prefix, supports templates {host} and {date}', metavar='FILENAME', default='{host}-{date}-')
+    parser.add_argument('url', help='Seed URL', metavar='URL')
+    parser.add_argument('output', help='Output directory', metavar='DIR')
+    parser.add_argument('command', help='Fetch command, supports templates {url} and {dest}', metavar='CMD', nargs='*', default=['crocoite-grab', '{url}', '{dest}'])
+
+    args = parser.parse_args ()
+    try:
+        policy = parsePolicy (args.policy, args.url)
+    except ValueError:
+        parser.error ('Invalid argument for --policy')
+
+    os.makedirs (args.output, exist_ok=True)
+
+    controller = RecursiveController (url=args.url, output=args.output,
+            command=args.command, logger=logger, policy=policy,
+            tempdir=args.tempdir, prefix=args.prefix)
+
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(controller.run ())
+    loop.close()
+
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 01edc44..8c5a30c 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -96,7 +96,7 @@ import time, platform
 
 from . import behavior as cbehavior
 from .browser import ChromeService, SiteLoader, Item
-from .util import getFormattedViewportMetrics, removeFragment, getRequirements
+from .util import getFormattedViewportMetrics, getRequirements
 
 class ControllerStart:
     __slots__ = ('payload')
@@ -229,3 +229,131 @@ class SinglePageController:
             processQueue ()
 
 
+class RecursionPolicy:
+    """ Abstract recursion policy """
+
+    __slots__ = ()
+
+    def __call__ (self, urls):
+        raise NotImplementedError
+
+class DepthLimit (RecursionPolicy):
+    """
+    Limit recursion by depth.
+
+    depth==0 means no recursion, depth==1 is the page and outgoing links, …
+    """
+
+    __slots__ = ('maxdepth')
+
+    def __init__ (self, maxdepth=0):
+        self.maxdepth = maxdepth
+
+    def __call__ (self, urls):
+        if self.maxdepth <= 0:
+            return {}
+        else:
+            self.maxdepth -= 1
+            return urls
+
+    def __repr__ (self):
+        return '<DepthLimit {}>'.format (self.maxdepth)
+
+class PrefixLimit (RecursionPolicy):
+    """
+    Limit recursion by prefix
+
+    e.g. prefix=http://example.com/foo
+    ignored: http://example.com/bar http://offsite.example/foo
+    accepted: http://example.com/foobar http://example.com/foo/bar
+    """
+
+    __slots__ = ('prefix')
+
+    def __init__ (self, prefix):
+        self.prefix = prefix
+
+    def __call__ (self, urls):
+        return set (filter (lambda u: u.startswith (self.prefix), urls))
+
+import tempfile, asyncio, json, os
+from datetime import datetime
+from urllib.parse import urlparse
+from .behavior import ExtractLinksEvent
+from .util import removeFragment
+
+class RecursiveController:
+    """
+    Simple recursive controller
+
+    Visits links according to policy
+    """
+
+    __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
+            'pending', 'stats', 'prefix', 'tempdir')
+
+    def __init__ (self, url, output, command, logger, prefix='{host}-{date}-',
+            tempdir=None, policy=DepthLimit (0)):
+        self.url = url
+        self.output = output
+        self.command = command
+        self.prefix = prefix
+        self.logger = logger.bind (context=type(self).__name__, seedurl=url)
+        self.policy = policy
+        self.tempdir = tempdir
+        # keep in sync with StatsHandler
+        self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0}
+
+    async def fetch (self, url):
+        """
+        Overrideable fetch action for URLs. Defaults to sequential
+        SinglePageController.
+        """
+
+        def formatCommand (e):
+            return e.format (url=url, dest=dest.name)
+
+        def formatPrefix (p):
+            return p.format (host=urlparse (url).hostname, date=datetime.utcnow ().isoformat ())
+
+        dest = tempfile.NamedTemporaryFile (dir=self.tempdir,
+                prefix=formatPrefix (self.prefix), suffix='.warc.gz',
+                delete=False)
+        destpath = os.path.join (self.output, os.path.basename (dest.name))
+        logger = self.logger.bind (url=url, destfile=destpath)
+        command = list (map (formatCommand, self.command))
+        logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command)
+        process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL)
+        while True:
+            data = await process.stdout.readline ()
+            if not data:
+                break
+            data = json.loads (data)
+            uuid = data.get ('uuid')
+            if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c':
+                links = set (self.policy (map (removeFragment, data.get ('links', []))))
+                links.difference_update (self.have)
+                self.pending.update (links)
+            elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff':
+                for k in self.stats.keys ():
+                    self.stats[k] += data.get (k, 0)
+                logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats)
+        code = await process.wait()
+        # atomically move once finished
+        os.rename (dest.name, destpath)
+
+    async def run (self):
+        self.have = set ()
+        self.pending = set ([self.url])
+
+        while self.pending:
+            self.logger.info ('recursing',
+                    uuid='5b8498e4-868d-413c-a67e-004516b8452c',
+                    pending=len (self.pending), have=len (self.have))
+
+            # since pending is a set this picks a random item, which is fine
+            u = self.pending.pop ()
+            self.have.add (u)
+            await self.fetch (u)
+
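For reference, parsePolicy above maps the --policy argument onto the new policy
classes like this (a minimal sketch; the import path follows the diff, the
example URL is arbitrary):

from crocoite.cli import parsePolicy

parsePolicy (None, 'http://example.com/')      # DepthLimit(0): no recursion
parsePolicy ('2', 'http://example.com/')      # DepthLimit(2): seed, its links, their links
parsePolicy ('prefix', 'http://example.com/') # PrefixLimit('http://example.com/')
parsePolicy ('bogus', 'http://example.com/')  # ValueError, reported via parser.error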
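The two policies behave quite differently: PrefixLimit is a pure filter, while
DepthLimit is stateful and its counter drops on every call, which fetch issues
once per link event. A short sketch of both, with the URLs taken from the
PrefixLimit docstring:

from crocoite.controller import DepthLimit, PrefixLimit

urls = {'http://example.com/foobar', 'http://example.com/bar',
        'http://offsite.example/foo', 'http://example.com/foo/bar'}

p = PrefixLimit ('http://example.com/foo')
assert p (urls) == {'http://example.com/foobar', 'http://example.com/foo/bar'}

d = DepthLimit (1)
assert d (urls) == urls   # first call: maxdepth goes 1 -> 0, URLs pass through
assert not d (urls)       # maxdepth exhausted, nothing is followed any more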
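fetch talks to the grab subprocess over a line-based JSON protocol: every
stdout line is parsed and dispatched on its uuid field. Below is a
hypothetical stand-in for the fetch command illustrating that protocol; the
two uuids and the field names are copied from the code above, the argument
handling and dummy values are made up. Note the temp file named by {dest} is
renamed into the output directory by the controller whether or not the
command writes a WARC to it.

#!/usr/bin/env python3
import json, sys

url, dest = sys.argv[1], sys.argv[2]   # filled in from the {url} and {dest} templates

# link event: 'links' is passed through removeFragment and the recursion
# policy, and unseen URLs end up in the controller's pending set
print (json.dumps ({
    'uuid': '8ee5e9c9-1130-4c5c-88ff-718508546e0c',
    'links': [url + 'child#frag'],
}), flush=True)

# stats event: every counter is added to the controller's running totals
print (json.dumps ({
    'uuid': '24d92d16-770e-4088-b769-4020e127a7ff',
    'requests': 1, 'finished': 1, 'failed': 0, 'bytesRcv': 0, 'crashed': 0,
}), flush=True)

Pointing the positional command argument at such a script (hypothetically,
['python3', 'fake-grab.py', '{url}', '{dest}']) exercises the recursion logic
without starting a browser.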
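Since fetch is documented as overrideable, a subclass can swap out the
subprocess step entirely. A sketch, with the class invented for illustration
and the logger and stats calls following the patterns above:

from crocoite.controller import RecursiveController

class DryRunController (RecursiveController):
    """ Log URLs instead of spawning a grab subprocess """

    async def fetch (self, url):
        self.logger.info ('dryrun', url=url)
        self.stats['finished'] += 1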
