diff options
author | Lars-Dominik Braun <lars@6xq.net> | 2018-04-29 10:44:33 +0200 |
---|---|---|
committer | Lars-Dominik Braun <lars@6xq.net> | 2018-05-04 16:00:05 +0200 |
commit | b30a44cfe9456deafc83e008f8501c391cd1e258 (patch) | |
tree | 91bd6a9f431b4155047e0f024fe9f5665766804e | |
parent | d15b498505dc0362fbd7e92bf7ba2945cad5a118 (diff) | |
download | crocoite-b30a44cfe9456deafc83e008f8501c391cd1e258.tar.gz crocoite-b30a44cfe9456deafc83e008f8501c391cd1e258.tar.bz2 crocoite-b30a44cfe9456deafc83e008f8501c391cd1e258.zip |
Move page archiving logic to SinglePageController
In preparation for recursive crawls.
-rw-r--r-- | README.rst | 2 | ||||
-rw-r--r-- | contrib/celerycrocoite.py | 27 | ||||
-rw-r--r-- | crocoite/cli.py | 135 | ||||
-rw-r--r-- | crocoite/controller.py | 103 | ||||
-rw-r--r-- | crocoite/defaults.py | 27 | ||||
-rw-r--r-- | crocoite/task.py | 71 | ||||
-rw-r--r-- | crocoite/warc.py | 6 |
7 files changed, 211 insertions, 160 deletions
@@ -79,7 +79,7 @@ Configure using celeryconfig.py Start a Celery worker:: - celery -A crocoite.cli worker --loglevel=info + celery -A crocoite.task worker --loglevel=info Then queue archive job:: diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py index c3a67ae..3d8c786 100644 --- a/contrib/celerycrocoite.py +++ b/contrib/celerycrocoite.py @@ -32,7 +32,8 @@ from threading import Thread from queue import Queue import queue -from crocoite import behavior, cli, defaults +from crocoite import behavior, task +from crocoite.controller import defaultSettings def prettyTimeDelta (seconds): """ @@ -125,14 +126,14 @@ def celeryWorker (bot, q): break action, trigger, args = item if action == 'ao': - handle = cli.archive.delay (**args) + handle = task.archive.delay (**args) j = jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'args': args} # pretty-print a few selected args showargs = { - 'idleTimeout': prettyTimeDelta (args['idleTimeout']), - 'timeout': prettyTimeDelta (args['timeout']), - 'maxBodySize': prettyBytes (args['maxBodySize']), + 'idleTimeout': prettyTimeDelta (args['settings']['idleTimeout']), + 'timeout': prettyTimeDelta (args['settings']['timeout']), + 'maxBodySize': prettyBytes (args['settings']['maxBodySize']), } strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ())) bot.msg (trigger.sender, '{}: {} has been queued as {} with {}'.format (trigger.nick, args['url'], handle.id, strargs)) @@ -173,16 +174,12 @@ def archive (bot, trigger): return blacklistedBehavior = {'domSnapshot', 'screenshot'} - args = { - 'url': url, - 'output': None, - 'enabledBehaviorNames': list (behavior.availableNames-blacklistedBehavior), - 'browser': None, - 'logBuffer': defaults.logBuffer, - 'maxBodySize': defaults.maxBodySize, - 'idleTimeout': 10, - 'timeout': 1*60*60, # 1 hour - } + settings = dict (maxBodySize=defaultSettings.maxBodySize, + logBuffer=defaultSettings.logBuffer, idleTimeout=10, + timeout=1*60*60) + args = dict (url=url, + enabledBehaviorNames=list (behavior.availableNames-blacklistedBehavior), + settings=settings) q = bot.memory['crocoite']['q'] q.put_nowait (('ao', trigger, args)) diff --git a/crocoite/cli.py b/crocoite/cli.py index 2cbbfa8..cac5b3b 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -19,112 +19,13 @@ # THE SOFTWARE. """ -Standalone and Celery command line interface +Command line interface """ -import os, logging, argparse -from io import BytesIO -from datetime import datetime -import pychrome -from urllib.parse import urlsplit +import logging, argparse -from celery import Celery -from celery.utils.log import get_task_logger - -from . import behavior, defaults -from .warc import WarcLoader, SerializingWARCWriter -from .browser import ChromeService, NullService -from .util import packageUrl, getFormattedViewportMetrics - -app = Celery ('crocoite.distributed') -app.config_from_object('celeryconfig') -logger = get_task_logger('crocoite.distributed.archive') - -# defaults can be changed below using argparse; track started state, because tasks are usually long-running -@app.task(bind=True, track_started=True) -def archive (self, url, output, browser, logBuffer, maxBodySize, idleTimeout, - timeout, enabledBehaviorNames): - """ - Archive a single URL - - Supports these config keys (celeryconfig): - - warc_filename = '{domain}-{date}-{id}.warc.gz' - temp_dir = '/tmp/' - finished_dir = '/tmp/finished' - """ - - ret = {'stats': None} - - self.update_state (state='PROGRESS', meta={'step': 'start'}) - - service = ChromeService () - if browser: - service = NullService (browser) - - allBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) - - with service as browser: - browser = pychrome.Browser(url=browser) - - if not output: - parsedUrl = urlsplit (url) - outFile = app.conf.warc_filename.format ( - id=self.request.id, - domain=parsedUrl.hostname.replace ('/', '-'), - date=datetime.utcnow ().isoformat (), - ) - outPath = os.path.join (app.conf.temp_dir, outFile) - fd = open (outPath, 'wb') - else: - fd = open (output, 'wb') - writer = SerializingWARCWriter (fd, gzip=True) - - with WarcLoader (browser, url, writer, logBuffer=logBuffer, - maxBodySize=maxBodySize) as l: - version = l.tab.Browser.getVersion () - payload = { - 'software': __package__, - 'browser': version['product'], - 'useragent': version['userAgent'], - 'viewport': getFormattedViewportMetrics (l.tab), - } - warcinfo = writer.create_warcinfo_record (filename=None, info=payload) - writer.write_record (warcinfo) - - # not all behavior scripts are allowed for every URL, filter them - enabledBehavior = list (filter (lambda x: url in x, - map (lambda x: x (l), allBehavior))) - - self.update_state (state='PROGRESS', meta={'step': 'onload'}) - for b in enabledBehavior: - logger.debug ('starting onload behavior {}'.format (b.name)) - b.onload () - l.start () - - self.update_state (state='PROGRESS', meta={'step': 'fetch'}) - l.waitIdle (idleTimeout, timeout) - - self.update_state (state='PROGRESS', meta={'step': 'onstop'}) - for b in enabledBehavior: - logger.debug ('starting onstop behavior {}'.format (b.name)) - b.onstop () - - # if we stopped due to timeout, wait for remaining assets - l.waitIdle (2, 60) - l.stop () - - self.update_state (state='PROGRESS', meta={'step': 'onfinish'}) - for b in enabledBehavior: - logger.debug ('starting onfinish behavior {}'.format (b.name)) - b.onfinish () - - ret['stats'] = l.stats - writer.flush () - if not output: - outPath = os.path.join (app.conf.finished_dir, outFile) - os.rename (fd.name, outPath) - return ret +from . import behavior +from .controller import SinglePageController, defaultSettings, ControllerSettings def stateCallback (data): result = data['result'] @@ -134,34 +35,40 @@ def stateCallback (data): def main (): parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.') parser.add_argument('--browser', help='DevTools URL', metavar='URL') - parser.add_argument('--distributed', help='Use celery worker', action='store_true') parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC') parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC') - parser.add_argument('--log-buffer', default=defaults.logBuffer, type=int, dest='logBuffer', metavar='LINES') - parser.add_argument('--max-body-size', default=defaults.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES') - #parser.add_argument('--keep-tab', action='store_true', default=False, dest='keepTab', help='Keep tab open') + parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES') + parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES') parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts', dest='enabledBehaviorNames', default=list (behavior.availableNames), choices=list (behavior.availableNames)) + group = parser.add_mutually_exclusive_group (required=True) + group.add_argument('--output', help='WARC filename', metavar='FILE') + group.add_argument('--distributed', help='Use celery worker', action='store_true') parser.add_argument('url', help='Website URL') - parser.add_argument('output', help='WARC filename') args = parser.parse_args () # prepare args for function distributed = args.distributed - passArgs = vars (args) - del passArgs['distributed'] if distributed: - result = archive.delay (**passArgs) + from .task import archive + settings = dict (maxBodySize=args.maxBodySize, + logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, + timeout=args.timeout) + result = archive.delay (url=args.url, settings=settings, + enabledBehaviorNames=args.enabledBehaviorNames) r = result.get (on_message=stateCallback) else: - # XXX: local evaluation does not init celery logging? logging.basicConfig (level=logging.INFO) - r = archive (**passArgs) - print (r['stats']) + settings = ControllerSettings (maxBodySize=args.maxBodySize, + logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, + timeout=args.timeout) + with open (args.output, 'wb') as fd: + controller = SinglePageController (args.url, fd, settings=settings) + controller.run () return True diff --git a/crocoite/controller.py b/crocoite/controller.py new file mode 100644 index 0000000..a338559 --- /dev/null +++ b/crocoite/controller.py @@ -0,0 +1,103 @@ +# Copyright (c) 2017–2018 crocoite contributors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +""" +Controller classes, handling actions required for archival +""" + +class ControllerSettings: + def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10): + self.logBuffer = logBuffer + self.maxBodySize = maxBodySize + self.idleTimeout = idleTimeout + self.timeout = timeout + +defaultSettings = ControllerSettings () + +import logging + +import pychrome + +from . import behavior as cbehavior +from .browser import ChromeService +from .warc import WarcLoader, SerializingWARCWriter +from .util import getFormattedViewportMetrics + +class SinglePageController: + """ + Archive a single page url to file output. + """ + + def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \ + logger=logging.getLogger(__name__), settings=defaultSettings): + self.url = url + self.output = output + self.service = service + self.behavior = behavior + self.settings = settings + self.logger = logger + + def run (self): + ret = {'stats': None} + + with self.service as browser: + browser = pychrome.Browser (url=browser) + writer = SerializingWARCWriter (self.output, gzip=True) + + with WarcLoader (browser, self.url, writer, + logBuffer=self.settings.logBuffer, + maxBodySize=self.settings.maxBodySize) as l: + version = l.tab.Browser.getVersion () + payload = { + 'software': __package__, + 'browser': version['product'], + 'useragent': version['userAgent'], + 'viewport': getFormattedViewportMetrics (l.tab), + } + warcinfo = writer.create_warcinfo_record (filename=None, info=payload) + writer.write_record (warcinfo) + + # not all behavior scripts are allowed for every URL, filter them + enabledBehavior = list (filter (lambda x: self.url in x, + map (lambda x: x (l), self.behavior))) + + for b in enabledBehavior: + self.logger.debug ('starting onload behavior {}'.format (b.name)) + b.onload () + l.start () + + l.waitIdle (self.settings.idleTimeout, self.settings.timeout) + + for b in enabledBehavior: + self.logger.debug ('starting onstop behavior {}'.format (b.name)) + b.onstop () + + # if we stopped due to timeout, wait for remaining assets + l.waitIdle (2, 60) + l.stop () + + for b in enabledBehavior: + self.logger.debug ('starting onfinish behavior {}'.format (b.name)) + b.onfinish () + + ret['stats'] = l.stats + writer.flush () + return ret + diff --git a/crocoite/defaults.py b/crocoite/defaults.py deleted file mode 100644 index d55312d..0000000 --- a/crocoite/defaults.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright (c) 2017 crocoite contributors -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -""" -Defaults settings -""" - -maxBodySize = 50*1024*1024 -logBuffer = 1000 - diff --git a/crocoite/task.py b/crocoite/task.py new file mode 100644 index 0000000..39900a5 --- /dev/null +++ b/crocoite/task.py @@ -0,0 +1,71 @@ +# Copyright (c) 2017–2018 crocoite contributors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +""" +Celery distributed tasks +""" + +import os + +from urllib.parse import urlsplit +from datetime import datetime + +from celery import Celery +from celery.utils.log import get_task_logger + +from .controller import SinglePageController, ControllerSettings +from . import behavior + +app = Celery ('crocoite.distributed') +app.config_from_object('celeryconfig') +logger = get_task_logger('crocoite.distributed.archive') + +@app.task(bind=True, track_started=True) +def archive (self, url, settings, enabledBehaviorNames): + """ + Archive a single URL + + Supports these config keys (celeryconfig): + + warc_filename = '{domain}-{date}-{id}.warc.gz' + temp_dir = '/tmp/' + finished_dir = '/tmp/finished' + """ + + parsedUrl = urlsplit (url) + outFile = app.conf.warc_filename.format ( + id=self.request.id, + domain=parsedUrl.hostname.replace ('/', '-'), + date=datetime.utcnow ().isoformat (), + ) + outPath = os.path.join (app.conf.temp_dir, outFile) + fd = open (outPath, 'wb') + + enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) + settings = ControllerSettings (**settings) + controller = SinglePageController (url, fd, behavior=enabledBehavior, settings=settings) + ret = controller.run () + + os.makedirs (app.conf.finished_dir, exist_ok=True) + outPath = os.path.join (app.conf.finished_dir, outFile) + os.rename (fd.name, outPath) + + return ret + diff --git a/crocoite/warc.py b/crocoite/warc.py index 9c96900..3fd65e4 100644 --- a/crocoite/warc.py +++ b/crocoite/warc.py @@ -38,7 +38,7 @@ from warcio.warcwriter import WARCWriter from .browser import AccountingSiteLoader from .util import packageUrl -from . import defaults +from .controller import defaultSettings class SerializingWARCWriter (WARCWriter): """ @@ -103,8 +103,8 @@ class WARCLogHandler (BufferingHandler): class WarcLoader (AccountingSiteLoader): def __init__ (self, browser, url, writer, logger=logging.getLogger(__name__), - logBuffer=defaults.logBuffer, - maxBodySize=defaults.maxBodySize): + logBuffer=defaultSettings.logBuffer, + maxBodySize=defaultSettings.maxBodySize): super ().__init__ (browser, url, logger) self.writer = writer self.maxBodySize = maxBodySize |