Java领域要说让我最服气的RPC框架当属Dubbo,原因有许多,但是最吸引我的还是它把远程调用这个事情设计得很有艺术。
业内对于微服务之间调用的框架选择较多,主流是Spring Cloud的Rest方式 和 Dubbo方式,我使用Dubbo方式居多。Dubbo工业级可用,稳定又高效,深受各大公司研发同学的喜爱。
Dubbo的优点较多,比如:
但是,Dubbo最吸引人的,半支烟
觉得反而倒是它的RPC调用。Dubbo的定位是一个RPC框架,这是它的核心和立足之地,所以Dubbo将RPC的调用过程透明化,使得开发者可以专注于业务逻辑,而不用关注底层通信问题。
一个RPC框架只有聚焦于先做好它的RPC调用过程这个模块,才会有人关注,其余的优点都是在这之后,慢慢迭代而来。
作者将RPC调用的这个过程,抽象成一种协议消息的传输机制,再通过控制好线程的等待和唤醒,来实现远程方法调用。这一设计思路真是美妙,充分体验了作者的智慧。
学Dubbo,首先就是要学习作者这种设计理念和思路。基于此,来实现一个简易的远程方法调用,将Dubbo的RPC过程简易化。
简易的RPC过程步骤如下,大致分5步,依旧使用Netty作用Socket通讯工具。
public class BProcessServer {
private final int port;
public BProcessServer(int port) {
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new BProcessServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("B启动了服务,端口号: " + port);
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new BProcessServer(8088).start();
}
}
public class BProcessServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
String reqData = msg.toString(CharsetUtil.UTF_8);
System.out.println("B进程接受到了请求数据: " + reqData);
executeMethod(ctx);
}
/**
* 执行方法
*
* @param ctx
* @throws InterruptedException
*/
private void executeMethod(ChannelHandlerContext ctx) throws InterruptedException {
// TODO 将请求消息按照某种规则解析成方法名、方法参数等,其实就是反序列化的过程。
System.out.println("对接受的数据做反序列化,然后开始执行 消息体里指定的方法...");
// 模拟方法执行
Thread.sleep(2000);
System.out.println("执行完毕,返回结果...");
// 将结果 通知给 A 进程
ByteBuf dataByteBuf = ctx.alloc().buffer().writeBytes("Task completed".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(dataByteBuf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
public class AProcessClient {
private final String host;
private final int port;
private final Object lock = new Object(); // 监视器对象
public AProcessClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new AProcessClientHandler(lock));
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
System.out.println("A进程与B进程建立了通信连接");
Channel channel = future.channel();
// 发起远程调用
callRemoteMethod(channel);
channel.closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
/**
* 执行方法
*
* @param channel
* @throws InterruptedException
*/
private void callRemoteMethod(Channel channel) throws InterruptedException {
//TODO 此处需要将调用的方法和参数,按照协议进行序列化。这次暂且省去此过程。
System.out.println("A进程将 请求的方法和参数 进行序列化,然后向B进程发起网络调用...");
ByteBuf dataByteBuf = channel.alloc().buffer().writeBytes("Start call method".getBytes(CharsetUtil.UTF_8));
channel.writeAndFlush(dataByteBuf);
// 使用wait等待B进程通知
synchronized (lock) {
System.out.println("A进程等待B进程的响应...");
lock.wait(); // 等待通知
}
System.out.println("A进程收到了B进程的响应通知,继续往下...");
}
public static void main(String[] args) throws InterruptedException {
new AProcessClient("localhost", 8088).start();
}
}
lock.wait()
以后的代码得以继续执行。public class AProcessClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final Object lock;
public AProcessClientHandler(Object lock) {
this.lock = lock;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
String resData = msg.toString(CharsetUtil.UTF_8);
System.out.println("A进程接受到了响应数据: " + resData);
// B 进程任务完成,使用 notify 唤醒等待的线程
synchronized (lock) {
lock.notify(); // 唤醒 A 进程
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Dubbo的优秀设计思路有许多,我只钟情其一,那就是RPC的调用过程。以上是一个简易的RPC远程调用的示例,用于理解Dubbo的原理和源码,希望对你有帮助!
本篇完结!欢迎 关注、加V(yclxiao)交流、全网可搜(程序员半支烟)