前書き
JDKはHashMapの構造を使用して同時実行のためのHashTableとConcurrentHashMap 2つのクラスを提供します。HashTableは、すべてのキー操作に加えて、同期ロックの同時実行の問題を解決するために、この練習は、粒度のロックを使用して大きすぎるですが、操作は基本的に同時実行のセキュリティを確保するために直列化された同時実行は、パフォーマンスは当然非常に悪いです。だから、JDKはバージョン1.5でConcurrentHashMapは、HashMapを使用して高同時実行シナリオの問題を解決するために提供されます。
JDKの1.7から1.8では多くのことが導入され、HashMapは多くの変更を受け、ConcurrentHashMapも大きな変更を受けましたが、この記事では2つのバージョンを分析し比較しました。
準備
Unsafe
ConcurrentHashMapは内部でUnsafeを多用しているので、まずはUnsafeについて学びましょう。
Unsafeはsun.miscパッケージの下にあるクラスで、主にシステムメモリリソースへの直接アクセスやメモリリソースの自律管理など、低レベルで安全でない操作を行うためのメソッドを提供しています。Unsafeクラスの使い方を誤ると、プログラムエラーの発生確率が高くなり、安全なJava言語が安全でなくなる可能性がありますので、Unsafeの使用には注意が必要です。
javaのUnsafeが提供する主な機能は以下の通りです:
通常はUnsafe内のものを使用せず、パッケージ化されたツールのJDKを使用するのが普通ですが、Unsafeが提供するメソッドを使用する場合。しかし、一点に注意してください:
Unsafe クラスを使って Unsafe インスタンスを取得することはできません。なぜなら
private static final sun.misc.Unsafe UNSAFE;
UNSAFE = sun.misc.Unsafe.getUnsafe();
メソッドgetUnsafe()で確認できます:
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (var0.getClassLoader() != null) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}
var0は実際にこのgetUnsafeを呼び出すクラスです。
ですから、BootStrapによってロードされたクラスだけがUnsafeを直接使うことができ、他のクラスは直接使うことができません:
- Unsafeオブジェクトはリフレクションで取得できます。
- Xbootclasspath/aを使用して、Unsafe関連メソッドを呼び出すクラスAのjarパッケージのパスを、デフォルトのブートストラップ・パスに追加します。
アンセーフの様々な機能については、ここではインターネット上に多くの情報があります。
HashMap
ConcurrentHashMapの内部は細かい部分が多く、対応するHashMapのバージョンも同じですが、ここでは詳細は省略します。
JDK1.7
構造
構造を見てみましょう。
クラス構成
内部構造
格納構造は、配列+連鎖リストを使った1.7のHashMapと同じです;
同時実行制御では、セグメント化されたロック構造を使用します。HashTableと比較して、ConcurrentHashMapは並行性制御のためのロックの粒度が細かい。
ConcurrentHashMapはHashEntryと呼ばれるkvのストレージユニットは、Segmentは倉庫の部屋に相当し、HashEntryは部屋の商品です。同じ部屋にアクセスする複数のスレッドが唯一のキューにすることができますが、全体のConcurrentHashMapは、チームのSegment.lengthをランク付けすることができます。
ソースコード
重要な構造体クラス
Segment
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
transient int threshold;
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
//メソッドは省略される
final V put(K key, int hash, V value, boolean onlyIfAbsent){};
private void rehash(HashEntry<K,V> node) {};
private void scanAndLock(Object key, int hash) {};
final V remove(Object key, int hash, Object value) {};
final boolean replace(K key, int hash, V oldValue, V newValue) {};
final V replace(K key, int hash, V value) {};
final void clear() {};
}
Segment は ReentrantLock を継承しているため、Segment もリエントラントロックです。各 Segment にはロックが付属しています。コードを見てわかるように、Segment は単なるロックではなく、その内部は小さな HashMap になっています。
HashEntry
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
//....
}
HashEntryは、キーと値のペアを実際にカプセル化するクラスです。その構造は単純なリンクリスト構造です。
コンストラクタ
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0; //ビットシフト数、ssize 2の累乗も計算する。
int ssize = 1;
//ssizeセグメント配列のサイズであり、ここで入力されるconcurrencyLevelは2のべき乗に変更される(putも2のべき乗になるため)。&(どのセグメントでカウントするか)
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift; //これはputで、キーのハッシュコードを何ビット右にずらして、どのセグメントにあるかを計算するために使う。
this.segmentMask = ssize - 1;//segment
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;//セグメントあたりのエントリ数
//除算を防ぐため、合計がinitialCapacityより小さくなることがある。=1 16*1<17
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY; //エントリーを格納する各セグメントには最小値2がある。*2,また、2の倍数でなければならない
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// セグメントの配列と、s0 内にエントリ配列を持つ単一セグメントを作成する。
// -- .7 遅延ロードはしない
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);//segment内部的には小さなハッシュマップに等しい。
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
コンストラクタの3つのパラメータ
int initialCapacity; //エントリー数を設定、デフォルトは16。
float loadFactor; //負荷率、デフォルト0.75f
int concurrencyLevel; //segmentセル数、デフォルト16
上記の2つのパラメータと同じHashMapは、concurrencyLevelは、マップ内のSegmentの数の上記の構造、つまり、どのように多くのSegmentは、スレッドの同時アクセスをサポートすることができます。
実行フロー
- パラメータの較正と再調整は以下のように行いました。
- concurrencyLevel 2の累乗に調整
- 各セグメントに割り当てる HashEntry 配列の長さを計算します(各セグメント内の HashEntry の長さは 2 未満にはできません)。
- まず、initialCapacity / ssize の式で初期長を計算します。
- 長さが2以下の場合は2倍。
- 最初にセグメントを作成し、内部の HashEntry 配列を初期化します。
- Segment配列を再度作成し、作成したSegmentをSegment[0]に入れます。
構築概要
- 各セグメントに含まれるHashEntryの長さは、2より小さくすることはできず、2の倍数です。
- 1.7のConcurrentHashMapは遅延ロードではなく、まずSegment配列とHashEntry配列をSegment配列の最初の位置で初期化します。
ここでは、ConcurrentHashMapの結果は、単位としてのセグメントの理解ではなく、まだ単位としてエントリとしてHashMapに準拠し、セグメントは単なるエントリ上の変更です。
put
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
//セグメント長と比較する必要があるため、ハッシュ値を右にシフトして上位ビットを保持する。&ハッシュコードの予約ビット数とセグメント長のビット数が同じになるように計算する。
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
//segment空なら
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
実行フロー
- パラメータのチェックサムは、キーも値も NULL にはできません。
- キーのハッシュ値を計算し、そのキーがどのセグメントに格納されているかを判断します。
- Segmentに入れる必要があるSegmentの場所が初期化されているかどうかを判断します。初期化されていない場合は、secureSegment?
- Segmentのputを呼び出して要素を投入。
セグメントの保管場所はどのように決めればよいですか?
主な理解はコードにあります:
(hash >>> segmentShift) & segmentMask;
segmentShift と segmentMask は、構築時に計算されます:
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
- ssizeSegment配列の長さ。
- sshift は、Segmentの長さに対応する2の累乗の数です。例えば、ssizeが8の場合、sshiftは3です。
segmentMaskマスクはよくわかりますが、HashMapはマスクと操作で位置を決定します。
32- sshiftはHashMapのビットごとの干渉に相当し、ハッシュ値の上位ビットをマスクや演算で使用できるように保持します。
intは32バイトなので、sshiftを引くことは、実際にはsshiftの上位ビット数を保持することと等価であり、ssizeのバイナリビットでマスク&演算を行うことで、0~ssizeに位置することを保証することができます。
ensureSegment
上記のput関数は、どのSegmentにk/vを格納するかを決定しますが、最初のSegmentの最初の位置のみを構築します。
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype segment0のオブジェクトを再利用して新しいセグメントを作成する。
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
//スピン、演算中に他のスレッドによる修正がないか常に判断する
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
//cas
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
実行フロー
- どちらも Unsafe メソッドを使用して、プットが必要な場所のセグメントがヌルかどうかを判断します。
- NULL の場合、Segment[0] の位置にあるプロパティを再利用して Segment オブジェクトを構築します。
上記の手順は、基本的に実メモリ操作を含むクリティカルに行くことはありません安全でない呼び出しは、時間の実行では、他のスレッドの操作の影響がないことを確認することです。
Segmentのput関数は
最後に、Segment put メソッドを呼び出して、要素を HashEntry 配列に入れます。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//ロックの取得を試みるが、初期化時にnode returnがnullになることがある。,
//nullでないのは、scanAndLockForPutがロック取得時にウォームアップを助けたことを意味する。
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// Segment内部HashEntry配列
HashEntry<K,V>[] tab = table;
// ハッシュ値を格納する必要があるHashEntry配列の場所を探す
int index = (tab.length - 1) & hash;
// デポジットが必要な配列の位置にあるZIPの先頭ノードを取得する。
HashEntry<K,V> first = entryAt(tab, index);
// 連鎖表を反復処理する
for (HashEntry<K,V> e = first;;) {
// 現在位置に値がある場合
if (e != null) {
K k;
// 値があり、キーとハッシュが等しい。古い値を直接置き換える
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
// 値があってもキーが等しくなければ、リストを下にたどる。
e = e.next;
}
else { //現在位置がNULLの場合は、連鎖表が空か、最後まで走査されたことを意味する。
if (node != null)
// ロック取得時に現在のノードがすでに初期化されている場合は、元の先頭ノードの次を指して直接先頭挿入を行う。
node.setNext(first);
else
// nodeがまだnullの場合は、ここでHashEntryオブジェクトの生成とヘッダーの挿入を行う。
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// ConcurrentHashMapはSegmentの容量のみ拡張する。
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//ヘッドノードに設定
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// ロック解除を忘れずに
unlock();
}
return oldValue;
}
実行フロー
- ロック取得
- 連鎖したテーブルのスキャン、ヘッダー挿入を使用した要素の挿入、キーが存在する場合の要素の更新
- 容量拡張が必要かどうかの判断
rehash
これは1.7 HashMapと同じで、ダブル展開、チェーンテーブルのポジションの再配置など、説明することはあまりありません。
scanAndLockForPut
この機能の主な目的は、ロックの獲得を試み続け、多くのウォーミングアップを行うことです。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//連結されたテーブルの先頭を取得する
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
//ループでロック取得を試みる。
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
// ここで、0と判定するのは、主にノードの作成が1回で済むようにするためである。
if (retries < 0) {
if (e == null) {
// 繰り返し計算して、リンクされたテーブルにキーがないことがわかったので、バックアップのために思い切って初期化する。
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
// 後者はもうここでは実行しない。
retries = 0;
}
// 同じキーのノードがあれば、それは上書きしたいということなので、新しいノードは作らない。
else if (key.equals(e.key))
retries = 0;
else
//ノードを下にトラバースし続ける
e = e.next;
}
//MAX_SCAN_RETRIESシングルコアは1、マルチコアは64。,
//試行回数がMAXより大きい場合_SCAN_RETRIES,その後、lock()メソッドを呼び出してブロックし、現在のスレッドが同期キューに入るようにする。
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
// 連結されたテーブルが変更された場合、再トラバースする必要がある。
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
// nodeを返す(nullでもよい)。
return node;
}
どのようなウォーミングアップを行いましたか?
- ロックの取得を常に試みている間、現在のリンクされたテーブルがスキャンされ、そのキーが含まれているかどうかが判断されます。
- HashEntryが含まれていない場合は、まずここでHashEntryを初期化して返します。そうでない場合は null を返します。
get
get関数はもう少し単純です:
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
//セグメントの位置を特定する
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// HashEntry配列の位置を定義し、その位置のリンクリストをトラバースしてキーが利用可能かどうかを判断する
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
実行フロー
- セグメント位置の取得
- HashEntryの位置取得
- チェーン・テーブルを繰り返し、要素があるかどうかを調べます。
remove
ロジックも明確でシンプルで、コードを直接見れば一目瞭然です。
public V remove(Object key) {
int hash = hash(key);
//セグメントを探す
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}
final V remove(Object key, int hash, Object value) {
//Segmentを入力し、ロックを先に取得する
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
//HashEntryを探す
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
//削除するノードを見つけ、削除する。
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
size
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // sizeサイズがオーバーフローしているかどうか
long sum; // 現在のmodCountカウント
long last = 0L; // modCount数量
int retries = -1; // first iteration isn't retry
try {
for (;;) {
//RETRIES_BEFORE_LOCK=2
//最初にロックしない方式
if (retries++ == RETRIES_BEFORE_LOCK) {
// 2回計算し、modCountの数値が2つとも同じでない場合は、修正があったことを意味し、すべてのSegmentをロックしてカウントする。
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
//ロックなしで全セグメントのカウントとmodCountを計算する。
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
// 2回の比較が同じであれば、その間に変更がないことを意味し、サイズは正確である。
if (sum == last)
break;
last = sum;
}
} finally {
// ロックされている場合は、最後にロックを解除することを忘れずに
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
JDK1.8
JDK1.8クラスは非常に複雑で、めまいを見るために内部クラスの数が多い。
構造
内部構造
基本的な構造とHashMapは同じで、配列+リンクリスト/赤黒木を使ってデータを格納します。ただ、セグメント化されたロックは使わず、同時実行の安全性を確保するためにCASとsynchronizedを使用します。
ソースコード
内部クラス
重要な構造体クラス
Node
最も基本的なクラスのConcurrentHashMapは、最終的なパッケージは、オブジェクトのキーと値を堆積。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
これは連鎖したテーブル構造であることがわかりますが、ConcurrentHashMapのこのハッシュにはもっと重要なことがあります:
//hash値が-1の場合、そのノードは順方向である。
static final int MOVED = -1; // hash for forwarding nodes
//hash値が-2なら木のルート。
static final int TREEBIN = -2; // hash for roots of trees
//hash値-3は予約ノードであることを意味し、computeIfAbsentなどに使用される。
static final int RESERVED = -3; // hash for transient reservations
TreeNode
赤黒木のノード、チェーン・リストがツリーに変換されるときにノード・ノードがツリー・ノードに変換されます。
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
}
TreeBin
ConcurrentHashMap内部ハッシュテーブルは、直接TreeNodeノードが格納されていませんが、TreeBinのストレージは、赤黒いツリーを維持するためにTreeBinを介して
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
}
ForwardingNode
拡張時間で使用されるクラスの拡張は、拡張時間では、ノードの操作は、ノードの状態になり、今回はプット操作があり、ノードは拡張を支援するために最初に行くことに遭遇します。
ForwardingNode オブジェクトには 2 つのソースがあります:
- 元の位置はnullですが、この時点でコピー操作は現在の位置の後ろに回り、元のバケツのこの位置をForwardingNodeオブジェクトとして設定します;
- 2.元の位置はヌルではありませんが、位置はすでに操作されています。
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
重要なプロパティ
table
ハッシュテーブル
transient volatile Node<K,V>[] table;
nextTable
拡張する場合は新しいハッシュテーブルを、そうでない場合はNULLを返します。
private transient volatile Node<K,V>[] nextTable;
baseCount
基本的なカウンタで、主にマルチスレッドの同時更新競争がないときに使用されます。 ConcurrentHashMapは、ブロックされたときに再びカウンタを同時更新したくないので、カウントメソッドはbaseCount + counterCellsを使用します。
private transient volatile long baseCount;
counterCells
この配列は配列で、長さは2のべき乗です。
private transient volatile CounterCell[] counterCells;
sizeCtl
テーブル展開の初期化などを制御するコア属性の1つ。後で、各状態とその役割について詳しく紹介します。
private transient volatile int sizeCtl;
コンストラクタ
HashMapとConcurrentHashMapの両方で1.8は、遅延ローディングを使用している、つまり、コンストラクタの呼び出しで、内部テーブルを初期化しませんが、最初のプットでメモリのために適用されます。
// デフォルトのパラメータで構築
// 初期容量、負荷率(0.75) concurrencyLevel (16)
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//initialCapacityまた、concurrencyLevelを使用することで、最大の使用率を取ることができる。
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size); //テーブルが2のべき乗であることを確認する
this.sizeCtl = cap;
}
ここでは2つのコンストラクタだけが選ばれていますが、他のコンストラクタもこれと似たようなもので、コンストラクタの主な役割は値sizeCtlを計算することであることがわかります。
sizeCtl
sizeCtl は、テーブルの初期化と拡張にいくつかの方法で使用される非常に重要なプロパティです:
- 0は、初期容量が指定されていないことを示します。
- テーブルが初期化中であることを示す -1
- -Nは、拡張操作を実行しているスレッドがN-1個あることを示します。
- その他の場合
- テーブルが初期化されていない場合、初期化されるテーブルのサイズを示します。
- テーブルの初期化が完了した場合、拡張のしきい値を示し、実際の容量 >= sizeCtl の場合、容量を拡張します。
注意点
- loadFactor この値を指定すると、構築時にのみ使用され、それ以降の操作では使用されません。
(1.0 + (long)initialCapacity / loadFactor);計算式は、まず計算し、この値の2のべき乗に最も近い値を取って初期容量として使用します。この理由は、負荷率は1回限りであり、作者はユーザーが指定した初期容量が、ユーザーが見積もったストレージの容量だと考えているからです。そこで、この値に基づいて、膨張を避けようとするため、逆に、負荷率でこのサイズが確保されるようなサイズを求め、要素のinitialCapacityを格納して、膨張させないようにします。- ここでconcurrencyLevelとjdk1.7の意味は同じではありませんが、彼は同時更新に関与するスレッドの推定数を述べました。実際には、および同時実行制御は、唯一のコンストラクタでそれを使用し、initialCapacityの比較は、初期容量が同時実行の場合の推定数よりも少ない場合、初期容量として同時実行の数を行うにはあまりありません。
put
put は単に putVal メソッドの呼び出しです。
public V put(K key, V value) {
return putVal(key, value, false);
}
主に見るべきはputValです。
final V putVal(K key, V value, boolean onlyIfAbsent) {
// keybaseCounterも値もnullにはできない
if (key == null || value == null) throw new NullPointerException();
// ビット干渉を利用してハッシュ値を計算する
int hash = spread(key.hashCode());
//テーブルのセル数を数えるのに使う[index]((ビンとも呼べる)既に要素がいくつあるか、ツリーへの展開や転送を制御するために使用。
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) //テーブルがNULLの場合は初期化する。
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //テーブルのどこにキーが格納されているかを探し、その場所に値があるかどうかを判断する。
// 値がなければCASがそのキーのNodeを挿入して終了。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//fこれはテーブル[index]配列の先頭ノード、ここでMOVE状態かどうかを検出する、MOVEは配列展開のデータコピー段階であることを示す
else if ((fh = f.hash) == MOVED)
//現在のスレッドも、展開の複製に参加するのを手伝いに行く
tab = helpTransfer(tab, f);
else {
/*
上記の除外表は空である,table[index]空の場合、配列はケースを拡張している、これらのケースは同時実行を制御するために同期なしで処理される。
ただし、その位置に値があれば、put操作は同期的に行う必要がある。
*/
V oldVal = null;
// *core:
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh .hash
if (fh >= 0) { //fh>=0 連鎖表であることを説明する
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { //連鎖表を反復処理する
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { //キーが既に存在する場合は置き換える
oldVal = e.val;
if (!onlyIfAbsent) ////putIfAbsentの場合、このキーがvalueに設定されていないときのみ値を設定する。
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //端のノードまでトラバースし、そのノードを末尾に追加する。
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//tableその位置はすでに赤黒木に変換されている。
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//木を歩くロジックを入れる
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//処理後、ツリー化条件を満たすかどうか確認する
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) //8以上の場合はツリー化する。
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//
addCount(1L, binCount);
return null;
}
実行フロー
- テーブル内のキーの位置を確認し、ハッシュテーブルがNULLかどうか、また、その位置にデータがあるかどうかを判断します。
- 上記の条件がいずれも真でない場合、同期ブロックに入り、ツリーまたはリンクリストをトラバースして、キーを挿入または更新します。
- 次に、現在のビン位置のノード数を赤黒木に変換する必要があるかどうかを判断します。
テーブル初期化
1.8の内部ハッシュテーブルは、上記のコードのinitTableメソッドを呼び出すことで、put時にのみ初期化されます。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { //tableまだ初期化されていない。
if ((sc = sizeCtl) < 0) //sizeCtl0より小さい場合は、他のスレッドがテーブルの初期化または拡張を行っていることを意味する。
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //テーブルを初期化する準備を始める。まずCASを使ってsizeCtrlを-1にする。
try {
if ((tab = table) == null || tab.length == 0) {
//サイズを指定した場合は、指定したサイズのNode配列を作成し、指定しない場合はデフォルトのサイズのNode配列を作成する。
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc; //初期化後、sizeCtlの長さは、配列の長さの3/4n - (n>>> 2)
}
break;
}
}
return tab;
}
コード内部では、テーブルの初期化とは別に、現在の状態を識別するためにsizeCtrlの値が常に調整されていることがわかります。現在のスレッドがすでに初期化しているスレッドを見つけると、CPUをアイドル状態にします。最後に、初期化が完了すると、sizeCtrlの値はテーブル配列のサイズの0.75倍に設定されます。
マルチスレッド展開
ConcurrentHashMapが同時並行で要素を追加していく際に、マップが膨張していくのが分かると、他のスレッドもその膨張を助けてスピードアップしていく、これが1.8で追加されたマルチスレッド展開です。この部分は、上記で呼び出されたhelpTransferメソッドで、主に拡張時にテーブル表内のノードをnextTableに転送します。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// テーブルが空ではない&& ノードタイプはForwardingNodeの転送タイプを作る。&& ノードのnextTableが空でない。
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// nextTabが同時に変更されず、かつtabが同時に変更されず、かつsizeCtlが同時に変更された場合。<0((まだ拡大中であることを示す)
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 展開終了の条件
// sizeCtlの符号なし右シフト16がrsと等しくない場合。
// または sizeCtl== rs + 1 ((拡張が終わり、拡張のためのスレッドがなくなる)(デフォルトの最初のスレッドセット sc==rs 16ビット左シフトする。+ 2,最初のスレッドが展開し終わったら、scを1つ引く。この時点で、scはrsと等しい。+ 1)
// または sizeCtl== rs + 65535 ((ヘルパースレッドの最大数(65535)に達した場合)
// または、添え字を調整する
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 移行を手伝いに来たスレッドごとにsizeCtlする。+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
//
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
カウント更新
このメソッドのメイン関数であるaddCountを呼び出す必要があります:
- これは、baseCount の値を x だけ増加させる CAS です。
- 更新が失敗した場合、または counterCells が NULL の場合は、fullAddCount メソッドを呼び出して更新をループします。
- チェック値に基づいて、現在のビンバケットを拡張する必要があるかどうかを判断します。
詳細な分析は以下の通り:
private final void addCount(long x /*加算する数*/, int check /*ビン下のノード数*/) {
CounterCell[] as; long b, s;
// カウントボックスcounterCellが空でない場合|| CASbaseCountの値の更新に失敗した。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// counterCellsが空なら、まだ同時処理が発生していないことを意味する。
// || counterCells配列の残りの1つの位置が空であれば、baseCounter+CellCounterを使ってカウントする。
// || このセルの変数を変更しないと同時並行が発生する
// fullAddCountを呼び出すだけで、CounterCellsに数字が加算される。
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//fullAddCountメソッドの機能はセルの値を更新することである。要素数が正しく更新されるようにする。
fullAddCount(x, uncontended);
return;
}
// チェックが1より小さければ終了する。
if (check <= 1)
return;
s = sumCount();
}
//チェック値が0以上の場合、拡大操作が必要かどうかチェックする必要がある
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//既に他のスレッドが展開操作を行っているかどうか。
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//現在のスレッドが唯一、もしくは最初に展開を開始する。=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
の最後にこのプロセスをここに置く、このプロセスでは、いくつかのより重要なポイントがこのプロセスに関与していない、後で別々に分析されます。
拡張演算
putのaddCountの最後のステップでは、展開操作を行う必要があるかどうかをチェックしに行きます。その呼び出しがtransferメソッドです。この拡張には2つのシナリオがあります:
- sizeCtl<0、つまり、拡張している別のスレッドがあり、この現在のスレッドは拡張を支援するタスクに関与する必要があります;
- このスレッドは唯一の拡張スレッドです。
拡張コードは、内部のConcurrentHashMapの最も複雑な部分と見なされる必要があります、拡張は主に2つのステップでは、まず、すべての2倍の配列の拡張の作成後、新しいテーブルに古いテーブルのデータを移動します。
移籍の方法は複雑なので、ここでは一般的な手順をまとめます:
- 現在のマシンのCPUコア数とハッシュテーブルの長さは、各スレッドが処理を助けるバケットの数を取得し、各スレッドが処理できる数を決定するために使用されます。
- 展開されたデータが初期化されていないことが判明した場合は、次のように初期化します。
- ループに入ると、マルチスレッド展開のロジックはほとんどここにあります。
- すなわち、ハッシュテーブルの範囲を処理するためにスレッドを割り当てます。
- また、バケットが他のスレッドによって処理された場合など、様々な状況に対処するための判断ロジックもたくさんあります。
- チェーンテーブルの拡張ノードの転送
- 拡張を加速するために、チェーンテーブルノードが2つの部分に分割され、チェーンテーブルの連続同じノードの端は、直接上にドラッグすることができますし、ドラッグするだけでポイントにすべての方法を頭から開始し、チェーンテーブルを横断し、頭が2つのチェーンテーブルの方法に挿入されます。
- その後、新しい位置に新しいテーブルに2つのリンクされたテーブルを格納します。
- ツリー拡張ノード転送
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// ここでのポイントは、1つのスレッドで処理できるテーブル・バケットの最小数は16以下にはできないということである。
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE/*=16*/)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// これが最初の拡張の場合、拡張された配列を初期化する必要がある。
if (nextTab == null) { // initiating
try {
// 2回展開する。
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 最後のバケットを指すので、後ろから前へのトラバーサルを容易にする
transferIndex = n;
}
int nextn = nextTab.length;
//マイグレーションが完了したバケットをマークするForwardingNodeを定義する。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; //次のバケツを探すために前進を続けるかどうか
boolean finishing = false; //展開が終了するタイミングを制御し、終了したものがあるかどうかを配列を再スキャンして確認する。
// このforループは、ストライド長タスクに使われる。
// iバックはストライドの最大の添え字に、バウンドはストライドの最小の添え字に割り当てられる。
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 現在のスレッドが逆進できる場合は、主にiの減少を制御する。また、各スレッドはここで、処理に必要なバケツの間隔を取得する。
// 主に現在のスレッドがバケット間隔を処理するために必要な計算を行うために、テーブルの最後の位置から開始する
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
//現在のスレッドで全タスク完了
int sc;
//拡張が終わったらフォローアップを行い、nextTableにnullをセットして拡張が終わったことを示し、テーブルを新しい配列にポイントし、sizeCtlを拡張のしきい値にセットする。
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//スレッドは展開が終わるたびにsizeCtlの値を1回更新し、マイナス1演算を行う。
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
//末尾の識別子を変更するのに加えて、iをセットする必要がある。= n; そのため、移行に成功していないバケットを見逃さないように、配列を再度チェックすることができる。
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
// 現在のスロットにNodeがない場合、CASはそのスロットが処理済みであることを示すためにfwdを挿入する。
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// すでに他のスレッドで処理されているスレッドに遭遇したら、直接スキップする。
advance = true; // already processed
else {
// ノードの移動操作
synchronized (f) {
if (tabAt(tab, i) == f) {
//ln 新しい配列の元の位置にあるノードのリスト
//hn 新しい配列の元の位置+nのノードのリストは
Node<K,V> ln, hn;
// 連結されたテーブルを移行する
if (fh >= 0) {
// ここでしばらく迷ったのだが、元のテーブルの位置をカウントしていない(fhのはず)。&(n-1))
// ここで、配列の長さを&,配列の長さが2のべき乗であるため、そのビットのハッシュ値が0か1かが判断の基準となる。
// 連続する同一チェーンの最初のノードのハッシュを保持する。&n
int runBit = fh & n;
// 最後に連続した同一チェーンの最初のノードを保持する。
Node<K,V> lastRun = f;
/* リンクされた表全体の最初の走査
* runBitとlastRunが何をするのかを説明している興味深いものがある:
* ここでは、連鎖した表を2種類に分ける:1つは移動が必要なもの、もう1つは元の位置に残るもの。
* このループは、チェーン・テーブルの最後の位置と、その位置以降のすべてのノード、および位置ノードのハッシュを求めるものである。& n
* */
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
// 異なるたびにリセットする。
runBit = b;
lastRun = p;
}
}
// runBitが0の場合、lnの最初のノードをlastRunとする。
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 0でない場合は+n位置、hnを先に入れる
else {
hn = lastRun;
ln = null;
}
// 連鎖したテーブルを2回目、lastRunの位置までだけトラバースする。
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
// ヘッダー挿入方式を採用しているため、このチェーン・テーブルをトラバースする際には、ヘッダー挿入された要素が逆になる。
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//ln新しい配列の元の位置を設定する
setTabAt(nextTab, i, ln);
//hn新しい配列をセットアップする+n位置
setTabAt(nextTab, i + n, hn);
//その位置で元の表をfwdにする。
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
// 赤黒木を移行する。
// ........
}
}
}
}
}
}
size
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
remove
取り外しは比較的簡単で、プロセスを簡単に説明しています:
- テーブルが初期化されていない場合や、キーでテーブルをハッシュ検索できない場合は null を返します。
- ロケーションバケットノードタイプがForwardingNodeノードの場合、helpTransferを実行して拡張を支援します。
- 問題がなければ、バケツをロックし、ノードを削除します。
- 最後に、CAS の baseCount 値を更新するために addCount メソッドが呼び出されます。
get
とてもシンプルなコードです。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// キーの配列位置を決める
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// その位置にあるすべてのノードを反復処理する
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
//見つからなければnullを返す
return null;
}
まとめ
.8
構造
どちらも要素操作にUnsafeメソッドを多用しています。同時実行制御
- なぜSegment+ReentrantLockではなくCAS+Synchronizedを使うのですか?
- メモリリソースの消費を削減:テーブルが非常に大きい場合、ReentrantLockオブジェクトが多くなります。
- Synchronizedの最適化により、パフォーマンスは悪くありません。
- ロックの粒度をより細かく制御可能
初期状態:
- 1.7: Segmentとハッシュテーブルの初期化があります。
- 1.8: 遅延ロードが使用され、ハッシュテーブルは最初のPUT時のみ初期化されます。
収納構造:
- 1.7: 配列 + 連鎖テーブル
- 1.8: 配列 + 連鎖テーブル, 赤黒ツリー
size
- : baseCounterを使用。+CellCounterでカウント。
- 1.8: baseCounter + CellCounter でカウント。
tech.meituan.com/2019/02/14/...





