使用 Apifox 发送内部命令进行测试,使用 MQTTX 发布气象数据进行测试;修复测试中发现的 bug,并完善不足之处。
| | |
| | | |
| | | private final GenericObjectPool<MqttClient> pool; |
| | | |
| | | public MqttClientPool(String broker, String username, String password, int maxConnections) { |
| | | MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password); |
| | | public MqttClientPool(String broker, String username, String password, int maxConnections, boolean useMemoryPersistence) { |
| | | MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password, useMemoryPersistence); |
| | | GenericObjectPoolConfig<MqttClient> config = new GenericObjectPoolConfig<>(); |
| | | config.setMaxTotal(maxConnections); |
| | | config.setMaxIdle(maxConnections); |
| | |
| | | import org.apache.commons.pool2.impl.DefaultPooledObject; |
| | | import org.eclipse.paho.client.mqttv3.MqttClient; |
| | | import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | |
| | | private final String broker; |
| | | private final String username; |
| | | private final String password; |
| | | private final Boolean useMemoryPersistence; |
| | | |
/**
 * Creates a factory whose clients are built without an explicit persistence
 * store, i.e. the same way as before the persistence flag was introduced.
 *
 * @param broker   server URI passed to each new client
 * @param username user name set on the connect options
 * @param password password set on the connect options
 */
public MqttClientPooledObjectFactory(String broker, String username, String password) {
    this(broker, username, password, false);
}

/**
 * Creates a factory for pooled MQTT clients.
 *
 * @param broker               server URI passed to each new client
 * @param username             user name set on the connect options
 * @param password             password set on the connect options
 * @param useMemoryPersistence {@code true} to build clients with a
 *                             {@code MemoryPersistence} store, {@code false} to use the
 *                             two-argument client constructor
 */
public MqttClientPooledObjectFactory(String broker, String username, String password, boolean useMemoryPersistence) {
    this.broker = broker;
    this.username = username;
    this.password = password;
    this.useMemoryPersistence = useMemoryPersistence;
}
| | | |
| | | @Override |
| | | public MqttClient create() throws Exception { |
| | | String clientId = MqttClient.generateClientId(); |
| | | MqttClient client = new MqttClient(broker, clientId); |
| | | |
| | | MqttClient client = null ; |
| | | // 使用内存持久化而非默认的文件持久化 |
| | | if (useMemoryPersistence) { |
| | | MemoryPersistence persistence = new MemoryPersistence(); |
| | | client = new MqttClient(broker, clientId, persistence); |
| | | }else{ |
| | | client = new MqttClient(broker, clientId); |
| | | } |
| | | MqttConnectOptions options = new MqttConnectOptions(); |
| | | options.setUserName(username); |
| | | options.setPassword(password.toCharArray()); |
| | |
| | | public static void main(String[] args) { |
| | | try{ |
| | | // 初始化连接池 |
| | | pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections); |
| | | pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections, true); |
| | | MqttClient clientSub = pool.popClient() ; |
| | | testSubscribe(clientSub, topic1); |
| | | testSubscribe(clientSub, topic2); |
| | |
| | | public String deviceId ;//设备ID |
| | | public String protocol;//协议 |
| | | |
| | | public String topic ;//消息主题 |
| | | public String msg ;//消息 |
| | | public MqttTopic topic ;//消息主题 |
| | | public String metaData;//MQTT推送来的元数据 |
| | | |
| | | public abstract boolean valid(); |
| | | |
| | |
| | | public String devId ;//设备(FBox)ID |
| | | public String topic ;//消息主题 |
| | | |
| | | public boolean isEmpty(){ |
| | | return orgTag == null || protocol == null || devId == null || topic == null |
| | | || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || topic.trim().length() == 0 ; |
| | | } |
| | | |
| | | public String shortName(){ |
| | | return topic ; |
| | | } |
| | | |
| | | public String longName(){ |
| | | return orgTag + "/" + protocol + "/" + devId + "/" + topic ; |
| | | } |
| | | } |
| | |
/**
 * Builds a subscribed-message holder from a parsed topic and the received text.
 *
 * @param subTopic parsed topic; its {@code devId} and {@code protocol} are copied
 *                 and the topic object itself is kept
 * @param msg      text received with the message, stored as {@code metaData}
 */
public MqttSubMsgSdV1(MqttTopic subTopic, String msg) {
    this.deviceId = subTopic.devId;
    this.protocol = subTopic.protocol;
    // The whole topic object is stored, so callers can ask it for longName().
    this.topic = subTopic;
    this.metaData = msg;
}
| | | public String toString(){ |
| | | StringBuilder sb = new StringBuilder(); |
| | |
| | | .append("\n") ; |
| | | } |
| | | sb.append("主题:") |
| | | .append(topic) |
| | | .append("\n") ; |
| | | sb.append("消息:") |
| | | .append(msg) |
| | | .append(topic.longName()) |
| | | .append("\n") ; |
| | | if(vo4Up != null){ |
| | | sb.append("数据:") |
| | | .append(vo4Up.toString()) |
| | | .append("\n") ; |
| | | }else{ |
| | | sb.append("元数据:") |
| | | .append(metaData) |
| | | .append("\n") ; |
| | | } |
| | | |
| | | return sb.toString() ; |
| | | } |
| | | |
| | |
| | | if (topic == null || topic.isEmpty()) { |
| | | return false; |
| | | } |
| | | if (msg == null || msg.isEmpty()) { |
| | | if (metaData == null || metaData.isEmpty()) { |
| | | return false; |
| | | } |
| | | return true; |
| | |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*; |
| | | import com.dy.common.mw.protocol4Mqtt.status.DevRunSt; |
| | | import org.eclipse.paho.client.mqttv3.MqttMessage; |
| | | |
| | | /** |
| | |
| | | */ |
| | | public class ProtocolParserSdV1 { |
| | | public MqttSubMsgSdV1 parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception { |
| | | String msg = new String(mqttMsg.getPayload(), "UTF-8"); |
| | | if(JSON.isValid(msg)){ |
| | | throw new Exception("接收到MQTT消息,协议" + subTopic.protocol + ",设备ID" + subTopic.devId + ",主题" + subTopic.topic + "消息格式非json数据(" + msg + ")") ; |
| | | String strTxt = new String(mqttMsg.getPayload(), "UTF-8"); |
| | | if(!JSON.isValid(strTxt)){ |
| | | throw new Exception("接收到MQTT消息,协议" + subTopic.protocol + ",设备ID" + subTopic.devId + ",主题" + subTopic.longName() + "消息格式非json数据(" + strTxt + ")") ; |
| | | } |
| | | MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(subTopic, msg); |
| | | MqttSubMsgSdV1 msg = new MqttSubMsgSdV1(subTopic, strTxt); |
| | | Vo4Up vo ; |
| | | DevRunSt stVo ; |
| | | switch (subTopic.topic) { |
| | | case ProtocolConstantSdV1.SubTopicWeather -> { |
| | | vo = JSON.parseObject(msg, WeatherVo.class); |
| | | vo = JSON.parseObject(strTxt, WeatherVo.class); |
| | | break; |
| | | } |
| | | case ProtocolConstantSdV1.SubTopicSoil -> { |
| | | vo = JSON.parseObject(msg, SoilVo.class); |
| | | vo = JSON.parseObject(strTxt, SoilVo.class); |
| | | break; |
| | | } |
| | | case ProtocolConstantSdV1.SubTopicManure -> { |
| | | vo = JSON.parseObject(msg, ManureVo.class); |
| | | vo = JSON.parseObject(strTxt, ManureVo.class); |
| | | break; |
| | | } |
| | | case ProtocolConstantSdV1.SubTopicState -> { |
| | | //此处未完成,应该产生一些通信的info,供下面callback.notify(objs)通知出去 |
| | | vo = JSON.parseObject(msg, StateVo.class); |
| | | vo = JSON.parseObject(strTxt, StateVo.class); |
| | | stVo = new DevRunSt() ; |
| | | stVo.id = msg.deviceId ; |
| | | //stVo.stirRunning = true ; //搅拌运行 true是 false否 |
| | | //stVo.injectRunning = true ; //注肥运行 true是 false否 |
| | | //stVo.irrRunning = true ; //灌溉运行 true是 false否 |
| | | //stVo.alarm = true ; //报警 true是 false否 |
| | | break; |
| | | } |
| | | default -> { |
| | | throw new Exception("接收到MQTT消息,协议" + subTopic.protocol + ",设备ID" + subTopic.devId + ",主题" + subTopic.topic + "消息解析逻辑未实现"); |
| | | } |
| | | } |
| | | ms.vo4Up = vo ; |
| | | callback.callback(ms); |
| | | msg.vo4Up = vo ; |
| | | callback.callback(msg); |
| | | callback.notify(null);//此处未完成 |
| | | return ms; |
| | | return msg; |
| | | } |
| | | |
| | | public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception { |
| | |
| | | |
| | | public String devDtStr ;//设备时间 |
| | | public String getDevDtStr() { |
| | | if(devDt == null){ |
| | | if(devDt != null){ |
| | | return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ; |
| | | }else{ |
| | | return "" ; |
| | |
| | | @Override |
| | | public String toString(){ |
| | | StringBuilder sb = new StringBuilder(); |
| | | sb.append("气象数据:") ; |
| | | sb.append(" 消息ID:"+messageId) ; |
| | | sb.append(" 二氧化碳:"+carbonDioxide) ; |
| | | sb.append(" 光照强度:"+lightIntensity) ; |
| | | sb.append(" 大气压力:"+atmosphericPressure) ; |
| | | sb.append(" 空气温度:"+airTemperature) ; |
| | | sb.append(" 空气湿度:"+airHumidity) ; |
| | | sb.append(" PM2.5:"+pm25) ; |
| | | sb.append(" PM10:"+pm10) ; |
| | | sb.append(" 设备时间:"+devDt) ; |
| | | sb.append(" 设备时间:"+ this.getDevDtStr()) ; |
| | | sb.append("气象数据=>") ; |
| | | sb.append(" 消息ID:"+messageId + ", ") ; |
| | | sb.append(" 二氧化碳:"+carbonDioxide + ", ") ; |
| | | sb.append(" 光照强度:"+lightIntensity + ", ") ; |
| | | sb.append(" 大气压力:"+atmosphericPressure + ", ") ; |
| | | sb.append(" 空气温度:"+airTemperature + ", ") ; |
| | | sb.append(" 空气湿度:"+airHumidity + ", ") ; |
| | | sb.append(" PM2.5:"+pm25 + ", ") ; |
| | | sb.append(" PM10:"+pm10 + ", ") ; |
| | | sb.append(" 设备时间:"+devDt + ", ") ; |
| | | sb.append(" 设备时间:"+ this.getDevDtStr() + ", ") ; |
| | | sb.append("\n") ; |
| | | return sb.toString() ; |
| | | } |
| | |
| | | if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){ |
| | | throw new Exception("config.mqtt.poolMaxSize配置的连接池连接最大数量不合法") ; |
| | | } |
| | | mqVo.useMemoryPersistence = conf.getSetAttrBoolean(doc, "config.mqtt", "useMemoryPersistence", null, null) ; |
| | | String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, false, null) ; |
| | | if(proAndDevIds == null || proAndDevIds.trim().equals("")){ |
| | | throw new Exception("config.mqtt.protocolAndDeviceIds配置不合法") ; |
| | |
| | | com2.code = CodeLocal.stopMqttSv ; |
| | | com2.type = CommandType.innerCommand ; |
| | | new CommandInnerDeaLer().deal(com2) ; |
| | | |
| | | log.info("关闭程序前,关闭了MQTT服务"); |
| | | }catch (Exception e){ |
| | | log.error("程序(控制台)关闭钩子发生异常", e); |
| | | } |
| | |
| | | rCom = this.stopMqttSv(com); |
| | | break; |
| | | } |
| | | case CodeLocal.recoverMqttSv -> { |
| | | rCom = this.recoverMqttSv(com); |
| | | break; |
| | | } |
| | | default -> { |
| | | rCom = ReturnCommand.errored("出错,收到内部命令的功能码不能识别!", com.getId(), com.getCode()); |
| | | break; |
| | |
| | | * @param message |
| | | */ |
| | | public static Command successed(String message, String commandId, String code, Object attachment) { |
| | | log.info(message); |
| | | Command command = new Command().createReturnSuccessCommand(message, commandId, code); |
| | | command.setAttachment(attachment); |
| | | return command; |
| | |
| | | * @param message |
| | | */ |
| | | public static Command successed(String message, String commandId, String code) { |
| | | log.info(message); |
| | | return new Command().createReturnSuccessCommand(message, commandId, code); |
| | | } |
| | | /** |
| | |
| | | * @param message |
| | | */ |
| | | public static Command errored(String message, String commandId, String code) { |
| | | log.error(message); |
| | | return new Command().createReturnErrorCommand(message, commandId, code); |
| | | } |
| | | } |
| | |
| | | |
| | | public static final String onPartLineMqtt = "LMCD0002" ;//查询部分MQTT设备在线情况 |
| | | |
| | | public static final String onLineStatisticsMqtt = "LMCD0003" ;//查询所有MQTT设备状态统计情况 |
| | | public static final String onLineStatisticsMqtt = "LMCD0003" ;//查询所有MQTT设备在线状态统计情况 |
| | | |
| | | public static final String allRtuStatesMqtt = "LMCD0010" ;//查询所有MQTT设备状态 |
| | | |
| | |
| | | |
| | | public static final String stopMqttSv = "LMCD0110" ;//停止Mqtt服务 |
| | | |
| | | public static final String recoverMqttSv = "LMCD0112" ;//重启MQTT服务,接入新的MQTT连接 |
| | | |
| | | |
| | | } |
| | |
| | | Map.Entry<String, DevStatus> entry = null ; |
| | | while(it.hasNext()){ |
| | | entry = it.next() ; |
| | | if(((DevStatus)entry).onLine != null && ((DevStatus)entry).onLine.booleanValue()){ |
| | | if((entry.getValue()).onLine != null && (entry.getValue()).onLine.booleanValue()){ |
| | | vo.onLineNum++ ; |
| | | }else{ |
| | | vo.offLineNum++ ; |
| | |
| | | import org.apache.logging.log4j.Logger; |
| | | import org.eclipse.paho.client.mqttv3.MqttClient; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | | * @Date: 2025/6/4 14:54 |
| | |
| | | private MqttUnitConfigVo configVo ; |
| | | |
| | | private MqttClientPool pool; |
| | | |
| | | private List<MqttClient> subClients ; |
| | | |
// Private: instances can only be created from inside this class.
private MqttManager() {
}
| | |
| | | * @throws Exception |
| | | */ |
| | | public void start()throws Exception{ |
| | | subClients = new ArrayList<>(); |
| | | String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort; |
| | | this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize); |
| | | this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize, this.configVo.useMemoryPersistence); |
| | | if(this.pool.isClose()){ |
| | | throw new Exception("Mqtt连接池初始化失败"); |
| | | } |
| | |
| | | if(clientSub == null || !clientSub.isConnected()){ |
| | | throw new Exception("Mqtt连接池获得订阅连接不可用"); |
| | | } |
| | | subClients.add(clientSub) ; |
| | | // 订阅主题 |
| | | for(int i = 0; i < this.configVo.subTopics.length; i++){ |
| | | for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){ |
| | |
| | | } |
| | | |
| | | public void stop()throws Exception{ |
| | | if(subClients != null && subClients.size() > 0){ |
| | | for (MqttClient client : subClients) { |
| | | if(client != null && client.isConnected()){ |
| | | try{ |
| | | client.disconnect(); |
| | | client.close(); |
| | | }catch (Exception e){ |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | if(this.pool != null){ |
| | | // 关闭连接池 |
| | | this.pool.close(); |
| | |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage msg) throws Exception { |
| | | MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ; |
| | | MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){ |
| | | @Override |
| | | public void callback(MqttSubMsg subMsg) { |
| | | DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); |
| | | DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); |
| | | RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息 主题:" + subMsg.topic + " 消息:" + subMsg.msg); |
| | | } |
| | | @Override |
| | | public void notify(String devId, MqttNotifyInfo... infos) { |
| | | if(notify != null){ |
| | | notify.notify(devId, infos) ; |
| | | try { |
| | | MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic); |
| | | MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() { |
| | | @Override |
| | | public void callback(MqttSubMsg subMsg) { |
| | | DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); |
| | | DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); |
| | | RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息 主题:" + subMsg.topic.longName() + " 元数据:" + subMsg.metaData); |
| | | } |
| | | } |
| | | }) ; |
| | | this.nextDeal(subMsg); |
| | | |
| | | @Override |
| | | public void notify(String devId, MqttNotifyInfo... infos) { |
| | | if (notify != null) { |
| | | notify.notify(devId, infos); |
| | | } |
| | | } |
| | | }); |
| | | this.nextDeal(subMsg); |
| | | }catch(Exception e){ |
| | | log.error("处理MQTT订阅消息发生异常", e); |
| | | } |
| | | } |
| | | private void nextDeal(MqttSubMsg subMsg)throws Exception { |
| | | subMsg.action(new Callback() { |
| | |
| | | public String svUserName ;// |
| | | public String svUserPassword ;// |
| | | public Integer poolMaxSize ;// |
| | | public Boolean useMemoryPersistence ; |
| | | public String[] protocolAndDeviceIds ;//设备协议与ID(FBox)id |
| | | public String[] deviceIds ;//设备(FBox)id |
| | | public String[] subTopics ;//订阅的主题 |
| | |
| | | this.svUserName = "dyyjy" ; |
| | | this.svUserPassword = "Dyyjy2025,;.abc!@#" ; |
| | | this.poolMaxSize = 10 ; |
| | | useMemoryPersistence = true ; |
| | | this.pubTopicQos = 1 ; |
| | | this.noSubThenOff = 10 * 60 * 10000L ; |
| | | } |
| | |
| | | svUserName MQTT服务器用户名 |
| | | svUserPassword MQTT服务器用户密码 |
| | | poolMaxSize 连接池最大连接数 |
| | | useMemoryPersistence 使用内存持久化而非默认的文件持久化(true是 false否) |
| | | protocolAndDeviceIds 在子系统(orgTag)中接入的设备(FBox)所用协议及设备id集合,多个用逗号隔开,协议与ID用正斜杠隔开,例如:sd1/338220031439,sd1/338220031440 |
| | | subTopicAndQos: 订阅主题与Qos,主题名与其Qos用逗号隔开,多个主题及Qos用分号隔开,例如:ym/topic1,1;ym/topic2,1;ym/topic3,1,如果有多个OrgTag,主题前缀用其OrgTag |
| | | pubTopicQos: 发布主题的Qos,取值范围: |
| | |
| | | svUserName="dyyjy" |
| | | svUserPassword="Dyyjy2025,;.abc!@#" |
| | | poolMaxSize="10" |
| | | useMemoryPersistence="true" |
| | | protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}" |
| | | subTopicAndQos="${mqtt.subTopicAndQos}" |
| | | pubTopicQos="1" |