localizer
ljw 2025-10-22 11:12:42 +08:00
commit e5bc14b9d7
57 changed files with 1471 additions and 414 deletions

View File

@ -0,0 +1,63 @@
package com.njzscloud.common.localizer;
import cn.hutool.core.collection.CollUtil;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.mqtt.util.Mqtt;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j
public class DeviceStore {
private static final LinkedList<Localizer> localizers = new LinkedList<>();
private static final ReadWriteLock localizersLock = new ReentrantReadWriteLock();
private static final Lock localizersWriteLock = localizersLock.writeLock();
private static final Lock localizersReadLock = localizersLock.readLock();
public static void init() {
Mqtt.publish("localizer/loader");
}
public static void setLocalizers(LinkedList<Localizer> list) {
localizersWriteLock.lock();
try {
log.info("重新加载设备:{}", list.size());
localizers.clear();
if (CollUtil.isNotEmpty(list)) localizers.addAll(list);
} finally {
localizersWriteLock.unlock();
}
}
public static Localizer determineLocalizer(String terminalId) {
localizersReadLock.lock();
try {
for (Localizer localizer : localizers) {
if (localizer.getTerminalId().equals(terminalId)) {
return localizer;
}
}
} finally {
localizersReadLock.unlock();
}
return null;
}
public static LocalizerType determineType(String terminalId) {
localizersReadLock.lock();
try {
for (Localizer localizer : localizers) {
if (localizer.getTerminalId().equals(terminalId)) {
return localizer.getLocalizerType();
}
}
} finally {
localizersReadLock.unlock();
}
return null;
}
}

View File

@ -0,0 +1,31 @@
package com.njzscloud.common.localizer;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.jt808.JT808;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class Localizer {
/**
* Id
*/
protected String terminalId;
/**
*
*/
protected LocalizerType localizerType;
private int speedThreshold;
public boolean isOnline() {
return JT808.isOnline(terminalId);
}
}

View File

@ -1,6 +1,12 @@
package com.njzscloud.common.localizer.config;
import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.htzj.Htzj;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.tuqiang.Tuqiang;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
public class LocalizerInitializer {
private final LocalizerProperties properties;
@ -10,10 +16,17 @@ public class LocalizerInitializer {
}
public void init() {
Tuqiang.run(properties.getPort(), properties.getBossThreads(), properties.getWorkerThreads());
Htzj.init();
Tuqiang.init();
JT808.run(properties.getPort(), properties.getBossThreads(), properties.getWorkerThreads());
}
private void destroy() {
Tuqiang.stop();
JT808.stop();
}
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
DeviceStore.init();
}
}

View File

@ -0,0 +1,6 @@
package com.njzscloud.common.localizer.contant;
public enum BodyCharset {
GBK,
ASCII,
}

View File

@ -0,0 +1,21 @@
package com.njzscloud.common.localizer.contant;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
*
*/
@Getter
@RequiredArgsConstructor
public enum LocalizerType {
/**
*
*/
Tuqiang,
/**
*
*/
HangTianZiJin,
;
}

View File

@ -0,0 +1,34 @@
package com.njzscloud.common.localizer.contant;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@Getter
@RequiredArgsConstructor
public enum MessageType {
ServerRegisterReply(0x8100),
ServerTxtMsg(0x8300),
ServerGeneralReply(0x8001),
TerminalRegister(0x0100),
TerminalAuth(0x0102),
TerminalGeneralReply(0x0001),
TerminalHeartbeat(0x0002),
TerminalLocationReport(0x0200),
TerminalLocationBatchReport(0x704),
TerminalLocationSearchReply(0x0201),
TerminalTxtMsg(0x6006),
;
private final int messageId;
public static MessageType getMessageType(int messageId) {
MessageType[] values = MessageType.values();
for (MessageType value : values) {
if (value.messageId == messageId) {
return value;
}
}
return null;
}
}

View File

@ -0,0 +1,109 @@
package com.njzscloud.common.localizer.htzj;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.njzscloud.common.localizer.contant.BodyCharset;
import com.njzscloud.common.localizer.htzj.listener.HtzjListeners;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.jt808.message.JT808Message;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class Htzj {
private static HtzjListeners htzjListeners = new HtzjListeners();
public static void init() {
htzjListeners = new HtzjListeners();
}
/**
*
* <pre>
* SR
* devid?
* reset
* wakelv,8
* nmset,0,X
* X3~600
*
* nmset,0,30
* 030
* nmset,0,180
* 03</pre>
*
* @param terminalId ID
* @param directive
*/
public static void sendDirective(String terminalId, String directive) {
log.info("发送航天紫金指令终端ID{},指令:{}", terminalId, directive);
JT808.sendTxtMessage(terminalId, directive, BodyCharset.ASCII);
}
/**
*
*
* @param terminalId ID
* @param interval 0-36000
*/
public static void track(String terminalId, int interval) {
if (interval < 3) {
interval = 3;
} else if (interval > 600) {
interval = 600;
}
log.info("开启航天紫金追踪终端ID{},上报间隔:{}秒3 秒后重启设备", terminalId, interval);
sendDirective(terminalId, StrUtil.format("nmset,0,{}", interval));
ThreadUtil.sleep(3000);
sendDirective(terminalId, "reset");
}
/**
*
*
* @param terminalId ID
*/
public static void obtainDeviceInfo(String terminalId) {
}
/**
*
*
* @param terminalId ID
*/
public static void enableWarn(String terminalId, boolean enable) {
}
/**
*
*
* @param terminalId ID
* @param speed km/h
*/
public static void speedThreshold(String terminalId, int speed) {
}
/**
*
*
* @param terminalId Id
* @param ip IP
* @param port
*/
public static void configIpPort(String terminalId, String ip, int port) {
}
/**
*
*
* @param terminalId ID
*/
public static void currentLocation(String terminalId) {
log.info("航天紫金获当前位置终端ID{}", terminalId);
JT808Message baseMessage = JT808.createBaseMessage(terminalId, 0x8201);
JT808.sendMessage(baseMessage);
}
}

View File

@ -0,0 +1,34 @@
package com.njzscloud.common.localizer.htzj.listener;
import com.njzscloud.common.localizer.contant.BodyCharset;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.jt808.message.TerminalMessageBody;
import com.njzscloud.common.localizer.jt808.message.TerminalTxtMessage;
import com.njzscloud.common.localizer.jt808.support.JT808MessageListener;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HtzjListeners extends JT808MessageListener {
public HtzjListeners() {
super(LocalizerType.HangTianZiJin);
}
/**
*
*/
@Override
public void onTxtReport(TerminalMessageBody message) {
byte[] body = message.getBody();
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
try {
TerminalTxtMessage terminalTxtReportMsg = new TerminalTxtMessage(byteBuf);
String txt = terminalTxtReportMsg.getTxt(BodyCharset.ASCII);
log.info("终端文本信息汇报消息: 【\n{}\n】", txt);
} catch (Exception e) {
byteBuf.release();
}
}
}

View File

@ -1,11 +1,9 @@
package com.njzscloud.common.localizer.jt808;
import com.njzscloud.common.core.utils.BCD;
import com.njzscloud.common.localizer.jt808.message.JT808Message;
import com.njzscloud.common.localizer.jt808.message.ServerGeneralResponseMessage;
import com.njzscloud.common.localizer.jt808.message.ServerTxtMessage;
import com.njzscloud.common.localizer.jt808.message.TerminalMessageBody;
import com.njzscloud.common.localizer.jt808.support.JT808MessageResolver;
import com.njzscloud.common.localizer.contant.BodyCharset;
import com.njzscloud.common.localizer.contant.MessageType;
import com.njzscloud.common.localizer.jt808.message.*;
import com.njzscloud.common.localizer.jt808.support.TcpServer;
import com.njzscloud.common.localizer.jt808.util.FlowId;
import io.netty.buffer.ByteBuf;
@ -18,7 +16,6 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* JT808
@ -28,7 +25,7 @@ import java.util.function.Consumer;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JT808 {
// 终端手机号与Channel的映射
// 终端设备号与Channel的映射
public static final Map<String, Channel> DeviceHandle = new ConcurrentHashMap<>();
private static TcpServer tcpServer;
@ -45,13 +42,6 @@ public final class JT808 {
}
}
/**
*
*/
public static void addListener(String messageId, Consumer<TerminalMessageBody> listener) {
JT808MessageResolver.addResolver(messageId, listener);
}
/**
*
*/
@ -72,35 +62,44 @@ public final class JT808 {
}
}
public static boolean isOnline(String terminalId) {
Channel channel = DeviceHandle.get(terminalId);
return channel != null && channel.isActive();
}
/**
* (0x8001)
*/
public static void sendGeneralResponse(JT808Message message, int result) {
String terminalPhone = message.getTerminalPhone();
int flowId = message.getFlowId();
int messageId = message.getMessageId();
ServerGeneralResponseMessage response = new ServerGeneralResponseMessage(messageId, flowId, result);
message = JT808.createBaseMessage(terminalPhone, 0x8001, response.toBytes());
sendMessage(message);
public static void sendGeneralReply(String terminalId, int messageId, int flowId, int result) {
ServerGeneralReplyMessage replyMessage = new ServerGeneralReplyMessage(messageId, flowId, result);
sendMessage(terminalId, MessageType.ServerGeneralReply, replyMessage);
}
public static void sendGeneralResponse(JT808Message message) {
sendGeneralResponse(message, 0);
public static void sendGeneralReply(String terminalId, int messageId, int flowId) {
sendGeneralReply(terminalId, messageId, flowId, 0);
}
/**
* (0x8300)
*/
public static void sendTxtMessage(String terminalId, String txt, BodyCharset charset) {
ServerTxtMessage txtMessage = new ServerTxtMessage(txt, charset);
sendMessage(terminalId, MessageType.ServerTxtMsg, txtMessage);
}
public static void sendTxtMessage(String terminalId, String txt) {
ServerTxtMessage txtMessage = new ServerTxtMessage(txt);
JT808Message message = JT808.createBaseMessage(terminalId, 0x8300, txtMessage.toBytes());
sendMessage(message);
sendTxtMessage(terminalId, txt, BodyCharset.GBK);
}
/**
*
*/
public static void sendMessage(String terminalId, MessageType messageType, ServerMessageBody body) {
JT808Message message = JT808.createBaseMessage(terminalId, messageType.getMessageId(), body.toBytes());
sendMessage(message);
}
public static void sendMessage(JT808Message message) {
String terminalPhone = message.getTerminalPhone();
@ -108,9 +107,7 @@ public final class JT808 {
if (channel != null && channel.isActive()) {
channel.writeAndFlush(message)
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("消息发送成功, 终端: {}, 消息ID: 0x{}, 流水号: {}", terminalPhone, Integer.toHexString(message.getMessageId()), message.getFlowId());
} else {
if (!future.isSuccess()) {
log.error("消息发送失败, 终端: {}, 消息ID: 0x{}, 流水号: {}", terminalPhone, Integer.toHexString(message.getMessageId()), message.getFlowId(), future.cause());
}
});
@ -205,10 +202,15 @@ public final class JT808 {
}
private static boolean verifyCheckCode(JT808Message message, ByteBuf originalBuf) {
// 简化实现,实际项目中需要根据协议规范计算校验码
// 校验码 = 消息头 + 消息体 中所有字节的异或
return true;
}
public static void sendServerRegisterReply(String terminalId, int flowId, int result, String authCode) {
ServerRegisterReplyMessage serverRegisterReplyMessage = new ServerRegisterReplyMessage(terminalId, flowId, result, authCode);
JT808.sendMessage(JT808.createBaseMessage(terminalId, MessageType.ServerRegisterReply.getMessageId(), serverRegisterReplyMessage.toBytes()));
}
public static void sendServerRegisterReply(String terminalId, int flowId, String authCode) {
sendServerRegisterReply(terminalId, flowId, 0, authCode);
}
}

View File

@ -1,7 +1,7 @@
package com.njzscloud.common.localizer.jt808.message;
import cn.hutool.core.util.ReflectUtil;
import com.njzscloud.common.core.fastjson.Fastjson;
import com.njzscloud.common.core.jackson.Jackson;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
@ -51,7 +51,7 @@ public class JT808Message {
@Override
public String toString() {
return Fastjson.toJsonStr(this);
return Jackson.toJsonStr(this);
}
public <T> T parseMessageBody(Class<T> clazz) {

View File

@ -1,4 +1,4 @@
package com.njzscloud.common.localizer.tuqiang.message;
package com.njzscloud.common.localizer.jt808.message;
import cn.hutool.core.date.DateUtil;
import com.njzscloud.common.core.utils.BCD;

View File

@ -1,4 +1,4 @@
package com.njzscloud.common.localizer.tuqiang.message;
package com.njzscloud.common.localizer.jt808.message;
import lombok.Getter;
import lombok.Setter;

View File

@ -1,4 +1,4 @@
package com.njzscloud.common.localizer.tuqiang.message;
package com.njzscloud.common.localizer.jt808.message;
import io.netty.buffer.ByteBuf;
import lombok.Getter;

View File

@ -14,9 +14,18 @@ import lombok.experimental.Accessors;
@ToString
@Accessors(chain = true)
@AllArgsConstructor
public class ServerGeneralResponseMessage implements ServerMessageBody {
public class ServerGeneralReplyMessage implements ServerMessageBody {
/**
* ID
*/
private int messageId;
/**
*
*/
private int flowId;
/**
* 0
*/
private int result;

View File

@ -1,5 +1,14 @@
package com.njzscloud.common.localizer.jt808.message;
/**
*
*
*/
public interface ServerMessageBody {
/**
*
*
* @return
*/
byte[] toBytes();
}

View File

@ -9,15 +9,30 @@ import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
/**
*
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
@AllArgsConstructor
public class ServerRegisterResponseMessage implements ServerMessageBody {
public class ServerRegisterReplyMessage implements ServerMessageBody {
/**
* ID
*/
private String terminalId;
/**
*
*/
private int flowId;
/**
* 0
*/
private int result;
/**
*
*/
private String authCode;
@Override
@ -26,7 +41,9 @@ public class ServerRegisterResponseMessage implements ServerMessageBody {
try {
body.writeShort(flowId);
body.writeByte(result);
ByteBufUtil.writeUtf8(body, authCode);
if (result == 0) {
ByteBufUtil.writeAscii(body, authCode);
}
return ByteBufUtil.getBytes(body);
} finally {
body.release();

View File

@ -1,36 +1,69 @@
package com.njzscloud.common.localizer.jt808.message;
import cn.hutool.core.util.CharsetUtil;
import com.njzscloud.common.core.ex.Exceptions;
import com.njzscloud.common.localizer.contant.BodyCharset;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import java.io.UnsupportedEncodingException;
@Getter
@Setter
@ToString
@Slf4j
@Accessors(chain = true)
public class ServerTxtMessage implements ServerMessageBody {
/**
*
* 0x01
* 0x01
*/
private byte flag;
/**
*
*/
private String txt;
/**
* UTF-8
*/
private BodyCharset charset;
public ServerTxtMessage(String txt) {
this(txt, BodyCharset.GBK);
}
public ServerTxtMessage(String txt, BodyCharset charset) {
this.flag = 0x01;
this.txt = txt;
this.charset = charset;
}
public byte[] toBytes() {
byte[] messageBody = new byte[txt.length() + 1];
messageBody[0] = flag;
try {
System.arraycopy(txt.getBytes(CharsetUtil.GBK), 0, messageBody, 1, txt.length());
} catch (UnsupportedEncodingException e) {
log.error("文本消息构建失败,原文本:{}", txt, e);
switch (charset) {
case ASCII: {
for (int i = 0; i < txt.length(); i++) {
char c = txt.charAt(i);
messageBody[i + 1] = (byte) c;
}
}
break;
case GBK: {
try {
System.arraycopy(txt.getBytes(), 0, messageBody, 1, txt.length());
} catch (Exception e) {
throw Exceptions.error(e, "文本消息构建失败,原文本:{}", txt);
}
}
break;
default: {
throw Exceptions.error("不支持的文本消息编码:{}", charset);
}
}
return messageBody;
}
}

View File

@ -2,7 +2,6 @@ package com.njzscloud.common.localizer.jt808.message;
import cn.hutool.core.util.CharsetUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -13,13 +12,14 @@ import lombok.experimental.Accessors;
@ToString
@Accessors(chain = true)
public class TerminalAuthMessage {
/**
*
*/
private String authCode;
public TerminalAuthMessage(byte[] bytes) {
ByteBuf body = Unpooled.wrappedBuffer(bytes);
public TerminalAuthMessage(ByteBuf body) {
byte[] authCodeBytes = new byte[body.readableBytes()];
body.readBytes(authCodeBytes);
this.authCode = new String(authCodeBytes, CharsetUtil.CHARSET_GBK);
body.release();
}
}

View File

@ -1,30 +1,36 @@
package com.njzscloud.common.localizer.jt808.message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
/**
*
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class TerminalGeneralResponseMessage {
/**
*
*/
private int flowId;
/**
* ID
*/
private int messageId;
/**
* 0/123
*/
private int result;
public TerminalGeneralResponseMessage(byte[] bytes) {
ByteBuf body = Unpooled.wrappedBuffer(bytes);
public TerminalGeneralResponseMessage(ByteBuf body) {
this.flowId = body.readUnsignedShort();
this.messageId = body.readUnsignedShort();
this.result = body.readByte();
body.release();
}
public boolean isSuccess() {

View File

@ -1,16 +1,39 @@
package com.njzscloud.common.localizer.jt808.message;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
/**
*
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class TerminalMessageBody {
private String terminalId;
private ByteBuf body;
/**
* ID
*/
protected String terminalId;
/**
*
*/
protected byte[] body;
/**
* Id
*/
private int messageId;
/**
*
*/
private int flowId;
public TerminalMessageBody(JT808Message message) {
this.terminalId = message.getTerminalPhone();
this.messageId = message.getMessageId();
this.flowId = message.getFlowId();
this.body = message.getMessageBody();
}
}

View File

@ -2,13 +2,15 @@ package com.njzscloud.common.localizer.jt808.message;
import cn.hutool.core.util.CharsetUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
/**
*
*/
@Slf4j
@Getter
@Setter
@ -50,8 +52,7 @@ public class TerminalRegisterMessage {
*/
private String plate;
public TerminalRegisterMessage(byte[] bytes) {
ByteBuf body = Unpooled.wrappedBuffer(bytes);
public TerminalRegisterMessage(ByteBuf body) {
// 读取省域IDWORD2字节大端序
this.province = body.readUnsignedShort();
@ -81,8 +82,6 @@ public class TerminalRegisterMessage {
byte[] plateBytes = new byte[body.readableBytes()];
body.readBytes(plateBytes);
this.plate = new String(plateBytes, CharsetUtil.CHARSET_GBK);
body.release();
}
}

View File

@ -1,28 +1,51 @@
package com.njzscloud.common.localizer.jt808.message;
import cn.hutool.core.util.CharsetUtil;
import com.njzscloud.common.core.ex.Exceptions;
import com.njzscloud.common.localizer.contant.BodyCharset;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
@Getter
@Setter
@ToString
/**
*
*/
@Accessors(chain = true)
public class TerminalTxtMessage {
private byte type;
/**
*
*/
@Getter
private final byte type;
/**
*
*/
private String txt;
private final byte[] txt;
public TerminalTxtMessage(ByteBuf body) {
this.type = body.readByte();
this.txt = body.toString(body.readerIndex(), body.readableBytes(),
this.type == 0x00 ?
CharsetUtil.CHARSET_GBK :
CharsetUtil.CHARSET_UTF_8);
this.txt = ByteBufUtil.getBytes(body);
}
public String getTxt() {
return new String(txt, this.type == 0x00 ?
CharsetUtil.CHARSET_GBK :
CharsetUtil.CHARSET_UTF_8);
}
public String getTxt(BodyCharset charset) {
switch (charset) {
case GBK:
return new String(txt, CharsetUtil.CHARSET_GBK);
case ASCII:
StringBuilder stringBuilder = new StringBuilder();
for (byte b : txt) {
stringBuilder.append((char) b);
}
return stringBuilder.toString();
default:
throw Exceptions.error("不支持的文本消息编码:{}", charset);
}
}
}

View File

@ -0,0 +1,63 @@
package com.njzscloud.common.localizer.jt808.support;
import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.contant.MessageType;
import com.njzscloud.common.localizer.jt808.message.JT808Message;
import com.njzscloud.common.localizer.jt808.message.TerminalMessageBody;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
*
*/
@Slf4j
public class JT808MessageCenter {
private static final Map<LocalizerType, Map<MessageType, Consumer<TerminalMessageBody>>> listeners = new ConcurrentHashMap<>();
public static void dispatch(JT808Message message) {
int messageId = message.getMessageId();
String terminalId = message.getTerminalPhone();
Localizer localizer = DeviceStore.determineLocalizer(terminalId);
if (localizer == null) {
log.error("未找到设备:{}", terminalId);
return;
}
LocalizerType localizerType = localizer.getLocalizerType();
Map<MessageType, Consumer<TerminalMessageBody>> m = listeners.computeIfPresent(localizerType, (k, v) -> {
MessageType messageType = MessageType.getMessageType(messageId);
if (messageType == null) {
log.error("未找到消息类型,设备号:{},消息 Id0x{}", terminalId, Integer.toHexString(messageId));
return v;
}
Consumer<TerminalMessageBody> c = v.computeIfPresent(messageType, (k1, v1) -> {
try {
v1.accept(new TerminalMessageBody(message));
} catch (Exception e) {
log.error("消息处理异常:设备号:{},设备类型:{},消息类型:{}", terminalId, localizerType, messageType, e);
}
return v1;
});
if (c == null) {
log.error("消息处理器不存在,设备号:{},设备类型:{},消息类型:{}", terminalId, localizerType, messageType);
}
return v;
});
if (m == null) {
log.error("设备处理器不存在,设备号:{},设备类型:{}", terminalId, localizerType);
}
}
public static void addListener(LocalizerType localizerType, MessageType messageType, Consumer<TerminalMessageBody> listener) {
listeners.computeIfAbsent(localizerType, k -> new ConcurrentHashMap<>()).put(messageType, listener);
}
public static void addListeners(LocalizerType localizerType, Map<MessageType, Consumer<TerminalMessageBody>> listeners) {
listeners.forEach((messageType, consumer) -> addListener(localizerType, messageType, consumer));
}
}

View File

@ -1,12 +1,11 @@
package com.njzscloud.common.localizer.jt808.support;
import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.jt808.message.JT808Message;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
/**
@ -15,23 +14,30 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class JT808MessageHandler extends ChannelInboundHandlerAdapter {
// 管理所有连接的客户端
private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端连接: {}", ctx.channel().remoteAddress());
// log.info("客户端连接: {}", ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof JT808Message) {
JT808Message message = (JT808Message) msg;
int messageId = message.getMessageId();
// log.info("收到消息, 设备号: {}消息ID: 0x{}, 流水号: {}", message.getTerminalPhone(), Integer.toHexString(messageId), message.getFlowId());
if (messageId == 0x0100) {
JT808.register(message.getTerminalPhone(), ctx.channel());
String terminalPhone = message.getTerminalPhone();
Localizer localizer = DeviceStore.determineLocalizer(terminalPhone);
if (localizer == null) {
// log.warn("设备不存在:{}", terminalPhone);
ctx.channel().close();
return;
}
JT808MessageResolver.dispatchMsg(messageId, message);
int messageId = message.getMessageId();
if (messageId == 0x0100) {
log.info("客户端连接: {} {}", terminalPhone, ctx.channel().remoteAddress());
JT808.register(terminalPhone, ctx.channel());
}
JT808MessageCenter.dispatch(message);
} else {
super.channelRead(ctx, msg);
}
@ -39,8 +45,8 @@ public class JT808MessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端断开连接: {}", ctx.channel().remoteAddress());
clients.remove(ctx.channel());
// log.warn("客户端断开连接: {}", ctx.channel().remoteAddress());
// clients.remove(ctx.channel());
JT808.unregister(ctx.channel());
}

View File

@ -0,0 +1,174 @@
package com.njzscloud.common.localizer.jt808.support;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.contant.MessageType;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.jt808.message.*;
import com.njzscloud.common.localizer.mqtt.result.RealtimeLocationResult;
import com.njzscloud.common.mqtt.util.Mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.function.Consumer;
@Slf4j
public abstract class JT808MessageListener {
public JT808MessageListener(LocalizerType localizerType) {
JT808MessageCenter.addListeners(localizerType, MapUtil.<MessageType, Consumer<TerminalMessageBody>>builder()
.put(MessageType.TerminalRegister, this::onTerminalRegister)
.put(MessageType.TerminalAuth, this::onTerminalAuth)
.put(MessageType.TerminalGeneralReply, this::onGeneralReport)
.put(MessageType.TerminalHeartbeat, this::onHeartbeat)
.put(MessageType.TerminalLocationReport, this::onLocationReport)
.put(MessageType.TerminalLocationBatchReport, this::onLocationBatchUpload)
.put(MessageType.TerminalLocationSearchReply, this::onLocationSearchReport)
.put(MessageType.TerminalTxtMsg, this::onTxtReport)
.build());
}
/**
*
*/
public void onTerminalRegister(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
int flowId = message.getFlowId();
JT808.sendServerRegisterReply(terminalId, flowId, terminalId);
byte[] body = message.getBody();
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
TerminalRegisterMessage terminalRegisterMessage = new TerminalRegisterMessage(byteBuf);
log.info("{},终端注册消息: {}", terminalId, terminalRegisterMessage);
}
/**
*
*/
public void onTerminalAuth(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
int messageId = message.getMessageId();
int flowId = message.getFlowId();
JT808.sendGeneralReply(terminalId, messageId, flowId);
ByteBuf body = Unpooled.wrappedBuffer(message.getBody());
TerminalAuthMessage authMsg = null;
try {
authMsg = new TerminalAuthMessage(body);
} catch (Exception e) {
body.release();
}
log.info("{},终端鉴权消息: {}", terminalId, authMsg);
}
/**
*
*/
public void onGeneralReport(TerminalMessageBody message) {
ByteBuf body = Unpooled.wrappedBuffer(message.getBody());
TerminalGeneralResponseMessage terminalGeneralResponseMessage = new TerminalGeneralResponseMessage(body);
log.info("设备通用响应,设备号:{},消息:{}", message.getTerminalId(), terminalGeneralResponseMessage.getResult());
}
/**
*
*/
public void onHeartbeat(TerminalMessageBody message) {
JT808.sendGeneralReply(message.getTerminalId(), message.getMessageId(), message.getFlowId());
String terminalId = message.getTerminalId();
Mqtt.publish(terminalId + "/online", MapUtil.builder()
.put("online", true)
.put("terminalId", terminalId)
.put("time", DateUtil.now())
.build());
}
/**
*
*/
public void onLocationReport(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
JT808.sendGeneralReply(terminalId, message.getMessageId(), message.getFlowId());
byte[] body = message.getBody();
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
try {
Localizer localizer = DeviceStore.determineLocalizer(terminalId);
if (localizer == null) {
log.warn("未找到定位器:{}", terminalId);
return;
}
LocationReportMessage locationReportMsg = new LocationReportMessage(byteBuf);
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId)
.setOverspeed(locationReportMsg.getSpeed() > localizer.getSpeedThreshold());
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
} finally {
byteBuf.release();
}
}
/**
*
*/
public void onLocationBatchUpload(TerminalMessageBody message) {
JT808.sendGeneralReply(message.getTerminalId(), message.getMessageId(), message.getFlowId());
String terminalId = message.getTerminalId();
byte[] body = message.getBody();
log.info("批量上报:{}", Arrays.toString(body));
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
try {
int count = byteBuf.readUnsignedShort();
int type = byteBuf.readByte();
Localizer localizer = DeviceStore.determineLocalizer(terminalId);
if (localizer == null) {
log.warn("未找到定位器:{}", terminalId);
return;
}
int speedThreshold = localizer.getSpeedThreshold();
for (int i = 0; i < count; i++) {
int length = byteBuf.readUnsignedShort();
LocationReportMessage locationReportMsg = new LocationReportMessage(byteBuf.slice(byteBuf.readerIndex(), length));
byteBuf.skipBytes(length);
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId)
.setOverspeed(locationReportMsg.getSpeed() > speedThreshold)
.setType(type);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
}
} finally {
byteBuf.release();
}
}
/**
*
*/
public void onLocationSearchReport(TerminalMessageBody message) {
JT808.sendGeneralReply(message.getTerminalId(), message.getMessageId(), message.getFlowId());
String terminalId = message.getTerminalId();
byte[] body = message.getBody();
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
try {
SearchLocationResponseMessage searchLocationResponseMsg = new SearchLocationResponseMessage(byteBuf);
LocationReportMessage locationReportMsg = searchLocationResponseMsg.getLocationReportMessage();
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/current_location", realtimeLocationResult);
} finally {
byteBuf.release();
}
}
/**
*
*/
public abstract void onTxtReport(TerminalMessageBody message);
}

View File

@ -1,19 +1,10 @@
package com.njzscloud.common.localizer.jt808.support;
import cn.hutool.core.map.MapUtil;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.jt808.message.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@Slf4j
public class JT808MessageResolver {
public static final String Heartbeat = "Heartbeat";
/* public static final String Heartbeat = "Heartbeat";
public static final String LocationReport = "LocationReport";
public static final String LocationBatchReport = "LocationBatchReport";
public static final String LocationSearch = "LocationSearch";
@ -37,13 +28,13 @@ public class JT808MessageResolver {
}
public static void dispatchMsg(String messageId, JT808Message message) {
Consumer<TerminalMessageBody> heartbeat = resolvers.get(messageId);
if (heartbeat != null) {
Consumer<TerminalMessageBody> messageBodyConsumer = resolvers.get(messageId);
if (messageBodyConsumer != null) {
String terminalPhone = message.getTerminalPhone();
byte[] messageBody = message.getMessageBody();
ByteBuf byteBuf = Unpooled.wrappedBuffer(messageBody);
try {
heartbeat.accept(new TerminalMessageBody()
messageBodyConsumer.accept(new TerminalMessageBody()
.setTerminalId(terminalPhone)
.setBody(byteBuf));
} catch (Exception e) {
@ -61,76 +52,78 @@ public class JT808MessageResolver {
}
}
/**
*//**
*
*/
*//*
public static void onTerminalRegister(JT808Message message) {
String terminalPhone = message.getTerminalPhone();
int flowId = message.getFlowId();
ServerRegisterResponseMessage serverRegisterResponseMessage = new ServerRegisterResponseMessage(terminalPhone, flowId, 0, terminalPhone);
JT808.sendMessage(JT808.createBaseMessage(terminalPhone, 0x8100, serverRegisterResponseMessage.toBytes()));
ServerRegisterReplyMessage serverRegisterReplyMessage = new ServerRegisterReplyMessage(terminalPhone, flowId, 0, terminalPhone);
JT808.sendMessage(JT808.createBaseMessage(terminalPhone, 0x8100, serverRegisterReplyMessage.toBytes()));
TerminalRegisterMessage terminalRegisterMessage = message.parseMessageBody(TerminalRegisterMessage.class);
log.info("终端注册消息: {}", terminalRegisterMessage);
log.info("{}终端注册消息: {}", message.getTerminalPhone(), terminalRegisterMessage);
}
/**
*//**
*
*/
*//*
public static void onTerminalAuth(JT808Message message) {
JT808.sendGeneralResponse(message);
JT808.sendGeneralReply(message);
TerminalAuthMessage authMsg = message.parseMessageBody(TerminalAuthMessage.class);
log.info("终端鉴权消息: {}", authMsg);
log.info("{}终端鉴权消息: {}", message.getTerminalPhone(), authMsg);
}
/**
*//**
*
*/
*//*
public static void onTerminalGeneralResponse(JT808Message message) {
TerminalGeneralResponseMessage terminalGeneralResponseMessage = message.parseMessageBody(TerminalGeneralResponseMessage.class);
int messageId = message.getMessageId();
String terminalPhone = message.getTerminalPhone();
int result = terminalGeneralResponseMessage.getResult();
log.info("设备通用响应,设备号:{},消息 Id{},结果:{}", terminalPhone, Integer.toHexString(messageId), result);
log.info("设备通用响应,设备号:{},消息:{}", message.getTerminalPhone(), terminalGeneralResponseMessage.getResult());
}
/**
*//**
*
*/
*//*
public static void onHeartbeat(JT808Message message) {
JT808.sendGeneralResponse(message);
log.info("{},终端心跳消息: {}", message.getTerminalPhone(), message);
JT808.sendGeneralReply(message);
dispatchMsg(Heartbeat, message);
}
/**
*//**
*
*/
*//*
public static void onLocationReport(JT808Message message) {
log.info("{},终端位置信息汇报消息: {}", message.getTerminalPhone(), message);
dispatchMsg(LocationReport, message);
}
/**
*//**
*
*/
*//*
public static void onLocationBatchUpload(JT808Message message) {
log.info("{},定位数据批量上传消息: {}", message.getTerminalPhone(), message);
dispatchMsg(LocationBatchReport, message);
}
/**
*//**
*
*/
*//*
public static void onSearchLocationResponse(JT808Message message) {
JT808.sendGeneralResponse(message);
log.info("{},查询位置响应消息: {}", message.getTerminalPhone(), message);
JT808.sendGeneralReply(message);
dispatchMsg(LocationSearch, message);
}
/**
*//**
*
*/
*//*
public static void onTerminalTxtReport(JT808Message message) {
JT808.sendGeneralResponse(message);
log.info("{},终端文本信息汇报消息: {}", message.getTerminalPhone(), message);
JT808.sendGeneralReply(message);
dispatchMsg(TxtReport, message);
}
} */
}

View File

@ -17,8 +17,6 @@ public class TcpServer {
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerBootstrap bootstrap;
private Thread startThread;
;
public TcpServer(int port, int bossThreads, int workerThreads) {
if (port <= 0 || bossThreads <= 0 || workerThreads <= 0) {

View File

@ -1,21 +1,48 @@
package com.njzscloud.common.localizer.mqtt;
import com.fasterxml.jackson.core.type.TypeReference;
import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.htzj.Htzj;
import com.njzscloud.common.localizer.mqtt.param.*;
import com.njzscloud.common.localizer.tuqiang.Tuqiang;
import com.njzscloud.common.mqtt.support.MqttListener;
import com.njzscloud.common.mqtt.support.MqttMsg;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
public class MqttMsgHandlers {
TypeReference<List<DeviceLoadParam>> typeReference = new TypeReference<List<DeviceLoadParam>>() {
};
/**
*
*/
@MqttListener(topic = "location/track")
public void onMessage(MqttMsg msg) {
LocationTrackParam locationTrackParam = msg.getMsg(LocationTrackParam.class);
Tuqiang.track(locationTrackParam.getTerminalId(), locationTrackParam.getInterval());
String terminalId = locationTrackParam.getTerminalId();
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
switch (localizerType) {
case Tuqiang:
Tuqiang.track(terminalId, locationTrackParam.getInterval());
break;
case HangTianZiJin:
Htzj.track(terminalId, locationTrackParam.getInterval());
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
@ -24,7 +51,22 @@ public class MqttMsgHandlers {
@MqttListener(topic = "location/current")
public void currentLocation(MqttMsg msg) {
LocationCurrentParam locationCurrentParam = msg.getMsg(LocationCurrentParam.class);
Tuqiang.currentLocation(locationCurrentParam.getTerminalId());
String terminalId = locationCurrentParam.getTerminalId();
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
switch (localizerType) {
case Tuqiang:
Tuqiang.currentLocation(terminalId);
break;
case HangTianZiJin:
Htzj.currentLocation(terminalId);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
@ -34,8 +76,22 @@ public class MqttMsgHandlers {
public void enableWarn(MqttMsg msg) {
EnableWarnParam enableWarnParam = msg.getMsg(EnableWarnParam.class);
String terminalId = enableWarnParam.getTerminalId();
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
boolean enable = enableWarnParam.isEnable();
Tuqiang.enableWarn(terminalId, enable);
switch (localizerType) {
case Tuqiang:
Tuqiang.enableWarn(terminalId, enable);
break;
case HangTianZiJin:
Htzj.enableWarn(terminalId, enable);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
@ -47,7 +103,21 @@ public class MqttMsgHandlers {
String terminalId = configIpPortParam.getTerminalId();
String ip = configIpPortParam.getIp();
int port = configIpPortParam.getPort();
Tuqiang.configIpPort(terminalId, ip, port);
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
switch (localizerType) {
case Tuqiang:
Tuqiang.configIpPort(terminalId, ip, port);
break;
case HangTianZiJin:
Htzj.configIpPort(terminalId, ip, port);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
@ -58,7 +128,21 @@ public class MqttMsgHandlers {
SpeedThresholdParam speedThresholdParam = msg.getMsg(SpeedThresholdParam.class);
String terminalId = speedThresholdParam.getTerminalId();
int speed = speedThresholdParam.getSpeed();
Tuqiang.speedThreshold(terminalId, speed);
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
switch (localizerType) {
case Tuqiang:
Tuqiang.speedThreshold(terminalId, speed);
break;
case HangTianZiJin:
Htzj.speedThreshold(terminalId, speed);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
@ -68,6 +152,57 @@ public class MqttMsgHandlers {
public void obtainDeviceInfo(MqttMsg msg) {
ObtainDeviceInfoParam obtainDeviceInfoParam = msg.getMsg(ObtainDeviceInfoParam.class);
String terminalId = obtainDeviceInfoParam.getTerminalId();
Tuqiang.obtainDeviceInfo(terminalId);
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
switch (localizerType) {
case Tuqiang:
Tuqiang.obtainDeviceInfo(terminalId);
break;
case HangTianZiJin:
Htzj.obtainDeviceInfo(terminalId);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
/**
*
*/
@MqttListener(topic = "location/load")
public void load(MqttMsg msg) {
List<DeviceLoadParam> deviceLoadParams = msg.getMsg(typeReference.getType());
LinkedList<Localizer> list = deviceLoadParams.stream()
.map(it -> new Localizer().setTerminalId(it.getTerminalId()).setLocalizerType(it.getLocalizerType()))
.collect(Collectors.toCollection(LinkedList::new));
DeviceStore.setLocalizers(list);
}
/**
*
*/
@MqttListener(topic = "localizer/send_directive")
public void sendDirective(MqttMsg msg) {
SendDirectiveParam sendDirectiveParam = msg.getMsg(SendDirectiveParam.class);
String terminalId = sendDirectiveParam.getTerminalId();
LocalizerType localizerType = DeviceStore.determineType(terminalId);
if (localizerType == null) {
log.warn("未确定定位器类型终端ID{}", terminalId);
return;
}
String directive = sendDirectiveParam.getDirective();
switch (localizerType) {
case Tuqiang:
Tuqiang.sendDirective(terminalId, directive);
break;
case HangTianZiJin:
Htzj.sendDirective(terminalId, directive);
break;
default:
log.warn("不支持的定位器类型:{}", localizerType);
}
}
}

View File

@ -1,5 +1,6 @@
package com.njzscloud.common.localizer.mqtt.param;
import com.njzscloud.common.localizer.contant.LocalizerType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -10,7 +11,20 @@ import lombok.experimental.Accessors;
@ToString
@Accessors(chain = true)
public class ConfigIpPortParam {
/**
* ID
*/
private String terminalId;
/**
* IP
*/
private String ip;
/**
*
*/
private int port;
/**
*
*/
private LocalizerType localizer;
}

View File

@ -0,0 +1,22 @@
package com.njzscloud.common.localizer.mqtt.param;
import com.njzscloud.common.localizer.contant.LocalizerType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class DeviceLoadParam {
/**
* ID
*/
private String terminalId;
/**
*
*/
private LocalizerType localizerType;
}

View File

@ -1,5 +1,6 @@
package com.njzscloud.common.localizer.mqtt.param;
import com.njzscloud.common.localizer.contant.LocalizerType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -10,6 +11,16 @@ import lombok.experimental.Accessors;
@ToString
@Accessors(chain = true)
public class EnableWarnParam {
/**
* ID
*/
private String terminalId;
/**
*
*/
private boolean enable;
/**
*
*/
private LocalizerType localizer;
}

View File

@ -10,5 +10,8 @@ import lombok.experimental.Accessors;
@ToString
@Accessors(chain = true)
public class LocationCurrentParam {
/**
* ID
*/
private String terminalId;
}

View File

@ -10,6 +10,12 @@ import lombok.experimental.Accessors;
@ToString
@Accessors(chain = true)
public class LocationTrackParam {
/**
* ID
*/
private String terminalId;
/**
* 0-36000
*/
private int interval;
}

View File

@ -1,5 +1,6 @@
package com.njzscloud.common.localizer.mqtt.param;
import com.njzscloud.common.localizer.contant.LocalizerType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -10,5 +11,12 @@ import lombok.experimental.Accessors;
@ToString
@Accessors(chain = true)
public class ObtainDeviceInfoParam {
/**
* ID
*/
private String terminalId;
/**
*
*/
private LocalizerType localizer;
}

View File

@ -0,0 +1,15 @@
package com.njzscloud.common.localizer.mqtt.param;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class SendDirectiveParam {
private String terminalId;
private String directive;
}

View File

@ -10,6 +10,12 @@ import lombok.experimental.Accessors;
@ToString
@Accessors(chain = true)
public class SpeedThresholdParam {
/**
* ID
*/
private String terminalId;
/**
*
*/
private int speed;
}

View File

@ -5,50 +5,43 @@ import cn.hutool.core.util.StrUtil;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.jt808.message.JT808Message;
import com.njzscloud.common.localizer.tuqiang.listener.TuqiangListeners;
import com.njzscloud.common.localizer.tuqiang.support.TuqiangListener;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Method;
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class Tuqiang {
public static void run(int port, int bossThreads, int workerThreads) {
addListener(new TuqiangListeners());
JT808.run(port, bossThreads, workerThreads);
private static TuqiangListeners tuqiangListeners;
public static void init() {
tuqiangListeners = new TuqiangListeners();
}
public static void stop() {
JT808.stop();
}
private static void addListener(Object object) {
Method[] methods = object.getClass().getDeclaredMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(TuqiangListener.class)) {
TuqiangListener tuqiangListener = method.getAnnotation(TuqiangListener.class);
String messageId = tuqiangListener.messageId();
JT808.addListener(messageId, (msg) -> {
try {
method.invoke(object, msg);
} catch (Exception e) {
log.error("途强处理消息 {} 时出错", messageId, e);
}
});
}
}
/**
*
*
* @param terminalId ID
* @param directive
*/
public static void sendDirective(String terminalId, String directive) {
log.info("发送途强指令终端ID{},指令:{}", terminalId, directive);
JT808.sendTxtMessage(terminalId, directive);
}
/**
*
*
* @param terminalId ID
* @param interval 0-36000
* @param interval 0-6000
*/
public static void track(String terminalId, int interval) {
Assert.isTrue(interval >= 0 && interval <= 3600, "上报间隔必须为0-3600秒");
if (interval < 1) {
interval = 1;
} else if (interval > 600) {
interval = 600;
}
log.info("开启途强追踪终端ID{},上报间隔:{}秒", terminalId, interval);
JT808.sendTxtMessage(terminalId, StrUtil.format("<SPBSJ*P:BSJGPS*C:{}*H:300>", interval));
}
@ -58,6 +51,7 @@ public final class Tuqiang {
* @param terminalId ID
*/
public static void obtainDeviceInfo(String terminalId) {
log.info("获取途强设备信息终端ID{}", terminalId);
JT808.sendTxtMessage(terminalId, "<CKBSJ>");
}
@ -67,6 +61,7 @@ public final class Tuqiang {
* @param terminalId ID
*/
public static void enableWarn(String terminalId, boolean enable) {
log.info("设置途强报警终端ID{},是否启用:{}", terminalId, enable);
JT808.sendTxtMessage(terminalId, StrUtil.format("<SPBSJ*P:BSJGPS*6W:{}>", enable ? 1 : 0));
}
@ -77,6 +72,7 @@ public final class Tuqiang {
* @param speed km/h
*/
public static void speedThreshold(String terminalId, int speed) {
log.info("设置途强速度报警终端ID{},速度:{}km/h", terminalId, speed);
JT808.sendTxtMessage(terminalId, StrUtil.format("<SPBSJ*P:BSJGPS*CS:{}*H:300>", speed));
}
@ -89,6 +85,7 @@ public final class Tuqiang {
* @param port
*/
public static void configIpPort(String terminalId, String ip, int port) {
log.info("设置途强设备连接地址终端ID{}IP{},端口:{}", terminalId, ip, port);
Assert.notEmpty(terminalId);
Assert.notEmpty(ip);
Assert.isTrue(port > 0);
@ -101,6 +98,7 @@ public final class Tuqiang {
* @param terminalId ID
*/
public static void currentLocation(String terminalId) {
log.info("获取途强当前位置终端ID{}", terminalId);
JT808Message baseMessage = JT808.createBaseMessage(terminalId, 0x8201);
JT808.sendMessage(baseMessage);
}

View File

@ -1,104 +1,42 @@
package com.njzscloud.common.localizer.tuqiang.listener;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.jt808.message.SearchDeviceInfoMessage;
import com.njzscloud.common.localizer.jt808.message.TerminalMessageBody;
import com.njzscloud.common.localizer.jt808.message.TerminalTxtMessage;
import com.njzscloud.common.localizer.mqtt.result.RealtimeLocationResult;
import com.njzscloud.common.localizer.tuqiang.message.LocationReportMessage;
import com.njzscloud.common.localizer.tuqiang.message.SearchDeviceInfoMessage;
import com.njzscloud.common.localizer.tuqiang.message.SearchLocationResponseMessage;
import com.njzscloud.common.localizer.tuqiang.support.TuqiangListener;
import com.njzscloud.common.localizer.jt808.support.JT808MessageListener;
import com.njzscloud.common.mqtt.util.Mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import static com.njzscloud.common.localizer.jt808.support.JT808MessageResolver.*;
@Slf4j
public class TuqiangListeners {
/**
*
*/
@TuqiangListener(messageId = Heartbeat)
public void onHeartbeat(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
Mqtt.publish(terminalId + "/online", MapUtil.builder()
.put("online", true)
.put("terminalId", terminalId)
.put("time", DateUtil.now())
.build());
}
public class TuqiangListeners extends JT808MessageListener {
/**
*
*/
@TuqiangListener(messageId = LocationReport)
public void onLocationReport(TerminalMessageBody message) {
ByteBuf byteBuf = message.getBody();
LocationReportMessage locationReportMsg = new LocationReportMessage(byteBuf);
// log.info("终端位置信息汇报消息: {}", locationReportMsg);
String terminalId = message.getTerminalId();
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
}
/**
*
*/
@TuqiangListener(messageId = LocationBatchReport)
public void onLocationBatchUpload(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
ByteBuf body = message.getBody();
int count = body.readUnsignedShort();
int type = body.readByte();
for (int i = 0; i < count; i++) {
int length = body.readUnsignedShort();
LocationReportMessage locationReportMsg = new LocationReportMessage(body.slice(body.readerIndex(), length));
body.skipBytes(length);
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId)
.setType(type);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
}
}
/**
*
*/
@TuqiangListener(messageId = LocationSearch)
public void onSearchLocationResponse(TerminalMessageBody message) {
String terminalId = message.getTerminalId();
ByteBuf body = message.getBody();
SearchLocationResponseMessage searchLocationResponseMsg = new SearchLocationResponseMessage(body);
// log.info("查询位置响应消息: {}", searchLocationResponseMsg);
LocationReportMessage locationReportMsg = searchLocationResponseMsg.getLocationReportMessage();
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/current_location", realtimeLocationResult);
public TuqiangListeners() {
super(LocalizerType.Tuqiang);
}
/**
*
*/
@TuqiangListener(messageId = TxtReport)
public void onTerminalTxtReport(TerminalMessageBody message) {
TerminalTxtMessage terminalTxtReportMsg = new TerminalTxtMessage(message.getBody());
// log.info("终端文本信息汇报消息: {}", terminalTxtReportMsg);
String txt = terminalTxtReportMsg.getTxt();
SearchDeviceInfoMessage deviceInfo = SearchDeviceInfoMessage.parse(txt);
if (deviceInfo != null) {
String terminalId = message.getTerminalId();
deviceInfo.setTerminalId(terminalId);
@Override
public void onTxtReport(TerminalMessageBody message) {
byte[] body = message.getBody();
ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
try {
TerminalTxtMessage terminalTxtReportMsg = new TerminalTxtMessage(byteBuf);
String txt = terminalTxtReportMsg.getTxt();
log.info("终端文本信息汇报消息: {}", txt);
SearchDeviceInfoMessage deviceInfo = SearchDeviceInfoMessage.parse(txt);
if (deviceInfo != null) {
String terminalId = message.getTerminalId();
deviceInfo.setTerminalId(terminalId);
Mqtt.publish(terminalId + "/device_info", deviceInfo);
Mqtt.publish(terminalId + "/device_info", deviceInfo);
}
} catch (Exception e) {
byteBuf.release();
}
}
}

View File

@ -1,10 +0,0 @@
package com.njzscloud.common.localizer.tuqiang.support;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface TuqiangListener {
String messageId();
}

View File

@ -11,7 +11,7 @@ import org.springframework.context.annotation.Configuration;
@EnableConfigurationProperties(MqttProperties.class)
public class MqttAutoConfiguration {
@Bean(destroyMethod = "shutdown")
@Bean(destroyMethod = "shutdown", name = "mqtt")
public MqttCliWrapper mqtt(MqttProperties mqttProperties) {
return new MqttCliWrapper(mqttProperties);
}

View File

@ -5,7 +5,9 @@ import cn.hutool.core.util.StrUtil;
import com.njzscloud.common.core.jackson.Jackson;
import com.njzscloud.common.mqtt.config.MqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
@ -17,12 +19,12 @@ import java.util.function.Consumer;
@Slf4j
public class MqttCliWrapper implements BeanPostProcessor {
private static final Map<String, Consumer<MqttMsg>> fn = new ConcurrentHashMap<>();
static final Map<String, Consumer<MqttMsg>> fn = new ConcurrentHashMap<>();
private MqttClient client;
public MqttCliWrapper(MqttProperties mqttProperties) {
String broker = mqttProperties.getBroker();
String clientId = mqttProperties.getClientId() + "-" + IdUtil.nanoId(5);
String clientId = mqttProperties.getClientId() + "-" + IdUtil.nanoId();
String username = mqttProperties.getUsername();
String password = mqttProperties.getPassword();
try {
@ -38,7 +40,7 @@ public class MqttCliWrapper implements BeanPostProcessor {
}
client.connect(options);
client.setCallback(new MsgHandler());
client.setCallback(new MsgHandler(fn));
} catch (Exception e) {
client = null;
log.error("mqtt连接失败", e);
@ -123,23 +125,5 @@ public class MqttCliWrapper implements BeanPostProcessor {
}
}
public static class MsgHandler implements MqttCallback {
public void messageArrived(String topic, MqttMessage message) throws Exception {
Consumer<MqttMsg> handler = fn.get(topic);
if (handler == null) return;
try {
handler.accept(new MqttMsg(message.getPayload()));
} catch (Exception e) {
log.error("mqtt 消息处理失败:{} {}", topic, new String(message.getPayload()), e);
}
}
public void connectionLost(Throwable cause) {
log.error("mqtt 连接已断开", cause);
}
public void deliveryComplete(IMqttDeliveryToken token) {
// log.info("消息投递结果:{}", token.isComplete());
}
}
}

View File

@ -2,6 +2,8 @@ package com.njzscloud.common.mqtt.support;
import com.njzscloud.common.core.jackson.Jackson;
import java.lang.reflect.Type;
public class MqttMsg {
private final byte[] msg;
@ -12,4 +14,8 @@ public class MqttMsg {
public <T> T getMsg(Class<T> clazz) {
return Jackson.toBean(msg, clazz);
}
public <T> T getMsg(Type type) {
return Jackson.toBean(this.msg, type);
}
}

View File

@ -0,0 +1,51 @@
package com.njzscloud.common.mqtt.support;
import com.njzscloud.common.core.thread.ThreadPool;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
@Slf4j
public class MsgHandler implements MqttCallback {
private final Map<String, Consumer<MqttMsg>> fn;
private final ThreadPoolExecutor threadPool;
public MsgHandler(Map<String, Consumer<MqttMsg>> fn) {
threadPool = ThreadPool.createThreadPool(
"mqtt-handler",
10, 200, 60,
20, Integer.MAX_VALUE,
(r, p) -> log.error("任务执行器线程池已满,拒绝任务:{}", r.toString())
);
this.fn = fn;
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
Consumer<MqttMsg> handler = fn.get(topic);
if (handler == null) return;
try {
CompletableFuture
.runAsync(() -> handler.accept(new MqttMsg(message.getPayload())), threadPool)
.exceptionally(e -> {
log.error("mqtt 消息处理失败:{} {}", topic, new String(message.getPayload()), e);
return null;
});
} catch (Exception e) {
log.error("mqtt 消息处理失败", e);
}
}
public void connectionLost(Throwable cause) {
log.error("mqtt 连接已断开", cause);
}
public void deliveryComplete(IMqttDeliveryToken token) {
// log.info("消息投递结果:{}", token.isComplete());
}
}

View File

@ -1,5 +1,6 @@
package com.njzscloud.common.mqtt.util;
import cn.hutool.core.map.MapUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.njzscloud.common.mqtt.support.MqttCliWrapper;
import com.njzscloud.common.mqtt.support.MqttMsg;
@ -34,6 +35,12 @@ public final class Mqtt {
MQTT_CLI_WRAPPER.publish(topic, msg);
}
public static void publish(String topic) {
MQTT_CLI_WRAPPER.publish(topic, MapUtil.builder()
.put("topic", topic)
.build());
}
public static void publish(String topic, int qos, Object msg) {
MQTT_CLI_WRAPPER.publish(topic, qos, msg);
}

View File

@ -9,4 +9,4 @@ localizer:
enabled: true
boss-threads: 1
worker-threads: 1
port: 18888
port: 30100

View File

@ -66,4 +66,12 @@ public class BizAuditConfigController {
return R.success(bizAuditConfigService.paging(pageParam, bizAuditConfigEntity));
}
/**
*
*/
@GetMapping("/copy")
public void copy() {
bizAuditConfigService.copy();
}
}

View File

@ -1,7 +1,6 @@
package com.njzscloud.supervisory.biz.pojo.entity;
import com.baomidou.mybatisplus.annotation.*;
import com.njzscloud.common.mp.support.handler.j.JsonTypeHandler;
import com.njzscloud.supervisory.biz.constant.AuditStatus;
import com.njzscloud.supervisory.biz.constant.BizObj;
import lombok.Getter;
@ -10,7 +9,6 @@ import lombok.experimental.Accessors;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
/**
*
@ -77,10 +75,11 @@ public class BizCompanyEntity {
*/
private String legalRepresentative;
@TableField(typeHandler = JsonTypeHandler.class)
private List<String> idcard;
private String idcard;
private LocalDate idcardStartTime;
private LocalDate idcardEndTime;
private String idcardFront;
private String idcardBack;
/**
* ;
*/

View File

@ -7,7 +7,6 @@ import lombok.ToString;
import lombok.experimental.Accessors;
import java.time.LocalDate;
import java.util.List;
/**
*
@ -57,9 +56,11 @@ public class AddBizCompanyParam {
*
*/
private String legalRepresentative;
private List<String> idcard;
private String idcard;
private LocalDate idcardStartTime;
private LocalDate idcardEndTime;
private String idcardFront;
private String idcardBack;
/**
* ;
*/

View File

@ -45,7 +45,11 @@ public class ModifyBizCompanyParam {
* ; biz_obj
*/
private BizObj bizObj;
private String idcard;
private LocalDate idcardStartTime;
private LocalDate idcardEndTime;
private String idcardFront;
private String idcardBack;
/**
*
*/

View File

@ -2,6 +2,11 @@ package com.njzscloud.supervisory.biz.service;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aliyun.oss.ClientBuilderConfiguration;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.DefaultCredentialProvider;
import com.aliyun.oss.common.comm.SignVersion;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@ -48,7 +53,7 @@ public class BizAuditConfigService extends ServiceImpl<BizAuditConfigMapper, Biz
String areaRole = bizAuditConfigEntity.getAreaRole();
if (StrUtil.isNotBlank(areaRole)) {
Assert.isTrue(StrUtil.isNotBlank(bizAuditConfigEntity.getCityRole()), () -> Exceptions.clierr("市级审核员不能为空"));
Assert.isTrue(StrUtil.isNotBlank(bizAuditConfigEntity.getCityRole()), () -> Exceptions.clierr("市级审核员不能为空"));
}
Assert.isTrue(StrUtil.isNotBlank(bizAuditConfigEntity.getCityRole()), () -> Exceptions.clierr("市级审核员不能为空"));
this.save(bizAuditConfigEntity);
@ -92,4 +97,82 @@ public class BizAuditConfigService extends ServiceImpl<BizAuditConfigMapper, Biz
));
}
OSS oss(String accessKey, String secretKey, String endpoint, String region) {
ClientBuilderConfiguration clientBuilderConfiguration = new ClientBuilderConfiguration();
clientBuilderConfiguration.setSignatureVersion(SignVersion.V4);
DefaultCredentialProvider credentialProvider = new DefaultCredentialProvider(accessKey, secretKey);
return OSSClientBuilder.create()
.endpoint(endpoint)
.region(region)
.credentialsProvider(credentialProvider)
.clientConfiguration(clientBuilderConfiguration)
.build();
}
/* public static String uploadFile(OSS oss,String objectName, String contentType, InputStream inputStream) {
try {
ObjectMetadata objectMetadata = new ObjectMetadata();
if (StrUtil.isNotBlank(contentType)) objectMetadata.setContentType(contentType);
oss.putObject("cdn-zsy", objectName, inputStream, objectMetadata);
return "/" + "cdn-zsy" + "/" + objectName;
} catch (OSSException oe) {
log.error("阿里云上传文件失败,错误信息:{}、错误码:{}、请求标识:{}、主机标识:{}、存储桶:{}、对象名称:{}",
oe.getErrorMessage(),
oe.getErrorCode(),
oe.getRequestId(),
oe.getHostId(),
BUCKET_NAME,
objectName);
throw Exceptions.error("文件服务器错误");
} catch (ClientException ce) {
log.error("阿里云上传文件失败,错误信息:{}、存储桶:{}、对象名称:{}",
ce.getMessage(),
BUCKET_NAME,
objectName);
throw Exceptions.error("文件服务器错误");
}
}
public static Tuple2<InputStream, String> obtainFile(OSS oss,String bucketName, String objectName) {
try {
OSSObject response = CLIENT.getObject(new GetObjectRequest(bucketName, objectName));
ObjectMetadata objectMetadata = response.getObjectMetadata();
String contentType = objectMetadata.getContentType();
InputStream objectContent = response.getObjectContent();
return Tuple2.create(objectContent, contentType);
} catch (OSSException oe) {
log.error("阿里云下载文件失败,错误信息:{}、错误码:{}、请求标识:{}、主机标识:{}、存储桶:{}、对象名称:{}",
oe.getErrorMessage(),
oe.getErrorCode(),
oe.getRequestId(),
oe.getHostId(),
bucketName,
objectName);
// throw Exceptions.error("文件服务器错误");
} catch (ClientException ce) {
log.error("阿里云下载文件失败,错误信息:{}、存储桶:{}、对象名称:{}",
ce.getMessage(),
bucketName,
objectName);
// throw Exceptions.error("文件服务器错误");
}
return Tuple2.create(new ByteArrayInputStream(new byte[0]), "");
} */
public void copy() {
OSS oss1 = oss(
"LTAI5tJJu2WayYchExrT5W1E",
"zllX0ZJ1EwsZXT6dE6swCLgTF4ImGg",
"oss-cn-shanghai.aliyuncs.com",
"cn-shanghai"
);
OSS oss2 = oss(
"LTAI5tJJu2WayYchExrT5W1E",
"zllX0ZJ1EwsZXT6dE6swCLgTF4ImGg",
"oss-cn-shanghai.aliyuncs.com",
"cn-shanghai"
);
}
}

View File

@ -59,8 +59,7 @@ public class BizTruckService extends ServiceImpl<BizTruckMapper, BizTruckEntity>
if (StrUtil.isNotBlank(gps)) {
long count = deviceLocalizerService.count(Wrappers.<DeviceLocalizerEntity>query().eq("terminal_id", gps));
if (count == 0) {
deviceLocalizerService.add(new DeviceLocalizerEntity().setTerminalId(gps));
deviceLocalizerService.onApplicationReady();
throw Exceptions.exception("设备不存在:{}", gps);
}
}
this.save(bizTruckEntity);
@ -73,15 +72,14 @@ public class BizTruckService extends ServiceImpl<BizTruckMapper, BizTruckEntity>
*/
@Transactional(rollbackFor = Exception.class)
public void modify(ModifyBizTruckParam modifyBizTruckParam) {
this.updateById(BeanUtil.copyProperties(modifyBizTruckParam, BizTruckEntity.class));
BizTruckEntity bizTruck = this.getById(modifyBizTruckParam.getId());
String gps = bizTruck.getGps();
String gps = modifyBizTruckParam.getGps();
if (StrUtil.isNotBlank(gps)) {
long count = deviceLocalizerService.count(Wrappers.<DeviceLocalizerEntity>query().eq("terminal_id", gps));
if (count == 0) {
deviceLocalizerService.add(new DeviceLocalizerEntity().setTerminalId(gps));
throw Exceptions.exception("设备不存在:{}", gps);
}
}
this.updateById(BeanUtil.copyProperties(modifyBizTruckParam, BizTruckEntity.class));
}
/**
@ -105,7 +103,7 @@ public class BizTruckService extends ServiceImpl<BizTruckMapper, BizTruckEntity>
List<DictItemEntity> list = dictItemService.list(Wrappers.<DictItemEntity>lambdaQuery()
.eq(DictItemEntity::getDictKey, "vehicle_type"));
if (null != list && list.size() > 0) {
List<DictItemEntity> entityList = list.stream().filter(t-> t.getVal().equals(result.getTruckCategory()))
List<DictItemEntity> entityList = list.stream().filter(t -> t.getVal().equals(result.getTruckCategory()))
.collect(Collectors.toList());
if (entityList.size() > 0) {
result.setTruckCategory(entityList.get(0).getTxt());
@ -145,8 +143,8 @@ public class BizTruckService extends ServiceImpl<BizTruckMapper, BizTruckEntity>
List<DictItemEntity> list = dictItemService.list(Wrappers.<DictItemEntity>lambdaQuery()
.eq(DictItemEntity::getDictKey, "vehicle_type"));
if (null != list && list.size() > 0) {
for (SearchTruckResult result: page.getRecords()) {
List<DictItemEntity> entityList = list.stream().filter(t-> t.getVal().equals(result.getTruckCategory()))
for (SearchTruckResult result : page.getRecords()) {
List<DictItemEntity> entityList = list.stream().filter(t -> t.getVal().equals(result.getTruckCategory()))
.collect(Collectors.toList());
if (entityList.size() > 0) {
result.setTruckCategory(entityList.get(0).getTxt());

View File

@ -9,6 +9,9 @@ import org.springframework.web.bind.annotation.*;
import java.util.Map;
/**
*
*/
@Slf4j
@CrossOrigin
@RestController
@ -31,14 +34,18 @@ public class StatisticsController {
return R.success(statisticsService.obtainOrder(userId));
}
/**
*
*/
@GetMapping("/supervision/obtain_data")
public R<?> obtainData1() {
// long l = System.currentTimeMillis();
Map<String, Object> data = supervisionStatisticsService.obtainData();
return R.success(data);
}
/**
*
*/
@GetMapping("/first_page")
public R<Map<String, Object>> firstPage(@RequestParam(value = "trendStatisticsType", required = false, defaultValue = "1") Integer trendStatisticsType) {
return R.success(supervisionStatisticsService.firstPage(trendStatisticsType));

View File

@ -1,8 +1,11 @@
package com.njzscloud.supervisory.device.controller;
import cn.hutool.core.lang.Assert;
import com.njzscloud.common.core.ex.Exceptions;
import com.njzscloud.common.core.utils.R;
import com.njzscloud.common.mp.support.PageParam;
import com.njzscloud.common.mp.support.PageResult;
import com.njzscloud.common.security.util.SecurityUtil;
import com.njzscloud.supervisory.device.pojo.entity.DeviceLocalizerEntity;
import com.njzscloud.supervisory.device.pojo.entity.LocalizerConfig;
import com.njzscloud.supervisory.device.pojo.param.SearchDeviceLocalizerParam;
@ -84,7 +87,34 @@ public class DeviceLocalizerController {
public R<?> modifyConfig(@RequestBody LocalizerConfig config,
@RequestParam Long id
) {
Assert.isTrue(SecurityUtil.isAdmin(), () -> Exceptions.clierr("非管理员不能操作"));
deviceLocalizerService.modifyConfig(id, config);
return R.success();
}
/**
*
*/
@GetMapping("/reflash_device")
public R<?> reflash() {
Assert.isTrue(SecurityUtil.isAdmin(), () -> Exceptions.clierr("非管理员不能操作"));
deviceLocalizerService.reflash();
return R.success();
}
@GetMapping("/send_directive")
public R<?> sendDirective(@RequestParam Long id,
@RequestParam String directive
) {
Assert.isTrue(SecurityUtil.isAdmin(), () -> Exceptions.clierr("非管理员不能操作"));
deviceLocalizerService.sendDirective(id, directive);
return R.success();
}
@GetMapping("/enable_track")
public R<?> enableTrack(@RequestParam Long id) {
Assert.isTrue(SecurityUtil.isAdmin(), () -> Exceptions.clierr("非管理员不能操作"));
deviceLocalizerService.enableTrack(id);
return R.success();
}
}

View File

@ -2,6 +2,7 @@ package com.njzscloud.supervisory.device.service;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
@ -9,10 +10,9 @@ import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njzscloud.common.core.ex.Exceptions;
import com.njzscloud.common.core.utils.R;
import com.njzscloud.common.localizer.mqtt.param.ConfigIpPortParam;
import com.njzscloud.common.localizer.mqtt.param.EnableWarnParam;
import com.njzscloud.common.localizer.mqtt.param.ObtainDeviceInfoParam;
import com.njzscloud.common.localizer.mqtt.param.SpeedThresholdParam;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.contant.LocalizerType;
import com.njzscloud.common.localizer.mqtt.param.*;
import com.njzscloud.common.mp.support.PageParam;
import com.njzscloud.common.mp.support.PageResult;
import com.njzscloud.common.mqtt.support.MqttMsg;
@ -34,6 +34,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
@ -59,8 +60,16 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
.eq(DeviceLocalizerEntity::getLocalizerCategory, localizerCategory)),
() -> Exceptions.exception("定位器已存在"));
this.save(deviceLocalizerEntity.setConfig(new LocalizerConfig()
.setEnableWarn(false)
.setSpeedThreshold(60)
.setEnableWarn(true)
));
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(10000);
Mqtt.subscribe(terminalId + "/online", this::heartbeat);
Mqtt.subscribe(terminalId + "/device_info", this::updateDeviceLocalizerStatus);
// Mqtt.subscribe(terminalId + "/track_location_real", truckLocationTrackService::sendRealTimeData);
reflash();
});
}
/**
@ -84,6 +93,13 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
.setTerminalId(terminalId)
.setLocalizerCategory(localizerCategory)
);
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(10000);
Mqtt.subscribe(terminalId + "/online", this::heartbeat);
Mqtt.subscribe(terminalId + "/device_info", this::updateDeviceLocalizerStatus);
// Mqtt.subscribe(terminalId + "/track_location_real", truckLocationTrackService::sendRealTimeData);
reflash();
});
}
/**
@ -94,6 +110,10 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
List<DeviceLocalizerEntity> deviceLocalizerEntities = baseMapper.canDel(Wrappers.query(DeviceLocalizerEntity.class).in("a.id", ids));
Assert.isTrue(deviceLocalizerEntities.isEmpty(), () -> Exceptions.exception("存在已被关联的定位器,不能删除"));
this.removeBatchByIds(ids);
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(10000);
reflash();
});
}
/**
@ -130,12 +150,19 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
}
public void onApplicationReady() {
Mqtt.subscribe("localizer/loader", this::localizerLoader);
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(3000);
reflash();
});
List<DeviceLocalizerEntity> localizerEntities = this.list();
if (localizerEntities.isEmpty()) return;
List<String> collect = localizerEntities.stream().map(DeviceLocalizerEntity::getTerminalId).distinct().collect(Collectors.toList());
for (String terminalId : collect) {
Mqtt.subscribe(terminalId + "/online", this::heartbeat);
Mqtt.subscribe(terminalId + "/device_info", this::updateDeviceLocalizerStatus);
Mqtt.subscribe(terminalId + "/track_location_real", truckLocationTrackService::sendRealTimeData);
// Mqtt.subscribe(terminalId + "/track_location_real", truckLocationTrackService::sendRealTimeData);
}
}
@ -143,11 +170,16 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
HeartbeatResult heartbeatResult = msg.getMsg(HeartbeatResult.class);
if (heartbeatResult == null) return;
this.update(Wrappers.lambdaUpdate(DeviceLocalizerEntity.class)
.set(DeviceLocalizerEntity::getOnline, heartbeatResult.isOnline())
// .set(DeviceLocalizerEntity::getOnline, heartbeatResult.isOnline())
.set(DeviceLocalizerEntity::getLastTime, heartbeatResult.getTime())
.eq(DeviceLocalizerEntity::getTerminalId, heartbeatResult.getTerminalId()));
}
public void localizerLoader(MqttMsg msg) {
// CompletableFuture.runAsync(this::reflash);
reflash();
}
public void updateDeviceLocalizerStatus(MqttMsg msg) {
DeviceInfoResult deviceInfoResult = msg.getMsg(DeviceInfoResult.class);
if (deviceInfoResult == null) return;
@ -190,7 +222,11 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
oldConfig.setEnableWarn(enableWarn);
}
this.updateById(new DeviceLocalizerEntity().setId(id).setConfig(oldConfig));
this.configDevice(terminalId, config);
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(10000);
reflash();
});
// this.configDevice(terminalId, config);
}
private void configDevice(String terminalId, LocalizerConfig config) {
@ -203,14 +239,13 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
modifyIpPort(terminalId, ip, port);
}
if (speedThreshold != null && speedThreshold > 0) {
/* if (speedThreshold != null && speedThreshold > 0) {
modifySpeedThreshold(terminalId, speedThreshold);
}
if (enableWarn != null) {
enableWarn(terminalId, enableWarn);
}
} */
}
private void modifyIpPort(String terminalId, String ip, int port) {
@ -231,7 +266,7 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
}
private void enableWarn(String terminalId, boolean enableWarn) {
Mqtt.publish("location/speed_threshold", new EnableWarnParam()
Mqtt.publish("location/warn", new EnableWarnParam()
.setTerminalId(terminalId)
.setEnable(enableWarn)
);
@ -243,4 +278,40 @@ public class DeviceLocalizerService extends ServiceImpl<DeviceLocalizerMapper, D
String terminalId = localizerEntity.getTerminalId();
Mqtt.publish("location/device_info", new ObtainDeviceInfoParam().setTerminalId(terminalId));
}
public void reflash() {
List<DeviceLocalizerEntity> localizerEntities = this.list();
if (localizerEntities.isEmpty()) return;
List<Localizer> collect = localizerEntities.stream()
.map(it -> {
LocalizerConfig config = it.getConfig();
return new Localizer()
.setTerminalId(it.getTerminalId())
.setSpeedThreshold(config == null || config.getSpeedThreshold() == null ? 60 : config.getSpeedThreshold())
.setLocalizerType(LocalizerType.valueOf(it.getLocalizerCategory().getVal()));
}
)
.collect(Collectors.toList());
Mqtt.publish("location/load", collect);
}
public void sendDirective(Long id, String directive) {
DeviceLocalizerEntity localizerEntity = this.getById(id);
Assert.notNull(localizerEntity, () -> Exceptions.clierr("设备不存在"));
String terminalId = localizerEntity.getTerminalId();
Mqtt.publish("localizer/send_directive", new SendDirectiveParam()
.setTerminalId(terminalId)
.setDirective(directive)
);
}
public void enableTrack(Long id) {
DeviceLocalizerEntity localizerEntity = this.getById(id);
Assert.notNull(localizerEntity, () -> Exceptions.clierr("设备不存在"));
String terminalId = localizerEntity.getTerminalId();
Mqtt.publish("location/track", MapUtil.builder()
.put("terminalId", terminalId)
.put("interval", 1)
.build());
}
}

View File

@ -203,14 +203,14 @@ public class OrderInfoController {
}
@GetMapping("/start_track")
public R<?> startTrack(@RequestParam("orderId") String orderId) {
orderInfoService.startTuqiangTrack(orderId);
public R<?> startTrack(@RequestParam("orderId") Long orderId) {
orderInfoService.startTrackOrder(orderId);
return R.success();
}
@GetMapping("/stop_track")
public R<?> stopTrack(@RequestParam("orderId") String orderId) {
orderInfoService.stopTrack(orderId);
public R<?> stopTrack(@RequestParam("orderId") Long orderId) {
orderInfoService.stopTrackOrder(orderId);
return R.success();
}

View File

@ -92,21 +92,6 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
private final MoneyAccountService moneyAccountService;
private final MoneyChangeDetailService moneyChangeDetailService;
private static void stopTuqiangTrack(String gpsId) {
CompletableFuture.runAsync(() -> {
Assert.notEmpty(gpsId, () -> Exceptions.clierr("车辆未绑定GPS"));
Mqtt.publish("location/track", MapUtil.builder()
.put("terminalId", gpsId)
.put("interval", 0)
.build());
Mqtt.unsubscribe(gpsId + "/track_location");
}).whenComplete((aVoid, throwable) -> {
if (throwable != null) {
log.error("关闭GPS失败", throwable);
}
});
}
/**
*
*/
@ -739,8 +724,19 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
Assert.notNull(truckInfo, () -> Exceptions.clierr("车辆不存在"));
String gpsId = truckInfo.getGps();
String licensePlate = truckInfo.getLicensePlate();
startTuqiangTrack(gpsId, licensePlate, orderInfoId, truckId);
Assert.notEmpty(gpsId, () -> Exceptions.clierr("车辆未绑定GPS"));
DeviceLocalizerEntity deviceLocalizerEntity = baseMapper.gpsLastOnlineTime(gpsId);
LocalDateTime lastTime = deviceLocalizerEntity.getLastTime();
boolean after = lastTime.isBefore(LocalDateTime.now().minusMinutes(5));
if (after) {
bizWarnService.save(new BizWarnEntity()
.setWarnCategory(WarnCategory.EQUIPMENT.getVal())
.setWarnContent(StrUtil.format("{} 绑定的 GPS 设备已离线,设备号:{}", licensePlate, gpsId))
.setOrderId(orderInfoId)
);
return;
}
startTrack(gpsId, licensePlate, orderInfoId, truckId);
CompletableFuture.runAsync(() -> Websocket.publish(new WsMsg().setEvent("down/order/status_change")
.setData(MapUtil.builder()
@ -824,89 +820,22 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
}
}
private void startTuqiangTrack(String gpsId, String licensePlate, Long orderInfoId, Long truckId) {
CompletableFuture.runAsync(() -> {
Assert.notEmpty(gpsId, () -> Exceptions.clierr("车辆未绑定GPS"));
DeviceLocalizerEntity deviceLocalizerEntity = baseMapper.gpsLastOnlineTime(gpsId);
LocalDateTime lastTime = deviceLocalizerEntity.getLastTime();
boolean after = lastTime.isBefore(LocalDateTime.now().minusMinutes(5));
if (after) {
bizWarnService.save(new BizWarnEntity()
.setWarnCategory(WarnCategory.EQUIPMENT.getVal())
.setWarnContent(StrUtil.format("{} 绑定的 GPS 设备已离线,设备号:{}", licensePlate, gpsId))
.setOrderId(orderInfoId)
);
return;
}
Mqtt.subscribe(gpsId + "/track_location", (msg) -> {
RealtimeLocationResult realtimeLocationResult = msg.getMsg(RealtimeLocationResult.class);
if (realtimeLocationResult == null) {
return;
}
if (realtimeLocationResult.isOverspeed()) {
bizWarnService.save(new BizWarnEntity()
.setWarnCategory(WarnCategory.SPEED.getVal())
.setWarnContent(StrUtil.format("{} 已超速,当前时速 {}km/h", licensePlate, realtimeLocationResult.getSpeed()))
.setOrderId(orderInfoId)
);
}
log.info("mqtt 收到消息:{} {}", gpsId + "/track_location", realtimeLocationResult);
String time = realtimeLocationResult.getTime();
double latitude = realtimeLocationResult.getLatitude();
double longitude = realtimeLocationResult.getLongitude();
if (latitude <= 0 && longitude <= 0) {
return;
}
TruckLocationTrackEntity entity = new TruckLocationTrackEntity()
.setOrderId(orderInfoId)
.setTruckId(truckId)
.setTerminalId(gpsId)
.setLatitude(latitude)
.setLongitude(longitude)
.setAltitude(realtimeLocationResult.getAltitude())
.setSpeed(realtimeLocationResult.getSpeed())
.setLocationTime(DateUtil.parseLocalDateTime(time))
.setDirection(realtimeLocationResult.getDirection())
.setOverspeed(realtimeLocationResult.isOverspeed())
.setCompensate(realtimeLocationResult.getType() == 1);
truckLocationTrackService.save(entity);
});
Mqtt.publish("location/track", MapUtil.builder()
.put("terminalId", gpsId)
.put("interval", 1)
.build());
}).whenComplete((aVoid, throwable) -> {
if (throwable != null) {
log.error("开启GPS失败", throwable);
}
});
}
public void startTuqiangTrack(String orderId) {
OrderInfoEntity orderInfo = this.getOne(Wrappers.lambdaQuery(OrderInfoEntity.class)
.eq(OrderInfoEntity::getSn, orderId)
.or().eq(OrderInfoEntity::getId, orderId)
);
Long orderInfoId = orderInfo.getId();
public void startTrackOrder(Long orderId) {
OrderInfoEntity orderInfo = this.getById(orderId);
Long truckId = orderInfo.getTruckId();
BizTruckEntity truckInfo = baseMapper.getTruckInfo(truckId);
String gpsId = truckInfo.getGps();
String licensePlate = truckInfo.getLicensePlate();
startTuqiangTrack(gpsId, licensePlate, orderInfoId, truckId);
startTrack(gpsId, licensePlate, orderId, truckId);
}
public void stopTrack(String orderId) {
OrderInfoEntity orderInfo = this.getOne(Wrappers.lambdaQuery(OrderInfoEntity.class)
.eq(OrderInfoEntity::getSn, orderId)
.or().eq(OrderInfoEntity::getId, orderId)
);
public void stopTrackOrder(Long orderId) {
OrderInfoEntity orderInfo = this.getById(orderId);
Long truckId = orderInfo.getTruckId();
BizTruckEntity truckInfo = baseMapper.getTruckInfo(truckId);
String gpsId = truckInfo.getGps();
stopTuqiangTrack(gpsId);
stopTrack(gpsId);
}
@Transactional(rollbackFor = Exception.class)
@ -964,7 +893,7 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
if (StrUtil.isBlank(gpsId)) {
return;
}
stopTuqiangTrack(gpsId);
stopTrack(gpsId);
}).exceptionally(ex -> {
log.error("数据获取失败", ex);
return null;
@ -1113,10 +1042,10 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
// 3) 合并集合,计算 total_money 与 settle_money并批量落库。
// settleForTransCompany(orderInfoEntity, settleWeight);
} finally {
// TODO 关闭 GPS
// 关闭 GPS
BizTruckEntity truckInfo = baseMapper.getTruckInfo(truckId);
String gpsId = truckInfo.getGps();
stopTuqiangTrack(gpsId);
stopTrack(gpsId);
}
}
@ -1368,4 +1297,67 @@ public class OrderInfoService extends ServiceImpl<OrderInfoMapper, OrderInfoEnti
}
FileUtil.downloadExcel(downList, response, "收运统计表.xlsx");
}
private void startTrack(String gpsId, String licensePlate, Long orderInfoId, Long truckId) {
CompletableFuture.runAsync(() -> {
Mqtt.subscribe(gpsId + "/track_location", (msg) -> {
RealtimeLocationResult realtimeLocationResult = msg.getMsg(RealtimeLocationResult.class);
if (realtimeLocationResult == null) {
return;
}
if (realtimeLocationResult.isOverspeed()) {
bizWarnService.save(new BizWarnEntity()
.setWarnCategory(WarnCategory.SPEED.getVal())
.setWarnContent(StrUtil.format("{} 已超速,当前时速 {}km/h", licensePlate, realtimeLocationResult.getSpeed()))
.setOrderId(orderInfoId)
);
}
// log.info("mqtt 收到消息:{} {}", gpsId + "/track_location", realtimeLocationResult);
String time = realtimeLocationResult.getTime();
double latitude = realtimeLocationResult.getLatitude();
double longitude = realtimeLocationResult.getLongitude();
if (latitude <= 0 && longitude <= 0) {
return;
}
TruckLocationTrackEntity entity = new TruckLocationTrackEntity()
.setOrderId(orderInfoId)
.setTruckId(truckId)
.setTerminalId(gpsId)
.setLatitude(latitude)
.setLongitude(longitude)
.setAltitude(realtimeLocationResult.getAltitude())
.setSpeed(realtimeLocationResult.getSpeed())
.setLocationTime(DateUtil.parseLocalDateTime(time))
.setDirection(realtimeLocationResult.getDirection())
.setOverspeed(realtimeLocationResult.isOverspeed())
.setCompensate(realtimeLocationResult.getType() == 1);
truckLocationTrackService.save(entity);
});
/* Mqtt.publish("location/track", MapUtil.builder()
.put("terminalId", gpsId)
.put("interval", 1)
.build()); */
}).whenComplete((aVoid, throwable) -> {
if (throwable != null) {
log.error("开启GPS失败", throwable);
}
});
}
private void stopTrack(String gpsId) {
CompletableFuture.runAsync(() -> {
Assert.notEmpty(gpsId, () -> Exceptions.clierr("车辆未绑定GPS"));
/* Mqtt.publish("location/track", MapUtil.builder()
.put("terminalId", gpsId)
.put("interval", 0)
.build()); */
Mqtt.unsubscribe(gpsId + "/track_location");
}).whenComplete((aVoid, throwable) -> {
if (throwable != null) {
log.error("关闭GPS失败", throwable);
}
});
}
}

View File

@ -7,7 +7,6 @@ import lombok.ToString;
import lombok.experimental.Accessors;
import java.time.LocalDate;
import java.util.List;
@Getter
@Setter
@ -115,9 +114,14 @@ public class UserRegisterParam {
*
*/
private String legalRepresentative;
private List<String> idcard;
/**
*
*/
private String idcard;
private LocalDate idcardStartTime;
private LocalDate idcardEndTime;
private String idcardFront;
private String idcardBack;
/**
* ;
*/