背景: 最近因项目需要用到kafka的消息实现程序的业务逻辑,但是配置消费者还是xml形式,于是研究一下怎么基于xml实现多个topic配置。
场景一:基于xml文件配置消费同一集群下的多个topic
1. 创建xml文件,内容参考如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 1. 加载properties文件 -->
<context:property-placeholder
ignore-resource-not-found="true"
file-encoding="UTF-8"
location="classpath*:test1.properties,
classpath*:test2.properties"/>
<!-- 2.配置初始化地址 -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
<entry key="group.id" value="${kafka.group.id}" />
<entry key="enable.auto.commit" value="${kafka.enable.auto.commit}" />
<entry key="session.timeout.ms" value="${kafka.session.timeout.ms}" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<!-- 3.创建consumerFactory bean -->
<bean id="consumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" primary="true">
<constructor-arg>
<ref bean="consumerProperties" />
</constructor-arg>
</bean>
<!-- 4.定义消费实现类 -->
<bean id="kafkaConsumerService" class="包名地址" />
<!-- 5.消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<!-- topic -->
<constructor-arg name="topics">
<list>
<value>${kafka.topic.topic}</value>
<value>${program.kafka.topic}</value>
</list>
</constructor-arg>
<property name="messageListener" ref="kafkaConsumerService" />
</bean>
<!-- 6.消费者并发消息监听容器,执行doStart()方法 -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
<constructor-arg ref="consumerFactory" />
<constructor-arg ref="containerProperties" />
<property name="concurrency" value="${kafka.concurrency}" />
</bean>
</beans>
2. 初始化xml,基于springboot注解初始化方式
@Configuration
@ImportResource(locations = {"classpath:applicationContext-kafka-consumer.xml"})
public class KafkaConfig {
}
场景二:基于xml文件配置不同集群下的多个topic
那怎么样消费不同的集群呢,其实也挺简单的创建同样的一个XML文件指定不同的消费类,同时初始化两个XML文件即可。
可能遇到如下问题:
springboot启动失败错误提示
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a single bean, but 2 were found:
– consumerFactory: defined in class path resource [applicationContext-kafka-consumer.xml]
– programConsumerFactory: defined in class path resource [applicationContext-kafka-program-consumer.xml]
因为springboot启动初始化了两个相同kafkaListenerContainerFactory单例类,解决办法指定一个主的就可以了设置primary=”true”
配置参考如下样例
<!-- 2.创建consumerFactory bean -->
<bean id="consumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" primary="true">
<constructor-arg>
<ref bean="consumerProperties" />
</constructor-arg>
</bean>
版权声明:本文为qq1353424111原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。