1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package com.dy.aceMw.server.tasks;
 
import java.net.InetSocketAddress;
 
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.mina.core.session.IoSession;
 
import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr;
import com.dy.common.mw.channel.tcp.TcpUnit;
import com.dy.common.mw.core.CoreTask;
import com.dy.common.mw.protocol.DriverParserDataCallback;
import com.dy.common.mw.protocol.MidResult;
import com.dy.common.mw.protocol.Driver;
import com.dy.common.mw.protocol.OnLine;
import com.dy.common.mw.protocol.OnLineHandle;
import com.dy.common.mw.protocol.ProtocolCache;
import com.dy.aceMw.server.ServerProperties;
import com.dy.aceMw.server.forTcp.RtuLogDealer;
import com.dy.aceMw.server.forTcp.RtuStatusDealer;
import com.dy.aceMw.server.forTcp.TcpSessionCache;
import com.dy.common.util.ByteUtil;
 
public class RtuUpTask extends CoreTask {
    
    private static final Logger log = LogManager.getLogger(RtuUpTask.class.getName());
 
    @Override
    public Integer execute() {
        Object[] os = (Object[])this.data ;
        IoSession session = (IoSession)os[0] ;
        byte[] upBuf = (byte[])os[1] ;
        try {
            this.upData(session, upBuf);
        } catch (Exception e) {
            log.error("解析上行数据出错" + (e.getMessage()==null?"!":("," + e.getMessage())) ,e);
        }
        return null ;
    }
 
    /**
     * RTU上行数据
     * @param session IO会话
     * @param upBuf 上行数据
     */
    private void upData(IoSession session, byte[] upBuf) throws Exception{
        if(upBuf == null){
            log.error("出错,收到RTU的数据为空!") ;
            return ;
        }
        String upHex = null ;
        try {
            upHex = ByteUtil.bytes2Hex(upBuf , true) ;
            log.info("收到RTU数据:" + upHex) ;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("将数据转换为十六进制时出错!" ) ;
        }
        String rtuAddr = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey) ;
        String protocolName = null ;
        boolean isOnLine = false ;
        if(rtuAddr == null){
            //说明刚建立网络连接,此数据应该是上线数据
            isOnLine = true ;
            String[] rtuAddrProtocolName = this.parseOnLine(session, upBuf) ;
            if(rtuAddrProtocolName.length == 2){
                //解析上线数据成功,并解析出RTU地址及通信协议名称
                rtuAddr = rtuAddrProtocolName[0] ;
                protocolName = rtuAddrProtocolName[1] ;
                
                if(rtuAddr != null && protocolName != null){
                    //设置session的属性ID
                    TcpUnit.getInstance().setIoSessionArrId(session, rtuAddr);
                    //缓存session
                    TcpSessionCache.putNewTcpSession(rtuAddr, protocolName, session);
                    
                    log.info("RTU(地址:" + rtuAddr + ",协议:" + protocolName + ")上线了。") ;
                }
            }
        }
        
        if(rtuAddr != null){
            if(protocolName == null){
                protocolName = TcpSessionCache.getTcpProtocolName(rtuAddr) ;
            }
            
            //设置收到数据时刻
            TcpSessionCache.cacheUpDataTime(rtuAddr);
            
            if(protocolName != null){
                //对上行数据进行处理
                this.dealUpData(session, rtuAddr, protocolName, isOnLine, upBuf, upHex) ;
            }
        }
    }
    
    /**
     * 解析上线数据
     * @param session IO会话
     * @param upBuf 上行数据
     */
    private String[] parseOnLine(IoSession session, byte[] upBuf){
        String rtuAddr = null ;
        String protocolName = null ;
        try {
            OnLine.OnLineResult rs = new OnLineHandle().parse(upBuf) ;
            if(rs == null 
                    || rs.result == OnLine.OnLineAction_fail 
                    || rs.result == OnLine.OnLineAction_success_noMe){
                log.error("严重错误,解析上线数据失败 !" ) ;
            }else if(rs.result == OnLine.OnLineAction_success){
                if(rs.rtuAddr == null){
                    log.error("严重错误,解析上线结果中RTU地址为空 !" ) ;
                }else{
                    rtuAddr = rs.rtuAddr ;
                }
                if(rs.protocolName == null){
                    log.error("严重错误,解析上线结果中协议名称为空 !" ) ;
                }else{
                    protocolName = rs.protocolName ;
                }
            }else if(rs.result == OnLine.OnLineAction_success_response){
                if(rs.remoteData != null && rs.remoteData.length > 0){
                    session.write(rs.remoteData) ;
                }else{
                    log.error("严重错误,解析上线成功,并需要回写数据,但数据为空 !" ) ;
                }
            }
        } catch (Exception e) {
            log.error("严重错误,分析上线数据时产生异常 !\n" + e.getMessage() , e) ;
        }
        return new String[]{rtuAddr, protocolName} ;
    }
    
    
    /**
     * 处理上行数据
     * @param session IO会话
     * @param rtuAddrAtHead 控制器地址头部
     * @param protocolName 协议名称
     * @param isOnLine 是否上线数据
     * @param upBuf 上行数据
     * @param upHex 上行数据
     * @throws Exception 异常
     */
    private void dealUpData(IoSession session, String rtuAddrAtHead, String protocolName, boolean isOnLine, byte[] upBuf, String upHex) throws Exception{
        Driver dri = ProtocolCache.getDriver(protocolName) ;
        if(dri == null){
            log.error("严重错误,未能得到协议" + protocolName + "驱动类实例!");
        }else{
            MidResult[] midRs = dri.parseData(ServerProperties.isLowPower, rtuAddrAtHead, upBuf, upHex, new DriverParserDataCallback(){
                @Override
                public void callback(String rtuAddrAtHead, String code, String upHex, Boolean reportOrResponse_trueOrFalse, boolean parseFail, String rtuAddrInData) {
                    //更新终端状态
                    if(rtuAddrInData != null && !rtuAddrInData.equals(rtuAddrAtHead)){
                        //数据头中的RTU地址与数据中的RTU地址不一致,更换成数据中的RTU地址
                        TcpSessionCache.changeRtuAddr(rtuAddrAtHead, rtuAddrInData, protocolName, session);
                        session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey, rtuAddrInData) ;
                    }
 
                    String rtuAddr ;
                    if(rtuAddrInData != null){
                        rtuAddr = rtuAddrInData  ;
                    }else{
                        rtuAddr = rtuAddrAtHead ;
                    }
 
                    InetSocketAddress sa = (InetSocketAddress)session.getRemoteAddress() ;
                    if(isOnLine){
                        //上线了
                        RtuStatusDealer.onLine(rtuAddr, sa.getAddress().getHostAddress(), sa.getPort());
                    }
                    if(reportOrResponse_trueOrFalse != null && reportOrResponse_trueOrFalse){
                        RtuStatusDealer.upReport(rtuAddr, upBuf.length) ;
                    }else{
                        RtuStatusDealer.upData(rtuAddr, upBuf.length) ;
                    }
 
                    //记录日志
                    if(parseFail){
                        RtuLogDealer.log(rtuAddr, (isOnLine?"上线数据 ":"上行数据 ") + "" + code + ":" + upHex + "(解析失败)");
                    }else{
                        RtuLogDealer.log(rtuAddr, (isOnLine?"上线数据 ":"上行数据 ") + "" + code + ":" + upHex);
                    }
                }
            }) ;
            if(midRs != null){
                for(MidResult rs : midRs){
                    rs.action();
                }
            }
        }
    }
 
}