java - RXJava 2 Polling with multiple subscribers -
i start replacing/using rxjava2 polling instead of observer , listeners. there 1 problem. ive 1 polling observable should started if @ least 1 subscriber connected. if multiple subscribers connected interval should same. means: 1 observable repeat polling process n seconds. if observable has 1..* subscribers should keep polling n seconds , notify subscribers result.
this how have done listeners and/or rxjava solution.
my first try creating singleton class creates 1 publishsubject. if subscribed data in onnext(). polling observer started somewhere , push data subject. doesnt work excepted since it's
- bad pattern design
- doesnt start if subscriber connected , stops if no subscribers available
doesnt share data successfull , requires 2 classes (for subject , repeating observable)
public class singletonclass { private publishsubject<list<data>> subject = publishsubject.create(); public publishsubject getsubject() { return this.subject; } public void setdata(list<data> data) { subject.onnext(data); } }
i lovely avoid listener/interfaces share informations around , let rxjava2 job.
after research figured out there refcount() , share() im not sure if proper way solving this. in case it's rest-service polls server if @ least 1 subscriber connected elsehow should stop polling since doesnt make sense getting data in case.
i tried solve onec doesnt work excepted:
i this:
observable<data> datasource = observable.interval(interval, time_unit) .observeon(schedulers.io()) // make rest requests on io threads .map(n -> { return requestdata(); }) .replay(1);
replay()
operator includes share()
, in turn includes publish()
, refcount()
functionalities. makes observable hot, i.e. subscribers share single subscription. automatically subscribes (starts new interval
sequence) first subscriber, , unsubscribes (stops interval
) when last subscriber gone.
replay(1)
caches last emitted value, i.e. new subscribers not have wait new data arrive.
Comments
Post a Comment