Unix 下共有五种 I/O 模型:阻塞 I/O、非阻塞 I/O、I/O 多路复用(select、poll、epoll)信号驱动 I/O(SIGIO)和异步 I/O(Posix.1的aio_系列函数),而java除了其中的信号驱动式之外,其他均有支持;
理解I/O模型,首先要理解一个输入操作所必须包含的2个阶段:
对于套接字上的输入操作,第一步通常涉及等待数据从网络中到达。当所等待的分组到达时,它被复制到内核中的某个缓冲区。第二步就是把数据从内核缓冲区复制到应用进程缓冲区。
进程调用recvfrom,其系统调用直到数据包到达且被复制到应用进程的缓冲中或者发生错误才返回。这就是阻塞式IO模型的微观图示。
针对阻塞IO模型的传统服务设计则如上图,服务器对每个client连接都会启动一个专门的线程去维护,服务器中的逻辑Handler需要在各自的线程中执行,这种模型对线程的需求较多,面对高并发的场景,会造成CPU资源浪费;原来的tomcat就是这种模式,只不过现在也支持NIO了。
常见写法(服务端):
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @program test
* @description: bio
* @author: cys
* @create: 2020/06/30 16:20
*/
public class BIOServer {
//线程池机制
//1. 创建一个线程池
//2. 如果有客户端连接了,创建一个线程与之通讯(单独写一个方法)
public static void main(String[] args) throws IOException {
ExecutorService executorService = Executors.newCachedThreadPool();
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动了");
while (true) {
//监听,等待客户端连接
final Socket socket = serverSocket.accept();
System.out.println("有客户端连接");
executorService.submit(new Runnable() {
@Override
public void run() {
handler(socket);
}
});
}
}
private static void handler(Socket socket) {
byte[] bytes = new byte[1024];
try (InputStream inputStream = socket.getInputStream();) {
int read = 0;
while ((read = inputStream.read(bytes)) != -1) {
System.out.println(new String(bytes, 0, read));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
如图I/O复用模型将阻塞点由真正的I/O系统调用转移到了对select、poll或者epoll系统函数的调用上。单纯看这个微观图,有可能还会觉得与阻塞I/O区别不大,甚至于我们多增加了I/O调用还增加了性能损耗。其实不然,使用select以后最大的优势是用户可以在一个线程内同时处理多个连接的I/O请求
java NIO实现一个聊天的功能(服务端)
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
* @program test
* @description:
* @author: cys
* @create: 2020/07/02 10:37
*/
public class GroupChatServer {
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int port = 6667;
public GroupChatServer() {
try {
selector = Selector.open();
listenChannel = ServerSocketChannel.open();
listenChannel.socket().bind(new InetSocketAddress(port));
listenChannel.configureBlocking(false);
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void listen() {
try {
while (true) {
if (selector.select(2000) > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {//连接事件
SocketChannel socketChannel = listenChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {//读取事件,即通道可读了
read(key);
}
iterator.remove();
}
} else {
System.out.println("等待。。。。");
}
}
} catch (IOException e) {
} finally {
}
}
//读取客户端消息
private void read(SelectionKey selectionKey) {
SocketChannel socketChannel = null;
try {
//定义一个SocketChannel
socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = socketChannel.read(buffer);
if (count > 0) {//读取到了数据
String str = new String(buffer.array());
System.out.println("from 客户端" + str);
//向其他的客户端转发消息
sendInfoToOtherClients(str, socketChannel);
}
} catch (IOException e) {
try {
System.out.println(socketChannel.getRemoteAddress() + "离线了");
selectionKey.cancel();
socketChannel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
} finally {
}
}
private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
System.out.println("服务器转发消息中");
//遍历所有注册到Selector上的channel,并排除self
for (SelectionKey key : selector.keys()) {
Channel targetChannel = key.channel();
if (targetChannel instanceof SocketChannel && targetChannel != self) {
SocketChannel desc = (SocketChannel) targetChannel;
//将msg存储到buffer
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
desc.write(buffer);
}
}
}
public static void main(String[] args) {
new GroupChatServer().listen();
}
}
如图,我们也可以用信号,让内核在描述符就绪时发送SIGIO信号通知我们。我们称这种模型为信号驱动式I/O(signal-driven I/O).
它由POSIX规范定义,工作机制是:告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到我们自己的缓冲区)完成后通知我们。与信号驱动式I/O不同的是:信号驱动式I/O是由内核通知我们何时可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。
如图,前四种模型都属于同步I/O模型,因为其中真正的 I/O操作将阻塞进程。而异步I/O是完全不会阻塞请求I/O的。目前 Windows 下通过 IOCP 实现了真正的异步 I/O。而在 Linux 系统下,目前 AIO 并不完善,因此在 Linux 下实现高并发网络编程时都是以 IO 复用模型模式为主。