zokeeper (zk)クラスタは、クライアントのクラスタの再起動、クラスタの拡張、およびその他のシナリオなど、多くのシナリオは、同時に多数のクライアントアクセスをサポートする必要があります。 zk管理クライアント接続を選択する2つの方法があります:NIOServerCnxnFactory、NettyServerCnxnFactory。
static public ServerCnxnFactory createFactory() throws IOException {
//コンフィギュレーションを取得する
String serverCnxnFactoryName =
System.getProperty(ZOOKEEPER_SERVERXN_FACTORY);
//デフォルトのネイティブNIO
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
//指定されたクラス・ファクトリーをロードする
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.getDeclaredConstructor().newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception e) {
//ignore
}
}
一見すると少し複雑な図ですが、少し組み合わせればとてもシンプルでエレガントな図になります。リンクの管理は次のようなものです:
- ServerSocketの作成
- 新しいコネクションに耳を傾けること
- 接続からの読み取り/接続への書き込み
- コネクションの破棄
このプロセスは、シーンのゲストとしてあなたの家に多くのゲストのようなものです、まず第一に、あなたはゲストにドアを開く必要があります;ゲストは、少なくともそれを満たすためにあなたに来る;ゲストは手ぶらで来るのが恥ずかしい、あなたはゲストがその途中で何かを食べるようにすることはできません;同じについての飲食、それはゲストに送信する必要があります、あなたは戦場をクリーンアップする必要があります。
上の図は、この最初の3つのプロセスと、具体的な実装を見るためのソースコードを組み合わせたものです。
ServerSocketの作成
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
....
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
//アプリプト Threadを作成する。
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
ソケットアクセスの待ち受け
public void run() {
try {
while (!stopped && !acceptSocket.socket().isClosed()) {
try {
select();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
} finally {
...
}
}
**select()メソッドでは、キーとなるアクションはdoAccept() **メソッドのままです。
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
if (!doAccept()) {
pauseAccept(10);
}
}
}
コードを掲載する **doAccept()** メソッドは、追加の説明はまったく必要ありません。次のステップは、selectorThreadを見ることです。selectorThreadのコア処理を知るために、イメージを見てください。
SocketChannel sc = null;
try {
sc = acceptSocket.accept();
InetAddress ia = sc.socket().getInetAddress();
//接続数を制限するコードを省略する
// ポーリングはセレクタ Threadを選択する。
if (!selectorIterator.hasNext()) {
selectorIterator = selectorThreads.iterator();
}
SelectorThread selectorThread = selectorIterator.next();
//ソケットをセレクタ Threadに分配する。
if (!selectorThread.addAcceptedConnection(sc)) {
throw new IOException();
}
} catch (IOException e) {
...
}
socket読み書きイベントの処理
実際には、焦点はまた、このセレクターモジュール、つまり、データ交換部、NIOは、複数のソケットを管理するセレクターシングルスレッドを介してです。
zk は、データの読み書きをリッスンすることがパフォーマンスのボトルネックになると考えているため、SelectorThread はマルチインスタンスになっています。SelectorThread はソケットの読み書きイベントのみを担当し、特定の io 処理は IOHandle スレッドプールに引き渡されることを強調しておきます。
セレクタ構造、2つのソケットキュー、sockketキューへの新しいアクセスはacceptQueue 、処理IOキューはupdateQueueです;イベントの大きなサイクル、大きなサイクルは3つのサブサイクルを含んでいます。ソースコードと照らし合わせて見てください。
/** * 新しいソケットキュー、セレクタインスタンスのプロパティ、各セレクタは独自のプロパティを持っている。 */ private final Queue<SocketChannel> acceptedQueue; private final Queue<SelectionKey> updateQueue; //runのメソッドは while (!stopped) { try { //登録されたイベントを処理する select(); //新しい接続を処理する processAcceptedConnections(); //インタレスト・セットの更新を処理する processInterestOpsUpdateRequests(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } //いくつかのコードを省略するacceptedQueueキューのコンシューマです。行ごとにコードを見る必要はありませんが、データの流れの順序を見ることを望むかもしれません、ソケットの生産に新しいアクセスされたキューには、このキューの消費を参照してください。これは、関数名からprocessAcceptedConnections()であることを確認するのは簡単ですが、物事を行うには、ソケットのキューをプルすることです、現在のセレクタに登録されているソケットの読み取りイベントは、ソケットを分割します。
ここでは、キューのノンブロッキングメソッドである poll() を使用した詳細を示します。 後ほど、なぜここでこのようなことができるのか、全体像を見てみましょう。
private void processAcceptedConnections() {
SocketChannel accepted;
while (!stopped && (accepted = acceptedQueue.poll()) != null) {
SelectionKey key = null;
try {
//読み取りイベントを登録する
key = accepted.register(selector, SelectionKey.OP_READ);
//ソケットをロードする
NIOServerCnxnxn = createConnection(accepted, key, this);
//キーにバインドする
key.attachxn);
//接続マップに追加する
addCnxnxn);
} catch (IOException e) {
// register, createConnection
cleanupSelectionKey(key);
fastCloseSock(accepted);
}
}
}
- updateQueueプロデューサー。新しいリンクの消費とイベントを読み取るために登録されているので、それはselect()を呼び出す必要があります、それはイベント1のサイクルですselect()メソッドは、実際には、それはここでは非常に単純ですが、イベントへのリンクを読み取り、書き込みを取得することであり、その後handleIO()に引き渡されます。しかし、イベント1のループ内の次の図に注意してください、それはsocketChannel操作の生産があるように思われ、実際には、この部分はhandleIOにあり、なぜソケットの処理は再び新しいキューにスローされますか?急がないし、誰がsocketChannelキューを消費している終了していない参照してください!
try {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
ArrayList<SelectionKey> selectedList = new ArrayList<>(selected);
//why shuffle
Collections.shuffle(selectedList);
Iterator<SelectionKey> selectedKeys = selectedList.iterator();
while (!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selected.remove(key);
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
if (key.isReadable() || key.isWritable()) {
handleIO(key);
} else {
LOG.warn("Unexpected ops in select " + key.readyOps());
}
}
} catch (IOException e) {
//ignore
}
private void processInterestOpsUpdateRequests() {
SelectionKey key;
while (!stopped && (key = updateQueue.poll()) != null) {
if (!key.isValid()) {
cleanupSelectionKey(key);
}
NIOServerCnxnxn = (NIOServerCnxn) key.attachment();
if xn.isSelectable()) {
key.interestOpsxn.getInterestOps());
}
}
}
- まとめ。 今、この写真を見返してみると、かなりスッキリしたのではないでしょうか。最後に、セレクタの処理におけるいくつかの詳細について触れておきます。
- メイン・ループのブロッキングは、サブループ2や3のブロッキング・キューではなく、サブループ1のselect()メソッドに依存しています。 ブロッキング・キューを使用できるかどうか考えてみてください。
- ポーリング配信では、すぐにサブループ1を返す wakeUp() が呼び出されます。
public boolean addAcceptedConnection(SocketChannel accepted) {
if (stopped || !acceptedQueue.offer(accepted)) {
return false;
}
wakeupSelector();
return true;
}





