java - Calling subject.onNext() inside doOnSubscribe


Why does calling subject.onNext(o) inside doOnSubscribe have no effect, while calling subject.onComplete() causes the stream to terminate?

    final PublishSubject<Integer> subject = PublishSubject.create();

    final Observable<Integer> observable = subject.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            System.out.println("disposable = [" + disposable + "]");
            subject.onNext(1);
            // or
            Observable.just(2, 3).subscribe(subject);
        }
    });

    observable.subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("d = [" + d.isDisposed() + "]");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("item = [" + integer + "]");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("e = [" + e + "]");
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });

    /*
    expected:
    disposable = [false]
    d = [false]
    item = 1
    item = 2
    item = 3
    onComplete

    received:
    disposable = [false]
    d = [false]
    onComplete
    */

When you subscribe to a Subject in 2.x, the Disposable representing the connection travels through the onSubscribe() chain before that particular Observer becomes visible to onNext. You can see this by calling hasObservers() inside onSubscribe: it returns false until onSubscribe returns.

This is required because the Observable protocol does not allow onSubscribe and onNext to run concurrently, and onSubscribe must happen before any onNext. If this rule were not enforced, a concurrent call to subject.onNext could run before, or at the same time as, the Observer.onSubscribe call and find an unprepared consumer.
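The ordering described above can be illustrated with a small plain-Java model. This is only a sketch of the idea, not RxJava's implementation: `MiniSubject` and `MiniObserver` are hypothetical names invented for the example, and the model assumes a single thread. It shows why an item emitted from inside the subscribe hook is lost while a completion is still seen afterwards.

```java
import java.util.ArrayList;
import java.util.List;

public class SubscribeOrderSketch {

    /** Hypothetical observer type for this sketch; not the RxJava interface. */
    interface MiniObserver {
        void onSubscribe();
        void onNext(int item);
        void onComplete();
    }

    /** Toy subject that keeps no history of emitted items. */
    static final class MiniSubject {
        private final List<MiniObserver> observers = new ArrayList<>();
        private boolean terminated;

        boolean hasObservers() {
            return !observers.isEmpty();
        }

        void subscribe(MiniObserver observer) {
            // Step 1: onSubscribe runs first; the observer is not registered yet.
            observer.onSubscribe();
            // Step 2: only now does the observer become visible to onNext.
            if (terminated) {
                observer.onComplete();
            } else {
                observers.add(observer);
            }
        }

        void onNext(int item) {
            if (terminated) {
                return;
            }
            for (MiniObserver o : new ArrayList<>(observers)) {
                o.onNext(item);
            }
        }

        void onComplete() {
            if (terminated) {
                return;
            }
            terminated = true;
            List<MiniObserver> current = new ArrayList<>(observers);
            observers.clear();
            for (MiniObserver o : current) {
                o.onComplete();
            }
        }
    }

    /** Replays the scenario from the question and returns the events in order. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSubject subject = new MiniSubject();
        subject.subscribe(new MiniObserver() {
            @Override
            public void onSubscribe() {
                log.add("hasObservers=" + subject.hasObservers());
                subject.onNext(1);     // dropped: nobody is registered yet
                subject.onComplete();  // remembered: the subject is now terminated
            }

            @Override
            public void onNext(int item) {
                log.add("item=" + item);
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [hasObservers=false, onComplete]
    }
}
```

In this model the item is lost because nothing is registered when it is emitted, whereas the terminal state is stored in the subject and replayed to the observer once registration is attempted. That matches the output reported in the question.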

Since PublishSubject doesn't retain onNext calls, the unobserved onNext items are dropped. Depending on your use case, you should instead use BehaviorSubject or yourSubject.startWith(initialValue).subscribe() to get a value before any other onNext of the subject.
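As a rough illustration of the two alternatives just mentioned, the sketch below assumes RxJava 2.x is on the classpath (package `io.reactivex`). The class and method names are made up for the example. In RxJava 3 the single-item overload is, as far as I know, called `startWithItem`, so adjust accordingly.

```java
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class InitialValueSketch {

    /** Option 1: a BehaviorSubject replays its latest (here: default) value to a new observer. */
    public static List<Integer> withBehaviorSubject() {
        List<Integer> received = new ArrayList<>();
        BehaviorSubject<Integer> subject = BehaviorSubject.createDefault(1);
        subject.subscribe(received::add);
        subject.onNext(2);
        subject.onNext(3);
        return received;
    }

    /** Option 2: startWith emits the initial value first, then subscribes to the subject. */
    public static List<Integer> withStartWith() {
        List<Integer> received = new ArrayList<>();
        PublishSubject<Integer> subject = PublishSubject.create();
        subject.startWith(1).subscribe(received::add);
        subject.onNext(2);
        subject.onNext(3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(withBehaviorSubject()); // [1, 2, 3]
        System.out.println(withStartWith());       // [1, 2, 3]
    }
}
```

In both variants the later items are emitted only after subscribe() has returned, so the observer is already registered when they arrive.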

