Diffstat (limited to 'crocoite')
 -rw-r--r--  crocoite/cli.py         101
 -rw-r--r--  crocoite/controller.py   72
 -rw-r--r--  crocoite/irc.py          19
 3 files changed, 102 insertions(+), 90 deletions(-)
diff --git a/crocoite/cli.py b/crocoite/cli.py
index d89384d..93b742b 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -42,6 +42,13 @@ from .logger import Logger, JsonPrintConsumer, DatetimeConsumer, \
WarcHandlerConsumer, Level
from .devtools import Crashed
+def absurl (s):
+ """ argparse: Absolute URL """
+ u = URL (s)
+ if u.is_absolute ():
+ return u
+ raise argparse.ArgumentTypeError ('Must be absolute')
+
class SingleExitStatus(IntEnum):
""" Exit status for single-shot command line """
Ok = 0
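The new absurl helper moves URL validation into argparse: a type= callable that raises ArgumentTypeError makes the parser emit a normal usage error instead of failing with a traceback later in the controller. A minimal sketch of the pattern, assuming yarl's URL (which this module already uses for its url argument):

    import argparse
    from yarl import URL

    def absurl (s):
        """ argparse: Absolute URL """
        u = URL (s)
        if u.is_absolute ():
            return u
        raise argparse.ArgumentTypeError ('Must be absolute')

    parser = argparse.ArgumentParser ()
    parser.add_argument ('url', type=absurl, metavar='URL')
    parser.parse_args (['http://example.com/'])  # ok, namespace.url is a URL
    parser.parse_args (['foo/bar'])              # exits: argument URL: Must be absolute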
@@ -50,21 +57,8 @@ class SingleExitStatus(IntEnum):
Navigate = 3
def single ():
- """
- One-shot command line interface and pywb_ playback:
-
- .. code:: bash
-
- pip install pywb
- crocoite-grab http://example.com/ example.com.warc.gz
- rm -rf collections && wb-manager init test && wb-manager add test example.com.warc.gz
- wayback &
- $BROWSER http://localhost:8080
-
- .. _pywb: https://github.com/ikreymer/pywb
- """
- parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
- parser.add_argument('--browser', help='DevTools URL', metavar='URL')
+ parser = argparse.ArgumentParser(description='crocoite helper tools to fetch individual pages.')
+ parser.add_argument('--browser', help='DevTools URL', type=absurl, metavar='URL')
parser.add_argument('--timeout', default=1*60*60, type=int, help='Maximum time for archival', metavar='SEC')
parser.add_argument('--idle-timeout', default=30, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC')
parser.add_argument('--behavior', help='Enable behavior script',
@@ -77,7 +71,7 @@ def single ():
parser.add_argument('-k', '--insecure',
action='store_true',
help='Disable certificate validation')
- parser.add_argument('url', help='Website URL', type=URL, metavar='URL')
+ parser.add_argument('url', help='Website URL', type=absurl, metavar='URL')
parser.add_argument('output', help='WARC filename', metavar='FILE')
args = parser.parse_args ()
@@ -135,50 +129,40 @@ def parsePolicy (recursive, url):
return DepthLimit (int (recursive))
elif recursive == 'prefix':
return PrefixLimit (url)
- raise ValueError ('Unsupported')
+ raise argparse.ArgumentTypeError ('Unsupported recursion mode')
def recursive ():
- """
- crocoite is built with the Unix philosophy (“do one thing and do it well”) in
- mind. Thus ``crocoite-grab`` can only save a single page. If you want recursion
- use ``crocoite-recursive``, which follows hyperlinks according to ``--policy``.
- It can either recurse a maximum number of levels or grab all pages with the
- same prefix as the start URL:
-
- .. code:: bash
-
- crocoite-recursive --policy prefix http://www.example.com/dir/ output
-
- will save all pages in ``/dir/`` and below to individual files in the output
- directory ``output``. You can customize the command used to grab individual
- pages by appending it after ``output``. This way distributed grabs (ssh to a
- different machine and execute the job there, queue the command with Slurm, …)
- are possible.
- """
-
logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
- parser = argparse.ArgumentParser(description='Recursively run crocoite-grab.')
- parser.add_argument('--policy', help='Recursion policy', metavar='POLICY')
- parser.add_argument('--tempdir', help='Directory for temporary files', metavar='DIR')
- parser.add_argument('--prefix', help='Output filename prefix, supports templates {host} and {date}', metavar='FILENAME', default='{host}-{date}-')
- parser.add_argument('--concurrency', '-j', help='Run at most N jobs', metavar='N', default=1, type=int)
- parser.add_argument('url', help='Seed URL', type=URL, metavar='URL')
- parser.add_argument('output', help='Output directory', metavar='DIR')
- parser.add_argument('command', help='Fetch command, supports templates {url} and {dest}', metavar='CMD', nargs='*', default=['crocoite-grab', '{url}', '{dest}'])
+ parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
+ parser.add_argument('-j', '--concurrency',
+ help='Run at most N jobs concurrently', metavar='N', default=1,
+ type=int)
+ parser.add_argument('-r', '--recursion', help='Recursion policy',
+ metavar='POLICY')
+ parser.add_argument('--tempdir', help='Directory for temporary files',
+ metavar='DIR')
+ parser.add_argument('url', help='Seed URL', type=absurl, metavar='URL')
+ parser.add_argument('output',
+ help='Output file, supports templates {host}, {date} and {seqnum}',
+ metavar='FILE')
+ parser.add_argument('command',
+ help='Fetch command, supports templates {url} and {dest}',
+ metavar='CMD', nargs='*',
+ default=['crocoite-single', '{url}', '{dest}'])
args = parser.parse_args ()
try:
- policy = parsePolicy (args.policy, args.url)
- except ValueError:
- parser.error ('Invalid argument for --policy')
+ policy = parsePolicy (args.recursion, args.url)
+ except argparse.ArgumentTypeError as e:
+ parser.error (str (e))
- os.makedirs (args.output, exist_ok=True)
-
- controller = RecursiveController (url=args.url, output=args.output,
- command=args.command, logger=logger, policy=policy,
- tempdir=args.tempdir, prefix=args.prefix,
- concurrency=args.concurrency)
+ try:
+ controller = RecursiveController (url=args.url, output=args.output,
+ command=args.command, logger=logger, policy=policy,
+ tempdir=args.tempdir, concurrency=args.concurrency)
+ except ValueError as e:
+ parser.error (str (e))
run = asyncio.ensure_future (controller.run ())
loop = asyncio.get_event_loop()
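The interface change replaces the output directory plus --prefix with a single templated FILE argument; RecursiveController (below) uses the presence of a template to choose between one file per page and a single concatenated WARC. The fields expand via plain str.format, so a quick illustration:

    from datetime import datetime

    output = '{host}-{date}-{seqnum}.warc.gz'
    print (output.format (host='example.com',
            date=datetime.utcnow ().isoformat (), seqnum=1))
    # example.com-2019-02-17T12:34:56.789012-1.warc.gz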
@@ -191,19 +175,6 @@ def recursive ():
return 0
def irc ():
- """
- A simple IRC bot (“chromebot”) is provided with the command ``crocoite-irc``.
- It reads its configuration from a config file like the example provided in
- ``contrib/chromebot.json`` and supports the following commands:
-
- a <url> -j <concurrency> -r <policy>
- Archive <url> with <concurrency> processes according to recursion <policy>
- s <uuid>
- Get job status for <uuid>
- r <uuid>
- Revoke or abort running job with <uuid>
- """
-
import json, re
from .irc import Chromebot
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 02017c3..56fb3bb 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -22,7 +22,7 @@
Controller classes, handling actions required for archival
"""
-import time, tempfile, asyncio, json, os
+import time, tempfile, asyncio, json, os, shutil
from itertools import islice
from datetime import datetime
from operator import attrgetter
@@ -355,6 +355,10 @@ class PrefixLimit (RecursionPolicy):
def __call__ (self, urls):
return set (filter (lambda u: str(u.value).startswith (str (self.prefix)), urls))
+def hasTemplate (s):
+ """ Return True if string s has string templates """
+ return '{' in s and '}' in s
+
class RecursiveController:
"""
Simple recursive controller
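hasTemplate is deliberately loose: any string containing both an opening and a closing brace counts as templated, which is all RecursiveController needs in order to pick an output strategy. For example:

    >>> hasTemplate ('{host}-{seqnum}.warc.gz')
    True
    >>> hasTemplate ('fixed-output.warc.gz')
    False
    >>> hasTemplate ('half{open')
    False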
@@ -363,19 +367,24 @@ class RecursiveController:
"""
__slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
- 'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency')
+ 'pending', 'stats', 'tempdir', 'running', 'concurrency',
+ 'copyLock')
SCHEME_WHITELIST = {'http', 'https'}
- def __init__ (self, url, output, command, logger, prefix='{host}-{date}-',
+ def __init__ (self, url, output, command, logger,
tempdir=None, policy=DepthLimit (0), concurrency=1):
self.url = url
self.output = output
self.command = command
- self.prefix = prefix
self.logger = logger.bind (context=type(self).__name__, seedurl=url)
self.policy = policy
self.tempdir = tempdir
+ # A lock if only a single output file (no template) is requested
+ self.copyLock = None if hasTemplate (output) else asyncio.Lock ()
+ # some sanity checks. XXX move to argparse?
+ if self.copyLock and os.path.exists (self.output):
+ raise ValueError ('Output file exists')
# tasks currently running
self.running = set ()
# max number of tasks running
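copyLock guards the append-to-one-file case: several fetch tasks may finish concurrently, and without serialisation their WARC records would interleave in the shared output. Since all tasks run on one event loop, an asyncio.Lock suffices; a minimal sketch of the idea (names illustrative, not crocoite's API):

    import asyncio

    copyLock = asyncio.Lock ()

    async def appendTo (output, chunk):
        # only one task at a time may extend the shared output file
        async with copyLock:
            with open (output, 'ab') as fd:
                fd.write (chunk)

    loop = asyncio.get_event_loop ()
    loop.run_until_complete (asyncio.gather (
            *(appendTo ('combined.warc.gz', b'record') for _ in range (3))))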
@@ -383,11 +392,11 @@ class RecursiveController:
# keep in sync with StatsHandler
self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0, 'ignored': 0}
- async def fetch (self, entry):
+ async def fetch (self, entry, seqnum):
"""
Fetch a single URL using an external command
- command is usually crocoite-grab
+ command is usually crocoite-single
"""
assert isinstance (entry, SetEntry)
@@ -403,8 +412,9 @@ class RecursiveController:
else:
return e.format (url=url, dest=dest.name)
- def formatPrefix (p):
- return p.format (host=url.host, date=datetime.utcnow ().isoformat ())
+ def formatOutput (p):
+ return p.format (host=url.host,
+ date=datetime.utcnow ().isoformat (), seqnum=seqnum)
def logStats ():
logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats)
@@ -417,14 +427,15 @@ class RecursiveController:
return
dest = tempfile.NamedTemporaryFile (dir=self.tempdir,
- prefix=formatPrefix (self.prefix), suffix='.warc.gz',
- delete=False)
- destpath = os.path.join (self.output, os.path.basename (dest.name))
+ prefix=__package__, suffix='.warc.gz', delete=False)
command = list (map (formatCommand, self.command))
- logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath)
+ logger.info ('fetch', uuid='d1288fbe-8bae-42c8-af8c-f2fa8b41794f',
+ command=command)
try:
- process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
+ process = await asyncio.create_subprocess_exec (*command,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.DEVNULL,
+ stdin=asyncio.subprocess.DEVNULL,
start_new_session=True, limit=100*1024*1024)
while True:
data = await process.stdout.readline ()
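Each job runs as a subprocess whose stdout carries one JSON log event per line; the raised limit widens asyncio's stream buffer so very long log lines don't overflow the default 64 KiB. Stripped to its essentials, the read loop looks roughly like this (a sketch, not the full fetch implementation; handle stands in for the real log consumer):

    import asyncio, json

    async def runJob (command):
        process = await asyncio.create_subprocess_exec (*command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.DEVNULL,
                stdin=asyncio.subprocess.DEVNULL,
                start_new_session=True, limit=100*1024*1024)
        while True:
            data = await process.stdout.readline ()
            if not data:
                break
            handle (json.loads (data))  # hypothetical log consumer
        return await process.wait ()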
@@ -449,8 +460,33 @@ class RecursiveController:
finally:
code = await process.wait()
if code == 0:
- # atomically move once finished
- os.rename (dest.name, destpath)
+ if self.copyLock is None:
+ # atomically move once finished
+ lastDestpath = None
+ while True:
+ # XXX: must generate a new name every time, otherwise
+ # this loop never terminates
+ destpath = formatOutput (self.output)
+ assert destpath != lastDestpath
+ lastDestpath = destpath
+
+ # python does not have rename(…, …, RENAME_NOREPLACE),
+ # but this is safe nonetheless, since we’re
+ # single-threaded
+ if not os.path.exists (destpath):
+ # create the directory, so templates like
+ # /{host}/{date}/… are possible
+ os.makedirs (os.path.dirname (destpath), exist_ok=True)
+ os.rename (dest.name, destpath)
+ break
+ else:
+ # atomically (in the context of this process) append to
+ # existing file
+ async with self.copyLock:
+ with open (dest.name, 'rb') as infd, \
+ open (self.output, 'ab') as outfd:
+ shutil.copyfileobj (infd, outfd)
+ os.unlink (dest.name)
else:
self.stats['crashed'] += 1
logStats ()
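Two delivery strategies meet here. With a templated output, the temporary WARC is renamed into place, looping until the template yields an unused path; Python lacks rename(…, …, RENAME_NOREPLACE), but the exists/rename pair cannot race with itself because the controller is single-threaded. Without a template, the lock from the constructor serialises appends to the one shared file. A condensed sketch of the rename side, with a hypothetical makeName generator standing in for formatOutput:

    import os

    def moveUnique (src, makeName):
        # makeName must return a fresh candidate on every call, e.g.
        # because it embeds a sequence number or a timestamp
        last = None
        while True:
            dest = makeName ()
            assert dest != last, 'name generator repeated itself'
            last = dest
            if not os.path.exists (dest):
                # create parent directories, so templates like
                # /{host}/{date}/… work
                os.makedirs (os.path.dirname (dest) or '.', exist_ok=True)
                os.rename (src, dest)
                return dest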
@@ -464,6 +500,7 @@ class RecursiveController:
have=len (self.have)-len(self.running),
running=len (self.running))
+ seqnum = 1
try:
self.have = set ()
self.pending = set ([SetEntry (self.url, depth=0)])
@@ -472,8 +509,9 @@ class RecursiveController:
# since pending is a set this picks a random item, which is fine
u = self.pending.pop ()
self.have.add (u)
- t = asyncio.ensure_future (self.fetch (u))
+ t = asyncio.ensure_future (self.fetch (u, seqnum))
self.running.add (t)
+ seqnum += 1
log ()
diff --git a/crocoite/irc.py b/crocoite/irc.py
index 0803398..cb40f4c 100644
--- a/crocoite/irc.py
+++ b/crocoite/irc.py
@@ -22,7 +22,7 @@
IRC bot “chromebot”
"""
-import asyncio, argparse, json, tempfile, time, random
+import asyncio, argparse, json, tempfile, time, random, os
from datetime import datetime
from urllib.parse import urlsplit
from enum import IntEnum, unique
@@ -500,17 +500,21 @@ class Chromebot (ArgparseBot):
'recursive': args.recursive,
'concurrency': args.concurrency,
}}
- grabCmd = ['crocoite-grab']
+ grabCmd = ['crocoite-single']
grabCmd.extend (['--warcinfo',
'!' + json.dumps (warcinfo, cls=StrJsonEncoder)])
if args.insecure:
grabCmd.append ('--insecure')
grabCmd.extend (['{url}', '{dest}'])
# prefix warcinfo with !, so it won’t get expanded
- cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir,
- '--prefix', j.id + '-{host}-{date}-', '--policy',
- args.recursive, '--concurrency', str (args.concurrency),
- self.destdir, '--'] + grabCmd
+ cmdline = ['crocoite',
+ '--tempdir', self.tempdir,
+ '--recursion', args.recursive,
+ '--concurrency', str (args.concurrency),
+ args.url,
+ os.path.join (self.destdir,
+ j.id + '-{host}-{date}-{seqnum}.warc.gz'),
+ '--'] + grabCmd
strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
reply (f'{args.url} has been queued as {j.id} with {strargs}')
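For reference, with the new flags a chromebot request like “a http://example.com -j 2 -r prefix” now expands into a command list along these lines (job id, paths and warcinfo payload invented for illustration):

    cmdline = ['crocoite',
        '--tempdir', '/tmp/chromebot',
        '--recursion', 'prefix',
        '--concurrency', '2',
        'http://example.com',
        '/srv/warcs/62b67d9c-{host}-{date}-{seqnum}.warc.gz',
        '--',
        'crocoite-single', '--warcinfo', '!{"chromebot": {"id": "62b67d9c"}}',
        '{url}', '{dest}']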
@@ -640,9 +644,8 @@ class Dashboard:
elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687':
nesteddata = data['data']
nestedmsgid = nesteddata['uuid']
- if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db':
+ if nestedmsgid == 'd1288fbe-8bae-42c8-af8c-f2fa8b41794f':
del nesteddata['command']
- del nesteddata['destfile']
buf = json.dumps (data)
for c in self.clients: