0%

netty(二)

netty执行流程

img

核心组件

Channel

Channel是一个连接网络输入和IO处理的桥梁。你可以通过Channel来判断当前的状态,是open还是connected,还可以判断当前Channel支持的IO操作,还可以使用ChannelPipeline对Channel中的消息进行处理。

在 Netty 中,Channel 是一个表示与实体(如远程节点)进行通信的开放连接,例如一个网络套接字。它类似于 Java NIO 中的 java.nio.channels.Channel 接口,但提供了更多的功能和易用性。

Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:

  • 当前网络连接的通道的状态(例如是否打开?是否已连接?)

  • 网络连接的配置参数 (例如接收缓冲区大小)

  • 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。

    调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。

  • 支持关联 I/O 操作与对应的处理程序。

不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。下面是一些常用的 Channel 类型:

  • NioSocketChannel,异步的客户端 TCP Socket 连接。
  • NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
  • NioDatagramChannel,异步的 UDP 连接。
  • NioSctpChannel,异步的客户端 Sctp 连接。
  • NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

在 Netty 中,Channel 是一个重要的概念,因为所有的 I/O 操作都是通过 Channel 进行的。

启动器-ServerBootstrap、Bootstrap

​ Bootstarp 和 ServerBootstrap 被称为引导类,指对应用程序进行配置,并使他运行起来的过程。Netty处理引导的方式是使你的应用程序和网络层相隔离。

​ Bootstrap 是客户端的引导类,Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个 Channel,仅创建一个单独的、没有父 Channel 的 Channel 来实现所有的网络交换。

​ ServerBootstrap 是服务端的引导类,ServerBootstarp 在调用 bind() 方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。

对比项 服务端 客户端
BIO ServerSocket Socket
NIO ServerSocketChannel SocketChannel
AIO AsynchronousServerSocketChannel AsynchronousSocketChannel
Netty ServerBootstrap Bootstrap

从上表中能明显感觉出它俩在Netty中的作用,无非就是服务端与客户端换了个叫法而已。

事件组-EventLoopGroup、EventLoop

EventLoop这东西翻译过来就是事件循环的意思,你可以把它理解成NIO中的Selector选择器,实际它本质上就是这玩意儿,因为内部会维护一个Selector,然后由一条线程会循环处理Channel通道上发生的所有事件,所以每个EventLoop对象都可以看成一个单线程执行器。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。

一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。

EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop:EventLoopGroup可以将其理解成AIO中的AsynchronousChannelGroup可能会更合适,在AIOACG(前面那玩意儿的缩写)中,我们需要手动指定一个线程池,然后AIO的所有客户端工作都会使用线程池中的线程进行管理,而Netty中的EventLoopGroup就类似于AIO-ACG这玩意儿,只不过不需要我们管理线程池了,而是Netty内部维护。

为什么在服务端要定义两个组呢?一个难道不行吗?其实也是可以的,但定义两个组的好处在于:可以让Group中的每个EventLoop分工更加明确,不同的Group分别处理不同类型的事件,各司其职。

在前面案例中,为服务端绑定了两个事件循环组,也就代表着会根据ServerSocketChannel上触发的不同事件,将对应的工作分发到这两个Group中处理,其中boss主要负责客户端的连接事件,而worker大多数情况下负责处理客户端的IO读写事件。

当客户端的SocketChannel连接到来时,首先会将这个注册事件的工作交给boss处理,boss会调用worker.register()方法,将这条客户端连接注册到worker工作组中的一个EventLoop上。前面提到过:EventLoop内部会维护一个Selector选择器,因此实际上也就是将客户端通道注册到其内部中的选择器上。

注意:将一个Socket连接注册到一个EventLoop上之后,这个客户端连接则会和这个EventLoop绑定,以后这条通道上发生的所有事件,都会交由这个EventLoop处理。

到这里大家应该也理解了为何要拆出两个EventLoopGroup,主要目的就在于分工更为明细。当然,由于EventLoopGroup本质上可以理解成一个线程池,其中存在的线程资源自然是有限的,那此时如果到来的客户端连接大于线程数量怎么办呢?这是不影响的,因为Netty本身是基于Java-NIO封装的,而NIO底层又是基于多路复用模型实现的,天生就能实现一条线程管理多个连接的功能,所以就算连接数大于线程数,也完全可以Hold住。

Netty中的增强版通道(ChannelFuture)

Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,所以 Netty 中定义了一个 ChannelFuture 对象作为这个异步操作的“代言人”,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的addListener() 方法为该异步操作添加监 NIO 网络编程框架 Netty 听器,为其注册回调:当结果出来后马上调用执行。

Netty 的异步编程模型都是建立在 Future 与回调概念之上的。ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。

还记得之前客户端如何连接服务端的嘛?如下:

1
2
Bootstrap client = new Bootstrap();
client.connect("127.0.0.1", 8888);

但这个connect()连接方法,本质上是一个异步方法,返回的并不是Channel对象,而是一个ChannelFuture对象,如下:

1
public ChannelFuture connect(String inetHost, int inetPort);

也包括ServerBootstrap绑定地址的bind()也相同,返回的并非ServerChannel,也是一个ChannelFuture对象。这是因为在Netty的机制中,绑定/连接工作都是异步的,因此如果要用Netty创建一个客户端连接,为了确保连接建立成功后再操作,通常情况下都会再调用.sync()方法同步阻塞,直到连接建立成功后再使用通道写入数据。

1
2
3
4
5
6
// 与服务端建立连接
ChannelFuture cf = client.connect("127.0.0.1", 8888);
// 同步阻塞至连接建立成功为止
cf.sync();
// 连接建立成功后再获取对应的Socket通道写入数据
cf.channel().writeAndFlush("...");

在 Netty 中,ChannelFuture 代表了一个尚未完成的 I/O 操作,比如写入数据、连接服务器等等。当这些 I/O 操作发起时,会返回一个 ChannelFuture 对象,通过该对象可以获知操作是否已经完成、是否成功,以及可以注册回调函数来处理操作完成后的事件。

具体而言,ChannelFuture 主要有以下作用:

  1. 同步等待操作完成:通过调用 await() 方法可以阻塞当前线程,直到操作完成。
  2. 异步注册回调函数:通过调用 addListener() 方法可以注册一个回调函数,在操作完成后自动触发。
  3. 获取操作状态:通过调用 isDone()isSuccess() 方法可以判断操作是否已经完成或是否成功。
  4. 获取操作结果:通过调用 get() 方法可以获取操作的结果,或者在操作未完成时阻塞当前线程等待结果。

总之,ChannelFuture 提供了一种方便的异步处理 I/O 操作的机制,可以让开发者更加灵活地管理网络连接和数据传输。

核心组件 - 通道处理器(Handler)

ChannelHandler 是对 Channel 中数据的处理器,这些处理器可以是系统本身定义好的编解码器,也可以是用户自定义的。这些处理器会被统一添加到一个 ChannelPipeline 的对象中,然后按照添加的顺序对 Channel 中的数据进行依次处理。

Handler可谓是整个Netty框架中最为重要的一部分,它的职责主要是用于处理Channel通道上的各种事件,所有的处理器都可被大体分为两类:

  • 入站处理器:一般都是ChannelInboundHandlerAdapter以及它的子类实现。
  • 出站处理器:一般都是ChannelOutboundHandlerAdapter以及它的子类实现。

在系统中网络操作都通常会分为入站和出站两种,所谓的入站即是指接收请求,反之,所谓的出站则是指返回响应,而Netty中的入站处理器,会在客户端消息到来时被触发,而出站处理器则会在服务端返回数据时被触发。

ChannelHandler

ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。

ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:

  • ChannelInboundHandler 用于处理入站 I/O 事件。
  • ChannelOutboundHandler 用于处理出站 I/O 操作。

或者使用以下适配器类:

  • ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
  • ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。
  • ChannelDuplexHandler 用于处理入站和出站事件。

ChannelHandlerContext

保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。

入站处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// 服务端
public class HandlerServer {
public static void main(String[] args) {
// 0.准备工作:创建一个事件循环组、一个ServerBootstrap服务端
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();

server
// 1.绑定前面创建的事件循环组
.group(group)
// 2.声明通道类型为服务端NIO通道
.channel(NioServerSocketChannel.class)
// 3.通过ChannelInitializer完成通道的初始化工作
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nsc) throws Exception {
// 4.获取通道的ChannelPipeline处理器链表
ChannelPipeline pipeline = nsc.pipeline();
// 5.基于pipeline链表向通道上添加入站处理器
pipeline.addLast("In-①",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("俺是第一个入站处理器...");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("In-②",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("我是第二个入站处理器...");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("In-③",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("朕是第三个入站处理器...");
}
});
}
})
// 为当前启动的服务端绑定IP和端口地址
.bind("127.0.0.1",8888);
}
}

// 客户端
public class HandlerClient {
public static void main(String[] args) {
// 0.准备工作:创建一个事件循环组、一个Bootstrap启动器
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client
// 1.绑定事件循环组
.group(group)
// 2.声明通道类型为NIO客户端通道
.channel(NioSocketChannel.class)
// 3.初始化通道,添加一个UTF-8的编码器
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc)
throws Exception {
// 添加一个编码处理器,对数据编码为UTF-8格式
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
}
});

// 4.与指定的地址建立连接
ChannelFuture cf = client.connect("127.0.0.1", 8888).sync();
// 5.建立连接成功后,向服务端发送数据
System.out.println("正在向服务端发送信息......");
cf.channel().writeAndFlush("我是<竹子爱熊猫>!");
} catch (Exception e){
e.printStackTrace();
} finally {
// 6.最后关闭事件循环组
group.shutdownGracefully();
}
}
}

netty服务器示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 创建两个EventLoopGroup,boss:处理连接事件,worker处理I/O事件
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
// 创建一个ServerBootstrap服务端(同之前的ServerSocket类似)
ServerBootstrap server = new ServerBootstrap();
try {
// 将前面创建的两个EventLoopGroup绑定在server上
server.group(boss,worker)
// 指定服务端的通道为Nio类型
.channel(NioServerSocketChannel.class)
// 为到来的客户端Socket添加处理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
// 这个只会执行一次(主要是用于添加更多的处理器)
@Override
protected void initChannel(NioSocketChannel ch) {
// 添加一个字符解码处理器:对客户端的数据解码
ch.pipeline().addLast(
new StringDecoder(CharsetUtil.UTF_8));
// 添加一个入站处理器,对收到的数据进行处理
ch.pipeline().addLast(
new SimpleChannelInboundHandler<String>() {
// 读取事件的回调方法
@Override
protected void channelRead0(ChannelHandlerContext
ctx,String msg) {
System.out.println("收到客户端信息:" + msg);
}
});
}
});
// 为当前服务端绑定IP与端口地址(sync是同步阻塞至连接成功为止)
ChannelFuture cf = server.bind("127.0.0.1",8888).sync();
// 关闭服务端的方法(之后不会在这里关闭)
cf.channel().closeFuture().sync();
}finally {
// 优雅停止之前创建的两个Group
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

①先创建两个EventLoopGroup事件组,然后创建一个ServerBootstrap服务端。

②将创建的两个事件组boss、worker绑定在服务端上,并指定服务端通道为NIO类型。

③在server上添加处理器,对新到来的Socket连接进行处理,在这里主要分为两类:

  • ChannelInitializer:连接到来时执行,主要是用于添加更多的处理器(只触发一次)。
  • addLast():通过该方式添加的处理器不会立马执行,而是根据处理器类型择机执行。

④为创建好的服务端绑定IP及端口号,调用sync()意思是阻塞至绑定成功为止。

⑤再创建一个EventLoopGroup事件组,并创建一个Bootstrap客户端。

⑥将事件组绑定在客户端上,由于无需处理连接事件,所以只需要一个事件组。

⑦指定Channel通道类型为NIO、添加处理器…..(同服务端类似)

⑧与前面服务端绑定的地址建立连接,由于默认是异步的,也要调用sync()阻塞。

⑨建立连接后,客户端将数据写入到通道准备发送,首先会先经过添加好的编码处理器,将数据的格式设为UTF-8

⑩服务器收到数据后,会先经过解码处理器,然后再去到入站处理,执行对应的Read()方法逻辑。

⑪客户端完成数据发送后,先关闭通道,再优雅关闭创建好的事件组。

⑫同理,服务端工作完成后,先关闭通道再停止事件组。

事件驱动机制

在Netty里,所有事件都来自ChannelEvent接口,这些事件涵盖监听端口、建立连接、读写数据等网络通讯的各个阶段。而事件的处理者就是ChannelHandler,这样,不但是业务逻辑,连网络通讯流程中底层的处理,都可以通过实现ChannelHandler来完成了。事实上,Netty内部的连接处理、协议编解码、超时等机制,都是通过handler完成的。当博主弄明白其中的奥妙时,不得不佩服这种设计!

下图描述了Netty进行事件处理的流程。Channel是连接的通道,是ChannelEvent的产生者,而ChannelPipeline可以理解为ChannelHandler的集合。

event driven in Netty

理解了Netty的事件驱动机制,我们现在可以来研究Netty的各个模块了。Netty的包结构如下:

1
2
3
4
5
6
7
8
9
10
11
org
└── jboss
└── netty
├── bootstrap 配置并启动服务的类
├── buffer 缓冲相关类,对NIO Buffer做了一些封装
├── channel 核心部分,处理连接
├── container 连接其他容器的代码
├── example 使用示例
├── handler 基于handler的扩展部分,实现协议编解码等附加功能
├── logging 日志
└── util 工具类