Quellcode durchsuchen

redisson发送消息

liujc vor 2 Jahren
Ursprung
Commit
01501e27c5

+ 57 - 0
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/redisson/RedissonMessageService.java

@@ -0,0 +1,57 @@
+package com.ym.mec.biz.redisson;
+
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RTopic;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Redisson消息发布订阅服务
+ */
+@Slf4j
+@Service
+public class RedissonMessageService {
+
+    // 订阅消息通知
+    public static final String TOPIC_MESSAGE = "topic:message";
+    // 直播在线人数
+    public static final String LIVE_ROOM_MEMBER = "delayQueue:liveRoomMember:";
+    public static final String LIVE_ROOM_MEMBER_LOCK = "LOCK:liveRoomMember:";
+
+    private final RedissonClient redissonClient;
+
+    @Autowired
+    public RedissonMessageService(RedissonClient redissonClient) {
+
+        this.redissonClient = redissonClient;
+    }
+
+    /**
+     * 订阅消息
+     * @param topic 消息主题
+     * @param listener MessageListener
+     */
+    public void subscribe(String topic, MessageListener listener) {
+        RTopic messageTopic = redissonClient.getTopic(topic);
+        messageTopic.addListener(String.class, (channel, msg) -> listener.onMessage(msg));
+    }
+
+    /**
+     * 发布消息
+     * @param topic 消息主题
+     * @param message 消息内容
+     */
+    public void publish(String topic, String message) {
+        RTopic messageTopic = redissonClient.getTopic(topic);
+        messageTopic.publish(message);
+    }
+
+    /**
+     * 消息监听
+     */
+    public interface MessageListener {
+        void onMessage(String message);
+    }
+
+}

+ 89 - 0
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/redisson/RedissonTopicListener.java

@@ -0,0 +1,89 @@
+package com.mec.redisson;
+
+import com.ym.mec.biz.dal.vo.ImLiveBroadcastRoomVo;
+import com.ym.mec.biz.redisson.RedissonMessageService;
+import com.ym.mec.biz.service.ImLiveBroadcastRoomService;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RBucket;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.Ordered;
+import org.springframework.stereotype.Service;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Redisson消息发布订阅服务监听
+ */
+@Slf4j
+@Service
+public class RedissonTopicListener implements ApplicationRunner, Ordered {
+ 
+    @Autowired
+    private RedissonMessageService redissonMessageService;
+    @Autowired
+    private RedissonClient redissonClient;
+
+    @Autowired
+    private ImLiveBroadcastRoomService imLiveBroadcastRoomService;
+ 
+    @Override
+    public void run(ApplicationArguments args) {
+
+        if (Objects.nonNull(redissonMessageService)) {
+
+            redissonMessageService.subscribe(RedissonMessageService.TOPIC_MESSAGE, (message) -> {
+                log.info("RedissonMessageService subscribe message={}", message);
+                try {
+                    TimeUnit.SECONDS.sleep(1);
+                } catch (InterruptedException e) {
+                    log.error("RedissonMessageService subscribe sleep error", e);
+                }
+
+                RLock lock = redissonClient.getLock(RedissonMessageService.LIVE_ROOM_MEMBER_LOCK + message);
+                try {
+                    if (lock.tryLock()) {
+                        // 缓存JoinRoom用户信息到redis
+                        RBucket<Object> bucket = redissonClient.getBucket(RedissonMessageService.LIVE_ROOM_MEMBER + message);
+                        if (!bucket.isExists()) {
+                            return;
+                        }
+                        Integer times = (Integer) bucket.get();
+                        bucket.delete();
+
+                        ImLiveBroadcastRoomVo imLiveBroadcastRoomVo = imLiveBroadcastRoomService.queryRoomInfo(message);
+                        if (Objects.isNull(imLiveBroadcastRoomVo)) {
+                            return;
+                        }
+                        try {
+                            imLiveBroadcastRoomService.setGroupMemberDefinedData(imLiveBroadcastRoomVo,imLiveBroadcastRoomVo.getLookNum(),imLiveBroadcastRoomVo.getTotalLookNum());
+                        } catch (Exception e) {
+                            log.error("RedissonMessageService subscribe setGroupMemberDefinedData error", e);
+                            if (times>=3) {
+                                return;
+                            }
+                            bucket.set(times +1, 30, TimeUnit.MINUTES);
+                        }
+                        redissonMessageService.publish(RedissonMessageService.TOPIC_MESSAGE, message);
+                    }
+                } finally {
+                    if (lock.getHoldCount() != 0 && lock.isHeldByCurrentThread()) {
+                        lock.unlock();
+                    }
+                }
+
+
+            });
+            log.info("---> RedissonMessageService subscribe success");
+        }
+    }
+
+    @Override
+    public int getOrder() {
+        return 1;
+    }
+}

+ 2 - 5
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/service/impl/LiveRoomServiceImpl.java

@@ -1197,9 +1197,7 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
     private LiveRoomMessage.MessageContent getMessageContent(ImRoomMessage message) {
 
         switch (message.getObjectName()) {
-            case LiveRoomMessage.FORCED_OFFLINE:
-                return LiveRoomMessage.MessageContent.builder()
-                        .build();
+
             case LiveRoomMessage.LOOKER_LOGIN_OUT:
                 return LiveRoomMessage.MessageContent.builder()
                         .targetId( imGroupService.getImUserId(message.getFromUserId(),message.getClientType()))
@@ -1212,9 +1210,8 @@ public class LiveRoomServiceImpl extends ServiceImpl<LiveRoomDao, LiveRoom> impl
                         .viewers(liveRoomVo.getTotalLookNum().longValue())
                         .likes(liveRoomVo.getLikeNum().longValue())
                         .build();
+            case LiveRoomMessage.FORCED_OFFLINE:
             case LiveRoomMessage.WELCOME:
-                return LiveRoomMessage.MessageContent.builder()
-                        .build();
             default:
                 return LiveRoomMessage.MessageContent.builder()
                         .build();

+ 2 - 0
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/wrapper/course/CourseScheduleWrapper.java

@@ -0,0 +1,2 @@
+package com.yonge.cooleshow.biz.dal.wrapper.course;public class CourseScheduleWrapper {
+}

+ 102 - 0
cooleshow-user/user-student/src/main/java/com/yonge/cooleshow/student/controller/UserController.java

@@ -0,0 +1,102 @@
+package com.yonge.cooleshow.student.controller;
+
+
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.toolkit.StringUtils;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.yonge.cooleshow.auth.api.client.SysUserFeignService;
+import com.yonge.cooleshow.auth.api.entity.SysUser;
+import com.yonge.cooleshow.biz.dal.entity.ImUserFriend;
+import com.yonge.cooleshow.biz.dal.enums.ClientEnum;
+import com.yonge.cooleshow.biz.dal.enums.MK;
+import com.yonge.cooleshow.biz.dal.service.ImGroupService;
+import com.yonge.cooleshow.biz.dal.service.ImUserFriendService;
+import com.yonge.cooleshow.biz.dal.vo.im.ImUserFriendVO;
+import com.yonge.cooleshow.biz.dal.wrapper.im.ImUserWrapper;
+import com.yonge.cooleshow.common.controller.BaseController;
+import com.yonge.cooleshow.common.entity.HttpResponseResult;
+import io.swagger.annotations.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import javax.annotation.Resource;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * 用户通讯录表(ImUserFriend)表控制层
+ *
+ * @author zx
+ * @since 2022-03-22 10:45:59
+ */
+@Api(tags = "用户通讯录表")
+@RestController
+@RequestMapping("/imUserFriend")
+public class ImUserFriendController extends BaseController {
+    /**
+     * 服务对象
+     */
+    @Resource
+    private ImUserFriendService imUserFriendService;
+    @Resource
+    private SysUserFeignService sysUserFeignService;
+
+    @Autowired
+    private ImGroupService imGroupService;
+
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "search", dataType = "String", value = "根据用户编号、昵称模糊查询")
+    })
+    @ApiOperation("获取通讯录成员列表")
+    @PostMapping(value = "/queryAll")
+    public HttpResponseResult<List<ImUserWrapper.ImUserFriend>> queryAll(@RequestBody Map<String,Object> params) throws Exception {
+
+        // 用户ID
+        SysUser sysUser = sysUserFeignService.queryUserInfo();
+        if (Objects.isNull(sysUser)) {
+            return failed("请登录");
+        }
+
+        // 学生好友列表
+        List<ImUserWrapper.ImUserFriend> userFriends = imUserFriendService.findUserAllImFriendInfo(ClientEnum.STUDENT, sysUser.getId(), params);
+
+        /*Object search = params.get("search");
+        List<ImUserFriend> userFriends = imUserFriendService.getBaseMapper().selectList(Wrappers.<ImUserFriend>query().lambda()
+                .and(Objects.nonNull(search) && StringUtils.isNotEmpty(search.toString()),
+                        e->e.eq(ImUserFriend::getFriendId, search).or()
+                        .like(ImUserFriend::getFriendNickname, search))
+                .eq(ImUserFriend::getUserId,sysUser.getId()).orderByDesc(ImUserFriend::getId));
+
+        for (ImUserFriend item : userFriends) {
+            // 学生目前添加好友都为老师
+            item.clientType(ClientEnum.TEACHER)
+                    .setImFriendId(MessageFormat.format("{0}", String.valueOf(item.getFriendId())));
+        }*/
+
+        return succeed(userFriends);
+    }
+
+    @ApiOperation("获取好友详情")
+    @PostMapping(value = "/getDetail/{userId}")
+    public HttpResponseResult<ImUserFriendVO.ImUserFriend> getDetail(@ApiParam(value = "用户编号", required = true) @PathVariable("userId") String userId) {
+
+        String ret = imGroupService.analysisImUserId(userId);
+        if (!ret.matches(MK.EXP_INT)) {
+            return failed("无效的用户ID");
+        }
+
+        ImUserFriend userFriend = imUserFriendService.getDetail(userId, ClientEnum.STUDENT);
+        if (Objects.isNull(userFriend)) {
+            return failed("当前好友不存在");
+        }
+
+        if (Objects.isNull(userFriend.getFriendType())) {
+            userFriend.setFriendType(ClientEnum.TEACHER);
+        }
+
+        return succeed(ImUserFriendVO.ImUserFriend.from(JSON.toJSONString(userFriend)));
+    }
+}
+

+ 58 - 0
cooleshow-user/user-teacher/src/main/java/com/yonge/cooleshow/teacher/controller/UserController.java

@@ -0,0 +1,58 @@
+package com.yonge.cooleshow.student.controller;
+
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.yonge.cooleshow.auth.api.client.SysUserFeignService;
+import com.yonge.cooleshow.auth.api.entity.SysUser;
+import com.yonge.cooleshow.biz.dal.entity.ImUserFriend;
+import com.yonge.cooleshow.biz.dal.entity.ImUserStateSync;
+import com.yonge.cooleshow.biz.dal.enums.ClientEnum;
+import com.yonge.cooleshow.biz.dal.enums.MK;
+import com.yonge.cooleshow.biz.dal.service.ImGroupService;
+import com.yonge.cooleshow.biz.dal.service.ImUserFriendService;
+import com.yonge.cooleshow.biz.dal.service.LiveRoomService;
+import com.yonge.cooleshow.biz.dal.vo.im.ImUserFriendVO;
+import com.yonge.cooleshow.biz.dal.wrapper.im.ImUserWrapper;
+import com.yonge.cooleshow.common.controller.BaseController;
+import com.yonge.cooleshow.common.entity.HttpResponseResult;
+import io.swagger.annotations.*;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * 用户通讯录表(ImUserFriend)表控制层
+ *
+ * @author zx
+ * @since 2022-03-22 10:45:59
+ */
+@Api(tags = "用户通讯录表")
+@RestController
+@Slf4j
+@RequestMapping("/user")
+public class UserController extends BaseController {
+
+    @Autowired
+    private LiveRoomService liveRoomService;
+
+
+
+    /**
+     * 同步融云用户状态变更
+     *
+     * @param userState
+     */
+    @PostMapping(value = "/statusImUser")
+    public void statusImUser(@RequestBody List<ImUserStateSync> userState) {
+        log.info("opsRoom >>>>> : {}", JSONObject.toJSONString(userState));
+        liveRoomService.opsRoom(userState);
+    }
+
+}
+