Median and standard deviation
Calculating the median and standard deviation is a bit more complex than the previous examples. Because these operations are not associative, they cannot benefit from a combiner as easily. The median is the value that divides a data set into two equal halves: one half of the values is larger than the median, and the other half is smaller. Producing it requires the data set to be complete and sorted, which poses a hurdle, because MapReduce does not sort values.
The standard deviation tells us how much the values deviate from the mean, and calculating it requires finding the mean first. The easiest way to perform these operations is to copy the list of values into a temporary list in order to find the median, or to iterate over the set a second time to determine the standard deviation. With large data sets, this implementation can lead to Java heap space problems, because every value of each input group is stored in memory. The next example addresses that concern.
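For reference, here are the two statistics in formula form; note that the reducers below compute the sample standard deviation, which is where the count - 1 divisor in the code comes from. For values sorted in ascending order, $x_1 \le x_2 \le \dots \le x_n$:

$$\mathrm{median} = \begin{cases} x_{(n+1)/2} & n \text{ odd} \\ \tfrac{1}{2}\left(x_{n/2} + x_{n/2+1}\right) & n \text{ even} \end{cases} \qquad s = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}\left(x_i - \bar{x}\right)^2}$$

where $\bar{x} = \frac{1}{n}\sum_{i=1}^{n} x_i$ is the mean.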
Problem: Given a list of user comments, determine the median and standard deviation of comment lengths for each hour of the day.
Mapper code. The mapper processes each input record and emits the data from which the median comment length within each hour of the day will be computed (the mapper itself does not compute the median). The output key is the hour the comment was posted, and the output value is the length of the comment.
public static class MedianStdDevMapper extends
        Mapper<Object, Text, IntWritable, IntWritable> {

    private IntWritable outHour = new IntWritable();
    private IntWritable outCommentLength = new IntWritable();

    private final static SimpleDateFormat frmt = new SimpleDateFormat(
            "yyyy-MM-dd'T'HH:mm:ss.SSS");

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed = transformXmlToMap(value.toString());

        // Grab the "CreationDate" field,
        // since it is what we are grouping by
        String strDate = parsed.get("CreationDate");

        // Grab the comment to find the length
        String text = parsed.get("Text");

        try {
            // Get the hour this comment was posted in
            Date creationDate = frmt.parse(strDate);
            outHour.set(creationDate.getHours());

            // Set the comment length
            outCommentLength.set(text.length());

            // Write out the hour and the comment length
            context.write(outHour, outCommentLength);
        } catch (ParseException e) {
            // frmt.parse throws the checked ParseException;
            // skip records with an unparseable creation date
        }
    }
}
Reducer code. The reducer iterates through the given set of values and adds each one to an in-memory list, while also keeping a running sum and count. After the iteration, the comment lengths are sorted so the median can be found. If the count is even, the median is the average of the two middle values; otherwise, it is the middle value itself. Next, the mean is calculated from the running sum and count, and the sorted list is iterated over to determine the standard deviation: the squared difference between each value and the mean is accumulated in a running sum, and the standard deviation is the square root of that sum divided by count - 1. Finally, the median and standard deviation are output along with the key.
public static class MedianStdDevReducer extends
        Reducer<IntWritable, IntWritable, IntWritable, MedianStdDevTuple> {

    private MedianStdDevTuple result = new MedianStdDevTuple();
    private ArrayList<Float> commentLengths = new ArrayList<Float>();

    public void reduce(IntWritable key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {

        float sum = 0;
        float count = 0;
        commentLengths.clear();
        result.setStdDev(0);

        // Iterate through all input values for this key
        for (IntWritable val : values) {
            commentLengths.add((float) val.get());
            sum += val.get();
            ++count;
        }

        // Sort commentLengths to calculate the median
        Collections.sort(commentLengths);

        // If commentLengths is an even length, average the middle two elements
        if (count % 2 == 0) {
            result.setMedian((commentLengths.get((int) count / 2 - 1) +
                    commentLengths.get((int) count / 2)) / 2.0f);
        } else {
            // Else, set the median to the middle value
            result.setMedian(commentLengths.get((int) count / 2));
        }

        // Calculate standard deviation
        float mean = sum / count;

        float sumOfSquares = 0.0f;
        for (Float f : commentLengths) {
            sumOfSquares += (f - mean) * (f - mean);
        }

        result.setStdDev((float) Math.sqrt(sumOfSquares / (count - 1)));

        context.write(key, result);
    }
}
Combiner optimization. A combiner cannot be used in this situation. The reducer needs all the values to compute the median and standard deviation, and because a combiner sees only the intermediate key/value pairs produced locally by one map task, it cannot compute a meaningful partial result. For example, the median of {1, 2, 3} is 2 and the median of {4, 5, 6} is 5, yet the median of the combined set is 3.5, which cannot be recovered from the two partial medians. The next example is a more complex implementation that does use a custom combiner.
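For completeness, here is a minimal driver sketch for wiring up this job. It is an illustration under stated assumptions, not part of the original listing: the class name MedianStdDevDriver and the argument handling are assumed, and no combiner is set, for the reasons just described.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MedianStdDevDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(),
                "Median and standard deviation of comment length by hour");
        job.setJarByClass(MedianStdDevDriver.class);

        job.setMapperClass(MedianStdDevMapper.class);
        // Deliberately no job.setCombinerClass(...): the operation is
        // not associative, so partial medians cannot be merged
        job.setReducerClass(MedianStdDevReducer.class);

        // The map output value (IntWritable) differs from the final
        // output value (MedianStdDevTuple), so declare both
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(MedianStdDevTuple.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}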
Memory-conscious median and standard deviation
The following example differs from the previous one in that it reduces the memory footprint. Pushing every value into a list produces many duplicate elements, and one way to deal with the duplicates is to keep a count per element instead. For example, the list <1, 1, 1, 1, 2, 2, 3, 4, 5, 5, 5> can be stored in a sorted map as (1 → 4, 2 → 2, 3 → 1, 4 → 1, 5 → 3). The core principle stays the same: the reduce phase iterates through all the values and stores them in an in-memory data structure; what changes is the data structure and the way it is searched. A map greatly reduces memory usage: where the previous example's list grows as O(n), with n the number of comments, this example's map of value/count pairs grows as O(max(m)), with max(m) the maximum comment length. As an added bonus, a combiner can be used to aggregate the counts of each comment length and output the map, via a Writable object, for the reducer to consume.
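To make the count-map idea concrete outside of MapReduce, here is a small standalone sketch (an illustration, not from the original text) that stores the example list as a TreeMap of value → count and walks the entries to find the median, in the same way the reducer below does:

import java.util.Map;
import java.util.TreeMap;

public class CountMapMedian {
    // Find the median of the multiset encoded as value -> count
    static float median(TreeMap<Integer, Long> counts, long total) {
        long medianIndex = total / 2L;
        long previous = 0;
        int prevKey = 0;
        for (Map.Entry<Integer, Long> e : counts.entrySet()) {
            long upTo = previous + e.getValue();
            if (previous <= medianIndex && medianIndex < upTo) {
                // Even total where the boundary falls exactly between two keys
                if (total % 2 == 0 && previous == medianIndex) {
                    return (e.getKey() + prevKey) / 2.0f;
                }
                return e.getKey();
            }
            previous = upTo;
            prevKey = e.getKey();
        }
        throw new IllegalStateException("empty multiset");
    }

    public static void main(String[] args) {
        // The list <1,1,1,1,2,2,3,4,5,5,5> stored as a sorted count map
        TreeMap<Integer, Long> counts = new TreeMap<Integer, Long>();
        counts.put(1, 4L); counts.put(2, 2L); counts.put(3, 1L);
        counts.put(4, 1L); counts.put(5, 3L);
        System.out.println(median(counts, 11)); // prints 2.0
    }
}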
Problem: Same as the previous example.
Mapper code. The mapper processes each input record. The output key is the hour, and the output value is a SortedMapWritable object containing a single element: the comment length mapped to a count of 1. This map is used in several places in both the reducer and the combiner.
public static class MedianStdDevMapper extends
        Mapper<Object, Text, IntWritable, SortedMapWritable> {

    private IntWritable commentLength = new IntWritable();
    private static final LongWritable ONE = new LongWritable(1);
    private IntWritable outHour = new IntWritable();

    private final static SimpleDateFormat frmt = new SimpleDateFormat(
            "yyyy-MM-dd'T'HH:mm:ss.SSS");

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed = transformXmlToMap(value.toString());

        // Grab the "CreationDate" field,
        // since it is what we are grouping by
        String strDate = parsed.get("CreationDate");

        // Grab the comment to find the length
        String text = parsed.get("Text");

        try {
            // Get the hour this comment was posted in
            Date creationDate = frmt.parse(strDate);
            outHour.set(creationDate.getHours());

            commentLength.set(text.length());
            SortedMapWritable outCommentLength = new SortedMapWritable();
            outCommentLength.put(commentLength, ONE);

            // Write out the hour with a one-element map of
            // comment length -> count
            context.write(outHour, outCommentLength);
        } catch (ParseException e) {
            // frmt.parse throws the checked ParseException;
            // skip records with an unparseable creation date
        }
    }
}
Reducer code. The reducer iterates through the given maps and builds a single large TreeMap in which the key is the comment length and the value is the number of comments with that length.
After the iteration, the median is calculated. The index of the median is the total number of comments divided by two. The TreeMap's entrySet is then iterated over to find the key that satisfies the condition previousCommentCount ≤ medianIndex < commentCount, adding each entry's value to a running comment count along the way. Once the condition is met: if there is an even number of comments and medianIndex equals previousCommentCount, the median is the average of the previous length and the current length; otherwise, the median is simply the current comment length.
Next, the TreeMap is iterated over a second time to calculate the sum of squares, making sure each squared difference is multiplied by the number of comments with that length. The standard deviation is calculated from this sum, and the median and standard deviation are output along with the key.
public static class MedianStdDevReducer extends
        Reducer<IntWritable, SortedMapWritable,
                IntWritable, MedianStdDevTuple> {

    private MedianStdDevTuple result = new MedianStdDevTuple();
    private TreeMap<Integer, Long> commentLengthCounts =
            new TreeMap<Integer, Long>();

    public void reduce(IntWritable key, Iterable<SortedMapWritable> values,
            Context context) throws IOException, InterruptedException {

        float sum = 0;
        long totalComments = 0;
        commentLengthCounts.clear();
        result.setMedian(0);
        result.setStdDev(0);

        // Aggregate the incoming maps into one length -> count TreeMap
        for (SortedMapWritable v : values) {
            for (Entry<WritableComparable, Writable> entry : v.entrySet()) {
                int length = ((IntWritable) entry.getKey()).get();
                long count = ((LongWritable) entry.getValue()).get();

                totalComments += count;
                sum += length * count;

                Long storedCount = commentLengthCounts.get(length);
                if (storedCount == null) {
                    commentLengthCounts.put(length, count);
                } else {
                    commentLengthCounts.put(length, storedCount + count);
                }
            }
        }

        // Walk the sorted counts to locate the median
        long medianIndex = totalComments / 2L;
        long previousComments = 0;
        long comments = 0;
        int prevKey = 0;
        for (Entry<Integer, Long> entry : commentLengthCounts.entrySet()) {
            comments = previousComments + entry.getValue();
            if (previousComments <= medianIndex && medianIndex < comments) {
                if (totalComments % 2 == 0 &&
                        previousComments == medianIndex) {
                    result.setMedian((float) (entry.getKey() + prevKey) / 2.0f);
                } else {
                    result.setMedian(entry.getKey());
                }
                break;
            }
            previousComments = comments;
            prevKey = entry.getKey();
        }

        // Calculate standard deviation
        float mean = sum / totalComments;

        float sumOfSquares = 0.0f;
        for (Entry<Integer, Long> entry : commentLengthCounts.entrySet()) {
            sumOfSquares += (entry.getKey() - mean) * (entry.getKey() - mean) *
                    entry.getValue();
        }

        result.setStdDev((float) Math.sqrt(sumOfSquares /
                (totalComments - 1)));

        context.write(key, result);
    }
}
Combiner optimization. Unlike the previous example, the combiner's logic here differs from the reducer's. The reducer calculates the median and standard deviation, whereas the combiner aggregates the SortedMapWritable entries of each local map task's intermediate key/value pairs. The code that parses the entries and aggregates them in a local map is identical to the reducer code in the previous section. Here, a HashMap is used in place of a TreeMap, because sorting is unnecessary and a HashMap is typically faster. While the reducer uses its map to calculate the median and standard deviation, the combiner serializes the aggregated map as a SortedMapWritable for the reduce phase.
public static class MedianStdDevCombiner extends
        Reducer<IntWritable, SortedMapWritable,
                IntWritable, SortedMapWritable> {

    protected void reduce(IntWritable key,
            Iterable<SortedMapWritable> values, Context context)
            throws IOException, InterruptedException {

        SortedMapWritable outValue = new SortedMapWritable();

        // Merge the per-record maps into one length -> count map
        for (SortedMapWritable v : values) {
            for (Entry<WritableComparable, Writable> entry : v.entrySet()) {
                LongWritable count =
                        (LongWritable) outValue.get(entry.getKey());

                if (count != null) {
                    count.set(count.get() +
                            ((LongWritable) entry.getValue()).get());
                } else {
                    outValue.put(entry.getKey(), new LongWritable(
                            ((LongWritable) entry.getValue()).get()));
                }
            }
        }

        context.write(key, outValue);
    }
}
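Compared with the driver sketched earlier for the first example, only two lines need to change to attach this combiner (again an illustration, assuming that same hypothetical driver):

// The map output value is now SortedMapWritable rather than IntWritable
job.setMapOutputValueClass(SortedMapWritable.class);
// Aggregate comment-length counts on the map side before the shuffle
job.setCombinerClass(MedianStdDevCombiner.class);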
Data flow diagram. Figure 2-4 shows the data flow diagram for this example.
Figure 2-4. Data flow for the standard deviation example