Prechádzať zdrojové kódy

socketio支持分布式布署

Eric 2 rokov pred
rodič
commit
e10c1c22be

+ 16 - 1
mec-websocket/src/main/java/com/ym/mec/web/config/SocketIoConfig.java

@@ -4,6 +4,7 @@ import com.corundumstudio.socketio.SocketConfig;
 import com.corundumstudio.socketio.SocketIOServer;
 import com.corundumstudio.socketio.Transport;
 import com.corundumstudio.socketio.store.RedissonStoreFactory;
+import com.corundumstudio.socketio.store.pubsub.PubSubStore;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -20,8 +21,12 @@ public class SocketIoConfig {
     @Value("${socket.server.port:3002}")
     private Integer port;
 
+    private final RedissonClient redissonClient;
+
     @Autowired
-    private RedissonClient redissonClient;
+    public SocketIoConfig(RedissonClient redissonClient) {
+        this.redissonClient = redissonClient;
+    }
 
     @Bean
     public SocketIOServer socketIOServer() {
@@ -55,4 +60,14 @@ public class SocketIoConfig {
         return new SocketIOServer(config);
     }
 
+    /**
+     * 解决分布式服务器实例环境
+     * @param socketServer SocketIOServer
+     * @return PubSubStore
+     */
+    @Bean
+    public PubSubStore pubSubStore(SocketIOServer socketServer) {
+
+        return socketServer.getConfiguration().getStoreFactory().pubSubStore();
+    }
 }

+ 58 - 10
mec-websocket/src/main/java/com/ym/mec/web/handler/WhiteboardHandler.java

@@ -6,16 +6,24 @@ import com.corundumstudio.socketio.SocketIONamespace;
 import com.corundumstudio.socketio.annotation.OnConnect;
 import com.corundumstudio.socketio.annotation.OnDisconnect;
 import com.corundumstudio.socketio.annotation.OnEvent;
+import com.corundumstudio.socketio.protocol.Packet;
+import com.corundumstudio.socketio.protocol.PacketType;
+import com.corundumstudio.socketio.store.pubsub.DispatchMessage;
+import com.corundumstudio.socketio.store.pubsub.PubSubStore;
+import com.corundumstudio.socketio.store.pubsub.PubSubType;
 import com.google.common.collect.Lists;
 import com.ym.mec.web.support.anno.NamespaceReference;
 import com.ym.mec.web.support.anno.OnNamespace;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -46,8 +54,42 @@ public class WhiteboardHandler {
     // 客户端广播
     private static final String EVENT_CLIENT_BROADCAST = "client-broadcast";
 
+    // 房间编号
+    private static final String ROOM_ID = "room";
+
     @NamespaceReference
     private SocketIONamespace namespace;
+    @Autowired
+    private PubSubStore pubSubStore;
+
+    /**
+     * 分布式服务器,消息分发
+     *
+     * @param client    SocketIOClient
+     * @param roomId    房间ID
+     * @param eventName 事件名称
+     * @param data      发送数据
+     */
+    public void dispatchMessage(SocketIOClient client, String roomId, String eventName, List<Object> data) {
+
+        // 分发消息(当前服务不会向client推送自己分发出去的消息)
+        try {
+
+            // 命名空间
+            String namespace = client.getNamespace().getName();
+
+            // 发送数据包
+            Packet packet = new Packet(PacketType.MESSAGE);
+            packet.setSubType(PacketType.EVENT);
+            packet.setName(eventName);
+            packet.setNsp(namespace);
+            packet.setData(data);
+
+            pubSubStore.publish(PubSubType.DISPATCH, new DispatchMessage(roomId, packet, namespace));
+        } catch (Exception e) {
+            log.error("PubSubType.DISPATCH roomId={}, sid={}", roomId, client.getSessionId(), e);
+        }
+    }
 
     /**
      * 发送初始化房间事件
@@ -57,14 +99,12 @@ public class WhiteboardHandler {
     public void onConnect(SocketIOClient client) {
 
         // 房间ID
-        String roomId = client.getHandshakeData().getSingleUrlParam("roomId");
+        String roomId = client.getHandshakeData().getSingleUrlParam(ROOM_ID);
 
         log.info("onConnect client={}, ns={}, roomId={}", client.getSessionId(), client.getNamespace().getName(), roomId);
         //发送初始化房间事件
         client.sendEvent(EVENT_INIT_ROOM);
 
-        client.set("socket-room", roomId);
-
     }
 
     /**
@@ -75,13 +115,11 @@ public class WhiteboardHandler {
     public void onDisconnect(SocketIOClient client) {
 
         // 房间ID
-        String roomId = client.getHandshakeData().getSingleUrlParam("roomId");
+        String roomId = client.getHandshakeData().getSingleUrlParam(ROOM_ID);
 
         log.info("onDisconnect client={}, ns={}, roomId={}", client.getSessionId(), client.getNamespace().getName(), roomId);
         client.disconnect();
 
-        // 删除数据
-        client.del("socket-room");
         // 通知用户参与所有房间,用户变化信息
         if (StringUtils.isNotEmpty(roomId)) {
 
@@ -94,6 +132,8 @@ public class WhiteboardHandler {
                 roomOperations.sendEvent(EVENT_ROOM_USER_CHANGE, collect);
             }
 
+            // 消息分发
+            dispatchMessage(client, roomId, EVENT_ROOM_USER_CHANGE, Collections.singletonList(collect));
         }
     }
 
@@ -129,37 +169,45 @@ public class WhiteboardHandler {
             roomOperations.sendEvent(EVENT_ROOM_USER_CHANGE, collect);
         }
 
+        // 消息分发
+        dispatchMessage(client, roomId, EVENT_JOIN_ROOM, Collections.singletonList(roomId));
     }
 
     /**
      * 转发 server-broadcast =>client-broadcast
+     * @param client SocketIOClient
      * @param roomId 房间ID
      * @param encryptedData 接收透传数据
      * @param iv 接收透传数据
      */
     @OnEvent(value = EVENT_SERVER_BROADCAST)
-    public void serverBroadcast(String roomId, Object encryptedData, Object iv) {
-        log.info("serverBroadcast roomId={} data={}, iv={}", roomId, encryptedData, iv);
+    public void serverBroadcast(SocketIOClient client, String roomId, byte[] encryptedData, byte[] iv) {
+        //log.info("serverBroadcast roomId={}", roomId);
 
         BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
         // 发送房间广播消息
         roomOperations.sendEvent(EVENT_CLIENT_BROADCAST, encryptedData, iv);
 
+        // 消息分发
+        dispatchMessage(client, roomId, EVENT_CLIENT_BROADCAST, Arrays.asList(encryptedData, iv));
     }
 
     /**
      * 转发 server-volatile-broadcast =>client-broadcast
+     * @param client SocketIOClient
      * @param roomId 房间ID
      * @param encryptedData 接收透传数据
      * @param iv 接收透传数据
      */
     @OnEvent(value = EVENT_SERVER_VOLATILE_BROADCAST)
-    public void serverVolatileBroadcast(String roomId, Object encryptedData, Object iv) {
-        log.info("serverVolatileBroadcast roomId={} data={}, iv={}", roomId, encryptedData, iv);
+    public void serverVolatileBroadcast(SocketIOClient client, String roomId, byte[] encryptedData, byte[] iv) {
+        //log.info("serverVolatileBroadcast roomId={}", roomId);
 
         BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
         // 发送房间广播消息
         roomOperations.sendEvent(EVENT_CLIENT_BROADCAST, encryptedData, iv);
 
+        // 消息分发
+        dispatchMessage(client, roomId, EVENT_CLIENT_BROADCAST, Arrays.asList(encryptedData, iv));
     }
 }

+ 27 - 2
mec-websocket/src/main/java/com/ym/mec/web/support/socket/ServerRunner.java

@@ -1,26 +1,51 @@
 package com.ym.mec.web.support.socket;
 
 import com.corundumstudio.socketio.SocketIOServer;
+import com.corundumstudio.socketio.namespace.Namespace;
+import com.corundumstudio.socketio.namespace.NamespacesHub;
+import com.corundumstudio.socketio.store.pubsub.DispatchMessage;
+import com.corundumstudio.socketio.store.pubsub.PubSubStore;
+import com.corundumstudio.socketio.store.pubsub.PubSubType;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 
+import java.util.Objects;
+
 @Slf4j
 @Component
 public class ServerRunner implements CommandLineRunner {
 
     private final SocketIOServer server;
+    private final PubSubStore pubSubStore;
+    private final NamespacesHub namespacesHub;
 
     @Autowired
-    public ServerRunner(SocketIOServer server) {
+    public ServerRunner(SocketIOServer server, PubSubStore pubSubStore) {
         this.server = server;
+        this.pubSubStore = pubSubStore;
+        this.namespacesHub = new NamespacesHub(server.getConfiguration());
     }
 
     @Override
-    public void run(String... args) throws Exception {
+    public void run(String... args) {
+
+        // 启动服务
         server.start();
 
+        // 订阅消息
+        pubSubStore.subscribe(PubSubType.DISPATCH, message -> {
+
+            // 分布式服务空间请求分发
+            Namespace namespace = namespacesHub.get(message.getNamespace());
+            if (Objects.nonNull(namespace)) {
+                log.info("PubSubType.DISPATCH room={}, package={}", message.getRoom(), message.getPacket());
+                namespace.dispatch(message.getRoom(), message.getPacket());
+            }
+
+        }, DispatchMessage.class);
+
         log.info("--------SocketIO------- SERVER.START PORT={}", server.getConfiguration().getPort());
     }
 }