Spark 与 MR 读取 HBase snapshot

  • Post author:
  • Post category:其他


import com.suning.recmmentation.content.video.online.habse.HbaseDaoImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URISyntaxException;

/**
 * MapReduce job that reads an HBase snapshot. YARN and HBase run on two different
 * Hadoop clusters here, so the HDFS address of the HBase cluster has to be configured
 * explicitly.
 */
public class MrSnapshot {
    private static final String JOB_NAME = "XXX";
    /**
     * Name of the HBase table snapshot to read.
     */
    private static final String snapshotName = "XXX";
    /**
     * Output directory for the job result; choose it to fit your own layout.
     */
    private static final Path outputpath = new Path("/XXX");
    /**
     * Temporary directory used when restoring the snapshot for reading. It lives on the
     * target (HBase) cluster and must be a fully qualified path.
     */
    private static final Path tmpRestorePath = new Path("hdfs://fsName/XXX");

    /**
     * Recursively removes the job output directory if it already exists.
     *
     * <p>The delete is performed immediately and a failure is reported to the caller,
     * instead of being deferred via {@code deleteOnExit} where errors are only logged.
     *
     * @throws IOException if the file system cannot be reached or the directory
     *                     exists but could not be deleted
     */
    public static void deletePath() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // NOTE(review): the path is resolved against the default file system of this fresh
        // configuration, while main() sets fs.defaultFS on the job separately - confirm both
        // point at the cluster that holds the output directory.
        // FileSystem.get returns a shared, cached instance, so it is deliberately not closed.
        final FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputpath) && !fs.delete(outputpath, true)) {
            throw new IOException("failed to delete output path " + outputpath);
        }
    }

    /**
     * Clears the output directory, configures the snapshot-backed MapReduce job and
     * waits for it to finish.
     *
     * @param args command-line arguments (unused)
     * @throws IOException if the output directory cannot be cleared, the job cannot be
     *                     set up, or the job reports failure
     */
    public static void main(String[] args) throws InterruptedException, ClassNotFoundException, IOException, URISyntaxException {
        deletePath();
        Configuration conf = HBaseConfiguration.create();
        // HDFS HA and ZooKeeper settings of the HBase cluster.
        conf.set("hbase.rootdir", "hdfs://fsName/hbase");
        conf.set("dfs.nameservices", "XXX");
        conf.set("dfs.ha.namenodes.XXX", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.XXX.nn1", "XXX");
        conf.set("dfs.namenode.rpc-address.XXX.nn2", "XXX");
        conf.set("dfs.client.failover.proxy.provider.XXX", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        conf.set("hbase.zookeeper.quorum", "XXX");
        conf.set("hbase.zookeeper.property.clientPort", "XXX");
        conf.set("zookeeper.znode.parent", "XXX");
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);      // must be set for offline jobs
        Job job = Job.getInstance(conf, JOB_NAME);
        job.setJarByClass(MrSnapshot.class);
        job.setNumReduceTasks(1);
        // Point fs.defaultFS at the target (HBase) cluster while the snapshot input is set up.
        job.getConfiguration().set("fs.defaultFS", "hdfs://XXX");
        TableMapReduceUtil.initTableSnapshotMapperJob(
                snapshotName,       // input snapshot name
                scan,               // Scan instance to control CF and attribute selection
                MyMapper.class,     // mapper
                Text.class,         // mapper output key
                Text.class,         // mapper output value
                job,
                true,               // addDependencyJars
                tmpRestorePath);    // temporary HDFS restore path: fully qualified, on the HBase cluster
        // Configure the job output path.
        // Point fs.defaultFS at the common cluster, where the result is written.
        job.getConfiguration().set("fs.defaultFS", "hdfs://XXX");
        FileOutputFormat.setOutputPath(job, outputpath);   // result data lives on the common cluster
        if (!job.waitForCompletion(true)) {
            throw new IOException("error with job!");
        }
    }

    /**
     * Mapper over the snapshot rows. Implement {@code map} to select the data you need,
     * for example every value of column {@code f1} in column family {@code f}.
     */
    public static class MyMapper extends TableMapper<Text, Text> {

        /**
         * Called once per row of the snapshot.
         *
         * @param row     row key
         * @param value   cells of the row
         * @param context MapReduce context used to emit output
         */
        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws InterruptedException, IOException {
            // Intentionally empty: placeholder for the row filtering/emitting logic.
        }

    }

}

import com.suning.recmmentation.content.video.online.habse.HbaseDaoImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Spark driver that builds an RDD over an HBase snapshot through
 * {@code TableSnapshotInputFormat}.
 */
public class SparkSnapshot {

    /**
     * Creates the Spark context, configures access to the HBase cluster and the
     * snapshot, and builds the RDD of (row key, result) pairs.
     *
     * @param args command-line arguments (unused)
     * @throws IOException if the snapshot input cannot be configured
     */
    public static void main(String[] args) throws IOException {
        final SparkConf sparkConf = new SparkConf().setAppName("XXX");
        final SparkContext sc = new SparkContext(sparkConf);
        // Stop the context on every exit path, including failures while setting up the input.
        try {
            Configuration conf = new Configuration();
            // HDFS HA and ZooKeeper settings of the HBase cluster.
            conf.set("hbase.rootdir", "hdfs://XXX/XXX");
            conf.set("dfs.nameservices", "XXX,XXX");
            conf.set("dfs.ha.namenodes.XXX", "nn1,nn2");
            conf.set("dfs.namenode.rpc-address.XXX.nn1", "XXX");
            conf.set("dfs.namenode.rpc-address.XXX.nn2", "XXX");
            conf.set("dfs.client.failover.proxy.provider.XXX", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
            conf.set("hbase.zookeeper.quorum", "XXX");
            conf.set("hbase.zookeeper.property.clientPort", "XXX");
            conf.set("zookeeper.znode.parent", "XXX");

            Scan scan = new Scan();
            scan.setCaching(500);
            scan.setCacheBlocks(false); // must be set to false
            // The input format reads the scan from the configuration as a Base64-encoded protobuf.
            conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()));
            String snapshotName = "XXX";
            // Second argument is the directory the snapshot is restored into for reading.
            TableSnapshotInputFormatImpl.setInput(conf, snapshotName, new Path("hdfs://XXX/XXX"));
            final RDD<Tuple2<ImmutableBytesWritable, Result>> hbaseRDD = sc.newAPIHadoopRDD(conf, TableSnapshotInputFormat.class, ImmutableBytesWritable.class, Result.class);
            // do something...
        } finally {
            sc.stop();
        }
    }

}



版权声明:本文为chen20111原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。