From 2c9b0a8c91cbafc8dfe61118f1d35064cb237d3f Mon Sep 17 00:00:00 2001 From: lzq Date: Sun, 28 Sep 2025 18:03:44 +0800 Subject: [PATCH] 1 --- pom.xml | 2 +- src/main/java/com/njzscloud/App.java | 12 +++++- .../java/com/njzscloud/TuqiangListeners.java | 4 +- .../com/njzscloud/common/jt808/JT808.java | 9 +++++ .../common/jt808/support/JT808Decoder.java | 1 + .../common/jt808/support/JT808Encoder.java | 1 + .../java/com/njzscloud/common/mqtt/Mqtt.java | 34 ++++++++++++++++- .../com/njzscloud/common/sichen/Sichen.java | 37 +++++++++++++++++++ .../com/njzscloud/common/tuqiang/Tuqiang.java | 10 ++++- .../common/tuqiang/msg/DeviceInfo.java | 1 + .../result/RealtimeLocationResult.java | 2 +- 11 files changed, 106 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/njzscloud/common/sichen/Sichen.java diff --git a/pom.xml b/pom.xml index 945d5a6..b4bba2f 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.njzscloud - localizer + gps 0.0.1 jar diff --git a/src/main/java/com/njzscloud/App.java b/src/main/java/com/njzscloud/App.java index 0caace6..3974a40 100644 --- a/src/main/java/com/njzscloud/App.java +++ b/src/main/java/com/njzscloud/App.java @@ -1,14 +1,24 @@ package com.njzscloud; import com.njzscloud.common.mqtt.Mqtt; +import com.njzscloud.common.sichen.Sichen; import com.njzscloud.common.tuqiang.Tuqiang; import lombok.extern.slf4j.Slf4j; @Slf4j public class App { public static void main(String[] args) { - Mqtt.run("tcp://localhost:1883", "tuqiang", "admin", "123456"); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Mqtt.shutdown(); + Tuqiang.stop(); + // Sichen.shutdown(); + })); + + Mqtt.run("tcp://139.224.54.144:1883", "tuqiang", "gps", "TKG4TV3dF7CeazDnUdCF"); + // Mqtt.run("tcp://localhost:1883", "tuqiang", "admin", "123456"); Mqtt.addListener(new MqttMsgHandlers()); + Sichen.init(); + // Sichen.setInterval(Tuqiang::publishDeviceInfo, 10); Tuqiang.run(18888, 1, 2); } } diff --git a/src/main/java/com/njzscloud/TuqiangListeners.java b/src/main/java/com/njzscloud/TuqiangListeners.java index 94aeba4..f0a65d0 100644 --- a/src/main/java/com/njzscloud/TuqiangListeners.java +++ b/src/main/java/com/njzscloud/TuqiangListeners.java @@ -64,7 +64,7 @@ public class TuqiangListeners { BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class); RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class) .setTerminalId(terminalId) - .setType(type == 0 ? "正常位置汇报" : "盲区补报"); + .setType(type); Mqtt.publish(terminalId + "/track_location", realtimeLocationResult); } } @@ -96,6 +96,8 @@ public class TuqiangListeners { DeviceInfo deviceInfo = DeviceInfo.parse(txt); if (deviceInfo != null) { String terminalId = message.getTerminalId(); + deviceInfo.setTerminalId(terminalId); + Mqtt.publish(terminalId + "/device_info", deviceInfo); } } diff --git a/src/main/java/com/njzscloud/common/jt808/JT808.java b/src/main/java/com/njzscloud/common/jt808/JT808.java index f279c09..f2f9d72 100644 --- a/src/main/java/com/njzscloud/common/jt808/JT808.java +++ b/src/main/java/com/njzscloud/common/jt808/JT808.java @@ -5,6 +5,7 @@ import com.njzscloud.common.jt808.support.JT808MessageResolver; import com.njzscloud.common.jt808.support.MessageBody; import com.njzscloud.common.jt808.util.FlowId; import com.njzscloud.common.tcp.TcpServer; +import com.njzscloud.common.tuqiang.msg.PublishTxtMsg; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -15,6 +16,7 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; @@ -113,6 +115,13 @@ public final class JT808 { } } + public static void publishDeviceInfo() { + Set terminalIds = terminalChannels.keySet(); + for (String terminalId : terminalIds) { + JT808.sendMessage(JT808.createBaseMessage(terminalId, 0x8300, new PublishTxtMsg("").toBytes())); + } + } + /** * 创建基础消息对象 */ diff --git a/src/main/java/com/njzscloud/common/jt808/support/JT808Decoder.java b/src/main/java/com/njzscloud/common/jt808/support/JT808Decoder.java index da74f55..4e6afe4 100644 --- a/src/main/java/com/njzscloud/common/jt808/support/JT808Decoder.java +++ b/src/main/java/com/njzscloud/common/jt808/support/JT808Decoder.java @@ -63,6 +63,7 @@ public class JT808Decoder extends ByteToMessageDecoder { // 解析消息内容 JT808Message message = parseMessage(unescapedBuf); + unescapedBuf.release(); if (message != null) { out.add(message); } diff --git a/src/main/java/com/njzscloud/common/jt808/support/JT808Encoder.java b/src/main/java/com/njzscloud/common/jt808/support/JT808Encoder.java index 3b23134..94eb6b5 100644 --- a/src/main/java/com/njzscloud/common/jt808/support/JT808Encoder.java +++ b/src/main/java/com/njzscloud/common/jt808/support/JT808Encoder.java @@ -59,6 +59,7 @@ public class JT808Encoder extends MessageToByteEncoder { // 12. 写入结束符 out.writeByte(MSG_DELIMITER); + escapedBuf.release(); } finally { // 释放缓冲区 if (contentBuf.refCnt() > 0) { diff --git a/src/main/java/com/njzscloud/common/mqtt/Mqtt.java b/src/main/java/com/njzscloud/common/mqtt/Mqtt.java index 0b6ac23..61f20f1 100644 --- a/src/main/java/com/njzscloud/common/mqtt/Mqtt.java +++ b/src/main/java/com/njzscloud/common/mqtt/Mqtt.java @@ -1,5 +1,6 @@ package com.njzscloud.common.mqtt; +import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.StrUtil; import com.njzscloud.common.core.jackson.Jackson; import com.njzscloud.common.mqtt.support.MqttListener; @@ -8,9 +9,11 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; import java.lang.reflect.Method; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; @@ -19,11 +22,23 @@ import java.util.function.Consumer; public final class Mqtt { private static final Map> listeners = new ConcurrentHashMap<>(); private static MqttClient client; + private static String broker; + private static String clientId; + private static String username; + private static String password; public static void run(String broker, String clientId, String username, String password) { try { - client = new MqttClient(broker, clientId); + Mqtt.broker = broker; + Mqtt.clientId = clientId; + Mqtt.username = username; + Mqtt.password = password; + MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence("logs/mqtt"); + client = new MqttClient(broker, clientId, persistence); MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setConnectionTimeout(1000); + options.setMaxReconnectDelay(30000); if (StrUtil.isNotBlank(username) && StrUtil.isNotBlank(password)) { options.setUserName(username); options.setPassword(password.toCharArray()); @@ -35,9 +50,18 @@ public final class Mqtt { } catch (Exception e) { log.error("mqtt连接失败", e); shutdown(); + // reconnect(); } } + private static void reconnect() { + CompletableFuture.runAsync(() -> { + ThreadUtil.sleep(10000); + log.error("mqtt 10 秒后重新连接:{} {}", broker, clientId); + run(broker, clientId, username, password); + }); + } + public static void shutdown() { try { if (client != null) { @@ -47,6 +71,7 @@ public final class Mqtt { } catch (Exception e) { log.error("mqtt关闭失败", e); } + log.info("mqtt 关闭成功"); } public static void subscribe(String topic, int qos) { @@ -108,6 +133,8 @@ public final class Mqtt { } public static class MsgHandler implements MqttCallback { + + @Override public void messageArrived(String topic, MqttMessage message) throws Exception { Consumer handler = listeners.get(topic); if (handler == null) return; @@ -118,12 +145,15 @@ public final class Mqtt { } } + @Override public void connectionLost(Throwable cause) { log.error("mqtt 连接已断开", cause); + // reconnect(); } + @Override public void deliveryComplete(IMqttDeliveryToken token) { - log.info("消息投递结果:{}", token.isComplete()); + // log.info("消息投递结果:{}", token.isComplete()); } } } diff --git a/src/main/java/com/njzscloud/common/sichen/Sichen.java b/src/main/java/com/njzscloud/common/sichen/Sichen.java new file mode 100644 index 0000000..aa41084 --- /dev/null +++ b/src/main/java/com/njzscloud/common/sichen/Sichen.java @@ -0,0 +1,37 @@ +package com.njzscloud.common.sichen; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class Sichen { + private static ScheduledExecutorService scheduler; + + public static void init(int corePoolSize) { + scheduler = Executors.newScheduledThreadPool(corePoolSize); + } + + public static void shutdown() { + if (scheduler != null) { + scheduler.shutdown(); + scheduler = null; + } + log.info("定时器已关闭"); + } + + public static void init() { + scheduler = Executors.newScheduledThreadPool(1); + log.info("定时器初始化成功"); + } + + public static void setInterval(Runnable command, long period) { + scheduler.scheduleAtFixedRate(command, 0, period, TimeUnit.SECONDS); + } + + public static void setTimeout(Runnable command, long delay) { + scheduler.schedule(command, delay, TimeUnit.SECONDS); + } +} diff --git a/src/main/java/com/njzscloud/common/tuqiang/Tuqiang.java b/src/main/java/com/njzscloud/common/tuqiang/Tuqiang.java index 1a453d6..42984e5 100644 --- a/src/main/java/com/njzscloud/common/tuqiang/Tuqiang.java +++ b/src/main/java/com/njzscloud/common/tuqiang/Tuqiang.java @@ -24,6 +24,10 @@ public final class Tuqiang { JT808.run(port, bossThreads, workerThreads); } + public static void stop() { + JT808.stop(); + } + private static void addListener(Object object) { Method[] methods = object.getClass().getDeclaredMethods(); for (Method method : methods) { @@ -61,6 +65,10 @@ public final class Tuqiang { JT808.sendMessage(JT808.createBaseMessage(terminalId, 0x8300, new PublishTxtMsg("").toBytes())); } + public static void publishDeviceInfo() { + JT808.publishDeviceInfo(); + } + /** * 设置心跳 * @@ -71,7 +79,7 @@ public final class Tuqiang { body.writeByte(1); body.writeByte(1); body.writeInt(0x0001); - // body.writeByte(1); + body.writeByte(1); body.writeByte(interval); byte[] bytes = ByteBufUtil.getBytes(body); body.release(); diff --git a/src/main/java/com/njzscloud/common/tuqiang/msg/DeviceInfo.java b/src/main/java/com/njzscloud/common/tuqiang/msg/DeviceInfo.java index 2c3935f..753dd2f 100644 --- a/src/main/java/com/njzscloud/common/tuqiang/msg/DeviceInfo.java +++ b/src/main/java/com/njzscloud/common/tuqiang/msg/DeviceInfo.java @@ -10,6 +10,7 @@ import lombok.experimental.Accessors; @ToString @Accessors(chain = true) public class DeviceInfo { + private String terminalId; // region GPS定位状态 /** * 这是 GPS 定位的核心状态,三个参数分别代表: diff --git a/src/main/java/com/njzscloud/result/RealtimeLocationResult.java b/src/main/java/com/njzscloud/result/RealtimeLocationResult.java index 3698e70..7f95373 100644 --- a/src/main/java/com/njzscloud/result/RealtimeLocationResult.java +++ b/src/main/java/com/njzscloud/result/RealtimeLocationResult.java @@ -11,7 +11,7 @@ import lombok.experimental.Accessors; @Accessors(chain = true) public class RealtimeLocationResult { private String terminalId; - private String type; + private int type; /** * 1 1:超速报警 标志维持至报警条件解除