From 8e2c28ef3604225cea0188bdebc15c9d1db62bf6 Mon Sep 17 00:00:00 2001
From: ZiJIe <17738440858@163.com>
Date: Fri, 30 May 2025 17:49:57 +0800
Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
tashow-gateway/pom.xml | 17 --
.../filter/security/SaTokenConfigure.java | 43 ---
.../filter/security/StpInterfaceImpl.java | 37 ---
.../cloud/app/AppServerApplication.java | 2 +
.../cloud/app/controller/TestController.java | 6 +-
.../interceptor/BuriedPointInterceptor.java | 12 +-
.../cloud/app/mapper/BuriedPointMapper.java | 14 +
.../cloud/app/{mq => }/model/BuriedPoint.java | 8 +-
.../cloud/app/mq/annotation/BuriedPoint.java | 33 ---
.../cloud/app/mq/annotation/BuryPoint.java | 28 --
.../app/mq/aspect/BuriedPointAspect.java | 223 ---------------
.../mq/config/BuriedPointConfiguration.java | 129 ++++++---
.../buriedPoint/BuriedPointConsumer.java | 268 +++++++++++++-----
.../cloud/app/mq/docs/BuriedPointExample.java | 56 ----
.../docs/backend-silent-tracking-rabbitmq.md | 220 --------------
.../app/mq/mapper/BuriedPointMapper.java | 10 -
.../cloud/app/mq/message/BuriedMessages.java | 44 ++-
.../buriedPoint/BuriedPointProducer.java | 60 ++--
.../config/SecurityConfiguration.java | 1 -
19 files changed, 375 insertions(+), 836 deletions(-)
delete mode 100644 tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/SaTokenConfigure.java
delete mode 100644 tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/StpInterfaceImpl.java
rename tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/{mq => }/interceptor/BuriedPointInterceptor.java (92%)
create mode 100644 tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mapper/BuriedPointMapper.java
rename tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/{mq => }/model/BuriedPoint.java (93%)
delete mode 100644 tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuriedPoint.java
delete mode 100644 tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuryPoint.java
delete mode 100644 tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/aspect/BuriedPointAspect.java
delete mode 100644 tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/BuriedPointExample.java
delete mode 100644 tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/backend-silent-tracking-rabbitmq.md
delete mode 100644 tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/mapper/BuriedPointMapper.java
diff --git a/tashow-gateway/pom.xml b/tashow-gateway/pom.xml
index 14e52be..be7d00e 100644
--- a/tashow-gateway/pom.xml
+++ b/tashow-gateway/pom.xml
@@ -45,23 +45,6 @@
spring-cloud-starter-loadbalancer
-
- cn.dev33
- sa-token-reactor-spring-boot3-starter
-
-
-
-
-
- cn.dev33
- sa-token-redis-jackson
- 1.42.0
-
-
- org.apache.commons
- commons-pool2
-
-
com.alibaba.cloud
diff --git a/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/SaTokenConfigure.java b/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/SaTokenConfigure.java
deleted file mode 100644
index b6384b1..0000000
--- a/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/SaTokenConfigure.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.tashow.cloud.gateway.filter.security;
-
-import cn.dev33.satoken.reactor.filter.SaReactorFilter;
-import cn.dev33.satoken.router.SaRouter;
-import cn.dev33.satoken.stp.StpUtil;
-import cn.dev33.satoken.util.SaResult;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * [Sa-Token 权限认证] 配置类
- * @author click33
- */
-@Configuration
-public class SaTokenConfigure {
- // 注册 Sa-Token全局过滤器
- @Bean
- public SaReactorFilter getSaReactorFilter() {
- return new SaReactorFilter()
- // 拦截地址
- .addInclude("/**") /* 拦截全部path */
- // 开放地址
- .addExclude("/favicon.ico")
- // 鉴权方法:每次访问进入
- .setAuth(obj -> {
- // 登录校验 -- 拦截所有路由,并排除/user/doLogin 用于开放登录
- SaRouter.match("/**", "/user/doLogin", r -> StpUtil.checkLogin());
-
- // 权限认证 -- 不同模块, 校验不同权限
- SaRouter.match("/user/**", r -> StpUtil.checkPermission("user"));
- SaRouter.match("/admin/**", r -> StpUtil.checkPermission("admin"));
- SaRouter.match("/goods/**", r -> StpUtil.checkPermission("goods"));
- SaRouter.match("/orders/**", r -> StpUtil.checkPermission("orders"));
-
- // 更多匹配 ... */
- })
- // 异常处理方法:每次setAuth函数出现异常时进入
- .setError(e -> {
- return SaResult.error(e.getMessage());
- })
- ;
- }
-}
diff --git a/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/StpInterfaceImpl.java b/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/StpInterfaceImpl.java
deleted file mode 100644
index d687f3d..0000000
--- a/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/StpInterfaceImpl.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.tashow.cloud.gateway.filter.security;
-
-import cn.dev33.satoken.stp.StpInterface;
-import org.springframework.stereotype.Component;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * 自定义权限验证接口扩展
- */
-@Component
-public class StpInterfaceImpl implements StpInterface {
-
- @Override
- public List getPermissionList(Object loginId, String loginType) {
- // 本 list 仅做模拟,实际项目中要根据具体业务逻辑来查询权限
- List list = new ArrayList();
- list.add("101");
- list.add("user.add");
- list.add("user.update");
- list.add("user.get");
- // list.add("user.delete");
- list.add("art.*");
- return list;
- }
-
- @Override
- public List getRoleList(Object loginId, String loginType) {
- // 本 list 仅做模拟,实际项目中要根据具体业务逻辑来查询角色
- List list = new ArrayList();
- list.add("admin");
- list.add("super-admin");
- return list;
- }
-
-}
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/AppServerApplication.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/AppServerApplication.java
index 24f14a9..a74dc76 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/AppServerApplication.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/AppServerApplication.java
@@ -2,12 +2,14 @@ package com.tashow.cloud.app;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
/**
* Hello world!
*
*/
@SpringBootApplication
+@EnableScheduling
public class AppServerApplication {
public static void main(String[] args) {
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java
index 83c962c..35ce5c5 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java
@@ -1,14 +1,10 @@
package com.tashow.cloud.app.controller;
-
-import com.tashow.cloud.app.mq.annotation.BuriedPoint;
-import com.tashow.cloud.app.mq.mapper.BuriedPointMapper;
-import com.tashow.cloud.app.mq.message.BuriedMessages;
+import com.tashow.cloud.app.mapper.BuriedPointMapper;
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
import jakarta.annotation.security.PermitAll;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
-import java.util.Date;
import java.util.HashMap;
import java.util.Map;
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/interceptor/BuriedPointInterceptor.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/interceptor/BuriedPointInterceptor.java
similarity index 92%
rename from tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/interceptor/BuriedPointInterceptor.java
rename to tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/interceptor/BuriedPointInterceptor.java
index 50b0e98..5be550d 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/interceptor/BuriedPointInterceptor.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/interceptor/BuriedPointInterceptor.java
@@ -1,4 +1,4 @@
-package com.tashow.cloud.app.mq.interceptor;
+package com.tashow.cloud.app.interceptor;
import cn.hutool.core.util.IdUtil;
import com.tashow.cloud.app.mq.message.BuriedMessages;
@@ -37,22 +37,18 @@ public class BuriedPointInterceptor implements HandlerInterceptor {
}
try {
- // 开始计时
StopWatch stopWatch = new StopWatch();
stopWatch.start();
request.setAttribute(ATTRIBUTE_STOPWATCH, stopWatch);
- // 生成请求ID
int requestId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE);
request.setAttribute(ATTRIBUTE_REQUEST_ID, requestId);
-
- // 收集埋点数据
HandlerMethod handlerMethod = (HandlerMethod) handler;
String method = request.getMethod() + " " + request.getRequestURI()+ JsonUtils.toJsonString(request.getParameterMap());
String controllerName = handlerMethod.getBeanType().getSimpleName();
String actionName = handlerMethod.getMethod().getName();
- // 创建埋点消息
+
BuriedMessages message = new BuriedMessages();
message.setId(requestId);
message.setEventTime(System.currentTimeMillis());
@@ -65,7 +61,7 @@ public class BuriedPointInterceptor implements HandlerInterceptor {
message.setEventType("API_REQUEST_START");
message.setPagePath(controllerName + "#" + actionName);
message.setUserAgent(request.getHeader("User-Agent"));
- message.setStatusCode(BuriedMessages.STATUS_INIT);
+ message.setStatusCode(BuriedMessages.STATUS_PROCESSING);
buriedPointProducer.asyncSendMessage(message);
if (log.isDebugEnabled()) {
log.debug("[埋点] 收集请求开始数据: {}", message);
@@ -73,7 +69,6 @@ public class BuriedPointInterceptor implements HandlerInterceptor {
} catch (Exception e) {
log.warn("[埋点] 埋点数据收集异常", e);
}
-
return true;
}
@@ -86,7 +81,6 @@ public class BuriedPointInterceptor implements HandlerInterceptor {
if (userAttribute != null) {
return userAttribute.toString();
}
- // 返回匿名用户标识
return "anonymous";
}
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mapper/BuriedPointMapper.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mapper/BuriedPointMapper.java
new file mode 100644
index 0000000..1aa5918
--- /dev/null
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mapper/BuriedPointMapper.java
@@ -0,0 +1,14 @@
+package com.tashow.cloud.app.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.tashow.cloud.app.model.BuriedPoint;
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Select;
+
+@Mapper
+public interface BuriedPointMapper extends BaseMapper {
+
+ @Select("SELECT * FROM app_burying WHERE event_id = #{eventId} LIMIT 1")
+ BuriedPoint selectByEventId(@Param("eventId") Integer eventId);
+}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/model/BuriedPoint.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPoint.java
similarity index 93%
rename from tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/model/BuriedPoint.java
rename to tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPoint.java
index d6e91ca..ad5b745 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/model/BuriedPoint.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPoint.java
@@ -1,4 +1,4 @@
-package com.tashow.cloud.app.mq.model;
+package com.tashow.cloud.app.model;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
@@ -106,4 +106,10 @@ public class BuriedPoint {
@TableField(value = "status")
private Integer status;
+
+ /**
+ * 重试次数
+ */
+ @TableField(value = "retry_count")
+ private Integer retryCount;
}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuriedPoint.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuriedPoint.java
deleted file mode 100644
index c415aa0..0000000
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuriedPoint.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.tashow.cloud.app.mq.annotation;
-
-import java.lang.annotation.*;
-
-/**
- * 埋点注解
- * 用于标记需要特殊埋点的方法或接口
- */
-@Target({ElementType.METHOD})
-@Retention(RetentionPolicy.RUNTIME)
-@Documented
-public @interface BuriedPoint {
-
- /**
- * 埋点事件类型
- */
- String eventType() default "";
-
- /**
- * 埋点描述信息
- */
- String description() default "";
-
- /**
- * 是否记录方法入参
- */
- boolean recordParams() default false;
-
- /**
- * 是否记录返回结果
- */
- boolean recordResult() default false;
-}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuryPoint.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuryPoint.java
deleted file mode 100644
index 1f88dd1..0000000
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuryPoint.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package com.tashow.cloud.app.mq.annotation;
-
-import java.lang.annotation.*;
-
-/**
- * 埋点注解
- * 用于标记需要埋点的方法
- */
-@Target(ElementType.METHOD)
-@Retention(RetentionPolicy.RUNTIME)
-@Documented
-public @interface BuryPoint {
-
- /**
- * 事件类型
- */
- String eventType() default "METHOD";
-
- /**
- * 页面路径/功能模块
- */
- String pagePath() default "";
-
- /**
- * 描述信息
- */
- String description() default "";
-}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/aspect/BuriedPointAspect.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/aspect/BuriedPointAspect.java
deleted file mode 100644
index f3cd7ea..0000000
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/aspect/BuriedPointAspect.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package com.tashow.cloud.app.mq.aspect;
-
-import cn.hutool.core.util.IdUtil;
-import cn.hutool.json.JSONUtil;
-import com.tashow.cloud.app.mq.annotation.BuriedPoint;
-import com.tashow.cloud.app.mq.message.BuriedMessages;
-import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
-import com.tashow.cloud.common.util.servlet.ServletUtils;
-import com.tashow.cloud.common.util.spring.SpringUtils;
-import jakarta.servlet.http.HttpServletRequest;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.aspectj.lang.ProceedingJoinPoint;
-import org.aspectj.lang.annotation.Around;
-import org.aspectj.lang.annotation.Aspect;
-import org.aspectj.lang.annotation.Pointcut;
-import org.aspectj.lang.reflect.MethodSignature;
-import org.springframework.core.annotation.Order;
-import org.springframework.stereotype.Component;
-import org.springframework.web.context.request.RequestContextHolder;
-import org.springframework.web.context.request.ServletRequestAttributes;
-
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * 埋点切面
- * 处理自定义埋点注解
- */
-@Aspect
-@Component
-@Order(10)
-@Slf4j
-@RequiredArgsConstructor
-public class BuriedPointAspect {
-/*
- private final BuriedPointProducer buriedPointProducer;
-
- @Pointcut("@annotation(com.tashow.cloud.app.mq.annotation.BuriedPoint)")
- public void buriedPointCut() {
- }
-
- @Around("buriedPointCut()")
- public Object around(ProceedingJoinPoint point) throws Throwable {
- // 获取方法签名
- MethodSignature signature = (MethodSignature) point.getSignature();
- Method method = signature.getMethod();
-
- // 获取注解
- BuriedPoint buriedPoint = method.getAnnotation(BuriedPoint.class);
- if (buriedPoint == null) {
- return point.proceed();
- }
-
- // 生成埋点ID
- int buriedId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE);
-
- // 记录开始埋点
- recordMethodStart(point, method, buriedPoint, buriedId);
-
- long startTime = System.currentTimeMillis();
- Object result = null;
- Exception exception = null;
- try {
- // 执行原方法
- result = point.proceed();
- return result;
- } catch (Exception e) {
- exception = e;
- throw e;
- } finally {
- long duration = System.currentTimeMillis() - startTime;
- // 记录结束埋点
- recordMethodEnd(point, method, buriedPoint, result, exception, duration, buriedId);
- }
- }
-
- *//**
- * 记录方法开始埋点
- *//*
- private void recordMethodStart(ProceedingJoinPoint point, Method method, BuriedPoint buriedPoint, int buriedId) {
- try {
- // 获取类和方法名
- String className = point.getTarget().getClass().getSimpleName();
- String methodName = method.getName();
-
- // 创建埋点消息
- BuriedMessages message = new BuriedMessages();
- message.setId(buriedId);
- message.setEventTime(System.currentTimeMillis());
- message.setService(SpringUtils.getApplicationName());
- message.setMethod(className + "." + methodName);
- message.setEventType(buriedPoint.eventType().isEmpty() ? "METHOD_START" : buriedPoint.eventType() + "_START");
- message.setPagePath(className + "#" + methodName);
-
- // 设置用户ID和其他信息
- setRequestInfo(message);
-
- // 如果需要记录入参
- if (buriedPoint.recordParams() && point.getArgs() != null && point.getArgs().length > 0) {
- // 限制参数长度,避免埋点数据过大
- String params = JSONUtil.toJsonStr(point.getArgs());
- if (params.length() > 500) {
- params = params.substring(0, 500) + "...";
- }
- message.addExtraData("params", params);
- }
-
- // 记录描述信息
- if (!buriedPoint.description().isEmpty()) {
- message.addExtraData("description", buriedPoint.description());
- }
-
- // 异步发送埋点数据
- buriedPointProducer.asyncSendMessage(message);
-
- if (log.isDebugEnabled()) {
- log.debug("[埋点] 方法开始: {}.{}, 事件类型: {}", className, methodName, message.getEventType());
- }
- } catch (Exception e) {
- log.warn("[埋点] 记录方法开始埋点异常", e);
- }
- }
-
- *//**
- * 记录方法结束埋点
- *//*
- private void recordMethodEnd(ProceedingJoinPoint point, Method method, BuriedPoint buriedPoint,
- Object result, Exception exception, long duration, int buriedId) {
- try {
- // 获取类和方法名
- String className = point.getTarget().getClass().getSimpleName();
- String methodName = method.getName();
-
- // 创建埋点消息
- BuriedMessages message = new BuriedMessages();
- message.setId(buriedId);
- message.setEventTime(System.currentTimeMillis());
- message.setService(SpringUtils.getApplicationName());
- message.setMethod(className + "." + methodName);
- message.setEventType(buriedPoint.eventType().isEmpty() ? "METHOD_END" : buriedPoint.eventType() + "_END");
- message.setPagePath(className + "#" + methodName);
- message.setDuration(duration);
-
- // 设置用户ID和其他信息
- setRequestInfo(message);
-
- // 如果需要记录结果
- if (buriedPoint.recordResult() && result != null && exception == null) {
- // 限制结果长度,避免埋点数据过大
- String resultStr = JSONUtil.toJsonStr(result);
- if (resultStr.length() > 500) {
- resultStr = resultStr.substring(0, 500) + "...";
- }
- message.addExtraData("result", resultStr);
- }
-
- // 如果有异常,记录异常信息
- if (exception != null) {
- message.setErrorMessage(exception.getMessage());
- message.addExtraData("exceptionClass", exception.getClass().getName());
- }
-
- // 异步发送埋点数据
- buriedPointProducer.asyncSendMessage(message);
-
- if (log.isDebugEnabled()) {
- log.debug("[埋点] 方法结束: {}.{}, 事件类型: {}, 耗时: {}ms",
- className, methodName, message.getEventType(), duration);
- }
- } catch (Exception e) {
- log.warn("[埋点] 记录方法结束埋点异常", e);
- }
- }
-
- *//**
- * 设置请求相关信息
- *//*
- private void setRequestInfo(BuriedMessages message) {
- try {
- // 获取当前请求信息
- ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
- if (attributes != null) {
- HttpServletRequest request = attributes.getRequest();
-
- // 设置用户ID和其他请求信息
- message.setUserId(getUserId(request));
- message.setSessionId(request.getSession().getId());
- message.setClientIp(ServletUtils.getClientIP(request));
- message.setUserAgent(request.getHeader("User-Agent"));
- }
-
- // 设置服务器IP
- message.setServerIp(getServerIp());
- } catch (Exception e) {
- log.warn("[埋点] 设置请求信息异常", e);
- }
- }
-
- *//**
- * 获取当前登录用户ID
- *//*
- private String getUserId(HttpServletRequest request) {
- // 这里需要根据项目实际的用户体系来获取用户ID
- Object userAttribute = request.getSession().getAttribute("USER_ID");
- if (userAttribute != null) {
- return userAttribute.toString();
- }
- return "anonymous";
- }
-
- *//**
- * 获取服务器IP
- *//*
- private String getServerIp() {
- try {
- return InetAddress.getLocalHost().getHostAddress();
- } catch (UnknownHostException e) {
- return "unknown";
- }
- }*/
-}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java
index 6cef054..6cd9ca7 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java
@@ -1,12 +1,12 @@
package com.tashow.cloud.app.mq.config;
-import com.tashow.cloud.app.mq.interceptor.BuriedPointInterceptor;
+import com.tashow.cloud.app.interceptor.BuriedPointInterceptor;
+import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
import com.tashow.cloud.app.mq.message.BuriedMessages;
+import com.tashow.cloud.app.model.BuriedPointFailRecord;
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
+import com.tashow.cloud.app.mq.producer.buriedPoint.CustomCorrelationData;
import lombok.RequiredArgsConstructor;
-import org.springframework.amqp.core.Binding;
-import org.springframework.amqp.core.BindingBuilder;
-import org.springframework.amqp.core.DirectExchange;
-import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -14,6 +14,8 @@ import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import lombok.extern.slf4j.Slf4j;
import jakarta.annotation.PostConstruct;
+import java.util.Date;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
/**
* 埋点功能配置类
@@ -25,42 +27,99 @@ public class BuriedPointConfiguration implements WebMvcConfigurer {
private final BuriedPointProducer buriedPointProducer;
private final RabbitTemplate rabbitTemplate;
-
+ private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
+
/**
* RabbitTemplate初始化配置
- * 确保回调正确配置,以实现消息可靠性
*/
-// @PostConstruct
-// public void initRabbitTemplate() {
-// log.info("[埋点配置] 初始化RabbitTemplate: {}", rabbitTemplate);
-// rabbitTemplate.setMandatory(true);
-// rabbitTemplate.setReturnsCallback(returned -> {
-// log.error("[埋点配置] 消息路由失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
-// returned.getExchange(),
-// returned.getRoutingKey(),
-// returned.getReplyCode(),
-// returned.getReplyText(),
-// new String(returned.getMessage().getBody()));
-// });
-// rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
-// if (ack) {
-// log.debug("[埋点配置] 消息成功发送到交换机: {}", correlationData);
-// } else {
-// log.error("[埋点配置] 消息发送到交换机失败: cause={}, correlationData={}", cause, correlationData);
-// }
-// });
-//
-// // 验证配置
-// if (rabbitTemplate.isConfirmListener()) {
-// log.info("[埋点配置] 确认回调已正确配置");
-// } else {
-// log.error("[埋点配置] 确认回调配置失败,请检查RabbitMQ配置!");
-// }
-// }
+ @PostConstruct
+ public RabbitTemplate initRabbitTemplate() {
+ log.info("[埋点配置] 初始化RabbitTemplate: {}", rabbitTemplate);
+ rabbitTemplate.setMandatory(true);
+ rabbitTemplate.setReturnsCallback(returned -> {
+ log.error("[埋点配置] 消息路由失败: exchange={}, routingKey={}, replyCode={}, replyText={}, ={}",
+ returned.getExchange(),
+ returned.getRoutingKey(),
+ returned.getReplyCode(),
+ returned.getReplyText(),
+ new String(returned.getMessage().getBody()));
+
+ saveFailRecord(
+ returned.getMessage().getMessageProperties().getCorrelationId(),
+ returned.getExchange(),
+ returned.getRoutingKey(),
+ "路由失败: " + returned.getReplyText(),
+ new String(returned.getMessage().getBody())
+ );
+ });
+ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
+ if (ack) {
+ log.info("[埋点配置] 消息成功发送到交换机: {}", correlationData.getId());
+ } else {
+ log.error("[埋点配置] 消息发送到交换机失败: cause={}, correlationData={}", cause, correlationData);
+ CustomCorrelationData customData = (CustomCorrelationData) correlationData;
+ String messageContent = customData.getMessageContent();
+ saveFailRecord(
+ correlationData.getId(),
+ BuriedMessages.EXCHANGE,
+ BuriedMessages.ROUTING_KEY,
+ cause,
+ messageContent
+ );
+ }
+ });
+ if (rabbitTemplate.isConfirmListener()) {
+ log.info("[埋点配置] 确认回调已正确配置");
+ } else {
+ log.error("[埋点配置] 确认回调配置失败");
+ }
+ return rabbitTemplate;
+ }
/**
- * 创建埋点队列
+ * 保存消息发送失败记录
*/
+ private void saveFailRecord(String correlationId, String exchange, String routingKey, String cause, String messageContent) {
+ try {
+ log.info("[埋点配置] 保存发送失败记录: correlationId={}", correlationId);
+
+ // 先查询是否已存在记录
+ LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
+ queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
+ BuriedPointFailRecord existingRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
+
+ if (existingRecord != null) {
+ // 已存在记录,执行更新
+ log.info("[埋点配置] 发现已有失败记录,将更新: {}", correlationId);
+ existingRecord.setExchange(exchange);
+ existingRecord.setRoutingKey(routingKey);
+ existingRecord.setCause(cause);
+ existingRecord.setMessageContent(messageContent);
+ existingRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
+ existingRecord.setUpdateTime(new Date());
+ buriedPointFailRecordMapper.updateById(existingRecord);
+ log.info("[埋点配置] 发送失败记录已更新: correlationId={}", correlationId);
+ } else {
+ // 不存在记录,执行插入
+ BuriedPointFailRecord failRecord = new BuriedPointFailRecord();
+ failRecord.setCorrelationId(correlationId);
+ failRecord.setExchange(exchange);
+ failRecord.setRoutingKey(routingKey);
+ failRecord.setCause(cause);
+ failRecord.setMessageContent(messageContent);
+ failRecord.setRetryCount(0);
+ failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
+ failRecord.setCreateTime(new Date());
+ failRecord.setUpdateTime(new Date());
+
+ buriedPointFailRecordMapper.insert(failRecord);
+ log.info("[埋点配置] 发送失败记录已保存: correlationId={}", correlationId);
+ }
+ } catch (Exception e) {
+ log.error("[埋点配置] 保存发送失败记录异常", e);
+ }
+ }
+
@Bean
public Queue buriedPointQueue() {
return new Queue(BuriedMessages.QUEUE, true, false, false);
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java
index 3a7d5d0..9c7e035 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java
@@ -1,8 +1,9 @@
-/*
package com.tashow.cloud.app.mq.consumer.buriedPoint;
-import com.tashow.cloud.app.mq.mapper.BuriedPointMapper;
+import com.tashow.cloud.app.mapper.BuriedPointMapper;
+import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
import com.tashow.cloud.app.mq.message.BuriedMessages;
-import com.tashow.cloud.app.mq.model.BuriedPoint;
+import com.tashow.cloud.app.model.BuriedPoint;
+import com.tashow.cloud.app.model.BuriedPointFailRecord;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
@@ -11,16 +12,11 @@ import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
-import org.springframework.util.StringUtils;
-
-import java.io.IOException;
import java.util.Date;
import com.rabbitmq.client.Channel;
-*/
-/**
- * 埋点消息消费者
- * 将埋点数据存储到数据库
- *//*
+import org.springframework.dao.DuplicateKeyException;
+import com.tashow.cloud.common.util.json.JsonUtils;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@Component
@RabbitListener(queues = BuriedMessages.QUEUE)
@@ -29,90 +25,220 @@ import com.rabbitmq.client.Channel;
public class BuriedPointConsumer {
private final BuriedPointMapper buriedPointMapper;
-
+ private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
+
@Value("${spring.application.name:tashow-app}")
private String applicationName;
- */
-/**
- * 处理埋点消息
- *//*
+ private static final int MAX_RETRY_ALLOWED = 1;
@RabbitHandler
- public void onMessage(BuriedMessages message,
- Channel channel,
- @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
- try {
- log.info("[埋点消费者] 收到埋点消息: {}", message);
-
- // 确保事件ID不为空
- if (message.getId() == null) {
- message.setId((int)(System.currentTimeMillis() % Integer.MAX_VALUE));
- log.warn("[埋点消费者] 消息中的事件ID为空,已自动生成: {}", message.getId());
- }
- saveToDatabase(message);
- channel.basicAck(deliveryTag, false);
- log.info("[埋点消费者] 消息处理成功,已确认");
-
- } catch (Exception e) {
- try {
- channel.basicNack(deliveryTag, false, true);
- } catch (IOException ex) {
- log.error("[埋点消费者] 拒绝消息失败", ex);
- }
+ public void onMessage(BuriedMessages message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
+ Integer dbRetryCount = getActualRetryCount(message);
+ message.setRetryCount(dbRetryCount);
+ if (message.getRetryCount() != null && message.getRetryCount() >= MAX_RETRY_ALLOWED) {
+ message.setStatusCode(BuriedMessages.STATUS_ERROR);
+ message.addExtraData("errorMessage", "已达到最大重试次数");
+ saveToFailRecord(message, "已达到最大重试次数");
+ safeChannelAck(channel, deliveryTag);
+ return;
}
+ log.info("[埋点消费者] 收到埋点消息: {}, 当前重试次数: {}/{}", message, message.getRetryCount(), MAX_RETRY_ALLOWED);
+
+ message.setStatusCode(BuriedMessages.STATUS_PROCESSING);
+ log.info("[埋点消费者] 消息状态更新为处理中(STATUS_PROCESSING): {}", message.getId());
+ try {
+ /* if(true){
+ throw new RuntimeException("测试异常");
+ }*/
+ saveToDatabase(message);
+ message.setStatusCode(BuriedMessages.STATUS_SUCCESS);
+ updateMessageStatus(message);
+ log.info("[埋点消费者] 消息处理成功,状态已更新为成功(STATUS_SUCCESS): {}", message.getId());
+ safeChannelAck(channel, deliveryTag);
+ } catch (DuplicateKeyException e) {
+ log.warn("[埋点消费者] 消息已被处理过,直接确认: {}, 错误: {}", message.getId(), e.getMessage());
+ safeChannelAck(channel, deliveryTag);
+ } catch (Exception e) {
+ message.setStatusCode(BuriedMessages.STATUS_ERROR);
+ message.addExtraData("errorMessage", e.getMessage());
+ log.error("[埋点消费者] 消息处理失败: {}, 错误: {}", message.getId(), e.getMessage());
+ message.incrementRetryCount();
+ updateRetryCount(message);
+ if (message.getRetryCount() >= MAX_RETRY_ALLOWED) {
+ saveToDatabase(message);
+ log.warn("[埋点消费者] 消息已达到最大重试次数: {}, 确认消息并保存到失败记录表", message.getRetryCount());
+ saveToFailRecord(message, e.getMessage());
+ safeChannelAck(channel, deliveryTag);
+ } else {
+ log.info("[埋点消费者] 消息将重新入队重试: {}, 当前重试次数: {}", message.getId(), message.getRetryCount());
+ safeChannelNack(channel, deliveryTag, false, true);
+ }
+ }
}
-
-
- */
-/**
- * 将埋点数据保存到数据库
- *//*
+ private Integer getActualRetryCount(BuriedMessages message) {
+ try {
+ BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId());
+ if (buriedPoint != null && buriedPoint.getRetryCount() != null) {
+ if ((buriedPoint.getStatus() == BuriedMessages.STATUS_ERROR ||
+ buriedPoint.getStatus() == BuriedMessages.STATUS_PROCESSING)) {
+ log.info("[埋点消费者] 检测到消息可能因服{}", message.getId());
+ return buriedPoint.getRetryCount() - 1;
+ }
+ return buriedPoint.getRetryCount();
+ } else {
+ String correlationId = String.valueOf(message.getId());
+ LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
+ queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
+ BuriedPointFailRecord failRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
+ return failRecord!=null? failRecord.getRetryCount():0;
- private void saveToDatabase(BuriedMessages message) {
+ }
+ } catch (Exception e) {
+ log.warn("[埋点消费者] 获取消息重试次数失败: {}", e.getMessage());
+ throw new RuntimeException("获取消息重试次数失败", e);
+ }
+ }
+
+ private void safeChannelAck(Channel channel, long deliveryTag) {
+ try {
+ channel.basicAck(deliveryTag, false);
+ } catch (Exception e) {
+ log.error("[埋点消费者] 确认消息失败: {}", e.getMessage());
+ }
+ }
+
+ private void safeChannelNack(Channel channel, long deliveryTag, boolean multiple, boolean requeue) {
+ try {
+ channel.basicNack(deliveryTag, multiple, requeue);
+ } catch (Exception e) {
+ log.error("[埋点消费者] 拒绝消息失败: {}", e.getMessage());
+ }
+ }
+
+ private void updateRetryCount(BuriedMessages message) {
+ try {
+ BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId());
+ if (buriedPoint != null) {
+ buriedPoint.setRetryCount(message.getRetryCount());
+ buriedPoint.setUpdateTime(new Date());
+ buriedPointMapper.updateById(buriedPoint);
+ } else {
+ String correlationId = String.valueOf(message.getId());
+ LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
+ queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
+ BuriedPointFailRecord failRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
+ if (failRecord != null) {
+ failRecord.setRetryCount(message.getRetryCount());
+ failRecord.setUpdateTime(new Date());
+ failRecord.setMessageContent(JsonUtils.toJsonString(message));
+ buriedPointFailRecordMapper.updateById(failRecord);
+ } else {
+ // 记录或创建新的失败记录
+ log.warn("[埋点消费者] 未找到埋点记录和失败记录, 事件ID: {}, 准备创建失败记录", message.getId());
+ saveToFailRecord(message, "未找到原始埋点记录");
+ }
+ }
+ } catch (Exception e) {
+ log.error("[埋点消费者] 更新重试次数失败: {}, 错误: {}", message.getId(), e.getMessage());
+ }
+ }
+
+ private void updateMessageStatus(BuriedMessages message) {
+ try {
+ BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId());
+ buriedPoint.setStatus(message.getStatusCode());
+ buriedPoint.setUpdateTime(new Date());
+ buriedPoint.setRetryCount(message.getRetryCount());
+ buriedPointMapper.updateById(buriedPoint);
+ log.debug("[埋点消费者] 已更新埋点状态, 事件ID: {}, 新状态: {}, 重试次数: {}", message.getId(), message.getStatusCode(), message.getRetryCount());
+ } catch (Exception e) {
+ log.error("[埋点消费者] 更新埋点状态失败: {}, 错误: {}", message.getId(), e.getMessage(), e);
+ }
+ }
+
+ private boolean saveToDatabase(BuriedMessages message) {
try {
log.debug("[埋点消费者] 准备保存埋点数据,事件ID: {}", message.getId());
+ BuriedPoint existingPoint = buriedPointMapper.selectByEventId(message.getId());
+ if (existingPoint != null) {
+ existingPoint.setStatus(message.getStatusCode());
+ existingPoint.setUpdateTime(new Date());
+ if (message.getRetryCount() != null) {
+ int newRetryCount = Math.max(existingPoint.getRetryCount(), message.getRetryCount());
+ existingPoint.setRetryCount(newRetryCount);
+ message.setRetryCount(newRetryCount);
+ }
+ int result = buriedPointMapper.updateById(existingPoint);
+ return result > 0;
+ }
- // 转换消息为实体
BuriedPoint buriedPoint = new BuriedPoint();
-
- // 设置必填字段,确保不为空
- buriedPoint.setEventId(message.getId());
+ buriedPoint.setEventId(message.getId());
buriedPoint.setEventTime(message.getEventTime());
-
- // 获取真实用户ID,避免使用默认anonymous
- String userId = message.getUserId();
- buriedPoint.setUserId(StringUtils.hasText(userId) && !"null".equals(userId) ? userId : "anonymous");
-
- String eventType = message.getEventType();
- buriedPoint.setEventType(eventType);
+ buriedPoint.setUserId(message.getUserId());
+ buriedPoint.setEventType(message.getEventType());
buriedPoint.setService(applicationName);
-
- // 设置method字段,确保获取真实方法名
buriedPoint.setMethod(message.getMethod());
buriedPoint.setSessionId(message.getSessionId());
buriedPoint.setClientIp(message.getClientIp());
buriedPoint.setServerIp(message.getServerIp());
-
- // 设置其他字段
+ buriedPoint.setStatus(message.getStatusCode());
+ buriedPoint.setRetryCount(message.getRetryCount());
buriedPoint.setPagePath(message.getPagePath());
buriedPoint.setElementId(message.getElementId());
buriedPoint.setDuration(message.getDuration());
-
buriedPoint.setCreateTime(new Date());
-
- log.debug("[埋点消费者] 埋点实体数据: eventId={}, eventType={}, userId={}, service={}, method={}",
- buriedPoint.getEventId(), buriedPoint.getEventType(),
- buriedPoint.getUserId(), buriedPoint.getService(), buriedPoint.getMethod());
-
- int result = buriedPointMapper.insert(buriedPoint);
-
- log.info("[埋点消费者] 埋点数据已保存到数据库, 事件ID: {}, 影响行数: {}", message.getId(), result);
+ buriedPoint.setUpdateTime(new Date());
+ log.debug("[埋点消费者] 埋点实体数据: eventId={}, eventType={}, userId={}, service={}, method={}, status={}, retryCount={}", buriedPoint.getEventId(), buriedPoint.getEventType(), buriedPoint.getUserId(), buriedPoint.getService(), buriedPoint.getMethod(), buriedPoint.getStatus(), buriedPoint.getRetryCount());
+ buriedPointMapper.insert(buriedPoint);
+ log.info("[埋点消费者] 埋点数据已保存到数据库, 事件ID: {}, 状态: {}", message.getId(), message.getStatusCode());
+ return true;
} catch (Exception e) {
- log.error("[埋点消费者] 保存埋点数据到数据库失败, 事件ID: {}, 错误: {}",
- message.getId(), e.getMessage(), e);
+ log.error("[埋点消费者] 保存埋点数据到数据库失败, 事件ID: {}, 错误: {}", message.getId(), e.getMessage(), e);
+ throw e;
}
}
-} */
+
+ /**
+ * 保存失败记录到BuriedPointFailRecord表
+ */
+ private void saveToFailRecord(BuriedMessages message, String cause) {
+ try {
+ String correlationId = String.valueOf(message.getId());
+ LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
+ queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
+ BuriedPointFailRecord existingRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
+
+ if (existingRecord != null) {
+ log.info("[埋点消费者] 发现已有失败记录,将更新: {}", correlationId);
+ existingRecord.setExchange(BuriedMessages.EXCHANGE);
+ existingRecord.setRoutingKey(BuriedMessages.ROUTING_KEY);
+ existingRecord.setCause(cause);
+ existingRecord.setMessageContent(JsonUtils.toJsonString(message));
+ existingRecord.setRetryCount(message.getRetryCount());
+ existingRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
+ existingRecord.setUpdateTime(new Date());
+ buriedPointFailRecordMapper.updateById(existingRecord);
+ log.info("[埋点消费者] 已更新失败记录, 事件ID: {}", correlationId);
+ } else {
+ BuriedPointFailRecord failRecord = new BuriedPointFailRecord();
+ failRecord.setCorrelationId(correlationId);
+ failRecord.setExchange(BuriedMessages.EXCHANGE);
+ failRecord.setRoutingKey(BuriedMessages.ROUTING_KEY);
+ failRecord.setCause(cause);
+ failRecord.setMessageContent(JsonUtils.toJsonString(message));
+ failRecord.setRetryCount(message.getRetryCount());
+ failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
+ failRecord.setCreateTime(new Date());
+ failRecord.setUpdateTime(new Date());
+ buriedPointFailRecordMapper.insert(failRecord);
+ log.info("[埋点消费者] 已将失败消息保存到失败记录表, 事件ID: {}", message.getId());
+ }
+ } catch (Exception e) {
+ log.error("[埋点消费者] 保存失败记录失败: {}, 错误: {}", message.getId(), e.getMessage(), e);
+ }
+ }
+}
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/BuriedPointExample.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/BuriedPointExample.java
deleted file mode 100644
index c863c5a..0000000
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/BuriedPointExample.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package com.tashow.cloud.app.mq.docs;
-
-import com.tashow.cloud.app.mq.annotation.BuriedPoint;
-import org.springframework.stereotype.Service;
-
-/**
- * 埋点使用示例
- */
-@Service
-public class BuriedPointExample {
-
- /**
- * 使用埋点注解标记的方法
- * 系统会自动记录该方法的调用情况
- */
- @BuriedPoint(
- eventType = "BUSINESS_OPERATION",
- description = "处理业务数据",
- recordParams = true,
- recordResult = true
- )
- public String processBusiness(String businessId, String operationType) {
- // 业务处理逻辑
- return "处理完成: " + businessId;
- }
-
- /**
- * 如何使用埋点系统:
- *
- * 1. 静默埋点 - 无需任何代码
- * 系统会自动对所有API请求进行埋点,记录请求的开始和结束信息
- * 包括: 请求时间、路径、参数、响应时间、状态码等
- *
- * 2. 注解埋点 - 针对特定方法
- * 在需要埋点的方法上添加 @BuriedPoint 注解
- * 可以设置事件类型、描述,以及是否记录参数和返回结果
- *
- * 3. 埋点数据流转
- * 所有埋点数据会通过RabbitMQ发送到消息队列
- * 可以在消费者端实现数据的存储和分析
- *
- * 4. 数据字段说明
- * id: 埋点事件唯一ID
- * eventTime: 事件发生时间
- * service: 服务名称
- * method: 方法/接口
- * userId: 用户标识
- * clientIp: 客户端IP
- * eventType: 事件类型
- * duration: 操作耗时
- * extraData: 额外数据(参数、结果等)
- */
- public void howToUse() {
- // 示例方法
- }
-}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/backend-silent-tracking-rabbitmq.md b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/backend-silent-tracking-rabbitmq.md
deleted file mode 100644
index fbc383e..0000000
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/backend-silent-tracking-rabbitmq.md
+++ /dev/null
@@ -1,220 +0,0 @@
-# 后端静默埋点实现方案(基于RabbitMQ)
-
-后端静默埋点是指不依赖前端代码,完全由后端服务收集用户行为和应用性能数据的方案。以下是使用RabbitMQ实现后端静默埋点的详细方案:
-
-## 一、方案架构
-
-[业务服务] → [RabbitMQ] → [埋点消费服务] → [数据分析存储]
-
-## 二、具体实现步骤
-
-### 1. 埋点数据收集
-
-在业务服务中需要埋点的位置添加消息发送代码:
-
-```java
-// Java示例使用Spring AMQP
-@Autowired
-private RabbitTemplate rabbitTemplate;
-
-public void someBusinessMethod(User user, Action action) {
- try {
- // 构造埋点数据
- Map trackingData = new HashMap<>();
- trackingData.put("event_time", System.currentTimeMillis());
- trackingData.put("user_id", user.getId());
- trackingData.put("action", action.name());
- trackingData.put("service", "order-service");
- trackingData.put("ip", getClientIp());
-
- // 发送到RabbitMQ
- rabbitTemplate.convertAndSend(
- "tracking.exchange",
- "tracking.route",
- trackingData
- );
- } catch (Exception e) {
- // 埋点异常不应影响主流程
- log.error("Tracking data send failed", e);
- }
-}
-```
-
-### 2. RabbitMQ配置
-
-#### 创建专用Exchange和Queue
-
-```yaml
-# application.yml配置示例
-spring:
- rabbitmq:
- host: rabbitmq-server
- port: 5672
- username: guest
- password: guest
- template:
- exchange: tracking.exchange
- listener:
- simple:
- acknowledge-mode: auto
-```
-
-#### 声明Exchange和Queue
-
-```java
-@Configuration
-public class RabbitMQConfig {
-
- @Bean
- public Exchange trackingExchange() {
- return ExchangeBuilder.directExchange("tracking.exchange")
- .durable(true)
- .build();
- }
-
- @Bean
- public Queue trackingQueue() {
- return QueueBuilder.durable("tracking.queue")
- .withArgument("x-message-ttl", 86400000) // 消息存活时间
- .build();
- }
-
- @Bean
- public Binding trackingBinding() {
- return BindingBuilder.bind(trackingQueue())
- .to(trackingExchange())
- .with("tracking.route")
- .noargs();
- }
-}
-```
-
-### 3. 埋点数据处理服务
-
-创建专门的消费者服务处理埋点数据:
-
-```java
-@Component
-public class TrackingDataConsumer {
-
- private static final Logger log = LoggerFactory.getLogger(TrackingDataConsumer.class);
-
- @RabbitListener(queues = "tracking.queue")
- public void processTrackingData(Map trackingData) {
- try {
- // 1. 数据校验和补全
- validateAndEnrichData(trackingData);
-
- // 2. 数据格式化
- TrackingEvent event = convertToEvent(trackingData);
-
- // 3. 数据存储或转发
- saveToDatabase(event);
- // 或 sendToAnalyticsPlatform(event);
-
- } catch (Exception e) {
- log.error("Process tracking data failed: {}", trackingData, e);
- // 可根据需要加入重试或死信队列逻辑
- }
- }
-
- private void validateAndEnrichData(Map data) {
- if (!data.containsKey("user_id")) {
- data.put("user_id", "anonymous");
- }
- // 补充服务IP等信息
- data.put("server_ip", getServerIp());
- }
-
- // 其他辅助方法...
-}
-```
-
-### 4. 高级配置
-
-#### 消息可靠性保证
-
-```java
-// 发送端确认回调
-rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- if (!ack) {
- log.warn("Tracking message lost, cause: {}", cause);
- // 可加入本地缓存或日志,后续重发
- }
-});
-
-// 返回回调(路由失败)
-rabbitTemplate.setReturnsCallback(returned -> {
- log.warn("Tracking message returned: {}", returned.getMessage());
-});
-```
-
-#### 批量发送优化
-
-```java
-// 使用BatchingRabbitTemplate提高性能
-@Bean
-public BatchingRabbitTemplate batchingRabbitTemplate(
- RabbitTemplate rabbitTemplate,
- SimpleBatchingStrategy strategy) {
-
- return new BatchingRabbitTemplate(strategy);
-}
-
-@Bean
-public SimpleBatchingStrategy batchingStrategy() {
- return new SimpleBatchingStrategy(100, 1024 * 1024, 5000);
-}
-```
-
-## 三、埋点数据设计建议
-
-### 1. 通用字段
-
-event_id: 事件唯一ID
-
-event_time: 事件时间戳
-
-service: 服务名称
-
-method: 方法/接口
-
-user_id: 用户标识
-
-session_id: 会话标识
-
-client_ip: 客户端IP
-
-server_ip: 服务器IP
-
-### 2. 业务字段
-
-action: 操作类型
-
-target: 操作目标
-
-params: 业务参数
-
-result: 操作结果
-
-cost_time: 耗时(ms)
-
-## 四、性能优化建议
-
-异步发送:确保埋点发送不阻塞主业务流程
-
-本地缓存:网络异常时先缓存到本地,后续重发
-
-采样率:高流量场景可配置采样率
-
-消息压缩:大数据量时可启用消息压缩
-
-独立集群:埋点使用独立的RabbitMQ集群,避免影响业务消息
-
-## 五、监控与维护
-
-监控RabbitMQ队列积压情况
-
-设置埋点数据处理的延迟告警
-
-定期审计埋点数据的完整性和准确性
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/mapper/BuriedPointMapper.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/mapper/BuriedPointMapper.java
deleted file mode 100644
index bd56003..0000000
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/mapper/BuriedPointMapper.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.tashow.cloud.app.mq.mapper;
-
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.tashow.cloud.app.mq.model.BuriedPoint;
-import org.apache.ibatis.annotations.Mapper;
-
-
-@Mapper
-public interface BuriedPointMapper extends BaseMapper {
-}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java
index 243fae9..7a072e1 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java
@@ -7,13 +7,12 @@ import java.util.Map;
@Data
public class BuriedMessages implements Serializable {
+ private static final long serialVersionUID = 1L; // 添加序列化ID
+
// 消息队列配置
public static final String QUEUE = "BURIED_POINT_QUEUE";
public static final String EXCHANGE = "BURIED_POINT_EXCHANGE";
public static final String ROUTING_KEY = "BURIED_POINT_ROUTING_KEY";
- public static final String DEAD_LETTER_EXCHANGE = "DEAD_LETTER_EXCHANGE";
- public static final String DEAD_LETTER_ROUTING_KEY = "DEAD_LETTER_ROUTING";
- public static final String DEAD_LETTER_QUEUE = "DEAD_LETTER_QUEUE";
// 状态码定义
public static final Integer STATUS_INIT = 10; // 初始状态
@@ -21,8 +20,8 @@ public class BuriedMessages implements Serializable {
public static final Integer STATUS_SUCCESS = 30; // 处理成功
public static final Integer STATUS_WARNING = 40; // 处理警告
public static final Integer STATUS_ERROR = 50; // 处理错误
+
- // 通用字段
private Integer id; // 事件唯一ID
private Long eventTime; // 事件时间戳
private String service; // 服务名称
@@ -31,8 +30,7 @@ public class BuriedMessages implements Serializable {
private String sessionId; // 会话标识
private String clientIp; // 客户端IP
private String serverIp; // 服务器IP
-
- // 添加埋点特定字段
+
private String eventType; // 事件类型: PAGE_VIEW, API_CALL, BUTTON_CLICK 等
private String pagePath; // 页面路径/功能模块
private String elementId; // 元素标识
@@ -41,26 +39,11 @@ public class BuriedMessages implements Serializable {
private String userAgent; // 用户代理信息
private Integer statusCode; // 响应状态码
private String errorMessage; // 错误信息
-
- // 扩展字段,用于存储特定事件的额外数据
+ private Integer retryCount = 0; // 重试次数计数器,默认0
+
private Map extraData = new HashMap<>();
- /**
- * 快速创建消息的便捷方法
- */
- public static BuriedMessages create(String userId, String eventType, String pagePath) {
- BuriedMessages msg = new BuriedMessages();
- msg.setUserId(userId);
- msg.setEventType(eventType);
- msg.setPagePath(pagePath);
- msg.setEventTime(System.currentTimeMillis());
- msg.setStatusCode(STATUS_INIT); // 默认初始状态
- return msg;
- }
-
- /**
- * 添加扩展数据
- */
+
public BuriedMessages addExtraData(String key, Object value) {
if (this.extraData == null) {
this.extraData = new HashMap<>();
@@ -68,4 +51,17 @@ public class BuriedMessages implements Serializable {
this.extraData.put(key, value);
return this;
}
+
+ /**
+ * 增加重试计数
+ */
+ public void incrementRetryCount() {
+ if (this.retryCount == null) {
+ this.retryCount = 0;
+ }
+ this.retryCount++;
+ }
+
+
+
}
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java
index 7130e70..648691c 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java
@@ -1,14 +1,13 @@
package com.tashow.cloud.app.mq.producer.buriedPoint;
import com.tashow.cloud.app.mq.message.BuriedMessages;
+import com.tashow.cloud.app.mapper.BuriedPointMapper;
+import com.tashow.cloud.common.util.json.JsonUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.UUID;
/**
@@ -16,34 +15,49 @@ import java.util.UUID;
*/
@Slf4j
@Component
-public class BuriedPointProducer {
+public class BuriedPointProducer implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
+ @Autowired
+ private BuriedPointMapper buriedPointMapper;
+
/**
- * 异步发送完整的埋点消息,并确保消息已被broker接收
+ * 异步发送完整的埋点消息(生成新的correlationId)
*/
@SneakyThrows
public void asyncSendMessage(BuriedMessages message) {
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- // final CompletableFuture confirmFuture = new CompletableFuture<>();
- log.info("[埋点] 异步准备发送消息: {}", message);
- correlationData.getFuture().whenComplete((confirm, ex) -> {
- log.info("[埋点] 异步消息发送确认回调: {}", message);
- if (ex != null) {
- log.error("[埋点] 异步消息发送异常: {}", ex.getMessage(), ex);
- // confirmFuture.completeExceptionally(ex);
- } else if (confirm != null && confirm.isAck()) {
- log.info("[埋点] 异步消息发送成功: {}", message);
- // confirmFuture.complete(true);
- } else {
- log.warn("[埋点] 异步消息发送未被ACK");
-//confirmFuture.complete(false);
- }
- });
+ String correlationId = UUID.randomUUID().toString();
+ asyncSendMessage(message, correlationId);
+ }
+
+ /**
+ * 异步发送完整的埋点消息(使用指定的correlationId)
+ * 用于重试场景,保持原有的correlationId
+ */
+ @SneakyThrows
+ public void asyncSendMessage(BuriedMessages message, String correlationId) {
+ log.info("[埋点] 异步准备发送消息: {}, correlationId: {}", message, correlationId);
+ String messageJson = JsonUtils.toJsonString(message);
+
+ CustomCorrelationData correlationData = new CustomCorrelationData(correlationId, messageJson);
+
rabbitTemplate.convertAndSend(BuriedMessages.EXCHANGE, BuriedMessages.ROUTING_KEY, message, correlationData);
- log.info("[埋点] 异步消息发送完成: {}", message);
- // return null;
+ log.info("[埋点] 异步消息发送完成: {}, 状态: {}, 重试次数: {}, correlationId: {}",
+ message.getId(), message.getStatusCode(), message.getRetryCount(), correlationId);
+ }
+
+
+ /**
+ * 确认消息是否成功发送到Broker的回调方法
+ */
+ @Override
+ public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+ if (ack) {
+ log.info("[埋点] 消息发送确认成功: {}", correlationData.getId());
+ } else {
+ log.error("[埋点] 消息发送确认失败: {}, 原因: {}", correlationData.getId(), cause);
+ }
}
}
\ No newline at end of file
diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/config/SecurityConfiguration.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/config/SecurityConfiguration.java
index 693e32b..e84bc72 100644
--- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/config/SecurityConfiguration.java
+++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/config/SecurityConfiguration.java
@@ -20,7 +20,6 @@ public class SecurityConfiguration {
@Bean("infraAuthorizeRequestsCustomizer")
public AuthorizeRequestsCustomizer authorizeRequestsCustomizer() {
return new AuthorizeRequestsCustomizer() {
-
@Override
public void customize(AuthorizeHttpRequestsConfigurer.AuthorizationManagerRequestMatcherRegistry registry) {
// Swagger 接口文档