Tengo una operación lenta y con uso intensivo de la CPU: doWork(int x), que se llama con un único parámetro entero con diferentes valores, de la siguiente manera:

static String doWork(int x) {
  // do work that depends on i, takes ~60 seconds
  ...
}

public static void main(String args[]) {
  for (int i = 1; i < 100; i++) {
    System.println(doWork(i));
  }
}

A medida que se completa cada llamada a doWork(), el resultado se envía a la consola. Me gustaría paralelizar esto: todas las llamadas doWork() son independientes y no mutan ningún estado compartido. Ahora, puedo hacerlo de la manera anterior, jugando con ExecutorSevice y Future.get() y así sucesivamente, pero me gustaría hacerlo de manera más limpia con las transmisiones 1 .

Entonces, algo como esto parece que casi podría funcionar:

public static void main(String args[]) {
    IntStream.rangeClosed(1, 100).parallel()
        .forEach(i -> System.out.println(doWork(i)));
}

... pero el problema es que quiero conservar el orden de salida en la consola (la línea para doWork(1) debería ir primero, y así sucesivamente). No puedo usar forEachOrdered() porque eso serializa toda la operación: solo se usaría un único hilo. La raíz del problema es que forEachOrdered proporciona una garantía demasiado fuerte: que el método del consumidor se llama secuencialmente en un elemento a la vez. Quiero que se llame a los consumidores en paralelo, pero que la salida esté en orden.

Entonces probablemente debería mirar un modismo de tipo map -> collect en su lugar, donde recopilo la salida de cada llamada doWork() en una cadena y la imprimo una vez:

public static void main(String[] args) {
    System.out.println(IntStream.rangeClosed(1, 100).parallel()
        .mapToObj(Main::doWork).collect(Collectors.joining("\n")));
}

¡Casi! El método collect() mantiene el orden de los encuentros, por lo que mis elementos están ordenados. El problema ahora es que no hay salida incremental: todo el trabajo debe finalizar antes de que se produzca cualquier salida. Realmente quiero preservar el comportamiento donde las actualizaciones se filtran a la consola.

Supongo que quiero algún tipo de operación de terminal de consumo ordenada , que no obligue a ordenar toda la tubería. Básicamente, recopilaría resultados internamente como un recopilador normal, pero cuando se recopila el elemento "más a la izquierda" actual, lo pasaría al consumidor, por lo que el consumidor ve un flujo de elementos ordenados, pero todo sigue sucediendo en paralelo.

¿Hay algo por ahí como? No parece posible construirlo en la interfaz Collector existente, ya que no le da una forma de determinar cuál es el orden de los elementos.


1

4
BeeOnRope 16 dic. 2016 a las 03:15

2 respuestas

La mejor respuesta

Estás bastante cerca. Simplemente combine las soluciones map y forEachOrdered:

IntStream.rangeClosed(1, 100)
         .parallel()
         .mapToObj(Main::doWork)
         .forEachOrdered(System.out::println);
6
shmosel 16 dic. 2016 a las 00:19

FWIW, esto es con lo que terminé, ya que la respuesta de shmosel es correcta pero no proporcionó un orden "casi FIFO" particularmente útil debido a cómo funcionan los flujos paralelos:

IntStream.rangeClosed(1, 100)
    .mapToObj(i -> CompletableFuture.supplyAsync(() -> doWork(i)))
    .collect(Collectors.toList())
    .forEach(f -> System.out.println(f.join()));

Básicamente, usa la secuencia para enviar, en orden, todo el trabajo al ejecutor predeterminado (el mismo que se usa para las secuencias parallel). Luego collect() son los futuros resultantes (eso es necesario para que realmente ocurran todos los envíos de trabajos), y luego itera (la llamada forEach) los objetos Future resultantes, obteniendo uno por uno.

Esta implementación hace que el trabajo se haga de una manera más o menos FIFO (más o menos, por supuesto, si tiene 3 subprocesos en el grupo, se ejecutarán aproximadamente 3 trabajos a la vez, pero generalmente son los tres primeros). En el caso de 3 subprocesos y trabajos que toman aproximadamente la misma cantidad de tiempo, verá que la salida aparece, en orden, en ráfagas de 3 resultados a la vez.

2
BeeOnRope 16 dic. 2016 a las 18:32