SpringBoot+Vue+Websocket 实现服务器端向客户端主动发送消息

概述

本文通过一个实际的场景来介绍在前后端分离的项目中通过 WebSocket 来实现服务器端主动向客户端发送消息的应用。主要内容如下

  1. WebSocket 是什么
  2. 服务器端 向 客户端 主动发送消息的案例说明
  3. SpringBoot 后端中 Websocket 的配置和使用
  4. 后端 Websocket 实现原理
  5. Vue 前端 Websocket 的配置和使用

WebSocket 是什么

Websocket 是一种在单个 TCP 连接上进行全双工通信的协议。WebSocket 连接成功后,服务端与客户端可以双向通信。在需要消息推送的场景,Websocket 相对于轮询能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

http-vs-websocket

具体如下特点

  1. 与 HTTP 协议有着良好的兼容性。默认端口也是 80 和 443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
  2. 依赖于 TCP 协议
  3. 数据格式比较轻量,性能开销小,通信高效。
  4. 可以发送文本,也可以发送二进制数据。
  5. 没有同源限制,客户端可以与任意服务器通信。
  6. 协议标识符是 ws(如果加密,则为 wss),服务器网址就是 URL

服务器端 向 客户端 主动发送消息的案例说明

在客户端的列表数据中有个 status 字段,服务器端需要花费较长的时间进行处理,处理完成后才会更新对应数据的 status 字段值,通过 Websocket 的处理流程如下:

  1. 前端页面列表数据加载后,初始化一组 Websocket 客户端对象
  2. 服务器端 接收到 前端数据状态的查询请求
  3. 服务器端 每隔一段时间查询一下数据库,然后返回给客户端
  4. 客户端 根据返回的数据状态,再更新页面数据

后端 SpringBoot 中 Websocket 的配置和使用

Maven 依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置

通过注入 ServerEndpointExporter 类,用于在项目启动的时候自动将使用了 @ServerEndpoint 注解声明的 Websocket endpoint 注册到 WebSocketContainer 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.ckjava.config;

/**
* Function:
*
* @author chenkui 2022/4/6 17:55
*/

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
/**
* 注入 ServerEndpointExporter,
* 这个 bean 会自动注册使用了 @ServerEndpoint 注解声明的 Websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}

}

通过 @ServerEndpoint 注解标注实现类

  1. 通过在类上增加 @ServerEndpoint@Component 注解,用于标注 Websocket 的实现类
  2. 通过 ConcurrentHashMap 管理多个客户端的 Session
  3. 通过 ScheduledExecutorService 和 init 方法实现定时对客户端进行消息发送
  4. @OnOpen 标注的方法,用于接收客户端的 连接请求,其中的 @PathParam 用于接收 url 中的参数
  5. @OnClose 标注的方法,用于接收客户端的 关闭请求。
  6. @OnMessage标注的方法,用于接收客户端的 消息。
  7. @OnError标注的方法,用于错误处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package com.ckjava.websocket;

import com.ckjava.xutils.JsonUtils;
import com.ckjava.entity.TSysPubEntity;
import com.ckjava.service.TSysPubService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* Function:
*
* @author chenkui 2022/4/6 18:00
*/
@Slf4j
@Component
@ServerEndpoint("/websocket/{pubId}")
public class PubStatusWS {
/**
* 每个数据对应一个 session
*/
private static final ConcurrentHashMap<Integer, Session> sessionPool = new ConcurrentHashMap<>();
/**
* 定时任务的线程池
*/
private static final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

@Resource
private TSysPubService pubService;

/**
* 定时从数据库中获取数据的最新状态,然后返回给前端
*/
@PostConstruct
public void init() {
scheduledExecutorService.scheduleAtFixedRate(() -> {
sessionPool.forEach((pubId, session) -> {
final TSysPubEntity entity = pubService.findById(pubId);

// 返回最新的状态
session.getAsyncRemote().sendText(JsonUtils.toJSONString(entity));
log.debug(String.format("websocket消息 server send to pubId %s, text:%s", session.getId(), entity.getPubStatus()));
});
}, 10, 30, TimeUnit.SECONDS);
}

@OnOpen
public void onOpen(final Session session, final EndpointConfig endpointConfig,
@PathParam(value = "pubId") final Integer pubId) {
sessionPool.put(pubId, session);
log.info(String.format("【websocket消息】 pubId:%s 加入连接,当前总数为:%s", pubId, sessionPool.size()));
}

@OnClose
public void onClose(final Session session, final CloseReason closeReason) {
final AtomicReference<Integer> atomicReference = new AtomicReference<>();
sessionPool.forEach((pubId, s) -> {
if (Objects.equals(s.getId(), session.getId())) {
atomicReference.set(pubId);
}
});

sessionPool.remove(atomicReference.get());
log.info(String.format("【websocket消息】pubId:%s 连接断开,当前总数为:%s", atomicReference.get(), sessionPool.size()));
}

@OnMessage
public void onMessage(final Session session, final String message) {
log.error(String.format("【websocket消息】收到客户端 pubId:%s 消息:%s", session.getId(), message));
}

@OnError
public void onError(final Session session, final Throwable throwable) {
log.error(String.format("【websocket消息】pubId:%s 出现异常:%s", session.getId(), throwable));
}

/**
* 向客户端群发消息
* @param message 文本消息
*/
public void sendAllMessage(final String message) {
sessionPool.forEach((pubId, session) -> {
session.getAsyncRemote().sendText(message);
});
}

/**
* 向某个客户端发送消息
*
* @param pubId 客户端id
* @param message 文本消息
*/
public void sendOneMessage(final Integer pubId, final String message) {
final Session session = sessionPool.get(pubId);
if (session != null) {
session.getAsyncRemote().sendText(message);
}
}


}

后端 Websocket 实现原理

为什么增加一个 ServerEndpointExporter Bean,并通过在一个类上增加 @ServerEndpoint@Component 注解就可以实现服务器端 Websocket 功能,这里简单解析一下。

ServerEndpointExporter 的核心方法

  1. ServerEndpointExporter 实现了 spring 中的 SmartInitializingSingleton 接口,并重写了 afterSingletonsInstantiated 方法,具体如下
1
2
3
4
@Override
public void afterSingletonsInstantiated() {
registerEndpoints();
}
  1. 在 registerEndpoints 方法中可以发现,通过 ApplicationContext 中的 getBeanNamesForAnnotation 方法,从 spring 的 ioc 容器中获取含有 @ServerEndpoint 注解的类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Actually register the endpoints. Called by {@link #afterSingletonsInstantiated()}.
*/
protected void registerEndpoints() {
Set<Class<?>> endpointClasses = new LinkedHashSet<>();
if (this.annotatedEndpointClasses != null) {
endpointClasses.addAll(this.annotatedEndpointClasses);
}

ApplicationContext context = getApplicationContext();
if (context != null) {
String[] endpointBeanNames = context.getBeanNamesForAnnotation(ServerEndpoint.class);
for (String beanName : endpointBeanNames) {
endpointClasses.add(context.getType(beanName));
}
}

for (Class<?> endpointClass : endpointClasses) {
registerEndpoint(endpointClass);
}

if (context != null) {
Map<String, ServerEndpointConfig> endpointConfigMap = context.getBeansOfType(ServerEndpointConfig.class);
for (ServerEndpointConfig endpointConfig : endpointConfigMap.values()) {
registerEndpoint(endpointConfig);
}
}
}
  1. 在 registerEndpoint 方法中,通过 ServerContainer 的 addEndpoint 方法,最终将 endpoint 实现类注册到 ServerContainer 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void registerEndpoint(Class<?> endpointClass) {
ServerContainer serverContainer = getServerContainer();
Assert.state(serverContainer != null,
"No ServerContainer set. Most likely the server's own WebSocket ServletContainerInitializer " +
"has not run yet. Was the Spring ApplicationContext refreshed through a " +
"org.springframework.web.context.ContextLoaderListener, " +
"i.e. after the ServletContext has been fully initialized?");
try {
if (logger.isDebugEnabled()) {
logger.debug("Registering @ServerEndpoint class: " + endpointClass);
}
serverContainer.addEndpoint(endpointClass);
}
catch (DeploymentException ex) {
throw new IllegalStateException("Failed to register @ServerEndpoint class: " + endpointClass, ex);
}
}

ServerContainer

java 定义了一套 javax.servlet-api, 一个 HttpServlet 就是一个 HTTP 服务。java websocket 并非基于 servlet-api 简单扩展, 而是新定义了一套 javax.websocket-api。

一个 websocket 服务对应一个 Endpoint。与 ServletContext 对应, websocket-api 也定义了 WebSocketContainer, 而编程方式注册 websocket 的接口是继承自 WebSocketContainer 的 ServerContainer。

一个 websocket 可以接受并管理多个连接, 因此可被视作一个 server。主流 servlet 容器都支持 websocket, 如 tomcat, jetty 等。看 ServerContainer api 文档, 可从 ServletContext attribute 找到 ServerContainer。

Vue 前端 Websocket 的配置和使用

  1. 在 created 方法中调用了 getPageData 方法,用于接收到列表数据后,通过 initWs 方法 给 每个数据 id 初始化一个 WebSocket 客户端
  2. 通过 onerror 事件绑定 出现异常 时候的回调方法
  3. 通过 onopen 事件绑定 连接成功 的回调方法
  4. 通过 onmessage 事件绑定 接收到服务器端消息 的回调方法,这里收到消息后,再更新前端的数据
  5. 通过 onclose 事件绑定 关闭连接 的回调方法
  6. 在 unmounted 方法中调用了 onbeforeunload 方法,用于关闭所有的 WebSocket 连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
<script>
export default {
name: "SysPub",
components: {},
data() {
return {
loading: false,
websocketArr: [],
sortable: {
sortDirections: ["ascend", "descend"],
},
dataList: [],
total: 0,
};
},

methods: {
onSubmit() {
if (this.createTimeSection) {
this.queryInfo.search.createTimeSection = this.createTimeSection;
} else {
this.queryInfo.search.createTimeSection = [];
}
if (this.queryInfo.search.name === "") {
this.queryInfo.search.name = undefined;
}
this.getPageData();
},

async getPageData() {
this.loading = true;
sysPubPageData(this.queryInfo)
.then(
(resp) => {
this.dataList = resp.list;
this.total = resp.total;

// 将数据加入到 websocket 中
this.websocketArr = [];
_.forEach(this.dataList, (d) => {
this.initWs(d.id);
});
},
(error) => {
this.$message.error("加载列表数据出现异常!" + error);
}
)
.finally(() => {
this.loading = false;
});
},

handleSizeChange(newSize) {
console.log(newSize);
this.queryInfo.pageSize = newSize;
this.getPageData();
},

handleCurrentChange(newPage) {
this.queryInfo.pageNum = newPage;
this.getPageData();
},

setErrorMessage(websocket, event) {
console.log("WebSocket连接发生错误 状态码:" + websocket.readyState);
},
setOnopenMessage(websocket, event) {
console.log("WebSocket连接成功 状态码:" + websocket.readyState);
},
setOnmessageMessage(websocket, event) {
// 根据服务器推送的消息做自己的业务处理
console.log("服务端返回:" + event.data);
if (event.data) {
let dataObj = JSON.parse(event.data);
_.forEach(this.dataList, (d) => {
if (d.id === dataObj.id) {
d.pubStatus = dataObj.pubStatus;
}
});
}
},

setOncloseMessage(websocket, event) {
console.log("WebSocket连接关闭 状态码:" + websocket.readyState);
},

// 关闭所有的 websocket 实例
onbeforeunload() {
console.log("this.websocketArr", this.websocketArr);
_.forEach(this.websocketArr, (websocket) => {
websocket.close();
});
},

initWs(pubId) {
if ("WebSocket" in window) {
let websocket = new WebSocket(
`ws://localhost:8080/websocket/${pubId}`
);
// 连接错误
websocket.onerror = (event) => this.setErrorMessage(websocket, event);
// 连接成功
websocket.onopen = (event) => this.setOnopenMessage(websocket, event);
// 收到消息的回调
websocket.onmessage = (event) =>
this.setOnmessageMessage(websocket, event);
// 连接关闭的回调
websocket.onclose = (event) =>
this.setOncloseMessage(websocket, event);

// 将 websocket 实例存入数组中
this.websocketArr.push(websocket);
} else {
alert("当前浏览器 Not support websocket");
}
},
},
created() {
this.getPageData();
},
mounted() {},
unmounted() {
this.onbeforeunload();
},
};
</script>

打赏

  • 微信

  • 支付宝