author    Lars-Dominik Braun <lars@6xq.net>  2018-08-21 11:27:05 +0200
committer Lars-Dominik Braun <lars@6xq.net>  2018-08-21 13:19:47 +0200
commit    53e4df3fe732417988532e5b3d8b4dc7e781a3df (patch)
tree      2ed52af2b575afcb0165e03eebf6d4f4d30f965e /crocoite
parent    8e5ac24c85ca9388410b2afda9a05fa4a3d9bf92 (diff)
Remove celery and recursion
Going to rewrite that properly.
Diffstat (limited to 'crocoite')
-rw-r--r--  crocoite/cli.py          73
-rw-r--r--  crocoite/controller.py  121
-rw-r--r--  crocoite/task.py        146
3 files changed, 23 insertions(+), 317 deletions(-)
diff --git a/crocoite/cli.py b/crocoite/cli.py
index 8e225d9..ac7e648 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -25,76 +25,43 @@ Command line interface
import argparse, json, sys
from . import behavior
-from .controller import RecursiveController, defaultSettings, \
- ControllerSettings, DepthLimit, PrefixLimit, StatsHandler
+from .controller import SinglePageController, defaultSettings, \
+ ControllerSettings, StatsHandler
from .browser import NullService, ChromeService
from .warc import WarcHandler
from .logger import Logger, JsonPrintConsumer, DatetimeConsumer, WarcHandlerConsumer
-def parseRecursive (recursive, url):
- if recursive is None:
- return DepthLimit (0)
- elif recursive.isdigit ():
- return DepthLimit (int (recursive))
- elif recursive == 'prefix':
- return PrefixLimit (url)
- else:
- raise ValueError ('Unsupported')
-
-def main ():
+def single ():
parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
parser.add_argument('--browser', help='DevTools URL', metavar='URL')
- parser.add_argument('--recursive', help='Follow links recursively')
- parser.add_argument('--concurrency', '-j', type=int, default=1)
parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC')
parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC')
- parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES')
parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES')
parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts',
dest='enabledBehaviorNames',
default=list (behavior.availableMap.keys ()),
choices=list (behavior.availableMap.keys ()))
- group = parser.add_mutually_exclusive_group (required=True)
- group.add_argument('--output', help='WARC filename', metavar='FILE')
- group.add_argument('--distributed', help='Use celery worker', action='store_true')
- parser.add_argument('url', help='Website URL')
+ parser.add_argument('url', help='Website URL', metavar='URL')
+ parser.add_argument('output', help='WARC filename', metavar='FILE')
args = parser.parse_args ()
- if args.distributed:
- if args.browser:
- parser.error ('--browser is not supported for distributed jobs')
- from . import task
- settings = dict (maxBodySize=args.maxBodySize,
- logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,
- timeout=args.timeout)
- result = task.controller.delay (url=args.url, settings=settings,
- enabledBehaviorNames=args.enabledBehaviorNames,
- recursive=args.recursive, concurrency=args.concurrency)
- r = result.get ()
- else:
- logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
+ logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
- try:
- recursionPolicy = parseRecursive (args.recursive, args.url)
- except ValueError:
- parser.error ('Invalid argument for --recursive')
- service = ChromeService ()
- if args.browser:
- service = NullService (args.browser)
- settings = ControllerSettings (maxBodySize=args.maxBodySize,
- logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,
- timeout=args.timeout)
- with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler:
- logger.connect (WarcHandlerConsumer (warcHandler))
- handler = [StatsHandler (), warcHandler]
- b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames))
- controller = RecursiveController (args.url, fd, settings=settings,
- recursionPolicy=recursionPolicy, service=service,
- handler=handler, behavior=b, logger=logger)
- controller.run ()
- r = handler[0].stats
- logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r)
+ service = ChromeService ()
+ if args.browser:
+ service = NullService (args.browser)
+ settings = ControllerSettings (maxBodySize=args.maxBodySize,
+ idleTimeout=args.idleTimeout, timeout=args.timeout)
+ with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler:
+ logger.connect (WarcHandlerConsumer (warcHandler))
+ handler = [StatsHandler (), warcHandler]
+ b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames))
+ controller = SinglePageController (args.url, fd, settings=settings,
+ service=service, handler=handler, behavior=b, logger=logger)
+ controller.run ()
+ r = handler[0].stats
+ logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r)
return True
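
The rewritten entry point above amounts to a single-page archival pipeline with no job queue. A minimal standalone sketch of the same wiring, using only names visible in this diff (the target URL and output filename are placeholders):

    from crocoite import behavior
    from crocoite.controller import SinglePageController, ControllerSettings, StatsHandler
    from crocoite.browser import ChromeService
    from crocoite.warc import WarcHandler
    from crocoite.logger import Logger, JsonPrintConsumer, DatetimeConsumer, WarcHandlerConsumer

    # Structured logging to stdout; log records are also mirrored into the WARC.
    logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
    settings = ControllerSettings (maxBodySize=50*1024*1024, idleTimeout=2, timeout=10)
    with open ('example.warc.gz', 'wb') as fd, WarcHandler (fd, logger) as warcHandler:
        logger.connect (WarcHandlerConsumer (warcHandler))
        handler = [StatsHandler (), warcHandler]
        b = list (behavior.availableMap.values ())
        SinglePageController ('http://example.com/', fd, settings=settings,
                service=ChromeService (), handler=handler, behavior=b,
                logger=logger).run ()
        logger.info ('stats', context='cli', **handler[0].stats)
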
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 178d11c..9dae96f 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -23,16 +23,15 @@ Controller classes, handling actions required for archival
"""
class ControllerSettings:
- __slots__ = ('logBuffer', 'maxBodySize', 'idleTimeout', 'timeout')
+ __slots__ = ('maxBodySize', 'idleTimeout', 'timeout')
- def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10):
- self.logBuffer = logBuffer
+ def __init__ (self, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10):
self.maxBodySize = maxBodySize
self.idleTimeout = idleTimeout
self.timeout = timeout
def toDict (self):
- return dict (logBuffer=self.logBuffer, maxBodySize=self.maxBodySize,
+ return dict (maxBodySize=self.maxBodySize,
idleTimeout=self.idleTimeout, timeout=self.timeout)
defaultSettings = ControllerSettings ()
@@ -207,117 +206,3 @@ class SinglePageController:
processQueue ()
-class RecursionPolicy:
- """ Abstract recursion policy """
-
- __slots__ = ()
-
- def __call__ (self, urls):
- raise NotImplementedError
-
-class DepthLimit (RecursionPolicy):
- """
- Limit recursion by depth.
-
- depth==0 means no recursion, depth==1 is the page and outgoing links, …
- """
-
- __slots__ = ('maxdepth')
-
- def __init__ (self, maxdepth=0):
- self.maxdepth = maxdepth
-
- def __call__ (self, urls):
- if self.maxdepth <= 0:
- return {}
- else:
- self.maxdepth -= 1
- return urls
-
- def __repr__ (self):
- return '<DepthLimit {}>'.format (self.maxdepth)
-
-class PrefixLimit (RecursionPolicy):
- """
- Limit recursion by prefix
-
- i.e. prefix=http://example.com/foo
- ignored: http://example.com/bar http://offsite.example/foo
- accepted: http://example.com/foobar http://example.com/foo/bar
- """
-
- __slots__ = ('prefix')
-
- def __init__ (self, prefix):
- self.prefix = prefix
-
- def __call__ (self, urls):
- return set (filter (lambda u: u.startswith (self.prefix), urls))
-
-from .behavior import ExtractLinksEvent
-
-class RecursiveController (EventHandler):
- """
- Simple recursive controller
-
- Visits links according to recursionPolicy
- """
-
- __slots__ = ('url', 'output', 'service', 'behavior', 'settings', 'logger',
- 'recursionPolicy', 'handler', 'urls', 'have')
-
- def __init__ (self, url, output, logger,
- service=ChromeService (), behavior=cbehavior.available, \
- settings=defaultSettings, \
- recursionPolicy=DepthLimit (0), handler=[]):
- self.url = url
- self.output = output
- self.service = service
- self.behavior = behavior
- self.settings = settings
- self.logger = logger.bind (context=type(self).__name__, url=url)
- self.recursionPolicy = recursionPolicy
- self.handler = handler
- self.handler.append (self)
-
- def fetch (self, urls):
- """
- Overrideable fetch action for URLs. Defaults to sequential
- SinglePageController.
- """
- for u in urls:
- try:
- c = SinglePageController (url=u, output=self.output, service=self.service,
- behavior=self.behavior, logger=self.logger,
- settings=self.settings, handler=self.handler)
- c.run ()
- except BrowserCrashed:
- # this is fine if reported
- self.logger.error ('browser crashed', uuid='42582cbe-fb83-47ce-b330-d022a1c3b331')
-
- def run (self):
- self.have = set ()
- self.urls = set ([self.url])
-
- while self.urls:
- self.logger.info ('recursing',
- uuid='5b8498e4-868d-413c-a67e-004516b8452c',
- numurls=len (self.urls))
-
- self.have.update (self.urls)
- fetchurls = self.urls
- self.urls = set ()
-
- # handler appends new urls to self.urls through push()
- self.fetch (fetchurls)
-
- # remove urls we have and apply recursion policy
- self.urls.difference_update (self.have)
- self.urls = self.recursionPolicy (self.urls)
-
- def push (self, item):
- if isinstance (item, ExtractLinksEvent):
- self.logger.debug ('extracted links',
- uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', links=item.links)
- self.urls.update (map (removeFragment, item.links))
-
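
The recursion policies removed above were plain callables: each round, the controller passed the set of newly discovered URLs through the policy, which returned the subset to fetch next. A self-contained sketch of their semantics (reimplemented here for illustration; no longer part of the code base):

    class DepthLimit:
        """ Allow maxdepth rounds of link-following; 0 archives only the start page. """
        def __init__ (self, maxdepth=0):
            self.maxdepth = maxdepth
        def __call__ (self, urls):
            if self.maxdepth <= 0:
                # the deleted version returned an empty dict here; a set is
                # used for consistency with the non-exhausted case
                return set ()
            self.maxdepth -= 1
            return urls

    class PrefixLimit:
        """ Keep only URLs sharing a common prefix. """
        def __init__ (self, prefix):
            self.prefix = prefix
        def __call__ (self, urls):
            return set (u for u in urls if u.startswith (self.prefix))

    # Matches the example in the PrefixLimit docstring above:
    assert PrefixLimit ('http://example.com/foo') ({'http://example.com/foobar',
            'http://example.com/bar', 'http://offsite.example/foo'}) == \
            {'http://example.com/foobar'}
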
diff --git a/crocoite/task.py b/crocoite/task.py
deleted file mode 100644
index 06dd022..0000000
--- a/crocoite/task.py
+++ /dev/null
@@ -1,146 +0,0 @@
-# Copyright (c) 2017–2018 crocoite contributors
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-Celery distributed tasks
-"""
-
-import os
-
-from urllib.parse import urlsplit
-from datetime import datetime
-from operator import attrgetter
-from itertools import chain
-
-def _monkeyPatchSyncTasks ():
- """ Result sets don’t support the argument disable_sync_subtasks argument """
- import celery.result
- celery.result.assert_will_not_block = lambda: None
-
-_monkeyPatchSyncTasks ()
-from celery import Celery
-from celery.utils.log import get_task_logger
-
-from .browser import ChromeService, BrowserCrashed
-from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, StatsHandler
-from . import behavior
-from .cli import parseRecursive
-from .warc import WarcHandler
-
-app = Celery ('crocoite.distributed')
-app.config_from_object('celeryconfig')
-app.conf.task_routes = {
- 'crocoite.task.archive': {'queue': 'crocoite.archive'},
- 'crocoite.task.controller': {'queue': 'crocoite.controller'},
- # <method>.chunks is actually a starmap job
- 'celery.starmap': {'queue': 'crocoite.archive'},
- }
-app.conf.task_default_queue = 'crocoite.default'
-# disable prefetching, since our tasks usually run for a _very_ long time
-app.conf.worker_prefetch_multiplier = 1
-logger = get_task_logger('crocoite.distributed.archive')
-
-@app.task(bind=True, track_started=True)
-def archive (self, url, settings, enabledBehaviorNames):
- """
- Archive a single URL
-
- Supports these config keys (celeryconfig):
-
- warc_filename = '{domain}-{date}-{id}.warc.gz'
- temp_dir = '/tmp/'
- finished_dir = '/tmp/finished'
- """
-
- parsedUrl = urlsplit (url)
- outFile = app.conf.warc_filename.format (
- id=self.request.root_id,
- domain=parsedUrl.hostname.replace ('/', '-'),
- date=datetime.utcnow ().isoformat (),
- )
- outPath = os.path.join (app.conf.temp_dir, outFile)
- fd = open (outPath, 'wb')
-
- handler = [StatsHandler (), WarcHandler (fd)]
- enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
- settings = ControllerSettings (**settings)
- try:
- c = SinglePageController (url, fd, behavior=enabledBehavior,
- settings=settings, handler=handler)
- c.run ()
- except BrowserCrashed:
- # nothing we can do about that
- logger.error ('browser crashed for {}'.format (url))
-
- os.makedirs (app.conf.finished_dir, exist_ok=True)
- outPath = os.path.join (app.conf.finished_dir, outFile)
- os.rename (fd.name, outPath)
-
- return handler[0].stats
-
-from collections import UserDict
-
-class IntegerDict (UserDict):
- """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """
- def __add__ (self, b):
- newdict = self.__class__ (self)
- for k, v in b.items ():
- if k in self:
- newdict[k] += v
- else:
- newdict[k] = v
- return newdict
-
-class DistributedRecursiveController (RecursiveController):
- """ Distributed, recursive controller using celery """
-
- __slots__ = ('concurrency', 'stats')
-
- def __init__ (self, url, logger, service=ChromeService (), behavior=behavior.available, \
- settings=defaultSettings,
- recursionPolicy=DepthLimit (0), concurrency=1):
- super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy)
- self.concurrency = concurrency
- self.stats = IntegerDict ()
-
- def fetch (self, urls):
- def chunksIter (urls):
- for u in urls:
- yield (u, self.settings.toDict (), list (map (attrgetter ('name'), self.behavior)))
- itemsPerTask = len (urls)//self.concurrency
- if itemsPerTask <= 0:
- itemsPerTask = len (urls)
- result = archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ()
- self.stats = sum (chain.from_iterable (result), self.stats)
-
-@app.task(bind=True, track_started=True)
-def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency):
- """ Recursive controller """
-
- logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
- recursionPolicy = parseRecursive (recursive, url)
- enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
- settings = ControllerSettings (**settings)
- c = DistributedRecursiveController (url, None, logger=logger, behavior=enabledBehavior,
- settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency)
- c.run ()
- return dict (c.stats)
-
-
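
The deleted task.py merged per-task statistics with IntegerDict, a dict whose + operator adds values key-wise ({1: 2} + {1: 1} == {1: 3}), and split each round's URLs into chunks of len (urls)//concurrency per celery task. For the merging part, collections.Counter from the standard library behaves the same way:

    from collections import Counter

    # Key-wise addition, as IntegerDict.__add__ did:
    a = Counter ({'requests': 2, 'failed': 1})
    b = Counter ({'requests': 1})
    assert a + b == Counter ({'requests': 3, 'failed': 1})
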