import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URISyntaxException;
/**
* MapReduce job that reads an HBase snapshot. Because YARN and HBase run on two separate Hadoop clusters, the HDFS address of the HBase cluster must be configured explicitly.
*/
public class MrSnapshot {
private static final String JOB_NAME = "XXX";
/**
* Name of the HBase table snapshot.
*/
private static String snapshotName = "XXX";
/**
* Output directory for the job results; choose a path appropriate for your environment.
*/
private static Path outputpath = new Path("/XXX");
/**
* Temporary directory used when restoring the snapshot; must be a fully qualified path on the target (HBase) cluster.
*/
private static Path tmpRestorePath = new Path("hdfs://fsName/XXX");
public static void deletePath() throws IOException {
// Delete the output directory up front if it already exists, so FileOutputFormat does not fail.
// Note: FileSystem.get(conf) returns a shared, cached instance, so it is not closed here.
Configuration conf = HBaseConfiguration.create();
final FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputpath)) {
fs.delete(outputpath, true);
}
}
public static void main(String[] args) throws InterruptedException, ClassNotFoundException, IOException, URISyntaxException {
deletePath();
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.rootdir", "hdfs://fsName/hbase");
conf.set("dfs.nameservices", "XXX");
conf.set("dfs.ha.namenodes.XXX", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.XXX.nn1", "XXX");
conf.set("dfs.namenode.rpc-address.XXX.nn2", "XXX");
conf.set("dfs.client.failover.proxy.provider.XXX", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set("hbase.zookeeper.quorum", "XXX");
conf.set("hbase.zookeeper.property.clientPort", "XXX");
conf.set("zookeeper.znode.parent", "XXX");
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false); // must be false for offline (MapReduce) jobs
Job job = Job.getInstance(conf, JOB_NAME);
job.setJarByClass(MrSnapshot.class);
job.setNumReduceTasks(1);
// Point fs.defaultFS at the target (HBase) cluster while the snapshot is restored
job.getConfiguration().set("fs.defaultFS", "hdfs://XXX");
TableMapReduceUtil.initTableSnapshotMapperJob(
snapshotName, // input snapshot name
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper
Text.class, // mapper output key
Text.class, // mapper output value
job,
true,
tmpRestorePath); // temporary restore directory; must be a fully qualified path on the HBase cluster's HDFS, planned by you
// Set the job output path
// Switch fs.defaultFS back to the common (output) cluster
job.getConfiguration().set("fs.defaultFS", "hdfs://XXX");
FileOutputFormat.setOutputPath(job, outputpath); // results are written to the common cluster
if (!job.waitForCompletion(true)) {
throw new IOException("error with job!");
}
}
/**
* Mapper that filters the rows of interest from the table, e.g. all values in column f1 of column family f.
*/
public static class MyMapper extends TableMapper<Text, Text> {
@Override
public void map(ImmutableBytesWritable row, Result value, Context context)
throws InterruptedException, IOException {
// Example body (based on the Javadoc above): emit rowkey -> value of column f:f1.
byte[] cell = value.getValue(Bytes.toBytes("f"), Bytes.toBytes("f1"));
if (cell != null) {
context.write(new Text(Bytes.toString(row.copyBytes())), new Text(Bytes.toString(cell)));
}
}
}
}
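The job above assumes the snapshot already exists. As a minimal sketch (not part of the original post), it can be created through the standard HBase Admin API before running MrSnapshot; the class name CreateSnapshot and the "XXX" table and snapshot names are placeholders:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
public class CreateSnapshot {
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
// Equivalent to `snapshot 'XXX', 'XXX'` in the hbase shell.
try (Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin()) {
admin.snapshot("XXX", TableName.valueOf("XXX"));
}
}
}
The same snapshot can also be read from Spark, as shown below.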
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
import java.io.IOException;
import java.util.List;
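/**
* Spark job that reads the same HBase snapshot through TableSnapshotInputFormat.
* As with the MapReduce job above, the restore directory passed to setInput must be
* a fully qualified HDFS path on the HBase cluster.
*/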
public class SparkSnapshot {
public static void main(String[] args) throws IOException {
final SparkConf sparkConf = new SparkConf().setAppName("XXX");
final SparkContext sc = new SparkContext(sparkConf);
Configuration conf = new Configuration();
conf.set("hbase.rootdir", "hdfs://XXX/XXX");
conf.set("dfs.nameservices", "XXX,XXX");
conf.set("dfs.ha.namenodes.XXX", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.XXX.nn1", "XXX");
conf.set("dfs.namenode.rpc-address.XXX.nn2", "XXX");
conf.set("dfs.client.failover.proxy.provider.XXX", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set("hbase.zookeeper.quorum", "XXX");
conf.set("hbase.zookeeper.property.clientPort", "XXX");
conf.set("zookeeper.znode.parent", "XXX");
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false); // must be set to false
conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()));
String snapshotName = "XXX";
TableSnapshotInputFormatImpl.setInput(conf, snapshotName, new Path("hdfs://XXX/XXX"));
final RDD<Tuple2<ImmutableBytesWritable, Result>> hbaseRDD = sc.newAPIHadoopRDD(conf, TableSnapshotInputFormat.class, ImmutableBytesWritable.class, Result.class);
// do something...
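// A minimal usage sketch (not from the original post): count the rows read from the
// snapshot and print a small sample. The column family "f" and qualifier "f1" are
// assumptions used purely for illustration.
System.out.println("rows in snapshot: " + hbaseRDD.count());
List<Tuple2<ImmutableBytesWritable, Result>> sample = hbaseRDD.toJavaRDD().take(5);
for (Tuple2<ImmutableBytesWritable, Result> t : sample) {
byte[] v = t._2().getValue(Bytes.toBytes("f"), Bytes.toBytes("f1"));
System.out.println(Bytes.toString(t._1().copyBytes()) + " -> " + (v == null ? "null" : Bytes.toString(v)));
}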
sc.stop();
}
}
Copyright notice: This is an original article by chen20111, released under the CC 4.0 BY-SA license. Please include a link to the original article and this notice when reposting.