Estoy tratando de resolver un problema, parece que no es posible con Java.

Tengo un código que llama a un processObject (SomeObject someObject) y el método lo procesa. Estoy tratando de encapsular todo esto y quiero obtener un flujo de someObjects.

A continuación se muestra mi programa de muestra:

import java.util.stream.Stream;

public class ProcessObject {

    public static void main(String[] args) {
        int i = 0;
        ProcessObject processObject = new ProcessObject();
        while (true) {
            processObject.processObject(new SomeObject("Hello " + i++));
        }
    }

    public void processObject(SomeObject someObject) {
        System.out.println(someObject);

    }
    //TODO
    public Stream<SomeObject> getStream(){
        //Producer here should wait and produce Objects as soon as 
        //they become available like "processObject" method.
        return Stream.generate(() -> new SomeObject("Hello "));
    }
}


class SomeObject {
    public String name;

    public SomeObject(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }
}

El método principal estático sigue generando SomeObjects y llama al método processObject para procesarlos y los imprime. Entonces, todo está bien.

Quiero crear una secuencia de SomeObject para que, en lugar de llamar al método para procesarlos, solo use secuencias para procesarlas, algo como esto:

getStream de secuencia pública ();

Ahora, con Java 8 o Java 9, hay Streams disponibles. Pero ponga la condición de características INMUTABLES de las fuentes que se están generando.

¿Cómo crear una transmisión y luego agregar elementos a la transmisión tan pronto como estén disponibles como una verdadera canalización?

Pensé en usar un BlockingQueue y usarlo en el método Stream Generate como Productor comoblingQueue.take () pero nunca se compila.

2
abcdef12 14 nov. 2017 a las 09:55

2 respuestas

La mejor respuesta

Para usar un BlockingQueue para alimentar el Stream, necesitará que el productor y el consumidor se ejecuten en diferentes subprocesos.

Aquí estoy usando mi hilo principal para consumir el flujo y un hilo nuevo para alimentarlo a través de un BlockingQueue para demostrarlo.

public void test(String[] args) {
    // My queue
    BlockingQueue<BigInteger> queue = new ArrayBlockingQueue<>(10);

    // A Stream of it's contents.
    Stream<BigInteger> biStream = Stream.generate(() -> {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });

    // Feed the queue from a thread.
    new Thread(new Runnable() {
        // Must be final to be accessible inside `run`.
        final AtomicInteger i = new AtomicInteger();

        @Override
        public void run() {
            // Slow feed to the queue.
            while (true) {
                // Add a new number to the queue.
                queue.add(BigInteger.valueOf(i.getAndIncrement()));
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }).start();

    // DEMO - Consumes the queue printing contents as they arrive.
    biStream.filter(x -> x.testBit(2))
            .limit(20)
            .forEach(x -> System.out.println(x));
}
1
OldCurmudgeon 15 nov. 2017 a las 11:13

El problema aquí es que no puede mutar una variable local i en la expresión lambda, ¿no es así? Bueno, dado que una lambda es básicamente una clase interna anónima, ¡puede poner i como un campo de la clase interna!

Stream<String> stream = Stream.generate(new Supplier<String>() {
    int i = 0;
    // if you have any other state you want to mutate, put it here as well!
    @Override
    public String get() {
        return "Hello" + i++;
    }
});
// prints Hello0 to Hello9!
System.out.println(stream.limit(10).collect(Collectors.toList()));
0
Sweeper 14 nov. 2017 a las 07:10