オブザーバーパターン
Rxデザインパターンは実際にはObserverパターンを偽装したものなので、まず標準的なObserverパターンを理解することが重要です。
これが標準的なオブザーバーパターンです。シンプルです。
Rx Hook
まずコードの一部を見てください:
 Observable.create(new ObservableOnSubscribe<String>() {
 @Override
 public void subscribe(ObservableEmitter<String> e) throws Exception {
 
 }
 })
 .map(new Function<String, String>() {
 @Override
 public String apply(String s) throws Exception {
 return null;
 }
 })
 .subscribe(new Observer<String>() {
 @Override
 public void onSubscribe(Disposable d) {
 disposable = d;
 }
 @Override
 public void onNext(String s) {
 }
 @Override
 public void onError(Throwable e) {
 }
 @Override
 public void onComplete() {
 }
 });
.createではnew ObservableCreateが返され、.mapではnew ObservableMapが返されます。必要なのは、この処理をインターセプトすることです。図のように
では、その真ん中には何があるのでしょうか?ソースコードを見てみましょう。createに移動します。
 		@CheckReturnValue
 @SchedulerSupport(SchedulerSupport.NONE)
 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
 ObjectHelper.requireNonNull(source, "source is null");
 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
 }
真ん中のエリアはonAssemblyで、これはグローバル・フックです。
 		@CheckReturnValue
 @SchedulerSupport(SchedulerSupport.NONE)
 public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
 ObjectHelper.requireNonNull(mapper, "mapper is null");
 return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
 }
onAssemblyにアクセスして中に入ってください。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
 Function<? super Observable, ? extends Observable> f = 			 onObservableAssembly;
 if (f != null) {
 return apply(f, source);
 }
 return source;
 
onObservableAssembly デフォルトはnullなので、これは何もしません。onObservableAssembly 次に、値が代入される場所を見てください。
 /**
 * Sets the specific hook function.
 * @param onObservableAssembly the hook function to set, null allowed
 */
 @SuppressWarnings("rawtypes")
 public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
 if (lockdown) {
 throw new IllegalStateException("Plugins can't be changed anymore");
 }
 RxJavaPlugins.onObservableAssembly = onObservableAssembly;
 }
コメントから明らかなように、指定されたフック関数をセットするので、この関数を呼び出して独自の関数をセットすることで、グローバルrxをインターセプトすることが可能です。
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
 @Override
 public Observable apply(Observable observable) throws Exception {
 //ここで、次のようなインターセプトができる。
 //nullを返すと、それ以降のrxの使用はすべてnullポインタになる。
 return observable;
 }
 });
 
Rxオブザーバーパターン
まず、Observable を作成し、次に Observer を作成します。
オブザーバーのソースコードを見る
public interface Observer<T> {
 /**
 * Provides the Observer with the means of cancelling (disposing) the
 * connection (channel) with the Observable in both
 * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
 * @param d the Disposable instance whose {@link Disposable#dispose()} can
 * be called anytime to cancel the connection
 * @since 2.0
 */
 void onSubscribe(@NonNull Disposable d);
 /**
 * Provides the Observer with a new item to observe.
 * <p>
 * The {@link Observable} may call this method 0 or more times.
 * <p>
 * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
 * {@link #onError}.
 *
 * @param t
 * the item emitted by the Observable
 */
 void onNext(@NonNull T t);
 /**
 * Notifies the Observer that the {@link Observable} has experienced an error condition.
 * <p>
 * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
 * {@link #onComplete}.
 *
 * @param e
 * the exception encountered by the Observable
 */
 void onError(@NonNull Throwable e);
 /**
 * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
 * <p>
 * The {@link Observable} will not call this method if it calls {@link #onError}.
 */
 void onComplete();
}
お分かりのように、これは単なるインターフェイスです。 本当の実装は実装です。
Observableのソースコードを見てください。
Observable.createのソースコードを見てみました。
 		@CheckReturnValue
 @SchedulerSupport(SchedulerSupport.NONE)
 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
 ObjectHelper.requireNonNull(source, "source is null");
 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
 }
ご覧のように、渡されたパラメータは、渡されたカスタム・ソースです。
サブスクリプション・プロセス分析
subscribe() をタップしてください。
 @SchedulerSupport(SchedulerSupport.NONE)
 @Override
 public final void subscribe(Observer<? super T> observer) {
 ObjectHelper.requireNonNull(observer, "observer is null");
 try {
 observer = RxJavaPlugins.onSubscribe(this, observer);
 ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
 subscribeActual(observer);
 } catch (NullPointerException e) { // NOPMD
 throw e;
 } catch (Throwable e) {
 Exceptions.throwIfFatal(e);
 // can't call onError because no way to know if a Disposable has been set or not
 // can't call onSubscribe because the call might have set a Subscription already
 RxJavaPlugins.onError(e);
 NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
 npe.initCause(e);
 throw npe;
 }
 }
 subscribeActual(observer);ObservableCreate. subscribe().subscribe()は実際には、実行が完了するとすぐにcreateはObservableCreateオブジェクトを返します。 従って、subscribeActualの実装はObservableCreateの中になければなりません。
 @Override
 protected void subscribeActual(Observer<? super T> observer) {
 CreateEmitter<T> parent = new CreateEmitter<T>(observer);
 observer.onSubscribe(parent);
 try {
 source.subscribe(parent);
 } catch (Throwable ex) {
 Exceptions.throwIfFatal(ex);
 parent.onError(ex);
 }
 }
新しいエミッタが受信オブザーバをラップし、カスタムソースがsubscribe()を呼び出すことで、両端が接続されていることがわかります。
全体がコの字型の構造
Rx Hook
通常のオブザーバパターン:1人のオブザーバが複数のオブザーバに対応 オブザーバが変更の通知を送信 すべてのオブザーバが変更
Rxのオブザーバーパターン:複数のオブザーバーが1つのオブザーバーに対応し、変更の通知が送信され、終了点が変更される前に、開始点と終了点に再度サブスクライブする必要があります。
地図演算子の原理
サンプルコード
 			// ObseravbleCreate カスタムソースが渡される。== source
 Observable.create(
 // カスタムソース
 new ObservableOnSubscribe<String>() {
 @Override
 public void subscribe(ObservableEmitter<String> e) throws Exception {
 }
 })
 // ObseravbleCreate.map
 .map(new Function<String, Bitmap>() {
 @Override
 public Bitmap apply(String s) throws Exception {
 return null;
 }
 })
 // ObservableMap.subscribe
 .subscribe(
 // カスタムオブザーバー
 new Observer<Bitmap>() {
 @Override
 public void onSubscribe(Disposable d) {
 }
 @Override
 public void onNext(Bitmap bitmap) {
 }
 @Override
 public void onError(Throwable e) {
 }
 @Override
 public void onComplete() {
 }
 });
同じです。
ご覧の通り、封印と開封のプロセスが追加される以外は、サブスクリプション・プロセスと同様です。
タマネギのモデルです。
実際、Rxも装飾的なパターンを使用しています。
おそらく次のようなモデルでしょう。
そうですね......実は、Rxは標準的なU字型の構造ではなく、むしろそうなんです:
何かを注文するのと同じように ---> 小包に封をする ---> 小包を開ける





