39 Commits

Author SHA1 Message Date
xuelijun cde19e180a Get paginated list of the SKU recycle bin 2025-08-05 17:20:56 +08:00
xuelijun 360c497c49 Rule changes 2: sku module 2025-08-05 15:38:12 +08:00
xuelijun 038a09f286 Get sku extended-service configuration info 2025-08-02 17:00:48 +08:00
xuelijun bd9c07313f Create sku extended-service configuration 2025-08-02 10:43:25 +08:00
xuelijun fb863ef9d1 Extended services 2025-08-01 18:02:59 +08:00
xuelijun d92b69e1f1 Product service query 2025-07-31 16:50:43 +08:00
xuelijun 2f54798561 Create product service 1 2025-07-31 11:08:24 +08:00
xuelijun d80bea35c6 Create product service 2025-07-30 18:29:26 +08:00
xuelijun 083b4e0bf1 Create product 2025-07-30 10:45:19 +08:00
xuelijun 4675e14813 Product module 7 2025-07-29 17:24:00 +08:00
xuelijun cdaeb3d908 Product module 6 2025-07-29 16:54:57 +08:00
xuelijun baaec3f5dc Product module 5 2025-07-29 10:54:37 +08:00
xuelijun 61f5816910 Product module 4 2025-07-29 10:25:56 +08:00
xuelijun 203749552d Product module 3 2025-07-28 18:11:54 +08:00
xuelijun d36f914aa3 Product module 2 2025-07-28 17:51:25 +08:00
xuelijun d07d884286 Product module 1 2025-07-28 17:32:31 +08:00
xuelijun ba3fe6a242 Product module 2025-07-28 17:28:04 +08:00
94f5254e5c Merge branch 'xlj' into develop 2025-07-24 09:55:11 +08:00
xuelijun 5e5c13329f canal comments 2025-07-24 09:43:39 +08:00
xuelijun 10f8e8251b canal 2025-07-15 10:04:16 +08:00
bb4432e643 Commit 2025-07-09 18:19:13 +08:00
3d072958d6 Commit 2025-06-18 18:30:01 +08:00
98bb3529ea Commit 2025-06-18 17:14:27 +08:00
e384dc1163 Merge branch 'refs/heads/develop' into feature/zijie
# Conflicts:
#	tashow-module/tashow-module-app/pom.xml
#	tashow-module/tashow-module-system/src/main/resources/application-local.yaml
2025-06-09 14:58:46 +08:00
c2adf5155f Adjustments 2025-06-04 18:31:22 +08:00
a55fa06c74 Remove dependency 2025-06-04 17:11:07 +08:00
731eaf3629 Remove docs; modify Dockerfile 2025-06-04 15:18:17 +08:00
3af438bcb7 Merge remote-tracking branch 'origin/feature/app-1.0.0' into feature/zijie 2025-05-30 17:50:21 +08:00
8e2c28ef36 Commit 2025-05-30 17:49:57 +08:00
d010e55b76 Commit 2025-05-26 17:58:35 +08:00
71519a730b Restore zijie's code 2025-05-26 17:50:49 +08:00
4708c10358 Merge branch 'feature/zijie' of http://gitea.tashowz.com/tashow/tashow-platform into feature/zijie
# Conflicts:
#	tashow-dependencies/pom.xml
#	tashow-module/pom.xml
#	tashow-module/tashow-module-system/tashow-module-system-biz/src/main/resources/application.yaml
2025-05-26 17:37:29 +08:00
55a99fdf7b Commit 2025-05-26 17:24:48 +08:00
0d168cc260 Commit 2025-05-26 17:24:20 +08:00
2bbecd241a Commit 2025-05-21 11:24:15 +08:00
6010f4efe9 Commit 2025-05-21 11:16:48 +08:00
01671a3ed2 Commit 2025-04-22 15:23:13 +08:00
e731ef8bcd Commit 2025-04-19 17:31:40 +08:00
6230c36cf2 Commit 2025-04-17 18:32:11 +08:00
338 changed files with 16649 additions and 180 deletions

.cursor/rules/1.mdc Normal file

@@ -0,0 +1,48 @@
---
description:
globs:
alwaysApply: false
---
# Your rule content
# Role
You are a senior engineer with more than 10 years of application development experience, familiar with development tools and technology stacks such as *.
Your task is to help users design and develop *** applications that are easy to use and easy to maintain. Always follow best practices and adhere to the principles of clean code and robust architecture.
# Goal
Your goal is to help users complete the design and development of the application in a way that is easy for them to understand, ensuring the application is feature-complete, performant, and offers a good user experience.
# Requirements
When understanding requirements, designing UI, writing code, solving problems, and iterating on the project, always follow these principles:
## Understanding requirements
- Fully understand the user's requirements, think from the user's perspective, check the requirements for gaps, and discuss with the user to refine them;
- Choose the simplest solution that meets the requirements; avoid over-engineering.
## UI and styling
- Use a modern UI framework for styling (e.g. ***; expand per project, such as which visual guidelines or UI frameworks apply, or omit if none do);
- Deliver a consistent design and responsive patterns across platforms.
## Writing code
- Technology selection: choose a stack that fits the project's requirements (e.g. ***; expand in detail, such as where each technology is used and which best practices to follow).
- Code structure: emphasize clarity, modularity, and maintainability, following best practices (the DRY principle, the principle of least privilege, responsive design, etc.).
- Code security: always consider security when writing code, avoid introducing vulnerabilities, and handle user input safely.
- Performance: optimize performance, reduce resource usage, and improve load speed to keep the project running efficiently.
- Testing and documentation: write unit tests to ensure robustness, and provide clear Chinese comments and documentation for later reading and maintenance.
## Problem solving
- Read all relevant code to understand how the *** application works.
- Analyze the root cause based on the user's feedback and propose an approach to fix it.
- Ensure every code change keeps existing functionality intact and stays as small as possible.
## Iteration
- Stay in close communication with the user, adjusting features and design based on feedback so the application meets their needs.
- When requirements are unclear, proactively ask the user to clarify requirements or technical details.
## Methodology
- System 2 thinking: solve problems analytically and rigorously; break requirements into smaller, manageable parts and consider each step carefully before implementing.
- Tree of thought: evaluate multiple possible solutions and their consequences; use a structured approach to explore different paths and choose the best one.
- Iterative refinement: before finalizing code, consider improvements, edge cases, and optimizations; iterate over potential enhancements to make the final solution robust.


@@ -0,0 +1,2 @@
**Adding rule files helps the model accurately understand your coding preferences, such as frameworks and code style.**
**Rule files apply only to the current project; a single file is limited to 10,000 characters. If you do not want to commit this file to the remote Git repository, add it to .gitignore.**

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.


@@ -1,3 +1,3 @@
 暂未适配 IBM DB2 数据库,如果你有需要,可以微信联系 wangwenbin-server 一起建设。
-你需要把表结构与数据导入到 DM 数据库,我来测试与适配代码。
+你需要把表结构与数据导入到 DM 数据库,我a来测试与适配代码。


@@ -13,6 +13,7 @@
 <modules>
 <module>tashow-infra-api</module>
 <module>tashow-system-api</module>
+<module>tashow-product-api</module>
 </modules>
 </project>


@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-feign</artifactId>
<version>${revision}</version>
</parent>
<artifactId>tashow-product-api</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
product module API, exposed for other modules to call
</description>
<dependencies>
<dependency>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-common</artifactId>
</dependency>
<!-- parameter validation -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<optional>true</optional>
</dependency>
<!-- RPC remote invocation -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.13</version> <!-- latest stable version recommended -->
</dependency>
</dependencies>
</project>


@@ -0,0 +1,4 @@
/**
* product API package: defines the APIs exposed to other modules
*/
package com.tashow.cloud.productapi.api;


@@ -0,0 +1,24 @@
package com.tashow.cloud.productapi.enums;
import com.tashow.cloud.common.enums.RpcConstants;
/**
 * API-related constants
 *
 * @author 芋道源码
 */
public class ApiConstants {
/**
 * Service name
 *
 * Note: must be kept consistent with spring.application.name
 */
public static final String NAME = "product-server";
public static final String PREFIX = RpcConstants.RPC_API_PREFIX + "/infra";
public static final String VERSION = "1.0.0";
}


@@ -0,0 +1,42 @@
package com.tashow.cloud.productapi.enums;
import lombok.Getter;
/**
* @Author LGF
* @create 2020/10/26 16:36
*/
public enum BaseEnum {
/**
* Base enum values
*/
YES_ONE(1, ""),
NO_ZERO(0, ""),
ENABLE_ZERO(0, "启用"),
FORBIDDEN_ONE(1, "禁用"),
YES_BINDING(1, "已绑定"),
NO_BINDING(0, "未绑定"),
NO(1, "删除"),
YES(0, "正常"),
HIDE(3, "隐藏"),
NO_TWO(2, ""),
;
@Getter
Integer key;
@Getter
String value;
BaseEnum(Integer key, String value) {
this.key = key;
this.value = value;
}
}


@@ -0,0 +1,20 @@
package com.tashow.cloud.productapi.enums;
/**
 * Product dict-type constants
 *
 * @author 芋道源码
 */
public interface DictTypeConstants {
String JOB_STATUS = "product_job_status"; // scheduled-job status
String JOB_LOG_STATUS = "product_job_log_status"; // scheduled-job log status
String API_ERROR_LOG_PROCESS_STATUS = "product_api_error_log_process_status"; // API error-log processing status
String CONFIG_TYPE = "product_config_type"; // config parameter type
String BOOLEAN_STRING = "product_boolean_string"; // boolean yes/no type
String OPERATE_TYPE = "product_operate_type"; // operation type
}


@@ -0,0 +1,35 @@
package com.tashow.cloud.productapi.enums;
import com.tashow.cloud.common.exception.ErrorCode;
/**
 * Error-code constants for the product module
 *
 * (the codes below use the 10001+ range)
 */
public interface ErrorCodeConstants {
ErrorCode CATEGORY_NOT_EXISTS = new ErrorCode(10001, "产品类目不存在");
ErrorCode PROD_NOT_EXISTS = new ErrorCode(10002, "商品不存在");
ErrorCode PROD_ADDITIONAL_FEE_DATES_NOT_EXISTS = new ErrorCode(10003, "特殊日期附加费用规则不存在");
ErrorCode PROD_ADDITIONAL_FEE_PERIODS_NOT_EXISTS = new ErrorCode(10004, "特殊时段附加费用规则不存在");
ErrorCode PROD_EMERGENCY_RESPONSE_NOT_EXISTS = new ErrorCode(10005, "商品紧急响应服务设置不存在");
ErrorCode PROD_EMERGENCY_RESPONSE_INTERVALS_NOT_EXISTS = new ErrorCode(10006, "紧急响应时间区间设置不存在");
ErrorCode PROD_PROP_NOT_EXISTS = new ErrorCode(10007, "商品属性不存在");
ErrorCode PROD_PROP_VALUE_NOT_EXISTS = new ErrorCode(10008, "属性规则不存在");
ErrorCode PROD_RESERVATION_CONFIG_NOT_EXISTS = new ErrorCode(10009, "商品预约配置不存在");
ErrorCode PROD_SERVICE_AREA_RELEVANCE_NOT_EXISTS = new ErrorCode(10010, "商品与服务区域关联不存在");
ErrorCode PROD_SERVICE_AREAS_NOT_EXISTS = new ErrorCode(10011, "服务区域不存在");
ErrorCode PROD_SERVICE_OVER_AREA_RULES_NOT_EXISTS = new ErrorCode(10012, "超区规则不存在");
ErrorCode PROD_TAGS_NOT_EXISTS = new ErrorCode(10013, "商品和标签管理不存在");
ErrorCode PRODUCT_ORDER_LIMIT_NOT_EXISTS = new ErrorCode(10014, "商品接单上限设置不存在");
ErrorCode PROD_WEIGHT_RANGE_PRICES_NOT_EXISTS = new ErrorCode(10015, "体重区间价格不存在");
ErrorCode SHOP_DETAIL_NOT_EXISTS = new ErrorCode(10016, "店铺信息不存在");
ErrorCode SKU_NOT_EXISTS = new ErrorCode(10017, "单品SKU不存在");
ErrorCode SKU_SERVICE_DELIVER_NOT_EXISTS = new ErrorCode(10018, "服务交付方式不存在");
ErrorCode SKU_SERVICE_MATERIAL_NOT_EXISTS = new ErrorCode(10019, "服务物料详情不存在");
ErrorCode SKU_SERVICES_FORM_NOT_EXISTS = new ErrorCode(10021, "商品SKU扩展服务表单不存在");
ErrorCode SKU_SERVICE_TRANSPORT_NOT_EXISTS = new ErrorCode(10022, "服务遗体运输不存在");
ErrorCode SKU_SERVICE_DETAILS_NOT_EXISTS = new ErrorCode(10023, "服务详情不存在");
}


@@ -0,0 +1,44 @@
/*
 * Copyright (c) 2018-2999 广州市蓝海创新科技有限公司 All rights reserved.
 *
 * https://www.mall4j.com/
 *
 * Commercial use is not permitted without authorization.
 *
 * All rights reserved; infringement will be prosecuted.
 */
package com.tashow.cloud.productapi.enums;
/**
 * Product spec-parameter / attribute type
 * @author lgh
 */
public enum ProdPropRule {
// Spec attribute (used to associate SKUs when a product is published)
SPEC(1),
// Spec parameter (used in product search, searched together with the category)
ATTRIBUTE(2);
private Integer num;
public Integer value() {
return num;
}
ProdPropRule(Integer num){
this.num = num;
}
public static ProdPropRule instance(Integer value) {
ProdPropRule[] enums = values();
for (ProdPropRule statusEnum : enums) {
if (statusEnum.value().equals(value)) {
return statusEnum;
}
}
return null;
}
}


@@ -0,0 +1,45 @@
package com.tashow.cloud.productapi.enums;
public enum ServiceTypeEnum {
TRANSPORT_CAR_CONFIG(1, "接运车辆配置"),
TRANSPORT_CAR_MATERIAL(2, "接运车辆服务物料"),
BODY_TRANSPORT_CONFIG(3, "遗体运输目的地配置"),
BODY_TRANSPORT_MATERIAL(4, "遗体运输目的地物料"),
BODY_CLEAN_CONFIG(5, "遗体清洁配置"),
BODY_CLEAN_MATERIAL(6, "遗体清洁物料"),
MEMORIAL_CONFIG(7, "追思告别配置"),
MEMORIAL_MATERIAL(8, "追思告别物料"),
CREMATION_CONFIG(9, "遗体火化配置"),
CREMATION_MATERIAL(10, "遗体火化物料"),
ASH_PROCESSING_CONFIG(11, "骨灰处理配置"),
ASH_PROCESSING_DELIVERY(12, "骨灰处理配送方式"),
ASH_PROCESSING_MATERIAL(13, "骨灰处理物料"),
BONE_ASH_CONFIG(14, "骨灰装殓配置"),
SOUVENIR_CONFIG(15, "纪念品配置"),
SOUVENIR_DELIVERY(16, "纪念品配送方式");
private final int code;
private final String description;
ServiceTypeEnum(int code, String description) {
this.code = code;
this.description = description;
}
public int getCode() {
return code;
}
public String getDescription() {
return description;
}
public static ServiceTypeEnum getByCode(int code) {
for (ServiceTypeEnum type : ServiceTypeEnum.values()) {
if (type.getCode() == code) {
return type;
}
}
return null;
}
}
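ServiceTypeEnum.getByCode scans values() on every call. A precomputed map is a common alternative for code-to-enum lookup; the sketch below is standalone and uses a small hypothetical enum (DeliveryType), not one from this codebase:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of map-based code lookup, with a small stand-in enum.
enum DeliveryType {
    PICKUP(1), COURIER(2);

    // Built once when the enum class is initialized
    private static final Map<Integer, DeliveryType> BY_CODE = new HashMap<>();
    static {
        for (DeliveryType t : values()) {
            BY_CODE.put(t.code, t);
        }
    }

    private final int code;

    DeliveryType(int code) {
        this.code = code;
    }

    public int getCode() {
        return code;
    }

    // O(1) lookup instead of scanning values() on every call
    public static DeliveryType getByCode(int code) {
        return BY_CODE.get(code); // null when the code is unknown, like getByCode above
    }
}
```

The null-on-unknown-code contract matches the original; callers that prefer failing fast could throw instead.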


@@ -0,0 +1,54 @@
package com.tashow.cloud.productapi.general;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;
import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
 * Handles conversion between List<String> and a comma-separated database string
 */
public class StringListTypeHandler extends BaseTypeHandler<List<String>> {
@Override
public void setNonNullParameter(PreparedStatement ps, int i, List<String> parameter, JdbcType jdbcType) throws SQLException {
// Join the list into a comma-separated string
StringBuilder sb = new StringBuilder();
for (int j = 0; j < parameter.size(); j++) {
if (j > 0) sb.append(",");
sb.append(parameter.get(j));
}
ps.setString(i, sb.toString());
}
@Override
public List<String> getNullableResult(ResultSet rs, String columnName) throws SQLException {
String str = rs.getString(columnName);
return parseStringToList(str);
}
@Override
public List<String> getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
String str = rs.getString(columnIndex);
return parseStringToList(str);
}
@Override
public List<String> getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
String str = cs.getString(columnIndex);
return parseStringToList(str);
}
private List<String> parseStringToList(String str) {
if (str == null || str.trim().length() == 0) {
return Collections.emptyList();
}
return Arrays.asList(str.split(","));
}
}
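The handler joins with a manual StringBuilder loop and splits with String.split. The same round-trip can be sketched standalone (class and method names here are illustrative, not part of the codebase):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone sketch of the conversion performed by StringListTypeHandler:
// List<String> <-> comma-separated database column.
class CommaListSketch {
    static String toColumn(List<String> values) {
        // String.join is equivalent to the handler's manual StringBuilder loop
        return String.join(",", values);
    }

    static List<String> fromColumn(String column) {
        // Null/blank column maps to an empty list, as in parseStringToList
        if (column == null || column.trim().isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(column.split(","));
    }

    public static void main(String[] args) {
        String column = toColumn(Arrays.asList("red", "green", "blue"));
        System.out.println(column);              // red,green,blue
        System.out.println(fromColumn(column));  // [red, green, blue]
    }
}
```

Note the round-trip is lossy if any element itself contains a comma; the handler inherits the same limitation.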


@@ -27,6 +27,7 @@
 <module>tashow-data-redis</module>
 <module>tashow-data-excel</module>
 <module>tashow-data-es</module>
+<module>tashow-data-canal</module>
 </modules>


@@ -108,6 +108,25 @@
 <artifactId>jackson-datatype-jsr310</artifactId>
 <scope>provided</scope> <!-- provided: only the utility classes need it -->
 </dependency>
+<!-- PageHelper pagination plugin -->
+<!-- <dependency>
+<groupId>com.github.pagehelper</groupId>
+<artifactId>pagehelper-spring-boot-starter</artifactId>
+<version>1.4.6</version>
+<exclusions>
+<exclusion>
+<groupId>org.mybatis</groupId>
+<artifactId>mybatis</artifactId>
+</exclusion>
+</exclusions>
+</dependency>-->
+<!-- common utility classes -->
+<dependency>
+<groupId>org.apache.commons</groupId>
+<artifactId>commons-lang3</artifactId>
+</dependency>
 <dependency>
 <groupId>org.slf4j</groupId>


@@ -3,6 +3,7 @@ package com.tashow.cloud.common.util.date;
 import cn.hutool.core.date.LocalDateTimeUtil;
 import java.time.*;
+import java.time.temporal.ChronoUnit;
 import java.util.Calendar;
 import java.util.Date;
@@ -27,6 +28,9 @@ public class DateUtils {
 public static final String FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND = "yyyy-MM-dd HH:mm:ss";
+// Default data-retention period, in days
+private static final long RETENTION_DAYS = 90;
 /**
  * Convert a LocalDateTime to a Date
  *
@@ -146,4 +150,36 @@ public class DateUtils {
 return LocalDateTimeUtil.isSameDay(date, LocalDateTime.now().minusDays(1));
 }
+/**
+ * Given a deletion time, compute how many days remain before permanent deletion (default retention: 90 days)
+ *
+ * @param deleteTime deletion time
+ * @return remaining days (>= 0); 0 means the retention period has expired
+ */
+public static long getRemainingDays(Date deleteTime) {
+if (deleteTime == null) {
+throw new IllegalArgumentException("删除时间不能为 null");
+}
+// Convert the Date to a LocalDateTime
+LocalDateTime deleteDateTime = deleteTime.toInstant()
+.atZone(ZoneId.systemDefault())
+.toLocalDateTime();
+// Current time
+LocalDateTime now = LocalDateTime.now();
+// Expiry time = deletion time + retention days
+LocalDateTime expireTime = deleteDateTime.plusDays(RETENTION_DAYS);
+// If the expiry time has already passed, 0 days remain
+if (now.isAfter(expireTime)) {
+return 0;
+}
+// Remaining whole days (floored, no rounding up)
+return ChronoUnit.DAYS.between(now, expireTime);
+}
 }
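The retention arithmetic in getRemainingDays can be restated with "now" passed in explicitly, which makes the logic deterministic and easy to test; a standalone sketch (class and method names are illustrative):

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Standalone sketch of the retention computation in DateUtils.getRemainingDays,
// with "now" injected instead of read from the system clock.
class RetentionSketch {
    static final long RETENTION_DAYS = 90;

    static long remainingDays(LocalDateTime deleteTime, LocalDateTime now) {
        // Expiry time = deletion time + retention days
        LocalDateTime expireTime = deleteTime.plusDays(RETENTION_DAYS);
        if (now.isAfter(expireTime)) {
            return 0; // already past the retention window
        }
        // Whole days remaining, floored
        return ChronoUnit.DAYS.between(now, expireTime);
    }

    public static void main(String[] args) {
        LocalDateTime deleted = LocalDateTime.of(2025, 8, 1, 12, 0);
        // 10 days after deletion, 80 of the 90 retention days remain
        System.out.println(remainingDays(deleted, deleted.plusDays(10))); // 80
    }
}
```

Injecting the clock this way is also how the production method could be made unit-testable without mocking LocalDateTime.now().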


@@ -0,0 +1,64 @@
/*
 * Copyright (c) 2018-2999 广州市蓝海创新科技有限公司 All rights reserved.
 *
 * https://www.mall4j.com/
 *
 * Commercial use is not permitted without authorization.
 *
 * All rights reserved; infringement will be prosecuted.
 */
package com.tashow.cloud.common.util.serializer;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author lanhai
*/
@Component
public class ImgJsonSerializer extends JsonSerializer<String> {
/* @Autowired
private Qiniu qiniu;
@Autowired
private ImgUploadUtil imgUploadUtil;*/
@Override
public void serialize(String value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
// The URL-prefixing logic below is commented out; write the value through
// unchanged so the serializer still emits valid JSON.
gen.writeString(value == null ? StrUtil.EMPTY : value);
/*if (StrUtil.isBlank(value)) {
gen.writeString(StrUtil.EMPTY);
return;
}
String[] imgs = value.split(StrUtil.COMMA);
StringBuilder sb = new StringBuilder();
String resourceUrl = "";
String rule="^((http[s]{0,1})://)";
Pattern pattern= Pattern.compile(rule);
if (Objects.equals(imgUploadUtil.getUploadType(), 2)) {
resourceUrl = qiniu.getResourcesUrl();
} else if (Objects.equals(imgUploadUtil.getUploadType(), 1)) {
resourceUrl = imgUploadUtil.getResourceUrl();
}
for (String img : imgs) {
Matcher matcher = pattern.matcher(img);
//若图片以http或https开头直接返回
if (matcher.find()){
sb.append(img).append(StrUtil.COMMA);
}else {
sb.append(resourceUrl).append(img).append(StrUtil.COMMA);
}
}
sb.deleteCharAt(sb.length()-1);
gen.writeString(sb.toString());*/
}
}


@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/Validation/?yudao>


@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-framework</artifactId>
<version>${revision}</version>
</parent>
<artifactId>tashow-data-canal</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>canal wrapper and extensions</description>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<!-- Dynamic-Datasource Starter -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>4.2.0</version> <!-- stable version recommended -->
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version> <!-- final javax.annotation release -->
</dependency>
<!-- 芋道 base dependency -->
<dependency>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-common</artifactId>
</dependency>
<!-- other required dependencies -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,22 @@
package com.tashow.cloud.canal.config;
import com.tashow.cloud.canal.service.CanalSyncService;
import com.tashow.cloud.canal.service.SqlExecutorService;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.context.annotation.Bean;
@AutoConfiguration
public class CanalAutoConfiguration {
@Bean
public CanalSyncService canalSyncService() {
return new CanalSyncService();
}
@Bean
public SqlExecutorService sqlExecutorService() {
return new SqlExecutorService();
}
}


@@ -0,0 +1,188 @@
package com.tashow.cloud.canal.service;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
@Service
public class CanalSyncService {
private static final Logger log = LoggerFactory.getLogger(CanalSyncService.class);
private static final Queue<SqlTask> SQL_QUEUE = new ConcurrentLinkedQueue<>();
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private SqlExecutorService sqlExecutorService;
@PostConstruct
public void start() {
new Thread(this::runCanalClient).start();
}
private void runCanalClient() {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("43.139.42.137", 11111),
"example",
"",
""
);
int batchSize = 1000;
try {
connector.connect();
connector.subscribe("tashow-platform\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize);
log.info("Received message id: {}, entries size: {}", message.getId(), message.getEntries().size());
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
continue;
}
dataHandle(message.getEntries());
connector.ack(batchId);
if (!SQL_QUEUE.isEmpty()) {
executeQueueSql();
}
}
} catch (Exception e) {
log.error("Canal client error occurred.", e);
} finally {
connector.disconnect();
}
}
private void dataHandle(List<Entry> entries) {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) continue;
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
log.info("schema: {}, table: {}, type: {}", schemaName, tableName, eventType);
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == EventType.INSERT) {
saveInsertSql(entry);
}
} catch (Exception e) {
log.error("Error handling entry: {}", entry.toString(), e);
}
}
}
private void saveInsertSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
for (RowData rowData : rowChange.getRowDatasList()) {
List<Column> columns = rowData.getAfterColumnsList();
List<String> columnNames = new ArrayList<>();
List<Object> values = new ArrayList<>();
for (Column col : columns) {
columnNames.add(col.getName());
values.add(col.getValue());
}
String sql = "INSERT INTO " + tableName + " (" +
String.join(",", columnNames) + ") VALUES (";
StringBuilder placeholders = new StringBuilder();
for (int i = 0; i < values.size(); i++) {
placeholders.append("?,");
}
if (placeholders.length() > 0) placeholders.deleteCharAt(placeholders.length() - 1);
sql += placeholders + ")";
SQL_QUEUE.add(new SqlTask(sql, values.toArray()));
}
}
private void saveUpdateSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
for (RowData rowData : rowChange.getRowDatasList()) {
List<Column> newColumns = rowData.getAfterColumnsList();
List<Column> oldColumns = rowData.getBeforeColumnsList();
List<String> updateColumns = new ArrayList<>();
List<Object> params = new ArrayList<>();
for (Column col : newColumns) {
updateColumns.add(col.getName() + "=?");
params.add(col.getValue());
}
Optional<Column> primaryKeyOpt = oldColumns.stream().filter(Column::getIsKey).findFirst();
Column primaryKey = primaryKeyOpt.orElseThrow(() -> new RuntimeException("未找到主键"));
params.add(primaryKey.getValue());
String sql = "UPDATE " + tableName + " SET " +
String.join(",", updateColumns) +
" WHERE " + primaryKey.getName() + "=?";
SQL_QUEUE.add(new SqlTask(sql, params.toArray()));
}
}
private void saveDeleteSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
for (RowData rowData : rowChange.getRowDatasList()) {
List<Column> beforeColumns = rowData.getBeforeColumnsList();
Optional<Column> primaryKeyOpt = beforeColumns.stream().filter(Column::getIsKey).findFirst();
Column primaryKey = primaryKeyOpt.orElseThrow(() -> new RuntimeException("未找到主键"));
String sql = "DELETE FROM " + tableName + " WHERE " + primaryKey.getName() + "=?";
SQL_QUEUE.add(new SqlTask(sql, primaryKey.getValue()));
}
}
private void executeQueueSql() {
List<SqlTask> tasks = new ArrayList<>();
SqlTask task;
while ((task = SQL_QUEUE.poll()) != null) {
tasks.add(task);
}
if (!tasks.isEmpty()) {
sqlExecutorService.executeBatch(tasks);
}
}
}
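saveInsertSql builds a parameterized INSERT and binds values through ? placeholders rather than concatenating them into the SQL string. The placeholder construction can be sketched standalone (class and method names are illustrative, not part of the codebase):

```java
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the parameterized-INSERT construction used in
// CanalSyncService.saveInsertSql.
class InsertSqlSketch {
    static String buildInsertSql(String tableName, List<String> columnNames) {
        // One "?" per column, comma-separated
        StringBuilder placeholders = new StringBuilder();
        for (int i = 0; i < columnNames.size(); i++) {
            if (i > 0) placeholders.append(",");
            placeholders.append("?");
        }
        return "INSERT INTO " + tableName + " (" + String.join(",", columnNames)
                + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildInsertSql("prod_sku", Arrays.asList("id", "name", "price")));
        // INSERT INTO prod_sku (id,name,price) VALUES (?,?,?)
    }
}
```

Binding values via placeholders avoids the quoting and injection problems of the string-concatenation approach; note that the table and column names are still interpolated directly, so they must come from a trusted source (here, the canal binlog metadata).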


@@ -0,0 +1,156 @@
/*
package com.tashow.cloud.canal.service;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Service
public class CanalSyncServiceTest {
private static final Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private Canaldb canaldb;
@PostConstruct
public void start() {
new Thread(this::runCanalClient).start();
}
private void runCanalClient() {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("43.139.42.137", 11111),
"example",
"",
""
);
int batchSize = 1000;
try {
connector.connect();
connector.subscribe("tashow-platform\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize);
System.out.println("Received message id: " + message.getId() + ", entries size: " + message.getEntries().size());
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
continue;
}
dataHandle(message.getEntries());
connector.ack(batchId);
if (SQL_QUEUE.size() > 0) {
executeQueueSql();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private void dataHandle(List<Entry> entries) {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) continue;
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
System.out.println("schema: " + schemaName + ", table: " + tableName + ", type: " + eventType);
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == EventType.INSERT) {
saveInsertSql(entry);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void saveInsertSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
List<Column> columns = rowData.getAfterColumnsList();
StringBuilder sql = new StringBuilder("INSERT INTO ")
.append(entry.getHeader().getTableName()).append(" (")
.append(columns.stream().map(Column::getName).reduce((a, b) -> a + "," + b).orElse(""))
.append(") VALUES (")
.append(columns.stream().map(c -> "'" + c.getValue() + "'").reduce((a, b) -> a + "," + b).orElse(""))
.append(");");
SQL_QUEUE.add(sql.toString());
}
}
private void saveUpdateSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
List<Column> newColumns = rowData.getAfterColumnsList();
List<Column> oldColumns = rowData.getBeforeColumnsList();
StringBuilder setClause = new StringBuilder();
for (Column col : newColumns) {
setClause.append(col.getName()).append("='").append(col.getValue()).append("', ");
}
if (setClause.length() > 0) setClause.setLength(setClause.length() - 2);
String whereClause = oldColumns.stream()
.filter(Column::getIsKey)
.map(c -> c.getName() + "='" + c.getValue() + "'")
.findFirst()
.orElseThrow(() -> new RuntimeException("未找到主键"));
SQL_QUEUE.add("UPDATE " + entry.getHeader().getTableName() + " SET " + setClause + " WHERE " + whereClause + ";");
}
}
private void saveDeleteSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
String whereClause = rowData.getBeforeColumnsList().stream()
.filter(Column::getIsKey)
.map(c -> c.getName() + "='" + c.getValue() + "'")
.findFirst()
.orElseThrow(() -> new RuntimeException("未找到主键"));
SQL_QUEUE.add("DELETE FROM " + entry.getHeader().getTableName() + " WHERE " + whereClause + ";");
}
}
private void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
canaldb.execute(sql);
}
}
}
*/


@@ -0,0 +1,27 @@
/*
package com.tashow.cloud.canal.service;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
@Service
public class Canaldb {
@Autowired
private JdbcTemplate jdbcTemplate;
@DS("slave")
public void execute(String sql) {
try {
String ds = DynamicDataSourceContextHolder.peek(); // 调试查看当前数据源
System.out.println("当前数据源:" + ds);
System.out.println("[execute]----> " + sql);
jdbcTemplate.execute(sql);
} catch (Exception e) {
e.printStackTrace();
}
}
}
*/


@@ -0,0 +1,56 @@
package com.tashow.cloud.canal.service;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@Service
public class SqlExecutorService {
private static final Logger log = LoggerFactory.getLogger(SqlExecutorService.class);
@Autowired
private JdbcTemplate jdbcTemplate;
public void executeBatch(List<SqlTask> tasks) {
if (tasks == null || tasks.isEmpty()) return;
DynamicDataSourceContextHolder.push("slave");
try {
// The queue can mix INSERT/UPDATE/DELETE statements, so group the tasks
// by SQL template: batchUpdate binds one template per batch.
Map<String, List<SqlTask>> grouped = new LinkedHashMap<>();
for (SqlTask task : tasks) {
grouped.computeIfAbsent(task.getSql(), k -> new ArrayList<>()).add(task);
}
for (Map.Entry<String, List<SqlTask>> entry : grouped.entrySet()) {
List<SqlTask> batch = entry.getValue();
jdbcTemplate.batchUpdate(entry.getKey(), new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Object[] args = batch.get(i).getArgs();
for (int j = 0; j < args.length; j++) {
ps.setObject(j + 1, args[j]);
}
}
@Override
public int getBatchSize() {
return batch.size();
}
});
}
log.info("Executed {} SQL statements", tasks.size());
} catch (Exception e) {
log.error("Batch SQL execution failed", e);
} finally {
DynamicDataSourceContextHolder.poll();
}
}
}


@@ -0,0 +1,19 @@
package com.tashow.cloud.canal.service;
public class SqlTask {
private final String sql;
private final Object[] args;
public SqlTask(String sql, Object... args) {
this.sql = sql;
this.args = args;
}
public String getSql() {
return sql;
}
public Object[] getArgs() {
return args;
}
}


@@ -0,0 +1,32 @@
package com.tashow.cloud.canal.service;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class SqlTaskQueue {
private static final Queue<SqlTask> queue = new ConcurrentLinkedQueue<>();
public static void add(SqlTask task) {
queue.add(task);
}
public static boolean isEmpty() {
return queue.isEmpty();
}
public static SqlTask poll() {
return queue.poll();
}
public static int size() {
return queue.size();
}
public static List<SqlTask> drainAll() {
// Drain with poll() so tasks enqueued concurrently between a snapshot
// and a clear() cannot be lost.
List<SqlTask> list = new java.util.ArrayList<>();
SqlTask task;
while ((task = queue.poll()) != null) {
list.add(task);
}
return list;
}
}
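The drain pattern used by SqlTaskQueue can be sketched standalone: polling until empty is safe under concurrent producers, because each enqueued task is either taken by the current drain or left intact for the next one (class name and element type here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Standalone sketch of a poll-based drain over a ConcurrentLinkedQueue.
class DrainSketch {
    private static final Queue<String> QUEUE = new ConcurrentLinkedQueue<>();

    static void add(String task) {
        QUEUE.add(task);
    }

    static List<String> drainAll() {
        List<String> list = new ArrayList<>();
        String task;
        // poll() removes atomically, so no task is ever both drained and dropped
        while ((task = QUEUE.poll()) != null) {
            list.add(task);
        }
        return list;
    }
}
```

By contrast, snapshotting the queue (forEach) and then calling clear() can silently discard tasks added between the two steps.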


@@ -0,0 +1 @@
com.tashow.cloud.canal.config.CanalAutoConfiguration


@@ -37,6 +37,11 @@
<artifactId>ojdbc8</artifactId> <artifactId>ojdbc8</artifactId>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<!--<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
<version>4.5</version>
</dependency>-->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>

View File

@@ -51,6 +51,6 @@ public abstract class BaseDO implements Serializable, TransPojo {
 * Whether deleted
 */
@TableLogic
- private Boolean deleted;
+ private Integer deleted;
}

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/MyBatis/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/dynamic-datasource/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/datasource-pool/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/Cache/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/Redis/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/Job/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/Async-Job/?yudao>

View File

@@ -1 +0,0 @@
<https://www.iocoder.cn/Spring-Boot/Admin/?yudao>

View File

@@ -1 +0,0 @@
<https://www.iocoder.cn/Spring-Boot/Actuator/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/SkyWalking/?yudao>

View File

@@ -2,30 +2,59 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
     <parent>
+        <groupId>com.tashow.cloud</groupId>
         <artifactId>tashow-framework</artifactId>
-        <groupId>com.tashow.cloud</groupId>
         <version>${revision}</version>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
     <artifactId>tashow-framework-mq</artifactId>
     <packaging>jar</packaging>
     <name>${project.artifactId}</name>
-    <description>Message queue, supporting Redis, RocketMQ, RabbitMQ, and Kafka</description>
+    <description>Message queue module, based on RabbitMQ and related middleware</description>
-    <url>https://github.com/tashow/tashow-platform</url>
     <dependencies>
-        <!-- DB related -->
+        <!-- RabbitMQ -->
         <dependency>
-            <groupId>com.tashow.cloud</groupId>
-            <artifactId>tashow-data-redis</artifactId>
-            <optional>true</optional>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
         </dependency>
+        <!-- Web Services -->
         <dependency>
-            <groupId>org.springframework.amqp</groupId>
-            <artifactId>spring-rabbit</artifactId>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-web</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-webmvc</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.servlet</groupId>
+            <artifactId>jakarta.servlet-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.tashow.cloud</groupId>
+            <artifactId>tashow-data-mybatis</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jodd</groupId>
+            <artifactId>jodd-util</artifactId>
+            <version>6.3.0</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 </project>

View File

@@ -0,0 +1,46 @@
package com.tashow.cloud.mq.core;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * MQ message base class
 *
 * @author tashow
 */
@Data
public class BaseMqMessage implements Serializable {
private static final long serialVersionUID = 1L;
/**
 * Message ID; defaults to the hashCode of a random UUID
 * (note: hashCode values can collide and may be negative)
 */
private Integer id = UUID.randomUUID().hashCode();
/**
 * Message status code
 */
private Integer statusCode;
/**
 * Message retry count
 */
private Integer retryCount = 0;
/**
 * Message error information
 */
private String errorMessage;
/**
 * Message creation time
 */
private Date createTime = new Date();
/**
 * Extension data
 */
private Map<String, Object> extraData = new HashMap<>();
}

View File

@@ -0,0 +1,31 @@
package com.tashow.cloud.mq.handler;
/**
 * Message record handling interface
 *
 * @author tashow
 */
public interface FailRecordHandler {
/**
 * Save a message record
 *
 * @param id             message ID
 * @param exchange       exchange name
 * @param routingKey     routing key
 * @param cause          failure cause (may be null)
 * @param messageContent message body
 * @param status         status: 0 - unprocessed, 1 - success, 2 - failed
 */
void saveMessageRecord(Integer id, String exchange, String routingKey, String cause, String messageContent, int status);
/**
 * Mark a message as successfully confirmed
 *
 * @param id associated record ID
 */
void updateMessageStatus(Integer id);
/**
 * Mark a message as failed and record the cause
 */
void updateMessageStatusWithCause(Integer id, String cause);
}
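For illustration, an in-memory sketch of a store satisfying the `FailRecordHandler` contract above; everything here except the method signatures is an assumption, and a real implementation (as in this change set) would back it with a database table:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryFailRecords {

    public static final int STATUS_UNPROCESSED = 10;
    public static final int STATUS_SUCCESS = 20;
    public static final int STATUS_FAILED = 30;

    // Minimal record holder keyed by message ID
    static final class Record {
        String exchange;
        String routingKey;
        String cause;
        String messageContent;
        int status;
    }

    private final Map<Integer, Record> records = new ConcurrentHashMap<>();

    public void saveMessageRecord(Integer id, String exchange, String routingKey,
                                  String cause, String messageContent, int status) {
        Record r = new Record();
        r.exchange = exchange;
        r.routingKey = routingKey;
        r.cause = cause;
        r.messageContent = messageContent;
        r.status = status;
        records.put(id, r);
    }

    public void updateMessageStatus(Integer id) {
        Record r = records.get(id);
        if (r != null) {
            r.status = STATUS_SUCCESS;
        }
    }

    public void updateMessageStatusWithCause(Integer id, String cause) {
        Record r = records.get(id);
        if (r != null) {
            r.status = STATUS_FAILED;
            r.cause = cause;
        }
    }

    public int statusOf(Integer id) {
        Record r = records.get(id);
        return r == null ? -1 : r.status;
    }
}
```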

View File

@@ -1,28 +1,16 @@
 package com.tashow.cloud.mq.rabbitmq.config;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
-import org.springframework.amqp.support.converter.MessageConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.AutoConfiguration;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
-import org.springframework.context.annotation.Bean;
 /**
- * RabbitMQ message queue configuration class
+ * RabbitMQ message queue auto-configuration class
  *
- * @author 芋道源码
+ * @author tashow
  */
 @AutoConfiguration
-@Slf4j
 @ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
-public class RabbitMQAutoConfiguration {
+public class RabbitMQAutoConfiguration extends RabbitMQConfiguration {
+    private static final Logger log = LoggerFactory.getLogger(RabbitMQAutoConfiguration.class);
-    /**
-     * Jackson2JsonMessageConverter bean, serializing messages with Jackson
-     */
-    @Bean
-    public MessageConverter createMessageConverter() {
-        return new Jackson2JsonMessageConverter();
-    }
 }

View File

@@ -0,0 +1,29 @@
package com.tashow.cloud.mq.rabbitmq.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ configuration class
*
* @author tashow
*/
@Configuration
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class RabbitMQConfiguration {
/**
 * Create the JSON message converter
*
* @return MessageConverter
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}

View File

@@ -0,0 +1,89 @@
package com.tashow.cloud.mq.rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import com.tashow.cloud.mq.core.BaseMqMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
 * Abstract RabbitMQ message consumer
 *
 * @param <T> message type
 * @author tashow
 */
public abstract class AbstractRabbitMQConsumer<T extends BaseMqMessage> {
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQConsumer.class);
/**
 * Message status: success
 */
public static final int STATUS_SUCCESS = 20;
/**
 * Message status: consume exception
 */
public static final int STATUS_SEND_EXCEPTION = 30;
/**
 * Process the message
 *
 * @param message the message object
 * @return true if processing succeeded, false otherwise
 */
public abstract boolean processMessage(T message);
/**
 * Message handling entry point
 *
 * @param message     the message object
 * @param channel     the channel
 * @param deliveryTag the delivery tag
 */
public void onMessage(T message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
message.setStatusCode(STATUS_SUCCESS);
try {
processMessage(message);
safeChannelAck(channel, deliveryTag);
} catch (Exception e) {
// Record the failure in the status code, let the subclass persist it,
// then ack anyway so the message is not redelivered in a loop
message.setStatusCode(STATUS_SEND_EXCEPTION);
processMessage(message);
safeChannelAck(channel, deliveryTag);
}
}
/**
 * Safely acknowledge a message
 *
 * @param channel     the channel
 * @param deliveryTag the delivery tag
 */
protected void safeChannelAck(Channel channel, long deliveryTag) {
try {
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("[MQ Consumer] Failed to ack message: {}", e.getMessage());
}
}
/**
 * Safely reject a message
 *
 * @param channel     the channel
 * @param deliveryTag the delivery tag
 * @param multiple    whether to reject in bulk
 * @param requeue     whether to requeue
 */
protected void safeChannelNack(Channel channel, long deliveryTag, boolean multiple, boolean requeue) {
try {
channel.basicNack(deliveryTag, multiple, requeue);
} catch (Exception e) {
log.error("[MQ Consumer] Failed to nack message: {}", e.getMessage());
}
}
}
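The consumer's ack-either-way flow can be sketched without a broker; `AckChannel` below is a stand-in for `com.rabbitmq.client.Channel`, and all names are illustrative:

```java
import java.util.function.Predicate;

public class ConsumeFlowDemo {

    // Stand-in for the RabbitMQ Channel ack operation
    interface AckChannel {
        void ack(long deliveryTag);
    }

    static final int STATUS_SUCCESS = 20;
    static final int STATUS_SEND_EXCEPTION = 30;

    // Mirrors onMessage above: the delivery is always acknowledged, and failures
    // surface through the returned status code rather than broker redelivery
    static int consume(String payload, AckChannel channel, long deliveryTag,
                       Predicate<String> processor) {
        int status = STATUS_SUCCESS;
        try {
            if (!processor.test(payload)) {
                throw new RuntimeException("processing returned false");
            }
        } catch (Exception e) {
            status = STATUS_SEND_EXCEPTION; // record the failure, but still ack below
        }
        channel.ack(deliveryTag); // ack either way, avoiding redelivery loops
        return status;
    }
}
```

The trade-off of this design is that a failed message is never redelivered by the broker; recovery relies entirely on the persisted status and the retry task.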

View File

@@ -1,4 +0,0 @@
/**
 * Placeholder, no special logic
*/
package com.tashow.cloud.mq.rabbitmq.core;

View File

@@ -1,4 +0,0 @@
/**
 * Message queue, provided on top of RabbitMQ
*/
package com.tashow.cloud.mq.rabbitmq;

View File

@@ -0,0 +1,97 @@
package com.tashow.cloud.mq.rabbitmq.producer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tashow.cloud.common.util.json.JsonUtils;
import com.tashow.cloud.mq.core.BaseMqMessage;
import com.tashow.cloud.mq.handler.FailRecordHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import jakarta.annotation.PostConstruct;
import java.util.UUID;
/**
 * Abstract RabbitMQ message producer
 *
 * @param <T> message type
 * @author tashow
 */
public abstract class AbstractRabbitMQProducer<T extends BaseMqMessage>
implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQProducer.class);
@Autowired
protected RabbitTemplate rabbitTemplate;
@Autowired(required = false)
protected FailRecordHandler failRecordHandler;
/**
 * Initialize the RabbitTemplate callbacks
 */
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setConfirmCallback(this);
if (rabbitTemplate.isConfirmListener()) {
log.info("[MQ Producer] Confirm callback configured");
} else {
log.error("[MQ Producer] Confirm callback configuration failed");
}
}
/**
 * Asynchronously send a message; the serialized message is used as the correlation ID
 *
 * @param message the message object
 */
public void asyncSendMessage(T message) {
String messageJson = JsonUtils.toJsonString(message);
CorrelationData correlationData = new CorrelationData(messageJson);
if (failRecordHandler != null) { // the handler is optional (required = false)
failRecordHandler.saveMessageRecord(
message.getId(),
getExchange(),
getRoutingKey(),
null,
messageJson,
0
);
}
rabbitTemplate.convertAndSend(getExchange(), getRoutingKey(), message, correlationData);
}
/**
 * Exchange name
 *
 * @return the exchange name
 */
public abstract String getExchange();
/**
 * Routing key
 *
 * @return the routing key
 */
public abstract String getRoutingKey();
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (failRecordHandler == null || correlationData == null) { // guard against the optional handler being absent
return;
}
JSONObject jsonObject = JSON.parseObject(correlationData.getId());
Integer id = jsonObject.getInteger("id");
if (ack) {
failRecordHandler.updateMessageStatus(id);
} else {
failRecordHandler.updateMessageStatusWithCause(id, cause);
}
}
}

View File

@@ -0,0 +1,49 @@
package com.tashow.cloud.mq.retry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
 * Abstract implementation of a message retry task
 *
 * @param <T> failure record type
 * @author tashow
 */
public abstract class AbstractMessageRetryTask<T> {
private static final Logger log = LoggerFactory.getLogger(AbstractMessageRetryTask.class);
/**
 * The message retry service to use
 *
 * @return the message retry service
 */
protected abstract MessageRetryService<T> getMessageRetryService();
/**
 * Extract the ID of a failure record
 *
 * @param record the record object
 * @return the record ID
 */
protected abstract Integer getRecordId(T record);
/**
 * Run one retry pass
 */
public void retryFailedMessages() {
try {
List<T> unprocessedRecords = getMessageRetryService().getUnprocessedRecords();
for (T record : unprocessedRecords) {
Integer recordId = getRecordId(record);
getMessageRetryService().retryFailedMessage(recordId);
}
} catch (Exception e) {
log.error("[MQ Retry] Exception while running the message retry task", e);
}
}
}

View File

@@ -0,0 +1,28 @@
package com.tashow.cloud.mq.retry;
import java.util.List;
/**
 * Message retry service interface
 *
 * @param <T> failure record type
 * @author tashow
 */
public interface MessageRetryService<T> {
/**
 * Fetch the unprocessed failure records
 *
 * @return the list of failure records
 */
List<T> getUnprocessedRecords();
/**
 * Retry a failed message
 *
 * @param recordId the record ID
 */
void retryFailedMessage(Integer recordId);
}
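One retry pass over this interface, as driven by `AbstractMessageRetryTask.retryFailedMessages()`, can be sketched in memory; the service and record shapes below are illustrative assumptions:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetryPassDemo {

    // Minimal in-memory stand-in for a MessageRetryService<Integer>
    static class InMemoryRetryService {
        final Map<Integer, Boolean> processed = new LinkedHashMap<>();

        List<Integer> getUnprocessedRecords() {
            List<Integer> out = new ArrayList<>();
            processed.forEach((id, done) -> { if (!done) out.add(id); });
            return out;
        }

        void retryFailedMessage(Integer recordId) {
            processed.put(recordId, true); // pretend the resend succeeded
        }
    }

    // One pass: fetch the unprocessed records and retry each, returning how many were retried
    static int runRetryPass(InMemoryRetryService service) {
        List<Integer> pending = service.getUnprocessedRecords();
        for (Integer id : pending) {
            service.retryFailedMessage(id);
        }
        return pending.size();
    }
}
```

A scheduled job (`@EnableScheduling` is turned on for the app module in this change set) would call such a pass periodically.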

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/RocketMQ/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/Kafka/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/RabbitMQ/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/RocketMQ/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/Feign/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Cloud/Feign/?yudao>

View File

@@ -149,10 +149,6 @@ public class WebSecurityConfigurerAdapter {
    return httpSecurity.build();
}
-private String buildAppApi(String url) {
-    return webProperties.getAppApi().getPrefix() + url;
-}
private Multimap<HttpMethod, String> getPermitAllUrlsFromAnnotations() {
Multimap<HttpMethod, String> result = HashMultimap.create();
// Collect the HandlerMethod set for each endpoint

View File

@@ -1,2 +0,0 @@
* Yudao Spring Security tutorial: <http://www.iocoder.cn/Spring-Boot/Spring-Security/?yudao>
* Spring Security basic concepts: <http://www.iocoder.cn/Fight/Spring-Security-4-1-0-Basic-concept-description/?yudao>

View File

@@ -63,7 +63,7 @@ public class ApiAccessLogInterceptor implements HandlerInterceptor {
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
-// Print the response log
+// Print the response logss
if (!SpringUtils.isProd()) {
StopWatch stopWatch = (StopWatch) request.getAttribute(ATTRIBUTE_STOP_WATCH);
stopWatch.stop();

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/Swagger/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/SpringMVC/?yudao>

View File

@@ -1 +0,0 @@
<http://www.iocoder.cn/Spring-Boot/WebSocket/?yudao>

View File

@@ -57,6 +57,8 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>

View File

@@ -1,5 +1,4 @@
package com.tashow.cloud.gateway;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

View File

@@ -7,10 +7,10 @@ spring:
      username: nacos # Nacos account
      password: nacos # Nacos password
      discovery: # [Registry] settings
-       namespace: liwq # Namespace; the dev environment
+       namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # Namespace; the dev environment
        group: DEFAULT_GROUP # Nacos config group, defaults to DEFAULT_GROUP
      config: # [Config Center] settings
-       namespace: liwq # Namespace; the dev environment
+       namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # Namespace; the dev environment
        group: DEFAULT_GROUP # Nacos config group, defaults to DEFAULT_GROUP
# Log file configuration

View File

@@ -14,6 +14,7 @@
 <module>tashow-module-system</module>
 <module>tashow-module-infra</module>
 <module>tashow-module-app</module>
+<module>tashow-module-product</module>
 </modules>
 </project>

View File

@@ -28,7 +28,10 @@
     <groupId>com.tashow.cloud</groupId>
     <artifactId>tashow-framework-rpc</artifactId>
 </dependency>
+<dependency>
+    <groupId>com.tashow.cloud</groupId>
+    <artifactId>tashow-data-mybatis</artifactId>
+</dependency>
 <dependency>
     <groupId>com.tashow.cloud</groupId>
     <artifactId>tashow-framework-web</artifactId>
@@ -42,7 +45,6 @@
     <artifactId>tashow-infra-api</artifactId>
 </dependency>
 <dependency>
     <groupId>com.tashow.cloud</groupId>
     <artifactId>tashow-framework-websocket</artifactId>
@@ -55,7 +57,23 @@
     <groupId>com.tashow.cloud</groupId>
     <artifactId>tashow-framework-security</artifactId>
 </dependency>
+<dependency>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-actuator</artifactId>
+</dependency>
+<dependency>
+    <groupId>com.tashow.cloud</groupId>
+    <artifactId>tashow-feishu-sdk</artifactId>
+    <version>1.0.0</version>
+    <scope>compile</scope>
+</dependency>
+<dependency>
+    <groupId>org.springframework.amqp</groupId>
+    <artifactId>spring-rabbit</artifactId>
+</dependency>
+<dependency>
+    <groupId>com.tashow.cloud</groupId>
+    <artifactId>tashow-data-redis</artifactId>
+</dependency>
 </dependencies>
 </project>

View File

@@ -2,12 +2,15 @@ package com.tashow.cloud.app;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.scheduling.annotation.EnableScheduling;
 /**
- * Hello world!
- *
+ * Application service bootstrap class
  */
 @SpringBootApplication
+@EnableScheduling
+@ComponentScan(basePackages = {"com.tashow.cloud.app", "com.tashow.cloud.sdk.feishu"})
 public class AppServerApplication {
     public static void main(String[] args) {

View File

@@ -0,0 +1,19 @@
package com.tashow.cloud.app.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Application configuration class
 */
@Configuration
public class AppConfig {
/**
 * Provides an ObjectMapper bean for JSON processing
 */
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper();
}
}

View File

@@ -0,0 +1,64 @@
package com.tashow.cloud.app.controller;
import cn.hutool.json.JSONObject;
import com.lark.oapi.core.utils.Decryptor;
import com.tashow.cloud.app.service.feishu.FeiShuCardDataService;
import com.tashow.cloud.sdk.feishu.client.FeiShuAlertClient;
import com.tashow.cloud.sdk.feishu.config.LarkConfig;
import jakarta.annotation.security.PermitAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
@RestController
public class FeishuController {
private static final Logger log = LoggerFactory.getLogger(FeishuController.class);
private static final String ACTION_COMPLETE_ALARM = "complete_alarm";
private final FeiShuAlertClient feiShuAlertClient;
private final FeiShuCardDataService feiShuCardDataService;
private final LarkConfig larkConfig;
@Autowired
public FeishuController(FeiShuAlertClient feiShuAlertClient, FeiShuCardDataService feiShuCardDataService, LarkConfig larkConfig) {
this.feiShuAlertClient = feiShuAlertClient;
this.feiShuCardDataService = feiShuCardDataService;
this.larkConfig = larkConfig;
}
@RequestMapping("/card")
@PermitAll
public String card(@RequestBody JSONObject data) {
try {
if (data.containsKey("app_id") && data.containsKey("action")) {
JSONObject action = data.getJSONObject("action");
JSONObject value = action.getJSONObject("value");
if (value != null && ACTION_COMPLETE_ALARM.equals(value.getStr("action"))) {
String messageId = data.getStr("open_message_id");
Map<String, Object> templateData = feiShuCardDataService.getCardData(messageId);
templateData.put("open_id", data.getStr("open_id"));
templateData.put("complete_time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
JSONObject formValue = action.getJSONObject("form_value");
if (formValue != null) { // form_value may be absent on non-form callbacks
templateData.put("notes", formValue.getStr("notes_input"));
}
return feiShuAlertClient.buildCardWithData(larkConfig.getSuccessCards(), templateData);
}
}
if (data.containsKey("encrypt")) {
Decryptor decryptor = new Decryptor(larkConfig.getEncryptKey());
return decryptor.decrypt(data.getStr("encrypt"));
}
return "{}";
} catch (Exception e) {
log.error("Card handling exception", e);
return "{\"code\":1,\"msg\":\"Processing error: " + e.getMessage() + "\"}";
}
}
}

View File

@@ -0,0 +1,25 @@
package com.tashow.cloud.app.controller;
import jakarta.annotation.security.PermitAll;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
/**
 * Test controller
 */
@RestController
@RequiredArgsConstructor
public class TestController {
/**
 * Basic test endpoint
 */
@GetMapping("/test")
@PermitAll
public String test() {
return "test";
}
}

View File

@@ -0,0 +1,46 @@
package com.tashow.cloud.app.interceptor;
import cn.hutool.core.util.IdUtil;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
import com.tashow.cloud.common.util.json.JsonUtils;
import com.tashow.cloud.common.util.servlet.ServletUtils;
import com.tashow.cloud.common.util.spring.SpringUtils;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerInterceptor;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
 * Backend silent tracking interceptor.
 * Collects API request information and sends it to the message queue asynchronously.
 */
@Slf4j
@RequiredArgsConstructor
public class BuriedPointInterceptor implements HandlerInterceptor {
private final BuriedPointProducer buriedPointProducer;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
if (!(handler instanceof HandlerMethod handlerMethod)) {
return true;
}
BuriedMessages message = new BuriedMessages(
request,
handlerMethod
);
buriedPointProducer.asyncSendMessage(message);
return true;
}
}

View File

@@ -0,0 +1,13 @@
package com.tashow.cloud.app.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.tashow.cloud.app.model.MqMessageRecord;
import org.apache.ibatis.annotations.Mapper;
/**
 * Mapper interface for MQ message send records
 */
@Mapper
public interface BuriedPointFailRecordMapper extends BaseMapper<MqMessageRecord> {
}

View File

@@ -0,0 +1,14 @@
package com.tashow.cloud.app.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.tashow.cloud.app.model.BuriedPoint;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
@Mapper
public interface BuriedPointMapper extends BaseMapper<BuriedPoint> {
@Select("SELECT * FROM app_burying WHERE event_id = #{eventId} LIMIT 1")
BuriedPoint selectByEventId(@Param("eventId") Integer eventId);
}

View File

@@ -0,0 +1,128 @@
package com.tashow.cloud.app.model;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.util.Date;
/**
 * Tracking event entity
 */
@Data
@NoArgsConstructor
@Accessors(chain = true)
@TableName(value = "app_burying")
public class BuriedPoint {
/**
 * Primary key ID
 */
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
 * Unique event ID
 */
@TableField(value = "event_id")
private Integer eventId;
/**
 * Event timestamp
 */
@TableField(value = "event_time")
private Long eventTime;
/**
 * Service name
 */
@TableField(value = "service")
private String service;
/**
 * Method / endpoint
 */
@TableField(value = "method")
private String method;
/**
 * User identifier
 */
@TableField(value = "user_id")
private String userId;
/**
 * Session identifier
 */
@TableField(value = "session_id")
private String sessionId;
/**
 * Client IP
 */
@TableField(value = "client_ip")
private String clientIp;
/**
 * Server IP
 */
@TableField(value = "server_ip")
private String serverIp;
/**
 * Event type
 */
@TableField(value = "event_type")
private String eventType;
/**
 * Page path / functional module
 */
@TableField(value = "page_path")
private String pagePath;
/**
 * Element identifier
 */
@TableField(value = "element_id")
private String elementId;
/**
 * Operation duration (milliseconds)
 */
@TableField(value = "duration")
private Long duration;
/**
 * Creation time
 */
@TableField(value = "create_time")
private Date createTime;
@TableField(value = "update_time")
private Date updateTime;
@TableField(value = "status")
private Integer status;
public BuriedPoint(BuriedMessages message) {
this.eventId = message.getId();
this.eventTime = System.currentTimeMillis();
this.userId = message.getUserId();
this.eventType = message.getEventType();
this.service = message.getService();
this.method = message.getMethod();
this.sessionId = message.getSessionId();
this.clientIp = message.getClientIp();
this.serverIp = message.getServerIp();
this.status = message.getStatusCode();
this.pagePath = message.getPagePath();
this.elementId = message.getElementId();
this.createTime = new Date();
this.updateTime = new Date();
}
}

View File

@@ -0,0 +1,65 @@
package com.tashow.cloud.app.model;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.util.Date;
/**
 * Entity for MQ message send records (including failures)
 */
@Data
@TableName("mq_message_record")
public class MqMessageRecord {
/**
 * Status constants
 */
public static final int STATUS_UNPROCESSED = 10; // unprocessed
public static final int STATUS_SUCCESS = 20;     // processed successfully
public static final int STATUS_FAILED = 30;      // send failed
@TableId
private Integer id;
/**
 * Exchange name
 */
private String exchange;
/**
 * Routing key
 */
private String routingKey;
/**
 * Failure cause
 */
private String cause;
/**
 * Message content
 */
private String messageContent;
/**
 * Retry count
 */
private Integer retryCount;
/**
 * Status: 10 - unprocessed, 20 - success, 30 - failed (see the constants above)
 */
private Integer status;
/**
 * Creation time
 */
private Date createTime;
/**
 * Update time
 */
private Date updateTime;
}

View File

@@ -0,0 +1,78 @@
package com.tashow.cloud.app.mq.config;
import com.tashow.cloud.app.interceptor.BuriedPointInterceptor;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
 * Tracking feature configuration class
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class BuriedPointConfiguration implements WebMvcConfigurer {
private final BuriedPointProducer buriedPointProducer;
/**
 * Create the tracking queue
 */
@Bean
public Queue buriedPointQueue() {
return new Queue(BuriedMessages.QUEUE, true, false, false);
}
/**
 * Create the tracking exchange
 */
@Bean
public DirectExchange buriedPointExchange() {
return new DirectExchange(BuriedMessages.EXCHANGE, true, false);
}
/**
 * Bind the tracking queue to the exchange
 */
@Bean
public Binding buriedPointBinding() {
return BindingBuilder.bind(buriedPointQueue())
.to(buriedPointExchange())
.with(BuriedMessages.ROUTING_KEY);
}
/**
 * Create the tracking interceptor
 */
@Bean
public BuriedPointInterceptor buriedPointInterceptor() {
return new BuriedPointInterceptor(buriedPointProducer);
}
/**
 * Register the tracking interceptor
 */
@Override
public void addInterceptors(InterceptorRegistry registry) {
// Register the interceptor for all requests
registry.addInterceptor(buriedPointInterceptor())
// Add or exclude specific paths as needed
.addPathPatterns("/**")
// Exclude static resources, Swagger, and similar paths
.excludePathPatterns(
"/swagger-ui/**",
"/swagger-resources/**",
"/v3/api-docs/**",
"/webjars/**",
"/static/**",
"/card",
"/error"
);
}
}

View File

@@ -0,0 +1,69 @@
package com.tashow.cloud.app.mq.consumer.buriedPoint;
import com.tashow.cloud.app.mapper.BuriedPointMapper;
import com.tashow.cloud.app.model.BuriedPoint;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.app.service.feishu.BuriedPointMonitorService;
import com.rabbitmq.client.Channel;
import com.tashow.cloud.mq.rabbitmq.consumer.AbstractRabbitMQConsumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Date;
import org.springframework.dao.DuplicateKeyException;
/**
 * Tracking message consumer
 */
@Component
@RabbitListener(queues = BuriedMessages.QUEUE)
@Slf4j
@RequiredArgsConstructor
public class BuriedPointConsumer extends AbstractRabbitMQConsumer<BuriedMessages> {
private final BuriedPointMapper buriedPointMapper;
private final BuriedPointMonitorService buriedPointMonitorService;
@Value("${spring.application.name:tashow-app}")
private String applicationName;
@RabbitHandler
public void handleMessage(BuriedMessages message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
onMessage(message, channel, deliveryTag);
}
/**
 * Process a tracking message
 *
 * @param message the message object
 * @return true if the record was saved or updated
 */
@Override
public boolean processMessage(BuriedMessages message) {
try {
BuriedPoint existingPoint = buriedPointMapper.selectByEventId(message.getId());
if (existingPoint != null) {
existingPoint.setStatus(message.getStatusCode());
existingPoint.setUpdateTime(new Date());
return buriedPointMapper.updateById(existingPoint) > 0;
}
BuriedPoint buriedPoint = new BuriedPoint(message);
buriedPoint.setService(applicationName);
buriedPointMapper.insert(buriedPoint);
if (buriedPoint.getStatus() == BuriedMessages.STATUS_ERROR) {
buriedPointMonitorService.checkFailRecordsAndAlert("Tracking data processing error");
}
return true;
} catch (DuplicateKeyException e) {
return true;
} catch (Exception e) {
log.error("[Tracking Consumer] Failed to save tracking data", e);
throw e;
}
}
}

View File

@@ -0,0 +1,100 @@
package com.tashow.cloud.app.mq.handler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
import com.tashow.cloud.app.model.MqMessageRecord;
import com.tashow.cloud.app.service.feishu.BuriedPointMonitorService;
import com.tashow.cloud.mq.handler.FailRecordHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* Failure-record handler for buried-point MQ messages
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class BuriedPointFailRecordHandler implements FailRecordHandler {
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
private final BuriedPointMonitorService buriedPointMonitorService;
/**
* Save or update a message record
*/
@Override
public void saveMessageRecord(Integer id, String exchange, String routingKey, String cause, String messageContent, int status) {
try {
MqMessageRecord existingRecord = findExistingRecord(id);
if (existingRecord != null) {
existingRecord.setRetryCount(existingRecord.getRetryCount() + 1);
existingRecord.setMessageContent(messageContent);
existingRecord.setStatus(status);
existingRecord.setCause(cause);
existingRecord.setUpdateTime(new Date());
buriedPointFailRecordMapper.updateById(existingRecord);
} else {
MqMessageRecord record = new MqMessageRecord();
record.setId(id);
record.setExchange(exchange);
record.setRoutingKey(routingKey);
record.setCause(cause);
record.setMessageContent(messageContent);
record.setRetryCount(0);
record.setStatus(status);
record.setCreateTime(new Date());
record.setUpdateTime(new Date());
buriedPointFailRecordMapper.insert(record);
if (status == MqMessageRecord.STATUS_FAILED) {
buriedPointMonitorService.checkFailRecordsAndAlert(cause);
}
}
} catch (Exception e) {
log.error("[BuriedPointFailRecordHandler] failed to save message record", e);
}
}
/**
* Mark a message record as successfully processed
*/
@Override
public void updateMessageStatus(Integer id) {
try {
MqMessageRecord record = findExistingRecord(id);
if (record != null) {
record.setStatus(MqMessageRecord.STATUS_SUCCESS);
record.setUpdateTime(new Date());
buriedPointFailRecordMapper.updateById(record);
}
} catch (Exception e) {
log.error("[BuriedPointFailRecordHandler] failed to update message status: {}", id, e);
}
}
/**
* Mark a message record as failed and record the failure cause
*/
@Override
public void updateMessageStatusWithCause(Integer id, String cause) {
try {
MqMessageRecord record = findExistingRecord(id);
if (record != null) {
record.setStatus(MqMessageRecord.STATUS_FAILED);
record.setCause(cause);
record.setUpdateTime(new Date());
buriedPointFailRecordMapper.updateById(record);
buriedPointMonitorService.checkFailRecordsAndAlert(cause);
}
} catch (Exception e) {
log.error("[BuriedPointFailRecordHandler] failed to update message status and cause: {}", id, e);
}
}
/**
* Look up an existing failure record by ID
*/
private MqMessageRecord findExistingRecord(Integer id) {
LambdaQueryWrapper<MqMessageRecord> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(MqMessageRecord::getId, id);
return buriedPointFailRecordMapper.selectOne(queryWrapper);
}
}


@@ -0,0 +1,142 @@
package com.tashow.cloud.app.mq.message;
import cn.hutool.core.util.IdUtil;
import com.tashow.cloud.common.util.json.JsonUtils;
import com.tashow.cloud.common.util.servlet.ServletUtils;
import com.tashow.cloud.common.util.spring.SpringUtils;
import com.tashow.cloud.mq.core.BaseMqMessage;
import jakarta.servlet.http.HttpServletRequest;
import lombok.Data;
import org.springframework.web.method.HandlerMethod;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import static com.tashow.cloud.web.apilog.core.interceptor.ApiAccessLogInterceptor.ATTRIBUTE_HANDLER_METHOD;
/**
* Buried-point (event tracking) message
*/
@Data
public class BuriedMessages extends BaseMqMessage {
private static final String ATTRIBUTE_REQUEST_ID = "BuriedPoint.RequestId";
/**
* Exchange name
*/
public static final String EXCHANGE = "tashow.buried.point.exchange";
/**
* Queue name
*/
public static final String QUEUE = "tashow.buried.point.queue";
/**
* Routing key
*/
public static final String ROUTING_KEY = "tashow.buried.point.routing.key";
/**
* Message status: processing
*/
public static final int STATUS_PROCESSING = 10;
/**
* Message status: success
*/
public static final int STATUS_SUCCESS = 20;
/**
* Message status: failed
*/
public static final int STATUS_ERROR = 30;
/**
* Event time
*/
private Date eventTime;
/**
* User ID
*/
private String userId;
/**
* Event type
*/
private String eventType;
/**
* Method name
*/
private String method;
/**
* Session ID
*/
private String sessionId;
/**
* Client IP
*/
private String clientIp;
/**
* Server IP
*/
private String serverIp;
/**
* Page path
*/
private String pagePath;
/**
* Element ID
*/
private String elementId;
/**
* Service name
*/
private String service;
public BuriedMessages() {
}
/**
* Build a buried-point message from an HTTP request.
*
* @param request the HTTP request
* @param handlerMethod the handler method that will process the request
*/
public BuriedMessages(HttpServletRequest request, HandlerMethod handlerMethod) {
try {
int requestId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE);
this.setId(requestId);
this.eventTime = new Date();
this.service = SpringUtils.getApplicationName();
this.method = request.getMethod() + " " + request.getRequestURI() +
JsonUtils.toJsonString(request.getParameterMap());
Object userId = request.getSession().getAttribute("USER_ID");
this.userId = userId != null ? userId.toString() : "anonymous";
this.sessionId = request.getSession().getId();
this.clientIp = ServletUtils.getClientIP(request);
try {
this.serverIp = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
this.serverIp = "unknown";
}
String controllerName = handlerMethod.getBeanType().getSimpleName();
String actionName = handlerMethod.getMethod().getName();
this.pagePath = controllerName + "#" + actionName;
this.eventType = "API_REQUEST_START";
this.setStatusCode(STATUS_PROCESSING);
request.setAttribute(ATTRIBUTE_REQUEST_ID, this.getId());
} catch (Exception e) {
throw new RuntimeException("Failed to create buried-point message", e);
}
}
}
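The constructor above derives a bounded request id by folding a 64-bit snowflake value into the positive `int` range with `Math.abs(...) % Integer.MAX_VALUE`. A minimal sketch of that mapping (the helper name `foldToInt` is hypothetical, not part of the original class):

```java
public class SnowflakeFoldDemo {
    /** Fold a snowflake long into [0, Integer.MAX_VALUE), mirroring the constructor's id derivation. */
    static int foldToInt(long snowflakeId) {
        return (int) (Math.abs(snowflakeId) % Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(foldToInt(5L));                            // small ids map to themselves: 5
        System.out.println(foldToInt((long) Integer.MAX_VALUE + 7L)); // larger ids wrap around: 7
        // Caveat: distinct snowflake ids can collide after folding, so the
        // resulting int is not guaranteed unique under high volume.
    }
}
```

Because the modulo discards the upper bits, the folded id should only be treated as a best-effort key; the consumer's duplicate handling (the `DuplicateKeyException` branch) is what makes collisions tolerable.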


@@ -0,0 +1,31 @@
package com.tashow.cloud.app.mq.producer.buriedPoint;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.common.util.json.JsonUtils;
import com.tashow.cloud.mq.rabbitmq.producer.AbstractRabbitMQProducer;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.stereotype.Component;
/**
* Buried-point message producer
*/
@Component
public class BuriedPointProducer extends AbstractRabbitMQProducer<BuriedMessages> {
@Override
public void returnedMessage(ReturnedMessage returned) {
}
@Override
public String getExchange() {
return BuriedMessages.EXCHANGE;
}
@Override
public String getRoutingKey() {
return BuriedMessages.ROUTING_KEY;
}
}


@@ -20,7 +20,6 @@ public class SecurityConfiguration {
@Bean("infraAuthorizeRequestsCustomizer")
public AuthorizeRequestsCustomizer authorizeRequestsCustomizer() {
return new AuthorizeRequestsCustomizer() {
@Override
public void customize(AuthorizeHttpRequestsConfigurer<HttpSecurity>.AuthorizationManagerRequestMatcherRegistry registry) {
// Spring Boot Actuator security configuration


@@ -0,0 +1,241 @@
package com.tashow.cloud.app.service.feishu;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
import com.tashow.cloud.app.mapper.BuriedPointMapper;
import com.tashow.cloud.app.model.BuriedPoint;
import com.tashow.cloud.app.model.MqMessageRecord;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.sdk.feishu.client.FeiShuAlertClient;
import com.tashow.cloud.sdk.feishu.config.LarkConfig;
import com.tashow.cloud.sdk.feishu.util.ChartImageGenerator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* Buried-point monitoring service
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class BuriedPointMonitorService {
private static final int ALERT_THRESHOLD = 3;
private static final int MONITORING_HOURS = 12;
private final Map<String, Long> alertCache = new ConcurrentHashMap<>();
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
private final BuriedPointMapper buriedPointMapper;
private final FeiShuAlertClient feiShuAlertClient;
private final FeiShuCardDataService feiShuCardDataService;
private final LarkConfig larkConfig;
/**
* Check failure records and send an alert when the threshold is exceeded
*/
public boolean checkFailRecordsAndAlert(String cause) {
try {
Date now = new Date();
Date hoursAgo = getDateHoursAgo(now, MONITORING_HOURS);
boolean sentAlert = false;
List<Date[]> timeRanges = getHourRanges(hoursAgo, now);
long mqFailCount = countFailures(buriedPointFailRecordMapper, MqMessageRecord.class, hoursAgo, now);
long buriedFailCount = countFailures(buriedPointMapper, BuriedPoint.class, hoursAgo, now);
if (mqFailCount > ALERT_THRESHOLD || buriedFailCount > ALERT_THRESHOLD) {
if (!hasRecentlySentAlert(cause)) {
sendAlert(mqFailCount, cause, getMqStats(timeRanges));
alertCache.put(cause, System.currentTimeMillis());
sentAlert = true;
}
}
return sentAlert;
} catch (Exception e) {
log.error("[BuriedPointMonitor] failed to check failure records", e);
return false;
}
}
/**
* Whether an alert of the same type was already sent within the dedup window
*/
private boolean hasRecentlySentAlert(String alertType) {
Long lastSentTime = alertCache.get(alertType);
if (lastSentTime == null) {
return false;
}
long hourInMillis = MONITORING_HOURS * 60 * 60 * 1000L;
return (System.currentTimeMillis() - lastSentTime) < hourInMillis;
}
/**
* Collect MQ message statistics per hour bucket
*/
private List<ChartImageGenerator.MonitoringDataPoint> getMqStats(List<Date[]> timeRanges) {
Map<Date, Integer> successData = batchQueryMqStatus(timeRanges, MqMessageRecord.STATUS_SUCCESS);
Map<Date, Integer> failedData = batchQueryMqFailures(timeRanges);
SimpleDateFormat timeFormat = new SimpleDateFormat("HH:00");
return timeRanges.stream()
.map(range -> new ChartImageGenerator.MonitoringDataPoint(
timeFormat.format(range[0]),
successData.getOrDefault(range[0], 0),
failedData.getOrDefault(range[0], 0)
))
.toList();
}
/**
* Collect buried-point table statistics per hour bucket
*/
private List<ChartImageGenerator.MonitoringDataPoint> getBuriedStats(List<Date[]> timeRanges) {
// Query counts for each time bucket
Map<Date, Integer> successData = batchQueryBuriedStatus(timeRanges, BuriedMessages.STATUS_SUCCESS);
Map<Date, Integer> failedData = batchQueryBuriedStatus(timeRanges, BuriedMessages.STATUS_ERROR);
SimpleDateFormat timeFormat = new SimpleDateFormat("HH:00");
return timeRanges.stream()
.map(range -> new ChartImageGenerator.MonitoringDataPoint(
timeFormat.format(range[0]),
successData.getOrDefault(range[0], 0),
failedData.getOrDefault(range[0], 0)
))
.toList();
}
/**
* Count MQ records with the given status per time bucket
*/
private Map<Date, Integer> batchQueryMqStatus(List<Date[]> timeRanges, int status) {
Map<Date, Integer> result = new HashMap<>();
for (Date[] range : timeRanges) {
LambdaQueryWrapper<MqMessageRecord> query = new LambdaQueryWrapper<>();
query.ge(MqMessageRecord::getCreateTime, range[0])
.lt(MqMessageRecord::getCreateTime, range[1])
.eq(MqMessageRecord::getStatus, status);
result.put(range[0], buriedPointFailRecordMapper.selectCount(query).intValue());
}
return result;
}
/**
* Count failed or unprocessed MQ records per time bucket
*/
private Map<Date, Integer> batchQueryMqFailures(List<Date[]> timeRanges) {
Map<Date, Integer> result = new HashMap<>();
for (Date[] range : timeRanges) {
LambdaQueryWrapper<MqMessageRecord> query = new LambdaQueryWrapper<>();
query.ge(MqMessageRecord::getCreateTime, range[0])
.lt(MqMessageRecord::getCreateTime, range[1])
.in(MqMessageRecord::getStatus, Arrays.asList(
MqMessageRecord.STATUS_UNPROCESSED, MqMessageRecord.STATUS_FAILED));
result.put(range[0], buriedPointFailRecordMapper.selectCount(query).intValue());
}
return result;
}
/**
* Count buried-point records with the given status per time bucket
*/
private Map<Date, Integer> batchQueryBuriedStatus(List<Date[]> timeRanges, int status) {
Map<Date, Integer> result = new HashMap<>();
for (Date[] range : timeRanges) {
LambdaQueryWrapper<BuriedPoint> query = new LambdaQueryWrapper<>();
query.ge(BuriedPoint::getCreateTime, range[0])
.lt(BuriedPoint::getCreateTime, range[1])
.eq(BuriedPoint::getStatus, status);
result.put(range[0], buriedPointMapper.selectCount(query).intValue());
}
return result;
}
/**
* Count failures between startDate and endDate
*/
private <T> long countFailures(Object mapper, Class<T> entityClass, Date startDate, Date endDate) {
try {
if (entityClass == BuriedPoint.class) {
LambdaQueryWrapper<BuriedPoint> query = new LambdaQueryWrapper<>();
query.ge(BuriedPoint::getCreateTime, startDate)
.le(BuriedPoint::getCreateTime, endDate)
.eq(BuriedPoint::getStatus, BuriedMessages.STATUS_ERROR);
return ((BuriedPointMapper)mapper).selectCount(query);
} else {
LambdaQueryWrapper<MqMessageRecord> query = new LambdaQueryWrapper<>();
query.ge(MqMessageRecord::getCreateTime, startDate)
.le(MqMessageRecord::getCreateTime, endDate)
.eq(MqMessageRecord::getStatus, MqMessageRecord.STATUS_FAILED);
return ((BuriedPointFailRecordMapper)mapper).selectCount(query);
}
} catch (Exception e) {
// Swallow query errors: report zero failures so alerting never breaks the consumer path.
return 0;
}
}
/**
* Send a Feishu alert card
*/
private void sendAlert(long failCount, String alertMsg, List<ChartImageGenerator.MonitoringDataPoint> data) {
try {
String imageKey = feiShuAlertClient.uploadImage(data, alertMsg);
String title = alertMsg.split(":")[0].trim();
Map<String, Object> templateData = Map.of(
"alert_title", title,
"image_key", Map.of("img_key", imageKey),
"current_time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
"fail_count", failCount
);
String messageId = feiShuAlertClient.sendCardMessage(
larkConfig.getChatId(),
larkConfig.getExceptionCards(),
new HashMap<>(templateData)
);
feiShuCardDataService.saveCardData(messageId, templateData);
} catch (Exception e) {
log.error("[BuriedPointMonitor] failed to send alert", e);
}
}
/**
* Build the list of hour-aligned time ranges
*/
private List<Date[]> getHourRanges(Date startDate, Date endDate) {
List<Date[]> ranges = new ArrayList<>();
Calendar cal = Calendar.getInstance();
cal.setTime(endDate);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
Date endHour = cal.getTime();
cal.add(Calendar.HOUR_OF_DAY, -(MONITORING_HOURS - 1));
Date startHour = startDate.after(cal.getTime()) ? startDate : cal.getTime();
cal.setTime(startHour);
while (!cal.getTime().after(endHour)) {
Date hourStart = cal.getTime();
cal.add(Calendar.HOUR_OF_DAY, 1);
Date hourEnd = cal.getTime().after(endDate) ? endDate : cal.getTime();
ranges.add(new Date[]{hourStart, hourEnd});
if (hourEnd.equals(endDate)) break;
}
return ranges;
}
/**
* Get the instant N hours before the given date
*/
private Date getDateHoursAgo(Date date, int hours) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.HOUR_OF_DAY, -hours);
return calendar.getTime();
}
}
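`getHourRanges` above walks hour-aligned buckets between two instants using `Calendar`. A condensed, self-contained sketch of the same bucketing idea (a simplified stand-in for illustration, not a drop-in replacement for the private method):

```java
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

public class HourRangeDemo {
    /** Split [start, end] into consecutive one-hour [from, to) buckets aligned to the hour. */
    static List<Date[]> hourRanges(Date start, Date end) {
        List<Date[]> ranges = new ArrayList<>();
        Calendar cal = Calendar.getInstance();
        cal.setTime(start);
        // Align the first bucket to the top of the hour.
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        while (cal.getTime().before(end)) {
            Date from = cal.getTime();
            cal.add(Calendar.HOUR_OF_DAY, 1);
            // Clamp the last bucket to the requested end instant.
            Date to = cal.getTime().after(end) ? end : cal.getTime();
            ranges.add(new Date[]{from, to});
        }
        return ranges;
    }

    public static void main(String[] args) {
        Calendar c = Calendar.getInstance();
        c.clear();
        c.set(2025, Calendar.AUGUST, 5, 8, 30, 0);
        Date start = c.getTime();
        c.add(Calendar.HOUR_OF_DAY, 3); // 11:30
        Date end = c.getTime();
        System.out.println(hourRanges(start, end).size()); // 4 buckets: 08, 09, 10, 11
    }
}
```

Each bucket's start `Date` is what the batch-query methods above use as the map key, which is why alignment to the hour matters.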


@@ -0,0 +1,73 @@
package com.tashow.cloud.app.service.feishu;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Feishu card data service
*/
@Service
public class FeiShuCardDataService {
private static final Logger log = LoggerFactory.getLogger(FeiShuCardDataService.class);
private static final String REDIS_KEY_PREFIX = "feishu:card:";
private static final int CARD_EXPIRATION_DAYS = 30;
private final StringRedisTemplate stringRedisTemplate;
private final ObjectMapper objectMapper;
@Autowired
public FeiShuCardDataService(StringRedisTemplate stringRedisTemplate, ObjectMapper objectMapper) {
this.stringRedisTemplate = stringRedisTemplate;
this.objectMapper = objectMapper;
}
/**
* Save card data to Redis
*/
public boolean saveCardData(String messageId, Map<String, Object> data) {
if (messageId == null || data == null) return false;
try {
String jsonData = objectMapper.writeValueAsString(data);
stringRedisTemplate.opsForValue().set(
REDIS_KEY_PREFIX + messageId,
jsonData,
CARD_EXPIRATION_DAYS,
TimeUnit.DAYS
);
return true;
} catch (JsonProcessingException e) {
log.error("Failed to save card data: {}", e.getMessage());
return false;
}
}
/**
* Load card data from Redis
*/
public Map<String, Object> getCardData(String messageId) {
try {
String redisKey = REDIS_KEY_PREFIX + messageId;
String jsonData = stringRedisTemplate.opsForValue().get(redisKey);
if (jsonData == null) return new HashMap<>();
@SuppressWarnings("unchecked")
Map<String, Object> result = objectMapper.readValue(jsonData, Map.class);
return result;
} catch (Exception e) {
log.error("Failed to load card data: {}", e.getMessage());
return new HashMap<>();
}
}
}


@@ -0,0 +1,53 @@
package com.tashow.cloud.app.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
import com.tashow.cloud.app.model.MqMessageRecord;
import com.tashow.cloud.app.mq.message.BuriedMessages;
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
import com.tashow.cloud.common.util.json.JsonUtils;
import com.tashow.cloud.mq.retry.MessageRetryService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
/**
* Buried-point failure record service
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class BuriedPointFailRecordService implements MessageRetryService<MqMessageRecord> {
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
private final BuriedPointProducer buriedPointProducer;
/**
* Fetch unprocessed failure records
*/
@Override
public List<MqMessageRecord> getUnprocessedRecords() {
LambdaQueryWrapper<MqMessageRecord> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(MqMessageRecord::getStatus, MqMessageRecord.STATUS_FAILED)
.orderByAsc(MqMessageRecord::getCreateTime);
return buriedPointFailRecordMapper.selectList(queryWrapper);
}
/**
* Retry a failed message
*/
@Override
public void retryFailedMessage(Integer recordId) {
try {
MqMessageRecord record = buriedPointFailRecordMapper.selectById(Long.valueOf(recordId));
if (record == null) {
return; // nothing to retry
}
BuriedMessages message = JsonUtils.parseObject(record.getMessageContent(), BuriedMessages.class);
buriedPointProducer.asyncSendMessage(message);
} catch (Exception e) {
log.error("[BuriedPointRetry] retry failed, recordId={}", recordId, e);
}
}
}


@@ -0,0 +1,41 @@
package com.tashow.cloud.app.task;
import com.tashow.cloud.app.model.MqMessageRecord;
import com.tashow.cloud.app.service.impl.BuriedPointFailRecordService;
import com.tashow.cloud.mq.retry.AbstractMessageRetryTask;
import com.tashow.cloud.mq.retry.MessageRetryService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* Scheduled retry task for buried-point messages
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class BuriedPointRetryTask extends AbstractMessageRetryTask<MqMessageRecord> {
private final BuriedPointFailRecordService buriedPointFailRecordService;
/**
* Retry failed messages on a schedule.
* Runs once a day at midnight.
*/
@Scheduled(cron = "0 0 0 * * ?")
// @Scheduled(cron = "0/10 * * * * ?") // every 10 seconds, for local debugging
public void execute() {
retryFailedMessages();
}
@Override
protected MessageRetryService<MqMessageRecord> getMessageRetryService() {
return buriedPointFailRecordService;
}
@Override
protected Integer getRecordId(MqMessageRecord record) {
return record.getId();
}
}


@@ -7,10 +7,10 @@ spring:
username: nacos # Nacos account
password: nacos # Nacos password
discovery: # service-registry settings
-namespace: liwq # namespace (dev environment)
+namespace: 5c8b8fe6-9a89-4ae3-975e-ef3bf560ff82 # namespace (dev environment)
group: DEFAULT_GROUP # Nacos config group, defaults to DEFAULT_GROUP
metadata:
version: 1.0.0 # service instance version, usable for canary releases
config: # config-center settings
-namespace: liwq # namespace (dev environment)
+namespace: 5c8b8fe6-9a89-4ae3-975e-ef3bf560ff82 # namespace (dev environment)
group: DEFAULT_GROUP # Nacos config group, defaults to DEFAULT_GROUP


@@ -1,12 +1,12 @@
## AdoptOpenJDK stopped publishing OpenJDK binaries; Eclipse Temurin is its continuation and offers better stability
## Thanks to the Fudan PhD for the suggestion!
-FROM eclipse-temurin:21-jre
+FROM eclipse-temurin:17-jre
## Create the working directory and switch to it
-RUN mkdir -p /yudao-module-infra-biz
+RUN mkdir -p /home/java-work/system-infra
-WORKDIR /yudao-module-infra-biz
+WORKDIR /home/java-work/system-infra
## Copy the backend project's jar into the image
-COPY ./target/yudao-module-infra-biz.jar app.jar
+COPY ./target/tashow-module-infra.jar app.jar
## Set the TZ timezone
## Set the JAVA_OPTS environment variable; can be overridden via docker run -e "JAVA_OPTS="


@@ -37,6 +37,14 @@
<version>${revision}</version>
</dependency>
+<dependency>
+<groupId>com.alibaba.otter</groupId>
+<artifactId>canal.client</artifactId>
+<version>1.1.0</version>
+</dependency>
<!-- Business components -->
<dependency>
<groupId>com.tashow.cloud</groupId>
@@ -69,6 +77,13 @@
<artifactId>tashow-data-redis</artifactId>
</dependency>
+<!--<dependency>
+<groupId>com.tashow.cloud</groupId>
+<artifactId>tashow-data-canal</artifactId>
+<version>1.0.0</version>
+<scope>compile</scope>
+</dependency>-->
<!-- RPC remote-call dependencies -->
<dependency>
<groupId>com.tashow.cloud</groupId>


@@ -0,0 +1,204 @@
package com.tashow.cloud.infra.framework;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component
public class CanalClient {
// Queue of generated SQL statements
private final Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
/**
* Connect to the canal server and consume binlog entries
*/
public void run() {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("43.139.42.137",
11111), "example", "", "");
int batchSize = 1000;
try {
connector.connect();
// connector.subscribe(".*\\..*");
connector.subscribe("tashow-platform");
connector.rollback();
try {
while (true) {
// Pull up to batchSize entries from the master; takes whatever is available
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
} else {
dataHandle(message.getEntries());
}
connector.ack(batchId);
// Once the queue holds any SQL, simulate execution
if (SQL_QUEUE.size() >= 1) {
executeQueueSql();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
} finally {
connector.disconnect();
}
}
/**
* Simulate executing the queued SQL statements
*/
public void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
System.out.println("[sql]----> " + sql);
this.execute(sql);
}
}
/**
* Dispatch row-change entries by event type.
*
* @param entries binlog entries
*/
private void dataHandle(List<Entry> entries) throws InvalidProtocolBufferException {
for (Entry entry : entries) {
// Skip entries from the "hc" schema (continue, not return, so later entries are still processed)
if ("hc".equals(entry.getHeader().getSchemaName())) {
continue;
}
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == EventType.INSERT) {
saveInsertSql(entry);
}
}
}
}
/**
* Build UPDATE statements from a row change.
*
* @param entry binlog entry
*/
private void saveUpdateSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> newColumnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
for (int i = 0; i < newColumnList.size(); i++) {
sql.append(" " + newColumnList.get(i).getName()
+ " = '" + newColumnList.get(i).getValue() + "'");
if (i != newColumnList.size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
List<Column> oldColumnList = rowData.getBeforeColumnsList();
for (Column column : oldColumnList) {
if (column.getIsKey()) {
// only a single-column primary key is supported for now
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* Build DELETE statements from a row change.
*
* @param entry binlog entry
*/
private void saveDeleteSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getBeforeColumnsList();
StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
for (Column column : columnList) {
if (column.getIsKey()) {
// only a single-column primary key is supported for now
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* Build INSERT statements from a row change.
*
* @param entry binlog entry
*/
private void saveInsertSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
for (int i = 0; i < columnList.size(); i++) {
sql.append(columnList.get(i).getName());
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(") VALUES (");
for (int i = 0; i < columnList.size(); i++) {
sql.append("'" + columnList.get(i).getValue() + "'");
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(")");
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* Execute the statement (stubbed: just prints it).
*
* @param sql the SQL to run
*/
public void execute(String sql) {
System.out.println("sql=======" + sql);
}
}
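The three `save*Sql` methods above build SQL by plain string concatenation. A standalone sketch of the INSERT case (the helper name `buildInsert` is hypothetical; like the original, it quotes values naively, so real code should use parameterized statements instead):

```java
import java.util.List;

public class InsertSqlDemo {
    /** Build "insert into t (a,b) VALUES ('1','2')" from parallel name/value lists. */
    static String buildInsert(String table, List<String> names, List<String> values) {
        StringBuilder sql = new StringBuilder("insert into ").append(table).append(" (");
        sql.append(String.join(",", names));
        sql.append(") VALUES (");
        for (int i = 0; i < values.size(); i++) {
            sql.append("'").append(values.get(i)).append("'");
            if (i != values.size() - 1) sql.append(",");
        }
        sql.append(")");
        return sql.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildInsert("buried_point", List.of("id", "status"), List.of("1", "20")));
        // → insert into buried_point (id,status) VALUES ('1','20')
    }
}
```

Because values are interpolated as string literals, any value containing a quote would break (or hijack) the statement; this mirrors the demo nature of `CanalClient`, which only prints the SQL rather than executing it.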


@@ -7,10 +7,11 @@ spring:
username: nacos # Nacos account
password: nacos # Nacos password
discovery: # service-registry settings
-namespace: liwq # namespace (dev environment)
+namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # namespace (dev environment)
group: DEFAULT_GROUP # Nacos config group, defaults to DEFAULT_GROUP
metadata:
version: 1.0.0 # service instance version, usable for canary releases
config: # config-center settings
-namespace: liwq # namespace (dev environment)
+namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # namespace (dev environment)
group: DEFAULT_GROUP # Nacos config group, defaults to DEFAULT_GROUP
