summaryrefslogtreecommitdiff
path: root/crocoite/controller.py
diff options
context:
space:
mode:
Diffstat (limited to 'crocoite/controller.py')
-rw-r--r--crocoite/controller.py260
1 files changed, 161 insertions, 99 deletions
diff --git a/crocoite/controller.py b/crocoite/controller.py
index bc6f948..cdae268 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -35,99 +35,151 @@ class ControllerSettings:
defaultSettings = ControllerSettings ()
-import logging
-from urllib.parse import urlsplit, urlunsplit
+class EventHandler:
+ """ Abstract base class for event handler """
+
+ # this handler wants to know about exceptions before they are reraised by
+ # the controller
+ acceptException = False
+
+ def push (self, item):
+ raise NotImplementedError ()
+
+from .browser import BrowserCrashed
+
+class StatsHandler (EventHandler):
+ acceptException = True
-import pychrome
+ def __init__ (self):
+ self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0}
+
+ def push (self, item):
+ if isinstance (item, Item):
+ self.stats['requests'] += 1
+ if item.failed:
+ self.stats['failed'] += 1
+ else:
+ self.stats['finished'] += 1
+ self.stats['bytesRcv'] += item.encodedDataLength
+ elif isinstance (item, BrowserCrashed):
+ self.stats['crashed'] += 1
+
+import logging, time
+from urllib.parse import urlsplit, urlunsplit
from . import behavior as cbehavior
-from .browser import ChromeService
-from .warc import WarcLoader, SerializingWARCWriter
+from .browser import ChromeService, SiteLoader, Item
from .util import getFormattedViewportMetrics
-def firstOrNone (it):
- """ Return first item of iterator it or None if empty """
- try:
- return next (it)
- except StopIteration:
- return None
+class ControllerStart:
+ def __init__ (self, payload):
+ self.payload = payload
class SinglePageController:
"""
Archive a single page url to file output.
+
+ Dispatches between producer (site loader and behavior scripts) and consumer
+ (stats, warc writer).
"""
def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \
- logger=logging.getLogger(__name__), settings=defaultSettings):
+ logger=logging.getLogger(__name__), settings=defaultSettings, handler=[]):
self.url = url
self.output = output
self.service = service
self.behavior = behavior
self.settings = settings
self.logger = logger
+ self.handler = handler
+
+ def processItem (self, item):
+ if isinstance (item, Exception):
+ for h in self.handler:
+ if h.acceptException:
+ h.push (item)
+ raise item
+
+ for h in self.handler:
+ h.push (item)
def run (self):
- ret = {'stats': None, 'links': []}
-
- with self.service as browser:
- browser = pychrome.Browser (url=browser)
- writer = SerializingWARCWriter (self.output, gzip=True)
-
- with WarcLoader (browser, self.url, writer,
- logBuffer=self.settings.logBuffer,
- maxBodySize=self.settings.maxBodySize) as l:
- version = l.tab.Browser.getVersion ()
- payload = {
- 'software': __package__,
- 'browser': version['product'],
- 'useragent': version['userAgent'],
- 'viewport': getFormattedViewportMetrics (l.tab),
- }
- warcinfo = writer.create_warcinfo_record (filename=None, info=payload)
- writer.write_record (warcinfo)
-
- # not all behavior scripts are allowed for every URL, filter them
- enabledBehavior = list (filter (lambda x: self.url in x,
- map (lambda x: x (l), self.behavior)))
- linksBehavior = firstOrNone (filter (lambda x: isinstance (x, cbehavior.ExtractLinks),
- enabledBehavior))
-
- for b in enabledBehavior:
- self.logger.debug ('starting onload behavior {}'.format (b.name))
- b.onload ()
- l.start ()
-
- l.waitIdle (self.settings.idleTimeout, self.settings.timeout)
-
- for b in enabledBehavior:
- self.logger.debug ('starting onstop behavior {}'.format (b.name))
- b.onstop ()
-
- # if we stopped due to timeout, wait for remaining assets
- l.waitIdle (2, 60)
- l.stop ()
-
- for b in enabledBehavior:
- self.logger.debug ('starting onfinish behavior {}'.format (b.name))
- b.onfinish ()
-
- ret['stats'] = l.stats
- ret['links'] = linksBehavior.links if linksBehavior else None
- writer.flush ()
- return ret
-
-from collections import UserDict
-
-class IntegerDict (UserDict):
- """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """
- def __add__ (self, b):
- newdict = self.__class__ (self)
- for k, v in b.items ():
- if k in self:
- newdict[k] += v
- else:
- newdict[k] = v
- return newdict
+ def processQueue ():
+ # XXX: this is very ugly code and does not work well. figure out a
+ # better way to impose timeouts and still process all items in the
+ # queue
+ queue = l.queue
+ self.logger.debug ('processing at least {} queue items'.format (len (queue)))
+ while True:
+ now = time.time ()
+ elapsed = now-start
+ maxTimeout = max (min (self.settings.idleTimeout, self.settings.timeout-elapsed), 0)
+ self.logger.debug ('max timeout is {} with elapsed {}'.format (maxTimeout, elapsed))
+ # skip waiting if there is work to do. processes all items in
+ # queue, regardless of timeouts, i.e. you need to make sure the
+ # queue will actually be empty at some point.
+ if len (queue) == 0:
+ if not l.notify.wait (maxTimeout):
+ assert len (queue) == 0, "event must be sent"
+ # timed out
+ self.logger.debug ('timed out after {}'.format (elapsed))
+ break
+ else:
+ l.notify.clear ()
+ # limit number of items processed here, otherwise timeout won’t
+ # be checked frequently. this can happen if the site quickly
+ # loads a lot of items.
+ for i in range (1000):
+ try:
+ item = queue.popleft ()
+ self.logger.debug ('queue pop: {!r}, len now {}'.format (item, len (queue)))
+ except IndexError:
+ break
+ self.processItem (item)
+ if maxTimeout == 0:
+ break
+
+ with self.service as browser, SiteLoader (browser, self.url, logger=self.logger) as l:
+ start = time.time ()
+
+ version = l.tab.Browser.getVersion ()
+ payload = {
+ 'software': __package__,
+ 'browser': version['product'],
+ 'useragent': version['userAgent'],
+ 'viewport': getFormattedViewportMetrics (l.tab),
+ }
+ self.processItem (ControllerStart (payload))
+
+ # not all behavior scripts are allowed for every URL, filter them
+ enabledBehavior = list (filter (lambda x: self.url in x,
+ map (lambda x: x (l), self.behavior)))
+
+ for b in enabledBehavior:
+ self.logger.debug ('starting onload {}'.format (b))
+ # I decided against using the queue here to limit memory
+ # usage (screenshot behavior would put all images into
+ # queue before we could process them)
+ for item in b.onload ():
+ self.processItem (item)
+ l.start ()
+
+ processQueue ()
+
+ for b in enabledBehavior:
+ self.logger.debug ('starting onstop {}'.format (b))
+ for item in b.onstop ():
+ self.processItem (item)
+
+ # if we stopped due to timeout, wait for remaining assets
+ processQueue ()
+
+ for b in enabledBehavior:
+ self.logger.debug ('starting onfinish {}'.format (b))
+ for item in b.onfinish ():
+ self.processItem (item)
+
+ processQueue ()
class RecursionPolicy:
""" Abstract recursion policy """
@@ -172,7 +224,9 @@ def removeFragment (u):
s = urlsplit (u)
return urlunsplit ((s.scheme, s.netloc, s.path, s.query, ''))
-class RecursiveController:
+from .behavior import ExtractLinksEvent
+
+class RecursiveController (EventHandler):
"""
Simple recursive controller
@@ -181,7 +235,7 @@ class RecursiveController:
def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \
logger=logging.getLogger(__name__), settings=defaultSettings,
- recursionPolicy=DepthLimit (0)):
+ recursionPolicy=DepthLimit (0), handler=[]):
self.url = url
self.output = output
self.service = service
@@ -189,37 +243,45 @@ class RecursiveController:
self.settings = settings
self.logger = logger
self.recursionPolicy = recursionPolicy
+ self.handler = handler
+ self.handler.append (self)
def fetch (self, urls):
"""
Overrideable fetch action for URLs. Defaults to sequential
SinglePageController.
"""
- result = []
for u in urls:
- c = SinglePageController (u, self.output, self.service,
- self.behavior, self.logger, self.settings)
- result.append (c.run ())
- return result
+ try:
+ c = SinglePageController (u, self.output, self.service,
+ self.behavior, self.logger, self.settings, self.handler)
+ c.run ()
+ except BrowserCrashed:
+ # this is fine if reported
+ self.logger.error ('browser crashed for {}'.format (u))
def run (self):
- have = set ()
- urls = set ([self.url])
- ret = {'stats': IntegerDict ()}
-
- while urls:
- self.logger.info ('retrieving {} urls'.format (len (urls)))
- result = self.fetch (urls)
-
- have.update (urls)
- urls = set ()
- for r in result:
- ret['stats'] += r['stats']
- urls.update (map (removeFragment, r['links']))
- urls.difference_update (have)
-
- urls = self.recursionPolicy (urls)
- # everything in ret must be serializeable
- ret['stats'] = dict (ret['stats'])
- return ret
+ self.have = set ()
+ self.urls = set ([self.url])
+
+ while self.urls:
+ self.logger.info ('retrieving {} urls'.format (len (self.urls)))
+
+ self.have.update (self.urls)
+ fetchurls = self.urls
+ self.urls = set ()
+
+ # handler appends new urls to self.urls through push()
+ self.fetch (fetchurls)
+
+ # remove urls we have and apply recursion policy
+ self.urls.difference_update (self.have)
+ self.urls = self.recursionPolicy (self.urls)
+
+ def push (self, item):
+ if isinstance (item, ExtractLinksEvent):
+ self.logger.debug ('adding extracted links: {}'.format (item.links))
+ self.urls.update (map (removeFragment, item.links))
+ else:
+ self.logger.debug ('{} got unhandled event {!r}'.format (self, item))