リアクティブスパゲティを避けるための2つの原則

リアクティブスパゲティを避けるための2つの原則

こんにちは!Aiming ソフトウェアエンジニアの栗本です。

AimingではUnityでのイベントハンドリングや非同期タスクの処理に UniRx というライブラリを使っています。
UniRxとは Rx (Reactive Extensions) という仕組みをUnityで扱うためのライブラリです。
これを使うことで、イベントと非同期タスクを IObservable という共通のインターフェースにし、 Linq と似たオペレーターを使うことができるようになります。

Rxを利用すると、従来ではコールバックの入れ子になってしまうようなコードを宣言的に記述することができるようになり、大変便利です。
正しく扱えば強力なRxですが、しかし一歩間違えば「リアクティブスパゲティ」と呼ばれる、コールバックが複雑に入り組む危険なコードを書いてしまいやすいという問題があります。
今回の記事では、この「リアクティブスパゲティ」を避けるために私達のプロジェクトで心がけている2つの原則を紹介します。

なお、記述を簡単にするため、記載のコードでは一部の型や破棄処理を省略しています。

Observableを生成する関数は純粋にする

プログラミングにおいて「純粋」とは、その関数が参照透過であることを意味します。
厳密な定義とは多少違いますが、ここでは便宜上純粋であることを「引数以外の変更可能な値を読まず、状態を変更しない」とします。

RxではタスクやイベントはObservableというインターフェースになり、オペレーターを使うことで組み合わせたり変更したりすることができます。
しかし、その操作に純粋でないものが混ざっていると、使い方によっては想定とは違う振る舞いをしてしまうことがあります。

// bad
IObservable ImpureObservable(arg)
{
  if (arg == mutableProperty) // 変更可能な値の読み取り
  {
    DoSomething(); // 状態の変更
  }
  ...
}

// good
IObservable PureObservable(arg)
{
  if (arg == constant) // 引数と定数の読み取り
  {
    return Observable.Start(DoSomething); // 動作をするObservableの作成
  }
  ...
}

勘違いをされることが多いのですが、ここで問題にしているのは「副作用を起こすObservable」ではなく、「Observableを生成する関数が持つ副作用」のことです。
多くの場合、Observableは生成と同時に購読されるので、この2つの差が問題になることはあまりありません。
しかし、以下のような例では生成と購読のタイミングが大きく異なり、Observable提供側の意図と違う動作をしてしまう可能性があります。

// 生成と購読のタイミングが異なる素朴な例
_request.SelectMany(GetObservable()).Subscribe(); // 引数に直接Observableを取るSelectMany
_request.Select(_ => GetObservable()).Concat().Subscribe(); // Observable.Concat

このようにして起こった意図せぬタイミングでの副作用は気付きづらく、Observableを生成する関数の副作用を購読側が考慮するのも大変です。
そのため、書いた時点で副作用が不具合にならないとしても、Observableを生成する関数は純粋を保つように心がけています。
また、生成関数を純粋にすることで、用意されたオペレーターをいつでもどこでも安心して使うことができ、Observableを生成するタイミングを気にする必要もなくなります。

値の読み書き等が付随する非同期の処理をしたい場合は、 Observable.Defer や Observable.Create を使って、副作用をObservableの購読時に行わせるように変更します。

// good
IObservable DeferredObservable(arg)
{
  // 中身全体をDeferで囲む
  return Observable.Defer(()=>
  {
    if (arg == mutableProperty)
    {
      DoSomething();
    }
    return ...;
  });
}

初期化以外でSubscribeを書かない

この原則は、簡単に言えば「同期的な関数は同期的に動作する」ことを目的としています。
また、明示的な購読を初期化時のみにすることで、「非同期的なふるまいは初期化をみれば分かる」ようにもなります。
初期化タイミングとは、MonoBehaviourならStartまたはAwake、そうでないならコンストラクタのタイミングにしています。

初期化以外のタイミングで非同期タスクを開始したい場合というのはよくあると思います。

// 事実上の非同期関数
void DoSomething()
{
  DoSomethingAsObservable().Subscribe();
}

このようなコードを書きたくなったときに、私達のプロジェクトでは以下のように書き換えるようにしています。

readonly ISubject<Unit> _onSomethingRequested = new Subject<Unit>();

// 初期化関数でSubscribeする
void Awake()
{
  _onSomethingRequested.SelectMany(DoSomethingAsObservable()).Subscribe();
}

// 同期的な関数は同期的な動作をする
void DoSomething()
{
  _onSomethingRequested.OnNext(Unit.Default);
}

このような単純な例では変数が増え、ただ複雑になっただけに見えてしまいます。
しかし、大きなプロダクトでは非常に多くの状態を取り扱うことになり、非同期周りでの不具合はどうしても起きてしまいがちです。
購読を初期化時にまとめることは、非同期的な動作の兼ね合いを調査・修正をする際に大きなメリットがあります。

// オペレーターを変えるだけで購読の仕方を変えることができる
void Awake()
{
  // すべて並列で行う
  _onSomethingRequested
    .SelectMany(DoSomethingAsObservable)
    .Subscribe();

  // 初回のみ行う
  _onSomethingRequested
    .First()
    .ContinueWith(DoSomethingAsObservable)
    .Subscribe();

  // 最新のみ行う
  _onSomethingRequested
    .Select(DoSomethingAsObservable)
    .Switch()
    .Subscribe();

  // 順番に行う
  _onSomethingRequested
    .Select(DoSomethingAsObservable)
    .Concat()
    .Subscribe();
}

このような使い分けを個別の関数でやろうとすると、フラグ・Disposable・コールバックなどがどんどん増えていきます。
たちまちリアクティブスパゲティの仲間入りしてしまいますし、大抵の場合はオペレーターの再発明に過ぎないです。

かつては、MonoBehaviourの場合でもInitializeという関数を作り、引数として渡された値を使った購読を許可していました。
しかし、オブジェクトを非アクティブ状態で生成するようにしたり、他のStartでInitializeを呼ぶようにしたりした際に動作が壊れてしまうことが何度かありました
そのため、現在は上の例と似たような感じでAsyncSubjectを作り、StartまたはAwakeでそれを購読するようにしています。

2つの原則を組み合わせると

1つ目の原則により、Observableを返す関数は純粋なので、この関数を呼ぶだけでは意味はなく、かならず返り値が使われることが分かります。
2つ目の原則により、Observableを返さない関数では IObservable.Subscribe() は使えません。
さて、ここでIObservableの定義はSubscribeのみなので、この2つの原則を組み合わせると「初期化以外の同期的な関数ではObservableを使わない」という方針になることが分かります。
厳しい制限ではありますが、この方針に従うことでObservable周りのコードスタイルを統一でき、変更をある程度機械的に行えるようになります。

例えば以下のようなコードがあったときに、関数の一つを非同期化する (Observable化する) 必要が出てきたとしましょう。

class X : MonoBehaviour
{
  public void DoX()
  {
    DoA();
    DoB();
    DoC(); // これをObservable化する
  }
}

そのまま関数をObservableにしただけでは購読されてないので動作しません。

class X : MonoBehaviour
{
  public void DoX()
  {
    DoA();
    DoB();
    DoCAsObservable(); // wrong: 純粋関数なのに返り値が使われていない
  }
}

初期化以外では購読できないので、この関数はIObservableを返す関数になります。

class X : MonoBehaviour
{
  public IObservable DoXAsObservable()
  {
    DoA(); // wrong: Observableを生成する関数なのに純粋でない
    DoB();
    return DoCAsObservable();
  }
}

この際、他の動作も一緒にObservableに入れなければいけません。

class X : MonoBehaviour
{
  // ok
  public IObservable DoXAsObservable()
  {
    return Observable.Defer(()=>
    {
      DoA();
      DoB();
      return DoCAsObservable();
    })
  }
}

さて、ここでこのObservableを「どのクラスが購読するか」を判断する必要が出てきます。
基本的には、非同期的な振る舞いの完了を受け取りたい場所か、購読をキャンセルしたい場所で行うのが良いでしょう。
購読をXの外で行いたい場合、今まで DoC に行ってきたことと同様の操作を DoX に対して再帰的に行います。
この関数を外から同期的に呼べるように保ちたい場合、初期化関数で購読するように修正します。

class X : MonoBehaviour
{
  readonly ISubject _onXRequested = new Subject;

  void Awake()
  {
    // 購読は初期化時に行う
    _onXRequested.SelectMany(DoXAsObservable()).Subscribe();
  }

  // 同期的な関数を公開する
  public void DoX()
  {
    _onXRequested.OnNext();
  }

  IObservable DoXAsObservable()
  {
    return Observable.Defer(()=>
    {
      DoA();
      DoB();
      return DoCAsObservable();
    })
  }
}

以上のように、この2つの原則を取り入れ、「どのクラスが購読するか」を明確にすれば、他は機械的な変換で非同期への対応ができることが分かります。

複雑なパターンへの適用

購読の有無がオブジェクトの内部状態に依存するが、その結果は受け取りたい、という場合があります。
「ボタンを押すたびにモデルの構築を非同期で行うが、その間ユーザーの入力はブロックしたくない。新しくボタンが押されたら以前のロードを中断し、表示に反映するのは最新のみにしたい。」のような状況です。

// Observableを公開する例
class ModelGenerator
{
  readonly ISubject<Unit> _onGenerateRequested = new Subject<Unit>();

  // 最新のみ動くようにする。
  // 書きづらい・読みづらい・使いづらい!
  public IObservable<Result> GenerateAsObservable(T param)
  {
    return GenerateInternalAsObservable()
      .TakeUntil(_onGenerateRequested)
      .DoOnSubscribe(()=>_onGenerateRequested.OnNext(Unit.Default));
  }
}
// 自身が購読する例
class ModelGenerator
{
  readonly ISubject<T> _onGenerateRequested = new Subject<T>();
  public ModelGenerator()
  {
    _onGenerateRequested.Select(GenerateInternalAsObservable).Switch().Subscribe();
  }

  // 終了を通知できない!
  public void Generate(T param)
  {
    _onGenerateRequested.OnNext(param);
  }
}

このような場合は同期的なリクエストと、結果のみを返すイベント的なObservableを組み合わせて実装しています。

// リクエストと結果を分ける
class ModelGenerator
{
  readonly ISubject<T> _onGenerateRequested = new Subject<T>();
  readonly ISubject<Result> _onGenerated = new Subject<Result>();

  public ModelGenerator()
  {
    _onGenerateRequested.Select(GenerateInternalAsObservable).Switch().Subscribe(_onGenerated);
  }

  public void Generate(T param)
  {
    _onGenerateRequested.OnNext(param);
  }

  public IObservable<Result> OnGeneratedAsObservable()
  {
    return _onGenerated;
  }
}

リクエストと結果の結びつきはなくなってしまいますが、どちらの関数も素直な動作をしています。
結果に対してどのようなリクエストがあったかも必要な場合は、Resultの中にその情報も入れて返すのが良いと思います。

まとめ

1つ目の原則ですが、純粋かどうかを正しく判断するのは難しいものの、単純な書き換えをするだけで見つけづらい不具合を回避できます。
Rxを使う場合は常に意識することをオススメします。
2つ目の原則は制限が大きいですが、非同期の書き方や振る舞いをかなり統一させることができます。
大きなプロダクトでRxを導入する際は検討してもいいのではないでしょうか。