レスポンシブ・プログラミングの最大の問題 - 悪いデバッグ
従来のコードを分析する場合、ブレークポイントにヒットすると、コールスタックを視覚的に見ることができ、誰がこのコードを呼び出したのか、パラメータにどのような変更が加えられたのか、などを把握することができます。しかし、レスポンシブ・プログラミングでは、これは問題です。次の例を見てください。
public class FluxUtil1 {
public static Flux<Integer> test(Flux<Integer> integerFlux) {
return FluxUtil2.test2(integerFlux.map(Object::toString));
}
}
public class FluxUtil2 {
public static Flux<Integer> test2(Flux<String> stringFlux) {
return stringFlux.map(Integer::new);
}
}
public class FluxTest {
public static void main(String[] args) {
Flux<Integer> integerFlux = Flux.fromIterable(List.of(1, 2, 3));
FluxUtil1.test(integerFlux.log()).subscribe(integer -> {
System.out.println(integer);
});
}
}
System.out.println(integer)サブスクリプション消費にサブスクライブするためのデバッグでは、通常、このことにサブスクライブするためにどのような処理が行われたかを知りたいと思うでしょう。
FluxUtil1だと全然わからない、FluxUtil2がこのFluxを処理しました。シンプルなコードでいい、複雑なデバッグは単に人を殺すだけ。公式もそれを意識して、操作時にスタックキャッシュをキャプチャする仕組みを提供しています。
これらのメカニズムがどのように使用されるかについては、まずここで説明し、実装の原則については後で分析します。
グローバルオペレータスタックを開いてトレース
reactor.trace.operatorStacktrace環-Dreactor.trace.operatorStacktrace=true境変数をtrueに設定、つまりスタートアップ・パラメーターに追加すると、グローバル・オペレーター・スタック・トレースが開始されます。
これはコードによって動的にオン・オフすることもできます:
//
Hooks.onOperatorDebug();
//
Hooks.resetOnOperatorDebug();
このトレースをオンにすると、追加のOperatorごとに、追加のFluxOnAssemblyが存在し、このFluxOnAssemblyを通して、内部にスタック情報が存在します。それを取得するには?Scannable.from(フラックス).parents().collect(Collectors.toList())FluxOnAssemblyを含む内部のすべてのレイヤーのFluxを取得するために使用することができ、FluxOnAssemblyにはスタック情報が含まれています。
System.out.println(integer)Scannable.from(FluxUtil1.test(integerFlux.log())).parents().collect(Collectors.toList())ここでは、それを見ることができます:
ご覧のように、各マップ操作がコードのどの行で発生したかを正確に確認することができます。
IDEAのProバージョンを使用している場合、設定することもできます:
そうすれば、Debugを中断して正確なスタックを見ることができます:
これはReactorDebugAgentを追加することで実現されます。
依存関係を追加します:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-tools</artifactId>
<version> </version>
</dependency>
その後、この2つのコードで
//
ReactorDebugAgent.init();
//例えば、最初はロードされなかったが、後で動的にロードされたクラスなど、有効化されなかったクラスがある場合、有効化を再処理するためにこれを呼び出すことができる。
ReactorDebugAgent.processExistingClasses();
このようにして、オンラインアプリケーションを動的に変更し、デバッグモードを有効にすることができます。例えば、Arthasツールalibaba.github.io/arthas/ognl)でoggnlを介して静的メソッドを呼び出すことができます。
IDEAのProバージョンを使用している場合、設定することもできます:
そうすれば、Debugを中断して正確なスタックを見ることができます:
レスポンシブ・プログラミング - フローの理解
java.util.concurrent.Flow Flowは、Java 9で導入されたレスポンシブ・プログラミングの抽象化であり、対応するクラスは次のとおりです。これらのインターフェースは、Publisher、Subscriber、Subscriptionです。
//ラベルはFunctionalInterfaceである。
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
Subscribeメソッドは、サブスクライバを登録します。登録に成功すると、Subscriber の onSubscribe メソッドが呼び出され、Subscription が渡されます。このサブスクリプションの内部で、requestはPublisherに何個のアイテムを送信するように要求するために使用され、cancelはPublisherにアイテムを送信しないように指示するために使用されます。Publisherがアイテムを生成し、Subscriptionリクエストの数を超えないたびに、onNextメソッドが呼び出されてアイテムが送信され、例外が発生するとonErrorが呼び出されます。Publisherは、新しいアイテムや例外が発生しないと判断すると、onCompleteを呼び出してSubscriberに消費が完了したことを伝えます。これが基本的なプロセスです。
Project Reactor - FluxFlowのインターフェースの実装方法
Fluxは同じタイプのデータのストリームで、例えば0~n個のオブジェクトを含んで放出します:
Flux<String> just = Flux.just("1", "2", "3");
これは、以下の3つの文字列を含むFluxストリームを生成します。
次に、Flowで述べたプロセスに従って、単純なサブスクライブから始めます。
Flux.just("test1", "test2", "test3")
//詳細なフローログを印刷する
.log()
//サブスクリプションの消費
.subscribe(System.out::println);
コードを実行すると、ログ出力が表示されます:
.816 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
.822 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
.823 [main] INFO reactor.Flux.Array.1 - | onNext(test1)
test1
.823 [main] INFO reactor.Flux.Array.1 - | onNext(test2)
test2
.823 [main] INFO reactor.Flux.Array.1 - | onNext(test3)
test3
.824 [main] INFO reactor.Flux.Array.1 - | onComplete()
これらのログを見れば、サブスクライブが実際にどのように機能しているかがよくわかります:
まず、subscribe 中に onSubscribe が最初に呼ばれます。
request(unbounded)を呼び出すと、ここでrequestはいくつのデータを要求するかを意味し、unboundedは無制限に要求することを意味します。
各データ・オブジェクトについて、onNextメソッドを呼び出します: onNext(test1), onNext(test2), onNext(test3)
最終的な完了時に、onComplete が呼び出されます。例外が発生した場合は、onError が呼び出され、onComplete は呼び出されません。これらのメソッドは、実際には、Flux のサブスクライバであるサブスクライバのメソッドであり、サブスクライバがどのように消費されるか、および消費の具体的な操作を設定します。
//操作の各要素について @Override public void onNext(String o) { System.out.println(o); } //エラーが発生した場合 @Override public void onError(Throwable throwable) { log.error("error: {}", throwable.getMessage(), throwable); } //完了時、エラーは完了としてカウントされない @Override public void onComplete() { log.info("complete"); }};
Flux.just("test1", "test2", "test3") //print detailed stream log .log() //subscribe to consumption .subscribe(subscriber);
実行後のログは
.227 [main] INFO reactor.Flux.Array.2 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
.227 [main] INFO reactor.Flux.Array.2 - | request(unbounded)
.228 [main] INFO reactor.Flux.Array.2 - | onNext(test1)
test1
.228 [main] INFO reactor.Flux.Array.2 - | onNext(test2)
test2
.228 [main] INFO reactor.Flux.Array.2 - | onNext(test3)
test3
.228 [main] INFO reactor.Flux.Array.2 - | onComplete()
.235 [main] INFO com.test.TestMonoFlux - complete
subscribeには次のようなapiもあります:
//コンシュームする必要がなく、Fluxの中間処理を開始するだけでよい場合は、次のようにする。
subscribe();
//同等である:
new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
//要素の最大数を取る
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
};
//コンシューマーを指定する
subscribe(Consumer<? super T> consumer);
//同等である:
new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
//要素の最大数を取る
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
consumer.accept(o);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
};
//コンシューマーと例外ハンドラーを指定する
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
//同等である:
new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
//要素の最大数を取る
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
consumer.accept(o);
}
@Override
public void onError(Throwable throwable) {
errorConsumer.accept(throwable);
}
@Override
public void onComplete() {
}
};
//コンシューマー、例外ハンドラー、完了時に実行するアクションを指定する。
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer);
//同等である:
new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
//要素の最大数を取る
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
consumer.accept(o);
}
@Override
public void onError(Throwable throwable) {
errorConsumer.accept(throwable);
}
@Override
public void onComplete() {
completeConsumer.run();
}
};
//サブスクライバーの必要な要素をすべて指定する
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
//同等である:
new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
subscriptionConsumer.accept(subscription);
}
@Override
public void onNext(Object o) {
consumer.accept(o);
}
@Override
public void onError(Throwable throwable) {
errorConsumer.accept(throwable);
}
@Override
public void onComplete() {
completeConsumer.run();
}
};
このように、同フローのデザインに対応しています。




