|  |  |  | 
|---|
|  |  |  | package com.dy.rtuMw.server.forTcp; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import java.net.InetAddress; | 
|---|
|  |  |  | import java.net.InetSocketAddress; | 
|---|
|  |  |  | import java.util.*; | 
|---|
|  |  |  | import java.util.Map.Entry; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.dy.common.util.DateTime; | 
|---|
|  |  |  | import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr; | 
|---|
|  |  |  | import org.apache.mina.core.session.IoSession; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.dy.rtuMw.server.ServerProperties; | 
|---|
|  |  |  | 
|---|
|  |  |  | * | 
|---|
|  |  |  | * 2023-12-19实测,发现Hashtable并不线程安全,所以应用了HashMap和synchronized | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | private static HashMap<String, TcpSession> sessionTable = new HashMap<String, TcpSession>() ; | 
|---|
|  |  |  | private static HashMap<String, TcpSession> map = new HashMap<String, TcpSession>() ; | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 得到信息 | 
|---|
|  |  |  | * @return | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static Integer[] info(){ | 
|---|
|  |  |  | Integer rtuTotalConnect = 0 ;//已经连接过中间件的RTU总数(包括在线与离线的) | 
|---|
|  |  |  | Integer rtuTotalOnLine = 0 ;//在线RTU总数 | 
|---|
|  |  |  | Integer rtuTotalOffLine = 0 ;//离线RTU总数 | 
|---|
|  |  |  | synchronized (map){ | 
|---|
|  |  |  | rtuTotalConnect = map.size() ; | 
|---|
|  |  |  | Collection<TcpSession> col = map.values() ; | 
|---|
|  |  |  | for(TcpSession se : col){ | 
|---|
|  |  |  | if(se.ioSession.isConnected()){ | 
|---|
|  |  |  | rtuTotalOnLine ++ ; | 
|---|
|  |  |  | }else{ | 
|---|
|  |  |  | rtuTotalOffLine ++ ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | return new Integer[] {rtuTotalConnect, rtuTotalOnLine, rtuTotalOffLine} ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 关闭所有网络连接 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static void closeAllSessions(){ | 
|---|
|  |  |  | synchronized (sessionTable){ | 
|---|
|  |  |  | Collection<TcpSession> col = sessionTable.values() ; | 
|---|
|  |  |  | synchronized (map){ | 
|---|
|  |  |  | Collection<TcpSession> col = map.values() ; | 
|---|
|  |  |  | for(TcpSession se : col){ | 
|---|
|  |  |  | se.ioSession.closeNow() ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | sessionTable.clear(); | 
|---|
|  |  |  | map.clear(); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 加入新的IoSession | 
|---|
|  |  |  | * @param rtuAddr | 
|---|
|  |  |  | * @param protocolName | 
|---|
|  |  |  | * @param ioSession | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static void putNewTcpSession(String rtuAddr, String protocolName, IoSession ioSession){ | 
|---|
|  |  |  | synchronized (sessionTable){ | 
|---|
|  |  |  | TcpSession tcpSe = sessionTable.get(rtuAddr) ; | 
|---|
|  |  |  | //public static void putNewTcpSession(String rtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){ | 
|---|
|  |  |  | public static void putNewTcpSession(String rtuAddr, IoSession ioSession){ | 
|---|
|  |  |  | synchronized (map){ | 
|---|
|  |  |  | TcpSession tcpSe = map.get(rtuAddr) ; | 
|---|
|  |  |  | if(tcpSe == null){ | 
|---|
|  |  |  | tcpSe = new TcpSession() ; | 
|---|
|  |  |  | tcpSe.protocolName = protocolName ; | 
|---|
|  |  |  | //tcpSe.protocolName = protocolName ; | 
|---|
|  |  |  | //tcpSe.protocolVersion = protocolVersion ; | 
|---|
|  |  |  | tcpSe.ioSession = ioSession ; | 
|---|
|  |  |  | sessionTable.put(rtuAddr, tcpSe) ; | 
|---|
|  |  |  | map.put(rtuAddr, tcpSe) ; | 
|---|
|  |  |  | }else{ | 
|---|
|  |  |  | tcpSe.ioSession = ioSession ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | 
|---|
|  |  |  | * 更新IoSession对应的rtuAddr | 
|---|
|  |  |  | * @param oldRtuAddr | 
|---|
|  |  |  | * @param newRtuAddr | 
|---|
|  |  |  | * @param protocolName | 
|---|
|  |  |  | * @param ioSession | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, String protocolName, IoSession ioSession){ | 
|---|
|  |  |  | //public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){ | 
|---|
|  |  |  | public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, IoSession ioSession){ | 
|---|
|  |  |  | if(oldRtuAddr != null && newRtuAddr != null && !oldRtuAddr.equals(newRtuAddr)){ | 
|---|
|  |  |  | synchronized (sessionTable){ | 
|---|
|  |  |  | TcpSession tcpSe = sessionTable.get(oldRtuAddr) ; | 
|---|
|  |  |  | synchronized (map){ | 
|---|
|  |  |  | TcpSession tcpSe = map.get(oldRtuAddr) ; | 
|---|
|  |  |  | if(tcpSe == null){ | 
|---|
|  |  |  | putNewTcpSession(newRtuAddr, protocolName, ioSession) ; | 
|---|
|  |  |  | putNewTcpSession(newRtuAddr, ioSession) ; | 
|---|
|  |  |  | }else{ | 
|---|
|  |  |  | sessionTable.remove(oldRtuAddr) ; | 
|---|
|  |  |  | sessionTable.put(newRtuAddr, tcpSe) ; | 
|---|
|  |  |  | map.remove(oldRtuAddr) ; | 
|---|
|  |  |  | map.put(newRtuAddr, tcpSe) ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | 
|---|
|  |  |  | * @return | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static TcpSession getTcpSession(String rtuAddr){ | 
|---|
|  |  |  | return sessionTable.get(rtuAddr) ; | 
|---|
|  |  |  | return map.get(rtuAddr) ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 得到Tcp通信协议名称 | 
|---|
|  |  |  | * @param rtuAddr | 
|---|
|  |  |  | * @return | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static String getTcpProtocolName(String rtuAddr){ | 
|---|
|  |  |  | TcpSession tcpSe = sessionTable.get(rtuAddr) ; | 
|---|
|  |  |  | if(tcpSe != null){ | 
|---|
|  |  |  | 
|---|
|  |  |  | return null ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 得到Tcp通信协议名称 | 
|---|
|  |  |  | * @param rtuAddr | 
|---|
|  |  |  | * @return | 
|---|
|  |  |  | * */ | 
|---|
|  |  |  | public static Object[] getTcpProtocolNameVersion(String rtuAddr){ | 
|---|
|  |  |  | TcpSession tcpSe = map.get(rtuAddr) ; | 
|---|
|  |  |  | if(tcpSe != null){ | 
|---|
|  |  |  | return new Object[]{ | 
|---|
|  |  |  | tcpSe.ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName), | 
|---|
|  |  |  | tcpSe.ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion)}; | 
|---|
|  |  |  | }else{ | 
|---|
|  |  |  | return null ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 得到所有在线与离线数量统计 | 
|---|
|  |  |  | * @return [0]=在线数量,[2]上线过,但当前离线的数量 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static Integer[] allOnLineStateStatistics(){ | 
|---|
|  |  |  | synchronized (map){ | 
|---|
|  |  |  | Integer[] arr = new Integer[]{0, 0} ; | 
|---|
|  |  |  | Iterator<Entry<String, TcpSession>> it = map.entrySet().iterator() ; | 
|---|
|  |  |  | Entry<String, TcpSession> entry = null ; | 
|---|
|  |  |  | while(it.hasNext()){ | 
|---|
|  |  |  | entry = it.next() ; | 
|---|
|  |  |  | if(entry.getValue().ioSession.isConnected()){ | 
|---|
|  |  |  | arr[0]++ ; | 
|---|
|  |  |  | }else{ | 
|---|
|  |  |  | arr[1]++ ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | return arr ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 得到所有在线情况 | 
|---|
|  |  |  | * @return | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static HashMap<String, Boolean> allOnLine(){ | 
|---|
|  |  |  | synchronized (sessionTable){ | 
|---|
|  |  |  | synchronized (map){ | 
|---|
|  |  |  | HashMap<String, Boolean> map = new HashMap<String, Boolean>(); | 
|---|
|  |  |  | Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ; | 
|---|
|  |  |  | Iterator<Entry<String, TcpSession>> it = TcpSessionCache.map.entrySet().iterator() ; | 
|---|
|  |  |  | Entry<String, TcpSession> entry = null ; | 
|---|
|  |  |  | while(it.hasNext()){ | 
|---|
|  |  |  | entry = it.next() ; | 
|---|
|  |  |  | 
|---|
|  |  |  | return map ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 得到所有RTU连接状态情况 | 
|---|
|  |  |  | * 得到部分在线情况 | 
|---|
|  |  |  | * @return | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static List<RtuSessionStatus> allConnectStatus(){ | 
|---|
|  |  |  | synchronized (sessionTable){ | 
|---|
|  |  |  | List<RtuSessionStatus> list = new ArrayList<RtuSessionStatus>(); | 
|---|
|  |  |  | Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ; | 
|---|
|  |  |  | Entry<String, TcpSession> entry = null ; | 
|---|
|  |  |  | while(it.hasNext()){ | 
|---|
|  |  |  | entry = it.next() ; | 
|---|
|  |  |  | RtuSessionStatus vo = new RtuSessionStatus() ; | 
|---|
|  |  |  | vo.rtuAddr = entry.getKey() ; | 
|---|
|  |  |  | IoSession se = entry.getValue().ioSession ; | 
|---|
|  |  |  | vo.onTrueOffLine = se.isConnected() ; | 
|---|
|  |  |  | InetSocketAddress sa = (InetSocketAddress)se.getRemoteAddress() ; | 
|---|
|  |  |  | if(sa != null){ | 
|---|
|  |  |  | InetAddress inetAddr = sa.getAddress() ; | 
|---|
|  |  |  | if(inetAddr != null){ | 
|---|
|  |  |  | vo.ip = inetAddr.getHostAddress() ; | 
|---|
|  |  |  | vo.port = sa.getPort() ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | public static HashMap<String, Boolean> partOnLine(String[] rtuAddrArrGrp){ | 
|---|
|  |  |  | synchronized (map){ | 
|---|
|  |  |  | HashMap<String, Boolean> map = new HashMap<String, Boolean>(); | 
|---|
|  |  |  | for(String rtuAddr : rtuAddrArrGrp){ | 
|---|
|  |  |  | TcpSession tcpSe = TcpSessionCache.map.get(rtuAddr) ; | 
|---|
|  |  |  | if(tcpSe != null){ | 
|---|
|  |  |  | map.put(rtuAddr, tcpSe.ioSession.isConnected()) ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | list.add(vo) ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | return list ; | 
|---|
|  |  |  | return map ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //   /** | 
|---|
|  |  |  | //    * 得到所有RTU连接状态情况 | 
|---|
|  |  |  | //    * @return | 
|---|
|  |  |  | //    */ | 
|---|
|  |  |  | //   public static List<RtuSessionStatus> allConnectStatus(){ | 
|---|
|  |  |  | //      synchronized (sessionTable){ | 
|---|
|  |  |  | //         List<RtuSessionStatus> list = new ArrayList<RtuSessionStatus>(); | 
|---|
|  |  |  | //         Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ; | 
|---|
|  |  |  | //         Entry<String, TcpSession> entry = null ; | 
|---|
|  |  |  | //         while(it.hasNext()){ | 
|---|
|  |  |  | //            entry = it.next() ; | 
|---|
|  |  |  | //            RtuSessionStatus vo = new RtuSessionStatus() ; | 
|---|
|  |  |  | //            vo.rtuAddr = entry.getKey() ; | 
|---|
|  |  |  | //            IoSession se = entry.getValue().ioSession ; | 
|---|
|  |  |  | //            vo.onTrueOffLine = se.isConnected() ; | 
|---|
|  |  |  | //            InetSocketAddress sa = (InetSocketAddress)se.getRemoteAddress() ; | 
|---|
|  |  |  | //            if(sa != null){ | 
|---|
|  |  |  | //               InetAddress inetAddr = sa.getAddress() ; | 
|---|
|  |  |  | //               if(inetAddr != null){ | 
|---|
|  |  |  | //                  vo.ip = inetAddr.getHostAddress() ; | 
|---|
|  |  |  | //                  vo.port = sa.getPort() ; | 
|---|
|  |  |  | //               } | 
|---|
|  |  |  | //            } | 
|---|
|  |  |  | //            list.add(vo) ; | 
|---|
|  |  |  | //         } | 
|---|
|  |  |  | //         return list ; | 
|---|
|  |  |  | //      } | 
|---|
|  |  |  | //   } | 
|---|
|  |  |  | // | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 得到IoSession | 
|---|
|  |  |  | 
|---|
|  |  |  | * @return | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static Boolean isConnect(String rtuAddr){ | 
|---|
|  |  |  | TcpSession tcpSe = sessionTable.get(rtuAddr) ; | 
|---|
|  |  |  | TcpSession tcpSe = map.get(rtuAddr) ; | 
|---|
|  |  |  | if(tcpSe != null){ | 
|---|
|  |  |  | return tcpSe.ioSession.isConnected() ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | 
|---|
|  |  |  | * @throws Exception | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static void write(String rtuAddr, byte[] data) throws Exception{ | 
|---|
|  |  |  | TcpSession tcpSe = sessionTable.get(rtuAddr) ; | 
|---|
|  |  |  | TcpSession tcpSe = map.get(rtuAddr) ; | 
|---|
|  |  |  | if(tcpSe != null){ | 
|---|
|  |  |  | if(tcpSe.ioSession.isConnected()){ | 
|---|
|  |  |  | tcpSe.ioSession.write(data) ; | 
|---|
|  |  |  | 
|---|
|  |  |  | * @param rtuAddr | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static void cacheUpDataTime(String rtuAddr){ | 
|---|
|  |  |  | TcpSession tcpSe = sessionTable.get(rtuAddr) ; | 
|---|
|  |  |  | TcpSession tcpSe = map.get(rtuAddr) ; | 
|---|
|  |  |  | if(tcpSe != null){ | 
|---|
|  |  |  | tcpSe.lastUpDataTime = System.currentTimeMillis() ; | 
|---|
|  |  |  | tcpSe.lastUpDataTimeForOnlineCtrl = System.currentTimeMillis() ; | 
|---|
|  |  |  | 
|---|
|  |  |  | * 当一定时间内没有上行数据,则认为RTU离线 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static void updateRtuStatus(Long now){ | 
|---|
|  |  |  | synchronized (sessionTable){ | 
|---|
|  |  |  | Set<Map.Entry<String, TcpSession>> entrySet = sessionTable.entrySet() ; | 
|---|
|  |  |  | synchronized (map){ | 
|---|
|  |  |  | Set<Map.Entry<String, TcpSession>> entrySet = map.entrySet() ; | 
|---|
|  |  |  | Iterator<Map.Entry<String, TcpSession>> it = entrySet.iterator() ; | 
|---|
|  |  |  | Map.Entry<String, TcpSession> entry ; | 
|---|
|  |  |  | TcpSession tcpSe ; | 
|---|
|  |  |  | 
|---|
|  |  |  | if(tcpSe.ioSession != null && tcpSe.ioSession.isConnected()){ | 
|---|
|  |  |  | if(now - tcpSe.lastUpDataTimeForOnlineCtrl > ServerProperties.disconnectedByNoUpDataMinutes){ | 
|---|
|  |  |  | tcpSe.ioSession.closeNow() ; | 
|---|
|  |  |  | RtuLogDealer.log(entry.getKey(), "因较长时间未收上行数据,认为设备离线"); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|