Suponga que está ejecutando Django en Linux, y tiene una vista, y desea que esa vista devuelva los datos de un subproceso llamado cmd que opera en un archivo que crea la vista, por ejemplo likeso:

 def call_subprocess(request):
     response = HttpResponse()

     with tempfile.NamedTemporaryFile("W") as f:
         f.write(request.GET['data']) # i.e. some data

     # cmd operates on fname and returns output
     p = subprocess.Popen(["cmd", f.name], 
                   stdout=subprocess.PIPE, 
                   stderr=subprocess.PIPE)

     out, err = p.communicate()

     response.write(p.out) # would be text/plain...
     return response

Ahora, supongamos que cmd tiene un tiempo de inicio muy lento, pero un tiempo de funcionamiento muy rápido y no tiene un modo demonio de forma nativa. Me gustaría mejorar el tiempo de respuesta de esta vista.

Me gustaría hacer que todo el sistema se ejecute mucho más rápido al iniciar una serie de instancias de cmd en un grupo de trabajadores, hacer que esperen la entrada y tener < strong> call_process pregunta a uno de esos procesos de grupo de trabajadores que maneja los datos.

Esto es realmente 2 partes:

Parte 1. Una función que llama a cmd y cmd espera la entrada. Esto podría hacerse con tuberías, es decir

def _run_subcmd():
    p = subprocess.Popen(["cmd", fname], 
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    out, err = p.communicate()
    # write 'out' to a tmp file
    o = open("out.txt", "W")
    o.write(out)
    o.close()
    p.close()
    exit()

def _run_cmd(data):
    f = tempfile.NamedTemporaryFile("W")
    pipe = os.mkfifo(f.name)

    if os.fork() == 0:
        _run_subcmd(fname)
    else:
        f.write(data)

    r = open("out.txt", "r")
    out = r.read()
    # read 'out' from a tmp file
    return out

def call_process(request):
    response = HttpResponse()

    out = _run_cmd(request.GET['data'])

    response.write(out) # would be text/plain...
    return response

Parte 2. Un conjunto de trabajadores que se ejecutan en segundo plano y esperan los datos. es decir, queremos extender lo anterior para que el subproceso ya se esté ejecutando, p. cuando la instancia de Django se inicializa, o esta call_process se llama por primera vez, se crea un conjunto de estos trabajadores

WORKER_COUNT = 6
WORKERS = []

class Worker(object):
    def __init__(index):
        self.tmp_file = tempfile.NamedTemporaryFile("W") # get a tmp file name
        os.mkfifo(self.tmp_file.name)
        self.p = subprocess.Popen(["cmd", self.tmp_file], 
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.index = index

    def run(out_filename, data):
        WORKERS[self.index] = Null # qua-mutex??
        self.tmp_file.write(data)
        if (os.fork() == 0): # does the child have access to self.p??
            out, err = self.p.communicate()
            o = open(out_filename, "w")
            o.write(out)
            exit()

        self.p.close()
        self.o.close()
        self.tmp_file.close()
        WORKERS[self.index] = Worker(index) # replace this one
        return out_file

    @classmethod
    def get_worker() # get the next worker
    # ... static, incrementing index 

Debería haber alguna inicialización de trabajadores en alguna parte, como esta:

def init_workers(): # create WORKERS_COUNT workers
    for i in xrange(0, WORKERS_COUNT):
        tmp_file = tempfile.NamedTemporaryFile()
        WORKERS.push(Worker(i))

Ahora, lo que tengo arriba se convierte en algo gusta:

def _run_cmd(data):
     Worker.get_worker() # this needs to be atomic & lock worker at Worker.index

     fifo = open(tempfile.NamedTemporaryFile("r")) # this stores output of cmd

     Worker.run(fifo.name, data)
     # please ignore the fact that everything will be
     # appended to out.txt ... these will be tmp files, too, but named elsewhere.

     out = fifo.read()
     # read 'out' from a tmp file
     return out


def call_process(request):
     response = HttpResponse()

     out = _run_cmd(request.GET['data'])

     response.write(out) # would be text/plain...
     return response

Ahora, las preguntas:

  1. esto funcionara? (Acabo de escribir esto desde la parte superior de mi cabeza en StackOverflow, así que estoy seguro de que hay problemas, pero conceptualmente, ¿funcionará?)

  2. ¿Cuáles son los problemas a buscar?

  3. ¿Hay mejores alternativas a esto? p.ej. ¿Podrían los hilos funcionar igual de bien (es Debian Lenny Linux)? ¿Hay alguna biblioteca que maneje grupos de trabajadores de procesos paralelos como este?

  4. ¿Hay interacciones con Django de las que debería ser consciente?

¡Gracias por leer! Espero que encuentres este problema tan interesante como yo.

Brian

4
Brian M. Hunt 15 sep. 2009 a las 22:29

3 respuestas

La mejor respuesta

Puede parecer que estoy golpeando este producto, ya que esta es la segunda vez que respondo con una recomendación al respecto.

Pero parece que necesita un servicio de Message Queue Server, en particular una cola de mensajes distribuidos.

Así es como funcionará:

  1. Su aplicación Django solicita CMD
  2. CMD se agrega a una cola
  3. CMD es empujado a varios trabajos
  4. Se ejecuta y los resultados se devuelven aguas arriba

La mayor parte de este código existe y no es necesario que construyas tu propio sistema.

Echa un vistazo a Apio, que fue construido inicialmente con Django.

http://www.celeryq.org/ http://robertpogorzelski.com/blog/2009/09/10/rabbitmq-celery-and-django/

3
ismail 16 sep. 2009 a las 08:48

¿Qué hay de "demonizar" la llamada del subproceso usando python-daemon o su sucesor, < a href = "http://www.clapper.org/software/python/grizzled/" rel = "nofollow noreferrer"> canoso .

0
John Paulett 16 sep. 2009 a las 00:15

Issy ya mencionó a Apio, pero como los comentarios no funcionan bien con ejemplos de código, en su lugar responderé como respuesta.

Debe intentar utilizar Celery de forma sincronizada con el almacén de resultados AMQP. Puede distribuir la ejecución real a otro proceso o incluso a otra máquina. Ejecutar sincrónicamente en apio es fácil, por ejemplo:

>>> from celery.task import Task
>>> from celery.registry import tasks

>>> class MyTask(Task):
...
...     def run(self, x, y):
...         return x * y 
>>> tasks.register(MyTask)

>>> async_result = MyTask.delay(2, 2)
>>> retval = async_result.get() # Now synchronous
>>> retval 4

El almacén de resultados de AMQP hace que el envío de resultados sea muy rápido, pero solo está disponible en la versión de desarrollo actual (en congelación de código para convertirse en 0.8.0)

3
asksol 18 sep. 2009 a las 17:05