VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Java教程 >
  • 从零开始实现简单 RPC 框架 1:RPC 框架的结构和设计

前言

RPC 框架是后端攻城狮永远都绕不开的知识点,目前业界比较知名有 DubboSpring Cloud 等。很多人都停留在了只会用的阶段,作为程序猿,拥有好奇心深入学习,才能有效提高自己的竞争力。再进一层的同学,会去翻源码,看功能是如何实现的,这是很好的开始。看源码过一段时间容易忘记,我觉得看完源码之后,更好的做法是自己动手开发一个出来,这样你对框架的理解会更深。我认为,"会用"、"会读源码"、"会写出来"是完全不一样的水平。
本系列 "造轮子系列之RPC",手把手教大家如何打造自己的RPC框架。
以下是我个人写的简单版 RPC 框架 ccx-rpc 的源码,欢迎 Star、Fork。水平有限,大家有更好的想法可以提出来。
Github:https://github.com/chenchuxin/ccx-rpc
Gitee:https://gitee.com/imccx/ccx-rpc

RPC 框架的结构

一个最简单的 RPC 框架分成三个部分:注册中心、服务端、客户端。以下是一个最简单的结构流程图。
RPC框架最简单的结构
组成部分:

  1. 注册中心:用于注册和获取服务。
  2. 服务端:指提供服务的一方,也叫服务提供方 Provider
  3. 客户端:指调用服务的一方,也叫服务消费者 Consumer

流程:

  1. 服务端把服务信息注册到注册中心,通常包含服务端地址、接口类和方法
  2. 客户端从注册中心获取对应服务的信息
  3. 客户端根据服务的信息,通过网络调用到服务端的接口

RPC 框架的设计

上面的流程有很多细节没有画出来,例如:

  1. 服务端以什么形式注册到注册中心?
  2. 客户端是怎么做到像调用接口一样调用服务?
  3. 调用服务的网络协议是怎样的?

一个基本的 RPC 框架,需要包含以下部分:

  1. 注册中心:注册中心负责服务信息的注册与查找。服务端在启动的时候,扫描所有的服务,然后将自己的服务地址和服务名注册到注册中心。客户端在调用服务之前,通过注册中心查找到服务的地址,就可以通过服务的地址调用到服务啦。常见的注册中心有 ZookeeperEureka 等。
  2. 动态代理:客户端调用接口,需要框架能自己根据接口去远程调用服务,这一步是用户无感知的。这样一来,就需要使用到动态代理,用户调用接口,实际上是在调用动态生成的代理类。常见的动态代理有:JDK ProxyCGLibJavassist 等。
  3. 网络传输:RPC 远程调用实际上就是网络传输,所以网络传输是 RPC 框架中必不可少的部分。网络框架有 Java NIONetty 框架等。
  4. 自定义协议:网络传输需要制定好协议,一个良好的协议能提高传输的效率。
  5. 序列化:网络传输肯定会涉及到序列化,常见的序列化有JsonProtostuffKyro 等。
  6. 负载均衡:当请求调用量大的时候,需要增加服务端的数量,一旦增加,就会涉及到符合选择服务的问题,这就是负载均衡。常见的负载均衡策略有:轮询、随机、加权轮询、加权随机、一致性哈希等等。
  7. 集群容错:当请求服务异常的时候,我们是应该直接报错呢?还是重试?还是请求其他服务?这个就是集群容错策略啦。

代码实现概览#

下面我们从代码的角度上,来看看以上几部分是如何组织的:

服务注册、监听#

1. 扫描服务#

服务要注册到注册中心,第一步是需要扫描到需要注册的接口。
我们通过 Spring 的 BeanPostProcessor#postProcessBeforeInitialization,将带有 @RpcService 注解的接口进行发布。

@Component
public class ServiceBeanPostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
        // rpc 服务发布到注册中心
        if (rpcService != null) {
            RegistryFactory registryFactory = ExtensionLoader.getLoader(RegistryFactory.class).getAdaptiveExtension();
            RegistryConfig registryConfig = ConfigManager.getInstant().getRegistryConfig();
            Registry registry = registryFactory.getRegistry(registryConfig.toURL());
            registry.register(buildServiceURL(bean, rpcService));
        }
        return bean;
    }
}

2. 注册中心#

服务注册最终的表现就是:把服务信息注册到注册中心中。
根据注册中心的特性,可以抽出一个接口 Registry ,包含了注册、取消注册、查找服务的方法。
通过实现 Registry 接口,可以扩展出多种类型的注册中心。

public interface Registry {

    /**
     * 向注册中心注册服务
     */
    void register(URL url);

    /**
     * 向注册中心取消注册服务
     */
    void unregister(URL url);

    /**
     * 查找注册的服务
     */
    List<URL> lookup(URL condition);
}

3. 监听#

RPC 的请求响应本质上是网络请求,作为服务方,需要开启端口监听客户端的请求。
Netty 是目前最流行的网络开发框架。

@Component
public class NettyServerBootstrap {

    public void start() {
        ShutdownHook.addShutdownHook();
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
                RuntimeUtil.getProcessorCount() * 2,
                ThreadUtil.newNamedThreadFactory("service-handler-group", false)
        );
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            // 编解码器
                            p.addLast(new RpcMessageEncoder());
                            p.addLast(new RpcMessageDecoder());
                            // RPC 消息处理器
                            p.addLast(serviceHandlerGroup, new NettyServerHandler());
                        }
                    });
            // 绑定端口,同步等待绑定成功
            ServiceConfig serviceConfig = ConfigManager.getInstant().getServiceConfig();
            ChannelFuture channelFuture = bootstrap.bind(NetUtil.getLocalHostName(), serviceConfig.getPort()).sync();
            log.info("server start success. port=" + serviceConfig.getPort());
            // 等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Exception ex) {
            log.error("shutdown bossGroup and workerGroup");
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

客户端发现、请求#

1. 扫描#

客户端要是用 RPC 接口,首先要用 @RpcReference 注解标出。
通过 Spring 的 BeanPostProcessor#postProcessAfterInitialization 初始化 Bean 之后,生成代理类。
调用接口的时候,这个代理类,就会在背地里偷偷找到服务,并请求到结果返回。

public class ServiceBeanPostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Field[] fields = bean.getClass().getDeclaredFields();
        for (Field field : fields) {
            RpcReference rpcReference = field.getAnnotation(RpcReference.class);
            if (rpcReference != null) {
                // 生成代理对象
                RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcReference);
                Object proxy = rpcClientProxy.getProxy(field.getType());
                field.setAccessible(true);
                try {
                    // 设置字段
                    field.set(bean, proxy);
                } catch (IllegalAccessException e) {
                    log.error("field.set error. bean={}, field={}", bean.getClass(), field.getName(), e);
                }
            }
        }
        return bean;
    }
}

2. 服务发现#

客户端要请求服务,首先需要找到服务对应的域名/IP 和 端口,这个过程就是服务发现。
服务发现就是从注册中心找到对应服务的地址,上面注册中心的接口有提供对应的方法。

public interface Registry {
	// ... 省略其他代码

    /**
     * 查找注册的服务
     */
    List<URL> lookup(URL condition);
}

3. 负载均衡#

从注册中心找到的地址可能是多个,那我们如何从多个地址中选择一个地址,这就是负载均衡。
负载均衡抽象出一个接口 LoadBalance ,方法只有一个,就是选择 select

public interface LoadBalance {

    /**
     * 选择
     *
     * @param candidateUrls 候选的 URL
     * @param request       请求
     * @return 选择的 URL
     */
    URL select(List<URL> candidateUrls, RpcRequest request);
}

使用方法如下:

// 注册中心拿出所有服务的信息
List<URL> urls = registry.lookup(url);
// 通过负载均衡选出一个地址
URL selected = loadBalance.select(urls, request);

4. 集群容错#

当请求服务失败之后,应该如何处理?重试?快速失败?这个就是集群容错策略啦。我们来简单看一下重试策略吧。

public class RetryInvoker extends AbstractFaultTolerantInvoker {

    /**
     * 默认重试次数
     */
    private static final Integer DEFAULT_RETRY_TIMES = 3;

    @Override
    protected RpcResult doInvoke(RpcRequest request, Invoker invoker, List<URL> candidateUrls, LoadBalance loadBalance) throws RpcException {
        // 获取重试次数
        int retryTimes = Optional.ofNullable(clusterConfig.getRetryTimes()).orElse(DEFAULT_RETRY_TIMES);
        RpcException rpcException = null;
        for (int i = 0; i < retryTimes; i++) {
            try {
                // 执行,如果成功则返回结果,失败继续尝试
                RpcResult result = invoker.invoke(request);
                if (result.isSuccess()) {
                    return result;
                }
            } catch (RpcException ex) {
                log.error("invoke error. retry times=" + i, ex);
                rpcException = ex;
            }
        }
        if (rpcException == null) {
            rpcException = new RpcException("invoker error. request=" + request);
        }
        throw rpcException;
    }
}

网络传输#

1. 序列化#

网络传输不可获取的就是序列化,序列化就是怎么把一个对象的状态信息转化为可以存储或传输的形式的过程。我们常见的序列化方式有JSONProtobuf等等。
序列化和反序列化是一对,共同组成序列化器。

public interface Serializer {

    /**
     * 序列化
     *
     * @param object 要序列化的对象
     * @return 字节数组
     */
    byte[] serialize(Object object);

    /**
     * 反序列化
     *
     * @param bytes 字节数组
     * @param clazz 要反序列化的类
     * @param <T>   类型
     * @return 反序列化的对象
     */
    <T> T deserialize(byte[] bytes, Class<T> clazz);
}

2. 自定义协议#

网络传输中,收发两端如何正确解析请求,统一的协议是必不可少的。
在 Netty 中的表现就是编码解码器 codec。下面是 ccx-rpc 的自定义协议。可以简单看一下,后面再仔细讲解哈。

0     1     2       3    4    5    6    7           8        9        10   11   12   13   14   15   16   17   18
+-----+-----+-------+----+----+----+----+-----------+---------+--------+----+----+----+----+----+----+----+---+
|   magic   |version|    full length    |messageType|serialize|compress|              RequestId               |
+-----+-----+-------+----+----+----+----+-----------+----- ---+--------+----+----+----+----+----+----+----+---+
|                                                                                                             |
|                                         body                                                                |
|                                                                                                             |
|                                        ... ...                                                              |
+-------------------------------------------------------------------------------------------------------------+
2B magic(魔法数)
1B version(版本)
4B full length(消息长度)
1B messageType(消息类型)
1B serialize(序列化类型)
1B compress(压缩类型)
8B requestId(请求的Id)
body(object类型数据)

总结

RPC 的组成包括: 注册中心、动态代理、网络传输、自定义协议、序列化、负载均衡、集群容错等等。
想要深入了解,先要知道他们是怎么组合运作的,其简单的运作都在上面提到了。
更加细节的代码实现,将会在接下来的文章中详细讲解,希望各位赏脸一看。

作者:小新是也

出处:https://www.cnblogs.com/chenchuxin/p/15116332.html

版权:本作品采用「署名-非商业性使用-相同方式共享 4.0 国际」许可协议进行许可。



相关教程