flyEn'blog

学习笔记之NIO与Netty线程模型

BIO/NIO

Redis、Zookeeper、Netty,游戏服务器等其实底层就是IO通信程序

image-20210525095246907

J2SE

BIO:阻塞IO(Blocking IO)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SocketServer {

public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9090);
while (true) {
System.out.println("等待连接。。。");
// 阻塞方法
Socket clientSocket = serverSocket.accept();
System.out.println("有客户端连接了。。");
}
}

private static void handler(Socket clientSocket) throws IOException {
byte[] bytes = new byte[1024];
System.out.println("准备read。。");
// 收到客户端的数据,阻塞方法,没有数据可读时就阻塞
int read = clientSocket.getInputStream().read(bytes);
System.out.println("read完毕");
if (read != -1) {
System.out.println("接收到客户端的数据:" + new String(bytes, 0, read));
}
}
}

程序启动的时候,会阻塞在Socket clientSocket = serverSocket.accept();这一行。

只有有客户端连接进来了之后,才会跳出阻塞,建立客户端Socket连接。

Telnet 客户端:与服务端建立连接(telnet localhost 9000

int read = clientSocket.getInputStream().read(bytes)这里的read也是个阻塞方法,如果客户端没有给服务端发数据就一直阻塞在那里,客户端一旦发了数据,就跳出阻塞。

很少用BIO写网络通信的东西了,不能支撑高并发。

但如果改成这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
System.out.println("等待连接。。。");
// 阻塞方法
Socket clientSocket = serverSocket.accept();
System.out.println("有客户端连接了。。");
// handler(clientSocket); 入门版本,不能支持高并发。
new Thread(new Runnable() {
@Override
public void run() {
try {
handler(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}

是可以支持高并发。但是C10K、C10M问题。

C10K、C10M问题:线程是会占有内存空间的。如果没有几万几十万的连接进来,没有及时发数据,会把服务端的内存撑爆掉。

如果用线程池,最多500个线程,内存可能不会爆掉,但是并发量会被线程数限制住了。

所以BIO程序再怎么优化并发都是有问题的。

比如500个连接,一直不给我们发数据,线程一直会保持着,再来一个客户端连接,是进不来了,因为线程用完了。

主要问题就是这些方法是阻塞的。

后来JDK1.4一些版本中,提供了NIO一些类库。

BIO阻塞模型图

同步阻塞模型,一个客户端连接对应一个处理线程。

image-20210525110609168

NIO:非阻塞IO(non block IO/ new IO)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class NioServer {
static List<SocketChannel> channelList = new ArrayList<>();

public static void main(String[] args) throws IOException {
// 创建NIO ServerSocketChannel,与BIO的ServerSocket类似
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9090));
// 设置ServerSocketChannel为非阻塞,如果配置为true变为BIO了
serverSocketChannel.configureBlocking(false);
System.out.println("服务启动成功");
while (true) {
// 非阻塞模式accept方法不会阻塞
// NIO的非阻塞是由操作系统内部实现的,底层调用了Linux内核的accept函数
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
System.out.println("连接成功");
// 设置SocketChannel为非阻塞
socketChannel.configureBlocking(false);
// 保存客户端连接在List中
channelList.add(socketChannel);
}
// 遍历连接进行数据读取
Iterator<SocketChannel> iterator = channelList.iterator();
while (iterator.hasNext()) {
SocketChannel sc = iterator.next();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
// 非阻塞模式read方法不会阻塞,如果socketChannel.configureBlocking(true)会阻塞
int len = sc.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) {
iterator.remove();
System.out.println("客户端断开连接");
}
}
}
}
}

以上是基本入门级NIO模型。

不管来多少客户端,都会处理。

redis单线程模型为什么并发量那么多

后端的模型,也是NIO非阻塞的。

问题:

  1. while(true):会一直在那转,CPU可能百分之百。

  2. while循环假设十万个连接已建立连接,每秒可能只有几个十个才有数据发送来。大量的CPU的浪费和空转。

优化:

  1. 把专门有数据收发的放到集合中,只遍历这个集合。每次循环都能读到数据。
  2. 如果连接没有给服务端发数据,就不转,避免空转。

多路复用器——selector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class NioSelectorServer {
public static void main(String[] args) throws IOException {
// 创建NIO ServerSocketChannel
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9090));
// 设置ServerSocketChannel为非阻塞,如果配置为true变为BIO了
serverSocket.configureBlocking(false);
// 打开Selector处理Channel,即创建我epoll
Selector selector = Selector.open();
// 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务启动成功");

while (true) {
// 阻塞等待需要处理的事件发生(IO事件:包括连接、读、写事件)
selector.select();
// 获取selector中注册的全部事件的 selectionKey实例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();

// 遍历SelectionKey对事件进行处理
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 如果是OP_ACCEPT事件,则进行连接获取和事件注册
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件。
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接成功");
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
// 非阻塞模式read方法不会阻塞,如果socketChannel.configureBlocking(true)会阻塞
int len = socketChannel.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) {
iterator.remove();
System.out.println("客户端断开连接");
}
}
// 从事件集合里删除本次处理的key,防止下次select重复处理
iterator.remove();
}
}
}
}

NIO线程模型图

image-20210525104645765

selector、epoll

1
Selector selector = Selector.open();

Linux的JDK底层实际上是EPollSelectorImpl对象。

image-20210525115743676

初始化的时候会初始一个 EPollArrayWrapper一个数组。

Hotspot VM源码:

image-20210525190227506

本地方法,native

怎么找底层的本地方法(C/C++实现)呢?

下载openjdk,全文搜索,类名_方法 (EPollArrayWrapper_epollCreate

EpollArrayWrapper.c

image-20210525190657773

Epfd: 文件描述符。可以看做 epoll Instance的索引

epoll_create:Linux系统函数,Linux内核的源代码。

去到Linux系统:man epoll_create

image-20210525190955254

创建了一个epoll的结构体。Selector是对epoll的封装。

serverSocket.register(selector, SelectionKey.OP_ACCEPT);

register底层调用了implRegister方法(EpollSelectorImpl)

image-20210525191415080

fd:serverSocketChannel、SocketChannel等 文件描述符

实际上把channel丢到了selector内部的array集合里去。

selector.select()

java程序实现通信是底层调用了大量的系统函数让他们进行通信的。

有事件发生之后,是先被操作系统感知到,中断程序会把这些事件

放到就绪事件列表。

epoll_wait:监听。(阻塞,让出CPU)

NIO其实是底层用了epoll的几个系统函数帮我们做的这一套实现(放事件,epoll_wait监听,没有事件进来就阻塞着)。

select、poll、epoll模型的区别

都是Linux的内核函数。

jdk的1.4是用select实现。

image-20210525193634819

有channel的时候,select会遍历所有的channel。十万个channel也会轮询一遍。有大量的空循环。(最多不能连1024)

1.4升级了一版——poll。

逐个遍历,还是会有空转。(但是没有连接上限)

jdk1.5以上——epoll。

redis的底层线程模型

NIO、多路复用

redis源码:ae_epoll.cae_kqueue.c(不同操作系统用的不同)

image-20210525194114632

通过C语言的ae_epoll.c实现的。

redis启动的时候

image-20210525194850438

image-20210525194949063

监听channel注册过来的事件。

image-20210525195101328

Redis、Zookeeper、Netty,游戏服务器等其实底层就是IO通信程序。

redis、Netty底层用的NIO。

Zookeeper:NIO、BIO都用了,用在不同的场景。

Netty主从Reactor高并发线程模型

Netty:IO通信程序

真正关心拿到数据之后怎么开发我们的业务逻辑。

netty帮我们把一系列的事情封装好。

重复性的代码封装好放到框架里,最终它就给你数据,你来处理数据。

netty的入门程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class NettyServer {

public static void main(String[] args) {
// 创建两个线程组bossGroup和workerGroup,含有的子线程NIOEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程来配置参数
// 设置两个线程组
bootstrap.group(bossGroup, workerGroup)
// 使用NIOServerSocketChannel作为服务器的通道实现
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 对workerGroup的SocketChannel设置处理器
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start");
// 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
// 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture cf = bootstrap.bind(9000).sync();
//给cf注册监听器,监听我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});
// 对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* NettyServerHandler
* 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
*
* @author fanghaiying
* @date 2021/5/26
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx){
System.out.println("客户端连接通道建立完成");
}

/**
* 读取客户端发送的数据
* @param ctx 上下文对象,含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到客户端的消息" + buf.toString(CharsetUtil.UTF_8));
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buf);
}
}

我们发现Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来,让你可以专注业务的开 发,而不需写一大堆类似NIO的网络处理操作。

可以分多个selector(多路复用器),连接事件用一个,读事件用另一个

主从selector

image-20210526123336878

Netty:底层就是对NIO程序的封装。

线程池:Boos Group主线程池1、Worker Group从线程池8

redis为什么不用netty底层来通信?

要用原生的epol函数而不用netty呢?

netty:大量的异步、大量的多线程模型。优化服务的性能。

如果用netty来实现就是多线程了。redis6.0多线程,但是核心处理服务还是单线程的,只是IO通信数据解析用的多线程。

需要控制并发安全问题了。

分布式锁、幂等控制都是基于单线程模型来实现的。

响应式编程

Reactor Pattern

把我们的IO事件分发给对应的处理方法去处理。基于事件、驱动

NIO其实也是响应式编程模型。只处理事件,关心事件发生后的处理逻辑就可以了。

真正就绪事件列表只不过是操作系统内核中断程序(epoll_wait)放的,Java程序只需要处理。

selector相当于观察者。监听着事件发生。

Netty集群

单机百万连接Netty高并发架构

Fork me on GitHub