目录
Java API连接HBase操作类和工具类
原文链接:https://www.zifangsky.cn/1286.html
引入依赖
<!-- Apache HBase Client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.0.1</version>
</dependency>
HBase相关的基本操作:
package cn.zifangsky.common;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigInteger;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
/**
 * HBase相关的基本操作 (basic HBase operations).
 *
 * <p>Wraps one shared {@link Connection} created in the constructor. It offers:
 * <ul>
 *   <li>table DDL: create, pre-split create, drop table, drop column family;</li>
 *   <li>point reads and filtered scans;</li>
 *   <li>put and delete helpers.</li>
 * </ul>
 * Every {@link Admin}, {@link Table} and {@link ResultScanner} obtained inside a method is closed
 * before that method returns. The shared connection itself is released by {@link #close()}.
 *
 * <p>Most methods are best-effort: they log failures and report them through their return value
 * ({@code false}, {@code null}, or an empty collection) instead of throwing.
 *
 * <p>Note: query results are keyed by column <em>qualifier</em> only. Equal qualifiers in different
 * column families therefore overwrite each other in the returned maps.
 *
 * @author zifangsky
 * @date 2018/7/3
 * @since 1.0.0
 */
public class HBaseService implements AutoCloseable {

    private static final Logger log = LoggerFactory.getLogger(HBaseService.class);

    /** Client configuration the connection was created from. */
    private final Configuration conf;

    /** Shared connection used by every operation; {@code null} if it could not be opened. */
    private final Connection connection;

    /**
     * Creates the service and opens the shared HBase connection.
     *
     * <p>A connection failure is logged together with its cause instead of being thrown. The
     * service is still constructed, but later operations that use the connection will fail.
     *
     * @param conf HBase client configuration
     */
    public HBaseService(Configuration conf) {
        this.conf = conf;
        Connection conn = null;
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            // keep the best-effort behaviour, but do not lose the cause
            log.error("获取HBase连接失败", e);
        }
        this.connection = conn;
    }

    /**
     * 创建表 — creates a table with the given column families unless it already exists.
     *
     * @author zifangsky
     * @date 2018/7/3 17:50
     * @since 1.0.0
     * @param tableName 表名
     * @param columnFamily 列族名
     * @return {@code true} if the table exists afterwards (created now or already present);
     *         {@code false} if the arguments are invalid or HBase reported an error
     */
    public boolean creatTable(String tableName, List<String> columnFamily) {
        if (StringUtils.isBlank(tableName) || columnFamily == null || columnFamily.isEmpty()) {
            log.error("===Parameters tableName|columnFamily should not be null,Please check!===");
            return false;
        }
        Admin admin = null;
        try {
            admin = connection.getAdmin();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                log.debug("table Exists!");
            } else {
                admin.createTable(buildTableDescriptor(tableName, columnFamily));
                log.debug("create table Success!");
            }
        } catch (IOException | IllegalArgumentException e) {
            log.error(MessageFormat.format("创建表{0}失败", tableName), e);
            return false;
        } finally {
            close(admin, null, null);
        }
        return true;
    }

    /**
     * 预分区创建表 — creates a table pre-split at {@code splitKeys} unless it already exists.
     *
     * @param tableName 表名
     * @param columnFamily 列族名的集合
     * @param splitKeys 预分区的region切分点; {@code null} or empty creates a single-region table
     * @return {@code true} if the table exists afterwards; {@code false} on invalid arguments
     *         (including split keys HBase rejects) or an HBase error
     */
    public boolean createTableBySplitKeys(String tableName, List<String> columnFamily, byte[][] splitKeys) {
        if (StringUtils.isBlank(tableName) || columnFamily == null || columnFamily.isEmpty()) {
            log.error("===Parameters tableName|columnFamily should not be null,Please check!===");
            return false;
        }
        Admin admin = null;
        try {
            admin = connection.getAdmin();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return true;
            }
            TableDescriptor tableDescriptor = buildTableDescriptor(tableName, columnFamily);
            if (splitKeys == null || splitKeys.length == 0) {
                admin.createTable(tableDescriptor);
            } else {
                //指定splitkeys
                admin.createTable(tableDescriptor, splitKeys);
            }
            log.info("===Create Table " + tableName
                    + " Success!columnFamily:" + columnFamily
                    + "===");
        } catch (IOException | IllegalArgumentException e) {
            log.error(MessageFormat.format("预分区创建表{0}失败", tableName), e);
            return false;
        } finally {
            close(admin, null, null);
        }
        return true;
    }

    /** Builds a descriptor for {@code tableName} with one default-configured family per name. */
    private static TableDescriptor buildTableDescriptor(String tableName, List<String> columnFamily) {
        List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamily.size());
        for (String cf : columnFamily) {
            familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
        }
        return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
                .setColumnFamilies(familyDescriptors)
                .build();
    }

    /**
     * 自定义获取分区splitKeys — builds sorted, de-duplicated split keys for a pre-split table.
     *
     * @param keys split points. {@code null} selects the defaults {@code "1|"} … {@code "9|"}, i.e.
     *             9 split keys and 10 regions. {@code null} or empty entries are skipped, because
     *             an empty split key is not accepted when creating a table.
     * @return the distinct split keys in ascending byte order, with no {@code null} slots
     */
    public byte[][] getSplitKeys(String[] keys) {
        if (keys == null) {
            //默认为10个分区
            keys = new String[] { "1|", "2|", "3|", "4|",
                    "5|", "6|", "7|", "8|", "9|" };
        }
        //升序排序 (and de-duplicate)
        TreeSet<byte[]> rows = new TreeSet<>(Bytes.BYTES_COMPARATOR);
        for (String key : keys) {
            if (key != null && !key.isEmpty()) {
                rows.add(Bytes.toBytes(key));
            }
        }
        // Size the result from the de-duplicated set. Allocating keys.length slots used to leave
        // trailing nulls whenever keys contained duplicates.
        return rows.toArray(new byte[0][]);
    }

    /**
     * 按startKey和endKey,分区数获取分区 — computes {@code numRegions - 1} evenly spaced split
     * keys between two hexadecimal bounds.
     *
     * <p>Each key is {@code start + i * (end - start) / numRegions} for {@code i = 1 .. numRegions-1}.
     * It is rendered as lower-case hex, zero-padded to at least 16 characters.
     *
     * @param startKey lower bound, hexadecimal
     * @param endKey upper bound, hexadecimal
     * @param numRegions number of regions; must be at least 1
     * @return the split keys in ascending order (empty when {@code numRegions == 1})
     * @throws IllegalArgumentException if {@code numRegions < 1}, or if {@code numRegions > 1} and
     *         the range is empty, inverted, or too small to give distinct keys
     * @throws NumberFormatException if a bound is not valid hexadecimal
     */
    public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) {
        if (numRegions < 1) {
            throw new IllegalArgumentException("numRegions must be >= 1, got " + numRegions);
        }
        BigInteger lowestKey = new BigInteger(startKey, 16);
        BigInteger highestKey = new BigInteger(endKey, 16);
        BigInteger regionIncrement = highestKey.subtract(lowestKey).divide(BigInteger.valueOf(numRegions));
        if (numRegions > 1 && regionIncrement.signum() <= 0) {
            // a zero or negative step would produce duplicate or descending split keys
            throw new IllegalArgumentException(MessageFormat.format(
                    "key range [{0}, {1}] is too small or inverted for {2} regions",
                    startKey, endKey, String.valueOf(numRegions)));
        }
        byte[][] splits = new byte[numRegions - 1][];
        for (int i = 0; i < splits.length; i++) {
            BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i + 1L)));
            // Bytes.toBytes encodes as UTF-8 instead of the platform default charset
            splits[i] = Bytes.toBytes(String.format("%016x", key));
        }
        return splits;
    }

    /**
     * 获取table — obtains a {@link Table} handle from the shared connection. The caller must
     * close it.
     *
     * @param tableName 表名
     * @return Table
     * @throws IOException IOException
     */
    private Table getTable(String tableName) throws IOException {
        return connection.getTable(TableName.valueOf(tableName));
    }

    /**
     * 查询库中所有表的表名 — lists the names of all tables.
     *
     * @return table names; empty if the lookup failed (the failure is logged)
     */
    public List<String> getAllTableNames() {
        List<String> result = new ArrayList<>();
        Admin admin = null;
        try {
            admin = connection.getAdmin();
            for (TableName tableName : admin.listTableNames()) {
                result.add(tableName.getNameAsString());
            }
        } catch (IOException e) {
            log.error("获取所有表的表名失败", e);
        } finally {
            close(admin, null, null);
        }
        return result;
    }

    /**
     * 遍历查询指定表中的所有数据 — full-table scan.
     *
     * @author zifangsky
     * @date 2018/7/3 18:21
     * @since 1.0.0
     * @param tableName 表名
     * @return rowKey → (qualifier → value), in scan order
     */
    public Map<String, Map<String, String>> getResultScanner(String tableName) {
        return this.queryData(tableName, new Scan());
    }

    /**
     * 根据startRowKey和stopRowKey遍历查询指定表中的所有数据 — range scan.
     *
     * <p>Each bound is optional. A blank bound leaves that side of the scan open. Previously,
     * supplying only one bound caused both to be ignored.
     *
     * @author zifangsky
     * @date 2018/7/4 18:21
     * @since 1.0.0
     * @param tableName 表名
     * @param startRowKey 起始rowKey
     * @param stopRowKey 结束rowKey
     * @return rowKey → (qualifier → value), in scan order
     */
    public Map<String, Map<String, String>> getResultScanner(String tableName, String startRowKey, String stopRowKey) {
        Scan scan = new Scan();
        if (StringUtils.isNotBlank(startRowKey)) {
            scan.withStartRow(Bytes.toBytes(startRowKey));
        }
        if (StringUtils.isNotBlank(stopRowKey)) {
            scan.withStopRow(Bytes.toBytes(stopRowKey));
        }
        return this.queryData(tableName, scan);
    }

    /**
     * 通过行前缀过滤器查询数据 — scans rows whose key starts with {@code prefix}.
     *
     * @author zifangsky
     * @date 2018/7/4 18:21
     * @since 1.0.0
     * @param tableName 表名
     * @param prefix 以prefix开始的行键; blank means no filter
     * @return rowKey → (qualifier → value), in scan order
     */
    public Map<String, Map<String, String>> getResultScannerPrefixFilter(String tableName, String prefix) {
        Scan scan = new Scan();
        if (StringUtils.isNotBlank(prefix)) {
            Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
            scan.setFilter(filter);
        }
        return this.queryData(tableName, scan);
    }

    /**
     * 通过列前缀过滤器查询数据 — scans, keeping only columns whose qualifier starts with
     * {@code prefix}.
     *
     * @author zifangsky
     * @date 2018/7/4 18:21
     * @since 1.0.0
     * @param tableName 表名
     * @param prefix 以prefix开始的列名; blank means no filter
     * @return rowKey → (qualifier → value), in scan order
     */
    public Map<String, Map<String, String>> getResultScannerColumnPrefixFilter(String tableName, String prefix) {
        Scan scan = new Scan();
        if (StringUtils.isNotBlank(prefix)) {
            Filter filter = new ColumnPrefixFilter(Bytes.toBytes(prefix));
            scan.setFilter(filter);
        }
        return this.queryData(tableName, scan);
    }

    /**
     * 查询行键中包含特定字符的数据 — scans rows whose key contains {@code keyword}.
     *
     * @author zifangsky
     * @date 2018/7/4 18:21
     * @since 1.0.0
     * @param tableName 表名
     * @param keyword 包含指定关键词的行键; blank means no filter
     * @return rowKey → (qualifier → value), in scan order
     */
    public Map<String, Map<String, String>> getResultScannerRowFilter(String tableName, String keyword) {
        Scan scan = new Scan();
        if (StringUtils.isNotBlank(keyword)) {
            // SubstringComparator only yields "match" / "no match", so EQUAL is the meaningful
            // operator (GREATER_OR_EQUAL happened to behave the same but read as a range test)
            Filter filter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator(keyword));
            scan.setFilter(filter);
        }
        return this.queryData(tableName, scan);
    }

    /**
     * 查询列名中包含特定字符的数据 — scans, keeping only columns whose qualifier contains
     * {@code keyword}.
     *
     * @author zifangsky
     * @date 2018/7/4 18:21
     * @since 1.0.0
     * @param tableName 表名
     * @param keyword 包含指定关键词的列名; blank means no filter
     * @return rowKey → (qualifier → value), in scan order
     */
    public Map<String, Map<String, String>> getResultScannerQualifierFilter(String tableName, String keyword) {
        Scan scan = new Scan();
        if (StringUtils.isNotBlank(keyword)) {
            // see getResultScannerRowFilter: EQUAL is the operator SubstringComparator is meant for
            Filter filter = new QualifierFilter(CompareOperator.EQUAL, new SubstringComparator(keyword));
            scan.setFilter(filter);
        }
        return this.queryData(tableName, scan);
    }

    /**
     * 通过表名以及过滤条件查询数据 — runs {@code scan} and collects every returned row.
     *
     * @author zifangsky
     * @date 2018/7/4 16:13
     * @since 1.0.0
     * @param tableName 表名
     * @param scan 过滤条件
     * @return rowKey → (qualifier → value), in the order the scanner returned the rows; whatever
     *         was collected before a failure (the failure is logged)
     */
    private Map<String, Map<String, String>> queryData(String tableName, Scan scan) {
        //<rowKey,对应的行数据>; LinkedHashMap keeps the scanner's row order
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        ResultScanner rs = null;
        Table table = null;
        try {
            table = getTable(tableName);
            rs = table.getScanner(scan);
            for (Result r : rs) {
                // an empty Result has no cells (listCells() would be null)
                if (r.isEmpty()) {
                    continue;
                }
                result.put(Bytes.toString(r.getRow()), toColumnMap(r));
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}"
                    , tableName), e);
        } finally {
            close(null, rs, table);
        }
        return result;
    }

    /** Converts a non-empty row into qualifier → value (column family is not part of the key). */
    private static Map<String, String> toColumnMap(Result r) {
        Map<String, String> columnMap = new LinkedHashMap<>();
        for (Cell cell : r.rawCells()) {
            columnMap.put(
                    Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                    Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
        }
        return columnMap;
    }

    /**
     * 根据tableName和rowKey精确查询一行的数据 — reads one row.
     *
     * @author zifangsky
     * @date 2018/7/3 16:07
     * @since 1.0.0
     * @param tableName 表名
     * @param rowKey 行键
     * @return qualifier → value for the row; empty if the row does not exist or the read failed
     */
    public Map<String, String> getRowData(String tableName, String rowKey) {
        //返回的键值对
        Map<String, String> result = new LinkedHashMap<>();
        Get get = new Get(Bytes.toBytes(rowKey));
        Table table = null;
        try {
            table = getTable(tableName);
            Result hTableResult = table.get(get);
            if (hTableResult != null && !hTableResult.isEmpty()) {
                result = toColumnMap(hTableResult);
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("查询一行的数据失败,tableName:{0},rowKey:{1}"
                    , tableName, rowKey), e);
        } finally {
            close(null, null, table);
        }
        return result;
    }

    /**
     * 根据tableName、rowKey、familyName、column查询指定单元格的数据 — reads the latest value of
     * one cell.
     *
     * @author zifangsky
     * @date 2018/7/4 10:58
     * @since 1.0.0
     * @param tableName 表名
     * @param rowKey rowKey
     * @param familyName 列族名
     * @param columnName 列名
     * @return the cell value, or {@code null} if the cell does not exist or the read failed
     */
    public String getColumnValue(String tableName, String rowKey, String familyName, String columnName) {
        String str = null;
        byte[] family = Bytes.toBytes(familyName);
        byte[] qualifier = Bytes.toBytes(columnName);
        Get get = new Get(Bytes.toBytes(rowKey));
        // fetch only the requested cell instead of the whole row
        get.addColumn(family, qualifier);
        Table table = null;
        try {
            table = getTable(tableName);
            Result result = table.get(get);
            if (result != null && !result.isEmpty()) {
                Cell cell = result.getColumnLatestCell(family, qualifier);
                if (cell != null) {
                    str = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                }
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}"
                    , tableName, rowKey, familyName, columnName), e);
        } finally {
            close(null, null, table);
        }
        return str;
    }

    /**
     * 根据tableName、rowKey、familyName、column查询指定单元格多个版本的数据 — reads up to
     * {@code versions} versions of one cell.
     *
     * @author zifangsky
     * @date 2018/7/4 11:16
     * @since 1.0.0
     * @param tableName 表名
     * @param rowKey rowKey
     * @param familyName 列族名
     * @param columnName 列名
     * @param versions 需要查询的版本数 (non-positive values are rejected by HBase and logged)
     * @return the values in the order HBase returns them; empty on failure
     */
    public List<String> getColumnValuesByVersion(String tableName, String rowKey, String familyName, String columnName, int versions) {
        // no capacity hint: new ArrayList<>(versions) threw for negative versions before the
        // request was even attempted
        List<String> result = new ArrayList<>();
        Table table = null;
        try {
            table = getTable(tableName);
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            //读取多少个版本
            get.readVersions(versions);
            Result hTableResult = table.get(get);
            if (hTableResult != null && !hTableResult.isEmpty()) {
                for (Cell cell : hTableResult.rawCells()) {
                    result.add(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("查询指定单元格多个版本的数据失败,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}"
                    , tableName, rowKey, familyName, columnName), e);
        } finally {
            close(null, null, table);
        }
        return result;
    }

    /**
     * 为表添加 or 更新数据 — writes several columns of one row in a single {@link Put}.
     *
     * @author zifangsky
     * @date 2018/7/3 17:26
     * @since 1.0.0
     * @param tableName 表名
     * @param rowKey rowKey
     * @param familyName 列族名
     * @param columns 列名数组
     * @param values 列值的数组 (same length as {@code columns}, no {@code null} entries)
     */
    public void putData(String tableName, String rowKey, String familyName, String[] columns, String[] values) {
        Table table = null;
        try {
            table = getTable(tableName);
            putData(table, rowKey, tableName, familyName, columns, values);
        } catch (Exception e) {
            log.error(MessageFormat.format("为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}"
                    , tableName, rowKey, familyName), e);
        } finally {
            // this method owns the table; the private overload must not close it
            close(null, null, table);
        }
    }

    /**
     * 为表添加 or 更新数据 — builds and sends one {@link Put} on an already-open table. Does not
     * close {@code table}.
     *
     * <p>Nothing is written if the arrays are {@code null}, empty, of different lengths, or
     * contain a {@code null} entry. Such input is logged instead of producing a partial or
     * empty {@code Put}.
     *
     * @author zifangsky
     * @date 2018/7/3 17:26
     * @since 1.0.0
     * @param table Table (owned by the caller)
     * @param rowKey rowKey
     * @param tableName 表名 (used for logging only)
     * @param familyName 列族名
     * @param columns 列名数组
     * @param values 列值的数组
     */
    private void putData(Table table, String rowKey, String tableName, String familyName, String[] columns, String[] values) {
        if (columns == null || values == null || columns.length == 0 || columns.length != values.length) {
            log.error(MessageFormat.format("列名数组和列值数组不能为空且长度必须相同,tableName:{0},rowKey:{1},familyName:{2}"
                    , tableName, rowKey, familyName));
            return;
        }
        try {
            byte[] family = Bytes.toBytes(familyName);
            //设置rowkey
            Put put = new Put(Bytes.toBytes(rowKey));
            for (int i = 0; i < columns.length; i++) {
                if (columns[i] == null || values[i] == null) {
                    // reject the whole row rather than writing part of it
                    log.error(MessageFormat.format("列名和列数据都不能为空,column:{0},value:{1}"
                            , columns[i], values[i]));
                    return;
                }
                put.addColumn(family, Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
            }
            table.put(put);
            log.debug("putData add or update data Success,rowKey:" + rowKey);
        } catch (Exception e) {
            log.error(MessageFormat.format("为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}"
                    , tableName, rowKey, familyName), e);
        }
    }

    /**
     * 为表的某个单元格赋值 — writes a single cell.
     *
     * @author zifangsky
     * @date 2018/7/4 10:20
     * @since 1.0.0
     * @param tableName 表名
     * @param rowKey rowKey
     * @param familyName 列族名
     * @param column1 列名
     * @param value1 列值
     */
    public void setColumnValue(String tableName, String rowKey, String familyName, String column1, String value1) {
        Table table = null;
        try {
            table = getTable(tableName);
            // 设置rowKey
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column1), Bytes.toBytes(value1));
            table.put(put);
            log.debug("add data Success!");
        } catch (IOException e) {
            log.error(MessageFormat.format("为表的某个单元格赋值失败,tableName:{0},rowKey:{1},familyName:{2},column:{3}"
                    , tableName, rowKey, familyName, column1), e);
        } finally {
            close(null, null, table);
        }
    }

    /**
     * 删除指定的单元格 — deletes all versions of one column in one row.
     *
     * @author zifangsky
     * @date 2018/7/4 11:41
     * @since 1.0.0
     * @param tableName 表名
     * @param rowKey rowKey
     * @param familyName 列族名
     * @param columnName 列名
     * @return {@code false} only if HBase reported an error; {@code true} otherwise, including
     *         when the table does not exist
     */
    public boolean deleteColumn(String tableName, String rowKey, String familyName, String columnName) {
        Table table = null;
        Admin admin = null;
        try {
            admin = connection.getAdmin();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                table = getTable(tableName);
                Delete delete = new Delete(Bytes.toBytes(rowKey));
                // addColumns (plural) removes every version of the column
                delete.addColumns(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
                table.delete(delete);
                log.debug(MessageFormat.format("familyName({0}):columnName({1})is deleted!", familyName, columnName));
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("删除指定的列失败,tableName:{0},rowKey:{1},familyName:{2},column:{3}"
                    , tableName, rowKey, familyName, columnName), e);
            return false;
        } finally {
            close(admin, null, table);
        }
        return true;
    }

    /**
     * 根据rowKey删除指定的行 — deletes an entire row.
     *
     * @author zifangsky
     * @date 2018/7/4 13:26
     * @since 1.0.0
     * @param tableName 表名
     * @param rowKey rowKey
     * @return {@code false} only if HBase reported an error; {@code true} otherwise, including
     *         when the table does not exist
     */
    public boolean deleteRow(String tableName, String rowKey) {
        Table table = null;
        Admin admin = null;
        try {
            admin = connection.getAdmin();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                table = getTable(tableName);
                table.delete(new Delete(Bytes.toBytes(rowKey)));
                log.debug(MessageFormat.format("row({0}) is deleted!", rowKey));
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("删除指定的行失败,tableName:{0},rowKey:{1}"
                    , tableName, rowKey), e);
            return false;
        } finally {
            close(admin, null, table);
        }
        return true;
    }

    /**
     * 根据columnFamily删除指定的列族 — removes a column family (and its data) from a table.
     *
     * @author zifangsky
     * @date 2018/7/4 13:26
     * @since 1.0.0
     * @param tableName 表名
     * @param columnFamily 列族
     * @return {@code false} only if HBase reported an error; {@code true} otherwise, including
     *         when the table does not exist
     */
    public boolean deleteColumnFamily(String tableName, String columnFamily) {
        Admin admin = null;
        try {
            admin = connection.getAdmin();
            TableName name = TableName.valueOf(tableName);
            if (admin.tableExists(name)) {
                admin.deleteColumnFamily(name, Bytes.toBytes(columnFamily));
                log.debug(MessageFormat.format("familyName({0}) is deleted!", columnFamily));
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("删除指定的列族失败,tableName:{0},columnFamily:{1}"
                    , tableName, columnFamily), e);
            return false;
        } finally {
            close(admin, null, null);
        }
        return true;
    }

    /**
     * 删除表 — disables (if still enabled) and drops a table.
     *
     * @author zifangsky
     * @date 2018/7/3 18:02
     * @since 1.0.0
     * @param tableName 表名
     * @return {@code false} only if HBase reported an error; {@code true} otherwise, including
     *         when the table does not exist
     */
    public boolean deleteTable(String tableName) {
        Admin admin = null;
        try {
            admin = connection.getAdmin();
            TableName name = TableName.valueOf(tableName);
            if (admin.tableExists(name)) {
                // Disabling an already-disabled table fails, which previously aborted the delete.
                if (admin.isTableEnabled(name)) {
                    admin.disableTable(name);
                }
                admin.deleteTable(name);
                log.debug(tableName + "is deleted!");
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("删除指定的表失败,tableName:{0}"
                    , tableName), e);
            return false;
        } finally {
            close(admin, null, null);
        }
        return true;
    }

    /**
     * Closes the shared HBase connection. Does nothing if the connection was never opened.
     * Spring calls a public no-arg {@code close()} on {@code @Bean}s at context shutdown.
     */
    @Override
    public void close() {
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                log.error("关闭HBase连接失败", e);
            }
        }
    }

    /**
     * 关闭流 — closes the given per-call handles, ignoring {@code null}s and logging (not
     * propagating) close failures.
     */
    private void close(Admin admin, ResultScanner rs, Table table) {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                log.error("关闭Admin失败", e);
            }
        }
        if (rs != null) {
            rs.close();
        }
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                log.error("关闭Table失败", e);
            }
        }
    }
}
添加HBase配置信息
package cn.zifangsky.config;
import cn.zifangsky.common.HBaseService;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * HBase相关配置 — Spring configuration that exposes an {@link HBaseService} bean.
 *
 * <p>The bean's client configuration starts from {@code HBaseConfiguration.create()}. It then
 * overrides the ZooKeeper quorum and the maximum KeyValue size with the {@code HBase.nodes} and
 * {@code HBase.maxsize} properties.
 *
 * @author zifangsky
 * @date 2018/7/12
 * @since 1.0.0
 */
@Configuration
public class HBaseConfig {

    /** Value written to {@code hbase.zookeeper.quorum}. */
    @Value("${HBase.nodes}")
    private String nodes;

    /** Value written to {@code hbase.client.keyvalue.maxsize}. */
    @Value("${HBase.maxsize}")
    private String maxsize;

    /**
     * Builds the HBase client configuration from the injected properties and wraps it in a
     * new {@link HBaseService}.
     *
     * @return the service bean
     */
    @Bean
    public HBaseService getHbaseService() {
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", nodes);
        hbaseConf.set("hbase.client.keyvalue.maxsize", maxsize);
        HBaseService service = new HBaseService(hbaseConf);
        return service;
    }
}
用到的具体属性如下:
HBase.nodes=192.168.197.130
HBase.maxsize=500000
测试上面的基本方法
1)删除、创建表,以及基本查询:
package cn.zifangsky.hbase;
import cn.zifangsky.common.HBaseService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import java.util.Arrays;
import java.util.Map;
/**
 * 测试Hbase SQL — Spring integration test for {@link HBaseService}.
 *
 * @author zifangsky
 * @date 2018/7/3
 * @since 1.0.0
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
@WebAppConfiguration
public class TestHbaseSql {

    /** Table every test in this class works on. */
    private static final String TABLE = "test_base";

    /** Column family the sample rows are written to. */
    private static final String FAMILY = "f";

    @Autowired
    private HBaseService hbaseService;

    /**
     * 测试删除、创建表 — drops and re-creates {@code test_base} (families {@code f} and
     * {@code back}, default split keys). It then inserts three rows and prints them back via a
     * row lookup, a single-cell lookup and a full scan.
     */
    @Test
    public void testCreateTable() {
        hbaseService.deleteTable(TABLE);
        hbaseService.createTableBySplitKeys(TABLE, Arrays.asList("f", "back"), hbaseService.getSplitKeys(null));

        String[] columns = {"project_id", "varName", "coefs", "pvalues", "tvalues", "create_time"};
        hbaseService.putData(TABLE, "66804_000001", FAMILY, columns, new String[]{"40866", "mob_3", "0.9416", "0.0000", "12.2293", "null"});
        hbaseService.putData(TABLE, "66804_000002", FAMILY, columns, new String[]{"40866", "idno_prov", "0.9317", "0.0000", "9.8679", "null"});
        hbaseService.putData(TABLE, "66804_000003", FAMILY, columns, new String[]{"40866", "education", "0.8984", "0.0000", "25.5649", "null"});

        // 1. lookup by rowKey
        Map<String, String> row = hbaseService.getRowData(TABLE, "66804_000001");
        System.out.println("+++++++++++根据rowKey查询+++++++++++");
        for (Map.Entry<String, String> column : row.entrySet()) {
            System.out.println(column.getKey() + "---" + column.getValue());
        }
        System.out.println();

        // single cell
        String cellValue = hbaseService.getColumnValue(TABLE, "66804_000002", FAMILY, "varName");
        System.out.println("+++++++++++精确查询某个单元格的数据+++++++++++");
        System.out.println(cellValue);
        System.out.println();

        // 2. full scan
        Map<String, Map<String, String>> rows = hbaseService.getResultScanner(TABLE);
        System.out.println("+++++++++++遍历查询+++++++++++");
        for (Map.Entry<String, Map<String, String>> entry : rows.entrySet()) {
            System.out.println(entry.getKey() + "---" + entry.getValue());
        }
    }
}
运行单元测试后,输出如下:
+++++++++++根据rowKey查询+++++++++++
coefs---0.9416
varName---mob_3
create_time---null
project_id---40866
pvalues---0.0000
tvalues---12.2293
+++++++++++精确查询某个单元格的数据+++++++++++
idno_prov
+++++++++++遍历查询+++++++++++
66804_000001---{coefs=0.9416, varName=mob_3, create_time=null, project_id=40866, pvalues=0.0000, tvalues=12.2293}
66804_000002---{coefs=0.9317, varName=idno_prov, create_time=null, project_id=40866, pvalues=0.0000, tvalues=9.8679}
66804_000003---{coefs=0.8984, varName=education, create_time=null, project_id=40866, pvalues=0.0000, tvalues=25.5649}
2)指定startRowKey和stopRowKey的查询
/**
 * 测试指定startRowKey和stopRowKey的查询 — scans {@code test_base} between
 * {@code 66804_000002} and {@code 66804_000004} and prints every returned row.
 */
@Test
public void testSelectByStartStopRowKey() {
    Map<String, Map<String, String>> rows =
            hbaseService.getResultScanner("test_base", "66804_000002", "66804_000004");
    for (Map.Entry<String, Map<String, String>> row : rows.entrySet()) {
        System.out.println("rowKey:" + row.getKey());
        System.out.println("+++++++++++行数据+++++++++++");
        for (Map.Entry<String, String> column : row.getValue().entrySet()) {
            System.out.println(column.getKey() + "---" + column.getValue());
        }
    }
    System.out.println("-----------------------");
}
运行单元测试后,输出如下:
rowKey:66804_000002
+++++++++++行数据+++++++++++
coefs---0.9317
varName---idno_prov
create_time---null
project_id---40866
pvalues---0.0000
tvalues---9.8679
rowKey:66804_000003
+++++++++++行数据+++++++++++
coefs---0.8984
varName---education
create_time---null
project_id---40866
pvalues---0.0000
tvalues---25.5649
-----------------------
3)获取所有表名
/**
 * 测试获取所有表名 — prints each table name, one per line.
 */
@Test
public void testGetTableNameLists() {
    for (String tableName : hbaseService.getAllTableNames()) {
        System.out.println(tableName);
    }
}
运行单元测试后,输出如下:
test
test_base
4)获取指定单元格多个版本的数据
/**
 * 测试获取指定单元格多个版本的数据 — writes five successive values to one cell, then asks
 * for up to 4 versions and prints whatever is returned.
 */
@Test
public void testGetColumnValuesByVersion() {
    String[] writes = {"aa", "bb", "cc", "dd", "ee"};
    for (String value : writes) {
        hbaseService.setColumnValue("test_base", "66804_000002", "f", "varName", value);
    }
    for (String version : hbaseService.getColumnValuesByVersion("test_base", "66804_000002", "f", "varName", 4)) {
        System.out.println(version);
    }
}
运行单元测试后,输出如下:
ee
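注:因为HBase的列族默认只保留1个版本(VERSIONS=1),所以这里只能读到最后写入的值ee,看不出多版本查询的效果。如果想验证多版本查询,需要在创建列族时通过ColumnFamilyDescriptorBuilder.setMaxVersions(n)调大保留的版本数。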
5)根据行键过滤器查询数据
/**
 * 测试根据行键过滤器查询数据 — inserts rows {@code 111} and {@code 112}, then prints every
 * row whose key starts with {@code "11"}.
 */
@Test
public void testGetResultScannerPrefixFilter() {
    String[] columns = {"project_id", "varName", "coefs", "pvalues", "tvalues", "create_time"};
    hbaseService.putData("test_base", "111", "f", columns, new String[]{"111", "111", "111", "111", "111", "null"});
    hbaseService.putData("test_base", "112", "f", columns, new String[]{"112", "112", "112", "112", "112", "null"});

    Map<String, Map<String, String>> rows = hbaseService.getResultScannerPrefixFilter("test_base", "11");
    for (Map.Entry<String, Map<String, String>> row : rows.entrySet()) {
        System.out.println("-----------------------");
        System.out.println("rowKey:" + row.getKey());
        System.out.println("+++++++++++行数据+++++++++++");
        for (Map.Entry<String, String> column : row.getValue().entrySet()) {
            System.out.println(column.getKey() + "---" + column.getValue());
        }
        System.out.println("-----------------------");
    }
}
运行单元测试后,输出如下:
-----------------------
rowKey:111
+++++++++++行数据+++++++++++
coefs---111
varName---111
create_time---null
project_id---111
pvalues---111
tvalues---111
-----------------------
-----------------------
rowKey:112
+++++++++++行数据+++++++++++
coefs---112
varName---112
create_time---null
project_id---112
pvalues---112
tvalues---112
-----------------------
6)根据列名过滤器查询数据
/**
 * 测试根据列名过滤器查询数据 — writes a few cells, then prints only the columns whose
 * qualifier starts with {@code "var_name"}.
 */
@Test
public void testGetResultScannerColumnPrefixFilter() {
    final String table = "test_base";
    hbaseService.putData(table, "211", "f", new String[]{"project_id"}, new String[]{"11111"});
    hbaseService.putData(table, "211", "f", new String[]{"var_name1"}, new String[]{"111"});
    hbaseService.putData(table, "212", "f", new String[]{"var_name2"}, new String[]{"112"});

    Map<String, Map<String, String>> rows = hbaseService.getResultScannerColumnPrefixFilter(table, "var_name");
    for (Map.Entry<String, Map<String, String>> row : rows.entrySet()) {
        System.out.println("-----------------------");
        System.out.println("rowKey:" + row.getKey());
        System.out.println("+++++++++++行数据+++++++++++");
        for (Map.Entry<String, String> column : row.getValue().entrySet()) {
            System.out.println(column.getKey() + "---" + column.getValue());
        }
        System.out.println("-----------------------");
    }
}
运行单元测试后,输出如下:
-----------------------
rowKey:211
+++++++++++行数据+++++++++++
var_name1---111
-----------------------
-----------------------
rowKey:212
+++++++++++行数据+++++++++++
var_name2---112
-----------------------
7)查询行键中包含特定字符的数据
/**
 * 测试查询行键中包含特定字符的数据 — inserts two rows whose keys contain {@code "666666"}
 * and prints every row matched by the substring row filter.
 */
@Test
public void testGetResultScannerRowFilter() {
    String[] columns = {"project_id", "varName", "coefs", "pvalues", "tvalues", "create_time"};
    hbaseService.putData("test_base", "abc666666def", "f", columns, new String[]{"111", "abc6666def", "111", "111", "111", "null"});
    hbaseService.putData("test_base", "cba666666fed", "f", columns, new String[]{"112", "cba6666fed", "112", "112", "112", "null"});

    Map<String, Map<String, String>> rows = hbaseService.getResultScannerRowFilter("test_base", "666666");
    for (Map.Entry<String, Map<String, String>> row : rows.entrySet()) {
        System.out.println("-----------------------");
        System.out.println("rowKey:" + row.getKey());
        System.out.println("+++++++++++行数据+++++++++++");
        for (Map.Entry<String, String> column : row.getValue().entrySet()) {
            System.out.println(column.getKey() + "---" + column.getValue());
        }
        System.out.println("-----------------------");
    }
}
运行单元测试后,输出如下:
-----------------------
rowKey:cba666666fed
+++++++++++行数据+++++++++++
coefs---112
varName---cba6666fed
create_time---null
project_id---112
pvalues---112
tvalues---112
-----------------------
-----------------------
rowKey:abc666666def
+++++++++++行数据+++++++++++
coefs---111
varName---abc6666def
create_time---null
project_id---111
pvalues---111
tvalues---111
-----------------------
8)删除指定的列
/**
 * 测试删除指定的列 — writes a temporary column {@code f:xxx}, reads it, deletes it, and reads
 * it again. The second read is expected to print {@code null}.
 */
@Test
public void testDeleteColumn() {
    final String table = "test_base";
    final String row = "66804_000002";

    hbaseService.setColumnValue(table, row, "f", "xxx", "123");
    System.out.println("第一次取值:" + hbaseService.getColumnValue(table, row, "f", "xxx"));

    hbaseService.deleteColumn(table, row, "f", "xxx");
    System.out.println("第二次取值:" + hbaseService.getColumnValue(table, row, "f", "xxx"));
}
运行单元测试后,输出如下:
第一次取值:123
第二次取值:null
9)删除指定的行
/**
 * 测试删除指定的行 — prints row {@code 66804_000003}, deletes it, and prints it again. The
 * second print is expected to list no columns.
 */
@Test
public void testDeleteRow() {
    final String row = "66804_000003";

    Map<String, String> before = hbaseService.getRowData("test_base", row);
    System.out.println("第一次取值输出:");
    for (Map.Entry<String, String> column : before.entrySet()) {
        System.out.println(column.getKey() + "---" + column.getValue());
    }

    hbaseService.deleteRow("test_base", row);

    Map<String, String> after = hbaseService.getRowData("test_base", row);
    System.out.println("第二次取值输出:");
    for (Map.Entry<String, String> column : after.entrySet()) {
        System.out.println(column.getKey() + "---" + column.getValue());
    }
}
运行单元测试后,输出如下:
第一次取值输出:
coefs---0.8984
varName---education
create_time---null
project_id---40866
pvalues---0.0000
tvalues---25.5649
第二次取值输出:
根据输出可以发现,这一行数据的确已经被删除
10)删除指定的列族
/**
 * 测试删除指定的列族 — writes a cell into family {@code back}, prints row {@code 777}, drops
 * the {@code back} family, and prints the row again.
 */
@Test
public void testDeleteColumnFamily() {
    final String row = "777";
    hbaseService.putData("test_base", row, "back", new String[]{"var_name1"}, new String[]{"777"});

    Map<String, String> before = hbaseService.getRowData("test_base", row);
    System.out.println("第一次取值输出:");
    for (Map.Entry<String, String> column : before.entrySet()) {
        System.out.println(column.getKey() + "---" + column.getValue());
    }

    hbaseService.deleteColumnFamily("test_base", "back");

    Map<String, String> after = hbaseService.getRowData("test_base", row);
    System.out.println("第二次取值输出:");
    for (Map.Entry<String, String> column : after.entrySet()) {
        System.out.println(column.getKey() + "---" + column.getValue());
    }
}
运行单元测试后,输出如下:
第一次取值输出:
var_name1---777
第二次取值输出:
根据输出可以发现,“back”这个列族已经被删除