
2013-05-20

An Introduction to Non-blocking I/O

I used Netty 3.2.x a while back; Netty provides a high-level API, so I only had a rough idea of the libraries underneath. Recently, while writing a piece called "Netty使用经验" (Netty usage experience), I found some time to look into libevent, Java 1.4 NIO, Java 7 AIO, and OS-level AIO. I came across some good articles along the way and am recording them here. There are plenty of summary-style articles on the web; I tried not to read those, and instead quote the original description of each technology, or just link to it, without editorializing.

The stack traces listed below are system-dependent; I am on Linux.

1 libevent

The libevent API provides a mechanism to execute a callback function when a specific event occurs on a file descriptor or after a timeout has been reached. Furthermore, libevent also support callbacks due to signals or regular timeouts.
Currently, libevent supports /dev/poll, kqueue(2), event ports, POSIX select(2), Windows select(), poll(2), and epoll(4).
--ref: http://libevent.org/

libevent uses an event-based callback mechanism, so the code reads fairly straightforwardly.

Below is the stack obtained after running libevent-2.0.21-stable/sample/http-server.
>  gstack 9325
#0  0x00007faa909c1da3 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x00007faa90ec6fc3 in epoll_dispatch (base=0x12a3040, tv=<optimized out>) at epoll.c:407
#2  0x00007faa90eb2e90 in event_base_loop (base=base@entry=0x12a3040, flags=flags@entry=0) at event.c:1607
#3  0x00007faa90eb4247 in event_base_dispatch (event_base=event_base@entry=0x12a3040) at event.c:1450
#4  0x000000000040183c in main (argc=<optimized out>, argv=<optimized out>) at http-server.c:402
What ends up being called is the epoll_wait system call (that is what the source calls; the symbol in the stack looks slightly different). It is waiting for events, not performing the download.
I tested downloading a 500+ MB file (reading only a few KB on the client side, then pausing): memory usage stayed at a little over 100 KB the whole time.

1.1 Further reading

http://libevent.org/
libevent基本使用场景和事件流程 (libevent basic usage scenarios and event flow) - Author: 张亮 - blog.csdn.net

2 epoll

epoll - I/O event notification facility
The epoll API performs a similar task to poll(2): monitoring multiple file descriptors to see if I/O is possible on any of them. The epoll API can be used either as an edge-triggered or a level-triggered interface and scales well to large numbers of watched file descriptors.
--ref: epoll manual

epoll can efficiently monitor a large number of file descriptors (socket connections fall into this category too). As the name suggests, it is poll-based: events are pulled, pulling means polling, and a dedicated thread can be set up for that. When I/O becomes ready, the corresponding events can be pulled in batches, such as readable/writable (EPOLLIN | EPOLLOUT), and handled according to their type. It is easy to build a callback mechanism on top of epoll.

2.1 Further reading

* > man epoll_wait
* > man epoll
* > less /usr/include/sys/epoll.h
epoll精髓 (The essence of epoll) - Author: 彭帅 - www.cnblogs.com

3 Java 1.4 NIO

3.1 API

The relevant API lives in the java.nio.channels and java.nio.channels.spi packages, with Selector and SelectionKey as the basic pieces. SelectionKey supports four kinds of I/O events: OP_ACCEPT | OP_CONNECT | OP_READ | OP_WRITE.

This wrapper stays pretty true to the original: it looks a lot like epoll, and it is just as easy to turn into a callback mechanism. In fact, on Linux with kernel 2.6 or later, epoll is used by default.
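As a concrete illustration, here is a minimal single-threaded Selector loop that echoes one message, driven by a throwaway blocking client (a sketch only; short-write handling, error handling, and the `handle*` structure a real server needs are all elided):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SelectorEcho {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
        server.configureBlocking(false);                    // required before register()
        server.register(selector, SelectionKey.OP_ACCEPT);
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

        // A plain blocking client in another thread, just to drive the demo.
        Thread client = new Thread(() -> {
            try (SocketChannel c = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                c.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
                ByteBuffer reply = ByteBuffer.allocate(16);
                c.read(reply);
                reply.flip();
                System.out.println("echoed: " + StandardCharsets.UTF_8.decode(reply));
            } catch (IOException e) { throw new RuntimeException(e); }
        });
        client.start();

        boolean done = false;
        while (!done) {
            selector.select();                  // blocks; epoll_wait on Linux 2.6+
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();                    // selected keys must be removed by hand
                if (key.isAcceptable()) {
                    SocketChannel ch = server.accept();
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(64));
                } else if (key.isReadable()) {
                    SocketChannel ch = (SocketChannel) key.channel();
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    ch.read(buf);
                    buf.flip();
                    ch.write(buf);              // echo back; real code must handle short writes
                    done = true;                // one round trip is enough for the demo
                }
            }
        }
        client.join();
        server.close();
        selector.close();
    }
}
```

The attachment slot on SelectionKey is what makes a callback mechanism easy: instead of a ByteBuffer, you can attach a handler object and dispatch to it when the key fires.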

3.2 Further reading

SelectionKey - docs.oracle.com
NIO入门 (Getting started with NIO) # Networking and asynchronous I/O - Author: Greg Travis - www.ibm.com
* {jdk.home}/sample/nio/server
    On Linux, the demos and samples need to be downloaded and installed separately.

4 Java 7 Asynchronous I/O

4.1 API

The relevant API is likewise in the java.nio.channels and java.nio.channels.spi packages; it is everything whose name starts with Asynchronous. A fairly top-level type is AsynchronousChannel - docs.oracle.com, which currently has one sub-interface and three implementation classes. Asynchronous I/O operations fall into two classes (where operation is typically read/write):
  • Future<V> operation(...)
    • Not a callback mechanism; think of it as an interruptible blocking I/O operation
  • void operation(... A attachment, CompletionHandler<V,? super A> handler)
    • A callback mechanism: the system performs the read/write and then invokes the handler

For socket programming, AsynchronousSocketChannel - docs.oracle.com will be very useful; it has four read methods and four write methods. Two of the write methods:
  • <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler)
    • Writes a sequence of bytes to this channel from the given buffer.
    • The handler parameter is a completion handler that is invoked when the write operation completes (or fails). The result passed to the completion handler is the number of bytes written.
  • <A> void write(ByteBuffer src, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer,? super A> handler)
    • The variant with timeout control

Performance should be fine, and the code will be straightforward; this is what I want.
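To contrast the two call styles, here is a small sketch using AsynchronousFileChannel (chosen only so the example is self-contained; the AsynchronousSocketChannel variants look the same, minus the file position argument):

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

public class AioStyles {
    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("aio", ".txt");
        Files.write(tmp, "hello aio".getBytes(StandardCharsets.UTF_8)); // 9 bytes

        try (AsynchronousFileChannel ch =
                 AsynchronousFileChannel.open(tmp, StandardOpenOption.READ)) {
            // Style 1: Future — kicks off the I/O, blocks only when you ask for the result.
            ByteBuffer b1 = ByteBuffer.allocate(32);
            Future<Integer> f = ch.read(b1, 0);        // read from position 0
            System.out.println("future read: " + f.get() + " bytes");

            // Style 2: CompletionHandler — the system finishes the I/O,
            // then invokes the callback with the result and our attachment.
            ByteBuffer b2 = ByteBuffer.allocate(32);
            CountDownLatch latch = new CountDownLatch(1);
            ch.read(b2, 0, "ctx", new CompletionHandler<Integer, String>() {
                public void completed(Integer n, String attachment) {
                    System.out.println("handler read: " + n + " bytes, attachment=" + attachment);
                    latch.countDown();
                }
                public void failed(Throwable t, String attachment) {
                    t.printStackTrace();
                    latch.countDown();
                }
            });
            latch.await();                             // wait for the callback to fire
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

The attachment is how per-operation context travels to the callback without captured state; the latch here exists only so the demo does not exit before the handler runs.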

4.2 A first look at the internals

I ran {jdk.home}/sample/nio/chatserver to try it out.
> jstack 5010

"pool-1-thread-4" prio=10 tid=0x00007f4e50136800 nid=0x13a7 runnable [0x00007f4e3585c000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPoll.epollWait(Native Method)
        at sun.nio.ch.EPollPort$EventHandlerTask.poll(EPollPort.java:194)
        at sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:268)
        at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

"pool-1-thread-3" prio=10 tid=0x00007f4e50134800 nid=0x13a6 waiting on condition [0x00007f4e3595d000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007d6fcff80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
        at sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:262)
        at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
As the stacks show, Asynchronous I/O is emulated on top of epoll.

> gstack 5010
Thread 9 (Thread 0x7f4e35b60700 (LWP 5028)):
#0  0x00007f4e57058dc3 in epoll_wait () from /lib64/libc.so.6
#1  0x00007f4e35b6a3c3 in Java_sun_nio_ch_EPoll_epollWait () from /usr/java/jdk1.7.0_17/jre/lib/amd64/libnio.so
The native method sun.nio.ch.EPoll.epollWait is what calls epoll_wait.
This amounts to an emulated Proactor pattern: the data is not moved by the kernel but in user space (the JVM).

5 OS-level Asynchronous I/O

Some operating systems provide their own implementations; a few are listed here.

5.1 POSIX AIO

The POSIX asynchronous I/O (AIO) interface allows applications to initiate one or more I/O operations that are performed asynchronously (i.e., in the background). The application can elect to be notified of completion of the I/O operation in a variety of ways: by delivery of a signal, by instantiation of a thread, or no notification at all.

The current Linux POSIX AIO implementation is provided in user space by glibc. This has a number of limitations, most notably that maintaining multiple threads to perform I/O operations is expensive and scales poorly. Work has been in progress for some time on a kernel state-machine-based implementation of asynchronous I/O (see io_submit(2), io_setup(2), io_cancel(2), io_destroy(2), io_getevents(2)), but this implementation hasn't yet matured to the point where the POSIX AIO implementation can be completely reimplemented using the kernel system calls.

--ref: man aio

It is not implemented in the kernel, and the man page itself admits the implementation is not good enough yet.

5.2 Linux Native AIO

This is kernel-level AIO, consisting of the system calls io_setup, io_cancel, io_destroy, io_getevents, and io_submit.
libaio provides a wrapper around these system calls:
AIO enables even a single application thread to overlap I/O operations with other processing, by providing an interface for submitting one or more I/O requests in one system call (io_submit()) without waiting for completion, and a separate interface (io_getevents()) to reap completed I/O operations associated with a given completion group.
Support for kernel AIO has been included in the 2.6 Linux kernel.
--ref: Kernel Asynchronous I/O (AIO) Support for Linux - lse.sourceforge.net

5.2.1 Further reading


5.3 Solaris Asynchronous I/O

When an asynchronous I/O call returns successfully, the I/O operation has only been queued, waiting to be done. The actual operation also has a return value and a potential error identifier, the values that would have been returned to the caller as the result of a synchronous call. When the I/O is finished, the return value and error value are stored at a location given by the user at the time of the request as a pointer to an aio_result_t.
--ref: Solaris Asynchronous I/O - docs.oracle.com

5.4 Windows IOCP

The CreateIoCompletionPort function creates an I/O completion port and associates one or more file handles with that port. When an asynchronous I/O operation on one of these file handles completes, an I/O completion packet is queued in first-in-first-out (FIFO) order to the associated I/O completion port.
--ref: I/O Completion Ports - msdn.microsoft.com

After an I/O operation completes, a completion event is placed in the FIFO queue; you have to write code to pull these events.

6 NIO-friendly APIs

Whether you use BIO or NIO, the data is a stream, and treating it that way also saves memory. BIO's InputStream naturally supports streaming; NIO needs some other means of processing data in a streaming fashion, or memory can be wasted badly. This section is about what to do with the data once it has been read.

6.1 Incremental data submission

A good shape to imitate is java.security.MessageDigest - docs.oracle.com: for large inputs, MessageDigest has several update methods for submitting partial data, and you call digest once all the data is in.
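A minimal sketch of that pattern, checking that chunked updates produce the same digest as hashing the whole message at once:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class IncrementalDigest {
    public static void main(String[] args) throws Exception {
        // Feed the data in chunks, the way a non-blocking read loop would deliver it...
        MessageDigest incremental = MessageDigest.getInstance("SHA-256");
        incremental.update("he".getBytes(StandardCharsets.UTF_8));
        incremental.update("llo".getBytes(StandardCharsets.UTF_8));
        byte[] chunked = incremental.digest();

        // ...and compare with hashing the whole message in one call.
        MessageDigest oneShot = MessageDigest.getInstance("SHA-256");
        byte[] whole = oneShot.digest("hello".getBytes(StandardCharsets.UTF_8));

        System.out.println("digests equal: " + MessageDigest.isEqual(chunked, whole));
    }
}
```

Because the digest object carries all intermediate state, the caller never needs to hold the full input in memory.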

6.2 Writing data to external storage

Some libraries do not support incremental data submission.
If the data is not huge, adding virtual memory should be enough.
    If that is still not enough, the only option is to block all memory-consuming operations and resume once memory frees up.
Otherwise, the only option is to write the data to external storage first and then hand over an InputStream.
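A rough sketch of that spill-to-disk route (the file names and chunk contents here are made up for the demo): append incoming chunks to a temp file as they arrive, then open an InputStream over the file for the library:

```java
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SpillToDisk {
    public static void main(String[] args) throws Exception {
        Path spill = Files.createTempFile("spill", ".bin");
        // As chunks arrive from the channel, append them to the temp file
        // instead of accumulating them in heap memory.
        try (FileChannel out = FileChannel.open(spill,
                StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            for (String chunk : new String[] {"part1-", "part2-", "part3"}) {
                out.write(ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8)));
            }
        }
        // Once the transfer is complete, hand the library an InputStream.
        try (InputStream in = Files.newInputStream(spill)) {
            System.out.println("spilled: " + new String(in.readAllBytes(), StandardCharsets.UTF_8));
        } finally {
            Files.deleteIfExists(spill);
        }
    }
}
```

Memory usage then stays bounded by the chunk size rather than the payload size, at the cost of one extra round trip through the filesystem.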

7 Further reading


=Article versions=

20130516 - 19
