package com.dy.rtuMw.server.tasks; import java.net.InetSocketAddress; import com.dy.common.mw.protocol.*; import com.dy.common.springUtil.SpringContextUtil; import com.dy.common.util.Callback; import com.dy.rtuMw.server.upgrade.UpgradeUnit; import com.dy.rtuMw.web.com.CommandCtrl; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.mina.core.session.IoSession; import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr; import com.dy.common.mw.channel.tcp.TcpUnit; import com.dy.common.mw.core.CoreTask; import com.dy.rtuMw.server.ServerProperties; import com.dy.rtuMw.server.forTcp.RtuLogDealer; import com.dy.rtuMw.server.forTcp.RtuStatusDealer; import com.dy.rtuMw.server.forTcp.TcpSessionCache; import com.dy.common.util.ByteUtil; public class RtuUpTask extends CoreTask { private static final Logger log = LogManager.getLogger(RtuUpTask.class.getName()); @Override public Integer execute() { Object[] os = (Object[])this.data ; IoSession session = (IoSession)os[0] ; byte[] upBuf = (byte[])os[1] ; try { this.upData(session, upBuf); } catch (Exception e) { log.error("解析上行数据出错" + (e.getMessage()==null?"!":("," + e.getMessage())) ,e); } return null ; } /** * RTU上行数据 * @param session IO会话 * @param upBuf 上行数据 */ private void upData(IoSession session, byte[] upBuf) throws Exception{ if(upBuf == null){ log.error("出错,收到RTU的数据为空!") ; return ; } String upHex = null ; try { upHex = ByteUtil.bytes2Hex(upBuf , true) ; log.info("收到RTU数据:" + upHex) ; } catch (Exception e) { e.printStackTrace(); log.error("将数据转换为十六进制时出错!" ) ; } String rtuAddr = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey) ; String protocolName = null ; Short protocolVersion = null ; boolean isOnLine = false ; if(rtuAddr == null){ //说明刚建立网络连接,此数据应该是上线数据 isOnLine = true ; Object[] rtuAddrProtocolNameVersion = this.parseOnLine(session, upBuf) ; if(rtuAddrProtocolNameVersion.length == 3){ //解析上线数据成功,并解析出RTU地址及通信协议名称 rtuAddr = (String)rtuAddrProtocolNameVersion[0] ; protocolName = (String)rtuAddrProtocolNameVersion[1] ; protocolVersion = (Short)rtuAddrProtocolNameVersion[2] ; if(rtuAddr != null && protocolName != null){ //设置session的属性ID TcpUnit.getInstance().setIoSessionArrId(session, rtuAddr); //缓存session TcpSessionCache.putNewTcpSession(rtuAddr, protocolName, protocolVersion, session); log.info("RTU(地址:" + rtuAddr + ",协议:" + protocolName + ",协议版本号:" + protocolVersion + ")上线了。") ; } } } if(rtuAddr != null){ boolean toDeal = true ; if(ServerProperties.onlyDealRtusTest){ boolean find = false ; for(String testRtu : ServerProperties.onlyDealRtus){ if(testRtu.equals(rtuAddr)){ find = true ; break ; } } if(!find){ //不在处理范围内 toDeal = false ; } } if(toDeal){ if(protocolName == null){ Object[] objs = TcpSessionCache.getTcpProtocolNameVersion(rtuAddr) ; protocolName = (String)objs[0] ; protocolVersion = (Short)objs[1] ; } //设置收到数据时刻 TcpSessionCache.cacheUpDataTime(rtuAddr); if(protocolName != null){ //对上行数据进行处理 this.dealUpData(session, rtuAddr, protocolName, protocolVersion, isOnLine, upBuf, upHex) ; } } } } /** * 解析上线数据 * @param session IO会话 * @param upBuf 上行数据 */ private Object[] parseOnLine(IoSession session, byte[] upBuf){ String rtuAddr = null ; String protocolName = null ; Short protocolVersion = null ; try { OnLine.OnLineResult rs = new OnLineHandle().parse(upBuf) ; if(rs == null || rs.result == OnLine.OnLineAction_fail || rs.result == OnLine.OnLineAction_success_noMe){ log.error("严重错误,解析上线数据失败 !" ) ; }else if(rs.result == OnLine.OnLineAction_success){ if(rs.rtuAddr == null){ log.error("严重错误,解析上线结果中RTU地址为空 !" ) ; }else{ rtuAddr = rs.rtuAddr ; } if(rs.protocolName == null){ log.error("严重错误,解析上线结果中协议名称为空 !" ) ; }else{ protocolName = rs.protocolName ; protocolVersion = rs.protocolVersion ; } }else if(rs.result == OnLine.OnLineAction_success_response){ if(rs.remoteData != null && rs.remoteData.length > 0){ session.write(rs.remoteData) ; }else{ log.error("严重错误,解析上线成功,并需要回写数据,但数据为空 !" ) ; } } } catch (Exception e) { log.error("严重错误,分析上线数据时产生异常 !\n" + e.getMessage() , e) ; } return new Object[]{rtuAddr, protocolName, protocolVersion} ; } /** * 处理上行数据 * @param session IO会话 * @param rtuAddrAtHead 控制器地址头部 * @param protocolName 协议名称 * @param protocolVersion 协议版本号 * @param isOnLine 是否上线数据 * @param upBuf 上行数据 * @param upHex 上行数据 * @throws Exception 异常 */ private void dealUpData(IoSession session, String rtuAddrAtHead, String protocolName, Short protocolVersion, boolean isOnLine, byte[] upBuf, String upHex) throws Exception{ Driver dri = ProtocolCache.getDriver(protocolName) ; if(dri == null){ log.error("严重错误,未能得到协议" + protocolName + "驱动类实例!"); }else{ MidResult[] midRs = dri.parseData(ServerProperties.isLowPower, rtuAddrAtHead, upBuf, upHex, new DriverParserDataCallback(){ @Override public void callback(String rtuAddrAtHead, String code, String codeName, String upHex, Boolean reportOrResponse_trueOrFalse, boolean parseFail, String rtuAddrInData) { //更新终端状态 if(rtuAddrInData != null && !rtuAddrInData.equals(rtuAddrAtHead)){ //数据头中的RTU地址与数据中的RTU地址不一致,更换成数据中的RTU地址 TcpSessionCache.changeRtuAddr(rtuAddrAtHead, rtuAddrInData, protocolName, protocolVersion, session); session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey, rtuAddrInData) ; } String rtuAddr ; if(rtuAddrInData != null){ rtuAddr = rtuAddrInData ; }else{ rtuAddr = rtuAddrAtHead ; } InetSocketAddress sa = (InetSocketAddress)session.getRemoteAddress() ; if(isOnLine){ //上线了 RtuStatusDealer.onLine(rtuAddr, sa.getAddress().getHostAddress(), sa.getPort(), protocolName, protocolVersion); } if(reportOrResponse_trueOrFalse != null && reportOrResponse_trueOrFalse){ RtuStatusDealer.upAutoReport(rtuAddr, code, codeName, upBuf.length) ; }else{ RtuStatusDealer.upData(rtuAddr, code, codeName, upBuf.length) ; } //记录日志 if(parseFail){ RtuLogDealer.log(rtuAddr, (isOnLine?"上线数据 ":"上行数据 ") + code + ("(" + codeName + ")") + ":" + upHex + "(解析失败)"); }else{ RtuLogDealer.log(rtuAddr, (isOnLine?"上线数据 ":"上行数据 ") + code + ("(" + codeName + ")") + ":" + upHex); } //触发远程RTU软件升级 UpgradeUnit.getInstance().trigger(rtuAddr, code, protocolName, protocolVersion, new Callback() { @Override public void call(Object obj) { if(obj != null){ Command com = (Command)obj ; CommandCtrl comCtrl = SpringContextUtil.getBean(CommandCtrl.class) ; if(comCtrl != null){ comCtrl.sendOutComFromLocal(com) ; } } } @Override public void call(Object... objs) { } @Override public void exception(Exception e) { } }); } }) ; if(midRs != null){ for(MidResult rs : midRs){ rs.action(); } } } } }