author    Lars-Dominik Braun <lars@6xq.net>  2018-08-21 11:27:05 +0200
committer Lars-Dominik Braun <lars@6xq.net>  2018-08-21 13:19:47 +0200
commit    53e4df3fe732417988532e5b3d8b4dc7e781a3df (patch)
tree      2ed52af2b575afcb0165e03eebf6d4f4d30f965e
parent    8e5ac24c85ca9388410b2afda9a05fa4a3d9bf92 (diff)
Remove celery and recursion
Gonna rewrite that properly.
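
A minimal sketch of the remaining one-shot invocation after this change, based on
the new ``single()`` argument parser and the ``crocoite-grab`` entry point in the
diff below (URL and output filename are placeholders)::

    crocoite-grab --timeout 60 --idle-timeout 5 http://example.com/ example.com.warc.gz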
-rw-r--r--  README.rst                 |  61
-rw-r--r--  contrib/celerycrocoite.py  | 229
-rw-r--r--  crocoite/cli.py            |  73
-rw-r--r--  crocoite/controller.py     | 121
-rw-r--r--  crocoite/task.py           | 146
-rw-r--r--  setup.py                   |   3
6 files changed, 24 insertions(+), 609 deletions(-)
diff --git a/README.rst b/README.rst
index 7108491..b1fce2c 100644
--- a/README.rst
+++ b/README.rst
@@ -17,12 +17,10 @@ The following dependencies must be present to run crocoite:
- pychrome_
- warcio_
- html5lib_
-- Celery_ (optional)
.. _pychrome: https://github.com/fate0/pychrome
.. _warcio: https://github.com/webrecorder/warcio
.. _html5lib: https://github.com/html5lib/html5lib-python
-.. _Celery: http://www.celeryproject.org/
It is recommended to prepare a virtualenv and let pip handle the dependency
resolution for Python packages instead:
@@ -121,65 +119,6 @@ does not work any more. Secondly it also saves a screenshot of the full page,
so even if future browsers cannot render and display the stored HTML a fully
rendered version of the website can be replayed instead.
-Advanced usage
---------------
-
-crocoite offers more than just a one-shot command-line interface.
-
-Distributed crawling
-^^^^^^^^^^^^^^^^^^^^
-
-Configure using celeryconfig.py
-
-.. code:: python
-
- broker_url = 'pyamqp://'
- result_backend = 'rpc://'
- warc_filename = '{domain}-{date}-{id}.warc.gz'
- temp_dir = '/tmp/'
- finished_dir = '/tmp/finished'
-
-Start a Celery worker::
-
- celery -A crocoite.task worker -Q crocoite.archive,crocoite.controller --loglevel=info
-
-Then queue archive job::
-
- crocoite-grab --distributed http://example.com
-
-The worker will create a temporary file named according to ``warc_filename`` in
-``/tmp`` while archiving and move it to ``/tmp/finished`` when done.
-
-IRC bot
-^^^^^^^
-
-Configure sopel_ (``~/.sopel/default.cfg``) to use the plugin located in
-``contrib/celerycrocoite.py``
-
-.. code:: ini
-
- [core]
- nick = chromebot
- host = irc.efnet.fr
- port = 6667
- owner = someone
- extra = /path/to/crocoite/contrib
- enable = celerycrocoite
- channels = #somechannel
-
-Then start it by running ``sopel``. The bot must be addressed directly (i.e.
-``chromebot: <command>``). The following commands are currently supported:
-
-a <url>
- Archives <url> and all of its resources (images, css, …). A unique UID
- (UUID) is assigned to each job.
-s <uuid>
- Get status of job with <uuid>
-r <uuid>
- Revoke job with <uuid>. If it started already the job will be killed.
-
-.. _sopel: https://sopel.chat/
-
Related projects
----------------
diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py
deleted file mode 100644
index 3da43d9..0000000
--- a/contrib/celerycrocoite.py
+++ /dev/null
@@ -1,229 +0,0 @@
-# Copyright (c) 2017 crocoite contributors
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-Module for Sopel IRC bot
-"""
-
-import os, logging, argparse
-from sopel.module import nickname_commands, require_chanmsg, thread, example, require_privilege, VOICE
-from sopel.tools import Identifier, SopelMemory
-import celery, celery.exceptions
-from celery.result import AsyncResult
-from urllib.parse import urlsplit
-from threading import Thread
-from queue import Queue
-import queue
-
-from crocoite import behavior, task
-from crocoite.controller import defaultSettings
-
-def prettyTimeDelta (seconds):
- """
- Pretty-print seconds to human readable string 1d 1h 1m 1s
- """
- seconds = int(seconds)
- days, seconds = divmod(seconds, 86400)
- hours, seconds = divmod(seconds, 3600)
- minutes, seconds = divmod(seconds, 60)
- s = [(days, 'd'), (hours, 'h'), (minutes, 'm'), (seconds, 's')]
- s = filter (lambda x: x[0] != 0, s)
- return ' '.join (map (lambda x: '{}{}'.format (*x), s))
-
-def prettyBytes (b):
- """
- Pretty-print bytes
- """
- prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB']
- while b >= 1024 and len (prefixes) > 1:
- b /= 1024
- prefixes.pop (0)
- return '{:.1f} {}'.format (b, prefixes[0])
-
-def setup (bot):
- m = bot.memory['crocoite'] = {}
- q = m['q'] = Queue ()
- t = m['t'] = Thread (target=celeryWorker, args=(bot, q))
- t.start ()
-
-def shutdown (bot):
- m = bot.memory['crocoite']
- q = m['q']
- t = m['t']
- q.put_nowait (None)
- t.join ()
-
-def isValidUrl (s):
- url = urlsplit (s)
- if url.scheme and url.netloc and url.scheme in {'http', 'https'}:
- return s
- raise TypeError ()
-
-def checkCompletedJobs (bot, jobs):
- delete = set ()
- for i, data in jobs.items ():
- handle = data['handle']
- trigger = data['trigger']
- args = data['args']
- url = args['url']
- channel = trigger.sender
- user = trigger.nick
- if Identifier (channel) not in bot.channels:
- continue
- try:
- stats = handle.get (timeout=0.1)
- bot.msg (channel, '{}: {} ({}) finished. {} crashed, {} requests, {} failed, {} received.'.format (user, url,
- handle.id, stats['crashed'], stats['requests'], stats['failed'],
- prettyBytes (stats['bytesRcv'])))
- delete.add (handle.id)
- except celery.exceptions.TimeoutError:
- pass
- except Exception as e:
- # json serialization does not work well with exceptions. If their class
- # names are unique we can still distinguish them.
- ename = type (e).__name__
- if ename == 'TaskRevokedError':
- bot.msg (channel, '{}: {} ({}) was revoked'.format (user, url, handle.id))
- else:
- bot.msg (channel, '{} ({}) failed'.format (user, url, handle.id))
- logging.exception ('{} ({}) failed'.format (url, handle.id))
- delete.add (handle.id)
- for d in delete:
- del jobs[d]
-
-def celeryWorker (bot, q):
- """
- Serialize celery operations in a single thread. This is a workaround for
- https://github.com/celery/celery/issues/4480
- """
-
- jobs = {}
-
- while True:
- try:
- item = q.get (timeout=1)
- except queue.Empty:
- checkCompletedJobs (bot, jobs)
- continue
-
- if item is None:
- break
- action, trigger, args = item
- if action == 'a':
- handle = task.controller.delay (**args)
- j = jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'args': args}
-
- # pretty-print a few selected args
- showargs = {
- 'idleTimeout': prettyTimeDelta (args['settings']['idleTimeout']),
- 'timeout': prettyTimeDelta (args['settings']['timeout']),
- 'maxBodySize': prettyBytes (args['settings']['maxBodySize']),
- 'recursive': args['recursive'],
- 'concurrency': args['concurrency'],
- }
- strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
- bot.msg (trigger.sender, '{}: {} has been queued as {} with {}'.format (trigger.nick, args['url'], handle.id, strargs))
- elif action == 'status':
- if args and args in jobs:
- j = jobs[args]
- jtrigger = j['trigger']
- handle = j['handle']
- bot.msg (trigger.sender, '{}: {}, queued {}, by {}'.format (handle.id,
- handle.status, jtrigger.time, jtrigger.nick))
- else:
- bot.msg (trigger.sender, "Job not found.")
- elif action == 'revoke':
- if args and args in jobs:
- j = jobs[args]
- handle = j['handle']
- handle.revoke (terminate=True)
- # response is handled above
- else:
- bot.msg (trigger.sender, "Job not found.")
- q.task_done ()
-
-class NonExitingArgumentParser (argparse.ArgumentParser):
- def exit (self, status=0, message=None):
- # should never be called
- pass
-
- def error (self, message):
- raise Exception (message)
-
-archiveparser = NonExitingArgumentParser (prog='a', add_help=False)
-archiveparser.add_argument('--timeout', default=1*60*60, type=int, help='Maximum time for archival', metavar='SEC', choices=[60, 1*60*60, 2*60*60])
-archiveparser.add_argument('--idle-timeout', default=10, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC', choices=[1, 10, 20, 30, 60])
-archiveparser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES', choices=[1*1024*1024, 10*1024*1024, defaultSettings.maxBodySize, 100*1024*1024])
-archiveparser.add_argument('--concurrency', default=1, type=int, help='Parallel workers for this job', choices=range (9))
-archiveparser.add_argument('--recursive', help='Enable recursion', choices=['0', '1', '2', '3', 'prefix'])
-archiveparser.add_argument('url', help='Website URL', type=isValidUrl)
-
-@nickname_commands ('a', 'archive')
-@require_chanmsg ()
-@require_privilege (VOICE)
-@example ('a http://example.com')
-def archive (bot, trigger):
- """
- Archive a URL to WARC
- """
-
- try:
- args = archiveparser.parse_args (trigger.group (2).split ())
- except Exception as e:
- bot.reply ('{} -- {}'.format (e.args[0], archiveparser.format_usage ()))
- return
- if not args:
- bot.reply ('Sorry, I don’t understand {}'.format (trigger.group (2)))
- return
- settings = dict (maxBodySize=args.maxBodySize,
- logBuffer=defaultSettings.logBuffer, idleTimeout=args.idleTimeout,
- timeout=args.timeout)
- args = dict (url=args.url,
- enabledBehaviorNames=list (behavior.availableMap.keys ()),
- settings=settings, recursive=args.recursive,
- concurrency=args.concurrency)
- q = bot.memory['crocoite']['q']
- q.put_nowait (('a', trigger, args))
-
-@nickname_commands ('s', 'status')
-@example ('s c251f09e-3c26-481f-96e0-4b5f58bd1170')
-@require_chanmsg ()
-def status (bot, trigger):
- """
- Retrieve status for a job
- """
-
- i = trigger.group(2)
- q = bot.memory['crocoite']['q']
- q.put_nowait (('status', trigger, i))
-
-@nickname_commands ('r', 'revoke')
-@example ('r c251f09e-3c26-481f-96e0-4b5f58bd1170')
-@require_privilege (VOICE)
-@require_chanmsg ()
-def revoke (bot, trigger):
- """
- Cancel (revoke) a job
- """
-
- i = trigger.group(2)
- q = bot.memory['crocoite']['q']
- q.put_nowait (('revoke', trigger, i))
-
diff --git a/crocoite/cli.py b/crocoite/cli.py
index 8e225d9..ac7e648 100644
--- a/crocoite/cli.py
+++ b/crocoite/cli.py
@@ -25,76 +25,43 @@ Command line interface
import argparse, json, sys
from . import behavior
-from .controller import RecursiveController, defaultSettings, \
- ControllerSettings, DepthLimit, PrefixLimit, StatsHandler
+from .controller import SinglePageController, defaultSettings, \
+ ControllerSettings, StatsHandler
from .browser import NullService, ChromeService
from .warc import WarcHandler
from .logger import Logger, JsonPrintConsumer, DatetimeConsumer, WarcHandlerConsumer
-def parseRecursive (recursive, url):
- if recursive is None:
- return DepthLimit (0)
- elif recursive.isdigit ():
- return DepthLimit (int (recursive))
- elif recursive == 'prefix':
- return PrefixLimit (url)
- else:
- raise ValueError ('Unsupported')
-
-def main ():
+def single ():
parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
parser.add_argument('--browser', help='DevTools URL', metavar='URL')
- parser.add_argument('--recursive', help='Follow links recursively')
- parser.add_argument('--concurrency', '-j', type=int, default=1)
parser.add_argument('--timeout', default=10, type=int, help='Maximum time for archival', metavar='SEC')
parser.add_argument('--idle-timeout', default=2, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC')
- parser.add_argument('--log-buffer', default=defaultSettings.logBuffer, type=int, dest='logBuffer', metavar='LINES')
parser.add_argument('--max-body-size', default=defaultSettings.maxBodySize, type=int, dest='maxBodySize', help='Max body size', metavar='BYTES')
parser.add_argument('--behavior', help='Comma-separated list of enabled behavior scripts',
dest='enabledBehaviorNames',
default=list (behavior.availableMap.keys ()),
choices=list (behavior.availableMap.keys ()))
- group = parser.add_mutually_exclusive_group (required=True)
- group.add_argument('--output', help='WARC filename', metavar='FILE')
- group.add_argument('--distributed', help='Use celery worker', action='store_true')
- parser.add_argument('url', help='Website URL')
+ parser.add_argument('url', help='Website URL', metavar='URL')
+ parser.add_argument('output', help='WARC filename', metavar='FILE')
args = parser.parse_args ()
- if args.distributed:
- if args.browser:
- parser.error ('--browser is not supported for distributed jobs')
- from . import task
- settings = dict (maxBodySize=args.maxBodySize,
- logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,
- timeout=args.timeout)
- result = task.controller.delay (url=args.url, settings=settings,
- enabledBehaviorNames=args.enabledBehaviorNames,
- recursive=args.recursive, concurrency=args.concurrency)
- r = result.get ()
- else:
- logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
+ logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
- try:
- recursionPolicy = parseRecursive (args.recursive, args.url)
- except ValueError:
- parser.error ('Invalid argument for --recursive')
- service = ChromeService ()
- if args.browser:
- service = NullService (args.browser)
- settings = ControllerSettings (maxBodySize=args.maxBodySize,
- logBuffer=args.logBuffer, idleTimeout=args.idleTimeout,
- timeout=args.timeout)
- with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler:
- logger.connect (WarcHandlerConsumer (warcHandler))
- handler = [StatsHandler (), warcHandler]
- b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames))
- controller = RecursiveController (args.url, fd, settings=settings,
- recursionPolicy=recursionPolicy, service=service,
- handler=handler, behavior=b, logger=logger)
- controller.run ()
- r = handler[0].stats
- logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r)
+ service = ChromeService ()
+ if args.browser:
+ service = NullService (args.browser)
+ settings = ControllerSettings (maxBodySize=args.maxBodySize,
+ idleTimeout=args.idleTimeout, timeout=args.timeout)
+ with open (args.output, 'wb') as fd, WarcHandler (fd, logger) as warcHandler:
+ logger.connect (WarcHandlerConsumer (warcHandler))
+ handler = [StatsHandler (), warcHandler]
+ b = list (map (lambda x: behavior.availableMap[x], args.enabledBehaviorNames))
+ controller = SinglePageController (args.url, fd, settings=settings,
+ service=service, handler=handler, behavior=b, logger=logger)
+ controller.run ()
+ r = handler[0].stats
+ logger.info ('stats', context='cli', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **r)
return True
diff --git a/crocoite/controller.py b/crocoite/controller.py
index 178d11c..9dae96f 100644
--- a/crocoite/controller.py
+++ b/crocoite/controller.py
@@ -23,16 +23,15 @@ Controller classes, handling actions required for archival
"""
class ControllerSettings:
- __slots__ = ('logBuffer', 'maxBodySize', 'idleTimeout', 'timeout')
+ __slots__ = ('maxBodySize', 'idleTimeout', 'timeout')
- def __init__ (self, logBuffer=1000, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10):
- self.logBuffer = logBuffer
+ def __init__ (self, maxBodySize=50*1024*1024, idleTimeout=2, timeout=10):
self.maxBodySize = maxBodySize
self.idleTimeout = idleTimeout
self.timeout = timeout
def toDict (self):
- return dict (logBuffer=self.logBuffer, maxBodySize=self.maxBodySize,
+ return dict (maxBodySize=self.maxBodySize,
idleTimeout=self.idleTimeout, timeout=self.timeout)
defaultSettings = ControllerSettings ()
@@ -207,117 +206,3 @@ class SinglePageController:
processQueue ()
-class RecursionPolicy:
- """ Abstract recursion policy """
-
- __slots__ = ()
-
- def __call__ (self, urls):
- raise NotImplementedError
-
-class DepthLimit (RecursionPolicy):
- """
- Limit recursion by depth.
-
- depth==0 means no recursion, depth==1 is the page and outgoing links, …
- """
-
- __slots__ = ('maxdepth')
-
- def __init__ (self, maxdepth=0):
- self.maxdepth = maxdepth
-
- def __call__ (self, urls):
- if self.maxdepth <= 0:
- return {}
- else:
- self.maxdepth -= 1
- return urls
-
- def __repr__ (self):
- return '<DepthLimit {}>'.format (self.maxdepth)
-
-class PrefixLimit (RecursionPolicy):
- """
- Limit recursion by prefix
-
- i.e. prefix=http://example.com/foo
- ignored: http://example.com/bar http://offsite.example/foo
- accepted: http://example.com/foobar http://example.com/foo/bar
- """
-
- __slots__ = ('prefix')
-
- def __init__ (self, prefix):
- self.prefix = prefix
-
- def __call__ (self, urls):
- return set (filter (lambda u: u.startswith (self.prefix), urls))
-
-from .behavior import ExtractLinksEvent
-
-class RecursiveController (EventHandler):
- """
- Simple recursive controller
-
- Visits links acording to recursionPolicy
- """
-
- __slots__ = ('url', 'output', 'service', 'behavior', 'settings', 'logger',
- 'recursionPolicy', 'handler', 'urls', 'have')
-
- def __init__ (self, url, output, logger,
- service=ChromeService (), behavior=cbehavior.available, \
- settings=defaultSettings, \
- recursionPolicy=DepthLimit (0), handler=[]):
- self.url = url
- self.output = output
- self.service = service
- self.behavior = behavior
- self.settings = settings
- self.logger = logger.bind (context=type(self).__name__, url=url)
- self.recursionPolicy = recursionPolicy
- self.handler = handler
- self.handler.append (self)
-
- def fetch (self, urls):
- """
- Overrideable fetch action for URLs. Defaults to sequential
- SinglePageController.
- """
- for u in urls:
- try:
- c = SinglePageController (url=u, output=self.output, service=self.service,
- behavior=self.behavior, logger=self.logger,
- settings=self.settings, handler=self.handler)
- c.run ()
- except BrowserCrashed:
- # this is fine if reported
- self.logger.error ('browser crashed', uuid='42582cbe-fb83-47ce-b330-d022a1c3b331')
-
- def run (self):
- self.have = set ()
- self.urls = set ([self.url])
-
- while self.urls:
- self.logger.info ('recursing',
- uuid='5b8498e4-868d-413c-a67e-004516b8452c',
- numurls=len (self.urls))
-
- self.have.update (self.urls)
- fetchurls = self.urls
- self.urls = set ()
-
- # handler appends new urls to self.urls through push()
- self.fetch (fetchurls)
-
- # remove urls we have and apply recursion policy
- self.urls.difference_update (self.have)
- self.urls = self.recursionPolicy (self.urls)
-
- def push (self, item):
- if isinstance (item, ExtractLinksEvent):
- self.logger.debug ('extracted links',
- uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', links=item.links)
- self.urls.update (map (removeFragment, item.links))
-
diff --git a/crocoite/task.py b/crocoite/task.py
deleted file mode 100644
index 06dd022..0000000
--- a/crocoite/task.py
+++ /dev/null
@@ -1,146 +0,0 @@
-# Copyright (c) 2017–2018 crocoite contributors
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-Celery distributed tasks
-"""
-
-import os
-
-from urllib.parse import urlsplit
-from datetime import datetime
-from operator import attrgetter
-from itertools import chain
-
-def _monkeyPatchSyncTasks ():
- """ Result sets don’t support the argument disable_sync_subtasks argument """
- import celery.result
- celery.result.assert_will_not_block = lambda: None
-
-_monkeyPatchSyncTasks ()
-from celery import Celery
-from celery.utils.log import get_task_logger
-
-from .browser import ChromeService, BrowserCrashed
-from .controller import SinglePageController, ControllerSettings, RecursiveController, defaultSettings, DepthLimit, StatsHandler
-from . import behavior
-from .cli import parseRecursive
-from .warc import WarcHandler
-
-app = Celery ('crocoite.distributed')
-app.config_from_object('celeryconfig')
-app.conf.task_routes = {
- 'crocoite.task.archive': {'queue': 'crocoite.archive'},
- 'crocoite.task.controller': {'queue': 'crocoite.controller'},
- # <method>.chunks is actually a starmap job
- 'celery.starmap': {'queue': 'crocoite.archive'},
- }
-app.conf.task_default_queue = 'crocoite.default'
-# disable prefetching, since our tasks usually run for a _very_ long time
-app.conf.worker_prefetch_multiplier = 1
-logger = get_task_logger('crocoite.distributed.archive')
-
-@app.task(bind=True, track_started=True)
-def archive (self, url, settings, enabledBehaviorNames):
- """
- Archive a single URL
-
- Supports these config keys (celeryconfig):
-
- warc_filename = '{domain}-{date}-{id}.warc.gz'
- temp_dir = '/tmp/'
- finished_dir = '/tmp/finished'
- """
-
- parsedUrl = urlsplit (url)
- outFile = app.conf.warc_filename.format (
- id=self.request.root_id,
- domain=parsedUrl.hostname.replace ('/', '-'),
- date=datetime.utcnow ().isoformat (),
- )
- outPath = os.path.join (app.conf.temp_dir, outFile)
- fd = open (outPath, 'wb')
-
- handler = [StatsHandler (), WarcHandler (fd)]
- enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
- settings = ControllerSettings (**settings)
- try:
- c = SinglePageController (url, fd, behavior=enabledBehavior,
- settings=settings, handler=handler)
- c.run ()
- except BrowserCrashed:
- # nothing we can do about that
- logger.error ('browser crashed for {}'.format (url))
-
- os.makedirs (app.conf.finished_dir, exist_ok=True)
- outPath = os.path.join (app.conf.finished_dir, outFile)
- os.rename (fd.name, outPath)
-
- return handler[0].stats
-
-from collections import UserDict
-
-class IntegerDict (UserDict):
- """ Dict with dict/dict per-item arithmetic propagation, i.e. {1: 2}+{1: 1}={1: 3} """
- def __add__ (self, b):
- newdict = self.__class__ (self)
- for k, v in b.items ():
- if k in self:
- newdict[k] += v
- else:
- newdict[k] = v
- return newdict
-
-class DistributedRecursiveController (RecursiveController):
- """ Distributed, recursive controller using celery """
-
- __slots__ = ('concurrency', 'stats')
-
- def __init__ (self, url, logger, service=ChromeService (), behavior=behavior.available, \
- settings=defaultSettings,
- recursionPolicy=DepthLimit (0), concurrency=1):
- super ().__init__ (url, None, service, behavior, logger, settings, recursionPolicy)
- self.concurrency = concurrency
- self.stats = IntegerDict ()
-
- def fetch (self, urls):
- def chunksIter (urls):
- for u in urls:
- yield (u, self.settings.toDict (), list (map (attrgetter ('name'), self.behavior)))
- itemsPerTask = len (urls)//self.concurrency
- if itemsPerTask <= 0:
- itemsPerTask = len (urls)
- result = archive.chunks (chunksIter (urls), itemsPerTask).apply_async ().get ()
- self.stats = sum (chain.from_iterable (result), self.stats)
-
-@app.task(bind=True, track_started=True)
-def controller (self, url, settings, enabledBehaviorNames, recursive, concurrency):
- """ Recursive controller """
-
- logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])
- recursionPolicy = parseRecursive (recursive, url)
- enabledBehavior = list (filter (lambda x: x.name in enabledBehaviorNames, behavior.available))
- settings = ControllerSettings (**settings)
- c = DistributedRecursiveController (url, None, logger=logger, behavior=enabledBehavior,
- settings=settings, recursionPolicy=recursionPolicy, concurrency=concurrency)
- c.run ()
- return dict (c.stats)
-
-
diff --git a/setup.py b/setup.py
index 26113cf..e2357ca 100644
--- a/setup.py
+++ b/setup.py
@@ -13,11 +13,10 @@ setup(
'pychrome',
'warcio',
'html5lib>=0.999999999',
- 'Celery',
],
entry_points={
'console_scripts': [
- 'crocoite-grab = crocoite.cli:main',
+ 'crocoite-grab = crocoite.cli:single',
'crocoite-merge-warc = crocoite.tools:mergeWarc',
'crocoite-extract-screenshot = crocoite.tools:extractScreenshot',
],