精通
英语
和
开源
,
擅长
开发
与
培训
,
胸怀四海
第一信赖
本文分析了Apache中的队列Queue。
/////////////////////////////////////////////////////////////
//Apache源代码分析——配置命令执行过程
//张中庆于西安交通大学软件所
//tingya$stu,xjtu,edu,cn,将$换成@,,换成.防止地址被收集
//转载请保留出处
//最初出自西安交通大学兵马俑Linux版
//////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
队列通常用来保存
其中最重要的几个数据结构就是fd_queue_info_t,
fd_queue_info_t struct fd_queue_info_t { int idlers; apr_thread_mutex_t *idlers_mutex; apr_thread_cond_t *wait_for_idler; int terminated; int max_idlers; apr_pool_t **recycled_pools; int num_recycled; };
idlers标记当前队列中的空闲消息数目,外部模块根据该值检测是否有可用的消息。如果idlers<=0,则外部调用必须等待,否则从队列中取出消息。
当对队列进行访问的时候必须保证互斥性,这个可以通过队列结构中内嵌的idlers_mutex来保证。
Wait_for_idler用来进行同步处理。如果idlers<=0,队列调用wait_for_idle的Wait使外部模块等待。一旦idlers>=0,其将发送Signal信号唤醒等待的外部调用。
max_idlers表明队列中允许的最大的空闲消息数目。
recycled_pools和num_recycled
///////////////////////////////////////////////////////////////////////////////////////
apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info, apr_pool_t *pool, int max_idlers)
函数描述:
该函数用来创建消息队列数组queue_info,每个数组元素为fd_queue_info_t类型。创建的消息队列从内存池pool中申请,消息队列最大空闲数目为max_idlers。
流程描述:
函数首先从内存池pool中使用apr_palloc分配一块sizeof(fd_queue_info_t)大小的内存块,对其初始化,将其赋值给内部fd_queue_info_t类型的qi变量。
一旦分配成功,则调用apr_thread_mutex_create对qi中的mutex锁以及同步条件变量进行创建和初始化。继而函数从内存中分配max_idlers个子内存池。最后调用apr_pool_cleanup_register(pool, qi, queue_info_cleanup, apr_pool_cleanup_null)注册内存池的清理函数。
///////////////////////////////////////////////////////////////////////////////////////
apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info,apr_pool_t *pool_to_recycle)
函数描述:
该函数将内存池pool_to_recyle放入queue_info的备用内存池中。
流程描述:
函数调用之前,首先调用queue_info中的锁进行锁定保证互斥操作。在所有的操作结束后,函数还必须调用该锁进行解锁操作。
一旦函数进行了锁定,如果pool_to_recycle为有效的内存池,函数将将其放入queue_info的recycled_pools数组的末尾,同时空闲内存池数目变量num_recycled加一。
至此,如果当前的idlers数目>=0,表示存在空闲消息队列,此时可以调用signal方法通知等待的调用进程。
///////////////////////////////////////////////////////////////////////////////////////
apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info, apr_pool_t **recycled_pool)
函数描述:
该函数与ap_queue_info_set_idle是对应的孪生函数。ap_queue_info_set_idle往消息队列中增加空闲内存池,而ap_queue_info_wait_for_idle等待从消息队列中取空闲内存池,如果取不到则等待。取到的空闲内存池在recyled_pool中返回。
流程描述:
当对queue_info中的idler_mutex锁定之后,函数将不断的对两个变量进行判断:idlers和terminiated。如果terminated为1,表示Apache要求其退出等待,此时不再循环,直接退出。如果idlers>=0,系统直接取出空闲内存池。只有当terminated=0并且idlers的时候,函数将调用queue_info内在的wait_for_idle进行等待。即执行下面的代码:
while ((queue_info->idlers == 0) && (!queue_info->terminated)) {rv = apr_thread_cond_wait(queue_info->wait_for_idler, queue_info->idlers_mutex);
一旦等待进行被Signal被唤醒,则表明当前存在空闲的内存池使用,此时idlers将立即减一。同时如果空闲内存池的数目num_recycled不为0,则将第num_recyled-1个元素返回出去。
///////////////////////////////////////////////////////////////////////////////////////
此外函数中还定义了几个与消息队列相关的宏:
#define ap_queue_full(queue) ((queue)->nelts == (queue)->bounds)
该宏用来判断fd_queue_t是否已经满了。
#define ap_queue_empty(queue) ((queue)->nelts == 0)
判断给定的fd_queue_t是否为空
上面的两个宏是非线程安全的,因此调用该代码的时候必须放入临界区执行。
///////////////////////////////////////////////////////////////////////////////////////
除了fd_queue_info_t结构之外,关于队列的另外两个重要的数据结构就是fd_queue_t和fd_queue_elem_t。
fd_queue_elem_t描述的使消息队列中放的数据的内容,其定义如下。从中可以看出,每个消息队列包含两个内容,侦听socket描述sd和其该socket所处的内存池。
struct fd_queue_elem_t { apr_socket_t *sd; apr_pool_t *p; }; struct fd_queue_t { fd_queue_elem_t *data; int nelts; int bounds; apr_thread_mutex_t *one_big_mutex; apr_thread_cond_t *not_empty; int terminated; };
fd_queue_t 是是一个堆栈结构。data中保存实际的堆栈元素,堆栈元素由bounds决定。nelts为用来标记当前的“水位”,即最上面的可用的元素的索引。
one_big_mutex是结构中内嵌的锁,当多线程访问内部数据,比如data,nelts等的时候确保互斥。not_empty是条件变量,用于同步对队列的访问。当队列为空的时候调用not_empty阻塞,否则调用signal唤醒阻塞。
与queue对应的函数包括下面的几个:
apr_status_t ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a); apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p); apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p); apr_status_t ap_queue_interrupt_all(fd_queue_t *queue); apr_status_t ap_queue_term(fd_queue_t *queue); /////////////////////////////////////////////////////////////////////////////////////// apr_status_t ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
函数描述:
该函数实际上队列的入口函数,其用来创建一个队列,创建队列所需要的内存从内存池a中申请。
Queue返回创建后的队列指针,queue_capacity是队列的容量,即能够容纳的元素的最大个数。
流程描述:
函数初始化的第一件事情就是创建queue中内嵌的one_big_mutex和条件变量not_empty。一旦创建成功,函数将从内存池a中分配queue_capacity*sizeof(ap_elem_t)的空间,用来“装入”queue_capacity个元素。空间指针由queue->data进行保存。同时queue->bounds赋值为capacity,nelts置为0。同时将queue_capacity个元素中的套接字置为NULL,表示尚未使用。
最后在pool中登记queue销毁时候需要调用的处理函数。当pool中对queue进行销毁的时候将调用ap_queue_destroy函数进行处理。
///////////////////////////////////////////////////////////////////////////////////////
apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p)
函数描述:
该函数将sd“压入”队列queue中。Queue一个先进后出的堆栈一样。
流程描述:
函数在压入之前,将内部代码通过对one_big_mutex进行锁定设置成为临界区,既而函数必须确保当前的空间没有被全部占用,尚有空闲,其次要确保外部没有要求退出。一旦这两项都满足之后,那么函数将元素压入最上面的位置,即netls,同时nelts上移一格,指向新的空闲位置,代码如下:
fd_queue_elem_t *elem; elem = &queue->data[queue->nelts]; elem->sd = sd; elem->p = p; queue->nelts++;
一旦压入新的元素,函数将发出广播通知所有的等待的调用者可以获取新放入的元素。
///////////////////////////////////////////////////////////////////////////////////////
apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
函数描述:
该函数从队列queue中弹出一个元素,取得其中的相关元素。
流程描述:
函数在操作之前将内部代码通过对one_big_mutex进行锁定设置成为临界区。如果队列中没有可用消息,则等待,否则直接取出第nelts-1个元素,同时将“水位”减一。
elem = &queue->data[--queue->nelts]; *sd = elem->sd; *p = elem->p;