Sparkのデータはパーティショニングしてクラスタマシンに分散させることができ、SparkはPartitionerクラスを使ってキーに基づいてデータをパーティショニングします。
Sparkには、Partitionerを継承したHashPartitioner/RangePartitionerの2つのパーティショニングクラスがあります。パーティショニングロジックをカスタマイズしたい場合は、Partitionerを継承した独自のクラスを作成できます。
パーティショニング戦略の指定
rdd.partitionBy(new CustomPartitioner(10)).saveAsTextFile("...")
rdd1 = rdd1.partitionBy(new CustomPartitioner(10))
rdd2.partitionBy(new CustomPartitioner(10)).join(rdd1)
デフォルトのパーティショニング戦略
Spark では、パーティショニング・アルゴリズムが指定されていない場合、デフォルトで HashPartitioner が使用されます。
メソッドdefaultPartitionerは、シャッフルによって生成されるRDDパーティションの数と、どのパーティショニング・アルゴリズムを使用するかを取得するためにPartitionerで定義されています。
パーティション数の取得
val defaultNumPartitions =
if (rdd.context.conf.contains("spark.default.parallelism")) {
rdd.context.defaultParallelism
} else {
rdds.map(_.partitions.length).max
}
シャッフルの並列数がプログラムで指定されている場合、これがパーティション数として使用されます。指定されていない場合は、すべての上流RDDの最大パーティション数が結果のRDDパーティション数として使用されます。
パーティショニング戦略の取得
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
Some(hasPartitioner.maxBy(_.partitions.length))
} else {
None
}
if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
hasMaxPartitioner.get.partitioner.get
} else {
new HashPartitioner(defaultNumPartitions)
}
すべての上流RDDからパーティショナーを含むRDDをフィルタリングし、その中から最大のパーティション番号を持つRDDを選択します。そのパーティション番号が、他のすべてのRDDの最大のパーティション番号の1桁以内であれば、そのパーティショナーをパーティショニング・ポリシーとして使用します。そうでない場合は、デフォルトでHashPartitionerがパーティショニング・ポリシーとして使用されます。
パーティショニングポリシーの配信
あるシャッフルで使用されたパーティショニング・ポリシーは次のシャッフルに引き継がれません。つまり、あるシャッフルがあるパーティショニング・ポリシーを使用してRDDを生成すると、次のシャッフルではそのRDDはNoneパーティショニング・ポリシーを持つため、必要に応じてパーティショニング・ポリシーを再設定する必要があります。
HashPartitioner
HashPartitionerは、ハッシュ値を計算するためにキーに、モードを取るためにパーティションの数、0未満の場合は、パーティションの数を追加します。また、サポートキーがNULLの場合、NULLは0パーティションを返します。
HashPartitioner 特定のパーティションが特に大量のデータを持ち、その結果、データが偏ってしまうケースがあります。
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
RangePartitioner
RangePartitionerはデータ量が均一になるようにすることができ、それは各パーティションに均等に分散されたデータの一定の範囲になります。RangePartitionerでは、RDDデータをサンプリングして分析し、サンプリングされたデータの分布に応じてキーの範囲を調整し、各パーティションの最大キーを計算してrangeBoundsを生成します。
RangePartitionerは主にsortByKeyのようなデータソート関連のAPIで使用されます。
//samplePointsPerPartitionHint=20
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
採取サンプル数は、sample20*パーティション数のsampleSizePerPartition、または1e6より大きい場合は1e6です。パーティションごとのサンプル数は sampleSizePerPartition です。
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
取得したサンプル数を計算し、それがsampleSizePerPartitionより大きい場合は、データを均等にするために再描画する必要があります。
データの最終的なパーティショニングはrangeBoundsに基づいて決定されます。
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}
パーティション数が128未満の場合はトラバーサルを使い、128以上の場合はバイセクションを使います。
パーティショナーのカスタマイズ
パーティショナーを継承し、メソッドを実装します:
numPartitions: 返されるパーティションを指定します;
getPartition: キーが計算され、特定のロジックに従ってデータが分割されます;
class CustomPartitioner(numParts: Int) extends Partitioner{
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
var partition = key.hashCode() % numPartitions
if (partition < 0 ){
partition + numPartitions
}else{
partition
}
}
}





