El escenario es el siguiente: un dispositivo que se comunica se considera conectado si realiza una devolución de llamada al servidor en un período corto de tiempo. Quiero crear una clase que encapsule la funcionalidad de realizar un seguimiento de este estado. Al llamar al dispositivo, el tiempo de espera debe restablecerse. En la devolución de llamada, la conexión se confirma y el estado debe establecerse en verdadero , si la devolución de llamada se agota, debe establecerse en falso . Pero la próxima llamada debería poder restablecer el tiempo de espera nuevamente indiferente al estado actual.

Estaba pensando en lograr esto con RX usando swith y timeout. Pero no sé por qué deja de funcionar.

public class ConnectionStatus
{
private Subject<bool> pending = new Subject<bool>();
private Subject<bool> connected = new Subject<bool>();

public bool IsConnected { get; private set; }

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
    pending.Select(outer => connected.Timeout(TimeSpan.FromSeconds(timeoutSeconds))) 
        .Switch()
        .Subscribe(_ => IsConnected = true, e => IsConnected = false, token);
}

public void ConfirmConnected()
{
    connected.OnNext(true);
}

public void SetPending()
{
    pending.OnNext(true);
}
}

Este es el "caso de prueba":

var c = new ConnectionStatus(default(CancellationToken));

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();   
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(20));
c.IsConnected.Dump(); // FALSE, OK
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(10));
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, NOT OK!

Supongo que el tiempo de espera del observable interno también detiene el observable externo. Como la outer => lambda ya no se llama. ¿Cuál es la manera correcta?

Gracias

4
ZorgoZ 15 nov. 2017 a las 15:59

2 respuestas

La mejor respuesta

El problema es que Timeout esencialmente provoca una Excepción que hace explotar las suscripciones de Rx. Una vez que se activa el tiempo de espera (como lo tiene codificado), no se enviarán otras notificaciones. La gramática de Rx es que puede tener mensajes * OnNext seguidos de uno OnCompleted o uno OnError. Después de que se envíe el OnError del Timeout, no verá más mensajes.

Debe recibir el mensaje de tiempo de espera mediante mensajes OnNext en lugar de un mensaje OnError. En su código anterior, convirtió cualquier OnError en falso y cualquier OnNext en verdadero. En su lugar, debe incrustar el nuevo valor IsConnected adecuado en los mensajes OnNext. A continuación, le indicamos cómo hacerlo:

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
    pending.Select(_ => connected
            .Timeout(TimeSpan.FromSeconds(timeoutSeconds))
            .Materialize()
            .Select(n => n.Kind == NotificationKind.OnError && n.Exception.GetType() == typeof(TimeoutException) 
                ? Notification.CreateOnNext(false)
                : n)
            .Dematerialize()
            .Take(1)
        )
        .Switch()
        .Subscribe(b => IsConnected = b, token);
}
3
Shlomo 15 nov. 2017 a las 18:38

Aquí hay una forma alternativa de producir el flujo de valores IsConnected sin usar .TimeOut:

public class ConnectionStatus
{
    private Subject<Unit> pending = new Subject<Unit>();
    private Subject<Unit> connected = new Subject<Unit>();

    public bool IsConnected { get; private set; }

    public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
    {
        pending
            .Select(outer =>
                Observable.Amb(
                    connected.Select(_ => true),
                    Observable.Timer(TimeSpan.FromSeconds(timeoutSeconds)).Select(_ => false)))
            .Switch()
            .Subscribe(isConnected => IsConnected = isConnected, token);
    }

    public void ConfirmConnected()
    {
        connected.OnNext(Unit.Default);
    }

    public void SetPending()
    {
        pending.OnNext(Unit.Default);
    }
}

El operador Observable.Amb simplemente toma un valor de cualquier observable que produzca un valor primero; es preferible a la codificación con excepciones.

3
Enigmativity 16 nov. 2017 a las 06:08