最近在项目中在做一个消息推送的功能,比如客户下单之后通知给给对应的客户发送系统通知,这种消息推送需要使用到全双工的websocket
推送消息。
所谓的全双工表示客户端和服务端都能向对方发送消息。不使用同样是全双工的
http
是因为http
只能由客户端主动发起请求,服务接收后返回消息。websocket
建立起连接之后,客户端和服务端都能主动向对方发送消息。
上一篇文章Spring Boot 整合单机websocket介绍了websocket
在单机模式下进行消息的发送和接收:
用户A
和用户B
和web
服务器建立连接之后,用户A
发送一条消息到服务器,服务器再推送给用户B
,在单机系统上所有的用户都和同一个服务器建立连接,所有的session
都存储在同一个服务器中。
单个服务器是无法支撑几万人同时连接同一个服务器,需要使用到分布式或者集群将请求连接负载均衡到到不同的服务下。消息的发送方和接收方在同一个服务器,这就和单体服务器类似,能成功接收到消息:
但负载均衡使用轮询的算法,无法保证消息发送方和接收方处于同一个服务器,当发送方和接收方不是在同一个服务器时,接收方是无法接受到消息的:
websocket集群问题解决思路
客户端和服务端每次建立连接时候,会创建有状态的会话session
,服务器的保存维持连接的session
。客户端每次只能和集群服务器其中的一个服务器连接,后续也是和该服务器进行数据传输。
要解决集群的问题,应该考虑session共享
的问题,客户端成功连接服务器之后,其他服务器也知道客户端连接成功。
方案一:session 共享(不可行)
和websocket
类似的http
是如何解决集群问题的?解决方案之一就是共享session
,客户端登录服务端之后,将session
信息存储在Redis
数据库中,连接其他服务器时,从Redis
获取session
,实际就是将session
信息存储在Redis
中,实现redis的共享。
session
可以被共享的前提是可以被序列化,而websocket
的session
是无法被序列化的,http
的session
记录的是请求的数据,而websocket
的session
对应的是连接,连接到不同的服务器,session
也不同,无法被序列化。
方案二:ip hash(不可行)
http
不使用session
共享,就可以使用Nginx
负载均衡的ip hash
算法,客户端每次都是请求同一个服务器,客户端的session
都保存在服务器上,而后续请求都是请求该服务器,都能获取到session
,就不存在分布式session
问题了。
websocket
相对http
来说,可以由服务端主动推动消息给客户端,如果接收消息的服务端和发送消息消息的服务端不是同一个服务端,发送消息的服务端无法找到接收消息对应的session
,即两个session不处于同一个服务端,也就无法推送消息。如下图所示:
解决问题的方法是将所有消息的发送方和接收方都处于同一个服务器下,而消息发送方和接收方都是不确定的,显然是无法实现的。
方案三:广播模式
将消息的发送方和接收方都处于同一个服务器下才能发送消息,那么可以转换一下思路,可以将消息以消息广播的方式通知给所有的服务器,可以使用消息中间件发布订阅模式,消息脱离了服务器的限制,通过发送到中间件,再发送给订阅的服务器,类似广播一样,只要订阅了消息,都能接收到消息的通知:
发布者发布消息到消息中间件,消息中间件再将发送给所有订阅者:
广播模式的实现
搭建单机 websocket
参考以前写的websocket单机搭建 文章,先搭建单机websocket
实现消息的推送。
1. 添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-freemarker</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 创建 ServerEndpointExporter 的 bean 实例
ServerEndpointExporter 的 bean 实例自动注册 @ServerEndpoint 注解声明的 websocket endpoint,使用springboot自带tomcat启动需要该配置,使用独立 tomcat 则不需要该配置。
@Configuration public class WebSocketConfig { //tomcat启动无需该配置 @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
3. 创建服务端点 ServerEndpoint 和 客户端端
- 服务端点
@Component @ServerEndpoint(value = "/message") @Slf4j public class WebSocket { private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>(); private Session session; @OnOpen public void onOpen(Session session) throws SocketException { this.session = session; webSocketSet.put(this.session.getId(),this); log.info("【websocket】有新的连接,总数:{}",webSocketSet.size()); } @OnClose public void onClose(){ String id = this.session.getId(); if (id != null){ webSocketSet.remove(id); log.info("【websocket】连接断开:总数:{}",webSocketSet.size()); } } @OnMessage public void onMessage(String message){ if (!message.equals("ping")){ log.info("【wesocket】收到客户端发送的消息,message={}",message); sendMessage(message); } } /** * 发送消息 * @param message * @return */ public void sendMessage(String message){ for (WebSocket webSocket : webSocketSet.values()) { webSocket.session.getAsyncRemote().sendText(message); } log.info("【wesocket】发送消息,message={}", message); } }
- 客户端点
<div> <input type="text" name="message" id="message"> <button id="sendBtn">发送</button> </div> <div style="width:100px;height: 500px;" id="content"> </div> <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script> <script type="text/javascript"> var ws = new WebSocket("ws://127.0.0.1:8080/message"); ws.onopen = function(evt) { console.log("Connection open ..."); }; ws.onmessage = function(evt) { console.log( "Received Message: " + evt.data); var p = $("<p>"+evt.data+"</p>") $("#content").prepend(p); $("#message").val(""); }; ws.onclose = function(evt) { console.log("Connection closed."); }; $("#sendBtn").click(function(){ var aa = $("#message").val(); ws.send(aa); }) </script>
服务端和客户端中的OnOpen
、onclose
、onmessage
都是一一对应的。
- 服务启动后,客户端
ws.onopen
调用服务端的@OnOpen
注解的方法,储存客户端的session信息,握手建立连接。 - 客户端调用
ws.send
发送消息,对应服务端的@OnMessage
注解下面的方法接收消息。 - 服务端调用
session.getAsyncRemote().sendText
发送消息,对应的客户端ws.onmessage
接收消息。
添加 controller
@GetMapping({"","index.html"}) public ModelAndView index() { ModelAndView view = new ModelAndView("index"); return view; }
效果展示
打开两个客户端,其中的一个客户端发送消息,另一个客户端也能接收到消息。
添加 RabbitMQ 中间件
这里使用比较常用的RabbitMQ
作为消息中间件,而RabbitMQ
支持发布订阅模式:
添加消息订阅
交换机使用扇形交换机,消息分发给每一条绑定该交换机的队列。以服务器所在的IP + 端口作为唯一标识作为队列的命名,启动一个服务,使用队列绑定交换机,实现消息的订阅:
@Configuration public class RabbitConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE"); } @Bean public Queue psQueue() throws SocketException { // ip + 端口 为队列名 String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort(); return new Queue("ps_" + ip); } @Bean public Binding routingFirstBinding() throws SocketException { return BindingBuilder.bind(psQueue()).to(fanoutExchange()); } }
获取服务器IP和端口可以具体查看Github源码,这里就不做详细描述了。
修改服务端点 ServerEndpoint
在WebSocket
添加消息的接收方法,@RabbitListener
接收消息,队列名称使用常量命名,动态队列名称使用 #{name}
,其中的name
是Queue
的bean
名称:
@RabbitListener(queues= "#{psQueue.name}") public void pubsubQueueFirst(String message) { System.out.println(message); sendMessage(message); }
然后再调用sendMessage
方法发送给所在连接的客户端。
修改消息发送
在WebSocket
类的onMessage
方法将消息发送改成RabbitMQ
方式发送:
@OnMessage public void onMessage(String message){ if (!message.equals("ping")){ log.info("【wesocket】收到客户端发送的消息,message={}",message); //sendMessage(message); if (rabbitTemplate == null) { rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate"); } rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message); } }
消息通知流程如下所示:
启动两个实例,模拟集群环境
打开idea的Edit Configurations
:
点击左上角的COPY,然后添加端口server.port=8081
:
启动两个服务,端口分别是8080
和8081
。在启动8081
端口的服务,将前端连接端口改成8081
:
var ws = new WebSocket("ws://127.0.0.1:8081/message");