From b30a44cfe9456deafc83e008f8501c391cd1e258 Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Sun, 29 Apr 2018 10:44:33 +0200 Subject: Move page archiving logic to SinglePageController In preparation for recursive crawls. --- crocoite/cli.py | 135 ++++++++----------------------------------------- crocoite/controller.py | 103 +++++++++++++++++++++++++++++++++++++ crocoite/defaults.py | 27 ---------- crocoite/task.py | 71 ++++++++++++++++++++++++++ crocoite/warc.py | 6 +-- 5 files changed, 198 insertions(+), 144 deletions(-) create mode 100644 crocoite/controller.py delete mode 100644 crocoite/defaults.py create mode 100644 crocoite/task.py (limited to 'crocoite') diff --git a/crocoite/cli.py b/crocoite/cli.py index 2cbbfa8..cac5b3b 100644 --- a/crocoite/cli.py +++ b/crocoite/cli.py @@ -19,112 +19,13 @@ # THE SOFTWARE. """ -Standalone and Celery command line interface +Command line interface """ -import os, logging, argparse -from io import BytesIO -from datetime import datetime -import pychrome -from urllib.parse import urlsplit +import logging, argparse -from celery import Celery -from celery.utils.log import get_task_logger - -from . import behavior, defaults -from .warc import WarcLoader, SerializingWARCWriter -from .browser import ChromeService, NullService -from .util import packageUrl, getFormattedViewportMetrics - -app = Celery ('crocoite.distributed') -app.config_from_object('celeryconfig') -logger = get_task_logger('crocoite.distributed.archive') - -# defaults can be changed below using argparse; track started state, because tasks are usually long-running -@app.task(bind=True, track_started=True) -def archive (self, url, output, browser, logBuffer, maxBodySize, idleTimeout, - timeout, enabledBehaviorNames): - """ - Archive a single URL - - Supports these config keys (celeryconfig): - - warc_filename = '{domain}-{date}-{id}.warc.gz' - temp_dir = '/tmp/' - finished_dir = '/tmp/finished' - """ - - ret = {'stats': None} - - self.update_state (state='PROGRESS', meta={'step': 'start'}) - - service = ChromeService () - if browser: - service = NullService (browser) - - allBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) - - with service as browser: - browser = pychrome.Browser(url=browser) - - if not output: - parsedUrl = urlsplit (url) - outFile = app.conf.warc_filename.format ( - id=self.request.id, - domain=parsedUrl.hostname.replace ('/', '-'), - date=datetime.utcnow ().isoformat (), - ) - outPath = os.path.join (app.conf.temp_dir, outFile) - fd = open (outPath, 'wb') - else: - fd = open (output, 'wb') - writer = SerializingWARCWriter (fd, gzip=True) - - with WarcLoader (browser, url, writer, logBuffer=logBuffer, - maxBodySize=maxBodySize) as l: - version = l.tab.Browser.getVersion () - payload = { - 'software': __package__, - 'browser': version['product'], - 'useragent': version['userAgent'], - 'viewport': getFormattedViewportMetrics (l.tab), - } - warcinfo = writer.create_warcinfo_record (filename=None, info=payload) - writer.write_record (warcinfo) - - # not all behavior scripts are allowed for every URL, filter them - enabledBehavior = list (filter (lambda x: url in x, - map (lambda x: x (l), allBehavior))) - - self.update_state (state='PROGRESS', meta={'step': 'onload'}) - for b in enabledBehavior: - logger.debug ('starting onload behavior {}'.format (b.name)) - b.onload () - l.start () - - self.update_state (state='PROGRESS', meta={'step': 'fetch'}) - l.waitIdle (idleTimeout, timeout) - - self.update_state (state='PROGRESS', meta={'step': 'onstop'}) - for b in enabledBehavior: - logger.debug ('starting onstop behavior {}'.format (b.name)) - b.onstop () - - # if we stopped due to timeout, wait for remaining assets - l.waitIdle (2, 60) - l.stop () - - self.update_state (state='PROGRESS', meta={'step': 'onfinish'}) - for b in enabledBehavior: - logger.debug ('starting onfinish behavior {}'.format (b.name)) - b.onfinish () - - ret['stats'] = l.stats - writer.flush () - if not output: - outPath = os.path.join (app.conf.finished_dir, outFile) - os.rename (fd.name, outPath) - return ret +from . import behavior +from .controller import SinglePageController, defaultSettings, ControllerSettings def stateCallback (data): result = data['result'] @@ -134,34 +35,40 @@ def stateCallback (data): def main (): parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.') parser.add_argument('--browser', help='DevTools URL', metavar='URL') - parser.add_argument('--distributed', help='Use celery worker', action='store_true') parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC') parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC') - parser.add_argument('--log-buffer', default=defaults.logBuffer, type=int, dest='logBuffer', metavar='LINES') - parser.add_argument('--max-body-size', default=defaults.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES') - #parser.add_argument('--keep-tab', action='store_true', default=False, dest='keepTab', help='Keep tab open') + parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES') + parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES') parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts', dest='enabledBehaviorNames', default=list (behavior.availableNames), choices=list (behavior.availableNames)) + group = parser.add_mutually_exclusive_group (required=True) + group.add_argument('--output', help='WARC filename', metavar='FILE') + group.add_argument('--distributed', help='Use celery worker', action='store_true') parser.add_argument('url', help='Website URL') - parser.add_argument('output', help='WARC filename') args = parser.parse_args () # prepare args for function distributed = args.distributed - passArgs = vars (args) - del passArgs['distributed'] if distributed: - result = archive.delay (**passArgs) + from .task import archive + settings = dict (maxBodySize=args.maxBodySize, + logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, + timeout=args.timeout) + result = archive.delay (url=args.url, settings=settings, + enabledBehaviorNames=args.enabledBehaviorNames) r = result.get (on_message=stateCallback) else: - # XXX: local evaluation does not init celery logging? logging.basicConfig (level=logging.INFO) - r = archive (**passArgs) - print (r['stats']) + settings = ControllerSettings (maxBodySize=args.maxBodySize, + logBuffer=args.logBuffer, idleTimeout=args.idleTimeout, + timeout=args.timeout) + with open (args.output, 'wb') as fd: + controller = SinglePageController (args.url, fd, settings=settings) + controller.run () return True diff --git a/crocoite/controller.py b/crocoite/controller.py new file mode 100644 index 0000000..a338559 --- /dev/null +++ b/crocoite/controller.py @@ -0,0 +1,103 @@ +# Copyright (c) 2017–2018 crocoite contributors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +""" +Controller classes, handling actions required for archival +""" + +class ControllerSettings: + def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10): + self.logBuffer = logBuffer + self.maxBodySize = maxBodySize + self.idleTimeout = idleTimeout + self.timeout = timeout + +defaultSettings = ControllerSettings () + +import logging + +import pychrome + +from . import behavior as cbehavior +from .browser import ChromeService +from .warc import WarcLoader, SerializingWARCWriter +from .util import getFormattedViewportMetrics + +class SinglePageController: + """ + Archive a single page url to file output. + """ + + def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \ + logger=logging.getLogger(__name__), settings=defaultSettings): + self.url = url + self.output = output + self.service = service + self.behavior = behavior + self.settings = settings + self.logger = logger + + def run (self): + ret = {'stats': None} + + with self.service as browser: + browser = pychrome.Browser (url=browser) + writer = SerializingWARCWriter (self.output, gzip=True) + + with WarcLoader (browser, self.url, writer, + logBuffer=self.settings.logBuffer, + maxBodySize=self.settings.maxBodySize) as l: + version = l.tab.Browser.getVersion () + payload = { + 'software': __package__, + 'browser': version['product'], + 'useragent': version['userAgent'], + 'viewport': getFormattedViewportMetrics (l.tab), + } + warcinfo = writer.create_warcinfo_record (filename=None, info=payload) + writer.write_record (warcinfo) + + # not all behavior scripts are allowed for every URL, filter them + enabledBehavior = list (filter (lambda x: self.url in x, + map (lambda x: x (l), self.behavior))) + + for b in enabledBehavior: + self.logger.debug ('starting onload behavior {}'.format (b.name)) + b.onload () + l.start () + + l.waitIdle (self.settings.idleTimeout, self.settings.timeout) + + for b in enabledBehavior: + self.logger.debug ('starting onstop behavior {}'.format (b.name)) + b.onstop () + + # if we stopped due to timeout, wait for remaining assets + l.waitIdle (2, 60) + l.stop () + + for b in enabledBehavior: + self.logger.debug ('starting onfinish behavior {}'.format (b.name)) + b.onfinish () + + ret['stats'] = l.stats + writer.flush () + return ret + diff --git a/crocoite/defaults.py b/crocoite/defaults.py deleted file mode 100644 index d55312d..0000000 --- a/crocoite/defaults.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright (c) 2017 crocoite contributors -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -""" -Defaults settings -""" - -maxBodySize = 50*1024*1024 -logBuffer = 1000 - diff --git a/crocoite/task.py b/crocoite/task.py new file mode 100644 index 0000000..39900a5 --- /dev/null +++ b/crocoite/task.py @@ -0,0 +1,71 @@ +# Copyright (c) 2017–2018 crocoite contributors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +""" +Celery distributed tasks +""" + +import os + +from urllib.parse import urlsplit +from datetime import datetime + +from celery import Celery +from celery.utils.log import get_task_logger + +from .controller import SinglePageController, ControllerSettings +from . import behavior + +app = Celery ('crocoite.distributed') +app.config_from_object('celeryconfig') +logger = get_task_logger('crocoite.distributed.archive') + +@app.task(bind=True, track_started=True) +def archive (self, url, settings, enabledBehaviorNames): + """ + Archive a single URL + + Supports these config keys (celeryconfig): + + warc_filename = '{domain}-{date}-{id}.warc.gz' + temp_dir = '/tmp/' + finished_dir = '/tmp/finished' + """ + + parsedUrl = urlsplit (url) + outFile = app.conf.warc_filename.format ( + id=self.request.id, + domain=parsedUrl.hostname.replace ('/', '-'), + date=datetime.utcnow ().isoformat (), + ) + outPath = os.path.join (app.conf.temp_dir, outFile) + fd = open (outPath, 'wb') + + enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available)) + settings = ControllerSettings (**settings) + controller = SinglePageController (url, fd, behavior=enabledBehavior, settings=settings) + ret = controller.run () + + os.makedirs (app.conf.finished_dir, exist_ok=True) + outPath = os.path.join (app.conf.finished_dir, outFile) + os.rename (fd.name, outPath) + + return ret + diff --git a/crocoite/warc.py b/crocoite/warc.py index 9c96900..3fd65e4 100644 --- a/crocoite/warc.py +++ b/crocoite/warc.py @@ -38,7 +38,7 @@ from warcio.warcwriter import WARCWriter from .browser import AccountingSiteLoader from .util import packageUrl -from . import defaults +from .controller import defaultSettings class SerializingWARCWriter (WARCWriter): """ @@ -103,8 +103,8 @@ class WARCLogHandler (BufferingHandler): class WarcLoader (AccountingSiteLoader): def __init__ (self, browser, url, writer, logger=logging.getLogger(__name__), - logBuffer=defaults.logBuffer, - maxBodySize=defaults.maxBodySize): + logBuffer=defaultSettings.logBuffer, + maxBodySize=defaultSettings.maxBodySize): super ().__init__ (browser, url, logger) self.writer = writer self.maxBodySize = maxBodySize -- cgit v1.2.3