From 1135c89deb50a080152f9086fc7b741c415ecd54 Mon Sep 17 00:00:00 2001 From: liurunyu <lry9898@163.com> Date: 星期三, 12 二月 2025 17:00:14 +0800 Subject: [PATCH] 通信中间件增加功能: 1、实现消息心中; 2、开阀报、关阀报、报警数据都会在消息中间件存入消息; 3、在消息中心注册消息接收者,消息中心周期性向消息接收者推送消息。 --- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/TcpSessionCache.java | 100 +++++++------------------------------------------ 1 files changed, 15 insertions(+), 85 deletions(-) diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/TcpSessionCache.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/TcpSessionCache.java index ca59202..5d92f5a 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/TcpSessionCache.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/TcpSessionCache.java @@ -3,6 +3,7 @@ import java.util.*; import java.util.Map.Entry; +import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr; import org.apache.mina.core.session.IoSession; import com.dy.rtuMw.server.ServerProperties; @@ -60,16 +61,16 @@ /** * 鍔犲叆鏂扮殑IoSession * @param rtuAddr - * @param protocolName * @param ioSession */ - public static void putNewTcpSession(String rtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){ + //public static void putNewTcpSession(String rtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){ + public static void putNewTcpSession(String rtuAddr, IoSession ioSession){ synchronized (map){ TcpSession tcpSe = map.get(rtuAddr) ; if(tcpSe == null){ tcpSe = new TcpSession() ; - tcpSe.protocolName = protocolName ; - tcpSe.protocolVersion = protocolVersion ; + //tcpSe.protocolName = protocolName ; + //tcpSe.protocolVersion = protocolVersion ; tcpSe.ioSession = ioSession ; map.put(rtuAddr, tcpSe) ; }else{ @@ -82,16 +83,15 @@ * 鏇存柊IoSession瀵瑰簲鐨剅tuAddr * @param oldRtuAddr * @param newRtuAddr - * @param protocolName - * @param protocolVersion * @param ioSession */ - public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){ + //public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){ + public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, IoSession ioSession){ if(oldRtuAddr != null && newRtuAddr != null && !oldRtuAddr.equals(newRtuAddr)){ synchronized (map){ TcpSession tcpSe = map.get(oldRtuAddr) ; if(tcpSe == null){ - putNewTcpSession(newRtuAddr, protocolName, protocolVersion, ioSession) ; + putNewTcpSession(newRtuAddr, ioSession) ; }else{ map.remove(oldRtuAddr) ; map.put(newRtuAddr, tcpSe) ; @@ -113,33 +113,22 @@ * 寰楀埌Tcp閫氫俊鍗忚鍚嶇О * @param rtuAddr * @return - - public static String getTcpProtocolName(String rtuAddr){ - TcpSession tcpSe = sessionTable.get(rtuAddr) ; - if(tcpSe != null){ - return tcpSe.protocolName ; - }else{ - return null ; - } - } - */ - /** - * 寰楀埌Tcp閫氫俊鍗忚鍚嶇О - * @param rtuAddr - * @return - */ + * */ public static Object[] getTcpProtocolNameVersion(String rtuAddr){ TcpSession tcpSe = map.get(rtuAddr) ; if(tcpSe != null){ - return new Object[]{tcpSe.protocolName, tcpSe.protocolVersion}; + return new Object[]{ + tcpSe.ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName), + tcpSe.ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion)}; }else{ return null ; } } + /** * 寰楀埌鎵�鏈夊湪绾夸笌绂荤嚎鏁伴噺缁熻 - * @return [0]=鍦ㄧ嚎鏁伴噺锛孾2]涓婄嚎杩囷紝浣嗗綋鍓嶇绾跨殑鏁伴噺 + * @return [0]=鍦ㄧ嚎鏁伴噺锛孾1]涓婄嚎杩囷紝浣嗗綋鍓嶇绾跨殑鏁伴噺 */ public static Integer[] allOnLineStateStatistics(){ synchronized (map){ @@ -192,50 +181,7 @@ return map ; } } -// -// /** -// * 寰楀埌鎵�鏈塕TU杩炴帴鐘舵�佹儏鍐� -// * @return -// */ -// public static List<RtuSessionStatus> allConnectStatus(){ -// synchronized (sessionTable){ -// List<RtuSessionStatus> list = new ArrayList<RtuSessionStatus>(); -// Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ; -// Entry<String, TcpSession> entry = null ; -// while(it.hasNext()){ -// entry = it.next() ; -// RtuSessionStatus vo = new RtuSessionStatus() ; -// vo.rtuAddr = entry.getKey() ; -// IoSession se = entry.getValue().ioSession ; -// vo.onTrueOffLine = se.isConnected() ; -// InetSocketAddress sa = (InetSocketAddress)se.getRemoteAddress() ; -// if(sa != null){ -// InetAddress inetAddr = sa.getAddress() ; -// if(inetAddr != null){ -// vo.ip = inetAddr.getHostAddress() ; -// vo.port = sa.getPort() ; -// } -// } -// list.add(vo) ; -// } -// return list ; -// } -// } -// - - /** - * 寰楀埌IoSession - * @param rtuAddr - * @return - */ -// public IoSession getIoSession(String rtuAddr){ -// TcpSession tcpSe = sessionMap.get(rtuAddr) ; -// if(tcpSe != null){ -// return tcpSe.ioSession ; -// } -// return null ; -// } - + /** * 缃戠粶鏄惁杩炴帴 * @param rtuAddr @@ -310,21 +256,5 @@ } } } - -// public static void updateRtuStatus(Long now){ -// synchronized (sessionTable){ -// Iterator<TcpSession> it = sessionTable.values().iterator() ; -// TcpSession tcpSe ; -// while(it.hasNext()){ -// tcpSe = it.next() ; -// if(tcpSe.lastUpDataTime != null){ -// if(now - tcpSe.lastUpDataTime > ServerProperties.lastUpDataTimeLive){ -// tcpSe.lastUpDataTime = null ; -// } -// } -// } -// } -// } - } -- Gitblit v1.8.0