Tengo una cadena de interfaz existente que quiero ejecutar como una reactor en lugar de administrar mis propios hilos y colas

public interface UserLookupService {
    public User lookup(String id);
}
public interface UsersHandler {
    public void handle(List<User> users>);
}
UserLookupService userSvc = ...;
UsersHandler usersHandler = ...

// Works well to lookup users in parallel. 
Flux.just("userA", "userB", "userC")
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(str -> {
        userSvc.lookup(str);
    });

¿Cómo puedo cadenar ese resultado, por lo que invoca UsersHandler con lotes de User?

2
Johan Sjöberg 1 jul. 2019 a las 17:56

1 respuesta

La mejor respuesta

Suscribirse a algo desencadena la cadena, por lo que usted en general no puede "cadenar" a suscriptores, son lo último en la cadena.

Piense si así, configura su tubería reactiva, y cuando usted subscribe, desencadena la tubería para comenzar, y la cadena producirá un resultado.

En un servidor web, el subscriber suele ser el cliente de llamadas, y cuando el cliente subscribes activa la cadena de eventos en el servidor que publicará datos.

A Flux es una especie de una lista de 1 a n Mono s. Cada objeto en un Mono/Flux tiene una serie de "estados" por así decirlo. Estos son Success, Error, Cancel, Next, Completed y más.

Cuando un Mono/Flux entra internamente en un estado Success se emitirá el valor en él. A Mono suele va Success cuando algo se ha resuelto en el Mono.

Cuando declaras Flux.just("userA", "userB", "userC"), básicamente, pide que el flujo resuelva la entrada que está alimentando. Colocar una cadena es algo que se resolverá instantáneamente, así que el flujo entrará en un estado Success y comenzará a emitir las cadenas tan pronto como algo Subscribes. Por lo tanto, todo lo que tiene que hacer es declarar la cadena que desea que suceda después de alguien Subscribes.

Esto se puede hacer de varias maneras diferentes, cuando desea hacer algo y cambiar el valor, como lo desea de un string a un user Por lo general, usamos map.

Si solo queremos hacer algo con cada objeto y no devolver nada, podemos usar doOnNext.

Flux.just("userA", "userB", "userC")
            .parallel(2)
            .runOn(Schedulers.parallel())
            .map(userString -> {
                return lookupService.lookup(userString);
            })
            .doOnNext(user -> {
                // if you want to do something on each user
                // will return void so if you want to log something
                // or handle each user
            }).subscribe();

Suscribirse debe ser lo último en la cadena.

4
Toerktumlare 1 jul. 2019 a las 17:02