Observable#intervalはunsubscribeしただけではonCompleteが呼ばれない

このエントリーをはてなブックマークに追加

Today I Learned的なことをやってみる

表題の通りなんだけど、Observable#intervalunsubscribeしただけでは、onCompleteまで呼ばれない。

Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)  
  .subscribe(aLong -> {
    print("interval onNext: %d", aLong);
  }, throwable -> {
    print("interval onError");
  }, () -> {
    print("interval onComplete");
  });

Thread.sleep(3500)

subscription.unsubscribe();  

こんなコード(擬似コード)を書くと、

interval onNext: 0  
interval onNext: 1  
interval onNext: 2  
...

こんなログが出て、subscription.unsubscribe()が呼ばれたタイミングで、interval onCompleteが出力されることなくログの出力が終了する。

Observable#intervalはHot Observableで誰がsubscribeしてるとか関係ない感じなんだろうなーという認識。

どうやったらonComplete呼ばれるのん?っていうと下記スレッドでやり方が紹介されていた。
Call onComplete when an observable uses interval method.

Observable#takeUntilを使うらしい。引数として受け取ったObservableonNextが呼ばれたら、それより後に来る元のObservableonNextを全部無視するオペレータだ。

PublishSubject<Void> stop = PublishSubject.create();  
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)  
  .takeUntil(stop)
  .subscribe(aLong -> {
    print("interval onNext: %d", aLong);
  }, throwable -> {
    print("interval onError");
  }, () -> {
    print("interval onComplete");
  });

Thread.sleep(3500)

stop.onNext(null);  

こうするとonCompleteが呼ばれて終了する。

ただ、Observable#interval使った時だけPublishSubjectunsubscribeとかダルいなーって思ってたらこんなことができた

PublishSubject<Void> stop = PublishSubject.create();  
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)  
  .doOnUnsubscribe(() -> {
    print("interval onUnsubscribe");
    stop.onNext(null);
  })
  .takeUntil(stop)
  .subscribe(aLong -> {
    print("interval onNext: %d", aLong);
  }, throwable -> {
    print("interval onError");
  }, () -> {
    print("interval onComplete");
  });

Thread.sleep(3500)

subscription.unsubscribe();  

ログはこんな感じ

interval onNext: 0  
interval onNext: 1  
interval onNext: 2  
...
interval onUnsubscribe  
interval onComplete  

doOnUnsubscribeはどうやらObserverにイベントが来なくなるより前に実行されるみたい。

これでonCompleteは呼ばれるようになったけど、Observable#takeUntilのドキュメントを見る限りだとObservable#intervaleのストリームは依然続いてそうな感じもするし、onCompleteでしたい処理がある場合以外はこんなめんどくさいことしなくていいかもしれない。