0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

如何手搓一個(gè)自定義的RPC 遠(yuǎn)程過(guò)程調(diào)用框架

京東云 ? 來(lái)源:jf_75140285 ? 作者:jf_75140285 ? 2024-07-22 12:17 ? 次閱讀

1、RPC(遠(yuǎn)程過(guò)程調(diào)用概述)

遠(yuǎn)程過(guò)程調(diào)用(RPC, Remote Procedure Call)是一種通過(guò)網(wǎng)絡(luò)從遠(yuǎn)程計(jì)算機(jī)程序上請(qǐng)求服務(wù),而無(wú)需了解網(wǎng)絡(luò)細(xì)節(jié)的通信技術(shù)。在分布式系統(tǒng)中,RPC是一種常用的技術(shù),能夠簡(jiǎn)化客戶端與服務(wù)器之間的交互。本文將介紹如何基于Netty(網(wǎng)絡(luò)編程框架)實(shí)現(xiàn)一個(gè)自定義的簡(jiǎn)單的RPC框架。

首先簡(jiǎn)單介紹一下RPC 主要特點(diǎn):

1.1、RPC遠(yuǎn)程過(guò)程調(diào)用的主要特點(diǎn)

?透明性: 調(diào)用方(客戶端)調(diào)用遠(yuǎn)程服務(wù)就像調(diào)用本地API函數(shù)一樣,而無(wú)需關(guān)心執(zhí)行過(guò)程中的底層的網(wǎng)絡(luò)通信細(xì)節(jié)。

?客戶端-服務(wù)器模型:RPC通常基于客戶端-服務(wù)器模型,客戶端發(fā)送請(qǐng)求到服務(wù)器,服務(wù)器處理請(qǐng)求并返回結(jié)果。

?序列化及反序列化:RPC需要將請(qǐng)求參數(shù)序列化成字節(jié)流(即數(shù)據(jù)轉(zhuǎn)換成網(wǎng)絡(luò)可傳輸?shù)母袷剑┎⑼ㄟ^(guò)網(wǎng)絡(luò)傳輸?shù)椒?wù)器端,服務(wù)器端接收到字節(jié)流后,需按照約定的協(xié)議將數(shù)據(jù)進(jìn)行反序列化(即恢復(fù)成三原始格式)

?同步及異步調(diào)用:RPC支持同步、異步調(diào)用。同步調(diào)用會(huì)阻塞直到服務(wù)器返回結(jié)果,或超時(shí)、異常等。而異步調(diào)用則可以立即返回,通過(guò)注冊(cè)一個(gè)回調(diào)函數(shù),在有結(jié)果返回的時(shí)候再進(jìn)行處理。從而讓客戶端可以繼續(xù)執(zhí)行其它操作。

?錯(cuò)誤處理:PRC由于涉及網(wǎng)絡(luò)通信,因此需要處理各種可能的網(wǎng)絡(luò)異常,如網(wǎng)絡(luò)故障,服務(wù)宕機(jī),請(qǐng)求超時(shí),服務(wù)重啟、或上下線、擴(kuò)縮容等,這些對(duì)調(diào)用方來(lái)說(shuō)需要保持透明。

??協(xié)議及傳輸居:RPC可以基于多種協(xié)議和傳輸層實(shí)現(xiàn),如HTTP、TCP等,本文采用的是基于TCP的自定義協(xié)議。

1.2、RPC的應(yīng)用場(chǎng)景

?分布式系統(tǒng):多個(gè)服務(wù)之間進(jìn)行通信,如微服務(wù)框架。

?客戶端-服務(wù)器架構(gòu):如移動(dòng)應(yīng)用與后臺(tái)服務(wù)器的交互。

?跨平臺(tái)調(diào)用:不同技術(shù)棧之間的服務(wù)調(diào)用。

?API服務(wù):通過(guò)公開(kāi)API對(duì)外提供功能,使用客戶端能方便使用服務(wù)提供的功能,如支付網(wǎng)關(guān),身份驗(yàn)證服務(wù)等。

?大數(shù)據(jù)處理:在大數(shù)據(jù)處理框架中,不同節(jié)點(diǎn)之間需要頻繁通信來(lái)協(xié)調(diào)任務(wù)和交接數(shù)據(jù),RPC可以提供高效的節(jié)點(diǎn)通信機(jī)制,如Hadoop 和Spark等大數(shù)據(jù)框架中節(jié)點(diǎn)間的通信。

?云計(jì)算:在云計(jì)算環(huán)境中,服務(wù)通常分布在多個(gè)虛擬機(jī)或容器中,通過(guò)RPC實(shí)現(xiàn)實(shí)現(xiàn)服務(wù)間的通信和管理。

?跨網(wǎng)絡(luò)服務(wù)調(diào)用:當(dāng)應(yīng)用需要調(diào)用部署在不同網(wǎng)絡(luò)中的服務(wù)時(shí),RPC提供了一種簡(jiǎn)單而建議目前的調(diào)用方式,如。跨數(shù)據(jù)中心或嘴唇地域的服務(wù)調(diào)用。

1.3、常見(jiàn)的RPC框架

?JSF:京東開(kāi)源的分布式服務(wù)框架,提供高性能、可擴(kuò)展、穩(wěn)定的服務(wù)治理能力,支持服務(wù)注冊(cè)及發(fā)現(xiàn),負(fù)載均衡、容錯(cuò)機(jī)制、服務(wù)監(jiān)控、多種協(xié)議支持等。

?gRPC:基于HTTP/2和Protocol Buffers的高性能RPC框架,由Google開(kāi)發(fā)。

?Dubbo:一個(gè)高性能、輕量級(jí)的Java RPC框架,用于提供基于接口的遠(yuǎn)程服務(wù)調(diào)用,支持負(fù)載均衡、服務(wù)自動(dòng)注冊(cè)及服務(wù)、容錯(cuò)等。

?JSON-RPC:使用JSON格式編碼調(diào)用和結(jié)果的RPC協(xié)議。

?Apache Thrift:由Facebook開(kāi)發(fā),支持多種編程語(yǔ)言和協(xié)議

2、實(shí)現(xiàn)自定義的RPC

要實(shí)現(xiàn)一個(gè)自定義的RPC框架需解決以下幾個(gè)主要問(wèn)題:

1.客戶端調(diào)用:客戶端調(diào)用本地的代理函數(shù)(stub代碼,這個(gè)函數(shù)負(fù)責(zé)將調(diào)用轉(zhuǎn)換為RPC請(qǐng)求)。這其實(shí)就是一個(gè)接口描述文件,它可以有多種形式如JSON、XML、甚至是一份word文檔或是口頭約定均可,只要客戶端及服務(wù)端都是遵守這份接口描述文件契約即可。在我們的實(shí)際開(kāi)發(fā)中一種常見(jiàn)的方式是服務(wù)提供者發(fā)布一個(gè)包含服務(wù)接口類的jar包到maven 中央倉(cāng)庫(kù),調(diào)用方通過(guò)pom文件將之依賴到本地。

2.參數(shù)序列化:代理函數(shù)將調(diào)用參數(shù)進(jìn)行序列化,并將請(qǐng)求發(fā)送到服務(wù)器。

3.服務(wù)端數(shù)據(jù)接收:服務(wù)器端接收到請(qǐng)求,并將其反序列化,恢復(fù)成原始參數(shù)。

4.執(zhí)行遠(yuǎn)程過(guò)程:服務(wù)端調(diào)用實(shí)際的服務(wù)過(guò)程(函數(shù))并獲取結(jié)果。

5.返回結(jié)果:服務(wù)端將調(diào)用結(jié)果進(jìn)行序列化,并通過(guò)網(wǎng)絡(luò)傳給客戶端。

6.客戶端接收調(diào)用結(jié)果:客戶到接收到服務(wù)端傳輸?shù)淖止?jié)流,進(jìn)行反序列化,轉(zhuǎn)換為實(shí)際的結(jié)果數(shù)據(jù)格式,并返回到原始調(diào)用方。

下面需我們通過(guò)代碼一一展示上述各功能是如何實(shí)現(xiàn)的。

2.1、自定義通信協(xié)議

本文的目的是要實(shí)現(xiàn)一個(gè)自定義通信協(xié)議的遠(yuǎn)程調(diào)用框架,所以首先要定義一個(gè)通信協(xié)議數(shù)據(jù)格式。

整個(gè)自定義協(xié)議總體上分為Header 及 Body Content兩部分;Header 占16個(gè)字節(jié),又分為4個(gè)部分。

前2位為魔法值用于Netty編解碼組件,解決網(wǎng)絡(luò)通信中的粘包、半包等問(wèn)題,此處不展開(kāi)細(xì)講。

msgtype用于表示消息的類型,如request(請(qǐng)求)、respone(響應(yīng))、heartbeat(心跳)等。

code 占1位,表示請(qǐng)求的響應(yīng)狀態(tài),成功還是失敗。

request id占8位,表示請(qǐng)求的序列號(hào),用于后續(xù)調(diào)用結(jié)果的匹配,保證線程內(nèi)唯一。

body size 占4位,表示實(shí)現(xiàn)請(qǐng)求內(nèi)容的長(zhǎng)度,在反序化時(shí)讀取此長(zhǎng)度的內(nèi)容字節(jié),解析出正確的數(shù)據(jù)。

客戶端、服務(wù)端在通信過(guò)程中都要按照上述約定的通信協(xié)議進(jìn)行數(shù)據(jù)的編、解碼工作。

2.2、客戶端調(diào)用

2.2.1 客戶端的使用

客戶端一般通過(guò)接口代理工廠通過(guò)動(dòng)態(tài)代理技術(shù)來(lái)生成一個(gè)代理實(shí)例,所有的遠(yuǎn)程調(diào)用中的細(xì)節(jié),如參數(shù)序列化,網(wǎng)絡(luò)傳輸,異常處理等都隱藏在代理實(shí)例中實(shí)現(xiàn),對(duì)調(diào)用方來(lái)說(shuō)調(diào)用過(guò)程是透明的,就像調(diào)用本地方法一樣。

首先看一下客戶端的使用方式,本文假設(shè)一個(gè)IShoppingCartService (購(gòu)物車(chē))的接口類,基中有一個(gè)方法根據(jù)傳入的用戶pin,返回購(gòu)物車(chē)詳情。

//接口方法
ShoppingCart shopping(String pin);
//客戶端通過(guò)代理工廠實(shí)現(xiàn)接口的一個(gè)代理實(shí)例
IShoppingCartService serviceProxy = ProxyFactory.factory(IShoppingCartService.class)                
                .setSerializerType(SerializerType.JDK) //客戶端設(shè)置所使用的序列化工具,此處為JDK原生
                .newProxyInstance(); //返回代理 實(shí)現(xiàn)
                
//像調(diào)用本地方法一樣,調(diào)用此代理實(shí)例的shopping 方法
ShoppingCart result = serviceProxy.shopping("userPin");
log.info("result={}", JSONObject.toJSONString(result));

2.2.2、客戶端代理工廠的核心功能

public class ProxyFactory {
    //……省略
    /**
     * 代理對(duì)象
     *
     * @return
     */
    public I newProxyInstance() {     
        //服務(wù)的元數(shù)據(jù)信息
        ServiceData serviceData = new ServiceData(
                group, //分組
                providerName, //服務(wù)名稱,一般為接口的class的全限定名稱
                StringUtils.isNotBlank(version) ? version : "1.0.0" //版本號(hào)
        );

        //調(diào)用器
        Calller caller = newCaller().timeoutMillis(timeoutMillis);
        //集群策略,用于實(shí)現(xiàn)快速失敗或失敗轉(zhuǎn)等功能
        Strategy strategy = StrategyConfigContext.of(strategy, retries);
        Object handler = null;
        switch (invokeType) {
            case "syncCall":
                //同步調(diào)用handler
                handler = new SyncCaller(serviceData, caller);
                break;
            case "asyncCall":
                //異步調(diào)用handler
                handler = new AsyncCaller(client.appName(), serviceData, caller, strategy);
                break;
            default:
                throw new RuntimeException("未知類型: " + invokeType);
        }

        //返回代理實(shí)例
        return ProxyEnum.getDefault().newProxy(interfaceClass, handler);
    }
    //……省略
}

代碼 ProxyEnum.getDefault().newProxy(interfaceClass, handler) 返回一個(gè)具體的代理實(shí)例,此方法要求傳入兩個(gè)參數(shù),interfaceClass 被代理的接口類class,即服務(wù)方所發(fā)布的服務(wù)接口類。

handler 為動(dòng)態(tài)代理所需要代碼增強(qiáng)邏輯,即所有的調(diào)用細(xì)節(jié)都由此增強(qiáng)類完成。按照動(dòng)態(tài)代理的實(shí)現(xiàn)方式的不同,本文支持兩種動(dòng)態(tài)代理方式:

1.JDK動(dòng)態(tài)代碼,如采用此方式,handler 需要實(shí)現(xiàn)接口 InvocationHandler

2.ByteBuddy,它是一個(gè)用于在運(yùn)行時(shí)生成、修改和操作Java類的庫(kù),允許開(kāi)發(fā)者通過(guò)簡(jiǎn)單的API生成新的類或修改已有的類,而無(wú)需手動(dòng)編寫(xiě)字節(jié)碼,它廣泛應(yīng)用于框架開(kāi)發(fā)、動(dòng)態(tài)代理、字節(jié)碼操作和類加載等領(lǐng)域。

本文默認(rèn)采用第二種方式,通代碼簡(jiǎn)單展示一下代理實(shí)例的的生成方式。

//方法newProxy 的具體實(shí)現(xiàn)
public  T newProxy(Class interfaceType, Object handler) {
            Class cls = new ByteBuddy()
                     //生成接口的子類
                    .subclass(interfaceType) 
                     //默認(rèn)代理接口中所有聲明的方法
                    .method(ElementMatchers.isDeclaredBy(interfaceType))
                     //代碼增強(qiáng),即接口中所有被代理的方法都
                     //委托給用戶自定義的handler處理,這也是動(dòng)態(tài)代理的意義所在
                   .intercept(MethodDelegation.to(handler, "handlerInstance"))
                    .make()
                     //通過(guò)類加載器加載
                   .load(interfaceType.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
                    .getLoaded();

            try {
                //通過(guò)newInstance構(gòu)建一個(gè)代理實(shí)例并返回    
               return cls.newInstance();
            } catch (Throwable t) {
                ……
            }
        }

本文以同步調(diào)用為例,現(xiàn)在展示一下 SyncInvoker 的具體實(shí)現(xiàn)邏輯

public class SyncCaller extends AbstractCaller {
    //……省略   
    /**
     * @RuntimeType 的作用提示ByteBuddy根據(jù)被攔截方法的實(shí)際類型,對(duì)此攔截器的返回值進(jìn)行類型轉(zhuǎn)換
     */
    @RuntimeType
    public Object syncCall(@Origin Method method, @AllArguments @RuntimeType Object[] args) throws Throwable {
        //封裝請(qǐng)求的接口中的方法名及方法參數(shù),組成一個(gè)request請(qǐng)求對(duì)象
        StarGateRequest request = createRequest(methodName, args);
        //集群容錯(cuò)策略調(diào)度器接口
        //提供快速失敗,失敗轉(zhuǎn)移等策略供調(diào)用方選擇,此處默認(rèn)采用了快速失敗的策略
        Invoker invoker = new FastFailInvoker();
        //returnType 的類型決定了泛型方法的實(shí)際結(jié)果類型,用于后續(xù)調(diào)用結(jié)果的類型轉(zhuǎn)換
        Future future = invoker.invoke(request, method.getReturnType());
        if (sync) {
            //同步調(diào)用,線程會(huì)阻塞在get方法,直到超時(shí)或結(jié)果可用
            Object result = future.getResult();
            return result;
        } else {
            return future;
        }
    }
}

//同步,異步調(diào)用的關(guān)鍵點(diǎn)就在于InvokeFuture,它繼承了Java的CompletionStage類,用于異步編程

通過(guò)以上核心代碼,客戶端就完成了服務(wù)調(diào)用環(huán)節(jié),下一步RPC框架需要將客戶端請(qǐng)求的接口方法及方法參數(shù)進(jìn)行序列化并通過(guò)網(wǎng)絡(luò)進(jìn)行傳輸。下面通過(guò)代碼片段展示一下序列化的實(shí)現(xiàn)方式。

2.2.3、請(qǐng)求參數(shù)序列化

我們將請(qǐng)求參數(shù)序列化的目的就是將具體的請(qǐng)求參數(shù)轉(zhuǎn)換成字節(jié)組,填充進(jìn)入上述自定義協(xié)議的 body content 部分。下面通過(guò)代碼演示一下如何進(jìn)行反序列化。

本文默認(rèn)采用JDK原生的對(duì)象序列化及反序列化框架,也可通過(guò)SPI技術(shù)擴(kuò)展支持Protocol Buffers等。

//上述代碼行Future future = invoker.invoke(request, method.getReturnType());
//具體實(shí)現(xiàn)

public  Future invoke(StarGateRequest request, Class returnType) throws Exception {
        //對(duì)象序列化器,默認(rèn)為JDK
        final Serializer _serializer = serializer();
        //message對(duì)象包含此次請(qǐng)求的接口名,方法名及實(shí)際參數(shù)列表
        final Message message = request.message();
        //通過(guò)軟負(fù)載均衡選擇一個(gè) Netty channel
        Channel channel = selectChannel(message.getMetadata());
        byte code = _serializer.code();
        //將message對(duì)象序列成字節(jié)數(shù)組
        byte[] bytes = _serializer.writeObject(message);
        request.bytes(code, bytes);

        //數(shù)據(jù)寫(xiě)入 channel 并返回 future 約定,用于同步或異步獲得調(diào)用結(jié)果
        return write(channel, request, returnType);
    }
//對(duì)象的序列化,JDK 原生方式
public  byte[] writeObject(T obj) {
        ByteArrayOutputStream buf = OutputStreams.getByteArrayOutputStream();
        try (ObjectOutputStream output = new ObjectOutputStream(buf)) {
            output.writeObject(obj);
            output.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            ThrowUtil.throwException(e);
        } finally {
            OutputStreams.resetBuf(buf);
        }
        return null; 
    }

2.2.4、請(qǐng)求參數(shù)通過(guò)網(wǎng)絡(luò)發(fā)送

//上述代碼  write(channel, request, returnType);
//具體實(shí)現(xiàn)
protected  DefaultFuture write(final Channel channel,
                                               final StarGateRequest request,
                                               final Class returnType) {
        //……省略
        
        //調(diào)用結(jié)果占位 future對(duì)象,這也是promise編程模式
        final Future future = DefaultFuture.newFuture(request.invokeId(), channel, timeoutMillis, returnType);
        
        //將請(qǐng)求負(fù)載對(duì)象寫(xiě)入Netty channel通道,并綁定監(jiān)聽(tīng)器處理寫(xiě)入結(jié)果
        channel.writeAndFlush(request).addListener((ChannelFutureListener) listener -> {
            if (listener.isSuccess()) {
                //網(wǎng)絡(luò)寫(xiě)入成功
                ……
            } else {
                //異常時(shí),構(gòu)造造調(diào)用結(jié)果,供調(diào)用方進(jìn)行處理
                DefaultFuture.errorFuture(channel, response, dispatchType);
            }
        });
        
        //因?yàn)镹etty 是非阻塞的,所以寫(xiě)入后可立刻返回
        return future;
    }

2.2.4.1、Netty 消息編碼器

消息寫(xiě)入Netty channel 后,會(huì)依次經(jīng)過(guò) channel pipline 上所安裝的各種handler處理,然后再通過(guò)物理網(wǎng)絡(luò)將數(shù)據(jù)發(fā)送出去,這里展示了客戶端及服務(wù)端所使用的自定義編、解解器

//自定義的編碼器 繼承自Netty 的 MessageToByteEncoder
public class StarGateEncoder extends MessageToByteEncoder {

    //……省略
    
    private void doEncodeRequest(RequestPayload request, ByteBuf out) {
        byte sign = StarGateProtocolHeader.toSign(request.serializerCode(), StarGateProtocolHeader.REQUEST);
        long invokeId = request.invokeId();
        byte[] bytes = request.bytes();
        int length = bytes.length;

        out.writeShort(StarGateProtocolHeader.Head)  //寫(xiě)入兩個(gè)字節(jié)
                .writeByte(sign)              //寫(xiě)入1個(gè)字節(jié)
                .writeByte(0x00)            //寫(xiě)入1個(gè)字節(jié)
                .writeLong(invokeId)          //寫(xiě)入8個(gè)節(jié)節(jié)
                .writeInt(length)             //寫(xiě)入4個(gè)字節(jié)
                .writeBytes(bytes);
    }

}

至此,通過(guò)上述核心代碼,客戶的請(qǐng)求已經(jīng)按照自定義的協(xié)議格式進(jìn)行了序列化,并把數(shù)據(jù)寫(xiě)入到Netty channel中,最后通過(guò)物理網(wǎng)絡(luò)傳輸?shù)椒?wù)器端。

2.3、服務(wù)端接收數(shù)據(jù)

2.3.1、消息解碼器

服務(wù)器端接收到客戶端的發(fā)送的數(shù)據(jù)后,需要進(jìn)行正確的消息解碼,下面是解碼器的實(shí)現(xiàn)。

//消息解碼器,繼承自Netty 的ReplayingDecoder,將客戶端請(qǐng)求解碼為 RequestPayload 對(duì)象,供業(yè)務(wù)處理handler使用
public class StarGateDecoder extends ReplayingDecoder {

    //……省略
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        switch (state()) {
            case HEAD:
                checkMagic(in.readShort());         // HEAD
                checkpoint(State.HEAD);
            case SIGN:
                header.sign(in.readByte());         // 消息標(biāo)志位
                checkpoint(State.STATUS);
            case STATUS:
                header.status(in.readByte());       // 狀態(tài)位
                checkpoint(State.ID);
            case ID:
                header.id(in.readLong());           // 消息id
                checkpoint(State.BODY_SIZE);
            case BODY_SIZE:
                header.bodySize(in.readInt());      // 消息體長(zhǎng)度
                checkpoint(State.BODY);
            case BODY:
                switch (header.messageCode()) {
                    //……省略
                    case StarGateProtocolHeader.REQUEST: {
                        //消息體長(zhǎng)度信息
                        int length = checkBodySize(header.bodySize());
                        byte[] bytes = new byte[length];
                        //讀取指定長(zhǎng)度字節(jié)
                        in.readBytes(bytes);
                        //調(diào)用請(qǐng)求
                        RequestPayload request = new RequestPayload(header.id());
                        //設(shè)置序列化器編碼,有效載荷
                        request.bytes(header.serializerCode(), bytes);
                        out.add(request);
                        break;
                    }
                    
                    default:
                        throw new Exception("錯(cuò)誤標(biāo)志位");
                }
                checkpoint(State.HEAD);
        }
    }

   //……省略
}

2.3.2、請(qǐng)求參數(shù)反序列化

//服務(wù)端 Netty channel pipline 上所安裝的業(yè)務(wù)處理 handler
//業(yè)務(wù)處理handler 對(duì)RequestPayload 所攜帶的字節(jié)數(shù)組進(jìn)行反序列化,解析出客戶端所傳遞的實(shí)際參數(shù)
public class ServiceHandler extends ChannelInboundHandlerAdapter {

    //……省略
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (msg instanceof RequestPayload) {       
            StarGateRequest request = new StarGateRequest((RequestPayload) msg);
            //約定的反序列化器, 由客戶端設(shè)置
            byte code = request.serializerCode();
            Serializer serializer = SerializerFactory.getSerializer(code);
            //實(shí)際請(qǐng)求參數(shù)字組數(shù)組
            byte[] bytes = payload.bytes();
            //對(duì)象反序列化
            Message message = serializer.readObject(bytes, Message.class);
            log.info("message={}", JSONObject.toJSONString(message));
       
            request.message(message);
            
            //業(yè)務(wù)處理
            process(message);
        } else {
            //引用釋放
            ReferenceCountUtil.release(msg);
        }
    }

    //……省略
}

2.3.3、處理客戶端請(qǐng)求

經(jīng)過(guò)反序列化后,服務(wù)端可以知道用戶所請(qǐng)求的是哪個(gè)接口、方法、以及實(shí)際的參數(shù)值,下一步就可進(jìn)行真實(shí)的方法調(diào)用。

//處理調(diào)用
public void process(Message message) {         
   try {
       ServiceMetadata metadata = msg.getMetadata(); //客戶端請(qǐng)求的元數(shù)據(jù)
       String providerName = metadata.getProviderName(); //服務(wù)名,即接口類名
       
       //根據(jù)接口類名,查找服務(wù)端實(shí)現(xiàn)此接口的類的全限定類名
       providerName = findServiceImpl(providerName);
       String methodName = msg.getMethodName();  //方法名
       Object[] args = msg.getArgs();    //客戶設(shè)置的實(shí)際參數(shù)

       //線程上下文類加載器
       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
       //加載具體實(shí)現(xiàn)類
       Class clazz = classLoader.loadClass(providerName);
       //創(chuàng)建接口類實(shí)例
       Object instance = clazz.getDeclaredConstructor().newInstance();

       Method method = null;
       Class[] parameterTypes = new Class[args.length];
       for (int i = 0; i < args.length; i++) {
           parameterTypes[i] = args[i].getClass();
       }
       method = clazz.getMethod(methodName, parameterTypes);
       
       //反射調(diào)用 
       Object invokeResult = method.invoke(instance, args);
       } catch (Exception e) {
            log.error("調(diào)用異常:", e);
           throw new RuntimeException(e);
       }

       //處理同步調(diào)用結(jié)果
      doProcess(invokeResult);
        
}

2.3.4、 返回調(diào)用結(jié)果

通過(guò)反射調(diào)用接口實(shí)現(xiàn)類,獲取調(diào)用結(jié)果,然后對(duì)結(jié)果進(jìn)行序列化并包裝成response響應(yīng)消息,將消息寫(xiě)入到channel, 經(jīng)過(guò)channel pipline 上所安裝的編碼器對(duì)消息對(duì)象進(jìn)行編碼,最后發(fā)送給調(diào)用客戶端。

//處理同步調(diào)用結(jié)果,并將結(jié)果寫(xiě)回到 Netty channel
private void doProcess(Object realResult) {
        ResultWrapper result = new ResultWrapper();
        result.setResult(realResult);
        byte code = request.serializerCode();
        Serializer serializer = SerializerFactory.getSerializer(code);
        //new response 響應(yīng)消息對(duì)象
        Response response = new Response(request.invokeId());
        //調(diào)用結(jié)果序列成字節(jié)數(shù)組
        byte[] bytes = serializer.writeObject(result);
        response.bytes(code, bytes);
        response.status(Status.OK.value());
        
        //響應(yīng)消息對(duì)象 response 寫(xiě)入 Netty channel
        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    log.info("響應(yīng)成功");
                } else {
                    //記錄調(diào)用失敗日志
                    log.error("響應(yīng)失敗, channel: {}, cause: {}.", channel, channelFuture.cause());
                }
            }
        });
    }

同樣的,消息寫(xiě)入channel 后,先依次經(jīng)過(guò)pipline 上所安裝的 消息編碼器,再發(fā)送給客戶端。具體編碼方式同客戶端編碼器類似,此處不再贅述。

2.4、客戶端接收調(diào)用結(jié)果

客戶端收到服務(wù)端寫(xiě)入響應(yīng)消息后,同樣經(jīng)過(guò)Netty channel pipline 上所安裝的解碼器,進(jìn)行正確的解碼。然后再對(duì)解碼后的對(duì)象進(jìn)行正確的反序列化,最終獲得調(diào)用結(jié)果 。具體的解碼,反序列化過(guò)程不再贅述,流程基本同上面服務(wù)端的解碼及反序列化類似。


public class consumerHandler extends ChannelInboundHandlerAdapter {

    //……省略

    //客戶端處理所接收到的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (msg instanceof ResponseMessage) {
            try {
                //類型轉(zhuǎn)換
                ResponseMessage responseMessage= (ResponseMessage)msg
                StarGateResponse response = new StarGateResponse(ResponseMessage.getMsg());
                byte code = response.serializerCode();
                Serializer serializer = SerializerFactory.getSerializer(code);
                byte[] bytes = responseMessage.bytes();
                //反序列化成調(diào)用結(jié)果的包裝類
                Result result = serializer.readObject(bytes, Result.class);
                response.result(result);

                //處理調(diào)用結(jié)果
                long invokeId = response.id();
                //通過(guò) rnvokeid,從地緩存中拿到客戶端調(diào)用的結(jié)果點(diǎn)位對(duì)象 futrue
                DefaultFuture future = FUTURES_MAP.remove(invokeId);
                
                //判斷調(diào)用是否成功
                byte status = response.status();
                if (status == Status.OK.value()) {
                    //對(duì)調(diào)用結(jié)果進(jìn)行強(qiáng)制類型轉(zhuǎn)換,并設(shè)置future結(jié)果,對(duì)阻塞在future.get()的客戶端同步調(diào)用來(lái)說(shuō),調(diào)用返回。
                    complete((V) response.getResult());
                } else {
                    //todo 處理異常
                }

            } catch (Throwable t) {
                log.error("調(diào)用記錄: {}, on {} #channelRead().", t, ch);
            }
        } else {
            log.warn("消息類型不匹配: {}, channel: {}.", msg.getClass(), ch);
            //計(jì)數(shù)器減1
            ReferenceCountUtil.release(msg);
        }
    }
    
}

下面再通過(guò)一個(gè)簡(jiǎn)單的調(diào)用時(shí)序圖展示一下一次典型的Rpc調(diào)用所經(jīng)歷的步驟。

wKgZomad3XWACQsnAAMXAY-ukIw894.png

??

3、結(jié)尾

本文首先簡(jiǎn)單介紹了一下RPC的概念、應(yīng)用場(chǎng)景及常用的RPC框架,然后講述了一下如何自己手動(dòng)實(shí)現(xiàn)一個(gè)RPC框架的基本功能。目的是想讓大家對(duì)RPC框架的實(shí)現(xiàn)有一個(gè)大概思路,并對(duì)Netty 這一高效網(wǎng)絡(luò)編程框架有一個(gè)了解,通過(guò)對(duì)Netty 的編、解碼器的學(xué)習(xí),了解如何自定義一個(gè)私有的通信協(xié)議。限于篇幅本文只簡(jiǎn)單講解了RPC的核心的調(diào)用邏輯的實(shí)現(xiàn)。真正生產(chǎn)可用的RPC框架還需要有更多復(fù)雜的功能,如限流、負(fù)載均衡、融斷、降級(jí)、泛型調(diào)用、自動(dòng)重連、自定義可擴(kuò)展的攔截器等等。

另外RPC框架中一般有三種角色,服務(wù)提供者、服務(wù)消費(fèi)者、注冊(cè)中心,本文并沒(méi)有介紹注冊(cè)中心如何實(shí)現(xiàn)。并假定服務(wù)提供者已經(jīng)將服務(wù)發(fā)布到了注冊(cè)中心,服務(wù)消費(fèi)者跟服務(wù)提供者之間建立起了TCP 長(zhǎng)連接。

后續(xù)會(huì)通過(guò)其它篇章介紹注冊(cè)中心,服務(wù)自動(dòng)注冊(cè),服務(wù)發(fā)現(xiàn)等功能的實(shí)現(xiàn)原理。

注:參考資料開(kāi)源代碼庫(kù) Jupiter ( https://github.com/fengjiachun/Jupiter.git ),對(duì)RPC框架實(shí)現(xiàn)原理感興趣的同學(xué),強(qiáng)烈建議閱讀此代碼,肯定獲益匪淺。

審核編輯 黃宇

聲明:本文內(nèi)容及配圖由入駐作者撰寫(xiě)或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 框架
    +關(guān)注

    關(guān)注

    0

    文章

    397

    瀏覽量

    17358
  • RPC
    RPC
    +關(guān)注

    關(guān)注

    0

    文章

    110

    瀏覽量

    11483
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    嵌入式遠(yuǎn)程過(guò)程調(diào)用組件--eRPC

    概述 RPC(Remote Procedure call)遠(yuǎn)程過(guò)程調(diào)用。其分為兩部分:遠(yuǎn)程過(guò)程過(guò)程調(diào)用
    的頭像 發(fā)表于 04-06 14:15 ?2227次閱讀

    SQL server服務(wù),遠(yuǎn)程過(guò)程調(diào)用失敗的解決方法

    成功解決SQL server服務(wù),遠(yuǎn)程過(guò)程調(diào)用失敗
    發(fā)表于 12-27 10:11

    Linux rpc編程過(guò)程

    通過(guò)rpcgen的man手冊(cè)看到此工具的作用是把RPC源程序編譯成C語(yǔ)言源程序,從而輕松實(shí)現(xiàn)遠(yuǎn)程過(guò)程調(diào)用。
    發(fā)表于 07-24 07:25

    用戶自定義結(jié)構(gòu)數(shù)據(jù)怎么存儲(chǔ)成VARIANT類型

    的VARIANT,但是,多數(shù)時(shí)候可能需要個(gè)更加簡(jiǎn)單的,靈活的方法。我在做遠(yuǎn)程過(guò)程調(diào)用的C接口時(shí),忽然聯(lián)想到,既然RPC可以把任何數(shù)據(jù)以字節(jié)的形式發(fā)送,那么,就可以利用這個(gè)機(jī)制,把結(jié)構(gòu)
    發(fā)表于 08-02 07:08

    怎么將用戶自定義的C結(jié)構(gòu)數(shù)據(jù)存儲(chǔ)成VARIANT類型

    的VARIANT,但是,多數(shù)時(shí)候可能需要個(gè)更加簡(jiǎn)單的,靈活的方法。我在做遠(yuǎn)程過(guò)程調(diào)用的C接口時(shí),忽然聯(lián)想到,既然RPC可以把任何數(shù)據(jù)以字節(jié)的形式發(fā)送,那么,就可以利用這個(gè)機(jī)制,把結(jié)構(gòu)
    發(fā)表于 08-02 07:08

    用戶自定義C結(jié)構(gòu)數(shù)據(jù)怎么轉(zhuǎn)換成VARIANT轉(zhuǎn)換

    的VARIANT,但是,多數(shù)時(shí)候可能需要個(gè)更加簡(jiǎn)單的,靈活的方法。我在做遠(yuǎn)程過(guò)程調(diào)用的C接口時(shí),忽然聯(lián)想到,既然RPC可以把任何數(shù)據(jù)以字節(jié)的形式發(fā)送,那么,就可以利用這個(gè)機(jī)制,把結(jié)構(gòu)
    發(fā)表于 08-06 07:51

    RPC的結(jié)構(gòu)原理是什么?

    遠(yuǎn)程過(guò)程調(diào)用RPC)是個(gè)協(xié)議,程序可以使用這個(gè)協(xié)議請(qǐng)求網(wǎng)絡(luò)中另臺(tái)計(jì)算機(jī)上某程序的服務(wù)而不需知道網(wǎng)絡(luò)細(xì)節(jié)。(
    發(fā)表于 10-12 10:43

    matlab自定義函數(shù)調(diào)用的方法

    matlab自定義函數(shù)調(diào)用的方法 命令文件/函數(shù)文件+ 函數(shù)文件 - 多
    發(fā)表于 11-29 13:14 ?88次下載

    適用于鴻蒙的自定義組件框架Carbon案例教程

    項(xiàng)目名稱:Carbon 所屬系列:ohos的第三方組件適配移植 功能:個(gè)適用于鴻蒙的自定義組件框架,幫助快速實(shí)現(xiàn)各種需要的效果 項(xiàng)目移植狀態(tài):大部分移植
    發(fā)表于 04-07 09:49 ?5次下載

    深入理解RPC自定義網(wǎng)絡(luò)協(xié)議

    只要涉及到網(wǎng)絡(luò)通信,必然涉及到網(wǎng)絡(luò)協(xié)議,應(yīng)用層也是樣。在應(yīng)用層最標(biāo)準(zhǔn)和常用的就是HTTP協(xié)議。但在很多性能要求較高的場(chǎng)景各大企業(yè)內(nèi)部也會(huì)自定義RPC 協(xié)議。舉個(gè)例子,就是相當(dāng)于各個(gè)省不但用官方普通話,還都有自己的方言,
    的頭像 發(fā)表于 06-12 15:00 ?2449次閱讀

    RPC如何在遠(yuǎn)程過(guò)程調(diào)用?

    RPC(Remote Procedure Call Protocol)即遠(yuǎn)程過(guò)程調(diào)用,也就是調(diào)用的函數(shù)是在其它的控制板上運(yùn)行的,不需要理會(huì)底層的通訊協(xié)議。
    的頭像 發(fā)表于 02-07 09:52 ?839次閱讀
    <b class='flag-5'>RPC</b>如何在<b class='flag-5'>遠(yuǎn)程</b><b class='flag-5'>過(guò)程</b>中<b class='flag-5'>調(diào)用</b>?

    RPC接口與HTTP接口哪一個(gè)更好?

    HTTP接口和RPC接口都是生產(chǎn)上常用的接口,顧名思義,HTTP接口使用基于HTTP協(xié)議的URL傳參調(diào)用,而RPC接口則基于遠(yuǎn)程過(guò)程調(diào)用
    發(fā)表于 06-13 09:18 ?1105次閱讀
    <b class='flag-5'>RPC</b>接口與HTTP接口哪<b class='flag-5'>一個(gè)</b>更好?

    自定義算子開(kāi)發(fā)

    個(gè)完整的自定義算子應(yīng)用過(guò)程包括注冊(cè)算子、算子實(shí)現(xiàn)、含自定義算子模型轉(zhuǎn)換和運(yùn)行含自定義op模型四
    的頭像 發(fā)表于 04-07 16:11 ?2628次閱讀
    <b class='flag-5'>自定義</b>算子開(kāi)發(fā)

    什么是遠(yuǎn)程過(guò)程調(diào)用

    開(kāi)發(fā)環(huán)境:Ubuntu VS Code 編譯器:g++ 編程語(yǔ)言:C++ 框架源碼下載:GitHub 認(rèn)識(shí)RPC RPC的全稱是遠(yuǎn)程過(guò)程調(diào)用(Remote Procedure Call
    的頭像 發(fā)表于 11-10 10:10 ?905次閱讀
    什么是<b class='flag-5'>遠(yuǎn)程過(guò)程調(diào)用</b>

    Dubbo源碼淺析()—RPC框架與Dubbo

    、什么是RPC 1.1 RPC概念 RPC,Remote Procedure Call 即遠(yuǎn)程過(guò)程調(diào)用,與之相對(duì)的是本地服務(wù)
    的頭像 發(fā)表于 08-16 15:18 ?462次閱讀
    Dubbo源碼淺析(<b class='flag-5'>一</b>)—<b class='flag-5'>RPC</b><b class='flag-5'>框架</b>與Dubbo