Coup de Grace

新浪的rpc框架motan源码解读

想上手用一下发现最近手头没有集群用…是时候搞个pi cluster了.

所以欢迎看客到about页面找到微博私信我捐套房子什么的.我好攒钱买pi cluster.

项目地址:motan


模块依赖

我们可以看到demo-api里面有一个简单的接口,module -> show dependencies应该就可以轻易得到项目的模块依赖


絮絮叨叨

为什么我把这节放在第二部分说呢?因为初见者可能看到这节就感觉没意思Command+W了…

RPC框架最简易模型不外乎这几样东西:

涉及到细碎的点:

以上的部分是触手可及的大家能想到的部分,本篇有时间的话我会挑几个点念叨一下,没写的话那我可能是因为最近的腹泻耽误了宝贵的时间(斜眼笑..

普通文章一般会随着demo跟着代码调用进去,我的习惯一般不是这样.

个人来讲我是从工具类开始翻,翻完独立的部分再把粘合的部分一看就ok.

我们大体知道了实现思路之后按着module和文档进行代码阅读.


core-module

总pom managementd的部分全被core module依赖了,包括hessian/fastjson/metrics等.


com.weibo.api.motan.core.extension

定义了@Spi与@SpiMeta这样的注解,形如Spring bean一样的定义,内含一个scope,作用是开发用接口,对应api与元数据定义.

这样复杂的话,那么就前者是接口,后者是基本实现.

ExtensionLoader类跟日常的加载器一样,依照一定的规则读取package名+class名,之后使用指定的classloader加载.之后把实例跟id对应起来,存在Concurrent*Map中.

上面的部分在motan中是这样实现的:

读取的规则:

写死在ExtensionLoader中某个文件夹prefix,内含如下的部分,文件内是待load的类的完整package.

加载的部分:

实现一如SpringFactoriesLoader.


com.weibo.api.motan.rpc

定义了一些接口对应RPC的模型

public interface Node {
    void init();
    void destroy();
    boolean isAvailable();
    String desc();
    URL getUrl();
}

在基本属性上实现了上图的部分,看名字大家应该就知道大概意思.

好奇的话我们看一下AbstractProvider,较之Node接口它新增了维护实例内method列表的部分.而实现类DefaultProvider实现了invoke方法,负责调用与response构造,设置了res基本value与附件.

上图中AbstractReferer跟AbstractExpoter则代表着服务发布与引用的抽象,二者的实现都封装在protocol层面成为了内部类,以一个Factory的形式提供出来.

于是我们逃离了这个package.


com.weibo.api.motan.transport

用来进行远程通信,默认使用Netty nio的TCP长链接方式。

抽象了具有调用地址的client与server对象,其中client还具有心跳检测的功能,模拟真实request来实现服务可用性的检测.

support包中包含着EndpointFactory进行server与client的创建,内部维护着server列表,这是我们就可以回忆起rpc调用的分层:

本次实现简化了一下并没有记录角色,而是给出了客户端的计数.

server中msgHandler对于心跳检测的情况进行了一次包装,使response的value具有一些标志.

client的创建则是一个Netty的Bootstrap,附上了创建时HeartbeatClientEndpointManager对其心跳进行管理,置入线程池进行调用.


com.weibo.api.motan.protocol

用来进行RPC服务的描述和RPC服务的配置管理,这一层还可以添加不同功能的filter用来完成统计、并发限制等功能。

@Spi(scope = Scope.SINGLETON)
public interface Protocol {
    /**
     * 暴露服务
     */
    <T> Exporter<T> export(Provider<T> provider, URL url);
    /**
     * 引用服务
     */
    <T> Referer<T> refer(Class<T> clz, URL url, URL serviceUrl);
    void destroy();
}

此处分为injvm的本地服务调用跟rpc的远程调用.injvm的形式实际就是直接操作AbstractProtocol内的map来获取实例.

有代表性的InjvmReferer实现如下:

我们直接看rpc的部分

endpointFactory = ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(url.getParameter(URLParamType.endpointFactory.getName(),URLParamType.endpointFactory.getValue()));

server = endpointFactory.createServer(url, requestRouter);

server(extends AbstractReferer)通过上文的spi加载进行创建,同时创建了map维护端口与服务的对应关系,kv分别是端口与服务路由器

client(extends AbstractReferer)的创建同理

ProviderMessageRouter requestRouter = null;
String ipPort = url.getServerPortStr();

ProviderMessageRouter中根据 group/interface/version 来唯一标示一个服务,而后通过provider.call来进行服务实例方法的调用.

而传输前的filter在support包中,我们可以看到已有的filter如下:

我们也可以通过ProtocolFilterDecorator来进行拓展,可以看到内部获取filter的实现是这样的:

// load default filters
List<Filter> filters = new ArrayList<Filter>();
List<Filter> defaultFilters = ExtensionLoader.getExtensionLoader(Filter.class).getExtensions(key);
if (defaultFilters != null && defaultFilters.size() > 0) {
    filters.addAll(defaultFilters);
}

// add filters via "filter" config
String filterStr = url.getParameter(URLParamType.filter.getName());
if (StringUtils.isNotBlank(filterStr)) {
    String[] filterNames = MotanConstants.COMMA_SPLIT_PATTERN.split(filterStr);
    for (String fn : filterNames) addIfAbsent(filters, fn);
}

于是这两个package完成,我们随着流程图进入register与cluster的部分


com.weibo.api.motan.cluster

Client端使用的模块,cluster是一组可用的Server在逻辑上的封装,包含若干可以提供RPC服务的Server,实际请求时会根据不同的高可用与负载均衡策略选择一个可用的Server发起远程调用。

在进行RPC请求时,Client通过代理机制调用cluster模块,cluster根据配置的HA和LoadBalance选出一个可用的Server,通过serialize模块把RPC请求转换为字节流,然后通过transport模块发送到Server端。

private HaStrategy<T> haStrategy;//高可用策略
private LoadBalance<T> loadBalance;//负载均衡算法
private List<Referer<T>> referers;//对服务的依赖
private AtomicBoolean available = new AtomicBoolean(false);
private URL url;//自身节点地址

@Override
void init();
void setUrl(URL url);
void setLoadBalance(LoadBalance<T> loadBalance);
void setHaStrategy(HaStrategy<T> haStrategy);
void onRefresh(List<Referer<T>> referers);
List<Referer<T>> getReferers();
LoadBalance<T> getLoadBalance();

作为集群节点来讲它具有这样的属性与方法,属性部分我加上了注释.

负载均衡算法的部分它有着如下的实现,经典的随机啊,轮询啊,一致hash啊,权重啊都有.

Localfirst相对来讲比较适合单机开发测试使用了我觉得.

比较期待的最低负载之类的耦合比较高的并没有给出默认实现.

LoadBalance中比较重要的方法就两个

这里我们拿比较典型的一致性哈希的select进行举例:

@Override
protected Referer<T> doSelect(Request request) {
    int hash = getHash(request);
    Referer<T> ref;
    for (int i = 0; i < getReferers().size(); i++) {
        ref = consistentHashReferers.get((hash + i) % consistentHashReferers.size());
        if (ref.isAvailable()) return ref;
    }
    return null;
}

private int getHash(Request request) {
    if (request.getArguments() == null || request.getArguments().length == 0) return request.hashCode();
    else return Arrays.hashCode(request.getArguments());
}

很简单是不是..

高可用策略方面默认实现了failfast与failover,亟待开发在此基础上拓展跟业务结合更深的策略出来.

此处的failover是hold住了调用的时候所有符合条件的服务实例,超过重试次数了再换一个去试.比较核心的代码如下:

for (int i = 0; i <= tryCount; i++) {
    Referer<T> refer = referers.get(i % referers.size());
    try {
        request.setRetries(i);
        return refer.call(request);
    } catch (RuntimeException e) {
        // 对于业务异常,直接抛出
        if (ExceptionUtil.isBizException(e)) throw e;
        else if (i >= tryCount) throw e;
        LoggerUtil.warn(xxx));
    }
}

也是干净清爽恰到好处.


com.weibo.api.motan.registry

用来和注册中心进行交互,包括注册服务、订阅服务、服务变更通知、服务心跳发送等功能;

我们通过常识可知需要点对点调用,心跳机制来保证可用性,服务发现之类的功能.

下面来瞄一下以上部分的实现.

面向服务端与客户端我们有这样的需求:

抽象成接口就变成了DiscoveryService与RegistryService

而注册中心基于基于zk和consul有两套实现在其他module中,此处是抽象,我们后文再谈.

AbstractRegistry实现了上述全部的接口,作为一个模板方法扩展了如下部分供zk跟consul实现节点发现与配置推送共享,供接口内方法调用:

protected abstract void doRegister(URL url);
protected abstract void doUnregister(URL url);
protected abstract void doSubscribe(URL url, NotifyListener listener);
protected abstract void doUnsubscribe(URL url, NotifyListener listener);
protected abstract List<URL> doDiscover(URL url);
protected abstract void doAvailable(URL url);
protected abstract void doUnavailable(URL url);

相对比较核心的实现:

protected void notify(URL refUrl, NotifyListener listener, List<URL> urls) {
    Map<String, List<URL>> nodeTypeUrlsInRs = new HashMap<String, List<URL>>();
    for (URL surl : urls) {
        //此处我们node type有service服务方与referer订阅方两种,此处都是作为订阅者出现    
        String nodeType = surl.getParameter(URLParamType.nodeType.getName(), URLParamType.nodeType.getValue());
        List<URL> oneNodeTypeUrls = nodeTypeUrlsInRs.get(nodeType);
        if (oneNodeTypeUrls == null) {
            nodeTypeUrlsInRs.put(nodeType, new ArrayList<URL>());
            oneNodeTypeUrls = nodeTypeUrlsInRs.get(nodeType);
        }
        oneNodeTypeUrls.add(surl);
    }
    Map<String, List<URL>> curls = subscribedCategoryResponses.get(refUrl);
    if (curls == null) {
        subscribedCategoryResponses.putIfAbsent(refUrl, new ConcurrentHashMap<String, List<URL>>());
        curls = subscribedCategoryResponses.get(refUrl);
    }

    // refresh local urls cache
    for (String nodeType : nodeTypeUrlsInRs.keySet()) {
        curls.put(nodeType, nodeTypeUrlsInRs.get(nodeType));
    }

    for (List<URL> us : nodeTypeUrlsInRs.values()) {
        //向注册中心通知当前服务地址列表
        listener.notify(getUrl(), us);
    }
}

我不知道是不是所有框架都是这么实现的…这部分我读起来不太舒服. 回头有时间会看看dubbo.


com.weibo.api.motan.serialize

封装了fastjson跟hessian2,这部分就不提了.


springsupport-module

这部分之前我没有实现过,所以短平快的介绍一下顺便自己摘抄一下

通过继承NamespaceHandlerSupport获得registerBeanDefinitionParser的能力,通过自定义parser解析配置文件将属性灌入bean并维护在容器中,完成框架的启动.

闲话,所谓的优雅关闭是不是都是用shutdownhook实现的…


etc

motan manager中则是web控制台,操作service完成一些黑话版(导流啊,降级啊之类的)的功能,此处不做介绍了.依赖其他中间件的部分也暂且不提.

鉴于手头没有集群,我的13 late的机器多跑几个docker容器也要爆炸,姑且还没有实战去使用一下.

或许以后会进行一次配置,顺便熟悉一下consul的使用.

完结撒花

以上.