Eric 2 年之前
父节点
当前提交
ea75135ea8

+ 56 - 0
mec-biz/src/main/java/com/ym/mec/biz/redisson/RedissonMessageService.java

@@ -0,0 +1,56 @@
+package com.ym.mec.biz.redisson;
+
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RTopic;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Redisson消息发布订阅服务
+ */
+@Slf4j
+@Service
+public class RedissonMessageService {
+
+    // 订阅消息通知
+    public static final String TOPIC_MESSAGE = "topic:message";
+    // 直播在线人数
+    public static final String LIVE_ROOM_MEMBER = "delayQueue:liveRoomMember";
+
+    private final RedissonClient redissonClient;
+
+    @Autowired
+    public RedissonMessageService(RedissonClient redissonClient) {
+
+        this.redissonClient = redissonClient;
+    }
+
+    /**
+     * 订阅消息
+     * @param topic 消息主题
+     * @param listener MessageListener
+     */
+    public void subscribe(String topic, MessageListener listener) {
+        RTopic messageTopic = redissonClient.getTopic(topic);
+        messageTopic.addListener(String.class, (channel, msg) -> listener.onMessage(msg));
+    }
+
+    /**
+     * 发布消息
+     * @param topic 消息主题
+     * @param message 消息内容
+     */
+    public void publish(String topic, String message) {
+        RTopic messageTopic = redissonClient.getTopic(topic);
+        messageTopic.publish(message);
+    }
+
+    /**
+     * 消息监听
+     */
+    public interface MessageListener {
+        void onMessage(String message);
+    }
+
+}

+ 59 - 0
mec-biz/src/main/java/com/ym/mec/biz/redisson/RedissonTopicListener.java

@@ -0,0 +1,59 @@
+package com.ym.mec.biz.redisson;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.redisson.api.RScoredSortedSet;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.Ordered;
+import org.springframework.stereotype.Service;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Redisson消息发布订阅服务监听
+ */
+@Slf4j
+@Service
+public class RedissonTopicListener implements ApplicationRunner, Ordered {
+ 
+    @Autowired
+    private RedissonMessageService redissonMessageService;
+    @Autowired
+    private RedissonClient redissonClient;
+ 
+    @Override
+    public void run(ApplicationArguments args) {
+
+        if (Objects.nonNull(redissonMessageService)) {
+
+            redissonMessageService.subscribe(RedissonMessageService.TOPIC_MESSAGE, (message) -> {
+                log.info("RedissonMessageService subscribe message={}", message);
+
+                RScoredSortedSet<String> sortedSet = redissonClient.getScoredSortedSet(RedissonMessageService.LIVE_ROOM_MEMBER);
+                if (!sortedSet.isExists()) {
+                    return;
+                }
+
+                // 取出RScoredSortedSet的值
+                Collection<String> objects = sortedSet.readAll();
+                log.info("RedissonTopicListener subscribe objects={}", objects.size());
+                if (CollectionUtils.isNotEmpty(objects)) {
+                    String roomUid = objects.stream().findFirst().orElse("");
+                    log.info("RedissonTopicListener subscribe roomUid={}", roomUid);
+                }
+                // 清除数据
+                sortedSet.clear();
+            });
+            log.info("---> RedissonMessageService subscribe success");
+        }
+    }
+
+    @Override
+    public int getOrder() {
+        return 1;
+    }
+}

+ 13 - 0
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImLiveBroadcastRoomServiceImpl.java

@@ -41,6 +41,7 @@ import com.ym.mec.biz.dal.enums.EOnOffStatus;
 import com.ym.mec.biz.dal.enums.MessageTypeEnum;
 import com.ym.mec.biz.dal.page.LiveRoomGoodsOrderQueryInfo;
 import com.ym.mec.biz.dal.vo.*;
+import com.ym.mec.biz.redisson.RedissonMessageService;
 import com.ym.mec.biz.service.*;
 import com.ym.mec.common.entity.ImRoomMessage;
 import com.ym.mec.common.entity.ImUserState;
@@ -62,6 +63,7 @@ import org.joda.time.DateTime;
 import org.redisson.api.RBucket;
 import org.redisson.api.RLock;
 import org.redisson.api.RMap;
+import org.redisson.api.RScoredSortedSet;
 import org.redisson.api.RedissonClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,6 +120,8 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
     private ImLiveRoomVideoService imLiveRoomVideoService;
     @Autowired
     private LivePluginContext livePluginContext;
+    @Autowired
+    private RedissonMessageService redissonMessageService;
 
     @Autowired
     private ImFeignService imFeignService;
@@ -1079,6 +1083,15 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         // 直播间统计数据
         getRoomData(roomVo);
 
+        // 缓存JoinRoom用户信息到redis
+        RScoredSortedSet<String> sortedSet = redissonClient.getScoredSortedSet(RedissonMessageService.LIVE_ROOM_MEMBER);
+        sortedSet.add(System.currentTimeMillis(), roomUid);
+        // 设置缓存失效时间, 30分钟
+        redissonClient.getKeys().expire(RedissonMessageService.LIVE_ROOM_MEMBER, 30, TimeUnit.MINUTES);
+
+        // 发布删除缓存消息
+        redissonMessageService.publish(RedissonMessageService.TOPIC_MESSAGE, DateTime.now().toString());
+
         // 设置群组属性
         setGroupDefinedData(roomVo,EGroupDefinedDataType.MEMBER_ONLINE,roomVo.getLookNum().toString());
         setGroupDefinedData(roomVo,EGroupDefinedDataType.MEMBER_TOTAL,roomVo.getTotalLookNum().toString());