diff options
Diffstat (limited to 'crocoite/controller.py')
-rw-r--r-- | crocoite/controller.py | 584 |
1 file changed, 401 insertions, 183 deletions
diff --git a/crocoite/controller.py b/crocoite/controller.py index 178d11c..8374b4e 100644 --- a/crocoite/controller.py +++ b/crocoite/controller.py @@ -22,190 +22,291 @@ Controller classes, handling actions required for archival """ +import time, tempfile, asyncio, json, os, shutil, signal +from itertools import islice +from datetime import datetime +from operator import attrgetter +from abc import ABC, abstractmethod +from yarl import URL + +from . import behavior as cbehavior +from .browser import SiteLoader, RequestResponsePair, PageIdle, FrameNavigated +from .util import getFormattedViewportMetrics, getSoftwareInfo +from .behavior import ExtractLinksEvent +from .devtools import toCookieParam + class ControllerSettings: - __slots__ = ('logBuffer', 'maxBodySize', 'idleTimeout', 'timeout') + __slots__ = ('idleTimeout', 'timeout', 'insecure', 'cookies') - def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10): - self.logBuffer = logBuffer - self.maxBodySize = maxBodySize + def __init__ (self, idleTimeout=2, timeout=10, insecure=False, cookies=None): self.idleTimeout = idleTimeout self.timeout = timeout + self.insecure = insecure + self.cookies = cookies or [] - def toDict (self): - return dict (logBuffer=self.logBuffer, maxBodySize=self.maxBodySize, - idleTimeout=self.idleTimeout, timeout=self.timeout) + def __repr__ (self): + return f'<ControllerSetting idleTimeout={self.idleTimeout!r}, timeout={self.timeout!r}, insecure={self.insecure!r}, cookies={self.cookies!r}>' defaultSettings = ControllerSettings () -class EventHandler: +class EventHandler (ABC): """ Abstract base class for event handler """ __slots__ = () - # this handler wants to know about exceptions before they are reraised by - # the controller - acceptException = False - - def push (self, item): + @abstractmethod + async def push (self, item): raise NotImplementedError () -from .browser import BrowserCrashed - class StatsHandler (EventHandler): - __slots__ = ('stats') 
- - acceptException = True + __slots__ = ('stats', ) def __init__ (self): - self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0} + self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0} - def push (self, item): - if isinstance (item, Item): + async def push (self, item): + if isinstance (item, RequestResponsePair): self.stats['requests'] += 1 - if item.failed: + if not item.response: self.stats['failed'] += 1 else: self.stats['finished'] += 1 - self.stats['bytesRcv'] += item.encodedDataLength - elif isinstance (item, BrowserCrashed): - self.stats['crashed'] += 1 + self.stats['bytesRcv'] += item.response.bytesReceived -import time, platform +class LogHandler (EventHandler): + """ Handle items by logging information about them """ + + __slots__ = ('logger', ) + + def __init__ (self, logger): + self.logger = logger.bind (context=type (self).__name__) + + async def push (self, item): + if isinstance (item, ExtractLinksEvent): + # limit number of links per message, so json blob won’t get too big + it = iter (item.links) + limit = 100 + while True: + limitlinks = list (islice (it, 0, limit)) + if not limitlinks: + break + self.logger.info ('extracted links', context=type (item).__name__, + uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', links=limitlinks) -from . 
import behavior as cbehavior -from .browser import ChromeService, SiteLoader, Item -from .util import getFormattedViewportMetrics, removeFragment, getRequirements class ControllerStart: - __slots__ = ('payload') + __slots__ = ('payload', ) def __init__ (self, payload): self.payload = payload +class IdleStateTracker (EventHandler): + """ Track SiteLoader’s idle state by listening to PageIdle events """ + + __slots__ = ('_idle', '_loop', '_idleSince') + + def __init__ (self, loop): + self._idle = True + self._loop = loop + + self._idleSince = self._loop.time () + + async def push (self, item): + if isinstance (item, PageIdle): + self._idle = bool (item) + if self._idle: + self._idleSince = self._loop.time () + + async def wait (self, timeout): + """ Wait until page has been idle for at least timeout seconds. If the + page has been idle before calling this function it may return + immediately. """ + + assert timeout > 0 + while True: + if self._idle: + now = self._loop.time () + sleep = timeout-(now-self._idleSince) + if sleep <= 0: + break + else: + # not idle, check again after timeout expires + sleep = timeout + await asyncio.sleep (sleep) + +class InjectBehaviorOnload (EventHandler): + """ Control behavior script injection based on frame navigation messages. + When a page is reloaded (for whatever reason), the scripts need to be + reinjected. 
""" + + __slots__ = ('controller', '_loaded') + + def __init__ (self, controller): + self.controller = controller + self._loaded = False + + async def push (self, item): + if isinstance (item, FrameNavigated): + await self._runon ('load') + self._loaded = True + + async def stop (self): + if self._loaded: + await self._runon ('stop') + + async def finish (self): + if self._loaded: + await self._runon ('finish') + + async def _runon (self, method): + controller = self.controller + for b in controller._enabledBehavior: + f = getattr (b, 'on' + method) + async for item in f (): + await controller.processItem (item) + class SinglePageController: """ - Archive a single page url to file output. + Archive a single page url. Dispatches between producer (site loader and behavior scripts) and consumer (stats, warc writer). """ - __slots__ = ('url', 'output', 'service', 'behavior', 'settings', 'logger', 'handler') + __slots__ = ('url', 'service', 'behavior', 'settings', 'logger', 'handler', + 'warcinfo', '_enabledBehavior') - def __init__ (self, url, output, logger, \ - service=ChromeService (), behavior=cbehavior.available, \ - settings=defaultSettings, handler=[]): + def __init__ (self, url, logger, \ + service, behavior=cbehavior.available, \ + settings=defaultSettings, handler=None, \ + warcinfo=None): self.url = url - self.output = output self.service = service self.behavior = behavior self.settings = settings self.logger = logger.bind (context=type (self).__name__, url=url) - self.handler = handler - - def processItem (self, item): - if isinstance (item, Exception): - for h in self.handler: - if h.acceptException: - h.push (item) - raise item + self.handler = handler or [] + self.warcinfo = warcinfo + async def processItem (self, item): for h in self.handler: - h.push (item) + await h.push (item) - def run (self): + async def run (self): logger = self.logger - def processQueue (): - # XXX: this is very ugly code and does not work well. 
figure out a - # better way to impose timeouts and still process all items in the - # queue - queue = l.queue - logger.debug ('process queue', - uuid='dafbf76b-a37e-44db-a021-efb5593b81f8', - queuelen=len (queue)) - while True: - now = time.time () - elapsed = now-start - maxTimeout = max (min (self.settings.idleTimeout, self.settings.timeout-elapsed), 0) - logger.debug ('timeout status', - uuid='49550447-37e3-49ff-9a73-34da1c3e5984', - maxTimeout=maxTimeout, elapsed=elapsed) - # skip waiting if there is work to do. processes all items in - # queue, regardless of timeouts, i.e. you need to make sure the - # queue will actually be empty at some point. - if len (queue) == 0: - if not l.notify.wait (maxTimeout): - assert len (queue) == 0, "event must be sent" - # timed out - logger.debug ('timeout', - uuid='6a7e0083-7c1a-45ba-b1ed-dbc4f26697c6', - elapsed=elapsed) - break - else: - l.notify.clear () - # limit number of items processed here, otherwise timeout won’t - # be checked frequently. this can happen if the site quickly - # loads a lot of items. 
- for i in range (1000): - try: - item = queue.popleft () - logger.debug ('queue pop', - uuid='adc96bfa-026d-4092-b732-4a022a1a92ca', - item=item, queuelen=len (queue)) - except IndexError: - break - self.processItem (item) - if maxTimeout == 0: - break + async def processQueue (): + async for item in l: + await self.processItem (item) + + idle = IdleStateTracker (asyncio.get_event_loop ()) + self.handler.append (idle) + behavior = InjectBehaviorOnload (self) + self.handler.append (behavior) + + async with self.service as browser, SiteLoader (browser, logger=logger) as l: + handle = asyncio.ensure_future (processQueue ()) + timeoutProc = asyncio.ensure_future (asyncio.sleep (self.settings.timeout)) - with self.service as browser, SiteLoader (browser, self.url, logger=logger) as l: - start = time.time () + # configure browser + tab = l.tab + await tab.Security.setIgnoreCertificateErrors (ignore=self.settings.insecure) + await tab.Network.setCookies (cookies=list (map (toCookieParam, self.settings.cookies))) - version = l.tab.Browser.getVersion () + # not all behavior scripts are allowed for every URL, filter them + self._enabledBehavior = list (filter (lambda x: self.url in x, + map (lambda x: x (l, logger), self.behavior))) + + version = await tab.Browser.getVersion () payload = { - 'software': { - 'platform': platform.platform (), - 'python': { - 'implementation': platform.python_implementation(), - 'version': platform.python_version (), - 'build': platform.python_build () - }, - 'self': getRequirements (__package__) - }, + 'software': getSoftwareInfo (), 'browser': { 'product': version['product'], 'useragent': version['userAgent'], - 'viewport': getFormattedViewportMetrics (l.tab), + 'viewport': await getFormattedViewportMetrics (tab), + }, + 'tool': 'crocoite-single', # not the name of the cli utility + 'parameters': { + 'url': self.url, + 'idleTimeout': self.settings.idleTimeout, + 'timeout': self.settings.timeout, + 'behavior': list (map (attrgetter('name'), 
self._enabledBehavior)), + 'insecure': self.settings.insecure, + 'cookies': list (map (lambda x: x.OutputString(), self.settings.cookies)), }, } - self.processItem (ControllerStart (payload)) + if self.warcinfo: + payload['extra'] = self.warcinfo + await self.processItem (ControllerStart (payload)) - # not all behavior scripts are allowed for every URL, filter them - enabledBehavior = list (filter (lambda x: self.url in x, - map (lambda x: x (l, logger), self.behavior))) + await l.navigate (self.url) + + idleProc = asyncio.ensure_future (idle.wait (self.settings.idleTimeout)) + while True: + try: + finished, pending = await asyncio.wait([idleProc, timeoutProc, handle], + return_when=asyncio.FIRST_COMPLETED) + except asyncio.CancelledError: + idleProc.cancel () + timeoutProc.cancel () + break - for b in enabledBehavior: - # I decided against using the queue here to limit memory - # usage (screenshot behavior would put all images into - # queue before we could process them) - for item in b.onload (): - self.processItem (item) - l.start () + if handle in finished: + # something went wrong while processing the data + logger.error ('fetch failed', + uuid='43a0686a-a3a9-4214-9acd-43f6976f8ff3') + idleProc.cancel () + timeoutProc.cancel () + handle.result () + assert False # previous line should always raise Exception + elif timeoutProc in finished: + # global timeout + logger.debug ('global timeout', + uuid='2f858adc-9448-4ace-94b4-7cd1484c0728') + idleProc.cancel () + timeoutProc.result () + break + elif idleProc in finished: + # idle timeout + logger.debug ('idle timeout', + uuid='90702590-94c4-44ef-9b37-02a16de444c3') + idleProc.result () + timeoutProc.cancel () + break - processQueue () + await behavior.stop () + await tab.Page.stopLoading () + await asyncio.sleep (1) + await behavior.finish () - for b in enabledBehavior: - for item in b.onstop (): - self.processItem (item) + # wait until loads from behavior scripts are done and browser is + # idle for at least 1 
second + try: + await asyncio.wait_for (idle.wait (1), timeout=1) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass - # if we stopped due to timeout, wait for remaining assets - processQueue () + if handle.done (): + handle.result () + else: + handle.cancel () - for b in enabledBehavior: - for item in b.onfinish (): - self.processItem (item) +class SetEntry: + """ A object, to be used with sets, that compares equality only on its + primary property. """ + def __init__ (self, value, **props): + self.value = value + for k, v in props.items (): + setattr (self, k, v) - processQueue () + def __eq__ (self, b): + assert isinstance (b, SetEntry) + return self.value == b.value + + def __hash__ (self): + return hash (self.value) + + def __repr__ (self): + return f'<SetEntry {self.value!r}>' class RecursionPolicy: """ Abstract recursion policy """ @@ -219,23 +320,23 @@ class DepthLimit (RecursionPolicy): """ Limit recursion by depth. - depth==0 means no recursion, depth==1 is the page and outgoing links, … + depth==0 means no recursion, depth==1 is the page and outgoing links """ - __slots__ = ('maxdepth') + __slots__ = ('maxdepth', ) def __init__ (self, maxdepth=0): self.maxdepth = maxdepth def __call__ (self, urls): - if self.maxdepth <= 0: - return {} - else: - self.maxdepth -= 1 - return urls + newurls = set () + for u in urls: + if u.depth <= self.maxdepth: + newurls.add (u) + return newurls def __repr__ (self): - return '<DepthLimit {}>'.format (self.maxdepth) + return f'<DepthLimit {self.maxdepth}>' class PrefixLimit (RecursionPolicy): """ @@ -246,78 +347,195 @@ class PrefixLimit (RecursionPolicy): accepted: http://example.com/foobar http://example.com/foo/bar """ - __slots__ = ('prefix') + __slots__ = ('prefix', ) def __init__ (self, prefix): self.prefix = prefix def __call__ (self, urls): - return set (filter (lambda u: u.startswith (self.prefix), urls)) + return set (filter (lambda u: str(u.value).startswith (str (self.prefix)), urls)) -from .behavior 
import ExtractLinksEvent +def hasTemplate (s): + """ Return True if string s has string templates """ + return '{' in s and '}' in s -class RecursiveController (EventHandler): +class RecursiveController: """ Simple recursive controller - Visits links acording to recursionPolicy + Visits links acording to policy """ - __slots__ = ('url', 'output', 'service', 'behavior', 'settings', 'logger', - 'recursionPolicy', 'handler', 'urls', 'have') + __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have', + 'pending', 'stats', 'tempdir', 'running', 'concurrency', + 'copyLock') + + SCHEME_WHITELIST = {'http', 'https'} - def __init__ (self, url, output, logger, - service=ChromeService (), behavior=cbehavior.available, \ - settings=defaultSettings, \ - recursionPolicy=DepthLimit (0), handler=[]): + def __init__ (self, url, output, command, logger, + tempdir=None, policy=DepthLimit (0), concurrency=1): self.url = url self.output = output - self.service = service - self.behavior = behavior - self.settings = settings - self.logger = logger.bind (context=type(self).__name__, url=url) - self.recursionPolicy = recursionPolicy - self.handler = handler - self.handler.append (self) - - def fetch (self, urls): + self.command = command + self.logger = logger.bind (context=type(self).__name__, seedurl=url) + self.policy = policy + self.tempdir = tempdir + # A lock if only a single output file (no template) is requested + self.copyLock = None if hasTemplate (output) else asyncio.Lock () + # some sanity checks. XXX move to argparse? + if self.copyLock and os.path.exists (self.output): + raise ValueError ('Output file exists') + # tasks currently running + self.running = set () + # max number of tasks running + self.concurrency = concurrency + # keep in sync with StatsHandler + self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0, 'ignored': 0} + + async def fetch (self, entry, seqnum): """ - Overrideable fetch action for URLs. 
Defaults to sequential - SinglePageController. + Fetch a single URL using an external command + + command is usually crocoite-single """ - for u in urls: - try: - c = SinglePageController (url=u, output=self.output, service=self.service, - behavior=self.behavior, logger=self.logger, - settings=self.settings, handler=self.handler) - c.run () - except BrowserCrashed: - # this is fine if reported - self.logger.error ('browser crashed', uuid='42582cbe-fb83-47ce-b330-d022a1c3b331') - - def run (self): - self.have = set () - self.urls = set ([self.url]) - - while self.urls: - self.logger.info ('recursing', - uuid='5b8498e4-868d-413c-a67e-004516b8452c', - numurls=len (self.urls)) - self.have.update (self.urls) - fetchurls = self.urls - self.urls = set () + assert isinstance (entry, SetEntry) - # handler appends new urls to self.urls through push() - self.fetch (fetchurls) + url = entry.value + depth = entry.depth + logger = self.logger.bind (url=url) - # remove urls we have and apply recursion policy - self.urls.difference_update (self.have) - self.urls = self.recursionPolicy (self.urls) + def formatCommand (e): + # provide means to disable variable expansion + if e.startswith ('!'): + return e[1:] + else: + return e.format (url=url, dest=dest.name) + + def formatOutput (p): + return p.format (host=url.host, + date=datetime.utcnow ().isoformat (), seqnum=seqnum) + + def logStats (): + logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats) + + if url.scheme not in self.SCHEME_WHITELIST: + self.stats['ignored'] += 1 + logStats () + self.logger.warning ('scheme not whitelisted', url=url, + uuid='57e838de-4494-4316-ae98-cd3a2ebf541b') + return + + dest = tempfile.NamedTemporaryFile (dir=self.tempdir, + prefix=os.path.basename (self.output) + '-', suffix='.warc.gz', + delete=False) + command = list (map (formatCommand, self.command)) + logger.info ('fetch', uuid='d1288fbe-8bae-42c8-af8c-f2fa8b41794f', + command=command) + try: + process = await 
asyncio.create_subprocess_exec (*command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + stdin=asyncio.subprocess.DEVNULL, + start_new_session=True, limit=100*1024*1024) + while True: + data = await process.stdout.readline () + if not data: + break + data = json.loads (data) + uuid = data.get ('uuid') + if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c': + links = set (self.policy (map (lambda x: SetEntry (URL(x).with_fragment(None), depth=depth+1), data.get ('links', [])))) + links.difference_update (self.have) + self.pending.update (links) + elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff': + for k in self.stats.keys (): + self.stats[k] += data.get (k, 0) + logStats () + except asyncio.CancelledError: + # graceful cancellation + process.send_signal (signal.SIGINT) + except Exception as e: + process.kill () + raise e + finally: + code = await process.wait() + if code == 0: + if self.copyLock is None: + # atomically move once finished + lastDestpath = None + while True: + # XXX: must generate a new name every time, otherwise + # this loop never terminates + destpath = formatOutput (self.output) + assert destpath != lastDestpath + lastDestpath = destpath + + # python does not have rename(…, …, RENAME_NOREPLACE), + # but this is safe nontheless, since we’re + # single-threaded + if not os.path.exists (destpath): + # create the directory, so templates like + # /{host}/{date}/… are possible + os.makedirs (os.path.dirname (destpath), exist_ok=True) + os.rename (dest.name, destpath) + break + else: + # atomically (in the context of this process) append to + # existing file + async with self.copyLock: + with open (dest.name, 'rb') as infd, \ + open (self.output, 'ab') as outfd: + shutil.copyfileobj (infd, outfd) + os.unlink (dest.name) + else: + self.stats['crashed'] += 1 + logStats () - def push (self, item): - if isinstance (item, ExtractLinksEvent): - self.logger.debug ('extracted links', - uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', 
links=item.links) - self.urls.update (map (removeFragment, item.links)) + async def run (self): + def log (): + # self.have includes running jobs + self.logger.info ('recursing', + uuid='5b8498e4-868d-413c-a67e-004516b8452c', + pending=len (self.pending), + have=len (self.have)-len(self.running), + running=len (self.running)) + + seqnum = 1 + try: + self.have = set () + self.pending = set ([SetEntry (self.url, depth=0)]) + + while self.pending: + # since pending is a set this picks a random item, which is fine + u = self.pending.pop () + self.have.add (u) + t = asyncio.ensure_future (self.fetch (u, seqnum)) + self.running.add (t) + seqnum += 1 + + log () + + if len (self.running) >= self.concurrency or not self.pending: + done, pending = await asyncio.wait (self.running, + return_when=asyncio.FIRST_COMPLETED) + self.running.difference_update (done) + # propagate exceptions + for r in done: + r.result () + except asyncio.CancelledError: + self.logger.info ('cancel', + uuid='d58154c8-ec27-40f2-ab9e-e25c1b21cd88', + pending=len (self.pending), + have=len (self.have)-len (self.running), + running=len (self.running)) + finally: + done = await asyncio.gather (*self.running, + return_exceptions=True) + # propagate exceptions + for r in done: + if isinstance (r, Exception): + raise r + self.running = set () + log () |