java - RxJava 2 polling with multiple subscribers


I have started using RxJava 2 for polling instead of observers and listeners, and there is one problem. I have one polling observable that should be started if at least one subscriber is connected. If multiple subscribers are connected, the interval should stay the same. That means: one observable repeats the polling process every n seconds. If the observable has 1..* subscribers, it should keep polling every n seconds and notify all subscribers with the result.

This is how I have done it with listeners and/or an RxJava solution.

My first try was creating a singleton class that creates one PublishSubject. If you subscribe to it, you get the data in onNext(). The polling observer is started somewhere else and pushes its data to the subject. This doesn't work as expected, since it

  • is a bad design pattern
  • doesn't start when a subscriber connects and stop when no subscribers are left
  • doesn't share the data successfully and requires two classes (one for the subject and one for the repeating observable)

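    import io.reactivex.subjects.PublishSubject;
    import java.util.List;

    public class SingletonClass {
        private final PublishSubject<List<Data>> subject = PublishSubject.create();

        // Subscribers attach to this subject.
        public PublishSubject<List<Data>> getSubject() {
            return this.subject;
        }

        // Called by the poller to push the latest result.
        public void setData(List<Data> data) {
            subject.onNext(data);
        }
    }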

I would love to avoid listeners/interfaces for sharing information around and let RxJava 2 do the job.

After some research I figured out that there are refCount() and share(), but I'm not sure whether they are the proper way to solve this. In my case it's a REST service that should poll the server if at least one subscriber is connected; otherwise it should stop polling, since it doesn't make sense to fetch data in that case.

I tried to solve it once, but it doesn't work as expected:

Polling using RxJava2 / RxAndroid 2 and Retrofit

I would do it like this:

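    Observable<Data> dataSource = Observable.interval(INTERVAL, TIME_UNIT)
        .observeOn(Schedulers.io()) // make REST requests on IO threads
        .map(n -> requestData())
        .replay(1)    // multicast one upstream subscription, cache the latest value
        .refCount();  // connect on the first subscriber, disconnect after the last

In RxJava 2, replay(1) on its own returns a ConnectableObservable, which does not start until connect() is called, so refCount() is needed to get the behaviour of share() (which is publish() plus refCount()). Together they make the observable hot, i.e. all subscribers share a single subscription. It automatically subscribes (starts a new interval sequence) with the first subscriber and unsubscribes (stops the interval) when the last subscriber is gone.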

replay(1) caches the last emitted value, i.e. new subscribers do not have to wait for new data to arrive.
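To make the "start with the first subscriber, stop after the last, replay the latest value" behaviour described above more concrete, here is a minimal sketch of the same idea in plain Java with no RxJava dependency. It is not how RxJava implements it; `RefCountedPoller`, `subscribe`, `poll` and `isPolling` are hypothetical names chosen for illustration, and error handling for a failing request is left out.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper (not RxJava API): polls only while subscribers exist.
class RefCountedPoller<T> {
    private final Supplier<T> request;   // stands in for requestData()
    private final long period;
    private final TimeUnit unit;
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();
    private ScheduledExecutorService scheduler; // non-null only while polling
    private volatile T last;                    // one-element replay cache

    RefCountedPoller(Supplier<T> request, long period, TimeUnit unit) {
        this.request = request;
        this.period = period;
        this.unit = unit;
    }

    // Registers a subscriber and returns a handle that unsubscribes it.
    synchronized Runnable subscribe(Consumer<T> subscriber) {
        T cached = last;
        if (cached != null) {
            subscriber.accept(cached);   // late subscriber gets the latest value
        }
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            start();                     // first subscriber starts the polling
        }
        return () -> unsubscribe(subscriber);
    }

    private synchronized void unsubscribe(Consumer<T> subscriber) {
        if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
            stop();                      // last subscriber gone: stop polling
        }
    }

    synchronized boolean isPolling() {
        return scheduler != null;
    }

    // One polling step: fetch once, cache the value, notify every subscriber.
    void poll() {
        T value = request.get();
        last = value;
        for (Consumer<T> s : subscribers) {
            s.accept(value);
        }
    }

    private void start() {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        // First poll after one period, like Observable.interval(period, unit).
        scheduler.scheduleAtFixedRate(this::poll, period, period, unit);
    }

    private void stop() {
        scheduler.shutdownNow();
        scheduler = null;
        last = null; // design choice in this sketch: drop the stale value
    }
}
```

Calling `subscribe(...)` twice shares one scheduler and one request per period; running both returned handles shuts the scheduler down again. With RxJava, the operators take care of this bookkeeping, which is the main reason to prefer them over a hand-written class like this one.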

