続:Realmインスタンスを、Observableのunsubscribe時に同じスレッドでunsubscribeする

このエントリーをはてなブックマークに追加

前回の続きです。

Observable#usingを使うと、Observableのunsubscribe時にRealmインスタンスを同じスレッド上で閉じることができるよ、と前回の記事で書きました。

結論から言うと前回のコードだけでは足りませんでした。
大体の場合において同じスレッド上でcloseされるのですが、たまに別スレッドで閉じられてしまう、ということがわかりました。

Twitter上でうんうん言いながらRxJavaのコードを読みながら再現するコードを書こうとしてたら、@hydrakecatさんが捕捉してくださり、さくっと再現させてくれました。

最初は毎回再現するわけじゃないしバグなのかな? と思っていたのですが、どうやらObservable#subscribeOnはunsubscribeするスレッドを関知しないようでした。

じゃあObservable#unsubscribeOn指定したらいいじゃん? というわけにもいきません。Schedulers.io()等のRxJavaが標準で用意しているスケジューラは、実行時にスレッドプールから適当なスレッドを渡すためです。

シングルスレッドのスケジューラだとRealm関連の操作が全部直列になってしまうしパフォーマンス的に良くないよな〜と思っていろいろ考えたのですが、妙案は思いつかず。
最後の手段、ということでRxJavaのGithubレポにissueを投げてみました。

Question about Observable.using's resourceFactory & disposeAction #4197 ReactiveX/RxJava

回答は以下のような感じでした。

Hi.

1) Yes. The operator doesn't deal with scheduling.
2) Not with subscribeOn; try unsubscribeOn but you need a single-threaded Scheduler as all the default ones will give you different threads most likely.

やはり、@hydrakecatさんと@chibatchingさんがおっしゃっていたようにObservable#unsbscribeOnとシングルスレッドのスケジューラを合わせるのが正攻法のようです。

最終的な解決策は上述のissueにも書きましたが、複数のシングルスレッドスケジューラを順番に使い回す、という形になりました。

public class RealmSchedulerPool {

    private final static String PREFIX = "RealmScheduler-";

    private final static List<Scheduler> SCHEDULERS = new ArrayList<Scheduler>() {{
        add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory(PREFIX + "1-"))));
        add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory(PREFIX + "2-"))));
        add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory(PREFIX + "3-"))));
    }};

    private final static AtomicLong COUNT = new AtomicLong(0L);

    private RealmSchedulerPool() {
        // no-op
    }

    public static Scheduler get() {
        long current = COUNT.getAndIncrement();
        return SCHEDULERS.get((int) (current % 3)); // 3 is the size of SCHEDULERS
    }
}


public static <T> Observable.Transformer<T, T> doInRealmScheduler() {  
  return tObservable -> {
    Scheduler s = RealmSchedulerPool.get();
    return tObservable
            .subscribeOn(s)
            .unsubscribeOn(s);
  };
}

これを前回のコードと合わせて下記のように使います。

asObservable()  
  .compose(doInRealmScheduler())
  .map(realm -> realm.where(Foo.class).findAll());

助言をくださった@hydrakecatさん、@chibatchingさん、ありがとうございました。