Compare commits
39 Commits
master
...
cde19e180a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cde19e180a | ||
|
|
360c497c49 | ||
|
|
038a09f286 | ||
|
|
bd9c07313f | ||
|
|
fb863ef9d1 | ||
|
|
d92b69e1f1 | ||
|
|
2f54798561 | ||
|
|
d80bea35c6 | ||
|
|
083b4e0bf1 | ||
|
|
4675e14813 | ||
|
|
cdaeb3d908 | ||
|
|
baaec3f5dc | ||
|
|
61f5816910 | ||
|
|
203749552d | ||
|
|
d36f914aa3 | ||
|
|
d07d884286 | ||
|
|
ba3fe6a242 | ||
| 94f5254e5c | |||
|
|
5e5c13329f | ||
|
|
10f8e8251b | ||
| bb4432e643 | |||
| 3d072958d6 | |||
| 98bb3529ea | |||
| e384dc1163 | |||
| c2adf5155f | |||
| a55fa06c74 | |||
| 731eaf3629 | |||
| 3af438bcb7 | |||
| 8e2c28ef36 | |||
| d010e55b76 | |||
| 71519a730b | |||
| 4708c10358 | |||
| 55a99fdf7b | |||
| 0d168cc260 | |||
| 2bbecd241a | |||
| 6010f4efe9 | |||
| 01671a3ed2 | |||
| e731ef8bcd | |||
| 6230c36cf2 |
48
.cursor/rules/1.mdc
Normal file
48
.cursor/rules/1.mdc
Normal file
@@ -0,0 +1,48 @@
|
||||
---
|
||||
description:
|
||||
globs:
|
||||
alwaysApply: false
|
||||
---
|
||||
---
|
||||
description:
|
||||
globs:
|
||||
alwaysApply: false
|
||||
---
|
||||
|
||||
# Your rule content
|
||||
#角色
|
||||
你是一名精通开发的高级工程师,拥有10年以上的应用开发经验,熟悉*等开发工具和技术栈。
|
||||
你的任务是帮助用户设计和开发易用且易于推护的 *** 应用。始终遵循最佳实践,并坚持干净代码和健壮架构的原则。
|
||||
|
||||
#目标
|
||||
你的目标是以用户容易理解的方式帮助他们完成“应用的设计和开发工作,确保应用功能完善、性能优异、用户体验良好。
|
||||
|
||||
#要求
|
||||
在理解用户需求、设计UI、编写代码、解决问题和项目选代优化时,你应该始终遵循以下原则:
|
||||
|
||||
|
||||
##需求理解
|
||||
-充分理解用户需求,站在用户角度思考,分析需求是否存在缺漏,并与用户讨论完善需求;
|
||||
-选择最简单的解决方案来满足用户需求,避免过度设计。
|
||||
##UI和样式设计
|
||||
-使用现代UI框架进行样式设计(例如***,这里可以根据不同开发项目仔纽展开,比如使用哪些视觉规范或者UI框架,没有的话也可以不用过多展开);
|
||||
-在不同平台上实现一致的设计和响应式模式
|
||||
##代码编写
|
||||
技术选型:根据项目需求选择合适的技术栈(例如***,这里需要仔细展开,比如介招某个技术栈用在什么地方,以及要遵循什么最佳实践)
|
||||
代码结构:强调代码的清晰性、模块化、可维护性,遵循最佳实践(如DRY原则、最小权限原则、响应式设计等)
|
||||
-代码安全性:在编写代码时,始终考虑安全性,避免引入漏洞,确保用户输入的安全处理
|
||||
-性能优化:优化代码的性能,减少资源占用,提升加载速度,确保项目的高效运行
|
||||
-测试与文档:编写单元测试,确保代码的健壮性,并提供清晰的中文注释和文档。方便后续阅读和维护
|
||||
##问题解决
|
||||
-全面阅读相关代码,理解***应用的工作原理
|
||||
-根据用户的反馈分析问题的原因,提出解决问题的思路
|
||||
-确保每次代码变更不会破坏现有功能,且尽可能保持最小的改动
|
||||
##迭代优化
|
||||
与用户保持密切沟通,根据反读调整功能和设计,确保应用符合用户需求
|
||||
在不确定需求时,主动询问用户以澄清需求或技术细节
|
||||
##方法论
|
||||
-系统2思维:以分析严谨的方式解决问题。将需求分解为更小、可管理的部分,并在实施前仔细考虑每一步
|
||||
思维树:评估多种可能的解决方案及其后果。使用结构化的方法探索不同的路径。并选择最优的解决方案
|
||||
-选代改进:在最终确定代码之前,考虑改进、边缘情况和优化。通过潜在增强的迭代,确保最终解决方案是健壮的
|
||||
|
||||
|
||||
2
.lingma/rules/project_rule.md
Normal file
2
.lingma/rules/project_rule.md
Normal file
@@ -0,0 +1,2 @@
|
||||
**添加规则文件可帮助模型精准理解你的编码偏好,如框架、代码风格等**
|
||||
**规则文件只对当前工程生效,单文件限制10000字符。如果无需将该文件提交到远程 Git 仓库,请将其添加到 .gitignore**
|
||||
BIN
logs/gateway-server.log.2025-07-25.0.gz
Normal file
BIN
logs/gateway-server.log.2025-07-25.0.gz
Normal file
Binary file not shown.
BIN
logs/gateway-server.log.2025-07-28.0.gz
Normal file
BIN
logs/gateway-server.log.2025-07-28.0.gz
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
logs/infra-server.log.2025-07-25.0.gz
Normal file
BIN
logs/infra-server.log.2025-07-25.0.gz
Normal file
Binary file not shown.
BIN
logs/infra-server.log.2025-07-28.0.gz
Normal file
BIN
logs/infra-server.log.2025-07-28.0.gz
Normal file
Binary file not shown.
BIN
logs/system-server.log.2025-07-25.0.gz
Normal file
BIN
logs/system-server.log.2025-07-25.0.gz
Normal file
Binary file not shown.
BIN
logs/system-server.log.2025-07-28.0.gz
Normal file
BIN
logs/system-server.log.2025-07-28.0.gz
Normal file
Binary file not shown.
@@ -1,3 +1,3 @@
|
||||
暂未适配 IBM DB2 数据库,如果你有需要,可以微信联系 wangwenbin-server 一起建设。
|
||||
|
||||
你需要把表结构与数据导入到 DM 数据库,我来测试与适配代码。
|
||||
你需要把表结构与数据导入到 DM 数据库,我a来测试与适配代码。
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
<modules>
|
||||
<module>tashow-infra-api</module>
|
||||
<module>tashow-system-api</module>
|
||||
<module>tashow-product-api</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
||||
|
||||
45
tashow-feign/tashow-product-api/pom.xml
Normal file
45
tashow-feign/tashow-product-api/pom.xml
Normal file
@@ -0,0 +1,45 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-feign</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<artifactId>tashow-product-api</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>${project.artifactId}</name>
|
||||
<description>
|
||||
infra 模块 API,暴露给其它模块调用
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-common</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 参数校验 -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-validation</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<!-- RPC 远程调用相关 -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-openfeign</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mybatis</groupId>
|
||||
<artifactId>mybatis</artifactId>
|
||||
<version>3.5.13</version> <!-- 推荐使用最新稳定版本 -->
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* infra API 包,定义暴露给其它模块的 API
|
||||
*/
|
||||
package com.tashow.cloud.productapi.api;
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.tashow.cloud.productapi.enums;
|
||||
|
||||
|
||||
import com.tashow.cloud.common.enums.RpcConstants;
|
||||
|
||||
/**
|
||||
* API 相关的枚举
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class ApiConstants {
|
||||
|
||||
/**
|
||||
* 服务名
|
||||
*
|
||||
* 注意,需要保证和 spring.application.name 保持一致
|
||||
*/
|
||||
public static final String NAME = "product-server";
|
||||
|
||||
public static final String PREFIX = RpcConstants.RPC_API_PREFIX + "/infra";
|
||||
|
||||
public static final String VERSION = "1.0.0";
|
||||
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.tashow.cloud.productapi.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* @Author LGF
|
||||
* @create 2020/10/26 16:36
|
||||
*/
|
||||
public enum BaseEnum {
|
||||
/**
|
||||
* 基础 枚举
|
||||
*/
|
||||
|
||||
YES_ONE(1, "是"),
|
||||
NO_ZERO(0, "否"),
|
||||
|
||||
ENABLE_ZERO(0, "启用"),
|
||||
FORBIDDEN_ONE(1, "禁用"),
|
||||
|
||||
YES_BINDING(1, "已绑定"),
|
||||
NO_BINDING(0, "未绑定"),
|
||||
|
||||
NO(1, "删除"),
|
||||
YES(0, "正常"),
|
||||
|
||||
HIDE(3, "隐藏"),
|
||||
|
||||
NO_TWO(2, "否"),
|
||||
;
|
||||
|
||||
@Getter
|
||||
Integer key;
|
||||
@Getter
|
||||
String value;
|
||||
;
|
||||
|
||||
BaseEnum(Integer key, String value) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.tashow.cloud.productapi.enums;
|
||||
|
||||
/**
|
||||
* Infra 字典类型的枚举类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public interface DictTypeConstants {
|
||||
|
||||
String JOB_STATUS = "product_job_status"; // 定时任务状态的枚举
|
||||
String JOB_LOG_STATUS = "product_job_log_status"; // 定时任务日志状态的枚举
|
||||
|
||||
String API_ERROR_LOG_PROCESS_STATUS = "product_api_error_log_process_status"; // API 错误日志的处理状态的枚举
|
||||
|
||||
String CONFIG_TYPE = "product_config_type"; // 参数配置类型
|
||||
String BOOLEAN_STRING = "product_boolean_string"; // Boolean 是否类型
|
||||
|
||||
String OPERATE_TYPE = "product_operate_type"; // 操作类型
|
||||
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.tashow.cloud.productapi.enums;
|
||||
|
||||
|
||||
import com.tashow.cloud.common.exception.ErrorCode;
|
||||
|
||||
/**
|
||||
* Infra 错误码枚举类
|
||||
*
|
||||
* infra 系统,使用 1-001-000-000 段
|
||||
*/
|
||||
public interface ErrorCodeConstants {
|
||||
|
||||
ErrorCode CATEGORY_NOT_EXISTS = new ErrorCode(10001, "产品类目不存在");
|
||||
ErrorCode PROD_NOT_EXISTS = new ErrorCode(10002, "商品不存在");
|
||||
ErrorCode PROD_ADDITIONAL_FEE_DATES_NOT_EXISTS = new ErrorCode(10003, "特殊日期附加费用规则不存在");
|
||||
ErrorCode PROD_ADDITIONAL_FEE_PERIODS_NOT_EXISTS = new ErrorCode(10004, "特殊时段附加费用规则不存在");
|
||||
ErrorCode PROD_EMERGENCY_RESPONSE_NOT_EXISTS = new ErrorCode(10005, "商品紧急响应服务设置不存在");
|
||||
ErrorCode PROD_EMERGENCY_RESPONSE_INTERVALS_NOT_EXISTS = new ErrorCode(10006, "紧急响应时间区间设置不存在");
|
||||
ErrorCode PROD_PROP_NOT_EXISTS = new ErrorCode(10007, "商品属性不存在");
|
||||
ErrorCode PROD_PROP_VALUE_NOT_EXISTS = new ErrorCode(10008, "属性规则不存在");
|
||||
ErrorCode PROD_RESERVATION_CONFIG_NOT_EXISTS = new ErrorCode(10009, "商品预约配置不存在");
|
||||
ErrorCode PROD_SERVICE_AREA_RELEVANCE_NOT_EXISTS = new ErrorCode(10010, "商品与服务区域关联不存在");
|
||||
ErrorCode PROD_SERVICE_AREAS_NOT_EXISTS = new ErrorCode(10011, "服务区域不存在");
|
||||
ErrorCode PROD_SERVICE_OVER_AREA_RULES_NOT_EXISTS = new ErrorCode(10012, "超区规则不存在");
|
||||
ErrorCode PROD_TAGS_NOT_EXISTS = new ErrorCode(10013, "商品和标签管理不存在");
|
||||
ErrorCode PRODUCT_ORDER_LIMIT_NOT_EXISTS = new ErrorCode(10014, "商品接单上限设置不存在");
|
||||
ErrorCode PROD_WEIGHT_RANGE_PRICES_NOT_EXISTS = new ErrorCode(10015, "体重区间价格不存在");
|
||||
ErrorCode SHOP_DETAIL_NOT_EXISTS = new ErrorCode(10016, "店铺信息不存在");
|
||||
ErrorCode SKU_NOT_EXISTS = new ErrorCode(10017, "单品SKU不存在");
|
||||
ErrorCode SKU_SERVICE_DELIVER_NOT_EXISTS = new ErrorCode(10018, "服务交付方式不存在");
|
||||
ErrorCode SKU_SERVICE_MATERIAL_NOT_EXISTS = new ErrorCode(10019, "服务物料详情不存在");
|
||||
ErrorCode SKU_SERVICES_FORM_NOT_EXISTS = new ErrorCode(10021, "商品SKU扩展服务表单不存在");
|
||||
ErrorCode SKU_SERVICE_TRANSPORT_NOT_EXISTS = new ErrorCode(10022, "服务遗体运输不存在");
|
||||
ErrorCode SKU_SERVICE_DETAILS_NOT_EXISTS = new ErrorCode(10023, "服务详情不存在");
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright (c) 2018-2999 广州市蓝海创新科技有限公司 All rights reserved.
|
||||
*
|
||||
* https://www.mall4j.com/
|
||||
*
|
||||
* 未经允许,不可做商业用途!
|
||||
*
|
||||
* 版权所有,侵权必究!
|
||||
*/
|
||||
|
||||
package com.tashow.cloud.productapi.enums;
|
||||
|
||||
/**
|
||||
* 商品规格参数、属性类型
|
||||
* @author lgh
|
||||
*/
|
||||
public enum ProdPropRule {
|
||||
|
||||
// 规格属性 (用于商品商品发布时,关联sku)
|
||||
SPEC(1),
|
||||
|
||||
// 规格参数(用于商品搜索时,与分类关联搜索)
|
||||
ATTRIBUTE(2);
|
||||
|
||||
private Integer num;
|
||||
|
||||
public Integer value() {
|
||||
return num;
|
||||
}
|
||||
|
||||
ProdPropRule(Integer num){
|
||||
this.num = num;
|
||||
}
|
||||
|
||||
public static ProdPropRule instance(Integer value) {
|
||||
ProdPropRule[] enums = values();
|
||||
for (ProdPropRule statusEnum : enums) {
|
||||
if (statusEnum.value().equals(value)) {
|
||||
return statusEnum;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.tashow.cloud.productapi.enums;
|
||||
|
||||
public enum ServiceTypeEnum {
|
||||
TRANSPORT_CAR_CONFIG(1, "接运车辆配置"),
|
||||
TRANSPORT_CAR_MATERIAL(2, "接运车辆服务物料"),
|
||||
BODY_TRANSPORT_CONFIG(3, "遗体运输目的地配置"),
|
||||
BODY_TRANSPORT_MATERIAL(4, "遗体运输目的地物料"),
|
||||
BODY_CLEAN_CONFIG(5, "遗体清洁配置"),
|
||||
BODY_CLEAN_MATERIAL(6, "遗体清洁物料"),
|
||||
MEMORIAL_CONFIG(7, "追思告别配置"),
|
||||
MEMORIAL_MATERIAL(8, "追思告别物料"),
|
||||
CREMATION_CONFIG(9, "遗体火化配置"),
|
||||
CREMATION_MATERIAL(10, "遗体火化物料"),
|
||||
ASH_PROCESSING_CONFIG(11, "骨灰处理配置"),
|
||||
ASH_PROCESSING_DELIVERY(12, "骨灰处理配送方式"),
|
||||
ASH_PROCESSING_MATERIAL(13, "骨灰处理物料"),
|
||||
BONE_ASH_CONFIG(14, "骨灰装殓配置"),
|
||||
SOUVENIR_CONFIG(15, "纪念品配置"),
|
||||
SOUVENIR_DELIVERY(16, "纪念品配送方式");
|
||||
|
||||
private final int code;
|
||||
private final String description;
|
||||
|
||||
ServiceTypeEnum(int code, String description) {
|
||||
this.code = code;
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
public int getCode() {
|
||||
return code;
|
||||
}
|
||||
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
public static ServiceTypeEnum getByCode(int code) {
|
||||
for (ServiceTypeEnum type : ServiceTypeEnum.values()) {
|
||||
if (type.getCode() == code) {
|
||||
return type;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package com.tashow.cloud.productapi.general;
|
||||
|
||||
import org.apache.ibatis.type.BaseTypeHandler;
|
||||
import org.apache.ibatis.type.JdbcType;
|
||||
|
||||
import java.sql.CallableStatement;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 处理 List<String> 与数据库逗号分隔字符串之间的转换
|
||||
*/
|
||||
public class StringListTypeHandler extends BaseTypeHandler<List<String>> {
|
||||
|
||||
@Override
|
||||
public void setNonNullParameter(PreparedStatement ps, int i, List<String> parameter, JdbcType jdbcType) throws SQLException {
|
||||
// 将 List 转为逗号分隔的字符串
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int j = 0; j < parameter.size(); j++) {
|
||||
if (j > 0) sb.append(",");
|
||||
sb.append(parameter.get(j));
|
||||
}
|
||||
ps.setString(i, sb.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getNullableResult(ResultSet rs, String columnName) throws SQLException {
|
||||
String str = rs.getString(columnName);
|
||||
return parseStringToList(str);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
|
||||
String str = rs.getString(columnIndex);
|
||||
return parseStringToList(str);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
|
||||
String str = cs.getString(columnIndex);
|
||||
return parseStringToList(str);
|
||||
}
|
||||
|
||||
private List<String> parseStringToList(String str) {
|
||||
if (str == null || str.trim().length() == 0) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return Arrays.asList(str.split(","));
|
||||
}
|
||||
}
|
||||
@@ -27,6 +27,7 @@
|
||||
<module>tashow-data-redis</module>
|
||||
<module>tashow-data-excel</module>
|
||||
<module>tashow-data-es</module>
|
||||
<module>tashow-data-canal</module>
|
||||
</modules>
|
||||
|
||||
|
||||
|
||||
@@ -108,6 +108,25 @@
|
||||
<artifactId>jackson-datatype-jsr310</artifactId>
|
||||
<scope>provided</scope> <!-- 设置为 provided,只有工具类需要使用到 -->
|
||||
</dependency>
|
||||
<!-- pagehelper 分页插件 -->
|
||||
<!-- <dependency>
|
||||
<groupId>com.github.pagehelper</groupId>
|
||||
<artifactId>pagehelper-spring-boot-starter</artifactId>
|
||||
<version>1.4.6</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.mybatis</groupId>
|
||||
<artifactId>mybatis</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>-->
|
||||
<!--常用工具类 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
|
||||
@@ -93,7 +93,7 @@ public class CommonResult<T> implements Serializable {
|
||||
|
||||
// ========= 和 Exception 异常体系集成 =========
|
||||
|
||||
/**
|
||||
/**
|
||||
* 判断是否有异常。如果有,则抛出 {@link ServiceException} 异常
|
||||
*/
|
||||
public void checkError() throws ServiceException {
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.tashow.cloud.common.util.date;
|
||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||
|
||||
import java.time.*;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
|
||||
@@ -27,6 +28,9 @@ public class DateUtils {
|
||||
|
||||
public static final String FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND = "yyyy-MM-dd HH:mm:ss";
|
||||
|
||||
|
||||
// 默认数据保留天数
|
||||
private static final long RETENTION_DAYS = 90;
|
||||
/**
|
||||
* 将 LocalDateTime 转换成 Date
|
||||
*
|
||||
@@ -146,4 +150,36 @@ public class DateUtils {
|
||||
return LocalDateTimeUtil.isSameDay(date, LocalDateTime.now().minusDays(1));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 根据删除时间,计算还剩多少天被彻底删除(默认保留 90 天)
|
||||
*
|
||||
* @param deleteTime 删除时间
|
||||
* @return 剩余天数(>=0),0 表示已过期
|
||||
*/
|
||||
public static long getRemainingDays(Date deleteTime) {
|
||||
if (deleteTime == null) {
|
||||
throw new IllegalArgumentException("删除时间不能为 null");
|
||||
}
|
||||
|
||||
// 将 Date 转换为 LocalDateTime
|
||||
LocalDateTime deleteDateTime = deleteTime.toInstant()
|
||||
.atZone(ZoneId.systemDefault())
|
||||
.toLocalDateTime();
|
||||
|
||||
// 当前时间
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
|
||||
// 到期时间 = 删除时间 + 保留天数
|
||||
LocalDateTime expireTime = deleteDateTime.plusDays(RETENTION_DAYS);
|
||||
|
||||
// 如果当前时间已经超过到期时间,剩余天数为 0
|
||||
if (now.isAfter(expireTime)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 计算剩余天数(向下取整,不进位)
|
||||
return ChronoUnit.DAYS.between(now, expireTime);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright (c) 2018-2999 广州市蓝海创新科技有限公司 All rights reserved.
|
||||
*
|
||||
* https://www.mall4j.com/
|
||||
*
|
||||
* 未经允许,不可做商业用途!
|
||||
*
|
||||
* 版权所有,侵权必究!
|
||||
*/
|
||||
|
||||
package com.tashow.cloud.common.util.serializer;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.JsonSerializer;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* @author lanhai
|
||||
*/
|
||||
@Component
|
||||
public class ImgJsonSerializer extends JsonSerializer<String> {
|
||||
|
||||
/* @Autowired
|
||||
private Qiniu qiniu;
|
||||
@Autowired
|
||||
private ImgUploadUtil imgUploadUtil;*/
|
||||
|
||||
@Override
|
||||
public void serialize(String value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
|
||||
/*if (StrUtil.isBlank(value)) {
|
||||
gen.writeString(StrUtil.EMPTY);
|
||||
return;
|
||||
}
|
||||
String[] imgs = value.split(StrUtil.COMMA);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String resourceUrl = "";
|
||||
String rule="^((http[s]{0,1})://)";
|
||||
Pattern pattern= Pattern.compile(rule);
|
||||
if (Objects.equals(imgUploadUtil.getUploadType(), 2)) {
|
||||
resourceUrl = qiniu.getResourcesUrl();
|
||||
} else if (Objects.equals(imgUploadUtil.getUploadType(), 1)) {
|
||||
resourceUrl = imgUploadUtil.getResourceUrl();
|
||||
}
|
||||
for (String img : imgs) {
|
||||
Matcher matcher = pattern.matcher(img);
|
||||
//若图片以http或https开头,直接返回
|
||||
if (matcher.find()){
|
||||
sb.append(img).append(StrUtil.COMMA);
|
||||
}else {
|
||||
sb.append(resourceUrl).append(img).append(StrUtil.COMMA);
|
||||
}
|
||||
}
|
||||
sb.deleteCharAt(sb.length()-1);
|
||||
gen.writeString(sb.toString());*/
|
||||
}
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/Validation/?yudao>
|
||||
49
tashow-framework/tashow-data-canal/pom.xml
Normal file
49
tashow-framework/tashow-data-canal/pom.xml
Normal file
@@ -0,0 +1,49 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-framework</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<artifactId>tashow-data-canal</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>${project.artifactId}</name>
|
||||
<description>canal 封装拓展</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.otter</groupId>
|
||||
<artifactId>canal.client</artifactId>
|
||||
<version>1.1.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Dynamic-Datasource Starter -->
|
||||
<dependency>
|
||||
<groupId>com.baomidou</groupId>
|
||||
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
|
||||
<version>4.2.0</version> <!-- 推荐使用稳定版本 -->
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>javax.annotation</groupId>
|
||||
<artifactId>javax.annotation-api</artifactId>
|
||||
<version>1.3.2</version> <!-- 最新版本 -->
|
||||
</dependency>
|
||||
<!-- 芋道基础依赖 -->
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-common</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 其他必要依赖 -->
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.tashow.cloud.canal.config;
|
||||
|
||||
import com.tashow.cloud.canal.service.CanalSyncService;
|
||||
import com.tashow.cloud.canal.service.SqlExecutorService;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@AutoConfiguration
|
||||
public class CanalAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public CanalSyncService canalSyncService() {
|
||||
return new CanalSyncService();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SqlExecutorService getdb() {
|
||||
return new SqlExecutorService();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,188 @@
|
||||
package com.tashow.cloud.canal.service;
|
||||
|
||||
import com.alibaba.otter.canal.client.CanalConnector;
|
||||
import com.alibaba.otter.canal.client.CanalConnectors;
|
||||
import com.alibaba.otter.canal.protocol.CanalEntry.*;
|
||||
import com.alibaba.otter.canal.protocol.Message;
|
||||
import com.baomidou.dynamic.datasource.annotation.DS;
|
||||
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
@Service
|
||||
public class CanalSyncService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CanalSyncService.class);
|
||||
|
||||
private static final Queue<SqlTask> SQL_QUEUE = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
@Autowired
|
||||
private SqlExecutorService sqlExecutorService;
|
||||
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
new Thread(this::runCanalClient).start();
|
||||
}
|
||||
|
||||
private void runCanalClient() {
|
||||
CanalConnector connector = CanalConnectors.newSingleConnector(
|
||||
new InetSocketAddress("43.139.42.137", 11111),
|
||||
"example",
|
||||
"",
|
||||
""
|
||||
);
|
||||
int batchSize = 1000;
|
||||
|
||||
try {
|
||||
connector.connect();
|
||||
connector.subscribe("tashow-platform\\..*");
|
||||
connector.rollback();
|
||||
|
||||
while (true) {
|
||||
Message message = connector.getWithoutAck(batchSize);
|
||||
log.info("Received message id: {}, entries size: {}", message.getId(), message.getEntries().size());
|
||||
long batchId = message.getId();
|
||||
int size = message.getEntries().size();
|
||||
|
||||
if (batchId == -1 || size == 0) {
|
||||
Thread.sleep(1000);
|
||||
continue;
|
||||
}
|
||||
|
||||
dataHandle(message.getEntries());
|
||||
connector.ack(batchId);
|
||||
|
||||
if (!SQL_QUEUE.isEmpty()) {
|
||||
executeQueueSql();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Canal client error occurred.", e);
|
||||
} finally {
|
||||
connector.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
private void dataHandle(List<Entry> entries) {
|
||||
for (Entry entry : entries) {
|
||||
if (entry.getEntryType() != EntryType.ROWDATA) continue;
|
||||
|
||||
try {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
EventType eventType = rowChange.getEventType();
|
||||
String schemaName = entry.getHeader().getSchemaName();
|
||||
String tableName = entry.getHeader().getTableName();
|
||||
|
||||
log.info("schema: {}, table: {}, type: {}", schemaName, tableName, eventType);
|
||||
|
||||
if (eventType == EventType.DELETE) {
|
||||
saveDeleteSql(entry);
|
||||
} else if (eventType == EventType.UPDATE) {
|
||||
saveUpdateSql(entry);
|
||||
} else if (eventType == EventType.INSERT) {
|
||||
saveInsertSql(entry);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Error handling entry: {}", entry.toString(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void saveInsertSql(Entry entry) throws Exception {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
String tableName = entry.getHeader().getTableName();
|
||||
|
||||
for (RowData rowData : rowChange.getRowDatasList()) {
|
||||
List<Column> columns = rowData.getAfterColumnsList();
|
||||
|
||||
List<String> columnNames = new ArrayList<>();
|
||||
List<Object> values = new ArrayList<>();
|
||||
|
||||
for (Column col : columns) {
|
||||
columnNames.add(col.getName());
|
||||
values.add(col.getValue());
|
||||
}
|
||||
|
||||
String sql = "INSERT INTO " + tableName + " (" +
|
||||
String.join(",", columnNames) + ") VALUES (";
|
||||
|
||||
StringBuilder placeholders = new StringBuilder();
|
||||
for (int i = 0; i < values.size(); i++) {
|
||||
placeholders.append("?,");
|
||||
}
|
||||
if (placeholders.length() > 0) placeholders.deleteCharAt(placeholders.length() - 1);
|
||||
sql += placeholders + ")";
|
||||
|
||||
SQL_QUEUE.add(new SqlTask(sql, values.toArray()));
|
||||
}
|
||||
}
|
||||
|
||||
private void saveUpdateSql(Entry entry) throws Exception {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
String tableName = entry.getHeader().getTableName();
|
||||
|
||||
for (RowData rowData : rowChange.getRowDatasList()) {
|
||||
List<Column> newColumns = rowData.getAfterColumnsList();
|
||||
List<Column> oldColumns = rowData.getBeforeColumnsList();
|
||||
|
||||
List<String> updateColumns = new ArrayList<>();
|
||||
List<Object> params = new ArrayList<>();
|
||||
|
||||
for (Column col : newColumns) {
|
||||
updateColumns.add(col.getName() + "=?");
|
||||
params.add(col.getValue());
|
||||
}
|
||||
|
||||
Optional<Column> primaryKeyOpt = oldColumns.stream().filter(Column::getIsKey).findFirst();
|
||||
Column primaryKey = primaryKeyOpt.orElseThrow(() -> new RuntimeException("未找到主键"));
|
||||
|
||||
params.add(primaryKey.getValue());
|
||||
|
||||
String sql = "UPDATE " + tableName + " SET " +
|
||||
String.join(",", updateColumns) +
|
||||
" WHERE " + primaryKey.getName() + "=?";
|
||||
|
||||
SQL_QUEUE.add(new SqlTask(sql, params.toArray()));
|
||||
}
|
||||
}
|
||||
|
||||
private void saveDeleteSql(Entry entry) throws Exception {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
String tableName = entry.getHeader().getTableName();
|
||||
|
||||
for (RowData rowData : rowChange.getRowDatasList()) {
|
||||
List<Column> beforeColumns = rowData.getBeforeColumnsList();
|
||||
|
||||
Optional<Column> primaryKeyOpt = beforeColumns.stream().filter(Column::getIsKey).findFirst();
|
||||
Column primaryKey = primaryKeyOpt.orElseThrow(() -> new RuntimeException("未找到主键"));
|
||||
|
||||
String sql = "DELETE FROM " + tableName + " WHERE " + primaryKey.getName() + "=?";
|
||||
SQL_QUEUE.add(new SqlTask(sql, primaryKey.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
private void executeQueueSql() {
|
||||
List<SqlTask> tasks = new ArrayList<>();
|
||||
SqlTask task;
|
||||
while ((task = SQL_QUEUE.poll()) != null) {
|
||||
tasks.add(task);
|
||||
}
|
||||
if (!tasks.isEmpty()) {
|
||||
sqlExecutorService.executeBatch(tasks);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,156 @@
|
||||
/*
|
||||
package com.tashow.cloud.canal.service;
|
||||
|
||||
import com.alibaba.otter.canal.client.CanalConnector;
|
||||
import com.alibaba.otter.canal.client.CanalConnectors;
|
||||
import com.alibaba.otter.canal.protocol.CanalEntry.*;
|
||||
import com.alibaba.otter.canal.protocol.Message;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
@Service
|
||||
public class CanalSyncServiceTest {
|
||||
|
||||
private static final Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
@Autowired
|
||||
private Canaldb canaldb;
|
||||
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
new Thread(this::runCanalClient).start();
|
||||
}
|
||||
|
||||
private void runCanalClient() {
|
||||
CanalConnector connector = CanalConnectors.newSingleConnector(
|
||||
new InetSocketAddress("43.139.42.137", 11111),
|
||||
"example",
|
||||
"",
|
||||
""
|
||||
);
|
||||
int batchSize = 1000;
|
||||
|
||||
try {
|
||||
connector.connect();
|
||||
connector.subscribe("tashow-platform\\..*");
|
||||
connector.rollback();
|
||||
|
||||
while (true) {
|
||||
Message message = connector.getWithoutAck(batchSize);
|
||||
System.out.println("Received message id: " + message.getId() + ", entries size: " + message.getEntries().size());
|
||||
long batchId = message.getId();
|
||||
int size = message.getEntries().size();
|
||||
|
||||
if (batchId == -1 || size == 0) {
|
||||
Thread.sleep(1000);
|
||||
continue;
|
||||
}
|
||||
|
||||
dataHandle(message.getEntries());
|
||||
connector.ack(batchId);
|
||||
|
||||
if (SQL_QUEUE.size() > 0) {
|
||||
executeQueueSql();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
connector.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
private void dataHandle(List<Entry> entries) {
|
||||
for (Entry entry : entries) {
|
||||
if (entry.getEntryType() != EntryType.ROWDATA) continue;
|
||||
|
||||
try {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
EventType eventType = rowChange.getEventType();
|
||||
String schemaName = entry.getHeader().getSchemaName();
|
||||
String tableName = entry.getHeader().getTableName();
|
||||
|
||||
System.out.println("schema: " + schemaName + ", table: " + tableName + ", type: " + eventType);
|
||||
|
||||
if (eventType == EventType.DELETE) {
|
||||
saveDeleteSql(entry);
|
||||
} else if (eventType == EventType.UPDATE) {
|
||||
saveUpdateSql(entry);
|
||||
} else if (eventType == EventType.INSERT) {
|
||||
saveInsertSql(entry);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void saveInsertSql(Entry entry) throws Exception {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
for (RowData rowData : rowChange.getRowDatasList()) {
|
||||
List<Column> columns = rowData.getAfterColumnsList();
|
||||
StringBuilder sql = new StringBuilder("INSERT INTO ")
|
||||
.append(entry.getHeader().getTableName()).append(" (")
|
||||
.append(columns.stream().map(Column::getName).reduce((a, b) -> a + "," + b).orElse(""))
|
||||
.append(") VALUES (")
|
||||
.append(columns.stream().map(c -> "'" + c.getValue() + "'").reduce((a, b) -> a + "," + b).orElse(""))
|
||||
.append(");");
|
||||
|
||||
SQL_QUEUE.add(sql.toString());
|
||||
}
|
||||
}
|
||||
|
||||
private void saveUpdateSql(Entry entry) throws Exception {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
for (RowData rowData : rowChange.getRowDatasList()) {
|
||||
List<Column> newColumns = rowData.getAfterColumnsList();
|
||||
List<Column> oldColumns = rowData.getBeforeColumnsList();
|
||||
|
||||
StringBuilder setClause = new StringBuilder();
|
||||
for (Column col : newColumns) {
|
||||
setClause.append(col.getName()).append("='").append(col.getValue()).append("', ");
|
||||
}
|
||||
if (setClause.length() > 0) setClause.setLength(setClause.length() - 2);
|
||||
|
||||
String whereClause = oldColumns.stream()
|
||||
.filter(Column::getIsKey)
|
||||
.map(c -> c.getName() + "='" + c.getValue() + "'")
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new RuntimeException("未找到主键"));
|
||||
|
||||
SQL_QUEUE.add("UPDATE " + entry.getHeader().getTableName() + " SET " + setClause + " WHERE " + whereClause + ";");
|
||||
}
|
||||
}
|
||||
|
||||
private void saveDeleteSql(Entry entry) throws Exception {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
for (RowData rowData : rowChange.getRowDatasList()) {
|
||||
String whereClause = rowData.getBeforeColumnsList().stream()
|
||||
.filter(Column::getIsKey)
|
||||
.map(c -> c.getName() + "='" + c.getValue() + "'")
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new RuntimeException("未找到主键"));
|
||||
|
||||
SQL_QUEUE.add("DELETE FROM " + entry.getHeader().getTableName() + " WHERE " + whereClause + ";");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void executeQueueSql() {
|
||||
int size = SQL_QUEUE.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
String sql = SQL_QUEUE.poll();
|
||||
canaldb.execute(sql);
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
@@ -0,0 +1,27 @@
|
||||
/*
|
||||
package com.tashow.cloud.canal.service;
|
||||
|
||||
import com.baomidou.dynamic.datasource.annotation.DS;
|
||||
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class Canaldb {
|
||||
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
@DS("slave")
|
||||
public void execute(String sql) {
|
||||
try {
|
||||
String ds = DynamicDataSourceContextHolder.peek(); // 调试查看当前数据源
|
||||
System.out.println("当前数据源:" + ds);
|
||||
System.out.println("[execute]----> " + sql);
|
||||
jdbcTemplate.execute(sql);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
@@ -0,0 +1,56 @@
|
||||
package com.tashow.cloud.canal.service;
|
||||
|
||||
|
||||
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
|
||||
@Service
|
||||
public class SqlExecutorService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SqlExecutorService.class);
|
||||
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
public void executeBatch(List<SqlTask> tasks) {
|
||||
if (tasks == null || tasks.isEmpty()) return;
|
||||
|
||||
DynamicDataSourceContextHolder.push("slave");
|
||||
try {
|
||||
// 提取所有 SQL 模板(假设它们都是一样的)
|
||||
String sqlTemplate = tasks.get(0).getSql();
|
||||
|
||||
// 执行批量更新
|
||||
jdbcTemplate.batchUpdate(sqlTemplate, new BatchPreparedStatementSetter() {
|
||||
@Override
|
||||
public void setValues(PreparedStatement ps, int i) throws SQLException {
|
||||
SqlTask task = tasks.get(i);
|
||||
Object[] args = task.getArgs();
|
||||
for (int j = 0; j < args.length; j++) {
|
||||
ps.setObject(j + 1, args[j]);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBatchSize() {
|
||||
return tasks.size();
|
||||
}
|
||||
});
|
||||
|
||||
log.info("✅ 成功执行 {} 条 SQL", tasks.size());
|
||||
} catch (Exception e) {
|
||||
log.error("❌ 批量执行 SQL 失败", e);
|
||||
} finally {
|
||||
DynamicDataSourceContextHolder.poll();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.tashow.cloud.canal.service;
|
||||
|
||||
public class SqlTask {
|
||||
private final String sql;
|
||||
private final Object[] args;
|
||||
|
||||
public SqlTask(String sql, Object... args) {
|
||||
this.sql = sql;
|
||||
this.args = args;
|
||||
}
|
||||
|
||||
public String getSql() {
|
||||
return sql;
|
||||
}
|
||||
|
||||
public Object[] getArgs() {
|
||||
return args;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.tashow.cloud.canal.service;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
public class SqlTaskQueue {
|
||||
private static final Queue<SqlTask> queue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
public static void add(SqlTask task) {
|
||||
queue.add(task);
|
||||
}
|
||||
|
||||
public static boolean isEmpty() {
|
||||
return queue.isEmpty();
|
||||
}
|
||||
|
||||
public static SqlTask poll() {
|
||||
return queue.poll();
|
||||
}
|
||||
|
||||
public static int size() {
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
public static List<SqlTask> drainAll() {
|
||||
List<SqlTask> list = new java.util.ArrayList<>();
|
||||
queue.forEach(list::add);
|
||||
queue.clear();
|
||||
return list;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
com.tashow.cloud.canal.config.CanalAutoConfiguration
|
||||
@@ -37,6 +37,11 @@
|
||||
<artifactId>ojdbc8</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<!--<dependency>
|
||||
<groupId>com.github.jsqlparser</groupId>
|
||||
<artifactId>jsqlparser</artifactId>
|
||||
<version>4.5</version>
|
||||
</dependency>-->
|
||||
<dependency>
|
||||
<groupId>org.postgresql</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
|
||||
@@ -51,6 +51,6 @@ public abstract class BaseDO implements Serializable, TransPojo {
|
||||
* 是否删除
|
||||
*/
|
||||
@TableLogic
|
||||
private Boolean deleted;
|
||||
private Integer deleted;
|
||||
|
||||
}
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/MyBatis/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/dynamic-datasource/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/datasource-pool/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/Cache/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/Redis/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/Job/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/Async-Job/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<https://www.iocoder.cn/Spring-Boot/Admin/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<https://www.iocoder.cn/Spring-Boot/Actuator/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/SkyWalking/?yudao>
|
||||
@@ -2,30 +2,59 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-framework</artifactId>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>tashow-framework-mq</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>${project.artifactId}</name>
|
||||
<description>消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种</description>
|
||||
<description>消息队列模块,基于RabbitMQ等中间件</description>
|
||||
<url>https://github.com/tashow/tashow-platform</url>
|
||||
|
||||
<dependencies>
|
||||
<!-- DB 相关 -->
|
||||
<!-- RabbitMQ -->
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-data-redis</artifactId>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<!-- Web Services -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.amqp</groupId>
|
||||
<artifactId>spring-rabbit</artifactId>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-web</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-webmvc</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>jakarta.servlet</groupId>
|
||||
<artifactId>jakarta.servlet-api</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-data-mybatis</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jodd</groupId>
|
||||
<artifactId>jodd-util</artifactId>
|
||||
<version>6.3.0</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.tashow.cloud.mq.core;
|
||||
import lombok.Data;
|
||||
import java.io.Serializable;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
/**
|
||||
* MQ消息基类,
|
||||
*
|
||||
*
|
||||
* @author tashow
|
||||
*/
|
||||
@Data
|
||||
public class BaseMqMessage implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
/**
|
||||
* 消息ID,默认为UUID
|
||||
*/
|
||||
private Integer id = UUID.randomUUID().hashCode();
|
||||
/**
|
||||
* 消息状态码
|
||||
*/
|
||||
private Integer statusCode;
|
||||
|
||||
/**
|
||||
* 消息重试次数
|
||||
*/
|
||||
private Integer retryCount = 0;
|
||||
|
||||
/**
|
||||
* 消息错误信息
|
||||
*/
|
||||
private String errorMessage;
|
||||
|
||||
/**
|
||||
* 消息创建时间
|
||||
*/
|
||||
private Date createTime = new Date();
|
||||
|
||||
/**
|
||||
* 扩展数据
|
||||
*/
|
||||
private Map<String, Object> extraData = new HashMap<>();
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.tashow.cloud.mq.handler;
|
||||
/**
|
||||
* 消息记录处理接口
|
||||
*
|
||||
* @author tashow
|
||||
*/
|
||||
public interface FailRecordHandler {
|
||||
/**
|
||||
* 保存消息记录
|
||||
*
|
||||
* @param exchange 交换机
|
||||
* @param routingKey 路由键
|
||||
* @param cause 失败原因,可为null
|
||||
* @param messageContent 消息内容
|
||||
* @param status 状态:0-未处理,1-处理成功,2-处理失败
|
||||
*/
|
||||
void saveMessageRecord(Integer id, String exchange, String routingKey, String cause, String messageContent, int status);
|
||||
/**
|
||||
* 更新消息状态
|
||||
*
|
||||
* @param id 关联ID
|
||||
*/
|
||||
void updateMessageStatus(Integer id);
|
||||
|
||||
/**
|
||||
* 更新消息状态并设置失败原因
|
||||
|
||||
*/
|
||||
void updateMessageStatusWithCause(Integer id, String causes);
|
||||
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
package com.tashow.cloud.mq;
|
||||
@@ -1,28 +1,16 @@
|
||||
package com.tashow.cloud.mq.rabbitmq.config;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
||||
/**
|
||||
* RabbitMQ 消息队列配置类
|
||||
* RabbitMQ 消息队列自动配置类
|
||||
*
|
||||
* @author 芋道源码
|
||||
* @author tashow
|
||||
*/
|
||||
@AutoConfiguration
|
||||
@Slf4j
|
||||
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
|
||||
public class RabbitMQAutoConfiguration {
|
||||
|
||||
/**
|
||||
* Jackson2JsonMessageConverter Bean:使用 jackson 序列化消息
|
||||
*/
|
||||
@Bean
|
||||
public MessageConverter createMessageConverter() {
|
||||
return new Jackson2JsonMessageConverter();
|
||||
}
|
||||
|
||||
}
|
||||
public class RabbitMQAutoConfiguration extends RabbitMQConfiguration {
|
||||
private static final Logger log = LoggerFactory.getLogger(RabbitMQAutoConfiguration.class);
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.tashow.cloud.mq.rabbitmq.config;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* RabbitMQ 配置类
|
||||
*
|
||||
* @author tashow
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
|
||||
public class RabbitMQConfiguration {
|
||||
/**
|
||||
* 创建消息转换器
|
||||
*
|
||||
* @return MessageConverter
|
||||
*/
|
||||
@Bean
|
||||
public MessageConverter messageConverter() {
|
||||
return new Jackson2JsonMessageConverter();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
package com.tashow.cloud.mq.rabbitmq.consumer;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.tashow.cloud.mq.core.BaseMqMessage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
|
||||
/**
|
||||
* RabbitMQ消息消费者抽象类
|
||||
*
|
||||
* @param <T> 消息类型
|
||||
* @author tashow
|
||||
*/
|
||||
public abstract class AbstractRabbitMQConsumer<T extends BaseMqMessage> {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQConsumer.class);
|
||||
/**
|
||||
* 消息状态:成功
|
||||
*/
|
||||
public static final int STATUS_SUCCESS = 20;
|
||||
/**
|
||||
* 消息状态:消费异常
|
||||
*/
|
||||
public static final int STATUS_SEND_EXCEPTION = 30;
|
||||
|
||||
|
||||
/**
|
||||
* 埋点处理消息
|
||||
*
|
||||
* @param message 消息对象
|
||||
* @return 处理结果,true表示处理成功,false表示处理失败
|
||||
*/
|
||||
public abstract boolean processMessage(T message);
|
||||
|
||||
|
||||
/**
|
||||
* 消息处理入口
|
||||
*
|
||||
* @param message 消息对象
|
||||
* @param channel 通道
|
||||
* @param deliveryTag 投递标签
|
||||
*/
|
||||
public void onMessage(T message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
|
||||
message.setStatusCode(STATUS_SUCCESS);
|
||||
try {
|
||||
if(true){
|
||||
throw new RuntimeException("测试异常");
|
||||
}
|
||||
processMessage(message);
|
||||
safeChannelAck(channel, deliveryTag);
|
||||
} catch (Exception e) {
|
||||
message.setStatusCode(STATUS_SEND_EXCEPTION);
|
||||
processMessage( message);
|
||||
safeChannelAck(channel, deliveryTag);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 安全确认消息
|
||||
*
|
||||
* @param channel 通道
|
||||
* @param deliveryTag 投递标签
|
||||
*/
|
||||
protected void safeChannelAck(Channel channel, long deliveryTag) {
|
||||
try {
|
||||
channel.basicAck(deliveryTag, false);
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ消费者] 确认消息失败: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 安全拒绝消息
|
||||
*
|
||||
* @param channel 通道
|
||||
* @param deliveryTag 投递标签
|
||||
* @param multiple 是否批量
|
||||
* @param requeue 是否重新入队
|
||||
*/
|
||||
protected void safeChannelNack(Channel channel, long deliveryTag, boolean multiple, boolean requeue) {
|
||||
try {
|
||||
channel.basicNack(deliveryTag, multiple, requeue);
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ消费者] 拒绝消息失败: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,4 +0,0 @@
|
||||
/**
|
||||
* 占位符,无特殊逻辑
|
||||
*/
|
||||
package com.tashow.cloud.mq.rabbitmq.core;
|
||||
@@ -1,4 +0,0 @@
|
||||
/**
|
||||
* 消息队列,基于 RabbitMQ 提供
|
||||
*/
|
||||
package com.tashow.cloud.mq.rabbitmq;
|
||||
@@ -0,0 +1,97 @@
|
||||
package com.tashow.cloud.mq.rabbitmq.producer;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.tashow.cloud.common.util.json.JsonUtils;
|
||||
import com.tashow.cloud.mq.core.BaseMqMessage;
|
||||
import com.tashow.cloud.mq.handler.FailRecordHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* RabbitMQ消息生产者抽象类
|
||||
*
|
||||
* @param <T> 消息类型
|
||||
* @author tashow
|
||||
*/
|
||||
public abstract class AbstractRabbitMQProducer<T extends BaseMqMessage>
|
||||
implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQProducer.class);
|
||||
|
||||
@Autowired
|
||||
protected RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Autowired(required = false)
|
||||
protected FailRecordHandler failRecordHandler;
|
||||
|
||||
/**
|
||||
* 初始化RabbitTemplate
|
||||
*/
|
||||
@PostConstruct
|
||||
public void initRabbitTemplate() {
|
||||
rabbitTemplate.setMandatory(true);
|
||||
rabbitTemplate.setReturnsCallback(this);
|
||||
rabbitTemplate.setConfirmCallback(this);
|
||||
if (rabbitTemplate.isConfirmListener()) {
|
||||
log.info("[MQ生产者] 确认回调已正确配置");
|
||||
} else {
|
||||
log.error("[MQ生产者] 确认回调配置失败");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 异步发送消息,使用指定的correlationId
|
||||
*
|
||||
* @param message 消息对象
|
||||
*/
|
||||
public void asyncSendMessage(T message) {
|
||||
try {
|
||||
String messageJson = JsonUtils.toJsonString(message);
|
||||
CorrelationData correlationData = new CorrelationData(messageJson);
|
||||
failRecordHandler.saveMessageRecord(
|
||||
message.getId(),
|
||||
getExchange(),
|
||||
getRoutingKey(),
|
||||
null,
|
||||
messageJson,
|
||||
0
|
||||
);
|
||||
rabbitTemplate.convertAndSend(getExchange(), getRoutingKey(), message, correlationData);
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 获取交换机名称
|
||||
*
|
||||
* @return 交换机名称
|
||||
*/
|
||||
public abstract String getExchange();
|
||||
/**
|
||||
* 获取路由键
|
||||
*
|
||||
* @return 路由键
|
||||
*/
|
||||
public abstract String getRoutingKey();
|
||||
|
||||
@Override
|
||||
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
|
||||
JSONObject jsonObject = JSON.parseObject(correlationData.getId());
|
||||
Integer id = jsonObject.getInteger("id");
|
||||
if (ack) {
|
||||
failRecordHandler.updateMessageStatus(id);
|
||||
} else {
|
||||
failRecordHandler.updateMessageStatusWithCause(id,cause);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package com.tashow.cloud.mq.retry;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 消息重试任务抽象实现
|
||||
*
|
||||
* @param <T> 失败记录类型
|
||||
* @author tashow
|
||||
*/
|
||||
public abstract class AbstractMessageRetryTask<T> {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AbstractMessageRetryTask.class);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 获取消息重试服务
|
||||
*
|
||||
* @return 消息重试服务
|
||||
*/
|
||||
protected abstract MessageRetryService<T> getMessageRetryService();
|
||||
|
||||
/**
|
||||
* 获取记录ID
|
||||
*
|
||||
* @param record 记录对象
|
||||
* @return 记录ID
|
||||
*/
|
||||
protected abstract Integer getRecordId(T record);
|
||||
|
||||
/**
|
||||
* 执行重试
|
||||
*/
|
||||
public void retryFailedMessages() {
|
||||
try {
|
||||
List<T> unprocessedRecords = getMessageRetryService().getUnprocessedRecords();
|
||||
for (T record : unprocessedRecords) {
|
||||
Integer recordId = getRecordId(record);
|
||||
getMessageRetryService().retryFailedMessage(recordId);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ重试] 执行消息重试任务异常", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.tashow.cloud.mq.retry;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 消息重试服务接口
|
||||
*
|
||||
* @param <T> 失败记录类型
|
||||
* @author tashow
|
||||
*/
|
||||
public interface MessageRetryService<T> {
|
||||
|
||||
/**
|
||||
* 获取未处理的失败记录
|
||||
*
|
||||
* @return 失败记录列表
|
||||
*/
|
||||
List<T> getUnprocessedRecords();
|
||||
|
||||
/**
|
||||
* 重试失败消息
|
||||
*
|
||||
* @param recordId 记录ID
|
||||
* @return 重试结果
|
||||
*/
|
||||
void retryFailedMessage(Integer recordId);
|
||||
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/RocketMQ/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/Kafka/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/RabbitMQ/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/RocketMQ/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/Feign/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Cloud/Feign/?yudao>
|
||||
@@ -149,10 +149,6 @@ public class WebSecurityConfigurerAdapter {
|
||||
return httpSecurity.build();
|
||||
}
|
||||
|
||||
private String buildAppApi(String url) {
|
||||
return webProperties.getAppApi().getPrefix() + url;
|
||||
}
|
||||
|
||||
private Multimap<HttpMethod, String> getPermitAllUrlsFromAnnotations() {
|
||||
Multimap<HttpMethod, String> result = HashMultimap.create();
|
||||
// 获得接口对应的 HandlerMethod 集合
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
* 芋道 Spring Security 入门:<http://www.iocoder.cn/Spring-Boot/Spring-Security/?yudao>
|
||||
* Spring Security 基本概念:<http://www.iocoder.cn/Fight/Spring-Security-4-1-0-Basic-concept-description/?yudao>
|
||||
@@ -63,7 +63,7 @@ public class ApiAccessLogInterceptor implements HandlerInterceptor {
|
||||
|
||||
@Override
|
||||
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
|
||||
// 打印 response 日志
|
||||
// 打印 response 日志ss
|
||||
if (!SpringUtils.isProd()) {
|
||||
StopWatch stopWatch = (StopWatch) request.getAttribute(ATTRIBUTE_STOP_WATCH);
|
||||
stopWatch.stop();
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/Swagger/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/SpringMVC/?yudao>
|
||||
@@ -1 +0,0 @@
|
||||
<http://www.iocoder.cn/Spring-Boot/WebSocket/?yudao>
|
||||
@@ -57,6 +57,8 @@
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
package com.tashow.cloud.gateway;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
|
||||
@@ -7,10 +7,10 @@ spring:
|
||||
username: nacos # Nacos 账号
|
||||
password: nacos # Nacos 密码
|
||||
discovery: # 【配置中心】配置项
|
||||
namespace: liwq # 命名空间。这里使用 dev 开发环境
|
||||
namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # 命名空间。这里使用 dev 开发环境
|
||||
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
|
||||
config: # 【注册中心】配置项
|
||||
namespace: liwq # 命名空间。这里使用 dev 开发环境
|
||||
namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # 命名空间。这里使用 dev 开发环境
|
||||
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
|
||||
|
||||
# 日志文件配置
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
<module>tashow-module-system</module>
|
||||
<module>tashow-module-infra</module>
|
||||
<module>tashow-module-app</module>
|
||||
<module>tashow-module-product</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
||||
|
||||
@@ -28,7 +28,10 @@
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-framework-rpc</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-data-mybatis</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-framework-web</artifactId>
|
||||
@@ -42,7 +45,6 @@
|
||||
<artifactId>tashow-infra-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-framework-websocket</artifactId>
|
||||
@@ -55,7 +57,23 @@
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-framework-security</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-feishu-sdk</artifactId>
|
||||
<version>1.0.0</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.amqp</groupId>
|
||||
<artifactId>spring-rabbit</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-data-redis</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
||||
@@ -2,12 +2,15 @@ package com.tashow.cloud.app;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
/**
|
||||
* Hello world!
|
||||
*
|
||||
* 应用服务启动类
|
||||
*/
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
@ComponentScan(basePackages = {"com.tashow.cloud.app", "com.tashow.cloud.sdk.feishu"})
|
||||
public class AppServerApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.tashow.cloud.app.config;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* 应用配置类
|
||||
*/
|
||||
@Configuration
|
||||
public class AppConfig {
|
||||
|
||||
/**
|
||||
* 提供ObjectMapper bean用于JSON处理
|
||||
*/
|
||||
@Bean
|
||||
public ObjectMapper objectMapper() {
|
||||
return new ObjectMapper();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
package com.tashow.cloud.app.controller;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import com.lark.oapi.core.utils.Decryptor;
|
||||
import com.tashow.cloud.app.service.feishu.FeiShuCardDataService;
|
||||
import com.tashow.cloud.sdk.feishu.client.FeiShuAlertClient;
|
||||
import com.tashow.cloud.sdk.feishu.config.LarkConfig;
|
||||
import jakarta.annotation.security.PermitAll;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Map;
|
||||
|
||||
@RestController
|
||||
public class FeishuController {
|
||||
private static final Logger log = LoggerFactory.getLogger(FeishuController.class);
|
||||
private static final String ACTION_COMPLETE_ALARM = "complete_alarm";
|
||||
private final FeiShuAlertClient feiShuAlertClient;
|
||||
private final FeiShuCardDataService feiShuCardDataService;
|
||||
private final LarkConfig larkConfig;
|
||||
|
||||
@Autowired
|
||||
public FeishuController(FeiShuAlertClient feiShuAlertClient, FeiShuCardDataService feiShuCardDataService, LarkConfig larkConfig) {
|
||||
this.feiShuAlertClient = feiShuAlertClient;
|
||||
this.feiShuCardDataService = feiShuCardDataService;
|
||||
this.larkConfig = larkConfig;
|
||||
}
|
||||
|
||||
@RequestMapping("/card")
|
||||
@PermitAll
|
||||
public String card(@RequestBody JSONObject data) {
|
||||
try {
|
||||
if (data.containsKey("app_id") && data.containsKey("action")) {
|
||||
JSONObject action = data.getJSONObject("action");
|
||||
JSONObject value = action.getJSONObject("value");
|
||||
if (value != null && ACTION_COMPLETE_ALARM.equals(value.getStr("action"))) {
|
||||
String messageId = data.getStr("open_message_id");
|
||||
Map<String, Object> templateData = feiShuCardDataService.getCardData(messageId);
|
||||
templateData.put("open_id", data.getStr("open_id"));
|
||||
templateData.put("complete_time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
|
||||
JSONObject fromValue = action.getJSONObject("form_value");
|
||||
templateData.put("notes", fromValue.getStr("notes_input"));
|
||||
return feiShuAlertClient.buildCardWithData(larkConfig.getSuccessCards(), templateData);
|
||||
}
|
||||
}
|
||||
if (data.containsKey("encrypt")) {
|
||||
Decryptor decryptor = new Decryptor(larkConfig.getEncryptKey());
|
||||
return decryptor.decrypt(data.getStr("encrypt"));
|
||||
}
|
||||
return "{}";
|
||||
} catch (Exception e) {
|
||||
log.error("卡片处理异常", e);
|
||||
return "{\"code\":1,\"msg\":\"处理异常: " + e.getMessage() + "\"}";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.tashow.cloud.app.controller;
|
||||
import com.tashow.cloud.app.mapper.BuriedPointMapper;
|
||||
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
|
||||
import jakarta.annotation.security.PermitAll;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 测试控制器
|
||||
*/
|
||||
@RestController
|
||||
@RequiredArgsConstructor
|
||||
public class TestController {
|
||||
/**
|
||||
* 基础测试接口
|
||||
*/
|
||||
@GetMapping("/test")
|
||||
@PermitAll
|
||||
public String test() {
|
||||
return "test";
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.tashow.cloud.app.interceptor;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
|
||||
import com.tashow.cloud.common.util.json.JsonUtils;
|
||||
import com.tashow.cloud.common.util.servlet.ServletUtils;
|
||||
import com.tashow.cloud.common.util.spring.SpringUtils;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.StopWatch;
|
||||
import org.springframework.web.method.HandlerMethod;
|
||||
import org.springframework.web.servlet.HandlerInterceptor;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
/**
|
||||
* 后端静默埋点拦截器
|
||||
* 用于收集API请求信息并异步发送到消息队列
|
||||
*/
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class BuriedPointInterceptor implements HandlerInterceptor {
|
||||
|
||||
private final BuriedPointProducer buriedPointProducer;
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
|
||||
if (!(handler instanceof HandlerMethod handlerMethod)) {
|
||||
return true;
|
||||
}
|
||||
BuriedMessages message = new BuriedMessages(
|
||||
request,
|
||||
handlerMethod
|
||||
);
|
||||
buriedPointProducer.asyncSendMessage(message);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.tashow.cloud.app.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.tashow.cloud.app.model.MqMessageRecord;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
/**
|
||||
* 埋点消息发送记录Mapper接口
|
||||
*/
|
||||
@Mapper
|
||||
public interface BuriedPointFailRecordMapper extends BaseMapper<MqMessageRecord> {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package com.tashow.cloud.app.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.tashow.cloud.app.model.BuriedPoint;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.apache.ibatis.annotations.Select;
|
||||
|
||||
@Mapper
|
||||
public interface BuriedPointMapper extends BaseMapper<BuriedPoint> {
|
||||
|
||||
@Select("SELECT * FROM app_burying WHERE event_id = #{eventId} LIMIT 1")
|
||||
BuriedPoint selectByEventId(@Param("eventId") Integer eventId);
|
||||
}
|
||||
@@ -0,0 +1,128 @@
|
||||
package com.tashow.cloud.app.model;
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.Accessors;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 埋点数据实体类
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@Accessors(chain = true)
|
||||
@TableName(value = "app_burying")
|
||||
public class BuriedPoint {
|
||||
/**
|
||||
* 主键ID
|
||||
*/
|
||||
@TableId(value = "id", type = IdType.AUTO)
|
||||
private Integer id;
|
||||
/**
|
||||
* 事件唯一ID
|
||||
*/
|
||||
@TableField(value = "event_id")
|
||||
private Integer eventId;
|
||||
|
||||
/**
|
||||
* 事件时间戳
|
||||
*/
|
||||
@TableField(value = "event_time")
|
||||
private Long eventTime;
|
||||
|
||||
/**
|
||||
* 服务名称
|
||||
*/
|
||||
@TableField(value = "service")
|
||||
private String service;
|
||||
|
||||
/**
|
||||
* 方法/接口
|
||||
*/
|
||||
@TableField(value = "method")
|
||||
private String method;
|
||||
|
||||
/**
|
||||
* 用户标识
|
||||
*/
|
||||
@TableField(value = "user_id")
|
||||
private String userId;
|
||||
|
||||
/**
|
||||
* 会话标识
|
||||
*/
|
||||
@TableField(value = "session_id")
|
||||
private String sessionId;
|
||||
|
||||
/**
|
||||
* 客户端IP
|
||||
*/
|
||||
@TableField(value = "client_ip")
|
||||
private String clientIp;
|
||||
|
||||
/**
|
||||
* 服务器IP
|
||||
*/
|
||||
@TableField(value = "server_ip")
|
||||
private String serverIp;
|
||||
|
||||
/**
|
||||
* 事件类型
|
||||
*/
|
||||
@TableField(value = "event_type")
|
||||
private String eventType;
|
||||
|
||||
/**
|
||||
* 页面路径/功能模块
|
||||
*/
|
||||
@TableField(value = "page_path")
|
||||
private String pagePath;
|
||||
|
||||
/**
|
||||
* 元素标识
|
||||
*/
|
||||
@TableField(value = "element_id")
|
||||
private String elementId;
|
||||
|
||||
/**
|
||||
* 操作时长(毫秒)
|
||||
*/
|
||||
@TableField(value = "duration")
|
||||
private Long duration;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
@TableField(value = "create_time")
|
||||
private Date createTime;
|
||||
|
||||
|
||||
@TableField(value = "update_time")
|
||||
private Date updateTime;
|
||||
|
||||
@TableField(value = "status")
|
||||
private Integer status;
|
||||
|
||||
|
||||
public BuriedPoint(BuriedMessages message) {
|
||||
this.eventId = message.getId();
|
||||
this.eventTime = System.currentTimeMillis();
|
||||
this.userId = message.getUserId();
|
||||
this.eventType = message.getEventType();
|
||||
this.service = message.getService();
|
||||
this.method = message.getMethod();
|
||||
this.sessionId = message.getSessionId();
|
||||
this.clientIp = message.getClientIp();
|
||||
this.serverIp = message.getServerIp();
|
||||
this.status = message.getStatusCode();
|
||||
this.pagePath = message.getPagePath();
|
||||
this.elementId = message.getElementId();
|
||||
this.createTime = new Date();
|
||||
this.updateTime = new Date();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
package com.tashow.cloud.app.model;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 埋点消息发送失败记录实体类
|
||||
*/
|
||||
@Data
|
||||
@TableName("mq_message_record")
|
||||
public class MqMessageRecord {
|
||||
/**
|
||||
* 状态常量定义
|
||||
*/
|
||||
public static final int STATUS_UNPROCESSED = 10; // 未处理
|
||||
public static final int STATUS_SUCCESS = 20; // 处理成功
|
||||
public static final int STATUS_FAILED = 30; // 发送失败
|
||||
@TableId
|
||||
private Integer id;
|
||||
|
||||
|
||||
/**
|
||||
* 交换机名称
|
||||
*/
|
||||
private String exchange;
|
||||
|
||||
/**
|
||||
* 路由键
|
||||
*/
|
||||
private String routingKey;
|
||||
|
||||
/**
|
||||
* 失败原因
|
||||
*/
|
||||
private String cause;
|
||||
|
||||
/**
|
||||
* 消息内容
|
||||
*/
|
||||
private String messageContent;
|
||||
|
||||
/**
|
||||
* 重试次数
|
||||
*/
|
||||
private Integer retryCount;
|
||||
|
||||
/**
|
||||
* 状态:0-未处理,1-处理中,2-处理成功,3-处理失败
|
||||
*/
|
||||
private Integer status;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
private Date createTime;
|
||||
|
||||
/**
|
||||
* 更新时间
|
||||
*/
|
||||
private Date updateTime;
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
package com.tashow.cloud.app.mq.config;
|
||||
|
||||
import com.tashow.cloud.app.interceptor.BuriedPointInterceptor;
|
||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
|
||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
|
||||
|
||||
/**
|
||||
* 埋点功能配置类
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@RequiredArgsConstructor
|
||||
public class BuriedPointConfiguration implements WebMvcConfigurer {
|
||||
|
||||
private final BuriedPointProducer buriedPointProducer;
|
||||
|
||||
/**
|
||||
* 创建埋点队列
|
||||
*/
|
||||
@Bean
|
||||
public Queue buriedPointQueue() {
|
||||
return new Queue(BuriedMessages.QUEUE, true, false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建埋点交换机
|
||||
*/
|
||||
@Bean
|
||||
public DirectExchange buriedPointExchange() {
|
||||
return new DirectExchange(BuriedMessages.EXCHANGE, true, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建埋点绑定关系
|
||||
*/
|
||||
@Bean
|
||||
public Binding buriedPointBinding() {
|
||||
return BindingBuilder.bind(buriedPointQueue())
|
||||
.to(buriedPointExchange())
|
||||
.with(BuriedMessages.ROUTING_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建埋点拦截器
|
||||
*/
|
||||
@Bean
|
||||
public BuriedPointInterceptor buriedPointInterceptor() {
|
||||
return new BuriedPointInterceptor(buriedPointProducer);
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册埋点拦截器
|
||||
*/
|
||||
@Override
|
||||
public void addInterceptors(InterceptorRegistry registry) {
|
||||
// 注册拦截器,拦截所有请求
|
||||
registry.addInterceptor(buriedPointInterceptor())
|
||||
// 可以根据需要添加或排除特定路径
|
||||
.addPathPatterns("/**")
|
||||
// 排除静态资源、Swagger等路径
|
||||
.excludePathPatterns(
|
||||
"/swagger-ui/**",
|
||||
"/swagger-resources/**",
|
||||
"/v3/api-docs/**",
|
||||
"/webjars/**",
|
||||
"/static/**",
|
||||
"/card",
|
||||
"/error"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,69 @@
|
||||
package com.tashow.cloud.app.mq.consumer.buriedPoint;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.tashow.cloud.app.mapper.BuriedPointMapper;
|
||||
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
|
||||
import com.tashow.cloud.app.model.BuriedPoint;
|
||||
import com.tashow.cloud.app.model.MqMessageRecord;
|
||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||
import com.tashow.cloud.app.service.feishu.BuriedPointMonitorService;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.tashow.cloud.mq.rabbitmq.consumer.AbstractRabbitMQConsumer;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.stereotype.Component;
|
||||
import java.util.Date;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
/**
|
||||
* 埋点消息消费者
|
||||
*/
|
||||
@Component
|
||||
@RabbitListener(queues = BuriedMessages.QUEUE)
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class BuriedPointConsumer extends AbstractRabbitMQConsumer<BuriedMessages> {
|
||||
private final BuriedPointMapper buriedPointMapper;
|
||||
private final BuriedPointMonitorService buriedPointMonitorService;
|
||||
|
||||
@Value("${spring.application.name:tashow-app}")
|
||||
private String applicationName;
|
||||
|
||||
@RabbitHandler
|
||||
public void handleMessage(BuriedMessages message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
|
||||
onMessage(message, channel, deliveryTag);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理埋点消息
|
||||
* @param message 消息对象
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public boolean processMessage(BuriedMessages message) {
|
||||
try {
|
||||
BuriedPoint existingPoint = buriedPointMapper.selectByEventId(message.getId());
|
||||
if (existingPoint != null) {
|
||||
existingPoint.setStatus(message.getStatusCode());
|
||||
existingPoint.setUpdateTime(new Date());
|
||||
return buriedPointMapper.updateById(existingPoint) > 0;
|
||||
}
|
||||
BuriedPoint buriedPoint = new BuriedPoint(message);
|
||||
buriedPoint.setService(applicationName);
|
||||
buriedPointMapper.insert(buriedPoint);
|
||||
|
||||
if(buriedPoint.getStatus() == BuriedMessages.STATUS_ERROR){
|
||||
buriedPointMonitorService.checkFailRecordsAndAlert("埋点数据处理异常");
|
||||
}
|
||||
return true;
|
||||
} catch (DuplicateKeyException e) {
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("[埋点消费者] 保存数据失败", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,100 @@
|
||||
package com.tashow.cloud.app.mq.handler;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
|
||||
import com.tashow.cloud.app.model.MqMessageRecord;
|
||||
import com.tashow.cloud.app.service.feishu.BuriedPointMonitorService;
|
||||
import com.tashow.cloud.mq.handler.FailRecordHandler;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import java.util.Date;
|
||||
/**
|
||||
* MQ消息记录处理器
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class BuriedPointFailRecordHandler implements FailRecordHandler {
|
||||
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
|
||||
private final BuriedPointMonitorService buriedPointMonitorService;
|
||||
/**
|
||||
* 保存消息记录=
|
||||
*/
|
||||
@Override
|
||||
public void saveMessageRecord(Integer id, String exchange, String routingKey, String cause, String messageContent, int status) {
|
||||
try {
|
||||
MqMessageRecord existingRecord = findExistingRecord(id);
|
||||
if (existingRecord != null) {
|
||||
existingRecord.setRetryCount(existingRecord.getRetryCount() + 1);
|
||||
existingRecord.setMessageContent(messageContent);
|
||||
existingRecord.setStatus(status);
|
||||
existingRecord.setCause(cause);
|
||||
existingRecord.setUpdateTime(new Date());
|
||||
buriedPointFailRecordMapper.updateById(existingRecord);
|
||||
} else {
|
||||
MqMessageRecord record = new MqMessageRecord();
|
||||
record.setId(id);
|
||||
record.setExchange(exchange);
|
||||
record.setRoutingKey(routingKey);
|
||||
record.setCause(cause);
|
||||
record.setMessageContent(messageContent);
|
||||
record.setRetryCount(0);
|
||||
record.setStatus(status);
|
||||
record.setCreateTime(new Date());
|
||||
record.setUpdateTime(new Date());
|
||||
buriedPointFailRecordMapper.insert(record);
|
||||
if (status == MqMessageRecord.STATUS_FAILED) {
|
||||
buriedPointMonitorService.checkFailRecordsAndAlert(cause);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ消息处理器] 保存消息记录异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新消息状态
|
||||
*/
|
||||
@Override
|
||||
public void updateMessageStatus(Integer id) {
|
||||
try {
|
||||
MqMessageRecord record = findExistingRecord(id);
|
||||
if (record != null) {
|
||||
record.setStatus(MqMessageRecord.STATUS_SUCCESS);
|
||||
record.setUpdateTime(new Date());
|
||||
buriedPointFailRecordMapper.updateById(record);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ消息处理器] 更新消息状态异常: {}", id, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新消息状态并设置失败原因
|
||||
*/
|
||||
@Override
|
||||
public void updateMessageStatusWithCause(Integer id, String cause) {
|
||||
try {
|
||||
MqMessageRecord record = findExistingRecord(id);
|
||||
if (record != null) {
|
||||
record.setStatus(MqMessageRecord.STATUS_FAILED);
|
||||
record.setCause(cause);
|
||||
record.setUpdateTime(new Date());
|
||||
buriedPointFailRecordMapper.updateById(record);
|
||||
buriedPointMonitorService.checkFailRecordsAndAlert(cause);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ消息处理器] 更新消息状态和原因异常: {}", id, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查找已存在的失败记录
|
||||
*/
|
||||
private MqMessageRecord findExistingRecord(Integer id) {
|
||||
LambdaQueryWrapper<MqMessageRecord> queryWrapper = new LambdaQueryWrapper<>();
|
||||
queryWrapper.eq(MqMessageRecord::getId, id);
|
||||
return buriedPointFailRecordMapper.selectOne(queryWrapper);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
package com.tashow.cloud.app.mq.message;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.tashow.cloud.common.util.json.JsonUtils;
|
||||
import com.tashow.cloud.common.util.servlet.ServletUtils;
|
||||
import com.tashow.cloud.common.util.spring.SpringUtils;
|
||||
import com.tashow.cloud.mq.core.BaseMqMessage;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import lombok.Data;
|
||||
import org.springframework.web.method.HandlerMethod;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Date;
|
||||
|
||||
import static com.tashow.cloud.web.apilog.core.interceptor.ApiAccessLogInterceptor.ATTRIBUTE_HANDLER_METHOD;
|
||||
|
||||
/**
|
||||
* 埋点消息
|
||||
*/
|
||||
@Data
|
||||
public class BuriedMessages extends BaseMqMessage {
|
||||
private static final String ATTRIBUTE_REQUEST_ID = "BuriedPoint.RequestId";
|
||||
|
||||
|
||||
/**
|
||||
* 交换机名称
|
||||
*/
|
||||
public static final String EXCHANGE = "tashow.buried.point.exchange";
|
||||
|
||||
/**
|
||||
* 队列名称
|
||||
*/
|
||||
public static final String QUEUE = "tashow.buried.point.queue";
|
||||
|
||||
/**
|
||||
* 路由键
|
||||
*/
|
||||
public static final String ROUTING_KEY = "tashow.buried.point.routing.key";
|
||||
|
||||
/**
|
||||
* 消息状态:处理中
|
||||
*/
|
||||
public static final int STATUS_PROCESSING = 10;
|
||||
|
||||
/**
|
||||
* 消息状态:成功
|
||||
*/
|
||||
public static final int STATUS_SUCCESS = 20;
|
||||
|
||||
/**
|
||||
* 消息状态:失败
|
||||
*/
|
||||
public static final int STATUS_ERROR = 30;
|
||||
|
||||
/**
|
||||
* 事件时间
|
||||
*/
|
||||
private Date eventTime;
|
||||
|
||||
/**
|
||||
* 用户ID
|
||||
*/
|
||||
private String userId;
|
||||
|
||||
/**
|
||||
* 事件类型
|
||||
*/
|
||||
private String eventType;
|
||||
|
||||
/**
|
||||
* 方法名称
|
||||
*/
|
||||
private String method;
|
||||
|
||||
/**
|
||||
* 会话ID
|
||||
*/
|
||||
private String sessionId;
|
||||
|
||||
/**
|
||||
* 客户端IP
|
||||
*/
|
||||
private String clientIp;
|
||||
|
||||
/**
|
||||
* 服务端IP
|
||||
*/
|
||||
private String serverIp;
|
||||
|
||||
/**
|
||||
* 页面路径
|
||||
*/
|
||||
private String pagePath;
|
||||
|
||||
/**
|
||||
* 元素ID
|
||||
*/
|
||||
private String elementId;
|
||||
|
||||
/**
|
||||
* 服务名称
|
||||
*/
|
||||
private String service;
|
||||
|
||||
public BuriedMessages() {
|
||||
}
|
||||
|
||||
/**
|
||||
* 从请求创建埋点消息
|
||||
*
|
||||
* @param request HTTP请求
|
||||
* @param handlerMethod 处理方法
|
||||
*/
|
||||
public BuriedMessages(HttpServletRequest request, HandlerMethod handlerMethod) {
|
||||
try {
|
||||
int requestId = (int)(Math.abs(IdUtil.getSnowflakeNextId()) % Integer.MAX_VALUE);
|
||||
this.setId(requestId);
|
||||
this.eventTime = new Date();
|
||||
this.service = SpringUtils.getApplicationName();
|
||||
this.method = request.getMethod() + " " + request.getRequestURI() +
|
||||
JsonUtils.toJsonString(request.getParameterMap());
|
||||
Object userId = request.getSession().getAttribute("USER_ID");
|
||||
this.userId = userId != null ? userId.toString() : "anonymous";
|
||||
this.sessionId = request.getSession().getId();
|
||||
this.clientIp = ServletUtils.getClientIP(request);
|
||||
try {
|
||||
this.serverIp = InetAddress.getLocalHost().getHostAddress();
|
||||
} catch (UnknownHostException e) {
|
||||
this.serverIp = "unknown";
|
||||
}
|
||||
String controllerName = handlerMethod.getBeanType().getSimpleName();
|
||||
String actionName = handlerMethod.getMethod().getName();
|
||||
this.pagePath = controllerName + "#" + actionName;
|
||||
this.eventType = "API_REQUEST_START";
|
||||
this.setStatusCode(STATUS_PROCESSING);
|
||||
request.setAttribute(ATTRIBUTE_REQUEST_ID, this.getId());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("创建埋点消息失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.tashow.cloud.app.mq.producer.buriedPoint;
|
||||
|
||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||
import com.tashow.cloud.common.util.json.JsonUtils;
|
||||
import com.tashow.cloud.mq.rabbitmq.producer.AbstractRabbitMQProducer;
|
||||
import org.springframework.amqp.core.ReturnedMessage;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 埋点消息生产者
|
||||
*/
|
||||
@Component
|
||||
public class BuriedPointProducer extends AbstractRabbitMQProducer<BuriedMessages> {
|
||||
|
||||
@Override
|
||||
public void returnedMessage(ReturnedMessage returned) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExchange() {
|
||||
return BuriedMessages.EXCHANGE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRoutingKey() {
|
||||
return BuriedMessages.ROUTING_KEY;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -20,7 +20,6 @@ public class SecurityConfiguration {
|
||||
@Bean("infraAuthorizeRequestsCustomizer")
|
||||
public AuthorizeRequestsCustomizer authorizeRequestsCustomizer() {
|
||||
return new AuthorizeRequestsCustomizer() {
|
||||
|
||||
@Override
|
||||
public void customize(AuthorizeHttpRequestsConfigurer<HttpSecurity>.AuthorizationManagerRequestMatcherRegistry registry) {
|
||||
// Spring Boot Actuator 的安全配置
|
||||
|
||||
@@ -0,0 +1,241 @@
|
||||
package com.tashow.cloud.app.service.feishu;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
|
||||
import com.tashow.cloud.app.mapper.BuriedPointMapper;
|
||||
import com.tashow.cloud.app.model.BuriedPoint;
|
||||
import com.tashow.cloud.app.model.MqMessageRecord;
|
||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||
import com.tashow.cloud.sdk.feishu.client.FeiShuAlertClient;
|
||||
import com.tashow.cloud.sdk.feishu.config.LarkConfig;
|
||||
import com.tashow.cloud.sdk.feishu.util.ChartImageGenerator;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 埋点监控服务
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class BuriedPointMonitorService {
|
||||
private static final int ALERT_THRESHOLD = 3;
|
||||
private static final int MONITORING_HOURS = 12;
|
||||
private final Map<String, Long> alertCache = new ConcurrentHashMap<>();
|
||||
|
||||
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
|
||||
private final BuriedPointMapper buriedPointMapper;
|
||||
private final FeiShuAlertClient feiShuAlertClient;
|
||||
private final FeiShuCardDataService feiShuCardDataService;
|
||||
private final LarkConfig larkConfig;
|
||||
|
||||
/**
|
||||
* 检查失败记录并发送告警
|
||||
*/
|
||||
public boolean checkFailRecordsAndAlert(String cause) {
|
||||
try {
|
||||
Date now = new Date();
|
||||
Date hoursAgo = getDateHoursAgo(now, MONITORING_HOURS);
|
||||
boolean sentAlert = false;
|
||||
List<Date[]> timeRanges = getHourRanges(hoursAgo, now);
|
||||
long mqFailCount = countFailures(buriedPointFailRecordMapper, MqMessageRecord.class, hoursAgo, now);
|
||||
long buriedFailCount = countFailures(buriedPointMapper, BuriedPoint.class, hoursAgo, now);
|
||||
if (mqFailCount > ALERT_THRESHOLD||buriedFailCount > ALERT_THRESHOLD) {
|
||||
if (!hasRecentlySentAlert(cause)) {
|
||||
sendAlert(mqFailCount, cause, getMqStats(timeRanges));
|
||||
alertCache.put(cause, System.currentTimeMillis());
|
||||
sentAlert = true;
|
||||
}
|
||||
}
|
||||
|
||||
return sentAlert;
|
||||
} catch (Exception e) {
|
||||
log.error("[埋点监控] 检查失败记录异常", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否最近已发送过相同类型的告警
|
||||
*/
|
||||
private boolean hasRecentlySentAlert(String alertType) {
|
||||
Long lastSentTime = alertCache.get(alertType);
|
||||
if (lastSentTime == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
long hourInMillis = MONITORING_HOURS * 60 * 60 * 1000L;
|
||||
return (System.currentTimeMillis() - lastSentTime) < hourInMillis;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取消息队列统计数据
|
||||
*/
|
||||
private List<ChartImageGenerator.MonitoringDataPoint> getMqStats(List<Date[]> timeRanges) {
|
||||
Map<Date, Integer> successData = batchQueryMqStatus(timeRanges, MqMessageRecord.STATUS_SUCCESS);
|
||||
Map<Date, Integer> failedData = batchQueryMqFailures(timeRanges);
|
||||
|
||||
SimpleDateFormat timeFormat = new SimpleDateFormat("HH:00");
|
||||
return timeRanges.stream()
|
||||
.map(range -> new ChartImageGenerator.MonitoringDataPoint(
|
||||
timeFormat.format(range[0]),
|
||||
successData.getOrDefault(range[0], 0),
|
||||
failedData.getOrDefault(range[0], 0)
|
||||
))
|
||||
.toList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取埋点表统计数据
|
||||
*/
|
||||
private List<ChartImageGenerator.MonitoringDataPoint> getBuriedStats(List<Date[]> timeRanges) {
|
||||
// 批量查询每个时间区间的数据
|
||||
Map<Date, Integer> successData = batchQueryBuriedStatus(timeRanges, BuriedMessages.STATUS_SUCCESS);
|
||||
Map<Date, Integer> failedData = batchQueryBuriedStatus(timeRanges, BuriedMessages.STATUS_ERROR);
|
||||
SimpleDateFormat timeFormat = new SimpleDateFormat("HH:00");
|
||||
return timeRanges.stream()
|
||||
.map(range -> new ChartImageGenerator.MonitoringDataPoint(
|
||||
timeFormat.format(range[0]),
|
||||
successData.getOrDefault(range[0], 0),
|
||||
failedData.getOrDefault(range[0], 0)
|
||||
))
|
||||
.toList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询MQ状态数据
|
||||
*/
|
||||
private Map<Date, Integer> batchQueryMqStatus(List<Date[]> timeRanges, int status) {
|
||||
Map<Date, Integer> result = new HashMap<>();
|
||||
for (Date[] range : timeRanges) {
|
||||
LambdaQueryWrapper<MqMessageRecord> query = new LambdaQueryWrapper<>();
|
||||
query.ge(MqMessageRecord::getCreateTime, range[0])
|
||||
.lt(MqMessageRecord::getCreateTime, range[1])
|
||||
.eq(MqMessageRecord::getStatus, status);
|
||||
result.put(range[0], buriedPointFailRecordMapper.selectCount(query).intValue());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量查询MQ失败数据
|
||||
*/
|
||||
private Map<Date, Integer> batchQueryMqFailures(List<Date[]> timeRanges) {
|
||||
Map<Date, Integer> result = new HashMap<>();
|
||||
for (Date[] range : timeRanges) {
|
||||
LambdaQueryWrapper<MqMessageRecord> query = new LambdaQueryWrapper<>();
|
||||
query.ge(MqMessageRecord::getCreateTime, range[0])
|
||||
.lt(MqMessageRecord::getCreateTime, range[1])
|
||||
.in(MqMessageRecord::getStatus, Arrays.asList(
|
||||
MqMessageRecord.STATUS_UNPROCESSED, MqMessageRecord.STATUS_FAILED));
|
||||
result.put(range[0], buriedPointFailRecordMapper.selectCount(query).intValue());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量查询埋点状态数据
|
||||
*/
|
||||
private Map<Date, Integer> batchQueryBuriedStatus(List<Date[]> timeRanges, int status) {
|
||||
Map<Date, Integer> result = new HashMap<>();
|
||||
for (Date[] range : timeRanges) {
|
||||
LambdaQueryWrapper<BuriedPoint> query = new LambdaQueryWrapper<>();
|
||||
query.ge(BuriedPoint::getCreateTime, range[0])
|
||||
.lt(BuriedPoint::getCreateTime, range[1])
|
||||
.eq(BuriedPoint::getStatus, status);
|
||||
result.put(range[0], buriedPointMapper.selectCount(query).intValue());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 计算失败数量
|
||||
*/
|
||||
private <T> long countFailures(Object mapper, Class<T> entityClass, Date startDate, Date endDate) {
|
||||
try {
|
||||
if (entityClass == BuriedPoint.class) {
|
||||
LambdaQueryWrapper<BuriedPoint> query = new LambdaQueryWrapper<>();
|
||||
query.ge(BuriedPoint::getCreateTime, startDate)
|
||||
.le(BuriedPoint::getCreateTime, endDate)
|
||||
.eq(BuriedPoint::getStatus, BuriedMessages.STATUS_ERROR);
|
||||
return ((BuriedPointMapper)mapper).selectCount(query);
|
||||
} else {
|
||||
LambdaQueryWrapper<MqMessageRecord> query = new LambdaQueryWrapper<>();
|
||||
query.ge(MqMessageRecord::getCreateTime, startDate)
|
||||
.le(MqMessageRecord::getCreateTime, endDate)
|
||||
.eq(MqMessageRecord::getStatus, MqMessageRecord.STATUS_FAILED);
|
||||
return ((BuriedPointFailRecordMapper)mapper).selectCount(query);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送告警
|
||||
*/
|
||||
private void sendAlert(long failCount, String alertMsg, List<ChartImageGenerator.MonitoringDataPoint> data) {
|
||||
try {
|
||||
String imageKey = feiShuAlertClient.uploadImage(data, alertMsg);
|
||||
String title = alertMsg.split(":")[0].trim();
|
||||
|
||||
Map<String, Object> templateData = Map.of(
|
||||
"alert_title", title,
|
||||
"image_key", Map.of("img_key", imageKey),
|
||||
"current_time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
|
||||
"fail_count", failCount
|
||||
);
|
||||
|
||||
String messageId = feiShuAlertClient.sendCardMessage(
|
||||
larkConfig.getChatId(),
|
||||
larkConfig.getExceptionCards(),
|
||||
new HashMap<>(templateData)
|
||||
);
|
||||
feiShuCardDataService.saveCardData(messageId, templateData);
|
||||
} catch (Exception e) {
|
||||
log.error("[埋点监控] 发送告警失败: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取小时范围列表
|
||||
*/
|
||||
private List<Date[]> getHourRanges(Date startDate, Date endDate) {
|
||||
List<Date[]> ranges = new ArrayList<>();
|
||||
Calendar cal = Calendar.getInstance();
|
||||
|
||||
cal.setTime(endDate);
|
||||
cal.set(Calendar.MINUTE, 0);
|
||||
cal.set(Calendar.SECOND, 0);
|
||||
cal.set(Calendar.MILLISECOND, 0);
|
||||
Date endHour = cal.getTime();
|
||||
|
||||
cal.add(Calendar.HOUR_OF_DAY, -(MONITORING_HOURS - 1));
|
||||
Date startHour = startDate.after(cal.getTime()) ? startDate : cal.getTime();
|
||||
|
||||
cal.setTime(startHour);
|
||||
while (!cal.getTime().after(endHour)) {
|
||||
Date hourStart = cal.getTime();
|
||||
cal.add(Calendar.HOUR_OF_DAY, 1);
|
||||
Date hourEnd = cal.getTime().after(endDate) ? endDate : cal.getTime();
|
||||
ranges.add(new Date[]{hourStart, hourEnd});
|
||||
|
||||
if (hourEnd.equals(endDate)) break;
|
||||
}
|
||||
return ranges;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定时间前N小时的时间
|
||||
*/
|
||||
private Date getDateHoursAgo(Date date, int hours) {
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
calendar.setTime(date);
|
||||
calendar.add(Calendar.HOUR_OF_DAY, -hours);
|
||||
return calendar.getTime();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package com.tashow.cloud.app.service.feishu;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 飞书卡片数据处理服务
|
||||
*/
|
||||
@Service
|
||||
public class FeiShuCardDataService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(FeiShuCardDataService.class);
|
||||
private static final String REDIS_KEY_PREFIX = "feishu:card:";
|
||||
private static final int CARD_EXPIRATION_DAYS = 30;
|
||||
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
@Autowired
|
||||
public FeiShuCardDataService(StringRedisTemplate stringRedisTemplate, ObjectMapper objectMapper) {
|
||||
this.stringRedisTemplate = stringRedisTemplate;
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存卡片数据到Redis
|
||||
*/
|
||||
public boolean saveCardData(String messageId, Map<String, Object> data) {
|
||||
if (messageId == null || data == null) return false;
|
||||
|
||||
try {
|
||||
String jsonData = objectMapper.writeValueAsString(data);
|
||||
stringRedisTemplate.opsForValue().set(
|
||||
REDIS_KEY_PREFIX + messageId,
|
||||
jsonData,
|
||||
CARD_EXPIRATION_DAYS,
|
||||
TimeUnit.DAYS
|
||||
);
|
||||
return true;
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("保存卡片数据失败: {}", e.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从Redis获取卡片数据
|
||||
*/
|
||||
public Map<String, Object> getCardData(String messageId) {
|
||||
try {
|
||||
String redisKey = REDIS_KEY_PREFIX + messageId;
|
||||
String jsonData = stringRedisTemplate.opsForValue().get(redisKey);
|
||||
|
||||
if (jsonData == null) return new HashMap<>();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> result = objectMapper.readValue(jsonData, Map.class);
|
||||
return result;
|
||||
} catch (Exception e) {
|
||||
log.error("获取卡片数据失败: {}", e.getMessage());
|
||||
return new HashMap<>();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.tashow.cloud.app.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.tashow.cloud.app.mapper.BuriedPointFailRecordMapper;
|
||||
import com.tashow.cloud.app.model.MqMessageRecord;
|
||||
import com.tashow.cloud.app.mq.message.BuriedMessages;
|
||||
import com.tashow.cloud.app.mq.producer.buriedPoint.BuriedPointProducer;
|
||||
import com.tashow.cloud.common.util.json.JsonUtils;
|
||||
import com.tashow.cloud.mq.retry.MessageRetryService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 埋点失败记录服务
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class BuriedPointFailRecordService implements MessageRetryService<MqMessageRecord> {
|
||||
|
||||
private final BuriedPointFailRecordMapper buriedPointFailRecordMapper;
|
||||
private final BuriedPointProducer buriedPointProducer;
|
||||
|
||||
/**
|
||||
* 获取未处理的失败记录
|
||||
*/
|
||||
@Override
|
||||
public List<MqMessageRecord> getUnprocessedRecords() {
|
||||
LambdaQueryWrapper<MqMessageRecord> queryWrapper = new LambdaQueryWrapper<>();
|
||||
queryWrapper.eq(MqMessageRecord::getStatus, MqMessageRecord.STATUS_FAILED)
|
||||
.orderByAsc(MqMessageRecord::getCreateTime);
|
||||
return buriedPointFailRecordMapper.selectList(queryWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 重试失败消息
|
||||
*/
|
||||
@Override
|
||||
public void retryFailedMessage(Integer recordId) {
|
||||
try {
|
||||
Long id = Long.valueOf(recordId);
|
||||
MqMessageRecord record = buriedPointFailRecordMapper.selectById(id);
|
||||
BuriedMessages message = JsonUtils.parseObject(record.getMessageContent(), BuriedMessages.class);
|
||||
buriedPointProducer.asyncSendMessage(message);
|
||||
} catch (Exception e) {
|
||||
log.error("[埋点重试] 重试失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.tashow.cloud.app.task;
|
||||
import com.tashow.cloud.app.model.MqMessageRecord;
|
||||
import com.tashow.cloud.app.service.impl.BuriedPointFailRecordService;
|
||||
import com.tashow.cloud.mq.retry.AbstractMessageRetryTask;
|
||||
import com.tashow.cloud.mq.retry.MessageRetryService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 埋点消息重试定时任务
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class BuriedPointRetryTask extends AbstractMessageRetryTask<MqMessageRecord> {
|
||||
|
||||
private final BuriedPointFailRecordService buriedPointFailRecordService;
|
||||
|
||||
/**
|
||||
* 定时重试失败消息
|
||||
* 每天凌晨执行一次
|
||||
*/
|
||||
@Scheduled(cron = "0 0 0 * * ?")
|
||||
// @Scheduled(cron = "0/10 * * * * ?")
|
||||
public void execute() {
|
||||
retryFailedMessages();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageRetryService<MqMessageRecord> getMessageRetryService() {
|
||||
return buriedPointFailRecordService;
|
||||
}
|
||||
@Override
|
||||
protected Integer getRecordId(MqMessageRecord record) {
|
||||
return record.getId();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -7,10 +7,10 @@ spring:
|
||||
username: nacos # Nacos 账号
|
||||
password: nacos # Nacos 密码
|
||||
discovery: # 【配置中心】配置项
|
||||
namespace: liwq # 命名空间。这里使用 dev 开发环境
|
||||
namespace: 5c8b8fe6-9a89-4ae3-975e-ef3bf560ff82 # 命名空间。这里使用 dev 开发环境
|
||||
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
|
||||
metadata:
|
||||
version: 1.0.0 # 服务实例的版本号,可用于灰度发布
|
||||
config: # 【注册中心】配置项
|
||||
namespace: liwq # 命名空间。这里使用 dev 开发环境
|
||||
namespace: 5c8b8fe6-9a89-4ae3-975e-ef3bf560ff82 # 命名空间。这里使用 dev 开发环境
|
||||
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
## AdoptOpenJDK 停止发布 OpenJDK 二进制,而 Eclipse Temurin 是它的延伸,提供更好的稳定性
|
||||
## 感谢复旦核博士的建议!灰子哥,牛皮!
|
||||
FROM eclipse-temurin:21-jre
|
||||
FROM eclipse-temurin:17-jre
|
||||
|
||||
## 创建目录,并使用它作为工作目录
|
||||
RUN mkdir -p /yudao-module-infra-biz
|
||||
WORKDIR /yudao-module-infra-biz
|
||||
RUN mkdir -p /home/java-work/system-infra
|
||||
WORKDIR /home/java-work/system-infra
|
||||
## 将后端项目的 Jar 文件,复制到镜像中
|
||||
COPY ./target/yudao-module-infra-biz.jar app.jar
|
||||
COPY ./target/tashow-module-infra.jar app.jar
|
||||
|
||||
## 设置 TZ 时区
|
||||
## 设置 JAVA_OPTS 环境变量,可通过 docker run -e "JAVA_OPTS=" 进行覆盖
|
||||
|
||||
@@ -37,6 +37,14 @@
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba.otter</groupId>
|
||||
<artifactId>canal.client</artifactId>
|
||||
<version>1.1.0</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- 业务组件 -->
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
@@ -69,6 +77,13 @@
|
||||
<artifactId>tashow-data-redis</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!--<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
<artifactId>tashow-data-canal</artifactId>
|
||||
<version>1.0.0</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>-->
|
||||
|
||||
<!-- RPC 远程调用相关 -->
|
||||
<dependency>
|
||||
<groupId>com.tashow.cloud</groupId>
|
||||
|
||||
@@ -0,0 +1,204 @@
|
||||
package com.tashow.cloud.infra.framework;
|
||||
|
||||
|
||||
import com.alibaba.otter.canal.client.CanalConnector;
|
||||
import com.alibaba.otter.canal.client.CanalConnectors;
|
||||
import com.alibaba.otter.canal.protocol.CanalEntry.*;
|
||||
import com.alibaba.otter.canal.protocol.Message;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
@Component
|
||||
public class CanalClient {
|
||||
|
||||
//sql队列
|
||||
private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
|
||||
|
||||
|
||||
/**
|
||||
* canal入库方法
|
||||
*/
|
||||
public void run() {
|
||||
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("43.139.42.137",
|
||||
11111), "example", "", "");
|
||||
int batchSize = 1000;
|
||||
try {
|
||||
connector.connect();
|
||||
// connector.subscribe(".*\\..*");
|
||||
connector.subscribe("tashow-platform");
|
||||
|
||||
connector.rollback();
|
||||
try {
|
||||
while (true) {
|
||||
//尝试从master那边拉去数据batchSize条记录,有多少取多少
|
||||
Message message = connector.getWithoutAck(batchSize);
|
||||
long batchId = message.getId();
|
||||
int size = message.getEntries().size();
|
||||
if (batchId == -1 || size == 0) {
|
||||
Thread.sleep(1000);
|
||||
} else {
|
||||
dataHandle(message.getEntries());
|
||||
}
|
||||
connector.ack(batchId);
|
||||
|
||||
//当队列里面堆积的sql大于一定数值的时候就模拟执行
|
||||
if (SQL_QUEUE.size() >= 1) {
|
||||
executeQueueSql();
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} finally {
|
||||
connector.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 模拟执行队列里面的sql语句
|
||||
*/
|
||||
public void executeQueueSql() {
|
||||
int size = SQL_QUEUE.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
String sql = SQL_QUEUE.poll();
|
||||
System.out.println("[sql]----> " + sql);
|
||||
|
||||
this.execute(sql);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据处理
|
||||
*
|
||||
* @param entrys
|
||||
*/
|
||||
private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
|
||||
for (Entry entry : entrys) {
|
||||
if(entry.getHeader().getSchemaName().equals("hc")){
|
||||
return;
|
||||
}
|
||||
if (EntryType.ROWDATA == entry.getEntryType()) {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
EventType eventType = rowChange.getEventType();
|
||||
if (eventType == EventType.DELETE) {
|
||||
saveDeleteSql(entry);
|
||||
} else if (eventType == EventType.UPDATE) {
|
||||
saveUpdateSql(entry);
|
||||
} else if (eventType == EventType.INSERT) {
|
||||
saveInsertSql(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存更新语句
|
||||
*
|
||||
* @param entry
|
||||
*/
|
||||
private void saveUpdateSql(Entry entry) {
|
||||
try {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
List<RowData> rowDatasList = rowChange.getRowDatasList();
|
||||
for (RowData rowData : rowDatasList) {
|
||||
List<Column> newColumnList = rowData.getAfterColumnsList();
|
||||
StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
|
||||
for (int i = 0; i < newColumnList.size(); i++) {
|
||||
sql.append(" " + newColumnList.get(i).getName()
|
||||
+ " = '" + newColumnList.get(i).getValue() + "'");
|
||||
if (i != newColumnList.size() - 1) {
|
||||
sql.append(",");
|
||||
}
|
||||
}
|
||||
sql.append(" where ");
|
||||
List<Column> oldColumnList = rowData.getBeforeColumnsList();
|
||||
for (Column column : oldColumnList) {
|
||||
if (column.getIsKey()) {
|
||||
//暂时只支持单一主键
|
||||
sql.append(column.getName() + "=" + column.getValue());
|
||||
break;
|
||||
}
|
||||
}
|
||||
SQL_QUEUE.add(sql.toString());
|
||||
}
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存删除语句
|
||||
*
|
||||
* @param entry
|
||||
*/
|
||||
private void saveDeleteSql(Entry entry) {
|
||||
try {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
List<RowData> rowDatasList = rowChange.getRowDatasList();
|
||||
for (RowData rowData : rowDatasList) {
|
||||
List<Column> columnList = rowData.getBeforeColumnsList();
|
||||
StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
|
||||
for (Column column : columnList) {
|
||||
if (column.getIsKey()) {
|
||||
//暂时只支持单一主键
|
||||
sql.append(column.getName() + "=" + column.getValue());
|
||||
break;
|
||||
}
|
||||
}
|
||||
SQL_QUEUE.add(sql.toString());
|
||||
}
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存插入语句
|
||||
*
|
||||
* @param entry
|
||||
*/
|
||||
private void saveInsertSql(Entry entry) {
|
||||
try {
|
||||
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
|
||||
List<RowData> rowDatasList = rowChange.getRowDatasList();
|
||||
for (RowData rowData : rowDatasList) {
|
||||
List<Column> columnList = rowData.getAfterColumnsList();
|
||||
StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
|
||||
for (int i = 0; i < columnList.size(); i++) {
|
||||
sql.append(columnList.get(i).getName());
|
||||
if (i != columnList.size() - 1) {
|
||||
sql.append(",");
|
||||
}
|
||||
}
|
||||
sql.append(") VALUES (");
|
||||
for (int i = 0; i < columnList.size(); i++) {
|
||||
sql.append("'" + columnList.get(i).getValue() + "'");
|
||||
if (i != columnList.size() - 1) {
|
||||
sql.append(",");
|
||||
}
|
||||
}
|
||||
sql.append(")");
|
||||
SQL_QUEUE.add(sql.toString());
|
||||
}
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 入库
|
||||
* @param sql
|
||||
*/
|
||||
public void execute(String sql) {
|
||||
System.out.println("sql======="+sql);
|
||||
}
|
||||
}
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user