Se está declarando que PublishSubject no es seguro para subprocesos en RxJava. Okay.

Estoy tratando de encontrar algún ejemplo, estoy tratando de construir cualquier ejemplo para emular la condición de la carrera, que conduce a resultados no deseados. Pero no puedo :(

¿Alguien puede proporcionar un ejemplo que pruebe que PublishSubject no es seguro para subprocesos?

0
corvax 6 abr. 2017 a las 22:42

2 respuestas

La mejor respuesta

He encontrado la prueba. Creo que este ejemplo es más obvio que @akarnokd proporcionado.

    AtomicInteger counter = new AtomicInteger();

    // Thread-safe
    // SerializedSubject<Object, Object> subject = PublishSubject.create().toSerialized();

    // Not Thread Safe
    PublishSubject<Object> subject = PublishSubject.create();

    Action1<Object> print = (x) -> System.out.println(Thread.currentThread().getName() + " " + counter);

    Consumer<Integer> sleep = (s) -> {
        try {
            Thread.sleep(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    subject
            .doOnNext(i -> counter.incrementAndGet())
            .doOnNext(i -> counter.decrementAndGet())
            .doOnNext(print)
            .filter(i -> counter.get() != 0)
            .doOnNext(i -> {
                        throw new NullPointerException("Concurrency detected");
                    }
            )
            .subscribe();

    Runnable r = () -> {
        for (int i = 0; i < 100000; i++) {
            sleep.accept(1);
            subject.onNext(i);
        }
    };

    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(r);
    pool.execute(r);
2
corvax 11 abr. 2017 a las 23:21

Por lo general, las personas preguntan por qué su configuración se comporta inesperadamente y / o falla y la respuesta es: porque llaman a los métodos onXXX en Subject al mismo tiempo:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Scheduler.Worker;
import rx.exceptions.MissingBackpressureException;
import rx.observers.AssertableSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.*;

public class PublishSubjectRaceTest {

    @Test
    public void racy() throws Exception {
        Worker worker = Schedulers.computation().createWorker();
        try {
            for (int i = 0; i < 1000; i++) {
                AtomicInteger wip = new AtomicInteger(2);

                PublishSubject<Integer> ps = PublishSubject.create();

                AssertableSubscriber<Integer> as = ps.test(1);

                CountDownLatch cdl = new CountDownLatch(1);

                worker.schedule(() -> {
                    if (wip.decrementAndGet() != 0) {
                        while (wip.get() != 0) ;
                    }
                    ps.onNext(1);

                    cdl.countDown();
                });
                if (wip.decrementAndGet() != 0) {
                    while (wip.get() != 0) ;
                }
                ps.onNext(1);

                cdl.await();

                as.assertFailure(MissingBackpressureException.class, 1);
            }
        } finally {
            worker.unsubscribe();
        }
    }

    @Test
    public void nonRacy() throws Exception {
        Worker worker = Schedulers.computation().createWorker();
        try {
            for (int i = 0; i < 1000; i++) {
                AtomicInteger wip = new AtomicInteger(2);

                Subject<Integer, Integer> ps = PublishSubject.<Integer>create()
                    .toSerialized();

                AssertableSubscriber<Integer> as = ps.test(1);

                CountDownLatch cdl = new CountDownLatch(1);

                worker.schedule(() -> {
                    if (wip.decrementAndGet() != 0) {
                        while (wip.get() != 0) ;
                    }
                    ps.onNext(1);

                    cdl.countDown();
                });
                if (wip.decrementAndGet() != 0) {
                    while (wip.get() != 0) ;
                }
                ps.onNext(1);

                cdl.await();

                as.assertFailure(MissingBackpressureException.class, 1);
            }
        } finally {
            worker.unsubscribe();
        }
    }
}
5
akarnokd 6 abr. 2017 a las 20:46