java - Calling subject.onNext() inside doOnSubscribe -
why calling subject.onnext(o)
inside doonsubscribe
not effect, calling subject.oncomplete()
causes stream terminate!?
final publishsubject<integer> subject = publishsubject.create(); final observable<integer> observable = subject.doonsubscribe(new consumer<disposable>() { @override public void accept(@nonnull disposable disposable) throws exception { system.out.println("disposable = [" + disposable + "]"); subject.onnext(1); //or observable.just(2, 3).subscribe(subject); } }); observable.subscribe(new observer<integer>() { @override public void onsubscribe(disposable d) { system.out.println("d = [" + d.isdisposed() + "]"); } @override public void onnext(integer integer) { system.out.println("item = [" + integer + "]"); } @override public void onerror(throwable e) { system.out.println("e = [" + e + "]"); } @override public void oncomplete() { system.out.println("oncomplete"); } });/* expected: disposable = [false] d = [false] item = 1 item = 2 item = 3 oncomplete received : disposable = [false] d = [false] oncomplete */
when subscribing subject
in 2.x, disposable
representing connection traversing onsubscribe()
chain before particular observer
becomes visible onnext
. can see if call hasobservers
onsubscribe
return false until onsubscribe
returns.
this required observable protocol not allowed run onsubscribe
, onnext
concurrently , onsubscribe
must happen before onnext
. if rule wasn't held, concurrent call subject.onnext
run before or @ same time observer.onsubscribe
call , find unprepared consumer.
since publishsubject
doesn't retain onnext
calls, unobserved onnext
items dropped. depending on use case, should instead use behaviorsubject
or yoursubject.startwith(initialvalue).subscribe()
value before other onnext
subject
.
Comments
Post a Comment