liujunchi 2 年之前
父节点
当前提交
21b0ca6fcd

+ 1 - 1
mec-biz/src/main/java/com/ym/mec/biz/redisson/RedissonMessageService.java

@@ -16,7 +16,7 @@ public class RedissonMessageService {
     // 订阅消息通知
     public static final String TOPIC_MESSAGE = "topic:message";
     // 直播在线人数
-    public static final String LIVE_ROOM_MEMBER = "delayQueue:liveRoomMember";
+    public static final String LIVE_ROOM_MEMBER = "delayQueue:liveRoomMember:";
 
     private final RedissonClient redissonClient;
 

+ 28 - 11
mec-biz/src/main/java/com/ym/mec/biz/redisson/RedissonTopicListener.java

@@ -1,7 +1,10 @@
 package com.ym.mec.biz.redisson;
 
+import com.ym.mec.biz.dal.vo.ImLiveBroadcastRoomVo;
+import com.ym.mec.biz.service.ImLiveBroadcastRoomService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
+import org.redisson.api.RBucket;
 import org.redisson.api.RScoredSortedSet;
 import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -12,6 +15,7 @@ import org.springframework.stereotype.Service;
 
 import java.util.Collection;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Redisson消息发布订阅服务监听
@@ -24,6 +28,9 @@ public class RedissonTopicListener implements ApplicationRunner, Ordered {
     private RedissonMessageService redissonMessageService;
     @Autowired
     private RedissonClient redissonClient;
+
+    @Autowired
+    private ImLiveBroadcastRoomService imLiveBroadcastRoomService;
  
     @Override
     public void run(ApplicationArguments args) {
@@ -32,21 +39,31 @@ public class RedissonTopicListener implements ApplicationRunner, Ordered {
 
             redissonMessageService.subscribe(RedissonMessageService.TOPIC_MESSAGE, (message) -> {
                 log.info("RedissonMessageService subscribe message={}", message);
+                try {
+                    TimeUnit.SECONDS.sleep(1);
+                } catch (InterruptedException e) {
+                    log.error("RedissonMessageService subscribe sleep error", e);
+                }
 
-                RScoredSortedSet<String> sortedSet = redissonClient.getScoredSortedSet(RedissonMessageService.LIVE_ROOM_MEMBER);
-                if (!sortedSet.isExists()) {
-                    return;
+                // 缓存JoinRoom用户信息到redis
+                RBucket<Object> bucket = redissonClient.getBucket(RedissonMessageService.LIVE_ROOM_MEMBER + message);
+                if (!bucket.isExists()) {
+                   return;
                 }
+                bucket.delete();
 
-                // 取出RScoredSortedSet的值
-                Collection<String> objects = sortedSet.readAll();
-                log.info("RedissonTopicListener subscribe objects={}", objects.size());
-                if (CollectionUtils.isNotEmpty(objects)) {
-                    String roomUid = objects.stream().findFirst().orElse("");
-                    log.info("RedissonTopicListener subscribe roomUid={}", roomUid);
+                ImLiveBroadcastRoomVo imLiveBroadcastRoomVo = imLiveBroadcastRoomService.queryRoomInfo(message);
+                if (Objects.isNull(imLiveBroadcastRoomVo)) {
+                    return;
+                }
+                try {
+                    imLiveBroadcastRoomService.setGroupMemberDefinedData(imLiveBroadcastRoomVo,imLiveBroadcastRoomVo.getLookNum(),imLiveBroadcastRoomVo.getTotalLookNum());
+                } catch (Exception e) {
+                    log.error("RedissonMessageService subscribe setGroupMemberDefinedData error", e);
+                    bucket.set(message, 30, TimeUnit.MINUTES);
                 }
-                // 清除数据
-                sortedSet.clear();
+                redissonMessageService.publish(RedissonMessageService.TOPIC_MESSAGE, message);
+
             });
             log.info("---> RedissonMessageService subscribe success");
         }

+ 2 - 0
mec-biz/src/main/java/com/ym/mec/biz/service/ImLiveBroadcastRoomService.java

@@ -72,6 +72,8 @@ public interface ImLiveBroadcastRoomService extends IService<ImLiveBroadcastRoom
 
     ImLiveBroadcastRoomVo speakerJoinRoom(String roomUid, String os);
 
+    void setGroupMemberDefinedData(ImLiveBroadcastRoomVo roomVo, Integer onlineUser, Integer totalUser);
+
     void joinRoom(String roomUid, Integer userId);
 
     void startLive(String roomUid, Integer userId,String videoResolution);

+ 54 - 21
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImLiveBroadcastRoomServiceImpl.java

@@ -63,7 +63,6 @@ import org.joda.time.DateTime;
 import org.redisson.api.RBucket;
 import org.redisson.api.RLock;
 import org.redisson.api.RMap;
-import org.redisson.api.RScoredSortedSet;
 import org.redisson.api.RedissonClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -903,11 +902,14 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
                 // 用户进入直播间,发送统计数据
                 // sendLiveRoomStatMessage(userid, roomVo);
 
-                // 直播间统计数据
-                getRoomData(roomVo);
-                // 设置群组属性
-                setGroupDefinedData(roomVo,EGroupDefinedDataType.MEMBER_ONLINE,roomVo.getLookNum().toString());
-                setGroupDefinedData(roomVo,EGroupDefinedDataType.MEMBER_TOTAL,roomVo.getTotalLookNum().toString());
+                // 缓存JoinRoom用户信息到redis
+                RBucket<Object> bucket = redissonClient.getBucket(RedissonMessageService.LIVE_ROOM_MEMBER + roomUid);
+                if (!bucket.isExists()) {
+                    bucket.set(roomUid, 30, TimeUnit.MINUTES);
+                    // 发布删除缓存消息
+                    redissonMessageService.publish(RedissonMessageService.TOPIC_MESSAGE, roomUid);
+                }
+
                 log.debug("opsRoom>>>> user enter room, userState: {}", JSON.toJSONString(userState));
                 return;
             }
@@ -980,6 +982,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
      */
     private void sendLiveRoomStatMessage(String userid, ImLiveBroadcastRoomVo roomVo) {
 
+        userid = setFromUserId(userid,roomVo);
         // 消息发送用户
         LiveRoomMessage.MessageUser messageUser = null;
         SysUser sysUser = sysUserFeignService.queryUserInfo();
@@ -1004,7 +1007,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         LiveRoomMessage message = LiveRoomMessage.builder()
                 .isIncludeSender(1)
                 .objectName(LiveRoomMessage.STAT_SYNC)
-                .fromUserId(roomVo.getSpeakerId().toString())
+                .fromUserId(userid)
                 .toChatRoomId(roomVo.getRoomUid())
                 .content(messageContent)
                 .build();
@@ -1020,6 +1023,13 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
 
     }
 
+    private String setFromUserId(String fromUserId, ImLiveBroadcastRoomVo roomVo) {
+        if (roomVo.getServiceProvider().equals(RongCloudLivePlugin.PLUGIN_NAME)) {
+            return fromUserId;
+        }
+        return roomVo.getSpeakerId().toString();
+    }
+
     /**
      * 直播间用户离开消息
      * @param userid 用户id
@@ -1027,6 +1037,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
      */
     private void sendLiveRoomLoginOutMessage(String userid, ImLiveBroadcastRoomVo roomVo) {
 
+        userid = setFromUserId(userid,roomVo);
         // 消息发送用户
         LiveRoomMessage.MessageUser messageUser = null;
         SysUser sysUser = sysUserFeignService.queryUserById(Integer.parseInt(userid));
@@ -1049,7 +1060,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         LiveRoomMessage message = LiveRoomMessage.builder()
                 .isIncludeSender(1)
                 .objectName(LiveRoomMessage.LEAVE)
-                .fromUserId(roomVo.getSpeakerId().toString())
+                .fromUserId(userid)
                 .toChatRoomId(roomVo.getRoomUid())
                 .content(messageContent)
                 .build();
@@ -1088,18 +1099,14 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         // 直播间统计数据
         getRoomData(roomVo);
 
+        String userId = setFromUserId(fromUserId.toString(),roomVo);
         // 缓存JoinRoom用户信息到redis
-        RScoredSortedSet<String> sortedSet = redissonClient.getScoredSortedSet(RedissonMessageService.LIVE_ROOM_MEMBER);
-        sortedSet.add(System.currentTimeMillis(), roomUid);
-        // 设置缓存失效时间, 30分钟
-        redissonClient.getKeys().expire(RedissonMessageService.LIVE_ROOM_MEMBER, 30, TimeUnit.MINUTES);
-
-        // 发布删除缓存消息
-        redissonMessageService.publish(RedissonMessageService.TOPIC_MESSAGE, DateTime.now().toString());
-
-        // 设置群组属性
-        setGroupDefinedData(roomVo,EGroupDefinedDataType.MEMBER_ONLINE,roomVo.getLookNum().toString());
-        setGroupDefinedData(roomVo,EGroupDefinedDataType.MEMBER_TOTAL,roomVo.getTotalLookNum().toString());
+        RBucket<Object> bucket = redissonClient.getBucket(RedissonMessageService.LIVE_ROOM_MEMBER + roomUid);
+        if (!bucket.isExists()) {
+            bucket.set(roomUid, 30, TimeUnit.MINUTES);
+            // 发布删除缓存消息
+            redissonMessageService.publish(RedissonMessageService.TOPIC_MESSAGE, roomUid);
+        }
 
         // 消息发送用户
         LiveRoomMessage.MessageUser messageUser = null;
@@ -1126,7 +1133,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         LiveRoomMessage message = LiveRoomMessage.builder()
                 .isIncludeSender(1)
                 .objectName(LiveRoomMessage.MEMBER_COUNT)
-                .fromUserId(roomVo.getSpeakerId().toString())
+                .fromUserId(userId)
                 .toChatRoomId(roomUid)
                 .content(messageContent)
                 .build();
@@ -1289,6 +1296,31 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         return roomVo;
     }
 
+
+    @Override
+    public void setGroupMemberDefinedData(ImLiveBroadcastRoomVo roomVo, Integer onlineUser, Integer totalUser) {
+        List<TencentRequest.ChatRoomGroupDefinedData> appDefinedData = new ArrayList<>();
+        appDefinedData.add(TencentRequest.ChatRoomGroupDefinedData.builder()
+                                                                  .key(EGroupDefinedDataType.MEMBER_ONLINE.getCode())
+                                                                  .value(onlineUser.toString())
+                                                                  .build());
+        appDefinedData.add(TencentRequest.ChatRoomGroupDefinedData.builder()
+                                                                  .key(EGroupDefinedDataType.MEMBER_TOTAL.getCode())
+                                                                  .value(totalUser.toString())
+                                                                  .build());
+        try {
+            livePluginContext.getPluginService(roomVo.getServiceProvider())
+                             .chatRoomGroupDefinedData(TencentRequest.ChatRoomGroup.builder()
+                                                                                   .groupId(roomVo.getRoomUid())
+                                                                                   .appDefinedData(appDefinedData)
+                                                                                   .build());
+            log.info("设置直播群观看人数 roomUid:{}, data {}",roomVo.getRoomUid(), JSONObject.toJSONString(appDefinedData));
+        } catch (Exception e) {
+            log.error("设置直播群观看人数失败", e);
+            throw new BizException("设置直播群观看人数失败");
+        }
+    }
+
     private void setGroupDefinedData(ImLiveBroadcastRoomVo roomVo, EGroupDefinedDataType type, String value) {
         List<TencentRequest.ChatRoomGroupDefinedData> appDefinedData = new ArrayList<>();
         appDefinedData.add(TencentRequest.ChatRoomGroupDefinedData.builder()
@@ -1408,6 +1440,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
             return;
         }
 
+        String fromUserid = setFromUserId(userId.toString(),roomVo);
         // 消息发送用户
         LiveRoomMessage.MessageUser messageUser = null;
         SysUser sysUser = sysUserFeignService.queryUserById(userId);
@@ -1430,7 +1463,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         LiveRoomMessage message = LiveRoomMessage.builder()
                                                  .isIncludeSender(1)
                                                  .objectName(LiveRoomMessage.WELCOME)
-                                                 .fromUserId(roomVo.getSpeakerId().toString())
+                                                 .fromUserId(fromUserid)
                                                  .toChatRoomId(roomUid)
                                                  .content(messageContent)
                                                  .build();