日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
神煩,老大要我寫一個RPC框架!

 如果大家對 RPC 有一些了解的話,或多或者都會聽到過一些大名鼎鼎的 RPC 框架,比如 Dubbo、gRPC。但是大部分人對于他們底層的實現(xiàn)原理其實不甚了解。

成都創(chuàng)新互聯(lián)專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務,包含不限于成都網(wǎng)站制作、做網(wǎng)站、外貿(mào)營銷網(wǎng)站建設、河源網(wǎng)絡推廣、小程序設計、河源網(wǎng)絡營銷、河源企業(yè)策劃、河源品牌公關、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運營等,從售前售中售后,我們都將竭誠為您服務,您的肯定,是我們最大的嘉獎;成都創(chuàng)新互聯(lián)為所有大學生創(chuàng)業(yè)者提供河源建站搭建服務,24小時服務熱線:18980820575,官方網(wǎng)址:www.cdcxhl.com

圖片來自 Pexels

有一種比較好的學習方式就是:如果你想要了解一個框架的原理,你可以嘗試去寫一個簡易版的框架出來,就比如如果你想理解 Spring IOC 的思想,最好的方式就是自己實現(xiàn)一個小型的 IOC 容器,自己慢慢體會。

所以本文嘗試帶領大家去設計一個小型的 RPC 框架,同時對于框架會保持一些拓展點。

通過閱讀本文,你可以收獲:

  • 理解 RPC 框架最核心的理念。
  • 學習在設計框架的時候,如何保持拓展性。

本文會依賴一些組件,他們是實現(xiàn) RPC 框架必要的一些知識,文中會盡量降低這些知識帶來的障礙。

但是,最好期望讀者有以下知識基礎:

  • Zookeeper 基本入門
  • Netty 基本入門

RPC 框架應該長什么樣子

我們首先來看一下:一個 RPC 框架是什么東西?我們最直觀的感覺就是:

集成了 RPC 框架之后,通過配置一個注冊中心的地址。一個應用(稱為服務提供者)將某個接口(interface)“暴露”出去,另外一個應用(稱為服務消費者)通過“引用”這個接口(interface),然后調(diào)用了一下,就很神奇的可以調(diào)用到另外一個應用的方法了

給我們的感覺就好像調(diào)用了一個本地方法一樣。即便兩個應用不是在同一個 JVM 中甚至兩個應用都不在同一臺機器中。

那他們是如何做到的呢?當我們的服務消費者調(diào)用某個 RPC 接口的方法之后,它的底層會通過動態(tài)代理,然后經(jīng)過網(wǎng)絡調(diào)用,去到服務提供者的機器上,然后執(zhí)行對應的方法。

接著方法的結(jié)果通過網(wǎng)絡傳輸返回到服務消費者那里,然后就可以拿到結(jié)果了。

整個過程如下圖:

那么這個時候,可能有人會問了:服務消費者怎么知道服務提供者在哪臺機器的哪個端口呢?

這個時候,就需要“注冊中心”登場了,具體來說是這樣子的:

  • 服務提供者在啟動的時候,將自己應用所在機器的信息提交到注冊中心上面。
  • 服務消費者在啟動的時候,將需要消費的接口所在機器的信息抓回來。

這樣一來,服務消費者就有了一份服務提供者所在的機器列表了。

 

“服務消費者”拿到了“服務提供者”的機器列表就可以通過網(wǎng)絡請求來發(fā)起請求了。

網(wǎng)絡客戶端,我們應該采用什么呢?有幾種選擇:

  • 使用 JDK 原生 BIO(也就是 ServerSocket 那一套)。阻塞式 IO 方法,無法支撐高并發(fā)。
  • 使用 JDK 原生 NIO(Selector、SelectionKey 那一套)。非阻塞式 IO,可以支持高并發(fā),但是自己實現(xiàn)復雜,需要處理各種網(wǎng)絡問題。
  • 使用大名鼎鼎的 NIO 框架 Netty,天然支持高并發(fā),封裝好,API 易用。

“服務消費者”拿到了“服務提供者”的機器列表就可以通過網(wǎng)絡請求來發(fā)起請求了。

作為一個有追求的程序員,我們要求開發(fā)出來的框架要求支持高并發(fā)、又要求簡單、還要快。

當然是選擇 Netty 來實現(xiàn)了,使用 Netty 的一些很基本的 API 就能滿足我們的需求。

網(wǎng)絡協(xié)議定義

當然了,既然我們要使用網(wǎng)絡傳輸數(shù)據(jù)。我們首先要定義一套網(wǎng)絡協(xié)議出來。

你可能又要問了,啥叫網(wǎng)絡協(xié)議?網(wǎng)絡協(xié)議,通俗理解,意思就是說我們的客戶端發(fā)送的數(shù)據(jù)應該長什么樣子,服務端可以去解析出來知道要做什么事情。

話不多說,上代碼,假設我們現(xiàn)在服務提供者有兩個類:

 
 
 
 
  1. // com.study.rpc.test.producer.HelloService 
  2. public interface HelloService { 
  3.  
  4.     String sayHello(TestBean testBean); 
  5.  
  6. // com.study.rpc.test.producer.TestBean 
  7. public class TestBean { 
  8.  
  9.     private String name; 
  10.     private Integer age; 
  11.  
  12.     public TestBean(String name, Integer age) { 
  13.         this.name = name; 
  14.         this.age = age; 
  15.     } 
  16.     // getter setter 

現(xiàn)在我要調(diào)用 HelloService.sayHello(TestBean testBean) 這個方法。

作為“服務消費者”,應該怎么定義我們的請求,從而讓服務端知道我是要調(diào)用這個方法呢?

這需要我們將這個接口信息產(chǎn)生一個唯一的標識:這個標識會記錄了接口名、具體是那個方法、然后具體參數(shù)是什么!

然后將這些信息組織起來發(fā)送給服務端,我這里的方式是將信息保存為一個 JSON 格式的字符串來傳輸。

比如上面的接口我們傳輸?shù)臄?shù)據(jù)大概是這樣的:

 
 
 
 
  1.     "interfaces": "interface=com.study.rpc.test.producer.HelloService&method=sayHello& 
  2.     parameter=com.study.rpc.test.producer.TestBean", 
  3.     "requestId": "3", 
  4.     "parameter": { 
  5.         "com.study.rpc.test.producer.TestBean": { 
  6.             "age": 20, 
  7.             "name": "張三" 
  8.         } 
  9.     } 

嗯,我這里用一個 JSON 來標識這次調(diào)用是調(diào)用哪個接口的哪個方法,其中 interface 標識了唯一的類,parameter 標識了里面具體有哪些參數(shù), 其中 key 就是參數(shù)的類全限定名,value 就是這個類的 JSON 信息。

可能看到這里,大家可能有意見了:數(shù)據(jù)不一定用 JSON 格式傳輸啊,而且使用 JSON 也不一定性能最高啊。

你使用 JDK 的 Serializable 配合 Netty 的 ObjectDecoder 來實現(xiàn),這當然也可以,其實這里是一個拓展點,我們應該要提供多種序列化方式來供用戶選擇。

但是這里選擇了 JSON 的原因是因為它比較直觀,對于寫文章來說比較合理。

開發(fā)服務提供者

嗯,搞定了網(wǎng)絡協(xié)議之后,我們開始開發(fā)“服務提供者”了。對于服務提供者,因為我們這里是寫一個簡單版本的 RPC 框架,為了保持簡潔。

我們不會引入類似 Spring 之類的容器框架,所以我們需要定義一個服務提供者的配置類,它用于定義這個服務提供者是什么接口,然后它具體的實例對象是什么:

 
 
 
 
  1. public class ServiceConfig{ 
  2.  
  3.     public Class type; 
  4.  
  5.     public T instance; 
  6.  
  7.     public ServiceConfig(Classtype, T instance) { 
  8.         this.type = type; 
  9.         this.instance = instance; 
  10.     } 
  11.  
  12.     public ClassgetType() { 
  13.         return type; 
  14.     } 
  15.  
  16.     public void setType(Classtype) { 
  17.         this.type = type; 
  18.     } 
  19.  
  20.     public T getInstance() { 
  21.         return instance; 
  22.     } 
  23.  
  24.     public void setInstance(T instance) { 
  25.         this.instance = instance; 
  26.     } 

有了這個東西之后,我們就知道需要暴露哪些接口了。為了框架有一個統(tǒng)一的入口,我定義了一個類叫做 ApplicationContext,可以認為這是一個應用程序上下文,他的構(gòu)造函數(shù),接收 2 個參數(shù)。

代碼如下:

 
 
 
 
  1. public ApplicationContext(String registryUrl, ListserviceConfigs){ 
  2.     // 1. 保存需要暴露的接口配置 
  3.     this.serviceConfigs = serviceConfigs == null ? new ArrayList<>() : serviceConfigs; 
  4.  
  5.     // step 2: 實例化注冊中心 
  6.     initRegistry(registryUrl); 
  7.  
  8.     // step 3: 將接口注冊到注冊中心,從注冊中心獲取接口,初始化服務接口列表 
  9.     RegistryInfo registryInfo = null; 
  10.     InetAddress addr = InetAddress.getLocalHost(); 
  11.     String hostname = addr.getHostName(); 
  12.     String hostAddress = addr.getHostAddress(); 
  13.     registryInfo = new RegistryInfo(hostname, hostAddress, port); 
  14.     doRegistry(registryInfo); 
  15.  
  16.  
  17.     // step 4:初始化Netty服務器,接受到請求,直接打到服務提供者的service方法中 
  18.     if (!this.serviceConfigs.isEmpty()) { 
  19.         // 需要暴露接口才暴露 
  20.         nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods); 
  21.         nettyServer.init(port); 
  22.     } 

注冊中心設計

這里分為幾個步驟,首先保存了接口配置,接著初始化注冊中心,因為注冊中心可能會提供多種來供用戶選擇,所以這里需要定義一個注冊中心的接口:

 
 
 
 
  1. public interface Registry { 
  2.     /** 
  3.      * 將生產(chǎn)者接口注冊到注冊中心 
  4.      * 
  5.      * @param clazz        類 
  6.      * @param registryInfo 本機的注冊信息 
  7.      */ 
  8.     void register(Class clazz, RegistryInfo registryInfo) throws Exception; 

這里我們提供一個注冊的方法,這個方法的語義是將 clazz 對應的接口注冊到注冊中心。

接收兩個參數(shù),一個是接口的 class 對象,另一個是注冊信息,里面包含了本機的一些基本信息,如下:

 
 
 
 
  1. public class RegistryInfo { 
  2.  
  3.     private String hostname; 
  4.     private String ip; 
  5.     private Integer port; 
  6.  
  7.     public RegistryInfo(String hostname, String ip, Integer port) { 
  8.         this.hostname = hostname; 
  9.         this.ip = ip; 
  10.         this.port = port; 
  11.     } 
  12.     // getter setter 

好了,定義好注冊中心,回到之前的實例化注冊中心的地方,代碼如下:

 
 
 
 
  1. /** 
  2.  * 注冊中心 
  3.  */ 
  4. private Registry registry; 
  5.  
  6. private void initRegistry(String registryUrl) { 
  7.     if (registryUrl.startsWith("zookeeper://")) { 
  8.         registryUrl = registryUrl.substring(12); 
  9.         registry = new ZookeeperRegistry(registryUrl); 
  10.     } else if (registryUrl.startsWith("multicast://")) { 
  11.         registry = new MulticastRegistry(registryUrl); 
  12.     } 

這里邏輯也非常簡單,就是根據(jù) url 的 schema 來判斷是那個注冊中心,注冊中心這里實現(xiàn)了 2 個實現(xiàn)類,分別使用 Zookeeper 作為注冊中心,另外一個是使用廣播的方式作為注冊中心。

廣播注冊中心這里僅僅是做個示范,內(nèi)部沒有實現(xiàn)。我們主要是實現(xiàn)了 Zookeeper 的注冊中心。

當然了,如果有興趣,可以實現(xiàn)更多的注冊中心供用戶選擇,比如 Redis 之類的,這里只是為了保持“拓展點”。

那么實例化完注冊中心之后,回到上面的代碼。

注冊服務提供者

 
 
 
 
  1. // step 3: 將接口注冊到注冊中心,從注冊中心獲取接口,初始化服務接口列表 
  2. RegistryInfo registryInfo = null; 
  3. InetAddress addr = InetAddress.getLocalHost(); 
  4. String hostname = addr.getHostName(); 
  5. String hostAddress = addr.getHostAddress(); 
  6. registryInfo = new RegistryInfo(hostname, hostAddress, port); 
  7. doRegistry(registryInfo); 

這里邏輯很簡單,就是獲取本機的的基本信息構(gòu)造成 RegistryInfo,然后調(diào)用了 doRegistry 方法:

 
 
 
 
  1. /** 
  2.  * 接口方法對應method對象 
  3.  */ 
  4. private MapinterfaceMethods = new ConcurrentHashMap<>(); 
  5.  
  6. private void doRegistry(RegistryInfo registryInfo) throws Exception { 
  7.     for (ServiceConfig config : serviceConfigs) { 
  8.         Class type = config.getType(); 
  9.         registry.register(type, registryInfo); 
  10.         Method[] declaredMethods = type.getDeclaredMethods(); 
  11.         for (Method method : declaredMethods) { 
  12.             String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method); 
  13.             interfaceMethods.put(identify, method); 
  14.         } 
  15.     } 

這里做了兩件事情:

  • 將接口注冊到注冊中心中。
  • 對于每一個接口的每一個方法,生成一個唯一標識,保存在 interfaceMethods 集合中。

下面分別分析這兩件事情,首先是注冊方法:因為我們用到了 Zookeeper,為了方便,引入了 Zookeeper 的客戶端框架 Curator。

 
 
 
 
  1.  
  2.     org.apache.curatorgroupId> 
  3.     curator-recipesartifactId> 
  4.     2.3.0version> 
  5. dependency> 

接著看代碼:

 
 
 
 
  1. public class ZookeeperRegistry implements Registry { 
  2.  
  3.     private CuratorFramework client; 
  4.  
  5.     public ZookeeperRegistry(String connectString) { 
  6.         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); 
  7.         client = CuratorFrameworkFactory.newClient(connectString, retryPolicy); 
  8.         client.start(); 
  9.         try { 
  10.             Stat myRPC = client.checkExists().forPath("/myRPC"); 
  11.             if (myRPC == null) { 
  12.                 client.create() 
  13.                         .creatingParentsIfNeeded() 
  14.                         .forPath("/myRPC"); 
  15.             } 
  16.             System.out.println("Zookeeper Client初始化完畢......"); 
  17.         } catch (Exception e) { 
  18.             e.printStackTrace(); 
  19.         } 
  20.     } 
  21.  
  22.  
  23.     @Override 
  24.     public void register(Class clazz, RegistryInfo registryInfo) throws Exception { 
  25.         // 1. 注冊的時候,先從zk中獲取數(shù)據(jù) 
  26.         // 2. 將自己的服務器地址加入注冊中心中 
  27.  
  28.         // 為每一個接口的每一個方法注冊一個臨時節(jié)點,然后key為接口方法的唯一標識,data為服務地址列表 
  29.  
  30.         Method[] declaredMethods = clazz.getDeclaredMethods(); 
  31.         for (Method method : declaredMethods) { 
  32.             String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method); 
  33.             String path = "/myRPC/" + key; 
  34.             Stat stat = client.checkExists().forPath(path); 
  35.             ListregistryInfos; 
  36.             if (stat != null) { 
  37.                 // 如果這個接口已經(jīng)有人注冊過了,把數(shù)據(jù)拿回來,然后將自己的信息保存進去 
  38.                 byte[] bytes = client.getData().forPath(path); 
  39.                 String data = new String(bytes, StandardCharsets.UTF_8); 
  40.                 registryInfos = JSONArray.parseArray(data, RegistryInfo.class); 
  41.                 if (registryInfos.contains(registryInfo)) { 
  42.                     // 正常來說,zk的臨時節(jié)點,斷開連接后,直接就沒了,但是重啟會經(jīng)常發(fā)現(xiàn)存在節(jié)點,所以有了這樣的代碼 
  43.                     System.out.println("地址列表已經(jīng)包含本機【" + key + "】,不注冊了"); 
  44.                 } else { 
  45.                     registryInfos.add(registryInfo); 
  46.                     client.setData().forPath(path, JSONArray.toJSONString(registryInfos).getBytes()); 
  47.                     System.out.println("注冊到注冊中心,路徑為:【" + path + "】 信息為:" + registryInfo); 
  48.                 } 
  49.             } else { 
  50.                 registryInfos = new ArrayList<>(); 
  51.                 registryInfos.add(registryInfo); 
  52.                 client.create() 
  53.                         .creatingParentsIfNeeded() 
  54.                         // 臨時節(jié)點,斷開連接就關閉 
  55.                         .withMode(CreateMode.EPHEMERAL) 
  56.                         .forPath(path, JSONArray.toJSONString(registryInfos).getBytes()); 
  57.                 System.out.println("注冊到注冊中心,路徑為:【" + path + "】 信息為:" + registryInfo); 
  58.             } 
  59.         } 
  60.     } 

Zookeeper 注冊中心在初始化的時候,會建立好連接。然后注冊的時候,針對 clazz 接口的每一個方法,都會生成一個唯一標識。

這里使用了InvokeUtils.buildInterfaceMethodIdentify方法:

 
 
 
 
  1. public static String buildInterfaceMethodIdentify(Class clazz, Method method) { 
  2.     Map map = new LinkedHashMap<>(); 
  3.     map.put("interface", clazz.getName()); 
  4.     map.put("method", method.getName()); 
  5.     Parameter[] parameters = method.getParameters(); 
  6.     if (parameters.length > 0) { 
  7.         StringBuilder param = new StringBuilder(); 
  8.         for (int i = 0; i < parameters.length; i++) { 
  9.             Parameter p = parameters[i]; 
  10.             param.append(p.getType().getName()); 
  11.             if (i < parameters.length - 1) { 
  12.                 param.append(","); 
  13.             } 
  14.         } 
  15.         map.put("parameter", param.toString()); 
  16.     } 
  17.     return map2String(map); 
  18.  
  19. public static String map2String(Map map) { 
  20.     StringBuilder sb = new StringBuilder(); 
  21.     IteratorString>> iterator = map.entrySet().iterator(); 
  22.     while (iterator.hasNext()) { 
  23.         Map.Entry entry = iterator.next(); 
  24.         sb.append(entry.getKey() + "=" + entry.getValue()); 
  25.         if (iterator.hasNext()) { 
  26.             sb.append("&"); 
  27.         } 
  28.     } 
  29.     return sb.toString(); 

其實就是對接口的方法使用他們的限定名和參數(shù)來組成一個唯一的標識,比如 HelloService#sayHello(TestBean) 生成的大概是這樣的:

 
 
 
 
  1. interface=com.study.rpc.test.producer.HelloService&method=sayHello& 
  2. parameter=com.study.rpc.test.producer.TestBean 

接下來的邏輯就簡單了,在 Zookeeper 中的 /myRPC 路徑下面建立臨時節(jié)點,節(jié)點名稱為我們上面的接口方法唯一標識,數(shù)據(jù)內(nèi)容為機器信息。

之所以采用臨時節(jié)點是因為:如果機器宕機了,連接斷開之后,消費者可以通過 Zookeeper 的 watcher 機制感知到。

大概看起來是這樣的:

 
 
 
 
  1. /myRPC/interface=com.study.rpc.test.producer.HelloService&method=sayHello& 
  2.    parameter=com.study.rpc.test.producer.TestBean 
  3.    [ 
  4.        { 
  5.            "hostname":peer1, 
  6.            "port":8080 
  7.        }, 
  8.        { 
  9.            "hostname":peer2, 
  10.            "port":8081 
  11.        } 
  12.    ] 

通過這樣的方式,在服務消費的時候就可以拿到這樣的注冊信息,然后知道可以調(diào)用那臺機器的那個端口。

好了,注冊中心弄完了之后,我們回到前面說的注冊方法做的第二件事情,我們將每一個接口方法標識的方法放入了一個 map 中:

 
 
 
 
  1. /** 
  2.  * 接口方法對應method對象 
  3.  */ 
  4. private Map interfaceMethods = new ConcurrentHashMap<>(); 

這個的原因是因為,我們在收到網(wǎng)絡請求的時候,需要調(diào)用反射的方式調(diào)用 Method 對象,所以存起來。

啟動網(wǎng)絡服務端接受請求

接下來我們就可以看第四步了:

 
 
 
 
  1. // step 4:初始化Netty服務器,接受到請求,直接打到服務提供者的service方法中 
  2. if (!this.serviceConfigs.isEmpty()) { 
  3.     // 需要暴露接口才暴露 
  4.     nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods); 
  5.     nettyServer.init(port); 

因為這里使用 Netty 來做的所以需要引入 Netty 的依賴:

 
 
 
 
  1.  
  2.     io.nettygroupId> 
  3.     netty-allartifactId> 
  4.     4.1.30.Finalversion> 
  5. dependency> 

接著來分析:

 
 
 
 
  1. public class NettyServer { 
  2.  
  3.     /** 
  4.      * 負責調(diào)用方法的handler 
  5.      */ 
  6.     private RpcInvokeHandler rpcInvokeHandler; 
  7.  
  8.     public NettyServer(ListserverConfigs, MapinterfaceMethods)throws InterruptedException { 
  9.         this.rpcInvokeHandler = new RpcInvokeHandler(serverConfigs, interfaceMethods); 
  10.     } 
  11.  
  12.     public int init(int port) throws Exception { 
  13.         EventLoopGroup bossGroup = new NioEventLoopGroup(); 
  14.         EventLoopGroup workerGroup = new NioEventLoopGroup(); 
  15.         ServerBootstrap b = new ServerBootstrap(); 
  16.         b.group(bossGroup, workerGroup) 
  17.                 .channel(NioServerSocketChannel.class) 
  18.                 .option(ChannelOption.SO_BACKLOG, 1024) 
  19.                 .childHandler(new ChannelInitializer(){ 
  20.                     @Override 
  21.                     protected void initChannel(SocketChannel ch) throws Exception { 
  22.                         ByteBuf delimiter = Unpooled.copiedBuffer("$$"); 
  23.                         // 設置按照分隔符“&&”來切分消息,單條消息限制為 1MB 
  24.                         ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, delimiter)); 
  25.                         ch.pipeline().addLast(new StringDecoder()); 
  26.                         ch.pipeline().addLast().addLast(rpcInvokeHandler); 
  27.                     } 
  28.                 }); 
  29.         ChannelFuture sync = b.bind(port).sync(); 
  30.         System.out.println("啟動NettyService,端口為:" + port); 
  31.         return port; 
  32.     } 

這部分主要的都是 Netty 的 API,我們不做過多的說明,就簡單的說一下:

  • 我們通過“&&”作為標識符號來區(qū)分兩條信息,然后一條信息的最大長度為 1MB。
  • 所有邏輯都在 RpcInvokeHandler 中,這里面?zhèn)鬟M去了配置的服務接口實例,以及服務接口實例每個接口方法唯一標識對應的 Method 對象的 Map 集合。

 

 
 
 
 
  1. public class RpcInvokeHandler extends ChannelInboundHandlerAdapter { 
  2.  
  3.     /** 
  4.      * 接口方法唯一標識對應的Method對象 
  5.      */ 
  6.     private Map interfaceMethods; 
  7.     /** 
  8.      * 接口對應的實現(xiàn)類 
  9.      */ 
  10.     private Map interfaceToInstance; 
  11.  
  12.     /** 
  13.      * 線程池,隨意寫的,不要吐槽 
  14.      */ 
  15.     private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 
  16.             50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), 
  17.             new ThreadFactory() { 
  18.                 AtomicInteger m = new AtomicInteger(0); 
  19.  
  20.                 @Override 
  21.                 public Thread newThread(Runnable r) { 
  22.                     return new Thread(r, "IO-thread-" + m.incrementAndGet()); 
  23.                 } 
  24.             }); 
  25.  
  26.  
  27.     public RpcInvokeHandler(ListserviceConfigList, 
  28.                             Map interfaceMethods) { 
  29.         this.interfaceToInstance = new ConcurrentHashMap<>(); 
  30.         this.interfaceMethods = interfaceMethods; 
  31.         for (ServiceConfig config : serviceConfigList) { 
  32.             interfaceToInstance.put(config.getType(), config.getInstance()); 
  33.         } 
  34.     } 
  35.  
  36.     @Override 
  37.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  38.         try { 
  39.             String message = (String) msg; 
  40.             // 這里拿到的是一串JSON數(shù)據(jù),解析為Request對象, 
  41.             // 事實上這里解析網(wǎng)絡數(shù)據(jù),可以用序列化方式,定一個接口,可以實現(xiàn)JSON格式序列化,或者其他序列化 
  42.             // 但是demo版本就算了。 
  43.             System.out.println("接收到消息:" + msg); 
  44.             RpcRequest request = RpcRequest.parse(message, ctx); 
  45.             threadPoolExecutor.execute(new RpcInvokeTask(request)); 
  46.         } finally { 
  47.             ReferenceCountUtil.release(msg); 
  48.         } 
  49.     } 
  50.  
  51.     @Override 
  52.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
  53.         ctx.flush(); 
  54.     } 
  55.  
  56.     @Override 
  57.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  58.         System.out.println("發(fā)生了異常..." + cause); 
  59.         cause.printStackTrace(); 
  60.         ctx.close(); 
  61.     } 
  62.  
  63.     public class RpcInvokeTask implements Runnable { 
  64.  
  65.         private RpcRequest rpcRequest; 
  66.  
  67.         RpcInvokeTask(RpcRequest rpcRequest) { 
  68.             this.rpcRequest = rpcRequest; 
  69.         } 
  70.  
  71.         @Override 
  72.         public void run() { 
  73.             try { 
  74.                 /* 
  75.                  * 數(shù)據(jù)大概是這樣子的 
  76.                  * {"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello?meter=com 
  77.                  * .study.rpc.test.producer.TestBean","requestId":"3","parameter":{"com.study.rpc.test.producer 
  78.                  * .TestBean":{"age":20,"name":"張三"}}} 
  79.                  */ 
  80.                 // 這里希望能拿到每一個服務對象的每一個接口的特定聲明 
  81.                 String interfaceIdentity = rpcRequest.getInterfaceIdentity(); 
  82.                 Method method = interfaceMethods.get(interfaceIdentity); 
  83.                 Map map = string2Map(interfaceIdentity); 
  84.                 String interfaceName = map.get("interface"); 
  85.                 Class interfaceClass = Class.forName(interfaceName); 
  86.                 Object o = interfaceToInstance.get(interfaceClass); 
  87.                 String parameterString = map.get("parameter"); 
  88.                 Object result; 
  89.                 if (parameterString != null) { 
  90.                     String[] parameterTypeClass = parameterString.split(","); 
  91.                     Map parameterMap = rpcRequest.getParameterMap(); 
  92.                     Object[] parameterInstance = new Object[parameterTypeClass.length]; 
  93.                     for (int i = 0; i < parameterTypeClass.length; i++) { 
  94.                         String parameterClazz = parameterTypeClass[i]; 
  95.                         parameterInstance[i] = parameterMap.get(parameterClazz); 
  96.                     } 
  97.                     result = method.invoke(o, parameterInstance); 
  98.                 } else { 
  99.                     result = method.invoke(o); 
  100.                 } 
  101.                 // 寫回響應 
  102.                 ChannelHandlerContext ctx = rpcRequest.getCtx(); 
  103.                 String requestId = rpcRequest.getRequestId(); 
  104.                 RpcResponse response = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, 
  105.                         requestId); 
  106.                 String s = JSONObject.toJSONString(response) + "$$"; 
  107.                 ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes()); 
  108.                 ctx.writeAndFlush(byteBuf); 
  109.                 System.out.println("響應給客戶端:" + s); 
  110.             } catch (Exception e) { 
  111.                 e.printStackTrace(); 
  112.             } 
  113.         } 
  114.  
  115.          public static Map string2Map(String str) { 
  116.             String[] split = str.split("&"); 
  117.             Map map = new HashMap<>(16); <
    網(wǎng)站名稱:神煩,老大要我寫一個RPC框架!
    分享鏈接:http://www.5511xx.com/article/cocsjgj.html