diff --git a/tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/CacheAutoConfiguration.java b/tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/TashowCacheAutoConfiguration.java similarity index 96% rename from tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/CacheAutoConfiguration.java rename to tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/TashowCacheAutoConfiguration.java index dcb0e72..37c80f6 100644 --- a/tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/CacheAutoConfiguration.java +++ b/tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/TashowCacheAutoConfiguration.java @@ -19,7 +19,7 @@ import org.springframework.util.StringUtils; import java.util.Objects; -import static com.tashow.cloud.redis.config.RedisAutoConfiguration.buildRedisSerializer; +import static com.tashow.cloud.redis.config.TashowRedisAutoConfiguration.buildRedisSerializer; /** @@ -28,7 +28,7 @@ import static com.tashow.cloud.redis.config.RedisAutoConfiguration.buildRedisSer @AutoConfiguration @EnableConfigurationProperties({CacheProperties.class, TashowCacheProperties.class}) @EnableCaching -public class CacheAutoConfiguration { +public class TashowCacheAutoConfiguration { /** * RedisCacheConfiguration Bean diff --git a/tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/RedisAutoConfiguration.java b/tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/TashowRedisAutoConfiguration.java similarity index 97% rename from tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/RedisAutoConfiguration.java rename to tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/TashowRedisAutoConfiguration.java index 39a9ee7..116c615 100644 --- a/tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/RedisAutoConfiguration.java +++ b/tashow-framework/tashow-data-redis/src/main/java/com/tashow/cloud/redis/config/TashowRedisAutoConfiguration.java @@ -14,7 +14,7 @@ import org.springframework.data.redis.serializer.RedisSerializer; * Redis 配置类 */ @AutoConfiguration(before = RedissonAutoConfigurationV2.class) // 目的:使用自己定义的 RedisTemplate Bean -public class RedisAutoConfiguration { +public class TashowRedisAutoConfiguration { /** * 创建 RedisTemplate Bean,使用 JSON 序列化方式 diff --git a/tashow-framework/tashow-data-redis/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/tashow-framework/tashow-data-redis/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index bcaa8a2..3b32e89 100644 --- a/tashow-framework/tashow-data-redis/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/tashow-framework/tashow-data-redis/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1,2 @@ -com.tashow.cloud.redis.config.RedisAutoConfiguration -com.tashow.cloud.redis.config.CacheAutoConfiguration +com.tashow.cloud.redis.config.TashowRedisAutoConfiguration +com.tashow.cloud.redis.config.TashowCacheAutoConfiguration diff --git a/tashow-framework/tashow-framework-mq/pom.xml b/tashow-framework/tashow-framework-mq/pom.xml index 41a37c1..11b483f 100644 --- a/tashow-framework/tashow-framework-mq/pom.xml +++ b/tashow-framework/tashow-framework-mq/pom.xml @@ -21,22 +21,11 @@ tashow-data-redis - - - org.springframework.kafka - spring-kafka - true - org.springframework.amqp spring-rabbit true - - org.apache.rocketmq - rocketmq-spring-boot-starter - true - diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/package-info.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/package-info.java new file mode 100644 index 0000000..356754a --- /dev/null +++ b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/package-info.java @@ -0,0 +1 @@ +package com.tashow.cloud.mq; \ No newline at end of file diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/config/RedisMQConsumerAutoConfiguration.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/config/RedisMQConsumerAutoConfiguration.java deleted file mode 100644 index c26f4d2..0000000 --- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/config/RedisMQConsumerAutoConfiguration.java +++ /dev/null @@ -1,151 +0,0 @@ -package com.tashow.cloud.mq.redis.config; - -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.system.SystemUtil; -import com.tashow.cloud.common.enums.DocumentEnum; -import com.tashow.cloud.mq.redis.core.RedisMQTemplate; -import com.tashow.cloud.mq.redis.core.job.RedisPendingMessageResendJob; -import com.tashow.cloud.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; -import com.tashow.cloud.mq.redis.core.stream.AbstractRedisStreamMessageListener; -import com.tashow.cloud.redis.config.RedisAutoConfiguration; -import lombok.extern.slf4j.Slf4j; -import org.redisson.api.RedissonClient; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.context.annotation.Bean; -import org.springframework.data.redis.connection.RedisServerCommands; -import org.springframework.data.redis.connection.stream.Consumer; -import org.springframework.data.redis.connection.stream.ObjectRecord; -import org.springframework.data.redis.connection.stream.ReadOffset; -import org.springframework.data.redis.connection.stream.StreamOffset; -import org.springframework.data.redis.core.RedisCallback; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.listener.ChannelTopic; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.data.redis.stream.StreamMessageListenerContainer; -import org.springframework.scheduling.annotation.EnableScheduling; - -import java.util.List; -import java.util.Properties; - -/** - * Redis 消息队列 Consumer 配置类 - * - * @author 芋道源码 - */ -@Slf4j -@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息 -@AutoConfiguration(after = RedisAutoConfiguration.class) -public class RedisMQConsumerAutoConfiguration { - - /** - * 创建 Redis Pub/Sub 广播消费的容器 - */ - @Bean - @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听 - public RedisMessageListenerContainer redisMessageListenerContainer( - RedisMQTemplate redisMQTemplate, List> listeners) { - // 创建 RedisMessageListenerContainer 对象 - RedisMessageListenerContainer container = new RedisMessageListenerContainer(); - // 设置 RedisConnection 工厂。 - container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory()); - // 添加监听器 - listeners.forEach(listener -> { - listener.setRedisMQTemplate(redisMQTemplate); - container.addMessageListener(listener, new ChannelTopic(listener.getChannel())); - log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", - listener.getChannel(), listener.getClass().getName()); - }); - return container; - } - - /** - * 创建 Redis Stream 重新消费的任务 - */ - @Bean - @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 - public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners, - RedisMQTemplate redisTemplate, - @Value("${spring.application.name}") String groupName, - RedissonClient redissonClient) { - return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient); - } - - /** - * 创建 Redis Stream 集群消费的容器 - * - * 基础知识:Redis Stream 的 xreadgroup 命令 - */ - @Bean(initMethod = "start", destroyMethod = "stop") - @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 - public StreamMessageListenerContainer> redisStreamMessageListenerContainer( - RedisMQTemplate redisMQTemplate, List> listeners) { - RedisTemplate redisTemplate = redisMQTemplate.getRedisTemplate(); - checkRedisVersion(redisTemplate); - // 第一步,创建 StreamMessageListenerContainer 容器 - // 创建 options 配置 - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions = - StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() - .batchSize(10) // 一次性最多拉取多少条消息 - .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化 - .build(); - // 创建 container 对象 - StreamMessageListenerContainer> container = - StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions); - - // 第二步,注册监听器,消费对应的 Stream 主题 - String consumerName = buildConsumerName(); - listeners.parallelStream().forEach(listener -> { - log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]", - listener.getStreamKey(), listener.getClass().getName()); - // 创建 listener 对应的消费者分组 - try { - redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); - } catch (Exception ignore) { - } - // 设置 listener 对应的 redisTemplate - listener.setRedisMQTemplate(redisMQTemplate); - // 创建 Consumer 对象 - Consumer consumer = Consumer.from(listener.getGroup(), consumerName); - // 设置 Consumer 消费进度,以最小消费进度为准 - StreamOffset streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()); - // 设置 Consumer 监听 - StreamMessageListenerContainer.StreamReadRequestBuilder builder = StreamMessageListenerContainer.StreamReadRequest - .builder(streamOffset).consumer(consumer) - .autoAcknowledge(false) // 不自动 ack - .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false - container.register(builder.build(), listener); - log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]", - listener.getStreamKey(), listener.getClass().getName()); - }); - return container; - } - - /** - * 构建消费者名字,使用本地 IP + 进程编号的方式。 - * 参考自 RocketMQ clientId 的实现 - * - * @return 消费者名字 - */ - private static String buildConsumerName() { - return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID()); - } - - /** - * 校验 Redis 版本号,是否满足最低的版本号要求! - */ - private static void checkRedisVersion(RedisTemplate redisTemplate) { - // 获得 Redis 版本 - Properties info = redisTemplate.execute((RedisCallback) RedisServerCommands::info); - String version = MapUtil.getStr(info, "redis_version"); - // 校验最低版本必须大于等于 5.0.0 - int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false)); - if (majorVersion < 5) { - throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" + - "请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl())); - } - } - -} diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/config/RedisMQProducerAutoConfiguration.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/config/RedisMQProducerAutoConfiguration.java deleted file mode 100644 index 0b2a62a..0000000 --- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/config/RedisMQProducerAutoConfiguration.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.tashow.cloud.mq.redis.config; - -import com.tashow.cloud.mq.redis.core.RedisMQTemplate; -import com.tashow.cloud.mq.redis.core.interceptor.RedisMessageInterceptor; -import com.tashow.cloud.redis.config.RedisAutoConfiguration; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.data.redis.core.StringRedisTemplate; - -import java.util.List; - -/** - * Redis 消息队列 Producer 配置类 - * - * @author 芋道源码 - */ -@Slf4j -@AutoConfiguration(after = RedisAutoConfiguration.class) -public class RedisMQProducerAutoConfiguration { - - @Bean - public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate, - List interceptors) { - RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate); - // 添加拦截器 - interceptors.forEach(redisMQTemplate::addInterceptor); - return redisMQTemplate; - } - -} diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/RedisMQTemplate.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/RedisMQTemplate.java deleted file mode 100644 index 6d79c41..0000000 --- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/RedisMQTemplate.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.tashow.cloud.mq.redis.core; - -import com.tashow.cloud.common.util.json.JsonUtils; -import com.tashow.cloud.mq.redis.core.interceptor.RedisMessageInterceptor; -import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage; -import com.tashow.cloud.mq.redis.core.pubsub.AbstractRedisChannelMessage; -import com.tashow.cloud.mq.redis.core.stream.AbstractRedisStreamMessage; -import lombok.AllArgsConstructor; -import lombok.Getter; -import org.springframework.data.redis.connection.stream.RecordId; -import org.springframework.data.redis.connection.stream.StreamRecords; -import org.springframework.data.redis.core.RedisTemplate; - -import java.util.ArrayList; -import java.util.List; - -/** - * Redis MQ 操作模板类 - * - * @author 芋道源码 - */ -@AllArgsConstructor -public class RedisMQTemplate { - - @Getter - private final RedisTemplate redisTemplate; - /** - * 拦截器数组 - */ - @Getter - private final List interceptors = new ArrayList<>(); - - /** - * 发送 Redis 消息,基于 Redis pub/sub 实现 - * - * @param message 消息 - */ - public void send(T message) { - try { - sendMessageBefore(message); - // 发送消息 - redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message)); - } finally { - sendMessageAfter(message); - } - } - - /** - * 发送 Redis 消息,基于 Redis Stream 实现 - * - * @param message 消息 - * @return 消息记录的编号对象 - */ - public RecordId send(T message) { - try { - sendMessageBefore(message); - // 发送消息 - return redisTemplate.opsForStream().add(StreamRecords.newRecord() - .ofObject(JsonUtils.toJsonString(message)) // 设置内容 - .withStreamKey(message.getStreamKey())); // 设置 stream key - } finally { - sendMessageAfter(message); - } - } - - /** - * 添加拦截器 - * - * @param interceptor 拦截器 - */ - public void addInterceptor(RedisMessageInterceptor interceptor) { - interceptors.add(interceptor); - } - - private void sendMessageBefore(AbstractRedisMessage message) { - // 正序 - interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message)); - } - - private void sendMessageAfter(AbstractRedisMessage message) { - // 倒序 - for (int i = interceptors.size() - 1; i >= 0; i--) { - interceptors.get(i).sendMessageAfter(message); - } - } - -} diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/interceptor/RedisMessageInterceptor.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/interceptor/RedisMessageInterceptor.java deleted file mode 100644 index 3cea3ff..0000000 --- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/interceptor/RedisMessageInterceptor.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.tashow.cloud.mq.redis.core.interceptor; - -import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage; - -/** - * {@link AbstractRedisMessage} 消息拦截器 - * 通过拦截器,作为插件机制,实现拓展。 - * 例如说,多租户场景下的 MQ 消息处理 - * - * @author 芋道源码 - */ -public interface RedisMessageInterceptor { - - default void sendMessageBefore(AbstractRedisMessage message) { - } - - default void sendMessageAfter(AbstractRedisMessage message) { - } - - default void consumeMessageBefore(AbstractRedisMessage message) { - } - - default void consumeMessageAfter(AbstractRedisMessage message) { - } - -} diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/job/RedisPendingMessageResendJob.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/job/RedisPendingMessageResendJob.java deleted file mode 100644 index 7ddd81d..0000000 --- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/job/RedisPendingMessageResendJob.java +++ /dev/null @@ -1,100 +0,0 @@ -package com.tashow.cloud.mq.redis.core.job; - -import cn.hutool.core.collection.CollUtil; -import com.tashow.cloud.mq.redis.core.RedisMQTemplate; -import com.tashow.cloud.mq.redis.core.stream.AbstractRedisStreamMessageListener; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.redisson.api.RLock; -import org.redisson.api.RedissonClient; -import org.springframework.data.domain.Range; -import org.springframework.data.redis.connection.stream.*; -import org.springframework.data.redis.core.StreamOperations; -import org.springframework.scheduling.annotation.Scheduled; - -import java.util.List; -import java.util.Map; -import java.util.Objects; - -/** - * 这个任务用于处理,crash 之后的消费者未消费完的消息 - */ -@Slf4j -@AllArgsConstructor -public class RedisPendingMessageResendJob { - - private static final String LOCK_KEY = "redis:pending:msg:lock"; - - /** - * 消息超时时间,默认 5 分钟 - * - * 1. 超时的消息才会被重新投递 - * 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息5分钟过期后,再等 1 分钟才会被扫瞄到 - */ - private static final int EXPIRE_TIME = 5 * 60; - - private final List> listeners; - private final RedisMQTemplate redisTemplate; - private final String groupName; - private final RedissonClient redissonClient; - - /** - * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题 - */ - @Scheduled(cron = "35 * * * * ?") - public void messageResend() { - RLock lock = redissonClient.getLock(LOCK_KEY); - // 尝试加锁 - if (lock.tryLock()) { - try { - execute(); - } catch (Exception ex) { - log.error("[messageResend][执行异常]", ex); - } finally { - lock.unlock(); - } - } - } - - /** - * 执行清理逻辑 - * - * @see 讨论 - */ - private void execute() { - StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); - listeners.forEach(listener -> { - PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName)); - // 每个消费者的 pending 队列消息数量 - Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); - pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> { - log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount); - // 每个消费者的 pending消息的详情信息 - PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount); - if (pendingMessages.isEmpty()) { - return; - } - pendingMessages.forEach(pendingMessage -> { - // 获取消息上一次传递到 consumer 的时间, - long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds(); - if (lastDelivery < EXPIRE_TIME){ - return; - } - // 获取指定 id 的消息体 - List> records = ops.range(listener.getStreamKey(), - Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString()))); - if (CollUtil.isEmpty(records)) { - return; - } - // 重新投递消息 - redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() - .ofObject(records.get(0).getValue()) // 设置内容 - .withStreamKey(listener.getStreamKey())); - // ack 消息消费完成 - redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0)); - log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId()); - }); - }); - }); - } -} diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/message/AbstractRedisMessage.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/message/AbstractRedisMessage.java deleted file mode 100644 index 546934a..0000000 --- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/message/AbstractRedisMessage.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.tashow.cloud.mq.redis.core.message; - -import lombok.Data; - -import java.util.HashMap; -import java.util.Map; - -/** - * Redis 消息抽象基类 - * - * @author 芋道源码 - */ -@Data -public abstract class AbstractRedisMessage { - - /** - * 头 - */ - private Map headers = new HashMap<>(); - - public String getHeader(String key) { - return headers.get(key); - } - - public void addHeader(String key, String value) { - headers.put(key, value); - } - -} diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/pubsub/AbstractRedisChannelMessage.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/pubsub/AbstractRedisChannelMessage.java deleted file mode 100644 index 3a46d92..0000000 --- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/pubsub/AbstractRedisChannelMessage.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.tashow.cloud.mq.redis.core.pubsub; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage; - -/** - * Redis Channel Message 抽象类 - * - * @author 芋道源码 - */ -public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage { - - /** - * 获得 Redis Channel,默认使用类名 - * - * @return Channel - */ - @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。 - public String getChannel() { - return getClass().getSimpleName(); - } - -} diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java deleted file mode 100644 index b64a550..0000000 --- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java +++ /dev/null @@ -1,103 +0,0 @@ -package com.tashow.cloud.mq.redis.core.pubsub; - -import cn.hutool.core.util.TypeUtil; -import com.tashow.cloud.common.util.json.JsonUtils; -import com.tashow.cloud.mq.redis.core.RedisMQTemplate; -import com.tashow.cloud.mq.redis.core.interceptor.RedisMessageInterceptor; -import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage; -import lombok.Setter; -import lombok.SneakyThrows; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; - -import java.lang.reflect.Type; -import java.util.List; - -/** - * Redis Pub/Sub 监听器抽象类,用于实现广播消费 - * - * @param 消息类型。一定要填写噢,不然会报错 - * - * @author 芋道源码 - */ -public abstract class AbstractRedisChannelMessageListener implements MessageListener { - - /** - * 消息类型 - */ - private final Class messageType; - /** - * Redis Channel - */ - private final String channel; - /** - * RedisMQTemplate - */ - @Setter - private RedisMQTemplate redisMQTemplate; - - @SneakyThrows - protected AbstractRedisChannelMessageListener() { - this.messageType = getMessageClass(); - this.channel = messageType.getDeclaredConstructor().newInstance().getChannel(); - } - - /** - * 获得 Sub 订阅的 Redis Channel 通道 - * - * @return channel - */ - public final String getChannel() { - return channel; - } - - @Override - public final void onMessage(Message message, byte[] bytes) { - T messageObj = JsonUtils.parseObject(message.getBody(), messageType); - try { - consumeMessageBefore(messageObj); - // 消费消息 - this.onMessage(messageObj); - } finally { - consumeMessageAfter(messageObj); - } - } - - /** - * 处理消息 - * - * @param message 消息 - */ - public abstract void onMessage(T message); - - /** - * 通过解析类上的泛型,获得消息类型 - * - * @return 消息类型 - */ - @SuppressWarnings("unchecked") - private Class getMessageClass() { - Type type = TypeUtil.getTypeArgument(getClass(), 0); - if (type == null) { - throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); - } - return (Class) type; - } - - private void consumeMessageBefore(AbstractRedisMessage message) { - assert redisMQTemplate != null; - List interceptors = redisMQTemplate.getInterceptors(); - // 正序 - interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message)); - } - - private void consumeMessageAfter(AbstractRedisMessage message) { - assert redisMQTemplate != null; - List interceptors = redisMQTemplate.getInterceptors(); - // 倒序 - for (int i = interceptors.size() - 1; i >= 0; i--) { - interceptors.get(i).consumeMessageAfter(message); - } - } - -} diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/stream/AbstractRedisStreamMessage.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/stream/AbstractRedisStreamMessage.java deleted file mode 100644 index d254d84..0000000 --- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/stream/AbstractRedisStreamMessage.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.tashow.cloud.mq.redis.core.stream; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage; - -/** - * Redis Stream Message 抽象类 - * - * @author 芋道源码 - */ -public abstract class AbstractRedisStreamMessage extends AbstractRedisMessage { - - /** - * 获得 Redis Stream Key,默认使用类名 - * - * @return Channel - */ - @JsonIgnore // 避免序列化 - public String getStreamKey() { - return getClass().getSimpleName(); - } - -} diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/stream/AbstractRedisStreamMessageListener.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/stream/AbstractRedisStreamMessageListener.java deleted file mode 100644 index bf30a9b..0000000 --- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/core/stream/AbstractRedisStreamMessageListener.java +++ /dev/null @@ -1,113 +0,0 @@ -package com.tashow.cloud.mq.redis.core.stream; - -import cn.hutool.core.util.TypeUtil; -import com.tashow.cloud.common.util.json.JsonUtils; -import com.tashow.cloud.mq.redis.core.RedisMQTemplate; -import com.tashow.cloud.mq.redis.core.interceptor.RedisMessageInterceptor; -import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage; -import lombok.Getter; -import lombok.Setter; -import lombok.SneakyThrows; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.connection.stream.ObjectRecord; -import org.springframework.data.redis.stream.StreamListener; - -import java.lang.reflect.Type; -import java.util.List; - -/** - * Redis Stream 监听器抽象类,用于实现集群消费 - * - * @param 消息类型。一定要填写噢,不然会报错 - * - * @author 芋道源码 - */ -public abstract class AbstractRedisStreamMessageListener - implements StreamListener> { - - /** - * 消息类型 - */ - private final Class messageType; - /** - * Redis Channel - */ - @Getter - private final String streamKey; - - /** - * Redis 消费者分组,默认使用 spring.application.name 名字 - */ - @Value("${spring.application.name}") - @Getter - private String group; - /** - * RedisMQTemplate - */ - @Setter - private RedisMQTemplate redisMQTemplate; - - @SneakyThrows - protected AbstractRedisStreamMessageListener() { - this.messageType = getMessageClass(); - this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey(); - } - - @Override - public void onMessage(ObjectRecord message) { - // 消费消息 - T messageObj = JsonUtils.parseObject(message.getValue(), messageType); - try { - consumeMessageBefore(messageObj); - // 消费消息 - this.onMessage(messageObj); - // ack 消息消费完成 - redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message); - // TODO 芋艿:需要额外考虑以下几个点: - // 1. 处理异常的情况 - // 2. 发送日志;以及事务的结合 - // 3. 消费日志;以及通用的幂等性 - // 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638 - } finally { - consumeMessageAfter(messageObj); - } - } - - /** - * 处理消息 - * - * @param message 消息 - */ - public abstract void onMessage(T message); - - /** - * 通过解析类上的泛型,获得消息类型 - * - * @return 消息类型 - */ - @SuppressWarnings("unchecked") - private Class getMessageClass() { - Type type = TypeUtil.getTypeArgument(getClass(), 0); - if (type == null) { - throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); - } - return (Class) type; - } - - private void consumeMessageBefore(AbstractRedisMessage message) { - assert redisMQTemplate != null; - List interceptors = redisMQTemplate.getInterceptors(); - // 正序 - interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message)); - } - - private void consumeMessageAfter(AbstractRedisMessage message) { - assert redisMQTemplate != null; - List interceptors = redisMQTemplate.getInterceptors(); - // 倒序 - for (int i = interceptors.size() - 1; i >= 0; i--) { - interceptors.get(i).consumeMessageAfter(message); - } - } - -} diff --git a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/package-info.java b/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/package-info.java deleted file mode 100644 index cc39879..0000000 --- a/tashow-framework/tashow-framework-mq/src/main/java/com/tashow/cloud/mq/redis/package-info.java +++ /dev/null @@ -1,6 +0,0 @@ -/** - * 消息队列,基于 Redis 提供: - * 1. 基于 Pub/Sub 实现广播消费 - * 2. 基于 Stream 实现集群消费 - */ -package com.tashow.cloud.mq.redis; diff --git a/tashow-framework/tashow-framework-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/tashow-framework/tashow-framework-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index ea037a6..7d713e1 100644 --- a/tashow-framework/tashow-framework-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/tashow-framework/tashow-framework-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,3 +1 @@ -com.tashow.cloud.mq.redis.config.RedisMQProducerAutoConfiguration -com.tashow.cloud.mq.redis.config.RedisMQConsumerAutoConfiguration com.tashow.cloud.mq.rabbitmq.config.RabbitMQAutoConfiguration diff --git a/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/idempotent/config/IdempotentConfiguration.java b/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/idempotent/config/IdempotentConfiguration.java index 784bbad..67effc5 100644 --- a/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/idempotent/config/IdempotentConfiguration.java +++ b/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/idempotent/config/IdempotentConfiguration.java @@ -6,14 +6,14 @@ import com.tashow.cloud.protection.idempotent.core.aop.IdempotentAspect; import com.tashow.cloud.protection.idempotent.core.keyresolver.IdempotentKeyResolver; import com.tashow.cloud.protection.idempotent.core.keyresolver.impl.DefaultIdempotentKeyResolver; import com.tashow.cloud.protection.idempotent.core.redis.IdempotentRedisDAO; -import com.tashow.cloud.redis.config.RedisAutoConfiguration; +import com.tashow.cloud.redis.config.TashowRedisAutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.List; -@AutoConfiguration(after = RedisAutoConfiguration.class) +@AutoConfiguration(after = TashowRedisAutoConfiguration.class) public class IdempotentConfiguration { @Bean diff --git a/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/ratelimiter/config/RateLimiterConfiguration.java b/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/ratelimiter/config/RateLimiterConfiguration.java index 0ad87a8..b982bf4 100644 --- a/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/ratelimiter/config/RateLimiterConfiguration.java +++ b/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/ratelimiter/config/RateLimiterConfiguration.java @@ -3,7 +3,7 @@ package com.tashow.cloud.protection.ratelimiter.config; import com.tashow.cloud.protection.ratelimiter.core.aop.RateLimiterAspect; import com.tashow.cloud.protection.ratelimiter.core.keyresolver.RateLimiterKeyResolver; import com.tashow.cloud.protection.ratelimiter.core.keyresolver.impl.*; -import com.tashow.cloud.redis.config.RedisAutoConfiguration; +import com.tashow.cloud.redis.config.TashowRedisAutoConfiguration; import com.tashow.cloud.protection.ratelimiter.core.redis.RateLimiterRedisDAO; import org.redisson.api.RedissonClient; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -11,7 +11,7 @@ import org.springframework.context.annotation.Bean; import java.util.List; -@AutoConfiguration(after = RedisAutoConfiguration.class) +@AutoConfiguration(after = TashowRedisAutoConfiguration.class) public class RateLimiterConfiguration { @Bean diff --git a/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/signature/config/ApiSignatureAutoConfiguration.java b/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/signature/config/ApiSignatureAutoConfiguration.java index fcf1394..dbb4268 100644 --- a/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/signature/config/ApiSignatureAutoConfiguration.java +++ b/tashow-framework/tashow-framework-protection/src/main/java/com/tashow/cloud/protection/signature/config/ApiSignatureAutoConfiguration.java @@ -2,7 +2,7 @@ package com.tashow.cloud.protection.signature.config; import com.tashow.cloud.protection.signature.core.redis.ApiSignatureRedisDAO; import com.tashow.cloud.protection.signature.core.aop.ApiSignatureAspect; -import com.tashow.cloud.redis.config.RedisAutoConfiguration; +import com.tashow.cloud.redis.config.TashowRedisAutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.core.StringRedisTemplate; @@ -12,7 +12,7 @@ import org.springframework.data.redis.core.StringRedisTemplate; * * @author Zhougang */ -@AutoConfiguration(after = RedisAutoConfiguration.class) +@AutoConfiguration(after = TashowRedisAutoConfiguration.class) public class ApiSignatureAutoConfiguration { @Bean diff --git a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/config/TenantAutoConfiguration.java b/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/config/TenantAutoConfiguration.java index 2771086..6112d02 100644 --- a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/config/TenantAutoConfiguration.java +++ b/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/config/TenantAutoConfiguration.java @@ -1,17 +1,15 @@ package com.tashow.cloud.tenant.config; +import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; +import com.baomidou.mybatisplus.extension.plugins.inner.TenantLineInnerInterceptor; import com.tashow.cloud.common.enums.WebFilterOrderEnum; import com.tashow.cloud.mybatis.mybatis.core.util.MyBatisUtils; import com.tashow.cloud.redis.config.TashowCacheProperties; import com.tashow.cloud.systemapi.api.tenant.TenantApi; -import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; -import com.baomidou.mybatisplus.extension.plugins.inner.TenantLineInnerInterceptor; import com.tashow.cloud.tenant.core.aop.TenantIgnoreAspect; import com.tashow.cloud.tenant.core.db.TenantDatabaseInterceptor; import com.tashow.cloud.tenant.core.job.TenantJobAspect; import com.tashow.cloud.tenant.core.mq.rabbitmq.TenantRabbitMQInitializer; -import com.tashow.cloud.tenant.core.mq.redis.TenantRedisMessageInterceptor; -import com.tashow.cloud.tenant.core.mq.rocketmq.TenantRocketMQInitializer; import com.tashow.cloud.tenant.core.redis.TenantRedisCacheManager; import com.tashow.cloud.tenant.core.security.TenantSecurityWebFilter; import com.tashow.cloud.tenant.core.service.TenantFrameworkService; @@ -25,7 +23,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.redis.cache.BatchStrategies; import org.springframework.data.redis.cache.RedisCacheConfiguration; @@ -97,23 +94,6 @@ public class TenantAutoConfiguration { return new TenantJobAspect(tenantFrameworkService); } - // ========== MQ ========== - - /** - * 多租户 Redis 消息队列的配置类 - * - * 为什么要单独一个配置类呢?如果直接把 TenantRedisMessageInterceptor Bean 的初始化放外面,会报 RedisMessageInterceptor 类不存在的错误 - */ - @Configuration - @ConditionalOnClass(name = "com.tashow.cloud.mq.redis.core.RedisMQTemplate") - public static class TenantRedisMQAutoConfiguration { - - @Bean - public TenantRedisMessageInterceptor tenantRedisMessageInterceptor() { - return new TenantRedisMessageInterceptor(); - } - - } @Bean @ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate") @@ -121,12 +101,6 @@ public class TenantAutoConfiguration { return new TenantRabbitMQInitializer(); } - @Bean - @ConditionalOnClass(name = "org.apache.rocketmq.spring.core.RocketMQTemplate") - public TenantRocketMQInitializer tenantRocketMQInitializer() { - return new TenantRocketMQInitializer(); - } - // ========== Redis ========== @Bean diff --git a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/kafka/TenantKafkaEnvironmentPostProcessor.java b/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/kafka/TenantKafkaEnvironmentPostProcessor.java deleted file mode 100644 index 71896db..0000000 --- a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/kafka/TenantKafkaEnvironmentPostProcessor.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.tashow.cloud.tenant.core.mq.kafka; - -import cn.hutool.core.util.StrUtil; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.env.EnvironmentPostProcessor; -import org.springframework.core.env.ConfigurableEnvironment; - -/** - * 多租户的 Kafka 的 {@link EnvironmentPostProcessor} 实现类 - * - * Kafka Producer 发送消息时,增加 {@link TenantKafkaProducerInterceptor} 拦截器 - * - * @author 芋道源码 - */ -@Slf4j -public class TenantKafkaEnvironmentPostProcessor implements EnvironmentPostProcessor { - - private static final String PROPERTY_KEY_INTERCEPTOR_CLASSES = "spring.kafka.producer.properties.interceptor.classes"; - - @Override - public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { - // 添加 TenantKafkaProducerInterceptor 拦截器 - try { - String value = environment.getProperty(PROPERTY_KEY_INTERCEPTOR_CLASSES); - if (StrUtil.isEmpty(value)) { - value = TenantKafkaProducerInterceptor.class.getName(); - } else { - value += "," + TenantKafkaProducerInterceptor.class.getName(); - } - environment.getSystemProperties().put(PROPERTY_KEY_INTERCEPTOR_CLASSES, value); - } catch (NoClassDefFoundError ignore) { - // 如果触发 NoClassDefFoundError 异常,说明 TenantKafkaProducerInterceptor 类不存在,即没引入 kafka-spring 依赖 - } - } - -} diff --git a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/kafka/TenantKafkaProducerInterceptor.java b/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/kafka/TenantKafkaProducerInterceptor.java deleted file mode 100644 index b55e215..0000000 --- a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/kafka/TenantKafkaProducerInterceptor.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.tashow.cloud.tenant.core.mq.kafka; - -import cn.hutool.core.util.ReflectUtil; -import com.tashow.cloud.tenant.core.context.TenantContextHolder; -import org.apache.kafka.clients.producer.ProducerInterceptor; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.header.Headers; -import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; - -import java.util.Map; - -import static com.tashow.cloud.web.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; - - -/** - * Kafka 消息队列的多租户 {@link ProducerInterceptor} 实现类 - * - * 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中 - * 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现 - * - * @author 芋道源码 - */ -public class TenantKafkaProducerInterceptor implements ProducerInterceptor { - - @Override - public ProducerRecord onSend(ProducerRecord record) { - Long tenantId = TenantContextHolder.getTenantId(); - if (tenantId != null) { - Headers headers = (Headers) ReflectUtil.getFieldValue(record, "headers"); // private 属性,没有 get 方法,智能反射 - headers.add(HEADER_TENANT_ID, tenantId.toString().getBytes()); - } - return record; - } - - @Override - public void onAcknowledgement(RecordMetadata metadata, Exception exception) { - } - - @Override - public void close() { - } - - @Override - public void configure(Map configs) { - } - -} diff --git a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/package-info.java b/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/package-info.java new file mode 100644 index 0000000..3ba04e3 --- /dev/null +++ b/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/package-info.java @@ -0,0 +1 @@ +package com.tashow.cloud.tenant.core.mq; \ No newline at end of file diff --git a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/redis/TenantRedisMessageInterceptor.java b/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/redis/TenantRedisMessageInterceptor.java deleted file mode 100644 index e4f6e91..0000000 --- a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/redis/TenantRedisMessageInterceptor.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.tashow.cloud.tenant.core.mq.redis; - -import cn.hutool.core.util.StrUtil; -import com.tashow.cloud.mq.redis.core.interceptor.RedisMessageInterceptor; -import com.tashow.cloud.mq.redis.core.message.AbstractRedisMessage; -import com.tashow.cloud.tenant.core.context.TenantContextHolder; - -import static com.tashow.cloud.web.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; - - -/** - * 多租户 {@link AbstractRedisMessage} 拦截器 - * - * 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中 - * 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中 - * - * @author 芋道源码 - */ -public class TenantRedisMessageInterceptor implements RedisMessageInterceptor { - - @Override - public void sendMessageBefore(AbstractRedisMessage message) { - Long tenantId = TenantContextHolder.getTenantId(); - if (tenantId != null) { - message.addHeader(HEADER_TENANT_ID, tenantId.toString()); - } - } - - @Override - public void consumeMessageBefore(AbstractRedisMessage message) { - String tenantIdStr = message.getHeader(HEADER_TENANT_ID); - if (StrUtil.isNotEmpty(tenantIdStr)) { - TenantContextHolder.setTenantId(Long.valueOf(tenantIdStr)); - } - } - - @Override - public void consumeMessageAfter(AbstractRedisMessage message) { - // 注意,Consumer 是一个逻辑的入口,所以不考虑原本上下文就存在租户编号的情况 - TenantContextHolder.clear(); - } - -} diff --git a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java b/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java deleted file mode 100644 index 83b4f5b..0000000 --- a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.tashow.cloud.tenant.core.mq.rocketmq; - -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.StrUtil; -import com.tashow.cloud.tenant.core.context.TenantContextHolder; -import org.apache.rocketmq.client.hook.ConsumeMessageContext; -import org.apache.rocketmq.client.hook.ConsumeMessageHook; -import org.apache.rocketmq.common.message.MessageExt; -import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; - -import java.util.List; - -import static com.tashow.cloud.web.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; - - -/** - * RocketMQ 消息队列的多租户 {@link ConsumeMessageHook} 实现类 - * - * Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现 - * - * @author 芋道源码 - */ -public class TenantRocketMQConsumeMessageHook implements ConsumeMessageHook { - - @Override - public String hookName() { - return getClass().getSimpleName(); - } - - @Override - public void consumeMessageBefore(ConsumeMessageContext context) { - // 校验,消息必须是单条,不然设置租户可能不正确 - List messages = context.getMsgList(); - Assert.isTrue(messages.size() == 1, "消息条数({})不正确", messages.size()); - // 设置租户编号 - String tenantId = messages.get(0).getUserProperty(HEADER_TENANT_ID); - if (StrUtil.isNotEmpty(tenantId)) { - TenantContextHolder.setTenantId(Long.parseLong(tenantId)); - } - } - - @Override - public void consumeMessageAfter(ConsumeMessageContext context) { - TenantContextHolder.clear(); - } - -} diff --git a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java b/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java deleted file mode 100644 index 601ede5..0000000 --- a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.tashow.cloud.tenant.core.mq.rocketmq; - -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; -import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.config.BeanPostProcessor; - -/** - * 多租户的 RocketMQ 初始化器 - * - * @author 芋道源码 - */ -public class TenantRocketMQInitializer implements BeanPostProcessor { - - @Override - public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { - if (bean instanceof DefaultRocketMQListenerContainer) { - DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; - initTenantConsumer(container.getConsumer()); - } else if (bean instanceof RocketMQTemplate) { - RocketMQTemplate template = (RocketMQTemplate) bean; - initTenantProducer(template.getProducer()); - } - return bean; - } - - private void initTenantProducer(DefaultMQProducer producer) { - if (producer == null) { - return; - } - DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl(); - if (producerImpl == null) { - return; - } - producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook()); - } - - private void initTenantConsumer(DefaultMQPushConsumer consumer) { - if (consumer == null) { - return; - } - DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl(); - if (consumerImpl == null) { - return; - } - consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook()); - } - -} diff --git a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java b/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java deleted file mode 100644 index fc99bcb..0000000 --- a/tashow-framework/tashow-framework-tenant/src/main/java/com/tashow/cloud/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.tashow.cloud.tenant.core.mq.rocketmq; - -import com.tashow.cloud.tenant.core.context.TenantContextHolder; -import org.apache.rocketmq.client.hook.SendMessageContext; -import org.apache.rocketmq.client.hook.SendMessageHook; - -import static com.tashow.cloud.web.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; - - -/** - * RocketMQ 消息队列的多租户 {@link SendMessageHook} 实现类 - * - * Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中 - * - * @author 芋道源码 - */ -public class TenantRocketMQSendMessageHook implements SendMessageHook { - - @Override - public String hookName() { - return getClass().getSimpleName(); - } - - @Override - public void sendMessageBefore(SendMessageContext sendMessageContext) { - Long tenantId = TenantContextHolder.getTenantId(); - if (tenantId == null) { - return; - } - sendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID, tenantId.toString()); - } - - @Override - public void sendMessageAfter(SendMessageContext sendMessageContext) { - } - -} diff --git a/tashow-framework/tashow-framework-tenant/src/main/resources/META-INF/spring.factories b/tashow-framework/tashow-framework-tenant/src/main/resources/META-INF/spring.factories deleted file mode 100644 index ddf705c..0000000 --- a/tashow-framework/tashow-framework-tenant/src/main/resources/META-INF/spring.factories +++ /dev/null @@ -1,2 +0,0 @@ -org.springframework.boot.env.EnvironmentPostProcessor=\ - com.tashow.cloud.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor diff --git a/tashow-framework/tashow-framework-tenant/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/tashow-framework/tashow-framework-tenant/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports deleted file mode 100644 index 3ec3751..0000000 --- a/tashow-framework/tashow-framework-tenant/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ /dev/null @@ -1,2 +0,0 @@ -com.tashow.cloud.tenant.config.TenantRpcAutoConfiguration -com.tashow.cloud.tenant.config.TenantAutoConfiguration diff --git a/tashow-framework/tashow-framework-websocket/pom.xml b/tashow-framework/tashow-framework-websocket/pom.xml index 3e234e0..4757c29 100644 --- a/tashow-framework/tashow-framework-websocket/pom.xml +++ b/tashow-framework/tashow-framework-websocket/pom.xml @@ -42,21 +42,11 @@ com.tashow.cloud tashow-framework-mq - - org.springframework.kafka - spring-kafka - true - org.springframework.amqp spring-rabbit true - - org.apache.rocketmq - rocketmq-spring-boot-starter - true - diff --git a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/config/WebSocketAutoConfiguration.java b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/config/WebSocketAutoConfiguration.java index 639719a..ef7d63f 100644 --- a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/config/WebSocketAutoConfiguration.java +++ b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/config/WebSocketAutoConfiguration.java @@ -1,33 +1,22 @@ package com.tashow.cloud.websocket.config; -import com.tashow.cloud.mq.redis.config.RedisMQConsumerAutoConfiguration; -import com.tashow.cloud.mq.redis.core.RedisMQTemplate; import com.tashow.cloud.websocket.core.handler.JsonWebSocketMessageHandler; import com.tashow.cloud.websocket.core.listener.WebSocketMessageListener; import com.tashow.cloud.websocket.core.security.LoginUserHandshakeInterceptor; import com.tashow.cloud.websocket.core.security.WebSocketAuthorizeRequestsCustomizer; -import com.tashow.cloud.websocket.core.sender.kafka.KafkaWebSocketMessageConsumer; -import com.tashow.cloud.websocket.core.sender.kafka.KafkaWebSocketMessageSender; import com.tashow.cloud.websocket.core.sender.local.LocalWebSocketMessageSender; import com.tashow.cloud.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageConsumer; import com.tashow.cloud.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageSender; -import com.tashow.cloud.websocket.core.sender.redis.RedisWebSocketMessageConsumer; -import com.tashow.cloud.websocket.core.sender.redis.RedisWebSocketMessageSender; -import com.tashow.cloud.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer; -import com.tashow.cloud.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender; import com.tashow.cloud.websocket.core.session.WebSocketSessionHandlerDecorator; import com.tashow.cloud.websocket.core.session.WebSocketSessionManager; import com.tashow.cloud.websocket.core.session.WebSocketSessionManagerImpl; -import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; @@ -40,9 +29,8 @@ import java.util.List; * * @author xingyu4j */ -@AutoConfiguration(before = RedisMQConsumerAutoConfiguration.class) // before YudaoRedisMQConsumerAutoConfiguration 的原因是,需要保证 RedisWebSocketMessageConsumer 先创建,才能创建 RedisMessageListenerContainer @EnableWebSocket // 开启 websocket -@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket +@ConditionalOnProperty(prefix = "tashow.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket @EnableConfigurationProperties(WebSocketProperties.class) public class WebSocketAutoConfiguration { @@ -96,44 +84,7 @@ public class WebSocketAutoConfiguration { } @Configuration - @ConditionalOnProperty(prefix = "tashow.websocket", name = "sender-type", havingValue = "redis") - public class RedisWebSocketMessageSenderConfiguration { - - @Bean - public RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager, - RedisMQTemplate redisMQTemplate) { - return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate); - } - - @Bean - public RedisWebSocketMessageConsumer redisWebSocketMessageConsumer( - RedisWebSocketMessageSender redisWebSocketMessageSender) { - return new RedisWebSocketMessageConsumer(redisWebSocketMessageSender); - } - - } - - @Configuration - @ConditionalOnProperty(prefix = "tashow.websocket", name = "sender-type", havingValue = "rocketmq") - public class RocketMQWebSocketMessageSenderConfiguration { - - @Bean - public RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender( - WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate, - @Value("${yudao.websocket.sender-rocketmq.topic}") String topic) { - return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic); - } - - @Bean - public RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer( - RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) { - return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender); - } - - } - - @Configuration - @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rabbitmq") + @ConditionalOnProperty(prefix = "tashow.websocket", name = "sender-type", havingValue = "rabbitmq") public class RabbitMQWebSocketMessageSenderConfiguration { @Bean @@ -153,7 +104,7 @@ public class WebSocketAutoConfiguration { * 创建 Topic Exchange */ @Bean - public TopicExchange websocketTopicExchange(@Value("${yudao.websocket.sender-rabbitmq.exchange}") String exchange) { + public TopicExchange websocketTopicExchange(@Value("${tashow.websocket.sender-rabbitmq.exchange}") String exchange) { return new TopicExchange(exchange, true, // durable: 是否持久化 false); // exclusive: 是否排它 @@ -161,23 +112,4 @@ public class WebSocketAutoConfiguration { } - @Configuration - @ConditionalOnProperty(prefix = "tashow.websocket", name = "sender-type", havingValue = "kafka") - public class KafkaWebSocketMessageSenderConfiguration { - - @Bean - public KafkaWebSocketMessageSender kafkaWebSocketMessageSender( - WebSocketSessionManager sessionManager, KafkaTemplate kafkaTemplate, - @Value("${yudao.websocket.sender-kafka.topic}") String topic) { - return new KafkaWebSocketMessageSender(sessionManager, kafkaTemplate, topic); - } - - @Bean - public KafkaWebSocketMessageConsumer kafkaWebSocketMessageConsumer( - KafkaWebSocketMessageSender kafkaWebSocketMessageSender) { - return new KafkaWebSocketMessageConsumer(kafkaWebSocketMessageSender); - } - - } - } diff --git a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/kafka/KafkaWebSocketMessage.java b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/kafka/KafkaWebSocketMessage.java deleted file mode 100644 index 9ba2d90..0000000 --- a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/kafka/KafkaWebSocketMessage.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.tashow.cloud.websocket.core.sender.kafka; - -import lombok.Data; - -/** - * Kafka 广播 WebSocket 的消息 - * - * @author 芋道源码 - */ -@Data -public class KafkaWebSocketMessage { - - /** - * Session 编号 - */ - private String sessionId; - /** - * 用户类型 - */ - private Integer userType; - /** - * 用户编号 - */ - private Long userId; - - /** - * 消息类型 - */ - private String messageType; - /** - * 消息内容 - */ - private String messageContent; - -} diff --git a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java deleted file mode 100644 index 44b13c2..0000000 --- a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.tashow.cloud.websocket.core.sender.kafka; - -import lombok.RequiredArgsConstructor; -import org.springframework.amqp.rabbit.annotation.RabbitHandler; -import org.springframework.kafka.annotation.KafkaListener; - -/** - * {@link KafkaWebSocketMessage} 广播消息的消费者,真正把消息发送出去 - * - * @author 芋道源码 - */ -@RequiredArgsConstructor -public class KafkaWebSocketMessageConsumer { - - private final KafkaWebSocketMessageSender kafkaWebSocketMessageSender; - - @RabbitHandler - @KafkaListener( - topics = "${yudao.websocket.sender-kafka.topic}", - // 在 Group 上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Group 不同,以达到广播消费的目的 - groupId = "${yudao.websocket.sender-kafka.consumer-group}" + "-" + "#{T(java.util.UUID).randomUUID()}") - public void onMessage(KafkaWebSocketMessage message) { - kafkaWebSocketMessageSender.send(message.getSessionId(), - message.getUserType(), message.getUserId(), - message.getMessageType(), message.getMessageContent()); - } - -} diff --git a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java deleted file mode 100644 index ccbd632..0000000 --- a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.tashow.cloud.websocket.core.sender.kafka; - -import com.tashow.cloud.websocket.core.sender.AbstractWebSocketMessageSender; -import com.tashow.cloud.websocket.core.sender.WebSocketMessageSender; -import com.tashow.cloud.websocket.core.session.WebSocketSessionManager; -import lombok.extern.slf4j.Slf4j; -import org.springframework.kafka.core.KafkaTemplate; - -import java.util.concurrent.ExecutionException; - -/** - * 基于 Kafka 的 {@link WebSocketMessageSender} 实现类 - * - * @author 芋道源码 - */ -@Slf4j -public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender { - - private final KafkaTemplate kafkaTemplate; - - private final String topic; - - public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager, - KafkaTemplate kafkaTemplate, - String topic) { - super(sessionManager); - this.kafkaTemplate = kafkaTemplate; - this.topic = topic; - } - - @Override - public void send(Integer userType, Long userId, String messageType, String messageContent) { - sendKafkaMessage(null, userId, userType, messageType, messageContent); - } - - @Override - public void send(Integer userType, String messageType, String messageContent) { - sendKafkaMessage(null, null, userType, messageType, messageContent); - } - - @Override - public void send(String sessionId, String messageType, String messageContent) { - sendKafkaMessage(sessionId, null, null, messageType, messageContent); - } - - /** - * 通过 Kafka 广播消息 - * - * @param sessionId Session 编号 - * @param userId 用户编号 - * @param userType 用户类型 - * @param messageType 消息类型 - * @param messageContent 消息内容 - */ - private void sendKafkaMessage(String sessionId, Long userId, Integer userType, - String messageType, String messageContent) { - KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage() - .setSessionId(sessionId).setUserId(userId).setUserType(userType) - .setMessageType(messageType).setMessageContent(messageContent); - try { - kafkaTemplate.send(topic, mqMessage).get(); - } catch (InterruptedException | ExecutionException e) { - log.error("[sendKafkaMessage][发送消息({}) 到 Kafka 失败]", mqMessage, e); - } - } - -} diff --git a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java index 9de0e64..3f30c80 100644 --- a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java +++ b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java @@ -13,12 +13,12 @@ import org.springframework.amqp.rabbit.annotation.*; bindings = @QueueBinding( value = @Queue( // 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的 - name = "${yudao.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}", + name = "${tashow.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}", // Consumer 关闭时,该队列就可以被自动删除了 autoDelete = "true" ), exchange = @Exchange( - name = "${yudao.websocket.sender-rabbitmq.exchange}", + name = "${tashow.websocket.sender-rabbitmq.exchange}", type = ExchangeTypes.TOPIC, declare = "false" ) diff --git a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/redis/RedisWebSocketMessage.java b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/redis/RedisWebSocketMessage.java deleted file mode 100644 index 68d694b..0000000 --- a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/redis/RedisWebSocketMessage.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.tashow.cloud.websocket.core.sender.redis; - -import com.tashow.cloud.mq.redis.core.pubsub.AbstractRedisChannelMessage; -import lombok.Data; - -/** - * Redis 广播 WebSocket 的消息 - */ -@Data -public class RedisWebSocketMessage extends AbstractRedisChannelMessage { - - /** - * Session 编号 - */ - private String sessionId; - /** - * 用户类型 - */ - private Integer userType; - /** - * 用户编号 - */ - private Long userId; - - /** - * 消息类型 - */ - private String messageType; - /** - * 消息内容 - */ - private String messageContent; - -} diff --git a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java deleted file mode 100644 index 0991062..0000000 --- a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.tashow.cloud.websocket.core.sender.redis; - -import com.tashow.cloud.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; -import lombok.RequiredArgsConstructor; - -/** - * {@link RedisWebSocketMessage} 广播消息的消费者,真正把消息发送出去 - * - * @author 芋道源码 - */ -@RequiredArgsConstructor -public class RedisWebSocketMessageConsumer extends AbstractRedisChannelMessageListener { - - private final RedisWebSocketMessageSender redisWebSocketMessageSender; - - @Override - public void onMessage(RedisWebSocketMessage message) { - redisWebSocketMessageSender.send(message.getSessionId(), - message.getUserType(), message.getUserId(), - message.getMessageType(), message.getMessageContent()); - } - -} diff --git a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/redis/RedisWebSocketMessageSender.java b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/redis/RedisWebSocketMessageSender.java deleted file mode 100644 index d31e5a1..0000000 --- a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/redis/RedisWebSocketMessageSender.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.tashow.cloud.websocket.core.sender.redis; - -import com.tashow.cloud.mq.redis.core.RedisMQTemplate; -import com.tashow.cloud.websocket.core.sender.AbstractWebSocketMessageSender; -import com.tashow.cloud.websocket.core.sender.WebSocketMessageSender; -import com.tashow.cloud.websocket.core.session.WebSocketSessionManager; -import lombok.extern.slf4j.Slf4j; - -/** - * 基于 Redis 的 {@link WebSocketMessageSender} 实现类 - * - * @author 芋道源码 - */ -@Slf4j -public class RedisWebSocketMessageSender extends AbstractWebSocketMessageSender { - - private final RedisMQTemplate redisMQTemplate; - - public RedisWebSocketMessageSender(WebSocketSessionManager sessionManager, - RedisMQTemplate redisMQTemplate) { - super(sessionManager); - this.redisMQTemplate = redisMQTemplate; - } - - @Override - public void send(Integer userType, Long userId, String messageType, String messageContent) { - sendRedisMessage(null, userId, userType, messageType, messageContent); - } - - @Override - public void send(Integer userType, String messageType, String messageContent) { - sendRedisMessage(null, null, userType, messageType, messageContent); - } - - @Override - public void send(String sessionId, String messageType, String messageContent) { - sendRedisMessage(sessionId, null, null, messageType, messageContent); - } - - /** - * 通过 Redis 广播消息 - * - * @param sessionId Session 编号 - * @param userId 用户编号 - * @param userType 用户类型 - * @param messageType 消息类型 - * @param messageContent 消息内容 - */ - private void sendRedisMessage(String sessionId, Long userId, Integer userType, - String messageType, String messageContent) { - RedisWebSocketMessage mqMessage = new RedisWebSocketMessage() - .setSessionId(sessionId).setUserId(userId).setUserType(userType) - .setMessageType(messageType).setMessageContent(messageContent); - redisMQTemplate.send(mqMessage); - } - -} diff --git a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java deleted file mode 100644 index acda3aa..0000000 --- a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.tashow.cloud.websocket.core.sender.rocketmq; - -import lombok.Data; - -/** - * RocketMQ 广播 WebSocket 的消息 - * - * @author 芋道源码 - */ -@Data -public class RocketMQWebSocketMessage { - - /** - * Session 编号 - */ - private String sessionId; - /** - * 用户类型 - */ - private Integer userType; - /** - * 用户编号 - */ - private Long userId; - - /** - * 消息类型 - */ - private String messageType; - /** - * 消息内容 - */ - private String messageContent; - -} diff --git a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java deleted file mode 100644 index 09c012e..0000000 --- a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.tashow.cloud.websocket.core.sender.rocketmq; - -import lombok.RequiredArgsConstructor; -import org.apache.rocketmq.spring.annotation.MessageModel; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; - -/** - * {@link RocketMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去 - * - * @author 芋道源码 - */ -@RocketMQMessageListener( // 重点:添加 @RocketMQMessageListener 注解,声明消费的 topic - topic = "${yudao.websocket.sender-rocketmq.topic}", - consumerGroup = "${yudao.websocket.sender-rocketmq.consumer-group}", - messageModel = MessageModel.BROADCASTING // 设置为广播模式,保证每个实例都能收到消息 -) -@RequiredArgsConstructor -public class RocketMQWebSocketMessageConsumer implements RocketMQListener { - - private final RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender; - - @Override - public void onMessage(RocketMQWebSocketMessage message) { - rocketMQWebSocketMessageSender.send(message.getSessionId(), - message.getUserType(), message.getUserId(), - message.getMessageType(), message.getMessageContent()); - } - -} diff --git a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java b/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java deleted file mode 100644 index d097156..0000000 --- a/tashow-framework/tashow-framework-websocket/src/main/java/com/tashow/cloud/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.tashow.cloud.websocket.core.sender.rocketmq; - -import com.tashow.cloud.websocket.core.sender.AbstractWebSocketMessageSender; -import com.tashow.cloud.websocket.core.sender.WebSocketMessageSender; -import com.tashow.cloud.websocket.core.session.WebSocketSessionManager; -import com.tashow.cloud.websocket.core.sender.AbstractWebSocketMessageSender; -import com.tashow.cloud.websocket.core.sender.WebSocketMessageSender; -import com.tashow.cloud.websocket.core.session.WebSocketSessionManager; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.core.RocketMQTemplate; - -/** - * 基于 RocketMQ 的 {@link WebSocketMessageSender} 实现类 - * - * @author 芋道源码 - */ -@Slf4j -public class RocketMQWebSocketMessageSender extends AbstractWebSocketMessageSender { - - private final RocketMQTemplate rocketMQTemplate; - - private final String topic; - - public RocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager, - RocketMQTemplate rocketMQTemplate, - String topic) { - super(sessionManager); - this.rocketMQTemplate = rocketMQTemplate; - this.topic = topic; - } - - @Override - public void send(Integer userType, Long userId, String messageType, String messageContent) { - sendRocketMQMessage(null, userId, userType, messageType, messageContent); - } - - @Override - public void send(Integer userType, String messageType, String messageContent) { - sendRocketMQMessage(null, null, userType, messageType, messageContent); - } - - @Override - public void send(String sessionId, String messageType, String messageContent) { - sendRocketMQMessage(sessionId, null, null, messageType, messageContent); - } - - /** - * 通过 RocketMQ 广播消息 - * - * @param sessionId Session 编号 - * @param userId 用户编号 - * @param userType 用户类型 - * @param messageType 消息类型 - * @param messageContent 消息内容 - */ - private void sendRocketMQMessage(String sessionId, Long userId, Integer userType, - String messageType, String messageContent) { - RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage() - .setSessionId(sessionId).setUserId(userId).setUserType(userType) - .setMessageType(messageType).setMessageContent(messageContent); - rocketMQTemplate.syncSend(topic, mqMessage); - } - -} diff --git a/tashow-module/pom.xml b/tashow-module/pom.xml index 24e63d2..37c811b 100644 --- a/tashow-module/pom.xml +++ b/tashow-module/pom.xml @@ -13,6 +13,7 @@ tashow-module-system tashow-module-infra + tashow-module-app diff --git a/tashow-module/tashow-module-app/pom.xml b/tashow-module/tashow-module-app/pom.xml new file mode 100644 index 0000000..dfa0c20 --- /dev/null +++ b/tashow-module/tashow-module-app/pom.xml @@ -0,0 +1,32 @@ + + 4.0.0 + + com.tashow.cloud + tashow-module + ${revision} + + + tashow-module-app + jar + + + + + com.tashow.cloud + tashow-framework-env + + + com.tashow.cloud + tashow-framework-websocket + + + com.tashow.cloud + tashow-data-redis + + + com.tashow.cloud + tashow-framework-security + + + diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/AppServerApplication.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/AppServerApplication.java new file mode 100644 index 0000000..24f14a9 --- /dev/null +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/AppServerApplication.java @@ -0,0 +1,16 @@ +package com.tashow.cloud.app; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * Hello world! + * + */ +@SpringBootApplication +public class AppServerApplication { + + public static void main(String[] args) { + SpringApplication.run(AppServerApplication.class, args); + } +} diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/LoginController.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/LoginController.java new file mode 100644 index 0000000..216bbfe --- /dev/null +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/controller/LoginController.java @@ -0,0 +1,4 @@ +package com.tashow.cloud.app.controller; + +public class LoginController { +} diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/dataobject/package-info.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/dataobject/package-info.java new file mode 100644 index 0000000..fffab69 --- /dev/null +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/dataobject/package-info.java @@ -0,0 +1,2 @@ +package com.tashow.cloud.app.dal.dataobject; +// 数据库对象 \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/dto/package-info.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/dto/package-info.java new file mode 100644 index 0000000..0cc6509 --- /dev/null +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/dto/package-info.java @@ -0,0 +1,2 @@ +package com.tashow.cloud.app.dal.dto; +// 视图层与业务层传输对象 \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/package-info.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/package-info.java new file mode 100644 index 0000000..8f128a7 --- /dev/null +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/package-info.java @@ -0,0 +1 @@ +package com.tashow.cloud.app.dal; \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/vo/package-info.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/vo/package-info.java new file mode 100644 index 0000000..94b57c5 --- /dev/null +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/dal/vo/package-info.java @@ -0,0 +1,2 @@ +package com.tashow.cloud.app.dal.vo; +// 视图参数接收 \ No newline at end of file diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/config/SecurityConfiguration.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/config/SecurityConfiguration.java new file mode 100644 index 0000000..693e32b --- /dev/null +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/config/SecurityConfiguration.java @@ -0,0 +1,50 @@ +package com.tashow.cloud.app.security.config; + +import com.tashow.cloud.infraapi.enums.ApiConstants; +import com.tashow.cloud.security.security.config.AuthorizeRequestsCustomizer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.security.config.annotation.web.builders.HttpSecurity; +import org.springframework.security.config.annotation.web.configurers.AuthorizeHttpRequestsConfigurer; + +/** + * Infra 模块的 Security 配置 + */ +@Configuration(proxyBeanMethods = false, value = "infraSecurityConfiguration") +public class SecurityConfiguration { + + @Value("${spring.boot.admin.context-path:''}") + private String adminSeverContextPath; + + @Bean("infraAuthorizeRequestsCustomizer") + public AuthorizeRequestsCustomizer authorizeRequestsCustomizer() { + return new AuthorizeRequestsCustomizer() { + + @Override + public void customize(AuthorizeHttpRequestsConfigurer.AuthorizationManagerRequestMatcherRegistry registry) { + // Swagger 接口文档 + registry.requestMatchers("/v3/api-docs/**").permitAll() + .requestMatchers("/webjars/**").permitAll() + .requestMatchers("/swagger-ui").permitAll() + .requestMatchers("/swagger-ui/**").permitAll(); + // Spring Boot Actuator 的安全配置 + registry.requestMatchers("/actuator").permitAll() + .requestMatchers("/actuator/**").permitAll(); + // Druid 监控 + registry.requestMatchers("/druid/**").permitAll(); + // Spring Boot Admin Server 的安全配置 + registry.requestMatchers(adminSeverContextPath).permitAll() + .requestMatchers(adminSeverContextPath + "/**").permitAll(); + // 文件读取 + registry.requestMatchers(buildAdminApi("/infra/file/*/get/**")).permitAll(); + + // TODO 芋艿:这个每个项目都需要重复配置,得捉摸有没通用的方案 + // RPC 服务的安全配置 + registry.requestMatchers(ApiConstants.PREFIX + "/**").permitAll(); + } + + }; + } + +} diff --git a/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/core/package-info.java b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/core/package-info.java new file mode 100644 index 0000000..93a4969 --- /dev/null +++ b/tashow-module/tashow-module-app/src/main/java/com/tashow/cloud/app/security/core/package-info.java @@ -0,0 +1,4 @@ +/** + * 占位 + */ +package com.tashow.cloud.app.security.core; diff --git a/tashow-module/tashow-module-app/src/main/resources/application-local.yaml b/tashow-module/tashow-module-app/src/main/resources/application-local.yaml new file mode 100644 index 0000000..06fa7ed --- /dev/null +++ b/tashow-module/tashow-module-app/src/main/resources/application-local.yaml @@ -0,0 +1,16 @@ +--- #################### 注册中心 + 配置中心相关配置 #################### + +spring: + cloud: + nacos: + server-addr: 43.139.42.137:8848 # Nacos 服务器地址 + username: nacos # Nacos 账号 + password: nacos # Nacos 密码 + discovery: # 【配置中心】配置项 + namespace: dev # 命名空间。这里使用 dev 开发环境 + group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP + metadata: + version: 1.0.0 # 服务实例的版本号,可用于灰度发布 + config: # 【注册中心】配置项 + namespace: dev # 命名空间。这里使用 dev 开发环境 + group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP diff --git a/tashow-module/tashow-module-app/src/main/resources/application.yaml b/tashow-module/tashow-module-app/src/main/resources/application.yaml new file mode 100644 index 0000000..e9de0d5 --- /dev/null +++ b/tashow-module/tashow-module-app/src/main/resources/application.yaml @@ -0,0 +1,12 @@ +server: + port: 48083 +spring: + application: + name: app-server + profiles: + active: local + config: + import: + - optional:classpath:application-${spring.profiles.active}.yaml # 加载【本地】配置 + - optional:nacos:application.yaml # 加载【Nacos】的配置 + - optional:nacos:${spring.application.name}-${spring.profiles.active}.yaml # 加载【Nacos】的配置