2009-11-16 17:17:42

by Tejun Heo

Subject: [PATCHSET] workqueue: prepare for concurrency managed workqueue

Hello, all.

This patchset is an updated version of patches 01-18 (everything
except for the actual concurrency managed workqueue implementation) of
the implement-concurrency-managed-workqueue RFC patchset[1]. The
changes are:

* Scheduler class override trick dropped in favor of a generic
scheduler notifier mechanism which replaces preempt notifiers.

* SINGLE_THREAD workqueues reimplemented using a per-workqueue mutex.
The long term goal is to remove most of the single threaded
workqueues.

* Colored workqueue flushing reimplemented to support multi-color
flushing. It currently uses 16 colors. As flushers are processed in
batches once the colors are used up, the effect of the limited number
of colors shouldn't be too noticeable even when there are many
concurrent flushers.

* insert_wq_barrier() fixed to take @cwq from the caller. @target may
change underneath, so the caller needs to specify @cwq explicitly.

* Workers on a cpu now get assigned unique numbers using ida.

This patchset contains the following patches.

0001-workqueue-fix-race-condition-in-schedule_on_each_cpu.patch
0002-sched-kvm-fix-race-condition-involving-sched_in_pree.patch
0003-workqueue-Add-debugobjects-support.patch
0004-sched-implement-scheduler-notifiers.patch
0005-kvm-convert-kvm-to-use-new-scheduler-notifiers.patch
0006-sched-drop-preempt-notifiers.patch
0007-sched-implement-sched_notifier_wake_up_process.patch
0008-scheduler-implement-force_cpus_allowed_ptr.patch
0009-acpi-use-queue_work_on-instead-of-binding-workqueue-.patch
0010-stop_machine-reimplement-without-using-workqueue.patch
0011-workqueue-misc-cosmetic-updates.patch
0012-workqueue-merge-feature-parametesr-into-flags.patch
0013-workqueue-update-cwq-alignement-and-make-one-more-fl.patch
0014-workqueue-define-both-bit-position-and-mask-for-work.patch
0015-workqueue-separate-out-process_one_work.patch
0016-workqueue-temporarily-disable-workqueue-tracing.patch
0017-workqueue-simple-reimplementation-of-SINGLE_THREAD-w.patch
0018-workqueue-reimplement-workqueue-flushing-using-color.patch
0019-workqueue-introduce-worker.patch
0020-workqueue-reimplement-work-flushing-using-linked-wor.patch
0021-workqueue-reimplement-workqueue-freeze-using-cwq-fro.patch

0001-0002 are bug fixes to existing code. They have been acked and
are currently in subsystem trees queued for 2.6.32.

0003 is currently in the wq#for-next tree.

0004-0006 implement scheduler notifiers and replace preempt notifiers
with them. 0007 implements a wake up function which can be used from
the notifiers.

0017 replaces 0014 of [1] and reimplements SINGLE_THREAD workqueues so
that they behave the same as multithreaded ones except that a mutex
is used for exclusion when actually executing the work.

0018 implements the new multi-color workqueue flushing.

Other than the above, the patches remain mostly the same as in the
previous posting except for minor bug fixes and updates.

If there's no objection (especially to the scheduler notifier part), I
want to push these patches into linux-next and wait a while before
pushing the concurrency managed workqueue itself. This prep patchset
already contains a lot of changes, so I think it would be helpful to
verify that things are okay at this point.

This patchset is available in the following git tree.

git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git review-wq-prep

Please note that the above tree is temporary. It needs to be rebased
once the fix patches appear upstream.

This patchset contains the following changes.

arch/ia64/kvm/Kconfig        |    1
arch/powerpc/kvm/Kconfig     |    1
arch/s390/kvm/Kconfig        |    1
arch/x86/kernel/smpboot.c    |    4
arch/x86/kvm/Kconfig         |    1
drivers/acpi/osl.c           |   41 -
include/linux/kvm_host.h     |    5
include/linux/preempt.h      |   43 -
include/linux/sched.h        |   45 +
include/linux/stop_machine.h |    6
include/linux/workqueue.h    |  110 ++-
init/Kconfig                 |    4
init/main.c                  |    2
kernel/power/process.c       |   22
kernel/sched.c               |  245 ++++---
kernel/stop_machine.c        |  151 +++-
kernel/trace/Kconfig         |    4
kernel/workqueue.c           | 1387 +++++++++++++++++++++++++++++++++----------
lib/Kconfig.debug            |    8
virt/kvm/kvm_main.c          |   51 -
20 files changed, 1544 insertions(+), 588 deletions(-)

Thanks.

--
tejun

[1] http://thread.gmane.org/gmane.linux.kernel/896268


2009-11-16 17:17:02

by Tejun Heo

Subject: [PATCH 01/21] workqueue: fix race condition in schedule_on_each_cpu()

Commit 65a64464349883891e21e74af16c05d6e1eeb4e9, which allows
schedule_on_each_cpu() to be called from keventd, added a race
condition: schedule_on_each_cpu() may race with cpu hotplug and end
up executing the function twice on a cpu.

Fix it by moving the direct execution into the section protected with
get/put_online_cpus(). While at it, update the code so that direct
execution happens after works have been scheduled for all the other
cpus, and drop the unnecessary cpu != orig test from the flush loop.
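
For reference, this is roughly how schedule_on_each_cpu() ends up
looking with the patch applied (a sketch reconstructed from the hunks
below; the declarations at the top are assumed from the unchanged
context around the hunk):

int schedule_on_each_cpu(work_func_t func)
{
        int cpu;
        int orig = -1;
        struct work_struct *works;

        works = alloc_percpu(struct work_struct);
        if (!works)
                return -ENOMEM;

        get_online_cpus();

        /* when running in keventd, run the local work item directly */
        if (current_is_keventd())
                orig = raw_smp_processor_id();

        for_each_online_cpu(cpu) {
                struct work_struct *work = per_cpu_ptr(works, cpu);

                INIT_WORK(work, func);
                if (cpu != orig)
                        schedule_work_on(cpu, work);
        }
        if (orig >= 0)
                func(per_cpu_ptr(works, orig));

        for_each_online_cpu(cpu)
                flush_work(per_cpu_ptr(works, cpu));

        put_online_cpus();
        free_percpu(works);
        return 0;
}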

Signed-off-by: Tejun Heo <[email protected]>
Cc: Andi Kleen <[email protected]>
Cc: Oleg Nesterov <[email protected]>
---
kernel/workqueue.c | 28 +++++++++++++---------------
1 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 1232814..67e526b 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -692,31 +692,29 @@ int schedule_on_each_cpu(work_func_t func)
if (!works)
return -ENOMEM;

+ get_online_cpus();
+
/*
- * when running in keventd don't schedule a work item on itself.
- * Can just call directly because the work queue is already bound.
- * This also is faster.
- * Make this a generic parameter for other workqueues?
+ * When running in keventd don't schedule a work item on
+ * itself. Can just call directly because the work queue is
+ * already bound. This also is faster.
*/
- if (current_is_keventd()) {
+ if (current_is_keventd())
orig = raw_smp_processor_id();
- INIT_WORK(per_cpu_ptr(works, orig), func);
- func(per_cpu_ptr(works, orig));
- }

- get_online_cpus();
for_each_online_cpu(cpu) {
struct work_struct *work = per_cpu_ptr(works, cpu);

- if (cpu == orig)
- continue;
INIT_WORK(work, func);
- schedule_work_on(cpu, work);
- }
- for_each_online_cpu(cpu) {
if (cpu != orig)
- flush_work(per_cpu_ptr(works, cpu));
+ schedule_work_on(cpu, work);
}
+ if (orig >= 0)
+ func(per_cpu_ptr(works, orig));
+
+ for_each_online_cpu(cpu)
+ flush_work(per_cpu_ptr(works, cpu));
+
put_online_cpus();
free_percpu(works);
return 0;
--
1.6.4.2

2009-11-16 17:17:36

by Tejun Heo

Subject: [PATCH 02/21] sched, kvm: fix race condition involving sched_in_preempt_notifiers

In finish_task_switch(), fire_sched_in_preempt_notifiers() is called
after finish_lock_switch(). However, depending on the architecture,
preemption can be enabled after finish_lock_switch(), which breaks the
semantics of preempt notifiers. Move the call before
finish_lock_switch(). This also makes the in notifiers symmetric to
the out notifiers in terms of locking - now both are called under the
rq lock.

Signed-off-by: Tejun Heo <[email protected]>
Cc: Avi Kivity <[email protected]>
---
kernel/sched.c | 2 +-
1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/kernel/sched.c b/kernel/sched.c
index 3c11ae0..de8a765 100644
--- a/kernel/sched.c
+++ b/kernel/sched.c
@@ -2751,9 +2751,9 @@ static void finish_task_switch(struct rq *rq, struct task_struct *prev)
prev_state = prev->state;
finish_arch_switch(prev);
perf_event_task_sched_in(current, cpu_of(rq));
+ fire_sched_in_preempt_notifiers(current);
finish_lock_switch(rq, prev);

- fire_sched_in_preempt_notifiers(current);
if (mm)
mmdrop(mm);
if (unlikely(prev_state == TASK_DEAD)) {
--
1.6.4.2

2009-11-16 17:17:56

by Tejun Heo

Subject: [PATCH 03/21] workqueue: Add debugobjects support

From: Thomas Gleixner <[email protected]>

Add debugobjects support to track the lifetime of work_structs.

While at it, remove the duplicate definition of
INIT_DELAYED_WORK_ON_STACK().
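
As a usage sketch (not part of the patch; the function names here are
made up for illustration), an on-stack work item is now initialized
with INIT_WORK_ON_STACK() and torn down with destroy_work_on_stack()
once it is known to be idle:

static void example_fn(struct work_struct *work)
{
        /* do something */
}

static void example_on_stack_work(void)
{
        struct work_struct work;

        INIT_WORK_ON_STACK(&work, example_fn);
        schedule_work(&work);
        flush_work(&work);              /* wait until the work is done */
        destroy_work_on_stack(&work);   /* let debugobjects forget it */
}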

Signed-off-by: Thomas Gleixner <[email protected]>
Signed-off-by: Tejun Heo <[email protected]>
---
arch/x86/kernel/smpboot.c | 4 +-
include/linux/workqueue.h | 38 +++++++++----
kernel/workqueue.c | 131 +++++++++++++++++++++++++++++++++++++++++++-
lib/Kconfig.debug | 8 +++
4 files changed, 166 insertions(+), 15 deletions(-)

diff --git a/arch/x86/kernel/smpboot.c b/arch/x86/kernel/smpboot.c
index 565ebc6..ba43dfe 100644
--- a/arch/x86/kernel/smpboot.c
+++ b/arch/x86/kernel/smpboot.c
@@ -687,7 +687,7 @@ static int __cpuinit do_boot_cpu(int apicid, int cpu)
.done = COMPLETION_INITIALIZER_ONSTACK(c_idle.done),
};

- INIT_WORK(&c_idle.work, do_fork_idle);
+ INIT_WORK_ON_STACK(&c_idle.work, do_fork_idle);

alternatives_smp_switch(1);

@@ -713,6 +713,7 @@ static int __cpuinit do_boot_cpu(int apicid, int cpu)

if (IS_ERR(c_idle.idle)) {
printk("failed fork for CPU %d\n", cpu);
+ destroy_work_on_stack(&c_idle.work);
return PTR_ERR(c_idle.idle);
}

@@ -831,6 +832,7 @@ do_rest:
smpboot_restore_warm_reset_vector();
}

+ destroy_work_on_stack(&c_idle.work);
return boot_error;
}

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index cf24c20..9466e86 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -25,6 +25,7 @@ typedef void (*work_func_t)(struct work_struct *work);
struct work_struct {
atomic_long_t data;
#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */
+#define WORK_STRUCT_STATIC 1 /* static initializer (debugobjects) */
#define WORK_STRUCT_FLAG_MASK (3UL)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
struct list_head entry;
@@ -35,6 +36,7 @@ struct work_struct {
};

#define WORK_DATA_INIT() ATOMIC_LONG_INIT(0)
+#define WORK_DATA_STATIC_INIT() ATOMIC_LONG_INIT(2)

struct delayed_work {
struct work_struct work;
@@ -63,7 +65,7 @@ struct execute_work {
#endif

#define __WORK_INITIALIZER(n, f) { \
- .data = WORK_DATA_INIT(), \
+ .data = WORK_DATA_STATIC_INIT(), \
.entry = { &(n).entry, &(n).entry }, \
.func = (f), \
__WORK_INIT_LOCKDEP_MAP(#n, &(n)) \
@@ -91,6 +93,14 @@ struct execute_work {
#define PREPARE_DELAYED_WORK(_work, _func) \
PREPARE_WORK(&(_work)->work, (_func))

+#ifdef CONFIG_DEBUG_OBJECTS_WORK
+extern void __init_work(struct work_struct *work, int onstack);
+extern void destroy_work_on_stack(struct work_struct *work);
+#else
+static inline void __init_work(struct work_struct *work, int onstack) { }
+static inline void destroy_work_on_stack(struct work_struct *work) { }
+#endif
+
/*
* initialize all of a work item in one go
*
@@ -99,24 +109,36 @@ struct execute_work {
* to generate better code.
*/
#ifdef CONFIG_LOCKDEP
-#define INIT_WORK(_work, _func) \
+#define __INIT_WORK(_work, _func, _onstack) \
do { \
static struct lock_class_key __key; \
\
+ __init_work((_work), _onstack); \
(_work)->data = (atomic_long_t) WORK_DATA_INIT(); \
lockdep_init_map(&(_work)->lockdep_map, #_work, &__key, 0);\
INIT_LIST_HEAD(&(_work)->entry); \
PREPARE_WORK((_work), (_func)); \
} while (0)
#else
-#define INIT_WORK(_work, _func) \
+#define __INIT_WORK(_work, _func, _onstack) \
do { \
+ __init_work((_work), _onstack); \
(_work)->data = (atomic_long_t) WORK_DATA_INIT(); \
INIT_LIST_HEAD(&(_work)->entry); \
PREPARE_WORK((_work), (_func)); \
} while (0)
#endif

+#define INIT_WORK(_work, _func) \
+ do { \
+ __INIT_WORK((_work), (_func), 0); \
+ } while (0)
+
+#define INIT_WORK_ON_STACK(_work, _func) \
+ do { \
+ __INIT_WORK((_work), (_func), 1); \
+ } while (0)
+
#define INIT_DELAYED_WORK(_work, _func) \
do { \
INIT_WORK(&(_work)->work, (_func)); \
@@ -125,22 +147,16 @@ struct execute_work {

#define INIT_DELAYED_WORK_ON_STACK(_work, _func) \
do { \
- INIT_WORK(&(_work)->work, (_func)); \
+ INIT_WORK_ON_STACK(&(_work)->work, (_func)); \
init_timer_on_stack(&(_work)->timer); \
} while (0)

-#define INIT_DELAYED_WORK_DEFERRABLE(_work, _func) \
+#define INIT_DELAYED_WORK_DEFERRABLE(_work, _func) \
do { \
INIT_WORK(&(_work)->work, (_func)); \
init_timer_deferrable(&(_work)->timer); \
} while (0)

-#define INIT_DELAYED_WORK_ON_STACK(_work, _func) \
- do { \
- INIT_WORK(&(_work)->work, (_func)); \
- init_timer_on_stack(&(_work)->timer); \
- } while (0)
-
/**
* work_pending - Find out whether a work item is currently pending
* @work: The work item in question
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 67e526b..dee4865 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -68,6 +68,116 @@ struct workqueue_struct {
#endif
};

+#ifdef CONFIG_DEBUG_OBJECTS_WORK
+
+static struct debug_obj_descr work_debug_descr;
+
+/*
+ * fixup_init is called when:
+ * - an active object is initialized
+ */
+static int work_fixup_init(void *addr, enum debug_obj_state state)
+{
+ struct work_struct *work = addr;
+
+ switch (state) {
+ case ODEBUG_STATE_ACTIVE:
+ cancel_work_sync(work);
+ debug_object_init(work, &work_debug_descr);
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+/*
+ * fixup_activate is called when:
+ * - an active object is activated
+ * - an unknown object is activated (might be a statically initialized object)
+ */
+static int work_fixup_activate(void *addr, enum debug_obj_state state)
+{
+ struct work_struct *work = addr;
+
+ switch (state) {
+
+ case ODEBUG_STATE_NOTAVAILABLE:
+ /*
+ * This is not really a fixup. The work struct was
+ * statically initialized. We just make sure that it
+ * is tracked in the object tracker.
+ */
+ if (test_bit(WORK_STRUCT_STATIC, work_data_bits(work))) {
+ debug_object_init(work, &work_debug_descr);
+ debug_object_activate(work, &work_debug_descr);
+ return 0;
+ }
+ WARN_ON_ONCE(1);
+ return 0;
+
+ case ODEBUG_STATE_ACTIVE:
+ WARN_ON(1);
+
+ default:
+ return 0;
+ }
+}
+
+/*
+ * fixup_free is called when:
+ * - an active object is freed
+ */
+static int work_fixup_free(void *addr, enum debug_obj_state state)
+{
+ struct work_struct *work = addr;
+
+ switch (state) {
+ case ODEBUG_STATE_ACTIVE:
+ cancel_work_sync(work);
+ debug_object_free(work, &work_debug_descr);
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+static struct debug_obj_descr work_debug_descr = {
+ .name = "work_struct",
+ .fixup_init = work_fixup_init,
+ .fixup_activate = work_fixup_activate,
+ .fixup_free = work_fixup_free,
+};
+
+static inline void debug_work_activate(struct work_struct *work)
+{
+ debug_object_activate(work, &work_debug_descr);
+}
+
+static inline void debug_work_deactivate(struct work_struct *work)
+{
+ debug_object_deactivate(work, &work_debug_descr);
+}
+
+void __init_work(struct work_struct *work, int onstack)
+{
+ if (onstack)
+ debug_object_init_on_stack(work, &work_debug_descr);
+ else
+ debug_object_init(work, &work_debug_descr);
+}
+EXPORT_SYMBOL_GPL(__init_work);
+
+void destroy_work_on_stack(struct work_struct *work)
+{
+ debug_object_free(work, &work_debug_descr);
+}
+EXPORT_SYMBOL_GPL(destroy_work_on_stack);
+
+#else
+static inline void debug_work_activate(struct work_struct *work) { }
+static inline void debug_work_deactivate(struct work_struct *work) { }
+#endif
+
/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
@@ -145,6 +255,7 @@ static void __queue_work(struct cpu_workqueue_struct *cwq,
{
unsigned long flags;

+ debug_work_activate(work);
spin_lock_irqsave(&cwq->lock, flags);
insert_work(cwq, work, &cwq->worklist);
spin_unlock_irqrestore(&cwq->lock, flags);
@@ -280,6 +391,7 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
struct lockdep_map lockdep_map = work->lockdep_map;
#endif
trace_workqueue_execution(cwq->thread, work);
+ debug_work_deactivate(work);
cwq->current_work = work;
list_del_init(cwq->worklist.next);
spin_unlock_irq(&cwq->lock);
@@ -350,11 +462,18 @@ static void wq_barrier_func(struct work_struct *work)
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
struct wq_barrier *barr, struct list_head *head)
{
- INIT_WORK(&barr->work, wq_barrier_func);
+ /*
+ * debugobject calls are safe here even with cwq->lock locked
+ * as we know for sure that this will not trigger any of the
+ * checks and call back into the fixup functions where we
+ * might deadlock.
+ */
+ INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));

init_completion(&barr->done);

+ debug_work_activate(&barr->work);
insert_work(cwq, &barr->work, head);
}

@@ -372,8 +491,10 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
}
spin_unlock_irq(&cwq->lock);

- if (active)
+ if (active) {
wait_for_completion(&barr.done);
+ destroy_work_on_stack(&barr.work);
+ }

return active;
}
@@ -451,6 +572,7 @@ out:
return 0;

wait_for_completion(&barr.done);
+ destroy_work_on_stack(&barr.work);
return 1;
}
EXPORT_SYMBOL_GPL(flush_work);
@@ -485,6 +607,7 @@ static int try_to_grab_pending(struct work_struct *work)
*/
smp_rmb();
if (cwq == get_wq_data(work)) {
+ debug_work_deactivate(work);
list_del_init(&work->entry);
ret = 1;
}
@@ -507,8 +630,10 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
}
spin_unlock_irq(&cwq->lock);

- if (unlikely(running))
+ if (unlikely(running)) {
wait_for_completion(&barr.done);
+ destroy_work_on_stack(&barr.work);
+ }
}

static void wait_on_work(struct work_struct *work)
diff --git a/lib/Kconfig.debug b/lib/Kconfig.debug
index 234ceb1..c91f051 100644
--- a/lib/Kconfig.debug
+++ b/lib/Kconfig.debug
@@ -298,6 +298,14 @@ config DEBUG_OBJECTS_TIMERS
timer routines to track the life time of timer objects and
validate the timer operations.

+config DEBUG_OBJECTS_WORK
+ bool "Debug work objects"
+ depends on DEBUG_OBJECTS
+ help
+ If you say Y here, additional code will be inserted into the
+ work queue routines to track the life time of work objects and
+ validate the work operations.
+
config DEBUG_OBJECTS_ENABLE_DEFAULT
int "debug_objects bootup default value (0-1)"
range 0 1
--
1.6.4.2

2009-11-16 17:19:08

by Tejun Heo

Subject: [PATCH 04/21] sched: implement scheduler notifiers

Implement scheduler notifiers. This is a superset of preempt
notifiers which will be removed in favor of the new notifiers. Four
notifications are defined - activated, deactivated, in and out. In
and out are identical to the preempt notifiers. Activated and
deactivated are called when a task's readiness to run changes. The
first three are always called under the rq lock. Out may not be
called under the rq lock depending on the architecture.

The notifier block contains a union of all four callbacks to avoid
defining a separate interface for each.
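
As a usage sketch (the struct and function names below are
hypothetical, not part of this patch), a client embeds a struct
sched_notifier, fills in the union member matching the type it
registers for, and registers it against the current task, much like
the kvm conversion in the next patch:

struct my_ctx {
        struct sched_notifier in_notifier;
        int last_in_cpu;
};

static void my_sched_in(struct sched_notifier *n, struct task_struct *prev)
{
        struct my_ctx *ctx = container_of(n, struct my_ctx, in_notifier);

        /* runs under the rq lock as the registering task is switched in */
        ctx->last_in_cpu = smp_processor_id();
}

static void my_register(struct my_ctx *ctx)
{
        ctx->in_notifier.in = my_sched_in;
        sched_notifier_register(SCHED_NOTIFIER_IN, &ctx->in_notifier);
}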

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/sched.h | 33 +++++++++++++++++++++++++++++
kernel/sched.c | 54 +++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 87 insertions(+), 0 deletions(-)

diff --git a/include/linux/sched.h b/include/linux/sched.h
index 75e6e60..0012980 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -1210,6 +1210,37 @@ struct sched_rt_entity {
#endif
};

+/*
+ * Scheduler notifiers
+ *
+ * All notifications other than OUT are guaranteed to be called with
+ * the respective rq lock held. Depending on architecture
+ * (__ARCH_WANT_UNLOCKED_CTXSW), OUT might be called without the rq
+ * lock but task->oncpu is guaranteed to be true.
+ */
+enum sched_notifier_type {
+ SCHED_NOTIFIER_ACTIVATED, /* put on runqueue */
+ SCHED_NOTIFIER_DEACTIVATED, /* removed from runqueue */
+ SCHED_NOTIFIER_IN, /* occupying CPU */
+ SCHED_NOTIFIER_OUT, /* leaving CPU */
+
+ SCHED_NR_NOTIFIERS,
+};
+
+struct sched_notifier {
+ struct hlist_node link;
+ union {
+ void (*activated)(struct sched_notifier *n, bool wakeup);
+ void (*deactivated)(struct sched_notifier *n, bool sleep);
+ void (*in)(struct sched_notifier *n, struct task_struct *prev);
+ void (*out)(struct sched_notifier *n, struct task_struct *next);
+ };
+};
+
+void sched_notifier_register(enum sched_notifier_type type,
+ struct sched_notifier *notifier);
+void sched_notifier_unregister(struct sched_notifier *notifier);
+
struct rcu_node;

struct task_struct {
@@ -1237,6 +1268,8 @@ struct task_struct {
/* list of struct preempt_notifier: */
struct hlist_head preempt_notifiers;
#endif
+ /* sched notifiers */
+ struct hlist_head notifiers[SCHED_NR_NOTIFIERS];

/*
* fpu_counter contains the number of consecutive context switches
diff --git a/kernel/sched.c b/kernel/sched.c
index de8a765..946c7a8 100644
--- a/kernel/sched.c
+++ b/kernel/sched.c
@@ -1389,6 +1389,20 @@ static const u32 prio_to_wmult[40] = {
/* 15 */ 119304647, 148102320, 186737708, 238609294, 286331153,
};

+#define sched_notifier_for_each(notifier, pos, p, type) \
+ hlist_for_each_entry((notifier), (pos), \
+ &(p)->notifiers[(type)], link)
+
+#define sched_notifier_call(p, type, callback, args...) do { \
+ struct task_struct *__p = (p); \
+ struct sched_notifier *__notifier; \
+ struct hlist_node *__pos; \
+ \
+ if (unlikely(!hlist_empty(&__p->notifiers[(type)]))) \
+ sched_notifier_for_each(__notifier, __pos, __p, (type)) \
+ __notifier->callback(__notifier , ##args); \
+} while (0)
+
static void activate_task(struct rq *rq, struct task_struct *p, int wakeup);

/*
@@ -1939,6 +1953,8 @@ static int effective_prio(struct task_struct *p)
*/
static void activate_task(struct rq *rq, struct task_struct *p, int wakeup)
{
+ sched_notifier_call(p, SCHED_NOTIFIER_ACTIVATED, activated, wakeup);
+
if (task_contributes_to_load(p))
rq->nr_uninterruptible--;

@@ -1951,6 +1967,8 @@ static void activate_task(struct rq *rq, struct task_struct *p, int wakeup)
*/
static void deactivate_task(struct rq *rq, struct task_struct *p, int sleep)
{
+ sched_notifier_call(p, SCHED_NOTIFIER_DEACTIVATED, deactivated, sleep);
+
if (task_contributes_to_load(p))
rq->nr_uninterruptible++;

@@ -2478,6 +2496,8 @@ int wake_up_state(struct task_struct *p, unsigned int state)
*/
static void __sched_fork(struct task_struct *p)
{
+ int i;
+
p->se.exec_start = 0;
p->se.sum_exec_runtime = 0;
p->se.prev_sum_exec_runtime = 0;
@@ -2529,6 +2549,8 @@ static void __sched_fork(struct task_struct *p)
#ifdef CONFIG_PREEMPT_NOTIFIERS
INIT_HLIST_HEAD(&p->preempt_notifiers);
#endif
+ for (i = 0; i < SCHED_NR_NOTIFIERS; i++)
+ INIT_HLIST_HEAD(&p->notifiers[i]);

/*
* We mark the process as running here, but have not actually
@@ -2709,6 +2731,7 @@ static inline void
prepare_task_switch(struct rq *rq, struct task_struct *prev,
struct task_struct *next)
{
+ sched_notifier_call(prev, SCHED_NOTIFIER_OUT, out, next);
fire_sched_out_preempt_notifiers(prev, next);
prepare_lock_switch(rq, next);
prepare_arch_switch(next);
@@ -2751,6 +2774,7 @@ static void finish_task_switch(struct rq *rq, struct task_struct *prev)
prev_state = prev->state;
finish_arch_switch(prev);
perf_event_task_sched_in(current, cpu_of(rq));
+ sched_notifier_call(current, SCHED_NOTIFIER_IN, in, prev);
fire_sched_in_preempt_notifiers(current);
finish_lock_switch(rq, prev);

@@ -7399,6 +7423,34 @@ static void calc_global_load_remove(struct rq *rq)
}
#endif /* CONFIG_HOTPLUG_CPU */

+/**
+ * sched_notifier_register - register a sched_notifier
+ * @type: type of sched_notifier to register
+ * @notifier: sched_notifier to register
+ *
+ * Register @notifier of @type to the current task.
+ */
+void sched_notifier_register(enum sched_notifier_type type,
+ struct sched_notifier *notifier)
+{
+ BUG_ON(type < 0 || type >= SCHED_NR_NOTIFIERS);
+ hlist_add_head(&notifier->link, &current->notifiers[type]);
+}
+EXPORT_SYMBOL_GPL(sched_notifier_register);
+
+/**
+ * sched_notifier_unregister - unregister a sched_notifier
+ * @notifier: sched_notifier to unregister
+ *
+ * Unregister @notifier from the current task. This function must be
+ * called from the task @notifier is registered to.
+ */
+void sched_notifier_unregister(struct sched_notifier *notifier)
+{
+ hlist_del_init(&notifier->link);
+}
+EXPORT_SYMBOL_GPL(sched_notifier_unregister);
+
#if defined(CONFIG_SCHED_DEBUG) && defined(CONFIG_SYSCTL)

static struct ctl_table sd_ctl_dir[] = {
@@ -9534,6 +9586,8 @@ void __init sched_init(void)
#ifdef CONFIG_PREEMPT_NOTIFIERS
INIT_HLIST_HEAD(&init_task.preempt_notifiers);
#endif
+ for (i = 0; i < SCHED_NR_NOTIFIERS; i++)
+ INIT_HLIST_HEAD(&init_task.notifiers[i]);

#ifdef CONFIG_SMP
open_softirq(SCHED_SOFTIRQ, run_rebalance_domains);
--
1.6.4.2

2009-11-16 17:19:27

by Tejun Heo

Subject: [PATCH 05/21] kvm: convert kvm to use new scheduler notifiers

Convert kvm to use the new scheduler notifiers instead of preempt
notifiers.

Signed-off-by: Tejun Heo <[email protected]>
Cc: Avi Kivity <[email protected]>
---
include/linux/kvm_host.h | 5 +--
virt/kvm/kvm_main.c | 51 +++++++++++++++++++--------------------------
2 files changed, 24 insertions(+), 32 deletions(-)

diff --git a/include/linux/kvm_host.h b/include/linux/kvm_host.h
index b7bbb5d..b6e56f1 100644
--- a/include/linux/kvm_host.h
+++ b/include/linux/kvm_host.h
@@ -74,9 +74,8 @@ void kvm_io_bus_unregister_dev(struct kvm *kvm, struct kvm_io_bus *bus,

struct kvm_vcpu {
struct kvm *kvm;
-#ifdef CONFIG_PREEMPT_NOTIFIERS
- struct preempt_notifier preempt_notifier;
-#endif
+ struct sched_notifier sched_in_notifier;
+ struct sched_notifier sched_out_notifier;
int vcpu_id;
struct mutex mutex;
int cpu;
diff --git a/virt/kvm/kvm_main.c b/virt/kvm/kvm_main.c
index 7495ce3..4cc8051 100644
--- a/virt/kvm/kvm_main.c
+++ b/virt/kvm/kvm_main.c
@@ -79,8 +79,6 @@ static cpumask_var_t cpus_hardware_enabled;
struct kmem_cache *kvm_vcpu_cache;
EXPORT_SYMBOL_GPL(kvm_vcpu_cache);

-static __read_mostly struct preempt_ops kvm_preempt_ops;
-
struct dentry *kvm_debugfs_dir;

static long kvm_vcpu_ioctl(struct file *file, unsigned int ioctl,
@@ -713,7 +711,8 @@ void vcpu_load(struct kvm_vcpu *vcpu)

mutex_lock(&vcpu->mutex);
cpu = get_cpu();
- preempt_notifier_register(&vcpu->preempt_notifier);
+ sched_notifier_register(SCHED_NOTIFIER_IN, &vcpu->sched_in_notifier);
+ sched_notifier_register(SCHED_NOTIFIER_OUT, &vcpu->sched_out_notifier);
kvm_arch_vcpu_load(vcpu, cpu);
put_cpu();
}
@@ -722,11 +721,28 @@ void vcpu_put(struct kvm_vcpu *vcpu)
{
preempt_disable();
kvm_arch_vcpu_put(vcpu);
- preempt_notifier_unregister(&vcpu->preempt_notifier);
+ sched_notifier_unregister(&vcpu->sched_in_notifier);
+ sched_notifier_unregister(&vcpu->sched_out_notifier);
preempt_enable();
mutex_unlock(&vcpu->mutex);
}

+static void kvm_sched_in(struct sched_notifier *sn, struct task_struct *prev)
+{
+ struct kvm_vcpu *vcpu =
+ container_of(sn, struct kvm_vcpu, sched_in_notifier);
+
+ kvm_arch_vcpu_load(vcpu, smp_processor_id());
+}
+
+static void kvm_sched_out(struct sched_notifier *sn, struct task_struct *next)
+{
+ struct kvm_vcpu *vcpu =
+ container_of(sn, struct kvm_vcpu, sched_out_notifier);
+
+ kvm_arch_vcpu_put(vcpu);
+}
+
static void ack_flush(void *_completed)
{
}
@@ -1772,7 +1788,8 @@ static int kvm_vm_ioctl_create_vcpu(struct kvm *kvm, u32 id)
if (IS_ERR(vcpu))
return PTR_ERR(vcpu);

- preempt_notifier_init(&vcpu->preempt_notifier, &kvm_preempt_ops);
+ vcpu->sched_in_notifier.in = kvm_sched_in;
+ vcpu->sched_out_notifier.out = kvm_sched_out;

r = kvm_arch_vcpu_setup(vcpu);
if (r)
@@ -2690,27 +2707,6 @@ static struct sys_device kvm_sysdev = {
struct page *bad_page;
pfn_t bad_pfn;

-static inline
-struct kvm_vcpu *preempt_notifier_to_vcpu(struct preempt_notifier *pn)
-{
- return container_of(pn, struct kvm_vcpu, preempt_notifier);
-}
-
-static void kvm_sched_in(struct preempt_notifier *pn, int cpu)
-{
- struct kvm_vcpu *vcpu = preempt_notifier_to_vcpu(pn);
-
- kvm_arch_vcpu_load(vcpu, cpu);
-}
-
-static void kvm_sched_out(struct preempt_notifier *pn,
- struct task_struct *next)
-{
- struct kvm_vcpu *vcpu = preempt_notifier_to_vcpu(pn);
-
- kvm_arch_vcpu_put(vcpu);
-}
-
int kvm_init(void *opaque, unsigned int vcpu_size,
struct module *module)
{
@@ -2780,9 +2776,6 @@ int kvm_init(void *opaque, unsigned int vcpu_size,
goto out_free;
}

- kvm_preempt_ops.sched_in = kvm_sched_in;
- kvm_preempt_ops.sched_out = kvm_sched_out;
-
kvm_init_debug();

return 0;
--
1.6.4.2

2009-11-16 17:16:34

by Tejun Heo

Subject: [PATCH 06/21] sched: drop preempt notifiers

With kvm converted, preempt notifiers have no users. Kill them.

Signed-off-by: Tejun Heo <[email protected]>
---
arch/ia64/kvm/Kconfig | 1 -
arch/powerpc/kvm/Kconfig | 1 -
arch/s390/kvm/Kconfig | 1 -
arch/x86/kvm/Kconfig | 1 -
include/linux/preempt.h | 43 ------------------------------
include/linux/sched.h | 4 ---
init/Kconfig | 4 ---
kernel/sched.c | 66 ----------------------------------------------
8 files changed, 0 insertions(+), 121 deletions(-)

diff --git a/arch/ia64/kvm/Kconfig b/arch/ia64/kvm/Kconfig
index ef3e7be..a9e2b9c 100644
--- a/arch/ia64/kvm/Kconfig
+++ b/arch/ia64/kvm/Kconfig
@@ -22,7 +22,6 @@ config KVM
depends on HAVE_KVM && MODULES && EXPERIMENTAL
# for device assignment:
depends on PCI
- select PREEMPT_NOTIFIERS
select ANON_INODES
select HAVE_KVM_IRQCHIP
select KVM_APIC_ARCHITECTURE
diff --git a/arch/powerpc/kvm/Kconfig b/arch/powerpc/kvm/Kconfig
index c299268..092503e 100644
--- a/arch/powerpc/kvm/Kconfig
+++ b/arch/powerpc/kvm/Kconfig
@@ -18,7 +18,6 @@ if VIRTUALIZATION

config KVM
bool
- select PREEMPT_NOTIFIERS
select ANON_INODES

config KVM_440
diff --git a/arch/s390/kvm/Kconfig b/arch/s390/kvm/Kconfig
index bf164fc..e125d45 100644
--- a/arch/s390/kvm/Kconfig
+++ b/arch/s390/kvm/Kconfig
@@ -18,7 +18,6 @@ if VIRTUALIZATION
config KVM
tristate "Kernel-based Virtual Machine (KVM) support"
depends on HAVE_KVM && EXPERIMENTAL
- select PREEMPT_NOTIFIERS
select ANON_INODES
select S390_SWITCH_AMODE
---help---
diff --git a/arch/x86/kvm/Kconfig b/arch/x86/kvm/Kconfig
index b84e571..b391852 100644
--- a/arch/x86/kvm/Kconfig
+++ b/arch/x86/kvm/Kconfig
@@ -22,7 +22,6 @@ config KVM
depends on HAVE_KVM
# for device assignment:
depends on PCI
- select PREEMPT_NOTIFIERS
select MMU_NOTIFIER
select ANON_INODES
select HAVE_KVM_IRQCHIP
diff --git a/include/linux/preempt.h b/include/linux/preempt.h
index 72b1a10..538c675 100644
--- a/include/linux/preempt.h
+++ b/include/linux/preempt.h
@@ -93,47 +93,4 @@ do { \

#endif

-#ifdef CONFIG_PREEMPT_NOTIFIERS
-
-struct preempt_notifier;
-
-/**
- * preempt_ops - notifiers called when a task is preempted and rescheduled
- * @sched_in: we're about to be rescheduled:
- * notifier: struct preempt_notifier for the task being scheduled
- * cpu: cpu we're scheduled on
- * @sched_out: we've just been preempted
- * notifier: struct preempt_notifier for the task being preempted
- * next: the task that's kicking us out
- */
-struct preempt_ops {
- void (*sched_in)(struct preempt_notifier *notifier, int cpu);
- void (*sched_out)(struct preempt_notifier *notifier,
- struct task_struct *next);
-};
-
-/**
- * preempt_notifier - key for installing preemption notifiers
- * @link: internal use
- * @ops: defines the notifier functions to be called
- *
- * Usually used in conjunction with container_of().
- */
-struct preempt_notifier {
- struct hlist_node link;
- struct preempt_ops *ops;
-};
-
-void preempt_notifier_register(struct preempt_notifier *notifier);
-void preempt_notifier_unregister(struct preempt_notifier *notifier);
-
-static inline void preempt_notifier_init(struct preempt_notifier *notifier,
- struct preempt_ops *ops)
-{
- INIT_HLIST_NODE(&notifier->link);
- notifier->ops = ops;
-}
-
-#endif
-
#endif /* __LINUX_PREEMPT_H */
diff --git a/include/linux/sched.h b/include/linux/sched.h
index 0012980..16dcd58 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -1264,10 +1264,6 @@ struct task_struct {
struct sched_entity se;
struct sched_rt_entity rt;

-#ifdef CONFIG_PREEMPT_NOTIFIERS
- /* list of struct preempt_notifier: */
- struct hlist_head preempt_notifiers;
-#endif
/* sched notifiers */
struct hlist_head notifiers[SCHED_NR_NOTIFIERS];

diff --git a/init/Kconfig b/init/Kconfig
index 9e03ef8..0220aa7 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -1206,7 +1206,3 @@ config STOP_MACHINE
Need stop_machine() primitive.

source "block/Kconfig"
-
-config PREEMPT_NOTIFIERS
- bool
-
diff --git a/kernel/sched.c b/kernel/sched.c
index 946c7a8..a25b993 100644
--- a/kernel/sched.c
+++ b/kernel/sched.c
@@ -2546,9 +2546,6 @@ static void __sched_fork(struct task_struct *p)
p->se.on_rq = 0;
INIT_LIST_HEAD(&p->se.group_node);

-#ifdef CONFIG_PREEMPT_NOTIFIERS
- INIT_HLIST_HEAD(&p->preempt_notifiers);
-#endif
for (i = 0; i < SCHED_NR_NOTIFIERS; i++)
INIT_HLIST_HEAD(&p->notifiers[i]);

@@ -2656,64 +2653,6 @@ void wake_up_new_task(struct task_struct *p, unsigned long clone_flags)
task_rq_unlock(rq, &flags);
}

-#ifdef CONFIG_PREEMPT_NOTIFIERS
-
-/**
- * preempt_notifier_register - tell me when current is being preempted & rescheduled
- * @notifier: notifier struct to register
- */
-void preempt_notifier_register(struct preempt_notifier *notifier)
-{
- hlist_add_head(&notifier->link, &current->preempt_notifiers);
-}
-EXPORT_SYMBOL_GPL(preempt_notifier_register);
-
-/**
- * preempt_notifier_unregister - no longer interested in preemption notifications
- * @notifier: notifier struct to unregister
- *
- * This is safe to call from within a preemption notifier.
- */
-void preempt_notifier_unregister(struct preempt_notifier *notifier)
-{
- hlist_del(&notifier->link);
-}
-EXPORT_SYMBOL_GPL(preempt_notifier_unregister);
-
-static void fire_sched_in_preempt_notifiers(struct task_struct *curr)
-{
- struct preempt_notifier *notifier;
- struct hlist_node *node;
-
- hlist_for_each_entry(notifier, node, &curr->preempt_notifiers, link)
- notifier->ops->sched_in(notifier, raw_smp_processor_id());
-}
-
-static void
-fire_sched_out_preempt_notifiers(struct task_struct *curr,
- struct task_struct *next)
-{
- struct preempt_notifier *notifier;
- struct hlist_node *node;
-
- hlist_for_each_entry(notifier, node, &curr->preempt_notifiers, link)
- notifier->ops->sched_out(notifier, next);
-}
-
-#else /* !CONFIG_PREEMPT_NOTIFIERS */
-
-static void fire_sched_in_preempt_notifiers(struct task_struct *curr)
-{
-}
-
-static void
-fire_sched_out_preempt_notifiers(struct task_struct *curr,
- struct task_struct *next)
-{
-}
-
-#endif /* CONFIG_PREEMPT_NOTIFIERS */
-
/**
* prepare_task_switch - prepare to switch tasks
* @rq: the runqueue preparing to switch
@@ -2732,7 +2671,6 @@ prepare_task_switch(struct rq *rq, struct task_struct *prev,
struct task_struct *next)
{
sched_notifier_call(prev, SCHED_NOTIFIER_OUT, out, next);
- fire_sched_out_preempt_notifiers(prev, next);
prepare_lock_switch(rq, next);
prepare_arch_switch(next);
}
@@ -2775,7 +2713,6 @@ static void finish_task_switch(struct rq *rq, struct task_struct *prev)
finish_arch_switch(prev);
perf_event_task_sched_in(current, cpu_of(rq));
sched_notifier_call(current, SCHED_NOTIFIER_IN, in, prev);
- fire_sched_in_preempt_notifiers(current);
finish_lock_switch(rq, prev);

if (mm)
@@ -9583,9 +9520,6 @@ void __init sched_init(void)

set_load_weight(&init_task);

-#ifdef CONFIG_PREEMPT_NOTIFIERS
- INIT_HLIST_HEAD(&init_task.preempt_notifiers);
-#endif
for (i = 0; i < SCHED_NR_NOTIFIERS; i++)
INIT_HLIST_HEAD(&init_task.notifiers[i]);

--
1.6.4.2

2009-11-16 17:16:38

by Tejun Heo

Subject: [PATCH 07/21] sched: implement sched_notifier_wake_up_process()

Implement sched_notifier_wake_up_process() which can be called from
the activated, deactivated and in scheduler notifiers to wake up a
task which is bound to the same cpu. This will be used to implement
concurrency managed workqueue.
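
As a rough usage sketch (the pool/worker structure below is
hypothetical and only hints at how concurrency managed workqueue
intends to use this), a deactivated notifier can kick another task
that is already bound to the same cpu while the rq lock is still held:

struct my_pool {
        struct sched_notifier deactivated_notifier;
        struct task_struct *standby_worker;     /* bound to this cpu */
};

static void my_worker_deactivated(struct sched_notifier *n, bool sleep)
{
        struct my_pool *pool =
                container_of(n, struct my_pool, deactivated_notifier);

        /*
         * The rq lock is held here, so a regular wake up can't be
         * used; the restricted variant below is safe only because
         * the standby worker is bound to the same cpu.
         */
        if (sleep && pool->standby_worker)
                sched_notifier_wake_up_process(pool->standby_worker);
}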

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/sched.h | 1 +
kernel/sched.c | 37 +++++++++++++++++++++++++++++++++++++
2 files changed, 38 insertions(+), 0 deletions(-)

diff --git a/include/linux/sched.h b/include/linux/sched.h
index 16dcd58..5d3a554 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -1240,6 +1240,7 @@ struct sched_notifier {
void sched_notifier_register(enum sched_notifier_type type,
struct sched_notifier *notifier);
void sched_notifier_unregister(struct sched_notifier *notifier);
+bool sched_notifier_wake_up_process(struct task_struct *p);

struct rcu_node;

diff --git a/kernel/sched.c b/kernel/sched.c
index a25b993..c8868e2 100644
--- a/kernel/sched.c
+++ b/kernel/sched.c
@@ -7388,6 +7388,43 @@ void sched_notifier_unregister(struct sched_notifier *notifier)
}
EXPORT_SYMBOL_GPL(sched_notifier_unregister);

+/**
+ * sched_notifier_wake_up_process - wake up a process from sched notifier
+ * @p: task to wake up
+ *
+ * Wake up @p. This function can only be called from activate,
+ * deactivate and in scheduler notifiers and can only wake up tasks
+ * which are already bound to the cpu in question.
+ *
+ * CONTEXT:
+ * Scheduler notifiers.
+ *
+ * RETURNS:
+ * true if @p was waken up, false if @p was already awake.
+ */
+bool sched_notifier_wake_up_process(struct task_struct *p)
+{
+ struct rq *rq = task_rq(p);
+ bool success = false;
+
+ assert_spin_locked(&rq->lock);
+
+ if (!p->se.on_rq) {
+ schedstat_inc(p, se.nr_wakeups);
+ schedstat_inc(p, se.nr_wakeups_local);
+ activate_task(rq, p, 1);
+ success = true;
+ }
+
+ trace_sched_wakeup(rq, p, success);
+ p->state = TASK_RUNNING;
+#ifdef CONFIG_SMP
+ if (p->sched_class->task_wake_up)
+ p->sched_class->task_wake_up(rq, p);
+#endif
+ return success;
+}
+
#if defined(CONFIG_SCHED_DEBUG) && defined(CONFIG_SYSCTL)

static struct ctl_table sd_ctl_dir[] = {
--
1.6.4.2

2009-11-16 17:19:16

by Tejun Heo

Subject: [PATCH 08/21] scheduler: implement force_cpus_allowed_ptr()

Implement force_cpus_allowed_ptr(), which is similar to
set_cpus_allowed_ptr() but bypasses the PF_THREAD_BOUND check and
ignores the cpu_active() status as long as the target cpu is online.
This will be used for concurrency managed workqueue.
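
For illustration (the helper below is hypothetical), a workqueue-like
user that manages its own kernel threads could rebind one of them to
an online but not yet active cpu like this:

/* rebind @task, a kernel thread we manage ourselves, to @cpu */
static int rebind_worker(struct task_struct *task, unsigned int cpu)
{
        /*
         * Unlike set_cpus_allowed_ptr(), this ignores PF_THREAD_BOUND
         * and only requires @cpu to be online, not active.
         */
        return force_cpus_allowed_ptr(task, cpumask_of(cpu));
}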

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/sched.h | 7 ++++
kernel/sched.c | 88 +++++++++++++++++++++++++++++++++----------------
2 files changed, 66 insertions(+), 29 deletions(-)

diff --git a/include/linux/sched.h b/include/linux/sched.h
index 5d3a554..f283e6f 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -1839,6 +1839,8 @@ static inline void rcu_copy_process(struct task_struct *p)
#ifdef CONFIG_SMP
extern int set_cpus_allowed_ptr(struct task_struct *p,
const struct cpumask *new_mask);
+extern int force_cpus_allowed_ptr(struct task_struct *p,
+ const struct cpumask *new_mask);
#else
static inline int set_cpus_allowed_ptr(struct task_struct *p,
const struct cpumask *new_mask)
@@ -1847,6 +1849,11 @@ static inline int set_cpus_allowed_ptr(struct task_struct *p,
return -EINVAL;
return 0;
}
+static inline int force_cpus_allowed_ptr(struct task_struct *p,
+ const struct cpumask *new_mask)
+{
+ return set_cpus_allowed_ptr(p, new_mask);
+}
#endif

#ifndef CONFIG_CPUMASK_OFFSTACK
diff --git a/kernel/sched.c b/kernel/sched.c
index c8868e2..a6d863b 100644
--- a/kernel/sched.c
+++ b/kernel/sched.c
@@ -2115,6 +2115,7 @@ struct migration_req {

struct task_struct *task;
int dest_cpu;
+ bool force;

struct completion done;
};
@@ -2123,8 +2124,8 @@ struct migration_req {
* The task's runqueue lock must be held.
* Returns true if you have to wait for migration thread.
*/
-static int
-migrate_task(struct task_struct *p, int dest_cpu, struct migration_req *req)
+static int migrate_task(struct task_struct *p, int dest_cpu,
+ struct migration_req *req, bool force)
{
struct rq *rq = task_rq(p);

@@ -2140,6 +2141,7 @@ migrate_task(struct task_struct *p, int dest_cpu, struct migration_req *req)
init_completion(&req->done);
req->task = p;
req->dest_cpu = dest_cpu;
+ req->force = force;
list_add(&req->list, &rq->migration_queue);

return 1;
@@ -3086,7 +3088,7 @@ static void sched_migrate_task(struct task_struct *p, int dest_cpu)
goto out;

/* force the process onto the specified CPU */
- if (migrate_task(p, dest_cpu, &req)) {
+ if (migrate_task(p, dest_cpu, &req, false)) {
/* Need to wait for migration thread (might exit: take ref). */
struct task_struct *mt = rq->migration_thread;

@@ -6999,34 +7001,19 @@ static inline void sched_init_granularity(void)
* 7) we wake up and the migration is done.
*/

-/*
- * Change a given task's CPU affinity. Migrate the thread to a
- * proper CPU and schedule it away if the CPU it's executing on
- * is removed from the allowed bitmask.
- *
- * NOTE: the caller must have a valid reference to the task, the
- * task must not exit() & deallocate itself prematurely. The
- * call is not atomic; no spinlocks may be held.
- */
-int set_cpus_allowed_ptr(struct task_struct *p, const struct cpumask *new_mask)
+static inline int __set_cpus_allowed_ptr(struct task_struct *p,
+ const struct cpumask *new_mask,
+ struct rq *rq, unsigned long *flags,
+ bool force)
{
struct migration_req req;
- unsigned long flags;
- struct rq *rq;
int ret = 0;

- rq = task_rq_lock(p, &flags);
if (!cpumask_intersects(new_mask, cpu_online_mask)) {
ret = -EINVAL;
goto out;
}

- if (unlikely((p->flags & PF_THREAD_BOUND) && p != current &&
- !cpumask_equal(&p->cpus_allowed, new_mask))) {
- ret = -EINVAL;
- goto out;
- }
-
if (p->sched_class->set_cpus_allowed)
p->sched_class->set_cpus_allowed(p, new_mask);
else {
@@ -7038,12 +7025,13 @@ int set_cpus_allowed_ptr(struct task_struct *p, const struct cpumask *new_mask)
if (cpumask_test_cpu(task_cpu(p), new_mask))
goto out;

- if (migrate_task(p, cpumask_any_and(cpu_online_mask, new_mask), &req)) {
+ if (migrate_task(p, cpumask_any_and(cpu_online_mask, new_mask), &req,
+ force)) {
/* Need help from migration thread: drop lock and wait. */
struct task_struct *mt = rq->migration_thread;

get_task_struct(mt);
- task_rq_unlock(rq, &flags);
+ task_rq_unlock(rq, flags);
wake_up_process(rq->migration_thread);
put_task_struct(mt);
wait_for_completion(&req.done);
@@ -7051,13 +7039,53 @@ int set_cpus_allowed_ptr(struct task_struct *p, const struct cpumask *new_mask)
return 0;
}
out:
- task_rq_unlock(rq, &flags);
+ task_rq_unlock(rq, flags);

return ret;
}
+
+/*
+ * Change a given task's CPU affinity. Migrate the thread to a
+ * proper CPU and schedule it away if the CPU it's executing on
+ * is removed from the allowed bitmask.
+ *
+ * NOTE: the caller must have a valid reference to the task, the
+ * task must not exit() & deallocate itself prematurely. The
+ * call is not atomic; no spinlocks may be held.
+ */
+int set_cpus_allowed_ptr(struct task_struct *p, const struct cpumask *new_mask)
+{
+ unsigned long flags;
+ struct rq *rq;
+
+ rq = task_rq_lock(p, &flags);
+
+ if (unlikely((p->flags & PF_THREAD_BOUND) && p != current &&
+ !cpumask_equal(&p->cpus_allowed, new_mask))) {
+ task_rq_unlock(rq, &flags);
+ return -EINVAL;
+ }
+
+ return __set_cpus_allowed_ptr(p, new_mask, rq, &flags, false);
+}
EXPORT_SYMBOL_GPL(set_cpus_allowed_ptr);

/*
+ * Similar to set_cpus_allowed_ptr() but bypasses PF_THREAD_BOUND
+ * check and ignores cpu_active() status as long as the cpu is online.
+ * The caller is responsible for ensuring things don't go bonkers.
+ */
+int force_cpus_allowed_ptr(struct task_struct *p,
+ const struct cpumask *new_mask)
+{
+ unsigned long flags;
+ struct rq *rq;
+
+ rq = task_rq_lock(p, &flags);
+ return __set_cpus_allowed_ptr(p, new_mask, rq, &flags, true);
+}
+
+/*
* Move (not current) task off this cpu, onto dest cpu. We're doing
* this because either it can't run here any more (set_cpus_allowed()
* away from this CPU, or CPU going down), or because we're
@@ -7068,12 +7096,13 @@ EXPORT_SYMBOL_GPL(set_cpus_allowed_ptr);
*
* Returns non-zero if task was successfully migrated.
*/
-static int __migrate_task(struct task_struct *p, int src_cpu, int dest_cpu)
+static int __migrate_task(struct task_struct *p, int src_cpu, int dest_cpu,
+ bool force)
{
struct rq *rq_dest, *rq_src;
int ret = 0, on_rq;

- if (unlikely(!cpu_active(dest_cpu)))
+ if (!force && unlikely(!cpu_active(dest_cpu)))
return ret;

rq_src = cpu_rq(src_cpu);
@@ -7152,7 +7181,8 @@ static int migration_thread(void *data)

if (req->task != NULL) {
spin_unlock(&rq->lock);
- __migrate_task(req->task, cpu, req->dest_cpu);
+ __migrate_task(req->task, cpu, req->dest_cpu,
+ req->force);
} else if (likely(cpu == (badcpu = smp_processor_id()))) {
req->dest_cpu = RCU_MIGRATION_GOT_QS;
spin_unlock(&rq->lock);
@@ -7177,7 +7207,7 @@ static int __migrate_task_irq(struct task_struct *p, int src_cpu, int dest_cpu)
int ret;

local_irq_disable();
- ret = __migrate_task(p, src_cpu, dest_cpu);
+ ret = __migrate_task(p, src_cpu, dest_cpu, false);
local_irq_enable();
return ret;
}
--
1.6.4.2

2009-11-16 17:18:32

by Tejun Heo

Subject: [PATCH 09/21] acpi: use queue_work_on() instead of binding workqueue worker to cpu0

ACPI work items need to be executed on cpu0, and acpi/osl.c achieves
this by creating singlethread workqueues and then binding them to cpu0
from a work item, which is quite unorthodox. Make it create regular
workqueues and use queue_work_on() instead. This is in preparation
for concurrency managed workqueue; the extra workers won't be a
problem once it is implemented.

Signed-off-by: Tejun Heo <[email protected]>
---
drivers/acpi/osl.c | 41 ++++++++++++-----------------------------
1 files changed, 12 insertions(+), 29 deletions(-)

diff --git a/drivers/acpi/osl.c b/drivers/acpi/osl.c
index 7c1c59e..f742b7b 100644
--- a/drivers/acpi/osl.c
+++ b/drivers/acpi/osl.c
@@ -191,36 +191,11 @@ acpi_status __init acpi_os_initialize(void)
return AE_OK;
}

-static void bind_to_cpu0(struct work_struct *work)
-{
- set_cpus_allowed_ptr(current, cpumask_of(0));
- kfree(work);
-}
-
-static void bind_workqueue(struct workqueue_struct *wq)
-{
- struct work_struct *work;
-
- work = kzalloc(sizeof(struct work_struct), GFP_KERNEL);
- INIT_WORK(work, bind_to_cpu0);
- queue_work(wq, work);
-}
-
acpi_status acpi_os_initialize1(void)
{
- /*
- * On some machines, a software-initiated SMI causes corruption unless
- * the SMI runs on CPU 0. An SMI can be initiated by any AML, but
- * typically it's done in GPE-related methods that are run via
- * workqueues, so we can avoid the known corruption cases by binding
- * the workqueues to CPU 0.
- */
- kacpid_wq = create_singlethread_workqueue("kacpid");
- bind_workqueue(kacpid_wq);
- kacpi_notify_wq = create_singlethread_workqueue("kacpi_notify");
- bind_workqueue(kacpi_notify_wq);
- kacpi_hotplug_wq = create_singlethread_workqueue("kacpi_hotplug");
- bind_workqueue(kacpi_hotplug_wq);
+ kacpid_wq = create_workqueue("kacpid");
+ kacpi_notify_wq = create_workqueue("kacpi_notify");
+ kacpi_hotplug_wq = create_workqueue("kacpi_hotplug");
BUG_ON(!kacpid_wq);
BUG_ON(!kacpi_notify_wq);
BUG_ON(!kacpi_hotplug_wq);
@@ -759,7 +734,15 @@ static acpi_status __acpi_os_execute(acpi_execute_type type,
(type == OSL_NOTIFY_HANDLER ? kacpi_notify_wq : kacpid_wq);
dpc->wait = hp ? 1 : 0;
INIT_WORK(&dpc->work, acpi_os_execute_deferred);
- ret = queue_work(queue, &dpc->work);
+
+ /*
+ * On some machines, a software-initiated SMI causes corruption unless
+ * the SMI runs on CPU 0. An SMI can be initiated by any AML, but
+ * typically it's done in GPE-related methods that are run via
+ * workqueues, so we can avoid the known corruption cases by always
+ * queueing on CPU 0.
+ */
+ ret = queue_work_on(0, queue, &dpc->work);

if (!ret) {
printk(KERN_ERR PREFIX
--
1.6.4.2

2009-11-16 17:17:58

by Tejun Heo

Subject: [PATCH 10/21] stop_machine: reimplement without using workqueue

stop_machine() is the only user of the RT workqueue. Reimplement it
using kthreads directly and rip RT support out of workqueue. This is
in preparation for concurrency managed workqueue.

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/stop_machine.h | 6 ++
include/linux/workqueue.h | 20 +++---
init/main.c | 2 +
kernel/stop_machine.c | 151 ++++++++++++++++++++++++++++++++++-------
kernel/workqueue.c | 6 --
5 files changed, 142 insertions(+), 43 deletions(-)

diff --git a/include/linux/stop_machine.h b/include/linux/stop_machine.h
index baba3a2..2d32e06 100644
--- a/include/linux/stop_machine.h
+++ b/include/linux/stop_machine.h
@@ -53,6 +53,11 @@ int stop_machine_create(void);
*/
void stop_machine_destroy(void);

+/**
+ * init_stop_machine: initialize stop_machine during boot
+ */
+void init_stop_machine(void);
+
#else

static inline int stop_machine(int (*fn)(void *), void *data,
@@ -67,6 +72,7 @@ static inline int stop_machine(int (*fn)(void *), void *data,

static inline int stop_machine_create(void) { return 0; }
static inline void stop_machine_destroy(void) { }
+static inline void init_stop_machine(void) { }

#endif /* CONFIG_SMP */
#endif /* _LINUX_STOP_MACHINE */
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 9466e86..0697946 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -181,12 +181,11 @@ static inline void destroy_work_on_stack(struct work_struct *work) { }


extern struct workqueue_struct *
-__create_workqueue_key(const char *name, int singlethread,
- int freezeable, int rt, struct lock_class_key *key,
- const char *lock_name);
+__create_workqueue_key(const char *name, int singlethread, int freezeable,
+ struct lock_class_key *key, const char *lock_name);

#ifdef CONFIG_LOCKDEP
-#define __create_workqueue(name, singlethread, freezeable, rt) \
+#define __create_workqueue(name, singlethread, freezeable) \
({ \
static struct lock_class_key __key; \
const char *__lock_name; \
@@ -197,19 +196,18 @@ __create_workqueue_key(const char *name, int singlethread,
__lock_name = #name; \
\
__create_workqueue_key((name), (singlethread), \
- (freezeable), (rt), &__key, \
+ (freezeable), &__key, \
__lock_name); \
})
#else
-#define __create_workqueue(name, singlethread, freezeable, rt) \
- __create_workqueue_key((name), (singlethread), (freezeable), (rt), \
+#define __create_workqueue(name, singlethread, freezeable) \
+ __create_workqueue_key((name), (singlethread), (freezeable), \
NULL, NULL)
#endif

-#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
-#define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)
-#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)
-#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)
+#define create_workqueue(name) __create_workqueue((name), 0, 0)
+#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1)
+#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0)

extern void destroy_workqueue(struct workqueue_struct *wq);

diff --git a/init/main.c b/init/main.c
index 5988deb..9998725 100644
--- a/init/main.c
+++ b/init/main.c
@@ -34,6 +34,7 @@
#include <linux/security.h>
#include <linux/smp.h>
#include <linux/workqueue.h>
+#include <linux/stop_machine.h>
#include <linux/profile.h>
#include <linux/rcupdate.h>
#include <linux/moduleparam.h>
@@ -779,6 +780,7 @@ static void __init do_initcalls(void)
static void __init do_basic_setup(void)
{
init_workqueues();
+ init_stop_machine();
cpuset_init_smp();
usermodehelper_init();
init_tmpfs();
diff --git a/kernel/stop_machine.c b/kernel/stop_machine.c
index 912823e..671a4ac 100644
--- a/kernel/stop_machine.c
+++ b/kernel/stop_machine.c
@@ -25,6 +25,8 @@ enum stopmachine_state {
STOPMACHINE_RUN,
/* Exit */
STOPMACHINE_EXIT,
+ /* Done */
+ STOPMACHINE_DONE,
};
static enum stopmachine_state state;

@@ -42,10 +44,9 @@ static DEFINE_MUTEX(lock);
static DEFINE_MUTEX(setup_lock);
/* Users of stop_machine. */
static int refcount;
-static struct workqueue_struct *stop_machine_wq;
+static struct task_struct **stop_machine_threads;
static struct stop_machine_data active, idle;
static const struct cpumask *active_cpus;
-static void *stop_machine_work;

static void set_state(enum stopmachine_state newstate)
{
@@ -63,14 +64,31 @@ static void ack_state(void)
}

/* This is the actual function which stops the CPU. It runs
- * in the context of a dedicated stopmachine workqueue. */
-static void stop_cpu(struct work_struct *unused)
+ * on dedicated per-cpu kthreads. */
+static int stop_cpu(void *unused)
{
enum stopmachine_state curstate = STOPMACHINE_NONE;
- struct stop_machine_data *smdata = &idle;
+ struct stop_machine_data *smdata;
int cpu = smp_processor_id();
int err;

+repeat:
+ /* Wait for __stop_machine() to initiate */
+ while (true) {
+ set_current_state(TASK_INTERRUPTIBLE);
+ /* <- kthread_stop() and __stop_machine()::smp_wmb() */
+ if (kthread_should_stop()) {
+ __set_current_state(TASK_RUNNING);
+ return 0;
+ }
+ if (state == STOPMACHINE_PREPARE)
+ break;
+ schedule();
+ }
+ smp_rmb(); /* <- __stop_machine()::set_state() */
+
+ /* Okay, let's go */
+ smdata = &idle;
if (!active_cpus) {
if (cpu == cpumask_first(cpu_online_mask))
smdata = &active;
@@ -104,6 +122,7 @@ static void stop_cpu(struct work_struct *unused)
} while (curstate != STOPMACHINE_EXIT);

local_irq_enable();
+ goto repeat;
}

/* Callback for CPUs which aren't supposed to do anything. */
@@ -112,46 +131,122 @@ static int chill(void *unused)
return 0;
}

+static int create_stop_machine_thread(unsigned int cpu)
+{
+ struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
+ struct task_struct **pp = per_cpu_ptr(stop_machine_threads, cpu);
+ struct task_struct *p;
+
+ if (*pp)
+ return -EBUSY;
+
+ p = kthread_create(stop_cpu, NULL, "kstop/%u", cpu);
+ if (IS_ERR(p))
+ return PTR_ERR(p);
+
+ sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
+ *pp = p;
+ return 0;
+}
+
+/* Should be called with cpu hotplug disabled and setup_lock held */
+static void kill_stop_machine_threads(void)
+{
+ unsigned int cpu;
+
+ if (!stop_machine_threads)
+ return;
+
+ for_each_online_cpu(cpu) {
+ struct task_struct *p = *per_cpu_ptr(stop_machine_threads, cpu);
+ if (p)
+ kthread_stop(p);
+ }
+ free_percpu(stop_machine_threads);
+ stop_machine_threads = NULL;
+}
+
int stop_machine_create(void)
{
+ unsigned int cpu;
+
+ get_online_cpus();
mutex_lock(&setup_lock);
if (refcount)
goto done;
- stop_machine_wq = create_rt_workqueue("kstop");
- if (!stop_machine_wq)
- goto err_out;
- stop_machine_work = alloc_percpu(struct work_struct);
- if (!stop_machine_work)
+
+ stop_machine_threads = alloc_percpu(struct task_struct *);
+ if (!stop_machine_threads)
goto err_out;
+
+ /*
+ * cpu hotplug is disabled, create only for online cpus,
+ * cpu_callback() will handle cpu hot [un]plugs.
+ */
+ for_each_online_cpu(cpu) {
+ if (create_stop_machine_thread(cpu))
+ goto err_out;
+ kthread_bind(*per_cpu_ptr(stop_machine_threads, cpu), cpu);
+ }
done:
refcount++;
mutex_unlock(&setup_lock);
+ put_online_cpus();
return 0;

err_out:
- if (stop_machine_wq)
- destroy_workqueue(stop_machine_wq);
+ kill_stop_machine_threads();
mutex_unlock(&setup_lock);
+ put_online_cpus();
return -ENOMEM;
}
EXPORT_SYMBOL_GPL(stop_machine_create);

void stop_machine_destroy(void)
{
+ get_online_cpus();
mutex_lock(&setup_lock);
- refcount--;
- if (refcount)
- goto done;
- destroy_workqueue(stop_machine_wq);
- free_percpu(stop_machine_work);
-done:
+ if (!--refcount)
+ kill_stop_machine_threads();
mutex_unlock(&setup_lock);
+ put_online_cpus();
}
EXPORT_SYMBOL_GPL(stop_machine_destroy);

+static int __cpuinit stop_machine_cpu_callback(struct notifier_block *nfb,
+ unsigned long action, void *hcpu)
+{
+ unsigned int cpu = (unsigned long)hcpu;
+ struct task_struct **pp = per_cpu_ptr(stop_machine_threads, cpu);
+
+ /* Hotplug exclusion is enough, no need to worry about setup_lock */
+ if (!stop_machine_threads)
+ return NOTIFY_OK;
+
+ switch (action & ~CPU_TASKS_FROZEN) {
+ case CPU_UP_PREPARE:
+ if (create_stop_machine_thread(cpu)) {
+ printk(KERN_ERR "failed to create stop machine "
+ "thread for %u\n", cpu);
+ return NOTIFY_BAD;
+ }
+ break;
+
+ case CPU_ONLINE:
+ kthread_bind(*pp, cpu);
+ break;
+
+ case CPU_UP_CANCELED:
+ case CPU_POST_DEAD:
+ kthread_stop(*pp);
+ *pp = NULL;
+ break;
+ }
+ return NOTIFY_OK;
+}
+
int __stop_machine(int (*fn)(void *), void *data, const struct cpumask *cpus)
{
- struct work_struct *sm_work;
int i, ret;

/* Set up initial state. */
@@ -164,19 +259,18 @@ int __stop_machine(int (*fn)(void *), void *data, const struct cpumask *cpus)
idle.fn = chill;
idle.data = NULL;

- set_state(STOPMACHINE_PREPARE);
+ set_state(STOPMACHINE_PREPARE); /* -> stop_cpu()::smp_rmb() */
+ smp_wmb(); /* -> stop_cpu()::set_current_state() */

/* Schedule the stop_cpu work on all cpus: hold this CPU so one
* doesn't hit this CPU until we're ready. */
get_cpu();
- for_each_online_cpu(i) {
- sm_work = per_cpu_ptr(stop_machine_work, i);
- INIT_WORK(sm_work, stop_cpu);
- queue_work_on(i, stop_machine_wq, sm_work);
- }
+ for_each_online_cpu(i)
+ wake_up_process(*per_cpu_ptr(stop_machine_threads, i));
/* This will release the thread on our CPU. */
put_cpu();
- flush_workqueue(stop_machine_wq);
+ while (state < STOPMACHINE_DONE)
+ yield();
ret = active.fnret;
mutex_unlock(&lock);
return ret;
@@ -197,3 +291,8 @@ int stop_machine(int (*fn)(void *), void *data, const struct cpumask *cpus)
return ret;
}
EXPORT_SYMBOL_GPL(stop_machine);
+
+void __init init_stop_machine(void)
+{
+ hotcpu_notifier(stop_machine_cpu_callback, 0);
+}
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index dee4865..3dccec6 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -62,7 +62,6 @@ struct workqueue_struct {
const char *name;
int singlethread;
int freezeable; /* Freeze threads during suspend */
- int rt;
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
@@ -913,7 +912,6 @@ init_cpu_workqueue(struct workqueue_struct *wq, int cpu)

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
- struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
struct workqueue_struct *wq = cwq->wq;
const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
struct task_struct *p;
@@ -929,8 +927,6 @@ static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
*/
if (IS_ERR(p))
return PTR_ERR(p);
- if (cwq->wq->rt)
- sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
cwq->thread = p;

trace_workqueue_creation(cwq->thread, cpu);
@@ -952,7 +948,6 @@ static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
struct workqueue_struct *__create_workqueue_key(const char *name,
int singlethread,
int freezeable,
- int rt,
struct lock_class_key *key,
const char *lock_name)
{
@@ -974,7 +969,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
wq->singlethread = singlethread;
wq->freezeable = freezeable;
- wq->rt = rt;
INIT_LIST_HEAD(&wq->list);

if (singlethread) {
--
1.6.4.2

2009-11-16 17:16:45

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 11/21] workqueue: misc/cosmetic updates

Make the following updates in preparation for concurrency managed
workqueue. None of these changes causes any visible behavior
difference.

* Add comments and adjust indentations to data structures and several
functions.

* Rename wq_per_cpu() to get_cwq() and swap the positions of the two
parameters for consistency. Convert a direct per_cpu_ptr() access
to wq->cpu_wq into a get_cwq() call.

* Add work_static() and update set_wq_data() such that it sets the
flags part to WORK_STRUCT_PENDING | WORK_STRUCT_STATIC (if the work
is statically initialized) | @extra_flags; see the sketch after this
list.

* Move the sanity check on work->entry emptiness from queue_work_on()
to __queue_work(), which all queueing paths share.

* Make __queue_work() take @cpu and @wq instead of @cwq.

* Restructure flush_work() and __create_workqueue_key() to make them
easier to modify.
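
As a rough sketch of the set_wq_data() change (condensed from the
patch below, not a standalone compilable unit), the cwq pointer, the
preserved static bit, the pending bit and the caller's @extra_flags
are now packed into work->data in one go, and get_wq_data() masks the
flag bits back out:

  atomic_long_set(&work->data, (unsigned long)cwq |
          (work_static(work) ? (1UL << WORK_STRUCT_STATIC) : 0) |
          (1UL << WORK_STRUCT_PENDING) | extra_flags);

  cwq = (void *)(atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);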

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 5 ++
kernel/workqueue.c | 127 +++++++++++++++++++++++++++++----------------
2 files changed, 88 insertions(+), 44 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 0697946..ac06c55 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -96,9 +96,14 @@ struct execute_work {
#ifdef CONFIG_DEBUG_OBJECTS_WORK
extern void __init_work(struct work_struct *work, int onstack);
extern void destroy_work_on_stack(struct work_struct *work);
+static inline bool work_static(struct work_struct *work)
+{
+ return test_bit(WORK_STRUCT_STATIC, work_data_bits(work));
+}
#else
static inline void __init_work(struct work_struct *work, int onstack) { }
static inline void destroy_work_on_stack(struct work_struct *work) { }
+static inline bool work_static(struct work_struct *work) { return false; }
#endif

/*
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 3dccec6..e16c457 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -37,6 +37,16 @@
#include <trace/events/workqueue.h>

/*
+ * Structure fields follow one of the following exclusion rules.
+ *
+ * I: Set during initialization and read-only afterwards.
+ *
+ * L: cwq->lock protected. Access with cwq->lock held.
+ *
+ * W: workqueue_lock protected.
+ */
+
+/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu).
*/
@@ -48,8 +58,8 @@ struct cpu_workqueue_struct {
wait_queue_head_t more_work;
struct work_struct *current_work;

- struct workqueue_struct *wq;
- struct task_struct *thread;
+ struct workqueue_struct *wq; /* I: the owning workqueue */
+ struct task_struct *thread;
} ____cacheline_aligned;

/*
@@ -57,13 +67,13 @@ struct cpu_workqueue_struct {
* per-CPU workqueues:
*/
struct workqueue_struct {
- struct cpu_workqueue_struct *cpu_wq;
- struct list_head list;
- const char *name;
+ struct cpu_workqueue_struct *cpu_wq; /* I: cwq's */
+ struct list_head list; /* W: list of all workqueues */
+ const char *name; /* I: workqueue name */
int singlethread;
int freezeable; /* Freeze threads during suspend */
#ifdef CONFIG_LOCKDEP
- struct lockdep_map lockdep_map;
+ struct lockdep_map lockdep_map;
#endif
};

@@ -204,8 +214,8 @@ static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
? cpu_singlethread_map : cpu_populated_map;
}

-static
-struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
+static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
+ struct workqueue_struct *wq)
{
if (unlikely(is_wq_single_threaded(wq)))
cpu = singlethread_cpu;
@@ -217,15 +227,14 @@ struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
* - Must *only* be called if the pending flag is set
*/
static inline void set_wq_data(struct work_struct *work,
- struct cpu_workqueue_struct *cwq)
+ struct cpu_workqueue_struct *cwq,
+ unsigned long extra_flags)
{
- unsigned long new;
-
BUG_ON(!work_pending(work));

- new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
- new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
- atomic_long_set(&work->data, new);
+ atomic_long_set(&work->data, (unsigned long)cwq |
+ (work_static(work) ? (1UL << WORK_STRUCT_STATIC) : 0) |
+ (1UL << WORK_STRUCT_PENDING) | extra_flags);
}

static inline
@@ -234,29 +243,47 @@ struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

+/**
+ * insert_work - insert a work into cwq
+ * @cwq: cwq @work belongs to
+ * @work: work to insert
+ * @head: insertion point
+ * @extra_flags: extra WORK_STRUCT_* flags to set
+ *
+ * Insert @work into @cwq after @head.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
static void insert_work(struct cpu_workqueue_struct *cwq,
- struct work_struct *work, struct list_head *head)
+ struct work_struct *work, struct list_head *head,
+ unsigned int extra_flags)
{
trace_workqueue_insertion(cwq->thread, work);

- set_wq_data(work, cwq);
+ /* we own @work, set data and link */
+ set_wq_data(work, cwq, extra_flags);
+
/*
* Ensure that we get the right work->data if we see the
* result of list_add() below, see try_to_grab_pending().
*/
smp_wmb();
+
list_add_tail(&work->entry, head);
wake_up(&cwq->more_work);
}

-static void __queue_work(struct cpu_workqueue_struct *cwq,
+static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
unsigned long flags;

debug_work_activate(work);
spin_lock_irqsave(&cwq->lock, flags);
- insert_work(cwq, work, &cwq->worklist);
+ BUG_ON(!list_empty(&work->entry));
+ insert_work(cwq, work, &cwq->worklist, 0);
spin_unlock_irqrestore(&cwq->lock, flags);
}

@@ -298,8 +325,7 @@ queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
int ret = 0;

if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
- BUG_ON(!list_empty(&work->entry));
- __queue_work(wq_per_cpu(wq, cpu), work);
+ __queue_work(cpu, wq, work);
ret = 1;
}
return ret;
@@ -310,9 +336,8 @@ static void delayed_work_timer_fn(unsigned long __data)
{
struct delayed_work *dwork = (struct delayed_work *)__data;
struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
- struct workqueue_struct *wq = cwq->wq;

- __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
+ __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
}

/**
@@ -356,7 +381,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
timer_stats_timer_set_start_info(&dwork->timer);

/* This stores cwq for the moment, for the timer_fn */
- set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
+ set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0);
timer->expires = jiffies + delay;
timer->data = (unsigned long)dwork;
timer->function = delayed_work_timer_fn;
@@ -420,6 +445,12 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
spin_unlock_irq(&cwq->lock);
}

+/**
+ * worker_thread - the worker thread function
+ * @__cwq: cwq to serve
+ *
+ * The cwq worker thread function.
+ */
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
@@ -458,6 +489,17 @@ static void wq_barrier_func(struct work_struct *work)
complete(&barr->done);
}

+/**
+ * insert_wq_barrier - insert a barrier work
+ * @cwq: cwq to insert barrier into
+ * @barr: wq_barrier to insert
+ * @head: insertion point
+ *
+ * Insert barrier @barr into @cwq before @head.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
struct wq_barrier *barr, struct list_head *head)
{
@@ -469,11 +511,10 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
*/
INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
-
init_completion(&barr->done);

debug_work_activate(&barr->work);
- insert_work(cwq, &barr->work, head);
+ insert_work(cwq, &barr->work, head, 0);
}

static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
@@ -507,9 +548,6 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
*
* We sleep until all works which were queued on entry have been handled,
* but we are not livelocked by new incoming ones.
- *
- * This function used to run the workqueues itself. Now we just wait for the
- * helper threads to do it.
*/
void flush_workqueue(struct workqueue_struct *wq)
{
@@ -548,7 +586,6 @@ int flush_work(struct work_struct *work)
lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);

- prev = NULL;
spin_lock_irq(&cwq->lock);
if (!list_empty(&work->entry)) {
/*
@@ -557,22 +594,22 @@ int flush_work(struct work_struct *work)
*/
smp_rmb();
if (unlikely(cwq != get_wq_data(work)))
- goto out;
+ goto already_gone;
prev = &work->entry;
} else {
if (cwq->current_work != work)
- goto out;
+ goto already_gone;
prev = &cwq->worklist;
}
insert_wq_barrier(cwq, &barr, prev->next);
-out:
- spin_unlock_irq(&cwq->lock);
- if (!prev)
- return 0;

+ spin_unlock_irq(&cwq->lock);
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
return 1;
+already_gone:
+ spin_unlock_irq(&cwq->lock);
+ return 0;
}
EXPORT_SYMBOL_GPL(flush_work);

@@ -655,7 +692,7 @@ static void wait_on_work(struct work_struct *work)
cpu_map = wq_cpu_map(wq);

for_each_cpu(cpu, cpu_map)
- wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
+ wait_on_cpu_work(get_cwq(cpu, wq), work);
}

static int __cancel_work_timer(struct work_struct *work,
@@ -772,9 +809,7 @@ EXPORT_SYMBOL(schedule_delayed_work);
void flush_delayed_work(struct delayed_work *dwork)
{
if (del_timer_sync(&dwork->timer)) {
- struct cpu_workqueue_struct *cwq;
- cwq = wq_per_cpu(keventd_wq, get_cpu());
- __queue_work(cwq, &dwork->work);
+ __queue_work(get_cpu(), keventd_wq, &dwork->work);
put_cpu();
}
flush_work(&dwork->work);
@@ -957,13 +992,11 @@ struct workqueue_struct *__create_workqueue_key(const char *name,

wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
- return NULL;
+ goto err;

wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
- if (!wq->cpu_wq) {
- kfree(wq);
- return NULL;
- }
+ if (!wq->cpu_wq)
+ goto err;

wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
@@ -1007,6 +1040,12 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
wq = NULL;
}
return wq;
+err:
+ if (wq) {
+ free_percpu(wq->cpu_wq);
+ kfree(wq);
+ }
+ return NULL;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);

--
1.6.4.2

2009-11-16 17:17:09

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 12/21] workqueue: merge feature parameters into flags

Currently, __create_workqueue_key() takes @singlethread and
@freezeable parameters and stores them separately in workqueue_struct.
Merge them into a single flags parameter and field, and use
WQ_FREEZEABLE and WQ_SINGLE_THREAD.
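
As a hedged illustration (the "example" workqueue name is hypothetical
and not from this patch), a raw call site changes roughly like this:

  /* before: separate singlethread and freezeable ints */
  wq = __create_workqueue_key("example", 1, 1, NULL, NULL);

  /* after: one flags bitmask */
  wq = __create_workqueue_key("example",
                              WQ_FREEZEABLE | WQ_SINGLE_THREAD,
                              NULL, NULL);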

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 25 +++++++++++++++----------
kernel/workqueue.c | 17 +++++++----------
2 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index ac06c55..495572a 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -184,13 +184,17 @@ static inline bool work_static(struct work_struct *work) { return false; }
#define work_clear_pending(work) \
clear_bit(WORK_STRUCT_PENDING, work_data_bits(work))

+enum {
+ WQ_FREEZEABLE = 1 << 0, /* freeze during suspend */
+ WQ_SINGLE_THREAD = 1 << 1, /* no per-cpu worker */
+};

extern struct workqueue_struct *
-__create_workqueue_key(const char *name, int singlethread, int freezeable,
+__create_workqueue_key(const char *name, unsigned int flags,
struct lock_class_key *key, const char *lock_name);

#ifdef CONFIG_LOCKDEP
-#define __create_workqueue(name, singlethread, freezeable) \
+#define __create_workqueue(name, flags) \
({ \
static struct lock_class_key __key; \
const char *__lock_name; \
@@ -200,19 +204,20 @@ __create_workqueue_key(const char *name, int singlethread, int freezeable,
else \
__lock_name = #name; \
\
- __create_workqueue_key((name), (singlethread), \
- (freezeable), &__key, \
+ __create_workqueue_key((name), (flags), &__key, \
__lock_name); \
})
#else
-#define __create_workqueue(name, singlethread, freezeable) \
- __create_workqueue_key((name), (singlethread), (freezeable), \
- NULL, NULL)
+#define __create_workqueue(name, flags) \
+ __create_workqueue_key((name), (flags), NULL, NULL)
#endif

-#define create_workqueue(name) __create_workqueue((name), 0, 0)
-#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1)
-#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0)
+#define create_workqueue(name) \
+ __create_workqueue((name), 0)
+#define create_freezeable_workqueue(name) \
+ __create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD)
+#define create_singlethread_workqueue(name) \
+ __create_workqueue((name), WQ_SINGLE_THREAD)

extern void destroy_workqueue(struct workqueue_struct *wq);

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index e16c457..579041f 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -67,11 +67,10 @@ struct cpu_workqueue_struct {
* per-CPU workqueues:
*/
struct workqueue_struct {
+ unsigned int flags; /* I: WQ_* flags */
struct cpu_workqueue_struct *cpu_wq; /* I: cwq's */
struct list_head list; /* W: list of all workqueues */
const char *name; /* I: workqueue name */
- int singlethread;
- int freezeable; /* Freeze threads during suspend */
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
@@ -203,9 +202,9 @@ static const struct cpumask *cpu_singlethread_map __read_mostly;
static cpumask_var_t cpu_populated_map __read_mostly;

/* If it's single threaded, it isn't in the list of workqueues. */
-static inline int is_wq_single_threaded(struct workqueue_struct *wq)
+static inline bool is_wq_single_threaded(struct workqueue_struct *wq)
{
- return wq->singlethread;
+ return wq->flags & WQ_SINGLE_THREAD;
}

static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
@@ -456,7 +455,7 @@ static int worker_thread(void *__cwq)
struct cpu_workqueue_struct *cwq = __cwq;
DEFINE_WAIT(wait);

- if (cwq->wq->freezeable)
+ if (cwq->wq->flags & WQ_FREEZEABLE)
set_freezable();

for (;;) {
@@ -981,8 +980,7 @@ static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
}

struct workqueue_struct *__create_workqueue_key(const char *name,
- int singlethread,
- int freezeable,
+ unsigned int flags,
struct lock_class_key *key,
const char *lock_name)
{
@@ -998,13 +996,12 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
if (!wq->cpu_wq)
goto err;

+ wq->flags = flags;
wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
- wq->singlethread = singlethread;
- wq->freezeable = freezeable;
INIT_LIST_HEAD(&wq->list);

- if (singlethread) {
+ if (flags & WQ_SINGLE_THREAD) {
cwq = init_cpu_workqueue(wq, singlethread_cpu);
err = create_workqueue_thread(cwq, singlethread_cpu);
start_workqueue_thread(cwq, -1);
--
1.6.4.2

2009-11-16 17:19:56

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 13/21] workqueue: update cwq alignment and make one more flag bit available

Currently cwqs are cacheline-aligned and the lower two bits of
work_struct->data are considered available and used as flags. Make
the alignment requirement official by defining WORK_STRUCT_FLAG_BITS
and aligning cwqs to 1 << WORK_STRUCT_FLAG_BITS bytes.

This is in preparation for concurrency managed workqueue, where
cacheline alignment of cwqs won't matter as much. While at it, this
patch reserves one more bit for work flags and makes sure the
resulting alignment is at least as large as that of long long.
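
The trick being formalized here can be demonstrated in isolation. The
following is a self-contained userspace sketch (illustration only, not
kernel code): with 1 << FLAG_BITS alignment, the low FLAG_BITS bits of
a pointer are guaranteed to be zero and can carry flags:

  #include <assert.h>
  #include <stdint.h>
  #include <stdio.h>

  #define FLAG_BITS 3
  #define FLAG_MASK ((1UL << FLAG_BITS) - 1)

  struct cwq { long dummy; } __attribute__((aligned(1 << FLAG_BITS)));

  int main(void)
  {
          static struct cwq c;
          uintptr_t data = (uintptr_t)&c;

          assert((data & FLAG_MASK) == 0);        /* low bits are free */
          data |= 0x5;                            /* stash flag bits */
          printf("flags=%#lx ptr_intact=%d\n",
                 (unsigned long)(data & FLAG_MASK),
                 (void *)(data & ~FLAG_MASK) == (void *)&c);
          return 0;
  }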

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 19 +++++++++++++++----
kernel/workqueue.c | 15 +++++++++++++--
2 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 495572a..5ff8c44 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -22,12 +22,23 @@ typedef void (*work_func_t)(struct work_struct *work);
*/
#define work_data_bits(work) ((unsigned long *)(&(work)->data))

+enum {
+ WORK_STRUCT_PENDING = 0, /* work item is pending execution */
+ WORK_STRUCT_STATIC = 1, /* static initializer (debugobjects) */
+
+ /*
+ * Reserve 3bits off of cwq pointer. This is enough and
+ * provides acceptable alignment on both 32 and 64bit
+ * machines.
+ */
+ WORK_STRUCT_FLAG_BITS = 3,
+
+ WORK_STRUCT_FLAG_MASK = (1UL << WORK_STRUCT_FLAG_BITS) - 1,
+ WORK_STRUCT_WQ_DATA_MASK = ~WORK_STRUCT_FLAG_MASK,
+};
+
struct work_struct {
atomic_long_t data;
-#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */
-#define WORK_STRUCT_STATIC 1 /* static initializer (debugobjects) */
-#define WORK_STRUCT_FLAG_MASK (3UL)
-#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
struct list_head entry;
work_func_t func;
#ifdef CONFIG_LOCKDEP
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 579041f..f30977f 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -48,7 +48,9 @@

/*
* The per-CPU workqueue (if single thread, we always use the first
- * possible cpu).
+ * possible cpu). The lower WORK_STRUCT_FLAG_BITS of
+ * work_struct->data are used for flags and thus cwqs need to be
+ * aligned at two's power of the number of flag bits.
*/
struct cpu_workqueue_struct {

@@ -60,7 +62,7 @@ struct cpu_workqueue_struct {

struct workqueue_struct *wq; /* I: the owning workqueue */
struct task_struct *thread;
-} ____cacheline_aligned;
+} __attribute__((aligned(1 << WORK_STRUCT_FLAG_BITS)));

/*
* The externally visible workqueue abstraction is an array of
@@ -1198,6 +1200,15 @@ EXPORT_SYMBOL_GPL(work_on_cpu);

void __init init_workqueues(void)
{
+ /*
+ * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
+ * Make sure that the alignment isn't lower than that of
+ * unsigned long long in case this code survives for longer
+ * than twenty years. :-P
+ */
+ BUILD_BUG_ON(__alignof__(struct cpu_workqueue_struct) <
+ __alignof__(unsigned long long));
+
alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);

cpumask_copy(cpu_populated_map, cpu_online_mask);
--
1.6.4.2

2009-11-16 17:17:00

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 14/21] workqueue: define both bit position and mask for work flags

Work flags are about to see more traditional mask handling. Define
WORK_STRUCT_*_BIT as bit position constants and redefine
WORK_STRUCT_* as bit masks.
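
A quick sketch of why both forms are needed (lines condensed from the
patch below, not standalone code): the atomic bitops want a bit
number, while flag arithmetic on work->data wants a mask:

  /* bit number for test_and_set_bit() and friends */
  if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
          __queue_work(cpu, wq, work);

  /* mask form for building work->data */
  atomic_long_set(&work->data, (unsigned long)cwq |
                  (work_static(work) ? WORK_STRUCT_STATIC : 0) |
                  WORK_STRUCT_PENDING | extra_flags);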

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 15 +++++++++------
kernel/workqueue.c | 14 +++++++-------
2 files changed, 16 insertions(+), 13 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 5ff8c44..8e689d1 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -23,8 +23,11 @@ typedef void (*work_func_t)(struct work_struct *work);
#define work_data_bits(work) ((unsigned long *)(&(work)->data))

enum {
- WORK_STRUCT_PENDING = 0, /* work item is pending execution */
- WORK_STRUCT_STATIC = 1, /* static initializer (debugobjects) */
+ WORK_STRUCT_PENDING_BIT = 0, /* work item is pending execution */
+ WORK_STRUCT_STATIC_BIT = 1, /* static initializer (debugobjects) */
+
+ WORK_STRUCT_PENDING = 1 << WORK_STRUCT_PENDING_BIT,
+ WORK_STRUCT_STATIC = 1 << WORK_STRUCT_STATIC_BIT,

/*
* Reserve 3bits off of cwq pointer. This is enough and
@@ -47,7 +50,7 @@ struct work_struct {
};

#define WORK_DATA_INIT() ATOMIC_LONG_INIT(0)
-#define WORK_DATA_STATIC_INIT() ATOMIC_LONG_INIT(2)
+#define WORK_DATA_STATIC_INIT() ATOMIC_LONG_INIT(WORK_STRUCT_STATIC)

struct delayed_work {
struct work_struct work;
@@ -109,7 +112,7 @@ extern void __init_work(struct work_struct *work, int onstack);
extern void destroy_work_on_stack(struct work_struct *work);
static inline bool work_static(struct work_struct *work)
{
- return test_bit(WORK_STRUCT_STATIC, work_data_bits(work));
+ return test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work));
}
#else
static inline void __init_work(struct work_struct *work, int onstack) { }
@@ -178,7 +181,7 @@ static inline bool work_static(struct work_struct *work) { return false; }
* @work: The work item in question
*/
#define work_pending(work) \
- test_bit(WORK_STRUCT_PENDING, work_data_bits(work))
+ test_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))

/**
* delayed_work_pending - Find out whether a delayable work item is currently
@@ -193,7 +196,7 @@ static inline bool work_static(struct work_struct *work) { return false; }
* @work: The work item in question
*/
#define work_clear_pending(work) \
- clear_bit(WORK_STRUCT_PENDING, work_data_bits(work))
+ clear_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))

enum {
WQ_FREEZEABLE = 1 << 0, /* freeze during suspend */
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index f30977f..0083da6 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -117,7 +117,7 @@ static int work_fixup_activate(void *addr, enum debug_obj_state state)
* statically initialized. We just make sure that it
* is tracked in the object tracker.
*/
- if (test_bit(WORK_STRUCT_STATIC, work_data_bits(work))) {
+ if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
debug_object_init(work, &work_debug_descr);
debug_object_activate(work, &work_debug_descr);
return 0;
@@ -234,8 +234,8 @@ static inline void set_wq_data(struct work_struct *work,
BUG_ON(!work_pending(work));

atomic_long_set(&work->data, (unsigned long)cwq |
- (work_static(work) ? (1UL << WORK_STRUCT_STATIC) : 0) |
- (1UL << WORK_STRUCT_PENDING) | extra_flags);
+ (work_static(work) ? WORK_STRUCT_STATIC : 0) |
+ WORK_STRUCT_PENDING | extra_flags);
}

static inline
@@ -325,7 +325,7 @@ queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
int ret = 0;

- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
__queue_work(cpu, wq, work);
ret = 1;
}
@@ -375,7 +375,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct timer_list *timer = &dwork->timer;
struct work_struct *work = &dwork->work;

- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry));

@@ -511,7 +511,7 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
* might deadlock.
*/
INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
- __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
+ __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
init_completion(&barr->done);

debug_work_activate(&barr->work);
@@ -623,7 +623,7 @@ static int try_to_grab_pending(struct work_struct *work)
struct cpu_workqueue_struct *cwq;
int ret = -1;

- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
return 0;

/*
--
1.6.4.2

2009-11-16 17:17:40

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 15/21] workqueue: separate out process_one_work()

Separate process_one_work() out of run_workqueue(). This patch
doesn't cause any behavior change.

Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 100 +++++++++++++++++++++++++++++++--------------------
1 files changed, 61 insertions(+), 39 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 0083da6..d8ecc40 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -397,51 +397,73 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

+/**
+ * process_one_work - process single work
+ * @cwq: cwq to process work for
+ * @work: work to process
+ *
+ * Process @work. This function contains all the logics necessary to
+ * process a single work including synchronization against and
+ * interaction with other workers on the same cpu, queueing and
+ * flushing. As long as context requirement is met, any worker can
+ * call this function to process a work.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock) which is released and regrabbed.
+ */
+static void process_one_work(struct cpu_workqueue_struct *cwq,
+ struct work_struct *work)
+{
+ work_func_t f = work->func;
+#ifdef CONFIG_LOCKDEP
+ /*
+ * It is permissible to free the struct work_struct from
+ * inside the function that is called from it, this we need to
+ * take into account for lockdep too. To avoid bogus "held
+ * lock freed" warnings as well as problems when looking into
+ * work->lockdep_map, make a copy and use that here.
+ */
+ struct lockdep_map lockdep_map = work->lockdep_map;
+#endif
+ /* claim and process */
+ trace_workqueue_execution(cwq->thread, work);
+ debug_work_deactivate(work);
+ cwq->current_work = work;
+ list_del_init(&work->entry);
+
+ spin_unlock_irq(&cwq->lock);
+
+ BUG_ON(get_wq_data(work) != cwq);
+ work_clear_pending(work);
+ lock_map_acquire(&cwq->wq->lockdep_map);
+ lock_map_acquire(&lockdep_map);
+ f(work);
+ lock_map_release(&lockdep_map);
+ lock_map_release(&cwq->wq->lockdep_map);
+
+ if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
+ printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
+ "%s/0x%08x/%d\n",
+ current->comm, preempt_count(), task_pid_nr(current));
+ printk(KERN_ERR " last function: ");
+ print_symbol("%s\n", (unsigned long)f);
+ debug_show_held_locks(current);
+ dump_stack();
+ }
+
+ spin_lock_irq(&cwq->lock);
+
+ /* we're done with it, release */
+ cwq->current_work = NULL;
+}
+
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
spin_lock_irq(&cwq->lock);
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
- work_func_t f = work->func;
-#ifdef CONFIG_LOCKDEP
- /*
- * It is permissible to free the struct work_struct
- * from inside the function that is called from it,
- * this we need to take into account for lockdep too.
- * To avoid bogus "held lock freed" warnings as well
- * as problems when looking into work->lockdep_map,
- * make a copy and use that here.
- */
- struct lockdep_map lockdep_map = work->lockdep_map;
-#endif
- trace_workqueue_execution(cwq->thread, work);
- debug_work_deactivate(work);
- cwq->current_work = work;
- list_del_init(cwq->worklist.next);
- spin_unlock_irq(&cwq->lock);
-
- BUG_ON(get_wq_data(work) != cwq);
- work_clear_pending(work);
- lock_map_acquire(&cwq->wq->lockdep_map);
- lock_map_acquire(&lockdep_map);
- f(work);
- lock_map_release(&lockdep_map);
- lock_map_release(&cwq->wq->lockdep_map);
-
- if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
- printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
- "%s/0x%08x/%d\n",
- current->comm, preempt_count(),
- task_pid_nr(current));
- printk(KERN_ERR " last function: ");
- print_symbol("%s\n", (unsigned long)f);
- debug_show_held_locks(current);
- dump_stack();
- }
-
- spin_lock_irq(&cwq->lock);
- cwq->current_work = NULL;
+ process_one_work(cwq, work);
}
spin_unlock_irq(&cwq->lock);
}
--
1.6.4.2

2009-11-16 17:17:26

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 16/21] workqueue: temporarily disable workqueue tracing

Strip tracing code from workqueue and disable workqueue tracing. This
is a temporary measure until concurrency managed workqueue is complete.

Signed-off-by: Tejun Heo <[email protected]>
---
kernel/trace/Kconfig | 4 +++-
kernel/workqueue.c | 14 +++-----------
2 files changed, 6 insertions(+), 12 deletions(-)

diff --git a/kernel/trace/Kconfig b/kernel/trace/Kconfig
index b416512..0e14ecf 100644
--- a/kernel/trace/Kconfig
+++ b/kernel/trace/Kconfig
@@ -393,7 +393,9 @@ config KMEMTRACE
If unsure, say N.

config WORKQUEUE_TRACER
- bool "Trace workqueues"
+# Temporarily disabled during workqueue reimplementation
+# bool "Trace workqueues"
+ def_bool n
select GENERIC_TRACER
help
The workqueue tracer provides some statistical informations
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index d8ecc40..5392939 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -33,8 +33,6 @@
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
-#define CREATE_TRACE_POINTS
-#include <trace/events/workqueue.h>

/*
* Structure fields follow one of the following exclusion rules.
@@ -238,10 +236,10 @@ static inline void set_wq_data(struct work_struct *work,
WORK_STRUCT_PENDING | extra_flags);
}

-static inline
-struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
+static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
- return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
+ return (void *)(atomic_long_read(&work->data) &
+ WORK_STRUCT_WQ_DATA_MASK);
}

/**
@@ -260,8 +258,6 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work, struct list_head *head,
unsigned int extra_flags)
{
- trace_workqueue_insertion(cwq->thread, work);
-
/* we own @work, set data and link */
set_wq_data(work, cwq, extra_flags);

@@ -426,7 +422,6 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
struct lockdep_map lockdep_map = work->lockdep_map;
#endif
/* claim and process */
- trace_workqueue_execution(cwq->thread, work);
debug_work_deactivate(work);
cwq->current_work = work;
list_del_init(&work->entry);
@@ -987,8 +982,6 @@ static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
return PTR_ERR(p);
cwq->thread = p;

- trace_workqueue_creation(cwq->thread, cpu);
-
return 0;
}

@@ -1093,7 +1086,6 @@ static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
* checks list_empty(), and a "normal" queue_work() can't use
* a dead CPU.
*/
- trace_workqueue_destruction(cwq->thread);
kthread_stop(cwq->thread);
cwq->thread = NULL;
}
--
1.6.4.2

2009-11-16 17:17:26

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue

SINGLE_THREAD workqueues are used to reduce the number of worker
threads and to ease synchronization. The first reason will become
irrelevant with the concurrency managed workqueue implementation.
Simplify the SINGLE_THREAD implementation by creating these workqueues
the same way as multi-threaded ones but making the worker grab a
per-workqueue mutex before actually executing works on the workqueue.
In the long run, most SINGLE_THREAD workqueues will be replaced with
generic ones.
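
The core of the change in process_one_work() looks like the following
(condensed from the patch below, not standalone code); every CPU keeps
its own worker, but works on a WQ_SINGLE_THREAD workqueue serialize on
the per-workqueue mutex while executing:

  if (unlikely(single_thread)) {
          mutex_lock(&wq->single_thread_mutex);
          f(work);
          mutex_unlock(&wq->single_thread_mutex);
  } else
          f(work);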

Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 151 ++++++++++++++++++----------------------------------
1 files changed, 52 insertions(+), 99 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 5392939..82b03a1 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -33,6 +33,7 @@
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
+#include <linux/mutex.h>

/*
* Structure fields follow one of the following exclusion rules.
@@ -71,6 +72,7 @@ struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq; /* I: cwq's */
struct list_head list; /* W: list of all workqueues */
const char *name; /* I: workqueue name */
+ struct mutex single_thread_mutex; /* for SINGLE_THREAD wq */
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
@@ -190,34 +192,9 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

-static int singlethread_cpu __read_mostly;
-static const struct cpumask *cpu_singlethread_map __read_mostly;
-/*
- * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
- * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
- * which comes in between can't use for_each_online_cpu(). We could
- * use cpu_possible_map, the cpumask below is more a documentation
- * than optimization.
- */
-static cpumask_var_t cpu_populated_map __read_mostly;
-
-/* If it's single threaded, it isn't in the list of workqueues. */
-static inline bool is_wq_single_threaded(struct workqueue_struct *wq)
-{
- return wq->flags & WQ_SINGLE_THREAD;
-}
-
-static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
-{
- return is_wq_single_threaded(wq)
- ? cpu_singlethread_map : cpu_populated_map;
-}
-
static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
struct workqueue_struct *wq)
{
- if (unlikely(is_wq_single_threaded(wq)))
- cpu = singlethread_cpu;
return per_cpu_ptr(wq->cpu_wq, cpu);
}

@@ -410,6 +387,8 @@ EXPORT_SYMBOL_GPL(queue_delayed_work_on);
static void process_one_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{
+ struct workqueue_struct *wq = cwq->wq;
+ bool single_thread = wq->flags & WQ_SINGLE_THREAD;
work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
/*
@@ -430,11 +409,18 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,

BUG_ON(get_wq_data(work) != cwq);
work_clear_pending(work);
- lock_map_acquire(&cwq->wq->lockdep_map);
+ lock_map_acquire(&wq->lockdep_map);
lock_map_acquire(&lockdep_map);
- f(work);
+
+ if (unlikely(single_thread)) {
+ mutex_lock(&wq->single_thread_mutex);
+ f(work);
+ mutex_unlock(&wq->single_thread_mutex);
+ } else
+ f(work);
+
lock_map_release(&lockdep_map);
- lock_map_release(&cwq->wq->lockdep_map);
+ lock_map_release(&wq->lockdep_map);

if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
@@ -569,14 +555,13 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
*/
void flush_workqueue(struct workqueue_struct *wq)
{
- const struct cpumask *cpu_map = wq_cpu_map(wq);
int cpu;

might_sleep();
lock_map_acquire(&wq->lockdep_map);
lock_map_release(&wq->lockdep_map);
- for_each_cpu(cpu, cpu_map)
- flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
+ for_each_possible_cpu(cpu)
+ flush_cpu_workqueue(get_cwq(cpu, wq));
}
EXPORT_SYMBOL_GPL(flush_workqueue);

@@ -694,7 +679,6 @@ static void wait_on_work(struct work_struct *work)
{
struct cpu_workqueue_struct *cwq;
struct workqueue_struct *wq;
- const struct cpumask *cpu_map;
int cpu;

might_sleep();
@@ -707,9 +691,8 @@ static void wait_on_work(struct work_struct *work)
return;

wq = cwq->wq;
- cpu_map = wq_cpu_map(wq);

- for_each_cpu(cpu, cpu_map)
+ for_each_possible_cpu(cpu)
wait_on_cpu_work(get_cwq(cpu, wq), work);
}

@@ -942,7 +925,7 @@ int current_is_keventd(void)

BUG_ON(!keventd_wq);

- cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
+ cwq = get_cwq(cpu, keventd_wq);
if (current == cwq->thread)
ret = 1;

@@ -950,26 +933,12 @@ int current_is_keventd(void)

}

-static struct cpu_workqueue_struct *
-init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
-{
- struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-
- cwq->wq = wq;
- spin_lock_init(&cwq->lock);
- INIT_LIST_HEAD(&cwq->worklist);
- init_waitqueue_head(&cwq->more_work);
-
- return cwq;
-}
-
static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct workqueue_struct *wq = cwq->wq;
- const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
struct task_struct *p;

- p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
+ p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
/*
* Nobody can add the work_struct to this cwq,
* if (caller is __create_workqueue)
@@ -1002,7 +971,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
const char *lock_name)
{
struct workqueue_struct *wq;
- struct cpu_workqueue_struct *cwq;
int err = 0, cpu;

wq = kzalloc(sizeof(*wq), GFP_KERNEL);
@@ -1015,39 +983,40 @@ struct workqueue_struct *__create_workqueue_key(const char *name,

wq->flags = flags;
wq->name = name;
+ mutex_init(&wq->single_thread_mutex);
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);

- if (flags & WQ_SINGLE_THREAD) {
- cwq = init_cpu_workqueue(wq, singlethread_cpu);
- err = create_workqueue_thread(cwq, singlethread_cpu);
- start_workqueue_thread(cwq, -1);
- } else {
- cpu_maps_update_begin();
- /*
- * We must place this wq on list even if the code below fails.
- * cpu_down(cpu) can remove cpu from cpu_populated_map before
- * destroy_workqueue() takes the lock, in that case we leak
- * cwq[cpu]->thread.
- */
- spin_lock(&workqueue_lock);
- list_add(&wq->list, &workqueues);
- spin_unlock(&workqueue_lock);
- /*
- * We must initialize cwqs for each possible cpu even if we
- * are going to call destroy_workqueue() finally. Otherwise
- * cpu_up() can hit the uninitialized cwq once we drop the
- * lock.
- */
- for_each_possible_cpu(cpu) {
- cwq = init_cpu_workqueue(wq, cpu);
- if (err || !cpu_online(cpu))
- continue;
- err = create_workqueue_thread(cwq, cpu);
- start_workqueue_thread(cwq, cpu);
- }
- cpu_maps_update_done();
+ cpu_maps_update_begin();
+ /*
+ * We must place this wq on list even if the code below fails.
+ * cpu_down(cpu) can remove cpu from cpu_populated_map before
+ * destroy_workqueue() takes the lock, in that case we leak
+ * cwq[cpu]->thread.
+ */
+ spin_lock(&workqueue_lock);
+ list_add(&wq->list, &workqueues);
+ spin_unlock(&workqueue_lock);
+ /*
+ * We must initialize cwqs for each possible cpu even if we
+ * are going to call destroy_workqueue() finally. Otherwise
+ * cpu_up() can hit the uninitialized cwq once we drop the
+ * lock.
+ */
+ for_each_possible_cpu(cpu) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ cwq->wq = wq;
+ spin_lock_init(&cwq->lock);
+ INIT_LIST_HEAD(&cwq->worklist);
+ init_waitqueue_head(&cwq->more_work);
+
+ if (err || !cpu_online(cpu))
+ continue;
+ err = create_workqueue_thread(cwq, cpu);
+ start_workqueue_thread(cwq, cpu);
}
+ cpu_maps_update_done();

if (err) {
destroy_workqueue(wq);
@@ -1098,7 +1067,6 @@ static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
*/
void destroy_workqueue(struct workqueue_struct *wq)
{
- const struct cpumask *cpu_map = wq_cpu_map(wq);
int cpu;

cpu_maps_update_begin();
@@ -1106,8 +1074,8 @@ void destroy_workqueue(struct workqueue_struct *wq)
list_del(&wq->list);
spin_unlock(&workqueue_lock);

- for_each_cpu(cpu, cpu_map)
- cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
+ for_each_possible_cpu(cpu)
+ cleanup_workqueue_thread(get_cwq(cpu, wq));
cpu_maps_update_done();

free_percpu(wq->cpu_wq);
@@ -1126,13 +1094,9 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,

action &= ~CPU_TASKS_FROZEN;

- switch (action) {
- case CPU_UP_PREPARE:
- cpumask_set_cpu(cpu, cpu_populated_map);
- }
undo:
list_for_each_entry(wq, &workqueues, list) {
- cwq = per_cpu_ptr(wq->cpu_wq, cpu);
+ cwq = get_cwq(cpu, wq);

switch (action) {
case CPU_UP_PREPARE:
@@ -1156,12 +1120,6 @@ undo:
}
}

- switch (action) {
- case CPU_UP_CANCELED:
- case CPU_POST_DEAD:
- cpumask_clear_cpu(cpu, cpu_populated_map);
- }
-
return ret;
}

@@ -1223,11 +1181,6 @@ void __init init_workqueues(void)
BUILD_BUG_ON(__alignof__(struct cpu_workqueue_struct) <
__alignof__(unsigned long long));

- alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
-
- cpumask_copy(cpu_populated_map, cpu_online_mask);
- singlethread_cpu = cpumask_first(cpu_possible_mask);
- cpu_singlethread_map = cpumask_of(singlethread_cpu);
hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events");
BUG_ON(!keventd_wq);
--
1.6.4.2

2009-11-16 17:17:22

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 18/21] workqueue: reimplement workqueue flushing using color coded works

Reimplement workqueue flushing using color-coded works. wq has the
current work color which is painted on the works being issued via
cwqs. Flushing a workqueue is achieved by advancing the current work
colors of cwqs and waiting for all the works which have any of the
previous colors to drain.

Currently there are 16 colors allowing 15 concurrent flushes. When
the color space gets full, flush attempts are batched up and processed
together when a color frees up, so even with many concurrent flushers,
the new implementation won't build up a huge queue of flushers which
have to be processed one after another.

This new implementation leaves only cleanup_workqueue_thread() as the
user of flush_cpu_workqueue(). Just make its users use
flush_workqueue() and kthread_stop() directly and kill
cleanup_workqueue_thread(). As workqueue flushing no longer uses a
barrier request, the comment describing the complex synchronization
around it in cleanup_workqueue_thread() is removed together with the
function.

This new implementation is meant to allow having and sharing multiple
workers per cpu.
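
Condensed from the patch below (not standalone code), the color
mechanics boil down to painting the cwq's current color onto each
queued work and advancing colors modulo WORK_COLORS:

  /* queueing: count the work and paint the current color on it */
  cwq->nr_in_flight[cwq->work_color]++;
  set_wq_data(work, cwq,
              work_color_to_flags(cwq->work_color) | extra_flags);

  /* flushing: the current work color becomes the flush color and
   * the work color advances by one, wrapping at WORK_COLORS (16) */
  next_color = work_next_color(wq->work_color);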

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 13 +-
kernel/workqueue.c | 354 ++++++++++++++++++++++++++++++++++++++-------
2 files changed, 313 insertions(+), 54 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 8e689d1..e1428e5 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -29,12 +29,17 @@ enum {
WORK_STRUCT_PENDING = 1 << WORK_STRUCT_PENDING_BIT,
WORK_STRUCT_STATIC = 1 << WORK_STRUCT_STATIC_BIT,

+ WORK_STRUCT_COLOR_SHIFT = 3, /* color for workqueue flushing */
+ WORK_STRUCT_COLOR_BITS = 4,
+
+ WORK_COLORS = 1 << WORK_STRUCT_COLOR_BITS,
+
/*
- * Reserve 3bits off of cwq pointer. This is enough and
- * provides acceptable alignment on both 32 and 64bit
- * machines.
+ * Reserve 7 bits off of cwq pointer. This makes cwqs aligned
+ * to 128 bytes which isn't too excessive while allowing 16
+ * workqueue flush colors.
*/
- WORK_STRUCT_FLAG_BITS = 3,
+ WORK_STRUCT_FLAG_BITS = 7,

WORK_STRUCT_FLAG_MASK = (1UL << WORK_STRUCT_FLAG_BITS) - 1,
WORK_STRUCT_WQ_DATA_MASK = ~WORK_STRUCT_FLAG_MASK,
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 82b03a1..dce5ad5 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -42,6 +42,8 @@
*
* L: cwq->lock protected. Access with cwq->lock held.
*
+ * F: wq->flush_mutex protected.
+ *
* W: workqueue_lock protected.
*/

@@ -60,10 +62,23 @@ struct cpu_workqueue_struct {
struct work_struct *current_work;

struct workqueue_struct *wq; /* I: the owning workqueue */
+ int work_color; /* L: current color */
+ int flush_color; /* L: flushing color */
+ int nr_in_flight[WORK_COLORS];
+ /* L: nr of in_flight works */
struct task_struct *thread;
} __attribute__((aligned(1 << WORK_STRUCT_FLAG_BITS)));

/*
+ * Structure used to wait for workqueue flush.
+ */
+struct wq_flusher {
+ struct list_head list; /* F: list of flushers */
+ int flush_color; /* F: flush color waiting for */
+ struct completion done; /* flush completion */
+};
+
+/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
@@ -71,6 +86,15 @@ struct workqueue_struct {
unsigned int flags; /* I: WQ_* flags */
struct cpu_workqueue_struct *cpu_wq; /* I: cwq's */
struct list_head list; /* W: list of all workqueues */
+
+ struct mutex flush_mutex; /* protects wq flushing */
+ int work_color; /* F: current work color */
+ int flush_color; /* F: current flush color */
+ atomic_t nr_cwqs_to_flush; /* flush in progress */
+ struct wq_flusher *first_flusher; /* F: first flusher */
+ struct list_head flusher_queue; /* F: flush waiters */
+ struct list_head flusher_overflow; /* F: flush overflow list */
+
const char *name; /* I: workqueue name */
struct mutex single_thread_mutex; /* for SINGLE_THREAD wq */
#ifdef CONFIG_LOCKDEP
@@ -198,6 +222,22 @@ static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
return per_cpu_ptr(wq->cpu_wq, cpu);
}

+static unsigned int work_color_to_flags(int color)
+{
+ return color << WORK_STRUCT_COLOR_SHIFT;
+}
+
+static int work_flags_to_color(unsigned int flags)
+{
+ return (flags >> WORK_STRUCT_COLOR_SHIFT) &
+ ((1 << WORK_STRUCT_COLOR_BITS) - 1);
+}
+
+static int work_next_color(int color)
+{
+ return (color + 1) % WORK_COLORS;
+}
+
/*
* Set the workqueue on which a work item is to be run
* - Must *only* be called if the pending flag is set
@@ -235,8 +275,11 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work, struct list_head *head,
unsigned int extra_flags)
{
+ cwq->nr_in_flight[cwq->work_color]++;
+
/* we own @work, set data and link */
- set_wq_data(work, cwq, extra_flags);
+ set_wq_data(work, cwq,
+ work_color_to_flags(cwq->work_color) | extra_flags);

/*
* Ensure that we get the right work->data if we see the
@@ -371,6 +414,41 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

/**
+ * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
+ * @cwq: cwq of interest
+ * @color: color of work which left the queue
+ *
+ * A work either has completed or is removed from pending queue,
+ * decrement nr_in_flight of its cwq and handle workqueue flushing.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq,
+ unsigned int color)
+{
+ cwq->nr_in_flight[color]--;
+
+ /* is flush in progress and are we at the flushing tip? */
+ if (likely(cwq->flush_color != color))
+ return;
+
+ /* are there still in-flight works? */
+ if (cwq->nr_in_flight[color])
+ return;
+
+ /* this cwq is done, clear flush_color */
+ cwq->flush_color = -1;
+
+ /*
+ * If this was the last cwq, wake up the first flusher. It
+ * will handle the rest.
+ */
+ if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
+ complete(&cwq->wq->first_flusher->done);
+}
+
+/**
* process_one_work - process single work
* @cwq: cwq to process work for
* @work: work to process
@@ -390,6 +468,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
struct workqueue_struct *wq = cwq->wq;
bool single_thread = wq->flags & WQ_SINGLE_THREAD;
work_func_t f = work->func;
+ int work_color;
#ifdef CONFIG_LOCKDEP
/*
* It is permissible to free the struct work_struct from
@@ -403,6 +482,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
/* claim and process */
debug_work_deactivate(work);
cwq->current_work = work;
+ work_color = work_flags_to_color(*work_data_bits(work));
list_del_init(&work->entry);

spin_unlock_irq(&cwq->lock);
@@ -436,6 +516,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,

/* we're done with it, release */
cwq->current_work = NULL;
+ cwq_dec_nr_in_flight(cwq, work_color);
}

static void run_workqueue(struct cpu_workqueue_struct *cwq)
@@ -521,26 +602,69 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
insert_work(cwq, &barr->work, head, 0);
}

-static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
+/**
+ * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
+ * @wq: workqueue being flushed
+ * @flush_color: new flush color, < 0 for no-op
+ * @work_color: new work color, < 0 for no-op
+ *
+ * Prepare cwqs for workqueue flushing.
+ *
+ * If @flush_color is non-negative, flush_color on all cwqs should be
+ * -1. If no cwq has in-flight commands at the specified color, all
+ * cwq->flush_color's stay at -1 and %false is returned. If any cwq
+ * has in flight commands, its cwq->flush_color is set to
+ * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
+ * wakeup logic is armed and %true is returned.
+ *
+ * The caller should have initialized @wq->first_flusher prior to
+ * calling this function with non-negative @flush_color. If
+ * @flush_color is negative, no flush color update is done and %false
+ * is returned.
+ *
+ * If @work_color is non-negative, all cwqs should have the same
+ * work_color which is previous to @work_color and all will be
+ * advanced to @work_color.
+ *
+ * CONTEXT:
+ * mutex_lock(wq->flush_mutex).
+ *
+ * RETURNS:
+ * %true if @flush_color >= 0 and wakeup logic is armed. %false
+ * otherwise.
+ */
+static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
+ int flush_color, int work_color)
{
- int active = 0;
- struct wq_barrier barr;
+ bool wait = false;
+ unsigned int cpu;

- WARN_ON(cwq->thread == current);
+ BUG_ON(flush_color >= 0 && atomic_read(&wq->nr_cwqs_to_flush));

- spin_lock_irq(&cwq->lock);
- if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
- insert_wq_barrier(cwq, &barr, &cwq->worklist);
- active = 1;
- }
- spin_unlock_irq(&cwq->lock);
+ for_each_possible_cpu(cpu) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

- if (active) {
- wait_for_completion(&barr.done);
- destroy_work_on_stack(&barr.work);
+ spin_lock_irq(&cwq->lock);
+
+ if (flush_color >= 0) {
+ BUG_ON(cwq->flush_color != -1);
+
+ if (cwq->nr_in_flight[flush_color]) {
+ cwq->flush_color = flush_color;
+ atomic_inc(&wq->nr_cwqs_to_flush);
+ wait = true;
+ }
+ }
+
+ if (work_color >= 0) {
+ BUG_ON(work_color != work_next_color(cwq->work_color));
+ cwq->work_color = work_color;
+ }
+
+ spin_unlock_irq(&cwq->lock);
}

- return active;
+ return wait;
}

/**
@@ -555,13 +679,144 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
*/
void flush_workqueue(struct workqueue_struct *wq)
{
- int cpu;
+ struct wq_flusher this_flusher = {
+ .list = LIST_HEAD_INIT(this_flusher.list),
+ .flush_color = -1,
+ .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
+ };
+ int next_color;

- might_sleep();
lock_map_acquire(&wq->lockdep_map);
lock_map_release(&wq->lockdep_map);
- for_each_possible_cpu(cpu)
- flush_cpu_workqueue(get_cwq(cpu, wq));
+
+ mutex_lock(&wq->flush_mutex);
+
+ /*
+ * Start-to-wait phase
+ */
+ next_color = work_next_color(wq->work_color);
+
+ if (next_color != wq->flush_color) {
+ /*
+ * Color space is not full. The current work_color
+ * becomes our flush_color and work_color is advanced
+ * by one.
+ */
+ BUG_ON(!list_empty(&wq->flusher_overflow));
+ this_flusher.flush_color = wq->work_color;
+ wq->work_color = next_color;
+
+ if (!wq->first_flusher) {
+ /* no flush in progress, become the first flusher */
+ BUG_ON(wq->flush_color != this_flusher.flush_color);
+
+ wq->first_flusher = &this_flusher;
+
+ if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
+ wq->work_color)) {
+ /* nothing to flush, done */
+ wq->flush_color = next_color;
+ wq->first_flusher = NULL;
+ goto out_unlock;
+ }
+ } else {
+ /* wait in queue */
+ BUG_ON(wq->flush_color == this_flusher.flush_color);
+ list_add_tail(&this_flusher.list, &wq->flusher_queue);
+ flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
+ }
+ } else {
+ /*
+ * Oops, color space is full, wait on overflow queue.
+ * The next flush completion will assign us
+ * flush_color and transfer to flusher_queue.
+ */
+ list_add_tail(&this_flusher.list, &wq->flusher_overflow);
+ }
+
+ mutex_unlock(&wq->flush_mutex);
+
+ wait_for_completion(&this_flusher.done);
+
+ /*
+ * Wake-up-and-cascade phase
+ *
+ * First flushers are responsible for cascading flushes and
+ * handling overflow. Non-first flushers can simply return.
+ */
+ if (wq->first_flusher != &this_flusher)
+ return;
+
+ mutex_lock(&wq->flush_mutex);
+
+ wq->first_flusher = NULL;
+
+ BUG_ON(!list_empty(&this_flusher.list));
+ BUG_ON(wq->flush_color != this_flusher.flush_color);
+
+ while (true) {
+ struct wq_flusher *next, *tmp;
+
+ /* complete all the flushers sharing the current flush color */
+ list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
+ if (next->flush_color != wq->flush_color)
+ break;
+ list_del_init(&next->list);
+ complete(&next->done);
+ }
+
+ BUG_ON(!list_empty(&wq->flusher_overflow) &&
+ wq->flush_color != work_next_color(wq->work_color));
+
+ /* this flush_color is finished, advance by one */
+ wq->flush_color = work_next_color(wq->flush_color);
+
+ /* one color has been freed, handle overflow queue */
+ if (!list_empty(&wq->flusher_overflow)) {
+ /*
+ * Assign the same color to all overflowed
+ * flushers, advance work_color and append to
+ * flusher_queue. This is the start-to-wait
+ * phase for these overflowed flushers.
+ */
+ list_for_each_entry(tmp, &wq->flusher_overflow, list)
+ tmp->flush_color = wq->work_color;
+
+ wq->work_color = work_next_color(wq->work_color);
+
+ list_splice_tail_init(&wq->flusher_overflow,
+ &wq->flusher_queue);
+ flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
+ }
+
+ if (list_empty(&wq->flusher_queue)) {
+ BUG_ON(wq->flush_color != wq->work_color);
+ break;
+ }
+
+ /*
+ * Need to flush more colors. Make the next flusher
+ * the new first flusher and arm cwqs.
+ */
+ BUG_ON(wq->flush_color == wq->work_color);
+ BUG_ON(wq->flush_color != next->flush_color);
+
+ list_del_init(&next->list);
+ wq->first_flusher = next;
+
+ if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
+ break;
+
+ /*
+ * Meh... this color is already done, clear first
+ * flusher and repeat cascading.
+ */
+ wq->first_flusher = NULL;
+ complete(&next->done);
+ }
+
+out_unlock:
+ mutex_unlock(&wq->flush_mutex);
}
EXPORT_SYMBOL_GPL(flush_workqueue);

@@ -648,6 +903,8 @@ static int try_to_grab_pending(struct work_struct *work)
if (cwq == get_wq_data(work)) {
debug_work_deactivate(work);
list_del_init(&work->entry);
+ cwq_dec_nr_in_flight(cwq,
+ work_flags_to_color(*work_data_bits(work)));
ret = 1;
}
}
@@ -982,6 +1239,10 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
goto err;

wq->flags = flags;
+ mutex_init(&wq->flush_mutex);
+ atomic_set(&wq->nr_cwqs_to_flush, 0);
+ INIT_LIST_HEAD(&wq->flusher_queue);
+ INIT_LIST_HEAD(&wq->flusher_overflow);
wq->name = name;
mutex_init(&wq->single_thread_mutex);
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
@@ -1007,6 +1268,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

cwq->wq = wq;
+ cwq->flush_color = -1;
spin_lock_init(&cwq->lock);
INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work);
@@ -1032,33 +1294,6 @@ err:
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);

-static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
-{
- /*
- * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
- * cpu_add_remove_lock protects cwq->thread.
- */
- if (cwq->thread == NULL)
- return;
-
- lock_map_acquire(&cwq->wq->lockdep_map);
- lock_map_release(&cwq->wq->lockdep_map);
-
- flush_cpu_workqueue(cwq);
- /*
- * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
- * a concurrent flush_workqueue() can insert a barrier after us.
- * However, in that case run_workqueue() won't return and check
- * kthread_should_stop() until it flushes all work_struct's.
- * When ->worklist becomes empty it is safe to exit because no
- * more work_structs can be queued on this cwq: flush_workqueue
- * checks list_empty(), and a "normal" queue_work() can't use
- * a dead CPU.
- */
- kthread_stop(cwq->thread);
- cwq->thread = NULL;
-}
-
/**
* destroy_workqueue - safely terminate a workqueue
* @wq: target workqueue
@@ -1074,9 +1309,23 @@ void destroy_workqueue(struct workqueue_struct *wq)
list_del(&wq->list);
spin_unlock(&workqueue_lock);

- for_each_possible_cpu(cpu)
- cleanup_workqueue_thread(get_cwq(cpu, wq));
- cpu_maps_update_done();
+ flush_workqueue(wq);
+
+ for_each_possible_cpu(cpu) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+ int i;
+
+ /* cpu_add_remove_lock protects cwq->thread */
+ if (cwq->thread) {
+ kthread_stop(cwq->thread);
+ cwq->thread = NULL;
+ }
+
+ for (i = 0; i < WORK_COLORS; i++)
+ BUG_ON(cwq->nr_in_flight[i]);
+ }
+
+ cpu_maps_update_done();

free_percpu(wq->cpu_wq);
kfree(wq);
@@ -1115,7 +1364,12 @@ undo:
case CPU_UP_CANCELED:
start_workqueue_thread(cwq, -1);
case CPU_POST_DEAD:
- cleanup_workqueue_thread(cwq);
+ flush_workqueue(wq);
+ /* cpu_add_remove_lock protects cwq->thread */
+ if (cwq->thread) {
+ kthread_stop(cwq->thread);
+ cwq->thread = NULL;
+ }
break;
}
}
--
1.6.4.2

2009-11-16 17:18:42

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 19/21] workqueue: introduce worker

Separate out worker thread related information into struct worker from
struct cpu_workqueue_struct and implement helper functions to deal
with the new struct worker. The only externally visible change is
that workqueue workers are now all named "kworker/CPUID:WORKERID",
where WORKERID is allocated from a per-cpu ida.

This is in preparation for concurrency managed workqueue, where
multiple shared workers will be available per cpu.
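
As an illustration, a minimal sketch of how a caller is expected to
use the new helpers (error handling trimmed):

	struct worker *worker = create_worker(cwq, true); /* bound to cwq->cpu */

	if (worker) {
		cwq->worker = worker;
		start_worker(worker);		/* wake up the kthread */
	}

	/* ... later, on teardown ... */
	destroy_worker(cwq->worker);		/* stop kthread, release id */
	cwq->worker = NULL;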

Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 219 +++++++++++++++++++++++++++++++++++++---------------
1 files changed, 157 insertions(+), 62 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index dce5ad5..6694b3e 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -34,6 +34,7 @@
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <linux/mutex.h>
+#include <linux/idr.h>

/*
* Structure fields follow one of the following exclusion rules.
@@ -47,6 +48,15 @@
* W: workqueue_lock protected.
*/

+struct cpu_workqueue_struct;
+
+struct worker {
+ struct work_struct *current_work; /* L: work being processed */
+ struct task_struct *task; /* I: worker task */
+ struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
+ int id; /* I: worker id */
+};
+
/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu). The lower WORK_STRUCT_FLAG_BITS of
@@ -59,14 +69,14 @@ struct cpu_workqueue_struct {

struct list_head worklist;
wait_queue_head_t more_work;
- struct work_struct *current_work;
+ unsigned int cpu;
+ struct worker *worker;

struct workqueue_struct *wq; /* I: the owning workqueue */
int work_color; /* L: current color */
int flush_color; /* L: flushing color */
int nr_in_flight[WORK_COLORS];
/* L: nr of in_flight works */
- struct task_struct *thread;
} __attribute__((aligned(1 << WORK_STRUCT_FLAG_BITS)));

/*
@@ -215,6 +225,9 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
+static DEFINE_PER_CPU(struct ida, worker_ida);
+
+static int worker_thread(void *__worker);

static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
struct workqueue_struct *wq)
@@ -413,6 +426,105 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

+static struct worker *alloc_worker(void)
+{
+ struct worker *worker;
+
+ worker = kzalloc(sizeof(*worker), GFP_KERNEL);
+ return worker;
+}
+
+/**
+ * create_worker - create a new workqueue worker
+ * @cwq: cwq the new worker will belong to
+ * @bind: whether to set affinity to @cpu or not
+ *
+ * Create a new worker which is bound to @cwq. The returned worker
+ * can be started by calling start_worker() or destroyed using
+ * destroy_worker().
+ *
+ * CONTEXT:
+ * Might sleep. Does GFP_KERNEL allocations.
+ *
+ * RETURNS:
+ * Pointer to the newly created worker.
+ */
+static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
+{
+ int id = -1;
+ struct worker *worker = NULL;
+
+ spin_lock(&workqueue_lock);
+ while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
+ spin_unlock_irq(&workqueue_lock);
+ if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
+ goto fail;
+ spin_lock(&workqueue_lock);
+ }
+ spin_unlock_irq(&workqueue_lock);
+
+ worker = alloc_worker();
+ if (!worker)
+ goto fail;
+
+ worker->cwq = cwq;
+ worker->id = id;
+
+ worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
+ cwq->cpu, id);
+ if (IS_ERR(worker->task))
+ goto fail;
+
+ if (bind)
+ kthread_bind(worker->task, cwq->cpu);
+
+ return worker;
+fail:
+ if (id >= 0) {
+ spin_lock(&workqueue_lock);
+ ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
+ spin_unlock_irq(&workqueue_lock);
+ }
+ kfree(worker);
+ return NULL;
+}
+
+/**
+ * start_worker - start a newly created worker
+ * @worker: worker to start
+ *
+ * Start @worker.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void start_worker(struct worker *worker)
+{
+ wake_up_process(worker->task);
+}
+
+/**
+ * destroy_worker - destroy a workqueue worker
+ * @worker: worker to be destroyed
+ *
+ * Destroy @worker.
+ */
+static void destroy_worker(struct worker *worker)
+{
+ int cpu = worker->cwq->cpu;
+ int id = worker->id;
+
+ /* sanity check frenzy */
+ BUG_ON(worker->current_work);
+
+ kthread_stop(worker->task);
+ kfree(worker);
+
+ spin_lock_irq(&workqueue_lock);
+ ida_remove(&per_cpu(worker_ida, cpu), id);
+ spin_unlock_irq(&workqueue_lock);
+}
+
/**
* cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
* @cwq: cwq of interest
@@ -450,7 +562,7 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq,

/**
* process_one_work - process single work
- * @cwq: cwq to process work for
+ * @worker: self
* @work: work to process
*
* Process @work. This function contains all the logics necessary to
@@ -462,9 +574,9 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq,
* CONTEXT:
* spin_lock_irq(cwq->lock) which is released and regrabbed.
*/
-static void process_one_work(struct cpu_workqueue_struct *cwq,
- struct work_struct *work)
+static void process_one_work(struct worker *worker, struct work_struct *work)
{
+ struct cpu_workqueue_struct *cwq = worker->cwq;
struct workqueue_struct *wq = cwq->wq;
bool single_thread = wq->flags & WQ_SINGLE_THREAD;
work_func_t f = work->func;
@@ -481,7 +593,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
#endif
/* claim and process */
debug_work_deactivate(work);
- cwq->current_work = work;
+ worker->current_work = work;
work_color = work_flags_to_color(*work_data_bits(work));
list_del_init(&work->entry);

@@ -515,30 +627,33 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
spin_lock_irq(&cwq->lock);

/* we're done with it, release */
- cwq->current_work = NULL;
+ worker->current_work = NULL;
cwq_dec_nr_in_flight(cwq, work_color);
}

-static void run_workqueue(struct cpu_workqueue_struct *cwq)
+static void run_workqueue(struct worker *worker)
{
+ struct cpu_workqueue_struct *cwq = worker->cwq;
+
spin_lock_irq(&cwq->lock);
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
- process_one_work(cwq, work);
+ process_one_work(worker, work);
}
spin_unlock_irq(&cwq->lock);
}

/**
* worker_thread - the worker thread function
- * @__cwq: cwq to serve
+ * @__worker: self
*
* The cwq worker thread function.
*/
-static int worker_thread(void *__cwq)
+static int worker_thread(void *__worker)
{
- struct cpu_workqueue_struct *cwq = __cwq;
+ struct worker *worker = __worker;
+ struct cpu_workqueue_struct *cwq = worker->cwq;
DEFINE_WAIT(wait);

if (cwq->wq->flags & WQ_FREEZEABLE)
@@ -557,7 +672,7 @@ static int worker_thread(void *__cwq)
if (kthread_should_stop())
break;

- run_workqueue(cwq);
+ run_workqueue(worker);
}

return 0;
@@ -855,7 +970,7 @@ int flush_work(struct work_struct *work)
goto already_gone;
prev = &work->entry;
} else {
- if (cwq->current_work != work)
+ if (!cwq->worker || cwq->worker->current_work != work)
goto already_gone;
prev = &cwq->worklist;
}
@@ -920,7 +1035,7 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
int running = 0;

spin_lock_irq(&cwq->lock);
- if (unlikely(cwq->current_work == work)) {
+ if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
insert_wq_barrier(cwq, &barr, cwq->worklist.next);
running = 1;
}
@@ -1183,52 +1298,21 @@ int current_is_keventd(void)
BUG_ON(!keventd_wq);

cwq = get_cwq(cpu, keventd_wq);
- if (current == cwq->thread)
+ if (current == cwq->worker->task)
ret = 1;

return ret;

}

-static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
- struct workqueue_struct *wq = cwq->wq;
- struct task_struct *p;
-
- p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
- /*
- * Nobody can add the work_struct to this cwq,
- * if (caller is __create_workqueue)
- * nobody should see this wq
- * else // caller is CPU_UP_PREPARE
- * cpu is not on cpu_online_map
- * so we can abort safely.
- */
- if (IS_ERR(p))
- return PTR_ERR(p);
- cwq->thread = p;
-
- return 0;
-}
-
-static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
- struct task_struct *p = cwq->thread;
-
- if (p != NULL) {
- if (cpu >= 0)
- kthread_bind(p, cpu);
- wake_up_process(p);
- }
-}
-
struct workqueue_struct *__create_workqueue_key(const char *name,
unsigned int flags,
struct lock_class_key *key,
const char *lock_name)
{
struct workqueue_struct *wq;
- int err = 0, cpu;
+ bool failed = false;
+ unsigned int cpu;

wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
@@ -1267,20 +1351,24 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
for_each_possible_cpu(cpu) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

+ cwq->cpu = cpu;
cwq->wq = wq;
cwq->flush_color = -1;
spin_lock_init(&cwq->lock);
INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work);

- if (err || !cpu_online(cpu))
+ if (failed || !cpu_online(cpu))
continue;
- err = create_workqueue_thread(cwq, cpu);
- start_workqueue_thread(cwq, cpu);
+ cwq->worker = create_worker(cwq, true);
+ if (cwq->worker)
+ start_worker(cwq->worker);
+ else
+ failed = true;
}
cpu_maps_update_done();

- if (err) {
+ if (failed) {
destroy_workqueue(wq);
wq = NULL;
}
@@ -1316,9 +1404,9 @@ void destroy_workqueue(struct workqueue_struct *wq)
int i;

/* cpu_add_remove_lock protects cwq->thread */
- if (cwq->thread) {
- kthread_stop(cwq->thread);
- cwq->thread = NULL;
+ if (cwq->worker) {
+ destroy_worker(cwq->worker);
+ cwq->worker = NULL;
}

for (i = 0; i < WORK_COLORS; i++)
@@ -1349,7 +1437,8 @@ undo:

switch (action) {
case CPU_UP_PREPARE:
- if (!create_workqueue_thread(cwq, cpu))
+ cwq->worker = create_worker(cwq, false);
+ if (cwq->worker)
break;
printk(KERN_ERR "workqueue [%s] for %i failed\n",
wq->name, cpu);
@@ -1358,17 +1447,18 @@ undo:
goto undo;

case CPU_ONLINE:
- start_workqueue_thread(cwq, cpu);
+ kthread_bind(cwq->worker->task, cpu);
+ start_worker(cwq->worker);
break;

case CPU_UP_CANCELED:
- start_workqueue_thread(cwq, -1);
+ start_worker(cwq->worker);
case CPU_POST_DEAD:
flush_workqueue(wq);
/* cpu_add_remove_lock protects cwq->thread */
- if (cwq->thread) {
- kthread_stop(cwq->thread);
- cwq->thread = NULL;
+ if (cwq->worker) {
+ destroy_worker(cwq->worker);
+ cwq->worker = NULL;
}
break;
}
@@ -1426,6 +1516,8 @@ EXPORT_SYMBOL_GPL(work_on_cpu);

void __init init_workqueues(void)
{
+ unsigned int cpu;
+
/*
* cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
* Make sure that the alignment isn't lower than that of
@@ -1435,6 +1527,9 @@ void __init init_workqueues(void)
BUILD_BUG_ON(__alignof__(struct cpu_workqueue_struct) <
__alignof__(unsigned long long));

+ for_each_possible_cpu(cpu)
+ ida_init(&per_cpu(worker_ida, cpu));
+
hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events");
BUG_ON(!keventd_wq);
--
1.6.4.2

2009-11-16 17:18:40

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 20/21] workqueue: reimplement work flushing using linked works

A work is linked to the next one by having the WORK_STRUCT_LINKED bit
set, and these links can be chained. When a linked work is dispatched
to a worker, all linked works are dispatched to the worker's newly
added ->scheduled queue and processed back-to-back.

Currently, as there's only a single worker per cwq, having linked
works doesn't make any visible behavior difference. This change is to
prepare for multiple shared workers per cpu.
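
Roughly, a series is delimited by the LINKED bit being set on every
work except the last one; a simplified sketch of how the dispatcher
collects such a series onto a worker (see schedule_work_to_worker()
below for the real thing):

	struct work_struct *pos = work, *next;

	for (;;) {
		bool linked = *work_data_bits(pos) & WORK_STRUCT_LINKED;

		/* grab the next entry before @pos is moved away */
		next = list_entry(pos->entry.next, struct work_struct, entry);
		list_move_tail(&pos->entry, &worker->scheduled);
		if (!linked)
			break;			/* last work of the series */
		pos = next;
	}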

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 2 +
kernel/workqueue.c | 151 ++++++++++++++++++++++++++++++++++++++------
2 files changed, 132 insertions(+), 21 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index e1428e5..53d1410 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -25,9 +25,11 @@ typedef void (*work_func_t)(struct work_struct *work);
enum {
WORK_STRUCT_PENDING_BIT = 0, /* work item is pending execution */
WORK_STRUCT_STATIC_BIT = 1, /* static initializer (debugobjects) */
+ WORK_STRUCT_LINKED_BIT = 2, /* next work is linked to this one */

WORK_STRUCT_PENDING = 1 << WORK_STRUCT_PENDING_BIT,
WORK_STRUCT_STATIC = 1 << WORK_STRUCT_STATIC_BIT,
+ WORK_STRUCT_LINKED = 1 << WORK_STRUCT_LINKED_BIT,

WORK_STRUCT_COLOR_SHIFT = 3, /* color for workqueue flushing */
WORK_STRUCT_COLOR_BITS = 4,
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 6694b3e..3029bb2 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -52,6 +52,7 @@ struct cpu_workqueue_struct;

struct worker {
struct work_struct *current_work; /* L: work being processed */
+ struct list_head scheduled; /* L: scheduled works */
struct task_struct *task; /* I: worker task */
struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
int id; /* I: worker id */
@@ -431,6 +432,8 @@ static struct worker *alloc_worker(void)
struct worker *worker;

worker = kzalloc(sizeof(*worker), GFP_KERNEL);
+ if (worker)
+ INIT_LIST_HEAD(&worker->scheduled);
return worker;
}

@@ -516,6 +519,7 @@ static void destroy_worker(struct worker *worker)

/* sanity check frenzy */
BUG_ON(worker->current_work);
+ BUG_ON(!list_empty(&worker->scheduled));

kthread_stop(worker->task);
kfree(worker);
@@ -526,6 +530,49 @@ static void destroy_worker(struct worker *worker)
}

/**
+ * schedule_work_to_worker - schedule linked works to a worker
+ * @worker: target worker
+ * @work: start of series of works to be scheduled
+ * @nextp: out parameter for nested worklist walking
+ *
+ * Schedule linked works starting from @work to @worker. Work series
+ * to be scheduled starts at @work and includes any consecutive work
+ * with WORK_STRUCT_LINKED set in its predecessor.
+ *
+ * If @nextp is not NULL, it's updated to point to the next work of
+ * the last scheduled work. This allows schedule_work_to_worker() to
+ * be nested inside outer list_for_each_entry_safe().
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void schedule_work_to_worker(struct worker *worker,
+ struct work_struct *work,
+ struct work_struct **nextp)
+{
+ struct work_struct *n;
+
+ /*
+ * Linked worklist will always end before the end of the list,
+ * use NULL for list head.
+ */
+ work = list_entry(work->entry.prev, struct work_struct, entry);
+ list_for_each_entry_safe_continue(work, n, NULL, entry) {
+ list_move_tail(&work->entry, &worker->scheduled);
+ if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
+ break;
+ }
+
+ /*
+ * If we're already inside safe list traversal and have moved
+ * multiple works to the scheduled queue, the next position
+ * needs to be updated.
+ */
+ if (nextp)
+ *nextp = n;
+}
+
+/**
* cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
* @cwq: cwq of interest
* @color: color of work which left the queue
@@ -631,17 +678,25 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
cwq_dec_nr_in_flight(cwq, work_color);
}

-static void run_workqueue(struct worker *worker)
+/**
+ * process_scheduled_works - process scheduled works
+ * @worker: self
+ *
+ * Process all scheduled works. Please note that the scheduled list
+ * may change while processing a work, so this function repeatedly
+ * fetches a work from the top and executes it.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock) which may be released and regrabbed
+ * multiple times.
+ */
+static void process_scheduled_works(struct worker *worker)
{
- struct cpu_workqueue_struct *cwq = worker->cwq;
-
- spin_lock_irq(&cwq->lock);
- while (!list_empty(&cwq->worklist)) {
- struct work_struct *work = list_entry(cwq->worklist.next,
+ while (!list_empty(&worker->scheduled)) {
+ struct work_struct *work = list_first_entry(&worker->scheduled,
struct work_struct, entry);
process_one_work(worker, work);
}
- spin_unlock_irq(&cwq->lock);
}

/**
@@ -672,7 +727,26 @@ static int worker_thread(void *__worker)
if (kthread_should_stop())
break;

- run_workqueue(worker);
+ spin_lock_irq(&cwq->lock);
+
+ while (!list_empty(&cwq->worklist)) {
+ struct work_struct *work =
+ list_first_entry(&cwq->worklist,
+ struct work_struct, entry);
+
+ if (likely(!(*work_data_bits(work) &
+ WORK_STRUCT_LINKED))) {
+ /* optimization path, not strictly necessary */
+ process_one_work(worker, work);
+ if (unlikely(!list_empty(&worker->scheduled)))
+ process_scheduled_works(worker);
+ } else {
+ schedule_work_to_worker(worker, work, NULL);
+ process_scheduled_works(worker);
+ }
+ }
+
+ spin_unlock_irq(&cwq->lock);
}

return 0;
@@ -693,16 +767,33 @@ static void wq_barrier_func(struct work_struct *work)
* insert_wq_barrier - insert a barrier work
* @cwq: cwq to insert barrier into
* @barr: wq_barrier to insert
- * @head: insertion point
+ * @target: target work to attach @barr to
+ * @worker: worker currently executing @target, NULL if @target is not executing
*
- * Insert barrier @barr into @cwq before @head.
+ * @barr is linked to @target such that @barr is completed only after
+ * @target finishes execution. Please note that the ordering
+ * guarantee is observed only with respect to @target and on the local
+ * cpu.
+ *
+ * Currently, a queued barrier can't be canceled. This is because
+ * try_to_grab_pending() can't determine whether the work to be
+ * grabbed is at the head of the queue and thus can't clear LINKED
+ * flag of the previous work while there must be a valid next work
+ * after a work with LINKED flag set.
+ *
+ * Note that when @worker is non-NULL, @target may be modified
+ * underneath us, so we can't reliably determine cwq from @target.
*
* CONTEXT:
* spin_lock_irq(cwq->lock).
*/
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
- struct wq_barrier *barr, struct list_head *head)
+ struct wq_barrier *barr,
+ struct work_struct *target, struct worker *worker)
{
+ struct list_head *head;
+ unsigned int linked = 0;
+
/*
* debugobject calls are safe here even with cwq->lock locked
* as we know for sure that this will not trigger any of the
@@ -713,8 +804,23 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
init_completion(&barr->done);

+ /*
+ * If @target is currently being executed, schedule the
+ * barrier to the worker; otherwise, put it after @target.
+ */
+ if (worker)
+ head = worker->scheduled.next;
+ else {
+ unsigned long *bits = work_data_bits(target);
+
+ head = target->entry.next;
+ /* there can already be other linked works, inherit and set */
+ linked = *bits & WORK_STRUCT_LINKED;
+ *bits |= WORK_STRUCT_LINKED;
+ }
+
debug_work_activate(&barr->work);
- insert_work(cwq, &barr->work, head, 0);
+ insert_work(cwq, &barr->work, head, linked);
}

/**
@@ -947,8 +1053,8 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
*/
int flush_work(struct work_struct *work)
{
+ struct worker *worker = NULL;
struct cpu_workqueue_struct *cwq;
- struct list_head *prev;
struct wq_barrier barr;

might_sleep();
@@ -968,14 +1074,14 @@ int flush_work(struct work_struct *work)
smp_rmb();
if (unlikely(cwq != get_wq_data(work)))
goto already_gone;
- prev = &work->entry;
} else {
- if (!cwq->worker || cwq->worker->current_work != work)
+ if (cwq->worker && cwq->worker->current_work == work)
+ worker = cwq->worker;
+ if (!worker)
goto already_gone;
- prev = &cwq->worklist;
}
- insert_wq_barrier(cwq, &barr, prev->next);

+ insert_wq_barrier(cwq, &barr, work, worker);
spin_unlock_irq(&cwq->lock);
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
@@ -1032,16 +1138,19 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{
struct wq_barrier barr;
- int running = 0;
+ struct worker *worker;

spin_lock_irq(&cwq->lock);
+
+ worker = NULL;
if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
- insert_wq_barrier(cwq, &barr, cwq->worklist.next);
- running = 1;
+ worker = cwq->worker;
+ insert_wq_barrier(cwq, &barr, work, worker);
}
+
spin_unlock_irq(&cwq->lock);

- if (unlikely(running)) {
+ if (unlikely(worker)) {
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
}
--
1.6.4.2

2009-11-16 17:18:03

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 21/21] workqueue: reimplement workqueue freeze using cwq->frozen_works queue

Currently, workqueue freezing is implemented by marking the worker
freezeable and calling try_to_freeze() from the dispatch loop.
Reimplement it so that the workqueue is frozen instead of the worker.

* cwq->cur_worklist and cwq->frozen_works are added. During normal
operation cwq->cur_worklist points to cwq->worklist.

* When freezing starts, cwq->cur_worklist is switched to
cwq->frozen_works so that new works are stored in cwq->frozen_works
instead of being processed.

* Freezing is complete when cwq->nr_in_flight equals the number of
works on cwq->frozen_works for all cwqs of all freezeable
workqueues.

* Thawing is done by restoring cwq->cur_worklist to cwq->worklist and
splicing cwq->frozen_works to cwq->worklist.

This new implementation allows having multiple shared workers per cpu.
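
The queueing path only needs to follow cwq->cur_worklist; a minimal
sketch of the idea (field names as introduced below):

	/* normal operation */
	cwq->cur_worklist = &cwq->worklist;

	/* freeze_workqueues_begin(), for each freezeable cwq */
	cwq->cur_worklist = &cwq->frozen_works;	/* new works pile up here */

	/* __queue_work() stays oblivious to freezing */
	insert_work(cwq, work, cwq->cur_worklist, 0);

	/* thaw_workqueues(): restore and release what piled up */
	cwq->cur_worklist = &cwq->worklist;
	list_splice_tail_init(&cwq->frozen_works, &cwq->worklist);
	wake_up(&cwq->more_work);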

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 7 ++
kernel/power/process.c | 22 +++++-
kernel/workqueue.c | 182 ++++++++++++++++++++++++++++++++++++++++++---
3 files changed, 199 insertions(+), 12 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 53d1410..d7efa66 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -326,4 +326,11 @@ static inline long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
#else
long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg);
#endif /* CONFIG_SMP */
+
+#ifdef CONFIG_FREEZER
+extern void freeze_workqueues_begin(void);
+extern bool freeze_workqueues_busy(void);
+extern void thaw_workqueues(void);
+#endif /* CONFIG_FREEZER */
+
#endif
diff --git a/kernel/power/process.c b/kernel/power/process.c
index cc2e553..701e17f 100644
--- a/kernel/power/process.c
+++ b/kernel/power/process.c
@@ -14,6 +14,7 @@
#include <linux/module.h>
#include <linux/syscalls.h>
#include <linux/freezer.h>
+#include <linux/workqueue.h>

/*
* Timeout for stopping processes
@@ -34,6 +35,7 @@ static int try_to_freeze_tasks(bool sig_only)
struct task_struct *g, *p;
unsigned long end_time;
unsigned int todo;
+ bool wq_busy = false;
struct timeval start, end;
u64 elapsed_csecs64;
unsigned int elapsed_csecs;
@@ -41,6 +43,10 @@ static int try_to_freeze_tasks(bool sig_only)
do_gettimeofday(&start);

end_time = jiffies + TIMEOUT;
+
+ if (!sig_only)
+ freeze_workqueues_begin();
+
do {
todo = 0;
read_lock(&tasklist_lock);
@@ -62,7 +68,14 @@ static int try_to_freeze_tasks(bool sig_only)
todo++;
} while_each_thread(g, p);
read_unlock(&tasklist_lock);
+
yield(); /* Yield is okay here */
+
+ if (!sig_only) {
+ wq_busy = freeze_workqueues_busy();
+ todo += wq_busy;
+ }
+
if (time_after(jiffies, end_time))
break;
} while (todo);
@@ -80,9 +93,13 @@ static int try_to_freeze_tasks(bool sig_only)
*/
printk("\n");
printk(KERN_ERR "Freezing of tasks failed after %d.%02d seconds "
- "(%d tasks refusing to freeze):\n",
- elapsed_csecs / 100, elapsed_csecs % 100, todo);
+ "(%d tasks refusing to freeze, wq_busy=%d):\n",
+ elapsed_csecs / 100, elapsed_csecs % 100,
+ todo - wq_busy, wq_busy);
show_state();
+
+ thaw_workqueues();
+
read_lock(&tasklist_lock);
do_each_thread(g, p) {
task_lock(p);
@@ -152,6 +169,7 @@ void thaw_processes(void)
oom_killer_enable();

printk("Restarting tasks ... ");
+ thaw_workqueues();
thaw_tasks(true);
thaw_tasks(false);
schedule();
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 3029bb2..933eb84 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -46,6 +46,10 @@
* F: wq->flush_mutex protected.
*
* W: workqueue_lock protected.
+ *
+ * V: Similar to L except that operation is limited to only one
+ * direction if workqueues are frozen (ie. can be added but can't
+ * be removed).
*/

struct cpu_workqueue_struct;
@@ -72,12 +76,14 @@ struct cpu_workqueue_struct {
wait_queue_head_t more_work;
unsigned int cpu;
struct worker *worker;
+ struct list_head *cur_worklist; /* L: current worklist */

struct workqueue_struct *wq; /* I: the owning workqueue */
int work_color; /* L: current color */
int flush_color; /* L: flushing color */
int nr_in_flight[WORK_COLORS];
/* L: nr of in_flight works */
+ struct list_head frozen_works; /* V: used while frozen */
} __attribute__((aligned(1 << WORK_STRUCT_FLAG_BITS)));

/*
@@ -227,6 +233,7 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
static DEFINE_PER_CPU(struct ida, worker_ida);
+static bool workqueue_frozen;

static int worker_thread(void *__worker);

@@ -314,7 +321,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
debug_work_activate(work);
spin_lock_irqsave(&cwq->lock, flags);
BUG_ON(!list_empty(&work->entry));
- insert_work(cwq, work, &cwq->worklist, 0);
+ insert_work(cwq, work, cwq->cur_worklist, 0);
spin_unlock_irqrestore(&cwq->lock, flags);
}

@@ -711,19 +718,13 @@ static int worker_thread(void *__worker)
struct cpu_workqueue_struct *cwq = worker->cwq;
DEFINE_WAIT(wait);

- if (cwq->wq->flags & WQ_FREEZEABLE)
- set_freezable();
-
for (;;) {
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
- if (!freezing(current) &&
- !kthread_should_stop() &&
+ if (!kthread_should_stop() &&
list_empty(&cwq->worklist))
schedule();
finish_wait(&cwq->more_work, &wait);

- try_to_freeze();
-
if (kthread_should_stop())
break;

@@ -1450,6 +1451,14 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
*/
spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
+ for_each_possible_cpu(cpu) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ if (workqueue_frozen && wq->flags & WQ_FREEZEABLE)
+ cwq->cur_worklist = &cwq->frozen_works;
+ else
+ cwq->cur_worklist = &cwq->worklist;
+ }
spin_unlock(&workqueue_lock);
/*
* We must initialize cwqs for each possible cpu even if we
@@ -1466,6 +1475,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
spin_lock_init(&cwq->lock);
INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work);
+ INIT_LIST_HEAD(&cwq->frozen_works);

if (failed || !cpu_online(cpu))
continue;
@@ -1502,12 +1512,17 @@ void destroy_workqueue(struct workqueue_struct *wq)
int cpu;

cpu_maps_update_begin();
+
+ flush_workqueue(wq);
+
+ /*
+ * wq list is used to freeze wq, remove from list after
+ * flushing is complete in case freeze races us.
+ */
spin_lock(&workqueue_lock);
list_del(&wq->list);
spin_unlock(&workqueue_lock);

- flush_workqueue(wq);
-
for_each_possible_cpu(cpu) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
int i;
@@ -1520,6 +1535,7 @@ void destroy_workqueue(struct workqueue_struct *wq)

for (i = 0; i < WORK_COLORS; i++)
BUG_ON(cwq->nr_in_flight[i]);
+ BUG_ON(!list_empty(&cwq->frozen_works));
}

cpu_maps_update_done();
@@ -1623,6 +1639,152 @@ long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */

+#ifdef CONFIG_FREEZER
+/**
+ * freeze_workqueues_begin - begin freezing workqueues
+ *
+ * Start freezing workqueues. After this function returns, all
+ * freezeable workqueues will queue new works to their frozen_works
+ * list instead of the cwq ones.
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock and cwq->lock's.
+ */
+void freeze_workqueues_begin(void)
+{
+ struct workqueue_struct *wq;
+ unsigned int cpu;
+
+ spin_lock(&workqueue_lock);
+
+ BUG_ON(workqueue_frozen);
+ workqueue_frozen = true;
+
+ for_each_possible_cpu(cpu) {
+ list_for_each_entry(wq, &workqueues, list) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ if (!(wq->flags & WQ_FREEZEABLE))
+ continue;
+
+ spin_lock_irq(&cwq->lock);
+
+ BUG_ON(cwq->cur_worklist != &cwq->worklist);
+ BUG_ON(!list_empty(&cwq->frozen_works));
+
+ cwq->cur_worklist = &cwq->frozen_works;
+
+ spin_unlock_irq(&cwq->lock);
+ }
+ }
+ spin_unlock(&workqueue_lock);
+}
+
+/**
+ * freeze_workqueues_busy - are freezeable workqueues still busy?
+ *
+ * Check whether freezing is complete. This function must be called
+ * between freeze_workqueues_begin() and thaw_workqueues().
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock.
+ *
+ * RETURNS:
+ * %true if some freezeable workqueues are still busy. %false if
+ * freezing is complete.
+ */
+bool freeze_workqueues_busy(void)
+{
+ struct workqueue_struct *wq;
+ unsigned int cpu;
+ bool busy = false;
+
+ spin_lock(&workqueue_lock);
+
+ BUG_ON(!workqueue_frozen);
+
+ for_each_possible_cpu(cpu) {
+ list_for_each_entry(wq, &workqueues, list) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+ struct work_struct *work;
+ int i, nr_in_flight;
+
+ if (!(wq->flags & WQ_FREEZEABLE))
+ continue;
+
+ spin_lock_irq(&cwq->lock);
+
+ BUG_ON(cwq->cur_worklist != &cwq->frozen_works);
+
+ nr_in_flight = 0;
+ for (i = 0; i < WORK_COLORS; i++)
+ nr_in_flight += cwq->nr_in_flight[i];
+
+ list_for_each_entry(work, &cwq->frozen_works, entry)
+ nr_in_flight--;
+
+ spin_unlock_irq(&cwq->lock);
+
+ BUG_ON(nr_in_flight < 0);
+ if (nr_in_flight) {
+ busy = true;
+ break;
+ }
+ }
+ if (busy)
+ break;
+ }
+ spin_unlock(&workqueue_lock);
+ return busy;
+}
+
+/**
+ * thaw_workqueues - thaw workqueues
+ *
+ * Thaw workqueues. Normal queueing is restored and all collected
+ * frozen works are transferred to their respective cwq worklists.
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock and cwq->lock's.
+ */
+void thaw_workqueues(void)
+{
+ struct workqueue_struct *wq;
+ unsigned int cpu;
+
+ spin_lock(&workqueue_lock);
+
+ if (!workqueue_frozen)
+ goto out_unlock;
+
+ workqueue_frozen = false;
+
+ for_each_possible_cpu(cpu) {
+ list_for_each_entry(wq, &workqueues, list) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ if (!(wq->flags & WQ_FREEZEABLE))
+ continue;
+
+ spin_lock_irq(&cwq->lock);
+
+ /* switch to normal work queueing */
+ BUG_ON(cwq->cur_worklist != &cwq->frozen_works);
+ cwq->cur_worklist = &cwq->worklist;
+
+ /* transfer frozen works to the cwq worklist */
+ list_splice_tail_init(&cwq->frozen_works,
+ &cwq->worklist);
+ wake_up(&cwq->more_work);
+
+ spin_unlock_irq(&cwq->lock);
+ }
+ }
+out_unlock:
+ spin_unlock(&workqueue_lock);
+}
+#endif /* CONFIG_FREEZER */
+
void __init init_workqueues(void)
{
unsigned int cpu;
--
1.6.4.2

2009-11-16 18:32:25

by Avi Kivity

[permalink] [raw]
Subject: Re: [PATCH 04/21] sched: implement scheduler notifiers

On 11/16/2009 07:15 PM, Tejun Heo wrote:
> Implement scheduler notifiers. This is superset of preempt notifiers
> which will be removed in favor of new notifiers. Four notifications
> are defined - activated, deactivated, in and out. In and out are
> identical to preempt notifiers. Activated and deactivated are called
> when a task's readiness to run changes. The first three are always
> called under rq lock. Out may not be called under rq lock depending
> on architecture.
>
> The notifier block contains union of all four callbacks to avoid
> defining separate interface for each.
>

> +
> +struct sched_notifier {
> + struct hlist_node link;
> + union {
> + void (*activated)(struct sched_notifier *n, bool wakeup);
> + void (*deactivated)(struct sched_notifier *n, bool sleep);
> + void (*in)(struct sched_notifier *n, struct task_struct *prev);
> + void (*out)(struct sched_notifier *n, struct task_struct *next);
> + };
> +};
> +
>
> struct task_struct {
> @@ -1237,6 +1268,8 @@ struct task_struct {
> /* list of struct preempt_notifier: */
> struct hlist_head preempt_notifiers;
> #endif
> + /* sched notifiers */
> + struct hlist_head notifiers[SCHED_NR_NOTIFIERS];
>
>

Four hlist_heads (64 bytes) is pretty heavy for this. I suggest
having all members present in sched_notifier (instead of a union) and
calling a callback only if it is not NULL. This reduces the overhead
to 16 bytes at the expense of an extra check for sched_notifier users.
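
Something along these lines, just as a sketch of the idea (not
compiled, names taken from the patch):

	struct sched_notifier {
		struct hlist_node link;
		/* all callbacks present; NULL members are simply skipped */
		void (*activated)(struct sched_notifier *n, bool wakeup);
		void (*deactivated)(struct sched_notifier *n, bool sleep);
		void (*in)(struct sched_notifier *n, struct task_struct *prev);
		void (*out)(struct sched_notifier *n, struct task_struct *next);
	};

	/* call site, e.g. in activate_task() */
	struct sched_notifier *n;
	struct hlist_node *pos;

	hlist_for_each_entry(n, pos, &p->notifiers, link)
		if (n->activated)
			n->activated(n, wakeup);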

Besides this, is there any difference to preempt_notifiers? If not, we
can just add the new members and rename.

--
Do not meddle in the internals of kernels, for they are subtle and quick to panic.

2009-11-16 18:40:56

by Peter Zijlstra

[permalink] [raw]
Subject: Re: [PATCH 04/21] sched: implement scheduler notifiers

On Tue, 2009-11-17 at 02:15 +0900, Tejun Heo wrote:
> @@ -1939,6 +1953,8 @@ static int effective_prio(struct task_struct *p)
> */
> static void activate_task(struct rq *rq, struct task_struct *p, int wakeup)
> {
> + sched_notifier_call(p, SCHED_NOTIFIER_ACTIVATED, activated, wakeup);
> +
> if (task_contributes_to_load(p))
> rq->nr_uninterruptible--;
>
> @@ -1951,6 +1967,8 @@ static void activate_task(struct rq *rq, struct task_struct *p, int wakeup)
> */
> static void deactivate_task(struct rq *rq, struct task_struct *p, int sleep)
> {
> + sched_notifier_call(p, SCHED_NOTIFIER_DEACTIVATED, deactivated, sleep);
> +
> if (task_contributes_to_load(p))
> rq->nr_uninterruptible++;
>

I really hate exposing activate/deactivate.

You say:

> Activated and deactivated are called
> when a task's readiness to run changes.

How is that not clear from the out hook? It would seem to me that when
you get scheduled out with a p->state != TASK_RUNNING you're not ready.

2009-11-16 18:44:53

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 04/21] sched: implement scheduler notifiers

Hello,

Avi Kivity wrote:
> Four hlist_heads (64 bytes) is pretty heavy for this.

hlist_head is one pointer, so it will be 32 bytes on 64-bit machines.

> I having all members present in sched_notifier (instead of a union)
> and calling a callback if it is not NULL. This reduces the overhead
> to 16 bytes at the expense of an extra check for sched_notifier
> users.

And it will reduce the overhead to 8 bytes. Anyway, Linus was
against walking the list multiple times for different callbacks, and
the way kvm uses these notifiers doesn't work very well with
allocating a separate table on demand, so I just went with four
pointers. Given that these notifiers are still quite unpopular, I
lean toward Avi's suggestion. Linus?

> Besides this, is there any difference to preempt_notifiers? if not we
> can just add the new members and rename.

Yeap, if we're gonna add things to ops table, I agree that would be
better.

Thanks.

--
tejun

2009-11-16 18:55:36

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 04/21] sched: implement scheduler notifiers

Peter Zijlstra wrote:
> I really hate exposing activate/deactivate.
>
> You say:
>
>> Activated and deactivated are called
>> when a task's readiness to run changes.
>
> How is that not clear from the out hook? It would seem to me that when
> you get scheduled out with a p->state != TASK_RUNNING you're not ready.

The problem is that in the OUT hook the next task to switch to is
already determined, and it wouldn't be symmetric with activate (but I
suppose we can match the symmetry from the activate side). If
deactivate/activate/in/out events are too low level, we can have
sleep/ready/run hooks instead.

Thanks.

--
tejun

2009-11-16 20:30:04

by Peter Zijlstra

[permalink] [raw]
Subject: Re: [PATCH 04/21] sched: implement scheduler notifiers

On Tue, 2009-11-17 at 03:54 +0900, Tejun Heo wrote:
> Peter Zijlstra wrote:
> > I really hate exposing activate/deactivate.
> >
> > You say:
> >
> >> Activated and deactivated are called
> >> when a task's readiness to run changes.
> >
> > How is that not clear from the out hook? It would seem to me that when
> > you get scheduled out with a p->state != TASK_RUNNING you're not ready.
>
> In that in OUT hook the next task to switch to is already determined
> and it wouldn't be symmetric with activate (but I suppose we can match
> the symmetry from activate side). If deactivate/activate/in/out
> events are too low level,

Not too low, just wrong. Most functions operating on the scheduler
state, like sys_renice(), sys_sched_setscheduler() etc., all do a
deactivate/activate series, even though the task at hand never goes
through a sleep or blocking state.
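
E.g. roughly what __sched_setscheduler() does today:

	on_rq = p->se.on_rq;
	if (on_rq)
		deactivate_task(rq, p, 0);	/* notifier would fire here */

	__setscheduler(rq, p, policy, param->sched_priority);

	if (on_rq)
		activate_task(rq, p, 0);	/* ... and again here */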

> we can have sleep/ready/run hooks instead.

I would much prefer that, sleep/ready are significantly different from
deactivate/activate.

2009-11-16 23:04:40

by Frederic Weisbecker

[permalink] [raw]
Subject: Re: [PATCH 01/21] workqueue: fix race condition in schedule_on_each_cpu()

On Tue, Nov 17, 2009 at 02:15:06AM +0900, Tejun Heo wrote:
> Commit 65a64464349883891e21e74af16c05d6e1eeb4e9 which allows
> schedule_on_each_cpu() to be called from keventd added a race
> condition. schedule_on_each_cpu() may race with cpu hotplug and end
> up executing the function twice on a cpu.
>
> Fix it by moving direct execution into the section protected with
> get/put_online_cpus(). While at it, update code such that direct
> execution is done after works have been scheduled for all other cpus
> and drop unnecessary cpu != orig test from flush loop.
>
> Signed-off-by: Tejun Heo <[email protected]>
> Cc: Andi Kleen <[email protected]>
> Cc: Oleg Nesterov <[email protected]>
> ---


Shouldn't this patch go to .32 ?
This looks like an important fix.

2009-11-17 00:08:22

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 01/21] workqueue: fix race condition in schedule_on_each_cpu()

11/17/2009 08:04 AM, Frederic Weisbecker wrote:
> Shouldn't this patch go to .32 ?
> This looks like an important fix.

This one and the next one are already on their way to .32.

Thanks.

--
tejun

2009-11-17 00:50:16

by Andy Walls

[permalink] [raw]
Subject: Re: [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue

On Tue, 2009-11-17 at 02:15 +0900, Tejun Heo wrote:
> SINGLE_THREAD workqueues are used to reduce the number of worker
> threads and ease synchronization. The first reason will be irrelevant
> with concurrency managed workqueue implementation. Simplify
> SINGLE_THREAD implementation by creating the workqueues the same but
> making the worker grab mutex before actually executing works on the
> workqueue. In the long run, most SINGLE_THREAD workqueues will be
> replaced with generic ones.
>
> Signed-off-by: Tejun Heo <[email protected]>
>
> ---
> kernel/workqueue.c | 151 ++++++++++++++++++----------------------------------
> 1 files changed, 52 insertions(+), 99 deletions(-)
>
> diff --git a/kernel/workqueue.c b/kernel/workqueue.c
> index 5392939..82b03a1 100644
> --- a/kernel/workqueue.c
> +++ b/kernel/workqueue.c
> @@ -33,6 +33,7 @@
> #include <linux/kallsyms.h>
> #include <linux/debug_locks.h>
> #include <linux/lockdep.h>
> +#include <linux/mutex.h>
>
> /*
> * Structure fields follow one of the following exclusion rules.
> @@ -71,6 +72,7 @@ struct workqueue_struct {
> struct cpu_workqueue_struct *cpu_wq; /* I: cwq's */
> struct list_head list; /* W: list of all workqueues */
> const char *name; /* I: workqueue name */
> + struct mutex single_thread_mutex; /* for SINGLE_THREAD wq */
> #ifdef CONFIG_LOCKDEP
> struct lockdep_map lockdep_map;
> #endif


> @@ -410,6 +387,8 @@ EXPORT_SYMBOL_GPL(queue_delayed_work_on);
> static void process_one_work(struct cpu_workqueue_struct *cwq,
> struct work_struct *work)
> {
> + struct workqueue_struct *wq = cwq->wq;
> + bool single_thread = wq->flags & WQ_SINGLE_THREAD;
> work_func_t f = work->func;
> #ifdef CONFIG_LOCKDEP
> /*
> @@ -430,11 +409,18 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
>
> BUG_ON(get_wq_data(work) != cwq);
> work_clear_pending(work);
> - lock_map_acquire(&cwq->wq->lockdep_map);
> + lock_map_acquire(&wq->lockdep_map);
> lock_map_acquire(&lockdep_map);
> - f(work);
> +
> + if (unlikely(single_thread)) {
> + mutex_lock(&wq->single_thread_mutex);
> + f(work);
> + mutex_unlock(&wq->single_thread_mutex);
> + } else
> + f(work);
> +

An important property of the single threaded workqueue, upon which the
cx18 driver relies, is that work objects will be processed strictly in
the order in which they were queued. The cx18 driver has a pool of
"work orders" and multiple active work orders can be queued up on the
workqueue especially if multiple streams are active. If these work
orders were to be processed out of order, video artifacts would result
in video display applications.
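
For example, the driver does roughly the following (names
hypothetical, simplified):

	/* queue several outstanding work orders; the handler must see
	 * them in exactly this order */
	queue_work(cx18_work_queue, &order_a->work);
	queue_work(cx18_work_queue, &order_b->work);
	queue_work(cx18_work_queue, &order_c->work);
	/* correctness relies on FIFO execution by the single worker */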


With multiple work handling threads, I don't think the

mutex_lock(&wq->single_thread_mutex);
f(work);

here can guarantee work requests from the workqueue will always be
processed in the order they are received.

Am I missing something?

Regards,
Andy

2009-11-17 05:14:26

by Rusty Russell

[permalink] [raw]
Subject: Re: [PATCH 08/21] scheduler: implement force_cpus_allowed_ptr()

On Tue, 17 Nov 2009 03:45:13 am Tejun Heo wrote:
> Implement force_cpus_allowed_ptr() which is similar to
> set_cpus_allowed_ptr() but bypasses PF_THREAD_BOUND check and ignores
> cpu_active() status as long as the target cpu is online. This will be
> used for concurrency-managed workqueue.

Can we drop the silly _ptr() postfix? It was a hack someone added to
avoid churning set_cpus_allowed(), and there's no need to repeat it here.

Thanks,
Rusty.

2009-11-17 05:19:41

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 08/21] scheduler: implement force_cpus_allowed_ptr()

11/17/2009 02:14 PM, Rusty Russell wrote:
> On Tue, 17 Nov 2009 03:45:13 am Tejun Heo wrote:
>> Implement force_cpus_allowed_ptr() which is similar to
>> set_cpus_allowed_ptr() but bypasses PF_THREAD_BOUND check and ignores
>> cpu_active() status as long as the target cpu is online. This will be
>> used for concurrency-managed workqueue.
>
> Can we drop the silly _ptr() postfix? It was a hack someone added to
> avoid churning set_cpus_allowed(), and no need to repeat here.

Yeah, it's an ugly name. I'll drop it from the new function.

Thanks.

--
tejun

2009-11-17 05:23:14

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue

Hello,

11/17/2009 09:47 AM, Andy Walls wrote:
> An important property of the single threaded workqueue, upon which the
> cx18 driver relies, is that work objects will be processed strictly in
> the order in which they were queued. The cx18 driver has a pool of
> "work orders" and multiple active work orders can be queued up on the
> workqueue especially if multiple streams are active. If these work
> orders were to be processed out of order, video artifacts would result
> in video display applications.

That's an interesting use of a single thread workqueue. Most single
thread workqueues seem to be made single threaded just to save the
number of threads. Some seem to depend on a single thread of
execution, but I never knew there were ones which depend on the exact
execution order. Do you think that usage is wide-spread?
Implementing strict ordering shouldn't be too difficult, but I can't
help feeling that such an assumption is an abuse of an implementation
detail.

Thanks.

--
tejun

2009-11-17 07:06:26

by Avi Kivity

[permalink] [raw]
Subject: Re: [PATCH 01/21] workqueue: fix race condition in schedule_on_each_cpu()

On 11/17/2009 01:04 AM, Frederic Weisbecker wrote:
> On Tue, Nov 17, 2009 at 02:15:06AM +0900, Tejun Heo wrote:
>
>> Commit 65a64464349883891e21e74af16c05d6e1eeb4e9 which allows
>> schedule_on_each_cpu() to be called from keventd added a race
>> condition. schedule_on_each_cpu() may race with cpu hotplug and end
>> up executing the function twice on a cpu.
>>
>> Fix it by moving direct execution into the section protected with
>> get/put_online_cpus(). While at it, update code such that direct
>> execution is done after works have been scheduled for all other cpus
>> and drop unnecessary cpu != orig test from flush loop.
>>
>> Signed-off-by: Tejun Heo<[email protected]>
>> Cc: Andi Kleen<[email protected]>
>> Cc: Oleg Nesterov<[email protected]>
>> ---
>>
>
> Shouldn't this patch go to .32 ?
> This looks like an important fix.
>

Is anything using preempt notifiers on affected architectures?

--
Do not meddle in the internals of kernels, for they are subtle and quick to panic.

2009-11-17 11:45:22

by Louis Rilling

[permalink] [raw]
Subject: Re: [PATCH 19/21] workqueue: introduce worker

Hi Tejun,

On 17/11/09 2:15 +0900, Tejun Heo wrote:
> Separate out worker thread related information to struct worker from
> struct cpu_workqueue_struct and implement helper functions to deal
> with the new struct worker. The only change which is visible outside
> is that now workqueue worker are all named "kworker/CPUID:WORKERID"
> where WORKERID is allocated from per-cpu ida.
>
> This is in preparation of concurrency managed workqueue where shared
> multiple workers would be available per cpu.
>
> Signed-off-by: Tejun Heo <[email protected]>
> ---
> kernel/workqueue.c | 219 +++++++++++++++++++++++++++++++++++++---------------
> 1 files changed, 157 insertions(+), 62 deletions(-)
>
> diff --git a/kernel/workqueue.c b/kernel/workqueue.c
> index dce5ad5..6694b3e 100644
> --- a/kernel/workqueue.c
> +++ b/kernel/workqueue.c

[...]

> @@ -413,6 +426,105 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
> }
> EXPORT_SYMBOL_GPL(queue_delayed_work_on);
>
> +static struct worker *alloc_worker(void)
> +{
> + struct worker *worker;
> +
> + worker = kzalloc(sizeof(*worker), GFP_KERNEL);
> + return worker;
> +}
> +
> +/**
> + * create_worker - create a new workqueue worker
> + * @cwq: cwq the new worker will belong to
> + * @bind: whether to set affinity to @cpu or not
> + *
> + * Create a new worker which is bound to @cwq. The returned worker
> + * can be started by calling start_worker() or destroyed using
> + * destroy_worker().
> + *
> + * CONTEXT:
> + * Might sleep. Does GFP_KERNEL allocations.
> + *
> + * RETURNS:
> + * Pointer to the newly created worker.
> + */
> +static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
> +{
> + int id = -1;
> + struct worker *worker = NULL;
> +
> + spin_lock(&workqueue_lock);
> + while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
> + spin_unlock_irq(&workqueue_lock);
> + if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
> + goto fail;
> + spin_lock(&workqueue_lock);
> + }
> + spin_unlock_irq(&workqueue_lock);
> +
> + worker = alloc_worker();
> + if (!worker)
> + goto fail;
> +
> + worker->cwq = cwq;
> + worker->id = id;
> +
> + worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
> + cwq->cpu, id);
> + if (IS_ERR(worker->task))
> + goto fail;
> +
> + if (bind)
> + kthread_bind(worker->task, cwq->cpu);
> +
> + return worker;
> +fail:
> + if (id >= 0) {
> + spin_lock(&workqueue_lock);
> + ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
> + spin_unlock_irq(&workqueue_lock);
> + }
> + kfree(worker);
> + return NULL;
> +}

AFAIU create_worker() should call spin_lock_irq() instead of spin_lock().

[...]

Thanks,

Louis

--
Dr Louis Rilling Kerlabs
Skype: louis.rilling Batiment Germanium
Phone: (+33|0) 6 80 89 08 23 80 avenue des Buttes de Coesmes
http://www.kerlabs.com/ 35700 Rennes



2009-11-17 11:51:39

by Louis Rilling

[permalink] [raw]
Subject: Re: [PATCH 19/21] workqueue: introduce worker

On 17/11/09 12:39 +0100, Louis Rilling wrote:
> Hi Tejun,
>
> On 17/11/09 2:15 +0900, Tejun Heo wrote:
> > Separate out worker thread related information to struct worker from
> > struct cpu_workqueue_struct and implement helper functions to deal
> > with the new struct worker. The only change which is visible outside
> > is that now workqueue worker are all named "kworker/CPUID:WORKERID"
> > where WORKERID is allocated from per-cpu ida.
> >
> > This is in preparation of concurrency managed workqueue where shared
> > multiple workers would be available per cpu.
> >
> > Signed-off-by: Tejun Heo <[email protected]>
> > ---
> > kernel/workqueue.c | 219 +++++++++++++++++++++++++++++++++++++---------------
> > 1 files changed, 157 insertions(+), 62 deletions(-)
> >
> > diff --git a/kernel/workqueue.c b/kernel/workqueue.c
> > index dce5ad5..6694b3e 100644
> > --- a/kernel/workqueue.c
> > +++ b/kernel/workqueue.c
>
> [...]
>
> > @@ -413,6 +426,105 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
> > }
> > EXPORT_SYMBOL_GPL(queue_delayed_work_on);
> >
> > +static struct worker *alloc_worker(void)
> > +{
> > + struct worker *worker;
> > +
> > + worker = kzalloc(sizeof(*worker), GFP_KERNEL);
> > + return worker;
> > +}
> > +
> > +/**
> > + * create_worker - create a new workqueue worker
> > + * @cwq: cwq the new worker will belong to
> > + * @bind: whether to set affinity to @cpu or not
> > + *
> > + * Create a new worker which is bound to @cwq. The returned worker
> > + * can be started by calling start_worker() or destroyed using
> > + * destroy_worker().
> > + *
> > + * CONTEXT:
> > + * Might sleep. Does GFP_KERNEL allocations.
> > + *
> > + * RETURNS:
> > + * Pointer to the newly created worker.
> > + */
> > +static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
> > +{
> > + int id = -1;
> > + struct worker *worker = NULL;
> > +
> > + spin_lock(&workqueue_lock);
> > + while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
> > + spin_unlock_irq(&workqueue_lock);
> > + if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
> > + goto fail;
> > + spin_lock(&workqueue_lock);
> > + }
> > + spin_unlock_irq(&workqueue_lock);
> > +
> > + worker = alloc_worker();
> > + if (!worker)
> > + goto fail;
> > +
> > + worker->cwq = cwq;
> > + worker->id = id;
> > +
> > + worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
> > + cwq->cpu, id);
> > + if (IS_ERR(worker->task))
> > + goto fail;
> > +
> > + if (bind)
> > + kthread_bind(worker->task, cwq->cpu);
> > +
> > + return worker;
> > +fail:
> > + if (id >= 0) {
> > + spin_lock(&workqueue_lock);
> > + ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
> > + spin_unlock_irq(&workqueue_lock);
> > + }
> > + kfree(worker);
> > + return NULL;
> > +}
>
> AFAIU create_worker() should call spin_lock_irq() instead of spin_lock().

spin_unlock() instead of spin_unlock_irq() looks better in fact ;).
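
I.e. keep the flavours matched, something like (untested):

	spin_lock(&workqueue_lock);
	while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
		spin_unlock(&workqueue_lock);
		if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
			goto fail;
		spin_lock(&workqueue_lock);
	}
	spin_unlock(&workqueue_lock);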

Louis

--
Dr Louis Rilling Kerlabs
Skype: louis.rilling Batiment Germanium
Phone: (+33|0) 6 80 89 08 23 80 avenue des Buttes de Coesmes
http://www.kerlabs.com/ 35700 Rennes



2009-11-17 12:07:29

by Andy Walls

[permalink] [raw]
Subject: Re: [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue

On Tue, 2009-11-17 at 14:23 +0900, Tejun Heo wrote:
> Hello,
>
> 11/17/2009 09:47 AM, Andy Walls wrote:
> > An important property of the single threaded workqueue, upon which the
> > cx18 driver relies, is that work objects will be processed strictly in
> > the order in which they were queued. The cx18 driver has a pool of
> > "work orders" and multiple active work orders can be queued up on the
> > workqueue especially if multiple streams are active. If these work
> > orders were to be processed out of order, video artifacts would result
> > in video display applications.
>
> That's an interesting use of single thread workqueue. Most of single
> thread workqueues seem to be made single thread just to save number of
> threads. Some seem to depend on single thread of execution but I
> never knew there are ones which depend on the exact execution order.
> Do you think that usage is wide-spread?

I doubt it.

Most that I have seen use the singlethreaded workqueue object with a
queue depth of essentially 1 for synchronization - as you have noted.


> Implementing strict ordering
> shouldn't be too difficult but I can't help but feeling that such
> assumption is abuse of implementation detail.

Hmmm, does not the "queue" in workqueue mean "FIFO"?

If not for strict ordering, why else would a driver absolutely need a
singlethreaded workqueue object? It seems to me the strict ordering is
the driving requirement for a singlethreaded workqueue at all. Your
patch series indicates to me that the performance and synchronization
use cases are not driving requirements for a singlethreaded workqueue.

Thanks for your consideration.

Regards,
Andy

> Thanks.

2009-11-17 14:03:51

by Johannes Berg

[permalink] [raw]
Subject: Re: [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue

On Tue, 2009-11-17 at 02:15 +0900, Tejun Heo wrote:
> SINGLE_THREAD workqueues are used to reduce the number of worker
> threads and ease synchronization.

Wireless (mac80211) also requires that the order in which different work
structs are queued up is identical to the processing order. At least
some code was written with that assumption in mind, and I think it's
actually required in a few places.

Also, that unlikely() here:

> + if (unlikely(single_thread)) {
> + mutex_lock(&wq->single_thread_mutex);
> + f(work);
> + mutex_unlock(&wq->single_thread_mutex);
> + } else
> + f(work);

seems wrong, there are many single-threaded workqueues after all.

johannes



2009-11-17 15:08:50

by Linus Torvalds

[permalink] [raw]
Subject: Re: [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue



On Tue, 17 Nov 2009, Tejun Heo wrote:
>
> Do you think that usage is wide-spread? Implementing strict ordering
> shouldn't be too difficult but I can't help but feeling that such
> assumption is abuse of implementation detail.

I think it would be good if it were more than an implementation detail,
and was something documented and known.

The less random and timing-dependent our interfaces are, the better off we
are. Guaranteeing that a single-threaded workqueue is done in order seems
to me to be a GoodThing(tm), regardless of whether much code depends on
it.

Of course, if there is some fundamental reason why it wouldn't be the
case, that's another thing. But if you think it should be easy, and since
there _are_ users, then it shouldn't be seen as an "implementation
detail". It's a feature.

Linus

2009-11-17 16:13:47

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue

Hello, Linus.

11/18/2009 12:05 AM, Linus Torvalds wrote:
>> Do you think that usage is wide-spread? Implementing strict ordering
>> shouldn't be too difficult but I can't help but feeling that such
>> assumption is abuse of implementation detail.
>
> I think it would be good if it were more than an implementation detail,
> and was something documented and known.
>
> The less random and timing-dependent our interfaces are, the better off we
> are. Guaranteeing that a single-threaded workqueue is done in order seems
> to me to be a GoodThing(tm), regardless of whether much code depends on
> it.
>
> Of course, if there is some fundamental reason why it wouldn't be the
> case, that's another thing. But if you think uit should be easy, and since
> there _are_ users, then it shouldn't be seen as an "implementation
> detail". It's a feature.

I might have been too quick with the 'easy' part, but I can
definitely give it a shot. What do you think about the scheduler
notifier implementation? It seems we'll end up with three callbacks.
It can either be three hlist_heads in the task_struct, one linking
notifiers for each callback, or a single hlist_head linking ops
tables (like the current preempt notifiers). Which one should I go
with?
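
The ops table variant would look roughly like the following (callback
signatures illustrative only):

	struct sched_notifier_ops {
		void (*sleep)(struct sched_notifier *n);
		void (*ready)(struct sched_notifier *n, bool wakeup);
		void (*run)(struct sched_notifier *n, struct task_struct *prev);
	};

	struct sched_notifier {
		struct hlist_node link;		/* single list in task_struct */
		struct sched_notifier_ops *ops;	/* NULL callbacks are skipped */
	};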

Thanks.

--
tejun

2009-11-17 16:18:13

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 01/21] workqueue: fix race condition in schedule_on_each_cpu()

11/17/2009 04:04 PM, Avi Kivity wrote:
>> Shouldn't this patch go to .32 ?
>> This looks like an important fix.
>
> Is anything using preeempt notifier on affected architectures?

Checking... yeap, ia64 has both __ARCH_WANT_UNLOCKED_CTXSW and KVM.

--
tejun

2009-11-17 16:17:24

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 04/21] sched: implement scheduler notifiers

Hello,

11/17/2009 05:29 AM, Peter Zijlstra wrote:
>> we can have sleep/ready/run hooks instead.
>
> I would much prefer that, sleep/ready are significantly different from
> deactivate/activate.

Alright, will update.

Thanks.

--
tejun

2009-11-17 16:23:07

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue

Hello,

11/17/2009 09:05 PM, Andy Walls wrote:
>> Implementing strict ordering
>> shouldn't be too difficult but I can't help but feeling that such
>> assumption is abuse of implementation detail.
>
> Hmmm, does not the "queue" in workqueue mean "FIFO"?

I don't think it necessarily means strict execution ordering.

> If not for strict ordering, why else would a driver absolutely need a
> singlethreaded workqueue object? It seems to me the strict ording is
> the driving requirement for a singlethreaded workqueue at all. Your
> patch series indicates to me that the performance and synchronization
> use cases are not driving requirements for a singlethreaded workqueue.

I still think the biggest reason single threaded workqueues are used
is just to reduce the number of threads hanging around. I tried to
audit single thread users some time ago. My impression was that many
of them did their own synchronization anyway, while a smaller portion
depended on single threadedness. I didn't notice a strict ordering
requirement, but then again I wasn't looking for it. It seems there
are at least two cases depending on FIFO behavior, so let's see if we
can retain the behavior for single threaded workqueues (maybe it
should be renamed to ORDERED?).
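For reference, the usage pattern in question looks roughly like the
following minimal, hypothetical driver-style sketch (none of the names
below come from an actual in-tree user), where the second work silently
assumes the first one has already run:

#include <linux/init.h>
#include <linux/module.h>
#include <linux/workqueue.h>

static struct workqueue_struct *my_wq;	/* hypothetical example queue */

static void prepare_fn(struct work_struct *work)
{
	/* sets up state that commit_fn() expects to find */
}

static void commit_fn(struct work_struct *work)
{
	/* relies on prepare_fn() having completed already */
}

static DECLARE_WORK(prepare_work, prepare_fn);
static DECLARE_WORK(commit_work, commit_fn);

static int __init my_init(void)
{
	my_wq = create_singlethread_workqueue("my_wq");
	if (!my_wq)
		return -ENOMEM;

	/*
	 * With strict FIFO execution on a single thread, prepare_fn()
	 * is guaranteed to finish before commit_fn() starts.  Without
	 * that guarantee commit_fn() would need to synchronize against
	 * prepare_fn() explicitly.
	 */
	queue_work(my_wq, &prepare_work);
	queue_work(my_wq, &commit_work);
	return 0;
}
module_init(my_init);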

Thanks.

--
tejun

2009-11-17 16:26:17

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue

Hello,

11/17/2009 11:03 PM, Johannes Berg wrote:
> On Tue, 2009-11-17 at 02:15 +0900, Tejun Heo wrote:
>> SINGLE_THREAD workqueues are used to reduce the number of worker
>> threads and ease synchronization.
>
> Wireless (mac80211) also requires that the order in which different work
> structs are queued up is identical to the processing order. At least
> some code was written with that assumption in mind, and I think it's
> actually required in a few places.

Thanks for pointing it out.

> Also, that unlikely() here:
>
>> + if (unlikely(single_thread)) {
>> + mutex_lock(&wq->single_thread_mutex);
>> + f(work);
>> + mutex_unlock(&wq->single_thread_mutex);
>> + } else
>> + f(work);
>
> seems wrong, there are many single-threaded workqueues after all.

Well, most users which chose a single threaded queue just to reduce
the number of threads won't need to stay single threaded, so I'm
expecting the number of single threaded users to drop. I'll probably
drop the unlikely() on the next round.
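Dropping the annotation would leave that hunk looking roughly like this
(a sketch of the change, not the actual next round):

	if (single_thread) {
		mutex_lock(&wq->single_thread_mutex);
		f(work);
		mutex_unlock(&wq->single_thread_mutex);
	} else
		f(work);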

Thanks.

--
tejun

2009-11-17 16:27:03

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 19/21] workqueue: introduce worker

Hello,

11/17/2009 08:51 PM, Louis Rilling wrote:
>>> +static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
>>> +{
>>> + int id = -1;
>>> + struct worker *worker = NULL;
>>> +
>>> + spin_lock(&workqueue_lock);
>>> + while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
>>> + spin_unlock_irq(&workqueue_lock);
>>> + if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
>>> + goto fail;
>>> + spin_lock(&workqueue_lock);
>>> + }
>>> + spin_unlock_irq(&workqueue_lock);
>>> +
>>> + worker = alloc_worker();
>>> + if (!worker)
>>> + goto fail;
>>> +
>>> + worker->cwq = cwq;
>>> + worker->id = id;
>>> +
>>> + worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
>>> + cwq->cpu, id);
>>> + if (IS_ERR(worker->task))
>>> + goto fail;
>>> +
>>> + if (bind)
>>> + kthread_bind(worker->task, cwq->cpu);
>>> +
>>> + return worker;
>>> +fail:
>>> + if (id >= 0) {
>>> + spin_lock(&workqueue_lock);
>>> + ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
>>> + spin_unlock_irq(&workqueue_lock);
>>> + }
>>> + kfree(worker);
>>> + return NULL;
>>> +}
>>
>> AFAIU create_worker() should call spin_lock_irq() instead of spin_lock().
>
> spin_unlock() instead of spin_unlock_irq() looks better in fact ;).

Oooh... right, thanks for spotting that. I'll fix it up.
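
For clarity, the point is simply that the lock and unlock variants have
to match; with plain spin_lock()/spin_unlock() throughout (assuming
workqueue_lock does not need to be irq-safe here, as Louis suggests),
the id allocation loop would become:

	spin_lock(&workqueue_lock);
	while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
		spin_unlock(&workqueue_lock);
		if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
			goto fail;
		spin_lock(&workqueue_lock);
	}
	spin_unlock(&workqueue_lock);

The other consistent option is spin_lock_irq()/spin_unlock_irq()
everywhere, if the lock is also taken from irq context; whichever
variant is chosen, the ida_remove() error path needs the same
treatment.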

--
tejun

2009-11-17 19:04:17

by Linus Torvalds

[permalink] [raw]
Subject: Re: [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue



On Wed, 18 Nov 2009, Tejun Heo wrote:
>
> I might have been too early with the 'easy' part but I definitely can
> give it a shot. What do you think about the scheduler notifier
> implementation? It seems we'll end up with three callbacks. It can
> either be three hlist_heads in task_struct, each linking one
> callback's notifiers, or a single hlist_head linking ops tables (like
> the current preempt
> notifiers). Which one should I go with?

I have to say that I don't know. Will this eventually be something common?
Is the cache footprint problem of 3 pointers that are usually empty worse
than the cache problem of following a chain where you don't use half the
entries? Who knows?

And when it actually _is_ used, is it going to be horrible to have a
possibly unpredictable indirect branch (and on some architectures, _all_
indirect branches are unpredictable) in a really hot path?

In general, "notifiers" are always horrible. If there's only one or two
common cases, it's probably going to be better to hardcode those with
flags to be tested instead of following function pointers. So I just don't
know.
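
As a rough illustration of the two shapes being contrasted here (this
builds on the hypothetical sched_notifier types sketched earlier in the
thread; PF_SLEEP_HOOK and handle_sleep_hook() are made-up names):

/* Assumes a hypothetical ->sched_notifiers list in task_struct. */
static void sleep_hook_via_chain(struct task_struct *prev)
{
	struct sched_notifier *n;
	struct hlist_node *pos;

	/* one possibly unpredictable indirect call per registered entry */
	hlist_for_each_entry(n, pos, &prev->sched_notifiers, link)
		n->ops->sleep(n);
}

static void sleep_hook_via_flag(struct task_struct *prev)
{
	/* hardcoded common case: a predictable flag test, direct call */
	if (prev->flags & PF_SLEEP_HOOK)
		handle_sleep_hook(prev);
}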

Linus