kafka数据导入hbase

  • Post author:
  • Post category:其他


我们在使用kafka

处理数据的过程中会使用


kafka


跟一下数据库进行交互,


Hbase


就是其中的一种。下面给大家介绍一下


kafka


中的数据是如何导入


Hbase


的。

本文的思路是通过

consumers


把数据消费到


Hbase


中。

首先在

Hbase


中创建表,创建表可以在


Hbase


客户端创建也可以通过


API


创建,这里介绍通过


API


创建表的方法:

创建CreatTableTest类



  1. import


    java.io.IOException;



  2. import


    org.apache.hadoop.conf.Configuration;



  3. import


    org.apache.hadoop.hbase.HBaseConfiguration;



  4. import


    org.apache.hadoop.hbase.HColumnDescriptor;



  5. import


    org.apache.hadoop.hbase.HTableDescriptor;



  6. import


    org.apache.hadoop.hbase.client.HBaseAdmin;



  7. public




    class


    CreatTableTest {



  8. public




    static




    void


    main(String[] args)


    throws


    IOException  {



  9. //设置HBase据库的连接配置参数




  10. Configuration conf = HBaseConfiguration.create();

  11. conf.set(

    “hbase.zookeeper.quorum”


    ,


    “192.168.5.128”


    );


    //  Zookeeper的地址




  12. conf.set(

    “hbase.zookeeper.property.clientPort”


    ,


    “42182”


    );


  13. String tableName =

    “emp”


    ;


  14. String[] family = {

    “basicinfo”


    ,


    “deptinfo”


    };


  15. HBaseAdmin hbaseAdmin =

    new


    HBaseAdmin(conf);



  16. //创建表对象




  17. HTableDescriptor hbaseTableDesc =

    new


    HTableDescriptor(tableName);



  18. for


    (


    int


    i =


    0


    ; i < family.length; i++) {



  19. //设置表字段




  20. hbaseTableDesc.addFamily(

    new


    HColumnDescriptor(family[i]));



  21. }


  22. //判断表是否存在,不存在则创建,存在则打印提示信息





  23. if


    (hbaseAdmin.tableExists(tableName)) {


  24. System.out.println(

    “TableExists!”


    );


  25. System.exit(

    0


    );


  26. }

    else


    {


  27. hbaseAdmin.createTable(hbaseTableDesc);

  28. System.out.println(

    “Create table Success!”


    );


  29. }

  30. }

  31. }

创建表之后我们创建一个

consumer


来消费数据到


Hbase





  1. import


    java.io.IOException;



  2. import


    java.util.HashMap;



  3. import


    java.util.List;



  4. import


    java.util.Map;



  5. import


    java.util.Properties;




  6. import


    com.teamsun.kafka.m001.KafkaProperties;




  7. import


    kafka.consumer.ConsumerConfig;



  8. import


    kafka.consumer.ConsumerIterator;



  9. import


    kafka.consumer.KafkaStream;



  10. import


    kafka.javaapi.consumer.ConsumerConnector;




  11. public




    class


    KafkaConsumer3


    extends


    Thread {



  12. private




    final


    ConsumerConnector consumer;



  13. private




    final


    String topic;




  14. public


    KafkaConsumer3(String topic) {


  15. consumer = kafka.consumer.Consumer

  16. .createJavaConsumerConnector(createConsumerConfig());


  17. this


    .topic = topic;


  18. }



  19. private




    static


    ConsumerConfig createConsumerConfig() {


  20. Properties props =

    new


    Properties();


  21. props.put(

    “zookeeper.connect”


    , KafkaProperties.zkConnect);


  22. props.put(

    “group.id”


    , KafkaProperties.groupId1);


  23. props.put(

    “zookeeper.session.timeout.ms”


    ,


    “40000”


    );


  24. props.put(

    “zookeeper.sync.time.ms”


    ,


    “200”


    );


  25. props.put(

    “auto.commit.interval.ms”


    ,


    “1000”


    );



  26. return




    new


    ConsumerConfig(props);


  27. }



  28. @Override





  29. public




    void


    run() {


  30. Map<String, Integer> topicCountMap =

    new


    HashMap<String, Integer>();


  31. topicCountMap.put(topic,

    new


    Integer(


    1


    ));


  32. Map<String, List<KafkaStream<

    byte


    [],


    byte


    []>>> consumerMap = consumer


  33. .createMessageStreams(topicCountMap);

  34. KafkaStream<

    byte


    [],


    byte


    []> stream = consumerMap.get(topic).get(


    0


    );


  35. ConsumerIterator<

    byte


    [],


    byte


    []> it = stream.iterator();


  36. HBaseUtils hbase =

    new


    HBaseUtils();



  37. while


    (it.hasNext()) {


  38. System.out.println(

    “3receive:”


    +


    new


    String(it.next().message()));



  39. try


    {


  40. hbase.put(

    new


    String(it.next().message()));


  41. }

    catch


    (IOException e) {



  42. // TODO Auto-generated catch block




  43. e.printStackTrace();

  44. }



  45. //          try {






  46. //              sleep(300);    // 每条消息延迟300ms





  47. //          } catch (InterruptedException e) {






  48. //              e.printStackTrace();





  49. //          }




  50. }

  51. }

  52. }

再创建一个HBaseUtils来指定要连接的Hbase数据库






  1. import


    java.io.IOException;



  2. import


    java.util.Random;




  3. import


    org.apache.hadoop.conf.Configuration;



  4. import


    org.apache.hadoop.hbase.HBaseConfiguration;



  5. import


    org.apache.hadoop.hbase.client.HTable;



  6. import


    org.apache.hadoop.hbase.client.Put;



  7. import


    org.apache.hadoop.hbase.util.Bytes;



  8. public




    class


    HBaseUtils {



  9. public




    void


    put(String string)


    throws


    IOException {



  10. //设置HBase据库的连接配置参数




  11. Configuration conf = HBaseConfiguration.create();

  12. conf.set(

    “hbase.zookeeper.quorum”


    ,


    “192.168.5.128”


    );


    //  Zookeeper的地址




  13. conf.set(

    “hbase.zookeeper.property.clientPort”


    ,


    “42182”


    );


  14. Random random =

    new


    Random();



  15. long


    a = random.nextInt(


    1000000000


    );


  16. String tableName =

    “emp”


    ;


  17. String rowkey =

    “rowkey”


    +a ;


  18. String columnFamily =

    “basicinfo”


    ;


  19. String column =

    “empname”


    ;



  20. //String value = string;




  21. HTable table=

    new


    HTable(conf, tableName);


  22. Put put=

    new


    Put(Bytes.toBytes(rowkey));


  23. put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(string));

  24. table.put(put);

    //放入表




  25. table.close();

    //释放资源




  26. }

  27. }


最后再加上

consumer


的配置文件就大功告成了。




  1. public




    interface


    KafkaProperties {




  2. final




    static


    String zkConnect =


    “hadoop0:42182,hadoop1:42182,hadoop2:42182,hadoop3:42182”


    ;



  3. final




    static


    String groupId1=


    “group1”


    ;



  4. final




    static


    String topic =


    “test3”


    ;



  5. final




    static


    String kafkaServerURL =


    “hadoop0,hadoop1”


    ;



  6. final




    static




    int


    kafkaServerPort =


    9092


    ;



  7. final




    static




    int


    kafkaProducerBufferSize =


    64


    *


    1024


    ;



  8. final




    static




    int


    connectionTimeOut =


    20000


    ;



  9. final




    static




    int


    reconnectInterval =


    10000


    ;



  10. final




    static


    String clientId =


    “SimpleConsumerDemoClient”


    ;


  11. }


然后执行

consumer


就可以了,注意要保证


topic


中有消息才可以消费。




  1. public




    class


    KafkaConsumerProducerTest {




  2. public




    static




    void


    main(String[] args) {



  3. //       KafkaProducer1 producerThread1 = new KafkaProducer1(KafkaProperties.topic);





  4. //       producerThread1.start();





  5. //       KafkaProducer2 producerThread2 = new KafkaProducer2(KafkaProperties.topic);





  6. //       producerThread2.start();





  7. //       KafkaProducer3 producerThread3 = new KafkaProducer3(KafkaProperties.topic);





  8. //       producerThread3.start();






  9. //       KafkaConsumer1 consumerThread1 = new KafkaConsumer1(KafkaProperties.topic);





  10. //       consumerThread1.start();





  11. //       KafkaConsumer2 consumerThread2 = new KafkaConsumer2(KafkaProperties.topic);





  12. //       consumerThread2.start();




  13. KafkaConsumer3 consumerThread3 =

    new


    KafkaConsumer3(KafkaProperties.topic);


  14. consumerThread3.start();


  15. //       KafkaConsumer4 consumerThread4 = new KafkaConsumer4(KafkaProperties.topic);





  16. //       consumerThread4.start();




  17. }

  18. }



HBase


客户端执行

hbase(main):063:0> scan  ’emp’

就可以查看到数据了。

以上就是

kafka


数据进入


Hbase


的一个例子,当然上诉只是保证数据走通了,大家在具体项目中什么需求,还需要自行修改和完善。