localizer
parent
713f047aa1
commit
6f93d9d40f
|
|
@ -0,0 +1,55 @@
|
|||
package com.njzscloud.common.localizer;
|
||||
|
||||
import com.njzscloud.common.localizer.contant.LocalizerType;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public class DeviceStore {
|
||||
private static final LinkedList<Localizer> localizers = new LinkedList<>();
|
||||
private static final ReadWriteLock localizersLock = new ReentrantReadWriteLock();
|
||||
private static final Lock localizersWriteLock = localizersLock.writeLock();
|
||||
private static final Lock localizersReadLock = localizersLock.readLock();
|
||||
|
||||
|
||||
public static void setLocalizers(LinkedList<Localizer> list) {
|
||||
localizers.clear();
|
||||
localizersWriteLock.lock();
|
||||
try {
|
||||
localizers.addAll(list);
|
||||
} finally {
|
||||
localizersWriteLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public static Localizer determineLocalizer(String terminalId) {
|
||||
localizersReadLock.lock();
|
||||
try {
|
||||
for (Localizer localizer : localizers) {
|
||||
if (localizer.getTerminalId().equals(terminalId)) {
|
||||
return localizer;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
localizersReadLock.unlock();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static LocalizerType determineType(String terminalId) {
|
||||
Lock localizerInfosReadLock = localizersLock.readLock();
|
||||
try {
|
||||
for (Localizer localizer : localizers) {
|
||||
if (localizer.getTerminalId().equals(terminalId)) {
|
||||
return localizer.getType();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
localizerInfosReadLock.unlock();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,153 @@
|
|||
package com.njzscloud.common.localizer;
|
||||
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import com.njzscloud.common.localizer.contant.LocalizerType;
|
||||
import com.njzscloud.common.localizer.jt808.message.TerminalMessageBody;
|
||||
import com.njzscloud.common.localizer.jt808.message.TerminalTxtMessage;
|
||||
import com.njzscloud.common.localizer.mqtt.result.RealtimeLocationResult;
|
||||
import com.njzscloud.common.localizer.tuqiang.message.LocationReportMessage;
|
||||
import com.njzscloud.common.localizer.tuqiang.message.SearchDeviceInfoMessage;
|
||||
import com.njzscloud.common.localizer.tuqiang.message.SearchLocationResponseMessage;
|
||||
import com.njzscloud.common.mqtt.util.Mqtt;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.Accessors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public abstract class Localizer {
|
||||
/**
|
||||
* 设备 Id
|
||||
*/
|
||||
protected String terminalId;
|
||||
|
||||
/**
|
||||
* 设备类型
|
||||
*/
|
||||
protected LocalizerType type;
|
||||
|
||||
/**
|
||||
* 网络通道
|
||||
*/
|
||||
protected Channel channel;
|
||||
|
||||
public boolean isOnline() {
|
||||
return channel != null && channel.isActive();
|
||||
}
|
||||
|
||||
/**
|
||||
* 开启追踪模式
|
||||
*
|
||||
* @param interval 上报间隔
|
||||
*/
|
||||
public abstract void track(int interval);
|
||||
|
||||
/**
|
||||
* 启用报警
|
||||
*
|
||||
*/
|
||||
public abstract void enableWarn(boolean enable);
|
||||
|
||||
/**
|
||||
* 设置速度报警
|
||||
*
|
||||
* @param speed 速度,单位km/h
|
||||
*/
|
||||
public abstract void speedThreshold(int speed);
|
||||
|
||||
/**
|
||||
* 配置 IP 端口
|
||||
*
|
||||
* @param ip IP地址
|
||||
* @param port 端口号
|
||||
*/
|
||||
public abstract void configIpPort(String ip, int port);
|
||||
|
||||
/**
|
||||
* 终端心跳消息
|
||||
*/
|
||||
public void onHeartbeat(TerminalMessageBody message) {
|
||||
String terminalId = message.getTerminalId();
|
||||
Mqtt.publish(terminalId + "/online", MapUtil.builder()
|
||||
.put("online", true)
|
||||
.put("terminalId", terminalId)
|
||||
.put("time", DateUtil.now())
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* 终端位置信息汇报消息
|
||||
*/
|
||||
public void onLocationReport(TerminalMessageBody message) {
|
||||
ByteBuf byteBuf = message.getBody();
|
||||
LocationReportMessage locationReportMsg = new LocationReportMessage(byteBuf);
|
||||
// log.info("终端位置信息汇报消息: {}", locationReportMsg);
|
||||
|
||||
String terminalId = message.getTerminalId();
|
||||
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
|
||||
.setTerminalId(terminalId);
|
||||
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
|
||||
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
|
||||
}
|
||||
|
||||
/**
|
||||
* 定位数据批量上传消息
|
||||
*/
|
||||
public void onLocationBatchUpload(TerminalMessageBody message) {
|
||||
String terminalId = message.getTerminalId();
|
||||
ByteBuf body = message.getBody();
|
||||
int count = body.readUnsignedShort();
|
||||
int type = body.readByte();
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
int length = body.readUnsignedShort();
|
||||
LocationReportMessage locationReportMsg = new LocationReportMessage(body.slice(body.readerIndex(), length));
|
||||
body.skipBytes(length);
|
||||
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
|
||||
.setTerminalId(terminalId)
|
||||
.setType(type);
|
||||
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
|
||||
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询位置响应消息
|
||||
*/
|
||||
public void onSearchLocationResponse(TerminalMessageBody message) {
|
||||
String terminalId = message.getTerminalId();
|
||||
ByteBuf body = message.getBody();
|
||||
SearchLocationResponseMessage searchLocationResponseMsg = new SearchLocationResponseMessage(body);
|
||||
// log.info("查询位置响应消息: {}", searchLocationResponseMsg);
|
||||
|
||||
LocationReportMessage locationReportMsg = searchLocationResponseMsg.getLocationReportMessage();
|
||||
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
|
||||
.setTerminalId(terminalId);
|
||||
Mqtt.publish(terminalId + "/current_location", realtimeLocationResult);
|
||||
}
|
||||
|
||||
/**
|
||||
* 终端文本信息汇报消息
|
||||
*/
|
||||
public void onTerminalTxtReport(TerminalMessageBody message) {
|
||||
TerminalTxtMessage terminalTxtReportMsg = new TerminalTxtMessage(message.getBody());
|
||||
String txt = terminalTxtReportMsg.getTxt();
|
||||
log.info("终端文本信息汇报消息: {}", txt);
|
||||
SearchDeviceInfoMessage deviceInfo = SearchDeviceInfoMessage.parse(txt);
|
||||
if (deviceInfo != null) {
|
||||
String terminalId = message.getTerminalId();
|
||||
deviceInfo.setTerminalId(terminalId);
|
||||
|
||||
Mqtt.publish(terminalId + "/device_info", deviceInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
package com.njzscloud.common.localizer.contant;
|
||||
|
||||
public enum BodyCharset {
|
||||
GBK,
|
||||
ASCII,
|
||||
}
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
package com.njzscloud.common.localizer.contant;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
/**
|
||||
* 定位器类别
|
||||
*/
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public enum LocalizerType {
|
||||
/**
|
||||
* 途强定位器
|
||||
*/
|
||||
Tuqiang,
|
||||
/**
|
||||
* 航天紫金定位器
|
||||
*/
|
||||
HangTianZiJin,
|
||||
;
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
package com.njzscloud.common.localizer.contant;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public enum MessageType {
|
||||
TerminalRegister(0x0100),
|
||||
TerminalAuth(0x0102),
|
||||
GeneralReport(0x0001),
|
||||
Heartbeat(0x0002),
|
||||
LocationReport(0x0200),
|
||||
LocationBatchReport(0x704),
|
||||
LocationSearchReport(0x0201),
|
||||
TxtReport(0x6006),
|
||||
;
|
||||
|
||||
private final int messageId;
|
||||
|
||||
public static MessageType getMessageType(int messageId) {
|
||||
MessageType[] values = MessageType.values();
|
||||
for (MessageType value : values) {
|
||||
if (value.messageId == messageId) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -28,7 +28,7 @@ import java.util.function.Consumer;
|
|||
@NoArgsConstructor(access = AccessLevel.PRIVATE)
|
||||
public final class JT808 {
|
||||
|
||||
// 终端手机号与Channel的映射
|
||||
// 终端设备号与Channel的映射
|
||||
public static final Map<String, Channel> DeviceHandle = new ConcurrentHashMap<>();
|
||||
|
||||
private static TcpServer tcpServer;
|
||||
|
|
@ -108,9 +108,7 @@ public final class JT808 {
|
|||
if (channel != null && channel.isActive()) {
|
||||
channel.writeAndFlush(message)
|
||||
.addListener((ChannelFutureListener) future -> {
|
||||
if (future.isSuccess()) {
|
||||
log.info("消息发送成功, 终端: {}, 消息ID: 0x{}, 流水号: {}", terminalPhone, Integer.toHexString(message.getMessageId()), message.getFlowId());
|
||||
} else {
|
||||
if (!future.isSuccess()) {
|
||||
log.error("消息发送失败, 终端: {}, 消息ID: 0x{}, 流水号: {}", terminalPhone, Integer.toHexString(message.getMessageId()), message.getFlowId(), future.cause());
|
||||
}
|
||||
});
|
||||
|
|
@ -205,8 +203,6 @@ public final class JT808 {
|
|||
}
|
||||
|
||||
private static boolean verifyCheckCode(JT808Message message, ByteBuf originalBuf) {
|
||||
// 简化实现,实际项目中需要根据协议规范计算校验码
|
||||
// 校验码 = 消息头 + 消息体 中所有字节的异或
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
package com.njzscloud.common.localizer.jt808;
|
||||
|
||||
import com.njzscloud.common.localizer.jt808.message.JT808Message;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* 终端消息体
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class TerminalMessageBody {
|
||||
/**
|
||||
* 终端ID
|
||||
*/
|
||||
protected String terminalId;
|
||||
/**
|
||||
* 消息体
|
||||
*/
|
||||
protected byte[] body;
|
||||
/**
|
||||
* 消息 Id
|
||||
*/
|
||||
private int messageId;
|
||||
/**
|
||||
* 流水号
|
||||
*/
|
||||
private int flowId;
|
||||
|
||||
public TerminalMessageBody(JT808Message message) {
|
||||
this.terminalId = message.getTerminalPhone();
|
||||
this.messageId = message.getMessageId();
|
||||
this.flowId = message.getFlowId();
|
||||
this.body = message.getMessageBody();
|
||||
}
|
||||
}
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
package com.njzscloud.common.localizer.jt808.message;
|
||||
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import com.njzscloud.common.core.fastjson.Fastjson;
|
||||
import com.njzscloud.common.core.jackson.Jackson;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
|
|
@ -51,7 +51,7 @@ public class JT808Message {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Fastjson.toJsonStr(this);
|
||||
return Jackson.toJsonStr(this);
|
||||
}
|
||||
|
||||
public <T> T parseMessageBody(Class<T> clazz) {
|
||||
|
|
|
|||
|
|
@ -15,8 +15,17 @@ import lombok.experimental.Accessors;
|
|||
@Accessors(chain = true)
|
||||
@AllArgsConstructor
|
||||
public class ServerGeneralResponseMessage implements ServerMessageBody {
|
||||
/**
|
||||
* 消息ID
|
||||
*/
|
||||
private int messageId;
|
||||
/**
|
||||
* 流水号
|
||||
*/
|
||||
private int flowId;
|
||||
/**
|
||||
* 结果码,0 成功,其他失败
|
||||
*/
|
||||
private int result;
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,14 @@
|
|||
package com.njzscloud.common.localizer.jt808.message;
|
||||
|
||||
/**
|
||||
* 服务器消息体接口
|
||||
* 所有服务器消息体都必须实现此接口
|
||||
*/
|
||||
public interface ServerMessageBody {
|
||||
/**
|
||||
* 将消息体转换为字节数组
|
||||
*
|
||||
* @return 消息体的字节数组表示
|
||||
*/
|
||||
byte[] toBytes();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,15 +9,30 @@ import lombok.Setter;
|
|||
import lombok.ToString;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* 服务器注册响应消息体
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
@Accessors(chain = true)
|
||||
@AllArgsConstructor
|
||||
public class ServerRegisterResponseMessage implements ServerMessageBody {
|
||||
/**
|
||||
* 终端ID
|
||||
*/
|
||||
private String terminalId;
|
||||
/**
|
||||
* 流水号
|
||||
*/
|
||||
private int flowId;
|
||||
/**
|
||||
* 结果码,0 成功,其他失败
|
||||
*/
|
||||
private int result;
|
||||
/**
|
||||
* 认证码
|
||||
*/
|
||||
private String authCode;
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -1,36 +1,69 @@
|
|||
package com.njzscloud.common.localizer.jt808.message;
|
||||
|
||||
import cn.hutool.core.util.CharsetUtil;
|
||||
import com.njzscloud.common.core.ex.Exceptions;
|
||||
import com.njzscloud.common.localizer.contant.BodyCharset;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.Accessors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
@Slf4j
|
||||
@Accessors(chain = true)
|
||||
public class ServerTxtMessage implements ServerMessageBody {
|
||||
/**
|
||||
* 文本消息标志位,
|
||||
* 途强:0x01 表示紧急文本消息
|
||||
* 紫金航天:0x01表示后续数据是指令
|
||||
*/
|
||||
private byte flag;
|
||||
/**
|
||||
* 文本消息内容
|
||||
*/
|
||||
private String txt;
|
||||
|
||||
/**
|
||||
* 文本消息编码,默认UTF-8
|
||||
*/
|
||||
private BodyCharset charset;
|
||||
|
||||
public ServerTxtMessage(String txt) {
|
||||
this(txt, BodyCharset.GBK);
|
||||
}
|
||||
|
||||
public ServerTxtMessage(String txt, BodyCharset charset) {
|
||||
this.flag = 0x01;
|
||||
this.txt = txt;
|
||||
this.charset = charset;
|
||||
}
|
||||
|
||||
public byte[] toBytes() {
|
||||
byte[] messageBody = new byte[txt.length() + 1];
|
||||
messageBody[0] = flag;
|
||||
try {
|
||||
System.arraycopy(txt.getBytes(CharsetUtil.GBK), 0, messageBody, 1, txt.length());
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
log.error("文本消息构建失败,原文本:{}", txt, e);
|
||||
switch (charset) {
|
||||
case ASCII: {
|
||||
for (int i = 0; i < txt.length(); i++) {
|
||||
char c = txt.charAt(i);
|
||||
messageBody[i + 1] = (byte) c;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case GBK: {
|
||||
try {
|
||||
System.arraycopy(txt.getBytes(), 0, messageBody, 1, txt.length());
|
||||
} catch (Exception e) {
|
||||
throw Exceptions.error(e, "文本消息构建失败,原文本:{}", txt);
|
||||
}
|
||||
}
|
||||
break;
|
||||
default: {
|
||||
throw Exceptions.error("不支持的文本消息编码:{}", charset);
|
||||
}
|
||||
}
|
||||
|
||||
return messageBody;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,6 +13,9 @@ import lombok.experimental.Accessors;
|
|||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class TerminalAuthMessage {
|
||||
/**
|
||||
* 认证码
|
||||
*/
|
||||
private String authCode;
|
||||
|
||||
public TerminalAuthMessage(byte[] bytes) {
|
||||
|
|
|
|||
|
|
@ -7,12 +7,21 @@ import lombok.Setter;
|
|||
import lombok.ToString;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* 终端通用响应
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class TerminalGeneralResponseMessage {
|
||||
/**
|
||||
* 流水号
|
||||
*/
|
||||
private int flowId;
|
||||
/**
|
||||
* 消息ID
|
||||
*/
|
||||
private int messageId;
|
||||
/**
|
||||
* 0:成功/确认;1:失败;2:消息有误;3:不支持
|
||||
|
|
|
|||
|
|
@ -6,11 +6,20 @@ import lombok.Setter;
|
|||
import lombok.ToString;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* 终端消息体
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class TerminalMessageBody {
|
||||
/**
|
||||
* 终端ID
|
||||
*/
|
||||
private String terminalId;
|
||||
/**
|
||||
* 消息体,(自动释放)
|
||||
*/
|
||||
private ByteBuf body;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,9 @@ import lombok.ToString;
|
|||
import lombok.experimental.Accessors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* 终端注册消息
|
||||
*/
|
||||
@Slf4j
|
||||
@Getter
|
||||
@Setter
|
||||
|
|
|
|||
|
|
@ -7,11 +7,17 @@ import lombok.Setter;
|
|||
import lombok.ToString;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* 终端文本消息
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class TerminalTxtMessage {
|
||||
/**
|
||||
* 编码类型
|
||||
*/
|
||||
private byte type;
|
||||
/**
|
||||
* 文本信息
|
||||
|
|
|
|||
|
|
@ -0,0 +1,59 @@
|
|||
package com.njzscloud.common.localizer.jt808.support;
|
||||
|
||||
import com.njzscloud.common.localizer.DeviceStore;
|
||||
import com.njzscloud.common.localizer.Localizer;
|
||||
import com.njzscloud.common.localizer.contant.LocalizerType;
|
||||
import com.njzscloud.common.localizer.contant.MessageType;
|
||||
import com.njzscloud.common.localizer.jt808.TerminalMessageBody;
|
||||
import com.njzscloud.common.localizer.jt808.message.JT808Message;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* 消息中心
|
||||
*/
|
||||
@Slf4j
|
||||
public class JT808MessageCenter {
|
||||
private static final Map<LocalizerType, Map<MessageType, Consumer<TerminalMessageBody>>> resolvers = new ConcurrentHashMap<>();
|
||||
|
||||
public static void dispatch(JT808Message message) {
|
||||
int messageId = message.getMessageId();
|
||||
String terminalId = message.getTerminalPhone();
|
||||
Localizer localizer = DeviceStore.determineLocalizer(terminalId);
|
||||
if (localizer == null) {
|
||||
log.error("未找到设备:{}", terminalId);
|
||||
return;
|
||||
}
|
||||
LocalizerType localizerType = localizer.getType();
|
||||
|
||||
Map<MessageType, Consumer<TerminalMessageBody>> m = resolvers.computeIfPresent(localizerType, (k, v) -> {
|
||||
MessageType messageType = MessageType.getMessageType(messageId);
|
||||
if (messageType == null) {
|
||||
log.error("未找到消息类型,设备号:{},消息 Id:{}", terminalId, messageId);
|
||||
return v;
|
||||
}
|
||||
Consumer<TerminalMessageBody> c = v.computeIfPresent(messageType, (k1, v1) -> {
|
||||
try {
|
||||
v1.accept(new TerminalMessageBody(message));
|
||||
} catch (Exception e) {
|
||||
log.error("消息处理异常:设备号:{},设备类型:{},消息类型:{}", terminalId, localizerType, messageType, e);
|
||||
}
|
||||
return v1;
|
||||
});
|
||||
if (c == null) {
|
||||
log.error("消息处理器不存在,设备号:{},设备类型:{},消息类型:{}", terminalId, localizerType, messageType);
|
||||
}
|
||||
return v;
|
||||
});
|
||||
if (m == null) {
|
||||
log.error("设备处理器不存在,设备号:{},设备类型:{}", terminalId, localizerType);
|
||||
}
|
||||
}
|
||||
|
||||
public static void addResolver(LocalizerType localizerType, MessageType messageType, Consumer<TerminalMessageBody> resolver) {
|
||||
resolvers.computeIfAbsent(localizerType, k -> new ConcurrentHashMap<>()).put(messageType, resolver);
|
||||
}
|
||||
}
|
||||
|
|
@ -19,7 +19,7 @@ public class JT808MessageHandler extends ChannelInboundHandlerAdapter {
|
|||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
log.info("客户端连接: {}", ctx.channel().remoteAddress());
|
||||
// log.info("客户端连接: {}", ctx.channel().remoteAddress());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -27,11 +27,10 @@ public class JT808MessageHandler extends ChannelInboundHandlerAdapter {
|
|||
if (msg instanceof JT808Message) {
|
||||
JT808Message message = (JT808Message) msg;
|
||||
int messageId = message.getMessageId();
|
||||
// log.info("收到消息, 设备号: {},消息ID: 0x{}, 流水号: {}", message.getTerminalPhone(), Integer.toHexString(messageId), message.getFlowId());
|
||||
if (messageId == 0x0100) {
|
||||
JT808.register(message.getTerminalPhone(), ctx.channel());
|
||||
}
|
||||
JT808MessageResolver.dispatchMsg(messageId, message);
|
||||
JT808MessageCenter.dispatch(message);
|
||||
} else {
|
||||
super.channelRead(ctx, msg);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,13 +37,13 @@ public class JT808MessageResolver {
|
|||
}
|
||||
|
||||
public static void dispatchMsg(String messageId, JT808Message message) {
|
||||
Consumer<TerminalMessageBody> heartbeat = resolvers.get(messageId);
|
||||
if (heartbeat != null) {
|
||||
Consumer<TerminalMessageBody> messageBodyConsumer = resolvers.get(messageId);
|
||||
if (messageBodyConsumer != null) {
|
||||
String terminalPhone = message.getTerminalPhone();
|
||||
byte[] messageBody = message.getMessageBody();
|
||||
ByteBuf byteBuf = Unpooled.wrappedBuffer(messageBody);
|
||||
try {
|
||||
heartbeat.accept(new TerminalMessageBody()
|
||||
messageBodyConsumer.accept(new TerminalMessageBody()
|
||||
.setTerminalId(terminalPhone)
|
||||
.setBody(byteBuf));
|
||||
} catch (Exception e) {
|
||||
|
|
@ -72,7 +72,7 @@ public class JT808MessageResolver {
|
|||
|
||||
TerminalRegisterMessage terminalRegisterMessage = message.parseMessageBody(TerminalRegisterMessage.class);
|
||||
|
||||
log.info("终端注册消息: {}", terminalRegisterMessage);
|
||||
log.info("{},终端注册消息: {}", message.getTerminalPhone(), terminalRegisterMessage);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -81,7 +81,7 @@ public class JT808MessageResolver {
|
|||
public static void onTerminalAuth(JT808Message message) {
|
||||
JT808.sendGeneralResponse(message);
|
||||
TerminalAuthMessage authMsg = message.parseMessageBody(TerminalAuthMessage.class);
|
||||
log.info("终端鉴权消息: {}", authMsg);
|
||||
log.info("{},终端鉴权消息: {}", message.getTerminalPhone(), authMsg);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -89,16 +89,14 @@ public class JT808MessageResolver {
|
|||
*/
|
||||
public static void onTerminalGeneralResponse(JT808Message message) {
|
||||
TerminalGeneralResponseMessage terminalGeneralResponseMessage = message.parseMessageBody(TerminalGeneralResponseMessage.class);
|
||||
int messageId = message.getMessageId();
|
||||
String terminalPhone = message.getTerminalPhone();
|
||||
int result = terminalGeneralResponseMessage.getResult();
|
||||
log.info("设备通用响应,设备号:{},消息 Id:{},结果:{}", terminalPhone, Integer.toHexString(messageId), result);
|
||||
log.info("设备通用响应,设备号:{},消息:{}", message.getTerminalPhone(), terminalGeneralResponseMessage.getResult());
|
||||
}
|
||||
|
||||
/**
|
||||
* 终端心跳消息
|
||||
*/
|
||||
public static void onHeartbeat(JT808Message message) {
|
||||
log.info("{},终端心跳消息: {}", message.getTerminalPhone(), message);
|
||||
JT808.sendGeneralResponse(message);
|
||||
dispatchMsg(Heartbeat, message);
|
||||
}
|
||||
|
|
@ -107,6 +105,7 @@ public class JT808MessageResolver {
|
|||
* 终端位置信息汇报消息
|
||||
*/
|
||||
public static void onLocationReport(JT808Message message) {
|
||||
log.info("{},终端位置信息汇报消息: {}", message.getTerminalPhone(), message);
|
||||
dispatchMsg(LocationReport, message);
|
||||
}
|
||||
|
||||
|
|
@ -114,6 +113,7 @@ public class JT808MessageResolver {
|
|||
* 定位数据批量上传消息
|
||||
*/
|
||||
public static void onLocationBatchUpload(JT808Message message) {
|
||||
log.info("{},定位数据批量上传消息: {}", message.getTerminalPhone(), message);
|
||||
dispatchMsg(LocationBatchReport, message);
|
||||
}
|
||||
|
||||
|
|
@ -121,6 +121,7 @@ public class JT808MessageResolver {
|
|||
* 查询位置响应消息
|
||||
*/
|
||||
public static void onSearchLocationResponse(JT808Message message) {
|
||||
log.info("{},查询位置响应消息: {}", message.getTerminalPhone(), message);
|
||||
JT808.sendGeneralResponse(message);
|
||||
dispatchMsg(LocationSearch, message);
|
||||
}
|
||||
|
|
@ -129,6 +130,7 @@ public class JT808MessageResolver {
|
|||
* 终端文本信息汇报消息
|
||||
*/
|
||||
public static void onTerminalTxtReport(JT808Message message) {
|
||||
log.info("{},终端文本信息汇报消息: {}", message.getTerminalPhone(), message);
|
||||
JT808.sendGeneralResponse(message);
|
||||
dispatchMsg(TxtReport, message);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,8 +17,6 @@ public class TcpServer {
|
|||
private EventLoopGroup bossGroup;
|
||||
private EventLoopGroup workerGroup;
|
||||
private ServerBootstrap bootstrap;
|
||||
private Thread startThread;
|
||||
;
|
||||
|
||||
public TcpServer(int port, int bossThreads, int workerThreads) {
|
||||
if (port <= 0 || bossThreads <= 0 || workerThreads <= 0) {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package com.njzscloud.common.localizer.mqtt;
|
||||
|
||||
import com.njzscloud.common.core.jackson.Jackson;
|
||||
import com.njzscloud.common.localizer.mqtt.param.*;
|
||||
import com.njzscloud.common.localizer.tuqiang.Tuqiang;
|
||||
import com.njzscloud.common.mqtt.support.MqttListener;
|
||||
|
|
@ -15,6 +16,10 @@ public class MqttMsgHandlers {
|
|||
@MqttListener(topic = "location/track")
|
||||
public void onMessage(MqttMsg msg) {
|
||||
LocationTrackParam locationTrackParam = msg.getMsg(LocationTrackParam.class);
|
||||
// String format = StrUtil.format("nmset,0,{}", locationTrackParam.getInterval());
|
||||
// String format = StrUtil.format("timer,0", locationTrackParam.getInterval());
|
||||
// String format = StrUtil.format("reset", locationTrackParam.getInterval());
|
||||
// JT808.sendTxtMessage(locationTrackParam.getTerminalId(), format);
|
||||
Tuqiang.track(locationTrackParam.getTerminalId(), locationTrackParam.getInterval());
|
||||
}
|
||||
|
||||
|
|
@ -45,6 +50,7 @@ public class MqttMsgHandlers {
|
|||
public void configIpPort(MqttMsg msg) {
|
||||
ConfigIpPortParam configIpPortParam = msg.getMsg(ConfigIpPortParam.class);
|
||||
String terminalId = configIpPortParam.getTerminalId();
|
||||
log.info("configIpPortParam: {}", Jackson.toJsonStr(configIpPortParam));
|
||||
String ip = configIpPortParam.getIp();
|
||||
int port = configIpPortParam.getPort();
|
||||
Tuqiang.configIpPort(terminalId, ip, port);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package com.njzscloud.common.localizer.mqtt.param;
|
||||
|
||||
import com.njzscloud.common.localizer.contant.LocalizerType;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
|
|
@ -10,7 +11,20 @@ import lombok.experimental.Accessors;
|
|||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class ConfigIpPortParam {
|
||||
/**
|
||||
* 终端ID
|
||||
*/
|
||||
private String terminalId;
|
||||
/**
|
||||
* IP地址
|
||||
*/
|
||||
private String ip;
|
||||
/**
|
||||
* 端口号
|
||||
*/
|
||||
private int port;
|
||||
/**
|
||||
* 定位器类别
|
||||
*/
|
||||
private LocalizerType localizer;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package com.njzscloud.common.localizer.mqtt.param;
|
||||
|
||||
import com.njzscloud.common.localizer.contant.LocalizerType;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
|
|
@ -10,6 +11,16 @@ import lombok.experimental.Accessors;
|
|||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class EnableWarnParam {
|
||||
/**
|
||||
* 终端ID
|
||||
*/
|
||||
private String terminalId;
|
||||
/**
|
||||
* 是否启用报警
|
||||
*/
|
||||
private boolean enable;
|
||||
/**
|
||||
* 定位器类别
|
||||
*/
|
||||
private LocalizerType localizer;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package com.njzscloud.common.localizer.mqtt.param;
|
||||
|
||||
import com.njzscloud.common.localizer.contant.LocalizerType;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
|
|
@ -10,5 +11,12 @@ import lombok.experimental.Accessors;
|
|||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class LocationCurrentParam {
|
||||
/**
|
||||
* 终端ID
|
||||
*/
|
||||
private String terminalId;
|
||||
/**
|
||||
* 定位器类别
|
||||
*/
|
||||
private LocalizerType localizer;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package com.njzscloud.common.localizer.mqtt.param;
|
||||
|
||||
import com.njzscloud.common.localizer.contant.LocalizerType;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
|
|
@ -10,6 +11,16 @@ import lombok.experimental.Accessors;
|
|||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class LocationTrackParam {
|
||||
/**
|
||||
* 终端ID
|
||||
*/
|
||||
private String terminalId;
|
||||
/**
|
||||
* 上报间隔,单位:秒,0-3600秒,0表示不追踪
|
||||
*/
|
||||
private int interval;
|
||||
/**
|
||||
* 定位器类别
|
||||
*/
|
||||
private LocalizerType localizer;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package com.njzscloud.common.localizer.mqtt.param;
|
||||
|
||||
import com.njzscloud.common.localizer.contant.LocalizerType;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
|
|
@ -10,5 +11,12 @@ import lombok.experimental.Accessors;
|
|||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class ObtainDeviceInfoParam {
|
||||
/**
|
||||
* 终端ID
|
||||
*/
|
||||
private String terminalId;
|
||||
/**
|
||||
* 定位器类别
|
||||
*/
|
||||
private LocalizerType localizer;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package com.njzscloud.common.localizer.mqtt.param;
|
||||
|
||||
import com.njzscloud.common.localizer.contant.LocalizerType;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
|
|
@ -10,6 +11,16 @@ import lombok.experimental.Accessors;
|
|||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class SpeedThresholdParam {
|
||||
/**
|
||||
* 终端ID
|
||||
*/
|
||||
private String terminalId;
|
||||
/**
|
||||
* 速度阈值
|
||||
*/
|
||||
private int speed;
|
||||
/**
|
||||
* 定位器类别
|
||||
*/
|
||||
private LocalizerType localizer;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@ public final class Tuqiang {
|
|||
* @param interval 上报间隔,单位秒,0-3600秒,0表示不追踪
|
||||
*/
|
||||
public static void track(String terminalId, int interval) {
|
||||
log.info("开启途强追踪,终端ID:{},上报间隔:{}秒", terminalId, interval);
|
||||
Assert.isTrue(interval >= 0 && interval <= 3600, "上报间隔必须为0-3600秒");
|
||||
JT808.sendTxtMessage(terminalId, StrUtil.format("<SPBSJ*P:BSJGPS*C:{}*H:300>", interval));
|
||||
}
|
||||
|
|
@ -58,6 +59,7 @@ public final class Tuqiang {
|
|||
* @param terminalId 终端ID
|
||||
*/
|
||||
public static void obtainDeviceInfo(String terminalId) {
|
||||
log.info("获取途强设备信息,终端ID:{}", terminalId);
|
||||
JT808.sendTxtMessage(terminalId, "<CKBSJ>");
|
||||
}
|
||||
|
||||
|
|
@ -67,6 +69,7 @@ public final class Tuqiang {
|
|||
* @param terminalId 终端ID
|
||||
*/
|
||||
public static void enableWarn(String terminalId, boolean enable) {
|
||||
log.info("设置途强报警,终端ID:{},是否启用:{}", terminalId, enable);
|
||||
JT808.sendTxtMessage(terminalId, StrUtil.format("<SPBSJ*P:BSJGPS*6W:{}>", enable ? 1 : 0));
|
||||
}
|
||||
|
||||
|
|
@ -77,6 +80,7 @@ public final class Tuqiang {
|
|||
* @param speed 速度,单位km/h
|
||||
*/
|
||||
public static void speedThreshold(String terminalId, int speed) {
|
||||
log.info("设置途强速度报警,终端ID:{},速度:{}km/h", terminalId, speed);
|
||||
JT808.sendTxtMessage(terminalId, StrUtil.format("<SPBSJ*P:BSJGPS*CS:{}*H:300>", speed));
|
||||
}
|
||||
|
||||
|
|
@ -89,6 +93,7 @@ public final class Tuqiang {
|
|||
* @param port 服务器端口
|
||||
*/
|
||||
public static void configIpPort(String terminalId, String ip, int port) {
|
||||
log.info("设置途强设备连接地址,终端ID:{},IP:{},端口:{}", terminalId, ip, port);
|
||||
Assert.notEmpty(terminalId);
|
||||
Assert.notEmpty(ip);
|
||||
Assert.isTrue(port > 0);
|
||||
|
|
@ -101,6 +106,7 @@ public final class Tuqiang {
|
|||
* @param terminalId 终端ID
|
||||
*/
|
||||
public static void currentLocation(String terminalId) {
|
||||
log.info("获取途强当前位置,终端ID:{}", terminalId);
|
||||
JT808Message baseMessage = JT808.createBaseMessage(terminalId, 0x8201);
|
||||
JT808.sendMessage(baseMessage);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -91,8 +91,8 @@ public class TuqiangListeners {
|
|||
@TuqiangListener(messageId = TxtReport)
|
||||
public void onTerminalTxtReport(TerminalMessageBody message) {
|
||||
TerminalTxtMessage terminalTxtReportMsg = new TerminalTxtMessage(message.getBody());
|
||||
// log.info("终端文本信息汇报消息: {}", terminalTxtReportMsg);
|
||||
String txt = terminalTxtReportMsg.getTxt();
|
||||
log.info("终端文本信息汇报消息: {}", txt);
|
||||
SearchDeviceInfoMessage deviceInfo = SearchDeviceInfoMessage.parse(txt);
|
||||
if (deviceInfo != null) {
|
||||
String terminalId = message.getTerminalId();
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ public class MqttCliWrapper implements BeanPostProcessor {
|
|||
|
||||
public MqttCliWrapper(MqttProperties mqttProperties) {
|
||||
String broker = mqttProperties.getBroker();
|
||||
String clientId = mqttProperties.getClientId() + "-" + IdUtil.nanoId(5);
|
||||
String clientId = mqttProperties.getClientId() + "-" + IdUtil.nanoId();
|
||||
String username = mqttProperties.getUsername();
|
||||
String password = mqttProperties.getPassword();
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -9,4 +9,4 @@ localizer:
|
|||
enabled: true
|
||||
boss-threads: 1
|
||||
worker-threads: 1
|
||||
port: 18888
|
||||
port: 30100
|
||||
|
|
|
|||
Loading…
Reference in New Issue