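For connecting to Kafka through the default endpoint, see:
Common configuration and caveats for the pull-based KafkaConsumer in a Spring project
This article does not use Spring's KafkaTemplate wrapper. It uses the native Java API instead: a pull-based consumer that fetches messages by calling poll().
1. Common consumer configuration
1. Add the pom.xml file: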
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka-consumer-spring-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-consumer-spring-demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2. Configure application.yml:
kafka:
  bootstrap-servers: 127.0.0.1:9092 # Kafka server address; for a cluster, list several addresses separated by commas
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    batch-size: 1000 # Producer batch size (Kafka's batch.size is measured in bytes, not in messages)
  consumer:
    # Consumer group ID
    group-id: test-fya-first
    # Whether offsets are committed automatically
    enable-auto-commit: false
    auto-commit-interval: 1000
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Maximum number of records returned by a single poll
    max-poll-records: 100
    session-timeout: 30000
    # Where a new consumer group starts consuming; the default is latest (the end of the log)
    auto-offset-reset: earliest
    # Maximum time a single poll may block, in ms
    pollTimeout: 1000
    # Number of consumers; should not exceed the number of partitions
    consumerCount: 11
    # Use the PLAIN mechanism on the SASL endpoint
    sasl-mechanism: PLAIN
    security-protocol: SASL_PLAINTEXT
    # Topic name
    topicName: topic.quick.initial
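The advice that the consumer count should not exceed the partition count follows from how a consumer group works: each partition is owned by exactly one consumer in the group, so surplus consumers receive nothing. The sketch below illustrates this, assuming a single topic and an even assignment; the class ConsumerSizing and its methods are hypothetical helpers, not part of Kafka or of this project.

```java
import java.util.Arrays;

/** Sketch: how many partitions each consumer in one group would own for a single topic. */
final class ConsumerSizing {

    private ConsumerSizing() {}

    /**
     * Spreads the partitions over the consumers as evenly as possible.
     * Every partition has one owner, so consumers beyond the partition count get 0.
     */
    static int[] partitionsPerConsumer(int partitions, int consumers) {
        if (partitions < 0 || consumers <= 0) {
            throw new IllegalArgumentException("partitions must be >= 0 and consumers > 0");
        }
        int[] owned = new int[consumers];
        int base = partitions / consumers;   // every consumer gets at least this many
        int extra = partitions % consumers;  // the first 'extra' consumers get one more
        for (int i = 0; i < consumers; i++) {
            owned[i] = base + (i < extra ? 1 : 0);
        }
        return owned;
    }

    /** Number of consumers that would sit idle because no partition is left for them. */
    static int idleConsumers(int partitions, int consumers) {
        return Math.max(0, consumers - partitions);
    }

    public static void main(String[] args) {
        // consumerCount: 11 against a hypothetical topic with 8 partitions
        System.out.println(Arrays.toString(partitionsPerConsumer(8, 11)));
        System.out.println("idle consumers: " + idleConsumers(8, 11));
    }
}
```

With 11 consumers and an assumed 8 partitions, three consumers would hold connections and threads without ever receiving a record, which is why consumerCount is best kept at or below the partition count.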
3. Write the KafkaConsumer configuration class, KafkaConsumerConfig:
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import java.util.Properties;

@Configuration
public class KafkaConsumerConfig {
    @Value("${kafka.bootstrap-servers}")
    private String servers;
    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session-timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group-id}")
    private String groupId;
    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.max-poll-records}")
    private int maxPollRecord;
    @Value("${kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${kafka.consumer.value-deserializer}")
    private String valueDeserializer;
    @Value("${kafka.consumer.consumerCount}")
    protected int consumerCount;
    /**
     * SASL mechanism, e.g. PLAIN
     */
    @Value("${kafka.consumer.sasl-mechanism}")
    private String saslMechanism;
    /**
     * Security protocol, e.g. SASL_PLAINTEXT
     */
    @Value("${kafka.consumer.security-protocol}")
    private String securityProtocol;

    public Properties consumerConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, this.sessionTimeout);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueDeserializer);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecord);
        // SASL authentication mechanism; keep it as configured (PLAIN in this example)
        if (StringUtils.hasText(this.saslMechanism)) {
            props.put(SaslConfigs.SASL_MECHANISM, this.saslMechanism);
        }
        // Security protocol for the connection; the application.yml above sets SASL_PLAINTEXT
        if (StringUtils.hasText(this.securityProtocol)) {
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, this.securityProtocol);
        }
        return props;
    }
}
Compared with connecting through the default endpoint, the main additions are these two parameters:
    props.put(SaslConfigs.SASL_MECHANISM, this.saslMechanism);
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, this.securityProtocol);
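The Properties returned by consumerConfig() are meant to be handed to a native KafkaConsumer, which then pulls records with poll(). The following is a minimal sketch of such a pull loop with manual offset commits, assuming kafka-clients 2.0 or later (pulled in transitively by spring-kafka). The class PollLoopSketch, its constructor parameters, and the printing of records are illustrative assumptions, not code from the original project.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/** Hypothetical runner: one instance per consumer thread. */
public class PollLoopSketch implements Runnable {

    private final Properties props;    // e.g. the result of KafkaConsumerConfig#consumerConfig()
    private final String topicName;    // e.g. topic.quick.initial
    private final long pollTimeoutMs;  // e.g. 1000, the pollTimeout value from application.yml
    private volatile boolean running = true;

    public PollLoopSketch(Properties props, String topicName, long pollTimeoutMs) {
        this.props = props;
        this.topicName = topicName;
        this.pollTimeoutMs = pollTimeoutMs;
    }

    @Override
    public void run() {
        // KafkaConsumer is not thread-safe, so it is created and used on this thread only.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topicName));
            while (running) {
                // Blocks for at most pollTimeoutMs and returns at most max.poll.records records.
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(pollTimeoutMs));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                if (!records.isEmpty()) {
                    // enable-auto-commit is false in this setup, so offsets are committed explicitly.
                    consumer.commitSync();
                }
            }
        }
    }

    /** Asks the loop to exit after the current poll returns. */
    public void stop() {
        running = false;
    }
}
```

One way to use it would be to start up to consumerCount threads, each running its own PollLoopSketch built from the same Properties, so that every thread owns a separate KafkaConsumer in the same group.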
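4. Create the kafka_client_jaas.conf file
With the following content:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xxxx"
    password="xxxx";
};
Replace the username and password with the real values, then place the file in the Spring project's resources directory (src/main/resources) so that it ends up on the classpath.
5. Load the file created in the previous step from the Spring application's startup class (using a static initializer block)
Example startup class: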
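import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import java.net.URL;

@SpringBootApplication
public class Boot {
    static {
        // Point the JVM-wide JAAS setting at the classpath file before any Kafka client is created.
        URL auth = Boot.class.getClassLoader().getResource("kafka_client_jaas.conf");
        if (auth == null) {
            throw new IllegalStateException("kafka_client_jaas.conf not found on the classpath");
        }
        System.setProperty("java.security.auth.login.config", auth.toExternalForm());
    }

    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(Boot.class, args);
        String[] activeProfiles = context.getEnvironment().getActiveProfiles();
        StringBuilder sb = new StringBuilder();
        sb.append("Active profiles: ");
        for (String activeProfile : activeProfiles) {
            sb.append(activeProfile).append(" ");
        }
        System.out.println(sb.toString());
    }
}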
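The JAAS file and the java.security.auth.login.config system property apply to the whole JVM. Kafka clients 0.10.2 and later also accept the same login module inline through the sasl.jaas.config client property, which scopes the credentials to a single client. The sketch below builds such properties with plain string keys and only java.util.Properties; the class SaslPlainProps, its methods, and the demo credentials are hypothetical and shown only as one possible alternative.

```java
import java.util.Properties;

/** Sketch: SASL/PLAIN client properties without a separate JAAS file. */
final class SaslPlainProps {

    private SaslPlainProps() {}

    /** Wraps a value in double quotes, escaping backslashes and quotes inside it. */
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds the three security-related entries for a Kafka client. */
    static Properties build(String securityProtocol, String saslMechanism,
                            String username, String password) {
        Properties props = new Properties();
        // Same keys as CommonClientConfigs.SECURITY_PROTOCOL_CONFIG and SaslConfigs.SASL_MECHANISM
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        // Inline equivalent of the KafkaClient { ... } section in kafka_client_jaas.conf
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required"
                        + " username=" + quote(username)
                        + " password=" + quote(password) + ";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder credentials; real values would come from configuration.
        Properties props = build("SASL_PLAINTEXT", "PLAIN", "demo-user", "demo-pass");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

These entries could be merged into the Properties returned by consumerConfig() with putAll. When sasl.jaas.config is set on a client, that client does not need kafka_client_jaas.conf or the static block in the startup class.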
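Copyright notice: This is an original article by qq_35077107, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.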