package com.dy.rtuMw.server.tasks;
|
|
import java.net.InetSocketAddress;
|
|
import com.dy.common.mw.protocol.*;
|
import com.dy.common.springUtil.SpringContextUtil;
|
import com.dy.common.util.Callback;
|
import com.dy.rtuMw.server.upgrade.UpgradeUnit;
|
import com.dy.rtuMw.web.com.CommandCtrl;
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
import org.apache.mina.core.session.IoSession;
|
|
import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr;
|
import com.dy.common.mw.channel.tcp.TcpUnit;
|
import com.dy.common.mw.core.CoreTask;
|
import com.dy.rtuMw.server.ServerProperties;
|
import com.dy.rtuMw.server.forTcp.RtuLogDealer;
|
import com.dy.rtuMw.server.forTcp.RtuStatusDealer;
|
import com.dy.rtuMw.server.forTcp.TcpSessionCache;
|
import com.dy.common.util.ByteUtil;
|
|
public class RtuUpTask extends CoreTask {
|
|
private static final Logger log = LogManager.getLogger(RtuUpTask.class.getName());
|
|
@Override
|
public Integer execute() {
|
Object[] os = (Object[])this.data ;
|
IoSession session = (IoSession)os[0] ;
|
byte[] upBuf = (byte[])os[1] ;
|
try {
|
this.upData(session, upBuf);
|
} catch (Exception e) {
|
log.error("解析上行数据出错" + (e.getMessage()==null?"!":("," + e.getMessage())) ,e);
|
}
|
return null ;
|
}
|
|
/**
|
* RTU上行数据
|
* @param session IO会话
|
* @param upBuf 上行数据
|
*/
|
private void upData(IoSession session, byte[] upBuf) throws Exception{
|
if(upBuf == null){
|
log.error("出错,收到RTU的数据为空!") ;
|
return ;
|
}
|
String upHex = null ;
|
try {
|
upHex = ByteUtil.bytes2Hex(upBuf , true) ;
|
log.info("收到RTU数据:" + upHex) ;
|
} catch (Exception e) {
|
e.printStackTrace();
|
log.error("将数据转换为十六进制时出错!" ) ;
|
}
|
String rtuAddr = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr) ;
|
String protocolName = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName) ;
|
Short protocolVersion = (Short)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion) ;
|
boolean isOnLine = false ;
|
if(rtuAddr == null){
|
//说明刚建立网络连接,此数据应该是上线数据
|
isOnLine = true ;
|
Object[] rtuAddrProtocolNameVersion = this.parseOnLine(session, upBuf) ;
|
if(rtuAddrProtocolNameVersion.length == 3){
|
//解析上线数据成功,并解析出RTU地址及通信协议名称
|
rtuAddr = (String)rtuAddrProtocolNameVersion[0] ;
|
protocolName = (String)rtuAddrProtocolNameVersion[1] ;
|
protocolVersion = (Short)rtuAddrProtocolNameVersion[2] ;
|
|
if(rtuAddr != null && protocolName != null && protocolVersion != null){
|
//设置session的属性ID
|
TcpUnit.getInstance().setIoSessionArrs(session, rtuAddr, protocolName, protocolVersion) ;
|
//缓存session
|
TcpSessionCache.putNewTcpSession(rtuAddr, session);
|
|
log.info("RTU(地址:" + rtuAddr + ",协议:" + protocolName + ",版本号:" + protocolVersion + ")上线了。") ;
|
}
|
}
|
}
|
if(rtuAddr != null){
|
boolean toDeal = true ;
|
if(ServerProperties.onlyDealRtusTest){
|
boolean find = false ;
|
for(String testRtu : ServerProperties.onlyDealRtus){
|
if(testRtu.equals(rtuAddr)){
|
find = true ;
|
break ;
|
}
|
}
|
if(!find){
|
//不在处理范围内
|
toDeal = false ;
|
}
|
}
|
if(toDeal){
|
/*
|
if(protocolName == null){
|
Object[] objs = TcpSessionCache.getTcpProtocolNameVersion(rtuAddr) ;
|
protocolName = (String)objs[0] ;
|
protocolVersion = (Short)objs[1] ;
|
}
|
*/
|
|
//设置收到数据时刻
|
TcpSessionCache.cacheUpDataTime(rtuAddr);
|
|
if(protocolName != null){
|
//对上行数据进行处理
|
this.dealUpData(session, rtuAddr, protocolName, protocolVersion, isOnLine, upBuf, upHex) ;
|
}
|
}
|
}
|
}
|
|
/**
|
* 解析上线数据
|
* @param session IO会话
|
* @param upBuf 上行数据
|
*/
|
private Object[] parseOnLine(IoSession session, byte[] upBuf){
|
String rtuAddr = null ;
|
String protocolName = null ;
|
Short protocolVersion = null ;
|
try {
|
OnLine.OnLineResult rs = new OnLineHandle().parse(upBuf) ;
|
if(rs == null
|
|| rs.result == OnLine.OnLineAction_fail
|
|| rs.result == OnLine.OnLineAction_success_noMe){
|
log.error("严重错误,解析上线数据失败 !" ) ;
|
}else if(rs.result == OnLine.OnLineAction_success){
|
if(rs.rtuAddr == null){
|
log.error("严重错误,解析上线结果中RTU地址为空 !" ) ;
|
}else{
|
rtuAddr = rs.rtuAddr ;
|
}
|
if(rs.protocolName == null){
|
log.error("严重错误,解析上线结果中协议名称为空 !" ) ;
|
}else{
|
protocolName = rs.protocolName ;
|
protocolVersion = rs.protocolVersion ;
|
}
|
}else if(rs.result == OnLine.OnLineAction_success_response){
|
if(rs.remoteData != null && rs.remoteData.length > 0){
|
session.write(rs.remoteData) ;
|
}else{
|
log.error("严重错误,解析上线成功,并需要回写数据,但数据为空 !" ) ;
|
}
|
}
|
} catch (Exception e) {
|
log.error("严重错误,分析上线数据时产生异常 !\n" + e.getMessage() , e) ;
|
}
|
return new Object[]{rtuAddr, protocolName, protocolVersion} ;
|
}
|
|
|
/**
|
* 处理上行数据
|
* @param session IO会话
|
* @param rtuAddrAtHead 控制器地址头部
|
* @param protocolName 协议名称
|
* @param protocolVersion 协议版本号
|
* @param isOnLine 是否上线数据
|
* @param upBuf 上行数据
|
* @param upHex 上行数据
|
* @throws Exception 异常
|
*/
|
private void dealUpData(IoSession session,
|
String rtuAddrAtHead,
|
String protocolName,
|
Short protocolVersion,
|
boolean isOnLine,
|
byte[] upBuf,
|
String upHex) throws Exception{
|
Driver dri = ProtocolCache.getDriver(protocolName, protocolVersion) ;
|
if(dri == null){
|
log.error("严重错误,未能得到协议" + protocolName + "驱动类实例!");
|
}else{
|
MidResult[] midRs = dri.parseData(ServerProperties.isLowPower, rtuAddrAtHead, upBuf, upHex, new DriverParserDataCallback(){
|
@Override
|
public void callback(String rtuAddrAtHead, String code, String codeName, String upHex, Boolean reportOrResponse_trueOrFalse, boolean parseFail, String rtuAddrInData) {
|
//更新终端状态
|
if(rtuAddrInData != null && !rtuAddrInData.equals(rtuAddrAtHead)){
|
//数据头中的RTU地址与数据中的RTU地址不一致,更换成数据中的RTU地址
|
TcpSessionCache.changeRtuAddr(rtuAddrAtHead, rtuAddrInData, session);
|
session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr, rtuAddrInData) ;
|
}
|
|
String rtuAddr ;
|
if(rtuAddrInData != null){
|
rtuAddr = rtuAddrInData ;
|
}else{
|
rtuAddr = rtuAddrAtHead ;
|
}
|
|
InetSocketAddress sa = (InetSocketAddress)session.getRemoteAddress() ;
|
if(isOnLine){
|
//上线了
|
RtuStatusDealer.onLine(rtuAddr, sa.getAddress().getHostAddress(), sa.getPort(), protocolName, protocolVersion);
|
}
|
if(reportOrResponse_trueOrFalse != null && reportOrResponse_trueOrFalse){
|
RtuStatusDealer.upAutoReport(rtuAddr, code, codeName, upBuf.length) ;
|
}else{
|
RtuStatusDealer.upData(rtuAddr, code, codeName, upBuf.length) ;
|
}
|
|
//记录日志
|
if(parseFail){
|
RtuLogDealer.log(rtuAddr, (isOnLine?"上线数据 ":"上行数据 ") + code + ("(" + codeName + ")") + ":" + upHex + "(解析失败)");
|
}else{
|
RtuLogDealer.log(rtuAddr, (isOnLine?"上线数据 ":"上行数据 ") + code + ("(" + codeName + ")") + ":" + upHex);
|
}
|
|
//触发远程RTU软件升级
|
UpgradeUnit.getInstance().trigger(rtuAddr, code, protocolName, protocolVersion, new Callback() {
|
@Override
|
public void call(Object obj) {
|
if(obj != null){
|
Command com = (Command)obj ;
|
CommandCtrl comCtrl = SpringContextUtil.getBean(CommandCtrl.class) ;
|
if(comCtrl != null){
|
comCtrl.sendOutComFromLocal(com) ;
|
}
|
}
|
}
|
@Override
|
public void call(Object... objs) {
|
}
|
@Override
|
public void exception(Exception e) {
|
}
|
});
|
}
|
}) ;
|
if(midRs != null){
|
for(MidResult rs : midRs){
|
rs.action();
|
}
|
}
|
}
|
}
|
|
}
|