zhubaomin
2024-11-13 ba2c5cb35e1bd1a81bf4027b7aeab16a0a26bb05
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/TcpSessionCache.java
@@ -1,11 +1,8 @@
package com.dy.rtuMw.server.forTcp;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
import com.dy.common.util.DateTime;
import org.apache.mina.core.session.IoSession;
import com.dy.rtuMw.server.ServerProperties;
@@ -22,18 +19,41 @@
    *
    * 2023-12-19实测,发现Hashtable并不线程安全,所以应用了HashMap和synchronized
    */
   private static HashMap<String, TcpSession> sessionTable = new HashMap<String, TcpSession>() ;
   private static HashMap<String, TcpSession> map = new HashMap<String, TcpSession>() ;
   /**
    * 得到信息
    * @return
    */
   public static Integer[] info(){
      Integer rtuTotalConnect = 0 ;//已经连接过中间件的RTU总数(包括在线与离线的)
      Integer rtuTotalOnLine = 0 ;//在线RTU总数
      Integer rtuTotalOffLine = 0 ;//离线RTU总数
      synchronized (map){
         rtuTotalConnect = map.size() ;
         Collection<TcpSession> col = map.values() ;
         for(TcpSession se : col){
            if(se.ioSession.isConnected()){
               rtuTotalOnLine ++ ;
            }else{
               rtuTotalOffLine ++ ;
            }
         }
      }
      return new Integer[] {rtuTotalConnect, rtuTotalOnLine, rtuTotalOffLine} ;
   }
   /**
    * 关闭所有网络连接
    */
   public static void closeAllSessions(){
      synchronized (sessionTable){
         Collection<TcpSession> col = sessionTable.values() ;
      synchronized (map){
         Collection<TcpSession> col = map.values() ;
         for(TcpSession se : col){
            se.ioSession.closeNow() ;
         }
         sessionTable.clear();
         map.clear();
      }
   }
@@ -43,14 +63,15 @@
    * @param protocolName
    * @param ioSession
    */
   public static void putNewTcpSession(String rtuAddr, String protocolName, IoSession ioSession){
      synchronized (sessionTable){
         TcpSession tcpSe = sessionTable.get(rtuAddr) ;
   public static void putNewTcpSession(String rtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){
      synchronized (map){
         TcpSession tcpSe = map.get(rtuAddr) ;
         if(tcpSe == null){
            tcpSe = new TcpSession() ;
            tcpSe.protocolName = protocolName ;
            tcpSe.protocolVersion = protocolVersion ;
            tcpSe.ioSession = ioSession ;
            sessionTable.put(rtuAddr, tcpSe) ;
            map.put(rtuAddr, tcpSe) ;
         }else{
            tcpSe.ioSession = ioSession ;
         }
@@ -62,17 +83,18 @@
    * @param oldRtuAddr
    * @param newRtuAddr
    * @param protocolName
    * @param protocolVersion
    * @param ioSession
    */
   public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, String protocolName, IoSession ioSession){
   public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, String protocolName, Short protocolVersion, IoSession ioSession){
      if(oldRtuAddr != null && newRtuAddr != null && !oldRtuAddr.equals(newRtuAddr)){
         synchronized (sessionTable){
            TcpSession tcpSe = sessionTable.get(oldRtuAddr) ;
         synchronized (map){
            TcpSession tcpSe = map.get(oldRtuAddr) ;
            if(tcpSe == null){
               putNewTcpSession(newRtuAddr, protocolName, ioSession) ;
               putNewTcpSession(newRtuAddr, protocolName, protocolVersion, ioSession) ;
            }else{
               sessionTable.remove(oldRtuAddr) ;
               sessionTable.put(newRtuAddr, tcpSe) ;
               map.remove(oldRtuAddr) ;
               map.put(newRtuAddr, tcpSe) ;
            }
         }
      }
@@ -84,14 +106,14 @@
    * @return
    */
   public static TcpSession getTcpSession(String rtuAddr){
      return sessionTable.get(rtuAddr) ;
      return map.get(rtuAddr) ;
   }
   /**
    * 得到Tcp通信协议名称
    * @param rtuAddr
    * @return
    */
   public static String getTcpProtocolName(String rtuAddr){
      TcpSession tcpSe = sessionTable.get(rtuAddr) ;
      if(tcpSe != null){
@@ -100,15 +122,50 @@
         return null ;
      }
   }
    */
   /**
    * 得到Tcp通信协议名称
    * @param rtuAddr
    * @return
    */
   public static Object[] getTcpProtocolNameVersion(String rtuAddr){
      TcpSession tcpSe = map.get(rtuAddr) ;
      if(tcpSe != null){
         return new Object[]{tcpSe.protocolName, tcpSe.protocolVersion};
      }else{
         return null ;
      }
   }
   /**
    * 得到所有在线与离线数量统计
    * @return [0]=在线数量,[2]上线过,但当前离线的数量
    */
   public static Integer[] allOnLineStateStatistics(){
      synchronized (map){
         Integer[] arr = new Integer[]{0, 0} ;
         Iterator<Entry<String, TcpSession>> it = map.entrySet().iterator() ;
         Entry<String, TcpSession> entry = null ;
         while(it.hasNext()){
            entry = it.next() ;
            if(entry.getValue().ioSession.isConnected()){
               arr[0]++ ;
            }else{
               arr[1]++ ;
            }
         }
         return arr ;
      }
   }
   
   /**
    * 得到所有在线情况
    * @return
    */
   public static HashMap<String, Boolean> allOnLine(){
      synchronized (sessionTable){
      synchronized (map){
         HashMap<String, Boolean> map = new HashMap<String, Boolean>();
         Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ;
         Iterator<Entry<String, TcpSession>> it = TcpSessionCache.map.entrySet().iterator() ;
         Entry<String, TcpSession> entry = null ;
         while(it.hasNext()){
            entry = it.next() ;
@@ -117,35 +174,54 @@
         return map ;
      }
   }
   /**
    * 得到所有RTU连接状态情况
    * 得到部分在线情况
    * @return
    */
   public static List<RtuSessionStatus> allConnectStatus(){
      synchronized (sessionTable){
         List<RtuSessionStatus> list = new ArrayList<RtuSessionStatus>();
         Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ;
         Entry<String, TcpSession> entry = null ;
         while(it.hasNext()){
            entry = it.next() ;
            RtuSessionStatus vo = new RtuSessionStatus() ;
            vo.rtuAddr = entry.getKey() ;
            IoSession se = entry.getValue().ioSession ;
            vo.onTrueOffLine = se.isConnected() ;
            InetSocketAddress sa = (InetSocketAddress)se.getRemoteAddress() ;
            if(sa != null){
               InetAddress inetAddr = sa.getAddress() ;
               if(inetAddr != null){
                  vo.ip = inetAddr.getHostAddress() ;
                  vo.port = sa.getPort() ;
               }
   public static HashMap<String, Boolean> partOnLine(String[] rtuAddrArrGrp){
      synchronized (map){
         HashMap<String, Boolean> map = new HashMap<String, Boolean>();
         for(String rtuAddr : rtuAddrArrGrp){
            TcpSession tcpSe = TcpSessionCache.map.get(rtuAddr) ;
            if(tcpSe != null){
               map.put(rtuAddr, tcpSe.ioSession.isConnected()) ;
            }
            list.add(vo) ;
         }
         return list ;
         return map ;
      }
   }
//
//   /**
//    * 得到所有RTU连接状态情况
//    * @return
//    */
//   public static List<RtuSessionStatus> allConnectStatus(){
//      synchronized (sessionTable){
//         List<RtuSessionStatus> list = new ArrayList<RtuSessionStatus>();
//         Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ;
//         Entry<String, TcpSession> entry = null ;
//         while(it.hasNext()){
//            entry = it.next() ;
//            RtuSessionStatus vo = new RtuSessionStatus() ;
//            vo.rtuAddr = entry.getKey() ;
//            IoSession se = entry.getValue().ioSession ;
//            vo.onTrueOffLine = se.isConnected() ;
//            InetSocketAddress sa = (InetSocketAddress)se.getRemoteAddress() ;
//            if(sa != null){
//               InetAddress inetAddr = sa.getAddress() ;
//               if(inetAddr != null){
//                  vo.ip = inetAddr.getHostAddress() ;
//                  vo.port = sa.getPort() ;
//               }
//            }
//            list.add(vo) ;
//         }
//         return list ;
//      }
//   }
//
   
   /**
    * 得到IoSession
@@ -166,7 +242,7 @@
    * @return
    */
   public static Boolean isConnect(String rtuAddr){
      TcpSession tcpSe = sessionTable.get(rtuAddr) ;
      TcpSession tcpSe = map.get(rtuAddr) ;
      if(tcpSe != null){
         return tcpSe.ioSession.isConnected() ;
      }
@@ -180,7 +256,7 @@
    * @throws Exception
    */
   public static void write(String rtuAddr, byte[] data) throws Exception{
      TcpSession tcpSe = sessionTable.get(rtuAddr) ;
      TcpSession tcpSe = map.get(rtuAddr) ;
      if(tcpSe != null){
         if(tcpSe.ioSession.isConnected()){
            tcpSe.ioSession.write(data) ;
@@ -197,7 +273,7 @@
    * @param rtuAddr
    */
   public static void cacheUpDataTime(String rtuAddr){
      TcpSession tcpSe = sessionTable.get(rtuAddr) ;
      TcpSession tcpSe = map.get(rtuAddr) ;
      if(tcpSe != null){
         tcpSe.lastUpDataTime = System.currentTimeMillis() ;
         tcpSe.lastUpDataTimeForOnlineCtrl = System.currentTimeMillis() ;
@@ -210,8 +286,8 @@
    * 当一定时间内没有上行数据,则认为RTU离线
    */
   public static void updateRtuStatus(Long now){
      synchronized (sessionTable){
         Set<Map.Entry<String, TcpSession>> entrySet = sessionTable.entrySet() ;
      synchronized (map){
         Set<Map.Entry<String, TcpSession>> entrySet = map.entrySet() ;
         Iterator<Map.Entry<String, TcpSession>> it = entrySet.iterator() ;
         Map.Entry<String, TcpSession> entry ;
         TcpSession tcpSe ;
@@ -227,6 +303,7 @@
               if(tcpSe.ioSession != null && tcpSe.ioSession.isConnected()){
                  if(now - tcpSe.lastUpDataTimeForOnlineCtrl > ServerProperties.disconnectedByNoUpDataMinutes){
                     tcpSe.ioSession.closeNow() ;
                     RtuLogDealer.log(entry.getKey(), "因较长时间未收上行数据,认为设备离线");
                  }
               }
            }