I. Secondary Sort

1. Software Versions

jdk jdk1.7.0_67
hadoop hadoop-2.6.0
spark spark-1.4.0-bin-hadoop2.6

Download the packages above and configure them. Open /etc/profile and set the environment variables as follows:

#set java env
export JAVA_HOME=/opt/java/jdk1.7.0_67
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

#set hadoop env
export HADOOP_HOME=/opt/hadoop/hadoop-2.6.0
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH

#set spark env
export SPARK_HOME=/opt/spark-1.4.0-bin-hadoop2.6
export PATH=${SPARK_HOME}/bin:$PATH

2. Solving the Secondary Sort Problem with Hadoop MapReduce

The input file sample_input.txt is as follows:
2000,12,04,10
2000,11,01,20
2000,12,02,-20
2000,11,07,30
2000,11,24,-40
2012,12,21,30
2012,12,22,-20
2012,12,23,60
2012,12,24,70
2012,12,25,10
2013,01,22,80
2013,01,23,90
2013,01,24,70
2013,01,20,-10

Each line contains a year, month, day, and temperature. We want to output the temperatures for each "year-month" group, sorted in ascending or descending order within the group; for example, 2012-12 sorted descending yields 70,60,30,10,-20. The standard MapReduce approach is a composite key of (year-month, temperature) together with a custom partitioner and a grouping comparator, which the classes below implement.

The code is as follows:

Definition of the DateTemperaturePair class:

package chap01;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

// Custom persisted data type: wraps the date and temperature in a single Java object.
// WritableComparable already extends Writable, so implementing it alone is enough.
public class DateTemperaturePair implements WritableComparable<DateTemperaturePair> {

	private Text yearMonth = new Text();
	private Text day = new Text();
	private IntWritable temperature = new IntWritable();

	public Text getYearMonth() {
		return yearMonth;
	}

	public void setYearMonth(String yearMonth) {
		this.yearMonth.set(yearMonth);
	}

	public Text getDay() {
		return day;
	}

	public void setDay(String day) {
		this.day.set(day);
	}

	public IntWritable getTemperature() {
		return temperature;
	}

	public void setTemperature(int temperature) {
		this.temperature.set(temperature);
	}

	public DateTemperaturePair() {
		// default constructor required by Hadoop for deserialization
	}

	public DateTemperaturePair(String yearmonth, String day, int temperature) {
		this.yearMonth.set(yearmonth);
		this.day.set(day);
		this.temperature.set(temperature);
	}

	public static DateTemperaturePair read(DataInput in) throws IOException {
		DateTemperaturePair pair = new DateTemperaturePair();
		pair.readFields(in);
		return pair;
	}

	@Override
	public int compareTo(DateTemperaturePair pair) {
		int compareValue = this.yearMonth.compareTo(pair.getYearMonth());
		if (compareValue == 0) {
			compareValue = temperature.compareTo(pair.getTemperature());
		}
		// return compareValue; // to sort ascending
		return -1 * compareValue; // to sort descending
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		yearMonth.readFields(in);
		day.readFields(in);
		temperature.readFields(in);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		yearMonth.write(out);
		day.write(out);
		temperature.write(out);
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}

		DateTemperaturePair that = (DateTemperaturePair) o;
		if (temperature != null ? !temperature.equals(that.temperature) : that.temperature != null) {
			return false;
		}
		if (yearMonth != null ? !yearMonth.equals(that.yearMonth) : that.yearMonth != null) {
			return false;
		}

		return true;
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append("DateTemperaturePair{yearMonth=");
		builder.append(yearMonth);
		builder.append(", day=");
		builder.append(day);
		builder.append(", temperature=");
		builder.append(temperature);
		builder.append("}");
		return builder.toString();
	}

	@Override
	public int hashCode() {
		int result = yearMonth != null ? yearMonth.hashCode() : 0;
		result = 31 * result + (temperature != null ? temperature.hashCode() : 0);
		return result;
	}

}
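
To make the comparator concrete, here is a minimal sketch (not part of the original source; the demo class name is hypothetical) that sorts a few pairs in memory. Because compareTo negates the comparison result, both the year-month and the temperature come out in descending order:

package chap01;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo: in-memory sort using DateTemperaturePair.compareTo
public class DateTemperaturePairDemo {
	public static void main(String[] args) {
		List<DateTemperaturePair> pairs = new ArrayList<DateTemperaturePair>();
		pairs.add(new DateTemperaturePair("201212", "21", 30));
		pairs.add(new DateTemperaturePair("201212", "23", 60));
		pairs.add(new DateTemperaturePair("201301", "22", 80));
		Collections.sort(pairs);
		// prints 201301/80 first, then 201212/60, then 201212/30 (descending)
		for (DateTemperaturePair p : pairs) {
			System.out.println(p);
		}
	}
}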




Definition of the DateTemperaturePartitioner class:

package chap01;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class DateTemperaturePartitioner extends Partitioner<DateTemperaturePair, Text> {

	@Override
	public int getPartition(DateTemperaturePair pair, Text text, int numberOfPartitions) {
		// hash only the yearMonth so that all records of one month go to the same reducer
		return Math.abs(pair.getYearMonth().hashCode() % numberOfPartitions);
	}

}
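
A minimal sketch (not in the original; the demo class is hypothetical) of why this partitioner matters: keys that share a yearMonth always map to the same partition index, regardless of their temperature, so one reducer sees the whole month:

package chap01;

import org.apache.hadoop.io.Text;

// Hypothetical demo: both keys land in the same partition because only yearMonth is hashed
public class DateTemperaturePartitionerDemo {
	public static void main(String[] args) {
		DateTemperaturePartitioner partitioner = new DateTemperaturePartitioner();
		DateTemperaturePair a = new DateTemperaturePair("201212", "21", 30);
		DateTemperaturePair b = new DateTemperaturePair("201212", "23", 60);
		System.out.println(partitioner.getPartition(a, new Text("30"), 4));
		System.out.println(partitioner.getPartition(b, new Text("60"), 4)); // same index as above
	}
}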


Definition of the SecondarySortMapper class:

package chap01;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondarySortMapper extends Mapper<LongWritable, Text, DateTemperaturePair, Text> {
	private final Text theTemperature = new Text();
	private final DateTemperaturePair pair = new DateTemperaturePair();

	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, DateTemperaturePair, Text>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		String[] token = line.split(",");

		String yearMonth = token[0] + token[1];        // year-month of the record, e.g. "201212"
		String day = token[2];                         // day of the record
		int temperature = Integer.parseInt(token[3]);  // temperature of the record
		pair.setYearMonth(yearMonth);
		pair.setDay(day);
		pair.setTemperature(temperature);
		theTemperature.set(token[3]);
		context.write(pair, theTemperature);
	}
}
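
For clarity, here is a standalone walk-through (a sketch, not part of the job; the class name is hypothetical) of what the map() parsing does to one sample line:

package chap01;

// Hypothetical walk-through of the map() parsing logic for one input line
public class MapperParseDemo {
	public static void main(String[] args) {
		String line = "2012,12,21,30";
		String[] token = line.split(",");
		String yearMonth = token[0] + token[1];        // "201212"
		String day = token[2];                         // "21"
		int temperature = Integer.parseInt(token[3]);  // 30
		// the emitted key would be the composite (201212, 21, 30); the value the Text "30"
		System.out.println(yearMonth + " " + day + " " + temperature);
	}
}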



Definition of the SecondarySortReducer class:

package chap01;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SecondarySortReducer extends Reducer<DateTemperaturePair, Text, Text, Text> {
	@Override
	protected void reduce(DateTemperaturePair key, Iterable<Text> values,
			Reducer<DateTemperaturePair, Text, Text, Text>.Context context) throws IOException, InterruptedException {
		StringBuilder builder = new StringBuilder();
		for (Text value : values) { // values holds all temperatures for this year-month, already sorted by the shuffle
			builder.append(value.toString());
			builder.append(",");
		}
		context.write(key.getYearMonth(), new Text(builder.toString()));
	}
}
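
Because of the trailing append(","), every output line ends with a comma, which matches the job output shown below. A minimal sketch (hypothetical values and class name) of the value join for one key:

package chap01;

// Hypothetical sketch of the reducer's value join for one key; the shuffle has already sorted the values
public class ReducerJoinDemo {
	public static void main(String[] args) {
		StringBuilder builder = new StringBuilder();
		for (String value : new String[] {"70", "60", "30", "10", "-20"}) {
			builder.append(value);
			builder.append(",");
		}
		System.out.println(builder); // 70,60,30,10,-20,  (note the trailing comma)
	}
}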


Definition of the DateTemperatureGroupingComparator class:

package chap01;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class DateTemperatureGroupingComparator extends WritableComparator {
	public DateTemperatureGroupingComparator() {
		super(DateTemperaturePair.class, true);
	}

	@Override
	public int compare(WritableComparable wc1, WritableComparable wc2) {
		// consider only yearMonth, so all temperatures of one month reach a single reduce() call
		DateTemperaturePair pair = (DateTemperaturePair) wc1;
		DateTemperaturePair pair2 = (DateTemperaturePair) wc2;
		return pair.getYearMonth().compareTo(pair2.getYearMonth());
	}

}
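
A small sketch (not in the original; the demo class is hypothetical) showing the effect: two keys with the same yearMonth but different temperatures compare as equal, so their values are grouped into a single reduce() call:

package chap01;

// Hypothetical demo: keys with equal yearMonth are grouped together
public class GroupingComparatorDemo {
	public static void main(String[] args) {
		DateTemperatureGroupingComparator cmp = new DateTemperatureGroupingComparator();
		DateTemperaturePair a = new DateTemperaturePair("201212", "21", 30);
		DateTemperaturePair b = new DateTemperaturePair("201212", "23", 60);
		System.out.println(cmp.compare(a, b)); // 0 -> same group, same reduce() call
	}
}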


Definition of the SecondarySortDriver class:

package chap01;



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.log4j.Logger;



// Driver class: defines input/output and registers the mapper, reducer, partitioner, and grouping comparator
public class SecondarySortDriver extends Configured implements Tool {

	private static Logger theLogger = Logger.getLogger(SecondarySortDriver.class);

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = Job.getInstance(conf);
		job.setJarByClass(SecondarySortDriver.class);
		job.setJobName("SecondarySortDriver");

		Path inputPath = new Path(args[0]);   // the first argument is the input path
		Path outputPath = new Path(args[1]);  // the second argument is the output path
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);

		job.setMapOutputKeyClass(DateTemperaturePair.class);  // the map side emits the composite key
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);                    // the reduce side emits plain Text
		job.setOutputValueClass(Text.class);

		job.setMapperClass(SecondarySortMapper.class);                            // set the map class
		job.setReducerClass(SecondarySortReducer.class);                          // set the reduce class
		job.setPartitionerClass(DateTemperaturePartitioner.class);                // set the partitioner
		job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);  // group keys by yearMonth

		boolean status = job.waitForCompletion(true);
		theLogger.info("run():status=" + status);
		return status ? 0 : 1;
	}

	public static int submitJob(String[] args) throws Exception {
		int returnStatus = ToolRunner.run(new SecondarySortDriver(), args);
		return returnStatus;
	}

	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			theLogger.warn("SecondarySortDriver <input-dir> <output-dir>");
			throw new IllegalArgumentException("SecondarySortDriver <input-dir> <output-dir>");
		}
		int returnStatus = submitJob(args);
		theLogger.info("returnStatus=" + returnStatus);
		System.exit(returnStatus);
	}



}




The run script run.sh:

export JAVA_HOME=/opt/java/jdk1.7.0_67
export HADOOP_HOME=/opt/hadoop/hadoop-2.6.0
export SRC_HOME=/home/hadoop/hadoop/test

# The code must be exported to a jar file beforehand
export APP_JAR=$SRC_HOME/dist/src.jar
INPUT=/secondary_sort/input/
OUTPUT=/secondary_sort/output/
$HADOOP_HOME/bin/hadoop fs -rm  -r $OUTPUT
PROG=chap01.SecondarySortDriver
$HADOOP_HOME/bin/hadoop jar $APP_JAR $PROG $INPUT $OUTPUT

Before executing run.sh, first go to /opt/hadoop/hadoop-2.6.0/sbin and run the ./start-all.sh script.

Then execute the run.sh script.

Go to /opt/hadoop/hadoop-2.6.0/bin and run ./hadoop fs -cat /secondary_sort/output/part-r-00000 to view the result. Each group is sorted in descending order (compareTo negates the comparison), and each line ends with a trailing comma from the reducer:

201301	90,80,70,-10,
201212	70,60,30,10,-20,
200012	10,-20,
200011	30,20,-40,

3. Solving Secondary Sort with Spark

Define the package chap01.spark.


Definition of the SecondarySortUsingCombineByKey class:

package chap01.spark;

import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import util.DataStructures;
import util.SparkUtil;


public class SecondarySortUsingCombineByKey {
	public static void main(String[] args) {
		if(args.length<2) {
			System.err.println("Usage: SecondarySortUsingCombineByKey <input> <output>");
			System.exit(1);
		}
		String inputPath=args[0];
		System.out.println("inputPath="+inputPath);
		String outPath=args[1];
		System.out.println("outputPath="+outPath);

		// connect to the Spark master by creating a JavaSparkContext
		final JavaSparkContext ctx=SparkUtil.createJavaSparkContext("SecondarySorting");
		
		// use ctx to create the input RDD
		JavaRDD<String> lines=ctx.textFile(inputPath,1);
		
		System.out.println("=== DEBUG STEP-4 ===");
		
		// map each line "name,time,value" to a pair (name, (time, value))
		JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = lines.mapToPair(new PairFunction<String, String, Tuple2<Integer, Integer>>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<String, Tuple2<Integer, Integer>> call(String s) throws Exception {
				String[] tokens=s.split(",");
				System.out.println(tokens[0]+","+tokens[1]+","+tokens[2]);
				Tuple2<Integer, Integer>timevalue=new Tuple2<Integer, Integer>(Integer.parseInt(tokens[1]), Integer.parseInt(tokens[2]));
				
				return new Tuple2<String, Tuple2<Integer,Integer>>(tokens[0], timevalue);
			}
		});
		
		// collect and print the pairs to verify the mapping (debugging only)
		List<Tuple2<String, Tuple2<Integer, Integer>>> output = pairs.collect();
		for (Tuple2<String, Tuple2<Integer, Integer>> t : output) {
			Tuple2<Integer, Integer> timevalue = t._2;
			System.out.println(t._1 + "," + timevalue._1 + "," + timevalue._2);
		}
		
		// Reduce with combineByKey(). It takes three functions f1, f2, f3 and is
		// invoked as combineByKey(f1, f2, f3):
		//   function 1: create a combiner data structure
		//   function 2: merge a value into a combined data structure
		//   function 3: merge two combiner data structures
		
		// Function 1: create a combiner data structure
		Function<Tuple2<Integer, Integer>, SortedMap<Integer, Integer>> createCombiner
			= new Function<Tuple2<Integer, Integer>, SortedMap<Integer, Integer>>() {
				@Override
				public SortedMap<Integer, Integer> call(Tuple2<Integer, Integer> x) throws Exception {
					Integer time = x._1;
					Integer value = x._2;
					SortedMap<Integer, Integer> map = new TreeMap<>();
					map.put(time, value);
					return map;
				}
			};
		// Function 2: merge a value into a combined data structure
		Function2<SortedMap<Integer, Integer>, Tuple2<Integer, Integer>, SortedMap<Integer, Integer>> mergeValue
			= new Function2<SortedMap<Integer, Integer>, Tuple2<Integer, Integer>, SortedMap<Integer, Integer>>() {
				@Override
				public SortedMap<Integer, Integer> call(SortedMap<Integer, Integer> map, Tuple2<Integer, Integer> x)
						throws Exception {
					Integer time = x._1;
					Integer value = x._2;
					map.put(time, value);
					return map;
				}
			};
		// Function 3: merge two combiner data structures
		Function2<SortedMap<Integer, Integer>, SortedMap<Integer, Integer>, SortedMap<Integer, Integer>> mergeCombiners
			= new Function2<SortedMap<Integer, Integer>, SortedMap<Integer, Integer>, SortedMap<Integer, Integer>>() {
				@Override
				public SortedMap<Integer, Integer> call(SortedMap<Integer, Integer> map1,
						SortedMap<Integer, Integer> map2) throws Exception {
					// always fold the smaller map into the larger one
					if (map1.size() < map2.size())
						return DataStructures.merge(map1, map2);
					else
						return DataStructures.merge(map2, map1);
				}
			};
			
		// create one sorted (time, value) map per key
		JavaPairRDD<String, SortedMap<Integer, Integer>> combined
			= pairs.combineByKey(createCombiner, mergeValue, mergeCombiners);

		// print the combined JavaPairRDD for debugging
		System.out.println("=== DEBUG STEP-6 ===");
		List<Tuple2<String, SortedMap<Integer, Integer>>> output2 = combined.collect();
		for (Tuple2<String, SortedMap<Integer, Integer>> t : output2) {
			String name = t._1;
			SortedMap<Integer, Integer> map = t._2;
			System.out.println(name);
			System.out.println(map);
		}
		combined.saveAsTextFile(outPath);
		ctx.close();
		System.exit(0); // exit with 0 on success
	}
}

Define the package util.


Definition of the DataStructures class:

package util;

import java.util.SortedMap;

public class DataStructures {
	
	// Merge two sorted maps into one, folding the smaller map into the larger one
	public static SortedMap<Integer, Integer> merge(final SortedMap<Integer, Integer> smaller, final SortedMap<Integer, Integer> larger) {
		for (Integer key : smaller.keySet()) {
			Integer valueFromLargerMap = larger.get(key);
			if (valueFromLargerMap == null) {
				larger.put(key, smaller.get(key)); // key only in the smaller map: copy it over
			} else {
				// key present in both maps: keep the sum of the two values
				int mergedValue = valueFromLargerMap + smaller.get(key);
				larger.put(key, mergedValue);
			}
		}
		return larger;
	}
}
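
A short usage sketch (hypothetical values and class name) of merge(): a key found only in the smaller map is copied over, while a key present in both maps ends up with the sum of the two values:

package util;

import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical demo of DataStructures.merge
public class DataStructuresDemo {
	public static void main(String[] args) {
		SortedMap<Integer, Integer> smaller = new TreeMap<Integer, Integer>();
		smaller.put(1, 10);
		smaller.put(3, 60);
		SortedMap<Integer, Integer> larger = new TreeMap<Integer, Integer>();
		larger.put(3, 5);
		larger.put(4, 40);
		System.out.println(DataStructures.merge(smaller, larger));
		// prints {1=10, 3=65, 4=40}: key 3 exists in both maps, so 60 and 5 are summed
	}
}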


Definition of the SparkUtil class:

package util;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkUtil {
	public static JavaSparkContext createJavaSparkContext() {
		return new JavaSparkContext();
	}
	
	public static JavaSparkContext createJavaSparkContext(String sparkMasterURL,String applicationName) {
		JavaSparkContext ctx=new JavaSparkContext(sparkMasterURL,applicationName);
		return ctx;
	}
	
	public static JavaSparkContext createJavaSparkContext(String applicationName) {
		SparkConf conf=new SparkConf().setAppName(applicationName);
		JavaSparkContext ctx=new JavaSparkContext(conf);
		return ctx;
	}
	
	public static String version() {
		return "2.0.0";
	}
}



The script run_secondarysorting.sh:

export JAVA_HOME=/opt/java/jdk1.7.0_67
export SPARK_HOME=/opt/spark-1.4.0-bin-hadoop2.6/
export SPARK_MASTER=spark://rocky:7077
export SRC_HOME=/home/hadoop/hadoop/test
# The code must be exported to a jar file beforehand
export APP_JAR=$SRC_HOME/dist/test.jar

echo "=========="$APP_JAR
INPUT=/home/hadoop/hadoop/test/data/time_series.txt
OUTPUT=/mp/output/
PROG=chap01.spark.SecondarySortUsingCombineByKey  
$SPARK_HOME/bin/spark-submit \
	--master $SPARK_MASTER \
	--class $PROG \
	--executor-memory 2g \
	$APP_JAR \
	$INPUT $OUTPUT

The time-series input time_series.txt:

p,4,40
p,6,20
x,2,9
y,2,5
x,1,3
y,1,7
y,3,1
x,3,6
z,1,4
z,2,8
z,3,7
z,4,0
p,1,10
p,3,60

Run output:

=== DEBUG STEP-4 ===

p,4,40
p,6,20
x,2,9
y,2,5
x,1,3
y,1,7
y,3,1
x,3,6
z,1,4
z,2,8
z,3,7
z,4,0
p,1,10
p,3,60

=== DEBUG STEP-6 ===

z
{1=4, 2=8, 3=7, 4=0}
p
{1=10, 3=60, 4=40, 6=20}
x
{1=3, 2=9, 3=6}
y
{1=7, 2=5, 3=1}
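
Note that combineByKey sorts the (time, value) entries inside each map but does not order the keys themselves (above: z, p, x, y). If the final keys should also be ordered, a sortByKey() could be inserted before saving in the main() method shown earlier; this is a sketch of a variation, not part of the original program:

// hypothetical variation inside main(): order the grouped results by key before writing
JavaPairRDD<String, SortedMap<Integer, Integer>> sorted = combined.sortByKey();
sorted.saveAsTextFile(outPath);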


