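Java NIO Selector: Background Knowledge (Linux poll)
I recently spent some time reading the JDK source for Java NIO and found that the Selector implementation, apart from some tricks in its wakeup mechanism, relies mostly on what the operating system provides. To understand Selector without any baggage, it is worth studying how the operating system itself implements selection. This article is based mainly on the poll implementation in the linux-2.6.10 kernel.
This article may come across as shallow; experts, feel free to skip it. Also, every mention of "the government" here is purely metaphorical (think of it as the Qing government if you like), so please don't read anything into it.
Let's start with poll. The name means "polling", which is very apt, because that is exactly what it does. Run man poll and you will see this declaration: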
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
where pollfd is a structure like this:
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
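As for nfds_t, you can guess that it is an integer type, and indeed it is defined like this:
typedef unsigned long nfds_t;
From the fds pointer and the unsigned integer nfds it is easy to see that fds is an array of pollfd with nfds valid elements.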
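Before breaking into the kernel, it may help to see the call from the user's side. The following is a minimal user-space sketch, not taken from the kernel source: wait_readable and demo_pipe are hypothetical helper names, and a pipe stands in for whatever descriptor you would really watch.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper: ask poll() whether `fd` becomes readable within
 * `timeout_ms` milliseconds. Returns the revents bits reported by the
 * kernel, 0 on timeout, or -1 if poll() itself failed. */
static int wait_readable(int fd, int timeout_ms)
{
    struct pollfd pfd;

    assert(fd >= 0);
    pfd.fd = fd;
    pfd.events = POLLIN;   /* the event we are interested in */
    pfd.revents = 0;       /* filled in by the kernel */

    int n = poll(&pfd, 1, timeout_ms);   /* nfds == 1: a one-element array */
    if (n < 0)
        return -1;
    if (n == 0)
        return 0;          /* timed out, nothing happened */
    return pfd.revents;
}

/* Demo: an empty pipe is not readable; once a byte is written, it is.
 * Returns 1 if both observations hold, 0 if not, -1 on setup failure. */
static int demo_pipe(void)
{
    int p[2];

    if (pipe(p) != 0)
        return -1;
    int before = wait_readable(p[0], 0);     /* timeout 0: do not block */
    if (write(p[1], "x", 1) != 1) {
        close(p[0]);
        close(p[1]);
        return -1;
    }
    int after = wait_readable(p[0], 100);    /* wait up to 100 ms */
    close(p[0]);
    close(p[1]);
    return before == 0 && (after & POLLIN) != 0;
}
```

Calling demo_pipe() from a main function exercises both the timeout path and the ready path on a single descriptor.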
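Now let's boldly guess what poll does. Taking timeout into account, we can picture a scene like this:
poll, hands on hips, all swagger, looks down at us and says: "You small fry just hand me a set of pollfds and tell me which events you care about, and within the time limit timeout I'll tell you which of them happened. Impressive, huh?"
I have dreamed of being a wandering hero since childhood, and that kind of attitude is exactly what I can't stand. So, late at night, dressed in black, I climb up to the rafters to find out what is really going on.
It's just a government clerk (a system call), after all. First find where it lives: after asking around the back alleys, it turns out to be hiding in fs/select.c. The front gate, sys_poll, is tall and imposing, as befits someone on government business. Better be careful and scout the gate one section at a time: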
asmlinkage long sys_poll(struct pollfd __user * ufds, unsigned int nfds, long timeout)
{
    // Suspect #1: judging by the name, probably some kind of wait queue
    struct poll_wqueues table;
    int fdcount, err;
    unsigned int i;
    // Suspect #2: what is poll_list, and why two pointers to it at once?
    // Better stop and investigate first
    struct poll_list *head;
    struct poll_list *walk;
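So let's haul in the poll_wqueues family:
[linux/poll.h]
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
Never mind poll_queue_proc for now; it is just a function pointer type. But with a wait_queue_head_t among its parameters, it clearly has something to do with wait queues.
typedef struct poll_table_struct {
    poll_queue_proc qproc;
} poll_table;
poll_table deserves even less attention: it merely wraps that function pointer in a struct.
struct poll_wqueues {
    poll_table pt;
    struct poll_table_page * table;
    int error;
};
I recognize poll_table: it is the struct type above that wraps a wait-queue-related function pointer. poll_table_page I don't know yet, so no comment until we find it:
[fs/select.c]
struct poll_table_entry {
    struct file * filp;
    wait_queue_t wait;
    wait_queue_head_t * wait_address;
};
poll_table_entry holds a file pointer plus the wait-queue duo wait_queue_t and wait_queue_head_t *. Who would dare claim it has nothing to do with wait queues? wait_address is simply a pointer to a wait queue head, and wait is naturally an item in that queue.
struct poll_table_page {
    struct poll_table_page * next;
    struct poll_table_entry * entry;
    struct poll_table_entry entries[0];
};
As for poll_table_page, it contains a pointer to its own type, so it is obviously a linked list node. The two odd-looking poll_table_entry members below it look familiar, though. Ah yes, one of the government's favorite little tricks:
entry and entries cooperate to form, rather elegantly, a dynamic array of poll_table_entry. entry is a pointer to poll_table_entry that always points at the next free slot, that is, the address where the next poll_table_entry will be created (the first slot right after initialization). entries looks like an array but has zero elements; it takes up no space and simply marks where the array begins. If you don't believe it, try sizeof(struct poll_table_page): on a 32-bit government the result is 8, the size of two pointers.
Somewhere, I bet, entry and entries get used like this:
// at initialization entry points at entries, the start of the array,
// which is where the first poll_table_entry can be created
new_table->entry = new_table->entries;
struct poll_table_entry * entry = table->entry;
// entry then moves on to the next address where a poll_table_entry can be created
table->entry = entry+1;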
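To convince myself that the entry/entries trick works, here is a small user-space sketch of the same idea. It is not kernel code: struct entry is a stand-in for poll_table_entry, PAGE_BYTES is an assumed page size, malloc replaces __get_free_page, and the helper names page_new, page_full and page_take are made up for illustration. page_full mirrors the spirit of the POLL_TABLE_FULL check.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

#define PAGE_BYTES 4096               /* assumed page size for this sketch */

struct entry { int value; };          /* stand-in for poll_table_entry */

struct page {
    struct page  *next;               /* older page, or NULL */
    struct entry *entry;              /* next free slot */
    struct entry  entries[];          /* C99 spelling of entries[0] */
};

/* True when one more entry would run past the end of the page. */
static int page_full(const struct page *pg)
{
    return (uintptr_t)pg->entry + sizeof(struct entry)
           > (uintptr_t)pg + PAGE_BYTES;
}

/* Allocate a page and chain it in front of `older`. */
static struct page *page_new(struct page *older)
{
    struct page *pg = malloc(PAGE_BYTES);

    if (pg == NULL)
        return NULL;
    pg->next = older;
    pg->entry = pg->entries;          /* first free slot = start of array */
    return pg;
}

/* Hand out the next free slot and bump the cursor. */
static struct entry *page_take(struct page *pg)
{
    assert(!page_full(pg));
    struct entry *e = pg->entry;
    pg->entry = e + 1;
    return e;
}
```

The header costs two pointers and everything after it in the page is usable as entries, which is the whole point of the zero-length array.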
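Having looked over the whole family, let me sum up the role poll_wqueues plays:
poll_wqueues is in charge of a struct that wraps a wait-queue-related function pointer, and also of the head pointer of a list of poll_table_page. Given how the government usually operates, that function pointer is surely a callback. As for the poll_table_page list, it is a singly linked list of pages (new pages are pushed on at the head and the oldest page's next is NULL, so it is not circular). Each page holds an array of poll_table_entry, and each entry in turn can be an element of some wait queue. A long explanation only muddies things, so here is a picture:
[Figure: poll_wqueues, its poll_table_page list, and the poll_table_entry arrays hooked onto wait queues]
So all you really do is manage wait queues and a callback. A mere housekeeper, and yet you bother with paging.
Next, poll_list:
[fs/select.c]
struct poll_list {
    struct poll_list *next;
    int len;
    struct pollfd entries[0];
};
Same character as poll_table_page; clearly both are the government's trusty helpers. Evidently poll_list forms a linked list whose nodes each hold an array of pollfd: len is the number of pollfd elements stored in that node, and entries marks the start of the array. As for why a linked list again, it must be for paging once more.
The government has never trusted the people much. I remember passing in a pollfd array, so I expect it will get copied into this so-called poll_list.
Back to the gate for another look: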
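    /* Do a sanity check on nfds ... */
    if (nfds > current->files->max_fdset && nfds > OPEN_MAX)
        return -EINVAL;
So the number of pollfds is not unlimited after all. Of course; why would they make life that easy for you.
    if (timeout) {
        /* Careful about overflow in the intermediate values */
        if ((unsigned long) timeout < MAX_SCHEDULE_TIMEOUT / HZ)
            timeout = (unsigned long)(timeout*HZ+999)/1000+1;
        else /* Negative or overflow */
            timeout = MAX_SCHEDULE_TIMEOUT;
    }
The timeout gets tampered with too, all in the solemn name of standardization and harmony: the milliseconds passed in by the user are converted into jiffies (kernel timer ticks), rounded up, and a negative or overflowing value becomes MAX_SCHEDULE_TIMEOUT, that is, wait indefinitely.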
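The arithmetic is easy to try outside the kernel. Below is a sketch under stated assumptions: ms_to_ticks is a hypothetical name, hz is passed in as a parameter instead of the kernel's compile-time HZ, and LONG_MAX plays the role of MAX_SCHEDULE_TIMEOUT.

```c
#include <assert.h>
#include <limits.h>

#define SKETCH_MAX_TIMEOUT LONG_MAX   /* stands in for MAX_SCHEDULE_TIMEOUT */

/* Convert a poll() timeout in milliseconds to timer ticks, following the
 * same steps as the snippet above: 0 stays 0, small positive values are
 * rounded up and get one extra tick, negative or huge values mean
 * "wait forever". */
static long ms_to_ticks(long timeout_ms, long hz)
{
    assert(hz > 0);
    if (timeout_ms == 0)
        return 0;
    /* a negative value turns into a huge unsigned one and fails this test */
    if ((unsigned long)timeout_ms
            < (unsigned long)SKETCH_MAX_TIMEOUT / (unsigned long)hz)
        return (long)(((unsigned long)timeout_ms * (unsigned long)hz + 999)
                      / 1000 + 1);
    return SKETCH_MAX_TIMEOUT;
}
```

With hz = 1000 a 10 ms timeout becomes 11 ticks, and with hz = 100 a 1 ms timeout becomes 2 ticks, so the caller never sleeps for less than what was asked.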
    poll_initwait(&table);
Hold on, what is this doing? Initializing what? That poll_wqueues variable from before? Got you at last:
[fs/select.c]
void poll_initwait(struct poll_wqueues *pwq)
{
    init_poll_funcptr(&pwq->pt, __pollwait);
    pwq->error = 0;
    pwq->table = NULL;
}
[linux/poll.h]
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
    pt->qproc = qproc;
}
It merely initializes the poll_wqueues table. But wait, what is __pollwait? Earlier I guessed the function pointer was a callback, so let's drag it out for a look:
[fs/select.c]
void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p)
{
    // derive the address of the enclosing poll_wqueues from the address of its member pt
    struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);
    // fetch the head of the page list mentioned above
    struct poll_table_page *table = p->table;
    // taken if there is no page yet, or the current page cannot hold another entry;
    // table was set to NULL at initialization, so the first call goes down this branch
    if (!table || POLL_TABLE_FULL(table)) {
        struct poll_table_page *new_table;
        // allocate one fresh page for new_table
        new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
        // on allocation failure, record the error, restore the task state
        // to TASK_RUNNING and return
        if (!new_table) {
            p->error = -ENOMEM;
            __set_current_state(TASK_RUNNING);
            return;
        }
        // as always for a new page, entry points at the start of its entries array
        new_table->entry = new_table->entries;
        // next points at the previous head (NULL for the very first page), so the
        // pages form a NULL-terminated singly linked list rather than a ring
        new_table->next = table;
        // a page was added, so the poll_wqueues table pointer is updated to new_table;
        // in other words, p->table always points at the newest poll_table_page
        p->table = new_table;
        // switch table to new_table so that the entry below is added to it
        table = new_table;
    }
    /* Add a new entry */
    {
        // take the first free slot for a poll_table_entry
        struct poll_table_entry * entry = table->entry;
        // advance entry to the next free slot
        table->entry = entry+1;
        // #define get_file(x) atomic_inc(&(x)->f_count)
        // get_file bumps filp's f_count reference count,
        // because a new entry is about to reference filp
        get_file(filp);
        entry->filp = filp;
        entry->wait_address = wait_address;
        init_waitqueue_entry(&entry->wait, current); // explained below
        add_wait_queue(wait_address,&entry->wait);
    }
}
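The last two steps above are quite common. First, init_waitqueue_entry:
[linux/wait.h]
static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
    q->flags = 0;
    q->task = p;
    q->func = default_wake_function;
}
Put simply, it initializes a wait queue item and attaches the current thread, current, to it.
add_wait_queue is a routine wait-queue operation: it inserts the wait_queue_t member wait of the entry right after the head node of the wait queue.
Taken together, these two steps hang the current thread on the wait queue.
A small spoiler: the polling work eventually comes down to each concrete file. For example, TCP's tcp_poll, before it collects the socket's status,
executes poll_wait(file, sk->sk_sleep, wait), which is defined as follows:
[linux/poll.h]
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && wait_address)
        p->qproc(filp, wait_address, p);
}
That is, if both the wait queue pointer and the poll_table are non-NULL, the callback is invoked. From poll_wait(file, sk->sk_sleep, wait) we can tell that sk->sk_sleep is the sock's own wait queue. Once the callback has run, the current thread is hanging on that device's wait queue. When the device's state changes it wakes the thread, a new round of polling begins and picks up the updated state, and by comparing that state with what we asked for we can tell whether an event we care about has occurred.
To recap what __pollwait does: every time it is triggered, it creates a new entry in poll_wqueues, attaches the current thread current to the wait member of that entry, and adds it to the wait queue of the device in question.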
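The two tricks at work here, a callback stored in a small struct and container_of to get from that struct back to its owner, can be sketched in user space. Everything below is an illustrative stand-in: struct table and struct wqueues play the parts of poll_table and poll_wqueues, an int fd replaces the file and wait queue arguments, and the sketch_ function names are invented. The container_of macro is a simplified version without the kernel's type checking.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified container_of: step back from a member to its enclosing struct. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

struct table;
typedef void (*queue_proc)(int fd, struct table *pt);

struct table {                 /* stand-in for poll_table */
    queue_proc qproc;
};

struct wqueues {               /* stand-in for poll_wqueues */
    int registered;            /* placed first so pt sits at a nonzero offset */
    struct table pt;
    int error;
};

/* Plays the role of __pollwait: recover the owner and record the waiter. */
static void sketch_pollwait(int fd, struct table *pt)
{
    assert(pt != NULL);
    struct wqueues *w = container_of(pt, struct wqueues, pt);
    (void)fd;                  /* a real callback would queue on fd's device */
    w->registered++;
}

/* Plays the role of poll_wait: a NULL table means "do not register". */
static void sketch_poll_wait(int fd, struct table *pt)
{
    if (pt != NULL)
        pt->qproc(fd, pt);
}
```

Passing NULL instead of the table pointer silently skips registration, which is the same switch the kernel flips by setting the poll_table pointer to NULL.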
Snap out of it, have a little patience, and scout the gate again:
    head = NULL;
    walk = NULL;
    i = nfds;
    err = -ENOMEM;
    while(i!=0) {
        struct poll_list *pp;
        pp = kmalloc(sizeof(struct poll_list)+
                sizeof(struct pollfd)*
                (i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i),
                    GFP_KERNEL);
        if(pp==NULL)
            goto out_fds;
        pp->next=NULL;
        pp->len = (i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i);
        if (head == NULL)
            head = pp;
        else
            walk->next = pp;
        walk = pp;
        if (copy_from_user(pp->entries, ufds + nfds-i,
                sizeof(struct pollfd)*pp->len)) {
            err = -EFAULT;
            goto out_fds;
        }
        i -= pp->len;
    }
Seeing this whole big while loop made me laugh. All that code just to copy the user's pollfd array, chunk by chunk, into the poll_list? Each node stores at most POLLFD_PER_PAGE pollfds, the same idea as poll_table_page storing its entries.
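The same chunked copy can be reproduced in user space. In this sketch, which is only an approximation of the loop above, struct chunk stands in for poll_list, per_chunk for POLLFD_PER_PAGE, malloc for kmalloc and memcpy for copy_from_user; chunk_list_build and chunk_list_free are hypothetical names.

```c
#include <assert.h>
#include <poll.h>
#include <stdlib.h>
#include <string.h>

struct chunk {                       /* stand-in for poll_list */
    struct chunk *next;
    size_t len;                      /* number of pollfds in this node */
    struct pollfd entries[];
};

static void chunk_list_free(struct chunk *head)
{
    while (head != NULL) {
        struct chunk *next = head->next;
        free(head);
        head = next;
    }
}

/* Copy `n` pollfds from `src` into a list of nodes holding at most
 * `per_chunk` elements each. Returns NULL for n == 0 or on allocation
 * failure. */
static struct chunk *chunk_list_build(const struct pollfd *src, size_t n,
                                      size_t per_chunk)
{
    struct chunk *head = NULL, *walk = NULL;
    size_t left = n;

    assert(per_chunk > 0);
    while (left != 0) {
        size_t len = left > per_chunk ? per_chunk : left;
        struct chunk *c = malloc(sizeof(*c) + len * sizeof(struct pollfd));

        if (c == NULL) {
            chunk_list_free(head);
            return NULL;
        }
        c->next = NULL;
        c->len = len;
        /* src + (n - left) is the first element not yet copied */
        memcpy(c->entries, src + (n - left), len * sizeof(struct pollfd));
        if (head == NULL)
            head = c;
        else
            walk->next = c;
        walk = c;
        left -= len;
    }
    return head;
}
```

Ten pollfds with per_chunk set to 4 end up in three nodes of 4, 4 and 2 elements, in the original order.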
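    fdcount = do_poll(nfds, head, &table, timeout);
True to its years in officialdom: having taken the order, it hands the legwork straight to do_poll.
Look at the arguments it passes to do_poll first: nfds is the integer supplied by the user, the number of pollfds passed in; head is the copied poll_list, whose content is simply the user's pollfds; table is the wait-queue housekeeper described above; and timeout, once harmonized, is passed along as well. And so do_poll's miserable life of running errands begins: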
[fs/select.c]
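static int do_poll(unsigned int nfds, struct poll_list *list,
                   struct poll_wqueues *wait, long timeout)
{
    int count = 0;
    // pt is what poll_initwait(&table) set up; in effect it carries __pollwait
    poll_table* pt = &wait->pt;
    // This is why timeout == 0 is non-blocking: with no callback,
    // poll_wait does nothing, so the current thread is never put on a wait queue.
    // Setting pt = NULL is a handy trick that is used again further down.
    if (!timeout)
        pt = NULL;
    for (;;) { // all right, the big loop
        // cursor for walking the poll_list
        struct poll_list *walk;
        // the standard pairing with schedule_timeout: schedule_timeout will put the
        // current thread to sleep, so its state is set to interruptible first
        set_current_state(TASK_INTERRUPTIBLE);
        walk = list;
        // walk the poll_list; as seen above, each node stores
        // at most POLLFD_PER_PAGE pollfds
        while(walk != NULL) {
            // run do_pollfd on a single poll_list node. Sigh, do_poll, I misjudged you:
            // I took you for an honest citizen who wouldn't bully the common folk,
            // yet you dump the load on do_pollfd
            do_pollfd( walk->len, walk->entries, &pt, &count);
            // advance the cursor
            walk = walk->next;
        }
        /* pt is set to NULL here so that the next pass does not call __pollwait and
           queue the current thread again. Why? While do_pollfd walks a node's pollfd
           array with pt carrying __pollwait, it hangs the current thread on the wait
           queue of the device behind each fd, one by one. When the while loop
           finishes, every pollfd has been visited. If count is nonzero, the function
           returns right away; if count == 0, nothing is ready, the loop does not
           exit, and the thread blocks in schedule_timeout until a device wakes it.
           Once a device wakes the thread because its state changed, the thread runs
           the while loop again to see whether any fd now has an event of interest
           ready. Without pt = NULL, that second pass would queue the thread a second
           time, which is unnecessary. */
        pt = NULL;
        // leave the loop if an event occurred, the timeout ran out, or a signal is pending
        if (count || !timeout || signal_pending(current))
            break;
        // nothing happened on this pass, so check the error recorded in the wait
        // structure (0 by default); if it is nonzero, leave the loop and return it;
        // the error code is negative
        count = wait->error;
        if (count)
            break;
        /* This is where the timed sleep happens: schedule_timeout puts the current
           thread to sleep, i.e. blocks it. The sleep ends in one of two ways: the
           timeout expires, or the thread is woken from a wait queue.
           schedule_timeout uses up timeout: it returns 0 if the full time elapsed
           and the remaining time if the thread was woken early. So unless a signal
           arrives or a result is found, the loop eventually exits because
           timeout == 0. */
        timeout = schedule_timeout(timeout);
    }
    // restore the thread state to running
    __set_current_state(TASK_RUNNING);
    // return the number of ready fds, or an error code
    return count;
}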
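All right, finally let's look at the oppressed do_pollfd. Surely I haven't misjudged this one:
[fs/select.c]
static void do_pollfd(unsigned int num, struct pollfd * fdpage,
                      poll_table ** pwait, int *count)
{
    int i;
    // walk the pollfd array of one poll_list node
    for (i = 0; i < num; i++) {
        int fd;
        unsigned int mask;
        struct pollfd *fdp;
        mask = 0;
        fdp = fdpage+i;
        fd = fdp->fd;
        if (fd >= 0) {
            // look up the file pointer for fd, bumping its f_count reference count
            struct file * file = fget(fd);
            mask = POLLNVAL;
            // if we got a file, get to work
            if (file != NULL) {
                mask = DEFAULT_POLLMASK;
                if (file->f_op && file->f_op->poll)
                    /* I'm speechless. I thought you were one of the oppressed, and
                       you turn out to be a foreman too. What kind of society is this,
                       one layer oppressing the next? Does file->f_op->poll oppress
                       someone in turn? Certainly: it at least has the device to push
                       around. Back to business. As described above, during a concrete
                       file's poll operation, if pwait is not NULL, i.e. it carries
                       __pollwait, the current thread is hung on the wait queue to
                       await the device's wakeup. mask is the I/O status returned
                       for this fd. */
                    mask = file->f_op->poll(file, *pwait);
                // don't celebrate yet: a return only says what state the file is in;
                // whether it is an event we care about is decided by the bitwise AND
                // with fdp->events (POLLERR and POLLHUP always get through)
                mask &= fdp->events | POLLERR | POLLHUP;
                // drop the f_count reference; when the count reaches 0 the file is released
                fput(file);
            }
            // if anything survives all that filtering, an event really did occur.
            // At that point pwait is set to NULL, meaning the remaining fds need not
            // bother queuing the current thread: a result has already been counted,
            // so later fds just bump count if they are ready, and it doesn't matter
            // if they aren't
            if (mask) {
                *pwait = NULL;
                (*count)++;
            }
        }
        // the last step is important: the final result is written back to revents
        // in the pollfd. So that's all there is to it: we only need to check whether
        // revents is nonzero to know whether something happened. Not so clever after all
        fdp->revents = mask;
    }
}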
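The filtering step in do_pollfd is worth isolating. In the following user-space sketch, apply_mask is a hypothetical helper and raw_mask stands for whatever the file's poll method reported; only the masking expression itself comes from the kernel code above.

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>

/* Keep only the bits the caller asked for, plus POLLERR and POLLHUP,
 * which are reported whether requested or not. Stores the result in
 * fdp->revents and returns 1 if anything is left, 0 otherwise. */
static int apply_mask(struct pollfd *fdp, unsigned int raw_mask)
{
    assert(fdp != NULL);
    unsigned int wanted = (unsigned int)(unsigned short)fdp->events
                          | POLLERR | POLLHUP;
    unsigned int mask = raw_mask & wanted;

    fdp->revents = (short)mask;
    return mask != 0;
}
```

A pollfd that asks for POLLIN is told about POLLIN and about a hangup, but a POLLOUT-only status leaves its revents at 0.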
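To recap do_poll and do_pollfd: they walk every pollfd and call the poll method of the corresponding file's file operations. Because the wait callback is __pollwait, the current thread gets hung on each device's wait queue to await a wakeup. When a device wakes the thread, poll knows something has happened and starts a new round of polling. Polling continues until an fd with an event of interest is found ready, the wait times out, or a signal interrupts it.
As for timeout: if it is 0 from the start, do_poll never blocks at any point, and because timeout is 0 it returns after a single pass. When timeout > 0, schedule_timeout blocks the current thread and consumes the timeout: it returns 0 when the time runs out, and if the thread is woken while blocked, timeout is updated to the remaining wait time. Either way, unless an event of interest turns out to be ready or a signal arrives, the loop eventually exits because timeout == 0.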
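The control flow of do_poll and do_pollfd can be played through with a toy model. This is a deliberately crude simulation, not how the kernel schedules anything: each round stands for one wakeup, max_rounds stands in for the timeout, and struct sim_fd and sim_do_poll are invented names. It only shows when the thread would be registered on a wait queue and when the loop would exit.

```c
#include <assert.h>
#include <stddef.h>

struct sim_fd {
    int ready_from_round;   /* first round in which this fd reports an event */
    int registrations;      /* how many times the thread was queued on it */
};

/* Toy model of the polling loop. Returns the number of ready fds, or 0
 * when the rounds run out. */
static int sim_do_poll(struct sim_fd *fds, int n, int max_rounds)
{
    int may_register = (max_rounds != 0);  /* like pt != NULL when timeout != 0 */

    assert(fds != NULL && n >= 0 && max_rounds >= 0);
    for (int round = 0; ; round++) {
        int count = 0;

        for (int i = 0; i < n; i++) {
            if (may_register)
                fds[i].registrations++;    /* what __pollwait would do */
            if (fds[i].ready_from_round <= round) {
                count++;
                may_register = 0;          /* like *pwait = NULL in do_pollfd */
            }
        }
        may_register = 0;                  /* like pt = NULL after the first pass */
        if (count != 0 || round >= max_rounds)
            return count;                  /* found something, or "timed out" */
    }
}
```

In this model no fd is ever registered more than once, and with max_rounds set to 0 nothing is registered at all, matching the non-blocking case.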
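So now we seem to know what they get up to in there. But once a result is found, how is it handed back to the user? After all, the results are still sitting in the poll_list, aren't they?
Right, so no guessing is needed about what comes next: it must be collecting the results and updating revents in the user's pollfds:
    /* OK, now copy the revents fields back to user space. */
    walk = head;
    err = -EFAULT;
walk is the cursor over the poll_list.
    while(walk != NULL) {
walk traverses the whole poll_list until it becomes NULL.
        struct pollfd *fds = walk->entries;
Get the start address of the pollfd array in this poll_list node.
        int j;
        for (j=0; j < walk->len; j++, ufds++) {
Walk the node's pollfd array and the user's pollfd array in lockstep.
            if(__put_user(fds[j].revents, &ufds->revents))
                goto out_fds;
Copy the revents value from each element of the node's pollfd array into revents of the corresponding element of the user's array. After this traversal every revents in the user's pollfds has been refreshed, and the user only needs to inspect revents in each pollfd to learn the result.
        }
        walk = walk->next;
    }
    err = fdcount;
    if (!fdcount && signal_pending(current))
        err = -EINTR;
Those three lines are simple: if no event is ready and the current thread has a signal pending, the return value becomes -EINTR; otherwise it is left unchanged.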
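out_fds:
    walk = head;
    while(walk!=NULL) {
        struct poll_list *pp = walk->next;
        kfree(walk);
        walk = pp;
    }
    poll_freewait(&table);
    return err;
}
The last stretch of code needs little comment: it simply frees the two lists, the poll_list and the table's poll_table_page list.
Exploration complete, secret uncovered. It turns out sys_poll was telling the truth: we pass it a set of pollfds filled in with the events we care about and give it a timeout, and within that time it really does tell us which events occurred. The sign that an event occurred is the revents field of the corresponding pollfd: the kernel writes revents for every entry, leaving it 0 when nothing happened, so a nonzero revents means an event occurred.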
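From user space, that conclusion translates into a simple scan over revents after poll returns. The sketch below assumes nothing beyond POSIX poll and pipes; poll_and_count and demo_two_pipes are hypothetical names, and the two pipes are just convenient descriptors to watch.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Call poll() and count the entries whose revents came back nonzero.
 * Returns that count, 0 on timeout, or -1 if poll() failed. */
static int poll_and_count(struct pollfd *fds, nfds_t nfds, int timeout_ms)
{
    int n = poll(fds, nfds, timeout_ms);

    if (n <= 0)
        return n;
    int ready = 0;
    for (nfds_t i = 0; i < nfds; i++)
        if (fds[i].revents != 0)
            ready++;
    /* poll's return value is exactly the number of such entries */
    assert(ready == n);
    return ready;
}

/* Demo: watch two pipes, write to the second only. Returns 1 if only the
 * second entry is reported ready, 0 if not, -1 on setup failure. */
static int demo_two_pipes(void)
{
    int a[2], b[2];

    if (pipe(a) != 0)
        return -1;
    if (pipe(b) != 0) {
        close(a[0]);
        close(a[1]);
        return -1;
    }
    int ok = write(b[1], "x", 1) == 1;
    struct pollfd fds[2] = { { a[0], POLLIN, 0 }, { b[0], POLLIN, 0 } };
    int ready = ok ? poll_and_count(fds, 2, 100) : -1;

    close(a[0]); close(a[1]);
    close(b[0]); close(b[1]);
    if (!ok)
        return -1;
    return ready == 1 && fds[0].revents == 0 && (fds[1].revents & POLLIN) != 0;
}
```

The caller never has to ask which descriptor fired; it just walks the array it passed in.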
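Fine, I admit it, the government knows what it is doing. Where there is power there is naturally strength, so I'll stop asking questions and just use it obediently.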

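import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SelectSockets {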
public static int PORT_NUMBER = 1234;
public static void main(String[] argv) throws Exception {
new SelectSockets().go(argv);
}
public void go(String[] argv) throws Exception {
int port = PORT_NUMBER;
if (argv.length > 0) { // override default listen port
port = Integer.parseInt(argv[0]);
}
System.out.println("Listening on port " + port);
// allocate an unbound server socket channel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// Get the associated ServerSocket to bind it with
ServerSocket serverSocket = serverChannel.socket();
// set the port the server channel will listen to
serverSocket.bind(new InetSocketAddress(port));
// create a new Selector for use below
Selector selector = Selector.open();
// set non-blocking mode for the listening socket
serverChannel.configureBlocking(false);
// register the ServerSocketChannel with the Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// this may block for a long time, upon return the
// selected set contains keys of the ready channels
System.out.println("SelectSocket here 1");
int n = selector.select();
System.out.println("SelectSocket here 2");
if (n == 0) {
continue; // nothing to do
}
// get an iterator over the set of selected keys
Iterator it = selector.selectedKeys().iterator();
// look at each key in the selected set
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
// Is a new connection coming in?
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
SocketChannel channel = server.accept();
registerChannel(selector, channel, SelectionKey.OP_READ);
sayHello(channel);
}
// is there data to read on this channel?
if (key.isReadable()) {
readDataFromSocket(key);
}
// remove key from selected set, it’s been handled, important
it.remove();
}
}
}
// ———————————————————-
/**
* Register the given channel with the given selector for the given
* operations of interest
*/
protected void registerChannel(Selector selector,
SelectableChannel channel, int ops) throws Exception {
if (channel == null) {
return; // could happen
}
// set the new channel non-blocking
channel.configureBlocking(false);
// register it with the selector
channel.register(selector, ops);
}
// ———————————————————-
// Use the same byte buffer for all channels. A single thread is
// servicing all the channels, so no danger of concurrent access.
private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
/**
* Sample data handler method for a channel with data ready to read.
*
* @param key
* A SelectionKey object associated with a channel determined by
* the selector to be ready for reading. If the channel returns
* an EOF condition, it is closed here, which automatically
* invalidates the associated key. The selector will then
* de-register the channel on the next select call.
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel) key.channel();
int count;
buffer.clear(); // make buffer empty
// loop while data available, channel is non-blocking
while ((count = socketChannel.read(buffer)) > 0) {
buffer.flip(); // make buffer readable
// send the data, don’t assume it goes all at once
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// WARNING: the above loop is evil. Because
// it’s writing back to the same non-blocking
// channel it read the data from, this code can
// potentially spin in a busy loop. In real life
// you’d do something more useful than this.
buffer.clear(); // make buffer empty
}
if (count < 0) {
// close channel on EOF, invalidates the key
socketChannel.close();
}
}
// ———————————————————-
/**
* Spew a greeting to the incoming client connection.
*
* @param channel
* The newly connected SocketChannel to say hello to.
*/
private void sayHello(SocketChannel channel) throws Exception {
buffer.clear();
buffer.put("Hi there!\r\n".getBytes());
buffer.flip();
channel.write(buffer);
}
}
//———————————————————————————————-
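import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;

public class SelectSocketsThreadPool extends SelectSockets {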
private static final int MAX_THREADS = 5;
private ThreadPool pool = new ThreadPool(MAX_THREADS);
// ————————————————————-
public static void main(String[] argv) throws Exception {
new SelectSocketsThreadPool().go(argv);
}
// ————————————————————-
/**
* Sample data handler method for a channel with data ready to read. This
* method is invoked from the go() method in the parent class. This handler
* delegates to a worker thread in a thread pool to service the channel,
* then returns immediately.
*
* @param key
* A SelectionKey object representing a channel determined by the
* selector to be ready for reading. If the channel returns an
* EOF condition, it is closed here, which automatically
* invalidates the associated key. The selector will then
* de-register the channel on the next select call.
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
WorkerThread worker = pool.getWorker();
if (worker == null) {
// No threads available, do nothing, the selection
// loop will keep calling this method until a
// thread becomes available. This design could
// be improved.
return;
}
// invoking this wakes up the worker thread then returns
worker.serviceChannel(key);
}
// —————————————————————
/**
* A very simple thread pool class. The pool size is set at construction
* time and remains fixed. Threads are cycled through a FIFO idle queue.
*/
private class ThreadPool {
List idle = new LinkedList();
ThreadPool(int poolSize) {
// fill up the pool with worker threads
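// NOTE: the text between "i <" and "> 0" was lost in the original post; the loop body
// and the getWorker() header below are reconstructed from how the class is used elsewhere
for (int i = 0; i < poolSize; i++) {
WorkerThread thread = new WorkerThread(this);
// set thread name for debugging, start it
thread.setName("Worker" + (i + 1));
thread.start();
idle.add(thread);
}
}
/**
 * Find an idle worker thread, if any. Could return null.
 */
WorkerThread getWorker() {
WorkerThread worker = null;
synchronized (idle) {
if (idle.size() > 0) {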
worker = (WorkerThread) idle.remove(0);
}
}
return worker;
}
/**
* Called by the worker thread to return itself to the idle pool.
*/
void returnWorker(WorkerThread worker) {
synchronized (idle) {
idle.add(worker);
}
}
}
/**
* A worker thread class which can drain channels and echo-back the input.
* Each instance is constructed with a reference to the owning thread pool
* object. When started, the thread loops forever waiting to be awakened to
* service the channel associated with a SelectionKey object. The worker is
* tasked by calling its serviceChannel() method with a SelectionKey object.
* The serviceChannel() method stores the key reference in the thread object
* then calls notify() to wake it up. When the channel has been drained, the
* worker thread returns itself to its parent pool.
*/
private class WorkerThread extends Thread {
private ByteBuffer buffer = ByteBuffer.allocate(1024);
private ThreadPool pool;
private SelectionKey key;
WorkerThread(ThreadPool pool) {
this.pool = pool;
}
// loop forever waiting for work to do
public synchronized void run() {
System.out.println(this.getName() + " is ready");
while (true) {
try {
// sleep and release object lock
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
// clear interrupt status
Thread.interrupted();
}
if (key == null) {
continue; // just in case
}
System.out.println(this.getName() + " has been awakened");
try {
System.out.println(this.getName() + "here1");
drainChannel(key);
} catch (Exception e) {
System.out.println("Caught '" + e + "' closing channel");
// close channel and nudge selector
try {
key.channel().close();
} catch (IOException ex) {
ex.printStackTrace();
}
key.selector().wakeup();
}
key = null;
// done, ready for more, return to pool
System.out.println(this.getName() + "here7");
this.pool.returnWorker(this);
System.out.println(this.getName() + "here8");
}
}
/**
* Called to initiate a unit of work by this worker thread on the
* provided SelectionKey object. This method is synchronized, as is the
* run() method, so only one key can be serviced at a given time. Before
* waking the worker thread, and before returning to the main selection
* loop, this key’s interest set is updated to remove OP_READ. This will
* cause the selector to ignore read-readiness for this channel while
* the worker thread is servicing it.
*/
synchronized void serviceChannel(SelectionKey key) {
this.key = key;
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
this.notify(); // awaken the thread
}
/**
* The actual code which drains the channel associated with the given
* key. This method assumes the key has been modified prior to
* invocation to turn off selection interest in OP_READ. When this
* method completes it re-enables OP_READ and calls wakeup() on the
* selector so the selector will resume watching this channel.
*/
void drainChannel(SelectionKey key) throws Exception {
System.out.println(this.getName() + "here2");
SocketChannel channel = (SocketChannel) key.channel();
int count;
buffer.clear(); // make buffer empty
// loop while data available, channel is non-blocking
while ((count = channel.read(buffer)) > 0) {
buffer.flip(); // make buffer readable
// send the data, may not go all at once
while (buffer.hasRemaining()) {
channel.write(buffer);
}
// WARNING: the above loop is evil.
// See comments in superclass.
buffer.clear(); // make buffer empty
}
if (count < 0) {
// close channel on EOF, invalidates the key
System.out.println(this.getName() + "here3");
channel.close();
System.out.println(this.getName() + "here4");
return;
}
// resume interest in OP_READ
System.out.println(this.getName() + "here5");
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
System.out.println(this.getName() + "here6");
// cycle the selector so this key is active again
key.selector().wakeup();
}
}
}
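goldendoc, hello, I have a question: of the two classes above, SelectSockets does not get stuck in select(), but SelectSocketsThreadPool blocks in select() unless it calls key.interestOps(key.interestOps() | SelectionKey.OP_READ); and key.selector().wakeup();. Why is that? I read your source analysis but couldn't find the answer. I hope you can take a look at the code I posted when you have time. Many thanks.
I looked at the code you posted; parts of it are missing. If I understand your question correctly, you are asking why it blocks in select(), right?
Look at the method declarations of the Selector class and you will find these three:
select(long timeout): keeps polling for up to the given timeout. Under normal circumstances it can return in three ways:
the timeout has expired; there is no guarantee that any ready event was found
the timeout has not expired and the return is not due to an interruption; select found a ready event
the timeout has not expired and the return is caused from outside, for example by wakeup; there is no guarantee that any ready event was found
selectNow(): just a quick glance to see whether any event is ready, returning immediately either way. It behaves like a poll with a zero timeout. Note that select(0) is not the same thing: a timeout of 0 passed to select(long) means wait indefinitely.
select(): the simplest one. It does not come back until something is ready, unless it is woken up or the thread is interrupted.
Underneath, all of this maps onto epoll and friends; see the neighboring posts, which cover them.
Finally, back to what you observed: why does it block in select()?
First, the code creates a ServerSocketChannel and registers it for OP_ACCEPT, then sits in while (true) { selector.select(); }. Since no timeout is given, select() can only
return when it is interrupted or when an event of interest turns up, and the only event of interest is OP_ACCEPT. The posted code never arranges for another thread to connect to the server through a SocketChannel, so that event is naturally
never satisfied. Only when one of those two conditions holds can select() return.
The code you posted is a very common server-side example. Frameworks such as jetty and netty vary this in many ways,
for example by calling selector.select(10), which amounts to doing a simple poll of their own inside their code. Whether that is a good idea is a matter of opinion; its benefit only shows in particular environments.
I hope this clears up your confusion. If you still have questions, feel free to ask.
PS: It's my fault for not tying the articles together well, which makes them hard to read. I plan to reorganize them in a later revision.
After reading your article and comparing myself with you, I feel like I'm not even a damn programmer...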