|  |  | 
 |  |  | package com.dy.rtuMw.server.forTcp; | 
 |  |  |  | 
 |  |  | import java.net.InetAddress; | 
 |  |  | import java.net.InetSocketAddress; | 
 |  |  | import java.util.ArrayList; | 
 |  |  | import java.util.HashMap; | 
 |  |  | import java.util.Iterator; | 
 |  |  | import java.util.List; | 
 |  |  | import java.util.*; | 
 |  |  | import java.util.Map.Entry; | 
 |  |  |  | 
 |  |  | import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr; | 
 |  |  | import org.apache.mina.core.session.IoSession; | 
 |  |  |  | 
 |  |  | import com.dy.rtuMw.server.ServerProperties; | 
 |  |  | 
 |  |  |     * | 
 |  |  |     * 2023-12-19实测,发现Hashtable并不线程安全,所以应用了HashMap和synchronized | 
 |  |  |     */ | 
 |  |  |    private static HashMap<String, TcpSession> sessionTable = new HashMap<String, TcpSession>() ; | 
 |  |  | 	 | 
 |  |  |    private static HashMap<String, TcpSession> map = new HashMap<String, TcpSession>() ; | 
 |  |  |  | 
 |  |  |  | 
 |  |  |    /** | 
 |  |  |     * 得到信息 | 
 |  |  |     * @return | 
 |  |  |     */ | 
 |  |  |    public static Integer[] info(){ | 
 |  |  |       Integer rtuTotalConnect = 0 ;//已经连接过中间件的RTU总数(包括在线与离线的) | 
 |  |  |       Integer rtuTotalOnLine = 0 ;//在线RTU总数 | 
 |  |  |       Integer rtuTotalOffLine = 0 ;//离线RTU总数 | 
 |  |  |       synchronized (map){ | 
 |  |  |          rtuTotalConnect = map.size() ; | 
 |  |  |          Collection<TcpSession> col = map.values() ; | 
 |  |  |          for(TcpSession se : col){ | 
 |  |  |             if(se.ioSession.isConnected()){ | 
 |  |  |                rtuTotalOnLine ++ ; | 
 |  |  |             }else{ | 
 |  |  |                rtuTotalOffLine ++ ; | 
 |  |  |             } | 
 |  |  |          } | 
 |  |  |       } | 
 |  |  |       return new Integer[] {rtuTotalConnect, rtuTotalOnLine, rtuTotalOffLine} ; | 
 |  |  |    } | 
 |  |  |  | 
 |  |  |    /** | 
 |  |  |     * 关闭所有网络连接 | 
 |  |  |     */ | 
 |  |  |    public static void closeAllSessions(){ | 
 |  |  |       synchronized (map){ | 
 |  |  |          Collection<TcpSession> col = map.values() ; | 
 |  |  |          for(TcpSession se : col){ | 
 |  |  |             se.ioSession.closeNow() ; | 
 |  |  |          } | 
 |  |  |          map.clear(); | 
 |  |  |       } | 
 |  |  |    } | 
 |  |  |  | 
 |  |  |    /** | 
 |  |  |     * 加入新的IoSession | 
 |  |  |     * @param rtuAddr | 
 |  |  |     * @param protocolName | 
 |  |  |     * @param ioSession | 
 |  |  |     */ | 
 |  |  |    public static void putNewTcpSession(String rtuAddr, String protocolName, IoSession ioSession){ | 
 |  |  |       synchronized (sessionTable){ | 
 |  |  |          TcpSession tcpSe = sessionTable.get(rtuAddr) ; | 
 |  |  |    //public static void putNewTcpSession(String rtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){ | 
 |  |  |    public static void putNewTcpSession(String rtuAddr, IoSession ioSession){ | 
 |  |  |       synchronized (map){ | 
 |  |  |          TcpSession tcpSe = map.get(rtuAddr) ; | 
 |  |  |          if(tcpSe == null){ | 
 |  |  |             tcpSe = new TcpSession() ; | 
 |  |  |             tcpSe.protocolName = protocolName ; | 
 |  |  |             //tcpSe.protocolName = protocolName ; | 
 |  |  |             //tcpSe.protocolVersion = protocolVersion ; | 
 |  |  |             tcpSe.ioSession = ioSession ; | 
 |  |  |             sessionTable.put(rtuAddr, tcpSe) ; | 
 |  |  |             map.put(rtuAddr, tcpSe) ; | 
 |  |  |          }else{ | 
 |  |  |             tcpSe.ioSession = ioSession ; | 
 |  |  |          } | 
 |  |  | 
 |  |  |     * 更新IoSession对应的rtuAddr | 
 |  |  |     * @param oldRtuAddr | 
 |  |  |     * @param newRtuAddr | 
 |  |  |     * @param protocolName | 
 |  |  |     * @param ioSession | 
 |  |  |     */ | 
 |  |  |    public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, String protocolName, IoSession ioSession){ | 
 |  |  |    //public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){ | 
 |  |  |    public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, IoSession ioSession){ | 
 |  |  |       if(oldRtuAddr != null && newRtuAddr != null && !oldRtuAddr.equals(newRtuAddr)){ | 
 |  |  |          synchronized (sessionTable){ | 
 |  |  |             TcpSession tcpSe = sessionTable.get(oldRtuAddr) ; | 
 |  |  |          synchronized (map){ | 
 |  |  |             TcpSession tcpSe = map.get(oldRtuAddr) ; | 
 |  |  |             if(tcpSe == null){ | 
 |  |  |                putNewTcpSession(newRtuAddr, protocolName, ioSession) ; | 
 |  |  |                putNewTcpSession(newRtuAddr, ioSession) ; | 
 |  |  |             }else{ | 
 |  |  |                sessionTable.remove(oldRtuAddr) ; | 
 |  |  |                sessionTable.put(newRtuAddr, tcpSe) ; | 
 |  |  |                map.remove(oldRtuAddr) ; | 
 |  |  |                map.put(newRtuAddr, tcpSe) ; | 
 |  |  |             } | 
 |  |  |          } | 
 |  |  |       } | 
 |  |  | 
 |  |  |     * @return | 
 |  |  |     */ | 
 |  |  |    public static TcpSession getTcpSession(String rtuAddr){ | 
 |  |  |       return sessionTable.get(rtuAddr) ; | 
 |  |  |       return map.get(rtuAddr) ; | 
 |  |  |    } | 
 |  |  | 	 | 
 |  |  |  | 
 |  |  |    /** | 
 |  |  |     * 得到Tcp通信协议名称 | 
 |  |  |     * @param rtuAddr | 
 |  |  |     * @return | 
 |  |  |     */ | 
 |  |  |    public static String getTcpProtocolName(String rtuAddr){ | 
 |  |  |       TcpSession tcpSe = sessionTable.get(rtuAddr) ; | 
 |  |  |     * */ | 
 |  |  |    public static Object[] getTcpProtocolNameVersion(String rtuAddr){ | 
 |  |  |       TcpSession tcpSe = map.get(rtuAddr) ; | 
 |  |  |       if(tcpSe != null){ | 
 |  |  |          return tcpSe.protocolName ; | 
 |  |  |          return new Object[]{ | 
 |  |  |                tcpSe.ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName), | 
 |  |  |                tcpSe.ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion)}; | 
 |  |  |       }else{ | 
 |  |  |          return null ; | 
 |  |  |       } | 
 |  |  |    } | 
 |  |  |  | 
 |  |  |  | 
 |  |  |    /** | 
 |  |  |     * 得到所有在线与离线数量统计 | 
 |  |  |     * @return [0]=在线数量,[1]上线过,但当前离线的数量 | 
 |  |  |     */ | 
 |  |  |    public static Integer[] allOnLineStateStatistics(){ | 
 |  |  |       synchronized (map){ | 
 |  |  |          Integer[] arr = new Integer[]{0, 0} ; | 
 |  |  |          Iterator<Entry<String, TcpSession>> it = map.entrySet().iterator() ; | 
 |  |  |          Entry<String, TcpSession> entry = null ; | 
 |  |  |          while(it.hasNext()){ | 
 |  |  |             entry = it.next() ; | 
 |  |  |             if(entry.getValue().ioSession.isConnected()){ | 
 |  |  |                arr[0]++ ; | 
 |  |  |             }else{ | 
 |  |  |                arr[1]++ ; | 
 |  |  |             } | 
 |  |  |          } | 
 |  |  |          return arr ; | 
 |  |  |       } | 
 |  |  |    } | 
 |  |  |     | 
 |  |  | 
 |  |  |     * @return | 
 |  |  |     */ | 
 |  |  |    public static HashMap<String, Boolean> allOnLine(){ | 
 |  |  |       synchronized (sessionTable){ | 
 |  |  |       synchronized (map){ | 
 |  |  |          HashMap<String, Boolean> map = new HashMap<String, Boolean>(); | 
 |  |  |          Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ; | 
 |  |  |          Iterator<Entry<String, TcpSession>> it = TcpSessionCache.map.entrySet().iterator() ; | 
 |  |  |          Entry<String, TcpSession> entry = null ; | 
 |  |  |          while(it.hasNext()){ | 
 |  |  |             entry = it.next() ; | 
 |  |  | 
 |  |  |          return map ; | 
 |  |  |       } | 
 |  |  |    } | 
 |  |  |  | 
 |  |  |  | 
 |  |  |    /** | 
 |  |  |     * 得到所有RTU连接状态情况 | 
 |  |  |     * 得到部分在线情况 | 
 |  |  |     * @return | 
 |  |  |     */ | 
 |  |  |    public static List<RtuSessionStatus> allConnectStatus(){ | 
 |  |  |       synchronized (sessionTable){ | 
 |  |  |          List<RtuSessionStatus> list = new ArrayList<RtuSessionStatus>(); | 
 |  |  |          Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ; | 
 |  |  |          Entry<String, TcpSession> entry = null ; | 
 |  |  |          while(it.hasNext()){ | 
 |  |  |             entry = it.next() ; | 
 |  |  |             RtuSessionStatus vo = new RtuSessionStatus() ; | 
 |  |  |             vo.rtuAddr = entry.getKey() ; | 
 |  |  |             IoSession se = entry.getValue().ioSession ; | 
 |  |  |             vo.onTrueOffLine = se.isConnected() ; | 
 |  |  |             InetSocketAddress sa = (InetSocketAddress)se.getRemoteAddress() ; | 
 |  |  |             if(sa != null){ | 
 |  |  |                InetAddress inetAddr = sa.getAddress() ; | 
 |  |  |                if(inetAddr != null){ | 
 |  |  |                   vo.ip = inetAddr.getHostAddress() ; | 
 |  |  |                   vo.port = sa.getPort() ; | 
 |  |  |                } | 
 |  |  |    public static HashMap<String, Boolean> partOnLine(String[] rtuAddrArrGrp){ | 
 |  |  |       synchronized (map){ | 
 |  |  |          HashMap<String, Boolean> map = new HashMap<String, Boolean>(); | 
 |  |  |          for(String rtuAddr : rtuAddrArrGrp){ | 
 |  |  |             TcpSession tcpSe = TcpSessionCache.map.get(rtuAddr) ; | 
 |  |  |             if(tcpSe != null){ | 
 |  |  |                map.put(rtuAddr, tcpSe.ioSession.isConnected()) ; | 
 |  |  |             } | 
 |  |  |             list.add(vo) ; | 
 |  |  |          } | 
 |  |  |          return list ; | 
 |  |  |          return map ; | 
 |  |  |       } | 
 |  |  |    } | 
 |  |  | 	 | 
 |  |  | 	 | 
 |  |  |    /** | 
 |  |  |     * 得到IoSession | 
 |  |  |     * @param rtuAddr | 
 |  |  |     * @return | 
 |  |  |     */ | 
 |  |  | //   public IoSession getIoSession(String rtuAddr){ | 
 |  |  | //      TcpSession tcpSe = sessionMap.get(rtuAddr) ; | 
 |  |  | //      if(tcpSe != null){ | 
 |  |  | //         return tcpSe.ioSession ; | 
 |  |  | //      } | 
 |  |  | //      return null ; | 
 |  |  | //   } | 
 |  |  | 	 | 
 |  |  |  | 
 |  |  |    /** | 
 |  |  |     * 网络是否连接 | 
 |  |  |     * @param rtuAddr | 
 |  |  |     * @return | 
 |  |  |     */ | 
 |  |  |    public static Boolean isConnect(String rtuAddr){ | 
 |  |  |       TcpSession tcpSe = sessionTable.get(rtuAddr) ; | 
 |  |  |       TcpSession tcpSe = map.get(rtuAddr) ; | 
 |  |  |       if(tcpSe != null){ | 
 |  |  |          return tcpSe.ioSession.isConnected() ; | 
 |  |  |       } | 
 |  |  | 
 |  |  |     * @throws Exception | 
 |  |  |     */ | 
 |  |  |    public static void write(String rtuAddr, byte[] data) throws Exception{ | 
 |  |  |       TcpSession tcpSe = sessionTable.get(rtuAddr) ; | 
 |  |  |       TcpSession tcpSe = map.get(rtuAddr) ; | 
 |  |  |       if(tcpSe != null){ | 
 |  |  |          if(tcpSe.ioSession.isConnected()){ | 
 |  |  |             tcpSe.ioSession.write(data) ; | 
 |  |  | 
 |  |  |    } | 
 |  |  |     | 
 |  |  |    /** | 
 |  |  |     * 设置上行数据时刻 | 
 |  |  |     * 当有上行数据时 | 
 |  |  |     * @param rtuAddr | 
 |  |  |     */ | 
 |  |  |    public static void cacheUpDataTime(String rtuAddr){ | 
 |  |  |       TcpSession tcpSe = sessionTable.get(rtuAddr) ; | 
 |  |  |    public static void whenUpData(String rtuAddr){ | 
 |  |  |       TcpSession tcpSe = map.get(rtuAddr) ; | 
 |  |  |       if(tcpSe != null){ | 
 |  |  |          tcpSe.lastDownComTime = 0L ;//置0,使等待lastUpDataTimeLive(见config.xml配置文件)时长后,即刻下发缓存中的命令 | 
 |  |  |          tcpSe.lastUpDataTime = System.currentTimeMillis() ; | 
 |  |  |          tcpSe.lastUpDataTimeForOnlineCtrl = System.currentTimeMillis() ; | 
 |  |  |       } | 
 |  |  |    } | 
 |  |  |     | 
 |  |  |    /** | 
 |  |  |     * 更新上行数据时刻 | 
 |  |  |     * 当上行数据时刻已经过去一定时长,上行数据时刻清空 | 
 |  |  |     * 当一定时间内没有上行数据,则认为RTU离线 | 
 |  |  |     */ | 
 |  |  |    public static void updateUpDataTime(Long now){ | 
 |  |  |       synchronized (sessionTable){ | 
 |  |  |          Iterator<TcpSession> it = sessionTable.values().iterator() ; | 
 |  |  |    public static void updateRtuStatus(Long now){ | 
 |  |  |       synchronized (map){ | 
 |  |  |          Set<Map.Entry<String, TcpSession>> entrySet = map.entrySet() ; | 
 |  |  |          Iterator<Map.Entry<String, TcpSession>> it = entrySet.iterator() ; | 
 |  |  |          Map.Entry<String, TcpSession> entry ; | 
 |  |  |          TcpSession tcpSe ; | 
 |  |  |          while(it.hasNext()){ | 
 |  |  |             tcpSe = it.next() ; | 
 |  |  |             entry = it.next() ; | 
 |  |  |             tcpSe = entry.getValue(); | 
 |  |  |             if(tcpSe.lastUpDataTime != null){ | 
 |  |  |                if(now - tcpSe.lastUpDataTime > ServerProperties.lastUpDataTimeLive){ | 
 |  |  |                   tcpSe.lastUpDataTime = null ; | 
 |  |  |                } | 
 |  |  |             } | 
 |  |  |             if(tcpSe.lastUpDataTimeForOnlineCtrl != null){ | 
 |  |  |                if(tcpSe.ioSession != null && tcpSe.ioSession.isConnected()){ | 
 |  |  |                   if(now - tcpSe.lastUpDataTimeForOnlineCtrl > ServerProperties.disconnectedByNoUpDataMinutes){ | 
 |  |  |                      tcpSe.ioSession.closeNow() ; | 
 |  |  |                      RtuLogDealer.log(entry.getKey(), "因较长时间未收到上行数据,认为设备离线"); | 
 |  |  |                   } | 
 |  |  |                } | 
 |  |  |             } | 
 |  |  |          } | 
 |  |  |       } | 
 |  |  |    } | 
 |  |  |  | 
 |  |  |  | 
 |  |  | } |