先说一下kafka环境
有一个现有的kafka集群,其中zookeeper为zookeeper-3.4.5,scala为2.11.4,kafka为2.9.2-0.8.1,
现在有一个spring boot的项目,要整合kafka集群,
其中spring boot为2.1.0,spring-kafka为2.2.0
结果报错:
org.springframework.context.ApplicationContextException: Failed to start bean ‘org.springframework.kafka.config.internalKafkaListenerEndpointRegistry’; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
纠结了好久,网上一般就是说什么kafka ip和host name没有对应什么,试验之后还是不行。
于是配置文件添加logging.level.root=debug,查看详细的错误
发现其报错
Error in I/O with host java.io.EOFException kafka-clients-1.0.0.jar,
估计是发送消息到kafkade server时出现连接中单,导致抛出EOF异常
参考了下面这篇文章
估计应该是用1.0.0的kafka client向0.8.1的kafka server端发送数据,版本不兼容的问题。
于是去下载最新的kafka包,
https://archive.apache.org/dist/kafka/2.1.0/
kafka_2.11-2.1.0.tgz,重新安装启动后问题解决
官网在这方面也有介绍
https://spring.io/projects/spring-kafka
Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。
它提供了一个Template作为发送消息的高级抽象。 它还通过@KafkaListener注释和listener container为消息驱动的POJO提供支持。 这些库促进了依赖注入和声明的使用。 使用中看到与Spring Framework对JMS支持和Spring AMQP中对RabbitMQ支持是类似的。
主要提供了以下几个功能
KafkaTemplate
KafkaMessageListenerContainer
@KafkaListener
KafkaTransactionManager
spring-kafka-test jar with embedded kafka server
Apache Kafka的Spring基于纯java kafka-clients jar。兼容matrix如下
可以看到kafka-clients最低也要0.8.2.2,我们最初的0.8.1的kafka是无法整合到spring boot中的
还有一点注意事项
kafka的config下的server.properties文件,要修改两个地方,一个是broker_id,要与zookeeper设置相同
第二个是zookeeper的ip+端口号
如果连接不上,可能还要修改一个地方listener
参考: