この記事では、canalのサーバーモジュールを分析し、いつものようにいくつかの質問をしながらソースコードを見ます:
- CanalServerは何通り使えますか?
- コンソールの管理者、クライアントはCanalServerとどのようにやりとりするのですか?
- CanalServerWithNettyとCanalServerWithEmbeddedの関係はどうなっているのですか?
- Canalの特徴であるイベント消費のためのプロトコル、非同期ストリーミングapiの設計はどのように実装されていますか?
サーバーモジュール内の構造は以下の通り:
主に3つのパッケージに分かれています:
- admin
このパッケージのCanalAdminインタフェースは、canal-adminコンソールで使用するためにcanalServer上で公開されるサービスインタフェースのいくつかを定義します。
前のデプロイモジュールの解析で述べた CanalAdminController は CanalAdmin インターフェースを実装しています。 Admin パッケージは netty をサーバーとして使用し、コンソールの Admin からのリクエストを受け付け、canalServer の現在のステータスの一部を返します。
- server
- spi
canalServerの監視内容を定義 spiを通して実装され、例えば、プロジェクトのPrometheusサブモジュールは監視機能を実装し、分析されません。
CanalServerのアーキテクチャから始めます。
CanalServerは現在2つのモードをサポートしています:
- serverMode = tcpJavaのサーバー・クライアント・モデル
- serverMode = kafak またはrocketMQのServer-MQ-Clientパターン
canalServerの構造を十分に理解していただくために、canalServerのアーキテクチャ図を入念に作成しました。
アーキテクチャを図に示します:
Server モジュールでは、さまざまなモジュールの関係と機能を明確に見ることができます:
- CanalServerWithEmbeddeは、記事で説明したパーサー・シンク・ストア方式で、binlogのサブスクライブ、フィルタリング、キャッシュを担当する特定のインスタンス・タスクを維持します。
- CanalAdminWithNettyは管理者のサーバーとして、コンソールのAdminから制御操作や状態の照会操作を受け、CanalServerやインスタンスの起動・停止・現在の状態を表示します。
アーキテクチャを図に示します:
主な部分はサーバー・クライアントモードと同じですが、主な違いは以下の通りです:
- CanalServerWithNettyの代わりに、CanalMQProducerがメッセージキューにメッセージをドロップします。
- CanalClientを使う代わりに、MqClientはメッセージキューを取得し、それを消費します。
このモードはサーバー・クライアント・モードと比較します。
- メッセージキューの特性を利用したダウンストリーム・デカップリングにより、複数のクライアントのブロードキャスト消費、クラスタ消費、リピート消費などをサポートすることができます。
- システムの複雑さが増し、待ち時間が増えます。
具体的なモードの選択は、具体的な使用シナリオによって決定される必要があります。
server
adminパッケージもspiパッケージもコアロジックの一部ではないので、焦点はサーバパッケージのコードです。
サーバーパッケージは、埋め込みパッケージ、例外パッケージ、nettyパッケージ、およびいくつかのインターフェースクラスの下に分割されていることを確認してください。
最上位の設計はCanalServerインターフェイスから始まります。
CanalServerWithEmbeddedとCanalServerWithNettyの2つの実装クラスがあります。
公式文書には、両者の違いがある程度示されています。
では、公式ドキュメントにあるように、自律型開発のためにEmbeddedをどのように使うのでしょうか?
上記のServer-ClientモードやServer-MQ-Clientモードとは全く異なり、下図のようなサーバーレスアーキテクチャを採用しています。
ご覧のように、このパターンはCanal-Serverを廃止して、自分のアプリケーションに直接canalを導入し、データの取得と購読のためにCanalServerWithEmbeddedを使用します。
もちろん、この方法は開発コストがかかるので、わざわざこの方法を使うことは通常ありません。
CanalServerWithEmbeddedとCanalServerWithNettyについては、公式のドキュメントでは、あまりうまく説明されていません。
この2つの実装クラスは、公式ドキュメントで説明されている違いを超えて、非常に関連しています。
サーバー・クライアント・モードでのモジュール接続について、上に示したアーキテクチャ図を見てみましょう。
CanalServerWithNetty
以下、まずCanalServerWithNettyクラスを見てみましょう。
private コンストラクタ + static 内部クラスを使用してシングルトンパターンを実装すると、CanalServer 内に CanalServerWithNetty が 1 つだけ存在することになります。
また、CanalServerWithEmbeddedオブジェクトが、関連するリクエストを処理するために内部で保持されていることがわかります。
ソースコードは以下の通りです:
主な工程は以下の通り:
- embeddedServerの起動
- ブートストラップ・インスタンスの作成、netty関連の設定
パラメータ NioServerSocketChannelFactory は Netty の API でもあり、2つのスレッドプールパラメータを受け付けます。最初のスレッドプールは Accept スレッドプール、2番目のスレッドプールは woker スレッドプールです。番目のスレッドプールはwokerスレッドプールです。ここでnettyの知識に属し、一時的な不慣れな深く見る必要はありませんが、単にnettyは、クライアントの高度な並列リクエストに対処するためにスレッドを使用することができます。
- デコード処理、認証、netty seesionHandlerの作成など、対応するパイプラインを構築します。
パイプラインは、実際にはクライアントの要求プロセッサチェーン上のnettyは、責任モデルのフィルタチェーンでJAVA EEのプログラミングに類似することができる、前のフィルタ処理は、次のフィルタ処理に完了すると、nettyでのみ、もはやフィルタではなく、ChannelHandlerです。
- nettyを起動してポートをリッスンすると、そのポートに対するクライアントからのリクエストを受信できるようになります。
netty関連の知識については、この記事では深く触れませんが、単純に高性能なサーバーがポートリクエストをリッスンし、それに対応する処理を行うことができると理解してください。
sessionHandler の処理に重点を置いています。
前述したように、serverWithNetty の処理ロジックは embeddedServer に委譲されているので、ハンドラが embeddedServer のインスタンスを保持し、ロジック処理を行うことは、ここでは非常に論理的です。
このメソッド内のコードは非常に長く、本質的な処理はembeddedServerに委譲されているので、メインロジックだけを見てください。
ご覧のように、パケットの種類によって、最終的にはembeddedServerに処理が委ねられ、ここでは論理的な判断と分配を行っているだけです。
この時点で、CanalServerWithNettyがどのように起動されるかが理解できます。
そして、その主な位置づけは、サーバーとして機能し、クライアントからのリクエストを受け取り、メッセージ配信を行い、処理のためにCanalServerEmbeddedに委譲することです。
以下は、関連するCanalServerEmbeddedの実装です。
CanalServerEmbedded
- 非完全なシングルトンパターンは、ここではパブリックコンストラクタの使用は、ユーザーがまだ自分の新しいオブジェクトをする機会がある、アプリケーションが独立して使用する時間の開発を導入するために使用されます。
- インスタンスを保持するオブジェクト・コンテナ
- 継承されたCanalServerとCanalServiceインターフェース
CannalServerインターフェイスは、実際にはstart()とstop()メソッドであり、特別な場所はありません、主なものは、start()がMigrateMap.makeComputingMapを設定することです。
CanalServiceインターフェイスによって定義されたメソッドに焦点を当てます。
各メソッドのエントリは、クライアントの ID である clientIdentity をもたらします。
現在、canalはインスタンスサブスクリプションに1つのクライアントのみをサポートしており、clientIdはすべて1001にデッドに書き込まれます。
特に、公式wikiに記載されているcanalのコア機能 - 非同期消費ストリーミングapi設計 - を理解することができます。
主なステップ
- clientIdentityのdestinationから対応するインスタンスを検索します。
- インスタンスのmetaManagerを通して、このクライアントが現在
- 届出中の加入関係の変更
- MemoryMetaManager:軌跡情報をメモリに格納
- ZookeeperMetaManage: 軌跡情報は zk に保存されます。
- PeriodMixedMetaManager:前の2つの混合で、メモリに保持され、ビット情報が定期的にzkにリフレッシュされます。
この方法は比較的簡単なので、ソースコードは載せません。
インスタンスに対応する metaManager を見つけ、unsubscribe メソッドを呼び出してこのクライアントの登録を解除することです。
サブスクライブを解除しても、インスタンス自体はまだ実行されており、新しいクライアントがインスタンスをサブスクライブできることに注意してください。
まず、いくつかのコンセプトを説明しましょう。
この場合、MemoryClientIdentityBatch オブジェクトが内部で保持され、クライアントのインスタンス・メッセージの消費を記録します。
このメソッドに戻ると、このメソッドはクライアントからbinlogメッセージを取得するために使用され、一般的な流れは次のようになります:
- clientIdentityの宛先に基づいて対応するインスタンスを取得します。
- ストリーミングデータの最後に取得されたポジションのバッチを取得 positionRanges
- 通常、最後のbatchIdの位置から開始し、batchIdがない場合は、カーソルレコードの消費位置から開始します。カーソルが空の場合は、eventStoreの最初のメッセージからのみ開始できます。
- イベントはエントリに変換され、新しい batchId が生成されます。
eventStoreでイベントをフェッチするとき、バッチサイズとタイムアウト時間をユーザが自分で設定できることに注意してください。効率を上げるために、通常は1つのbinlogをフェッチする代わりに、一度にバッチをフェッチします。このバッチサイズはクライアントが指定します。同時に、クライアントはタイムアウト時間を指定することができ、タイムアウト時間内であれば、batchSizeのbinlogが取得された場合、即座に返されます。 タイムアウト後、batchSizeで指定された数のbinlogが取得されなかった場合も、即座に返されます。特にタイムアウトが設定されていない場合、binlogが取得されなければ、eventStoreも即座に返されます。eventStoreのロジックについては、次回このモジュールについて説明します。
このメソッドは主にクライアント側でbinlogメッセージを取得するために使用され、基本的にはgetWithoutAckと同じです。
主な違いは、クライアントがバッチを取得し、それを自動的にackすることで、相対的には間違いなく速くなりますが、信頼性の保証はありません。
しばらくはプロジェクトで使われることはなさそうなので、拡大解釈はしません。
バッチ ID を確認します。確認後、このバッチ ID 以下のすべてのメッセージが確認されます。
- metaManagerからbatchIdに対応するレコードを削除します。
- 次のフェッチがその場所から開始できるように、正常に消費されたbinlogの場所を記録します。
- アクセプトされたデータはeventStoreでクリアされます。
rollbackにはrollback allとrollback specified batchIdの2つのメソッドがありますが、ソースコードを見る限り、現在はrollback specified specified batchIdもrollback allになっています。
ロールバックの本質は、まだアクセプトされていないすべてのバッチIDがクリアされ、ストリーミングAPIが取得したがまだアクセプトされていないメッセージが再取得されるということです。
canalMQStarter
セクション1のアーキテクチャパターンで分析したように、起動時に、serverModeにtcpが選択されていればcanalServerWithNettyが起動され、serverModeにmqが選択されていればcanalMQStarterが起動されます。
主なコンポーネントは以下の通り:
- ワーカースレッドプールexecutorService、インスタンスごとにワーカースレッドを起動
- CanalServerWithEmbedded
- CanalMQProducermqメッセージの配信
このメソッドは、先ほどのcanalStarterクラス内のstart()メソッドからCanalMQStarter.start()を呼び出したものです。
具体的には3つのことを行いました:
- CanalServerWithEmbeddedのシングルトン・オブジェクトの取得
- CanalMQRunnableのインスタンスごとにワーカースレッドを起動します。
- 終了時にスレッドプールと mqProducer を閉じるために ShutdownHook を登録します。
CanalMQRunnableが何をするのか、主なところを紹介します。
労働者の内部で何が起こっているかを見る内部クラスです。
ワーカーメソッドは1つだけで、主要なロジックは非常に明確です:
- クライアントとしてのアイデンティティを確立
- もしインスタンスがなければ、スリープしてインスタンスが生成されるのを待ちます。
- MQ デスティネーション・オブジェクトを構築し、関連する mq 構成情報をロードして、mqProducer の入力として使用します。
- このサブスクリプションクライアントをembeddedCanalに登録します。
- embeddedCanalを介したデータ消費のためのget/ack/rollbackプロトコルの実行とストリーミングの開始
要約
冒頭の質問に戻りますが、その答えは本文中にあると思います。
- CanalServerは何通り使えますか?
Server-ClientモードとServer-MQ-Clientモードの両方を使用して、独立して展開することができます。
開発はインラインで展開できます。
- コンソールの管理者、クライアントはCanalServerとどのようにやりとりするのですか?
- CanalServerWithNettyとCanalServerWithEmbeddedの関係はどうなっているのですか?
- Canalの特徴であるイベント消費のためのプロトコル、非同期ストリーミングapiの設計はどのように実装されていますか?
最後はみんな見て、元は簡単じゃない、気になるところを指差して、好きなところをクリック~。
Javaナレッジグラフを構築するために知識の断片化をリフレッシュ: