Browse Source

增加:
定时销毁房间方法及定时任务

hgw 3 years ago
parent
commit
37e9dd5b91

+ 2 - 0
mec-biz/src/main/java/com/ym/mec/biz/service/ImLiveBroadcastRoomService.java

@@ -33,6 +33,8 @@ public interface ImLiveBroadcastRoomService extends IService<ImLiveBroadcastRoom
 
     void delete(Integer id);
 
+    void destroyExpiredLiveRoom();
+
     void syncLike(String roomUid, Integer likeNum);
 
     void quitRoom(List<ImUserState> userState);

+ 137 - 41
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImLiveBroadcastRoomServiceImpl.java

@@ -42,6 +42,7 @@ import org.springframework.stereotype.Service;
 
 import java.io.Serializable;
 import java.util.*;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
@@ -206,7 +207,59 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
     }
 
     /**
-     * 关闭房间
+     * 定时任务,每分钟执行
+     * 销毁主讲人退出超过30分钟的直播间
+     */
+    public void destroyExpiredLiveRoom() {
+        //查询已经开始并且没有删除及销毁的直播间
+        List<ImLiveBroadcastRoom> list = this.list(new WrapperUtil<ImLiveBroadcastRoom>()
+                .hasEq("live_state_", 1)
+                .hasEq("room_state_", 0)
+                .queryWrapper());
+        if (CollectionUtils.isEmpty(list)) {
+            return;
+        }
+        Date now = new Date();
+        list.forEach(room -> {
+            //过期时间= 房间正式开始时间+30分钟
+            Date expiredTime = DateUtil.addMinutes(room.getCreatedTime(), 30);
+            // 现在 大于等于 过期时间
+            if (now.getTime() >= expiredTime.getTime()) {
+                //获取直播间主讲人信息
+                RBucket<RoomSpeakerInfo> speakerCache = redissonClient.getBucket(LIVE_SPEAKER_INFO.replace(USER_ID, room.getSpeakerId().toString()));
+                if (speakerCache.isExists()) {
+                    RoomSpeakerInfo speakerInfo = speakerCache.get();
+                    //超过30分钟,没有进入房间
+                    if (Objects.isNull(speakerInfo.getJoinRoomTime())) {
+                        roomDestroy(room.getId());
+                    }
+                    //超过30分钟,但是未开启直播,则销毁
+                    if (Objects.isNull(speakerInfo.getState())) {
+                        roomDestroy(room.getId());
+                    }
+                    //现在 大于 (传入时间+30分钟) 则销毁
+                    Consumer<Date> consumer = date -> {
+                        Date comparedTime = DateUtil.addMinutes(date, 30);
+                        if (now.getTime() >= comparedTime.getTime()) {
+                            roomDestroy(room.getId());
+                        }
+                    };
+                    //如果直播状态=1(关闭直播),并且结束直播时间 +30分钟 大于等于 现在,则销毁
+                    if (speakerInfo.getState() == 1) {
+                        consumer.accept(speakerInfo.getEndLiveTime());
+                    }
+                    //退出时间 +30分钟 大于等于 现在,则销毁
+                    if (Objects.nonNull(speakerInfo.getExitRoomTime())) {
+                        consumer.accept(speakerInfo.getExitRoomTime());
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * 关闭房间-融云
+     * 获取所有直播间缓存数据并写入数据库后并清理缓存
      *
      * @param id 直播房间表id
      */
@@ -219,15 +272,19 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         if (room.getLiveState() == 0) {
             throw new BizException("直播未开始");
         }
+
+        String roomUid = room.getRoomUid();
+        Integer speakerId = room.getSpeakerId();
         try {
-            imFeignService.destroyLiveRoom(room.getRoomUid());
+            imFeignService.destroyLiveRoom(roomUid);
         } catch (Exception e) {
             log.error(e.getMessage(), e.getCause());
             throw new BizException(e.getMessage());
         }
-        //获取所有直播间缓存数据并写入数据库
-        insertLiveAllData(room.getRoomUid());
-        //todo 删除直播间缓存数据
+
+        //获取所有直播间缓存数据并写入数据库后并清理缓存
+        insertAndCleanLiveData(roomUid, speakerId);
+        log.info("roomDestroy>>>> insertAndCleanLiveData {}", JSONObject.toJSONString(room));
 
         //将房间状态改为已销毁
         Date date = new Date();
@@ -240,44 +297,69 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
 
         //向聊天室发自定义消息踢出所有人
         ImRoomMessage message = new ImRoomMessage();
-        message.setFromUserId(room.getSpeakerId().toString());
-        message.setToChatroomId(room.getRoomUid());
+        message.setFromUserId(speakerId.toString());
+        message.setToChatroomId(roomUid);
         message.setObjectName(ImRoomMessage.FORCED_OFFLINE);
         imFeignService.publishRoomMsg(message);
+        log.info("roomDestroy>>>> FORCED_OFFLINE {}", JSONObject.toJSONString(message));
     }
 
-    //获取该直播间所有数据
-    private void insertLiveAllData(String roomUid) {
+    //获取该直播间所有数据写入数据库-并清理缓存
+    private void insertAndCleanLiveData(String roomUid, Integer speakerId) {
+        //总观看人数
+        int totalLookNum = 0;
         //获取直播间所有人数据写入 im_live_broadcast_room_member
-        List<RoomUserInfoVo> roomTotalUser = queryTotalRoomUserInfo(roomUid);
-        List<ImLiveBroadcastRoomMember> memberList = new ArrayList<>();
-        roomTotalUser.forEach(v -> {
-            ImLiveBroadcastRoomMember member = new ImLiveBroadcastRoomMember();
-            member.setTenantId(v.getTenantId());
-            member.setRoomUid(roomUid);
-            member.setUserId(v.getUserId());
-            member.setJoinTime(v.getFirstJoinTime());
-            member.setTotalTime(v.getTotalViewTime());
-            memberList.add(member);
-        });
-        if (CollectionUtils.isNotEmpty(memberList)) {
-            liveBroadcastRoomMemberService.getDao().insertBatch(memberList);
-        }
-
-        //获取直播间数据
-        ImLiveBroadcastRoomVo roomVo = queryRoomInfo(roomUid);
-        //获取直播间主讲人信息
-        RBucket<RoomSpeakerInfo> speakerCache = redissonClient.getBucket(LIVE_SPEAKER_INFO.replace(USER_ID, roomVo.getSpeakerId().toString()));
-        RoomSpeakerInfo speakerInfo = speakerCache.get();
-        //获取直播间数据写入im_live_broadcast_room_data
-        ImLiveBroadcastRoomData liveData = new ImLiveBroadcastRoomData();
-        liveData.setTenantId(speakerInfo.getTenantId());
-        liveData.setRoomUid(roomUid);
-        liveData.setLikeNum(roomVo.getLikeNum());
-        liveData.setTotalUserNum(roomVo.getTotalLookNum());
-        liveData.setUpdatedTime(new Date());
-        liveData.setLiveTime(speakerInfo.getTotalLiveTime());
-        liveBroadcastRoomDataService.save(liveData);
+        RMap<Integer, RoomUserInfoVo> roomTotalUserCache = redissonClient.getMap(LIVE_ROOM_TOTAL_USER_LIST.replace(ROOM_UID, roomUid));
+        if (roomTotalUserCache.isExists()) {
+            List<RoomUserInfoVo> roomTotalUser = new ArrayList<>(roomTotalUserCache.values());
+            List<ImLiveBroadcastRoomMember> memberList = new ArrayList<>();
+            roomTotalUser.forEach(v -> {
+                ImLiveBroadcastRoomMember member = new ImLiveBroadcastRoomMember();
+                member.setTenantId(v.getTenantId());
+                member.setRoomUid(roomUid);
+                member.setUserId(v.getUserId());
+                member.setJoinTime(v.getFirstJoinTime());
+                member.setTotalTime(v.getTotalViewTime());
+                memberList.add(member);
+            });
+            if (CollectionUtils.isNotEmpty(memberList)) {
+                liveBroadcastRoomMemberService.getDao().insertBatch(memberList);
+                totalLookNum = memberList.size();
+            }
+            //删除用户对应的直播间关系缓存
+            roomTotalUser.stream()
+                    .map(RoomUserInfoVo::getUserId)
+                    .filter(Objects::nonNull)
+                    .forEach(id -> redissonClient.getBucket(LIVE_USER_ROOM.replace(USER_ID, id.toString())).delete());
+            //删除直播间所有人数据
+            roomTotalUserCache.clear();
+        }
+
+        //获取直播点赞
+        int like = 0;
+        RBucket<Object> likeCache = redissonClient.getBucket(LIVE_ROOM_LIKE.replace(ROOM_UID, roomUid));
+        if (likeCache.isExists()) {
+            like = (int) likeCache.get();
+            //删除房间点赞数据
+            likeCache.delete();
+        }
+
+        //获取直播间主讲人信息 写入im_live_broadcast_room_data
+        RBucket<RoomSpeakerInfo> speakerCache = redissonClient.getBucket(LIVE_SPEAKER_INFO.replace(USER_ID, speakerId.toString()));
+        if (speakerCache.isExists()) {
+            ImLiveBroadcastRoomData liveData = new ImLiveBroadcastRoomData();
+            RoomSpeakerInfo speakerInfo = speakerCache.get();
+            liveData.setTenantId(speakerInfo.getTenantId());
+            liveData.setRoomUid(roomUid);
+            liveData.setLikeNum(like);
+            liveData.setTotalUserNum(totalLookNum);
+            liveData.setUpdatedTime(new Date());
+            liveData.setLiveTime(speakerInfo.getTotalLiveTime());
+            liveBroadcastRoomDataService.save(liveData);
+            //删除房间主讲人数据
+            speakerCache.delete();
+        }
+
     }
 
     /**
@@ -342,6 +424,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
                 message.setToChatroomId(roomUid);
                 message.setObjectName(ImRoomMessage.LOOKER_LOGIN_OUT);
                 imFeignService.publishRoomMsg(message);
+                log.info("quitRoom>>>> LOOKER_LOGIN_OUT : {}", JSONObject.toJSONString(userInfo));
             }
             //每次退出房间计算当前用户观看时长
             int minutesBetween = getMinutesBetween(userInfo.getDynamicJoinTime(), now);
@@ -454,9 +537,11 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
             imFeignService.stopRecord(roomSpeakerInfo.getRoomUid());
         }
         if (stateFlag) {
+            Date now = new Date();
+            roomSpeakerInfo.setEndLiveTime(now);
             roomSpeakerInfo.setState(1);
             //计算时长
-            int minutesBetween = getMinutesBetween(roomSpeakerInfo.getStartLiveTime(), new Date());
+            int minutesBetween = getMinutesBetween(roomSpeakerInfo.getStartLiveTime(), now);
             int i = Objects.isNull(roomSpeakerInfo.getTotalLiveTime()) ? 0 : roomSpeakerInfo.getTotalLiveTime();
             roomSpeakerInfo.setTotalLiveTime(i + minutesBetween);
         }
@@ -626,7 +711,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
     }
 
     /**
-     * 查询直播间用户信息
+     * 查询在观看直播的用户信息
      *
      * @param roomUid 直播间uid
      */
@@ -638,7 +723,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
 
     public List<RoomUserInfoVo> queryRoomUserInfo(List<RoomUserInfoVo> totalUserInfo) {
         return totalUserInfo.stream()
-                .filter(o -> o.getState() == 0)
+                .filter(o -> Objects.nonNull(o.getState()) && o.getState() == 0)
                 .collect(Collectors.toList());
     }
 
@@ -700,6 +785,9 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         //开始直播时间
         @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
         private Date startLiveTime;
+        //开始直播时间
+        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+        private Date endLiveTime;
         //退出房间时间
         @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
         private Date exitRoomTime;
@@ -797,6 +885,14 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         public void setTenantId(Integer tenantId) {
             this.tenantId = tenantId;
         }
+
+        public Date getEndLiveTime() {
+            return endLiveTime;
+        }
+
+        public void setEndLiveTime(Date endLiveTime) {
+            this.endLiveTime = endLiveTime;
+        }
     }
 
 }

+ 6 - 0
mec-client-api/src/main/java/com/ym/mec/task/TaskRemoteService.java

@@ -248,4 +248,10 @@ public interface TaskRemoteService {
      */
     @GetMapping("task/createLiveRoom")
     void createLiveRoom();
+
+    /**
+     * 每分钟-查询是否有直播间需要销毁
+     */
+    @GetMapping("task/destroyExpiredLiveRoom")
+    void destroyExpiredLiveRoom();
 }

+ 5 - 0
mec-client-api/src/main/java/com/ym/mec/task/fallback/TaskRemoteServiceFallback.java

@@ -299,4 +299,9 @@ public class TaskRemoteServiceFallback implements TaskRemoteService {
     public void createLiveRoom() {
         logger.error("直播间创建失败");
     }
+
+    @Override
+    public void destroyExpiredLiveRoom() {
+        logger.error("销毁直播间失败");
+    }
 }

+ 22 - 0
mec-task/src/main/java/com/ym/mec/task/jobs/DestroyExpiredLiveRoomTask.java

@@ -0,0 +1,22 @@
+package com.ym.mec.task.jobs;
+
+import com.ym.mec.task.TaskRemoteService;
+import com.ym.mec.task.core.BaseTask;
+import com.ym.mec.task.core.TaskException;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * @author hgw
+ * Created by 2022-03-04
+ */
+public class DestroyExpiredLiveRoomTask extends BaseTask {
+
+    @Autowired
+    private TaskRemoteService taskRemoteService;
+
+    @Override
+    public void execute() throws TaskException {
+        taskRemoteService.destroyExpiredLiveRoom();
+    }
+
+}

+ 6 - 0
mec-web/src/main/java/com/ym/mec/web/controller/TaskController.java

@@ -573,4 +573,10 @@ public class TaskController extends BaseController {
     public void createLiveRoom(){
         imLiveBroadcastRoomService.createLiveRoom();
     }
+
+    @ApiOperation("每分钟-查询是否有直播间需要销毁")
+    @GetMapping(value = "/destroyExpiredLiveRoom")
+    public void destroyExpiredLiveRoom(){
+        imLiveBroadcastRoomService.destroyExpiredLiveRoom();
+    }
 }