package com.dy.rtuMw.server.tasks; 
 | 
  
 | 
import java.net.InetSocketAddress; 
 | 
  
 | 
import org.apache.logging.log4j.LogManager; 
 | 
import org.apache.logging.log4j.Logger; 
 | 
import org.apache.mina.core.session.IoSession; 
 | 
  
 | 
import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr; 
 | 
import com.dy.common.mw.channel.tcp.TcpUnit; 
 | 
import com.dy.common.mw.core.CoreTask; 
 | 
import com.dy.common.mw.protocol.DriverParserDataCallback; 
 | 
import com.dy.common.mw.protocol.MidResult; 
 | 
import com.dy.common.mw.protocol.Driver; 
 | 
import com.dy.common.mw.protocol.OnLine; 
 | 
import com.dy.common.mw.protocol.OnLineHandle; 
 | 
import com.dy.common.mw.protocol.ProtocolCache; 
 | 
import com.dy.rtuMw.server.ServerProperties; 
 | 
import com.dy.rtuMw.server.forTcp.RtuLogDealer; 
 | 
import com.dy.rtuMw.server.forTcp.RtuStatusDealer; 
 | 
import com.dy.rtuMw.server.forTcp.TcpSessionCache; 
 | 
import com.dy.common.util.ByteUtil; 
 | 
  
 | 
public class RtuUpTask extends CoreTask { 
 | 
     
 | 
    private static final Logger log = LogManager.getLogger(RtuUpTask.class.getName()); 
 | 
  
 | 
    @Override 
 | 
    public Integer execute() { 
 | 
        Object[] os = (Object[])this.data ; 
 | 
        IoSession session = (IoSession)os[0] ; 
 | 
        byte[] upBuf = (byte[])os[1] ; 
 | 
        try { 
 | 
            this.upData(session, upBuf); 
 | 
        } catch (Exception e) { 
 | 
            log.error("解析上行数据出错" + (e.getMessage()==null?"!":("," + e.getMessage())) ,e); 
 | 
        } 
 | 
        return null ; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * RTU上行数据 
 | 
     * @param session IO会话 
 | 
     * @param upBuf 上行数据 
 | 
     */ 
 | 
    private void upData(IoSession session, byte[] upBuf) throws Exception{ 
 | 
        if(upBuf == null){ 
 | 
            log.error("出错,收到RTU的数据为空!") ; 
 | 
            return ; 
 | 
        } 
 | 
        String upHex = null ; 
 | 
        try { 
 | 
            upHex = ByteUtil.bytes2Hex(upBuf , true) ; 
 | 
            log.info("收到RTU数据:" + upHex) ; 
 | 
        } catch (Exception e) { 
 | 
            e.printStackTrace(); 
 | 
            log.error("将数据转换为十六进制时出错!" ) ; 
 | 
        } 
 | 
        String rtuAddr = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey) ; 
 | 
        String protocolName = null ; 
 | 
        boolean isOnLine = false ; 
 | 
        if(rtuAddr == null){ 
 | 
            //说明刚建立网络连接,此数据应该是上线数据 
 | 
            isOnLine = true ; 
 | 
            String[] rtuAddrProtocolName = this.parseOnLine(session, upBuf) ; 
 | 
            if(rtuAddrProtocolName.length == 2){ 
 | 
                //解析上线数据成功,并解析出RTU地址及通信协议名称 
 | 
                rtuAddr = rtuAddrProtocolName[0] ; 
 | 
                protocolName = rtuAddrProtocolName[1] ; 
 | 
                 
 | 
                if(rtuAddr != null && protocolName != null){ 
 | 
                    //设置session的属性ID 
 | 
                    TcpUnit.getInstance().setIoSessionArrId(session, rtuAddr); 
 | 
                    //缓存session 
 | 
                    TcpSessionCache.putNewTcpSession(rtuAddr, protocolName, session); 
 | 
                     
 | 
                    log.info("RTU(地址:" + rtuAddr + ",协议:" + protocolName + ")上线了。") ; 
 | 
                } 
 | 
            } 
 | 
        } 
 | 
        if(rtuAddr != null){ 
 | 
            boolean toDeal = true ; 
 | 
            if(ServerProperties.onlyDealRtusTest){ 
 | 
                boolean find = false ; 
 | 
                for(String testRtu : ServerProperties.onlyDealRtus){ 
 | 
                    if(testRtu.equals(rtuAddr)){ 
 | 
                        find = true ; 
 | 
                        break ; 
 | 
                    } 
 | 
                } 
 | 
                if(!find){ 
 | 
                    //不在处理范围内 
 | 
                    toDeal = false ; 
 | 
                } 
 | 
            } 
 | 
            if(toDeal){ 
 | 
                if(protocolName == null){ 
 | 
                    protocolName = TcpSessionCache.getTcpProtocolName(rtuAddr) ; 
 | 
                } 
 | 
  
 | 
                //设置收到数据时刻 
 | 
                TcpSessionCache.cacheUpDataTime(rtuAddr); 
 | 
  
 | 
                if(protocolName != null){ 
 | 
                    //对上行数据进行处理 
 | 
                    this.dealUpData(session, rtuAddr, protocolName, isOnLine, upBuf, upHex) ; 
 | 
                } 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
     
 | 
    /** 
 | 
     * 解析上线数据 
 | 
     * @param session IO会话 
 | 
     * @param upBuf 上行数据 
 | 
     */ 
 | 
    private String[] parseOnLine(IoSession session, byte[] upBuf){ 
 | 
        String rtuAddr = null ; 
 | 
        String protocolName = null ; 
 | 
        try { 
 | 
            OnLine.OnLineResult rs = new OnLineHandle().parse(upBuf) ; 
 | 
            if(rs == null  
 | 
                    || rs.result == OnLine.OnLineAction_fail  
 | 
                    || rs.result == OnLine.OnLineAction_success_noMe){ 
 | 
                log.error("严重错误,解析上线数据失败 !" ) ; 
 | 
            }else if(rs.result == OnLine.OnLineAction_success){ 
 | 
                if(rs.rtuAddr == null){ 
 | 
                    log.error("严重错误,解析上线结果中RTU地址为空 !" ) ; 
 | 
                }else{ 
 | 
                    rtuAddr = rs.rtuAddr ; 
 | 
                } 
 | 
                if(rs.protocolName == null){ 
 | 
                    log.error("严重错误,解析上线结果中协议名称为空 !" ) ; 
 | 
                }else{ 
 | 
                    protocolName = rs.protocolName ; 
 | 
                } 
 | 
            }else if(rs.result == OnLine.OnLineAction_success_response){ 
 | 
                if(rs.remoteData != null && rs.remoteData.length > 0){ 
 | 
                    session.write(rs.remoteData) ; 
 | 
                }else{ 
 | 
                    log.error("严重错误,解析上线成功,并需要回写数据,但数据为空 !" ) ; 
 | 
                } 
 | 
            } 
 | 
        } catch (Exception e) { 
 | 
            log.error("严重错误,分析上线数据时产生异常 !\n" + e.getMessage() , e) ; 
 | 
        } 
 | 
        return new String[]{rtuAddr, protocolName} ; 
 | 
    } 
 | 
     
 | 
     
 | 
    /** 
 | 
     * 处理上行数据 
 | 
     * @param session IO会话 
 | 
     * @param rtuAddrAtHead 控制器地址头部 
 | 
     * @param protocolName 协议名称 
 | 
     * @param isOnLine 是否上线数据 
 | 
     * @param upBuf 上行数据 
 | 
     * @param upHex 上行数据 
 | 
     * @throws Exception 异常 
 | 
     */ 
 | 
    private void dealUpData(IoSession session, String rtuAddrAtHead, String protocolName, boolean isOnLine, byte[] upBuf, String upHex) throws Exception{ 
 | 
        Driver dri = ProtocolCache.getDriver(protocolName) ; 
 | 
        if(dri == null){ 
 | 
            log.error("严重错误,未能得到协议" + protocolName + "驱动类实例!"); 
 | 
        }else{ 
 | 
            MidResult[] midRs = dri.parseData(ServerProperties.isLowPower, rtuAddrAtHead, upBuf, upHex, new DriverParserDataCallback(){ 
 | 
                @Override 
 | 
                public void callback(String rtuAddrAtHead, String code, String codeName, String upHex, Boolean reportOrResponse_trueOrFalse, boolean parseFail, String rtuAddrInData) { 
 | 
                    //更新终端状态 
 | 
                    if(rtuAddrInData != null && !rtuAddrInData.equals(rtuAddrAtHead)){ 
 | 
                        //数据头中的RTU地址与数据中的RTU地址不一致,更换成数据中的RTU地址 
 | 
                        TcpSessionCache.changeRtuAddr(rtuAddrAtHead, rtuAddrInData, protocolName, session); 
 | 
                        session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey, rtuAddrInData) ; 
 | 
                    } 
 | 
  
 | 
                    String rtuAddr ; 
 | 
                    if(rtuAddrInData != null){ 
 | 
                        rtuAddr = rtuAddrInData  ; 
 | 
                    }else{ 
 | 
                        rtuAddr = rtuAddrAtHead ; 
 | 
                    } 
 | 
  
 | 
                    InetSocketAddress sa = (InetSocketAddress)session.getRemoteAddress() ; 
 | 
                    if(isOnLine){ 
 | 
                        //上线了 
 | 
                        RtuStatusDealer.onLine(rtuAddr, sa.getAddress().getHostAddress(), sa.getPort()); 
 | 
                    } 
 | 
                    if(reportOrResponse_trueOrFalse != null && reportOrResponse_trueOrFalse){ 
 | 
                        RtuStatusDealer.upReport(rtuAddr, upBuf.length) ; 
 | 
                    }else{ 
 | 
                        RtuStatusDealer.upData(rtuAddr, upBuf.length) ; 
 | 
                    } 
 | 
  
 | 
                    //记录日志 
 | 
                    if(parseFail){ 
 | 
                        RtuLogDealer.log(rtuAddr, (isOnLine?"上线数据 ":"上行数据 ") +  code + ("(" + codeName + ")") + ":" + upHex + "(解析失败)"); 
 | 
                    }else{ 
 | 
                        RtuLogDealer.log(rtuAddr, (isOnLine?"上线数据 ":"上行数据 ") +  code + ("(" + codeName + ")") + ":" + upHex); 
 | 
                    } 
 | 
                } 
 | 
            }) ; 
 | 
            if(midRs != null){ 
 | 
                for(MidResult rs : midRs){ 
 | 
                    rs.action(); 
 | 
                } 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
} 
 |