瀏覽代碼

Merge branch 'dev_20230222_live' into master_saas

liujunchi 2 年之前
父節點
當前提交
176745aaa2

+ 31 - 16
mec-biz/src/main/java/com/mec/redisson/RedissonTopicListener.java

@@ -5,6 +5,7 @@ import com.ym.mec.biz.redisson.RedissonMessageService;
 import com.ym.mec.biz.service.ImLiveBroadcastRoomService;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RBucket;
+import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
@@ -43,24 +44,38 @@ public class RedissonTopicListener implements ApplicationRunner, Ordered {
                     log.error("RedissonMessageService subscribe sleep error", e);
                 }
 
-                // 缓存JoinRoom用户信息到redis
-                RBucket<Object> bucket = redissonClient.getBucket(RedissonMessageService.LIVE_ROOM_MEMBER + message);
-                if (!bucket.isExists()) {
-                   return;
-                }
-                bucket.delete();
-
-                ImLiveBroadcastRoomVo imLiveBroadcastRoomVo = imLiveBroadcastRoomService.queryRoomInfo(message);
-                if (Objects.isNull(imLiveBroadcastRoomVo)) {
-                    return;
-                }
+                RLock lock = redissonClient.getLock(RedissonMessageService.LIVE_ROOM_MEMBER_LOCK + message);
                 try {
-                    imLiveBroadcastRoomService.setGroupMemberDefinedData(imLiveBroadcastRoomVo,imLiveBroadcastRoomVo.getLookNum(),imLiveBroadcastRoomVo.getTotalLookNum());
-                } catch (Exception e) {
-                    log.error("RedissonMessageService subscribe setGroupMemberDefinedData error", e);
-                    bucket.set(message, 30, TimeUnit.MINUTES);
+                    if (lock.tryLock()) {
+                        // 缓存JoinRoom用户信息到redis
+                        RBucket<Object> bucket = redissonClient.getBucket(RedissonMessageService.LIVE_ROOM_MEMBER + message);
+                        if (!bucket.isExists()) {
+                            return;
+                        }
+                        Integer times = (Integer) bucket.get();
+                        bucket.delete();
+
+                        ImLiveBroadcastRoomVo imLiveBroadcastRoomVo = imLiveBroadcastRoomService.queryRoomInfo(message);
+                        if (Objects.isNull(imLiveBroadcastRoomVo)) {
+                            return;
+                        }
+                        try {
+                            imLiveBroadcastRoomService.setGroupMemberDefinedData(imLiveBroadcastRoomVo,imLiveBroadcastRoomVo.getLookNum(),imLiveBroadcastRoomVo.getTotalLookNum());
+                        } catch (Exception e) {
+                            log.error("RedissonMessageService subscribe setGroupMemberDefinedData error", e);
+                            if (times>=3) {
+                                return;
+                            }
+                            bucket.set(times +1, 30, TimeUnit.MINUTES);
+                        }
+                        redissonMessageService.publish(RedissonMessageService.TOPIC_MESSAGE, message);
+                    }
+                } finally {
+                    if (lock.getHoldCount() != 0 && lock.isHeldByCurrentThread()) {
+                        lock.unlock();
+                    }
                 }
-                redissonMessageService.publish(RedissonMessageService.TOPIC_MESSAGE, message);
+
 
             });
             log.info("---> RedissonMessageService subscribe success");

+ 1 - 0
mec-biz/src/main/java/com/ym/mec/biz/redisson/RedissonMessageService.java

@@ -17,6 +17,7 @@ public class RedissonMessageService {
     public static final String TOPIC_MESSAGE = "topic:message";
     // 直播在线人数
     public static final String LIVE_ROOM_MEMBER = "delayQueue:liveRoomMember:";
+    public static final String LIVE_ROOM_MEMBER_LOCK = "LOCK:liveRoomMember:";
 
     private final RedissonClient redissonClient;
 

+ 4 - 13
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImLiveBroadcastRoomServiceImpl.java

@@ -736,17 +736,6 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
                     pluginService.rtcRoomRecordStop(taskId);
                 }
             }
-            // 将在房间人员退出房间
-            List<Integer> liveRoomMember = liveBroadcastRoomMemberService.getLiveRoomMember(roomUid);
-            List<ImUserState> imUserStates = liveRoomMember.stream().map(userId1 -> {
-                ImUserState imUserState = new ImUserState();
-                imUserState.setUserid(userId1.toString());
-                imUserState.setRoomUid(roomUid);
-                imUserState.setStatus("3");
-                return imUserState;
-            }).collect(Collectors.toList());
-
-            opsRoom(imUserStates);
 
             //            imFeignService.destroyLiveRoom(roomUid);
             log.info("roomDestroy>>>> destroyLiveRoom {}", JSONObject.toJSONString(message));
@@ -776,6 +765,8 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
                 member.setTenantId(v.getTenantId());
                 member.setRoomUid(roomUid);
                 member.setUserId(v.getUserId());
+                member.setOnlineStatus(0);
+                member.setLiveRoomStatus(0);
                 member.setJoinTime(v.getFirstJoinTime());
                 member.setTotalTime(getLookMinutes(v.getDynamicLookTime(), now, v.getTotalViewTime()));
                 memberList.add(member);
@@ -933,7 +924,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
                 // 缓存JoinRoom用户信息到redis
                 RBucket<Object> bucket = redissonClient.getBucket(RedissonMessageService.LIVE_ROOM_MEMBER + roomUid);
                 if (!bucket.isExists()) {
-                    bucket.set(roomUid, 30, TimeUnit.MINUTES);
+                    bucket.set(0, 30, TimeUnit.MINUTES);
                     // 发布删除缓存消息
                     redissonMessageService.publish(RedissonMessageService.TOPIC_MESSAGE, roomUid);
                 }
@@ -1131,7 +1122,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         // 缓存JoinRoom用户信息到redis
         RBucket<Object> bucket = redissonClient.getBucket(RedissonMessageService.LIVE_ROOM_MEMBER + roomUid);
         if (!bucket.isExists()) {
-            bucket.set(roomUid, 30, TimeUnit.MINUTES);
+            bucket.set(0, 30, TimeUnit.MINUTES);
             // 发布删除缓存消息
             redissonMessageService.publish(RedissonMessageService.TOPIC_MESSAGE, roomUid);
         }

+ 3 - 3
mec-biz/src/main/resources/config/mybatis/ImLiveBroadcastRoomMemberMapper.xml

@@ -17,11 +17,11 @@
 
     <insert id="insertBatch" keyColumn="id_" keyProperty="id" useGeneratedKeys="true"
             parameterType="com.ym.mec.biz.dal.entity.ImLiveBroadcastRoomMember">
-        insert into im_live_broadcast_room_member(tenant_id_, room_uid_, user_id_, join_time_, total_time_) values
+        insert into im_live_broadcast_room_member(tenant_id_, room_uid_, user_id_, join_time_, total_time_,online_status_,live_room_status_) values
         <foreach collection="entities" item="entity" separator=",">
-            (#{entity.tenantId}, #{entity.roomUid}, #{entity.userId}, #{entity.joinTime}, #{entity.totalTime})
+            (#{entity.tenantId}, #{entity.roomUid}, #{entity.userId}, #{entity.joinTime}, #{entity.totalTime},0,0)
         </foreach>
-        ON DUPLICATE KEY UPDATE join_time_ = VALUES(join_time_), total_time_ = VALUES(total_time_)
+        ON DUPLICATE KEY UPDATE join_time_ = VALUES(join_time_), total_time_ = VALUES(total_time_),online_status_ = 0,live_room_status_ = 0
     </insert>
 
     <select id="queryMemberPage" resultType="com.ym.mec.biz.dal.vo.ImLiveBroadcastRoomMemberVo">

+ 1 - 1
mec-student/src/main/java/com/ym/mec/student/StudentApplication.java

@@ -22,7 +22,7 @@ import com.ym.mec.common.filters.EmojiEncodingFilter;
 @EnableDiscoveryClient
 @EnableFeignClients("com.ym.mec")
 @MapperScan(basePackages = {"com.ym.mec.biz.**.dao", "com.yonge.datasource.dao"})
-@ComponentScan(basePackages = {"com.ym.mec", "com.yonge.log", "com.mec.redisson", "com.yonge.datasource"})
+@ComponentScan(basePackages = {"com.ym.mec", "com.yonge.log",  "com.yonge.datasource"})
 @Configuration
 @EnableSwagger2Doc
 @EnableAsync

+ 1 - 1
mec-teacher/src/main/java/com/ym/mec/teacher/TeacherApplication.java

@@ -22,7 +22,7 @@ import com.ym.mec.common.filters.EmojiEncodingFilter;
 @EnableDiscoveryClient
 @EnableFeignClients("com.ym.mec")
 @MapperScan(basePackages = {"com.ym.mec.biz.**.dao", "com.yonge.datasource.dao"})
-@ComponentScan(basePackages = { "com.ym.mec", "org.snaker.engine", "com.yonge.log", "com.mec.redisson", "com.yonge.datasource"})
+@ComponentScan(basePackages = { "com.ym.mec", "org.snaker.engine", "com.yonge.log",  "com.yonge.datasource"})
 @Configuration
 @EnableSwagger2Doc
 @EnableAsync