localizer
parent
4b0c7627fe
commit
53176657d1
|
|
@ -13,7 +13,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@NoArgsConstructor(access = AccessLevel.PRIVATE)
|
@NoArgsConstructor(access = AccessLevel.PRIVATE)
|
||||||
public final class Htzj {
|
public final class Htzj {
|
||||||
private static HtzjListeners htzjListeners = new HtzjListeners();
|
private static HtzjListeners htzjListeners;
|
||||||
|
|
||||||
public static void init() {
|
public static void init() {
|
||||||
htzjListeners = new HtzjListeners();
|
htzjListeners = new HtzjListeners();
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
package com.njzscloud.common.localizer.service;
|
||||||
|
|
||||||
|
|
||||||
|
import com.njzscloud.common.localizer.mqtt.result.RealtimeLocationResult;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public interface LocalizerService {
|
||||||
|
void save(List<RealtimeLocationResult> data);
|
||||||
|
|
||||||
|
List<RealtimeLocationResult> search(LocalDateTime start, LocalDateTime end);
|
||||||
|
}
|
||||||
|
|
@ -21,6 +21,15 @@
|
||||||
<artifactId>njzscloud-common-core</artifactId>
|
<artifactId>njzscloud-common-core</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njzscloud</groupId>
|
||||||
|
<artifactId>njzscloud-common-mvc</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njzscloud</groupId>
|
||||||
|
<artifactId>njzscloud-common-mp</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.njzscloud</groupId>
|
<groupId>com.njzscloud</groupId>
|
||||||
<artifactId>njzscloud-common-localizer</artifactId>
|
<artifactId>njzscloud-common-localizer</artifactId>
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,97 @@
|
||||||
|
package com.njzscloud.localizer.track.controller;
|
||||||
|
|
||||||
|
import com.njzscloud.common.core.utils.R;
|
||||||
|
import com.njzscloud.common.mp.support.PageParam;
|
||||||
|
import com.njzscloud.common.mp.support.PageResult;
|
||||||
|
import com.njzscloud.localizer.track.pojo.entity.TruckLocationTrackEntity;
|
||||||
|
import com.njzscloud.localizer.track.service.TruckLocationTrackService;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 车辆定位数据
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@CrossOrigin
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/truck_location_track")
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class TruckLocationTrackController {
|
||||||
|
|
||||||
|
private final TruckLocationTrackService truckLocationTrackService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 新增
|
||||||
|
*/
|
||||||
|
@PostMapping("/add")
|
||||||
|
public R<?> add(@RequestBody TruckLocationTrackEntity truckLocationTrackEntity) {
|
||||||
|
truckLocationTrackService.add(truckLocationTrackEntity);
|
||||||
|
return R.success();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 修改
|
||||||
|
*/
|
||||||
|
@PostMapping("/modify")
|
||||||
|
public R<?> modify(@RequestBody TruckLocationTrackEntity truckLocationTrackEntity) {
|
||||||
|
truckLocationTrackService.modify(truckLocationTrackEntity);
|
||||||
|
return R.success();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除
|
||||||
|
*/
|
||||||
|
@PostMapping("/del")
|
||||||
|
public R<?> del(@RequestBody List<Long> ids) {
|
||||||
|
truckLocationTrackService.del(ids);
|
||||||
|
return R.success();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 详情
|
||||||
|
*/
|
||||||
|
@GetMapping("/detail")
|
||||||
|
public R<TruckLocationTrackEntity> detail(@RequestParam Long id) {
|
||||||
|
return R.success(truckLocationTrackService.detail(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 分页查询
|
||||||
|
*/
|
||||||
|
@GetMapping("/paging")
|
||||||
|
public R<PageResult<TruckLocationTrackEntity>> paging(PageParam pageParam, TruckLocationTrackEntity truckLocationTrackEntity) {
|
||||||
|
return R.success(truckLocationTrackService.paging(pageParam, truckLocationTrackEntity));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 历史记录
|
||||||
|
*/
|
||||||
|
@GetMapping("/history")
|
||||||
|
public SseEmitter history(@RequestParam("orderId") Long orderId,
|
||||||
|
@RequestParam(value = "startTime", required = false) LocalDateTime startTime,
|
||||||
|
@RequestParam(value = "endTime", required = false) LocalDateTime endTime,
|
||||||
|
@RequestParam(value = "speed", defaultValue = "1") Integer speed
|
||||||
|
) {
|
||||||
|
if (speed <= 0) {
|
||||||
|
speed = 1;
|
||||||
|
} else if (speed > 60) {
|
||||||
|
speed = 60;
|
||||||
|
}
|
||||||
|
return truckLocationTrackService.history(orderId, startTime, endTime, speed);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 实时数据
|
||||||
|
*/
|
||||||
|
@GetMapping("/realtime")
|
||||||
|
public SseEmitter realtime(@RequestParam("orderId") Long orderId) {
|
||||||
|
return truckLocationTrackService.realtime(orderId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
package com.njzscloud.localizer.track.mapper;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
|
import com.njzscloud.localizer.track.pojo.entity.TruckLocationTrackEntity;
|
||||||
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
import org.apache.ibatis.annotations.Select;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 车辆定位数据
|
||||||
|
*/
|
||||||
|
@Mapper
|
||||||
|
public interface TruckLocationTrackMapper extends BaseMapper<TruckLocationTrackEntity> {
|
||||||
|
|
||||||
|
@Select("SELECT b.gps\n" +
|
||||||
|
"FROM order_info a\n" +
|
||||||
|
" INNER JOIN biz_truck b ON b.id = a.truck_id AND b.deleted = 0\n" +
|
||||||
|
"WHERE a.id = #{orderId}\n" +
|
||||||
|
" AND a.deleted = 0")
|
||||||
|
String gpsId(@Param("orderId") Long orderId);
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,87 @@
|
||||||
|
package com.njzscloud.localizer.track.pojo.entity;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.*;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
import lombok.ToString;
|
||||||
|
import lombok.experimental.Accessors;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 车辆定位数据
|
||||||
|
*/
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
@ToString
|
||||||
|
@Accessors(chain = true)
|
||||||
|
@TableName("truck_location_track")
|
||||||
|
public class TruckLocationTrackEntity {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Id
|
||||||
|
*/
|
||||||
|
@TableId(type = IdType.ASSIGN_ID)
|
||||||
|
private Long id;
|
||||||
|
/**
|
||||||
|
* 订单 Id
|
||||||
|
*/
|
||||||
|
private Long orderId;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 车辆 Id
|
||||||
|
*/
|
||||||
|
private Long truckId;
|
||||||
|
/**
|
||||||
|
* 设备号
|
||||||
|
*/
|
||||||
|
private String terminalId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 经度; 单位:度
|
||||||
|
*/
|
||||||
|
private Double longitude;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 纬度; 单位:度
|
||||||
|
*/
|
||||||
|
private Double latitude;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 海拔; 米
|
||||||
|
*/
|
||||||
|
private Integer altitude;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 速度; 千米/小时
|
||||||
|
*/
|
||||||
|
private Double speed;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 定位时间
|
||||||
|
*/
|
||||||
|
private LocalDateTime locationTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 方向; 正北为 0 度
|
||||||
|
*/
|
||||||
|
private Integer direction;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否超速; 0-->否、1-->是
|
||||||
|
*/
|
||||||
|
private Boolean overspeed;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否为补偿数据; 0-->否、1-->是
|
||||||
|
*/
|
||||||
|
private Boolean compensate;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建时间
|
||||||
|
*/
|
||||||
|
@TableField(fill = FieldFill.INSERT)
|
||||||
|
private LocalDateTime createTime;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,889 @@
|
||||||
|
package com.njzscloud.localizer.track.service;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import cn.hutool.core.thread.ThreadUtil;
|
||||||
|
import cn.hutool.core.util.IdUtil;
|
||||||
|
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||||
|
import com.baomidou.mybatisplus.core.metadata.OrderItem;
|
||||||
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||||
|
import com.baomidou.mybatisplus.extension.service.IService;
|
||||||
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
|
import com.njzscloud.common.core.ex.ExceptionMsg;
|
||||||
|
import com.njzscloud.common.core.thread.ThreadPool;
|
||||||
|
import com.njzscloud.common.core.tuple.Tuple2;
|
||||||
|
import com.njzscloud.common.core.tuple.Tuple3;
|
||||||
|
import com.njzscloud.common.core.utils.CoordinateConverter;
|
||||||
|
import com.njzscloud.common.core.utils.R;
|
||||||
|
import com.njzscloud.common.localizer.mqtt.result.RealtimeLocationResult;
|
||||||
|
import com.njzscloud.common.mp.support.PageParam;
|
||||||
|
import com.njzscloud.common.mp.support.PageResult;
|
||||||
|
import com.njzscloud.common.mqtt.support.MqttMsg;
|
||||||
|
import com.njzscloud.localizer.track.mapper.TruckLocationTrackMapper;
|
||||||
|
import com.njzscloud.localizer.track.pojo.entity.TruckLocationTrackEntity;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 车辆定位数据
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
public class TruckLocationTrackService extends ServiceImpl<TruckLocationTrackMapper, TruckLocationTrackEntity> implements IService<TruckLocationTrackEntity> {
|
||||||
|
ThreadPoolExecutor threadPoolExecutor = ThreadPool.createThreadPool(
|
||||||
|
"GPS 数据",
|
||||||
|
10,
|
||||||
|
200,
|
||||||
|
60,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
new ThreadPoolExecutor.CallerRunsPolicy()
|
||||||
|
);
|
||||||
|
HashMap<String, List<SseEmitter>> emitters = new HashMap<>();
|
||||||
|
ReentrantLock lock = new ReentrantLock();
|
||||||
|
List<Tuple3<String, String, String>> realtimeDataListener = new LinkedList<>();
|
||||||
|
ReentrantLock realtimeDataListenerLock = new ReentrantLock();
|
||||||
|
|
||||||
|
|
||||||
|
public static void send(SseEmitter emitter, String eventName, TruckLocationTrackEntity data) {
|
||||||
|
try {
|
||||||
|
emitter.send(SseEmitter.event()
|
||||||
|
.id(data.getId().toString())
|
||||||
|
.name(eventName)
|
||||||
|
.data(data));
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("发送定位器历史记录失败", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void complete(SseEmitter emitter, String eventName) {
|
||||||
|
try {
|
||||||
|
emitter.send(SseEmitter.event()
|
||||||
|
.id(IdUtil.simpleUUID())
|
||||||
|
.name(eventName)
|
||||||
|
.data("complete"));
|
||||||
|
emitter.complete();
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("发送定位器历史记录失败", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 新增
|
||||||
|
*/
|
||||||
|
public void add(TruckLocationTrackEntity truckLocationTrackEntity) {
|
||||||
|
this.save(truckLocationTrackEntity);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 修改
|
||||||
|
*/
|
||||||
|
public void modify(TruckLocationTrackEntity truckLocationTrackEntity) {
|
||||||
|
this.updateById(truckLocationTrackEntity);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除
|
||||||
|
*/
|
||||||
|
@Transactional(rollbackFor = Exception.class)
|
||||||
|
public void del(List<Long> ids) {
|
||||||
|
this.removeBatchByIds(ids);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 详情
|
||||||
|
*/
|
||||||
|
public TruckLocationTrackEntity detail(Long id) {
|
||||||
|
return this.getById(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 分页查询
|
||||||
|
*/
|
||||||
|
public PageResult<TruckLocationTrackEntity> paging(PageParam pageParam, TruckLocationTrackEntity truckLocationTrackEntity) {
|
||||||
|
return PageResult.of(this.page(pageParam.toPage(), Wrappers.<TruckLocationTrackEntity>query(truckLocationTrackEntity)));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* public SseEmitter history(Long orderId, LocalDateTime startTime, LocalDateTime endTime, Integer speed) {
|
||||||
|
SseEmitter emitter = new SseEmitter(0L);
|
||||||
|
|
||||||
|
// 提交异步任务
|
||||||
|
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
||||||
|
try {
|
||||||
|
// 记录上一条数据的时间,用于计算间隔
|
||||||
|
LocalDateTime lastRecordTime = null;
|
||||||
|
int currentPage = 1;
|
||||||
|
// 分页查询,每次500条
|
||||||
|
while (true) {
|
||||||
|
// 检查是否被取消
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
complete(emitter);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
|
||||||
|
page.addOrder(OrderItem.asc("location_time"));
|
||||||
|
|
||||||
|
// 分页查询当前页数据
|
||||||
|
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
|
||||||
|
.eq(TruckLocationTrackEntity::getOrderId, orderId)
|
||||||
|
.between(TruckLocationTrackEntity::getLocationTime, startTime, endTime));
|
||||||
|
|
||||||
|
List<TruckLocationTrackEntity> records = resultPage.getRecords();
|
||||||
|
if (records.isEmpty()) {
|
||||||
|
// 没有更多数据,推送完成信息
|
||||||
|
complete(emitter);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理当前页数据,按时间间隔推送
|
||||||
|
for (TruckLocationTrackEntity record : records) {
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
complete(emitter);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
LocalDateTime currentTime = record.getLocationTime();
|
||||||
|
// 计算与上一条数据的时间间隔
|
||||||
|
if (lastRecordTime != null && currentTime != null) {
|
||||||
|
long interval = Duration.between(lastRecordTime, currentTime).toMillis();
|
||||||
|
if (interval > 0) {
|
||||||
|
// 按实际间隔等待
|
||||||
|
ThreadUtil.sleep(interval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 推送当前数据
|
||||||
|
send(emitter, "historyData", record);
|
||||||
|
|
||||||
|
// 更新上一条数据的时间
|
||||||
|
lastRecordTime = currentTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 判断是否为最后一页
|
||||||
|
if (currentPage >= resultPage.getPages()) {
|
||||||
|
complete(emitter);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 继续查询下一页
|
||||||
|
currentPage++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 所有数据推送完成,关闭连接
|
||||||
|
complete(emitter);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 其他异常
|
||||||
|
emitter.completeWithError(e);
|
||||||
|
}
|
||||||
|
}, threadPoolExecutor);
|
||||||
|
|
||||||
|
// 注册连接关闭回调,中断任务
|
||||||
|
emitter.onCompletion(() -> {
|
||||||
|
log.info("连接关闭,取消任务");
|
||||||
|
future.cancel(true);
|
||||||
|
});
|
||||||
|
emitter.onTimeout(() -> {
|
||||||
|
log.info("连接超时,取消任务");
|
||||||
|
future.cancel(true);
|
||||||
|
});
|
||||||
|
emitter.onError((e) -> {
|
||||||
|
log.info("连接错误,取消任务");
|
||||||
|
future.cancel(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
return emitter;
|
||||||
|
} */
|
||||||
|
public SseEmitter history(Long orderId, LocalDateTime startTime, LocalDateTime endTime, Integer speed) {
|
||||||
|
SseEmitter emitter = new SseEmitter(0L);
|
||||||
|
|
||||||
|
// 提交异步任务
|
||||||
|
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
||||||
|
try {
|
||||||
|
int currentPage = 1;
|
||||||
|
// 分页查询,每次500条
|
||||||
|
while (true) {
|
||||||
|
// 检查是否被取消
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
complete(emitter, "historyData");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
|
||||||
|
page.addOrder(OrderItem.asc("location_time"));
|
||||||
|
|
||||||
|
// 分页查询当前页数据
|
||||||
|
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
|
||||||
|
.eq(TruckLocationTrackEntity::getOrderId, orderId)
|
||||||
|
.ge(startTime != null, TruckLocationTrackEntity::getLocationTime, startTime)
|
||||||
|
.le(endTime != null, TruckLocationTrackEntity::getLocationTime, endTime));
|
||||||
|
|
||||||
|
List<TruckLocationTrackEntity> records = resultPage.getRecords();
|
||||||
|
if (records.isEmpty()) {
|
||||||
|
// 没有更多数据,推送完成信息
|
||||||
|
complete(emitter, "historyData");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理当前页数据,按时间间隔推送
|
||||||
|
for (TruckLocationTrackEntity record : records) {
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
complete(emitter, "historyData");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!ThreadUtil.sleep(speed * 1000)) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
complete(emitter, "historyData");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
send(emitter, "historyData", record);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 判断是否为最后一页
|
||||||
|
if (currentPage >= resultPage.getPages()) {
|
||||||
|
complete(emitter, "historyData");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 继续查询下一页
|
||||||
|
currentPage++;
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 其他异常
|
||||||
|
emitter.completeWithError(e);
|
||||||
|
}
|
||||||
|
}, threadPoolExecutor);
|
||||||
|
|
||||||
|
// 注册连接关闭回调,中断任务
|
||||||
|
emitter.onCompletion(() -> {
|
||||||
|
log.info("连接关闭,取消任务");
|
||||||
|
future.cancel(true);
|
||||||
|
});
|
||||||
|
emitter.onTimeout(() -> {
|
||||||
|
log.info("连接超时,取消任务");
|
||||||
|
future.cancel(true);
|
||||||
|
});
|
||||||
|
emitter.onError((e) -> {
|
||||||
|
log.info("连接错误,取消任务");
|
||||||
|
future.cancel(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
return emitter;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
public void cache(TruckLocationTrackEntity truckLocationTrackEntity) {
|
||||||
|
Long orderId = truckLocationTrackEntity.getOrderId();
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
List<TruckLocationTrackEntity> value = LOCATION_CACHE.computeIfAbsent(orderId, k -> new LinkedList<>());
|
||||||
|
if (value.size() >= 10) {
|
||||||
|
value.remove(0);
|
||||||
|
}
|
||||||
|
value.add(truckLocationTrackEntity);
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
public SseEmitter realtime(Long orderId) {
|
||||||
|
String gpsId = baseMapper.gpsId(orderId);
|
||||||
|
SseEmitter emitter = new SseEmitter(0L);
|
||||||
|
lock.lock();
|
||||||
|
List<SseEmitter> value = emitters.computeIfAbsent(gpsId, k -> new LinkedList<>());
|
||||||
|
value.add(emitter);
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
|
/* // 提交异步任务
|
||||||
|
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
||||||
|
try {
|
||||||
|
// 记录上一条数据的时间,用于计算间隔
|
||||||
|
LocalDateTime lastRecordTime = null;
|
||||||
|
while (true) {
|
||||||
|
// 检查是否被取消
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
complete(emitter, "realtimeData");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
lock.lock();
|
||||||
|
List<TruckLocationTrackEntity> records = LOCATION_CACHE.get(orderId);
|
||||||
|
LOCATION_CACHE.remove(orderId);
|
||||||
|
lock.unlock();
|
||||||
|
if (CollUtil.isEmpty(records)) {
|
||||||
|
if (ThreadUtil.sleep(1000)) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
complete(emitter, "realtimeData");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// 处理当前页数据,按时间间隔推送
|
||||||
|
for (TruckLocationTrackEntity record : records) {
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
complete(emitter, "realtimeData");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
LocalDateTime currentTime = record.getLocationTime();
|
||||||
|
// 计算与上一条数据的时间间隔
|
||||||
|
if (lastRecordTime != null && currentTime != null) {
|
||||||
|
long interval = Duration.between(lastRecordTime, currentTime).toMillis();
|
||||||
|
if (interval > 0) {
|
||||||
|
// 按实际间隔等待
|
||||||
|
ThreadUtil.sleep(interval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 推送当前数据
|
||||||
|
send(emitter, "realtimeData", record);
|
||||||
|
|
||||||
|
// 更新上一条数据的时间
|
||||||
|
lastRecordTime = currentTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 其他异常
|
||||||
|
emitter.completeWithError(e);
|
||||||
|
}
|
||||||
|
}, threadPoolExecutor); */
|
||||||
|
|
||||||
|
// 注册连接关闭回调,中断任务
|
||||||
|
// 注册连接关闭回调,中断任务
|
||||||
|
emitter.onCompletion(() -> {
|
||||||
|
log.info("连接关闭,取消任务");
|
||||||
|
lock.lock();
|
||||||
|
List<SseEmitter> emitterList = emitters.get(gpsId);
|
||||||
|
emitterList.remove(emitter);
|
||||||
|
lock.unlock();
|
||||||
|
});
|
||||||
|
emitter.onTimeout(() -> {
|
||||||
|
log.info("连接超时,取消任务");
|
||||||
|
lock.lock();
|
||||||
|
List<SseEmitter> emitterList = emitters.get(gpsId);
|
||||||
|
emitterList.remove(emitter);
|
||||||
|
lock.unlock();
|
||||||
|
});
|
||||||
|
emitter.onError((e) -> {
|
||||||
|
log.info("连接错误,取消任务");
|
||||||
|
lock.lock();
|
||||||
|
List<SseEmitter> emitterList = emitters.get(gpsId);
|
||||||
|
emitterList.remove(emitter);
|
||||||
|
lock.unlock();
|
||||||
|
});
|
||||||
|
|
||||||
|
return emitter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendRealTimeData(MqttMsg msg) {
|
||||||
|
RealtimeLocationResult realtimeLocationResult = msg.getMsg(RealtimeLocationResult.class);
|
||||||
|
if (realtimeLocationResult != null) {
|
||||||
|
String gpsId = realtimeLocationResult.getTerminalId();
|
||||||
|
lock.lock();
|
||||||
|
List<SseEmitter> emitterList = emitters.get(gpsId);
|
||||||
|
lock.unlock();
|
||||||
|
if (CollUtil.isEmpty(emitterList)) {
|
||||||
|
// log.info("未找到GPS {} 的实时数据订阅者", gpsId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (SseEmitter emitter : emitterList) {
|
||||||
|
send(emitter, "realtimeData", new TruckLocationTrackEntity()
|
||||||
|
.setId(IdUtil.getSnowflakeNextId())
|
||||||
|
.setTerminalId(gpsId)
|
||||||
|
.setLatitude(realtimeLocationResult.getLatitude())
|
||||||
|
.setLongitude(realtimeLocationResult.getLongitude())
|
||||||
|
.setAltitude(realtimeLocationResult.getAltitude())
|
||||||
|
.setSpeed(realtimeLocationResult.getSpeed())
|
||||||
|
.setLocationTime(LocalDateTime.parse(realtimeLocationResult.getTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))
|
||||||
|
.setDirection(realtimeLocationResult.getDirection())
|
||||||
|
.setOverspeed(realtimeLocationResult.isOverspeed())
|
||||||
|
.setCompensate(realtimeLocationResult.getType() == 1));
|
||||||
|
}
|
||||||
|
/* List<Tuple3<String, String, String>> listeners;
|
||||||
|
realtimeDataListenerLock.lock();
|
||||||
|
try {
|
||||||
|
realtimeDataListener.removeIf(it -> !Websocket.isOnline(it.get_0(), it.get_1()));
|
||||||
|
listeners = realtimeDataListener.stream()
|
||||||
|
.filter(tuple3 -> tuple3.get_2().equals(gpsId))
|
||||||
|
.map(tuple3 -> Tuple3.create(tuple3.get_0(), tuple3.get_1(), tuple3.get_2()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
} finally {
|
||||||
|
realtimeDataListenerLock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Tuple3<String, String, String> tuple3 : listeners) {
|
||||||
|
String uid = tuple3.get_0();
|
||||||
|
String cid = tuple3.get_1();
|
||||||
|
Websocket.send(uid, cid, new WsMsg()
|
||||||
|
.setEvent("realtimeData")
|
||||||
|
.setUid(uid)
|
||||||
|
.setCid(cid)
|
||||||
|
.setData(new TruckLocationTrackEntity()
|
||||||
|
.setId(IdUtil.getSnowflakeNextId())
|
||||||
|
.setTerminalId(gpsId)
|
||||||
|
.setLatitude(realtimeLocationResult.getLatitude())
|
||||||
|
.setLongitude(realtimeLocationResult.getLongitude())
|
||||||
|
.setAltitude(realtimeLocationResult.getAltitude())
|
||||||
|
.setSpeed(realtimeLocationResult.getSpeed())
|
||||||
|
.setLocationTime(LocalDateTime.parse(realtimeLocationResult.getTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))
|
||||||
|
.setDirection(realtimeLocationResult.getDirection())
|
||||||
|
.setOverspeed(realtimeLocationResult.isOverspeed())
|
||||||
|
.setCompensate(realtimeLocationResult.getType() == 1))
|
||||||
|
);
|
||||||
|
} */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onApplicationReady() {
|
||||||
|
Websocket.subscribe("up/truck_location_track/history", msg -> {
|
||||||
|
WsMsg.User from = msg.getFrom();
|
||||||
|
String cid = from.getCid();
|
||||||
|
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
|
||||||
|
LocalDateTime startTime = param.getStartTime();
|
||||||
|
LocalDateTime endTime = param.getEndTime();
|
||||||
|
Long orderId = param.getOrderId();
|
||||||
|
Integer speed = param.getSpeed();
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
|
try {
|
||||||
|
int currentPage = 1;
|
||||||
|
// 分页查询,每次500条
|
||||||
|
while (true) {
|
||||||
|
// 检查是否被取消
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
|
||||||
|
page.addOrder(OrderItem.asc("location_time"));
|
||||||
|
|
||||||
|
// 分页查询当前页数据
|
||||||
|
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
|
||||||
|
.eq(TruckLocationTrackEntity::getOrderId, orderId)
|
||||||
|
.ge(startTime != null, TruckLocationTrackEntity::getLocationTime, startTime)
|
||||||
|
.le(endTime != null, TruckLocationTrackEntity::getLocationTime, endTime));
|
||||||
|
|
||||||
|
List<TruckLocationTrackEntity> records = resultPage.getRecords();
|
||||||
|
if (records.isEmpty()) {
|
||||||
|
Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/history")
|
||||||
|
.setData(R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理当前页数据,按时间间隔推送
|
||||||
|
for (TruckLocationTrackEntity record : records) {
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (speed > 100) {
|
||||||
|
if (!ThreadUtil.sleep(speed)) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/history")
|
||||||
|
.setData(R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Tuple2<Double, Double> doubles = CoordinateConverter.wgs84ToGcj02(record.getLatitude(), record.getLongitude());
|
||||||
|
record.setLatitude(doubles.get_0())
|
||||||
|
.setLongitude(doubles.get_1());
|
||||||
|
boolean publish = Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/history")
|
||||||
|
.setData(R.success(record))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
|
if (!publish) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 判断是否为最后一页
|
||||||
|
if (currentPage >= resultPage.getPages()) {
|
||||||
|
Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/history")
|
||||||
|
.setData(R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 继续查询下一页
|
||||||
|
currentPage++;
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 其他异常
|
||||||
|
log.error("查询历史轨迹异常", e);
|
||||||
|
}
|
||||||
|
}, threadPoolExecutor);
|
||||||
|
});
|
||||||
|
|
||||||
|
Websocket.subscribe("up/truck_location_track/realtime", msg -> {
|
||||||
|
WsMsg.User from = msg.getFrom();
|
||||||
|
String cid = from.getCid();
|
||||||
|
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
|
||||||
|
Long orderId = param.getOrderId();
|
||||||
|
Integer speed = param.getSpeed();
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
|
try {
|
||||||
|
LocalDateTime startTime = LocalDateTime.now().minusSeconds(1);
|
||||||
|
int errCount = 0;
|
||||||
|
while (true) {
|
||||||
|
// 检查是否被取消
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Page<TruckLocationTrackEntity> page = new Page<>(1, 10);
|
||||||
|
page.addOrder(OrderItem.asc("location_time"));
|
||||||
|
|
||||||
|
// 分页查询当前页数据
|
||||||
|
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
|
||||||
|
.eq(TruckLocationTrackEntity::getOrderId, orderId)
|
||||||
|
.gt(TruckLocationTrackEntity::getLocationTime, startTime));
|
||||||
|
|
||||||
|
List<TruckLocationTrackEntity> records = resultPage.getRecords();
|
||||||
|
if (records.isEmpty()) {
|
||||||
|
if (errCount >= 200) {
|
||||||
|
log.error("暂无实时数据, 订单ID: {}, 时间: {}", orderId, startTime);
|
||||||
|
Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/realtime")
|
||||||
|
.setData(R.failed(ExceptionMsg.SYS_ERR_MSG, "暂无实时数据"))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
errCount++;
|
||||||
|
if (!ThreadUtil.sleep(300)) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
startTime = LocalDateTime.now();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
errCount = 0;
|
||||||
|
|
||||||
|
// 处理当前页数据,按时间间隔推送
|
||||||
|
for (TruckLocationTrackEntity record : records) {
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (speed > 100) {
|
||||||
|
if (!ThreadUtil.sleep(speed)) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/realtime")
|
||||||
|
.setData(R.failed(ExceptionMsg.SYS_ERR_MSG, "数据发送完成"))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
startTime = record.getLocationTime();
|
||||||
|
Tuple2<Double, Double> doubles = CoordinateConverter.wgs84ToGcj02(record.getLatitude(), record.getLongitude());
|
||||||
|
record.setLatitude(doubles.get_0())
|
||||||
|
.setLongitude(doubles.get_1());
|
||||||
|
boolean publish = Websocket.publish(new WsMsg()
|
||||||
|
.setEvent("down/truck_location_track/realtime")
|
||||||
|
.setData(R.success(record))
|
||||||
|
.setTo(Collections.singletonList(new WsMsg.User().setCid(cid)))
|
||||||
|
);
|
||||||
|
if (!publish) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 其他异常
|
||||||
|
log.error("查询历史轨迹异常", e);
|
||||||
|
}
|
||||||
|
}, threadPoolExecutor);
|
||||||
|
|
||||||
|
/* CompletableFuture.runAsync(() -> {
|
||||||
|
while (true) {
|
||||||
|
ThreadUtil.sleep(3000);
|
||||||
|
realtimeDataListenerLock.lock();
|
||||||
|
try {
|
||||||
|
realtimeDataListener.removeIf(it -> !Websocket.online(it.get_0(), it.get_1()));
|
||||||
|
} finally {
|
||||||
|
realtimeDataListenerLock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Tuple3<String, String, String>> listeners = realtimeDataListener;
|
||||||
|
if (listeners.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Tuple3<String, String, String> tuple3 : listeners) {
|
||||||
|
if (!Websocket.online(uid, cid)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Websocket.send(uid, cid, new WsMsg()
|
||||||
|
.setEvent("realtimeData")
|
||||||
|
.setData(R.success(new TruckLocationTrackEntity()))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, threadPoolExecutor); */
|
||||||
|
|
||||||
|
/* CompletableFuture.runAsync(() -> {
|
||||||
|
try {
|
||||||
|
int currentPage = 1;
|
||||||
|
// 分页查询,每次500条
|
||||||
|
while (true) {
|
||||||
|
// 检查是否被取消
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
|
||||||
|
page.addOrder(OrderItem.asc("location_time"));
|
||||||
|
|
||||||
|
LocalDateTime now = LocalDateTime.now();
|
||||||
|
// 分页查询当前页数据
|
||||||
|
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
|
||||||
|
.eq(TruckLocationTrackEntity::getOrderId, orderId)
|
||||||
|
.ge(TruckLocationTrackEntity::getLocationTime, now));
|
||||||
|
|
||||||
|
List<TruckLocationTrackEntity> records = resultPage.getRecords();
|
||||||
|
if (records.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理当前页数据,按时间间隔推送
|
||||||
|
for (TruckLocationTrackEntity record : records) {
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!ThreadUtil.sleep(1000)) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Websocket.send(msg.getUid(), msg.getCid(), new WsMsg()
|
||||||
|
.setEvent("historyData")
|
||||||
|
.setUid(msg.getUid())
|
||||||
|
.setCid(msg.getCid())
|
||||||
|
.setData(record)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 判断是否为最后一页
|
||||||
|
if (currentPage >= resultPage.getPages()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 继续查询下一页
|
||||||
|
currentPage++;
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 其他异常
|
||||||
|
log.error("查询历史轨迹异常", e);
|
||||||
|
}
|
||||||
|
}, threadPoolExecutor); */
|
||||||
|
|
||||||
|
});
|
||||||
|
/* Websocket.addListener("/truck_location_track/history", msg -> {
|
||||||
|
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
|
||||||
|
LocalDateTime startTime = param.getStartTime();
|
||||||
|
LocalDateTime endTime = param.getEndTime();
|
||||||
|
Long orderId = param.getOrderId();
|
||||||
|
Integer speed = param.getSpeed();
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
|
try {
|
||||||
|
int currentPage = 1;
|
||||||
|
// 分页查询,每次500条
|
||||||
|
while (true) {
|
||||||
|
// 检查是否被取消
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
|
||||||
|
page.addOrder(OrderItem.asc("location_time"));
|
||||||
|
|
||||||
|
// 分页查询当前页数据
|
||||||
|
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
|
||||||
|
.eq(TruckLocationTrackEntity::getOrderId, orderId)
|
||||||
|
.ge(startTime != null, TruckLocationTrackEntity::getLocationTime, startTime)
|
||||||
|
.le(endTime != null, TruckLocationTrackEntity::getLocationTime, endTime));
|
||||||
|
|
||||||
|
List<TruckLocationTrackEntity> records = resultPage.getRecords();
|
||||||
|
if (records.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理当前页数据,按时间间隔推送
|
||||||
|
for (TruckLocationTrackEntity record : records) {
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (speed > 100) {
|
||||||
|
if (!ThreadUtil.sleep(speed)) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String uid = msg.getUid();
|
||||||
|
String cid = msg.getCid();
|
||||||
|
if (!Websocket.isOnline(uid, cid)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Websocket.send(uid, cid, new WsMsg()
|
||||||
|
.setEvent("historyData")
|
||||||
|
.setData(R.success(record))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 判断是否为最后一页
|
||||||
|
if (currentPage >= resultPage.getPages()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 继续查询下一页
|
||||||
|
currentPage++;
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 其他异常
|
||||||
|
log.error("查询历史轨迹异常", e);
|
||||||
|
}
|
||||||
|
}, threadPoolExecutor);
|
||||||
|
}); */
|
||||||
|
|
||||||
|
/* Websocket.addListener("/truck_location_track/realtime", msg -> {
|
||||||
|
LocationTrackHistoryParam param = msg.extractData(LocationTrackHistoryParam.class);
|
||||||
|
Long orderId = param.getOrderId();
|
||||||
|
String uid = msg.getUid();
|
||||||
|
String cid = msg.getCid();
|
||||||
|
String gpsId = baseMapper.gpsId(orderId);
|
||||||
|
realtimeDataListenerLock.lock();
|
||||||
|
try {
|
||||||
|
realtimeDataListener.add(Tuple3.create(uid, cid, gpsId));
|
||||||
|
} finally {
|
||||||
|
realtimeDataListenerLock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
*//* CompletableFuture.runAsync(() -> {
|
||||||
|
while (true) {
|
||||||
|
ThreadUtil.sleep(3000);
|
||||||
|
realtimeDataListenerLock.lock();
|
||||||
|
try {
|
||||||
|
realtimeDataListener.removeIf(it -> !Websocket.online(it.get_0(), it.get_1()));
|
||||||
|
} finally {
|
||||||
|
realtimeDataListenerLock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Tuple3<String, String, String>> listeners = realtimeDataListener;
|
||||||
|
if (listeners.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Tuple3<String, String, String> tuple3 : listeners) {
|
||||||
|
if (!Websocket.online(uid, cid)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Websocket.send(uid, cid, new WsMsg()
|
||||||
|
.setEvent("realtimeData")
|
||||||
|
.setData(R.success(new TruckLocationTrackEntity()))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, threadPoolExecutor); *//*
|
||||||
|
|
||||||
|
*//* CompletableFuture.runAsync(() -> {
|
||||||
|
try {
|
||||||
|
int currentPage = 1;
|
||||||
|
// 分页查询,每次500条
|
||||||
|
while (true) {
|
||||||
|
// 检查是否被取消
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Page<TruckLocationTrackEntity> page = new Page<>(currentPage, 500);
|
||||||
|
page.addOrder(OrderItem.asc("location_time"));
|
||||||
|
|
||||||
|
LocalDateTime now = LocalDateTime.now();
|
||||||
|
// 分页查询当前页数据
|
||||||
|
IPage<TruckLocationTrackEntity> resultPage = this.page(page, Wrappers.lambdaQuery(TruckLocationTrackEntity.class)
|
||||||
|
.eq(TruckLocationTrackEntity::getOrderId, orderId)
|
||||||
|
.ge(TruckLocationTrackEntity::getLocationTime, now));
|
||||||
|
|
||||||
|
List<TruckLocationTrackEntity> records = resultPage.getRecords();
|
||||||
|
if (records.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理当前页数据,按时间间隔推送
|
||||||
|
for (TruckLocationTrackEntity record : records) {
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!ThreadUtil.sleep(1000)) {
|
||||||
|
log.info("任务被取消");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Websocket.send(msg.getUid(), msg.getCid(), new WsMsg()
|
||||||
|
.setEvent("historyData")
|
||||||
|
.setUid(msg.getUid())
|
||||||
|
.setCid(msg.getCid())
|
||||||
|
.setData(record)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 判断是否为最后一页
|
||||||
|
if (currentPage >= resultPage.getPages()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 继续查询下一页
|
||||||
|
currentPage++;
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 其他异常
|
||||||
|
log.error("查询历史轨迹异常", e);
|
||||||
|
}
|
||||||
|
}, threadPoolExecutor); *//*
|
||||||
|
|
||||||
|
}); */
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue