MapReduce Design Patterns (Chapter 2, Part 2) (3)




Median and standard deviation


Calculating the median and standard deviation is a bit more complex than the previous examples. Because these operations are not associative, they cannot benefit from a combiner as easily. The median is the value that splits a data set into two equal halves: one half of the values is larger than the median, the other half is smaller. This means the data needs to be processed in sorted order, which poses a hurdle, because MapReduce does not sort by values.



The standard deviation tells us how much the data varies from the mean, which requires finding the mean first. The easiest way to perform these operations is to copy the values into a temporary list in order to find the median, and then iterate over the collection a second time to determine the standard deviation. With large data sets this implementation can cause Java heap space problems, because every value of every input group is held in memory. The second example below addresses that problem.
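For reference, these are the two statistics being computed (standard formulas; note that the code below uses the sample standard deviation, dividing by the count minus one):

$$\bar{x} = \frac{1}{n}\sum_{i=1}^{n} x_i, \qquad s = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}\left(x_i - \bar{x}\right)^2}$$

The median is the middle element of the sorted values, or the average of the two middle elements when $n$ is even.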



Problem: Given a list of user comments, determine the median and standard deviation of comment lengths for each hour of the day.




Mapper code



The mapper processes each input record to prepare for calculating the median and standard deviation of comment lengths per hour of the day (the mapper itself does not compute the median; it only emits the data). The output key is the hour the comment was posted, and the output value is the comment length.






public static class MedianStdDevMapper extends
        Mapper<Object, Text, IntWritable, IntWritable> {

    private IntWritable outHour = new IntWritable();
    private IntWritable outCommentLength = new IntWritable();

    private final static SimpleDateFormat frmt =
            new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed = transformXmlToMap(value.toString());

        // Grab the "CreationDate" field,
        // since it is what we are grouping by
        String strDate = parsed.get("CreationDate");

        // Grab the comment to find the length
        String text = parsed.get("Text");

        // Get the hour this comment was posted in
        Date creationDate;
        try {
            creationDate = frmt.parse(strDate);
        } catch (ParseException e) {
            // Skip the record if the date cannot be parsed
            return;
        }
        outHour.set(creationDate.getHours());

        // Set the comment length
        outCommentLength.set(text.length());

        // Write out the hour with the comment length
        context.write(outHour, outCommentLength);
    }
}




Reducer code



The reducer iterates through the given set of values and adds each one to an in-memory list, keeping a running sum and count at the same time. After the iteration, the comment lengths are sorted so the median can be found. If the count is even, the median is the average of the two middle values. Next, the mean is calculated from the running sum and count, and the sorted list is iterated once more to determine the standard deviation: the squared difference between each value and the mean is accumulated in a running sum, and the standard deviation is the square root of that sum divided by one less than the count. Finally, the key is written out along with the median and standard deviation.






public static class MedianStdDevReducer extends
        Reducer<IntWritable, IntWritable, IntWritable, MedianStdDevTuple> {

    private MedianStdDevTuple result = new MedianStdDevTuple();
    private ArrayList<Float> commentLengths = new ArrayList<Float>();

    public void reduce(IntWritable key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {

        float sum = 0;
        float count = 0;
        commentLengths.clear();
        result.setStdDev(0);

        // Iterate through all input values for this key
        for (IntWritable val : values) {
            commentLengths.add((float) val.get());
            sum += val.get();
            ++count;
        }

        // Sort commentLengths to calculate the median
        Collections.sort(commentLengths);

        // If commentLengths is an even value, average middle two elements
        if (count % 2 == 0) {
            result.setMedian((commentLengths.get((int) count / 2 - 1) +
                    commentLengths.get((int) count / 2)) / 2.0f);
        } else {
            // Else, set median to middle value
            result.setMedian(commentLengths.get((int) count / 2));
        }

        // Calculate standard deviation
        float mean = sum / count;
        float sumOfSquares = 0.0f;
        for (Float f : commentLengths) {
            sumOfSquares += (f - mean) * (f - mean);
        }
        result.setStdDev((float) Math.sqrt(sumOfSquares / (count - 1)));

        context.write(key, result);
    }
}




Combiner optimization. A combiner cannot be used in this situation: the reducer needs all of the values to compute the median and standard deviation, and because a combiner only processes the intermediate key/value pairs local to one map task, computing a complete median or standard deviation there is impossible. The next example, however, shows a more complex implementation that does use a custom combiner.
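To make the non-associativity concrete, here is a small counterexample of my own (not from the book): per-map-task medians cannot be recombined into the global median.

$$\operatorname{median}(1,2,3) = 2, \qquad \operatorname{median}(3,100,100) = 100,$$
$$\text{but} \quad \operatorname{median}(1,2,3,3,100,100) = 3 \;\neq\; \operatorname{median}(2,100) = 51.$$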



Memory-conscious median and standard deviation


The following example differs from the previous one in that it reduces the memory footprint. Putting every value into a list produces many duplicate elements. One way to deduplicate them is to keep a count for each element. For example, the list (1, 1, 1, 1, 2, 2, 3, 4, 5, 5, 5) can be stored in a sorted map as (1 → 4, 2 → 2, 3 → 1, 4 → 1, 5 → 3). The core principle is the same: the reduce phase iterates through all the values and stores them in an in-memory data structure; what changes is the data structure and how it is searched. A map greatly reduces memory usage: where the previous example's list grows as O(n), with n the number of comments, this example's map of length/count pairs grows as O(max(m)), with m the maximum comment length. As an added bonus, a combiner can be used to aggregate the counts of comment lengths and output this map, which the reducer will consume, through a Writable object, as shown in the sketch below.


Problem: Same as the previous example.




Mapper code



The mapper processes each input record; the output key is the hour, and the output value is a SortedMapWritable object containing a single element: the comment length mapped to a count of 1. This map is used in several places in both the reducer and the combiner.






public static class MedianStdDevMapper extends
        Mapper<Object, Text, IntWritable, SortedMapWritable> {

    private IntWritable commentLength = new IntWritable();
    private static final LongWritable ONE = new LongWritable(1);
    private IntWritable outHour = new IntWritable();

    private final static SimpleDateFormat frmt =
            new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed = transformXmlToMap(value.toString());

        // Grab the "CreationDate" field,
        // since it is what we are grouping by
        String strDate = parsed.get("CreationDate");

        // Grab the comment to find the length
        String text = parsed.get("Text");

        // Get the hour this comment was posted in
        Date creationDate;
        try {
            creationDate = frmt.parse(strDate);
        } catch (ParseException e) {
            // Skip the record if the date cannot be parsed
            return;
        }
        outHour.set(creationDate.getHours());

        commentLength.set(text.length());
        SortedMapWritable outCommentLength = new SortedMapWritable();
        outCommentLength.put(commentLength, ONE);

        // Write out the hour with a map of (comment length -> count 1)
        context.write(outHour, outCommentLength);
    }
}




Reducer code



The reducer iterates through the given SortedMapWritable values to build a single large TreeMap whose keys are comment lengths and whose values are the number of comments with that length.

After the iteration, the median is calculated. The median index is the total number of comments divided by two. The entrySet of the TreeMap is then iterated, adding the count of each entry to a running comment total, to find the length satisfying previousCommentCount ≤ medianIndex < commentCount. Once that condition is met, if there is an even number of comments and medianIndex equals previousCommentCount, the median is the average of the previous length and the current length. Otherwise, the median is simply the current comment length.

Next, the TreeMap is iterated a second time to calculate the sum of squares, making sure each squared deviation is multiplied by the number of comments with that length. The standard deviation is calculated from this sum, and the median and standard deviation are written out together with the key.
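As a small worked example (my own, not from the book): with counts (1 → 4, 2 → 2, 3 → 1, 4 → 1, 5 → 3) there are 11 comments in total, so medianIndex = 11 / 2 = 5. Walking the entries, the running total reaches 4 after length 1 and 6 after length 2; since 4 ≤ 5 < 6 and the total is odd, the median is 2, matching the 6th element of the expanded sorted list (1, 1, 1, 1, 2, 2, 3, 4, 5, 5, 5).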





public static class MedianStdDevReducer extends
        Reducer<IntWritable, SortedMapWritable, IntWritable, MedianStdDevTuple> {

    private MedianStdDevTuple result = new MedianStdDevTuple();
    private TreeMap<Integer, Long> commentLengthCounts =
            new TreeMap<Integer, Long>();

    public void reduce(IntWritable key, Iterable<SortedMapWritable> values,
            Context context) throws IOException, InterruptedException {

        float sum = 0;
        long totalComments = 0;
        commentLengthCounts.clear();
        result.setMedian(0);
        result.setStdDev(0);

        // Aggregate all intermediate maps into one TreeMap
        // of (comment length -> total count)
        for (SortedMapWritable v : values) {
            for (Entry<WritableComparable, Writable> entry : v.entrySet()) {
                int length = ((IntWritable) entry.getKey()).get();
                long count = ((LongWritable) entry.getValue()).get();

                totalComments += count;
                sum += length * count;

                Long storedCount = commentLengthCounts.get(length);
                if (storedCount == null) {
                    commentLengthCounts.put(length, count);
                } else {
                    commentLengthCounts.put(length, storedCount + count);
                }
            }
        }

        long medianIndex = totalComments / 2L;
        long previousComments = 0;
        long comments = 0;
        int prevKey = 0;
        for (Entry<Integer, Long> entry : commentLengthCounts.entrySet()) {
            comments = previousComments + entry.getValue();
            if (previousComments <= medianIndex && medianIndex < comments) {
                if (totalComments % 2 == 0 && previousComments == medianIndex) {
                    result.setMedian((float) (entry.getKey() + prevKey) / 2.0f);
                } else {
                    result.setMedian(entry.getKey());
                }
                break;
            }
            previousComments = comments;
            prevKey = entry.getKey();
        }

        // Calculate standard deviation
        float mean = sum / totalComments;
        float sumOfSquares = 0.0f;
        for (Entry<Integer, Long> entry : commentLengthCounts.entrySet()) {
            sumOfSquares += (entry.getKey() - mean) * (entry.getKey() - mean) *
                    entry.getValue();
        }
        result.setStdDev((float) Math.sqrt(sumOfSquares / (totalComments - 1)));

        context.write(key, result);
    }
}




Combiner optimization. Unlike the previous example, the combiner logic here differs from the reducer. The reducer calculates the median and standard deviation, whereas the combiner aggregates the SortedMapWritable entries of the intermediate key/value pairs local to each map task. The combiner code parses those entries and aggregates them in a local map, much like the aggregation portion of the reducer code in the previous section. A HashMap could replace the TreeMap here, since sorting is not required and a HashMap is faster. While the reducer uses the map to calculate the median and standard deviation, the combiner uses a SortedMapWritable to serialize the aggregated counts for the reduce phase.






public static class MedianStdDevCombiner extends
        Reducer<IntWritable, SortedMapWritable, IntWritable, SortedMapWritable> {

    protected void reduce(IntWritable key,
            Iterable<SortedMapWritable> values, Context context)
            throws IOException, InterruptedException {

        SortedMapWritable outValue = new SortedMapWritable();

        for (SortedMapWritable v : values) {
            for (Entry<WritableComparable, Writable> entry : v.entrySet()) {
                LongWritable count = (LongWritable) outValue.get(entry.getKey());

                if (count != null) {
                    // Length already seen locally: add to its running count
                    count.set(count.get()
                            + ((LongWritable) entry.getValue()).get());
                } else {
                    // First time this length is seen in this map task's output
                    outValue.put(entry.getKey(), new LongWritable(
                            ((LongWritable) entry.getValue()).get()));
                }
            }
        }

        context.write(key, outValue);
    }
}
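For completeness, here is a minimal driver sketch showing how such a combiner would be wired into the job. The driver class name and input/output handling are my own assumptions, not the book's code; it assumes the mapper, combiner, and reducer classes above (and the book's MedianStdDevTuple) are on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SortedMapWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MedianStdDevDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Comment Length Median/StdDev");
        job.setJarByClass(MedianStdDevDriver.class);

        job.setMapperClass(MedianStdDevMapper.class);
        // Safe to enable: the combiner emits the same
        // (IntWritable, SortedMapWritable) pairs the reducer consumes
        job.setCombinerClass(MedianStdDevCombiner.class);
        job.setReducerClass(MedianStdDevReducer.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(SortedMapWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(MedianStdDevTuple.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}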




Data flow diagram. Figure 2-4 shows the data flow for this example.

Figure 2-4. Data flow for the standard deviation example