kafka学习(一)java发送接收消息,自定义有偏移量

  • Post author:
  • Post category:java


kafka学习(一)java连接

这篇连接的文章借鉴了https://www.cnblogs.com/qizhelongdeyang/p/7354183.html

手动控制offset



.这个是pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>zhz</groupId>
  <artifactId>kafkaOffset</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafkaOffset</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
     <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.1</version>
        </dependency>
  </dependencies>
</project>

二生产者

package zhz.kafkaOffset;


import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * 生产者
 * @author zhaohangzhi
 *
 */
public class MyProducer {

    private static Properties                       properties;
    private static KafkaProducer<String, String>    pro;
    static {
        //配置
        properties = new Properties();
        properties.put("bootstrap.servers", "master169:9092");

        //序列化类型
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //创建生产者
        pro = new KafkaProducer<>(properties);
    }

    public static void main(String[] args) throws Exception {
        produce("test");
    }

    public static void produce(String topic) throws Exception {

        //模拟message
        //          String value = UUID.randomUUID().toString();
        for (int i = 0; i < 100; i++) {
            //封装message
            String message ="hahaha";
            ProducerRecord<String, String> pr = new ProducerRecord<String, String>(topic, i + message);
            System.out.println("发送信息:    "+i + message);
            //发送消息
            pro.send(pr);
            Thread.sleep(500);
        }

    }
}

三消费者

package zhz.kafkaOffset;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 *  消费者               
 *
 */
public class ConsumerThreadNew implements Runnable {
    private static Logger                   LOG = LoggerFactory.getLogger(ConsumerThreadNew.class);

    //KafkaConsumer kafka生产者
    private KafkaConsumer<String, String>   consumer;

    //消费者名字
    private String                          name;

    //消费的topic组
    private List<String>                    topics;

    //构造函数
    public ConsumerThreadNew(KafkaConsumer<String, String> consumer, String topic, String name) {
        super();
        this.consumer = consumer;
        this.name = name;
        this.topics = Arrays.asList(topic);
    }

    @Override
    public void run() {
        consumer.subscribe(topics);
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();

        // 批量提交数量
        final int minBatchSize = 1; 
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("消费者的名字为:" + name + ",消费的消息为:" + record.value());
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                //53-58行,模拟处理工程,这里就是处理成功了然后自己手动提交  ,这个相当于处理完了才会去取下一波数据
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
               // consumer.commitSync();
                consumer.commitAsync();//至少一次 ,提交offset
                System.out.println("提交完毕");
                buffer.clear();
            }
        }
    }

}
package zhz.kafkaOffset;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * 
 * 消费者的主类
 *
 */
public class MyConsume {
    private static Logger   LOG = LoggerFactory.getLogger(MyConsume.class);

    public MyConsume() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "master169:9092");//这个地方改你的kafka服务器主机名
        //设置不自动提交,自己手动更新offset
        properties.put("enable.auto.commit", "false");
        properties.put("auto.offset.reset", "latest");
        properties.put("zookeeper.connect", "master169:2181");//这个地方改你的zookepper服务器主机名
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "Group1");
        //properties.put("auto.commit.interval.ms", "1000");
        ExecutorService executor = Executors.newFixedThreadPool(5);

        
        //如果修改i的值,这里可以模拟多个消费者
        //执行消费
        for (int i = 0; i < 1; i++) {
            System.out.println("启动一个消费者");
            executor.execute(new ConsumerThreadNew(new KafkaConsumer<String, String>(properties),
                "test", "消费者" + (i + 1)));
        }
    }
}


其中消费者在提交时,有两种方式


consumer.commitSync();//同步

consumer.commitAsync();//异步

关于同步异步方面,参照了https://blog.csdn.net/a953713428/article/details/80030893这篇文章

同步放在for循环里,异步放在for循环外

1)同步手动提交

commitSync()方法会提交由poll()方法返回的最新偏移量,提交成功后马上返回,否则跑出异常。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        try {
           consumer.commitSync();
        } catch (Exception e) {
            System.out.println("commit failed");
        }
    }
}

每处理一次消息我们提交一次offset。

2)异步手动提交

上面我们使用commitSync()的方式提交数据,每次提交都需要等待broker返回确认结果。这样每提交一次等待一次会限制我们的吞吐量。

如果采用降低提交频率来保证吞吐量,那么则有增加消息重复消费的风险。所以kafka消费者提供了异步提交的API。我们只管发送提交请求无需等待broker返回。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    consumer.commitAsync();
}

commitAsync()方法提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,commitAsync()会一直重试,但是commitAsync()不会,这也是commitAsync()不好的一个地方。它之所以不进行重试,是因为在它收到服务器响应的时候, 可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量3000。如果commitAsync()重新尝试提交偏移量2000 ,它有可能在偏移量3000之后提交成功。这个时候如果发生再均衡,就会出现重复消息。

当然使用手动提交最大的好处就是如果发生了错误我们可以记录下来。commitAsync()也支持回调方法,提交offset发生错误我们可以记下当前的偏移量。

(这段代码,我没有试验)

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if(e != null){
                System.out.println("commit failed"+map);
            }
        }
    });
}

=

同步和异步组合提交

提交特定偏移量

等等可以参照https://blog.csdn.net/a953713428/article/details/80030893

(一)(二)所用工程与文件


https://pan.baidu.com/s/14b6da6aBe3gDBcBw8_l1PQ


tjhb



版权声明:本文为zhaohangzhi原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。