lzq 2025-09-28 18:03:44 +08:00
parent ee5b061e4d
commit 2c9b0a8c91
11 changed files with 106 additions and 7 deletions

View File

@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.njzscloud</groupId> <groupId>com.njzscloud</groupId>
<artifactId>localizer</artifactId> <artifactId>gps</artifactId>
<version>0.0.1</version> <version>0.0.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>

View File

@ -1,14 +1,24 @@
package com.njzscloud; package com.njzscloud;
import com.njzscloud.common.mqtt.Mqtt; import com.njzscloud.common.mqtt.Mqtt;
import com.njzscloud.common.sichen.Sichen;
import com.njzscloud.common.tuqiang.Tuqiang; import com.njzscloud.common.tuqiang.Tuqiang;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class App { public class App {
public static void main(String[] args) { public static void main(String[] args) {
Mqtt.run("tcp://localhost:1883", "tuqiang", "admin", "123456"); Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Mqtt.shutdown();
Tuqiang.stop();
// Sichen.shutdown();
}));
Mqtt.run("tcp://139.224.54.144:1883", "tuqiang", "gps", "TKG4TV3dF7CeazDnUdCF");
// Mqtt.run("tcp://localhost:1883", "tuqiang", "admin", "123456");
Mqtt.addListener(new MqttMsgHandlers()); Mqtt.addListener(new MqttMsgHandlers());
Sichen.init();
// Sichen.setInterval(Tuqiang::publishDeviceInfo, 10);
Tuqiang.run(18888, 1, 2); Tuqiang.run(18888, 1, 2);
} }
} }

View File

@ -64,7 +64,7 @@ public class TuqiangListeners {
BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class); BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class);
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class) RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId) .setTerminalId(terminalId)
.setType(type == 0 ? "正常位置汇报" : "盲区补报"); .setType(type);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult); Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
} }
} }
@ -96,6 +96,8 @@ public class TuqiangListeners {
DeviceInfo deviceInfo = DeviceInfo.parse(txt); DeviceInfo deviceInfo = DeviceInfo.parse(txt);
if (deviceInfo != null) { if (deviceInfo != null) {
String terminalId = message.getTerminalId(); String terminalId = message.getTerminalId();
deviceInfo.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/device_info", deviceInfo); Mqtt.publish(terminalId + "/device_info", deviceInfo);
} }
} }

View File

@ -5,6 +5,7 @@ import com.njzscloud.common.jt808.support.JT808MessageResolver;
import com.njzscloud.common.jt808.support.MessageBody; import com.njzscloud.common.jt808.support.MessageBody;
import com.njzscloud.common.jt808.util.FlowId; import com.njzscloud.common.jt808.util.FlowId;
import com.njzscloud.common.tcp.TcpServer; import com.njzscloud.common.tcp.TcpServer;
import com.njzscloud.common.tuqiang.msg.PublishTxtMsg;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
@ -15,6 +16,7 @@ import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -113,6 +115,13 @@ public final class JT808 {
} }
} }
public static void publishDeviceInfo() {
Set<String> terminalIds = terminalChannels.keySet();
for (String terminalId : terminalIds) {
JT808.sendMessage(JT808.createBaseMessage(terminalId, 0x8300, new PublishTxtMsg("<CKBSJ>").toBytes()));
}
}
/** /**
* *
*/ */

View File

@ -63,6 +63,7 @@ public class JT808Decoder extends ByteToMessageDecoder {
// 解析消息内容 // 解析消息内容
JT808Message message = parseMessage(unescapedBuf); JT808Message message = parseMessage(unescapedBuf);
unescapedBuf.release();
if (message != null) { if (message != null) {
out.add(message); out.add(message);
} }

View File

@ -59,6 +59,7 @@ public class JT808Encoder extends MessageToByteEncoder<JT808Message> {
// 12. 写入结束符 // 12. 写入结束符
out.writeByte(MSG_DELIMITER); out.writeByte(MSG_DELIMITER);
escapedBuf.release();
} finally { } finally {
// 释放缓冲区 // 释放缓冲区
if (contentBuf.refCnt() > 0) { if (contentBuf.refCnt() > 0) {

View File

@ -1,5 +1,6 @@
package com.njzscloud.common.mqtt; package com.njzscloud.common.mqtt;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.njzscloud.common.core.jackson.Jackson; import com.njzscloud.common.core.jackson.Jackson;
import com.njzscloud.common.mqtt.support.MqttListener; import com.njzscloud.common.mqtt.support.MqttListener;
@ -8,9 +9,11 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -19,11 +22,23 @@ import java.util.function.Consumer;
public final class Mqtt { public final class Mqtt {
private static final Map<String, Consumer<MqttMsg>> listeners = new ConcurrentHashMap<>(); private static final Map<String, Consumer<MqttMsg>> listeners = new ConcurrentHashMap<>();
private static MqttClient client; private static MqttClient client;
private static String broker;
private static String clientId;
private static String username;
private static String password;
public static void run(String broker, String clientId, String username, String password) { public static void run(String broker, String clientId, String username, String password) {
try { try {
client = new MqttClient(broker, clientId); Mqtt.broker = broker;
Mqtt.clientId = clientId;
Mqtt.username = username;
Mqtt.password = password;
MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence("logs/mqtt");
client = new MqttClient(broker, clientId, persistence);
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setConnectionTimeout(1000);
options.setMaxReconnectDelay(30000);
if (StrUtil.isNotBlank(username) && StrUtil.isNotBlank(password)) { if (StrUtil.isNotBlank(username) && StrUtil.isNotBlank(password)) {
options.setUserName(username); options.setUserName(username);
options.setPassword(password.toCharArray()); options.setPassword(password.toCharArray());
@ -35,9 +50,18 @@ public final class Mqtt {
} catch (Exception e) { } catch (Exception e) {
log.error("mqtt连接失败", e); log.error("mqtt连接失败", e);
shutdown(); shutdown();
// reconnect();
} }
} }
private static void reconnect() {
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(10000);
log.error("mqtt 10 秒后重新连接:{} {}", broker, clientId);
run(broker, clientId, username, password);
});
}
public static void shutdown() { public static void shutdown() {
try { try {
if (client != null) { if (client != null) {
@ -47,6 +71,7 @@ public final class Mqtt {
} catch (Exception e) { } catch (Exception e) {
log.error("mqtt关闭失败", e); log.error("mqtt关闭失败", e);
} }
log.info("mqtt 关闭成功");
} }
public static void subscribe(String topic, int qos) { public static void subscribe(String topic, int qos) {
@ -108,6 +133,8 @@ public final class Mqtt {
} }
public static class MsgHandler implements MqttCallback { public static class MsgHandler implements MqttCallback {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception { public void messageArrived(String topic, MqttMessage message) throws Exception {
Consumer<MqttMsg> handler = listeners.get(topic); Consumer<MqttMsg> handler = listeners.get(topic);
if (handler == null) return; if (handler == null) return;
@ -118,12 +145,15 @@ public final class Mqtt {
} }
} }
@Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
log.error("mqtt 连接已断开", cause); log.error("mqtt 连接已断开", cause);
// reconnect();
} }
@Override
public void deliveryComplete(IMqttDeliveryToken token) { public void deliveryComplete(IMqttDeliveryToken token) {
log.info("消息投递结果:{}", token.isComplete()); // log.info("消息投递结果:{}", token.isComplete());
} }
} }
} }

View File

@ -0,0 +1,37 @@
package com.njzscloud.common.sichen;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Sichen {
private static ScheduledExecutorService scheduler;
public static void init(int corePoolSize) {
scheduler = Executors.newScheduledThreadPool(corePoolSize);
}
public static void shutdown() {
if (scheduler != null) {
scheduler.shutdown();
scheduler = null;
}
log.info("定时器已关闭");
}
public static void init() {
scheduler = Executors.newScheduledThreadPool(1);
log.info("定时器初始化成功");
}
public static void setInterval(Runnable command, long period) {
scheduler.scheduleAtFixedRate(command, 0, period, TimeUnit.SECONDS);
}
public static void setTimeout(Runnable command, long delay) {
scheduler.schedule(command, delay, TimeUnit.SECONDS);
}
}

View File

@ -24,6 +24,10 @@ public final class Tuqiang {
JT808.run(port, bossThreads, workerThreads); JT808.run(port, bossThreads, workerThreads);
} }
public static void stop() {
JT808.stop();
}
private static void addListener(Object object) { private static void addListener(Object object) {
Method[] methods = object.getClass().getDeclaredMethods(); Method[] methods = object.getClass().getDeclaredMethods();
for (Method method : methods) { for (Method method : methods) {
@ -61,6 +65,10 @@ public final class Tuqiang {
JT808.sendMessage(JT808.createBaseMessage(terminalId, 0x8300, new PublishTxtMsg("<CKBSJ>").toBytes())); JT808.sendMessage(JT808.createBaseMessage(terminalId, 0x8300, new PublishTxtMsg("<CKBSJ>").toBytes()));
} }
public static void publishDeviceInfo() {
JT808.publishDeviceInfo();
}
/** /**
* *
* *
@ -71,7 +79,7 @@ public final class Tuqiang {
body.writeByte(1); body.writeByte(1);
body.writeByte(1); body.writeByte(1);
body.writeInt(0x0001); body.writeInt(0x0001);
// body.writeByte(1); body.writeByte(1);
body.writeByte(interval); body.writeByte(interval);
byte[] bytes = ByteBufUtil.getBytes(body); byte[] bytes = ByteBufUtil.getBytes(body);
body.release(); body.release();

View File

@ -10,6 +10,7 @@ import lombok.experimental.Accessors;
@ToString @ToString
@Accessors(chain = true) @Accessors(chain = true)
public class DeviceInfo { public class DeviceInfo {
private String terminalId;
// region GPS定位状态 // region GPS定位状态
/** /**
* GPS * GPS

View File

@ -11,7 +11,7 @@ import lombok.experimental.Accessors;
@Accessors(chain = true) @Accessors(chain = true)
public class RealtimeLocationResult { public class RealtimeLocationResult {
private String terminalId; private String terminalId;
private String type; private int type;
/** /**
* 1 1 * 1 1