Java NIO 通道概念选择器使用示例详解
作者:lane
Java NIO通道
通道相当于一个传递物品的管子,两边都可以往对面传递东西。
有哪些通道?
对应文件IO和网络IO,通道也分为一个FileChannel和三个socket通道(SocketChannel、ServerSocketChannel和DatagramChannel)
基础一般情况下,一个通道必然关联着一个文件描述符或者是文件句柄。
通道可以是单向的,也可以是双向的(读写)。
socket通道可以是阻塞的或非阻塞的,FileChannel只支持阻塞模式。
Scatter和Gather 发散和汇聚
从字面理解,通道支持多个缓冲区同时读写。这样能够充分利用现代操作系统多核CPU功能,同时填充或排干多个缓冲区。
Scatter/Gather是一个简单却强大的概念,它是指在多个缓冲区上实现一个简单的 I/O 操作。对于一个 write 操作而言,数据是从几个缓冲区按顺序抽取(称为 gather)并沿着通道发送的。
缓冲区本身并不需要具备这种gather 的能力(通常它们也没有此能力)。该 gather 过程的效果就好比全部缓冲区的内容被连结起来,并在发送数据前存放到一个大的缓冲区中。对于 read 操作而言,从
通道读取的数据会按顺序被散布(称为 scatter)到多个缓冲区,将每个缓冲区填满直至通道中的数据或者缓冲区的最大空间被消耗完。
大多数现代操作系统都支持本地矢量 I/O(native vectored I/O)。当您在一个通道上请求一个Scatter/Gather 操作时,该请求会被翻译为适当的本地调用来直接填充或抽取缓冲区。这是一个很大
的进步,因为减少或避免了缓冲区拷贝和系统调用。Scatter/Gather 应该使用直接的 ByteBuffers 以从本地 I/O 获取最大性能优势。
Java NIO 选择器
从最基础的层面上来看,选择器提供了问询通道是否就绪操作I/O的能力,选择器可以监控注册在上面的多个通道,通道注册时会返回选择键(记录通道与选择器之间的关联关系),选择器管理者这些注册的键、和就绪状态键的集合
SelectableChannel
所有继承SelectableChannel的通道都可以在选择器中注册,FileChannel没有继承这个类,所以无法使用选择器
选择键(SelectionKey)
选择键是选择器的重点内容,选择器就绪的通道通过返回选择键集合来通知
public abstract class SelectionKey { public static final int OP_READ public static final int OP_WRITE public static final int OP_CONNECT public static final int OP_ACCEPT public abstract SelectableChannel channel(); public abstract Selector selector(); public abstract void cancel(); public abstract boolean isValid(); public abstract int interestOps(); public abstract void interestOps(int ops); public abstract int readyOps(); public final boolean isReadable() public final boolean isWritable() public final boolean isConnectable() public final boolean isAcceptable() public final Object attach(Object ob) public final Object attachment() }
选择键维护了通道和选择器之间的关联,可以通过选择键获取Channel或Selector,键对象表示一种特殊的关联关系,当这种关系需要终止时,可以调用cancel()方法取消,调用这个方法时,不会立即被取消,而是将这个键放到被取消的集合里,当Selector下次调用select()方法时会真正被清理掉。当通道关闭时,选择键会自动被取消,当选择器关闭时,所有键都会被清理掉。
一个选择器键包含有两个准备好的操作集合,包括感兴趣的事件集合instrest和就绪的操作集合ready,通过掩码保存
感兴趣的事件集合interestOps()
通常一个键的instrest注册时就已经确认,但是可以在注册后通过interestOps(newOps)传入一个新的ops来改变这个值
channel.register(this.selector, SelectionKey.OP_READ);
上面的代码注册的键interest包含read事件,可以在对通道IO异步处理时,改变这个ops来临时取消对read事件的关注,以防止重复处理未处理完的通道
就绪的操作集合readyOps()
通过这个方法返回就绪的操作,isReadable( ),isWritable( ),isConnectable( ), 和isAcceptable( )用来判断这些操作是否就绪,进行下一步的处理
示范选择器的使用
下面列举两个示例来示范选择器的使用
单选择器单线程
public abstract class AbstractNioServer { protected final static String CHARSET = "utf-8"; protected String ip; protected Integer port; protected Selector selector; public AbstractNioServer(String ip, Integer port) { this.ip = ip; this.port = port; } /** * 客户端连接请求 * * @param key */ protected abstract void accept(SelectionKey key) throws IOException; /** * 读取数据 * * @param key */ protected abstract void read(SelectionKey key) throws IOException; /** * 初始化服务器 * * @throws IOException */ public void init() throws IOException { //设置服务器地址端口 SocketAddress address = new InetSocketAddress(this.ip, this.port); //创建服务端通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //绑定服务器地址 serverSocketChannel.bind(address); //设置为非阻塞模式 serverSocketChannel.configureBlocking(false); //创建一个选择器 this.selector = Selector.open(); //将服务器通道注册到选择器中,ServerSocketChannel只支持accept事件注册,validOps返回16 serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); } public void start() throws IOException { this.init(); while (true) { int count = this.selector.select(); if (count == 0) { //没有就绪的选择键 continue; } Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { //连接请求 accept(key); } else if (key.isReadable()) { //修改键的感兴趣事件,防止被 select 重复调用,处理完事件后及时恢复 key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); //读取消息 read(key); } iterator.remove(); } } } /** * 恢复键的感兴趣事件 * @param key */ protected void resumeInterOpsRead(SelectionKey key) { //还原key的感兴趣事件 key.interestOps(key.interestOps() | SelectionKey.OP_READ); //唤醒selector的select事件 key.selector().wakeup(); } }
public class SingleNioServer extends AbstractNioServer { public static void main(String[] args) { SingleNioServer server = new SingleNioServer("127.0.0.1", 1008); try { server.start(); } catch (IOException e) { e.printStackTrace(); } } public SingleNioServer(String ip, Integer port) { super(ip, port); } @Override protected void accept(SelectionKey key) throws IOException { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); //SocketChannel支持 read、write、connect 事件注册,validOps返回13=1+4+8 SocketChannel channel = serverChannel.accept(); if (channel == null) { return; } System.out.println("新的连接请求"); channel.configureBlocking(false); //如果是阻塞通道进行注册,会抛出 IllegalBlockingModeException 异常 channel.register(this.selector, SelectionKey.OP_READ); } @Override protected void read(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); try { ByteBuffer buffer = ByteBuffer.allocate(1024); int len = channel.read(buffer); buffer.flip(); if (len > 0) { String str = Charset.forName(CHARSET).decode(buffer).toString(); System.out.println("客户端消息:" + str); String msg = "消息已收到"; byte[] sendData = msg.getBytes(CHARSET); ByteBuffer sendBuffer = ByteBuffer.wrap(sendData); channel.write(sendBuffer); super.resumeInterOpsRead(key); } else if (len == -1) { System.out.println("socket client close"); key.cancel(); channel.close(); } } catch (IOException ex) { key.cancel(); channel.close(); } } }
单选择器多线程
public class MulitpleNioServer extends AbstractNioServer { public static void main(String[] args) throws IOException { MulitpleNioServer server = new MulitpleNioServer("127.0.0.1", 1008); server.start(); } /** * 线程池 */ private ExecutorService executorService = Executors.newFixedThreadPool(5); public MulitpleNioServer(String ip, Integer port) { super(ip, port); } @Override protected void accept(SelectionKey key) throws IOException { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); //SocketChannel支持 read、write、connect 事件注册,validOps返回13=1+4+8 SocketChannel channel = serverChannel.accept(); System.out.println("新的连接请求"); channel.configureBlocking(false); channel.register(this.selector, SelectionKey.OP_READ); } @Override protected void read(SelectionKey key) throws IOException { executorService.submit(new Runnable() { @Override public void run() { readData(key); } }); } private void readData(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); try { ByteBuffer buffer = ByteBuffer.allocate(1024); if (channel.isOpen()) { if (channel.isConnected()) { int len = channel.read(buffer); buffer.flip(); if (len > 0) { String str = Charset.forName(CHARSET).decode(buffer).toString(); System.out.println("客户端消息:" + str); String msg = "消息已收到"; byte[] sendData = msg.getBytes(CHARSET); ByteBuffer sendBuffer = ByteBuffer.wrap(sendData); channel.write(sendBuffer); } else if (len == -1) { System.out.println("socket client close1"); key.cancel(); channel.close(); } super.resumeInterOpsRead(key); } } } catch (IOException ex) { System.out.println("client is close2"); key.cancel(); try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } }
以上就是Java NIO 通道概念选择器使用示例详解的详细内容,更多关于Java NIO 通道选择器的资料请关注脚本之家其它相关文章!