From d078d8f4046971e60adcb6a8cc88067f78934eff Mon Sep 17 00:00:00 2001
From: Lars-Dominik Braun <lars@6xq.net>
Date: Thu, 22 Feb 2018 16:02:43 +0100
Subject: irc plugin: Serialize celery operations

This is a workaround for https://github.com/celery/celery/issues/4480
---
 contrib/celerycrocoite.py | 173 ++++++++++++++++++++++++++++------------------
 1 file changed, 105 insertions(+), 68 deletions(-)

(limited to 'contrib')

diff --git a/contrib/celerycrocoite.py b/contrib/celerycrocoite.py
index 6a6ac1c..c3a67ae 100644
--- a/contrib/celerycrocoite.py
+++ b/contrib/celerycrocoite.py
@@ -25,8 +25,12 @@ Module for Sopel IRC bot
 import os, logging
 from sopel.module import nickname_commands, require_chanmsg, thread, example, require_privilege, VOICE
 from sopel.tools import Identifier, SopelMemory
-import celery
+import celery, celery.exceptions
+from celery.result import AsyncResult
 from urllib.parse import urlsplit
+from threading import Thread
+from queue import Queue
+import queue
 
 from crocoite import behavior, cli, defaults
 
@@ -53,26 +57,113 @@ def prettyBytes (b):
     return '{:.1f} {}'.format (b, prefixes[0])
 
 def setup (bot):
-    m = bot.memory['crocoite'] = SopelMemory ()
-    m['jobs'] = {}
+    m = bot.memory['crocoite'] = {}
+    q = m['q'] = Queue ()
+    t = m['t'] = Thread (target=celeryWorker, args=(bot, q))
+    t.start ()
+
+def shutdown (bot):
+    """ Stop the celery worker thread and wait until it has exited """
+    state = bot.memory['crocoite']
+    pending, worker = state['q'], state['t']
+    pending.put_nowait (None) # celeryWorker leaves its loop on None
+    worker.join ()
 
 def isValidUrl (s):
     url = urlsplit (s)
     return url.scheme and url.netloc and url.scheme in {'http', 'https'}
 
+def checkCompletedJobs (bot, jobs):
+    """
+    Announce finished, failed or revoked jobs on IRC and drop them from jobs
+    """
+    delete = set () # removed after the loop, jobs must not change while iterating
+    for data in jobs.values ():
+        handle, trigger = data['handle'], data['trigger']
+        url = data['args']['url']
+        channel, user = trigger.sender, trigger.nick
+        if Identifier (channel) not in bot.channels:
+            continue # job stays in jobs and is looked at again on the next call
+        try:
+            result = handle.get (timeout=0.1)
+            stats = result['stats']
+            bot.msg (channel, '{}: {} ({}) finished. {} requests, {} failed, {} received.'.format (user, url,
+                    handle.id, stats['requests'], stats['failed'],
+                    prettyBytes (stats['bytesRcv'])))
+            delete.add (handle.id)
+        except celery.exceptions.TimeoutError:
+            pass # no result within the timeout, keep the job
+        except Exception as e:
+            # json serialization does not work well with exceptions. If their class
+            # names are unique we can still distinguish them.
+            ename = type (e).__name__
+            if ename == 'TaskRevokedError':
+                bot.msg (channel, '{}: {} ({}) was revoked'.format (user, url, handle.id))
+            else:
+                bot.msg (channel, '{}: {} ({}) failed'.format (user, url, handle.id))
+                logging.exception ('{} ({}) failed'.format (url, handle.id))
+            delete.add (handle.id)
+    for d in delete:
+        del jobs[d]
+
+def celeryWorker (bot, q):
+    """
+    Serialize celery operations in a single thread. This is a workaround for
+    https://github.com/celery/celery/issues/4480. Items read from q are
+    (action, trigger, args) tuples; None makes the thread exit.
+    """
+    jobs = {} # handle.id -> {'handle': ..., 'trigger': ..., 'args': ...}
+
+    while True:
+        try:
+            item = q.get (timeout=1)
+        except queue.Empty: # no command within a second, look at the jobs instead
+            checkCompletedJobs (bot, jobs)
+            continue
+
+        if item is None:
+            break
+        action, trigger, args = item
+        if action == 'ao': # args are the keyword arguments of cli.archive
+            handle = cli.archive.delay (**args)
+            jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'args': args}
+
+            # pretty-print a few selected args
+            showargs = {
+                    'idleTimeout': prettyTimeDelta (args['idleTimeout']),
+                    'timeout': prettyTimeDelta (args['timeout']),
+                    'maxBodySize': prettyBytes (args['maxBodySize']),
+                    }
+            strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
+            bot.msg (trigger.sender, '{}: {} has been queued as {} with {}'.format (trigger.nick, args['url'], handle.id, strargs))
+        elif action == 'status': # args is looked up as a key of jobs
+            if args and args in jobs:
+                j = jobs[args]
+                jtrigger = j['trigger']
+                handle = j['handle']
+                bot.msg (trigger.sender, '{}: {}, queued {}, by {}'.format (handle.id,
+                        handle.status, jtrigger.time, jtrigger.nick))
+            else:
+                bot.msg (trigger.sender, "Job not found.")
+        elif action == 'revoke': # args is looked up as a key of jobs
+            if args and args in jobs:
+                j = jobs[args]
+                handle = j['handle']
+                handle.revoke (terminate=True)
+                # no reply here, checkCompletedJobs announces the revocation
+            else:
+                bot.msg (trigger.sender, "Job not found.")
+        q.task_done ()
+
 @nickname_commands ('ao', 'archiveonly')
 @require_chanmsg ()
-#@require_privilege (VOICE)
-@thread (True)
+@require_privilege (VOICE)
 @example ('ao http://example.com')
 def archive (bot, trigger):
     """
     Archive a single page (no recursion) to WARC
     """
 
-    def updateState (job, data):
-        job['state'] = data
-
     url = trigger.group(2)
     if not url:
         bot.reply ('Need a URL')
@@ -92,41 +183,8 @@ def archive (bot, trigger):
             'idleTimeout': 10,
             'timeout': 1*60*60, # 1 hour
             }
-
-    handle = cli.archive.delay (**args)
-    m = bot.memory['crocoite']
-    jobs = m['jobs']
-    # XXX: for some reason we cannot access the job’s state through handle,
-    # instead use a callback quirk
-    j = jobs[handle.id] = {'handle': handle, 'trigger': trigger, 'state': {}}
-
-    # pretty-print a few selected args
-    showargs = {
-            'behavior': ','.join (args['enabledBehaviorNames']),
-            'idleTimeout': prettyTimeDelta (args['idleTimeout']),
-            'timeout': prettyTimeDelta (args['timeout']),
-            'maxBodySize': prettyBytes (args['maxBodySize']),
-            }
-    strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
-    bot.reply ('{} has been queued as {} with {}'.format (url, handle.id, strargs))
-
-    try:
-        result = handle.get (on_message=lambda x: updateState (j, x))
-        stats = result['stats']
-        bot.reply ('{} ({}) finished. {} requests, {} failed, {} received.'.format (url,
-                handle.id, stats['requests'], stats['failed'],
-                prettyBytes (stats['bytesRcv'])))
-    except Exception as e:
-        # json serialization does not work well with exceptions. If their class
-        # names are unique we can still distinguish them.
-        ename = type (e).__name__
-        if ename == 'TaskRevokedError':
-            bot.reply ('{} ({}) was revoked'.format (url, handle.id))
-        else:
-            bot.reply ('{} ({}) failed'.format (url, handle.id))
-            logging.exception ('{} ({}) failed'.format (url, handle.id))
-    finally:
-        del jobs[handle.id]
+    q = bot.memory['crocoite']['q']
+    q.put_nowait (('ao', trigger, args))
 
 @nickname_commands ('s', 'status')
 @example ('s c251f09e-3c26-481f-96e0-4b5f58bd1170')
@@ -136,21 +194,9 @@ def status (bot, trigger):
     Retrieve status for a job
     """
 
-    m = bot.memory['crocoite']
-    jobs = m['jobs']
-
     i = trigger.group(2)
-    if not i or i not in jobs:
-        bot.reply("Job not found.")
-        return
-    
-    j = jobs[i]
-    jtrigger = j['trigger']
-    jhandle = j['handle']
-    jstate = j['state']
-    jresult = jstate.get ('result', {})
-    bot.reply ('{}: {}, queued {}, by {}'.format (jhandle.id,
-            jstate.get ('status', 'UNKNOWN'), jtrigger.time, jtrigger.nick))
+    q = bot.memory['crocoite']['q']
+    q.put_nowait (('status', trigger, i))
 
 @nickname_commands ('r', 'revoke')
 @example ('r c251f09e-3c26-481f-96e0-4b5f58bd1170')
@@ -161,16 +207,7 @@ def revoke (bot, trigger):
     Cancel (revoke) a job
     """
 
-    m = bot.memory['crocoite']
-    jobs = m['jobs']
-
     i = trigger.group(2)
-    if not i or i not in jobs:
-        bot.reply ("Job not found.")
-        return
-    
-    j = jobs[i]
-    jhandle = j['handle']
-    jhandle.revoke (terminate=True)
-    # response is handled by long-running initiation thread
+    q = bot.memory['crocoite']['q']
+    q.put_nowait (('revoke', trigger, i))
 
-- 
cgit v1.2.3