kafka 基于xml配置消费同一集群多个topic、不同集群下的多个topic

  • Post author:
  • Post category:其他


背景: 最近因项目需要用到kafka的消息实现程序的业务逻辑,但是配置消费者还是xml形式,于是研究一下怎么基于xml实现多个topic配置。

场景一:基于xml文件配置消费同一集群下的多个topic

1. 创建xml文件,内容参考如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 1. 加载properties文件 -->
    <context:property-placeholder
            ignore-resource-not-found="true"
            file-encoding="UTF-8"
            location="classpath*:test1.properties,
                    classpath*:test2.properties"/>
    
   <!-- 2.配置初始化地址 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
                <entry key="group.id" value="${kafka.group.id}" />
                <entry key="enable.auto.commit" value="${kafka.enable.auto.commit}" />
                <entry key="session.timeout.ms" value="${kafka.session.timeout.ms}" />
                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 3.创建consumerFactory bean -->
    <bean id="consumerFactory"
          class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" primary="true">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <!-- 4.定义消费实现类 -->
    <bean id="kafkaConsumerService" class="包名地址" />

    <!-- 5.消费者容器配置信息 -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <!-- topic -->
        <constructor-arg name="topics">
            <list>
                <value>${kafka.topic.topic}</value>
                <value>${program.kafka.topic}</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="kafkaConsumerService" />
    </bean>
    <!-- 6.消费者并发消息监听容器,执行doStart()方法 -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
        <property name="concurrency" value="${kafka.concurrency}" />
    </bean>
</beans>

2. 初始化xml,基于springboot注解初始化方式

@Configuration
@ImportResource(locations = {"classpath:applicationContext-kafka-consumer.xml"})
public class KafkaConfig {
}

场景二:基于xml文件配置不同集群下的多个topic

那怎么样消费不同的集群呢,其实也挺简单的创建同样的一个XML文件指定不同的消费类,同时初始化两个XML文件即可。

可能遇到如下问题:

springboot启动失败错误提示

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a single bean, but 2 were found:

– consumerFactory: defined in class path resource [applicationContext-kafka-consumer.xml]

– programConsumerFactory: defined in class path resource [applicationContext-kafka-program-consumer.xml]

因为springboot启动初始化了两个相同kafkaListenerContainerFactory单例类,解决办法指定一个主的就可以了设置primary=”true”

配置参考如下样例

<!-- 2.创建consumerFactory bean -->
<bean id="consumerFactory"
      class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" primary="true">
    <constructor-arg>
        <ref bean="consumerProperties" />
    </constructor-arg>
</bean>



版权声明:本文为qq1353424111原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。