From 1135c89deb50a080152f9086fc7b741c415ecd54 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期三, 12 二月 2025 17:00:14 +0800
Subject: [PATCH] 通信中间件增加功能: 1、实现消息心中; 2、开阀报、关阀报、报警数据都会在消息中间件存入消息; 3、在消息中心注册消息接收者,消息中心周期性向消息接收者推送消息。

---
 pipIrr-platform/pipIrr-web/pipIrr-web-remote/src/main/java/com/dy/pipIrrRemote/rtuUpgrage/RtuUpgradeStateReceiverCtrl.java |  144 ++++++++++++++++++++++++++++++++++-------------
 1 files changed, 104 insertions(+), 40 deletions(-)

diff --git a/pipIrr-platform/pipIrr-web/pipIrr-web-remote/src/main/java/com/dy/pipIrrRemote/rtuUpgrage/RtuUpgradeStateReceiverCtrl.java b/pipIrr-platform/pipIrr-web/pipIrr-web-remote/src/main/java/com/dy/pipIrrRemote/rtuUpgrage/RtuUpgradeStateReceiverCtrl.java
index 4cd05ba..a31da14 100644
--- a/pipIrr-platform/pipIrr-web/pipIrr-web-remote/src/main/java/com/dy/pipIrrRemote/rtuUpgrage/RtuUpgradeStateReceiverCtrl.java
+++ b/pipIrr-platform/pipIrr-web/pipIrr-web-remote/src/main/java/com/dy/pipIrrRemote/rtuUpgrage/RtuUpgradeStateReceiverCtrl.java
@@ -1,5 +1,7 @@
 package com.dy.pipIrrRemote.rtuUpgrage;
 
+import com.dy.common.contant.Constant;
+import com.dy.common.multiDataSource.DataSourceContext;
 import com.dy.common.softUpgrade.state.UpgradeInfo;
 import com.dy.common.softUpgrade.state.UpgradeRtu;
 import com.dy.common.softUpgrade.state.UpgradeState;
@@ -12,11 +14,14 @@
 import com.dy.pipIrrGlobal.pojoUg.UgRtuTask;
 import io.swagger.v3.oas.annotations.Hidden;
 import io.swagger.v3.oas.annotations.tags.Tag;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -37,18 +42,22 @@
     private RtuUpgradeSv sv ;
 
     /**
+     * 寮哄埗缁撴潫鍗囩骇浠诲姟閫氫俊涓棿浠舵垚鍔熸墽琛屽悗
+     */
+    public static void afterMwForceOverCurUgTask(){
+        cache = null ;
+    }
+
+    /**
      * rtu杩滅▼鍗囩骇浠诲姟閫氫俊涓棿浠舵墽琛屾儏鍐电粺璁″洖鏀�
      * @param info 鏁版嵁
      * @return 鎿嶄綔缁撴灉
      */
     @Hidden //涓嶅叕寮�鎺ュ彛锛屽叾鍙湁閫氫俊涓棿浠惰皟鐢�
     @PostMapping(path = "/receive")
-    public BaseResponse<Boolean> receive(@RequestBody UpgradeInfo info){
-        log.info("鎺ユ敹鍒扮殑RTU杩滅▼鍗囩骇鐘舵�佹暟鎹负锛歿}", info.toString());
-        //杩涜鎺掑簭
-        //Comparator<UpgradeRtu> comparator = Comparator.comparing(UpgradeRtu::getRtuAddr, Comparator.naturalOrder());
-        //info.ugRtuStateList = info.ugRtuStateList.stream().sorted(comparator).collect(Collectors.toList());
-
+    public BaseResponse<Boolean> receive(@RequestBody UpgradeInfo info, HttpServletRequest req, HttpServletResponse rep){
+        /*
+        log.info("鎺ユ敹鍒扮殑RTU杩滅▼鍗囩骇鐘舵�佹暟鎹负锛歿}", info.toString("532328000214"));
         if(info.ugRtuStateList != null && info.ugRtuStateList.size() > 0){
             if(info.ugRtuStateList.size() < 10){
                 for (UpgradeRtu rtuVo : info.ugRtuStateList) {
@@ -56,12 +65,22 @@
                 }
             }
         }
+         */
+
+        //杩涜鎺掑簭
+        Comparator<UpgradeRtu> comparator = Comparator.comparing(UpgradeRtu::getRtuAddr, Comparator.naturalOrder());
+        info.ugRtuStateList = info.ugRtuStateList.stream().sorted(comparator).collect(Collectors.toList());
+
+        //閫氫俊涓棿浠朵紶杩囨潵鐨勬満鏋則ag锛屼互鐢ㄤ簬鏌ユ壘鏁版嵁婧�
+        String token = req.getHeader(Constant.UserTokenKeyInHeader);
+        DataSourceContext.set(token);
+
         if(cache == null){
+            cache = info;
             //姝ゆ椂涓嶅仛浠诲姟鎿嶄綔锛屽彧淇濋殰cache涓嶄负绌猴紝 绛夊緟涓嬫鍙戞潵鏁版嵁
             if(info.ugRtuStateList != null && info.ugRtuStateList.size() > 0){
                 List<UpgradeRtu> overList = info.ugRtuStateList.stream().filter(itemVo -> itemVo.isOver).collect(Collectors.toList()) ;
                 if(overList != null && overList.size() > 0){
-                    cache = info;
                     this.save2Db(info.ugTaskId, overList);
                 }
             }
@@ -70,7 +89,11 @@
             if(info.ugRtuStateList != null && info.ugRtuStateList.size() > 0){
                 //姝ゆ椂淇濊瘉涓や釜闆嗗悎閮戒笉涓簄ull
                 this.save2Db(info.ugTaskId, info.ugRtuStateList, cache.ugRtuStateList);
-                cache = info;
+            }
+            //cache璧嬪�煎繀椤绘斁鍦ㄤ笂闈㈠鐞嗙殑鍚庨潰锛屽惁鍒欎笂闈㈢殑姣旇緝涓嶆垚鍔�
+            cache = info;
+            if(info.ugOverallState != null && info.ugOverallState.allOver){
+                this.saveTaskOver(info.ugTaskId) ;
             }
         }
         return null;
@@ -95,15 +118,31 @@
      */
     private void save2Db(String taskId, List<UpgradeRtu> newList, List<UpgradeRtu> oldList){
         List<UpgradeRtu> newOverList = newList.stream().filter(vo -> vo.isOver).collect(Collectors.toList()) ;
+        List<UpgradeRtu> oldNoOverList = newList.stream().filter(vo -> !vo.isOver).collect(Collectors.toList()) ;
+        boolean oldExist = false ;
         for(UpgradeRtu nvo : newOverList){
-            if(nvo != null) {
-                if(oldList.stream().anyMatch(vo -> vo.rtuAddr.equals(nvo.rtuAddr) && vo.isOver == false)){
-                    //涓婃娌℃湁鍗囩骇缁撴潫锛岃�屽綋鍓嶅崌绾х粨鏉熶簡
-                    this.sv.saveRtuUpgradeState(Long.parseLong(taskId), nvo);
-                }
+            oldExist = false ;
+            if(oldNoOverList.stream().anyMatch(vo -> vo.rtuAddr.equals(nvo.rtuAddr))){
+                oldExist = true ;
+            }
+            if(!oldExist){
+                //涓婃娌℃湁鍗囩骇缁撴潫锛岃�屽綋鍓嶅崌绾х粨鏉熶簡
+                this.sv.saveRtuUpgradeState(Long.parseLong(taskId), nvo);
             }
         }
     }
+
+    /**
+     * 淇濆瓨鍗囩骇浠诲姟宸茬粡鎵ц瀹屾垚
+     * @param taskId
+     */
+    private void saveTaskOver(String taskId){
+        this.sv.updateTaskOver(taskId) ;
+    }
+
+
+
+
     /////////////////////////////////////////////////////
     //
     // 浠ヤ笅妯℃嫙鏁版嵁
@@ -146,7 +185,7 @@
                                 @Override
                                 public Object execute() throws Exception {
                                     while(!this.stop){
-                                        if(!runDemo()){
+                                        if(!runInDemo()){
                                             this.stop = true ;
                                         }else{
                                             try {
@@ -180,7 +219,7 @@
             }
         }
     }
-    private boolean runDemo(){
+    private boolean runInDemo(){
         for(UpgradeRtu rtu : cache.ugRtuStateList){
             this.rtuUpgrade(rtu) ;
         }
@@ -202,6 +241,7 @@
             rtu.currentPackage = 0 ;
             rtu.currentRamAddr = 0x00 ;
             rtu.lastDownDt = "" ;
+            rtu.lastDownDtAt = 0L ;
             rtu.reTryTimes = 0 ;
             rtu.isOver = false ;
         }else{
@@ -209,6 +249,7 @@
             rtu.currentPackage = 1 ;
             rtu.currentRamAddr = 0x00 ;
             rtu.lastDownDt = DateTime.yyyy_MM_dd_HH_mm_ss() ;
+            rtu.lastDownDtAt = System.currentTimeMillis() ;
             rtu.reTryTimes = 0 ;
             rtu.isOver = false ;
         }
@@ -218,22 +259,25 @@
             //绂荤嚎鐨勶紝涓嶅鐞�
             return ;
         }
-        int n = Integer.parseInt(new CreateRandom().create(2)) ;
-        if(n == 4
-                || n == 14
-                || n == 24
-                || n == 34
-                || n == 44
-                || n == 54
-                || n == 64
-                || n == 74
-                || n == 84
-                || n == 95
-                || n == 45
-                || n == 46
-                || n == 47
-                || n == 48
-                || n == 49){
+
+        if(rtu.currentPackage == rtu.totalPackage){
+            //鍗囩骇缁撴潫
+            rtu.state = UpgradeRtu.STATE_SUCCESS ;
+            rtu.isOver = true ;
+            return ;
+        }
+        if(rtu.reTryTimes >= 2){
+            //閲嶈瘯娆℃暟杈惧埌鏈�澶у��
+            if(rtu.state == UpgradeRtu.STATE_FAILONE ||
+                    rtu.state == UpgradeRtu.STATE_FAIL){
+                //鍙堝け璐ヤ簡锛岃涓虹粨鏉熶簡
+                rtu.isOver = true ;
+                return ;
+            }
+        }
+
+        int n = Integer.parseInt(new CreateRandom().create(3)) ;
+        if(n == 540 || n == 541 || n == 542 || n == 543 || n == 544 || n == 545 || n == 546 || n == 547 || n == 548 || n == 549 ){
             if(rtu.currentPackage == 1){
                 //1鍖呮
                 rtu.state = UpgradeRtu.STATE_FAILONE ;
@@ -241,8 +285,7 @@
             }
         }
 
-        if(n == 44
-                || n == 45){
+        if(n == 450 || n == 451 || n == 452 || n == 453 || n == 454 || n == 455){
             if(rtu.currentPackage != 1){
                 //鍗囨
                 rtu.state = UpgradeRtu.STATE_FAIL ;
@@ -250,11 +293,18 @@
             }
         }
 
-        if(rtu.currentPackage == rtu.totalPackage){
-            //鍗囩骇缁撴潫
-            rtu.state = UpgradeRtu.STATE_SUCCESS ;
-            rtu.isOver = true ;
-            return ;
+        if(rtu.state == UpgradeRtu.STATE_FAILONE ||
+                rtu.state == UpgradeRtu.STATE_FAIL){
+            if(rtu.reTryTimes < 2){
+                rtu.state = UpgradeRtu.STATE_RUNNING ;
+                rtu.currentPackage = 1 ;
+                rtu.currentRamAddr = 0x00 ;
+                rtu.lastDownDt = DateTime.yyyy_MM_dd_HH_mm_ss() ;
+                rtu.lastDownDtAt = System.currentTimeMillis() ;
+                rtu.reTryTimes++ ;
+                rtu.isOver = false ;
+                return ;
+            }
         }
 
         if(rtu.state != UpgradeRtu.STATE_FAILONE &&
@@ -263,7 +313,7 @@
             rtu.currentPackage += 1 ;
             rtu.currentRamAddr = 0x00 + UpgradeRtu.RAMADDRADD ;
             rtu.lastDownDt = DateTime.yyyy_MM_dd_HH_mm_ss() ;
-            rtu.reTryTimes = 0 ;
+            rtu.lastDownDtAt = System.currentTimeMillis() ;
             rtu.isOver = false ;
         }
 
@@ -281,6 +331,7 @@
             for(UpgradeRtu rtu : cache.ugRtuStateList){
                 if(rtu.state == UpgradeRtu.STATE_OFFLINE){
                     cache.ugOverallState.offLineTotal ++ ;
+                    cache.ugOverallState.failTotal++;
                 }else if(rtu.state == UpgradeRtu.STATE_UNSTART){
                     cache.ugOverallState.unStartTotal ++ ;
                 }else if(rtu.state == UpgradeRtu.STATE_RUNNING){
@@ -289,9 +340,10 @@
                 }else if(rtu.state == UpgradeRtu.STATE_SUCCESS) {
                     cache.ugOverallState.successTotal++;
                 }else if(rtu.state == UpgradeRtu.STATE_FAILONE) {
-                    cache.ugOverallState.failOneTotal++;
+                    cache.ugOverallState.dieOneTotal++;
                     cache.ugOverallState.failTotal++;
                 }else if(rtu.state == UpgradeRtu.STATE_FAIL) {
+                    cache.ugOverallState.dieMultiTotal++;
                     cache.ugOverallState.failTotal++;
                 }
                 if(rtu.isOver){
@@ -299,6 +351,18 @@
                 }
             }
         }
+        if(!hasRunning){
+            cache.ugOverallState.allOver = true ;
+        }
+        if(cache.ugOverallState.allOver){
+            cache.ugOverallState.overTotal = 0;
+            if(cache.ugRtuStateList != null && cache.ugRtuStateList.size() > 0){
+                for(UpgradeRtu rtu : cache.ugRtuStateList){
+                    rtu.isOver = true ;
+                    cache.ugOverallState.overTotal++;
+                }
+            }
+        }
         return hasRunning ;
     }
 }

--
Gitblit v1.8.0