From ec5e6a0aea7a2892f66ca1d196d83af521ca3955 Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Thu, 4 Jul 2019 12:02:14 +0200 Subject: Rename cli utils crocoite-recursive is now just crocoite, crocoite-grab is not user-facing any more and called crocoite-single. In preparation for 1.0 release. --- crocoite/controller.py | 72 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 17 deletions(-) (limited to 'crocoite/controller.py') diff --git a/crocoite/controller.py b/crocoite/controller.py index 02017c3..56fb3bb 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -22,7 +22,7 @@ Controller classes, handling actions required for archival """ -import time, tempfile, asyncio, json, os +import time, tempfile, asyncio, json, os, shutil from itertools import islice from datetime import datetime from operator import attrgetter @@ -355,6 +355,10 @@ class PrefixLimit (RecursionPolicy): def __call__ (self, urls): return set (filter (lambda u: str(u.value).startswith (str (self.prefix)), urls)) +def hasTemplate (s): + """ Return True if string s has string templates """ + return '{' in s and '}' in s + class RecursiveController: """ Simple recursive controller @@ -363,19 +367,24 @@ class RecursiveController: """ __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have', - 'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency') + 'pending', 'stats', 'tempdir', 'running', 'concurrency', + 'copyLock') SCHEME_WHITELIST = {'http', 'https'} - def __init__ (self, url, output, command, logger, prefix='{host}-{date}-', + def __init__ (self, url, output, command, logger, tempdir=None, policy=DepthLimit (0), concurrency=1): self.url = url self.output = output self.command = command - self.prefix = prefix self.logger = logger.bind (context=type(self).__name__, seedurl=url) self.policy = policy self.tempdir = tempdir + # A lock if only a single output file (no template) is requested + self.copyLock = None if hasTemplate (output) else asyncio.Lock () + # some sanity checks. XXX move to argparse? + if self.copyLock and os.path.exists (self.output): + raise ValueError ('Output file exists') # tasks currently running self.running = set () # max number of tasks running @@ -383,11 +392,11 @@ class RecursiveController: # keep in sync with StatsHandler self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0, 'ignored': 0} - async def fetch (self, entry): + async def fetch (self, entry, seqnum): """ Fetch a single URL using an external command - command is usually crocoite-grab + command is usually crocoite-single """ assert isinstance (entry, SetEntry) @@ -403,8 +412,9 @@ class RecursiveController: else: return e.format (url=url, dest=dest.name) - def formatPrefix (p): - return p.format (host=url.host, date=datetime.utcnow ().isoformat ()) + def formatOutput (p): + return p.format (host=url.host, + date=datetime.utcnow ().isoformat (), seqnum=seqnum) def logStats (): logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats) @@ -417,14 +427,15 @@ class RecursiveController: return dest = tempfile.NamedTemporaryFile (dir=self.tempdir, - prefix=formatPrefix (self.prefix), suffix='.warc.gz', - delete=False) - destpath = os.path.join (self.output, os.path.basename (dest.name)) + prefix=__package__, suffix='.warc.gz', delete=False) command = list (map (formatCommand, self.command)) - logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath) + logger.info ('fetch', uuid='d1288fbe-8bae-42c8-af8c-f2fa8b41794f', + command=command) try: - process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL, + process = await asyncio.create_subprocess_exec (*command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + stdin=asyncio.subprocess.DEVNULL, start_new_session=True, limit=100*1024*1024) while True: data = await process.stdout.readline () @@ -449,8 +460,33 @@ class RecursiveController: finally: code = await process.wait() if code == 0: - # atomically move once finished - os.rename (dest.name, destpath) + if self.copyLock is None: + # atomically move once finished + lastDestpath = None + while True: + # XXX: must generate a new name every time, otherwise + # this loop never terminates + destpath = formatOutput (self.output) + assert destpath != lastDestpath + lastDestpath = destpath + + # python does not have rename(…, …, RENAME_NOREPLACE), + # but this is safe nontheless, since we’re + # single-threaded + if not os.path.exists (destpath): + # create the directory, so templates like + # /{host}/{date}/… are possible + os.makedirs (os.path.dirname (destpath), exist_ok=True) + os.rename (dest.name, destpath) + break + else: + # atomically (in the context of this process) append to + # existing file + async with self.copyLock: + with open (dest.name, 'rb') as infd, \ + open (self.output, 'ab') as outfd: + shutil.copyfileobj (infd, outfd) + os.unlink (dest.name) else: self.stats['crashed'] += 1 logStats () @@ -464,6 +500,7 @@ class RecursiveController: have=len (self.have)-len(self.running), running=len (self.running)) + seqnum = 1 try: self.have = set () self.pending = set ([SetEntry (self.url, depth=0)]) @@ -472,8 +509,9 @@ class RecursiveController: # since pending is a set this picks a random item, which is fine u = self.pending.pop () self.have.add (u) - t = asyncio.ensure_future (self.fetch (u)) + t = asyncio.ensure_future (self.fetch (u, seqnum)) self.running.add (t) + seqnum += 1 log () -- cgit v1.2.3