新聞中心
如果大家對 RPC 有一些了解的話,或多或者都會聽到過一些大名鼎鼎的 RPC 框架,比如 Dubbo、gRPC。但是大部分人對于他們底層的實現(xiàn)原理其實不甚了解。

成都創(chuàng)新互聯(lián)專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務,包含不限于成都網(wǎng)站制作、做網(wǎng)站、外貿(mào)營銷網(wǎng)站建設、河源網(wǎng)絡推廣、小程序設計、河源網(wǎng)絡營銷、河源企業(yè)策劃、河源品牌公關、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運營等,從售前售中售后,我們都將竭誠為您服務,您的肯定,是我們最大的嘉獎;成都創(chuàng)新互聯(lián)為所有大學生創(chuàng)業(yè)者提供河源建站搭建服務,24小時服務熱線:18980820575,官方網(wǎng)址:www.cdcxhl.com
圖片來自 Pexels
有一種比較好的學習方式就是:如果你想要了解一個框架的原理,你可以嘗試去寫一個簡易版的框架出來,就比如如果你想理解 Spring IOC 的思想,最好的方式就是自己實現(xiàn)一個小型的 IOC 容器,自己慢慢體會。
所以本文嘗試帶領大家去設計一個小型的 RPC 框架,同時對于框架會保持一些拓展點。
通過閱讀本文,你可以收獲:
- 理解 RPC 框架最核心的理念。
- 學習在設計框架的時候,如何保持拓展性。
本文會依賴一些組件,他們是實現(xiàn) RPC 框架必要的一些知識,文中會盡量降低這些知識帶來的障礙。
但是,最好期望讀者有以下知識基礎:
- Zookeeper 基本入門
- Netty 基本入門
RPC 框架應該長什么樣子
我們首先來看一下:一個 RPC 框架是什么東西?我們最直觀的感覺就是:
集成了 RPC 框架之后,通過配置一個注冊中心的地址。一個應用(稱為服務提供者)將某個接口(interface)“暴露”出去,另外一個應用(稱為服務消費者)通過“引用”這個接口(interface),然后調(diào)用了一下,就很神奇的可以調(diào)用到另外一個應用的方法了
給我們的感覺就好像調(diào)用了一個本地方法一樣。即便兩個應用不是在同一個 JVM 中甚至兩個應用都不在同一臺機器中。
那他們是如何做到的呢?當我們的服務消費者調(diào)用某個 RPC 接口的方法之后,它的底層會通過動態(tài)代理,然后經(jīng)過網(wǎng)絡調(diào)用,去到服務提供者的機器上,然后執(zhí)行對應的方法。
接著方法的結(jié)果通過網(wǎng)絡傳輸返回到服務消費者那里,然后就可以拿到結(jié)果了。
整個過程如下圖:
那么這個時候,可能有人會問了:服務消費者怎么知道服務提供者在哪臺機器的哪個端口呢?
這個時候,就需要“注冊中心”登場了,具體來說是這樣子的:
- 服務提供者在啟動的時候,將自己應用所在機器的信息提交到注冊中心上面。
- 服務消費者在啟動的時候,將需要消費的接口所在機器的信息抓回來。
這樣一來,服務消費者就有了一份服務提供者所在的機器列表了。
“服務消費者”拿到了“服務提供者”的機器列表就可以通過網(wǎng)絡請求來發(fā)起請求了。
網(wǎng)絡客戶端,我們應該采用什么呢?有幾種選擇:
- 使用 JDK 原生 BIO(也就是 ServerSocket 那一套)。阻塞式 IO 方法,無法支撐高并發(fā)。
- 使用 JDK 原生 NIO(Selector、SelectionKey 那一套)。非阻塞式 IO,可以支持高并發(fā),但是自己實現(xiàn)復雜,需要處理各種網(wǎng)絡問題。
- 使用大名鼎鼎的 NIO 框架 Netty,天然支持高并發(fā),封裝好,API 易用。
“服務消費者”拿到了“服務提供者”的機器列表就可以通過網(wǎng)絡請求來發(fā)起請求了。
作為一個有追求的程序員,我們要求開發(fā)出來的框架要求支持高并發(fā)、又要求簡單、還要快。
當然是選擇 Netty 來實現(xiàn)了,使用 Netty 的一些很基本的 API 就能滿足我們的需求。
網(wǎng)絡協(xié)議定義
當然了,既然我們要使用網(wǎng)絡傳輸數(shù)據(jù)。我們首先要定義一套網(wǎng)絡協(xié)議出來。
你可能又要問了,啥叫網(wǎng)絡協(xié)議?網(wǎng)絡協(xié)議,通俗理解,意思就是說我們的客戶端發(fā)送的數(shù)據(jù)應該長什么樣子,服務端可以去解析出來知道要做什么事情。
話不多說,上代碼,假設我們現(xiàn)在服務提供者有兩個類:
- // com.study.rpc.test.producer.HelloService
- public interface HelloService {
- String sayHello(TestBean testBean);
- }
- // com.study.rpc.test.producer.TestBean
- public class TestBean {
- private String name;
- private Integer age;
- public TestBean(String name, Integer age) {
- this.name = name;
- this.age = age;
- }
- // getter setter
- }
現(xiàn)在我要調(diào)用 HelloService.sayHello(TestBean testBean) 這個方法。
作為“服務消費者”,應該怎么定義我們的請求,從而讓服務端知道我是要調(diào)用這個方法呢?
這需要我們將這個接口信息產(chǎn)生一個唯一的標識:這個標識會記錄了接口名、具體是那個方法、然后具體參數(shù)是什么!
然后將這些信息組織起來發(fā)送給服務端,我這里的方式是將信息保存為一個 JSON 格式的字符串來傳輸。
比如上面的接口我們傳輸?shù)臄?shù)據(jù)大概是這樣的:
- {
- "interfaces": "interface=com.study.rpc.test.producer.HelloService&method=sayHello&
- parameter=com.study.rpc.test.producer.TestBean",
- "requestId": "3",
- "parameter": {
- "com.study.rpc.test.producer.TestBean": {
- "age": 20,
- "name": "張三"
- }
- }
- }
嗯,我這里用一個 JSON 來標識這次調(diào)用是調(diào)用哪個接口的哪個方法,其中 interface 標識了唯一的類,parameter 標識了里面具體有哪些參數(shù), 其中 key 就是參數(shù)的類全限定名,value 就是這個類的 JSON 信息。
可能看到這里,大家可能有意見了:數(shù)據(jù)不一定用 JSON 格式傳輸啊,而且使用 JSON 也不一定性能最高啊。
你使用 JDK 的 Serializable 配合 Netty 的 ObjectDecoder 來實現(xiàn),這當然也可以,其實這里是一個拓展點,我們應該要提供多種序列化方式來供用戶選擇。
但是這里選擇了 JSON 的原因是因為它比較直觀,對于寫文章來說比較合理。
開發(fā)服務提供者
嗯,搞定了網(wǎng)絡協(xié)議之后,我們開始開發(fā)“服務提供者”了。對于服務提供者,因為我們這里是寫一個簡單版本的 RPC 框架,為了保持簡潔。
我們不會引入類似 Spring 之類的容器框架,所以我們需要定義一個服務提供者的配置類,它用于定義這個服務提供者是什么接口,然后它具體的實例對象是什么:
- public class ServiceConfig{
- public Class type;
- public T instance;
- public ServiceConfig(Classtype, T instance) {
- this.type = type;
- this.instance = instance;
- }
- public ClassgetType() {
- return type;
- }
- public void setType(Classtype) {
- this.type = type;
- }
- public T getInstance() {
- return instance;
- }
- public void setInstance(T instance) {
- this.instance = instance;
- }
- }
有了這個東西之后,我們就知道需要暴露哪些接口了。為了框架有一個統(tǒng)一的入口,我定義了一個類叫做 ApplicationContext,可以認為這是一個應用程序上下文,他的構(gòu)造函數(shù),接收 2 個參數(shù)。
代碼如下:
- public ApplicationContext(String registryUrl, ListserviceConfigs){
- // 1. 保存需要暴露的接口配置
- this.serviceConfigs = serviceConfigs == null ? new ArrayList<>() : serviceConfigs;
- // step 2: 實例化注冊中心
- initRegistry(registryUrl);
- // step 3: 將接口注冊到注冊中心,從注冊中心獲取接口,初始化服務接口列表
- RegistryInfo registryInfo = null;
- InetAddress addr = InetAddress.getLocalHost();
- String hostname = addr.getHostName();
- String hostAddress = addr.getHostAddress();
- registryInfo = new RegistryInfo(hostname, hostAddress, port);
- doRegistry(registryInfo);
- // step 4:初始化Netty服務器,接受到請求,直接打到服務提供者的service方法中
- if (!this.serviceConfigs.isEmpty()) {
- // 需要暴露接口才暴露
- nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods);
- nettyServer.init(port);
- }
- }
注冊中心設計
這里分為幾個步驟,首先保存了接口配置,接著初始化注冊中心,因為注冊中心可能會提供多種來供用戶選擇,所以這里需要定義一個注冊中心的接口:
- public interface Registry {
- /**
- * 將生產(chǎn)者接口注冊到注冊中心
- *
- * @param clazz 類
- * @param registryInfo 本機的注冊信息
- */
- void register(Class clazz, RegistryInfo registryInfo) throws Exception;
- }
這里我們提供一個注冊的方法,這個方法的語義是將 clazz 對應的接口注冊到注冊中心。
接收兩個參數(shù),一個是接口的 class 對象,另一個是注冊信息,里面包含了本機的一些基本信息,如下:
- public class RegistryInfo {
- private String hostname;
- private String ip;
- private Integer port;
- public RegistryInfo(String hostname, String ip, Integer port) {
- this.hostname = hostname;
- this.ip = ip;
- this.port = port;
- }
- // getter setter
- }
好了,定義好注冊中心,回到之前的實例化注冊中心的地方,代碼如下:
- /**
- * 注冊中心
- */
- private Registry registry;
- private void initRegistry(String registryUrl) {
- if (registryUrl.startsWith("zookeeper://")) {
- registryUrl = registryUrl.substring(12);
- registry = new ZookeeperRegistry(registryUrl);
- } else if (registryUrl.startsWith("multicast://")) {
- registry = new MulticastRegistry(registryUrl);
- }
- }
這里邏輯也非常簡單,就是根據(jù) url 的 schema 來判斷是那個注冊中心,注冊中心這里實現(xiàn)了 2 個實現(xiàn)類,分別使用 Zookeeper 作為注冊中心,另外一個是使用廣播的方式作為注冊中心。
廣播注冊中心這里僅僅是做個示范,內(nèi)部沒有實現(xiàn)。我們主要是實現(xiàn)了 Zookeeper 的注冊中心。
當然了,如果有興趣,可以實現(xiàn)更多的注冊中心供用戶選擇,比如 Redis 之類的,這里只是為了保持“拓展點”。
那么實例化完注冊中心之后,回到上面的代碼。
注冊服務提供者
- // step 3: 將接口注冊到注冊中心,從注冊中心獲取接口,初始化服務接口列表
- RegistryInfo registryInfo = null;
- InetAddress addr = InetAddress.getLocalHost();
- String hostname = addr.getHostName();
- String hostAddress = addr.getHostAddress();
- registryInfo = new RegistryInfo(hostname, hostAddress, port);
- doRegistry(registryInfo);
這里邏輯很簡單,就是獲取本機的的基本信息構(gòu)造成 RegistryInfo,然后調(diào)用了 doRegistry 方法:
- /**
- * 接口方法對應method對象
- */
- private MapinterfaceMethods = new ConcurrentHashMap<>();
- private void doRegistry(RegistryInfo registryInfo) throws Exception {
- for (ServiceConfig config : serviceConfigs) {
- Class type = config.getType();
- registry.register(type, registryInfo);
- Method[] declaredMethods = type.getDeclaredMethods();
- for (Method method : declaredMethods) {
- String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method);
- interfaceMethods.put(identify, method);
- }
- }
- }
這里做了兩件事情:
- 將接口注冊到注冊中心中。
- 對于每一個接口的每一個方法,生成一個唯一標識,保存在 interfaceMethods 集合中。
下面分別分析這兩件事情,首先是注冊方法:因為我們用到了 Zookeeper,為了方便,引入了 Zookeeper 的客戶端框架 Curator。
org.apache.curatorgroupId> curator-recipesartifactId> 2.3.0version> - dependency>
接著看代碼:
- public class ZookeeperRegistry implements Registry {
- private CuratorFramework client;
- public ZookeeperRegistry(String connectString) {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
- client.start();
- try {
- Stat myRPC = client.checkExists().forPath("/myRPC");
- if (myRPC == null) {
- client.create()
- .creatingParentsIfNeeded()
- .forPath("/myRPC");
- }
- System.out.println("Zookeeper Client初始化完畢......");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- @Override
- public void register(Class clazz, RegistryInfo registryInfo) throws Exception {
- // 1. 注冊的時候,先從zk中獲取數(shù)據(jù)
- // 2. 將自己的服務器地址加入注冊中心中
- // 為每一個接口的每一個方法注冊一個臨時節(jié)點,然后key為接口方法的唯一標識,data為服務地址列表
- Method[] declaredMethods = clazz.getDeclaredMethods();
- for (Method method : declaredMethods) {
- String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
- String path = "/myRPC/" + key;
- Stat stat = client.checkExists().forPath(path);
- ListregistryInfos;
- if (stat != null) {
- // 如果這個接口已經(jīng)有人注冊過了,把數(shù)據(jù)拿回來,然后將自己的信息保存進去
- byte[] bytes = client.getData().forPath(path);
- String data = new String(bytes, StandardCharsets.UTF_8);
- registryInfos = JSONArray.parseArray(data, RegistryInfo.class);
- if (registryInfos.contains(registryInfo)) {
- // 正常來說,zk的臨時節(jié)點,斷開連接后,直接就沒了,但是重啟會經(jīng)常發(fā)現(xiàn)存在節(jié)點,所以有了這樣的代碼
- System.out.println("地址列表已經(jīng)包含本機【" + key + "】,不注冊了");
- } else {
- registryInfos.add(registryInfo);
- client.setData().forPath(path, JSONArray.toJSONString(registryInfos).getBytes());
- System.out.println("注冊到注冊中心,路徑為:【" + path + "】 信息為:" + registryInfo);
- }
- } else {
- registryInfos = new ArrayList<>();
- registryInfos.add(registryInfo);
- client.create()
- .creatingParentsIfNeeded()
- // 臨時節(jié)點,斷開連接就關閉
- .withMode(CreateMode.EPHEMERAL)
- .forPath(path, JSONArray.toJSONString(registryInfos).getBytes());
- System.out.println("注冊到注冊中心,路徑為:【" + path + "】 信息為:" + registryInfo);
- }
- }
- }
- }
Zookeeper 注冊中心在初始化的時候,會建立好連接。然后注冊的時候,針對 clazz 接口的每一個方法,都會生成一個唯一標識。
這里使用了InvokeUtils.buildInterfaceMethodIdentify方法:
- public static String buildInterfaceMethodIdentify(Class clazz, Method method) {
- Map
map = new LinkedHashMap<>(); - map.put("interface", clazz.getName());
- map.put("method", method.getName());
- Parameter[] parameters = method.getParameters();
- if (parameters.length > 0) {
- StringBuilder param = new StringBuilder();
- for (int i = 0; i < parameters.length; i++) {
- Parameter p = parameters[i];
- param.append(p.getType().getName());
- if (i < parameters.length - 1) {
- param.append(",");
- }
- }
- map.put("parameter", param.toString());
- }
- return map2String(map);
- }
- public static String map2String(Map
map) { - StringBuilder sb = new StringBuilder();
- Iterator
String>> iterator = map.entrySet().iterator(); - while (iterator.hasNext()) {
- Map.Entry
entry = iterator.next(); - sb.append(entry.getKey() + "=" + entry.getValue());
- if (iterator.hasNext()) {
- sb.append("&");
- }
- }
- return sb.toString();
- }
其實就是對接口的方法使用他們的限定名和參數(shù)來組成一個唯一的標識,比如 HelloService#sayHello(TestBean) 生成的大概是這樣的:
- interface=com.study.rpc.test.producer.HelloService&method=sayHello&
- parameter=com.study.rpc.test.producer.TestBean
接下來的邏輯就簡單了,在 Zookeeper 中的 /myRPC 路徑下面建立臨時節(jié)點,節(jié)點名稱為我們上面的接口方法唯一標識,數(shù)據(jù)內(nèi)容為機器信息。
之所以采用臨時節(jié)點是因為:如果機器宕機了,連接斷開之后,消費者可以通過 Zookeeper 的 watcher 機制感知到。
大概看起來是這樣的:
- /myRPC/interface=com.study.rpc.test.producer.HelloService&method=sayHello&
- parameter=com.study.rpc.test.producer.TestBean
- [
- {
- "hostname":peer1,
- "port":8080
- },
- {
- "hostname":peer2,
- "port":8081
- }
- ]
通過這樣的方式,在服務消費的時候就可以拿到這樣的注冊信息,然后知道可以調(diào)用那臺機器的那個端口。
好了,注冊中心弄完了之后,我們回到前面說的注冊方法做的第二件事情,我們將每一個接口方法標識的方法放入了一個 map 中:
- /**
- * 接口方法對應method對象
- */
- private Map
interfaceMethods = new ConcurrentHashMap<>();
這個的原因是因為,我們在收到網(wǎng)絡請求的時候,需要調(diào)用反射的方式調(diào)用 Method 對象,所以存起來。
啟動網(wǎng)絡服務端接受請求
接下來我們就可以看第四步了:
- // step 4:初始化Netty服務器,接受到請求,直接打到服務提供者的service方法中
- if (!this.serviceConfigs.isEmpty()) {
- // 需要暴露接口才暴露
- nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods);
- nettyServer.init(port);
- }
因為這里使用 Netty 來做的所以需要引入 Netty 的依賴:
io.nettygroupId> netty-allartifactId> 4.1.30.Finalversion> - dependency>
接著來分析:
- public class NettyServer {
- /**
- * 負責調(diào)用方法的handler
- */
- private RpcInvokeHandler rpcInvokeHandler;
- public NettyServer(ListserverConfigs, MapinterfaceMethods)throws InterruptedException {
- this.rpcInvokeHandler = new RpcInvokeHandler(serverConfigs, interfaceMethods);
- }
- public int init(int port) throws Exception {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .childHandler(new ChannelInitializer(){
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ByteBuf delimiter = Unpooled.copiedBuffer("$$");
- // 設置按照分隔符“&&”來切分消息,單條消息限制為 1MB
- ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, delimiter));
- ch.pipeline().addLast(new StringDecoder());
- ch.pipeline().addLast().addLast(rpcInvokeHandler);
- }
- });
- ChannelFuture sync = b.bind(port).sync();
- System.out.println("啟動NettyService,端口為:" + port);
- return port;
- }
- }
這部分主要的都是 Netty 的 API,我們不做過多的說明,就簡單的說一下:
- 我們通過“&&”作為標識符號來區(qū)分兩條信息,然后一條信息的最大長度為 1MB。
- 所有邏輯都在 RpcInvokeHandler 中,這里面?zhèn)鬟M去了配置的服務接口實例,以及服務接口實例每個接口方法唯一標識對應的 Method 對象的 Map 集合。
- public class RpcInvokeHandler extends ChannelInboundHandlerAdapter {
- /**
- * 接口方法唯一標識對應的Method對象
- */
- private Map
interfaceMethods; - /**
- * 接口對應的實現(xiàn)類
- */
- private Map
interfaceToInstance; - /**
- * 線程池,隨意寫的,不要吐槽
- */
- private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,
- 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
- new ThreadFactory() {
- AtomicInteger m = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "IO-thread-" + m.incrementAndGet());
- }
- });
- public RpcInvokeHandler(ListserviceConfigList,
- Map
interfaceMethods) { - this.interfaceToInstance = new ConcurrentHashMap<>();
- this.interfaceMethods = interfaceMethods;
- for (ServiceConfig config : serviceConfigList) {
- interfaceToInstance.put(config.getType(), config.getInstance());
- }
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- try {
- String message = (String) msg;
- // 這里拿到的是一串JSON數(shù)據(jù),解析為Request對象,
- // 事實上這里解析網(wǎng)絡數(shù)據(jù),可以用序列化方式,定一個接口,可以實現(xiàn)JSON格式序列化,或者其他序列化
- // 但是demo版本就算了。
- System.out.println("接收到消息:" + msg);
- RpcRequest request = RpcRequest.parse(message, ctx);
- threadPoolExecutor.execute(new RpcInvokeTask(request));
- } finally {
- ReferenceCountUtil.release(msg);
- }
- }
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ctx.flush();
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- System.out.println("發(fā)生了異常..." + cause);
- cause.printStackTrace();
- ctx.close();
- }
- public class RpcInvokeTask implements Runnable {
- private RpcRequest rpcRequest;
- RpcInvokeTask(RpcRequest rpcRequest) {
- this.rpcRequest = rpcRequest;
- }
- @Override
- public void run() {
- try {
- /*
- * 數(shù)據(jù)大概是這樣子的
- * {"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello?meter=com
- * .study.rpc.test.producer.TestBean","requestId":"3","parameter":{"com.study.rpc.test.producer
- * .TestBean":{"age":20,"name":"張三"}}}
- */
- // 這里希望能拿到每一個服務對象的每一個接口的特定聲明
- String interfaceIdentity = rpcRequest.getInterfaceIdentity();
- Method method = interfaceMethods.get(interfaceIdentity);
- Map
map = string2Map(interfaceIdentity); - String interfaceName = map.get("interface");
- Class interfaceClass = Class.forName(interfaceName);
- Object o = interfaceToInstance.get(interfaceClass);
- String parameterString = map.get("parameter");
- Object result;
- if (parameterString != null) {
- String[] parameterTypeClass = parameterString.split(",");
- Map
parameterMap = rpcRequest.getParameterMap(); - Object[] parameterInstance = new Object[parameterTypeClass.length];
- for (int i = 0; i < parameterTypeClass.length; i++) {
- String parameterClazz = parameterTypeClass[i];
- parameterInstance[i] = parameterMap.get(parameterClazz);
- }
- result = method.invoke(o, parameterInstance);
- } else {
- result = method.invoke(o);
- }
- // 寫回響應
- ChannelHandlerContext ctx = rpcRequest.getCtx();
- String requestId = rpcRequest.getRequestId();
- RpcResponse response = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity,
- requestId);
- String s = JSONObject.toJSONString(response) + "$$";
- ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes());
- ctx.writeAndFlush(byteBuf);
- System.out.println("響應給客戶端:" + s);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static Map
string2Map(String str) { - String[] split = str.split("&");
- Map
map = new HashMap<>(16); <
網(wǎng)站名稱:神煩,老大要我寫一個RPC框架!
分享鏈接:http://www.5511xx.com/article/cocsjgj.html


咨詢
建站咨詢
