Quiero implementar un hilo que pueda aceptar punteros de función de un hilo principal y ejecutarlos en serie. Mi idea era usar una estructura que mantenga el puntero de la función y su objeto y seguir empujándolo a una cola. Esto se puede encapsular en una clase. El hilo de la tarea puede entonces salir de la cola y procesarlo. También necesito sincronizarlo (¿para que no bloquee el hilo principal?), Así que estaba pensando en usar un semáforo. Aunque tengo una idea decente de la estructura del programa, tengo problemas para codificar esto, especialmente el subproceso y la sincronización de semáforos en C ++ 11. Sería genial si alguien pudiera sugerir un esquema mediante el cual pueda implementar esto.

EDITAR: La pregunta duplicada responde a la pregunta sobre la creación de un grupo de subprocesos. Parece que se están creando varios subprocesos para hacer algo de trabajo. Solo necesito un hilo que pueda poner en cola los punteros de función y procesarlos en el orden en que se reciben.

0
rookie 15 feb. 2018 a las 09:37

2 respuestas

La mejor respuesta

Tu problema tiene 2 partes. Almacenar la lista de trabajos y manipular la lista de trabajos de forma segura.

Para la primera parte, observe std::function, std::bind y std::ref.

Para la segunda parte, esto es similar al problema productor / consumidor. Puede implementar un semáforo usando std::mutex y std::condition_variable.

Hay una pista / esquema. Ahora mi respuesta completa ...

Paso 1)

Almacene sus punteros de función en una cola de std :: function.

std::queue<std::function<void()>>

Cada elemento de la cola es una función que no toma argumentos y devuelve void.

Para funciones que toman argumentos, use std::bind para vincular los argumentos.

void testfunc(int n);
...
int mynum = 5;
std::function<void()> f = std::bind(testfunction, mynum);

Cuando se invoca f, es decir, f(), 5 se pasará como argumento 1 a testfunc. std::bind copia mynum por valor inmediatamente.

Probablemente también desee poder pasar variables por referencia. Esto es útil para obtener resultados de funciones, así como para pasar dispositivos de sincronización compartidos como semáforos y condiciones. Utilice std::ref, el contenedor de referencia.

void testfunc2(int& n);  // function takes n by ref
...
int a = 5;
std::function<void()> f = std::bind(testfunction, std::ref(a));

std::function y std::bind pueden funcionar con cualquier invocable (funciones, functores o lambdas), lo cual es bastante bueno.

Paso 2)

Un subproceso de trabajo se retira de la cola mientras la cola no está vacía. Su código debe verse similar al problema productor / consumidor.

class AsyncWorker
{
    ...

public:
    // called by main thread
    AddJob(std::function<void()> f)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(f));
            ++m_numJobs;
        }
        m_condition.notify_one();  // It's good style to call notify_one when not holding the lock. 
    }

private:
    worker_main()
    {
        while(!m_exitCondition)
            doJob();
    }

    void doJob()
    {
        std::function<void()> f;
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            while (m_numJobs == 0)
                m_condition.wait(lock);

            if (m_exitCondition)
                return;

            f = std::move(m_queue.front());
            m_queue.pop();
            --m_numJobs;
        }
        f();
    }

    ...

Nota 1: el código de sincronización ... con m_mutex, m_condition y m_numJobs ... es esencialmente lo que debe usar para implementar un semáforo en C ++ '11. Lo que hice aquí es más eficiente que usar una clase de semáforo separada porque solo 1 bloqueo está bloqueado. (Un semáforo tendría su propio bloqueo y aún tendría que bloquear la cola compartida).

Nota 2: puede agregar fácilmente conversaciones de trabajadores adicionales.

Nota 3: m_exitCondition en mi ejemplo es una std::atomic<bool>

En realidad, configurar la función AddJob de una manera polimórfica entra en las 11 plantillas variadas de C ++ y el reenvío perfecto ...

class AsyncWorker
{
    ...

public:
    // called by main thread
    template <typename FUNCTOR, typename... ARGS>
    AddJob(FUNCTOR&& functor, ARGS&&... args)
    {
        std::function<void()> f(std::bind(std::forward<FUNCTOR>(functor), std::forward<ARGS&&>(args)...));
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(f));
            ++m_numJobs;
        }
        m_condition.notify_one();  // It's good style to call notify_one when not holding the lock. 
    }

Creo que puede funcionar si solo usa paso por valor en lugar de usar las referencias de reenvío, pero no lo he probado, aunque sé que el reenvío perfecto funciona muy bien. Evitar el reenvío perfecto puede hacer que el concepto sea un poco menos confuso, pero el código no será muy diferente ...

1
Humphrey Winnebago 15 feb. 2018 a las 21:49

Verifique este fragmento de código, lo he implementado sin usar una clase. Vea si ayuda un poco. La variable condicional podría evitarse aquí, pero quiero que el hilo del lector realice una encuesta solo cuando haya una señal del escritor para que los ciclos de CPU en el lector no se desperdicien.

#include <iostream>
#include <functional>
#include <mutex>
#include <thread>
#include <queue>
#include <chrono>
#include <condition_variable>

using namespace std;

typedef function<void(void)> task_t;

queue<task_t> tasks;
mutex mu;
condition_variable cv;

bool stop = false;

void writer()
{
    while(!stop)
    {
        {
            unique_lock<mutex> lock(mu);
            task_t task = [](){ this_thread::sleep_for(chrono::milliseconds(100ms));   };
            tasks.push(task);
            cv.notify_one();
        }

        this_thread::sleep_for(chrono::milliseconds(500ms)); // writes every 500ms
    }
}

void reader()
{
    while(!stop)
    {
        unique_lock<mutex> lock(mu);
        cv.wait(lock,[]() { return !stop;});  
        while( !tasks.empty() )
        {

            auto task = tasks.front();            
            tasks.pop();
            lock.unlock();
            task();
            lock.lock();
        }

    }
}

int main()
{
    thread writer_thread([]() { writer();}  );
    thread reader_thread([]() { reader();}  );

    this_thread::sleep_for(chrono::seconds(3s)); // main other task

    stop = true;


    writer_thread.join();
    reader_thread.join();
}
2
seccpur 15 feb. 2018 a las 08:40