Me gustaría tener tareas de apio que dependen del resultado de 2 o más tareas. He investigado Python + Apio: ¿encadenando trabajos? y http://pypi.python.org/pypi/celery-tasktree, pero solo son buenas si las tareas tienen una sola tarea dependiente .

Sé sobre TaskSet, pero no parece haber una manera de ejecutar instantáneamente una devolución de llamada cuando TaskSetResult.ready () se convierte en True. Lo que tengo en mente en este momento es tener una tarea periódica que sondee TaskSetResult.ready () cada pocos [mili] segundos más o menos y active la devolución de llamada cuando devuelva True, pero eso me parece bastante poco elegante.

¿Alguna sugerencia?

10
Leonth 5 jul. 2011 a las 13:39

3 respuestas

La mejor respuesta

Mrbox es verdadero, puede volver a intentarlo hasta que los resultados estén listos, pero no está tan claro en los documentos que cuando vuelva a intentarlo tenga que pasar los elementos setid y subtareas, y para recuperarlo debe usar la función de mapa, debajo de allí es un código de muestra para explicar lo que quiero decir.

def run(self, setid=None, subtasks=None, **kwargs):

    if not setid or not subtasks:
        #Is the first time that I launch this task, I'm going to launch the subtasks
        …
        tasks = []
        for slice in slices:
            tasks.append(uploadTrackSlice.subtask((slice,folder_name)))

        job = TaskSet(tasks=tasks)
        task_set_result = job.apply_async()
        setid = task_set_result.taskset_id
        subtasks = [result.task_id for result in task_set_result.subtasks]
        self.retry(exc=Exception("Result not ready"), args=[setid,subtasks])

    #Is a retry than we just have to check the results        
    tasks_result = TaskSetResult(setid, map(AsyncResult,subtasks))
    if not tasks_result.ready():
        self.retry(exc=Exception("Result not ready"), args=[setid,subtasks])
    else:    
        if tasks_result.successful():
            return tasks_result.join()
        else:
            raise Exception("Some of the tasks was failing")
3
Mauro Rocco 5 jul. 2011 a las 12:29

En mi humilde opinión, puede hacer algo similar a lo que se hace en docs- enlace

O puede usar el método de reintento con max_retries = None: si una de las tareas 'base' .ready () es falsa, puede disparar el método .retry () hasta que se completen las dos tareas 'base'.

2
mrbox 5 jul. 2011 a las 12:23

En las versiones recientes de Celery (3.0+) puede usar un llamado acorde para lograr el efecto deseado:

De http://docs.celeryproject.org/en/latest/ userguide / canvas.html # the-primitives:

Acorde simple

La primitiva de acordes nos permite agregar una devolución de llamada para que se llame cuando todas las tareas en un grupo hayan terminado de ejecutarse, lo que a menudo se requiere para algoritmos que no son vergonzosamente paralelos:

 >>> from celery import chord
 >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
 >>> res.get()
 90

Descargo de responsabilidad: todavía no lo he probado.

12
vadipp 12 ene. 2015 a las 14:08