浏览代码

feat:小节评分

Joburgess 4 年之前
父节点
当前提交
e1e0ed2188

+ 20 - 0
mec-teacher/src/main/java/com/ym/mec/teacher/config/WebSocketConfig.java

@@ -2,9 +2,11 @@ package com.ym.mec.teacher.config;
 
 import com.ym.mec.teacher.handler.CustomPrincipalHandshakeHandler;
 import com.ym.mec.teacher.handler.SoundHandler;
+import com.ym.mec.teacher.interceptor.WebSocketChannelInterceptor;
 import com.ym.mec.teacher.interceptor.WebSocketHandshakeInterceptor;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.simp.config.ChannelRegistration;
 import org.springframework.messaging.simp.config.MessageBrokerRegistry;
 import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
 import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
@@ -39,4 +41,22 @@ public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
         config.setApplicationDestinationPrefixes("/push");
     }
 
+    @Override
+    public void configureClientInboundChannel(ChannelRegistration registration) {
+        /*
+         * 配置消息线程池
+         * 1、corePoolSize() 配置核心线程池, 当线程数小于此配置时, 不管线程中有无空闲的线程, 都会产生新线程处理任务
+         * 2、maxPoolSize() 配置线程池最大数, 当线程池等于此配置时, 不会产生新线程
+         * 3、keepAliveSeconds() 线程池维护线程所允许的空闲时间, 单位为秒
+         */
+        registration.taskExecutor()
+                .corePoolSize(10)
+                .maxPoolSize(20)
+                .keepAliveSeconds(60);
+        /*
+         * 添加STOMP自定义拦截器
+         * 消息拦截器, 实现ChannelInterceptor接口
+         */
+        registration.interceptors(new WebSocketChannelInterceptor());
+    }
 }

+ 6 - 3
mec-teacher/src/main/java/com/ym/mec/teacher/controller/SoundController.java

@@ -13,6 +13,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.messaging.handler.annotation.MessageMapping;
 import org.springframework.messaging.handler.annotation.SendTo;
 import org.springframework.messaging.simp.SimpMessagingTemplate;
@@ -37,6 +38,7 @@ public class SoundController extends BaseController {
     private SoundService soundService;
     @Autowired
     private SysUserFeignService sysUserFeignService;
+    @Lazy
     @Autowired
     private SimpMessagingTemplate template;
 
@@ -57,19 +59,20 @@ public class SoundController extends BaseController {
     }
 
     @MessageMapping("/soundDate")
-    @SendTo("/topic/greetings")
+//    @SendTo("/topic/greetings")
     public String greeting(String message){
         SysUser sysUser = sysUserFeignService.queryUserInfo();
         if(Objects.isNull(sysUser)){
             LOGGER.error("用户登录信息异常");
         }
         LOGGER.info("{}:{}", sysUser.getUsername(), message);
-        return message;
+        return "Hello";
     }
 
     @RequestMapping("sendToUser")
     public HttpResponseResult sendToUser(String phone, String message){
-        template.convertAndSendToUser(StringUtils.joinWith(":", "username", phone), "/topic/greetings", message);
+        template.convertAndSendToUser(phone, "/topic/greetings", message);
+        template.convertAndSend("/topic/greetings", message);
         return succeed();
     }
 

+ 1 - 1
mec-teacher/src/main/java/com/ym/mec/teacher/handler/CustomPrincipalHandshakeHandler.java

@@ -27,7 +27,7 @@ public class CustomPrincipalHandshakeHandler extends DefaultHandshakeHandler {
     protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
         Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
         CustomPrincipalPrincipal principal = new CustomPrincipalPrincipal();
-        principal.setName(authentication.getName());
+        principal.setName(authentication.getName().split(":")[1]);
         return principal;
     }
 

+ 83 - 0
mec-teacher/src/main/java/com/ym/mec/teacher/interceptor/WebSocketChannelInterceptor.java

@@ -0,0 +1,83 @@
+package com.ym.mec.teacher.interceptor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.simp.stomp.StompCommand;
+import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
+import org.springframework.messaging.support.ChannelInterceptor;
+import org.springframework.messaging.support.MessageHeaderAccessor;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * @Author Joburgess
+ * @Date 2021/6/15 0015
+ */
+public class WebSocketChannelInterceptor implements ChannelInterceptor {
+
+    private final Logger LOGGER = LoggerFactory.getLogger(WebSocketChannelInterceptor.class);
+
+    /**
+     * 在消息发送之前执行, 如果此方法返回值为空, 则不会发生实际的消息发送
+     * @param message 消息
+     * @param channel 通道
+     * @return 消息
+     */
+    @Override
+    public Message<?> preSend(Message<?> message, MessageChannel channel) {
+        StompHeaderAccessor stompHeader = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
+        /*
+         * 判断是否首次连接请求, 如果已经连接, 返回message
+         */
+        if (StompCommand.CONNECT.equals(stompHeader.getCommand())) {//首次连接
+            List<String> tokens = stompHeader.getNativeHeader("Authorization");//获取TOKEN
+            LOGGER.info("preSend: stompHeader.getNativeHeader tokens --> {}", tokens);
+            String token = Optional.ofNullable(tokens).map(tokenList -> tokenList.get(0)).orElse(null);
+            LOGGER.info("preSend: token --> {}", token);
+        } else if (StompCommand.DISCONNECT.equals(stompHeader.getCommand())) {//断开连接
+            LOGGER.info("preSend: {} close connect", stompHeader.getUser());
+        }
+        return message;
+    }
+
+    /**
+     * 在消息被实际检索之前调用, 在WebSocket场景中应用不到
+     * @param channel 通道
+     * @return false: 不会检索任何消息
+     */
+    @Override
+    public boolean preReceive(MessageChannel channel) {
+        return true;
+    }
+
+    /**
+     * 在检索到消息之后, 返回调用方法之前调用, 可以进行消息修改, 如果返回null, 就不会执行下一步操作
+     * @param message 消息
+     * @param channel 通道
+     * @return 消息
+     */
+    @Override
+    public Message<?> postReceive(Message<?> message, MessageChannel channel) {
+        return message;
+    }
+
+    /**
+     * 在消息发送后立刻调用
+     * @param message 消息
+     * @param channel 通道
+     * @param sent 表示调用的返回值
+     */
+    @Override
+    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
+        /*StompHeaderAccessor stompHeader = StompHeaderAccessor.wrap(message);
+        if (Objects.isNull(stompHeader.getCommand())) {//心跳消息
+            return;
+        }
+        log.info("postSend -- command --> {}", stompHeader.getCommand());*/
+    }
+
+}