This commit is contained in:
2025-06-18 18:30:01 +08:00
parent 98bb3529ea
commit 3d072958d6
12 changed files with 456 additions and 510 deletions

View File

@@ -1,7 +1,4 @@
package com.tashow.cloud.app.config;
import com.tashow.cloud.sdk.feishu.client.FeiShuAlertClient;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
@@ -12,11 +9,8 @@ import org.springframework.data.redis.core.StringRedisTemplate;
*/
@Configuration
public class FeiShuClientConfig {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/* @PostConstruct
public void initFeiShuClient() {
FeiShuAlertClient.setRedisTemplate(stringRedisTemplate);

View File

@@ -1,26 +0,0 @@
package com.tashow.cloud.app.config;
import com.tashow.cloud.app.service.feishu.FeiShuCardDataService;
import com.tashow.cloud.sdk.feishu.client.FeiShuAlertClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import jakarta.annotation.PostConstruct;
/**
* 飞书配置类
* 用于初始化飞书SDK与应用层的集成
*/
@Configuration
public class FeishuConfig {
private final FeiShuAlertClient feiShuAlertClient;
private final FeiShuCardDataService feiShuCardDataService;
@Autowired
public FeishuConfig(FeiShuAlertClient feiShuAlertClient, FeiShuCardDataService feiShuCardDataService) {
this.feiShuAlertClient = feiShuAlertClient;
this.feiShuCardDataService = feiShuCardDataService;
}
}

View File

@@ -1,6 +1,5 @@
package com.tashow.cloud.app.controller;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.lark.oapi.core.utils.Decryptor;
import com.tashow.cloud.app.service.feishu.FeiShuCardDataService;
import com.tashow.cloud.sdk.feishu.client.FeiShuAlertClient;
@@ -16,7 +15,8 @@ import java.util.Map;
@RestController
public class FeishuController {
private final Logger log = LoggerFactory.getLogger(FeishuController.class);
private static final Logger log = LoggerFactory.getLogger(FeishuController.class);
private static final String ACTION_COMPLETE_ALARM = "complete_alarm";
private final FeiShuAlertClient feiShuAlertClient;
private final FeiShuCardDataService feiShuCardDataService;
@@ -29,7 +29,6 @@ public class FeishuController {
this.feiShuAlertClient = feiShuAlertClient;
this.feiShuCardDataService = feiShuCardDataService;
this.larkConfig = larkConfig;
}
@RequestMapping("/card1")
@@ -39,23 +38,33 @@ public class FeishuController {
if (data.containsKey("app_id") && data.containsKey("action")) {
JSONObject action = data.getJSONObject("action");
JSONObject value = action.getJSONObject("value");
if (value != null && "complete_alarm".equals(value.getStr("action"))) {
if (value != null && ACTION_COMPLETE_ALARM.equals(value.getStr("action"))) {
String messageId = data.getStr("open_message_id");
Map<String, Object> templateData = feiShuCardDataService.getCardData(messageId);
log.info("从Redis获取的模板数据: {}", templateData);
return feiShuAlertClient.buildCardWithData("AAqdp4Mrvf2V9", templateData);
return feiShuAlertClient.buildCardWithData(larkConfig.getSuccessCards(), templateData);
}
}
if (data.containsKey("encrypt")) {
Decryptor decryptor = new Decryptor(larkConfig.getEncryptKey());
String encrypt = decryptor.decrypt(data.getStr("encrypt"));
return encrypt;
return decryptor.decrypt(data.getStr("encrypt"));
}
return "{}";
} catch (Exception e) {
log.error("卡片处理异常", e);
return "{\"code\":1,\"msg\":\"处理异常: " + e.getMessage() + "\"}";
}
}
/**
* 发送并存储卡片消息
*/
public String sendAndStoreCardMessage(String chatId, String templateId, Map<String, Object> templateData) throws Exception {
String messageId = feiShuAlertClient.sendCardMessage(chatId, templateId, templateData);
if (messageId != null) {
feiShuCardDataService.saveCardData(messageId, templateData);
}
return messageId;
}
}

View File

@@ -71,6 +71,7 @@ public class BuriedPointConfiguration implements WebMvcConfigurer {
"/v3/api-docs/**",
"/webjars/**",
"/static/**",
"/card1",
"/error"
);
}

View File

@@ -1,13 +1,11 @@
package com.tashow.cloud.app.mq.consumer.buriedPoint;
import com.tashow.cloud.sdk.feishu.client.FeiShuAlertClient;
import com.tashow.cloud.app.mapper.BuriedPointMapper;
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.app.model.BuriedPoint;
import com.tashow.cloud.app.model.BuriedPointFailRecord;
import com.tashow.cloud.sdk.feishu.config.LarkConfig;
import com.tashow.cloud.sdk.feishu.util.ChartImageGenerator;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.app.service.feishu.BuriedPointMonitorService;
import com.rabbitmq.client.Channel;
import com.tashow.cloud.mq.rabbitmq.consumer.AbstractRabbitMQConsumer;
import lombok.RequiredArgsConstructor;
@@ -15,12 +13,11 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import org.springframework.dao.DuplicateKeyException;
import com.tashow.cloud.common.util.json.JsonUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -36,8 +33,7 @@ public class BuriedPointConsumer extends AbstractRabbitMQConsumer<BuriedMessages
private final BuriedPointMapper buriedPointMapper;
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
private final FeiShuAlertClient feiShuAlertClient;
private final LarkConfig larkConfig;
private final BuriedPointMonitorService buriedPointMonitorService;
@Value("${spring.application.name:tashow-app}")
private String applicationName;
@@ -54,7 +50,6 @@ public class BuriedPointConsumer extends AbstractRabbitMQConsumer<BuriedMessages
@Override
public boolean processMessage(BuriedMessages message) {
// 消息处理
return saveToDatabase(message);
}
@@ -68,19 +63,26 @@ public class BuriedPointConsumer extends AbstractRabbitMQConsumer<BuriedMessages
return buriedPoint.getRetryCount() - 1;
}
return buriedPoint.getRetryCount();
} else {
String correlationId = String.valueOf(message.getId());
LambdaQueryWrapper<BuriedPointFailRecord> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
BuriedPointFailRecord failRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
return failRecord != null ? failRecord.getRetryCount() : 0;
}
String correlationId = String.valueOf(message.getId());
BuriedPointFailRecord failRecord = findFailRecord(correlationId);
return failRecord != null ? failRecord.getRetryCount() : 0;
} catch (Exception e) {
log.warn("[埋点消费者] 获取消息重试次数失败: {}", e.getMessage());
log.warn("[埋点消费者] 获取重试次数失败", e);
return 0;
}
}
/**
* 根据关联ID查找失败记录
*/
private BuriedPointFailRecord findFailRecord(String correlationId) {
LambdaQueryWrapper<BuriedPointFailRecord> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
return buriedPointFailRecordMapper.selectOne(queryWrapper);
}
@Override
public void updateMessageStatus(BuriedMessages message) {
try {
@@ -90,11 +92,9 @@ public class BuriedPointConsumer extends AbstractRabbitMQConsumer<BuriedMessages
buriedPoint.setUpdateTime(new Date());
buriedPoint.setRetryCount(message.getRetryCount());
buriedPointMapper.updateById(buriedPoint);
log.debug("[埋点消费者] 已更新埋点状态, 事件ID: {}, 新状态: {}, 重试次数: {}",
message.getId(), message.getStatusCode(), message.getRetryCount());
}
} catch (Exception e) {
log.error("[埋点消费者] 更新埋点状态失败: {}, 错误: {}", message.getId(), e.getMessage(), e);
log.error("[埋点消费者] 更新状态失败", e);
}
}
@@ -103,206 +103,157 @@ public class BuriedPointConsumer extends AbstractRabbitMQConsumer<BuriedMessages
try {
BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId());
if (buriedPoint != null) {
buriedPoint.setRetryCount(message.getRetryCount());
buriedPoint.setUpdateTime(new Date());
buriedPointMapper.updateById(buriedPoint);
updateBuriedPointRetryCount(buriedPoint, message);
return;
}
String correlationId = String.valueOf(message.getId());
BuriedPointFailRecord failRecord = findFailRecord(correlationId);
if (failRecord != null) {
updateFailRecordRetryCount(failRecord, message);
} else {
String correlationId = String.valueOf(message.getId());
LambdaQueryWrapper<BuriedPointFailRecord> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
BuriedPointFailRecord failRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
if (failRecord != null) {
failRecord.setRetryCount(message.getRetryCount());
failRecord.setUpdateTime(new Date());
failRecord.setMessageContent(JsonUtils.toJsonString(message));
buriedPointFailRecordMapper.updateById(failRecord);
} else {
saveToFailRecord(message, "");
}
saveToFailRecord(message, "");
}
} catch (Exception e) {
log.error("[埋点消费者] 更新重试次数失败: {}, 错误: {}", message.getId(), e.getMessage());
log.error("[埋点消费者] 更新重试次数失败", e);
}
}
/**
* 更新埋点表中的重试次数
*/
private void updateBuriedPointRetryCount(BuriedPoint buriedPoint, BuriedMessages message) {
buriedPoint.setRetryCount(message.getRetryCount());
buriedPoint.setUpdateTime(new Date());
buriedPointMapper.updateById(buriedPoint);
}
/**
* 更新失败记录表中的重试次数
*/
private void updateFailRecordRetryCount(BuriedPointFailRecord failRecord, BuriedMessages message) {
failRecord.setRetryCount(message.getRetryCount());
failRecord.setUpdateTime(new Date());
failRecord.setMessageContent(JsonUtils.toJsonString(message));
buriedPointFailRecordMapper.updateById(failRecord);
}
@Override
public boolean saveToDatabase(BuriedMessages message) {
try {
log.debug("[埋点消费者] 准备保存埋点数据事件ID: {}", message.getId());
BuriedPoint existingPoint = buriedPointMapper.selectByEventId(message.getId());
if (existingPoint != null) {
existingPoint.setStatus(message.getStatusCode());
existingPoint.setUpdateTime(new Date());
if (message.getRetryCount() != null) {
int newRetryCount = Math.max(existingPoint.getRetryCount(), message.getRetryCount());
existingPoint.setRetryCount(newRetryCount);
message.setRetryCount(newRetryCount);
}
int result = buriedPointMapper.updateById(existingPoint);
return result > 0;
return updateExistingBuriedPoint(existingPoint, message);
}
BuriedPoint buriedPoint = new BuriedPoint();
buriedPoint.setEventId(message.getId());
buriedPoint.setEventTime(System.currentTimeMillis());
buriedPoint.setUserId(message.getUserId());
buriedPoint.setEventType(message.getEventType());
buriedPoint.setService(applicationName);
buriedPoint.setMethod(message.getMethod());
buriedPoint.setSessionId(message.getSessionId());
buriedPoint.setClientIp(message.getClientIp());
buriedPoint.setServerIp(message.getServerIp());
buriedPoint.setStatus(message.getStatusCode());
buriedPoint.setRetryCount(message.getRetryCount());
buriedPoint.setPagePath(message.getPagePath());
buriedPoint.setElementId(message.getElementId());
buriedPoint.setDuration(message.getDuration());
buriedPoint.setCreateTime(new Date());
buriedPoint.setUpdateTime(new Date());
log.debug("[埋点消费者] 埋点实体数据: eventId={}, eventType={}, userId={}, service={}, method={}, status={}, retryCount={}",
buriedPoint.getEventId(), buriedPoint.getEventType(), buriedPoint.getUserId(),
buriedPoint.getService(), buriedPoint.getMethod(), buriedPoint.getStatus(),
buriedPoint.getRetryCount());
buriedPointMapper.insert(buriedPoint);
log.info("[埋点消费者] 埋点数据已保存到数据库, 事件ID: {}, 状态: {}", message.getId(), message.getStatusCode());
return true;
return createNewBuriedPoint(message);
} catch (DuplicateKeyException e) {
log.warn("[埋点消费者] 埋点数据已存在, 事件ID: {}", message.getId());
return true; // 数据已存在也视为成功
return true;
} catch (Exception e) {
log.error("[埋点消费者] 保存埋点数据到数据库失败, 事件ID: {}, 错误: {}", message.getId(), e.getMessage(), e);
log.error("[埋点消费者] 保存数据失败", e);
throw e;
}
}
/**
* 更新已存在的埋点记录
*/
private boolean updateExistingBuriedPoint(BuriedPoint existingPoint, BuriedMessages message) {
existingPoint.setStatus(message.getStatusCode());
existingPoint.setUpdateTime(new Date());
if (message.getRetryCount() != null) {
int newRetryCount = Math.max(existingPoint.getRetryCount(), message.getRetryCount());
existingPoint.setRetryCount(newRetryCount);
message.setRetryCount(newRetryCount);
}
return buriedPointMapper.updateById(existingPoint) > 0;
}
/**
* 创建新的埋点记录
*/
private boolean createNewBuriedPoint(BuriedMessages message) {
BuriedPoint buriedPoint = new BuriedPoint();
buriedPoint.setEventId(message.getId());
buriedPoint.setEventTime(System.currentTimeMillis());
buriedPoint.setUserId(message.getUserId());
buriedPoint.setEventType(message.getEventType());
buriedPoint.setService(applicationName);
buriedPoint.setMethod(message.getMethod());
buriedPoint.setSessionId(message.getSessionId());
buriedPoint.setClientIp(message.getClientIp());
buriedPoint.setServerIp(message.getServerIp());
buriedPoint.setStatus(message.getStatusCode());
buriedPoint.setRetryCount(message.getRetryCount());
buriedPoint.setPagePath(message.getPagePath());
buriedPoint.setElementId(message.getElementId());
buriedPoint.setDuration(message.getDuration());
buriedPoint.setCreateTime(new Date());
buriedPoint.setUpdateTime(new Date());
buriedPointMapper.insert(buriedPoint);
return true;
}
@Override
public void saveToFailRecord(BuriedMessages message, String cause) {
try {
String correlationId = String.valueOf(message.getId());
LambdaQueryWrapper<BuriedPointFailRecord> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
BuriedPointFailRecord existingRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
BuriedPointFailRecord existingRecord = findFailRecord(correlationId);
if (existingRecord != null) {
log.info("[埋点消费者] 发现已有失败记录,将更新: {}", correlationId);
existingRecord.setExchange(BuriedMessages.EXCHANGE);
existingRecord.setRoutingKey(BuriedMessages.ROUTING_KEY);
existingRecord.setCause(message.getErrorMessage()+cause);
existingRecord.setMessageContent(JsonUtils.toJsonString(message));
existingRecord.setRetryCount(message.getRetryCount());
existingRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
existingRecord.setUpdateTime(new Date());
buriedPointFailRecordMapper.updateById(existingRecord);
log.info("[埋点消费者] 已更新失败记录, 事件ID: {}", correlationId);
updateExistingFailRecord(existingRecord, message, cause);
} else {
BuriedPointFailRecord failRecord = new BuriedPointFailRecord();
failRecord.setCorrelationId(correlationId);
failRecord.setExchange(BuriedMessages.EXCHANGE);
failRecord.setRoutingKey(BuriedMessages.ROUTING_KEY);
failRecord.setCause(message.getErrorMessage()+cause);
failRecord.setMessageContent(JsonUtils.toJsonString(message));
failRecord.setRetryCount(message.getRetryCount());
failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
failRecord.setCreateTime(new Date());
failRecord.setUpdateTime(new Date());
buriedPointFailRecordMapper.insert(failRecord);
log.info("[埋点消费者] 已将失败消息保存到失败记录表, 事件ID: {}", message.getId());
// 查询最近12小时的失败记录数量
createNewFailRecord(message, cause);
checkFailRecordsAndAlert();
}
} catch (Exception e) {
log.error("[埋点消费者] 保存失败记录失败: {}, 错误: {}", message.getId(), e.getMessage(), e);
log.error("[埋点消费者] 保存失败记录失败", e);
}
}
/**
* 更新已存在的失败记录
*/
private void updateExistingFailRecord(BuriedPointFailRecord record, BuriedMessages message, String cause) {
record.setExchange(BuriedMessages.EXCHANGE);
record.setRoutingKey(BuriedMessages.ROUTING_KEY);
record.setCause(message.getErrorMessage() + cause);
record.setMessageContent(JsonUtils.toJsonString(message));
record.setRetryCount(message.getRetryCount());
record.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
record.setUpdateTime(new Date());
buriedPointFailRecordMapper.updateById(record);
}
/**
* 创建新的失败记录
*/
private void createNewFailRecord(BuriedMessages message, String cause) {
String correlationId = String.valueOf(message.getId());
BuriedPointFailRecord failRecord = new BuriedPointFailRecord();
failRecord.setCorrelationId(correlationId);
failRecord.setExchange(BuriedMessages.EXCHANGE);
failRecord.setRoutingKey(BuriedMessages.ROUTING_KEY);
failRecord.setCause(message.getErrorMessage() + cause);
failRecord.setMessageContent(JsonUtils.toJsonString(message));
failRecord.setRetryCount(message.getRetryCount());
failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
failRecord.setCreateTime(new Date());
failRecord.setUpdateTime(new Date());
buriedPointFailRecordMapper.insert(failRecord);
}
/**
* 检查失败记录数量并发送告警
*/
private void checkFailRecordsAndAlert() {
try {
Date now = new Date();
Date twelveHoursAgo = new Date(now.getTime() - 12 * 60 * 60 * 1000L);
LambdaQueryWrapper<BuriedPointFailRecord> failRecordQuery = new LambdaQueryWrapper<>();
failRecordQuery.ge(BuriedPointFailRecord::getCreateTime, twelveHoursAgo)
.le(BuriedPointFailRecord::getCreateTime, now)
.eq(BuriedPointFailRecord::getStatus, BuriedPointFailRecord.STATUS_UNPROCESSED);
Long failCountLast12Hours = buriedPointFailRecordMapper.selectCount(failRecordQuery);
log.warn("[埋点配置] 最近12小时埋点失败数量: {}", failCountLast12Hours);
// 如果失败数量过多,记录警告日志
if (failCountLast12Hours > 3) {
// 查询最近12小时的埋点失败数据按小时统计
List<ChartImageGenerator.MonitoringDataPoint> monitoringData = queryHourlyFailRecordData(twelveHoursAgo, now);
try {
// 发送飞书告警消息
feiShuAlertClient.sendBuriedPointAlertMessage(larkConfig.getChatId(),
monitoringData,
failCountLast12Hours.intValue(),
"埋点处理异常,请检查系统");
} catch (Exception e) {
log.error("[埋点配置] 发送飞书告警失败", e);
}
log.error("[埋点配置] 警告最近12小时埋点失败数量过多请检查系统失败数量: {}", failCountLast12Hours);
}
buriedPointMonitorService.checkFailRecordsAndAlert("埋点处理异常,请检查系统");
} catch (Exception e) {
log.error("[埋点配置] 检查失败记录数量异常", e);
}
}
/**
* 查询失败记录数据,按小时统计
*/
private List<ChartImageGenerator.MonitoringDataPoint> queryHourlyFailRecordData(Date startDate, Date endDate) {
List<ChartImageGenerator.MonitoringDataPoint> result = new ArrayList<>();
try {
// 只取最近12个小时的数据
Calendar calendar = Calendar.getInstance();
calendar.setTime(endDate);
calendar.add(Calendar.HOUR_OF_DAY, -12);
Date twelveHoursAgo = calendar.getTime();
SimpleDateFormat sdf = new SimpleDateFormat("HH:00");
// 从12小时前开始每小时一个数据点
for (int i = 0; i < 12; i++) {
calendar.setTime(twelveHoursAgo);
calendar.add(Calendar.HOUR_OF_DAY, i);
Date currentHourStart = calendar.getTime();
calendar.add(Calendar.HOUR_OF_DAY, 1);
Date nextHourStart = calendar.getTime();
// 查询处理成功的记录数量
LambdaQueryWrapper<BuriedPointFailRecord> successQuery = new LambdaQueryWrapper<>();
successQuery.ge(BuriedPointFailRecord::getCreateTime, currentHourStart)
.lt(BuriedPointFailRecord::getCreateTime, nextHourStart)
.eq(BuriedPointFailRecord::getStatus, BuriedPointFailRecord.STATUS_SUCCESS); // 处理成功
Long successCount = buriedPointFailRecordMapper.selectCount(successQuery);
// 查询处理失败或未处理的记录数量
LambdaQueryWrapper<BuriedPointFailRecord> failQuery = new LambdaQueryWrapper<>();
failQuery.ge(BuriedPointFailRecord::getCreateTime, currentHourStart)
.lt(BuriedPointFailRecord::getCreateTime, nextHourStart)
.in(BuriedPointFailRecord::getStatus,
Arrays.asList(BuriedPointFailRecord.STATUS_UNPROCESSED, BuriedPointFailRecord.STATUS_FAILED)); // 未处理或处理失败
Long failCount = buriedPointFailRecordMapper.selectCount(failQuery);
// 添加到结果列表,无论是否有数据
String hourLabel = sdf.format(currentHourStart);
result.add(new ChartImageGenerator.MonitoringDataPoint(hourLabel, successCount.intValue(), failCount.intValue()));
}
return result;
} catch (Exception e) {
log.error("[埋点配置] 查询每小时失败记录数据失败", e);
// 返回空列表
return Collections.emptyList();
log.error("[埋点消费者] 检查失败记录异常", e);
}
}
}

View File

@@ -3,154 +3,91 @@ package com.tashow.cloud.app.mq.handler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
import com.tashow.cloud.app.model.BuriedPointFailRecord;
import com.tashow.cloud.app.service.feishu.BuriedPointMonitorService;
import com.tashow.cloud.mq.handler.FailRecordHandler;
import com.tashow.cloud.sdk.feishu.client.FeiShuAlertClient;
import com.tashow.cloud.sdk.feishu.config.LarkConfig;
import com.tashow.cloud.sdk.feishu.util.ChartImageGenerator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.List;
/**
* 埋点失败记录处理器
*
* @author tashow
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class BuriedPointFailRecordHandler implements FailRecordHandler {
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
private final BuriedPointMonitorService buriedPointMonitorService;
@Autowired
private BuriedPointFailRecordMapper buriedPointFailRecordMapper;
@Autowired
FeiShuAlertClient feiShuAlertClient;
@Autowired
LarkConfig larkConfig;
/**
* 保存消息发送失败记录
*/
@Override
public void saveFailRecord(String correlationId, String exchange, String routingKey, String cause, String messageContent) {
try {
log.info("[埋点处理器] 保存发送失败记录: correlationId={}", correlationId);
// 先查询是否已存在记录
LambdaQueryWrapper<BuriedPointFailRecord> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
BuriedPointFailRecord existingRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
BuriedPointFailRecord existingRecord = findExistingRecord(correlationId);
if (existingRecord != null) {
log.info("[埋点处理器] 发现已有失败记录,将更新: {}", correlationId);
existingRecord.setExchange(exchange);
existingRecord.setRoutingKey(routingKey);
existingRecord.setCause(cause);
existingRecord.setMessageContent(messageContent);
existingRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
existingRecord.setUpdateTime(new Date());
buriedPointFailRecordMapper.updateById(existingRecord);
log.info("[埋点处理器] 发送失败记录已更新: correlationId={}", correlationId);
updateExistingRecord(existingRecord, exchange, routingKey, cause, messageContent);
} else {
BuriedPointFailRecord failRecord = new BuriedPointFailRecord();
failRecord.setCorrelationId(correlationId);
failRecord.setExchange(exchange);
failRecord.setRoutingKey(routingKey);
failRecord.setCause(cause);
failRecord.setMessageContent(messageContent);
failRecord.setRetryCount(0);
failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
failRecord.setCreateTime(new Date());
failRecord.setUpdateTime(new Date());
buriedPointFailRecordMapper.insert(failRecord);
log.info("[埋点处理器] 发送失败记录已保存: correlationId={}", correlationId);
createNewFailRecord(correlationId, exchange, routingKey, cause, messageContent);
checkAlertThreshold(cause);
}
} catch (Exception e) {
log.error("[埋点处理器] 保存发送失败记录异常", e);
log.error("[埋点处理器] 保存失败记录异常", e);
}
}
/**
* 查找已存在的失败记录
*/
private BuriedPointFailRecord findExistingRecord(String correlationId) {
LambdaQueryWrapper<BuriedPointFailRecord> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
return buriedPointFailRecordMapper.selectOne(queryWrapper);
}
/**
* 更新已存在的失败记录
*/
private void updateExistingRecord(BuriedPointFailRecord record, String exchange, String routingKey,
String cause, String messageContent) {
record.setExchange(exchange);
record.setRoutingKey(routingKey);
record.setCause(cause);
record.setMessageContent(messageContent);
record.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
record.setUpdateTime(new Date());
buriedPointFailRecordMapper.updateById(record);
}
/**
* 创建新的失败记录
*/
private void createNewFailRecord(String correlationId, String exchange, String routingKey,
String cause, String messageContent) {
BuriedPointFailRecord failRecord = new BuriedPointFailRecord();
failRecord.setCorrelationId(correlationId);
failRecord.setExchange(exchange);
failRecord.setRoutingKey(routingKey);
failRecord.setCause(cause);
failRecord.setMessageContent(messageContent);
failRecord.setRetryCount(0);
failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
failRecord.setCreateTime(new Date());
failRecord.setUpdateTime(new Date());
buriedPointFailRecordMapper.insert(failRecord);
}
/**
* 检查是否达到告警阈值
*/
@Override
public boolean checkAlertThreshold(String cause) {
try {
Date now = new Date();
Date twelveHoursAgo = new Date(now.getTime() - 12 * 60 * 60 * 1000L);
LambdaQueryWrapper<BuriedPointFailRecord> failRecordQuery = new LambdaQueryWrapper<>();
failRecordQuery.ge(BuriedPointFailRecord::getCreateTime, twelveHoursAgo).le(BuriedPointFailRecord::getCreateTime, now).eq(BuriedPointFailRecord::getStatus, BuriedPointFailRecord.STATUS_UNPROCESSED);
Long failCountLast12Hours = buriedPointFailRecordMapper.selectCount(failRecordQuery);
// 如果失败数量过多,记录警告日志
if (failCountLast12Hours > 3) {
List<ChartImageGenerator.MonitoringDataPoint> monitoringData = queryHourlyFailRecordData(twelveHoursAgo, now);
try {
// 发送飞书告警消息
feiShuAlertClient.sendBuriedPointAlertMessage(larkConfig.getChatId(), monitoringData, failCountLast12Hours.intValue(), cause);
} catch (Exception e) {
log.error("[埋点处理器] 发送飞书告警失败", e);
}
return true;
}
return false;
} catch (Exception e) {
log.error("[埋点处理器] 检查告警阈值异常", e);
return false;
}
}
/**
* 查询失败记录数据,按小时统计
* 仅查询最近12个小时的数据
*/
private List<ChartImageGenerator.MonitoringDataPoint> queryHourlyFailRecordData(Date startDate, Date endDate) {
List<ChartImageGenerator.MonitoringDataPoint> result = new ArrayList<>();
try {
// 只取最近12个小时的数据
Calendar calendar = Calendar.getInstance();
calendar.setTime(endDate);
calendar.add(Calendar.HOUR_OF_DAY, -12);
Date twelveHoursAgo = calendar.getTime();
SimpleDateFormat sdf = new SimpleDateFormat("HH:00");
// 从12小时前开始每小时一个数据点
for (int i = 0; i < 12; i++) {
calendar.setTime(twelveHoursAgo);
calendar.add(Calendar.HOUR_OF_DAY, i);
Date currentHourStart = calendar.getTime();
calendar.add(Calendar.HOUR_OF_DAY, 1);
Date nextHourStart = calendar.getTime();
// 查询处理成功的记录数量
LambdaQueryWrapper<BuriedPointFailRecord> successQuery = new LambdaQueryWrapper<>();
successQuery.ge(BuriedPointFailRecord::getCreateTime, currentHourStart).lt(BuriedPointFailRecord::getCreateTime, nextHourStart).eq(BuriedPointFailRecord::getStatus, BuriedPointFailRecord.STATUS_SUCCESS); // 处理成功
Long successCount = buriedPointFailRecordMapper.selectCount(successQuery);
// 查询处理失败或未处理的记录数量
LambdaQueryWrapper<BuriedPointFailRecord> failQuery = new LambdaQueryWrapper<>();
failQuery.ge(BuriedPointFailRecord::getCreateTime, currentHourStart).lt(BuriedPointFailRecord::getCreateTime, nextHourStart).in(BuriedPointFailRecord::getStatus, Arrays.asList(BuriedPointFailRecord.STATUS_UNPROCESSED, BuriedPointFailRecord.STATUS_FAILED)); // 未处理或处理失败
Long failCount = buriedPointFailRecordMapper.selectCount(failQuery);
// 添加到结果列表,无论是否有数据
String hourLabel = sdf.format(currentHourStart);
result.add(new ChartImageGenerator.MonitoringDataPoint(hourLabel, successCount.intValue(), failCount.intValue()));
}
return result;
} catch (Exception e) {
log.error("[埋点处理器] 查询每小时失败记录数据失败", e);
// 返回空列表
return Collections.emptyList();
}
return buriedPointMonitorService.checkFailRecordsAndAlert(cause);
}
}

View File

@@ -0,0 +1,166 @@
package com.tashow.cloud.app.service.feishu;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
import com.tashow.cloud.app.model.BuriedPointFailRecord;
import com.tashow.cloud.sdk.feishu.client.FeiShuAlertClient;
import com.tashow.cloud.sdk.feishu.config.LarkConfig;
import com.tashow.cloud.sdk.feishu.util.ChartImageGenerator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* 埋点监控服务
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class BuriedPointMonitorService {
private static final int ALERT_THRESHOLD = 3;
private static final int MONITORING_HOURS = 12;
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
private final FeiShuAlertClient feiShuAlertClient;
private final FeiShuCardDataService feiShuCardDataService;
private final LarkConfig larkConfig;
/**
* 检查失败记录并发送告警
*/
public boolean checkFailRecordsAndAlert(String cause) {
try {
Date now = new Date();
Date hoursAgo = getDateHoursAgo(now, MONITORING_HOURS);
Long failCount = getUnprocessedFailCount(hoursAgo, now);
if (failCount > ALERT_THRESHOLD) {
sendAlertMessage(failCount.intValue(), hoursAgo, now, cause);
return true;
}
return false;
} catch (Exception e) {
log.error("[埋点监控] 检查失败记录异常", e);
return false;
}
}
/**
* 获取未处理的失败记录数量
*/
public Long getUnprocessedFailCount(Date startDate, Date endDate) {
LambdaQueryWrapper<BuriedPointFailRecord> query = new LambdaQueryWrapper<>();
query.ge(BuriedPointFailRecord::getCreateTime, startDate)
.le(BuriedPointFailRecord::getCreateTime, endDate)
.eq(BuriedPointFailRecord::getStatus, BuriedPointFailRecord.STATUS_UNPROCESSED);
return buriedPointFailRecordMapper.selectCount(query);
}
/**
* 发送告警消息
*/
private void sendAlertMessage(int failCount, Date startDate, Date endDate, String errorMessage) {
try {
List<ChartImageGenerator.MonitoringDataPoint> monitoringData =
queryHourlyFailRecordData(startDate, endDate);
HashMap<String, Object> templateData = new HashMap<>();
String chatId = larkConfig.getChatId();
String imageKey = feiShuAlertClient.uploadImage(monitoringData, errorMessage);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
templateData.put("alert_title", "埋点数据异常告警");
templateData.put("image_key", imageKey);
templateData.put("current_time", sdf.format(new Date()));
templateData.put("fail_count", failCount);
String messageId = feiShuAlertClient.sendCardMessage(
chatId,
larkConfig.getExceptionCards(),
templateData
);
feiShuCardDataService.saveCardData(messageId, templateData);
} catch (Exception e) {
log.error("[埋点监控] 发送告警失败", e);
}
}
/**
* 查询按小时统计的失败记录数据
*/
public List<ChartImageGenerator.MonitoringDataPoint> queryHourlyFailRecordData(Date startDate, Date endDate) {
List<ChartImageGenerator.MonitoringDataPoint> result = new ArrayList<>();
try {
Date limitedStartDate = getDateHoursAgo(endDate, MONITORING_HOURS);
Date actualStartDate = startDate.after(limitedStartDate) ? startDate : limitedStartDate;
SimpleDateFormat sdf = new SimpleDateFormat("HH:00");
Calendar calendar = Calendar.getInstance();
long hoursDiff = (endDate.getTime() - actualStartDate.getTime()) / (60 * 60 * 1000) + 1;
int hours = (int) Math.min(hoursDiff, MONITORING_HOURS);
for (int i = 0; i < hours; i++) {
calendar.setTime(actualStartDate);
calendar.add(Calendar.HOUR_OF_DAY, i);
Date currentHourStart = calendar.getTime();
calendar.add(Calendar.HOUR_OF_DAY, 1);
Date nextHourStart = calendar.getTime();
int successCount = getHourlyRecordCount(currentHourStart, nextHourStart, BuriedPointFailRecord.STATUS_SUCCESS);
int failCount = getHourlyFailedCount(currentHourStart, nextHourStart);
result.add(new ChartImageGenerator.MonitoringDataPoint(
sdf.format(currentHourStart),
successCount,
failCount
));
}
return result;
} catch (Exception e) {
log.error("[埋点监控] 查询小时数据失败", e);
return Collections.emptyList();
}
}
/**
* 获取指定状态的记录数量
*/
private int getHourlyRecordCount(Date startHour, Date endHour, int status) {
LambdaQueryWrapper<BuriedPointFailRecord> query = new LambdaQueryWrapper<>();
query.ge(BuriedPointFailRecord::getCreateTime, startHour)
.lt(BuriedPointFailRecord::getCreateTime, endHour)
.eq(BuriedPointFailRecord::getStatus, status);
return buriedPointFailRecordMapper.selectCount(query).intValue();
}
/**
* 获取失败记录数量
*/
private int getHourlyFailedCount(Date startHour, Date endHour) {
LambdaQueryWrapper<BuriedPointFailRecord> query = new LambdaQueryWrapper<>();
query.ge(BuriedPointFailRecord::getCreateTime, startHour)
.lt(BuriedPointFailRecord::getCreateTime, endHour)
.in(BuriedPointFailRecord::getStatus,
Arrays.asList(BuriedPointFailRecord.STATUS_UNPROCESSED, BuriedPointFailRecord.STATUS_FAILED));
return buriedPointFailRecordMapper.selectCount(query).intValue();
}
/**
* 获取指定时间前N小时的时间
*/
private Date getDateHoursAgo(Date date, int hours) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.HOUR_OF_DAY, -hours);
return calendar.getTime();
}
}

View File

@@ -2,7 +2,6 @@ package com.tashow.cloud.app.service.feishu;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tashow.cloud.sdk.feishu.client.FeiShuAlertClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -15,12 +14,14 @@ import java.util.concurrent.TimeUnit;
/**
* 飞书卡片数据处理服务
* 负责卡片数据的存储和获取
*/
@Service
public class FeiShuCardDataService implements FeiShuAlertClient.CardDataHandler {
public class FeiShuCardDataService {
private final Logger log = LoggerFactory.getLogger(FeiShuCardDataService.class);
private static final Logger log = LoggerFactory.getLogger(FeiShuCardDataService.class);
private static final String REDIS_KEY_PREFIX = "feishu:card:";
private static final int CARD_EXPIRATION_DAYS = 30;
private final StringRedisTemplate stringRedisTemplate;
private final ObjectMapper objectMapper;
@@ -32,34 +33,41 @@ public class FeiShuCardDataService implements FeiShuAlertClient.CardDataHandler
/**
* 保存卡片数据到Redis
* @param messageId 消息ID
* @param data 卡片数据
*/
@Override
public void saveCardData(String messageId, Map<String, Object> data) {
public boolean saveCardData(String messageId, Map<String, Object> data) {
if (messageId == null || data == null) return false;
try {
String jsonData = objectMapper.writeValueAsString(data);
stringRedisTemplate.opsForValue().set(messageId, jsonData, 30, TimeUnit.DAYS);
log.debug("卡片数据已保存到Redis, messageId: {}", messageId);
stringRedisTemplate.opsForValue().set(
REDIS_KEY_PREFIX + messageId,
jsonData,
CARD_EXPIRATION_DAYS,
TimeUnit.DAYS
);
return true;
} catch (JsonProcessingException e) {
log.error("保存卡片数据到Redis失败", e);
log.error("保存卡片数据失败: {}", e.getMessage());
return false;
}
}
/**
* 从Redis获取卡片数据
* @param messageId 消息ID
* @return 卡片数据
*/
@Override
public Map<String, Object> getCardData(String messageId) {
try {
String jsonData = stringRedisTemplate.opsForValue().get(messageId);
return objectMapper.readValue(jsonData, Map.class);
String redisKey = REDIS_KEY_PREFIX + messageId;
String jsonData = stringRedisTemplate.opsForValue().get(redisKey);
if (jsonData == null) return new HashMap<>();
@SuppressWarnings("unchecked")
Map<String, Object> result = objectMapper.readValue(jsonData, Map.class);
return result;
} catch (Exception e) {
throw new RuntimeException(e);
log.error("获取卡片数据失败: {}", e.getMessage());
return new HashMap<>();
}
}
}

View File

@@ -44,25 +44,19 @@ public class BuriedPointFailRecordService implements MessageRetryService<BuriedP
try {
Long id = Long.valueOf(recordId);
BuriedPointFailRecord record = buriedPointFailRecordMapper.selectById(id);
if (record == null) {
log.warn("[埋点重试] 未找到失败记录: {}", id);
return false;
}
BuriedMessages message = JsonUtils.parseObject(record.getMessageContent(), BuriedMessages.class);
if (message == null) {
log.error("[埋点重试] 消息内容解析失败: {}", record.getCorrelationId());
updateStatus(record, BuriedPointFailRecord.STATUS_FAILED);
return false;
}
log.info("[埋点重试] 准备重新发送消息: {}", record.getCorrelationId());
buriedPointProducer.asyncSendMessage(message, record.getCorrelationId());
record.setStatus(BuriedPointFailRecord.STATUS_SUCCESS);
record.setUpdateTime(new Date());
buriedPointFailRecordMapper.updateById(record);
log.info("[埋点重试] 重试成功,状态已更新为成功: {}", record.getCorrelationId());
updateStatus(record, BuriedPointFailRecord.STATUS_SUCCESS);
return true;
} catch (Exception e) {
log.error("[埋点重试] 重试失败消息异常: {}", recordId, e);
log.error("[埋点重试] 重试失败", e);
return false;
}
}
@@ -75,10 +69,9 @@ public class BuriedPointFailRecordService implements MessageRetryService<BuriedP
try {
record.setStatus(status);
record.setUpdateTime(new Date());
int result = buriedPointFailRecordMapper.updateById(record);
return result > 0;
return buriedPointFailRecordMapper.updateById(record) > 0;
} catch (Exception e) {
log.error("[埋点重试] 更新状态失败: {}", record.getCorrelationId(), e);
log.error("[埋点重试] 更新状态失败", e);
return false;
}
}