Message Queue Kafka (Part 1): Basic Usage


I. Kafka Overview

Kafka is a high-throughput message queue.

II. Kafka Features

1. Lightweight: Kafka is lighter-weight than message queues such as ActiveMQ.

2. Messages in Kafka are not deleted once consumed; all messages are retained whether or not they have been consumed.

3. Message deletion is time-based, configured in config/server.properties:

# The minimum age of a log file to be eligible for deletion

log.retention.hours=168
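Time-based retention is not the only knob: a size-based limit can be set alongside it. A minimal sketch of the related properties in server.properties (the values below are illustrative defaults, and exact property names can vary between Kafka versions):

```properties
# Delete log segments older than 7 days
log.retention.hours=168

# Optionally also cap the size of each partition's log; once exceeded,
# the oldest segments are deleted. -1 disables the size limit.
log.retention.bytes=-1
```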

4. Written in Scala.

III. Kafka Components

1. Topic

Roughly the equivalent of a queue.

2. Broker

Every machine in a Kafka cluster is called a broker. A single topic can span multiple brokers, which means the messages of one "queue" can be distributed across several machines.

3. Partition

A topic can have multiple partitions. Each partition corresponds to an append-only log on disk that holds the message data together with its index.
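How a keyed message is routed to a partition can be illustrated with a plain-Java sketch of a hash partitioner. This is not Kafka's actual implementation, only an illustration of the idea: the same key always hashes to the same partition, so per-key ordering is preserved within a partition.

```java
import java.util.Arrays;

public class PartitionSketch {
    // Illustrative hash partitioner: map a key to one of numPartitions
    // partitions. Math.abs guards against negative hashCode values.
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (String key : Arrays.asList("user-1", "user-2", "user-1")) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
        // "user-1" maps to the same partition both times it appears
    }
}
```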

4. Producer

The message producer.

5. Consumer

The message consumer.

6. Consumer group

For a given topic, only one consumer within the same consumer group receives each message.

Consumers in different groups all receive the message, which amounts to a broadcast across groups.
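These delivery rules can be simulated in plain Java, with no Kafka involved: within one group each message goes to exactly one member, while every group gets its own copy. The class and round-robin choice below are purely illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupSemanticsSketch {
    // group name -> its members; each member accumulates received messages
    static Map<String, List<List<String>>> groups = new LinkedHashMap<>();

    static void addGroup(String name, int memberCount) {
        List<List<String>> members = new ArrayList<>();
        for (int i = 0; i < memberCount; i++) members.add(new ArrayList<>());
        groups.put(name, members);
    }

    // Every group receives the message, but only ONE member per group
    // gets it; here we pick that member by simple round-robin.
    static void publish(String msg) {
        for (List<List<String>> members : groups.values()) {
            int target = totalReceived(members) % members.size();
            members.get(target).add(msg);
        }
    }

    static int totalReceived(List<List<String>> members) {
        int n = 0;
        for (List<String> m : members) n += m.size();
        return n;
    }

    public static void main(String[] args) {
        addGroup("group-a", 2);  // two consumers sharing one group
        addGroup("group-b", 1);  // a second, independent group
        publish("m1");
        publish("m2");
        // group-a: m1 and m2 are split between its two members
        // group-b: its single member receives both messages
        System.out.println(groups);
    }
}
```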

IV. Installation and Usage

1. Download the Kafka package kafka_2.11-0.8.2.1.tgz

2. Extract it to a directory: tar zxvf kafka_2.11-0.8.2.1.tgz

3. Install and start ZooKeeper

4. In the extracted directory, start the Kafka server: bin/kafka-server-start.sh config/server.properties

5. Create a topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic_1

6. Inspect the topics you have created:

Describe all topics: bin/kafka-topics.sh --describe --zookeeper localhost:2181

Or describe a specific topic: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_topic_1

You can also list topic names with: bin/kafka-topics.sh --list --zookeeper localhost:2181

7. Produce messages to the topic with the console producer

Command:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_1

You can then type messages to send.

8. Consume messages with the console consumer

Command:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic_1 --from-beginning

V. Producing Messages from Java

1. (Important) If you send messages to a remote server, you must edit the broker configuration file kafka/config/server.properties:

(In kafka_2.11-0.8.2.1) uncomment #host.name=localhost and set it to the machine's IP address (localhost will not work), e.g. host.name=192.168.0.107

(In kafka_2.12-1.0.0) uncomment #listeners=PLAINTEXT://:9092 and set it to the machine's IP address (localhost will not work), e.g. listeners=PLAINTEXT://192.168.0.107:9092

Likewise, uncomment #zookeeper.connect=localhost:2181 and set it to the machine's IP address (localhost will not work), e.g. zookeeper.connect=192.168.0.107:2181

2. Add the Maven dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.1</version>
</dependency>

3. Producer code

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaSimpleProducerMain {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "192.168.0.107:9092");  // Kafka broker address and port
        properties.put("key.serializer.class", "kafka.serializer.StringEncoder"); // key serializer
        properties.put("serializer.class", "kafka.serializer.StringEncoder");  // value serializer
        ProducerConfig producerConfig = new ProducerConfig(properties);

        Producer<String, String> producer = new Producer<>(producerConfig);

        String topic = "my_topic"; // target topic

        while (true) {
            String msg = "this is a test msg" + new Date();
            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(topic, msg);
            producer.send(keyedMessage);  // send the message
            System.out.println("send msg: " + msg);

            TimeUnit.SECONDS.sleep(1);
        }
    }
}

Output:

send msg: this is a test msgTue Oct 31 23:11:21 CST 2017

send msg: this is a test msgTue Oct 31 23:11:23 CST 2017

send msg: this is a test msgTue Oct 31 23:11:24 CST 2017

send msg: this is a test msgTue Oct 31 23:11:25 CST 2017

send msg: this is a test msgTue Oct 31 23:11:26 CST 2017

send msg: this is a test msgTue Oct 31 23:11:27 CST 2017

VI. Consuming Messages from Java

1. Add the Maven dependency (same as above)

2. Consumer code

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaSimpleConsumerMain {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.0.107:2181");  // ZooKeeper address
        properties.put("zookeeper.session.timeout.ms", "300000");  // session timeout
        properties.put("serializer.class", "kafka.serializer.StringEncoder");  // value serializer
        properties.put("group.id", "my_group_1");  // consumer group id
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);

        String topic = "my_topic";
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, 1);  // number of streams (threads) for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            String msg = new String(iterator.next().message());
            System.out.println("receive msg : " + msg);
        }
    }
}

Output:

receive msg : this is a test msgTue Oct 31 23:11:21 CST 2017

receive msg : this is a test msgTue Oct 31 23:11:23 CST 2017

receive msg : this is a test msgTue Oct 31 23:11:24 CST 2017

receive msg : this is a test msgTue Oct 31 23:11:25 CST 2017

receive msg : this is a test msgTue Oct 31 23:11:26 CST 2017

receive msg : this is a test msgTue Oct 31 23:11:27 CST 2017



Copyright notice: this is an original article by chinabestchina, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.