Tengo una secuencia con letras (A-Z) y números (1-9). Quiero unir letras que lleguen dentro de un tiempo de espera (esto puede cambiar) y siempre emitir números inmediatamente. ¿Puede sugerirme qué funciones son las mejores para hacer esto?

Ejemplo de código de trabajo (no estoy seguro de que sea correcto y / o una buena solución):

private BehaviorSubject<TimeSpan> sTimeouts = new BehaviorSubject<TimeSpan>(0.ms());

private IObservable<string> lettersJoined(IObservable<char> ob)
{
    return Observable.Create<string>(observer =>
    {
        var letters = new List<char>();
        var lettersFlush = new SerialDisposable();

        return ob.Subscribe(c =>
        {
            if (char.IsUpper(c))
            {

                if ((await sTimeouts.FirstAsync()).Ticks > 0)
                {
                    letters.Add(c);

                    lettersFlush.Disposable =
                        VariableTimeout(sTimeouts)
                        .Subscribe(x => {
                            observer.OnNext(String.Concat(letters));
                            letters.Clear();
                        });

                }
                else
                    observer.OnNext(letters.ToString());


            }
            else if (char.IsDigit(c))
                observer.OnNext(c.ToString());
        }

    }
}


private IObservable<long> VariableTimeout(IObservable<TimeSpan> timeouts)
{
    return Observable.Create<long>(obs =>
    {
        var sd = new SerialDisposable();
        var first = DateTime.Now;

        return timeouts
            .Subscribe(timeout =>
            {
                if (timeout.Ticks == 0 || first + timeout < DateTime.Now)
                {
                    sd.Disposable = null;
                    obs.OnNext(timeout.Ticks);
                    obs.OnCompleted();
                }
                else
                {
                    timeout -= DateTime.Now - first;

                    sd.Disposable =
                        Observable
                        .Timer(timeout)
                        .Subscribe(t => {
                            obs.OnNext(t);
                            obs.OnCompleted();
                        });
                }
            });

    });
}

private void ChangeTimeout(int timeout)
{
    sTimeouts.OnNext(timeout.ms())
}


// I use the following extension method
public static class TickExtensions
{
    public static TimeSpan ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms);
    }
}

Para modificar el tiempo de espera, simplemente puedo cambiar la variable de tiempo de espera privado, pero probablemente un Asunto estaría bien si es necesario / mejor.

ACTUALIZACIÓN

var scheduler = new TestScheduler();

var timeout = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(0000.Ms(), 2000),
    ReactiveTest.OnNext(4300.Ms(), 1000));

var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'),
    ReactiveTest.OnNext(1600.Ms(), '2'),
    ReactiveTest.OnNext(1900.Ms(), 'A'),
    ReactiveTest.OnNext(2100.Ms(), 'B'),
    ReactiveTest.OnNext(4500.Ms(), 'C'),
    ReactiveTest.OnNext(5100.Ms(), 'A'),
    ReactiveTest.OnNext(5500.Ms(), '5'),
    ReactiveTest.OnNext(6000.Ms(), 'B'),
    ReactiveTest.OnNext(7200.Ms(), '1'),
    ReactiveTest.OnNext(7500.Ms(), 'B'),
    ReactiveTest.OnNext(7700.Ms(), 'A'),
    ReactiveTest.OnNext(8400.Ms(), 'A'));

var expected = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0100.Ms(), "1"),
    ReactiveTest.OnNext(1600.Ms(), "2"),
    ReactiveTest.OnNext(4100.Ms(), "AB"),
    ReactiveTest.OnNext(5500.Ms(), "5"),
    ReactiveTest.OnNext(7000.Ms(), "CAB"),
    ReactiveTest.OnNext(7200.Ms(), "1"),
    ReactiveTest.OnNext(9400.Ms(), "BAA"));


// if ReactiveTest.OnNext(3800.Ms(), 1000)
// then expected is ReactiveTest.OnNext(3800.Ms(), "AB")

ACTUALIZACIÓN # 2

Solución refinada que admite correctamente el cambio de tiempo de espera durante el almacenamiento en búfer

0
zpul 26 jul. 2016 a las 12:39

2 respuestas

La mejor respuesta

Varias cosas que pueden ayudar aquí.

Los primeros diagramas de canicas son buenos para ayudar a visualizar el problema, pero al probar si algo funciona o no, seamos prescriptivos y realicemos pruebas unitarias con instancias ITestableObservable<T>.

En segundo lugar, no estoy seguro de cuál debería ser su solución. Si miro sus diagramas de mármol, veo algunas discrepancias. Aquí he agregado una línea de tiempo para ayudar a visualizar.

                 111111111122222222223
Time:   123456789012345678901234567890
Input:  1---2--A-B----C--A-B-1--B-A--A
Output: 1---2----AB-------CAB-1-----BAA 

Aquí veo la salida "AB" publicada en la unidad 10. Luego veo la salida "CAB" publicada en la unidad 19. Además, veo la salida "BAA" publicada en la unidad 29. Pero usted sugiere que esto debería ocurrir con tiempos de espera constantes separados. Entonces, creo que tal vez sea la brecha entre los valores lo que es importante, pero esto tampoco parece cuadrar. Esto solo me lleva de nuevo al punto anterior, proporcione una prueba unitaria que pueda aprobar o reprobar.

En tercer lugar, con respecto a su implementación, podría mejorarla un poco utilizando el tipo SerialDisposable para el tipo lettersFlush.

Para ayudarme a configurar la prueba unitaria, creo el siguiente bloque de código

var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'),
    ReactiveTest.OnNext(0500.Ms(), '2'),
    ReactiveTest.OnNext(0800.Ms(), 'A'),
    ReactiveTest.OnNext(1000.Ms(), 'B'),
    ReactiveTest.OnNext(1500.Ms(), 'C'),
    ReactiveTest.OnNext(1800.Ms(), 'A'),
    ReactiveTest.OnNext(2000.Ms(), 'B'),
    ReactiveTest.OnNext(2200.Ms(), '1'),
    ReactiveTest.OnNext(2500.Ms(), 'B'),
    ReactiveTest.OnNext(2700.Ms(), 'A'),
    ReactiveTest.OnNext(3000.Ms(), 'A'));

var expected = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0100.Ms(), "1"),
    ReactiveTest.OnNext(0500.Ms(), "2"),
    ReactiveTest.OnNext(1000.Ms(), "AB"),
    ReactiveTest.OnNext(2000.Ms(), "CAB"),
    ReactiveTest.OnNext(2200.Ms(), "1"),
    ReactiveTest.OnNext(3000.Ms(), "BAA"));

Me he tomado algunas libertades para cambiar algunos valores a lo que creo que quisiste decir con tus diagramas de mármol.

Si luego uso la muy buena respuesta proporcionada anteriormente por @Shlomo, puedo ver más problemas con solo usar diagramas de mármol difusos. Como el límite del búfer tendría que ocurrir después de que ocurra el último valor a incluir, estas ventanas deben cerrarse una a una.

void Main()
{
    var scheduler = new TestScheduler();
    var input = scheduler.CreateColdObservable<char>(
        ReactiveTest.OnNext(0100.Ms(), '1'),
        ReactiveTest.OnNext(0500.Ms(), '2'),
        ReactiveTest.OnNext(0800.Ms(), 'A'),
        ReactiveTest.OnNext(1000.Ms(), 'B'),
        ReactiveTest.OnNext(1500.Ms(), 'C'),
        ReactiveTest.OnNext(1800.Ms(), 'A'),
        ReactiveTest.OnNext(2000.Ms(), 'B'),
        ReactiveTest.OnNext(2200.Ms(), '1'),
        ReactiveTest.OnNext(2500.Ms(), 'B'),
        ReactiveTest.OnNext(2700.Ms(), 'A'),
        ReactiveTest.OnNext(3000.Ms(), 'A'));

    var expected = scheduler.CreateColdObservable<string>(
        ReactiveTest.OnNext(0100.Ms(), "1"),
        ReactiveTest.OnNext(0500.Ms(), "2"),
        ReactiveTest.OnNext(1000.Ms()+1, "AB"),
        ReactiveTest.OnNext(2000.Ms()+1, "CAB"),
        ReactiveTest.OnNext(2200.Ms(), "1"),
        ReactiveTest.OnNext(3000.Ms()+1, "BAA"));

    /*
                     111111111122222222223
    Time:   123456789012345678901234567890
    Input:  1---2--A-B----C--A-B-1--B-A--A
    Output: 1---2----AB-------CAB-1-----BAA 
    */

    var bufferBoundaries = //Observable.Timer(TimeSpan.FromSeconds(1), scheduler);
            //Move to a hot test sequence to force the windows to close just after the values are produced
            scheduler.CreateHotObservable<Unit>(
        ReactiveTest.OnNext(1000.Ms()+1, Unit.Default),
        ReactiveTest.OnNext(2000.Ms()+1, Unit.Default),
        ReactiveTest.OnNext(3000.Ms()+1, Unit.Default),
        ReactiveTest.OnNext(4000.Ms()+1, Unit.Default));

    var publishedFinal = input
        .Publish(i => i
            .Where(c => char.IsLetter(c))
            .Buffer(bufferBoundaries)
            .Where(l => l.Any())
            .Select(lc => new string(lc.ToArray()))
            .Merge(i
                .Where(c => char.IsNumber(c))
                .Select(c => c.ToString())
            )
        );

    var observer = scheduler.CreateObserver<string>();

    publishedFinal.Subscribe(observer);
    scheduler.Start();

    //This test passes with the "+1" values hacked in.
    ReactiveAssert.AreElementsEqual(
        expected.Messages,
        observer.Messages);

}

// Define other methods and classes here
public static class TickExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}

Supongo que mi punto es que Rx es determinista, por lo tanto, podemos crear pruebas que sean deterministas. Entonces, si bien su pregunta es muy buena, y creo que @Shlomo proporciona una respuesta final sólida, podemos hacerlo mejor que solo diagramas de mármol difusos y usar Random en nuestros ejemplos / pruebas. Ser preciso aquí debería ayudar a prevenir condiciones de carrera tontas en la producción y ayudar al lector a comprender mejor estas soluciones.

2
Lee Campbell 27 jul. 2016 a las 06:42

Asumiendo sampleInput como su entrada de muestra:

var charStream = "12ABCAB1BAA".ToObservable();
var random = new Random();
var randomMilliTimings = Enumerable.Range(0, 12)
    .Select(i => random.Next(2000))
    .ToList();

var sampleInput = charStream
    .Zip(randomMilliTimings, (c, ts) => Tuple.Create(c, TimeSpan.FromMilliseconds(ts)))
    .Select(t => Observable.Return(t.Item1).Delay(t.Item2))
    .Concat();

Primero, en lugar de cambiar una variable mutable, sería mejor generar alguna secuencia para representar sus ventanas de búfer:

Input:  1---2--A-B----C--A-B-1--B-A--A
Window: ---------*--------*---------*--
Output: 1---2----AB-------CAB-1-----BAA

Genere un flujo de TimeSpan s incrementales y lo llamé bufferBoundaries así para demostrar:

var bufferBoundaries = Observable.Range(1, 20)
    .Select(t => Observable.Return(t).Delay(TimeSpan.FromSeconds(t)))
    .Concat();

Esto se vería así:

Seconds: 0--1--2--3--4--5--6--7--8--9--10
BB     : ---1-----2--------3-----------4-

... a continuación, desea dividir ese sampleInput en flujos separados para letras y números, y manejarlos en consecuencia:

var letters = sampleInput
    .Where(c => char.IsLetter(c))
    .Buffer(bufferBoundaries)
    .Where(l => l.Any())
    .Select(lc => new string(lc.ToArray()));

var numbers = sampleInput
    .Where(c => char.IsNumber(c))
    .Select(c => c.ToString());

A continuación, combine las dos corrientes:

var finalOutput = letters.Merge(numbers);

Por último, generalmente no es una buena idea suscribirse dos veces a la misma entrada (en nuestro caso, sampleInput) si puede evitarlo. Entonces, en nuestro caso, deberíamos reemplazar letters, numbers y finalOutput con lo siguiente:

var publishedFinal = sampleInput
    .Publish(_si => _si
        .Where(c => char.IsLetter(c))
        .Buffer(bufferBoundaries)
        .Where(l => l.Any())
        .Select(lc => new string(lc.ToArray()))
        .Merge( _si
            .Where(c => char.IsNumber(c))
            .Select(c => c.ToString())
        )
    );
2
Shlomo 26 jul. 2016 a las 11:28