小白学linux——工作队列(workqueue)

  • Post author:
  • Post category:linux




工作队列

工作队列在linux2.6版本中被引入,类似于linux2.4中的“task queue”(任务队列)。工作队列机制允许内核函数延迟执行,也就是你将一个即将要执行的函数放进工作队列中,工作队列机制中的工作线程会在一定时间之后执行该函数。

虽然有点像可延迟函数,但是他们还是有一些区别。最主要的区别在于可延迟函数在中断上下文中运行,而工作队列中的函数在进程上下文中执行。在进程上下文中运行是可以执行阻塞函数(例如,需要访问磁盘上数据块的函数)。可延迟函数和工作队列中的函数都不能访问进程的用户模式地址空间。

#工作队列的数据结构

基于linux2.6.0源码

与工作队列相关的主要数据结构是workqueue_struct描述符,里面包含元素个数为NR_CPUS的cpu_workqueue_struct数组

struct workqueue_struct {
	struct cpu_workqueue_struct cpu_wq[NR_CPUS];
};
struct cpu_workqueue_struct {

	spinlock_t lock;                /* 用于保护结构的自旋锁 */

	long remove_sequence;	        /* Least-recently added (next to run) */
	long insert_sequence;	        /* Next to add */

	struct list_head worklist;      /* 待处理函数列表的头部 */
	wait_queue_head_t more_work;    /* 等待队列,等待更多工作完成的工作线程休眠 */
	wait_queue_head_t work_done;    /* 等待队列,等待工作队列刷新的进程进入睡眠状态 */

	struct workqueue_struct *wq;    /* 指向包含此描述符的 workqueue_struct 结构的指针 */
	task_t *thread;                 /* 结构体的工作线程的进程描述符指针 */
	struct completion exit;         /* run_workqueue() 的当前执行深度
	                                  (当工作队列列表中的函数阻塞时,该字段可能会大于1 */

} ____cacheline_aligned;

每个待执行的函数都会挂在双向链表worklist中,每个待执行的函数都由结构work_struct描述符来描述

struct work_struct {
	unsigned long pending;          /* 如果函数已经在工作队列列表中,则设置为 1,否则设置为 0 */
	struct list_head entry;         /* 指向待处理函数列表中下一个和上一个元素的指针 */
	void (*func)(void *);           /* 要执行函数的地址 */
	void *data;                     /* 作为参数传递给挂起函数的指针 */
	void *wq_data;                  /* 通常指向父 cpu_workqueue_struct 描述符 */
	struct timer_list timer;        /* 用于延迟挂起函数执行的软件定时器 */
};



工作队列的函数

**create_workqueue(“foo”) **函数接收一个字符串作为其参数,并返回新创建的工作队列的 workqueue_struct 描述符的地址。该函数还创建了 n 个工作线程(其中 n 是系统中有效存在的 CPU 数量),以传递给该函数的字符串命名:foo/0、foo/1 等。**create_singlethread_workqueue()

函数功能类似,但它只创建一个工作线程,不管系统中的 CPU 数量是多少。为了销毁一个工作队列,内核调用

destroy_workqueue()**函数,它接收一个指向workqueue_struct数组的指针作为参数。

**queue_work() **向工作队列中插入一个函数(已经打包在 work_struct 描述符中);它接收一个指向 workqueue_struct 描述符的指针 wq 和一个指向 work_struct 描述符的指针 work。 queue_work() 主要执行以下步骤:

  1. 检查要插入的函数是否已经存在于工作队列中(work->pending 字段等于 1);如果是,则终止.
  2. 将 work_struct 描述符添加到工作队列列表,并将 work->pending 设置为 1。
  3. 如果工作线程在本地 CPU 的 cpu_workqueue_struct 描述符的 more_work 等待队列中休眠,则该函数将其唤醒。
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	unsigned long flags;
	int ret = 0, cpu = get_cpu();
	struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;

	if (!test_and_set_bit(0, &work->pending)) {
		BUG_ON(!list_empty(&work->entry));
		work->wq_data = cwq;

		spin_lock_irqsave(&cwq->lock, flags);
		list_add_tail(&work->entry, &cwq->worklist);
		cwq->insert_sequence++;
		wake_up(&cwq->more_work);
		spin_unlock_irqrestore(&cwq->lock, flags);
		ret = 1;
	}
	put_cpu();
	return ret;
}

**queue_delayed_work() **函数几乎与 queue_work() 相同,只是它接收第三个参数,表示系统滴答的时间延迟,用于确保挂起函数执行前的最小延迟。实际上, queue_delayed_work() 依赖于 work_struct 描述符的 timer 字段中的软件定时器来推迟 work_struct 描述符在工作队列列表中的实际插入。

int queue_delayed_work(struct workqueue_struct *wq,
			struct work_struct *work, unsigned long delay)
{
	int ret = 0, cpu = get_cpu();
	struct timer_list *timer = &work->timer;
	struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;

	if (!test_and_set_bit(0, &work->pending)) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		work->wq_data = cwq;
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)work;
		timer->function = delayed_work_timer_fn;
		add_timer(timer);
		ret = 1;
	}
	put_cpu();
	return ret;
}

static void delayed_work_timer_fn(unsigned long __data)
{
	struct work_struct *work = (struct work_struct *)__data;
	struct cpu_workqueue_struct *cwq = work->wq_data;
	unsigned long flags;

	/*
	 * Do the wakeup within the spinlock, so that flushing
	 * can be done in a guaranteed way.
	 */
	spin_lock_irqsave(&cwq->lock, flags);
	list_add_tail(&work->entry, &cwq->worklist);
	cwq->insert_sequence++;
	wake_up(&cwq->more_work);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

**cancel_delayed_work() **取消先前调度的工作队列函数,前提是相应的 work_struct 描述符尚未插入工作队列列表中.

static inline int cancel_delayed_work(struct work_struct *work)
{
	return del_timer_sync(&work->timer);
}

每个工作线程在 worker_thread() 函数内不断地执行一个循环,大部分时间线程都在休眠并等待一些工作排​​队.一旦被唤醒,工作线程调用run_workqueue()函数,该函数实质上从工作线程的工作队列列表中删除每个work_struct描述符并执行相应的挂起函数。因为工作队列函数可以阻塞,工作线程可以进入睡眠状态,甚至在恢复时迁移到另一个 CPU。

static int worker_thread(void *__startup)
{
	startup_t *startup = __startup;
	struct cpu_workqueue_struct *cwq = startup->cwq;
	int cpu = cwq - cwq->wq->cpu_wq;
	DECLARE_WAITQUEUE(wait, current);
	struct k_sigaction sa;

	daemonize("%s/%d", startup->name, cpu);
	allow_signal(SIGCHLD);
	current->flags |= PF_IOTHREAD;
	cwq->thread = current;

	set_user_nice(current, -10);
	set_cpus_allowed(current, cpumask_of_cpu(cpu));

	complete(&startup->done);

	/* Install a handler so SIGCLD is delivered */
	sa.sa.sa_handler = SIG_IGN;
	sa.sa.sa_flags = 0;
	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

	for (;;) {
		set_task_state(current, TASK_INTERRUPTIBLE);

		add_wait_queue(&cwq->more_work, &wait);
		if (!cwq->thread)
			break;
		if (list_empty(&cwq->worklist))
			schedule();
		else
			set_task_state(current, TASK_RUNNING);
		remove_wait_queue(&cwq->more_work, &wait);

		if (!list_empty(&cwq->worklist))
			run_workqueue(cwq);

		if (signal_pending(current)) {
			while (waitpid(-1, NULL, __WALL|WNOHANG) > 0)
				/* SIGCHLD - auto-reaping */ ;

			/* zap all other signals */
			flush_signals(current);
		}
	}
	remove_wait_queue(&cwq->more_work, &wait);
	complete(&cwq->exit);

	return 0;
}

有时内核必须等到工作队列中的所有挂起函数都执行完毕。**flush_workqueue() **函数接收一个workqueue_struct 描述符地址并阻塞调用进程,直到工作队列中挂起的所有函数都终止。然而,该函数不等待在调用flush_workqueue()之后添加到工作队列中的任何挂起函数;

void flush_workqueue(struct workqueue_struct *wq)
{
	struct cpu_workqueue_struct *cwq;
	int cpu;

	might_sleep();

	for (cpu = 0; cpu < NR_CPUS; cpu++) {
		DEFINE_WAIT(wait);
		long sequence_needed;

		if (!cpu_online(cpu))
			continue;
		cwq = wq->cpu_wq + cpu;

		spin_lock_irq(&cwq->lock);
		sequence_needed = cwq->insert_sequence;

		while (sequence_needed - cwq->remove_sequence > 0) {
			prepare_to_wait(&cwq->work_done, &wait,
					TASK_UNINTERRUPTIBLE);
			spin_unlock_irq(&cwq->lock);
			schedule();
			spin_lock_irq(&cwq->lock);
		}
		finish_wait(&cwq->work_done, &wait);
		spin_unlock_irq(&cwq->lock);
	}
}

共享工作队列

在大多数情况下,创建一整套工作线程来运行一个函数是多余的。因此,内核提供了一个称为事件的预定义工作队列,每个内核开发人员都可以自由使用。预定义的工作队列无非是一个标准的工作队列,可能包含不同内核层和I/O驱动的功能;它的 workqueue_struct 描述符存储在 keventd_wq 数组中。

schedule_work(w)                      queue_work(keventd_wq,w)

schedule_delayed_work(w,d)            queue_delayed_work(keventd_wq,w,d)
                                      (on any CPU)

schedule_delayed_work_on(cpu,w,d)     queue_delayed_work(keventd_wq,w,d)
                                      (on a given CPU)

flush_scheduled_work( )               flush_workqueue(keventd_wq)

当函数很少被调用时,预定义的工作队列可以节省大量的系统资源。另一方面,在预定义的工作队列中执行的函数不应长时间阻塞:因为工作队列列表中待处理函数的执行在每个 CPU 上都是串行化的,长时间的延迟会对预定义工作队列的其他用户产生负面影响。



版权声明:本文为qq_36413391原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。