gps/src/main/java/com/njzscloud/common/jt808/JT808.java

149 lines
4.7 KiB
Java

package com.njzscloud.common.jt808;
import com.njzscloud.common.jt808.support.JT808Message;
import com.njzscloud.common.jt808.support.JT808MessageResolver;
import com.njzscloud.common.jt808.support.MessageBody;
import com.njzscloud.common.jt808.util.FlowId;
import com.njzscloud.common.tcp.TcpServer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* JT808消息发送工具类
* 提供便捷的方法发送各种类型的JT808消息
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JT808 {
// 终端手机号与Channel的映射
private static final Map<String, Channel> terminalChannels = new ConcurrentHashMap<>();
private static TcpServer tcpServer;
public static void run(int port, int bossThreads, int workerThreads) {
tcpServer = new TcpServer(port, bossThreads, workerThreads);
tcpServer.start();
}
public static void stop() {
if (tcpServer != null) {
tcpServer.stop();
tcpServer = null;
}
}
/**
* 注册消息监听器
*/
public static void addListener(String messageId, Consumer<MessageBody> listener) {
JT808MessageResolver.addResolver(messageId, listener);
}
/**
* 注册终端与通道的映射关系
*/
public static void register(String terminalId, Channel channel) {
if (terminalId != null && channel != null) {
if (terminalChannels.get(terminalId) != channel) {
terminalChannels.put(terminalId, channel);
}
}
}
/**
* 移除终端与通道的映射关系
*/
public static void unregister(Channel channel) {
if (channel != null) {
terminalChannels.values().removeIf(ch -> ch == channel);
}
}
/**
* 发送通用应答消息 (0x8001)
*/
public static void sendGeneralResponse(JT808Message message) {
String terminalPhone = message.getTerminalPhone();
int flowId = message.getFlowId();
int messageId = message.getMessageId();
ByteBuf body = Unpooled.buffer();
ByteBufUtil.writeShortBE(body, flowId);
ByteBufUtil.writeShortBE(body, messageId);
body.writeByte(0);
// 构建消息
byte[] bytes = ByteBufUtil.getBytes(body);
body.release();
message = JT808.createBaseMessage(terminalPhone, 0x8001, bytes);
sendMessage(message);
}
/**
* 向指定终端发送消息
*/
public static void sendMessage(JT808Message message) {
String terminalPhone = message.getTerminalPhone();
Channel channel = terminalChannels.get(terminalPhone);
if (channel != null && channel.isActive()) {
channel.writeAndFlush(message)
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("消息发送成功, 终端: {}, 消息ID: 0x{}, 流水号: {}", terminalPhone, Integer.toHexString(message.getMessageId()), message.getFlowId());
} else {
log.error("消息发送失败, 终端: {}, 消息ID: 0x{}, 流水号: {}", terminalPhone, Integer.toHexString(message.getMessageId()), message.getFlowId(), future.cause());
}
});
} else {
unregister(channel);
// 终端不在线或通道已关闭
log.warn("终端不在线: {}", terminalPhone);
}
}
/**
* 创建基础消息对象
*/
public static JT808Message createBaseMessage(String terminalId, int messageId) {
return createBaseMessage(terminalId, messageId, null);
}
public static JT808Message createBaseMessage(String terminalId, int messageId, byte[] body) {
JT808Message message = new JT808Message()
.setFlowId(FlowId.next(terminalId, messageId));
// 设置消息ID
message.setMessageId(messageId);
// 设置消息体属性
// 其他属性默认:不分包,不加密
if (body != null) {
message.setMessageBodyProps(body.length);
} else {
message.setMessageBodyProps(0);
}
// 设置终端手机号
message.setTerminalPhone(terminalId);
// 设置消息体
message.setMessageBody(body);
return message;
}
}