日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線(xiàn)溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
你管這個(gè)叫Dubbo?

RPC框架的實(shí)現(xiàn)

相信很多小伙伴已經(jīng)看了很多Dubbo的八股文了。比如,Dubbo支持哪些序列化框架,支持哪些注冊(cè)中心,支持哪些集群容錯(cuò)策略,支持服務(wù)降級(jí)嗎?但是你知道Dubbo服務(wù)導(dǎo)出和服務(wù)引入的過(guò)程嗎?服務(wù)降級(jí)是如何實(shí)現(xiàn)的?等等

成都創(chuàng)新互聯(lián)成立于2013年,我們提供高端重慶網(wǎng)站建設(shè)公司、成都網(wǎng)站制作成都網(wǎng)站設(shè)計(jì)、網(wǎng)站定制、成都營(yíng)銷(xiāo)網(wǎng)站建設(shè)、微信平臺(tái)小程序開(kāi)發(fā)、微信公眾號(hào)開(kāi)發(fā)、seo優(yōu)化排名服務(wù),提供專(zhuān)業(yè)營(yíng)銷(xiāo)思路、內(nèi)容策劃、視覺(jué)設(shè)計(jì)、程序開(kāi)發(fā)來(lái)完成項(xiàng)目落地,為石涼亭企業(yè)提供源源不斷的流量和訂單咨詢(xún)。

本文就從源碼的角度來(lái)分享一下Dubbo的整個(gè)調(diào)用過(guò)程(放心,圖示為主,輔助一少部分源碼)

「RPC框架的實(shí)現(xiàn)基本上都是如下架構(gòu)」

一個(gè)RPC調(diào)用的過(guò)程如下

  • 調(diào)用方發(fā)送請(qǐng)求后由代理類(lèi)將調(diào)用的方法,參數(shù)組裝成能進(jìn)行網(wǎng)絡(luò)傳輸?shù)南Ⅲw
  • 調(diào)用方代理類(lèi)將消息體發(fā)送到提供方
  • 提供方代理類(lèi)將消息進(jìn)行解碼,得到調(diào)用的方法和參數(shù)
  • 提供方代理類(lèi)執(zhí)行相應(yīng)的方法,并將結(jié)果返回

「協(xié)議,編解碼,序列化的部分不是本文的重點(diǎn),我就不分析了,有興趣的可以看我之前的文章?!?/p>

首先來(lái)手寫(xiě)一個(gè)極簡(jiǎn)版的RPC框架,以便你對(duì)上面的流程有一個(gè)更深的認(rèn)識(shí)

手寫(xiě)一個(gè)簡(jiǎn)單的PRC框架

封裝網(wǎng)絡(luò)請(qǐng)求對(duì)象

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcRequest implements Serializable {

private String interfaceName;
private String methodName;
private Class[] paramTypes;
private Object[] parameters;
}

根據(jù)interfaceName可以確定需要調(diào)用的接口,methodName和paramTypes則可以確定要調(diào)用接口的方法名,定位到具體的方法,傳入?yún)?shù)即可調(diào)用方法

封裝調(diào)用接口

封裝接口到api模塊,producer端寫(xiě)實(shí)現(xiàn)邏輯,consumer端寫(xiě)調(diào)用邏輯

public interface HelloService {

String sayHello(String content);
}
public interface UpperCaseService {

String toUpperCase(String content);
}

開(kāi)發(fā)producer端

public class HelloServiceImpl implements HelloService {

@Override
public String sayHello(String content) {
return "hello " + content;
}
}
public class UpperCaseServiceImpl implements UpperCaseService {

@Override
public String toUpperCase(String content) {
return content.toUpperCase();
}
}

ServiceMap保存了producer端接口名和接口實(shí)現(xiàn)類(lèi)的映射關(guān)系,這樣可以根據(jù)請(qǐng)求對(duì)象的接口名,找到對(duì)應(yīng)的實(shí)現(xiàn)類(lèi)

public class ServiceMap {

// 接口名 -> 接口實(shí)現(xiàn)類(lèi)
private static Map serviceMap = new HashMap<>();

public static void registerService(String serviceKey, Object service) {
serviceMap.put(serviceKey, service);
}

public static Object lookupService(String serviceKey) {
return serviceMap.get(serviceKey);
}
}

為了提高服務(wù)端的并發(fā)度,我們將每一個(gè)請(qǐng)求的處理過(guò)程放到線(xiàn)程池中

@Slf4j
public class RequestHandler implements Runnable {

private Socket socket;

public RequestHandler(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();
Object service = ServiceMap.lookupService(rpcRequest.getInterfaceName());
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
Object result = method.invoke(service, rpcRequest.getParameters());
outputStream.writeObject(result);
} catch (Exception e) {
log.error("invoke method error", e);
throw new RuntimeException("invoke method error");
}
}

}

啟動(dòng)服務(wù)端

public class RpcProviderMain {

private static final ExecutorService executorService = Executors.newCachedThreadPool();

public static void main(String[] args) throws Exception {

HelloService helloService = new HelloServiceImpl();
UpperCaseService upperCaseService = new UpperCaseServiceImpl();
// 將需要暴露的接口注冊(cè)到serviceMap中
ServiceMap.registerService(HelloService.class.getName(), helloService);
ServiceMap.registerService(UpperCaseService.class.getName(), upperCaseService);

ServerSocket serverSocket = new ServerSocket(8080);

while (true) {
// 獲取一個(gè)套接字(阻塞)。所以為了并行,來(lái)一個(gè)請(qǐng)求,開(kāi)一個(gè)線(xiàn)程處理
// 為了復(fù)用線(xiàn)程,用了threadPool
final Socket socket = serverSocket.accept();
executorService.execute(new RequestHandler(socket));
}
}
}

開(kāi)發(fā)consumer端

前面說(shuō)過(guò),我們要通過(guò)動(dòng)態(tài)代理對(duì)象解耦方法調(diào)用和網(wǎng)絡(luò)調(diào)用,所以接下來(lái)我們就寫(xiě)一下動(dòng)態(tài)代理對(duì)象的實(shí)現(xiàn)邏輯

生成一個(gè)代理對(duì)象的過(guò)程很簡(jiǎn)單

實(shí)現(xiàn)InvocationHandler接口,在invoke方法中增加代理邏輯

調(diào)用Proxy.newProxyInstance方法生成代理對(duì)象,3個(gè)參數(shù)分別是ClassLoader,代理對(duì)象需要實(shí)現(xiàn)的接口數(shù)組,InvocationHandler接口實(shí)現(xiàn)類(lèi)

當(dāng)執(zhí)行代理執(zhí)行實(shí)現(xiàn)的接口方法時(shí),會(huì)調(diào)用到InvocationHandler#invoke,這個(gè)方法中增加了代理邏輯哈。

public class ConsumerProxy {

public static T getProxy(final Class interfaceClass, final String host, final int port) {

return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class[]{interfaceClass}, new ConsumerInvocationHandler(host, port));
}
}

可以看到代理對(duì)象的主要功能就是組裝請(qǐng)求參數(shù),然后發(fā)起網(wǎng)絡(luò)調(diào)用

@Slf4j
public class ConsumerInvocationHandler implements InvocationHandler {

private String host;
private Integer port;

public ConsumerInvocationHandler(String host, Integer port) {
this.host = host;
this.port = port;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

try (Socket socket = new Socket(host, port);
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
RpcRequest rpcRequest = RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.paramTypes(method.getParameterTypes())
.parameters(args).build();
outputStream.writeObject(rpcRequest);
Object result = inputStream.readObject();
return result;
} catch (Exception e) {
log.error("consumer invoke error", e);
throw new RuntimeException("consumer invoke error");
}
}
}

此時(shí)我們只需要通過(guò)ConsumerProxy#getProxy方法,就能很方便的獲取到代理對(duì)象。通過(guò)代理對(duì)象調(diào)用遠(yuǎn)程方法和調(diào)用本地方法一樣方便

public class RpcConsumerMain {

public static void main(String[] args) {

// 因?yàn)檫@是一個(gè)小demo,就不拆分多模塊了
// 這個(gè)HelloService是通過(guò)網(wǎng)絡(luò)調(diào)用的HelloServiceImpl,而不是本地調(diào)用
HelloService helloService = ConsumerProxy.getProxy(HelloService.class, "127.0.0.1", 8080);
// hello world
System.out.println(helloService.sayHello("world"));
UpperCaseService upperCaseService = ConsumerProxy.getProxy(UpperCaseService.class, "127.0.0.1", 8080);
// THIS IS CONTENT
System.out.println(upperCaseService.toUpperCase("this is content"));
}
}

至此我們已經(jīng)把一個(gè)RPC框架最核心的功能就實(shí)現(xiàn)了,是不是很簡(jiǎn)單?!钙鋵?shí)Dubbo的源碼也很簡(jiǎn)單,只不過(guò)增加了很多擴(kuò)展功能,所以大家有時(shí)候會(huì)認(rèn)為比較難?!?/p>

所以我們就來(lái)分析一下核心的擴(kuò)展功能。比如Filter,服務(wù)降級(jí),集群容錯(cuò)等是如何實(shí)現(xiàn)的?其他的擴(kuò)展功能,比如支持多種注冊(cè)中心,支持多種序列化框架,支持多種協(xié)議,基本不會(huì)打交道,所以就不浪費(fèi)時(shí)間了

從前面的圖示我們知道,代理類(lèi)在服務(wù)調(diào)用和響應(yīng)過(guò)程中扮演著重要的角色?!冈贒ubbo中,代理類(lèi)有個(gè)專(zhuān)有名詞叫做Invoker,而Dubbo中就是通過(guò)對(duì)這個(gè)Invoker不斷進(jìn)行代理增加各種新功能的」

Dubbo服務(wù)導(dǎo)出

「當(dāng)?shù)谌娇蚣芟牒蚐pring整合時(shí),有哪些方式?」

實(shí)現(xiàn)BeanFactoryPostProcessor接口(對(duì)BeanFactory進(jìn)行擴(kuò)展)

實(shí)現(xiàn)BeanPostProcessor接口(對(duì)Bean的生成過(guò)程進(jìn)行擴(kuò)展)

Dubbo也不例外,當(dāng)Dubbo和Spring整合時(shí),會(huì)往容器中注入2個(gè)BeanPostProcessor,作用如下

ServiceAnnotationBeanPostProcessor,將@Service注解的類(lèi)封裝成ServiceBean注入容器 ReferenceAnnotationBeanPostProcessor,將@Reference注解的接口封裝成ReferenceBean注入容器

所以服務(wù)導(dǎo)出和服務(wù)引入肯定和ServiceBean和ReferenceBean的生命周期有關(guān)。

「ServiceBean實(shí)現(xiàn)了ApplicationListener接口,當(dāng)收到ContextRefreshedEvent事件時(shí)(即Spring容器啟動(dòng)完成)開(kāi)始服務(wù)導(dǎo)出?!?/p>

服務(wù)導(dǎo)出比較重要的2個(gè)步驟就是

將服務(wù)注冊(cè)到zk(我們后面的分析,注冊(cè)中心都基于zk哈)

將服務(wù)對(duì)象包裝成Invoker,并保存在一個(gè)map中,key為服務(wù)名,value為Invoker對(duì)象

「當(dāng)收到請(qǐng)求時(shí),根據(jù)服務(wù)名找到Invoker對(duì)象,Invoker對(duì)象根據(jù)方法名和參數(shù)反射執(zhí)行方法,然后將結(jié)果返回?!?/p>

這里留個(gè)小問(wèn)題,反射執(zhí)行方式效率會(huì)很低,那么在Dubbo中還有哪些解決方案呢?

從圖中可以看到AbstractProxyInvoker被其他Invoker進(jìn)行代理了,而這些Invoker是用來(lái)執(zhí)行Filter的,一個(gè)Invoker代理類(lèi)執(zhí)行一個(gè)Filter,層層進(jìn)行代理

「如下圖為Dubbo收到請(qǐng)求層層調(diào)用的過(guò)程」

Dubbo服務(wù)引入

前面我們已經(jīng)推斷出來(lái)服務(wù)導(dǎo)出和ReferenceBean有關(guān)。我們來(lái)看看具體在哪個(gè)階段?ReferenceBean實(shí)現(xiàn)了FactoryBean接口,并重寫(xiě)了getObject方法,在這個(gè)方法中進(jìn)行服務(wù)導(dǎo)出。因此我們推斷服務(wù)導(dǎo)出的時(shí)機(jī)是ReferenceBean被其他對(duì)象注入時(shí)

public Object getObject() {
return get();
}

接下來(lái)就是從注冊(cè)中心獲取服務(wù)地址,構(gòu)建Invoker對(duì)象,并基于Invoker對(duì)象構(gòu)建動(dòng)態(tài)代理類(lèi),賦值給接口。

最終能發(fā)起網(wǎng)絡(luò)調(diào)用的是DubboInvoker,而這個(gè)Invoker被代理了很多層,用來(lái)實(shí)現(xiàn)各種擴(kuò)展功能。

服務(wù)降級(jí)

第一個(gè)就是服務(wù)降級(jí),什么是服務(wù)降級(jí)呢?

「當(dāng)服務(wù)可不用時(shí),我們不希望拋出異常,而是返回特定的值(友好的提示等),這時(shí)候我們就可以用到服務(wù)降級(jí)?!?/p>

dubbo中有很多服務(wù)降級(jí)策略,簡(jiǎn)單舉幾個(gè)例子

force: 代表強(qiáng)制使用 Mock 行為,在這種情況下不會(huì)走遠(yuǎn)程調(diào)用 fail: 只有當(dāng)遠(yuǎn)程調(diào)用發(fā)生錯(cuò)誤時(shí)才使用 Mock 行為

假如有如下一個(gè)controller,調(diào)用DemoService獲取值,但是DemoService并沒(méi)有啟動(dòng)

@RestController
public class DemoController {

@Reference(check = false, mock = "force:return mock")
private DemoService demoService;

@RequestMapping("hello")
public String hello(@RequestParam("msg") String msg) {
return demoService.hello(msg);
}

}

可以看到直接返回mock字符串(也并不會(huì)發(fā)生網(wǎng)絡(luò)調(diào)用)

將@Reference的mock屬性改為如下,再次調(diào)用

@RestController
public class DemoController {

@Reference(check = false, mock = "fail:return fail")
private DemoService demoService;

@RequestMapping("hello")
public String hello(@RequestParam("msg") String msg) {
return demoService.hello(msg);
}

}

會(huì)發(fā)起網(wǎng)絡(luò)調(diào)用,調(diào)用失敗,然后返回fail。

「dubbo中的服務(wù)降級(jí)只用了MockClusterInvoker這一個(gè)類(lèi)來(lái)實(shí)現(xiàn),因此相對(duì)于Hystrix等功能很簡(jiǎn)單,實(shí)現(xiàn)也很簡(jiǎn)單,如下圖。」

當(dāng)Reference不配置mock屬性或者屬性為false時(shí),表示不進(jìn)行降級(jí),直接調(diào)用代理對(duì)象即可

以屬性以force開(kāi)頭時(shí),表示直接進(jìn)行降級(jí),都不會(huì)發(fā)生網(wǎng)絡(luò)調(diào)用

其他請(qǐng)求就是在進(jìn)行網(wǎng)絡(luò)失敗后才進(jìn)行降級(jí)

集群容錯(cuò)

過(guò)了服務(wù)降級(jí)這一層,接下來(lái)就到了集群容錯(cuò)了。

dubbo中有很多集群容錯(cuò)策略

容錯(cuò)策略

解釋

代理類(lèi)

AvailableCluster

找到一個(gè)可用的節(jié)點(diǎn),直接發(fā)起調(diào)用

AbstractClusterInvoker匿名內(nèi)部類(lèi)

FailoverCluster

失敗重試(默認(rèn))

FailoverClusterInvoker

FailfastCluster

快速失敗

FailfastClusterInvoker

FailsafeCluster

安全失敗

FailsafeClusterInvoker

FailbackCluster

失敗自動(dòng)恢復(fù)

FailbackClusterInvoker

ForkingCluster

并行調(diào)用

ForkingClusterInvoker

BroadcastCluster

廣播調(diào)用

BroadcastClusterInvoker

Failover Cluster:失敗自動(dòng)切換,當(dāng)出現(xiàn)失敗,重試其它服務(wù)器。通常用于讀操作,但重試會(huì)帶來(lái)更長(zhǎng)延遲。

Failfast Cluster:快速失敗,只發(fā)起一次調(diào)用,失敗立即報(bào)錯(cuò)。通常用于非冪等性的寫(xiě)操作,比如新增記錄。

Failsafe Cluster:失敗安全,出現(xiàn)異常時(shí),直接忽略。通常用于寫(xiě)入審計(jì)日志等操作。

Failback Cluster:失敗自動(dòng)恢復(fù),后臺(tái)記錄失敗請(qǐng)求,定時(shí)重發(fā)。通常用于消息通知操作。

Forking Cluster:并行調(diào)用多個(gè)服務(wù)器,只要一個(gè)成功即返回。通常用于實(shí)時(shí)性要求較高的讀操作,但需要浪費(fèi)更多服務(wù)資源??赏ㄟ^(guò) forks=”2″ 來(lái)設(shè)置最大并行數(shù)。

Broadcast Cluster:廣播調(diào)用所有提供者,逐個(gè)調(diào)用,任意一臺(tái)報(bào)錯(cuò)則報(bào)錯(cuò) 。通常用于通知所有提供者更新緩存或日志等本地資源信息。

「讀操作建議使用 Failover 失敗自動(dòng)切換,默認(rèn)重試兩次其他服務(wù)器。寫(xiě)操作建議使用 Failfast 快速失敗,發(fā)一次調(diào)用失敗就立即報(bào)錯(cuò)?!?/p>

不知道你發(fā)現(xiàn)沒(méi)?「換集群容錯(cuò)策略就是換DubboInvoker的代理類(lèi)」

集群容錯(cuò)相關(guān)的代理類(lèi)都有一個(gè)共同的屬性RegistryDirectory,這個(gè)是一個(gè)很重要的組件,它用List保存了服務(wù)提供者對(duì)應(yīng)的所有Invoker。

「更牛逼的是這個(gè)List是動(dòng)態(tài)變化的,當(dāng)服務(wù)提供者下線(xiàn)時(shí),會(huì)觸發(fā)相應(yīng)的事件,調(diào)用方會(huì)監(jiān)聽(tīng)這個(gè)事件,并把對(duì)應(yīng)的Invoker刪除,這樣后續(xù)就不會(huì)調(diào)用到下線(xiàn)的服務(wù)了。當(dāng)有新的服務(wù)提供者時(shí),會(huì)觸發(fā)生成新的Invoker?!?/p>

當(dāng)一個(gè)服務(wù)的多個(gè)Invoker擺在我們面前時(shí),該選擇哪個(gè)來(lái)調(diào)用呢?這就不得不提到負(fù)載均衡策略了。

負(fù)載均衡策略實(shí)現(xiàn)類(lèi)

解釋

RandomLoadBalance

隨機(jī)策略(默認(rèn))

RoundRobinLoadBalance

輪詢(xún)策略

LeastActiveLoadBalance

最少活躍調(diào)用數(shù)

ConsistentHashLoadBalance

一致性hash策略

「我們只需要通過(guò)合適的負(fù)載均衡策略來(lái)選擇即可」

和服務(wù)端類(lèi)似類(lèi)似,最終能發(fā)送網(wǎng)絡(luò)請(qǐng)求的Invoker還會(huì)被Filter對(duì)應(yīng)的Invoker類(lèi)所代理,一個(gè)Filter一個(gè)代理類(lèi),層層代理。

如下圖為Dubbo發(fā)送請(qǐng)求時(shí)層層調(diào)用的過(guò)程

好了,Dubbo一些比較重要的擴(kuò)展點(diǎn)就分享完了,整個(gè)請(qǐng)求響應(yīng)的基本過(guò)程也串下來(lái)了!


新聞名稱(chēng):你管這個(gè)叫Dubbo?
分享網(wǎng)址:http://www.5511xx.com/article/cdjcceh.html