From ec5e6a0aea7a2892f66ca1d196d83af521ca3955 Mon Sep 17 00:00:00 2001
From: Lars-Dominik Braun
Date: Thu, 4 Jul 2019 12:02:14 +0200
Subject: Rename cli utils

crocoite-recursive is now just crocoite, crocoite-grab is not
user-facing any more and called crocoite-single.

In preparation for 1.0 release.
---
 crocoite/cli.py        | 101 ++++++++++++++++++-------------------------
 crocoite/controller.py |  72 ++++++++++++++++++++++++++---------
 crocoite/irc.py        |  19 ++++++----
 3 files changed, 102 insertions(+), 90 deletions(-)

diff --git a/crocoite/cli.py b/crocoite/cli.py
index d89384d..93b742b 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -42,6 +42,13 @@ from .logger import Logger, JsonPrintConsumer, DatetimeConsumer, \
         WarcHandlerConsumer, Level
 from .devtools import Crashed
 
+def absurl (s):
+    """ argparse: Absolute URL """
+    u = URL (s)
+    if u.is_absolute ():
+        return u
+    raise argparse.ArgumentTypeError ('Must be absolute')
+
 class SingleExitStatus(IntEnum):
     """ Exit status for single-shot command line """
     Ok = 0
@@ -50,21 +57,8 @@ class SingleExitStatus(IntEnum):
     Navigate = 3
 
 def single ():
-    """
-    One-shot command line interface and pywb_ playback:
-
-    .. code:: bash
-
-        pip install pywb
-        crocoite-grab http://example.com/ example.com.warc.gz
-        rm -rf collections && wb-manager init test && wb-manager add test example.com.warc.gz
-        wayback &
-        $BROWSER http://localhost:8080
-
-    .. _pywb: https://github.com/ikreymer/pywb
-    """
-    parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
-    parser.add_argument('--browser', help='DevTools URL', metavar='URL')
+    parser = argparse.ArgumentParser(description='crocoite helper tools to fetch individual pages.')
+    parser.add_argument('--browser', help='DevTools URL', type=absurl, metavar='URL')
     parser.add_argument('--timeout', default=1*60*60, type=int, help='Maximum time for archival', metavar='SEC')
     parser.add_argument('--idle-timeout', default=30, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC')
     parser.add_argument('--behavior', help='Enable behavior script',
@@ -77,7 +71,7 @@ def single ():
     parser.add_argument('-k', '--insecure',
             action='store_true',
             help='Disable certificate validation')
-    parser.add_argument('url', help='Website URL', type=URL, metavar='URL')
+    parser.add_argument('url', help='Website URL', type=absurl, metavar='URL')
     parser.add_argument('output', help='WARC filename', metavar='FILE')
 
     args = parser.parse_args ()
@@ -135,50 +129,40 @@ def parsePolicy (recursive, url):
         return DepthLimit (int (recursive))
     elif recursive == 'prefix':
         return PrefixLimit (url)
-    raise ValueError ('Unsupported')
+    raise argparse.ArgumentTypeError ('Unsupported recursion mode')
 
 def recursive ():
-    """
-    crocoite is built with the Unix philosophy (“do one thing and do it well”) in
-    mind. Thus ``crocoite-grab`` can only save a single page. If you want recursion
-    use ``crocoite-recursive``, which follows hyperlinks according to ``--policy``.
-    It can either recurse a maximum number of levels or grab all pages with the
-    same prefix as the start URL:
-
-    .. code:: bash
-
-        crocoite-recursive --policy prefix http://www.example.com/dir/ output
-
-    will save all pages in ``/dir/`` and below to individual files in the output
-    directory ``output``. You can customize the command used to grab individual
-    pages by appending it after ``output``. This way distributed grabs (ssh to a
-    different machine and execute the job there, queue the command with Slurm, …)
-    are possible.
-    """
     logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
-    parser = argparse.ArgumentParser(description='Recursively run crocoite-grab.')
-    parser.add_argument('--policy', help='Recursion policy', metavar='POLICY')
-    parser.add_argument('--tempdir', help='Directory for temporary files', metavar='DIR')
-    parser.add_argument('--prefix', help='Output filename prefix, supports templates {host} and {date}', metavar='FILENAME', default='{host}-{date}-')
-    parser.add_argument('--concurrency', '-j', help='Run at most N jobs', metavar='N', default=1, type=int)
-    parser.add_argument('url', help='Seed URL', type=URL, metavar='URL')
-    parser.add_argument('output', help='Output directory', metavar='DIR')
-    parser.add_argument('command', help='Fetch command, supports templates {url} and {dest}', metavar='CMD', nargs='*', default=['crocoite-grab', '{url}', '{dest}'])
+    parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
+    parser.add_argument('-j', '--concurrency',
+            help='Run at most N jobs concurrently', metavar='N', default=1,
+            type=int)
+    parser.add_argument('-r', '--recursion', help='Recursion policy',
+            metavar='POLICY')
+    parser.add_argument('--tempdir', help='Directory for temporary files',
+            metavar='DIR')
+    parser.add_argument('url', help='Seed URL', type=absurl, metavar='URL')
+    parser.add_argument('output',
+            help='Output file, supports templates {host}, {date} and {seqnum}',
+            metavar='FILE')
+    parser.add_argument('command',
+            help='Fetch command, supports templates {url} and {dest}',
+            metavar='CMD', nargs='*',
+            default=['crocoite-single', '{url}', '{dest}'])
     args = parser.parse_args ()
 
     try:
-        policy = parsePolicy (args.policy, args.url)
-    except ValueError:
-        parser.error ('Invalid argument for --policy')
+        policy = parsePolicy (args.recursion, args.url)
+    except argparse.ArgumentTypeError as e:
+        parser.error (str (e))
 
-    os.makedirs (args.output, exist_ok=True)
-
-    controller = RecursiveController (url=args.url, output=args.output,
-            command=args.command, logger=logger, policy=policy,
-            tempdir=args.tempdir, prefix=args.prefix,
-            concurrency=args.concurrency)
+    try:
+        controller = RecursiveController (url=args.url, output=args.output,
+                command=args.command, logger=logger, policy=policy,
+                tempdir=args.tempdir, concurrency=args.concurrency)
+    except ValueError as e:
+        parser.error (str (e))
 
     run = asyncio.ensure_future (controller.run ())
     loop = asyncio.get_event_loop()
@@ -191,19 +175,6 @@ def recursive ():
     return 0
 
 def irc ():
-    """
-    A simple IRC bot (“chromebot”) is provided with the command ``crocoite-irc``.
-    It reads its configuration from a config file like the example provided in
-    ``contrib/chromebot.json`` and supports the following commands:
-
-    a <url> -j <concurrency> -r <policy>
-        Archive <url> with <concurrency> processes according to recursion <policy>
-    s <uuid>
-        Get job status for <uuid>
-    r <uuid>
-        Revoke or abort running job with <uuid>
-    """
-
     import json, re
 
     from .irc import Chromebot
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 02017c3..56fb3bb 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -22,7 +22,7 @@
 Controller classes, handling actions required for archival
 """
 
-import time, tempfile, asyncio, json, os
+import time, tempfile, asyncio, json, os, shutil
 from itertools import islice
 from datetime import datetime
 from operator import attrgetter
@@ -355,6 +355,10 @@ class PrefixLimit (RecursionPolicy):
     def __call__ (self, urls):
         return set (filter (lambda u: str(u.value).startswith (str (self.prefix)), urls))
 
+def hasTemplate (s):
+    """ Return True if string s has string templates """
+    return '{' in s and '}' in s
+
 class RecursiveController:
     """
     Simple recursive controller
@@ -363,19 +367,24 @@ class RecursiveController:
     """
 
     __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
-            'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency')
+            'pending', 'stats', 'tempdir', 'running', 'concurrency',
+            'copyLock')
 
     SCHEME_WHITELIST = {'http', 'https'}
 
-    def __init__ (self, url, output, command, logger, prefix='{host}-{date}-',
+    def __init__ (self, url, output, command, logger,
             tempdir=None, policy=DepthLimit (0), concurrency=1):
         self.url = url
         self.output = output
         self.command = command
-        self.prefix = prefix
         self.logger = logger.bind (context=type(self).__name__, seedurl=url)
         self.policy = policy
         self.tempdir = tempdir
+        # A lock if only a single output file (no template) is requested
+        self.copyLock = None if hasTemplate (output) else asyncio.Lock ()
+        # some sanity checks. XXX move to argparse?
+        if self.copyLock and os.path.exists (self.output):
+            raise ValueError ('Output file exists')
         # tasks currently running
         self.running = set ()
         # max number of tasks running
@@ -383,11 +392,11 @@ class RecursiveController:
         # keep in sync with StatsHandler
         self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0, 'ignored': 0}
 
-    async def fetch (self, entry):
+    async def fetch (self, entry, seqnum):
         """
         Fetch a single URL using an external command
 
-        command is usually crocoite-grab
+        command is usually crocoite-single
         """
 
         assert isinstance (entry, SetEntry)
@@ -403,8 +412,9 @@ class RecursiveController:
             else:
                 return e.format (url=url, dest=dest.name)
 
-        def formatPrefix (p):
-            return p.format (host=url.host, date=datetime.utcnow ().isoformat ())
+        def formatOutput (p):
+            return p.format (host=url.host,
+                    date=datetime.utcnow ().isoformat (), seqnum=seqnum)
 
         def logStats ():
             logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats)
@@ -417,14 +427,15 @@ class RecursiveController:
             return
 
         dest = tempfile.NamedTemporaryFile (dir=self.tempdir,
-                prefix=formatPrefix (self.prefix), suffix='.warc.gz',
-                delete=False)
-        destpath = os.path.join (self.output, os.path.basename (dest.name))
+                prefix=__package__, suffix='.warc.gz', delete=False)
         command = list (map (formatCommand, self.command))
-        logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath)
+        logger.info ('fetch', uuid='d1288fbe-8bae-42c8-af8c-f2fa8b41794f',
+                command=command)
         try:
-            process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
-                    stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
+            process = await asyncio.create_subprocess_exec (*command,
+                    stdout=asyncio.subprocess.PIPE,
+                    stderr=asyncio.subprocess.DEVNULL,
+                    stdin=asyncio.subprocess.DEVNULL,
                     start_new_session=True, limit=100*1024*1024)
             while True:
                 data = await process.stdout.readline ()
@@ -449,8 +460,33 @@ class RecursiveController:
         finally:
             code = await process.wait()
             if code == 0:
-                # atomically move once finished
-                os.rename (dest.name, destpath)
+                if self.copyLock is None:
+                    # atomically move once finished
+                    lastDestpath = None
+                    while True:
+                        # XXX: must generate a new name every time, otherwise
+                        # this loop never terminates
+                        destpath = formatOutput (self.output)
+                        assert destpath != lastDestpath
+                        lastDestpath = destpath
+
+                        # python does not have rename(…, …, RENAME_NOREPLACE),
+                        # but this is safe nonetheless, since we’re
+                        # single-threaded
+                        if not os.path.exists (destpath):
+                            # create the directory, so templates like
+                            # /{host}/{date}/… are possible
+                            os.makedirs (os.path.dirname (destpath), exist_ok=True)
+                            os.rename (dest.name, destpath)
+                            break
+                else:
+                    # atomically (in the context of this process) append to
+                    # existing file
+                    async with self.copyLock:
+                        with open (dest.name, 'rb') as infd, \
+                                open (self.output, 'ab') as outfd:
+                            shutil.copyfileobj (infd, outfd)
+                        os.unlink (dest.name)
             else:
                 self.stats['crashed'] += 1
                 logStats ()
@@ -464,6 +500,7 @@ class RecursiveController:
                     have=len (self.have)-len(self.running),
                     running=len (self.running))
 
+        seqnum = 1
         try:
             self.have = set ()
             self.pending = set ([SetEntry (self.url, depth=0)])
@@ -472,8 +509,9 @@ class RecursiveController:
                 # since pending is a set this picks a random item, which is fine
                 u = self.pending.pop ()
                 self.have.add (u)
-                t = asyncio.ensure_future (self.fetch (u))
+                t = asyncio.ensure_future (self.fetch (u, seqnum))
                 self.running.add (t)
+                seqnum += 1
 
                 log ()
diff --git a/crocoite/irc.py b/crocoite/irc.py
index 0803398..cb40f4c 100644
--- a/crocoite/irc.py
+++ b/crocoite/irc.py
@@ -22,7 +22,7 @@
 IRC bot “chromebot”
 """
 
-import asyncio, argparse, json, tempfile, time, random
+import asyncio, argparse, json, tempfile, time, random, os
 from datetime import datetime
 from urllib.parse import urlsplit
 from enum import IntEnum, unique
@@ -500,17 +500,21 @@ class Chromebot (ArgparseBot):
                 'recursive': args.recursive,
                 'concurrency': args.concurrency,
                 }}
-        grabCmd = ['crocoite-grab']
+        grabCmd = ['crocoite-single']
         grabCmd.extend (['--warcinfo',
                 '!' + json.dumps (warcinfo, cls=StrJsonEncoder)])
         if args.insecure:
             grabCmd.append ('--insecure')
         grabCmd.extend (['{url}', '{dest}'])
         # prefix warcinfo with !, so it won’t get expanded
-        cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir,
-                '--prefix', j.id + '-{host}-{date}-', '--policy',
-                args.recursive, '--concurrency', str (args.concurrency),
-                self.destdir, '--'] + grabCmd
+        cmdline = ['crocoite',
+                '--tempdir', self.tempdir,
+                '--recursion', args.recursive,
+                '--concurrency', str (args.concurrency),
+                args.url,
+                os.path.join (self.destdir,
+                    j.id + '-{host}-{date}-{seqnum}.warc.gz'),
+                '--'] + grabCmd
 
         strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
         reply (f'{args.url} has been queued as {j.id} with {strargs}')
@@ -640,9 +644,8 @@ class Dashboard:
         elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687':
            nesteddata = data['data']
            nestedmsgid = nesteddata['uuid']
-            if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db':
+            if nestedmsgid == 'd1288fbe-8bae-42c8-af8c-f2fa8b41794f':
                 del nesteddata['command']
-                del nesteddata['destfile']
 
         buf = json.dumps (data)
         for c in self.clients:
-- 
cgit v1.2.3
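
Usage mapping for the rename, as a sketch: the old invocations below come from
the docstrings removed in this patch, and the new ones are inferred from the
argparse definitions added above; the URLs and the concrete output template
are illustrative only.

.. code:: bash

    # Before: two user-facing tools, one-shot and recursive.
    crocoite-grab http://example.com/ example.com.warc.gz
    crocoite-recursive --policy prefix http://www.example.com/dir/ output

    # After: plain crocoite replaces crocoite-recursive, and crocoite-single
    # replaces crocoite-grab (no longer meant to be invoked directly).
    # The positional output argument is now a filename template, not a directory.
    crocoite-single http://example.com/ example.com.warc.gz
    crocoite -r prefix -j 2 http://www.example.com/dir/ '{host}-{date}-{seqnum}.warc.gz'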
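
The controller change also affects where results land: if the output path
contains a template, every finished page is renamed to its own WARC file and
intermediate directories are created on demand; without a template,
RecursiveController appends each page’s WARC to the single output file under
``copyLock`` and raises ``ValueError`` if that file already exists. A sketch,
with illustrative paths:

.. code:: bash

    # One file per page; the out/{host}/ directories are created as needed.
    crocoite -r 1 http://example.com/ 'out/{host}/{date}-{seqnum}.warc.gz'

    # No template: all pages are appended to one WARC (gzipped WARC records
    # concatenate cleanly). Fails early if everything.warc.gz already exists.
    crocoite -r 1 http://example.com/ everything.warc.gz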
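
Finally, the irc.py hunk shows the intended command-line shape for chromebot:
the fetch command follows ``--`` and is run once per page with ``{url}`` and
``{dest}`` expanded. A hand-rolled equivalent of that ``cmdline`` might look
like the following, where JOBID and the paths stand in for values (``j.id``,
``self.tempdir``, ``self.destdir``) that are only known at runtime, and the
``--warcinfo`` option is omitted for brevity:

.. code:: bash

    crocoite --tempdir /tmp --recursion prefix --concurrency 2 \
        http://example.com/ '/srv/warcs/JOBID-{host}-{date}-{seqnum}.warc.gz' \
        -- crocoite-single --insecure '{url}' '{dest}'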