WhiteboardHandler.java 10 KB


  1. package com.ym.mec.web.handler;
  2. import com.corundumstudio.socketio.BroadcastOperations;
  3. import com.corundumstudio.socketio.SocketIOClient;
  4. import com.corundumstudio.socketio.SocketIONamespace;
  5. import com.corundumstudio.socketio.annotation.OnConnect;
  6. import com.corundumstudio.socketio.annotation.OnDisconnect;
  7. import com.corundumstudio.socketio.annotation.OnEvent;
  8. import com.corundumstudio.socketio.protocol.Packet;
  9. import com.corundumstudio.socketio.protocol.PacketType;
  10. import com.corundumstudio.socketio.store.pubsub.PubSubStore;
  11. import com.google.common.collect.Lists;
  12. import com.ym.mec.biz.service.WsConnectService;
  13. import com.ym.mec.web.support.anno.NamespaceReference;
  14. import com.ym.mec.web.support.anno.OnNamespace;
  15. import lombok.Data;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.apache.commons.lang3.StringUtils;
  18. import org.joda.time.DateTime;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.stereotype.Component;
  21. import org.springframework.util.CollectionUtils;
  22. import java.util.Arrays;
  23. import java.util.Collection;
  24. import java.util.Collections;
  25. import java.util.List;
  26. import java.util.Optional;
  27. import java.util.stream.Collectors;
  28. /**
  29. * var socket = io.connect('http://ip:prot/namespeace')
  30. */
  31. @Slf4j
  32. @Data
  33. @Component
  34. @OnNamespace("/whiteboard")
  35. public class WhiteboardHandler {
  36. // 初始化房间
  37. private static final String EVENT_INIT_ROOM = "init-room";
  38. // 心跳配置
  39. private static final String EVENT_HEARTBEAT_CONFIG = "heartbeat-config";
  40. // 心跳事件
  41. private static final String EVENT_HEARTBEAT_SYNC = "heartbeat-sync";
  42. // 第一次进入房间
  43. private static final String EVENT_FIRST_IN_ROOM = "first-in-room";
  44. // 加入房间
  45. private static final String EVENT_JOIN_ROOM = "join-room";
  46. // 新用户加入
  47. private static final String EVENT_NEW_USER = "new-user";
  48. // 房间用户改变(加入或退出)
  49. private static final String EVENT_ROOM_USER_CHANGE = "room-user-change";
  50. // 服务端广播事件
  51. private static final String EVENT_SERVER_BROADCAST = "server-broadcast";
  52. // 服务端异常广播事件
  53. private static final String EVENT_SERVER_VOLATILE_BROADCAST = "server-volatile-broadcast";
  54. // 客户端广播
  55. private static final String EVENT_CLIENT_BROADCAST = "client-broadcast";
  56. // 房间编号
  57. private static final String ROOM_ID = "roomId";
  58. private static final String USER_ID = "userId";
  59. private static final String CLIENT_TYPE = "clientType";
  60. @NamespaceReference
  61. private SocketIONamespace namespace;
  62. @Autowired
  63. private PubSubStore pubSubStore;
  64. @Autowired
  65. private WsConnectService wsConnectService;
  66. /**
  67. * 分布式服务器,消息分发
  68. *
  69. * @param client SocketIOClient
  70. * @param roomId 房间ID
  71. * @param eventName 事件名称
  72. * @param data 发送数据
  73. */
  74. public void dispatchMessage(SocketIOClient client, String roomId, String eventName, List<Object> data) {
  75. // 分发消息(当前服务不会向client推送自己分发出去的消息)
  76. try {
  77. // 命名空间
  78. String namespace = client.getNamespace().getName();
  79. // 发送数据包
  80. Packet packet = new Packet(PacketType.MESSAGE);
  81. packet.setSubType(PacketType.EVENT);
  82. packet.setName(eventName);
  83. packet.setNsp(namespace);
  84. packet.setData(data);
  85. //pubSubStore.publish(PubSubType.DISPATCH, new DispatchMessage(roomId, packet, namespace));
  86. } catch (Exception e) {
  87. log.error("PubSubType.DISPATCH roomId={}, sid={}", roomId, client.getSessionId(), e);
  88. }
  89. }
  90. /**
  91. * 发送初始化房间事件
  92. * @param client SocketIOClient
  93. */
  94. @OnConnect
  95. public void onConnect(SocketIOClient client) {
  96. // ws连接请求,需要包含roomId, userId, clientType参数
  97. // 房间ID
  98. String roomId = client.getHandshakeData().getSingleUrlParam(ROOM_ID);
  99. // 用户ID
  100. String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
  101. // 客户端类型
  102. String clientType = client.getHandshakeData().getSingleUrlParam(CLIENT_TYPE);
  103. log.debug("onConnect client={}, ns={}, roomId={}, userId={}", client.getSessionId(),
  104. client.getNamespace().getName(), roomId, userId);
  105. //发送初始化房间事件
  106. client.sendEvent(EVENT_INIT_ROOM);
  107. // 发送心跳同步事件
  108. WhiteboardMessage.HeartbeatConfig config = WhiteboardMessage.HeartbeatConfig
  109. .builder()
  110. .event(EVENT_HEARTBEAT_SYNC)
  111. .timestamp(DateTime.now().getMillis())
  112. .interval(15)
  113. .build();
  114. client.sendEvent(EVENT_HEARTBEAT_CONFIG, config);
  115. if (StringUtils.isNoneBlank(roomId, userId, clientType)) {
  116. wsConnectService.saveWsConnectRecordInfo(client.getSessionId().toString(), roomId,
  117. userId, clientType);
  118. }
  119. }
  120. /**
  121. * 添加@OnDisconnect事件,客户端断开连接时调用
  122. * @param client SocketIOClient
  123. */
  124. @OnDisconnect
  125. public void onDisconnect(SocketIOClient client) {
  126. // 房间ID
  127. String roomId = client.getHandshakeData().getSingleUrlParam(ROOM_ID);
  128. // 用户ID
  129. String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
  130. // 客户端类型
  131. String clientType = client.getHandshakeData().getSingleUrlParam(CLIENT_TYPE);
  132. log.debug("onDisconnect client={}, ns={}, roomId={}, userId={}", client.getSessionId(),
  133. client.getNamespace().getName(), roomId, userId);
  134. client.disconnect();
  135. // 通知用户参与所有房间,用户变化信息
  136. if (StringUtils.isNotEmpty(roomId)) {
  137. BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
  138. List<String> collect = roomOperations.getClients().stream()
  139. .map(x -> x.getSessionId().toString()).distinct().collect(Collectors.toList());
  140. if (!CollectionUtils.isEmpty(collect)) {
  141. roomOperations.sendEvent(EVENT_ROOM_USER_CHANGE, collect);
  142. }
  143. // 消息分发
  144. dispatchMessage(client, roomId, EVENT_ROOM_USER_CHANGE, Collections.singletonList(collect));
  145. }
  146. if (StringUtils.isNoneBlank(roomId, userId, clientType)) {
  147. wsConnectService.updateWsDisconnectRecordInfo(client.getSessionId().toString(), roomId, userId, clientType);
  148. }
  149. }
  150. /**
  151. * 画板心跳同步监听
  152. * @param client SocketIOClient
  153. * @param timestamp 时间戳
  154. */
  155. @OnEvent(value = EVENT_HEARTBEAT_SYNC)
  156. public void onHeartbeatSync(SocketIOClient client, Long timestamp) {
  157. // 房间ID
  158. String roomId = client.getHandshakeData().getSingleUrlParam(ROOM_ID);
  159. // 用户ID
  160. String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
  161. // 客户端类型
  162. String clientType = client.getHandshakeData().getSingleUrlParam(CLIENT_TYPE);
  163. /*log.debug("onHeartbeatSync client={}, ns={}, roomId={}, userId={}, timestamp={}", client.getSessionId(),
  164. client.getNamespace().getName(), roomId, userId, timestamp);*/
  165. if (StringUtils.isNoneBlank(roomId, userId, clientType)) {
  166. wsConnectService.updateWsDisconnectRecordInfo(client.getSessionId().toString(), roomId, userId, clientType);
  167. }
  168. }
  169. /**
  170. * 加入房间事件
  171. * @param client SocketIOClient
  172. * @param roomId 房间ID
  173. */
  174. @OnEvent(value = EVENT_JOIN_ROOM)
  175. public void joinRoom(SocketIOClient client, String roomId) {
  176. log.debug("joinRoom roomId={}", roomId);
  177. // 加入房间
  178. client.joinRoom(roomId);
  179. BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
  180. Collection<SocketIOClient> clients = roomOperations.getClients();
  181. log.debug("joinRoom clients={}", clients.size());
  182. if (clients.size() > 1) {
  183. roomOperations.sendEvent(EVENT_NEW_USER, client.getSessionId().toString());
  184. } else {
  185. //发送
  186. client.sendEvent(EVENT_FIRST_IN_ROOM);
  187. }
  188. List<String> collect = Optional.of(clients).orElse(Lists.newArrayList()).stream()
  189. .map(x -> x.getSessionId().toString()).distinct().collect(Collectors.toList());
  190. //发送
  191. if (!CollectionUtils.isEmpty(collect)) {
  192. roomOperations.sendEvent(EVENT_ROOM_USER_CHANGE, collect);
  193. }
  194. // 消息分发
  195. dispatchMessage(client, roomId, EVENT_JOIN_ROOM, Collections.singletonList(roomId));
  196. }
  197. /**
  198. * 转发 server-broadcast =>client-broadcast
  199. * @param client SocketIOClient
  200. * @param roomId 房间ID
  201. * @param encryptedData 接收透传数据
  202. * @param iv 接收透传数据
  203. */
  204. @OnEvent(value = EVENT_SERVER_BROADCAST)
  205. public void serverBroadcast(SocketIOClient client, String roomId, byte[] encryptedData, byte[] iv) {
  206. //log.info("serverBroadcast roomId={}", roomId);
  207. BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
  208. // 发送房间广播消息
  209. roomOperations.sendEvent(EVENT_CLIENT_BROADCAST, encryptedData, iv);
  210. // 消息分发
  211. dispatchMessage(client, roomId, EVENT_CLIENT_BROADCAST, Arrays.asList(encryptedData, iv));
  212. }
  213. /**
  214. * 转发 server-volatile-broadcast =>client-broadcast
  215. * @param client SocketIOClient
  216. * @param roomId 房间ID
  217. * @param encryptedData 接收透传数据
  218. * @param iv 接收透传数据
  219. */
  220. @OnEvent(value = EVENT_SERVER_VOLATILE_BROADCAST)
  221. public void serverVolatileBroadcast(SocketIOClient client, String roomId, byte[] encryptedData, byte[] iv) {
  222. //log.info("serverVolatileBroadcast roomId={}", roomId);
  223. BroadcastOperations roomOperations = namespace.getRoomOperations(roomId);
  224. // 发送房间广播消息
  225. roomOperations.sendEvent(EVENT_CLIENT_BROADCAST, encryptedData, iv);
  226. // 消息分发
  227. dispatchMessage(client, roomId, EVENT_CLIENT_BROADCAST, Arrays.asList(encryptedData, iv));
  228. }
  229. }