liujc преди 2 години
родител
ревизия
e2024bd764

+ 18 - 4
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/entity/ImRoomMessage.java

@@ -6,13 +6,27 @@ package com.yonge.cooleshow.biz.dal.entity;
  */
 public class ImRoomMessage extends BaseMessage {
 
-    //objectName 类型-观看者退出房间
-    public static final String RC_CHATROOM_LEAVE = "RC:Chatroom:Leave";
-    //objectName 类型-观看者数量
-    public static final String MEMBER_COUNT = "RC:Chatroom:MemberCountUp";
     //objectName 类型-将所有人强制踢出房间
     public static final String FORCED_OFFLINE = "RC:ForcedOffline";
 
+    //objectName 类型-观看者退出房间
+    public static final String LOOKER_LOGIN_OUT = "RC:LookerLoginOut";
+
+    //objectName 类型-观看者数量-该消息只有主播端接
+    public static final String MEMBER_COUNT = "RC:Chatroom:MemberCountUp";
+
+    //objectName 类型-商品变更
+    public static final String LIVE_GOODS_CHANGE = "DY:LIVE_GOODS_CHANGE";
+
+    //objectName 类型-在黑名单中添加该用户
+    public static final String BLOCK_BLACK_USER = "RC:BLOCK_BLACK_USER";
+
+    //objectName 类型-在黑名单中解除该用户
+    public static final String UNBLOCK_BLACK_USER = "RC:UNBLOCK_BLACK_USER";
+
+    //objectName 类型-用户点赞数同步
+    public static final String LIKES_COUNT = "RC:Chatroom:LikeCount";
+
     private String serviceProvider;
     /**
      * 消息类型

+ 23 - 0
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/mapper/ImLiveBroadcastRoomMemberMapper.java

@@ -52,4 +52,27 @@ public interface ImLiveBroadcastRoomMemberMapper extends BaseMapper<ImLiveBroadc
     List<ImLiveBroadcastRoomMemberWrapper.ImLiveBroadcastRoomMember> selectStudentSubject(@Param("studentIds") List<Integer> studentIds);
 
     IPage<ImLiveBroadcastRoomMemberWrapper.ImLiveBroadcastRoomMember> queryMemberPage(Page<ImLiveBroadcastRoomMemberWrapper.ImLiveBroadcastRoomMember> pageInfo, @Param("param") Map<String, Object> param);
+
+    int insertBatch(@Param("entities") List<ImLiveBroadcastRoomMember> entities);
+
+    /**
+     * 设置在线状态
+     *
+     * @param userIds      用户id
+     * @param groupId      房间编号
+     * @param onlineStatus 状态
+     */
+    void updateOnlineStatus(@Param("userIds") List<Long> userIds, @Param("groupId") String groupId, @Param(
+            "onlineStatus") Integer onlineStatus);
+    /**
+     * 设置直播状态
+     *
+     * @param userIds      用户id
+     * @param groupId      房间编号
+     * @param liveRoomStatus 状态
+     */
+    void updateLiveRoomStatus(@Param("userIds") List<Long> userIds, @Param("groupId") String groupId, @Param(
+            "liveRoomStatus") Integer liveRoomStatus);
+
+    List<ImLiveBroadcastRoomMember> queryMember(@Param("userIds") List<Long> userIds, @Param("groupId") String groupId);
 }

+ 108 - 21
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/service/impl/LiveRoomServiceImpl.java

@@ -19,6 +19,7 @@ import java.util.stream.Collectors;
 
 import com.alibaba.fastjson.JSON;
 import com.fasterxml.jackson.annotation.JsonFormat;
+import com.google.common.collect.Lists;
 import com.microsvc.toolkit.middleware.live.LivePluginContext;
 import com.microsvc.toolkit.middleware.live.LivePluginService;
 import com.microsvc.toolkit.middleware.live.impl.RongCloudLivePlugin;
@@ -760,24 +761,6 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
     }
 
     /**
-     * 销毁房间-聊天室
-     *
-     * @param roomId 房间Uid
-     */
-    private void ImDestroyLiveRoom(String roomId) {
-        try {
-            //删除服务器房间
-            IMApiResultInfo resultInfo = imHelper.deleteChrm(Collections.singletonList(roomId));
-            if (!resultInfo.isSuccess()) {
-                log.error("destroyLiveRoom error" + resultInfo.getErrorMessage());
-            }
-            log.info("destroyLiveRoom success: {}", roomId);
-        } catch (Exception e) {
-            log.error("destroyLiveRoom error" + e.getMessage());
-        }
-    }
-
-    /**
      * <p>主讲人处理进入和退出房间数据
      * <p>观看者只处理退出房间数据
      *
@@ -896,7 +879,7 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
             ImRoomMessage message = new ImRoomMessage();
             message.setFromUserId(userId.toString());
             message.setToChatroomId(roomUid);
-            message.setObjectName(ImRoomMessage.RC_CHATROOM_LEAVE);
+            message.setObjectName(ImRoomMessage.LOOKER_LOGIN_OUT);
             message.setContent(userId);
             try {
                 this.publishRoomMessage(message);
@@ -1487,18 +1470,31 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
                 .map(o -> Long.parseLong(o.getMemberAccount()))
                 .collect(Collectors.toList());
 
+        List<ImLiveBroadcastRoomMember> imLiveBroadcastRoomMembers = imLiveBroadcastRoomMemberMapper.queryMember(userIds,
+                callbackOnMemberStateChange.getGroupId());
+
+        if (CollectionUtils.isEmpty(imLiveBroadcastRoomMembers)) {
+            return;
+        }
+
+        // 根据用户id分组
+        Map<Long, ImLiveBroadcastRoomMember> userMap = imLiveBroadcastRoomMembers.stream()
+                .collect(Collectors.toMap(ImLiveBroadcastRoomMember::getUserId,o ->o, (k1, k2) -> k1));
         List<ImUserStateSync> imUserStates = new ArrayList<>();
         for (Long userId : userIds) {
 
+            Integer liveRoomStatus = userMap.getOrDefault(userId, new ImLiveBroadcastRoomMember())
+                    .getLiveRoomStatus();
             ImUserStateSync imUserState = new ImUserStateSync();
             imUserState.setUserid(userId.toString());
-            imUserState.setStatus(onlineStatus == 1?"0":"1");
+            imUserState.setStatus(onlineStatus == 1 && liveRoomStatus == 1?"0":"1");
             imUserState.setOs(callbackOnMemberStateChange.getOptPlatform());
             imUserState.setTime(new Date().getTime());
             imUserState.setRoomUid(callbackOnMemberStateChange.getGroupId());
             imUserStates.add(imUserState);
         }
         opsRoom(imUserStates);
+        imLiveBroadcastRoomMemberMapper.updateOnlineStatus(userIds, callbackOnMemberStateChange.getGroupId(),onlineStatus);
     }
 
     @Override
@@ -1507,6 +1503,7 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
         if (callbackAfterMemberExit == null) {
             return;
         }
+        Integer liveRoomStatus = 0;
         // 用户id
         if (CollectionUtils.isEmpty(callbackAfterMemberExit.getExitMemberList())) {
             return;
@@ -1528,6 +1525,7 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
             imUserStates.add(imUserState);
         }
         opsRoom(imUserStates);
+        imLiveBroadcastRoomMemberMapper.updateLiveRoomStatus(userIds, callbackAfterMemberExit.getGroupId(),liveRoomStatus);
     }
 
     @Override
@@ -1536,6 +1534,7 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
         if (callbackAfterNewMemberJoin == null) {
             return;
         }
+        Integer liveRoomStatus = 1;
 
         // 用户id
         if (CollectionUtils.isEmpty(callbackAfterNewMemberJoin.getNewMemberList())) {
@@ -1546,17 +1545,30 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
                 .map(o -> Long.parseLong(o.getMemberAccount()))
                 .collect(Collectors.toList());
 
+
+        List<ImLiveBroadcastRoomMember> imLiveBroadcastRoomMembers = imLiveBroadcastRoomMemberMapper.queryMember(userIds,
+                callbackAfterNewMemberJoin.getGroupId());
+
+        if (CollectionUtils.isEmpty(imLiveBroadcastRoomMembers)) {
+            return;
+        }
+
+        // 根据用户id分组
+        Map<Long, ImLiveBroadcastRoomMember> userMap = imLiveBroadcastRoomMembers.stream()
+                .collect(Collectors.toMap(ImLiveBroadcastRoomMember::getUserId,o ->o, (k1, k2) -> k1));
+
         List<ImUserStateSync> imUserStates = new ArrayList<>();
         for (Long userId : userIds) {
             ImUserStateSync imUserState = new ImUserStateSync();
             imUserState.setUserid(userId.toString());
-            imUserState.setStatus("0");
+            imUserState.setStatus(userMap.getOrDefault(userId, new ImLiveBroadcastRoomMember()).getOnlineStatus() == 1 ? "0" : "1");
             imUserState.setOs(callbackAfterNewMemberJoin.getOptPlatform());
             imUserState.setTime(callbackAfterNewMemberJoin.getEventTime().atZone(ZoneId.systemDefault()).toEpochSecond());
             imUserState.setRoomUid(callbackAfterNewMemberJoin.getGroupId());
             imUserStates.add(imUserState);
         }
         opsRoom(imUserStates);
+        imLiveBroadcastRoomMemberMapper.updateLiveRoomStatus(userIds, callbackAfterNewMemberJoin.getGroupId(),liveRoomStatus);
     }
 
     // 定时任务凌晨2点,关闭腾讯直播间, 融云直播间自动关闭,不做处理
@@ -1623,6 +1635,9 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
         log.error("roomDestroy>>>> room : {}", JSONObject.toJSONString(room));
         String roomUid = room.getRoomUid();
 
+        //获取所有直播间缓存数据并写入数据库后并清理缓存
+        CompletableFuture.runAsync(() -> insertAndCleanLiveData(roomUid, room.getSpeakerId()));
+        log.info("roomDestroy>>>> insertAndCleanLiveData {}", JSONObject.toJSONString(room));
         //将房间状态改为已销毁
         LiveRoom liveRoomUpdate = new LiveRoom();
         liveRoomUpdate.setId(room.getId());
@@ -1678,6 +1693,78 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
         return true;
     }
 
+    //获取该直播间所有数据写入数据库-并清理缓存
+    private void insertAndCleanLiveData(String roomUid, Long speakerId) {
+        log.info("insertAndCleanLiveData >>>> roomUid : {}", roomUid);
+        Date now = new Date();
+        //总观看人数
+        List<ImLiveBroadcastRoomMember> memberList = new ArrayList<>();
+        //获取直播间所有人数据写入 im_live_broadcast_room_member
+        RMap<Long, String> roomTotalUserCache = getTotalUserCache(roomUid);
+        if (roomTotalUserCache.isExists()) {
+            List<ImLiveBroadcastRoomMemberWrapper.RoomUserInfoVo> roomTotalUser = roomTotalUserCache.values().parallelStream()
+                    .map(a -> JSONObject.toJavaObject(JSONObject.parseObject(a), ImLiveBroadcastRoomMemberWrapper.RoomUserInfoVo.class))
+                    .collect(Collectors.toList());
+            for (ImLiveBroadcastRoomMemberWrapper.RoomUserInfoVo v : roomTotalUser) {
+                ImLiveBroadcastRoomMember member = new ImLiveBroadcastRoomMember();
+                member.setRoomUid(roomUid);
+                member.setUserId(v.getUserId());
+                member.setOnlineStatus(0);
+                member.setLiveRoomStatus(0);
+                member.setJoinTime(v.getFirstJoinTime());
+                member.setTotalTime(getLookMinutes(v.getDynamicLookTime(), now, v.getTotalViewTime()));
+                memberList.add(member);
+            }
+        }
+
+        //获取直播点赞
+        int like = 0;
+        like = syncLikeCount(roomUid);
+        RBucket<Object> likeCache = redissonClient.getBucket(LIVE_ROOM_LIKE.replace(ROOM_UID, roomUid));
+        if (likeCache.isExists()) {
+            //删除房间点赞数据
+            likeCache.delete();
+        }
+        int speakerLiveTime = 0;
+        //获取直播间主讲人信息 写入im_live_broadcast_room_data
+        RBucket<RoomSpeakerInfo> speakerCache = getRoomSpeakerInfoCache(roomUid, speakerId.toString());
+        if (speakerCache.isExists()) {
+            LiveBroadcastRoomData liveData = new LiveBroadcastRoomData();
+            RoomSpeakerInfo speakerInfo = speakerCache.get();
+            log.info("insertAndCleanLiveData >>>> speakerInfo : {}", JSONObject.toJSONString(speakerInfo));
+            liveData.setRoomUid(roomUid);
+            liveData.setLikeNum(like);
+            liveData.setTotalUserNum(CollectionUtils.isNotEmpty(memberList) ? memberList.size() : 0);
+            liveData.setUpdatedTime(now);
+            liveData.setLiveTime(getLookMillisecond(speakerInfo.getStartLiveTime(), now, speakerInfo.getTotalLiveTime()));
+            liveBroadcastRoomDataService.save(liveData);
+            //删除房间主讲人数据
+            speakerCache.delete();
+            //获取主讲人直播时长
+            speakerLiveTime = liveData.getLiveTime();
+        }
+
+        //写入im_live_broadcast_room_member表,校验用户观看时长,不能大于主讲人直播时长
+        if (CollectionUtils.isNotEmpty(memberList)) {
+            for (ImLiveBroadcastRoomMember member : memberList) {
+                if (member.getTotalTime() > speakerLiveTime) {
+                    member.setTotalTime(speakerLiveTime);
+                }
+            }
+            //添加人员数据
+            Lists.partition(memberList, 500)
+                    .forEach(list -> imLiveBroadcastRoomMemberMapper.insertBatch(list));
+        }
+        //获取在线人员信息
+        RMap<Long, String> onlineUserCache = getOnlineUserCache(roomUid);
+        //删除人员对应直播间编号信息
+        onlineUserCache.forEach((id, s) -> redissonClient.getBucket(LIVE_USER_ROOM.replace(USER_ID, id.toString())).delete());
+        //删除直播间所有用户数据
+        roomTotalUserCache.delete();
+        //删除在线用户数据
+        onlineUserCache.delete();
+    }
+
 
     /**
      * 更新主播直播间状态

+ 30 - 0
cooleshow-user/user-biz/src/main/resources/config/mybatis/ImLiveBroadcastRoomMemberMapper.xml

@@ -98,4 +98,34 @@
         group by a.user_id_
         ORDER BY a.join_time_
     </select>
+
+
+    <insert id="insertBatch" keyColumn="id_" keyProperty="id" useGeneratedKeys="true">
+        insert into im_live_broadcast_room_member(room_uid_, user_id_, join_time_, total_time_,online_status_,live_room_status_) values
+        <foreach collection="entities" item="entity" separator=",">
+            (#{entity.roomUid}, #{entity.userId}, #{entity.joinTime}, #{entity.totalTime},0,0)
+        </foreach>
+        ON DUPLICATE KEY UPDATE join_time_ = VALUES(join_time_), total_time_ = VALUES(total_time_),online_status_ = 0,live_room_status_ = 0
+    </insert>
+
+
+    <update id="updateOnlineStatus">
+        update im_live_broadcast_room_member
+        set online_status_ = #{onlineStatus}
+        where room_uid_ = #{groupId} and user_id_ in <foreach collection="userIds" item="userId" open="(" separator="," close=")">#{userId}</foreach>
+    </update>
+
+    <update id="updateLiveRoomStatus">
+        update im_live_broadcast_room_member
+        set live_room_status_ = #{liveRoomStatus}
+        where room_uid_ = #{groupId} and user_id_ in <foreach collection="userIds" item="userId" open="(" separator="," close=")">#{userId}</foreach>
+
+    </update>
+
+
+    <select id="queryMember" resultType="com.yonge.cooleshow.biz.dal.entity.ImLiveBroadcastRoomMember">
+        select * from im_live_broadcast_room_member
+        where room_uid_ = #{groupId} and user_id_ in <foreach collection="userIds" item="userId" open="(" separator="," close=")">#{userId}</foreach>
+
+    </select>
 </mapper>