diff --git a/pom.xml b/pom.xml
index 945d5a6..b4bba2f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
com.njzscloud
- localizer
+ gps
0.0.1
jar
diff --git a/src/main/java/com/njzscloud/App.java b/src/main/java/com/njzscloud/App.java
index 0caace6..3974a40 100644
--- a/src/main/java/com/njzscloud/App.java
+++ b/src/main/java/com/njzscloud/App.java
@@ -1,14 +1,24 @@
package com.njzscloud;
import com.njzscloud.common.mqtt.Mqtt;
+import com.njzscloud.common.sichen.Sichen;
import com.njzscloud.common.tuqiang.Tuqiang;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class App {
public static void main(String[] args) {
- Mqtt.run("tcp://localhost:1883", "tuqiang", "admin", "123456");
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ Mqtt.shutdown();
+ Tuqiang.stop();
+ // Sichen.shutdown();
+ }));
+
+ Mqtt.run("tcp://139.224.54.144:1883", "tuqiang", "gps", "TKG4TV3dF7CeazDnUdCF");
+ // Mqtt.run("tcp://localhost:1883", "tuqiang", "admin", "123456");
Mqtt.addListener(new MqttMsgHandlers());
+ Sichen.init();
+ // Sichen.setInterval(Tuqiang::publishDeviceInfo, 10);
Tuqiang.run(18888, 1, 2);
}
}
diff --git a/src/main/java/com/njzscloud/TuqiangListeners.java b/src/main/java/com/njzscloud/TuqiangListeners.java
index 94aeba4..f0a65d0 100644
--- a/src/main/java/com/njzscloud/TuqiangListeners.java
+++ b/src/main/java/com/njzscloud/TuqiangListeners.java
@@ -64,7 +64,7 @@ public class TuqiangListeners {
BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class);
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId)
- .setType(type == 0 ? "正常位置汇报" : "盲区补报");
+ .setType(type);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
}
}
@@ -96,6 +96,8 @@ public class TuqiangListeners {
DeviceInfo deviceInfo = DeviceInfo.parse(txt);
if (deviceInfo != null) {
String terminalId = message.getTerminalId();
+ deviceInfo.setTerminalId(terminalId);
+
Mqtt.publish(terminalId + "/device_info", deviceInfo);
}
}
diff --git a/src/main/java/com/njzscloud/common/jt808/JT808.java b/src/main/java/com/njzscloud/common/jt808/JT808.java
index f279c09..f2f9d72 100644
--- a/src/main/java/com/njzscloud/common/jt808/JT808.java
+++ b/src/main/java/com/njzscloud/common/jt808/JT808.java
@@ -5,6 +5,7 @@ import com.njzscloud.common.jt808.support.JT808MessageResolver;
import com.njzscloud.common.jt808.support.MessageBody;
import com.njzscloud.common.jt808.util.FlowId;
import com.njzscloud.common.tcp.TcpServer;
+import com.njzscloud.common.tuqiang.msg.PublishTxtMsg;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
@@ -15,6 +16,7 @@ import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@@ -113,6 +115,13 @@ public final class JT808 {
}
}
+ public static void publishDeviceInfo() {
+ Set terminalIds = terminalChannels.keySet();
+ for (String terminalId : terminalIds) {
+ JT808.sendMessage(JT808.createBaseMessage(terminalId, 0x8300, new PublishTxtMsg("").toBytes()));
+ }
+ }
+
/**
* 创建基础消息对象
*/
diff --git a/src/main/java/com/njzscloud/common/jt808/support/JT808Decoder.java b/src/main/java/com/njzscloud/common/jt808/support/JT808Decoder.java
index da74f55..4e6afe4 100644
--- a/src/main/java/com/njzscloud/common/jt808/support/JT808Decoder.java
+++ b/src/main/java/com/njzscloud/common/jt808/support/JT808Decoder.java
@@ -63,6 +63,7 @@ public class JT808Decoder extends ByteToMessageDecoder {
// 解析消息内容
JT808Message message = parseMessage(unescapedBuf);
+ unescapedBuf.release();
if (message != null) {
out.add(message);
}
diff --git a/src/main/java/com/njzscloud/common/jt808/support/JT808Encoder.java b/src/main/java/com/njzscloud/common/jt808/support/JT808Encoder.java
index 3b23134..94eb6b5 100644
--- a/src/main/java/com/njzscloud/common/jt808/support/JT808Encoder.java
+++ b/src/main/java/com/njzscloud/common/jt808/support/JT808Encoder.java
@@ -59,6 +59,7 @@ public class JT808Encoder extends MessageToByteEncoder {
// 12. 写入结束符
out.writeByte(MSG_DELIMITER);
+ escapedBuf.release();
} finally {
// 释放缓冲区
if (contentBuf.refCnt() > 0) {
diff --git a/src/main/java/com/njzscloud/common/mqtt/Mqtt.java b/src/main/java/com/njzscloud/common/mqtt/Mqtt.java
index 0b6ac23..61f20f1 100644
--- a/src/main/java/com/njzscloud/common/mqtt/Mqtt.java
+++ b/src/main/java/com/njzscloud/common/mqtt/Mqtt.java
@@ -1,5 +1,6 @@
package com.njzscloud.common.mqtt;
+import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.njzscloud.common.core.jackson.Jackson;
import com.njzscloud.common.mqtt.support.MqttListener;
@@ -8,9 +9,11 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import java.lang.reflect.Method;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@@ -19,11 +22,23 @@ import java.util.function.Consumer;
public final class Mqtt {
private static final Map> listeners = new ConcurrentHashMap<>();
private static MqttClient client;
+ private static String broker;
+ private static String clientId;
+ private static String username;
+ private static String password;
public static void run(String broker, String clientId, String username, String password) {
try {
- client = new MqttClient(broker, clientId);
+ Mqtt.broker = broker;
+ Mqtt.clientId = clientId;
+ Mqtt.username = username;
+ Mqtt.password = password;
+ MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence("logs/mqtt");
+ client = new MqttClient(broker, clientId, persistence);
MqttConnectOptions options = new MqttConnectOptions();
+ options.setAutomaticReconnect(true);
+ options.setConnectionTimeout(1000);
+ options.setMaxReconnectDelay(30000);
if (StrUtil.isNotBlank(username) && StrUtil.isNotBlank(password)) {
options.setUserName(username);
options.setPassword(password.toCharArray());
@@ -35,9 +50,18 @@ public final class Mqtt {
} catch (Exception e) {
log.error("mqtt连接失败", e);
shutdown();
+ // reconnect();
}
}
+ private static void reconnect() {
+ CompletableFuture.runAsync(() -> {
+ ThreadUtil.sleep(10000);
+ log.error("mqtt 10 秒后重新连接:{} {}", broker, clientId);
+ run(broker, clientId, username, password);
+ });
+ }
+
public static void shutdown() {
try {
if (client != null) {
@@ -47,6 +71,7 @@ public final class Mqtt {
} catch (Exception e) {
log.error("mqtt关闭失败", e);
}
+ log.info("mqtt 关闭成功");
}
public static void subscribe(String topic, int qos) {
@@ -108,6 +133,8 @@ public final class Mqtt {
}
public static class MsgHandler implements MqttCallback {
+
+ @Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Consumer handler = listeners.get(topic);
if (handler == null) return;
@@ -118,12 +145,15 @@ public final class Mqtt {
}
}
+ @Override
public void connectionLost(Throwable cause) {
log.error("mqtt 连接已断开", cause);
+ // reconnect();
}
+ @Override
public void deliveryComplete(IMqttDeliveryToken token) {
- log.info("消息投递结果:{}", token.isComplete());
+ // log.info("消息投递结果:{}", token.isComplete());
}
}
}
diff --git a/src/main/java/com/njzscloud/common/sichen/Sichen.java b/src/main/java/com/njzscloud/common/sichen/Sichen.java
new file mode 100644
index 0000000..aa41084
--- /dev/null
+++ b/src/main/java/com/njzscloud/common/sichen/Sichen.java
@@ -0,0 +1,37 @@
+package com.njzscloud.common.sichen;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class Sichen {
+ private static ScheduledExecutorService scheduler;
+
+ public static void init(int corePoolSize) {
+ scheduler = Executors.newScheduledThreadPool(corePoolSize);
+ }
+
+ public static void shutdown() {
+ if (scheduler != null) {
+ scheduler.shutdown();
+ scheduler = null;
+ }
+ log.info("定时器已关闭");
+ }
+
+ public static void init() {
+ scheduler = Executors.newScheduledThreadPool(1);
+ log.info("定时器初始化成功");
+ }
+
+ public static void setInterval(Runnable command, long period) {
+ scheduler.scheduleAtFixedRate(command, 0, period, TimeUnit.SECONDS);
+ }
+
+ public static void setTimeout(Runnable command, long delay) {
+ scheduler.schedule(command, delay, TimeUnit.SECONDS);
+ }
+}
diff --git a/src/main/java/com/njzscloud/common/tuqiang/Tuqiang.java b/src/main/java/com/njzscloud/common/tuqiang/Tuqiang.java
index 1a453d6..42984e5 100644
--- a/src/main/java/com/njzscloud/common/tuqiang/Tuqiang.java
+++ b/src/main/java/com/njzscloud/common/tuqiang/Tuqiang.java
@@ -24,6 +24,10 @@ public final class Tuqiang {
JT808.run(port, bossThreads, workerThreads);
}
+ public static void stop() {
+ JT808.stop();
+ }
+
private static void addListener(Object object) {
Method[] methods = object.getClass().getDeclaredMethods();
for (Method method : methods) {
@@ -61,6 +65,10 @@ public final class Tuqiang {
JT808.sendMessage(JT808.createBaseMessage(terminalId, 0x8300, new PublishTxtMsg("").toBytes()));
}
+ public static void publishDeviceInfo() {
+ JT808.publishDeviceInfo();
+ }
+
/**
* 设置心跳
*
@@ -71,7 +79,7 @@ public final class Tuqiang {
body.writeByte(1);
body.writeByte(1);
body.writeInt(0x0001);
- // body.writeByte(1);
+ body.writeByte(1);
body.writeByte(interval);
byte[] bytes = ByteBufUtil.getBytes(body);
body.release();
diff --git a/src/main/java/com/njzscloud/common/tuqiang/msg/DeviceInfo.java b/src/main/java/com/njzscloud/common/tuqiang/msg/DeviceInfo.java
index 2c3935f..753dd2f 100644
--- a/src/main/java/com/njzscloud/common/tuqiang/msg/DeviceInfo.java
+++ b/src/main/java/com/njzscloud/common/tuqiang/msg/DeviceInfo.java
@@ -10,6 +10,7 @@ import lombok.experimental.Accessors;
@ToString
@Accessors(chain = true)
public class DeviceInfo {
+ private String terminalId;
// region GPS定位状态
/**
* 这是 GPS 定位的核心状态,三个参数分别代表:
diff --git a/src/main/java/com/njzscloud/result/RealtimeLocationResult.java b/src/main/java/com/njzscloud/result/RealtimeLocationResult.java
index 3698e70..7f95373 100644
--- a/src/main/java/com/njzscloud/result/RealtimeLocationResult.java
+++ b/src/main/java/com/njzscloud/result/RealtimeLocationResult.java
@@ -11,7 +11,7 @@ import lombok.experimental.Accessors;
@Accessors(chain = true)
public class RealtimeLocationResult {
private String terminalId;
- private String type;
+ private int type;
/**
* 1 1:超速报警 标志维持至报警条件解除