[Unity][C#] 非同期コールバック関数パターン

[Unity][C#] 非同期コールバック関数パターン

こんにちは、そして、お久しぶりです。
Aiming の土井です。
今回の開発者ブログでは、コールバックを伴う非同期プログラミングのデザインについて整理していこうと思います。

非同期関数をコールバックする

早速ですが、皆さんの中で UniRx, UniTask といった非同期ライブラリを使って開発している方も多いのではないでしょうか。最近では、Unity 単体でも Task を簡単に扱えるようになっており、手続き的な非同期処理を、見通し良く書くことができるようになりました。
非同期処理の代表的なデザインとして、メッセージをコールバック関数で受けるというやり方があります。こういった設計のフレームワークやライブラリを使ったこともあるのではないでしょうか。

コールバックの例(UniRx)


...
void Start()
{
    HogeObservable.Subscribe(_ => Hoge());
}

void Hoge()
{
    // 同期処理
    Debug.Log("Synchronous");
}

UniRx だと、こんな感じですね。
ストリームになにかメッセージが流れてきたら、Subscribe() に渡されたコールバック関数が呼び出されます。
コールバック関数では Hoge() が呼び出され、コンソールに文字列が表示されます。

では、Hoge が非同期関数の場合どうなるでしょう?

非同期コールバック関数の例


...
int counter = 0;

void Start()
{
    HogeObservable.Subscribe(_ => HogeAsync());
}

async Task HogeAsync()
{
    counter++;
    await Task.Delay(1000);
    Debug.Log($"{counter}");
}

コールバック内で非同期関数を呼び出しています。await Task.Delay(1000); のところで1秒待ってから、コンソールに文字列を出力し、処理を完了します。
しかしながら…このコード、なにか臭います。
1秒以内に HogeObservable のストリームに3つのメッセージが流れてきた場合の結果を見てみましょう。

出力

3  // 1を期待していた
3  // 2を期待していた
3

非同期コールバック関数を扱う場合、このように、並行に処理されてしまうことが原因で、意図しない挙動をすることがあります。
コードをパッと見るだけでは気づきにくい、非同期処理特有の問題と言えるでしょう。

今回の開発者ブログでは、非同期コールバック関数に注目して、設計時に役立つデザインのパターンを考えていきたいと思います。

レースコンディションについて理解する

複数の処理が並行していることが原因で生じる問題をレースコンディションと言います。

先程の HogeAsync() の例では、同時に3つのコールバック呼び出しが並行し、意図せぬ結果を招いていました。まさに、レースコンディションです。
レースコンディションを回避するためには、関数の実行状態を管理・制御したり、処理の順序を見直したり…といった対策は取れるのですが、これが結構たいへんです。

レースコンディション対策のためのコードを加えていくことによって、本質的な処理とは無関係なコードが混ざってくるので、見通しが悪くなり、コードが読みにくくなっていくのが常でしょう。
同期処理では起きることのないこの問題に対して、解決策を見出して行かなければなりません。

そこで、非同期処理を設計する際の指針として、以下のようなパターンを3つ考えてみましょう。

非同期コールバック関数を扱うパターン

並列(並行)処理パターン

きっと全部うまくいく。気にしない(ドキドキ)

レースコンディションを意識しなくて良い処理。非同期処理が並行して実行されることを許容するパターンです。処理が並行に行われても大丈夫なように実装がされている必要があります。

直列処理パターン

はいはいー、順番にやるから、そこに1列に並んでね。列の後ろの方にある処理は、実行が遅れることもあるから気をつけてね

すべての非同期処理を直列化(同時に一つだけ実行するように)して、処理が並行して実行されることを防ぎ、レースコンディションを回避します。

スイッチパターン

マジ、差し込みでスマンだけど、こっちの仕事最優先でおねがい。え?さっき振った仕事?あれは、もうやんなくていいよ?気持ち切り替えていこ?

実行中の処理は止めて、最後に発生した非同期処理だけを実行し、レースコンディションを回避します。

どのパターンが適切か判断する

実装しようとしている仕様に対して、レースコンディションを回避する必要があるか否か、どのようにレースコンディションを回避するかでこれらのパターンに振り分けると良いでしょう。
次に、これらの判断基準をもう少し明確に持てるよう、具体的な実装例を踏まえて詳しく紹介していこうと思います。

Unity + シングルスレッドを想定しています。スレッド間の競合を意識しなければならないような状況では、排他制御が必要になるなど、ここで紹介するパターンでは扱いきれないため、注意が必要です。

並列(並行)処理パターンの例

非同期コールバックが並行しても問題ないと言い切れるなら、何も考えずに非同期コールバックを呼び出してしまいましょう。


HogeObservable.Subscribe(_ => HogeAsync());
async Task HogeAsync()
{
    // 非同期処理
    await Task.Delay(1000);
    Debug.Log("Asynchronous");
}
```

HogeAsync は1秒まってから、文字列をコンソールに表示するだけの関数です。コンテクストに依存せず、何度呼び出されても結果は同じですね(冪等性を持っている)。こういった冪等性を持った関数は、ほとんどの場合、並行実行しても問題ないでしょう。これを一つ判断基準として持っておくと良いでしょう。

上述のコードのような、await されない HogeAsync() 呼び出しは、警告が表示されてしまいます。また、非同期処理内で例外が発生した際に気づくことができなくなります。
そのため、以下のように、UniTask の Forget() を使いましょう。


HogeObservable.Subscribe(_ => HogeAsync().Forget());
async UniTask HogeAsync()
{
    // 非同期処理
    await Task.Delay(1000);
    Debug.Log("Asynchronous");
}

直列処理パターンの例

非同期処理を要求された順に従って、一つ一つ順番に実行します。原理的に非同期処理が並行実行されることが無いので、レースコンディションが発生することはありません。
ただし、前の処理が完了するまで後続の処理は待たされるため、実行に遅延が発生することがあります

マルチスレッドデザインパターンにおける、Producer-Consumerパターンに含まれるパターンといえます。
ここでは処理を直列化する機能をもった調整役を置き、一つ一つ順番に実行していくことでレースコンディションを回避します。

直列化するためには、非同期キューを使うのが良いでしょう。自前で開発しても良いですが、拙作の非同期タスクキューライブラリ(TaskQueue)がありますので、ここではそれを利用したコードをサンプルとして示したいと思います。

TaskQueue には他にも、キューサイズの制限や、優先度による実行順の制御など、細やかな制御を行うための機能があります。

TaskQueue を Unity Project にインストールして、非同期処理を直列化します。


TaskQueue taskQueue = new();

void Start()
{
    taskQueue.Start(destroyCancellationToken);
    HogeObservable.Subscribe(_ => taskQueue.Enqueue(HogeAsync));
}

async Task HogeAsync(CancellationToken ct)
{
    // 非同期処理
    await Task.Delay(1000, cancellationToken: ct);
    Debug.Log("Asynchronous");
}

コールバックが呼び出されるたびに、非同期関数をキューに追加していくようにします。キューに積まれたタスクは、その順番に従って一つずつ実行されていきます。

UI においては、先行入力を許容するようなケースであったり、
ゲームのロジックにおいては、非同期処理中心のコマンドパターンを扱っているなど、実行順が重要な場合は、直列パターンが最適といえます。

直列処理は、遅延が発生する可能性があるため、キビキビとした反応を要求する局面では有用では無いかもしれません。その場合は、次のスイッチパターンを適用できないか検討してみましょう。

スイッチパターンの例

実行中の処理があれば停止し最新のコールバック要求のみを処理するようにします。非同期処理呼び出しの遅延は発生しません。

一見、安全そうで使いやすいパターンに見えますが、進行中の処理を中断するということは、相応の危険があります。実行中の処理を安全に中断ができるかどうかは非同期処理の実装次第です。キャンセル処理を慎重に実装する必要があるでしょう。

実行中の処理を中断する機構を作るのは、これまた意外と大変なので、先程紹介した TaskQueue を使っていきます。

TaskQueueを使ったスイッチパターン

TaskQueue のキューサイズを制限し、キューが溢れた場合に処理を入れ替える(TaskQueueLimitType.SwapLast)指定をして、タスクスイッチングを実現します。


TaskQueue taskQueue = new TaskQueue(TaskQueueLimitType.SwapLast, maxSize: 1); // キューのサイズを1に制限し、あふれる場合は処理を入れ替えるようにする

void Start()
{
    taskQueue.Start(destroyCancellationToken);
    HogeObservable.Subscribe(_ => taskQueue.Enqueue(HogeAsync));
}

async Task HogeAsync(CancellationToken ct)
{
    // 非同期処理
    await Task.Delay(1000, cancellationToken: ct);  // キャンセルできるようにしましょう
    Debug.Log("Asynchronous");
}

ストリームにメッセージが来たタイミングで、実行中の処理はキャンセルされ、レースコンディションを防ぎつつ、メッセージに対しても即座に応答ができるようになります。

[おまけ] UniRx + UniTask の例

最後に、TaskQueue を使わないで実現する方法も考えてみます。
UniRx で受けた非同期コールバックをキャンセルするのは、意外と大変です。

UniTaskをCancellationTokenを指定しながらToObservableするメモ @toRisouPさん
これを実現するために有用なポストがあったので、まずは、この記事を参考に ObservableConverter.FromUniTask を実装してしまうのが近道です。

Observable の Dispose と連動して非同期処理をキャンセルできるようになれば、あとは、Select().Switch() でつなぐだけでスイッチパターンを実現することができるでしょう。


HogeObservable
  .Select(_ => ObservableConverter.FromUniTask(ct => HogeAsync(ct)))
  .Switch()
  .Subscribe();

async UniTask HogeAsync(CancellationToken ct)
{
    // 非同期処理
    await Task.Delay(1000, cancellationToken: ct);  // キャンセルできるようにしましょう
    Debug.Log("Asynchronous");
}

最後に

いかがでしたでしょうか?
今までこの手の非同期処理を曖昧にしていた人(私)は、コールバックをともなう非同期処理に補助線をひいて考えることができるようになったのではないでしょうか?
非同期処理は難しいとよく言われます。難しいからこそ、シンプルな記述で、流れを簡単に追えるようにしておきたいですね!

それでは、また。