From f8b2e59a82702a790c383a8ecd90c708c76e2488 Mon Sep 17 00:00:00 2001 From: liurunyu <lry9898@163.com> Date: 星期四, 05 六月 2025 17:51:59 +0800 Subject: [PATCH] 增量开发MQTT协议、功能模块,上下行命令(消息)等 --- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java | 49 + pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java | 5 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java | 23 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/AdapterImp_MqttUnit.java | 19 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java | 3 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java | 61 + pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java | 27 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java | 35 + pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java | 8 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java | 29 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java | 126 ++ pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttPubMessageConstantTask.java | 81 ++ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java | 46 + pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnit.java | 68 + pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java | 26 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java | 12 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttComResultConstantTask.java | 75 ++ pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java | 40 + pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/说明.txt | 6 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitAdapter.java | 14 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkMqttData.java | 37 + pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java | 77 ++ pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgNode.java | 69 + pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java | 60 + pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java | 11 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties | 16 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgCache.java | 68 + pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml | 12 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml | 18 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultCache.java | 69 + pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java | 4 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java | 13 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml | 4 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkFindPSdV1.java | 31 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java | 23 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java | 132 +++ pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java | 16 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/WebDownCom4MqttTask.java | 51 + pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java | 86 ++ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java | 109 +++ pipIrr-platform/pipIrr-common/pom.xml | 13 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java | 170 ++++ pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultNode.java | 58 + pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java | 95 ++ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java | 68 + pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java | 27 46 files changed, 2,057 insertions(+), 33 deletions(-) diff --git a/pipIrr-platform/pipIrr-common/pom.xml b/pipIrr-platform/pipIrr-common/pom.xml index fa7c5a5..c0ea0e3 100644 --- a/pipIrr-platform/pipIrr-common/pom.xml +++ b/pipIrr-platform/pipIrr-common/pom.xml @@ -132,6 +132,18 @@ <version>2.2.2</version> </dependency> + <!-- MQTT娑堟伅涓棿浠�,杩炴帴姹犲強瀹㈡埛绔� --> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-pool2</artifactId> + <version>2.9.0</version> + </dependency> + <dependency> + <groupId>org.eclipse.paho</groupId> + <artifactId>org.eclipse.paho.client.mqttv3</artifactId> + <version>1.2.5</version> + </dependency> + <!-- quartz --> <dependency> <groupId>org.quartz-scheduler</groupId> @@ -173,6 +185,7 @@ <version>2.17.2</version> </dependency> + </dependencies> <build> diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java new file mode 100644 index 0000000..1d6d60f --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java @@ -0,0 +1,46 @@ +package com.dy.common.mw.channel.mqtt; + +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.eclipse.paho.client.mqttv3.MqttClient; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 11:35 + * @Description + */ +public class MqttClientPool { + + private final GenericObjectPool<MqttClient> pool; + + public MqttClientPool(String broker, String username, String password, int maxConnections) { + MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password); + GenericObjectPoolConfig<MqttClient> config = new GenericObjectPoolConfig<>(); + config.setMaxTotal(maxConnections); + config.setMaxIdle(maxConnections); + config.setMinIdle(1); + config.setTestOnBorrow(true); + config.setTestOnReturn(true); + config.setTestWhileIdle(true); + this.pool = new GenericObjectPool<>(factory, config); + } + + public MqttClient popClient() throws Exception { + return pool.borrowObject(); + } + + public void pushClient(MqttClient client) { + if (client != null) { + pool.returnObject(client); + } + } + + public boolean isClose(){ + return pool.isClosed(); + } + + public void close() { + pool.close(); + } +} + diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java new file mode 100644 index 0000000..bd2eb0f --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java @@ -0,0 +1,60 @@ +package com.dy.common.mw.channel.mqtt; + +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 11:16 + * @Description + */ +public class MqttClientPooledObjectFactory extends BasePooledObjectFactory<MqttClient> { + + private final String broker; + private final String username; + private final String password; + + public MqttClientPooledObjectFactory(String broker, String username, String password) { + this.broker = broker; + this.username = username; + this.password = password; + } + + @Override + public MqttClient create() throws Exception { + String clientId = MqttClient.generateClientId(); + MqttClient client = new MqttClient(broker, clientId); + + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setAutomaticReconnect(true); + options.setCleanSession(true); + + client.connect(options); + return client; + } + + @Override + public PooledObject<MqttClient> wrap(MqttClient client) { + return new DefaultPooledObject<>(client); + } + + @Override + public void destroyObject(PooledObject<MqttClient> p) throws Exception { + MqttClient client = p.getObject(); + if (client.isConnected()) { + client.disconnect(); + } + client.close(); + } + + @Override + public boolean validateObject(PooledObject<MqttClient> p) { + MqttClient client = p.getObject(); + return client.isConnected(); + } +} \ No newline at end of file diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java new file mode 100644 index 0000000..2611e24 --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java @@ -0,0 +1,109 @@ +package com.dy.common.mw.channel.mqtt; + +import com.dy.common.util.Callback; +import com.dy.common.util.ThreadJob; +import org.eclipse.paho.client.mqttv3.MqttClient; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 11:35 + * @Description + */ +public class Test { + + private static String mqSvIp = "121.199.41.121"; + private static Integer mqSvPort = 1883; + private static String mqSvUserName = "dyyjy"; + private static String mqSvUserPassword = "Dyyjy2025,;.abc!@#"; + + private static String topic1 = "test/topic1" ; + private static String topic2 = "test/topic2" ; + + private static int maxConnections = 10 ; + + private static MqttClientPool pool; + /** + * QoS + * 绛夌骇 鍚嶇О 娑堟伅浼犻�掔壒鎬� + * 0 鑷冲涓�娆★紙At most once锛� 娑堟伅鍙戦�佸悗涓嶄繚璇佸埌杈撅紝鍙兘涓㈠け鎴栭噸澶嶏紝寮�閿�鏈�灏忥紝鍙潬鎬ф渶浣庛�� + * 1 鑷冲皯涓�娆★紙At least once锛� 娑堟伅鑷冲皯浼氬埌杈句竴娆★紝鍙兘閲嶅锛屼絾涓嶄細涓㈠け锛屽彲闈犳�т腑绛夛紝閫傜敤浜庡鏁板満鏅�� + * 2 鎭板ソ涓�娆★紙Exactly once锛� 娑堟伅浠呬細鍒拌揪涓�娆★紝涓嶉噸澶嶄笖涓嶄涪澶憋紝鍙潬鎬ф渶楂橈紝浣嗗紑閿�鏈�澶э紝瀹炵幇鏈�澶嶆潅銆� + */ + private static int QoS = 1 ; + + public static void main(String[] args) { + try{ + // 鍒濆鍖栬繛鎺ユ睜 + pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections); + MqttClient clientSub = pool.popClient() ; + testSubscribe(clientSub, topic1); + testSubscribe(clientSub, topic2); + MqttClient clientPub = pool.popClient() ; + testPublish(clientPub, topic1, "hello world", 1000); + testPublish(clientPub, topic2, "hello China", 1500); + } catch (Exception e) { + e.printStackTrace(); + } finally { + // 鍏抽棴杩炴帴姹� + //pool.close(); + } + } + private static void testSubscribe(MqttClient clientSub, String topic) throws Exception { + ThreadJob tjob = new ThreadJob() { + @Override + public Object execute() throws Exception { + clientSub.subscribe(topic, QoS, (topic, msg) -> { + System.out.println("浠庝富棰�" + topic + "鏀跺埌娑堟伅: " + new String(msg.getPayload())); + }); + while (true) { + Thread.sleep(1000L); + } + } + }; + tjob.start(new Callback() { + @Override + public void call(Object obj) { + System.out.println("鎵ц鎴愬姛"); + } + @Override + public void call(Object... objs) { + System.out.println("鎵ц鎴愬姛"); + } + + @Override + public void exception(Exception e) { + e.printStackTrace(); + } + }); + } + + private static void testPublish(MqttClient clientPub, String topic, String message, long sleep) throws Exception { + ThreadJob tjob = new ThreadJob() { + @Override + public Object execute() throws Exception { + int count = 0 ; + while (true) { + // 鍙戝竷娑堟伅 + byte[] bs = (message + " " + count++).getBytes() ; + clientPub.publish(topic, bs, QoS, false); + Thread.sleep(sleep); + } + } + }; + tjob.start(new Callback() { + @Override + public void call(Object obj) { + System.out.println("鎵ц鎴愬姛"); + } + @Override + public void call(Object... objs) { + System.out.println("鎵ц鎴愬姛"); + } + + @Override + public void exception(Exception e) { + e.printStackTrace(); + } + }); + } +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java index 345acb7..5cc3e43 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java @@ -20,6 +20,14 @@ */ public static final String outerTransCommand = "outerTransCommand" ; + + /** + * 閽堝Mqtt澶栭儴鍛戒护 + * 鍙兘鏄紓姝ワ紝鍛戒护缁撴灉閫氳繃鐩稿叧鐨勪俊鎭彂甯冮�氶亾鍙戝竷鍑哄幓 + */ + public static final String mqttCommand = "mqttCommand" ; + + /** * 鏈懡浠ゆ槸涓�涓埆鐨勫懡浠ょ殑缁撴灉锛堢粨鏋滀互鍛戒护鐨勬柟寮忚〃绀猴級 */ diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java new file mode 100644 index 0000000..8663e05 --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java @@ -0,0 +1,23 @@ +package com.dy.common.mw.protocol4Mqtt; + +import lombok.Data; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 17:38 + * @Description 鍛戒护鍊煎璞� + */ +@Data +public class Com4Mqtt { + public String commandId ;//鍛戒护ID + + public String deviceId ;//璁惧ID + + public String protocol;//鍗忚鍚嶇О锛堝疄鐜板巶瀹剁紪鐮侊級 + + public Short protocolVersion;//鍗忚鐗堟湰鍙� + + public String code ;//鍔熻兘鐮� + public String value ;//鍊硷紙鍙兘鏄暣鏁般�佹诞鐐规暟銆佸竷灏旀暟锛坱rue鎴杅alse锛夛級 + +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java new file mode 100644 index 0000000..15ec6b0 --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java @@ -0,0 +1,61 @@ +package com.dy.common.mw.protocol4Mqtt; + +import com.dy.common.mw.protocol.Command; +import com.dy.common.mw.protocol4Mqtt.pSdV1.ProtocolConstantSdV1; +import com.dy.common.mw.protocol4Mqtt.pSdV1.ProtocolParserSdV1; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +/** + * @Author: liurunyu + * @Date: 2025/6/5 10:00 + * @Description + */ +public class MqttMsgParser { + public MqttSubMsg parseSubMsg(String topic, MqttMessage mqttMsg) throws Exception { + if(topic != null && topic.trim().length() != 0){ + String[] topicGrp = topic.split("/") ; + if(topicGrp.length != 4){ + throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓嶅彲璇嗗埆") ; + }else{ + if(topicGrp[1].equals("sd1")){ + //灞变笢璁惧(鍗忚)锛屼笖鐗堟湰鍙蜂负1 + return new ProtocolParserSdV1().parseSubMsg(topicGrp[2], topic, mqttMsg); + }else{ + throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓崗璁紙鍘傚鍙婄増鏈級涓嶅彲璇嗗埆") ; + } + } + }else{ + throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓虹┖") ; + } + } + + public MqttPubMsg createPubMsg(String orgTag, Command com) throws Exception { + if(com.protocol == null && com.protocol.trim().length() != 0){ + throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屼絾鏈彁渚涘崗璁�") ; + } + if(com.protocolVersion == null){ + throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屼絾鏈彁渚涘崗璁増鏈彿") ; + } + if(com.code != null && com.code.trim().length() != 0){ + throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屼絾鏈彁渚涘姛鑳界爜") ; + } + if(com.protocol.equals(ProtocolConstantSdV1.protocolName)){ + if(com.protocolVersion.shortValue() == ProtocolConstantSdV1.protocolVer){ + return new ProtocolParserSdV1().createPubMsg(orgTag, com) ; + }else{ + throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鏋勯�犲櫒鏈疄鐜�") ; + } + }else{ + throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鏋勯�犲櫒鏈疄鐜�") ; + } + } + + + public static void main(String[] args) { + String s = "ym/sd1/10000/control/m1" ; + String[] ss = s.split("/") ; + for (String s1 : ss) { + System.out.println(s1); + } + } +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java new file mode 100644 index 0000000..bd864d3 --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java @@ -0,0 +1,23 @@ +package com.dy.common.mw.protocol4Mqtt; + +/** + * @Author: liurunyu + * @Date: 2025/6/5 11:44 + * @Description + */ +public abstract class MqttPubMsg { + public String commandId ;//鍛戒护ID + + public String deviceId ;//璁惧ID + + public String mqttResultSendWebUrl ;//Mqtt杩斿洖鍛戒护缁撴灉 鍙戝悜鐩殑鍦皐eb URL + + public String topic ;//娑堟伅涓婚 + public String msg ;//娑堟伅 + + public boolean isCacheForOffLine ;//涓嬭鍛戒护鎺у埗锛屾秷鎭腑闂翠欢涓嶅湪绾挎槸鍚︾紦瀛樺懡浠� + public boolean hasResponse ;//涓嬭鍛戒护鎺у埗锛屽懡浠ゆ槸鍚︽湁搴旂瓟 + + public abstract boolean valid(); + +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java new file mode 100644 index 0000000..61b4f65 --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java @@ -0,0 +1,26 @@ +package com.dy.common.mw.protocol4Mqtt; + +import com.dy.common.util.Callback; + +/** + * @Author: liurunyu + * @Date: 2025/6/5 11:44 + * @Description + */ + +public abstract class MqttSubMsg { + public String commandId ;//鍛戒护ID + + public String deviceId ;//璁惧ID + + public String mqttResultSendWebUrl ;//Mtt杩斿洖鍛戒护缁撴灉 鍙戝悜鐩殑鍦皐eb URL + + public String topic ;//娑堟伅涓婚 + public String msg ;//娑堟伅 + + public abstract boolean valid(); + + public abstract boolean subMsgMatchPubMsg(MqttPubMsg pubMsg); + + public abstract void action(Callback callback); +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java new file mode 100644 index 0000000..4d0cd99 --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java @@ -0,0 +1,13 @@ +package com.dy.common.mw.protocol4Mqtt.pSdV1; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 17:41 + * @Description + */ +public class CodeSdV1 { + public static final String cd_Fault = "00" ;//鏁呴殰瑙i櫎鍛戒护 + public static final String cd_Stir = "01" ;//鎼呮媽鍚仠鍛戒护 + public static final String cd_Inject = "02" ;//娉ㄨ偉鍚仠鍛戒护 + public static final String cd_Irr = "03" ;//鐏屾簤鍚仠鍛戒护 +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java new file mode 100644 index 0000000..c93c7ee --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java @@ -0,0 +1,27 @@ +package com.dy.common.mw.protocol4Mqtt.pSdV1; + +import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; +import lombok.Data; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 18:00 + * @Description 涓嬪彂鐨勫彂甯冩秷鎭紙鍗充笅琛屽懡浠わ級 + */ +@Data +public class MqttPubMsgSdV1 extends MqttPubMsg { + + public Integer address ;//瀵勫瓨鍣ㄥ湴鍧� + public String value ;//瀵勫瓨鍣ㄥ�� + + @Override + public boolean valid() { + if (topic == null || topic.isEmpty()) { + return false; + } + if (msg == null || msg.isEmpty()) { + return false; + } + return true; + } +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java new file mode 100644 index 0000000..03d1e8d --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java @@ -0,0 +1,68 @@ +package com.dy.common.mw.protocol4Mqtt.pSdV1; + +import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; +import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; +import com.dy.common.util.Callback; +import lombok.Data; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 16:58 + * @Description 鏀跺埌鐨勮闃呮秷鎭� + */ +@Data +public class MqttSubMsgSdV1 extends MqttSubMsg { + public Integer address ;//瀵勫瓨鍣ㄥ湴鍧� + public String value ;//瀵勫瓨鍣ㄥ�� + + public MqttSubMsgSdV1(){} + + public MqttSubMsgSdV1(String deviceId, String topic, String msg) { + this.deviceId = deviceId ; + this.topic = topic ; + this.msg = msg ; + } + public String toString(){ + StringBuilder sb = new StringBuilder(); + if(commandId != null){ + sb.append("commandId:") + .append(commandId) + .append("\n") ; + } + sb.append("涓婚:") + .append(topic) + .append("\n") ; + sb.append("娑堟伅:") + .append(msg) + .append("\n") ; + + return sb.toString() ; + } + + public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){ + if (pubMsg instanceof MqttPubMsgSdV1) { + MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg; + if(this.address.intValue() == pubMsgSdV1.getAddress().intValue()){ + return true ; + } + } + return false ; + } + + @Override + public boolean valid() { + if (topic == null || topic.isEmpty()) { + return false; + } + if (msg == null || msg.isEmpty()) { + return false; + } + return true; + } + + @Override + public void action(Callback callback){ + callback.call(this) ; + } + +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java new file mode 100644 index 0000000..8d7e5cd --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java @@ -0,0 +1,11 @@ +package com.dy.common.mw.protocol4Mqtt.pSdV1; + +/** + * @Author: liurunyu + * @Date: 2025/6/5 10:52 + * @Description + */ +public class ProtocolConstantSdV1 { + public static final String protocolName = "sd" ; + public static final short protocolVer = 1 ; +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java new file mode 100644 index 0000000..1ca98b2 --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java @@ -0,0 +1,170 @@ +package com.dy.common.mw.protocol4Mqtt.pSdV1; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.dy.common.mw.protocol.Command; +import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.ComCtrlVo; +import org.eclipse.paho.client.mqttv3.MqttMessage; + + +/** + * @Author: liurunyu + * @Date: 2025/6/5 11:41 + * @Description + */ +public class ProtocolParserSdV1 { + public MqttSubMsgSdV1 parseSubMsg(String deviceId, String topic, MqttMessage mqttMsg) throws Exception { + MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(deviceId, topic, new String(mqttMsg.getPayload(), "UTF-8")); + return ms; + } + + public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception { + MqttPubMsgSdV1 msg = null ; + switch (com.code){ + case CodeSdV1.cd_Fault:{ + //鏁呴殰瑙i櫎鍛戒护 + this.checkParam(com); + this.checkRtnWebUrl(com); + msg = this.createPubMsgOfFault(orgTag, com) ; + break ; + } + case CodeSdV1.cd_Stir:{ + //鎼呮媽鍚仠鍛戒护 + this.checkParam(com); + this.checkRtnWebUrl(com); + msg = this.createPubMsgOfStir(orgTag, com) ; + break ; + } + case CodeSdV1.cd_Inject:{ + //娉ㄨ偉鍚仠鍛戒护 + this.checkParam(com); + this.checkRtnWebUrl(com); + msg = this.createPubMsgOfInject(orgTag, com) ; + break ; + } + case CodeSdV1.cd_Irr:{ + //鐏屾簤鍚仠鍛戒护 + this.checkParam(com); + this.checkRtnWebUrl(com); + msg = this.createPubMsgOfIrr(orgTag, com) ; + break ; + } + default:{ + throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鏋勯�犲櫒鏈疄鐜�") ; + } + } + return msg ; + } + private void checkParam(Command com)throws Exception { + if(com.param == null){ + throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鍛戒护鏁版嵁涓虹┖") ; + } + } + private void checkRtnWebUrl(Command com)throws Exception { + if(com.rtuResultSendWebUrl == null || com.rtuResultSendWebUrl.trim().equals("")){ + throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鍛戒护缁撴灉鍥炴敹URL涓虹┖") ; + } + } + private MqttPubMsgSdV1 createPubMsgOfFault(String orgTag, Command com) throws Exception { + JSONObject obj = (JSONObject) com.param; + String json = obj.toJSONString(); + ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class); + if(cvo == null){ + throw new Exception("json杞珻omCtrlVo涓簄ull") ; + } + MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; + this.setPubMsgBase(com, msg); + msg.isCacheForOffLine = false ; + msg.hasResponse = true ; + msg.address = 123 ; + msg.value = "" + (cvo.isDo?1:0); + msg.topic = createTopic(orgTag, com) ; + msg.msg = "" ; + return msg ; + } + private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception { + JSONObject obj = (JSONObject) com.param; + String json = obj.toJSONString(); + ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class); + if(cvo == null){ + throw new Exception("json杞珻omCtrlVo涓簄ull") ; + } + MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; + this.setPubMsgBase(com, msg); + msg.isCacheForOffLine = false ; + msg.hasResponse = true ; + msg.address = 123 ; + msg.value = "" + (cvo.isDo?1:0); + msg.topic = createTopic(orgTag, com) ; + msg.msg = "" ; + return msg ; + } + private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception { + JSONObject obj = (JSONObject) com.param; + String json = obj.toJSONString(); + ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class); + if(cvo == null){ + throw new Exception("json杞珻omCtrlVo涓簄ull") ; + } + MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; + this.setPubMsgBase(com, msg); + msg.isCacheForOffLine = false ; + msg.hasResponse = true ; + msg.address = 123 ; + msg.value = "" + (cvo.isDo?1:0); + msg.topic = createTopic(orgTag, com) ; + msg.msg = "" ; + return msg ; + } + private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception { + JSONObject obj = (JSONObject) com.param; + String json = obj.toJSONString(); + ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class); + if(cvo == null){ + throw new Exception("json杞珻omCtrlVo涓簄ull") ; + } + MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; + this.setPubMsgBase(com, msg); + msg.isCacheForOffLine = false ; + msg.hasResponse = true ; + msg.address = 123 ; + msg.value = "" + (cvo.isDo?1:0); + msg.topic = createTopic(orgTag, com) ; + msg.msg = "" ; + return msg ; + } + + private void setPubMsgBase(Command com, MqttPubMsgSdV1 msg){ + msg.commandId = com.id ; + msg.deviceId = com.rtuAddr ; + msg.mqttResultSendWebUrl = com.rtuResultSendWebUrl ; + } + + private String createTopic(String orgTag, Command com){ + String topic = null ; + switch (com.code){ + case CodeSdV1.cd_Fault:{ + topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m4" ; + break ; + } + case CodeSdV1.cd_Stir:{ + topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m80" ; + break ; + } + case CodeSdV1.cd_Inject:{ + topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m1" ; + break ; + } + case CodeSdV1.cd_Irr:{ + topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m2" ; + break ; + } + default:{ + topic = null ; + break; + } + } + return topic ; + } + +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java new file mode 100644 index 0000000..4f90810 --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java @@ -0,0 +1,12 @@ +package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos; + +/** + * @Author: liurunyu + * @Date: 2025/6/5 15:57 + * @Description + */ +public class ComCtrlVo { + //鏄惁鎺у埗鍔ㄤ綔锛宼rue鏄紝false鍚� + //鍙互鎵ц鍔熻兘鐮� 00锛�01锛�02锛�03鐨勫姩浣� + public boolean isDo;// +} diff --git "a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt" "b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt" new file mode 100644 index 0000000..6f29fa0 --- /dev/null +++ "b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt" @@ -0,0 +1,6 @@ +灞变笢娉板畨鍏徃鎻愪緵姘磋偉鏈恒�佸湡澹ゅ鎯呯珯銆佹皵璞$珯銆丗Box绯荤粺鍗忚 + +寤鸿娑堟伅涓婚瑙勫垯锛� +瀛愮郴缁燂紙鏈烘瀯锛�/鍗忚鍚嶇О锛堝巶瀹讹級+鐗堟湰鍙�/璁惧缂栧彿/鍔熻兘缁�/鍦板潃 +渚嬪锛� +ym/sd1/10000/control/m4 (鍏冭皨/灞变笢+鐗堟湰1/璁惧缂栧彿/璁惧鎺у埗/鍦板潃) \ No newline at end of file diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml index dbb1600..65316d7 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml @@ -125,6 +125,18 @@ <version>2.0.7</version> </dependency> + <!-- MQTT娑堟伅涓棿浠�,杩炴帴姹犲強瀹㈡埛绔� --> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-pool2</artifactId> + <version>2.9.0</version> + </dependency> + <dependency> + <groupId>org.eclipse.paho</groupId> + <artifactId>org.eclipse.paho.client.mqttv3</artifactId> + <version>1.2.5</version> + </dependency> + <!--閽夐拤娑堟伅鎺ㄩ��--> <dependency> <groupId>com.aliyun</groupId> diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java index 2a405c4..b7917f5 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java @@ -4,13 +4,15 @@ import java.util.List; import com.dy.common.util.ConfigProperties; +import com.dy.common.util.IPUtils; import com.dy.rtuMw.server.*; +import com.dy.rtuMw.server.mqtt.MqttUnit; +import com.dy.rtuMw.server.mqtt.MqttUnitConfigVo; import com.dy.rtuMw.server.msCenter.MsCenterConfigVo; import com.dy.rtuMw.server.msCenter.MsCenterUnit; import com.dy.rtuMw.server.rtuData.RtuDataUnit; import com.dy.rtuMw.server.rtuData.RtuDataUnitConfigVo; -import com.dy.rtuMw.server.tasks.FromRtuComResultConstantTask; -import com.dy.rtuMw.server.tasks.FromRtuDataConstantTask; +import com.dy.rtuMw.server.tasks.*; import com.dy.common.mw.UnitInterface; import com.dy.common.mw.channel.tcp.TcpConfigVo; import com.dy.common.mw.channel.tcp.TcpUnit; @@ -20,8 +22,6 @@ import com.dy.common.mw.protocol.ProtocolUnit; import com.dy.common.mw.support.SupportUnit; import com.dy.common.mw.support.SupportUnitConfigVo; -import com.dy.rtuMw.server.tasks.SendMsConstantTask; -import com.dy.rtuMw.server.tasks.RtuDownConstantTask; import com.dy.rtuMw.resource.ResourceUnit; import com.dy.rtuMw.resource.ResourceUnitConfigVo; import com.dy.common.springUtil.SpringContextUtil; @@ -359,28 +359,31 @@ //RTU杩滅▼鍗囩骇妯″潡 UpgradeUnitConfigVo ugVo = new UpgradeUnitConfigVo(); ugVo.enable = conf.getSetAttrBoolean(doc, "config.upgrade", "enable", null, null) ; - ugVo.openNoUpgrade = conf.getSetAttrBoolean(doc, "config.upgrade", "openNoUpgrade", null, null) ; - ugVo.lastOpenMaxGoOn = conf.getSetAttrPlusInt(doc, "config.upgrade", "lastOpenMaxGoOn", null, 5, 360000, null); - ugVo.lastOpenMaxGoOn = ugVo.lastOpenMaxGoOn * 1000 ;//鍙樻垚姣 - ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 360000, null); - ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//鍙樻垚姣 - ugVo.runningAndIdleDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "runningAndIdleDuration", null, 5, 360000, null); - ugVo.runningAndIdleDuration = ugVo.runningAndIdleDuration * 1000 ;//鍙樻垚姣 - ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null); - ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null); - ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 3600000, null); - ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//鍙樻垚姣 - ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null); - ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//鍙樻垚姣 - ugVo.notifyTimesAfterOver = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyTimesAfterOver", null, 0, null, null); - ugVo.showStartInfo = showStartInfo ; - AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit(); - ugAdap.setConfig(ugVo); - UpgradeUnit ugUnit = UpgradeUnit.getInstance(); - ugUnit.setAdapter(ugAdap); - ugUnit.start(obj -> { - }); - units.add(ugUnit) ; + if(ugVo.enable){ + ugVo.openNoUpgrade = conf.getSetAttrBoolean(doc, "config.upgrade", "openNoUpgrade", null, null) ; + ugVo.lastOpenMaxGoOn = conf.getSetAttrPlusInt(doc, "config.upgrade", "lastOpenMaxGoOn", null, 5, 360000, null); + ugVo.lastOpenMaxGoOn = ugVo.lastOpenMaxGoOn * 1000 ;//鍙樻垚姣 + ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 360000, null); + ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//鍙樻垚姣 + ugVo.runningAndIdleDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "runningAndIdleDuration", null, 5, 360000, null); + ugVo.runningAndIdleDuration = ugVo.runningAndIdleDuration * 1000 ;//鍙樻垚姣 + ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null); + ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null); + ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 3600000, null); + ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//鍙樻垚姣 + ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null); + ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//鍙樻垚姣 + ugVo.notifyTimesAfterOver = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyTimesAfterOver", null, 0, null, null); + ugVo.showStartInfo = showStartInfo ; + AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit(); + ugAdap.setConfig(ugVo); + UpgradeUnit ugUnit = UpgradeUnit.getInstance(); + ugUnit.setAdapter(ugAdap); + ugUnit.start(obj -> { + }); + units.add(ugUnit) ; + } + ///////////////// //RTU涓婅鏁版嵁澶勭悊妯″潡锛堜换鍔℃爲锛� @@ -410,6 +413,12 @@ CoreUnit.addConstantTask(new RtuDownConstantTask()); CoreUnit.addConstantTask(new FromRtuDataConstantTask()); CoreUnit.addConstantTask(new FromRtuComResultConstantTask()); + Boolean enableMq = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ; + if(enableMq != null && enableMq.booleanValue()){ + CoreUnit.addConstantTask(new MqttSubMessageConstantTask()); + CoreUnit.addConstantTask(new MqttPubMessageConstantTask()); + CoreUnit.addConstantTask(new MqttComResultConstantTask()); + } CoreUnit.addConstantTask(new SendMsConstantTask()); coreUnit.start(obj -> { }); @@ -433,7 +442,70 @@ }); TcpSvUrl = "[ip]:" + tcpVo.port ; units.add(tcpUnit) ; - } + } + + ///////////////// + //MQTT妯″潡 + MqttUnitConfigVo mqVo = new MqttUnitConfigVo(); + mqVo.enable = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ; + ServerProperties.mqttUnitEnable = mqVo.enable ; + if(mqVo.enable){ + mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, true, null) ; + if(!IPUtils.ipValid(mqVo.svIp)){ + throw new Exception("config.mqtt.svIp閰嶇疆鐨処P涓嶅悎娉�") ; + } + mqVo.svPort = conf.getSetAttrPlusInt(doc, "config.mqtt", "svPort", null, 5, 360000, null); + if(mqVo.svPort < 0 || mqVo.svPort > 65535){ + throw new Exception("config.mqtt.svPort閰嶇疆鐨勭鍙d笉鍚堟硶") ; + } + mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, true, null) ; + if(mqVo.svUserName == null || mqVo.svUserName.trim().equals("")){ + throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰悕涓嶅悎娉�") ; + }else{ + mqVo.svUserName = mqVo.svUserName.trim() ; + } + mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, true, null) ; + if(mqVo.svUserPassword == null || mqVo.svUserPassword.trim().equals("")){ + throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰瘑鐮佷笉鍚堟硶") ; + }else{ + mqVo.svUserPassword = mqVo.svUserPassword.trim() ; + } + mqVo.poolMaxSize = conf.getSetAttrPlusInt(doc, "config.mqtt", "poolMaxSize", null, 5, 360000, null); + if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){ + throw new Exception("config.mqtt.poolMaxSize閰嶇疆鐨勮繛鎺ユ睜杩炴帴鏈�澶ф暟閲忎笉鍚堟硶") ; + } + String topicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "topicAndQos", null, true, null) ; + if(topicAndQos == null || topicAndQos.trim().equals("")){ + throw new Exception("config.mqtt.topicAndQos閰嶇疆鐨勪富棰樺強Qos涓嶅悎娉�") ; + }else{ + topicAndQos = topicAndQos.trim() ; + topicAndQos = topicAndQos.replaceAll("锛�", ","); + topicAndQos = topicAndQos.replaceAll("锛�", ";"); + String[] topicAndQosArr = topicAndQos.split(";") ; + mqVo.subTopics = new String[topicAndQosArr.length] ; + mqVo.topicsQos = new int[topicAndQosArr.length] ; + int index = 0 ; + for(String topicAndQosStr : topicAndQosArr){ + String[] tq = topicAndQosStr.split(",") ; + mqVo.subTopics[index] = tq[0].trim() ; + mqVo.topicsQos[index] = Integer.parseInt(tq[1].trim()) ; + index++ ; + } + } + mqVo.publishQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "publishQos", null, 0, 3, null); + if(mqVo.publishQos < 0 || mqVo.publishQos > 3){ + throw new Exception("config.mqtt.publishQos閰嶇疆涓嶅悎娉�") ; + } + mqVo.showStartInfo = showStartInfo ; + AdapterImp_MqttUnit mqAdapt = new AdapterImp_MqttUnit(); + mqAdapt.setConfig(mqVo); + MqttUnit mqUnit = MqttUnit.getInstance(); + mqUnit.setAdapter(mqAdapt); + mqUnit.start(obj -> { + }); + units.add(mqUnit) ; + } + } catch (Exception e) { e.printStackTrace(); } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java index 9f945ae..5171b77 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java @@ -20,13 +20,19 @@ try{ // 纭繚杩欐浠g爜灏藉彲鑳藉揩閫熸墽琛岋紝閬垮厤褰卞搷JVM鐨勫叧闂� log.info("绋嬪簭锛堟帶鍒跺彴锛夊叧闂挬瀛愮被鎵ц"); - Command com = new Command() ; - com.id = Command.defaultId ; - com.code = CodeLocal.stopTcpSv ; - com.type = CommandType.innerCommand ; - new CommandInnerDeaLer().deal(com) ; + Command com1 = new Command() ; + com1.id = Command.defaultId ; + com1.code = CodeLocal.stopTcpSv ; + com1.type = CommandType.innerCommand ; + new CommandInnerDeaLer().deal(com1) ; //Thread.sleep(100L);//瀹炴祴涓嶆墽琛� log.info("鍏抽棴绋嬪簭鍓嶏紝鍏抽棴浜員CP鏈嶅姟"); + Command com2 = new Command() ; + com2.id = Command.defaultId ; + com2.code = CodeLocal.stopMqttSv ; + com2.type = CommandType.innerCommand ; + new CommandInnerDeaLer().deal(com2) ; + }catch (Exception e){ log.error("绋嬪簭锛堟帶鍒跺彴锛夊叧闂挬瀛愬彂鐢熷紓甯�", e); } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/AdapterImp_MqttUnit.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/AdapterImp_MqttUnit.java new file mode 100644 index 0000000..4f098b5 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/AdapterImp_MqttUnit.java @@ -0,0 +1,19 @@ +package com.dy.rtuMw.server; + + +import com.dy.rtuMw.server.mqtt.MqttUnitAdapter; +import com.dy.rtuMw.server.mqtt.MqttUnitConfigVo; + +public class AdapterImp_MqttUnit implements MqttUnitAdapter { + + private MqttUnitConfigVo configVo ; + + public MqttUnitConfigVo getConfig() { + return configVo; + } + + public void setConfig(MqttUnitConfigVo configVo){ + this.configVo = configVo ; + } + +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java index cf3f4a4..ef5a0da 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java @@ -54,4 +54,7 @@ //鏈夋姤璀﹀彂鐢熸椂锛屽悜閽夐拤鍙戦�佹秷鎭殑闂撮殧鏃堕暱锛堝垎閽燂級 public static Integer sendDingDingAlarmMsInterval = 60 ; + //Mqtt妯″潡鏄惁鍚姩 + public static Boolean mqttUnitEnable = false ; + } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java index d2f4740..9e21c81 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java @@ -5,6 +5,7 @@ import com.dy.common.mw.protocol.Command; import com.dy.common.mw.protocol.rtuState.RtuStatus; import com.dy.rtuMw.server.local.localProtocol.*; +import com.dy.rtuMw.server.mqtt.MqttUnit; import java.util.HashMap; import java.util.Map; @@ -43,6 +44,8 @@ return this.stopTcpSv(com) ; }else if(code.equals(CodeLocal.recoverTcpSv)){ return this.recoverTcpSv(com) ; + }else if(code.equals(CodeLocal.recoverMqttSv)){ + return this.stopMqttSv(com) ; }else if(code.equals(CodeLocal.mwState)){ return this.mwInfo(com) ; } @@ -157,6 +160,30 @@ return ReturnCommand.successed("宸茬粡鍚姩鎭㈠TCP鏈嶅姟", command.getId(), command.getCode(), null) ; } + /** + * 鍋滄TCP鏈嶅姟锛屼笉鍐嶆帴鍏ユ柊鐨凾CP杩炴帴锛屽凡缁廡CP杩炴帴鐨勫叏閮ㄦ柇杩炴帴 + * @throws Exception + */ + private Command stopMqttSv(Command command) throws Exception{ + MqttUnit.getInstance().stop(new UnitCallbackInterface(){ + public void call(Object obj) throws Exception { + } + }); + return ReturnCommand.successed("宸茬粡鍚姩鍋滄Mqtt鏈嶅姟", command.getId(), command.getCode(), null) ; + } + + + /** + * 鎭㈠TCP鏈嶅姟锛屾帴鍏ユ柊鐨凾CP杩炴帴 + * @throws Exception + */ + private Command recoverMqttSv(Command command) throws Exception{ + MqttUnit.getInstance().recover(new UnitCallbackInterface(){ + public void call(Object obj) throws Exception { + } + }); + return ReturnCommand.successed("宸茬粡鍚姩鎭㈠Mqtt鏈嶅姟", command.getId(), command.getCode(), null) ; + } /** diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java index 9337b04..9edb9fb 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java @@ -22,6 +22,10 @@ public static final String recoverTcpSv = "LCD0112" ;//閲嶅惎TCP鏈嶅姟锛屾帴鍏ユ柊鐨凾CP杩炴帴 + public static final String stopMqttSv = "LCD0114" ;//鍋滄Mqtt鏈嶅姟 + + public static final String recoverMqttSv = "LCD0116" ;//閲嶅惎Mqtt鏈嶅姟 + public static final String mwState = "LCD0200" ;//寰楀埌閫氫俊涓棿浠惰繍琛屼俊鎭� } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultCache.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultCache.java new file mode 100644 index 0000000..413d9f2 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultCache.java @@ -0,0 +1,69 @@ +package com.dy.rtuMw.server.mqtt; + +import com.dy.common.queue.Node; +import com.dy.common.queue.Queue; +import com.dy.rtuMw.server.ServerProperties; + +/** + * @Author: liurunyu + * @Date: 2025/6/5 15:05 + * @Description + */ +public class MqttComResultCache { + + //TCP涓嬭鍛戒护缂撳瓨闃熷垪 + private static Queue cacheQueue = new Queue("MqttComResultCache") ; + + private static MqttComResultCache instance = new MqttComResultCache() ; + + private MqttComResultCache(){ + cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount); + } + + public static MqttComResultCache getInstance(){ + return instance ; + } + + /** + * 缂撳瓨鑺傜偣 + * @param node node + * @throws Exception 寮傚父 + */ + public static void cacheMqttComResult(MqttComResultNode node) throws Exception{ + if(node != null && node.obj != null){ + cacheQueue.pushHead(node); + } + } + + /** + * 寰楀埌绗竴涓妭鐐� + * @return Node + */ + public static Node getFirstQueueNode(){ + return cacheQueue.getFirstNode() ; + } + + /** + * 寰楀埌鏈�鍚庝竴涓妭鐐� + * @return Node + */ + public static Node getLastQueueNode(){ + return cacheQueue.getLastNode() ; + } + + /** + * 绉婚櫎鑺傜偣 + * @param node + */ + public static void removeNode(Node node){ + cacheQueue.remove(node); + } + + /** + * 缂撳瓨鐨勮妭鐐规暟 + * @Return 缂撳瓨鑺傜偣鏁� + */ + public static Integer size(){ + return cacheQueue.size() ; + } +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultNode.java new file mode 100644 index 0000000..5097c57 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultNode.java @@ -0,0 +1,58 @@ +package com.dy.rtuMw.server.mqtt; + +import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; +import com.dy.common.queue.NodeObj; +import com.dy.common.springUtil.SpringContextUtil; +import com.dy.common.threadPool.ThreadPool; +import com.dy.common.threadPool.TreadPoolFactory; +import com.dy.rtuMw.web.comResult.CommandResultDeal; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * @Author: liurunyu + * @Date: 2025/6/5 15:06 + * @Description + */ +public class MqttComResultNode implements NodeObj { + + private static final Logger log = LogManager.getLogger(MqttComResultNode.class.getName()); + + public Object obj ;//鏁版嵁 + + public MqttComResultNode(Object obj){ + this.obj = obj ; + } + /** + * 鑷繁澶勭悊鑷繁 + * @return + */ + public boolean dealSelf(){ + try { + ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ; + pool.putJob(new ThreadPool.Job() { + public void execute() { + if(obj != null){ + if(obj instanceof MqttSubMsg){ + CommandResultDeal deal = SpringContextUtil.getBean(CommandResultDeal.class) ; + deal.deal((MqttSubMsg)obj); + } + } + } + @Override + public void destroy(){ + } + @Override + public boolean isDestroy(){ + return false ; + } + + }); + } catch (Exception e) { + log.error("鍦≧tuComResultNode鍐呭彂鐢熷紓甯�", e); + } + return true ; + } + + +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java new file mode 100644 index 0000000..3c775b5 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java @@ -0,0 +1,86 @@ +package com.dy.rtuMw.server.mqtt; + +import com.dy.common.mw.channel.mqtt.MqttClientPool; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.eclipse.paho.client.mqttv3.MqttClient; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 14:54 + * @Description + */ +public class MqttManager { + + private static final Logger log = LogManager.getLogger(MqttManager.class.getName()); + + private static final MqttManager INSTANCE = new MqttManager(); + + private MqttUnitConfigVo configVo ; + + private MqttClientPool pool; + + private MqttManager(){ + } + + public static MqttManager getInstance() { + return MqttManager.INSTANCE; + } + /** + * 鍒濆鍖栭厤缃俊鎭� + */ + public void initOption(MqttUnitConfigVo configVo) { + this.configVo = configVo; + } + + public void start()throws Exception{ + String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort; + this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize); + if(this.pool.isClose()){ + throw new Exception("Mqtt杩炴帴姹犲垵濮嬪寲澶辫触"); + } + MqttClient clientSub = null ; + try { + clientSub = pool.popClient();//鏂板垱寤轰竴涓狢lient鏃讹紝姝lient瀹為檯鍘昏繛鎺QTT鏈嶅姟鍣紝濡傛灉杩炴帴涓嶄笂锛屽氨浼氭姏鍑哄紓甯� + }catch (Exception e){ + throw new Exception("Mqtt杩炴帴姹犺幏寰楄繛鎺ュ紓甯�", e); + } + if(clientSub == null || !clientSub.isConnected()){ + throw new Exception("Mqtt杩炴帴姹犺幏寰楄闃呰繛鎺ヤ笉鍙敤"); + } + for(int i = 0; i < this.configVo.subTopics.length; i++){ + clientSub.subscribe(this.configVo.subTopics[i], this.configVo.topicsQos[i], new MqttMessageListener()); + } + } + + public void stop()throws Exception{ + if(this.pool != null){ + // 鍏抽棴杩炴帴姹� + this.pool.close(); + } + } + + public MqttClient popMqttClient() throws Exception{ + return this.pool.popClient(); + } + + public void pushMqttClient(MqttClient client) { + this.pool.pushClient(client); + } + + public void publishMsg(MqttClient client, String topic, byte[] msg) throws Exception{ + client.publish(topic, msg, this.configVo.publishQos, false); + } + + public void publishMsg(MqttClient client, String topic, String msg) throws Exception{ + byte[] bs = msg.getBytes("UTF-8") ; + client.publish(topic, bs, this.configVo.publishQos, false); + } + + public boolean poolIsClose(){ + if(this.pool == null){ + return true; + } + return this.pool.isClose(); + } +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java new file mode 100644 index 0000000..19bd3eb --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java @@ -0,0 +1,49 @@ +package com.dy.rtuMw.server.mqtt; + +import com.dy.common.mw.protocol4Mqtt.MqttMsgParser; +import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; +import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; +import com.dy.common.util.Callback; +import org.eclipse.paho.client.mqttv3.* ; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 15:52 + * @Description + */ +public class MqttMessageListener implements IMqttMessageListener{ + @Override + public void messageArrived(String topic, MqttMessage msg) throws Exception { + MqttMsgParser parser = new MqttMsgParser() ; + MqttSubMsg subMsg = parser.parseSubMsg(topic, msg) ; + this.nextDeal(subMsg); + } + private void nextDeal(MqttSubMsg subMsg)throws Exception { + subMsg.action(new Callback() { + @Override + public void call(Object obj) { + MqttSubMsg subMs = (MqttSubMsg) obj ; + MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; + if(pubMs != null){ + subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; + subMs.commandId = pubMs.commandId ; + try { + MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); + } catch (Exception e) { + e.printStackTrace(); + } + } + try{ + MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg)); + }catch (Exception e){ + } + } + @Override + public void call(Object... objs) { + } + @Override + public void exception(Exception e) { + } + }); + } +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java new file mode 100644 index 0000000..2ae8adc --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java @@ -0,0 +1,132 @@ +package com.dy.rtuMw.server.mqtt; + +import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; +import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; +import com.dy.common.queue.Node; +import com.dy.common.queue.Queue; +import com.dy.rtuMw.server.ServerProperties; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 17:24 + * @Description + */ +public class MqttPubMsgCache { + + //TCP涓嬭鍛戒护缂撳瓨闃熷垪 + private static Queue cacheQueue = new Queue("mqttPubMsgCache") ; + + private static MqttPubMsgCache instance = new MqttPubMsgCache() ; + + private MqttPubMsgCache(){ + cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount); + } + + public static MqttPubMsgCache getInstance(){ + return instance ; + } + + + public static Integer info(){ + Integer comTotalDown = 0 ;//缂撳瓨鐨勪笅琛屽懡浠ゆ�绘暟 + MqttPubMsgNode obj ; + Node node = cacheQueue.getFirstNode() ; + while(node != null && node.obj != null){ + obj = (MqttPubMsgNode)node.obj; + if(!obj.onceReceivedResult){ + comTotalDown ++ ; + } + } + return comTotalDown ; + } + + /** + * 缂撳瓨鍛戒护 + * @param result + * @throws Exception + */ + public static void cacheCommand(MqttPubMsg result) throws Exception{ + if(result != null){ + cacheQueue.pushHead(new MqttPubMsgNode(result)); + } + } + + /** + * 鍖归厤鍛戒护缁撴灉 + * @param subMsg + * @return + */ + public static MqttPubMsg matchFromHead(MqttSubMsg subMsg){ + MqttPubMsg pubMsg = null ; + MqttPubMsgNode obj = null ; + Node node = cacheQueue.getFirstNode() ; + while(node != null && node.obj != null){ + obj = (MqttPubMsgNode)node.obj; + pubMsg = obj.result ; + if(pubMsg != null + && subMsg.subMsgMatchPubMsg(pubMsg)){ + obj.onceReceivedResult = true ;//鏍囪瘑宸茬粡鏀跺埌鍛戒护缁撴灉 + return pubMsg; + }else{ + node = node.next ; + } + } + return null ; + } + + /** + * 鍖归厤鍛戒护缁撴灉 + * @param subMsg + * @return + */ + public static MqttPubMsg matchFromTail(MqttSubMsg subMsg){ + MqttPubMsg pubMsg = null ; + MqttPubMsgNode obj = null ; + Node node = cacheQueue.getLastNode() ; + while(node != null && node.obj != null){ + obj = (MqttPubMsgNode)node.obj; + pubMsg = obj.result ; + if(pubMsg != null + && subMsg.subMsgMatchPubMsg(pubMsg)){ + obj.onceReceivedResult = true ;//鏍囪瘑宸茬粡鏀跺埌鍛戒护缁撴灉 + return pubMsg; + }else{ + node = node.pre ; + } + } + return null ; + } + + /** + * 寰楀埌绗竴涓妭鐐� + * @return + */ + public static Node getFirstQueueNode(){ + return cacheQueue.getFirstNode() ; + } + + /** + * 寰楀埌鏈�鍚庝竴涓妭鐐� + * @return + */ + public static Node getLastQueueNode(){ + return cacheQueue.getLastNode() ; + } + + /** + * 绉婚櫎鑺傜偣 + * @param node + */ + public static void removeNode(Node node){ + cacheQueue.remove(node); + } + + /** + * 缂撳瓨鐨勮妭鐐规暟 + * @Return 缂撳瓨鑺傜偣鏁� + */ + public static Integer size(){ + return cacheQueue.size() ; + } + +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java new file mode 100644 index 0000000..e345d59 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java @@ -0,0 +1,95 @@ +package com.dy.rtuMw.server.mqtt; + +import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; +import com.dy.common.queue.NodeObj; +import com.dy.rtuMw.server.ServerProperties; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.eclipse.paho.client.mqttv3.MqttClient; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 17:25 + * @Description + */ +public class MqttPubMsgNode implements NodeObj { + + private static Logger log = LogManager.getLogger(MqttPubMsgNode.class.getName()); + + public MqttPubMsg result ;//涓嬭鍛戒护 + public Long cachTime ;//缂撳瓨鏃跺埢 + public boolean onceReceivedResult ;//宸茬粡鏀跺埌鍛戒护搴旂瓟 + + + public MqttPubMsgNode(MqttPubMsg result){ + this.result = result ; + this.cachTime = System.currentTimeMillis() ; + this.onceReceivedResult = false ; + } + + /** + * 鑷繁澶勭悊鑷繁 + * @param now + * @return + */ + public boolean dealSelf(Long now){ + if(this.onceReceivedResult){ + //宸茬粡鏀跺埌鍛戒护缁撴灉 + //璁板綍鐘舵�� + //RtuStatusDealer.commandSuccess(this.result.rtuAddr, this.result.downCode, this.result.downCodeName); + return true ; + } + boolean noConnect2MqSv = false ; + MqttManager mqttManager = MqttManager.getInstance() ; + MqttClient mqttClient = null ; + + noConnect2MqSv = mqttManager.poolIsClose() ; + if(noConnect2MqSv){ + //鏈浘杩炴帴MQTT鏈嶅姟鍣� + return this.decideRemoveNodeFromCach(now) ; + }else{ + try { + //濡傛灉缃戠粶涓嶅ソ鎴栨柇缃戯紝姝ゅ鐢ㄦ椂杈冮暱 + mqttClient = mqttManager.popMqttClient() ; + if(mqttClient == null || !mqttClient.isConnected()){ + noConnect2MqSv = false ; + } + }catch (Exception e){ + log.error("鑾峰彇MQTT瀹㈡埛绔け璐�", e); + } + } + if(noConnect2MqSv){ + //鏈浘杩炴帴MQTT鏈嶅姟鍣� + return this.decideRemoveNodeFromCach(now) ; + }else{ + if(mqttClient != null && mqttClient.isConnected()){ + try { + mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg); + log.info("鍙戝竷MQTT娑堟伅锛堜富棰�=" + this.result.topic + "锛�" + this.result.msg); + }catch (Exception e){ + log.error("MQTT鍙戝竷娑堟伅澶辫触锛堜富棰�=" + this.result.topic + "锛�" , e); + }finally { + mqttManager.pushMqttClient(mqttClient); + } + return false ; + }else{ + //鏈浘杩炴帴MQTT鏈嶅姟鍣� + return this.decideRemoveNodeFromCach(now) ; + } + } + } + + private boolean decideRemoveNodeFromCach(Long now){ + if(!this.result.isCacheForOffLine){ + //涓嶅湪绾垮懡浠や笉缂撳瓨 + return true ; + }else{ + //涓嶅湪绾垮懡浠ょ紦瀛� + if(now - this.cachTime >= ServerProperties.offLineCacheTimeout){ + //缂撳瓨瓒呮椂 + return true ; + } + } + return false ; + } +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgCache.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgCache.java new file mode 100644 index 0000000..cfa5184 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgCache.java @@ -0,0 +1,68 @@ +package com.dy.rtuMw.server.mqtt; + +import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; +import com.dy.common.queue.Node; +import com.dy.common.queue.Queue; +import com.dy.rtuMw.server.ServerProperties; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 16:13 + * @Description + */ +public class MqttSubMsgCache { + + //TCP涓嬭鍛戒护缂撳瓨闃熷垪 + private static Queue cacheQueue = new Queue("mqttSubMsgCache") ; + + private static MqttSubMsgCache instance = new MqttSubMsgCache() ; + + private MqttSubMsgCache(){ + cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount); + } + + public static MqttSubMsgCache getInstance(){ + return instance ; + } + + /** + * 缂撳瓨璁㈤槄鐨勬秷鎭� + * @param node 鏀跺埌鐨勬秷鎭� + * @throws Exception + */ + public static void cacheMsg(MqttSubMsgNode node) throws Exception{ + cacheQueue.pushHead(node); + } + /** + * 寰楀埌绗竴涓妭鐐� + * @return + */ + public static Node getFirstQueueNode(){ + return cacheQueue.getFirstNode() ; + } + + /** + * 寰楀埌鏈�鍚庝竴涓妭鐐� + * @return + */ + public static Node getLastQueueNode(){ + // 璋冪敤cacheQueue鐨刧etLastNode鏂规硶锛岃繑鍥炴渶鍚庝竴涓妭鐐� + return cacheQueue.getLastNode() ; + } + + /** + * 绉婚櫎鑺傜偣 + * @param node + */ + public static void removeNode(Node node){ + cacheQueue.remove(node); + } + + /** + * 缂撳瓨鐨勮妭鐐规暟 + * @Return 缂撳瓨鑺傜偣鏁� + */ + public static Integer size(){ + return cacheQueue.size() ; + } +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgNode.java new file mode 100644 index 0000000..a134219 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgNode.java @@ -0,0 +1,69 @@ +package com.dy.rtuMw.server.mqtt; + +import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; +import com.dy.common.queue.NodeObj; +import com.dy.common.threadPool.ThreadPool; +import com.dy.common.threadPool.TreadPoolFactory; +import com.dy.rtuMw.server.rtuData.TaskPool; +import com.dy.rtuMw.server.rtuData.TaskSurpport; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 16:14 + * @Description + */ +public class MqttSubMsgNode implements NodeObj { + + private static Logger log = LogManager.getLogger(MqttSubMsgNode.class.getName()); + + protected MqttSubMsg msg; + + public MqttSubMsgNode(MqttSubMsg obj){ + this.msg = obj ; + } + + /** + * 鑷繁澶勭悊鑷繁 + * @return + */ + public boolean dealSelf(){ + try { + ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ; + pool.putJob(new ThreadPool.Job() { + public void execute() { + if(msg != null && msg.valid()){ + TaskSurpport task = null ; + try{ + task = TaskPool.popTask() ; + if(task != null){ + task.execute(msg); + }else{ + log.error("鏈緱鍒癕q璁㈤槄娑堟伅澶勭悊浠诲姟锛�"); + } + }catch(Exception e){ + log.error("Mq璁㈤槄娑堟伅浠诲姟姹犲鐞嗘暟鎹椂鍙戠敓寮傚父", e); + }finally { + if(task != null){ + TaskPool.freeAndCleanTask(task); + } + } + } + } + @Override + public void destroy(){ + } + @Override + public boolean isDestroy(){ + return false ; + } + + }); + } catch (Exception e) { + log.error("鍦∕qttSubMsgObj鍐呭彂鐢熷紓甯�", e); + } + return true ; + } + +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnit.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnit.java new file mode 100644 index 0000000..12a22f9 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnit.java @@ -0,0 +1,68 @@ +package com.dy.rtuMw.server.mqtt; + +import com.dy.common.mw.UnitAdapterInterface; +import com.dy.common.mw.UnitCallbackInterface; +import com.dy.common.mw.UnitInterface; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 14:46 + * @Description + */ +public class MqttUnit implements UnitInterface { + + private static MqttUnit instance = new MqttUnit() ; + + public static MqttUnitAdapter adapter ; + public static MqttUnitConfigVo confVo ; + + private static MqttManager manager ; + private MqttUnit(){} ; + + public static MqttUnit getInstance(){ + return instance ; + } + + @Override + public void setAdapter(UnitAdapterInterface adapter) throws Exception { + if(adapter == null){ + throw new Exception("Mqtt娑堟伅妯″潡閫傞厤鍣ㄥ璞′笉鑳戒负绌猴紒") ; + } + MqttUnit.adapter = (MqttUnitAdapter)adapter ; + MqttUnit.confVo = MqttUnit.adapter.getConfig() ; + if(MqttUnit.confVo == null){ + throw new Exception("Mqtt娑堟伅妯″潡閰嶇疆瀵硅薄涓嶈兘涓虹┖锛�") ; + } + } + + /** + * 鍒濆鍖� + */ + @Override + public void start(UnitCallbackInterface callback) throws Exception { + if(confVo.enable){ + manager = MqttManager.getInstance() ; + manager.initOption(confVo); + manager.start(); + callback.call(null) ; + System.out.println("Mqtt娑堟伅妯″潡鎴愬姛鍚姩"); + }else{ + System.out.println("Mqtt娑堟伅妯″潡閰嶇疆涓嶅惎鍔�"); + } + } + + @Override + public void stop(UnitCallbackInterface callback) throws Exception { + if(manager != null){ + manager.stop(); + } + callback.call(null) ; + } + + public void recover(UnitCallbackInterface callback) throws Exception { + this.start(callback) ; + callback.call(null) ; + } + +} + diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitAdapter.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitAdapter.java new file mode 100644 index 0000000..9986c8e --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitAdapter.java @@ -0,0 +1,14 @@ +package com.dy.rtuMw.server.mqtt; + +import com.dy.common.mw.UnitAdapterInterface; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 14:47 + * @Description + */ +public interface MqttUnitAdapter extends UnitAdapterInterface { + + MqttUnitConfigVo getConfig(); + +} \ No newline at end of file diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java new file mode 100644 index 0000000..c767276 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java @@ -0,0 +1,29 @@ +package com.dy.rtuMw.server.mqtt; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 14:46 + * @Description + */ +public class MqttUnitConfigVo { + public Boolean showStartInfo ; + public Boolean enable ;//妯″潡鏄惁鍚姩 + public String svIp ;// + public Integer svPort ;// + public String svUserName ;// + public String svUserPassword ;// + public Integer poolMaxSize ;// + public String[] subTopics ;//璁㈤槄鐨勪富棰� + public int[] topicsQos ;////璁㈤槄涓婚鐨凲os + public int publishQos ;////鍙戝竷娑堟伅鐨凲os + + public MqttUnitConfigVo(){ + this.enable = false ; + this.svIp = "127.0.0.1" ; + this.svPort = 1883 ; + this.svUserName = "dyyjy" ; + this.svUserPassword = "Dyyjy2025,;.abc!@#" ; + this.poolMaxSize = 10 ; + this.publishQos = 1 ; + } +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkMqttData.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkMqttData.java new file mode 100644 index 0000000..918f363 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkMqttData.java @@ -0,0 +1,37 @@ +package com.dy.rtuMw.server.rtuData; + +import com.dy.common.mw.protocol4Mqtt.pSdV1.MqttSubMsgSdV1; +import com.dy.rtuMw.server.rtuData.pSdV1.TkFindPSdV1; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 17:05 + * @Description + */ +public class TkMqttData extends TaskSurpport { + + private static Logger log = LogManager.getLogger(TkMqttData.class.getName()) ; + + //绫籌D锛屼竴瀹氫笌Tree.xml閰嶇疆鏂囦欢涓厤缃竴鑷� + public static final String taskId = "TkMqttData" ; + + /** + * 鎵ц鑺傜偣浠诲姟 + * @param data 闇�瑕佸鐞嗙殑鏁版嵁 + */ + @Override + public void execute(Object data) { + if(data == null){ + log.error("涓ラ噸閿欒锛孧qtt璁㈤槄娑堟伅鏁版嵁涓虹┖锛�" ); + }else{ + if(data instanceof MqttSubMsgSdV1){ + this.toNextOneTask(data, TkFindPSdV1.taskId); + }else{ + log.error("涓ラ噸閿欒锛岃鏁版嵁绫诲瀷锛�" + data.getClass().getName() + "锛夛紝鎺ユ敹鏁版嵁浠诲姟杩樻湭瀹炵幇锛�" ); + } + } + } + +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java index a0d0608..d389f0d 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java @@ -1,5 +1,6 @@ package com.dy.rtuMw.server.rtuData; +import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; import com.dy.common.mw.protocol.Data; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -21,7 +22,9 @@ log.error("涓ラ噸閿欒锛孯TU涓婅鏁版嵁涓虹┖锛�" ); }else{ if(data instanceof Data){ - this.toNextTasks(data); + this.toNextOneTask(data, TkRtuData.taskId); + }else if(data instanceof MqttSubMsg){ + this.toNextOneTask(data, TkMqttData.taskId); }else{ log.error("涓ラ噸閿欒锛岃鏁版嵁绫诲瀷锛�" + data.getClass().getName() + "锛夛紝鎺ユ敹鏁版嵁浠诲姟杩樻湭瀹炵幇锛�" ); } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkFindPSdV1.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkFindPSdV1.java new file mode 100644 index 0000000..eec2554 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkFindPSdV1.java @@ -0,0 +1,31 @@ +package com.dy.rtuMw.server.rtuData.pSdV1; + +import com.dy.common.mw.protocol4Mqtt.pSdV1.MqttSubMsgSdV1; +import com.dy.rtuMw.server.rtuData.TaskSurpport; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * @Author: liurunyu + * @Date: 2025/6/5 13:43 + * @Description + */ +public class TkFindPSdV1 extends TaskSurpport { + + private static Logger log = LogManager.getLogger(TkFindPSdV1.class.getName()) ; + + //绫籌D锛屼竴瀹氫笌Tree.xml閰嶇疆鏂囦欢涓厤缃竴鑷� + public static final String taskId = "TkFindPSdV1" ; + + /** + * 鎵ц鑺傜偣浠诲姟 + * @param data 闇�瑕佸鐞嗙殑鏁版嵁 + */ + @Override + public void execute(Object data) { + //鍓嶉潰鐨勪换鍔″凡缁忓垽鏂簡data涓嶄负绌� + MqttSubMsgSdV1 msg = (MqttSubMsgSdV1)data ; + log.info(msg.toString()); + } + +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttComResultConstantTask.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttComResultConstantTask.java new file mode 100644 index 0000000..1c266c5 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttComResultConstantTask.java @@ -0,0 +1,75 @@ +package com.dy.rtuMw.server.tasks; + +import com.dy.common.mw.core.CoreTask; +import com.dy.common.queue.Node; +import com.dy.rtuMw.server.mqtt.MqttComResultCache; +import com.dy.rtuMw.server.mqtt.MqttComResultNode; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * 瀵筊TU涓婅鏁版嵁杩涜涓氬姟澶勭悊 + */ +public class MqttComResultConstantTask extends CoreTask { + private static final Logger log = LogManager.getLogger(MqttComResultConstantTask.class.getName()); + + /** + * 鍦ㄥ崟绾跨▼鐜涓繍琛� + */ + @Override + public Integer execute() { + try{ + dealRtuComResult() ; + }catch(Exception e){ + log.error(e); + } + return MqttComResultCache.size()>0?0:1 ; + } + /** + * 澶勭悊涓婅鍛戒护缁撴灉 + */ + public void dealRtuComResult() { + Node first = MqttComResultCache.getFirstQueueNode() ; + if(first != null){ + Node last = MqttComResultCache.getLastQueueNode() ; + while (last != null){ + last = this.doDealRtuComResult(first, last); + } + } + } + + /** + * 澶勭悊缂撳瓨鐨勪笂琛屾暟鎹妭鐐� + * @param first 绗竴涓妭鐐� + * @param last 鏈�鍚庝竴涓妭鐐� + */ + private Node doDealRtuComResult(Node first, Node last){ + if(last != null){ + //鍦╠ealNode鏂规硶涓紝鍙兘瑕佹妸last浠庨槦鍒椾腑绉婚櫎锛岃繖鏃秎ast.pre涓虹┖锛屾墍浠ユ彁鍓嶆妸last.pre鍙栧嚭鏉� + Node pre = last.pre ; + dealNode(last) ; + if(first != last){ + return pre ; + }else{ + //鍋滄 + return null ; + } + }else{ + return null ; + } + } + + + /** + * 澶勭悊涓�涓妭鐐� + * @param node 鑺傜偣 + */ + private void dealNode(Node node){ + if(node != null && node.obj != null){ + MqttComResultNode obj = (MqttComResultNode)node.obj ; + obj.dealSelf() ; + MqttComResultCache.removeNode(node); + } + } + +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttPubMessageConstantTask.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttPubMessageConstantTask.java new file mode 100644 index 0000000..703c658 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttPubMessageConstantTask.java @@ -0,0 +1,81 @@ +package com.dy.rtuMw.server.tasks; + +import com.dy.common.mw.core.CoreTask; +import com.dy.common.queue.Node; +import com.dy.rtuMw.server.mqtt.MqttPubMsgCache; +import com.dy.rtuMw.server.mqtt.MqttPubMsgNode; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 16:25 + * @Description + */ +public class MqttPubMessageConstantTask extends CoreTask { + private static final Logger log = LogManager.getLogger(MqttPubMessageConstantTask.class.getName()); + + /** + * 鍦ㄥ崟绾跨▼鐜涓繍琛� + */ + @Override + public Integer execute() { + try{ + dealMqMsg() ; + }catch(Exception e){ + log.error(e); + } + return MqttPubMsgCache.size()>0?0:1 ; + } + /** + * 澶勭悊MQTT璁㈤槄鐨勬秷鎭� + */ + public void dealMqMsg() { + Node first = MqttPubMsgCache.getFirstQueueNode() ; + if(first != null){ + Node last = MqttPubMsgCache.getLastQueueNode() ; + while (last != null){ + last = this.doDealMqMsg(System.currentTimeMillis(), first, last); + } + } + } + + /** + * 澶勭悊缂撳瓨鐨勪笂琛屾暟鎹妭鐐� + * @param now 褰撳墠鏃跺埢 + * @param first 绗竴涓妭鐐� + * @param last 鏈�鍚庝竴涓妭鐐� + */ + private Node doDealMqMsg(Long now, Node first, Node last){ + if(last != null){ + //鍦╠ealNode鏂规硶涓紝鍙兘瑕佹妸last浠庨槦鍒椾腑绉婚櫎锛岃繖鏃秎ast.pre涓虹┖锛屾墍浠ユ彁鍓嶆妸last.pre鍙栧嚭鏉� + Node pre = last.pre ; + dealNode(now, last) ; + if(first != last){ + return pre ; + }else{ + //鍋滄 + return null ; + } + }else{ + return null ; + } + } + + + /** + * 澶勭悊涓�涓妭鐐� + * @param now 鐜板湪鏃跺埢 + * @param node 鑺傜偣 + */ + private void dealNode(Long now, Node node){ + if(node != null && node.obj != null){ + MqttPubMsgNode obj = (MqttPubMsgNode)node.obj ; + boolean removeNode = obj.dealSelf(now) ; + if(removeNode){ + MqttPubMsgCache.removeNode(node); + } + } + } + +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java new file mode 100644 index 0000000..80501d1 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java @@ -0,0 +1,77 @@ +package com.dy.rtuMw.server.tasks; + +import com.dy.common.mw.core.CoreTask; +import com.dy.common.queue.Node; +import com.dy.rtuMw.server.mqtt.MqttSubMsgCache; +import com.dy.rtuMw.server.mqtt.MqttSubMsgNode; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * @Author: liurunyu + * @Date: 2025/6/4 16:25 + * @Description + */ +public class MqttSubMessageConstantTask extends CoreTask { + private static final Logger log = LogManager.getLogger(MqttSubMessageConstantTask.class.getName()); + + /** + * 鍦ㄥ崟绾跨▼鐜涓繍琛� + */ + @Override + public Integer execute() { + try{ + dealMqMsg() ; + }catch(Exception e){ + log.error(e); + } + return MqttSubMsgCache.size()>0?0:1 ; + } + /** + * 澶勭悊MQTT璁㈤槄鐨勬秷鎭� + */ + public void dealMqMsg() { + Node first = MqttSubMsgCache.getFirstQueueNode() ; + if(first != null){ + Node last = MqttSubMsgCache.getLastQueueNode() ; + while (last != null){ + last = this.doDealMqMsg(first, last); + } + } + } + + /** + * 澶勭悊缂撳瓨鐨勪笂琛屾暟鎹妭鐐� + * @param first 绗竴涓妭鐐� + * @param last 鏈�鍚庝竴涓妭鐐� + */ + private Node doDealMqMsg(Node first, Node last){ + if(last != null){ + //鍦╠ealNode鏂规硶涓紝鍙兘瑕佹妸last浠庨槦鍒椾腑绉婚櫎锛岃繖鏃秎ast.pre涓虹┖锛屾墍浠ユ彁鍓嶆妸last.pre鍙栧嚭鏉� + Node pre = last.pre ; + dealNode(last) ; + if(first != last){ + return pre ; + }else{ + //鍋滄 + return null ; + } + }else{ + return null ; + } + } + + + /** + * 澶勭悊涓�涓妭鐐� + * @param node 鑺傜偣 + */ + private void dealNode(Node node){ + if(node != null && node.obj != null){ + MqttSubMsgNode obj = (MqttSubMsgNode)node.obj ; + obj.dealSelf() ; + MqttSubMsgCache.removeNode(node); + } + } + +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/WebDownCom4MqttTask.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/WebDownCom4MqttTask.java new file mode 100644 index 0000000..6c4e8f1 --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/WebDownCom4MqttTask.java @@ -0,0 +1,51 @@ +package com.dy.rtuMw.server.tasks; + +import com.dy.common.mw.core.CoreTask; +import com.dy.common.mw.protocol.Command; +import com.dy.common.mw.protocol.Driver; +import com.dy.common.mw.protocol.MidResult; +import com.dy.common.mw.protocol.ProtocolCache; +import com.dy.common.mw.protocol4Mqtt.MqttMsgParser; +import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; +import com.dy.rtuMw.server.ServerProperties; +import com.dy.rtuMw.server.forTcp.TcpSessionCache; +import com.dy.rtuMw.server.mqtt.MqttPubMsgCache; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * @Author: liurunyu + * @Date: 2025/6/5 15:42 + * @Description + */ + +public class WebDownCom4MqttTask extends CoreTask { + + private static Logger log = LogManager.getLogger(WebDownCom4MqttTask.class.getName()); + + @Override + public Integer execute() { + Command com = (Command)this.data ; + try { + log.info("涓嬪彂MQTT鍛戒护" + com.getCode() + "鐨勬牳蹇冧换鍔″紑濮嬫墽琛�"); + this.deal(com); + } catch (Exception e) { + log.error("澶勭悊涓嬭MQTT鍛戒护鍑洪敊" + (e.getMessage()==null?"!":("锛�" + e.getMessage())) ,e); + } + return null ; + } + + /** + * 澶勭悊鍛戒护 + * @param com 鍛戒护 + * @throws Exception + */ + private void deal(Command com) throws Exception{ + MqttMsgParser parser = new MqttMsgParser() ; + MqttPubMsg pubMsg = parser.createPubMsg(ServerProperties.orgTag, com) ; + if(pubMsg != null){ + MqttPubMsgCache.cacheCommand(pubMsg); + } + } + +} diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java index 5b26ebc..5c13be1 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java @@ -7,7 +7,9 @@ import com.dy.rtuMw.server.forTcp.TcpSessionCache; import com.dy.rtuMw.server.local.CommandInnerDeaLer; import com.dy.rtuMw.server.local.ReturnCommand; +import com.dy.rtuMw.server.mqtt.MqttManager; import com.dy.rtuMw.server.msCenter.MsCenterUnit; +import com.dy.rtuMw.server.tasks.WebDownCom4MqttTask; import com.dy.rtuMw.server.tasks.WebDownComTask; import com.dy.common.mw.core.CoreUnit; import com.dy.common.mw.protocol.Command; @@ -247,6 +249,13 @@ }catch(Exception e){ return BaseResponseUtils.buildError(ReturnCommand.errored("澶勭悊鍙戝悜RTU鐨勫閮ㄩ�忎紶鍛戒护鍑洪敊" + (e.getMessage() == null?"":("锛�" + e.getMessage())), com.getId(), com.getCode()) ); } + }else if(commandType.equals(CommandType.mqttCommand)){ + //鍙戝悜MQTT鐨勫閮ㄥ懡浠� + try{ + return this.dealMqttCommand(com) ; + }catch(Exception e){ + return BaseResponseUtils.buildError(ReturnCommand.errored("澶勭悊鍙戝悜RTU鐨勫閮ㄥ懡浠ゅ嚭閿�" + (e.getMessage() == null?"":("锛�" + e.getMessage())), com.getId(), com.getCode()) ); + } }else if(commandType.equals(CommandType.resultCommand)){ return BaseResponseUtils.buildError(ReturnCommand.errored("鍑洪敊锛岄�氫俊涓棿浠朵笉鎺ョ粨鏋滅被鍨嬬殑鍛戒护锛�", com.getId(), com.getCode())); }else{ @@ -336,4 +345,35 @@ return BaseResponseUtils.buildSuccess(ReturnCommand.successed("閫忎紶鍛戒护宸叉帴鍙楋紝鍗冲皢鏋勯�犲苟涓嬪彂鍛戒护銆�", command.getId(), command.getCode())); } + /** + * 澶勭悊鍙戝悜RTU鐨勫閮ㄥ懡浠� + * @return 缁撴灉 + */ + private BaseResponse<Command> dealMqttCommand(Command command){ + String rtuAddr = command.getRtuAddr() ; + if(rtuAddr == null || rtuAddr.trim().equals("")){ + return BaseResponseUtils.buildError(ReturnCommand.errored("鍑洪敊锛岃澶嘔D涓虹┖锛�", command.getId(), command.getCode())) ; + } + if(!ServerProperties.mqttUnitEnable.booleanValue()){ + return BaseResponseUtils.buildError(ReturnCommand.errored("鍑洪敊锛孧QTT杩炴帴妯″潡閰嶇疆鏈惎鍔紒", command.getId(), command.getCode())) ; + } + if(MqttManager.getInstance().poolIsClose()){ + return BaseResponseUtils.buildError(ReturnCommand.errored("鍑洪敊锛孧QTT杩炴帴姹犳按鍒涘缓鎴愬姛锛�", command.getId(), command.getCode())) ; + } + + //鐢熸垚寮傛浠诲姟 + WebDownCom4MqttTask task = new WebDownCom4MqttTask() ; + task.data = command ; + try{ + log.info("鏋勯�犱笅鍙慚QTT鍛戒护" + command.getCode() + "鐨勬牳蹇冧换鍔★紝骞舵斁鍏ヤ换鍔¢槦鍒椾腑"); + CoreUnit.getInstance().pushCoreTask(task); + }catch(Exception e){ + log.error(e.getMessage(), e); + return BaseResponseUtils.buildError(ReturnCommand.successed("MQTT鍛戒护澶勭悊澶辫触" + e.getMessage(), command.getId(), command.getCode())) ; + } + return BaseResponseUtils.buildSuccess(ReturnCommand.successed("MQTT鍛戒护宸叉帴鍙楋紝鍗冲皢鏋勯�犲苟涓嬪彂鍛戒护銆�", command.getId(), command.getCode())); + } + + + } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java index 2973292..658e68e 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java @@ -3,6 +3,7 @@ import com.dy.common.contant.Constant; import com.dy.common.mw.protocol.Command; import com.dy.common.mw.protocol.Data; +import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; import com.dy.rtuMw.server.ServerProperties; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -33,6 +34,10 @@ this.restTemplate = restTemplate ; } + /** + * RTU璁惧鏁版嵁 + * @param data + */ public void deal(Data data) { if (data.rtuResultSendWebUrl != null && !data.rtuResultSendWebUrl.trim().equals("") @@ -58,4 +63,34 @@ log.error("涓ラ噸閿欒锛屽湪com.dy.aceMw.web.comResult.CommandResultDeal閲岋紝澶勭悊鐨勬槸RTU鍛戒护缁撴灉Node锛屼絾鏁版嵁涓璻tuResultSendWebUrl涓虹┖"); } } + + /** + * Mqtt娑堟伅鏁版嵁 + * @param subMsg + */ + public void deal(MqttSubMsg subMsg) { + if (subMsg.mqttResultSendWebUrl != null + && !subMsg.mqttResultSendWebUrl.trim().equals("") + && !subMsg.mqttResultSendWebUrl.trim().equals(Command.ignoreRtuResultSendWebUrl)) { + String url = UriComponentsBuilder.fromUriString(subMsg.mqttResultSendWebUrl) + .build() + .toUriString(); + restTemplate.getMessageConverters().set(1,new StringHttpMessageConverter(StandardCharsets.UTF_8)); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.parseMediaType("application/json;charset=UTF-8")); + headers.set(Constant.UserTokenKeyInHeader, ServerProperties.orgTag); + HttpEntity<?> httpEntity = new HttpEntity<>(subMsg, headers); + ResponseEntity<WebResponseVo> response = null; + try { + // 閫氳繃Post鏂瑰紡璋冪敤鎺ュ彛 + response = restTemplate.exchange(url, HttpMethod.POST, httpEntity, WebResponseVo.class); + } catch (Exception e) { + log.error("鍛戒护缁撴灉鍥炶皟鍙戠敓寮傚父", e); + e.printStackTrace(); + } + //assert response != null; + } else { + log.error("涓ラ噸閿欒锛屽湪com.dy.aceMw.web.comResult.CommandResultDeal閲岋紝澶勭悊鐨勬槸RTU鍛戒护缁撴灉Node锛屼絾鏁版嵁涓璻tuResultSendWebUrl涓虹┖"); + } + } } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml index 8c52ed4..67e19f9 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml @@ -92,5 +92,9 @@ </task> </task> </task> + <!-- Mqtt娑堟伅涓棿浠惰闃呯殑娑堟伅 --> + <task id="TkMqttData" name="鎺ユ敹Mqtt娑堟伅" enable="true" class="com.dy.rtuMw.server.rtuData.TkMqttData"> + <task id="TkFindPSdV1" name="璇嗗埆灞变笢V1鏁版嵁" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkFindPSdV1"> + </task> </task> </project> \ No newline at end of file diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties index bb7f709..ec1f674 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties @@ -29,3 +29,19 @@ #RTU涓婅鏁版嵁鏈�灏忛棿闅旓紝澶т簬杩欎釜闂撮殧璁や负璁惧绂荤嚎浜嗭紝娴嬫帶涓�浣撻榾鏄�3锛岃〃闃�涓�浣撴満鏄�6锛岄粯璁ら噰鐢ㄦ椂闂存渶闀跨殑6 base.upData.min.interval=6 +# MQTT鏈嶅姟閰嶇疆 +# 233鏈嶅姟鍣細 +# 鍏冭皨锛� mqtt.enable=false +# 娌欑洏锛� mqtt.enable=false +# 娴嬭瘯锛� mqtt.enable=false +# 姊呮睙锛� mqtt.enable=false +# 121鏈嶅姟鍣細 +# 姘戝嫟锛� mqtt.enable=true +# 寤跺簡锛� mqtt.enable=false +# 榛戦緳姹燂細 mqtt.enable=false +# 鐢樺窞锛� mqtt.enable=false +# 鍑夊窞锛� mqtt.enable=false +# 閲戝窛锛� mqtt.enable=true +mqtt.enable=true +mqtt.topicAndQos=ym/sd1/10000/control/m1,1;ym/sd1/10000/control/m2,1;ym/sd1/control/m4,1;ym/sd1/10000/control/m80,1 + diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml index 631ef66..5f0b6b9 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml @@ -164,4 +164,22 @@ idle="10" /> + + <!-- + topicAndQos: 涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細ym/topic1,1;ym/topic2,1;ym/topic3,1锛屽鏋滄湁澶氫釜OrgTag锛屼富棰樺墠缂�鐢ㄥ叾OrgTag + publishQos: 鍙戝竷娑堟伅鐨凲os锛屽彇鍊艰寖鍥达細 + 0 鑷冲涓�娆★紙At most once锛� 娑堟伅鍙戦�佸悗涓嶄繚璇佸埌杈撅紝鍙兘涓㈠け鎴栭噸澶嶏紝寮�閿�鏈�灏忥紝鍙潬鎬ф渶浣庛�� + 1 鑷冲皯涓�娆★紙At least once锛� 娑堟伅鑷冲皯浼氬埌杈句竴娆★紝鍙兘閲嶅锛屼絾涓嶄細涓㈠け锛屽彲闈犳�т腑绛夛紝閫傜敤浜庡鏁板満鏅�� + 2 鎭板ソ涓�娆★紙Exactly once锛� 娑堟伅浠呬細鍒拌揪涓�娆★紝涓嶉噸澶嶄笖涓嶄涪澶憋紝鍙潬鎬ф渶楂橈紝浣嗗紑閿�鏈�澶э紝瀹炵幇鏈�澶嶆潅銆� + --> + <mqtt enable="${mqtt.enable}" + svIp="121.199.41.121" + svPort="1883" + svUserName="dyyjy" + svUserPassword="Dyyjy2025,;.abc!@#" + poolMaxSize="10" + topicAndQos="${mqtt.topicAndQos}" + publishQos="1" + /> + </config> \ No newline at end of file -- Gitblit v1.8.0