浏览代码

Merge branch 'dev_20230222_live' into master_saas

Eric 2 年之前
父节点
当前提交
7b28bbb1cc

+ 140 - 0
mec-biz/src/main/java/com/ym/mec/biz/dal/dto/TencentData.java

@@ -3,11 +3,16 @@ package com.ym.mec.biz.dal.dto;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
 import com.ym.mec.biz.dal.enums.ETencentGroupType;
 import com.ym.mec.biz.dal.enums.ETencentImCallbackCommand;
+import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.io.Serializable;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
@@ -184,4 +189,139 @@ public class TencentData {
         }
     }
 
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @ApiModel("推断流事件回调通知")
+    public static class CallbackStreamStateEvent implements Serializable {
+
+        @JSONField(name = "app")
+        private String app;
+        @JSONField(name = "appid")
+        private Integer appid;
+        @JSONField(name = "appname")
+        private String appname;
+        @JSONField(name = "channel_id")
+        private String channelId;
+        @JSONField(name = "errcode")
+        private Integer errcode;
+        @JSONField(name = "errmsg")
+        private String errmsg;
+        @JSONField(name = "event_time")
+        private Integer eventTime;
+        @JSONField(name = "event_type")
+        private Integer eventType;
+        @JSONField(name = "set_id")
+        private Integer setId;
+        @JSONField(name = "node")
+        private String node;
+        @JSONField(name = "sequence")
+        private String sequence;
+        @JSONField(name = "stream_id")
+        private String streamId;
+        @JSONField(name = "stream_param")
+        private String streamParam;
+        @JSONField(name = "user_ip")
+        private String userIp;
+        @JSONField(name = "width")
+        private Integer width;
+        @JSONField(name = "height")
+        private Integer height;
+        @JSONField(name = "sign")
+        private String sign;
+        @JSONField(name = "t")
+        private Integer t;
+    }
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @ApiModel("流录制事件回调通知")
+    public static class CallbackSteamRecordEvent implements Serializable {
+
+        @JSONField(name = "event_type")
+        private Integer eventType;
+        @JSONField(name = "appid")
+        private Integer appid;
+        @JSONField(name = "app")
+        private String app;
+        @JSONField(name = "callback_ext")
+        private String callbackExt;
+        @JSONField(name = "appname")
+        private String appname;
+        @JSONField(name = "stream_id")
+        private String streamId;
+        @JSONField(name = "channel_id")
+        private String channelId;
+        @JSONField(name = "file_id")
+        private String fileId;
+        @JSONField(name = "record_file_id")
+        private String recordFileId;
+        @JSONField(name = "file_format")
+        private String fileFormat;
+        @JSONField(name = "task_id")
+        private String taskId;
+        @JSONField(name = "start_time")
+        private Integer startTime;
+        @JSONField(name = "end_time")
+        private Integer endTime;
+        @JSONField(name = "start_time_usec")
+        private Integer startTimeUsec;
+        @JSONField(name = "end_time_usec")
+        private Integer endTimeUsec;
+        @JSONField(name = "duration")
+        private Integer duration;
+        @JSONField(name = "file_size")
+        private Integer fileSize;
+        @JSONField(name = "stream_param")
+        private String streamParam;
+        @JSONField(name = "video_url")
+        private String videoUrl;
+        @JSONField(name = "media_start_time")
+        private Integer mediaStartTime;
+        @JSONField(name = "record_bps")
+        private Integer recordBps;
+        @JSONField(name = "sign")
+        private String sign;
+        @JSONField(name = "t")
+        private Integer t;
+
+    }
+
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @ApiModel("流异常事件回调通知")
+    public static class CallbackStreamExceptionEvent implements Serializable {
+
+
+        @JSONField(name = "appid")
+        private Integer appid;
+        @JSONField(name = "data_time")
+        private Long dataTime;
+        @JSONField(name = "domain")
+        private String domain;
+        @JSONField(name = "event_type")
+        private Integer eventType;
+        @JSONField(name = "interface")
+        private String interfaceX;
+        @JSONField(name = "path")
+        private String path;
+        @JSONField(name = "report_interval")
+        private Integer reportInterval;
+        @JSONField(name = "sequence")
+        private String sequence;
+        @JSONField(name = "stream_id")
+        private String streamId;
+        @JSONField(name = "stream_param")
+        private String streamParam;
+        @JSONField(name = "timeout")
+        private Integer timeout;
+        @JSONField(name = "abnormal_event")
+        private String abnormalEvent;
+    }
+
+
 }

+ 47 - 32
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImLiveBroadcastRoomServiceImpl.java

@@ -10,6 +10,8 @@ import com.fasterxml.jackson.annotation.JsonFormat;
 import com.google.common.collect.Lists;
 import com.microsvc.toolkit.middleware.live.LivePluginContext;
 import com.microsvc.toolkit.middleware.live.LivePluginService;
+import com.microsvc.toolkit.middleware.live.impl.RongCloudLivePlugin;
+import com.microsvc.toolkit.middleware.live.impl.TencentCloudLivePlugin;
 import com.microsvc.toolkit.middleware.live.message.LiveRoomMessage;
 import com.microsvc.toolkit.middleware.live.message.LiveRoomUser;
 import com.microsvc.toolkit.middleware.live.message.RTCRequest;
@@ -66,7 +68,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.text.MessageFormat;
-import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -835,14 +836,26 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
             //从在线人员列表删除该人员
             onlineUserInfo.fastRemove(userId);
 
+            log.info("opsRoom>>>> looker userInfo: {}", JSONObject.toJSONString(userInfo));
+            // 在线用户消息同步
+            sendOnlineUserCount(roomVo, userId, onlineUserInfo.size());
+
+            // 直播间统计数据
+            sendLiveRoomStatMessage(userid, roomVo);
+
             // 更新用户离线状态
             ImLiveBroadcastRoomMember roomMember = new ImLiveBroadcastRoomMember();
             roomMember.setOnlineStatus(0);
 
             // 用户离开直播间
             if (user.getStatus().equals("3")) {
+                // 直播间用户离开状态
                 roomMember.setLiveRoomStatus(0);
+
+                // 直播间用户离开直播间消息
+                sendLiveRoomLoginOutMessage(userid, roomVo);
             }
+
             // 更新用户在线状态为离线
             liveBroadcastRoomMemberService.lambdaUpdate()
                     .eq(ImLiveBroadcastRoomMember::getTenantId, roomVo.getTenantId())
@@ -850,16 +863,6 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
                     .eq(ImLiveBroadcastRoomMember::getUserId, userId)
                     .update(roomMember);
 
-            log.info("opsRoom>>>> looker userInfo: {}", JSONObject.toJSONString(userInfo));
-            // 在线用户消息同步
-            sendOnlineUserCount(roomVo, userId, onlineUserInfo.size());
-
-            // 直播间统计数据
-            sendLiveRoomStatMessage(userid, roomVo);
-
-            // 直播间用户离开消息
-            sendLiveRoomLoginOutMessage(userid, roomVo);
-
         });
     }
 
@@ -947,9 +950,12 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
                 .content(messageContent)
                 .build();
         try {
+            // TODO: 取消用户离开事件消息发送
             //用户离开直播间发送退出房间消息给主讲人
+            /*
             LivePluginService pluginService = livePluginContext.getPluginService(roomVo.getServiceProvider());
             pluginService.sendChatRoomMessage(message);
+            */
             log.info("sendLiveRoomLoginOutMessage>>>> looker LOOKER_LOGIN_OUT : roomId={}, userId={}", roomVo.getRoomUid(), userid);
         } catch (Exception e) {
             log.error("sendLiveRoomLoginOutMessage>>>>  looker error LOOKER_LOGIN_OUT {}", e.getMessage());
@@ -1025,6 +1031,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
 
         RBucket<RoomSpeakerInfo> speakerCache = getRoomSpeakerInfoCache(roomUid, userid);
         if (!speakerCache.isExists()) {
+            log.info("isSpeaker>>>> 主播用户缓存不存在, roomId: {}, userId={}", roomUid, userid);
             return false;
         }
 
@@ -1036,7 +1043,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         if (user.getStatus().equals("0")) {
 
             speakerInfo.setJoinRoomTime(now);
-            log.info("opsRoom>>>> join speakerCache {}", JSONObject.toJSONString(speakerInfo));
+            log.info("isSpeaker>>>> join speakerCache {}", JSONObject.toJSONString(speakerInfo));
             speakerCache.set(speakerInfo);
             //将本次进入房间的clientIp添加到主讲人最后一次clientIp缓存中
             if (StringUtils.isNotBlank(user.getClientIp())) {
@@ -1052,17 +1059,31 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
 
             return true;
         }
-        //校验本次退出直播间的clientIp 是不是上次进入房间的clientIp
-        if (StringUtils.isNotBlank(user.getClientIp())) {
-            if (lastClientIp.isExists()) {
-                //如果是上次进入房间的clientIp和本次退出房间的clientIp不相同,则直接忽略
-                if (!user.getClientIp().equals(lastClientIp.get())) {
-                    return true;
+
+        //直播间信息
+        ImLiveBroadcastRoomVo roomVo = this.getImLiveBroadcastRoomVo(roomUid);
+        if (Objects.isNull(roomVo)) {
+            log.info("isSpeaker>>>> roomVo is null, roomId: {}, userId={}", roomUid, userid);
+            return true;
+        }
+
+        if (roomVo.getServiceProvider().equals(RongCloudLivePlugin.PLUGIN_NAME)) {
+
+            //校验本次退出直播间的clientIp 是不是上次进入房间的clientIp
+            if (StringUtils.isNotBlank(user.getClientIp())) {
+                if (lastClientIp.isExists()) {
+                    //如果是上次进入房间的clientIp和本次退出房间的clientIp不相同,则直接忽略
+                    if (!user.getClientIp().equals(lastClientIp.get())) {
+                        log.info("isSpeaker>>>> 上次进入房间的clientIp和本次退出房间的clientIp不相同, roomId: {}, userId={}", roomUid, userid);
+                        return true;
+                    }
                 }
             }
         }
+
         //如果退出时间大于进入时间就无需再次退出-直接返回
         if (compareDate.apply(speakerInfo.getExitRoomTime(), speakerInfo.getJoinRoomTime())) {
+            log.info("isSpeaker>>>> 退出时间大于进入时间, roomId: {}, userId={}", roomUid, userid);
             return true;
         }
 
@@ -1074,19 +1095,19 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         log.info("opsRoom>>>> exit speakerCache {}", JSONObject.toJSONString(speakerInfo));
         speakerCache.set(speakerInfo);
 
-        //向直播间发送当前在线人数消息
-        ImLiveBroadcastRoomVo roomVo = this.getImLiveBroadcastRoomVo(roomUid);
-        if (Objects.isNull(roomVo)) {
-            log.info("opsRoom>>>> roomVo is null, roomId: {}, userId={}", roomUid, userid);
-            return true;
-        }
-
         ImLiveBroadcastRoom room = new ImLiveBroadcastRoom();
         room.setSpeakerStatus(0);
 
-        // 更新推流状态
+        // 主播离开直播间事件
         if (user.getStatus().equals("3")) {
+            // 更新推流状态
             room.setPushStatus(0);
+
+            // 腾讯云直播方案,推送主播离开消息
+            if (roomVo.getServiceProvider().equals(TencentCloudLivePlugin.PLUGIN_NAME)) {
+                // 主播离开消息通知
+                sendLiveRoomLoginOutMessage(userid, roomVo);
+            }
         }
         // 更新主播状态
         lambdaUpdate()
@@ -1094,12 +1115,6 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
                 .eq(ImLiveBroadcastRoom::getSpeakerId, Integer.parseInt(userid))
                 .update(room);
 
-        // 腾讯云直播方案,推送主播离开消息
-        if (roomVo.getServiceProvider().equals("tencentCloud")) {
-            // 主播离开消息通知
-            sendLiveRoomLoginOutMessage(userid, roomVo);
-        }
-
         return true;
     }