pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
@@ -21,7 +21,7 @@ vo.orgTag = topicGrp[0] ; vo.protocol = topicGrp[1] ; vo.devId = topicGrp[2] ; vo.topic = topicGrp[3] ; vo.name = topicGrp[3] ; return vo ; } }else{ @@ -30,7 +30,7 @@ } public static String createPubTopic(MqttTopic tp) throws Exception { return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.topic ; return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.name; } public static MqttSubMsg parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception { pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java
@@ -12,7 +12,7 @@ public String mqttResultSendWebUrl ;//Mqtt返回命令结果 发向目的地web URL public String topic ;//消息主题 public MqttTopic topic ;//消息主题 public String msg ;//消息 public boolean isCacheForOffLine ;//下行命令控制,消息中间件不在线是否缓存命令 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
@@ -16,18 +16,18 @@ public String orgTag ;//组织标识 public String protocol ;//协议名称 public String devId ;//设备(FBox)ID public String topic ;//消息主题 public String name;//消息主题末端名称 public boolean isEmpty(){ return orgTag == null || protocol == null || devId == null || topic == null || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || topic.trim().length() == 0 ; return orgTag == null || protocol == null || devId == null || name == null || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || name.trim().length() == 0 ; } public String shortName(){ return topic ; return name; } public String longName(){ return orgTag + "/" + protocol + "/" + devId + "/" + topic ; return orgTag + "/" + protocol + "/" + devId + "/" + name; } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java
@@ -10,4 +10,5 @@ public static final String cd_Stir = "01" ;//搅拌启停命令 public static final String cd_Inject = "02" ;//注肥启停命令 public static final String cd_Irr = "03" ;//灌溉启停命令 public static final String cd_Param = "10" ;//设定参数 } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
@@ -51,8 +51,9 @@ public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){ if (pubMsg instanceof MqttPubMsgSdV1) { MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg; //MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg; if(this.vo4Up != null && this.vo4Up instanceof StateVo){ //只要上报的是状态数据,说明设备响应了命令 return true ; } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
@@ -20,5 +20,6 @@ public static final String PubTopicStir = "ctrlStir" ;//搅拌启停 public static final String PubTopicInject = "ctrlInject" ;//注肥启停 public static final String PubTopicIrr = "ctrlIrr" ;//灌溉启停 public static final String PubTopicParam = "setParam" ;//设置参数 } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
@@ -4,15 +4,16 @@ import com.alibaba.fastjson2.JSONObject; import com.dy.common.mw.protocol.Command; import com.dy.common.mw.protocol4Mqtt.MqttCallback; import com.dy.common.mw.protocol4Mqtt.MqttMsgParser; import com.dy.common.mw.protocol4Mqtt.MqttTopic; import com.dy.common.mw.protocol4Mqtt.Vo4Up; import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComCtrlVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComSetParamVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.FaultClearVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.ParamSetVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*; import com.dy.common.mw.protocol4Mqtt.status.DevRunSt; import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; import org.eclipse.paho.client.mqttv3.MqttMessage; /** @@ -28,8 +29,8 @@ } MqttSubMsgSdV1 msg = new MqttSubMsgSdV1(subTopic, strTxt); Vo4Up vo ; DevRunSt stVo ; switch (subTopic.topic) { DevRunInfo stInfo = null ; switch (subTopic.name) { case ProtocolConstantSdV1.SubTopicWeather -> { vo = JSON.parseObject(strTxt, WeatherVo.class); break; @@ -45,21 +46,22 @@ case ProtocolConstantSdV1.SubTopicState -> { //此处未完成,应该产生一些通信的info,供下面callback.notify(objs)通知出去 vo = JSON.parseObject(strTxt, StateVo.class); stVo = new DevRunSt() ; stVo.id = msg.deviceId ; //stVo.stirRunning = true ; //搅拌运行 true是 false否 //stVo.injectRunning = true ; //注肥运行 true是 false否 //stVo.irrRunning = true ; //灌溉运行 true是 false否 //stVo.alarm = true ; //报警 true是 false否 StateVo stVo = (StateVo)vo ; stInfo = new DevRunInfo() ; stInfo.devId = msg.deviceId ; stInfo.stirRunning = (stVo.stirRunning==null?false:(stVo.stirRunning.byteValue()==1?true:false)) ; //搅拌运行 true是 false否 stInfo.injectRunning = (stVo.injectRunning==null?false:(stVo.injectRunning.byteValue()==1?true:false)) ; //注肥运行 true是 false否 stInfo.irrRunning = (stVo.irrRunning==null?false:(stVo.irrRunning.byteValue()==1?true:false)) ; //灌溉运行 true是 false否 stInfo.alarm = (stVo.alarm==null?false:(stVo.alarm.byteValue()==1?true:false)) ; //报警 true是 false否 break; } default -> { throw new Exception("接收到MQTT消息,协议" + subTopic.protocol + ",设备ID" + subTopic.devId + ",主题" + subTopic.topic + "消息解析逻辑未实现"); throw new Exception("接收到MQTT消息,协议" + subTopic.protocol + ",设备ID" + subTopic.devId + ",主题" + subTopic.name + "消息解析逻辑未实现"); } } msg.vo4Up = vo ; callback.callback(msg); callback.notify(null);//此处未完成 callback.notify(msg.deviceId, stInfo); return msg; } @@ -94,6 +96,13 @@ msg = this.createPubMsgOfIrr(orgTag, com); break; } case CodeSdV1.cd_Param -> { //设置参数 this.checkParam(com); this.checkRtnWebUrl(com); msg = this.createPubMsgOfParam(orgTag, com); break; } default -> { throw new Exception("接收到MQTT命令,协议" + com.protocol + "版本" + com.protocolVersion + "功能码" + com.code + "构造器未实现"); } @@ -122,8 +131,8 @@ msg.isCacheForOffLine = false ; msg.hasResponse = true ; msg.cd = CodeSdV1.cd_Fault ; msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; msg.msg = JSON.toJSONString(new FaultClearVo(cvo.isDo)) ; msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault) ; msg.msg = JSON.toJSONString(new FaultClearVo(cvo.startTrueStopFalse ?(byte)1:0)) ; return msg ; } private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception { @@ -138,8 +147,8 @@ msg.isCacheForOffLine = false ; msg.hasResponse = true ; msg.cd = CodeSdV1.cd_Fault ; msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ; msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicStir) ; msg.msg = JSON.toJSONString(new StirStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ; return msg ; } private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception { @@ -154,8 +163,8 @@ msg.isCacheForOffLine = false ; msg.hasResponse = true ; msg.cd = CodeSdV1.cd_Fault ; msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; msg.msg = JSON.toJSONString(new InjectStartVo(cvo.isDo)) ; msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicInject) ; msg.msg = JSON.toJSONString(new InjectStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ; return msg ; } private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception { @@ -170,8 +179,25 @@ msg.isCacheForOffLine = false ; msg.hasResponse = true ; msg.cd = CodeSdV1.cd_Fault ; msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ; msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicIrr) ; msg.msg = JSON.toJSONString(new StirStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ; return msg ; } private MqttPubMsgSdV1 createPubMsgOfParam(String orgTag, Command com) throws Exception { JSONObject obj = (JSONObject) com.param; String json = obj.toJSONString(); ComSetParamVo cvo = JSON.parseObject(json, ComSetParamVo.class); if(cvo == null){ throw new Exception("json转ComSetParamVo为null") ; } MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; this.setPubMsgBase(com, msg); msg.isCacheForOffLine = false ; msg.hasResponse = false ; msg.cd = CodeSdV1.cd_Param ; msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicParam) ; msg.msg = JSON.toJSONString(new ParamSetVo(cvo.stirDuration, cvo.injectDuration)) ; return msg ; } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
@@ -6,7 +6,7 @@ * @Description */ public class ComCtrlVo { //是否控制动作,true是,false否 //启停动作,true是,false否 //可以执行功能码 00,01,02,03的动作 public boolean isDo;// public boolean startTrueStopFalse;// } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComSetParamVo.java
New file @@ -0,0 +1,23 @@ package com.dy.common.mw.protocol4Mqtt.pSdV1.comParam; import com.alibaba.fastjson2.annotation.JSONField; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @Author: liurunyu * @Date: 2025/6/11 17:03 * @Description */ @Data @NoArgsConstructor @AllArgsConstructor public class ComSetParamVo { // 搅拌设定时间 public Integer stirDuration ; // 注肥设定时间 public Integer injectDuration ; } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java
@@ -16,10 +16,10 @@ @AllArgsConstructor public class FaultClearVo implements Vo4Down { @JSONField(name = "故障解除") public boolean isDo ; public Byte isDo ;//1是,0否 @Override public String toString(){ return "故障解除:" + (isDo?"是":"否") ; return "故障解除:" + (isDo==1?"是":"否") ; } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java
@@ -16,9 +16,10 @@ @AllArgsConstructor public class InjectStartVo implements Vo4Down { @JSONField(name = "注肥启停") public boolean isDo ;//true为启,false为停 public Byte isDo ;//1是,0否 @Override public String toString(){ return "注肥启停:" + (isDo?"启":"停") ; return "注肥启停:" + (isDo==1?"启":"停") ; } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java
@@ -16,9 +16,10 @@ @AllArgsConstructor public class IrrStartVo implements Vo4Down { @JSONField(name = "灌溉启停") public boolean isDo ;//true为启,false为停 public Byte isDo ;//1是,0否 @Override public String toString(){ return "灌溉启停:" + (isDo?"启":"停") ; return "灌溉启停:" + (isDo==1?"是":"否") ; } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ParamSetVo.java
New file @@ -0,0 +1,31 @@ package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos; import com.alibaba.fastjson2.annotation.JSONField; import com.dy.common.mw.protocol4Mqtt.Vo4Down; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @Author: liurunyu * @Date: 2025/6/11 16:55 * @Description */ @Data @NoArgsConstructor @AllArgsConstructor public class ParamSetVo implements Vo4Down { @JSONField(name = "搅拌设定时间") public Integer stirDuration ; @JSONField(name = "注肥设定时间") public Integer injectDuration ; @Override public String toString(){ StringBuilder sb = new StringBuilder(); sb.append("搅拌设定时间:" + stirDuration + "\n" ); sb.append("注肥设定时间:" + injectDuration + "\n" ); return sb.toString(); } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java
@@ -16,9 +16,10 @@ @AllArgsConstructor public class StirStartVo implements Vo4Down { @JSONField(name = "搅拌启停") public boolean isDo ;//true为启,false为停 public Byte isDo ;//1是,0否 @Override public String toString(){ return "搅拌启停:" + (isDo?"启":"停") ; return "搅拌启停:" + (isDo==1?"是":"否") ; } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java
@@ -12,6 +12,18 @@ */ @Data public class ManureVo implements Vo4Up { @JSONField(name = "flexem_message_id") public Integer messageId ;//消息ID @JSONField(name = "肥料流量") public Float manureFlow ;//肥料流量 @JSONField(name = "注肥时长") public Integer manureTime ;//注肥时长 @JSONField(name = "搅拌时长") public Integer stirTime ;//搅拌时长 @JSONField(name = "flexem_timestamp") public Long devDt ;//设备时间 @@ -27,9 +39,13 @@ @Override public String toString(){ StringBuilder sb = new StringBuilder(); sb.append("水肥数据:") ; sb.append(" 设备时间:"+devDt) ; sb.append(" 设备时间:"+ this.getDevDtStr()) ; sb.append("水肥数据=>") ; sb.append(" 消息ID:" + messageId + ", ") ; sb.append(" 肥料流量:" + manureFlow + ", ") ; sb.append(" 注肥时长:" + manureTime + ", ") ; sb.append(" 搅拌时长:" + stirTime + ", ") ; sb.append(" 设备时间:" + devDt + ", ") ; sb.append(" 设备时间:" + this.getDevDtStr() + ", ") ; sb.append("\n") ; return sb.toString() ; } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java
@@ -12,6 +12,33 @@ */ @Data public class SoilVo implements Vo4Up { @JSONField(name = "flexem_message_id") public Integer messageId ;//消息ID @JSONField(name = "土壤湿度1") public Float soilHumidity1 ;//土壤湿度1 @JSONField(name = "土壤湿度2") public Float soilHumidity2 ;//土壤湿度2 @JSONField(name = "土壤湿度3") public Float soilHumidity3 ;//土壤湿度3 @JSONField(name = "土壤湿度4") public Float soilHumidity4 ;//土壤湿度4 @JSONField(name = "土壤湿度1") public Float soilTemperature1 ;//土壤温度1 @JSONField(name = "土壤温度2") public Float soilTemperature2 ;//土壤温度2 @JSONField(name = "土壤温度3") public Float soilTemperature3 ;//土壤温度3 @JSONField(name = "土壤温度4") public Float soilTemperature4 ;//土壤温度4 @JSONField(name = "flexem_timestamp") public Long devDt ;//设备时间 @@ -27,10 +54,20 @@ @Override public String toString(){ StringBuilder sb = new StringBuilder(); sb.append("墒情数据:") ; sb.append(" 设备时间:"+devDt) ; sb.append(" 设备时间:"+ this.getDevDtStr()) ; sb.append("墒情数据=>") ; sb.append(" 消息ID:" + messageId + ", ") ; sb.append(" 土壤湿度1:" + soilHumidity1 + ", ") ; sb.append(" 土壤湿度2:" + soilHumidity2 + ", ") ; sb.append(" 土壤湿度3:" + soilHumidity3 + ", ") ; sb.append(" 土壤湿度4:" + soilHumidity4 + ", ") ; sb.append(" 土壤温度1:" + soilTemperature1 + ", ") ; sb.append(" 土壤温度2:" + soilTemperature2 + ", ") ; sb.append(" 土壤温度3:" + soilTemperature3 + ", ") ; sb.append(" 土壤温度4:" + soilTemperature4 + ", ") ; sb.append(" 设备时间:" + devDt + ", ") ; sb.append(" 设备时间:" + this.getDevDtStr() + ", ") ; sb.append("\n") ; return sb.toString() ; } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java
@@ -12,6 +12,21 @@ */ @Data public class StateVo implements Vo4Up { @JSONField(name = "flexem_message_id") public Integer messageId ;//消息ID @JSONField(name = "搅拌运行") public Byte stirRunning ;//搅拌运行 @JSONField(name = "注肥运行") public Byte injectRunning ;//注肥运行 @JSONField(name = "灌溉运行") public Byte irrRunning ;//灌溉运行 @JSONField(name = "报警") public Byte alarm ;//报警 @JSONField(name = "flexem_timestamp") public Long devDt ;//设备时间 @@ -27,9 +42,14 @@ @Override public String toString(){ StringBuilder sb = new StringBuilder(); sb.append("状态数据:") ; sb.append(" 设备时间:"+devDt) ; sb.append(" 设备时间:"+ this.getDevDtStr()) ; sb.append("状态数据=>") ; sb.append(" 消息ID:" + messageId + ", ") ; sb.append(" 搅拌运行:" + stirRunning + ", ") ; sb.append(" 注肥运行:" + injectRunning + ", ") ; sb.append(" 灌溉运行:" + irrRunning + ", ") ; sb.append(" 报警:" + alarm + ", ") ; sb.append(" 设备时间:" + devDt + ", ") ; sb.append(" 设备时间:" + this.getDevDtStr() + ", ") ; sb.append("\n") ; return sb.toString() ; } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
@@ -54,16 +54,16 @@ public String toString(){ StringBuilder sb = new StringBuilder(); sb.append("气象数据=>") ; sb.append(" 消息ID:"+messageId + ", ") ; sb.append(" 二氧化碳:"+carbonDioxide + ", ") ; sb.append(" 光照强度:"+lightIntensity + ", ") ; sb.append(" 大气压力:"+atmosphericPressure + ", ") ; sb.append(" 空气温度:"+airTemperature + ", ") ; sb.append(" 空气湿度:"+airHumidity + ", ") ; sb.append(" PM2.5:"+pm25 + ", ") ; sb.append(" PM10:"+pm10 + ", ") ; sb.append(" 设备时间:"+devDt + ", ") ; sb.append(" 设备时间:"+ this.getDevDtStr() + ", ") ; sb.append(" 消息ID:" + messageId + ", ") ; sb.append(" 二氧化碳:" + carbonDioxide + ", ") ; sb.append(" 光照强度:" + lightIntensity + ", ") ; sb.append(" 大气压力:" + atmosphericPressure + ", ") ; sb.append(" 空气温度:" + airTemperature + ", ") ; sb.append(" 空气湿度:" + airHumidity + ", ") ; sb.append(" PM2.5:" + pm25 + ", ") ; sb.append(" PM10:" + pm10 + ", ") ; sb.append(" 设备时间:" + devDt + ", ") ; sb.append(" 设备时间:" + this.getDevDtStr() + ", ") ; sb.append("\n") ; return sb.toString() ; } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineInfo.java
File was renamed from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java @@ -9,7 +9,7 @@ * @Description */ @Data public class DevOnLineSt implements MqttNotifyInfo { public class DevOnLineInfo implements MqttNotifyInfo { public String id ; public String protocol ; public Boolean onLine ; pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunInfo.java
File was renamed from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java @@ -9,8 +9,8 @@ * @Description */ @Data public class DevRunSt implements MqttNotifyInfo { public String id ; public class DevRunInfo implements MqttNotifyInfo { public String devId ;//MQTT设置ID public Boolean stirRunning ;//搅拌运行 true是 false否 public Boolean injectRunning ;//注肥运行 true是 false否 public Boolean irrRunning ;//灌溉运行 true是 false否 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
@@ -1,6 +1,6 @@ package com.dy.rtuMw.server.mqtt; import com.dy.common.mw.protocol4Mqtt.status.DevRunSt; import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; import com.dy.rtuMw.server.forTcp.RtuLogDealer; import com.dy.rtuMw.server.local.localProtocol.RtuOnLineStateStatisticsVo; @@ -166,7 +166,7 @@ } } public static void setStatus(String devId, DevRunSt st){ public static void setStatus(String devId, DevRunInfo st){ DevStatus vo = map.get(devId) ; if(vo != null) { if(st.stirRunning != null){ pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
@@ -3,8 +3,8 @@ import com.dy.common.mw.channel.mqtt.MqttClientPool; import com.dy.common.mw.protocol4Mqtt.MqttNotify; import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo; import com.dy.common.mw.protocol4Mqtt.status.DevOnLineSt; import com.dy.common.mw.protocol4Mqtt.status.DevRunSt; import com.dy.common.mw.protocol4Mqtt.status.DevOnLineInfo; import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; import com.dy.rtuMw.server.ServerProperties; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -77,15 +77,15 @@ public void notify(String devId, MqttNotifyInfo... infos) { if(devId != null && infos != null && infos.length > 0){ for(MqttNotifyInfo info : infos){ if(info instanceof DevOnLineSt){ DevOnLineSt onLineSt = (DevOnLineSt)info; if(info instanceof DevOnLineInfo){ DevOnLineInfo onLineSt = (DevOnLineInfo)info; if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){ DevStatusDealer.onLine(devId, ((DevOnLineSt)info).protocol); DevStatusDealer.onLine(devId, ((DevOnLineInfo)info).protocol); }else{ DevStatusDealer.offLine(devId); } } else if(info instanceof DevRunSt){ DevStatusDealer.setStatus(devId, (DevRunSt)info); } else if(info instanceof DevRunInfo){ DevStatusDealer.setStatus(devId, (DevRunInfo)info); } } } pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -45,33 +45,6 @@ } } private void nextDeal(MqttSubMsg subMsg)throws Exception { subMsg.action(new Callback() { @Override public void call(Object obj) { MqttSubMsg subMs = (MqttSubMsg) obj ; MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; if(pubMs != null){ //匹配到下行消息(命令) subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; subMs.commandId = pubMs.commandId ; try { MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); } catch (Exception e) { log.error("缓存发布消息(命令)结果发生异常", e); } } try{ MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg)); }catch (Exception e){ log.error("缓存订阅消息数据发生异常", e); } } @Override public void call(Object... objs) { } @Override public void exception(Exception e) { } }); subMsg.action(new MqttSubMsgDealer()); } } pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java
@@ -86,8 +86,7 @@ while(node != null && node.obj != null){ obj = (MqttPubMsgNode)node.obj; pubMsg = obj.result ; if(pubMsg != null && subMsg.subMsgMatchPubMsg(pubMsg)){ if(pubMsg != null && subMsg.subMsgMatchPubMsg(pubMsg)){ obj.onceReceivedResult = true ;//标识已经收到命令结果 return pubMsg; }else{ pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
@@ -65,7 +65,7 @@ }else{ if(mqttClient != null && mqttClient.isConnected()){ try { mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg); mqttManager.publishMsg(mqttClient, this.result.topic.longName(), this.result.msg); DevStatusDealer.afterSendPubMessage(this.result.deviceId); RtuLogDealer.log4Mqtt(this.result.deviceId, "发布消息 主题:" + this.result.topic + " 消息:" + this.result.msg); log.info("发布MQTT消息(主题=" + this.result.topic + ")" + this.result.msg); @@ -74,7 +74,11 @@ }finally { mqttManager.pushMqttClient(mqttClient); } return false ; if(this.result.hasResponse){ return false ; }else{ return true ; } }else{ //未曾连接MQTT服务器 return this.decideRemoveNodeFromCach(now) ; pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgDealer.java
New file @@ -0,0 +1,41 @@ package com.dy.rtuMw.server.mqtt; import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; import com.dy.common.util.Callback; import lombok.extern.slf4j.Slf4j; /** * @Author: liurunyu * @Date: 2025/6/11 17:33 * @Description */ @Slf4j public class MqttSubMsgDealer implements Callback { @Override public void call(Object obj) { MqttSubMsg subMs = (MqttSubMsg) obj ; MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; if(pubMs != null){ //匹配到下行消息(命令) subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; subMs.commandId = pubMs.commandId ; try { MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); } catch (Exception e) { log.error("缓存发布消息(命令)结果发生异常", e); } } try{ MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMs)); }catch (Exception e){ log.error("缓存订阅消息数据发生异常", e); } } @Override public void call(Object... objs) { } @Override public void exception(Exception e) { } }