提交
This commit is contained in:
@@ -45,23 +45,6 @@
|
|||||||
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
|
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>cn.dev33</groupId>
|
|
||||||
<artifactId>sa-token-reactor-spring-boot3-starter</artifactId>
|
|
||||||
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!-- Sa-Token 整合 Redis (使用 jackson 序列化方式) -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>cn.dev33</groupId>
|
|
||||||
<artifactId>sa-token-redis-jackson</artifactId>
|
|
||||||
<version>1.42.0</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.commons</groupId>
|
|
||||||
<artifactId>commons-pool2</artifactId>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!-- Registry 注册中心相关 -->
|
<!-- Registry 注册中心相关 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.alibaba.cloud</groupId>
|
<groupId>com.alibaba.cloud</groupId>
|
||||||
|
|||||||
@@ -1,43 +0,0 @@
|
|||||||
package com.tashow.cloud.gateway.filter.security;
|
|
||||||
|
|
||||||
import cn.dev33.satoken.reactor.filter.SaReactorFilter;
|
|
||||||
import cn.dev33.satoken.router.SaRouter;
|
|
||||||
import cn.dev33.satoken.stp.StpUtil;
|
|
||||||
import cn.dev33.satoken.util.SaResult;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* [Sa-Token 权限认证] 配置类
|
|
||||||
* @author click33
|
|
||||||
*/
|
|
||||||
@Configuration
|
|
||||||
public class SaTokenConfigure {
|
|
||||||
// 注册 Sa-Token全局过滤器
|
|
||||||
@Bean
|
|
||||||
public SaReactorFilter getSaReactorFilter() {
|
|
||||||
return new SaReactorFilter()
|
|
||||||
// 拦截地址
|
|
||||||
.addInclude("/**") /* 拦截全部path */
|
|
||||||
// 开放地址
|
|
||||||
.addExclude("/favicon.ico")
|
|
||||||
// 鉴权方法:每次访问进入
|
|
||||||
.setAuth(obj -> {
|
|
||||||
// 登录校验 -- 拦截所有路由,并排除/user/doLogin 用于开放登录
|
|
||||||
SaRouter.match("/**", "/user/doLogin", r -> StpUtil.checkLogin());
|
|
||||||
|
|
||||||
// 权限认证 -- 不同模块, 校验不同权限
|
|
||||||
SaRouter.match("/user/**", r -> StpUtil.checkPermission("user"));
|
|
||||||
SaRouter.match("/admin/**", r -> StpUtil.checkPermission("admin"));
|
|
||||||
SaRouter.match("/goods/**", r -> StpUtil.checkPermission("goods"));
|
|
||||||
SaRouter.match("/orders/**", r -> StpUtil.checkPermission("orders"));
|
|
||||||
|
|
||||||
// 更多匹配 ... */
|
|
||||||
})
|
|
||||||
// 异常处理方法:每次setAuth函数出现异常时进入
|
|
||||||
.setError(e -> {
|
|
||||||
return SaResult.error(e.getMessage());
|
|
||||||
})
|
|
||||||
;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,37 +0,0 @@
|
|||||||
package com.tashow.cloud.gateway.filter.security;
|
|
||||||
|
|
||||||
import cn.dev33.satoken.stp.StpInterface;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 自定义权限验证接口扩展
|
|
||||||
*/
|
|
||||||
@Component
|
|
||||||
public class StpInterfaceImpl implements StpInterface {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<String> getPermissionList(Object loginId, String loginType) {
|
|
||||||
// 本 list 仅做模拟,实际项目中要根据具体业务逻辑来查询权限
|
|
||||||
List<String> list = new ArrayList<String>();
|
|
||||||
list.add("101");
|
|
||||||
list.add("user.add");
|
|
||||||
list.add("user.update");
|
|
||||||
list.add("user.get");
|
|
||||||
// list.add("user.delete");
|
|
||||||
list.add("art.*");
|
|
||||||
return list;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<String> getRoleList(Object loginId, String loginType) {
|
|
||||||
// 本 list 仅做模拟,实际项目中要根据具体业务逻辑来查询角色
|
|
||||||
List<String> list = new ArrayList<String>();
|
|
||||||
list.add("admin");
|
|
||||||
list.add("super-admin");
|
|
||||||
return list;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -2,12 +2,14 @@ package com.tashow.cloud.app;
|
|||||||
|
|
||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hello world!
|
* Hello world!
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
|
@EnableScheduling
|
||||||
public class AppServerApplication {
|
public class AppServerApplication {
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
|
|||||||
@@ -1,14 +1,10 @@
|
|||||||
package com.tashow.cloud.app.controller;
|
package com.tashow.cloud.app.controller;
|
||||||
|
import com.tashow.cloud.app.mapper.BuriedPointMapper;
|
||||||
import com.tashow.cloud.app.mq.annotation.BuriedPoint;
|
|
||||||
import com.tashow.cloud.app.mq.mapper.BuriedPointMapper;
|
|
||||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
|
||||||
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
|
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
|
||||||
import jakarta.annotation.security.PermitAll;
|
import jakarta.annotation.security.PermitAll;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package com.tashow.cloud.app.mq.interceptor;
|
package com.tashow.cloud.app.interceptor;
|
||||||
|
|
||||||
import cn.hutool.core.util.IdUtil;
|
import cn.hutool.core.util.IdUtil;
|
||||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||||
@@ -37,22 +37,18 @@ public class BuriedPointInterceptor implements HandlerInterceptor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 开始计时
|
|
||||||
StopWatch stopWatch = new StopWatch();
|
StopWatch stopWatch = new StopWatch();
|
||||||
stopWatch.start();
|
stopWatch.start();
|
||||||
request.setAttribute(ATTRIBUTE_STOPWATCH, stopWatch);
|
request.setAttribute(ATTRIBUTE_STOPWATCH, stopWatch);
|
||||||
|
|
||||||
// 生成请求ID
|
|
||||||
int requestId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE);
|
int requestId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE);
|
||||||
request.setAttribute(ATTRIBUTE_REQUEST_ID, requestId);
|
request.setAttribute(ATTRIBUTE_REQUEST_ID, requestId);
|
||||||
|
|
||||||
// 收集埋点数据
|
|
||||||
HandlerMethod handlerMethod = (HandlerMethod) handler;
|
HandlerMethod handlerMethod = (HandlerMethod) handler;
|
||||||
String method = request.getMethod() + " " + request.getRequestURI()+ JsonUtils.toJsonString(request.getParameterMap());
|
String method = request.getMethod() + " " + request.getRequestURI()+ JsonUtils.toJsonString(request.getParameterMap());
|
||||||
String controllerName = handlerMethod.getBeanType().getSimpleName();
|
String controllerName = handlerMethod.getBeanType().getSimpleName();
|
||||||
String actionName = handlerMethod.getMethod().getName();
|
String actionName = handlerMethod.getMethod().getName();
|
||||||
|
|
||||||
// 创建埋点消息
|
|
||||||
BuriedMessages message = new BuriedMessages();
|
BuriedMessages message = new BuriedMessages();
|
||||||
message.setId(requestId);
|
message.setId(requestId);
|
||||||
message.setEventTime(System.currentTimeMillis());
|
message.setEventTime(System.currentTimeMillis());
|
||||||
@@ -65,7 +61,7 @@ public class BuriedPointInterceptor implements HandlerInterceptor {
|
|||||||
message.setEventType("API_REQUEST_START");
|
message.setEventType("API_REQUEST_START");
|
||||||
message.setPagePath(controllerName + "#" + actionName);
|
message.setPagePath(controllerName + "#" + actionName);
|
||||||
message.setUserAgent(request.getHeader("User-Agent"));
|
message.setUserAgent(request.getHeader("User-Agent"));
|
||||||
message.setStatusCode(BuriedMessages.STATUS_INIT);
|
message.setStatusCode(BuriedMessages.STATUS_PROCESSING);
|
||||||
buriedPointProducer.asyncSendMessage(message);
|
buriedPointProducer.asyncSendMessage(message);
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("[埋点] 收集请求开始数据: {}", message);
|
log.debug("[埋点] 收集请求开始数据: {}", message);
|
||||||
@@ -73,7 +69,6 @@ public class BuriedPointInterceptor implements HandlerInterceptor {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("[埋点] 埋点数据收集异常", e);
|
log.warn("[埋点] 埋点数据收集异常", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,7 +81,6 @@ public class BuriedPointInterceptor implements HandlerInterceptor {
|
|||||||
if (userAttribute != null) {
|
if (userAttribute != null) {
|
||||||
return userAttribute.toString();
|
return userAttribute.toString();
|
||||||
}
|
}
|
||||||
// 返回匿名用户标识
|
|
||||||
return "anonymous";
|
return "anonymous";
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -0,0 +1,14 @@
|
|||||||
|
package com.tashow.cloud.app.mapper;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
|
import com.tashow.cloud.app.model.BuriedPoint;
|
||||||
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
import org.apache.ibatis.annotations.Select;
|
||||||
|
|
||||||
|
@Mapper
|
||||||
|
public interface BuriedPointMapper extends BaseMapper<BuriedPoint> {
|
||||||
|
|
||||||
|
@Select("SELECT * FROM app_burying WHERE event_id = #{eventId} LIMIT 1")
|
||||||
|
BuriedPoint selectByEventId(@Param("eventId") Integer eventId);
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package com.tashow.cloud.app.mq.model;
|
package com.tashow.cloud.app.model;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.annotation.IdType;
|
import com.baomidou.mybatisplus.annotation.IdType;
|
||||||
import com.baomidou.mybatisplus.annotation.TableField;
|
import com.baomidou.mybatisplus.annotation.TableField;
|
||||||
@@ -106,4 +106,10 @@ public class BuriedPoint {
|
|||||||
|
|
||||||
@TableField(value = "status")
|
@TableField(value = "status")
|
||||||
private Integer status;
|
private Integer status;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重试次数
|
||||||
|
*/
|
||||||
|
@TableField(value = "retry_count")
|
||||||
|
private Integer retryCount;
|
||||||
}
|
}
|
||||||
@@ -1,33 +0,0 @@
|
|||||||
package com.tashow.cloud.app.mq.annotation;
|
|
||||||
|
|
||||||
import java.lang.annotation.*;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 埋点注解
|
|
||||||
* 用于标记需要特殊埋点的方法或接口
|
|
||||||
*/
|
|
||||||
@Target({ElementType.METHOD})
|
|
||||||
@Retention(RetentionPolicy.RUNTIME)
|
|
||||||
@Documented
|
|
||||||
public @interface BuriedPoint {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 埋点事件类型
|
|
||||||
*/
|
|
||||||
String eventType() default "";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 埋点描述信息
|
|
||||||
*/
|
|
||||||
String description() default "";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 是否记录方法入参
|
|
||||||
*/
|
|
||||||
boolean recordParams() default false;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 是否记录返回结果
|
|
||||||
*/
|
|
||||||
boolean recordResult() default false;
|
|
||||||
}
|
|
||||||
@@ -1,28 +0,0 @@
|
|||||||
package com.tashow.cloud.app.mq.annotation;
|
|
||||||
|
|
||||||
import java.lang.annotation.*;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 埋点注解
|
|
||||||
* 用于标记需要埋点的方法
|
|
||||||
*/
|
|
||||||
@Target(ElementType.METHOD)
|
|
||||||
@Retention(RetentionPolicy.RUNTIME)
|
|
||||||
@Documented
|
|
||||||
public @interface BuryPoint {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 事件类型
|
|
||||||
*/
|
|
||||||
String eventType() default "METHOD";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 页面路径/功能模块
|
|
||||||
*/
|
|
||||||
String pagePath() default "";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 描述信息
|
|
||||||
*/
|
|
||||||
String description() default "";
|
|
||||||
}
|
|
||||||
@@ -1,223 +0,0 @@
|
|||||||
package com.tashow.cloud.app.mq.aspect;
|
|
||||||
|
|
||||||
import cn.hutool.core.util.IdUtil;
|
|
||||||
import cn.hutool.json.JSONUtil;
|
|
||||||
import com.tashow.cloud.app.mq.annotation.BuriedPoint;
|
|
||||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
|
||||||
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
|
|
||||||
import com.tashow.cloud.common.util.servlet.ServletUtils;
|
|
||||||
import com.tashow.cloud.common.util.spring.SpringUtils;
|
|
||||||
import jakarta.servlet.http.HttpServletRequest;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.aspectj.lang.ProceedingJoinPoint;
|
|
||||||
import org.aspectj.lang.annotation.Around;
|
|
||||||
import org.aspectj.lang.annotation.Aspect;
|
|
||||||
import org.aspectj.lang.annotation.Pointcut;
|
|
||||||
import org.aspectj.lang.reflect.MethodSignature;
|
|
||||||
import org.springframework.core.annotation.Order;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
import org.springframework.web.context.request.RequestContextHolder;
|
|
||||||
import org.springframework.web.context.request.ServletRequestAttributes;
|
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.UnknownHostException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 埋点切面
|
|
||||||
* 处理自定义埋点注解
|
|
||||||
*/
|
|
||||||
@Aspect
|
|
||||||
@Component
|
|
||||||
@Order(10)
|
|
||||||
@Slf4j
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class BuriedPointAspect {
|
|
||||||
/*
|
|
||||||
private final BuriedPointProducer buriedPointProducer;
|
|
||||||
|
|
||||||
@Pointcut("@annotation(com.tashow.cloud.app.mq.annotation.BuriedPoint)")
|
|
||||||
public void buriedPointCut() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Around("buriedPointCut()")
|
|
||||||
public Object around(ProceedingJoinPoint point) throws Throwable {
|
|
||||||
// 获取方法签名
|
|
||||||
MethodSignature signature = (MethodSignature) point.getSignature();
|
|
||||||
Method method = signature.getMethod();
|
|
||||||
|
|
||||||
// 获取注解
|
|
||||||
BuriedPoint buriedPoint = method.getAnnotation(BuriedPoint.class);
|
|
||||||
if (buriedPoint == null) {
|
|
||||||
return point.proceed();
|
|
||||||
}
|
|
||||||
|
|
||||||
// 生成埋点ID
|
|
||||||
int buriedId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE);
|
|
||||||
|
|
||||||
// 记录开始埋点
|
|
||||||
recordMethodStart(point, method, buriedPoint, buriedId);
|
|
||||||
|
|
||||||
long startTime = System.currentTimeMillis();
|
|
||||||
Object result = null;
|
|
||||||
Exception exception = null;
|
|
||||||
try {
|
|
||||||
// 执行原方法
|
|
||||||
result = point.proceed();
|
|
||||||
return result;
|
|
||||||
} catch (Exception e) {
|
|
||||||
exception = e;
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
long duration = System.currentTimeMillis() - startTime;
|
|
||||||
// 记录结束埋点
|
|
||||||
recordMethodEnd(point, method, buriedPoint, result, exception, duration, buriedId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*//**
|
|
||||||
* 记录方法开始埋点
|
|
||||||
*//*
|
|
||||||
private void recordMethodStart(ProceedingJoinPoint point, Method method, BuriedPoint buriedPoint, int buriedId) {
|
|
||||||
try {
|
|
||||||
// 获取类和方法名
|
|
||||||
String className = point.getTarget().getClass().getSimpleName();
|
|
||||||
String methodName = method.getName();
|
|
||||||
|
|
||||||
// 创建埋点消息
|
|
||||||
BuriedMessages message = new BuriedMessages();
|
|
||||||
message.setId(buriedId);
|
|
||||||
message.setEventTime(System.currentTimeMillis());
|
|
||||||
message.setService(SpringUtils.getApplicationName());
|
|
||||||
message.setMethod(className + "." + methodName);
|
|
||||||
message.setEventType(buriedPoint.eventType().isEmpty() ? "METHOD_START" : buriedPoint.eventType() + "_START");
|
|
||||||
message.setPagePath(className + "#" + methodName);
|
|
||||||
|
|
||||||
// 设置用户ID和其他信息
|
|
||||||
setRequestInfo(message);
|
|
||||||
|
|
||||||
// 如果需要记录入参
|
|
||||||
if (buriedPoint.recordParams() && point.getArgs() != null && point.getArgs().length > 0) {
|
|
||||||
// 限制参数长度,避免埋点数据过大
|
|
||||||
String params = JSONUtil.toJsonStr(point.getArgs());
|
|
||||||
if (params.length() > 500) {
|
|
||||||
params = params.substring(0, 500) + "...";
|
|
||||||
}
|
|
||||||
message.addExtraData("params", params);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 记录描述信息
|
|
||||||
if (!buriedPoint.description().isEmpty()) {
|
|
||||||
message.addExtraData("description", buriedPoint.description());
|
|
||||||
}
|
|
||||||
|
|
||||||
// 异步发送埋点数据
|
|
||||||
buriedPointProducer.asyncSendMessage(message);
|
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug("[埋点] 方法开始: {}.{}, 事件类型: {}", className, methodName, message.getEventType());
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("[埋点] 记录方法开始埋点异常", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*//**
|
|
||||||
* 记录方法结束埋点
|
|
||||||
*//*
|
|
||||||
private void recordMethodEnd(ProceedingJoinPoint point, Method method, BuriedPoint buriedPoint,
|
|
||||||
Object result, Exception exception, long duration, int buriedId) {
|
|
||||||
try {
|
|
||||||
// 获取类和方法名
|
|
||||||
String className = point.getTarget().getClass().getSimpleName();
|
|
||||||
String methodName = method.getName();
|
|
||||||
|
|
||||||
// 创建埋点消息
|
|
||||||
BuriedMessages message = new BuriedMessages();
|
|
||||||
message.setId(buriedId);
|
|
||||||
message.setEventTime(System.currentTimeMillis());
|
|
||||||
message.setService(SpringUtils.getApplicationName());
|
|
||||||
message.setMethod(className + "." + methodName);
|
|
||||||
message.setEventType(buriedPoint.eventType().isEmpty() ? "METHOD_END" : buriedPoint.eventType() + "_END");
|
|
||||||
message.setPagePath(className + "#" + methodName);
|
|
||||||
message.setDuration(duration);
|
|
||||||
|
|
||||||
// 设置用户ID和其他信息
|
|
||||||
setRequestInfo(message);
|
|
||||||
|
|
||||||
// 如果需要记录结果
|
|
||||||
if (buriedPoint.recordResult() && result != null && exception == null) {
|
|
||||||
// 限制结果长度,避免埋点数据过大
|
|
||||||
String resultStr = JSONUtil.toJsonStr(result);
|
|
||||||
if (resultStr.length() > 500) {
|
|
||||||
resultStr = resultStr.substring(0, 500) + "...";
|
|
||||||
}
|
|
||||||
message.addExtraData("result", resultStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 如果有异常,记录异常信息
|
|
||||||
if (exception != null) {
|
|
||||||
message.setErrorMessage(exception.getMessage());
|
|
||||||
message.addExtraData("exceptionClass", exception.getClass().getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
// 异步发送埋点数据
|
|
||||||
buriedPointProducer.asyncSendMessage(message);
|
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug("[埋点] 方法结束: {}.{}, 事件类型: {}, 耗时: {}ms",
|
|
||||||
className, methodName, message.getEventType(), duration);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("[埋点] 记录方法结束埋点异常", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*//**
|
|
||||||
* 设置请求相关信息
|
|
||||||
*//*
|
|
||||||
private void setRequestInfo(BuriedMessages message) {
|
|
||||||
try {
|
|
||||||
// 获取当前请求信息
|
|
||||||
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
|
|
||||||
if (attributes != null) {
|
|
||||||
HttpServletRequest request = attributes.getRequest();
|
|
||||||
|
|
||||||
// 设置用户ID和其他请求信息
|
|
||||||
message.setUserId(getUserId(request));
|
|
||||||
message.setSessionId(request.getSession().getId());
|
|
||||||
message.setClientIp(ServletUtils.getClientIP(request));
|
|
||||||
message.setUserAgent(request.getHeader("User-Agent"));
|
|
||||||
}
|
|
||||||
|
|
||||||
// 设置服务器IP
|
|
||||||
message.setServerIp(getServerIp());
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("[埋点] 设置请求信息异常", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*//**
|
|
||||||
* 获取当前登录用户ID
|
|
||||||
*//*
|
|
||||||
private String getUserId(HttpServletRequest request) {
|
|
||||||
// 这里需要根据项目实际的用户体系来获取用户ID
|
|
||||||
Object userAttribute = request.getSession().getAttribute("USER_ID");
|
|
||||||
if (userAttribute != null) {
|
|
||||||
return userAttribute.toString();
|
|
||||||
}
|
|
||||||
return "anonymous";
|
|
||||||
}
|
|
||||||
|
|
||||||
*//**
|
|
||||||
* 获取服务器IP
|
|
||||||
*//*
|
|
||||||
private String getServerIp() {
|
|
||||||
try {
|
|
||||||
return InetAddress.getLocalHost().getHostAddress();
|
|
||||||
} catch (UnknownHostException e) {
|
|
||||||
return "unknown";
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
}
|
|
||||||
@@ -1,12 +1,12 @@
|
|||||||
package com.tashow.cloud.app.mq.config;
|
package com.tashow.cloud.app.mq.config;
|
||||||
import com.tashow.cloud.app.mq.interceptor.BuriedPointInterceptor;
|
import com.tashow.cloud.app.interceptor.BuriedPointInterceptor;
|
||||||
|
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
|
||||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||||
|
import com.tashow.cloud.app.model.BuriedPointFailRecord;
|
||||||
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
|
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
|
||||||
|
import com.tashow.cloud.app.mq.producer.buriedPoint.CustomCorrelationData;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.amqp.core.Binding;
|
import org.springframework.amqp.core.*;
|
||||||
import org.springframework.amqp.core.BindingBuilder;
|
|
||||||
import org.springframework.amqp.core.DirectExchange;
|
|
||||||
import org.springframework.amqp.core.Queue;
|
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
@@ -14,6 +14,8 @@ import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
|
|||||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
|
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import java.util.Date;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 埋点功能配置类
|
* 埋点功能配置类
|
||||||
@@ -25,42 +27,99 @@ public class BuriedPointConfiguration implements WebMvcConfigurer {
|
|||||||
|
|
||||||
private final BuriedPointProducer buriedPointProducer;
|
private final BuriedPointProducer buriedPointProducer;
|
||||||
private final RabbitTemplate rabbitTemplate;
|
private final RabbitTemplate rabbitTemplate;
|
||||||
|
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RabbitTemplate初始化配置
|
* RabbitTemplate初始化配置
|
||||||
* 确保回调正确配置,以实现消息可靠性
|
|
||||||
*/
|
*/
|
||||||
// @PostConstruct
|
@PostConstruct
|
||||||
// public void initRabbitTemplate() {
|
public RabbitTemplate initRabbitTemplate() {
|
||||||
// log.info("[埋点配置] 初始化RabbitTemplate: {}", rabbitTemplate);
|
log.info("[埋点配置] 初始化RabbitTemplate: {}", rabbitTemplate);
|
||||||
// rabbitTemplate.setMandatory(true);
|
rabbitTemplate.setMandatory(true);
|
||||||
// rabbitTemplate.setReturnsCallback(returned -> {
|
rabbitTemplate.setReturnsCallback(returned -> {
|
||||||
// log.error("[埋点配置] 消息路由失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
|
log.error("[埋点配置] 消息路由失败: exchange={}, routingKey={}, replyCode={}, replyText={}, ={}",
|
||||||
// returned.getExchange(),
|
returned.getExchange(),
|
||||||
// returned.getRoutingKey(),
|
returned.getRoutingKey(),
|
||||||
// returned.getReplyCode(),
|
returned.getReplyCode(),
|
||||||
// returned.getReplyText(),
|
returned.getReplyText(),
|
||||||
// new String(returned.getMessage().getBody()));
|
new String(returned.getMessage().getBody()));
|
||||||
// });
|
|
||||||
// rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
|
saveFailRecord(
|
||||||
// if (ack) {
|
returned.getMessage().getMessageProperties().getCorrelationId(),
|
||||||
// log.debug("[埋点配置] 消息成功发送到交换机: {}", correlationData);
|
returned.getExchange(),
|
||||||
// } else {
|
returned.getRoutingKey(),
|
||||||
// log.error("[埋点配置] 消息发送到交换机失败: cause={}, correlationData={}", cause, correlationData);
|
"路由失败: " + returned.getReplyText(),
|
||||||
// }
|
new String(returned.getMessage().getBody())
|
||||||
// });
|
);
|
||||||
//
|
});
|
||||||
// // 验证配置
|
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
|
||||||
// if (rabbitTemplate.isConfirmListener()) {
|
if (ack) {
|
||||||
// log.info("[埋点配置] 确认回调已正确配置");
|
log.info("[埋点配置] 消息成功发送到交换机: {}", correlationData.getId());
|
||||||
// } else {
|
} else {
|
||||||
// log.error("[埋点配置] 确认回调配置失败,请检查RabbitMQ配置!");
|
log.error("[埋点配置] 消息发送到交换机失败: cause={}, correlationData={}", cause, correlationData);
|
||||||
// }
|
CustomCorrelationData customData = (CustomCorrelationData) correlationData;
|
||||||
// }
|
String messageContent = customData.getMessageContent();
|
||||||
|
saveFailRecord(
|
||||||
|
correlationData.getId(),
|
||||||
|
BuriedMessages.EXCHANGE,
|
||||||
|
BuriedMessages.ROUTING_KEY,
|
||||||
|
cause,
|
||||||
|
messageContent
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (rabbitTemplate.isConfirmListener()) {
|
||||||
|
log.info("[埋点配置] 确认回调已正确配置");
|
||||||
|
} else {
|
||||||
|
log.error("[埋点配置] 确认回调配置失败");
|
||||||
|
}
|
||||||
|
return rabbitTemplate;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 创建埋点队列
|
* 保存消息发送失败记录
|
||||||
*/
|
*/
|
||||||
|
private void saveFailRecord(String correlationId, String exchange, String routingKey, String cause, String messageContent) {
|
||||||
|
try {
|
||||||
|
log.info("[埋点配置] 保存发送失败记录: correlationId={}", correlationId);
|
||||||
|
|
||||||
|
// 先查询是否已存在记录
|
||||||
|
LambdaQueryWrapper<BuriedPointFailRecord> queryWrapper = new LambdaQueryWrapper<>();
|
||||||
|
queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
|
||||||
|
BuriedPointFailRecord existingRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
|
||||||
|
|
||||||
|
if (existingRecord != null) {
|
||||||
|
// 已存在记录,执行更新
|
||||||
|
log.info("[埋点配置] 发现已有失败记录,将更新: {}", correlationId);
|
||||||
|
existingRecord.setExchange(exchange);
|
||||||
|
existingRecord.setRoutingKey(routingKey);
|
||||||
|
existingRecord.setCause(cause);
|
||||||
|
existingRecord.setMessageContent(messageContent);
|
||||||
|
existingRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
|
||||||
|
existingRecord.setUpdateTime(new Date());
|
||||||
|
buriedPointFailRecordMapper.updateById(existingRecord);
|
||||||
|
log.info("[埋点配置] 发送失败记录已更新: correlationId={}", correlationId);
|
||||||
|
} else {
|
||||||
|
// 不存在记录,执行插入
|
||||||
|
BuriedPointFailRecord failRecord = new BuriedPointFailRecord();
|
||||||
|
failRecord.setCorrelationId(correlationId);
|
||||||
|
failRecord.setExchange(exchange);
|
||||||
|
failRecord.setRoutingKey(routingKey);
|
||||||
|
failRecord.setCause(cause);
|
||||||
|
failRecord.setMessageContent(messageContent);
|
||||||
|
failRecord.setRetryCount(0);
|
||||||
|
failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
|
||||||
|
failRecord.setCreateTime(new Date());
|
||||||
|
failRecord.setUpdateTime(new Date());
|
||||||
|
|
||||||
|
buriedPointFailRecordMapper.insert(failRecord);
|
||||||
|
log.info("[埋点配置] 发送失败记录已保存: correlationId={}", correlationId);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[埋点配置] 保存发送失败记录异常", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Queue buriedPointQueue() {
|
public Queue buriedPointQueue() {
|
||||||
return new Queue(BuriedMessages.QUEUE, true, false, false);
|
return new Queue(BuriedMessages.QUEUE, true, false, false);
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
/*
|
|
||||||
package com.tashow.cloud.app.mq.consumer.buriedPoint;
|
package com.tashow.cloud.app.mq.consumer.buriedPoint;
|
||||||
import com.tashow.cloud.app.mq.mapper.BuriedPointMapper;
|
import com.tashow.cloud.app.mapper.BuriedPointMapper;
|
||||||
|
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
|
||||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||||
import com.tashow.cloud.app.mq.model.BuriedPoint;
|
import com.tashow.cloud.app.model.BuriedPoint;
|
||||||
|
import com.tashow.cloud.app.model.BuriedPointFailRecord;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||||
@@ -11,16 +12,11 @@ import org.springframework.amqp.support.AmqpHeaders;
|
|||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.messaging.handler.annotation.Header;
|
import org.springframework.messaging.handler.annotation.Header;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.util.StringUtils;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
*/
|
import org.springframework.dao.DuplicateKeyException;
|
||||||
/**
|
import com.tashow.cloud.common.util.json.JsonUtils;
|
||||||
* 埋点消息消费者
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
* 将埋点数据存储到数据库
|
|
||||||
*//*
|
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@RabbitListener(queues = BuriedMessages.QUEUE)
|
@RabbitListener(queues = BuriedMessages.QUEUE)
|
||||||
@@ -29,90 +25,220 @@ import com.rabbitmq.client.Channel;
|
|||||||
public class BuriedPointConsumer {
|
public class BuriedPointConsumer {
|
||||||
|
|
||||||
private final BuriedPointMapper buriedPointMapper;
|
private final BuriedPointMapper buriedPointMapper;
|
||||||
|
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
|
||||||
|
|
||||||
@Value("${spring.application.name:tashow-app}")
|
@Value("${spring.application.name:tashow-app}")
|
||||||
private String applicationName;
|
private String applicationName;
|
||||||
|
|
||||||
*/
|
private static final int MAX_RETRY_ALLOWED = 1;
|
||||||
/**
|
|
||||||
* 处理埋点消息
|
|
||||||
*//*
|
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
public void onMessage(BuriedMessages message,
|
public void onMessage(BuriedMessages message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
|
||||||
Channel channel,
|
Integer dbRetryCount = getActualRetryCount(message);
|
||||||
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
|
message.setRetryCount(dbRetryCount);
|
||||||
try {
|
if (message.getRetryCount() != null && message.getRetryCount() >= MAX_RETRY_ALLOWED) {
|
||||||
log.info("[埋点消费者] 收到埋点消息: {}", message);
|
message.setStatusCode(BuriedMessages.STATUS_ERROR);
|
||||||
|
message.addExtraData("errorMessage", "已达到最大重试次数");
|
||||||
// 确保事件ID不为空
|
saveToFailRecord(message, "已达到最大重试次数");
|
||||||
if (message.getId() == null) {
|
safeChannelAck(channel, deliveryTag);
|
||||||
message.setId((int)(System.currentTimeMillis() % Integer.MAX_VALUE));
|
return;
|
||||||
log.warn("[埋点消费者] 消息中的事件ID为空,已自动生成: {}", message.getId());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.info("[埋点消费者] 收到埋点消息: {}, 当前重试次数: {}/{}", message, message.getRetryCount(), MAX_RETRY_ALLOWED);
|
||||||
|
|
||||||
|
message.setStatusCode(BuriedMessages.STATUS_PROCESSING);
|
||||||
|
log.info("[埋点消费者] 消息状态更新为处理中(STATUS_PROCESSING): {}", message.getId());
|
||||||
|
try {
|
||||||
|
/* if(true){
|
||||||
|
throw new RuntimeException("测试异常");
|
||||||
|
}*/
|
||||||
saveToDatabase(message);
|
saveToDatabase(message);
|
||||||
channel.basicAck(deliveryTag, false);
|
message.setStatusCode(BuriedMessages.STATUS_SUCCESS);
|
||||||
log.info("[埋点消费者] 消息处理成功,已确认");
|
updateMessageStatus(message);
|
||||||
|
log.info("[埋点消费者] 消息处理成功,状态已更新为成功(STATUS_SUCCESS): {}", message.getId());
|
||||||
|
safeChannelAck(channel, deliveryTag);
|
||||||
|
} catch (DuplicateKeyException e) {
|
||||||
|
log.warn("[埋点消费者] 消息已被处理过,直接确认: {}, 错误: {}", message.getId(), e.getMessage());
|
||||||
|
safeChannelAck(channel, deliveryTag);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
message.setStatusCode(BuriedMessages.STATUS_ERROR);
|
||||||
|
message.addExtraData("errorMessage", e.getMessage());
|
||||||
|
log.error("[埋点消费者] 消息处理失败: {}, 错误: {}", message.getId(), e.getMessage());
|
||||||
|
message.incrementRetryCount();
|
||||||
|
updateRetryCount(message);
|
||||||
|
if (message.getRetryCount() >= MAX_RETRY_ALLOWED) {
|
||||||
|
saveToDatabase(message);
|
||||||
|
log.warn("[埋点消费者] 消息已达到最大重试次数: {}, 确认消息并保存到失败记录表", message.getRetryCount());
|
||||||
|
saveToFailRecord(message, e.getMessage());
|
||||||
|
safeChannelAck(channel, deliveryTag);
|
||||||
|
} else {
|
||||||
|
log.info("[埋点消费者] 消息将重新入队重试: {}, 当前重试次数: {}", message.getId(), message.getRetryCount());
|
||||||
|
safeChannelNack(channel, deliveryTag, false, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Integer getActualRetryCount(BuriedMessages message) {
|
||||||
try {
|
try {
|
||||||
channel.basicNack(deliveryTag, false, true);
|
BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId());
|
||||||
} catch (IOException ex) {
|
if (buriedPoint != null && buriedPoint.getRetryCount() != null) {
|
||||||
log.error("[埋点消费者] 拒绝消息失败", ex);
|
if ((buriedPoint.getStatus() == BuriedMessages.STATUS_ERROR ||
|
||||||
|
buriedPoint.getStatus() == BuriedMessages.STATUS_PROCESSING)) {
|
||||||
|
log.info("[埋点消费者] 检测到消息可能因服{}", message.getId());
|
||||||
|
return buriedPoint.getRetryCount() - 1;
|
||||||
|
}
|
||||||
|
return buriedPoint.getRetryCount();
|
||||||
|
} else {
|
||||||
|
String correlationId = String.valueOf(message.getId());
|
||||||
|
LambdaQueryWrapper<BuriedPointFailRecord> queryWrapper = new LambdaQueryWrapper<>();
|
||||||
|
queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
|
||||||
|
BuriedPointFailRecord failRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
|
||||||
|
return failRecord!=null? failRecord.getRetryCount():0;
|
||||||
|
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("[埋点消费者] 获取消息重试次数失败: {}", e.getMessage());
|
||||||
|
throw new RuntimeException("获取消息重试次数失败", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void safeChannelAck(Channel channel, long deliveryTag) {
|
||||||
|
try {
|
||||||
|
channel.basicAck(deliveryTag, false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[埋点消费者] 确认消息失败: {}", e.getMessage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void safeChannelNack(Channel channel, long deliveryTag, boolean multiple, boolean requeue) {
|
||||||
|
try {
|
||||||
|
channel.basicNack(deliveryTag, multiple, requeue);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[埋点消费者] 拒绝消息失败: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateRetryCount(BuriedMessages message) {
|
||||||
|
try {
|
||||||
|
BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId());
|
||||||
|
if (buriedPoint != null) {
|
||||||
|
buriedPoint.setRetryCount(message.getRetryCount());
|
||||||
|
buriedPoint.setUpdateTime(new Date());
|
||||||
|
buriedPointMapper.updateById(buriedPoint);
|
||||||
|
} else {
|
||||||
|
String correlationId = String.valueOf(message.getId());
|
||||||
|
LambdaQueryWrapper<BuriedPointFailRecord> queryWrapper = new LambdaQueryWrapper<>();
|
||||||
|
queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
|
||||||
|
BuriedPointFailRecord failRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
|
||||||
|
if (failRecord != null) {
|
||||||
|
failRecord.setRetryCount(message.getRetryCount());
|
||||||
|
failRecord.setUpdateTime(new Date());
|
||||||
|
failRecord.setMessageContent(JsonUtils.toJsonString(message));
|
||||||
|
buriedPointFailRecordMapper.updateById(failRecord);
|
||||||
|
} else {
|
||||||
|
// 记录或创建新的失败记录
|
||||||
|
log.warn("[埋点消费者] 未找到埋点记录和失败记录, 事件ID: {}, 准备创建失败记录", message.getId());
|
||||||
|
saveToFailRecord(message, "未找到原始埋点记录");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[埋点消费者] 更新重试次数失败: {}, 错误: {}", message.getId(), e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
*/
|
private void updateMessageStatus(BuriedMessages message) {
|
||||||
/**
|
try {
|
||||||
* 将埋点数据保存到数据库
|
BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId());
|
||||||
*//*
|
buriedPoint.setStatus(message.getStatusCode());
|
||||||
|
buriedPoint.setUpdateTime(new Date());
|
||||||
|
buriedPoint.setRetryCount(message.getRetryCount());
|
||||||
|
buriedPointMapper.updateById(buriedPoint);
|
||||||
|
log.debug("[埋点消费者] 已更新埋点状态, 事件ID: {}, 新状态: {}, 重试次数: {}", message.getId(), message.getStatusCode(), message.getRetryCount());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[埋点消费者] 更新埋点状态失败: {}, 错误: {}", message.getId(), e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void saveToDatabase(BuriedMessages message) {
|
private boolean saveToDatabase(BuriedMessages message) {
|
||||||
try {
|
try {
|
||||||
log.debug("[埋点消费者] 准备保存埋点数据,事件ID: {}", message.getId());
|
log.debug("[埋点消费者] 准备保存埋点数据,事件ID: {}", message.getId());
|
||||||
|
BuriedPoint existingPoint = buriedPointMapper.selectByEventId(message.getId());
|
||||||
|
if (existingPoint != null) {
|
||||||
|
existingPoint.setStatus(message.getStatusCode());
|
||||||
|
existingPoint.setUpdateTime(new Date());
|
||||||
|
if (message.getRetryCount() != null) {
|
||||||
|
int newRetryCount = Math.max(existingPoint.getRetryCount(), message.getRetryCount());
|
||||||
|
existingPoint.setRetryCount(newRetryCount);
|
||||||
|
message.setRetryCount(newRetryCount);
|
||||||
|
}
|
||||||
|
int result = buriedPointMapper.updateById(existingPoint);
|
||||||
|
return result > 0;
|
||||||
|
}
|
||||||
|
|
||||||
// 转换消息为实体
|
|
||||||
BuriedPoint buriedPoint = new BuriedPoint();
|
BuriedPoint buriedPoint = new BuriedPoint();
|
||||||
|
|
||||||
// 设置必填字段,确保不为空
|
|
||||||
buriedPoint.setEventId(message.getId());
|
buriedPoint.setEventId(message.getId());
|
||||||
buriedPoint.setEventTime(message.getEventTime());
|
buriedPoint.setEventTime(message.getEventTime());
|
||||||
|
buriedPoint.setUserId(message.getUserId());
|
||||||
// 获取真实用户ID,避免使用默认anonymous
|
buriedPoint.setEventType(message.getEventType());
|
||||||
String userId = message.getUserId();
|
|
||||||
buriedPoint.setUserId(StringUtils.hasText(userId) && !"null".equals(userId) ? userId : "anonymous");
|
|
||||||
|
|
||||||
String eventType = message.getEventType();
|
|
||||||
buriedPoint.setEventType(eventType);
|
|
||||||
buriedPoint.setService(applicationName);
|
buriedPoint.setService(applicationName);
|
||||||
|
|
||||||
// 设置method字段,确保获取真实方法名
|
|
||||||
buriedPoint.setMethod(message.getMethod());
|
buriedPoint.setMethod(message.getMethod());
|
||||||
buriedPoint.setSessionId(message.getSessionId());
|
buriedPoint.setSessionId(message.getSessionId());
|
||||||
buriedPoint.setClientIp(message.getClientIp());
|
buriedPoint.setClientIp(message.getClientIp());
|
||||||
buriedPoint.setServerIp(message.getServerIp());
|
buriedPoint.setServerIp(message.getServerIp());
|
||||||
|
buriedPoint.setStatus(message.getStatusCode());
|
||||||
// 设置其他字段
|
buriedPoint.setRetryCount(message.getRetryCount());
|
||||||
buriedPoint.setPagePath(message.getPagePath());
|
buriedPoint.setPagePath(message.getPagePath());
|
||||||
buriedPoint.setElementId(message.getElementId());
|
buriedPoint.setElementId(message.getElementId());
|
||||||
buriedPoint.setDuration(message.getDuration());
|
buriedPoint.setDuration(message.getDuration());
|
||||||
|
|
||||||
buriedPoint.setCreateTime(new Date());
|
buriedPoint.setCreateTime(new Date());
|
||||||
|
buriedPoint.setUpdateTime(new Date());
|
||||||
log.debug("[埋点消费者] 埋点实体数据: eventId={}, eventType={}, userId={}, service={}, method={}",
|
log.debug("[埋点消费者] 埋点实体数据: eventId={}, eventType={}, userId={}, service={}, method={}, status={}, retryCount={}", buriedPoint.getEventId(), buriedPoint.getEventType(), buriedPoint.getUserId(), buriedPoint.getService(), buriedPoint.getMethod(), buriedPoint.getStatus(), buriedPoint.getRetryCount());
|
||||||
buriedPoint.getEventId(), buriedPoint.getEventType(),
|
buriedPointMapper.insert(buriedPoint);
|
||||||
buriedPoint.getUserId(), buriedPoint.getService(), buriedPoint.getMethod());
|
log.info("[埋点消费者] 埋点数据已保存到数据库, 事件ID: {}, 状态: {}", message.getId(), message.getStatusCode());
|
||||||
|
return true;
|
||||||
int result = buriedPointMapper.insert(buriedPoint);
|
|
||||||
|
|
||||||
log.info("[埋点消费者] 埋点数据已保存到数据库, 事件ID: {}, 影响行数: {}", message.getId(), result);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[埋点消费者] 保存埋点数据到数据库失败, 事件ID: {}, 错误: {}",
|
log.error("[埋点消费者] 保存埋点数据到数据库失败, 事件ID: {}, 错误: {}", message.getId(), e.getMessage(), e);
|
||||||
message.getId(), e.getMessage(), e);
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} */
|
|
||||||
|
/**
|
||||||
|
* 保存失败记录到BuriedPointFailRecord表
|
||||||
|
*/
|
||||||
|
private void saveToFailRecord(BuriedMessages message, String cause) {
|
||||||
|
try {
|
||||||
|
String correlationId = String.valueOf(message.getId());
|
||||||
|
LambdaQueryWrapper<BuriedPointFailRecord> queryWrapper = new LambdaQueryWrapper<>();
|
||||||
|
queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId);
|
||||||
|
BuriedPointFailRecord existingRecord = buriedPointFailRecordMapper.selectOne(queryWrapper);
|
||||||
|
|
||||||
|
if (existingRecord != null) {
|
||||||
|
log.info("[埋点消费者] 发现已有失败记录,将更新: {}", correlationId);
|
||||||
|
existingRecord.setExchange(BuriedMessages.EXCHANGE);
|
||||||
|
existingRecord.setRoutingKey(BuriedMessages.ROUTING_KEY);
|
||||||
|
existingRecord.setCause(cause);
|
||||||
|
existingRecord.setMessageContent(JsonUtils.toJsonString(message));
|
||||||
|
existingRecord.setRetryCount(message.getRetryCount());
|
||||||
|
existingRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
|
||||||
|
existingRecord.setUpdateTime(new Date());
|
||||||
|
buriedPointFailRecordMapper.updateById(existingRecord);
|
||||||
|
log.info("[埋点消费者] 已更新失败记录, 事件ID: {}", correlationId);
|
||||||
|
} else {
|
||||||
|
BuriedPointFailRecord failRecord = new BuriedPointFailRecord();
|
||||||
|
failRecord.setCorrelationId(correlationId);
|
||||||
|
failRecord.setExchange(BuriedMessages.EXCHANGE);
|
||||||
|
failRecord.setRoutingKey(BuriedMessages.ROUTING_KEY);
|
||||||
|
failRecord.setCause(cause);
|
||||||
|
failRecord.setMessageContent(JsonUtils.toJsonString(message));
|
||||||
|
failRecord.setRetryCount(message.getRetryCount());
|
||||||
|
failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED);
|
||||||
|
failRecord.setCreateTime(new Date());
|
||||||
|
failRecord.setUpdateTime(new Date());
|
||||||
|
buriedPointFailRecordMapper.insert(failRecord);
|
||||||
|
log.info("[埋点消费者] 已将失败消息保存到失败记录表, 事件ID: {}", message.getId());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[埋点消费者] 保存失败记录失败: {}, 错误: {}", message.getId(), e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,56 +0,0 @@
|
|||||||
package com.tashow.cloud.app.mq.docs;
|
|
||||||
|
|
||||||
import com.tashow.cloud.app.mq.annotation.BuriedPoint;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 埋点使用示例
|
|
||||||
*/
|
|
||||||
@Service
|
|
||||||
public class BuriedPointExample {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 使用埋点注解标记的方法
|
|
||||||
* 系统会自动记录该方法的调用情况
|
|
||||||
*/
|
|
||||||
@BuriedPoint(
|
|
||||||
eventType = "BUSINESS_OPERATION",
|
|
||||||
description = "处理业务数据",
|
|
||||||
recordParams = true,
|
|
||||||
recordResult = true
|
|
||||||
)
|
|
||||||
public String processBusiness(String businessId, String operationType) {
|
|
||||||
// 业务处理逻辑
|
|
||||||
return "处理完成: " + businessId;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 如何使用埋点系统:
|
|
||||||
*
|
|
||||||
* 1. 静默埋点 - 无需任何代码
|
|
||||||
* 系统会自动对所有API请求进行埋点,记录请求的开始和结束信息
|
|
||||||
* 包括: 请求时间、路径、参数、响应时间、状态码等
|
|
||||||
*
|
|
||||||
* 2. 注解埋点 - 针对特定方法
|
|
||||||
* 在需要埋点的方法上添加 @BuriedPoint 注解
|
|
||||||
* 可以设置事件类型、描述,以及是否记录参数和返回结果
|
|
||||||
*
|
|
||||||
* 3. 埋点数据流转
|
|
||||||
* 所有埋点数据会通过RabbitMQ发送到消息队列
|
|
||||||
* 可以在消费者端实现数据的存储和分析
|
|
||||||
*
|
|
||||||
* 4. 数据字段说明
|
|
||||||
* id: 埋点事件唯一ID
|
|
||||||
* eventTime: 事件发生时间
|
|
||||||
* service: 服务名称
|
|
||||||
* method: 方法/接口
|
|
||||||
* userId: 用户标识
|
|
||||||
* clientIp: 客户端IP
|
|
||||||
* eventType: 事件类型
|
|
||||||
* duration: 操作耗时
|
|
||||||
* extraData: 额外数据(参数、结果等)
|
|
||||||
*/
|
|
||||||
public void howToUse() {
|
|
||||||
// 示例方法
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,220 +0,0 @@
|
|||||||
# 后端静默埋点实现方案(基于RabbitMQ)
|
|
||||||
|
|
||||||
后端静默埋点是指不依赖前端代码,完全由后端服务收集用户行为和应用性能数据的方案。以下是使用RabbitMQ实现后端静默埋点的详细方案:
|
|
||||||
|
|
||||||
## 一、方案架构
|
|
||||||
|
|
||||||
[业务服务] → [RabbitMQ] → [埋点消费服务] → [数据分析存储]
|
|
||||||
|
|
||||||
## 二、具体实现步骤
|
|
||||||
|
|
||||||
### 1. 埋点数据收集
|
|
||||||
|
|
||||||
在业务服务中需要埋点的位置添加消息发送代码:
|
|
||||||
|
|
||||||
```java
|
|
||||||
// Java示例使用Spring AMQP
|
|
||||||
@Autowired
|
|
||||||
private RabbitTemplate rabbitTemplate;
|
|
||||||
|
|
||||||
public void someBusinessMethod(User user, Action action) {
|
|
||||||
try {
|
|
||||||
// 构造埋点数据
|
|
||||||
Map<String, Object> trackingData = new HashMap<>();
|
|
||||||
trackingData.put("event_time", System.currentTimeMillis());
|
|
||||||
trackingData.put("user_id", user.getId());
|
|
||||||
trackingData.put("action", action.name());
|
|
||||||
trackingData.put("service", "order-service");
|
|
||||||
trackingData.put("ip", getClientIp());
|
|
||||||
|
|
||||||
// 发送到RabbitMQ
|
|
||||||
rabbitTemplate.convertAndSend(
|
|
||||||
"tracking.exchange",
|
|
||||||
"tracking.route",
|
|
||||||
trackingData
|
|
||||||
);
|
|
||||||
} catch (Exception e) {
|
|
||||||
// 埋点异常不应影响主流程
|
|
||||||
log.error("Tracking data send failed", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 2. RabbitMQ配置
|
|
||||||
|
|
||||||
#### 创建专用Exchange和Queue
|
|
||||||
|
|
||||||
```yaml
|
|
||||||
# application.yml配置示例
|
|
||||||
spring:
|
|
||||||
rabbitmq:
|
|
||||||
host: rabbitmq-server
|
|
||||||
port: 5672
|
|
||||||
username: guest
|
|
||||||
password: guest
|
|
||||||
template:
|
|
||||||
exchange: tracking.exchange
|
|
||||||
listener:
|
|
||||||
simple:
|
|
||||||
acknowledge-mode: auto
|
|
||||||
```
|
|
||||||
|
|
||||||
#### 声明Exchange和Queue
|
|
||||||
|
|
||||||
```java
|
|
||||||
@Configuration
|
|
||||||
public class RabbitMQConfig {
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public Exchange trackingExchange() {
|
|
||||||
return ExchangeBuilder.directExchange("tracking.exchange")
|
|
||||||
.durable(true)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public Queue trackingQueue() {
|
|
||||||
return QueueBuilder.durable("tracking.queue")
|
|
||||||
.withArgument("x-message-ttl", 86400000) // 消息存活时间
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public Binding trackingBinding() {
|
|
||||||
return BindingBuilder.bind(trackingQueue())
|
|
||||||
.to(trackingExchange())
|
|
||||||
.with("tracking.route")
|
|
||||||
.noargs();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 3. 埋点数据处理服务
|
|
||||||
|
|
||||||
创建专门的消费者服务处理埋点数据:
|
|
||||||
|
|
||||||
```java
|
|
||||||
@Component
|
|
||||||
public class TrackingDataConsumer {
|
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(TrackingDataConsumer.class);
|
|
||||||
|
|
||||||
@RabbitListener(queues = "tracking.queue")
|
|
||||||
public void processTrackingData(Map<String, Object> trackingData) {
|
|
||||||
try {
|
|
||||||
// 1. 数据校验和补全
|
|
||||||
validateAndEnrichData(trackingData);
|
|
||||||
|
|
||||||
// 2. 数据格式化
|
|
||||||
TrackingEvent event = convertToEvent(trackingData);
|
|
||||||
|
|
||||||
// 3. 数据存储或转发
|
|
||||||
saveToDatabase(event);
|
|
||||||
// 或 sendToAnalyticsPlatform(event);
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Process tracking data failed: {}", trackingData, e);
|
|
||||||
// 可根据需要加入重试或死信队列逻辑
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void validateAndEnrichData(Map<String, Object> data) {
|
|
||||||
if (!data.containsKey("user_id")) {
|
|
||||||
data.put("user_id", "anonymous");
|
|
||||||
}
|
|
||||||
// 补充服务IP等信息
|
|
||||||
data.put("server_ip", getServerIp());
|
|
||||||
}
|
|
||||||
|
|
||||||
// 其他辅助方法...
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 4. 高级配置
|
|
||||||
|
|
||||||
#### 消息可靠性保证
|
|
||||||
|
|
||||||
```java
|
|
||||||
// 发送端确认回调
|
|
||||||
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
|
|
||||||
if (!ack) {
|
|
||||||
log.warn("Tracking message lost, cause: {}", cause);
|
|
||||||
// 可加入本地缓存或日志,后续重发
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// 返回回调(路由失败)
|
|
||||||
rabbitTemplate.setReturnsCallback(returned -> {
|
|
||||||
log.warn("Tracking message returned: {}", returned.getMessage());
|
|
||||||
});
|
|
||||||
```
|
|
||||||
|
|
||||||
#### 批量发送优化
|
|
||||||
|
|
||||||
```java
|
|
||||||
// 使用BatchingRabbitTemplate提高性能
|
|
||||||
@Bean
|
|
||||||
public BatchingRabbitTemplate batchingRabbitTemplate(
|
|
||||||
RabbitTemplate rabbitTemplate,
|
|
||||||
SimpleBatchingStrategy strategy) {
|
|
||||||
|
|
||||||
return new BatchingRabbitTemplate(strategy);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public SimpleBatchingStrategy batchingStrategy() {
|
|
||||||
return new SimpleBatchingStrategy(100, 1024 * 1024, 5000);
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## 三、埋点数据设计建议
|
|
||||||
|
|
||||||
### 1. 通用字段
|
|
||||||
|
|
||||||
event_id: 事件唯一ID
|
|
||||||
|
|
||||||
event_time: 事件时间戳
|
|
||||||
|
|
||||||
service: 服务名称
|
|
||||||
|
|
||||||
method: 方法/接口
|
|
||||||
|
|
||||||
user_id: 用户标识
|
|
||||||
|
|
||||||
session_id: 会话标识
|
|
||||||
|
|
||||||
client_ip: 客户端IP
|
|
||||||
|
|
||||||
server_ip: 服务器IP
|
|
||||||
|
|
||||||
### 2. 业务字段
|
|
||||||
|
|
||||||
action: 操作类型
|
|
||||||
|
|
||||||
target: 操作目标
|
|
||||||
|
|
||||||
params: 业务参数
|
|
||||||
|
|
||||||
result: 操作结果
|
|
||||||
|
|
||||||
cost_time: 耗时(ms)
|
|
||||||
|
|
||||||
## 四、性能优化建议
|
|
||||||
|
|
||||||
异步发送:确保埋点发送不阻塞主业务流程
|
|
||||||
|
|
||||||
本地缓存:网络异常时先缓存到本地,后续重发
|
|
||||||
|
|
||||||
采样率:高流量场景可配置采样率
|
|
||||||
|
|
||||||
消息压缩:大数据量时可启用消息压缩
|
|
||||||
|
|
||||||
独立集群:埋点使用独立的RabbitMQ集群,避免影响业务消息
|
|
||||||
|
|
||||||
## 五、监控与维护
|
|
||||||
|
|
||||||
监控RabbitMQ队列积压情况
|
|
||||||
|
|
||||||
设置埋点数据处理的延迟告警
|
|
||||||
|
|
||||||
定期审计埋点数据的完整性和准确性
|
|
||||||
@@ -1,10 +0,0 @@
|
|||||||
package com.tashow.cloud.app.mq.mapper;
|
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
|
||||||
import com.tashow.cloud.app.mq.model.BuriedPoint;
|
|
||||||
import org.apache.ibatis.annotations.Mapper;
|
|
||||||
|
|
||||||
|
|
||||||
@Mapper
|
|
||||||
public interface BuriedPointMapper extends BaseMapper<BuriedPoint> {
|
|
||||||
}
|
|
||||||
@@ -7,13 +7,12 @@ import java.util.Map;
|
|||||||
@Data
|
@Data
|
||||||
public class BuriedMessages implements Serializable {
|
public class BuriedMessages implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L; // 添加序列化ID
|
||||||
|
|
||||||
// 消息队列配置
|
// 消息队列配置
|
||||||
public static final String QUEUE = "BURIED_POINT_QUEUE";
|
public static final String QUEUE = "BURIED_POINT_QUEUE";
|
||||||
public static final String EXCHANGE = "BURIED_POINT_EXCHANGE";
|
public static final String EXCHANGE = "BURIED_POINT_EXCHANGE";
|
||||||
public static final String ROUTING_KEY = "BURIED_POINT_ROUTING_KEY";
|
public static final String ROUTING_KEY = "BURIED_POINT_ROUTING_KEY";
|
||||||
public static final String DEAD_LETTER_EXCHANGE = "DEAD_LETTER_EXCHANGE";
|
|
||||||
public static final String DEAD_LETTER_ROUTING_KEY = "DEAD_LETTER_ROUTING";
|
|
||||||
public static final String DEAD_LETTER_QUEUE = "DEAD_LETTER_QUEUE";
|
|
||||||
|
|
||||||
// 状态码定义
|
// 状态码定义
|
||||||
public static final Integer STATUS_INIT = 10; // 初始状态
|
public static final Integer STATUS_INIT = 10; // 初始状态
|
||||||
@@ -22,7 +21,7 @@ public class BuriedMessages implements Serializable {
|
|||||||
public static final Integer STATUS_WARNING = 40; // 处理警告
|
public static final Integer STATUS_WARNING = 40; // 处理警告
|
||||||
public static final Integer STATUS_ERROR = 50; // 处理错误
|
public static final Integer STATUS_ERROR = 50; // 处理错误
|
||||||
|
|
||||||
// 通用字段
|
|
||||||
private Integer id; // 事件唯一ID
|
private Integer id; // 事件唯一ID
|
||||||
private Long eventTime; // 事件时间戳
|
private Long eventTime; // 事件时间戳
|
||||||
private String service; // 服务名称
|
private String service; // 服务名称
|
||||||
@@ -32,7 +31,6 @@ public class BuriedMessages implements Serializable {
|
|||||||
private String clientIp; // 客户端IP
|
private String clientIp; // 客户端IP
|
||||||
private String serverIp; // 服务器IP
|
private String serverIp; // 服务器IP
|
||||||
|
|
||||||
// 添加埋点特定字段
|
|
||||||
private String eventType; // 事件类型: PAGE_VIEW, API_CALL, BUTTON_CLICK 等
|
private String eventType; // 事件类型: PAGE_VIEW, API_CALL, BUTTON_CLICK 等
|
||||||
private String pagePath; // 页面路径/功能模块
|
private String pagePath; // 页面路径/功能模块
|
||||||
private String elementId; // 元素标识
|
private String elementId; // 元素标识
|
||||||
@@ -41,26 +39,11 @@ public class BuriedMessages implements Serializable {
|
|||||||
private String userAgent; // 用户代理信息
|
private String userAgent; // 用户代理信息
|
||||||
private Integer statusCode; // 响应状态码
|
private Integer statusCode; // 响应状态码
|
||||||
private String errorMessage; // 错误信息
|
private String errorMessage; // 错误信息
|
||||||
|
private Integer retryCount = 0; // 重试次数计数器,默认0
|
||||||
|
|
||||||
// 扩展字段,用于存储特定事件的额外数据
|
|
||||||
private Map<String, Object> extraData = new HashMap<>();
|
private Map<String, Object> extraData = new HashMap<>();
|
||||||
|
|
||||||
/**
|
|
||||||
* 快速创建消息的便捷方法
|
|
||||||
*/
|
|
||||||
public static BuriedMessages create(String userId, String eventType, String pagePath) {
|
|
||||||
BuriedMessages msg = new BuriedMessages();
|
|
||||||
msg.setUserId(userId);
|
|
||||||
msg.setEventType(eventType);
|
|
||||||
msg.setPagePath(pagePath);
|
|
||||||
msg.setEventTime(System.currentTimeMillis());
|
|
||||||
msg.setStatusCode(STATUS_INIT); // 默认初始状态
|
|
||||||
return msg;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 添加扩展数据
|
|
||||||
*/
|
|
||||||
public BuriedMessages addExtraData(String key, Object value) {
|
public BuriedMessages addExtraData(String key, Object value) {
|
||||||
if (this.extraData == null) {
|
if (this.extraData == null) {
|
||||||
this.extraData = new HashMap<>();
|
this.extraData = new HashMap<>();
|
||||||
@@ -68,4 +51,17 @@ public class BuriedMessages implements Serializable {
|
|||||||
this.extraData.put(key, value);
|
this.extraData.put(key, value);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 增加重试计数
|
||||||
|
*/
|
||||||
|
public void incrementRetryCount() {
|
||||||
|
if (this.retryCount == null) {
|
||||||
|
this.retryCount = 0;
|
||||||
|
}
|
||||||
|
this.retryCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,14 +1,13 @@
|
|||||||
package com.tashow.cloud.app.mq.producer.buriedPoint;
|
package com.tashow.cloud.app.mq.producer.buriedPoint;
|
||||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||||
|
import com.tashow.cloud.app.mapper.BuriedPointMapper;
|
||||||
|
import com.tashow.cloud.common.util.json.JsonUtils;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -16,34 +15,49 @@ import java.util.UUID;
|
|||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
public class BuriedPointProducer {
|
public class BuriedPointProducer implements RabbitTemplate.ConfirmCallback {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private RabbitTemplate rabbitTemplate;
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private BuriedPointMapper buriedPointMapper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 异步发送完整的埋点消息,并确保消息已被broker接收
|
* 异步发送完整的埋点消息(生成新的correlationId)
|
||||||
*/
|
*/
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
public void asyncSendMessage(BuriedMessages message) {
|
public void asyncSendMessage(BuriedMessages message) {
|
||||||
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
|
String correlationId = UUID.randomUUID().toString();
|
||||||
// final CompletableFuture<Boolean> confirmFuture = new CompletableFuture<>();
|
asyncSendMessage(message, correlationId);
|
||||||
log.info("[埋点] 异步准备发送消息: {}", message);
|
|
||||||
correlationData.getFuture().whenComplete((confirm, ex) -> {
|
|
||||||
log.info("[埋点] 异步消息发送确认回调: {}", message);
|
|
||||||
if (ex != null) {
|
|
||||||
log.error("[埋点] 异步消息发送异常: {}", ex.getMessage(), ex);
|
|
||||||
// confirmFuture.completeExceptionally(ex);
|
|
||||||
} else if (confirm != null && confirm.isAck()) {
|
|
||||||
log.info("[埋点] 异步消息发送成功: {}", message);
|
|
||||||
// confirmFuture.complete(true);
|
|
||||||
} else {
|
|
||||||
log.warn("[埋点] 异步消息发送未被ACK");
|
|
||||||
//confirmFuture.complete(false);
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
/**
|
||||||
|
* 异步发送完整的埋点消息(使用指定的correlationId)
|
||||||
|
* 用于重试场景,保持原有的correlationId
|
||||||
|
*/
|
||||||
|
@SneakyThrows
|
||||||
|
public void asyncSendMessage(BuriedMessages message, String correlationId) {
|
||||||
|
log.info("[埋点] 异步准备发送消息: {}, correlationId: {}", message, correlationId);
|
||||||
|
String messageJson = JsonUtils.toJsonString(message);
|
||||||
|
|
||||||
|
CustomCorrelationData correlationData = new CustomCorrelationData(correlationId, messageJson);
|
||||||
|
|
||||||
rabbitTemplate.convertAndSend(BuriedMessages.EXCHANGE, BuriedMessages.ROUTING_KEY, message, correlationData);
|
rabbitTemplate.convertAndSend(BuriedMessages.EXCHANGE, BuriedMessages.ROUTING_KEY, message, correlationData);
|
||||||
log.info("[埋点] 异步消息发送完成: {}", message);
|
log.info("[埋点] 异步消息发送完成: {}, 状态: {}, 重试次数: {}, correlationId: {}",
|
||||||
// return null;
|
message.getId(), message.getStatusCode(), message.getRetryCount(), correlationId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 确认消息是否成功发送到Broker的回调方法
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
|
||||||
|
if (ack) {
|
||||||
|
log.info("[埋点] 消息发送确认成功: {}", correlationData.getId());
|
||||||
|
} else {
|
||||||
|
log.error("[埋点] 消息发送确认失败: {}, 原因: {}", correlationData.getId(), cause);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -20,7 +20,6 @@ public class SecurityConfiguration {
|
|||||||
@Bean("infraAuthorizeRequestsCustomizer")
|
@Bean("infraAuthorizeRequestsCustomizer")
|
||||||
public AuthorizeRequestsCustomizer authorizeRequestsCustomizer() {
|
public AuthorizeRequestsCustomizer authorizeRequestsCustomizer() {
|
||||||
return new AuthorizeRequestsCustomizer() {
|
return new AuthorizeRequestsCustomizer() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void customize(AuthorizeHttpRequestsConfigurer<HttpSecurity>.AuthorizationManagerRequestMatcherRegistry registry) {
|
public void customize(AuthorizeHttpRequestsConfigurer<HttpSecurity>.AuthorizationManagerRequestMatcherRegistry registry) {
|
||||||
// Swagger 接口文档
|
// Swagger 接口文档
|
||||||
|
|||||||
Reference in New Issue
Block a user