疯狂的小鸡

IO-java.nio

字数统计: 4.7k阅读时长: 18 min
2018/10/12 Share

NIO概述

Java NIO(New IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API。

Java NIO 由以下几个核心部分组成:

  • Channels
  • Buffers
  • Selectors

虽然Java NIO 中除此之外还有很多类和组件,但在我看来,Channel,Buffer 和 Selector 构成了核心的API。其它组件,如Pipe和FileLock,只不过是与三个核心组件共同使用的工具类。

Channel

Channel和Java NIO的通道类似流,但又有些不同:

  • 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。

  • 通道可以异步地读写。

  • 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。

Java NIO中最重要的通道的实现:

  • FileChannel:从文件中读写数据。

  • DatagramChannel:能通过UDP读写网络中的数据。

  • SocketChannel:能通过TCP读写网络中的数据。

  • ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

FileChannel

Java NIO中的FileChannel是一个连接到文件的通道。可以通过文件通道读写文件。

FileChannel无法设置为非阻塞模式,它总是运行在阻塞模式下。

1)打开FileChannel
在使用FileChannel之前,必须先打开它。但是,我们无法直接打开一个FileChannel,需要通过使用一个InputStream、OutputStream或RandomAccessFile的getChannel方法来获取一个FileChannel实例

1
2
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt""rw");  
FileChannel inChannel = aFile.getChannel();

2)从FileChannel读取数据
调用FileChannel.read()方法。该方法将数据从FileChannel读取到Buffer中。

read()方法返回的int值表示了有多少字节被读到了Buffer中。如果返回-1,表示到了文件末尾。

3)向FileChannel写数据
使用FileChannel.write()方法向FileChannel写数据,该方法的参数是一个Buffer。FileChannel.write()是在while循环中调用的。

因为无法保证write()方法一次能向FileChannel写入多少字节,因此需要重复调用write()方法,直到Buffer中已经没有尚未写入通道的字节。

1
2
3
while(buf.hasRemaining()) {  
    channel.write(buf);  
}

4)关闭FileChannel

1
channel.close();

5)FileChannel的position方法
有时可能需要在FileChannel的某个特定位置进行数据的读/写操作。可以通过调用position()方法获取FileChannel的当前位置。也可以通过调用position(long pos)方法设置FileChannel的当前位置。

6)FileChannel的size方法
FileChannel实例的size()方法将返回该实例所关联文件的大小

7)FileChannel的truncate方法
可以使用FileChannel.truncate()方法截取一个文件。截取文件时,文件中指定长度后面的部分将被删除。

8)FileChannel的force方法
FileChannel.force()方法将通道里尚未写入磁盘的数据强制写到磁盘上。出于性能方面的考虑,操作系统会将数据缓存在内存中,所以无法保证写入到FileChannel里的数据一定会即时写到磁盘上。要保证这一点,需要调用force()方法。

force()方法有一个boolean类型的参数,指明是否同时将文件元数据(权限信息等)写到磁盘上。

SocketChannel

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。可以通过以下2种方式创建:

  • 打开一个SocketChannel并连接到互联网上的某台服务器。

  • 一个新连接到达ServerSocketChannel时,会创建一个SocketChannel。

1)打开 SocketChannel

1
2
SocketChannel socketChannel = SocketChannel.open();  
socketChannel.connect(new InetSocketAddress("http://jenkov.com"80));

2)关闭 SocketChannel
当用完SocketChannel之后调用SocketChannel.close()关闭SocketChannel。

3)SocketChannel阻塞读写
同FileChannel

4)非阻塞模式
可以设置 SocketChannel 为非阻塞模式(non-blocking mode).设置之后,就可以在异步模式下调用connect(), read() 和write()了。非阻塞模式与选择器搭配会工作的更好,通过将一或多个SocketChannel注册到Selector,可以询问选择器哪个通道已经准备好了读取,写入等。

  • connect()
    如果SocketChannel在非阻塞模式下,此时调用connect(),该方法可能在连接建立之前就返回了。为了确定连接是否建立,可以调用finishConnect()的方法。

    1
    2
    3
    4
    5
    6
    socketChannel.configureBlocking(false);  
    socketChannel.connect(new InetSocketAddress("http://jenkov.com"80));  
      
    while(! socketChannel.finishConnect() ){  
       //wait, or do something else...  
    }
  • write()
    非阻塞模式下,write()方法在尚未写出任何内容时可能就返回了。所以需要在循环中调用write()。

  • read()
    非阻塞模式下,read()方法在尚未读取到任何数据时可能就返回了。所以需要关注它的int返回值,它会告诉你读取了多少字节。

ServerSocketChannel

ServerSocketChannel 是一个可以监听新进来的TCP连接的通道,就像标准IO中的ServerSocket一样。ServerSocketChannel类在 java.nio.channels包中。

1
2
3
4
5
6
7
8
9
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  
  
serverSocketChannel.socket().bind(new InetSocketAddress(9999));  
  
while(true){  
    SocketChannel socketChannel =  serverSocketChannel.accept();  
  
    //do something with socketChannel...  
}

1)打开 ServerSocketChannel
通过调用 ServerSocketChannel.open() 方法来打开ServerSocketChannel。

2)关闭 ServerSocketChannel
通过调用ServerSocketChannel.close() 方法来关闭ServerSocketChannel。

3)监听新进来的连接
通过 ServerSocketChannel.accept() 方法监听新进来的连接。当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。因此,accept()方法会一直阻塞到有新连接到达。 通常不会仅仅只监听一个连接,在while循环中调用 accept()方法。

4)非阻塞模式
ServerSocketChannel可以设置成非阻塞模式。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null。

1
2
3
4
5
6
7
8
9
10
11
12
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  
  
serverSocketChannel.socket().bind(new InetSocketAddress(9999));  
serverSocketChannel.configureBlocking(false);  

while(true){  
   SocketChannel socketChannel = serverSocketChannel.accept();  
  
    if(socketChannel != null){  
  //do something with socketChannel...  
    }  
}

DatagramChannel

DatagramChannel是一个能收发UDP包的通道。因为UDP是无连接的网络协议,所以不能像其它通道那样读取和写入。它发送和接收的是数据包。

1)打开 DatagramChannel

1
2
DatagramChannel channel = DatagramChannel.open();  
channel.socket().bind(new InetSocketAddress(9999));

2)接收数据
通过receive()方法从DatagramChannel接收数据。

1
2
3
ByteBuffer buf = ByteBuffer.allocate(48);  
buf.clear();  
channel.receive(buf);

receive()方法会将接收到的数据包内容复制到指定的Buffer. 如果Buffer容不下收到的数据,多出的数据将被丢弃。

3)发送数据
通过send()方法从DatagramChannel发送数据。

1
2
3
4
5
6
7
8
String newData = "New String to write to file" + System.currentTimeMillis();  
  
ByteBuffer buf = ByteBuffer.allocate(48);  
buf.clear();  
buf.put(newData.getBytes());  
buf.flip();  
  
int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com"80));

这个例子发送一串字符到”jenkov.com”服务器的UDP端口80。 因为服务端并没有监控这个端口,所以什么也不会发生。也不会通知你发出的数据包是否已收到,因为UDP在数据传送方面没有任何保证。

4)连接到特定的地址
可以将DatagramChannel“连接”到网络中的特定地址的。由于UDP是无连接的,连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel ,让其只能从特定地址收发数据。

1
channel.connect(new InetSocketAddress("jenkov.com"80));

通道之间的数据传输

FileChannel允许通道间的复制,复制双方有一方为FileChannel即可。
若接收方为FileChannel,transferFrom()方法可以将数据从源通道传输到FileChannel中。

1
2
3
4
5
6
7
8
9
10
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt""rw");  
FileChannel      fromChannel = fromFile.getChannel();  
 
RandomAccessFile toFile = new RandomAccessFile("toFile.txt""rw");  
FileChannel      toChannel = toFile.getChannel();  
  
long position = 0;  
long count = fromChannel.size();  
  
toChannel.transferFrom(position, count, fromChannel);

若传输方为FileChannel,可使用transferTo()方法。

Buffer

Java NIO中的Buffer用于和NIO通道进行交互。数据是从通道读入缓冲区,从缓冲区写入到通道中的。 缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。

使用Buffer读写数据一般遵循以下四个步骤:
1)写入数据到Buffer
2)调用flip()方法
3)从Buffer中读取数据
4)调用clear()方法或者compact()方法

当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到读模式。在读模式下,可以读取之前写入到buffer的所有数据。

一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用clear()或compact()方法。clear()方法会清空整个缓冲区。compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。

Buffer的三大属性

  • capacity
    作为一个内存块,Buffer有一个固定的大小值,也叫“capacity”.你只能往里写capacity个byte、long,char等类型。一旦Buffer满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。

  • position
    当你写数据到Buffer中时,position表示当前的位置。初始的position值为0.当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity – 1。
    当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0。当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。

  • limit
    在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。 写模式下,limit等于Buffer的capacity。
    当切换Buffer到读模式时, limit表示你最多能读到多少数据。因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。

Buffer类型:

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

这些Buffer类型代表了不同的数据类型。换句话说,就是可以通过char,short,int,long,float 或 double类型来操作缓冲区中的字节。MappedByteBuffer 有些特别,在后面再讲。

Buffer的操作

1)Buffer的分配
要想获得一个Buffer对象首先要进行分配。 每一个Buffer类都有一个allocate方法。例如:

1
2
ByteBuffer buf = ByteBuffer.allocate(48);
CharBuffer buf = CharBuffer.allocate(1024);

2) 向Buffer中写数据
写数据到Buffer有两种方式:

1
2
3
4
//从Channel写到Buffer。
int bytesRead = inChannel.read(buf); 
//通过Buffer的put()方法写到Buffer里。
buf.put(127);

put方法有很多版本,允许你以不同的方式把数据写入到Buffer中。例如, 写到一个指定的位置,或者把一个字节数组写入到Buffer。

3) 从Buffer中读取数据
从Buffer中读取数据首先要调用flip()方法,读有两种方式:

1
2
3
4
//从Buffer读取数据到Channel。
int bytesWritten = inChannel.write(buf); 
//使用get()方法从Buffer中读取数据。
byte aByte = buf.get();

get方法同样有很多版本,允许你以不同的方式从Buffer中读取数据。

3) rewind()方法
Buffer.rewind()将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素(byte、char等)。

4) clear()与compact()方法
一旦读完Buffer中的数据,需要让Buffer准备好再次被写入。可以通过clear()或compact()方法来完成。

如果调用的是clear()方法,position将被设回0,limit被设置成 capacity的值。换句话说,Buffer 被清空了。Buffer中的数据并未清除,只是这些标记告诉我们可以从哪里开始往Buffer里写数据。 如果Buffer中有一些未读的数据,调用clear()方法,数据将“被遗忘”,意味着不再有任何标记会告诉你哪些数据被读过,哪些还没有。

如果Buffer中仍有未读的数据,且后续还需要这些数据,但是此时想要先先写些数据,那么使用compact()方法。compact()方法将所有未读的数据拷贝到Buffer起始处。然后将position设到最后一个未读元素正后面。limit属性依然像clear()方法一样,设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。

5) mark()与reset()方法
通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position。

6) equals()与compareTo()方法
当满足下列条件时,表示两个Buffer相等:

  • 有相同的类型(byte、char、int等)。

  • Buffer中剩余的byte、char等的个数相等。

  • Buffer中所有剩余的byte、char等都相同。

如你所见,equals只是比较Buffer的一部分,不是每一个在它里面的元素都比较。实际上,它只比较Buffer中的剩余元素。

compareTo()方法比较两个Buffer的剩余元素(byte、char等), 如果满足下列条件,则认为一个Buffer“小于”另一个Buffer:

  • 第一个不相等的元素小于另一个Buffer中对应的元素。

  • 所有元素都相等,但第一个Buffer比另一个先耗尽(第一个Buffer的元素个数比另一个少)。

分散(Scatter)/聚集(Gather)

分散(scatter)从Channel中读取是指在读操作时将读取的数据写入多个buffer中。因此,Channel将从Channel中读取的数据“分散(scatter)”到多个Buffer中。
聚集(gather)写入Channel是指在写操作时将多个buffer的数据写入同一个Channel,因此,Channel 将多个Buffer中的数据“聚集(gather)”后发送到Channel。
scatter / gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。

1
2
3
4
5
6
7
8
9
10
11
12
13
ByteBuffer header = ByteBuffer.allocate(128);  
ByteBuffer body   = ByteBuffer.allocate(1024);  
  
ByteBuffer[] bufferArray = { header, body };  
  
channel.read(bufferArray); 

ByteBuffer header = ByteBuffer.allocate(128);  
ByteBuffer body   = ByteBuffer.allocate(1024);  
 
ByteBuffer[] bufferArray = { header, body };  
  
channel.write(bufferArray);

Selector

Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。

Selector的使用

1)Selector的创建
通过调用Selector.open()方法创建一个Selector

1
Selector selector = Selector.open();

2)向Selector注册通道
为了将Channel和Selector配合使用,必须将channel注册到selector上。通过SelectableChannel.register()方法来实现

1
2
channel.configureBlocking(false);  
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);

与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。

register()方法的第二个参数。这是一个“interest集合”,意思是在通过Selector监听Channel时对什么事件感兴趣。

可以监听四种不同类型的事件: Connect/Accept/Read/Write

如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来

1
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

通道触发了一个事件意思是该事件已经就绪。所以,某个channel成功连接到另一个服务器称为“连接就绪”。一个server socket channel准备好接收新进入的连接称为“接收就绪”。一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”。

3) SelectionKey
当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含了一些你感兴趣的属性:

  • interest集合
    interest集合是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合

    1
    int interestSet = selectionKey.interestOps();
  • ready集合
    ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。

  • Channel + Selector

    1
    2
    Channel  channel  = selectionKey.channel();  
    Selector selector = selectionKey.selector();
  • 附加的对象
    可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加 与通道一起使用的Buffer,或是包含聚集数据的某个对象。

    1
    2
    selectionKey.attach(theObject);  
    Object attachedObj = selectionKey.attachment();

可以在用register()方法向Selector注册Channel的时候附加对象。

1
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

4)通过Selector选择通道
一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。换句话说,如果你对“读就绪”的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。select()方法返回的int值表示有多少通道已经就绪。

select()阻塞到至少有一个通道在你注册的事件上就绪了。其他线程可调用该Selector的wakeup()方法解除阻塞。

select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。 selectNow()不会阻塞,不管什么通道就绪都立刻返回。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。

5)访问就绪通道
一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。可以遍历这个已选择的键集合来访问就绪的通道

1
2
3
4
5
6
7
8
9
10
11
12
13
Iterator keyIterator = selectedKeys.iterator();  
while(keyIterator.hasNext()) {  
    SelectionKey key = keyIterator.next();  
    if(key.isAcceptable()) {  
        // a connection was accepted by a ServerSocketChannel.  
  } else if (key.isConnectable()) {  
        // a connection was established with a remote server.  
    } else if (key.isReadable()) {  
        // a channel is ready for reading  
  } else if (key.isWritable()) {  
        // a channel is ready for writing  
    }  
}

6)关闭close()
用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。

管道Pipe

Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。

1)创建管道
通过Pipe.open()方法打开管道。

1
Pipe pipe = Pipe.open();

2)向管道写数据
要向管道写数据,需要访问sink通道。通过调用SinkChannel的write()方法,将数据写入SinkChannel

1
Pipe.SinkChannel sinkChannel = pipe.sink();

3)从管道读取数据
读取管道的数据,需要访问source通道。调用source通道的read()方法来读取数据

1
Pipe.SourceChannel sourceChannel = pipe.source();

参考文档:
Jenkov.com/java-nio

更多Java基础系列文章,参见Java基础大纲

CATALOG
  1. 1. NIO概述
  2. 2. Channel
    1. 2.1. FileChannel
    2. 2.2. SocketChannel
    3. 2.3. ServerSocketChannel
    4. 2.4. DatagramChannel
    5. 2.5. 通道之间的数据传输
  3. 3. Buffer
    1. 3.1. Buffer的三大属性
    2. 3.2. Buffer类型:
    3. 3.3. Buffer的操作
    4. 3.4. 分散(Scatter)/聚集(Gather)
  4. 4. Selector
    1. 4.1. Selector的使用
  5. 5. 管道Pipe