Estoy usando RxJava y RxAndroid con Retrofit2.

Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

Usando el operador zip como se muestra a continuación en los dos observadores anteriores.

 Observable<ArrayList<TestData>> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList<TestData>>() {
            @Override
                public ArrayList<TestData> call(ResponseOne responseOne, ResponseTwo responseTwo) {
                  ArrayList<TestData> testDataList = new ArrayList();
                      // Add test data from response responseOne & responseTwo
                  return testDataList;
            } 
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<ArrayList<TestData>>() {

        @Override
        public void onNext(ArrayList<TestData> testDataList) {

        }

        @Override
        public void onCompleted() {
            Log.d(TAG, "onCompleted" );
        }

        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "onError Throwable: " + t.toString() );
        }
    });

Si se produce algún error durante la actualización de la llamada http en responseOneObservable y responseTwoObservable, llamará directamente al método onError del suscriptor de testDataObservable.

Quiero continuar en el método call del operador zip incluso si cualquiera de los dos observables tiene una respuesta exitosa.

¿Cómo manejar la respuesta de error usando el operador zip?

25
Priyank Patel 27 dic. 2016 a las 12:52

3 respuestas

La mejor respuesta

Puede usar onErrorResumeNext para devolver algo Observable o onErrorReturn para devolver algún valor predeterminado a zip, como:

Observable.zip(
   responseOneObservable
       .onErrorReturn(new Func1<Throwable, ResponseOne>() {
        @Override
        public ResponseOne call(final Throwable throwable) {
            return new ResponseOne();
        }
    }),
   responseTwoObservable
       .onErrorReturn(new Func1<Throwable, ResponseTwo>() {
        @Override
        public ResponseTwo call(final Throwable throwable) {
            return new ResponseTwo();
        }
    }),
   ...

Consulte manejo de onError para obtener más información.


ACTUALIZACIÓN: con RxJava 2.0 debe usar Function en lugar de Func1:

import io.reactivex.functions.Function;
...
Observable.zip(
   responseOneObservable
       .onErrorReturn(new Function<Throwable, ResponseOne>() {
        @Override
        public ResponseOne apply(@NonNull final Throwable throwable) {
            return new ResponseOne();
        }
    }),
   responseTwoObservable
       .onErrorReturn(new Function<Throwable, ResponseTwo>() {
        @Override
        public ResponseTwo apply(@NonNull final Throwable throwable) {
            return new ResponseTwo();
        }
    }),
   ...

O usando lambdas:

Observable.zip(
   responseOneObservable
       .onErrorReturn(throwable -> new ResponseOne()),
   responseTwoObservable
       .onErrorReturn(throwable -> new ResponseTwo()),
   ...

O usando Kotlin:

Observable.zip(
   responseOneObservable
       .onErrorReturn { ResponseOne() },
   responseTwoObservable
       .onErrorReturn { ResponseTwo() },
   ...
33
EgorD 13 jun. 2019 a las 13:34

Puede devolver la respuesta predeterminada de cualquiera de los dos observables con el operador onErrorResumeNext.

Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
    .onErrorResumeNext(throwable -> {/*some default value*/})
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
    .onErrorResumeNext(throwable -> {/*some default value*/})
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

Consulte también Manejo de errores en RxJava - Dan Lew

4
Alexander Perfilyev 27 dic. 2016 a las 10:04

Debe usar onErrorResumeNext sobre los observables comprimidos individuales para indicarles que emitan un elemento predeterminado en caso de error.

Consulte Operadores de manejo de errores

0
Marco Righini 27 dic. 2016 a las 10:00