Merge pull request 'feature/app-1.0.0' (#1) from feature/app-1.0.0 into develop
Reviewed-on: #1
This commit is contained in:
@@ -19,7 +19,7 @@ import org.springframework.util.StringUtils;
|
|||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
import static com.tashow.cloud.redis.config.RedisAutoConfiguration.buildRedisSerializer;
|
import static com.tashow.cloud.redis.config.TashowRedisAutoConfiguration.buildRedisSerializer;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -28,7 +28,7 @@ import static com.tashow.cloud.redis.config.RedisAutoConfiguration.buildRedisSer
|
|||||||
@AutoConfiguration
|
@AutoConfiguration
|
||||||
@EnableConfigurationProperties({CacheProperties.class, TashowCacheProperties.class})
|
@EnableConfigurationProperties({CacheProperties.class, TashowCacheProperties.class})
|
||||||
@EnableCaching
|
@EnableCaching
|
||||||
public class CacheAutoConfiguration {
|
public class TashowCacheAutoConfiguration {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RedisCacheConfiguration Bean
|
* RedisCacheConfiguration Bean
|
||||||
@@ -14,7 +14,7 @@ import org.springframework.data.redis.serializer.RedisSerializer;
|
|||||||
* Redis 配置类
|
* Redis 配置类
|
||||||
*/
|
*/
|
||||||
@AutoConfiguration(before = RedissonAutoConfigurationV2.class) // 目的:使用自己定义的 RedisTemplate Bean
|
@AutoConfiguration(before = RedissonAutoConfigurationV2.class) // 目的:使用自己定义的 RedisTemplate Bean
|
||||||
public class RedisAutoConfiguration {
|
public class TashowRedisAutoConfiguration {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 创建 RedisTemplate Bean,使用 JSON 序列化方式
|
* 创建 RedisTemplate Bean,使用 JSON 序列化方式
|
||||||
@@ -1,2 +1,2 @@
|
|||||||
com.tashow.cloud.redis.config.RedisAutoConfiguration
|
com.tashow.cloud.redis.config.TashowRedisAutoConfiguration
|
||||||
com.tashow.cloud.redis.config.CacheAutoConfiguration
|
com.tashow.cloud.redis.config.TashowCacheAutoConfiguration
|
||||||
|
|||||||
@@ -21,22 +21,11 @@
|
|||||||
<artifactId>tashow-data-redis</artifactId>
|
<artifactId>tashow-data-redis</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- 消息队列相关 -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.kafka</groupId>
|
|
||||||
<artifactId>spring-kafka</artifactId>
|
|
||||||
<optional>true</optional>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.amqp</groupId>
|
<groupId>org.springframework.amqp</groupId>
|
||||||
<artifactId>spring-rabbit</artifactId>
|
<artifactId>spring-rabbit</artifactId>
|
||||||
<optional>true</optional>
|
<optional>true</optional>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.rocketmq</groupId>
|
|
||||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
|
||||||
<optional>true</optional>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
|||||||
@@ -0,0 +1 @@
|
|||||||
|
package com.tashow.cloud.mq;
|
||||||
@@ -1,151 +0,0 @@
|
|||||||
package com.tashow.cloud.mq.redis.config;
|
|
||||||
|
|
||||||
import cn.hutool.core.map.MapUtil;
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import cn.hutool.system.SystemUtil;
|
|
||||||
import com.tashow.cloud.common.enums.DocumentEnum;
|
|
||||||
import com.tashow.cloud.mq.redis.core.RedisMQTemplate;
|
|
||||||
import com.tashow.cloud.mq.redis.core.job.RedisPendingMessageResendJob;
|
|
||||||
import com.tashow.cloud.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
|
|
||||||
import com.tashow.cloud.mq.redis.core.stream.AbstractRedisStreamMessageListener;
|
|
||||||
import com.tashow.cloud.redis.config.RedisAutoConfiguration;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.redisson.api.RedissonClient;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.data.redis.connection.RedisServerCommands;
|
|
||||||
import org.springframework.data.redis.connection.stream.Consumer;
|
|
||||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
|
||||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
|
||||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
|
||||||
import org.springframework.data.redis.core.RedisCallback;
|
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
|
||||||
import org.springframework.data.redis.listener.ChannelTopic;
|
|
||||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
|
||||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
|
||||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Redis 消息队列 Consumer 配置类
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
|
|
||||||
@AutoConfiguration(after = RedisAutoConfiguration.class)
|
|
||||||
public class RedisMQConsumerAutoConfiguration {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 创建 Redis Pub/Sub 广播消费的容器
|
|
||||||
*/
|
|
||||||
@Bean
|
|
||||||
@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
|
||||||
public RedisMessageListenerContainer redisMessageListenerContainer(
|
|
||||||
RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {
|
|
||||||
// 创建 RedisMessageListenerContainer 对象
|
|
||||||
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
|
||||||
// 设置 RedisConnection 工厂。
|
|
||||||
container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
|
|
||||||
// 添加监听器
|
|
||||||
listeners.forEach(listener -> {
|
|
||||||
listener.setRedisMQTemplate(redisMQTemplate);
|
|
||||||
container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
|
|
||||||
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
|
|
||||||
listener.getChannel(), listener.getClass().getName());
|
|
||||||
});
|
|
||||||
return container;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 创建 Redis Stream 重新消费的任务
|
|
||||||
*/
|
|
||||||
@Bean
|
|
||||||
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
|
||||||
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,
|
|
||||||
RedisMQTemplate redisTemplate,
|
|
||||||
@Value("${spring.application.name}") String groupName,
|
|
||||||
RedissonClient redissonClient) {
|
|
||||||
return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 创建 Redis Stream 集群消费的容器
|
|
||||||
*
|
|
||||||
* 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>
|
|
||||||
*/
|
|
||||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
|
||||||
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
|
||||||
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
|
|
||||||
RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {
|
|
||||||
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
|
|
||||||
checkRedisVersion(redisTemplate);
|
|
||||||
// 第一步,创建 StreamMessageListenerContainer 容器
|
|
||||||
// 创建 options 配置
|
|
||||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
|
|
||||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
|
|
||||||
.batchSize(10) // 一次性最多拉取多少条消息
|
|
||||||
.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
|
|
||||||
.build();
|
|
||||||
// 创建 container 对象
|
|
||||||
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
|
|
||||||
StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
|
|
||||||
|
|
||||||
// 第二步,注册监听器,消费对应的 Stream 主题
|
|
||||||
String consumerName = buildConsumerName();
|
|
||||||
listeners.parallelStream().forEach(listener -> {
|
|
||||||
log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",
|
|
||||||
listener.getStreamKey(), listener.getClass().getName());
|
|
||||||
// 创建 listener 对应的消费者分组
|
|
||||||
try {
|
|
||||||
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
|
|
||||||
} catch (Exception ignore) {
|
|
||||||
}
|
|
||||||
// 设置 listener 对应的 redisTemplate
|
|
||||||
listener.setRedisMQTemplate(redisMQTemplate);
|
|
||||||
// 创建 Consumer 对象
|
|
||||||
Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
|
|
||||||
// 设置 Consumer 消费进度,以最小消费进度为准
|
|
||||||
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
|
|
||||||
// 设置 Consumer 监听
|
|
||||||
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
|
|
||||||
.builder(streamOffset).consumer(consumer)
|
|
||||||
.autoAcknowledge(false) // 不自动 ack
|
|
||||||
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
|
|
||||||
container.register(builder.build(), listener);
|
|
||||||
log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",
|
|
||||||
listener.getStreamKey(), listener.getClass().getName());
|
|
||||||
});
|
|
||||||
return container;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 构建消费者名字,使用本地 IP + 进程编号的方式。
|
|
||||||
* 参考自 RocketMQ clientId 的实现
|
|
||||||
*
|
|
||||||
* @return 消费者名字
|
|
||||||
*/
|
|
||||||
private static String buildConsumerName() {
|
|
||||||
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 校验 Redis 版本号,是否满足最低的版本号要求!
|
|
||||||
*/
|
|
||||||
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
|
|
||||||
// 获得 Redis 版本
|
|
||||||
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
|
|
||||||
String version = MapUtil.getStr(info, "redis_version");
|
|
||||||
// 校验最低版本必须大于等于 5.0.0
|
|
||||||
int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
|
|
||||||
if (majorVersion < 5) {
|
|
||||||
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +
|
|
||||||
"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,31 +0,0 @@
|
|||||||
package com.tashow.cloud.mq.redis.config;
|
|
||||||
|
|
||||||
import com.tashow.cloud.mq.redis.core.RedisMQTemplate;
|
|
||||||
import com.tashow.cloud.mq.redis.core.interceptor.RedisMessageInterceptor;
|
|
||||||
import com.tashow.cloud.redis.config.RedisAutoConfiguration;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Redis 消息队列 Producer 配置类
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@AutoConfiguration(after = RedisAutoConfiguration.class)
|
|
||||||
public class RedisMQProducerAutoConfiguration {
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
|
|
||||||
List<RedisMessageInterceptor> interceptors) {
|
|
||||||
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
|
|
||||||
// 添加拦截器
|
|
||||||
interceptors.forEach(redisMQTemplate::addInterceptor);
|
|
||||||
return redisMQTemplate;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,87 +0,0 @@
|
|||||||
package com.tashow.cloud.mq.redis.core;
|
|
||||||
|
|
||||||
import com.tashow.cloud.common.util.json.JsonUtils;
|
|
||||||
import com.tashow.cloud.mq.redis.core.interceptor.RedisMessageInterceptor;
|
|
||||||
import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage;
|
|
||||||
import com.tashow.cloud.mq.redis.core.pubsub.AbstractRedisChannelMessage;
|
|
||||||
import com.tashow.cloud.mq.redis.core.stream.AbstractRedisStreamMessage;
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Getter;
|
|
||||||
import org.springframework.data.redis.connection.stream.RecordId;
|
|
||||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Redis MQ 操作模板类
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@AllArgsConstructor
|
|
||||||
public class RedisMQTemplate {
|
|
||||||
|
|
||||||
@Getter
|
|
||||||
private final RedisTemplate<String, ?> redisTemplate;
|
|
||||||
/**
|
|
||||||
* 拦截器数组
|
|
||||||
*/
|
|
||||||
@Getter
|
|
||||||
private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 发送 Redis 消息,基于 Redis pub/sub 实现
|
|
||||||
*
|
|
||||||
* @param message 消息
|
|
||||||
*/
|
|
||||||
public <T extends AbstractRedisChannelMessage> void send(T message) {
|
|
||||||
try {
|
|
||||||
sendMessageBefore(message);
|
|
||||||
// 发送消息
|
|
||||||
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
|
|
||||||
} finally {
|
|
||||||
sendMessageAfter(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 发送 Redis 消息,基于 Redis Stream 实现
|
|
||||||
*
|
|
||||||
* @param message 消息
|
|
||||||
* @return 消息记录的编号对象
|
|
||||||
*/
|
|
||||||
public <T extends AbstractRedisStreamMessage> RecordId send(T message) {
|
|
||||||
try {
|
|
||||||
sendMessageBefore(message);
|
|
||||||
// 发送消息
|
|
||||||
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
|
|
||||||
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
|
|
||||||
.withStreamKey(message.getStreamKey())); // 设置 stream key
|
|
||||||
} finally {
|
|
||||||
sendMessageAfter(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 添加拦截器
|
|
||||||
*
|
|
||||||
* @param interceptor 拦截器
|
|
||||||
*/
|
|
||||||
public void addInterceptor(RedisMessageInterceptor interceptor) {
|
|
||||||
interceptors.add(interceptor);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendMessageBefore(AbstractRedisMessage message) {
|
|
||||||
// 正序
|
|
||||||
interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendMessageAfter(AbstractRedisMessage message) {
|
|
||||||
// 倒序
|
|
||||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
|
||||||
interceptors.get(i).sendMessageAfter(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,26 +0,0 @@
|
|||||||
package com.tashow.cloud.mq.redis.core.interceptor;
|
|
||||||
|
|
||||||
import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link AbstractRedisMessage} 消息拦截器
|
|
||||||
* 通过拦截器,作为插件机制,实现拓展。
|
|
||||||
* 例如说,多租户场景下的 MQ 消息处理
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
public interface RedisMessageInterceptor {
|
|
||||||
|
|
||||||
default void sendMessageBefore(AbstractRedisMessage message) {
|
|
||||||
}
|
|
||||||
|
|
||||||
default void sendMessageAfter(AbstractRedisMessage message) {
|
|
||||||
}
|
|
||||||
|
|
||||||
default void consumeMessageBefore(AbstractRedisMessage message) {
|
|
||||||
}
|
|
||||||
|
|
||||||
default void consumeMessageAfter(AbstractRedisMessage message) {
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,100 +0,0 @@
|
|||||||
package com.tashow.cloud.mq.redis.core.job;
|
|
||||||
|
|
||||||
import cn.hutool.core.collection.CollUtil;
|
|
||||||
import com.tashow.cloud.mq.redis.core.RedisMQTemplate;
|
|
||||||
import com.tashow.cloud.mq.redis.core.stream.AbstractRedisStreamMessageListener;
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.redisson.api.RLock;
|
|
||||||
import org.redisson.api.RedissonClient;
|
|
||||||
import org.springframework.data.domain.Range;
|
|
||||||
import org.springframework.data.redis.connection.stream.*;
|
|
||||||
import org.springframework.data.redis.core.StreamOperations;
|
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 这个任务用于处理,crash 之后的消费者未消费完的消息
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@AllArgsConstructor
|
|
||||||
public class RedisPendingMessageResendJob {
|
|
||||||
|
|
||||||
private static final String LOCK_KEY = "redis:pending:msg:lock";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 消息超时时间,默认 5 分钟
|
|
||||||
*
|
|
||||||
* 1. 超时的消息才会被重新投递
|
|
||||||
* 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息5分钟过期后,再等 1 分钟才会被扫瞄到
|
|
||||||
*/
|
|
||||||
private static final int EXPIRE_TIME = 5 * 60;
|
|
||||||
|
|
||||||
private final List<AbstractRedisStreamMessageListener<?>> listeners;
|
|
||||||
private final RedisMQTemplate redisTemplate;
|
|
||||||
private final String groupName;
|
|
||||||
private final RedissonClient redissonClient;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题
|
|
||||||
*/
|
|
||||||
@Scheduled(cron = "35 * * * * ?")
|
|
||||||
public void messageResend() {
|
|
||||||
RLock lock = redissonClient.getLock(LOCK_KEY);
|
|
||||||
// 尝试加锁
|
|
||||||
if (lock.tryLock()) {
|
|
||||||
try {
|
|
||||||
execute();
|
|
||||||
} catch (Exception ex) {
|
|
||||||
log.error("[messageResend][执行异常]", ex);
|
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 执行清理逻辑
|
|
||||||
*
|
|
||||||
* @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files">讨论</a>
|
|
||||||
*/
|
|
||||||
private void execute() {
|
|
||||||
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
|
|
||||||
listeners.forEach(listener -> {
|
|
||||||
PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
|
|
||||||
// 每个消费者的 pending 队列消息数量
|
|
||||||
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
|
|
||||||
pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
|
|
||||||
log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
|
|
||||||
// 每个消费者的 pending消息的详情信息
|
|
||||||
PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
|
|
||||||
if (pendingMessages.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
pendingMessages.forEach(pendingMessage -> {
|
|
||||||
// 获取消息上一次传递到 consumer 的时间,
|
|
||||||
long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();
|
|
||||||
if (lastDelivery < EXPIRE_TIME){
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// 获取指定 id 的消息体
|
|
||||||
List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),
|
|
||||||
Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
|
|
||||||
if (CollUtil.isEmpty(records)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// 重新投递消息
|
|
||||||
redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
|
|
||||||
.ofObject(records.get(0).getValue()) // 设置内容
|
|
||||||
.withStreamKey(listener.getStreamKey()));
|
|
||||||
// ack 消息消费完成
|
|
||||||
redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
|
|
||||||
log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,29 +0,0 @@
|
|||||||
package com.tashow.cloud.mq.redis.core.message;
|
|
||||||
|
|
||||||
import lombok.Data;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Redis 消息抽象基类
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
public abstract class AbstractRedisMessage {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 头
|
|
||||||
*/
|
|
||||||
private Map<String, String> headers = new HashMap<>();
|
|
||||||
|
|
||||||
public String getHeader(String key) {
|
|
||||||
return headers.get(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addHeader(String key, String value) {
|
|
||||||
headers.put(key, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,23 +0,0 @@
|
|||||||
package com.tashow.cloud.mq.redis.core.pubsub;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
|
||||||
import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Redis Channel Message 抽象类
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获得 Redis Channel,默认使用类名
|
|
||||||
*
|
|
||||||
* @return Channel
|
|
||||||
*/
|
|
||||||
@JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
|
|
||||||
public String getChannel() {
|
|
||||||
return getClass().getSimpleName();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,103 +0,0 @@
|
|||||||
package com.tashow.cloud.mq.redis.core.pubsub;
|
|
||||||
|
|
||||||
import cn.hutool.core.util.TypeUtil;
|
|
||||||
import com.tashow.cloud.common.util.json.JsonUtils;
|
|
||||||
import com.tashow.cloud.mq.redis.core.RedisMQTemplate;
|
|
||||||
import com.tashow.cloud.mq.redis.core.interceptor.RedisMessageInterceptor;
|
|
||||||
import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage;
|
|
||||||
import lombok.Setter;
|
|
||||||
import lombok.SneakyThrows;
|
|
||||||
import org.springframework.data.redis.connection.Message;
|
|
||||||
import org.springframework.data.redis.connection.MessageListener;
|
|
||||||
|
|
||||||
import java.lang.reflect.Type;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Redis Pub/Sub 监听器抽象类,用于实现广播消费
|
|
||||||
*
|
|
||||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 消息类型
|
|
||||||
*/
|
|
||||||
private final Class<T> messageType;
|
|
||||||
/**
|
|
||||||
* Redis Channel
|
|
||||||
*/
|
|
||||||
private final String channel;
|
|
||||||
/**
|
|
||||||
* RedisMQTemplate
|
|
||||||
*/
|
|
||||||
@Setter
|
|
||||||
private RedisMQTemplate redisMQTemplate;
|
|
||||||
|
|
||||||
@SneakyThrows
|
|
||||||
protected AbstractRedisChannelMessageListener() {
|
|
||||||
this.messageType = getMessageClass();
|
|
||||||
this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获得 Sub 订阅的 Redis Channel 通道
|
|
||||||
*
|
|
||||||
* @return channel
|
|
||||||
*/
|
|
||||||
public final String getChannel() {
|
|
||||||
return channel;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public final void onMessage(Message message, byte[] bytes) {
|
|
||||||
T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
|
|
||||||
try {
|
|
||||||
consumeMessageBefore(messageObj);
|
|
||||||
// 消费消息
|
|
||||||
this.onMessage(messageObj);
|
|
||||||
} finally {
|
|
||||||
consumeMessageAfter(messageObj);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理消息
|
|
||||||
*
|
|
||||||
* @param message 消息
|
|
||||||
*/
|
|
||||||
public abstract void onMessage(T message);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 通过解析类上的泛型,获得消息类型
|
|
||||||
*
|
|
||||||
* @return 消息类型
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private Class<T> getMessageClass() {
|
|
||||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
|
||||||
if (type == null) {
|
|
||||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
|
||||||
}
|
|
||||||
return (Class<T>) type;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void consumeMessageBefore(AbstractRedisMessage message) {
|
|
||||||
assert redisMQTemplate != null;
|
|
||||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
|
||||||
// 正序
|
|
||||||
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void consumeMessageAfter(AbstractRedisMessage message) {
|
|
||||||
assert redisMQTemplate != null;
|
|
||||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
|
||||||
// 倒序
|
|
||||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
|
||||||
interceptors.get(i).consumeMessageAfter(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,23 +0,0 @@
|
|||||||
package com.tashow.cloud.mq.redis.core.stream;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
|
||||||
import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Redis Stream Message 抽象类
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
public abstract class AbstractRedisStreamMessage extends AbstractRedisMessage {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获得 Redis Stream Key,默认使用类名
|
|
||||||
*
|
|
||||||
* @return Channel
|
|
||||||
*/
|
|
||||||
@JsonIgnore // 避免序列化
|
|
||||||
public String getStreamKey() {
|
|
||||||
return getClass().getSimpleName();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,113 +0,0 @@
|
|||||||
package com.tashow.cloud.mq.redis.core.stream;
|
|
||||||
|
|
||||||
import cn.hutool.core.util.TypeUtil;
|
|
||||||
import com.tashow.cloud.common.util.json.JsonUtils;
|
|
||||||
import com.tashow.cloud.mq.redis.core.RedisMQTemplate;
|
|
||||||
import com.tashow.cloud.mq.redis.core.interceptor.RedisMessageInterceptor;
|
|
||||||
import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage;
|
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.Setter;
|
|
||||||
import lombok.SneakyThrows;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
|
||||||
import org.springframework.data.redis.stream.StreamListener;
|
|
||||||
|
|
||||||
import java.lang.reflect.Type;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Redis Stream 监听器抽象类,用于实现集群消费
|
|
||||||
*
|
|
||||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedisStreamMessage>
|
|
||||||
implements StreamListener<String, ObjectRecord<String, String>> {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 消息类型
|
|
||||||
*/
|
|
||||||
private final Class<T> messageType;
|
|
||||||
/**
|
|
||||||
* Redis Channel
|
|
||||||
*/
|
|
||||||
@Getter
|
|
||||||
private final String streamKey;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Redis 消费者分组,默认使用 spring.application.name 名字
|
|
||||||
*/
|
|
||||||
@Value("${spring.application.name}")
|
|
||||||
@Getter
|
|
||||||
private String group;
|
|
||||||
/**
|
|
||||||
* RedisMQTemplate
|
|
||||||
*/
|
|
||||||
@Setter
|
|
||||||
private RedisMQTemplate redisMQTemplate;
|
|
||||||
|
|
||||||
@SneakyThrows
|
|
||||||
protected AbstractRedisStreamMessageListener() {
|
|
||||||
this.messageType = getMessageClass();
|
|
||||||
this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onMessage(ObjectRecord<String, String> message) {
|
|
||||||
// 消费消息
|
|
||||||
T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
|
|
||||||
try {
|
|
||||||
consumeMessageBefore(messageObj);
|
|
||||||
// 消费消息
|
|
||||||
this.onMessage(messageObj);
|
|
||||||
// ack 消息消费完成
|
|
||||||
redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
|
|
||||||
// TODO 芋艿:需要额外考虑以下几个点:
|
|
||||||
// 1. 处理异常的情况
|
|
||||||
// 2. 发送日志;以及事务的结合
|
|
||||||
// 3. 消费日志;以及通用的幂等性
|
|
||||||
// 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
|
|
||||||
} finally {
|
|
||||||
consumeMessageAfter(messageObj);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理消息
|
|
||||||
*
|
|
||||||
* @param message 消息
|
|
||||||
*/
|
|
||||||
public abstract void onMessage(T message);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 通过解析类上的泛型,获得消息类型
|
|
||||||
*
|
|
||||||
* @return 消息类型
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private Class<T> getMessageClass() {
|
|
||||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
|
||||||
if (type == null) {
|
|
||||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
|
||||||
}
|
|
||||||
return (Class<T>) type;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void consumeMessageBefore(AbstractRedisMessage message) {
|
|
||||||
assert redisMQTemplate != null;
|
|
||||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
|
||||||
// 正序
|
|
||||||
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void consumeMessageAfter(AbstractRedisMessage message) {
|
|
||||||
assert redisMQTemplate != null;
|
|
||||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
|
||||||
// 倒序
|
|
||||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
|
||||||
interceptors.get(i).consumeMessageAfter(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,6 +0,0 @@
|
|||||||
/**
|
|
||||||
* 消息队列,基于 Redis 提供:
|
|
||||||
* 1. 基于 Pub/Sub 实现广播消费
|
|
||||||
* 2. 基于 Stream 实现集群消费
|
|
||||||
*/
|
|
||||||
package com.tashow.cloud.mq.redis;
|
|
||||||
@@ -1,3 +1 @@
|
|||||||
com.tashow.cloud.mq.redis.config.RedisMQProducerAutoConfiguration
|
|
||||||
com.tashow.cloud.mq.redis.config.RedisMQConsumerAutoConfiguration
|
|
||||||
com.tashow.cloud.mq.rabbitmq.config.RabbitMQAutoConfiguration
|
com.tashow.cloud.mq.rabbitmq.config.RabbitMQAutoConfiguration
|
||||||
|
|||||||
@@ -6,14 +6,14 @@ import com.tashow.cloud.protection.idempotent.core.aop.IdempotentAspect;
|
|||||||
import com.tashow.cloud.protection.idempotent.core.keyresolver.IdempotentKeyResolver;
|
import com.tashow.cloud.protection.idempotent.core.keyresolver.IdempotentKeyResolver;
|
||||||
import com.tashow.cloud.protection.idempotent.core.keyresolver.impl.DefaultIdempotentKeyResolver;
|
import com.tashow.cloud.protection.idempotent.core.keyresolver.impl.DefaultIdempotentKeyResolver;
|
||||||
import com.tashow.cloud.protection.idempotent.core.redis.IdempotentRedisDAO;
|
import com.tashow.cloud.protection.idempotent.core.redis.IdempotentRedisDAO;
|
||||||
import com.tashow.cloud.redis.config.RedisAutoConfiguration;
|
import com.tashow.cloud.redis.config.TashowRedisAutoConfiguration;
|
||||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@AutoConfiguration(after = RedisAutoConfiguration.class)
|
@AutoConfiguration(after = TashowRedisAutoConfiguration.class)
|
||||||
public class IdempotentConfiguration {
|
public class IdempotentConfiguration {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package com.tashow.cloud.protection.ratelimiter.config;
|
|||||||
import com.tashow.cloud.protection.ratelimiter.core.aop.RateLimiterAspect;
|
import com.tashow.cloud.protection.ratelimiter.core.aop.RateLimiterAspect;
|
||||||
import com.tashow.cloud.protection.ratelimiter.core.keyresolver.RateLimiterKeyResolver;
|
import com.tashow.cloud.protection.ratelimiter.core.keyresolver.RateLimiterKeyResolver;
|
||||||
import com.tashow.cloud.protection.ratelimiter.core.keyresolver.impl.*;
|
import com.tashow.cloud.protection.ratelimiter.core.keyresolver.impl.*;
|
||||||
import com.tashow.cloud.redis.config.RedisAutoConfiguration;
|
import com.tashow.cloud.redis.config.TashowRedisAutoConfiguration;
|
||||||
import com.tashow.cloud.protection.ratelimiter.core.redis.RateLimiterRedisDAO;
|
import com.tashow.cloud.protection.ratelimiter.core.redis.RateLimiterRedisDAO;
|
||||||
import org.redisson.api.RedissonClient;
|
import org.redisson.api.RedissonClient;
|
||||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||||
@@ -11,7 +11,7 @@ import org.springframework.context.annotation.Bean;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@AutoConfiguration(after = RedisAutoConfiguration.class)
|
@AutoConfiguration(after = TashowRedisAutoConfiguration.class)
|
||||||
public class RateLimiterConfiguration {
|
public class RateLimiterConfiguration {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package com.tashow.cloud.protection.signature.config;
|
|||||||
|
|
||||||
import com.tashow.cloud.protection.signature.core.redis.ApiSignatureRedisDAO;
|
import com.tashow.cloud.protection.signature.core.redis.ApiSignatureRedisDAO;
|
||||||
import com.tashow.cloud.protection.signature.core.aop.ApiSignatureAspect;
|
import com.tashow.cloud.protection.signature.core.aop.ApiSignatureAspect;
|
||||||
import com.tashow.cloud.redis.config.RedisAutoConfiguration;
|
import com.tashow.cloud.redis.config.TashowRedisAutoConfiguration;
|
||||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
@@ -12,7 +12,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
|
|||||||
*
|
*
|
||||||
* @author Zhougang
|
* @author Zhougang
|
||||||
*/
|
*/
|
||||||
@AutoConfiguration(after = RedisAutoConfiguration.class)
|
@AutoConfiguration(after = TashowRedisAutoConfiguration.class)
|
||||||
public class ApiSignatureAutoConfiguration {
|
public class ApiSignatureAutoConfiguration {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
|||||||
@@ -1,17 +1,15 @@
|
|||||||
package com.tashow.cloud.tenant.config;
|
package com.tashow.cloud.tenant.config;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
|
||||||
|
import com.baomidou.mybatisplus.extension.plugins.inner.TenantLineInnerInterceptor;
|
||||||
import com.tashow.cloud.common.enums.WebFilterOrderEnum;
|
import com.tashow.cloud.common.enums.WebFilterOrderEnum;
|
||||||
import com.tashow.cloud.mybatis.mybatis.core.util.MyBatisUtils;
|
import com.tashow.cloud.mybatis.mybatis.core.util.MyBatisUtils;
|
||||||
import com.tashow.cloud.redis.config.TashowCacheProperties;
|
import com.tashow.cloud.redis.config.TashowCacheProperties;
|
||||||
import com.tashow.cloud.systemapi.api.tenant.TenantApi;
|
import com.tashow.cloud.systemapi.api.tenant.TenantApi;
|
||||||
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
|
|
||||||
import com.baomidou.mybatisplus.extension.plugins.inner.TenantLineInnerInterceptor;
|
|
||||||
import com.tashow.cloud.tenant.core.aop.TenantIgnoreAspect;
|
import com.tashow.cloud.tenant.core.aop.TenantIgnoreAspect;
|
||||||
import com.tashow.cloud.tenant.core.db.TenantDatabaseInterceptor;
|
import com.tashow.cloud.tenant.core.db.TenantDatabaseInterceptor;
|
||||||
import com.tashow.cloud.tenant.core.job.TenantJobAspect;
|
import com.tashow.cloud.tenant.core.job.TenantJobAspect;
|
||||||
import com.tashow.cloud.tenant.core.mq.rabbitmq.TenantRabbitMQInitializer;
|
import com.tashow.cloud.tenant.core.mq.rabbitmq.TenantRabbitMQInitializer;
|
||||||
import com.tashow.cloud.tenant.core.mq.redis.TenantRedisMessageInterceptor;
|
|
||||||
import com.tashow.cloud.tenant.core.mq.rocketmq.TenantRocketMQInitializer;
|
|
||||||
import com.tashow.cloud.tenant.core.redis.TenantRedisCacheManager;
|
import com.tashow.cloud.tenant.core.redis.TenantRedisCacheManager;
|
||||||
import com.tashow.cloud.tenant.core.security.TenantSecurityWebFilter;
|
import com.tashow.cloud.tenant.core.security.TenantSecurityWebFilter;
|
||||||
import com.tashow.cloud.tenant.core.service.TenantFrameworkService;
|
import com.tashow.cloud.tenant.core.service.TenantFrameworkService;
|
||||||
@@ -25,7 +23,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|||||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
import org.springframework.boot.web.servlet.FilterRegistrationBean;
|
import org.springframework.boot.web.servlet.FilterRegistrationBean;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
import org.springframework.context.annotation.Primary;
|
import org.springframework.context.annotation.Primary;
|
||||||
import org.springframework.data.redis.cache.BatchStrategies;
|
import org.springframework.data.redis.cache.BatchStrategies;
|
||||||
import org.springframework.data.redis.cache.RedisCacheConfiguration;
|
import org.springframework.data.redis.cache.RedisCacheConfiguration;
|
||||||
@@ -97,23 +94,6 @@ public class TenantAutoConfiguration {
|
|||||||
return new TenantJobAspect(tenantFrameworkService);
|
return new TenantJobAspect(tenantFrameworkService);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ========== MQ ==========
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 多租户 Redis 消息队列的配置类
|
|
||||||
*
|
|
||||||
* 为什么要单独一个配置类呢?如果直接把 TenantRedisMessageInterceptor Bean 的初始化放外面,会报 RedisMessageInterceptor 类不存在的错误
|
|
||||||
*/
|
|
||||||
@Configuration
|
|
||||||
@ConditionalOnClass(name = "com.tashow.cloud.mq.redis.core.RedisMQTemplate")
|
|
||||||
public static class TenantRedisMQAutoConfiguration {
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public TenantRedisMessageInterceptor tenantRedisMessageInterceptor() {
|
|
||||||
return new TenantRedisMessageInterceptor();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
|
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
|
||||||
@@ -121,12 +101,6 @@ public class TenantAutoConfiguration {
|
|||||||
return new TenantRabbitMQInitializer();
|
return new TenantRabbitMQInitializer();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
|
||||||
@ConditionalOnClass(name = "org.apache.rocketmq.spring.core.RocketMQTemplate")
|
|
||||||
public TenantRocketMQInitializer tenantRocketMQInitializer() {
|
|
||||||
return new TenantRocketMQInitializer();
|
|
||||||
}
|
|
||||||
|
|
||||||
// ========== Redis ==========
|
// ========== Redis ==========
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
|||||||
@@ -1,37 +0,0 @@
|
|||||||
package com.tashow.cloud.tenant.core.mq.kafka;
|
|
||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.boot.SpringApplication;
|
|
||||||
import org.springframework.boot.env.EnvironmentPostProcessor;
|
|
||||||
import org.springframework.core.env.ConfigurableEnvironment;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 多租户的 Kafka 的 {@link EnvironmentPostProcessor} 实现类
|
|
||||||
*
|
|
||||||
* Kafka Producer 发送消息时,增加 {@link TenantKafkaProducerInterceptor} 拦截器
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
public class TenantKafkaEnvironmentPostProcessor implements EnvironmentPostProcessor {
|
|
||||||
|
|
||||||
private static final String PROPERTY_KEY_INTERCEPTOR_CLASSES = "spring.kafka.producer.properties.interceptor.classes";
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
|
|
||||||
// 添加 TenantKafkaProducerInterceptor 拦截器
|
|
||||||
try {
|
|
||||||
String value = environment.getProperty(PROPERTY_KEY_INTERCEPTOR_CLASSES);
|
|
||||||
if (StrUtil.isEmpty(value)) {
|
|
||||||
value = TenantKafkaProducerInterceptor.class.getName();
|
|
||||||
} else {
|
|
||||||
value += "," + TenantKafkaProducerInterceptor.class.getName();
|
|
||||||
}
|
|
||||||
environment.getSystemProperties().put(PROPERTY_KEY_INTERCEPTOR_CLASSES, value);
|
|
||||||
} catch (NoClassDefFoundError ignore) {
|
|
||||||
// 如果触发 NoClassDefFoundError 异常,说明 TenantKafkaProducerInterceptor 类不存在,即没引入 kafka-spring 依赖
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,48 +0,0 @@
|
|||||||
package com.tashow.cloud.tenant.core.mq.kafka;
|
|
||||||
|
|
||||||
import cn.hutool.core.util.ReflectUtil;
|
|
||||||
import com.tashow.cloud.tenant.core.context.TenantContextHolder;
|
|
||||||
import org.apache.kafka.clients.producer.ProducerInterceptor;
|
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
||||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
|
||||||
import org.apache.kafka.common.header.Headers;
|
|
||||||
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import static com.tashow.cloud.web.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Kafka 消息队列的多租户 {@link ProducerInterceptor} 实现类
|
|
||||||
*
|
|
||||||
* 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
|
|
||||||
* 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
public class TenantKafkaProducerInterceptor implements ProducerInterceptor<Object, Object> {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
|
|
||||||
Long tenantId = TenantContextHolder.getTenantId();
|
|
||||||
if (tenantId != null) {
|
|
||||||
Headers headers = (Headers) ReflectUtil.getFieldValue(record, "headers"); // private 属性,没有 get 方法,智能反射
|
|
||||||
headers.add(HEADER_TENANT_ID, tenantId.toString().getBytes());
|
|
||||||
}
|
|
||||||
return record;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void configure(Map<String, ?> configs) {
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -0,0 +1 @@
|
|||||||
|
package com.tashow.cloud.tenant.core.mq;
|
||||||
@@ -1,43 +0,0 @@
|
|||||||
package com.tashow.cloud.tenant.core.mq.redis;
|
|
||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import com.tashow.cloud.mq.redis.core.interceptor.RedisMessageInterceptor;
|
|
||||||
import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage;
|
|
||||||
import com.tashow.cloud.tenant.core.context.TenantContextHolder;
|
|
||||||
|
|
||||||
import static com.tashow.cloud.web.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 多租户 {@link AbstractRedisMessage} 拦截器
|
|
||||||
*
|
|
||||||
* 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
|
|
||||||
* 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
public class TenantRedisMessageInterceptor implements RedisMessageInterceptor {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void sendMessageBefore(AbstractRedisMessage message) {
|
|
||||||
Long tenantId = TenantContextHolder.getTenantId();
|
|
||||||
if (tenantId != null) {
|
|
||||||
message.addHeader(HEADER_TENANT_ID, tenantId.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void consumeMessageBefore(AbstractRedisMessage message) {
|
|
||||||
String tenantIdStr = message.getHeader(HEADER_TENANT_ID);
|
|
||||||
if (StrUtil.isNotEmpty(tenantIdStr)) {
|
|
||||||
TenantContextHolder.setTenantId(Long.valueOf(tenantIdStr));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void consumeMessageAfter(AbstractRedisMessage message) {
|
|
||||||
// 注意,Consumer 是一个逻辑的入口,所以不考虑原本上下文就存在租户编号的情况
|
|
||||||
TenantContextHolder.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,47 +0,0 @@
|
|||||||
package com.tashow.cloud.tenant.core.mq.rocketmq;
|
|
||||||
|
|
||||||
import cn.hutool.core.lang.Assert;
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import com.tashow.cloud.tenant.core.context.TenantContextHolder;
|
|
||||||
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
|
|
||||||
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
|
|
||||||
import org.apache.rocketmq.common.message.MessageExt;
|
|
||||||
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static com.tashow.cloud.web.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* RocketMQ 消息队列的多租户 {@link ConsumeMessageHook} 实现类
|
|
||||||
*
|
|
||||||
* Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
public class TenantRocketMQConsumeMessageHook implements ConsumeMessageHook {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String hookName() {
|
|
||||||
return getClass().getSimpleName();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void consumeMessageBefore(ConsumeMessageContext context) {
|
|
||||||
// 校验,消息必须是单条,不然设置租户可能不正确
|
|
||||||
List<MessageExt> messages = context.getMsgList();
|
|
||||||
Assert.isTrue(messages.size() == 1, "消息条数({})不正确", messages.size());
|
|
||||||
// 设置租户编号
|
|
||||||
String tenantId = messages.get(0).getUserProperty(HEADER_TENANT_ID);
|
|
||||||
if (StrUtil.isNotEmpty(tenantId)) {
|
|
||||||
TenantContextHolder.setTenantId(Long.parseLong(tenantId));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void consumeMessageAfter(ConsumeMessageContext context) {
|
|
||||||
TenantContextHolder.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,53 +0,0 @@
|
|||||||
package com.tashow.cloud.tenant.core.mq.rocketmq;
|
|
||||||
|
|
||||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
||||||
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
|
|
||||||
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
|
|
||||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
|
||||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
||||||
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
|
|
||||||
import org.springframework.beans.BeansException;
|
|
||||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 多租户的 RocketMQ 初始化器
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
public class TenantRocketMQInitializer implements BeanPostProcessor {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
|
||||||
if (bean instanceof DefaultRocketMQListenerContainer) {
|
|
||||||
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
|
|
||||||
initTenantConsumer(container.getConsumer());
|
|
||||||
} else if (bean instanceof RocketMQTemplate) {
|
|
||||||
RocketMQTemplate template = (RocketMQTemplate) bean;
|
|
||||||
initTenantProducer(template.getProducer());
|
|
||||||
}
|
|
||||||
return bean;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void initTenantProducer(DefaultMQProducer producer) {
|
|
||||||
if (producer == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
|
|
||||||
if (producerImpl == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void initTenantConsumer(DefaultMQPushConsumer consumer) {
|
|
||||||
if (consumer == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
|
|
||||||
if (consumerImpl == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,37 +0,0 @@
|
|||||||
package com.tashow.cloud.tenant.core.mq.rocketmq;
|
|
||||||
|
|
||||||
import com.tashow.cloud.tenant.core.context.TenantContextHolder;
|
|
||||||
import org.apache.rocketmq.client.hook.SendMessageContext;
|
|
||||||
import org.apache.rocketmq.client.hook.SendMessageHook;
|
|
||||||
|
|
||||||
import static com.tashow.cloud.web.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* RocketMQ 消息队列的多租户 {@link SendMessageHook} 实现类
|
|
||||||
*
|
|
||||||
* Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
public class TenantRocketMQSendMessageHook implements SendMessageHook {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String hookName() {
|
|
||||||
return getClass().getSimpleName();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void sendMessageBefore(SendMessageContext sendMessageContext) {
|
|
||||||
Long tenantId = TenantContextHolder.getTenantId();
|
|
||||||
if (tenantId == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
sendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID, tenantId.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void sendMessageAfter(SendMessageContext sendMessageContext) {
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,2 +0,0 @@
|
|||||||
org.springframework.boot.env.EnvironmentPostProcessor=\
|
|
||||||
com.tashow.cloud.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor
|
|
||||||
@@ -1,2 +0,0 @@
|
|||||||
com.tashow.cloud.tenant.config.TenantRpcAutoConfiguration
|
|
||||||
com.tashow.cloud.tenant.config.TenantAutoConfiguration
|
|
||||||
@@ -42,21 +42,11 @@
|
|||||||
<groupId>com.tashow.cloud</groupId>
|
<groupId>com.tashow.cloud</groupId>
|
||||||
<artifactId>tashow-framework-mq</artifactId>
|
<artifactId>tashow-framework-mq</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.kafka</groupId>
|
|
||||||
<artifactId>spring-kafka</artifactId>
|
|
||||||
<optional>true</optional>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.amqp</groupId>
|
<groupId>org.springframework.amqp</groupId>
|
||||||
<artifactId>spring-rabbit</artifactId>
|
<artifactId>spring-rabbit</artifactId>
|
||||||
<optional>true</optional>
|
<optional>true</optional>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.rocketmq</groupId>
|
|
||||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
|
||||||
<optional>true</optional>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!-- 业务组件 -->
|
<!-- 业务组件 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|||||||
@@ -1,33 +1,22 @@
|
|||||||
package com.tashow.cloud.websocket.config;
|
package com.tashow.cloud.websocket.config;
|
||||||
|
|
||||||
import com.tashow.cloud.mq.redis.config.RedisMQConsumerAutoConfiguration;
|
|
||||||
import com.tashow.cloud.mq.redis.core.RedisMQTemplate;
|
|
||||||
import com.tashow.cloud.websocket.core.handler.JsonWebSocketMessageHandler;
|
import com.tashow.cloud.websocket.core.handler.JsonWebSocketMessageHandler;
|
||||||
import com.tashow.cloud.websocket.core.listener.WebSocketMessageListener;
|
import com.tashow.cloud.websocket.core.listener.WebSocketMessageListener;
|
||||||
import com.tashow.cloud.websocket.core.security.LoginUserHandshakeInterceptor;
|
import com.tashow.cloud.websocket.core.security.LoginUserHandshakeInterceptor;
|
||||||
import com.tashow.cloud.websocket.core.security.WebSocketAuthorizeRequestsCustomizer;
|
import com.tashow.cloud.websocket.core.security.WebSocketAuthorizeRequestsCustomizer;
|
||||||
import com.tashow.cloud.websocket.core.sender.kafka.KafkaWebSocketMessageConsumer;
|
|
||||||
import com.tashow.cloud.websocket.core.sender.kafka.KafkaWebSocketMessageSender;
|
|
||||||
import com.tashow.cloud.websocket.core.sender.local.LocalWebSocketMessageSender;
|
import com.tashow.cloud.websocket.core.sender.local.LocalWebSocketMessageSender;
|
||||||
import com.tashow.cloud.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageConsumer;
|
import com.tashow.cloud.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageConsumer;
|
||||||
import com.tashow.cloud.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageSender;
|
import com.tashow.cloud.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageSender;
|
||||||
import com.tashow.cloud.websocket.core.sender.redis.RedisWebSocketMessageConsumer;
|
|
||||||
import com.tashow.cloud.websocket.core.sender.redis.RedisWebSocketMessageSender;
|
|
||||||
import com.tashow.cloud.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer;
|
|
||||||
import com.tashow.cloud.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender;
|
|
||||||
import com.tashow.cloud.websocket.core.session.WebSocketSessionHandlerDecorator;
|
import com.tashow.cloud.websocket.core.session.WebSocketSessionHandlerDecorator;
|
||||||
import com.tashow.cloud.websocket.core.session.WebSocketSessionManager;
|
import com.tashow.cloud.websocket.core.session.WebSocketSessionManager;
|
||||||
import com.tashow.cloud.websocket.core.session.WebSocketSessionManagerImpl;
|
import com.tashow.cloud.websocket.core.session.WebSocketSessionManagerImpl;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
||||||
import org.springframework.amqp.core.TopicExchange;
|
import org.springframework.amqp.core.TopicExchange;
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
|
||||||
import org.springframework.web.socket.WebSocketHandler;
|
import org.springframework.web.socket.WebSocketHandler;
|
||||||
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
||||||
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
|
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
|
||||||
@@ -40,9 +29,8 @@ import java.util.List;
|
|||||||
*
|
*
|
||||||
* @author xingyu4j
|
* @author xingyu4j
|
||||||
*/
|
*/
|
||||||
@AutoConfiguration(before = RedisMQConsumerAutoConfiguration.class) // before YudaoRedisMQConsumerAutoConfiguration 的原因是,需要保证 RedisWebSocketMessageConsumer 先创建,才能创建 RedisMessageListenerContainer
|
|
||||||
@EnableWebSocket // 开启 websocket
|
@EnableWebSocket // 开启 websocket
|
||||||
@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket
|
@ConditionalOnProperty(prefix = "tashow.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket
|
||||||
@EnableConfigurationProperties(WebSocketProperties.class)
|
@EnableConfigurationProperties(WebSocketProperties.class)
|
||||||
public class WebSocketAutoConfiguration {
|
public class WebSocketAutoConfiguration {
|
||||||
|
|
||||||
@@ -96,44 +84,7 @@ public class WebSocketAutoConfiguration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@ConditionalOnProperty(prefix = "tashow.websocket", name = "sender-type", havingValue = "redis")
|
@ConditionalOnProperty(prefix = "tashow.websocket", name = "sender-type", havingValue = "rabbitmq")
|
||||||
public class RedisWebSocketMessageSenderConfiguration {
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager,
|
|
||||||
RedisMQTemplate redisMQTemplate) {
|
|
||||||
return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public RedisWebSocketMessageConsumer redisWebSocketMessageConsumer(
|
|
||||||
RedisWebSocketMessageSender redisWebSocketMessageSender) {
|
|
||||||
return new RedisWebSocketMessageConsumer(redisWebSocketMessageSender);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
@ConditionalOnProperty(prefix = "tashow.websocket", name = "sender-type", havingValue = "rocketmq")
|
|
||||||
public class RocketMQWebSocketMessageSenderConfiguration {
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender(
|
|
||||||
WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate,
|
|
||||||
@Value("${yudao.websocket.sender-rocketmq.topic}") String topic) {
|
|
||||||
return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer(
|
|
||||||
RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) {
|
|
||||||
return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
@ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rabbitmq")
|
|
||||||
public class RabbitMQWebSocketMessageSenderConfiguration {
|
public class RabbitMQWebSocketMessageSenderConfiguration {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@@ -153,7 +104,7 @@ public class WebSocketAutoConfiguration {
|
|||||||
* 创建 Topic Exchange
|
* 创建 Topic Exchange
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public TopicExchange websocketTopicExchange(@Value("${yudao.websocket.sender-rabbitmq.exchange}") String exchange) {
|
public TopicExchange websocketTopicExchange(@Value("${tashow.websocket.sender-rabbitmq.exchange}") String exchange) {
|
||||||
return new TopicExchange(exchange,
|
return new TopicExchange(exchange,
|
||||||
true, // durable: 是否持久化
|
true, // durable: 是否持久化
|
||||||
false); // exclusive: 是否排它
|
false); // exclusive: 是否排它
|
||||||
@@ -161,23 +112,4 @@ public class WebSocketAutoConfiguration {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Configuration
|
|
||||||
@ConditionalOnProperty(prefix = "tashow.websocket", name = "sender-type", havingValue = "kafka")
|
|
||||||
public class KafkaWebSocketMessageSenderConfiguration {
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public KafkaWebSocketMessageSender kafkaWebSocketMessageSender(
|
|
||||||
WebSocketSessionManager sessionManager, KafkaTemplate<Object, Object> kafkaTemplate,
|
|
||||||
@Value("${yudao.websocket.sender-kafka.topic}") String topic) {
|
|
||||||
return new KafkaWebSocketMessageSender(sessionManager, kafkaTemplate, topic);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public KafkaWebSocketMessageConsumer kafkaWebSocketMessageConsumer(
|
|
||||||
KafkaWebSocketMessageSender kafkaWebSocketMessageSender) {
|
|
||||||
return new KafkaWebSocketMessageConsumer(kafkaWebSocketMessageSender);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,35 +0,0 @@
|
|||||||
package com.tashow.cloud.websocket.core.sender.kafka;
|
|
||||||
|
|
||||||
import lombok.Data;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Kafka 广播 WebSocket 的消息
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
public class KafkaWebSocketMessage {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Session 编号
|
|
||||||
*/
|
|
||||||
private String sessionId;
|
|
||||||
/**
|
|
||||||
* 用户类型
|
|
||||||
*/
|
|
||||||
private Integer userType;
|
|
||||||
/**
|
|
||||||
* 用户编号
|
|
||||||
*/
|
|
||||||
private Long userId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 消息类型
|
|
||||||
*/
|
|
||||||
private String messageType;
|
|
||||||
/**
|
|
||||||
* 消息内容
|
|
||||||
*/
|
|
||||||
private String messageContent;
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,28 +0,0 @@
|
|||||||
package com.tashow.cloud.websocket.core.sender.kafka;
|
|
||||||
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
|
||||||
import org.springframework.kafka.annotation.KafkaListener;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link KafkaWebSocketMessage} 广播消息的消费者,真正把消息发送出去
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class KafkaWebSocketMessageConsumer {
|
|
||||||
|
|
||||||
private final KafkaWebSocketMessageSender kafkaWebSocketMessageSender;
|
|
||||||
|
|
||||||
@RabbitHandler
|
|
||||||
@KafkaListener(
|
|
||||||
topics = "${yudao.websocket.sender-kafka.topic}",
|
|
||||||
// 在 Group 上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Group 不同,以达到广播消费的目的
|
|
||||||
groupId = "${yudao.websocket.sender-kafka.consumer-group}" + "-" + "#{T(java.util.UUID).randomUUID()}")
|
|
||||||
public void onMessage(KafkaWebSocketMessage message) {
|
|
||||||
kafkaWebSocketMessageSender.send(message.getSessionId(),
|
|
||||||
message.getUserType(), message.getUserId(),
|
|
||||||
message.getMessageType(), message.getMessageContent());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,67 +0,0 @@
|
|||||||
package com.tashow.cloud.websocket.core.sender.kafka;
|
|
||||||
|
|
||||||
import com.tashow.cloud.websocket.core.sender.AbstractWebSocketMessageSender;
|
|
||||||
import com.tashow.cloud.websocket.core.sender.WebSocketMessageSender;
|
|
||||||
import com.tashow.cloud.websocket.core.session.WebSocketSessionManager;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
|
||||||
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 基于 Kafka 的 {@link WebSocketMessageSender} 实现类
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender {
|
|
||||||
|
|
||||||
private final KafkaTemplate<Object, Object> kafkaTemplate;
|
|
||||||
|
|
||||||
private final String topic;
|
|
||||||
|
|
||||||
public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager,
|
|
||||||
KafkaTemplate<Object, Object> kafkaTemplate,
|
|
||||||
String topic) {
|
|
||||||
super(sessionManager);
|
|
||||||
this.kafkaTemplate = kafkaTemplate;
|
|
||||||
this.topic = topic;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(Integer userType, Long userId, String messageType, String messageContent) {
|
|
||||||
sendKafkaMessage(null, userId, userType, messageType, messageContent);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(Integer userType, String messageType, String messageContent) {
|
|
||||||
sendKafkaMessage(null, null, userType, messageType, messageContent);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(String sessionId, String messageType, String messageContent) {
|
|
||||||
sendKafkaMessage(sessionId, null, null, messageType, messageContent);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 通过 Kafka 广播消息
|
|
||||||
*
|
|
||||||
* @param sessionId Session 编号
|
|
||||||
* @param userId 用户编号
|
|
||||||
* @param userType 用户类型
|
|
||||||
* @param messageType 消息类型
|
|
||||||
* @param messageContent 消息内容
|
|
||||||
*/
|
|
||||||
private void sendKafkaMessage(String sessionId, Long userId, Integer userType,
|
|
||||||
String messageType, String messageContent) {
|
|
||||||
KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage()
|
|
||||||
.setSessionId(sessionId).setUserId(userId).setUserType(userType)
|
|
||||||
.setMessageType(messageType).setMessageContent(messageContent);
|
|
||||||
try {
|
|
||||||
kafkaTemplate.send(topic, mqMessage).get();
|
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
|
||||||
log.error("[sendKafkaMessage][发送消息({}) 到 Kafka 失败]", mqMessage, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -13,12 +13,12 @@ import org.springframework.amqp.rabbit.annotation.*;
|
|||||||
bindings = @QueueBinding(
|
bindings = @QueueBinding(
|
||||||
value = @Queue(
|
value = @Queue(
|
||||||
// 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的
|
// 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的
|
||||||
name = "${yudao.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}",
|
name = "${tashow.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}",
|
||||||
// Consumer 关闭时,该队列就可以被自动删除了
|
// Consumer 关闭时,该队列就可以被自动删除了
|
||||||
autoDelete = "true"
|
autoDelete = "true"
|
||||||
),
|
),
|
||||||
exchange = @Exchange(
|
exchange = @Exchange(
|
||||||
name = "${yudao.websocket.sender-rabbitmq.exchange}",
|
name = "${tashow.websocket.sender-rabbitmq.exchange}",
|
||||||
type = ExchangeTypes.TOPIC,
|
type = ExchangeTypes.TOPIC,
|
||||||
declare = "false"
|
declare = "false"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,34 +0,0 @@
|
|||||||
package com.tashow.cloud.websocket.core.sender.redis;
|
|
||||||
|
|
||||||
import com.tashow.cloud.mq.redis.core.pubsub.AbstractRedisChannelMessage;
|
|
||||||
import lombok.Data;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Redis 广播 WebSocket 的消息
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
public class RedisWebSocketMessage extends AbstractRedisChannelMessage {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Session 编号
|
|
||||||
*/
|
|
||||||
private String sessionId;
|
|
||||||
/**
|
|
||||||
* 用户类型
|
|
||||||
*/
|
|
||||||
private Integer userType;
|
|
||||||
/**
|
|
||||||
* 用户编号
|
|
||||||
*/
|
|
||||||
private Long userId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 消息类型
|
|
||||||
*/
|
|
||||||
private String messageType;
|
|
||||||
/**
|
|
||||||
* 消息内容
|
|
||||||
*/
|
|
||||||
private String messageContent;
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,23 +0,0 @@
|
|||||||
package com.tashow.cloud.websocket.core.sender.redis;
|
|
||||||
|
|
||||||
import com.tashow.cloud.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link RedisWebSocketMessage} 广播消息的消费者,真正把消息发送出去
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class RedisWebSocketMessageConsumer extends AbstractRedisChannelMessageListener<RedisWebSocketMessage> {
|
|
||||||
|
|
||||||
private final RedisWebSocketMessageSender redisWebSocketMessageSender;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onMessage(RedisWebSocketMessage message) {
|
|
||||||
redisWebSocketMessageSender.send(message.getSessionId(),
|
|
||||||
message.getUserType(), message.getUserId(),
|
|
||||||
message.getMessageType(), message.getMessageContent());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,57 +0,0 @@
|
|||||||
package com.tashow.cloud.websocket.core.sender.redis;
|
|
||||||
|
|
||||||
import com.tashow.cloud.mq.redis.core.RedisMQTemplate;
|
|
||||||
import com.tashow.cloud.websocket.core.sender.AbstractWebSocketMessageSender;
|
|
||||||
import com.tashow.cloud.websocket.core.sender.WebSocketMessageSender;
|
|
||||||
import com.tashow.cloud.websocket.core.session.WebSocketSessionManager;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 基于 Redis 的 {@link WebSocketMessageSender} 实现类
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
public class RedisWebSocketMessageSender extends AbstractWebSocketMessageSender {
|
|
||||||
|
|
||||||
private final RedisMQTemplate redisMQTemplate;
|
|
||||||
|
|
||||||
public RedisWebSocketMessageSender(WebSocketSessionManager sessionManager,
|
|
||||||
RedisMQTemplate redisMQTemplate) {
|
|
||||||
super(sessionManager);
|
|
||||||
this.redisMQTemplate = redisMQTemplate;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(Integer userType, Long userId, String messageType, String messageContent) {
|
|
||||||
sendRedisMessage(null, userId, userType, messageType, messageContent);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(Integer userType, String messageType, String messageContent) {
|
|
||||||
sendRedisMessage(null, null, userType, messageType, messageContent);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(String sessionId, String messageType, String messageContent) {
|
|
||||||
sendRedisMessage(sessionId, null, null, messageType, messageContent);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 通过 Redis 广播消息
|
|
||||||
*
|
|
||||||
* @param sessionId Session 编号
|
|
||||||
* @param userId 用户编号
|
|
||||||
* @param userType 用户类型
|
|
||||||
* @param messageType 消息类型
|
|
||||||
* @param messageContent 消息内容
|
|
||||||
*/
|
|
||||||
private void sendRedisMessage(String sessionId, Long userId, Integer userType,
|
|
||||||
String messageType, String messageContent) {
|
|
||||||
RedisWebSocketMessage mqMessage = new RedisWebSocketMessage()
|
|
||||||
.setSessionId(sessionId).setUserId(userId).setUserType(userType)
|
|
||||||
.setMessageType(messageType).setMessageContent(messageContent);
|
|
||||||
redisMQTemplate.send(mqMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,35 +0,0 @@
|
|||||||
package com.tashow.cloud.websocket.core.sender.rocketmq;
|
|
||||||
|
|
||||||
import lombok.Data;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* RocketMQ 广播 WebSocket 的消息
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
public class RocketMQWebSocketMessage {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Session 编号
|
|
||||||
*/
|
|
||||||
private String sessionId;
|
|
||||||
/**
|
|
||||||
* 用户类型
|
|
||||||
*/
|
|
||||||
private Integer userType;
|
|
||||||
/**
|
|
||||||
* 用户编号
|
|
||||||
*/
|
|
||||||
private Long userId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 消息类型
|
|
||||||
*/
|
|
||||||
private String messageType;
|
|
||||||
/**
|
|
||||||
* 消息内容
|
|
||||||
*/
|
|
||||||
private String messageContent;
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,30 +0,0 @@
|
|||||||
package com.tashow.cloud.websocket.core.sender.rocketmq;
|
|
||||||
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import org.apache.rocketmq.spring.annotation.MessageModel;
|
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link RocketMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@RocketMQMessageListener( // 重点:添加 @RocketMQMessageListener 注解,声明消费的 topic
|
|
||||||
topic = "${yudao.websocket.sender-rocketmq.topic}",
|
|
||||||
consumerGroup = "${yudao.websocket.sender-rocketmq.consumer-group}",
|
|
||||||
messageModel = MessageModel.BROADCASTING // 设置为广播模式,保证每个实例都能收到消息
|
|
||||||
)
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class RocketMQWebSocketMessageConsumer implements RocketMQListener<RocketMQWebSocketMessage> {
|
|
||||||
|
|
||||||
private final RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onMessage(RocketMQWebSocketMessage message) {
|
|
||||||
rocketMQWebSocketMessageSender.send(message.getSessionId(),
|
|
||||||
message.getUserType(), message.getUserId(),
|
|
||||||
message.getMessageType(), message.getMessageContent());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,64 +0,0 @@
|
|||||||
package com.tashow.cloud.websocket.core.sender.rocketmq;
|
|
||||||
|
|
||||||
import com.tashow.cloud.websocket.core.sender.AbstractWebSocketMessageSender;
|
|
||||||
import com.tashow.cloud.websocket.core.sender.WebSocketMessageSender;
|
|
||||||
import com.tashow.cloud.websocket.core.session.WebSocketSessionManager;
|
|
||||||
import com.tashow.cloud.websocket.core.sender.AbstractWebSocketMessageSender;
|
|
||||||
import com.tashow.cloud.websocket.core.sender.WebSocketMessageSender;
|
|
||||||
import com.tashow.cloud.websocket.core.session.WebSocketSessionManager;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 基于 RocketMQ 的 {@link WebSocketMessageSender} 实现类
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
public class RocketMQWebSocketMessageSender extends AbstractWebSocketMessageSender {
|
|
||||||
|
|
||||||
private final RocketMQTemplate rocketMQTemplate;
|
|
||||||
|
|
||||||
private final String topic;
|
|
||||||
|
|
||||||
public RocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager,
|
|
||||||
RocketMQTemplate rocketMQTemplate,
|
|
||||||
String topic) {
|
|
||||||
super(sessionManager);
|
|
||||||
this.rocketMQTemplate = rocketMQTemplate;
|
|
||||||
this.topic = topic;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(Integer userType, Long userId, String messageType, String messageContent) {
|
|
||||||
sendRocketMQMessage(null, userId, userType, messageType, messageContent);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(Integer userType, String messageType, String messageContent) {
|
|
||||||
sendRocketMQMessage(null, null, userType, messageType, messageContent);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(String sessionId, String messageType, String messageContent) {
|
|
||||||
sendRocketMQMessage(sessionId, null, null, messageType, messageContent);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 通过 RocketMQ 广播消息
|
|
||||||
*
|
|
||||||
* @param sessionId Session 编号
|
|
||||||
* @param userId 用户编号
|
|
||||||
* @param userType 用户类型
|
|
||||||
* @param messageType 消息类型
|
|
||||||
* @param messageContent 消息内容
|
|
||||||
*/
|
|
||||||
private void sendRocketMQMessage(String sessionId, Long userId, Integer userType,
|
|
||||||
String messageType, String messageContent) {
|
|
||||||
RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage()
|
|
||||||
.setSessionId(sessionId).setUserId(userId).setUserType(userType)
|
|
||||||
.setMessageType(messageType).setMessageContent(messageContent);
|
|
||||||
rocketMQTemplate.syncSend(topic, mqMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -13,6 +13,7 @@
|
|||||||
<modules>
|
<modules>
|
||||||
<module>tashow-module-system</module>
|
<module>tashow-module-system</module>
|
||||||
<module>tashow-module-infra</module>
|
<module>tashow-module-infra</module>
|
||||||
|
<module>tashow-module-app</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
|||||||
32
tashow-module/tashow-module-app/pom.xml
Normal file
32
tashow-module/tashow-module-app/pom.xml
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>com.tashow.cloud</groupId>
|
||||||
|
<artifactId>tashow-module</artifactId>
|
||||||
|
<version>${revision}</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<artifactId>tashow-module-app</artifactId>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<!-- Spring Cloud 基础 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.tashow.cloud</groupId>
|
||||||
|
<artifactId>tashow-framework-env</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.tashow.cloud</groupId>
|
||||||
|
<artifactId>tashow-framework-websocket</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.tashow.cloud</groupId>
|
||||||
|
<artifactId>tashow-data-redis</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.tashow.cloud</groupId>
|
||||||
|
<artifactId>tashow-framework-security</artifactId>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
package com.tashow.cloud.app;
|
||||||
|
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hello world!
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@SpringBootApplication
|
||||||
|
public class AppServerApplication {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
SpringApplication.run(AppServerApplication.class, args);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,4 @@
|
|||||||
|
package com.tashow.cloud.app.controller;
|
||||||
|
|
||||||
|
public class LoginController {
|
||||||
|
}
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
package com.tashow.cloud.app.dal.dataobject;
|
||||||
|
// 数据库对象
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
package com.tashow.cloud.app.dal.dto;
|
||||||
|
// 视图层与业务层传输对象
|
||||||
@@ -0,0 +1 @@
|
|||||||
|
package com.tashow.cloud.app.dal;
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
package com.tashow.cloud.app.dal.vo;
|
||||||
|
// 视图参数接收
|
||||||
@@ -0,0 +1,50 @@
|
|||||||
|
package com.tashow.cloud.app.security.config;
|
||||||
|
|
||||||
|
import com.tashow.cloud.infraapi.enums.ApiConstants;
|
||||||
|
import com.tashow.cloud.security.security.config.AuthorizeRequestsCustomizer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
|
||||||
|
import org.springframework.security.config.annotation.web.configurers.AuthorizeHttpRequestsConfigurer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Infra 模块的 Security 配置
|
||||||
|
*/
|
||||||
|
@Configuration(proxyBeanMethods = false, value = "infraSecurityConfiguration")
|
||||||
|
public class SecurityConfiguration {
|
||||||
|
|
||||||
|
@Value("${spring.boot.admin.context-path:''}")
|
||||||
|
private String adminSeverContextPath;
|
||||||
|
|
||||||
|
@Bean("infraAuthorizeRequestsCustomizer")
|
||||||
|
public AuthorizeRequestsCustomizer authorizeRequestsCustomizer() {
|
||||||
|
return new AuthorizeRequestsCustomizer() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void customize(AuthorizeHttpRequestsConfigurer<HttpSecurity>.AuthorizationManagerRequestMatcherRegistry registry) {
|
||||||
|
// Swagger 接口文档
|
||||||
|
registry.requestMatchers("/v3/api-docs/**").permitAll()
|
||||||
|
.requestMatchers("/webjars/**").permitAll()
|
||||||
|
.requestMatchers("/swagger-ui").permitAll()
|
||||||
|
.requestMatchers("/swagger-ui/**").permitAll();
|
||||||
|
// Spring Boot Actuator 的安全配置
|
||||||
|
registry.requestMatchers("/actuator").permitAll()
|
||||||
|
.requestMatchers("/actuator/**").permitAll();
|
||||||
|
// Druid 监控
|
||||||
|
registry.requestMatchers("/druid/**").permitAll();
|
||||||
|
// Spring Boot Admin Server 的安全配置
|
||||||
|
registry.requestMatchers(adminSeverContextPath).permitAll()
|
||||||
|
.requestMatchers(adminSeverContextPath + "/**").permitAll();
|
||||||
|
// 文件读取
|
||||||
|
registry.requestMatchers(buildAdminApi("/infra/file/*/get/**")).permitAll();
|
||||||
|
|
||||||
|
// TODO 芋艿:这个每个项目都需要重复配置,得捉摸有没通用的方案
|
||||||
|
// RPC 服务的安全配置
|
||||||
|
registry.requestMatchers(ApiConstants.PREFIX + "/**").permitAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,4 @@
|
|||||||
|
/**
|
||||||
|
* 占位
|
||||||
|
*/
|
||||||
|
package com.tashow.cloud.app.security.core;
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
--- #################### 注册中心 + 配置中心相关配置 ####################
|
||||||
|
|
||||||
|
spring:
|
||||||
|
cloud:
|
||||||
|
nacos:
|
||||||
|
server-addr: 43.139.42.137:8848 # Nacos 服务器地址
|
||||||
|
username: nacos # Nacos 账号
|
||||||
|
password: nacos # Nacos 密码
|
||||||
|
discovery: # 【配置中心】配置项
|
||||||
|
namespace: dev # 命名空间。这里使用 dev 开发环境
|
||||||
|
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
|
||||||
|
metadata:
|
||||||
|
version: 1.0.0 # 服务实例的版本号,可用于灰度发布
|
||||||
|
config: # 【注册中心】配置项
|
||||||
|
namespace: dev # 命名空间。这里使用 dev 开发环境
|
||||||
|
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
server:
|
||||||
|
port: 48083
|
||||||
|
spring:
|
||||||
|
application:
|
||||||
|
name: app-server
|
||||||
|
profiles:
|
||||||
|
active: local
|
||||||
|
config:
|
||||||
|
import:
|
||||||
|
- optional:classpath:application-${spring.profiles.active}.yaml # 加载【本地】配置
|
||||||
|
- optional:nacos:application.yaml # 加载【Nacos】的配置
|
||||||
|
- optional:nacos:${spring.application.name}-${spring.profiles.active}.yaml # 加载【Nacos】的配置
|
||||||
Reference in New Issue
Block a user