zhubaomin
2025-02-25 842237345ac469b02e9add8f9fd8bae5d4f7cdac
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/RtuUpTask.java
@@ -2,6 +2,11 @@
import java.net.InetSocketAddress;
import com.dy.common.mw.protocol.*;
import com.dy.common.springUtil.SpringContextUtil;
import com.dy.common.util.Callback;
import com.dy.rtuMw.server.upgrade.UpgradeUnit;
import com.dy.rtuMw.web.com.CommandCtrl;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.mina.core.session.IoSession;
@@ -9,12 +14,6 @@
import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr;
import com.dy.common.mw.channel.tcp.TcpUnit;
import com.dy.common.mw.core.CoreTask;
import com.dy.common.mw.protocol.DriverParserDataCallback;
import com.dy.common.mw.protocol.MidResult;
import com.dy.common.mw.protocol.Driver;
import com.dy.common.mw.protocol.OnLine;
import com.dy.common.mw.protocol.OnLineHandle;
import com.dy.common.mw.protocol.ProtocolCache;
import com.dy.rtuMw.server.ServerProperties;
import com.dy.rtuMw.server.forTcp.RtuLogDealer;
import com.dy.rtuMw.server.forTcp.RtuStatusDealer;
@@ -56,25 +55,27 @@
         e.printStackTrace();
         log.error("将数据转换为十六进制时出错!" ) ;
      }
      String rtuAddr = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey) ;
      String protocolName = null ;
      String rtuAddr = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr) ;
      String protocolName = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName) ;
      Short protocolVersion = (Short)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion) ;
      boolean isOnLine = false ;
      if(rtuAddr == null){
         //说明刚建立网络连接,此数据应该是上线数据
         isOnLine = true ;
         String[] rtuAddrProtocolName = this.parseOnLine(session, upBuf) ;
         if(rtuAddrProtocolName.length == 2){
         Object[] rtuAddrProtocolNameVersion = this.parseOnLine(session, upBuf) ;
         if(rtuAddrProtocolNameVersion.length == 3){
            //解析上线数据成功,并解析出RTU地址及通信协议名称
            rtuAddr = rtuAddrProtocolName[0] ;
            protocolName = rtuAddrProtocolName[1] ;
            if(rtuAddr != null && protocolName != null){
            rtuAddr = (String)rtuAddrProtocolNameVersion[0] ;
            protocolName = (String)rtuAddrProtocolNameVersion[1] ;
            protocolVersion = (Short)rtuAddrProtocolNameVersion[2] ;
            if(rtuAddr != null && protocolName != null && protocolVersion != null){
               //设置session的属性ID
               TcpUnit.getInstance().setIoSessionArrId(session, rtuAddr);
               TcpUnit.getInstance().setIoSessionArrs(session, rtuAddr, protocolName, protocolVersion) ;
               //缓存session
               TcpSessionCache.putNewTcpSession(rtuAddr, protocolName, session);
               TcpSessionCache.putNewTcpSession(rtuAddr, session);
               
               log.info("RTU(地址:" + rtuAddr + ",协议:" + protocolName + ")上线了。") ;
               log.info("RTU(地址:" + rtuAddr + ",协议:" + protocolName + ",版本号:" + protocolVersion + ")上线了。") ;
            }
         }
      }
@@ -94,16 +95,20 @@
            }
         }
         if(toDeal){
            /*
            if(protocolName == null){
               protocolName = TcpSessionCache.getTcpProtocolName(rtuAddr) ;
               Object[] objs = TcpSessionCache.getTcpProtocolNameVersion(rtuAddr) ;
               protocolName = (String)objs[0] ;
               protocolVersion = (Short)objs[1] ;
            }
            */
            //设置收到数据时刻
            TcpSessionCache.cacheUpDataTime(rtuAddr);
            TcpSessionCache.whenUpData(rtuAddr);
            if(protocolName != null){
               //对上行数据进行处理
               this.dealUpData(session, rtuAddr, protocolName, isOnLine, upBuf, upHex) ;
               this.dealUpData(session, rtuAddr, protocolName, protocolVersion, isOnLine, upBuf, upHex) ;
            }
         }
      }
@@ -114,9 +119,10 @@
    * @param session IO会话
    * @param upBuf 上行数据
    */
   private String[] parseOnLine(IoSession session, byte[] upBuf){
   private Object[] parseOnLine(IoSession session, byte[] upBuf){
      String rtuAddr = null ;
      String protocolName = null ;
      Short protocolVersion = null ;
      try {
         OnLine.OnLineResult rs = new OnLineHandle().parse(upBuf) ;
         if(rs == null 
@@ -133,6 +139,7 @@
               log.error("严重错误,解析上线结果中协议名称为空 !" ) ;
            }else{
               protocolName = rs.protocolName ;
               protocolVersion = rs.protocolVersion ;
            }
         }else if(rs.result == OnLine.OnLineAction_success_response){
            if(rs.remoteData != null && rs.remoteData.length > 0){
@@ -144,7 +151,7 @@
      } catch (Exception e) {
         log.error("严重错误,分析上线数据时产生异常 !\n" + e.getMessage() , e) ;
      }
      return new String[]{rtuAddr, protocolName} ;
      return new Object[]{rtuAddr, protocolName, protocolVersion} ;
   }
   
   
@@ -153,24 +160,38 @@
    * @param session IO会话
    * @param rtuAddrAtHead 控制器地址头部
    * @param protocolName 协议名称
    * @param protocolVersion 协议版本号
    * @param isOnLine 是否上线数据
    * @param upBuf 上行数据
    * @param upHex 上行数据
    * @throws Exception 异常
    */
   private void dealUpData(IoSession session, String rtuAddrAtHead, String protocolName, boolean isOnLine, byte[] upBuf, String upHex) throws Exception{
      Driver dri = ProtocolCache.getDriver(protocolName) ;
   private void dealUpData(IoSession session,
                     String rtuAddrAtHead,
                     String protocolName,
                     Short protocolVersion,
                     boolean isOnLine,
                     byte[] upBuf,
                     String upHex) throws Exception{
      Driver dri = ProtocolCache.getDriver(protocolName, protocolVersion) ;
      if(dri == null){
         log.error("严重错误,未能得到协议" + protocolName + "驱动类实例!");
      }else{
         MidResult[] midRs = dri.parseData(ServerProperties.isLowPower, rtuAddrAtHead, upBuf, upHex, new DriverParserDataCallback(){
            @Override
            public void callback(String rtuAddrAtHead, String code, String codeName, String upHex, Boolean reportOrResponse_trueOrFalse, boolean parseFail, String rtuAddrInData) {
            public void callback(String rtuAddrAtHead,
                            String code,
                            String codeName,
                            String upHex,
                            Boolean reportOrResponse_trueOrFalse,
                            boolean parseFail,
                            String rtuAddrInData,
                            Object ...objs) {
               //更新终端状态
               if(rtuAddrInData != null && !rtuAddrInData.equals(rtuAddrAtHead)){
                  //数据头中的RTU地址与数据中的RTU地址不一致,更换成数据中的RTU地址
                  TcpSessionCache.changeRtuAddr(rtuAddrAtHead, rtuAddrInData, protocolName, session);
                  session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey, rtuAddrInData) ;
                  TcpSessionCache.changeRtuAddr(rtuAddrAtHead, rtuAddrInData, session);
                  session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr, rtuAddrInData) ;
               }
               String rtuAddr ;
@@ -183,12 +204,12 @@
               InetSocketAddress sa = (InetSocketAddress)session.getRemoteAddress() ;
               if(isOnLine){
                  //上线了
                  RtuStatusDealer.onLine(rtuAddr, sa.getAddress().getHostAddress(), sa.getPort());
                  RtuStatusDealer.onLine(rtuAddr, sa.getAddress().getHostAddress(), sa.getPort(), protocolName, protocolVersion);
               }
               if(reportOrResponse_trueOrFalse != null && reportOrResponse_trueOrFalse){
                  RtuStatusDealer.upReport(rtuAddr, upBuf.length) ;
                  RtuStatusDealer.upAutoReport(rtuAddr, code, codeName, upBuf.length) ;
               }else{
                  RtuStatusDealer.upData(rtuAddr, upBuf.length) ;
                  RtuStatusDealer.upData(rtuAddr, code, codeName, upBuf.length) ;
               }
               //记录日志
@@ -197,6 +218,26 @@
               }else{
                  RtuLogDealer.log(rtuAddr, (isOnLine?"上线数据 ":"上行数据 ") +  code + ("(" + codeName + ")") + ":" + upHex);
               }
               //触发远程RTU软件升级
               UpgradeUnit.getInstance().trigger(rtuAddr, code, protocolName, protocolVersion, new Callback() {
                  @Override
                  public void call(Object obj) {
                     if(obj != null){
                        Command com = (Command)obj ;
                        CommandCtrl comCtrl = SpringContextUtil.getBean(CommandCtrl.class) ;
                        if(comCtrl != null){
                           comCtrl.sendOutComFromLocal(com) ;
                        }
                     }
                  }
                  @Override
                  public void call(Object... objs) {
                  }
                  @Override
                  public void exception(Exception e) {
                  }
               }, objs);
            }
         }) ;
         if(midRs != null){