ChannelHandler

netty中的ChannelHandler用于处理Channel对应的事件,每一个ChannelHandler都会和一个channel绑定,ChannelHandler体系如下:

image

ChannelHandler接口里面只定义了三个生命周期方法:

    void handlerAdded(ChannelHandlerContext var1) throws Exception;    void handlerRemoved(ChannelHandlerContext var1) throws Exception;    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;

子接口ChannelInboundHandler和ChannelOutboundHandler对ChannelHandler进行了扩展,但netty框架提供了ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter和ChannelDuplexHandler三个适配类,实际使用时继承这些适配类即可。

此外还有简化类SimpleChannelInboundHandler,继承SimpleChannelInboundHandler类后,会在接收到数据后⾃动release掉数据占⽤的Bytebuffer资源,并且继承该类需要指定数据格式。⽽继承ChannelInboundHandlerAdapter则不会⾃动释放Bytebuffer资源,需要⼿动调⽤ReferenceCountUtil.release()等⽅法进⾏释放,并且继承该类不需要指定数据格式。

实际编写server端时,需要继承ChannelInboundHandlerAdapter,防⽌数据未处理完就⾃动释放了。此外server端可能有多个客户端连接,并且每⼀个客户端请求的数据格式都不⼀致,相比之下ChannelInboundHandlerAdapter更灵活。

客户端根据情况可以继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不需要再进⾏格式的转换了。

ChannelHandler中的三个生命周期方法分别对应如下场景:

当前ChannelHander加入ChannelHandlerContext中;当前从ChannelHandlerContext中移除;ChannelHandler回调方法出现异常时被回调.

ChannelInboundHandler和ChannelOutboundHandler

区别主要在于ChannelInboundHandler的channelRead和channelReadComplete回调和ChannelOutboundHandler的write和flush回调上,ChannelInboundHandler的channelRead回调负责执行入栈数据的decode逻辑,ChannelOutboundHandler的write负责执行出站数据的encode工作。

ChannelInboundHandler

ChannelInboundHandler定义了如下回调方法:

    void channelRegistered(ChannelHandlerContext var1) throws Exception;    void channelUnregistered(ChannelHandlerContext var1) throws Exception;    void channelActive(ChannelHandlerContext var1) throws Exception;    void channelInactive(ChannelHandlerContext var1) throws Exception;    void channelRead(ChannelHandlerContext var1,   var2) throws Exception;    void channelReadComplete(ChannelHandlerContext var1) throws Exception;    void userEventTriggered(ChannelHandlerContext var1,   var2) throws Exception;    void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;

其回调时机为:

    channelRegistered 当前channel注册到EventLoop;        channelUnregistered 当前channel从EventLoop取消注册;        channelActive 当前channel激活的时候;        channelInactive 当前channel失活的时候;        channelRead 当前channel从远端读取到数据;        channelReadComplete channel read消费完读取的数据的时候被触发;        userEventTriggered 用户事件触发的时候;        channelWritabilityChanged channel的写状态变化的时候触发。

ChannelHandlerContext作为参数,在每个回调事件处理完成之后,使用ChannelHandlerContext的fireChannelXXX方法来传递给pipeline中下一个ChannelHandler,netty的codec模块和业务处理代码分离就用到了这个链路处理。

ChannelOutboundHandler

ChannelOutboundHandler定义了如下回调方法:

    void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;    void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;    void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;    void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;    void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;    void read(ChannelHandlerContext var1) throws Exception;    void write(ChannelHandlerContext var1,   var2, ChannelPromise var3) throws Exception;    void flush(ChannelHandlerContext var1) throws Exception;

回调方法触发时机:

    bind bind操作执行前触发;        connect connect 操作执行前触发;        disconnect disconnect 操作执行前触发;        close close操作执行前触发;        deregister deregister操作执行前触发;        read read操作执行前触发;        write write操作执行前触发;        flush flush操作执行前触发;

对于ChannelPromise这个参数,可以调用它的addListener注册监听,当回调方法所对应的操作完成后,会触发这个监听下面的代码。

同样添加监听器的还有ChannelFuture,而ChannelFuture也是ChannelPromise的父接口:

public interface ChannelPromise extends ChannelFuture, Promise<Void> {    ...    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> var1);    ...}    

例如:

ctx.writeAndFlush(toFullHttpResponse()).addListener(ChannelFutureListener.CLOSE);

ChannelFutureListener.CLOSE这个监听器就会在writeAndFlush完成之后被调用来关闭channel:

    ChannelFutureListener CLOSE = new ChannelFutureListener() {        public void operationComplete(ChannelFuture future) {            future.channel().close();        }    };

ChannelHandlerContext

当ChannelHandler加入到ChannelPipeline的时候,会创建一个对应的ChannelHandlerContext并绑定,ChannelPipeline实际维护的是和ChannelHandlerContext的关系,例如在DefaultChannelPipeline:

public class DefaultChannelPipeline implements ChannelPipeline {    ...    final AbstractChannelHandlerContext head;    final AbstractChannelHandlerContext tail;}

DefaultChannelPipeline会保存第一个ChannelHandlerContext以及最后一个ChannelHandlerContext的引用。

而AbstractChannelHandlerContext中维护了next和prev指针:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {    ...    volatile AbstractChannelHandlerContext next;    volatile AbstractChannelHandlerContext prev;}

这样ChannelHandlerContext之间形成了双向链表。

ChannelPipeline

在Channel创建的时候,会同时创建ChannelPipeline:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {    ...    protected AbstractChannel(Channel parent) {        this.parent = parent;        this.id = this.newId();        this.unsafe = this.newUnsafe();        this.pipeline = this.newChannelPipeline();    }        protected DefaultChannelPipeline newChannelPipeline() {        return new DefaultChannelPipeline(this);    }}

在ChannelPipeline中也会持有Channel的引用,ChannelPipeline会维护一个ChannelHandlerContext的双向链表,链表的头尾有默认实现:

public class DefaultChannelPipeline implements ChannelPipeline {    final AbstractChannelHandlerContext head;    final AbstractChannelHandlerContext tail;    private final Channel channel;    protected DefaultChannelPipeline(Channel channel) {        this.channel = (Channel) Util.checkNotNull(channel, "channel");        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);        this.voidPromise = new VoidChannelPromise(channel, true);        this.tail = new DefaultChannelPipeline.TailContext(this);        this.head = new DefaultChannelPipeline.HeadContext(this);        this.head.next = this.tail;        this.tail.prev = this.head;    }}

我们添加的自定义ChannelHandler会插入到head和tail之间,以addLast为例:

    public final ChannelPipeline addLast(String name, ChannelHandler handler) {        return this.addLast((EventExecutorGroup)null, name, handler);    }    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {            ...            this.addLast0(newCtx);            ...    }    private void addLast0(AbstractChannelHandlerContext newCtx) {        AbstractChannelHandlerContext prev = this.tail.prev;        newCtx.prev = prev;        newCtx.next = this.tail;        prev.next = newCtx;        this.tail.prev = newCtx;    }

如果是ChannelInboundHandler的回调,根据插入的顺序从head向tail进行链式调用,ChannelOutboundHandler则相反:
image

值得注意的是,整条链路的调用需要通过Channel接口直接触发,如果使用ChannelContextHandler的接口方法间接触发,链路会从该ChannelContextHandler对应的ChannelHandler开始,而不是从头或尾开始。

ChannelPipeline入口在NioEventLoop的processSelectedKey():

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();        ...        try {            ...            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {                unsafe.read();            }        } catch (CancelledKeyException ignored) {...}    }

当为OP_READ事件时,调用unsafe.read():

        @Override        public final void read() {            final ChannelPipeline pipeline = pipeline();            ...            try {                do {                    ...                    pipeline.fireChannelRead(byteBuf);                    byteBuf = null;                    ....                } while (allocHandle.continueReading());            } catch (Throwable t) {...}        }

pipeline.fireChannelRead(byteBuf)是入栈入口,实际上是pipeline中ChannelHandlerContext的head节点进行fireChannelRead的同语义操作。

ChannelPipeline相关元素之间的关系如下:

image

1. 每个Channel都会绑定且只绑定一个ChannelPipeline,ChannelPipeline中也会持有Channel的引用;2. ChannelPipeline持有ChannelHandlerContext链路;3. 每个ChannelHandlerContext对应一个ChannelHandler;4. ChannelHandlerContext同时也会持有ChannelPipeline引用,也就间接持有Channel引用;5. ChannelHandler链路会根据Handler的类型,分为InBound和OutBound两条链路,inbound处理入栈数据,outbound处理出栈数据。
收藏 打印