From 1135c89deb50a080152f9086fc7b741c415ecd54 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期三, 12 二月 2025 17:00:14 +0800
Subject: [PATCH] 通信中间件增加功能: 1、实现消息心中; 2、开阀报、关阀报、报警数据都会在消息中间件存入消息; 3、在消息中心注册消息接收者,消息中心周期性向消息接收者推送消息。
---
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/tcp/DataDecoder.java | 27 +++++++++++++++++----------
1 files changed, 17 insertions(+), 10 deletions(-)
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/tcp/DataDecoder.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/tcp/DataDecoder.java
index 28720a5..3508fcd 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/tcp/DataDecoder.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/tcp/DataDecoder.java
@@ -27,16 +27,17 @@
* org.apache.mina.core.buffer.IoBuffer,
* org.apache.mina.filter.codec.ProtocolDecoderOutput)
*/
+ @Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
- String meterNo = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey) ;
- if(meterNo == null){
+ String rtuAddr = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr) ;
+ if(rtuAddr == null){
//浼氳瘽鏈绠$悊
//浼氳瘽绠$悊鍣ㄤ腑涓嶅瓨鍦ㄦ浼氳瘽锛岃鏄庡垰寤虹珛缃戠粶杩炴帴锛�
return this.doDecode_onLineData(session, in, out) ;
}else{
//浼氳瘽宸茶绠$悊
//浼氳瘽绠$悊鍣ㄤ腑瀛樺湪姝や細璇濓紝璇存槑宸茬粡涓婄嚎浜嗭紝
- return this.doDecode_data(session, in, out, meterNo) ;
+ return this.doDecode_data(session, in, out, rtuAddr) ;
}
}
@@ -52,7 +53,13 @@
//鏈変竴浜涘崗璁紝璐熻矗鍙戜笂绾挎暟鎹紝浣嗕笂绾�(鎴栧績璺�)鏁版嵁涓棤 Rtu鍦板潃锛屾墍浠ヨ繖鏍风殑鏁版嵁鍙兘鏀捐锛岃�屼笖涓嶈兘浜х敓鏈夎韩浠�(Rtu鍦板潃)鐨勭綉缁滀細璇濓紝鍗充笉鑳芥妸session鏀惧叆浼氳瘽缂撳瓨涓�
//鏈変竴浜涘崗璁紝Rtu璐熻矗鍙戜笂绾挎暟鎹紝涓婄嚎鏁版嵁涓湁 Rtu鍦板潃锛屾墍浠ヨ繖鏍风殑鏁版嵁鏀捐鍚庯紝鑳戒骇鐢熸湁韬唤(Rtu鍦板潃)鐨勭綉缁滀細璇濓紝鍗宠兘鎶妔ession鏀惧叆浼氳瘽缂撳瓨涓�
//涓嶈浣曠鎯呭舰锛屼笂绾挎暟鎹殑鏁版嵁閲忎笉浼氬緢澶э紝涓�鑸笉浼氫骇鐢熸柇鍖咃紝鎵�浠ヨ繖閲屽彧杩涜绠�鍗曟柇鍖呮鏌ャ��
- PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forOnLine(in) ;
+ PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forOnLine(session, in) ;
+
+ if(dataStatus.protocolName != null && dataStatus.protocolVersion != null){
+ session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, dataStatus.protocolName) ;
+ session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, dataStatus.protocolVersion) ;
+ }
+
if(dataStatus.isCompleted() || dataStatus.isAdjoined()){
//姝eソ鎴栫矘鍖�
this.nextDeal(in, dataStatus.getDataLen(), out) ;
@@ -73,16 +80,16 @@
* @param session IO浼氳瘽
* @param in 杈撳叆Buffer
* @param out 鍗忚杈撳嚭缂栫爜
- * @param meterNo 鎺у埗鍣ㄧ紪鍙�
+ * @param rtuAddr 鎺у埗鍣ㄥ湴鍧�
* @return 鏄惁姝eソ鎴栫矘鍖�
*/
@SuppressWarnings("unused")
- private boolean doDecode_data(IoSession session, IoBuffer in, ProtocolDecoderOutput out, String meterNo) {
+ private boolean doDecode_data(IoSession session, IoBuffer in, ProtocolDecoderOutput out, String rtuAddr) {
//闈炰笂绾挎暟鎹紝鍙兘浼氬嚭鐜版柇鍖呮垨绮樺寘鐜拌薄
- PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forUpData(in) ;
+ PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forUpData(session, in) ;
if(dataStatus == null){
//涓嶅彲鑳藉彂鐢�
- log.error("涓ラ噸閿欒锛孯tu (姘磋〃鍙蜂负" + meterNo + ")涓婅鏁版嵁瀹屾暣鎬ф鏌ユ椂锛岃繑鍥炵殑瀵硅薄涓虹┖銆�") ;
+ log.error("涓ラ噸閿欒锛孯tu (RTU" + rtuAddr + ")涓婅鏁版嵁瀹屾暣鎬ф鏌ユ椂锛岃繑鍥炵殑瀵硅薄涓虹┖銆�") ;
this.nextDeal(in, null, out) ;
return true;
}else{
@@ -90,11 +97,11 @@
//鏂寘浜�
return false ;
}else if(dataStatus.isCompleted() || dataStatus.isAdjoined()){
- //鏈寘鏁版嵁宸茬粡鍏ㄩ儴鎺ユ敹锛屽苟涓斿彲鑳界矘鏈変笅鍖呮暟鎹�
+ //鏈寘鏁版嵁宸茬粡鍏ㄩ儴鎺ユ敹锛屾垨鍙兘绮樻湁涓嬪寘鏁版嵁
this.nextDeal(in, dataStatus.getDataLen(), out) ;
if(dataStatus.isAdjoined()){
//璇存槑绮樺寘浜嗭紝杩樻湁鏁版嵁锛岄渶瑕佸杩欎簺鏁版嵁鍐嶆鎵цdoDecode_鏂规硶.
- return this.doDecode_data(session, in, out, meterNo) ;//鍔犱笂閫掑綊
+ return this.doDecode_data(session, in, out, rtuAddr) ;//鍔犱笂閫掑綊
}else if(dataStatus.isCompleted()){
//鏁版嵁涓嶆柇涓嶇矘
return true;
--
Gitblit v1.8.0