diff --git a/logs/gateway-server.log.2025-04-22.0.gz b/logs/gateway-server.log.2025-04-22.0.gz
deleted file mode 100644
index 9a94de3..0000000
Binary files a/logs/gateway-server.log.2025-04-22.0.gz and /dev/null differ
diff --git a/tashow-framework/tashow-framework-mq/pom.xml b/tashow-framework/tashow-framework-mq/pom.xml
index 835535e..7ed1e03 100644
--- a/tashow-framework/tashow-framework-mq/pom.xml
+++ b/tashow-framework/tashow-framework-mq/pom.xml
@@ -40,6 +40,21 @@
jakarta.servlet-api
true
+
+ org.projectlombok
+ lombok
+ provided
+
+
+ com.tashow.cloud
+ tashow-data-mybatis
+
+
+ org.jodd
+ jodd-util
+ 6.3.0
+ compile
+
diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/core/BaseMqMessage.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/core/BaseMqMessage.java
index 2415c34..11bbad2 100644
--- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/core/BaseMqMessage.java
+++ b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/core/BaseMqMessage.java
@@ -1,25 +1,24 @@
package com.tashow.cloud.mq.core;
-
+import lombok.Data;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-
/**
- * MQ消息基类,所有消息类型都应该继承此类
+ * MQ消息基类,
+ *
*
* @author tashow
*/
+@Data
public class BaseMqMessage implements Serializable {
private static final long serialVersionUID = 1L;
-
/**
* 消息ID,默认为UUID
*/
private Integer id = UUID.randomUUID().hashCode();
-
/**
* 消息状态码
*/
@@ -44,72 +43,4 @@ public class BaseMqMessage implements Serializable {
* 扩展数据
*/
private Map extraData = new HashMap<>();
-
- /**
- * 增加重试次数
- */
- public void incrementRetryCount() {
- if (retryCount == null) {
- retryCount = 0;
- }
- retryCount++;
- }
-
- /**
- * 添加额外数据
- */
- public void addExtraData(String key, Object value) {
- if (extraData == null) {
- extraData = new HashMap<>();
- }
- extraData.put(key, value);
- }
-
- public Integer getId() {
- return id;
- }
-
- public void setId(Integer id) {
- this.id = id;
- }
-
- public Integer getStatusCode() {
- return statusCode;
- }
-
- public void setStatusCode(Integer statusCode) {
- this.statusCode = statusCode;
- }
-
- public Integer getRetryCount() {
- return retryCount;
- }
-
- public void setRetryCount(Integer retryCount) {
- this.retryCount = retryCount;
- }
-
- public String getErrorMessage() {
- return errorMessage;
- }
-
- public void setErrorMessage(String errorMessage) {
- this.errorMessage = errorMessage;
- }
-
- public Date getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(Date createTime) {
- this.createTime = createTime;
- }
-
- public Map getExtraData() {
- return extraData;
- }
-
- public void setExtraData(Map extraData) {
- this.extraData = extraData;
- }
}
\ No newline at end of file
diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/core/CustomCorrelationData.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/core/CustomCorrelationData.java
deleted file mode 100644
index 518d0fd..0000000
--- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/core/CustomCorrelationData.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package com.tashow.cloud.mq.core;
-
-import org.springframework.amqp.rabbit.connection.CorrelationData;
-
-/**
- * 自定义关联数据,用于存储额外的消息数据
- *
- * @author tashow
- */
-public class CustomCorrelationData extends CorrelationData {
-
- /**
- * 消息内容
- */
- private final String messageContent;
-
- /**
- * 构造函数
- *
- * @param id 关联ID
- * @param messageContent 消息内容
- */
- public CustomCorrelationData(String id, String messageContent) {
- super(id);
- this.messageContent = messageContent;
- }
-
- /**
- * 获取消息内容
- *
- * @return 消息内容
- */
- public String getMessageContent() {
- return messageContent;
- }
-
- @Override
- public String toString() {
- return "CustomCorrelationData{" +
- "id='" + getId() + '\'' +
- ", messageContent='" + messageContent + '\'' +
- '}';
- }
-}
\ No newline at end of file
diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/handler/FailRecordHandler.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/handler/FailRecordHandler.java
index ad5266f..1035e3f 100644
--- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/handler/FailRecordHandler.java
+++ b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/handler/FailRecordHandler.java
@@ -1,39 +1,31 @@
package com.tashow.cloud.mq.handler;
-
/**
- * 消息发送失败记录处理接口
+ * 消息记录处理接口
*
* @author tashow
*/
public interface FailRecordHandler {
-
/**
- * 保存消息发送失败记录
+ * 保存消息记录
*
- * @param correlationId 关联ID
* @param exchange 交换机
* @param routingKey 路由键
- * @param cause 失败原因
+ * @param cause 失败原因,可为null
* @param messageContent 消息内容
+ * @param status 状态:0-未处理,1-处理成功,2-处理失败
*/
- void saveFailRecord(String correlationId, String exchange, String routingKey, String cause, String messageContent);
+ void saveMessageRecord(Integer id, String exchange, String routingKey, String cause, String messageContent, int status);
+ /**
+ * 更新消息状态
+ *
+ * @param id 关联ID
+ */
+ void updateMessageStatus(Integer id);
/**
- * 检查是否达到告警阈值
- *
- * @return 是否需要告警
+ * 更新消息状态并设置失败原因
+
*/
- default boolean checkAlertThreshold() {
- return checkAlertThreshold(null);
- }
-
- /**
- * 检查是否达到告警阈值,带错误信息
- *
- * @param cause 错误原因
- * @return 是否需要告警
- */
- default boolean checkAlertThreshold(String cause) {
- return false;
- }
+ void updateMessageStatusWithCause(Integer id, String causes);
+
}
\ No newline at end of file
diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/config/RabbitMQAutoConfiguration.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/config/RabbitMQAutoConfiguration.java
index 843e28d..0b1ab33 100644
--- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/config/RabbitMQAutoConfiguration.java
+++ b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/config/RabbitMQAutoConfiguration.java
@@ -1,5 +1,4 @@
package com.tashow.cloud.mq.rabbitmq.config;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfiguration;
@@ -13,7 +12,5 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@AutoConfiguration
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class RabbitMQAutoConfiguration extends RabbitMQConfiguration {
-
private static final Logger log = LoggerFactory.getLogger(RabbitMQAutoConfiguration.class);
-
}
\ No newline at end of file
diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/config/RabbitMQConfiguration.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/config/RabbitMQConfiguration.java
index 028f91b..47370c2 100644
--- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/config/RabbitMQConfiguration.java
+++ b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/config/RabbitMQConfiguration.java
@@ -17,26 +17,6 @@ import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class RabbitMQConfiguration {
-
- private static final Logger log = LoggerFactory.getLogger(RabbitMQConfiguration.class);
-
- /**
- * 初始化RabbitTemplate
- *
- * @param rabbitTemplate RabbitTemplate
- */
- protected void initRabbitTemplate(RabbitTemplate rabbitTemplate) {
- log.info("[MQ配置] 初始化RabbitTemplate: {}", rabbitTemplate);
- // 启用消息发送到交换机确认机制
- rabbitTemplate.setMandatory(true);
-
- if (rabbitTemplate.isConfirmListener()) {
- log.info("[MQ配置] 确认回调已正确配置");
- } else {
- log.error("[MQ配置] 确认回调配置失败");
- }
- }
-
/**
* 创建消息转换器
*
@@ -46,4 +26,4 @@ public class RabbitMQConfiguration {
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
-}
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/consumer/AbstractRabbitMQConsumer.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/consumer/AbstractRabbitMQConsumer.java
index ebf65bf..f4e0535 100644
--- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/consumer/AbstractRabbitMQConsumer.java
+++ b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/consumer/AbstractRabbitMQConsumer.java
@@ -1,5 +1,4 @@
package com.tashow.cloud.mq.rabbitmq.consumer;
-
import com.rabbitmq.client.Channel;
import com.tashow.cloud.mq.core.BaseMqMessage;
import org.slf4j.Logger;
@@ -16,136 +15,52 @@ import org.springframework.messaging.handler.annotation.Header;
public abstract class AbstractRabbitMQConsumer {
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQConsumer.class);
-
- /**
- * 消息状态:处理中
- */
- public static final int STATUS_PROCESSING = 10;
-
/**
* 消息状态:成功
*/
public static final int STATUS_SUCCESS = 20;
-
/**
- * 消息状态:失败
+ * 消息状态:消费异常
*/
- public static final int STATUS_ERROR = 30;
+ public static final int STATUS_SEND_EXCEPTION = 30;
+
/**
- * 处理消息
+ * 埋点处理消息
*
* @param message 消息对象
* @return 处理结果,true表示处理成功,false表示处理失败
*/
public abstract boolean processMessage(T message);
- /**
- * 获取消息重试次数
- *
- * @param message 消息对象
- * @return 重试次数
- */
- public Integer getRetryCount(T message) {
- return message.getRetryCount() != null ? message.getRetryCount() : 0;
- }
-
- /**
- * 更新消息状态
- *
- * @param message 消息对象
- */
- public abstract void updateMessageStatus(T message);
-
- /**
- * 更新消息重试次数
- *
- * @param message 消息对象
- */
- public abstract void updateRetryCount(T message);
-
- /**
- * 保存消息到数据库
- *
- * @param message 消息对象
- * @return 保存结果
- */
- public abstract boolean saveToDatabase(T message);
-
- /**
- * 保存消息到失败记录
- *
- * @param message 消息对象
- * @param cause 失败原因
- */
- public abstract void saveToFailRecord(T message, String cause);
-
- /**
- * 获取最大允许重试次数
- *
- * @return 最大重试次数
- */
- public int getMaxRetryAllowed() {
- return 3;
- }
/**
* 消息处理入口
*
- * @param message 消息对象
- * @param channel 通道
+ * @param message 消息对象
+ * @param channel 通道
* @param deliveryTag 投递标签
*/
public void onMessage(T message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
- Integer dbRetryCount = getRetryCount(message);
- message.setRetryCount(dbRetryCount);
-
- if (message.getRetryCount() != null && message.getRetryCount() >= getMaxRetryAllowed()) {
- message.setStatusCode(STATUS_ERROR);
- message.addExtraData("errorMessage", "已达到最大重试次数");
- saveToFailRecord(message, "已达到最大重试次数");
- safeChannelAck(channel, deliveryTag);
- return;
- }
-
- log.info("[MQ消费者] 收到消息: {}, 当前重试次数: {}/{}", message, message.getRetryCount(), getMaxRetryAllowed());
- message.setStatusCode(STATUS_PROCESSING);
-
+ message.setStatusCode(STATUS_SUCCESS);
try {
- boolean result = processMessage(message);
- if (result) {
- message.setStatusCode(STATUS_SUCCESS);
- updateMessageStatus(message);
- log.info("[MQ消费者] 消息处理成功,状态已更新为成功: {}", message.getId());
- } else {
- throw new RuntimeException("消息处理失败");
+ if(true){
+ throw new RuntimeException("测试异常");
}
+ processMessage(message);
safeChannelAck(channel, deliveryTag);
} catch (Exception e) {
- message.setStatusCode(STATUS_ERROR);
- message.addExtraData("errorMessage", e.getMessage());
- message.setErrorMessage(e.getMessage());
- log.error("[MQ消费者] 消息处理失败: {}, 错误: {}", message.getId(), e.getMessage());
+ message.setStatusCode(STATUS_SEND_EXCEPTION);
+ processMessage( message);
+ safeChannelAck(channel, deliveryTag);
- message.incrementRetryCount();
- updateRetryCount(message);
-
- if (message.getRetryCount() >= getMaxRetryAllowed()) {
- saveToDatabase(message);
- log.warn("[MQ消费者] 消息已达到最大重试次数: {}, 确认消息并保存到失败记录表", message.getRetryCount());
- saveToFailRecord(message, e.getMessage());
- safeChannelAck(channel, deliveryTag);
- } else {
- log.info("[MQ消费者] 消息将重新入队重试: {}, 当前重试次数: {}", message.getId(), message.getRetryCount());
- safeChannelNack(channel, deliveryTag, false, true);
- }
}
}
/**
* 安全确认消息
*
- * @param channel 通道
+ * @param channel 通道
* @param deliveryTag 投递标签
*/
protected void safeChannelAck(Channel channel, long deliveryTag) {
@@ -159,10 +74,10 @@ public abstract class AbstractRabbitMQConsumer {
/**
* 安全拒绝消息
*
- * @param channel 通道
+ * @param channel 通道
* @param deliveryTag 投递标签
- * @param multiple 是否批量
- * @param requeue 是否重新入队
+ * @param multiple 是否批量
+ * @param requeue 是否重新入队
*/
protected void safeChannelNack(Channel channel, long deliveryTag, boolean multiple, boolean requeue) {
try {
diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/core/package-info.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/core/package-info.java
deleted file mode 100644
index 8de99aa..0000000
--- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/core/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * 占位符,无特殊逻辑
- */
-package com.tashow.cloud.mq.rabbitmq.core;
diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/package-info.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/package-info.java
deleted file mode 100644
index ce65891..0000000
--- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * 消息队列,基于 RabbitMQ 提供
- */
-package com.tashow.cloud.mq.rabbitmq;
diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/producer/AbstractRabbitMQProducer.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/producer/AbstractRabbitMQProducer.java
index 970704e..072a657 100644
--- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/producer/AbstractRabbitMQProducer.java
+++ b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/rabbitmq/producer/AbstractRabbitMQProducer.java
@@ -1,15 +1,14 @@
package com.tashow.cloud.mq.rabbitmq.producer;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.tashow.cloud.common.util.json.JsonUtils;
import com.tashow.cloud.mq.core.BaseMqMessage;
-import com.tashow.cloud.mq.core.CustomCorrelationData;
import com.tashow.cloud.mq.handler.FailRecordHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
-
import jakarta.annotation.PostConstruct;
import java.util.UUID;
@@ -35,11 +34,9 @@ public abstract class AbstractRabbitMQProducer
*/
@PostConstruct
public void initRabbitTemplate() {
- log.info("[MQ生产者] 初始化RabbitTemplate: {}", rabbitTemplate);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setConfirmCallback(this);
-
if (rabbitTemplate.isConfirmListener()) {
log.info("[MQ生产者] 确认回调已正确配置");
} else {
@@ -47,51 +44,38 @@ public abstract class AbstractRabbitMQProducer
}
}
- /**
- * 将消息转换为字符串
- *
- * @param message 消息对象
- * @return 消息字符串
- */
- protected abstract String convertMessageToString(T message);
- /**
- * 异步发送消息,自动生成correlationId
- *
- * @param message 消息对象
- */
- public void asyncSendMessage(T message) {
- String correlationId = UUID.randomUUID().toString();
- asyncSendMessage(message, correlationId);
- }
+
+
/**
* 异步发送消息,使用指定的correlationId
*
* @param message 消息对象
- * @param correlationId 关联ID
*/
- public void asyncSendMessage(T message, String correlationId) {
- log.info("[MQ生产者] 准备发送消息: {}, correlationId: {}", message, correlationId);
+ public void asyncSendMessage(T message) {
try {
- String messageJson = convertMessageToString(message);
- CustomCorrelationData correlationData = new CustomCorrelationData(correlationId, messageJson);
+ String messageJson = JsonUtils.toJsonString(message);
+ CorrelationData correlationData = new CorrelationData(messageJson);
+ failRecordHandler.saveMessageRecord(
+ message.getId(),
+ getExchange(),
+ getRoutingKey(),
+ null,
+ messageJson,
+ 0
+ );
rabbitTemplate.convertAndSend(getExchange(), getRoutingKey(), message, correlationData);
- log.info("[MQ生产者] 消息发送完成: {}, 状态: {}, 重试次数: {}, correlationId: {}",
- message.getId(), message.getStatusCode(), message.getRetryCount(), correlationId);
} catch (Exception e) {
- log.error("[MQ生产者] 消息发送异常: {}, correlationId: {}", e.getMessage(), correlationId, e);
throw e;
}
}
-
/**
* 获取交换机名称
*
* @return 交换机名称
*/
public abstract String getExchange();
-
/**
* 获取路由键
*
@@ -101,45 +85,13 @@ public abstract class AbstractRabbitMQProducer
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+ JSONObject jsonObject = JSON.parseObject(correlationData.getId());
+ Integer id = jsonObject.getInteger("id");
if (ack) {
- log.info("[MQ生产者] 消息发送确认成功: {}", correlationData.getId());
+ failRecordHandler.updateMessageStatus(id);
} else {
- log.error("[MQ生产者] 消息发送确认失败: {}, 原因: {}, correlationData={}",
- correlationData.getId(), cause, correlationData);
- if (failRecordHandler != null && correlationData instanceof CustomCorrelationData) {
- CustomCorrelationData customData = (CustomCorrelationData) correlationData;
- String messageContent = customData.getMessageContent();
- failRecordHandler.saveFailRecord(
- correlationData.getId(),
- getExchange(),
- getRoutingKey(),
- cause,
- messageContent
- );
- } else {
- log.warn("[MQ生产者] 未配置FailRecordHandler或非CustomCorrelationData类型, 无法保存失败记录");
- }
+ failRecordHandler.updateMessageStatusWithCause(id,cause);
}
}
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- log.error("[MQ生产者] 消息路由失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
- returned.getExchange(),
- returned.getRoutingKey(),
- returned.getReplyCode(),
- returned.getReplyText(),
- new String(returned.getMessage().getBody()));
- if (failRecordHandler != null) {
- failRecordHandler.saveFailRecord(
- returned.getMessage().getMessageProperties().getCorrelationId(),
- returned.getExchange(),
- returned.getRoutingKey(),
- "路由失败: " + returned.getReplyText(),
- new String(returned.getMessage().getBody())
- );
- } else {
- log.warn("[MQ生产者] 未配置FailRecordHandler, 无法保存失败记录");
- }
- }
}
\ No newline at end of file
diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/retry/AbstractMessageRetryTask.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/retry/AbstractMessageRetryTask.java
index b8cf3a0..fa4aa49 100644
--- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/retry/AbstractMessageRetryTask.java
+++ b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/retry/AbstractMessageRetryTask.java
@@ -15,15 +15,7 @@ public abstract class AbstractMessageRetryTask {
private static final Logger log = LoggerFactory.getLogger(AbstractMessageRetryTask.class);
- /**
- * 处理中状态码
- */
- public static final int STATUS_PROCESSING = 3;
-
- /**
- * 失败状态码
- */
- public static final int STATUS_FAILED = 4;
+
/**
* 获取消息重试服务
@@ -38,15 +30,7 @@ public abstract class AbstractMessageRetryTask {
* @param record 记录对象
* @return 记录ID
*/
- protected abstract String getRecordId(T record);
-
- /**
- * 获取关联ID
- *
- * @param record 记录对象
- * @return 关联ID
- */
- protected abstract String getCorrelationId(T record);
+ protected abstract Integer getRecordId(T record);
/**
* 执行重试
@@ -54,36 +38,10 @@ public abstract class AbstractMessageRetryTask {
public void retryFailedMessages() {
try {
List unprocessedRecords = getMessageRetryService().getUnprocessedRecords();
- if (unprocessedRecords.isEmpty()) {
- log.info("[MQ重试] 没有需要重试的消息");
- return;
- }
-
- log.info("[MQ重试] 本次需要重试的消息数量: {}", unprocessedRecords.size());
for (T record : unprocessedRecords) {
- try {
- // 先将状态更新为处理中,避免其他实例重复处理
- if (!getMessageRetryService().updateStatus(record, STATUS_PROCESSING)) {
- continue; // 如果更新失败,跳过当前记录
- }
-
- // 执行重试
- String recordId = getRecordId(record);
- boolean success = getMessageRetryService().retryFailedMessage(recordId);
-
- if (success) {
- log.info("[MQ重试] 消息重试成功: {}", getCorrelationId(record));
- } else {
- log.warn("[MQ重试] 消息重试失败: {}", getCorrelationId(record));
- getMessageRetryService().updateStatus(record, STATUS_FAILED);
- }
- } catch (Exception e) {
- // 发生异常时,更新状态为失败
- getMessageRetryService().updateStatus(record, STATUS_FAILED);
- log.error("[MQ重试] 重试消息异常: {}", getCorrelationId(record), e);
- }
+ Integer recordId = getRecordId(record);
+ getMessageRetryService().retryFailedMessage(recordId);
}
- log.info("[MQ重试] 消息重试任务完成");
} catch (Exception e) {
log.error("[MQ重试] 执行消息重试任务异常", e);
}
diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/retry/MessageRetryService.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/retry/MessageRetryService.java
index 0dc29b0..1272dc8 100644
--- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/retry/MessageRetryService.java
+++ b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/retry/MessageRetryService.java
@@ -23,14 +23,6 @@ public interface MessageRetryService {
* @param recordId 记录ID
* @return 重试结果
*/
- boolean retryFailedMessage(String recordId);
-
- /**
- * 更新记录状态
- *
- * @param record 记录对象
- * @param status 记录状态
- * @return 更新结果
- */
- boolean updateStatus(T record, int status);
+ void retryFailedMessage(Integer recordId);
+
}
\ No newline at end of file
diff --git a/tashow-framework/tashow-framework-web/src/main/java/com/tashow/cloud/web/apilog/core/interceptor/ApiAccessLogInterceptor.java b/tashow-framework/tashow-framework-web/src/main/java/com/tashow/cloud/web/apilog/core/interceptor/ApiAccessLogInterceptor.java
index 9f35424..6de4c67 100644
--- a/tashow-framework/tashow-framework-web/src/main/java/com/tashow/cloud/web/apilog/core/interceptor/ApiAccessLogInterceptor.java
+++ b/tashow-framework/tashow-framework-web/src/main/java/com/tashow/cloud/web/apilog/core/interceptor/ApiAccessLogInterceptor.java
@@ -63,7 +63,7 @@ public class ApiAccessLogInterceptor implements HandlerInterceptor {
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
- // 打印 response 日志
+ // 打印 response 日志ss
if (!SpringUtils.isProd()) {
StopWatch stopWatch = (StopWatch) request.getAttribute(ATTRIBUTE_STOP_WATCH);
stopWatch.stop();
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/config/AppConfig.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/config/AppConfig.java
index 6a1f2d3..8684b88 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/config/AppConfig.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/config/AppConfig.java
@@ -1,5 +1,4 @@
package com.tashow.cloud.app.config;
-
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/config/FeiShuClientConfig.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/config/FeiShuClientConfig.java
deleted file mode 100644
index 6ad0093..0000000
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/config/FeiShuClientConfig.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.tashow.cloud.app.config;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.data.redis.core.StringRedisTemplate;
-
-/**
- * 飞书客户端配置
- * 用于初始化FeiShuAlertClient的相关依赖
- */
-@Configuration
-public class FeiShuClientConfig {
- @Autowired
- private StringRedisTemplate stringRedisTemplate;
- /* @PostConstruct
- public void initFeiShuClient() {
- FeiShuAlertClient.setRedisTemplate(stringRedisTemplate);
- }*/
-}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/FeishuController.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/FeishuController.java
index e374d81..7ef438e 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/FeishuController.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/FeishuController.java
@@ -1,4 +1,5 @@
package com.tashow.cloud.app.controller;
+
import cn.hutool.json.JSONObject;
import com.lark.oapi.core.utils.Decryptor;
import com.tashow.cloud.app.service.feishu.FeiShuCardDataService;
@@ -11,27 +12,27 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Map;
@RestController
public class FeishuController {
private static final Logger log = LoggerFactory.getLogger(FeishuController.class);
private static final String ACTION_COMPLETE_ALARM = "complete_alarm";
-
private final FeiShuAlertClient feiShuAlertClient;
private final FeiShuCardDataService feiShuCardDataService;
private final LarkConfig larkConfig;
-
+
@Autowired
- public FeishuController(FeiShuAlertClient feiShuAlertClient,
- FeiShuCardDataService feiShuCardDataService,
- LarkConfig larkConfig) {
+ public FeishuController(FeiShuAlertClient feiShuAlertClient, FeiShuCardDataService feiShuCardDataService, LarkConfig larkConfig) {
this.feiShuAlertClient = feiShuAlertClient;
this.feiShuCardDataService = feiShuCardDataService;
this.larkConfig = larkConfig;
}
-
- @RequestMapping("/card1")
+
+ @RequestMapping("/card")
@PermitAll
public String card(@RequestBody JSONObject data) {
try {
@@ -41,30 +42,23 @@ public class FeishuController {
if (value != null && ACTION_COMPLETE_ALARM.equals(value.getStr("action"))) {
String messageId = data.getStr("open_message_id");
Map templateData = feiShuCardDataService.getCardData(messageId);
+ templateData.put("open_id", data.getStr("open_id"));
+ templateData.put("complete_time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
+ JSONObject fromValue = action.getJSONObject("form_value");
+ templateData.put("notes", fromValue.getStr("notes_input"));
return feiShuAlertClient.buildCardWithData(larkConfig.getSuccessCards(), templateData);
}
}
-
if (data.containsKey("encrypt")) {
Decryptor decryptor = new Decryptor(larkConfig.getEncryptKey());
return decryptor.decrypt(data.getStr("encrypt"));
}
-
return "{}";
} catch (Exception e) {
log.error("卡片处理异常", e);
return "{\"code\":1,\"msg\":\"处理异常: " + e.getMessage() + "\"}";
}
}
-
- /**
- * 发送并存储卡片消息
- */
- public String sendAndStoreCardMessage(String chatId, String templateId, Map templateData) throws Exception {
- String messageId = feiShuAlertClient.sendCardMessage(chatId, templateId, templateData);
- if (messageId != null) {
- feiShuCardDataService.saveCardData(messageId, templateData);
- }
- return messageId;
- }
+
+
}
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java
index 6770341..017e700 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java
@@ -13,10 +13,6 @@ import java.util.Map;
@RestController
@RequiredArgsConstructor
public class TestController {
-
- private final BuriedPointProducer buriedPointProducer;
- private final BuriedPointMapper buriedPointMapper;
-
/**
* 基础测试接口
*/
@@ -25,18 +21,5 @@ public class TestController {
public String test() {
return "test";
}
-
- /**
- * 测试埋点拦截器
- * 这个接口会被埋点拦截器自动记录请求信息
- */
- @GetMapping("/test/buried-point")
- @PermitAll
- public Map testBuriedPoint() {
- Map result = new HashMap<>();
- result.put("success", true);
- result.put("message", "埋点拦截器测试成功");
- return result;
- }
}
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/ext/HttpTranslator.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/ext/HttpTranslator.java
deleted file mode 100644
index 095b40e..0000000
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/ext/HttpTranslator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.tashow.cloud.app.ext;
-
-import com.lark.oapi.core.request.EventReq;
-import com.lark.oapi.core.response.EventResp;
-import jakarta.servlet.http.HttpServletRequest;
-import jakarta.servlet.http.HttpServletResponse;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.stream.Collectors;
-
-public class HttpTranslator {
-
- private Map> toHeaderMap(HttpServletRequest req) {
- Map> headers = new HashMap<>();
- Enumeration names = req.getHeaderNames();
- while (names.hasMoreElements()) {
- String name = names.nextElement();
- List values = Collections.list(req.getHeaders(name));
- headers.put(name, values);
- }
- return headers;
- }
-
- public EventReq translate(HttpServletRequest request) throws IOException {
- String bodyStr = request.getReader().lines()
- .collect(Collectors.joining(System.lineSeparator()));
- EventReq req = new EventReq();
- req.setHeaders(toHeaderMap(request));
- req.setBody(bodyStr.getBytes(StandardCharsets.UTF_8));
- req.setHttpPath(request.getRequestURI());
- return req;
- }
-
- public void write(HttpServletResponse response, EventResp eventResp) throws IOException {
- response.setStatus(eventResp.getStatusCode());
- eventResp.getHeaders().entrySet().stream().forEach(keyValues -> {
- String key = keyValues.getKey();
- List values = keyValues.getValue();
- values.stream().forEach(v -> response.addHeader(key, v));
- });
- if (eventResp.getBody() != null) {
- response.getWriter().write(new String(eventResp.getBody()));
- }
- }
-}
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/ext/ServletAdapter.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/ext/ServletAdapter.java
deleted file mode 100644
index 1fcfbd5..0000000
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/ext/ServletAdapter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.tashow.cloud.app.ext;
-
-import com.lark.oapi.card.CardActionHandler;
-import com.lark.oapi.core.request.EventReq;
-import com.lark.oapi.core.response.EventResp;
-import com.lark.oapi.event.EventDispatcher;
-import jakarta.servlet.http.HttpServletRequest;
-import jakarta.servlet.http.HttpServletResponse;
-
-
-/**
- * Servlet的适配器,用于适配基于Servlet技术栈实现的Web服务
- */
-public class ServletAdapter {
-
- private static final HttpTranslator HTTP_TRANSLATOR = new HttpTranslator();
-
- /**
- * 处理消息事件
- *
- * @param req
- * @param response
- * @param eventDispatcher
- * @throws Throwable
- */
- public void handleEvent(HttpServletRequest req, HttpServletResponse response,
- EventDispatcher eventDispatcher) throws Throwable {
- // 转换请求对象
- EventReq eventReq = HTTP_TRANSLATOR.translate(req);
-
- // 处理请求
- EventResp resp = eventDispatcher.handle(eventReq);
-
- // 回写结果
- HTTP_TRANSLATOR.write(response, resp);
- }
-
- /**
- * 处理卡片消息
- *
- * @param req
- * @param response
- * @param handler
- * @throws Throwable
- */
- public void handleCardAction(HttpServletRequest req, HttpServletResponse response,
- CardActionHandler handler) throws Throwable {
- // 转换请求对象
- EventReq eventReq = HTTP_TRANSLATOR.translate(req);
-
- // 处理请求
- EventResp resp = handler.handle(eventReq);
-
- // 回写结果
- HTTP_TRANSLATOR.write(response, resp);
- }
-}
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/interceptor/BuriedPointInterceptor.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/interceptor/BuriedPointInterceptor.java
index 5616d25..e82ea33 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/interceptor/BuriedPointInterceptor.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/interceptor/BuriedPointInterceptor.java
@@ -26,69 +26,21 @@ import java.net.UnknownHostException;
public class BuriedPointInterceptor implements HandlerInterceptor {
private final BuriedPointProducer buriedPointProducer;
-
- private static final String ATTRIBUTE_STOPWATCH = "BuriedPoint.StopWatch";
- private static final String ATTRIBUTE_REQUEST_ID = "BuriedPoint.RequestId";
-
+
+
+
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
- if (!(handler instanceof HandlerMethod)) {
+ if (!(handler instanceof HandlerMethod handlerMethod)) {
return true;
}
-
- try {
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- request.setAttribute(ATTRIBUTE_STOPWATCH, stopWatch);
-
- int requestId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE);
- request.setAttribute(ATTRIBUTE_REQUEST_ID, requestId);
- HandlerMethod handlerMethod = (HandlerMethod) handler;
- String method = request.getMethod() + " " + request.getRequestURI()+ JsonUtils.toJsonString(request.getParameterMap());
- String controllerName = handlerMethod.getBeanType().getSimpleName();
- String actionName = handlerMethod.getMethod().getName();
- BuriedMessages message = new BuriedMessages();
- message.setId(requestId);
- message.setEventTime(new java.util.Date());
- message.setService(SpringUtils.getApplicationName());
- message.setMethod(method);
- message.setUserId(getUserId(request));
- message.setSessionId(request.getSession().getId());
- message.setClientIp(ServletUtils.getClientIP(request));
- message.setServerIp(getServerIp());
- message.setEventType("API_REQUEST_START");
- message.setPagePath(controllerName + "#" + actionName);
- message.setStatusCode(BuriedMessages.STATUS_PROCESSING);
+ BuriedMessages message = new BuriedMessages(
+ request,
+ handlerMethod
+ );
buriedPointProducer.asyncSendMessage(message);
- if (log.isDebugEnabled()) {
- log.debug("[埋点] 收集请求开始数据: {}", message);
- }
- } catch (Exception e) {
- log.warn("[埋点] 埋点数据收集异常", e);
- }
return true;
}
- /**
- * 获取当前登录用户ID
- * 如果未登录返回匿名标识
- */
- private String getUserId(HttpServletRequest request) {
- Object userAttribute = request.getSession().getAttribute("USER_ID");
- if (userAttribute != null) {
- return userAttribute.toString();
- }
- return "anonymous";
- }
-
- /**
- * 获取服务器IP
- */
- private String getServerIp() {
- try {
- return InetAddress.getLocalHost().getHostAddress();
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
- }
+
}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mapper/BuriedPointFailRecordMapper.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mapper/BuriedPointFailRecordMapper.java
index 87236ee..d285056 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mapper/BuriedPointFailRecordMapper.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mapper/BuriedPointFailRecordMapper.java
@@ -1,12 +1,13 @@
package com.tashow.cloud.app.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.tashow.cloud.app.model.BuriedPointFailRecord;
+import com.tashow.cloud.app.model.MqMessageRecord;
import org.apache.ibatis.annotations.Mapper;
/**
- * 埋点消息发送失败记录Mapper接口
+ * 埋点消息发送记录Mapper接口
*/
@Mapper
-public interface BuriedPointFailRecordMapper extends BaseMapper {
-}
\ No newline at end of file
+public interface BuriedPointFailRecordMapper extends BaseMapper {
+
+}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPoint.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPoint.java
index 435621e..29208d1 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPoint.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPoint.java
@@ -1,18 +1,19 @@
package com.tashow.cloud.app.model;
-
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
+import com.tashow.cloud.app.mq.message.BuriedMessages;
import lombok.Data;
+import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
-
import java.util.Date;
/**
* 埋点数据实体类
*/
@Data
+@NoArgsConstructor
@Accessors(chain = true)
@TableName(value = "app_burying")
public class BuriedPoint {
@@ -21,7 +22,6 @@ public class BuriedPoint {
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
-
/**
* 事件唯一ID
*/
@@ -107,9 +107,22 @@ public class BuriedPoint {
@TableField(value = "status")
private Integer status;
- /**
- * 重试次数
- */
- @TableField(value = "retry_count")
- private Integer retryCount;
-}
\ No newline at end of file
+
+ public BuriedPoint(BuriedMessages message) {
+ this.eventId = message.getId();
+ this.eventTime = System.currentTimeMillis();
+ this.userId = message.getUserId();
+ this.eventType = message.getEventType();
+ this.service = message.getService();
+ this.method = message.getMethod();
+ this.sessionId = message.getSessionId();
+ this.clientIp = message.getClientIp();
+ this.serverIp = message.getServerIp();
+ this.status = message.getStatusCode();
+ this.pagePath = message.getPagePath();
+ this.elementId = message.getElementId();
+ this.createTime = new Date();
+ this.updateTime = new Date();
+ }
+}
+
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPointFailRecord.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/MqMessageRecord.java
similarity index 66%
rename from tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPointFailRecord.java
rename to tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/MqMessageRecord.java
index 7ef0be3..9b9d899 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPointFailRecord.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/MqMessageRecord.java
@@ -11,24 +11,17 @@ import java.util.Date;
* 埋点消息发送失败记录实体类
*/
@Data
-@TableName("buried_point_fail_record")
-public class BuriedPointFailRecord {
-
+@TableName("mq_message_record")
+public class MqMessageRecord {
/**
* 状态常量定义
*/
- public static final int STATUS_UNPROCESSED = 0; // 未处理
- public static final int STATUS_PROCESSING = 1; // 处理中
- public static final int STATUS_SUCCESS = 2; // 处理成功
- public static final int STATUS_FAILED = 3; // 处理失败
-
- @TableId(type = IdType.AUTO)
- private Long id;
-
- /**
- * 消息关联ID
- */
- private String correlationId;
+ public static final int STATUS_UNPROCESSED = 10; // 未处理
+ public static final int STATUS_SUCCESS = 20; // 处理成功
+ public static final int STATUS_FAILED = 30; // 发送失败
+ @TableId
+ private Integer id;
+
/**
* 交换机名称
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java
index 7db76e9..0dd5876 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java
@@ -71,7 +71,7 @@ public class BuriedPointConfiguration implements WebMvcConfigurer {
"/v3/api-docs/**",
"/webjars/**",
"/static/**",
- "/card1",
+ "/card",
"/error"
);
}
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java
index 8e5b7dc..9dafcdb 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java
@@ -1,9 +1,9 @@
package com.tashow.cloud.app.mq.consumer.buriedPoint;
-
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.tashow.cloud.app.mapper.BuriedPointMapper;
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
import com.tashow.cloud.app.model.BuriedPoint;
-import com.tashow.cloud.app.model.BuriedPointFailRecord;
+import com.tashow.cloud.app.model.MqMessageRecord;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.app.service.feishu.BuriedPointMonitorService;
import com.rabbitmq.client.Channel;
@@ -16,12 +16,8 @@ import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
-
import java.util.Date;
import org.springframework.dao.DuplicateKeyException;
-import com.tashow.cloud.common.util.json.JsonUtils;
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-
/**
* 埋点消息消费者
*/
@@ -30,123 +26,39 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@Slf4j
@RequiredArgsConstructor
public class BuriedPointConsumer extends AbstractRabbitMQConsumer {
-
private final BuriedPointMapper buriedPointMapper;
- private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
private final BuriedPointMonitorService buriedPointMonitorService;
@Value("${spring.application.name:tashow-app}")
private String applicationName;
- @Override
- public int getMaxRetryAllowed() {
- return 1;
- }
-
@RabbitHandler
public void handleMessage(BuriedMessages message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
onMessage(message, channel, deliveryTag);
}
+ /**
+ * 处理埋点消息
+ * @param message 消息对象
+ * @return
+ */
@Override
public boolean processMessage(BuriedMessages message) {
- return saveToDatabase(message);
- }
-
- @Override
- public Integer getRetryCount(BuriedMessages message) {
- try {
- BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId());
- if (buriedPoint != null && buriedPoint.getRetryCount() != null) {
- if ((buriedPoint.getStatus() == BuriedMessages.STATUS_ERROR ||
- buriedPoint.getStatus() == BuriedMessages.STATUS_PROCESSING)) {
- return buriedPoint.getRetryCount() - 1;
- }
- return buriedPoint.getRetryCount();
- }
-
- String correlationId = String.valueOf(message.getId());
- BuriedPointFailRecord failRecord = findFailRecord(correlationId);
- return failRecord != null ? failRecord.getRetryCount() : 0;
- } catch (Exception e) {
- log.warn("[埋点消费者] 获取重试次数失败", e);
- return 0;
- }
- }
-
- /**
- * 根据关联ID查找失败记录
- */
- private BuriedPointFailRecord findFailRecord(String correlationId) {
- LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
- return buriedPointFailRecordMapper.selectOne(queryWrapper);
- }
-
- @Override
- public void updateMessageStatus(BuriedMessages message) {
- try {
- BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId());
- if (buriedPoint != null) {
- buriedPoint.setStatus(message.getStatusCode());
- buriedPoint.setUpdateTime(new Date());
- buriedPoint.setRetryCount(message.getRetryCount());
- buriedPointMapper.updateById(buriedPoint);
- }
- } catch (Exception e) {
- log.error("[埋点消费者] 更新状态失败", e);
- }
- }
-
- @Override
- public void updateRetryCount(BuriedMessages message) {
- try {
- BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId());
- if (buriedPoint != null) {
- updateBuriedPointRetryCount(buriedPoint, message);
- return;
- }
-
- String correlationId = String.valueOf(message.getId());
- BuriedPointFailRecord failRecord = findFailRecord(correlationId);
- if (failRecord != null) {
- updateFailRecordRetryCount(failRecord, message);
- } else {
- saveToFailRecord(message, "");
- }
- } catch (Exception e) {
- log.error("[埋点消费者] 更新重试次数失败", e);
- }
- }
-
- /**
- * 更新埋点表中的重试次数
- */
- private void updateBuriedPointRetryCount(BuriedPoint buriedPoint, BuriedMessages message) {
- buriedPoint.setRetryCount(message.getRetryCount());
- buriedPoint.setUpdateTime(new Date());
- buriedPointMapper.updateById(buriedPoint);
- }
-
- /**
- * 更新失败记录表中的重试次数
- */
- private void updateFailRecordRetryCount(BuriedPointFailRecord failRecord, BuriedMessages message) {
- failRecord.setRetryCount(message.getRetryCount());
- failRecord.setUpdateTime(new Date());
- failRecord.setMessageContent(JsonUtils.toJsonString(message));
- buriedPointFailRecordMapper.updateById(failRecord);
- }
-
- @Override
- public boolean saveToDatabase(BuriedMessages message) {
try {
BuriedPoint existingPoint = buriedPointMapper.selectByEventId(message.getId());
if (existingPoint != null) {
- return updateExistingBuriedPoint(existingPoint, message);
+ existingPoint.setStatus(message.getStatusCode());
+ existingPoint.setUpdateTime(new Date());
+ return buriedPointMapper.updateById(existingPoint) > 0;
}
+ BuriedPoint buriedPoint = new BuriedPoint(message);
+ buriedPoint.setService(applicationName);
+ buriedPointMapper.insert(buriedPoint);
- return createNewBuriedPoint(message);
+ if(buriedPoint.getStatus() == BuriedMessages.STATUS_ERROR){
+ buriedPointMonitorService.checkFailRecordsAndAlert("埋点数据处理异常");
+ }
+ return true;
} catch (DuplicateKeyException e) {
return true;
} catch (Exception e) {
@@ -154,106 +66,4 @@ public class BuriedPointConsumer extends AbstractRabbitMQConsumer 0;
- }
-
- /**
- * 创建新的埋点记录
- */
- private boolean createNewBuriedPoint(BuriedMessages message) {
- BuriedPoint buriedPoint = new BuriedPoint();
- buriedPoint.setEventId(message.getId());
- buriedPoint.setEventTime(System.currentTimeMillis());
- buriedPoint.setUserId(message.getUserId());
- buriedPoint.setEventType(message.getEventType());
- buriedPoint.setService(applicationName);
- buriedPoint.setMethod(message.getMethod());
- buriedPoint.setSessionId(message.getSessionId());
- buriedPoint.setClientIp(message.getClientIp());
- buriedPoint.setServerIp(message.getServerIp());
- buriedPoint.setStatus(message.getStatusCode());
- buriedPoint.setRetryCount(message.getRetryCount());
- buriedPoint.setPagePath(message.getPagePath());
- buriedPoint.setElementId(message.getElementId());
- buriedPoint.setDuration(message.getDuration());
- buriedPoint.setCreateTime(new Date());
- buriedPoint.setUpdateTime(new Date());
-
- buriedPointMapper.insert(buriedPoint);
- return true;
- }
-
- @Override
- public void saveToFailRecord(BuriedMessages message, String cause) {
- try {
- String correlationId = String.valueOf(message.getId());
- BuriedPointFailRecord existingRecord = findFailRecord(correlationId);
-
- if (existingRecord != null) {
- updateExistingFailRecord(existingRecord, message, cause);
- } else {
- createNewFailRecord(message, cause);
- checkFailRecordsAndAlert();
- }
- } catch (Exception e) {
- log.error("[埋点消费者] 保存失败记录失败", e);
- }
- }
-
- /**
- * 更新已存在的失败记录
- */
- private void updateExistingFailRecord(BuriedPointFailRecord record, BuriedMessages message, String cause) {
- record.setExchange(BuriedMessages.EXCHANGE);
- record.setRoutingKey(BuriedMessages.ROUTING_KEY);
- record.setCause(message.getErrorMessage() + cause);
- record.setMessageContent(JsonUtils.toJsonString(message));
- record.setRetryCount(message.getRetryCount());
- record.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
- record.setUpdateTime(new Date());
- buriedPointFailRecordMapper.updateById(record);
- }
-
- /**
- * 创建新的失败记录
- */
- private void createNewFailRecord(BuriedMessages message, String cause) {
- String correlationId = String.valueOf(message.getId());
- BuriedPointFailRecord failRecord = new BuriedPointFailRecord();
- failRecord.setCorrelationId(correlationId);
- failRecord.setExchange(BuriedMessages.EXCHANGE);
- failRecord.setRoutingKey(BuriedMessages.ROUTING_KEY);
- failRecord.setCause(message.getErrorMessage() + cause);
- failRecord.setMessageContent(JsonUtils.toJsonString(message));
- failRecord.setRetryCount(message.getRetryCount());
- failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
- failRecord.setCreateTime(new Date());
- failRecord.setUpdateTime(new Date());
- buriedPointFailRecordMapper.insert(failRecord);
- }
-
- /**
- * 检查失败记录数量并发送告警
- */
- private void checkFailRecordsAndAlert() {
- try {
- buriedPointMonitorService.checkFailRecordsAndAlert("埋点处理异常,请检查系统");
- } catch (Exception e) {
- log.error("[埋点消费者] 检查失败记录异常", e);
- }
- }
}
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/handler/BuriedPointFailRecordHandler.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/handler/BuriedPointFailRecordHandler.java
index 79108b8..0649573 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/handler/BuriedPointFailRecordHandler.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/handler/BuriedPointFailRecordHandler.java
@@ -1,93 +1,100 @@
package com.tashow.cloud.app.mq.handler;
-
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
-import com.tashow.cloud.app.model.BuriedPointFailRecord;
+import com.tashow.cloud.app.model.MqMessageRecord;
import com.tashow.cloud.app.service.feishu.BuriedPointMonitorService;
import com.tashow.cloud.mq.handler.FailRecordHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
-
import java.util.Date;
-
/**
- * 埋点失败记录处理器
+ * MQ消息记录处理器
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class BuriedPointFailRecordHandler implements FailRecordHandler {
-
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
private final BuriedPointMonitorService buriedPointMonitorService;
-
/**
- * 保存消息发送失败记录
+ * 保存消息记录=
*/
@Override
- public void saveFailRecord(String correlationId, String exchange, String routingKey, String cause, String messageContent) {
+ public void saveMessageRecord(Integer id, String exchange, String routingKey, String cause, String messageContent, int status) {
try {
- // 先查询是否已存在记录
- BuriedPointFailRecord existingRecord = findExistingRecord(correlationId);
-
+ MqMessageRecord existingRecord = findExistingRecord(id);
if (existingRecord != null) {
- updateExistingRecord(existingRecord, exchange, routingKey, cause, messageContent);
+ existingRecord.setRetryCount(existingRecord.getRetryCount() + 1);
+ existingRecord.setMessageContent(messageContent);
+ existingRecord.setStatus(status);
+ existingRecord.setCause(cause);
+ existingRecord.setUpdateTime(new Date());
+ buriedPointFailRecordMapper.updateById(existingRecord);
} else {
- createNewFailRecord(correlationId, exchange, routingKey, cause, messageContent);
- checkAlertThreshold(cause);
+ MqMessageRecord record = new MqMessageRecord();
+ record.setId(id);
+ record.setExchange(exchange);
+ record.setRoutingKey(routingKey);
+ record.setCause(cause);
+ record.setMessageContent(messageContent);
+ record.setRetryCount(0);
+ record.setStatus(status);
+ record.setCreateTime(new Date());
+ record.setUpdateTime(new Date());
+ buriedPointFailRecordMapper.insert(record);
+ if (status == MqMessageRecord.STATUS_FAILED) {
+ buriedPointMonitorService.checkFailRecordsAndAlert(cause);
+ }
}
} catch (Exception e) {
- log.error("[埋点处理器] 保存失败记录异常", e);
+ log.error("[MQ消息处理器] 保存消息记录异常", e);
+ }
+ }
+
+ /**
+ * 更新消息状态
+ */
+ @Override
+ public void updateMessageStatus(Integer id) {
+ try {
+ MqMessageRecord record = findExistingRecord(id);
+ if (record != null) {
+ record.setStatus(MqMessageRecord.STATUS_SUCCESS);
+ record.setUpdateTime(new Date());
+ buriedPointFailRecordMapper.updateById(record);
+ }
+ } catch (Exception e) {
+ log.error("[MQ消息处理器] 更新消息状态异常: {}", id, e);
+ }
+ }
+
+ /**
+ * 更新消息状态并设置失败原因
+ */
+ @Override
+ public void updateMessageStatusWithCause(Integer id, String cause) {
+ try {
+ MqMessageRecord record = findExistingRecord(id);
+ if (record != null) {
+ record.setStatus(MqMessageRecord.STATUS_FAILED);
+ record.setCause(cause);
+ record.setUpdateTime(new Date());
+ buriedPointFailRecordMapper.updateById(record);
+ buriedPointMonitorService.checkFailRecordsAndAlert(cause);
+ }
+ } catch (Exception e) {
+ log.error("[MQ消息处理器] 更新消息状态和原因异常: {}", id, e);
}
}
/**
* 查找已存在的失败记录
*/
- private BuriedPointFailRecord findExistingRecord(String correlationId) {
- LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
+ private MqMessageRecord findExistingRecord(Integer id) {
+ LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
+ queryWrapper.eq(MqMessageRecord::getId, id);
return buriedPointFailRecordMapper.selectOne(queryWrapper);
}
- /**
- * 更新已存在的失败记录
- */
- private void updateExistingRecord(BuriedPointFailRecord record, String exchange, String routingKey,
- String cause, String messageContent) {
- record.setExchange(exchange);
- record.setRoutingKey(routingKey);
- record.setCause(cause);
- record.setMessageContent(messageContent);
- record.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
- record.setUpdateTime(new Date());
- buriedPointFailRecordMapper.updateById(record);
- }
-
- /**
- * 创建新的失败记录
- */
- private void createNewFailRecord(String correlationId, String exchange, String routingKey,
- String cause, String messageContent) {
- BuriedPointFailRecord failRecord = new BuriedPointFailRecord();
- failRecord.setCorrelationId(correlationId);
- failRecord.setExchange(exchange);
- failRecord.setRoutingKey(routingKey);
- failRecord.setCause(cause);
- failRecord.setMessageContent(messageContent);
- failRecord.setRetryCount(0);
- failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
- failRecord.setCreateTime(new Date());
- failRecord.setUpdateTime(new Date());
- buriedPointFailRecordMapper.insert(failRecord);
- }
-
- /**
- * 检查是否达到告警阈值
- */
- @Override
- public boolean checkAlertThreshold(String cause) {
- return buriedPointMonitorService.checkFailRecordsAndAlert(cause);
- }
}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java
index fa54a12..900d572 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java
@@ -1,15 +1,27 @@
package com.tashow.cloud.app.mq.message;
+import cn.hutool.core.util.IdUtil;
+import com.tashow.cloud.common.util.json.JsonUtils;
+import com.tashow.cloud.common.util.servlet.ServletUtils;
+import com.tashow.cloud.common.util.spring.SpringUtils;
import com.tashow.cloud.mq.core.BaseMqMessage;
+import jakarta.servlet.http.HttpServletRequest;
import lombok.Data;
+import org.springframework.web.method.HandlerMethod;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.Date;
+import static com.tashow.cloud.web.apilog.core.interceptor.ApiAccessLogInterceptor.ATTRIBUTE_HANDLER_METHOD;
+
/**
* 埋点消息
*/
@Data
public class BuriedMessages extends BaseMqMessage {
+ private static final String ATTRIBUTE_REQUEST_ID = "BuriedPoint.RequestId";
+
/**
* 交换机名称
@@ -85,15 +97,46 @@ public class BuriedMessages extends BaseMqMessage {
* 元素ID
*/
private String elementId;
-
- /**
- * 持续时间
- */
- private Long duration;
+
/**
* 服务名称
*/
- private String service;
+ private String service;
+ public BuriedMessages() {
+ }
+ /**
+ * 从请求创建埋点消息
+ *
+ * @param request HTTP请求
+ * @param handlerMethod 处理方法
+ */
+ public BuriedMessages(HttpServletRequest request, HandlerMethod handlerMethod) {
+ try {
+ int requestId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE);
+ this.setId(requestId);
+ this.eventTime = new Date();
+ this.service = SpringUtils.getApplicationName();
+ this.method = request.getMethod() + " " + request.getRequestURI() +
+ JsonUtils.toJsonString(request.getParameterMap());
+ Object userId = request.getSession().getAttribute("USER_ID");
+ this.userId = userId != null ? userId.toString() : "anonymous";
+ this.sessionId = request.getSession().getId();
+ this.clientIp = ServletUtils.getClientIP(request);
+ try {
+ this.serverIp = InetAddress.getLocalHost().getHostAddress();
+ } catch (UnknownHostException e) {
+ this.serverIp = "unknown";
+ }
+ String controllerName = handlerMethod.getBeanType().getSimpleName();
+ String actionName = handlerMethod.getMethod().getName();
+ this.pagePath = controllerName + "#" + actionName;
+ this.eventType = "API_REQUEST_START";
+ this.setStatusCode(STATUS_PROCESSING);
+ request.setAttribute(ATTRIBUTE_REQUEST_ID, this.getId());
+ } catch (Exception e) {
+ throw new RuntimeException("创建埋点消息失败", e);
+ }
+ }
}
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java
index c1db0be..5daa8b0 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java
@@ -3,6 +3,7 @@ package com.tashow.cloud.app.mq.producer.buriedPoint;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.common.util.json.JsonUtils;
import com.tashow.cloud.mq.rabbitmq.producer.AbstractRabbitMQProducer;
+import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.stereotype.Component;
/**
@@ -11,10 +12,14 @@ import org.springframework.stereotype.Component;
@Component
public class BuriedPointProducer extends AbstractRabbitMQProducer {
+ @Override
+ public void returnedMessage(ReturnedMessage returned) {
+
+ }
@Override
public String getExchange() {
- return "BuriedMessages.EXCHANGE";
+ return BuriedMessages.EXCHANGE;
}
@Override
@@ -22,8 +27,5 @@ public class BuriedPointProducer extends AbstractRabbitMQProducer alertCache = new ConcurrentHashMap<>();
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
+ private final BuriedPointMapper buriedPointMapper;
private final FeiShuAlertClient feiShuAlertClient;
private final FeiShuCardDataService feiShuCardDataService;
private final LarkConfig larkConfig;
@@ -36,122 +39,194 @@ public class BuriedPointMonitorService {
try {
Date now = new Date();
Date hoursAgo = getDateHoursAgo(now, MONITORING_HOURS);
-
- Long failCount = getUnprocessedFailCount(hoursAgo, now);
-
- if (failCount > ALERT_THRESHOLD) {
- sendAlertMessage(failCount.intValue(), hoursAgo, now, cause);
- return true;
+ boolean sentAlert = false;
+ List timeRanges = getHourRanges(hoursAgo, now);
+ long mqFailCount = countFailures(buriedPointFailRecordMapper, MqMessageRecord.class, hoursAgo, now);
+ long buriedFailCount = countFailures(buriedPointMapper, BuriedPoint.class, hoursAgo, now);
+ if (mqFailCount > ALERT_THRESHOLD||buriedFailCount > ALERT_THRESHOLD) {
+ if (!hasRecentlySentAlert(cause)) {
+ sendAlert(mqFailCount, cause, getMqStats(timeRanges));
+ alertCache.put(cause, System.currentTimeMillis());
+ sentAlert = true;
+ }
}
- return false;
+
+ return sentAlert;
} catch (Exception e) {
log.error("[埋点监控] 检查失败记录异常", e);
return false;
}
}
+
+ /**
+ * 检查是否最近已发送过相同类型的告警
+ */
+ private boolean hasRecentlySentAlert(String alertType) {
+ Long lastSentTime = alertCache.get(alertType);
+ if (lastSentTime == null) {
+ return false;
+ }
+
+ long hourInMillis = MONITORING_HOURS * 60 * 60 * 1000L;
+ return (System.currentTimeMillis() - lastSentTime) < hourInMillis;
+ }
+
/**
- * 获取未处理的失败记录数量
+ * 获取消息队列统计数据
*/
- public Long getUnprocessedFailCount(Date startDate, Date endDate) {
- LambdaQueryWrapper query = new LambdaQueryWrapper<>();
- query.ge(BuriedPointFailRecord::getCreateTime, startDate)
- .le(BuriedPointFailRecord::getCreateTime, endDate)
- .eq(BuriedPointFailRecord::getStatus, BuriedPointFailRecord.STATUS_UNPROCESSED);
- return buriedPointFailRecordMapper.selectCount(query);
+ private List getMqStats(List timeRanges) {
+ Map successData = batchQueryMqStatus(timeRanges, MqMessageRecord.STATUS_SUCCESS);
+ Map failedData = batchQueryMqFailures(timeRanges);
+
+ SimpleDateFormat timeFormat = new SimpleDateFormat("HH:00");
+ return timeRanges.stream()
+ .map(range -> new ChartImageGenerator.MonitoringDataPoint(
+ timeFormat.format(range[0]),
+ successData.getOrDefault(range[0], 0),
+ failedData.getOrDefault(range[0], 0)
+ ))
+ .toList();
+ }
+
+ /**
+ * 获取埋点表统计数据
+ */
+ private List getBuriedStats(List timeRanges) {
+ // 批量查询每个时间区间的数据
+ Map successData = batchQueryBuriedStatus(timeRanges, BuriedMessages.STATUS_SUCCESS);
+ Map failedData = batchQueryBuriedStatus(timeRanges, BuriedMessages.STATUS_ERROR);
+ SimpleDateFormat timeFormat = new SimpleDateFormat("HH:00");
+ return timeRanges.stream()
+ .map(range -> new ChartImageGenerator.MonitoringDataPoint(
+ timeFormat.format(range[0]),
+ successData.getOrDefault(range[0], 0),
+ failedData.getOrDefault(range[0], 0)
+ ))
+ .toList();
+ }
+
+ /**
+ * 查询MQ状态数据
+ */
+ private Map batchQueryMqStatus(List timeRanges, int status) {
+ Map result = new HashMap<>();
+ for (Date[] range : timeRanges) {
+ LambdaQueryWrapper query = new LambdaQueryWrapper<>();
+ query.ge(MqMessageRecord::getCreateTime, range[0])
+ .lt(MqMessageRecord::getCreateTime, range[1])
+ .eq(MqMessageRecord::getStatus, status);
+ result.put(range[0], buriedPointFailRecordMapper.selectCount(query).intValue());
+ }
+ return result;
+ }
+
+ /**
+ * 批量查询MQ失败数据
+ */
+ private Map batchQueryMqFailures(List timeRanges) {
+ Map result = new HashMap<>();
+ for (Date[] range : timeRanges) {
+ LambdaQueryWrapper query = new LambdaQueryWrapper<>();
+ query.ge(MqMessageRecord::getCreateTime, range[0])
+ .lt(MqMessageRecord::getCreateTime, range[1])
+ .in(MqMessageRecord::getStatus, Arrays.asList(
+ MqMessageRecord.STATUS_UNPROCESSED, MqMessageRecord.STATUS_FAILED));
+ result.put(range[0], buriedPointFailRecordMapper.selectCount(query).intValue());
+ }
+ return result;
+ }
+
+ /**
+ * 批量查询埋点状态数据
+ */
+ private Map batchQueryBuriedStatus(List timeRanges, int status) {
+ Map result = new HashMap<>();
+ for (Date[] range : timeRanges) {
+ LambdaQueryWrapper query = new LambdaQueryWrapper<>();
+ query.ge(BuriedPoint::getCreateTime, range[0])
+ .lt(BuriedPoint::getCreateTime, range[1])
+ .eq(BuriedPoint::getStatus, status);
+ result.put(range[0], buriedPointMapper.selectCount(query).intValue());
+ }
+ return result;
}
/**
- * 发送告警消息
+ * 计算失败数量
*/
- private void sendAlertMessage(int failCount, Date startDate, Date endDate, String errorMessage) {
+ private long countFailures(Object mapper, Class entityClass, Date startDate, Date endDate) {
try {
- List monitoringData =
- queryHourlyFailRecordData(startDate, endDate);
+ if (entityClass == BuriedPoint.class) {
+ LambdaQueryWrapper query = new LambdaQueryWrapper<>();
+ query.ge(BuriedPoint::getCreateTime, startDate)
+ .le(BuriedPoint::getCreateTime, endDate)
+ .eq(BuriedPoint::getStatus, BuriedMessages.STATUS_ERROR);
+ return ((BuriedPointMapper)mapper).selectCount(query);
+ } else {
+ LambdaQueryWrapper query = new LambdaQueryWrapper<>();
+ query.ge(MqMessageRecord::getCreateTime, startDate)
+ .le(MqMessageRecord::getCreateTime, endDate)
+ .eq(MqMessageRecord::getStatus, MqMessageRecord.STATUS_FAILED);
+ return ((BuriedPointFailRecordMapper)mapper).selectCount(query);
+ }
+ } catch (Exception e) {
+ return 0;
+ }
+ }
+
+ /**
+ * 发送告警
+ */
+ private void sendAlert(long failCount, String alertMsg, List data) {
+ try {
+ String imageKey = feiShuAlertClient.uploadImage(data, alertMsg);
+ String title = alertMsg.split(":")[0].trim();
- HashMap templateData = new HashMap<>();
- String chatId = larkConfig.getChatId();
-
- String imageKey = feiShuAlertClient.uploadImage(monitoringData, errorMessage);
-
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- templateData.put("alert_title", "埋点数据异常告警");
- templateData.put("image_key", imageKey);
- templateData.put("current_time", sdf.format(new Date()));
- templateData.put("fail_count", failCount);
+ Map templateData = Map.of(
+ "alert_title", title,
+ "image_key", Map.of("img_key", imageKey),
+ "current_time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
+ "fail_count", failCount
+ );
String messageId = feiShuAlertClient.sendCardMessage(
- chatId,
- larkConfig.getExceptionCards(),
- templateData
+ larkConfig.getChatId(),
+ larkConfig.getExceptionCards(),
+ new HashMap<>(templateData)
);
feiShuCardDataService.saveCardData(messageId, templateData);
} catch (Exception e) {
- log.error("[埋点监控] 发送告警失败", e);
+ log.error("[埋点监控] 发送告警失败: {}", e.getMessage());
}
}
/**
- * 查询按小时统计的失败记录数据
+ * 获取小时范围列表
*/
- public List queryHourlyFailRecordData(Date startDate, Date endDate) {
- List result = new ArrayList<>();
-
- try {
- Date limitedStartDate = getDateHoursAgo(endDate, MONITORING_HOURS);
- Date actualStartDate = startDate.after(limitedStartDate) ? startDate : limitedStartDate;
+ private List getHourRanges(Date startDate, Date endDate) {
+ List ranges = new ArrayList<>();
+ Calendar cal = Calendar.getInstance();
+
+ cal.setTime(endDate);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+ Date endHour = cal.getTime();
+
+ cal.add(Calendar.HOUR_OF_DAY, -(MONITORING_HOURS - 1));
+ Date startHour = startDate.after(cal.getTime()) ? startDate : cal.getTime();
+
+ cal.setTime(startHour);
+ while (!cal.getTime().after(endHour)) {
+ Date hourStart = cal.getTime();
+ cal.add(Calendar.HOUR_OF_DAY, 1);
+ Date hourEnd = cal.getTime().after(endDate) ? endDate : cal.getTime();
+ ranges.add(new Date[]{hourStart, hourEnd});
- SimpleDateFormat sdf = new SimpleDateFormat("HH:00");
- Calendar calendar = Calendar.getInstance();
-
- long hoursDiff = (endDate.getTime() - actualStartDate.getTime()) / (60 * 60 * 1000) + 1;
- int hours = (int) Math.min(hoursDiff, MONITORING_HOURS);
-
- for (int i = 0; i < hours; i++) {
- calendar.setTime(actualStartDate);
- calendar.add(Calendar.HOUR_OF_DAY, i);
- Date currentHourStart = calendar.getTime();
- calendar.add(Calendar.HOUR_OF_DAY, 1);
- Date nextHourStart = calendar.getTime();
-
- int successCount = getHourlyRecordCount(currentHourStart, nextHourStart, BuriedPointFailRecord.STATUS_SUCCESS);
- int failCount = getHourlyFailedCount(currentHourStart, nextHourStart);
-
- result.add(new ChartImageGenerator.MonitoringDataPoint(
- sdf.format(currentHourStart),
- successCount,
- failCount
- ));
- }
-
- return result;
- } catch (Exception e) {
- log.error("[埋点监控] 查询小时数据失败", e);
- return Collections.emptyList();
+ if (hourEnd.equals(endDate)) break;
}
- }
-
- /**
- * 获取指定状态的记录数量
- */
- private int getHourlyRecordCount(Date startHour, Date endHour, int status) {
- LambdaQueryWrapper query = new LambdaQueryWrapper<>();
- query.ge(BuriedPointFailRecord::getCreateTime, startHour)
- .lt(BuriedPointFailRecord::getCreateTime, endHour)
- .eq(BuriedPointFailRecord::getStatus, status);
- return buriedPointFailRecordMapper.selectCount(query).intValue();
- }
-
- /**
- * 获取失败记录数量
- */
- private int getHourlyFailedCount(Date startHour, Date endHour) {
- LambdaQueryWrapper query = new LambdaQueryWrapper<>();
- query.ge(BuriedPointFailRecord::getCreateTime, startHour)
- .lt(BuriedPointFailRecord::getCreateTime, endHour)
- .in(BuriedPointFailRecord::getStatus,
- Arrays.asList(BuriedPointFailRecord.STATUS_UNPROCESSED, BuriedPointFailRecord.STATUS_FAILED));
- return buriedPointFailRecordMapper.selectCount(query).intValue();
+ return ranges;
}
/**
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/service/impl/BuriedPointFailRecordService.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/service/impl/BuriedPointFailRecordService.java
index ecd2006..aa92783 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/service/impl/BuriedPointFailRecordService.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/service/impl/BuriedPointFailRecordService.java
@@ -2,7 +2,7 @@ package com.tashow.cloud.app.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
-import com.tashow.cloud.app.model.BuriedPointFailRecord;
+import com.tashow.cloud.app.model.MqMessageRecord;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
import com.tashow.cloud.common.util.json.JsonUtils;
@@ -20,7 +20,7 @@ import java.util.List;
@Slf4j
@Service
@RequiredArgsConstructor
-public class BuriedPointFailRecordService implements MessageRetryService {
+public class BuriedPointFailRecordService implements MessageRetryService {
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
private final BuriedPointProducer buriedPointProducer;
@@ -29,10 +29,10 @@ public class BuriedPointFailRecordService implements MessageRetryService getUnprocessedRecords() {
- LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(BuriedPointFailRecord::getStatus, BuriedPointFailRecord.STATUS_UNPROCESSED)
- .orderByAsc(BuriedPointFailRecord::getCreateTime);
+ public List getUnprocessedRecords() {
+ LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
+ queryWrapper.eq(MqMessageRecord::getStatus, MqMessageRecord.STATUS_FAILED)
+ .orderByAsc(MqMessageRecord::getCreateTime);
return buriedPointFailRecordMapper.selectList(queryWrapper);
}
@@ -40,39 +40,14 @@ public class BuriedPointFailRecordService implements MessageRetryService 0;
- } catch (Exception e) {
- log.error("[埋点重试] 更新状态失败", e);
- return false;
}
}
}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/task/BuriedPointRetryTask.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/task/BuriedPointRetryTask.java
index 686c022..aeccd90 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/task/BuriedPointRetryTask.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/task/BuriedPointRetryTask.java
@@ -1,6 +1,5 @@
package com.tashow.cloud.app.task;
-
-import com.tashow.cloud.app.model.BuriedPointFailRecord;
+import com.tashow.cloud.app.model.MqMessageRecord;
import com.tashow.cloud.app.service.impl.BuriedPointFailRecordService;
import com.tashow.cloud.mq.retry.AbstractMessageRetryTask;
import com.tashow.cloud.mq.retry.MessageRetryService;
@@ -15,7 +14,7 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
-public class BuriedPointRetryTask extends AbstractMessageRetryTask {
+public class BuriedPointRetryTask extends AbstractMessageRetryTask {
private final BuriedPointFailRecordService buriedPointFailRecordService;
@@ -24,22 +23,19 @@ public class BuriedPointRetryTask extends AbstractMessageRetryTask getMessageRetryService() {
+ protected MessageRetryService getMessageRetryService() {
return buriedPointFailRecordService;
}
-
@Override
- protected String getRecordId(BuriedPointFailRecord record) {
- return String.valueOf(record.getId());
+ protected Integer getRecordId(MqMessageRecord record) {
+ return record.getId();
}
- @Override
- protected String getCorrelationId(BuriedPointFailRecord record) {
- return record.getCorrelationId();
- }
+
}
\ No newline at end of file
diff --git a/tashow-sdk/tashow-feishu-sdk/.gitignore b/tashow-sdk/tashow-feishu-sdk/.gitignore
new file mode 100644
index 0000000..7423f8b
--- /dev/null
+++ b/tashow-sdk/tashow-feishu-sdk/.gitignore
@@ -0,0 +1,3 @@
+.idea/
+.DS_Store
+src/main/java/com/tashow/cloud/sdk/feishu/client/chat_history.txt
\ No newline at end of file
diff --git a/tashow-sdk/tashow-feishu-sdk/src/main/java/com/tashow/cloud/sdk/feishu/client/FeiShuAlertClient.java b/tashow-sdk/tashow-feishu-sdk/src/main/java/com/tashow/cloud/sdk/feishu/client/FeiShuAlertClient.java
index b4cb488..642592b 100644
--- a/tashow-sdk/tashow-feishu-sdk/src/main/java/com/tashow/cloud/sdk/feishu/client/FeiShuAlertClient.java
+++ b/tashow-sdk/tashow-feishu-sdk/src/main/java/com/tashow/cloud/sdk/feishu/client/FeiShuAlertClient.java
@@ -5,22 +5,17 @@ import com.tashow.cloud.sdk.feishu.util.ChartImageGenerator;
import com.lark.oapi.service.im.v1.model.ext.MessageTemplate;
import com.lark.oapi.service.im.v1.model.ext.MessageTemplateData;
import java.io.File;
-import java.text.SimpleDateFormat;
import java.util.*;
import com.tashow.cloud.sdk.feishu.config.LarkConfig;
import com.tashow.cloud.sdk.feishu.util.LarkClientUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-
/**
* 飞书告警客户端
* 用于处理系统告警消息的发送
*/
@Service
public class FeiShuAlertClient {
- private final Logger log = LoggerFactory.getLogger(FeiShuAlertClient.class);
private final Client client;
private final LarkConfig larkConfig;
private final ChartImageGenerator chartImageGenerator;
@@ -56,42 +51,6 @@ public class FeiShuAlertClient {
return resp.getData().getChatId();
}
- /**
- * 发送带错误信息的埋点报警消息
- *
- * @param chatId 会话ID
- * @param buriedPointData 埋点数据
- * @param failCount 失败数量
- * @param errorMessage 错误信息
- * @return 发送的消息ID
- * @throws Exception 异常信息
- */
- public String sendBuriedPointAlertMessage(String chatId, List buriedPointData, int failCount, String errorMessage) throws Exception {
- HashMap templateData = new HashMap<>();
- String imageKey = uploadImage(buriedPointData, errorMessage);
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String currentTime = sdf.format(new Date());
- templateData.put("alert_title", "埋点数据异常告警");
- templateData.put("image_key", imageKey);
- templateData.put("current_time", currentTime);
- templateData.put("fail_count", failCount);
- return sendCardMessage(chatId, "AAqdpjayeOVp2", templateData);
- }
-
- /**
- * 发送带错误信息的埋点报警消息(创建群)
- *
- * @param buriedPointData 埋点数据
- * @param failCount 失败数量
- * @param errorMessage 错误信息
- * @return 创建的群ID和消息ID,格式为 "chatId:messageId"
- * @throws Exception 异常信息
- */
- public String sendBuriedPointAlertMessage(List buriedPointData, int failCount, String errorMessage) throws Exception {
- String chatId = createAlertChat();
- String messageId = sendBuriedPointAlertMessage(chatId, buriedPointData, failCount, errorMessage);
- return chatId + ":" + messageId;
- }
/**
* 发送报警消息
diff --git a/tashow-sdk/tashow-feishu-sdk/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/tashow-sdk/tashow-feishu-sdk/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 0000000..0a69ff2
--- /dev/null
+++ b/tashow-sdk/tashow-feishu-sdk/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+com.tashow.cloud.sdk.feishu.config.LarkConfig