java - Calling subject.onNext() inside doOnSubscribe -
why calling subject.onnext(o) inside doonsubscribe not effect, calling subject.oncomplete() causes stream terminate!?
final publishsubject<integer> subject = publishsubject.create(); final observable<integer> observable = subject.doonsubscribe(new consumer<disposable>() { @override public void accept(@nonnull disposable disposable) throws exception { system.out.println("disposable = [" + disposable + "]"); subject.onnext(1); //or observable.just(2, 3).subscribe(subject); } }); observable.subscribe(new observer<integer>() { @override public void onsubscribe(disposable d) { system.out.println("d = [" + d.isdisposed() + "]"); } @override public void onnext(integer integer) { system.out.println("item = [" + integer + "]"); } @override public void onerror(throwable e) { system.out.println("e = [" + e + "]"); } @override public void oncomplete() { system.out.println("oncomplete"); } });/* expected: disposable = [false] d = [false] item = 1 item = 2 item = 3 oncomplete received : disposable = [false] d = [false] oncomplete */
when subscribing subject in 2.x, disposable representing connection traversing onsubscribe() chain before particular observer becomes visible onnext. can see if call hasobservers onsubscribe return false until onsubscribe returns.
this required observable protocol not allowed run onsubscribe , onnext concurrently , onsubscribe must happen before onnext. if rule wasn't held, concurrent call subject.onnext run before or @ same time observer.onsubscribe call , find unprepared consumer.
since publishsubject doesn't retain onnext calls, unobserved onnext items dropped. depending on use case, should instead use behaviorsubject or yoursubject.startwith(initialvalue).subscribe() value before other onnext subject.
Comments
Post a Comment