This commit is contained in:
2025-07-09 18:19:13 +08:00
parent 3d072958d6
commit bb4432e643
35 changed files with 464 additions and 1100 deletions

View File

@@ -40,6 +40,21 @@
<artifactId>jakarta.servlet-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-data-mybatis</artifactId>
</dependency>
<dependency>
<groupId>org.jodd</groupId>
<artifactId>jodd-util</artifactId>
<version>6.3.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,25 +1,24 @@
package com.tashow.cloud.mq.core;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* MQ消息基类所有消息类型都应该继承此类
* MQ消息基类
*
*
* @author tashow
*/
@Data
public class BaseMqMessage implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 消息ID默认为UUID
*/
private Integer id = UUID.randomUUID().hashCode();
/**
* 消息状态码
*/
@@ -44,72 +43,4 @@ public class BaseMqMessage implements Serializable {
* 扩展数据
*/
private Map<String, Object> extraData = new HashMap<>();
/**
* 增加重试次数
*/
public void incrementRetryCount() {
if (retryCount == null) {
retryCount = 0;
}
retryCount++;
}
/**
* 添加额外数据
*/
public void addExtraData(String key, Object value) {
if (extraData == null) {
extraData = new HashMap<>();
}
extraData.put(key, value);
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getStatusCode() {
return statusCode;
}
public void setStatusCode(Integer statusCode) {
this.statusCode = statusCode;
}
public Integer getRetryCount() {
return retryCount;
}
public void setRetryCount(Integer retryCount) {
this.retryCount = retryCount;
}
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Map<String, Object> getExtraData() {
return extraData;
}
public void setExtraData(Map<String, Object> extraData) {
this.extraData = extraData;
}
}

View File

@@ -1,44 +0,0 @@
package com.tashow.cloud.mq.core;
import org.springframework.amqp.rabbit.connection.CorrelationData;
/**
* 自定义关联数据,用于存储额外的消息数据
*
* @author tashow
*/
public class CustomCorrelationData extends CorrelationData {
/**
* 消息内容
*/
private final String messageContent;
/**
* 构造函数
*
* @param id 关联ID
* @param messageContent 消息内容
*/
public CustomCorrelationData(String id, String messageContent) {
super(id);
this.messageContent = messageContent;
}
/**
* 获取消息内容
*
* @return 消息内容
*/
public String getMessageContent() {
return messageContent;
}
@Override
public String toString() {
return "CustomCorrelationData{" +
"id='" + getId() + '\'' +
", messageContent='" + messageContent + '\'' +
'}';
}
}

View File

@@ -1,39 +1,31 @@
package com.tashow.cloud.mq.handler;
/**
* 消息发送失败记录处理接口
* 消息记录处理接口
*
* @author tashow
*/
public interface FailRecordHandler {
/**
* 保存消息发送失败记录
* 保存消息记录
*
* @param correlationId 关联ID
* @param exchange 交换机
* @param routingKey 路由键
* @param cause 失败原因
* @param cause 失败原因可为null
* @param messageContent 消息内容
* @param status 状态0-未处理1-处理成功2-处理失败
*/
void saveFailRecord(String correlationId, String exchange, String routingKey, String cause, String messageContent);
void saveMessageRecord(Integer id, String exchange, String routingKey, String cause, String messageContent, int status);
/**
* 更新消息状态
*
* @param id 关联ID
*/
void updateMessageStatus(Integer id);
/**
* 检查是否达到告警阈值
*
* @return 是否需要告警
* 更新消息状态并设置失败原因
*/
default boolean checkAlertThreshold() {
return checkAlertThreshold(null);
}
/**
* 检查是否达到告警阈值,带错误信息
*
* @param cause 错误原因
* @return 是否需要告警
*/
default boolean checkAlertThreshold(String cause) {
return false;
}
void updateMessageStatusWithCause(Integer id, String causes);
}

View File

@@ -1,5 +1,4 @@
package com.tashow.cloud.mq.rabbitmq.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfiguration;
@@ -13,7 +12,5 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@AutoConfiguration
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class RabbitMQAutoConfiguration extends RabbitMQConfiguration {
private static final Logger log = LoggerFactory.getLogger(RabbitMQAutoConfiguration.class);
}

View File

@@ -17,26 +17,6 @@ import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class RabbitMQConfiguration {
private static final Logger log = LoggerFactory.getLogger(RabbitMQConfiguration.class);
/**
* 初始化RabbitTemplate
*
* @param rabbitTemplate RabbitTemplate
*/
protected void initRabbitTemplate(RabbitTemplate rabbitTemplate) {
log.info("[MQ配置] 初始化RabbitTemplate: {}", rabbitTemplate);
// 启用消息发送到交换机确认机制
rabbitTemplate.setMandatory(true);
if (rabbitTemplate.isConfirmListener()) {
log.info("[MQ配置] 确认回调已正确配置");
} else {
log.error("[MQ配置] 确认回调配置失败");
}
}
/**
* 创建消息转换器
*
@@ -46,4 +26,4 @@ public class RabbitMQConfiguration {
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
}

View File

@@ -1,5 +1,4 @@
package com.tashow.cloud.mq.rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import com.tashow.cloud.mq.core.BaseMqMessage;
import org.slf4j.Logger;
@@ -16,136 +15,52 @@ import org.springframework.messaging.handler.annotation.Header;
public abstract class AbstractRabbitMQConsumer<T extends BaseMqMessage> {
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQConsumer.class);
/**
* 消息状态:处理中
*/
public static final int STATUS_PROCESSING = 10;
/**
* 消息状态:成功
*/
public static final int STATUS_SUCCESS = 20;
/**
* 消息状态:失败
* 消息状态:消费异常
*/
public static final int STATUS_ERROR = 30;
public static final int STATUS_SEND_EXCEPTION = 30;
/**
* 处理消息
* 埋点处理消息
*
* @param message 消息对象
* @return 处理结果true表示处理成功false表示处理失败
*/
public abstract boolean processMessage(T message);
/**
* 获取消息重试次数
*
* @param message 消息对象
* @return 重试次数
*/
public Integer getRetryCount(T message) {
return message.getRetryCount() != null ? message.getRetryCount() : 0;
}
/**
* 更新消息状态
*
* @param message 消息对象
*/
public abstract void updateMessageStatus(T message);
/**
* 更新消息重试次数
*
* @param message 消息对象
*/
public abstract void updateRetryCount(T message);
/**
* 保存消息到数据库
*
* @param message 消息对象
* @return 保存结果
*/
public abstract boolean saveToDatabase(T message);
/**
* 保存消息到失败记录
*
* @param message 消息对象
* @param cause 失败原因
*/
public abstract void saveToFailRecord(T message, String cause);
/**
* 获取最大允许重试次数
*
* @return 最大重试次数
*/
public int getMaxRetryAllowed() {
return 3;
}
/**
* 消息处理入口
*
* @param message 消息对象
* @param channel 通道
* @param message 消息对象
* @param channel 通道
* @param deliveryTag 投递标签
*/
public void onMessage(T message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
Integer dbRetryCount = getRetryCount(message);
message.setRetryCount(dbRetryCount);
if (message.getRetryCount() != null && message.getRetryCount() >= getMaxRetryAllowed()) {
message.setStatusCode(STATUS_ERROR);
message.addExtraData("errorMessage", "已达到最大重试次数");
saveToFailRecord(message, "已达到最大重试次数");
safeChannelAck(channel, deliveryTag);
return;
}
log.info("[MQ消费者] 收到消息: {}, 当前重试次数: {}/{}", message, message.getRetryCount(), getMaxRetryAllowed());
message.setStatusCode(STATUS_PROCESSING);
message.setStatusCode(STATUS_SUCCESS);
try {
boolean result = processMessage(message);
if (result) {
message.setStatusCode(STATUS_SUCCESS);
updateMessageStatus(message);
log.info("[MQ消费者] 消息处理成功,状态已更新为成功: {}", message.getId());
} else {
throw new RuntimeException("消息处理失败");
if(true){
throw new RuntimeException("测试异常");
}
processMessage(message);
safeChannelAck(channel, deliveryTag);
} catch (Exception e) {
message.setStatusCode(STATUS_ERROR);
message.addExtraData("errorMessage", e.getMessage());
message.setErrorMessage(e.getMessage());
log.error("[MQ消费者] 消息处理失败: {}, 错误: {}", message.getId(), e.getMessage());
message.setStatusCode(STATUS_SEND_EXCEPTION);
processMessage( message);
safeChannelAck(channel, deliveryTag);
message.incrementRetryCount();
updateRetryCount(message);
if (message.getRetryCount() >= getMaxRetryAllowed()) {
saveToDatabase(message);
log.warn("[MQ消费者] 消息已达到最大重试次数: {}, 确认消息并保存到失败记录表", message.getRetryCount());
saveToFailRecord(message, e.getMessage());
safeChannelAck(channel, deliveryTag);
} else {
log.info("[MQ消费者] 消息将重新入队重试: {}, 当前重试次数: {}", message.getId(), message.getRetryCount());
safeChannelNack(channel, deliveryTag, false, true);
}
}
}
/**
* 安全确认消息
*
* @param channel 通道
* @param channel 通道
* @param deliveryTag 投递标签
*/
protected void safeChannelAck(Channel channel, long deliveryTag) {
@@ -159,10 +74,10 @@ public abstract class AbstractRabbitMQConsumer<T extends BaseMqMessage> {
/**
* 安全拒绝消息
*
* @param channel 通道
* @param channel 通道
* @param deliveryTag 投递标签
* @param multiple 是否批量
* @param requeue 是否重新入队
* @param multiple 是否批量
* @param requeue 是否重新入队
*/
protected void safeChannelNack(Channel channel, long deliveryTag, boolean multiple, boolean requeue) {
try {

View File

@@ -1,4 +0,0 @@
/**
* 占位符,无特殊逻辑
*/
package com.tashow.cloud.mq.rabbitmq.core;

View File

@@ -1,4 +0,0 @@
/**
* 消息队列,基于 RabbitMQ 提供
*/
package com.tashow.cloud.mq.rabbitmq;

View File

@@ -1,15 +1,14 @@
package com.tashow.cloud.mq.rabbitmq.producer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tashow.cloud.common.util.json.JsonUtils;
import com.tashow.cloud.mq.core.BaseMqMessage;
import com.tashow.cloud.mq.core.CustomCorrelationData;
import com.tashow.cloud.mq.handler.FailRecordHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import jakarta.annotation.PostConstruct;
import java.util.UUID;
@@ -35,11 +34,9 @@ public abstract class AbstractRabbitMQProducer<T extends BaseMqMessage>
*/
@PostConstruct
public void initRabbitTemplate() {
log.info("[MQ生产者] 初始化RabbitTemplate: {}", rabbitTemplate);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setConfirmCallback(this);
if (rabbitTemplate.isConfirmListener()) {
log.info("[MQ生产者] 确认回调已正确配置");
} else {
@@ -47,51 +44,38 @@ public abstract class AbstractRabbitMQProducer<T extends BaseMqMessage>
}
}
/**
* 将消息转换为字符串
*
* @param message 消息对象
* @return 消息字符串
*/
protected abstract String convertMessageToString(T message);
/**
* 异步发送消息自动生成correlationId
*
* @param message 消息对象
*/
public void asyncSendMessage(T message) {
String correlationId = UUID.randomUUID().toString();
asyncSendMessage(message, correlationId);
}
/**
* 异步发送消息使用指定的correlationId
*
* @param message 消息对象
* @param correlationId 关联ID
*/
public void asyncSendMessage(T message, String correlationId) {
log.info("[MQ生产者] 准备发送消息: {}, correlationId: {}", message, correlationId);
public void asyncSendMessage(T message) {
try {
String messageJson = convertMessageToString(message);
CustomCorrelationData correlationData = new CustomCorrelationData(correlationId, messageJson);
String messageJson = JsonUtils.toJsonString(message);
CorrelationData correlationData = new CorrelationData(messageJson);
failRecordHandler.saveMessageRecord(
message.getId(),
getExchange(),
getRoutingKey(),
null,
messageJson,
0
);
rabbitTemplate.convertAndSend(getExchange(), getRoutingKey(), message, correlationData);
log.info("[MQ生产者] 消息发送完成: {}, 状态: {}, 重试次数: {}, correlationId: {}",
message.getId(), message.getStatusCode(), message.getRetryCount(), correlationId);
} catch (Exception e) {
log.error("[MQ生产者] 消息发送异常: {}, correlationId: {}", e.getMessage(), correlationId, e);
throw e;
}
}
/**
* 获取交换机名称
*
* @return 交换机名称
*/
public abstract String getExchange();
/**
* 获取路由键
*
@@ -101,45 +85,13 @@ public abstract class AbstractRabbitMQProducer<T extends BaseMqMessage>
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
JSONObject jsonObject = JSON.parseObject(correlationData.getId());
Integer id = jsonObject.getInteger("id");
if (ack) {
log.info("[MQ生产者] 消息发送确认成功: {}", correlationData.getId());
failRecordHandler.updateMessageStatus(id);
} else {
log.error("[MQ生产者] 消息发送确认失败: {}, 原因: {}, correlationData={}",
correlationData.getId(), cause, correlationData);
if (failRecordHandler != null && correlationData instanceof CustomCorrelationData) {
CustomCorrelationData customData = (CustomCorrelationData) correlationData;
String messageContent = customData.getMessageContent();
failRecordHandler.saveFailRecord(
correlationData.getId(),
getExchange(),
getRoutingKey(),
cause,
messageContent
);
} else {
log.warn("[MQ生产者] 未配置FailRecordHandler或非CustomCorrelationData类型, 无法保存失败记录");
}
failRecordHandler.updateMessageStatusWithCause(id,cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("[MQ生产者] 消息路由失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
returned.getExchange(),
returned.getRoutingKey(),
returned.getReplyCode(),
returned.getReplyText(),
new String(returned.getMessage().getBody()));
if (failRecordHandler != null) {
failRecordHandler.saveFailRecord(
returned.getMessage().getMessageProperties().getCorrelationId(),
returned.getExchange(),
returned.getRoutingKey(),
"路由失败: " + returned.getReplyText(),
new String(returned.getMessage().getBody())
);
} else {
log.warn("[MQ生产者] 未配置FailRecordHandler, 无法保存失败记录");
}
}
}

View File

@@ -15,15 +15,7 @@ public abstract class AbstractMessageRetryTask<T> {
private static final Logger log = LoggerFactory.getLogger(AbstractMessageRetryTask.class);
/**
* 处理中状态码
*/
public static final int STATUS_PROCESSING = 3;
/**
* 失败状态码
*/
public static final int STATUS_FAILED = 4;
/**
* 获取消息重试服务
@@ -38,15 +30,7 @@ public abstract class AbstractMessageRetryTask<T> {
* @param record 记录对象
* @return 记录ID
*/
protected abstract String getRecordId(T record);
/**
* 获取关联ID
*
* @param record 记录对象
* @return 关联ID
*/
protected abstract String getCorrelationId(T record);
protected abstract Integer getRecordId(T record);
/**
* 执行重试
@@ -54,36 +38,10 @@ public abstract class AbstractMessageRetryTask<T> {
public void retryFailedMessages() {
try {
List<T> unprocessedRecords = getMessageRetryService().getUnprocessedRecords();
if (unprocessedRecords.isEmpty()) {
log.info("[MQ重试] 没有需要重试的消息");
return;
}
log.info("[MQ重试] 本次需要重试的消息数量: {}", unprocessedRecords.size());
for (T record : unprocessedRecords) {
try {
// 先将状态更新为处理中,避免其他实例重复处理
if (!getMessageRetryService().updateStatus(record, STATUS_PROCESSING)) {
continue; // 如果更新失败,跳过当前记录
}
// 执行重试
String recordId = getRecordId(record);
boolean success = getMessageRetryService().retryFailedMessage(recordId);
if (success) {
log.info("[MQ重试] 消息重试成功: {}", getCorrelationId(record));
} else {
log.warn("[MQ重试] 消息重试失败: {}", getCorrelationId(record));
getMessageRetryService().updateStatus(record, STATUS_FAILED);
}
} catch (Exception e) {
// 发生异常时,更新状态为失败
getMessageRetryService().updateStatus(record, STATUS_FAILED);
log.error("[MQ重试] 重试消息异常: {}", getCorrelationId(record), e);
}
Integer recordId = getRecordId(record);
getMessageRetryService().retryFailedMessage(recordId);
}
log.info("[MQ重试] 消息重试任务完成");
} catch (Exception e) {
log.error("[MQ重试] 执行消息重试任务异常", e);
}

View File

@@ -23,14 +23,6 @@ public interface MessageRetryService<T> {
* @param recordId 记录ID
* @return 重试结果
*/
boolean retryFailedMessage(String recordId);
/**
* 更新记录状态
*
* @param record 记录对象
* @param status 记录状态
* @return 更新结果
*/
boolean updateStatus(T record, int status);
void retryFailedMessage(Integer recordId);
}