Problem receiving messages when integrating RabbitMQ with Spring Boot
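I ran into this problem while testing the fanout exchange message model. Sending messages worked fine; the problem was always on the receiving side. One odd thing: the same code did not always fail, but when it did, the error pointed to the JSON conversion of the object.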
Error message:
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194) ~[spring-rabbit-2.3.6.jar:2.3.6]
at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294) ~[spring-amqp-2.3.6.jar:2.3.6]
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271) ~[spring-amqp-2.3.6.jar:2.3.6]
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251) ~[spring-amqp-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:325) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132) ~[spring-amqp-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:207) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:134) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632) ~[spring-rabbit-2.3.6.jar:2.3.6]
... 10 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `[B` out of START_OBJECT token
at [Source: (String)"{"id":1,"module":"crud","name":"老狗","desc":"desc"}"; line: 1, column: 1]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1468) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1242) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1148) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers.handleNonArray(PrimitiveArrayDeserializers.java:220) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$ByteDeser.deserialize(PrimitiveArrayDeserializers.java:478) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$ByteDeser.deserialize(PrimitiveArrayDeserializers.java:426) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3468) ~[jackson-databind-2.11.4.jar:2.11.4]
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:351) ~[spring-amqp-2.3.6.jar:2.3.6]
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:321) ~[spring-amqp-2.3.6.jar:2.3.6]
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:291) ~[spring-amqp-2.3.6.jar:2.3.6]
... 18 common frames omitted
A reference I found online solved the problem; here is the link:
https://www.cnblogs.com/mussessein/p/12106553.html
The problem lies in how the consumer receives the message.
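The faulty code is below. Note that it receives the message with @Payload byte[] msg, and this is what triggers the error. As the stack trace shows, the listener's Jackson message converter tries to deserialize the JSON body into byte[] (the `[B` in the exception) before the method body runs, so the try/catch inside the method never sees the exception.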
/**
 * Listens to and consumes messages from the queue (fanoutExchange-one).
 * This is the consumer for the first queue.
 */
@RabbitListener(queues = "local.huang.mq.fanout.one.queue", containerFactory = "singleListenerContainer")
public void consumeFanoutMsgOne(@Payload byte[] msg) {
    try {
        // Parse the message consumed from the queue
        EventInfo info = objectMapper.readValue(msg, EventInfo.class);
        log.info("Message model fanoutExchange-one consumer received message: {}", info);
    } catch (Exception e) {
        // Pass the exception itself so the original stack trace is kept
        log.error("Message model consumer threw an exception:", e);
    }
}
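The correct approach is below. Note that it receives the message as a Message object (org.springframework.amqp.core.Message) and reads the raw bytes itself through getBody(), instead of asking the converter to produce a byte[] payload.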
@RabbitListener(queues = "local.huang.mq.fanout.one.queue", containerFactory = "singleListenerContainer")
public void consumeFanoutMsgOne(Message message) {
    try {
        // Take the raw body of the consumed message and parse it
        byte[] body = message.getBody();
        EventInfo info = objectMapper.readValue(body, EventInfo.class);
        log.info("Message model fanoutExchange-one consumer received message: {}", info);
    } catch (Exception e) {
        // Pass the exception itself so the original stack trace is kept
        log.error("Message model consumer threw an exception:", e);
    }
}
That solved the problem.
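As a possible alternative to the Message-based listener above, the listener parameter can be declared as the target type, so that the JSON converter already visible in the stack trace does the deserialization. The following is only a sketch under assumptions: it assumes the "singleListenerContainer" factory is configured with a Jackson2JsonMessageConverter (which the stack trace suggests but the article does not show), the class name FanoutTypedConsumer is made up for illustration, and the nested EventInfo is a hypothetical stand-in whose fields are guessed from the JSON payload in the error message. It needs Spring AMQP, Jackson, and SLF4J on the classpath.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutTypedConsumer {

    private static final Logger log = LoggerFactory.getLogger(FanoutTypedConsumer.class);

    /**
     * Hypothetical stand-in for the article's EventInfo class.
     * Field names and types are guessed from the JSON in the stack trace:
     * {"id":1,"module":"crud","name":"...","desc":"desc"}
     */
    public static class EventInfo {
        private Integer id;
        private String module;
        private String name;
        private String desc;

        // With default settings, Jackson uses a no-arg constructor plus setters
        public EventInfo() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getModule() { return module; }
        public void setModule(String module) { this.module = module; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getDesc() { return desc; }
        public void setDesc(String desc) { this.desc = desc; }

        @Override
        public String toString() {
            return "EventInfo{id=" + id + ", module='" + module
                    + "', name='" + name + "', desc='" + desc + "'}";
        }
    }

    // Assumption: the container factory's converter is a Jackson2JsonMessageConverter,
    // so the target type is inferred from this method's parameter.
    @RabbitListener(queues = "local.huang.mq.fanout.one.queue",
            containerFactory = "singleListenerContainer")
    public void consumeFanoutMsgOne(EventInfo info) {
        log.info("fanoutExchange-one consumer received message: {}", info);
    }
}
```

With this signature no ObjectMapper call is needed inside the method. A conversion failure would still be raised by the converter before the method is invoked, so it would have to be handled at the container level (for example with an error handler) rather than with a try/catch in the listener body.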
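Copyright notice: This is an original article by weixin_43917506, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.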