websocket 工具

localizer
lzq 2025-10-15 18:45:42 +08:00
parent 089c6c2c2c
commit fb02103683
8 changed files with 512 additions and 166 deletions

View File

@ -1,16 +1,15 @@
package com.njzscloud.common.ws.support; package com.njzscloud.common.ws.support;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.njzscloud.common.core.ex.ExceptionMsg; import com.njzscloud.common.core.ex.ExceptionMsg;
import com.njzscloud.common.core.utils.R; import com.njzscloud.common.core.utils.R;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.*; import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler; import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import java.util.HashMap;
import java.util.Map;
// TODO websocket 消息处理器, 消息转发
/** /**
* websocket <br/> * websocket <br/>
@ -22,83 +21,73 @@ public class ImpartialWebSocketHandler extends AbstractWebSocketHandler {
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Map<String, Object> attributes = session.getAttributes(); Wsdoor wsdoor = Wsdoor.of(session);
String token = (String) attributes.get(Constants.SESSION_ATTRIBUTE_TOKEN); String cid = wsdoor.getCid();
String cid = (String) attributes.get(Constants.SESSION_ATTRIBUTE_CID); Long uid = wsdoor.getUid();
String uid = (String) attributes.get(Constants.SESSION_ATTRIBUTE_UID); wsdoor.send(new WsMsg()
.setEvent("connected")
Websocket.optLink(false, it -> { .setData(R.success(
Wsdoor wsdoor = new Wsdoor(uid, cid, token, session); MapUtil.builder()
Map<String, Wsdoor> map = it.get(uid); .put("uid", uid)
if (map == null) { .put("cid", cid)
map = new HashMap<>(); .build()
map.put(cid, wsdoor); , "连接成功")));
it.put(uid, map);
return;
}
map.put(cid, wsdoor);
});
Websocket.send(uid, cid, new WsMsg().setEvent("connected")
.setData(R.success(cid, "连接成功"))
);
} }
@Override @Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
Map<String, Object> attributes = session.getAttributes();
String cid = (String) attributes.get(Constants.SESSION_ATTRIBUTE_CID);
String uid = (String) attributes.get(Constants.SESSION_ATTRIBUTE_UID);
String payload = message.getPayload(); String payload = message.getPayload();
WsMsg wsMsg = WsMsg.of(payload); WsMsg wsMsg = WsMsg.of(payload);
if (wsMsg == null) { if (wsMsg == null
Websocket.send(uid, cid, new WsMsg().setEvent("error") || StrUtil.isBlank(wsMsg.getAction())
.setData(R.failed(ExceptionMsg.CLI_ERR_MSG, "消息格式错误")) || StrUtil.isBlank(wsMsg.getEvent())) {
); Wsdoor.of(session).send(new WsMsg().setEvent("error")
.setData(R.failed(ExceptionMsg.CLI_ERR_MSG, "消息格式错误")));
session.close();
return; return;
} }
Websocket.dispatch(wsMsg String action = wsMsg.getAction();
.setUid(uid)
.setCid(cid)
);
}
@Override if (!Websocket.ACTION_PUBLISH.equals(action)
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { && !Websocket.ACTION_PING.equals(action)
Map<String, Object> attributes = session.getAttributes(); // && !Websocket.ACTION_PONG.equals(action)
String token = (String) attributes.get(Constants.SESSION_ATTRIBUTE_TOKEN); && !Websocket.ACTION_SUBSCRIBE.equals(action)
String cid = (String) attributes.get(Constants.SESSION_ATTRIBUTE_CID); && !Websocket.ACTION_UNSUBSCRIBE.equals(action)) {
String uid = (String) attributes.get(Constants.SESSION_ATTRIBUTE_UID); Wsdoor.of(session).send(new WsMsg().setEvent("error")
BinaryMessage binaryMessage = new BinaryMessage(message.getPayload().array()); .setData(R.failed(ExceptionMsg.CLI_ERR_MSG, "消息格式错误")));
session.sendMessage(binaryMessage); session.close();
return;
}
log.info("二进制消息: {}", message.getPayloadLength()); String event = wsMsg.getEvent();
}
@Override if (Websocket.ACTION_PUBLISH.equals(action)) {
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { Object data = wsMsg.getData();
log.info("Pong..."); Websocket.publish(event, data);
} else if (Websocket.ACTION_SUBSCRIBE.equals(action)) {
Websocket.subscribe(event, Wsdoor.of(session));
} else if (Websocket.ACTION_PING.equals(action)) {
Wsdoor.of(session).send(new WsMsg().setAction(Websocket.ACTION_PONG));
} else {
Websocket.unsubscribe(event, Wsdoor.of(session));
}
} }
@Override @Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
Map<String, Object> attributes = session.getAttributes(); Wsdoor wsdoor = Wsdoor.of(session);
String token = (String) attributes.get(Constants.SESSION_ATTRIBUTE_TOKEN); String cid = wsdoor.getCid();
String cid = (String) attributes.get(Constants.SESSION_ATTRIBUTE_CID); Long uid = wsdoor.getUid();
String uid = (String) attributes.get(Constants.SESSION_ATTRIBUTE_UID); log.error("连接失败: {} {}", cid, uid, exception);
log.error("连接失败: {}", cid, exception); Websocket.unsubscribe(wsdoor);
Websocket.optLink(false, uid, it -> it.remove(cid));
} }
@Override @Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
Map<String, Object> attributes = session.getAttributes(); Wsdoor wsdoor = Wsdoor.of(session);
String token = (String) attributes.get(Constants.SESSION_ATTRIBUTE_TOKEN); String cid = wsdoor.getCid();
String cid = (String) attributes.get(Constants.SESSION_ATTRIBUTE_CID); Long uid = wsdoor.getUid();
String uid = (String) attributes.get(Constants.SESSION_ATTRIBUTE_UID); log.info("连接关闭: {} {} {}", cid, uid, closeStatus);
log.info("连接关闭: {} {}", cid, closeStatus); Websocket.unsubscribe(wsdoor);
Websocket.optLink(false, uid, it -> it.remove(cid));
} }
} }

View File

@ -26,20 +26,17 @@ import java.util.Map;
@Slf4j @Slf4j
public class TokenHandshakeInterceptor implements HandshakeInterceptor { public class TokenHandshakeInterceptor implements HandshakeInterceptor {
private static final String SEC_WEB_SOCKET_PROTOCOL = "Sec-WebSocket-Protocol"; private static final String SEC_WEB_SOCKET_PROTOCOL = "Sec-WebSocket-Protocol";
// private static final Pattern AUTH_PATTERN = Pattern.compile("^Auth (?<token>[a-zA-Z0-9-.:_~+/]+=*),(?<cid>[a-zA-Z0-9-.:_~+/]+=*)$", Pattern.CASE_INSENSITIVE);
private static final String Authorization = "authorization"; private static final String Authorization = "authorization";
private static final String Cid = "cid";
@Override @Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler wsHandler, Map<String, Object> attributes) { public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler wsHandler, Map<String, Object> attributes) {
try { try {
HttpServletRequest req = ((ServletServerHttpRequest) request).getServletRequest(); HttpServletRequest req = ((ServletServerHttpRequest) request).getServletRequest();
String authorization = req.getParameter(Authorization); String authorization = req.getParameter(Authorization);
// String cid = req.getParameter(Cid); if (StrUtil.isBlank(authorization)) return false;
if (StrUtil.isBlank(authorization)/* || StrUtil.isBlank(cid) */) return false;
String uid = this.resolve(authorization); Long uid = this.resolve(authorization);
if (StrUtil.isBlank(uid)) return false; if (uid == null) return false;
String cid = IdUtil.fastSimpleUUID(); String cid = IdUtil.fastSimpleUUID();
attributes.put(Constants.SESSION_ATTRIBUTE_TOKEN, authorization); attributes.put(Constants.SESSION_ATTRIBUTE_TOKEN, authorization);
attributes.put(Constants.SESSION_ATTRIBUTE_CID, cid); attributes.put(Constants.SESSION_ATTRIBUTE_CID, cid);
@ -65,9 +62,9 @@ public class TokenHandshakeInterceptor implements HandshakeInterceptor {
} }
} }
private String resolve(String token) { private Long resolve(String token) {
UserDetail userDetail = SecurityUtil.parseToken(token); UserDetail userDetail = SecurityUtil.parseToken(token);
if (userDetail == null) return null; if (userDetail == null) return null;
return userDetail.getUserId().toString(); return userDetail.getUserId();
} }
} }

View File

@ -1,10 +1,10 @@
package com.njzscloud.common.ws.support; package com.njzscloud.common.ws.support;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.Collection; import java.util.LinkedList;
import java.util.HashMap; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -14,35 +14,152 @@ import java.util.function.Consumer;
*/ */
@Slf4j @Slf4j
public class Websocket { public class Websocket {
public static final String ACTION_PUBLISH = "publish";
public static final String ACTION_PING = "ping";
public static final String ACTION_PONG = "pong";
public static final String ACTION_SUBSCRIBE = "subscribe";
public static final String ACTION_UNSUBSCRIBE = "unsubscribe";
/** /**
* <br/> *
* Id:Id:Wsdoor<br/>
*/ */
static final Map<String, Map<String, Wsdoor>> LINK_CACHE = new HashMap<>(); static final ConcurrentHashMap<String, List<Wsdoor>> EVENT_CACHE = new ConcurrentHashMap<>();
static final Map<String, Consumer<WsMsg>> LISTENERS = new ConcurrentHashMap<>(); static final ReentrantReadWriteLock EVENT_CACHE_LOCK = new ReentrantReadWriteLock();
static final ReentrantReadWriteLock.ReadLock EVENT_CACHE_READ_LOCK = EVENT_CACHE_LOCK.readLock();
static final ReentrantReadWriteLock.WriteLock EVENT_CACHE_WRITE_LOCK = EVENT_CACHE_LOCK.writeLock();
static final ReentrantReadWriteLock LINK_CACHE_LOCK = new ReentrantReadWriteLock(); static final ConcurrentHashMap<String, List<Consumer<WsMsg>>> LISTENER_CACHE = new ConcurrentHashMap<>();
static final ReentrantReadWriteLock LISTENER_CACHE_LOCK = new ReentrantReadWriteLock();
static final ReentrantReadWriteLock.ReadLock LISTENER_CACHE_READ_LOCK = LISTENER_CACHE_LOCK.readLock();
static final ReentrantReadWriteLock.WriteLock LISTENER_CACHE_WRITE_LOCK = LISTENER_CACHE_LOCK.writeLock();
static final ReentrantReadWriteLock.ReadLock LINK_CACHE_READ_LOCK = LINK_CACHE_LOCK.readLock(); public static boolean publish(String event, Object data) {
// 总数,成功数
static final ReentrantReadWriteLock.WriteLock LINK_CACHE_WRITE_LOCK = LINK_CACHE_LOCK.writeLock(); Integer[] success = {0, 0};
EVENT_CACHE_READ_LOCK.lock();
public static boolean online(String uid, String cid) {
Websocket.LINK_CACHE_READ_LOCK.lock();
try { try {
return LINK_CACHE.containsKey(uid) && LINK_CACHE.get(uid).containsKey(cid); EVENT_CACHE.computeIfPresent(event, (k, v) -> {
if (CollUtil.isEmpty(v)) return v;
success[1] = v.size();
WsMsg wsMsg = new WsMsg()
.setAction(ACTION_PUBLISH)
.setEvent(event)
.setData(data);
for (Wsdoor wsdoor : v) {
boolean send = wsdoor.send(wsMsg);
if (send) success[1] = success[1] + 1;
}
return v;
});
} finally { } finally {
Websocket.LINK_CACHE_READ_LOCK.unlock(); EVENT_CACHE_READ_LOCK.unlock();
}
LISTENER_CACHE_READ_LOCK.lock();
try {
LISTENER_CACHE.computeIfPresent(event, (k, v) -> {
if (CollUtil.isEmpty(v)) return v;
success[1] = success[1] + v.size();
WsMsg wsMsg = new WsMsg()
.setAction(ACTION_PUBLISH)
.setEvent(event)
.setData(data);
for (Consumer<WsMsg> listener : v) {
try {
listener.accept(wsMsg);
success[1] = success[1] + 1;
} catch (Exception e) {
log.error("事件 {} 监听方法执行异常", event, e);
}
}
return v;
});
} finally {
LISTENER_CACHE_READ_LOCK.unlock();
}
return success[1] > 0;
}
public static void subscribe(String event, Wsdoor wsdoor) {
EVENT_CACHE_READ_LOCK.lock();
try {
EVENT_CACHE.computeIfAbsent(event, k -> new LinkedList<>());
EVENT_CACHE.computeIfPresent(event, (k, v) -> {
v.add(wsdoor);
return v;
});
} finally {
EVENT_CACHE_READ_LOCK.unlock();
}
}
public static void unsubscribe(Wsdoor wsdoor) {
EVENT_CACHE_WRITE_LOCK.lock();
try {
EVENT_CACHE.forEach((k, v) -> {
v.removeIf(it -> it.equals(wsdoor));
});
EVENT_CACHE.entrySet().removeIf(it -> CollUtil.isEmpty(it.getValue()));
} finally {
EVENT_CACHE_WRITE_LOCK.unlock();
}
}
public static void unsubscribe(String event, Wsdoor wsdoor) {
EVENT_CACHE_WRITE_LOCK.lock();
try {
EVENT_CACHE.computeIfPresent(event, (k, v) -> {
v.removeIf(it -> it.equals(wsdoor));
return v;
});
EVENT_CACHE.entrySet().removeIf(it -> CollUtil.isEmpty(it.getValue()));
} finally {
EVENT_CACHE_WRITE_LOCK.unlock();
} }
} }
/** /**
* , <br/>
*
* @param event
*/
public static void subscribe(String event, Consumer<WsMsg> listener) {
LISTENER_CACHE_READ_LOCK.lock();
try {
LISTENER_CACHE.computeIfAbsent(event, k -> new LinkedList<>());
LISTENER_CACHE.computeIfPresent(event, (k, v) -> {
v.add(listener);
return v;
});
} finally {
LISTENER_CACHE_READ_LOCK.unlock();
}
}
public static void unsubscribe(String event, Consumer<WsMsg> listener) {
LISTENER_CACHE_READ_LOCK.lock();
try {
LISTENER_CACHE.computeIfPresent(event, (k, v) -> {
v.removeIf(it -> it == listener);
return v;
});
} finally {
LISTENER_CACHE_READ_LOCK.unlock();
}
}
/*
*//**
* websocket <br/> * websocket <br/>
* readonly true , 使, 使<br/> * readonly true , 使, 使<br/>
* *
* @param readonly * @param readonly
* @param consumer * @param consumer
*/ *//*
static void optLink(boolean readonly, Consumer<Map<String, Map<String, Wsdoor>>> consumer) { static void optLink(boolean readonly, Consumer<Map<String, Map<String, Wsdoor>>> consumer) {
if (readonly) { if (readonly) {
try { try {
@ -61,14 +178,14 @@ public class Websocket {
} }
} }
/** *//**
* websocket <br/> * websocket <br/>
* readonly true , 使, 使<br/> * readonly true , 使, 使<br/>
* *
* @param readonly * @param readonly
* @param uid Id * @param uid Id
* @param consumer * @param consumer
*/ *//*
static void optLink(boolean readonly, String uid, Consumer<Map<String, Wsdoor>> consumer) { static void optLink(boolean readonly, String uid, Consumer<Map<String, Wsdoor>> consumer) {
if (readonly) { if (readonly) {
try { try {
@ -89,7 +206,7 @@ public class Websocket {
} }
} }
/** *//**
* websocket <br/> * websocket <br/>
* readonly true , 使, 使<br/> * readonly true , 使, 使<br/>
* *
@ -97,7 +214,7 @@ public class Websocket {
* @param uid Id * @param uid Id
* @param cid Id * @param cid Id
* @param consumer * @param consumer
*/ *//*
static void optLink(boolean readonly, String uid, String cid, Consumer<Wsdoor> consumer) { static void optLink(boolean readonly, String uid, String cid, Consumer<Wsdoor> consumer) {
if (readonly) { if (readonly) {
try { try {
@ -123,44 +240,48 @@ public class Websocket {
} }
public static void addListener(String event, Consumer<WsMsg> consumer) { public static void addListener(String event, Consumer<WsMsg> consumer) {
LISTENERS.put(event, consumer); LISTENER_CACHE.computeIfAbsent(event, k -> new java.util.ArrayList<>()).add(consumer);
} }
public static void removeListener(String event, Consumer<WsMsg> consumer) { public static void removeListener(String event, Consumer<WsMsg> consumer) {
LISTENERS.remove(event, consumer); List<Consumer<WsMsg>> consumers = LISTENER_CACHE.get(event);
if (consumers != null) consumers.remove(consumer);
} }
public static void dispatch(WsMsg wsMsg) { public static void dispatch(WsMsg wsMsg) {
if (wsMsg == null) return; if (wsMsg == null) return;
Consumer<WsMsg> consumer = LISTENERS.get(wsMsg.event); List<Consumer<WsMsg>> consumers = LISTENER_CACHE.get(wsMsg.getEvent());
if (consumer != null) { if (CollUtil.isNotEmpty(consumers)) {
try { for (Consumer<WsMsg> consumer : consumers) {
consumer.accept(wsMsg); try {
} catch (Exception e) { consumer.accept(wsMsg);
log.error("WebSocket 消息处理失败, 事件: {}, 数据: {}", wsMsg.event, wsMsg, e); } catch (Exception e) {
log.error("WebSocket 消息处理失败, 事件: {}, 数据: {}", wsMsg.getEvent(), wsMsg, e);
}
} }
} }
} }
/** *//**
* *
* *
* @param uid Id * @param uid Id
* @param cid Id * @param cid Id
* @param wsMsg * @param wsMsg
*/ *//*
public static void send(String uid, String cid, WsMsg wsMsg) { public static void send(String uid, String cid, WsMsg wsMsg) {
optLink(true, uid, cid, it -> { optLink(true, uid, cid, it -> {
if (it != null) it.send(wsMsg); if (it != null) it.send(wsMsg);
}); });
} }
/** *//**
* *
* *
* @param uid Id * @param uid Id
* @param wsMsg * @param wsMsg
*/ *//*
public static void send(String uid, WsMsg wsMsg) { public static void send(String uid, WsMsg wsMsg) {
optLink(true, uid, it -> { optLink(true, uid, it -> {
if (it == null) return; if (it == null) return;
@ -172,5 +293,5 @@ public class Websocket {
for (Wsdoor wsdoor : wsdoors) wsdoor.send(wsMsg); for (Wsdoor wsdoor : wsdoors) wsdoor.send(wsMsg);
}); });
} } */
} }

View File

@ -1,6 +1,7 @@
package com.njzscloud.common.ws.support; package com.njzscloud.common.ws.support;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.njzscloud.common.core.jackson.Jackson; import com.njzscloud.common.core.jackson.Jackson;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@ -16,25 +17,20 @@ import lombok.extern.slf4j.Slf4j;
@Accessors(chain = true) @Accessors(chain = true)
public class WsMsg { public class WsMsg {
@JsonInclude(JsonInclude.Include.NON_EMPTY)
private String action;
/** /**
* *
*/ */
public String event; @JsonInclude(JsonInclude.Include.NON_EMPTY)
private String event;
/**
* Id
*/
public String cid;
/**
* Id
*/
public String uid;
/** /**
* *
*/ */
public Object data; @JsonInclude(JsonInclude.Include.NON_NULL)
private Object data;
public static WsMsg of(String json) { public static WsMsg of(String json) {
try { try {

View File

@ -1,36 +1,64 @@
package com.njzscloud.common.ws.support; package com.njzscloud.common.ws.support;
import lombok.RequiredArgsConstructor; import cn.hutool.core.map.MapUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.WebSocketSession;
/** import java.util.Map;
* websocket import java.util.Objects;
*/
@Slf4j @Slf4j
@RequiredArgsConstructor @Getter
@Setter
@ToString
@Accessors(chain = true)
public class Wsdoor { public class Wsdoor {
/** /**
* Id * Id
*/ */
public final String uid; private final Long uid;
/** /**
* Id * Id
*/ */
public final String cid; private final String cid;
/** /**
* TOKEN * TOKEN
*/ */
public final String token; private final String token;
/** /**
* *
*/ */
public final WebSocketSession session; private final WebSocketSession session;
private Wsdoor(WebSocketSession session) {
Map<String, Object> attributes = session.getAttributes();
this.uid = MapUtil.getLong(attributes, "uid");
this.cid = MapUtil.getStr(attributes, "cid");
this.token = MapUtil.getStr(attributes, "token");
this.session = session;
}
public static Wsdoor of(WebSocketSession session) {
return new Wsdoor(session);
}
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Wsdoor wsdoor = (Wsdoor) o;
return Objects.equals(uid, wsdoor.uid) && Objects.equals(cid, wsdoor.cid);
}
@Override
public int hashCode() {
return Objects.hash(uid, cid);
}
/** /**
* *
@ -39,6 +67,7 @@ public class Wsdoor {
*/ */
public boolean send(WsMsg wsMsg) { public boolean send(WsMsg wsMsg) {
try { try {
if (!session.isOpen()) return false;
session.sendMessage(new TextMessage(wsMsg.toString())); session.sendMessage(new TextMessage(wsMsg.toString()));
return true; return true;
} catch (Exception e) { } catch (Exception e) {

View File

@ -20,7 +20,7 @@ public class LocationTrackHistoryParam {
Integer speed; Integer speed;
public Integer getSpeed() { public Integer getSpeed() {
return speed == null ? 1 : speed; return speed == null || speed <= 0 ? 1000 : speed;
} }
public Long getOrderId() { public Long getOrderId() {

View File

@ -1,5 +1,6 @@
package com.njzscloud.supervisory.biz.service; package com.njzscloud.supervisory.biz.service;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
@ -8,6 +9,7 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService; import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njzscloud.common.core.ex.ExceptionMsg;
import com.njzscloud.common.core.thread.ThreadPool; import com.njzscloud.common.core.thread.ThreadPool;
import com.njzscloud.common.core.tuple.Tuple3; import com.njzscloud.common.core.tuple.Tuple3;
import com.njzscloud.common.core.utils.R; import com.njzscloud.common.core.utils.R;
@ -15,7 +17,6 @@ import com.njzscloud.common.mp.support.PageParam;
import com.njzscloud.common.mp.support.PageResult; import com.njzscloud.common.mp.support.PageResult;
import com.njzscloud.common.mqtt.support.MqttMsg; import com.njzscloud.common.mqtt.support.MqttMsg;
import com.njzscloud.common.ws.support.Websocket; import com.njzscloud.common.ws.support.Websocket;
import com.njzscloud.common.ws.support.WsMsg;
import com.njzscloud.supervisory.biz.mapper.TruckLocationTrackMapper; import com.njzscloud.supervisory.biz.mapper.TruckLocationTrackMapper;
import com.njzscloud.supervisory.biz.pojo.entity.TruckLocationTrackEntity; import com.njzscloud.supervisory.biz.pojo.entity.TruckLocationTrackEntity;
import com.njzscloud.supervisory.biz.pojo.param.LocationTrackHistoryParam; import com.njzscloud.supervisory.biz.pojo.param.LocationTrackHistoryParam;
@ -34,7 +35,6 @@ import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/** /**
* *
@ -399,14 +399,14 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
RealtimeLocationResult realtimeLocationResult = msg.getMsg(RealtimeLocationResult.class); RealtimeLocationResult realtimeLocationResult = msg.getMsg(RealtimeLocationResult.class);
if (realtimeLocationResult != null) { if (realtimeLocationResult != null) {
String gpsId = realtimeLocationResult.getTerminalId(); String gpsId = realtimeLocationResult.getTerminalId();
/* lock.lock(); lock.lock();
List<SseEmitter> emitterList = emitters.get(gpsId); List<SseEmitter> emitterList = emitters.get(gpsId);
lock.unlock(); lock.unlock();
if (CollUtil.isEmpty(emitterList)) { if (CollUtil.isEmpty(emitterList)) {
// log.info("未找到GPS {} 的实时数据订阅者", gpsId); // log.info("未找到GPS {} 的实时数据订阅者", gpsId);
return; return;
} */ }
/* for (SseEmitter emitter : emitterList) { for (SseEmitter emitter : emitterList) {
send(emitter, "realtimeData", new TruckLocationTrackEntity() send(emitter, "realtimeData", new TruckLocationTrackEntity()
.setId(IdUtil.getSnowflakeNextId()) .setId(IdUtil.getSnowflakeNextId())
.setTerminalId(gpsId) .setTerminalId(gpsId)
@ -418,11 +418,11 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
.setDirection(realtimeLocationResult.getDirection()) .setDirection(realtimeLocationResult.getDirection())
.setOverspeed(realtimeLocationResult.isOverspeed()) .setOverspeed(realtimeLocationResult.isOverspeed())
.setCompensate(realtimeLocationResult.getType() == 1)); .setCompensate(realtimeLocationResult.getType() == 1));
} */ }
List<Tuple3<String, String, String>> listeners; /* List<Tuple3<String, String, String>> listeners;
realtimeDataListenerLock.lock(); realtimeDataListenerLock.lock();
try { try {
realtimeDataListener.removeIf(it -> !Websocket.online(it.get_0(), it.get_1())); realtimeDataListener.removeIf(it -> !Websocket.isOnline(it.get_0(), it.get_1()));
listeners = realtimeDataListener.stream() listeners = realtimeDataListener.stream()
.filter(tuple3 -> tuple3.get_2().equals(gpsId)) .filter(tuple3 -> tuple3.get_2().equals(gpsId))
.map(tuple3 -> Tuple3.create(tuple3.get_0(), tuple3.get_1(), tuple3.get_2())) .map(tuple3 -> Tuple3.create(tuple3.get_0(), tuple3.get_1(), tuple3.get_2()))
@ -450,12 +450,12 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
.setOverspeed(realtimeLocationResult.isOverspeed()) .setOverspeed(realtimeLocationResult.isOverspeed())
.setCompensate(realtimeLocationResult.getType() == 1)) .setCompensate(realtimeLocationResult.getType() == 1))
); );
} } */
} }
} }
public void onApplicationReady() { public void onApplicationReady() {
Websocket.addListener("/truck_location_track/history", msg -> { Websocket.subscribe("get/truck_location_track/history", msg -> {
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class); LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
LocalDateTime startTime = param.getStartTime(); LocalDateTime startTime = param.getStartTime();
LocalDateTime endTime = param.getEndTime(); LocalDateTime endTime = param.getEndTime();
@ -483,6 +483,7 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
List<TruckLocationTrackEntity> records = resultPage.getRecords(); List<TruckLocationTrackEntity> records = resultPage.getRecords();
if (records.isEmpty()) { if (records.isEmpty()) {
Websocket.publish(orderId + "/truck_location_track/history", R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"));
return; return;
} }
@ -495,22 +496,19 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
if (speed > 100) { if (speed > 100) {
if (!ThreadUtil.sleep(speed)) { if (!ThreadUtil.sleep(speed)) {
log.info("任务被取消"); log.info("任务被取消");
Websocket.publish(orderId + "/truck_location_track/history", R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"));
return; return;
} }
} }
String uid = msg.getUid(); boolean publish = Websocket.publish(orderId + "/truck_location_track/history", R.success(record));
String cid = msg.getCid(); if (!publish) {
if (!Websocket.online(uid, cid)) {
return; return;
} }
Websocket.send(uid, cid, new WsMsg()
.setEvent("historyData")
.setData(R.success(record))
);
} }
// 判断是否为最后一页 // 判断是否为最后一页
if (currentPage >= resultPage.getPages()) { if (currentPage >= resultPage.getPages()) {
Websocket.publish(orderId + "/truck_location_track/history", R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"));
return; return;
} }
@ -525,18 +523,72 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
}, threadPoolExecutor); }, threadPoolExecutor);
}); });
Websocket.addListener("/truck_location_track/realtime", msg -> { Websocket.subscribe("get/truck_location_track/realtime", msg -> {
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class); LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
Long orderId = param.getOrderId(); Long orderId = param.getOrderId();
String uid = msg.getUid(); Integer speed = param.getSpeed();
String cid = msg.getCid(); CompletableFuture.runAsync(() -> {
String gpsId = baseMapper.gpsId(orderId); try {
realtimeDataListenerLock.lock(); LocalDateTime startTime = LocalDateTime.now().minusSeconds(1);
try { int errCount = 0;
realtimeDataListener.add(Tuple3.create(uid, cid, gpsId)); while (true) {
} finally { // 检查是否被取消
realtimeDataListenerLock.unlock(); if (Thread.currentThread().isInterrupted()) {
} log.info("任务被取消");
return;
}
Page<TruckLocationTrackEntity> page = new Page<>(1, 10);
page.addOrder(OrderItem.asc("location_time"));
// 分页查询当前页数据
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
.eq(TruckLocationTrackEntity::getOrderId, orderId)
.gt(TruckLocationTrackEntity::getLocationTime, startTime));
List<TruckLocationTrackEntity> records = resultPage.getRecords();
if (records.isEmpty()) {
if (errCount >= 20) {
log.error("查询历史轨迹异常, 订单ID: {}, 时间: {}", orderId, startTime);
Websocket.publish(orderId + "/truck_location_track/realtime", R.failed(ExceptionMsg.SYS_ERR_MSG, "暂无实时数据"));
return;
}
errCount++;
if (!ThreadUtil.sleep(300)) {
log.info("任务被取消");
return;
} else {
startTime = LocalDateTime.now();
continue;
}
}
errCount = 0;
// 处理当前页数据,按时间间隔推送
for (TruckLocationTrackEntity record : records) {
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
return;
}
if (speed > 100) {
if (!ThreadUtil.sleep(speed)) {
log.info("任务被取消");
Websocket.publish(orderId + "/truck_location_track/realtime", R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"));
return;
}
}
startTime = record.getLocationTime();
boolean publish = Websocket.publish(orderId + "/truck_location_track/realtime", R.success(record));
if (!publish) {
return;
}
}
}
} catch (Exception e) {
// 其他异常
log.error("查询历史轨迹异常", e);
}
}, threadPoolExecutor);
/* CompletableFuture.runAsync(() -> { /* CompletableFuture.runAsync(() -> {
while (true) { while (true) {
@ -624,6 +676,175 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
}, threadPoolExecutor); */ }, threadPoolExecutor); */
}); });
/* Websocket.addListener("/truck_location_track/history", msg -> {
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
LocalDateTime startTime = param.getStartTime();
LocalDateTime endTime = param.getEndTime();
Long orderId = param.getOrderId();
Integer speed = param.getSpeed();
CompletableFuture.runAsync(() -> {
try {
int currentPage = 1;
// 分页查询每次500条
while (true) {
// 检查是否被取消
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
return;
}
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
page.addOrder(OrderItem.asc("location_time"));
// 分页查询当前页数据
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
.eq(TruckLocationTrackEntity::getOrderId, orderId)
.ge(startTime != null, TruckLocationTrackEntity::getLocationTime, startTime)
.le(endTime != null, TruckLocationTrackEntity::getLocationTime, endTime));
List<TruckLocationTrackEntity> records = resultPage.getRecords();
if (records.isEmpty()) {
return;
}
// 处理当前页数据,按时间间隔推送
for (TruckLocationTrackEntity record : records) {
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
return;
}
if (speed > 100) {
if (!ThreadUtil.sleep(speed)) {
log.info("任务被取消");
return;
}
}
String uid = msg.getUid();
String cid = msg.getCid();
if (!Websocket.isOnline(uid, cid)) {
return;
}
Websocket.send(uid, cid, new WsMsg()
.setEvent("historyData")
.setData(R.success(record))
);
}
// 判断是否为最后一页
if (currentPage >= resultPage.getPages()) {
return;
}
// 继续查询下一页
currentPage++;
}
} catch (Exception e) {
// 其他异常
log.error("查询历史轨迹异常", e);
}
}, threadPoolExecutor);
}); */
/* Websocket.addListener("/truck_location_track/realtime", msg -> {
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
Long orderId = param.getOrderId();
String uid = msg.getUid();
String cid = msg.getCid();
String gpsId = baseMapper.gpsId(orderId);
realtimeDataListenerLock.lock();
try {
realtimeDataListener.add(Tuple3.create(uid, cid, gpsId));
} finally {
realtimeDataListenerLock.unlock();
}
*//* CompletableFuture.runAsync(() -> {
while (true) {
ThreadUtil.sleep(3000);
realtimeDataListenerLock.lock();
try {
realtimeDataListener.removeIf(it -> !Websocket.online(it.get_0(), it.get_1()));
} finally {
realtimeDataListenerLock.unlock();
}
List<Tuple3<String, String, String>> listeners = realtimeDataListener;
if (listeners.isEmpty()) {
return;
}
for (Tuple3<String, String, String> tuple3 : listeners) {
if (!Websocket.online(uid, cid)) {
return;
}
Websocket.send(uid, cid, new WsMsg()
.setEvent("realtimeData")
.setData(R.success(new TruckLocationTrackEntity()))
);
}
}
}, threadPoolExecutor); *//*
*//* CompletableFuture.runAsync(() -> {
try {
int currentPage = 1;
// 分页查询每次500条
while (true) {
// 检查是否被取消
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
return;
}
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
page.addOrder(OrderItem.asc("location_time"));
LocalDateTime now = LocalDateTime.now();
// 分页查询当前页数据
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
.eq(TruckLocationTrackEntity::getOrderId, orderId)
.ge(TruckLocationTrackEntity::getLocationTime, now));
List<TruckLocationTrackEntity> records = resultPage.getRecords();
if (records.isEmpty()) {
continue;
}
// 处理当前页数据,按时间间隔推送
for (TruckLocationTrackEntity record : records) {
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
return;
}
if (!ThreadUtil.sleep(1000)) {
log.info("任务被取消");
return;
}
Websocket.send(msg.getUid(), msg.getCid(), new WsMsg()
.setEvent("historyData")
.setUid(msg.getUid())
.setCid(msg.getCid())
.setData(record)
);
}
// 判断是否为最后一页
if (currentPage >= resultPage.getPages()) {
continue;
}
// 继续查询下一页
currentPage++;
}
} catch (Exception e) {
// 其他异常
log.error("查询历史轨迹异常", e);
}
}, threadPoolExecutor); *//*
}); */
} }
} }

View File

@ -29,7 +29,6 @@ import java.util.List;
@RequiredArgsConstructor @RequiredArgsConstructor
public class OrderInfoController { public class OrderInfoController {
private final OrderInfoService orderInfoService; private final OrderInfoService orderInfoService;
// private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
/** /**
* *
@ -48,12 +47,6 @@ public class OrderInfoController {
} }
orderInfoService.batchAdd(addOrderInfoParam); orderInfoService.batchAdd(addOrderInfoParam);
/* CompletableFuture.runAsync(() -> {
}, threadPoolExecutor).whenComplete((v, e) -> {
if (e != null) {
log.error("新增订单失败", e);
}
}); */
return R.success(); return R.success();
} }