Explorar o código

Merge remote-tracking branch 'origin/master_saas' into master_saas

zouxuan %!s(int64=2) %!d(string=hai) anos
pai
achega
12b3c38210

+ 140 - 0
mec-biz/src/main/java/com/ym/mec/biz/dal/dto/TencentData.java

@@ -3,11 +3,16 @@ package com.ym.mec.biz.dal.dto;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
 import com.ym.mec.biz.dal.enums.ETencentGroupType;
 import com.ym.mec.biz.dal.enums.ETencentImCallbackCommand;
+import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.io.Serializable;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
@@ -184,4 +189,139 @@ public class TencentData {
         }
     }
 
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @ApiModel("推断流事件回调通知")
+    public static class CallbackStreamStateEvent implements Serializable {
+
+        @JSONField(name = "app")
+        private String app;
+        @JSONField(name = "appid")
+        private Integer appid;
+        @JSONField(name = "appname")
+        private String appname;
+        @JSONField(name = "channel_id")
+        private String channelId;
+        @JSONField(name = "errcode")
+        private Integer errcode;
+        @JSONField(name = "errmsg")
+        private String errmsg;
+        @JSONField(name = "event_time")
+        private Integer eventTime;
+        @JSONField(name = "event_type")
+        private Integer eventType;
+        @JSONField(name = "set_id")
+        private Integer setId;
+        @JSONField(name = "node")
+        private String node;
+        @JSONField(name = "sequence")
+        private String sequence;
+        @JSONField(name = "stream_id")
+        private String streamId;
+        @JSONField(name = "stream_param")
+        private String streamParam;
+        @JSONField(name = "user_ip")
+        private String userIp;
+        @JSONField(name = "width")
+        private Integer width;
+        @JSONField(name = "height")
+        private Integer height;
+        @JSONField(name = "sign")
+        private String sign;
+        @JSONField(name = "t")
+        private Integer t;
+    }
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @ApiModel("流录制事件回调通知")
+    public static class CallbackSteamRecordEvent implements Serializable {
+
+        @JSONField(name = "event_type")
+        private Integer eventType;
+        @JSONField(name = "appid")
+        private Integer appid;
+        @JSONField(name = "app")
+        private String app;
+        @JSONField(name = "callback_ext")
+        private String callbackExt;
+        @JSONField(name = "appname")
+        private String appname;
+        @JSONField(name = "stream_id")
+        private String streamId;
+        @JSONField(name = "channel_id")
+        private String channelId;
+        @JSONField(name = "file_id")
+        private String fileId;
+        @JSONField(name = "record_file_id")
+        private String recordFileId;
+        @JSONField(name = "file_format")
+        private String fileFormat;
+        @JSONField(name = "task_id")
+        private String taskId;
+        @JSONField(name = "start_time")
+        private Integer startTime;
+        @JSONField(name = "end_time")
+        private Integer endTime;
+        @JSONField(name = "start_time_usec")
+        private Integer startTimeUsec;
+        @JSONField(name = "end_time_usec")
+        private Integer endTimeUsec;
+        @JSONField(name = "duration")
+        private Integer duration;
+        @JSONField(name = "file_size")
+        private Integer fileSize;
+        @JSONField(name = "stream_param")
+        private String streamParam;
+        @JSONField(name = "video_url")
+        private String videoUrl;
+        @JSONField(name = "media_start_time")
+        private Integer mediaStartTime;
+        @JSONField(name = "record_bps")
+        private Integer recordBps;
+        @JSONField(name = "sign")
+        private String sign;
+        @JSONField(name = "t")
+        private Integer t;
+
+    }
+
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @ApiModel("流异常事件回调通知")
+    public static class CallbackStreamExceptionEvent implements Serializable {
+
+
+        @JSONField(name = "appid")
+        private Integer appid;
+        @JSONField(name = "data_time")
+        private Long dataTime;
+        @JSONField(name = "domain")
+        private String domain;
+        @JSONField(name = "event_type")
+        private Integer eventType;
+        @JSONField(name = "interface")
+        private String interfaceX;
+        @JSONField(name = "path")
+        private String path;
+        @JSONField(name = "report_interval")
+        private Integer reportInterval;
+        @JSONField(name = "sequence")
+        private String sequence;
+        @JSONField(name = "stream_id")
+        private String streamId;
+        @JSONField(name = "stream_param")
+        private String streamParam;
+        @JSONField(name = "timeout")
+        private Integer timeout;
+        @JSONField(name = "abnormal_event")
+        private String abnormalEvent;
+    }
+
+
 }

+ 60 - 33
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImLiveBroadcastRoomServiceImpl.java

@@ -10,6 +10,8 @@ import com.fasterxml.jackson.annotation.JsonFormat;
 import com.google.common.collect.Lists;
 import com.microsvc.toolkit.middleware.live.LivePluginContext;
 import com.microsvc.toolkit.middleware.live.LivePluginService;
+import com.microsvc.toolkit.middleware.live.impl.RongCloudLivePlugin;
+import com.microsvc.toolkit.middleware.live.impl.TencentCloudLivePlugin;
 import com.microsvc.toolkit.middleware.live.message.LiveRoomMessage;
 import com.microsvc.toolkit.middleware.live.message.LiveRoomUser;
 import com.microsvc.toolkit.middleware.live.message.RTCRequest;
@@ -66,7 +68,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.text.MessageFormat;
-import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -336,6 +337,15 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         this.save(obj);
         //推送老师端-直播已经创建
         sendRoomLiveState(sysUser, obj, MessageTypeEnum.JIGUANG_LIVE_CREATED);
+
+
+        CompletableFuture.runAsync(() -> {
+
+            Date endTime = DateUtil.addMinutes(now, PRE_LIVE_TIME_MINUTE);
+            if (endTime.getTime() >= dto.getLiveStartTime().getTime()) {
+                this.createLiveRoom(obj);
+            }
+        });
     }
 
     /**
@@ -835,14 +845,26 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
             //从在线人员列表删除该人员
             onlineUserInfo.fastRemove(userId);
 
+            log.info("opsRoom>>>> looker userInfo: {}", JSONObject.toJSONString(userInfo));
+            // 在线用户消息同步
+            sendOnlineUserCount(roomVo, userId, onlineUserInfo.size());
+
+            // 直播间统计数据
+            sendLiveRoomStatMessage(userid, roomVo);
+
             // 更新用户离线状态
             ImLiveBroadcastRoomMember roomMember = new ImLiveBroadcastRoomMember();
             roomMember.setOnlineStatus(0);
 
             // 用户离开直播间
             if (user.getStatus().equals("3")) {
+                // 直播间用户离开状态
                 roomMember.setLiveRoomStatus(0);
+
+                // 直播间用户离开直播间消息
+                sendLiveRoomLoginOutMessage(userid, roomVo);
             }
+
             // 更新用户在线状态为离线
             liveBroadcastRoomMemberService.lambdaUpdate()
                     .eq(ImLiveBroadcastRoomMember::getTenantId, roomVo.getTenantId())
@@ -850,16 +872,6 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
                     .eq(ImLiveBroadcastRoomMember::getUserId, userId)
                     .update(roomMember);
 
-            log.info("opsRoom>>>> looker userInfo: {}", JSONObject.toJSONString(userInfo));
-            // 在线用户消息同步
-            sendOnlineUserCount(roomVo, userId, onlineUserInfo.size());
-
-            // 直播间统计数据
-            sendLiveRoomStatMessage(userid, roomVo);
-
-            // 直播间用户离开消息
-            sendLiveRoomLoginOutMessage(userid, roomVo);
-
         });
     }
 
@@ -947,9 +959,12 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
                 .content(messageContent)
                 .build();
         try {
+            // TODO: 取消用户离开事件消息发送
             //用户离开直播间发送退出房间消息给主讲人
+            /*
             LivePluginService pluginService = livePluginContext.getPluginService(roomVo.getServiceProvider());
             pluginService.sendChatRoomMessage(message);
+            */
             log.info("sendLiveRoomLoginOutMessage>>>> looker LOOKER_LOGIN_OUT : roomId={}, userId={}", roomVo.getRoomUid(), userid);
         } catch (Exception e) {
             log.error("sendLiveRoomLoginOutMessage>>>>  looker error LOOKER_LOGIN_OUT {}", e.getMessage());
@@ -1025,6 +1040,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
 
         RBucket<RoomSpeakerInfo> speakerCache = getRoomSpeakerInfoCache(roomUid, userid);
         if (!speakerCache.isExists()) {
+            log.info("isSpeaker>>>> 主播用户缓存不存在, roomId: {}, userId={}", roomUid, userid);
             return false;
         }
 
@@ -1036,7 +1052,7 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         if (user.getStatus().equals("0")) {
 
             speakerInfo.setJoinRoomTime(now);
-            log.info("opsRoom>>>> join speakerCache {}", JSONObject.toJSONString(speakerInfo));
+            log.info("isSpeaker>>>> join speakerCache {}", JSONObject.toJSONString(speakerInfo));
             speakerCache.set(speakerInfo);
             //将本次进入房间的clientIp添加到主讲人最后一次clientIp缓存中
             if (StringUtils.isNotBlank(user.getClientIp())) {
@@ -1052,17 +1068,31 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
 
             return true;
         }
-        //校验本次退出直播间的clientIp 是不是上次进入房间的clientIp
-        if (StringUtils.isNotBlank(user.getClientIp())) {
-            if (lastClientIp.isExists()) {
-                //如果是上次进入房间的clientIp和本次退出房间的clientIp不相同,则直接忽略
-                if (!user.getClientIp().equals(lastClientIp.get())) {
-                    return true;
+
+        //直播间信息
+        ImLiveBroadcastRoomVo roomVo = this.getImLiveBroadcastRoomVo(roomUid);
+        if (Objects.isNull(roomVo)) {
+            log.info("isSpeaker>>>> roomVo is null, roomId: {}, userId={}", roomUid, userid);
+            return true;
+        }
+
+        if (roomVo.getServiceProvider().equals(RongCloudLivePlugin.PLUGIN_NAME)) {
+
+            //校验本次退出直播间的clientIp 是不是上次进入房间的clientIp
+            if (StringUtils.isNotBlank(user.getClientIp())) {
+                if (lastClientIp.isExists()) {
+                    //如果是上次进入房间的clientIp和本次退出房间的clientIp不相同,则直接忽略
+                    if (!user.getClientIp().equals(lastClientIp.get())) {
+                        log.info("isSpeaker>>>> 上次进入房间的clientIp和本次退出房间的clientIp不相同, roomId: {}, userId={}", roomUid, userid);
+                        return true;
+                    }
                 }
             }
         }
+
         //如果退出时间大于进入时间就无需再次退出-直接返回
         if (compareDate.apply(speakerInfo.getExitRoomTime(), speakerInfo.getJoinRoomTime())) {
+            log.info("isSpeaker>>>> 退出时间大于进入时间, roomId: {}, userId={}", roomUid, userid);
             return true;
         }
 
@@ -1074,19 +1104,19 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
         log.info("opsRoom>>>> exit speakerCache {}", JSONObject.toJSONString(speakerInfo));
         speakerCache.set(speakerInfo);
 
-        //向直播间发送当前在线人数消息
-        ImLiveBroadcastRoomVo roomVo = this.getImLiveBroadcastRoomVo(roomUid);
-        if (Objects.isNull(roomVo)) {
-            log.info("opsRoom>>>> roomVo is null, roomId: {}, userId={}", roomUid, userid);
-            return true;
-        }
-
         ImLiveBroadcastRoom room = new ImLiveBroadcastRoom();
         room.setSpeakerStatus(0);
 
-        // 更新推流状态
+        // 主播离开直播间事件
         if (user.getStatus().equals("3")) {
+            // 更新推流状态
             room.setPushStatus(0);
+
+            // 腾讯云直播方案,推送主播离开消息
+            if (roomVo.getServiceProvider().equals(TencentCloudLivePlugin.PLUGIN_NAME)) {
+                // 主播离开消息通知
+                sendLiveRoomLoginOutMessage(userid, roomVo);
+            }
         }
         // 更新主播状态
         lambdaUpdate()
@@ -1094,12 +1124,6 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
                 .eq(ImLiveBroadcastRoom::getSpeakerId, Integer.parseInt(userid))
                 .update(room);
 
-        // 腾讯云直播方案,推送主播离开消息
-        if (roomVo.getServiceProvider().equals("tencentCloud")) {
-            // 主播离开消息通知
-            sendLiveRoomLoginOutMessage(userid, roomVo);
-        }
-
         return true;
     }
 
@@ -1433,8 +1457,10 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
      * 提前30分钟主动去融云注册并创建房间
      */
     public void createLiveRoom() {
+        log.info("createLiveRoom>>>>");
         RBucket<Object> createLock = redissonClient.getBucket("IM:LIVE_ROOM_CREATE_LOCK");
         if (!createLock.trySet(1, 1, TimeUnit.MINUTES)) {
+            log.info("createLiveRoom>>>>>createLock is exists");
             return;
         }
         Date now = new Date();
@@ -1538,8 +1564,9 @@ public class ImLiveBroadcastRoomServiceImpl extends ServiceImpl<ImLiveBroadcastR
      */
     private void sendRoomLiveState(SysUser user, ImLiveBroadcastRoom room, MessageTypeEnum en) {
         String baseUrl;
-        if (room.getClientType().equals(SysUserType.EDUCATION)) {
+        if (SysUserType.EDUCATION.equals(room.getClientType())) {
             baseUrl=sysConfigDao.findConfigValue(SysConfigService.EDU_TEACHER_BASE_URL);
+            return;
         } else {
             baseUrl=sysConfigDao.findConfigValue(SysConfigService.TEACHER_BASE_URL);
         }

+ 41 - 9
mec-biz/src/main/java/com/ym/mec/biz/service/impl/LiveGoodsMapperServiceImpl.java

@@ -1,8 +1,11 @@
 package com.ym.mec.biz.service.impl;
 
+import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.microsvc.toolkit.middleware.live.LivePluginContext;
 import com.microsvc.toolkit.middleware.live.message.LiveRoomMessage;
+import com.ym.mec.auth.api.client.SysUserFeignService;
+import com.ym.mec.auth.api.entity.SysUser;
 import com.ym.mec.biz.dal.dao.LiveGoodsMapperDao;
 import com.ym.mec.biz.dal.dto.LiveGoodsMapperDto;
 import com.ym.mec.biz.dal.dto.RedisKeyConstant;
@@ -18,6 +21,7 @@ import com.ym.mec.common.exception.BizException;
 import com.ym.mec.common.page.PageInfo;
 import com.ym.mec.common.service.impl.BaseServiceImpl;
 import com.ym.mec.util.collection.MapUtil;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -27,6 +31,7 @@ import org.springframework.transaction.annotation.Transactional;
 import java.util.*;
 
 @Service
+@Slf4j
 public class LiveGoodsMapperServiceImpl extends BaseServiceImpl<Integer, LiveGoodsMapper>  implements LiveGoodsMapperService {
 	
 	@Autowired
@@ -40,6 +45,8 @@ public class LiveGoodsMapperServiceImpl extends BaseServiceImpl<Integer, LiveGoo
 	@Autowired
 	private RedissonClient redissonClient;
 
+	@Autowired
+	private SysUserFeignService sysUserFeignService;
 
 	@Override
 	public BaseDAO<Integer, LiveGoodsMapper> getDAO() {
@@ -104,17 +111,42 @@ public class LiveGoodsMapperServiceImpl extends BaseServiceImpl<Integer, LiveGoo
 	}
 
 	private void publishRoomMsg(ImLiveBroadcastRoom imLiveBroadcastRoom) {
-		LiveRoomMessage message = new LiveRoomMessage();
-		message.setIsIncludeSender(1);
-		message.setObjectName(ImRoomMessage.LIVE_GOODS_CHANGE);
-		message.setToChatRoomId(imLiveBroadcastRoom.getRoomUid());
-		message.setFromUserId(imLiveBroadcastRoom.getSpeakerId().toString());
-		message.setContent(liveGoodsMapperDao.getLiveGoodsList(imLiveBroadcastRoom.getRoomUid(),null,true));
+
+
+		// 消息发送用户
+		LiveRoomMessage.MessageUser messageUser = null;
+		SysUser sysUser = sysUserFeignService.queryUserById(imLiveBroadcastRoom.getSpeakerId());
+		if (Objects.nonNull(sysUser)) {
+			// 发送用户信息
+			messageUser = LiveRoomMessage.MessageUser
+				.builder()
+				.sendUserId(sysUser.getId().toString())
+				.sendUserName(sysUser.getUsername())
+				.avatarUrl(sysUser.getAvatar())
+				.build();
+		}
+
+		LiveRoomMessage.MessageContent messageContent = LiveRoomMessage.MessageContent
+			.builder()
+			.sendUserInfo(messageUser)
+			.goodsContent(liveGoodsMapperDao.getLiveGoodsList(imLiveBroadcastRoom.getRoomUid(),null,true))
+			.build();
+
+		LiveRoomMessage message = LiveRoomMessage.builder()
+												 .isIncludeSender(1)
+												 .objectName(LiveRoomMessage.LIVE_GOODS_CHANGE)
+												 .fromUserId(sysUser.getId().toString())
+												 .toChatRoomId(imLiveBroadcastRoom.getRoomUid())
+												 .content(messageContent)
+												 .build();
+
+		//发送消息
 		try {
-			livePluginContext.getPluginService(imLiveBroadcastRoom.getServiceProvider())
-					.sendChatRoomMessage(message);
+			livePluginContext.getPluginService(imLiveBroadcastRoom.getServiceProvider()).sendChatRoomMessage(message);
+			log.info("LIVE_GOODS_CHANGE>>>> message: {}", JSONObject.toJSONString(message));
 		} catch (Exception e) {
-			throw new RuntimeException(e);
+			log.error("LIVE_GOODS_CHANGE>>>> error {}", e.getMessage());
+			log.error("LIVE_GOODS_CHANGE>>>> sendMessage {} :", JSONObject.toJSONString(message));
 		}
 	}
 

+ 1 - 1
mec-web/src/main/java/com/ym/mec/web/controller/ImLiveBroadcastRoomController.java

@@ -112,7 +112,7 @@ public class ImLiveBroadcastRoomController extends BaseController {
     public HttpResponseResult<Object> add(@Valid @RequestBody ImLiveBroadcastRoomDto dto) {
         imLiveBroadcastRoomService.add(dto);
         //看是否需要马上创建房间
-        CompletableFuture.runAsync(imLiveBroadcastRoomService::createLiveRoom);
+        // CompletableFuture.runAsync(imLiveBroadcastRoomService::createLiveRoom);
         return succeed();
     }