Quiero implementar algo como lo que permite el complemento promiseAny, pero para Observables, donde el primero en obtener una respuesta es el "ganador". Específicamente para Angular http.get() s para diferentes servidores posibles.

Tengo lo siguiente, sin embargo, está devolviendo un resultado para todos los Observables fusionados que representan http.get para cada servidor. Uno de los servidores está vivo y uno que sé que está muerto. Sin embargo, el valor que se devuelve de la suscripción tiene 2 valores y los resultados no indican que uno esté arriba y el otro abajo. La suscripción (http.get()) no parece estar activando. ¿Cómo debo escribir esto?

Esto es para Angular 7.2.

import {merge} from 'rxjs';
import {take} from 'rxjs/operators';

async getActiveServer(servers: string[]): Promise<string> {
    return new Promise(async (resolve, reject) => {
        merge(this.buildObservables(servers)).pipe(take(1))
            .subscribe((value) => {
                // .flatMap((value) => {
                console.log(`observable - value: ${JSON.stringify(value, null, 2)}`);
                if (Array.isArray(value) && value.length > 0) {
                    resolve(this.findServer(value[0]));
                } else {
                    reject('cannot find server as response is not an array - it is: ${value}');
                }
            }, (error) => {
                console.log(`observable - error: ${error}`);
            });
    });

private async buildObservables(servers: string[]): Promise<any> {
    const observablesBatch = [];
    for (const server of servers) {
        observablesBatch.push(this.http.get<any>(server + '/health/alive?server=' + server));
    }
    return observablesBatch;
}

findServer() trata el problema separado de que estoy recuperando una estructura de objeto anidada. Este método recorre la estructura para encontrar la url y extrae la información deseada de la cadena.

El valor impreso desde console.log( observable - valor: `es algo así como:

observable - value: [
  {
    "_isScalar": false,
    "source": {
      "_isScalar": false,
      "source": {
        "_isScalar": false,
        "source": {
          "_isScalar": true,
          "value": {
            "url": "http://localhost:8080/health/alive?server=http://localhost:8080",
            "body": null,
            "reportProgress": false,
            "withCredentials": false,
            "responseType": "json",
            "method": "GET",
            "headers": {
              "normalizedNames": {},
              "lazyUpdate": null,
              "headers": {}
            },
            "params": {
              "updates": null,
              "cloneFrom": null,
              "encoder": {},
              "map": null
            },
            "urlWithParams": "http://localhost:8080/health/alive?server=http://localhost:8080"
          }
        },
        "operator": {
          "concurrent": 1
        }
      },
      "operator": {}
    },
    "operator": {}
  },
  {
    "_isScalar": false,
    "source": {
      "_isScalar": false,
      "source": {
        "_isScalar": false,
        "source": {
          "_isScalar": true,
          "value": {
            "url": "https://remoteServer.net//health/alive?server=https://remoteServer.net/",
            "body": null,
            "reportProgress": false,
            "withCredentials": false,
            "responseType": "json",
            "method": "GET",
            "headers": {
              "normalizedNames": {},
              "lazyUpdate": null,
              "headers": {}
            },
            "params": {
              "updates": null,
              "cloneFrom": null,
              "encoder": {},
              "map": null
            },
            "urlWithParams": "https://remoteserver.net//health/alive?server=https://remoteserver.net/"
          }
        },
        "operator": {
          "concurrent": 1
        }
      },
      "operator": {}
    },
    "operator": {}
  }
]

Como puede ver, probé un flatMap() pero esto no funcionó en el tiempo que le asigné.

¿Cómo debo escribir esto?


1. Di una respuesta de lo que funcionó en base a la respuesta de @ Phix.

2. Ediciones: la solución usando race como lo sugiere @Adrian Brand.

Me gusta (si funcionó) pero no funciona. No tengo tiempo para resolver esto y, según la publicación vinculada de Adrian, esto debería funcionar. El error de sintaxis que obtengo es Property subscribe does not exist on MonoTypeOperatorFunction<any>.

ESTO NO FUNCIONA PERO SERÍA BONITO SI LO HIZO (aunque es necesario agregar un filtro o similar).

async getActiveServer(servers: string[]): Promise<string> {
    return new Promise(async (resolve, reject) => {
    race(...this.buildObservables(servers))
        .subscribe(r => {
            console.log('Found a live server:', r);
            resolve(r.alive);
        }, () => console.warn('Nothing is alive.'));
    });
}
2
HankCa 10 may. 2019 a las 03:56

3 respuestas

La mejor respuesta

Creo que esto es en la dirección que estás buscando. Primero pensé que race podría funcionar, pero eso devolverá lo que se resuelva primero, incluidos los errores.

import { merge, of, race } from 'rxjs'; 
import { first, filter, catchError } from 'rxjs/operators';

// Combine all http requests
merge(...buildObservables())
  .pipe(
    // Only let through those that have a good response code
    filter((server: any) => server.response < 400),
    // Just take the first one
    first(),
  )
  .subscribe(r => console.log('Found a live server:', r), () => console.warn('Nothing is alive.'))

// Builds mock server responses
function buildObservables() {
  const responses = [];
  for(let i = 0; i < 4; i++) {
    responses.push(mockResponse(`http://sub${i}.example.com/api/v1`));
  }

  return responses;
}

function mockResponse(url: string) {
  const timeout = Math.round(Math.random() * 3000)
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (Math.random() < .5) {
        resolve({
          server: url,
          response: 200
        })
      } else {
        reject({
          server: url,
          response: 500
        })
      }
    }, timeout)
  })
}

Stackblitz

1
Phix 10 may. 2019 a las 01:46

Esto es lo que funciona según la respuesta de @Phix (asegúrese de votarlos):

async getActiveServer(servers: string[]): Promise<string> {
        return new Promise(async (resolve, reject) => {
            merge(...this.buildObservables(servers))
                .pipe(
                    filter((server: any) => server.hasOwnProperty('alive')),
                    first()
                )
                .subscribe(r => {
                    console.log('Found a live server:', r);
                    resolve(r.alive);
                }, () => console.warn('Nothing is alive.'));
        });
    }

    private buildObservables(servers: string[]): Observable<any>[] {
        const observablesBatch: Observable<any>[] = [];
        for (const server of servers) {
            observablesBatch.push(this.http.get<any>(server + '/health/alive?server=' + server));
        }
        return observablesBatch;
    }

Tenga en cuenta que el punto final alive en mi servidor está regresando:

async alive(server): Promise<any> {
    return Promise.resolve({alive: server});
}
1
HankCa 10 may. 2019 a las 02:35

¿Has intentado usar la raza en su lugar?

https://www.learnrxjs.io/operators/combination/race.html

1
Adrian Brand 10 may. 2019 a las 01:14