This commit is contained in:
2025-06-18 17:14:27 +08:00
parent e384dc1163
commit 98bb3529ea
41 changed files with 2776 additions and 335 deletions

View File

@@ -2,32 +2,42 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-framework</artifactId>
<groupId>com.tashow.cloud</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>tashow-framework-mq</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种</description>
<description>消息队列模块基于RabbitMQ等中间件</description>
<url>https://github.com/tashow/tashow-platform</url>
<dependencies>
<!-- DB 相关 -->
<dependency>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-data-redis</artifactId>
</dependency>
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<optional>true</optional>
</dependency>
<!-- Web Services -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

View File

@@ -0,0 +1,115 @@
package com.tashow.cloud.mq.core;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* MQ消息基类所有消息类型都应该继承此类
*
* @author tashow
*/
public class BaseMqMessage implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 消息ID默认为UUID
*/
private Integer id = UUID.randomUUID().hashCode();
/**
* 消息状态码
*/
private Integer statusCode;
/**
* 消息重试次数
*/
private Integer retryCount = 0;
/**
* 消息错误信息
*/
private String errorMessage;
/**
* 消息创建时间
*/
private Date createTime = new Date();
/**
* 扩展数据
*/
private Map<String, Object> extraData = new HashMap<>();
/**
* 增加重试次数
*/
public void incrementRetryCount() {
if (retryCount == null) {
retryCount = 0;
}
retryCount++;
}
/**
* 添加额外数据
*/
public void addExtraData(String key, Object value) {
if (extraData == null) {
extraData = new HashMap<>();
}
extraData.put(key, value);
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getStatusCode() {
return statusCode;
}
public void setStatusCode(Integer statusCode) {
this.statusCode = statusCode;
}
public Integer getRetryCount() {
return retryCount;
}
public void setRetryCount(Integer retryCount) {
this.retryCount = retryCount;
}
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Map<String, Object> getExtraData() {
return extraData;
}
public void setExtraData(Map<String, Object> extraData) {
this.extraData = extraData;
}
}

View File

@@ -0,0 +1,44 @@
package com.tashow.cloud.mq.core;
import org.springframework.amqp.rabbit.connection.CorrelationData;
/**
* 自定义关联数据,用于存储额外的消息数据
*
* @author tashow
*/
public class CustomCorrelationData extends CorrelationData {
/**
* 消息内容
*/
private final String messageContent;
/**
* 构造函数
*
* @param id 关联ID
* @param messageContent 消息内容
*/
public CustomCorrelationData(String id, String messageContent) {
super(id);
this.messageContent = messageContent;
}
/**
* 获取消息内容
*
* @return 消息内容
*/
public String getMessageContent() {
return messageContent;
}
@Override
public String toString() {
return "CustomCorrelationData{" +
"id='" + getId() + '\'' +
", messageContent='" + messageContent + '\'' +
'}';
}
}

View File

@@ -0,0 +1,39 @@
package com.tashow.cloud.mq.handler;
/**
* 消息发送失败记录处理接口
*
* @author tashow
*/
public interface FailRecordHandler {
/**
* 保存消息发送失败记录
*
* @param correlationId 关联ID
* @param exchange 交换机
* @param routingKey 路由键
* @param cause 失败原因
* @param messageContent 消息内容
*/
void saveFailRecord(String correlationId, String exchange, String routingKey, String cause, String messageContent);
/**
* 检查是否达到告警阈值
*
* @return 是否需要告警
*/
default boolean checkAlertThreshold() {
return checkAlertThreshold(null);
}
/**
* 检查是否达到告警阈值,带错误信息
*
* @param cause 错误原因
* @return 是否需要告警
*/
default boolean checkAlertThreshold(String cause) {
return false;
}
}

View File

@@ -1,28 +1,19 @@
package com.tashow.cloud.mq.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
/**
* RabbitMQ 消息队列配置类
* RabbitMQ 消息队列自动配置类
*
* @author 芋道源码
* @author tashow
*/
@AutoConfiguration
@Slf4j
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class RabbitMQAutoConfiguration {
public class RabbitMQAutoConfiguration extends RabbitMQConfiguration {
/**
* Jackson2JsonMessageConverter Bean使用 jackson 序列化消息
*/
@Bean
public MessageConverter createMessageConverter() {
return new Jackson2JsonMessageConverter();
}
private static final Logger log = LoggerFactory.getLogger(RabbitMQAutoConfiguration.class);
}
}

View File

@@ -0,0 +1,49 @@
package com.tashow.cloud.mq.rabbitmq.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置类
*
* @author tashow
*/
@Configuration
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class RabbitMQConfiguration {
private static final Logger log = LoggerFactory.getLogger(RabbitMQConfiguration.class);
/**
* 初始化RabbitTemplate
*
* @param rabbitTemplate RabbitTemplate
*/
protected void initRabbitTemplate(RabbitTemplate rabbitTemplate) {
log.info("[MQ配置] 初始化RabbitTemplate: {}", rabbitTemplate);
// 启用消息发送到交换机确认机制
rabbitTemplate.setMandatory(true);
if (rabbitTemplate.isConfirmListener()) {
log.info("[MQ配置] 确认回调已正确配置");
} else {
log.error("[MQ配置] 确认回调配置失败");
}
}
/**
* 创建消息转换器
*
* @return MessageConverter
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}

View File

@@ -0,0 +1,174 @@
package com.tashow.cloud.mq.rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import com.tashow.cloud.mq.core.BaseMqMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
* RabbitMQ消息消费者抽象类
*
* @param <T> 消息类型
* @author tashow
*/
public abstract class AbstractRabbitMQConsumer<T extends BaseMqMessage> {
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQConsumer.class);
/**
* 消息状态:处理中
*/
public static final int STATUS_PROCESSING = 10;
/**
* 消息状态:成功
*/
public static final int STATUS_SUCCESS = 20;
/**
* 消息状态:失败
*/
public static final int STATUS_ERROR = 30;
/**
* 处理消息
*
* @param message 消息对象
* @return 处理结果true表示处理成功false表示处理失败
*/
public abstract boolean processMessage(T message);
/**
* 获取消息重试次数
*
* @param message 消息对象
* @return 重试次数
*/
public Integer getRetryCount(T message) {
return message.getRetryCount() != null ? message.getRetryCount() : 0;
}
/**
* 更新消息状态
*
* @param message 消息对象
*/
public abstract void updateMessageStatus(T message);
/**
* 更新消息重试次数
*
* @param message 消息对象
*/
public abstract void updateRetryCount(T message);
/**
* 保存消息到数据库
*
* @param message 消息对象
* @return 保存结果
*/
public abstract boolean saveToDatabase(T message);
/**
* 保存消息到失败记录
*
* @param message 消息对象
* @param cause 失败原因
*/
public abstract void saveToFailRecord(T message, String cause);
/**
* 获取最大允许重试次数
*
* @return 最大重试次数
*/
public int getMaxRetryAllowed() {
return 3;
}
/**
* 消息处理入口
*
* @param message 消息对象
* @param channel 通道
* @param deliveryTag 投递标签
*/
public void onMessage(T message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
Integer dbRetryCount = getRetryCount(message);
message.setRetryCount(dbRetryCount);
if (message.getRetryCount() != null && message.getRetryCount() >= getMaxRetryAllowed()) {
message.setStatusCode(STATUS_ERROR);
message.addExtraData("errorMessage", "已达到最大重试次数");
saveToFailRecord(message, "已达到最大重试次数");
safeChannelAck(channel, deliveryTag);
return;
}
log.info("[MQ消费者] 收到消息: {}, 当前重试次数: {}/{}", message, message.getRetryCount(), getMaxRetryAllowed());
message.setStatusCode(STATUS_PROCESSING);
try {
boolean result = processMessage(message);
if (result) {
message.setStatusCode(STATUS_SUCCESS);
updateMessageStatus(message);
log.info("[MQ消费者] 消息处理成功,状态已更新为成功: {}", message.getId());
} else {
throw new RuntimeException("消息处理失败");
}
safeChannelAck(channel, deliveryTag);
} catch (Exception e) {
message.setStatusCode(STATUS_ERROR);
message.addExtraData("errorMessage", e.getMessage());
message.setErrorMessage(e.getMessage());
log.error("[MQ消费者] 消息处理失败: {}, 错误: {}", message.getId(), e.getMessage());
message.incrementRetryCount();
updateRetryCount(message);
if (message.getRetryCount() >= getMaxRetryAllowed()) {
saveToDatabase(message);
log.warn("[MQ消费者] 消息已达到最大重试次数: {}, 确认消息并保存到失败记录表", message.getRetryCount());
saveToFailRecord(message, e.getMessage());
safeChannelAck(channel, deliveryTag);
} else {
log.info("[MQ消费者] 消息将重新入队重试: {}, 当前重试次数: {}", message.getId(), message.getRetryCount());
safeChannelNack(channel, deliveryTag, false, true);
}
}
}
/**
* 安全确认消息
*
* @param channel 通道
* @param deliveryTag 投递标签
*/
protected void safeChannelAck(Channel channel, long deliveryTag) {
try {
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("[MQ消费者] 确认消息失败: {}", e.getMessage());
}
}
/**
* 安全拒绝消息
*
* @param channel 通道
* @param deliveryTag 投递标签
* @param multiple 是否批量
* @param requeue 是否重新入队
*/
protected void safeChannelNack(Channel channel, long deliveryTag, boolean multiple, boolean requeue) {
try {
channel.basicNack(deliveryTag, multiple, requeue);
} catch (Exception e) {
log.error("[MQ消费者] 拒绝消息失败: {}", e.getMessage());
}
}
}

View File

@@ -0,0 +1,145 @@
package com.tashow.cloud.mq.rabbitmq.producer;
import com.tashow.cloud.mq.core.BaseMqMessage;
import com.tashow.cloud.mq.core.CustomCorrelationData;
import com.tashow.cloud.mq.handler.FailRecordHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import jakarta.annotation.PostConstruct;
import java.util.UUID;
/**
* RabbitMQ消息生产者抽象类
*
* @param <T> 消息类型
* @author tashow
*/
public abstract class AbstractRabbitMQProducer<T extends BaseMqMessage>
implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQProducer.class);
@Autowired
protected RabbitTemplate rabbitTemplate;
@Autowired(required = false)
protected FailRecordHandler failRecordHandler;
/**
* 初始化RabbitTemplate
*/
@PostConstruct
public void initRabbitTemplate() {
log.info("[MQ生产者] 初始化RabbitTemplate: {}", rabbitTemplate);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setConfirmCallback(this);
if (rabbitTemplate.isConfirmListener()) {
log.info("[MQ生产者] 确认回调已正确配置");
} else {
log.error("[MQ生产者] 确认回调配置失败");
}
}
/**
* 将消息转换为字符串
*
* @param message 消息对象
* @return 消息字符串
*/
protected abstract String convertMessageToString(T message);
/**
* 异步发送消息自动生成correlationId
*
* @param message 消息对象
*/
public void asyncSendMessage(T message) {
String correlationId = UUID.randomUUID().toString();
asyncSendMessage(message, correlationId);
}
/**
* 异步发送消息使用指定的correlationId
*
* @param message 消息对象
* @param correlationId 关联ID
*/
public void asyncSendMessage(T message, String correlationId) {
log.info("[MQ生产者] 准备发送消息: {}, correlationId: {}", message, correlationId);
try {
String messageJson = convertMessageToString(message);
CustomCorrelationData correlationData = new CustomCorrelationData(correlationId, messageJson);
rabbitTemplate.convertAndSend(getExchange(), getRoutingKey(), message, correlationData);
log.info("[MQ生产者] 消息发送完成: {}, 状态: {}, 重试次数: {}, correlationId: {}",
message.getId(), message.getStatusCode(), message.getRetryCount(), correlationId);
} catch (Exception e) {
log.error("[MQ生产者] 消息发送异常: {}, correlationId: {}", e.getMessage(), correlationId, e);
throw e;
}
}
/**
* 获取交换机名称
*
* @return 交换机名称
*/
public abstract String getExchange();
/**
* 获取路由键
*
* @return 路由键
*/
public abstract String getRoutingKey();
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("[MQ生产者] 消息发送确认成功: {}", correlationData.getId());
} else {
log.error("[MQ生产者] 消息发送确认失败: {}, 原因: {}, correlationData={}",
correlationData.getId(), cause, correlationData);
if (failRecordHandler != null && correlationData instanceof CustomCorrelationData) {
CustomCorrelationData customData = (CustomCorrelationData) correlationData;
String messageContent = customData.getMessageContent();
failRecordHandler.saveFailRecord(
correlationData.getId(),
getExchange(),
getRoutingKey(),
cause,
messageContent
);
} else {
log.warn("[MQ生产者] 未配置FailRecordHandler或非CustomCorrelationData类型, 无法保存失败记录");
}
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("[MQ生产者] 消息路由失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
returned.getExchange(),
returned.getRoutingKey(),
returned.getReplyCode(),
returned.getReplyText(),
new String(returned.getMessage().getBody()));
if (failRecordHandler != null) {
failRecordHandler.saveFailRecord(
returned.getMessage().getMessageProperties().getCorrelationId(),
returned.getExchange(),
returned.getRoutingKey(),
"路由失败: " + returned.getReplyText(),
new String(returned.getMessage().getBody())
);
} else {
log.warn("[MQ生产者] 未配置FailRecordHandler, 无法保存失败记录");
}
}
}

View File

@@ -0,0 +1,91 @@
package com.tashow.cloud.mq.retry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* 消息重试任务抽象实现
*
* @param <T> 失败记录类型
* @author tashow
*/
public abstract class AbstractMessageRetryTask<T> {
private static final Logger log = LoggerFactory.getLogger(AbstractMessageRetryTask.class);
/**
* 处理中状态码
*/
public static final int STATUS_PROCESSING = 3;
/**
* 失败状态码
*/
public static final int STATUS_FAILED = 4;
/**
* 获取消息重试服务
*
* @return 消息重试服务
*/
protected abstract MessageRetryService<T> getMessageRetryService();
/**
* 获取记录ID
*
* @param record 记录对象
* @return 记录ID
*/
protected abstract String getRecordId(T record);
/**
* 获取关联ID
*
* @param record 记录对象
* @return 关联ID
*/
protected abstract String getCorrelationId(T record);
/**
* 执行重试
*/
public void retryFailedMessages() {
try {
List<T> unprocessedRecords = getMessageRetryService().getUnprocessedRecords();
if (unprocessedRecords.isEmpty()) {
log.info("[MQ重试] 没有需要重试的消息");
return;
}
log.info("[MQ重试] 本次需要重试的消息数量: {}", unprocessedRecords.size());
for (T record : unprocessedRecords) {
try {
// 先将状态更新为处理中,避免其他实例重复处理
if (!getMessageRetryService().updateStatus(record, STATUS_PROCESSING)) {
continue; // 如果更新失败,跳过当前记录
}
// 执行重试
String recordId = getRecordId(record);
boolean success = getMessageRetryService().retryFailedMessage(recordId);
if (success) {
log.info("[MQ重试] 消息重试成功: {}", getCorrelationId(record));
} else {
log.warn("[MQ重试] 消息重试失败: {}", getCorrelationId(record));
getMessageRetryService().updateStatus(record, STATUS_FAILED);
}
} catch (Exception e) {
// 发生异常时,更新状态为失败
getMessageRetryService().updateStatus(record, STATUS_FAILED);
log.error("[MQ重试] 重试消息异常: {}", getCorrelationId(record), e);
}
}
log.info("[MQ重试] 消息重试任务完成");
} catch (Exception e) {
log.error("[MQ重试] 执行消息重试任务异常", e);
}
}
}

View File

@@ -0,0 +1,36 @@
package com.tashow.cloud.mq.retry;
import java.util.List;
/**
* 消息重试服务接口
*
* @param <T> 失败记录类型
* @author tashow
*/
public interface MessageRetryService<T> {
/**
* 获取未处理的失败记录
*
* @return 失败记录列表
*/
List<T> getUnprocessedRecords();
/**
* 重试失败消息
*
* @param recordId 记录ID
* @return 重试结果
*/
boolean retryFailedMessage(String recordId);
/**
* 更新记录状态
*
* @param record 记录对象
* @param status 记录状态
* @return 更新结果
*/
boolean updateStatus(T record, int status);
}