Custom InputFormat
Use Case
- Both HDFS and MapReduce handle small files inefficiently, yet in practice it is often unavoidable to process large numbers of them, so a dedicated solution is needed.
Analysis
Small-file optimization generally comes down to the following approaches:
- 1. At data-collection time, merge small files or small batches of data into larger files before uploading to HDFS.
- 2. Before business processing, run a MapReduce job on HDFS to merge the small files.
- 3. During MapReduce processing, use CombineTextInputFormat to improve efficiency (see the sketch after this list).
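For option 3, a minimal driver sketch is shown below. The class name CombineDriver and the 4 MB split ceiling are illustrative assumptions, not values prescribed by Hadoop; CombineTextInputFormat itself is the standard class shipped with MapReduce.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
public class CombineDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "CombineSmallFiles");
        job.setJarByClass(CombineDriver.class);
        // CombineTextInputFormat packs many small files into one split,
        // so far fewer map tasks are launched
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Cap each combined split at 4 MB (an assumed tuning value)
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
        CombineTextInputFormat.addInputPath(job, new Path(args[0]));
        // ... set the Mapper, Reducer, and output exactly as in an ordinary job
    }
}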
Implementation Approach
The core mechanism of the program:
- Define a custom InputFormat
- Override RecordReader so that each call reads one complete file and wraps it in a BytesWritable
- On the output side, use SequenceFileOutputFormat to write the merged file
Code implementation:
Custom InputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class DiyInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        DiyRecordReader diyRecordReader = new DiyRecordReader();
        diyRecordReader.initialize(inputSplit, taskAttemptContext);
        return diyRecordReader;
    }
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Never split: each small file must arrive as one whole split
        return false;
    }
}
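Returning false from isSplitable is what guarantees that each small file maps to exactly one InputSplit, so the RecordReader below can safely read the entire file in a single pass. Note that the framework also calls initialize() on the reader itself, so the extra call in createRecordReader is redundant but harmless.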
Custom RecordReader
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class DiyRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private boolean next = false;
    private FileSplit fileSplit;
    private Configuration configuration;
    private BytesWritable bytesWritable = new BytesWritable();
    // Initialization: remember the split and configuration for later use
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        configuration = taskAttemptContext.getConfiguration();
    }
    // Fetch the next record. The default TextInputFormat reads one line at a time;
    // for this project we read the entire file in one call as a single BytesWritable record
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!next) {
            // Get a handle on the file system
            FileSystem fileSystem = FileSystem.get(configuration);
            FSDataInputStream in = fileSystem.open(fileSplit.getPath());
            // isSplitable() returned false, so the split length equals the file length
            byte[] buf = new byte[(int) fileSplit.getLength()];
            // Read the whole file
            IOUtils.readFully(in, buf, 0, buf.length);
            in.close();
            // Copy the bytes into the BytesWritable value
            bytesWritable.set(buf, 0, buf.length);
            next = true;
            return true;
        }
        return false;
    }
    // Current key: there is no meaningful key here, so use NullWritable
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }
    // Current value: the whole file as bytes
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }
    // Task progress: 0 before the single record has been read, 1 afterwards
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return next ? 1.0f : 0.0f;
    }
    // Release resources (the input stream is already closed in nextKeyValue)
    @Override
    public void close() throws IOException {
    }
}
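For context, this is roughly how the framework drives the reader. The following is a simplified sketch of the calling contract only, not actual Hadoop source; the class ReaderContract and the method drive are made up for illustration.
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class ReaderContract {
    // Hypothetical helper illustrating the order of RecordReader calls
    static void drive(RecordReader<NullWritable, BytesWritable> reader,
                      InputSplit split, TaskAttemptContext context) throws Exception {
        reader.initialize(split, context);
        while (reader.nextKeyValue()) {                     // DiyRecordReader returns true exactly once
            NullWritable key = reader.getCurrentKey();      // NullWritable.get()
            BytesWritable value = reader.getCurrentValue(); // the whole file as bytes
            // the framework passes key/value into Mapper.map(...) at this point
        }
        reader.close();
    }
}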
Define the Mapper
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class DiyMap extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // Use the source file name as the output key and emit it with the file bytes
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        context.write(new Text(inputSplit.getPath().getName()), value);
    }
}
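The cast to FileSplit is safe because DiyInputFormat extends FileInputFormat, which produces one FileSplit per file; the Mapper therefore emits exactly one (file name, file bytes) pair per small file, keeping each file identifiable after the merge.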
Define the Driver Class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class DiyDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "DiyInputFormat_001");
        job.setJarByClass(DiyDriver.class);
        job.setMapperClass(DiyMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        // Use the custom InputFormat to read the input
        job.setInputFormatClass(DiyInputFormat.class);
        // Input path (the directory containing the small files)
        DiyInputFormat.addInputPath(job, new Path("input path"));
        // Write the merged records as a SequenceFile
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // Output path
        SequenceFileOutputFormat.setOutputPath(job, new Path("output path"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
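Because no Reducer class is set, Hadoop's default identity Reducer runs with a single reduce task, so every (file name, file bytes) pair lands in one SequenceFile, typically output-path/part-r-00000. To spot-check the merged output, a small reader like the following can be used. This is a sketch using the standard SequenceFile.Reader API; the class name SeqFileCheck is made up.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
public class SeqFileCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // e.g. the job's part-r-00000 file
        try (SequenceFile.Reader reader = new SequenceFile.Reader(
                conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // Print each original file name and its size in bytes
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}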