首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

inux内核工作队列讲解和源码详细注释 01

inux内核工作队列讲解和源码详细注释 01

4.2 释放工作队列/** * destroy_workqueue - safely terminate a workqueue * @wq: target workqueue * * Safely destroy a workqueue. All work currently pending will be done first. */ void destroy_workqueue(struct workqueue_struct *wq)
  { int cpu;// 清除当前工作队列中的所有工作flush_workqueue(wq);/* We don't need the distraction of CPUs appearing and vanishing. */ mutex_lock(&workqueue_mutex);// 结束该工作队列的线程if (is_single_threaded(wq))
  cleanup_workqueue_thread(wq, singlethread_cpu);else { for_each_online_cpu(cpu)
  cleanup_workqueue_thread(wq, cpu);list_del(&wq->list);} mutex_unlock(&workqueue_mutex);// 释放工作队列中对应每个CPU的工作队列数据free_percpu(wq->cpu_wq);kfree(wq);} EXPORT_SYMBOL_GPL(destroy_workqueue);
  /** * flush_workqueue - ensure that any scheduled work has run to completion. * @wq: workqueue to flush * * Forces execution of the workqueue and blocks until its completion. * This is typically used in driver shutdown handlers. * * This function will sample each workqueue's current insert_sequence number and * will sleep until the head sequence is greater than or equal to that. This * means that we sleep until all works which were queued on entry have been * handled, but we are not livelocked by new incoming ones. * * This function used to run the workqueues itself. Now we just wait for the * helper threads to do it. */ void fastcall flush_workqueue(struct workqueue_struct *wq)
  { // 该进程可以睡眠might_sleep();// 清空每个CPU上的工作队列if (is_single_threaded(wq)) { /* Always use first cpu's area. */ flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));} else { int cpu;mutex_lock(&workqueue_mutex);for_each_online_cpu(cpu)
  flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));mutex_unlock(&workqueue_mutex);} EXPORT_SYMBOL_GPL(flush_workqueue);
  flush_workqueue的核心处理函数为flush_cpu_workqueue:static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
  { if (cwq->thread == current) { // 如果是工作队列进程正在被调度/* * Probably keventd trying to flush its own queue. So simply run * it by hand rather than deadlocking. */ // 执行完该工作队列run_workqueue(cwq);} else { // 定义等待DEFINE_WAIT(wait);long sequence_needed;// 加锁spin_lock_irq(&cwq->lock);// 最新工作结构序号sequence_needed = cwq->insert_sequence;// 该条件是判断队列中是否还有没有执行的工作结构while (sequence_needed - cwq->remove_sequence > 0) { // 有为执行的工作结构// 通过work_done等待队列等待prepare_to_wait(&cwq->work_done, &wait,TASK_UNINTERRUPTIBLE);// 解锁spin_unlock_irq(&cwq->lock);// 睡眠, 由wake_up(&cwq->work_done)来唤醒schedule();// 重新加锁spin_lock_irq(&cwq->lock);} // 等待清除finish_wait(&cwq->work_done, &wait);spin_unlock_irq(&cwq->lock);}
  4.3 调度工作
  在大多数情况下, 并不需要自己建立工作队列,而是只定义工作, 将工作结构挂接到内核预定义的事件工作队列中调度, 在kernel/workqueue.c中定义了一个静态全局量的工作队列keventd_wq:static struct workqueue_struct *keventd_wq;
  4.3.1 立即调度// 在其他函数中使用以下函数来调度工作结构, 是把工作结构挂接到工作队列中进行调度/** * schedule_work - put work task in global workqueue * @work: job to be done * * This puts a job in the kernel-global workqueue. */ // 调度工作结构, 将工作结构添加到事件工作队列keventd_wq int fastcall schedule_work(struct work_struct *work)
  { return queue_work(keventd_wq, work);} EXPORT_SYMBOL(schedule_work);
  /** * queue_work - queue work on a workqueue * @wq: workqueue to use * @work: work to queue * * Returns 0 if @work was already on a queue, non-zero otherwise. * * We queue the work to the CPU it was submitted, but there is no * guarantee that it will be processed by that CPU. */ int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
  { int ret = 0, cpu = get_cpu();if (!test_and_set_bit(0, &work->pending)) { // 工作结构还没在队列, 设置pending标志表示把工作结构挂接到队列中if (unlikely(is_single_threaded(wq)))
  cpu = singlethread_cpu;BUG_ON(!list_empty(&work->entry));//  进行具体的排队__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);ret = 1;} put_cpu();return ret;} EXPORT_SYMBOL_GPL(queue_work);/* Preempt must be disabled. */ // 不能被抢占static void __queue_work(struct cpu_workqueue_struct *cwq,struct work_struct *work)
  { unsigned long flags;// 加锁spin_lock_irqsave(&cwq->lock, flags);// 指向CPU工作队列work->wq_data = cwq;// 挂接到工作链表list_add_tail(&work->entry, &cwq->worklist);// 递增插入的序列号cwq->insert_sequence++;// 唤醒等待队列准备处理工作结构wake_up(&cwq->more_work);spin_unlock_irqrestore(&cwq->lock, flags);}
  4.3.2 延迟调度
  4.3.2.1 schedule_delayed_work /** * schedule_delayed_work - put work task in global workqueue after delay * @work: job to be done * @delay: number of jiffies to wait * * After waiting for a given time this puts a job in the kernel-global * workqueue. */ // 延迟调度工作, 延迟一定时间后再将工作结构挂接到工作队列int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
  { return queue_delayed_work(keventd_wq, work, delay);} EXPORT_SYMBOL(schedule_delayed_work);
  /** * queue_delayed_work - queue work on a workqueue after delay * @wq: workqueue to use * @work: work to queue * @delay: number of jiffies to wait before queueing * * Returns 0 if @work was already on a queue, non-zero otherwise. */ int fastcall queue_delayed_work(struct workqueue_struct *wq,struct work_struct *work, unsigned long delay)
  { int ret = 0;// 定时器, 此时的定时器应该是不起效的, 延迟将通过该定时器来实现struct timer_list *timer = &work->timer;if (!test_and_set_bit(0, &work->pending)) { // 工作结构还没在队列, 设置pending标志表示把工作结构挂接到队列中// 如果现在定时器已经起效, 出错BUG_ON(timer_pending(timer));// 工作结构已经挂接到链表, 出错BUG_ON(!list_empty(&work->entry));/* This stores wq for the moment, for the timer_fn */ // 保存工作队列的指针work->wq_data = wq;// 定时器初始化timer->expires = jiffies + delay;timer->data = (unsigned long)work;// 定时函数timer->function = delayed_work_timer_fn;// 定时器生效, 定时到期后再添加到工作队列add_timer(timer);ret = 1;} return ret;} EXPORT_SYMBOL_GPL(queue_delayed_work);
  // 定时中断函数static void delayed_work_timer_fn(unsigned long __data)
  { struct work_struct *work = (struct work_struct *)__data;struct workqueue_struct *wq = work->wq_data;// 获取CPU int cpu = smp_processor_id();if (unlikely(is_single_threaded(wq)))
  cpu = singlethread_cpu;// 将工作结构添加到工作队列,注意这是在时间中断调用__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);}
  4.3.2.2 schedule_delayed_work_on
  指定CPU的延迟调度工作结构, 和schedule_delayed_work相比增加了一个CPU参数, 其他都相同/** * schedule_delayed_work_on - queue work in global workqueue on CPU after delay * @cpu: cpu to use * @work: job to be done * @delay: number of jiffies to wait * * After waiting for a given time this puts a job in the kernel-global * workqueue on the specified CPU. */ int schedule_delayed_work_on(int cpu,struct work_struct *work, unsigned long delay)
  { return queue_delayed_work_on(cpu, keventd_wq, work, delay);}
  /** * queue_delayed_work_on - queue work on specific CPU after delay * @cpu: CPU number to execute work on * @wq: workqueue to use * @work: work to queue * @delay: number of jiffies to wait before queueing * * Returns 0 if @work was already on a queue, non-zero otherwise. */ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,struct work_struct *work, unsigned long delay)
  { int ret = 0;struct timer_list *timer = &work->timer;if (!test_and_set_bit(0, &work->pending)) { BUG_ON(timer_pending(timer));BUG_ON(!list_empty(&work->entry));/* This stores wq for the moment, for the timer_fn */ work->wq_data = wq;timer->expires = jiffies + delay;timer->data = (unsigned  long)work;timer->function = delayed_work_timer_fn;add_timer_on(timer, cpu);ret = 1;} return ret;} EXPORT_SYMBOL_GPL(queue_delayed_work_on);
  5. 结论
  工作队列和定时器函数处理有点类似, 都是执行一定的回调函数, 但和定时器处理函数不同的是定时器回调函数只执行一次, 而且执行定时器回调函数的时候是在时钟中断中, 限制比较多, 因此回调程序不能太复杂; 而工作队列是通过内核线程实现, 一直有效, 可重复执行, 由于执行时降低了线程的优先级, 执行时可能休眠, 因此工作队列处理的应该是那些不是很紧急的任务, 如垃圾回收处理等, 通常在系统空闲时执行,在xfrm库中就广泛使用了workqueue,使用时,只需要定义work结构,然后调用schedule_(delayed_)work即可。
返回列表