Solía tener una función como esta

def calculate(self, input):
    result = input * 2

    if result > 4:
        result_higher_then_four.send(result)

    return result

Donde result_higher_then_four obviamente representa una señal.

Luego introduje el apio y mi función se parecía a la siguiente y nunca más recibí una señal. Supongo que las señales están vinculadas por proceso y como el apio se ejecuta en un proceso diferente, esto significa que no puedo captar la señal en el proceso principal. ¿Debo usar un thread_local para arreglar esto? ¿O estoy pasando por alto lo obvio?

Gracias

@task
def calculate(self, input):
    result = input * 2

    if result > 4:
        result_higher_then_four.send(result)

    return result
11
user2298943 3 sep. 2014 a las 18:43

2 respuestas

La mejor respuesta

El problema es que el receptor de señal no se está registrando. Los trabajadores del apio se ejecutan en su propio proceso, por lo que las conexiones de señal deben realizarse en ese proceso. Si sabe cuáles son o puede descubrirlos, puede registrarlos durante la inicialización de la tarea utilizando esta técnica.

Por supuesto, eso elimina algunas de las ventajas de usar señales en primer lugar porque necesita conocer las conexiones de antemano.

Una idea es suponer que los receptores de señal siempre se registrarán en el módulo de modelos de cada aplicación. En cuyo caso, lo siguiente funcionará.

class CalculateTask(celery.Task):

    def __init__(self):
        from django.conf import settings
        for app in settings.INSTALLED_APPS:
            app_models = '{}.{}'.format(app,'models') 
            __import__(app_models, globals=globals())                                 

    def run(self, input):
        result = input * 2
        if result > 4:
            result_higher_then_four.send(result)

        return result
3
joshua 8 sep. 2014 a las 21:10

Puede usar la señal celeryd_init para inicializar sus trabajadores y señales http://celery.readthedocs.org/en/latest/userguide /signals.html#celeryd-init

Según lo que proporcionó, lo he probado con:

from celery.signals import celeryd_init
from celery.utils.dispatch import Signal

def process_result(result, *args, **kwargs):
    print "signals received: %s" % result

result_higher_then_four = Signal()

@celeryd_init.connect
def init_signals(*args, **kwargs):
    result_higher_then_four.connect(process_result)

@task(bind=True)
def calculate(self, input):
    result = input * 2

    if result > 4:
       result_higher_then_four.send(result=result, sender=self)

    return result
3
ant31 9 sep. 2014 a las 09:27