Browse Source

直播间人员状态

liujc 1 year ago
parent
commit
b105c33cc0

+ 1 - 1
cooleshow-user/user-admin/src/main/java/com/yonge/cooleshow/admin/controller/open/ImController.java

@@ -233,7 +233,7 @@ public class ImController extends BaseController {
             }
 
             // 自动关闭录制
-            liveRoomService.closeLive(getRoomUid(event.getStreamId()), getSpeakerId(event.getStreamId()),event.getSequence());
+            liveRoomService.closeLive(getRoomUid(event.getStreamId()), getSpeakerId(event.getStreamId()).longValue(),event.getSequence());
             // 同步点赞数
             liveRoomService.syncLikeCount(getRoomUid(event.getStreamId()));
         }

+ 1 - 1
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/service/LiveRoomService.java

@@ -229,7 +229,7 @@ public interface LiveRoomService extends IService<LiveRoom> {
      */
     void updateLiveRoomPushStreamTime(TencentData.CallbackStreamStateEvent event);
 
-    void closeLive(String roomUid, Integer speakerId, String sequence);
+    void closeLive(String roomUid, Long speakerId, String sequence);
 
     void startLive(String roomUid, Integer speakerId, String sequence);
 }

+ 75 - 70
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/service/impl/LiveRoomServiceImpl.java

@@ -692,6 +692,13 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
         if (Objects.isNull(room)) {
             return;
         }
+
+        //10秒内同一个房间不能重复销毁-防重复销毁
+        RBucket<Object> bucket = redissonClient.getBucket("IM:ROOMDESTROY:" + room.getRoomUid());
+        if (!bucket.trySet(1, 10, TimeUnit.SECONDS)) {
+            return;
+        }
+
         CourseSchedule schedule = getCourseScheduleByRoomUid(room.getRoomUid());
         //直播课
         if (room.getType().equals(RoomTypeEnum.LIVE.getCode())) {
@@ -709,13 +716,9 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
         }
         //修改房间状态
         room.setLiveState(2);
-        // room.setRoomState(1);
+        room.setRoomState(2);
         this.updateById(room);
 
-
-        //获取所有直播间缓存数据并写入数据库后并清理缓存
-        CompletableFuture.runAsync(() -> insertAndCleanLiveData(room.getRoomUid(), room.getSpeakerId()));
-
         String speakerIdStr = room.getSpeakerId().toString();
         String roomUid = room.getRoomUid();
 
@@ -734,20 +737,14 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
         }
 
         Date now = new Date();
-        //删除人员与房间关联关系缓存的方法
-        Consumer<String> deleteUserRoomCache = (id) -> redissonClient.getBucket(LIVE_USER_ROOM.replace(USER_ID, id)).deleteAsync();
         //给老师签退
         teacherAttendanceService.update(Wrappers.<TeacherAttendance>lambdaUpdate()
                 .eq(TeacherAttendance::getTeacherId, room.getSpeakerId())
                 .eq(TeacherAttendance::getCourseScheduleId, schedule.getId())
                 .set(TeacherAttendance::getSignOutTime, now));
-        //删除老师与房间关联关系
-        deleteUserRoomCache.accept(speakerIdStr);
         //获取在线人员信息
         RMap<Long, String> onlineUserCache = this.getOnlineUserCache(roomUid);
         onlineUserCache.forEach((id, s) -> {
-            //删除观看者对应直播间编号的缓存
-            deleteUserRoomCache.accept(id.toString());
             //观看者签退
             studentAttendanceService.update(Wrappers.<StudentAttendance>lambdaUpdate()
                     .eq(StudentAttendance::getStudentId, id)
@@ -755,20 +752,12 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
                     .set(StudentAttendance::getSignOutTime, now));
         });
 
+
+        //获取所有直播间缓存数据并写入数据库后并清理缓存
+        CompletableFuture.runAsync(() -> insertAndCleanLiveData(room.getRoomUid(), room.getSpeakerId()));
+        log.info("roomDestroy>>>> insertAndCleanLiveData {}", JSONObject.toJSONString(room));
+
         tryDestroyLiveRoom(room);
-        //删除房间信息
-        this.getLiveRoomInfo(roomUid).deleteAsync();
-        //删除点赞数
-        redissonClient.getBucket(LIVE_ROOM_LIKE.replace(ROOM_UID, roomUid)).deleteAsync();
-        //删除当前主讲人最后一次进入房间的ip
-        redissonClient.getBucket(LIVE_USER_LAST_CLIENT_IP.replace(ROOM_UID, roomUid).replace(USER_ID, speakerIdStr)).deleteAsync();
-        //删除当前主讲人最后一次心跳
-        redissonClient.getBucket(LIVE_ROOM_HEART_BEAT.replace(ROOM_UID, roomUid)).deleteAsync();
-        //删除房间全部人员缓存
-        this.getTotalUserCache(roomUid).deleteAsync();
-        //删除该临时直播间列表
-        RMap<Long, String> map = redissonClient.getMap(TEACHER_TEMP_LIVE_ROOM);
-        map.remove(room.getSpeakerId());
     }
 
     /**
@@ -809,11 +798,6 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
             }
             //将最新的时间写入缓存
             userStateTimeCache.set(userStateTime, 60L, TimeUnit.MINUTES);
-            //根据房间号获取房间信息
-            RBucket<RoomInfoCache> roomInfoCache = this.getLiveRoomInfo(roomUid);
-            if (!roomInfoCache.isExists()) {
-                return;
-            }
 
             // 查询房间信息
             LiveRoom roomVo = getByRoomUid(roomUid);
@@ -827,10 +811,9 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
                 return;
             }
 
-            RoomInfoCache roomInfo = roomInfoCache.get();
             // 查询userId是不是主讲人 ,如果是主讲人则返回
-            if (roomInfo.getSpeakerId().toString().equals(userIdStr)) {
-                this.opsSpeaker(roomInfoCache, user, now, userIdStr);
+            if (roomVo.getSpeakerId().toString().equals(userIdStr)) {
+                this.opsSpeaker(roomUid, user, now, userIdStr);
                 return;
             }
             //这里开始只处理观看者的数据,观看者只接受退出消息 status=0 是进入房间
@@ -858,8 +841,31 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
                 log.debug("opsRoom>>>> user enter room, userState: {}", JSON.toJSONString(userState));
                 return;
             }
+
             //用户id
             Long userId = Long.valueOf(userIdStr);
+            // 更新用户离线状态
+            ImLiveBroadcastRoomMember roomMember = new ImLiveBroadcastRoomMember();
+            roomMember.setOnlineStatus(0);
+
+            // 用户离开直播间
+            if (user.getStatus().equals("3")) {
+                // 直播间用户离开状态
+                roomMember.setLiveRoomStatus(0);
+
+            }
+            // 更新用户在线状态为离线
+            ImLiveBroadcastRoomMember one = imLiveBroadcastRoomMemberService.lambdaQuery()
+                    .eq(ImLiveBroadcastRoomMember::getRoomUid, roomVo.getRoomUid())
+                    .eq(ImLiveBroadcastRoomMember::getUserId, userId)
+                    .last("limit 1")
+                    .one();
+            if (one != null) {
+                roomMember.setId(one.getId());
+                imLiveBroadcastRoomMemberService.updateById(roomMember);
+            }
+            liveBroadcastRoomMemberService.saveRecord(roomUid, userId, InOrOutEnum.OUT, YesOrNoEnum.NO);
+
             //从房间累计用户信息中查询该用户的信息
             RMap<Long, String> roomTotalUser = this.getTotalUserCache(roomUid);
             //该房间未查询到用户数据则不处理
@@ -885,7 +891,6 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
                 return;
             }
             CourseSchedule schedule = getCourseScheduleByRoomUid(roomUid);
-            liveBroadcastRoomMemberService.saveRecord(roomUid,userId, InOrOutEnum.OUT, YesOrNoEnum.NO);
             //从在线人员列表删除该人员
             onlineUserInfo.fastRemove(userId);
             //学员退出 写学生考勤表
@@ -894,24 +899,8 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
                     .eq(StudentAttendance::getCourseScheduleId, schedule.getId())
                     .set(StudentAttendance::getSignOutTime, now));
 
-            // 更新用户离线状态
-            ImLiveBroadcastRoomMember roomMember = new ImLiveBroadcastRoomMember();
-            roomMember.setOnlineStatus(0);
-
-            // 用户离开直播间
-            if (user.getStatus().equals("3")) {
-                // 直播间用户离开状态
-                roomMember.setLiveRoomStatus(0);
-
-            }
-            // 更新用户在线状态为离线
-            imLiveBroadcastRoomMemberService.lambdaUpdate()
-                    .eq(ImLiveBroadcastRoomMember::getRoomUid, roomVo.getRoomUid())
-                    .eq(ImLiveBroadcastRoomMember::getUserId, userId)
-                    .update(roomMember);
-
             //向直播间发送当前在线人数消息
-            this.sendOnlineUserCount(roomUid, roomInfo.getSpeakerId(), onlineUserInfo.size());
+            this.sendOnlineUserCount(roomUid, roomVo.getSpeakerId(), onlineUserInfo.size());
             log.info("opsRoom>>>> looker userInfo: {}", userJsonStr);
             //用户离开直播间发送退出房间消息给主讲人
             ImRoomMessage message = new ImRoomMessage();
@@ -952,10 +941,16 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
     /**
      * 主讲人
      */
-    private void opsSpeaker(RBucket<RoomInfoCache> roomInfoCache, ImUserStateSync user, Date now, String userIdStr) {
-        RoomInfoCache roomInfo = roomInfoCache.get();
-        String roomUid = roomInfo.getRoomUid();
+    private void opsSpeaker(String roomUid, ImUserStateSync user, Date now, String userIdStr) {
 
+        LiveRoom liveRoom = getByRoomUid(roomUid);
+        RBucket<RoomSpeakerInfo> speakerCache = getRoomSpeakerInfoCache(roomUid, userIdStr);
+        if (!speakerCache.isExists()) {
+            log.info("isSpeaker>>>> 主播用户缓存不存在, roomId: {}, userId={}", roomUid, userIdStr);
+            return;
+        }
+        // 当前用户为主播
+        RoomSpeakerInfo speakerInfo = speakerCache.get();
 
         CourseSchedule schedule = getCourseScheduleByRoomUid(roomUid);
         //进退房间写老师考勤表
@@ -967,8 +962,10 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
         RBucket<String> lastClientIp = redissonClient.getBucket(LIVE_USER_LAST_CLIENT_IP.replace(ROOM_UID, roomUid).replace(USER_ID, userIdStr));
         //主讲人进入房间
         if (user.getStatus().equals("0")) {
-            roomInfo.setSpeakerState(0);
-            roomInfo.setJoinRoomTime(now);
+
+            speakerInfo.setJoinRoomTime(now);
+            log.info("isSpeaker>>>> join speakerCache {}", JSONObject.toJSONString(speakerInfo));
+            speakerCache.set(speakerInfo);
             //将本次进入房间的clientIp添加到主讲人最后一次clientIp缓存中
             if (StringUtils.isNotBlank(user.getClientIp())) {
                 lastClientIp.set(user.getClientIp());
@@ -977,8 +974,8 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
             if (Objects.isNull(teacherAttendance)) {
                 this.setTeacherAttendance(Long.parseLong(userIdStr), schedule.getCourseGroupId(), schedule.getId());
             }
-            roomInfoCache.set(roomInfo);
-            log.info("opsRoom>>>> join speakerCache {}", JSONObject.toJSONString(roomInfo));
+            // 设置直播群组自定义数据
+            setGroupDefinedData(liveRoom,EGroupDefinedDataType.ANCHOR_STATUS,"ONLINE");
             return;
         }
         //退出房间 - 校验本次退出直播间的clientIp 是不是上次进入房间的clientIp
@@ -991,12 +988,11 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
             }
         }
         //如果退出时间大于进入时间就无需再次退出-直接返回
-        if (compareDate.test(roomInfo.getExitRoomTime(), roomInfo.getJoinRoomTime())) {
+        if (compareDate.test(speakerInfo.getExitRoomTime(), speakerInfo.getJoinRoomTime())) {
             return;
         }
 
-        roomInfo.setSpeakerState(1);
-        roomInfo.setExitRoomTime(now);
+
         if (Objects.isNull(teacherAttendance)) {
             teacherAttendance = new TeacherAttendance();
             teacherAttendance.setTeacherId(Long.parseLong(userIdStr));
@@ -1013,12 +1009,13 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
             teacherAttendance.setUpdateTime(now);
             teacherAttendanceService.updateById(teacherAttendance);
         }
-        log.info("opsRoom>>>> exit speakerCache {}", JSONObject.toJSONString(roomInfo));
-        roomInfoCache.set(roomInfo);
+
+        //主讲人退出房间关闭录像
+        closeLive(roomUid,speakerInfo.getSpeakerId(),null);
 
 
         // 设置直播群组自定义数据
-        setGroupDefinedData(getByRoomUid(roomUid),EGroupDefinedDataType.ANCHOR_STATUS,"OFFLINE");
+        setGroupDefinedData(liveRoom,EGroupDefinedDataType.ANCHOR_STATUS,"OFFLINE");
 
     }
 
@@ -1717,7 +1714,7 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
                 }
                 log.info("查询直播间流状态:{},roomUid:{}", JSON.toJSONString(liveStreamState), liveRoom.getRoomUid());
                 if (!"active".equals(liveStreamState.getStreamState())) {
-                    return roomDestroy(liveRoom);
+                    pluginService.liveStreamStop(getStreamId(liveRoom.getRoomUid(), liveRoom.getSpeakerId()));
                 }
             } else if (pluginService.pluginName().equals(RongCloudLivePlugin.PLUGIN_NAME)) {
                 // 融云走原有逻辑 融云自动销毁
@@ -1865,17 +1862,27 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
                 }
             }
             //添加人员数据
-            Lists.partition(memberList, 500)
-                    .forEach(list -> imLiveBroadcastRoomMemberMapper.insertBatch(list));
+//            Lists.partition(memberList, 500)
+//                    .forEach(list -> imLiveBroadcastRoomMemberMapper.insertBatch(list));
         }
         //获取在线人员信息
         RMap<Long, String> onlineUserCache = getOnlineUserCache(roomUid);
         //删除人员对应直播间编号信息
-        onlineUserCache.forEach((id, s) -> redissonClient.getBucket(LIVE_USER_ROOM.replace(USER_ID, id.toString())).delete());
+//        onlineUserCache.forEach((id, s) -> redissonClient.getBucket(LIVE_USER_ROOM.replace(USER_ID, id.toString())).delete());
         //删除直播间所有用户数据
         roomTotalUserCache.delete();
         //删除在线用户数据
         onlineUserCache.delete();
+        //删除当前主讲人最后一次进入房间的ip
+        redissonClient.getBucket(LIVE_USER_LAST_CLIENT_IP.replace(ROOM_UID, roomUid).replace(USER_ID, speakerId.toString())).deleteAsync();
+
+        //删除当前主讲人最后一次心跳
+        redissonClient.getBucket(LIVE_ROOM_HEART_BEAT.replace(ROOM_UID, roomUid)).deleteAsync();
+        //删除房间全部人员缓存
+        this.getTotalUserCache(roomUid).deleteAsync();
+        //删除该临时直播间列表
+        RMap<Long, String> map = redissonClient.getMap(TEACHER_TEMP_LIVE_ROOM);
+        map.remove(speakerId);
     }
 
 
@@ -2264,8 +2271,6 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
         this.sendOnlineUserCount(roomUid, userId, onlineUserCache.size());
         log.info("join sendOnlineUserCount>>>> param is null   roomUid: {}  fromUserId:{}  count:{}", roomUid, userId, onlineUserCache.size());
         log.info("joinRoom>>>> userInfo: {}", userJsonStr);
-        //记录当前用户对应的房间uid
-        redissonClient.getBucket(LIVE_USER_ROOM.replace(USER_ID, userId.toString())).set(roomUid, 2L, TimeUnit.DAYS);
         return new Date().getTime();
     }
 
@@ -2299,7 +2304,7 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
     }
 
     @Override
-    public void closeLive(String roomUid, Integer userId, String sequence) {
+    public void closeLive(String roomUid, Long userId, String sequence) {
         //查询房间主播信息
         RBucket<RoomSpeakerInfo> speakerCache = getRoomSpeakerInfoCache(roomUid, userId.toString());
         if (!speakerCache.isExists()) {