まえがき
ここ数日、Javaの並行処理関連の知識を見ていて、新しい知識をたくさん学びました。兄1つずつ、より多くのブログの速度がとても速く見て、あなたは豪雨の川のように氾濫の知識を考えることができます。今週は、いくつかの並行処理の知識を学ぶために、FTPサーバを構築し、私はそれらの知識を学ぶために、今週は少しダーリンを知らない?
ConcurrentHashMapは、スレッドセーフを実現する方法です。
前節でハッシュテーブルはスレッドセーフな構造体であることを説明しましたが、操作のたびに同期ブロックを使用するため、あまり効率的ではありません。スレッドの同期ブロックはロックを取得すると他のスレッドをブロックするので、スレッドをブロックするとオペレーティング・システムがユーザー状態からブロッキング状態になり、パフォーマンスの消費が増えます。
そのため、並行性の高いシナリオでは、ConcurrentHashMapの使用が最も適切です。では、なぜConcurrentHashMap構造を使用したいのか、それは結局のところ、スレッドセーフと並行性を実現する方法なのです。
ConcurrentHashMapの基本的な実装は何ですか?
JDK 1.7のConcurrentHashMapではどのようなテクニックが使われていますか?
1.7 関数の構成要素と変数など
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
static final int DEFAULT_INITIAL_CAPACITY = 16;//デフォルトの容量、セグメント数
static final float DEFAULT_LOAD_FACTOR = 0.75f;//負荷係数
static final int DEFAULT_CONCURRENCY_LEVEL = 16;//
static final int MAXIMUM_CAPACITY = 1 << 30;//最大容量、または30ビットを左にシフトする
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;//最小のセグメントの容量は2である
final Segment<K,V>[] segments;//重要なコンポーネント
}
セグメントロック技術は、ConcurrentHashMapの内部を複数のセグメント(Segment)に分割するために使用されます。
Segment
static final class Segment<K,V> extends ReentrantLock implements Serializable {
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
transient volatile HashEntry<K,V>[] table;//HashMapエントリーに似ている
transient int modCount;//修正
transient int threshold;//
final float loadFactor;//負荷係数
}
HashEntry<K,V>[] table
セグメント内部は、配列に似たエントリのHashMapのJDK1.7バージョンとチェーンテーブルの形式です。しかし、ここでは、追加のvolatileキーワード修飾子があるので、結局それを行うことの利点は何ですか?
volatile
volatileキーワードの裏表を理解するには、Javaのメモリ・モデルやCPUのキャッシュ・モデルなどの知識が必要です。そこで、並行プログラミングの3つの重要な特徴である、原子性、順序性、可視性から始めましょう。
- 原子性
原子性とは、1つまたは複数の操作において、すべての操作が同時に成功するか、または同時に失敗することを意味します。これは前回の記事でも取り上げましたが、例えばタオバオで歯ブラシを購入する場合、私が20を使い、タオバオが20を受け取るか、私が送金できず、タオバオが受け取らないか。淘宝で歯ブラシを買えば、淘宝は20を受け取ります。淘宝で歯ブラシを買って、淘宝が20を受け取るか、淘宝が20を受け取らないか。 - 順序性
順序性とは、コードの実行プロセスの順序のことで、コンパイル時のJavaの最適化により、JVMが命令を並べ替える可能性があるため、コードの実行順序が順次実行されない場合があります。 - 可視性
可視性とは、あるスレッドが変数に変更を加えた際に、別のスレッドがその変更の最新値を即座に確認できることを意味します。これはMySQLのread-uncommittedトランザクション分離レベルに似ており、ダーティなデータの読み取りを防ぎます。
では、JVMはどのようにして「可視性」を確保するのでしょうか?
- volatileキーワードを使用すると、共有リソースに対する読み取り操作はメインメモリで直接実行され、共有データに対する書き込み操作は、もちろん最初に作業メモリで変更されますが、変更後はメインメモリにアクティブにフラッシュされます。
- synchronizedキーワードでは、synchronizedは1つのスレッドだけがロックを取得し、同時にsynchronizedメソッドを実行することを保証し、ロックが解放されたときに変数への変更がメインメモリにアクティブにフラッシュされることも保証します。
- これは、JUCが提供する明示的ロックLockによっても保証され、そのロックメソッドは、1つのスレッドだけがロックを取得し、同時に同期メソッドを実行することを保証し、ロックが解放されたときに変更された変数がメインメモリにフラッシュされることを保証します。
シナリオは、読み取りスレッド自体がInit_valueの値を持っている場合は、値が出力に変更されず、プリントアウト上で変更された場合、現在の値と比較する必要があるたびに、あるとします。変更スレッドは、現在の値が変更されるたびに、変更は、それが交互に出力される場合でも、読み取りスレッドの出力を待機するために使用される、10ミリ秒スリープ状態になります。
どうなったと思いますか?
その結果、変更スレッドは出力を続け、読み取りスレッドはコンソールに何も出力しません。これは、共有データがCPUのキャッシュ、つまり読み取りスレッドのローカルメモリにキャッシュされているためです。変更するスレッドがデータを変更しても、読み込みスレッドはそれをローカルにフェッチし、ローカルメモリの値は変更されないので、出力されません。次にvolatileキーワードを使用すると、読み取りスレッドにメモリからローカルスレッドメモリへのリフレッシュを強制できます。
揮発性キーワードのセマンティクス:
- 順序性の保証
- 原子性は保証されません
- 原子性は保証されません
ここで HashEntry に戻ると、volatile キーワードは、同時実行環境における HashEntry の順序と可視性を確保するために使用されます。
ReentrantLock の使用と理解
java.util.concurrent
ソースコードからセグメント継承ReentrantLockを見ることができる、ReentrantLockは非常に重要な知識ポイントは、我々はすべてのスレッドの安全性を確保するために、相互排除同期を使用することができます知っている、同期相互排除の最も基本的な手段は、同期を達成するためにReentrantLockの下に同期とパッケージです。基本的な使用では、ReentrantLockとsynchronizedは非常に似ていますが、JDK1.5でReentrantLock関数は、広い範囲の使用でJDK1.8でより豊富です。
JDK1.8のConcurrentHashMapではどのようなテクニックが使われていますか?
悲観的なロック戦略
ミューテックス同期の主な問題は、スレッドのブロッキングとウェイクアップによるパフォーマンスの問題です。相互排他的同期は、その扱い方において悲観的な並行性戦略であり、常に正しい同期対策を行わなければ問題が発生するという前提に立ち、共有データに対する競合の有無に関わらずロックが必要です。
楽観的ロック戦略
競合検出に基づく楽観的な同時実行戦略は、最初に操作を実行し、共有リソースを競合している他のスレッドがなければ、操作は成功します。共有データに対する競合があり、競合が存在する場合は、他のノーゴー措置が取られます。
put
public V put(K key, V value) {//セグメントを見つける
Segment<K,V> s;
if (value == null)
throw new NullPointerException();//NULLの場合は例外!
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);//次に、HashEntryから挿入する。
}
/**
**segment配列のput操作、putのHashMapに似ている。
**/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);//スピンロック戦略
V oldValue;//古い値を保存する
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;//セグメントを計算する[i]特定の場所にあるテーブル、これらはハッシュマップの操作に似ている
HashEntry<K,V> first = entryAt(tab, index);//テーブルの先頭ノードを見つける
for (HashEntry<K,V> e = first;;) {//連鎖したテーブルを繰り返し処理する
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {//同じ値を持つノードの存在
oldValue = e.value;//直接オーバーライドする
if (!onlyIfAbsent) {//状態は存在する
e.value = value;
++modCount;//修正数プラス1
}
break;
}
e = e.next;//次のノードに置き換える
}
else {
if (node != null)//値の存在を証明する
node.setNext(first);//
else
node = new HashEntry<K,V>(hash, key, value, first);//空のノードが直接コピーされる
int c = count + 1;//
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
スピンロック
/**
**HashEntryノードをスキャンしてロックを取得する
**/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);//挿入値現在のチェーンテーブルの先頭ノードを取得する
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;//新しいノード
int retries = -1; // ロック取得の試行回数、最初は-1回である。
while (!tryLock()) {//ロックを取得しない場合
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {//最初のケースに進む
if (e == null) {//ヘッド・ノードが空なので、新しいノードを作成する
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);//今後、少しでも作業を少なくするために、直接
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {//2つ目のケースでは、スピンカウントが超過し、ブロックされる。
lock();//ロックを取得できない場合は、ブロックして待つ
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
//現在のロックが取得されていないため、他のスレッドによって先頭ノードが変更される可能性があり、先頭ノードかどうかを判断する必要がある。もしそうでなければ、判定をやり直す。
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
tryLock() メソッドはロックの取得を試み、取得できなければ false を返し、取得できれば true を返します。
Lock() メソッドは、ロックを取得できなければブロックします。
フローチャート
拡張操作
/**
**拡張操作は、直接ハッシュを使用して、リハッシュする必要はない
**/
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;//古いハッシュ・テーブル
int oldCapacity = oldTable.length;//ハッシュ・テーブルの長さは
int newCapacity = oldCapacity << 1;//倍の容量
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];//拡張ハッシュ・テーブル
int sizeMask = newCapacity - 1;//配列の添え字を計算する
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;//毎回同じインデックスを持つノードの最後の文字列を記録し、値を再割り当てする
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {//ノードを反復処理し、同じ最後のインデックス値を持つノードの文字列を記録する
int k = last.hash & sizeMask;
if (k != lastIdx) {//最後のノードのインデックスが配列のインデックスと同じ値ではない場合
lastIdx = k;//最後のノードのインデックスを更新する
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {//同じインデックス値を持つ最後の文字列を、新しい配列の添え字に移動し直す!
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // 追加されたノードのGetindex
node.setNext(newTable[nodeIndex]);//
newTable[nodeIndex] = node;
table = newTable;
}
get
get はもっと単純で、ハッシュを通してキーで特定のセグメントを探し、ハッシュを通して特定の要素を探します。
複数のスレッドが HashEntry を変更しても、volatile 修飾子を持つ HashEntry は毎回最新の値を取得します。
結論
なぜスレッドセーフなのかというと、バージョン1.7では各メソッドがロックされるため、スピンロックを使用することでスレッドがブロッキングすることがなくなり、hashtableよりもパフォーマンスが向上するからです。また、各HashEntry[i]はvolatileで変更され、スレッド操作の可視性を確保できます。つまり、他のスレッドが値を変更しても、毎回ダーティリードされることはなく、強制的にローカルメモリにリフレッシュされます。
では、なぜ高い同時実行性?
それは、ロックのための単一のセグメント[i]であるため、セグメントが16ある場合、同時に変更する16のスレッドがあってもスレッドセーフであることを意味します。Hashtableロックとは対照的に、Hashtableオブジェクト全体をロックするには、複数のスレッドアクセスをブロックする必要があります。
JDK1.8ConcurrentHashMap
バージョン1.8では、1.7の分割ロック戦略をキャンセルし、セキュリティのためにCAS + synchronizedを使用しています。これもHashMapのバージョン1.8と同様で、チェーンテーブルのノード数が8より大きい場合に変換される赤黒木を導入しています。
ソースコードの構築
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private static final int MAXIMUM_CAPACITY = 1 << 30;//最大容量
private static final int DEFAULT_CAPACITY = 16;//デフォルトの容量
private static final float LOAD_FACTOR = 0.75f;//負荷係数
static final int TREEIFY_THRESHOLD = 8;//链表节点数大于8 转红黑树
static final int UNTREEIFY_THRESHOLD = 6;//連鎖テーブルに6未満の赤黒ツリーノードを削除する
transient volatile Node<K,V>[] table;//ノードバケット、揮発性修飾子
private transient volatile Node<K,V>[] nextTable;//次の表
}
のputメソッドは
final V putVal(K key, V value, boolean onlyIfAbsent) {//putでputValを呼び出す。
if (key == null || value == null) throw new NullPointerException();//挿入された値はNULLであってはならない
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {//テーブル・ノードを繰り返し処理する
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();//初期化テーブル
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//テーブルを取得する[i]クラスのヘッド・ノードは空である
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))//CAS操作、ノードへの書き込み
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)//別のスレッドが拡張しているとき
tab = helpTransfer(tab, f);//現在のスレッドは容量を拡大し、効率を高めるのに役立つ
else {//テーブル・ノードに入り、現在のポイントが連鎖したテーブル・ノードか赤黒いツリー・ノードかを判断する
V oldVal = null;
synchronized (f) {//テーブル・ヘッド・ノードのロック・リソースを取得する
if (tabAt(tab, i) == f) {//変更されているかどうかを再判定し、変更されていれば上位レベルに戻って再度取得する
if (fh >= 0) {//連鎖ノード
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {//ノードの連鎖表を繰り返し処理する
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {//同じ直接オーバーライド
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {//尾插法 插入节点
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//红黑树节点
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)//連鎖するテーブルのノードが8より大きい
treeifyBin(tab, i);//赤黒ツリーを回す
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//ノードの数を数える
return null;
}
TreeBinは、赤黒木が自己バランスされている場合でも、ロックリソースを解放しないことを保証します。HashMapの赤黒木構造を使用する場合は、ノードの挿入は、ヘッドノードの交換、およびヘッドノードがロックを取得するように自己バランスされるため、ヘッドノードの交換は、この時点で他のスレッドは、例外の挿入の結果、ノードのロックを取得する場合。
パットプロセス:
- 1.まず、キーと値が空かどうかを判断し、空の場合は例外をスローします。
- 2.ハッシュ値を計算し、テーブル構造をトラバースし、次の3つのケースに
- テーブルを初期化する必要があるかどうか、空の場合は初期化して2に戻ります。
- table[i]のheadノードを取得、空ならCASで挿入、2へ。
- 現在拡張中のスレッドがあるかどうかを確認し、あればhelpTransferを拡張して2へ進みます。
- synchronizedはロック・リソースを取得し、それがリンクリスト・ノードなのか赤黒木ノードなのかを判断して
- 空文字列を返します。
- 空に戻る
比較と置換
compareAndSwapLong()
楽観的なロック戦略は、CAS命令は、それが実施されていることを確認する必要があり、楽観的なロックを実装する方法です、CAS操作は、sun.misc.UnsafeクラスcompareAndSwapInt()と他のいくつかのメソッドパッケージによって提供される軽量ロックの一種である、多くのツールクラスのJ.U.Cは、CASの実装に基づいているような。CompareAndSetとgetAndIncrement()など、安全でないクラスのCAS操作を使用します。
CAS操作のプロセスは、データを書き戻すための準備で、ロックすることなく、読み取りデータのスレッドですが、元の値を比較すると、他のスレッドが書き戻されて変更されていない場合は、変更されている場合は、読み取り処理を再実行します。
get
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//hashバケツが存在し、現在のバケツは空ではない
if ((eh = e.hash) == h) {//赤黒木であれば、赤黒木に従って値を取得する
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)//連鎖表をたどって
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
最後に2つの質問を投げかけます:
- 1.8のConcurrentHashMapがLastrunを使わない理由
- あなたは安全でないクラスに属していますか?Javaプログラムで最も素晴らしいクラスと、それが最も素晴らしい理由は何ですか?