localizer
lzq 2025-10-21 17:50:06 +08:00
parent 6f93d9d40f
commit d50f9a709d
41 changed files with 996 additions and 576 deletions

View File

@ -1,24 +1,32 @@
package com.njzscloud.common.localizer;
import cn.hutool.core.collection.CollUtil;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.mqtt.util.Mqtt;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j
public class DeviceStore {
private static final LinkedList<Localizer> localizers = new LinkedList<>();
private static final ReadWriteLock localizersLock = new ReentrantReadWriteLock();
private static final Lock localizersWriteLock = localizersLock.writeLock();
private static final Lock localizersReadLock = localizersLock.readLock();
public static void init() {
Mqtt.publish("localizer/loader");
}
public static void setLocalizers(LinkedList<Localizer> list) {
localizers.clear();
localizersWriteLock.lock();
try {
localizers.addAll(list);
log.info("重新加载设备:{}", list.size());
localizers.clear();
if (CollUtil.isNotEmpty(list)) localizers.addAll(list);
} finally {
localizersWriteLock.unlock();
}
@ -39,15 +47,15 @@ public class DeviceStore {
}
public static LocalizerType determineType(String terminalId) {
Lock localizerInfosReadLock = localizersLock.readLock();
localizersReadLock.lock();
try {
for (Localizer localizer : localizers) {
if (localizer.getTerminalId().equals(terminalId)) {
return localizer.getType();
return localizer.getLocalizerType();
}
}
} finally {
localizerInfosReadLock.unlock();
localizersReadLock.unlock();
}
return null;
}

View File

@ -1,18 +1,7 @@
package com.njzscloud.common.localizer;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.jt808.message.TerminalMessageBody;
import com.njzscloud.common.localizer.jt808.message.TerminalTxtMessage;
import com.njzscloud.common.localizer.mqtt.result.RealtimeLocationResult;
import com.njzscloud.common.localizer.tuqiang.message.LocationReportMessage;
import com.njzscloud.common.localizer.tuqiang.message.SearchDeviceInfoMessage;
import com.njzscloud.common.localizer.tuqiang.message.SearchLocationResponseMessage;
import com.njzscloud.common.mqtt.util.Mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import com.njzscloud.common.localizer.jt808.JT808;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -24,7 +13,7 @@ import lombok.extern.slf4j.Slf4j;
@Setter
@ToString
@Accessors(chain = true)
public abstract class Localizer {
public class Localizer {
/**
* Id
*/
@ -33,121 +22,10 @@ public abstract class Localizer {
/**
*
*/
protected LocalizerType type;
/**
*
*/
protected Channel channel;
protected LocalizerType localizerType;
private int speedThreshold;
public boolean isOnline() {
return channel != null && channel.isActive();
}
/**
*
*
* @param interval
*/
public abstract void track(int interval);
/**
*
*
*/
public abstract void enableWarn(boolean enable);
/**
*
*
* @param speed km/h
*/
public abstract void speedThreshold(int speed);
/**
* IP
*
* @param ip IP
* @param port
*/
public abstract void configIpPort(String ip, int port);
/**
*
*/
public void onHeartbeat(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
Mqtt.publish(terminalId + "/online", MapUtil.builder()
.put("online", true)
.put("terminalId", terminalId)
.put("time", DateUtil.now())
.build());
}
/**
*
*/
public void onLocationReport(TerminalMessageBody message) {
ByteBuf byteBuf = message.getBody();
LocationReportMessage locationReportMsg = new LocationReportMessage(byteBuf);
// log.info("终端位置信息汇报消息: {}", locationReportMsg);
String terminalId = message.getTerminalId();
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
}
/**
*
*/
public void onLocationBatchUpload(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
ByteBuf body = message.getBody();
int count = body.readUnsignedShort();
int type = body.readByte();
for (int i = 0; i < count; i++) {
int length = body.readUnsignedShort();
LocationReportMessage locationReportMsg = new LocationReportMessage(body.slice(body.readerIndex(), length));
body.skipBytes(length);
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId)
.setType(type);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
}
}
/**
*
*/
public void onSearchLocationResponse(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
ByteBuf body = message.getBody();
SearchLocationResponseMessage searchLocationResponseMsg = new SearchLocationResponseMessage(body);
// log.info("查询位置响应消息: {}", searchLocationResponseMsg);
LocationReportMessage locationReportMsg = searchLocationResponseMsg.getLocationReportMessage();
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/current_location", realtimeLocationResult);
}
/**
*
*/
public void onTerminalTxtReport(TerminalMessageBody message) {
TerminalTxtMessage terminalTxtReportMsg = new TerminalTxtMessage(message.getBody());
String txt = terminalTxtReportMsg.getTxt();
log.info("终端文本信息汇报消息: {}", txt);
SearchDeviceInfoMessage deviceInfo = SearchDeviceInfoMessage.parse(txt);
if (deviceInfo != null) {
String terminalId = message.getTerminalId();
deviceInfo.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/device_info", deviceInfo);
}
return JT808.isOnline(terminalId);
}
}

View File

@ -1,6 +1,12 @@
package com.njzscloud.common.localizer.config;
import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.htzj.Htzj;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.tuqiang.Tuqiang;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
public class LocalizerInitializer {
private final LocalizerProperties properties;
@ -10,10 +16,17 @@ public class LocalizerInitializer {
}
public void init() {
Tuqiang.run(properties.getPort(), properties.getBossThreads(), properties.getWorkerThreads());
Htzj.init();
Tuqiang.init();
JT808.run(properties.getPort(), properties.getBossThreads(), properties.getWorkerThreads());
}
private void destroy() {
Tuqiang.stop();
JT808.stop();
}
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
DeviceStore.init();
}
}

View File

@ -6,14 +6,17 @@ import lombok.RequiredArgsConstructor;
@Getter
@RequiredArgsConstructor
public enum MessageType {
ServerRegisterReply(0x8100),
ServerTxtMsg(0x8300),
ServerGeneralReply(0x8001),
TerminalRegister(0x0100),
TerminalAuth(0x0102),
GeneralReport(0x0001),
Heartbeat(0x0002),
LocationReport(0x0200),
LocationBatchReport(0x704),
LocationSearchReport(0x0201),
TxtReport(0x6006),
TerminalGeneralReply(0x0001),
TerminalHeartbeat(0x0002),
TerminalLocationReport(0x0200),
TerminalLocationBatchReport(0x704),
TerminalLocationSearchReply(0x0201),
TerminalTxtMsg(0x6006),
;
private final int messageId;

View File

@ -0,0 +1,109 @@
package com.njzscloud.common.localizer.htzj;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.njzscloud.common.localizer.contant.BodyCharset;
import com.njzscloud.common.localizer.htzj.listener.HtzjListeners;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.jt808.message.JT808Message;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class Htzj {
private static HtzjListeners htzjListeners = new HtzjListeners();
public static void init() {
htzjListeners = new HtzjListeners();
}
/**
*
* <pre>
* SR
* devid?
* reset
* wakelv,8
* nmset,0,X
* X3~600
*
* nmset,0,30
* 030
* nmset,0,180
* 03</pre>
*
* @param terminalId ID
* @param directive
*/
public static void sendDirective(String terminalId, String directive) {
log.info("发送航天紫金指令终端ID{},指令:{}", terminalId, directive);
JT808.sendTxtMessage(terminalId, directive, BodyCharset.ASCII);
}
/**
*
*
* @param terminalId ID
* @param interval 0-36000
*/
public static void track(String terminalId, int interval) {
if (interval < 3) {
interval = 3;
} else if (interval > 600) {
interval = 600;
}
log.info("开启航天紫金追踪终端ID{},上报间隔:{}秒3 秒后重启设备", terminalId, interval);
sendDirective(terminalId, StrUtil.format("nmset,0,{}", interval));
ThreadUtil.sleep(3000);
sendDirective(terminalId, "reset");
}
/**
*
*
* @param terminalId ID
*/
public static void obtainDeviceInfo(String terminalId) {
}
/**
*
*
* @param terminalId ID
*/
public static void enableWarn(String terminalId, boolean enable) {
}
/**
*
*
* @param terminalId ID
* @param speed km/h
*/
public static void speedThreshold(String terminalId, int speed) {
}
/**
*
*
* @param terminalId Id
* @param ip IP
* @param port
*/
public static void configIpPort(String terminalId, String ip, int port) {
}
/**
*
*
* @param terminalId ID
*/
public static void currentLocation(String terminalId) {
log.info("航天紫金获当前位置终端ID{}", terminalId);
JT808Message baseMessage = JT808.createBaseMessage(terminalId, 0x8201);
JT808.sendMessage(baseMessage);
}
}

View File

@ -0,0 +1,34 @@
package com.njzscloud.common.localizer.htzj.listener;
import com.njzscloud.common.localizer.contant.BodyCharset;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.jt808.message.TerminalMessageBody;
import com.njzscloud.common.localizer.jt808.message.TerminalTxtMessage;
import com.njzscloud.common.localizer.jt808.support.JT808MessageListener;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HtzjListeners extends JT808MessageListener {
public HtzjListeners() {
super(LocalizerType.HangTianZiJin);
}
/**
*
*/
@Override
public void onTxtReport(TerminalMessageBody message) {
byte[] body = message.getBody();
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
try {
TerminalTxtMessage terminalTxtReportMsg = new TerminalTxtMessage(byteBuf);
String txt = terminalTxtReportMsg.getTxt(BodyCharset.ASCII);
log.info("终端文本信息汇报消息: 【\n{}\n】", txt);
} catch (Exception e) {
byteBuf.release();
}
}
}

View File

@ -1,11 +1,9 @@
package com.njzscloud.common.localizer.jt808;
import com.njzscloud.common.core.utils.BCD;
import com.njzscloud.common.localizer.jt808.message.JT808Message;
import com.njzscloud.common.localizer.jt808.message.ServerGeneralResponseMessage;
import com.njzscloud.common.localizer.jt808.message.ServerTxtMessage;
import com.njzscloud.common.localizer.jt808.message.TerminalMessageBody;
import com.njzscloud.common.localizer.jt808.support.JT808MessageResolver;
import com.njzscloud.common.localizer.contant.BodyCharset;
import com.njzscloud.common.localizer.contant.MessageType;
import com.njzscloud.common.localizer.jt808.message.*;
import com.njzscloud.common.localizer.jt808.support.TcpServer;
import com.njzscloud.common.localizer.jt808.util.FlowId;
import io.netty.buffer.ByteBuf;
@ -18,7 +16,6 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* JT808
@ -45,13 +42,6 @@ public final class JT808 {
}
}
/**
*
*/
public static void addListener(String messageId, Consumer<TerminalMessageBody> listener) {
JT808MessageResolver.addResolver(messageId, listener);
}
/**
*
*/
@ -72,35 +62,44 @@ public final class JT808 {
}
}
public static boolean isOnline(String terminalId) {
Channel channel = DeviceHandle.get(terminalId);
return channel != null && channel.isActive();
}
/**
* (0x8001)
*/
public static void sendGeneralResponse(JT808Message message, int result) {
String terminalPhone = message.getTerminalPhone();
int flowId = message.getFlowId();
int messageId = message.getMessageId();
ServerGeneralResponseMessage response = new ServerGeneralResponseMessage(messageId, flowId, result);
message = JT808.createBaseMessage(terminalPhone, 0x8001, response.toBytes());
sendMessage(message);
public static void sendGeneralReply(String terminalId, int messageId, int flowId, int result) {
ServerGeneralReplyMessage replyMessage = new ServerGeneralReplyMessage(messageId, flowId, result);
sendMessage(terminalId, MessageType.ServerGeneralReply, replyMessage);
}
public static void sendGeneralResponse(JT808Message message) {
sendGeneralResponse(message, 0);
public static void sendGeneralReply(String terminalId, int messageId, int flowId) {
sendGeneralReply(terminalId, messageId, flowId, 0);
}
/**
* (0x8300)
*/
public static void sendTxtMessage(String terminalId, String txt, BodyCharset charset) {
ServerTxtMessage txtMessage = new ServerTxtMessage(txt, charset);
sendMessage(terminalId, MessageType.ServerTxtMsg, txtMessage);
}
public static void sendTxtMessage(String terminalId, String txt) {
ServerTxtMessage txtMessage = new ServerTxtMessage(txt);
JT808Message message = JT808.createBaseMessage(terminalId, 0x8300, txtMessage.toBytes());
sendMessage(message);
sendTxtMessage(terminalId, txt, BodyCharset.GBK);
}
/**
*
*/
public static void sendMessage(String terminalId, MessageType messageType, ServerMessageBody body) {
JT808Message message = JT808.createBaseMessage(terminalId, messageType.getMessageId(), body.toBytes());
sendMessage(message);
}
public static void sendMessage(JT808Message message) {
String terminalPhone = message.getTerminalPhone();
@ -206,5 +205,12 @@ public final class JT808 {
return true;
}
public static void sendServerRegisterReply(String terminalId, int flowId, int result, String authCode) {
ServerRegisterReplyMessage serverRegisterReplyMessage = new ServerRegisterReplyMessage(terminalId, flowId, result, authCode);
JT808.sendMessage(JT808.createBaseMessage(terminalId, MessageType.ServerRegisterReply.getMessageId(), serverRegisterReplyMessage.toBytes()));
}
public static void sendServerRegisterReply(String terminalId, int flowId, String authCode) {
sendServerRegisterReply(terminalId, flowId, 0, authCode);
}
}

View File

@ -1,40 +0,0 @@
package com.njzscloud.common.localizer.jt808;
import com.njzscloud.common.localizer.jt808.message.JT808Message;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
/**
*
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class TerminalMessageBody {
/**
* ID
*/
protected String terminalId;
/**
*
*/
protected byte[] body;
/**
* Id
*/
private int messageId;
/**
*
*/
private int flowId;
public TerminalMessageBody(JT808Message message) {
this.terminalId = message.getTerminalPhone();
this.messageId = message.getMessageId();
this.flowId = message.getFlowId();
this.body = message.getMessageBody();
}
}

View File

@ -1,4 +1,4 @@
package com.njzscloud.common.localizer.tuqiang.message;
package com.njzscloud.common.localizer.jt808.message;
import cn.hutool.core.date.DateUtil;
import com.njzscloud.common.core.utils.BCD;

View File

@ -1,4 +1,4 @@
package com.njzscloud.common.localizer.tuqiang.message;
package com.njzscloud.common.localizer.jt808.message;
import lombok.Getter;
import lombok.Setter;

View File

@ -1,4 +1,4 @@
package com.njzscloud.common.localizer.tuqiang.message;
package com.njzscloud.common.localizer.jt808.message;
import io.netty.buffer.ByteBuf;
import lombok.Getter;

View File

@ -14,7 +14,7 @@ import lombok.experimental.Accessors;
@ToString
@Accessors(chain = true)
@AllArgsConstructor
public class ServerGeneralResponseMessage implements ServerMessageBody {
public class ServerGeneralReplyMessage implements ServerMessageBody {
/**
* ID
*/

View File

@ -17,7 +17,7 @@ import lombok.experimental.Accessors;
@ToString
@Accessors(chain = true)
@AllArgsConstructor
public class ServerRegisterResponseMessage implements ServerMessageBody {
public class ServerRegisterReplyMessage implements ServerMessageBody {
/**
* ID
*/
@ -41,7 +41,9 @@ public class ServerRegisterResponseMessage implements ServerMessageBody {
try {
body.writeShort(flowId);
body.writeByte(result);
ByteBufUtil.writeUtf8(body, authCode);
if (result == 0) {
ByteBufUtil.writeAscii(body, authCode);
}
return ByteBufUtil.getBytes(body);
} finally {
body.release();

View File

@ -2,7 +2,6 @@ package com.njzscloud.common.localizer.jt808.message;
import cn.hutool.core.util.CharsetUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -18,11 +17,9 @@ public class TerminalAuthMessage {
*/
private String authCode;
public TerminalAuthMessage(byte[] bytes) {
ByteBuf body = Unpooled.wrappedBuffer(bytes);
public TerminalAuthMessage(ByteBuf body) {
byte[] authCodeBytes = new byte[body.readableBytes()];
body.readBytes(authCodeBytes);
this.authCode = new String(authCodeBytes, CharsetUtil.CHARSET_GBK);
body.release();
}
}

View File

@ -1,7 +1,6 @@
package com.njzscloud.common.localizer.jt808.message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -28,12 +27,10 @@ public class TerminalGeneralResponseMessage {
*/
private int result;
public TerminalGeneralResponseMessage(byte[] bytes) {
ByteBuf body = Unpooled.wrappedBuffer(bytes);
public TerminalGeneralResponseMessage(ByteBuf body) {
this.flowId = body.readUnsignedShort();
this.messageId = body.readUnsignedShort();
this.result = body.readByte();
body.release();
}
public boolean isSuccess() {

View File

@ -1,6 +1,5 @@
package com.njzscloud.common.localizer.jt808.message;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -17,9 +16,24 @@ public class TerminalMessageBody {
/**
* ID
*/
private String terminalId;
protected String terminalId;
/**
* ()
*
*/
private ByteBuf body;
protected byte[] body;
/**
* Id
*/
private int messageId;
/**
*
*/
private int flowId;
public TerminalMessageBody(JT808Message message) {
this.terminalId = message.getTerminalPhone();
this.messageId = message.getMessageId();
this.flowId = message.getFlowId();
this.body = message.getMessageBody();
}
}

View File

@ -2,7 +2,6 @@ package com.njzscloud.common.localizer.jt808.message;
import cn.hutool.core.util.CharsetUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -53,8 +52,7 @@ public class TerminalRegisterMessage {
*/
private String plate;
public TerminalRegisterMessage(byte[] bytes) {
ByteBuf body = Unpooled.wrappedBuffer(bytes);
public TerminalRegisterMessage(ByteBuf body) {
// 读取省域IDWORD2字节大端序
this.province = body.readUnsignedShort();
@ -84,8 +82,6 @@ public class TerminalRegisterMessage {
byte[] plateBytes = new byte[body.readableBytes()];
body.readBytes(plateBytes);
this.plate = new String(plateBytes, CharsetUtil.CHARSET_GBK);
body.release();
}
}

View File

@ -1,34 +1,51 @@
package com.njzscloud.common.localizer.jt808.message;
import cn.hutool.core.util.CharsetUtil;
import com.njzscloud.common.core.ex.Exceptions;
import com.njzscloud.common.localizer.contant.BodyCharset;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
/**
*
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class TerminalTxtMessage {
/**
*
*/
private byte type;
@Getter
private final byte type;
/**
*
*/
private String txt;
private final byte[] txt;
public TerminalTxtMessage(ByteBuf body) {
this.type = body.readByte();
this.txt = body.toString(body.readerIndex(), body.readableBytes(),
this.type == 0x00 ?
CharsetUtil.CHARSET_GBK :
CharsetUtil.CHARSET_UTF_8);
this.txt = ByteBufUtil.getBytes(body);
}
public String getTxt() {
return new String(txt, this.type == 0x00 ?
CharsetUtil.CHARSET_GBK :
CharsetUtil.CHARSET_UTF_8);
}
public String getTxt(BodyCharset charset) {
switch (charset) {
case GBK:
return new String(txt, CharsetUtil.CHARSET_GBK);
case ASCII:
StringBuilder stringBuilder = new StringBuilder();
for (byte b : txt) {
stringBuilder.append((char) b);
}
return stringBuilder.toString();
default:
throw Exceptions.error("不支持的文本消息编码:{}", charset);
}
}
}

View File

@ -4,8 +4,8 @@ import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.contant.MessageType;
import com.njzscloud.common.localizer.jt808.TerminalMessageBody;
import com.njzscloud.common.localizer.jt808.message.JT808Message;
import com.njzscloud.common.localizer.jt808.message.TerminalMessageBody;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@ -17,7 +17,7 @@ import java.util.function.Consumer;
*/
@Slf4j
public class JT808MessageCenter {
private static final Map<LocalizerType, Map<MessageType, Consumer<TerminalMessageBody>>> resolvers = new ConcurrentHashMap<>();
private static final Map<LocalizerType, Map<MessageType, Consumer<TerminalMessageBody>>> listeners = new ConcurrentHashMap<>();
public static void dispatch(JT808Message message) {
int messageId = message.getMessageId();
@ -27,12 +27,12 @@ public class JT808MessageCenter {
log.error("未找到设备:{}", terminalId);
return;
}
LocalizerType localizerType = localizer.getType();
LocalizerType localizerType = localizer.getLocalizerType();
Map<MessageType, Consumer<TerminalMessageBody>> m = resolvers.computeIfPresent(localizerType, (k, v) -> {
Map<MessageType, Consumer<TerminalMessageBody>> m = listeners.computeIfPresent(localizerType, (k, v) -> {
MessageType messageType = MessageType.getMessageType(messageId);
if (messageType == null) {
log.error("未找到消息类型,设备号:{},消息 Id{}", terminalId, messageId);
log.error("未找到消息类型,设备号:{},消息 Id0x{}", terminalId, Integer.toHexString(messageId));
return v;
}
Consumer<TerminalMessageBody> c = v.computeIfPresent(messageType, (k1, v1) -> {
@ -53,7 +53,11 @@ public class JT808MessageCenter {
}
}
public static void addResolver(LocalizerType localizerType, MessageType messageType, Consumer<TerminalMessageBody> resolver) {
resolvers.computeIfAbsent(localizerType, k -> new ConcurrentHashMap<>()).put(messageType, resolver);
public static void addListener(LocalizerType localizerType, MessageType messageType, Consumer<TerminalMessageBody> listener) {
listeners.computeIfAbsent(localizerType, k -> new ConcurrentHashMap<>()).put(messageType, listener);
}
public static void addListeners(LocalizerType localizerType, Map<MessageType, Consumer<TerminalMessageBody>> listeners) {
listeners.forEach((messageType, consumer) -> addListener(localizerType, messageType, consumer));
}
}

View File

@ -1,12 +1,11 @@
package com.njzscloud.common.localizer.jt808.support;
import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.jt808.message.JT808Message;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
/**
@ -15,7 +14,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class JT808MessageHandler extends ChannelInboundHandlerAdapter {
// 管理所有连接的客户端
private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
@ -26,9 +25,17 @@ public class JT808MessageHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof JT808Message) {
JT808Message message = (JT808Message) msg;
String terminalPhone = message.getTerminalPhone();
Localizer localizer = DeviceStore.determineLocalizer(terminalPhone);
if (localizer == null) {
// log.warn("设备不存在:{}", terminalPhone);
ctx.channel().close();
return;
}
int messageId = message.getMessageId();
if (messageId == 0x0100) {
JT808.register(message.getTerminalPhone(), ctx.channel());
log.info("客户端连接: {} {}", terminalPhone, ctx.channel().remoteAddress());
JT808.register(terminalPhone, ctx.channel());
}
JT808MessageCenter.dispatch(message);
} else {
@ -38,8 +45,8 @@ public class JT808MessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端断开连接: {}", ctx.channel().remoteAddress());
clients.remove(ctx.channel());
// log.warn("客户端断开连接: {}", ctx.channel().remoteAddress());
// clients.remove(ctx.channel());
JT808.unregister(ctx.channel());
}

View File

@ -0,0 +1,174 @@
package com.njzscloud.common.localizer.jt808.support;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.contant.MessageType;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.jt808.message.*;
import com.njzscloud.common.localizer.mqtt.result.RealtimeLocationResult;
import com.njzscloud.common.mqtt.util.Mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.function.Consumer;
@Slf4j
public abstract class JT808MessageListener {
public JT808MessageListener(LocalizerType localizerType) {
JT808MessageCenter.addListeners(localizerType, MapUtil.<MessageType, Consumer<TerminalMessageBody>>builder()
.put(MessageType.TerminalRegister, this::onTerminalRegister)
.put(MessageType.TerminalAuth, this::onTerminalAuth)
.put(MessageType.TerminalGeneralReply, this::onGeneralReport)
.put(MessageType.TerminalHeartbeat, this::onHeartbeat)
.put(MessageType.TerminalLocationReport, this::onLocationReport)
.put(MessageType.TerminalLocationBatchReport, this::onLocationBatchUpload)
.put(MessageType.TerminalLocationSearchReply, this::onLocationSearchReport)
.put(MessageType.TerminalTxtMsg, this::onTxtReport)
.build());
}
/**
*
*/
public void onTerminalRegister(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
int flowId = message.getFlowId();
JT808.sendServerRegisterReply(terminalId, flowId, terminalId);
byte[] body = message.getBody();
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
TerminalRegisterMessage terminalRegisterMessage = new TerminalRegisterMessage(byteBuf);
log.info("{},终端注册消息: {}", terminalId, terminalRegisterMessage);
}
/**
*
*/
public void onTerminalAuth(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
int messageId = message.getMessageId();
int flowId = message.getFlowId();
JT808.sendGeneralReply(terminalId, messageId, flowId);
ByteBuf body = Unpooled.wrappedBuffer(message.getBody());
TerminalAuthMessage authMsg = null;
try {
authMsg = new TerminalAuthMessage(body);
} catch (Exception e) {
body.release();
}
log.info("{},终端鉴权消息: {}", terminalId, authMsg);
}
/**
*
*/
public void onGeneralReport(TerminalMessageBody message) {
ByteBuf body = Unpooled.wrappedBuffer(message.getBody());
TerminalGeneralResponseMessage terminalGeneralResponseMessage = new TerminalGeneralResponseMessage(body);
log.info("设备通用响应,设备号:{},消息:{}", message.getTerminalId(), terminalGeneralResponseMessage.getResult());
}
/**
*
*/
public void onHeartbeat(TerminalMessageBody message) {
JT808.sendGeneralReply(message.getTerminalId(), message.getMessageId(), message.getFlowId());
String terminalId = message.getTerminalId();
Mqtt.publish(terminalId + "/online", MapUtil.builder()
.put("online", true)
.put("terminalId", terminalId)
.put("time", DateUtil.now())
.build());
}
/**
*
*/
public void onLocationReport(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
JT808.sendGeneralReply(terminalId, message.getMessageId(), message.getFlowId());
byte[] body = message.getBody();
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
try {
Localizer localizer = DeviceStore.determineLocalizer(terminalId);
if (localizer == null) {
log.warn("未找到定位器:{}", terminalId);
return;
}
LocationReportMessage locationReportMsg = new LocationReportMessage(byteBuf);
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId)
.setOverspeed(locationReportMsg.getSpeed() > localizer.getSpeedThreshold());
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
} finally {
byteBuf.release();
}
}
/**
*
*/
public void onLocationBatchUpload(TerminalMessageBody message) {
JT808.sendGeneralReply(message.getTerminalId(), message.getMessageId(), message.getFlowId());
String terminalId = message.getTerminalId();
byte[] body = message.getBody();
log.info("批量上报:{}", Arrays.toString(body));
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
try {
int count = byteBuf.readUnsignedShort();
int type = byteBuf.readByte();
Localizer localizer = DeviceStore.determineLocalizer(terminalId);
if (localizer == null) {
log.warn("未找到定位器:{}", terminalId);
return;
}
int speedThreshold = localizer.getSpeedThreshold();
for (int i = 0; i < count; i++) {
int length = byteBuf.readUnsignedShort();
LocationReportMessage locationReportMsg = new LocationReportMessage(byteBuf.slice(byteBuf.readerIndex(), length));
byteBuf.skipBytes(length);
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId)
.setOverspeed(locationReportMsg.getSpeed() > speedThreshold)
.setType(type);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
}
} finally {
byteBuf.release();
}
}
/**
*
*/
public void onLocationSearchReport(TerminalMessageBody message) {
JT808.sendGeneralReply(message.getTerminalId(), message.getMessageId(), message.getFlowId());
String terminalId = message.getTerminalId();
byte[] body = message.getBody();
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
try {
SearchLocationResponseMessage searchLocationResponseMsg = new SearchLocationResponseMessage(byteBuf);
LocationReportMessage locationReportMsg = searchLocationResponseMsg.getLocationReportMessage();
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/current_location", realtimeLocationResult);
} finally {
byteBuf.release();
}
}
/**
*
*/
public abstract void onTxtReport(TerminalMessageBody message);
}

View File

@ -1,19 +1,10 @@
package com.njzscloud.common.localizer.jt808.support;
import cn.hutool.core.map.MapUtil;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.jt808.message.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@Slf4j
public class JT808MessageResolver {
public static final String Heartbeat = "Heartbeat";
/* public static final String Heartbeat = "Heartbeat";
public static final String LocationReport = "LocationReport";
public static final String LocationBatchReport = "LocationBatchReport";
public static final String LocationSearch = "LocationSearch";
@ -61,78 +52,78 @@ public class JT808MessageResolver {
}
}
/**
*//**
*
*/
*//*
public static void onTerminalRegister(JT808Message message) {
String terminalPhone = message.getTerminalPhone();
int flowId = message.getFlowId();
ServerRegisterResponseMessage serverRegisterResponseMessage = new ServerRegisterResponseMessage(terminalPhone, flowId, 0, terminalPhone);
JT808.sendMessage(JT808.createBaseMessage(terminalPhone, 0x8100, serverRegisterResponseMessage.toBytes()));
ServerRegisterReplyMessage serverRegisterReplyMessage = new ServerRegisterReplyMessage(terminalPhone, flowId, 0, terminalPhone);
JT808.sendMessage(JT808.createBaseMessage(terminalPhone, 0x8100, serverRegisterReplyMessage.toBytes()));
TerminalRegisterMessage terminalRegisterMessage = message.parseMessageBody(TerminalRegisterMessage.class);
log.info("{},终端注册消息: {}", message.getTerminalPhone(), terminalRegisterMessage);
}
/**
*//**
*
*/
*//*
public static void onTerminalAuth(JT808Message message) {
JT808.sendGeneralResponse(message);
JT808.sendGeneralReply(message);
TerminalAuthMessage authMsg = message.parseMessageBody(TerminalAuthMessage.class);
log.info("{},终端鉴权消息: {}", message.getTerminalPhone(), authMsg);
}
/**
*//**
*
*/
*//*
public static void onTerminalGeneralResponse(JT808Message message) {
TerminalGeneralResponseMessage terminalGeneralResponseMessage = message.parseMessageBody(TerminalGeneralResponseMessage.class);
log.info("设备通用响应,设备号:{},消息:{}", message.getTerminalPhone(), terminalGeneralResponseMessage.getResult());
}
/**
*//**
*
*/
*//*
public static void onHeartbeat(JT808Message message) {
log.info("{},终端心跳消息: {}", message.getTerminalPhone(), message);
JT808.sendGeneralResponse(message);
JT808.sendGeneralReply(message);
dispatchMsg(Heartbeat, message);
}
/**
*//**
*
*/
*//*
public static void onLocationReport(JT808Message message) {
log.info("{},终端位置信息汇报消息: {}", message.getTerminalPhone(), message);
dispatchMsg(LocationReport, message);
}
/**
*//**
*
*/
*//*
public static void onLocationBatchUpload(JT808Message message) {
log.info("{},定位数据批量上传消息: {}", message.getTerminalPhone(), message);
dispatchMsg(LocationBatchReport, message);
}
/**
*//**
*
*/
*//*
public static void onSearchLocationResponse(JT808Message message) {
log.info("{},查询位置响应消息: {}", message.getTerminalPhone(), message);
JT808.sendGeneralResponse(message);
JT808.sendGeneralReply(message);
dispatchMsg(LocationSearch, message);
}
/**
*//**
*
*/
*//*
public static void onTerminalTxtReport(JT808Message message) {
log.info("{},终端文本信息汇报消息: {}", message.getTerminalPhone(), message);
JT808.sendGeneralResponse(message);
JT808.sendGeneralReply(message);
dispatchMsg(TxtReport, message);
}
} */
}

View File

@ -1,26 +1,48 @@
package com.njzscloud.common.localizer.mqtt;
import com.njzscloud.common.core.jackson.Jackson;
import com.fasterxml.jackson.core.type.TypeReference;
import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.htzj.Htzj;
import com.njzscloud.common.localizer.mqtt.param.*;
import com.njzscloud.common.localizer.tuqiang.Tuqiang;
import com.njzscloud.common.mqtt.support.MqttListener;
import com.njzscloud.common.mqtt.support.MqttMsg;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
public class MqttMsgHandlers {
TypeReference<List<DeviceLoadParam>> typeReference = new TypeReference<List<DeviceLoadParam>>() {
};
/**
*
*/
@MqttListener(topic = "location/track")
public void onMessage(MqttMsg msg) {
LocationTrackParam locationTrackParam = msg.getMsg(LocationTrackParam.class);
// String format = StrUtil.format("nmset,0,{}", locationTrackParam.getInterval());
// String format = StrUtil.format("timer,0", locationTrackParam.getInterval());
// String format = StrUtil.format("reset", locationTrackParam.getInterval());
// JT808.sendTxtMessage(locationTrackParam.getTerminalId(), format);
Tuqiang.track(locationTrackParam.getTerminalId(), locationTrackParam.getInterval());
String terminalId = locationTrackParam.getTerminalId();
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
switch (localizerType) {
case Tuqiang:
Tuqiang.track(terminalId, locationTrackParam.getInterval());
break;
case HangTianZiJin:
Htzj.track(terminalId, locationTrackParam.getInterval());
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
@ -29,7 +51,22 @@ public class MqttMsgHandlers {
@MqttListener(topic = "location/current")
public void currentLocation(MqttMsg msg) {
LocationCurrentParam locationCurrentParam = msg.getMsg(LocationCurrentParam.class);
Tuqiang.currentLocation(locationCurrentParam.getTerminalId());
String terminalId = locationCurrentParam.getTerminalId();
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
switch (localizerType) {
case Tuqiang:
Tuqiang.currentLocation(terminalId);
break;
case HangTianZiJin:
Htzj.currentLocation(terminalId);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
@ -39,8 +76,22 @@ public class MqttMsgHandlers {
public void enableWarn(MqttMsg msg) {
EnableWarnParam enableWarnParam = msg.getMsg(EnableWarnParam.class);
String terminalId = enableWarnParam.getTerminalId();
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
boolean enable = enableWarnParam.isEnable();
Tuqiang.enableWarn(terminalId, enable);
switch (localizerType) {
case Tuqiang:
Tuqiang.enableWarn(terminalId, enable);
break;
case HangTianZiJin:
Htzj.enableWarn(terminalId, enable);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
@ -50,10 +101,23 @@ public class MqttMsgHandlers {
public void configIpPort(MqttMsg msg) {
ConfigIpPortParam configIpPortParam = msg.getMsg(ConfigIpPortParam.class);
String terminalId = configIpPortParam.getTerminalId();
log.info("configIpPortParam: {}", Jackson.toJsonStr(configIpPortParam));
String ip = configIpPortParam.getIp();
int port = configIpPortParam.getPort();
Tuqiang.configIpPort(terminalId, ip, port);
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
switch (localizerType) {
case Tuqiang:
Tuqiang.configIpPort(terminalId, ip, port);
break;
case HangTianZiJin:
Htzj.configIpPort(terminalId, ip, port);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
@ -64,7 +128,21 @@ public class MqttMsgHandlers {
SpeedThresholdParam speedThresholdParam = msg.getMsg(SpeedThresholdParam.class);
String terminalId = speedThresholdParam.getTerminalId();
int speed = speedThresholdParam.getSpeed();
Tuqiang.speedThreshold(terminalId, speed);
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
switch (localizerType) {
case Tuqiang:
Tuqiang.speedThreshold(terminalId, speed);
break;
case HangTianZiJin:
Htzj.speedThreshold(terminalId, speed);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
@ -74,6 +152,57 @@ public class MqttMsgHandlers {
public void obtainDeviceInfo(MqttMsg msg) {
ObtainDeviceInfoParam obtainDeviceInfoParam = msg.getMsg(ObtainDeviceInfoParam.class);
String terminalId = obtainDeviceInfoParam.getTerminalId();
Tuqiang.obtainDeviceInfo(terminalId);
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
switch (localizerType) {
case Tuqiang:
Tuqiang.obtainDeviceInfo(terminalId);
break;
case HangTianZiJin:
Htzj.obtainDeviceInfo(terminalId);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
*
*/
@MqttListener(topic = "location/load")
public void load(MqttMsg msg) {
List<DeviceLoadParam> deviceLoadParams = msg.getMsg(typeReference.getType());
LinkedList<Localizer> list = deviceLoadParams.stream()
.map(it -> new Localizer().setTerminalId(it.getTerminalId()).setLocalizerType(it.getLocalizerType()))
.collect(Collectors.toCollection(LinkedList::new));
DeviceStore.setLocalizers(list);
}
/**
*
*/
@MqttListener(topic = "localizer/send_directive")
public void sendDirective(MqttMsg msg) {
SendDirectiveParam sendDirectiveParam = msg.getMsg(SendDirectiveParam.class);
String terminalId = sendDirectiveParam.getTerminalId();
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
String directive = sendDirectiveParam.getDirective();
switch (localizerType) {
case Tuqiang:
Tuqiang.sendDirective(terminalId, directive);
break;
case HangTianZiJin:
Htzj.sendDirective(terminalId, directive);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
}

View File

@ -0,0 +1,22 @@
package com.njzscloud.common.localizer.mqtt.param;
import com.njzscloud.common.localizer.contant.LocalizerType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class DeviceLoadParam {
/**
* ID
*/
private String terminalId;
/**
*
*/
private LocalizerType localizerType;
}

View File

@ -1,6 +1,5 @@
package com.njzscloud.common.localizer.mqtt.param;
import com.njzscloud.common.localizer.contant.LocalizerType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -15,8 +14,4 @@ public class LocationCurrentParam {
* ID
*/
private String terminalId;
/**
*
*/
private LocalizerType localizer;
}

View File

@ -1,6 +1,5 @@
package com.njzscloud.common.localizer.mqtt.param;
import com.njzscloud.common.localizer.contant.LocalizerType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -19,8 +18,4 @@ public class LocationTrackParam {
* 0-36000
*/
private int interval;
/**
*
*/
private LocalizerType localizer;
}

View File

@ -0,0 +1,15 @@
package com.njzscloud.common.localizer.mqtt.param;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class SendDirectiveParam {
private String terminalId;
private String directive;
}

View File

@ -1,6 +1,5 @@
package com.njzscloud.common.localizer.mqtt.param;
import com.njzscloud.common.localizer.contant.LocalizerType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -19,8 +18,4 @@ public class SpeedThresholdParam {
*
*/
private int speed;
/**
*
*/
private LocalizerType localizer;
}

View File

@ -5,51 +5,43 @@ import cn.hutool.core.util.StrUtil;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.jt808.message.JT808Message;
import com.njzscloud.common.localizer.tuqiang.listener.TuqiangListeners;
import com.njzscloud.common.localizer.tuqiang.support.TuqiangListener;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Method;
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class Tuqiang {
public static void run(int port, int bossThreads, int workerThreads) {
addListener(new TuqiangListeners());
JT808.run(port, bossThreads, workerThreads);
private static TuqiangListeners tuqiangListeners;
public static void init() {
tuqiangListeners = new TuqiangListeners();
}
public static void stop() {
JT808.stop();
}
private static void addListener(Object object) {
Method[] methods = object.getClass().getDeclaredMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(TuqiangListener.class)) {
TuqiangListener tuqiangListener = method.getAnnotation(TuqiangListener.class);
String messageId = tuqiangListener.messageId();
JT808.addListener(messageId, (msg) -> {
try {
method.invoke(object, msg);
} catch (Exception e) {
log.error("途强处理消息 {} 时出错", messageId, e);
}
});
}
}
/**
*
*
* @param terminalId ID
* @param directive
*/
public static void sendDirective(String terminalId, String directive) {
log.info("发送途强指令终端ID{},指令:{}", terminalId, directive);
JT808.sendTxtMessage(terminalId, directive);
}
/**
*
*
* @param terminalId ID
* @param interval 0-36000
* @param interval 0-6000
*/
public static void track(String terminalId, int interval) {
if (interval < 1) {
interval = 1;
} else if (interval > 600) {
interval = 600;
}
log.info("开启途强追踪终端ID{},上报间隔:{}秒", terminalId, interval);
Assert.isTrue(interval >= 0 && interval <= 3600, "上报间隔必须为0-3600秒");
JT808.sendTxtMessage(terminalId, StrUtil.format("<SPBSJ*P:BSJGPS*C:{}*H:300>", interval));
}

View File

@ -1,104 +1,42 @@
package com.njzscloud.common.localizer.tuqiang.listener;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.jt808.message.SearchDeviceInfoMessage;
import com.njzscloud.common.localizer.jt808.message.TerminalMessageBody;
import com.njzscloud.common.localizer.jt808.message.TerminalTxtMessage;
import com.njzscloud.common.localizer.mqtt.result.RealtimeLocationResult;
import com.njzscloud.common.localizer.tuqiang.message.LocationReportMessage;
import com.njzscloud.common.localizer.tuqiang.message.SearchDeviceInfoMessage;
import com.njzscloud.common.localizer.tuqiang.message.SearchLocationResponseMessage;
import com.njzscloud.common.localizer.tuqiang.support.TuqiangListener;
import com.njzscloud.common.localizer.jt808.support.JT808MessageListener;
import com.njzscloud.common.mqtt.util.Mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import static com.njzscloud.common.localizer.jt808.support.JT808MessageResolver.*;
@Slf4j
public class TuqiangListeners {
/**
*
*/
@TuqiangListener(messageId = Heartbeat)
public void onHeartbeat(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
Mqtt.publish(terminalId + "/online", MapUtil.builder()
.put("online", true)
.put("terminalId", terminalId)
.put("time", DateUtil.now())
.build());
}
public class TuqiangListeners extends JT808MessageListener {
/**
*
*/
@TuqiangListener(messageId = LocationReport)
public void onLocationReport(TerminalMessageBody message) {
ByteBuf byteBuf = message.getBody();
LocationReportMessage locationReportMsg = new LocationReportMessage(byteBuf);
// log.info("终端位置信息汇报消息: {}", locationReportMsg);
String terminalId = message.getTerminalId();
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
}
/**
*
*/
@TuqiangListener(messageId = LocationBatchReport)
public void onLocationBatchUpload(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
ByteBuf body = message.getBody();
int count = body.readUnsignedShort();
int type = body.readByte();
for (int i = 0; i < count; i++) {
int length = body.readUnsignedShort();
LocationReportMessage locationReportMsg = new LocationReportMessage(body.slice(body.readerIndex(), length));
body.skipBytes(length);
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId)
.setType(type);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
}
}
/**
*
*/
@TuqiangListener(messageId = LocationSearch)
public void onSearchLocationResponse(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
ByteBuf body = message.getBody();
SearchLocationResponseMessage searchLocationResponseMsg = new SearchLocationResponseMessage(body);
// log.info("查询位置响应消息: {}", searchLocationResponseMsg);
LocationReportMessage locationReportMsg = searchLocationResponseMsg.getLocationReportMessage();
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/current_location", realtimeLocationResult);
public TuqiangListeners() {
super(LocalizerType.Tuqiang);
}
/**
*
*/
@TuqiangListener(messageId = TxtReport)
public void onTerminalTxtReport(TerminalMessageBody message) {
TerminalTxtMessage terminalTxtReportMsg = new TerminalTxtMessage(message.getBody());
String txt = terminalTxtReportMsg.getTxt();
log.info("终端文本信息汇报消息: {}", txt);
SearchDeviceInfoMessage deviceInfo = SearchDeviceInfoMessage.parse(txt);
if (deviceInfo != null) {
String terminalId = message.getTerminalId();
deviceInfo.setTerminalId(terminalId);
@Override
public void onTxtReport(TerminalMessageBody message) {
byte[] body = message.getBody();
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
try {
TerminalTxtMessage terminalTxtReportMsg = new TerminalTxtMessage(byteBuf);
String txt = terminalTxtReportMsg.getTxt();
log.info("终端文本信息汇报消息: {}", txt);
SearchDeviceInfoMessage deviceInfo = SearchDeviceInfoMessage.parse(txt);
if (deviceInfo != null) {
String terminalId = message.getTerminalId();
deviceInfo.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/device_info", deviceInfo);
Mqtt.publish(terminalId + "/device_info", deviceInfo);
}
} catch (Exception e) {
byteBuf.release();
}
}
}

View File

@ -1,10 +0,0 @@
package com.njzscloud.common.localizer.tuqiang.support;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface TuqiangListener {
String messageId();
}

View File

@ -11,7 +11,7 @@ import org.springframework.context.annotation.Configuration;
@EnableConfigurationProperties(MqttProperties.class)
public class MqttAutoConfiguration {
@Bean(destroyMethod = "shutdown")
@Bean(destroyMethod = "shutdown", name = "mqtt")
public MqttCliWrapper mqtt(MqttProperties mqttProperties) {
return new MqttCliWrapper(mqttProperties);
}

View File

@ -5,7 +5,9 @@ import cn.hutool.core.util.StrUtil;
import com.njzscloud.common.core.jackson.Jackson;
import com.njzscloud.common.mqtt.config.MqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
@ -17,7 +19,7 @@ import java.util.function.Consumer;
@Slf4j
public class MqttCliWrapper implements BeanPostProcessor {
private static final Map<String, Consumer<MqttMsg>> fn = new ConcurrentHashMap<>();
static final Map<String, Consumer<MqttMsg>> fn = new ConcurrentHashMap<>();
private MqttClient client;
public MqttCliWrapper(MqttProperties mqttProperties) {
@ -38,7 +40,7 @@ public class MqttCliWrapper implements BeanPostProcessor {
}
client.connect(options);
client.setCallback(new MsgHandler());
client.setCallback(new MsgHandler(fn));
} catch (Exception e) {
client = null;
log.error("mqtt连接失败", e);
@ -123,23 +125,5 @@ public class MqttCliWrapper implements BeanPostProcessor {
}
}
public static class MsgHandler implements MqttCallback {
public void messageArrived(String topic, MqttMessage message) throws Exception {
Consumer<MqttMsg> handler = fn.get(topic);
if (handler == null) return;
try {
handler.accept(new MqttMsg(message.getPayload()));
} catch (Exception e) {
log.error("mqtt 消息处理失败:{} {}", topic, new String(message.getPayload()), e);
}
}
public void connectionLost(Throwable cause) {
log.error("mqtt 连接已断开", cause);
}
public void deliveryComplete(IMqttDeliveryToken token) {
// log.info("消息投递结果:{}", token.isComplete());
}
}
}

View File

@ -2,6 +2,8 @@ package com.njzscloud.common.mqtt.support;
import com.njzscloud.common.core.jackson.Jackson;
import java.lang.reflect.Type;
public class MqttMsg {
private final byte[] msg;
@ -12,4 +14,8 @@ public class MqttMsg {
public <T> T getMsg(Class<T> clazz) {
return Jackson.toBean(msg, clazz);
}
public <T> T getMsg(Type type) {
return Jackson.toBean(this.msg, type);
}
}

View File

@ -0,0 +1,51 @@
package com.njzscloud.common.mqtt.support;
import com.njzscloud.common.core.thread.ThreadPool;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
@Slf4j
public class MsgHandler implements MqttCallback {
private final Map<String, Consumer<MqttMsg>> fn;
private final ThreadPoolExecutor threadPool;
public MsgHandler(Map<String, Consumer<MqttMsg>> fn) {
threadPool = ThreadPool.createThreadPool(
"mqtt-handler",
10, 200, 60,
20, Integer.MAX_VALUE,
(r, p) -> log.error("任务执行器线程池已满,拒绝任务:{}", r.toString())
);
this.fn = fn;
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
Consumer<MqttMsg> handler = fn.get(topic);
if (handler == null) return;
try {
CompletableFuture
.runAsync(() -> handler.accept(new MqttMsg(message.getPayload())), threadPool)
.exceptionally(e -> {
log.error("mqtt 消息处理失败:{} {}", topic, new String(message.getPayload()), e);
return null;
});
} catch (Exception e) {
log.error("mqtt 消息处理失败", e);
}
}
public void connectionLost(Throwable cause) {
log.error("mqtt 连接已断开", cause);
}
public void deliveryComplete(IMqttDeliveryToken token) {
// log.info("消息投递结果:{}", token.isComplete());
}
}

View File

@ -1,5 +1,6 @@
package com.njzscloud.common.mqtt.util;
import cn.hutool.core.map.MapUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.njzscloud.common.mqtt.support.MqttCliWrapper;
import com.njzscloud.common.mqtt.support.MqttMsg;
@ -34,6 +35,12 @@ public final class Mqtt {
MQTT_CLI_WRAPPER.publish(topic, msg);
}
public static void publish(String topic) {
MQTT_CLI_WRAPPER.publish(topic, MapUtil.builder()
.put("topic", topic)
.build());
}
public static void publish(String topic, int qos, Object msg) {
MQTT_CLI_WRAPPER.publish(topic, qos, msg);
}

View File

@ -59,8 +59,7 @@ public class BizTruckService extends ServiceImpl<BizTruckMapper, BizTruckEntity>
if (StrUtil.isNotBlank(gps)) {
long count = deviceLocalizerService.count(Wrappers.<DeviceLocalizerEntity>query().eq("terminal_id", gps));
if (count == 0) {
deviceLocalizerService.add(new DeviceLocalizerEntity().setTerminalId(gps));
deviceLocalizerService.onApplicationReady();
throw Exceptions.exception("设备不存在:{}", gps);
}
}
this.save(bizTruckEntity);
@ -73,15 +72,14 @@ public class BizTruckService extends ServiceImpl<BizTruckMapper, BizTruckEntity>
*/
@Transactional(rollbackFor = Exception.class)
public void modify(ModifyBizTruckParam modifyBizTruckParam) {
this.updateById(BeanUtil.copyProperties(modifyBizTruckParam, BizTruckEntity.class));
BizTruckEntity bizTruck = this.getById(modifyBizTruckParam.getId());
String gps = bizTruck.getGps();
String gps = modifyBizTruckParam.getGps();
if (StrUtil.isNotBlank(gps)) {
long count = deviceLocalizerService.count(Wrappers.<DeviceLocalizerEntity>query().eq("terminal_id", gps));
if (count == 0) {
deviceLocalizerService.add(new DeviceLocalizerEntity().setTerminalId(gps));
throw Exceptions.exception("设备不存在:{}", gps);
}
}
this.updateById(BeanUtil.copyProperties(modifyBizTruckParam, BizTruckEntity.class));
}
/**
@ -105,7 +103,7 @@ public class BizTruckService extends ServiceImpl<BizTruckMapper, BizTruckEntity>
List<DictItemEntity> list = dictItemService.list(Wrappers.<DictItemEntity>lambdaQuery()
.eq(DictItemEntity::getDictKey, "vehicle_type"));
if (null != list && list.size() > 0) {
List<DictItemEntity> entityList = list.stream().filter(t-> t.getVal().equals(result.getTruckCategory()))
List<DictItemEntity> entityList = list.stream().filter(t -> t.getVal().equals(result.getTruckCategory()))
.collect(Collectors.toList());
if (entityList.size() > 0) {
result.setTruckCategory(entityList.get(0).getTxt());
@ -145,8 +143,8 @@ public class BizTruckService extends ServiceImpl<BizTruckMapper, BizTruckEntity>
List<DictItemEntity> list = dictItemService.list(Wrappers.<DictItemEntity>lambdaQuery()
.eq(DictItemEntity::getDictKey, "vehicle_type"));
if (null != list && list.size() > 0) {
for (SearchTruckResult result: page.getRecords()) {
List<DictItemEntity> entityList = list.stream().filter(t-> t.getVal().equals(result.getTruckCategory()))
for (SearchTruckResult result : page.getRecords()) {
List<DictItemEntity> entityList = list.stream().filter(t -> t.getVal().equals(result.getTruckCategory()))
.collect(Collectors.toList());
if (entityList.size() > 0) {
result.setTruckCategory(entityList.get(0).getTxt());

View File

@ -1,8 +1,11 @@
package com.njzscloud.supervisory.device.controller;
import cn.hutool.core.lang.Assert;
import com.njzscloud.common.core.ex.Exceptions;
import com.njzscloud.common.core.utils.R;
import com.njzscloud.common.mp.support.PageParam;
import com.njzscloud.common.mp.support.PageResult;
import com.njzscloud.common.security.util.SecurityUtil;
import com.njzscloud.supervisory.device.pojo.entity.DeviceLocalizerEntity;
import com.njzscloud.supervisory.device.pojo.entity.LocalizerConfig;
import com.njzscloud.supervisory.device.pojo.param.SearchDeviceLocalizerParam;
@ -84,7 +87,34 @@ public class DeviceLocalizerController {
public R<?> modifyConfig(@RequestBody LocalizerConfig config,
@RequestParam Long id
) {
Assert.isTrue(SecurityUtil.isAdmin(), () -> Exceptions.clierr("非管理员不能操作"));
deviceLocalizerService.modifyConfig(id, config);
return R.success();
}
/**
*
*/
@GetMapping("/reflash_device")
public R<?> reflash() {
Assert.isTrue(SecurityUtil.isAdmin(), () -> Exceptions.clierr("非管理员不能操作"));
deviceLocalizerService.reflash();
return R.success();
}
@GetMapping("/send_directive")
public R<?> sendDirective(@RequestParam Long id,
@RequestParam String directive
) {
Assert.isTrue(SecurityUtil.isAdmin(), () -> Exceptions.clierr("非管理员不能操作"));
deviceLocalizerService.sendDirective(id, directive);
return R.success();
}
@GetMapping("/enable_track")
public R<?> enableTrack(@RequestParam Long id) {
Assert.isTrue(SecurityUtil.isAdmin(), () -> Exceptions.clierr("非管理员不能操作"));
deviceLocalizerService.enableTrack(id);
return R.success();
}
}

View File

@ -2,6 +2,7 @@ package com.njzscloud.supervisory.device.service;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
@ -9,10 +10,9 @@ import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njzscloud.common.core.ex.Exceptions;
import com.njzscloud.common.core.utils.R;
import com.njzscloud.common.localizer.mqtt.param.ConfigIpPortParam;
import com.njzscloud.common.localizer.mqtt.param.EnableWarnParam;
import com.njzscloud.common.localizer.mqtt.param.ObtainDeviceInfoParam;
import com.njzscloud.common.localizer.mqtt.param.SpeedThresholdParam;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.mqtt.param.*;
import com.njzscloud.common.mp.support.PageParam;
import com.njzscloud.common.mp.support.PageResult;
import com.njzscloud.common.mqtt.support.MqttMsg;
@ -34,6 +34,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
@ -59,8 +60,16 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
.eq(DeviceLocalizerEntity::getLocalizerCategory, localizerCategory)),
() -> Exceptions.exception("定位器已存在"));
this.save(deviceLocalizerEntity.setConfig(new LocalizerConfig()
.setEnableWarn(false)
.setSpeedThreshold(60)
.setEnableWarn(true)
));
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(10000);
Mqtt.subscribe(terminalId + "/online", this::heartbeat);
Mqtt.subscribe(terminalId + "/device_info", this::updateDeviceLocalizerStatus);
// Mqtt.subscribe(terminalId + "/track_location_real", truckLocationTrackService::sendRealTimeData);
reflash();
});
}
/**
@ -84,6 +93,13 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
.setTerminalId(terminalId)
.setLocalizerCategory(localizerCategory)
);
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(10000);
Mqtt.subscribe(terminalId + "/online", this::heartbeat);
Mqtt.subscribe(terminalId + "/device_info", this::updateDeviceLocalizerStatus);
// Mqtt.subscribe(terminalId + "/track_location_real", truckLocationTrackService::sendRealTimeData);
reflash();
});
}
/**
@ -94,6 +110,10 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
List<DeviceLocalizerEntity> deviceLocalizerEntities = baseMapper.canDel(Wrappers.query(DeviceLocalizerEntity.class).in("a.id", ids));
Assert.isTrue(deviceLocalizerEntities.isEmpty(), () -> Exceptions.exception("存在已被关联的定位器,不能删除"));
this.removeBatchByIds(ids);
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(10000);
reflash();
});
}
/**
@ -130,12 +150,19 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
}
public void onApplicationReady() {
Mqtt.subscribe("localizer/loader", this::localizerLoader);
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(3000);
reflash();
});
List<DeviceLocalizerEntity> localizerEntities = this.list();
if (localizerEntities.isEmpty()) return;
List<String> collect = localizerEntities.stream().map(DeviceLocalizerEntity::getTerminalId).distinct().collect(Collectors.toList());
for (String terminalId : collect) {
Mqtt.subscribe(terminalId + "/online", this::heartbeat);
Mqtt.subscribe(terminalId + "/device_info", this::updateDeviceLocalizerStatus);
Mqtt.subscribe(terminalId + "/track_location_real", truckLocationTrackService::sendRealTimeData);
// Mqtt.subscribe(terminalId + "/track_location_real", truckLocationTrackService::sendRealTimeData);
}
}
@ -143,11 +170,16 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
HeartbeatResult heartbeatResult = msg.getMsg(HeartbeatResult.class);
if (heartbeatResult == null) return;
this.update(Wrappers.lambdaUpdate(DeviceLocalizerEntity.class)
.set(DeviceLocalizerEntity::getOnline, heartbeatResult.isOnline())
// .set(DeviceLocalizerEntity::getOnline, heartbeatResult.isOnline())
.set(DeviceLocalizerEntity::getLastTime, heartbeatResult.getTime())
.eq(DeviceLocalizerEntity::getTerminalId, heartbeatResult.getTerminalId()));
}
public void localizerLoader(MqttMsg msg) {
// CompletableFuture.runAsync(this::reflash);
reflash();
}
public void updateDeviceLocalizerStatus(MqttMsg msg) {
DeviceInfoResult deviceInfoResult = msg.getMsg(DeviceInfoResult.class);
if (deviceInfoResult == null) return;
@ -190,7 +222,11 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
oldConfig.setEnableWarn(enableWarn);
}
this.updateById(new DeviceLocalizerEntity().setId(id).setConfig(oldConfig));
this.configDevice(terminalId, config);
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(10000);
reflash();
});
// this.configDevice(terminalId, config);
}
private void configDevice(String terminalId, LocalizerConfig config) {
@ -203,14 +239,13 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
modifyIpPort(terminalId, ip, port);
}
if (speedThreshold != null && speedThreshold > 0) {
/* if (speedThreshold != null && speedThreshold > 0) {
modifySpeedThreshold(terminalId, speedThreshold);
}
if (enableWarn != null) {
enableWarn(terminalId, enableWarn);
}
} */
}
private void modifyIpPort(String terminalId, String ip, int port) {
@ -231,7 +266,7 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
}
private void enableWarn(String terminalId, boolean enableWarn) {
Mqtt.publish("location/speed_threshold", new EnableWarnParam()
Mqtt.publish("location/warn", new EnableWarnParam()
.setTerminalId(terminalId)
.setEnable(enableWarn)
);
@ -243,4 +278,40 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
String terminalId = localizerEntity.getTerminalId();
Mqtt.publish("location/device_info", new ObtainDeviceInfoParam().setTerminalId(terminalId));
}
public void reflash() {
List<DeviceLocalizerEntity> localizerEntities = this.list();
if (localizerEntities.isEmpty()) return;
List<Localizer> collect = localizerEntities.stream()
.map(it -> {
LocalizerConfig config = it.getConfig();
return new Localizer()
.setTerminalId(it.getTerminalId())
.setSpeedThreshold(config == null || config.getSpeedThreshold() == null ? 60 : config.getSpeedThreshold())
.setLocalizerType(LocalizerType.valueOf(it.getLocalizerCategory().getVal()));
}
)
.collect(Collectors.toList());
Mqtt.publish("location/load", collect);
}
public void sendDirective(Long id, String directive) {
DeviceLocalizerEntity localizerEntity = this.getById(id);
Assert.notNull(localizerEntity, () -> Exceptions.clierr("设备不存在"));
String terminalId = localizerEntity.getTerminalId();
Mqtt.publish("localizer/send_directive", new SendDirectiveParam()
.setTerminalId(terminalId)
.setDirective(directive)
);
}
public void enableTrack(Long id) {
DeviceLocalizerEntity localizerEntity = this.getById(id);
Assert.notNull(localizerEntity, () -> Exceptions.clierr("设备不存在"));
String terminalId = localizerEntity.getTerminalId();
Mqtt.publish("location/track", MapUtil.builder()
.put("terminalId", terminalId)
.put("interval", 1)
.build());
}
}

View File

@ -203,14 +203,14 @@ public class OrderInfoController {
}
@GetMapping("/start_track")
public R<?> startTrack(@RequestParam("orderId") String orderId) {
orderInfoService.startTuqiangTrack(orderId);
public R<?> startTrack(@RequestParam("orderId") Long orderId) {
orderInfoService.startTrackOrder(orderId);
return R.success();
}
@GetMapping("/stop_track")
public R<?> stopTrack(@RequestParam("orderId") String orderId) {
orderInfoService.stopTrack(orderId);
public R<?> stopTrack(@RequestParam("orderId") Long orderId) {
orderInfoService.stopTrackOrder(orderId);
return R.success();
}

View File

@ -92,21 +92,6 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
private final MoneyAccountService moneyAccountService;
private final MoneyChangeDetailService moneyChangeDetailService;
private static void stopTuqiangTrack(String gpsId) {
CompletableFuture.runAsync(() -> {
Assert.notEmpty(gpsId, () -> Exceptions.clierr("车辆未绑定GPS"));
Mqtt.publish("location/track", MapUtil.builder()
.put("terminalId", gpsId)
.put("interval", 0)
.build());
Mqtt.unsubscribe(gpsId + "/track_location");
}).whenComplete((aVoid, throwable) -> {
if (throwable != null) {
log.error("关闭GPS失败", throwable);
}
});
}
/**
*
*/
@ -739,8 +724,19 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
Assert.notNull(truckInfo, () -> Exceptions.clierr("车辆不存在"));
String gpsId = truckInfo.getGps();
String licensePlate = truckInfo.getLicensePlate();
startTuqiangTrack(gpsId, licensePlate, orderInfoId, truckId);
Assert.notEmpty(gpsId, () -> Exceptions.clierr("车辆未绑定GPS"));
DeviceLocalizerEntity deviceLocalizerEntity = baseMapper.gpsLastOnlineTime(gpsId);
LocalDateTime lastTime = deviceLocalizerEntity.getLastTime();
boolean after = lastTime.isBefore(LocalDateTime.now().minusMinutes(5));
if (after) {
bizWarnService.save(new BizWarnEntity()
.setWarnCategory(WarnCategory.EQUIPMENT.getVal())
.setWarnContent(StrUtil.format("{} 绑定的 GPS 设备已离线,设备号:{}", licensePlate, gpsId))
.setOrderId(orderInfoId)
);
return;
}
startTrack(gpsId, licensePlate, orderInfoId, truckId);
CompletableFuture.runAsync(() -> Websocket.publish(new WsMsg().setEvent("down/order/status_change")
.setData(MapUtil.builder()
@ -824,89 +820,22 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
}
}
private void startTuqiangTrack(String gpsId, String licensePlate, Long orderInfoId, Long truckId) {
CompletableFuture.runAsync(() -> {
Assert.notEmpty(gpsId, () -> Exceptions.clierr("车辆未绑定GPS"));
DeviceLocalizerEntity deviceLocalizerEntity = baseMapper.gpsLastOnlineTime(gpsId);
LocalDateTime lastTime = deviceLocalizerEntity.getLastTime();
boolean after = lastTime.isBefore(LocalDateTime.now().minusMinutes(5));
if (after) {
bizWarnService.save(new BizWarnEntity()
.setWarnCategory(WarnCategory.EQUIPMENT.getVal())
.setWarnContent(StrUtil.format("{} 绑定的 GPS 设备已离线,设备号:{}", licensePlate, gpsId))
.setOrderId(orderInfoId)
);
return;
}
Mqtt.subscribe(gpsId + "/track_location", (msg) -> {
RealtimeLocationResult realtimeLocationResult = msg.getMsg(RealtimeLocationResult.class);
if (realtimeLocationResult == null) {
return;
}
if (realtimeLocationResult.isOverspeed()) {
bizWarnService.save(new BizWarnEntity()
.setWarnCategory(WarnCategory.SPEED.getVal())
.setWarnContent(StrUtil.format("{} 已超速,当前时速 {}km/h", licensePlate, realtimeLocationResult.getSpeed()))
.setOrderId(orderInfoId)
);
}
log.info("mqtt 收到消息:{} {}", gpsId + "/track_location", realtimeLocationResult);
String time = realtimeLocationResult.getTime();
double latitude = realtimeLocationResult.getLatitude();
double longitude = realtimeLocationResult.getLongitude();
if (latitude <= 0 && longitude <= 0) {
return;
}
TruckLocationTrackEntity entity = new TruckLocationTrackEntity()
.setOrderId(orderInfoId)
.setTruckId(truckId)
.setTerminalId(gpsId)
.setLatitude(latitude)
.setLongitude(longitude)
.setAltitude(realtimeLocationResult.getAltitude())
.setSpeed(realtimeLocationResult.getSpeed())
.setLocationTime(DateUtil.parseLocalDateTime(time))
.setDirection(realtimeLocationResult.getDirection())
.setOverspeed(realtimeLocationResult.isOverspeed())
.setCompensate(realtimeLocationResult.getType() == 1);
truckLocationTrackService.save(entity);
});
Mqtt.publish("location/track", MapUtil.builder()
.put("terminalId", gpsId)
.put("interval", 1)
.build());
}).whenComplete((aVoid, throwable) -> {
if (throwable != null) {
log.error("开启GPS失败", throwable);
}
});
}
public void startTuqiangTrack(String orderId) {
OrderInfoEntity orderInfo = this.getOne(Wrappers.lambdaQuery(OrderInfoEntity.class)
.eq(OrderInfoEntity::getSn, orderId)
.or().eq(OrderInfoEntity::getId, orderId)
);
Long orderInfoId = orderInfo.getId();
public void startTrackOrder(Long orderId) {
OrderInfoEntity orderInfo = this.getById(orderId);
Long truckId = orderInfo.getTruckId();
BizTruckEntity truckInfo = baseMapper.getTruckInfo(truckId);
String gpsId = truckInfo.getGps();
String licensePlate = truckInfo.getLicensePlate();
startTuqiangTrack(gpsId, licensePlate, orderInfoId, truckId);
startTrack(gpsId, licensePlate, orderId, truckId);
}
public void stopTrack(String orderId) {
OrderInfoEntity orderInfo = this.getOne(Wrappers.lambdaQuery(OrderInfoEntity.class)
.eq(OrderInfoEntity::getSn, orderId)
.or().eq(OrderInfoEntity::getId, orderId)
);
public void stopTrackOrder(Long orderId) {
OrderInfoEntity orderInfo = this.getById(orderId);
Long truckId = orderInfo.getTruckId();
BizTruckEntity truckInfo = baseMapper.getTruckInfo(truckId);
String gpsId = truckInfo.getGps();
stopTuqiangTrack(gpsId);
stopTrack(gpsId);
}
@Transactional(rollbackFor = Exception.class)
@ -964,7 +893,7 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
if (StrUtil.isBlank(gpsId)) {
return;
}
stopTuqiangTrack(gpsId);
stopTrack(gpsId);
}).exceptionally(ex -> {
log.error("数据获取失败", ex);
return null;
@ -1113,10 +1042,10 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
// 3) 合并集合,计算 total_money 与 settle_money并批量落库。
// settleForTransCompany(orderInfoEntity, settleWeight);
} finally {
// TODO 关闭 GPS
// 关闭 GPS
BizTruckEntity truckInfo = baseMapper.getTruckInfo(truckId);
String gpsId = truckInfo.getGps();
stopTuqiangTrack(gpsId);
stopTrack(gpsId);
}
}
@ -1368,4 +1297,67 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
}
FileUtil.downloadExcel(downList, response, "收运统计表.xlsx");
}
private void startTrack(String gpsId, String licensePlate, Long orderInfoId, Long truckId) {
CompletableFuture.runAsync(() -> {
Mqtt.subscribe(gpsId + "/track_location", (msg) -> {
RealtimeLocationResult realtimeLocationResult = msg.getMsg(RealtimeLocationResult.class);
if (realtimeLocationResult == null) {
return;
}
if (realtimeLocationResult.isOverspeed()) {
bizWarnService.save(new BizWarnEntity()
.setWarnCategory(WarnCategory.SPEED.getVal())
.setWarnContent(StrUtil.format("{} 已超速,当前时速 {}km/h", licensePlate, realtimeLocationResult.getSpeed()))
.setOrderId(orderInfoId)
);
}
// log.info("mqtt 收到消息:{} {}", gpsId + "/track_location", realtimeLocationResult);
String time = realtimeLocationResult.getTime();
double latitude = realtimeLocationResult.getLatitude();
double longitude = realtimeLocationResult.getLongitude();
if (latitude <= 0 && longitude <= 0) {
return;
}
TruckLocationTrackEntity entity = new TruckLocationTrackEntity()
.setOrderId(orderInfoId)
.setTruckId(truckId)
.setTerminalId(gpsId)
.setLatitude(latitude)
.setLongitude(longitude)
.setAltitude(realtimeLocationResult.getAltitude())
.setSpeed(realtimeLocationResult.getSpeed())
.setLocationTime(DateUtil.parseLocalDateTime(time))
.setDirection(realtimeLocationResult.getDirection())
.setOverspeed(realtimeLocationResult.isOverspeed())
.setCompensate(realtimeLocationResult.getType() == 1);
truckLocationTrackService.save(entity);
});
/* Mqtt.publish("location/track", MapUtil.builder()
.put("terminalId", gpsId)
.put("interval", 1)
.build()); */
}).whenComplete((aVoid, throwable) -> {
if (throwable != null) {
log.error("开启GPS失败", throwable);
}
});
}
private void stopTrack(String gpsId) {
CompletableFuture.runAsync(() -> {
Assert.notEmpty(gpsId, () -> Exceptions.clierr("车辆未绑定GPS"));
/* Mqtt.publish("location/track", MapUtil.builder()
.put("terminalId", gpsId)
.put("interval", 0)
.build()); */
Mqtt.unsubscribe(gpsId + "/track_location");
}).whenComplete((aVoid, throwable) -> {
if (throwable != null) {
log.error("关闭GPS失败", throwable);
}
});
}
}