当前位置:网站首页>微服務系列--深入理解RPC底層原理與設計實踐

微服務系列--深入理解RPC底層原理與設計實踐

2022-01-15 02:04:51 Danny_idea

在微服務系統當中,各個服務之間進行遠程調用的時候需要考慮各種各樣的場景,例如以下幾種异常情况:

  • 超時調用

  • 失敗重試

  • 服務下線通知

  • 服務上線通知

  • 服務分組

  • 請求隊列

等等…

國內也有一些有先見之明的技術專家們對於這些技術有了較早的認知,因此很早便開始了關於遠程服務調用中間件的開發。慢慢地,一些國內大廠自研的RPC調用框架開始變做了一款產品向市面上去進行推廣。

今年年初的時候,我花了大概一個半月的業餘時間自己打磨了一套RPC框架,通過實踐嘗試後發現,要想真正地落地一款給公司內部使用的RPC框架難度真的超乎想象。本文不會過多地去介紹市面上某一款中間件的底層源代碼是如何執行和編寫的,更多是通過結合一些中間件底層設計的原理來闡述 我自己是如何設計一款RPC框架的。

准備工作

為了寫一款可用的RPC框架,我大概准備了這些技術工作:

  • 閱讀了Dubbo內部的大量源碼設計。

  • 了解關於RPC框架設計的難點和痛點。(這裏主要感謝nx的東哥,上了他的課程之後很多技術點都有了新的理解和感悟)

  • 不斷地實踐和測試。

  • 自己編寫的中間件如何能够優雅地接入Spring容器。

RPC的整體設計思想

起初在設計RPC遠程調用框架的時候,主要的設計思路是采用了經典的生產者-消費者思想。客戶端發送請求,服務端接收之後匹配本地已有的服務方法進行處理執行。

但是在實際到落地過程中卻發現,其中的技術複雜性遠遠超出預期~~

最終結果如下圖所示:

整個項目的包結構整理
在這裏插入圖片描述
客戶端調用:
在這裏插入圖片描述
服務端使用:
在這裏插入圖片描述
ps:這裏面的每個api和設計思路大部分都是模仿了Dubbo框架內部的源代碼設計以及部分自己的改編。

本地代理的設計

為了能够保證遠程方法的調用使用起來和本地方法調用一樣簡單,通常可以使用代理模式去實現。場景的代理模式有好2大類:靜態代理和動態代理,靜態代理需要通過硬編碼的方式實現,不現實,這裏直接不合適。

動態代理主要有以下兩種:

  • JDK代理
  • CGLIB代理

Java給出了動態代理,動態代理具有如下特點:

1.Proxy對象不需要implements接口;
2.Proxy對象的生成利用JDK的Api,在JVM內存中動態的構建Proxy對象。需要使用java.lang.reflect.Proxy類的newProxyInstance接口


public static <T> T getProxy(final Class interfaceClass, ReferenceConfig referenceConfig) {
    

        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{
    interfaceClass}, new InvocationHandler() {
    
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    
                //每次執行目標方法的時候都會回調到這個invoke方法處
                return null;
            }
        });
    }

JDK動態代理要求target對象是一個接口的實現對象,假如target對象只是一個單獨的對象,並沒有實現任何接口,這時候就會用到Cglib代理(Code Generation Library),即通過構建一個子類對象,從而實現對target對象的代理,因此目標對象不能是final類(報錯),且目標對象的方法不能是final或static(不執行代理功能)。


 //給目標對象創建一個代理對象
    public Object getProxyInstance() {
    
        //工具類
        Enhancer en = new Enhancer();
        //設置父類
        en.setSuperclass(target.getClass());
        //設置回調函數
        en.setCallback(this);
        //創建子類代理對象
        return en.create();
    }

    public Object intercept(Object object, Method method, Object[] arg2, MethodProxy proxy) throws Throwable {
    

        System.out.println("before");
        Object obj = method.invoke(target);
        System.out.println("after");
        return obj;
    }

我最終選擇了JDK作為基本的動態代理實現方案,一開始的技術選型並沒有選擇更加完美的方案,而是采用了最為簡單熟悉的技術。

如果讀者感興趣的話,可以閱讀我之前介紹aop原理的文章,內部有詳細講解cglib底層原理的細節。點擊這跳轉

遠程調用的數據傳輸

本地代理設計好了之後,需要考慮如何將數據發送給到服務端的問題了。底層采用的是netty框架,為了避免粘包和拆包的問題,我嘗試使用了ObjectEncoder和ObjectDecoder兩個netty內置的組件。

關於netty內部出現粘包,拆包現象的解决手段,可以細看這篇文章:

https://www.cnblogs.com/rickiyang/p/12904552.html

協議體的內部需要設計哪些字段?
大概整理了一下代碼,基本結構如下所示:

public class IettyProtocol implements Serializable {
    

    private static final long serialVersionUID = -7523782352702351753L;
    /** * 魔數 */
    protected long MAGIC = 0;

    /** * 客戶端的請求id */
    private String requestId;

    /** * netty專屬 */
    private ChannelHandlerContext channelHandlerContext;

    /** * 0請求 1響應 * @see CommonConstants.ReqOrRespTypeEnum */
    protected byte reqOrResp = 0;

    /** * 0需要從服務端返回數據 1不需要從服務端響應數據 */
    protected final byte way = 0;

    /** * 0是心跳時間,1不是心跳事件 */
    private byte event = 0;

    /** * 序列化類型 */
    private String serializationType;

    /** * 狀態 */
    private short status;

    /** * 返回的數據類型格式 */
    private Type type;

    /** * 消息體 請求方發送的函數類型,參數信息都存在這裏, 接收方響應的信息也都存在這裏 */
    private byte[] body;
}

稍微解釋幾個字段:

requestId 客戶端的請求id(用於請求響應做必配使用,下文中會介紹到)

reqOrResp 協議數據包的類型(標示該數據包是請求類型還是響應類型)

type 是指調用該方法的返回數據格式類型 (例如int,String,返回類型在做數據的序列化轉換的時候會非常有用)

body 這裏面是核心重點,主要的調用服務名稱,參數,方法等詳細信息都會先轉換為字節數組,然後再通過網絡將其發送出去。

如何將不同格式的數據轉換為字節數組

數字類型

將數字類型轉換為二進制,在之前的一篇文章中我有寫過詳細的底層實現機制,核心是通過將數字的二進制數右移8比特,然後存入byte數組當中。核心代碼為:

/** * 字節轉成數字 int 大小是4個字節 * * @param bytes * @return */
    public static int byteToInt(byte[] bytes) {
    
        if (bytes.length != 4) {
    
            return 0;
        }
        return (bytes[0]) & 0xff | (bytes[1] << 8) & 0xff00 | (bytes[2] << 16) & 0xff0000 | (bytes[3] << 24) & 0xff000000;
    }

    /** * 數字轉成字節 int 大小是4個字節 * * @param n * @return */
    public static byte[] intToByte(int n) {
    
        byte[] buf = new byte[4];
        for (int i = 0; i < buf.length; i++) {
    
            buf[i] = (byte) (n >> (8 * i));
        }
        return buf;
    }

字符串類型

將字符串轉換為對應的字符數組,然後每個數組的char類型使用asc碼映射為數字,接下來又是回歸到數字轉換的思路上了。

集合,複雜對象類型

這些類型可以嘗試先通過json轉換為字符串,然後再將字符串轉換為char數組,再轉換為數字數組類型,後還是要回歸到數字轉換的思路上。

數據接收與響應設計

早期在做RPC通訊設計的時候,采用的是簡單的生產者消費者模型。下邊給出早期自己在進行實現過程中所思考的一些點:
同步發送數據
在這裏插入圖片描述
這類同步發送設計案例來看,consumer端發送數據之後,consumer會一直處於等待狀態,只有等到數據抵達到provider端並且處理完畢之後,consumer端才會繼續進行下去。

這樣設計的弊端很明顯:
consumer和provider的吞吐量都不高,而且一旦某個接口出現了超時還會影響其他接口的調用堵塞。
consumer端异步發送,provider端异步接收處理
這裏需要引入兩個新的概念,io線程和業務線程。整體設計如下圖所示:
在這裏插入圖片描述
客戶端發送數據的時候,不再是處於等待的狀態,它會只需要將數據放入到一個本地的請求隊列中即可。客戶端的io線程會不斷地嘗試從隊列中取出數據,然後進行網絡發送。服務端也會專門有一個io線程負責接收這類數據,接著將數據放入到服務端的一個隊列緩沖中,然後再交給服務端的業務線程池去慢慢消費掉服務端的緩沖隊列內部的數據。

服務端的核心設計如下:
在這裏插入圖片描述
provider端的數據處理完畢之後該如何正確返回?
為了解决這個問題,我嘗試閱讀了一下Dubbo的底層源代碼,然後借鑒了其中的設計思路進行了一波實現。

客戶端如何接收響應
其核心的本質是客戶端在發送請求到時候會生成一個唯一的requestId,然後客戶端在發送數據之後,會有一個Map集合(key是requestId,value是接口響應值)管理接口響應到數據,客戶端的調用線程在執行了寫入數據到發送隊列之後需要不斷監聽Map集合中對應requestId的value是否有值,如果超過指定時間都沒有數據,那麼就拋出超時异常,如果收到了響應數據則正常返回即可。

服務端返回響應
服務端的本地代碼正常處理完數據之後要將數據寫入一個Map集合中,服務端的io線程會不斷輪訓這份Map集合(key是客戶端發送過來的requestId,value是本地代碼處理完之後寫入的數據),如果發現對應的requestId有寫好的返回數據,就會將其發送給客戶端。

整體設計大概如下圖所示:
在這裏插入圖片描述

過濾器的設計

好了基本的調用鏈路大概是如同上邊的描述給梳理出來了。接下來就是一些擴展功能模塊了。
發送過程中需要做一些裝飾包裝,以及過濾的相關功能。此時就可以采用責任鏈的方式進行設計。
在這裏插入圖片描述
過濾器部分我大概分了兩種類型,一種是消費者使用的過濾器,一種是服務提供者專屬的過濾器。
過濾器部分的設計主要是用了責任鏈的模式實現,這塊比較簡單,不打算做過多介紹了。
在這裏插入圖片描述

延時任務的設計

在微服務調用的中間件中,延時任務是一種經常會使用到的設計,例如在超時重試,定時心跳發送,注册中心發布失敗重試等場景下。其核心的共同點都是在當前時間戳過後的指定時間點執行某個任務。這類設計我看了下JDK內部的Timer和DelayedQueue設計的原理。

常規的JDK 的 java.util.Timer 和 DelayedQueue 等工具類,可實現簡單的定時任務,底層用的是堆數據結構,存取複雜度都是 O(nlog(n)),無法支撐海量定時任務。

而在定時任務量大、性能要求高的場景,為將任務存取及取消操作時間複雜度降為 O(1),會使用時間輪方案。
在自己實現RPC框架中,嘗試使用了時間輪的機制來實現心跳包發送部分。
在這裏插入圖片描述
什麼是時間輪

一種高效批量管理定時任務的調度模型。時間輪一般會實現成一個環形結構,類似一個時鐘,分為很多槽,一個槽代錶一個時間間隔,每個槽使用雙向鏈錶存儲定時任務。指針周期性地跳動,跳動到一個槽比特,就執行該槽比特的定時任務。
在這裏插入圖片描述
Dubbo 的時間輪實現比特於 dubbo-common 模塊的 org.apache.dubbo.common.timer 包中,如果感興趣的朋友可以深入閱讀下內部的源代碼設計與實現。

注册中心的引入

為了能够保證服務發布之後及時通知到各個服務的調用方,注册中心的設計必不可少。除此之外,注册中心的角色還能够較好地協調各個微服務調用之間的一些配置參數,例如權重,分組,版本隔離等等屬性。

在自己進行實現落地的過程中,我選擇了zookeeper作為默認的注册中心。為了方便後期的擴展,也是參考了Dubbo內部關於注册中心的實現思路,通過一個Registry的接口抽象,隨機擴展了一些模版類等等。大概的設計如下圖所示:
在這裏插入圖片描述
整體的服務注册接口代碼如下:

public interface RegistryService {
    


    /** * 注册url * * 將dubbo服務寫入注册中心節點 * 當出現網絡抖動的時候需要進行適當的重試做法 * 注册服務url的時候需要寫入持久化文件中 * * @param url */
    void register(URL url);

    /** * 服務下線 * * 持久化節點是無法進行服務下線操作的 * 下線的服務必須保證url是完整匹配的 * 移除持久化文件中的一些內容信息 * * @param url */
    void unRegister(URL url);

    /** * 消費方訂閱服務 * * @param urlStr * @param providerServiceName */
    void subscribe(String urlStr,String providerServiceName);

    /** * 更新節點屬性之後通知這裏 * * @param url */
    void doSubscribeAfterUpdate(URL url);


    /** * 新增節點之後通知這裏 * * @param url */
    void doSubscribeAfterAdd(URL url);


    /** * 執行取消訂閱內部的邏輯 * * @param url */
    void doUnSubscribe(URL url);
}

為了預防注册中心掛了之後,服務無法進行通信,每個通信節點都會將zk的服務注册節點信息提前預先持久化到本地進行暫存一份數據,從而保證一個服務的可用性。
在這裏插入圖片描述
在這裏插入圖片描述

負載均衡策略的實現

在集群進行調用的時候,不可避免會有負載均衡的問題,這塊的設計邏輯我參考了Dubbo的設計思路將其通過spi加載組件的方式進行框架的注入。

統一抽取了一個叫做LoadBalance的接口,然後底層實現了具體的負載均衡策略:

public class WeightLoadBalance implements LoadBalance {
    

    public static Map<String, URL[]> randomWeightMap = new ConcurrentHashMap<>();

    public static Map<String, Integer> lastIndexVisitMap = new ConcurrentHashMap<>();

    @Override
    public void doSelect(Invocation invocation) {
    
        URL[] weightArr = randomWeightMap.get(invocation.getServiceName());
        if (weightArr == null) {
    
            List<URL> urls = invocation.getUrls();
            Integer totalWeight = 0;
            for (URL url : urls) {
    
                //weight如果設置地過大,容易造成內存占用過高情况發生,所以weight統一限制最大大小應該為100
                Integer weight = Integer.valueOf(url.getParameters().get("weight"));
                totalWeight += weight;
            }
            weightArr = new URL[totalWeight];
            RandomList<URL> randomList = new RandomList(totalWeight);
            for (URL url : urls) {
    
                int weight = Integer.parseInt(url.getParameters().get("weight"));
                for (int i = 0; i < weight; i++) {
    
                    randomList.randomAdd(url);
                }
            }
            int len = randomList.getRandomList().size();
            for (int i = 0; i < len; i++) {
    
                URL url = randomList.getRandomList().get(i);
                weightArr[i] = url;
            }
            randomWeightMap.put(invocation.getServiceName(), weightArr);
        }
        Integer lastIndex = lastIndexVisitMap.get(invocation.getServiceName());
        if (lastIndex == null) {
    
            lastIndex = 0;
        }
        if (lastIndex >= weightArr.length) {
    
            lastIndex = 0;
        }
        URL referUrl = weightArr[lastIndex];
        lastIndex++;
        lastIndexVisitMap.put(invocation.getServiceName(), lastIndex);
        invocation.setReferUrl(referUrl);
    }

}

這裏面的負載均衡實現手段並不是實時計算的思路,而是提前隨機算好一組調用順序,然後每次請求的時候按照這個已經具備隨機性的數組進行挨個輪訓發送服務調用。

這樣可以避免每次請求過來都需要進行機器實時篩選計算的性能開銷。

SPI擴展機制的設計

其實Spi的加載實現部分的關鍵就是將一份配置文件按照規定格式寫好,然後通過某個loader對象將配置文件內部的每個類都提前加載到一份Map中進行管理。

下邊我給出一份自己手寫的簡單案例,但是不包含自適應spi加載和spi內部自動依賴注入的功能。

public class ExtensionLoader {
    

    /** * 存儲擴展spi的map,key是spi文件裏面寫入的key */
    private static Map<String, Class<?>> extensionClassMap = new ConcurrentHashMap<>();

    private static final String EXTENSION_LOADER_DIR_PREFIX = "META-INF/ietty/";

    public static  Map<String, Class<?>> getExtensionClassMap(){
    
        return extensionClassMap;
    }

    public void loadDirectory(Class clazz) throws IOException {
    
        synchronized (ExtensionLoader.class){
    
            String fileName = EXTENSION_LOADER_DIR_PREFIX + clazz.getName();
            ClassLoader classLoader = this.getClass().getClassLoader();
            Enumeration<URL> enumeration = classLoader.getResources(fileName);
            while (enumeration.hasMoreElements()) {
    
                URL url = enumeration.nextElement();
                InputStreamReader inputStreamReader = new InputStreamReader(url.openStream(), "utf-8");
                BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
                String line;
                while ((line = bufferedReader.readLine()) != null) {
    
                    if(line.startsWith("#")){
    
                        continue;
                    }
                    String[] keyClassInstance = line.split("=");
                    try {
    
                        extensionClassMap.put(keyClassInstance[0],Class.forName(keyClassInstance[1],true,classLoader));
                    } catch (ClassNotFoundException e) {
    
                        e.printStackTrace();
                    }

                }
            }
        }
    }

    public static <T>Object initClassInstance(String className) {
    
        if(extensionClassMap!=null && extensionClassMap.size()>0){
    
            try {
    
                return (T)extensionClassMap.get(className).newInstance();
            } catch (InstantiationException e) {
    
                e.printStackTrace();
            } catch (IllegalAccessException e) {
    
                e.printStackTrace();
            }
        }
        return null;
    }

}

底層通信組件

整套RPC框架的底層部分是采用了Netty組件進行實現的,主要的寫法其實和通用的netty編程沒有太大的差別,這裏我簡單貼出下代碼截圖吧:

客戶端:
在這裏插入圖片描述
服務端:
在這裏插入圖片描述

小結

可能整篇文章寫下來,很多的技術細節點和實現方式因為篇幅問題不能很好的展示出來。但是整體設計的幾個大難點以及難點的解决思路都基本貼出來了,希望能够對你有一定的啟發。

整個基礎中間件寫下來之後感覺頭發掉了不少,因為底層的細節點實在是太多了,不管是結構設計,數據並發問題,异步處理設計等諸多都需要考慮,所以感覺這是一件非常具有綜合挑戰性的事情。

版权声明
本文为[Danny_idea]所创,转载请带上原文链接,感谢
https://chowdera.com/2022/01/202201080559030021.html

随机推荐