Tengo el siguiente código que hace que Nuke se cuelgue. Básicamente, lo que intento hacer es obtener una lista de archivos y carpetas del sistema de archivos, y estoy tratando de acelerarlo a través del procesamiento paralelo. Esto funciona perfectamente fuera de Nuke, pero como dije antes, ejecutar esto en Nuke hará que Nuke se cuelgue. ¿Hay una mejor manera de hacer esto que haga que Nuke no se cuelgue? Preferiblemente, me gustaría arreglar esto a través de la biblioteca estándar de Python, o paquetes que son independientes de la plataforma. Pero, si no hay forma de hacer eso, entonces estoy bien con eso. En el peor de los casos, tendré que volver a no usar el procesamiento paralelo y buscar otras optimizaciones.

Además, cuando ejecuto este código en Nuke, aparece el siguiente error en la consola:

Unknown units in -c from multiprocessing.forking import main; main()

El código:

#!/bin/env python

import multiprocessing
import os

CPU_COUNT = multiprocessing.cpu_count()


def _threaded_master(root):
    in_queue = multiprocessing.JoinableQueue()
    folder_queue = multiprocessing.JoinableQueue()
    file_queue = multiprocessing.JoinableQueue()

    in_queue.put(root)

    for _ in xrange(CPU_COUNT):
        multiprocessing.Process(target=_threaded_slave, args=(in_queue, folder_queue, file_queue)).start()

    in_queue.join()

    return {"folders": folder_queue, "files": file_queue}


def _threaded_slave(in_queue, folder_queue, file_queue):
    while True:
        path_item = in_queue.get()

        if os.path.isdir(path_item):
            for item in os.listdir(path_item):
                path = os.path.join(path_item, item)
                in_queue.put(path)

        in_queue.task_done()


if __name__ == "__main__":
    print _threaded_master(r"/path/to/root")
2
ScottWilson 27 ago. 2014 a las 21:00

2 respuestas

La mejor respuesta

Aquí está mi código para escanear a través de un gran árbol de directorios usando varios hilos.

Originalmente escribí el código para usar el viejo multiprocessing.Pool(), porque es muy fácil y te da los resultados de las funciones. Las colas de entrada y salida no son necesarias. Otra diferencia es que utiliza procesos sobre hilos , que tienen algunas compensaciones.

El Pool tiene un gran inconveniente: supone que tiene una lista estática de elementos para procesar.

Entonces, reescribí el código siguiendo su ejemplo original: cola de entrada / salida de directorios para procesar, y una cola de salida. La persona que llama tiene que tomar elementos explícitamente de la cola de salida.

En el caso de las sonrisas, hice una comparación de tiempos con el viejo os.walk() y ... al menos en mi máquina, la solución tradicional era más rápida. Las dos soluciones produjeron un número bastante diferente de archivos, que no puedo explicar.

¡Que te diviertas!

Fuente

#!/bin/env python

import multiprocessing, threading, time
import logging, os, Queue, sys

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)-4s %(levelname)s %(threadName)s %(message)s", 
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)

def scan_dir(topdir):
    try:
        for name in os.listdir(topdir):
            path = os.path.join(topdir, name)
            yield (path, os.path.isdir(path))
    except OSError:
        logging.error('uhoh: %s', topdir)

def scan_dir_queue(inqueue, outqueue):
    logging.info('start')
    while True:
        try:
            dir_item = inqueue.get_nowait()
        except Queue.Empty:
            break

        res = list( scan_dir(dir_item) )
        logging.debug('- %d paths', len(res))
        for path,isdir in res:
            outqueue.put( (path,isdir) )
            if isdir:
                inqueue.put(path)
    logging.info('done')

def thread_master(root):
    dir_queue = Queue.Queue() # pylint: disable=E1101
    dir_queue.put(root)
    result_queue = Queue.Queue()

    threads = [
        threading.Thread(
            target=scan_dir_queue, args=[dir_queue, result_queue]
        )
        for _ in range(multiprocessing.cpu_count())
    ]

    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return result_queue.queue

if __name__ == "__main__":
    topdir = os.path.expanduser('~')

    start = time.time()
    res = thread_master(topdir)
    print 'threaded:', time.time() - start
    print len(res), 'paths'

    def mywalk(topdir):
        for (dirpath, _dirnames, filenames) in os.walk(topdir):
            for name in filenames:
                yield os.path.join(dirpath, name)
    start = time.time()
    res = list(mywalk(topdir))
    print 'os.walk:', time.time() - start
    print len(res), 'paths'

Salida

11:56:35 INFO Thread-1 start
11:56:35 INFO Thread-2 start
11:56:35 INFO Thread-3 start
11:56:35 INFO Thread-4 start
11:56:35 INFO Thread-2 done
11:56:35 INFO Thread-3 done
11:56:35 INFO Thread-4 done
11:56:42 INFO Thread-1 done
threaded: 6.49218010902
299230 paths
os.walk: 1.6940600872
175741 paths
2
johntellsall 27 ago. 2014 a las 19:49

Aquí hay un enlace para consultar: https://learn.foundry.com /nuke/developers/63/pythondevguide/threading.html

Lo que es notable es la advertencia mencionada allí: nuke.executeInMainThread y nuke.executeInMainThreadWithResult siempre deben ejecutarse desde un hilo secundario. Si se ejecutan desde el hilo principal, se congelan NUKE.

Entonces, genera un nuevo hilo hijo y haz tus cosas allí.

1
Dushyant 4 jun. 2019 a las 01:20