Es bastante fácil hacer un trabajo paralelo con el módulo concurrent.futures de Python 3 como se muestra a continuación.

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()  

También es muy útil para insertar y recuperar elementos en una cola.

q = queue.Queue()
for task in tasks:
q.put(task)
while not q.empty():
   q.get()

Tengo un script ejecutándose en segundo plano escuchando actualizaciones. Ahora, en teoría, suponga que, a medida que lleguen esas actualizaciones, las pondría en cola y trabajaría en ellas simultáneamente usando ThreadPoolExecutor.

Ahora, individualmente, todos estos componentes funcionan de forma aislada y tienen sentido, pero ¿cómo hago para usarlos juntos? No sé si es posible alimentar el trabajo ThreadPoolExecutor desde la cola en tiempo real a menos que los datos para trabajar estén predeterminados.

En pocas palabras, todo lo que quiero hacer es recibir actualizaciones de, digamos, 4 mensajes por segundo, meterlos en una cola y hacer que mis futuros concurrentes trabajen en ellos. Si no lo hago, entonces estoy atrapado con un enfoque secuencial que es lento.

Tomemos el ejemplo canónico en la documentación de Python a continuación:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

La lista de URLS es fija. ¿Es posible alimentar esta lista en tiempo real y hacer que el trabajador la procese a medida que avanza, tal vez desde una cola para fines de gestión? ¿Estoy un poco confundido sobre si mi enfoque es realmente posible ?

14
Ali Gajani 14 ene. 2017 a las 11:24

3 respuestas

La mejor respuesta

El ejemplo de los documentos de Python, ampliado para tomar su Trabajar desde una cola. Un cambio a tener en cuenta es que este código usa concurrent.futures.wait en lugar de concurrent.futures.as_completed para permitir que se inicie un nuevo trabajo mientras se espera que se complete otro trabajo.

import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"

def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():

            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]

Salida de buscar cada url dos veces:

'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes
16
Stephen Rauch 14 ene. 2017 a las 23:34

Realmente me gustó el enfoque interesante de @pedro arriba. Sin embargo, al procesar miles de archivos, noté que al final se lanzaría una StopIteration y siempre se omitirían algunos archivos. Tuve que hacer una pequeña modificación de la siguiente manera. Muy útil respuesta de nuevo.

class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        vals = []
        try:
            for _ in range(n):
                vals.append(next(self.iter))
            return vals, False
        except StopIteration as e:
            return vals, True

- Llame de la siguiente manera

...
if not is_shutting_down():
   items, is_finished = buffer.nextN(idle_workers)
   if is_finished:
        stop()
...

- Donde parar es una función que simplemente le dice al apagado

def stop():
    shutting_down.put(None)
0
Vitalis 11 abr. 2020 a las 16:19

En el trabajo, encontré una situación en la que quería hacer un trabajo paralelo en una secuencia de datos ilimitada. Creé una pequeña biblioteca inspirada en la excelente respuesta ya proporcionada por Stephen Rauch.

Originalmente abordé este problema pensando en dos hilos separados, uno que envía el trabajo a una cola y otro que monitorea la cola para cualquier tarea completada y deja más espacio para que ingrese trabajo nuevo. Esto es similar a lo que propuso Stephen Rauch, donde él consume la secuencia usando una función feed_the_workers que se ejecuta en un hilo separado.

Hablando con uno de mis colegas, me ayudó a darme cuenta de que puede salirse con la suya haciendo todo en un solo hilo si define un iterador con búfer que le permite controlar cuántos elementos salen de la secuencia de entrada cada vez que está listo para enviar más trabajo al grupo de subprocesos.

Entonces presentamos la clase BufferedIter

class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        vals = []
        for _ in range(n):
            vals.append(next(self.iter))
        return vals

Lo que nos permite definir el procesador de flujo de la siguiente manera

import logging
import queue
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

level = logging.DEBUG
log = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
handler.setLevel(level)
log.addHandler(handler)
log.setLevel(level)

WAIT_SLEEP = 1  # second, adjust this based on the timescale of your tasks


def stream_processor(input_stream, task, num_workers):

    # Use a queue to signal shutdown.
    shutting_down = queue.Queue()

    def shutdown(signum, frame):
        log.warning('Caught signal %d, shutting down gracefully ...' % signum)
        # Put an item in the shutting down queue to signal shutdown.
        shutting_down.put(None)

    # Register the signal handler
    signal.signal(signal.SIGTERM, shutdown)
    signal.signal(signal.SIGINT, shutdown)

    def is_shutting_down():
        return not shutting_down.empty()

    futures = dict()
    buffer = BufferedIter(input_stream)
    with ThreadPoolExecutor(num_workers) as executor:
        num_success = 0
        num_failure = 0
        while True:
            idle_workers = num_workers - len(futures)

            if not is_shutting_down():
                items = buffer.nextN(idle_workers)
                for data in items:
                    futures[executor.submit(task, data)] = data

            done, _ = wait(futures, timeout=WAIT_SLEEP, return_when=ALL_COMPLETED)
            for f in done:
                data = futures[f]
                try:
                    f.result(timeout=0)
                except Exception as exc:
                    log.error('future encountered an exception: %r, %s' % (data, exc))
                    num_failure += 1
                else:
                    log.info('future finished successfully: %r' % data)
                    num_success += 1

                del futures[f]

            if is_shutting_down() and len(futures) == 0:
                break

        log.info("num_success=%d, num_failure=%d" % (num_success, num_failure))

A continuación mostramos un ejemplo de cómo usar el procesador de flujo

import itertools

def integers():
    """Simulate an infinite stream of work."""
    for i in itertools.count():
        yield i


def task(x):
    """The task we would like to perform in parallel.
    With some delay to simulate a time consuming job.
    With a baked in exception to simulate errors.
    """
    time.sleep(3)
    if x == 4:
        raise ValueError('bad luck')
    return x * x

stream_processor(integers(), task, num_workers=3)

La salida para este ejemplo se muestra a continuación

2019-01-15 22:34:40,193 future finished successfully: 1
2019-01-15 22:34:40,193 future finished successfully: 0
2019-01-15 22:34:40,193 future finished successfully: 2
2019-01-15 22:34:43,201 future finished successfully: 5
2019-01-15 22:34:43,201 future encountered an exception: 4, bad luck
2019-01-15 22:34:43,202 future finished successfully: 3
2019-01-15 22:34:46,208 future finished successfully: 6
2019-01-15 22:34:46,209 future finished successfully: 7
2019-01-15 22:34:46,209 future finished successfully: 8
2019-01-15 22:34:49,215 future finished successfully: 11
2019-01-15 22:34:49,215 future finished successfully: 10
2019-01-15 22:34:49,215 future finished successfully: 9
^C <=== THIS IS WHEN I HIT Ctrl-C
2019-01-15 22:34:50,648 Caught signal 2, shutting down gracefully ...
2019-01-15 22:34:52,221 future finished successfully: 13
2019-01-15 22:34:52,222 future finished successfully: 14
2019-01-15 22:34:52,222 future finished successfully: 12
2019-01-15 22:34:52,222 num_success=14, num_failure=1
3
Pedro M Duarte 15 ene. 2019 a las 22:55