zhubaomin
2025-06-24 62afcbeaa5cc328bff01ae1facb18a6b8c03c5bc
pipIrr-platform/pipIrr-web/pipIrr-web-remote/src/main/java/com/dy/pipIrrRemote/monitor/common/Com4MqttCtrl.java
New file
@@ -0,0 +1,194 @@
package com.dy.pipIrrRemote.monitor.common;
import com.alibaba.fastjson2.JSONObject;
import com.dy.common.mw.protocol.Command;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.util.Callback;
import com.dy.common.util.IDLongGenerator;
import com.dy.common.webUtil.BaseResponse;
import com.dy.common.webUtil.BaseResponseUtils;
import com.dy.pipIrrGlobal.command.ComResultWait;
import com.dy.pipIrrGlobal.pojoPr.PrStManure;
import com.dy.pipIrrGlobal.pojoRm.RmCommandHistory;
import com.dy.pipIrrRemote.common.dto.Dto4MqttBase;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.validation.BindingResult;
import org.springframework.web.client.RestTemplate;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * @Author: liurunyu
 * @Date: 2025/5/9 14:53
 * @Description
 */
public abstract class Com4MqttCtrl {
    @Autowired
    protected Environment env ;
    @Autowired
    protected RestTemplate restTemplate ;
    @Value("${mw.waitMwRtnResultTimeout}")
    protected int waitMwRtnResultTimeout ;
    @Value("${mw.mqttCallbackUrl_rm}")
    protected String mqttResultSendWebUrl;
    //水肥机对象
    protected PrStManure ctrlPo ;
    //异步等待器
    protected CompletableFuture<MqttSubMsg> feature;
    //命令名称
    protected String comName ;
    //命令日志id
    protected Long comId ;
    /**
     * 发送命令前-1:验证
     * @param comSv
     * @param comCode
     * @param dto
     * @param bindingResult
     * @return
     */
    public BaseResponse<Object> pre1(Com4MqttSv comSv, String comCode, Dto4MqttBase dto, BindingResult bindingResult) {
        if (bindingResult != null && bindingResult.hasErrors()) {
            return BaseResponseUtils.buildError(Objects.requireNonNull(bindingResult.getFieldError()).getDefaultMessage());
        }
        String msg = this.checkDto(dto) ;
        if(msg != null){
            return BaseResponseUtils.buildError("服务端出错," + msg) ;
        }
        return  null ;
    }
    /**
     * 发送命令前-2:获得数据
     * @param comSv
     * @param protocol
     * @param protocolVer
     * @param comCode
     * @param dto
     * @param bindingResult
     * @return
     */
    public BaseResponse<Object> pre2(Com4MqttSv comSv, String protocol, Short protocolVer, String comCode, Dto4MqttBase dto, BindingResult bindingResult) {
        //得到水肥机对象
        ctrlPo = comSv.getManure(dto.getManureId());
        if (ctrlPo == null) {
            return BaseResponseUtils.buildError("服务端出错,从数据库中未得到水肥机数据") ;
        }
        //检查协议
        String msg = comSv.checkProtocol(ctrlPo) ;
        if(msg != null) {
            return BaseResponseUtils.buildError("服务端出错," + msg) ;
        }
        //得到功能码对应的命令名称
        comName = comSv.getCommandName(comCode, protocol, protocolVer) ;
        if(comName == null) {
            return BaseResponseUtils.buildError("服务端出错,未得到功能码对应命令名称") ;
        }
        return  null ;
    }
    /**
     * 发送命令前-3:保存命令日志
     * @param comSv sv对象
     * @param manureId 水肥机ID
     * @param operator 当前用登录用户id(操作人)
     * @param protocol 协议
     * @param protocolVerion 协议
     * @param comCode 功能码
     * @param param 命令参数
     * @return
     */
    public BaseResponse<Object> pre3(Com4MqttSv comSv, Long manureId, Long operator, String protocol, Short protocolVerion, String comCode, Cd4MqttParameter param) {
        comId = new IDLongGenerator().generate();
        //生成并保存命令日志
        RmCommandHistory po = comSv.saveComHistoryPo(comId,
                protocol + protocolVerion ,
                comCode,
                comName,
                manureId,
                ctrlPo.fboxId ,
                param,
                operator);
        if(po == null){
            return BaseResponseUtils.buildError("服务端出错,未能生成并保存命令日志") ;
        }
        return  null ;
    }
    /**
     * 发送命令前-4:准备Feature
     * @return
     */
    public void pre4() {
        feature = new CompletableFuture<>();
        ComResultWait.put(comId, feature);
    }
    /**
     * 发送命令
     * @param comSv
     * @param com
     * @return
     */
    public BaseResponse<Object> doSend(Com4MqttSv comSv, Command com){
        //得到通信中间件发送命令的web URL
        String rqUrl = comSv.get2MwRequestUrl(env, comSv.ContextComSend) ;
        //向通信中间件发送web请求
        BaseResponse res = comSv.sendPostRequest2Mw(restTemplate, rqUrl, com) ;
        //处理通信中间件对web请求的响应
        String msg = comSv.dealMwDealResponse(res) ;
        if(msg != null) {
            return BaseResponseUtils.buildError(msg) ;
        }else{
            return null ;
        }
    }
    /**
     * 发送命令后
     * @return
     */
    public BaseResponse<Object> after(String comCode, Callback callback) {
        try{
            //等待通信中间件通知水肥机执行命令上行数据(命令结果)
            MqttSubMsg subMsg = feature.get(waitMwRtnResultTimeout, TimeUnit.SECONDS);
            return BaseResponseUtils.buildSuccess(this.dealComResult(comCode, subMsg, callback));
        }catch (Exception e){
            return BaseResponseUtils.buildFail("等待通信中间件通知命令结果超时");
        }
    }
    /**
     * 发送命令最后
     * @return
     */
    public void end(){
        try {
            //最后清除CompletableFuture缓存
            if(ComResultWait.contain(comId)){
                ComResultWait.remove(comId);
            }
        }catch (Exception ee){}
    }
    /**
     * 验证
     * @param dto
     * @return
     */
    protected abstract String checkDto(Dto4MqttBase dto) ;
    /**
     * 生成命令返回信息
     */
    protected abstract String dealComResult(String code, MqttSubMsg subMsg, Callback callback);
}