记录一次java开发基于mqtt的数据推送

  • Post author:
  • Post category:java




前言

接到客户需求,需要把生产工艺数据推送到第三方展示。查阅相关博客后(面向百度编程)了解了大致流程:需要我写一个客户端来发布、订阅消息,所以下面的代码只做了发布相关的部分;



正文

项目结构分布

在这里插入图片描述

下面开始贴代码

依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

		<!--方便调试可以不要-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!--mqtt-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <dependency>
            <groupId>cn.guoyukun.jdbc</groupId>
            <artifactId>oracle-ojdbc6</artifactId>
            <version>11.2.0.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
/**
 * MQTT connection settings bound from configuration properties under the
 * {@code com.mqtt} prefix.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}; the bean is
 * looked up elsewhere in this file by the name {@code mqttConfiguration}.
 */
@Component
@ConfigurationProperties(prefix = "com.mqtt")
@Data
public class MqttConfiguration {

    /** Broker URI handed to the {@code MqttClient} constructor, e.g. {@code tcp://ip:port}. */
    private String host;

    /** Client identifier handed to the {@code MqttClient} constructor. */
    private String clientid;

    /**
     * Topic name from configuration.
     * NOTE(review): not read by any code visible here — presumably the publish topic; confirm.
     */
    private String topic;

    /**
     * Topic filter to subscribe to ({@code com.mqtt.subtopic}).
     *
     * <p>Added because {@code PushCallback.connectionLost} reads it through
     * {@code getSubtopic()}; it stays {@code null} when the property is not configured.
     */
    private String subtopic;

    /** User name set on the connect options. */
    private String username;

    /** Password set on the connect options. */
    private String password;

    /** Value passed to {@code MqttConnectOptions.setConnectionTimeout}. */
    private int timeout;

    /** Value passed to {@code MqttConnectOptions.setKeepAliveInterval}. */
    private int keepalive;
}

对应配置

com:
  mqtt:
    host: tcp://ip:port
    clientid: 这个自己随意,单个客户端不用注意啥,多个需要保持唯一,不然一个启动会导致另一个断开
    topic: 主题 这个一般协议文档会指定
    username: 同上
    password: 同上
    timeout: 30  # 同上
    keepalive: 60  # 同上

SpringUtil:用于在非 Spring 管理的类中获取 bean

/**
 * Static holder for the Spring {@link ApplicationContext}, so that objects which
 * are not created by Spring can still look up beans.
 */
@Component
public class SpringUtil implements ApplicationContextAware {

    /** Context captured by the first {@link #setApplicationContext} call. */
    private static ApplicationContext applicationContext;

    /**
     * Remembers the given context; once a context has been stored, later calls
     * leave it unchanged.
     *
     * @param applicationContext the context supplied by the container
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringUtil.applicationContext != null) {
            return;
        }
        SpringUtil.applicationContext = applicationContext;
    }

    /**
     * @return the stored context, or {@code null} if none has been stored yet
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * Looks a bean up by its name.
     *
     * @param name bean name
     * @return the bean instance
     */
    public static Object getBean(String name) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(name);
    }

    /**
     * Looks a bean up by its type.
     *
     * @param clazz required bean type
     * @param <T>   bean type
     * @return the bean instance
     */
    public static <T> T getBean(Class<T> clazz) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(clazz);
    }

    /**
     * Looks a bean up by name and type.
     *
     * @param name  bean name
     * @param clazz required bean type
     * @param <T>   bean type
     * @return the bean instance
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(name, clazz);
    }
}

缓存的

@Component
public class RedisUtil {

    private  RedisTemplate redisTemplate;

    @Autowired
    public void setRedisTemplate(RedisTemplate redisTemplate){
        RedisSerializer serializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(serializer);
        redisTemplate.setValueSerializer(serializer);
        redisTemplate.setHashKeySerializer(serializer);
        redisTemplate.setHashValueSerializer(serializer);
        this.redisTemplate = redisTemplate;
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     * @param key 键
     * @param item 项
     * @param value 值
     * @return
     */
    public  boolean hset(String key, String item, Object value){
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    public boolean hsetAll(String key, Map value){
        try {
            redisTemplate.opsForHash().putAll(key,value);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    /**
     * hget
     * @param key 键
     * @param item 项
     * @return
     */
    public Object hget(String key, String item){
        return redisTemplate.opsForHash().get(key,item);
    }

    /**
     * 指定有效时间
     * @param key 键
     * @param time 时间 s
     * @return
     */
    public boolean expire(String key, long time){
        try {
            if (time > 0){
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }


}

客户端相关

@Slf4j
public class MqttPushClient {

    private MqttConfiguration mqttConfiguration = (MqttConfiguration) SpringUtil.getBean("mqttConfiguration");

    private MqttClient client;

    private static volatile MqttPushClient mqttPushClient = null;

    public static MqttPushClient getInstance(){

        if(null == mqttPushClient){
            synchronized (MqttPushClient.class){
                if (null == mqttPushClient){
                    mqttPushClient = new MqttPushClient();
                }
            }
        }
        return mqttPushClient;
    }

    private MqttPushClient(){
        connect();
    }


    private void connect(){
        try {
            client = new MqttClient(mqttConfiguration.getHost(),mqttConfiguration.getClientid(),new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);  // 将订阅持久
            options.setUserName(mqttConfiguration.getUsername());
            options.setPassword(mqttConfiguration.getPassword().toCharArray());
            options.setConnectionTimeout(mqttConfiguration.getTimeout());
            options.setKeepAliveInterval(mqttConfiguration.getKeepalive());
            try {
                client.setCallback(new PushCallback());
                client.connect(options);
            }catch (Exception e){
                log.error("通信异常",e);
            }
        }catch (Exception e){
            log.error("创建客户端异常",e);
        }
    }

    public void publish(String topic, String msg){
        publish(0,false,topic,msg);
    }

    /**
     * 发布主题
     * @param qos 默认qos为0 (只管发不管接)(协议文档会有要求)
     * @param retained
     * @param topic 主题
     * @param msg 数据
     */
    public void publish(int qos, boolean retained, String topic, String msg){
        // 消息体
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(msg.getBytes());
        // 主题目标 这里用来发布
        MqttTopic mqttTopic = client.getTopic(topic);
        if (null == mqttTopic){
            log.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token = mqttTopic.publish(message);
            token.waitForCompletion();
            log.info("数据推送服务器[{}]",new String(message.getPayload()));
        }catch (MqttPersistenceException e){
            log.error("发布异常",e);
        }catch (MqttException e){
            log.error("发布异常",e);
        }
    }


}

MqttCallback 接口实现

@Slf4j
public class PushCallback implements MqttCallback {
    private SubscribeHandle subscribeHandle = (SubscribeHandle) SpringUtil.getBean("subscribeHandle");
    private MqttConfiguration mqttConfiguration = (MqttConfiguration) SpringUtil.getBean("mqttConfiguration");
    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失
        log.error("连接丢失,重新订阅");
        MqttPushClient.getInstance().subscribe(mqttConfiguration.getSubtopic());
    }
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // 消息订阅处理
//        System.out.println("接收消息主题 : " + topic);
//        System.out.println("接收消息Qos : " + message.getQos());
//        System.out.println("接收消息内容 : " + new String(message.getPayload()));
        log.info("接收消息内容[{}]",new String(message.getPayload()));
        subscribeHandle.handleData(new String(message.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // 数据收到
    }
}
// Example call: publish the text "测试" to topic "pos_message_all" through the singleton client (QoS 0, not retained).
MqttPushClient.getInstance().publish("pos_message_all","测试");

测试OK

在这里插入图片描述

调试用到工具

链接: https://pan.baidu.com/s/1oTcuhCM0oeyjWckcU52FNw 提取码: xak5

参考文章

https://blog.csdn.net/zhangxing52077/article/details/80568244



版权声明:本文为Gyele原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。