From d078d8f4046971e60adcb6a8cc88067f78934eff Mon Sep 17 00:00:00 2001 From: Lars-Dominik Braun Date: Thu, 22 Feb 2018 16:02:43 +0100 Subject: irc plugin: Serialize celery operations This is a workaround for https://github.com/celery/celery/issues/4480 --- contrib/celerycrocoite.py | 173 ++++++++++++++++++++++++++++------------------ 1 file changed, 105 insertions(+), 68 deletions(-) diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py index 6a6ac1c..c3a67ae 100644 --- a/contrib/celerycrocoite.py +++ b/contrib/celerycrocoite.py @@ -25,8 +25,12 @@ Module for Sopel IRC bot import os, logging from sopel.module import nickname_commands, require_chanmsg, thread, example, require_privilege, VOICE from sopel.tools import Identifier, SopelMemory -import celery +import celery, celery.exceptions +from celery.result import AsyncResult from urllib.parse import urlsplit +from threading import Thread +from queue import Queue +import queue from crocoite import behavior, cli, defaults @@ -53,26 +57,113 @@ def prettyBytes (b): return '{:.1f} {}'.format (b, prefixes[0]) def setup (bot): - m = bot.memory['crocoite'] = SopelMemory () - m['jobs'] = {} + m = bot.memory['crocoite'] = {} + q = m['q'] = Queue () + t = m['t'] = Thread (target=celeryWorker, args=(bot, q)) + t.start () + +def shutdown (bot): + m = bot.memory['crocoite'] + q = m['q'] + t = m['t'] + q.put_nowait (None) + t.join () def isValidUrl (s): url = urlsplit (s) return url.scheme and url.netloc and url.scheme in {'http', 'https'} +def checkCompletedJobs (bot, jobs): + delete = set () + for i, data in jobs.items (): + handle = data['handle'] + trigger = data['trigger'] + args = data['args'] + url = args['url'] + channel = trigger.sender + user = trigger.nick + if Identifier (channel) not in bot.channels: + continue + try: + result = handle.get (timeout=0.1) + stats = result['stats'] + bot.msg (channel, '{}: {} ({}) finished. {} requests, {} failed, {} received.'.format (user, url, + handle.id, stats['requests'], stats['failed'], + prettyBytes (stats['bytesRcv']))) + delete.add (handle.id) + except celery.exceptions.TimeoutError: + pass + except Exception as e: + # json serialization does not work well with exceptions. If their class + # names are unique we can still distinguish them. + ename = type (e).__name__ + if ename == 'TaskRevokedError': + bot.msg (channel, '{}: {} ({}) was revoked'.format (user, url, handle.id)) + else: + bot.msg (channel, '{} ({}) failed'.format (user, url, handle.id)) + logging.exception ('{} ({}) failed'.format (url, handle.id)) + delete.add (handle.id) + for d in delete: + del jobs[d] + +def celeryWorker (bot, q): + """ + Serialize celery operations in a single thread. This is a workaround for + https://github.com/celery/celery/issues/4480 + """ + + jobs = {} + + while True: + try: + item = q.get (timeout=1) + except queue.Empty: + checkCompletedJobs (bot, jobs) + continue + + if item is None: + break + action, trigger, args = item + if action == 'ao': + handle = cli.archive.delay (**args) + j = jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'args': args} + + # pretty-print a few selected args + showargs = { + 'idleTimeout': prettyTimeDelta (args['idleTimeout']), + 'timeout': prettyTimeDelta (args['timeout']), + 'maxBodySize': prettyBytes (args['maxBodySize']), + } + strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ())) + bot.msg (trigger.sender, '{}: {} has been queued as {} with {}'.format (trigger.nick, args['url'], handle.id, strargs)) + elif action == 'status': + if args and args in jobs: + j = jobs[args] + jtrigger = j['trigger'] + handle = j['handle'] + bot.msg (trigger.sender, '{}: {}, queued {}, by {}'.format (handle.id, + handle.status, jtrigger.time, jtrigger.nick)) + else: + bot.msg (trigger.sender, "Job not found.") + elif action == 'revoke': + if args and args in jobs: + j = jobs[args] + handle = j['handle'] + handle.revoke (terminate=True) + # response is handled above + else: + bot.msg (trigger.sender, "Job not found.") + q.task_done () + @nickname_commands ('ao', 'archiveonly') @require_chanmsg () -#@require_privilege (VOICE) -@thread (True) +@require_privilege (VOICE) @example ('ao http://example.com') def archive (bot, trigger): """ Archive a single page (no recursion) to WARC """ - def updateState (job, data): - job['state'] = data - url = trigger.group(2) if not url: bot.reply ('Need a URL') @@ -92,41 +183,8 @@ def archive (bot, trigger): 'idleTimeout': 10, 'timeout': 1*60*60, # 1 hour } - - handle = cli.archive.delay (**args) - m = bot.memory['crocoite'] - jobs = m['jobs'] - # XXX: for some reason we cannot access the job’s state through handle, - # instead use a callback quirk - j = jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'state': {}} - - # pretty-print a few selected args - showargs = { - 'behavior': ','.join (args['enabledBehaviorNames']), - 'idleTimeout': prettyTimeDelta (args['idleTimeout']), - 'timeout': prettyTimeDelta (args['timeout']), - 'maxBodySize': prettyBytes (args['maxBodySize']), - } - strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ())) - bot.reply ('{} has been queued as {} with {}'.format (url, handle.id, strargs)) - - try: - result = handle.get (on_message=lambda x: updateState (j, x)) - stats = result['stats'] - bot.reply ('{} ({}) finished. {} requests, {} failed, {} received.'.format (url, - handle.id, stats['requests'], stats['failed'], - prettyBytes (stats['bytesRcv']))) - except Exception as e: - # json serialization does not work well with exceptions. If their class - # names are unique we can still distinguish them. - ename = type (e).__name__ - if ename == 'TaskRevokedError': - bot.reply ('{} ({}) was revoked'.format (url, handle.id)) - else: - bot.reply ('{} ({}) failed'.format (url, handle.id)) - logging.exception ('{} ({}) failed'.format (url, handle.id)) - finally: - del jobs[handle.id] + q = bot.memory['crocoite']['q'] + q.put_nowait (('ao', trigger, args)) @nickname_commands ('s', 'status') @example ('s c251f09e-3c26-481f-96e0-4b5f58bd1170') @@ -136,21 +194,9 @@ def status (bot, trigger): Retrieve status for a job """ - m = bot.memory['crocoite'] - jobs = m['jobs'] - i = trigger.group(2) - if not i or i not in jobs: - bot.reply("Job not found.") - return - - j = jobs[i] - jtrigger = j['trigger'] - jhandle = j['handle'] - jstate = j['state'] - jresult = jstate.get ('result', {}) - bot.reply ('{}: {}, queued {}, by {}'.format (jhandle.id, - jstate.get ('status', 'UNKNOWN'), jtrigger.time, jtrigger.nick)) + q = bot.memory['crocoite']['q'] + q.put_nowait (('status', trigger, i)) @nickname_commands ('r', 'revoke') @example ('r c251f09e-3c26-481f-96e0-4b5f58bd1170') @@ -161,16 +207,7 @@ def revoke (bot, trigger): Cancel (revoke) a job """ - m = bot.memory['crocoite'] - jobs = m['jobs'] - i = trigger.group(2) - if not i or i not in jobs: - bot.reply ("Job not found.") - return - - j = jobs[i] - jhandle = j['handle'] - jhandle.revoke (terminate=True) - # response is handled by long-running initiation thread + q = bot.memory['crocoite']['q'] + q.put_nowait (('revoke', trigger, i)) -- cgit v1.2.3