springboot 整合 spring-integration-mqtt

  • Post author:
  • Post category:其他


1.引入依赖

       <!-- Spring Boot starter for Spring Integration. -->
       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <!-- Stream support; the outbound configuration below imports
             org.springframework.integration.stream.CharacterStreamReadingMessageSource from it. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <!-- MQTT support; provides the org.springframework.integration.mqtt.* classes
             used by every configuration class below. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2.定义配置文件

package io.renren.network.mqtt.config;

import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.stereotype.Component;

/**
 * @author chenkang
 * @date 2022/2/14 14:33
 */
@Data
@Component
@Configuration
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {

    /**
     * 链接用户名
     */
    private String username;


    /**
     * 链接密码
     */
    private String password;


    /**
     * 链接地址
     */
    private String url;


    /**
     * 客户端ID 前缀
     */
    private String clientId;


    private Integer completionTimeout = 2000;

    /**
     * 订阅topic
     */
    private String  inputTopic;


    /**
     *
     * 发布topic
     */
    private String  outTopic;


    /**
     *  注册MQTT客户端工厂
     *
     * @return MqttPahoClientFactory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() throws MqttException {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setConnectionTimeout(0);
        options.setKeepAliveInterval(90);
        options.setAutomaticReconnect(true);
        options.setUserName(this.getUsername());
        options.setPassword(this.getPassword().toCharArray());
        options.setServerURIs(new String[]{this.getUrl()});
        factory.setConnectionOptions(options);
        return factory;
    }
}

2.定义订阅配置类

package io.renren.network.mqtt;

import io.renren.network.mqtt.config.MqttConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * Subscription (inbound) side of the MQTT integration: declares the channel adapter that
 * subscribes to the configured input topic, the channel it writes to, and the handler
 * attached to that channel.
 *
 * @author chenkang
 * @date 2022/2/14 14:39
 */
@Configuration
public class MqttInboundConfiguration {

    @Resource
    private MqttConfig mqttConfig;

    @Resource
    private MqttPahoClientFactory factory;


    @Resource
    private MqttMessageReceiver mqttMessageReceiver;


    /**
     * Creates the message-driven adapter that subscribes to {@code mqttConfig.getInputTopic()}
     * and forwards received messages to {@link #mqttInBoundChannel()}.
     *
     * @return the configured inbound adapter
     */
    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                inboundClientId(), factory, mqttConfig.getInputTopic());
        // NOTE(review): MqttConfig also exposes a completionTimeout property (default 2000)
        // that is not consulted here — confirm which value is intended.
        adapter.setCompletionTimeout(60000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setRecoveryInterval(10000);
        adapter.setQos(0);
        adapter.setOutputChannel(mqttInBoundChannel());
        return adapter;
    }

    /**
     * Builds the client ID for the inbound adapter: the configured client ID prefix
     * (when one is set) followed by a random UUID.
     */
    private String inboundClientId() {
        String prefix = mqttConfig.getClientId();
        String unique = UUID.randomUUID().toString();
        return (prefix == null || prefix.isEmpty()) ? unique : prefix + unique;
    }


    /**
     * Channel that carries inbound MQTT messages to the handler.
     * A {@link DirectChannel} is used here; another channel type could be substituted.
     *
     * @return MessageChannel
     */
    @Bean
    public MessageChannel mqttInBoundChannel() {
        return new DirectChannel();
    }


    /**
     * Handler for inbound MQTT messages; it is attached to the {@code mqttInBoundChannel}
     * channel and delegates to the injected {@link MqttMessageReceiver}.
     *
     * @return MessageHandler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInBoundChannel")
    public MessageHandler mqttMessageHandler() {
        return this.mqttMessageReceiver;
    }
}

3.定义消息接收器

package io.renren.network.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;


/**
 * @author chenkang
 * @date 2022/2/14 14:42
 */
@Slf4j
@Component
public class MqttMessageReceiver implements MessageHandler {




    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        try {
            MessageHeaders headers = message.getHeaders();
            //获取消息Topic
            String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC);
            log.info("[获取到的消息的topic :]{} ", receivedTopic);
            //获取消息体
            String payload = (String) message.getPayload();
            log.info("[获取到的消息的payload :]{} ", payload);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

4.定义发布配置类

package io.renren.network.mqtt;

import io.renren.network.mqtt.config.MqttConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.stream.CharacterStreamReadingMessageSource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * Publishing (outbound) side of the MQTT integration: declares the outbound message
 * handler, the channel it listens on, and a flow that feeds it from standard input.
 *
 * @author chenkang
 * @date 2022/2/14 14:47
 */
@Configuration
public class MqttOutboundConfiguration {


    @Resource
    private MqttConfig mqttConfiguration;

    @Resource
    private MqttPahoClientFactory factory;


    /**
     * Flow that polls standard input with a fixed delay of 2000, converts each payload
     * to a string and hands it to {@link #mqttOutbound()}.
     *
     * <p>NOTE(review): this publishes whatever is typed on stdin — it looks like demo
     * wiring; confirm it is wanted outside of local testing.
     *
     * @return the stdin-to-MQTT integration flow
     */
    @Bean
    public IntegrationFlow mqttOutFlow() {
        return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
                e -> e.poller(Pollers.fixedDelay(2000)))
                .transform(p -> p + "")
                .handle(mqttOutbound())
                .get();
    }

    /**
     * Creates the handler that publishes messages arriving on {@code mqttOutboundChannel}.
     * Messages are sent asynchronously with default QoS 0, to
     * {@code mqttConfiguration.getOutTopic()} as the default topic.
     *
     * @return the outbound MQTT message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(outboundClientId(), factory);
        messageHandler.setDefaultQos(0);
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttConfiguration.getOutTopic());
        return messageHandler;
    }

    /**
     * Builds the client ID for the outbound handler: the configured client ID prefix
     * (when one is set) followed by a random UUID.
     */
    private String outboundClientId() {
        String prefix = mqttConfiguration.getClientId();
        String unique = UUID.randomUUID().toString();
        return (prefix == null || prefix.isEmpty()) ? unique : prefix + unique;
    }

    /**
     * Channel on which messages to be published are sent to the outbound handler.
     *
     * @return MessageChannel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

}

5.定义消息网关

package io.renren.network.mqtt;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * Message-sending gateway. Calls on this interface are sent to the
 * {@code mqttOutboundChannel} request channel.
 *
 * @author chenkang
 * @date 2022/2/14 16:56
 */
@MessagingGateway
@Component
public interface MsgGateway {



    /**
     * Sends a message through the {@code mqttOutboundChannel} channel.
     *
     * @param a   value placed in the {@link MqttHeaders#TOPIC} header, i.e. the topic to publish to
     * @param out the message to send, carrying a {@code byte[]} payload
     */
    @Gateway(requestChannel = "mqttOutboundChannel")
    void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out);



}

1.发布:注入 MsgGateway,调用 send 方法发送 Message,消息可以这样构建:

MessageBuilder.withPayload("nihao".getBytes()).build()

2.订阅:接收到的消息由消息接收器 MqttMessageReceiver 处理

3.本来想实现多服务连接,但是这个集成封装得太死,最后放弃了,改为自己使用 mqttv3 实现



版权声明:本文为weixin_38845058原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。