Tengo un IObservable anidado y estoy usando una declaración de cambio de Rx que me ayuda a eliminar las secuencias anteriores. Pero, ¿y si quiero eliminarlo manualmente? Desechar la secuencia externa no es una opción.

_performSearchSubject
.Select(_ => return PerformQuery())
            .Switch()
            .Subscribe(HandleResponseStream, HandleError);

PerformQuery devuelve IObservable<Result>;

4
VidasV 26 ene. 2016 a las 15:01

2 respuestas

La mejor respuesta

Después de un tiempo lo encontré yo mismo ... Entonces la respuesta es:

Puede usar TakeUntil(IObservable<TOther>) y luego simplemente pasar un nuevo asunto al que se puede llamar siempre que desee cancelar la transmisión anterior. Creo que es similar a lo que hace la instrucción Switch () en los flujos subyacentes.

El código final se ve así:

Subject<Unit> _cancellationObservable = new Subject<Unit>();

_performSearchSubject
.Select(_ => {
                return PerformQuery().TakeUntil(_cancellationObservable);
              })
            .Switch()
            .Subscribe(HandleResponseStream, HandleError);

Y cada vez que quiero cancelarlo, solo llamo a este tipo:

_cancellationObservable.OnNext(Unit.Default);
4
Vidas Vasiliauskas 27 ene. 2016 a las 15:52

La secuencia observable devuelta por 'PerformQuery' se eliminará automáticamente cuando:

  • Su declaración 'Switch' se ejecuta debido a un nuevo '_performSearchSubject'. es decir, la secuencia anterior será limpiada por el operador 'Switch'. O
  • Su consulta general está eliminada.

Debe almacenar el desechable devuelto por su .Subscribe y usarlo para limpiar su suscripción. Su uso de un sujeto interno es innecesario.

¿De verdad ha probado su solución original? Aquí hay un código de Linqpad para probar el punto:

var inner = Observable.Create<string>((o) =>
{
    o.OnNext("First item of new inner");
    return Disposable.Create(() => "Inner Disposed".Dump());
});

var outer = Observable.Timer(TimeSpan.MinValue, TimeSpan.FromSeconds(10))
        .Select(_ => inner)
        .Switch()
        .Subscribe(output => output.Dump());


Console.ReadLine();

outer.Dispose();

Notará que no importa cuando presione una tecla e ingrese (para pasar de Console.ReadLine), su flujo observable interno será eliminado. Del mismo modo, su declaración de Switch limpiará cualquier transmisión anterior cada vez que se active el temporizador.

Aún puede usar un 'TakeUntil' si desea que esto se controle en función de un flujo no relacionado, pero es mejor usar el desechable revertido si puede.

0
H_Andr 26 ene. 2016 a las 15:48