author     Lars-Dominik Braun <lars@6xq.net>  2018-02-22 16:02:43 +0100
committer  Lars-Dominik Braun <lars@6xq.net>  2018-02-22 16:02:43 +0100
commit     d078d8f4046971e60adcb6a8cc88067f78934eff (patch)
tree       64d290bff7e719211f841e40a7681960754ccb0e
parent     f147f634277864cd6cfb8d5184c6392e044453c6 (diff)
irc plugin: Serialize celery operations
This is a workaround for https://github.com/celery/celery/issues/4480
-rw-r--r--  contrib/celerycrocoite.py  | 173
1 file changed, 105 insertions(+), 68 deletions(-)
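
The crux of the patch is that every celery client call now happens in a single worker thread that owns a queue.Queue: the IRC command handlers only enqueue work items, and a None sentinel stops the thread on shutdown. A minimal, standalone sketch of that pattern follows; submitJob is a hypothetical stand-in for cli.archive.delay, so this only illustrates the serialization idea, not the plugin itself:

import queue
from threading import Thread

def submitJob (args):
    # hypothetical placeholder for the real celery call, cli.archive.delay (**args)
    print ('submitting', args)

def worker (q):
    # single thread that owns every (non-thread-safe) client operation
    while True:
        try:
            item = q.get (timeout=1)
        except queue.Empty:
            # idle: a real worker polls its running jobs here, cf. checkCompletedJobs
            continue
        if item is None:
            # sentinel received: shut down
            break
        action, args = item
        if action == 'submit':
            submitJob (args)
        q.task_done ()

q = queue.Queue ()
t = Thread (target=worker, args=(q,))
t.start ()
q.put_nowait (('submit', {'url': 'http://example.com'}))
q.put_nowait (None)
t.join ()

The one-second get timeout doubles as the polling interval: whenever no new command arrives, the worker gets a chance to look at its running jobs, which is exactly what checkCompletedJobs does in the patch below.
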
diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py
index 6a6ac1c..c3a67ae 100644
--- a/contrib/celerycrocoite.py
+++ b/contrib/celerycrocoite.py
@@ -25,8 +25,12 @@ Module for Sopel IRC bot
import os, logging
from sopel.module import nickname_commands, require_chanmsg, thread, example, require_privilege, VOICE
from sopel.tools import Identifier, SopelMemory
-import celery
+import celery, celery.exceptions
+from celery.result import AsyncResult
from urllib.parse import urlsplit
+from threading import Thread
+from queue import Queue
+import queue
from crocoite import behavior, cli, defaults
@@ -53,26 +57,113 @@ def prettyBytes (b):
    return '{:.1f} {}'.format (b, prefixes[0])
def setup (bot):
-    m = bot.memory['crocoite'] = SopelMemory ()
-    m['jobs'] = {}
+    m = bot.memory['crocoite'] = {}
+    q = m['q'] = Queue ()
+    t = m['t'] = Thread (target=celeryWorker, args=(bot, q))
+    t.start ()
+
+def shutdown (bot):
+    m = bot.memory['crocoite']
+    q = m['q']
+    t = m['t']
+    q.put_nowait (None)
+    t.join ()
def isValidUrl (s):
    url = urlsplit (s)
    return url.scheme and url.netloc and url.scheme in {'http', 'https'}
+def checkCompletedJobs (bot, jobs):
+    delete = set ()
+    for i, data in jobs.items ():
+        handle = data['handle']
+        trigger = data['trigger']
+        args = data['args']
+        url = args['url']
+        channel = trigger.sender
+        user = trigger.nick
+        if Identifier (channel) not in bot.channels:
+            continue
+        try:
+            result = handle.get (timeout=0.1)
+            stats = result['stats']
+            bot.msg (channel, '{}: {} ({}) finished. {} requests, {} failed, {} received.'.format (user, url,
+                    handle.id, stats['requests'], stats['failed'],
+                    prettyBytes (stats['bytesRcv'])))
+            delete.add (handle.id)
+        except celery.exceptions.TimeoutError:
+            pass
+        except Exception as e:
+            # json serialization does not work well with exceptions. If their class
+            # names are unique we can still distinguish them.
+            ename = type (e).__name__
+            if ename == 'TaskRevokedError':
+                bot.msg (channel, '{}: {} ({}) was revoked'.format (user, url, handle.id))
+            else:
+                bot.msg (channel, '{}: {} ({}) failed'.format (user, url, handle.id))
+                logging.exception ('{} ({}) failed'.format (url, handle.id))
+            delete.add (handle.id)
+    for d in delete:
+        del jobs[d]
+
+def celeryWorker (bot, q):
+ """
+ Serialize celery operations in a single thread. This is a workaround for
+ https://github.com/celery/celery/issues/4480
+ """
+
+ jobs = {}
+
+ while True:
+ try:
+ item = q.get (timeout=1)
+ except queue.Empty:
+ checkCompletedJobs (bot, jobs)
+ continue
+
+ if item is None:
+ break
+ action, trigger, args = item
+ if action == 'ao':
+ handle = cli.archive.delay (**args)
+ j = jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'args': args}
+
+ # pretty-print a few selected args
+ showargs = {
+ 'idleTimeout': prettyTimeDelta (args['idleTimeout']),
+ 'timeout': prettyTimeDelta (args['timeout']),
+ 'maxBodySize': prettyBytes (args['maxBodySize']),
+ }
+ strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
+ bot.msg (trigger.sender, '{}: {} has been queued as {} with {}'.format (trigger.nick, args['url'], handle.id, strargs))
+ elif action == 'status':
+ if args and args in jobs:
+ j = jobs[args]
+ jtrigger = j['trigger']
+ handle = j['handle']
+ bot.msg (trigger.sender, '{}: {}, queued {}, by {}'.format (handle.id,
+ handle.status, jtrigger.time, jtrigger.nick))
+ else:
+ bot.msg (trigger.sender, "Job not found.")
+ elif action == 'revoke':
+ if args and args in jobs:
+ j = jobs[args]
+ handle = j['handle']
+ handle.revoke (terminate=True)
+ # response is handled above
+ else:
+ bot.msg (trigger.sender, "Job not found.")
+ q.task_done ()
+
@nickname_commands ('ao', 'archiveonly')
@require_chanmsg ()
-#@require_privilege (VOICE)
-@thread (True)
+@require_privilege (VOICE)
@example ('ao http://example.com')
def archive (bot, trigger):
"""
Archive a single page (no recursion) to WARC
"""
- def updateState (job, data):
- job['state'] = data
-
url = trigger.group(2)
if not url:
bot.reply ('Need a URL')
@@ -92,41 +183,8 @@ def archive (bot, trigger):
            'idleTimeout': 10,
            'timeout': 1*60*60, # 1 hour
            }
-
-    handle = cli.archive.delay (**args)
-    m = bot.memory['crocoite']
-    jobs = m['jobs']
-    # XXX: for some reason we cannot access the job’s state through handle,
-    # instead use a callback quirk
-    j = jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'state': {}}
-
-    # pretty-print a few selected args
-    showargs = {
-            'behavior': ','.join (args['enabledBehaviorNames']),
-            'idleTimeout': prettyTimeDelta (args['idleTimeout']),
-            'timeout': prettyTimeDelta (args['timeout']),
-            'maxBodySize': prettyBytes (args['maxBodySize']),
-            }
-    strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
-    bot.reply ('{} has been queued as {} with {}'.format (url, handle.id, strargs))
-
-    try:
-        result = handle.get (on_message=lambda x: updateState (j, x))
-        stats = result['stats']
-        bot.reply ('{} ({}) finished. {} requests, {} failed, {} received.'.format (url,
-            handle.id, stats['requests'], stats['failed'],
-            prettyBytes (stats['bytesRcv'])))
-    except Exception as e:
-        # json serialization does not work well with exceptions. If their class
-        # names are unique we can still distinguish them.
-        ename = type (e).__name__
-        if ename == 'TaskRevokedError':
-            bot.reply ('{} ({}) was revoked'.format (url, handle.id))
-        else:
-            bot.reply ('{} ({}) failed'.format (url, handle.id))
-            logging.exception ('{} ({}) failed'.format (url, handle.id))
-    finally:
-        del jobs[handle.id]
+    q = bot.memory['crocoite']['q']
+    q.put_nowait (('ao', trigger, args))
@nickname_commands ('s', 'status')
@example ('s c251f09e-3c26-481f-96e0-4b5f58bd1170')
@@ -136,21 +194,9 @@ def status (bot, trigger):
    Retrieve status for a job
    """
-    m = bot.memory['crocoite']
-    jobs = m['jobs']
-
    i = trigger.group(2)
-    if not i or i not in jobs:
-        bot.reply("Job not found.")
-        return
-
-    j = jobs[i]
-    jtrigger = j['trigger']
-    jhandle = j['handle']
-    jstate = j['state']
-    jresult = jstate.get ('result', {})
-    bot.reply ('{}: {}, queued {}, by {}'.format (jhandle.id,
-        jstate.get ('status', 'UNKNOWN'), jtrigger.time, jtrigger.nick))
+    q = bot.memory['crocoite']['q']
+    q.put_nowait (('status', trigger, i))
@nickname_commands ('r', 'revoke')
@example ('r c251f09e-3c26-481f-96e0-4b5f58bd1170')
@@ -161,16 +207,7 @@ def revoke (bot, trigger):
    Cancel (revoke) a job
    """
-    m = bot.memory['crocoite']
-    jobs = m['jobs']
-
    i = trigger.group(2)
-    if not i or i not in jobs:
-        bot.reply ("Job not found.")
-        return
-
-    j = jobs[i]
-    jhandle = j['handle']
-    jhandle.revoke (terminate=True)
-    # response is handled by long-running initiation thread
+    q = bot.memory['crocoite']['q']
+    q.put_nowait (('revoke', trigger, i))
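
With the blocking handle.get (on_message=...) call gone from archive (), job completion is detected by polling instead: each time the queue sits idle for a second, checkCompletedJobs asks every stored result handle for its outcome with a very short timeout and treats a timeout as "still running". Reduced to a single job, that check looks roughly like the sketch below; pollJob is a hypothetical helper and handle stands for any celery AsyncResult, so this is an illustration, not part of the patch:

import celery.exceptions

def pollJob (handle):
    # return the job's result, or None if it has not finished yet
    try:
        # 0.1 s timeout keeps the worker loop responsive
        return handle.get (timeout=0.1)
    except celery.exceptions.TimeoutError:
        return None
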