- UID
- 872238
|
您的位置linux内核工作队列讲解及其源码详细注释更新于2011-12-21 20:39:07文章出处: 互联网关键字: 工作队列 回调函数 linux内核
1. 前言
工作队列(workqueue)的Linux内核中的定义的用来处理不是很紧急事件的回调方式处理方法。
以下代码的linux内核版本为2.6.19.2, 源代码文件主要为kernel/workqueue.c.
2. 数据结构
/* include/linux/workqueue.h */ // 工作节点结构struct work_struct { // 等待时间unsigned long pending;// 链表节点struct list_head entry;// workqueue回调函数void (*func)(void *);// 回调函数func的数据void *data;// 指向CPU相关数据, 一般指向struct cpu_workqueue_struct结构void *wq_data;// 定时器struct timer_list timer;};
struct execute_work { struct work_struct work;};
/* kernel/workqueue.c */ /* * The per-CPU workqueue (if single thread, we always use the first * possible cpu)。
* * The sequence counters are for flush_scheduled_work()。 It wants to wait * until all currently-scheduled works are completed, but it doesn't * want to be livelocked by new, incoming ones. So it waits until * remove_sequence is >= the insert_sequence which pertained when * flush_scheduled_work() was called. */ // 这个结构是针对每个CPU的struct cpu_workqueue_struct { // 结构锁spinlock_t lock;// 下一个要执行的节点序号long remove_sequence; /* Least-recently added (next to run) */ // 下一个要插入节点的序号long insert_sequence; /* Next to add */ // 工作机构链表节点struct list_head worklist;// 要进行处理的等待队列wait_queue_head_t more_work;// 处理完的等待队列wait_queue_head_t work_done;// 工作队列节点struct workqueue_struct *wq;// 进程指针struct task_struct *thread;int run_depth; /* Detect run_workqueue() recursion depth */ } ____cacheline_aligned;/* * The externally visible workqueue abstraction is an array of * per-CPU workqueues:*/ // 工作队列结构struct workqueue_struct { struct cpu_workqueue_struct *cpu_wq;const char *name;struct list_head list; /* Empty if single thread */ };
kernel/workqueue.c中定义了一个工作队列链表, 所有工作队列可以挂接到这个链表中:static LIST_HEAD(workqueues);
3. 一些宏定义
/* include/linux/workqueue.h */ // 初始化工作队列#define __WORK_INITIALIZER(n, f, d) { // 初始化list。entry = { &(n)。entry, &(n)。entry },// 回调函数。func = (f),// 回调函数参数。data = (d),// 初始化定时器。timer = TIMER_INITIALIZER(NULL, 0, 0),}
// 声明工作队列并初始化#define DECLARE_WORK(n, f, d)
struct work_struct n = __WORK_INITIALIZER(n, f, d)
/* * initialize a work-struct's func and data pointers:*/ // 重新定义工作结构参数#define PREPARE_WORK(_work, _func, _data)
do {(_work)->func = _func;(_work)->data = _data;} while (0)
/* * initialize all of a work-struct:*/ // 初始化工作结构, 和__WORK_INITIALIZER功能相同,不过__WORK_INITIALIZER用在// 参数初始化定义, 而该宏用在程序之中对工作结构赋值#define INIT_WORK(_work, _func, _data)
do { INIT_LIST_HEAD(&(_work)->entry);(_work)->pending = 0;PREPARE_WORK((_work), (_func), (_data));init_timer(&(_work)->timer);} while (0)
4. 操作函数
4.1 创建工作队列
一般的创建函数是create_workqueue, 但这其实只是一个宏:/* include/linux/workqueue.h */ #define create_workqueue(name) __create_workqueue((name), 0)
在workqueue的初始化函数中, 定义了一个针对内核中所有线程可用的事件工作队列, 其他内核线程建立的事件工作结构就都挂接到该队列:void init_workqueues(void)
{……
keventd_wq = create_workqueue("events");……
}
核心创建函数是__create_workqueue:
struct workqueue_struct *__create_workqueue(const char *name,int singlethread)
{ int cpu, destroy = 0;struct workqueue_struct *wq;struct task_struct *p;// 分配工作队列结构空间wq = kzalloc(sizeof(*wq), GFP_KERNEL);if (!wq)
return NULL;// 为每个CPU分配单独的工作队列空间wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);if (!wq->cpu_wq) { kfree(wq);return NULL;} wq->name = name;mutex_lock(&workqueue_mutex);if (singlethread) { // 使用create_workqueue宏时该参数始终为0 // 如果是单一线程模式, 在单线程中调用各个工作队列// 建立一个的工作队列内核线程INIT_LIST_HEAD(&wq->list);// 建立工作队列的线程p = create_workqueue_thread(wq, singlethread_cpu);if (!p)
destroy = 1;else // 唤醒该线程wake_up_process(p);} else { // 链表模式, 将工作队列添加到工作队列链表list_add(&wq->list, &workqueues);// 为每个CPU建立一个工作队列线程for_each_online_cpu(cpu) { p = create_workqueue_thread(wq, cpu);if (p) { // 绑定CPU kthread_bind(p, cpu);// 唤醒线程wake_up_process(p);} else destroy = 1;} mutex_unlock(&workqueue_mutex);/* * Was there any error during startup? If yes then clean up:*/ if (destroy) { // 建立线程失败, 释放工作队列destroy_workqueue(wq);wq = NULL;} return wq;} EXPORT_SYMBOL_GPL(__create_workqueue);
// 创建工作队列线程static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,int cpu)
{ // 每个CPU的工作队列struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);struct task_struct *p;spin_lock_init(&cwq->lock);// 初始化cwq->wq = wq;cwq->thread = NULL;cwq->insert_sequence = 0;cwq->remove_sequence = 0;INIT_LIST_HEAD(&cwq->worklist);// 初始化等待队列more_work, 该队列处理要执行的工作结构init_waitqueue_head(&cwq->more_work);// 初始化等待队列work_done, 该队列处理执行完的工作结构init_waitqueue_head(&cwq->work_done);// 建立内核线程work_thread if (is_single_threaded(wq))
p = kthread_create(worker_thread, cwq, "%s", wq->name);else p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);if (IS_ERR(p))
return NULL;// 保存线程指针cwq->thread = p;return p;} static int worker_thread(void *__cwq)
{ struct cpu_workqueue_struct *cwq = __cwq;// 声明一个等待队列DECLARE_WAITQUEUE(wait, current);// 信号struct k_sigaction sa;sigset_t blocked;current->flags |= PF_NOFREEZE;// 降低进程优先级, 工作进程不是个很紧急的进程,不和其他进程抢占CPU,通常在系统空闲时运行set_user_nice(current, -5);/* Block and flush all signals */ // 阻塞所有信号sigfillset(&blocked);sigprocmask(SIG_BLOCK, &blocked, NULL);flush_signals(current);/* * We inherited MPOL_INTERLEAVE from the booting kernel. * Set MPOL_DEFAULT to insure node local allocations. */ numa_default_policy();/* SIG_IGN makes children autoreap: see do_notify_parent()。 */ // 信号处理都是忽略sa.sa.sa_handler = SIG_IGN;sa.sa.sa_flags = 0;siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);// 进程可中断set_current_state(TASK_INTERRUPTIBLE);// 进入循环, 没明确停止该进程就一直运行while (!kthread_should_stop()) { // 设置more_work等待队列, 当有新work结构链入队列中时会激发此等待队列add_wait_queue(&cwq->more_work, &wait);if (list_empty(&cwq->worklist))
// 工作队列为空, 睡眠schedule();else // 进行运行状态__set_current_state(TASK_RUNNING);// 删除等待队列remove_wait_queue(&cwq->more_work, &wait);// 按链表遍历执行工作任务if (!list_empty(&cwq->worklist))
run_workqueue(cwq);// 执行完工作, 设置进程是可中断的, 重新循环等待工作set_current_state(TASK_INTERRUPTIBLE);} __set_current_state(TASK_RUNNING);return 0;}
// 运行工作结构static void run_workqueue(struct cpu_workqueue_struct *cwq)
{ unsigned long flags;/* * Keep taking off work from the queue until * done. */ // 加锁spin_lock_irqsave(&cwq->lock, flags);// 统计已经递归调用了多少次了cwq->run_depth++;if (cwq->run_depth > 3) { // 递归调用此时太多/* morton gets to eat his hat */ printk("%s: recursion depth exceeded: %dn",__FUNCTION__, cwq->run_depth);dump_stack();} // 遍历工作链表while (!list_empty(&cwq->worklist)) { // 获取的是next节点的struct work_struct *work = list_entry(cwq->worklist.next,struct work_struct, entry);void (*f) (void *) = work->func;void *data = work->data;// 删除节点, 同时节点中的list参数清空list_del_init(cwq->worklist.next);// 解锁// 现在在执行以下代码时可以中断,run_workqueue本身可能会重新被调用, 所以要判断递归深度spin_unlock_irqrestore(&cwq->lock, flags);BUG_ON(work->wq_data != cwq);// 工作结构已经不在链表中clear_bit(0, &work->pending);// 执行工作函数f(data);// 重新加锁spin_lock_irqsave(&cwq->lock, flags);// 执行完的工作序列号递增cwq->remove_sequence++;// 唤醒工作完成等待队列, 供释放工作队列wake_up(&cwq->work_done);} // 减少递归深度cwq->run_depth——;// 解锁spin_unlock_irqrestore(&cwq->lock, flags);} |
|