1. Software versions
jdk    | jdk1.7.0_67
hadoop | hadoop-2.6.0
spark  | spark-1.4.0-bin-hadoop2.6
Download and install each of the packages above, then open /etc/profile and set the environment variables as follows:
#set java env
export JAVA_HOME=/opt/java/jdk1.7.0_67
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
#set hadoop env
export HADOOP_HOME=/opt/hadoop/hadoop-2.6.0
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH
#set spark env
export SPARK_HOME=/opt/spark-1.4.0-bin-hadoop2.6
export PATH=${SPARK_HOME}/bin:$PATH
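After editing /etc/profile, apply the changes with source /etc/profile, then verify the setup with java -version and hadoop version.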
2. Solving the secondary sort problem with Hadoop MapReduce
Sample input:
2000,12,04,10
2000,11,01,20
2000,12,02,-20
2000,11,07,30
2000,11,24,-40
2012,12,21,30
2012,12,22,-20
2012,12,23,60
2012,12,24,70
2012,12,25,10
2013,01,22,80
2013,01,23,90
2013,01,24,70
2013,01,20,-10
The fields on each line represent year, month, day, and temperature. We want to output the temperatures for each "year-month" key, sorted in ascending or descending order, for example: 2012-12: 5,10,35,45,
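The classic MapReduce secondary sort pattern used below works as follows: the mapper emits a composite key (year-month plus temperature), a custom Partitioner routes records by year-month only, the shuffle sorts the composite keys (which orders the temperatures within each year-month), and a grouping comparator makes all records of one year-month arrive in a single reduce() call.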
Definition of the DateTemperaturePair class:
package chap01;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
//Custom Writable data type: persists the date (year-month, day) and the temperature as one composite key object
public class DateTemperaturePair implements Writable, WritableComparable<DateTemperaturePair> {
private Text yearMonth = new Text();
private Text day = new Text();
private IntWritable temperature = new IntWritable();
public Text getYearMonth() {
return yearMonth;
}
public void setYearMonth(String yearMonth) {
this.yearMonth.set(yearMonth);
}
public Text getDay() {
return day;
}
public void setDay(String day) {
this.day.set(day);
}
public IntWritable getTemperature() {
return temperature;
}
public void setTemperature(int temperature) {
this.temperature.set(temperature);
}
public DateTemperaturePair() {
}
public DateTemperaturePair(String yearMonth, String day, int temperature) {
this.yearMonth.set(yearMonth);
this.day.set(day);
this.temperature.set(temperature);
}
//Deserialization factory: read one pair from a DataInput stream
public static DateTemperaturePair read(DataInput in) throws IOException {
DateTemperaturePair pair = new DateTemperaturePair();
pair.readFields(in);
return pair;
}
@Override
public int compareTo(DateTemperaturePair pair) {
int compareValue = this.yearMonth.compareTo(pair.getYearMonth());
if (compareValue == 0) {
compareValue = temperature.compareTo(pair.getTemperature());
}
// return compareValue; // to sort ascending
return -1 * compareValue; // to sort descending
}
@Override
public void readFields(DataInput in) throws IOException {
yearMonth.readFields(in);
day.readFields(in);
temperature.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
yearMonth.write(out);
day.write(out);
temperature.write(out);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DateTemperaturePair that = (DateTemperaturePair) o;
if (temperature != null ? !temperature.equals(that.temperature) : that.temperature != null) {
return false;
}
if (yearMonth != null ? !yearMonth.equals(that.yearMonth) : that.yearMonth != null) {
return false;
}
return true;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("DateTemperaturePair{yearMonth=");
builder.append(yearMonth);
builder.append(", day=");
builder.append(day);
builder.append(", temperature=");
builder.append(temperature);
builder.append("}");
return builder.toString();
}
@Override
public int hashCode() {
int result = yearMonth != null ? yearMonth.hashCode() : 0;
result = 31 * result + (temperature != null ? temperature.hashCode() : 0);
return result;
}
}
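As a quick sanity check of the composite-key ordering, here is a minimal standalone sketch (our addition, not part of the original project; the class name SortOrderDemo is hypothetical) that sorts a few pairs via compareTo():
package chap01;
import java.util.Arrays;
// A minimal sketch: demonstrates that the negated compareTo() sorts
// year-months in descending order and, within one year-month, temperatures
// in descending order.
public class SortOrderDemo {
    public static void main(String[] args) {
        DateTemperaturePair[] pairs = {
            new DateTemperaturePair("200011", "01", 20),
            new DateTemperaturePair("200012", "02", -20),
            new DateTemperaturePair("200012", "04", 10)
        };
        Arrays.sort(pairs); // uses DateTemperaturePair.compareTo()
        for (DateTemperaturePair p : pairs) {
            System.out.println(p);
        }
        // Prints 200012/10, then 200012/-20, then 200011/20
    }
}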
Definition of the DateTemperaturePartitioner class:
package chap01;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class DateTemperaturePartitioner extends Partitioner<DateTemperaturePair, Text>{
@Override
public int getPartition(DateTemperaturePair pair, Text text, int numberOfPartitions) {
return Math.abs(pair.getYearMonth().hashCode()%numberOfPartitions);
}
}
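Partitioning depends only on the year-month, so two records from the same month always land in the same partition regardless of their temperatures. The following sketch illustrates this (our addition; PartitionerDemo and the partition count of 4 are hypothetical):
package chap01;
import org.apache.hadoop.io.Text;
public class PartitionerDemo {
    public static void main(String[] args) {
        DateTemperaturePartitioner partitioner = new DateTemperaturePartitioner();
        DateTemperaturePair a = new DateTemperaturePair("201212", "21", 30);
        DateTemperaturePair b = new DateTemperaturePair("201212", "22", -20);
        int numberOfPartitions = 4; // hypothetical reducer count
        // Both calls print the same partition number, since only
        // getYearMonth().hashCode() enters the computation.
        System.out.println(partitioner.getPartition(a, new Text("30"), numberOfPartitions));
        System.out.println(partitioner.getPartition(b, new Text("-20"), numberOfPartitions));
    }
}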
Definition of the SecondarySortMapper class:
package chap01;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SecondarySortMapper extends Mapper<LongWritable, Text, DateTemperaturePair, Text> {
private final Text theTemperature = new Text();
private final DateTemperaturePair pair = new DateTemperaturePair();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, DateTemperaturePair, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] token = line.split(",");
String yearMonth = token[0] + token[1]; //year-month of the record
String day = token[2]; //day of month
int temperature = Integer.parseInt(token[3]); //temperature reading
pair.setYearMonth(yearMonth);
pair.setDay(day);
pair.setTemperature(temperature);
theTemperature.set(token[3]);
context.write(pair, theTemperature);
}
}
Definition of the SecondarySortReducer class:
package chap01;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SecondarySortReducer extends Reducer<DateTemperaturePair, Text, Text, Text>{
@Override
protected void reduce(DateTemperaturePair key, Iterable<Text> values,
Reducer<DateTemperaturePair, Text, Text, Text>.Context context) throws IOException, InterruptedException {
StringBuilder builder=new StringBuilder();
for(Text value:values) { //values holds every temperature of this year-month
builder.append(value.toString());
builder.append(",");
}
context.write(key.getYearMonth(), new Text(builder.toString()));
}
}
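Note that the reducer itself does no sorting: the composite key already ordered the temperatures within each year-month during the shuffle, so the values iterable arrives pre-sorted and the reducer only concatenates them (leaving a trailing comma, as the sample output below shows).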
Definition of the DateTemperatureGroupingComparator class:
package chap01;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class DateTemperatureGroupingComparator extends WritableComparator {
public DateTemperatureGroupingComparator() {
super(DateTemperaturePair.class, true);
}
@Override
public int compare(WritableComparable wc1,WritableComparable wc2) {
DateTemperaturePair pair1 = (DateTemperaturePair) wc1;
DateTemperaturePair pair2 = (DateTemperaturePair) wc2;
return pair1.getYearMonth().compareTo(pair2.getYearMonth());
}
}
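Without this comparator, every distinct (year-month, temperature) composite key would form its own reduce group. Comparing on year-month alone makes all records of one month appear as a single group, so one reduce() call sees the whole month's temperatures.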
Definition of the SecondarySortDriver class:
package chap01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
//Driver: defines job input/output and registers the mapper, reducer, partitioner, and grouping comparator
public class SecondarySortDriver extends Configured implements Tool{
private static Logger theLogger=Logger.getLogger(SecondarySortDriver.class);
@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf();
Job job=new Job(conf); //pass the Tool's configuration to the job
job.setJarByClass(SecondarySortDriver.class);
job.setJobName("SecondarySortDriver");
Path inputPath=new Path(args[0]); //first argument: input path
Path outputPath=new Path(args[1]); //second argument: output path
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.setOutputKeyClass(DateTemperaturePair.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(SecondarySortMapper.class); //map class
job.setReducerClass(SecondarySortReducer.class); //reduce class
job.setPartitionerClass(DateTemperaturePartitioner.class); //partition by year-month
job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class); //group reduce input by year-month
boolean status=job.waitForCompletion(true);
theLogger.info("run():status="+status);
return status?0:1;
}
public static int submitJob(String[] args) throws Exception {
int returnStatus=ToolRunner.run(new SecondarySortDriver(),args);
return returnStatus ;
}
public static void main(String[] args) throws Exception {
if(args.length!=2) {
theLogger.warn("SecondarySortDriver <input-dir> <output-dir>");
throw new IllegalArgumentException("SecondarySortDriver <input-dir> <output-dir>");
}
int returnStatus=submitJob(args);
theLogger.info("returnStatus="+returnStatus);
System.exit(returnStatus);
}
}
Definition of the run.sh launch script:
export JAVA_HOME=/opt/java/jdk1.7.0_67
export HADOOP_HOME=/opt/hadoop/hadoop-2.6.0
export SRC_HOME=/home/hadoop/hadoop/test
#Export the compiled classes as a jar file ahead of time
export APP_JAR=$SRC_HOME/dist/src.jar
INPUT=/secondary_sort/input/
OUTPUT=/secondary_sort/output/
$HADOOP_HOME/bin/hadoop fs -rm -r $OUTPUT
PROG=chap01.SecondarySortDriver
$HADOOP_HOME/bin/hadoop jar $APP_JAR $PROG $INPUT $OUTPUT
Before executing run.sh, go to /opt/hadoop/hadoop-2.6.0/sbin and run the ./start-all.sh script to bring up the cluster, then execute run.sh.
Finally, from /opt/hadoop/hadoop-2.6.0, run bin/hadoop fs -cat /secondary_sort/output/part-r-00000 to view the result:
201301 90,80,70,-10,
201212 70,60,30,10,-20,
200012 10,-20,
200011 30,20,-40,
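Both the year-month keys and the temperatures within each line come out in descending order, exactly as dictated by the negated compareTo() in DateTemperaturePair.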
3. Solving secondary sort with Spark
Definition of the SecondarySortUsingCombineByKey class:
package chap01.spark;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import util.DataStructures;
import util.SparkUtil;
public class SecondarySortUsingCombineByKey {
public static void main(String[] args) {
if(args.length<2) {
System.err.println("Usage: SecondarySortUsingCombineByKey <input> <output>");
System.exit(1);
}
String inputPath=args[0];
System.out.println("inputPath="+inputPath);
String outPath=args[1];
System.out.println("outputPath="+outPath);
//Connect to the Spark master by creating a JavaSparkContext
final JavaSparkContext ctx=SparkUtil.createJavaSparkContext("SecondarySorting");
//Use ctx to create the input RDD (resilient distributed dataset)
JavaRDD<String> lines=ctx.textFile(inputPath,1);
System.out.println("=== DEBUG STEP-4 ===");
//Map each input line to a (name, (time, value)) pair
JavaPairRDD<String, Tuple2<Integer, Integer>> pairs=lines.mapToPair(new PairFunction<String, String, Tuple2<Integer, Integer>>(){
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Tuple2<Integer, Integer>> call(String s) throws Exception {
String[] tokens=s.split(",");
System.out.println(tokens[0]+","+tokens[1]+","+tokens[2]);
Tuple2<Integer, Integer> timevalue=new Tuple2<Integer, Integer>(Integer.parseInt(tokens[1]), Integer.parseInt(tokens[2]));
return new Tuple2<String, Tuple2<Integer,Integer>>(tokens[0], timevalue);
}
});
//Verify and debug the mapping above
List<Tuple2<String, Tuple2<Integer, Integer>>> output=pairs.collect();
for(Tuple2<String, Tuple2<Integer, Integer>> t:output) {
Tuple2<Integer, Integer> timevalue=t._2;
System.out.println(t._1+","+timevalue._1+","+timevalue._2);
}
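// NOTE: collect() pulls the entire RDD back to the driver; acceptable here
// only because the demo input is tiny.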
//Use combineByKey() for the reduction; it requires three functions f1, f2, f3,
//invoked as combineByKey(f1, f2, f3):
// function 1: create a combiner data structure
// function 2: merge a value into a combined data structure
// function 3: merge two combiner data structures
// Function 1: create a combiner data structure
Function<Tuple2<Integer, Integer>, SortedMap<Integer, Integer>> createCombiner
=new Function<Tuple2<Integer, Integer>, SortedMap<Integer, Integer>>() {
@Override
public SortedMap<Integer, Integer> call(Tuple2<Integer, Integer> x) throws Exception {
Integer time =x._1;
Integer value=x._2;
SortedMap<Integer, Integer>map=new TreeMap<>();
map.put(time, value);
return map;
}
};
// Function 2: merge a value into a combined data structure
Function2<SortedMap<Integer, Integer>, Tuple2<Integer, Integer>, SortedMap<Integer, Integer>> mergeValue
=new Function2<SortedMap<Integer, Integer>, Tuple2<Integer, Integer>, SortedMap<Integer, Integer>>() {
@Override
public SortedMap<Integer, Integer> call(SortedMap<Integer, Integer> map, Tuple2<Integer, Integer> x)
throws Exception {
Integer time=x._1;
Integer value=x._2;
map.put(time, value);
return map;
}
};
// Function 3: merge two combiner data structures
Function2<SortedMap<Integer, Integer>, SortedMap<Integer, Integer>, SortedMap<Integer, Integer>> mergeCombiners
=new Function2<SortedMap<Integer, Integer>, SortedMap<Integer, Integer>, SortedMap<Integer, Integer>>() {
@Override
public SortedMap<Integer, Integer> call(SortedMap<Integer, Integer> map1,
SortedMap<Integer, Integer> map2) throws Exception {
if(map1.size()<map2.size())
return DataStructures.merge(map1, map2);
else
return DataStructures.merge(map2, map1);
}
};
// create sorted (time, value)
JavaPairRDD<String, SortedMap<Integer, Integer>> combined
=pairs.combineByKey(createCombiner, mergeValue, mergeCombiners);
//Print the combined JavaPairRDD
System.out.println("=== DEBUG STEP-6 ===");
List<Tuple2<String, SortedMap<Integer, Integer>>> output2=combined.collect();
for(Tuple2<String, SortedMap<Integer, Integer>> t:output2) {
String name=t._1;
SortedMap<Integer, Integer>map=t._2;
System.out.println(name);
System.out.println(map);
}
combined.saveAsTextFile(outPath);
ctx.close();
System.exit(0); //normal termination
}
}
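Note that combineByKey() orders data only within each key: the TreeMap sorts each name's (time, value) entries by time, but the keys themselves (z, p, x, y in the run output below) come back in no particular order. A sortByKey() step would be needed if globally ordered keys were required.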
Next, define the util helper package.
Definition of the DataStructures class:
package util;
import java.util.SortedMap;
public class DataStructures {
//Merge two sorted maps into one by folding the smaller map into the larger one; on a key collision the two values are summed
public static SortedMap<Integer, Integer> merge(final SortedMap<Integer, Integer> smaller,final SortedMap<Integer, Integer> larger){
for(Integer key:smaller.keySet()) {
Integer valueFromLargerMap=larger.get(key);
if(valueFromLargerMap==null) {
larger.put(key, smaller.get(key));
}else {
int mergedValue=valueFromLargerMap+smaller.get(key);
larger.put(key, mergedValue);
}
}
return larger;
}
}
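A small usage sketch of merge() (our addition; the class DataStructuresDemo is hypothetical). With disjoint key sets the result is a plain union; on a key collision the two values would be summed:
package util;
import java.util.SortedMap;
import java.util.TreeMap;
public class DataStructuresDemo {
    public static void main(String[] args) {
        SortedMap<Integer, Integer> smaller = new TreeMap<>();
        smaller.put(1, 3);
        SortedMap<Integer, Integer> larger = new TreeMap<>();
        larger.put(2, 9);
        larger.put(3, 6);
        // Folds the smaller map into the larger one and returns it
        System.out.println(DataStructures.merge(smaller, larger)); // {1=3, 2=9, 3=6}
    }
}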
Definition of the SparkUtil class:
package util;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
public class SparkUtil {
public static JavaSparkContext createJavaSparkContext() {
return new JavaSparkContext();
}
public static JavaSparkContext createJavaSparkContext(String sparkMasterURL,String applicationName) {
JavaSparkContext ctx=new JavaSparkContext(sparkMasterURL,applicationName);
return ctx;
}
public static JavaSparkContext createJavaSparkContext(String applicationName) {
SparkConf conf=new SparkConf().setAppName(applicationName);
JavaSparkContext ctx=new JavaSparkContext(conf);
return ctx;
}
public static String version() {
return "2.0.0";
}
}
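Note that the single-argument createJavaSparkContext() sets only the application name; the master URL is then taken from spark-submit's --master flag, which is how run_secondarysorting.sh below invokes it.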
Definition of the run_secondarysorting.sh script:
export JAVA_HOME=/opt/java/jdk1.7.0_67
export SPARK_HOME=/opt/spark-1.4.0-bin-hadoop2.6/
export SPARK_MASTER=spark://rocky:7077
export SRC_HOME=/home/hadoop/hadoop/test
#Export the compiled classes as a jar file ahead of time
export APP_JAR=$SRC_HOME/dist/test.jar
echo "=========="$APP_JAR
INPUT=/home/hadoop/hadoop/test/data/time_series.txt
OUTPUT=/mp/output/
PROG=chap01.spark.SecondarySortUsingCombineByKey
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--class $PROG \
--executor-memory 2g \
$APP_JAR \
$INPUT $OUTPUT
The time-series input file time_series.txt:
p,4,40
p,6,20
x,2,9
y,2,5
x,1,3
y,1,7
y,3,1
x,3,6
z,1,4
z,2,8
z,3,7
z,4,0
p,1,10
p,3,60
Run output:
=== DEBUG STEP-4 ===
p,4,40
p,6,20
x,2,9
y,2,5
x,1,3
y,1,7
y,3,1
x,3,6
z,1,4
z,2,8
z,3,7
z,4,0
p,1,10
p,3,60
=== DEBUG STEP-6 ===
z
{1=4, 2=8, 3=7, 4=0}
p
{1=10, 3=60, 4=40, 6=20}
x
{1=3, 2=9, 3=6}
y
{1=7, 2=5, 3=1}