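Tested sending internal commands with ApiFox and publishing weather data with MQTTX; fixed the bugs found during testing and improved the parts that were incomplete.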
18 files changed, 183 lines changed
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java | 4
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java | 15
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java | 2
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java | 4
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java | 12
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java | 16
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java | 30
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java | 24
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java | 1
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java | 2
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java | 4
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java | 3
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java | 4
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java | 2
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java | 21
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java | 35
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java | 2
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml | 2
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java
@@ -13,8 +13,8 @@
     private final GenericObjectPool<MqttClient> pool;
-    public MqttClientPool(String broker, String username, String password, int maxConnections) {
-        MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password);
+    public MqttClientPool(String broker, String username, String password, int maxConnections, boolean useMemoryPersistence) {
+        MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password, useMemoryPersistence);
         GenericObjectPoolConfig<MqttClient> config = new GenericObjectPoolConfig<>();
         config.setMaxTotal(maxConnections);
         config.setMaxIdle(maxConnections);
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java
@@ -5,6 +5,7 @@
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 /**
  * @Author: liurunyu
@@ -16,18 +17,26 @@
     private final String broker;
     private final String username;
     private final String password;
+    private final Boolean useMemoryPersistence;
-    public MqttClientPooledObjectFactory(String broker, String username, String password) {
+    public MqttClientPooledObjectFactory(String broker, String username, String password, boolean useMemoryPersistence) {
         this.broker = broker;
         this.username = username;
         this.password = password;
+        this.useMemoryPersistence = useMemoryPersistence;
     }
     @Override
     public MqttClient create() throws Exception {
         String clientId = MqttClient.generateClientId();
-        MqttClient client = new MqttClient(broker, clientId);
+        MqttClient client = null ;
+        // Use in-memory persistence instead of the default file persistence
+        if (useMemoryPersistence) {
+            MemoryPersistence persistence = new MemoryPersistence();
+            client = new MqttClient(broker, clientId, persistence);
+        }else{
+            client = new MqttClient(broker, clientId);
+        }
         MqttConnectOptions options = new MqttConnectOptions();
         options.setUserName(username);
         options.setPassword(password.toCharArray());
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java
@@ -34,7 +34,7 @@
     public static void main(String[] args) {
         try{
             // Initialize the connection pool
-            pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections);
+            pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections, true);
             MqttClient clientSub = pool.popClient() ;
             testSubscribe(clientSub, topic1);
             testSubscribe(clientSub, topic2);
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
@@ -15,8 +15,8 @@
     public String deviceId ;// device ID
     public String protocol;// protocol
-    public String topic ;// message topic
-    public String msg ;// message
+    public MqttTopic topic ;// message topic
+    public String metaData;// metadata pushed over MQTT
     public abstract boolean valid();
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
@@ -18,4 +18,16 @@
     public String devId ;// device (FBox) ID
     public String topic ;// message topic
+    public boolean isEmpty(){
+        return orgTag == null || protocol == null || devId == null || topic == null
+                || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || topic.trim().length() == 0 ;
+    }
+    public String shortName(){
+        return topic ;
+    }
+    public String longName(){
+        return orgTag + "/" + protocol + "/" + devId + "/" + topic ;
+    }
 }
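The helpers added to MqttTopic can be tried in isolation. Below is a minimal, self-contained sketch that mirrors isEmpty(), shortName() and longName() from the hunk above. The class name TopicSketch and the parse() helper are hypothetical (the project's real parsing is in MqttMsgParser.parseSubTopic, which this diff does not show), and the orgTag/protocol/devId/topic layout as well as the sample topic name "weather" are assumptions made for illustration.

```java
// Illustrative sketch only, not the project's MqttTopic class.
final class TopicSketch {
    final String orgTag;    // subsystem tag, e.g. "ym" in the config.xml examples
    final String protocol;  // e.g. "sd1"
    final String devId;     // device (FBox) ID
    final String topic;     // short topic name

    TopicSketch(String orgTag, String protocol, String devId, String topic) {
        this.orgTag = orgTag;
        this.protocol = protocol;
        this.devId = devId;
        this.topic = topic;
    }

    private static boolean blank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // True if any of the four parts is missing or whitespace-only.
    boolean isEmpty() {
        return blank(orgTag) || blank(protocol) || blank(devId) || blank(topic);
    }

    String shortName() {
        return topic;
    }

    String longName() {
        return orgTag + "/" + protocol + "/" + devId + "/" + topic;
    }

    // Hypothetical helper: assumes the layout orgTag/protocol/devId/topic.
    static TopicSketch parse(String full) {
        String[] parts = full.split("/", 4);
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected orgTag/protocol/devId/topic but got: " + full);
        }
        return new TopicSketch(parts[0], parts[1], parts[2], parts[3]);
    }

    public static void main(String[] args) {
        TopicSketch t = parse("ym/sd1/338220031439/weather");
        System.out.println(t.shortName()); // prints "weather"
        System.out.println(t.longName());  // prints "ym/sd1/338220031439/weather"
        System.out.println(t.isEmpty());   // prints "false"
    }
}
```

Keeping the whole topic object on the message, as this commit does, lets log lines print longName() while the parser can still switch on the short name.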
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
@@ -24,8 +24,8 @@
     public MqttSubMsgSdV1(MqttTopic subTopic, String msg) {
         this.deviceId = subTopic.devId ;
         this.protocol = subTopic.protocol ;
-        this.topic = subTopic.topic ;
-        this.msg = msg ;
+        this.topic = subTopic ;
+        this.metaData = msg ;
     }
     public String toString(){
         StringBuilder sb = new StringBuilder();
@@ -35,17 +35,17 @@
                     .append("\n") ;
         }
         sb.append("Topic:")
-                .append(topic)
-                .append("\n") ;
-        sb.append("Message:")
-                .append(msg)
+                .append(topic.longName())
                 .append("\n") ;
         if(vo4Up != null){
             sb.append("Data:")
                     .append(vo4Up.toString())
                     .append("\n") ;
+        }else{
+            sb.append("Metadata:")
+                    .append(metaData)
+                    .append("\n") ;
         }
         return sb.toString() ;
     }
@@ -64,7 +64,7 @@
         if (topic == null || topic.isEmpty()) {
             return false;
         }
-        if (msg == null || msg.isEmpty()) {
+        if (metaData == null || metaData.isEmpty()) {
             return false;
         }
         return true;
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
@@ -12,6 +12,7 @@
 import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo;
 import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo;
 import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*;
+import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 /**
@@ -21,38 +22,45 @@
  */
 public class ProtocolParserSdV1 {
     public MqttSubMsgSdV1 parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception {
-        String msg = new String(mqttMsg.getPayload(), "UTF-8");
-        if(JSON.isValid(msg)){
-            throw new Exception("Received MQTT message, protocol " + subTopic.protocol + ", device ID " + subTopic.devId + ", topic " + subTopic.topic + ": message is not JSON data (" + msg + ")") ;
+        String strTxt = new String(mqttMsg.getPayload(), "UTF-8");
+        if(!JSON.isValid(strTxt)){
+            throw new Exception("Received MQTT message, protocol " + subTopic.protocol + ", device ID " + subTopic.devId + ", topic " + subTopic.longName() + ": message is not JSON data (" + strTxt + ")") ;
         }
-        MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(subTopic, msg);
+        MqttSubMsgSdV1 msg = new MqttSubMsgSdV1(subTopic, strTxt);
         Vo4Up vo ;
+        DevRunSt stVo ;
         switch (subTopic.topic) {
             case ProtocolConstantSdV1.SubTopicWeather -> {
-                vo = JSON.parseObject(msg, WeatherVo.class);
+                vo = JSON.parseObject(strTxt, WeatherVo.class);
                 break;
             }
             case ProtocolConstantSdV1.SubTopicSoil -> {
-                vo = JSON.parseObject(msg, SoilVo.class);
+                vo = JSON.parseObject(strTxt, SoilVo.class);
                 break;
             }
             case ProtocolConstantSdV1.SubTopicManure -> {
-                vo = JSON.parseObject(msg, ManureVo.class);
+                vo = JSON.parseObject(strTxt, ManureVo.class);
                 break;
             }
             case ProtocolConstantSdV1.SubTopicState -> {
                 // Not finished here: this should produce some communication info for callback.notify(objs) below to send out
-                vo = JSON.parseObject(msg, StateVo.class);
+                vo = JSON.parseObject(strTxt, StateVo.class);
+                stVo = new DevRunSt() ;
+                stVo.id = msg.deviceId ;
+                //stVo.stirRunning = true ; // stirring running: true = yes, false = no
+                //stVo.injectRunning = true ; // fertilizer injection running: true = yes, false = no
+                //stVo.irrRunning = true ; // irrigation running: true = yes, false = no
+                //stVo.alarm = true ; // alarm: true = yes, false = no
                 break;
             }
             default -> {
                 throw new Exception("Received MQTT message, protocol " + subTopic.protocol + ", device ID " + subTopic.devId + ", topic " + subTopic.topic + ": message parsing logic is not implemented");
             }
         }
-        ms.vo4Up = vo ;
-        callback.callback(ms);
+        msg.vo4Up = vo ;
+        callback.callback(msg);
         callback.notify(null);// not finished here
-        return ms;
+        return msg;
     }
     public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception {
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
@@ -43,7 +43,7 @@
     public String devDtStr ;// device time
     public String getDevDtStr() {
-        if(devDt == null){
+        if(devDt != null){
             return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
         }else{
             return "" ;
@@ -53,17 +53,17 @@
     @Override
     public String toString(){
         StringBuilder sb = new StringBuilder();
-        sb.append("Weather data:") ;
-        sb.append(" message ID:"+messageId) ;
-        sb.append(" carbon dioxide:"+carbonDioxide) ;
-        sb.append(" light intensity:"+lightIntensity) ;
-        sb.append(" atmospheric pressure:"+atmosphericPressure) ;
-        sb.append(" air temperature:"+airTemperature) ;
-        sb.append(" air humidity:"+airHumidity) ;
-        sb.append(" PM2.5:"+pm25) ;
-        sb.append(" PM10:"+pm10) ;
-        sb.append(" device time:"+devDt) ;
-        sb.append(" device time:"+ this.getDevDtStr()) ;
+        sb.append("Weather data=>") ;
+        sb.append(" message ID:"+messageId + ", ") ;
+        sb.append(" carbon dioxide:"+carbonDioxide + ", ") ;
+        sb.append(" light intensity:"+lightIntensity + ", ") ;
+        sb.append(" atmospheric pressure:"+atmosphericPressure + ", ") ;
+        sb.append(" air temperature:"+airTemperature + ", ") ;
+        sb.append(" air humidity:"+airHumidity + ", ") ;
+        sb.append(" PM2.5:"+pm25 + ", ") ;
+        sb.append(" PM10:"+pm10 + ", ") ;
+        sb.append(" device time:"+devDt + ", ") ;
+        sb.append(" device time:"+ this.getDevDtStr() + ", ") ;
         sb.append("\n") ;
         return sb.toString() ;
     }
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -474,6 +474,7 @@
                 if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){
                     throw new Exception("config.mqtt.poolMaxSize: the configured maximum number of pool connections is invalid") ;
                 }
+                mqVo.useMemoryPersistence = conf.getSetAttrBoolean(doc, "config.mqtt", "useMemoryPersistence", null, null) ;
                 String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, false, null) ;
                 if(proAndDevIds == null || proAndDevIds.trim().equals("")){
                     throw new Exception("config.mqtt.protocolAndDeviceIds: invalid configuration") ;
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java
@@ -32,7 +32,7 @@
                 com2.code = CodeLocal.stopMqttSv ;
                 com2.type = CommandType.innerCommand ;
                 new CommandInnerDeaLer().deal(com2) ;
+                log.info("Stopped the MQTT service before shutting down the program");
             }catch (Exception e){
                 log.error("Exception in the program (console) shutdown hook", e);
             }
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
@@ -113,6 +113,10 @@
                 rCom = this.stopMqttSv(com);
                 break;
             }
+            case CodeLocal.recoverMqttSv -> {
+                rCom = this.recoverMqttSv(com);
+                break;
+            }
             default -> {
                 rCom = ReturnCommand.errored("Error: the function code of the received internal command is not recognized!", com.getId(), com.getCode());
                 break;
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java
@@ -19,7 +19,6 @@
      * @param message
      */
     public static Command successed(String message, String commandId, String code, Object attachment) {
-        log.info(message);
         Command command = new Command().createReturnSuccessCommand(message, commandId, code);
         command.setAttachment(attachment);
         return command;
@@ -29,7 +28,6 @@
      * @param message
      */
     public static Command successed(String message, String commandId, String code) {
-        log.info(message);
         return new Command().createReturnSuccessCommand(message, commandId, code);
     }
     /**
@@ -37,7 +35,6 @@
      * @param message
      */
     public static Command errored(String message, String commandId, String code) {
-        log.error(message);
         return new Command().createReturnErrorCommand(message, commandId, code);
     }
 }
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
@@ -38,7 +38,7 @@
     public static final String onPartLineMqtt = "LMCD0002" ;// query the online status of selected MQTT devices
-    public static final String onLineStatisticsMqtt = "LMCD0003" ;// query status statistics for all MQTT devices
+    public static final String onLineStatisticsMqtt = "LMCD0003" ;// query online-status statistics for all MQTT devices
     public static final String allRtuStatesMqtt = "LMCD0010" ;// query the state of all MQTT devices
@@ -48,5 +48,7 @@
     public static final String stopMqttSv = "LMCD0110" ;// stop the MQTT service
+    public static final String recoverMqttSv = "LMCD0112" ;// restart the MQTT service and accept new MQTT connections
 }
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
@@ -55,7 +55,7 @@
             Map.Entry<String, DevStatus> entry = null ;
             while(it.hasNext()){
                 entry = it.next() ;
-                if(((DevStatus)entry).onLine != null && ((DevStatus)entry).onLine.booleanValue()){
+                if((entry.getValue()).onLine != null && (entry.getValue()).onLine.booleanValue()){
                     vo.onLineNum++ ;
                 }else{
                     vo.offLineNum++ ;
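The DevStatusDealer fix replaces a cast of the Map.Entry itself to DevStatus, which compiles but fails with a ClassCastException at run time, with entry.getValue(). A minimal sketch of the corrected counting loop follows. OnlineCountSketch and its nested DevStatus are hypothetical stand-ins for the project's classes, the third device ID is made up for the example, and a null onLine flag is counted as offline, as in the hunk above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only, not the project's DevStatusDealer.
final class OnlineCountSketch {
    // Hypothetical stand-in for the project's DevStatus class.
    static final class DevStatus {
        final Boolean onLine; // may be null when no status is known yet

        DevStatus(Boolean onLine) {
            this.onLine = onLine;
        }
    }

    // Returns {onLineNum, offLineNum}.
    static int[] count(Map<String, DevStatus> statuses) {
        int onLineNum = 0;
        int offLineNum = 0;
        for (Map.Entry<String, DevStatus> entry : statuses.entrySet()) {
            // The entry is a key/value pair; the status object is its value.
            DevStatus status = entry.getValue();
            if (status != null && Boolean.TRUE.equals(status.onLine)) {
                onLineNum++;
            } else {
                offLineNum++;
            }
        }
        return new int[] {onLineNum, offLineNum};
    }

    public static void main(String[] args) {
        Map<String, DevStatus> statuses = new LinkedHashMap<>();
        statuses.put("338220031439", new DevStatus(true));
        statuses.put("338220031440", new DevStatus(false));
        statuses.put("338220031441", new DevStatus(null));
        int[] result = count(statuses);
        System.out.println(result[0] + " online, " + result[1] + " offline"); // prints "1 online, 2 offline"
    }
}
```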
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
@@ -10,6 +10,9 @@
 import org.apache.logging.log4j.Logger;
 import org.eclipse.paho.client.mqttv3.MqttClient;
+import java.util.ArrayList;
+import java.util.List;
 /**
  * @Author: liurunyu
  * @Date: 2025/6/4 14:54
@@ -24,6 +27,8 @@
     private MqttUnitConfigVo configVo ;
     private MqttClientPool pool;
+    private List<MqttClient> subClients ;
     private MqttManager(){
     }
@@ -43,8 +48,9 @@
      * @throws Exception
      */
     public void start()throws Exception{
+        subClients = new ArrayList<>();
         String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort;
-        this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize);
+        this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize, this.configVo.useMemoryPersistence);
         if(this.pool.isClose()){
             throw new Exception("MQTT connection pool initialization failed");
         }
@@ -57,6 +63,7 @@
         if(clientSub == null || !clientSub.isConnected()){
             throw new Exception("The subscription connection obtained from the MQTT connection pool is unavailable");
         }
+        subClients.add(clientSub) ;
         // Subscribe to topics
         for(int i = 0; i < this.configVo.subTopics.length; i++){
             for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){
@@ -90,6 +97,18 @@
     }
     public void stop()throws Exception{
+        if(subClients != null && subClients.size() > 0){
+            for (MqttClient client : subClients) {
+                if(client != null && client.isConnected()){
+                    try{
+                        client.disconnect();
+                        client.close();
+                    }catch (Exception e){
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
         if(this.pool != null){
             // Close the connection pool
             this.pool.close();
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -22,22 +22,27 @@
     @Override
     public void messageArrived(String topic, MqttMessage msg) throws Exception {
-        MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ;
-        MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){
-            @Override
-            public void callback(MqttSubMsg subMsg) {
-                DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
-                DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
-                RtuLogDealer.log4Mqtt(subMsg.deviceId, "Subscribed message    topic: " + subMsg.topic + "   message: " + subMsg.msg);
-            }
-            @Override
-            public void notify(String devId, MqttNotifyInfo... infos) {
-                if(notify != null){
-                    notify.notify(devId, infos) ;
+        try {
+            MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic);
+            MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() {
+                @Override
+                public void callback(MqttSubMsg subMsg) {
+                    DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
+                    DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
+                    RtuLogDealer.log4Mqtt(subMsg.deviceId, "Subscribed message    topic: " + subMsg.topic.longName() + "   metadata: " + subMsg.metaData);
                 }
-            }
-        }) ;
-        this.nextDeal(subMsg);
+                @Override
+                public void notify(String devId, MqttNotifyInfo... infos) {
+                    if (notify != null) {
+                        notify.notify(devId, infos);
+                    }
+                }
+            });
+            this.nextDeal(subMsg);
+        }catch(Exception e){
+            log.error("Exception while handling an MQTT subscribed message", e);
+        }
     }
     private void nextDeal(MqttSubMsg subMsg)throws Exception {
         subMsg.action(new Callback() {
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
@@ -13,6 +13,7 @@
     public String svUserName ;//
     public String svUserPassword ;//
     public Integer poolMaxSize ;//
+    public Boolean useMemoryPersistence ;
     public String[] protocolAndDeviceIds ;// device protocols and (FBox) IDs
     public String[] deviceIds ;// device (FBox) IDs
     public String[] subTopics ;// subscribed topics
@@ -27,6 +28,7 @@
         this.svUserName = "dyyjy" ;
         this.svUserPassword = "Dyyjy2025,;.abc!@#" ;
         this.poolMaxSize = 10 ;
+        useMemoryPersistence = true ;
         this.pubTopicQos = 1 ;
         this.noSubThenOff = 10 * 60 * 10000L ;
     }
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
@@ -171,6 +171,7 @@
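     svUserName MQTT server user name
     svUserPassword MQTT server user password
     poolMaxSize maximum number of connections in the pool
+    useMemoryPersistence use in-memory persistence instead of the default file persistence (true = yes, false = no)
     protocolAndDeviceIds protocols and device IDs of the devices (FBox) connected in the subsystem (orgTag); separate multiple entries with commas and separate the protocol from the ID with a forward slash, e.g. sd1/338220031439,sd1/338220031440
     subTopicAndQos: subscribed topics and their QoS; separate a topic name from its QoS with a comma and separate multiple topic/QoS pairs with semicolons, e.g. ym/topic1,1;ym/topic2,1;ym/topic3,1; if there are multiple OrgTags, prefix each topic with its OrgTag
     pubTopicQos: QoS for published topics, valid values:
@@ -185,6 +186,7 @@
           svUserName="dyyjy"
           svUserPassword="Dyyjy2025,;.abc!@#"
           poolMaxSize="10"
+          useMemoryPersistence="true"
           protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}"
           subTopicAndQos="${mqtt.subTopicAndQos}"
           pubTopicQos="1"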
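The config.xml comments above describe two delimited attribute formats, protocolAndDeviceIds and subTopicAndQos. As a rough illustration of how such values could be split, here is a self-contained sketch. It is not the project's actual loading code (Server.java reads these attributes through its own conf helper, which this diff shows only in part); the class and method names are hypothetical. The QoS range check uses the three levels MQTT defines (0, 1, 2).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: hypothetical parsers for the documented formats.
final class MqttConfigParseSketch {

    // "sd1/338220031439,sd1/338220031440" -> [[sd1, 338220031439], [sd1, 338220031440]]
    static List<String[]> parseProtocolAndDeviceIds(String raw) {
        List<String[]> result = new ArrayList<>();
        for (String item : raw.split(",")) {
            String[] pair = item.trim().split("/");
            if (pair.length != 2 || pair[0].isEmpty() || pair[1].isEmpty()) {
                throw new IllegalArgumentException("bad protocol/deviceId entry: " + item);
            }
            result.add(pair);
        }
        return result;
    }

    // "ym/topic1,1;ym/topic2,1" -> {ym/topic1=1, ym/topic2=1}
    static Map<String, Integer> parseSubTopicAndQos(String raw) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String item : raw.split(";")) {
            String entry = item.trim();
            int comma = entry.lastIndexOf(',');
            if (comma <= 0) {
                throw new IllegalArgumentException("bad topic,qos entry: " + item);
            }
            int qos = Integer.parseInt(entry.substring(comma + 1).trim());
            if (qos < 0 || qos > 2) {
                throw new IllegalArgumentException("QoS must be 0, 1 or 2: " + item);
            }
            result.put(entry.substring(0, comma).trim(), qos);
        }
        return result;
    }

    public static void main(String[] args) {
        for (String[] pair : parseProtocolAndDeviceIds("sd1/338220031439,sd1/338220031440")) {
            System.out.println(pair[0] + " -> " + pair[1]);
        }
        System.out.println(parseSubTopicAndQos("ym/topic1,1;ym/topic2,1;ym/topic3,1"));
    }
}
```

Failing fast on a malformed entry matches the style of the existing checks in Server.java, which throw when poolMaxSize or protocolAndDeviceIds is invalid.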