Me pregunto si es posible iniciar varios observadores para una colección.

En mi caso, comienzo varios subprocesos monitoreando una colección, pero el mongo devuelve el mismo documento a todos los observadores.

¿Cómo consigo que MongoDB divida la carga del documento entre estos subprocesos iniciados?

Iniciador de hilo:

    for (var i = 0; i < 4; i++)
    {
                ThreadMongoDB thWork = new ThreadMongoDB(_config, _serviceProvider);
                thWork.ThreadName = "TH-" + i.ToString();

                Thread th = new Thread(thWork.Process);

                await Task.Run(() =>
                {
                    th.Start();
                });
    }

Código en la clase ThreadMongoDB:

public class ThreadMongoDB
    {
        readonly IMongoDatabase _db;
        readonly Executer _exec;

        public string ThreadName { get; set; }


        public ThreadMongoDB(IConfiguration iconfig, IServiceProvider serviceProvider)
        {
            var typeConn = iconfig["TypeConn"];
            var client = new MongoClient(iconfig[$"{typeConn}:cnnMain"]);
            _db = client.GetDatabase(iconfig[$"{typeConn}:dbMain"]);

            _exec = new Executer(iconfig, serviceProvider);
        }

        public void Process()
        {
            var options = new ChangeStreamOptions() { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };

            var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<WorkerQueue>>()
                .Match("{ operationType: { $in: [ 'replace', 'insert', 'update' ] }}");
                //.Match("{ $or: [ {operationType: 'replace' }, { operationType: 'insert' }, { operationType: 'update' } ] }");

            using (var pilotQueueStream = _db.GetCollection<WorkerQueue>("one_queue_pilot")
                .Watch(pipeline, options).ToEnumerable().GetEnumerator())
            {
                while (true)
                {
                    pilotQueueStream.MoveNext();

                    WorkerQueue currentReg = pilotQueueStream.Current.FullDocument;

                    if (currentReg != null)
                    {
                        if (!currentReg.boolDone)
                        {
                           _exec.Execute(ThreadName, currentReg);
                        }
                    }
                }
            }
        }
    }
}
0
Leandro 26 sep. 2019 a las 04:35

1 respuesta

La mejor respuesta

Me pregunto si es posible iniciar varios observadores para una colección.

En mi caso, comienzo varios subprocesos monitoreando una colección, pero el mongo devuelve el mismo documento a todos los observadores.

Se espera este resultado: puede tener múltiples flujos de cambios activos, pero cada flujo es independiente y no hay coordinación entre los observadores. Ver transmisiones idénticas resultará en eventos idénticos.

¿Cómo consigo que MongoDB divida la carga del documento entre estos subprocesos iniciados?

Este requisito deberá tratarse en su solicitud.

Si sus observadores están realizando un procesamiento significativo para los eventos de flujo de cambios y desea distribuir la carga de trabajo, algunos enfoques a considerar son:

  • Haga que un observador de flujo de cambios envíe eventos a una cola de trabajo / trabajo central para que puedan reclamar y procesar trabajos de la cola. Este es un patrón típico de procesamiento por lotes.

  • Haga que cada observador incluya una etapa $match que filtra un subconjunto diferente de eventos de flujo de cambios para procesar. Este enfoque requiere cierta planificación para garantizar que los eventos no se superpongan y es menos escalable si algunos tipos de eventos necesitarán más trabajadores que otros.

Antes de implementar cualquiera de los enfoques, definitivamente valdría la pena confirmar que su carga de trabajo requiere más trabajadores / subprocesos.

0
Stennie 26 sep. 2019 a las 03:18