summaryrefslogtreecommitdiff
path: root/crocoite/controller.py
diff options
context:
space:
mode:
Diffstat (limited to 'crocoite/controller.py')
-rw-r--r--crocoite/controller.py72
1 files changed, 55 insertions, 17 deletions
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 02017c3..56fb3bb 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -22,7 +22,7 @@
Controller classes, handling actions required for archival
"""
-import time, tempfile, asyncio, json, os
+import time, tempfile, asyncio, json, os, shutil
from itertools import islice
from datetime import datetime
from operator import attrgetter
@@ -355,6 +355,10 @@ class PrefixLimit (RecursionPolicy):
def __call__ (self, urls):
return set (filter (lambda u: str(u.value).startswith (str (self.prefix)), urls))
+def hasTemplate (s):
+ """ Return True if string s contains both '{' and '}', i.e. may hold str.format template fields """
+ return '{' in s and '}' in s
+
class RecursiveController:
"""
Simple recursive controller
@@ -363,19 +367,24 @@ class RecursiveController:
"""
__slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
- 'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency')
+ 'pending', 'stats', 'tempdir', 'running', 'concurrency',
+ 'copyLock')
SCHEME_WHITELIST = {'http', 'https'}
- def __init__ (self, url, output, command, logger, prefix='{host}-{date}-',
+ def __init__ (self, url, output, command, logger,
tempdir=None, policy=DepthLimit (0), concurrency=1):
self.url = url
self.output = output
self.command = command
- self.prefix = prefix
self.logger = logger.bind (context=type(self).__name__, seedurl=url)
self.policy = policy
self.tempdir = tempdir
+ # Lock serializing appends to the output; only created if a single output file (no template) is requested
+ self.copyLock = None if hasTemplate (output) else asyncio.Lock ()
+ # some sanity checks. XXX move to argparse?
+ if self.copyLock and os.path.exists (self.output):
+ raise ValueError ('Output file exists')
# tasks currently running
self.running = set ()
# max number of tasks running
@@ -383,11 +392,11 @@ class RecursiveController:
# keep in sync with StatsHandler
self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0, 'ignored': 0}
- async def fetch (self, entry):
+ async def fetch (self, entry, seqnum):
"""
Fetch a single URL using an external command
- command is usually crocoite-grab
+ command is usually crocoite-single
"""
assert isinstance (entry, SetEntry)
@@ -403,8 +412,9 @@ class RecursiveController:
else:
return e.format (url=url, dest=dest.name)
- def formatPrefix (p):
- return p.format (host=url.host, date=datetime.utcnow ().isoformat ())
+ def formatOutput (p):
+ return p.format (host=url.host,
+ date=datetime.utcnow ().isoformat (), seqnum=seqnum)
def logStats ():
logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats)
@@ -417,14 +427,15 @@ class RecursiveController:
return
dest = tempfile.NamedTemporaryFile (dir=self.tempdir,
- prefix=formatPrefix (self.prefix), suffix='.warc.gz',
- delete=False)
- destpath = os.path.join (self.output, os.path.basename (dest.name))
+ prefix=__package__, suffix='.warc.gz', delete=False)
command = list (map (formatCommand, self.command))
- logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath)
+ logger.info ('fetch', uuid='d1288fbe-8bae-42c8-af8c-f2fa8b41794f',
+ command=command)
try:
- process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
+ process = await asyncio.create_subprocess_exec (*command,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.DEVNULL,
+ stdin=asyncio.subprocess.DEVNULL,
start_new_session=True, limit=100*1024*1024)
while True:
data = await process.stdout.readline ()
@@ -449,8 +460,33 @@ class RecursiveController:
finally:
code = await process.wait()
if code == 0:
- # atomically move once finished
- os.rename (dest.name, destpath)
+ if self.copyLock is None:
+ # atomically move once finished
+ lastDestpath = None
+ while True:
+ # XXX: must generate a new name every time, otherwise
+ # this loop never terminates
+ destpath = formatOutput (self.output)
+ assert destpath != lastDestpath
+ lastDestpath = destpath
+
+ # python does not have rename(…, …, RENAME_NOREPLACE),
+ # but this is safe nonetheless, since we’re
+ # single-threaded
+ if not os.path.exists (destpath):
+ # create the directory, so templates like
+ # /{host}/{date}/… are possible
+ os.makedirs (os.path.dirname (destpath), exist_ok=True)
+ os.rename (dest.name, destpath)
+ break
+ else:
+ # atomically (in the context of this process) append to
+ # existing file
+ async with self.copyLock:
+ with open (dest.name, 'rb') as infd, \
+ open (self.output, 'ab') as outfd:
+ shutil.copyfileobj (infd, outfd)
+ os.unlink (dest.name)
else:
self.stats['crashed'] += 1
logStats ()
@@ -464,6 +500,7 @@ class RecursiveController:
have=len (self.have)-len(self.running),
running=len (self.running))
+ seqnum = 1
try:
self.have = set ()
self.pending = set ([SetEntry (self.url, depth=0)])
@@ -472,8 +509,9 @@ class RecursiveController:
# since pending is a set this picks a random item, which is fine
u = self.pending.pop ()
self.have.add (u)
- t = asyncio.ensure_future (self.fetch (u))
+ t = asyncio.ensure_future (self.fetch (u, seqnum))
self.running.add (t)
+ seqnum += 1
log ()