采用模拟软件MQTTX进行模拟测试,发现并修改MQTT相关水肥机上行数据与下行命令逻辑bug.
| | |
| | | s += (protocol == null ? "" : ("协议=" + protocol + "\n")); |
| | | s += (protocolVersion == null ? "" : ("协议版本号=" + protocolVersion + "\n")); |
| | | s += (rtuAddr == null ? "" : ("Rtu地址=" + rtuAddr + "\n")); |
| | | s += "命令类型=" + (type.equals(CommandType.innerCommand)?"内部命令":(type.equals(CommandType.outerCommand)?"RTU命令":"透传命令")) + "\n" ; |
| | | s += "命令类型=" + ( |
| | | type.equals(CommandType.innerCommand)?"内部命令":( |
| | | type.equals(CommandType.outerCommand)?"RTU命令":( |
| | | type.equals(CommandType.mqttCommand)?"MQTT命令":( |
| | | type.equals(CommandType.outerTransCommand)?"透传命令": |
| | | "")))) + "\n" ; |
| | | s += (code == null ? "" : ("功能码=" + code + "\n")) ; |
| | | s += (rtuResultSendWebUrl == null ? "" : ("回调网址=" + rtuResultSendWebUrl + "\n")); |
| | | if(param != null){ |
| | |
| | | package com.dy.common.mw.protocol4Mqtt; |
| | | |
| | | import lombok.Data; |
| | | import lombok.EqualsAndHashCode; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | | * @Date: 2025/6/5 11:44 |
| | | * @Description |
| | | */ |
| | | public abstract class MqttPubMsg { |
| | | @Data |
| | | public class MqttPubMsg { |
| | | public String commandId ;//命令ID |
| | | |
| | | public String cd ;//功能码 |
| | | |
| | | public String deviceId ;//设备ID |
| | | |
| | |
| | | public boolean isCacheForOffLine ;//下行命令控制,消息中间件不在线是否缓存命令 |
| | | public boolean hasResponse ;//下行命令控制,命令是否有应答 |
| | | |
| | | public abstract boolean valid(); |
| | | public boolean valid(){ |
| | | if (topic == null || topic.isEmpty()) { |
| | | return false; |
| | | } |
| | | if (msg == null || msg.isEmpty()) { |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | } |
| | |
| | | package com.dy.common.mw.protocol4Mqtt; |
| | | |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.ManureVo; |
| | | import com.dy.common.util.Callback; |
| | | import lombok.Data; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | | * @Date: 2025/6/5 11:44 |
| | | * @Description |
| | | */ |
| | | |
| | | public abstract class MqttSubMsg { |
| | | @Data |
| | | public class MqttSubMsg { |
| | | public String commandId ;//命令ID |
| | | public String mqttResultSendWebUrl ;//Mtt返回命令结果 发向目的地web URL |
| | | |
| | |
| | | public MqttTopic topic ;//消息主题 |
| | | public String metaData;//MQTT推送来的元数据 |
| | | |
| | | public abstract boolean valid(); |
| | | public Vo4Up vo4Up;//订阅的消息数据值对象 |
| | | public MqttSubMsg() { |
| | | } |
| | | public MqttSubMsg(MqttTopic subTopic, String msg) { |
| | | this.deviceId = subTopic.devId ; |
| | | this.protocol = subTopic.protocol ; |
| | | this.topic = subTopic ; |
| | | this.metaData = msg ; |
| | | } |
| | | |
| | | public abstract boolean subMsgMatchPubMsg(MqttPubMsg pubMsg); |
| | | public boolean valid() { |
| | | if (topic == null || topic.isEmpty()) { |
| | | return false; |
| | | } |
| | | if (metaData == null || metaData.isEmpty()) { |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | public abstract void action(Callback callback); |
| | | public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){ |
| | | if(this.vo4Up != null && this.vo4Up instanceof ManureVo){ |
| | | //只要上报的是状态数据,说明设备响应了命令 |
| | | return true ; |
| | | } |
| | | return false ; |
| | | } |
| | | |
| | | public void action(Callback callback){ |
| | | callback.call(this) ; |
| | | } |
| | | |
| | | |
| | | public String toString(){ |
| | | StringBuilder sb = new StringBuilder(); |
| | | if(commandId != null){ |
| | | sb.append("commandId:") |
| | | .append(commandId) |
| | | .append("\n") ; |
| | | } |
| | | sb.append("主题:") |
| | | .append(topic.longName()) |
| | | .append("\n") ; |
| | | if(vo4Up != null){ |
| | | sb.append("数据:") |
| | | .append(vo4Up.toString()) |
| | | .append("\n") ; |
| | | }else{ |
| | | sb.append("元数据:") |
| | | .append(metaData) |
| | | .append("\n") ; |
| | | } |
| | | return sb.toString() ; |
| | | } |
| | | } |
| | |
| | | public static final String SubTopicWeather = "weather" ;//气象 |
| | | public static final String SubTopicSoil = "soil" ;//土壤墒情 |
| | | public static final String SubTopicManure = "manure" ;//水肥 |
| | | public static final String SubTopicState = "state" ;//状态 |
| | | |
| | | //发布的主题 |
| | | public static final String PubTopicFault = "ctrlFault" ;//故障解除 |
| | |
| | | import com.alibaba.fastjson2.JSON; |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.dy.common.mw.protocol.Command; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttCallback; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttTopic; |
| | | import com.dy.common.mw.protocol4Mqtt.Vo4Up; |
| | | import com.dy.common.mw.protocol4Mqtt.*; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComCtrlVo; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComSetParamVo; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.FaultClearVo; |
| | |
| | | * @Description |
| | | */ |
| | | public class ProtocolParserSdV1 { |
| | | public MqttSubMsgSdV1 parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception { |
| | | public MqttSubMsg parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception { |
| | | String strTxt = new String(mqttMsg.getPayload(), "UTF-8"); |
| | | if(!JSON.isValid(strTxt)){ |
| | | throw new Exception("接收到MQTT消息,协议" + subTopic.protocol + ",设备ID" + subTopic.devId + ",主题" + subTopic.longName() + "消息格式非json数据(" + strTxt + ")") ; |
| | | } |
| | | MqttSubMsgSdV1 msg = new MqttSubMsgSdV1(subTopic, strTxt); |
| | | MqttSubMsg msg = new MqttSubMsg(subTopic, strTxt); |
| | | Vo4Up vo ; |
| | | DevRunInfo stInfo = null ; |
| | | switch (subTopic.type) { |
| | |
| | | return msg; |
| | | } |
| | | |
| | | public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception { |
| | | MqttPubMsgSdV1 msg ; |
| | | public MqttPubMsg createPubMsg(String orgTag, Command com) throws Exception { |
| | | MqttPubMsg msg ; |
| | | switch (com.code) { |
| | | case CodeSdV1.cd_Fault -> { |
| | | //故障解除命令 |
| | |
| | | throw new Exception("接收到MQTT命令,协议" + com.protocol + "版本" + com.protocolVersion + "功能码" + com.code + "命令结果回收URL为空") ; |
| | | } |
| | | } |
| | | private MqttPubMsgSdV1 createPubMsgOfFault(String orgTag, Command com) throws Exception { |
| | | private MqttPubMsg createPubMsgOfFault(String orgTag, Command com) throws Exception { |
| | | JSONObject obj = (JSONObject) com.param; |
| | | String json = obj.toJSONString(); |
| | | ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class); |
| | | if(cvo == null){ |
| | | throw new Exception("json转ComCtrlVo为null") ; |
| | | } |
| | | MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; |
| | | MqttPubMsg msg = new MqttPubMsg() ; |
| | | this.setPubMsgBase(com, msg); |
| | | msg.isCacheForOffLine = false ; |
| | | msg.hasResponse = true ; |
| | |
| | | msg.msg = JSON.toJSONString(new FaultClearVo(cvo.startTrueStopFalse ?(byte)1:0)) ; |
| | | return msg ; |
| | | } |
| | | private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception { |
| | | private MqttPubMsg createPubMsgOfStir(String orgTag, Command com) throws Exception { |
| | | JSONObject obj = (JSONObject) com.param; |
| | | String json = obj.toJSONString(); |
| | | ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class); |
| | | if(cvo == null){ |
| | | throw new Exception("json转ComCtrlVo为null") ; |
| | | } |
| | | MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; |
| | | MqttPubMsg msg = new MqttPubMsg() ; |
| | | this.setPubMsgBase(com, msg); |
| | | msg.isCacheForOffLine = false ; |
| | | msg.hasResponse = true ; |
| | |
| | | msg.msg = JSON.toJSONString(new StirStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ; |
| | | return msg ; |
| | | } |
| | | private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception { |
| | | private MqttPubMsg createPubMsgOfInject(String orgTag, Command com) throws Exception { |
| | | JSONObject obj = (JSONObject) com.param; |
| | | String json = obj.toJSONString(); |
| | | ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class); |
| | | if(cvo == null){ |
| | | throw new Exception("json转ComCtrlVo为null") ; |
| | | } |
| | | MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; |
| | | MqttPubMsg msg = new MqttPubMsg() ; |
| | | this.setPubMsgBase(com, msg); |
| | | msg.isCacheForOffLine = false ; |
| | | msg.hasResponse = true ; |
| | |
| | | msg.msg = JSON.toJSONString(new InjectStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ; |
| | | return msg ; |
| | | } |
| | | private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception { |
| | | private MqttPubMsg createPubMsgOfIrr(String orgTag, Command com) throws Exception { |
| | | JSONObject obj = (JSONObject) com.param; |
| | | String json = obj.toJSONString(); |
| | | ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class); |
| | | if(cvo == null){ |
| | | throw new Exception("json转ComCtrlVo为null") ; |
| | | } |
| | | MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; |
| | | MqttPubMsg msg = new MqttPubMsg() ; |
| | | this.setPubMsgBase(com, msg); |
| | | msg.isCacheForOffLine = false ; |
| | | msg.hasResponse = true ; |
| | |
| | | return msg ; |
| | | } |
| | | |
| | | private MqttPubMsgSdV1 createPubMsgOfParam(String orgTag, Command com) throws Exception { |
| | | private MqttPubMsg createPubMsgOfParam(String orgTag, Command com) throws Exception { |
| | | JSONObject obj = (JSONObject) com.param; |
| | | String json = obj.toJSONString(); |
| | | ComSetParamVo cvo = JSON.parseObject(json, ComSetParamVo.class); |
| | | if(cvo == null){ |
| | | throw new Exception("json转ComSetParamVo为null") ; |
| | | } |
| | | MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; |
| | | MqttPubMsg msg = new MqttPubMsg() ; |
| | | this.setPubMsgBase(com, msg); |
| | | msg.isCacheForOffLine = false ; |
| | | msg.hasResponse = false ; |
| | | msg.hasResponse = true ; |
| | | msg.cd = CodeSdV1.cd_Param ; |
| | | msg.topic = new MqttTopic(orgTag, com.protocol + com.protocolVersion, com.rtuAddr, ProtocolConstantSdV1.PubTopicParam, cvo.no) ; |
| | | msg.msg = JSON.toJSONString(new ParamSetVo(cvo.stirDuration, cvo.injectDuration)) ; |
| | | return msg ; |
| | | } |
| | | |
| | | private void setPubMsgBase(Command com, MqttPubMsgSdV1 msg){ |
| | | private void setPubMsgBase(Command com, MqttPubMsg msg){ |
| | | msg.commandId = com.id ; |
| | | msg.deviceId = com.rtuAddr ; |
| | | msg.mqttResultSendWebUrl = com.rtuResultSendWebUrl ; |
| | |
| | | /** |
| | | * 搅拌机1是否运行;(水肥机还可存在搅拌机2、搅拌机3、搅拌机4 ......) 运行时1,停止0. |
| | | */ |
| | | @JSONField(name = "搅拌运行1") |
| | | @JSONField(alternateNames = {"搅拌运行1", "stirRunning1"}) |
| | | public Byte stirRunning1;//搅拌1运行状态 |
| | | /** |
| | | * 搅拌机2是否运行;(水肥机还可存在搅拌机2、搅拌机3、搅拌机4 ......) 运行时1,停止0. |
| | | * (当前大禹应用不存在) |
| | | */ |
| | | @JSONField(name = "搅拌运行2") |
| | | @JSONField(alternateNames = {"搅拌运行2", "stirRunning2"}) |
| | | public Byte stirRunning2;//搅拌2运行状态 |
| | | /** |
| | | * 搅拌机3是否运行;(水肥机还可存在搅拌机2、搅拌机3、搅拌机4 ......) 运行时1,停止0. |
| | | * (当前大禹应用不存在) |
| | | */ |
| | | @JSONField(name = "搅拌运行3") |
| | | @JSONField(alternateNames = {"搅拌运行3", "stirRunning3"}) |
| | | public Byte stirRunning3;//搅拌3运行状态 |
| | | /** |
| | | * 搅拌机4是否运行;(水肥机还可存在搅拌机2、搅拌机3、搅拌机4 ......) 运行时1,停止0. |
| | | * (当前大禹应用不存在) |
| | | */ |
| | | @JSONField(name = "搅拌运行4") |
| | | @JSONField(alternateNames = {"搅拌运行4", "stirRunning4"}) |
| | | public Byte stirRunning4;//搅拌4运行状态 |
| | | |
| | | /** |
| | | * 一个水肥机只有一个注肥泵 |
| | | */ |
| | | @JSONField(name = "注肥运行") |
| | | @JSONField(alternateNames = {"注肥运行", "injectRunning"}) |
| | | public Byte injectRunning ;//注肥运行状态 |
| | | |
| | | @JSONField(name = "灌溉运行") |
| | | |
| | | @JSONField(alternateNames = {"灌溉运行", "irrRunning"}) |
| | | public Byte irrRunning ;//灌溉运行状态 |
| | | |
| | | /** |
| | | * 1:注肥泵有故障。0:注肥泵没有故障 |
| | | */ |
| | | @JSONField(name = "报警") |
| | | @JSONField(alternateNames = {"报警", "alarm"}) |
| | | public Byte alarm ;//注肥泵故障 |
| | | |
| | | /** |
| | | * 肥料流量,单位升 |
| | | */ |
| | | @JSONField(name = "肥料流量") |
| | | @JSONField(alternateNames = {"肥料流量", "manureFlow"}) |
| | | public Float manureFlow ; |
| | | |
| | | /** |
| | | * 单位秒 |
| | | */ |
| | | @JSONField(name = "注肥经过时间") |
| | | @JSONField(alternateNames = {"注肥经过时间", "manureTime"}) |
| | | public Integer manureTime ; |
| | | |
| | | /** |
| | | * 单位秒 |
| | | */ |
| | | @JSONField(name = "搅拌经过时间") |
| | | @JSONField(alternateNames = {"搅拌经过时间", "stirTime"}) |
| | | public Integer stirTime ; |
| | | |
| | | /** |
| | | * 单位秒 |
| | | */ |
| | | @JSONField(alternateNames = {"搅拌设定时间", "stirDuration"}) |
| | | public Integer stirDuration ; |
| | | |
| | | /** |
| | | * 单位秒 |
| | | */ |
| | | @JSONField(alternateNames = {"注肥设定时间", "injectDuration"}) |
| | | public Integer injectDuration ; |
| | | |
| | | @JSONField(name = "flexem_timestamp") |
| | | public Long devDt ;//设备时间 |
| | |
| | | sb.append(" 肥料流量:" + manureFlow + ", ") ; |
| | | sb.append(" 注肥经过时间:" + manureTime + ", ") ; |
| | | sb.append(" 搅拌经过时间:" + stirTime + ", ") ; |
| | | sb.append(" 注肥设定时间:" + injectDuration + ", ") ; |
| | | sb.append(" 搅拌设定时间:" + stirDuration + ", ") ; |
| | | sb.append(" 设备时间:" + devDt + ", ") ; |
| | | sb.append(" 设备时间:" + this.getDevDtStr()) ; |
| | | sb.append("\n") ; |
| | |
| | | return rsMap ; |
| | | } |
| | | } |
| | | |
| | | public static Boolean oneOnLine(String devId){ |
| | | synchronized (map){ |
| | | DevStatus st = map.get(devId) ; |
| | | if(st != null){ |
| | | return st.onLine ; |
| | | } |
| | | return false ; |
| | | } |
| | | } |
| | | /** |
| | | * 统计在线与不在线情况 |
| | | */ |
| | |
| | | package com.dy.rtuMw.server.rtuData; |
| | | |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.MqttSubMsgSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.rtuMw.server.rtuData.pSdV1.TkFindPSdV1; |
| | | import org.apache.logging.log4j.LogManager; |
| | | import org.apache.logging.log4j.Logger; |
| | |
| | | if(data == null){ |
| | | log.error("严重错误,Mqtt订阅消息数据为空!" ); |
| | | }else{ |
| | | if(data instanceof MqttSubMsgSdV1){ |
| | | if(data instanceof MqttSubMsg){ |
| | | this.toNextOneTask(data, TkFindPSdV1.taskId); |
| | | }else{ |
| | | log.error("严重错误,该数据类型(" + data.getClass().getName() + "),接收数据任务还未实现!" ); |
| | |
| | | package com.dy.rtuMw.server.rtuData.pSdV1; |
| | | |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.MqttSubMsgSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.rtuMw.server.rtuData.TaskSurpport; |
| | | import org.apache.logging.log4j.LogManager; |
| | | import org.apache.logging.log4j.Logger; |
| | |
| | | @Override |
| | | public void execute(Object data) { |
| | | //前面的任务已经判断了data不为空 |
| | | MqttSubMsgSdV1 msg = (MqttSubMsgSdV1)data ; |
| | | MqttSubMsg msg = (MqttSubMsg)data ; |
| | | log.info(msg.toString()); |
| | | } |
| | | |
| | |
| | | import com.dy.rtuMw.server.forTcp.TcpSessionCache; |
| | | import com.dy.rtuMw.server.local.CommandInnerDeaLer; |
| | | import com.dy.rtuMw.server.local.ReturnCommand; |
| | | import com.dy.rtuMw.server.mqtt.DevStatusDealer; |
| | | import com.dy.rtuMw.server.mqtt.MqttManager; |
| | | import com.dy.rtuMw.server.msCenter.MsCenterUnit; |
| | | import com.dy.rtuMw.server.tasks.WebDownCom4MqttTask; |
| | |
| | | * @return 结果 |
| | | */ |
| | | private BaseResponse<Command> dealMqttCommand(Command command){ |
| | | String rtuAddr = command.getRtuAddr() ;//FBox设备号 |
| | | if(rtuAddr == null || rtuAddr.trim().equals("")){ |
| | | String mqttDevId = command.getRtuAddr() ;//FBox设备号 |
| | | if(mqttDevId == null || mqttDevId.trim().equals("")){ |
| | | return BaseResponseUtils.buildError(ReturnCommand.errored("出错,FBox设备ID为空!", command.getId(), command.getCode())) ; |
| | | } |
| | | if(!ServerProperties.mqttUnitEnable.booleanValue()){ |
| | |
| | | if(MqttManager.getInstance().poolIsClose()){ |
| | | return BaseResponseUtils.buildError(ReturnCommand.errored("出错,MQTT连接池未创建成功!", command.getId(), command.getCode())) ; |
| | | } |
| | | if(!DevStatusDealer.oneOnLine(mqttDevId)){ |
| | | return BaseResponseUtils.buildError(ReturnCommand.errored("出错,FBox设备未在线!", command.getId(), command.getCode())) ; |
| | | } |
| | | |
| | | //生成异步任务 |
| | | WebDownCom4MqttTask task = new WebDownCom4MqttTask() ; |
| | |
| | | # 凉州: lz |
| | | # 金川: jc |
| | | # 嘉峪关: jyg |
| | | base.orgTag=ym |
| | | base.orgTag=jyg |
| | | |
| | | # 233服务器: |
| | | # 元谋: 60000 |
| | |
| | | # 凉州: 60104 |
| | | # 金川: 60105 |
| | | # 嘉峪关: 60106 |
| | | tcp.port=60000 |
| | | tcp.port=60106 |
| | | |
| | | #RTU上行数据最小间隔,大于这个间隔认为设备离线了,测控一体阀是3,表阀一体机是6,默认采用时间最长的6 |
| | | base.upData.min.interval=6 |
| | |
| | | mqtt.port=1883 |
| | | mqtt.user=dyyjy |
| | | mqtt.password=Dyyjy2025,;.abc!@# |
| | | mqtt.protocolAndDeviceIds=sd1/123456789 |
| | | mqtt.protocolAndDeviceIds=sd1/2430002404000840,sd1/2430002404000840 |
| | | mqtt.subTopicAndQos=weather/1,1;soil/1,1;manure/1,1 |
| | | #MQtt设备在一定时间(分钟)后未发布消息,认为设备离线 |
| | | mqtt.noSubThenOff=10 |
| | |
| | | if(codeDataObj != null){ |
| | | if(codeDataObj instanceof JSONObject){ |
| | | codeData = protocolData == null ? null : (protocolData.getJSONObject("subData")) ; //协议功能码数据 |
| | | }else if(codeDataObj instanceof JSONArray){{ |
| | | }else if(codeDataObj instanceof JSONArray){ |
| | | codeArrayData = protocolData == null ? null : (protocolData.getJSONArray("subData")) ; //协议功能码数据 |
| | | }} |
| | | } |
| | | } |
| | | JSONObject job_response = new JSONObject(); |
| | | job_response.put("data", codeData!=null?codeData:(codeArrayData!=null?codeArrayData:null)); |
New file |
| | |
| | | package com.dy.pipIrrRemote.common; |
| | | |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.pipIrrGlobal.command.ComResultWait; |
| | | import com.dy.pipIrrGlobal.command.ComSupport; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.http.MediaType; |
| | | import org.springframework.web.bind.annotation.PostMapping; |
| | | import org.springframework.web.bind.annotation.RequestBody; |
| | | import org.springframework.web.bind.annotation.RequestMapping; |
| | | import org.springframework.web.bind.annotation.RestController; |
| | | |
| | | import java.util.concurrent.CompletableFuture; |
| | | |
| | | /** |
| | | * @author ZhuBaoMin |
| | | * @date 2024-05-23 8:19 |
| | | * @LastEditTime 2024-05-23 8:19 |
| | | * @Description |
| | | */ |
| | | |
| | | @Slf4j |
| | | @RestController |
| | | @RequestMapping(path="mqttRes") |
| | | public class MqttResultCtrl extends ComSupport { |
| | | @PostMapping(path = "receive", consumes = MediaType.APPLICATION_JSON_VALUE) |
| | | public void receive(@RequestBody MqttSubMsg subMsg) { |
| | | if(subMsg != null){ |
| | | CompletableFuture<MqttSubMsg> feature = (CompletableFuture<MqttSubMsg>) ComResultWait.get(Long.parseLong(subMsg.commandId)); |
| | | if(feature != null) { |
| | | feature.complete(subMsg); |
| | | }else{ |
| | | //超时,feature被清除了 |
| | | } |
| | | }else{ |
| | | log.error("mqtt协议消息为空"); |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.dy.common.mw.protocol.Command; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.common.util.Callback; |
| | | import com.dy.common.util.IDLongGenerator; |
| | | import com.dy.common.webUtil.BaseResponse; |
| | |
| | | @Value("${mw.waitMwRtnResultTimeout}") |
| | | protected int waitMwRtnResultTimeout ; |
| | | |
| | | @Value("${mw.rtuCallbackUrl_rm}") |
| | | protected String rtuResultSendWebUrl; |
| | | @Value("${mw.mqttCallbackUrl_rm}") |
| | | protected String mqttResultSendWebUrl; |
| | | |
| | | //水肥机对象 |
| | | protected PrStManure ctrlPo ; |
| | | //异步等待器 |
| | | protected CompletableFuture<JSONObject> feature; |
| | | protected CompletableFuture<MqttSubMsg> feature; |
| | | //命令名称 |
| | | protected String comName ; |
| | | //命令日志id |
| | |
| | | public BaseResponse<Object> after(String comCode, Callback callback) { |
| | | try{ |
| | | //等待通信中间件通知水肥机执行命令上行数据(命令结果) |
| | | JSONObject resultData = feature.get(waitMwRtnResultTimeout, TimeUnit.SECONDS); |
| | | return BaseResponseUtils.buildSuccess(this.dealComResult(comCode, resultData, callback)); |
| | | MqttSubMsg subMsg = feature.get(waitMwRtnResultTimeout, TimeUnit.SECONDS); |
| | | return BaseResponseUtils.buildSuccess(this.dealComResult(comCode, subMsg, callback)); |
| | | }catch (Exception e){ |
| | | return BaseResponseUtils.buildFail("等待通信中间件通知命令结果超时"); |
| | | } |
| | |
| | | /** |
| | | * 生成命令返回信息 |
| | | */ |
| | | protected abstract String dealComResult(String code, JSONObject resultData, Callback callback); |
| | | protected abstract String dealComResult(String code, MqttSubMsg subMsg, Callback callback); |
| | | } |
| | |
| | | package com.dy.pipIrrRemote.monitor.mqttSd1.fault; |
| | | |
| | | import com.alibaba.fastjson2.JSON; |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.dy.common.aop.SsoAop; |
| | | import com.dy.common.mw.protocol.Command; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.CodeSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.ProtocolConstantSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.ManureVo; |
| | |
| | | import org.springframework.web.bind.annotation.RequestBody; |
| | | import org.springframework.web.bind.annotation.RequestMapping; |
| | | import org.springframework.web.bind.annotation.RestController; |
| | | |
| | | import java.lang.reflect.InvocationHandler; |
| | | import java.lang.reflect.Proxy; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | |
| | | try { |
| | | //创建外部命令(发给MQTT->FBox) |
| | | Command com = sv.createMQTTCommand(ctrlPo.fboxId, "" + comId, Protocol, ProtocolVersion, ComCode); |
| | | com.rtuResultSendWebUrl = rtuResultSendWebUrl; |
| | | com.rtuResultSendWebUrl = mqttResultSendWebUrl; |
| | | com.param = comParam ; |
| | | //发送命令 |
| | | res = super.doSend(sv, com); |
| | |
| | | } |
| | | |
| | | @Override |
| | | protected String dealComResult(String code, JSONObject resultData, Callback callback){ |
| | | protected String dealComResult(String code, MqttSubMsg subMsg, Callback callback){ |
| | | String msg; |
| | | if(resultData != null){ |
| | | log.info(resultData.toString()); |
| | | JSONObject codeData = resultData.getJSONObject("data") ; |
| | | if(codeData == null){ |
| | | if(subMsg != null){ |
| | | if(subMsg.vo4Up != null && Proxy.isProxyClass(subMsg.vo4Up.getClass())){ |
| | | // 获取代理的 InvocationHandler |
| | | InvocationHandler handler = Proxy.getInvocationHandler(subMsg.vo4Up); |
| | | String json = JSON.toJSONString(handler) ; |
| | | ManureVo vo = JSON.parseObject(json, ManureVo.class); |
| | | msg = vo.toString() ; |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | }else { |
| | | String json = codeData.toJSONString(); |
| | | ManureVo cvo = JSON.parseObject(json, ManureVo.class) ; |
| | | if(cvo != null){ |
| | | msg = cvo.toString() ; |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | } |
| | | } |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | } |
| | | return msg; |
| | | }} |
| | | } |
| | | } |
| | |
| | | package com.dy.pipIrrRemote.monitor.mqttSd1.inject; |
| | | |
| | | import com.alibaba.fastjson2.JSON; |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.dy.common.aop.SsoAop; |
| | | import com.dy.common.mw.protocol.Command; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.CodeSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.ProtocolConstantSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.ManureVo; |
| | |
| | | import com.dy.common.webUtil.BaseResponseUtils; |
| | | import com.dy.pipIrrRemote.common.dto.Dto4MqttBase; |
| | | import com.dy.pipIrrRemote.monitor.common.Com4MqttCtrl; |
| | | import com.dy.pipIrrRemote.monitor.mqttSd1.stir.CdDto; |
| | | import io.swagger.v3.oas.annotations.tags.Tag; |
| | | import jakarta.validation.Valid; |
| | | import lombok.RequiredArgsConstructor; |
| | |
| | | import org.springframework.web.bind.annotation.RequestBody; |
| | | import org.springframework.web.bind.annotation.RequestMapping; |
| | | import org.springframework.web.bind.annotation.RestController; |
| | | |
| | | import java.lang.reflect.InvocationHandler; |
| | | import java.lang.reflect.Proxy; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | |
| | | */ |
| | | @PostMapping(path = "start", consumes = MediaType.APPLICATION_JSON_VALUE) |
| | | @SsoAop() |
| | | public BaseResponse<Object> start(@RequestBody @Valid com.dy.pipIrrRemote.monitor.mqttSd1.stir.CdDto dto, BindingResult bindingResult) { |
| | | public BaseResponse<Object> start(@RequestBody @Valid CdDto dto, BindingResult bindingResult) { |
| | | return this.send(dto, bindingResult, true) ; |
| | | } |
| | | |
| | |
| | | */ |
| | | @PostMapping(path = "stop", consumes = MediaType.APPLICATION_JSON_VALUE) |
| | | @SsoAop() |
| | | public BaseResponse<Object> stop(@RequestBody @Valid com.dy.pipIrrRemote.monitor.mqttSd1.stir.CdDto dto, BindingResult bindingResult) { |
| | | public BaseResponse<Object> stop(@RequestBody @Valid CdDto dto, BindingResult bindingResult) { |
| | | return this.send(dto, bindingResult, false) ; |
| | | } |
| | | |
| | |
| | | try { |
| | | //创建外部命令(发给MQTT->FBox) |
| | | Command com = sv.createMQTTCommand(ctrlPo.fboxId, "" + comId, Protocol, ProtocolVersion, ComCode); |
| | | com.rtuResultSendWebUrl = rtuResultSendWebUrl; |
| | | com.rtuResultSendWebUrl = mqttResultSendWebUrl; |
| | | com.param = comParam ; |
| | | //发送命令 |
| | | res = super.doSend(sv, com); |
| | |
| | | } |
| | | |
| | | @Override |
| | | protected String dealComResult(String code, JSONObject resultData, Callback callback){ |
| | | protected String dealComResult(String code, MqttSubMsg subMsg, Callback callback){ |
| | | String msg; |
| | | if(resultData != null){ |
| | | log.info(resultData.toString()); |
| | | JSONObject codeData = resultData.getJSONObject("data") ; |
| | | if(codeData == null){ |
| | | if(subMsg != null){ |
| | | if(subMsg.vo4Up != null && Proxy.isProxyClass(subMsg.vo4Up.getClass())){ |
| | | // 获取代理的 InvocationHandler |
| | | InvocationHandler handler = Proxy.getInvocationHandler(subMsg.vo4Up); |
| | | String json = JSON.toJSONString(handler) ; |
| | | ManureVo vo = JSON.parseObject(json, ManureVo.class); |
| | | msg = vo.toString() ; |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | }else { |
| | | String json = codeData.toJSONString(); |
| | | ManureVo cvo = JSON.parseObject(json, ManureVo.class) ; |
| | | if(cvo != null){ |
| | | msg = cvo.toString() ; |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | } |
| | | } |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | } |
| | | return msg; |
| | | }} |
| | | } |
| | | } |
| | |
| | | package com.dy.pipIrrRemote.monitor.mqttSd1.irr; |
| | | |
| | | import com.alibaba.fastjson2.JSON; |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.dy.common.aop.SsoAop; |
| | | import com.dy.common.mw.protocol.Command; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.CodeSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.ProtocolConstantSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.ManureVo; |
| | |
| | | import org.springframework.web.bind.annotation.RequestBody; |
| | | import org.springframework.web.bind.annotation.RequestMapping; |
| | | import org.springframework.web.bind.annotation.RestController; |
| | | |
| | | import java.lang.reflect.InvocationHandler; |
| | | import java.lang.reflect.Proxy; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | |
| | | try { |
| | | //创建外部命令(发给MQTT->FBox) |
| | | Command com = sv.createMQTTCommand(ctrlPo.fboxId, "" + comId, Protocol, ProtocolVersion, ComCode); |
| | | com.rtuResultSendWebUrl = rtuResultSendWebUrl; |
| | | com.rtuResultSendWebUrl = mqttResultSendWebUrl; |
| | | com.param = comParam ; |
| | | //发送命令 |
| | | res = super.doSend(sv, com); |
| | |
| | | } |
| | | |
| | | @Override |
| | | protected String dealComResult(String code, JSONObject resultData, Callback callback){ |
| | | protected String dealComResult(String code, MqttSubMsg subMsg, Callback callback){ |
| | | String msg; |
| | | if(resultData != null){ |
| | | log.info(resultData.toString()); |
| | | JSONObject codeData = resultData.getJSONObject("data") ; |
| | | if(codeData == null){ |
| | | if(subMsg != null){ |
| | | if(subMsg.vo4Up != null && Proxy.isProxyClass(subMsg.vo4Up.getClass())){ |
| | | // 获取代理的 InvocationHandler |
| | | InvocationHandler handler = Proxy.getInvocationHandler(subMsg.vo4Up); |
| | | String json = JSON.toJSONString(handler) ; |
| | | ManureVo vo = JSON.parseObject(json, ManureVo.class); |
| | | msg = vo.toString() ; |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | }else { |
| | | String json = codeData.toJSONString(); |
| | | ManureVo cvo = JSON.parseObject(json, ManureVo.class) ; |
| | | if(cvo != null){ |
| | | msg = cvo.toString() ; |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | } |
| | | } |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | } |
| | | return msg; |
| | | }} |
| | | } |
| | | } |
| | |
| | | package com.dy.pipIrrRemote.monitor.mqttSd1.paramSet; |
| | | |
| | | import com.alibaba.fastjson2.JSON; |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.dy.common.aop.SsoAop; |
| | | import com.dy.common.mw.protocol.Command; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.CodeSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.ProtocolConstantSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.ManureVo; |
| | |
| | | import com.dy.common.webUtil.BaseResponseUtils; |
| | | import com.dy.pipIrrRemote.common.dto.Dto4MqttBase; |
| | | import com.dy.pipIrrRemote.monitor.common.Com4MqttCtrl; |
| | | import com.dy.pipIrrRemote.monitor.mqttSd1.stir.CdDto; |
| | | import io.swagger.v3.oas.annotations.tags.Tag; |
| | | import jakarta.validation.Valid; |
| | | import lombok.RequiredArgsConstructor; |
| | |
| | | import org.springframework.web.bind.annotation.RequestBody; |
| | | import org.springframework.web.bind.annotation.RequestMapping; |
| | | import org.springframework.web.bind.annotation.RestController; |
| | | |
| | | import java.lang.reflect.InvocationHandler; |
| | | import java.lang.reflect.Proxy; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | |
| | | try { |
| | | //创建外部命令(发给MQTT->FBox) |
| | | Command com = sv.createMQTTCommand(ctrlPo.fboxId, "" + comId, Protocol, ProtocolVersion, ComCode); |
| | | com.rtuResultSendWebUrl = rtuResultSendWebUrl; |
| | | com.rtuResultSendWebUrl = mqttResultSendWebUrl; |
| | | com.param = comParam ; |
| | | //发送命令 |
| | | res = super.doSend(sv, com); |
| | |
| | | } |
| | | |
| | | @Override |
| | | protected String dealComResult(String code, JSONObject resultData, Callback callback){ |
| | | protected String dealComResult(String code, MqttSubMsg subMsg, Callback callback){ |
| | | String msg; |
| | | if(resultData != null){ |
| | | log.info(resultData.toString()); |
| | | JSONObject codeData = resultData.getJSONObject("data") ; |
| | | if(codeData == null){ |
| | | if(subMsg != null){ |
| | | if(subMsg.vo4Up != null && Proxy.isProxyClass(subMsg.vo4Up.getClass())){ |
| | | // 获取代理的 InvocationHandler |
| | | InvocationHandler handler = Proxy.getInvocationHandler(subMsg.vo4Up); |
| | | String json = JSON.toJSONString(handler) ; |
| | | ManureVo vo = JSON.parseObject(json, ManureVo.class); |
| | | msg = vo.toString() ; |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | }else { |
| | | String json = codeData.toJSONString(); |
| | | ManureVo cvo = JSON.parseObject(json, ManureVo.class) ; |
| | | if(cvo != null){ |
| | | msg = cvo.toString() ; |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | } |
| | | } |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | } |
| | | return msg; |
| | | }} |
| | | } |
| | | } |
| | |
| | | package com.dy.pipIrrRemote.monitor.mqttSd1.paramSet; |
| | | |
| | | import com.dy.pipIrrRemote.common.dto.Dto4MqttBase; |
| | | import jakarta.validation.constraints.NotNull; |
| | | import lombok.Data; |
| | | import lombok.EqualsAndHashCode; |
| | | |
| | |
| | | @EqualsAndHashCode(callSuper=true) |
| | | public class CdDto extends Dto4MqttBase { |
| | | public static final long serialVersionUID = 202506201656001L; |
| | | |
| | | // 搅拌设定时间 单位秒 |
| | | @NotNull(message = "搅拌设定时间不能为空") |
| | | public Integer stirDuration ; |
| | | |
| | | // 注肥设定时间 单位秒 |
| | | @NotNull(message = "注肥设定时间不能为空") |
| | | public Integer injectDuration ; |
| | | } |
| | |
| | | package com.dy.pipIrrRemote.monitor.mqttSd1.stir; |
| | | |
| | | import com.alibaba.fastjson2.JSON; |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.dy.common.aop.SsoAop; |
| | | import com.dy.common.mw.protocol.Command; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.CodeSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.ProtocolConstantSdV1; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.ManureVo; |
| | |
| | | import org.springframework.web.bind.annotation.RequestMapping; |
| | | import org.springframework.web.bind.annotation.RestController; |
| | | |
| | | import java.lang.reflect.InvocationHandler; |
| | | import java.lang.reflect.Proxy; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | | * @Date: 2025/6/16 17:35 |
| | |
| | | @Slf4j |
| | | @Tag(name = "远程命令", description = "搅拌启停") |
| | | @RestController("mqttSd1StirCtrl") |
| | | @RequestMapping(path = "mqttSd1/stirCtrl") |
| | | @RequestMapping(path = "mqttSd1/stir") |
| | | @RequiredArgsConstructor |
| | | @Scope("prototype") //因为有对象类属性,所以采用原型模式,每次请求新建一个实例对象 |
| | | public class CdCtrl extends Com4MqttCtrl { |
| | |
| | | try { |
| | | //创建外部命令(发给MQTT->FBox) |
| | | Command com = sv.createMQTTCommand(ctrlPo.fboxId, "" + comId, Protocol, ProtocolVersion, ComCode); |
| | | com.rtuResultSendWebUrl = rtuResultSendWebUrl; |
| | | com.rtuResultSendWebUrl = mqttResultSendWebUrl; |
| | | com.param = comParam ; |
| | | //发送命令 |
| | | res = super.doSend(sv, com); |
| | |
| | | } |
| | | |
| | | @Override |
| | | protected String dealComResult(String code, JSONObject resultData, Callback callback){ |
| | | protected String dealComResult(String code, MqttSubMsg subMsg, Callback callback){ |
| | | String msg; |
| | | if(resultData != null){ |
| | | log.info(resultData.toString()); |
| | | JSONObject codeData = resultData.getJSONObject("data") ; |
| | | if(codeData == null){ |
| | | if(subMsg != null){ |
| | | if(subMsg.vo4Up != null && Proxy.isProxyClass(subMsg.vo4Up.getClass())){ |
| | | // 获取代理的 InvocationHandler |
| | | InvocationHandler handler = Proxy.getInvocationHandler(subMsg.vo4Up); |
| | | String json = JSON.toJSONString(handler) ; |
| | | ManureVo vo = JSON.parseObject(json, ManureVo.class); |
| | | msg = vo.toString() ; |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | }else { |
| | | String json = codeData.toJSONString(); |
| | | ManureVo cvo = JSON.parseObject(json, ManureVo.class) ; |
| | | if(cvo != null){ |
| | | msg = cvo.toString() ; |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | } |
| | | } |
| | | }else{ |
| | | msg = RtuSuccessMsg ; |
| | | } |
| | | return msg; |
| | | }} |
| | | } |
| | | } |
| | |
| | | public class CdDto extends Dto4MqttBase { |
| | | public static final long serialVersionUID = 202506201655001L; |
| | | |
| | | // 搅拌设定时间 单位秒 |
| | | @NotNull(message = "搅拌设定时间不能为空") |
| | | public Integer stirDuration ; |
| | | |
| | | // 注肥设定时间 单位秒 |
| | | @NotNull(message = "注肥设定时间不能为空") |
| | | public Integer injectDuration ; |
| | | } |