基于Netty的高性能RPC框架Nifty(一)

服务端启动全解析 · · 99 次点击 · · 开始浏览    

1. 前言

Thrift是Facebook贡献给apache的rpc框架,但是这款框架的java版本在公司内部并不是那么受待见,因为其性能相比C++版本差了很多,但是后续基于netty重写了以后性能得到了极大的提升,相比于C++版本已经差距不大了。为此取了个新的名字Nifty = Netty + Thrift。

如果你使用过thrift的话,基本都会使用自动生成的代码,那真的是没法看,即使定义一个简单的类都会生成巨多的代码,把read,write方法全部写到里面去了。总之早期的thrfit各方面都似乎不那么友好。后面架构进行了升级,提供了新的swift库,注意这个不是ios的swift,从而生成的java类和普通的java类基本一致,无非多了点注解,而序列化反序列化也都移到了相应的包中,从而使得我们的代码非常简洁易懂。

其实这款rpc框架的性能是非常不错的,早几年性能是好过grpc的,目前小米还是在用的。这款框架很轻量,即不提供服务治理的功能。如果公司规模不大急需做功能,暂时没精力去做服务治理的话可能还是会选择dubbo等带服务治理功能的rpc框架。但是恰恰是thrift不提供服务治理,这样公司可以自己去定义服务治理的功能。

目前不管是书籍还是博客等关于thrift的都是少之又少,最近突然爱学习了,所以打算写一系列thrift相关的博客,关于使用基本不会介绍的过多,因为基本使用也就十几行代码,主要是介绍内部处理逻辑。具体的包括Thrift框架分析,netty框架分析,分布式服务治理等三个方面。

为了方便后续统称为Thrift而不是Nifty,因为很多代码还是沿用的Thrift。

2. Thrfit服务端创建与核心组件介绍

EchoServiceImpl service = new EchoServiceImpl();
ThriftCodecManager manager = new ThriftCodecManager();
ThriftServiceProcessor processor = new ThriftServiceProcessor(manager, ImmutableList.of(), service);
ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081));
server.start();
复制代码

其中EchoServiceImpl是使用swift代码生成器生成的接口的实现类,有个echo方法,简单说下这几个组件。

  1. ThriftCodecManager:编解码器的管理类,会将各类编解码器,比如StringCodec添加到缓存中。
  2. ThriftServiceProcessor:thrift服务处理器,服务端收到数据后,最终将由这个类来进行处理,可以理解为最核心的类了。
  3. ThriftServer: thrift服务器,主要用来设置参。启动服务,说具体点就是设置好netty的一些处理器等参数,然后启动netty服务。
  4. ThriftServerConfig: Thrfit服务启动的一些配置类,包括了端口号,线程池,线程数量等。

3. 服务端启动流程极简介绍

从上面的组件分析是不是能猜到一点thrift的内部处理流程。简单两句话就是,创建各类自定义处理器handler,添加到netty的处理器集合中,然后启动netty服务。当收到客户端发来的数据后,交由特定处理器进行数据的处理,根据协议和编解码器从buffer中进行数据的解析和转换,最终得到类名,方法的参数和方法名等(各类信息都能解析到);从ThriftServicProcessor查到Method,传入方法参数,反射执行得到结果,然后将结果通过netty响应给客户端。

4. 服务端启动全解析

先从ThriftServer创建和启动来看,了解总体的流程,后续再回过头来看各个组件的处理流程。

ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081));

public ThriftServer(NiftyProcessor processor, ThriftServerConfig config){
    this(processor, config, new NiftyTimer("thrift"));
}
复制代码

继续往下,追到最终的构造方法

public ThriftServer(
        final NiftyProcessor processor,
        ThriftServerConfig config,
        @ThriftServerTimer Timer timer,
        Map<String, ThriftFrameCodecFactory> availableFrameCodecFactories,
        Map<String, TDuplexProtocolFactory> availableProtocolFactories,
        @ThriftServerWorkerExecutor Map<String, ExecutorService> availableWorkerExecutors,
        NiftySecurityFactoryHolder securityFactoryHolder){
    
    NiftyProcessorFactory processorFactory = new NiftyProcessorFactory(){
        @Override
        public NiftyProcessor getProcessor(TTransport transport)
        {
            return processor;
        }
    };

    String transportName = config.getTransportName();
    String protocolName = config.getProtocolName();

    checkState(availableFrameCodecFactories.containsKey(transportName), "No available server transport named " + transportName);
    checkState(availableProtocolFactories.containsKey(protocolName), "No available server protocol named " + protocolName);

    configuredPort = config.getPort();

    workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors);

    acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build());
    acceptorThreads = config.getAcceptorThreadCount();
    ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build());
    ioThreads = config.getIoThreadCount();

    serverChannelFactory = new NioServerSocketChannelFactory(new NioServerBossPool(acceptorExecutor, acceptorThreads, ThreadNameDeterminer.CURRENT),
                                                             new NioWorkerPool(ioExecutor, ioThreads, ThreadNameDeterminer.CURRENT));

    ThriftServerDef thriftServerDef = ThriftServerDef.newBuilder()
                                                     .name("thrift")
                                                     .listen(configuredPort)
                                                     .limitFrameSizeTo((int) config.getMaxFrameSize().toBytes())
                                                     .clientIdleTimeout(config.getIdleConnectionTimeout())
                                                     .withProcessorFactory(processorFactory)
                                                     .limitConnectionsTo(config.getConnectionLimit())
                                                     .limitQueuedResponsesPerConnection(config.getMaxQueuedResponsesPerConnection())
                                                     .thriftFrameCodecFactory(availableFrameCodecFactories.get(transportName))
                                                     .protocol(availableProtocolFactories.get(protocolName))
                                                     .withSecurityFactory(securityFactoryHolder.niftySecurityFactory)
                                                     .using(workerExecutor)
                                                     .taskTimeout(config.getTaskExpirationTimeout())
                                                     .withSecurityFactory(config.getSecurityFactory())
                                                     .withHeader(config.getHeader())
                                                     .withServerHandler(config.getNiftyServerHandler())
                                                     .build();

    NettyServerConfigBuilder nettyServerConfigBuilder = NettyServerConfig.newBuilder();

    nettyServerConfigBuilder.getServerSocketChannelConfig().setBacklog(config.getAcceptBacklog());
    nettyServerConfigBuilder.setBossThreadCount(config.getAcceptorThreadCount());
    nettyServerConfigBuilder.setWorkerThreadCount(config.getIoThreadCount());
    nettyServerConfigBuilder.setTimer(timer);

    NettyServerConfig nettyServerConfig = nettyServerConfigBuilder.build();

    transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels);
}
复制代码

构造方法简单来说主要做了两件事:

  1. 创建和获取netty需要的连接线程池和io处理线程池以及数量,从而构建netty服务组件NioServerSocketChannelFactory
  2. 构建netty服务的配置类nettyServerConfig, thrift服务的配置类ThriftServerDef

我们再来看下细节点的东西,思考一些问题,如果不感兴趣可以跳过。

工作线程池workerExecutor的创建:

workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors);
复制代码

默认传进来的availableWorkerExecutors是空的,所以最终是构建一个新的线程池,

最终调用的方法是makeDefaultWorkerExecutor,下面的代码稍微简化了一点。

  • 默认得到的就是个无界的队列;
  • 如果你需要构建有限容量队列的线程池,可以在创建config后调用setMaxQueuedRequests来设置队列容量
  • 超出队列容量后将执行线程池拒绝策略(throw RejectedExecutionException)
  • 默认核心线程数和最大线程数是一样的,数值为200;
  • 使用guava提供的ThreadFactoryBuilder来构建线程工厂,主要是取个容易理解的名字;在thrift中大量使用了guava提供的工具类。
private ExecutorService makeDefaultWorkerExecutor(){
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    return new ThreadPoolExecutor(getWorkerThreads(),
                                  getWorkerThreads(),
                                  0L,
                                  TimeUnit.MILLISECONDS,
                                  queue,
                                  new ThreadFactoryBuilder().setNameFormat("thrift-worker-%s").build(),
                                  new ThreadPoolExecutor.AbortPolicy());
}
复制代码

netty的连接线程池和io线程池创建

acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build());
acceptorThreads = config.getAcceptorThreadCount();
ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build());
ioThreads = config.getIoThreadCount();
复制代码
  • 该线程池创建的方法是netty提供的,看名字是不是像无界队列的线程池呢?
  • 连接数量的限定是1,io线程数量的限定默认是电脑核数*2,但都是可以在配置类中指定的。

关于netty配置类NettyServerConfig的构建nettyServerConfigBuilder.build(), 在之前设置了连接线程的线程数和io线程池线程数以及定时器timer,调用build后如下:

public NettyServerConfig build() {
    Timer timer = getTimer();
    ExecutorService bossExecutor = getBossExecutor();
    int bossThreadCount = getBossThreadCount();
    ExecutorService workerExecutor = getWorkerExecutor();
    int workerThreadCount = getWorkerThreadCount();

    return new NettyServerConfig(
            getBootstrapOptions(),
            timer != null ? timer : new NiftyTimer(threadNamePattern("")),
            bossExecutor != null ? bossExecutor : buildDefaultBossExecutor(),
            bossThreadCount,
            workerExecutor != null ? workerExecutor : buildDefaultWorkerExecutor(),
            workerThreadCount
    );
}

private ExecutorService buildDefaultBossExecutor() {
    return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-boss-%s")));
}
private ExecutorService buildDefaultWorkerExecutor() {
    return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-worker-%s")));
}
复制代码

由于没有设置两个线程池,所以会设置默认的线程池,注意这里一个是Boss线程池一个是Worker线程池。 其中的timer在后续构建netty处理器的时候会多次用到。

allChannels是一个netty提供的channelGroup:

private final DefaultChannelGroup allChannels = new DefaultChannelGroup();
复制代码

在介绍后续流程前,先了解下netty服务端创建步骤,因为这里用的是netty3,和我们比较熟悉的netty4差距有点大,可以对比着看下。

ChannelFactory factory = new NioServerSocketChannelFactory(
                 Executors.newCachedThreadPool(),
                 Executors.newCachedThreadPool(),
         );
ServerBootstrap bootstrap = new ServerBootstrap(factory);

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
 public ChannelPipeline getPipeline() throws Exception {
     ChannelPipeline pipeline = Channels.pipeline();
     pipeline.addLast("handler1", new Handler1());
     return pipeline;
 }
});

Channel channel = bootstrap.bind(new InetSocketAddress(8081));
复制代码

构建NettyServerTransport的时候,在构造方法里面就是进行netty的一些设置。

transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels);
复制代码

在这里面主要是进行一些对象成员变量的设置,最后也是最重要的就是构建ChannelPipelineFactory,在其中设置各种处理器,大部分继承自SimpleChannelUpstreamHandler,少部分继承自ChannelDownstreamHandler,可以类比netty4ChannelInboundHandler,ChannelOutboundHandler

this.pipelineFactory = new ChannelPipelineFactory(){
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline cp = Channels.pipeline();
        // 设置处理器handler
        return cp;
    }
}
复制代码

我们来看有哪些处理器。

  1. ConnectionLimiter: 连接限流器, 创建的时候需要指定最大连接数,以及初始值为0的一个计数器;每次建立连接的时候计数器数值 + 1,关闭的时候数值 - 1;当连接数达到上限,则关闭通道,即channel。
  2. ChannelStatistics: 传入allChannels来构建对象,内部持有一个channelCount = 0,来统计建立通道channel数, 每次建立连接接受到数据的时候channelCount + 1,提供了get方法来获取channelCount;同时将channel加入到allChannels中。
  3. 编解码处理器DefaultThriftFrameCodec,每次收到的数据和传出的数据都需要进行一次编码或者解码。
  4. 连接的上下文处理器ConnectionContextHandler,在建立连接的时候创建NiftyConnectionContext,连接的上下文环境,包含了removeAddress和属性map,绑定到ChannelHandlerContext,即ctx.setAttachment(context);
  5. netty提供的关于超时处理器IdleStateHandlerIdleDisconnectHandler
  6. NiftyDispatcher,这个处理器最为核心,收到buffer经过解码后数据就传到了该处理器,该处理器会对数据进行一系列的处理和方法的调用等。
  7. 异常事件处理器NiftyExceptionLogger,重写了log方法,如果出现异常事件,该处理器会打印响应的异常日志。

来一览处理过程吧

public NettyServerTransport(final ThriftServerDef def, final NettyServerConfig nettyServerConfig, final ChannelGroup allChannels){
        this.def = def;
        this.nettyServerConfig = nettyServerConfig;
        this.port = def.getServerPort();
        this.allChannels = allChannels;
        final ConnectionLimiter connectionLimiter = new ConnectionLimiter(def.getMaxConnections());

        this.channelStatistics = new ChannelStatistics(allChannels);

        this.pipelineFactory = new ChannelPipelineFactory()
        {
            @Override
            public ChannelPipeline getPipeline()
                    throws Exception
            {
                ChannelPipeline cp = Channels.pipeline();
                TProtocolFactory inputProtocolFactory = def.getDuplexProtocolFactory().getInputProtocolFactory();
                NiftySecurityHandlers securityHandlers = def.getSecurityFactory().getSecurityHandlers(def, nettyServerConfig);
                cp.addLast("connectionContext", new ConnectionContextHandler());
                cp.addLast("connectionLimiter", connectionLimiter);
                cp.addLast(ChannelStatistics.NAME, channelStatistics);
                cp.addLast("frameCodec", def.getThriftFrameCodecFactory().create(def.getMaxFrameSize(),
                                                                                 inputProtocolFactory));
                if (def.getClientIdleTimeout() != null) {
                    // Add handlers to detect idle client connections and disconnect them
                    cp.addLast("idleTimeoutHandler", new IdleStateHandler(nettyServerConfig.getTimer(),
                                                                          def.getClientIdleTimeout().toMillis(),
                                                                          NO_WRITER_IDLE_TIMEOUT,
                                                                          NO_ALL_IDLE_TIMEOUT,
                                                                          TimeUnit.MILLISECONDS));
                    cp.addLast("idleDisconnectHandler", new IdleDisconnectHandler());
                }
              
                cp.addLast("dispatcher", new NiftyDispatcher(def, nettyServerConfig.getTimer()));
                cp.addLast("exceptionLogger", new NiftyExceptionLogger());
                return cp;
            }
        };
    }
复制代码

我们再回到server.start(),看下ThriftServer如何启动的

ThriftServer: 初始状态是NOT_STARTED

public synchronized ThriftServer start(){
    checkState(state != State.CLOSED, "Thrift server is closed");
    if (state == State.NOT_STARTED) {
        transport.start(serverChannelFactory);
        state = State.RUNNING;
    }
    return this;
}
复制代码

NettyServerTransport: 标准的netty服务端创建过程,其中pipelineFactory就是在前面NettyServerTransport构造方法中所创建的。

public void start(ServerChannelFactory serverChannelFactory){
    bootstrap = new ServerBootstrap(serverChannelFactory);
    bootstrap.setOptions(nettyServerConfig.getBootstrapOptions());
    bootstrap.setPipelineFactory(pipelineFactory);

    serverChannel = bootstrap.bind(new InetSocketAddress(port));
    // ... 
}
复制代码

到这里Thrift的服务端启动就介绍完了,下一部分将会介绍服务端接受数据,处理数据和响应结果的流程。

本文来自:服务端启动全解析

感谢作者:服务端启动全解析

查看原文:基于Netty的高性能RPC框架Nifty(一)

99 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传