Exporting HBase Data to HDFS with MapReduce


Enough talk, straight to the code!

package cn.com.oozie.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HbaseExportHtable {

    // Mapper: for each HBase row, join all cell values with "&&" and emit
    // (row key, joined values).
    public static class HbaseMapper extends TableMapper<Text, Text> {

        private static final String SEPARATOR = "&&";

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (KeyValue keyValue : values.raw()) {
                sb.append(Bytes.toString(keyValue.getValue())).append(SEPARATOR);
            }
            // Strip the trailing separator before emitting.
            context.write(new Text(row.get()),
                    new Text(sb.substring(0, sb.length() - SEPARATOR.length())));
        }
    }

    // Reducer: prefix the joined values with the row key and write one line
    // per row. Row keys are unique in HBase, so each key carries one value.
    public static class HbaseReducer extends Reducer<Text, Text, NullWritable, Text> {

        private static final String SEPARATOR = "&&";

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text result = new Text();
            for (Text text : values) {
                result = text;
            }
            context.write(NullWritable.get(),
                    new Text(key.toString() + SEPARATOR + result.toString()));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        /*conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum",
                "hadoop-master-node,hadoop-slave1-node,hadoop-slave2-node");
        conf.set("user.name", "hadoop");
        conf.set("groups.name", "hadoop");
        conf.set("mapred.job.tracker", "hadoop-master-node:8021");*/
        Job job = new Job(conf, "HbaseExportHtable");
        // The reducer emits NullWritable keys and Text values, so the job's
        // output key/value classes must be set accordingly.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(HbaseReducer.class);
        job.setJarByClass(HbaseExportHtable.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("phone_http_log", scan,
                HbaseMapper.class, Text.class, Text.class, job);
        FileOutputFormat.setOutputPath(job, new Path(
                "hdfs://hadoop-master:8020/user/oozie/outputdir"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
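Assuming the class is packaged into a jar (the jar name below is hypothetical), the job can be submitted from a cluster node along these lines; initTableMapperJob takes care of shipping the HBase dependency jars by default:

hadoop jar hbase-export-htable.jar cn.com.oozie.demo.HbaseExportHtable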

The mapper walks every cell of each row, column family by column family, and the job exports the table to plain text.
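To illustrate the output format with hypothetical data: a row whose key is 13800000000 and whose cells hold the values GET and wap.example.com would come out as the single line

13800000000&&GET&&wap.example.com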

Reposted from: https://www.cnblogs.com/QuestionsZhang/p/3375311.html