Tengo una variedad de múltiples URL y puertos. Para cada uno de ellos, necesito enviar y recibir algo de regreso:

Flux.fromArray(trackersArray)
    .flatMap(tracker -> 
               ConnectToTracker.connect(tracker.getTracker(), tracker.getPort()))

Me comunico con los servidores en UDP, por lo que no puedo saber si un servidor está vivo o no, a menos que envíe un mensaje que, según un conjunto de reglas, necesite responder.

ConnectToTracker.connect puede enviar una señal onNext si la respuesta del servidor o la señal onError si, por ejemplo, el servidor no responde (SocketTimeOutException) o cualquier otra falla (general { {X4}}).

No quiero terminar la flux si, por ejemplo, la señal onError es igual a SocketTimeOutException. En cambio, me gustaría intentar comunicarme con cada rastreador que obtuve.

Este enlace contiene todas las operaciones que puedo usar para manejar errores pero no ignorarlos.

Estoy usando Reactor 3 si esto es importante.

Actualización:

Hice un truco feo , pero funciona:

Flux.fromArray(trackersArray)
        .handle((Tracker tracker, SynchronousSink<ConnectResponse> sink) -> {
            ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
                    .subscribe(sink::next, error -> {
                        if (!(error instanceof SocketTimeoutException))
                            sink.error(error);
                    }, sink::complete);
        })

Por favor, llámenos para responder si tiene algo mejor.

7
Stav Alfi 13 ene. 2018 a las 22:41

3 respuestas

La mejor respuesta

Como ya está procesando URL en un mapa plano, use onErrorResume(e -> Mono.empty()). esto permitirá que flatmap ignore el error. editar: dentro del mapa plano, en el lado derecho de la lambda

12
Simon Baslé 14 ene. 2018 a las 08:06
Flux.fromArray(trackersArray)
.flatMap(tracker -> 
           ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
                .onErrorResume(SocketTimeoutException.class, __ -> Mono.empty()))

Tal vez esto sea mejor que hacer lo mismo al recuperarse de SocketTimeOut y si la excepción es otra, iré por onError

3
Enzo Bonggio 16 ene. 2018 a las 00:04

Ahora tenemos reactor.core.publisher.onErrorContinue() en la versión 3.3.2, que le permite enviar la señal onNext() cuando algunos elementos son onError(). Usa log(), verás mejor.

La firma es (throwable, instance), por lo que si desea cerrar la sesión con error, es útil.

Flux.fromIterable(aList)
    .flatMap(this::xxxx)
    .onErrorContinue((throwable, o) -> {
        log.error("Error while processing {}. Cause: {}", o, throwable.getMessage());
})
    ....
2
WesternGun 18 mar. 2020 a las 16:56
48243630