Tengo un generador Flux (del reactor del proyecto) para flujo que demora 1 segundo en emitir cada elemento. Y debido a que la secuencia es infinita, también decido cuántos elementos me gustaría tomar usando el método take(). Quiero probar que dicho método se puede ejecutar en un tiempo determinado. Intenté crear mi solución inspirada en esta respuesta pero sin éxito.

Este es el método Flux

import reactor.core.publisher.Flux;
import java.time.Duration;
@Slf4j
public class FluxAndMonoStreams {
    public Flux<Double> createFluxDoubleWithDelay(long numberOfElements, long delaySec) {
        return Flux.interval(Duration.ofSeconds(delaySec))
                .map(l -> l.doubleValue())
                .take(numberOfElements)
                .log();
    }
}

Y esta es la prueba unitaria. Probé los enfoques usando .thenConsumeWhile(value -> true), .expectNextCount(4), .expectNext(0.0, 1.0, 2.0, 3.0) después de esperar 4 segundos en .thenAwait(Duration.ofSeconds(4)). Pero ninguno de ellos funciona.

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class FluxAndMonoStreamsTest {
    FluxAndMonoStreams myFluxAndMonoStreams = new FluxAndMonoStreams();
    @Test
    void testCreateFluxStreamWithMapAndVerifyDelay() {
        Flux<Double> streamLongFlux = myFluxAndMonoStreams
                .createFluxDoubleWithDelay(4, 1);

        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger incrementer = new AtomicInteger();

        StepVerifier
                .withVirtualTime(() -> streamLongFlux
                        .subscribeOn(scheduler)
                        .doOnNext(value -> incrementer.incrementAndGet())
                )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(4))
                // .thenConsumeWhile(value -> true)
                // .expectNextCount(4)
                // .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete()
        ;
    }
}

Recibo este error que dice que no estoy consumiendo valores 1.0, 2.0, ... Básicamente, quiero asegurarme de consumir solo los valores 0.0, 1.0, 2.0, 3.0 en 4 segundos. ¿Cómo puedo hacer eso?

12:37:31.853 [Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
12:37:31.912 [test-1] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
12:37:31.915 [test-1] INFO reactor.Flux.Take.1 - request(unbounded)
12:37:32.917 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0.0)
12:37:32.921 [parallel-1] INFO reactor.Flux.Take.1 - cancel()

expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))

Explicación adicional : Solo para aclarar una vez más el comportamiento que quiero. Si quito el comentario de .expectNext(0.0, 1.0, 2.0, 3.0) y reemplazo de 4 segundos a 2 segundos en .thenAwait(Duration.ofSeconds(2)), la prueba debería fallar. Pero no lo es.

1
Felipe 12 mar. 2021 a las 14:42

2 respuestas

La mejor respuesta

Alguien más me ayudó y estoy publicando la solución aquí. Estoy usando JUnit 5 tiempo de espera. Usando @Timeout(value = 5, unit = TimeUnit.SECONDS) la prueba pasa, si uso 4 segundos o menos la prueba falla. La solución no se basa en StepVerifier pero aún logra sus propósitos.

    @Test
    @Timeout(value = 5, unit = TimeUnit.SECONDS)
    void testCreateFluxStreamWithMapAndVerifyDelay() {
        Flux<Double> streamLongFlux = myFluxAndMonoStreams
                .createFluxDoubleWithDelay(4, 1);

        StepVerifier.create(streamLongFlux)
                .expectSubscription()
                .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete();
    }
0
Felipe 13 mar. 2021 a las 12:25

Esta prueba funciona "bien" para mí si elimino el comentario de hopeNext.

    public Flux<Double> create(long numberOfElements, long delaySec) {
        return Flux.interval(Duration.ofSeconds(delaySec)).map(l -> l.doubleValue()).take(numberOfElements).log();
    }

    @Test
    public void virtualTimeTest() {
        Flux<Double> streamLongFlux = create(4,1);

        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger incrementer = new AtomicInteger();

        StepVerifier
                .withVirtualTime(() -> streamLongFlux
                        .subscribeOn(scheduler)
                        .doOnNext(value -> incrementer.incrementAndGet())
                )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(4))
                // .thenConsumeWhile(value -> true)
                // .expectNextCount(4)
                 .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete()
        ;
    }

No se está ejecutando en tiempo virtual porque se llama a Flux.interval antes de que se establezca el tiempo virtual.

 StepVerifier.withVirtualTime(() -> create(4,1))
                    .expectSubscription()
                    .thenAwait(Duration.ofSeconds(4))
                    .expectNext(0.0, 1.0, 2.0, 3.0)
                    .verifyComplete();

Esto se ejecuta en tiempo virtual y tarda 400 ms en lugar de 4,4 segundos.

Otra prueba aún más explícita sería:

@Test
    public void virtualTime(){
        StepVerifier.withVirtualTime(() -> create(4,1))
                    .expectSubscription()
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(0.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(1.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(2.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(3.0)
                    .verifyComplete();
    }
1
p.streef 12 mar. 2021 a las 18:58