オブザーバーパターン
Rxデザインパターンは実際にはObserverパターンを偽装したものなので、まず標準的なObserverパターンを理解することが重要です。
これが標準的なオブザーバーパターンです。シンプルです。
Rx Hook
まずコードの一部を見てください:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
}
})
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return null;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
.createではnew ObservableCreateが返され、.mapではnew ObservableMapが返されます。必要なのは、この処理をインターセプトすることです。図のように
では、その真ん中には何があるのでしょうか?ソースコードを見てみましょう。createに移動します。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
真ん中のエリアはonAssemblyで、これはグローバル・フックです。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
onAssemblyにアクセスして中に入ってください。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
onObservableAssembly デフォルトはnullなので、これは何もしません。onObservableAssembly 次に、値が代入される場所を見てください。
/**
* Sets the specific hook function.
* @param onObservableAssembly the hook function to set, null allowed
*/
@SuppressWarnings("rawtypes")
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
コメントから明らかなように、指定されたフック関数をセットするので、この関数を呼び出して独自の関数をセットすることで、グローバルrxをインターセプトすることが可能です。
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Exception {
//ここで、次のようなインターセプトができる。
//nullを返すと、それ以降のrxの使用はすべてnullポインタになる。
return observable;
}
});
Rxオブザーバーパターン
まず、Observable を作成し、次に Observer を作成します。
オブザーバーのソースコードを見る
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
お分かりのように、これは単なるインターフェイスです。 本当の実装は実装です。
Observableのソースコードを見てください。
Observable.createのソースコードを見てみました。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
ご覧のように、渡されたパラメータは、渡されたカスタム・ソースです。
サブスクリプション・プロセス分析
subscribe() をタップしてください。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
subscribeActual(observer);ObservableCreate. subscribe().subscribe()は実際には、実行が完了するとすぐにcreateはObservableCreateオブジェクトを返します。 従って、subscribeActualの実装はObservableCreateの中になければなりません。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
新しいエミッタが受信オブザーバをラップし、カスタムソースがsubscribe()を呼び出すことで、両端が接続されていることがわかります。
全体がコの字型の構造
Rx Hook
通常のオブザーバパターン:1人のオブザーバが複数のオブザーバに対応 オブザーバが変更の通知を送信 すべてのオブザーバが変更
Rxのオブザーバーパターン:複数のオブザーバーが1つのオブザーバーに対応し、変更の通知が送信され、終了点が変更される前に、開始点と終了点に再度サブスクライブする必要があります。
地図演算子の原理
サンプルコード
// ObseravbleCreate カスタムソースが渡される。== source
Observable.create(
// カスタムソース
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
}
})
// ObseravbleCreate.map
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
return null;
}
})
// ObservableMap.subscribe
.subscribe(
// カスタムオブザーバー
new Observer<Bitmap>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Bitmap bitmap) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
同じです。
ご覧の通り、封印と開封のプロセスが追加される以外は、サブスクリプション・プロセスと同様です。
タマネギのモデルです。
実際、Rxも装飾的なパターンを使用しています。
おそらく次のようなモデルでしょう。
そうですね......実は、Rxは標準的なU字型の構造ではなく、むしろそうなんです:
何かを注文するのと同じように ---> 小包に封をする ---> 小包を開ける





