Merge branch 'master' of http://8.140.179.55:20000/r/pipIrr-SV
2 文件已重命名
42个文件已修改
3个文件已添加
| | |
| | | |
| | | private final GenericObjectPool<MqttClient> pool; |
| | | |
| | | public MqttClientPool(String broker, String username, String password, int maxConnections) { |
| | | MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password); |
| | | public MqttClientPool(String broker, String username, String password, int maxConnections, boolean useMemoryPersistence) { |
| | | MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password, useMemoryPersistence); |
| | | GenericObjectPoolConfig<MqttClient> config = new GenericObjectPoolConfig<>(); |
| | | config.setMaxTotal(maxConnections); |
| | | config.setMaxIdle(maxConnections); |
| | |
| | | import org.apache.commons.pool2.impl.DefaultPooledObject; |
| | | import org.eclipse.paho.client.mqttv3.MqttClient; |
| | | import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | |
| | | private final String broker; |
| | | private final String username; |
| | | private final String password; |
| | | private final Boolean useMemoryPersistence; |
| | | |
| | | public MqttClientPooledObjectFactory(String broker, String username, String password) { |
| | | public MqttClientPooledObjectFactory(String broker, String username, String password, boolean useMemoryPersistence) { |
| | | this.broker = broker; |
| | | this.username = username; |
| | | this.password = password; |
| | | this.useMemoryPersistence = useMemoryPersistence; |
| | | } |
| | | |
| | | @Override |
| | | public MqttClient create() throws Exception { |
| | | String clientId = MqttClient.generateClientId(); |
| | | MqttClient client = new MqttClient(broker, clientId); |
| | | |
| | | MqttClient client = null ; |
| | | // 使ç¨å
åæä¹
åèéé»è®¤çæä»¶æä¹
å |
| | | if (useMemoryPersistence) { |
| | | MemoryPersistence persistence = new MemoryPersistence(); |
| | | client = new MqttClient(broker, clientId, persistence); |
| | | }else{ |
| | | client = new MqttClient(broker, clientId); |
| | | } |
| | | MqttConnectOptions options = new MqttConnectOptions(); |
| | | options.setUserName(username); |
| | | options.setPassword(password.toCharArray()); |
| | |
| | | public static void main(String[] args) { |
| | | try{ |
| | | // åå§åè¿æ¥æ± |
| | | pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections); |
| | | pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections, true); |
| | | MqttClient clientSub = pool.popClient() ; |
| | | testSubscribe(clientSub, topic1); |
| | | testSubscribe(clientSub, topic2); |
| | |
| | | index += 8 ; |
| | | cdData.orderNo = ByteUtil.BCD2String_BE(bs, index, index + 7) ; |
| | | |
| | | index += 8 ; |
| | | cdData.startDt = GlParse.parseRtuDt(bs, index) ; |
| | | |
| | | if(cdData.clResult == (byte)0x81){ |
| | | //2025-06-11 çæ±æµ·å®æ |
| | | //失败äºï¼ä¸é¢å°±æ²¡ææ°æ®äº |
| | | return ; |
| | | } |
| | | |
| | | index += 8 ; |
| | | cdData.startDt = GlParse.parseRtuDt(bs, index) ; |
| | | |
| | | |
| | | index += 6 ; |
| | | cdData.endDt = GlParse.parseRtuDt(bs, index) ; |
| | | |
| | |
| | | vo.orgTag = topicGrp[0] ; |
| | | vo.protocol = topicGrp[1] ; |
| | | vo.devId = topicGrp[2] ; |
| | | vo.topic = topicGrp[3] ; |
| | | vo.name = topicGrp[3] ; |
| | | return vo ; |
| | | } |
| | | }else{ |
| | |
| | | } |
| | | |
| | | public static String createPubTopic(MqttTopic tp) throws Exception { |
| | | return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.topic ; |
| | | return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.name; |
| | | } |
| | | |
| | | public static MqttSubMsg parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception { |
| | |
| | | |
| | | public String mqttResultSendWebUrl ;//Mqttè¿åå½ä»¤ç»æ ååç®çå°web URL |
| | | |
| | | public String topic ;//æ¶æ¯ä¸»é¢ |
| | | public MqttTopic topic ;//æ¶æ¯ä¸»é¢ |
| | | public String msg ;//æ¶æ¯ |
| | | |
| | | public boolean isCacheForOffLine ;//ä¸è¡å½ä»¤æ§å¶ï¼æ¶æ¯ä¸é´ä»¶ä¸å¨çº¿æ¯å¦ç¼åå½ä»¤ |
| | |
| | | public String deviceId ;//设å¤ID |
| | | public String protocol;//åè®® |
| | | |
| | | public String topic ;//æ¶æ¯ä¸»é¢ |
| | | public String msg ;//æ¶æ¯ |
| | | public MqttTopic topic ;//æ¶æ¯ä¸»é¢ |
| | | public String metaData;//MQTTæ¨éæ¥çå
æ°æ® |
| | | |
| | | public abstract boolean valid(); |
| | | |
| | |
| | | public String orgTag ;//ç»ç»æ è¯ |
| | | public String protocol ;//åè®®åç§° |
| | | public String devId ;//设å¤ï¼FBoxï¼ID |
| | | public String topic ;//æ¶æ¯ä¸»é¢ |
| | | public String name;//æ¶æ¯ä¸»é¢æ«ç«¯åç§° |
| | | |
| | | public boolean isEmpty(){ |
| | | return orgTag == null || protocol == null || devId == null || name == null |
| | | || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || name.trim().length() == 0 ; |
| | | } |
| | | |
| | | public String shortName(){ |
| | | return name; |
| | | } |
| | | |
| | | public String longName(){ |
| | | return orgTag + "/" + protocol + "/" + devId + "/" + name; |
| | | } |
| | | } |
| | |
| | | public static final String cd_Stir = "01" ;//æ
æå¯åå½ä»¤ |
| | | public static final String cd_Inject = "02" ;//注è¥å¯åå½ä»¤ |
| | | public static final String cd_Irr = "03" ;//çæºå¯åå½ä»¤ |
| | | public static final String cd_Param = "10" ;//设å®åæ° |
| | | } |
| | |
| | | public MqttSubMsgSdV1(MqttTopic subTopic, String msg) { |
| | | this.deviceId = subTopic.devId ; |
| | | this.protocol = subTopic.protocol ; |
| | | this.topic = subTopic.topic ; |
| | | this.msg = msg ; |
| | | this.topic = subTopic ; |
| | | this.metaData = msg ; |
| | | } |
| | | public String toString(){ |
| | | StringBuilder sb = new StringBuilder(); |
| | |
| | | .append("\n") ; |
| | | } |
| | | sb.append("主é¢:") |
| | | .append(topic) |
| | | .append("\n") ; |
| | | sb.append("æ¶æ¯:") |
| | | .append(msg) |
| | | .append(topic.longName()) |
| | | .append("\n") ; |
| | | if(vo4Up != null){ |
| | | sb.append("æ°æ®:") |
| | | .append(vo4Up.toString()) |
| | | .append("\n") ; |
| | | }else{ |
| | | sb.append("å
æ°æ®:") |
| | | .append(metaData) |
| | | .append("\n") ; |
| | | } |
| | | |
| | | return sb.toString() ; |
| | | } |
| | | |
| | | public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){ |
| | | if (pubMsg instanceof MqttPubMsgSdV1) { |
| | | MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg; |
| | | //MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg; |
| | | if(this.vo4Up != null && this.vo4Up instanceof StateVo){ |
| | | //åªè¦ä¸æ¥çæ¯ç¶ææ°æ®ï¼è¯´æè®¾å¤ååºäºå½ä»¤ |
| | | return true ; |
| | | } |
| | | } |
| | |
| | | if (topic == null || topic.isEmpty()) { |
| | | return false; |
| | | } |
| | | if (msg == null || msg.isEmpty()) { |
| | | if (metaData == null || metaData.isEmpty()) { |
| | | return false; |
| | | } |
| | | return true; |
| | |
| | | public static final String PubTopicStir = "ctrlStir" ;//æ
æå¯å |
| | | public static final String PubTopicInject = "ctrlInject" ;//注è¥å¯å |
| | | public static final String PubTopicIrr = "ctrlIrr" ;//çæºå¯å |
| | | public static final String PubTopicParam = "setParam" ;//è®¾ç½®åæ° |
| | | |
| | | } |
| | |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.dy.common.mw.protocol.Command; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttCallback; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttMsgParser; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttTopic; |
| | | import com.dy.common.mw.protocol4Mqtt.Vo4Up; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComCtrlVo; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComSetParamVo; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.FaultClearVo; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.ParamSetVo; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*; |
| | | import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; |
| | | import org.eclipse.paho.client.mqttv3.MqttMessage; |
| | | |
| | | /** |
| | |
| | | */ |
| | | public class ProtocolParserSdV1 { |
| | | public MqttSubMsgSdV1 parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception { |
| | | String msg = new String(mqttMsg.getPayload(), "UTF-8"); |
| | | if(JSON.isValid(msg)){ |
| | | throw new Exception("æ¥æ¶å°MQTTæ¶æ¯ï¼åè®®" + subTopic.protocol + "ï¼è®¾å¤ID" + subTopic.devId + "ï¼ä¸»é¢" + subTopic.topic + "æ¶æ¯æ ¼å¼éjsonæ°æ®(" + msg + ")") ; |
| | | String strTxt = new String(mqttMsg.getPayload(), "UTF-8"); |
| | | if(!JSON.isValid(strTxt)){ |
| | | throw new Exception("æ¥æ¶å°MQTTæ¶æ¯ï¼åè®®" + subTopic.protocol + "ï¼è®¾å¤ID" + subTopic.devId + "ï¼ä¸»é¢" + subTopic.longName() + "æ¶æ¯æ ¼å¼éjsonæ°æ®(" + strTxt + ")") ; |
| | | } |
| | | MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(subTopic, msg); |
| | | MqttSubMsgSdV1 msg = new MqttSubMsgSdV1(subTopic, strTxt); |
| | | Vo4Up vo ; |
| | | switch (subTopic.topic) { |
| | | DevRunInfo stInfo = null ; |
| | | switch (subTopic.name) { |
| | | case ProtocolConstantSdV1.SubTopicWeather -> { |
| | | vo = JSON.parseObject(msg, WeatherVo.class); |
| | | vo = JSON.parseObject(strTxt, WeatherVo.class); |
| | | break; |
| | | } |
| | | case ProtocolConstantSdV1.SubTopicSoil -> { |
| | | vo = JSON.parseObject(msg, SoilVo.class); |
| | | vo = JSON.parseObject(strTxt, SoilVo.class); |
| | | break; |
| | | } |
| | | case ProtocolConstantSdV1.SubTopicManure -> { |
| | | vo = JSON.parseObject(msg, ManureVo.class); |
| | | vo = JSON.parseObject(strTxt, ManureVo.class); |
| | | break; |
| | | } |
| | | case ProtocolConstantSdV1.SubTopicState -> { |
| | | //æ¤å¤æªå®æï¼åºè¯¥äº§çä¸äºéä¿¡çinfoï¼ä¾ä¸é¢callback.notify(objs)éç¥åºå» |
| | | vo = JSON.parseObject(msg, StateVo.class); |
| | | vo = JSON.parseObject(strTxt, StateVo.class); |
| | | StateVo stVo = (StateVo)vo ; |
| | | stInfo = new DevRunInfo() ; |
| | | stInfo.devId = msg.deviceId ; |
| | | stInfo.stirRunning = (stVo.stirRunning==null?false:(stVo.stirRunning.byteValue()==1?true:false)) ; //æ
æè¿è¡ trueæ¯ falseå¦ |
| | | stInfo.injectRunning = (stVo.injectRunning==null?false:(stVo.injectRunning.byteValue()==1?true:false)) ; //注è¥è¿è¡ trueæ¯ falseå¦ |
| | | stInfo.irrRunning = (stVo.irrRunning==null?false:(stVo.irrRunning.byteValue()==1?true:false)) ; //çæºè¿è¡ trueæ¯ falseå¦ |
| | | stInfo.alarm = (stVo.alarm==null?false:(stVo.alarm.byteValue()==1?true:false)) ; //æ¥è¦ trueæ¯ falseå¦ |
| | | break; |
| | | } |
| | | default -> { |
| | | throw new Exception("æ¥æ¶å°MQTTæ¶æ¯ï¼åè®®" + subTopic.protocol + "ï¼è®¾å¤ID" + subTopic.devId + "ï¼ä¸»é¢" + subTopic.topic + "æ¶æ¯è§£æé»è¾æªå®ç°"); |
| | | throw new Exception("æ¥æ¶å°MQTTæ¶æ¯ï¼åè®®" + subTopic.protocol + "ï¼è®¾å¤ID" + subTopic.devId + "ï¼ä¸»é¢" + subTopic.name + "æ¶æ¯è§£æé»è¾æªå®ç°"); |
| | | } |
| | | } |
| | | ms.vo4Up = vo ; |
| | | callback.callback(ms); |
| | | callback.notify(null);//æ¤å¤æªå®æ |
| | | return ms; |
| | | msg.vo4Up = vo ; |
| | | callback.callback(msg); |
| | | callback.notify(msg.deviceId, stInfo); |
| | | return msg; |
| | | } |
| | | |
| | | public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception { |
| | |
| | | msg = this.createPubMsgOfIrr(orgTag, com); |
| | | break; |
| | | } |
| | | case CodeSdV1.cd_Param -> { |
| | | //è®¾ç½®åæ° |
| | | this.checkParam(com); |
| | | this.checkRtnWebUrl(com); |
| | | msg = this.createPubMsgOfParam(orgTag, com); |
| | | break; |
| | | } |
| | | default -> { |
| | | throw new Exception("æ¥æ¶å°MQTTå½ä»¤ï¼åè®®" + com.protocol + "çæ¬" + com.protocolVersion + "åè½ç " + com.code + "æé 卿ªå®ç°"); |
| | | } |
| | |
| | | msg.isCacheForOffLine = false ; |
| | | msg.hasResponse = true ; |
| | | msg.cd = CodeSdV1.cd_Fault ; |
| | | msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; |
| | | msg.msg = JSON.toJSONString(new FaultClearVo(cvo.isDo)) ; |
| | | msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault) ; |
| | | msg.msg = JSON.toJSONString(new FaultClearVo(cvo.startTrueStopFalse ?(byte)1:0)) ; |
| | | return msg ; |
| | | } |
| | | private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception { |
| | |
| | | msg.isCacheForOffLine = false ; |
| | | msg.hasResponse = true ; |
| | | msg.cd = CodeSdV1.cd_Fault ; |
| | | msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; |
| | | msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ; |
| | | msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicStir) ; |
| | | msg.msg = JSON.toJSONString(new StirStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ; |
| | | return msg ; |
| | | } |
| | | private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception { |
| | |
| | | msg.isCacheForOffLine = false ; |
| | | msg.hasResponse = true ; |
| | | msg.cd = CodeSdV1.cd_Fault ; |
| | | msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; |
| | | msg.msg = JSON.toJSONString(new InjectStartVo(cvo.isDo)) ; |
| | | msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicInject) ; |
| | | msg.msg = JSON.toJSONString(new InjectStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ; |
| | | return msg ; |
| | | } |
| | | private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception { |
| | |
| | | msg.isCacheForOffLine = false ; |
| | | msg.hasResponse = true ; |
| | | msg.cd = CodeSdV1.cd_Fault ; |
| | | msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; |
| | | msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ; |
| | | msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicIrr) ; |
| | | msg.msg = JSON.toJSONString(new StirStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ; |
| | | return msg ; |
| | | } |
| | | |
| | | private MqttPubMsgSdV1 createPubMsgOfParam(String orgTag, Command com) throws Exception { |
| | | JSONObject obj = (JSONObject) com.param; |
| | | String json = obj.toJSONString(); |
| | | ComSetParamVo cvo = JSON.parseObject(json, ComSetParamVo.class); |
| | | if(cvo == null){ |
| | | throw new Exception("json转ComSetParamVo为null") ; |
| | | } |
| | | MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; |
| | | this.setPubMsgBase(com, msg); |
| | | msg.isCacheForOffLine = false ; |
| | | msg.hasResponse = false ; |
| | | msg.cd = CodeSdV1.cd_Param ; |
| | | msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicParam) ; |
| | | msg.msg = JSON.toJSONString(new ParamSetVo(cvo.stirDuration, cvo.injectDuration)) ; |
| | | return msg ; |
| | | } |
| | | |
| | |
| | | * @Description |
| | | */ |
| | | public class ComCtrlVo { |
| | | //æ¯å¦æ§å¶å¨ä½ï¼trueæ¯ï¼falseå¦ |
| | | //å¯åå¨ä½ï¼trueæ¯ï¼falseå¦ |
| | | //å¯ä»¥æ§è¡åè½ç 00ï¼01ï¼02ï¼03çå¨ä½ |
| | | public boolean isDo;// |
| | | public boolean startTrueStopFalse;// |
| | | } |
New file |
| | |
| | | package com.dy.common.mw.protocol4Mqtt.pSdV1.comParam; |
| | | |
| | | import com.alibaba.fastjson2.annotation.JSONField; |
| | | import lombok.AllArgsConstructor; |
| | | import lombok.Data; |
| | | import lombok.NoArgsConstructor; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | | * @Date: 2025/6/11 17:03 |
| | | * @Description |
| | | */ |
| | | @Data |
| | | @NoArgsConstructor |
| | | @AllArgsConstructor |
| | | public class ComSetParamVo { |
| | | // æ
æè®¾å®æ¶é´ |
| | | public Integer stirDuration ; |
| | | |
| | | // 注è¥è®¾å®æ¶é´ |
| | | public Integer injectDuration ; |
| | | |
| | | } |
| | |
| | | @AllArgsConstructor |
| | | public class FaultClearVo implements Vo4Down { |
| | | @JSONField(name = "æ
éè§£é¤") |
| | | public boolean isDo ; |
| | | public Byte isDo ;//1æ¯ï¼0å¦ |
| | | |
| | | @Override |
| | | public String toString(){ |
| | | return "æ
éè§£é¤ï¼" + (isDo?"æ¯":"å¦") ; |
| | | return "æ
éè§£é¤ï¼" + (isDo==1?"æ¯":"å¦") ; |
| | | } |
| | | } |
| | |
| | | @AllArgsConstructor |
| | | public class InjectStartVo implements Vo4Down { |
| | | @JSONField(name = "注è¥å¯å") |
| | | public boolean isDo ;//true为å¯ï¼false为å |
| | | public Byte isDo ;//1æ¯ï¼0å¦ |
| | | |
| | | @Override |
| | | public String toString(){ |
| | | return "注è¥å¯åï¼" + (isDo?"å¯":"å") ; |
| | | return "注è¥å¯åï¼" + (isDo==1?"å¯":"å") ; |
| | | } |
| | | } |
| | |
| | | @AllArgsConstructor |
| | | public class IrrStartVo implements Vo4Down { |
| | | @JSONField(name = "çæºå¯å") |
| | | public boolean isDo ;//true为å¯ï¼false为å |
| | | public Byte isDo ;//1æ¯ï¼0å¦ |
| | | |
| | | @Override |
| | | public String toString(){ |
| | | return "çæºå¯åï¼" + (isDo?"å¯":"å") ; |
| | | return "çæºå¯åï¼" + (isDo==1?"æ¯":"å¦") ; |
| | | } |
| | | } |
New file |
| | |
| | | package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos; |
| | | |
| | | import com.alibaba.fastjson2.annotation.JSONField; |
| | | import com.dy.common.mw.protocol4Mqtt.Vo4Down; |
| | | import lombok.AllArgsConstructor; |
| | | import lombok.Data; |
| | | import lombok.NoArgsConstructor; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | | * @Date: 2025/6/11 16:55 |
| | | * @Description |
| | | */ |
| | | @Data |
| | | @NoArgsConstructor |
| | | @AllArgsConstructor |
| | | public class ParamSetVo implements Vo4Down { |
| | | @JSONField(name = "æ
æè®¾å®æ¶é´") |
| | | public Integer stirDuration ; |
| | | |
| | | @JSONField(name = "注è¥è®¾å®æ¶é´") |
| | | public Integer injectDuration ; |
| | | |
| | | @Override |
| | | public String toString(){ |
| | | StringBuilder sb = new StringBuilder(); |
| | | sb.append("æ
æè®¾å®æ¶é´ï¼" + stirDuration + "\n" ); |
| | | sb.append("注è¥è®¾å®æ¶é´ï¼" + injectDuration + "\n" ); |
| | | return sb.toString(); |
| | | } |
| | | } |
| | |
| | | @AllArgsConstructor |
| | | public class StirStartVo implements Vo4Down { |
| | | @JSONField(name = "æ
æå¯å") |
| | | public boolean isDo ;//true为å¯ï¼false为å |
| | | public Byte isDo ;//1æ¯ï¼0å¦ |
| | | |
| | | @Override |
| | | public String toString(){ |
| | | return "æ
æå¯åï¼" + (isDo?"å¯":"å") ; |
| | | return "æ
æå¯åï¼" + (isDo==1?"æ¯":"å¦") ; |
| | | } |
| | | } |
| | |
| | | */ |
| | | @Data |
| | | public class ManureVo implements Vo4Up { |
| | | @JSONField(name = "flexem_message_id") |
| | | public Integer messageId ;//æ¶æ¯ID |
| | | |
| | | @JSONField(name = "è¥ææµé") |
| | | public Float manureFlow ;//è¥ææµé |
| | | |
| | | @JSONField(name = "æ³¨è¥æ¶é¿") |
| | | public Integer manureTime ;//æ³¨è¥æ¶é¿ |
| | | |
| | | @JSONField(name = "æ
ææ¶é¿") |
| | | public Integer stirTime ;//æ
ææ¶é¿ |
| | | |
| | | @JSONField(name = "flexem_timestamp") |
| | | public Long devDt ;//è®¾å¤æ¶é´ |
| | | |
| | |
| | | @Override |
| | | public String toString(){ |
| | | StringBuilder sb = new StringBuilder(); |
| | | sb.append("æ°´è¥æ°æ®ï¼") ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼"+devDt) ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼"+ this.getDevDtStr()) ; |
| | | sb.append("æ°´è¥æ°æ®=>") ; |
| | | sb.append(" æ¶æ¯IDï¼" + messageId + ", ") ; |
| | | sb.append(" è¥ææµéï¼" + manureFlow + ", ") ; |
| | | sb.append(" æ³¨è¥æ¶é¿ï¼" + manureTime + ", ") ; |
| | | sb.append(" æ
ææ¶é¿ï¼" + stirTime + ", ") ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼" + devDt + ", ") ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼" + this.getDevDtStr() + ", ") ; |
| | | sb.append("\n") ; |
| | | return sb.toString() ; |
| | | } |
| | |
| | | */ |
| | | @Data |
| | | public class SoilVo implements Vo4Up { |
| | | @JSONField(name = "flexem_message_id") |
| | | public Integer messageId ;//æ¶æ¯ID |
| | | |
| | | @JSONField(name = "å壤湿度1") |
| | | public Float soilHumidity1 ;//å壤湿度1 |
| | | |
| | | @JSONField(name = "å壤湿度2") |
| | | public Float soilHumidity2 ;//å壤湿度2 |
| | | |
| | | @JSONField(name = "å壤湿度3") |
| | | public Float soilHumidity3 ;//å壤湿度3 |
| | | |
| | | @JSONField(name = "å壤湿度4") |
| | | public Float soilHumidity4 ;//å壤湿度4 |
| | | |
| | | @JSONField(name = "å壤湿度1") |
| | | public Float soilTemperature1 ;//å壤温度1 |
| | | |
| | | @JSONField(name = "å壤温度2") |
| | | public Float soilTemperature2 ;//å壤温度2 |
| | | |
| | | @JSONField(name = "å壤温度3") |
| | | public Float soilTemperature3 ;//å壤温度3 |
| | | |
| | | @JSONField(name = "å壤温度4") |
| | | public Float soilTemperature4 ;//å壤温度4 |
| | | |
| | | @JSONField(name = "flexem_timestamp") |
| | | public Long devDt ;//è®¾å¤æ¶é´ |
| | | |
| | |
| | | @Override |
| | | public String toString(){ |
| | | StringBuilder sb = new StringBuilder(); |
| | | sb.append("墿
æ°æ®ï¼") ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼"+devDt) ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼"+ this.getDevDtStr()) ; |
| | | sb.append("墿
æ°æ®=>") ; |
| | | sb.append(" æ¶æ¯IDï¼" + messageId + ", ") ; |
| | | sb.append(" å壤湿度1ï¼" + soilHumidity1 + ", ") ; |
| | | sb.append(" å壤湿度2ï¼" + soilHumidity2 + ", ") ; |
| | | sb.append(" å壤湿度3ï¼" + soilHumidity3 + ", ") ; |
| | | sb.append(" å壤湿度4ï¼" + soilHumidity4 + ", ") ; |
| | | sb.append(" å壤温度1ï¼" + soilTemperature1 + ", ") ; |
| | | sb.append(" å壤温度2ï¼" + soilTemperature2 + ", ") ; |
| | | sb.append(" å壤温度3ï¼" + soilTemperature3 + ", ") ; |
| | | sb.append(" å壤温度4ï¼" + soilTemperature4 + ", ") ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼" + devDt + ", ") ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼" + this.getDevDtStr() + ", ") ; |
| | | sb.append("\n") ; |
| | | return sb.toString() ; |
| | | |
| | | } |
| | | } |
| | |
| | | */ |
| | | @Data |
| | | public class StateVo implements Vo4Up { |
| | | @JSONField(name = "flexem_message_id") |
| | | public Integer messageId ;//æ¶æ¯ID |
| | | |
| | | @JSONField(name = "æ
æè¿è¡") |
| | | public Byte stirRunning ;//æ
æè¿è¡ |
| | | |
| | | @JSONField(name = "注è¥è¿è¡") |
| | | public Byte injectRunning ;//注è¥è¿è¡ |
| | | |
| | | @JSONField(name = "çæºè¿è¡") |
| | | public Byte irrRunning ;//çæºè¿è¡ |
| | | |
| | | @JSONField(name = "æ¥è¦") |
| | | public Byte alarm ;//æ¥è¦ |
| | | |
| | | @JSONField(name = "flexem_timestamp") |
| | | public Long devDt ;//è®¾å¤æ¶é´ |
| | | |
| | |
| | | @Override |
| | | public String toString(){ |
| | | StringBuilder sb = new StringBuilder(); |
| | | sb.append("ç¶ææ°æ®ï¼") ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼"+devDt) ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼"+ this.getDevDtStr()) ; |
| | | sb.append("ç¶ææ°æ®=>") ; |
| | | sb.append(" æ¶æ¯IDï¼" + messageId + ", ") ; |
| | | sb.append(" æ
æè¿è¡ï¼" + stirRunning + ", ") ; |
| | | sb.append(" 注è¥è¿è¡ï¼" + injectRunning + ", ") ; |
| | | sb.append(" çæºè¿è¡ï¼" + irrRunning + ", ") ; |
| | | sb.append(" æ¥è¦ï¼" + alarm + ", ") ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼" + devDt + ", ") ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼" + this.getDevDtStr() + ", ") ; |
| | | sb.append("\n") ; |
| | | return sb.toString() ; |
| | | } |
| | |
| | | |
| | | public String devDtStr ;//è®¾å¤æ¶é´ |
| | | public String getDevDtStr() { |
| | | if(devDt == null){ |
| | | if(devDt != null){ |
| | | return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ; |
| | | }else{ |
| | | return "" ; |
| | |
| | | @Override |
| | | public String toString(){ |
| | | StringBuilder sb = new StringBuilder(); |
| | | sb.append("æ°è±¡æ°æ®ï¼") ; |
| | | sb.append(" æ¶æ¯IDï¼"+messageId) ; |
| | | sb.append(" äºæ°§å碳ï¼"+carbonDioxide) ; |
| | | sb.append(" å
ç
§å¼ºåº¦ï¼"+lightIntensity) ; |
| | | sb.append(" 大æ°ååï¼"+atmosphericPressure) ; |
| | | sb.append(" ç©ºæ°æ¸©åº¦ï¼"+airTemperature) ; |
| | | sb.append(" ç©ºæ°æ¹¿åº¦ï¼"+airHumidity) ; |
| | | sb.append(" PM2.5ï¼"+pm25) ; |
| | | sb.append(" PM10ï¼"+pm10) ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼"+devDt) ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼"+ this.getDevDtStr()) ; |
| | | sb.append("æ°è±¡æ°æ®=>") ; |
| | | sb.append(" æ¶æ¯IDï¼" + messageId + ", ") ; |
| | | sb.append(" äºæ°§å碳ï¼" + carbonDioxide + ", ") ; |
| | | sb.append(" å
ç
§å¼ºåº¦ï¼" + lightIntensity + ", ") ; |
| | | sb.append(" 大æ°ååï¼" + atmosphericPressure + ", ") ; |
| | | sb.append(" ç©ºæ°æ¸©åº¦ï¼" + airTemperature + ", ") ; |
| | | sb.append(" ç©ºæ°æ¹¿åº¦ï¼" + airHumidity + ", ") ; |
| | | sb.append(" PM2.5ï¼" + pm25 + ", ") ; |
| | | sb.append(" PM10ï¼" + pm10 + ", ") ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼" + devDt + ", ") ; |
| | | sb.append(" è®¾å¤æ¶é´ï¼" + this.getDevDtStr() + ", ") ; |
| | | sb.append("\n") ; |
| | | return sb.toString() ; |
| | | } |
File was renamed from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java |
| | |
| | | * @Description |
| | | */ |
| | | @Data |
| | | public class DevOnLineSt implements MqttNotifyInfo { |
| | | public class DevOnLineInfo implements MqttNotifyInfo { |
| | | public String id ; |
| | | public String protocol ; |
| | | public Boolean onLine ; |
File was renamed from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java |
| | |
| | | * @Description |
| | | */ |
| | | @Data |
| | | public class DevRunSt implements MqttNotifyInfo { |
| | | public String id ; |
| | | public class DevRunInfo implements MqttNotifyInfo { |
| | | public String devId ;//MQTT设置ID |
| | | public Boolean stirRunning ;//æ
æè¿è¡ trueæ¯ falseå¦ |
| | | public Boolean injectRunning ;//注è¥è¿è¡ trueæ¯ falseå¦ |
| | | public Boolean irrRunning ;//çæºè¿è¡ trueæ¯ falseå¦ |
| | |
| | | if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){ |
| | | throw new Exception("config.mqtt.poolMaxSizeé
ç½®çè¿æ¥æ± è¿æ¥æå¤§æ°éä¸åæ³") ; |
| | | } |
| | | mqVo.useMemoryPersistence = conf.getSetAttrBoolean(doc, "config.mqtt", "useMemoryPersistence", null, null) ; |
| | | String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, false, null) ; |
| | | if(proAndDevIds == null || proAndDevIds.trim().equals("")){ |
| | | throw new Exception("config.mqtt.protocolAndDeviceIdsé
ç½®ä¸åæ³") ; |
| | |
| | | com2.code = CodeLocal.stopMqttSv ; |
| | | com2.type = CommandType.innerCommand ; |
| | | new CommandInnerDeaLer().deal(com2) ; |
| | | |
| | | log.info("å
³éç¨åºåï¼å
³éäºMQTTæå¡"); |
| | | }catch (Exception e){ |
| | | log.error("ç¨åºï¼æ§å¶å°ï¼å
³éé©ååçå¼å¸¸", e); |
| | | } |
| | |
| | | rCom = this.stopMqttSv(com); |
| | | break; |
| | | } |
| | | case CodeLocal.recoverMqttSv -> { |
| | | rCom = this.recoverMqttSv(com); |
| | | break; |
| | | } |
| | | default -> { |
| | | rCom = ReturnCommand.errored("åºéï¼æ¶å°å
é¨å½ä»¤çåè½ç ä¸è½è¯å«ï¼", com.getId(), com.getCode()); |
| | | break; |
| | |
| | | * @param message |
| | | */ |
| | | public static Command successed(String message, String commandId, String code, Object attachment) { |
| | | log.info(message); |
| | | Command command = new Command().createReturnSuccessCommand(message, commandId, code); |
| | | command.setAttachment(attachment); |
| | | return command; |
| | |
| | | * @param message |
| | | */ |
| | | public static Command successed(String message, String commandId, String code) { |
| | | log.info(message); |
| | | return new Command().createReturnSuccessCommand(message, commandId, code); |
| | | } |
| | | /** |
| | |
| | | * @param message |
| | | */ |
| | | public static Command errored(String message, String commandId, String code) { |
| | | log.error(message); |
| | | return new Command().createReturnErrorCommand(message, commandId, code); |
| | | } |
| | | } |
| | |
| | | |
| | | public static final String onPartLineMqtt = "LMCD0002" ;//æ¥è¯¢é¨åMQTT设å¤å¨çº¿æ
åµ |
| | | |
| | | public static final String onLineStatisticsMqtt = "LMCD0003" ;//æ¥è¯¢ææMQTT设å¤ç¶æç»è®¡æ
åµ |
| | | public static final String onLineStatisticsMqtt = "LMCD0003" ;//æ¥è¯¢ææMQTT设å¤å¨çº¿ç¶æç»è®¡æ
åµ |
| | | |
| | | public static final String allRtuStatesMqtt = "LMCD0010" ;//æ¥è¯¢ææMQTT设å¤ç¶æ |
| | | |
| | |
| | | |
| | | public static final String stopMqttSv = "LMCD0110" ;//忢Mqttæå¡ |
| | | |
| | | public static final String recoverMqttSv = "LMCD0112" ;//éå¯MQTTæå¡ï¼æ¥å
¥æ°çMQTTè¿æ¥ |
| | | |
| | | |
| | | } |
| | |
| | | package com.dy.rtuMw.server.mqtt; |
| | | |
| | | import com.dy.common.mw.protocol4Mqtt.status.DevRunSt; |
| | | import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; |
| | | import com.dy.rtuMw.server.forTcp.RtuLogDealer; |
| | | import com.dy.rtuMw.server.local.localProtocol.RtuOnLineStateStatisticsVo; |
| | | |
| | |
| | | Map.Entry<String, DevStatus> entry = null ; |
| | | while(it.hasNext()){ |
| | | entry = it.next() ; |
| | | if(((DevStatus)entry).onLine != null && ((DevStatus)entry).onLine.booleanValue()){ |
| | | if((entry.getValue()).onLine != null && (entry.getValue()).onLine.booleanValue()){ |
| | | vo.onLineNum++ ; |
| | | }else{ |
| | | vo.offLineNum++ ; |
| | |
| | | } |
| | | } |
| | | |
| | | public static void setStatus(String devId, DevRunSt st){ |
| | | public static void setStatus(String devId, DevRunInfo st){ |
| | | DevStatus vo = map.get(devId) ; |
| | | if(vo != null) { |
| | | if(st.stirRunning != null){ |
| | |
| | | import com.dy.common.mw.channel.mqtt.MqttClientPool; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttNotify; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo; |
| | | import com.dy.common.mw.protocol4Mqtt.status.DevOnLineSt; |
| | | import com.dy.common.mw.protocol4Mqtt.status.DevRunSt; |
| | | import com.dy.common.mw.protocol4Mqtt.status.DevOnLineInfo; |
| | | import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; |
| | | import com.dy.rtuMw.server.ServerProperties; |
| | | import org.apache.logging.log4j.LogManager; |
| | | import org.apache.logging.log4j.Logger; |
| | | import org.eclipse.paho.client.mqttv3.MqttClient; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | |
| | | private MqttUnitConfigVo configVo ; |
| | | |
| | | private MqttClientPool pool; |
| | | |
| | | private List<MqttClient> subClients ; |
| | | |
| | | private MqttManager(){ |
| | | } |
| | |
| | | * @throws Exception |
| | | */ |
| | | public void start()throws Exception{ |
| | | subClients = new ArrayList<>(); |
| | | String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort; |
| | | this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize); |
| | | this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize, this.configVo.useMemoryPersistence); |
| | | if(this.pool.isClose()){ |
| | | throw new Exception("Mqttè¿æ¥æ± åå§å失败"); |
| | | } |
| | |
| | | if(clientSub == null || !clientSub.isConnected()){ |
| | | throw new Exception("Mqttè¿æ¥æ± è·å¾è®¢é
è¿æ¥ä¸å¯ç¨"); |
| | | } |
| | | subClients.add(clientSub) ; |
| | | // 订é
ä¸»é¢ |
| | | for(int i = 0; i < this.configVo.subTopics.length; i++){ |
| | | for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){ |
| | |
| | | public void notify(String devId, MqttNotifyInfo... infos) { |
| | | if(devId != null && infos != null && infos.length > 0){ |
| | | for(MqttNotifyInfo info : infos){ |
| | | if(info instanceof DevOnLineSt){ |
| | | DevOnLineSt onLineSt = (DevOnLineSt)info; |
| | | if(info instanceof DevOnLineInfo){ |
| | | DevOnLineInfo onLineSt = (DevOnLineInfo)info; |
| | | if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){ |
| | | DevStatusDealer.onLine(devId, ((DevOnLineSt)info).protocol); |
| | | DevStatusDealer.onLine(devId, ((DevOnLineInfo)info).protocol); |
| | | }else{ |
| | | DevStatusDealer.offLine(devId); |
| | | } |
| | | } else if(info instanceof DevRunSt){ |
| | | DevStatusDealer.setStatus(devId, (DevRunSt)info); |
| | | } else if(info instanceof DevRunInfo){ |
| | | DevStatusDealer.setStatus(devId, (DevRunInfo)info); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | public void stop()throws Exception{ |
| | | if(subClients != null && subClients.size() > 0){ |
| | | for (MqttClient client : subClients) { |
| | | if(client != null && client.isConnected()){ |
| | | try{ |
| | | client.disconnect(); |
| | | client.close(); |
| | | }catch (Exception e){ |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | if(this.pool != null){ |
| | | // å
³éè¿æ¥æ± |
| | | this.pool.close(); |
| | |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage msg) throws Exception { |
| | | MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ; |
| | | MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){ |
| | | @Override |
| | | public void callback(MqttSubMsg subMsg) { |
| | | DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); |
| | | DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); |
| | | RtuLogDealer.log4Mqtt(subMsg.deviceId, "订é
æ¶æ¯ 主é¢ï¼" + subMsg.topic + " æ¶æ¯ï¼" + subMsg.msg); |
| | | } |
| | | @Override |
| | | public void notify(String devId, MqttNotifyInfo... infos) { |
| | | if(notify != null){ |
| | | notify.notify(devId, infos) ; |
| | | try { |
| | | MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic); |
| | | MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() { |
| | | @Override |
| | | public void callback(MqttSubMsg subMsg) { |
| | | DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); |
| | | DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); |
| | | RtuLogDealer.log4Mqtt(subMsg.deviceId, "订é
æ¶æ¯ 主é¢ï¼" + subMsg.topic.longName() + " å
æ°æ®ï¼" + subMsg.metaData); |
| | | } |
| | | } |
| | | }) ; |
| | | this.nextDeal(subMsg); |
| | | } |
| | | private void nextDeal(MqttSubMsg subMsg)throws Exception { |
| | | subMsg.action(new Callback() { |
| | | @Override |
| | | public void call(Object obj) { |
| | | MqttSubMsg subMs = (MqttSubMsg) obj ; |
| | | MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; |
| | | if(pubMs != null){ |
| | | //å¹é
å°ä¸è¡æ¶æ¯ï¼å½ä»¤ï¼ |
| | | subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; |
| | | subMs.commandId = pubMs.commandId ; |
| | | try { |
| | | MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); |
| | | } catch (Exception e) { |
| | | log.error("ç¼åå叿¶æ¯ï¼å½ä»¤ï¼ç»æåçå¼å¸¸", e); |
| | | |
| | | @Override |
| | | public void notify(String devId, MqttNotifyInfo... infos) { |
| | | if (notify != null) { |
| | | notify.notify(devId, infos); |
| | | } |
| | | } |
| | | try{ |
| | | MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg)); |
| | | }catch (Exception e){ |
| | | log.error("ç¼å订é
æ¶æ¯æ°æ®åçå¼å¸¸", e); |
| | | } |
| | | } |
| | | @Override |
| | | public void call(Object... objs) { |
| | | } |
| | | @Override |
| | | public void exception(Exception e) { |
| | | } |
| | | }); |
| | | }); |
| | | this.nextDeal(subMsg); |
| | | }catch(Exception e){ |
| | | log.error("å¤çMQTT订é
æ¶æ¯åçå¼å¸¸", e); |
| | | } |
| | | } |
| | | private void nextDeal(MqttSubMsg subMsg)throws Exception { |
| | | subMsg.action(new MqttSubMsgDealer()); |
| | | } |
| | | } |
| | |
| | | while(node != null && node.obj != null){ |
| | | obj = (MqttPubMsgNode)node.obj; |
| | | pubMsg = obj.result ; |
| | | if(pubMsg != null |
| | | && subMsg.subMsgMatchPubMsg(pubMsg)){ |
| | | if(pubMsg != null && subMsg.subMsgMatchPubMsg(pubMsg)){ |
| | | obj.onceReceivedResult = true ;//æ è¯å·²ç»æ¶å°å½ä»¤ç»æ |
| | | return pubMsg; |
| | | }else{ |
| | |
| | | }else{ |
| | | if(mqttClient != null && mqttClient.isConnected()){ |
| | | try { |
| | | mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg); |
| | | mqttManager.publishMsg(mqttClient, this.result.topic.longName(), this.result.msg); |
| | | DevStatusDealer.afterSendPubMessage(this.result.deviceId); |
| | | RtuLogDealer.log4Mqtt(this.result.deviceId, "å叿¶æ¯ 主é¢ï¼" + this.result.topic + " æ¶æ¯ï¼" + this.result.msg); |
| | | log.info("åå¸MQTTæ¶æ¯ï¼ä¸»é¢=" + this.result.topic + "ï¼" + this.result.msg); |
| | |
| | | }finally { |
| | | mqttManager.pushMqttClient(mqttClient); |
| | | } |
| | | return false ; |
| | | if(this.result.hasResponse){ |
| | | return false ; |
| | | }else{ |
| | | return true ; |
| | | } |
| | | }else{ |
| | | //æªæ¾è¿æ¥MQTTæå¡å¨ |
| | | return this.decideRemoveNodeFromCach(now) ; |
New file |
| | |
| | | package com.dy.rtuMw.server.mqtt; |
| | | |
| | | import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.common.util.Callback; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | | * @Date: 2025/6/11 17:33 |
| | | * @Description |
| | | */ |
| | | @Slf4j |
| | | public class MqttSubMsgDealer implements Callback { |
| | | @Override |
| | | public void call(Object obj) { |
| | | MqttSubMsg subMs = (MqttSubMsg) obj ; |
| | | MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; |
| | | if(pubMs != null){ |
| | | //å¹é
å°ä¸è¡æ¶æ¯ï¼å½ä»¤ï¼ |
| | | subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; |
| | | subMs.commandId = pubMs.commandId ; |
| | | try { |
| | | MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); |
| | | } catch (Exception e) { |
| | | log.error("ç¼åå叿¶æ¯ï¼å½ä»¤ï¼ç»æåçå¼å¸¸", e); |
| | | } |
| | | } |
| | | try{ |
| | | MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMs)); |
| | | }catch (Exception e){ |
| | | log.error("ç¼å订é
æ¶æ¯æ°æ®åçå¼å¸¸", e); |
| | | } |
| | | } |
| | | @Override |
| | | public void call(Object... objs) { |
| | | } |
| | | @Override |
| | | public void exception(Exception e) { |
| | | } |
| | | } |
| | |
| | | public String svUserName ;// |
| | | public String svUserPassword ;// |
| | | public Integer poolMaxSize ;// |
| | | public Boolean useMemoryPersistence ; |
| | | public String[] protocolAndDeviceIds ;//设å¤åè®®ä¸IDï¼FBoxï¼id |
| | | public String[] deviceIds ;//设å¤ï¼FBoxï¼id |
| | | public String[] subTopics ;//订é
çä¸»é¢ |
| | |
| | | this.svUserName = "dyyjy" ; |
| | | this.svUserPassword = "Dyyjy2025,;.abc!@#" ; |
| | | this.poolMaxSize = 10 ; |
| | | useMemoryPersistence = true ; |
| | | this.pubTopicQos = 1 ; |
| | | this.noSubThenOff = 10 * 60 * 10000L ; |
| | | } |
| | |
| | | toNext = true ; |
| | | } |
| | | } |
| | | if(this.needSave2Db( d.getRtuAddr())){ |
| | | //2025-06-11 å 为81å½ä»¤æ¯RTUç¶ææ¹åæ¶å³ä¸æ¥ï¼æä»¥å¯ä»¥åæ¶ä½ç°ç¶æååï¼è¿èä¸åå卿§å¶ |
| | | //if(this.needSave2Db( d.getRtuAddr())){ |
| | | //å¨ä¸ä¸ªå°æ¶å
没æå卿¥è¦æ°æ® |
| | | this.toMsCenter(controller, d.getRtuAddr(), dV1, dataCd81Vo.alarmVo, dataCd81Vo.stateVo); |
| | | this.saveOrUpdateLast(sv, controller, d.getRtuAddr(), dataCd81Vo.rtuDt, dV1, dataCd81Vo.alarmVo, dataCd81Vo.stateVo); |
| | | this.saveHistory(sv, controller, d.getRtuAddr(), dataCd81Vo.rtuDt, dV1, dataCd81Vo.alarmVo, dataCd81Vo.stateVo); |
| | | this.cacheSaveTime(d.getRtuAddr()); |
| | | } |
| | | //this.cacheSaveTime(d.getRtuAddr()); |
| | | //} |
| | | } |
| | | } else if (cdObj instanceof DataCd84Vo) { |
| | | //éå¼å·¥ä½æ¥ |
| | |
| | | Object[] objs = this.getTaskResults(TkPreGenObjs.taskId); |
| | | DbSv sv = (DbSv) objs[0]; |
| | | PrController controller = (PrController) objs[1]; |
| | | /** éå¼å·¥ä½æ¥ï¼éé¨ç¶æä¸å®æ¯æå¼ç¶æï¼æä»¥ä¸è®¾ç½®ä¸é¢æ°æ®ä»¥å¤åç»ä»»å¡è®¾ç½®èæå¡ç¶æ |
| | | /* éå¼å·¥ä½æ¥ï¼éé¨ç¶æä¸å®æ¯æå¼ç¶æï¼æä»¥ä¸è®¾ç½®ä¸é¢æ°æ®ä»¥å¤åç»ä»»å¡è®¾ç½®èæå¡ç¶æ |
| | | if(dataCd84Vo.stateVo != null && dataCd84Vo.stateVo.valveState != null && dataCd84Vo.stateVo.valveState.byteValue() == DataStateVo.ValveCloseState){ |
| | | if(controller != null && controller.getIntakeId() != null){ |
| | | this.taskResult = new Object[]{controller.getIntakeId(), DataStateVo.ValveCloseState} ; |
| | |
| | | //éæºèªæ¥æ¥ |
| | | DataCd81Vo dataCd81Vo = (DataCd81Vo)cdObj ; |
| | | if (dataCd81Vo.alarmVo != null || dataCd81Vo.stateVo != null) { |
| | | if(this.needSave2Db( d.getRtuAddr())) { |
| | | //2025-06-11 å 为81å½ä»¤æ¯RTUç¶ææ¹åæ¶å³ä¸æ¥ï¼æä»¥å¯ä»¥åæ¶ä½ç°ç¶æååï¼è¿èä¸åå卿§å¶ |
| | | //if(this.needSave2Db( d.getRtuAddr())) { |
| | | //å¨ä¸ä¸ªå°æ¶å
没æå卿¥è¦æ°æ® |
| | | Object[] objs = this.getTaskResults(TkPreGenObjsV2.taskId); |
| | | DbSv sv = (DbSv) objs[0]; |
| | |
| | | toNext = true ; |
| | | } |
| | | } |
| | | } |
| | | //} |
| | | } |
| | | } else if (cdObj instanceof DataCd84Vo) { |
| | | //éå¼å·¥ä½æ¥ |
| | |
| | | PrController controller = (PrController)objs[1] ; |
| | | if(cdObj instanceof DataCd81Vo){ |
| | | //设å¤ç»ç«¯éæºèªæ¥ |
| | | if(this.needSave2Db( d.getRtuAddr())) { |
| | | //å¨ä¸ä¸ªå°æ¶å
没æå卿¥è¦æ°æ® |
| | | DataCd81Vo cdData = (DataCd81Vo)(cdObj) ; |
| | | this.toMsCenter(controller, d.getRtuAddr(), dV202404, cdData, dV202404.dt); |
| | | this.saveOrUpdateLast(sv, controller, d.getRtuAddr(), cdData.rtuDt, cdData, dV202404.dt) ; |
| | | this.saveHistory(sv, controller, d.getRtuAddr(), cdData.rtuDt, cdData, dV202404.dt) ; |
| | | this.cacheSaveTime(d.getRtuAddr()); |
| | | if(cdData.statePump != null && cdData.statePump.byteValue() == AlarmVo.DevCloseState){ |
| | | if(controller != null && controller.getIntakeId() != null){ |
| | | this.taskResult = new Object[]{controller.getIntakeId(), AlarmVo.DevCloseState} ; |
| | | toNext = true ; |
| | | } |
| | | //81åè½ç æ°æ®ï¼ä¸è¿è¡âå¨ä¸ä¸ªå°æ¶å
没æå卿¥è¦æ°æ®âçæ§å¶ |
| | | DataCd81Vo cdData = (DataCd81Vo)(cdObj) ; |
| | | this.toMsCenter(controller, d.getRtuAddr(), dV202404, cdData, dV202404.dt); |
| | | this.saveOrUpdateLast(sv, controller, d.getRtuAddr(), cdData.rtuDt, cdData, dV202404.dt) ; |
| | | this.saveHistory(sv, controller, d.getRtuAddr(), cdData.rtuDt, cdData, dV202404.dt) ; |
| | | this.cacheSaveTime(d.getRtuAddr()); |
| | | if(cdData.statePump != null && cdData.statePump.byteValue() == AlarmVo.DevCloseState){ |
| | | if(controller != null && controller.getIntakeId() != null){ |
| | | this.taskResult = new Object[]{controller.getIntakeId(), AlarmVo.DevCloseState} ; |
| | | toNext = true ; |
| | | } |
| | | } |
| | | }else if(cdObj instanceof DataCd80_5BVo){ |
| | |
| | | } |
| | | } |
| | | } |
| | | }else if(cdObj instanceof DataCd93_A3Vo){ |
| | | } |
| | | /* 2025-06-11 çæ±æµ·è¯´ï¼ä»¥ä¸æ°æ®ä¸çç¶æä¸åç¡®ï¼å°¤å
¶éé¨ç¶æï¼æä»¥ä¸åé纳 |
| | | else if(cdObj instanceof DataCd93_A3Vo){ |
| | | //å¹³å°/APPè¿ç¨å
³éæ°´æ³µ/éé¨ + å·å¡å
³æ³µ/é䏿¥ |
| | | if(this.needSave2Db( d.getRtuAddr())) { |
| | | //å¨ä¸ä¸ªå°æ¶å
没æå卿¥è¦æ°æ® |
| | |
| | | } |
| | | } |
| | | } |
| | | */ |
| | | }catch (Exception e){ |
| | | log.error("ä¿åæ§å¶å¨æ¥è¦åç¶ææ°æ®æ¶åçå¼å¸¸", e); |
| | | } |
| | |
| | | */ |
| | | @GetMapping("/rtuLogFile") |
| | | public void rtuLogFile(String rtuAddr, HttpServletRequest req, HttpServletResponse rep){ |
| | | File logFile = ResourceUnit.getInstance().getLogFile(rtuAddr + ".log") ; |
| | | logFile(rtuAddr, req, rep) ; |
| | | } |
| | | |
| | | /** |
| | | * ä¸è½½æ§å¶å¨ï¼RTUï¼ä¸ä¸è¡æ°æ®çlogæ¥å¿æä»¶ |
| | | * @param devId |
| | | * @param req |
| | | * @param rep |
| | | */ |
| | | @GetMapping("/mqttDevLogFile") |
| | | public void mqttDevLogFile(String devId, HttpServletRequest req, HttpServletResponse rep){ |
| | | logFile(devId, req, rep) ; |
| | | } |
| | | private void logFile(String fileName, HttpServletRequest req, HttpServletResponse rep){ |
| | | File logFile = ResourceUnit.getInstance().getLogFile(fileName + ".log") ; |
| | | if(logFile != null && logFile.exists()){ |
| | | //å¨Spring Bootä¸ï¼application/octet-stream;charset=UTF-8é常表示ååºçå
容æ¯åèæµï¼ |
| | | //å¹¶ä¸åç¬¦éæ¯UTF-8ã对äºè¿ç§ç±»åçååºï¼Spring Booté»è®¤ä½¿ç¨ByteArrayHttpMessageConverteræ¥å¤çï¼ |
| | |
| | | //å符éé常ç¨äºææ¬å
容ï¼èapplication/octet-streamé常ç¨äºäºè¿å¶å
容ï¼å æ¤å¨è¿ç§æ
åµä¸æå®å符éå¯è½æ¯ä¸åéçã |
| | | //ä¸è¿ï¼å¦æä½ ç¡®å®éè¦å¤ç带æç¹å®å符éçapplication/octet-streamååºï¼ä½ å¯è½éè¦èªå®ä¹HttpMessageConverterã |
| | | rep.addHeader("content-type", "application/octet-stream;charset=UTF-8"); |
| | | rep.addHeader("Content-Disposition", "attachment;fileName=" + (rtuAddr + ".log")) ; |
| | | rep.addHeader("Content-Disposition", "attachment;fileName=" + (fileName + ".log")) ; |
| | | ServletOutputStream out = null; |
| | | FileInputStream in = null ; |
| | | try { |
| | |
| | | */ |
| | | @GetMapping("/rtuLogText") |
| | | public BaseResponse<List<String>> rtuLogText(String rtuAddr){ |
| | | return logText(rtuAddr, true) ; |
| | | } |
| | | /** |
| | | * ä¸è½½æ§å¶å¨ï¼RTUï¼ä¸ä¸è¡æ°æ®çlogæ¥å¿æä»¶ |
| | | * @param devId |
| | | */ |
| | | @GetMapping("/mqttDevLogText") |
| | | public BaseResponse<List<String>> mqttDevLogText(String devId){ |
| | | return logText(devId, false) ; |
| | | } |
| | | /** |
| | | * ä¸è½½æ§å¶å¨ï¼RTUï¼ä¸ä¸è¡æ°æ®çlogæ¥å¿æä»¶ |
| | | * @param fileName |
| | | * @param reverseOrder |
| | | */ |
| | | private BaseResponse<List<String>> logText(String fileName, boolean reverseOrder){ |
| | | List<String> list ; |
| | | File logFile = ResourceUnit.getInstance().getLogFile(rtuAddr + ".log") ; |
| | | File logFile = ResourceUnit.getInstance().getLogFile(fileName + ".log") ; |
| | | if(logFile != null && logFile.exists()){ |
| | | BufferedReader reader = null ; |
| | | try { |
| | | reader = new BufferedReader(new FileReader(logFile)) ; |
| | | //æ°çå®ç°æ¹æ³ |
| | | Stream<String> linesStream = reader.lines() ; |
| | | //list = linesStream.toList() ; //æåæ¥é¡ºåº |
| | | list = linesStream.sorted(Comparator.reverseOrder()).collect(Collectors.toList()) ;//ååº |
| | | /* 忥çå®ç°æ¹æ³ |
| | | list = new ArrayList() ; |
| | | String line ; |
| | | while((line = reader.readLine()) != null){ |
| | | list.add(line) ; |
| | | if(reverseOrder){ |
| | | list = linesStream.sorted(Comparator.reverseOrder()).collect(Collectors.toList()) ;//ååº |
| | | }else{ |
| | | list = linesStream.toList() ; //æåæ¥é¡ºåº |
| | | } |
| | | */ |
| | | return BaseResponseUtils.buildSuccess(list); |
| | | } catch (Exception e) { |
| | | list = new ArrayList() ; |
| | | list.add("è¯»åæ§å¶å¨ï¼" + rtuAddr + "ï¼çæ¥å¿æä»¶å¼å¸¸ï¼" + (e.getMessage() == null?"":("ï¼" + e.getMessage()))) ; |
| | | list.add("è¯»åæ§å¶å¨ï¼" + fileName + "ï¼çæ¥å¿æä»¶å¼å¸¸ï¼" + (e.getMessage() == null?"":("ï¼" + e.getMessage()))) ; |
| | | return BaseResponseUtils.buildSuccess(list); |
| | | }finally{ |
| | | if(reader != null){ |
| | |
| | | } |
| | | }else{ |
| | | list = new ArrayList() ; |
| | | list.add("æªå¾å°æ§å¶å¨ï¼" + rtuAddr + "ï¼çæ¥å¿æä»¶") ; |
| | | list.add("æªå¾å°æ§å¶å¨ï¼" + fileName + "ï¼çæ¥å¿æä»¶") ; |
| | | return BaseResponseUtils.buildSuccess(list); |
| | | } |
| | | } |
| | |
| | | # çå·ï¼ gz |
| | | # åå·ï¼ lz |
| | | # éå·ï¼ jc |
| | | base.orgTag=mq |
| | | base.orgTag=ym |
| | | |
| | | # 233æå¡å¨ï¼ |
| | | # å
è°ï¼ 60000 |
| | |
| | | # çå·ï¼ mqtt.enable=false mqtt.protocolAndDeviceIds= mqtt.topicAndQos= |
| | | # åå·ï¼ mqtt.enable=false mqtt.protocolAndDeviceIds= mqtt.topicAndQos= |
| | | # éå·ï¼ mqtt.enable=true mqtt.protocolAndDeviceIds=? mqtt.topicAndQos=weather,1;soil,1;manure,1;state,1 |
| | | mqtt.enable=true |
| | | mqtt.enable=false |
| | | mqtt.protocolAndDeviceIds=sd1/338220031439,sd1/338220031440 |
| | | mqtt.subTopicAndQos=weather,1;soil,1;manure,1;state,1 |
| | | #MQtt设å¤å¨ä¸å®æ¶é´ï¼åéï¼åæªå叿¶æ¯ï¼è®¤ä¸ºè®¾å¤ç¦»çº¿ |
| | |
| | | svUserName MQTTæå¡å¨ç¨æ·å |
| | | svUserPassword MQTTæå¡å¨ç¨æ·å¯ç |
| | | poolMaxSize è¿æ¥æ± æå¤§è¿æ¥æ° |
| | | useMemoryPersistence 使ç¨å
åæä¹
åèéé»è®¤çæä»¶æä¹
å(trueæ¯ falseå¦) |
| | | protocolAndDeviceIds å¨åç³»ç»ï¼orgTagï¼ä¸æ¥å
¥ç设å¤(FBox)æç¨åè®®å设å¤idéå,å¤ä¸ªç¨éå·éå¼ï¼åè®®ä¸IDç¨æ£ææ éå¼ï¼ä¾å¦ï¼sd1/338220031439,sd1/338220031440 |
| | | subTopicAndQos: 订é
主é¢ä¸Qosï¼ä¸»é¢åä¸å
¶Qosç¨éå·éå¼ï¼å¤ä¸ªä¸»é¢åQosç¨åå·éå¼ï¼ä¾å¦ï¼ym/topic1,1;ym/topic2,1;ym/topic3,1ï¼å¦ææå¤ä¸ªOrgTagï¼ä¸»é¢åç¼ç¨å
¶OrgTag |
| | | pubTopicQos: åå¸ä¸»é¢çQosï¼åå¼èå´ï¼ |
| | |
| | | svUserName="dyyjy" |
| | | svUserPassword="Dyyjy2025,;.abc!@#" |
| | | poolMaxSize="10" |
| | | useMemoryPersistence="true" |
| | | protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}" |
| | | subTopicAndQos="${mqtt.subTopicAndQos}" |
| | | pubTopicQos="1" |
| | |
| | | public static final String TcpPort = "tcpPort" ; |
| | | public static final String UpDataMinInterval = "upDataMinInterval" ; |
| | | public static final String WebPort = "webPort" ; |
| | | public static final String MqttEnable = "mqttEnable" ; |
| | | public static final String MqttNoSubThenOff = "noSubThenOff" ; |
| | | public static final String ActutorPort = "actutorPort" ; |
| | | public static final String DbName = "dbName" ; |
| | | } |
| | |
| | | newLine = "tcp.port=" + paramMap.get(ParamKey.TcpPort); |
| | | }else if(newLine != null && newLine.trim().startsWith("base.upData.min.interval=")){ |
| | | newLine = "base.upData.min.interval=" + paramMap.get(ParamKey.UpDataMinInterval); |
| | | }else if(newLine != null && newLine.trim().startsWith("mqtt.enable=")){ |
| | | newLine = "mqtt.enable=" + paramMap.get(ParamKey.MqttEnable); |
| | | }else if(newLine != null && newLine.trim().startsWith("mqtt.noSubThenOff=")){ |
| | | newLine = "mqtt.noSubThenOff=" + paramMap.get(ParamKey.MqttNoSubThenOff); |
| | | } |
| | | newLines.add(newLine); |
| | | } |
| | |
| | | key = ParamKey.WebPort; |
| | | this.getConfig(env, map, paramNamePre + key, key); |
| | | |
| | | key = ParamKey.MqttEnable; |
| | | this.getConfig(env, map, paramNamePre + key, key); |
| | | |
| | | key = ParamKey.MqttNoSubThenOff; |
| | | this.getConfig(env, map, paramNamePre + key, key); |
| | | |
| | | key = ParamKey.ActutorPort; |
| | | this.getConfig(env, map, paramNamePre + key, key); |
| | | |
| | |
| | | orgTag: ym |
| | | tcpPort: 60000 |
| | | upDataMinInterval: 6 |
| | | mqttEnable: false |
| | | noSubThenOff: 10 |
| | | webPort: 8070 |
| | | actutorPort: 9070 |
| | | dbName: pipIrr_ym |
| | |
| | | orgTag: sp |
| | | tcpPort: 62000 |
| | | upDataMinInterval: 6 |
| | | mqttEnable: false |
| | | noSubThenOff: 10 |
| | | webPort: 8073 |
| | | actutorPort: 9073 |
| | | dbName: pipIrr_sp |
| | |
| | | orgTag: test |
| | | tcpPort: 65000 |
| | | upDataMinInterval: 6 |
| | | mqttEnable: false |
| | | noSubThenOff: 10 |
| | | webPort: 8072 |
| | | actutorPort: 9072 |
| | | dbName: pipIrr_test |
| | |
| | | orgTag: mj |
| | | tcpPort: 61000 |
| | | upDataMinInterval: 6 |
| | | mqttEnable: false |
| | | noSubThenOff: 10 |
| | | webPort: 8071 |
| | | actutorPort: 9071 |
| | | dbName: pipIrr_mj |
| | |
| | | orgTag: mq |
| | | tcpPort: 60100 |
| | | upDataMinInterval: 6 |
| | | mqttEnable: true |
| | | noSubThenOff: 10 |
| | | webPort: 8100 |
| | | actutorPort: 9100 |
| | | dbName: pipIrr_mq |
| | |
| | | orgTag: yq |
| | | tcpPort: 60101 |
| | | upDataMinInterval: 6 |
| | | mqttEnable: false |
| | | noSubThenOff: 10 |
| | | webPort: 8101 |
| | | actutorPort: 9101 |
| | | dbName: pipIrr_yq |
| | |
| | | orgTag: hlj |
| | | tcpPort: 60102 |
| | | upDataMinInterval: 6 |
| | | mqttEnable: false |
| | | noSubThenOff: 10 |
| | | webPort: 8102 |
| | | actutorPort: 9102 |
| | | dbName: pipIrr_hlj |
| | |
| | | orgTag: gz |
| | | tcpPort: 60103 |
| | | upDataMinInterval: 6 |
| | | mqttEnable: false |
| | | noSubThenOff: 10 |
| | | webPort: 8103 |
| | | actutorPort: 9103 |
| | | dbName: pipIrr_gz |
| | |
| | | orgTag: lz |
| | | tcpPort: 60104 |
| | | upDataMinInterval: 6 |
| | | mqttEnable: false |
| | | noSubThenOff: 10 |
| | | webPort: 8104 |
| | | actutorPort: 9104 |
| | | dbName: pipIrr_lz |
| | |
| | | orgTag: jc |
| | | tcpPort: 60105 |
| | | upDataMinInterval: 6 |
| | | mqttEnable: true |
| | | noSubThenOff: 10 |
| | | webPort: 8105 |
| | | actutorPort: 9105 |
| | | dbName: pipIrr_jc |