初始化
This commit is contained in:
60
tashow-framework/tashow-framework-mq/pom.xml
Normal file
60
tashow-framework/tashow-framework-mq/pom.xml
Normal file
@@ -0,0 +1,60 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>tashow-framework</artifactId>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>tashow-framework-mq</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>${project.artifactId}</name>
|
||||
<description>消息队列模块,基于RabbitMQ等中间件</description>
|
||||
<url>https://github.com/tashow/tashow-platform</url>
|
||||
|
||||
<dependencies>
|
||||
<!-- RabbitMQ -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<!-- Web Services -->
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-web</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-webmvc</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>jakarta.servlet</groupId>
|
||||
<artifactId>jakarta.servlet-api</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-data-mybatis</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jodd</groupId>
|
||||
<artifactId>jodd-util</artifactId>
|
||||
<version>6.3.0</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.tashow.cloud.mq.core;
|
||||
import lombok.Data;
|
||||
import java.io.Serializable;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
/**
|
||||
* MQ消息基类,
|
||||
*
|
||||
*
|
||||
* @author tashow
|
||||
*/
|
||||
@Data
|
||||
public class BaseMqMessage implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
/**
|
||||
* 消息ID,默认为UUID
|
||||
*/
|
||||
private Integer id = UUID.randomUUID().hashCode();
|
||||
/**
|
||||
* 消息状态码
|
||||
*/
|
||||
private Integer statusCode;
|
||||
|
||||
/**
|
||||
* 消息重试次数
|
||||
*/
|
||||
private Integer retryCount = 0;
|
||||
|
||||
/**
|
||||
* 消息错误信息
|
||||
*/
|
||||
private String errorMessage;
|
||||
|
||||
/**
|
||||
* 消息创建时间
|
||||
*/
|
||||
private Date createTime = new Date();
|
||||
|
||||
/**
|
||||
* 扩展数据
|
||||
*/
|
||||
private Map<String, Object> extraData = new HashMap<>();
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.tashow.cloud.mq.handler;
|
||||
/**
|
||||
* 消息记录处理接口
|
||||
*
|
||||
* @author tashow
|
||||
*/
|
||||
public interface FailRecordHandler {
|
||||
/**
|
||||
* 保存消息记录
|
||||
*
|
||||
* @param exchange 交换机
|
||||
* @param routingKey 路由键
|
||||
* @param cause 失败原因,可为null
|
||||
* @param messageContent 消息内容
|
||||
* @param status 状态:0-未处理,1-处理成功,2-处理失败
|
||||
*/
|
||||
void saveMessageRecord(Integer id, String exchange, String routingKey, String cause, String messageContent, int status);
|
||||
/**
|
||||
* 更新消息状态
|
||||
*
|
||||
* @param id 关联ID
|
||||
*/
|
||||
void updateMessageStatus(Integer id);
|
||||
|
||||
/**
|
||||
* 更新消息状态并设置失败原因
|
||||
|
||||
*/
|
||||
void updateMessageStatusWithCause(Integer id, String causes);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.tashow.cloud.mq.rabbitmq.config;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
|
||||
/**
|
||||
* RabbitMQ 消息队列自动配置类
|
||||
*
|
||||
* @author tashow
|
||||
*/
|
||||
@AutoConfiguration
|
||||
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
|
||||
public class RabbitMQAutoConfiguration extends RabbitMQConfiguration {
|
||||
private static final Logger log = LoggerFactory.getLogger(RabbitMQAutoConfiguration.class);
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.tashow.cloud.mq.rabbitmq.config;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* RabbitMQ 配置类
|
||||
*
|
||||
* @author tashow
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
|
||||
public class RabbitMQConfiguration {
|
||||
/**
|
||||
* 创建消息转换器
|
||||
*
|
||||
* @return MessageConverter
|
||||
*/
|
||||
@Bean
|
||||
public MessageConverter messageConverter() {
|
||||
return new Jackson2JsonMessageConverter();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
package com.tashow.cloud.mq.rabbitmq.consumer;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.tashow.cloud.mq.core.BaseMqMessage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
|
||||
/**
|
||||
* RabbitMQ消息消费者抽象类
|
||||
*
|
||||
* @param <T> 消息类型
|
||||
* @author tashow
|
||||
*/
|
||||
public abstract class AbstractRabbitMQConsumer<T extends BaseMqMessage> {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQConsumer.class);
|
||||
/**
|
||||
* 消息状态:成功
|
||||
*/
|
||||
public static final int STATUS_SUCCESS = 20;
|
||||
/**
|
||||
* 消息状态:消费异常
|
||||
*/
|
||||
public static final int STATUS_SEND_EXCEPTION = 30;
|
||||
|
||||
|
||||
/**
|
||||
* 埋点处理消息
|
||||
*
|
||||
* @param message 消息对象
|
||||
* @return 处理结果,true表示处理成功,false表示处理失败
|
||||
*/
|
||||
public abstract boolean processMessage(T message);
|
||||
|
||||
|
||||
/**
|
||||
* 消息处理入口
|
||||
*
|
||||
* @param message 消息对象
|
||||
* @param channel 通道
|
||||
* @param deliveryTag 投递标签
|
||||
*/
|
||||
public void onMessage(T message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
|
||||
message.setStatusCode(STATUS_SUCCESS);
|
||||
try {
|
||||
if(true){
|
||||
throw new RuntimeException("测试异常");
|
||||
}
|
||||
processMessage(message);
|
||||
safeChannelAck(channel, deliveryTag);
|
||||
} catch (Exception e) {
|
||||
message.setStatusCode(STATUS_SEND_EXCEPTION);
|
||||
processMessage( message);
|
||||
safeChannelAck(channel, deliveryTag);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 安全确认消息
|
||||
*
|
||||
* @param channel 通道
|
||||
* @param deliveryTag 投递标签
|
||||
*/
|
||||
protected void safeChannelAck(Channel channel, long deliveryTag) {
|
||||
try {
|
||||
channel.basicAck(deliveryTag, false);
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ消费者] 确认消息失败: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 安全拒绝消息
|
||||
*
|
||||
* @param channel 通道
|
||||
* @param deliveryTag 投递标签
|
||||
* @param multiple 是否批量
|
||||
* @param requeue 是否重新入队
|
||||
*/
|
||||
protected void safeChannelNack(Channel channel, long deliveryTag, boolean multiple, boolean requeue) {
|
||||
try {
|
||||
channel.basicNack(deliveryTag, multiple, requeue);
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ消费者] 拒绝消息失败: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,97 @@
|
||||
package com.tashow.cloud.mq.rabbitmq.producer;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.tashow.cloud.common.util.json.JsonUtils;
|
||||
import com.tashow.cloud.mq.core.BaseMqMessage;
|
||||
import com.tashow.cloud.mq.handler.FailRecordHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* RabbitMQ消息生产者抽象类
|
||||
*
|
||||
* @param <T> 消息类型
|
||||
* @author tashow
|
||||
*/
|
||||
public abstract class AbstractRabbitMQProducer<T extends BaseMqMessage>
|
||||
implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQProducer.class);
|
||||
|
||||
@Autowired
|
||||
protected RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Autowired(required = false)
|
||||
protected FailRecordHandler failRecordHandler;
|
||||
|
||||
/**
|
||||
* 初始化RabbitTemplate
|
||||
*/
|
||||
@PostConstruct
|
||||
public void initRabbitTemplate() {
|
||||
rabbitTemplate.setMandatory(true);
|
||||
rabbitTemplate.setReturnsCallback(this);
|
||||
rabbitTemplate.setConfirmCallback(this);
|
||||
if (rabbitTemplate.isConfirmListener()) {
|
||||
log.info("[MQ生产者] 确认回调已正确配置");
|
||||
} else {
|
||||
log.error("[MQ生产者] 确认回调配置失败");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 异步发送消息,使用指定的correlationId
|
||||
*
|
||||
* @param message 消息对象
|
||||
*/
|
||||
public void asyncSendMessage(T message) {
|
||||
try {
|
||||
String messageJson = JsonUtils.toJsonString(message);
|
||||
CorrelationData correlationData = new CorrelationData(messageJson);
|
||||
failRecordHandler.saveMessageRecord(
|
||||
message.getId(),
|
||||
getExchange(),
|
||||
getRoutingKey(),
|
||||
null,
|
||||
messageJson,
|
||||
0
|
||||
);
|
||||
rabbitTemplate.convertAndSend(getExchange(), getRoutingKey(), message, correlationData);
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 获取交换机名称
|
||||
*
|
||||
* @return 交换机名称
|
||||
*/
|
||||
public abstract String getExchange();
|
||||
/**
|
||||
* 获取路由键
|
||||
*
|
||||
* @return 路由键
|
||||
*/
|
||||
public abstract String getRoutingKey();
|
||||
|
||||
@Override
|
||||
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
|
||||
JSONObject jsonObject = JSON.parseObject(correlationData.getId());
|
||||
Integer id = jsonObject.getInteger("id");
|
||||
if (ack) {
|
||||
failRecordHandler.updateMessageStatus(id);
|
||||
} else {
|
||||
failRecordHandler.updateMessageStatusWithCause(id,cause);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package com.tashow.cloud.mq.retry;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 消息重试任务抽象实现
|
||||
*
|
||||
* @param <T> 失败记录类型
|
||||
* @author tashow
|
||||
*/
|
||||
public abstract class AbstractMessageRetryTask<T> {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AbstractMessageRetryTask.class);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 获取消息重试服务
|
||||
*
|
||||
* @return 消息重试服务
|
||||
*/
|
||||
protected abstract MessageRetryService<T> getMessageRetryService();
|
||||
|
||||
/**
|
||||
* 获取记录ID
|
||||
*
|
||||
* @param record 记录对象
|
||||
* @return 记录ID
|
||||
*/
|
||||
protected abstract Integer getRecordId(T record);
|
||||
|
||||
/**
|
||||
* 执行重试
|
||||
*/
|
||||
public void retryFailedMessages() {
|
||||
try {
|
||||
List<T> unprocessedRecords = getMessageRetryService().getUnprocessedRecords();
|
||||
for (T record : unprocessedRecords) {
|
||||
Integer recordId = getRecordId(record);
|
||||
getMessageRetryService().retryFailedMessage(recordId);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ重试] 执行消息重试任务异常", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.tashow.cloud.mq.retry;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 消息重试服务接口
|
||||
*
|
||||
* @param <T> 失败记录类型
|
||||
* @author tashow
|
||||
*/
|
||||
public interface MessageRetryService<T> {
|
||||
|
||||
/**
|
||||
* 获取未处理的失败记录
|
||||
*
|
||||
* @return 失败记录列表
|
||||
*/
|
||||
List<T> getUnprocessedRecords();
|
||||
|
||||
/**
|
||||
* 重试失败消息
|
||||
*
|
||||
* @param recordId 记录ID
|
||||
* @return 重试结果
|
||||
*/
|
||||
void retryFailedMessage(Integer recordId);
|
||||
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
com.tashow.cloud.mq.rabbitmq.config.RabbitMQAutoConfiguration
|
||||
Reference in New Issue
Block a user