android - Collect results of 2 parallel Publishers in Flowable -


i'm quite new reactive programming trying apply rxjava collecting call , sms lists , combine single one. idea iterate cursor in parallel manner sequential items emitting. after items gathered single list.

but in dooncomplete not data collected (i have around 150 items , there're ~25 added). believe issue flowable.concat doesn't wait items that's suggestion.

public void readlatestactivity() {     cursor callscursor = getcallscursor();     cursor smscursor = getsmscursor();      if (callscursor == null || smscursor == null) {         return;     }      sortedset<mactivity> activities = new treeset<>((left, right) -> left.getreceivetime() < right.getreceivetime() ? 1 : 0);      flowable.concat(getcallspublisher(callscursor), getsmspublisher(smscursor))             .subscribeon(schedulers.io())             .doonnext(new consumer<mactivity>() {                 @override                 public void accept(@nonnull mactivity mactivity) throws exception {                     if (mactivity != null){                         activities.add(mactivity);                     }                 }             })             .observeon(androidschedulers.mainthread())             .dooncomplete(new action() {                 @override                 public void run() throws exception {                     timber.d("dooncomplete");                     callscursor.close();                     smscursor.close();                     mbus.post(new eactivities(activities)); //<--- here i'm getting ~25 items                 }             })             .subscribe(); }  private publisher<mactivity> getcallspublisher(cursor callscursor) {     return flowable.fromiterable(rxcursoriterable.from(callscursor))             .take(callscursor.getcount() < limit ? callscursor.getcount() : limit)             .subscribeon(schedulers.computation())             .parallel()             .runon(schedulers.computation())             .map((function<cursor, mactivity>) this::readcallfromcursor)             .sequential()             .observeon(androidschedulers.mainthread());  }  private publisher<mactivity> getsmspublisher(cursor smscursor) {     return flowable.fromiterable(rxcursoriterable.from(smscursor))             .take(smscursor.getcount() < limit ? smscursor.getcount() : limit)             .subscribeon(schedulers.computation())             .parallel()             .runon(schedulers.computation())             .map((function<cursor, mactivity>) this::readsmsfromcursor)             .sequential()             .observeon(androidschedulers.mainthread()); } 

methods reading sms , calls working fine. believe issue lies somewhere else.

also i've found class makes cursor act iterable

public class rxcursoriterable implements iterable<cursor> {  private cursor miterablecursor;  public rxcursoriterable(cursor c) {     miterablecursor = c; }  public static rxcursoriterable from(cursor c) {     return new rxcursoriterable(c); }  @override public iterator<cursor> iterator() {     return rxcursoriterator.from(miterablecursor); }  private static class rxcursoriterator implements iterator<cursor> {      private final cursor mcursor;      public rxcursoriterator(cursor cursor) {         mcursor = cursor;     }      public static iterator<cursor> from(cursor cursor) {         return new rxcursoriterator(cursor);     }      @override     public boolean hasnext() {         return !mcursor.isclosed() && mcursor.movetonext();     }      @override     public cursor next() {         return mcursor;     } } } 

i think problem lies in comparator use when creating treeset. not return -1 when left.getreceivetime() > right.getreceivetime(), i.e. activities erroneously considered duplicate.

i suggest simplifying getcallspublisher code this:

return flowable.generate(getcallscursor(),         (cursor, emitter) -> {             if (!cursor.isclosed() && cursor.movetonext())                 emitter.onnext(readcallfromcursor(cursor));             else                 emitter.oncomplete();         }, cursor::close)     .take(limit); 

and getting rid of rxcursoriterable.


Comments

Popular posts from this blog

inversion of control - Autofac named registration constructor injection -

verilog - Systemverilog dynamic casting issues -

ios - Change Storyboard View using Seague -