コンカレンシーセーフの選択肢
この記事はHashMapの解析を理解することが前提ですので、HashMapのソースコードを見ることをお勧めします:
HashMapが同時並行的に安全でないことはすでに知られています。
HashTable を使用する方法と ConcurrentHashMap を使用する方法です。
しかし、いくらHashTableが並行性安全とはいえ、実際のアプリケーションで使われることはないでしょう。
ここでは、HashTableが使用されない理由を分析するために、最も一般的に使用されているputメソッドをパイプインで見てみましょう。
public synchronized V put(K key, V value) {
// Make sure the value is not null
if (value == null) {
throw new NullPointerException();
}
// Makes sure the key is not already in the hashtable.
Entry tab[] = table;
int hash = hash(key);
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
V old = e.value;
e.value = value;
return old;
}
}
modCount++;
if (count >= threshold) {
// Rehash the table if the threshold is exceeded
rehash();
tab = table;
hash = hash(key);
index = (hash & 0x7FFFFFFF) % tab.length;
}
// Creates the new entry.
Entry<K,V> e = tab[index];
tab[index] = new Entry<>(hash, key, value, e);
count++;
return null;
}
あなたが見ることができるように、HashTableは、putメソッドの使用では、オブジェクトに相当するロックを追加するには、このメソッドは、同時に同じ場所に並行性の安全性が原因で追加されますが、効率は非常に低い回避することができます。2つの要素が競合しないが、まだロックを取得するために待機する必要がありますが、同時にHashTableに入れ2つの要素があると想像してください。
したがって、同時実行安全性の問題に直面した場合、通常はConcurrentHashMapを使用することになります。
ConcurrentHashMap
基本構造
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
デフォルトの初期容量:16。HashMapと異なり、この値はすべてのHashEntryの長さの合計を指します。つまり、Entry オブジェクトは合計で 16 個あります。
デフォルトの負荷係数:0.75。
同時実行レベル:16。セグメント数を参照。
コンストラクタ
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
//HashMapの初期化操作と同様に、sizeは2のべき乗の最小数の現在の受信同時実行レベルよりも大きく、これを使用してSegmentオブジェクトの配列を作成する。
while (ssize < concurrencyLevel) {
//size左シフトの数 2^sshift=ssize
++sshift;
ssize <<= 1;
}
//上位のsshiftのバイナリ番号を取得するためにputメソッドで使用される。
this.segmentShift = 32 - sshift;
//添え字については、HashMapと同様である。
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//Segmentの下にあるEntry配列の長さ。
int c = initialCapacity / ssize;
//切り上げに相当する
if (c * ssize < initialCapacity)
++c;
//MIN_SEGMENT_TABLE_CAPACITY=2,ご覧のように、デフォルトのinitialCapacityとconcurrencyLevelがともに16であるにもかかわらず、Segmentの最小長は2であることが要求されている。
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
//Segmentオブジェクトの初期化は、必要なときにSegmentに直接配置することで行う。[]配列。プロトタイプに相当する。
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
//sszieに従ってSegment配列オブジェクトを作成する。
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
//s0をssの0番目の位置に置く。
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}
put
大まかな流れ。
(key,value)---->key.hashcode--->hashcode
//Segment[] 配列の添え字アドレス
int index=hashcode&segment[].length-1;
//Entry[]配列の添え字アドレス
int index1=hashcode&Entry[].length-1
//セグメント配列に入れる
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//ハッシュ値を取得する
int hash = hash(key);
//Segment[]配列の添え字の位置
int j = (hash >>> segmentShift) & segmentMask;
//セグメントを取得する[]配列のj番目の位置の値
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
//j番目の位置が空の場合、Segmentオブジェクトを生成する。
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
//31-1の最上位ビットの前にあるゼロの数。
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
Segment[]の下の添え字の位置が: & segmentMaskになっているのはなぜですか?
個人的には、ハッシュの利用率を高めるために、セグメント配列の位置でキーを決定するために使用される数ビットの後に、エントリー配列の位置でキーを決定するために使用される数ビットの最初の数ビットのキーのハッシュコードのために、ハッシュを増やすためにまだ順序であると思いますが、効果はそれほど明白ではないかもしれません。
//複数の判定を行うことで、同時実行の安全性を確保し、値を代入する際の効率を向上させる。
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE;
Segment<K,V> seg;
//同時実行安全性の問題を防ぐために、nullかどうかを再度判断する。
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//プロトタイプ・オブジェクトとそのプロパティを取得する
Segment<K,V> proto = ss[0];
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
//実際にSegmentオブジェクトを作成する
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
//ss配列のu番目の位置がヌルかどうかを判断し、ヌルならsをsegに代入して返す。
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
//エントリー配列に入れる
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//trylock()ロックを追加しようとする
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
//添え字インデックスのヘッダー・ノード
HashEntry<K,V> first = entryAt(tab, index);
//このノードはチェーンテーブルをトラバースする
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
//同じキーが見つかった場合、更新されるか?
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
//実行条件:最初が空 チェーンの最後尾にトラバースする。
else {
if (node != null)
//
node.setNext(first);
else
//オブジェクトの生成、ヘッダー挿入メソッド
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//
rehash(node);
else
//ノードをタブ配列の添え字に配置する。
setEntryAt(tab, index, node);
//操作回数
++modCount;
//チェーンテーブルの長さ
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
trylockとlockの違い
共通点:どちらもロックの獲得を目的としています。
trylockはブロックせず、ロック取得の有無に応じて対応するブール値を返します。
ロックがブロックされます。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1;
//ロック要求に失敗した
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null) {
if (node == null)
//ノードを初期化する。この場合は単なるウォームアップであり、あまり意味がない。
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//リトライ回数が最大値を超えた場合は、直接ロックを要求し、状況に応じてブロックするかロックを取得する。
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
//retries偶数になったら、この時のヘッドノードが前に入ったヘッドノードと同じかどうかを判断して、同時実行に問題があるかどうかを判断し、問題がなければ、firstの値を更新し、retriesに-1をセットして、スレッドの再判断を行う。
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
return node;
}
rehash
拡張は、セグメント配列全体の Entry[] に対してではなく、単一のセグメントオブジェクトの下の Entry[] に対して行われます。
private void rehash(HashEntry<K,V> node) {
//
HashEntry<K,V>[] oldTable = table;
//古い配列の長さ
int oldCapacity = oldTable.length;
//新しい配列の長さ
int newCapacity = oldCapacity << 1;
//新しいしきい値を取得する
threshold = (int)(newCapacity * loadFactor);
//新しい配列を生成する
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
//添え字
int sizeMask = newCapacity - 1;
//古い配列を繰り返し処理し、古い配列の要素を転送する。
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
//配列の添え字が NULL ではない
if (e != null) {
//次のノード
HashEntry<K,V> next = e.next;
//拡張後の添え字
int idx = e.hash & sizeMask;
//nextnullの場合、新しい配列の指定された位置にeを格納する。
if (next == null)
newTable[idx] = e;
//nullでない場合、lastRunを使用して最後のチェーンテーブルの先頭ノードを指定し、新しい配列のこのノードの添え字は古い配列と異なり、同時に、配列の次の各ノードの添え字は先頭ノードの添え字と同じになる。
else {
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
//ノード位置の挿入
int nodeIndex = node.hash & sizeMask;
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
get
このメソッドは比較的単純なので、ソースコードが提供され、繰り返されることはありません。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
size()
このメソッドは、まず、Segment オブジェクトをロックせずに、Segment オブジェクトの要素数を取得し、同時に、サイズを取得中に Segment に他の変更があるかどうかを判断し、変更があれば再カウントします。サイズの取得と同時に Segment を変更する他のスレッドが存在する場合は、まずロックを取得してから要素数をカウントします。
public int size() {
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
//RETRIES_BEFORE_LOCKデフォルトは2で、リトライ回数がこの値と等しい場合は、ロックを直接追加してカウントする。
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock();
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
//セグメントの修正回数を取得する
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
//つまり、そのSegmentに変更操作がない場合に行う。ループを抜ける
if (sum == last)
break;
//セグメントが最後に変更された回数を代入する
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}




