|
@@ -17,6 +17,7 @@ import com.ym.mec.biz.dal.entity.ImLiveBroadcastRoom;
|
|
|
import com.ym.mec.biz.dal.entity.ImLiveBroadcastRoomData;
|
|
|
import com.ym.mec.biz.dal.entity.ImLiveBroadcastRoomMember;
|
|
|
import com.ym.mec.biz.dal.enums.MessageTypeEnum;
|
|
|
+import com.ym.mec.biz.dal.vo.BaseRoomUserVo;
|
|
|
import com.ym.mec.biz.dal.vo.ImLiveBroadcastRoomVo;
|
|
|
import com.ym.mec.biz.dal.vo.RoomUserInfoVo;
|
|
|
import com.ym.mec.biz.service.*;
|
|
@@ -33,6 +34,7 @@ import com.ym.mec.util.http.HttpUtil;
|
|
|
import org.apache.commons.collections.CollectionUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.redisson.api.RBucket;
|
|
|
+import org.redisson.api.RLock;
|
|
|
import org.redisson.api.RMap;
|
|
|
import org.redisson.api.RedissonClient;
|
|
|
import org.slf4j.Logger;
|
|
@@ -44,9 +46,11 @@ import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
import java.io.Serializable;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.function.BiConsumer;
|
|
|
import java.util.function.BiFunction;
|
|
|
+import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
@@ -81,6 +85,8 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
|
|
|
//直播间累计用户信息-指只要进入到该房间的用户都要记录
|
|
|
public static final String LIVE_ROOM_TOTAL_USER_LIST = "IM:LIVE_ROOM_TOTAL_USER_LIST:" + ROOM_UID;
|
|
|
+ //直播间在线用户信息
|
|
|
+ public static final String LIVE_ROOM_ONLINE_USER_LIST = "IM:LIVE_ROOM_ONLINE_USER_LIST:" + ROOM_UID;
|
|
|
//主讲人信息
|
|
|
public static final String LIVE_SPEAKER_INFO = "IM:LIVE_SPEAKER_INFO:" + USER_ID;
|
|
|
//用户对应的直播间Uid
|
|
@@ -89,6 +95,8 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
public static final String LIVE_USER_STATE_TIME = "IM:LIVE_USER_STATE_TIME:" + USER_ID;
|
|
|
//房间点赞数
|
|
|
public static final String LIVE_ROOM_LIKE = "IM:LIVE_ROOM_LIKE:" + ROOM_UID;
|
|
|
+ //计算人员观看时长锁
|
|
|
+ public static final String LIVE_LOOK_LOCK = "IM:LIVE_LOOK_LOCK:" + ROOM_UID;
|
|
|
//直播提前开始时间
|
|
|
public static final int PRE_LIVE_TIME_MINUTE = 30;
|
|
|
|
|
@@ -135,15 +143,20 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
*/
|
|
|
@Override
|
|
|
public ImLiveBroadcastRoomVo queryRoomInfo(String roomUid) {
|
|
|
+ ImLiveBroadcastRoomVo roomVo = getImLiveBroadcastRoomVo(roomUid);
|
|
|
+ if (roomVo == null) return null;
|
|
|
+ getRoomData(roomVo);
|
|
|
+ return roomVo;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ImLiveBroadcastRoomVo getImLiveBroadcastRoomVo(String roomUid) {
|
|
|
List<ImLiveBroadcastRoomVo> list = baseMapper.queryPage(new HashMap<String, Object>() {{
|
|
|
put("roomUid", roomUid);
|
|
|
}});
|
|
|
if (CollectionUtils.isEmpty(list)) {
|
|
|
return null;
|
|
|
}
|
|
|
- ImLiveBroadcastRoomVo roomVo = list.get(0);
|
|
|
- getRoomData(roomVo);
|
|
|
- return roomVo;
|
|
|
+ return list.get(0);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -237,6 +250,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
obj.setRoomState(1);
|
|
|
obj.setUpdatedBy(getSysUser().getId());
|
|
|
obj.setUpdatedTime(new Date());
|
|
|
+ obj.setPopularize(0);
|
|
|
this.updateById(obj);
|
|
|
}
|
|
|
|
|
@@ -267,6 +281,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
if (popularize == 1) {
|
|
|
this.update(Wrappers.<ImLiveBroadcastRoom>lambdaUpdate()
|
|
|
.set(ImLiveBroadcastRoom::getPopularize, 0)
|
|
|
+ .eq(ImLiveBroadcastRoom::getRoomState, 0)
|
|
|
.eq(ImLiveBroadcastRoom::getTenantId, obj.getTenantId()));
|
|
|
}
|
|
|
//更新直播间推广状态
|
|
@@ -372,10 +387,13 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
if (room.getLiveState() == 2) {
|
|
|
throw new BizException("直播已经结束,请刷新数据!");
|
|
|
}
|
|
|
+ //销毁房间
|
|
|
roomDestroy(room);
|
|
|
}
|
|
|
|
|
|
public void roomDestroy(ImLiveBroadcastRoom room) {
|
|
|
+ //销毁房间要先关闭直播
|
|
|
+ closeLive(room.getRoomUid(),room.getSpeakerId());
|
|
|
//10秒内同一个房间不能重复销毁-防重复销毁
|
|
|
RBucket<Object> bucket = redissonClient.getBucket("IM:ROOMDESTROY:" + room.getRoomUid());
|
|
|
if (!bucket.trySet(1, 10, TimeUnit.SECONDS)) {
|
|
@@ -425,7 +443,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
//总观看人数
|
|
|
List<ImLiveBroadcastRoomMember> memberList = new ArrayList<>();
|
|
|
//获取直播间所有人数据写入 im_live_broadcast_room_member
|
|
|
- RMap<Integer, RoomUserInfoVo> roomTotalUserCache = redissonClient.getMap(LIVE_ROOM_TOTAL_USER_LIST.replace(ROOM_UID, roomUid));
|
|
|
+ RMap<Integer, RoomUserInfoVo> roomTotalUserCache = getTotalUserCache(roomUid);
|
|
|
if (roomTotalUserCache.isExists()) {
|
|
|
List<RoomUserInfoVo> roomTotalUser = new ArrayList<>(roomTotalUserCache.values());
|
|
|
for (RoomUserInfoVo v : roomTotalUser) {
|
|
@@ -556,15 +574,19 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
Integer userId = Integer.valueOf(userid);
|
|
|
|
|
|
//从房间累计用户信息中查询该用户的信息
|
|
|
- RMap<Integer, RoomUserInfoVo> roomTotalUser = redissonClient.getMap(LIVE_ROOM_TOTAL_USER_LIST.replace(ROOM_UID, roomUid));
|
|
|
+ RMap<Integer, RoomUserInfoVo> roomTotalUser = getTotalUserCache(roomUid);
|
|
|
//该房间未查询到用户数据则不处理
|
|
|
if (!roomTotalUser.isExists() && !roomTotalUser.containsKey(userId)) {
|
|
|
return;
|
|
|
}
|
|
|
//查询到用户数据
|
|
|
RoomUserInfoVo userInfo = roomTotalUser.get(userId);
|
|
|
+ //查询在线人员列表
|
|
|
+ RMap<Integer, BaseRoomUserVo> onlineUserInfo = getOnlineUserCache(roomUid);
|
|
|
+ //获取当前用户是否在房间状态 true在 false不在
|
|
|
+ boolean userOnline = onlineUserInfo.isExists() && onlineUserInfo.containsKey(userId);
|
|
|
//用户是在房间的状态 并且 突然离线 - 那么融云会发送用户离线消息-此刻就发送退出房间消息给主讲人
|
|
|
- if (userInfo.getState() == 0 && user.getStatus().equals("1")) {
|
|
|
+ if (userOnline && user.getStatus().equals("1")) {
|
|
|
ImRoomMessage message = new ImRoomMessage();
|
|
|
message.setFromUserId(userId.toString());
|
|
|
message.setToChatroomId(roomUid);
|
|
@@ -582,9 +604,9 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
userInfo.setTotalViewTime(getLookMinutes(userInfo.getDynamicLookTime(), userInfo.getTotalViewTime()));
|
|
|
userInfo.setDynamicLookTime(null);
|
|
|
}
|
|
|
- //记录退出时间 并写入缓存
|
|
|
- userInfo.setState(1);
|
|
|
roomTotalUser.fastPut(userId, userInfo);
|
|
|
+ //从在线人员列表删除该人员
|
|
|
+ onlineUserInfo.fastRemove(userId);
|
|
|
log.info("opsRoom>>>> looker userInfo: {}", JSONObject.toJSONString(userInfo));
|
|
|
});
|
|
|
}
|
|
@@ -623,15 +645,17 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
*/
|
|
|
public void joinRoom(String roomUid, Integer userId) {
|
|
|
//查询房间信息
|
|
|
- ImLiveBroadcastRoomVo imLiveBroadcastRoomVo = queryRoomInfo(roomUid);
|
|
|
+ ImLiveBroadcastRoomVo imLiveBroadcastRoomVo = getImLiveBroadcastRoomVo(roomUid);
|
|
|
if (Objects.isNull(imLiveBroadcastRoomVo)) {
|
|
|
log.info("opsRoom>>>> joinRoom error roomUid: {}", roomUid);
|
|
|
return;
|
|
|
}
|
|
|
//记录用户当前房间uid
|
|
|
redissonClient.getBucket(LIVE_USER_ROOM.replace(USER_ID, userId.toString())).set(roomUid);
|
|
|
+ //在线人员列表
|
|
|
+ RMap<Integer, BaseRoomUserVo> onlineUserInfo = getOnlineUserCache(roomUid);
|
|
|
//房间累计用户信息-指只要进入到该房间的用户都要记录
|
|
|
- RMap<Integer, RoomUserInfoVo> roomTotalUser = redissonClient.getMap(LIVE_ROOM_TOTAL_USER_LIST.replace(ROOM_UID, roomUid));
|
|
|
+ RMap<Integer, RoomUserInfoVo> roomTotalUser = getTotalUserCache(roomUid);
|
|
|
//判断是否第一次进房间
|
|
|
RoomUserInfoVo userInfo;
|
|
|
Date now = new Date();
|
|
@@ -644,8 +668,6 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
userInfo.setFirstJoinTime(now);
|
|
|
userInfo.setTotalViewTime(0);
|
|
|
}
|
|
|
- //0 进入房间
|
|
|
- userInfo.setState(0);
|
|
|
//查询主讲人信息
|
|
|
RBucket<RoomSpeakerInfo> speakerCache = redissonClient.getBucket(LIVE_SPEAKER_INFO.replace(USER_ID, imLiveBroadcastRoomVo.getSpeakerId().toString()));
|
|
|
if (speakerCache.isExists()) {
|
|
@@ -656,6 +678,8 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
}
|
|
|
}
|
|
|
roomTotalUser.fastPut(userId, userInfo);
|
|
|
+ //进入房间写如在线人员列表
|
|
|
+ onlineUserInfo.fastPut(userId, userInfo);
|
|
|
log.info("joinRoom>>>> userInfo: {}", JSONObject.toJSONString(userInfo));
|
|
|
}
|
|
|
|
|
@@ -672,11 +696,11 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
}
|
|
|
RoomSpeakerInfo roomSpeakerInfo = speakerCache.get();
|
|
|
//已是直播状态则直接返回
|
|
|
- if (Objects.nonNull(roomSpeakerInfo.getState()) && roomSpeakerInfo.getState() == 0) {
|
|
|
+ if (intEquals(roomSpeakerInfo.getState(), 0)) {
|
|
|
return;
|
|
|
}
|
|
|
//是否允许录像
|
|
|
- if (Objects.nonNull(roomSpeakerInfo.getWhetherVideo()) && roomSpeakerInfo.getWhetherVideo() == 0) {
|
|
|
+ if (intEquals(roomSpeakerInfo.getWhetherVideo(), 0)) {
|
|
|
//开始录制视频
|
|
|
try {
|
|
|
imFeignService.startRecord(roomUid);
|
|
@@ -690,26 +714,15 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
roomSpeakerInfo.setStartLiveTime(now);
|
|
|
speakerCache.set(roomSpeakerInfo);
|
|
|
log.info("startLive>>>> roomSpeakerInfo: {}", JSONObject.toJSONString(roomSpeakerInfo));
|
|
|
-
|
|
|
//主播开启直播,查询所有在直播间的用户并写入观看时间
|
|
|
- RMap<Integer, RoomUserInfoVo> roomTotalUser = redissonClient.getMap(LIVE_ROOM_TOTAL_USER_LIST.replace(ROOM_UID, roomSpeakerInfo.getRoomUid()));
|
|
|
- if (!roomTotalUser.isExists()) {
|
|
|
- return;
|
|
|
- }
|
|
|
- roomTotalUser.forEach((id, userInfo) -> {
|
|
|
- //对在房间的用户
|
|
|
- if (Objects.nonNull(userInfo.getState()) && userInfo.getState() == 0) {
|
|
|
- userInfo.setDynamicLookTime(now);
|
|
|
- roomTotalUser.fastPut(id, userInfo);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
+ CompletableFuture.runAsync(() -> this.asyncOpsLiveLookTime(roomUid, 1));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 关闭直播-录像
|
|
|
*
|
|
|
* @param roomUid 房间uid
|
|
|
+ * @param userId 老师id
|
|
|
*/
|
|
|
public void closeLive(String roomUid, Integer userId) {
|
|
|
//查询房间主播信息
|
|
@@ -723,12 +736,16 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
speakerCache.set(roomSpeakerInfo);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+ /**
|
|
|
+ * 关闭直播-录像
|
|
|
+ *
|
|
|
+ * @param roomSpeakerInfo 房间主播信息
|
|
|
+ */
|
|
|
private void closeLive(RoomSpeakerInfo roomSpeakerInfo) {
|
|
|
//直播状态 true 直播中 false关闭直播
|
|
|
- boolean stateFlag = Objects.nonNull(roomSpeakerInfo.getState()) && roomSpeakerInfo.getState() == 0;
|
|
|
+ boolean stateFlag = intEquals(roomSpeakerInfo.getState(), 0);
|
|
|
//是否录像 true允许 false不允许
|
|
|
- boolean whetherVideoFlag = Objects.nonNull(roomSpeakerInfo.getWhetherVideo()) && roomSpeakerInfo.getWhetherVideo() == 0;
|
|
|
+ boolean whetherVideoFlag = intEquals(roomSpeakerInfo.getWhetherVideo(), 0);
|
|
|
//允许录像并在直播中
|
|
|
if (whetherVideoFlag && stateFlag) {
|
|
|
try {
|
|
@@ -743,25 +760,55 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
Date now = new Date();
|
|
|
roomSpeakerInfo.setEndLiveTime(now);
|
|
|
roomSpeakerInfo.setState(1);
|
|
|
- //写入本次直播时长
|
|
|
- roomSpeakerInfo.setTotalLiveTime(getLookMinutes(roomSpeakerInfo.getStartLiveTime(), roomSpeakerInfo.getTotalLiveTime()));
|
|
|
+ //如果开播时间和本次操作结束播放时间小于1分钟则不计算观看时间
|
|
|
+ int liveMinutes = getLookMinutes(roomSpeakerInfo.getStartLiveTime(), null);
|
|
|
+ if (liveMinutes > 1) {
|
|
|
+ //写入本次直播时长
|
|
|
+ roomSpeakerInfo.setTotalLiveTime(getLookMinutes(roomSpeakerInfo.getStartLiveTime(), roomSpeakerInfo.getTotalLiveTime()));
|
|
|
+ //关闭直播后异步执行计算房间人员观看时长
|
|
|
+ CompletableFuture.runAsync(() -> this.asyncOpsLiveLookTime(roomSpeakerInfo.getRoomUid(), 2));
|
|
|
+ }
|
|
|
//计算完后将开始直播时间设置为空,待下次开启后再计算
|
|
|
roomSpeakerInfo.setStartLiveTime(null);
|
|
|
- //主播关闭直播,查询所有在直播间的用户并计算观看时长
|
|
|
- RMap<Integer, RoomUserInfoVo> roomTotalUser = redissonClient.getMap(LIVE_ROOM_TOTAL_USER_LIST.replace(ROOM_UID, roomSpeakerInfo.getRoomUid()));
|
|
|
+ }
|
|
|
+ log.info("closeLive>>>> roomSpeakerInfo: {}", JSONObject.toJSONString(roomSpeakerInfo));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 打开/关闭直播后-异步计算房间人员观看时长
|
|
|
+ *
|
|
|
+ * @param roomUid 房间uid
|
|
|
+ * @param type type 1:开始直播-开始录像 2:关闭直播关闭录像
|
|
|
+ */
|
|
|
+ private void asyncOpsLiveLookTime(String roomUid, Integer type) {
|
|
|
+ //加锁-避免快速点击开启直播和关闭直后异步执行后直播数据错误
|
|
|
+ boolean b = this.runIfLockCanGet(LIVE_LOOK_LOCK.replace(ROOM_UID, roomUid), () -> {
|
|
|
+ //查询所有在直播间的用户并计算观看时长
|
|
|
+ RMap<Integer, RoomUserInfoVo> roomTotalUser = getTotalUserCache(roomUid);
|
|
|
if (!roomTotalUser.isExists()) {
|
|
|
return;
|
|
|
}
|
|
|
+ //查询在线人员列表
|
|
|
+ RMap<Integer, BaseRoomUserVo> onlineUserInfo = getOnlineUserCache(roomUid);
|
|
|
roomTotalUser.forEach((id, userInfo) -> {
|
|
|
- //对在房间的用户计算观看时长
|
|
|
- if (Objects.nonNull(userInfo.getState()) && userInfo.getState() == 0) {
|
|
|
- userInfo.setTotalViewTime(getLookMinutes(userInfo.getDynamicLookTime(), userInfo.getTotalViewTime()));
|
|
|
- userInfo.setDynamicLookTime(null);
|
|
|
+ //获取当前用户是否在房间状态 true在 false不在
|
|
|
+ if (onlineUserInfo.isExists() && onlineUserInfo.containsKey(id)) {
|
|
|
+ if (type.equals(1)) {
|
|
|
+ //开启直播后对当前在房间的用户写入观看时间
|
|
|
+ userInfo.setDynamicLookTime(new Date());
|
|
|
+ } else if (type.equals(2)) {
|
|
|
+ userInfo.setTotalViewTime(getLookMinutes(userInfo.getDynamicLookTime(), userInfo.getTotalViewTime()));
|
|
|
+ userInfo.setDynamicLookTime(null);
|
|
|
+ } else {
|
|
|
+ return;
|
|
|
+ }
|
|
|
roomTotalUser.fastPut(id, userInfo);
|
|
|
}
|
|
|
});
|
|
|
+ }, 2, 1, TimeUnit.MINUTES);
|
|
|
+ if (!b) {
|
|
|
+ this.asyncOpsLiveLookTime(roomUid, type);
|
|
|
}
|
|
|
- log.info("closeLive>>>> roomSpeakerInfo: {}", JSONObject.toJSONString(roomSpeakerInfo));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -849,18 +896,24 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
}
|
|
|
roomVo.setLikeNum((int) like);
|
|
|
//累计总用户数量
|
|
|
- List<RoomUserInfoVo> roomUserInfoVos = queryTotalRoomUserInfo(roomVo.getRoomUid());
|
|
|
- if (CollectionUtils.isNotEmpty(roomUserInfoVos)) {
|
|
|
- roomVo.setTotalLookNum(roomUserInfoVos.size());
|
|
|
- //在房间观看用户数量
|
|
|
- roomVo.setLookNum(queryRoomUserInfo(roomUserInfoVos).size());
|
|
|
- } else {
|
|
|
- roomVo.setTotalLookNum(0);
|
|
|
- roomVo.setLookNum(0);
|
|
|
- }
|
|
|
-
|
|
|
+ roomVo.setTotalLookNum(getNum.apply(this::getTotalUserCache, roomVo.getRoomUid()));
|
|
|
+ //在房间观看用户数量
|
|
|
+ roomVo.setLookNum(getNum.apply(this::getOnlineUserCache, roomVo.getRoomUid()));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 获取房间缓存中的用户数量/观看人数
|
|
|
+ *
|
|
|
+ * <p> func:查询用户数量/观看人数的方法
|
|
|
+ * <p> roomUid :房间uid
|
|
|
+ * <p> return :用户数量/观看人数
|
|
|
+ */
|
|
|
+ private final BiFunction<Function<String, RMap<Integer, ?>>, String, Integer> getNum = (func, roomUid) -> Optional.of(roomUid)
|
|
|
+ .map(func)
|
|
|
+ .filter(RMap::isExists)
|
|
|
+ .map(RMap::size)
|
|
|
+ .orElse(0);
|
|
|
+
|
|
|
private SysUser getSysUser(Integer userId) {
|
|
|
return Optional.ofNullable(userId)
|
|
|
.map(sysUserFeignService::queryUserById)
|
|
@@ -888,13 +941,11 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
|
|
|
int totalLook = 0;
|
|
|
int look = 0;
|
|
|
- List<RoomUserInfoVo> inRoomUserInfo;
|
|
|
-
|
|
|
+ //正在房间观看的用户数据
|
|
|
+ List<BaseRoomUserVo> inRoomUserInfo = queryRoomOnlineUserInfo(roomUid);
|
|
|
//累计总观看的用户数量
|
|
|
List<RoomUserInfoVo> totalUserInfo = queryTotalRoomUserInfo(roomUid);
|
|
|
if (CollectionUtils.isNotEmpty(totalUserInfo)) {
|
|
|
- //正在房间观看的用户数据
|
|
|
- inRoomUserInfo = queryRoomUserInfo(totalUserInfo);
|
|
|
if (CollectionUtils.isNotEmpty(inRoomUserInfo)) {
|
|
|
look = inRoomUserInfo.size();
|
|
|
result.put("正在观看的人员信息", inRoomUserInfo);
|
|
@@ -940,29 +991,52 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 查询在观看直播的用户信息
|
|
|
+ * 查询直播间在线的用户
|
|
|
*
|
|
|
* @param roomUid 直播间uid
|
|
|
*/
|
|
|
@Override
|
|
|
- public List<RoomUserInfoVo> queryRoomUserInfo(String roomUid) {
|
|
|
- List<RoomUserInfoVo> roomUserInfoVos = queryTotalRoomUserInfo(roomUid);
|
|
|
- return queryRoomUserInfo(roomUserInfoVos);
|
|
|
+ public List<BaseRoomUserVo> queryRoomLimitOnlineUserInfo(String roomUid) {
|
|
|
+ RMap<Integer, BaseRoomUserVo> onlineUserCache = getOnlineUserCache(roomUid);
|
|
|
+ return onlineUserCache.values().stream()
|
|
|
+ .filter(Objects::nonNull)
|
|
|
+ .limit(200)
|
|
|
+ .collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
- public List<RoomUserInfoVo> queryRoomUserInfo(List<RoomUserInfoVo> totalUserInfo) {
|
|
|
- return totalUserInfo.stream()
|
|
|
- .filter(o -> Objects.nonNull(o.getState()) && o.getState() == 0)
|
|
|
+ /**
|
|
|
+ * 查询直播间在线的用户
|
|
|
+ *
|
|
|
+ * @param roomUid 直播间uid
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public List<BaseRoomUserVo> queryRoomOnlineUserInfo(String roomUid) {
|
|
|
+ RMap<Integer, BaseRoomUserVo> onlineUserInfo = getOnlineUserCache(roomUid);
|
|
|
+ return onlineUserInfo.values().stream()
|
|
|
+ .filter(Objects::nonNull)
|
|
|
.collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
- public List<RoomUserInfoVo> queryTotalRoomUserInfo(String roomUid) {
|
|
|
- RMap<Integer, RoomUserInfoVo> roomTotalUser = redissonClient.getMap(LIVE_ROOM_TOTAL_USER_LIST.replace(ROOM_UID, roomUid));
|
|
|
+ /**
|
|
|
+ * 查询直播间所有用户信息
|
|
|
+ *
|
|
|
+ * @param roomUid 直播间uid
|
|
|
+ */
|
|
|
+ private List<RoomUserInfoVo> queryTotalRoomUserInfo(String roomUid) {
|
|
|
+ RMap<Integer, RoomUserInfoVo> roomTotalUser = getTotalUserCache(roomUid);
|
|
|
return roomTotalUser.values().stream()
|
|
|
.filter(Objects::nonNull)
|
|
|
.collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
+ private RMap<Integer, BaseRoomUserVo> getOnlineUserCache(String roomUid) {
|
|
|
+ return redissonClient.getMap(LIVE_ROOM_ONLINE_USER_LIST.replace(ROOM_UID, roomUid));
|
|
|
+ }
|
|
|
+
|
|
|
+ private RMap<Integer, RoomUserInfoVo> getTotalUserCache(String roomUid) {
|
|
|
+ return redissonClient.getMap(LIVE_ROOM_TOTAL_USER_LIST.replace(ROOM_UID, roomUid));
|
|
|
+ }
|
|
|
+
|
|
|
private RoomUserInfoVo getUserInfo(Integer userId) {
|
|
|
RoomUserInfoVo userInfo = new RoomUserInfoVo();
|
|
|
userInfo.setUserId(userId);
|
|
@@ -1016,6 +1090,59 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * 判断Integer是否相等-null值不相等
|
|
|
+ *
|
|
|
+ * @param key1 第一个Integer
|
|
|
+ * @param key2 第二个Integer
|
|
|
+ * @return 相等 true 不相等 false
|
|
|
+ */
|
|
|
+ private boolean intEquals(Integer key1, Integer key2) {
|
|
|
+ return Objects.nonNull(key1) && Objects.nonNull(key2) && Objects.equals(key1, key2);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 分布式锁
|
|
|
+ *
|
|
|
+ * @param lockName lockKey
|
|
|
+ * @param runnable 任务
|
|
|
+ * @param waitTime 等待抢锁的时间,若该key已被占用则等待抢锁。
|
|
|
+ * @param timeout 超时时间
|
|
|
+ * @param unit 时间单位
|
|
|
+ * @return true 锁成功 false 锁失败
|
|
|
+ */
|
|
|
+ public boolean runIfLockCanGet(final String lockName, Runnable runnable, final long waitTime, final long timeout, TimeUnit unit) {
|
|
|
+ RLock lock = redissonClient.getLock(lockName);
|
|
|
+ if (Objects.isNull(lock)) {
|
|
|
+ log.info("runIfLockCanGet lock is null lockName : {}", lockName);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ if (lock.tryLock(waitTime, timeout, unit)) {
|
|
|
+ if (Objects.nonNull(runnable)) {
|
|
|
+ runnable.run();
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("runIfLockCanGet error lockName : {}", lockName, e);
|
|
|
+ throw new RuntimeException("runIfLockCanGet error lockName :" + lockName, e);
|
|
|
+ } finally {
|
|
|
+ this.unlock(lock);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 解锁
|
|
|
+ */
|
|
|
+ public void unlock(RLock lock) {
|
|
|
+ if (lock.getHoldCount() != 0) {
|
|
|
+ lock.unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* 主讲人信息
|
|
|
*/
|
|
|
public static class RoomSpeakerInfo implements Serializable {
|
|
@@ -1064,10 +1191,16 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
|
|
|
this.speakerName = speakerName;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 直播状态 0 直播中 1关闭直播
|
|
|
+ */
|
|
|
public Integer getState() {
|
|
|
return state;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 直播状态 0 直播中 1关闭直播
|
|
|
+ */
|
|
|
public void setState(Integer state) {
|
|
|
this.state = state;
|
|
|
}
|