平均と分散のMap-reduce
平均、分散式の数字の束は、私はそれが具体的にどのようにマップと削減関数を設計することは非常に明確であると信じて、あなたはそれぞれ、n個の数字があると仮定し、式から始めることができますa1,a2.......anとすると、平均m = / n、分散s = [^2 + ^2 + ....+^2] / n
分散式をS=[+m^m*n-2*m* ] / nに展開すると、これに基づいて、マップ側の入力を( )に、出力を( )に設定することができ、n1は各ワーカーが計算した数値の数、sum1はそれらの数値の合計、var1はそれらの数値の2乗の合計を表します。
この情報を受け取った後、リデュース側はすべての入力 n1,n2...を加算します。sum n,sum1,sum2....sumを加算してsumを得、次に平均m=sum/nを得、var1,var2...を加算してvarを得ます。加算して var を求め、次に *** 分散 S=/n, REDUCE 出力)。
アルゴリズムコードはmrjobの実装に基づいています。
from mrjob.job import MRJob
class MRmean(MRJob):
def __init__(self, *args, **kwargs):
super(MRmean, self).__init__(*args, **kwargs)
self.inCount = 0
self.inSum = 0
self.inSqSum = 0
def map(self, key, val): #needs exactly 2 arguments
if False: yield
inVal = float(val)
self.inCount += 1
self.inSum += inVal #各要素の和
self.inSqSum += inVal*inVal #各要素の2乗を求める
def map_final(self):
mn = self.inSum/self.inCount
mnSq =self.inSqSum/self.inCount
yield (1, [self.inCount, mn, mnSq]) #map出力に基づいているが、ここではmn=sum1/mn,mnsq=var1/mn
def reduce(self, key, packedValues):
cumVal=0.0; cumSumSq=0.0; cumN=0.0
for valArr in packedValues: #get values from streamed inputs マップ側の出力を解析する
nj = float(valArr[0])
cumN += nj
cumVal += nj*float(valArr[1])
cumSumSq += nj*float(valArr[2])
mean = cumVal/cumN
var = (cumSumSq - 2*mean*cumVal + cumN*mean*mean)/cumN
yield (mean, var) #emit mean and var reduceの出力は
def steps(self):
return ([self.mr(mapper=self.map, mapper_final=self.map_final,\
reducer=self.reduce,)])
if __name__ == '__main__':
MRmean.run()




