localizer
lzq 2025-11-11 18:54:34 +08:00
parent 53176657d1
commit dc4b0143e0
14 changed files with 184 additions and 772 deletions

View File

@ -6,6 +6,7 @@ import com.njzscloud.common.mqtt.util.Mqtt;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -21,7 +22,7 @@ public class DeviceStore {
Mqtt.publish("localizer/loader");
}
public static void setLocalizers(LinkedList<Localizer> list) {
public static void setLocalizers(List<Localizer> list) {
localizersWriteLock.lock();
try {
log.info("重新加载设备:{}", list.size());

View File

@ -18,7 +18,12 @@ public class Localizer {
@Getter
@Setter
protected String terminalId;
/**
*
*/
@Getter
@Setter
private String licensePlate;
/**
*
*/

View File

@ -3,6 +3,7 @@ package com.njzscloud.common.localizer.jt808.support;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.contant.LocalizerType;
@ -10,11 +11,13 @@ import com.njzscloud.common.localizer.contant.MessageType;
import com.njzscloud.common.localizer.jt808.JT808;
import com.njzscloud.common.localizer.jt808.message.*;
import com.njzscloud.common.localizer.mqtt.result.RealtimeLocationResult;
import com.njzscloud.common.localizer.service.LocalizerService;
import com.njzscloud.common.mqtt.util.Mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.function.Consumer;
@Slf4j
@ -105,9 +108,12 @@ public abstract class JT808MessageListener {
LocationReportMessage locationReportMsg = new LocationReportMessage(byteBuf);
double speed = locationReportMsg.getSpeed();
int speedThreshold = localizer.getSpeedThreshold();
String licensePlate = localizer.getLicensePlate();
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setLicensePlate(licensePlate)
.setTerminalId(terminalId)
.setOverspeed(speed > speedThreshold);
saveData(realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
} finally {
@ -137,10 +143,13 @@ public abstract class JT808MessageListener {
LocationReportMessage locationReportMsg = new LocationReportMessage(byteBuf.slice(byteBuf.readerIndex(), length));
byteBuf.skipBytes(length);
double speed = locationReportMsg.getSpeed();
String licensePlate = localizer.getLicensePlate();
RealtimeLocationResult realtimeLocationResult = BeanUtil.copyProperties(locationReportMsg, RealtimeLocationResult.class)
.setLicensePlate(licensePlate)
.setTerminalId(terminalId)
.setOverspeed(speed > speedThreshold)
.setType(type);
saveData(realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location", realtimeLocationResult);
Mqtt.publish(terminalId + "/track_location_real", realtimeLocationResult);
}
@ -149,6 +158,17 @@ public abstract class JT808MessageListener {
}
}
protected void saveData(RealtimeLocationResult realtimeLocationResult) {
try {
LocalizerService localizerService = SpringUtil.getBean(LocalizerService.class);
if (localizerService != null) {
localizerService.save(Collections.singletonList(realtimeLocationResult));
}
} catch (Exception e) {
log.error("数据保存失败", e);
}
}
/**
*
*/

View File

@ -11,6 +11,11 @@ import lombok.experimental.Accessors;
@Accessors(chain = true)
public class RealtimeLocationResult {
private String terminalId;
/**
*
*/
private String licensePlate;
private int type;
/**

View File

@ -3,11 +3,8 @@ package com.njzscloud.common.localizer.service;
import com.njzscloud.common.localizer.mqtt.result.RealtimeLocationResult;
import java.time.LocalDateTime;
import java.util.List;
public interface LocalizerService {
void save(List<RealtimeLocationResult> data);
List<RealtimeLocationResult> search(LocalDateTime start, LocalDateTime end);
}

View File

@ -25,6 +25,10 @@
<groupId>com.njzscloud</groupId>
<artifactId>njzscloud-common-mvc</artifactId>
</dependency>
<dependency>
<groupId>com.njzscloud</groupId>
<artifactId>njzscloud-common-ws</artifactId>
</dependency>
<dependency>
<groupId>com.njzscloud</groupId>
<artifactId>njzscloud-common-mp</artifactId>

View File

@ -0,0 +1,9 @@
package com.njzscloud.localizer.device.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njzscloud.localizer.device.pojo.entity.DeviceEntity;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface DeviceMapper extends BaseMapper<DeviceEntity> {
}

View File

@ -0,0 +1,32 @@
package com.njzscloud.localizer.device.pojo.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.njzscloud.common.localizer.contant.LocalizerType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
@Getter
@Setter
@ToString
@Accessors(chain = true)
@TableName("localizer_device")
public class DeviceEntity {
/**
* Id
*/
protected String terminalId;
/**
*
*/
protected LocalizerType localizerType;
/**
*
*/
private String licensePlate;
/**
*
*/
private Integer speedThreshold;
}

View File

@ -0,0 +1,11 @@
package com.njzscloud.localizer.device.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njzscloud.localizer.device.mapper.DeviceMapper;
import com.njzscloud.localizer.device.pojo.entity.DeviceEntity;
import org.springframework.stereotype.Service;
@Service
public class DeviceService extends ServiceImpl<DeviceMapper, DeviceEntity> implements IService<DeviceEntity> {
}

View File

@ -1,17 +1,11 @@
package com.njzscloud.localizer.track.controller;
import com.njzscloud.common.core.utils.R;
import com.njzscloud.common.mp.support.PageParam;
import com.njzscloud.common.mp.support.PageResult;
import com.njzscloud.localizer.track.pojo.entity.TruckLocationTrackEntity;
import com.njzscloud.localizer.track.service.TruckLocationTrackService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.time.LocalDateTime;
import java.util.List;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*
@ -25,73 +19,4 @@ public class TruckLocationTrackController {
private final TruckLocationTrackService truckLocationTrackService;
/**
*
*/
@PostMapping("/add")
public R<?> add(@RequestBody TruckLocationTrackEntity truckLocationTrackEntity) {
truckLocationTrackService.add(truckLocationTrackEntity);
return R.success();
}
/**
*
*/
@PostMapping("/modify")
public R<?> modify(@RequestBody TruckLocationTrackEntity truckLocationTrackEntity) {
truckLocationTrackService.modify(truckLocationTrackEntity);
return R.success();
}
/**
*
*/
@PostMapping("/del")
public R<?> del(@RequestBody List<Long> ids) {
truckLocationTrackService.del(ids);
return R.success();
}
/**
*
*/
@GetMapping("/detail")
public R<TruckLocationTrackEntity> detail(@RequestParam Long id) {
return R.success(truckLocationTrackService.detail(id));
}
/**
*
*/
@GetMapping("/paging")
public R<PageResult<TruckLocationTrackEntity>> paging(PageParam pageParam, TruckLocationTrackEntity truckLocationTrackEntity) {
return R.success(truckLocationTrackService.paging(pageParam, truckLocationTrackEntity));
}
/**
*
*/
@GetMapping("/history")
public SseEmitter history(@RequestParam("orderId") Long orderId,
@RequestParam(value = "startTime", required = false) LocalDateTime startTime,
@RequestParam(value = "endTime", required = false) LocalDateTime endTime,
@RequestParam(value = "speed", defaultValue = "1") Integer speed
) {
if (speed <= 0) {
speed = 1;
} else if (speed > 60) {
speed = 60;
}
return truckLocationTrackService.history(orderId, startTime, endTime, speed);
}
/**
*
*/
@GetMapping("/realtime")
public SseEmitter realtime(@RequestParam("orderId") Long orderId) {
return truckLocationTrackService.realtime(orderId);
}
}

View File

@ -3,19 +3,10 @@ package com.njzscloud.localizer.track.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njzscloud.localizer.track.pojo.entity.TruckLocationTrackEntity;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
/**
*
*/
@Mapper
public interface TruckLocationTrackMapper extends BaseMapper<TruckLocationTrackEntity> {
@Select("SELECT b.gps\n" +
"FROM order_info a\n" +
" INNER JOIN biz_truck b ON b.id = a.truck_id AND b.deleted = 0\n" +
"WHERE a.id = #{orderId}\n" +
" AND a.deleted = 0")
String gpsId(@Param("orderId") Long orderId);
}

View File

@ -23,16 +23,11 @@ public class TruckLocationTrackEntity {
*/
@TableId(type = IdType.ASSIGN_ID)
private Long id;
/**
* Id
*/
private Long orderId;
/**
* Id
*
*/
private Long truckId;
private String licensePlate;
/**
*
*/

View File

@ -0,0 +1,27 @@
package com.njzscloud.localizer.track.pojo.param;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import java.time.LocalDateTime;
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class LocationTrackSearchParam {
private String terminalId;
/**
*
*/
private String licensePlate;
private LocalDateTime startTime;
private LocalDateTime endTime;
private Integer speed;
public Integer getSpeed() {
return speed == null || speed <= 0 ? 1000 : speed;
}
}

View File

@ -1,8 +1,8 @@
package com.njzscloud.localizer.track.service;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.metadata.OrderItem;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
@ -12,37 +12,41 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njzscloud.common.core.ex.ExceptionMsg;
import com.njzscloud.common.core.thread.ThreadPool;
import com.njzscloud.common.core.tuple.Tuple2;
import com.njzscloud.common.core.tuple.Tuple3;
import com.njzscloud.common.core.utils.CoordinateConverter;
import com.njzscloud.common.core.utils.R;
import com.njzscloud.common.localizer.DeviceStore;
import com.njzscloud.common.localizer.Localizer;
import com.njzscloud.common.localizer.mqtt.result.RealtimeLocationResult;
import com.njzscloud.common.mp.support.PageParam;
import com.njzscloud.common.mp.support.PageResult;
import com.njzscloud.common.mqtt.support.MqttMsg;
import com.njzscloud.common.localizer.service.LocalizerService;
import com.njzscloud.common.ws.support.Websocket;
import com.njzscloud.common.ws.support.WsMsg;
import com.njzscloud.localizer.device.pojo.entity.DeviceEntity;
import com.njzscloud.localizer.device.service.DeviceService;
import com.njzscloud.localizer.track.mapper.TruckLocationTrackMapper;
import com.njzscloud.localizer.track.pojo.entity.TruckLocationTrackEntity;
import com.njzscloud.localizer.track.pojo.param.LocationTrackSearchParam;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
*
*/
@Slf4j
@Service
public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMapper, TruckLocationTrackEntity> implements IService<TruckLocationTrackEntity> {
@RequiredArgsConstructor
public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMapper, TruckLocationTrackEntity> implements LocalizerService, IService<TruckLocationTrackEntity> {
private final DeviceService deviceService;
ThreadPoolExecutor threadPoolExecutor = ThreadPool.createThreadPool(
"GPS 数据",
10,
@ -52,417 +56,20 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
1,
new ThreadPoolExecutor.CallerRunsPolicy()
);
HashMap<String, List<SseEmitter>> emitters = new HashMap<>();
ReentrantLock lock = new ReentrantLock();
List<Tuple3<String, String, String>> realtimeDataListener = new LinkedList<>();
ReentrantLock realtimeDataListenerLock = new ReentrantLock();
public static void send(SseEmitter emitter, String eventName, TruckLocationTrackEntity data) {
try {
emitter.send(SseEmitter.event()
.id(data.getId().toString())
.name(eventName)
.data(data));
} catch (IOException e) {
log.error("发送定位器历史记录失败", e);
}
}
public static void complete(SseEmitter emitter, String eventName) {
try {
emitter.send(SseEmitter.event()
.id(IdUtil.simpleUUID())
.name(eventName)
.data("complete"));
emitter.complete();
} catch (IOException e) {
log.error("发送定位器历史记录失败", e);
}
}
/**
*
*/
public void add(TruckLocationTrackEntity truckLocationTrackEntity) {
this.save(truckLocationTrackEntity);
}
/**
*
*/
public void modify(TruckLocationTrackEntity truckLocationTrackEntity) {
this.updateById(truckLocationTrackEntity);
}
/**
*
*/
@Transactional(rollbackFor = Exception.class)
public void del(List<Long> ids) {
this.removeBatchByIds(ids);
}
/**
*
*/
public TruckLocationTrackEntity detail(Long id) {
return this.getById(id);
}
/**
*
*/
public PageResult<TruckLocationTrackEntity> paging(PageParam pageParam, TruckLocationTrackEntity truckLocationTrackEntity) {
return PageResult.of(this.page(pageParam.toPage(), Wrappers.<TruckLocationTrackEntity>query(truckLocationTrackEntity)));
}
/* public SseEmitter history(Long orderId, LocalDateTime startTime, LocalDateTime endTime, Integer speed) {
SseEmitter emitter = new SseEmitter(0L);
// 提交异步任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
// 记录上一条数据的时间,用于计算间隔
LocalDateTime lastRecordTime = null;
int currentPage = 1;
// 分页查询每次500条
while (true) {
// 检查是否被取消
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
complete(emitter);
return;
}
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
page.addOrder(OrderItem.asc("location_time"));
// 分页查询当前页数据
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
.eq(TruckLocationTrackEntity::getOrderId, orderId)
.between(TruckLocationTrackEntity::getLocationTime, startTime, endTime));
List<TruckLocationTrackEntity> records = resultPage.getRecords();
if (records.isEmpty()) {
// 没有更多数据,推送完成信息
complete(emitter);
break;
}
// 处理当前页数据,按时间间隔推送
for (TruckLocationTrackEntity record : records) {
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
complete(emitter);
return;
}
LocalDateTime currentTime = record.getLocationTime();
// 计算与上一条数据的时间间隔
if (lastRecordTime != null && currentTime != null) {
long interval = Duration.between(lastRecordTime, currentTime).toMillis();
if (interval > 0) {
// 按实际间隔等待
ThreadUtil.sleep(interval);
}
}
// 推送当前数据
send(emitter, "historyData", record);
// 更新上一条数据的时间
lastRecordTime = currentTime;
}
// 判断是否为最后一页
if (currentPage >= resultPage.getPages()) {
complete(emitter);
break;
}
// 继续查询下一页
currentPage++;
}
// 所有数据推送完成,关闭连接
complete(emitter);
} catch (Exception e) {
// 其他异常
emitter.completeWithError(e);
}
}, threadPoolExecutor);
// 注册连接关闭回调,中断任务
emitter.onCompletion(() -> {
log.info("连接关闭,取消任务");
future.cancel(true);
});
emitter.onTimeout(() -> {
log.info("连接超时,取消任务");
future.cancel(true);
});
emitter.onError((e) -> {
log.info("连接错误,取消任务");
future.cancel(true);
});
return emitter;
} */
public SseEmitter history(Long orderId, LocalDateTime startTime, LocalDateTime endTime, Integer speed) {
SseEmitter emitter = new SseEmitter(0L);
// 提交异步任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
int currentPage = 1;
// 分页查询每次500条
while (true) {
// 检查是否被取消
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
complete(emitter, "historyData");
return;
}
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
page.addOrder(OrderItem.asc("location_time"));
// 分页查询当前页数据
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
.eq(TruckLocationTrackEntity::getOrderId, orderId)
.ge(startTime != null, TruckLocationTrackEntity::getLocationTime, startTime)
.le(endTime != null, TruckLocationTrackEntity::getLocationTime, endTime));
List<TruckLocationTrackEntity> records = resultPage.getRecords();
if (records.isEmpty()) {
// 没有更多数据,推送完成信息
complete(emitter, "historyData");
return;
}
// 处理当前页数据,按时间间隔推送
for (TruckLocationTrackEntity record : records) {
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
complete(emitter, "historyData");
return;
}
if (!ThreadUtil.sleep(speed * 1000)) {
log.info("任务被取消");
complete(emitter, "historyData");
return;
}
send(emitter, "historyData", record);
}
// 判断是否为最后一页
if (currentPage >= resultPage.getPages()) {
complete(emitter, "historyData");
return;
}
// 继续查询下一页
currentPage++;
}
} catch (Exception e) {
// 其他异常
emitter.completeWithError(e);
}
}, threadPoolExecutor);
// 注册连接关闭回调,中断任务
emitter.onCompletion(() -> {
log.info("连接关闭,取消任务");
future.cancel(true);
});
emitter.onTimeout(() -> {
log.info("连接超时,取消任务");
future.cancel(true);
});
emitter.onError((e) -> {
log.info("连接错误,取消任务");
future.cancel(true);
});
return emitter;
}
/*
public void cache(TruckLocationTrackEntity truckLocationTrackEntity) {
Long orderId = truckLocationTrackEntity.getOrderId();
lock.lock();
try {
List<TruckLocationTrackEntity> value = LOCATION_CACHE.computeIfAbsent(orderId, k -> new LinkedList<>());
if (value.size() >= 10) {
value.remove(0);
}
value.add(truckLocationTrackEntity);
} finally {
lock.unlock();
}
}
*/
public SseEmitter realtime(Long orderId) {
String gpsId = baseMapper.gpsId(orderId);
SseEmitter emitter = new SseEmitter(0L);
lock.lock();
List<SseEmitter> value = emitters.computeIfAbsent(gpsId, k -> new LinkedList<>());
value.add(emitter);
lock.unlock();
/* // 提交异步任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
// 记录上一条数据的时间,用于计算间隔
LocalDateTime lastRecordTime = null;
while (true) {
// 检查是否被取消
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
complete(emitter, "realtimeData");
return;
}
lock.lock();
List<TruckLocationTrackEntity> records = LOCATION_CACHE.get(orderId);
LOCATION_CACHE.remove(orderId);
lock.unlock();
if (CollUtil.isEmpty(records)) {
if (ThreadUtil.sleep(1000)) {
log.info("任务被取消");
complete(emitter, "realtimeData");
return;
}
continue;
}
// 处理当前页数据,按时间间隔推送
for (TruckLocationTrackEntity record : records) {
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
complete(emitter, "realtimeData");
return;
}
LocalDateTime currentTime = record.getLocationTime();
// 计算与上一条数据的时间间隔
if (lastRecordTime != null && currentTime != null) {
long interval = Duration.between(lastRecordTime, currentTime).toMillis();
if (interval > 0) {
// 按实际间隔等待
ThreadUtil.sleep(interval);
}
}
// 推送当前数据
send(emitter, "realtimeData", record);
// 更新上一条数据的时间
lastRecordTime = currentTime;
}
}
} catch (Exception e) {
// 其他异常
emitter.completeWithError(e);
}
}, threadPoolExecutor); */
// 注册连接关闭回调,中断任务
// 注册连接关闭回调,中断任务
emitter.onCompletion(() -> {
log.info("连接关闭,取消任务");
lock.lock();
List<SseEmitter> emitterList = emitters.get(gpsId);
emitterList.remove(emitter);
lock.unlock();
});
emitter.onTimeout(() -> {
log.info("连接超时,取消任务");
lock.lock();
List<SseEmitter> emitterList = emitters.get(gpsId);
emitterList.remove(emitter);
lock.unlock();
});
emitter.onError((e) -> {
log.info("连接错误,取消任务");
lock.lock();
List<SseEmitter> emitterList = emitters.get(gpsId);
emitterList.remove(emitter);
lock.unlock();
});
return emitter;
}
public void sendRealTimeData(MqttMsg msg) {
RealtimeLocationResult realtimeLocationResult = msg.getMsg(RealtimeLocationResult.class);
if (realtimeLocationResult != null) {
String gpsId = realtimeLocationResult.getTerminalId();
lock.lock();
List<SseEmitter> emitterList = emitters.get(gpsId);
lock.unlock();
if (CollUtil.isEmpty(emitterList)) {
// log.info("未找到GPS {} 的实时数据订阅者", gpsId);
return;
}
for (SseEmitter emitter : emitterList) {
send(emitter, "realtimeData", new TruckLocationTrackEntity()
.setId(IdUtil.getSnowflakeNextId())
.setTerminalId(gpsId)
.setLatitude(realtimeLocationResult.getLatitude())
.setLongitude(realtimeLocationResult.getLongitude())
.setAltitude(realtimeLocationResult.getAltitude())
.setSpeed(realtimeLocationResult.getSpeed())
.setLocationTime(LocalDateTime.parse(realtimeLocationResult.getTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))
.setDirection(realtimeLocationResult.getDirection())
.setOverspeed(realtimeLocationResult.isOverspeed())
.setCompensate(realtimeLocationResult.getType() == 1));
}
/* List<Tuple3<String, String, String>> listeners;
realtimeDataListenerLock.lock();
try {
realtimeDataListener.removeIf(it -> !Websocket.isOnline(it.get_0(), it.get_1()));
listeners = realtimeDataListener.stream()
.filter(tuple3 -> tuple3.get_2().equals(gpsId))
.map(tuple3 -> Tuple3.create(tuple3.get_0(), tuple3.get_1(), tuple3.get_2()))
.collect(Collectors.toList());
} finally {
realtimeDataListenerLock.unlock();
}
for (Tuple3<String, String, String> tuple3 : listeners) {
String uid = tuple3.get_0();
String cid = tuple3.get_1();
Websocket.send(uid, cid, new WsMsg()
.setEvent("realtimeData")
.setUid(uid)
.setCid(cid)
.setData(new TruckLocationTrackEntity()
.setId(IdUtil.getSnowflakeNextId())
.setTerminalId(gpsId)
.setLatitude(realtimeLocationResult.getLatitude())
.setLongitude(realtimeLocationResult.getLongitude())
.setAltitude(realtimeLocationResult.getAltitude())
.setSpeed(realtimeLocationResult.getSpeed())
.setLocationTime(LocalDateTime.parse(realtimeLocationResult.getTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))
.setDirection(realtimeLocationResult.getDirection())
.setOverspeed(realtimeLocationResult.isOverspeed())
.setCompensate(realtimeLocationResult.getType() == 1))
);
} */
}
}
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
List<DeviceEntity> list = deviceService.list();
List<Localizer> localizers = list.stream().map(it -> BeanUtil.copyProperties(it, Localizer.class)).collect(Collectors.toList());
DeviceStore.setLocalizers(localizers);
Websocket.subscribe("up/truck_location_track/history", msg -> {
WsMsg.User from = msg.getFrom();
String cid = from.getCid();
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
LocationTrackSearchParam param = msg.extractData(LocationTrackSearchParam.class);
// String terminalId = param.getTerminalId();
String licensePlate = param.getLicensePlate();
LocalDateTime startTime = param.getStartTime();
LocalDateTime endTime = param.getEndTime();
Long orderId = param.getOrderId();
Integer speed = param.getSpeed();
CompletableFuture.runAsync(() -> {
try {
@ -480,7 +87,7 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
// 分页查询当前页数据
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
.eq(TruckLocationTrackEntity::getOrderId, orderId)
.eq(TruckLocationTrackEntity::getLicensePlate, licensePlate)
.ge(startTime != null, TruckLocationTrackEntity::getLocationTime, startTime)
.le(endTime != null, TruckLocationTrackEntity::getLocationTime, endTime));
@ -548,8 +155,9 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
Websocket.subscribe("up/truck_location_track/realtime", msg -> {
WsMsg.User from = msg.getFrom();
String cid = from.getCid();
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
Long orderId = param.getOrderId();
LocationTrackSearchParam param = msg.extractData(LocationTrackSearchParam.class);
// String terminalId = param.getTerminalId();
String licensePlate = param.getLicensePlate();
Integer speed = param.getSpeed();
CompletableFuture.runAsync(() -> {
try {
@ -567,13 +175,13 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
// 分页查询当前页数据
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
.eq(TruckLocationTrackEntity::getOrderId, orderId)
.eq(TruckLocationTrackEntity::getLicensePlate, licensePlate)
.gt(TruckLocationTrackEntity::getLocationTime, startTime));
List<TruckLocationTrackEntity> records = resultPage.getRecords();
if (records.isEmpty()) {
if (errCount >= 200) {
log.error("暂无实时数据, 订单ID: {}, 时间: {}", orderId, startTime);
log.error("暂无实时数据, 车牌号: {}, 时间: {}", licensePlate, startTime);
Websocket.publish(new WsMsg()
.setEvent("down/truck_location_track/realtime")
.setData(R.failed(ExceptionMsg.SYS_ERR_MSG, "暂无实时数据"))
@ -628,262 +236,44 @@ public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMap
log.error("查询历史轨迹异常", e);
}
}, threadPoolExecutor);
/* CompletableFuture.runAsync(() -> {
while (true) {
ThreadUtil.sleep(3000);
realtimeDataListenerLock.lock();
try {
realtimeDataListener.removeIf(it -> !Websocket.online(it.get_0(), it.get_1()));
} finally {
realtimeDataListenerLock.unlock();
}
List<Tuple3<String, String, String>> listeners = realtimeDataListener;
if (listeners.isEmpty()) {
return;
}
for (Tuple3<String, String, String> tuple3 : listeners) {
if (!Websocket.online(uid, cid)) {
return;
}
Websocket.send(uid, cid, new WsMsg()
.setEvent("realtimeData")
.setData(R.success(new TruckLocationTrackEntity()))
);
}
}
}, threadPoolExecutor); */
/* CompletableFuture.runAsync(() -> {
try {
int currentPage = 1;
// 分页查询每次500条
while (true) {
// 检查是否被取消
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
return;
}
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
page.addOrder(OrderItem.asc("location_time"));
LocalDateTime now = LocalDateTime.now();
// 分页查询当前页数据
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
.eq(TruckLocationTrackEntity::getOrderId, orderId)
.ge(TruckLocationTrackEntity::getLocationTime, now));
List<TruckLocationTrackEntity> records = resultPage.getRecords();
if (records.isEmpty()) {
continue;
}
// 处理当前页数据,按时间间隔推送
for (TruckLocationTrackEntity record : records) {
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
return;
}
if (!ThreadUtil.sleep(1000)) {
log.info("任务被取消");
return;
}
Websocket.send(msg.getUid(), msg.getCid(), new WsMsg()
.setEvent("historyData")
.setUid(msg.getUid())
.setCid(msg.getCid())
.setData(record)
);
}
// 判断是否为最后一页
if (currentPage >= resultPage.getPages()) {
continue;
}
// 继续查询下一页
currentPage++;
}
} catch (Exception e) {
// 其他异常
log.error("查询历史轨迹异常", e);
}
}, threadPoolExecutor); */
});
/* Websocket.addListener("/truck_location_track/history", msg -> {
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
LocalDateTime startTime = param.getStartTime();
LocalDateTime endTime = param.getEndTime();
Long orderId = param.getOrderId();
Integer speed = param.getSpeed();
CompletableFuture.runAsync(() -> {
try {
int currentPage = 1;
// 分页查询每次500条
while (true) {
// 检查是否被取消
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
return;
}
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
page.addOrder(OrderItem.asc("location_time"));
/* SXJ070 12082962846
// 分页查询当前页数据
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
.eq(TruckLocationTrackEntity::getOrderId, orderId)
.ge(startTime != null, TruckLocationTrackEntity::getLocationTime, startTime)
.le(endTime != null, TruckLocationTrackEntity::getLocationTime, endTime));
SB070Z 12083145045
List<TruckLocationTrackEntity> records = resultPage.getRecords();
if (records.isEmpty()) {
return;
}
SYV011 12082915117
// 处理当前页数据,按时间间隔推送
for (TruckLocationTrackEntity record : records) {
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
return;
}
if (speed > 100) {
if (!ThreadUtil.sleep(speed)) {
log.info("任务被取消");
return;
}
}
String uid = msg.getUid();
String cid = msg.getCid();
if (!Websocket.isOnline(uid, cid)) {
return;
}
Websocket.send(uid, cid, new WsMsg()
.setEvent("historyData")
.setData(R.success(record))
);
}
// 判断是否为最后一页
if (currentPage >= resultPage.getPages()) {
return;
}
// 继续查询下一页
currentPage++;
}
} catch (Exception e) {
// 其他异常
log.error("查询历史轨迹异常", e);
}
}, threadPoolExecutor);
}); */
/* Websocket.addListener("/truck_location_track/realtime", msg -> {
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
Long orderId = param.getOrderId();
String uid = msg.getUid();
String cid = msg.getCid();
String gpsId = baseMapper.gpsId(orderId);
realtimeDataListenerLock.lock();
try {
realtimeDataListener.add(Tuple3.create(uid, cid, gpsId));
} finally {
realtimeDataListenerLock.unlock();
}
*//* CompletableFuture.runAsync(() -> {
while (true) {
ThreadUtil.sleep(3000);
realtimeDataListenerLock.lock();
try {
realtimeDataListener.removeIf(it -> !Websocket.online(it.get_0(), it.get_1()));
} finally {
realtimeDataListenerLock.unlock();
}
List<Tuple3<String, String, String>> listeners = realtimeDataListener;
if (listeners.isEmpty()) {
return;
}
for (Tuple3<String, String, String> tuple3 : listeners) {
if (!Websocket.online(uid, cid)) {
return;
}
Websocket.send(uid, cid, new WsMsg()
.setEvent("realtimeData")
.setData(R.success(new TruckLocationTrackEntity()))
);
}
}
}, threadPoolExecutor); *//*
*//* CompletableFuture.runAsync(() -> {
try {
int currentPage = 1;
// 分页查询每次500条
while (true) {
// 检查是否被取消
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
return;
}
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
page.addOrder(OrderItem.asc("location_time"));
LocalDateTime now = LocalDateTime.now();
// 分页查询当前页数据
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
.eq(TruckLocationTrackEntity::getOrderId, orderId)
.ge(TruckLocationTrackEntity::getLocationTime, now));
List<TruckLocationTrackEntity> records = resultPage.getRecords();
if (records.isEmpty()) {
continue;
}
// 处理当前页数据,按时间间隔推送
for (TruckLocationTrackEntity record : records) {
if (Thread.currentThread().isInterrupted()) {
log.info("任务被取消");
return;
}
if (!ThreadUtil.sleep(1000)) {
log.info("任务被取消");
return;
}
Websocket.send(msg.getUid(), msg.getCid(), new WsMsg()
.setEvent("historyData")
.setUid(msg.getUid())
.setCid(msg.getCid())
.setData(record)
);
}
// 判断是否为最后一页
if (currentPage >= resultPage.getPages()) {
continue;
}
// 继续查询下一页
currentPage++;
}
} catch (Exception e) {
// 其他异常
log.error("查询历史轨迹异常", e);
}
}, threadPoolExecutor); *//*
}); */
SYE909 12082855453 */
}
@Override
@Transactional(rollbackFor = Exception.class)
public void save(List<RealtimeLocationResult> dataList) {
for (RealtimeLocationResult realtimeLocationResult : dataList) {
if (realtimeLocationResult == null) {
return;
}
String time = realtimeLocationResult.getTime();
double latitude = realtimeLocationResult.getLatitude();
double longitude = realtimeLocationResult.getLongitude();
if (latitude <= 0 && longitude <= 0) {
return;
}
TruckLocationTrackEntity entity = new TruckLocationTrackEntity()
.setLicensePlate(realtimeLocationResult.getLicensePlate())
.setTerminalId(realtimeLocationResult.getTerminalId())
.setLatitude(latitude)
.setLongitude(longitude)
.setAltitude(realtimeLocationResult.getAltitude())
.setSpeed(realtimeLocationResult.getSpeed())
.setLocationTime(DateUtil.parseLocalDateTime(time))
.setDirection(realtimeLocationResult.getDirection())
.setOverspeed(realtimeLocationResult.isOverspeed())
.setCompensate(realtimeLocationResult.getType() == 1);
this.save(entity);
}
}
}