Merge branch 'feature/zijie' of http://gitea.tashowz.com/tashow/tashow-platform into feature/zijie

# Conflicts:
#	tashow-dependencies/pom.xml
#	tashow-module/pom.xml
#	tashow-module/tashow-module-system/tashow-module-system-biz/src/main/resources/application.yaml
This commit is contained in:
2025-05-26 17:37:29 +08:00
789 changed files with 1483 additions and 2390 deletions

View File

@@ -12,6 +12,7 @@
<module>tashow-framework</module> <module>tashow-framework</module>
<module>tashow-module</module> <module>tashow-module</module>
<module>tashow-gateway</module> <module>tashow-gateway</module>
<module>tashow-feign</module>
</modules> </modules>
<name>${project.artifactId}</name> <name>${project.artifactId}</name>

View File

@@ -81,48 +81,6 @@
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
<!-- Sa-Token 权限认证Reactor响应式集成在线文档https://sa-token.cc -->
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-reactor-spring-boot3-starter</artifactId>
<version>1.42.0</version>
</dependency>
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-spring-boot3-starter</artifactId>
<version>1.42.0</version>
</dependency>
<!-- Sa-Token 插件整合SSO -->
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-sso</artifactId>
<version>1.42.0</version>
</dependency>
<!-- Sa-Token OAuth2.0 模块 -->
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-oauth2</artifactId>
<version>1.42.0</version>
</dependency>
<!-- Sa-Token 整合 Redis (使用 jackson 序列化方式) -->
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-redis-jackson</artifactId>
<version>1.42.0</version>
</dependency>
<dependency>
<groupId>com.dtflys.forest</groupId>
<artifactId>forest-spring-boot-starter</artifactId>
<version>1.5.26</version>
</dependency>
<!-- 统一依赖管理 --> <!-- 统一依赖管理 -->
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
@@ -167,27 +125,22 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.tashow.cloud</groupId> <groupId>com.tashow.cloud</groupId>
<artifactId>tashow-module-system-api</artifactId> <artifactId>tashow-system-api</artifactId>
<version>${revision}</version> <version>${revision}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.tashow.cloud</groupId> <groupId>com.tashow.cloud</groupId>
<artifactId>tashow-module-system-biz</artifactId> <artifactId>tashow-module-system</artifactId>
<version>${revision}</version> <version>${revision}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.tashow.cloud</groupId> <groupId>com.tashow.cloud</groupId>
<artifactId>tashow-module-infra-api</artifactId> <artifactId>tashow-infra-api</artifactId>
<version>${revision}</version> <version>${revision}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.tashow.cloud</groupId> <groupId>com.tashow.cloud</groupId>
<artifactId>tashow-module-user-api</artifactId> <artifactId>tashow-module-infra</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-module-infra-biz</artifactId>
<version>${revision}</version> <version>${revision}</version>
</dependency> </dependency>
<dependency> <dependency>

18
tashow-feign/pom.xml Normal file
View File

@@ -0,0 +1,18 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-platform</artifactId>
<version>${revision}</version>
</parent>
<artifactId>tashow-feign</artifactId>
<packaging>pom</packaging>
<modules>
<module>tashow-infra-api</module>
<module>tashow-system-api</module>
</modules>
</project>

View File

@@ -5,10 +5,10 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
<groupId>com.tashow.cloud</groupId> <groupId>com.tashow.cloud</groupId>
<artifactId>tashow-module-infra</artifactId> <artifactId>tashow-feign</artifactId>
<version>${revision}</version> <version>${revision}</version>
</parent> </parent>
<artifactId>tashow-module-infra-api</artifactId> <artifactId>tashow-infra-api</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>${project.artifactId}</name> <name>${project.artifactId}</name>

View File

@@ -4,11 +4,11 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<groupId>com.tashow.cloud</groupId> <groupId>com.tashow.cloud</groupId>
<artifactId>tashow-module-system</artifactId> <artifactId>tashow-feign</artifactId>
<version>${revision}</version> <version>${revision}</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>tashow-module-system-api</artifactId> <artifactId>tashow-system-api</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>${project.artifactId}</name> <name>${project.artifactId}</name>

View File

@@ -26,6 +26,7 @@
<module>tashow-data-mybatis</module> <module>tashow-data-mybatis</module>
<module>tashow-data-redis</module> <module>tashow-data-redis</module>
<module>tashow-data-excel</module> <module>tashow-data-excel</module>
<module>tashow-data-es</module>
</modules> </modules>

View File

@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-framework</artifactId>
<version>${revision}</version>
</parent>
<artifactId>tashow-data-es</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>es 封装拓展</description>
<dependencies>
<!-- Spring Data Elasticsearch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version> <!-- 最新版本 -->
</dependency>
<!-- 芋道基础依赖 -->
<dependency>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-common</artifactId>
</dependency>
<!-- 其他必要依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,61 @@
package com.tashow.cloud.es.config;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.context.annotation.Bean;
import javax.annotation.PreDestroy;
@Slf4j
@AutoConfiguration
public class ElasticsearchAutoConfiguration {
private RestClient restClient;
@Bean
public ElasticsearchClient elasticsearchClient(ElasticsearchProperties properties) {
// 1. 构建 HTTP 主机数组
HttpHost[] hosts = properties.getUris().stream()
.map(uri -> {
if (!uri.startsWith("http")) {
throw new IllegalArgumentException("URI 必须包含协议 (http/https)");
}
return HttpHost.create(uri);
})
.toArray(HttpHost[]::new);
// 2. 创建低级 REST 客户端 (无认证)
this.restClient = RestClient.builder(hosts)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(properties.getConnectTimeout())
.setSocketTimeout(properties.getSocketTimeout()))
.build();
// 3. 创建 Transport 层
ElasticsearchTransport transport = new RestClientTransport(
restClient,
new JacksonJsonpMapper() // 使用 Jackson 处理 JSON
);
log.info("[Elasticsearch] 客户端初始化完成,节点: {}", properties.getUris());
return new ElasticsearchClient(transport);
}
@PreDestroy
public void destroy() {
if (restClient != null) {
try {
restClient.close();
log.info("[Elasticsearch] 客户端已关闭");
} catch (Exception e) {
log.error("[Elasticsearch] 客户端关闭异常", e);
}
}
}
}

View File

@@ -0,0 +1,31 @@
/*
package com.tashow.cloud.es.config;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchConfig {
@Bean
public ElasticsearchClient elasticsearchClient() {
// 创建低级客户端
RestClient restClient = RestClient.builder(
new HttpHost("43.139.42.137", 9200)
).build();
// 使用 Jackson 映射器创建传输层
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper()
);
// 创建高级客户端
return new ElasticsearchClient(transport);
}
}*/

View File

@@ -0,0 +1,65 @@
/*
package com.tashow.cloud.es.config;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import java.util.Arrays;
@Slf4j
@AutoConfiguration
@EnableConfigurationProperties(ElasticsearchProperties.class)
public class ElasticsearchConfigTest {
@Bean
public ElasticsearchClient elasticsearchClient(ElasticsearchProperties properties) {
// 1. 创建低级 REST 客户端
RestClient restClient = RestClient.builder(buildHttpHosts(properties))
.setHttpClientConfigCallback(httpClientBuilder -> {
// 认证配置
if (properties.getUsername() != null) {
CredentialsProvider credsProvider = new BasicCredentialsProvider();
credsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(properties.getUsername(), properties.getPassword())
);
httpClientBuilder.setDefaultCredentialsProvider(credsProvider);
}
return httpClientBuilder;
})
.build();
// 2. 创建 Transport 层
ElasticsearchTransport transport = new RestClientTransport(
restClient,
new JacksonJsonpMapper() // 使用 Jackson 处理 JSON
);
// 3. 返回新客户端
return new ElasticsearchClient(transport);
}
private HttpHost[] buildHttpHosts(ElasticsearchProperties properties) {
return Arrays.stream(properties.getUris())
.map(uri -> {
try {
return HttpHost.create(uri);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid Elasticsearch URI: " + uri, e);
}
})
.toArray(HttpHost[]::new);
}
}*/

View File

@@ -0,0 +1,31 @@
package com.tashow.cloud.es.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.List;
@Data
public class ElasticsearchProperties {
/**
* 是否启用 Elasticsearch
*/
private Boolean enabled = true;
/**
* 节点地址列表 (格式: http://ip:port)
*/
private List<String> uris = List.of("http://43.139.42.137:9200");
/**
* 连接超时时间 (ms)
*/
private Integer connectTimeout = 3000;
/**
* 通信超时时间 (ms)
*/
private Integer socketTimeout = 10000;
}

View File

@@ -0,0 +1,34 @@
package com.tashow.cloud.es.service;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class ElasticsearchService {
private final ElasticsearchClient client;
public ElasticsearchService(ElasticsearchClient client) {
this.client = client;
}
/**
* 向 Elasticsearch 索引中插入数据
*
* @param indexName 索引名称
* @param id 文档 ID
* @param jsonData JSON 格式的文档数据
* @return 插入结果
* @throws IOException 如果发生 I/O 错误
*/
public IndexResponse insertDocument(String indexName, String id, String jsonData) throws IOException {
return client.index(i -> i
.index(indexName)
.id(id)
.document(jsonData)
);
}
}

View File

@@ -0,0 +1,3 @@
com.tashow.cloud.es.config.ElasticsearchAutoConfiguration
com.tashow.cloud.es.service.ElasticsearchService
com.tashow.cloud.es.config.ElasticsearchProperties

View File

@@ -36,7 +36,7 @@
<!-- 业务组件 --> <!-- 业务组件 -->
<dependency> <dependency>
<groupId>com.tashow.cloud</groupId> <groupId>com.tashow.cloud</groupId>
<artifactId>tashow-module-system-api</artifactId> <!-- 需要使用它,进行 Dict 的查询 --> <artifactId>tashow-system-api</artifactId> <!-- 需要使用它,进行 Dict 的查询 -->
<version>${revision}</version> <version>${revision}</version>
</dependency> </dependency>

View File

@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
@AutoConfiguration(before = MybatisPlusAutoConfiguration.class) // 目的先于 MyBatis Plus 自动配置避免 @MapperScan 可能扫描不到 Mapper 打印 warn 日志 @AutoConfiguration(before = MybatisPlusAutoConfiguration.class) // 目的先于 MyBatis Plus 自动配置避免 @MapperScan 可能扫描不到 Mapper 打印 warn 日志
@MapperScan(value = "${tashow.info.base-package}", annotationClass = Mapper.class, @MapperScan(value = "${tashow.info.base-package}", annotationClass = Mapper.class,
lazyInitialization = "${mybatis.lazy-initialization:false}") // Mapper 懒加载目前仅用于单元测试 lazyInitialization = "${mybatis.lazy-initialization:false}") // Mapper 懒加载目前仅用于单元测试
public class MybatisAutoConfiguration { public class BaseMybatisAutoConfiguration {
static { static {
// 动态 SQL 智能优化支持本地缓存加速解析更完善的租户复杂 XML 动态 SQL 支持静态注入缓存 // 动态 SQL 智能优化支持本地缓存加速解析更完善的租户复杂 XML 动态 SQL 支持静态注入缓存

View File

@@ -1,3 +1,3 @@
com.tashow.cloud.mybatis.datasource.config.DataSourceAutoConfiguration com.tashow.cloud.mybatis.datasource.config.DataSourceAutoConfiguration
com.tashow.cloud.mybatis.mybatis.config.MybatisAutoConfiguration com.tashow.cloud.mybatis.mybatis.config.BaseMybatisAutoConfiguration
com.tashow.cloud.mybatis.translate.config.TranslateAutoConfiguration com.tashow.cloud.mybatis.translate.config.TranslateAutoConfiguration

View File

@@ -43,7 +43,7 @@
<!-- 业务组件 --> <!-- 业务组件 -->
<dependency> <dependency>
<groupId>com.tashow.cloud</groupId> <groupId>com.tashow.cloud</groupId>
<artifactId>tashow-module-system-api</artifactId> <!-- 需要使用它,进行数据权限的获取 --> <artifactId>tashow-system-api</artifactId> <!-- 需要使用它,进行数据权限的获取 -->
<version>${revision}</version> <version>${revision}</version>
</dependency> </dependency>

View File

@@ -19,7 +19,7 @@ import org.springframework.util.StringUtils;
import java.util.Objects; import java.util.Objects;
import static com.tashow.cloud.redis.config.RedisAutoConfiguration.buildRedisSerializer; import static com.tashow.cloud.redis.config.TashowRedisAutoConfiguration.buildRedisSerializer;
/** /**
@@ -28,7 +28,7 @@ import static com.tashow.cloud.redis.config.RedisAutoConfiguration.buildRedisSer
@AutoConfiguration @AutoConfiguration
@EnableConfigurationProperties({CacheProperties.class, TashowCacheProperties.class}) @EnableConfigurationProperties({CacheProperties.class, TashowCacheProperties.class})
@EnableCaching @EnableCaching
public class CacheAutoConfiguration { public class TashowCacheAutoConfiguration {
/** /**
* RedisCacheConfiguration Bean * RedisCacheConfiguration Bean

View File

@@ -14,7 +14,7 @@ import org.springframework.data.redis.serializer.RedisSerializer;
* Redis 配置类 * Redis 配置类
*/ */
@AutoConfiguration(before = RedissonAutoConfigurationV2.class) // 目的使用自己定义的 RedisTemplate Bean @AutoConfiguration(before = RedissonAutoConfigurationV2.class) // 目的使用自己定义的 RedisTemplate Bean
public class RedisAutoConfiguration { public class TashowRedisAutoConfiguration {
/** /**
* 创建 RedisTemplate Bean使用 JSON 序列化方式 * 创建 RedisTemplate Bean使用 JSON 序列化方式

View File

@@ -1,2 +1,2 @@
com.tashow.cloud.redis.config.RedisAutoConfiguration com.tashow.cloud.redis.config.TashowRedisAutoConfiguration
com.tashow.cloud.redis.config.CacheAutoConfiguration com.tashow.cloud.redis.config.TashowCacheAutoConfiguration

View File

@@ -20,23 +20,16 @@
<groupId>com.tashow.cloud</groupId> <groupId>com.tashow.cloud</groupId>
<artifactId>tashow-data-redis</artifactId> <artifactId>tashow-data-redis</artifactId>
</dependency> </dependency>
<!-- 消息队列相关 -->
<dependency> <dependency>
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-kafka</artifactId> <artifactId>spring-boot-starter-amqp</artifactId>
<optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.amqp</groupId> <groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId> <artifactId>spring-rabbit</artifactId>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<optional>true</optional>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@@ -0,0 +1 @@
package com.tashow.cloud.mq;

View File

@@ -1,151 +0,0 @@
package com.tashow.cloud.mq.redis.config;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import com.tashow.cloud.common.enums.DocumentEnum;
import com.tashow.cloud.mq.redis.core.RedisMQTemplate;
import com.tashow.cloud.mq.redis.core.job.RedisPendingMessageResendJob;
import com.tashow.cloud.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import com.tashow.cloud.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import com.tashow.cloud.redis.config.RedisAutoConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.util.List;
import java.util.Properties;
/**
* Redis 消息队列 Consumer 配置类
*
* @author 芋道源码
*/
@Slf4j
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
@AutoConfiguration(after = RedisAutoConfiguration.class)
public class RedisMQConsumerAutoConfiguration {
/**
* 创建 Redis Pub/Sub 广播消费的容器
*/
@Bean
@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {
// 创建 RedisMessageListenerContainer 对象
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置 RedisConnection 工厂。
container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
// 添加监听器
listeners.forEach(listener -> {
listener.setRedisMQTemplate(redisMQTemplate);
container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
listener.getChannel(), listener.getClass().getName());
});
return container;
}
/**
* 创建 Redis Stream 重新消费的任务
*/
@Bean
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,
RedisMQTemplate redisTemplate,
@Value("${spring.application.name}") String groupName,
RedissonClient redissonClient) {
return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
}
/**
* 创建 Redis Stream 集群消费的容器
*
* 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>
*/
@Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
checkRedisVersion(redisTemplate);
// 第一步,创建 StreamMessageListenerContainer 容器
// 创建 options 配置
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(10) // 一次性最多拉取多少条消息
.targetType(String.class) // 目标类型。统一使用 String通过自己封装的 AbstractStreamMessageListener 去反序列化
.build();
// 创建 container 对象
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
// 第二步,注册监听器,消费对应的 Stream 主题
String consumerName = buildConsumerName();
listeners.parallelStream().forEach(listener -> {
log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",
listener.getStreamKey(), listener.getClass().getName());
// 创建 listener 对应的消费者分组
try {
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
} catch (Exception ignore) {
}
// 设置 listener 对应的 redisTemplate
listener.setRedisMQTemplate(redisMQTemplate);
// 创建 Consumer 对象
Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
// 设置 Consumer 消费进度,以最小消费进度为准
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
// 设置 Consumer 监听
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
.builder(streamOffset).consumer(consumer)
.autoAcknowledge(false) // 不自动 ack
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
container.register(builder.build(), listener);
log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",
listener.getStreamKey(), listener.getClass().getName());
});
return container;
}
/**
* 构建消费者名字,使用本地 IP + 进程编号的方式。
* 参考自 RocketMQ clientId 的实现
*
* @return 消费者名字
*/
private static String buildConsumerName() {
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
}
/**
* 校验 Redis 版本号,是否满足最低的版本号要求!
*/
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
// 获得 Redis 版本
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
String version = MapUtil.getStr(info, "redis_version");
// 校验最低版本必须大于等于 5.0.0
int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
if (majorVersion < 5) {
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +
"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
}
}
}

Some files were not shown because too many files have changed in this diff Show More