概述
web管理系统中可以对业务数据执行新增和删除,现在需要当业务数据发生新增或删除操作后,尽可能实时的反应到WPF客户端上面。
web管理系统用VUE编写,后端服务为SpringBoot,WPF客户端基于.Netframework4.8编写。
整体架构
设计
流程设计
- 用户在浏览器界面执行新增业务数据操作,调用后端新增接口
- WPF客户端在启动的时候初始化websocket客户端,并创建对server的监听
- 后端新增接口先将数据落库,而后调用websocket服务端产生消息,消息在产生后立马被发送到了正在监听中的websocket-client
- websocket-server和websocket-client是一对多的关系,如何保证业务数据被正确的分发?监听的时候给server端传递一个全局唯一的clientId,业务数据在产生的时候关联到一个BizId上面,只要保证clientId=BizId就可以了。
- 删除流程和新增类似
程序设计
WebSocketServer
概述
WebSocketServer端采用SpringBoot框架实现,通过在springboot-web项目中集成 org.springframework.boot:spring-boot-starter-websocket
实现websocket的能力。
新增pom
<!-- websocket --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
新增配置类
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration @EnableWebSocket public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
创建websocket端点
import com.alibaba.fastjson.JSON; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint("/ws/label/{clientId}") @Component public class LabelWebSocket { /** * session list */ private static ConcurrentHashMap<String, Session> sessionList = new ConcurrentHashMap<>(); /** * 当前 clientId */ private String currentClientId = ""; @OnOpen public void open(Session session, @PathParam("clientId") String clientId) throws IOException { if (sessionList.containsKey(clientId)) { sessionList.remove(clientId); } sessionList.put(clientId, session); currentClientId = clientId; this.sendMsg(session, "connectok"); } @OnClose public void close(Session session) throws IOException { sessionList.remove(currentClientId); System.out.println("连接关闭,session=" + JSON.toJSONString(session.getId())); } @OnMessage public void receiveMsg(Session session, String msg) throws IOException { this.sendMsg(session, "接收到的消息为:" + msg); // throw new RuntimeException("主动抛异常"); } @OnError public void error(Session session, Throwable e) throws IOException { System.out.println("连接异常,session=" + JSON.toJSONString(session.getId()) + ";currentClientId=" + currentClientId); this.sendMsg(session, "发生异常,e=" + e.getMessage()); e.printStackTrace(); } /** * @param clientId * @param msg */ public boolean sendMsg(String clientId, String msg) throws IOException { if (sessionList.containsKey(clientId)) { Session session = sessionList.get(clientId); this.sendMsg(session, msg); return true; } else { return false; } } private void sendMsg(Session session, String msg) throws IOException { session.getBasicRemote().sendText(msg); } }
WebSocketClient
概述
WebSocketClient端集成在WPF应用客户端中,通过前期调研,选中 WebSocketSharp 作为websocketclient工具,WebSocketSharp 是托管在Github的开源项目,MITLicense,目前4.9K的star。
安装WebSocketSharp
//nuget Install-Package WebSocketSharp -Pre
初始化client
WebSocket ws = new WebSocket("ws://127.0.0.1:8083/ws/xx/clientId");
创建连接
private void InitWebSocket() { ws.OnOpen += (sender, e) => { Console.WriteLine("onOpen"); }; //允许ping ws.EmitOnPing = true; //接收到xiaoxi ws.OnMessage += (sender, e) => { ReceiveMessage(sender, e); }; ws.Connect(); //发送消息 //ws.Send("BALUS") ; } private void ReceiveMessage(object sender, MessageEventArgs e) { if (e.IsText) { // Do something with e.Data.like jsonstring Console.WriteLine(e.Data); return; } if (e.IsBinary) { // Do something with e.RawData. like byte[] return; } if (e.IsPing) { // Do something to notify that a ping has been received. return; } }
跨线程更新UI
由于 WebSocketSharp 会创建线程来处理 ReceiveMessage ,而WPF中子线程是无法更新UI的,所以需要引入 Dispatcher 来实现跨线程更新UI。
获取当前线程名字
//当前线程 string name = Thread.CurrentThread.ManagedThreadId.ToString();
示例代码
private void ReceiveMessage(object sender, MessageEventArgs e) { if (e.IsText) { // Do something with e.Data.like jsonstring Console.WriteLine(e.Data); //当前线程 string name = Thread.CurrentThread.ManagedThreadId.ToString(); App.Current.Dispatcher.Invoke((Action)(() => { Image lab = new Image(); lab.Uid = "123456"; lab.Name = "labName"; lab.Width = 50; lab.Height = 50; string url = "http://xxx:xxx/img/louyu.png"; BitmapImage bitmapImage = HttpUtil.getImage(url); lab.Source = bitmapImage; lab.AddHandler(MouseLeftButtonDownEvent, new MouseButtonEventHandler(LabelClick)); Canvas.SetTop(lab, 800); Canvas.SetLeft(lab, 600); this.cav.Children.Add(lab); })); return; } }
接口设计
新增接口
概述
目前WebSocketServer和web后端服务是在同一个SpringBoot的工程中,所以只要将WebSocketServer托管到SpringContainer中,web后端服务可以通过 DI 的方式直接访问 WebSocketEndPoint。
如果考虑程序的低耦合,可以在WebSocketServer和web后端服务之间架设一个MQ。
核心代码
@Autowired private LabelWebSocket ws; @GetMapping("/create") public boolean createLabel() throws IOException { String cameraId = "cml"; //todo boolean result = ws.sendMsg(cameraId, "新增标签"); return result; }
风险
分布式风险
当前在 WebSocketServer 中,已经连接的client信息是记录在当前进程的cache中,如果服务做横向扩容,cache信息无法在多实例进程中传递,将导致无法正确的处理业务数据,并可能会发生意想不到的异常和bug,此问题在并发越高的情况下造成的影响越大
资源风险
web后端服务为基于java语言的springboot程序,这种类型程序的特点是内存消耗特别严重。WebSocketServer服务在本项目中仅用作消息中间件,连通web后端服务和WPF客户端。
首先WebSocketServer没有太多的计算能力的消耗,内存消耗会随着连接客户端数量的增长而增长,网络将是最大的开销,一方面需要转发来自web后端服务的业务数据,并和WPF客户端保持长连接;另一方面WebSocketServer和WPF客户端的交互可能会走公网,而其和web后端服务必然是在局域网环境。
综上,将web后端服务和WebSocketServer分开部署对于硬件资源成本和利用率来说是最好的选择。
高可用风险
未引入重试机制,当某一个环节失败之后,将导致异常情况发生。