pipIrr-platform/pipIrr-web/pipIrr-web-remote/src/main/java/com/dy/pipIrrRemote/rtuUpgrage/RtuUpgradeStateReceiverCtrl.java
@@ -1,12 +1,29 @@
package com.dy.pipIrrRemote.rtuUpgrage;
import com.dy.common.contant.Constant;
import com.dy.common.multiDataSource.DataSourceContext;
import com.dy.common.softUpgrade.state.UpgradeInfo;
import com.dy.common.softUpgrade.state.UpgradeRtu;
import com.dy.common.softUpgrade.state.UpgradeState;
import com.dy.common.util.Callback;
import com.dy.common.util.CreateRandom;
import com.dy.common.util.DateTime;
import com.dy.common.util.ThreadJob;
import com.dy.common.webUtil.BaseResponse;
import com.dy.pipIrrGlobal.pojoUg.UgRtuProgram;
import com.dy.pipIrrGlobal.pojoUg.UgRtuTask;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * @Author: liurunyu
@@ -18,22 +35,334 @@
@RestController
@RequestMapping(path = "rtuUpgradeStateReceiver")
public class RtuUpgradeStateReceiverCtrl {
    // Shared upgrade snapshot: assigned by receive(...) on every report and built by demo();
    // reset to null by afterMwForceOverCurUgTask() and resetDemo().
    protected static UpgradeInfo cache ;
    // Service used for all persistence and lookups performed by this controller
    @Autowired
    private RtuUpgradeSv sv ;
    /**
     * 强制结束升级任务通信中间件成功执行后
     */
    public static void afterMwForceOverCurUgTask(){
        cache = null ;
    }
    /**
     * rtu远程升级任务通信中间件执行情况统计回收
     * @param vo 数据
     * @param info 数据
     * @return 操作结果
     */
    @Hidden //不公开接口,其只有通信中间件调用
    @PostMapping(path = "/receive")
    public BaseResponse<Boolean> receive(@RequestBody UpgradeInfo vo){
        log.info("接收到的RTU远程升级状态数据为:{}",vo.toString());
        if(vo.ugRtuStateList != null && vo.ugRtuStateList.size() > 0){
            if(vo.ugRtuStateList.size() < 10){
                for (UpgradeRtu upgradeRtu : vo.ugRtuStateList) {
                    log.info(upgradeRtu.toString());
    public BaseResponse<Boolean> receive(@RequestBody UpgradeInfo info, HttpServletRequest req, HttpServletResponse rep){
        /*
        log.info("接收到的RTU远程升级状态数据为:{}", info.toString("532328000214"));
        if(info.ugRtuStateList != null && info.ugRtuStateList.size() > 0){
            if(info.ugRtuStateList.size() < 10){
                for (UpgradeRtu rtuVo : info.ugRtuStateList) {
                    log.info(rtuVo.toString());
                }
            }
        }
         */
        //进行排序
        Comparator<UpgradeRtu> comparator = Comparator.comparing(UpgradeRtu::getRtuAddr, Comparator.naturalOrder());
        info.ugRtuStateList = info.ugRtuStateList.stream().sorted(comparator).collect(Collectors.toList());
        //通信中间件传过来的机构tag,以用于查找数据源
        String token = req.getHeader(Constant.UserTokenKeyInHeader);
        DataSourceContext.set(token);
        if(cache == null){
            cache = info;
            //此时不做任务操作,只保障cache不为空, 等待下次发来数据
            if(info.ugRtuStateList != null && info.ugRtuStateList.size() > 0){
                List<UpgradeRtu> overList = info.ugRtuStateList.stream().filter(itemVo -> itemVo.isOver).collect(Collectors.toList()) ;
                if(overList != null && overList.size() > 0){
                    this.save2Db(info.ugTaskId, overList);
                }
            }
        }else{
            //当cache中有值时,进行比对存储,对比目的是防止重复操作数据库
            if(info.ugRtuStateList != null && info.ugRtuStateList.size() > 0){
                //此时保证两个集合都不为null
                this.save2Db(info.ugTaskId, info.ugRtuStateList, cache.ugRtuStateList);
            }
            //cache赋值必须放在上面处理的后面,否则上面的比较不成功
            cache = info;
            if(info.ugOverallState != null && info.ugOverallState.allOver){
                this.saveTaskOver(info.ugTaskId) ;
            }
        }
        return null;
    }
    /**
     * 比对并数据库存储
     * @param taskId
     * @param list
     */
    private void save2Db(String taskId, List<UpgradeRtu> list){
        list.stream().forEach(vo -> {
            if(vo != null) {
                this.sv.saveRtuUpgradeState(Long.parseLong(taskId), vo);
            }
        });
    }
    /**
     * 比对并数据库存储
     * @param newList
     * @param oldList
     */
    private void save2Db(String taskId, List<UpgradeRtu> newList, List<UpgradeRtu> oldList){
        List<UpgradeRtu> newOverList = newList.stream().filter(vo -> vo.isOver).collect(Collectors.toList()) ;
        List<UpgradeRtu> oldNoOverList = newList.stream().filter(vo -> !vo.isOver).collect(Collectors.toList()) ;
        boolean oldExist = false ;
        for(UpgradeRtu nvo : newOverList){
            oldExist = false ;
            if(oldNoOverList.stream().anyMatch(vo -> vo.rtuAddr.equals(nvo.rtuAddr))){
                oldExist = true ;
            }
            if(!oldExist){
                //上次没有升级结束,而当前升级结束了
                this.sv.saveRtuUpgradeState(Long.parseLong(taskId), nvo);
            }
        }
    }
    /**
     * 保存升级任务已经执行完成
     * @param taskId
     */
    private void saveTaskOver(String taskId){
        this.sv.updateTaskOver(taskId) ;
    }
    /////////////////////////////////////////////////////
    //
    // Simulated (demo) data below
    //
    /////////////////////////////////////////////////////
    // Background job created by demo() to drive the simulation; stopped and cleared by resetDemo()
    private static ThreadJob threadJob ;
    protected void resetDemo(){
        if(threadJob != null){
            threadJob.stop() ;
            threadJob = null ;
        }
        cache = null ;
    }
    protected void demo(){
        if(cache == null){
            UgRtuTask tpo = this.sv.selectLastTask() ;
            if(tpo != null) {
                UgRtuProgram ppo = this.sv.selectProgramById(tpo.programId);
                if (ppo != null) {
                    List<String> taskRtuAddrs = this.sv.selectAllRtuAddrByTask("" + tpo.id);
                    if (taskRtuAddrs != null && taskRtuAddrs.size() > 0) {
                        cache = new UpgradeInfo();
                        cache.ugTaskId = "" + tpo.id;
                        cache.ugOverallState = new UpgradeState() ;
                        cache.ugOverallState.rtuTotal = taskRtuAddrs.size() ;
                        cache.ugRtuStateList = new ArrayList<>() ;
                        for (String addr : taskRtuAddrs) {
                            UpgradeRtu rtu = new UpgradeRtu() ;
                            rtu.rtuAddr = addr ;
                            rtu.state = UpgradeRtu.STATE_UNSTART ;
                            rtu.totalPackage = (ppo.programBytes.length / 512) + ((ppo.programBytes.length % 512)>0?1:0) ;
                            rtu.isOver = false ;
                            cache.ugRtuStateList.add(rtu) ;
                        }
                        if(threadJob == null){
                            threadJob = new ThreadJob() {
                                @Override
                                public Object execute() throws Exception {
                                    while(!this.stop){
                                        if(!runInDemo()){
                                            this.stop = true ;
                                        }else{
                                            try {
                                                Thread.sleep(500);
                                            } catch (InterruptedException e) {
                                                e.printStackTrace();
                                            }
                                        }
                                    }
                                    return null;
                                }
                            };
                            try{
                                threadJob.start(new Callback() {
                                    @Override
                                    public void call(Object obj) {
                                    }
                                    @Override
                                    public void call(Object... objs) {
                                    }
                                    @Override
                                    public void exception(Exception e) {
                                    }
                                });
                            }catch (Exception e){
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
        }
    }
    private boolean runInDemo(){
        for(UpgradeRtu rtu : cache.ugRtuStateList){
            this.rtuUpgrade(rtu) ;
        }
        return this.statisticsNowUpgradeState() ;
    }
    private void rtuUpgrade(UpgradeRtu rtu){
        if(rtu.lastDownDt == null){
            //第一次
            this.dealUpgradeFirstTime(rtu);
        }else{
            this.dealUpgrade(rtu);
        }
    }
    private void dealUpgradeFirstTime(UpgradeRtu rtu){
        int n = Integer.parseInt(new CreateRandom().create(1)) ;
        if(n == 0){
            rtu.state = UpgradeRtu.STATE_OFFLINE ;
            rtu.currentPackage = 0 ;
            rtu.currentRamAddr = 0x00 ;
            rtu.lastDownDt = "" ;
            rtu.lastDownDtAt = 0L ;
            rtu.reTryTimes = 0 ;
            rtu.isOver = false ;
        }else{
            rtu.state = UpgradeRtu.STATE_RUNNING ;
            rtu.currentPackage = 1 ;
            rtu.currentRamAddr = 0x00 ;
            rtu.lastDownDt = DateTime.yyyy_MM_dd_HH_mm_ss() ;
            rtu.lastDownDtAt = System.currentTimeMillis() ;
            rtu.reTryTimes = 0 ;
            rtu.isOver = false ;
        }
    }
    private void dealUpgrade(UpgradeRtu rtu){
        if(rtu.state == UpgradeRtu.STATE_OFFLINE){
            //离线的,不处理
            return ;
        }
        if(rtu.currentPackage == rtu.totalPackage){
            //升级结束
            rtu.state = UpgradeRtu.STATE_SUCCESS ;
            rtu.isOver = true ;
            return ;
        }
        if(rtu.reTryTimes >= 2){
            //重试次数达到最大值
            if(rtu.state == UpgradeRtu.STATE_FAILONE ||
                    rtu.state == UpgradeRtu.STATE_FAIL){
                //又失败了,认为结束了
                rtu.isOver = true ;
                return ;
            }
        }
        int n = Integer.parseInt(new CreateRandom().create(3)) ;
        if(n == 540 || n == 541 || n == 542 || n == 543 || n == 544 || n == 545 || n == 546 || n == 547 || n == 548 || n == 549 ){
            if(rtu.currentPackage == 1){
                //1包死
                rtu.state = UpgradeRtu.STATE_FAILONE ;
                return ;
            }
        }
        if(n == 450 || n == 451 || n == 452 || n == 453 || n == 454 || n == 455){
            if(rtu.currentPackage != 1){
                //升死
                rtu.state = UpgradeRtu.STATE_FAIL ;
                return ;
            }
        }
        if(rtu.state == UpgradeRtu.STATE_FAILONE ||
                rtu.state == UpgradeRtu.STATE_FAIL){
            if(rtu.reTryTimes < 2){
                rtu.state = UpgradeRtu.STATE_RUNNING ;
                rtu.currentPackage = 1 ;
                rtu.currentRamAddr = 0x00 ;
                rtu.lastDownDt = DateTime.yyyy_MM_dd_HH_mm_ss() ;
                rtu.lastDownDtAt = System.currentTimeMillis() ;
                rtu.reTryTimes++ ;
                rtu.isOver = false ;
                return ;
            }
        }
        if(rtu.state != UpgradeRtu.STATE_FAILONE &&
                rtu.state != UpgradeRtu.STATE_FAIL){
            rtu.state = UpgradeRtu.STATE_RUNNING ;
            rtu.currentPackage += 1 ;
            rtu.currentRamAddr = 0x00 + UpgradeRtu.RAMADDRADD ;
            rtu.lastDownDt = DateTime.yyyy_MM_dd_HH_mm_ss() ;
            rtu.lastDownDtAt = System.currentTimeMillis() ;
            rtu.isOver = false ;
        }
    }
    /**
     * 当前升级状态
     * @return
     */
    public boolean statisticsNowUpgradeState() {
        boolean hasRunning = false ;
        if(cache.ugRtuStateList != null && cache.ugRtuStateList.size() > 0){
            cache.ugOverallState.init();
            cache.ugOverallState.rtuTotal = cache.ugRtuStateList.size() ;
            for(UpgradeRtu rtu : cache.ugRtuStateList){
                if(rtu.state == UpgradeRtu.STATE_OFFLINE){
                    cache.ugOverallState.offLineTotal ++ ;
                    cache.ugOverallState.failTotal++;
                }else if(rtu.state == UpgradeRtu.STATE_UNSTART){
                    cache.ugOverallState.unStartTotal ++ ;
                }else if(rtu.state == UpgradeRtu.STATE_RUNNING){
                    cache.ugOverallState.runningTotal ++ ;
                    hasRunning = true ;
                }else if(rtu.state == UpgradeRtu.STATE_SUCCESS) {
                    cache.ugOverallState.successTotal++;
                }else if(rtu.state == UpgradeRtu.STATE_FAILONE) {
                    cache.ugOverallState.dieOneTotal++;
                    cache.ugOverallState.failTotal++;
                }else if(rtu.state == UpgradeRtu.STATE_FAIL) {
                    cache.ugOverallState.dieMultiTotal++;
                    cache.ugOverallState.failTotal++;
                }
                if(rtu.isOver){
                    cache.ugOverallState.overTotal++;
                }
            }
        }
        if(!hasRunning){
            cache.ugOverallState.allOver = true ;
        }
        if(cache.ugOverallState.allOver){
            cache.ugOverallState.overTotal = 0;
            if(cache.ugRtuStateList != null && cache.ugRtuStateList.size() > 0){
                for(UpgradeRtu rtu : cache.ugRtuStateList){
                    rtu.isOver = true ;
                    cache.ugOverallState.overTotal++;
                }
            }
        }
        return hasRunning ;
    }
}