websocket 工具
parent
fb02103683
commit
66d846c80d
|
|
@ -26,19 +26,58 @@ public class LocationReportMessage {
|
||||||
* 3 0:东经 1:西经
|
* 3 0:东经 1:西经
|
||||||
*/
|
*/
|
||||||
private long status;
|
private long status;
|
||||||
|
/**
|
||||||
|
* 经度
|
||||||
|
*/
|
||||||
private double longitude;
|
private double longitude;
|
||||||
|
/**
|
||||||
|
* 纬度
|
||||||
|
*/
|
||||||
private double latitude;
|
private double latitude;
|
||||||
|
/**
|
||||||
|
* 海拔
|
||||||
|
*/
|
||||||
private int altitude;
|
private int altitude;
|
||||||
|
/**
|
||||||
|
* 速度
|
||||||
|
*/
|
||||||
private double speed;
|
private double speed;
|
||||||
|
/**
|
||||||
|
* 方向
|
||||||
|
*/
|
||||||
private int direction;
|
private int direction;
|
||||||
|
/**
|
||||||
|
* 时间
|
||||||
|
*/
|
||||||
private String time;
|
private String time;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否超速
|
||||||
|
*/
|
||||||
private boolean overspeed;
|
private boolean overspeed;
|
||||||
|
/**
|
||||||
|
* 是否主电源欠压
|
||||||
|
*/
|
||||||
private boolean powerUnderVoltage;
|
private boolean powerUnderVoltage;
|
||||||
|
/**
|
||||||
|
* 是否声控录音报警
|
||||||
|
*/
|
||||||
private boolean soundAlarm;
|
private boolean soundAlarm;
|
||||||
|
/**
|
||||||
|
* 是否 ACC 开
|
||||||
|
*/
|
||||||
private boolean accOn;
|
private boolean accOn;
|
||||||
|
/**
|
||||||
|
* 是否定位
|
||||||
|
*/
|
||||||
private boolean position;
|
private boolean position;
|
||||||
|
/**
|
||||||
|
* 是否南纬
|
||||||
|
*/
|
||||||
private boolean south;
|
private boolean south;
|
||||||
|
/**
|
||||||
|
* 是否西经
|
||||||
|
*/
|
||||||
private boolean west;
|
private boolean west;
|
||||||
|
|
||||||
public LocationReportMessage(ByteBuf body) {
|
public LocationReportMessage(ByteBuf body) {
|
||||||
|
|
|
||||||
|
|
@ -36,12 +36,13 @@ public class ImpartialWebSocketHandler extends AbstractWebSocketHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
|
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
|
||||||
String payload = message.getPayload();
|
Wsdoor wsdoor = Wsdoor.of(session);
|
||||||
WsMsg wsMsg = WsMsg.of(payload);
|
WsMsg wsMsg = WsMsg.of(wsdoor, message);
|
||||||
|
|
||||||
if (wsMsg == null
|
if (wsMsg == null
|
||||||
|| StrUtil.isBlank(wsMsg.getAction())
|
|| StrUtil.isBlank(wsMsg.getAction())
|
||||||
|| StrUtil.isBlank(wsMsg.getEvent())) {
|
|| StrUtil.isBlank(wsMsg.getEvent())) {
|
||||||
Wsdoor.of(session).send(new WsMsg().setEvent("error")
|
wsdoor.send(new WsMsg().setEvent("error")
|
||||||
.setData(R.failed(ExceptionMsg.CLI_ERR_MSG, "消息格式错误")));
|
.setData(R.failed(ExceptionMsg.CLI_ERR_MSG, "消息格式错误")));
|
||||||
session.close();
|
session.close();
|
||||||
return;
|
return;
|
||||||
|
|
@ -50,26 +51,29 @@ public class ImpartialWebSocketHandler extends AbstractWebSocketHandler {
|
||||||
|
|
||||||
if (!Websocket.ACTION_PUBLISH.equals(action)
|
if (!Websocket.ACTION_PUBLISH.equals(action)
|
||||||
&& !Websocket.ACTION_PING.equals(action)
|
&& !Websocket.ACTION_PING.equals(action)
|
||||||
|
&& !Websocket.ACTION_UNSUBSCRIBE.equals(action)
|
||||||
// && !Websocket.ACTION_PONG.equals(action)
|
// && !Websocket.ACTION_PONG.equals(action)
|
||||||
&& !Websocket.ACTION_SUBSCRIBE.equals(action)
|
&& !Websocket.ACTION_SUBSCRIBE.equals(action)
|
||||||
&& !Websocket.ACTION_UNSUBSCRIBE.equals(action)) {
|
) {
|
||||||
Wsdoor.of(session).send(new WsMsg().setEvent("error")
|
wsdoor.send(new WsMsg().setEvent("error")
|
||||||
.setData(R.failed(ExceptionMsg.CLI_ERR_MSG, "消息格式错误")));
|
.setData(R.failed(ExceptionMsg.CLI_ERR_MSG, "消息格式错误")));
|
||||||
session.close();
|
session.close();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
String event = wsMsg.getEvent();
|
String event = wsMsg.getEvent();
|
||||||
|
|
||||||
if (Websocket.ACTION_PUBLISH.equals(action)) {
|
if (Websocket.ACTION_PUBLISH.equals(action)) {
|
||||||
Object data = wsMsg.getData();
|
Websocket.publish(wsMsg);
|
||||||
Websocket.publish(event, data);
|
|
||||||
} else if (Websocket.ACTION_SUBSCRIBE.equals(action)) {
|
} else if (Websocket.ACTION_SUBSCRIBE.equals(action)) {
|
||||||
Websocket.subscribe(event, Wsdoor.of(session));
|
WsMsg.User from = wsMsg.getFrom();
|
||||||
} else if (Websocket.ACTION_PING.equals(action)) {
|
log.info("用户Id:{},客户端Id:{},订阅:{}", from.getUid(), from.getCid(), event);
|
||||||
Wsdoor.of(session).send(new WsMsg().setAction(Websocket.ACTION_PONG));
|
Websocket.subscribe(event, wsdoor);
|
||||||
|
} else if (Websocket.ACTION_UNSUBSCRIBE.equals(action)) {
|
||||||
|
WsMsg.User from = wsMsg.getFrom();
|
||||||
|
log.info("用户Id:{},客户端Id:{},取消订阅:{}", from.getUid(), from.getCid(), event);
|
||||||
|
Websocket.unsubscribe(event, wsdoor);
|
||||||
} else {
|
} else {
|
||||||
Websocket.unsubscribe(event, Wsdoor.of(session));
|
wsdoor.send(new WsMsg().setAction(Websocket.ACTION_PONG));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
package com.njzscloud.common.ws.support;
|
package com.njzscloud.common.ws.support;
|
||||||
|
|
||||||
import cn.hutool.core.util.IdUtil;
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.njzscloud.common.core.tuple.Tuple2;
|
||||||
|
import com.njzscloud.common.security.support.Token;
|
||||||
import com.njzscloud.common.security.support.UserDetail;
|
import com.njzscloud.common.security.support.UserDetail;
|
||||||
import com.njzscloud.common.security.util.SecurityUtil;
|
import com.njzscloud.common.security.util.SecurityUtil;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
@ -35,9 +36,10 @@ public class TokenHandshakeInterceptor implements HandshakeInterceptor {
|
||||||
String authorization = req.getParameter(Authorization);
|
String authorization = req.getParameter(Authorization);
|
||||||
if (StrUtil.isBlank(authorization)) return false;
|
if (StrUtil.isBlank(authorization)) return false;
|
||||||
|
|
||||||
Long uid = this.resolve(authorization);
|
Tuple2<Long, String> resolved = this.resolve(authorization);
|
||||||
if (uid == null) return false;
|
if (resolved == null) return false;
|
||||||
String cid = IdUtil.fastSimpleUUID();
|
Long uid = resolved.get_0();
|
||||||
|
String cid = resolved.get_1();
|
||||||
attributes.put(Constants.SESSION_ATTRIBUTE_TOKEN, authorization);
|
attributes.put(Constants.SESSION_ATTRIBUTE_TOKEN, authorization);
|
||||||
attributes.put(Constants.SESSION_ATTRIBUTE_CID, cid);
|
attributes.put(Constants.SESSION_ATTRIBUTE_CID, cid);
|
||||||
attributes.put(Constants.SESSION_ATTRIBUTE_UID, uid);
|
attributes.put(Constants.SESSION_ATTRIBUTE_UID, uid);
|
||||||
|
|
@ -62,9 +64,11 @@ public class TokenHandshakeInterceptor implements HandshakeInterceptor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Long resolve(String token) {
|
private Tuple2<Long, String> resolve(String tokenStr) {
|
||||||
UserDetail userDetail = SecurityUtil.parseToken(token);
|
UserDetail userDetail = SecurityUtil.parseToken(tokenStr);
|
||||||
if (userDetail == null) return null;
|
if (userDetail == null) return null;
|
||||||
return userDetail.getUserId();
|
Token token = userDetail.getToken();
|
||||||
|
if (token == null) return null;
|
||||||
|
return Tuple2.create(userDetail.getUserId(), token.getTid());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,14 @@ package com.njzscloud.common.ws.support;
|
||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* websocket 工具
|
* websocket 工具
|
||||||
|
|
@ -34,19 +37,35 @@ public class Websocket {
|
||||||
static final ReentrantReadWriteLock.ReadLock LISTENER_CACHE_READ_LOCK = LISTENER_CACHE_LOCK.readLock();
|
static final ReentrantReadWriteLock.ReadLock LISTENER_CACHE_READ_LOCK = LISTENER_CACHE_LOCK.readLock();
|
||||||
static final ReentrantReadWriteLock.WriteLock LISTENER_CACHE_WRITE_LOCK = LISTENER_CACHE_LOCK.writeLock();
|
static final ReentrantReadWriteLock.WriteLock LISTENER_CACHE_WRITE_LOCK = LISTENER_CACHE_LOCK.writeLock();
|
||||||
|
|
||||||
public static boolean publish(String event, Object data) {
|
public static boolean publish(WsMsg wsMsg) {
|
||||||
|
wsMsg.setAction(ACTION_PUBLISH);
|
||||||
// 总数,成功数
|
// 总数,成功数
|
||||||
Integer[] success = {0, 0};
|
Integer[] success = {0, 0};
|
||||||
|
String event = wsMsg.getEvent();
|
||||||
|
List<WsMsg.User> to = wsMsg.getTo();
|
||||||
|
Set<Long> uids;
|
||||||
|
Set<String> cids;
|
||||||
|
if (CollUtil.isNotEmpty(to)) {
|
||||||
|
uids = to.stream().map(WsMsg.User::getUid).collect(Collectors.toSet());
|
||||||
|
cids = to.stream().map(WsMsg.User::getCid).collect(Collectors.toSet());
|
||||||
|
} else {
|
||||||
|
uids = Collections.emptySet();
|
||||||
|
cids = Collections.emptySet();
|
||||||
|
}
|
||||||
EVENT_CACHE_READ_LOCK.lock();
|
EVENT_CACHE_READ_LOCK.lock();
|
||||||
try {
|
try {
|
||||||
EVENT_CACHE.computeIfPresent(event, (k, v) -> {
|
EVENT_CACHE.computeIfPresent(event, (k, v) -> {
|
||||||
if (CollUtil.isEmpty(v)) return v;
|
if (CollUtil.isEmpty(v)) return v;
|
||||||
success[1] = v.size();
|
List<Wsdoor> wsdoors = v;
|
||||||
WsMsg wsMsg = new WsMsg()
|
if (!uids.isEmpty() || !cids.isEmpty()) {
|
||||||
.setAction(ACTION_PUBLISH)
|
wsdoors = v.stream().filter(it -> {
|
||||||
.setEvent(event)
|
Long uid = it.getUid();
|
||||||
.setData(data);
|
String cid = it.getCid();
|
||||||
for (Wsdoor wsdoor : v) {
|
return cids.contains(cid) || uids.contains(uid);
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
success[1] = wsdoors.size();
|
||||||
|
for (Wsdoor wsdoor : wsdoors) {
|
||||||
boolean send = wsdoor.send(wsMsg);
|
boolean send = wsdoor.send(wsMsg);
|
||||||
if (send) success[1] = success[1] + 1;
|
if (send) success[1] = success[1] + 1;
|
||||||
}
|
}
|
||||||
|
|
@ -61,11 +80,6 @@ public class Websocket {
|
||||||
LISTENER_CACHE.computeIfPresent(event, (k, v) -> {
|
LISTENER_CACHE.computeIfPresent(event, (k, v) -> {
|
||||||
if (CollUtil.isEmpty(v)) return v;
|
if (CollUtil.isEmpty(v)) return v;
|
||||||
success[1] = success[1] + v.size();
|
success[1] = success[1] + v.size();
|
||||||
|
|
||||||
WsMsg wsMsg = new WsMsg()
|
|
||||||
.setAction(ACTION_PUBLISH)
|
|
||||||
.setEvent(event)
|
|
||||||
.setData(data);
|
|
||||||
for (Consumer<WsMsg> listener : v) {
|
for (Consumer<WsMsg> listener : v) {
|
||||||
try {
|
try {
|
||||||
listener.accept(wsMsg);
|
listener.accept(wsMsg);
|
||||||
|
|
@ -83,6 +97,13 @@ public class Websocket {
|
||||||
return success[1] > 0;
|
return success[1] > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean publish(String event, Object data) {
|
||||||
|
return publish(new WsMsg()
|
||||||
|
.setAction(ACTION_PUBLISH)
|
||||||
|
.setEvent(event)
|
||||||
|
.setData(data));
|
||||||
|
}
|
||||||
|
|
||||||
public static void subscribe(String event, Wsdoor wsdoor) {
|
public static void subscribe(String event, Wsdoor wsdoor) {
|
||||||
EVENT_CACHE_READ_LOCK.lock();
|
EVENT_CACHE_READ_LOCK.lock();
|
||||||
try {
|
try {
|
||||||
|
|
@ -151,147 +172,4 @@ public class Websocket {
|
||||||
LISTENER_CACHE_READ_LOCK.unlock();
|
LISTENER_CACHE_READ_LOCK.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
*//**
|
|
||||||
* 操作 websocket 连接<br/>
|
|
||||||
* readonly 为 true 时, 使用读锁, 否则使用写锁<br/>
|
|
||||||
*
|
|
||||||
* @param readonly 是否只读
|
|
||||||
* @param consumer 操作方法
|
|
||||||
*//*
|
|
||||||
static void optLink(boolean readonly, Consumer<Map<String, Map<String, Wsdoor>>> consumer) {
|
|
||||||
if (readonly) {
|
|
||||||
try {
|
|
||||||
Websocket.LINK_CACHE_READ_LOCK.lock();
|
|
||||||
consumer.accept(LINK_CACHE);
|
|
||||||
} finally {
|
|
||||||
Websocket.LINK_CACHE_READ_LOCK.unlock();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
Websocket.LINK_CACHE_WRITE_LOCK.lock();
|
|
||||||
consumer.accept(LINK_CACHE);
|
|
||||||
} finally {
|
|
||||||
Websocket.LINK_CACHE_WRITE_LOCK.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*//**
|
|
||||||
* 操作 websocket 连接<br/>
|
|
||||||
* readonly 为 true 时, 使用读锁, 否则使用写锁<br/>
|
|
||||||
*
|
|
||||||
* @param readonly 是否只读
|
|
||||||
* @param uid 用户 Id
|
|
||||||
* @param consumer 操作方法
|
|
||||||
*//*
|
|
||||||
static void optLink(boolean readonly, String uid, Consumer<Map<String, Wsdoor>> consumer) {
|
|
||||||
if (readonly) {
|
|
||||||
try {
|
|
||||||
Websocket.LINK_CACHE_READ_LOCK.lock();
|
|
||||||
Map<String, Wsdoor> map = LINK_CACHE.get(uid);
|
|
||||||
consumer.accept(map);
|
|
||||||
} finally {
|
|
||||||
Websocket.LINK_CACHE_READ_LOCK.unlock();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
Websocket.LINK_CACHE_WRITE_LOCK.lock();
|
|
||||||
Map<String, Wsdoor> map = LINK_CACHE.get(uid);
|
|
||||||
consumer.accept(map);
|
|
||||||
} finally {
|
|
||||||
Websocket.LINK_CACHE_WRITE_LOCK.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*//**
|
|
||||||
* 操作 websocket 连接<br/>
|
|
||||||
* readonly 为 true 时, 使用读锁, 否则使用写锁<br/>
|
|
||||||
*
|
|
||||||
* @param readonly 是否只读
|
|
||||||
* @param uid 用户 Id
|
|
||||||
* @param cid 客户端 Id
|
|
||||||
* @param consumer 操作方法
|
|
||||||
*//*
|
|
||||||
static void optLink(boolean readonly, String uid, String cid, Consumer<Wsdoor> consumer) {
|
|
||||||
if (readonly) {
|
|
||||||
try {
|
|
||||||
Websocket.LINK_CACHE_READ_LOCK.lock();
|
|
||||||
Map<String, Wsdoor> map = LINK_CACHE.get(uid);
|
|
||||||
Wsdoor user = null;
|
|
||||||
if (map != null) user = map.get(cid);
|
|
||||||
consumer.accept(user);
|
|
||||||
} finally {
|
|
||||||
Websocket.LINK_CACHE_READ_LOCK.unlock();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
Websocket.LINK_CACHE_WRITE_LOCK.lock();
|
|
||||||
Map<String, Wsdoor> map = LINK_CACHE.get(uid);
|
|
||||||
Wsdoor user = null;
|
|
||||||
if (map != null) user = map.get(cid);
|
|
||||||
consumer.accept(user);
|
|
||||||
} finally {
|
|
||||||
Websocket.LINK_CACHE_WRITE_LOCK.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void addListener(String event, Consumer<WsMsg> consumer) {
|
|
||||||
LISTENER_CACHE.computeIfAbsent(event, k -> new java.util.ArrayList<>()).add(consumer);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void removeListener(String event, Consumer<WsMsg> consumer) {
|
|
||||||
List<Consumer<WsMsg>> consumers = LISTENER_CACHE.get(event);
|
|
||||||
if (consumers != null) consumers.remove(consumer);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public static void dispatch(WsMsg wsMsg) {
|
|
||||||
if (wsMsg == null) return;
|
|
||||||
List<Consumer<WsMsg>> consumers = LISTENER_CACHE.get(wsMsg.getEvent());
|
|
||||||
if (CollUtil.isNotEmpty(consumers)) {
|
|
||||||
for (Consumer<WsMsg> consumer : consumers) {
|
|
||||||
try {
|
|
||||||
consumer.accept(wsMsg);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("WebSocket 消息处理失败, 事件: {}, 数据: {}", wsMsg.getEvent(), wsMsg, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*//**
|
|
||||||
* 发送文本数据
|
|
||||||
*
|
|
||||||
* @param uid 用户 Id
|
|
||||||
* @param cid 客户端 Id
|
|
||||||
* @param wsMsg 消息
|
|
||||||
*//*
|
|
||||||
public static void send(String uid, String cid, WsMsg wsMsg) {
|
|
||||||
optLink(true, uid, cid, it -> {
|
|
||||||
if (it != null) it.send(wsMsg);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
*//**
|
|
||||||
* 发送文本数据
|
|
||||||
*
|
|
||||||
* @param uid 用户 Id
|
|
||||||
* @param wsMsg 消息
|
|
||||||
*//*
|
|
||||||
public static void send(String uid, WsMsg wsMsg) {
|
|
||||||
optLink(true, uid, it -> {
|
|
||||||
if (it == null) return;
|
|
||||||
|
|
||||||
Collection<Wsdoor> wsdoors = it.values();
|
|
||||||
|
|
||||||
if (wsdoors.isEmpty()) return;
|
|
||||||
|
|
||||||
for (Wsdoor wsdoor : wsdoors) wsdoor.send(wsMsg);
|
|
||||||
|
|
||||||
});
|
|
||||||
} */
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,12 @@ import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import com.njzscloud.common.core.jackson.Jackson;
|
import com.njzscloud.common.core.jackson.Jackson;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
|
import lombok.ToString;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.web.socket.TextMessage;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* websocket 消息内容
|
* websocket 消息内容
|
||||||
|
|
@ -16,6 +20,17 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
@Setter
|
@Setter
|
||||||
@Accessors(chain = true)
|
@Accessors(chain = true)
|
||||||
public class WsMsg {
|
public class WsMsg {
|
||||||
|
/**
|
||||||
|
* 发送方
|
||||||
|
*/
|
||||||
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
|
private User from;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 接收方
|
||||||
|
*/
|
||||||
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
|
private List<User> to;
|
||||||
|
|
||||||
@JsonInclude(JsonInclude.Include.NON_EMPTY)
|
@JsonInclude(JsonInclude.Include.NON_EMPTY)
|
||||||
private String action;
|
private String action;
|
||||||
|
|
@ -32,11 +47,19 @@ public class WsMsg {
|
||||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
private Object data;
|
private Object data;
|
||||||
|
|
||||||
public static WsMsg of(String json) {
|
public static WsMsg of(Wsdoor wsdoor, TextMessage message) {
|
||||||
|
String payload = message.getPayload();
|
||||||
try {
|
try {
|
||||||
return Jackson.toBean(json, WsMsg.class);
|
WsMsg wsMsg = Jackson.toBean(payload, WsMsg.class);
|
||||||
|
if (wsMsg != null) {
|
||||||
|
wsMsg.setFrom(new User()
|
||||||
|
.setUid(wsdoor.getUid())
|
||||||
|
.setCid(wsdoor.getCid())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return wsMsg;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("WebSocket 消息反序列化失败, 数据: {}", json, e);
|
log.error("WebSocket 消息反序列化失败, 数据: {}", payload, e);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -49,4 +72,21 @@ public class WsMsg {
|
||||||
public <T> T extractData(Class<T> clazz) {
|
public <T> T extractData(Class<T> clazz) {
|
||||||
return Jackson.toBean(Jackson.toJsonStr(data), clazz);
|
return Jackson.toBean(Jackson.toJsonStr(data), clazz);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
@ToString
|
||||||
|
@Accessors(chain = true)
|
||||||
|
public static class User {
|
||||||
|
/**
|
||||||
|
* 用户 Id
|
||||||
|
*/
|
||||||
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
|
private Long uid;
|
||||||
|
/**
|
||||||
|
* 客户端 Id
|
||||||
|
*/
|
||||||
|
@JsonInclude(JsonInclude.Include.NON_EMPTY)
|
||||||
|
private String cid;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ import com.njzscloud.common.mp.support.PageParam;
|
||||||
import com.njzscloud.common.mp.support.PageResult;
|
import com.njzscloud.common.mp.support.PageResult;
|
||||||
import com.njzscloud.common.mqtt.support.MqttMsg;
|
import com.njzscloud.common.mqtt.support.MqttMsg;
|
||||||
import com.njzscloud.common.ws.support.Websocket;
|
import com.njzscloud.common.ws.support.Websocket;
|
||||||
|
import com.njzscloud.common.ws.support.WsMsg;
|
||||||
import com.njzscloud.supervisory.biz.mapper.TruckLocationTrackMapper;
|
import com.njzscloud.supervisory.biz.mapper.TruckLocationTrackMapper;
|
||||||
import com.njzscloud.supervisory.biz.pojo.entity.TruckLocationTrackEntity;
|
import com.njzscloud.supervisory.biz.pojo.entity.TruckLocationTrackEntity;
|
||||||
import com.njzscloud.supervisory.biz.pojo.param.LocationTrackHistoryParam;
|
import com.njzscloud.supervisory.biz.pojo.param.LocationTrackHistoryParam;
|
||||||
|
|
@ -29,6 +30,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -455,7 +457,9 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onApplicationReady() {
|
public void onApplicationReady() {
|
||||||
Websocket.subscribe("get/truck_location_track/history", msg -> {
|
Websocket.subscribe("up/truck_location_track/history", msg -> {
|
||||||
|
WsMsg.User from = msg.getFrom();
|
||||||
|
String cid = from.getCid();
|
||||||
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
|
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
|
||||||
LocalDateTime startTime = param.getStartTime();
|
LocalDateTime startTime = param.getStartTime();
|
||||||
LocalDateTime endTime = param.getEndTime();
|
LocalDateTime endTime = param.getEndTime();
|
||||||
|
|
@ -483,7 +487,11 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
|
||||||
|
|
||||||
List<TruckLocationTrackEntity> records = resultPage.getRecords();
|
List<TruckLocationTrackEntity> records = resultPage.getRecords();
|
||||||
if (records.isEmpty()) {
|
if (records.isEmpty()) {
|
||||||
Websocket.publish(orderId + "/truck_location_track/history", R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"));
|
Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/history")
|
||||||
|
.setData(R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -496,11 +504,19 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
|
||||||
if (speed > 100) {
|
if (speed > 100) {
|
||||||
if (!ThreadUtil.sleep(speed)) {
|
if (!ThreadUtil.sleep(speed)) {
|
||||||
log.info("任务被取消");
|
log.info("任务被取消");
|
||||||
Websocket.publish(orderId + "/truck_location_track/history", R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"));
|
Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/history")
|
||||||
|
.setData(R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
boolean publish = Websocket.publish(orderId + "/truck_location_track/history", R.success(record));
|
boolean publish = Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/history")
|
||||||
|
.setData(R.success(record))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
if (!publish) {
|
if (!publish) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -508,7 +524,11 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
|
||||||
|
|
||||||
// 判断是否为最后一页
|
// 判断是否为最后一页
|
||||||
if (currentPage >= resultPage.getPages()) {
|
if (currentPage >= resultPage.getPages()) {
|
||||||
Websocket.publish(orderId + "/truck_location_track/history", R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"));
|
Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/history")
|
||||||
|
.setData(R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -523,7 +543,9 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
|
||||||
}, threadPoolExecutor);
|
}, threadPoolExecutor);
|
||||||
});
|
});
|
||||||
|
|
||||||
Websocket.subscribe("get/truck_location_track/realtime", msg -> {
|
Websocket.subscribe("up/truck_location_track/realtime", msg -> {
|
||||||
|
WsMsg.User from = msg.getFrom();
|
||||||
|
String cid = from.getCid();
|
||||||
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
|
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
|
||||||
Long orderId = param.getOrderId();
|
Long orderId = param.getOrderId();
|
||||||
Integer speed = param.getSpeed();
|
Integer speed = param.getSpeed();
|
||||||
|
|
@ -550,7 +572,11 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
|
||||||
if (records.isEmpty()) {
|
if (records.isEmpty()) {
|
||||||
if (errCount >= 20) {
|
if (errCount >= 20) {
|
||||||
log.error("查询历史轨迹异常, 订单ID: {}, 时间: {}", orderId, startTime);
|
log.error("查询历史轨迹异常, 订单ID: {}, 时间: {}", orderId, startTime);
|
||||||
Websocket.publish(orderId + "/truck_location_track/realtime", R.failed(ExceptionMsg.SYS_ERR_MSG, "暂无实时数据"));
|
Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/realtime")
|
||||||
|
.setData(R.failed(ExceptionMsg.SYS_ERR_MSG, "暂无实时数据"))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
errCount++;
|
errCount++;
|
||||||
|
|
@ -573,12 +599,20 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
|
||||||
if (speed > 100) {
|
if (speed > 100) {
|
||||||
if (!ThreadUtil.sleep(speed)) {
|
if (!ThreadUtil.sleep(speed)) {
|
||||||
log.info("任务被取消");
|
log.info("任务被取消");
|
||||||
Websocket.publish(orderId + "/truck_location_track/realtime", R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"));
|
Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/realtime")
|
||||||
|
.setData(R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
startTime = record.getLocationTime();
|
startTime = record.getLocationTime();
|
||||||
boolean publish = Websocket.publish(orderId + "/truck_location_track/realtime", R.success(record));
|
boolean publish = Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/realtime")
|
||||||
|
.setData(R.success(record))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
if (!publish) {
|
if (!publish) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue