Kafka 使用SASL接入点PLAIN机制收发消息(集成Spring Boot)

  • Post author:
  • Post category:其他


默认接入点接入kafka可以参考:

Spring工程项目使用拉取型消费者 KafkaConsumer的常用配置及注意事项

这里没有直接使用 spring封装的kafkaTemplate, 使用的是 原生的 Java API ,支持使用拉取型消费者 并使用 poll() 方法拉取消费消息.



1. 消费者具体常用配置项



1. 添加pom.xml 文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Spring Boot parent manages dependency versions (spring-kafka version is inherited) -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka-consumer-spring-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-consumer-spring-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Brings in kafka-clients transitively; the article uses the raw
             KafkaConsumer API rather than KafkaTemplate, but this starter
             still supplies the client jar -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Dev-time only: hot restart support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Lombok is compile-time only; keep it out of the fat jar -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>


2. 配置application.yml :
kafka:
  bootstrap-servers: 127.0.0.1:9092 # Kafka broker address; for a cluster, list several separated by commas
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    batch-size: 1000 # number of messages sent per batch
  consumer:
    # consumer group id
    group-id: test-fya-first
    # whether offsets are committed automatically
    enable-auto-commit: false
    auto-commit-interval: 1000
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # maximum number of records returned per poll
    max-poll-records: 100
    session-timeout: 30000
    # where a NEW consumer group starts consuming; default is latest (tail of the log)
    auto-offset-reset: earliest
    # maximum blocking time allowed per poll(), in ms
    pollTimeout: 1000
    # number of consumer threads; recommended not to exceed the partition count
    consumerCount: 11
    # SASL access point with the PLAIN mechanism
    sasl-mechanism: PLAIN
    security-protocol: SASL_PLAINTEXT
    # topic name
    topicName: topic.quick.initial


3. 编写 KafkaConsumer 配置类 KafkaConsumerConfig:
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

@Configuration
public class KafkaConsumerConfig {

    /** Kafka broker address list (comma separated for a cluster). */
    @Value("${kafka.bootstrap-servers}")
    private String servers;
    /** Whether offsets are committed automatically. */
    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    /** Session timeout in ms; broker evicts the consumer after this silence. */
    @Value("${kafka.consumer.session-timeout}")
    private String sessionTimeout;
    /** Auto-commit interval in ms (only relevant when auto-commit is on). */
    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    /** Consumer group id. */
    @Value("${kafka.consumer.group-id}")
    private String groupId;
    /** Start position for a new group: earliest or latest. */
    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    /** Maximum records returned by a single poll(). */
    @Value("${kafka.consumer.max-poll-records}")
    private int maxPollRecord;
    /** Fully-qualified key deserializer class name. */
    @Value("${kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    /** Fully-qualified value deserializer class name. */
    @Value("${kafka.consumer.value-deserializer}")
    private String valueDeserializer;
    /** Number of consumer threads; should not exceed the partition count. */
    @Value("${kafka.consumer.consumerCount}")
    protected int consumerCount;

    /**
     * SASL mechanism, e.g. PLAIN. Optional: left unset, the client connects
     * without SASL.
     */
    @Value("${kafka.consumer.sasl-mechanism}")
    private String saslMechanism;

    /**
     * Security protocol, e.g. SASL_PLAINTEXT or SASL_SSL. Optional.
     */
    @Value("${kafka.consumer.security-protocol}")
    private String securityProtocol;


    /**
     * Builds the {@link Properties} handed to the raw {@code KafkaConsumer}
     * constructor. SASL settings are only applied when configured, so the
     * same class also works against an unauthenticated broker.
     *
     * @return consumer configuration properties
     */
    public Properties consumerConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, this.sessionTimeout);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueDeserializer);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecord);
        // SASL mechanism stays "PLAIN" for this access point; the security
        // protocol matches the endpoint (SASL_PLAINTEXT here, SASL_SSL for
        // TLS endpoints). Plain null/empty checks instead of StringUtils,
        // which was referenced without an import in the original.
        if (this.saslMechanism != null && !this.saslMechanism.isEmpty()) {
            props.put(SaslConfigs.SASL_MECHANISM, this.saslMechanism);
        }
        if (this.securityProtocol != null && !this.securityProtocol.isEmpty()) {
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, this.securityProtocol);
        }
        return props;
    }

}

相比默认方式接入, 主要增加了以下参数:

        props.put(SaslConfigs.SASL_MECHANISM, this.saslMechanism);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, this.securityProtocol);


4. 创建 kafka_client_jaas.conf 文件

内容如下:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxxx"
  password="xxxx";
};

用户名和密码改成实际的值, 创建完成后, 将该文件放到 spring 工程目录的 resource 目录下



5. 在 spring 工程启动类中加载上一步编写的配置文件(使用静态代码块)

spring启动类代码示例:

@SpringBootApplication
public class Boot {
    // The JAAS file must be registered BEFORE any Kafka client is created,
    // which is why a static initializer is used rather than a @PostConstruct.
    static {
        URL auth = Boot.class.getClassLoader().getResource("kafka_client_jaas.conf");
        if (auth == null) {
            // Fail fast with a clear message instead of an opaque NPE when the
            // file is missing from src/main/resources.
            throw new IllegalStateException("kafka_client_jaas.conf not found on the classpath");
        }
        System.setProperty("java.security.auth.login.config", auth.toExternalForm());
    }

    /**
     * Starts the Spring context and prints the active profiles.
     *
     * @param args standard command-line arguments, forwarded to Spring
     */
    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(Boot.class, args);
        String[] activeProfiles = context.getEnvironment().getActiveProfiles();
        StringBuilder sb = new StringBuilder();
        sb.append("使用的profile为:");
        for (String activeProfile : activeProfiles) {
            // chained appends avoid building a throwaway concatenated String
            sb.append(activeProfile).append("  ");
        }
        System.out.println(sb.toString());
    }

}

参考:


SASL接入点PLAIN机制收发消息


阿里云Demo


Kafka Client配置JAAS(用户密码)的几种方式 实测记录



版权声明:本文为qq_35077107原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。