|  | @@ -1,122 +1,139 @@
 | 
	
		
			
				|  |  |  package com.keao.edu.user.controller;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import com.alibaba.fastjson.JSON;
 | 
	
		
			
				|  |  | +import com.alibaba.fastjson.JSONObject;
 | 
	
		
			
				|  |  | +import org.apache.commons.lang3.StringUtils;
 | 
	
		
			
				|  |  |  import org.slf4j.Logger;
 | 
	
		
			
				|  |  |  import org.slf4j.LoggerFactory;
 | 
	
		
			
				|  |  |  import org.springframework.stereotype.Component;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -import javax.annotation.PostConstruct;
 | 
	
		
			
				|  |  |  import javax.websocket.*;
 | 
	
		
			
				|  |  | +import javax.websocket.server.PathParam;
 | 
	
		
			
				|  |  |  import javax.websocket.server.ServerEndpoint;
 | 
	
		
			
				|  |  |  import java.io.IOException;
 | 
	
		
			
				|  |  | -import java.util.concurrent.CopyOnWriteArraySet;
 | 
	
		
			
				|  |  | -import java.util.concurrent.atomic.AtomicInteger;
 | 
	
		
			
				|  |  | +import java.util.concurrent.ConcurrentHashMap;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -@ServerEndpoint(value = "/ws/asset")
 | 
	
		
			
				|  |  | +@ServerEndpoint(value = "/ws/{userId}")
 | 
	
		
			
				|  |  |  @Component
 | 
	
		
			
				|  |  |  public class WebSocketServer {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    @PostConstruct
 | 
	
		
			
				|  |  | -    public void init() {
 | 
	
		
			
				|  |  | -        System.out.println("websocket 加载");
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  |      private static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
 | 
	
		
			
				|  |  | -    private static final AtomicInteger OnlineCount = new AtomicInteger(0);
 | 
	
		
			
				|  |  | -    // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
 | 
	
		
			
				|  |  | -    private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<Session>();
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | +    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
 | 
	
		
			
				|  |  | +    private static int onlineCount = 0;
 | 
	
		
			
				|  |  | +    /**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/
 | 
	
		
			
				|  |  | +    private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
 | 
	
		
			
				|  |  | +    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
 | 
	
		
			
				|  |  | +    private Session session;
 | 
	
		
			
				|  |  | +    /**接收userId*/
 | 
	
		
			
				|  |  | +    private String userId="";
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  | -     * 连接建立成功调用的方法
 | 
	
		
			
				|  |  | -     */
 | 
	
		
			
				|  |  | +     * 连接建立成功调用的方法*/
 | 
	
		
			
				|  |  |      @OnOpen
 | 
	
		
			
				|  |  | -    public void onOpen(Session session) {
 | 
	
		
			
				|  |  | -        SessionSet.add(session);
 | 
	
		
			
				|  |  | -        int cnt = OnlineCount.incrementAndGet(); // 在线数加1
 | 
	
		
			
				|  |  | -        log.info("有连接加入,当前连接数为:{}", cnt);
 | 
	
		
			
				|  |  | -        SendMessage(session, "连接成功");
 | 
	
		
			
				|  |  | +    public void onOpen(Session session,@PathParam("userId") String userId) {
 | 
	
		
			
				|  |  | +        this.session = session;
 | 
	
		
			
				|  |  | +        this.userId=userId;
 | 
	
		
			
				|  |  | +        if(webSocketMap.containsKey(userId)){
 | 
	
		
			
				|  |  | +            webSocketMap.remove(userId);
 | 
	
		
			
				|  |  | +            webSocketMap.put(userId,this);
 | 
	
		
			
				|  |  | +            //加入set中
 | 
	
		
			
				|  |  | +        }else{
 | 
	
		
			
				|  |  | +            webSocketMap.put(userId,this);
 | 
	
		
			
				|  |  | +            //加入set中
 | 
	
		
			
				|  |  | +            addOnlineCount();
 | 
	
		
			
				|  |  | +            //在线数加1
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +            sendMessage("连接成功");
 | 
	
		
			
				|  |  | +        } catch (IOException e) {
 | 
	
		
			
				|  |  | +            log.error("用户:"+userId+",网络异常!!!!!!");
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  |       * 连接关闭调用的方法
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  |      @OnClose
 | 
	
		
			
				|  |  | -    public void onClose(Session session) {
 | 
	
		
			
				|  |  | -        SessionSet.remove(session);
 | 
	
		
			
				|  |  | -        int cnt = OnlineCount.decrementAndGet();
 | 
	
		
			
				|  |  | -        log.info("有连接关闭,当前连接数为:{}", cnt);
 | 
	
		
			
				|  |  | +    public void onClose() {
 | 
	
		
			
				|  |  | +        if(webSocketMap.containsKey(userId)){
 | 
	
		
			
				|  |  | +            webSocketMap.remove(userId);
 | 
	
		
			
				|  |  | +            //从set中删除
 | 
	
		
			
				|  |  | +            subOnlineCount();
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  |       * 收到客户端消息后调用的方法
 | 
	
		
			
				|  |  |       *
 | 
	
		
			
				|  |  | -     * @param message
 | 
	
		
			
				|  |  | -     *            客户端发送过来的消息
 | 
	
		
			
				|  |  | -     */
 | 
	
		
			
				|  |  | +     * @param message 客户端发送过来的消息*/
 | 
	
		
			
				|  |  |      @OnMessage
 | 
	
		
			
				|  |  |      public void onMessage(String message, Session session) {
 | 
	
		
			
				|  |  | -        log.info("来自客户端的消息:{}",message);
 | 
	
		
			
				|  |  | -        SendMessage(session, "收到消息,消息内容:"+message);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | +        log.info("用户消息:"+userId+",报文:"+message);
 | 
	
		
			
				|  |  | +        //可以群发消息
 | 
	
		
			
				|  |  | +        //消息保存到数据库、redis
 | 
	
		
			
				|  |  | +        if(StringUtils.isNotBlank(message)){
 | 
	
		
			
				|  |  | +            try {
 | 
	
		
			
				|  |  | +                //解析发送的报文
 | 
	
		
			
				|  |  | +                JSONObject jsonObject = JSON.parseObject(message);
 | 
	
		
			
				|  |  | +                //追加发送人(防止串改)
 | 
	
		
			
				|  |  | +                jsonObject.put("fromUserId",this.userId);
 | 
	
		
			
				|  |  | +                String toUserId=jsonObject.getString("toUserId");
 | 
	
		
			
				|  |  | +                //传送给对应toUserId用户的websocket
 | 
	
		
			
				|  |  | +                if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
 | 
	
		
			
				|  |  | +                    webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
 | 
	
		
			
				|  |  | +                }else{
 | 
	
		
			
				|  |  | +                    log.error("请求的userId:"+toUserId+"不在该服务器上");
 | 
	
		
			
				|  |  | +                    //否则不在这个服务器上,发送到mysql或者redis
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }catch (Exception e){
 | 
	
		
			
				|  |  | +                e.printStackTrace();
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  | -     * 出现错误
 | 
	
		
			
				|  |  | +     *
 | 
	
		
			
				|  |  |       * @param session
 | 
	
		
			
				|  |  |       * @param error
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  |      @OnError
 | 
	
		
			
				|  |  |      public void onError(Session session, Throwable error) {
 | 
	
		
			
				|  |  | -        log.error("发生错误:{},Session ID: {}",error.getMessage(),session.getId());
 | 
	
		
			
				|  |  | +        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
 | 
	
		
			
				|  |  |          error.printStackTrace();
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  | -     * 发送消息,实践表明,每次浏览器刷新,session会发生变化。
 | 
	
		
			
				|  |  | -     * @param session
 | 
	
		
			
				|  |  | -     * @param message
 | 
	
		
			
				|  |  | +     * 实现服务器主动推送
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  | -    public static void SendMessage(Session session, String message) {
 | 
	
		
			
				|  |  | -        try {
 | 
	
		
			
				|  |  | -            session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)",message,session.getId()));
 | 
	
		
			
				|  |  | -        } catch (IOException e) {
 | 
	
		
			
				|  |  | -            log.error("发送消息出错:{}", e.getMessage());
 | 
	
		
			
				|  |  | -            e.printStackTrace();
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | +    public void sendMessage(String message) throws IOException {
 | 
	
		
			
				|  |  | +        this.session.getBasicRemote().sendText(message);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  | -     * 群发消息
 | 
	
		
			
				|  |  | -     * @param message
 | 
	
		
			
				|  |  | -     * @throws IOException
 | 
	
		
			
				|  |  | -     */
 | 
	
		
			
				|  |  | -    public static void BroadCastInfo(String message) throws IOException {
 | 
	
		
			
				|  |  | -        for (Session session : SessionSet) {
 | 
	
		
			
				|  |  | -            if(session.isOpen()){
 | 
	
		
			
				|  |  | -                SendMessage(session, message);
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | +     * 发送自定义消息
 | 
	
		
			
				|  |  | +     * */
 | 
	
		
			
				|  |  | +    public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
 | 
	
		
			
				|  |  | +        log.info("发送消息到:"+userId+",报文:"+message);
 | 
	
		
			
				|  |  | +        if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
 | 
	
		
			
				|  |  | +            webSocketMap.get(userId).sendMessage(message);
 | 
	
		
			
				|  |  | +        }else{
 | 
	
		
			
				|  |  | +            log.error("用户"+userId+",不在线!");
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    /**
 | 
	
		
			
				|  |  | -     * 指定Session发送消息
 | 
	
		
			
				|  |  | -     * @param sessionId
 | 
	
		
			
				|  |  | -     * @param message
 | 
	
		
			
				|  |  | -     * @throws IOException
 | 
	
		
			
				|  |  | -     */
 | 
	
		
			
				|  |  | -    public static void SendMessage(String message,String sessionId) throws IOException {
 | 
	
		
			
				|  |  | -        Session session = null;
 | 
	
		
			
				|  |  | -        for (Session s : SessionSet) {
 | 
	
		
			
				|  |  | -            if(s.getId().equals(sessionId)){
 | 
	
		
			
				|  |  | -                session = s;
 | 
	
		
			
				|  |  | -                break;
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        if(session!=null){
 | 
	
		
			
				|  |  | -            SendMessage(session, message);
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        else{
 | 
	
		
			
				|  |  | -            log.warn("没有找到你指定ID的会话:{}",sessionId);
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | +    public static synchronized int getOnlineCount() {
 | 
	
		
			
				|  |  | +        return onlineCount;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    public static synchronized void addOnlineCount() {
 | 
	
		
			
				|  |  | +        WebSocketServer.onlineCount++;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    public static synchronized void subOnlineCount() {
 | 
	
		
			
				|  |  | +        WebSocketServer.onlineCount--;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 |