VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Java教程 >
  • 基于大量图片与实例深度解析Netty中的核心组件

本篇文章主要详细分析Netty中的核心组件。

启动器Bootstrap和ServerBootstrap作为Netty构建客户端和服务端的路口,是编写Netty网络程序的第一步。它可以让我们把Netty的核心组件像搭积木一样组装在一起。在Netty Server端构建的过程中,我们需要关注三个重要的步骤

  • 配置线程池
  • Channel初始化
  • Handler处理器构建

调度器详解#

前面我们讲过NIO多路复用的设计模式之Reactor模型,Reactor模型的主要思想就是把网络连接、事件分发、任务处理的职责进行分离,并且通过引入多线程来提高Reactor模型中的吞吐量。其中包括三种Reactor模型

  • 单线程单Reactor模型
  • 多线程单Reactor模型
  • 多线程多Reactor模型

在Netty中,可以非常轻松的实现上述三种线程模型,并且Netty推荐使用主从多线程模型,这样就可以轻松的实现成千上万的客户端连接的处理。在海量的客户端并发请求中,主从多线程模型可以通过增加SubReactor线程数量,充分利用多核能力提升系统吞吐量。

Reactor模型的运行机制分为四个步骤,如图2-10所示。

  • 连接注册,Channel建立后,注册到Reactor线程中的Selector选择器
  • 事件轮询,轮询Selector选择器中已经注册的所有Channel的I/O事件
  • 事件分发,为准备就绪的I/O事件分配相应的处理线程
  • 任务处理,Reactor线程还负责任务队列中的非I/O任务,每个Worker线程从各自维护的任务队列中取出任务异步执行。

image-20210814175742838

图2-10 Reactor工作流程

EventLoop事件循环#

在Netty中,Reactor模型的事件处理器是使用EventLoop来实现的,一个EventLoop对应一个线程,EventLoop内部维护了一个Selector和taskQueue,分别用来处理网络IO事件以及内部任务,它的工作原理如图2-11所示。

image-20210816142508054

图2-11 NioEventLoop原理

EventLoop基本应用#

下面这段代码表示EventLoop,分别实现Selector注册以及普通任务提交功能。

public class EventLoopExample {

    public static void main(String[] args) {
        EventLoopGroup group=new NioEventLoopGroup(2);
        System.out.println(group.next()); //输出第一个NioEventLoop
        System.out.println(group.next()); //输出第二个NioEventLoop
        System.out.println(group.next()); //由于只有两个,所以又会从第一个开始
        //获取一个事件循环对象NioEventLoop
        group.next().register(); //注册到selector上
        group.next().submit(()->{
            System.out.println(Thread.currentThread().getName()+"-----");
        });
    }
}

EventLoop的核心流程#

基于上述的讲解,理解了EventLoop的工作机制后,我们再通过一个整体的流程图来说明,如图2-12所示。

EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理IO事件和内部任务。IO事件和内部任务执行时间百分比通过ioRatio来调节,ioRatio表示执行IO时间所占百分比。任务包括普通任务和已经到时的延迟任务,延迟任务存放到一个优先级队列PriorityQueue中,执行任务前从PriorityQueue读取所有到时的task,然后添加到taskQueue中,最后统一执行task。

image-20210816144036419

图2-12 EventLoop工作机制

EventLoop如何实现多种Reactor模型#

  • 单线程模式

    EventLoopGroup group=new NioEventLoopGroup(1);
    ServerBootstrap b=new ServerBootstrap();
    b.group(group);
    
  • 多线程模式

    EventLoopGroup group =new NioEventLoopGroup(); //默认会设置cpu核心数的2倍
    ServerBootstrap b=new ServerBootstrap();
    b.group(group);
    
  • 多线程主从模式

    EventLoopGroup boss=new NioEventLoopGroup(1);
    EventLoopGroup work=new NioEventLoopGroup();
    ServerBootstrap b=new ServerBootstrap();
    b.group(boss,work);
    

EventLoop实现原理#

  • EventLoopGroup初始化方法,在MultithreadEventExecutorGroup.java中,根据配置的nThreads数量,构建一个EventExecutor数组

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        checkPositive(nThreads, "nThreads");
    
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    
        children = new EventExecutor[nThreads];
    
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
            }
        }
    }
    
  • 注册channel到多路复用器的实现,MultithreadEventLoopGroup.register方法()

    SingleThreadEventLoop ->AbstractUnsafe.register ->AbstractChannel.register0->AbstractNioChannel.doRegister()

    可以看到会把channel注册到某一个eventLoop中的unwrappedSelector复路器中。

    protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                }
            }
    }
    
  • 事件处理过程,通过NioEventLoop中的run方法不断遍历

    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    //计算策略,根据阻塞队列中是否含有任务来决定当前的处理方式
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.BUSY_WAIT:
                            // fall-through to SELECT since the busy-wait is not supported with NIO
                        case SelectStrategy.SELECT:
                            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                            if (curDeadlineNanos == -1L) {
                                curDeadlineNanos = NONE; // nothing on the calendar
                            }
                            nextWakeupNanos.set(curDeadlineNanos);
                            try {
                                if (!hasTasks()) { //如果队列中数据为空,则调用select查询就绪事件
                                    strategy = select(curDeadlineNanos);
                                }
                            } finally {
                                nextWakeupNanos.lazySet(AWAKE);
                            }
                        default:
                    }
                }
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                   /* ioRatio调节连接事件和内部任务执行事件百分比
                    * ioRatio越大,连接事件处理占用百分比越大 */
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) { //处理IO时间
                            processSelectedKeys();
                        }
                    } finally {
                        //确保每次都要执行队列中的任务
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                     selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            }
    }
    

服务编排层Pipeline的协调处理#

通过EventLoop可以实现任务的调度,负责监听I/O事件、信号事件等,当收到相关事件后,需要有人来响应这些事件和数据,而这些事件是通过ChannelPipeline中所定义的ChannelHandler完成的,他们是Netty中服务编排层的核心组件。

在下面这段代码中,我们增加了h1和h2两个InboundHandler,用来处理客户端数据的读取操作,代码如下。

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    //配置Server的通道,相当于NIO中的ServerSocketChannel
    .channel(NioServerSocketChannel.class)
    //childHandler表示给worker那些线程配置了一个处理器,
    // 这个就是上面NIO中说的,把处理业务的具体逻辑抽象出来,放到Handler里面
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            //                            socketChannel.pipeline().addLast(new NormalMessageHandler());
            socketChannel.pipeline().addLast("h1",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("handler-01");
                    super.channelRead(ctx, msg);
                }
            }).addLast("h2",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("handler-02");
                    super.channelRead(ctx, msg);
                }
            });
        }
    });

上述代码构建了一个ChannelPipeline,得到如图2-13所示的结构,每个Channel都会绑定一个ChannelPipeline,一个ChannelPipeline包含多个ChannelHandler,这些Handler会被包装成ChannelHandlerContext加入到Pipeline构建的双向链表中。

ChannelHandlerContext用来保存ChannelHandler的上下文,它包含了ChannelHandler生命周期中的所有事件,比如connect/bind/read/write等,这样设计的好处是,各个ChannelHandler进行数据传递时,前置和后置的通用逻辑就可以直接保存到ChannelHandlerContext中进行传递。

image-20210816165542050

图2-13

出站和入站操作#

根据网络数据的流向,ChannelPipeline分为入站ChannelInBoundHandler和出站ChannelOutboundHandler两个处理器,如图2-14所示,客户端与服务端通信过程中,数据从客户端发向服务端的过程叫出站,对于服务端来说,数据从客户端流入到服务端,这个时候是入站。

image-20210812224219710

图2-14 InBound和OutBound的关系

ChannelHandler事件触发机制#

当某个Channel触发了IO事件后,会通过Handler进行处理,而ChannelHandler是围绕I/O事件的生命周期来设计的,比如建立连接、读数据、写数据、连接销毁等。

ChannelHandler有两个重要的子接口实现,分别拦截数据流入和数据流出的I/O事件

  • ChannelInboundHandler
  • ChannelOutboundHandler

图2-15中显示的Adapter类,提供很多默认操作,比如ChannelHandler中有很多很多方法,我们用户自定义的方法有时候不需要重载全部,只需要重载一两个方法,那么可以使用Adapter类,它里面有很多默认的方法。其它框架中结尾是Adapter的类的作用也大都是如此。所以我们在使用netty的时候,往往很少直接实现ChannelHandler的接口,经常是继承Adapter类。

image-20210816200206761
图2-15 ChannelHandler类关系图

ChannelInboundHandler事件回调和触发时机如下

事件回调方法 触发时机
channelRegistered Channel 被注册到 EventLoop
channelUnregistered Channel 从 EventLoop 中取消注册
channelActive Channel 处于就绪状态,可以被读写
channelInactive Channel 处于非就绪状态
channelRead Channel 可以从远端读取到数据
channelReadComplete Channel 读取数据完成
userEventTriggered 用户事件触发时
channelWritabilityChanged Channel 的写状态发生变化

ChannelOutboundHandler时间回调触发时机

事件回调方法 触发时机
bind 当请求将channel绑定到本地地址时被调用
connect 当请求将channel连接到远程节点时被调用
disconnect 当请求将channel从远程节点断开时被调用
close 当请求关闭channel时被调用
deregister 当请求将channel从它的EventLoop注销时被调用
read 当请求通过channel读取数据时被调用
flush 当请求通过channel将入队数据刷新到远程节点时调用
write 当请求通过channel将数据写到远程节点时被调用

事件传播机制演示#

public class NormalOutBoundHandler extends ChannelOutboundHandlerAdapter {
    private final String name;

    public NormalOutBoundHandler(String name) {
        this.name = name;
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandler:"+name);
        super.write(ctx, msg, promise);
    }
}
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
    private final String name;
    private final boolean flush;

    public NormalInBoundHandler(String name, boolean flush) {
        this.name = name;
        this.flush = flush;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InboundHandler:"+name);
        if(flush){
            ctx.channel().writeAndFlush(msg);
        }else {
            super.channelRead(ctx, msg);
        }
    }
}
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    //配置Server的通道,相当于NIO中的ServerSocketChannel
    .channel(NioServerSocketChannel.class)
    //childHandler表示给worker那些线程配置了一个处理器,
    // 这个就是上面NIO中说的,把处理业务的具体逻辑抽象出来,放到Handler里面
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline()
                .addLast(new NormalInBoundHandler("NormalInBoundA",false))
                .addLast(new NormalInBoundHandler("NormalInBoundB",false))
                .addLast(new NormalInBoundHandler("NormalInBoundC",true));
            socketChannel.pipeline()
                .addLast(new NormalOutBoundHandler("NormalOutBoundA"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundB"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundC"));
        }
    });

上述代码运行后会得到如下执行结果

InboundHandler:NormalInBoundA
InboundHandler:NormalInBoundB
InboundHandler:NormalInBoundC
OutBoundHandler:NormalOutBoundC
OutBoundHandler:NormalOutBoundB
OutBoundHandler:NormalOutBoundA

当客户端向服务端发送请求时,会触发服务端的NormalInBound调用链,按照排列顺序逐个调用Handler,当InBound处理完成后调用WriteAndFlush方法向客户端写回数据,此时会触发NormalOutBoundHandler调用链的write事件。

从执行结果来看,Inbound和Outbound的事件传播方向是不同的,Inbound传播方向是head->tail,Outbound传播方向是Tail-Head。

异常传播机制#

ChannelPipeline时间传播机制是典型的责任链模式,那么有同学肯定会有疑问,如果这条链路中某个handler出现异常,那会导致什么问题呢?我们对前面的例子修改NormalInBoundHandler

public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
    private final String name;
    private final boolean flush;

    public NormalInBoundHandler(String name, boolean flush) {
        this.name = name;
        this.flush = flush;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InboundHandler:"+name);
        if(flush){
            ctx.channel().writeAndFlush(msg);
        }else {
            //增加异常处理
            throw new RuntimeException("InBoundHandler:"+name);
        }
    }
}

这个时候一旦抛出异常,会导致整个请求链被中断,在ChannelHandler中提供了一个异常捕获方法,这个方法可以避免ChannelHandler链中某个Handler异常导致请求链路中断。它会把异常按照Handler链路的顺序从head节点传播到Tail节点。如果用户最终没有对异常进行处理,则最后由Tail节点进行统一处理

修改NormalInboundHandler,重写下面这个方法。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("InboundHandlerException:"+name);
    super.exceptionCaught(ctx, cause);
}

在Netty应用开发中,好的异常处理非常重要能够让问题排查变得很轻松,所以我们可以通过一种统一拦截的方式来解决异常处理问题。

添加一个复合处理器实现类

public class ExceptionHandler extends ChannelDuplexHandler {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if(cause instanceof RuntimeException){
            System.out.println("处理业务异常");
        }
        super.exceptionCaught(ctx, cause);
    }
}

把新增的ExceptionHandler添加到ChannelPipeline中

bootstrap.group(bossGroup, workerGroup)
    //配置Server的通道,相当于NIO中的ServerSocketChannel
    .channel(NioServerSocketChannel.class)
    //childHandler表示给worker那些线程配置了一个处理器,
    // 这个就是上面NIO中说的,把处理业务的具体逻辑抽象出来,放到Handler里面
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline()
                .addLast(new NormalInBoundHandler("NormalInBoundA",false))
                .addLast(new NormalInBoundHandler("NormalInBoundB",false))
                .addLast(new NormalInBoundHandler("NormalInBoundC",true));
            socketChannel.pipeline()
                .addLast(new NormalOutBoundHandler("NormalOutBoundA"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundB"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundC"))
                .addLast(new ExceptionHandler());
        }
    });

最终,我们就能够实现异常的统一处理。

 

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Mic带你学架构


相关教程