netty是一个nio客户机-服务器框架,它简化了tcp和udp网络编程,相对于java传统nio,netty还屏蔽了操作系统的差异性,并且兼顾了性能。
Channel
channel封装了对socket的原子操作,实质是对socket的封装和扩展。
netty框架自己定义的通道接口,是对java nio channel的封装和扩展,客户端NIO套接字通道是NioSocketChannel,提供的服务器端NIO套接字通道是NioServerSocketChannel。
NioSocketChannel内部管理了一个Java NIO的SocketChanne实例,用来创建SocketChannel实例和设置该实例的属性,并调用Connect 方法向服务端发起 TCP 链接等。
NioServerSocketChannel内部管理了Java NIO的ServerSocketChannel实例,用来创建ServerSocketChannel实例和设置该实例属性,并调用实例的bind方法在指定端口监听客户端的链接。
EventLoopGroup
netty使用了主从多线程的Reactor模型。在netty中每个EventLoopGroup本身是一个线程池,其中包含了自定义个数的 NioEventLoop,每个NioEventLoop是一个线程,并且每个NioEventLoop都会关联一个selector选择器。
客户端一般只用一个EventLoopGroup来处理网络IO操作,服务器端一般使用两个,boss-group用来接收客户端发来的TCP链接请求,worker-group用来具体处理网络请求。
当Channel是客户端通道NioSocketChannel时,会注册NioSocketChannel管理的SocketChannel实例到自己关联的NioEventLoop的selector选择器上,然后NioEventLoop对应的线程会通过select命令监控感兴趣的网络读写事件。
当 Channel 是服务端通道NioServerSocketChannel时,NioServerSocketChannel本身会被注册到boss EventLoopGroup里面的某一个NioEventLoop管理的selector选择器,而完成三次握手的链接套接字是被注册到了worker EventLoopGroup里面的某一个NioEventLoop管理的selector选择器上。
多个Channel可以注册到同一个NioEventLoop管理的selector选择器上,这时NioEventLoop对应的单个线程就可以处理多个Channel的就绪事件;但是每个Channel只能注册到一个固定的NioEventLoop管理的selector上。
ChannelPipeline
ChannelPipeline持有一个ChannelHandler的双向链结构。每个Channel都有属于自己的ChannelPipeline,对从Channel 中读取或者要写入 Channel 中的数据进行依次处理。
多ChannelPipeline里面可以复用一个ChannelHandler。
Netty 客户端底层与 Java NIO 对应关系
NioSocketChannel是对Java NIO的SocketChannel的封装,从NioSocketChannel的构造函数可以看出:
public class NioSocketChannel extends AbstractNioByteChannel implements SocketChannel { private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); public NioSocketChannel() { this(DEFAULT_SELECTOR_PROVIDER); } public NioSocketChannel(SelectorProvider provider) { this(newSocket(provider)); } private static java.nio.channels.SocketChannel newSocket(SelectorProvider provider) { try { return provider.openSocketChannel(); } catch (IOException var2) { throw new ChannelException("Failed to open a socket.", var2); } }}SocketChannel的父类是AbstractNioChannel,在AbstractNioChannel中定义了java nio的SocketChannel类型的成员,并将其模式设置为非阻塞:
public abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch; protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { ... this.ch = ch; this.readInterestOp = readInterestOp; ch.configureBlocking(false); }}所以说NioSocketChannel内部是对java nio的SocketChannel的封装扩展,创建NioSocketChannel实例对象时候相当于执行了Java NIO中:
SocketChannel socketChannel = SocketChannel.open();NioSocketChannel实例的创建和注册是在Bootstrap的connect阶段。Bootstrap的connect代码:
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> { ... public ChannelFuture connect(InetAddress inetHost, int inetPort) { return this.connect(new InetSocketAddress(inetHost, inetPort)); } public ChannelFuture connect(SocketAddress remoteAddress) { ... return this.doResolveAndConnect(remoteAddress, this.config.localAddress()); } private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { ChannelFuture regFuture = this.initAndRegister(); ... } final ChannelFuture initAndRegister() { Channel channel = null; try { channel = this.channelFactory.newChannel(); this.init(channel); } catch (Throwable var3) {} ChannelFuture regFuture = this.config().group().register(channel); ... }channelFactory.newChannel()创建了NIOSocketChannel实例。
继续跟踪this.config().group().register(channel)到:
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { ... public ChannelFuture register(Channel channel) { return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this))); } public ChannelFuture register(ChannelPromise promise) { Util.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }最终会跟踪到AbstractNioChannel的doRegister():
public abstract class AbstractNioChannel extends AbstractChannel { ... protected void doRegister() throws Exception { boolean selected = false; while(true) { try { this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException var3) { if (selected) { throw var3; } this.eventLoop().selectNow(); selected = true; } } }} 其中:
this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this)将NioSocketChannel实例注册到了当前NioEventLoop的选择器中。
从选择器获取就绪的事件是在该客户端套接关联的NioEventLoop里面的做的,每个NioEventLoop里面有一个线程用来循环从选择器里面获取就绪的事件:
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } ... if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) {} } } private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { while(true) { ... int selectedKeys = selector.select(timeoutMillis); ++selectCnt; ... } catch (CancelledKeyException var13) {} }从选择器选取就绪的事件后,会最终调用processSelectedKey具体处理每个事件:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ... try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) {} }netty和reactor模型
netty实现单线程reactor:
private EventLoopGroup group = new NioEventLoopGroup(1);ServerBootstrap bootstrap = new ServerBootstrap() .group(group) .childHandler(new HeartbeatInitializer());netty实现多线程reactor:
// 默认线程数为CPU核心数 * 2,*2是因为考虑到超线程CPU包括两个逻辑线程private EventLoopGroup group = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap() .group(boss) .childHandler(new HeartbeatInitializer());netty实现主从多线程reactor:
private EventLoopGroup boss = new NioEventLoopGroup();private EventLoopGroup work = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap() .group(boss, work) .childHandler(new HeartbeatInitializer()); 继续阅读与本文标签相同的文章
ChannelHandler
阿里巴巴大数据技术关键进展及展望
-
[转]央视快评丨把区块链作为核心技术自主创新重要突破口
2026-05-16栏目: 教程
-
【阿里云新品发布会】第30期:日志审计服务最新“神操作”,一键开启云上合规“保护伞”
2026-05-16栏目: 教程
-
[转]大河资本创始合伙人王童:创新创业需要更多个人天使
2026-05-16栏目: 教程
-
阿里云网站备案咨询-上传资料及真实性核验的问题汇总
2026-05-16栏目: 教程
-
免费 & 有奖 | K8s 支持 “一键部署” 功能
2026-05-16栏目: 教程
