repeatWhenでfilterを使ってもonCompletedは呼ばれない

掲題の通りです TL;DR Observable#repeatWhen使って複雑な繰り返し判定するときはObservable#filterじゃなくてObservable#takeWhileを使うとonCompletedまで呼ばれる 大量のデータをローカルに同期するAPIがあって、ページングの要領で複数回APIを叩いてすべてのデータを同期するとします。 こんな感じのコードなら動きます。 boolean isLoading = false; public Completable sync() { return Observable.defer(() -> { if (isLoading) { return Observable.empty(); } isLoading = true; return syncData() }) .repeatWhen(observable -&

続:Realmインスタンスを、Observableのunsubscribe時に同じスレッドでunsubscribeする

前回の続きです。 Observable#usingを使うと、Observableのunsubscribe時にRealmインスタンスを同じスレッド上で閉じることができるよ、と前回の記事で書きました。 結論から言うと前回のコードだけでは足りませんでした。 大体の場合において同じスレッド上でcloseされるのですが、たまに別スレッドで閉じられてしまう、ということがわかりました。 Twitter上でうんうん言いながらRxJavaのコードを読みながら再現するコードを書こうとしてたら、@hydrakecatさんが捕捉してくださり、さくっと再現させてくれました。 やはり頑張ってRxJavaとRealmのコードを読み込むしかない— せーい(yshrsmz) (@_yshrsmz) 2016年7月13日 @_yshrsmz うーん、再現したかも。とりあえず、このコードを何回も実行するとたまに main スレッドでクローズされますね。https://t.co/EHda8VQeEh— Hiroshi Kurokawa (@hydrakecat)

RealmインスタンスをObservableのunsubscribe時にcloseする

※2016/07/21更新: このコードだけだと必ずしも同じスレッドでcloseされないことがわかりました。"続:Realmインスタンスを、Observableのunsubscribe時に同じスレッドでunsubscribeする"もご確認ください。 RealmはRxJavaを公式でサポートしていますが、公式のRxサポートが動くのはUIスレッドなど、Looperのあるスレッドだけです。 そこでRealmObjectをObservableで返すメソッドを自前で書いたりするわけですが、Rx使ってるとメソッドがどのスレッドから呼ばれるかわからないので、Realmをどこでcloseしたらいいかわからなくなりがちです。 そんな状態だとclose漏れのRealmインスタンスが発生して、OutOfMemoryExceptionが発生してしまいます。 最初はdoOnUnsubscribeでやればいいじゃん?って思ってたんですが、doOnUnsubscribeはObserverのスレッドで呼ばれるみたいで、スレッド違いでエラーになってしまいます。 public Obseravble<Realm> asObservable() { return Observable.defer(new Func0<Observable&

CompositeSubscriptionは再利用できる

RxJavaの CompositeSubscription は一度 unsubscribe() してしまうと使い回すことができないので毎回インスタンス化しなきゃいけないと思っていたら、clear() というメソッドがあることを知った。 clear() は追加済みの Subscription の unsubscribe は行うが、 CompositeSubscription 自体の状態は変更しないメソッドだ。 これをメインで使うようにしたらちょっと面倒が減りそうな気がする。 すでに unsubscribe() してしまった CompositeSubscription に効果はないので注意が必要だけれども。 @Override public void unsubscribe() { if (!unsubscribed) { Collection<Subscription> unsubscribe = null;