diff --git a/tashow-gateway/pom.xml b/tashow-gateway/pom.xml index 14e52be..be7d00e 100644 --- a/tashow-gateway/pom.xml +++ b/tashow-gateway/pom.xml @@ -45,23 +45,6 @@ spring-cloud-starter-loadbalancer - - cn.dev33 - sa-token-reactor-spring-boot3-starter - - - - - - cn.dev33 - sa-token-redis-jackson - 1.42.0 - - - org.apache.commons - commons-pool2 - - com.alibaba.cloud diff --git a/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/SaTokenConfigure.java b/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/SaTokenConfigure.java deleted file mode 100644 index b6384b1..0000000 --- a/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/SaTokenConfigure.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.tashow.cloud.gateway.filter.security; - -import cn.dev33.satoken.reactor.filter.SaReactorFilter; -import cn.dev33.satoken.router.SaRouter; -import cn.dev33.satoken.stp.StpUtil; -import cn.dev33.satoken.util.SaResult; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * [Sa-Token 权限认证] 配置类 - * @author click33 - */ -@Configuration -public class SaTokenConfigure { - // 注册 Sa-Token全局过滤器 - @Bean - public SaReactorFilter getSaReactorFilter() { - return new SaReactorFilter() - // 拦截地址 - .addInclude("/**") /* 拦截全部path */ - // 开放地址 - .addExclude("/favicon.ico") - // 鉴权方法:每次访问进入 - .setAuth(obj -> { - // 登录校验 -- 拦截所有路由,并排除/user/doLogin 用于开放登录 - SaRouter.match("/**", "/user/doLogin", r -> StpUtil.checkLogin()); - - // 权限认证 -- 不同模块, 校验不同权限 - SaRouter.match("/user/**", r -> StpUtil.checkPermission("user")); - SaRouter.match("/admin/**", r -> StpUtil.checkPermission("admin")); - SaRouter.match("/goods/**", r -> StpUtil.checkPermission("goods")); - SaRouter.match("/orders/**", r -> StpUtil.checkPermission("orders")); - - // 更多匹配 ... */ - }) - // 异常处理方法:每次setAuth函数出现异常时进入 - .setError(e -> { - return SaResult.error(e.getMessage()); - }) - ; - } -} diff --git a/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/StpInterfaceImpl.java b/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/StpInterfaceImpl.java deleted file mode 100644 index d687f3d..0000000 --- a/tashow-gateway/src/main/java/com/tashow/cloud/gateway/filter/security/StpInterfaceImpl.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.tashow.cloud.gateway.filter.security; - -import cn.dev33.satoken.stp.StpInterface; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.List; - -/** - * 自定义权限验证接口扩展 - */ -@Component -public class StpInterfaceImpl implements StpInterface { - - @Override - public List getPermissionList(Object loginId, String loginType) { - // 本 list 仅做模拟,实际项目中要根据具体业务逻辑来查询权限 - List list = new ArrayList(); - list.add("101"); - list.add("user.add"); - list.add("user.update"); - list.add("user.get"); - // list.add("user.delete"); - list.add("art.*"); - return list; - } - - @Override - public List getRoleList(Object loginId, String loginType) { - // 本 list 仅做模拟,实际项目中要根据具体业务逻辑来查询角色 - List list = new ArrayList(); - list.add("admin"); - list.add("super-admin"); - return list; - } - -} diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/AppServerApplication.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/AppServerApplication.java index 24f14a9..a74dc76 100644 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/AppServerApplication.java +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/AppServerApplication.java @@ -2,12 +2,14 @@ package com.tashow.cloud.app; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; /** * Hello world! * */ @SpringBootApplication +@EnableScheduling public class AppServerApplication { public static void main(String[] args) { diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java index 83c962c..35ce5c5 100644 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/TestController.java @@ -1,14 +1,10 @@ package com.tashow.cloud.app.controller; - -import com.tashow.cloud.app.mq.annotation.BuriedPoint; -import com.tashow.cloud.app.mq.mapper.BuriedPointMapper; -import com.tashow.cloud.app.mq.message.BuriedMessages; +import com.tashow.cloud.app.mapper.BuriedPointMapper; import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer; import jakarta.annotation.security.PermitAll; import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.*; -import java.util.Date; import java.util.HashMap; import java.util.Map; diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/interceptor/BuriedPointInterceptor.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/interceptor/BuriedPointInterceptor.java similarity index 92% rename from tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/interceptor/BuriedPointInterceptor.java rename to tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/interceptor/BuriedPointInterceptor.java index 50b0e98..5be550d 100644 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/interceptor/BuriedPointInterceptor.java +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/interceptor/BuriedPointInterceptor.java @@ -1,4 +1,4 @@ -package com.tashow.cloud.app.mq.interceptor; +package com.tashow.cloud.app.interceptor; import cn.hutool.core.util.IdUtil; import com.tashow.cloud.app.mq.message.BuriedMessages; @@ -37,22 +37,18 @@ public class BuriedPointInterceptor implements HandlerInterceptor { } try { - // 开始计时 StopWatch stopWatch = new StopWatch(); stopWatch.start(); request.setAttribute(ATTRIBUTE_STOPWATCH, stopWatch); - // 生成请求ID int requestId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE); request.setAttribute(ATTRIBUTE_REQUEST_ID, requestId); - - // 收集埋点数据 HandlerMethod handlerMethod = (HandlerMethod) handler; String method = request.getMethod() + " " + request.getRequestURI()+ JsonUtils.toJsonString(request.getParameterMap()); String controllerName = handlerMethod.getBeanType().getSimpleName(); String actionName = handlerMethod.getMethod().getName(); - // 创建埋点消息 + BuriedMessages message = new BuriedMessages(); message.setId(requestId); message.setEventTime(System.currentTimeMillis()); @@ -65,7 +61,7 @@ public class BuriedPointInterceptor implements HandlerInterceptor { message.setEventType("API_REQUEST_START"); message.setPagePath(controllerName + "#" + actionName); message.setUserAgent(request.getHeader("User-Agent")); - message.setStatusCode(BuriedMessages.STATUS_INIT); + message.setStatusCode(BuriedMessages.STATUS_PROCESSING); buriedPointProducer.asyncSendMessage(message); if (log.isDebugEnabled()) { log.debug("[埋点] 收集请求开始数据: {}", message); @@ -73,7 +69,6 @@ public class BuriedPointInterceptor implements HandlerInterceptor { } catch (Exception e) { log.warn("[埋点] 埋点数据收集异常", e); } - return true; } @@ -86,7 +81,6 @@ public class BuriedPointInterceptor implements HandlerInterceptor { if (userAttribute != null) { return userAttribute.toString(); } - // 返回匿名用户标识 return "anonymous"; } diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mapper/BuriedPointMapper.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mapper/BuriedPointMapper.java new file mode 100644 index 0000000..1aa5918 --- /dev/null +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mapper/BuriedPointMapper.java @@ -0,0 +1,14 @@ +package com.tashow.cloud.app.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.tashow.cloud.app.model.BuriedPoint; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +@Mapper +public interface BuriedPointMapper extends BaseMapper { + + @Select("SELECT * FROM app_burying WHERE event_id = #{eventId} LIMIT 1") + BuriedPoint selectByEventId(@Param("eventId") Integer eventId); +} \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/model/BuriedPoint.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPoint.java similarity index 93% rename from tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/model/BuriedPoint.java rename to tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPoint.java index d6e91ca..ad5b745 100644 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/model/BuriedPoint.java +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/model/BuriedPoint.java @@ -1,4 +1,4 @@ -package com.tashow.cloud.app.mq.model; +package com.tashow.cloud.app.model; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; @@ -106,4 +106,10 @@ public class BuriedPoint { @TableField(value = "status") private Integer status; + + /** + * 重试次数 + */ + @TableField(value = "retry_count") + private Integer retryCount; } \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuriedPoint.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuriedPoint.java deleted file mode 100644 index c415aa0..0000000 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuriedPoint.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.tashow.cloud.app.mq.annotation; - -import java.lang.annotation.*; - -/** - * 埋点注解 - * 用于标记需要特殊埋点的方法或接口 - */ -@Target({ElementType.METHOD}) -@Retention(RetentionPolicy.RUNTIME) -@Documented -public @interface BuriedPoint { - - /** - * 埋点事件类型 - */ - String eventType() default ""; - - /** - * 埋点描述信息 - */ - String description() default ""; - - /** - * 是否记录方法入参 - */ - boolean recordParams() default false; - - /** - * 是否记录返回结果 - */ - boolean recordResult() default false; -} \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuryPoint.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuryPoint.java deleted file mode 100644 index 1f88dd1..0000000 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/annotation/BuryPoint.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.tashow.cloud.app.mq.annotation; - -import java.lang.annotation.*; - -/** - * 埋点注解 - * 用于标记需要埋点的方法 - */ -@Target(ElementType.METHOD) -@Retention(RetentionPolicy.RUNTIME) -@Documented -public @interface BuryPoint { - - /** - * 事件类型 - */ - String eventType() default "METHOD"; - - /** - * 页面路径/功能模块 - */ - String pagePath() default ""; - - /** - * 描述信息 - */ - String description() default ""; -} \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/aspect/BuriedPointAspect.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/aspect/BuriedPointAspect.java deleted file mode 100644 index f3cd7ea..0000000 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/aspect/BuriedPointAspect.java +++ /dev/null @@ -1,223 +0,0 @@ -package com.tashow.cloud.app.mq.aspect; - -import cn.hutool.core.util.IdUtil; -import cn.hutool.json.JSONUtil; -import com.tashow.cloud.app.mq.annotation.BuriedPoint; -import com.tashow.cloud.app.mq.message.BuriedMessages; -import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer; -import com.tashow.cloud.common.util.servlet.ServletUtils; -import com.tashow.cloud.common.util.spring.SpringUtils; -import jakarta.servlet.http.HttpServletRequest; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.aspectj.lang.ProceedingJoinPoint; -import org.aspectj.lang.annotation.Around; -import org.aspectj.lang.annotation.Aspect; -import org.aspectj.lang.annotation.Pointcut; -import org.aspectj.lang.reflect.MethodSignature; -import org.springframework.core.annotation.Order; -import org.springframework.stereotype.Component; -import org.springframework.web.context.request.RequestContextHolder; -import org.springframework.web.context.request.ServletRequestAttributes; - -import java.lang.reflect.Method; -import java.net.InetAddress; -import java.net.UnknownHostException; - -/** - * 埋点切面 - * 处理自定义埋点注解 - */ -@Aspect -@Component -@Order(10) -@Slf4j -@RequiredArgsConstructor -public class BuriedPointAspect { -/* - private final BuriedPointProducer buriedPointProducer; - - @Pointcut("@annotation(com.tashow.cloud.app.mq.annotation.BuriedPoint)") - public void buriedPointCut() { - } - - @Around("buriedPointCut()") - public Object around(ProceedingJoinPoint point) throws Throwable { - // 获取方法签名 - MethodSignature signature = (MethodSignature) point.getSignature(); - Method method = signature.getMethod(); - - // 获取注解 - BuriedPoint buriedPoint = method.getAnnotation(BuriedPoint.class); - if (buriedPoint == null) { - return point.proceed(); - } - - // 生成埋点ID - int buriedId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE); - - // 记录开始埋点 - recordMethodStart(point, method, buriedPoint, buriedId); - - long startTime = System.currentTimeMillis(); - Object result = null; - Exception exception = null; - try { - // 执行原方法 - result = point.proceed(); - return result; - } catch (Exception e) { - exception = e; - throw e; - } finally { - long duration = System.currentTimeMillis() - startTime; - // 记录结束埋点 - recordMethodEnd(point, method, buriedPoint, result, exception, duration, buriedId); - } - } - - *//** - * 记录方法开始埋点 - *//* - private void recordMethodStart(ProceedingJoinPoint point, Method method, BuriedPoint buriedPoint, int buriedId) { - try { - // 获取类和方法名 - String className = point.getTarget().getClass().getSimpleName(); - String methodName = method.getName(); - - // 创建埋点消息 - BuriedMessages message = new BuriedMessages(); - message.setId(buriedId); - message.setEventTime(System.currentTimeMillis()); - message.setService(SpringUtils.getApplicationName()); - message.setMethod(className + "." + methodName); - message.setEventType(buriedPoint.eventType().isEmpty() ? "METHOD_START" : buriedPoint.eventType() + "_START"); - message.setPagePath(className + "#" + methodName); - - // 设置用户ID和其他信息 - setRequestInfo(message); - - // 如果需要记录入参 - if (buriedPoint.recordParams() && point.getArgs() != null && point.getArgs().length > 0) { - // 限制参数长度,避免埋点数据过大 - String params = JSONUtil.toJsonStr(point.getArgs()); - if (params.length() > 500) { - params = params.substring(0, 500) + "..."; - } - message.addExtraData("params", params); - } - - // 记录描述信息 - if (!buriedPoint.description().isEmpty()) { - message.addExtraData("description", buriedPoint.description()); - } - - // 异步发送埋点数据 - buriedPointProducer.asyncSendMessage(message); - - if (log.isDebugEnabled()) { - log.debug("[埋点] 方法开始: {}.{}, 事件类型: {}", className, methodName, message.getEventType()); - } - } catch (Exception e) { - log.warn("[埋点] 记录方法开始埋点异常", e); - } - } - - *//** - * 记录方法结束埋点 - *//* - private void recordMethodEnd(ProceedingJoinPoint point, Method method, BuriedPoint buriedPoint, - Object result, Exception exception, long duration, int buriedId) { - try { - // 获取类和方法名 - String className = point.getTarget().getClass().getSimpleName(); - String methodName = method.getName(); - - // 创建埋点消息 - BuriedMessages message = new BuriedMessages(); - message.setId(buriedId); - message.setEventTime(System.currentTimeMillis()); - message.setService(SpringUtils.getApplicationName()); - message.setMethod(className + "." + methodName); - message.setEventType(buriedPoint.eventType().isEmpty() ? "METHOD_END" : buriedPoint.eventType() + "_END"); - message.setPagePath(className + "#" + methodName); - message.setDuration(duration); - - // 设置用户ID和其他信息 - setRequestInfo(message); - - // 如果需要记录结果 - if (buriedPoint.recordResult() && result != null && exception == null) { - // 限制结果长度,避免埋点数据过大 - String resultStr = JSONUtil.toJsonStr(result); - if (resultStr.length() > 500) { - resultStr = resultStr.substring(0, 500) + "..."; - } - message.addExtraData("result", resultStr); - } - - // 如果有异常,记录异常信息 - if (exception != null) { - message.setErrorMessage(exception.getMessage()); - message.addExtraData("exceptionClass", exception.getClass().getName()); - } - - // 异步发送埋点数据 - buriedPointProducer.asyncSendMessage(message); - - if (log.isDebugEnabled()) { - log.debug("[埋点] 方法结束: {}.{}, 事件类型: {}, 耗时: {}ms", - className, methodName, message.getEventType(), duration); - } - } catch (Exception e) { - log.warn("[埋点] 记录方法结束埋点异常", e); - } - } - - *//** - * 设置请求相关信息 - *//* - private void setRequestInfo(BuriedMessages message) { - try { - // 获取当前请求信息 - ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); - if (attributes != null) { - HttpServletRequest request = attributes.getRequest(); - - // 设置用户ID和其他请求信息 - message.setUserId(getUserId(request)); - message.setSessionId(request.getSession().getId()); - message.setClientIp(ServletUtils.getClientIP(request)); - message.setUserAgent(request.getHeader("User-Agent")); - } - - // 设置服务器IP - message.setServerIp(getServerIp()); - } catch (Exception e) { - log.warn("[埋点] 设置请求信息异常", e); - } - } - - *//** - * 获取当前登录用户ID - *//* - private String getUserId(HttpServletRequest request) { - // 这里需要根据项目实际的用户体系来获取用户ID - Object userAttribute = request.getSession().getAttribute("USER_ID"); - if (userAttribute != null) { - return userAttribute.toString(); - } - return "anonymous"; - } - - *//** - * 获取服务器IP - *//* - private String getServerIp() { - try { - return InetAddress.getLocalHost().getHostAddress(); - } catch (UnknownHostException e) { - return "unknown"; - } - }*/ -} \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java index 6cef054..6cd9ca7 100644 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/config/BuriedPointConfiguration.java @@ -1,12 +1,12 @@ package com.tashow.cloud.app.mq.config; -import com.tashow.cloud.app.mq.interceptor.BuriedPointInterceptor; +import com.tashow.cloud.app.interceptor.BuriedPointInterceptor; +import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper; import com.tashow.cloud.app.mq.message.BuriedMessages; +import com.tashow.cloud.app.model.BuriedPointFailRecord; import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer; +import com.tashow.cloud.app.mq.producer.buriedPoint.CustomCorrelationData; import lombok.RequiredArgsConstructor; -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.DirectExchange; -import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -14,6 +14,8 @@ import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import lombok.extern.slf4j.Slf4j; import jakarta.annotation.PostConstruct; +import java.util.Date; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; /** * 埋点功能配置类 @@ -25,42 +27,99 @@ public class BuriedPointConfiguration implements WebMvcConfigurer { private final BuriedPointProducer buriedPointProducer; private final RabbitTemplate rabbitTemplate; - + private final BuriedPointFailRecordMapper buriedPointFailRecordMapper; + /** * RabbitTemplate初始化配置 - * 确保回调正确配置,以实现消息可靠性 */ -// @PostConstruct -// public void initRabbitTemplate() { -// log.info("[埋点配置] 初始化RabbitTemplate: {}", rabbitTemplate); -// rabbitTemplate.setMandatory(true); -// rabbitTemplate.setReturnsCallback(returned -> { -// log.error("[埋点配置] 消息路由失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}", -// returned.getExchange(), -// returned.getRoutingKey(), -// returned.getReplyCode(), -// returned.getReplyText(), -// new String(returned.getMessage().getBody())); -// }); -// rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { -// if (ack) { -// log.debug("[埋点配置] 消息成功发送到交换机: {}", correlationData); -// } else { -// log.error("[埋点配置] 消息发送到交换机失败: cause={}, correlationData={}", cause, correlationData); -// } -// }); -// -// // 验证配置 -// if (rabbitTemplate.isConfirmListener()) { -// log.info("[埋点配置] 确认回调已正确配置"); -// } else { -// log.error("[埋点配置] 确认回调配置失败,请检查RabbitMQ配置!"); -// } -// } + @PostConstruct + public RabbitTemplate initRabbitTemplate() { + log.info("[埋点配置] 初始化RabbitTemplate: {}", rabbitTemplate); + rabbitTemplate.setMandatory(true); + rabbitTemplate.setReturnsCallback(returned -> { + log.error("[埋点配置] 消息路由失败: exchange={}, routingKey={}, replyCode={}, replyText={}, ={}", + returned.getExchange(), + returned.getRoutingKey(), + returned.getReplyCode(), + returned.getReplyText(), + new String(returned.getMessage().getBody())); + + saveFailRecord( + returned.getMessage().getMessageProperties().getCorrelationId(), + returned.getExchange(), + returned.getRoutingKey(), + "路由失败: " + returned.getReplyText(), + new String(returned.getMessage().getBody()) + ); + }); + rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { + if (ack) { + log.info("[埋点配置] 消息成功发送到交换机: {}", correlationData.getId()); + } else { + log.error("[埋点配置] 消息发送到交换机失败: cause={}, correlationData={}", cause, correlationData); + CustomCorrelationData customData = (CustomCorrelationData) correlationData; + String messageContent = customData.getMessageContent(); + saveFailRecord( + correlationData.getId(), + BuriedMessages.EXCHANGE, + BuriedMessages.ROUTING_KEY, + cause, + messageContent + ); + } + }); + if (rabbitTemplate.isConfirmListener()) { + log.info("[埋点配置] 确认回调已正确配置"); + } else { + log.error("[埋点配置] 确认回调配置失败"); + } + return rabbitTemplate; + } /** - * 创建埋点队列 + * 保存消息发送失败记录 */ + private void saveFailRecord(String correlationId, String exchange, String routingKey, String cause, String messageContent) { + try { + log.info("[埋点配置] 保存发送失败记录: correlationId={}", correlationId); + + // 先查询是否已存在记录 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId); + BuriedPointFailRecord existingRecord = buriedPointFailRecordMapper.selectOne(queryWrapper); + + if (existingRecord != null) { + // 已存在记录,执行更新 + log.info("[埋点配置] 发现已有失败记录,将更新: {}", correlationId); + existingRecord.setExchange(exchange); + existingRecord.setRoutingKey(routingKey); + existingRecord.setCause(cause); + existingRecord.setMessageContent(messageContent); + existingRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED); + existingRecord.setUpdateTime(new Date()); + buriedPointFailRecordMapper.updateById(existingRecord); + log.info("[埋点配置] 发送失败记录已更新: correlationId={}", correlationId); + } else { + // 不存在记录,执行插入 + BuriedPointFailRecord failRecord = new BuriedPointFailRecord(); + failRecord.setCorrelationId(correlationId); + failRecord.setExchange(exchange); + failRecord.setRoutingKey(routingKey); + failRecord.setCause(cause); + failRecord.setMessageContent(messageContent); + failRecord.setRetryCount(0); + failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED); + failRecord.setCreateTime(new Date()); + failRecord.setUpdateTime(new Date()); + + buriedPointFailRecordMapper.insert(failRecord); + log.info("[埋点配置] 发送失败记录已保存: correlationId={}", correlationId); + } + } catch (Exception e) { + log.error("[埋点配置] 保存发送失败记录异常", e); + } + } + @Bean public Queue buriedPointQueue() { return new Queue(BuriedMessages.QUEUE, true, false, false); diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java index 3a7d5d0..9c7e035 100644 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/consumer/buriedPoint/BuriedPointConsumer.java @@ -1,8 +1,9 @@ -/* package com.tashow.cloud.app.mq.consumer.buriedPoint; -import com.tashow.cloud.app.mq.mapper.BuriedPointMapper; +import com.tashow.cloud.app.mapper.BuriedPointMapper; +import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper; import com.tashow.cloud.app.mq.message.BuriedMessages; -import com.tashow.cloud.app.mq.model.BuriedPoint; +import com.tashow.cloud.app.model.BuriedPoint; +import com.tashow.cloud.app.model.BuriedPointFailRecord; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; @@ -11,16 +12,11 @@ import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; - -import java.io.IOException; import java.util.Date; import com.rabbitmq.client.Channel; -*/ -/** - * 埋点消息消费者 - * 将埋点数据存储到数据库 - *//* +import org.springframework.dao.DuplicateKeyException; +import com.tashow.cloud.common.util.json.JsonUtils; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @Component @RabbitListener(queues = BuriedMessages.QUEUE) @@ -29,90 +25,220 @@ import com.rabbitmq.client.Channel; public class BuriedPointConsumer { private final BuriedPointMapper buriedPointMapper; - + private final BuriedPointFailRecordMapper buriedPointFailRecordMapper; + @Value("${spring.application.name:tashow-app}") private String applicationName; - */ -/** - * 处理埋点消息 - *//* + private static final int MAX_RETRY_ALLOWED = 1; @RabbitHandler - public void onMessage(BuriedMessages message, - Channel channel, - @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { - try { - log.info("[埋点消费者] 收到埋点消息: {}", message); - - // 确保事件ID不为空 - if (message.getId() == null) { - message.setId((int)(System.currentTimeMillis() % Integer.MAX_VALUE)); - log.warn("[埋点消费者] 消息中的事件ID为空,已自动生成: {}", message.getId()); - } - saveToDatabase(message); - channel.basicAck(deliveryTag, false); - log.info("[埋点消费者] 消息处理成功,已确认"); - - } catch (Exception e) { - try { - channel.basicNack(deliveryTag, false, true); - } catch (IOException ex) { - log.error("[埋点消费者] 拒绝消息失败", ex); - } + public void onMessage(BuriedMessages message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { + Integer dbRetryCount = getActualRetryCount(message); + message.setRetryCount(dbRetryCount); + if (message.getRetryCount() != null && message.getRetryCount() >= MAX_RETRY_ALLOWED) { + message.setStatusCode(BuriedMessages.STATUS_ERROR); + message.addExtraData("errorMessage", "已达到最大重试次数"); + saveToFailRecord(message, "已达到最大重试次数"); + safeChannelAck(channel, deliveryTag); + return; } + log.info("[埋点消费者] 收到埋点消息: {}, 当前重试次数: {}/{}", message, message.getRetryCount(), MAX_RETRY_ALLOWED); + + message.setStatusCode(BuriedMessages.STATUS_PROCESSING); + log.info("[埋点消费者] 消息状态更新为处理中(STATUS_PROCESSING): {}", message.getId()); + try { + /* if(true){ + throw new RuntimeException("测试异常"); + }*/ + saveToDatabase(message); + message.setStatusCode(BuriedMessages.STATUS_SUCCESS); + updateMessageStatus(message); + log.info("[埋点消费者] 消息处理成功,状态已更新为成功(STATUS_SUCCESS): {}", message.getId()); + safeChannelAck(channel, deliveryTag); + } catch (DuplicateKeyException e) { + log.warn("[埋点消费者] 消息已被处理过,直接确认: {}, 错误: {}", message.getId(), e.getMessage()); + safeChannelAck(channel, deliveryTag); + } catch (Exception e) { + message.setStatusCode(BuriedMessages.STATUS_ERROR); + message.addExtraData("errorMessage", e.getMessage()); + log.error("[埋点消费者] 消息处理失败: {}, 错误: {}", message.getId(), e.getMessage()); + message.incrementRetryCount(); + updateRetryCount(message); + if (message.getRetryCount() >= MAX_RETRY_ALLOWED) { + saveToDatabase(message); + log.warn("[埋点消费者] 消息已达到最大重试次数: {}, 确认消息并保存到失败记录表", message.getRetryCount()); + saveToFailRecord(message, e.getMessage()); + safeChannelAck(channel, deliveryTag); + } else { + log.info("[埋点消费者] 消息将重新入队重试: {}, 当前重试次数: {}", message.getId(), message.getRetryCount()); + safeChannelNack(channel, deliveryTag, false, true); + } + } } - - - */ -/** - * 将埋点数据保存到数据库 - *//* + private Integer getActualRetryCount(BuriedMessages message) { + try { + BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId()); + if (buriedPoint != null && buriedPoint.getRetryCount() != null) { + if ((buriedPoint.getStatus() == BuriedMessages.STATUS_ERROR || + buriedPoint.getStatus() == BuriedMessages.STATUS_PROCESSING)) { + log.info("[埋点消费者] 检测到消息可能因服{}", message.getId()); + return buriedPoint.getRetryCount() - 1; + } + return buriedPoint.getRetryCount(); + } else { + String correlationId = String.valueOf(message.getId()); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId); + BuriedPointFailRecord failRecord = buriedPointFailRecordMapper.selectOne(queryWrapper); + return failRecord!=null? failRecord.getRetryCount():0; - private void saveToDatabase(BuriedMessages message) { + } + } catch (Exception e) { + log.warn("[埋点消费者] 获取消息重试次数失败: {}", e.getMessage()); + throw new RuntimeException("获取消息重试次数失败", e); + } + } + + private void safeChannelAck(Channel channel, long deliveryTag) { + try { + channel.basicAck(deliveryTag, false); + } catch (Exception e) { + log.error("[埋点消费者] 确认消息失败: {}", e.getMessage()); + } + } + + private void safeChannelNack(Channel channel, long deliveryTag, boolean multiple, boolean requeue) { + try { + channel.basicNack(deliveryTag, multiple, requeue); + } catch (Exception e) { + log.error("[埋点消费者] 拒绝消息失败: {}", e.getMessage()); + } + } + + private void updateRetryCount(BuriedMessages message) { + try { + BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId()); + if (buriedPoint != null) { + buriedPoint.setRetryCount(message.getRetryCount()); + buriedPoint.setUpdateTime(new Date()); + buriedPointMapper.updateById(buriedPoint); + } else { + String correlationId = String.valueOf(message.getId()); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId); + BuriedPointFailRecord failRecord = buriedPointFailRecordMapper.selectOne(queryWrapper); + if (failRecord != null) { + failRecord.setRetryCount(message.getRetryCount()); + failRecord.setUpdateTime(new Date()); + failRecord.setMessageContent(JsonUtils.toJsonString(message)); + buriedPointFailRecordMapper.updateById(failRecord); + } else { + // 记录或创建新的失败记录 + log.warn("[埋点消费者] 未找到埋点记录和失败记录, 事件ID: {}, 准备创建失败记录", message.getId()); + saveToFailRecord(message, "未找到原始埋点记录"); + } + } + } catch (Exception e) { + log.error("[埋点消费者] 更新重试次数失败: {}, 错误: {}", message.getId(), e.getMessage()); + } + } + + private void updateMessageStatus(BuriedMessages message) { + try { + BuriedPoint buriedPoint = buriedPointMapper.selectByEventId(message.getId()); + buriedPoint.setStatus(message.getStatusCode()); + buriedPoint.setUpdateTime(new Date()); + buriedPoint.setRetryCount(message.getRetryCount()); + buriedPointMapper.updateById(buriedPoint); + log.debug("[埋点消费者] 已更新埋点状态, 事件ID: {}, 新状态: {}, 重试次数: {}", message.getId(), message.getStatusCode(), message.getRetryCount()); + } catch (Exception e) { + log.error("[埋点消费者] 更新埋点状态失败: {}, 错误: {}", message.getId(), e.getMessage(), e); + } + } + + private boolean saveToDatabase(BuriedMessages message) { try { log.debug("[埋点消费者] 准备保存埋点数据,事件ID: {}", message.getId()); + BuriedPoint existingPoint = buriedPointMapper.selectByEventId(message.getId()); + if (existingPoint != null) { + existingPoint.setStatus(message.getStatusCode()); + existingPoint.setUpdateTime(new Date()); + if (message.getRetryCount() != null) { + int newRetryCount = Math.max(existingPoint.getRetryCount(), message.getRetryCount()); + existingPoint.setRetryCount(newRetryCount); + message.setRetryCount(newRetryCount); + } + int result = buriedPointMapper.updateById(existingPoint); + return result > 0; + } - // 转换消息为实体 BuriedPoint buriedPoint = new BuriedPoint(); - - // 设置必填字段,确保不为空 - buriedPoint.setEventId(message.getId()); + buriedPoint.setEventId(message.getId()); buriedPoint.setEventTime(message.getEventTime()); - - // 获取真实用户ID,避免使用默认anonymous - String userId = message.getUserId(); - buriedPoint.setUserId(StringUtils.hasText(userId) && !"null".equals(userId) ? userId : "anonymous"); - - String eventType = message.getEventType(); - buriedPoint.setEventType(eventType); + buriedPoint.setUserId(message.getUserId()); + buriedPoint.setEventType(message.getEventType()); buriedPoint.setService(applicationName); - - // 设置method字段,确保获取真实方法名 buriedPoint.setMethod(message.getMethod()); buriedPoint.setSessionId(message.getSessionId()); buriedPoint.setClientIp(message.getClientIp()); buriedPoint.setServerIp(message.getServerIp()); - - // 设置其他字段 + buriedPoint.setStatus(message.getStatusCode()); + buriedPoint.setRetryCount(message.getRetryCount()); buriedPoint.setPagePath(message.getPagePath()); buriedPoint.setElementId(message.getElementId()); buriedPoint.setDuration(message.getDuration()); - buriedPoint.setCreateTime(new Date()); - - log.debug("[埋点消费者] 埋点实体数据: eventId={}, eventType={}, userId={}, service={}, method={}", - buriedPoint.getEventId(), buriedPoint.getEventType(), - buriedPoint.getUserId(), buriedPoint.getService(), buriedPoint.getMethod()); - - int result = buriedPointMapper.insert(buriedPoint); - - log.info("[埋点消费者] 埋点数据已保存到数据库, 事件ID: {}, 影响行数: {}", message.getId(), result); + buriedPoint.setUpdateTime(new Date()); + log.debug("[埋点消费者] 埋点实体数据: eventId={}, eventType={}, userId={}, service={}, method={}, status={}, retryCount={}", buriedPoint.getEventId(), buriedPoint.getEventType(), buriedPoint.getUserId(), buriedPoint.getService(), buriedPoint.getMethod(), buriedPoint.getStatus(), buriedPoint.getRetryCount()); + buriedPointMapper.insert(buriedPoint); + log.info("[埋点消费者] 埋点数据已保存到数据库, 事件ID: {}, 状态: {}", message.getId(), message.getStatusCode()); + return true; } catch (Exception e) { - log.error("[埋点消费者] 保存埋点数据到数据库失败, 事件ID: {}, 错误: {}", - message.getId(), e.getMessage(), e); + log.error("[埋点消费者] 保存埋点数据到数据库失败, 事件ID: {}, 错误: {}", message.getId(), e.getMessage(), e); + throw e; } } -} */ + + /** + * 保存失败记录到BuriedPointFailRecord表 + */ + private void saveToFailRecord(BuriedMessages message, String cause) { + try { + String correlationId = String.valueOf(message.getId()); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(BuriedPointFailRecord::getCorrelationId, correlationId); + BuriedPointFailRecord existingRecord = buriedPointFailRecordMapper.selectOne(queryWrapper); + + if (existingRecord != null) { + log.info("[埋点消费者] 发现已有失败记录,将更新: {}", correlationId); + existingRecord.setExchange(BuriedMessages.EXCHANGE); + existingRecord.setRoutingKey(BuriedMessages.ROUTING_KEY); + existingRecord.setCause(cause); + existingRecord.setMessageContent(JsonUtils.toJsonString(message)); + existingRecord.setRetryCount(message.getRetryCount()); + existingRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED); + existingRecord.setUpdateTime(new Date()); + buriedPointFailRecordMapper.updateById(existingRecord); + log.info("[埋点消费者] 已更新失败记录, 事件ID: {}", correlationId); + } else { + BuriedPointFailRecord failRecord = new BuriedPointFailRecord(); + failRecord.setCorrelationId(correlationId); + failRecord.setExchange(BuriedMessages.EXCHANGE); + failRecord.setRoutingKey(BuriedMessages.ROUTING_KEY); + failRecord.setCause(cause); + failRecord.setMessageContent(JsonUtils.toJsonString(message)); + failRecord.setRetryCount(message.getRetryCount()); + failRecord.setStatus(BuriedPointFailRecord.STATUS_UNPROCESSED); + failRecord.setCreateTime(new Date()); + failRecord.setUpdateTime(new Date()); + buriedPointFailRecordMapper.insert(failRecord); + log.info("[埋点消费者] 已将失败消息保存到失败记录表, 事件ID: {}", message.getId()); + } + } catch (Exception e) { + log.error("[埋点消费者] 保存失败记录失败: {}, 错误: {}", message.getId(), e.getMessage(), e); + } + } +} diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/BuriedPointExample.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/BuriedPointExample.java deleted file mode 100644 index c863c5a..0000000 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/BuriedPointExample.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.tashow.cloud.app.mq.docs; - -import com.tashow.cloud.app.mq.annotation.BuriedPoint; -import org.springframework.stereotype.Service; - -/** - * 埋点使用示例 - */ -@Service -public class BuriedPointExample { - - /** - * 使用埋点注解标记的方法 - * 系统会自动记录该方法的调用情况 - */ - @BuriedPoint( - eventType = "BUSINESS_OPERATION", - description = "处理业务数据", - recordParams = true, - recordResult = true - ) - public String processBusiness(String businessId, String operationType) { - // 业务处理逻辑 - return "处理完成: " + businessId; - } - - /** - * 如何使用埋点系统: - * - * 1. 静默埋点 - 无需任何代码 - * 系统会自动对所有API请求进行埋点,记录请求的开始和结束信息 - * 包括: 请求时间、路径、参数、响应时间、状态码等 - * - * 2. 注解埋点 - 针对特定方法 - * 在需要埋点的方法上添加 @BuriedPoint 注解 - * 可以设置事件类型、描述,以及是否记录参数和返回结果 - * - * 3. 埋点数据流转 - * 所有埋点数据会通过RabbitMQ发送到消息队列 - * 可以在消费者端实现数据的存储和分析 - * - * 4. 数据字段说明 - * id: 埋点事件唯一ID - * eventTime: 事件发生时间 - * service: 服务名称 - * method: 方法/接口 - * userId: 用户标识 - * clientIp: 客户端IP - * eventType: 事件类型 - * duration: 操作耗时 - * extraData: 额外数据(参数、结果等) - */ - public void howToUse() { - // 示例方法 - } -} \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/backend-silent-tracking-rabbitmq.md b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/backend-silent-tracking-rabbitmq.md deleted file mode 100644 index fbc383e..0000000 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/docs/backend-silent-tracking-rabbitmq.md +++ /dev/null @@ -1,220 +0,0 @@ -# 后端静默埋点实现方案(基于RabbitMQ) - -后端静默埋点是指不依赖前端代码,完全由后端服务收集用户行为和应用性能数据的方案。以下是使用RabbitMQ实现后端静默埋点的详细方案: - -## 一、方案架构 - -[业务服务] → [RabbitMQ] → [埋点消费服务] → [数据分析存储] - -## 二、具体实现步骤 - -### 1. 埋点数据收集 - -在业务服务中需要埋点的位置添加消息发送代码: - -```java -// Java示例使用Spring AMQP -@Autowired -private RabbitTemplate rabbitTemplate; - -public void someBusinessMethod(User user, Action action) { - try { - // 构造埋点数据 - Map trackingData = new HashMap<>(); - trackingData.put("event_time", System.currentTimeMillis()); - trackingData.put("user_id", user.getId()); - trackingData.put("action", action.name()); - trackingData.put("service", "order-service"); - trackingData.put("ip", getClientIp()); - - // 发送到RabbitMQ - rabbitTemplate.convertAndSend( - "tracking.exchange", - "tracking.route", - trackingData - ); - } catch (Exception e) { - // 埋点异常不应影响主流程 - log.error("Tracking data send failed", e); - } -} -``` - -### 2. RabbitMQ配置 - -#### 创建专用Exchange和Queue - -```yaml -# application.yml配置示例 -spring: - rabbitmq: - host: rabbitmq-server - port: 5672 - username: guest - password: guest - template: - exchange: tracking.exchange - listener: - simple: - acknowledge-mode: auto -``` - -#### 声明Exchange和Queue - -```java -@Configuration -public class RabbitMQConfig { - - @Bean - public Exchange trackingExchange() { - return ExchangeBuilder.directExchange("tracking.exchange") - .durable(true) - .build(); - } - - @Bean - public Queue trackingQueue() { - return QueueBuilder.durable("tracking.queue") - .withArgument("x-message-ttl", 86400000) // 消息存活时间 - .build(); - } - - @Bean - public Binding trackingBinding() { - return BindingBuilder.bind(trackingQueue()) - .to(trackingExchange()) - .with("tracking.route") - .noargs(); - } -} -``` - -### 3. 埋点数据处理服务 - -创建专门的消费者服务处理埋点数据: - -```java -@Component -public class TrackingDataConsumer { - - private static final Logger log = LoggerFactory.getLogger(TrackingDataConsumer.class); - - @RabbitListener(queues = "tracking.queue") - public void processTrackingData(Map trackingData) { - try { - // 1. 数据校验和补全 - validateAndEnrichData(trackingData); - - // 2. 数据格式化 - TrackingEvent event = convertToEvent(trackingData); - - // 3. 数据存储或转发 - saveToDatabase(event); - // 或 sendToAnalyticsPlatform(event); - - } catch (Exception e) { - log.error("Process tracking data failed: {}", trackingData, e); - // 可根据需要加入重试或死信队列逻辑 - } - } - - private void validateAndEnrichData(Map data) { - if (!data.containsKey("user_id")) { - data.put("user_id", "anonymous"); - } - // 补充服务IP等信息 - data.put("server_ip", getServerIp()); - } - - // 其他辅助方法... -} -``` - -### 4. 高级配置 - -#### 消息可靠性保证 - -```java -// 发送端确认回调 -rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { - if (!ack) { - log.warn("Tracking message lost, cause: {}", cause); - // 可加入本地缓存或日志,后续重发 - } -}); - -// 返回回调(路由失败) -rabbitTemplate.setReturnsCallback(returned -> { - log.warn("Tracking message returned: {}", returned.getMessage()); -}); -``` - -#### 批量发送优化 - -```java -// 使用BatchingRabbitTemplate提高性能 -@Bean -public BatchingRabbitTemplate batchingRabbitTemplate( - RabbitTemplate rabbitTemplate, - SimpleBatchingStrategy strategy) { - - return new BatchingRabbitTemplate(strategy); -} - -@Bean -public SimpleBatchingStrategy batchingStrategy() { - return new SimpleBatchingStrategy(100, 1024 * 1024, 5000); -} -``` - -## 三、埋点数据设计建议 - -### 1. 通用字段 - -event_id: 事件唯一ID - -event_time: 事件时间戳 - -service: 服务名称 - -method: 方法/接口 - -user_id: 用户标识 - -session_id: 会话标识 - -client_ip: 客户端IP - -server_ip: 服务器IP - -### 2. 业务字段 - -action: 操作类型 - -target: 操作目标 - -params: 业务参数 - -result: 操作结果 - -cost_time: 耗时(ms) - -## 四、性能优化建议 - -异步发送:确保埋点发送不阻塞主业务流程 - -本地缓存:网络异常时先缓存到本地,后续重发 - -采样率:高流量场景可配置采样率 - -消息压缩:大数据量时可启用消息压缩 - -独立集群:埋点使用独立的RabbitMQ集群,避免影响业务消息 - -## 五、监控与维护 - -监控RabbitMQ队列积压情况 - -设置埋点数据处理的延迟告警 - -定期审计埋点数据的完整性和准确性 \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/mapper/BuriedPointMapper.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/mapper/BuriedPointMapper.java deleted file mode 100644 index bd56003..0000000 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/mapper/BuriedPointMapper.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.tashow.cloud.app.mq.mapper; - -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.tashow.cloud.app.mq.model.BuriedPoint; -import org.apache.ibatis.annotations.Mapper; - - -@Mapper -public interface BuriedPointMapper extends BaseMapper { -} \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java index 243fae9..7a072e1 100644 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/message/BuriedMessages.java @@ -7,13 +7,12 @@ import java.util.Map; @Data public class BuriedMessages implements Serializable { + private static final long serialVersionUID = 1L; // 添加序列化ID + // 消息队列配置 public static final String QUEUE = "BURIED_POINT_QUEUE"; public static final String EXCHANGE = "BURIED_POINT_EXCHANGE"; public static final String ROUTING_KEY = "BURIED_POINT_ROUTING_KEY"; - public static final String DEAD_LETTER_EXCHANGE = "DEAD_LETTER_EXCHANGE"; - public static final String DEAD_LETTER_ROUTING_KEY = "DEAD_LETTER_ROUTING"; - public static final String DEAD_LETTER_QUEUE = "DEAD_LETTER_QUEUE"; // 状态码定义 public static final Integer STATUS_INIT = 10; // 初始状态 @@ -21,8 +20,8 @@ public class BuriedMessages implements Serializable { public static final Integer STATUS_SUCCESS = 30; // 处理成功 public static final Integer STATUS_WARNING = 40; // 处理警告 public static final Integer STATUS_ERROR = 50; // 处理错误 + - // 通用字段 private Integer id; // 事件唯一ID private Long eventTime; // 事件时间戳 private String service; // 服务名称 @@ -31,8 +30,7 @@ public class BuriedMessages implements Serializable { private String sessionId; // 会话标识 private String clientIp; // 客户端IP private String serverIp; // 服务器IP - - // 添加埋点特定字段 + private String eventType; // 事件类型: PAGE_VIEW, API_CALL, BUTTON_CLICK 等 private String pagePath; // 页面路径/功能模块 private String elementId; // 元素标识 @@ -41,26 +39,11 @@ public class BuriedMessages implements Serializable { private String userAgent; // 用户代理信息 private Integer statusCode; // 响应状态码 private String errorMessage; // 错误信息 - - // 扩展字段,用于存储特定事件的额外数据 + private Integer retryCount = 0; // 重试次数计数器,默认0 + private Map extraData = new HashMap<>(); - /** - * 快速创建消息的便捷方法 - */ - public static BuriedMessages create(String userId, String eventType, String pagePath) { - BuriedMessages msg = new BuriedMessages(); - msg.setUserId(userId); - msg.setEventType(eventType); - msg.setPagePath(pagePath); - msg.setEventTime(System.currentTimeMillis()); - msg.setStatusCode(STATUS_INIT); // 默认初始状态 - return msg; - } - - /** - * 添加扩展数据 - */ + public BuriedMessages addExtraData(String key, Object value) { if (this.extraData == null) { this.extraData = new HashMap<>(); @@ -68,4 +51,17 @@ public class BuriedMessages implements Serializable { this.extraData.put(key, value); return this; } + + /** + * 增加重试计数 + */ + public void incrementRetryCount() { + if (this.retryCount == null) { + this.retryCount = 0; + } + this.retryCount++; + } + + + } diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java index 7130e70..648691c 100644 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/mq/producer/buriedPoint/BuriedPointProducer.java @@ -1,14 +1,13 @@ package com.tashow.cloud.app.mq.producer.buriedPoint; import com.tashow.cloud.app.mq.message.BuriedMessages; +import com.tashow.cloud.app.mapper.BuriedPointMapper; +import com.tashow.cloud.common.util.json.JsonUtils; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.UUID; /** @@ -16,34 +15,49 @@ import java.util.UUID; */ @Slf4j @Component -public class BuriedPointProducer { +public class BuriedPointProducer implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; + @Autowired + private BuriedPointMapper buriedPointMapper; + /** - * 异步发送完整的埋点消息,并确保消息已被broker接收 + * 异步发送完整的埋点消息(生成新的correlationId) */ @SneakyThrows public void asyncSendMessage(BuriedMessages message) { - CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); - // final CompletableFuture confirmFuture = new CompletableFuture<>(); - log.info("[埋点] 异步准备发送消息: {}", message); - correlationData.getFuture().whenComplete((confirm, ex) -> { - log.info("[埋点] 异步消息发送确认回调: {}", message); - if (ex != null) { - log.error("[埋点] 异步消息发送异常: {}", ex.getMessage(), ex); - // confirmFuture.completeExceptionally(ex); - } else if (confirm != null && confirm.isAck()) { - log.info("[埋点] 异步消息发送成功: {}", message); - // confirmFuture.complete(true); - } else { - log.warn("[埋点] 异步消息发送未被ACK"); -//confirmFuture.complete(false); - } - }); + String correlationId = UUID.randomUUID().toString(); + asyncSendMessage(message, correlationId); + } + + /** + * 异步发送完整的埋点消息(使用指定的correlationId) + * 用于重试场景,保持原有的correlationId + */ + @SneakyThrows + public void asyncSendMessage(BuriedMessages message, String correlationId) { + log.info("[埋点] 异步准备发送消息: {}, correlationId: {}", message, correlationId); + String messageJson = JsonUtils.toJsonString(message); + + CustomCorrelationData correlationData = new CustomCorrelationData(correlationId, messageJson); + rabbitTemplate.convertAndSend(BuriedMessages.EXCHANGE, BuriedMessages.ROUTING_KEY, message, correlationData); - log.info("[埋点] 异步消息发送完成: {}", message); - // return null; + log.info("[埋点] 异步消息发送完成: {}, 状态: {}, 重试次数: {}, correlationId: {}", + message.getId(), message.getStatusCode(), message.getRetryCount(), correlationId); + } + + + /** + * 确认消息是否成功发送到Broker的回调方法 + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (ack) { + log.info("[埋点] 消息发送确认成功: {}", correlationData.getId()); + } else { + log.error("[埋点] 消息发送确认失败: {}, 原因: {}", correlationData.getId(), cause); + } } } \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/config/SecurityConfiguration.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/config/SecurityConfiguration.java index 693e32b..e84bc72 100644 --- a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/config/SecurityConfiguration.java +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/config/SecurityConfiguration.java @@ -20,7 +20,6 @@ public class SecurityConfiguration { @Bean("infraAuthorizeRequestsCustomizer") public AuthorizeRequestsCustomizer authorizeRequestsCustomizer() { return new AuthorizeRequestsCustomizer() { - @Override public void customize(AuthorizeHttpRequestsConfigurer.AuthorizationManagerRequestMatcherRegistry registry) { // Swagger 接口文档