Diffstat (limited to 'crocoite/controller.py')
-rw-r--r--  crocoite/controller.py | 584
1 file changed, 401 insertions(+), 183 deletions(-)
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 178d11c..8374b4e 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -22,190 +22,291 @@
Controller classes, handling actions required for archival
"""
+import time, tempfile, asyncio, json, os, shutil, signal
+from itertools import islice
+from datetime import datetime
+from operator import attrgetter
+from abc import ABC, abstractmethod
+from yarl import URL
+
+from . import behavior as cbehavior
+from .browser import SiteLoader, RequestResponsePair, PageIdle, FrameNavigated
+from .util import getFormattedViewportMetrics, getSoftwareInfo
+from .behavior import ExtractLinksEvent
+from .devtools import toCookieParam
+
class ControllerSettings:
- __slots__ = ('logBuffer', 'maxBodySize', 'idleTimeout', 'timeout')
+ __slots__ = ('idleTimeout', 'timeout', 'insecure', 'cookies')
- def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10):
- self.logBuffer = logBuffer
- self.maxBodySize = maxBodySize
+ def __init__ (self, idleTimeout=2, timeout=10, insecure=False, cookies=None):
self.idleTimeout = idleTimeout
self.timeout = timeout
+ self.insecure = insecure
+ self.cookies = cookies or []
- def toDict (self):
- return dict (logBuffer=self.logBuffer, maxBodySize=self.maxBodySize,
- idleTimeout=self.idleTimeout, timeout=self.timeout)
+ def __repr__ (self):
+ return f'<ControllerSetting idleTimeout={self.idleTimeout!r}, timeout={self.timeout!r}, insecure={self.insecure!r}, cookies={self.cookies!r}>'
defaultSettings = ControllerSettings ()
-class EventHandler:
+class EventHandler (ABC):
""" Abstract base class for event handler """
__slots__ = ()
- # this handler wants to know about exceptions before they are reraised by
- # the controller
- acceptException = False
-
- def push (self, item):
+ @abstractmethod
+ async def push (self, item):
raise NotImplementedError ()
-from .browser import BrowserCrashed
-
class StatsHandler (EventHandler):
- __slots__ = ('stats')
-
- acceptException = True
+ __slots__ = ('stats', )
def __init__ (self):
- self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0}
+ self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0}
- def push (self, item):
- if isinstance (item, Item):
+ async def push (self, item):
+ if isinstance (item, RequestResponsePair):
self.stats['requests'] += 1
- if item.failed:
+ if not item.response:
self.stats['failed'] += 1
else:
self.stats['finished'] += 1
- self.stats['bytesRcv'] += item.encodedDataLength
- elif isinstance (item, BrowserCrashed):
- self.stats['crashed'] += 1
+ self.stats['bytesRcv'] += item.response.bytesReceived
-import time, platform
+class LogHandler (EventHandler):
+ """ Handle items by logging information about them """
+
+ __slots__ = ('logger', )
+
+ def __init__ (self, logger):
+ self.logger = logger.bind (context=type (self).__name__)
+
+ async def push (self, item):
+ if isinstance (item, ExtractLinksEvent):
+ # limit number of links per message, so json blob won’t get too big
+ it = iter (item.links)
+ limit = 100
+ while True:
+ limitlinks = list (islice (it, 0, limit))
+ if not limitlinks:
+ break
+ self.logger.info ('extracted links', context=type (item).__name__,
+ uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', links=limitlinks)
-from . import behavior as cbehavior
-from .browser import ChromeService, SiteLoader, Item
-from .util import getFormattedViewportMetrics, removeFragment, getRequirements
class ControllerStart:
- __slots__ = ('payload')
+ __slots__ = ('payload', )
def __init__ (self, payload):
self.payload = payload
+class IdleStateTracker (EventHandler):
+ """ Track SiteLoader’s idle state by listening to PageIdle events """
+
+ __slots__ = ('_idle', '_loop', '_idleSince')
+
+ def __init__ (self, loop):
+ self._idle = True
+ self._loop = loop
+
+ self._idleSince = self._loop.time ()
+
+ async def push (self, item):
+ if isinstance (item, PageIdle):
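+            # a PageIdle event is truthy while the page is idle and falsy once it becomes busy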
+ self._idle = bool (item)
+ if self._idle:
+ self._idleSince = self._loop.time ()
+
+ async def wait (self, timeout):
+ """ Wait until page has been idle for at least timeout seconds. If the
+ page has been idle before calling this function it may return
+ immediately. """
+
+ assert timeout > 0
+ while True:
+ if self._idle:
+ now = self._loop.time ()
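+                # time remaining until the page has been idle for the full timeout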
+ sleep = timeout-(now-self._idleSince)
+ if sleep <= 0:
+ break
+ else:
+ # not idle, check again after timeout expires
+ sleep = timeout
+ await asyncio.sleep (sleep)
+
+class InjectBehaviorOnload (EventHandler):
+ """ Control behavior script injection based on frame navigation messages.
+ When a page is reloaded (for whatever reason), the scripts need to be
+ reinjected. """
+
+ __slots__ = ('controller', '_loaded')
+
+ def __init__ (self, controller):
+ self.controller = controller
+ self._loaded = False
+
+ async def push (self, item):
+ if isinstance (item, FrameNavigated):
+ await self._runon ('load')
+ self._loaded = True
+
+ async def stop (self):
+ if self._loaded:
+ await self._runon ('stop')
+
+ async def finish (self):
+ if self._loaded:
+ await self._runon ('finish')
+
+ async def _runon (self, method):
+ controller = self.controller
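+        # behaviors expose onload/onstop/onfinish as async generators; anything
+        # they yield (screenshots, for instance) is forwarded to the handlers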
+ for b in controller._enabledBehavior:
+ f = getattr (b, 'on' + method)
+ async for item in f ():
+ await controller.processItem (item)
+
class SinglePageController:
"""
- Archive a single page url to file output.
+ Archive a single page url.
Dispatches between producer (site loader and behavior scripts) and consumer
(stats, warc writer).
"""
- __slots__ = ('url', 'output', 'service', 'behavior', 'settings', 'logger', 'handler')
+ __slots__ = ('url', 'service', 'behavior', 'settings', 'logger', 'handler',
+ 'warcinfo', '_enabledBehavior')
- def __init__ (self, url, output, logger, \
- service=ChromeService (), behavior=cbehavior.available, \
- settings=defaultSettings, handler=[]):
+ def __init__ (self, url, logger, \
+ service, behavior=cbehavior.available, \
+ settings=defaultSettings, handler=None, \
+ warcinfo=None):
self.url = url
- self.output = output
self.service = service
self.behavior = behavior
self.settings = settings
self.logger = logger.bind (context=type (self).__name__, url=url)
- self.handler = handler
-
- def processItem (self, item):
- if isinstance (item, Exception):
- for h in self.handler:
- if h.acceptException:
- h.push (item)
- raise item
+ self.handler = handler or []
+ self.warcinfo = warcinfo
+ async def processItem (self, item):
for h in self.handler:
- h.push (item)
+ await h.push (item)
- def run (self):
+ async def run (self):
logger = self.logger
- def processQueue ():
- # XXX: this is very ugly code and does not work well. figure out a
- # better way to impose timeouts and still process all items in the
- # queue
- queue = l.queue
- logger.debug ('process queue',
- uuid='dafbf76b-a37e-44db-a021-efb5593b81f8',
- queuelen=len (queue))
- while True:
- now = time.time ()
- elapsed = now-start
- maxTimeout = max (min (self.settings.idleTimeout, self.settings.timeout-elapsed), 0)
- logger.debug ('timeout status',
- uuid='49550447-37e3-49ff-9a73-34da1c3e5984',
- maxTimeout=maxTimeout, elapsed=elapsed)
- # skip waiting if there is work to do. processes all items in
- # queue, regardless of timeouts, i.e. you need to make sure the
- # queue will actually be empty at some point.
- if len (queue) == 0:
- if not l.notify.wait (maxTimeout):
- assert len (queue) == 0, "event must be sent"
- # timed out
- logger.debug ('timeout',
- uuid='6a7e0083-7c1a-45ba-b1ed-dbc4f26697c6',
- elapsed=elapsed)
- break
- else:
- l.notify.clear ()
- # limit number of items processed here, otherwise timeout won’t
- # be checked frequently. this can happen if the site quickly
- # loads a lot of items.
- for i in range (1000):
- try:
- item = queue.popleft ()
- logger.debug ('queue pop',
- uuid='adc96bfa-026d-4092-b732-4a022a1a92ca',
- item=item, queuelen=len (queue))
- except IndexError:
- break
- self.processItem (item)
- if maxTimeout == 0:
- break
+ async def processQueue ():
+ async for item in l:
+ await self.processItem (item)
+
+ idle = IdleStateTracker (asyncio.get_event_loop ())
+ self.handler.append (idle)
+ behavior = InjectBehaviorOnload (self)
+ self.handler.append (behavior)
+
+ async with self.service as browser, SiteLoader (browser, logger=logger) as l:
+ handle = asyncio.ensure_future (processQueue ())
+ timeoutProc = asyncio.ensure_future (asyncio.sleep (self.settings.timeout))
- with self.service as browser, SiteLoader (browser, self.url, logger=logger) as l:
- start = time.time ()
+ # configure browser
+ tab = l.tab
+ await tab.Security.setIgnoreCertificateErrors (ignore=self.settings.insecure)
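+            # settings.cookies are http.cookies morsels; toCookieParam converts each into the CookieParam shape the DevTools Network domain expects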
+ await tab.Network.setCookies (cookies=list (map (toCookieParam, self.settings.cookies)))
- version = l.tab.Browser.getVersion ()
+ # not all behavior scripts are allowed for every URL, filter them
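+            # (the membership test calls each behavior's __contains__, which decides whether the script applies to this URL)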
+ self._enabledBehavior = list (filter (lambda x: self.url in x,
+ map (lambda x: x (l, logger), self.behavior)))
+
+ version = await tab.Browser.getVersion ()
payload = {
- 'software': {
- 'platform': platform.platform (),
- 'python': {
- 'implementation': platform.python_implementation(),
- 'version': platform.python_version (),
- 'build': platform.python_build ()
- },
- 'self': getRequirements (__package__)
- },
+ 'software': getSoftwareInfo (),
'browser': {
'product': version['product'],
'useragent': version['userAgent'],
- 'viewport': getFormattedViewportMetrics (l.tab),
+ 'viewport': await getFormattedViewportMetrics (tab),
+ },
+ 'tool': 'crocoite-single', # not the name of the cli utility
+ 'parameters': {
+ 'url': self.url,
+ 'idleTimeout': self.settings.idleTimeout,
+ 'timeout': self.settings.timeout,
+ 'behavior': list (map (attrgetter('name'), self._enabledBehavior)),
+ 'insecure': self.settings.insecure,
+ 'cookies': list (map (lambda x: x.OutputString(), self.settings.cookies)),
},
}
- self.processItem (ControllerStart (payload))
+ if self.warcinfo:
+ payload['extra'] = self.warcinfo
+ await self.processItem (ControllerStart (payload))
- # not all behavior scripts are allowed for every URL, filter them
- enabledBehavior = list (filter (lambda x: self.url in x,
- map (lambda x: x (l, logger), self.behavior)))
+ await l.navigate (self.url)
+
+ idleProc = asyncio.ensure_future (idle.wait (self.settings.idleTimeout))
+ while True:
+ try:
+ finished, pending = await asyncio.wait([idleProc, timeoutProc, handle],
+ return_when=asyncio.FIRST_COMPLETED)
+ except asyncio.CancelledError:
+ idleProc.cancel ()
+ timeoutProc.cancel ()
+ break
- for b in enabledBehavior:
- # I decided against using the queue here to limit memory
- # usage (screenshot behavior would put all images into
- # queue before we could process them)
- for item in b.onload ():
- self.processItem (item)
- l.start ()
+ if handle in finished:
+ # something went wrong while processing the data
+ logger.error ('fetch failed',
+ uuid='43a0686a-a3a9-4214-9acd-43f6976f8ff3')
+ idleProc.cancel ()
+ timeoutProc.cancel ()
+ handle.result ()
+ assert False # previous line should always raise Exception
+ elif timeoutProc in finished:
+ # global timeout
+ logger.debug ('global timeout',
+ uuid='2f858adc-9448-4ace-94b4-7cd1484c0728')
+ idleProc.cancel ()
+ timeoutProc.result ()
+ break
+ elif idleProc in finished:
+ # idle timeout
+ logger.debug ('idle timeout',
+ uuid='90702590-94c4-44ef-9b37-02a16de444c3')
+ idleProc.result ()
+ timeoutProc.cancel ()
+ break
- processQueue ()
+ await behavior.stop ()
+ await tab.Page.stopLoading ()
+ await asyncio.sleep (1)
+ await behavior.finish ()
- for b in enabledBehavior:
- for item in b.onstop ():
- self.processItem (item)
+ # wait until loads from behavior scripts are done and browser is
+ # idle for at least 1 second
+ try:
+ await asyncio.wait_for (idle.wait (1), timeout=1)
+ except (asyncio.TimeoutError, asyncio.CancelledError):
+ pass
- # if we stopped due to timeout, wait for remaining assets
- processQueue ()
+ if handle.done ():
+ handle.result ()
+ else:
+ handle.cancel ()
- for b in enabledBehavior:
- for item in b.onfinish ():
- self.processItem (item)
+class SetEntry:
+ """ A object, to be used with sets, that compares equality only on its
+ primary property. """
+ def __init__ (self, value, **props):
+ self.value = value
+ for k, v in props.items ():
+ setattr (self, k, v)
- processQueue ()
+ def __eq__ (self, b):
+ assert isinstance (b, SetEntry)
+ return self.value == b.value
+
+ def __hash__ (self):
+ return hash (self.value)
+
+ def __repr__ (self):
+ return f'<SetEntry {self.value!r}>'
class RecursionPolicy:
""" Abstract recursion policy """
@@ -219,23 +320,23 @@ class DepthLimit (RecursionPolicy):
"""
Limit recursion by depth.
- depth==0 means no recursion, depth==1 is the page and outgoing links, …
+ depth==0 means no recursion, depth==1 is the page and outgoing links
"""
- __slots__ = ('maxdepth')
+ __slots__ = ('maxdepth', )
def __init__ (self, maxdepth=0):
self.maxdepth = maxdepth
def __call__ (self, urls):
- if self.maxdepth <= 0:
- return {}
- else:
- self.maxdepth -= 1
- return urls
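+        # keep entries whose depth is within the limit, e.g. maxdepth=1 keeps
+        # the seed (depth 0) and its outgoing links (depth 1)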
+ newurls = set ()
+ for u in urls:
+ if u.depth <= self.maxdepth:
+ newurls.add (u)
+ return newurls
def __repr__ (self):
- return '<DepthLimit {}>'.format (self.maxdepth)
+ return f'<DepthLimit {self.maxdepth}>'
class PrefixLimit (RecursionPolicy):
"""
@@ -246,78 +347,195 @@ class PrefixLimit (RecursionPolicy):
accepted: http://example.com/foobar http://example.com/foo/bar
"""
- __slots__ = ('prefix')
+ __slots__ = ('prefix', )
def __init__ (self, prefix):
self.prefix = prefix
def __call__ (self, urls):
- return set (filter (lambda u: u.startswith (self.prefix), urls))
+ return set (filter (lambda u: str(u.value).startswith (str (self.prefix)), urls))
-from .behavior import ExtractLinksEvent
+def hasTemplate (s):
+ """ Return True if string s has string templates """
+ return '{' in s and '}' in s
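+# e.g. hasTemplate ('{host}-{seqnum}.warc.gz') → True, but hasTemplate ('out.warc.gz') → False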
-class RecursiveController (EventHandler):
+class RecursiveController:
"""
Simple recursive controller
- Visits links acording to recursionPolicy
+    Visits links according to policy
"""
- __slots__ = ('url', 'output', 'service', 'behavior', 'settings', 'logger',
- 'recursionPolicy', 'handler', 'urls', 'have')
+ __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
+ 'pending', 'stats', 'tempdir', 'running', 'concurrency',
+ 'copyLock')
+
+ SCHEME_WHITELIST = {'http', 'https'}
- def __init__ (self, url, output, logger,
- service=ChromeService (), behavior=cbehavior.available, \
- settings=defaultSettings, \
- recursionPolicy=DepthLimit (0), handler=[]):
+ def __init__ (self, url, output, command, logger,
+ tempdir=None, policy=DepthLimit (0), concurrency=1):
self.url = url
self.output = output
- self.service = service
- self.behavior = behavior
- self.settings = settings
- self.logger = logger.bind (context=type(self).__name__, url=url)
- self.recursionPolicy = recursionPolicy
- self.handler = handler
- self.handler.append (self)
-
- def fetch (self, urls):
+ self.command = command
+ self.logger = logger.bind (context=type(self).__name__, seedurl=url)
+ self.policy = policy
+ self.tempdir = tempdir
+ # A lock if only a single output file (no template) is requested
+ self.copyLock = None if hasTemplate (output) else asyncio.Lock ()
+ # some sanity checks. XXX move to argparse?
+ if self.copyLock and os.path.exists (self.output):
+ raise ValueError ('Output file exists')
+ # tasks currently running
+ self.running = set ()
+ # max number of tasks running
+ self.concurrency = concurrency
+ # keep in sync with StatsHandler
+ self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0, 'ignored': 0}
+
+ async def fetch (self, entry, seqnum):
"""
- Overrideable fetch action for URLs. Defaults to sequential
- SinglePageController.
+ Fetch a single URL using an external command
+
+ command is usually crocoite-single
"""
- for u in urls:
- try:
- c = SinglePageController (url=u, output=self.output, service=self.service,
- behavior=self.behavior, logger=self.logger,
- settings=self.settings, handler=self.handler)
- c.run ()
- except BrowserCrashed:
- # this is fine if reported
- self.logger.error ('browser crashed', uuid='42582cbe-fb83-47ce-b330-d022a1c3b331')
-
- def run (self):
- self.have = set ()
- self.urls = set ([self.url])
-
- while self.urls:
- self.logger.info ('recursing',
- uuid='5b8498e4-868d-413c-a67e-004516b8452c',
- numurls=len (self.urls))
- self.have.update (self.urls)
- fetchurls = self.urls
- self.urls = set ()
+ assert isinstance (entry, SetEntry)
- # handler appends new urls to self.urls through push()
- self.fetch (fetchurls)
+ url = entry.value
+ depth = entry.depth
+ logger = self.logger.bind (url=url)
- # remove urls we have and apply recursion policy
- self.urls.difference_update (self.have)
- self.urls = self.recursionPolicy (self.urls)
+ def formatCommand (e):
+ # provide means to disable variable expansion
+ if e.startswith ('!'):
+ return e[1:]
+ else:
+ return e.format (url=url, dest=dest.name)
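+        # e.g. '{url}' expands to the page URL and '{dest}' to the temp file;
+        # a leading '!' keeps the argument literal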
+
+ def formatOutput (p):
+ return p.format (host=url.host,
+ date=datetime.utcnow ().isoformat (), seqnum=seqnum)
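+        # e.g. the template '{host}/{date}-{seqnum}.warc.gz' might yield
+        # 'example.com/2019-01-01T00:00:00-1.warc.gz'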
+
+ def logStats ():
+ logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats)
+
+ if url.scheme not in self.SCHEME_WHITELIST:
+ self.stats['ignored'] += 1
+ logStats ()
+ self.logger.warning ('scheme not whitelisted', url=url,
+ uuid='57e838de-4494-4316-ae98-cd3a2ebf541b')
+ return
+
+ dest = tempfile.NamedTemporaryFile (dir=self.tempdir,
+ prefix=os.path.basename (self.output) + '-', suffix='.warc.gz',
+ delete=False)
+ command = list (map (formatCommand, self.command))
+ logger.info ('fetch', uuid='d1288fbe-8bae-42c8-af8c-f2fa8b41794f',
+ command=command)
+ try:
+ process = await asyncio.create_subprocess_exec (*command,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.DEVNULL,
+ stdin=asyncio.subprocess.DEVNULL,
+ start_new_session=True, limit=100*1024*1024)
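+            # start_new_session gives the child its own session, shielding it
+            # from terminal signals; the raised limit accommodates long JSON lines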
+ while True:
+ data = await process.stdout.readline ()
+ if not data:
+ break
+ data = json.loads (data)
+ uuid = data.get ('uuid')
+ if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c':
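+                        # link-extraction event from the child process: strip fragments, bump depth by one and let the policy filter the result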
+ links = set (self.policy (map (lambda x: SetEntry (URL(x).with_fragment(None), depth=depth+1), data.get ('links', []))))
+ links.difference_update (self.have)
+ self.pending.update (links)
+ elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff':
+ for k in self.stats.keys ():
+ self.stats[k] += data.get (k, 0)
+ logStats ()
+ except asyncio.CancelledError:
+ # graceful cancellation
+ process.send_signal (signal.SIGINT)
+ except Exception as e:
+ process.kill ()
+ raise e
+ finally:
+ code = await process.wait()
+ if code == 0:
+ if self.copyLock is None:
+ # atomically move once finished
+ lastDestpath = None
+ while True:
+ # XXX: must generate a new name every time, otherwise
+ # this loop never terminates
+ destpath = formatOutput (self.output)
+ assert destpath != lastDestpath
+ lastDestpath = destpath
+
+ # python does not have rename(…, …, RENAME_NOREPLACE),
+                            # but this is safe nonetheless, since we’re
+ # single-threaded
+ if not os.path.exists (destpath):
+ # create the directory, so templates like
+ # /{host}/{date}/… are possible
+ os.makedirs (os.path.dirname (destpath), exist_ok=True)
+ os.rename (dest.name, destpath)
+ break
+ else:
+ # atomically (in the context of this process) append to
+ # existing file
+ async with self.copyLock:
+ with open (dest.name, 'rb') as infd, \
+ open (self.output, 'ab') as outfd:
+ shutil.copyfileobj (infd, outfd)
+ os.unlink (dest.name)
+ else:
+ self.stats['crashed'] += 1
+ logStats ()
- def push (self, item):
- if isinstance (item, ExtractLinksEvent):
- self.logger.debug ('extracted links',
- uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', links=item.links)
- self.urls.update (map (removeFragment, item.links))
+ async def run (self):
+ def log ():
+ # self.have includes running jobs
+ self.logger.info ('recursing',
+ uuid='5b8498e4-868d-413c-a67e-004516b8452c',
+ pending=len (self.pending),
+ have=len (self.have)-len(self.running),
+ running=len (self.running))
+
+ seqnum = 1
+ try:
+ self.have = set ()
+ self.pending = set ([SetEntry (self.url, depth=0)])
+
+ while self.pending:
+ # since pending is a set this picks a random item, which is fine
+ u = self.pending.pop ()
+ self.have.add (u)
+ t = asyncio.ensure_future (self.fetch (u, seqnum))
+ self.running.add (t)
+ seqnum += 1
+
+ log ()
+
+ if len (self.running) >= self.concurrency or not self.pending:
+ done, pending = await asyncio.wait (self.running,
+ return_when=asyncio.FIRST_COMPLETED)
+ self.running.difference_update (done)
+ # propagate exceptions
+ for r in done:
+ r.result ()
+ except asyncio.CancelledError:
+ self.logger.info ('cancel',
+ uuid='d58154c8-ec27-40f2-ab9e-e25c1b21cd88',
+ pending=len (self.pending),
+ have=len (self.have)-len (self.running),
+ running=len (self.running))
+ finally:
+ done = await asyncio.gather (*self.running,
+ return_exceptions=True)
+ # propagate exceptions
+ for r in done:
+ if isinstance (r, Exception):
+ raise r
+ self.running = set ()
+ log ()