IO的几种方式

分类专栏:
NIO相关

文章标签:
Java自学
NIO
原创

IO

IO的方式通常分为几种,同步阻塞的BIO、同步非阻塞的NIO、异步非阻塞的AIO。

1.什么是BIO

BIO(同步阻塞):每一个IO请求都会有一个线程去处理,如果数据没有准备就绪,线程会一直等待。直到数据读取完毕线程才会释放,在此期间,进程不回去做任何其他任务,这种模式会浪费一定的线程资源。

BIO

2.什么是NIO

NIO(同步非阻塞):NIO的优点在于首先基于缓存读写文件,能够批量操作,然后用channel双向读写数据,减少每次打开断开流的资源消耗。引入selecore的概念,用一个线程管理多个通道,大大减少线程开销。
多路复用技术:如果每一个IO请求都需要一个进程去处理,如果没有控制客户端的数量,那么有多少IO请求就需要多少进程去处理,也就是所谓的多进程并发处理。问题在于这个模式消耗进程资源。如果在一个进程里面开辟多个通道,然后进程回去轮询准备就绪的通道,被选中的通道去进行读写操作,这就是多路复用技术。而NIO的selector是多路复用的一种实现。在操作系统设计中还有多种其他实现,select,poll,epoll。

NIO

3.什么是AIO

与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。 即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。

4.这里我们主要讲讲NIO三大元素

同步和异步,关注的是消息通知的机制
阻塞和非阻塞,关注的是等待消息过程中的状态

1)Channel

FileChannel 文件管道的数据
Pipe.SinkChannel
Pipe.SourceChannel 线程间通信的管道
ServerSocketChannel
SocketChannel 用于TCP网络通信的管道
DatagramChannel 用于UDP网络通信的管道
public class ChannelTest {

    public static void main(String[] args) throws Exception {

        File file = new File("nio.txt");
        if (!file.exists()) {
            file.createNewFile();
        }

        FileOutputStream os = new FileOutputStream(file);
        // 通道通过stream获取
        FileChannel channel = os.getChannel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        String str = "hello nio";
        buffer.put(str.getBytes());

        // 写完成  需要调用flip方法刷新
        buffer.flip();
        channel.write(buffer);
        channel.close();
        os.close();
    }
}

2)Buffer

capacity 总体容量大小
limit 存储容量的大小,是可读写和不可读写的界线
position 已读容量的大小,已读和未读区域的界线

Buffer

使用原理

a) 初始化,给定总容量,position=0, limit=capacity
b) 当使用put方法存入数据是,通过position来记录存储的容量变化,position不断后移,直到存储结束(写完成)
c)写完成需要调用flip方法刷新,limit=position,position=0
保障limit记录的是可读写区域的大小,position已读部分重置为空
d) 读数据直到读完成,需要调用clear方法,position=0, limit=capacity
public class BufferTest {

    public static void main(String[] args) {

        CharBuffer buffer = CharBuffer.allocate(8);
        System.out.println("capacity:" + buffer.capacity());
        System.out.println("limit:" + buffer.limit());
        System.out.println("position:" + buffer.position());

        buffer.put('p');
        buffer.put('z');
        System.out.println("----存入p&z");
        System.out.println("position:" + buffer.position());

        buffer.flip();
        System.out.println("------调用flip");
        System.out.println("limit:" + buffer.limit());
        System.out.println("position:" + buffer.position());
        
        System.out.println("------读取数据");
        // 不传参代表获取第一个,传参代表指定索引位置
        // 当传参有索引时,position不变化
        System.out.println(buffer.get());
        System.out.println("position:" + buffer.position());

        // 标记  存储当前的位置position
        buffer.mark();
        System.out.println(buffer.get());
        System.out.println("position:" + buffer.position());

        // 回退  退回到mark记录的位置
        buffer.reset();
        System.out.println("-------调用reset");
        System.out.println("position:" + buffer.position());
        System.out.println(buffer.get(1));
        System.out.println("position:" + buffer.position());

          //buffer的遍历方式
          while (buffer.hasRemaining()){
              System.out.println(buffer.get());
          }

        //clear清空的是索引位置  对象依然存在
        buffer.clear();
        System.out.println("-------调用clear");
        System.out.println("limit:" + buffer.limit());
        System.out.println("position:" + buffer.position());
    }
}

3)Selector

三个元素:Selector选择器、SelectableChannel可选择的通道、SelectionKey选择键

本质上,Selector是监听器
监听的是通道是否有我们关心的操作产生,操作对应的是事件(连接、接收、读/写)
使用SelectionKey代表具体的事件
在确保通道是可选择的情况下,将通道注册进选择器中
此时Selector维护的是,通道和事件之间的关联关系

Selector

1.Selector,管理被注册的通道集合,以及他们的状态

2.SelectableChannel,是一个抽象类,提供了通道可被选择需要实现的api

FileChannel就不是可选择的,Socket相关的通道都是可选择的
一个通道可以被注册到多个选择器
多个通道可以注册到一个选择器上,但一个通道只能在一个选择器中注册一次

3.SelectionKey,封装了要监听的事件,连接、接收、读、写

使用方式

a、首先通过open方法,获取通道,将通道设置为非阻塞的
b、通过open方法,获取选择器,将通道注册进选择器中,伴随设置通道要处理的事件(OP_ACCEPT)
c、轮询选择器,当前是否有要处理的操作 select() > 0?
如果有,要获取,待处理操作的集合Set<SelectionKey> , 进行遍历到SelectionKey时,判断对应哪种操作,不同的操作设置不同的处理方式
如OP_ACCEPT,接收客户端通道并进行注册,监听后续处理的事件,如OP_WRITE
如OP_WRITE,通过key的方法获取通道本身,读取数据并继续监听事件,如OP_READ
public class NioServer {

    public static void main(String[] args) throws Exception {

        // 创建一个服务端的通道  调用open方法获取
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 绑定ip地址和端口号
        SocketAddress address = new InetSocketAddress("127.0.0.1", 4321);
        serverChannel.socket().bind(address);
        // 接收客户端的连接
        SocketChannel socketChannel = serverChannel.accept();
        // 数据处理都要通过buffer
        ByteBuffer buffer = ByteBuffer.allocate(128);
        buffer.put("hello client".getBytes());
        buffer.flip();
        socketChannel.write(buffer);
        // 把数据读入到readBuffer中
        ByteBuffer readBuffer = ByteBuffer.allocate(128);
        socketChannel.read(readBuffer);
        readBuffer.flip();
        StringBuffer stringBuffer = new StringBuffer();

        while (readBuffer.hasRemaining()) {
            stringBuffer.append((char) readBuffer.get());
        }

        System.out.println("client data:" + stringBuffer.toString());
        socketChannel.close();
        serverChannel.close();
    }
}

public class NioClient {

    public static void main(String[] args) throws Exception {

        SocketChannel socketChannel = SocketChannel.open();
        SocketAddress address = new InetSocketAddress("127.0.0.1", 4321);
        socketChannel.connect(address);
        // 先写后读的逻辑处理
        ByteBuffer writeBuffer = ByteBuffer.allocate(128);
        writeBuffer.put("hello server from client".getBytes());
        writeBuffer.flip();
        socketChannel.write(writeBuffer);
        // 通过read方法  将数据写入到buffer中
        ByteBuffer readBuffer = ByteBuffer.allocate(128);
        socketChannel.read(readBuffer);
        readBuffer.flip();
        StringBuffer stringBuffer = new StringBuffer();

        while (readBuffer.hasRemaining()) {
            stringBuffer.append((char) readBuffer.get());
        }

        System.out.println("server data:" + stringBuffer.toString());
        socketChannel.close();
    }
}

public class NioSelectorServer {

    public static void main(String[] args) throws Exception {

        // 创建一个服务端的通道  调用open方法获取
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 绑定ip地址和端口号
        SocketAddress address = new InetSocketAddress("127.0.0.1", 4321);
        serverChannel.socket().bind(address);
        // 将此channel设置为非阻塞的
        serverChannel.configureBlocking(false);
        // 打开一个选择器
        Selector selector = Selector.open();
        // 将通道注册进选择器中,声明选择器监听的事件
        // 第一个被注册的事件往往是接收连接
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 通过选择器来管理通道
        // 需要感知  被监听的通道是否有操作被触发
        // 当select方法返回值代表要处理的操作个数>0,开始处理

        while (true) {
            int ready = selector.select();
            if (ready == 0) {
                continue;
            }
            // 进一步获取  要执行的操作集合
            Set<SelectionKey> set = selector.selectedKeys();
            Iterator<SelectionKey> iterator = set.iterator();

            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 取出操作后  直接移除  避免重复处理
                iterator.remove();

                //可接收的
                if (key.isAcceptable()) {
                    // 处理OP_ACCEPT事件
                    // 通过server通道 获取到客户端的对应channel通道并注册写事件
                    SocketChannel socketChannel = serverChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_WRITE);

                    //可写的
                } else if (key.isWritable()) {
                    // 处理OP_WRITE事件
                    // 通过key的channel方法  获取到事件对应的通道
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer writeBuffer = ByteBuffer.allocate(128);
                    writeBuffer.put("hello from 4321".getBytes());
                    writeBuffer.flip();
                    socketChannel.write(writeBuffer);
                    // 再把读事件注册进来
                    key.interestOps(SelectionKey.OP_READ);

                    //可读的
                } else if (key.isReadable()) {
                    // 处理OP_READ事件
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer readBuffer = ByteBuffer.allocate(128);
                    socketChannel.read(readBuffer);
                    readBuffer.flip();
                    StringBuffer stringBuffer = new StringBuffer();

                    while (readBuffer.hasRemaining()) {
                        stringBuffer.append((char) readBuffer.get());
                    }

                    System.out.println("client data:" + stringBuffer.toString());

                    //可连接的
                } else if (key.isConnectable()) {
                }
            }
        }
    }
}
  • 作者:潘震
  • 评论

    pz
    Test
    留言