KafkaToHbase Interface Implementation for Five Tables (Study Notes)

handler

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class EventAttendHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            // record format: event_id,user_id,status  e.g. 123454,11111,yes/maybe/no/invited
            System.out.println(record.value());
            String[] split = record.value().split(","); // e.g. [12345, 22222, yes]
            // rowKey: event_id + user_id + status keeps each attendance record unique
            Put put = new Put(Bytes.toBytes(split[0] + split[1] + split[2]));
            put.addColumn("euat".getBytes(), "eventid".getBytes(), split[0].getBytes());
            put.addColumn("euat".getBytes(), "friendid".getBytes(), split[1].getBytes());
            put.addColumn("euat".getBytes(), "state".getBytes(), split[2].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class EventsHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            // skip empty messages
            if (record.value().trim().length() == 0) {
                continue;
            }
            String[] split = record.value().split(",");
            Put put = new Put(Bytes.toBytes(split[0])); // rowKey: event_id
            put.addColumn("creator".getBytes(), "user_id".getBytes(), split[1].getBytes());
            put.addColumn("location".getBytes(), "city".getBytes(), split[3].getBytes());
            put.addColumn("location".getBytes(), "state".getBytes(), split[4].getBytes());
            put.addColumn("location".getBytes(), "zip".getBytes(), split[5].getBytes());
            put.addColumn("location".getBytes(), "country".getBytes(), split[6].getBytes());
            put.addColumn("location".getBytes(), "lat".getBytes(), split[7].getBytes());
            put.addColumn("location".getBytes(), "lng".getBytes(), split[8].getBytes());
            put.addColumn("remark".getBytes(), "common_words".getBytes(), split[9].getBytes());
            put.addColumn("schedule".getBytes(), "start_time".getBytes(), split[2].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.List;

public interface IParseRecord {
    public List<Put> parse(ConsumerRecords<String, String> records);
}
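
Only the event_attendee and events handlers are shown above, but the remaining tables of the five follow the same pattern: split the CSV value, pick a rowkey, and map the fields onto column families. As a minimal sketch, a hypothetical UserFriendHandler might look like the following; the "user_id,friend_id" record format, the rowkey choice, and the euf column family are assumptions for illustration, not part of the original project.

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

// Hypothetical handler for a user-friend topic, assuming records of the form "user_id,friend_id"
public class UserFriendHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            String[] split = record.value().split(",");
            if (split.length < 2) {
                continue; // skip malformed records
            }
            // rowKey: user_id + friend_id keeps each friendship pair unique (assumption)
            Put put = new Put(Bytes.toBytes(split[0] + split[1]));
            put.addColumn("euf".getBytes(), "user_id".getBytes(), split[0].getBytes());
            put.addColumn("euf".getBytes(), "friend_id".getBytes(), split[1].getBytes());
            datas.add(put);
        }
        return datas;
    }
}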

worker

public interface IWorker {
    /**
     * Pull data from Kafka and write it into the target HBase table.
     * @param targetTableName name of the target HBase table
     */
    public void fillData(String targetTableName);

}
import nj.zb.kb15.kafkatohbase.oop.writer.IWriter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;



public class Worker implements IWorker {
    private KafkaConsumer<String, String> consumer = null;
    private IWriter writer = null;

    public Worker(String topicName, String groupId, IWriter writer) {
        this.writer = writer;
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // offset commit mode: false = commit manually, true = commit automatically
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // only used when auto commit is enabled
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // assign to the field so fillData can use the consumer
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton(topicName));

    }

    @Override
    public void fillData(String targetTableName) {
        Duration duration = Duration.ofMillis(100);
        long num = 0;
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(duration);
                int count = writer.write(targetTableName, records);
                num += count;
                System.out.println("---------------num:" + num);
                // auto commit is disabled above, so commit the offsets manually after the batch is written
                consumer.commitAsync();
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

writer

import nj.zb.kb15.kafkatohbase.oop.handler.IParseRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.io.IOException;
import java.util.List;

public class HBaseWriter implements IWriter {

    private Connection connection = null;
    private IParseRecord handler = null;

    public HBaseWriter(IParseRecord handler){
        // configure HBase connection information
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.handler = handler;

    }



    @Override
    public int write(String targetTableName, ConsumerRecords<String, String> records) {
        // try-with-resources closes the Table even when there is nothing to write
        try (Table table = connection.getTable(TableName.valueOf(targetTableName))) {
            List<Put> datas = handler.parse(records);
            if (datas != null && datas.size() > 0) {
                table.put(datas);
                return datas.size();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return 0;
    }
}
import org.apache.kafka.clients.consumer.ConsumerRecords;

public interface IWriter {
    public int write(String targetTableName, ConsumerRecords<String, String> records);

}
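
HBaseWriter expects the target namespace and tables to exist already. They can be created in the hbase shell, or programmatically; below is a minimal sketch using the HBase 2.x Admin API to create the events_db namespace plus the event_attendee and events tables with the column families referenced by the handlers above (the HBase version and the reuse of the ZooKeeper address are assumptions about the environment, not part of the original post).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateHBaseTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // create the events_db namespace (fails if it already exists)
            admin.createNamespace(NamespaceDescriptor.create("events_db").build());
            // event_attendee table with the single euat column family used by EventAttendHandler
            admin.createTable(TableDescriptorBuilder
                    .newBuilder(TableName.valueOf("events_db:event_attendee"))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("euat"))
                    .build());
            // events table with the column families used by EventsHandler
            admin.createTable(TableDescriptorBuilder
                    .newBuilder(TableName.valueOf("events_db:events"))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("creator"))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("location"))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("remark"))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("schedule"))
                    .build());
        }
    }
}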

test

import nj.zb.kb15.kafkatohbase.oop.handler.EventAttendHandler;
import nj.zb.kb15.kafkatohbase.oop.handler.IParseRecord;
import nj.zb.kb15.kafkatohbase.oop.worker.IWorker;
import nj.zb.kb15.kafkatohbase.oop.worker.Worker;
import nj.zb.kb15.kafkatohbase.oop.writer.HBaseWriter;
import nj.zb.kb15.kafkatohbase.oop.writer.IWriter;


/**
 * Import data from the Kafka topic event_attendess into the HBase table events_db:event_attendee
 */
public class EventAttendToHB2 {
    public static void main(String[] args) {

        IParseRecord handler = new EventAttendHandler();
//        IParseRecord handler2=new EventsHandler();
        IWriter writer = new HBaseWriter(handler);
//        IWriter writer = new HBaseWriter(handler2);
        IWorker worker = new Worker("event_attendess", "event_attend_group", writer);
        worker.fillData("events_db:event_attendee");
//        IWorker worker = new Worker("events","events_group",writer);
//        worker.fillData("events_db:events");



    }
}
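
Once a worker has been running for a while, a quick way to verify the import is to scan a few rows of the target table. The sketch below reuses the connection settings from HBaseWriter and prints the first five rows of events_db:event_attendee (the class name and the row limit are just illustrative assumptions).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanEventAttendee {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("events_db:event_attendee"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            int shown = 0;
            for (Result result : scanner) {
                // print the rowkey and every column of the row
                System.out.println("row=" + Bytes.toString(result.getRow()));
                for (Cell cell : result.rawCells()) {
                    System.out.println("  " + Bytes.toString(CellUtil.cloneFamily(cell))
                            + ":" + Bytes.toString(CellUtil.cloneQualifier(cell))
                            + " = " + Bytes.toString(CellUtil.cloneValue(cell)));
                }
                if (++shown >= 5) {
                    break; // only look at the first few rows
                }
            }
        }
    }
}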










Copyright notice: This is an original article by liuyongsheng666, licensed under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.