summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.rst2
-rw-r--r--contrib/celerycrocoite.py27
-rw-r--r--crocoite/cli.py135
-rw-r--r--crocoite/controller.py103
-rw-r--r--crocoite/defaults.py27
-rw-r--r--crocoite/task.py71
-rw-r--r--crocoite/warc.py6
7 files changed, 211 insertions, 160 deletions
diff --git a/README.rst b/README.rst
index 6ddc62a..e3c04cc 100644
--- a/README.rst
+++ b/README.rst
@@ -79,7 +79,7 @@ Configure using celeryconfig.py
Start a Celery worker::
- celery -A crocoite.cli worker --loglevel=info
+ celery -A crocoite.task worker --loglevel=info
Then queue archive job::
diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py
index c3a67ae..3d8c786 100644
--- a/contrib/celerycrocoite.py
+++ b/contrib/celerycrocoite.py
@@ -32,7 +32,8 @@ from threading import Thread
from queue import Queue
import queue
-from crocoite import behavior, cli, defaults
+from crocoite import behavior, task
+from crocoite.controller import defaultSettings
def prettyTimeDelta (seconds):
"""
@@ -125,14 +126,14 @@ def celeryWorker (bot, q):
break
action, trigger, args = item
if action == 'ao':
- handle = cli.archive.delay (**args)
+ handle = task.archive.delay (**args)
j = jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'args': args}
# pretty-print a few selected args
showargs = {
- 'idleTimeout': prettyTimeDelta (args['idleTimeout']),
- 'timeout': prettyTimeDelta (args['timeout']),
- 'maxBodySize': prettyBytes (args['maxBodySize']),
+ 'idleTimeout': prettyTimeDelta (args['settings']['idleTimeout']),
+ 'timeout': prettyTimeDelta (args['settings']['timeout']),
+ 'maxBodySize': prettyBytes (args['settings']['maxBodySize']),
}
strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
bot.msg (trigger.sender, '{}: {} has been queued as {} with {}'.format (trigger.nick, args['url'], handle.id, strargs))
@@ -173,16 +174,12 @@ def archive (bot, trigger):
return
blacklistedBehavior = {'domSnapshot', 'screenshot'}
- args = {
- 'url': url,
- 'output': None,
- 'enabledBehaviorNames': list (behavior.availableNames-blacklistedBehavior),
- 'browser': None,
- 'logBuffer': defaults.logBuffer,
- 'maxBodySize': defaults.maxBodySize,
- 'idleTimeout': 10,
- 'timeout': 1*60*60, # 1 hour
- }
+ settings = dict (maxBodySize=defaultSettings.maxBodySize,
+ logBuffer=defaultSettings.logBuffer, idleTimeout=10,
+ timeout=1*60*60)
+ args = dict (url=url,
+ enabledBehaviorNames=list (behavior.availableNames-blacklistedBehavior),
+ settings=settings)
q = bot.memory['crocoite']['q']
q.put_nowait (('ao', trigger, args))
diff --git a/crocoite/cli.py b/crocoite/cli.py
index 2cbbfa8..cac5b3b 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -19,112 +19,13 @@
# THE SOFTWARE.
"""
-Standalone and Celery command line interface
+Command line interface
"""
-import os, logging, argparse
-from io import BytesIO
-from datetime import datetime
-import pychrome
-from urllib.parse import urlsplit
+import logging, argparse
-from celery import Celery
-from celery.utils.log import get_task_logger
-
-from . import behavior, defaults
-from .warc import WarcLoader, SerializingWARCWriter
-from .browser import ChromeService, NullService
-from .util import packageUrl, getFormattedViewportMetrics
-
-app = Celery ('crocoite.distributed')
-app.config_from_object('celeryconfig')
-logger = get_task_logger('crocoite.distributed.archive')
-
-# defaults can be changed below using argparse; track started state, because tasks are usually long-running
-@app.task(bind=True, track_started=True)
-def archive (self, url, output, browser, logBuffer, maxBodySize, idleTimeout,
- timeout, enabledBehaviorNames):
- """
- Archive a single URL
-
- Supports these config keys (celeryconfig):
-
- warc_filename = '{domain}-{date}-{id}.warc.gz'
- temp_dir = '/tmp/'
- finished_dir = '/tmp/finished'
- """
-
- ret = {'stats': None}
-
- self.update_state (state='PROGRESS', meta={'step': 'start'})
-
- service = ChromeService ()
- if browser:
- service = NullService (browser)
-
- allBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
-
- with service as browser:
- browser = pychrome.Browser(url=browser)
-
- if not output:
- parsedUrl = urlsplit (url)
- outFile = app.conf.warc_filename.format (
- id=self.request.id,
- domain=parsedUrl.hostname.replace ('/', '-'),
- date=datetime.utcnow ().isoformat (),
- )
- outPath = os.path.join (app.conf.temp_dir, outFile)
- fd = open (outPath, 'wb')
- else:
- fd = open (output, 'wb')
- writer = SerializingWARCWriter (fd, gzip=True)
-
- with WarcLoader (browser, url, writer, logBuffer=logBuffer,
- maxBodySize=maxBodySize) as l:
- version = l.tab.Browser.getVersion ()
- payload = {
- 'software': __package__,
- 'browser': version['product'],
- 'useragent': version['userAgent'],
- 'viewport': getFormattedViewportMetrics (l.tab),
- }
- warcinfo = writer.create_warcinfo_record (filename=None, info=payload)
- writer.write_record (warcinfo)
-
- # not all behavior scripts are allowed for every URL, filter them
- enabledBehavior = list (filter (lambda x: url in x,
- map (lambda x: x (l), allBehavior)))
-
- self.update_state (state='PROGRESS', meta={'step': 'onload'})
- for b in enabledBehavior:
- logger.debug ('starting onload behavior {}'.format (b.name))
- b.onload ()
- l.start ()
-
- self.update_state (state='PROGRESS', meta={'step': 'fetch'})
- l.waitIdle (idleTimeout, timeout)
-
- self.update_state (state='PROGRESS', meta={'step': 'onstop'})
- for b in enabledBehavior:
- logger.debug ('starting onstop behavior {}'.format (b.name))
- b.onstop ()
-
- # if we stopped due to timeout, wait for remaining assets
- l.waitIdle (2, 60)
- l.stop ()
-
- self.update_state (state='PROGRESS', meta={'step': 'onfinish'})
- for b in enabledBehavior:
- logger.debug ('starting onfinish behavior {}'.format (b.name))
- b.onfinish ()
-
- ret['stats'] = l.stats
- writer.flush ()
- if not output:
- outPath = os.path.join (app.conf.finished_dir, outFile)
- os.rename (fd.name, outPath)
- return ret
+from . import behavior
+from .controller import SinglePageController, defaultSettings, ControllerSettings
def stateCallback (data):
result = data['result']
@@ -134,34 +35,40 @@ def stateCallback (data):
def main ():
parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
parser.add_argument('--browser', help='DevTools URL', metavar='URL')
- parser.add_argument('--distributed', help='Use celery worker', action='store_true')
parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC')
parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC')
- parser.add_argument('--log-buffer', default=defaults.logBuffer, type=int, dest='logBuffer', metavar='LINES')
- parser.add_argument('--max-body-size', default=defaults.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES')
- #parser.add_argument('--keep-tab', action='store_true', default=False, dest='keepTab', help='Keep tab open')
+ parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES')
+ parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES')
parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts',
dest='enabledBehaviorNames',
default=list (behavior.availableNames),
choices=list (behavior.availableNames))
+ group = parser.add_mutually_exclusive_group (required=True)
+ group.add_argument('--output', help='WARC filename', metavar='FILE')
+ group.add_argument('--distributed', help='Use celery worker', action='store_true')
parser.add_argument('url', help='Website URL')
- parser.add_argument('output', help='WARC filename')
args = parser.parse_args ()
# prepare args for function
distributed = args.distributed
- passArgs = vars (args)
- del passArgs['distributed']
if distributed:
- result = archive.delay (**passArgs)
+ from .task import archive
+ settings = dict (maxBodySize=args.maxBodySize,
+ logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,
+ timeout=args.timeout)
+ result = archive.delay (url=args.url, settings=settings,
+ enabledBehaviorNames=args.enabledBehaviorNames)
r = result.get (on_message=stateCallback)
else:
- # XXX: local evaluation does not init celery logging?
logging.basicConfig (level=logging.INFO)
- r = archive (**passArgs)
- print (r['stats'])
+ settings = ControllerSettings (maxBodySize=args.maxBodySize,
+ logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,
+ timeout=args.timeout)
+ with open (args.output, 'wb') as fd:
+ controller = SinglePageController (args.url, fd, settings=settings)
+ controller.run ()
return True
diff --git a/crocoite/controller.py b/crocoite/controller.py
new file mode 100644
index 0000000..a338559
--- /dev/null
+++ b/crocoite/controller.py
@@ -0,0 +1,103 @@
+# Copyright (c) 2017–2018 crocoite contributors
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+"""
+Controller classes, handling actions required for archival
+"""
+
+class ControllerSettings:
+ def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10):
+ self.logBuffer = logBuffer
+ self.maxBodySize = maxBodySize
+ self.idleTimeout = idleTimeout
+ self.timeout = timeout
+
+defaultSettings = ControllerSettings ()
+
+import logging
+
+import pychrome
+
+from . import behavior as cbehavior
+from .browser import ChromeService
+from .warc import WarcLoader, SerializingWARCWriter
+from .util import getFormattedViewportMetrics
+
+class SinglePageController:
+ """
+ Archive a single page url to file output.
+ """
+
+ def __init__ (self, url, output, service=ChromeService (), behavior=cbehavior.available, \
+ logger=logging.getLogger(__name__), settings=defaultSettings):
+ self.url = url
+ self.output = output
+ self.service = service
+ self.behavior = behavior
+ self.settings = settings
+ self.logger = logger
+
+ def run (self):
+ ret = {'stats': None}
+
+ with self.service as browser:
+ browser = pychrome.Browser (url=browser)
+ writer = SerializingWARCWriter (self.output, gzip=True)
+
+ with WarcLoader (browser, self.url, writer,
+ logBuffer=self.settings.logBuffer,
+ maxBodySize=self.settings.maxBodySize) as l:
+ version = l.tab.Browser.getVersion ()
+ payload = {
+ 'software': __package__,
+ 'browser': version['product'],
+ 'useragent': version['userAgent'],
+ 'viewport': getFormattedViewportMetrics (l.tab),
+ }
+ warcinfo = writer.create_warcinfo_record (filename=None, info=payload)
+ writer.write_record (warcinfo)
+
+ # not all behavior scripts are allowed for every URL, filter them
+ enabledBehavior = list (filter (lambda x: self.url in x,
+ map (lambda x: x (l), self.behavior)))
+
+ for b in enabledBehavior:
+ self.logger.debug ('starting onload behavior {}'.format (b.name))
+ b.onload ()
+ l.start ()
+
+ l.waitIdle (self.settings.idleTimeout, self.settings.timeout)
+
+ for b in enabledBehavior:
+ self.logger.debug ('starting onstop behavior {}'.format (b.name))
+ b.onstop ()
+
+ # if we stopped due to timeout, wait for remaining assets
+ l.waitIdle (2, 60)
+ l.stop ()
+
+ for b in enabledBehavior:
+ self.logger.debug ('starting onfinish behavior {}'.format (b.name))
+ b.onfinish ()
+
+ ret['stats'] = l.stats
+ writer.flush ()
+ return ret
+
diff --git a/crocoite/defaults.py b/crocoite/defaults.py
deleted file mode 100644
index d55312d..0000000
--- a/crocoite/defaults.py
+++ /dev/null
@@ -1,27 +0,0 @@
-# Copyright (c) 2017 crocoite contributors
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-Defaults settings
-"""
-
-maxBodySize = 50*1024*1024
-logBuffer = 1000
-
diff --git a/crocoite/task.py b/crocoite/task.py
new file mode 100644
index 0000000..39900a5
--- /dev/null
+++ b/crocoite/task.py
@@ -0,0 +1,71 @@
+# Copyright (c) 2017–2018 crocoite contributors
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+"""
+Celery distributed tasks
+"""
+
+import os
+
+from urllib.parse import urlsplit
+from datetime import datetime
+
+from celery import Celery
+from celery.utils.log import get_task_logger
+
+from .controller import SinglePageController, ControllerSettings
+from . import behavior
+
+app = Celery ('crocoite.distributed')
+app.config_from_object('celeryconfig')
+logger = get_task_logger('crocoite.distributed.archive')
+
+@app.task(bind=True, track_started=True)
+def archive (self, url, settings, enabledBehaviorNames):
+ """
+ Archive a single URL
+
+ Supports these config keys (celeryconfig):
+
+ warc_filename = '{domain}-{date}-{id}.warc.gz'
+ temp_dir = '/tmp/'
+ finished_dir = '/tmp/finished'
+ """
+
+ parsedUrl = urlsplit (url)
+ outFile = app.conf.warc_filename.format (
+ id=self.request.id,
+ domain=parsedUrl.hostname.replace ('/', '-'),
+ date=datetime.utcnow ().isoformat (),
+ )
+ outPath = os.path.join (app.conf.temp_dir, outFile)
+ fd = open (outPath, 'wb')
+
+ enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
+ settings = ControllerSettings (**settings)
+ controller = SinglePageController (url, fd, behavior=enabledBehavior, settings=settings)
+ ret = controller.run ()
+
+ os.makedirs (app.conf.finished_dir, exist_ok=True)
+ outPath = os.path.join (app.conf.finished_dir, outFile)
+ os.rename (fd.name, outPath)
+
+ return ret
+
diff --git a/crocoite/warc.py b/crocoite/warc.py
index 9c96900..3fd65e4 100644
--- a/crocoite/warc.py
+++ b/crocoite/warc.py
@@ -38,7 +38,7 @@ from warcio.warcwriter import WARCWriter
from .browser import AccountingSiteLoader
from .util import packageUrl
-from . import defaults
+from .controller import defaultSettings
class SerializingWARCWriter (WARCWriter):
"""
@@ -103,8 +103,8 @@ class WARCLogHandler (BufferingHandler):
class WarcLoader (AccountingSiteLoader):
def __init__ (self, browser, url, writer,
logger=logging.getLogger(__name__),
- logBuffer=defaults.logBuffer,
- maxBodySize=defaults.maxBodySize):
+ logBuffer=defaultSettings.logBuffer,
+ maxBodySize=defaultSettings.maxBodySize):
super ().__init__ (browser, url, logger)
self.writer = writer
self.maxBodySize = maxBodySize