基于Netty实现的一个聊天室
代码实现
ChatServer
1 | package chat; |
ChatServerHandler
1 | package chat; |
ChatClient
1 | package chat; |
ChatClientHandler
1 | package chat; |
聊天室运行
这时Client1连接进来,打印客户端1的ip标识。
之后client2、3连接进来,开始聊天室对话。
Client1:
Client2发送了一条消息:hello 大家好
Client1、Client 3收到:
客户端1、2、3就可以通信起来了。
Netty源码剖析
1 | bootstrap.group(bossGroup, workerGroup) |
实际上是将两个线程池(主线程池、从线程池)放到ServerBootstrap的成员变量中。
channel类、option、handler对象放到ServerBootstrap的成员变量中。
Selector是在线程池数组中。
1 | ChannelFuture cf = bootstrap.bind(9000).sync(); |
关键代码:
bind() ---> initAndRegister() ---> channelFactory.newChannel() ---> this.clazz.getConstructor().newInstance()
即构造NioServerSocketChannel类的对象。
---> this(newSocket(DEFAULT_SELECTOR_PROVIDER));
---> newSocket(..) ---> provider.openServerSocketChannel()
跟NIO中
1 | ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); |
open()
方法:
1 | public static ServerSocketChannel open() throws IOException { |
做的事情是一样的,都是返回一个ServerSocketChannel
对象。
---> this(newSocket(..)) --> super((Channel)null, channel, 16); --> AbstractNioChannel() --> ch.configureBlocking(false);
设置为非阻塞和NIO中 serverSocket.configureBlocking(false);
一样。
对应NIO中Selector selector = Selector.open();
其实是在初始化线程池的时候就开启了。
在初始化NioEventLoop
的时候就会帮我们构建selector。
NIO建立Socket完了之后,会有一些事件的注册。serverSocket.register(selector, SelectionKey.OP_ACCEPT);
这行对应netty里是在哪里封装的呢?
还是在bind()--->initAndRegister()
中。
1 | ChannelFuture regFuture = this.config().group().register(channel); |
register() --> register0()
1 | this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this); |
this.javaChannel():serverSocket
unwrappedSelector() :selector
相当于NIO中的:
1 | serverSocket.register(selector, SelectionKey.OP_ACCEPT); |
register()
1 | // ... |
---> execute()---> startThread()
1 | SingleThreadEventExecutor.this.run(); |
--->run() --->
1 | select(wakenUp.getAndSet(false)); |
NIO中对应selector.select();
接下来就是事件的处理,怎么来帮我们处理事件
跳回到run() --> processSelectedKeys();---> run()
1 | pipeline.fireChannelRead(byteBuf); |
--> invokeChannelRead()
Netty高并发高性能架构设计精髓
- 主从Reactor线程模型
- NIO多路复用非阻塞
- 无锁串行化设计思想
- 支持高性能序列化协议
- 零拷贝(直接内存的使用)
- ByteBuffer内存池设计
- 灵活的TCP参数配置能力
- 并发优化
封装的过程中做了哪些优化?
零拷贝
HeapByteBuffer:写数据是写在这个对象里面的字节数组。
DirectByteBuffer:分配的内存不是在堆里,是在物理内存中,这个对象是这块内存的一个引用
底层是调用了native本地方法,native方法中调了操作系统内核函数malloc分配了一块物理内存。返回了一个指针指向了这块内存空间。转成java类型返回给上游。
ByteBuffer Buffer = ByteBuffer.allocateDirect(1000)
:Buffer内部address属性:实际上是通过malloc函数开辟的物理内存地址
Netty零拷贝原理
用DirectByteBuffer,节约了一半的拷贝时间。
零拷贝:物理内存和堆之间没有拷贝。
如果所有的场景都用直接内存,会有什么问题?
分配堆内存快,如果没有数据的大量传输、各种拷贝,除了I/O这种程序之外一般都用堆内存,内存的分配和销毁都比较快。
Netty + Zookeeper 集群
Netty单机百万连接性能调优