Java领域要说让我最服气的RPC框架当属Dubbo,原因有许多,但是最吸引我的还是它把远程调用这个事情设计得很有艺术。
1、Dubbo优点较多,我只钟情其一
1.1、优点
业内对于微服务之间调用的框架选择较多,主流是Spring Cloud的Rest方式 和 Dubbo方式,我使用Dubbo方式居多。Dubbo工业级可用,稳定又高效,深受各大公司研发同学的喜爱。
Dubbo的优点较多,比如:
- 高性能:Dubbo 使用的是基于 Netty 的自定义通信协议,提供了高效的二进制数据传输,使得远程服务调用性能大幅提升。
- 模块化设计:Dubbo 的架构非常模块化,主要由五大核心模块组成:远程调用模块(RPC)、集群模块、负载均衡模块、容错模块和注册中心模块。
- 每个部件都支持多协议:每个部件都支持多种协议,比如注册中心,支持ZK、Redis、Nacos等等。
- 负载均衡和容错:Dubbo 提供了多种容错机制,比如失败重试、失败转移等。还支持多种负载均衡,比如随机、轮询、一致性哈希等。
- 服务注册和发现:Dubbo引入了注册中心的概念,实现了服务的自动注册和发现。
- SPI 扩展机制:在背八股文场景下,Dubbo被提及最多的就是使用了类似Java的SPI机制,提高了扩展性,这一点仁者见仁智者见智吧。
1.2、钟情其一
但是,Dubbo最吸引人的,半支烟
觉得反而倒是它的RPC调用。Dubbo的定位是一个RPC框架,这是它的核心和立足之地,所以Dubbo将RPC的调用过程透明化,使得开发者可以专注于业务逻辑,而不用关注底层通信问题。
一个RPC框架只有聚焦于先做好它的RPC调用过程这个模块,才会有人关注,其余的优点都是在这之后,慢慢迭代而来。
作者将RPC调用的这个过程,抽象成一种协议消息的传输机制,再通过控制好线程的等待和唤醒,来实现远程方法调用。这一设计思路真是美妙,充分体验了作者的智慧。
2、RPC简易示例
学Dubbo,首先就是要学习作者这种设计理念和思路。基于此,来实现一个简易的远程方法调用,将Dubbo的RPC过程简易化。
2.1、示例步骤
简易的RPC过程步骤如下,大致分5步,依旧使用Netty作用Socket通讯工具。
- 使用2个Java进程来模拟2个系统之间的调用,A进程 和 B进程。
- A进程的某个方法,使用网络请求调用B进程的某个方法。
- 然后A进程的方法就处于等待状态。
- 等B进程的方法执行完之后,在利用网络通知到A进程。
- 然后A进程的方法被唤醒,继续往下执行。
2.2、示例代码
- B进程作为服务端,启动网络服务
public class BProcessServer { private final int port; public BProcessServer(int port) { this.port = port; } public void start() throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new BProcessServerHandler()); } }); ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("B启动了服务,端口号: " + port); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new BProcessServer(8088).start(); } }
- B进程接受网络请求参数,反序列化之后,执行对应的方法,再将执行结果返回:
public class BProcessServerHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { String reqData = msg.toString(CharsetUtil.UTF_8); System.out.println("B进程接受到了请求数据: " + reqData); executeMethod(ctx); } /** * 执行方法 * * @param ctx * @throws InterruptedException */ private void executeMethod(ChannelHandlerContext ctx) throws InterruptedException { // TODO 将请求消息按照某种规则解析成方法名、方法参数等,其实就是反序列化的过程。 System.out.println("对接受的数据做反序列化,然后开始执行 消息体里指定的方法..."); // 模拟方法执行 Thread.sleep(2000); System.out.println("执行完毕,返回结果..."); // 将结果 通知给 A 进程 ByteBuf dataByteBuf = ctx.alloc().buffer().writeBytes("Task completed".getBytes(CharsetUtil.UTF_8)); ctx.writeAndFlush(dataByteBuf); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
- A进程启动Netty客户端,建立与B进程的通信,然后发起远程调用,处于等待状态。
public class AProcessClient { private final String host; private final int port; private final Object lock = new Object(); // 监视器对象 public AProcessClient(String host, int port) { this.host = host; this.port = port; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new AProcessClientHandler(lock)); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); System.out.println("A进程与B进程建立了通信连接"); Channel channel = future.channel(); // 发起远程调用 callRemoteMethod(channel); channel.closeFuture().sync(); } finally { group.shutdownGracefully(); } } /** * 执行方法 * * @param channel * @throws InterruptedException */ private void callRemoteMethod(Channel channel) throws InterruptedException { //TODO 此处需要将调用的方法和参数,按照协议进行序列化。这次暂且省去此过程。 System.out.println("A进程将 请求的方法和参数 进行序列化,然后向B进程发起网络调用..."); ByteBuf dataByteBuf = channel.alloc().buffer().writeBytes("Start call method".getBytes(CharsetUtil.UTF_8)); channel.writeAndFlush(dataByteBuf); // 使用wait等待B进程通知 synchronized (lock) { System.out.println("A进程等待B进程的响应..."); lock.wait(); // 等待通知 } System.out.println("A进程收到了B进程的响应通知,继续往下..."); } public static void main(String[] args) throws InterruptedException { new AProcessClient("localhost", 8088).start(); } }
- A进程接受B进程的响应,同时被唤醒,然后以上
lock.wait()
以后的代码得以继续执行。
public class AProcessClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private final Object lock; public AProcessClientHandler(Object lock) { this.lock = lock; } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { String resData = msg.toString(CharsetUtil.UTF_8); System.out.println("A进程接受到了响应数据: " + resData); // B 进程任务完成,使用 notify 唤醒等待的线程 synchronized (lock) { lock.notify(); // 唤醒 A 进程 } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
3、总结
Dubbo的优秀设计思路有许多,我只钟情其一,那就是RPC的调用过程。以上是一个简易的RPC远程调用的示例,用于理解Dubbo的原理和源码,希望对你有帮助!
本篇完结!欢迎 关注、加V(yclxiao)交流、全网可搜(程序员半支烟)