Browse Source

画板SocketIO服务重构Java版

Eric 2 years ago
parent
commit
c2d1dc7d90

+ 1 - 0
.gitignore

@@ -35,6 +35,7 @@ mvnw.cmd
 **/bootstrap-dev.properties
 **/bootstrap-local.yml
 **/bootstrap-local.properties
+**/logback-test.xml
 
 /bin/
 **/logback-spring.xml

+ 1 - 1
mec-auth/mec-auth-api/src/main/java/com/ym/mec/auth/api/client/fallback/SysUserFeignServiceFallback.java

@@ -9,7 +9,7 @@ import com.ym.mec.auth.api.client.SysUserFeignService;
 import com.ym.mec.auth.api.entity.SysUser;
 import com.ym.mec.common.entity.HttpResponseResult;
 
-@Component
+//@Component
 public class SysUserFeignServiceFallback implements SysUserFeignService {
 
 	@Override

+ 0 - 17
mec-websocket/src/main/java/com/ym/mec/web/WebSocketApplication.java

@@ -29,21 +29,4 @@ public class WebSocketApplication {
 		SpringApplication.run(WebSocketApplication.class, args);
 	}
 
-	/**
-	 * 注册filter
-	 * @return
-	 */
-	@Bean
-	public FilterRegistrationBean<Filter> filterRegistrationBean() {
-		FilterRegistrationBean<Filter> registration = new FilterRegistrationBean<Filter>();
-		// 注入过滤器
-		registration.setFilter(new EmojiEncodingFilter());
-		// 过滤器名称
-		registration.setName("emojiEncodingFilter");
-		// 拦截规则
-		registration.addUrlPatterns("/*");
-		// 过滤器顺序(值越小,优先级越高)
-		registration.setOrder(1);
-		return registration;
-	}
 }

+ 0 - 52
mec-websocket/src/main/java/com/ym/mec/web/config/PermissionCheckService.java

@@ -1,52 +0,0 @@
-package com.ym.mec.web.config;
-
-import java.util.Collection;
-
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Lazy;
-import org.springframework.security.core.Authentication;
-import org.springframework.security.core.GrantedAuthority;
-import org.springframework.stereotype.Component;
-
-import com.ym.mec.auth.api.client.SysUserFeignService;
-import com.ym.mec.auth.api.entity.SysUser;
-import com.ym.mec.common.security.SecurityUtils;
-
-@Component("pcs")
-public class PermissionCheckService {
-	
-	@Autowired
-	@Lazy
-	private SysUserFeignService sysUserFeignService;
-
-	public boolean hasPermissions(String... permissions) {
-		Authentication authentication = SecurityUtils.getAuthentication();
-		if (authentication == null) {
-			return false;
-		}
-
-		SysUser user = sysUserFeignService.queryUserInfo();
-		if(user.getIsSuperAdmin()){
-			return true;
-		}
-
-		Collection<? extends GrantedAuthority> authorities = authentication.getAuthorities();
-
-		for (String perm : permissions) {
-			for (GrantedAuthority authority : authorities) {
-				if (StringUtils.equalsIgnoreCase(perm, authority.getAuthority())) {
-					return true;
-				}
-			}
-		}
-
-		return false;
-	}
-
-	public boolean hasRoles(String... roles) {
-
-		return hasPermissions(roles);
-	}
-
-}

+ 1 - 1
mec-websocket/src/main/java/com/ym/mec/web/config/ResourceServerConfig.java

@@ -32,7 +32,7 @@ public class ResourceServerConfig extends ResourceServerConfigurerAdapter {
                 .authorizeRequests()
                 .antMatchers("/task/**")
                 .hasIpAddress("0.0.0.0/0")
-                .antMatchers("/v2/api-docs","/socket.io").permitAll().anyRequest()
+                .antMatchers("/v2/api-docs", "/ws-server").permitAll().anyRequest()
                 .authenticated().and().httpBasic();
     }
 

+ 28 - 17
mec-websocket/src/main/java/com/ym/mec/web/config/SocketIoConfig.java

@@ -1,46 +1,57 @@
 package com.ym.mec.web.config;
 
-import com.corundumstudio.socketio.AuthorizationListener;
-import com.corundumstudio.socketio.HandshakeData;
 import com.corundumstudio.socketio.SocketConfig;
 import com.corundumstudio.socketio.SocketIOServer;
+import com.corundumstudio.socketio.Transport;
+import com.corundumstudio.socketio.store.RedissonStoreFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+@Slf4j
 @Configuration
 public class SocketIoConfig {
-    @Value("${socket.server.host}")
+    @Value("${socket.server.host:127.0.0.1}")
     private String host;
 
-    @Value("${socket.server.port}")
+    @Value("${socket.server.port:3002}")
     private Integer port;
 
+    @Autowired
+    private RedissonClient redissonClient;
+
     @Bean
     public SocketIOServer socketIOServer() {
         com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
         config.setHostname(host);
         config.setPort(port);
-        config.setPingInterval(5000);
-        config.setPingTimeout(3000);
+        config.setPingInterval(25000);
+        config.setPingTimeout(60000);
         config.setWorkerThreads(100);
         config.setRandomSession(true);
-
-        SocketConfig socketConfig = new SocketConfig();
-        socketConfig.setReuseAddress(true);
-        config.setSocketConfig(socketConfig);
-
         //设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
         config.setMaxFramePayloadLength(1024 * 1024);
         //设置http交互最大内容长度
         config.setMaxHttpContentLength(1024 * 1024);
         //授权
-        config.setAuthorizationListener(new AuthorizationListener() {
-            @Override
-            public boolean isAuthorized(HandshakeData data) {
-                return true;
-            }
-        });
+        config.setAuthorizationListener(data -> true);
+        config.setTransports(Transport.WEBSOCKET, Transport.POLLING);
+        // 推荐使用redisson
+        config.setStoreFactory(new RedissonStoreFactory(redissonClient));
+        // 配置一个统一的URL,nginx做后端请求转发; ****注意"/"不能少****
+        config.setContext("/ws-server");
+        log.info("--------SocketIO------- context={}", config.getContext());
+
+        // 服务端配置
+        SocketConfig socketConfig = new SocketConfig();
+        socketConfig.setReuseAddress(true);
+        socketConfig.setTcpNoDelay(true);
+        socketConfig.setSoLinger(0);
+        config.setSocketConfig(socketConfig);
+
         return new SocketIOServer(config);
     }
 

+ 30 - 9
mec-websocket/src/main/java/com/ym/mec/web/config/WebMvcConfig.java

@@ -2,9 +2,11 @@ package com.ym.mec.web.config;
 
 import com.ym.mec.common.config.EnumConverterFactory;
 import com.ym.mec.common.config.LocalFastJsonHttpMessageConverter;
+import com.ym.mec.common.filters.EmojiEncodingFilter;
 import com.ym.mec.web.interceptor.OperationLogInterceptor;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.http.HttpMessageConverters;
+import org.springframework.boot.web.servlet.FilterRegistrationBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.format.FormatterRegistry;
@@ -13,6 +15,7 @@ import org.springframework.web.servlet.config.annotation.CorsRegistry;
 import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
 import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
 
+import javax.servlet.Filter;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -36,15 +39,6 @@ public class WebMvcConfig implements WebMvcConfigurer {
 		registry.addInterceptor(operationLogInterceptor).addPathPatterns("/**").excludePathPatterns("/login");
 	}
 
-	@Bean
-	public HttpMessageConverters fastJsonHttpMessageConverters() {
-		LocalFastJsonHttpMessageConverter converter = new LocalFastJsonHttpMessageConverter();
-		List<MediaType> fastMediaTypes = new ArrayList<MediaType>();
-		fastMediaTypes.add(MediaType.APPLICATION_JSON_UTF8);
-		converter.setSupportedMediaTypes(fastMediaTypes);
-		return new HttpMessageConverters(converter);
-	}
-
 	@Override
 	public void addCorsMappings(CorsRegistry registry) {
 		registry.addMapping("/**").allowedOrigins("*")
@@ -57,4 +51,31 @@ public class WebMvcConfig implements WebMvcConfigurer {
 				// 跨域允许时间
 				.maxAge(3600);
 	}
+
+	@Bean
+	public HttpMessageConverters fastJsonHttpMessageConverters() {
+		LocalFastJsonHttpMessageConverter converter = new LocalFastJsonHttpMessageConverter();
+		List<MediaType> fastMediaTypes = new ArrayList<>();
+		fastMediaTypes.add(MediaType.APPLICATION_JSON_UTF8);
+		converter.setSupportedMediaTypes(fastMediaTypes);
+		return new HttpMessageConverters(converter);
+	}
+
+	/**
+	 * 注册filter
+	 * @return FilterRegistrationBean<Filter>
+	 */
+	@Bean
+	public FilterRegistrationBean<Filter> filterRegistrationBean() {
+		FilterRegistrationBean<Filter> registration = new FilterRegistrationBean<Filter>();
+		// 注入过滤器
+		registration.setFilter(new EmojiEncodingFilter());
+		// 过滤器名称
+		registration.setName("emojiEncodingFilter");
+		// 拦截规则
+		registration.addUrlPatterns("/*");
+		// 过滤器顺序(值越小,优先级越高)
+		registration.setOrder(1);
+		return registration;
+	}
 }

+ 3 - 10
mec-websocket/src/main/java/com/ym/mec/web/controller/WebsocketController.java

@@ -1,23 +1,16 @@
 package com.ym.mec.web.controller;
 
-import com.alibaba.fastjson.JSON;
-import com.ym.mec.biz.dal.dto.MusicPitchDetailDto;
 import com.ym.mec.biz.handler.WebSocketHandler;
-import com.ym.mec.biz.service.SoundService;
 import com.ym.mec.common.controller.BaseController;
 import com.ym.mec.common.entity.HttpResponseResult;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.*;
-import org.springframework.web.multipart.MultipartFile;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.socket.TextMessage;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * @Author Joburgess

+ 0 - 5
mec-websocket/src/main/java/com/ym/mec/web/handler/Chat2.java

@@ -9,8 +9,6 @@ import com.corundumstudio.socketio.annotation.OnEvent;
 import com.ym.mec.web.support.anno.NamespaceReference;
 import com.ym.mec.web.support.anno.OnNamespace;
 import com.ym.mec.web.support.mes.ChatObject;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.springframework.stereotype.Component;
 
 
@@ -20,9 +18,6 @@ public class Chat2 {
     @NamespaceReference
     private SocketIONamespace namespace;
 
-    private Logger logger = LogManager.getLogger(getClass().getName());
-
-
     /**
      * 添加connect事件,当客户端发起连接时调用,本文中将clientid与sessionid存入数据库
      * 方便后面发送消息时查找到对应的目标client

+ 102 - 34
mec-websocket/src/main/java/com/ym/mec/web/handler/WhiteboardHandler.java

@@ -1,97 +1,165 @@
 package com.ym.mec.web.handler;
 
-import com.corundumstudio.socketio.AckRequest;
 import com.corundumstudio.socketio.BroadcastOperations;
 import com.corundumstudio.socketio.SocketIOClient;
 import com.corundumstudio.socketio.SocketIONamespace;
 import com.corundumstudio.socketio.annotation.OnConnect;
 import com.corundumstudio.socketio.annotation.OnDisconnect;
 import com.corundumstudio.socketio.annotation.OnEvent;
+import com.google.common.collect.Lists;
 import com.ym.mec.web.support.anno.NamespaceReference;
 import com.ym.mec.web.support.anno.OnNamespace;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  *    var socket = io.connect('http://ip:prot/namespeace')
  */
-
+@Slf4j
+@Data
 @Component
 @OnNamespace("/whiteboard")
 public class WhiteboardHandler {
-    @NamespaceReference
-    private SocketIONamespace namespace;
 
-    private Logger logger = LogManager.getLogger(getClass().getName());
+    // 初始化房间
+    private static final String EVENT_INIT_ROOM = "init-room";
+    // 第一次进入房间
+    private static final String EVENT_FIRST_IN_ROOM = "first-in-room";
+    // 加入房间
+    private static final String EVENT_JOIN_ROOM = "join-room";
+    // 新用户加入
+    private static final String EVENT_NEW_USER = "new-user";
+    // 房间用户改变(加入或退出)
+    private static final String EVENT_ROOM_USER_CHANGE = "room-user-change";
+    // 服务端广播事件
+    private static final String EVENT_SERVER_BROADCAST = "server-broadcast";
+    // 服务端异常广播事件
+    private static final String EVENT_SERVER_VOLATILE_BROADCAST = "server-volatile-broadcast";
+    // 客户端广播
+    private static final String EVENT_CLIENT_BROADCAST = "client-broadcast";
 
+    @NamespaceReference
+    private SocketIONamespace namespace;
 
     /**
      * 发送初始化房间事件
+     * @param client SocketIOClient
      */
     @OnConnect
     public void onConnect(SocketIOClient client) {
+
+        // 房间ID
+        String roomId = client.getHandshakeData().getSingleUrlParam("roomId");
+
+        log.info("onConnect client={}, ns={}, roomId={}", client.getSessionId(), client.getNamespace().getName(), roomId);
         //发送初始化房间事件
-        client.sendEvent("init-room");
+        client.sendEvent(EVENT_INIT_ROOM);
+
+        client.set("socket-room", roomId);
 
     }
 
     /**
      * 添加@OnDisconnect事件,客户端断开连接时调用
+     * @param client SocketIOClient
      */
     @OnDisconnect
     public void onDisconnect(SocketIOClient client) {
+
+        // 房间ID
+        String roomId = client.getHandshakeData().getSingleUrlParam("roomId");
+
+        log.info("onDisconnect client={}, ns={}, roomId={}", client.getSessionId(), client.getNamespace().getName(), roomId);
         client.disconnect();
+
+        // 删除数据
+        client.del("socket-room");
+        // 通知用户参与所有房间,用户变化信息
+        if (StringUtils.isNotEmpty(roomId)) {
+
+            BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
+
+            List<String> collect = roomOperations.getClients().stream()
+                    .map(x -> x.getSessionId().toString()).distinct().collect(Collectors.toList());
+
+            if (!CollectionUtils.isEmpty(collect)) {
+                roomOperations.sendEvent(EVENT_ROOM_USER_CHANGE, collect);
+            }
+
+        }
     }
 
 
     /**
      * 加入房间事件
-     * @param client
-     * @param data
-     * @param ackRequest
+     * @param client SocketIOClient
+     * @param roomId 房间ID
      */
-    @OnEvent(value = "join-room")
-    public void joinRoom(SocketIOClient client, Object data, AckRequest ackRequest) {
-        BroadcastOperations roomOperations = namespace.getRoomOperations("whiteboard");
+    @OnEvent(value = EVENT_JOIN_ROOM)
+    public void joinRoom(SocketIOClient client, String roomId) {
+        log.info("joinRoom roomId={}", roomId);
+
+        // 加入房间
+        client.joinRoom(roomId);
+
+        BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
         Collection<SocketIOClient> clients = roomOperations.getClients();
-        client.joinRoom("whiteboard");
+
+        log.debug("joinRoom clients={}", clients.size());
         if (clients.size() > 1) {
-            for (SocketIOClient socketIOClient : clients) {
-                roomOperations.sendEvent("new-user", socketIOClient.getSessionId().toString());
-            }
+            roomOperations.sendEvent(EVENT_NEW_USER, client.getSessionId().toString());
         } else {
             //发送
-            client.sendEvent("first-in-room");
+            client.sendEvent(EVENT_FIRST_IN_ROOM);
         }
+
+        List<String> collect = Optional.of(clients).orElse(Lists.newArrayList()).stream()
+                .map(x -> x.getSessionId().toString()).distinct().collect(Collectors.toList());
         //发送
-        roomOperations.sendEvent("room-user-change", clients);
+        if (!CollectionUtils.isEmpty(collect)) {
+
+            roomOperations.sendEvent(EVENT_ROOM_USER_CHANGE, collect);
+        }
 
     }
 
     /**
      * 转发 server-broadcast =>client-broadcast
-     * @param client
-     * @param data
-     * @param ackRequest
+     * @param roomId 房间ID
+     * @param encryptedData 接收透传数据
+     * @param iv 接收透传数据
      */
-    @OnEvent(value = "server-broadcast")
-    public void serverBroadcast(SocketIOClient client, Object data, AckRequest ackRequest) {
-        BroadcastOperations roomOperations = namespace.getRoomOperations("whiteboard");
-        roomOperations.sendEvent("client-broadcast", data);
+    @OnEvent(value = EVENT_SERVER_BROADCAST)
+    public void serverBroadcast(String roomId, Object encryptedData, Object iv) {
+        log.info("serverBroadcast roomId={} data={}, iv={}", roomId, encryptedData, iv);
+
+        BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
+        // 发送房间广播消息
+        roomOperations.sendEvent(EVENT_CLIENT_BROADCAST, encryptedData, iv);
+
     }
 
     /**
      * 转发 server-volatile-broadcast =>client-broadcast
-     * @param client
-     * @param data
-     * @param ackRequest
+     * @param roomId 房间ID
+     * @param encryptedData 接收透传数据
+     * @param iv 接收透传数据
      */
-    @OnEvent(value = "server-volatile-broadcast")
-    public void serverVolatileBroadcast(SocketIOClient client, Object data, AckRequest ackRequest) {
-        BroadcastOperations roomOperations = namespace.getRoomOperations("whiteboard");
-        roomOperations.sendEvent("client-broadcast", data);
+    @OnEvent(value = EVENT_SERVER_VOLATILE_BROADCAST)
+    public void serverVolatileBroadcast(String roomId, Object encryptedData, Object iv) {
+        log.info("serverVolatileBroadcast roomId={} data={}, iv={}", roomId, encryptedData, iv);
+
+        BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
+        // 发送房间广播消息
+        roomOperations.sendEvent(EVENT_CLIENT_BROADCAST, encryptedData, iv);
+
     }
 }

+ 4 - 1
mec-websocket/src/main/java/com/ym/mec/web/support/socket/ServerRunner.java

@@ -1,11 +1,12 @@
 package com.ym.mec.web.support.socket;
 
 import com.corundumstudio.socketio.SocketIOServer;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 
-
+@Slf4j
 @Component
 public class ServerRunner implements CommandLineRunner {
 
@@ -19,5 +20,7 @@ public class ServerRunner implements CommandLineRunner {
     @Override
     public void run(String... args) throws Exception {
         server.start();
+
+        log.info("--------SocketIO------- SERVER.START PORT={}", server.getConfiguration().getPort());
     }
 }

+ 0 - 0
mec-websocket/src/main/resources/application.yml → mec-websocket/src/main/resources/application-template.yml


+ 17 - 0
pom.xml

@@ -300,6 +300,23 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
+
+		<!--增加通用依赖-->
+		<dependency>
+			<groupId>org.projectlombok</groupId>
+			<artifactId>lombok</artifactId>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-test</artifactId>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>