blog

Rxモデルと地図の原理

オブザーバー・パターン\nRxのデザインパターンは、実はObserverパターンを偽装したものなので、まず標準的なObserverパターンを理解する必要があります。\nこれが標準的なオブザーバー・パタ...

Jul 27, 2020 · 6 min. read
シェア

オブザーバーパターン

Rxデザインパターンは実際にはObserverパターンを偽装したものなので、まず標準的なObserverパターンを理解することが重要です。

これが標準的なオブザーバーパターンです。シンプルです。

Rx Hook

まずコードの一部を見てください:

 Observable.create(new ObservableOnSubscribe<String>() {
 @Override
 public void subscribe(ObservableEmitter<String> e) throws Exception {
 
 }
 })
 .map(new Function<String, String>() {
 @Override
 public String apply(String s) throws Exception {
 return null;
 }
 })
 .subscribe(new Observer<String>() {
 @Override
 public void onSubscribe(Disposable d) {
 disposable = d;
 }
 @Override
 public void onNext(String s) {
 }
 @Override
 public void onError(Throwable e) {
 }
 @Override
 public void onComplete() {
 }
 });

.createではnew ObservableCreateが返され、.mapではnew ObservableMapが返されます。必要なのは、この処理をインターセプトすることです。図のように

では、その真ん中には何があるのでしょうか?ソースコードを見てみましょう。createに移動します。

 		@CheckReturnValue
 @SchedulerSupport(SchedulerSupport.NONE)
 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
 ObjectHelper.requireNonNull(source, "source is null");
 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
 }

真ん中のエリアはonAssemblyで、これはグローバル・フックです。

 		@CheckReturnValue
 @SchedulerSupport(SchedulerSupport.NONE)
 public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
 ObjectHelper.requireNonNull(mapper, "mapper is null");
 return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
 }

onAssemblyにアクセスして中に入ってください。

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
 Function<? super Observable, ? extends Observable> f = 			 onObservableAssembly;
 if (f != null) {
 return apply(f, source);
 }
 return source;
 

onObservableAssembly デフォルトはnullなので、これは何もしません。onObservableAssembly 次に、値が代入される場所を見てください。

 /**
 * Sets the specific hook function.
 * @param onObservableAssembly the hook function to set, null allowed
 */
 @SuppressWarnings("rawtypes")
 public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
 if (lockdown) {
 throw new IllegalStateException("Plugins can't be changed anymore");
 }
 RxJavaPlugins.onObservableAssembly = onObservableAssembly;
 }

コメントから明らかなように、指定されたフック関数をセットするので、この関数を呼び出して独自の関数をセットすることで、グローバルrxをインターセプトすることが可能です。

RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
 @Override
 public Observable apply(Observable observable) throws Exception {
 //ここで、次のようなインターセプトができる。
 //nullを返すと、それ以降のrxの使用はすべてnullポインタになる。
 return observable;
 }
 });
 

Rxオブザーバーパターン

まず、Observable を作成し、次に Observer を作成します。

オブザーバーのソースコードを見る

public interface Observer<T> {
 /**
 * Provides the Observer with the means of cancelling (disposing) the
 * connection (channel) with the Observable in both
 * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
 * @param d the Disposable instance whose {@link Disposable#dispose()} can
 * be called anytime to cancel the connection
 * @since 2.0
 */
 void onSubscribe(@NonNull Disposable d);
 /**
 * Provides the Observer with a new item to observe.
 * <p>
 * The {@link Observable} may call this method 0 or more times.
 * <p>
 * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
 * {@link #onError}.
 *
 * @param t
 * the item emitted by the Observable
 */
 void onNext(@NonNull T t);
 /**
 * Notifies the Observer that the {@link Observable} has experienced an error condition.
 * <p>
 * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
 * {@link #onComplete}.
 *
 * @param e
 * the exception encountered by the Observable
 */
 void onError(@NonNull Throwable e);
 /**
 * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
 * <p>
 * The {@link Observable} will not call this method if it calls {@link #onError}.
 */
 void onComplete();
}

お分かりのように、これは単なるインターフェイスです。 本当の実装は実装です。

Observableのソースコードを見てください。

Observable.createのソースコードを見てみました。

 		@CheckReturnValue
 @SchedulerSupport(SchedulerSupport.NONE)
 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
 ObjectHelper.requireNonNull(source, "source is null");
 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
 }

ご覧のように、渡されたパラメータは、渡されたカスタム・ソースです。

サブスクリプション・プロセス分析

subscribe() をタップしてください。

 @SchedulerSupport(SchedulerSupport.NONE)
 @Override
 public final void subscribe(Observer<? super T> observer) {
 ObjectHelper.requireNonNull(observer, "observer is null");
 try {
 observer = RxJavaPlugins.onSubscribe(this, observer);
 ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
 subscribeActual(observer);
 } catch (NullPointerException e) { // NOPMD
 throw e;
 } catch (Throwable e) {
 Exceptions.throwIfFatal(e);
 // can't call onError because no way to know if a Disposable has been set or not
 // can't call onSubscribe because the call might have set a Subscription already
 RxJavaPlugins.onError(e);
 NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
 npe.initCause(e);
 throw npe;
 }
 }

subscribeActual(observer);ObservableCreate. subscribe().subscribe()は実際には、実行が完了するとすぐにcreateはObservableCreateオブジェクトを返します。 従って、subscribeActualの実装はObservableCreateの中になければなりません。

 @Override
 protected void subscribeActual(Observer<? super T> observer) {
 CreateEmitter<T> parent = new CreateEmitter<T>(observer);
 observer.onSubscribe(parent);
 try {
 source.subscribe(parent);
 } catch (Throwable ex) {
 Exceptions.throwIfFatal(ex);
 parent.onError(ex);
 }
 }

新しいエミッタが受信オブザーバをラップし、カスタムソースがsubscribe()を呼び出すことで、両端が接続されていることがわかります。

全体がコの字型の構造

Rx Hook

通常のオブザーバパターン:1人のオブザーバが複数のオブザーバに対応 オブザーバが変更の通知を送信 すべてのオブザーバが変更

Rxのオブザーバーパターン:複数のオブザーバーが1つのオブザーバーに対応し、変更の通知が送信され、終了点が変更される前に、開始点と終了点に再度サブスクライブする必要があります。

地図演算子の原理

サンプルコード

 			// ObseravbleCreate カスタムソースが渡される。== source
 Observable.create(
 // カスタムソース
 new ObservableOnSubscribe<String>() {
 @Override
 public void subscribe(ObservableEmitter<String> e) throws Exception {
 }
 })
 // ObseravbleCreate.map
 .map(new Function<String, Bitmap>() {
 @Override
 public Bitmap apply(String s) throws Exception {
 return null;
 }
 })
 // ObservableMap.subscribe
 .subscribe(
 // カスタムオブザーバー
 new Observer<Bitmap>() {
 @Override
 public void onSubscribe(Disposable d) {
 }
 @Override
 public void onNext(Bitmap bitmap) {
 }
 @Override
 public void onError(Throwable e) {
 }
 @Override
 public void onComplete() {
 }
 });

同じです。

ご覧の通り、封印と開封のプロセスが追加される以外は、サブスクリプション・プロセスと同様です。

タマネギのモデルです。

実際、Rxも装飾的なパターンを使用しています。

おそらく次のようなモデルでしょう。

そうですね......実は、Rxは標準的なU字型の構造ではなく、むしろそうなんです:

何かを注文するのと同じように ---> 小包に封をする ---> 小包を開ける

Read next

一般的な時間フォーマットの方法

自分で使うための時間フォーマット法、小さな機能のためにパッケージファイルを導入するのは好きではありません。

Jul 27, 2020 · 2 min read