123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- package com.ym.mec.web.handler;
- import com.corundumstudio.socketio.BroadcastOperations;
- import com.corundumstudio.socketio.SocketIOClient;
- import com.corundumstudio.socketio.SocketIONamespace;
- import com.corundumstudio.socketio.annotation.OnConnect;
- import com.corundumstudio.socketio.annotation.OnDisconnect;
- import com.corundumstudio.socketio.annotation.OnEvent;
- import com.corundumstudio.socketio.protocol.Packet;
- import com.corundumstudio.socketio.protocol.PacketType;
- import com.corundumstudio.socketio.store.pubsub.PubSubStore;
- import com.google.common.collect.Lists;
- import com.ym.mec.biz.service.WsConnectService;
- import com.ym.mec.web.support.anno.NamespaceReference;
- import com.ym.mec.web.support.anno.OnNamespace;
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.joda.time.DateTime;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.springframework.util.CollectionUtils;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.List;
- import java.util.Optional;
- import java.util.stream.Collectors;
- /**
- * var socket = io.connect('http://ip:prot/namespeace')
- */
- @Slf4j
- @Data
- @Component
- @OnNamespace("/whiteboard")
- public class WhiteboardHandler {
- // 初始化房间
- private static final String EVENT_INIT_ROOM = "init-room";
- // 心跳配置
- private static final String EVENT_HEARTBEAT_CONFIG = "heartbeat-config";
- // 心跳事件
- private static final String EVENT_HEARTBEAT_SYNC = "heartbeat-sync";
- // 第一次进入房间
- private static final String EVENT_FIRST_IN_ROOM = "first-in-room";
- // 加入房间
- private static final String EVENT_JOIN_ROOM = "join-room";
- // 新用户加入
- private static final String EVENT_NEW_USER = "new-user";
- // 房间用户改变(加入或退出)
- private static final String EVENT_ROOM_USER_CHANGE = "room-user-change";
- // 服务端广播事件
- private static final String EVENT_SERVER_BROADCAST = "server-broadcast";
- // 服务端异常广播事件
- private static final String EVENT_SERVER_VOLATILE_BROADCAST = "server-volatile-broadcast";
- // 客户端广播
- private static final String EVENT_CLIENT_BROADCAST = "client-broadcast";
- // 房间编号
- private static final String ROOM_ID = "roomId";
- private static final String USER_ID = "userId";
- private static final String CLIENT_TYPE = "clientType";
- @NamespaceReference
- private SocketIONamespace namespace;
- @Autowired
- private PubSubStore pubSubStore;
- @Autowired
- private WsConnectService wsConnectService;
- /**
- * 分布式服务器,消息分发
- *
- * @param client SocketIOClient
- * @param roomId 房间ID
- * @param eventName 事件名称
- * @param data 发送数据
- */
- public void dispatchMessage(SocketIOClient client, String roomId, String eventName, List<Object> data) {
- // 分发消息(当前服务不会向client推送自己分发出去的消息)
- try {
- // 命名空间
- String namespace = client.getNamespace().getName();
- // 发送数据包
- Packet packet = new Packet(PacketType.MESSAGE);
- packet.setSubType(PacketType.EVENT);
- packet.setName(eventName);
- packet.setNsp(namespace);
- packet.setData(data);
- //pubSubStore.publish(PubSubType.DISPATCH, new DispatchMessage(roomId, packet, namespace));
- } catch (Exception e) {
- log.error("PubSubType.DISPATCH roomId={}, sid={}", roomId, client.getSessionId(), e);
- }
- }
- /**
- * 发送初始化房间事件
- * @param client SocketIOClient
- */
- @OnConnect
- public void onConnect(SocketIOClient client) {
- // ws连接请求,需要包含roomId, userId, clientType参数
- // 房间ID
- String roomId = client.getHandshakeData().getSingleUrlParam(ROOM_ID);
- // 用户ID
- String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
- // 客户端类型
- String clientType = client.getHandshakeData().getSingleUrlParam(CLIENT_TYPE);
- log.debug("onConnect client={}, ns={}, roomId={}, userId={}", client.getSessionId(),
- client.getNamespace().getName(), roomId, userId);
- //发送初始化房间事件
- client.sendEvent(EVENT_INIT_ROOM);
- // 发送心跳同步事件
- WhiteboardMessage.HeartbeatConfig config = WhiteboardMessage.HeartbeatConfig
- .builder()
- .event(EVENT_HEARTBEAT_SYNC)
- .timestamp(DateTime.now().getMillis())
- .interval(15)
- .build();
- client.sendEvent(EVENT_HEARTBEAT_CONFIG, config);
- if (StringUtils.isNoneBlank(roomId, userId, clientType)) {
- wsConnectService.saveWsConnectRecordInfo(client.getSessionId().toString(), roomId,
- userId, clientType);
- }
- }
- /**
- * 添加@OnDisconnect事件,客户端断开连接时调用
- * @param client SocketIOClient
- */
- @OnDisconnect
- public void onDisconnect(SocketIOClient client) {
- // 房间ID
- String roomId = client.getHandshakeData().getSingleUrlParam(ROOM_ID);
- // 用户ID
- String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
- // 客户端类型
- String clientType = client.getHandshakeData().getSingleUrlParam(CLIENT_TYPE);
- log.debug("onDisconnect client={}, ns={}, roomId={}, userId={}", client.getSessionId(),
- client.getNamespace().getName(), roomId, userId);
- client.disconnect();
- // 通知用户参与所有房间,用户变化信息
- if (StringUtils.isNotEmpty(roomId)) {
- BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
- List<String> collect = roomOperations.getClients().stream()
- .map(x -> x.getSessionId().toString()).distinct().collect(Collectors.toList());
- if (!CollectionUtils.isEmpty(collect)) {
- roomOperations.sendEvent(EVENT_ROOM_USER_CHANGE, collect);
- }
- // 消息分发
- dispatchMessage(client, roomId, EVENT_ROOM_USER_CHANGE, Collections.singletonList(collect));
- }
- if (StringUtils.isNoneBlank(roomId, userId, clientType)) {
- wsConnectService.updateWsDisconnectRecordInfo(client.getSessionId().toString(), roomId, userId, clientType);
- }
- }
- /**
- * 画板心跳同步监听
- * @param client SocketIOClient
- * @param timestamp 时间戳
- */
- @OnEvent(value = EVENT_HEARTBEAT_SYNC)
- public void onHeartbeatSync(SocketIOClient client, Long timestamp) {
- // 房间ID
- String roomId = client.getHandshakeData().getSingleUrlParam(ROOM_ID);
- // 用户ID
- String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
- // 客户端类型
- String clientType = client.getHandshakeData().getSingleUrlParam(CLIENT_TYPE);
- /*log.debug("onHeartbeatSync client={}, ns={}, roomId={}, userId={}, timestamp={}", client.getSessionId(),
- client.getNamespace().getName(), roomId, userId, timestamp);*/
- if (StringUtils.isNoneBlank(roomId, userId, clientType)) {
- wsConnectService.updateWsDisconnectRecordInfo(client.getSessionId().toString(), roomId, userId, clientType);
- }
- }
- /**
- * 加入房间事件
- * @param client SocketIOClient
- * @param roomId 房间ID
- */
- @OnEvent(value = EVENT_JOIN_ROOM)
- public void joinRoom(SocketIOClient client, String roomId) {
- log.debug("joinRoom roomId={}", roomId);
- // 加入房间
- client.joinRoom(roomId);
- BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
- Collection<SocketIOClient> clients = roomOperations.getClients();
- log.debug("joinRoom clients={}", clients.size());
- if (clients.size() > 1) {
- roomOperations.sendEvent(EVENT_NEW_USER, client.getSessionId().toString());
- } else {
- //发送
- client.sendEvent(EVENT_FIRST_IN_ROOM);
- }
- List<String> collect = Optional.of(clients).orElse(Lists.newArrayList()).stream()
- .map(x -> x.getSessionId().toString()).distinct().collect(Collectors.toList());
- //发送
- if (!CollectionUtils.isEmpty(collect)) {
- roomOperations.sendEvent(EVENT_ROOM_USER_CHANGE, collect);
- }
- // 消息分发
- dispatchMessage(client, roomId, EVENT_JOIN_ROOM, Collections.singletonList(roomId));
- }
- /**
- * 转发 server-broadcast =>client-broadcast
- * @param client SocketIOClient
- * @param roomId 房间ID
- * @param encryptedData 接收透传数据
- * @param iv 接收透传数据
- */
- @OnEvent(value = EVENT_SERVER_BROADCAST)
- public void serverBroadcast(SocketIOClient client, String roomId, byte[] encryptedData, byte[] iv) {
- //log.info("serverBroadcast roomId={}", roomId);
- BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
- // 发送房间广播消息
- roomOperations.sendEvent(EVENT_CLIENT_BROADCAST, encryptedData, iv);
- // 消息分发
- dispatchMessage(client, roomId, EVENT_CLIENT_BROADCAST, Arrays.asList(encryptedData, iv));
- }
- /**
- * 转发 server-volatile-broadcast =>client-broadcast
- * @param client SocketIOClient
- * @param roomId 房间ID
- * @param encryptedData 接收透传数据
- * @param iv 接收透传数据
- */
- @OnEvent(value = EVENT_SERVER_VOLATILE_BROADCAST)
- public void serverVolatileBroadcast(SocketIOClient client, String roomId, byte[] encryptedData, byte[] iv) {
- //log.info("serverVolatileBroadcast roomId={}", roomId);
- BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
- // 发送房间广播消息
- roomOperations.sendEvent(EVENT_CLIENT_BROADCAST, encryptedData, iv);
- // 消息分发
- dispatchMessage(client, roomId, EVENT_CLIENT_BROADCAST, Arrays.asList(encryptedData, iv));
- }
- }
|