UniRxで課題だったRxとasync/awaitの連携がR3では楽になった件

UniRxで課題だったRxとasync/awaitの連携がR3では楽になった件

こんにちは。ソフトウェアエンジニアのyashiheiです。

最近はお絵描きにハマり毎月100時間ぐらいのめり込んでます。上手い絵は全然描けないですがコツコツものを作ってる感覚があり楽しいですね。

さて、筆者が関わってるプロジェクトではCysharp様が開発されているOSSのR3を導入しています。R3はまだリリースされてから日も浅いのですが、これまでUniRxで開発してて感じてた課題を解決しており、良い方向に進化していると感じました。その中でも特にRxとasync/awaitの連携はずっと欲しかったものであり、この記事では背景なども振り返りつつ掘り下げようと思います。UniRxからR3に移行しようと考えてる人の参考になれば幸いです。

Single Observable問題

R3から入門する人にとってはあまり関係が無い話ですが、UniTaskが無かったころは非同期処理を全てUniRxで書いてた時代がありました。単発の非同期処理をObservableで表現するので、1回しか流れないObservable(Single Observable)が現れます。

下のコードは以前所属してたプロジェクトのコードから適当にSingle Observableのコードを見繕ってきました。async/awaitで書き直したものも置いてます。どちらが読みやすいでしょうか?async/awaitで書いたほうが素直に読めると思います。

これならまだマシな方で、もっと数多くのSingle Observableが複雑にチェインされたコードもあった記憶があります。そうなるともう全然読めないです。

// HogePresenter.cs

// Single Observable
public IObservable<Unit> ShowAsObservable()
{
    return Observable.Defer(() =>
    {
        return dialogView == null
            ? Observable.Return(Unit.Default)
            : dialogView.CreateAsync() // 開発途中からUniTaskが入ったので中途半端にUniTaskになってたりする…
                .ToObservable()
                .Do(_ => Subscribe())
                .SelectMany(_ => dialogView.ShowAsObservable());
    });
}

// async/await
public async UniTask ShowAsync()
{
    if (dialogView == null)
    {
        return;
    }

    await dialogView.CreateAsync();
    Subscribe();
    await dialogView.ShowAsync();
}

なので、neueccさんのR3の思想にはとても共感できます。

R3のメイン思想としてSingle Observable死すべしというのがあり、というかUniTaskと併用してるなら不要というかむしろ使うべきじゃないはずなのに意外と残ってる感があり、Observable.ContinueWithやWhenAllの存在がそれを誘発しているのなら真っ先に死すべしという。

https://x.com/neuecc/status/1759718937162600537

単発の非同期処理はasync/awaitで表現しましょう。素直に上から下に読むことが出来るプログラムが読みやすいです。第一事業部だとまだUniTaskが無かった頃の非同期処理がSingle Observableで書かれてることが多く、未だに排除出来てない印象です。

UniRxではどうやってRxとasync/awaitを連携してたか

Single Observableを排除したところで、UniRxでRxとasync/awaitを連携させるのは工夫が必要で落とし穴もありました。プロジェクトごとに流派はありましたが、筆者が見てきた中だと以下のような実装です。

  • UniTask#ToObservableしてSelectManyなどに繋ぐ
    • .SelectMany(_ => HogeAsync().ToObservable())
    • Taskのキャンセルが出来ない
      • (全体的にCancellationTokenをリレーさせてないコードベースだったので見逃されてたという背景もあります)
      • SelectManyを通ってしまうとToObservable内でTaskがFire and Forgetされるので制御出来ないTaskがあった
      • UniTaskをCancellationTokenを指定しながらToObservableするメモObservableConverter#FromUniTask のような対策を積む必要があると思います
  • Subscribe内でUniTaskをFire and Forget
    • Subscribe(_ => HogeAsync(ct).Forget())
    • HogeAsyncの中に一連の非同期処理を全て詰める
    • イベントが連続するとTaskが並列に走ってしまう問題がある
  • TaskQueueを使う

UniTaskAsyncEnumerableでは駄目なのか

ViewのイベントをUniTaskAsyncEnumerableに変換してForEachAwaitAsync内で非同期処理を書くことも出来るのですが、Observableの互換として扱うには厳しいという判断になりました。以前社内のknowledgeにその理由を書いてたので引用します。

  • UniTask.LinqはUniRxのオペレーターを完全に補完は出来ない
    • 具体的には
      • Switch
      • WithLatestFrom
      • Throttle あたりなど
    • UniTaskAsyncEnumerableはPullベースなのでObservableの完全な互換が出来ない
    • 一時変数を用意したりとか泥臭い書き方になった記憶
      • あとSwitchやろうとしたらキャンセルがめっちゃめんどい
    • やっぱPushなObservable欲しい
  • UniTaskAsyncEnumerableは書き手がクセを理解しないと怖い
    • 期待してたブロッキングにもなるが、逆にQueue挟まないと歯抜けしてしまうケースなども有りそう
    • Pushに慣れたチームにはしんどいかも(Pullから入ったら自然に思えるかもしれない)
  • イベント通知用にChannelをラップしたPubSubを作って公開してたが何処からもPublish出来るのちょっと怖いなってなった
    • UniRxでSubject生成して外部にObserveableを公開してたところ

R3のSubscribeAwait

R3ならば上記のことは気にしなくて良いです。

var subject = new Subject<int>();
var timeProvider = new FakeTimeProvider();

var liveList = new List<int>();
using var _ = subject
    .SubscribeAwait(async (x, ct) =>
    {
        await Task.Delay(TimeSpan.FromSeconds(3), timeProvider, ct);
        liveList.Add(x * 100);
    }, AwaitOperation.Sequential);

https://github.com/Cysharp/R3/blob/1.2.6/tests/R3.Tests/OperatorTests/SubscribeAwaitTest.cs#L16-L25

(R3のテストコードから抜粋)Subscribe内でasync/awaitが書けるようになります。むしろ最初からこう書きたかったぐらい自然な構文だと感じました。実際UniRxに習熟してないメンバーがSubscribeに渡すラムダ式にそのままasync/awaitを書いて、レビューで指摘されるケースが多かったです…。

Select/Where でも同様の対応が積まれてます。

挙動を理解したい

挙動を理解したかったので、R3のSubscribeAwait周辺のコードを写経しました。Ver.1.2.6を参照してます。どのように実装が使われているか証明されているテストコードから写経することで、全体像が理解しやすかったです。

AwaitOperation

当然ながらイベントの間隔と非同期処理の長さは一致しません。なので非同期処理が重なる場合どうやって扱うか決める必要があります。SubscribeAwaitではAwaitOperationを指定することで制御できます。

AwaitOperation解説図

AwaitOperationObserver

/// <param name="maxConcurrent">This option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit.</param>
public static IDisposable SubscribeAwait<T>(this Observable<T> source, Func<T, CancellationToken, ValueTask> onNextAsync, Action<Exception> onErrorResume, Action<Result> onCompleted, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxConcurrent = -1)
{
    switch (awaitOperation)
    {
        case AwaitOperation.Sequential:
            return source.Subscribe(new SubscribeAwaitSequential<T>(onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted));
        case AwaitOperation.Drop:
            return source.Subscribe(new SubscribeAwaitDrop<T>(onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted));
        case AwaitOperation.Parallel:
            if (maxConcurrent == -1)
            {
                return source.Subscribe(new SubscribeAwaitParallel<T>(onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted));
            }
            else
            {
                if (maxConcurrent == 0 || maxConcurrent < -1) throw new ArgumentException("maxConcurrent must be a -1 or greater than 1.");
                return source.Subscribe(new SubscribeAwaitParallelConcurrentLimit<T>(onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted, maxConcurrent));
            }
        case AwaitOperation.Switch:
            return source.Subscribe(new SubscribeAwaitSwitch<T>(onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted));
        case AwaitOperation.SequentialParallel:
            throw new ArgumentException("SubscribeAwait does not support SequentialParallel. Use Sequential for sequential operation, use parallel for parallel operation instead.");
        case AwaitOperation.ThrottleFirstLast:
            return source.Subscribe(new SubscribeAwaitThrottleFirstLast<T>(onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted));
        default:
            throw new ArgumentException();
    }
}

https://github.com/Cysharp/R3/blob/1.2.6/src/R3/Operators/SubscribeAwait.cs#L20-L48

SubscribeAwaitの内部では、渡されたAwaitOperationに応じたObserverが生成され購読されます。

AwaitOperation.csにこれらのObserverが実装されてます(WhereとSelectにも対応する必要があるので各Observerがabstractで実装されてますがここでは説明を省略します)

AwaitOperationのObserver実装をOnNextCoreから読むことでより具体的に挙動が掴めました。

Observerの実装
  • AwaitOperationSequentialObserver
    protected override sealed void OnNextCore(T value)
    {
        channel.Writer.TryWrite(value);
    }
    
    • OnNextCoreでChannel(容量制限無し)にWrite
    • RunQueueWorkerで順次Read
  • AwaitOperationDropObserver
    protected override sealed void OnNextCore(T value)
    {
        if (Interlocked.CompareExchange(ref runningState, 1, 0) == 0)
        {
            StartAsync(value);
        }
    }
    
    • 既にrunningStateが1なら無視
  • AwaitOperationSwitchObserver
    protected override sealed void OnNextCore(T value)
    {
        CancellationToken token = cancellationTokenSource.Token;
        lock (gate)
        {
            if (running)
            {
                if (IsDisposed) return;
                cancellationTokenSource.Cancel();
                cancellationTokenSource = new CancellationTokenSource();
                token = cancellationTokenSource.Token;
            }
            running = true;
        }
    
        StartAsync(value, token);
    }
    
    • 一つ前のTaskをキャンセルしてから新たにTaskを実行
  • AwaitOperationParallelObserver
    protected override sealed void OnNextCore(T value)
    {
        Interlocked.Increment(ref runningCount);
        StartAsync(value);
    }
    
    • そのまま実行
  • AwaitOperationParallelConcurrentLimitObserver
    protected override sealed void OnNextCore(T value)
    {
        lock (gate)
        {
            if (runningCount < maxConcurrent)
            {
                runningCount++;
                StartAsync(value);
            }
            else
            {
                queue.Enqueue(value);
            }
        }
    }
    
    • 無印Parallelとは違ってmaxConcurrentを超えてたら一旦Queueに入れる
  • AwaitOperationThrottleFirstLastObserver
        protected override sealed void OnNextCore(T value)
        {
            channel.Writer.TryWrite(value);
        }
    

まとめ

  • 単発の非同期処理はasync/awaitで表現するのが読みやすい(R3ではSingle Observableを誘発しない設計になっている)
  • R3はRxとasync/awaitの連携を迷うことなく書くことが出来る
  • 非同期処理が重なったときもAwaitOperationでパターンに応じた対応が出来る

紆余曲折があってやっと書きたかった形でViewのイベントと非同期処理が書けるようになったという印象です。R3使っていきましょう。