blog

jdk7-ConcurrenHashMapソースコードの解釈

しかし、HashTableがコンカレンシーセーフであっても、実際のアプリケーションで使われることはないでしょう。 ここでは、最もよく使われる put メソッドから HashTable が使われない理由...

Mar 29, 2020 · 9 min. read
シェア

コンカレンシーセーフの選択肢

この記事はHashMapの解析を理解することが前提ですので、HashMapのソースコードを見ることをお勧めします:

HashMapが同時並行的に安全でないことはすでに知られています。

HashTable を使用する方法と ConcurrentHashMap を使用する方法です。

しかし、いくらHashTableが並行性安全とはいえ、実際のアプリケーションで使われることはないでしょう。

ここでは、HashTableが使用されない理由を分析するために、最も一般的に使用されているputメソッドをパイプインで見てみましょう。

public synchronized V put(K key, V value) {
 // Make sure the value is not null
 if (value == null) {
 throw new NullPointerException();
 }
 // Makes sure the key is not already in the hashtable.
 Entry tab[] = table;
 int hash = hash(key);
 int index = (hash & 0x7FFFFFFF) % tab.length;
 for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
 if ((e.hash == hash) && e.key.equals(key)) {
 V old = e.value;
 e.value = value;
 return old;
 }
 }
 modCount++;
 if (count >= threshold) {
 // Rehash the table if the threshold is exceeded
 rehash();
 tab = table;
 hash = hash(key);
 index = (hash & 0x7FFFFFFF) % tab.length;
 }
 // Creates the new entry.
 Entry<K,V> e = tab[index];
 tab[index] = new Entry<>(hash, key, value, e);
 count++;
 return null;
}

あなたが見ることができるように、HashTableは、putメソッドの使用では、オブジェクトに相当するロックを追加するには、このメソッドは、同時に同じ場所に並行性の安全性が原因で追加されますが、効率は非常に低い回避することができます。2つの要素が競合しないが、まだロックを取得するために待機する必要がありますが、同時にHashTableに入れ2つの要素があると想像してください。

したがって、同時実行安全性の問題に直面した場合、通常はConcurrentHashMapを使用することになります。

ConcurrentHashMap

基本構造

public ConcurrentHashMap() {
 this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

デフォルトの初期容量:16。HashMapと異なり、この値はすべてのHashEntryの長さの合計を指します。つまり、Entry オブジェクトは合計で 16 個あります。

デフォルトの負荷係数:0.75。

同時実行レベル:16。セグメント数を参照。

コンストラクタ

public ConcurrentHashMap(int initialCapacity,
 float loadFactor, int concurrencyLevel) {
 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
 throw new IllegalArgumentException();
 if (concurrencyLevel > MAX_SEGMENTS)
 concurrencyLevel = MAX_SEGMENTS;
 int sshift = 0;
 int ssize = 1;
 //HashMapの初期化操作と同様に、sizeは2のべき乗の最小数の現在の受信同時実行レベルよりも大きく、これを使用してSegmentオブジェクトの配列を作成する。
 while (ssize < concurrencyLevel) {
 //size左シフトの数 2^sshift=ssize
 ++sshift;
 ssize <<= 1;
 }
 //上位のsshiftのバイナリ番号を取得するためにputメソッドで使用される。
 this.segmentShift = 32 - sshift;
 //添え字については、HashMapと同様である。
 this.segmentMask = ssize - 1;
 if (initialCapacity > MAXIMUM_CAPACITY)
 initialCapacity = MAXIMUM_CAPACITY;
 //Segmentの下にあるEntry配列の長さ。
 int c = initialCapacity / ssize;
 //切り上げに相当する
 if (c * ssize < initialCapacity)
 ++c;
 	//MIN_SEGMENT_TABLE_CAPACITY=2,ご覧のように、デフォルトのinitialCapacityとconcurrencyLevelがともに16であるにもかかわらず、Segmentの最小長は2であることが要求されている。
 int cap = MIN_SEGMENT_TABLE_CAPACITY;
 while (cap < c)
 cap <<= 1;
 //Segmentオブジェクトの初期化は、必要なときにSegmentに直接配置することで行う。[]配列。プロトタイプに相当する。
 Segment<K,V> s0 =
 new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
 (HashEntry<K,V>[])new HashEntry[cap]);
 //sszieに従ってSegment配列オブジェクトを作成する。
 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
 //s0をssの0番目の位置に置く。
 UNSAFE.putOrderedObject(ss, SBASE, s0); 
 this.segments = ss;
}

put

大まかな流れ。

(key,value)---->key.hashcode--->hashcode
 //Segment[] 配列の添え字アドレス
int index=hashcode&segment[].length-1;
//Entry[]配列の添え字アドレス
int index1=hashcode&Entry[].length-1
//セグメント配列に入れる
public V put(K key, V value) {
 Segment<K,V> s;
 if (value == null)
 throw new NullPointerException();
 //ハッシュ値を取得する
 int hash = hash(key);
 //Segment[]配列の添え字の位置
 int j = (hash >>> segmentShift) & segmentMask;
 //セグメントを取得する[]配列のj番目の位置の値
 if ((s = (Segment<K,V>)UNSAFE.getObject 
 (segments, (j << SSHIFT) + SBASE)) == null) 
 //j番目の位置が空の場合、Segmentオブジェクトを生成する。
 s = ensureSegment(j);
 return s.put(key, hash, value, false);
}
//31-1の最上位ビットの前にあるゼロの数。
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);

Segment[]の下の添え字の位置が: & segmentMaskになっているのはなぜですか?

個人的には、ハッシュの利用率を高めるために、セグメント配列の位置でキーを決定するために使用される数ビットの後に、エントリー配列の位置でキーを決定するために使用される数ビットの最初の数ビットのキーのハッシュコードのために、ハッシュを増やすためにまだ順序であると思いますが、効果はそれほど明白ではないかもしれません。

//複数の判定を行うことで、同時実行の安全性を確保し、値を代入する際の効率を向上させる。
private Segment<K,V> ensureSegment(int k) {
 final Segment<K,V>[] ss = this.segments;
 long u = (k << SSHIFT) + SBASE;
 Segment<K,V> seg;
 //同時実行安全性の問題を防ぐために、nullかどうかを再度判断する。
 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
 //プロトタイプ・オブジェクトとそのプロパティを取得する
 Segment<K,V> proto = ss[0]; 
 int cap = proto.table.length;
 float lf = proto.loadFactor;
 int threshold = (int)(cap * lf);
 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
 == null) {
 //実際にSegmentオブジェクトを作成する
 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
 == null) {
 //ss配列のu番目の位置がヌルかどうかを判断し、ヌルならsをsegに代入して返す。
 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
 break;
 }
 }
 }
 return seg;
}
//エントリー配列に入れる
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
 //trylock()ロックを追加しようとする
 HashEntry<K,V> node = tryLock() ? null :
 scanAndLockForPut(key, hash, value);
 V oldValue;
 try {
 HashEntry<K,V>[] tab = table;
 int index = (tab.length - 1) & hash;
 //添え字インデックスのヘッダー・ノード
 HashEntry<K,V> first = entryAt(tab, index);
 //このノードはチェーンテーブルをトラバースする
 for (HashEntry<K,V> e = first;;) {
 if (e != null) {
 K k;
 if ((k = e.key) == key ||
 (e.hash == hash && key.equals(k))) {
 oldValue = e.value;
 //同じキーが見つかった場合、更新されるか?
 if (!onlyIfAbsent) {
 e.value = value;
 ++modCount;
 }
 break;
 }
 e = e.next;
 }
 //実行条件:最初が空 チェーンの最後尾にトラバースする。
 else {
 if (node != null)
 // 
 node.setNext(first);
 else
 //オブジェクトの生成、ヘッダー挿入メソッド
 node = new HashEntry<K,V>(hash, key, value, first);
 int c = count + 1;
 if (c > threshold && tab.length < MAXIMUM_CAPACITY)
 // 
 rehash(node);
 else
 //ノードをタブ配列の添え字に配置する。
 setEntryAt(tab, index, node);
 //操作回数
 ++modCount;
 //チェーンテーブルの長さ
 count = c;
 oldValue = null;
 break;
 }
 }
 } finally {
 unlock();
 }
 return oldValue;
}
final void setNext(HashEntry<K,V> n) {
 UNSAFE.putOrderedObject(this, nextOffset, n);
}

trylockとlockの違い

共通点:どちらもロックの獲得を目的としています。

trylockはブロックせず、ロック取得の有無に応じて対応するブール値を返します。

ロックがブロックされます。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
 HashEntry<K,V> first = entryForHash(this, hash);
 HashEntry<K,V> e = first;
 HashEntry<K,V> node = null;
 int retries = -1; 
 //ロック要求に失敗した
 while (!tryLock()) {
 HashEntry<K,V> f;
 if (retries < 0) {
 if (e == null) {
 if (node == null) 
 //ノードを初期化する。この場合は単なるウォームアップであり、あまり意味がない。
 node = new HashEntry<K,V>(hash, key, value, null);
 retries = 0;
 }
 else if (key.equals(e.key))
 retries = 0;
 else
 e = e.next;
 }
 //リトライ回数が最大値を超えた場合は、直接ロックを要求し、状況に応じてブロックするかロックを取得する。
 else if (++retries > MAX_SCAN_RETRIES) {
 lock();
 break;
 }
 //retries偶数になったら、この時のヘッドノードが前に入ったヘッドノードと同じかどうかを判断して、同時実行に問題があるかどうかを判断し、問題がなければ、firstの値を更新し、retriesに-1をセットして、スレッドの再判断を行う。
 else if ((retries & 1) == 0 &&
 (f = entryForHash(this, hash)) != first) {
 e = first = f;
 retries = -1;
 }
 }
 return node;
}

rehash

拡張は、セグメント配列全体の Entry[] に対してではなく、単一のセグメントオブジェクトの下の Entry[] に対して行われます。

private void rehash(HashEntry<K,V> node) {
 // 
 HashEntry<K,V>[] oldTable = table;
 //古い配列の長さ
 int oldCapacity = oldTable.length;
 //新しい配列の長さ
 int newCapacity = oldCapacity << 1;
 //新しいしきい値を取得する
 threshold = (int)(newCapacity * loadFactor);
 //新しい配列を生成する
 HashEntry<K,V>[] newTable =
 (HashEntry<K,V>[]) new HashEntry[newCapacity];
 //添え字
 int sizeMask = newCapacity - 1;
 //古い配列を繰り返し処理し、古い配列の要素を転送する。
 for (int i = 0; i < oldCapacity ; i++) {
 HashEntry<K,V> e = oldTable[i];
 //配列の添え字が NULL ではない
 if (e != null) {
 //次のノード
 HashEntry<K,V> next = e.next;
 //拡張後の添え字
 int idx = e.hash & sizeMask;
 //nextnullの場合、新しい配列の指定された位置にeを格納する。
 if (next == null) 
 newTable[idx] = e;
 //nullでない場合、lastRunを使用して最後のチェーンテーブルの先頭ノードを指定し、新しい配列のこのノードの添え字は古い配列と異なり、同時に、配列の次の各ノードの添え字は先頭ノードの添え字と同じになる。
 else { 
 HashEntry<K,V> lastRun = e;
 int lastIdx = idx;
 for (HashEntry<K,V> last = next;
 last != null;
 last = last.next) {
 int k = last.hash & sizeMask;
 if (k != lastIdx) {
 lastIdx = k;
 lastRun = last;
 }
 }
 newTable[lastIdx] = lastRun;
 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
 V v = p.value;
 int h = p.hash;
 int k = h & sizeMask;
 HashEntry<K,V> n = newTable[k];
 newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
 }
 }
 }
 }
 //ノード位置の挿入
 int nodeIndex = node.hash & sizeMask; 
 node.setNext(newTable[nodeIndex]);
 newTable[nodeIndex] = node;
 table = newTable;
}

get

このメソッドは比較的単純なので、ソースコードが提供され、繰り返されることはありません。

public V get(Object key) {
 Segment<K,V> s; // manually integrate access methods to reduce overhead
 HashEntry<K,V>[] tab;
 int h = hash(key);
 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
 (tab = s.table) != null) {
 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
 e != null; e = e.next) {
 K k;
 if ((k = e.key) == key || (e.hash == h && key.equals(k)))
 return e.value;
 }
 }
 return null;
}

size()

このメソッドは、まず、Segment オブジェクトをロックせずに、Segment オブジェクトの要素数を取得し、同時に、サイズを取得中に Segment に他の変更があるかどうかを判断し、変更があれば再カウントします。サイズの取得と同時に Segment を変更する他のスレッドが存在する場合は、まずロックを取得してから要素数をカウントします。

public int size() {
 final Segment<K,V>[] segments = this.segments;
 int size;
 boolean overflow; // true if size overflows 32 bits
 long sum; // sum of modCounts
 long last = 0L; // previous sum
 int retries = -1; // first iteration isn't retry
 try {
 for (;;) {
 //RETRIES_BEFORE_LOCKデフォルトは2で、リトライ回数がこの値と等しい場合は、ロックを直接追加してカウントする。
 if (retries++ == RETRIES_BEFORE_LOCK) {
 for (int j = 0; j < segments.length; ++j)
 ensureSegment(j).lock(); 
 }
 sum = 0L;
 size = 0;
 overflow = false;
 for (int j = 0; j < segments.length; ++j) {
 Segment<K,V> seg = segmentAt(segments, j);
 if (seg != null) {
 //セグメントの修正回数を取得する
 sum += seg.modCount;
 int c = seg.count;
 if (c < 0 || (size += c) < 0)
 overflow = true;
 }
 }
 //つまり、そのSegmentに変更操作がない場合に行う。ループを抜ける
 if (sum == last)
 break;
 //セグメントが最後に変更された回数を代入する
 last = sum;
 }
 } finally {
 if (retries > RETRIES_BEFORE_LOCK) {
 for (int j = 0; j < segments.length; ++j)
 segmentAt(segments, j).unlock();
 }
 }
 return overflow ? Integer.MAX_VALUE : size;
}
Read next