flyEn'blog

学习笔记之netty实践及源码剖析

基于Netty实现的一个聊天室

简易聊天室实现(netty入门级别)。

代码实现

ChatServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import netty.NettyServerHandler;

/**
* ChatServer
*
* @author fanghaiying
* @date 2021/5/26
*/
public class ChatServer {

public static void main(String[] args) {

// 创建两个线程组bossGroup和workerGroup,含有的子线程NIOEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程来配置参数
// 设置两个线程组
bootstrap.group(bossGroup, workerGroup)
// 使用NIOServerSocketChannel作为服务器的通道实现
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 加入处理器
ChannelPipeline pipeline = ch.pipeline();
// 向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
// 向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
// 加入自己的业务处理handler
pipeline.addLast(new ChatServerHandler());
}
});
System.out.println("聊天室server启动...");
// 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
// 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture cf = bootstrap.bind(9000).sync();
// 对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

ChatServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package chat;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;

/**
* ChatServerHandler
*
* @author fanghaiying
* @date 2021/5/26
*/
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

@Override
public void channelActive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
// 将该客户加入聊天的信息推送给其他在线的客户端
// 该方法会将channelGroup中所有的channel遍历,并发送消息
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 上线了" + "\n");
channelGroup.add(channel);
System.out.println(ctx.channel().remoteAddress() + " 上线了" + "\n");
}

/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象,含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
// 获取到当前的channel
Channel channel = ctx.channel();
// 这时我们遍历channelGroup,根据不同的情况,回送不同的消息
channelGroup.forEach(ch -> {
if (channel != ch) {
// 不是当前的channel,转发消息
ch.writeAndFlush("[客户端]" + channel.remoteAddress() + " 发送了消息:" + msg + "\n");
} else {
ch.writeAndFlush("[自己]发送了消息:" + msg + "\n");
}
});
}
}

ChatClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
* ChatClient
*
* @author fanghaiying
* @date 2021/5/26
*/
public class ChatClient {

public static void main(String[] args) {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
//设置线程组
bootstrap.group(group)
// 使用 NioSocketChannel 作为客户端的通道实现
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 加入处理器
ChannelPipeline pipeline = channel.pipeline();
// 向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
// 向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
// 加入自己的业务处理handler
pipeline.addLast(new ChatClientHandler());
}
});
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
Channel channel = channelFuture.channel();
System.out.println("=======" + channel.localAddress() + "=======");
// 客户端需要输入信息,创建一个扫描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
// 通过channel发送到服务端
channel.writeAndFlush(msg);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}

}
}

ChatClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package chat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
* ChatClientHandler
*
* @author fanghaiying
* @date 2021/5/26
*/
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg.trim());
}
}

聊天室运行

image-20210526140644722

这时Client1连接进来,打印客户端1的ip标识。

之后client2、3连接进来,开始聊天室对话。

Client1:

image-20210526140845800

Client2发送了一条消息:hello 大家好

image-20210526140940022

Client1、Client 3收到:

image-20210526140959119

image-20210526141012711

客户端1、2、3就可以通信起来了。

Netty源码剖析

image-20210526123336878

1
2
3
4
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>()...

实际上是将两个线程池(主线程池、从线程池)放到ServerBootstrap的成员变量中。

channel类、option、handler对象放到ServerBootstrap的成员变量中。

Selector是在线程池数组中。

1
ChannelFuture cf = bootstrap.bind(9000).sync();

关键代码:

bind() ---> initAndRegister() ---> channelFactory.newChannel() ---> this.clazz.getConstructor().newInstance()

即构造NioServerSocketChannel类的对象。

---> this(newSocket(DEFAULT_SELECTOR_PROVIDER));

---> newSocket(..) ---> provider.openServerSocketChannel()

跟NIO中

1
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

open()方法:

1
2
3
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}

做的事情是一样的,都是返回一个ServerSocketChannel对象。

---> this(newSocket(..)) --> super((Channel)null, channel, 16); --> AbstractNioChannel() --> ch.configureBlocking(false);

设置为非阻塞和NIO中 serverSocket.configureBlocking(false);一样。

对应NIO中Selector selector = Selector.open();其实是在初始化线程池的时候就开启了。

在初始化NioEventLoop的时候就会帮我们构建selector。

image-20210526211004905

NIO建立Socket完了之后,会有一些事件的注册。serverSocket.register(selector, SelectionKey.OP_ACCEPT);

这行对应netty里是在哪里封装的呢?

还是在bind()--->initAndRegister()中。

1
ChannelFuture regFuture = this.config().group().register(channel);

register() --> register0()

1
this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);

this.javaChannel():serverSocket

unwrappedSelector() :selector

相当于NIO中的:

1
serverSocket.register(selector, SelectionKey.OP_ACCEPT);

register()

1
2
3
4
5
6
7
8
9
10
11
// ...
if (eventLoop.inEventLoop()) {
this.register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
public void run() {
AbstractUnsafe.this.register0(promise);
}
});
// ...

---> execute()---> startThread()

1
SingleThreadEventExecutor.this.run();

--->run() --->

1
2
3
select(wakenUp.getAndSet(false));
--->
int selectedKeys = selector.select(timeoutMillis);

NIO中对应selector.select();

接下来就是事件的处理,怎么来帮我们处理事件

跳回到run() --> processSelectedKeys();---> run()

1
pipeline.fireChannelRead(byteBuf);

--> invokeChannelRead()

Netty高并发高性能架构设计精髓

  • 主从Reactor线程模型
  • NIO多路复用非阻塞
  • 无锁串行化设计思想
  • 支持高性能序列化协议
  • 零拷贝(直接内存的使用)
  • ByteBuffer内存池设计
  • 灵活的TCP参数配置能力
  • 并发优化

封装的过程中做了哪些优化?

零拷贝

image-20210527094601370

HeapByteBuffer:写数据是写在这个对象里面的字节数组。

DirectByteBuffer:分配的内存不是在堆里,是在物理内存中,这个对象是这块内存的一个引用

底层是调用了native本地方法,native方法中调了操作系统内核函数malloc分配了一块物理内存。返回了一个指针指向了这块内存空间。转成java类型返回给上游。

ByteBuffer Buffer = ByteBuffer.allocateDirect(1000)

Buffer内部address属性:实际上是通过malloc函数开辟的物理内存地址

Netty零拷贝原理

image-20210527102114092

用DirectByteBuffer,节约了一半的拷贝时间。

零拷贝:物理内存和堆之间没有拷贝。

如果所有的场景都用直接内存,会有什么问题?

分配堆内存快,如果没有数据的大量传输、各种拷贝,除了I/O这种程序之外一般都用堆内存,内存的分配和销毁都比较快。

Netty + Zookeeper 集群

Netty单机百万连接性能调优

Fork me on GitHub