android - Collect results of 2 parallel Publishers in Flowable
I'm quite new to reactive programming and am trying to apply RxJava to collect the call and SMS lists and combine them into a single one. The idea is to iterate over each cursor in a parallel manner and then emit the items sequentially. After that, all items are gathered into a single list.
But in doOnComplete not all of the data has been collected (I have around 150 items, and only ~25 are added). I believe the issue is that Flowable.concat doesn't wait for all the items, but that's just a guess.
    public void readLatestActivity() {
        Cursor callsCursor = getCallsCursor();
        Cursor smsCursor = getSmsCursor();
        if (callsCursor == null || smsCursor == null) {
            return;
        }
        SortedSet<MActivity> activities = new TreeSet<>((left, right) ->
                left.getReceiveTime() < right.getReceiveTime() ? 1 : 0);
        Flowable.concat(getCallsPublisher(callsCursor), getSmsPublisher(smsCursor))
                .subscribeOn(Schedulers.io())
                .doOnNext(new Consumer<MActivity>() {
                    @Override
                    public void accept(@NonNull MActivity mActivity) throws Exception {
                        if (mActivity != null) {
                            activities.add(mActivity);
                        }
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Timber.d("doOnComplete");
                        callsCursor.close();
                        smsCursor.close();
                        mBus.post(new EActivities(activities)); // <--- here I'm getting ~25 items
                    }
                })
                .subscribe();
    }

    private Publisher<MActivity> getCallsPublisher(Cursor callsCursor) {
        return Flowable.fromIterable(RxCursorIterable.from(callsCursor))
                .take(callsCursor.getCount() < limit ? callsCursor.getCount() : limit)
                .subscribeOn(Schedulers.computation())
                .parallel()
                .runOn(Schedulers.computation())
                .map((Function<Cursor, MActivity>) this::readCallFromCursor)
                .sequential()
                .observeOn(AndroidSchedulers.mainThread());
    }

    private Publisher<MActivity> getSmsPublisher(Cursor smsCursor) {
        return Flowable.fromIterable(RxCursorIterable.from(smsCursor))
                .take(smsCursor.getCount() < limit ? smsCursor.getCount() : limit)
                .subscribeOn(Schedulers.computation())
                .parallel()
                .runOn(Schedulers.computation())
                .map((Function<Cursor, MActivity>) this::readSmsFromCursor)
                .sequential()
                .observeOn(AndroidSchedulers.mainThread());
    }

The methods that read SMS and calls are working fine, so I believe the issue lies somewhere else.
Also, I've found a class that makes a Cursor act as an Iterable:
    public class RxCursorIterable implements Iterable<Cursor> {
        private Cursor mIterableCursor;

        public RxCursorIterable(Cursor c) {
            mIterableCursor = c;
        }

        public static RxCursorIterable from(Cursor c) {
            return new RxCursorIterable(c);
        }

        @Override
        public Iterator<Cursor> iterator() {
            return RxCursorIterator.from(mIterableCursor);
        }

        private static class RxCursorIterator implements Iterator<Cursor> {
            private final Cursor mCursor;

            public RxCursorIterator(Cursor cursor) {
                mCursor = cursor;
            }

            public static Iterator<Cursor> from(Cursor cursor) {
                return new RxCursorIterator(cursor);
            }

            @Override
            public boolean hasNext() {
                return !mCursor.isClosed() && mCursor.moveToNext();
            }

            @Override
            public Cursor next() {
                return mCursor;
            }
        }
    }
I think the problem lies in the comparator you use when creating the TreeSet. It does not return -1 when left.getReceiveTime() > right.getReceiveTime(); it returns 0 instead, so those activities are erroneously considered duplicates and are never added to the set.
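To make the effect concrete, here is a minimal sketch that uses plain Long timestamps in place of the question's activity objects (the class name ComparatorDemo and the helper sizeAfterAdding are made up for illustration). It compares the comparator from the question with one possible newest-first replacement based on Long.compare:

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

public class ComparatorDemo {
    // Same shape as the comparator in the question: it never returns a negative value,
    // so "left is newer than right" is reported as 0, i.e. "equal".
    static final Comparator<Long> BROKEN = (left, right) -> left < right ? 1 : 0;

    // One possible fix (an assumption, not the asker's code): newest first,
    // with consistent results in both directions.
    static final Comparator<Long> FIXED = (left, right) -> Long.compare(right, left);

    // Hypothetical helper: adds the timestamps to a TreeSet and reports how many survived.
    static int sizeAfterAdding(Comparator<Long> comparator, long... receiveTimes) {
        SortedSet<Long> set = new TreeSet<>(comparator);
        for (long time : receiveTimes) {
            set.add(time);
        }
        return set.size();
    }

    public static void main(String[] args) {
        long[] times = {10L, 20L, 30L, 40L, 50L};
        System.out.println(sizeAfterAdding(BROKEN, times)); // prints 1
        System.out.println(sizeAfterAdding(FIXED, times));  // prints 5
    }
}
```

With ascending timestamps the broken comparator keeps only the first element, because every later one compares as 0 against it. Note that even the fixed version treats two entries with the same receive time as a single element, so a tie-breaker (or a List that is sorted afterwards) may be needed if timestamps can collide.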
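I suggest simplifying the getCallsPublisher code to this:

    // The initial state is supplied lazily, so the cursor is opened on subscription
    // and closed by the last argument when the flow terminates or is cancelled.
    return Flowable.generate(() -> getCallsCursor(), (Cursor cursor, Emitter<MActivity> emitter) -> {
                if (!cursor.isClosed() && cursor.moveToNext()) {
                    emitter.onNext(readCallFromCursor(cursor));
                } else {
                    emitter.onComplete();
                }
            }, Cursor::close)
            .take(limit);

and getting rid of RxCursorIterable.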