blog

スパークのパーティショニング戦略

Sparkのデータは、クラスタ内の各マシンに分散してパーティショニングすることができ、Sparkは、データパーティショニングのためのクラスを使用するキーに基づいて行われます:...

Jun 21, 2020 · 4 min. read
シェア

Sparkのデータはパーティショニングしてクラスタマシンに分散させることができ、SparkはPartitionerクラスを使ってキーに基づいてデータをパーティショニングします。

Sparkには、Partitionerを継承したHashPartitioner/RangePartitionerの2つのパーティショニングクラスがあります。パーティショニングロジックをカスタマイズしたい場合は、Partitionerを継承した独自のクラスを作成できます。

パーティショニング戦略の指定

rdd.partitionBy(new CustomPartitioner(10)).saveAsTextFile("...")
rdd1 = rdd1.partitionBy(new CustomPartitioner(10))
rdd2.partitionBy(new CustomPartitioner(10)).join(rdd1)

デフォルトのパーティショニング戦略

Spark では、パーティショニング・アルゴリズムが指定されていない場合、デフォルトで HashPartitioner が使用されます。

メソッドdefaultPartitionerは、シャッフルによって生成されるRDDパーティションの数と、どのパーティショニング・アルゴリズムを使用するかを取得するためにPartitionerで定義されています。

パーティション数の取得

val defaultNumPartitions = 
if (rdd.context.conf.contains("spark.default.parallelism")) { 
 rdd.context.defaultParallelism
} else { 
 rdds.map(_.partitions.length).max
}

シャッフルの並列数がプログラムで指定されている場合、これがパーティション数として使用されます。指定されていない場合は、すべての上流RDDの最大パーティション数が結果のRDDパーティション数として使用されます。

パーティショニング戦略の取得

val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
 Some(hasPartitioner.maxBy(_.partitions.length))
} else {
 None
}
if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
 defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
 hasMaxPartitioner.get.partitioner.get
} else {
 new HashPartitioner(defaultNumPartitions)
}

すべての上流RDDからパーティショナーを含むRDDをフィルタリングし、その中から最大のパーティション番号を持つRDDを選択します。そのパーティション番号が、他のすべてのRDDの最大のパーティション番号の1桁以内であれば、そのパーティショナーをパーティショニング・ポリシーとして使用します。そうでない場合は、デフォルトでHashPartitionerがパーティショニング・ポリシーとして使用されます。

パーティショニングポリシーの配信

あるシャッフルで使用されたパーティショニング・ポリシーは次のシャッフルに引き継がれません。つまり、あるシャッフルがあるパーティショニング・ポリシーを使用してRDDを生成すると、次のシャッフルではそのRDDはNoneパーティショニング・ポリシーを持つため、必要に応じてパーティショニング・ポリシーを再設定する必要があります。

HashPartitioner

HashPartitionerは、ハッシュ値を計算するためにキーに、モードを取るためにパーティションの数、0未満の場合は、パーティションの数を追加します。また、サポートキーがNULLの場合、NULLは0パーティションを返します。

HashPartitioner 特定のパーティションが特に大量のデータを持ち、その結果、データが偏ってしまうケースがあります。

def getPartition(key: Any): Int = key match { 
 case null => 0 
 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
def nonNegativeMod(x: Int, mod: Int): Int = { 
 val rawMod = x % mod 
 rawMod + (if (rawMod < 0) mod else 0)
}

RangePartitioner

RangePartitionerはデータ量が均一になるようにすることができ、それは各パーティションに均等に分散されたデータの一定の範囲になります。RangePartitionerでは、RDDデータをサンプリングして分析し、サンプリングされたデータの分布に応じてキーの範囲を調整し、各パーティションの最大キーを計算してrangeBoundsを生成します。

RangePartitionerは主にsortByKeyのようなデータソート関連のAPIで使用されます。

//samplePointsPerPartitionHint=20
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)

採取サンプル数は、sample20*パーティション数のsampleSizePerPartition、または1e6より大きい場合は1e6です。パーティションごとのサンプル数は sampleSizePerPartition です。

sketched.foreach { case (idx, n, sample) =>
 if (fraction * n > sampleSizePerPartition) {
 imbalancedPartitions += idx
 } else {
 // The weight is 1 over the sampling probability.
 val weight = (n.toDouble / sample.length).toFloat
 for (key <- sample) {
 candidates += ((key, weight))
 }
 }
}

取得したサンプル数を計算し、それがsampleSizePerPartitionより大きい場合は、データを均等にするために再描画する必要があります。

データの最終的なパーティショニングはrangeBoundsに基づいて決定されます。

def getPartition(key: Any): Int = {
 val k = key.asInstanceOf[K]
 var partition = 0
 if (rangeBounds.length <= 128) {
 // If we have less than 128 partitions naive search
 while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
 partition += 1
 }
 } else {
 // Determine which binary search method to use only once.
 partition = binarySearch(rangeBounds, k)
 // binarySearch either returns the match location or -[insertion point]-1
 if (partition < 0) {
 partition = -partition-1
 }
 if (partition > rangeBounds.length) {
 partition = rangeBounds.length
 }
 }
 if (ascending) {
 partition
 } else {
 rangeBounds.length - partition
 }
 }

パーティション数が128未満の場合はトラバーサルを使い、128以上の場合はバイセクションを使います。

パーティショナーのカスタマイズ

パーティショナーを継承し、メソッドを実装します:

numPartitions: 返されるパーティションを指定します;

getPartition: キーが計算され、特定のロジックに従ってデータが分割されます;

class CustomPartitioner(numParts: Int) extends Partitioner{
 override def numPartitions: Int = numParts
 override def getPartition(key: Any): Int = {
 var partition = key.hashCode() % numPartitions
 if (partition < 0 ){
 partition + numPartitions
 }else{
 partition
 }
 }
}
Read next

CocosCreatorスクリプト・プロパティの個人設定 - ドロップダウン・リスト・プロパティ, スライダー・プロパティ

プロパティを宣言するには、cc.Classで定義されたフィールドにプロパティ名とプロパティ・パラメータを入力します。ここで宣言されたプロパティはCocos CreatorエディタのProperty Inspectorパネルで読み込んだり、編集したりすることができます。プロパティ・インスペクタでドロップダウン・リストとして表示するには、プロパティのタイプをEnumerationに設定し、デフォルト値をEnumerationに設定します。設定するには

Jun 21, 2020 · 3 min read