This commit is contained in:
2025-09-23 17:20:58 +08:00
parent ca2b70dfbe
commit 5f3e9a08f6
25 changed files with 1471 additions and 1095 deletions

View File

@@ -198,11 +198,13 @@ public class ClientAccountController extends BaseController {
Map<String, Object> claims = Jwts.parser().setSigningKey(jwtRsaKeyService.getPublicKey()).parseClaimsJws(token).getBody();
String username = (String) claims.getOrDefault("sub", claims.get("subject"));
String tokenClientId = (String) claims.get("clientId");
if (username == null || tokenClientId == null || !tokenClientId.equals(clientId)) {
throw new RuntimeException("会话不匹配");
}
SseEmitter emitter = sseHubService.register(username, clientId, 0L);
try { emitter.send(SseEmitter.event().name("ready").data("ok")); } catch (Exception ignored) {}
try { emitter.send(SseEmitter.event().data("{\"type\":\"ready\"}")); } catch (Exception ignored) {}
return emitter;
}

View File

@@ -114,11 +114,31 @@ public class ClientDeviceController {
return AjaxResult.success();
}
if (!"removed".equals(exists.getStatus())) {
// 先推送下线事件,再断开连接
sseHubService.sendEvent(exists.getUsername(), deviceId, "DEVICE_REMOVED", "{}");
// 立即断开SSE连接防止重新上线
sseHubService.disconnectDevice(exists.getUsername(), deviceId);
// 更新设备状态
exists.setStatus("removed");
exists.setLastActiveAt(new java.util.Date());
clientDeviceMapper.updateByDeviceId(exists);
// 推送SSE下线事件
try { sseHubService.sendEvent(exists.getUsername(), deviceId, "DEVICE_REMOVED", "{}"); } catch (Exception ignored) {}
}
return AjaxResult.success();
}
/**
* 设备离线
*/
@PostMapping("/offline")
public AjaxResult offline(@RequestBody Map<String, String> body) {
String deviceId = body.get("deviceId");
if (deviceId == null) return AjaxResult.error("deviceId不能为空");
ClientDevice device = clientDeviceMapper.selectByDeviceId(deviceId);
if (device != null) {
device.setStatus("offline");
device.setLastActiveAt(new java.util.Date());
clientDeviceMapper.updateByDeviceId(device);
}
return AjaxResult.success();
}

View File

@@ -1,430 +1,52 @@
package com.ruoyi.web.controller.tool;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.bind.annotation.*;
import com.ruoyi.common.annotation.Anonymous;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.utils.StringUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import com.ruoyi.system.domain.BanmaAccount;
import com.ruoyi.system.service.IBanmaAccountService;
/**
* 斑马订单控制器
*
* @author ruoyi
* 斑马账号管理(数据库版,极简接口):
* - 仅负责账号与 Token 的存取
* - 不参与登录/刷新与数据采集,客户端自行处理
*/
@Api("斑马订单接口")
@RestController
@RequestMapping("/tool/banma")
@Anonymous
public class BanmaOrderController extends BaseController {
private static String AUTH_TOKEN = "Bearer e5V8Vlaf9xh5i31xaI300wbdXEE3iLtgip+JXfzZsb7GShP2XCGhoVzTEVxyc8LH";
private static final String LOGIN_URL = "https://banma365.cn/api/login";
private static final String LOGIN_USERNAME = "大赢家网络科技(主账号)";
private static final String LOGIN_PASSWORD = "banma123456";
private static final String API_URL = "https://banma365.cn/api/order/list?recipientName=&page=%d&size=%d&markFlag=0&state=4&_t=%d";
private static final String API_URL_WITH_TIME = "https://banma365.cn/api/order/list?recipientName=&page=%d&size=%d&markFlag=0&state=4&orderedAtStart=%s&orderedAtEnd=%s&_t=%d";
private static final String TRACKING_URL = "https://banma365.cn/zebraExpressHub/web/tracking/getByExpressNumber/%s";
private static final int CONNECTION_TIMEOUT = 999999999;
private static final int READ_TIMEOUT = 999999999;
private static final int DEFAULT_PAGE_SIZE = 20;
@Autowired
private RestTemplate restTemplate;
private IBanmaAccountService accountService;
@Autowired
private SagawaExpressController sagawaExpressController;
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public BanmaOrderController() {
HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory();
factory.setConnectTimeout(CONNECTION_TIMEOUT);
factory.setReadTimeout(READ_TIMEOUT);
restTemplate = new RestTemplate(factory);
}
/**
* 初始化方法启动时刷新token
* 查询账号列表(仅返回必要字段)
*/
@PostConstruct
public void init() {
refreshToken();
}
/**
* 关闭线程池
*/
@PreDestroy
public void destroy() {
executorService.shutdownNow();
}
@Scheduled(fixedRate = 86400000 * 3)
public void refreshToken() {
try {
// 1. 输入准备:构建请求参数
Map<String, String> loginParams = new HashMap<>();
loginParams.put("username", LOGIN_USERNAME);
loginParams.put("password", LOGIN_PASSWORD);
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", "application/json");
ResponseEntity<Map> response = restTemplate.postForEntity(
LOGIN_URL,
new HttpEntity<>(loginParams, headers),
Map.class
);
Optional.ofNullable(response.getBody())
.filter(body -> Integer.valueOf(0).equals(body.get("code")))
.map(body -> (Map<String, Object>) body.get("data"))
.map(data -> (String) data.get("token"))
.filter(StringUtils::isNotEmpty)
.ifPresent(token -> {
AUTH_TOKEN = "Bearer " + token;
logger.info("斑马token刷新成功: {}", token);
});
} catch (Exception e) {
logger.error("斑马token刷新异常: {}", e.getMessage());
}
@GetMapping("/accounts")
public R<?> listAccounts() {
List<BanmaAccount> list = accountService.listSimple();
return R.ok(list);
}
/**
* 创建HTTP请求实体
* 新增或编辑账号(含设为默认)
*/
private HttpEntity<String> createHttpEntity() {
HttpHeaders headers = new HttpHeaders();
headers.set("Authorization", AUTH_TOKEN);
return new HttpEntity<>(headers);
@PostMapping("/accounts")
public R<?> saveAccount(@RequestBody BanmaAccount body) {
Long id = accountService.saveOrUpdate(body);
return R.ok(Map.of("id", id));
}
/**
* 处理订单数据
* 删除账号
*/
@SuppressWarnings("unchecked")
private CompletableFuture<Map<String, Object>> processOrderDataAsync(Map<String, Object> order) {
return CompletableFuture.supplyAsync(() -> {
if (order == null) return null;
Map<String, Object> simplifiedOrder = new HashMap<>();
// 提取国际运单号和运费
String trackingNumber = (String) order.get("internationalTrackingNumber");
simplifiedOrder.put("internationalTrackingNumber", trackingNumber);
simplifiedOrder.put("internationalShippingFee", order.get("internationalShippingFee"));
// 获取物流轨迹信息
if (StringUtils.isNotEmpty(trackingNumber)) {
simplifiedOrder.put("trackInfo", getTrackingInfo(trackingNumber));
}
// 处理子订单信息
Optional.ofNullable(order.get("subOrders"))
.map(subOrders -> (List<Map<String, Object>>) subOrders)
.filter(list -> !list.isEmpty())
.map(list -> list.get(0))
.ifPresent(subOrder -> extractSubOrderFields(simplifiedOrder, subOrder));
return simplifiedOrder;
}, executorService);
@DeleteMapping("/accounts/{id}")
public R<?> remove(@PathVariable Long id) {
accountService.remove(id);
return R.ok();
}
/**
* 提取子订单字段
*/
private void extractSubOrderFields(Map<String, Object> simplifiedOrder, Map<String, Object> subOrder) {
// 基础信息
simplifiedOrder.put("orderedAt", subOrder.get("orderedAt"));
simplifiedOrder.put("timeSinceOrder", subOrder.get("timeSinceOrder"));
simplifiedOrder.put("productImage", subOrder.get("productImage"));
simplifiedOrder.put("createdAt", subOrder.get("createdAt"));
simplifiedOrder.put("poTrackingNumber", subOrder.get("poTrackingNumber"));
// 商品信息
simplifiedOrder.put("productTitle", subOrder.get("productTitle"));
simplifiedOrder.put("shopOrderNumber", subOrder.get("shopOrderNumber"));
simplifiedOrder.put("priceJpy", subOrder.get("priceJpy"));
simplifiedOrder.put("productQuantity", subOrder.get("productQuantity"));
simplifiedOrder.put("shippingFeeJpy", subOrder.get("shippingFeeJpy"));
simplifiedOrder.put("productNumber", subOrder.get("productNumber"));
// 采购信息
simplifiedOrder.put("poNumber", subOrder.get("poNumber"));
simplifiedOrder.put("shippingFeeCny", subOrder.get("shippingFeeCny"));
simplifiedOrder.put("poLogisticsCompany", subOrder.get("poLogisticsCompany"));
}
/**
* 获取斑马订单数据 - 异步方法
*/
@SuppressWarnings("unchecked")
private CompletableFuture<List<Map<String, Object>>> fetchOrdersFromApiAsync(int page, int size, String startDate, String endDate) {
return CompletableFuture.supplyAsync(() -> {
try {
HttpEntity<String> entity = createHttpEntity();
String url = buildApiUrl(page, size, startDate, endDate);
ResponseEntity<Map> response = restTemplate.exchange(url, HttpMethod.GET, entity, Map.class);
Map<String, Object> responseBody = response.getBody();
if (responseBody == null || !responseBody.containsKey("data")) {
return Collections.emptyList();
}
Map<String, Object> dataMap = (Map<String, Object>) responseBody.get("data");
List<Map<String, Object>> orders = Optional.ofNullable(dataMap.get("list"))
.map(list -> (List<Map<String, Object>>) list)
.orElse(Collections.emptyList());
return orders;
} catch (Exception e) {
logger.error("获取订单数据失败: {}", e.getMessage());
return Collections.emptyList();
}
}, executorService);
}
/**
* 构建API URL
*/
private String buildApiUrl(int page, int size, String startDate, String endDate) {
if (StringUtils.isNotEmpty(startDate) && StringUtils.isNotEmpty(endDate)) {
String startTime = startDate + " 00:00:00";
String endTime = endDate + " 23:59:59";
return String.format(API_URL_WITH_TIME, page, size, startTime, endTime, System.currentTimeMillis());
}
return String.format(API_URL, page, size, System.currentTimeMillis());
}
/**
* 获取物流轨迹信息
*/
@SuppressWarnings("unchecked")
private String getTrackingInfo(String trackingNumber) {
try {
R<Map<String, Object>> sagawaResult = sagawaExpressController.getTrackingInfo(trackingNumber);
if (sagawaResult != null && sagawaResult.getCode() == 200) {
Map<String, Object> sagawaData = sagawaResult.getData();
if (sagawaData != null && "success".equals(sagawaData.get("status"))) {
Map<String, String> trackInfo = (Map<String, String>) sagawaData.get("trackInfo");
if (trackInfo != null) {
return String.format("%s - %s - %s",
trackInfo.get("status"),
trackInfo.get("dateTime"),
trackInfo.get("office"));
}
}
}
try {
String url = String.format(TRACKING_URL, trackingNumber);
ResponseEntity<Map> response = restTemplate.getForEntity(url, Map.class);
Map<String, Object> responseBody = response.getBody();
if (responseBody != null && Integer.valueOf(0).equals(responseBody.get("code"))) {
return Optional.ofNullable(responseBody.get("data"))
.map(data -> (List<Map<String, Object>>) data)
.filter(list -> !list.isEmpty())
.map(list -> list.get(0))
.map(track -> (String) track.get("track"))
.orElse(null);
}
} catch (Exception e) {
logger.error("从斑马API获取物流信息失败: {}", e.getMessage());
}
} catch (Exception e) {
logger.error("获取物流信息失败: {}", e.getMessage());
}
return "暂无物流信息";
}
/**
* 获取物流轨迹信息
*/
@ApiOperation("获取物流轨迹信息")
@GetMapping("/tracking/{trackingNumber}")
public R<String> getTracking(@PathVariable("trackingNumber") String trackingNumber) {
try {
String trackInfo = getTrackingInfo(trackingNumber);
return trackInfo != null ? R.ok(trackInfo) : R.fail("未找到物流信息");
} catch (Exception e) {
return R.fail("获取物流信息失败: " + e.getMessage());
}
}
/**
* 获取所有页的斑马订单 - 优化版本
*/
@ApiOperation("获取所有页的斑马订单")
@GetMapping("/orders/all")
@SuppressWarnings("unchecked")
public DeferredResult<R<Map<String, Object>>> getAllOrders(
@ApiParam("开始日期(yyyy-MM-dd)") @RequestParam(required = false) String startDate,
@ApiParam("结束日期(yyyy-MM-dd)") @RequestParam(required = false) String endDate) {
DeferredResult<R<Map<String, Object>>> deferredResult = new DeferredResult<>(9999000L);
CompletableFuture.runAsync(() -> {
try {
HttpEntity<String> entity = createHttpEntity();
String url = buildApiUrl(1, DEFAULT_PAGE_SIZE, startDate, endDate);
ResponseEntity<Map> response = restTemplate.exchange(url, HttpMethod.GET, entity, Map.class);
Map<String, Object> responseBody = response.getBody();
Map<String, Object> dataMap = (Map<String, Object>) responseBody.get("data");
int totalCount = ((Number) dataMap.getOrDefault("total", 0)).intValue();
List<Map<String, Object>> orders = Optional.ofNullable(dataMap.get("list"))
.map(list -> (List<Map<String, Object>>) list)
.orElse(Collections.emptyList());
List<CompletableFuture<Map<String, Object>>> futures = orders.stream()
.map(this::processOrderDataAsync)
.toList();
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// 收集所有处理结果
CompletableFuture<List<Map<String, Object>>> resultsFuture = allFutures.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList())
);
List<Map<String, Object>> processedOrders = resultsFuture.get();
int totalPages = (int) Math.ceil((double) totalCount / DEFAULT_PAGE_SIZE);
boolean hasMore = totalCount > 0 && 1 < totalPages;
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("orders", processedOrders);
resultMap.put("total", totalCount);
resultMap.put("totalPages", totalPages);
resultMap.put("hasMore", hasMore);
resultMap.put("nextPage", 2);
deferredResult.setResult(R.ok(resultMap));
} catch (Exception e) {
logger.error("获取订单数据失败: {}", e.getMessage());
deferredResult.setResult(R.fail("获取订单失败: " + e.getMessage()));
}
}, executorService);
return deferredResult;
}
/**
* 获取下一页斑马订单 - 优化版本
*/
@ApiOperation("获取下一页斑马订单")
@GetMapping("/orders/next")
public DeferredResult<R<Map<String, Object>>> getNextPageOrders(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@ApiParam("开始日期(yyyy-MM-dd)") @RequestParam(required = false) String startDate,
@ApiParam("结束日期(yyyy-MM-dd)") @RequestParam(required = false) String endDate) {
DeferredResult<R<Map<String, Object>>> deferredResult = new DeferredResult<>(999999999L);
CompletableFuture.runAsync(() -> {
try {
// 获取总页数信息
HttpEntity<String> entity = createHttpEntity();
String url = buildApiUrl(1, DEFAULT_PAGE_SIZE, startDate, endDate);
ResponseEntity<Map> countResponse = restTemplate.exchange(url, HttpMethod.GET, entity, Map.class);
Map<String, Object> countResponseBody = countResponse.getBody();
int totalPages = 1;
if (countResponseBody != null && countResponseBody.containsKey("data")) {
Map<String, Object> dataMap = (Map<String, Object>) countResponseBody.get("data");
int totalCount = ((Number) dataMap.getOrDefault("total", 0)).intValue();
totalPages = (int) Math.ceil((double) totalCount / DEFAULT_PAGE_SIZE);
}
// 获取当前页数据
CompletableFuture<List<Map<String, Object>>> ordersFuture = fetchOrdersFromApiAsync(page, DEFAULT_PAGE_SIZE, startDate, endDate);
List<Map<String, Object>> orders = ordersFuture.get();
// 并行处理订单数据
List<CompletableFuture<Map<String, Object>>> processFutures = orders.stream()
.map(this::processOrderDataAsync)
.collect(Collectors.toList());
// 等待所有处理完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
processFutures.toArray(new CompletableFuture[0])
);
CompletableFuture<List<Map<String, Object>>> resultsFuture = allFutures.thenApply(v ->
processFutures.stream()
.map(CompletableFuture::join)
.filter(order -> order != null)
.collect(Collectors.toList())
);
List<Map<String, Object>> processedOrders = resultsFuture.get();
// 修改hasMore判断逻辑根据当前页数和总页数判断
boolean hasMore = page < totalPages;
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("orders", processedOrders);
resultMap.put("hasMore", hasMore);
resultMap.put("nextPage", page + 1);
resultMap.put("totalPages", totalPages);
deferredResult.setResult(R.ok(resultMap));
} catch (Exception e) {
logger.error("获取下一页订单失败: {}", e.getMessage());
deferredResult.setResult(R.fail("获取订单失败: " + e.getMessage()));
}
}, executorService);
return deferredResult;
}
/**
* 图片代理接口
*/
@ApiOperation("图片代理接口")
@GetMapping("/image-proxy")
public void imageProxy(@RequestParam("url") String imageUrl, javax.servlet.http.HttpServletResponse response) {
if (StringUtils.isEmpty(imageUrl)) {
return;
}
try {
HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory();
factory.setConnectTimeout(999999999);
factory.setReadTimeout(999999999);
RestTemplate proxyTemplate = new RestTemplate(factory);
ResponseEntity<byte[]> imageResponse = proxyTemplate.getForEntity(imageUrl, byte[].class);
byte[] imageBytes = imageResponse.getBody();
if (imageBytes != null) {
String contentType = Optional.ofNullable(imageResponse.getHeaders().getContentType())
.map(Object::toString)
.orElse("image/jpeg");
response.setContentType(contentType);
response.setContentLength(imageBytes.length);
response.getOutputStream().write(imageBytes);
response.getOutputStream().flush();
}
} catch (Exception e) {
logger.error("图片代理请求失败: {}", e.getMessage());
}
}
/**
* 手动刷新token接口
*/
@GetMapping("/refresh-token")
public R<String> manualRefreshToken() {
refreshToken();
return R.ok("Token刷新请求已执行");
}
}
}

View File

@@ -1,9 +1,13 @@
package com.ruoyi.web.sse;
import com.ruoyi.system.domain.ClientDevice;
import com.ruoyi.system.mapper.ClientDeviceMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -12,6 +16,9 @@ public class SseHubService {
private final Map<String, SseEmitter> sessionEmitters = new ConcurrentHashMap<>();
@Autowired
private ClientDeviceMapper clientDeviceMapper;
public String buildSessionKey(String username, String clientId) {
return (username == null ? "" : username) + ":" + (clientId == null ? "" : clientId);
}
@@ -20,18 +27,36 @@ public class SseHubService {
String key = buildSessionKey(username, clientId);
SseEmitter emitter = new SseEmitter(timeoutMs != null ? timeoutMs : 0L);
sessionEmitters.put(key, emitter);
emitter.onCompletion(() -> sessionEmitters.remove(key));
emitter.onTimeout(() -> sessionEmitters.remove(key));
// SSE连接建立 = 设备上线
updateDeviceStatus(clientId, "online");
emitter.onCompletion(() -> {
sessionEmitters.remove(key);
updateDeviceStatus(clientId, "offline");
});
emitter.onTimeout(() -> {
sessionEmitters.remove(key);
updateDeviceStatus(clientId, "offline");
});
emitter.onError((throwable) -> {
sessionEmitters.remove(key);
updateDeviceStatus(clientId, "offline");
});
return emitter;
}
public void sendEvent(String username, String clientId, String type, String message) {
String key = buildSessionKey(username, clientId);
SseEmitter emitter = sessionEmitters.get(key);
if (emitter == null) return;
try {
String data = message != null ? message : "{}";
emitter.send(SseEmitter.event().name("event").data("{\"type\":\"" + type + "\",\"message\":" + escapeJson(data) + "}"));
String eventData = "{\"type\":\"" + type + "\",\"message\":" + escapeJson(data) + "}";
emitter.send(SseEmitter.event().data(eventData));
} catch (IOException e) {
sessionEmitters.remove(key);
try { emitter.complete(); } catch (Exception ignored) {}
@@ -53,6 +78,39 @@ public class SseHubService {
private String escapeJson(String raw) {
return "\"" + raw.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
}
/**
* 强制断开指定设备的SSE连接
*/
public void disconnectDevice(String username, String clientId) {
String key = buildSessionKey(username, clientId);
SseEmitter emitter = sessionEmitters.remove(key);
if (emitter != null) {
try {
emitter.complete();
} catch (Exception ignored) {}
}
}
/**
* 更新设备状态
*/
private void updateDeviceStatus(String deviceId, String status) {
try {
ClientDevice device = clientDeviceMapper.selectByDeviceId(deviceId);
if (device != null) {
// 如果设备被移除断开SSE连接
if ("removed".equals(status)) {
disconnectDevice(device.getUsername(), deviceId);
}
device.setStatus(status);
device.setLastActiveAt(new Date());
clientDeviceMapper.updateByDeviceId(device);
}
} catch (Exception ignored) {
// 静默处理不影响SSE主流程
}
}
}