提交
This commit is contained in:
@@ -20,6 +20,10 @@
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-data-redis</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.amqp</groupId>
|
||||
|
||||
@@ -28,7 +28,10 @@
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-framework-rpc</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-data-mybatis</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-framework-web</artifactId>
|
||||
@@ -55,6 +58,10 @@
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-framework-security</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
package com.tashow.cloud.app.mq.config;
|
||||
import com.tashow.cloud.app.mq.interceptor.BuriedPointInterceptor;
|
||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
|
||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
|
||||
/**
|
||||
* 埋点功能配置类
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@RequiredArgsConstructor
|
||||
public class BuriedPointConfiguration implements WebMvcConfigurer {
|
||||
|
||||
private final BuriedPointProducer buriedPointProducer;
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
/**
|
||||
* RabbitTemplate初始化配置
|
||||
* 确保回调正确配置,以实现消息可靠性
|
||||
*/
|
||||
// @PostConstruct
|
||||
// public void initRabbitTemplate() {
|
||||
// log.info("[埋点配置] 初始化RabbitTemplate: {}", rabbitTemplate);
|
||||
// rabbitTemplate.setMandatory(true);
|
||||
// rabbitTemplate.setReturnsCallback(returned -> {
|
||||
// log.error("[埋点配置] 消息路由失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
|
||||
// returned.getExchange(),
|
||||
// returned.getRoutingKey(),
|
||||
// returned.getReplyCode(),
|
||||
// returned.getReplyText(),
|
||||
// new String(returned.getMessage().getBody()));
|
||||
// });
|
||||
// rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
|
||||
// if (ack) {
|
||||
// log.debug("[埋点配置] 消息成功发送到交换机: {}", correlationData);
|
||||
// } else {
|
||||
// log.error("[埋点配置] 消息发送到交换机失败: cause={}, correlationData={}", cause, correlationData);
|
||||
// }
|
||||
// });
|
||||
//
|
||||
// // 验证配置
|
||||
// if (rabbitTemplate.isConfirmListener()) {
|
||||
// log.info("[埋点配置] 确认回调已正确配置");
|
||||
// } else {
|
||||
// log.error("[埋点配置] 确认回调配置失败,请检查RabbitMQ配置!");
|
||||
// }
|
||||
// }
|
||||
|
||||
/**
|
||||
* 创建埋点队列
|
||||
*/
|
||||
@Bean
|
||||
public Queue buriedPointQueue() {
|
||||
return new Queue(BuriedMessages.QUEUE, true, false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建埋点交换机
|
||||
*/
|
||||
@Bean
|
||||
public DirectExchange buriedPointExchange() {
|
||||
return new DirectExchange(BuriedMessages.EXCHANGE, true, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建埋点绑定关系
|
||||
*/
|
||||
@Bean
|
||||
public Binding buriedPointBinding() {
|
||||
return BindingBuilder.bind(buriedPointQueue())
|
||||
.to(buriedPointExchange())
|
||||
.with(BuriedMessages.ROUTING_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建埋点拦截器
|
||||
*/
|
||||
@Bean
|
||||
public BuriedPointInterceptor buriedPointInterceptor() {
|
||||
return new BuriedPointInterceptor(buriedPointProducer);
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册埋点拦截器
|
||||
*/
|
||||
@Override
|
||||
public void addInterceptors(InterceptorRegistry registry) {
|
||||
// 注册拦截器,拦截所有请求
|
||||
registry.addInterceptor(buriedPointInterceptor())
|
||||
// 可以根据需要添加或排除特定路径
|
||||
.addPathPatterns("/**")
|
||||
// 排除静态资源、Swagger等路径
|
||||
.excludePathPatterns(
|
||||
"/swagger-ui/**",
|
||||
"/swagger-resources/**",
|
||||
"/v3/api-docs/**",
|
||||
"/webjars/**",
|
||||
"/static/**",
|
||||
"/error"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,118 @@
|
||||
/*
|
||||
package com.tashow.cloud.app.mq.consumer.buriedPoint;
|
||||
import com.tashow.cloud.app.mq.mapper.BuriedPointMapper;
|
||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||
import com.tashow.cloud.app.mq.model.BuriedPoint;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
import com.rabbitmq.client.Channel;
|
||||
*/
|
||||
/**
|
||||
* 埋点消息消费者
|
||||
* 将埋点数据存储到数据库
|
||||
*//*
|
||||
|
||||
@Component
|
||||
@RabbitListener(queues = BuriedMessages.QUEUE)
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class BuriedPointConsumer {
|
||||
|
||||
private final BuriedPointMapper buriedPointMapper;
|
||||
|
||||
@Value("${spring.application.name:tashow-app}")
|
||||
private String applicationName;
|
||||
|
||||
*/
|
||||
/**
|
||||
* 处理埋点消息
|
||||
*//*
|
||||
|
||||
@RabbitHandler
|
||||
public void onMessage(BuriedMessages message,
|
||||
Channel channel,
|
||||
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
|
||||
try {
|
||||
log.info("[埋点消费者] 收到埋点消息: {}", message);
|
||||
|
||||
// 确保事件ID不为空
|
||||
if (message.getId() == null) {
|
||||
message.setId((int)(System.currentTimeMillis() % Integer.MAX_VALUE));
|
||||
log.warn("[埋点消费者] 消息中的事件ID为空,已自动生成: {}", message.getId());
|
||||
}
|
||||
saveToDatabase(message);
|
||||
channel.basicAck(deliveryTag, false);
|
||||
log.info("[埋点消费者] 消息处理成功,已确认");
|
||||
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
channel.basicNack(deliveryTag, false, true);
|
||||
} catch (IOException ex) {
|
||||
log.error("[埋点消费者] 拒绝消息失败", ex);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
*/
|
||||
/**
|
||||
* 将埋点数据保存到数据库
|
||||
*//*
|
||||
|
||||
private void saveToDatabase(BuriedMessages message) {
|
||||
try {
|
||||
log.debug("[埋点消费者] 准备保存埋点数据,事件ID: {}", message.getId());
|
||||
|
||||
// 转换消息为实体
|
||||
BuriedPoint buriedPoint = new BuriedPoint();
|
||||
|
||||
// 设置必填字段,确保不为空
|
||||
buriedPoint.setEventId(message.getId());
|
||||
buriedPoint.setEventTime(message.getEventTime());
|
||||
|
||||
// 获取真实用户ID,避免使用默认anonymous
|
||||
String userId = message.getUserId();
|
||||
buriedPoint.setUserId(StringUtils.hasText(userId) && !"null".equals(userId) ? userId : "anonymous");
|
||||
|
||||
String eventType = message.getEventType();
|
||||
buriedPoint.setEventType(eventType);
|
||||
buriedPoint.setService(applicationName);
|
||||
|
||||
// 设置method字段,确保获取真实方法名
|
||||
buriedPoint.setMethod(message.getMethod());
|
||||
buriedPoint.setSessionId(message.getSessionId());
|
||||
buriedPoint.setClientIp(message.getClientIp());
|
||||
buriedPoint.setServerIp(message.getServerIp());
|
||||
|
||||
// 设置其他字段
|
||||
buriedPoint.setPagePath(message.getPagePath());
|
||||
buriedPoint.setElementId(message.getElementId());
|
||||
buriedPoint.setDuration(message.getDuration());
|
||||
|
||||
buriedPoint.setCreateTime(new Date());
|
||||
|
||||
log.debug("[埋点消费者] 埋点实体数据: eventId={}, eventType={}, userId={}, service={}, method={}",
|
||||
buriedPoint.getEventId(), buriedPoint.getEventType(),
|
||||
buriedPoint.getUserId(), buriedPoint.getService(), buriedPoint.getMethod());
|
||||
|
||||
int result = buriedPointMapper.insert(buriedPoint);
|
||||
|
||||
log.info("[埋点消费者] 埋点数据已保存到数据库, 事件ID: {}, 影响行数: {}", message.getId(), result);
|
||||
} catch (Exception e) {
|
||||
log.error("[埋点消费者] 保存埋点数据到数据库失败, 事件ID: {}, 错误: {}",
|
||||
message.getId(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
} */
|
||||
@@ -0,0 +1,103 @@
|
||||
package com.tashow.cloud.app.mq.interceptor;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
|
||||
import com.tashow.cloud.common.util.json.JsonUtils;
|
||||
import com.tashow.cloud.common.util.servlet.ServletUtils;
|
||||
import com.tashow.cloud.common.util.spring.SpringUtils;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.StopWatch;
|
||||
import org.springframework.web.method.HandlerMethod;
|
||||
import org.springframework.web.servlet.HandlerInterceptor;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
/**
|
||||
* 后端静默埋点拦截器
|
||||
* 用于收集API请求信息并异步发送到消息队列
|
||||
*/
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class BuriedPointInterceptor implements HandlerInterceptor {
|
||||
|
||||
private final BuriedPointProducer buriedPointProducer;
|
||||
|
||||
private static final String ATTRIBUTE_STOPWATCH = "BuriedPoint.StopWatch";
|
||||
private static final String ATTRIBUTE_REQUEST_ID = "BuriedPoint.RequestId";
|
||||
|
||||
@Override
|
||||
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
|
||||
if (!(handler instanceof HandlerMethod)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
try {
|
||||
// 开始计时
|
||||
StopWatch stopWatch = new StopWatch();
|
||||
stopWatch.start();
|
||||
request.setAttribute(ATTRIBUTE_STOPWATCH, stopWatch);
|
||||
|
||||
// 生成请求ID
|
||||
int requestId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE);
|
||||
request.setAttribute(ATTRIBUTE_REQUEST_ID, requestId);
|
||||
|
||||
// 收集埋点数据
|
||||
HandlerMethod handlerMethod = (HandlerMethod) handler;
|
||||
String method = request.getMethod() + " " + request.getRequestURI()+ JsonUtils.toJsonString(request.getParameterMap());
|
||||
String controllerName = handlerMethod.getBeanType().getSimpleName();
|
||||
String actionName = handlerMethod.getMethod().getName();
|
||||
|
||||
// 创建埋点消息
|
||||
BuriedMessages message = new BuriedMessages();
|
||||
message.setId(requestId);
|
||||
message.setEventTime(System.currentTimeMillis());
|
||||
message.setService(SpringUtils.getApplicationName());
|
||||
message.setMethod(method);
|
||||
message.setUserId(getUserId(request));
|
||||
message.setSessionId(request.getSession().getId());
|
||||
message.setClientIp(ServletUtils.getClientIP(request));
|
||||
message.setServerIp(getServerIp());
|
||||
message.setEventType("API_REQUEST_START");
|
||||
message.setPagePath(controllerName + "#" + actionName);
|
||||
message.setUserAgent(request.getHeader("User-Agent"));
|
||||
message.setStatusCode(BuriedMessages.STATUS_INIT);
|
||||
buriedPointProducer.asyncSendMessage(message);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("[埋点] 收集请求开始数据: {}", message);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[埋点] 埋点数据收集异常", e);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前登录用户ID
|
||||
* 如果未登录返回匿名标识
|
||||
*/
|
||||
private String getUserId(HttpServletRequest request) {
|
||||
Object userAttribute = request.getSession().getAttribute("USER_ID");
|
||||
if (userAttribute != null) {
|
||||
return userAttribute.toString();
|
||||
}
|
||||
// 返回匿名用户标识
|
||||
return "anonymous";
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取服务器IP
|
||||
*/
|
||||
private String getServerIp() {
|
||||
try {
|
||||
return InetAddress.getLocalHost().getHostAddress();
|
||||
} catch (UnknownHostException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.tashow.cloud.app.mq.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.tashow.cloud.app.mq.model.BuriedPoint;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
|
||||
@Mapper
|
||||
public interface BuriedPointMapper extends BaseMapper<BuriedPoint> {
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
package com.tashow.cloud.app.mq.message;
|
||||
import lombok.Data;
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Data
|
||||
public class BuriedMessages implements Serializable {
|
||||
|
||||
// 消息队列配置
|
||||
public static final String QUEUE = "BURIED_POINT_QUEUE";
|
||||
public static final String EXCHANGE = "BURIED_POINT_EXCHANGE";
|
||||
public static final String ROUTING_KEY = "BURIED_POINT_ROUTING_KEY";
|
||||
public static final String DEAD_LETTER_EXCHANGE = "DEAD_LETTER_EXCHANGE";
|
||||
public static final String DEAD_LETTER_ROUTING_KEY = "DEAD_LETTER_ROUTING";
|
||||
public static final String DEAD_LETTER_QUEUE = "DEAD_LETTER_QUEUE";
|
||||
|
||||
// 状态码定义
|
||||
public static final Integer STATUS_INIT = 10; // 初始状态
|
||||
public static final Integer STATUS_PROCESSING = 20; // 处理中
|
||||
public static final Integer STATUS_SUCCESS = 30; // 处理成功
|
||||
public static final Integer STATUS_WARNING = 40; // 处理警告
|
||||
public static final Integer STATUS_ERROR = 50; // 处理错误
|
||||
|
||||
// 通用字段
|
||||
private Integer id; // 事件唯一ID
|
||||
private Long eventTime; // 事件时间戳
|
||||
private String service; // 服务名称
|
||||
private String method; // 方法/接口
|
||||
private String userId; // 用户标识
|
||||
private String sessionId; // 会话标识
|
||||
private String clientIp; // 客户端IP
|
||||
private String serverIp; // 服务器IP
|
||||
|
||||
// 添加埋点特定字段
|
||||
private String eventType; // 事件类型: PAGE_VIEW, API_CALL, BUTTON_CLICK 等
|
||||
private String pagePath; // 页面路径/功能模块
|
||||
private String elementId; // 元素标识
|
||||
private Long duration; // 操作时长(毫秒)
|
||||
private String deviceInfo; // 设备信息
|
||||
private String userAgent; // 用户代理信息
|
||||
private Integer statusCode; // 响应状态码
|
||||
private String errorMessage; // 错误信息
|
||||
|
||||
// 扩展字段,用于存储特定事件的额外数据
|
||||
private Map<String, Object> extraData = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 快速创建消息的便捷方法
|
||||
*/
|
||||
public static BuriedMessages create(String userId, String eventType, String pagePath) {
|
||||
BuriedMessages msg = new BuriedMessages();
|
||||
msg.setUserId(userId);
|
||||
msg.setEventType(eventType);
|
||||
msg.setPagePath(pagePath);
|
||||
msg.setEventTime(System.currentTimeMillis());
|
||||
msg.setStatusCode(STATUS_INIT); // 默认初始状态
|
||||
return msg;
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加扩展数据
|
||||
*/
|
||||
public BuriedMessages addExtraData(String key, Object value) {
|
||||
if (this.extraData == null) {
|
||||
this.extraData = new HashMap<>();
|
||||
}
|
||||
this.extraData.put(key, value);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
package com.tashow.cloud.app.mq.model;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.Data;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 埋点数据实体类
|
||||
*/
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
@TableName(value = "app_burying")
|
||||
public class BuriedPoint {
|
||||
/**
|
||||
* 主键ID
|
||||
*/
|
||||
@TableId(value = "id", type = IdType.AUTO)
|
||||
private Integer id;
|
||||
|
||||
/**
|
||||
* 事件唯一ID
|
||||
*/
|
||||
@TableField(value = "event_id")
|
||||
private Integer eventId;
|
||||
|
||||
/**
|
||||
* 事件时间戳
|
||||
*/
|
||||
@TableField(value = "event_time")
|
||||
private Long eventTime;
|
||||
|
||||
/**
|
||||
* 服务名称
|
||||
*/
|
||||
@TableField(value = "service")
|
||||
private String service;
|
||||
|
||||
/**
|
||||
* 方法/接口
|
||||
*/
|
||||
@TableField(value = "method")
|
||||
private String method;
|
||||
|
||||
/**
|
||||
* 用户标识
|
||||
*/
|
||||
@TableField(value = "user_id")
|
||||
private String userId;
|
||||
|
||||
/**
|
||||
* 会话标识
|
||||
*/
|
||||
@TableField(value = "session_id")
|
||||
private String sessionId;
|
||||
|
||||
/**
|
||||
* 客户端IP
|
||||
*/
|
||||
@TableField(value = "client_ip")
|
||||
private String clientIp;
|
||||
|
||||
/**
|
||||
* 服务器IP
|
||||
*/
|
||||
@TableField(value = "server_ip")
|
||||
private String serverIp;
|
||||
|
||||
/**
|
||||
* 事件类型
|
||||
*/
|
||||
@TableField(value = "event_type")
|
||||
private String eventType;
|
||||
|
||||
/**
|
||||
* 页面路径/功能模块
|
||||
*/
|
||||
@TableField(value = "page_path")
|
||||
private String pagePath;
|
||||
|
||||
/**
|
||||
* 元素标识
|
||||
*/
|
||||
@TableField(value = "element_id")
|
||||
private String elementId;
|
||||
|
||||
/**
|
||||
* 操作时长(毫秒)
|
||||
*/
|
||||
@TableField(value = "duration")
|
||||
private Long duration;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
@TableField(value = "create_time")
|
||||
private Date createTime;
|
||||
|
||||
|
||||
@TableField(value = "update_time")
|
||||
private Date updateTime;
|
||||
|
||||
@TableField(value = "status")
|
||||
private Integer status;
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package com.tashow.cloud.app.mq.producer.buriedPoint;
|
||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* 埋点消息生产者
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class BuriedPointProducer {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
/**
|
||||
* 异步发送完整的埋点消息,并确保消息已被broker接收
|
||||
*/
|
||||
@SneakyThrows
|
||||
public void asyncSendMessage(BuriedMessages message) {
|
||||
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
|
||||
// final CompletableFuture<Boolean> confirmFuture = new CompletableFuture<>();
|
||||
log.info("[埋点] 异步准备发送消息: {}", message);
|
||||
correlationData.getFuture().whenComplete((confirm, ex) -> {
|
||||
log.info("[埋点] 异步消息发送确认回调: {}", message);
|
||||
if (ex != null) {
|
||||
log.error("[埋点] 异步消息发送异常: {}", ex.getMessage(), ex);
|
||||
// confirmFuture.completeExceptionally(ex);
|
||||
} else if (confirm != null && confirm.isAck()) {
|
||||
log.info("[埋点] 异步消息发送成功: {}", message);
|
||||
// confirmFuture.complete(true);
|
||||
} else {
|
||||
log.warn("[埋点] 异步消息发送未被ACK");
|
||||
//confirmFuture.complete(false);
|
||||
}
|
||||
});
|
||||
rabbitTemplate.convertAndSend(BuriedMessages.EXCHANGE, BuriedMessages.ROUTING_KEY, message, correlationData);
|
||||
log.info("[埋点] 异步消息发送完成: {}", message);
|
||||
// return null;
|
||||
}
|
||||
}
|
||||
@@ -23,7 +23,10 @@
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-framework-env</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-framework-mq</artifactId>
|
||||
</dependency>
|
||||
<!-- 依赖服务 -->
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
@@ -140,6 +143,18 @@
|
||||
<groupId>org.dromara.hutool</groupId>
|
||||
<artifactId>hutool-extra</artifactId> <!-- 邮件 -->
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-test</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-test</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
@@ -2,12 +2,14 @@ package com.tashow.cloud.system;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
|
||||
/**
|
||||
* 项目的启动类
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@SpringBootApplication
|
||||
@EnableAsync // 开启异步
|
||||
public class SystemServerApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
Reference in New Issue
Block a user