diff options
| -rw-r--r-- | README.rst | 2 | ||||
| -rw-r--r-- | contrib/celerycrocoite.py | 27 | ||||
| -rw-r--r-- | crocoite/cli.py | 135 | ||||
| -rw-r--r-- | crocoite/controller.py | 103 | ||||
| -rw-r--r-- | crocoite/defaults.py | 27 | ||||
| -rw-r--r-- | crocoite/task.py | 71 | ||||
| -rw-r--r-- | crocoite/warc.py | 6 | 
7 files changed, 211 insertions, 160 deletions
| @@ -79,7 +79,7 @@ Configure using celeryconfig.py  Start a Celery worker:: -    celery -A crocoite.cli worker --loglevel=info +    celery -A crocoite.task worker --loglevel=info  Then queue archive job:: diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py index c3a67ae..3d8c786 100644 --- a/contrib/celerycrocoite.py +++ b/contrib/celerycrocoite.py @@ -32,7 +32,8 @@ from threading import Thread  from queue import Queue  import queue -from crocoite import behavior, cli, defaults +from crocoite import behavior, task +from crocoite.controller import defaultSettings  def prettyTimeDelta (seconds):      """ @@ -125,14 +126,14 @@ def celeryWorker (bot, q):              break          action, trigger, args = item          if action == 'ao': -            handle = cli.archive.delay (**args) +            handle = task.archive.delay (**args)              j = jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'args': args}              # pretty-print a few selected args              showargs = { -                    'idleTimeout': prettyTimeDelta (args['idleTimeout']), -                    'timeout': prettyTimeDelta (args['timeout']), -                    'maxBodySize': prettyBytes (args['maxBodySize']), +                    'idleTimeout': prettyTimeDelta (args['settings']['idleTimeout']), +                    'timeout': prettyTimeDelta (args['settings']['timeout']), +                    'maxBodySize': prettyBytes (args['settings']['maxBodySize']),                      }              strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))              bot.msg (trigger.sender, '{}: {} has been queued as {} with {}'.format (trigger.nick, args['url'], handle.id, strargs)) @@ -173,16 +174,12 @@ def archive (bot, trigger):          return      blacklistedBehavior = {'domSnapshot', 'screenshot'} -    args = { -            'url': url, -            'output': None, -            'enabledBehaviorNames': list (behavior.availableNames-blacklistedBehavior), -            'browser': None, -            'logBuffer': defaults.logBuffer, -            'maxBodySize': defaults.maxBodySize, -            'idleTimeout': 10, -            'timeout': 1*60*60, # 1 hour -            } +    settings = dict (maxBodySize=defaultSettings.maxBodySize, +            logBuffer=defaultSettings.logBuffer, idleTimeout=10, +            timeout=1*60*60) +    args = dict (url=url, +            enabledBehaviorNames=list (behavior.availableNames-blacklistedBehavior), +            settings=settings)      q = bot.memory['crocoite']['q']      q.put_nowait (('ao', trigger, args)) diff --git a/crocoite/cli.py b/crocoite/cli.py index 2cbbfa8..cac5b3b 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -19,112 +19,13 @@  # THE SOFTWARE.  """ -Standalone and Celery command line interface +Command line interface  """ -import os, logging, argparse -from io import BytesIO -from datetime import datetime -import pychrome -from urllib.parse import urlsplit +import logging, argparse -from celery import Celery -from celery.utils.log import get_task_logger - -from . import behavior, defaults -from .warc import WarcLoader, SerializingWARCWriter -from .browser import ChromeService, NullService -from .util import packageUrl, getFormattedViewportMetrics - -app = Celery ('crocoite.distributed') -app.config_from_object('celeryconfig') -logger = get_task_logger('crocoite.distributed.archive') - -# defaults can be changed below using argparse; track started state, because tasks are usually long-running -@app.task(bind=True, track_started=True) -def archive (self, url, output, browser, logBuffer, maxBodySize, idleTimeout, -        timeout, enabledBehaviorNames): -    """ -    Archive a single URL - -    Supports these config keys (celeryconfig): - -    warc_filename = '{domain}-{date}-{id}.warc.gz' -    temp_dir = '/tmp/' -    finished_dir = '/tmp/finished' -    """ - -    ret = {'stats': None} - -    self.update_state (state='PROGRESS', meta={'step': 'start'}) - -    service = ChromeService () -    if browser: -        service = NullService (browser) - -    allBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) - -    with service as browser: -        browser = pychrome.Browser(url=browser) - -        if not output: -            parsedUrl = urlsplit (url) -            outFile = app.conf.warc_filename.format ( -                            id=self.request.id, -                            domain=parsedUrl.hostname.replace ('/', '-'), -                            date=datetime.utcnow ().isoformat (), -                            ) -            outPath = os.path.join (app.conf.temp_dir, outFile) -            fd = open (outPath, 'wb') -        else: -            fd = open (output, 'wb') -        writer = SerializingWARCWriter (fd, gzip=True) - -        with WarcLoader (browser, url, writer, logBuffer=logBuffer, -                maxBodySize=maxBodySize) as l: -            version = l.tab.Browser.getVersion () -            payload = { -                    'software': __package__, -                    'browser': version['product'], -                    'useragent': version['userAgent'], -                    'viewport': getFormattedViewportMetrics (l.tab), -                    } -            warcinfo = writer.create_warcinfo_record (filename=None, info=payload) -            writer.write_record (warcinfo) - -            # not all behavior scripts are allowed for every URL, filter them -            enabledBehavior = list (filter (lambda x: url in x, -                    map (lambda x: x (l), allBehavior))) - -            self.update_state (state='PROGRESS', meta={'step': 'onload'}) -            for b in enabledBehavior: -                logger.debug ('starting onload behavior {}'.format (b.name)) -                b.onload () -            l.start () - -            self.update_state (state='PROGRESS', meta={'step': 'fetch'}) -            l.waitIdle (idleTimeout, timeout) - -            self.update_state (state='PROGRESS', meta={'step': 'onstop'}) -            for b in enabledBehavior: -                logger.debug ('starting onstop behavior {}'.format (b.name)) -                b.onstop () - -            # if we stopped due to timeout, wait for remaining assets -            l.waitIdle (2, 60) -            l.stop () - -            self.update_state (state='PROGRESS', meta={'step': 'onfinish'}) -            for b in enabledBehavior: -                logger.debug ('starting onfinish behavior {}'.format (b.name)) -                b.onfinish () - -            ret['stats'] = l.stats -        writer.flush () -    if not output: -        outPath = os.path.join (app.conf.finished_dir, outFile) -        os.rename (fd.name, outPath) -    return ret +from . import behavior +from .controller import SinglePageController, defaultSettings, ControllerSettings  def stateCallback (data):      result = data['result'] @@ -134,34 +35,40 @@ def stateCallback (data):  def main ():      parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')      parser.add_argument('--browser', help='DevTools URL', metavar='URL') -    parser.add_argument('--distributed', help='Use celery worker', action='store_true')      parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC')      parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC') -    parser.add_argument('--log-buffer', default=defaults.logBuffer, type=int, dest='logBuffer', metavar='LINES') -    parser.add_argument('--max-body-size', default=defaults.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES') -    #parser.add_argument('--keep-tab', action='store_true', default=False, dest='keepTab', help='Keep tab open') +    parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES') +    parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES')      parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts',              dest='enabledBehaviorNames',              default=list (behavior.availableNames),              choices=list (behavior.availableNames)) +    group = parser.add_mutually_exclusive_group (required=True) +    group.add_argument('--output', help='WARC filename', metavar='FILE') +    group.add_argument('--distributed', help='Use celery worker', action='store_true')      parser.add_argument('url', help='Website URL') -    parser.add_argument('output', help='WARC filename')      args = parser.parse_args ()      # prepare args for function      distributed = args.distributed -    passArgs = vars (args) -    del passArgs['distributed']      if distributed: -        result = archive.delay (**passArgs) +        from .task import archive +        settings = dict (maxBodySize=args.maxBodySize, +                logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, +                timeout=args.timeout) +        result = archive.delay (url=args.url, settings=settings, +                enabledBehaviorNames=args.enabledBehaviorNames)          r = result.get (on_message=stateCallback)      else: -        # XXX: local evaluation does not init celery logging?          logging.basicConfig (level=logging.INFO) -        r = archive (**passArgs) -    print (r['stats']) +        settings = ControllerSettings (maxBodySize=args.maxBodySize, +                logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, +                timeout=args.timeout) +        with open (args.output, 'wb') as fd: +            controller = SinglePageController (args.url, fd, settings=settings) +            controller.run ()      return True diff --git a/crocoite/controller.py b/crocoite/controller.py new file mode 100644 index 0000000..a338559 --- /dev/null +++ b/crocoite/controller.py @@ -0,0 +1,103 @@ +# Copyright (c) 2017–2018 crocoite contributors +#  +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +#  +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +#  +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +""" +Controller classes, handling actions required for archival +""" + +class ControllerSettings: +    def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10): +        self.logBuffer = logBuffer +        self.maxBodySize = maxBodySize +        self.idleTimeout = idleTimeout +        self.timeout = timeout + +defaultSettings = ControllerSettings () + +import logging + +import pychrome + +from . import behavior as cbehavior +from .browser import ChromeService +from .warc import WarcLoader, SerializingWARCWriter +from .util import getFormattedViewportMetrics + +class SinglePageController: +    """ +    Archive a single page url to file output. +    """ + +    def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \ +            logger=logging.getLogger(__name__), settings=defaultSettings): +        self.url = url +        self.output = output +        self.service = service +        self.behavior = behavior +        self.settings = settings +        self.logger = logger + +    def run (self): +        ret = {'stats': None} + +        with self.service as browser: +            browser = pychrome.Browser (url=browser) +            writer = SerializingWARCWriter (self.output, gzip=True) + +            with WarcLoader (browser, self.url, writer, +                    logBuffer=self.settings.logBuffer, +                    maxBodySize=self.settings.maxBodySize) as l: +                version = l.tab.Browser.getVersion () +                payload = { +                        'software': __package__, +                        'browser': version['product'], +                        'useragent': version['userAgent'], +                        'viewport': getFormattedViewportMetrics (l.tab), +                        } +                warcinfo = writer.create_warcinfo_record (filename=None, info=payload) +                writer.write_record (warcinfo) + +                # not all behavior scripts are allowed for every URL, filter them +                enabledBehavior = list (filter (lambda x: self.url in x, +                        map (lambda x: x (l), self.behavior))) + +                for b in enabledBehavior: +                    self.logger.debug ('starting onload behavior {}'.format (b.name)) +                    b.onload () +                l.start () + +                l.waitIdle (self.settings.idleTimeout, self.settings.timeout) + +                for b in enabledBehavior: +                    self.logger.debug ('starting onstop behavior {}'.format (b.name)) +                    b.onstop () + +                # if we stopped due to timeout, wait for remaining assets +                l.waitIdle (2, 60) +                l.stop () + +                for b in enabledBehavior: +                    self.logger.debug ('starting onfinish behavior {}'.format (b.name)) +                    b.onfinish () + +                ret['stats'] = l.stats +            writer.flush () +        return ret + diff --git a/crocoite/defaults.py b/crocoite/defaults.py deleted file mode 100644 index d55312d..0000000 --- a/crocoite/defaults.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright (c) 2017 crocoite contributors -#  -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -#  -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -#  -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -""" -Defaults settings -""" - -maxBodySize = 50*1024*1024 -logBuffer = 1000 - diff --git a/crocoite/task.py b/crocoite/task.py new file mode 100644 index 0000000..39900a5 --- /dev/null +++ b/crocoite/task.py @@ -0,0 +1,71 @@ +# Copyright (c) 2017–2018 crocoite contributors +#  +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +#  +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +#  +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +""" +Celery distributed tasks +""" + +import os + +from urllib.parse import urlsplit +from datetime import datetime + +from celery import Celery +from celery.utils.log import get_task_logger + +from .controller import SinglePageController, ControllerSettings +from . import behavior + +app = Celery ('crocoite.distributed') +app.config_from_object('celeryconfig') +logger = get_task_logger('crocoite.distributed.archive') + +@app.task(bind=True, track_started=True) +def archive (self, url, settings, enabledBehaviorNames): +    """ +    Archive a single URL + +    Supports these config keys (celeryconfig): + +    warc_filename = '{domain}-{date}-{id}.warc.gz' +    temp_dir = '/tmp/' +    finished_dir = '/tmp/finished' +    """ + +    parsedUrl = urlsplit (url) +    outFile = app.conf.warc_filename.format ( +                    id=self.request.id, +                    domain=parsedUrl.hostname.replace ('/', '-'), +                    date=datetime.utcnow ().isoformat (), +                    ) +    outPath = os.path.join (app.conf.temp_dir, outFile) +    fd = open (outPath, 'wb') + +    enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) +    settings = ControllerSettings (**settings) +    controller = SinglePageController (url, fd, behavior=enabledBehavior, settings=settings) +    ret = controller.run () + +    os.makedirs (app.conf.finished_dir, exist_ok=True) +    outPath = os.path.join (app.conf.finished_dir, outFile) +    os.rename (fd.name, outPath) + +    return ret + diff --git a/crocoite/warc.py b/crocoite/warc.py index 9c96900..3fd65e4 100644 --- a/crocoite/warc.py +++ b/crocoite/warc.py @@ -38,7 +38,7 @@ from warcio.warcwriter import WARCWriter  from .browser import AccountingSiteLoader  from .util import packageUrl -from . import defaults +from .controller import defaultSettings  class SerializingWARCWriter (WARCWriter):      """ @@ -103,8 +103,8 @@ class WARCLogHandler (BufferingHandler):  class WarcLoader (AccountingSiteLoader):      def __init__ (self, browser, url, writer,              logger=logging.getLogger(__name__), -            logBuffer=defaults.logBuffer, -            maxBodySize=defaults.maxBodySize): +            logBuffer=defaultSettings.logBuffer, +            maxBodySize=defaultSettings.maxBodySize):          super ().__init__ (browser, url, logger)          self.writer = writer          self.maxBodySize = maxBodySize | 
