Просмотр исходного кода

优化4-取消每3秒像正在直播的直播间发送策略
修改为监听融云人员变更消息后发送直播间人数给主播

hgw 3 лет назад
Родитель
Сommit
7271187554

+ 0 - 5
mec-biz/src/main/java/com/ym/mec/biz/service/ImLiveBroadcastRoomService.java

@@ -52,11 +52,6 @@ public interface ImLiveBroadcastRoomService extends IService<ImLiveBroadcastRoom
 
 
     void syncLike(String roomUid, Integer likeNum);
     void syncLike(String roomUid, Integer likeNum);
 
 
-    /**
-     * 同步直播间在线人数 - 3秒一次发送到直播间消息中
-     */
-    void sendLiveOnlineNum();
-
     void opsRoom(List<ImUserState> userState);
     void opsRoom(List<ImUserState> userState);
 
 
     ImLiveBroadcastRoomVo speakerJoinRoom(String roomUid);
     ImLiveBroadcastRoomVo speakerJoinRoom(String roomUid);

+ 56 - 110
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImLiveBroadcastRoomServiceImpl.java

@@ -517,61 +517,6 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
     }
     }
 
 
     /**
     /**
-     * 同步直播间在线人数 - 3秒一次发送到直播间消息中
-     */
-    @Override
-    public void sendLiveOnlineNum() {
-        //获取所有在直播中的直播间
-        List<ImLiveBroadcastRoom> liveRooms = this.list(Wrappers.<ImLiveBroadcastRoom>lambdaQuery()
-                .eq(ImLiveBroadcastRoom::getRoomState, 0)
-                .eq(ImLiveBroadcastRoom::getLiveState, 1)
-        );
-        if (CollectionUtils.isEmpty(liveRooms)) {
-            return;
-        }
-        log.info("sendLiveOnlineNum>>>> liveRooms{}", JSONObject.toJSONString(liveRooms));
-        Map<String, Integer> onlineUserMap = new HashMap<>();
-        liveRooms.forEach(room -> {
-            //查询在线观看者信息
-            RMap<Integer, BaseRoomUserVo> onlineUserCache = this.getOnlineUserCache(room.getRoomUid());
-            if (!onlineUserCache.isExists()) {
-                return;
-            }
-            //发送消息到直播间
-            ImRoomMessage message = new ImRoomMessage();
-            message.setFromUserId(room.getSpeakerId().toString());
-            message.setToChatroomId(room.getRoomUid());
-            message.setObjectName(ImRoomMessage.MEMBER_COUNT);
-            int size = onlineUserCache.size();
-            if (size < 1) {
-                return;
-            }
-            onlineUserMap.put("count", size);
-            message.setContent(onlineUserMap);
-            //主讲人发送消息
-            try {
-                imFeignService.publishRoomMsg(message);
-                log.info("sendLiveOnlineNum>>>> speakerId  room:{}", JSONObject.toJSONString(message));
-            } catch (Exception e) {
-                log.error("sendLiveOnlineNum>>>>  speakerId error {}", e.getMessage());
-                log.error("sendLiveOnlineNum>>>>  speakerId sendMessage {} :", JSONObject.toJSONString(message));
-            }
-
-            //主讲人自己发消息自己收不到,需要一个观看者发送消息
-            List<BaseRoomUserVo> collect = onlineUserCache.values().stream().limit(1).collect(Collectors.toList());
-            BaseRoomUserVo baseRoomUserVo = collect.get(0);
-            message.setFromUserId(baseRoomUserVo.getUserId().toString());
-            try {
-                imFeignService.publishRoomMsg(message);
-                log.info("sendLiveOnlineNum>>>> looker room:{}", JSONObject.toJSONString(message));
-            } catch (Exception e) {
-                log.error("sendLiveOnlineNum>>>> looker error {}", e.getMessage());
-                log.error("sendLiveOnlineNum>>>> looker sendMessage {} :", JSONObject.toJSONString(message));
-            }
-        });
-    }
-
-    /**
      * <p>主讲人处理进入和退出房间数据
      * <p>主讲人处理进入和退出房间数据
      * <p>观看者只处理退出房间数据
      * <p>观看者只处理退出房间数据
      *
      *
@@ -601,25 +546,11 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
             }
             }
             //将最新的时间写入缓存
             //将最新的时间写入缓存
             userStateTimeCache.set(userStateTime, 5L, TimeUnit.MINUTES);
             userStateTimeCache.set(userStateTime, 5L, TimeUnit.MINUTES);
-            //查询主讲人userId,若是主讲人
-            RBucket<RoomSpeakerInfo> speakerCache = redissonClient.getBucket(LIVE_SPEAKER_INFO.replace(USER_ID, userid));
-            if (speakerCache.isExists()) {
-                RoomSpeakerInfo speakerInfo = speakerCache.get();
-                //主讲人进入房间
-                if (user.getStatus().equals("0")) {
-                    speakerInfo.setJoinRoomTime(now);
-                    log.info("opsRoom>>>> join speakerCache {}", JSONObject.toJSONString(speakerInfo));
-                    speakerCache.set(speakerInfo);
-                    return;
-                }
-                //主讲人退出房间关闭录像
-                closeLive(speakerInfo);
-                speakerInfo.setExitRoomTime(now);
-                log.info("opsRoom>>>> exit speakerCache {}", JSONObject.toJSONString(speakerInfo));
-                speakerCache.set(speakerInfo);
+            //查询userId是不是主讲人 ,如果是主讲人则返回
+            if (isSpeaker(user, now, userid)) {
                 return;
                 return;
             }
             }
-            //未查询到主讲人信息,观看者只接受退出消息 status=0 是进入房间
+            //这里开始只处理观看者的数据,观看者只接受退出消息 status=0 是进入房间
             if (user.getStatus().equals("0")) {
             if (user.getStatus().equals("0")) {
                 return;
                 return;
             }
             }
@@ -637,50 +568,41 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
             if (!roomTotalUser.isExists() && !roomTotalUser.containsKey(userId)) {
             if (!roomTotalUser.isExists() && !roomTotalUser.containsKey(userId)) {
                 return;
                 return;
             }
             }
-            //查询用户数据
+            //查询用户数据
             RoomUserInfoVo userInfo = roomTotalUser.get(userId);
             RoomUserInfoVo userInfo = roomTotalUser.get(userId);
-            //查询在线人员列表
-            RMap<Integer, BaseRoomUserVo> onlineUserInfo = getOnlineUserCache(roomUid);
-            //获取当前用户是否在房间状态 true在 false不在
-            boolean userOnline = onlineUserInfo.isExists() && onlineUserInfo.containsKey(userId);
-            //用户是在房间的状态 并且 突然离线 - 那么融云会发送用户离线消息-此刻就发送退出房间消息给主讲人
-            if (userOnline && user.getStatus().equals("1")) {
-                ImRoomMessage message = new ImRoomMessage();
-                message.setFromUserId(userId.toString());
-                message.setToChatroomId(roomUid);
-                message.setObjectName(ImRoomMessage.LOOKER_LOGIN_OUT);
-                try {
-                    imFeignService.publishRoomMsg(message);
-                } catch (Exception e) {
-                    log.error("opsRoom>>>>  looker error {}", e.getMessage());
-                    log.error("opsRoom>>>>  looker error sendMessage {} : LOOKER_LOGIN_OUT : {}", message, JSONObject.toJSONString(userInfo));
-                }
-                log.info("opsRoom>>>> looker LOOKER_LOGIN_OUT : {}", JSONObject.toJSONString(userInfo));
-            }
-            //只有在主播开播后用户才有观看时间,才需要计算当前用户观看时长
+            //如果有动态观看时间则证明主播开播过,需要计算当前用户观看时长
             if (Objects.nonNull(userInfo.getDynamicLookTime())) {
             if (Objects.nonNull(userInfo.getDynamicLookTime())) {
                 userInfo.setTotalViewTime(getLookMinutes(userInfo.getDynamicLookTime(), userInfo.getTotalViewTime()));
                 userInfo.setTotalViewTime(getLookMinutes(userInfo.getDynamicLookTime(), userInfo.getTotalViewTime()));
                 userInfo.setDynamicLookTime(null);
                 userInfo.setDynamicLookTime(null);
             }
             }
             roomTotalUser.fastPut(userId, userInfo);
             roomTotalUser.fastPut(userId, userInfo);
-            //如果是最后一个人离开房间则发送一条消息给主播
-            if (onlineUserInfo.isExists() && onlineUserInfo.size() == 1) {
-                //发送消息到直播间
-                ImRoomMessage message = new ImRoomMessage();
-                message.setFromUserId(userId.toString());
-                message.setToChatroomId(roomUid);
-                message.setObjectName(ImRoomMessage.MEMBER_COUNT);
-                message.setContent(new HashMap<String, Integer>() {{
-                    put("count", 0);
-                }});
-                //发送消息
-                try {
-                    imFeignService.publishRoomMsg(message);
-                    log.info("opsRoom>>>> sendLiveOnlineNum>>>> speakerId  room:{}", JSONObject.toJSONString(message));
-                } catch (Exception e) {
-                    log.error("opsRoom>>>> sendLiveOnlineNum>>>>  speakerId error {}", e.getMessage());
-                    log.error("opsRoom>>>> sendLiveOnlineNum>>>>  speakerId sendMessage {} :", JSONObject.toJSONString(message));
-                }
+
+            //查询在线人员列表
+            RMap<Integer, BaseRoomUserVo> onlineUserInfo = getOnlineUserCache(roomUid);
+            if (!onlineUserInfo.isExists()) {
+                return;
+            }
+            //发消息给主播 告知现在人数
+            int size = 0;
+            ImRoomMessage message = new ImRoomMessage();
+            message.setIsIncludeSender(1);
+            message.setObjectName(ImRoomMessage.MEMBER_COUNT);
+            message.setToChatroomId(roomUid);
+            //大于1就发送实际人数,如果是最后一个人离开房间则发送一条0人数消息给主播
+            if (onlineUserInfo.size() > 1) {
+                size = onlineUserInfo.size();
+            }
+            HashMap<String, Integer> sendMap = new HashMap<>();
+            sendMap.put("count", size);
+            message.setFromUserId(userId.toString());
+            message.setContent(sendMap);
+            //发送消息
+            try {
+                imFeignService.publishRoomMsg(message);
+                log.info("opsRoom>>>> sendLiveOnlineNum>>>> speakerId  room:{}", JSONObject.toJSONString(message));
+            } catch (Exception e) {
+                log.error("opsRoom>>>> sendLiveOnlineNum>>>>  speakerId error {}", e.getMessage());
+                log.error("opsRoom>>>> sendLiveOnlineNum>>>>  speakerId sendMessage {} :", JSONObject.toJSONString(message));
             }
             }
             //从在线人员列表删除该人员
             //从在线人员列表删除该人员
             onlineUserInfo.fastRemove(userId);
             onlineUserInfo.fastRemove(userId);
@@ -689,6 +611,30 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
     }
     }
 
 
     /**
     /**
+     * 查询userId是不是主讲人
+     */
+    private boolean isSpeaker(ImUserState user, Date now, String userid) {
+        RBucket<RoomSpeakerInfo> speakerCache = redissonClient.getBucket(LIVE_SPEAKER_INFO.replace(USER_ID, userid));
+        if (speakerCache.isExists()) {
+            RoomSpeakerInfo speakerInfo = speakerCache.get();
+            //主讲人进入房间
+            if (user.getStatus().equals("0")) {
+                speakerInfo.setJoinRoomTime(now);
+                log.info("opsRoom>>>> join speakerCache {}", JSONObject.toJSONString(speakerInfo));
+                speakerCache.set(speakerInfo);
+                return true;
+            }
+            //主讲人退出房间关闭录像
+            closeLive(speakerInfo);
+            speakerInfo.setExitRoomTime(now);
+            log.info("opsRoom>>>> exit speakerCache {}", JSONObject.toJSONString(speakerInfo));
+            speakerCache.set(speakerInfo);
+            return true;
+        }
+        return false;
+    }
+
+    /**
      * 主讲人登录专用
      * 主讲人登录专用
      *
      *
      * @param roomUid
      * @param roomUid

+ 0 - 6
mec-client-api/src/main/java/com/ym/mec/task/TaskRemoteService.java

@@ -256,12 +256,6 @@ public interface TaskRemoteService {
     void destroyExpiredLiveRoom();
     void destroyExpiredLiveRoom();
 
 
     /**
     /**
-     * 同步直播间在线人数 - 3秒一次发送到直播间消息中
-     */
-    @GetMapping("task/sendLiveOnlineNum")
-    void sendLiveOnlineNum();
-
-    /**
      * 学员小课统计
      * 学员小课统计
      */
      */
     @GetMapping("task/studentSmallClassStatistics")
     @GetMapping("task/studentSmallClassStatistics")

+ 0 - 5
mec-client-api/src/main/java/com/ym/mec/task/fallback/TaskRemoteServiceFallback.java

@@ -306,11 +306,6 @@ public class TaskRemoteServiceFallback implements TaskRemoteService {
     }
     }
 
 
     @Override
     @Override
-    public void sendLiveOnlineNum(){
-    	logger.error("直播间在线人数推送失败");
-    }
-
-    @Override
     public void studentSmallClassStatistics() {
     public void studentSmallClassStatistics() {
         logger.error("学员小课统计失败");
         logger.error("学员小课统计失败");
     }
     }

+ 50 - 2
mec-common/common-core/src/main/java/com/ym/mec/common/entity/ImRoomMessage.java

@@ -12,29 +12,49 @@ public class ImRoomMessage extends BaseMessage {
     //objectName 类型-观看者退出房间
     //objectName 类型-观看者退出房间
     public static final String LOOKER_LOGIN_OUT = "RC:LookerLoginOut";
     public static final String LOOKER_LOGIN_OUT = "RC:LookerLoginOut";
 
 
-    //objectName 类型-观看者数量
-    public static final String MEMBER_COUNT = "RC:Chatroom:MemberCount";
+    //objectName 类型-观看者数量-该消息只有主播端接
+    public static final String MEMBER_COUNT = "RC:Chatroom:MemberCountUp";
 
 
     /**
     /**
+     * <p>必传
      * 消息类型
      * 消息类型
      */
      */
     private String objectName;
     private String objectName;
 
 
     /**
     /**
+     * <p>必传
      * 消息内容
      * 消息内容
      */
      */
     private Object content;
     private Object content;
 
 
     /**
     /**
+     * <p>必传
      * 发送者id
      * 发送者id
      */
      */
     private String fromUserId;
     private String fromUserId;
 
 
     /**
     /**
+     * <p>必传
      * 发送到的房间uid
      * 发送到的房间uid
      */
      */
     private String toChatroomId;
     private String toChatroomId;
 
 
+    /**
+     * <p>非必传
+     * <p>开通“聊天室消息云存储”服务情况下,针对融云服务端历史消息中是否存储此条消息,
+     * <p>0 表示为不存储、 1 表示为存储,默认为 1 存储消息。
+     * <p>客户端发送自定义消息时则根据消息注册的 ISPERSISTED 标识判断是否在服务端存储。
+     */
+    private Integer isPersisted = 1;
+
+    /**
+     * <p>非必传
+     * <p>发送用户自己是否接收消息,0 表示为不接收,1 表示为接收,
+     * <p>默认为 0 不接收,只有向一个聊天室发送消息时此参数有效。
+     * <p>如开通了聊天室消息云存储功能,不为 1 时该消息不会存储到历史消息中。
+     */
+    private Integer isIncludeSender = 0;
+
     @Override
     @Override
     public String getObjectName() {
     public String getObjectName() {
         return objectName;
         return objectName;
@@ -67,4 +87,32 @@ public class ImRoomMessage extends BaseMessage {
     public void setToChatroomId(String toChatroomId) {
     public void setToChatroomId(String toChatroomId) {
         this.toChatroomId = toChatroomId;
         this.toChatroomId = toChatroomId;
     }
     }
+
+    /**
+     * 0 表示为不存储、 1 表示为存储
+     */
+    public Integer getIsPersisted() {
+        return isPersisted;
+    }
+
+    /**
+     * 0 表示为不存储、 1 表示为存储
+     */
+    public void setIsPersisted(Integer isPersisted) {
+        this.isPersisted = isPersisted;
+    }
+
+    /**
+     * 0 表示为不接收,1 表示为接收
+     */
+    public Integer getIsIncludeSender() {
+        return isIncludeSender;
+    }
+
+    /**
+     * 0 表示为不接收,1 表示为接收
+     */
+    public void setIsIncludeSender(Integer isIncludeSender) {
+        this.isIncludeSender = isIncludeSender;
+    }
 }
 }

+ 0 - 20
mec-task/src/main/java/com/ym/mec/task/jobs/SendLiveOnlineNumTask.java

@@ -1,20 +0,0 @@
-package com.ym.mec.task.jobs;
-
-import com.ym.mec.task.TaskRemoteService;
-import com.ym.mec.task.core.BaseTask;
-import com.ym.mec.task.core.TaskException;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-@Service
-public class SendLiveOnlineNumTask extends BaseTask {
-
-    @Autowired
-    private TaskRemoteService taskRemoteService;
-
-    @Override
-    public void execute() throws TaskException {
-        taskRemoteService.sendLiveOnlineNum();
-    }
-
-}

+ 0 - 6
mec-web/src/main/java/com/ym/mec/web/controller/TaskController.java

@@ -583,12 +583,6 @@ public class TaskController extends BaseController {
         imLiveBroadcastRoomService.destroyExpiredLiveRoom();
         imLiveBroadcastRoomService.destroyExpiredLiveRoom();
     }
     }
 
 
-    @ApiOperation("同步直播间在线人数 - 3秒一次发送到直播间消息中")
-    @GetMapping(value = "/sendLiveOnlineNum")
-    public void sendLiveOnlineNum(){
-        imLiveBroadcastRoomService.sendLiveOnlineNum();
-    }
-
     @ApiOperation("学员小课统计")
     @ApiOperation("学员小课统计")
     @GetMapping(value = "/studentSmallClassStatistics")
     @GetMapping(value = "/studentSmallClassStatistics")
     public void studentSmallClassStatistics(){
     public void studentSmallClassStatistics(){