package com.dy.rtuMw.server.forTcp; import java.util.*; import java.util.Map.Entry; import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr; import org.apache.mina.core.session.IoSession; import com.dy.rtuMw.server.ServerProperties; public class TcpSessionCache { /** * 用Hashtable而不用HashMap原因: * Hashtable线程安全的 * HashMap线程不安全的 * 多线程对sessionTable读出或存入,可能产生异常 * TcpSessionCache是在多线程环境下运行 * * 2023-12-19实测,发现Hashtable并不线程安全,所以应用了HashMap和synchronized */ private static HashMap map = new HashMap() ; /** * 得到信息 * @return */ public static Integer[] info(){ Integer rtuTotalConnect = 0 ;//已经连接过中间件的RTU总数(包括在线与离线的) Integer rtuTotalOnLine = 0 ;//在线RTU总数 Integer rtuTotalOffLine = 0 ;//离线RTU总数 synchronized (map){ rtuTotalConnect = map.size() ; Collection col = map.values() ; for(TcpSession se : col){ if(se.ioSession.isConnected()){ rtuTotalOnLine ++ ; }else{ rtuTotalOffLine ++ ; } } } return new Integer[] {rtuTotalConnect, rtuTotalOnLine, rtuTotalOffLine} ; } /** * 关闭所有网络连接 */ public static void closeAllSessions(){ synchronized (map){ Collection col = map.values() ; for(TcpSession se : col){ se.ioSession.closeNow() ; } map.clear(); } } /** * 加入新的IoSession * @param rtuAddr * @param ioSession */ //public static void putNewTcpSession(String rtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){ public static void putNewTcpSession(String rtuAddr, IoSession ioSession){ synchronized (map){ TcpSession tcpSe = map.get(rtuAddr) ; if(tcpSe == null){ tcpSe = new TcpSession() ; //tcpSe.protocolName = protocolName ; //tcpSe.protocolVersion = protocolVersion ; tcpSe.ioSession = ioSession ; map.put(rtuAddr, tcpSe) ; }else{ tcpSe.ioSession = ioSession ; } } } /** * 更新IoSession对应的rtuAddr * @param oldRtuAddr * @param newRtuAddr * @param ioSession */ //public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){ public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, IoSession ioSession){ if(oldRtuAddr != null && newRtuAddr != null && !oldRtuAddr.equals(newRtuAddr)){ synchronized (map){ TcpSession tcpSe = map.get(oldRtuAddr) ; if(tcpSe == null){ putNewTcpSession(newRtuAddr, ioSession) ; }else{ map.remove(oldRtuAddr) ; map.put(newRtuAddr, tcpSe) ; } } } } /** * 得到TcpSession * @param rtuAddr * @return */ public static TcpSession getTcpSession(String rtuAddr){ return map.get(rtuAddr) ; } /** * 得到Tcp通信协议名称 * @param rtuAddr * @return public static String getTcpProtocolName(String rtuAddr){ TcpSession tcpSe = sessionTable.get(rtuAddr) ; if(tcpSe != null){ return tcpSe.protocolName ; }else{ return null ; } } */ /** * 得到Tcp通信协议名称 * @param rtuAddr * @return * */ public static Object[] getTcpProtocolNameVersion(String rtuAddr){ TcpSession tcpSe = map.get(rtuAddr) ; if(tcpSe != null){ return new Object[]{ tcpSe.ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName), tcpSe.ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion)}; }else{ return null ; } } /** * 得到所有在线与离线数量统计 * @return [0]=在线数量,[2]上线过,但当前离线的数量 */ public static Integer[] allOnLineStateStatistics(){ synchronized (map){ Integer[] arr = new Integer[]{0, 0} ; Iterator> it = map.entrySet().iterator() ; Entry entry = null ; while(it.hasNext()){ entry = it.next() ; if(entry.getValue().ioSession.isConnected()){ arr[0]++ ; }else{ arr[1]++ ; } } return arr ; } } /** * 得到所有在线情况 * @return */ public static HashMap allOnLine(){ synchronized (map){ HashMap map = new HashMap(); Iterator> it = TcpSessionCache.map.entrySet().iterator() ; Entry entry = null ; while(it.hasNext()){ entry = it.next() ; map.put(entry.getKey(), entry.getValue().ioSession.isConnected()) ; } return map ; } } /** * 得到部分在线情况 * @return */ public static HashMap partOnLine(String[] rtuAddrArrGrp){ synchronized (map){ HashMap map = new HashMap(); for(String rtuAddr : rtuAddrArrGrp){ TcpSession tcpSe = TcpSessionCache.map.get(rtuAddr) ; if(tcpSe != null){ map.put(rtuAddr, tcpSe.ioSession.isConnected()) ; } } return map ; } } // // /** // * 得到所有RTU连接状态情况 // * @return // */ // public static List allConnectStatus(){ // synchronized (sessionTable){ // List list = new ArrayList(); // Iterator> it = sessionTable.entrySet().iterator() ; // Entry entry = null ; // while(it.hasNext()){ // entry = it.next() ; // RtuSessionStatus vo = new RtuSessionStatus() ; // vo.rtuAddr = entry.getKey() ; // IoSession se = entry.getValue().ioSession ; // vo.onTrueOffLine = se.isConnected() ; // InetSocketAddress sa = (InetSocketAddress)se.getRemoteAddress() ; // if(sa != null){ // InetAddress inetAddr = sa.getAddress() ; // if(inetAddr != null){ // vo.ip = inetAddr.getHostAddress() ; // vo.port = sa.getPort() ; // } // } // list.add(vo) ; // } // return list ; // } // } // /** * 得到IoSession * @param rtuAddr * @return */ // public IoSession getIoSession(String rtuAddr){ // TcpSession tcpSe = sessionMap.get(rtuAddr) ; // if(tcpSe != null){ // return tcpSe.ioSession ; // } // return null ; // } /** * 网络是否连接 * @param rtuAddr * @return */ public static Boolean isConnect(String rtuAddr){ TcpSession tcpSe = map.get(rtuAddr) ; if(tcpSe != null){ return tcpSe.ioSession.isConnected() ; } return null ; } /** * 通过IoSession输出数据 * @param rtuAddr * @param data * @throws Exception */ public static void write(String rtuAddr, byte[] data) throws Exception{ TcpSession tcpSe = map.get(rtuAddr) ; if(tcpSe != null){ if(tcpSe.ioSession.isConnected()){ tcpSe.ioSession.write(data) ; }else{ throw new Exception("Rtu连接已经关闭!") ; } }else{ throw new Exception("Rtu未曾上线!") ; } } /** * 设置上行数据时刻 * @param rtuAddr */ public static void cacheUpDataTime(String rtuAddr){ TcpSession tcpSe = map.get(rtuAddr) ; if(tcpSe != null){ tcpSe.lastUpDataTime = System.currentTimeMillis() ; tcpSe.lastUpDataTimeForOnlineCtrl = System.currentTimeMillis() ; } } /** * 更新上行数据时刻 * 当上行数据时刻已经过去一定时长,上行数据时刻清空 * 当一定时间内没有上行数据,则认为RTU离线 */ public static void updateRtuStatus(Long now){ synchronized (map){ Set> entrySet = map.entrySet() ; Iterator> it = entrySet.iterator() ; Map.Entry entry ; TcpSession tcpSe ; while(it.hasNext()){ entry = it.next() ; tcpSe = entry.getValue(); if(tcpSe.lastUpDataTime != null){ if(now - tcpSe.lastUpDataTime > ServerProperties.lastUpDataTimeLive){ tcpSe.lastUpDataTime = null ; } } if(tcpSe.lastUpDataTimeForOnlineCtrl != null){ if(tcpSe.ioSession != null && tcpSe.ioSession.isConnected()){ if(now - tcpSe.lastUpDataTimeForOnlineCtrl > ServerProperties.disconnectedByNoUpDataMinutes){ tcpSe.ioSession.closeNow() ; RtuLogDealer.log(entry.getKey(), "因较长时间未收上行数据,认为设备离线"); } } } } } } // public static void updateRtuStatus(Long now){ // synchronized (sessionTable){ // Iterator it = sessionTable.values().iterator() ; // TcpSession tcpSe ; // while(it.hasNext()){ // tcpSe = it.next() ; // if(tcpSe.lastUpDataTime != null){ // if(now - tcpSe.lastUpDataTime > ServerProperties.lastUpDataTimeLive){ // tcpSe.lastUpDataTime = null ; // } // } // } // } // } }