pipIrr-platform/pipIrr-global/src/main/java/com/dy/pipIrrGlobal/daoPr/PrIntakeMapper.java
@@ -127,6 +127,23 @@ //List<VoOnLineIntake> getOnLineIntakes(@Param("onLineMap") String onLineMap, @Param("isOnLine") Boolean isOnLine); List<VoOnLineIntake> getOnLineIntakes(Map<?, ?> params); /** * 获取取水口数量(在线或不在线) * @param params * @return */ Long selectIntakesCountForOnLine(Map<?, ?> params); /** * 获取取水口列表在线或不在线) * @param params * @return */ List<VoOnLineIntake> selectIntakesForOnLine(Map<?, ?> params); /** * 根据取水口编号获取取水口对象 * @param params pipIrr-platform/pipIrr-global/src/main/java/com/dy/pipIrrGlobal/rtuMw/CodeLocal.java
New file @@ -0,0 +1,21 @@ package com.dy.pipIrrGlobal.rtuMw; public class CodeLocal { public static final String clock = "LCD0000" ;//查询监控中间件时钟 public static final String onLineAll = "LCD0001" ;//查询所有RTU在线情况 public static final String onLinePart = "LCD0002" ;//查询部分RTU在线情况 public static final String onLineStatistics = "LCD0003" ;//查询所有RTU状态情况 public static final String allProtocols = "LCD0100" ;//查询所有协议配置 public static final String stopTcpSv = "LCD0110" ;//停止TCP服务,不再接入新的TCP连接,已经TCP连接的全部断连接 public static final String recoverTcpSv = "LCD0112" ;//重启TCP服务,接入新的TCP连接 public static final String mwState = "LCD0200" ;//得到通信中间件运行信息 } pipIrr-platform/pipIrr-global/src/main/java/com/dy/pipIrrGlobal/rtuMw/ToRtuMwCom.java
New file @@ -0,0 +1,85 @@ package com.dy.pipIrrGlobal.rtuMw; import com.dy.common.multiDataSource.DataSourceContext; import com.dy.common.mw.protocol.Command; import com.dy.common.mw.protocol.CommandType; import com.dy.common.webUtil.BaseResponse; import org.springframework.core.env.Environment; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.web.client.RestTemplate; import org.springframework.web.util.UriComponentsBuilder; /** * @Author: liurunyu * @Date: 2024/10/23 11:45 * @Description */ public abstract class ToRtuMwCom { /** * pro_mw:属性 * tag从控制器中获取 * key_mw:url的key */ private static final String pro_mw = "mw"; private static final String key_mw = "comSendUrl"; /** * 得到向通信中间件发送数据的URL * @param env * @return */ protected String getToMwUrl(Environment env) { return env.getProperty(pro_mw + "." + DataSourceContext.get() + "." + key_mw); } /** * 创建外部命令(发给控制器) * @param code 命令code * @return */ protected Command createOuterCommand(String comId, String code) { Command com = new Command(); com.id = comId; com.code = code ; com.type = CommandType.outerCommand; return com ; } /** * 创建内部 * @param code 命令code * @return */ protected Command createInnerCommand(String code) { Command com = new Command(); com.id = Command.defaultId; com.code = code ; com.type = CommandType.innerCommand; return com ; } /** * 发送命令 * * @return */ protected BaseResponse sendCom2Mw(RestTemplate restTemplate, String comSendUrl, Command com) { String url = UriComponentsBuilder.fromUriString(comSendUrl) .build() .toUriString(); HttpHeaders headers = new HttpHeaders(); HttpEntity<Command> httpEntity = new HttpEntity<>(com, headers); ResponseEntity<BaseResponse> response = null; try { // 通过Post方式调用接口 response = restTemplate.exchange(url, HttpMethod.POST, httpEntity, BaseResponse.class); } catch (Exception e) { e.printStackTrace(); } return response.getBody(); } } pipIrr-platform/pipIrr-global/src/main/resources/mapper/PrIntakeMapper.xml
@@ -520,6 +520,97 @@ </if> </trim> </select> <!--获取取水口数量(在线或离线或不设状态)--> <select id="selectIntakesCountForOnLine" resultType="java.lang.Long"> SELECT COUNT(*) AS recordCount FROM pr_intake inta INNER JOIN pr_controller con ON con.intakeId = inta.id <if test="onLineMap != null and onLineMap !='' and isOnLine != null"> LEFT JOIN JSON_TABLE( <!--'[{"rtuAddr":"37142501020100215","isOnLine":true},{"rtuAddr":"4000004","isOnLine":true},{"rtuAddr":"dy20240325","isOnLine":false}]',--> #{onLineMap}, '$[*]' COLUMNS ( rtuAddr VARCHAR(20) PATH '$.rtuAddr', isOnLine BOOLEAN PATH '$.isOnLine' ) ) rtus ON con.rtuAddr = rtus.rtuAddr </if> WHERE con.intakeId is not null <if test="isOnLine != null"> AND rtus.isOnLine = #{isOnLine} </if> <if test="intakeNum != null and intakeNum != ''"> AND inta.name = #{intakeNum} </if> </select> <!--获取取水口列表(在线或离线或不设状态)--> <select id="selectIntakesForOnLine" resultType="com.dy.pipIrrGlobal.voPr.VoOnLineIntake"> SELECT inta.id AS intakeId, con.rtuAddr, inta.name AS intakeNum, inta.lng, inta.lat, IFNULL(hou.total_amount, 0) AS totalAmount, alarm.alarm FROM pr_intake inta INNER JOIN pr_controller con ON con.intakeId = inta.id LEFT JOIN rm_on_hour_report_last hou ON hou.intake_id = inta.id LEFT JOIN( SELECT intake_id AS intakeId, CONCAT( IF(alarm_loss = 1, '漏损报警,', ''), IF(alarm_battery_volt = 1, '电池电压报警,', ''), IF(alarm_valve = 1, '阀门报警,', ''), IF(alarm_water_meter_fault = 1, '流量计故障报警,', '') ) AS alarm FROM rm_alarm_state_last WHERE (alarm_loss = 1 OR alarm_battery_volt = 1 OR alarm_valve = 1 OR alarm_water_meter_fault = 1) AND dt >= DATE_SUB(NOW(), INTERVAL 12 HOUR) ) alarm ON alarm.intakeId = inta.id <if test="onLineMap != null and onLineMap !='' and isOnLine != null"> LEFT JOIN JSON_TABLE( <!--'[{"rtuAddr":"37142501020100215","isOnLine":true},{"rtuAddr":"4000004","isOnLine":true},{"rtuAddr":"dy20240325","isOnLine":false}]',--> #{onLineMap}, '$[*]' COLUMNS ( rtuAddr VARCHAR(20) PATH '$.rtuAddr', isOnLine BOOLEAN PATH '$.isOnLine' ) ) rtus ON con.rtuAddr = rtus.rtuAddr </if> WHERE con.intakeId is not null <if test="isOnLine != null"> AND rtus.isOnLine = #{isOnLine} </if> <if test="intakeNum != null and intakeNum != ''"> AND inta.name = #{intakeNum} </if> order by inta.id ASC <trim prefix="limit "> <if test="start != null and count != null"> #{start,javaType=Integer,jdbcType=INTEGER}, #{count,javaType=Integer,jdbcType=INTEGER} </if> </trim> </select> <!--根据取水口编号获取取水口对象--> <select id="getIntakeByName" resultType="com.dy.pipIrrGlobal.voPr.VoOnLineIntake"> pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/TcpSessionCache.java
@@ -162,6 +162,25 @@ return map ; } } /** * 得到部分在线情况 * @return */ public static HashMap<String, Boolean> partOnLine(String[] rtuAddrArrGrp){ synchronized (sessionTable){ HashMap<String, Boolean> map = new HashMap<String, Boolean>(); for(String rtuAddr : rtuAddrArrGrp){ TcpSession tcpSe = sessionTable.get(rtuAddr) ; if(tcpSe != null){ map.put(rtuAddr, tcpSe.ioSession.isConnected()) ; } } return map ; } } /** * 得到所有RTU连接状态情况 * @return pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
@@ -5,6 +5,8 @@ import com.dy.common.mw.protocol.Command; import com.dy.rtuMw.server.local.localProtocol.*; import java.util.HashMap; /** * @Author liurunyu * @Date 2023/12/21 15:56 @@ -21,8 +23,10 @@ String code = com.getCode() ; if(code.equals(CodeLocal.clock)){ return this.clock(com) ; }else if(code.equals(CodeLocal.onLine)){ return this.onLine(com) ; }else if(code.equals(CodeLocal.onAllLine)){ return this.onAllLine(com) ; }else if(code.equals(CodeLocal.onPartLine)){ return this.onPartLine(com) ; }else if(code.equals(CodeLocal.onLineStatistics)){ return this.onLineStateStatistics(com) ; }else if(code.equals(CodeLocal.allProtocols)){ @@ -50,9 +54,23 @@ * 查询所有RTU在线情况 * @throws Exception */ private Command onLine(Command command) throws Exception{ RtuOnLineVo ol = new RtuOnLineDeal().deal() ; return ReturnCommand.successed("查询所有测站在线情况结果", command.getId(), command.getCode(), ol) ; private Command onAllLine(Command command) throws Exception{ HashMap<String, Boolean> map = new RtuOnLineDeal().dealAll() ; return ReturnCommand.successed("查询所有RTU在线情况结果", command.getId(), command.getCode(), map) ; } /** * 查询部分RTU在线情况 * @throws Exception */ private Command onPartLine(Command command) throws Exception{ if(command.param != null && command.param instanceof String && !command.param.equals("")){ String[] rtuAddrGrp = ((String)command.param).split(","); HashMap<String, Boolean> map = new RtuOnLineDeal().dealPart(rtuAddrGrp) ; return ReturnCommand.successed("查询部分RTU在线情况结果", command.getId(), command.getCode(), map) ; }else{ return ReturnCommand.errored("出错,命令参数应该是所查询RTU的地址串", command.getId(), command.getCode()) ; } } /** @@ -61,7 +79,7 @@ */ private Command onLineStateStatistics(Command command) throws Exception{ RtuOnLineStateStatisticsVo vo = new RtuOnLineStateStatisticsDeal().deal() ; return ReturnCommand.successed("查询所有测站在线情况结果", command.getId(), command.getCode(), vo) ; return ReturnCommand.successed("查询所有RTU在线情况结果", command.getId(), command.getCode(), vo) ; } /** @@ -80,7 +98,6 @@ private Command stopTcpSv(Command command) throws Exception{ TcpUnit.getInstance().stop(new UnitCallbackInterface(){ public void call(Object obj) throws Exception { } }); return ReturnCommand.successed("已经启动停止TCP服务", command.getId(), command.getCode(), null) ; pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
@@ -4,9 +4,11 @@ public static final String clock = "LCD0000" ;//查询监控中间件时钟 public static final String onLine = "LCD0001" ;//查询所有RTU在线情况 public static final String onAllLine = "LCD0001" ;//查询所有RTU在线情况 public static final String onLineStatistics = "LCD0002" ;//查询所有RTU在线情况 public static final String onPartLine = "LCD0002" ;//查询所有RTU在线情况 public static final String onLineStatistics = "LCD0003" ;//查询所有RTU状态情况 public static final String allProtocols = "LCD0100" ;//查询所有协议配置 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/RtuOnLineDeal.java
@@ -2,12 +2,22 @@ import com.dy.rtuMw.server.forTcp.TcpSessionCache; import java.util.HashMap; public class RtuOnLineDeal { /** * 查询在线与不在线情况 */ public RtuOnLineVo deal(){ return (new RtuOnLineVo()).setOnLine(TcpSessionCache.allOnLine()); public HashMap<String, Boolean> dealAll(){ return TcpSessionCache.allOnLine(); } /** * 查询在线与不在线情况 */ public HashMap<String, Boolean> dealPart(String[] rtuAddrGrp){ return TcpSessionCache.partOnLine(rtuAddrGrp) ; } } pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/RtuOnLineVo.java
File was deleted pipIrr-platform/pipIrr-web/pipIrr-web-remote/src/main/java/com/dy/pipIrrRemote/monitor/MonitorCtrl.java
New file @@ -0,0 +1,49 @@ package com.dy.pipIrrRemote.monitor; import com.dy.common.aop.SsoAop; import com.dy.common.webUtil.BaseResponse; import com.dy.common.webUtil.BaseResponseUtils; import com.dy.common.webUtil.QueryResultVo; import com.dy.pipIrrGlobal.voPr.VoOnLineIntake; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.List; /** * @Author: liurunyu * @Date: 2024/10/23 11:32 * @Description */ @Slf4j @Tag(name = "监测控制", description = "监测控制") @RestController @RequestMapping(path = "monitor") @RequiredArgsConstructor public class MonitorCtrl { private final MonitorSv monitorSv; /** * 获取取水口列表(在线和不在线) * * @param qo * @return */ @GetMapping(path = "all_intakes") @SsoAop() public BaseResponse<QueryResultVo<List<VoOnLineIntake>>> allIntakes(QueryVo qo) { try { QueryResultVo<List<VoOnLineIntake>> res = monitorSv.selectOnLineIntakes(qo); return BaseResponseUtils.buildSuccess(res); } catch (Exception e) { log.error("查询取水口异常", e); return BaseResponseUtils.buildException(e.getMessage()); } } } pipIrr-platform/pipIrr-web/pipIrr-web-remote/src/main/java/com/dy/pipIrrRemote/monitor/MonitorSv.java
New file @@ -0,0 +1,174 @@ package com.dy.pipIrrRemote.monitor; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.dy.common.mw.protocol.Command; import com.dy.common.mw.protocol.CommandBackParam; import com.dy.common.webUtil.BaseResponse; import com.dy.common.webUtil.QueryResultVo; import com.dy.pipIrrGlobal.daoPr.PrIntakeMapper; import com.dy.pipIrrGlobal.rtuMw.CodeLocal; import com.dy.pipIrrGlobal.rtuMw.ToRtuMwCom; import com.dy.pipIrrGlobal.voPr.VoOnLineIntake; import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.common.utils.PojoUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @Author: liurunyu * @Date: 2024/10/23 11:32 * @Description */ @Slf4j @Service public class MonitorSv extends ToRtuMwCom { @Autowired private PrIntakeMapper prIntakeMapper; @Autowired private Environment env; @Autowired private RestTemplate restTemplate; /** * 获取已经绑定控制器的取水口列表(并标记在线与不在线) * * @return */ public QueryResultVo<List<VoOnLineIntake>> selectOnLineIntakes(QueryVo vo) { if(vo.getIsOnLine() == null){ //不区分在线与离线 return selectIntakesIgnoreOnOffLine(vo) ; }else{ //在线 或 离线 return selectIntakesOnOrOffLine(vo) ; } } /** * 查询取水口,不限制在线与离线状态 * @param vo * @return */ private QueryResultVo<List<VoOnLineIntake>> selectIntakesIgnoreOnOffLine(QueryVo vo) { QueryResultVo<List<VoOnLineIntake>> rsVo = this.queryDb(vo) ; if(rsVo != null && rsVo.obj != null && (rsVo.obj.size() > 0)){ String rtuAddrs = "" ; boolean first = true ; for(VoOnLineIntake rVo : rsVo.obj){ if(first){ first = false ; rtuAddrs += rVo.getRtuAddr() ; }else{ rtuAddrs += "," + rVo.getRtuAddr() ; } } //向通信中间件发关命令,查询部分RTU在线情况 Command com = this.createInnerCommand(CodeLocal.onLinePart); com.setParam(rtuAddrs) ; String comSendUrl = this.getToMwUrl(this.env) ; BaseResponse res = sendCom2Mw(restTemplate, comSendUrl, com) ; if(res != null){ if(res.isSuccess()){ Command reCom = JSON.parseObject(res.getContent() == null ? null : JSON.toJSONString(res.getContent()), Command.class) ; CommandBackParam bakParam = JSON.parseObject((reCom== null || reCom.param == null) ? null : JSON.toJSONString(reCom.param), CommandBackParam.class) ; if(bakParam != null){ if(bakParam.getSuccess().booleanValue()){ //通信中间件成功返回命令结果 HashMap<String, Boolean> onLineMap = JSON.parseObject(JSON.toJSONString(reCom.getAttachment()), HashMap.class); for(VoOnLineIntake rVo : rsVo.obj){ if(onLineMap.containsKey(rVo.getRtuAddr())) { rVo.setIsOnLine(onLineMap.get(rVo.getRtuAddr())); } } } }else{ log.error("通信中间件返回内部命令结果中不包含CommandBackParam类型参数"); } }else{ log.error("通信中间件返回内部命令执行失败" + (res.getMsg() == null? "" : ("," + res.getMsg()))) ; } }else{ log.error("通信中间件返回内部命令结果为null"); } } return rsVo; } /** * 查询取水口,在线或离线状态 * @param vo * @return */ private QueryResultVo<List<VoOnLineIntake>> selectIntakesOnOrOffLine(QueryVo vo) { //向通信中间件发关命令,查询部分RTU在线情况 Command com = this.createInnerCommand(CodeLocal.onLineAll); String comSendUrl = this.getToMwUrl(this.env) ; BaseResponse res = sendCom2Mw(restTemplate, comSendUrl, com) ; if(res != null){ if(res.isSuccess()){ Command reCom = JSON.parseObject(res.getContent() == null ? null : JSON.toJSONString(res.getContent()), Command.class) ; CommandBackParam bakParam = JSON.parseObject((reCom== null || reCom.param == null) ? null : JSON.toJSONString(reCom.param), CommandBackParam.class) ; if(bakParam != null){ if(bakParam.getSuccess().booleanValue()){ //通信中间件成功返回命令结果 HashMap<String, Boolean> onLineMap = JSON.parseObject(JSON.toJSONString(reCom.getAttachment()), HashMap.class); JSONArray jsonArray = new JSONArray(); for (Map.Entry<String, Boolean> entry : onLineMap.entrySet()) { JSONObject jsonObject = new JSONObject(); jsonObject.put("rtuAddr", entry.getKey()); jsonObject.put("isOnLine", entry.getValue()); jsonArray.add(jsonObject); } vo.setOnLineMap(jsonArray.toJSONString()); } }else{ log.error("通信中间件返回内部命令结果中不包含CommandBackParam类型参数"); } }else{ log.error("通信中间件返回内部命令执行失败" + (res.getMsg() == null? "" : ("," + res.getMsg()))) ; } }else{ log.error("通信中间件返回内部命令结果为null"); } return this.queryDb(vo) ; } /** * 查询数据库 * @param vo * @return */ private QueryResultVo<List<VoOnLineIntake>> queryDb(QueryVo vo){ Map<String, Object> params = (Map<String, Object>) PojoUtils.generalize(vo); Long itemTotal = prIntakeMapper.selectIntakesCountForOnLine(params); QueryResultVo<List<VoOnLineIntake>> rsVo = new QueryResultVo<>(); rsVo.pageSize = vo.pageSize; rsVo.pageCurr = vo.pageCurr; rsVo.calculateAndSet(itemTotal, params); List<VoOnLineIntake> records = prIntakeMapper.selectIntakesForOnLine(params) ; rsVo.obj = records ; for(VoOnLineIntake rVo : records){ if(rVo.getAlarm() != null && !rVo.getAlarm().trim().equals("") && rVo.getAlarm().endsWith(",")){ rVo.setAlarm(rVo.getAlarm().substring(0, rVo.getAlarm().length() - 1)) ; } } return rsVo ; } } pipIrr-platform/pipIrr-web/pipIrr-web-remote/src/main/java/com/dy/pipIrrRemote/monitor/QueryVo.java
New file @@ -0,0 +1,32 @@ package com.dy.pipIrrRemote.monitor; import com.dy.common.webUtil.QueryConditionVo; import lombok.Data; /** * @author ZhuBaoMin * @date 2024-05-27 20:32 * @LastEditTime 2024-05-27 20:32 * @Description */ @Data public class QueryVo extends QueryConditionVo { /** * 取水口编号 */ private String intakeNum; /** * 是否在线 */ private Boolean isOnLine; /** * 非前端设置的参数据,中间件返回的RTU在线情况对象数组 */ private String onLineMap; }