Java NIO Selector: Background Knowledge (Linux poll)

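Recently I spent some effort studying the JDK source of Java NIO and found that, apart from some tricks in its wakeup mechanism, the Selector implementation relies mainly on what the operating system provides. To understand Selector without lingering confusion, it is worth studying how the operating system implements selection. This article is based mainly on the poll implementation in the linux-2.6.10 kernel.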

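This article may come across as rather shallow, so experts can skip it. Also, the word "government" in this article is purely a metaphor; if you like, just read it as the "Qing government". Please don't read anything more into it.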

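Let's start with poll. The word means to poll, that is, to check repeatedly, which fits perfectly, because that is exactly what it does. Running man poll shows this definition: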

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

Here, pollfd is the following struct:

struct pollfd {
    int   fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};

As for nfds_t, a quick guess says it must be an integer type, and indeed it is defined as:

typedef unsigned long int nfds_t;

From the fds pointer and the unsigned integer nfds it is easy to see that fds is an array of pollfd containing nfds valid elements.
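To make the fds/nfds contract concrete, here is a minimal userspace sketch, assuming a Linux/POSIX system. The helper name pipe_read_end_revents is made up for this example: it polls the read end of a pipe once, with or without a byte written into it, and returns whatever the kernel wrote into revents.

```c
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper: create a pipe, optionally write one byte into it,
   poll the read end once, and return the revents the kernel filled in
   (or -1 on error). */
int pipe_read_end_revents(int write_first, int timeout_ms)
{
    int fds[2];
    if (pipe(fds) != 0)
        return -1;
    if (write_first && write(fds[1], "x", 1) != 1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }

    struct pollfd pfd[1];        /* an array with nfds = 1 element */
    pfd[0].fd = fds[0];
    pfd[0].events = POLLIN;      /* the event we are interested in */
    pfd[0].revents = 0;          /* filled in by the kernel */

    int n = poll(pfd, 1, timeout_ms);
    close(fds[0]);
    close(fds[1]);
    return n < 0 ? -1 : pfd[0].revents;
}
```

With a byte written, revents contains POLLIN; with nothing written and a timeout of 0, poll returns at once and revents stays 0.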

Now let's make a bold guess at what poll does. Counting timeout, we can imagine a scene like this:

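poll stands with its hands on its hips, scowling, and says to us as if no one else existed: "You small fry just give me a set of pollfd and tell me the events you're interested in, and I'll tell you which of them occurred within the allotted timeout. Impressive, huh?"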

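I have dreamed of being a wandering swordsman since I was a child, and nothing annoys me more than that kind of attitude, so in the dead of night I put on black clothes and climbed onto the rafters to find out the truth.
It's only a government clerk (a system call), after all; first let's find where it lives. After wandering the streets and asking around, it turns out the fellow is hiding in fs/select.c. Its front gate, sys_poll, is quite tall and imposing: it really does work for the government. Better safe than sorry, so let's probe the gate one piece at a time: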

asmlinkage long sys_poll(struct pollfd __user * ufds, unsigned int nfds, long timeout)
{
    //Suspect #1: looks like a wait queue of some kind
    struct poll_wqueues table;
    int fdcount, err;
    unsigned int i;
    //Suspect #2: what on earth is poll_list? Two pointers declared at once; we'd better stop and study it first
    struct poll_list *head;
    struct poll_list *walk;

So let's call in the poll_wqueues family:
[linux/poll.h]

typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

Ignore poll_queue_proc for now; it's just a function pointer. Judging by its wait_queue_head_t parameter, though, it surely has something to do with wait queues.

typedef struct poll_table_struct {
    poll_queue_proc qproc;
} poll_table;

As for poll_table, I can hardly be bothered: the fellow just wraps the function pointer above in a struct.

struct poll_wqueues {
    poll_table pt;
    struct poll_table_page * table;
    int error;
};

I know poll_table: it's the struct above that wraps a wait-queue-related function pointer. poll_table_page I don't know yet, so I'll withhold comment and go find it first:
[fs/select.c]

struct poll_table_entry {
    struct file * filp;
    wait_queue_t wait;
    wait_queue_head_t * wait_address;
};

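poll_table_entry has a file pointer plus the wait-queue duo wait_queue_t and wait_queue_head_t*; who would dare say it has nothing to do with wait queues? wait_queue_head_t* wait_address is simply a pointer to the head of a wait queue, and wait_queue_t wait is naturally an item in that wait queue.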

struct poll_table_page {
    struct poll_table_page * next;
    struct poll_table_entry * entry;
    struct poll_table_entry entries[0];
};

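As for poll_table_page, it contains a poll_table_page* of its own type, so it is obviously a linked list. The two odd-looking poll_table_entry members below it look familiar, though. Oh, it's one of the government's usual little tricks:
entry and entries work together to form what is effectively a dynamic poll_table_entry array. entry is a poll_table_entry pointer that always points to the next free slot where a poll_table_entry can be created. entries looks like an array, but it has zero elements: it takes up no space and only serves as the address of the array's first element (a GNU zero-length array; C99 calls this a flexible array member). If you don't believe it, try sizeof(struct poll_table_page): on a 32-bit "government" (machine) the result is 8, exactly the size of two pointers.
I figured that somewhere there must be code using entry and entries like this: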

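// On initialization, entry points at entries, because that is the start of the array,
// i.e. where the first poll_table_entry can be created
new_table->entry = new_table->entries;
// Later, to take a slot:
struct poll_table_entry * entry = table->entry;
// entry then naturally points at the next address where a poll_table_entry can be created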
table->entry = entry+1;
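The entry/entries trick can be tried out in userspace. The sketch below is hypothetical: demo_page and demo_entry stand in for poll_table_page and poll_table_entry, DEMO_PAGE_SIZE is an assumed 4096-byte page, and it uses the C99 flexible array member entries[] instead of the GNU zero-length entries[0] in the kernel source. It shows that the header is just two pointers and that entry walks through the rest of the page one slot at a time.

```c
#include <stddef.h>
#include <stdlib.h>

#define DEMO_PAGE_SIZE 4096            /* assumption: stands in for PAGE_SIZE */

struct demo_entry {                    /* hypothetical stand-in for poll_table_entry */
    void *filp;
    void *wait_address;
};

struct demo_page {                     /* hypothetical stand-in for poll_table_page */
    struct demo_page *next;
    struct demo_entry *entry;          /* next free slot */
    struct demo_entry entries[];       /* flexible array member: takes no space */
};

/* Number of entries that fit in one page after the two-pointer header. */
size_t demo_page_capacity(void)
{
    return (DEMO_PAGE_SIZE - sizeof(struct demo_page)) / sizeof(struct demo_entry);
}

/* Allocate one "page" and point entry at the first slot, as poll does. */
struct demo_page *demo_page_new(void)
{
    struct demo_page *p = malloc(DEMO_PAGE_SIZE);
    if (p != NULL) {
        p->next = NULL;
        p->entry = p->entries;
    }
    return p;
}

/* Hand out the slot entry points at and advance it; NULL when the page is full. */
struct demo_entry *demo_page_take(struct demo_page *p)
{
    size_t used = (size_t)(p->entry - p->entries);
    if (used >= demo_page_capacity())
        return NULL;
    return p->entry++;
}
```

The fullness check is done by index rather than by comparing raw addresses, which keeps all pointer arithmetic inside the allocated page.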

Looking at this whole family, let me summarize the role of poll_wqueues:

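poll_wqueues holds a struct wrapping a wait-queue-related function pointer, and also holds the head pointer of a poll_table_page list. True to the government's usual style, the function pointer should be a callback. The poll_table_page list is a singly linked list terminated by NULL, not a ring: each new page is pushed on at the head. Each node holds an array of poll_table_entry, and each element of that array in turn carries an item of a wait queue (wait_queue).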

Seen this way, all you do is manage wait queues and a callback. Quite the butler, even splitting things into pages.

Now let's look at poll_list:
[fs/select.c]

struct poll_list {
    struct poll_list *next;
    int len;
    struct pollfd entries[0];
};

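Same trick as poll_table_page; they really are all the government's trusty helpers. Clearly poll_list forms a linked list in which each node holds a pollfd array: len is the number of pollfd stored in that node, and entries is the start address of the array. Why a linked list again? Presumably to split the data into pages once more.
The government has never quite trusted the people. I remember passing in a pollfd array; I bet it will end up being copied into this so-called poll_list list.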

Back to the gate for another probe:

    /* Do a sanity check on nfds ... */
    if (nfds > current->files->max_fdset && nfds > OPEN_MAX)
        return -EINVAL;

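So the number of pollfd is not unlimited: the call fails with -EINVAL when nfds exceeds both the process's max_fdset and OPEN_MAX. Of course; how could they let you off that easily?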

    if (timeout) {
        /* Careful about overflow in the intermediate values */
        if ((unsigned long) timeout < MAX_SCHEDULE_TIMEOUT / HZ)
            timeout = (unsigned long)(timeout*HZ+999)/1000+1;
        else /* Negative or overflow */
            timeout = MAX_SCHEDULE_TIMEOUT;
    }

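timeout gets meddled with too, in the name of being standardized and harmonious. Concretely, a nonzero timeout in milliseconds is converted to jiffies (clock ticks): it is rounded up to whole ticks and one more tick is added, while negative or overflowing values become MAX_SCHEDULE_TIMEOUT, i.e. wait indefinitely.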
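The conversion can be checked with a few numbers. This sketch copies the shape of the sys_poll code above into a plain function; DEMO_HZ = 100 is an assumption (HZ is a kernel configuration value), and the multiplication is done in unsigned long to avoid signed overflow, a small deviation from the original. The extra tick presumably compensates for the partly elapsed current tick.

```c
#include <limits.h>

#define DEMO_HZ 100                          /* assumption: a 100 Hz tick */
#define DEMO_MAX_SCHEDULE_TIMEOUT LONG_MAX   /* the kernel defines it as LONG_MAX */

/* Milliseconds -> ticks, following the shape of sys_poll's conversion. */
long demo_poll_timeout(long timeout)
{
    if (timeout) {
        if ((unsigned long)timeout < DEMO_MAX_SCHEDULE_TIMEOUT / DEMO_HZ)
            /* round up to whole ticks, then add one more tick */
            timeout = (long)(((unsigned long)timeout * DEMO_HZ + 999) / 1000 + 1);
        else
            /* negative values (e.g. -1) become huge when cast: wait forever */
            timeout = DEMO_MAX_SCHEDULE_TIMEOUT;
    }
    return timeout;
}
```

With HZ = 100, a 1 ms timeout becomes 2 ticks, 1000 ms becomes 101 ticks, 0 stays 0 (non-blocking), and -1 maps to MAX_SCHEDULE_TIMEOUT.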

    poll_initwait(&table);

Wait, what is this doing? Initializing what? The poll_wqueues variable from earlier? Got you at last:
[fs/select.c]

void poll_initwait(struct poll_wqueues *pwq)
{
    init_poll_funcptr(&pwq->pt, __pollwait);
    pwq->error = 0;
    pwq->table = NULL;
}

[linux/poll.h]

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
    pt->qproc = qproc;
}
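init_poll_funcptr only stores a function pointer; the interesting part is how the callback later finds its way back to the enclosing poll_wqueues, which __pollwait does with container_of. Below is a minimal userspace sketch of that pattern. demo_wqueues, demo_table and demo_pollwait are hypothetical stand-ins, and the container_of macro here is a simplified version without the kernel's type check. In the real poll_wqueues, pt is the first member, so the offset is 0; the sketch puts another field first so the pointer arithmetic actually does something.

```c
#include <stddef.h>

/* Simplified userspace container_of (the kernel version adds a type check). */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

struct demo_table;
typedef void (*demo_qproc)(struct demo_table *);

struct demo_table {                /* stand-in for poll_table */
    demo_qproc qproc;
};

struct demo_wqueues {              /* hypothetical stand-in for poll_wqueues */
    int error;                     /* placed first so pt has a nonzero offset */
    struct demo_table pt;
    int calls;
};

/* The callback only receives &pt, yet reaches the whole demo_wqueues. */
static void demo_pollwait(struct demo_table *_p)
{
    struct demo_wqueues *p = container_of(_p, struct demo_wqueues, pt);
    p->calls++;
}

/* Counterpart of poll_initwait + init_poll_funcptr. */
void demo_initwait(struct demo_wqueues *pwq)
{
    pwq->pt.qproc = demo_pollwait;
    pwq->error = 0;
    pwq->calls = 0;
}
```

Passing only the embedded member around and recovering the container keeps the callback signature generic, which is why drivers can call it without knowing about poll_wqueues at all.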

So all poll_initwait does is initialize the poll_wqueues table. But wait, what is __pollwait? Earlier I guessed it was the callback; let's drag it out and take a look:
[fs/select.c]

void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p)
{
    //Work out the address of poll_wqueues from the address of its member pt
    struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);
    //Get the list head pointer mentioned above
    struct poll_table_page *table = p->table;

    //If the pointer is NULL, or the current page cannot hold any more entries.
    //Recall that table was initialized to NULL, so the first call takes this branch
    if (!table || POLL_TABLE_FULL(table)) {

        struct poll_table_page *new_table;

        // Allocate a fresh page for the poll_table_page pointer new_table
        new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
        //On allocation failure, return and restore the thread state to TASK_RUNNING
        if (!new_table) {
            p->error = -ENOMEM;
            __set_current_state(TASK_RUNNING);
            return;
        }
        //As always the first time: entry points to the start of this page's entries array
        new_table->entry = new_table->entries;
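        //next points to the previous newest page (NULL on the first call): the new page is pushed at the head, the list stays NULL-terminated and never forms a ring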
        new_table->next = table;
        // A page was added, so the poll_wqueues pointer is updated to new_table;
        // hence the table pointer in poll_wqueues always points to the newest poll_table_page
        p->table = new_table;
        // Update table to new_table so the entry below is added to it
        table = new_table;
    }

    /* Add a new entry */
    {
        //Get the first address where a poll_table_entry can be stored
        struct poll_table_entry * entry = table->entry;
        // Advance entry so it points to the next free slot for a poll_table_entry
        table->entry = entry+1;
        // #define get_file(x)	atomic_inc(&(x)->f_count)
        // so get_file increments filp's f_count reference count,
        // meaning a new entry is about to reference filp
        get_file(filp);
        entry->filp = filp;
        entry->wait_address = wait_address;
        init_waitqueue_entry(&entry->wait, current); //explained below
        add_wait_queue(wait_address,&entry->wait);
    }
}
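The page bookkeeping in __pollwait can be simulated without any wait queues. In this sketch (all names hypothetical) a "page" holds only ENTRIES_PER_PAGE = 4 entries so that chaining shows up quickly. Like __pollwait, each new page is pushed at the head with next pointing at the previous page, which leaves a NULL-terminated list rather than a ring; sk_freewait plays the role of poll_freewait.

```c
#include <stddef.h>
#include <stdlib.h>

#define ENTRIES_PER_PAGE 4            /* assumption: tiny pages so chaining shows */

struct sk_entry {                     /* hypothetical stand-in for poll_table_entry */
    const void *filp;
};

struct sk_page {                      /* hypothetical stand-in for poll_table_page */
    struct sk_page *next;
    struct sk_entry *entry;           /* next free slot */
    struct sk_entry entries[];
};

struct sk_wqueues {                   /* hypothetical stand-in for poll_wqueues */
    struct sk_page *table;            /* always the newest page */
    int error;
};

static int sk_page_full(const struct sk_page *t)
{
    return t->entry - t->entries >= ENTRIES_PER_PAGE;
}

/* Same bookkeeping as __pollwait, minus the wait queue and file refcount. */
void sk_pollwait(struct sk_wqueues *p, const void *filp)
{
    struct sk_page *table = p->table;

    if (table == NULL || sk_page_full(table)) {
        struct sk_page *new_table =
            malloc(sizeof *new_table + ENTRIES_PER_PAGE * sizeof(struct sk_entry));
        if (new_table == NULL) {
            p->error = -1;            /* the kernel stores -ENOMEM */
            return;
        }
        new_table->entry = new_table->entries;
        new_table->next = table;      /* push at the head; the first page gets NULL */
        p->table = new_table;
        table = new_table;
    }
    table->entry->filp = filp;        /* fill the slot, then advance */
    table->entry++;
}

/* Terminates only because the list ends in NULL (it is not a ring). */
size_t sk_count_pages(const struct sk_wqueues *p)
{
    size_t n = 0;
    for (const struct sk_page *t = p->table; t != NULL; t = t->next)
        n++;
    return n;
}

/* Counterpart of poll_freewait: free every page. */
void sk_freewait(struct sk_wqueues *p)
{
    struct sk_page *t = p->table;
    while (t != NULL) {
        struct sk_page *next = t->next;
        free(t);
        t = next;
    }
    p->table = NULL;
}
```

Registering 10 files fills pages of 4, 4 and 2 entries, so three pages are chained and the newest one (the head) holds 2.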

The last two steps of __pollwait, init_waitqueue_entry and add_wait_queue, are common idioms. First, init_waitqueue_entry:
[linux/wait.h]

static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
    q->flags = 0;
    q->task = p;
    q->func = default_wake_function;
}

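Simply put, it initializes the wait queue entry and attaches the current thread, current, to it;
add_wait_queue is the standard wait-queue operation: it inserts entry's wait_queue_t member wait right after the head node of the wait queue.
Taken together, these two steps hang the current thread current on the wait queue.
A small preview: the polling ultimately lands on each concrete file. For example, TCP's tcp_poll calls poll_wait(file, sk->sk_sleep, wait) before it examines the socket's state.
poll_wait is defined as follows: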
[linux/poll.h]

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && wait_address)
        p->qproc(filp, wait_address, p);
}

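That is, if both the wait queue pointer and the poll_table are non-NULL, the callback is invoked. From poll_wait(file, sk->sk_sleep, wait) we can tell that sk->sk_sleep is the socket's own wait queue. After the call, the current thread is hung on that device's wait queue; when the device's state changes, it wakes the thread, which can then start a new round of polling and pick up the updated device state. By comparing that state we can tell whether the events we care about have occurred.
To recap __pollwait: every time it is triggered, it creates a new entry in poll_wqueues, attaches the current thread current to the wait in that entry, and adds it to the current device's wait queue.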
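The NULL checks in poll_wait are what make the pt = NULL trick in do_poll work. The sketch below is a hypothetical userspace model: a counter stands in for a wait queue, enqueue_current stands in for __pollwait, and demo_dev_poll stands in for a driver's poll method such as tcp_poll, which first calls poll_wait and then reports its current mask.

```c
#include <stddef.h>

struct demo_wait_head {               /* hypothetical stand-in for wait_queue_head_t */
    int waiters;
};

struct demo_poll_table;
typedef void (*demo_queue_proc)(struct demo_wait_head *, struct demo_poll_table *);

struct demo_poll_table {              /* stand-in for poll_table */
    demo_queue_proc qproc;
};

/* Same shape as poll_wait(): call back only if both pointers are non-NULL. */
static void demo_poll_wait(struct demo_wait_head *wait_address, struct demo_poll_table *p)
{
    if (p && wait_address)
        p->qproc(wait_address, p);
}

/* Stand-in for __pollwait: "queue the current thread" by counting it. */
static void enqueue_current(struct demo_wait_head *wait_address, struct demo_poll_table *p)
{
    (void)p;
    wait_address->waiters++;
}

void demo_table_init(struct demo_poll_table *t)
{
    t->qproc = enqueue_current;
}

/* Hypothetical driver poll method: register first, then report the mask. */
unsigned demo_dev_poll(struct demo_wait_head *wq, unsigned state, struct demo_poll_table *p)
{
    demo_poll_wait(wq, p);
    return state;
}
```

Passing a NULL table still returns the device's mask but skips the registration, which is exactly what a non-blocking or second-pass poll wants.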

Pull yourself together, be patient, and probe the gate again:

    head = NULL;
    walk = NULL;
    i = nfds;
    err = -ENOMEM;
    while(i!=0) {
        struct poll_list *pp;
        pp = kmalloc(sizeof(struct poll_list)+
                sizeof(struct pollfd)*
                (i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i),
                    GFP_KERNEL);
        if(pp==NULL)
            goto out_fds;
        pp->next=NULL;
        pp->len = (i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i);
        if (head == NULL)
            head = pp;
        else
            walk->next = pp;

        walk = pp;
        if (copy_from_user(pp->entries, ufds + nfds-i,
                sizeof(struct pollfd)*pp->len)) {
            err = -EFAULT;
            goto out_fds;
        }
        i -= pp->len;
    }

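Looking at this whole while loop, I had to laugh. All that code just to copy the user's pollfd array, chunk by chunk, into the poll_list list? Each node stores at most POLLFD_PER_PAGE pollfd (defined in fs/select.c as (PAGE_SIZE-sizeof(struct poll_list)) / sizeof(struct pollfd), so a node plus its array fits in one page), the same idea as poll_table_page storing entries.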
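Here is the same chunked copy in userspace, as a sketch under assumptions: memcpy replaces copy_from_user, malloc replaces kmalloc, and DEMO_POLLFD_PER_NODE is an arbitrary small value standing in for POLLFD_PER_PAGE so that several nodes appear. Unlike the kernel, which jumps to out_fds to clean up, this version frees the partial list itself on allocation failure.

```c
#include <poll.h>
#include <stdlib.h>
#include <string.h>

#define DEMO_POLLFD_PER_NODE 3        /* assumption: small stand-in for POLLFD_PER_PAGE */

struct demo_poll_list {               /* hypothetical stand-in for poll_list */
    struct demo_poll_list *next;
    int len;                          /* number of pollfd stored in this node */
    struct pollfd entries[];
};

void demo_free_list(struct demo_poll_list *head)
{
    while (head != NULL) {
        struct demo_poll_list *next = head->next;
        free(head);
        head = next;
    }
}

/* Copy ufds[0..nfds) into a NULL-terminated chain of nodes, like sys_poll.
   Returns NULL on allocation failure (and also when nfds == 0). */
struct demo_poll_list *demo_copy_in(const struct pollfd *ufds, unsigned nfds)
{
    struct demo_poll_list *head = NULL, *walk = NULL;
    unsigned i = nfds;

    while (i != 0) {
        int len = i > DEMO_POLLFD_PER_NODE ? DEMO_POLLFD_PER_NODE : (int)i;
        struct demo_poll_list *pp =
            malloc(sizeof *pp + (size_t)len * sizeof(struct pollfd));
        if (pp == NULL) {
            demo_free_list(head);     /* the kernel does this at out_fds */
            return NULL;
        }
        pp->next = NULL;
        pp->len = len;
        if (head == NULL)
            head = pp;
        else
            walk->next = pp;
        walk = pp;
        /* copy_from_user in the kernel; same offset: ufds + nfds - i */
        memcpy(pp->entries, ufds + (nfds - i), (size_t)len * sizeof(struct pollfd));
        i -= (unsigned)len;
    }
    return head;
}
```

With 7 pollfd and 3 per node, the list gets nodes of length 3, 3 and 1, and the elements keep their original order.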

    fdcount = do_poll(nfds, head, &table, timeout);

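A seasoned bureaucrat after all: having taken the job, it hands all the legwork to do_poll.
Look first at the arguments it passes to do_poll: nfds is the integer passed in by the user, the number of pollfd; head is the copied poll_list list, whose contents are the user's pollfd; table is naturally the wait-queue butler described above; and the "harmonized" timeout is passed on to do_poll as well. And so do_poll's miserable life of running errands begins: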
[fs/select.c]

static int do_poll(unsigned int nfds,  struct poll_list *list,
            struct poll_wqueues *wait, long timeout)
{
    int count = 0;
    // pt is the result of poll_initwait(&table); its qproc is simply __pollwait
    poll_table* pt = &wait->pt;

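    // This is why timeout == 0 never blocks: pt is NULL, so poll_wait does nothing and
    // the current thread is never put on any wait queue, and the !timeout check below
    // ends the loop after one pass. Setting pt = NULL is a handy trick, used again later.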
    if (!timeout)
        pt = NULL;

    for (;;) { //OK, here's the big loop
        // walking pointer over poll_list
        struct poll_list *walk;
        //The standard pattern with schedule_timeout: since schedule_timeout blocks the current thread,
        //the thread state must first be set to interruptible
        set_current_state(TASK_INTERRUPTIBLE);
        walk = list;
        //Walk the poll_list list; as noted above,
        //each node stores a fixed number of pollfd
        while(walk != NULL) {
            // Run do_pollfd on one poll_list node. Ugh, do_poll, I misjudged you:
            // I took you for a law-abiding citizen, yet you dump the burden on do_pollfd too
            do_pollfd( walk->len, walk->entries, &pt, &count);
            // advance the walking pointer
            walk = walk->next;
        }
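        /*pt is set to NULL here so the next iteration will not call __pollwait to queue the current thread again. Why?
          While do_pollfd walks a node's pollfd array with pt = __pollwait, it queues the current thread on the wait queue of each fd's device, one by one.
          When the whole while loop finishes, every pollfd has been visited. If count is nonzero, we break out and return; if count == 0,
          nothing was found, the loop does not exit, and the thread blocks in schedule_timeout until a device wakes it up.
          Once a device changes state and wakes the thread, it starts another pass of the while loop to check whether any fd has a ready event of interest.
          Without pt = NULL before that second pass, the thread would be queued on every wait queue again, which is unnecessary since it is still on them*/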
        pt = NULL;
        //Leave the loop if an event occurred, the timeout has expired, or a signal is pending
        if (count || !timeout || signal_pending(current))
            break;
        //No event found in this pass, so check the error recorded in poll_wqueues (0 by default);
        //if it is nonzero (a negative error code), leave the loop and return it
        count = wait->error;
        if (count)
            break;
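        /*This is what puts the thread to sleep with a timeout. After schedule_timeout the current thread sleeps (blocks). It stops blocking in one of two ways:
          the timeout expires, or the thread is woken from a wait queue. schedule_timeout consumes timeout:
          it returns 0 when the time runs out and the remaining time when woken. Unless interrupted or a result is found, the loop eventually exits because timeout == 0*/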
        timeout = schedule_timeout(timeout);
    }
    //Restore the thread state to running
    __set_current_state(TASK_RUNNING);
    //Return the number of ready fds, or an error code
    return count;
}

Finally, let's look at the oppressed do_pollfd. Surely I haven't misjudged it this time:
[fs/select.c]

static void do_pollfd(unsigned int num, struct pollfd * fdpage,
    poll_table ** pwait, int *count)
{
    int i;

    //Walk the pollfd array of one poll_list node
    for (i = 0; i < num; i++) {          
        int fd;         
        unsigned int mask;
        struct pollfd *fdp;                           

        mask = 0;
        fdp = fdpage+i;
        fd = fdp->fd;
        if (fd >= 0) {
            //Get the file pointer for fd and increment its f_count reference count
            struct file * file = fget(fd);
            mask = POLLNVAL;
            //If we got the file, get to work
            if (file != NULL) {
                mask = DEFAULT_POLLMASK;
                if (file->f_op && file->f_op->poll)
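                    /*Well, I'm speechless. I thought you were the oppressed one, but you're a foreman too. What kind of society is this,
                     oppression layered on oppression? I wonder whether file->f_op->poll oppresses someone else in turn; it certainly does,
                     at least the device. Back to business: as described above, during a concrete file's poll operation,
                     if pwait is non-NULL, i.e. __pollwait, the current thread is queued on the wait queue to wait for the device to wake it.
                     mask is the I/O status returned for the current fd*/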
                    mask = file->f_op->poll(file, *pwait);
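                //Don't be too optimistic: a return only says some events occurred; whether they are the ones
                //we care about is decided by ANDing with fdp->events (POLLERR and POLLHUP are always kept)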
                mask &= fdp->events | POLLERR | POLLHUP;
                //Drop the f_count reference on file; when the count reaches 0 the file is released
                fput(file);
            }
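            //If anything survives all that filtering, an event really did occur. pwait is then set to NULL,
            //meaning there is no need to queue the current thread on any further wait queues, since we already have a result;
            //later fds that are ready just increment count, and those that are not don't matter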
            if (mask) {
                *pwait = NULL;
                (*count)++;
            }
        }
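        //The last step is very important: the result is written back to revents in the pollfd. So that's all there is to it;
        //we only need to check whether revents is nonzero to know whether something happened. Not that clever after all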
        fdp->revents = mask;
    }
}
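The mask logic in do_pollfd can be observed from userspace with the real poll(). This sketch assumes Linux and a single-threaded program, and uses a made-up helper, demo_revents, that polls three entries with timeout 0: an entry with fd = -1 is skipped and gets revents 0; a descriptor that has already been closed makes fget fail, so it gets POLLNVAL and is counted; and the write end of a pipe is writable, but because only POLLIN was requested, the events filter reduces its mask to 0.

```c
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper: poll three entries once with timeout 0, copy their
   revents into out[], and return poll()'s result (-1 on setup failure).
     out[0]: fd = -1                          -> ignored, revents 0
     out[1]: a descriptor that was closed     -> POLLNVAL
     out[2]: pipe write end, events = POLLIN  -> writable, but filtered to 0 */
int demo_revents(short out[3])
{
    int a[2], b[2];
    if (pipe(a) != 0)
        return -1;
    if (pipe(b) != 0) {
        close(a[0]);
        close(a[1]);
        return -1;
    }
    close(a[0]);   /* a[0] is now stale; nothing reopens it before poll() */

    struct pollfd pfd[3] = {
        { .fd = -1,   .events = POLLIN },
        { .fd = a[0], .events = POLLIN },
        { .fd = b[1], .events = POLLIN },
    };
    int n = poll(pfd, 3, 0);
    for (int i = 0; i < 3; i++)
        out[i] = pfd[i].revents;

    close(a[1]);
    close(b[0]);
    close(b[1]);
    return n;
}
```

poll returns 1 here: only the POLLNVAL entry ends up with a nonzero mask, so only it increments count.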

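To recap do_poll and do_pollfd: they walk every pollfd and call the poll operation of the corresponding file. Because the wait callback is __pollwait, the current thread is queued on each device's wait queue to wait for that device to wake it. Once a device wakes the current thread, poll knows something has happened and starts a new round of polling. Polling continues until it finds an fd with a ready event of interest, the wait times out, or a signal interrupts it.
As for timeout: if it is 0 from the start, do_pollfd never queues the thread, and because timeout is 0 the call returns after a single pass. When timeout > 0, schedule_timeout blocks the current thread and consumes timeout: it returns 0 when the time is used up, and if the thread is woken while blocked, timeout is updated to the remaining wait time. Either way, unless an event of interest becomes ready or a signal arrives, the loop eventually exits because timeout == 0.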
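The control flow of do_poll, including both places where the wait-queue registration is switched off, can be simulated in a few lines. This is a simplified model, not kernel code: sim_dev and sim_do_poll are hypothetical, each device becomes ready at a fixed tick, and every "sleep" is assumed to last exactly one tick, whereas the real thread sleeps until a device wakes it or the timeout runs out. The registrations counter shows that each device is registered at most once, and that once a ready fd is found, later fds in the same pass are not registered at all.

```c
struct sim_dev {
    int ready_at;        /* tick at which this device starts reporting an event */
    int registrations;   /* how often the poller queued itself on this device */
};

/* Returns the number of ready devices, or 0 once the timeout is used up. */
int sim_do_poll(struct sim_dev *devs, int n, long timeout)
{
    int now = 0;
    int pt = (timeout != 0);          /* nonzero plays the role of pt != NULL */

    for (;;) {
        int count = 0;
        for (int i = 0; i < n; i++) {          /* do_pollfd over every fd */
            if (pt)
                devs[i].registrations++;       /* poll_wait -> __pollwait */
            if (now >= devs[i].ready_at) {
                count++;
                pt = 0;                        /* *pwait = NULL: stop queueing */
            }
        }
        pt = 0;                                /* pt = NULL after the first pass */
        if (count || !timeout)
            return count;
        /* schedule_timeout(): modelled as sleeping exactly one tick */
        now++;
        timeout--;
    }
}
```

With timeout 0 nothing is registered and the call returns after one pass; with a positive timeout every device is registered once, on the first pass only.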

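We seem to know what they get up to inside now. But once the results are found, how are they returned to the user? Remember, the results still sit in the poll_list list.
Right, so no guessing is needed to know what comes next: collecting the results and updating revents in the user's corresponding pollfd: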

    /* OK, now copy the revents fields back to user space. */
    walk = head;
    err = -EFAULT;

walk is the walking pointer over the poll_list list.

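    while(walk != NULL) {

walk traverses the whole poll_list list until it becomes NULL.

        struct pollfd *fds = walk->entries;

Get the start address of the pollfd array in a single poll_list node.

        int j;

        for (j=0; j < walk->len; j++, ufds++) {

Walk the pollfd array of the poll_list node and the user's pollfd array in lockstep.

            if(__put_user(fds[j].revents, &ufds->revents))
                goto out_fds;

Copy the revents of each element of the node's pollfd array into revents of the matching element of the user's array. After this traversal every revents in the user's pollfd array has been updated, so the user only needs to check each pollfd's revents to know the result.

        }
        walk = walk->next;
    }
    err = fdcount;
    if (!fdcount && signal_pending(current))
        err = -EINTR;

The three lines above are simple: if no event is ready and the current thread was interrupted by a signal, the return value is set to -EINTR; otherwise it stays unchanged (the number of ready fds).

out_fds:
    walk = head;
    while(walk!=NULL) {
        struct poll_list *pp = walk->next;
        kfree(walk);
        walk = pp;
    }
    poll_freewait(&table);
    return err;
}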

I won't dwell on the last piece of code: it simply frees two lists, the poll_list list and table's poll_table_page list (the latter through poll_freewait).
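The copy-back loop and the out_fds cleanup can be sketched together in userspace. Assumptions: a plain store replaces __put_user, free replaces kfree, and cp_poll_list and the cp_node helper are hypothetical stand-ins for poll_list and its allocation. The user array pointer advances in lockstep with the nodes, as ufds++ does in the kernel loop, so the revents land in the original order.

```c
#include <poll.h>
#include <stdlib.h>

struct cp_poll_list {                 /* hypothetical stand-in for poll_list */
    struct cp_poll_list *next;
    int len;
    struct pollfd entries[];
};

/* Hypothetical helper for the example: one zeroed node holding n pollfd. */
struct cp_poll_list *cp_node(int n, struct cp_poll_list *next)
{
    struct cp_poll_list *p = calloc(1, sizeof *p + (size_t)n * sizeof(struct pollfd));
    if (p != NULL) {
        p->len = n;
        p->next = next;
    }
    return p;
}

/* Write every node's revents back to the user array in order (ufds advances
   in lockstep, as in sys_poll), then free the chain as the out_fds path does. */
void cp_copy_out_and_free(struct cp_poll_list *head, struct pollfd *ufds)
{
    for (struct cp_poll_list *walk = head; walk != NULL; walk = walk->next)
        for (int j = 0; j < walk->len; j++, ufds++)
            ufds->revents = walk->entries[j].revents;   /* __put_user in the kernel */

    while (head != NULL) {
        struct cp_poll_list *pp = head->next;
        free(head);
        head = pp;
    }
}
```

Only revents is written back; fd and events in the user's array are left as the user set them.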

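Exploration complete, secret uncovered: sys_poll was telling the truth after all. Just as it said, we pass it a set of pollfd with the events we care about filled in, give it a timeout, and it tells us within that time which events occurred. An event is signaled through revents in the corresponding pollfd: the kernel overwrites every revents (with 0 when nothing happened), so a nonzero revents means an event occurred.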
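From the caller's side, the -EINTR case described earlier surfaces as poll() returning -1 with errno set to EINTR. A common way to handle it is simply to retry; the sketch below (poll_restart_on_eintr is a made-up name) does that, with the simplification that the full timeout restarts on each retry rather than only the remaining time.

```c
#include <errno.h>
#include <poll.h>

/* Retry poll() when a signal interrupted it before any fd became ready.
   Simplification: each retry restarts the full timeout_ms. */
int poll_restart_on_eintr(struct pollfd *fds, nfds_t nfds, int timeout_ms)
{
    int n;
    do {
        n = poll(fds, nfds, timeout_ms);
    } while (n == -1 && errno == EINTR);
    return n;   /* >0: ready count, 0: timed out, -1: other error in errno */
}
```

If an accurate overall deadline matters, the caller would have to recompute the remaining time before each retry.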

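All right, I admit it: the government has its ways. Power brings strength, after all. I'll stop asking questions and just use it obediently.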

3 Comments

  1. argeric says:

    public class SelectSockets {
    public static int PORT_NUMBER = 1234;

    public static void main(String[] argv) throws Exception {
    new SelectSockets().go(argv);
    }

    public void go(String[] argv) throws Exception {
    int port = PORT_NUMBER;

    if (argv.length > 0) { // override default listen port
    port = Integer.parseInt(argv[0]);
    }

    System.out.println("Listening on port " + port);

    // allocate an unbound server socket channel
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    // Get the associated ServerSocket to bind it with
    ServerSocket serverSocket = serverChannel.socket();
    // set the port the server channel will listen to
    serverSocket.bind(new InetSocketAddress(port));

    // create a new Selector for use below
    Selector selector = Selector.open();

    // set non-blocking mode for the listening socket
    serverChannel.configureBlocking(false);

    // register the ServerSocketChannel with the Selector
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
    // this may block for a long time, upon return the
    // selected set contains keys of the ready channels
    System.out.println("SelectSocket here 1");
    int n = selector.select();
    System.out.println("SelectSocket here 2");

    if (n == 0) {
    continue; // nothing to do
    }

    // get an iterator over the set of selected keys
    Iterator it = selector.selectedKeys().iterator();

    // look at each key in the selected set
    while (it.hasNext()) {

    SelectionKey key = (SelectionKey) it.next();

    // Is a new connection coming in?
    if (key.isAcceptable()) {
    ServerSocketChannel server = (ServerSocketChannel) key
    .channel();
    SocketChannel channel = server.accept();

    registerChannel(selector, channel, SelectionKey.OP_READ);

    sayHello(channel);
    }

    // is there data to read on this channel?
    if (key.isReadable()) {
    readDataFromSocket(key);
    }

    // remove key from selected set, it’s been handled, important
    it.remove();
    }
    }
    }

    // ———————————————————-

    /**
    * Register the given channel with the given selector for the given
    * operations of interest
    */
    protected void registerChannel(Selector selector,
    SelectableChannel channel, int ops) throws Exception {
    if (channel == null) {
    return; // could happen
    }

    // set the new channel non-blocking
    channel.configureBlocking(false);

    // register it with the selector
    channel.register(selector, ops);
    }

    // ———————————————————-

    // Use the same byte buffer for all channels. A single thread is
    // servicing all the channels, so no danger of concurrent access.
    private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

    /**
    * Sample data handler method for a channel with data ready to read.
    *
    * @param key
    * A SelectionKey object associated with a channel determined by
    * the selector to be ready for reading. If the channel returns
    * an EOF condition, it is closed here, which automatically
    * invalidates the associated key. The selector will then
    * de-register the channel on the next select call.
    */
    protected void readDataFromSocket(SelectionKey key) throws Exception {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    int count;
    buffer.clear(); // make buffer empty
    // loop while data available, channel is non-blocking
    while ((count = socketChannel.read(buffer)) > 0) {
    buffer.flip(); // make buffer readable

    // send the data, don’t assume it goes all at once
    while (buffer.hasRemaining()) {
    socketChannel.write(buffer);
    }
    // WARNING: the above loop is evil. Because
    // it’s writing back to the same non-blocking
    // channel it read the data from, this code can
    // potentially spin in a busy loop. In real life
    // you’d do something more useful than this.

    buffer.clear(); // make buffer empty
    }

    if (count < 0) {
    // close channel on EOF, invalidates the key
    socketChannel.close();
    }
    }

    // ———————————————————-

    /**
    * Spew a greeting to the incoming client connection.
    *
    * @param channel
    * The newly connected SocketChannel to say hello to.
    */
    private void sayHello(SocketChannel channel) throws Exception {
    buffer.clear();
    buffer.put("Hi there!\r\n".getBytes());
    buffer.flip();

    channel.write(buffer);
    }

    }
    //———————————————————————————————-
    public class SelectSocketsThreadPool extends SelectSockets {
    private static final int MAX_THREADS = 5;

    private ThreadPool pool = new ThreadPool(MAX_THREADS);

    // ————————————————————-

    public static void main(String[] argv) throws Exception {
    new SelectSocketsThreadPool().go(argv);
    }

    // ————————————————————-

    /**
    * Sample data handler method for a channel with data ready to read. This
    * method is invoked from the go() method in the parent class. This handler
    * delegates to a worker thread in a thread pool to service the channel,
    * then returns immediately.
    *
    * @param key
    * A SelectionKey object representing a channel determined by the
    * selector to be ready for reading. If the channel returns an
    * EOF condition, it is closed here, which automatically
    * invalidates the associated key. The selector will then
    * de-register the channel on the next select call.
    */
    protected void readDataFromSocket(SelectionKey key) throws Exception {
    WorkerThread worker = pool.getWorker();

    if (worker == null) {
    // No threads available, do nothing, the selection
    // loop will keep calling this method until a
    // thread becomes available. This design could
    // be improved.
    return;
    }

    // invoking this wakes up the worker thread then returns
    worker.serviceChannel(key);
    }

    // —————————————————————

    /**
    * A very simple thread pool class. The pool size is set at construction
    * time and remains fixed. Threads are cycled through a FIFO idle queue.
    */
    private class ThreadPool {
    List idle = new LinkedList();

    ThreadPool(int poolSize) {
    // fill up the pool with worker threads
    for (int i = 0; i 0) {
    worker = (WorkerThread) idle.remove(0);
    }
    }

    return worker;
    }

    /**
    * Called by the worker thread to return itself to the idle pool.
    */
    void returnWorker(WorkerThread worker) {
    synchronized (idle) {
    idle.add(worker);
    }
    }
    }

    /**
    * A worker thread class which can drain channels and echo-back the input.
    * Each instance is constructed with a reference to the owning thread pool
    * object. When started, the thread loops forever waiting to be awakened to
    * service the channel associated with a SelectionKey object. The worker is
    * tasked by calling its serviceChannel() method with a SelectionKey object.
    * The serviceChannel() method stores the key reference in the thread object
    * then calls notify() to wake it up. When the channel has been drained, the
    * worker thread returns itself to its parent pool.
    */
    private class WorkerThread extends Thread {
    private ByteBuffer buffer = ByteBuffer.allocate(1024);
    private ThreadPool pool;
    private SelectionKey key;

    WorkerThread(ThreadPool pool) {
    this.pool = pool;
    }

    // loop forever waiting for work to do
    public synchronized void run() {
    System.out.println(this.getName() + " is ready");

    while (true) {
    try {
    // sleep and release object lock
    this.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    // clear interrupt status
    Thread.interrupted();
    }

    if (key == null) {
    continue; // just in case
    }

    System.out.println(this.getName() + " has been awakened");

    try {
    System.out.println(this.getName() + "here1");
    drainChannel(key);
    } catch (Exception e) {
    System.out.println("Caught '" + e + "' closing channel");

    // close channel and nudge selector
    try {
    key.channel().close();
    } catch (IOException ex) {
    ex.printStackTrace();
    }

    key.selector().wakeup();
    }

    key = null;

    // done, ready for more, return to pool
    System.out.println(this.getName() + "here7");
    this.pool.returnWorker(this);
    System.out.println(this.getName() + "here8");
    }
    }

    /**
    * Called to initiate a unit of work by this worker thread on the
    * provided SelectionKey object. This method is synchronized, as is the
    * run() method, so only one key can be serviced at a given time. Before
    * waking the worker thread, and before returning to the main selection
    * loop, this key’s interest set is updated to remove OP_READ. This will
    * cause the selector to ignore read-readiness for this channel while
    * the worker thread is servicing it.
    */
    synchronized void serviceChannel(SelectionKey key) {
    this.key = key;

    key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));

    this.notify(); // awaken the thread
    }

    /**
    * The actual code which drains the channel associated with the given
    * key. This method assumes the key has been modified prior to
    * invocation to turn off selection interest in OP_READ. When this
    * method completes it re-enables OP_READ and calls wakeup() on the
    * selector so the selector will resume watching this channel.
    */
    void drainChannel(SelectionKey key) throws Exception {
    System.out.println(this.getName() + "here2");
    SocketChannel channel = (SocketChannel) key.channel();
    int count;

    buffer.clear(); // make buffer empty

    // loop while data available, channel is non-blocking
    while ((count = channel.read(buffer)) > 0) {
    buffer.flip(); // make buffer readable

    // send the data, may not go all at once
    while (buffer.hasRemaining()) {
    channel.write(buffer);
    }
    // WARNING: the above loop is evil.
    // See comments in superclass.

    buffer.clear(); // make buffer empty
    }

    if (count < 0) {
    // close channel on EOF, invalidates the key
    System.out.println(this.getName() + "here3");
    channel.close();
    System.out.println(this.getName() + "here4");
    return;
    }

    // resume interest in OP_READ
    System.out.println(this.getName() + "here5");
    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
    System.out.println(this.getName() + "here6");

    // cycle the selector so this key is active again
    key.selector().wakeup();
    }
    }
    }
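    goldendoc, hello. I have a question: of the two classes above, SelectSockets does not block in select(), but SelectSocketsThreadPool blocks in select() unless it calls key.interestOps(key.interestOps() | SelectionKey.OP_READ); and key.selector().wakeup();. Why is that? I read your source analysis but couldn't find the answer. I'd be grateful if you could look at the code I posted when you have time. Thanks.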

    • fp1203 says:

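      I looked at the code you posted; part of it is missing. If I understand your question correctly, you are asking why it blocks in select()?
      Take a look at the method declarations of the Selector class; you will find these three methods: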

      select(long timeout): keeps polling for up to the given timeout. Normally it returns in one of three ways:

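      the timeout expires: it returns, with no guarantee of any ready events
      before the timeout, without interruption: select has found ready events
      before the timeout, because of an external interruption such as wakeup(): no guarantee of any ready events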

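      selectNow(): just takes a quick glance at whether any events are ready and returns immediately either way. Note that it is not equivalent to select(0): for Selector.select(long), a timeout of 0 means block indefinitely.
      select(): the simplest of all: it won't return until something is ready, unless it is interrupted (for example by wakeup())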

      At the level of the underlying implementation, these map to epoll and the like; the neighboring articles cover that.

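      Finally, back to what you observed: why does it block in select()?
      First, the code creates a ServerSocketChannel, registers OP_ACCEPT, and then runs while(true) { selector.select(); }. Because no timeout is given, select() can only return
      when it is interrupted or when it finds an event of interest, and the only event of interest is OP_ACCEPT. The code never arranges for another thread to connect to the server
      through a SocketChannel, so that event is never satisfied on its own; select() returns only when one of these two conditions is met.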

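      The code you posted is a very common server-side example; projects such as Jetty and Netty vary this pattern considerably,
      for instance by calling selector.select(10), which in effect adds a simple polling loop to their own logic. Whether that is a good idea is a matter of opinion; it only pays off in particular situations.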

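      I hope this answers your question; if anything is still unclear, feel free to ask.
      PS: My fault for not tying the articles together well, which makes them hard to read; I plan to reorganize them later.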

  2. 冯小卫 says:

    After reading your article, I feel that compared with you I'm damn well not a programmer at all...
