blog

zookeeper NIOServerCnxnFactory をマスターするための図

zookeeper クラスタは、多数のクライアントアクセスをサポートする必要があり、多くのシナリオは、クライアントクラスタの再起動、クラスタの拡張、その他のシナリオなど、多数のクライアントが同時にアク...

Jun 23, 2020 · 6 min. read
シェア

zokeeper (zk)クラスタは、クライアントのクラスタの再起動、クラスタの拡張、およびその他のシナリオなど、多くのシナリオは、同時に多数のクライアントアクセスをサポートする必要があります。 zk管理クライアント接続を選択する2つの方法があります:NIOServerCnxnFactory、NettyServerCnxnFactory

 static public ServerCnxnFactory createFactory() throws IOException {
 //コンフィギュレーションを取得する
 String serverCnxnFactoryName =
 System.getProperty(ZOOKEEPER_SERVERXN_FACTORY);
 //デフォルトのネイティブNIO
 if (serverCnxnFactoryName == null) {
 serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
 }
 try {
 //指定されたクラス・ファクトリーをロードする
 ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
 .getDeclaredConstructor().newInstance();
 LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
 return serverCnxnFactory;
 } catch (Exception e) {
 //ignore
 }
 }

一見すると少し複雑な図ですが、少し組み合わせればとてもシンプルでエレガントな図になります。リンクの管理は次のようなものです:

  • ServerSocketの作成
  • 新しいコネクションに耳を傾けること
  • 接続からの読み取り/接続への書き込み
  • コネクションの破棄

このプロセスは、シーンのゲストとしてあなたの家に多くのゲストのようなものです、まず第一に、あなたはゲストにドアを開く必要があります;ゲストは、少なくともそれを満たすためにあなたに来る;ゲストは手ぶらで来るのが恥ずかしい、あなたはゲストがその途中で何かを食べるようにすることはできません;同じについての飲食、それはゲストに送信する必要があります、あなたは戦場をクリーンアップする必要があります。

上の図は、この最初の3つのプロセスと、具体的な実装を見るためのソースコードを組み合わせたものです。

ServerSocketの作成

public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
 	....
 this.ss = ServerSocketChannel.open();
 ss.socket().setReuseAddress(true);
 LOG.info("binding to port " + addr);
 ss.socket().bind(addr);
 ss.configureBlocking(false);
 //アプリプト Threadを作成する。
 acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

ソケットアクセスの待ち受け

public void run() {
 try {
 while (!stopped && !acceptSocket.socket().isClosed()) {
 try {
 select();
 } catch (RuntimeException e) {
 LOG.warn("Ignoring unexpected runtime exception", e);
 } catch (Exception e) {
 LOG.warn("Ignoring unexpected exception", e);
 }
 }
 } finally {
 ...
 }
 }

**select()メソッドでは、キーとなるアクションはdoAccept() **メソッドのままです

selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped && selectedKeys.hasNext()) {
 SelectionKey key = selectedKeys.next();
 selectedKeys.remove();
 if (!key.isValid()) {
 continue;
 }
 if (key.isAcceptable()) {
 if (!doAccept()) {
 
 pauseAccept(10);
 }
 }
}

コードを掲載する **doAccept()** メソッドは、追加の説明はまったく必要ありません。次のステップは、selectorThreadを見ることです。selectorThreadのコア処理を知るために、イメージを見てください。

SocketChannel sc = null;
try {
 sc = acceptSocket.accept();
 InetAddress ia = sc.socket().getInetAddress();
 	//接続数を制限するコードを省略する
 // ポーリングはセレクタ Threadを選択する。
 if (!selectorIterator.hasNext()) {
 selectorIterator = selectorThreads.iterator();
 }
 SelectorThread selectorThread = selectorIterator.next();
 //ソケットをセレクタ Threadに分配する。
 if (!selectorThread.addAcceptedConnection(sc)) {
 throw new IOException();
 }
 
} catch (IOException e) {
 ...
}

socket読み書きイベントの処理

実際には、焦点はまた、このセレクターモジュール、つまり、データ交換部、NIOは、複数のソケットを管理するセレクターシングルスレッドを介してです。

  1. zk は、データの読み書きをリッスンすることがパフォーマンスのボトルネックになると考えているため、SelectorThread はマルチインスタンスになっています。SelectorThread はソケットの読み書きイベントのみを担当し、特定の io 処理は IOHandle スレッドプールに引き渡されることを強調しておきます。

  2. セレクタ構造、2つのソケットキュー、sockketキューへの新しいアクセスはacceptQueue 、処理IOキューはupdateQueueです;イベントの大きなサイクル、大きなサイクルは3つのサブサイクルを含んでいます。ソースコードと照らし合わせて見てください。

     /**
     * 新しいソケットキュー、セレクタインスタンスのプロパティ、各セレクタは独自のプロパティを持っている。
     */
     private final Queue<SocketChannel> acceptedQueue;
     private final Queue<SelectionKey> updateQueue;
     
     
     //runのメソッドは
     while (!stopped) {
     try {
     //登録されたイベントを処理する
     select();
     //新しい接続を処理する
     processAcceptedConnections();
     //インタレスト・セットの更新を処理する
     processInterestOpsUpdateRequests();
     } catch (RuntimeException e) {
     LOG.warn("Ignoring unexpected runtime exception", e);
     } catch (Exception e) {
     LOG.warn("Ignoring unexpected exception", e);
     }
     }
     //いくつかのコードを省略する
    
  3. acceptedQueueキューのコンシューマです。行ごとにコードを見る必要はありませんが、データの流れの順序を見ることを望むかもしれません、ソケットの生産に新しいアクセスされたキューには、このキューの消費を参照してください。これは、関数名からprocessAcceptedConnections()であることを確認するのは簡単ですが物事を行うには、ソケットのキューをプルすることです、現在のセレクタに登録されているソケットの読み取りイベントは、ソケットを分割します。

ここでは、キューのノンブロッキングメソッドである poll() を使用した詳細を示します。 後ほど、なぜここでこのようなことができるのか、全体像を見てみましょう。

 private void processAcceptedConnections() {
 SocketChannel accepted;
 while (!stopped && (accepted = acceptedQueue.poll()) != null) {
 SelectionKey key = null;
 try {
 //読み取りイベントを登録する
 key = accepted.register(selector, SelectionKey.OP_READ);
 //ソケットをロードする
 NIOServerCnxnxn = createConnection(accepted, key, this);
 //キーにバインドする
 key.attachxn);
 //接続マップに追加する
 addCnxnxn);
 } catch (IOException e) {
 // register, createConnection
 cleanupSelectionKey(key);
 fastCloseSock(accepted);
 }
 }
 }
  1. updateQueueプロデューサー。新しいリンクの消費とイベントを読み取るために登録されているので、それはselect()を呼び出す必要があります、それはイベント1のサイクルですselect()メソッドは、実際には、それはここでは非常に単純ですが、イベントへのリンクを読み取り、書き込みを取得することであり、その後handleIO()に引き渡されます。しかし、イベント1のループ内の次の図に注意してください、それはsocketChannel操作の生産があるように思われ、実際には、この部分はhandleIOにあり、なぜソケットの処理は再び新しいキューにスローされますか?急がないし、誰がsocketChannelキューを消費している終了していない参照してください!
 try {
 selector.select();
 Set<SelectionKey> selected = selector.selectedKeys();
 ArrayList<SelectionKey> selectedList = new ArrayList<>(selected);
 //why shuffle 
 Collections.shuffle(selectedList);
 Iterator<SelectionKey> selectedKeys = selectedList.iterator();
 while (!stopped && selectedKeys.hasNext()) {
 SelectionKey key = selectedKeys.next();
 selected.remove(key);
 if (!key.isValid()) {
 cleanupSelectionKey(key);
 continue;
 }
 if (key.isReadable() || key.isWritable()) {
 handleIO(key);
 } else {
 LOG.warn("Unexpected ops in select " + key.readyOps());
 }
 }
 } catch (IOException e) {
 //ignore
 }
 private void processInterestOpsUpdateRequests() {
 SelectionKey key;
 while (!stopped && (key = updateQueue.poll()) != null) {
 if (!key.isValid()) {
 cleanupSelectionKey(key);
 }
 NIOServerCnxnxn = (NIOServerCnxn) key.attachment();
 if xn.isSelectable()) {
 key.interestOpsxn.getInterestOps());
 }
 }
 }
  1. まとめ。 今、この写真を見返してみると、かなりスッキリしたのではないでしょうか。最後に、セレクタの処理におけるいくつかの詳細について触れておきます。
  • メイン・ループのブロッキングは、サブループ2や3のブロッキング・キューではなく、サブループ1のselect()メソッドに依存しています。 ブロッキング・キューを使用できるかどうか考えてみてください。
  • ポーリング配信では、すぐにサブループ1を返す wakeUp() が呼び出されます。
public boolean addAcceptedConnection(SocketChannel accepted) { if (stopped || !acceptedQueue.offer(accepted)) { return false; } wakeupSelector(); return true; }
Read next

Jenkins分散環境のセットアップとビルドタスクをハンズオンで教える

分散配置ノードの追加\nシステム管理/ノード管理の新規ノードをクリック\n\nノード名を入力し、固定ノードをクリックします。\n\nノードの設定\n\n保存をクリックしてもノードが接続されません\n\nlaunchをクリックしてダウンロードします。\n\nノードの実行可能ディレクトリにファイルをコピーし、ダブルクリックしてインストールします。

Jun 23, 2020 · 3 min read