読者です 読者をやめる 読者になる 読者になる

kitak.blog

Kみたいなエンジニアになりたいブログ

Rx.jsのControlled observableを試した

JavaScript Rx

ObservableのsubscribeOnNextに渡した関数(あるいはsubscribeの第一引数として渡した関数)でおこなう副作用の伴う処理、非同期処理が終わるまでの間、値の発行を止めたくなった。
受け取ったデータを蓄えつつ、受け取った順にひとつずつ一定時間表示するようなケース。(この記事を書いていて、わんこそばが頭に浮かんだ)

ひとつの非同期処理が終わるのを待って、次の非同期処理を実行するという点では、先日、書いた記事( async/awaitをつかった非同期処理の直列実行 - kitak.blog )と同じ。異なるのは、先の記事では、事前にトータルで何回非同期処理を行うか明らかになっているのに対して、今回は離散的に届くデータを扱うので、非同期処理をおこなう回数は未定という点。

色々調べたところ、Controlled observable( RxJS/backpressure.md at b0e20aab69b28ff0cbab68e536caff9533e50dfb · Reactive-Extensions/RxJS · GitHub )を使えば実現できそうだったのでメモしておく。

var Rx = require('rx-lite');

// 適当にtimerからObservableを生成しているが、WebSocketやUIイベントに置き換えること
var source = Rx.Observable.timer(200).controlled();  

source.subscribe((x) => {
  displayReceivedData(x)
    .then(() => {
      source.request(1);
    });
});

source.request(1);

Controlled observableはObservableのcontrolledメソッドを呼ぶことで生成でき、requestメソッドで受信する個数を指定できる。やりたいことに対して、Controlled observableを使うのはやりすぎな気もするので、もっと良いやり方があったら教えてください。