When we process data with Kafka, we often need Kafka to interact with a database, and HBase is one such database. This article walks through how data in Kafka can be imported into HBase.
The approach taken here is to use a consumer to read the data from Kafka and write it into HBase.
First, create the target table in HBase. The table can be created either from the HBase shell or through the Java API; here we use the API.
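For reference, the shell route would be a single command, roughly create 'emp', 'basicinfo', 'deptinfo' for the table used below.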
Create the CreatTableTest class:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreatTableTest {
    public static void main(String[] args) throws IOException {
        // Configure the HBase connection parameters
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.5.128");       // ZooKeeper address
        conf.set("hbase.zookeeper.property.clientPort", "42182");
        String tableName = "emp";
        String[] family = { "basicinfo", "deptinfo" };
        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
        // Build the table descriptor
        HTableDescriptor hbaseTableDesc = new HTableDescriptor(tableName);
        for (int i = 0; i < family.length; i++) {
            // Add the column families
            hbaseTableDesc.addFamily(new HColumnDescriptor(family[i]));
        }
        // If the table already exists, print a message; otherwise create it
        if (hbaseAdmin.tableExists(tableName)) {
            System.out.println("TableExists!");
            System.exit(0);
        } else {
            hbaseAdmin.createTable(hbaseTableDesc);
            System.out.println("Create table Success!");
        }
    }
}
After the table exists, we write a consumer that reads messages from the topic and writes them into HBase:
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.teamsun.kafka.m001.KafkaProperties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer3 extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer3(String topic) {
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId1);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        HBaseUtils hbase = new HBaseUtils();
        while (it.hasNext()) {
            // Read each message only once; calling it.next() twice per iteration would skip every other record
            String message = new String(it.next().message());
            System.out.println("3receive:" + message);
            try {
                hbase.put(message);
            } catch (IOException e) {
                e.printStackTrace();
            }

            // try {
            //     sleep(300); // delay 300 ms between messages
            // } catch (InterruptedException e) {
            //     e.printStackTrace();
            // }
        }
    }
}
Next, create an HBaseUtils class that specifies which HBase instance to connect to and performs the actual put:
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseUtils {
    public void put(String string) throws IOException {
        // Configure the HBase connection parameters
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.5.128");       // ZooKeeper address
        conf.set("hbase.zookeeper.property.clientPort", "42182");
        // Use a random suffix so every message gets its own row key
        Random random = new Random();
        long a = random.nextInt(1000000000);
        String tableName = "emp";
        String rowkey = "rowkey" + a;
        String columnFamily = "basicinfo";
        String column = "empname";
        // String value = string;
        HTable table = new HTable(conf, tableName);
        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(string));
        table.put(put);   // write the row
        table.close();    // release resources
    }
}
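Note that put() above opens and closes a new HTable for every single message, which becomes expensive once the consumer processes a steady stream. As a minimal sketch (using the same old-style HBase client API as the rest of this article; the class name ReusableHBaseUtils is just for illustration), the table could instead be opened once and reused:

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class ReusableHBaseUtils {

    private final HTable table;                // opened once, reused for every message
    private final Random random = new Random();

    public ReusableHBaseUtils() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.5.128");       // ZooKeeper address
        conf.set("hbase.zookeeper.property.clientPort", "42182");
        table = new HTable(conf, "emp");
    }

    public void put(String value) throws IOException {
        String rowkey = "rowkey" + random.nextInt(1000000000);
        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes("basicinfo"), Bytes.toBytes("empname"), Bytes.toBytes(value));
        table.put(put);
    }

    public void close() throws IOException {
        table.close();                         // release resources when the consumer shuts down
    }
}

The consumer would then create a single instance before its while loop and call close() when it exits.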
Finally, add the consumer's configuration (the KafkaProperties interface) and everything is in place.
public interface KafkaProperties {

    final static String zkConnect = "hadoop0:42182,hadoop1:42182,hadoop2:42182,hadoop3:42182";
    final static String groupId1 = "group1";
    final static String topic = "test3";
    final static String kafkaServerURL = "hadoop0,hadoop1";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 20000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}
Then simply run the consumer. Note that the topic must already contain messages, otherwise there is nothing to consume (a minimal producer sketch is shown after the test class below).
public class KafkaConsumerProducerTest {

    public static void main(String[] args) {
        // KafkaProducer1 producerThread1 = new KafkaProducer1(KafkaProperties.topic);
        // producerThread1.start();
        // KafkaProducer2 producerThread2 = new KafkaProducer2(KafkaProperties.topic);
        // producerThread2.start();
        // KafkaProducer3 producerThread3 = new KafkaProducer3(KafkaProperties.topic);
        // producerThread3.start();

        // KafkaConsumer1 consumerThread1 = new KafkaConsumer1(KafkaProperties.topic);
        // consumerThread1.start();
        // KafkaConsumer2 consumerThread2 = new KafkaConsumer2(KafkaProperties.topic);
        // consumerThread2.start();
        KafkaConsumer3 consumerThread3 = new KafkaConsumer3(KafkaProperties.topic);
        consumerThread3.start();
        // KafkaConsumer4 consumerThread4 = new KafkaConsumer4(KafkaProperties.topic);
        // consumerThread4.start();
    }
}
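The producer classes referenced in the commented-out lines (KafkaProducer1, KafkaProducer2, and so on) are not shown in this article. If the topic is still empty, a minimal sketch along the following lines can push a few test messages into test3; it uses the same Kafka 0.8-era API as the consumer above, and the broker list and message contents are only placeholders to adjust for your own cluster:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list is an assumption; point it at your own Kafka brokers
        props.put("metadata.broker.list", "hadoop0:9092,hadoop1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        for (int i = 0; i < 10; i++) {
            // Each message becomes one row in the emp table
            producer.send(new KeyedMessage<String, String>(KafkaProperties.topic, "empname" + i));
        }
        producer.close();
    }
}

Run it once before (or while) the consumer is running so that the scan below has rows to show.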
You can then view the data from the HBase shell:
hbase(main):063:0> scan 'emp'
The above is one example of getting data from Kafka into HBase. It only demonstrates that the data path works end to end; you will need to modify and extend it to fit the specific requirements of your own project.