This commit is contained in:
2025-05-26 17:57:59 +08:00
parent 71519a730b
commit d010e55b76
7 changed files with 564 additions and 4 deletions

View File

@@ -0,0 +1,33 @@
package com.tashow.cloud.app.mq.annotation;
import java.lang.annotation.*;
/**
* 埋点注解
* 用于标记需要特殊埋点的方法或接口
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface BuriedPoint {
/**
* 埋点事件类型
*/
String eventType() default "";
/**
* 埋点描述信息
*/
String description() default "";
/**
* 是否记录方法入参
*/
boolean recordParams() default false;
/**
* 是否记录返回结果
*/
boolean recordResult() default false;
}

View File

@@ -0,0 +1,28 @@
package com.tashow.cloud.app.mq.annotation;
import java.lang.annotation.*;
/**
* 埋点注解
* 用于标记需要埋点的方法
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface BuryPoint {
/**
* 事件类型
*/
String eventType() default "METHOD";
/**
* 页面路径/功能模块
*/
String pagePath() default "";
/**
* 描述信息
*/
String description() default "";
}

View File

@@ -0,0 +1,223 @@
package com.tashow.cloud.app.mq.aspect;
import cn.hutool.core.util.IdUtil;
import cn.hutool.json.JSONUtil;
import com.tashow.cloud.app.mq.annotation.BuriedPoint;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
import com.tashow.cloud.common.util.servlet.ServletUtils;
import com.tashow.cloud.common.util.spring.SpringUtils;
import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* 埋点切面
* 处理自定义埋点注解
*/
@Aspect
@Component
@Order(10)
@Slf4j
@RequiredArgsConstructor
public class BuriedPointAspect {
/*
private final BuriedPointProducer buriedPointProducer;
@Pointcut("@annotation(com.tashow.cloud.app.mq.annotation.BuriedPoint)")
public void buriedPointCut() {
}
@Around("buriedPointCut()")
public Object around(ProceedingJoinPoint point) throws Throwable {
// 获取方法签名
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
// 获取注解
BuriedPoint buriedPoint = method.getAnnotation(BuriedPoint.class);
if (buriedPoint == null) {
return point.proceed();
}
// 生成埋点ID
int buriedId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE);
// 记录开始埋点
recordMethodStart(point, method, buriedPoint, buriedId);
long startTime = System.currentTimeMillis();
Object result = null;
Exception exception = null;
try {
// 执行原方法
result = point.proceed();
return result;
} catch (Exception e) {
exception = e;
throw e;
} finally {
long duration = System.currentTimeMillis() - startTime;
// 记录结束埋点
recordMethodEnd(point, method, buriedPoint, result, exception, duration, buriedId);
}
}
*//**
* 记录方法开始埋点
*//*
private void recordMethodStart(ProceedingJoinPoint point, Method method, BuriedPoint buriedPoint, int buriedId) {
try {
// 获取类和方法名
String className = point.getTarget().getClass().getSimpleName();
String methodName = method.getName();
// 创建埋点消息
BuriedMessages message = new BuriedMessages();
message.setId(buriedId);
message.setEventTime(System.currentTimeMillis());
message.setService(SpringUtils.getApplicationName());
message.setMethod(className + "." + methodName);
message.setEventType(buriedPoint.eventType().isEmpty() ? "METHOD_START" : buriedPoint.eventType() + "_START");
message.setPagePath(className + "#" + methodName);
// 设置用户ID和其他信息
setRequestInfo(message);
// 如果需要记录入参
if (buriedPoint.recordParams() && point.getArgs() != null && point.getArgs().length > 0) {
// 限制参数长度,避免埋点数据过大
String params = JSONUtil.toJsonStr(point.getArgs());
if (params.length() > 500) {
params = params.substring(0, 500) + "...";
}
message.addExtraData("params", params);
}
// 记录描述信息
if (!buriedPoint.description().isEmpty()) {
message.addExtraData("description", buriedPoint.description());
}
// 异步发送埋点数据
buriedPointProducer.asyncSendMessage(message);
if (log.isDebugEnabled()) {
log.debug("[埋点] 方法开始: {}.{}, 事件类型: {}", className, methodName, message.getEventType());
}
} catch (Exception e) {
log.warn("[埋点] 记录方法开始埋点异常", e);
}
}
*//**
* 记录方法结束埋点
*//*
private void recordMethodEnd(ProceedingJoinPoint point, Method method, BuriedPoint buriedPoint,
Object result, Exception exception, long duration, int buriedId) {
try {
// 获取类和方法名
String className = point.getTarget().getClass().getSimpleName();
String methodName = method.getName();
// 创建埋点消息
BuriedMessages message = new BuriedMessages();
message.setId(buriedId);
message.setEventTime(System.currentTimeMillis());
message.setService(SpringUtils.getApplicationName());
message.setMethod(className + "." + methodName);
message.setEventType(buriedPoint.eventType().isEmpty() ? "METHOD_END" : buriedPoint.eventType() + "_END");
message.setPagePath(className + "#" + methodName);
message.setDuration(duration);
// 设置用户ID和其他信息
setRequestInfo(message);
// 如果需要记录结果
if (buriedPoint.recordResult() && result != null && exception == null) {
// 限制结果长度,避免埋点数据过大
String resultStr = JSONUtil.toJsonStr(result);
if (resultStr.length() > 500) {
resultStr = resultStr.substring(0, 500) + "...";
}
message.addExtraData("result", resultStr);
}
// 如果有异常,记录异常信息
if (exception != null) {
message.setErrorMessage(exception.getMessage());
message.addExtraData("exceptionClass", exception.getClass().getName());
}
// 异步发送埋点数据
buriedPointProducer.asyncSendMessage(message);
if (log.isDebugEnabled()) {
log.debug("[埋点] 方法结束: {}.{}, 事件类型: {}, 耗时: {}ms",
className, methodName, message.getEventType(), duration);
}
} catch (Exception e) {
log.warn("[埋点] 记录方法结束埋点异常", e);
}
}
*//**
* 设置请求相关信息
*//*
private void setRequestInfo(BuriedMessages message) {
try {
// 获取当前请求信息
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes != null) {
HttpServletRequest request = attributes.getRequest();
// 设置用户ID和其他请求信息
message.setUserId(getUserId(request));
message.setSessionId(request.getSession().getId());
message.setClientIp(ServletUtils.getClientIP(request));
message.setUserAgent(request.getHeader("User-Agent"));
}
// 设置服务器IP
message.setServerIp(getServerIp());
} catch (Exception e) {
log.warn("[埋点] 设置请求信息异常", e);
}
}
*//**
* 获取当前登录用户ID
*//*
private String getUserId(HttpServletRequest request) {
// 这里需要根据项目实际的用户体系来获取用户ID
Object userAttribute = request.getSession().getAttribute("USER_ID");
if (userAttribute != null) {
return userAttribute.toString();
}
return "anonymous";
}
*//**
* 获取服务器IP
*//*
private String getServerIp() {
try {
return InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
return "unknown";
}
}*/
}

View File

@@ -0,0 +1,56 @@
package com.tashow.cloud.app.mq.docs;
import com.tashow.cloud.app.mq.annotation.BuriedPoint;
import org.springframework.stereotype.Service;
/**
* 埋点使用示例
*/
@Service
public class BuriedPointExample {
/**
* 使用埋点注解标记的方法
* 系统会自动记录该方法的调用情况
*/
@BuriedPoint(
eventType = "BUSINESS_OPERATION",
description = "处理业务数据",
recordParams = true,
recordResult = true
)
public String processBusiness(String businessId, String operationType) {
// 业务处理逻辑
return "处理完成: " + businessId;
}
/**
* 如何使用埋点系统:
*
* 1. 静默埋点 - 无需任何代码
* 系统会自动对所有API请求进行埋点记录请求的开始和结束信息
* 包括: 请求时间、路径、参数、响应时间、状态码等
*
* 2. 注解埋点 - 针对特定方法
* 在需要埋点的方法上添加 @BuriedPoint 注解
* 可以设置事件类型、描述,以及是否记录参数和返回结果
*
* 3. 埋点数据流转
* 所有埋点数据会通过RabbitMQ发送到消息队列
* 可以在消费者端实现数据的存储和分析
*
* 4. 数据字段说明
* id: 埋点事件唯一ID
* eventTime: 事件发生时间
* service: 服务名称
* method: 方法/接口
* userId: 用户标识
* clientIp: 客户端IP
* eventType: 事件类型
* duration: 操作耗时
* extraData: 额外数据(参数、结果等)
*/
public void howToUse() {
// 示例方法
}
}

View File

@@ -0,0 +1,220 @@
# 后端静默埋点实现方案基于RabbitMQ
后端静默埋点是指不依赖前端代码完全由后端服务收集用户行为和应用性能数据的方案。以下是使用RabbitMQ实现后端静默埋点的详细方案
## 一、方案架构
[业务服务] → [RabbitMQ] → [埋点消费服务] → [数据分析存储]
## 二、具体实现步骤
### 1. 埋点数据收集
在业务服务中需要埋点的位置添加消息发送代码:
```java
// Java示例使用Spring AMQP
@Autowired
private RabbitTemplate rabbitTemplate;
public void someBusinessMethod(User user, Action action) {
try {
// 构造埋点数据
Map<String, Object> trackingData = new HashMap<>();
trackingData.put("event_time", System.currentTimeMillis());
trackingData.put("user_id", user.getId());
trackingData.put("action", action.name());
trackingData.put("service", "order-service");
trackingData.put("ip", getClientIp());
// 发送到RabbitMQ
rabbitTemplate.convertAndSend(
"tracking.exchange",
"tracking.route",
trackingData
);
} catch (Exception e) {
// 埋点异常不应影响主流程
log.error("Tracking data send failed", e);
}
}
```
### 2. RabbitMQ配置
#### 创建专用Exchange和Queue
```yaml
# application.yml配置示例
spring:
rabbitmq:
host: rabbitmq-server
port: 5672
username: guest
password: guest
template:
exchange: tracking.exchange
listener:
simple:
acknowledge-mode: auto
```
#### 声明Exchange和Queue
```java
@Configuration
public class RabbitMQConfig {
@Bean
public Exchange trackingExchange() {
return ExchangeBuilder.directExchange("tracking.exchange")
.durable(true)
.build();
}
@Bean
public Queue trackingQueue() {
return QueueBuilder.durable("tracking.queue")
.withArgument("x-message-ttl", 86400000) // 消息存活时间
.build();
}
@Bean
public Binding trackingBinding() {
return BindingBuilder.bind(trackingQueue())
.to(trackingExchange())
.with("tracking.route")
.noargs();
}
}
```
### 3. 埋点数据处理服务
创建专门的消费者服务处理埋点数据:
```java
@Component
public class TrackingDataConsumer {
private static final Logger log = LoggerFactory.getLogger(TrackingDataConsumer.class);
@RabbitListener(queues = "tracking.queue")
public void processTrackingData(Map<String, Object> trackingData) {
try {
// 1. 数据校验和补全
validateAndEnrichData(trackingData);
// 2. 数据格式化
TrackingEvent event = convertToEvent(trackingData);
// 3. 数据存储或转发
saveToDatabase(event);
// 或 sendToAnalyticsPlatform(event);
} catch (Exception e) {
log.error("Process tracking data failed: {}", trackingData, e);
// 可根据需要加入重试或死信队列逻辑
}
}
private void validateAndEnrichData(Map<String, Object> data) {
if (!data.containsKey("user_id")) {
data.put("user_id", "anonymous");
}
// 补充服务IP等信息
data.put("server_ip", getServerIp());
}
// 其他辅助方法...
}
```
### 4. 高级配置
#### 消息可靠性保证
```java
// 发送端确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.warn("Tracking message lost, cause: {}", cause);
// 可加入本地缓存或日志,后续重发
}
});
// 返回回调(路由失败)
rabbitTemplate.setReturnsCallback(returned -> {
log.warn("Tracking message returned: {}", returned.getMessage());
});
```
#### 批量发送优化
```java
// 使用BatchingRabbitTemplate提高性能
@Bean
public BatchingRabbitTemplate batchingRabbitTemplate(
RabbitTemplate rabbitTemplate,
SimpleBatchingStrategy strategy) {
return new BatchingRabbitTemplate(strategy);
}
@Bean
public SimpleBatchingStrategy batchingStrategy() {
return new SimpleBatchingStrategy(100, 1024 * 1024, 5000);
}
```
## 三、埋点数据设计建议
### 1. 通用字段
event_id: 事件唯一ID
event_time: 事件时间戳
service: 服务名称
method: 方法/接口
user_id: 用户标识
session_id: 会话标识
client_ip: 客户端IP
server_ip: 服务器IP
### 2. 业务字段
action: 操作类型
target: 操作目标
params: 业务参数
result: 操作结果
cost_time: 耗时(ms)
## 四、性能优化建议
异步发送:确保埋点发送不阻塞主业务流程
本地缓存:网络异常时先缓存到本地,后续重发
采样率:高流量场景可配置采样率
消息压缩:大数据量时可启用消息压缩
独立集群埋点使用独立的RabbitMQ集群避免影响业务消息
## 五、监控与维护
监控RabbitMQ队列积压情况
设置埋点数据处理的延迟告警
定期审计埋点数据的完整性和准确性

View File

@@ -7,10 +7,10 @@ spring:
username: nacos # Nacos 账号
password: nacos # Nacos 密码
discovery: # 【配置中心】配置项
namespace: liwq # 命名空间。这里使用 dev 开发环境
namespace: 5c8b8fe6-9a89-4ae3-975e-ef3bf560ff82 # 命名空间。这里使用 dev 开发环境
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
metadata:
version: 1.0.0 # 服务实例的版本号,可用于灰度发布
config: # 【注册中心】配置项
namespace: liwq # 命名空间。这里使用 dev 开发环境
namespace: 5c8b8fe6-9a89-4ae3-975e-ef3bf560ff82 # 命名空间。这里使用 dev 开发环境
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP