Hello,
4cb1ef64609f ("workqueue: Implement BH workqueues to eventually replace
tasklets") implemented workqueues that execute work items in the BH context
with the goal of eventually replacing tasklet.
While the existing workqueue API covers the basic queueing and canceling
operations, tasklet also has tasklet_disable*() which blocks the execution
of the tasklet until it's re-enabled with tasklet_enable(). The interface is
fairly widely used and workqueue currently doesn't have a counterpart.
This patchset implements disable/enable_work() and the delayed_work
counterparts to address the gap. The ability to block future executions is
something which some users asked for in the past, and, while not essential
as the caller can and often has to shut down the queuer anyway, it's a nice
convenience to have. Also, timer grew a similar feature recently with
timer_shutdown().
- tasklet_disable() keeps disable depth so that a tasklet can be disabled
and re-enabled by multiple parties at the same time. Workqueue is updated
to carry 16bit disable count in work->data while the work item is not
queued. When non-zero, attempts to queue the work item fail.
- The cancel_work_sync() path used WORK_OFFQ_CANCELING to synchronize
multiple cancel_sync attempts. This added a completely separate wait
mechanism which wasn't very congruent with the rest of workqueue. This was
because the canceling state was carried in a way which couldn't
accommodate multiple concurrent uses. This mechanism is replaced by
disable - cancel_sync now simply disables the work item, flushes and
re-enables it.
- There is a wart in how tasklet_disable/enable() works. When a tasklet is
disabled, if the tasklet is queued, it keeps the softirq constantly raised
until the tasklet is re-enabled and executed. This makes disabling
unnecessarily expensive and brittle. The only busy looping that workqueue's
implementation does is on the party trying to cancel_sync or disable_sync,
which waits for the currently executing instance to finish. That should be
safe as long as the caller is in process or BH context.
- A disabled tasklet remembers whether it was queued while disabled and
starts executing when re-enabled. It turns out doing this with work items
is challenging as there is a lot more to remember and the synchronization
becomes more complicated too. Looking at the use cases and given the
continuity from how cancel_work_sync() works, it seems better to just
ignore queueings which happen while a work item is disabled and require
the users to explicitly queue the work item after re-enabling as
necessary. Most users should be able to re-enqueue unconditionally after
enabling (see the usage sketch below).
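For illustration, here's a rough sketch of the intended usage pattern. The
driver struct, fields and helpers below are made up; only the
disable/enable/queue calls are from this series.

  struct foo_dev {
          struct workqueue_struct *wq;
          struct work_struct      rx_work;
          /* ... */
  };

  /* hypothetical control-path quiesce/resume, e.g. around a reset */
  static void foo_reset(struct foo_dev *fd)
  {
          /* block future queueings and wait for the current execution */
          disable_work_sync(&fd->rx_work);

          foo_reinit_hw(fd);

          /*
           * Queueings attempted while disabled were ignored, so re-enable
           * and requeue unconditionally.
           */
          enable_work(&fd->rx_work);
          queue_work(fd->wq, &fd->rx_work);
  }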
This patchset is on top of wq/for-6.9 and contains the following 17 patches:
0001-workqueue-Cosmetic-changes.patch
0002-workqueue-Use-rcu_read_lock_any_held-instead-of-rcu_.patch
0003-workqueue-Rename-__cancel_work_timer-to-__cancel_tim.patch
0004-workqueue-Reorganize-flush-and-cancel-_sync-function.patch
0005-workqueue-Use-variable-name-irq_flags-for-saving-loc.patch
0006-workqueue-Introduce-work_cancel_flags.patch
0007-workqueue-Clean-up-enum-work_bits-and-related-consta.patch
0008-workqueue-Factor-out-work_grab_pending-from-__cancel.patch
0009-workqueue-Remove-clear_work_data.patch
0010-workqueue-Make-flags-handling-consistent-across-set_.patch
0011-workqueue-Preserve-OFFQ-bits-in-cancel-_sync-paths.patch
0012-workqueue-Implement-disable-enable-for-delayed-work-.patch
0013-workqueue-Remove-WORK_OFFQ_CANCELING.patch
0014-workqueue-Remember-whether-a-work-item-was-on-a-BH-w.patch
0015-workqueue-Update-how-start_flush_work-is-called.patch
0016-workqueue-Allow-cancel_work_sync-and-disable_work-fr.patch
0017-r8152-Convert-from-tasklet-to-BH-workqueue.patch
0001-0010 are cleanup and prep patches with the only functional change being
the use of rcu_read_lock_any_held() instead of rcu_read_lock() in 0002. I'll
apply them to wq/for-6.9 unless there are objections. I thought about making
these a separate patch series but the cleanups make more sense as a part of
this series.
For the rest of the series, given how many invasive workqueue changes are
already queued for v6.9 and the subtle nature of these patches, I think it'd
be best to defer them to the one after that so that we can use v6.9 as an
intermediate verification point.
0011-0012 implement disable_work() and enable_work(). At this stage, all
disable[_sync]_work and enable_work operations might_sleep(): disable_work()
and enable_work() because of the CANCELING synchronization described above,
and disable_work_sync() because it also needs to wait for the in-flight work
item to finish, which requires blocking.
0013 replaces CANCELING with internal use of disable/enable. This removes one
ugliness from workqueue code and allows disable_work() and enable_work() to
be used from atomic contexts.
0014-0016 implement busy-wait for BH work items when they're being canceled
thus allowing cancel_work_sync() and disable_work_sync() to be called from
atomic contexts for them. This makes the workqueue interface a superset of
the tasklet interface and also makes BH workqueues easier to live with.
0017 converts drivers/net/usb/r8152.c from tasklet to BH workqueue as a
demonstration. It seems to work fine.
The patchset is on top of wq/for-6.9 fd0a68a2337b ("workqueue, irq_work:
Build fix for !CONFIG_IRQ_WORK") and also available in the following git
branch:
git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git disable_work-v1
diffstat follows. Thanks.
drivers/net/usb/r8152.c | 44 ++--
include/linux/workqueue.h | 69 ++++---
kernel/workqueue.c | 623 ++++++++++++++++++++++++++++++++++++++++----------------------------
3 files changed, 441 insertions(+), 295 deletions(-)
--
tejun
Reorder some global declarations and adjust comments and whitespace for
clarity and consistency. No functional changes.
Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 30 ++++++++++++++----------------
1 file changed, 14 insertions(+), 16 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 6ae441e13804..b280caf81fb2 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -376,8 +376,6 @@ struct workqueue_struct {
struct wq_node_nr_active *node_nr_active[]; /* I: per-node nr_active */
};
-static struct kmem_cache *pwq_cache;
-
/*
* Each pod type describes how CPUs should be grouped for unbound workqueues.
* See the comment above workqueue_attrs->affn_scope.
@@ -389,20 +387,15 @@ struct wq_pod_type {
int *cpu_pod; /* cpu -> pod */
};
-static struct wq_pod_type wq_pod_types[WQ_AFFN_NR_TYPES];
-static enum wq_affn_scope wq_affn_dfl = WQ_AFFN_CACHE;
-
static const char *wq_affn_names[WQ_AFFN_NR_TYPES] = {
- [WQ_AFFN_DFL] = "default",
- [WQ_AFFN_CPU] = "cpu",
- [WQ_AFFN_SMT] = "smt",
- [WQ_AFFN_CACHE] = "cache",
- [WQ_AFFN_NUMA] = "numa",
- [WQ_AFFN_SYSTEM] = "system",
+ [WQ_AFFN_DFL] = "default",
+ [WQ_AFFN_CPU] = "cpu",
+ [WQ_AFFN_SMT] = "smt",
+ [WQ_AFFN_CACHE] = "cache",
+ [WQ_AFFN_NUMA] = "numa",
+ [WQ_AFFN_SYSTEM] = "system",
};
-static bool wq_topo_initialized __read_mostly = false;
-
/*
* Per-cpu work items which run for longer than the following threshold are
* automatically considered CPU intensive and excluded from concurrency
@@ -418,6 +411,12 @@ static bool wq_power_efficient = IS_ENABLED(CONFIG_WQ_POWER_EFFICIENT_DEFAULT);
module_param_named(power_efficient, wq_power_efficient, bool, 0444);
static bool wq_online; /* can kworkers be created yet? */
+static bool wq_topo_initialized __read_mostly = false;
+
+static struct kmem_cache *pwq_cache;
+
+static struct wq_pod_type wq_pod_types[WQ_AFFN_NR_TYPES];
+static enum wq_affn_scope wq_affn_dfl = WQ_AFFN_CACHE;
/* buf for wq_update_unbound_pod_attrs(), protected by CPU hotplug exclusion */
static struct workqueue_attrs *wq_update_pod_attrs_buf;
@@ -2231,7 +2230,6 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
*/
lockdep_assert_irqs_disabled();
-
/*
* For a draining wq, only works from the same workqueue are
* allowed. The __WQ_DESTROYING helps to spot the issue that
@@ -4121,8 +4119,8 @@ static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)
local_irq_restore(flags);
/*
- * This allows canceling during early boot. We know that @work
- * isn't executing.
+ * Skip __flush_work() during early boot when we know that @work isn't
+ * executing. This allows canceling during early boot.
*/
if (wq_online)
__flush_work(work, true);
--
2.43.2
The flush and cancel[_sync] functions are currently a bit disorganized, with
flush and cancel functions mixed together. Reorganize them so that flush
functions come first, cancel next and cancel_sync last. This way, we won't
have to add prototypes for internal
functions for the planned disable/enable support.
This is pure code reorganization. No functional changes.
Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 136 ++++++++++++++++++++++-----------------------
1 file changed, 68 insertions(+), 68 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 7e2af79bfa62..962061dca05c 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -4061,6 +4061,65 @@ bool flush_work(struct work_struct *work)
}
EXPORT_SYMBOL_GPL(flush_work);
+/**
+ * flush_delayed_work - wait for a dwork to finish executing the last queueing
+ * @dwork: the delayed work to flush
+ *
+ * Delayed timer is cancelled and the pending work is queued for
+ * immediate execution. Like flush_work(), this function only
+ * considers the last queueing instance of @dwork.
+ *
+ * Return:
+ * %true if flush_work() waited for the work to finish execution,
+ * %false if it was already idle.
+ */
+bool flush_delayed_work(struct delayed_work *dwork)
+{
+ local_irq_disable();
+ if (del_timer_sync(&dwork->timer))
+ __queue_work(dwork->cpu, dwork->wq, &dwork->work);
+ local_irq_enable();
+ return flush_work(&dwork->work);
+}
+EXPORT_SYMBOL(flush_delayed_work);
+
+/**
+ * flush_rcu_work - wait for a rwork to finish executing the last queueing
+ * @rwork: the rcu work to flush
+ *
+ * Return:
+ * %true if flush_rcu_work() waited for the work to finish execution,
+ * %false if it was already idle.
+ */
+bool flush_rcu_work(struct rcu_work *rwork)
+{
+ if (test_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&rwork->work))) {
+ rcu_barrier();
+ flush_work(&rwork->work);
+ return true;
+ } else {
+ return flush_work(&rwork->work);
+ }
+}
+EXPORT_SYMBOL(flush_rcu_work);
+
+static bool __cancel_work(struct work_struct *work, bool is_dwork)
+{
+ unsigned long flags;
+ int ret;
+
+ do {
+ ret = try_to_grab_pending(work, is_dwork, &flags);
+ } while (unlikely(ret == -EAGAIN));
+
+ if (unlikely(ret < 0))
+ return false;
+
+ set_work_pool_and_clear_pending(work, get_work_pool_id(work));
+ local_irq_restore(flags);
+ return ret;
+}
+
struct cwt_wait {
wait_queue_entry_t wait;
struct work_struct *work;
@@ -4139,6 +4198,15 @@ static bool __cancel_work_sync(struct work_struct *work, bool is_dwork)
return ret;
}
+/*
+ * See cancel_delayed_work()
+ */
+bool cancel_work(struct work_struct *work)
+{
+ return __cancel_work(work, false);
+}
+EXPORT_SYMBOL(cancel_work);
+
/**
* cancel_work_sync - cancel a work and wait for it to finish
* @work: the work to cancel
@@ -4163,74 +4231,6 @@ bool cancel_work_sync(struct work_struct *work)
}
EXPORT_SYMBOL_GPL(cancel_work_sync);
-/**
- * flush_delayed_work - wait for a dwork to finish executing the last queueing
- * @dwork: the delayed work to flush
- *
- * Delayed timer is cancelled and the pending work is queued for
- * immediate execution. Like flush_work(), this function only
- * considers the last queueing instance of @dwork.
- *
- * Return:
- * %true if flush_work() waited for the work to finish execution,
- * %false if it was already idle.
- */
-bool flush_delayed_work(struct delayed_work *dwork)
-{
- local_irq_disable();
- if (del_timer_sync(&dwork->timer))
- __queue_work(dwork->cpu, dwork->wq, &dwork->work);
- local_irq_enable();
- return flush_work(&dwork->work);
-}
-EXPORT_SYMBOL(flush_delayed_work);
-
-/**
- * flush_rcu_work - wait for a rwork to finish executing the last queueing
- * @rwork: the rcu work to flush
- *
- * Return:
- * %true if flush_rcu_work() waited for the work to finish execution,
- * %false if it was already idle.
- */
-bool flush_rcu_work(struct rcu_work *rwork)
-{
- if (test_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&rwork->work))) {
- rcu_barrier();
- flush_work(&rwork->work);
- return true;
- } else {
- return flush_work(&rwork->work);
- }
-}
-EXPORT_SYMBOL(flush_rcu_work);
-
-static bool __cancel_work(struct work_struct *work, bool is_dwork)
-{
- unsigned long flags;
- int ret;
-
- do {
- ret = try_to_grab_pending(work, is_dwork, &flags);
- } while (unlikely(ret == -EAGAIN));
-
- if (unlikely(ret < 0))
- return false;
-
- set_work_pool_and_clear_pending(work, get_work_pool_id(work));
- local_irq_restore(flags);
- return ret;
-}
-
-/*
- * See cancel_delayed_work()
- */
-bool cancel_work(struct work_struct *work)
-{
- return __cancel_work(work, false);
-}
-EXPORT_SYMBOL(cancel_work);
-
/**
* cancel_delayed_work - cancel a delayed work
* @dwork: delayed_work to cancel
--
2.43.2
__cancel_work_timer() is used to implement cancel_work_sync() and
cancel_delayed_work_sync(), similarly to how __cancel_work() is used to
implement cancel_work() and cancel_delayed_work(). I.e., the _timer part of
the name is a complete misnomer. The difference from __cancel_work() is the
fact that it syncs against work item execution, not whether it handles timers
or not.
Let's rename it to the less confusing __cancel_work_sync(). No functional
change.
Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 87750e70b638..7e2af79bfa62 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -4075,7 +4075,7 @@ static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *k
return autoremove_wake_function(wait, mode, sync, key);
}
-static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)
+static bool __cancel_work_sync(struct work_struct *work, bool is_dwork)
{
static DECLARE_WAIT_QUEUE_HEAD(cancel_waitq);
unsigned long flags;
@@ -4159,7 +4159,7 @@ static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)
*/
bool cancel_work_sync(struct work_struct *work)
{
- return __cancel_work_timer(work, false);
+ return __cancel_work_sync(work, false);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);
@@ -4264,7 +4264,7 @@ EXPORT_SYMBOL(cancel_delayed_work);
*/
bool cancel_delayed_work_sync(struct delayed_work *dwork)
{
- return __cancel_work_timer(&dwork->work, true);
+ return __cancel_work_sync(&dwork->work, true);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
--
2.43.2
Using the generic term `flags` for irq flags is conventional but can be
confusing as there's quite a bit of code dealing with work flags which
involves some subtleties. Let's use a more explicit name `irq_flags` for
local irq flags. No functional changes.
Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 76 +++++++++++++++++++++++-----------------------
1 file changed, 38 insertions(+), 38 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 962061dca05c..b590d93d054b 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -2029,7 +2029,7 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_
* try_to_grab_pending - steal work item from worklist and disable irq
* @work: work item to steal
* @is_dwork: @work is a delayed_work
- * @flags: place to store irq state
+ * @irq_flags: place to store irq state
*
* Try to grab PENDING bit of @work. This function can handle @work in any
* stable state - idle, on timer or on worklist.
@@ -2051,17 +2051,17 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_
* irqsafe, ensures that we return -EAGAIN for finite short period of time.
*
* On successful return, >= 0, irq is disabled and the caller is
- * responsible for releasing it using local_irq_restore(*@flags).
+ * responsible for releasing it using local_irq_restore(*@irq_flags).
*
* This function is safe to call from any context including IRQ handler.
*/
static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
- unsigned long *flags)
+ unsigned long *irq_flags)
{
struct worker_pool *pool;
struct pool_workqueue *pwq;
- local_irq_save(*flags);
+ local_irq_save(*irq_flags);
/* try to steal the timer if it exists */
if (is_dwork) {
@@ -2136,7 +2136,7 @@ static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
raw_spin_unlock(&pool->lock);
fail:
rcu_read_unlock();
- local_irq_restore(*flags);
+ local_irq_restore(*irq_flags);
if (work_is_canceling(work))
return -ENOENT;
cpu_relax();
@@ -2344,16 +2344,16 @@ bool queue_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
bool ret = false;
- unsigned long flags;
+ unsigned long irq_flags;
- local_irq_save(flags);
+ local_irq_save(irq_flags);
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
__queue_work(cpu, wq, work);
ret = true;
}
- local_irq_restore(flags);
+ local_irq_restore(irq_flags);
return ret;
}
EXPORT_SYMBOL(queue_work_on);
@@ -2410,7 +2410,7 @@ static int select_numa_node_cpu(int node)
bool queue_work_node(int node, struct workqueue_struct *wq,
struct work_struct *work)
{
- unsigned long flags;
+ unsigned long irq_flags;
bool ret = false;
/*
@@ -2424,7 +2424,7 @@ bool queue_work_node(int node, struct workqueue_struct *wq,
*/
WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND));
- local_irq_save(flags);
+ local_irq_save(irq_flags);
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
int cpu = select_numa_node_cpu(node);
@@ -2433,7 +2433,7 @@ bool queue_work_node(int node, struct workqueue_struct *wq,
ret = true;
}
- local_irq_restore(flags);
+ local_irq_restore(irq_flags);
return ret;
}
EXPORT_SYMBOL_GPL(queue_work_node);
@@ -2503,17 +2503,17 @@ bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
{
struct work_struct *work = &dwork->work;
bool ret = false;
- unsigned long flags;
+ unsigned long irq_flags;
/* read the comment in __queue_work() */
- local_irq_save(flags);
+ local_irq_save(irq_flags);
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
__queue_delayed_work(cpu, wq, dwork, delay);
ret = true;
}
- local_irq_restore(flags);
+ local_irq_restore(irq_flags);
return ret;
}
EXPORT_SYMBOL(queue_delayed_work_on);
@@ -2539,16 +2539,16 @@ EXPORT_SYMBOL(queue_delayed_work_on);
bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay)
{
- unsigned long flags;
+ unsigned long irq_flags;
int ret;
do {
- ret = try_to_grab_pending(&dwork->work, true, &flags);
+ ret = try_to_grab_pending(&dwork->work, true, &irq_flags);
} while (unlikely(ret == -EAGAIN));
if (likely(ret >= 0)) {
__queue_delayed_work(cpu, wq, dwork, delay);
- local_irq_restore(flags);
+ local_irq_restore(irq_flags);
}
/* -ENOENT from try_to_grab_pending() becomes %true */
@@ -4105,18 +4105,18 @@ EXPORT_SYMBOL(flush_rcu_work);
static bool __cancel_work(struct work_struct *work, bool is_dwork)
{
- unsigned long flags;
+ unsigned long irq_flags;
int ret;
do {
- ret = try_to_grab_pending(work, is_dwork, &flags);
+ ret = try_to_grab_pending(work, is_dwork, &irq_flags);
} while (unlikely(ret == -EAGAIN));
if (unlikely(ret < 0))
return false;
set_work_pool_and_clear_pending(work, get_work_pool_id(work));
- local_irq_restore(flags);
+ local_irq_restore(irq_flags);
return ret;
}
@@ -4137,11 +4137,11 @@ static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *k
static bool __cancel_work_sync(struct work_struct *work, bool is_dwork)
{
static DECLARE_WAIT_QUEUE_HEAD(cancel_waitq);
- unsigned long flags;
+ unsigned long irq_flags;
int ret;
do {
- ret = try_to_grab_pending(work, is_dwork, &flags);
+ ret = try_to_grab_pending(work, is_dwork, &irq_flags);
/*
* If someone else is already canceling, wait for it to
* finish. flush_work() doesn't work for PREEMPT_NONE
@@ -4175,7 +4175,7 @@ static bool __cancel_work_sync(struct work_struct *work, bool is_dwork)
/* tell other tasks trying to grab @work to back off */
mark_work_canceling(work);
- local_irq_restore(flags);
+ local_irq_restore(irq_flags);
/*
* Skip __flush_work() during early boot when we know that @work isn't
@@ -5381,15 +5381,15 @@ static void wq_adjust_max_active(struct workqueue_struct *wq)
activated = false;
for_each_pwq(pwq, wq) {
- unsigned long flags;
+ unsigned long irq_flags;
/* can be called during early boot w/ irq disabled */
- raw_spin_lock_irqsave(&pwq->pool->lock, flags);
+ raw_spin_lock_irqsave(&pwq->pool->lock, irq_flags);
if (pwq_activate_first_inactive(pwq, true)) {
activated = true;
kick_pool(pwq->pool);
}
- raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
+ raw_spin_unlock_irqrestore(&pwq->pool->lock, irq_flags);
}
} while (activated);
}
@@ -5762,7 +5762,7 @@ EXPORT_SYMBOL_GPL(workqueue_congested);
unsigned int work_busy(struct work_struct *work)
{
struct worker_pool *pool;
- unsigned long flags;
+ unsigned long irq_flags;
unsigned int ret = 0;
if (work_pending(work))
@@ -5771,10 +5771,10 @@ unsigned int work_busy(struct work_struct *work)
rcu_read_lock();
pool = get_work_pool(work);
if (pool) {
- raw_spin_lock_irqsave(&pool->lock, flags);
+ raw_spin_lock_irqsave(&pool->lock, irq_flags);
if (find_worker_executing_work(pool, work))
ret |= WORK_BUSY_RUNNING;
- raw_spin_unlock_irqrestore(&pool->lock, flags);
+ raw_spin_unlock_irqrestore(&pool->lock, irq_flags);
}
rcu_read_unlock();
@@ -6006,7 +6006,7 @@ void show_one_workqueue(struct workqueue_struct *wq)
{
struct pool_workqueue *pwq;
bool idle = true;
- unsigned long flags;
+ unsigned long irq_flags;
for_each_pwq(pwq, wq) {
if (!pwq_is_empty(pwq)) {
@@ -6020,7 +6020,7 @@ void show_one_workqueue(struct workqueue_struct *wq)
pr_info("workqueue %s: flags=0x%x\n", wq->name, wq->flags);
for_each_pwq(pwq, wq) {
- raw_spin_lock_irqsave(&pwq->pool->lock, flags);
+ raw_spin_lock_irqsave(&pwq->pool->lock, irq_flags);
if (!pwq_is_empty(pwq)) {
/*
* Defer printing to avoid deadlocks in console
@@ -6031,7 +6031,7 @@ void show_one_workqueue(struct workqueue_struct *wq)
show_pwq(pwq);
printk_deferred_exit();
}
- raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
+ raw_spin_unlock_irqrestore(&pwq->pool->lock, irq_flags);
/*
* We could be printing a lot from atomic context, e.g.
* sysrq-t -> show_all_workqueues(). Avoid triggering
@@ -6050,10 +6050,10 @@ static void show_one_worker_pool(struct worker_pool *pool)
{
struct worker *worker;
bool first = true;
- unsigned long flags;
+ unsigned long irq_flags;
unsigned long hung = 0;
- raw_spin_lock_irqsave(&pool->lock, flags);
+ raw_spin_lock_irqsave(&pool->lock, irq_flags);
if (pool->nr_workers == pool->nr_idle)
goto next_pool;
@@ -6081,7 +6081,7 @@ static void show_one_worker_pool(struct worker_pool *pool)
pr_cont("\n");
printk_deferred_exit();
next_pool:
- raw_spin_unlock_irqrestore(&pool->lock, flags);
+ raw_spin_unlock_irqrestore(&pool->lock, irq_flags);
/*
* We could be printing a lot from atomic context, e.g.
* sysrq-t -> show_all_workqueues(). Avoid triggering
@@ -7212,10 +7212,10 @@ static DEFINE_PER_CPU(unsigned long, wq_watchdog_touched_cpu) = INITIAL_JIFFIES;
static void show_cpu_pool_hog(struct worker_pool *pool)
{
struct worker *worker;
- unsigned long flags;
+ unsigned long irq_flags;
int bkt;
- raw_spin_lock_irqsave(&pool->lock, flags);
+ raw_spin_lock_irqsave(&pool->lock, irq_flags);
hash_for_each(pool->busy_hash, bkt, worker, hentry) {
if (task_is_running(worker->task)) {
@@ -7233,7 +7233,7 @@ static void show_cpu_pool_hog(struct worker_pool *pool)
}
}
- raw_spin_unlock_irqrestore(&pool->lock, flags);
+ raw_spin_unlock_irqrestore(&pool->lock, irq_flags);
}
static void show_cpu_pools_hogs(void)
--
2.43.2
The bits of work->data are used for a few different purposes. How the bits
are used is determined by enum work_bits. The planned disable/enable support
will add another use, so let's clean it up a bit in preparation.
- Let WORK_STRUCT_*_BIT's values be determined by enum definition order.
- Delimit different bit sections the same way using SHIFT and BITS
values.
- Rename __WORK_OFFQ_CANCELING to WORK_OFFQ_CANCELING_BIT for consistency.
- Introduce WORK_STRUCT_PWQ_SHIFT and replace WORK_STRUCT_FLAG_MASK and
WORK_STRUCT_WQ_DATA_MASK with WORK_STRUCT_PWQ_MASK for clarity.
- Improve documentation.
No functional changes.
Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 58 +++++++++++++++++++++------------------
kernel/workqueue.c | 8 +++---
2 files changed, 36 insertions(+), 30 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 1565bab9edc8..0ad534fe6673 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -24,41 +24,49 @@
enum work_bits {
WORK_STRUCT_PENDING_BIT = 0, /* work item is pending execution */
- WORK_STRUCT_INACTIVE_BIT= 1, /* work item is inactive */
- WORK_STRUCT_PWQ_BIT = 2, /* data points to pwq */
- WORK_STRUCT_LINKED_BIT = 3, /* next work is linked to this one */
+ WORK_STRUCT_INACTIVE_BIT, /* work item is inactive */
+ WORK_STRUCT_PWQ_BIT, /* data points to pwq */
+ WORK_STRUCT_LINKED_BIT, /* next work is linked to this one */
#ifdef CONFIG_DEBUG_OBJECTS_WORK
- WORK_STRUCT_STATIC_BIT = 4, /* static initializer (debugobjects) */
- WORK_STRUCT_COLOR_SHIFT = 5, /* color for workqueue flushing */
-#else
- WORK_STRUCT_COLOR_SHIFT = 4, /* color for workqueue flushing */
+ WORK_STRUCT_STATIC_BIT, /* static initializer (debugobjects) */
#endif
+ WORK_STRUCT_FLAG_BITS,
+ /* color for workqueue flushing */
+ WORK_STRUCT_COLOR_SHIFT = WORK_STRUCT_FLAG_BITS,
WORK_STRUCT_COLOR_BITS = 4,
/*
- * Reserve 8 bits off of pwq pointer w/ debugobjects turned off.
- * This makes pwqs aligned to 256 bytes and allows 16 workqueue
- * flush colors.
+ * When WORK_STRUCT_PWQ is set, reserve 8 bits off of pwq pointer w/
+ * debugobjects turned off. This makes pwqs aligned to 256 bytes (512
+ * bytes w/ DEBUG_OBJECTS_WORK) and allows 16 workqueue flush colors.
+ *
+ * MSB
+ * [ pwq pointer ] [ flush color ] [ STRUCT flags ]
+ * 4 bits 4 or 5 bits
*/
- WORK_STRUCT_FLAG_BITS = WORK_STRUCT_COLOR_SHIFT +
- WORK_STRUCT_COLOR_BITS,
+ WORK_STRUCT_PWQ_SHIFT = WORK_STRUCT_COLOR_SHIFT + WORK_STRUCT_COLOR_BITS,
- /* data contains off-queue information when !WORK_STRUCT_PWQ */
- WORK_OFFQ_FLAG_BASE = WORK_STRUCT_COLOR_SHIFT,
-
- __WORK_OFFQ_CANCELING = WORK_OFFQ_FLAG_BASE,
+ /*
+ * data contains off-queue information when !WORK_STRUCT_PWQ.
+ *
+ * MSB
+ * [ pool ID ] [ OFFQ flags ] [ STRUCT flags ]
+ * 1 bit 4 or 5 bits
+ */
+ WORK_OFFQ_FLAG_SHIFT = WORK_STRUCT_FLAG_BITS,
+ WORK_OFFQ_CANCELING_BIT = WORK_OFFQ_FLAG_SHIFT,
+ WORK_OFFQ_FLAG_END,
+ WORK_OFFQ_FLAG_BITS = WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,
/*
- * When a work item is off queue, its high bits point to the last
- * pool it was on. Cap at 31 bits and use the highest number to
- * indicate that no pool is associated.
+ * When a work item is off queue, the high bits encode off-queue flags
+ * and the last pool it was on. Cap pool ID to 31 bits and use the
+ * highest number to indicate that no pool is associated.
*/
- WORK_OFFQ_FLAG_BITS = 1,
- WORK_OFFQ_POOL_SHIFT = WORK_OFFQ_FLAG_BASE + WORK_OFFQ_FLAG_BITS,
+ WORK_OFFQ_POOL_SHIFT = WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS,
WORK_OFFQ_LEFT = BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT,
WORK_OFFQ_POOL_BITS = WORK_OFFQ_LEFT <= 31 ? WORK_OFFQ_LEFT : 31,
-
};
enum work_flags {
@@ -88,12 +96,10 @@ enum wq_misc_consts {
};
/* Convenience constants - of type 'unsigned long', not 'enum'! */
-#define WORK_OFFQ_CANCELING (1ul << __WORK_OFFQ_CANCELING)
+#define WORK_OFFQ_CANCELING (1ul << WORK_OFFQ_CANCELING_BIT)
#define WORK_OFFQ_POOL_NONE ((1ul << WORK_OFFQ_POOL_BITS) - 1)
#define WORK_STRUCT_NO_POOL (WORK_OFFQ_POOL_NONE << WORK_OFFQ_POOL_SHIFT)
-
-#define WORK_STRUCT_FLAG_MASK ((1ul << WORK_STRUCT_FLAG_BITS) - 1)
-#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
+#define WORK_STRUCT_PWQ_MASK (~((1ul << WORK_STRUCT_PWQ_SHIFT) - 1))
#define WORK_DATA_INIT() ATOMIC_LONG_INIT((unsigned long)WORK_STRUCT_NO_POOL)
#define WORK_DATA_STATIC_INIT() \
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 317c85f051b0..7c6915e23c5c 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -247,7 +247,7 @@ enum pool_workqueue_stats {
};
/*
- * The per-pool workqueue. While queued, the lower WORK_STRUCT_FLAG_BITS
+ * The per-pool workqueue. While queued, bits below WORK_PWQ_SHIFT
* of work_struct->data are used for flags and the remaining high bits
* point to the pwq; thus, pwqs need to be aligned at two's power of the
* number of flag bits.
@@ -294,7 +294,7 @@ struct pool_workqueue {
*/
struct kthread_work release_work;
struct rcu_head rcu;
-} __aligned(1 << WORK_STRUCT_FLAG_BITS);
+} __aligned(1 << WORK_STRUCT_PWQ_SHIFT);
/*
* Structure used to wait for workqueue flush.
@@ -843,7 +843,7 @@ static void clear_work_data(struct work_struct *work)
static inline struct pool_workqueue *work_struct_pwq(unsigned long data)
{
- return (struct pool_workqueue *)(data & WORK_STRUCT_WQ_DATA_MASK);
+ return (struct pool_workqueue *)(data & WORK_STRUCT_PWQ_MASK);
}
static struct pool_workqueue *get_work_pwq(struct work_struct *work)
@@ -4851,7 +4851,7 @@ static void pwq_release_workfn(struct kthread_work *work)
static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
struct worker_pool *pool)
{
- BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
+ BUG_ON((unsigned long)pwq & ~WORK_STRUCT_PWQ_MASK);
memset(pwq, 0, sizeof(*pwq));
--
2.43.2
The planned disable/enable support will need the same logic. Let's factor it
out. No functional changes.
Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 130 +++++++++++++++++++++++++++------------------
1 file changed, 78 insertions(+), 52 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 7c6915e23c5c..38e589b6871c 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -484,6 +484,12 @@ static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
/* I: attributes used when instantiating ordered pools on demand */
static struct workqueue_attrs *ordered_wq_attrs[NR_STD_WORKER_POOLS];
+/*
+ * Used to synchronize multiple cancel_sync attempts on the same work item. See
+ * work_grab_pending() and __cancel_work_sync().
+ */
+static DECLARE_WAIT_QUEUE_HEAD(wq_cancel_waitq);
+
/*
* I: kthread_worker to release pwq's. pwq release needs to be bounced to a
* process context while holding a pool lock. Bounce to a dedicated kthread
@@ -2147,6 +2153,73 @@ static int try_to_grab_pending(struct work_struct *work, u32 cflags,
return -EAGAIN;
}
+struct cwt_wait {
+ wait_queue_entry_t wait;
+ struct work_struct *work;
+};
+
+static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
+{
+ struct cwt_wait *cwait = container_of(wait, struct cwt_wait, wait);
+
+ if (cwait->work != key)
+ return 0;
+ return autoremove_wake_function(wait, mode, sync, key);
+}
+
+/**
+ * work_grab_pending - steal work item from worklist and disable irq
+ * @work: work item to steal
+ * @cflags: %WORK_CANCEL_ flags
+ *
+ * Grab PENDING bit of @work. @work can be in any stable state - idle, on timer
+ * or on worklist.
+ *
+ * Must be called in process context. IRQ is disabled on return. The caller is
+ * responsible for re-enabling it using local_irq_enable().
+ *
+ * Returns %true if @work was pending. %false if idle.
+ */
+static bool work_grab_pending(struct work_struct *work, u32 cflags,
+ unsigned long *irq_flags)
+{
+ struct cwt_wait cwait;
+ int ret;
+
+ might_sleep();
+repeat:
+ ret = try_to_grab_pending(work, cflags, irq_flags);
+ if (likely(ret >= 0))
+ return ret;
+ if (ret != -ENOENT)
+ goto repeat;
+
+ /*
+ * Someone is already canceling. Wait for it to finish. flush_work()
+ * doesn't work for PREEMPT_NONE because we may get woken up between
+ * @work's completion and the other canceling task resuming and clearing
+ * CANCELING - flush_work() will return false immediately as @work is no
+ * longer busy, try_to_grab_pending() will return -ENOENT as @work is
+ * still being canceled and the other canceling task won't be able to
+ * clear CANCELING as we're hogging the CPU.
+ *
+ * Let's wait for completion using a waitqueue. As this may lead to the
+ * thundering herd problem, use a custom wake function which matches
+ * @work along with exclusive wait and wakeup.
+ */
+ init_wait(&cwait.wait);
+ cwait.wait.func = cwt_wakefn;
+ cwait.work = work;
+
+ prepare_to_wait_exclusive(&wq_cancel_waitq, &cwait.wait,
+ TASK_UNINTERRUPTIBLE);
+ if (work_is_canceling(work))
+ schedule();
+ finish_wait(&wq_cancel_waitq, &cwait.wait);
+
+ goto repeat;
+}
+
/**
* insert_work - insert a work into a pool
* @pwq: pwq @work belongs to
@@ -4125,60 +4198,13 @@ static bool __cancel_work(struct work_struct *work, u32 cflags)
return ret;
}
-struct cwt_wait {
- wait_queue_entry_t wait;
- struct work_struct *work;
-};
-
-static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
-{
- struct cwt_wait *cwait = container_of(wait, struct cwt_wait, wait);
-
- if (cwait->work != key)
- return 0;
- return autoremove_wake_function(wait, mode, sync, key);
-}
-
static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
{
- static DECLARE_WAIT_QUEUE_HEAD(cancel_waitq);
unsigned long irq_flags;
- int ret;
-
- do {
- ret = try_to_grab_pending(work, cflags, &irq_flags);
- /*
- * If someone else is already canceling, wait for it to
- * finish. flush_work() doesn't work for PREEMPT_NONE
- * because we may get scheduled between @work's completion
- * and the other canceling task resuming and clearing
- * CANCELING - flush_work() will return false immediately
- * as @work is no longer busy, try_to_grab_pending() will
- * return -ENOENT as @work is still being canceled and the
- * other canceling task won't be able to clear CANCELING as
- * we're hogging the CPU.
- *
- * Let's wait for completion using a waitqueue. As this
- * may lead to the thundering herd problem, use a custom
- * wake function which matches @work along with exclusive
- * wait and wakeup.
- */
- if (unlikely(ret == -ENOENT)) {
- struct cwt_wait cwait;
-
- init_wait(&cwait.wait);
- cwait.wait.func = cwt_wakefn;
- cwait.work = work;
-
- prepare_to_wait_exclusive(&cancel_waitq, &cwait.wait,
- TASK_UNINTERRUPTIBLE);
- if (work_is_canceling(work))
- schedule();
- finish_wait(&cancel_waitq, &cwait.wait);
- }
- } while (unlikely(ret < 0));
+ bool ret;
- /* tell other tasks trying to grab @work to back off */
+ /* claim @work and tell other tasks trying to grab @work to back off */
+ ret = work_grab_pending(work, cflags, &irq_flags);
mark_work_canceling(work);
local_irq_restore(irq_flags);
@@ -4197,8 +4223,8 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
* visible there.
*/
smp_mb();
- if (waitqueue_active(&cancel_waitq))
- __wake_up(&cancel_waitq, TASK_NORMAL, 1, work);
+ if (waitqueue_active(&wq_cancel_waitq))
+ __wake_up(&wq_cancel_waitq, TASK_NORMAL, 1, work);
return ret;
}
--
2.43.2
clear_work_data() is only used in one place and immediately followed by
smp_mb(), making it equivalent to set_work_pool_and_clear_pending() w/
WORK_OFFQ_POOL_NONE for @pool_id. Drop it. No functional changes.
Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 24 ++++++++----------------
1 file changed, 8 insertions(+), 16 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 38e589b6871c..ea53b53f8ff9 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -763,10 +763,9 @@ static int work_next_color(int color)
* contain the pointer to the queued pwq. Once execution starts, the flag
* is cleared and the high bits contain OFFQ flags and pool ID.
*
- * set_work_pwq(), set_work_pool_and_clear_pending(), mark_work_canceling()
- * and clear_work_data() can be used to set the pwq, pool or clear
- * work->data. These functions should only be called while the work is
- * owned - ie. while the PENDING bit is set.
+ * set_work_pwq(), set_work_pool_and_clear_pending() and mark_work_canceling()
+ * can be used to set the pwq, pool or clear work->data. These functions should
+ * only be called while the work is owned - ie. while the PENDING bit is set.
*
* get_work_pool() and get_work_pwq() can be used to obtain the pool or pwq
* corresponding to a work. Pool is available once the work has been
@@ -841,12 +840,6 @@ static void set_work_pool_and_clear_pending(struct work_struct *work,
smp_mb();
}
-static void clear_work_data(struct work_struct *work)
-{
- smp_wmb(); /* see set_work_pool_and_clear_pending() */
- set_work_data(work, WORK_STRUCT_NO_POOL, 0);
-}
-
static inline struct pool_workqueue *work_struct_pwq(unsigned long data)
{
return (struct pool_workqueue *)(data & WORK_STRUCT_PWQ_MASK);
@@ -4215,14 +4208,13 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
if (wq_online)
__flush_work(work, true);
- clear_work_data(work);
-
/*
- * Paired with prepare_to_wait() above so that either
- * waitqueue_active() is visible here or !work_is_canceling() is
- * visible there.
+ * smp_mb() at the end of set_work_pool_and_clear_pending() is paired
+ * with prepare_to_wait() above so that either waitqueue_active() is
+ * visible here or !work_is_canceling() is visible there.
*/
- smp_mb();
+ set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE);
+
if (waitqueue_active(&wq_cancel_waitq))
__wake_up(&wq_cancel_waitq, TASK_NORMAL, 1, work);
--
2.43.2
- set_work_data() takes a separate @flags argument but just ORs it to @data.
This is more confusing than helpful. Just take @data.
- Use the name @flags consistently and add the parameter to
set_work_pool_and_{keep|clear}_pending(). This will be used by the planned
disable/enable support.
No functional changes.
Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 32 ++++++++++++++++----------------
1 file changed, 16 insertions(+), 16 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index ea53b53f8ff9..7aa53a2e41af 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -777,29 +777,28 @@ static int work_next_color(int color)
* but stay off timer and worklist for arbitrarily long and nobody should
* try to steal the PENDING bit.
*/
-static inline void set_work_data(struct work_struct *work, unsigned long data,
- unsigned long flags)
+static inline void set_work_data(struct work_struct *work, unsigned long data)
{
WARN_ON_ONCE(!work_pending(work));
- atomic_long_set(&work->data, data | flags | work_static(work));
+ atomic_long_set(&work->data, data | work_static(work));
}
static void set_work_pwq(struct work_struct *work, struct pool_workqueue *pwq,
- unsigned long extra_flags)
+ unsigned long flags)
{
- set_work_data(work, (unsigned long)pwq,
- WORK_STRUCT_PENDING | WORK_STRUCT_PWQ | extra_flags);
+ set_work_data(work, (unsigned long)pwq | WORK_STRUCT_PENDING |
+ WORK_STRUCT_PWQ | flags);
}
static void set_work_pool_and_keep_pending(struct work_struct *work,
- int pool_id)
+ int pool_id, unsigned long flags)
{
- set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT,
- WORK_STRUCT_PENDING);
+ set_work_data(work, ((unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT) |
+ WORK_STRUCT_PENDING | flags);
}
static void set_work_pool_and_clear_pending(struct work_struct *work,
- int pool_id)
+ int pool_id, unsigned long flags)
{
/*
* The following wmb is paired with the implied mb in
@@ -808,7 +807,8 @@ static void set_work_pool_and_clear_pending(struct work_struct *work,
* owner.
*/
smp_wmb();
- set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT, 0);
+ set_work_data(work, ((unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT) |
+ flags);
/*
* The following mb guarantees that previous clear of a PENDING bit
* will not be reordered with any speculative LOADS or STORES from
@@ -909,7 +909,7 @@ static void mark_work_canceling(struct work_struct *work)
unsigned long pool_id = get_work_pool_id(work);
pool_id <<= WORK_OFFQ_POOL_SHIFT;
- set_work_data(work, pool_id | WORK_OFFQ_CANCELING, WORK_STRUCT_PENDING);
+ set_work_data(work, pool_id | WORK_STRUCT_PENDING | WORK_OFFQ_CANCELING);
}
static bool work_is_canceling(struct work_struct *work)
@@ -2127,7 +2127,7 @@ static int try_to_grab_pending(struct work_struct *work, u32 cflags,
* this destroys work->data needed by the next step, stash it.
*/
work_data = *work_data_bits(work);
- set_work_pool_and_keep_pending(work, pool->id);
+ set_work_pool_and_keep_pending(work, pool->id, 0);
/* must be the last step, see the function comment */
pwq_dec_nr_in_flight(pwq, work_data);
@@ -3203,7 +3203,7 @@ __acquires(&pool->lock)
* PENDING and queued state changes happen together while IRQ is
* disabled.
*/
- set_work_pool_and_clear_pending(work, pool->id);
+ set_work_pool_and_clear_pending(work, pool->id, 0);
pwq->stats[PWQ_STAT_STARTED]++;
raw_spin_unlock_irq(&pool->lock);
@@ -4186,7 +4186,7 @@ static bool __cancel_work(struct work_struct *work, u32 cflags)
if (unlikely(ret < 0))
return false;
- set_work_pool_and_clear_pending(work, get_work_pool_id(work));
+ set_work_pool_and_clear_pending(work, get_work_pool_id(work), 0);
local_irq_restore(irq_flags);
return ret;
}
@@ -4213,7 +4213,7 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
* with prepare_to_wait() above so that either waitqueue_active() is
* visible here or !work_is_canceling() is visible there.
*/
- set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE);
+ set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE, 0);
if (waitqueue_active(&wq_cancel_waitq))
__wake_up(&wq_cancel_waitq, TASK_NORMAL, 1, work);
--
2.43.2
The cancel[_sync] paths acquire and release WORK_STRUCT_PENDING, and
manipulate WORK_OFFQ_CANCELING. However, they assume that all the OFFQ bit
values except for the pool ID are statically known and don't preserve them,
which is not wrong in the current code as the pool ID and CANCELING are the
only information carried. However, the planned disable/enable support will
add more fields and need them to be preserved.
This patch updates work data handling so that only the bits which need
updating are updated.
- struct work_offq_data is added along with work_offqd_unpack() and
work_offqd_pack_flags() to help manipulate multiple fields contained in
work->data. Note that the helpers look a bit silly right now as there
isn't that much to pack. The next patch will add more.
- mark_work_canceling() which is used only by __cancel_work_sync() is
replaced by open-coded usage of work_offq_data and
set_work_pool_and_keep_pending() in __cancel_work_sync().
- __cancel_work[_sync]() uses offq_data helpers to preserve other OFFQ bits
when clearing WORK_STRUCT_PENDING and WORK_OFFQ_CANCELING at the end.
- This removes all users of get_work_pool_id() which is dropped. Note that
get_work_pool_id() could handle both WORK_STRUCT_PWQ and !WORK_STRUCT_PWQ
cases; however, it was only being called after try_to_grab_pending()
succeeded, in which case WORK_STRUCT_PWQ is never set and thus it's safe
to use work_offqd_unpack() instead.
No behavior changes intended.
Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 1 +
kernel/workqueue.c | 51 ++++++++++++++++++++++++---------------
2 files changed, 32 insertions(+), 20 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 0ad534fe6673..e15fc77bf2e2 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -97,6 +97,7 @@ enum wq_misc_consts {
/* Convenience constants - of type 'unsigned long', not 'enum'! */
#define WORK_OFFQ_CANCELING (1ul << WORK_OFFQ_CANCELING_BIT)
+#define WORK_OFFQ_FLAG_MASK (((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
#define WORK_OFFQ_POOL_NONE ((1ul << WORK_OFFQ_POOL_BITS) - 1)
#define WORK_STRUCT_NO_POOL (WORK_OFFQ_POOL_NONE << WORK_OFFQ_POOL_SHIFT)
#define WORK_STRUCT_PWQ_MASK (~((1ul << WORK_STRUCT_PWQ_SHIFT) - 1))
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 7aa53a2e41af..e8f310aa16d6 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -391,6 +391,11 @@ struct wq_pod_type {
int *cpu_pod; /* cpu -> pod */
};
+struct work_offq_data {
+ u32 pool_id;
+ u32 flags;
+};
+
static const char *wq_affn_names[WQ_AFFN_NR_TYPES] = {
[WQ_AFFN_DFL] = "default",
[WQ_AFFN_CPU] = "cpu",
@@ -887,29 +892,23 @@ static struct worker_pool *get_work_pool(struct work_struct *work)
return idr_find(&worker_pool_idr, pool_id);
}
-/**
- * get_work_pool_id - return the worker pool ID a given work is associated with
- * @work: the work item of interest
- *
- * Return: The worker_pool ID @work was last associated with.
- * %WORK_OFFQ_POOL_NONE if none.
- */
-static int get_work_pool_id(struct work_struct *work)
+static unsigned long shift_and_mask(unsigned long v, u32 shift, u32 bits)
{
- unsigned long data = atomic_long_read(&work->data);
+ return (v >> shift) & ((1 << bits) - 1);
+}
- if (data & WORK_STRUCT_PWQ)
- return work_struct_pwq(data)->pool->id;
+static void work_offqd_unpack(struct work_offq_data *offqd, unsigned long data)
+{
+ WARN_ON_ONCE(data & WORK_STRUCT_PWQ);
- return data >> WORK_OFFQ_POOL_SHIFT;
+ offqd->pool_id = shift_and_mask(data, WORK_OFFQ_POOL_SHIFT,
+ WORK_OFFQ_POOL_BITS);
+ offqd->flags = data & WORK_OFFQ_FLAG_MASK;
}
-static void mark_work_canceling(struct work_struct *work)
+static unsigned long work_offqd_pack_flags(struct work_offq_data *offqd)
{
- unsigned long pool_id = get_work_pool_id(work);
-
- pool_id <<= WORK_OFFQ_POOL_SHIFT;
- set_work_data(work, pool_id | WORK_STRUCT_PENDING | WORK_OFFQ_CANCELING);
+ return (unsigned long)offqd->flags;
}
static bool work_is_canceling(struct work_struct *work)
@@ -4176,6 +4175,7 @@ EXPORT_SYMBOL(flush_rcu_work);
static bool __cancel_work(struct work_struct *work, u32 cflags)
{
+ struct work_offq_data offqd;
unsigned long irq_flags;
int ret;
@@ -4186,19 +4186,26 @@ static bool __cancel_work(struct work_struct *work, u32 cflags)
if (unlikely(ret < 0))
return false;
- set_work_pool_and_clear_pending(work, get_work_pool_id(work), 0);
+ work_offqd_unpack(&offqd, *work_data_bits(work));
+ set_work_pool_and_clear_pending(work, offqd.pool_id,
+ work_offqd_pack_flags(&offqd));
local_irq_restore(irq_flags);
return ret;
}
static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
{
+ struct work_offq_data offqd;
unsigned long irq_flags;
bool ret;
/* claim @work and tell other tasks trying to grab @work to back off */
ret = work_grab_pending(work, cflags, &irq_flags);
- mark_work_canceling(work);
+
+ work_offqd_unpack(&offqd, *work_data_bits(work));
+ offqd.flags |= WORK_OFFQ_CANCELING;
+ set_work_pool_and_keep_pending(work, offqd.pool_id,
+ work_offqd_pack_flags(&offqd));
local_irq_restore(irq_flags);
/*
@@ -4208,12 +4215,16 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
if (wq_online)
__flush_work(work, true);
+ work_offqd_unpack(&offqd, *work_data_bits(work));
+
/*
* smp_mb() at the end of set_work_pool_and_clear_pending() is paired
* with prepare_to_wait() above so that either waitqueue_active() is
* visible here or !work_is_canceling() is visible there.
*/
- set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE, 0);
+ offqd.flags &= ~WORK_OFFQ_CANCELING;
+ set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE,
+ work_offqd_pack_flags(&offqd));
if (waitqueue_active(&wq_cancel_waitq))
__wake_up(&wq_cancel_waitq, TASK_NORMAL, 1, work);
--
2.43.2
While (delayed) work items could be flushed and canceled, there was no way
to prevent them from being queued in the future. While this didn't lead to
functional deficiencies, it sometimes required a bit more effort from the
workqueue users to e.g. sequence shutdown steps with more care.
Workqueue is currently in the process of replacing tasklet which does
support disabling and enabling. The feature is used relatively widely to,
for example, temporarily suppress the main path while a control plane operation
(reset or config change) is in progress.
To enable easy conversion of tasklet users and as it seems like an inherently
useful feature, this patch implements disabling and enabling of work items.
- A work item carries a 16bit disable count in work->data while not queued.
The access to the count is synchronized by the PENDING bit like all other
parts of work->data.
- If the count is non-zero, the work item cannot be queued. Any attempt to
queue the work item fails and returns %false.
- disable_work[_sync](), enable_work(), disable_delayed_work[_sync]() and
enable_delayed_work() are added (see the sketch below).
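As a rough sketch of the counting behavior (the work item and the use of
system_wq below are placeholders, not part of this patch):

  static void example_nested_disable(struct work_struct *w)
  {
          disable_work(w);                  /* depth 0 -> 1, cancels @w if pending */
          disable_work(w);                  /* depth 1 -> 2, e.g. a second caller */

          queue_work(system_wq, w);         /* rejected, returns %false */

          enable_work(w);                   /* depth 2 -> 1, returns %false */
          if (enable_work(w))               /* depth 1 -> 0, returns %true */
                  queue_work(system_wq, w); /* accepted again */
  }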
Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 18 +++-
kernel/workqueue.c | 167 ++++++++++++++++++++++++++++++++++++--
2 files changed, 174 insertions(+), 11 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index e15fc77bf2e2..f25915e47efb 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -51,20 +51,23 @@ enum work_bits {
* data contains off-queue information when !WORK_STRUCT_PWQ.
*
* MSB
- * [ pool ID ] [ OFFQ flags ] [ STRUCT flags ]
- * 1 bit 4 or 5 bits
+ * [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ]
+ * 16 bits 1 bit 4 or 5 bits
*/
WORK_OFFQ_FLAG_SHIFT = WORK_STRUCT_FLAG_BITS,
WORK_OFFQ_CANCELING_BIT = WORK_OFFQ_FLAG_SHIFT,
WORK_OFFQ_FLAG_END,
WORK_OFFQ_FLAG_BITS = WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,
+ WORK_OFFQ_DISABLE_SHIFT = WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS,
+ WORK_OFFQ_DISABLE_BITS = 16,
+
/*
* When a work item is off queue, the high bits encode off-queue flags
* and the last pool it was on. Cap pool ID to 31 bits and use the
* highest number to indicate that no pool is associated.
*/
- WORK_OFFQ_POOL_SHIFT = WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS,
+ WORK_OFFQ_POOL_SHIFT = WORK_OFFQ_DISABLE_SHIFT + WORK_OFFQ_DISABLE_BITS,
WORK_OFFQ_LEFT = BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT,
WORK_OFFQ_POOL_BITS = WORK_OFFQ_LEFT <= 31 ? WORK_OFFQ_LEFT : 31,
};
@@ -98,6 +101,7 @@ enum wq_misc_consts {
/* Convenience constants - of type 'unsigned long', not 'enum'! */
#define WORK_OFFQ_CANCELING (1ul << WORK_OFFQ_CANCELING_BIT)
#define WORK_OFFQ_FLAG_MASK (((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
+#define WORK_OFFQ_DISABLE_MASK (((1ul << WORK_OFFQ_DISABLE_BITS) - 1) << WORK_OFFQ_DISABLE_SHIFT)
#define WORK_OFFQ_POOL_NONE ((1ul << WORK_OFFQ_POOL_BITS) - 1)
#define WORK_STRUCT_NO_POOL (WORK_OFFQ_POOL_NONE << WORK_OFFQ_POOL_SHIFT)
#define WORK_STRUCT_PWQ_MASK (~((1ul << WORK_STRUCT_PWQ_SHIFT) - 1))
@@ -556,6 +560,14 @@ extern bool flush_delayed_work(struct delayed_work *dwork);
extern bool cancel_delayed_work(struct delayed_work *dwork);
extern bool cancel_delayed_work_sync(struct delayed_work *dwork);
+extern bool disable_work(struct work_struct *work);
+extern bool disable_work_sync(struct work_struct *work);
+extern bool enable_work(struct work_struct *work);
+
+extern bool disable_delayed_work(struct delayed_work *dwork);
+extern bool disable_delayed_work_sync(struct delayed_work *dwork);
+extern bool enable_delayed_work(struct delayed_work *dwork);
+
extern bool flush_rcu_work(struct rcu_work *rwork);
extern void workqueue_set_max_active(struct workqueue_struct *wq,
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index e8f310aa16d6..87e40e6e14cb 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -98,6 +98,7 @@ enum worker_flags {
enum work_cancel_flags {
WORK_CANCEL_DELAYED = 1 << 0, /* canceling a delayed_work */
+ WORK_CANCEL_DISABLE = 1 << 1, /* canceling to disable */
};
enum wq_internal_consts {
@@ -393,6 +394,7 @@ struct wq_pod_type {
struct work_offq_data {
u32 pool_id;
+ u32 disable;
u32 flags;
};
@@ -903,12 +905,15 @@ static void work_offqd_unpack(struct work_offq_data *offqd, unsigned long data)
offqd->pool_id = shift_and_mask(data, WORK_OFFQ_POOL_SHIFT,
WORK_OFFQ_POOL_BITS);
+ offqd->disable = shift_and_mask(data, WORK_OFFQ_DISABLE_SHIFT,
+ WORK_OFFQ_DISABLE_BITS);
offqd->flags = data & WORK_OFFQ_FLAG_MASK;
}
static unsigned long work_offqd_pack_flags(struct work_offq_data *offqd)
{
- return (unsigned long)offqd->flags;
+ return ((unsigned long)offqd->disable << WORK_OFFQ_DISABLE_SHIFT) |
+ ((unsigned long)offqd->flags);
}
static bool work_is_canceling(struct work_struct *work)
@@ -2395,6 +2400,21 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
rcu_read_unlock();
}
+static bool clear_pending_if_disabled(struct work_struct *work)
+{
+ unsigned long data = *work_data_bits(work);
+ struct work_offq_data offqd;
+
+ if (likely((data & WORK_STRUCT_PWQ) ||
+ !(data & WORK_OFFQ_DISABLE_MASK)))
+ return false;
+
+ work_offqd_unpack(&offqd, data);
+ set_work_pool_and_clear_pending(work, offqd.pool_id,
+ work_offqd_pack_flags(&offqd));
+ return true;
+}
+
/**
* queue_work_on - queue work on specific cpu
* @cpu: CPU number to execute work on
@@ -2417,7 +2437,8 @@ bool queue_work_on(int cpu, struct workqueue_struct *wq,
local_irq_save(irq_flags);
- if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+ !clear_pending_if_disabled(work)) {
__queue_work(cpu, wq, work);
ret = true;
}
@@ -2577,7 +2598,8 @@ bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
/* read the comment in __queue_work() */
local_irq_save(irq_flags);
- if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+ !clear_pending_if_disabled(work)) {
__queue_delayed_work(cpu, wq, dwork, delay);
ret = true;
}
@@ -4173,20 +4195,46 @@ bool flush_rcu_work(struct rcu_work *rwork)
}
EXPORT_SYMBOL(flush_rcu_work);
+static void work_offqd_disable(struct work_offq_data *offqd)
+{
+ const unsigned long max = (1lu << WORK_OFFQ_DISABLE_BITS) - 1;
+
+ if (likely(offqd->disable < max))
+ offqd->disable++;
+ else
+ WARN_ONCE(true, "workqueue: work disable count overflowed\n");
+}
+
+static void work_offqd_enable(struct work_offq_data *offqd)
+{
+ if (likely(offqd->disable > 0))
+ offqd->disable--;
+ else
+ WARN_ONCE(true, "workqueue: work disable count underflowed\n");
+}
+
static bool __cancel_work(struct work_struct *work, u32 cflags)
{
struct work_offq_data offqd;
unsigned long irq_flags;
int ret;
- do {
- ret = try_to_grab_pending(work, cflags, &irq_flags);
- } while (unlikely(ret == -EAGAIN));
+ if (cflags & WORK_CANCEL_DISABLE) {
+ ret = work_grab_pending(work, cflags, &irq_flags);
+ } else {
+ do {
+ ret = try_to_grab_pending(work, cflags, &irq_flags);
+ } while (unlikely(ret == -EAGAIN));
- if (unlikely(ret < 0))
- return false;
+ if (unlikely(ret < 0))
+ return false;
+ }
work_offqd_unpack(&offqd, *work_data_bits(work));
+
+ if (cflags & WORK_CANCEL_DISABLE)
+ work_offqd_disable(&offqd);
+
set_work_pool_and_clear_pending(work, offqd.pool_id,
work_offqd_pack_flags(&offqd));
local_irq_restore(irq_flags);
@@ -4203,6 +4251,10 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
ret = work_grab_pending(work, cflags, &irq_flags);
work_offqd_unpack(&offqd, *work_data_bits(work));
+
+ if (cflags & WORK_CANCEL_DISABLE)
+ work_offqd_disable(&offqd);
+
offqd.flags |= WORK_OFFQ_CANCELING;
set_work_pool_and_keep_pending(work, offqd.pool_id,
work_offqd_pack_flags(&offqd));
@@ -4302,6 +4354,105 @@ bool cancel_delayed_work_sync(struct delayed_work *dwork)
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
+/**
+ * disable_work - Disable and cancel a work item
+ * @work: work item to disable
+ *
+ * Disable @work by incrementing its disable count and cancel it if currently
+ * pending. As long as the disable count is non-zero, any attempt to queue @work
+ * will fail and return %false. The maximum supported disable depth is 2 to the
+ * power of %WORK_OFFQ_DISABLE_BITS, currently 65536.
+ *
+ * Must be called from a sleepable context. Returns %true if @work was pending,
+ * %false otherwise.
+ */
+bool disable_work(struct work_struct *work)
+{
+ return __cancel_work(work, WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_work);
+
+/**
+ * disable_work_sync - Disable, cancel and drain a work item
+ * @work: work item to disable
+ *
+ * Similar to disable_work() but also wait for @work to finish if currently
+ * executing.
+ *
+ * Must be called from a sleepable context. Returns %true if @work was pending,
+ * %false otherwise.
+ */
+bool disable_work_sync(struct work_struct *work)
+{
+ return __cancel_work_sync(work, WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_work_sync);
+
+/**
+ * enable_work - Enable a work item
+ * @work: work item to enable
+ *
+ * Undo disable_work[_sync]() by decrementing @work's disable count. @work can
+ * only be queued if its disable count is 0.
+ *
+ * Must be called from a sleepable context. Returns %true if the disable count
+ * reached 0. Otherwise, %false.
+ */
+bool enable_work(struct work_struct *work)
+{
+ struct work_offq_data offqd;
+ unsigned long irq_flags;
+
+ work_grab_pending(work, 0, &irq_flags);
+
+ work_offqd_unpack(&offqd, *work_data_bits(work));
+ work_offqd_enable(&offqd);
+ set_work_pool_and_clear_pending(work, offqd.pool_id,
+ work_offqd_pack_flags(&offqd));
+ local_irq_enable();
+
+ return !offqd.disable;
+}
+EXPORT_SYMBOL_GPL(enable_work);
+
+/**
+ * disable_delayed_work - Disable and cancel a delayed work item
+ * @dwork: delayed work item to disable
+ *
+ * disable_work() for delayed work items.
+ */
+bool disable_delayed_work(struct delayed_work *dwork)
+{
+ return __cancel_work(&dwork->work,
+ WORK_CANCEL_DELAYED | WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_delayed_work);
+
+/**
+ * disable_delayed_work_sync - Disable, cancel and drain a delayed work item
+ * @dwork: delayed work item to disable
+ *
+ * disable_work_sync() for delayed work items.
+ */
+bool disable_delayed_work_sync(struct delayed_work *dwork)
+{
+ return __cancel_work_sync(&dwork->work,
+ WORK_CANCEL_DELAYED | WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_delayed_work_sync);
+
+/**
+ * enable_delayed_work - Enable a delayed work item
+ * @dwork: delayed work item to enable
+ *
+ * enable_work() for delayed work items.
+ */
+bool enable_delayed_work(struct delayed_work *dwork)
+{
+ return enable_work(&dwork->work);
+}
+EXPORT_SYMBOL_GPL(enable_delayed_work);
+
/**
* schedule_on_each_cpu - execute a function synchronously on each online CPU
* @func: the function to call
--
2.43.2
cancel[_delayed]_work_sync() guarantees that it can shut down
self-requeueing work items. To achieve that, it grabs and then keeps the
WORK_STRUCT_PENDING bit set while flushing the currently executing instance.
As the PENDING bit is set, all queueing attempts including the
self-requeueing ones fail, and once the currently executing instance is
flushed, the work item should be idle as long as someone else isn't actively
queueing it.
This means that the cancel_work_sync path may hold the PENDING bit set while
flushing the target work item. This isn't a problem for the queueing path -
it can just fail, which is the desired effect. It doesn't affect flush. It
doesn't matter to cancel_work either, as it can just report that the work
item has been successfully canceled. However, if there's another
cancel_work_sync attempt on the work item, it can't simply fail or report
success as that would breach the guarantee it is expected to provide.
cancel_work_sync has to wait for and grab that PENDING bit and go through
the motions.
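For illustration, this is the kind of self-requeueing user that the guarantee
covers - a minimal sketch with made-up names, not code from an actual driver:
#include <linux/workqueue.h>
/* illustrative only - struct and function names are invented */
struct poller {
        struct work_struct work;
};
static void poller_fn(struct work_struct *work)
{
        struct poller *p = container_of(work, struct poller, work);
        /* ... do one unit of work ... */
        /* self-requeue; this fails while the canceling side holds PENDING */
        queue_work(system_wq, &p->work);
}
static void poller_shutdown(struct poller *p)
{
        /*
         * Returns with @p->work neither pending nor executing even though
         * poller_fn() requeues itself, because the self-requeue above fails
         * while cancel_work_sync() holds the PENDING bit.
         */
        cancel_work_sync(&p->work);
}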
WORK_OFFQ_CANCELING and wq_cancel_waitq are what implement this
cancel_work_sync to cancel_work_sync wait mechanism. When a work item is
being canceled, WORK_OFFQ_CANCELING is also set on it and other
cancel_work_sync attempts wait on the bit to be cleared using the wait
queue.
While this works, it's an isolated wart which doesn't jive with the rest of
flush and cancel mechanisms and forces enable_work() and disable_work() to
require a sleepable context, which hampers their usability.
Now that a work item can be disabled, we can use that to block queueing
while cancel_work_sync is in progress. Instead of holding the PENDING bit,
it can temporarily disable the work item, flush and then re-enable it, as
that achieves the same end result of blocking queueings while canceling and
thus enables canceling of self-requeueing work items.
- WORK_OFFQ_CANCELING and the surrounding mechanisms are removed.
- work_grab_pending() is now simpler, no longer has to wait for a blocking
operation and thus can be called from any context.
- With work_grab_pending() simplified, no need to use try_to_grab_pending()
directly. All users are converted to use work_grab_pending().
- __cancel_work_sync() is updated to __cancel_work() with
WORK_CANCEL_DISABLE to cancel and plug racing queueing attempts. It then
flushes and re-enables the work item if necessary.
- These changes allow disable_work() and enable_work() to be called from any
context, as the sketch below illustrates.
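With CANCELING gone, a caller may now plug future executions from atomic
context and re-arm later, roughly like the following sketch (struct and
function names are illustrative, not from an existing driver):
#include <linux/interrupt.h>
#include <linux/workqueue.h>
struct mydev {
        struct work_struct rx_work;
};
static irqreturn_t mydev_error_irq(int irq, void *data)
{
        struct mydev *md = data;
        /* atomic context is fine now - just bumps the disable count */
        disable_work(&md->rx_work);
        return IRQ_HANDLED;
}
static void mydev_recover(struct mydev *md)        /* process context */
{
        /* ... reset hardware ... */
        /* queueings were ignored while disabled, so requeue explicitly */
        if (enable_work(&md->rx_work))
                queue_work(system_wq, &md->rx_work);
}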
Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 4 +-
kernel/workqueue.c | 139 +++++---------------------------------
2 files changed, 19 insertions(+), 124 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index f25915e47efb..86483743ad28 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -52,10 +52,9 @@ enum work_bits {
*
* MSB
* [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ]
- * 16 bits 1 bit 4 or 5 bits
+ * 16 bits 0 bits 4 or 5 bits
*/
WORK_OFFQ_FLAG_SHIFT = WORK_STRUCT_FLAG_BITS,
- WORK_OFFQ_CANCELING_BIT = WORK_OFFQ_FLAG_SHIFT,
WORK_OFFQ_FLAG_END,
WORK_OFFQ_FLAG_BITS = WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,
@@ -99,7 +98,6 @@ enum wq_misc_consts {
};
/* Convenience constants - of type 'unsigned long', not 'enum'! */
-#define WORK_OFFQ_CANCELING (1ul << WORK_OFFQ_CANCELING_BIT)
#define WORK_OFFQ_FLAG_MASK (((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
#define WORK_OFFQ_DISABLE_MASK (((1ul << WORK_OFFQ_DISABLE_BITS) - 1) << WORK_OFFQ_DISABLE_SHIFT)
#define WORK_OFFQ_POOL_NONE ((1ul << WORK_OFFQ_POOL_BITS) - 1)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 87e40e6e14cb..e9d25e1f79d7 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -491,12 +491,6 @@ static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
/* I: attributes used when instantiating ordered pools on demand */
static struct workqueue_attrs *ordered_wq_attrs[NR_STD_WORKER_POOLS];
-/*
- * Used to synchronize multiple cancel_sync attempts on the same work item. See
- * work_grab_pending() and __cancel_work_sync().
- */
-static DECLARE_WAIT_QUEUE_HEAD(wq_cancel_waitq);
-
/*
* I: kthread_worker to release pwq's. pwq release needs to be bounced to a
* process context while holding a pool lock. Bounce to a dedicated kthread
@@ -778,11 +772,6 @@ static int work_next_color(int color)
* corresponding to a work. Pool is available once the work has been
* queued anywhere after initialization until it is sync canceled. pwq is
* available only while the work item is queued.
- *
- * %WORK_OFFQ_CANCELING is used to mark a work item which is being
- * canceled. While being canceled, a work item may have its PENDING set
- * but stay off timer and worklist for arbitrarily long and nobody should
- * try to steal the PENDING bit.
*/
static inline void set_work_data(struct work_struct *work, unsigned long data)
{
@@ -916,13 +905,6 @@ static unsigned long work_offqd_pack_flags(struct work_offq_data *offqd)
((unsigned long)offqd->flags);
}
-static bool work_is_canceling(struct work_struct *work)
-{
- unsigned long data = atomic_long_read(&work->data);
-
- return !(data & WORK_STRUCT_PWQ) && (data & WORK_OFFQ_CANCELING);
-}
-
/*
* Policy functions. These define the policies on how the global worker
* pools are managed. Unless noted otherwise, these functions assume that
@@ -2047,8 +2029,6 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_
* 1 if @work was pending and we successfully stole PENDING
* 0 if @work was idle and we claimed PENDING
* -EAGAIN if PENDING couldn't be grabbed at the moment, safe to busy-retry
- * -ENOENT if someone else is canceling @work, this state may persist
- * for arbitrarily long
* ======== ================================================================
*
* Note:
@@ -2144,26 +2124,9 @@ static int try_to_grab_pending(struct work_struct *work, u32 cflags,
fail:
rcu_read_unlock();
local_irq_restore(*irq_flags);
- if (work_is_canceling(work))
- return -ENOENT;
- cpu_relax();
return -EAGAIN;
}
-struct cwt_wait {
- wait_queue_entry_t wait;
- struct work_struct *work;
-};
-
-static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
-{
- struct cwt_wait *cwait = container_of(wait, struct cwt_wait, wait);
-
- if (cwait->work != key)
- return 0;
- return autoremove_wake_function(wait, mode, sync, key);
-}
-
/**
* work_grab_pending - steal work item from worklist and disable irq
* @work: work item to steal
@@ -2180,41 +2143,14 @@ static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *k
static bool work_grab_pending(struct work_struct *work, u32 cflags,
unsigned long *irq_flags)
{
- struct cwt_wait cwait;
int ret;
- might_sleep();
-repeat:
- ret = try_to_grab_pending(work, cflags, irq_flags);
- if (likely(ret >= 0))
- return ret;
- if (ret != -ENOENT)
- goto repeat;
-
- /*
- * Someone is already canceling. Wait for it to finish. flush_work()
- * doesn't work for PREEMPT_NONE because we may get woken up between
- * @work's completion and the other canceling task resuming and clearing
- * CANCELING - flush_work() will return false immediately as @work is no
- * longer busy, try_to_grab_pending() will return -ENOENT as @work is
- * still being canceled and the other canceling task won't be able to
- * clear CANCELING as we're hogging the CPU.
- *
- * Let's wait for completion using a waitqueue. As this may lead to the
- * thundering herd problem, use a custom wake function which matches
- * @work along with exclusive wait and wakeup.
- */
- init_wait(&cwait.wait);
- cwait.wait.func = cwt_wakefn;
- cwait.work = work;
-
- prepare_to_wait_exclusive(&wq_cancel_waitq, &cwait.wait,
- TASK_UNINTERRUPTIBLE);
- if (work_is_canceling(work))
- schedule();
- finish_wait(&wq_cancel_waitq, &cwait.wait);
-
- goto repeat;
+ while (true) {
+ ret = try_to_grab_pending(work, cflags, irq_flags);
+ if (ret >= 0)
+ return ret;
+ cpu_relax();
+ }
}
/**
@@ -2631,19 +2567,13 @@ bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay)
{
unsigned long irq_flags;
- int ret;
+ bool ret;
- do {
- ret = try_to_grab_pending(&dwork->work, WORK_CANCEL_DELAYED,
- &irq_flags);
- } while (unlikely(ret == -EAGAIN));
+ ret = work_grab_pending(&dwork->work, WORK_CANCEL_DELAYED, &irq_flags);
- if (likely(ret >= 0)) {
- __queue_delayed_work(cpu, wq, dwork, delay);
- local_irq_restore(irq_flags);
- }
+ __queue_delayed_work(cpu, wq, dwork, delay);
- /* -ENOENT from try_to_grab_pending() becomes %true */
+ local_irq_restore(irq_flags);
return ret;
}
EXPORT_SYMBOL_GPL(mod_delayed_work_on);
@@ -4219,16 +4149,7 @@ static bool __cancel_work(struct work_struct *work, u32 cflags)
unsigned long irq_flags;
int ret;
- if (cflags & WORK_CANCEL_DISABLE) {
- ret = work_grab_pending(work, cflags, &irq_flags);
- } else {
- do {
- ret = try_to_grab_pending(work, cflags, &irq_flags);
- } while (unlikely(ret == -EAGAIN));
-
- if (unlikely(ret < 0))
- return false;
- }
+ ret = work_grab_pending(work, cflags, &irq_flags);
work_offqd_unpack(&offqd, *work_data_bits(work));
@@ -4243,22 +4164,9 @@ static bool __cancel_work(struct work_struct *work, u32 cflags)
static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
{
- struct work_offq_data offqd;
- unsigned long irq_flags;
bool ret;
- /* claim @work and tell other tasks trying to grab @work to back off */
- ret = work_grab_pending(work, cflags, &irq_flags);
-
- work_offqd_unpack(&offqd, *work_data_bits(work));
-
- if (cflags & WORK_CANCEL_DISABLE)
- work_offqd_disable(&offqd);
-
- offqd.flags |= WORK_OFFQ_CANCELING;
- set_work_pool_and_keep_pending(work, offqd.pool_id,
- work_offqd_pack_flags(&offqd));
- local_irq_restore(irq_flags);
+ ret = __cancel_work(work, cflags | WORK_CANCEL_DISABLE);
/*
* Skip __flush_work() during early boot when we know that @work isn't
@@ -4267,19 +4175,8 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
if (wq_online)
__flush_work(work, true);
- work_offqd_unpack(&offqd, *work_data_bits(work));
-
- /*
- * smp_mb() at the end of set_work_pool_and_clear_pending() is paired
- * with prepare_to_wait() above so that either waitqueue_active() is
- * visible here or !work_is_canceling() is visible there.
- */
- offqd.flags &= ~WORK_OFFQ_CANCELING;
- set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE,
- work_offqd_pack_flags(&offqd));
-
- if (waitqueue_active(&wq_cancel_waitq))
- __wake_up(&wq_cancel_waitq, TASK_NORMAL, 1, work);
+ if (!(cflags & WORK_CANCEL_DISABLE))
+ enable_work(work);
return ret;
}
@@ -4363,8 +4260,8 @@ EXPORT_SYMBOL(cancel_delayed_work_sync);
* will fail and return %false. The maximum supported disable depth is 2 to the
* power of %WORK_OFFQ_DISABLE_BITS, currently 65536.
*
- * Must be called from a sleepable context. Returns %true if @work was pending,
- * %false otherwise.
+ * Can be called from any context. Returns %true if @work was pending, %false
+ * otherwise.
*/
bool disable_work(struct work_struct *work)
{
@@ -4395,8 +4292,8 @@ EXPORT_SYMBOL_GPL(disable_work_sync);
* Undo disable_work[_sync]() by decrementing @work's disable count. @work can
* only be queued if its disable count is 0.
*
- * Must be called from a sleepable context. Returns %true if the disable count
- * reached 0. Otherwise, %false.
+ * Can be called from any context. Returns %true if the disable count reached 0.
+ * Otherwise, %false.
*/
bool enable_work(struct work_struct *work)
{
--
2.43.2
Add an off-queue flag, WORK_OFFQ_BH, that indicates whether the last
workqueue the work item was on was a BH one. This will be used to
sanity-check the contexts from which cancel_sync operations may be called on
the work item.
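The intended use is a context sanity check along these lines (a sketch; the
actual check is added by a later patch in this series):
        /* in the cancel_sync path, after grabbing the work item */
        if (*work_data_bits(work) & WORK_OFFQ_BH)
                WARN_ON_ONCE(in_hardirq());     /* BH work: any non-hardirq context */
        else
                might_sleep();                  /* non-BH work: must be able to sleep */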
Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 4 +++-
kernel/workqueue.c | 10 ++++++++--
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 86483743ad28..7710cd52f7f0 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -52,9 +52,10 @@ enum work_bits {
*
* MSB
* [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ]
- * 16 bits 0 bits 4 or 5 bits
+ * 16 bits 1 bit 4 or 5 bits
*/
WORK_OFFQ_FLAG_SHIFT = WORK_STRUCT_FLAG_BITS,
+ WORK_OFFQ_BH_BIT = WORK_OFFQ_FLAG_SHIFT,
WORK_OFFQ_FLAG_END,
WORK_OFFQ_FLAG_BITS = WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,
@@ -98,6 +99,7 @@ enum wq_misc_consts {
};
/* Convenience constants - of type 'unsigned long', not 'enum'! */
+#define WORK_OFFQ_BH (1ul << WORK_OFFQ_BH_BIT)
#define WORK_OFFQ_FLAG_MASK (((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
#define WORK_OFFQ_DISABLE_MASK (((1ul << WORK_OFFQ_DISABLE_BITS) - 1) << WORK_OFFQ_DISABLE_SHIFT)
#define WORK_OFFQ_POOL_NONE ((1ul << WORK_OFFQ_POOL_BITS) - 1)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index e9d25e1f79d7..6a2abc81ae2b 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -759,6 +759,11 @@ static int work_next_color(int color)
return (color + 1) % WORK_NR_COLORS;
}
+static unsigned long pool_offq_flags(struct worker_pool *pool)
+{
+ return (pool->flags & POOL_BH) ? WORK_OFFQ_BH : 0;
+}
+
/*
* While queued, %WORK_STRUCT_PWQ is set and non flag bits of a work's data
* contain the pointer to the queued pwq. Once execution starts, the flag
@@ -2111,7 +2116,8 @@ static int try_to_grab_pending(struct work_struct *work, u32 cflags,
* this destroys work->data needed by the next step, stash it.
*/
work_data = *work_data_bits(work);
- set_work_pool_and_keep_pending(work, pool->id, 0);
+ set_work_pool_and_keep_pending(work, pool->id,
+ pool_offq_flags(pool));
/* must be the last step, see the function comment */
pwq_dec_nr_in_flight(pwq, work_data);
@@ -3154,7 +3160,7 @@ __acquires(&pool->lock)
* PENDING and queued state changes happen together while IRQ is
* disabled.
*/
- set_work_pool_and_clear_pending(work, pool->id, 0);
+ set_work_pool_and_clear_pending(work, pool->id, pool_offq_flags(pool));
pwq->stats[PWQ_STAT_STARTED]++;
raw_spin_unlock_irq(&pool->lock);
--
2.43.2
In preparation for in-BH canceling of BH work items, update start_flush_work()
so that:
- rcu_read_lock()'ing is moved to the caller.
- Instead of true or false, it now returns the worker_pool associated with
the work item if the work item needs to be waited for. NULL if waiting is
not needed.
- Add a WARN if it encounters a queued work item when @from_cancel. This
shouldn't happen.
No behavior changes are intended.
Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 39 ++++++++++++++++++++++-----------------
1 file changed, 22 insertions(+), 17 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 6a2abc81ae2b..f6ea25628701 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -3995,8 +3995,9 @@ void drain_workqueue(struct workqueue_struct *wq)
}
EXPORT_SYMBOL_GPL(drain_workqueue);
-static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
- bool from_cancel)
+static struct worker_pool *start_flush_work(struct work_struct *work,
+ struct wq_barrier *barr,
+ bool from_cancel)
{
struct worker *worker = NULL;
struct worker_pool *pool;
@@ -4005,12 +4006,9 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
might_sleep();
- rcu_read_lock();
pool = get_work_pool(work);
- if (!pool) {
- rcu_read_unlock();
- return false;
- }
+ if (!pool)
+ return NULL;
raw_spin_lock_irq(&pool->lock);
/* see the comment in try_to_grab_pending() with the same code */
@@ -4018,6 +4016,12 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
if (pwq) {
if (unlikely(pwq->pool != pool))
goto already_gone;
+ /*
+ * Cancel path should already have removed @work from worklist
+ * in try_to_grab_pending(). Control should get here iff we need
+ * to wait for the current execution to finish.
+ */
+ WARN_ON_ONCE(from_cancel);
} else {
worker = find_worker_executing_work(pool, work);
if (!worker)
@@ -4045,17 +4049,16 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
if (!from_cancel && (wq->saved_max_active == 1 || wq->rescuer))
touch_wq_lockdep_map(wq);
- rcu_read_unlock();
- return true;
+ return pool;
already_gone:
raw_spin_unlock_irq(&pool->lock);
- rcu_read_unlock();
- return false;
+ return NULL;
}
static bool __flush_work(struct work_struct *work, bool from_cancel)
{
struct wq_barrier barr;
+ struct worker_pool *pool;
if (WARN_ON(!wq_online))
return false;
@@ -4063,13 +4066,15 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
if (WARN_ON(!work->func))
return false;
- if (start_flush_work(work, &barr, from_cancel)) {
- wait_for_completion(&barr.done);
- destroy_work_on_stack(&barr.work);
- return true;
- } else {
+ rcu_read_lock();
+ pool = start_flush_work(work, &barr, from_cancel);
+ rcu_read_unlock();
+ if (!pool)
return false;
- }
+
+ wait_for_completion(&barr.done);
+ destroy_work_on_stack(&barr.work);
+ return true;
}
/**
--
2.43.2
tasklet is being replaced by BH workqueue. No noticeable behavior or
performance changes are expected. The following is how the two APIs map:
- tasklet_setup/init() -> INIT_WORK()
- tasklet_schedule() -> queue_work(system_bh_wq, ...)
- tasklet_hi_schedule() -> queue_work(system_bh_highpri_wq, ...)
- tasklet_disable_nosync() -> disable_work()
- tasklet_disable[_in_atomic]() -> disable_work_sync()
- tasklet_enable() -> enable_work() + queue_work()
- tasklet_kill() -> cancel_work_sync()
Note that unlike tasklet_enable(), enable_work() doesn't automatically queue
the work item based on whether it was queued while disabled. While the caller
can track this separately, unconditionally scheduling the work item after
enable_work() returns %true should work for most users.
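Condensed, the conversion pattern applied below is roughly (tp->tx_work and
bottom_half() are the field and function touched by this patch):
        /* init: tasklet_setup(&tp->tx_tl, bottom_half) becomes */
        INIT_WORK(&tp->tx_work, bottom_half);
        /* schedule: tasklet_schedule(&tp->tx_tl) becomes */
        queue_work(system_bh_wq, &tp->tx_work);
        /* disable: tasklet_disable(&tp->tx_tl) becomes */
        disable_work_sync(&tp->tx_work);
        /* enable: tasklet_enable(&tp->tx_tl) becomes enable plus explicit requeue */
        enable_work(&tp->tx_work);
        queue_work(system_bh_wq, &tp->tx_work);
        /* shutdown: tasklet_kill(&tp->tx_tl) becomes */
        cancel_work_sync(&tp->tx_work);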
r8152 conversion has been tested by repeatedly forcing the device to go
through resets using usbreset under iperf3 generated traffic.
Signed-off-by: Tejun Heo <[email protected]>
---
drivers/net/usb/r8152.c | 44 ++++++++++++++++++++++-------------------
1 file changed, 24 insertions(+), 20 deletions(-)
diff --git a/drivers/net/usb/r8152.c b/drivers/net/usb/r8152.c
index 9bf2140fd0a1..24e284b9eb38 100644
--- a/drivers/net/usb/r8152.c
+++ b/drivers/net/usb/r8152.c
@@ -882,7 +882,7 @@ struct r8152 {
#ifdef CONFIG_PM_SLEEP
struct notifier_block pm_notifier;
#endif
- struct tasklet_struct tx_tl;
+ struct work_struct tx_work;
struct rtl_ops {
void (*init)(struct r8152 *tp);
@@ -1948,7 +1948,7 @@ static void write_bulk_callback(struct urb *urb)
return;
if (!skb_queue_empty(&tp->tx_queue))
- tasklet_schedule(&tp->tx_tl);
+ queue_work(system_bh_wq, &tp->tx_work);
}
static void intr_callback(struct urb *urb)
@@ -2746,9 +2746,9 @@ static void tx_bottom(struct r8152 *tp)
} while (res == 0);
}
-static void bottom_half(struct tasklet_struct *t)
+static void bottom_half(struct work_struct *work)
{
- struct r8152 *tp = from_tasklet(tp, t, tx_tl);
+ struct r8152 *tp = container_of(work, struct r8152, tx_work);
if (test_bit(RTL8152_INACCESSIBLE, &tp->flags))
return;
@@ -2942,7 +2942,7 @@ static netdev_tx_t rtl8152_start_xmit(struct sk_buff *skb,
schedule_delayed_work(&tp->schedule, 0);
} else {
usb_mark_last_busy(tp->udev);
- tasklet_schedule(&tp->tx_tl);
+ queue_work(system_bh_wq, &tp->tx_work);
}
} else if (skb_queue_len(&tp->tx_queue) > tp->tx_qlen) {
netif_stop_queue(netdev);
@@ -6824,11 +6824,12 @@ static void set_carrier(struct r8152 *tp)
} else {
if (netif_carrier_ok(netdev)) {
netif_carrier_off(netdev);
- tasklet_disable(&tp->tx_tl);
+ disable_work_sync(&tp->tx_work);
napi_disable(napi);
tp->rtl_ops.disable(tp);
napi_enable(napi);
- tasklet_enable(&tp->tx_tl);
+ enable_work(&tp->tx_work);
+ queue_work(system_bh_wq, &tp->tx_work);
netif_info(tp, link, netdev, "carrier off\n");
}
}
@@ -6864,7 +6865,7 @@ static void rtl_work_func_t(struct work_struct *work)
/* don't schedule tasket before linking */
if (test_and_clear_bit(SCHEDULE_TASKLET, &tp->flags) &&
netif_carrier_ok(tp->netdev))
- tasklet_schedule(&tp->tx_tl);
+ queue_work(system_bh_wq, &tp->tx_work);
if (test_and_clear_bit(RX_EPROTO, &tp->flags) &&
!list_empty(&tp->rx_done))
@@ -6971,7 +6972,7 @@ static int rtl8152_open(struct net_device *netdev)
goto out_unlock;
}
napi_enable(&tp->napi);
- tasklet_enable(&tp->tx_tl);
+ enable_work(&tp->tx_work);
mutex_unlock(&tp->control);
@@ -6999,7 +7000,7 @@ static int rtl8152_close(struct net_device *netdev)
#ifdef CONFIG_PM_SLEEP
unregister_pm_notifier(&tp->pm_notifier);
#endif
- tasklet_disable(&tp->tx_tl);
+ disable_work_sync(&tp->tx_work);
clear_bit(WORK_ENABLE, &tp->flags);
usb_kill_urb(tp->intr_urb);
cancel_delayed_work_sync(&tp->schedule);
@@ -8421,7 +8422,7 @@ static int rtl8152_pre_reset(struct usb_interface *intf)
return 0;
netif_stop_queue(netdev);
- tasklet_disable(&tp->tx_tl);
+ disable_work_sync(&tp->tx_work);
clear_bit(WORK_ENABLE, &tp->flags);
usb_kill_urb(tp->intr_urb);
cancel_delayed_work_sync(&tp->schedule);
@@ -8466,7 +8467,8 @@ static int rtl8152_post_reset(struct usb_interface *intf)
}
napi_enable(&tp->napi);
- tasklet_enable(&tp->tx_tl);
+ enable_work(&tp->tx_work);
+ queue_work(system_bh_wq, &tp->tx_work);
netif_wake_queue(netdev);
usb_submit_urb(tp->intr_urb, GFP_KERNEL);
@@ -8625,12 +8627,13 @@ static int rtl8152_system_suspend(struct r8152 *tp)
clear_bit(WORK_ENABLE, &tp->flags);
usb_kill_urb(tp->intr_urb);
- tasklet_disable(&tp->tx_tl);
+ disable_work_sync(&tp->tx_work);
napi_disable(napi);
cancel_delayed_work_sync(&tp->schedule);
tp->rtl_ops.down(tp);
napi_enable(napi);
- tasklet_enable(&tp->tx_tl);
+ enable_work(&tp->tx_work);
+ queue_work(system_bh_wq, &tp->tx_work);
}
return 0;
@@ -9387,11 +9390,12 @@ static int rtl8152_change_mtu(struct net_device *dev, int new_mtu)
if (netif_carrier_ok(dev)) {
netif_stop_queue(dev);
napi_disable(&tp->napi);
- tasklet_disable(&tp->tx_tl);
+ disable_work_sync(&tp->tx_work);
tp->rtl_ops.disable(tp);
tp->rtl_ops.enable(tp);
rtl_start_rx(tp);
- tasklet_enable(&tp->tx_tl);
+ enable_work(&tp->tx_work);
+ queue_work(system_bh_wq, &tp->tx_work);
napi_enable(&tp->napi);
rtl8152_set_rx_mode(dev);
netif_wake_queue(dev);
@@ -9819,8 +9823,8 @@ static int rtl8152_probe_once(struct usb_interface *intf,
mutex_init(&tp->control);
INIT_DELAYED_WORK(&tp->schedule, rtl_work_func_t);
INIT_DELAYED_WORK(&tp->hw_phy_work, rtl_hw_phy_work_func_t);
- tasklet_setup(&tp->tx_tl, bottom_half);
- tasklet_disable(&tp->tx_tl);
+ INIT_WORK(&tp->tx_work, bottom_half);
+ disable_work(&tp->tx_work);
netdev->netdev_ops = &rtl8152_netdev_ops;
netdev->watchdog_timeo = RTL8152_TX_TIMEOUT;
@@ -9954,7 +9958,7 @@ static int rtl8152_probe_once(struct usb_interface *intf,
unregister_netdev(netdev);
out1:
- tasklet_kill(&tp->tx_tl);
+ cancel_work_sync(&tp->tx_work);
cancel_delayed_work_sync(&tp->hw_phy_work);
if (tp->rtl_ops.unload)
tp->rtl_ops.unload(tp);
@@ -10010,7 +10014,7 @@ static void rtl8152_disconnect(struct usb_interface *intf)
rtl_set_unplug(tp);
unregister_netdev(tp->netdev);
- tasklet_kill(&tp->tx_tl);
+ cancel_work_sync(&tp->tx_work);
cancel_delayed_work_sync(&tp->hw_phy_work);
if (tp->rtl_ops.unload)
tp->rtl_ops.unload(tp);
--
2.43.2
Now that work_grab_pending() can always grab the PENDING bit without
sleeping, the only thing that prevents allowing cancel_work_sync() of a BH
work item from an atomic context is the flushing of the in-flight instance.
When we're flushing a BH work item for cancel_work_sync(), we know that the
work item is not queued and must be executing in a BH context, which means
that it's safe to busy-wait for its completion from a non-hardirq atomic
context.
This patch updates __flush_work() so that it busy-waits when flushing a BH
work item for cancel_work_sync(). might_sleep() is pushed from
start_flush_work() to its callers - when operating on a BH work item,
__cancel_work_sync() now enforces !in_hardirq() instead of might_sleep().
This allows cancel_work_sync() and disable_work() to be called from
non-hardirq atomic contexts on BH work items.
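For example, a user of a BH workqueue can then do something like the
following from a non-hardirq atomic path (a sketch with illustrative names):
#include <linux/workqueue.h>
struct bh_ctx {
        struct work_struct bh_work;     /* always queued on system_bh_wq */
};
/* may be called under a spinlock or from BH context, but not from hardirq */
static void bh_ctx_stop(struct bh_ctx *ctx)
{
        /*
         * Safe for BH work items after this patch: grabbing PENDING never
         * sleeps and the in-flight instance is busy-waited in BH context.
         */
        cancel_work_sync(&ctx->bh_work);
}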
Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 62 +++++++++++++++++++++++++++++++++++-----------
1 file changed, 47 insertions(+), 15 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index f6ea25628701..00eac314e62a 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -4004,8 +4004,6 @@ static struct worker_pool *start_flush_work(struct work_struct *work,
struct pool_workqueue *pwq;
struct workqueue_struct *wq;
- might_sleep();
-
pool = get_work_pool(work);
if (!pool)
return NULL;
@@ -4072,7 +4070,32 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
if (!pool)
return false;
- wait_for_completion(&barr.done);
+ if ((pool->flags & POOL_BH) && from_cancel) {
+ /*
+ * We're flushing a BH work item which is being canceled. It
+ * must have been executing during start_flush_work() and can't
+ * currently be queued. If @work is still executing, we know it
+ * is running in the BH context and thus can be busy-waited.
+ *
+ * On RT, prevent a live lock when current preempted soft
+ * interrupt processing or prevents ksoftirqd from running by
+ * keeping flipping BH. If the tasklet runs on a different CPU
+ * then this has no effect other than doing the BH
+ * disable/enable dance for nothing. This is copied from
+ * kernel/softirq.c::tasklet_unlock_spin_wait().
+ */
+ while (!try_wait_for_completion(&barr.done)) {
+ if (IS_ENABLED(CONFIG_PREEMPT_RT)) {
+ local_bh_disable();
+ local_bh_enable();
+ } else {
+ cpu_relax();
+ }
+ }
+ } else {
+ wait_for_completion(&barr.done);
+ }
+
destroy_work_on_stack(&barr.work);
return true;
}
@@ -4090,6 +4113,7 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
*/
bool flush_work(struct work_struct *work)
{
+ might_sleep();
return __flush_work(work, false);
}
EXPORT_SYMBOL_GPL(flush_work);
@@ -4179,6 +4203,11 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
ret = __cancel_work(work, cflags | WORK_CANCEL_DISABLE);
+ if (*work_data_bits(work) & WORK_OFFQ_BH)
+ WARN_ON_ONCE(in_hardirq());
+ else
+ might_sleep();
+
/*
* Skip __flush_work() during early boot when we know that @work isn't
* executing. This allows canceling during early boot.
@@ -4205,19 +4234,19 @@ EXPORT_SYMBOL(cancel_work);
* cancel_work_sync - cancel a work and wait for it to finish
* @work: the work to cancel
*
- * Cancel @work and wait for its execution to finish. This function
- * can be used even if the work re-queues itself or migrates to
- * another workqueue. On return from this function, @work is
- * guaranteed to be not pending or executing on any CPU.
+ * Cancel @work and wait for its execution to finish. This function can be used
+ * even if the work re-queues itself or migrates to another workqueue. On return
+ * from this function, @work is guaranteed to be not pending or executing on any
+ * CPU as long as there aren't racing enqueues.
*
- * cancel_work_sync(&delayed_work->work) must not be used for
- * delayed_work's. Use cancel_delayed_work_sync() instead.
+ * cancel_work_sync(&delayed_work->work) must not be used for delayed_work's.
+ * Use cancel_delayed_work_sync() instead.
*
- * The caller must ensure that the workqueue on which @work was last
- * queued can't be destroyed before this function returns.
+ * Must be called from a sleepable context if @work was last queued on a non-BH
+ * workqueue. Can also be called from non-hardirq atomic contexts including BH
+ * if @work was last queued on a BH workqueue.
*
- * Return:
- * %true if @work was pending, %false otherwise.
+ * Returns %true if @work was pending, %false otherwise.
*/
bool cancel_work_sync(struct work_struct *work)
{
@@ -4287,8 +4316,11 @@ EXPORT_SYMBOL_GPL(disable_work);
* Similar to disable_work() but also wait for @work to finish if currently
* executing.
*
- * Must be called from a sleepable context. Returns %true if @work was pending,
- * %false otherwise.
+ * Must be called from a sleepable context if @work was last queued on a non-BH
+ * workqueue. Can also be called from non-hardirq atomic contexts including BH
+ * if @work was last queued on a BH workqueue.
+ *
+ * Returns %true if @work was pending, %false otherwise.
*/
bool disable_work_sync(struct work_struct *work)
{
--
2.43.2
The different flavors of RCU read critical sections have been unified. Let's
update the locking assertion macros accordingly to avoid requiring
unnecessary explicit rcu_read_[un]lock() calls.
Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index b280caf81fb2..87750e70b638 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -515,12 +515,12 @@ static void show_one_worker_pool(struct worker_pool *pool);
#include <trace/events/workqueue.h>
#define assert_rcu_or_pool_mutex() \
- RCU_LOCKDEP_WARN(!rcu_read_lock_held() && \
+ RCU_LOCKDEP_WARN(!rcu_read_lock_any_held() && \
!lockdep_is_held(&wq_pool_mutex), \
"RCU or wq_pool_mutex should be held")
#define assert_rcu_or_wq_mutex_or_pool_mutex(wq) \
- RCU_LOCKDEP_WARN(!rcu_read_lock_held() && \
+ RCU_LOCKDEP_WARN(!rcu_read_lock_any_held() && \
!lockdep_is_held(&wq->mutex) && \
!lockdep_is_held(&wq_pool_mutex), \
"RCU, wq->mutex or wq_pool_mutex should be held")
--
2.43.2
The cancel path used a bool, @is_dwork, to distinguish canceling a regular
work item from a delayed one. The planned disable/enable support will need
to pass another flag around the same code path. As multiple bools would be
confusing, let's introduce named flags to pass around in the cancel path.
WORK_CANCEL_DELAYED replaces @is_dwork. No functional changes.
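For example, where the bool-based interface forces call sites like
__cancel_work(&dwork->work, true), the flag-based one reads and composes
better (WORK_CANCEL_DISABLE below is added by a later patch in this series):
        /* before: a bare bool at every call site */
        __cancel_work(&dwork->work, true);
        /* after: self-documenting and composable */
        __cancel_work(&dwork->work, WORK_CANCEL_DELAYED);
        __cancel_work(&dwork->work, WORK_CANCEL_DELAYED | WORK_CANCEL_DISABLE);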
Signed-off-by: Tejun Heo <[email protected]>
---
kernel/workqueue.c | 29 +++++++++++++++++------------
1 file changed, 17 insertions(+), 12 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index b590d93d054b..317c85f051b0 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -96,6 +96,10 @@ enum worker_flags {
WORKER_UNBOUND | WORKER_REBOUND,
};
+enum work_cancel_flags {
+ WORK_CANCEL_DELAYED = 1 << 0, /* canceling a delayed_work */
+};
+
enum wq_internal_consts {
NR_STD_WORKER_POOLS = 2, /* # standard pools per cpu */
@@ -2028,7 +2032,7 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_
/**
* try_to_grab_pending - steal work item from worklist and disable irq
* @work: work item to steal
- * @is_dwork: @work is a delayed_work
+ * @cflags: %WORK_CANCEL_ flags
* @irq_flags: place to store irq state
*
* Try to grab PENDING bit of @work. This function can handle @work in any
@@ -2055,7 +2059,7 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_
*
* This function is safe to call from any context including IRQ handler.
*/
-static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
+static int try_to_grab_pending(struct work_struct *work, u32 cflags,
unsigned long *irq_flags)
{
struct worker_pool *pool;
@@ -2064,7 +2068,7 @@ static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
local_irq_save(*irq_flags);
/* try to steal the timer if it exists */
- if (is_dwork) {
+ if (cflags & WORK_CANCEL_DELAYED) {
struct delayed_work *dwork = to_delayed_work(work);
/*
@@ -2543,7 +2547,8 @@ bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
int ret;
do {
- ret = try_to_grab_pending(&dwork->work, true, &irq_flags);
+ ret = try_to_grab_pending(&dwork->work, WORK_CANCEL_DELAYED,
+ &irq_flags);
} while (unlikely(ret == -EAGAIN));
if (likely(ret >= 0)) {
@@ -4103,13 +4108,13 @@ bool flush_rcu_work(struct rcu_work *rwork)
}
EXPORT_SYMBOL(flush_rcu_work);
-static bool __cancel_work(struct work_struct *work, bool is_dwork)
+static bool __cancel_work(struct work_struct *work, u32 cflags)
{
unsigned long irq_flags;
int ret;
do {
- ret = try_to_grab_pending(work, is_dwork, &irq_flags);
+ ret = try_to_grab_pending(work, cflags, &irq_flags);
} while (unlikely(ret == -EAGAIN));
if (unlikely(ret < 0))
@@ -4134,14 +4139,14 @@ static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *k
return autoremove_wake_function(wait, mode, sync, key);
}
-static bool __cancel_work_sync(struct work_struct *work, bool is_dwork)
+static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
{
static DECLARE_WAIT_QUEUE_HEAD(cancel_waitq);
unsigned long irq_flags;
int ret;
do {
- ret = try_to_grab_pending(work, is_dwork, &irq_flags);
+ ret = try_to_grab_pending(work, cflags, &irq_flags);
/*
* If someone else is already canceling, wait for it to
* finish. flush_work() doesn't work for PREEMPT_NONE
@@ -4203,7 +4208,7 @@ static bool __cancel_work_sync(struct work_struct *work, bool is_dwork)
*/
bool cancel_work(struct work_struct *work)
{
- return __cancel_work(work, false);
+ return __cancel_work(work, 0);
}
EXPORT_SYMBOL(cancel_work);
@@ -4227,7 +4232,7 @@ EXPORT_SYMBOL(cancel_work);
*/
bool cancel_work_sync(struct work_struct *work)
{
- return __cancel_work_sync(work, false);
+ return __cancel_work_sync(work, 0);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);
@@ -4249,7 +4254,7 @@ EXPORT_SYMBOL_GPL(cancel_work_sync);
*/
bool cancel_delayed_work(struct delayed_work *dwork)
{
- return __cancel_work(&dwork->work, true);
+ return __cancel_work(&dwork->work, WORK_CANCEL_DELAYED);
}
EXPORT_SYMBOL(cancel_delayed_work);
@@ -4264,7 +4269,7 @@ EXPORT_SYMBOL(cancel_delayed_work);
*/
bool cancel_delayed_work_sync(struct delayed_work *dwork)
{
- return __cancel_work_sync(&dwork->work, true);
+ return __cancel_work_sync(&dwork->work, WORK_CANCEL_DELAYED);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
--
2.43.2
Hello, Tejun
On Sat, Feb 17, 2024 at 2:06 AM Tejun Heo <[email protected]> wrote:
> - A work item carries 10bit disable count in work->data while not queued.
> The access to the count is synchronized by the PENDING bit like all other
> parts of work->data.
It is 16bit disable count in the code.
> @@ -2417,7 +2437,8 @@ bool queue_work_on(int cpu, struct workqueue_struct *wq,
>
> local_irq_save(irq_flags);
>
> - if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
> + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
> + !clear_pending_if_disabled(work)) {
> __queue_work(cpu, wq, work);
> ret = true;
> }
> @@ -2577,7 +2598,8 @@ bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
> /* read the comment in __queue_work() */
> local_irq_save(irq_flags);
>
> - if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
> + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
> + !clear_pending_if_disabled(work)) {
> __queue_delayed_work(cpu, wq, dwork, delay);
> ret = true;
> }
It misses the same handling at queue_work_node() and queue_rcu_work().
But it is a bad idea to cancel or disable an rcu_work since it can possibly
be on RCU's waiting list.
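For reference, the same handling in queue_work_node() would look roughly like
this (a sketch mirroring the queue_work_on() hunk above, not part of the
posted patch; existing sanity checks elided):
bool queue_work_node(int node, struct workqueue_struct *wq,
                     struct work_struct *work)
{
        unsigned long irq_flags;
        bool ret = false;
        local_irq_save(irq_flags);
        if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
            !clear_pending_if_disabled(work)) {
                int cpu = select_numa_node_cpu(node);
                __queue_work(cpu, wq, work);
                ret = true;
        }
        local_irq_restore(irq_flags);
        return ret;
}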
> @@ -4173,20 +4195,46 @@ bool flush_rcu_work(struct rcu_work *rwork)
> }
> EXPORT_SYMBOL(flush_rcu_work);
>
Thanks
Lai
Hello, Tejun
On Sat, Feb 17, 2024 at 2:06 AM Tejun Heo <[email protected]> wrote:
> @@ -2631,19 +2567,13 @@ bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
> struct delayed_work *dwork, unsigned long delay)
> {
> unsigned long irq_flags;
> - int ret;
> + bool ret;
>
> - do {
> - ret = try_to_grab_pending(&dwork->work, WORK_CANCEL_DELAYED,
> - &irq_flags);
> - } while (unlikely(ret == -EAGAIN));
> + ret = work_grab_pending(&dwork->work, WORK_CANCEL_DELAYED, &irq_flags);
>
> - if (likely(ret >= 0)) {
> - __queue_delayed_work(cpu, wq, dwork, delay);
> - local_irq_restore(irq_flags);
> - }
> + __queue_delayed_work(cpu, wq, dwork, delay);
The disable count has to be checked before queuing it.
Thanks
Lai
>
> - /* -ENOENT from try_to_grab_pending() becomes %true */
> + local_irq_restore(irq_flags);
> return ret;
> }
> EXPORT_SYMBOL_GPL(mod_delayed_work_on);
Hello, Tejun
On Sat, Feb 17, 2024 at 2:06 AM Tejun Heo <[email protected]> wrote:
> @@ -4072,7 +4070,32 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
> if (!pool)
> return false;
>
> - wait_for_completion(&barr.done);
> + if ((pool->flags & POOL_BH) && from_cancel) {
pool pointer might be invalid here, please check POOL_BH before
rcu_read_unlock()
or move rcu_read_unlock() here, or use "*work_data_bits(work) & WORK_OFFQ_BH".
> + /*
> + * We're flushing a BH work item which is being canceled. It
> + * must have been executing during start_flush_work() and can't
> + * currently be queued. If @work is still executing, we know it
> + * is running in the BH context and thus can be busy-waited.
> + *
> + * On RT, prevent a live lock when current preempted soft
> + * interrupt processing or prevents ksoftirqd from running by
> + * keeping flipping BH. If the tasklet runs on a different CPU
> + * then this has no effect other than doing the BH
> + * disable/enable dance for nothing. This is copied from
> + * kernel/softirq.c::tasklet_unlock_spin_wait().
> + */
s/tasklet/BH work/g
Although the comment is copied from kernel/softirq.c, I can't
envision the scenario in which the current task
"prevents ksoftirqd from running by keeping flipping BH"
since the @work is still executing or the tasklet is running.
> + while (!try_wait_for_completion(&barr.done)) {
> + if (IS_ENABLED(CONFIG_PREEMPT_RT)) {
> + local_bh_disable();
> + local_bh_enable();
> + } else {
> + cpu_relax();
> + }
> + }
> + } else {
> + wait_for_completion(&barr.done);
> + }
> +
> destroy_work_on_stack(&barr.work);
> return true;
> }
> @@ -4090,6 +4113,7 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
> */
> bool flush_work(struct work_struct *work)
> {
> + might_sleep();
> return __flush_work(work, false);
> }
> EXPORT_SYMBOL_GPL(flush_work);
> @@ -4179,6 +4203,11 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
>
> ret = __cancel_work(work, cflags | WORK_CANCEL_DISABLE);
>
> + if (*work_data_bits(work) & WORK_OFFQ_BH)
> + WARN_ON_ONCE(in_hardirq());
When !PREEMPT_RT, this check is sufficient.
But when PREEMPT_RT, it should be only in the contexts that allow
local_bh_disable() for syncing a BH work, although I'm not sure
what check code is proper.
In PREEMPT_RT, local_bh_disable() is disallowed in not only hardirq
context but also !preemptible() context (I'm not sure about it).
__local_bh_disable_ip() (PREEMPT_RT version) doesn't contain
full check except for "WARN_ON_ONCE(in_hardirq())" either.
Since the check is just for debugging, I'm OK with the current check.
Thanks
Lai
>
> 4cb1ef64609f ("workqueue: Implement BH workqueues to eventually replace
> tasklets") implemented workqueues that execute work items in the BH context
> with the goal of eventually replacing tasklet.
>
> While the existing workqueue API covers the basic queueing and canceling
> operations, tasklet also has tasklet_disable*() which blocks the execution
> of the tasklet until it's re-enabled with tasklet_enable(). The interface if
> fairly widely used and workqueue currently doesn't have a counterpart.
>
> This patchset implements disable/enable_work() and the delayed_work
> counterparts to address the gap. The ability to block future executions is
> something which some users asked for in the past, and, while not essential
> as the caller can and often has to shutdown the queuer anyway, it's a nice
> convenience to have. Also, timer grew a similar feature recently with
> timer_shutdown().
>
> - tasklet_disable() keeps disable depth so that a tasklet can be disabled
> and re-enabled by multiple parties at the same time. Workqueue is updated
> to carry 16bit disable count in work->data while the work item is not
> queued. When non-zero, attempts to queue the work item fail.
>
> - The cancel_work_sync() path used WORK_OFFQ_CANCELING to synchronize
> multiple cancel_sync attempts. This added a completely separate wait
> mechanism which wasn't very congruent with the rest of workqueue. This was
> because the canceling state was carried in a way which couldn't
> accommodate multiple concurrent uses. This mechanism is replaced by
> disable - cancel_sync now simply disables the work item, flushes and
> re-enables it.
>
> - There is a wart in how tasklet_disable/enable() works. When a tasklet is
> disabled, if the tasklet is queued, it keeps the softirq constantly raised
> until the tasklet is re-enabled and executed. This makes disabling
> unnecessarily expensive and brittle. The only busy looping workqueue's
> implementation does is on the party that's trying to cancel_sync or
> disable_sync to wait for the completion of the currently executing
> instance, which should be safe as long as it's from process and BH
> contexts.
>
> - A disabled tasklet remembers whether it was queued while disabled and
> starts executing when re-enabled. It turns out doing this with work items
> is challenging as there are a lot more to remember and the synchronization
> becomes more complicated too. Looking at the use cases and given the
> continuity from how cancel_work_sync() works, it seems better to just
> ignore queueings which happen while a work item is disabled and require
> the users to explicitly queue the work item after re-enabling as
> necessary. Most users should be able to re-enqueue unconditionally after
> enabling.
>
> This patchset is on top of wq/for-6.9 and contains the following 17 patches:
>
> 0001-workqueue-Cosmetic-changes.patch
> 0002-workqueue-Use-rcu_read_lock_any_held-instead-of-rcu_.patch
> 0003-workqueue-Rename-__cancel_work_timer-to-__cancel_tim.patch
> 0004-workqueue-Reorganize-flush-and-cancel-_sync-function.patch
> 0005-workqueue-Use-variable-name-irq_flags-for-saving-loc.patch
> 0006-workqueue-Introduce-work_cancel_flags.patch
> 0007-workqueue-Clean-up-enum-work_bits-and-related-consta.patch
> 0008-workqueue-Factor-out-work_grab_pending-from-__cancel.patch
> 0009-workqueue-Remove-clear_work_data.patch
> 0010-workqueue-Make-flags-handling-consistent-across-set_.patch
> 0011-workqueue-Preserve-OFFQ-bits-in-cancel-_sync-paths.patch
> 0012-workqueue-Implement-disable-enable-for-delayed-work-.patch
> 0013-workqueue-Remove-WORK_OFFQ_CANCELING.patch
> 0014-workqueue-Remember-whether-a-work-item-was-on-a-BH-w.patch
> 0015-workqueue-Update-how-start_flush_work-is-called.patch
> 0016-workqueue-Allow-cancel_work_sync-and-disable_work-fr.patch
> 0017-r8152-Convert-from-tasklet-to-BH-workqueue.patch
>
> 0001-0010 are cleanup and prep patches with the only functional change being
> the use of rcu_read_lock_any_held() instead of rcu_read_lock() in 0002. I'll
> apply them to wq/for-6.9 unless there are objections. I thought about making
> these a separate patch series but the cleanups make more sense as a part of
> this series.
>
> For the rest of the series, given how many invasive workqueue changes are
> already queued for v6.9 and the subtle nature of these patches, I think it'd
> be best to defer them to the one after that so that we can use v6.9 as an
> intermediate verification point.
>
> 0011-0012 implement disable_work() and enable_work(). At this stage, all
> disable[_sync]_work and enable_work operations might_sleep(). disable_work()
> and enable_work() due to CANCELING synchronization described above.
> disable_work_sync() also needs to wait for the in-flight work item to finish
> which requires blocking.
>
> 0013 replaces CANCELING with internal use of disble/enable. This removes one
> ugliness from workqueue code and allows disable_work() and enable_work() to
> be used from atomic contexts.
>
> 0014-0016 implement busy-wait for BH work items when they're being canceled
> thus allowing cancel_work_sync() and disable_work_sync() to be called from
> atomic contexts for them. This makes workqueue interface a superset of
> tasklet and also makes BH workqueues easier to live with.
>
> 0017 converts drivers/net/r8152.c from tasklet to BH workqueue as a
> demonstration. It seems to work fine.
>
> The patchset is on top of wq/for-6.9 fd0a68a2337b ("workqueue, irq_work:
> Build fix for !CONFIG_IRQ_WORK") and also available in the following git
> branch:
>
> git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git disable_work-v1
>
Thank you. Rebasing all the conversion patches on top of these changes.
I should have the entire set going out in a day or two for review.
- Allen
> diffstat follows. Thanks.
>
> drivers/net/usb/r8152.c | 44 ++--
> include/linux/workqueue.h | 69 ++++---
> kernel/workqueue.c | 623 ++++++++++++++++++++++++++++++++++++++++----------------------------
> 3 files changed, 441 insertions(+), 295 deletions(-)
>
> --
> tejun
--
- Allen
Hello,
On Tue, Feb 20, 2024 at 03:22:26PM +0800, Lai Jiangshan wrote:
> > - A work item carries 10bit disable count in work->data while not queued.
> > The access to the count is synchronized by the PENDING bit like all other
> > parts of work->data.
>
> It is 16bit disable count in the code.
Fixed.
> It misses the same handling at queue_work_node() and queue_rcu_work().
Oops, fixed queue_work_node(), but I don't think the latter is an issue
given that calling work interface functions on the embedded work item is not
supported and rcu_work can't even be canceled.
I'm not quite sure flush_delayed_work() is safe. Will think more about that.
> But it is a bad idea to cancel or disable an rcu_work since it can possibly
> be on RCU's waiting list.
Yeah, this isn't currently supported. If we want to add this, we'd have to
have a mechanism to shoot it down from RCU's pending list.
Thanks.
--
tejun
On Tue, Feb 20, 2024 at 03:23:53PM +0800, Lai Jiangshan wrote:
> Hello, Tejun
>
> On Sat, Feb 17, 2024 at 2:06 AM Tejun Heo <[email protected]> wrote:
>
> > @@ -2631,19 +2567,13 @@ bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
> > struct delayed_work *dwork, unsigned long delay)
> > {
> > unsigned long irq_flags;
> > - int ret;
> > + bool ret;
> >
> > - do {
> > - ret = try_to_grab_pending(&dwork->work, WORK_CANCEL_DELAYED,
> > - &irq_flags);
> > - } while (unlikely(ret == -EAGAIN));
> > + ret = work_grab_pending(&dwork->work, WORK_CANCEL_DELAYED, &irq_flags);
> >
> > - if (likely(ret >= 0)) {
> > - __queue_delayed_work(cpu, wq, dwork, delay);
> > - local_irq_restore(irq_flags);
> > - }
> > + __queue_delayed_work(cpu, wq, dwork, delay);
>
> The disable count has to be checked before queuing it.
Ah, right, this also needs a clear_pending_if_disabled() call.
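Roughly something like this on top of the hunk quoted above (a sketch, to be
confirmed in v2):
        ret = work_grab_pending(&dwork->work, WORK_CANCEL_DELAYED, &irq_flags);
        /* a disabled dwork still reports the old pending state but isn't queued */
        if (!clear_pending_if_disabled(&dwork->work))
                __queue_delayed_work(cpu, wq, dwork, delay);
        local_irq_restore(irq_flags);
        return ret;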
Thanks.
--
tejun
Hello,
On Tue, Feb 20, 2024 at 03:33:34PM +0800, Lai Jiangshan wrote:
> Hello, Tejun
>
> On Sat, Feb 17, 2024 at 2:06 AM Tejun Heo <[email protected]> wrote:
>
> > @@ -4072,7 +4070,32 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
> > if (!pool)
> > return false;
> >
> > - wait_for_completion(&barr.done);
> > + if ((pool->flags & POOL_BH) && from_cancel) {
>
> pool pointer might be invalid here, please check POOL_BH before
> rcu_read_unlock()
Right, it had a local variable caching the test result from inside the
rcu_read_lock() section and I removed it by mistake while splitting patches.
Will fix.
> > + /*
> > + * We're flushing a BH work item which is being canceled. It
> > + * must have been executing during start_flush_work() and can't
> > + * currently be queued. If @work is still executing, we know it
> > + * is running in the BH context and thus can be busy-waited.
> > + *
> > + * On RT, prevent a live lock when current preempted soft
> > + * interrupt processing or prevents ksoftirqd from running by
> > + * keeping flipping BH. If the tasklet runs on a different CPU
> > + * then this has no effect other than doing the BH
> > + * disable/enable dance for nothing. This is copied from
> > + * kernel/softirq.c::tasklet_unlock_spin_wait().
> > + */
>
> s/tasklet/BH work/g
Updated.
> Although the comment is copied from kernel/softirq.c, I can't
> envision the scenario in which the current task
> "prevents ksoftirqd from running by keeping flipping BH"
Yeah, that sentence is not the easiest to parse. The following parentheses
might be helpful:
prevent a live lock (when current (preempted soft interrupt processing) or
(prevents ksoftirqd from running)) by keeping flipping BH.
> since the @work is still executing or the tasklet is running.
eb2dafbba8b8 ("tasklets: Prevent tasklet_unlock_spin_wait() deadlock on RT")
is the commit which added the flipping to tasklet_unlock_spin_wait(). My
understanding of RT isn't great but it sounds like BH execution can be
preempted by someone else who does the busy wait which would be sad. IIUC,
it looks like flipping BH off/on makes the busy waiting one yield to BH
processing.
> > @@ -4179,6 +4203,11 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
> >
> > ret = __cancel_work(work, cflags | WORK_CANCEL_DISABLE);
> >
> > + if (*work_data_bits(work) & WORK_OFFQ_BH)
> > + WARN_ON_ONCE(in_hardirq());
>
> When !PREEMPT_RT, this check is sufficient.
>
> But when PREEMPT_RT, it should be only in the contexts that allow
> local_bh_disable() for syncing a BH work, although I'm not sure
> what check code is proper.
>
> In PREEMPT_RT, local_bh_disable() is disallowed in not only hardirq
> context but also !preemptible() context (I'm not sure about it).
>
> __local_bh_disable_ip() (PREEMPT_RT version) doesn't contain
> full check except for "WARN_ON_ONCE(in_hardirq())" either.
>
> Since the check is just for debugging, I'm OK with the current check.
We should have enough test coverage with !RT kernels. If you can think of a
better notation for RT, please be my guest.
Thanks.
--
tejun
On Fri, Feb 16, 2024 at 08:04:41AM -1000, Tejun Heo wrote:
> 0001-workqueue-Cosmetic-changes.patch
> 0002-workqueue-Use-rcu_read_lock_any_held-instead-of-rcu_.patch
> 0003-workqueue-Rename-__cancel_work_timer-to-__cancel_tim.patch
> 0004-workqueue-Reorganize-flush-and-cancel-_sync-function.patch
> 0005-workqueue-Use-variable-name-irq_flags-for-saving-loc.patch
> 0006-workqueue-Introduce-work_cancel_flags.patch
> 0007-workqueue-Clean-up-enum-work_bits-and-related-consta.patch
> 0008-workqueue-Factor-out-work_grab_pending-from-__cancel.patch
> 0009-workqueue-Remove-clear_work_data.patch
> 0010-workqueue-Make-flags-handling-consistent-across-set_.patch
> 0011-workqueue-Preserve-OFFQ-bits-in-cancel-_sync-paths.patch
> 0012-workqueue-Implement-disable-enable-for-delayed-work-.patch
> 0013-workqueue-Remove-WORK_OFFQ_CANCELING.patch
> 0014-workqueue-Remember-whether-a-work-item-was-on-a-BH-w.patch
> 0015-workqueue-Update-how-start_flush_work-is-called.patch
> 0016-workqueue-Allow-cancel_work_sync-and-disable_work-fr.patch
> 0017-r8152-Convert-from-tasklet-to-BH-workqueue.patch
>
> 0001-0010 are cleanup and prep patches with the only functional change being
> the use of rcu_read_lock_any_held() instead of rcu_read_lock() in 0002. I'll
> apply them to wq/for-6.9 unless there are objections. I thought about making
> these a separate patch series but the cleanups make more sense as a part of
> this series.
Lai, would you mind reviewing patches 0001-00010? If you're okay, I'll apply
them to wq/for-6.9 and then post the v2 patchset with the rest of the
patches updated to reflect your reviews.
Thanks.
--
tejun
Tejun
> > 0001-workqueue-Cosmetic-changes.patch
> > 0002-workqueue-Use-rcu_read_lock_any_held-instead-of-rcu_.patch
> > 0003-workqueue-Rename-__cancel_work_timer-to-__cancel_tim.patch
> > 0004-workqueue-Reorganize-flush-and-cancel-_sync-function.patch
> > 0005-workqueue-Use-variable-name-irq_flags-for-saving-loc.patch
> > 0006-workqueue-Introduce-work_cancel_flags.patch
> > 0007-workqueue-Clean-up-enum-work_bits-and-related-consta.patch
> > 0008-workqueue-Factor-out-work_grab_pending-from-__cancel.patch
> > 0009-workqueue-Remove-clear_work_data.patch
> > 0010-workqueue-Make-flags-handling-consistent-across-set_.patch
> > 0011-workqueue-Preserve-OFFQ-bits-in-cancel-_sync-paths.patch
> > 0012-workqueue-Implement-disable-enable-for-delayed-work-.patch
> > 0013-workqueue-Remove-WORK_OFFQ_CANCELING.patch
> > 0014-workqueue-Remember-whether-a-work-item-was-on-a-BH-w.patch
> > 0015-workqueue-Update-how-start_flush_work-is-called.patch
> > 0016-workqueue-Allow-cancel_work_sync-and-disable_work-fr.patch
> > 0017-r8152-Convert-from-tasklet-to-BH-workqueue.patch
> >
> > 0001-0010 are cleanup and prep patches with the only functional change being
> > the use of rcu_read_lock_any_held() instead of rcu_read_lock() in 0002. I'll
> > apply them to wq/for-6.9 unless there are objections. I thought about making
> > these a separate patch series but the cleanups make more sense as a part of
> > this series.
>
> Lai, would you mind reviewing patches 0001-00010? If you're okay, I'll apply
> them to wq/for-6.9 and then post the v2 patchset with the rest of the
> patches updated to reflect your reviews.
>
In addition to the above features, would it make sense to introduce a "count"
variable in struct work_struct? (I am not an expert with workqueue
internals). Or perhaps
we could use the existing "data" variable.
struct tasklet_struct has a variable count, which is referenced in
several drivers, Ex:
from drivers/gpu/drm/i915/i915_tasklet.h
static inline bool __tasklet_is_enabled(const struct tasklet_struct *t)
{
        return !atomic_read(&t->count);
}
Also, there are several helper functions that use the "state"
variable, such as tasklet_lock(), which calls tasklet_trylock().
I was thinking of adding/introducing these helper functions. Do let me
know if it makes sense.
Thanks.
On Wed, Feb 21, 2024 at 4:26 AM Tejun Heo <[email protected]> wrote:
>
> On Fri, Feb 16, 2024 at 08:04:41AM -1000, Tejun Heo wrote:
> > 0001-workqueue-Cosmetic-changes.patch
> > 0002-workqueue-Use-rcu_read_lock_any_held-instead-of-rcu_.patch
> > 0003-workqueue-Rename-__cancel_work_timer-to-__cancel_tim.patch
> > 0004-workqueue-Reorganize-flush-and-cancel-_sync-function.patch
> > 0005-workqueue-Use-variable-name-irq_flags-for-saving-loc.patch
> > 0006-workqueue-Introduce-work_cancel_flags.patch
> > 0007-workqueue-Clean-up-enum-work_bits-and-related-consta.patch
> > 0008-workqueue-Factor-out-work_grab_pending-from-__cancel.patch
> > 0009-workqueue-Remove-clear_work_data.patch
> > 0010-workqueue-Make-flags-handling-consistent-across-set_.patch
> > 0011-workqueue-Preserve-OFFQ-bits-in-cancel-_sync-paths.patch
> > 0012-workqueue-Implement-disable-enable-for-delayed-work-.patch
> > 0013-workqueue-Remove-WORK_OFFQ_CANCELING.patch
> > 0014-workqueue-Remember-whether-a-work-item-was-on-a-BH-w.patch
> > 0015-workqueue-Update-how-start_flush_work-is-called.patch
> > 0016-workqueue-Allow-cancel_work_sync-and-disable_work-fr.patch
> > 0017-r8152-Convert-from-tasklet-to-BH-workqueue.patch
> >
> > 0001-0010 are cleanup and prep patches with the only functional change being
> > the use of rcu_read_lock_any_held() instead of rcu_read_lock() in 0002. I'll
> > apply them to wq/for-6.9 unless there are objections. I thought about making
> > these a separate patch series but the cleanups make more sense as a part of
> > this series.
>
> Lai, would you mind reviewing patches 0001-00010? If you're okay, I'll apply
> them to wq/for-6.9 and then post the v2 patchset with the rest of the
> patches updated to reflect your reviews.
>
Hello, Tejun
For 0001-00010:
Reviewed-by: Lai Jiangshan <[email protected]>
Thanks
Lai
On Wed, Feb 21, 2024 at 2:38 AM Tejun Heo <[email protected]> wrote:
>
> Hello,
>
> On Tue, Feb 20, 2024 at 03:22:26PM +0800, Lai Jiangshan wrote:
> > > - A work item carries 10bit disable count in work->data while not queued.
> > > The access to the count is synchronized by the PENDING bit like all other
> > > parts of work->data.
> >
> > It is 16bit disable count in the code.
>
> Fixed.
>
> > It misses the same handling at queue_work_node() and queue_rcu_work().
>
> Oops, fixed queue_work_node(), but I don't think the latter is an issue
> given that calling work interface functions on the embedded work item is not
> supported and rcu_work can't even be canceled.
Hello, Tejun
I think it is better to have the same handling (checking the disable count)
in queue_rcu_work():
1) the code stays consistent with the other queueing paths;
2) the invariant is known: no work item is ever queued with a disable count > 0;
3) wrong usage is caught: a warning can be emitted where the check is added.
Adding the check and the warning in the code is as important as adding a
comment stating that rcu_work is not allowed to be disabled/canceled.
>
> I'm not quite sure flush_delayed_work() is safe. Will think more about that.
I think the code path that successfully deletes the timer not only owns the
pending bit but also guarantees that the disable count is zero.
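For reference, flush_delayed_work() in mainline is roughly the sketch below
(quoted from memory, so treat it as an approximation of the current source).
del_timer_sync() only returns true if the timer was still pending, which means
the delayed work was successfully queued, and with this series queueing fails
whenever the disable count is non-zero:

        bool flush_delayed_work(struct delayed_work *dwork)
        {
                local_irq_disable();
                /* the timer can only be pending if queueing succeeded,
                 * i.e. the work item was not disabled at that point */
                if (del_timer_sync(&dwork->timer))
                        __queue_work(dwork->cpu, dwork->wq, &dwork->work);
                local_irq_enable();
                return flush_work(&dwork->work);
        }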
Thanks
Lai
On Wed, Feb 21, 2024 at 10:39:05AM +0800, Lai Jiangshan wrote:
> For 0001-00010:
>
> Reviewed-by: Lai Jiangshan <[email protected]>
Applied 0001-0010 to wq/for-6.9.
Thanks.
--
tejun
Hello,
On Wed, Feb 21, 2024 at 10:54:46AM +0800, Lai Jiangshan wrote:
> I think it is better to have the same handling (checking the disable count)
> in queue_rcu_work():
>
> 1) the code stays consistent with the other queueing paths;
> 2) the invariant is known: no work item is ever queued with a disable count > 0;
> 3) wrong usage is caught: a warning can be emitted where the check is added.
>
> Adding the check and the warning in the code is as important as adding a
> comment stating that rcu_work is not allowed to be disabled/canceled.
Sure, will add a WARN_ON_ONCE().
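Roughly along the lines of the sketch below, just to show the idea. The
WORK_OFFQ_DISABLE_MASK test and the clear_bit() back-out are placeholders of
mine, not the final patch, which may well fold this into a helper shared with
the other queueing paths:

        bool queue_rcu_work(struct workqueue_struct *wq, struct rcu_work *rwork)
        {
                struct work_struct *work = &rwork->work;

                if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
                        /*
                         * PENDING was clear, so work->data is in its off-queue
                         * format. rcu_work can't be disabled or canceled;
                         * complain and refuse if the disable count is non-zero.
                         */
                        if (WARN_ON_ONCE(*work_data_bits(work) & WORK_OFFQ_DISABLE_MASK)) {
                                clear_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work));
                                return false;
                        }
                        rwork->wq = wq;
                        call_rcu_hurry(&rwork->rcu, rcu_work_rcufn);
                        return true;
                }

                return false;
        }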
> > I'm not quite sure flush_delayed_work() is safe. Will think more about that.
>
> I think the code path that successfully deletes the timer not only owns the
> pending bit but also guarantees that the disable count is zero.
Yeah, this should be fine.
Thanks.
--
tejun
Tejun,
> > For 0001-00010:
> >
> > Reviewed-by: Lai Jiangshan <[email protected]>
>
> Applied 0001-0010 to wq/for-6.9.
>
> Thanks.
I have rebased the conversion work on top of
git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git disable_work-v1
0001-dma-Convert-from-tasklet-to-BH-workqueue.patch
0002-IB-Convert-from-tasklet-to-BH-workqueue.patch
0003-USB-Convert-from-tasklet-to-BH-workqueue.patch
0004-mailbox-Convert-from-tasklet-to-BH-workqueue.patch
0005-ipmi-Convert-from-tasklet-to-BH-workqueue.patch
0006-s390-Convert-from-tasklet-to-BH-workqueue.patch
0007-drivers-media-Convert-from-tasklet-to-BH-workqueue.patch
0008-mmc-Convert-from-tasklet-to-BH-workqueue.patch
Patches: https://github.com/allenpais/for-6.9-bh-conversions
KSPP Issue: refactor all tasklet users into other APIs · Issue #94 ·
https://github.com/KSPP/linux/issues/94
I am working on drivers/crypto/* and drivers/net/*. This might take a
day or two more. Please provide guidance on the types of testing I
should perform.
Thanks.
Tejun,
> > Lai, would you mind reviewing patches 0001-00010? If you're okay, I'll apply
> > them to wq/for-6.9 and then post the v2 patchset with the rest of the
> > patches updated to reflect your reviews.
> >
>
> In addition to the above features, would it make sense to introduce a "count"
> variable in struct work_struct? (I am not an expert on workqueue internals.)
> Or perhaps we could use the existing "data" variable.
>
> struct tasklet_struct has a "count" variable which is referenced by several
> drivers, e.g.:
>
> from drivers/gpu/drm/i915/i915_tasklet.h
> static inline bool __tasklet_is_enabled(const struct tasklet_struct *t)
> {
>         return !atomic_read(&t->count);
> }
>
> Also, there are several helper functions that use the "state" variable, such
> as tasklet_lock(), which calls tasklet_trylock(). I was thinking of
> introducing equivalent helpers for work items. Do let me know if that makes
> sense.
Ignore my question above. Quite frankly, I had not read enough about
workqueues or looked into the code. Having now gone through the sources, I
understand it much better than I did.
I do understand the work bits, but I don't fully understand the
concept of colors
(work_color, flush_color). A swift overview of it would be highly appreciated.
Thanks.
--
- Allen
On Wed, Feb 21, 2024 at 09:03:47AM -0800, Allen wrote:
> Tejun,
>
> > > For 0001-00010:
> > >
> > > Reviewed-by: Lai Jiangshan <[email protected]>
> >
> > Applied 0001-0010 to wq/for-6.9.
> >
> > Thanks.
>
> I have rebased the conversion work based on
>
> git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git disable_work-v1
>
> 0001-dma-Convert-from-tasklet-to-BH-workqueue.patch
> 0002-IB-Convert-from-tasklet-to-BH-workqueue.patch
> 0003-USB-Convert-from-tasklet-to-BH-workqueue.patch
> 0004-mailbox-Convert-from-tasklet-to-BH-workqueue.patch
> 0005-ipmi-Convert-from-tasklet-to-BH-workqueue.patch
> 0006-s390-Convert-from-tasklet-to-BH-workqueue.patch
> 0007-drivers-media-Convert-from-tasklet-to-BH-workqueue.patch
> 0008-mmc-Convert-from-tasklet-to-BH-workqueue.patch
>
> Patches: https://github.com/allenpais/for-6.9-bh-conversions
> KSPP Issue: refactor all tasklet users into other APIs · Issue #94 ·
> https://github.com/KSPP/linux/issues/94
>
> I am working on drivers/crypto/* and drivers/net/*. This might take a
> day or two more. Please provide guidance on the types of testing I
> should perform.
Great! What's the plan for these? Will they go to subsystem maintainers
after the next merge window, or is Tejun carrying them?
-Kees
--
Kees Cook
On Fri, Feb 16, 2024 at 08:04:53AM -1000, Tejun Heo wrote:
[...]
> +/**
> + * enable_work - Enable a work item
> + * @work: work item to enable
> + *
> + * Undo disable_work[_sync]() by decrementing @work's disable count. @work can
> + * only be queued if its disable count is 0.
> + *
> + * Must be called from a sleepable context. Returns %true if the disable count
> + * reached 0. Otherwise, %false.
> + */
> +bool enable_work(struct work_struct *work)
> +{
> +        struct work_offq_data offqd;
> +        unsigned long irq_flags;
> +
> +        work_grab_pending(work, 0, &irq_flags);
> +
> +        work_offqd_unpack(&offqd, *work_data_bits(work));
> +        work_offqd_enable(&offqd);
> +        set_work_pool_and_clear_pending(work, offqd.pool_id,
> +                                        work_offqd_pack_flags(&offqd));
> +        local_irq_enable();
Maybe
local_irq_restore(irq_flags);
?
Because in a later patch, you make this function callable in any
context, so it may be called with irq disabled.
Regards,
Boqun
> +
> +        return !offqd.disable;
> +}
> +EXPORT_SYMBOL_GPL(enable_work);
[..]
> > > > For 0001-00010:
> > > >
> > > > Reviewed-by: Lai Jiangshan <[email protected]>
> > >
> > > Applied 0001-0010 to wq/for-6.9.
> > >
> > > Thanks.
> >
> > I have rebased the conversion work based on
> >
> > git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git disable_work-v1
> >
> > 0001-dma-Convert-from-tasklet-to-BH-workqueue.patch
> > 0002-IB-Convert-from-tasklet-to-BH-workqueue.patch
> > 0003-USB-Convert-from-tasklet-to-BH-workqueue.patch
> > 0004-mailbox-Convert-from-tasklet-to-BH-workqueue.patch
> > 0005-ipmi-Convert-from-tasklet-to-BH-workqueue.patch
> > 0006-s390-Convert-from-tasklet-to-BH-workqueue.patch
> > 0007-drivers-media-Convert-from-tasklet-to-BH-workqueue.patch
> > 0008-mmc-Convert-from-tasklet-to-BH-workqueue.patch
> >
> > Patches: https://github.com/allenpais/for-6.9-bh-conversions
> > KSPP Issue: refactor all tasklet users into other APIs · Issue #94 ·
> > https://github.com/KSPP/linux/issues/94
> >
Kees,
> > I am working on drivers/crypto/* and drivers/net/*. This might take a
> > day or two more. Please provide guidance on the types of testing I
> > should perform.
>
> Great! What's the plan for these? Will they go to subsystem maintainers
> after the next merge window, or is Tejun carrying them?
>
I think it's best to send them out to the subsystem mailing lists. Ideally,
get some testing/benchmark runs done and then look at getting them in either
through Tejun's branch or via the subsystem maintainers. I am open to other
ideas.
Thanks.
> -Kees
>
> --
> Kees Cook
--
- Allen
Hello,
On Sun, Feb 25, 2024 at 07:42:27PM -0800, Boqun Feng wrote:
> > +bool enable_work(struct work_struct *work)
> > +{
> > +        struct work_offq_data offqd;
> > +        unsigned long irq_flags;
> > +
> > +        work_grab_pending(work, 0, &irq_flags);
> > +
> > +        work_offqd_unpack(&offqd, *work_data_bits(work));
> > +        work_offqd_enable(&offqd);
> > +        set_work_pool_and_clear_pending(work, offqd.pool_id,
> > +                                        work_offqd_pack_flags(&offqd));
> > +        local_irq_enable();
>
> Maybe
> local_irq_restore(irq_flags);
>
> ?
>
> Because in a later patch, you make this function callable in any
> context, so it may be called with irq disabled.
Yeah, Lai already pointed that out. Will send out an updated patchset later.
Thanks.
--
tejun
Hello,
Sorry about the late reply.
On Thu, Feb 22, 2024 at 12:26:35PM -0800, Allen wrote:
> I do understand the work bits, but I don't fully understand the
> concept of colors
> (work_color, flush_color). A swift overview of it would be highly appreciated.
That's just to group work items for flush_workqueue(). Let's say there are
work items which are queued in sequence, 1 to 8. Let's also say that there
are three flush_workqueue() attempts which take place after work item 2, 4
and 6. Then, the flush colors A, B, C, D are assigned so that:
work items        1  2  3  4  5  6  7  8
flush_workqueue        ^     ^     ^
                 \-----/\----/\----/\-----~~~
flush_color         A     B     C     D
and the flush_workqueue() code waits for its preceding colors to be drained
to implement flushing.
It's just a way to keep track of the number of work items in flight which
were issued before a certain point in time. It's expensive to do that for an
arbitrary number of points in time, so it just has N slots for groups and
calls them flush colors.
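In user-visible terms, the guarantee is simply the following (a toy
illustration, not code from the series; wq, w1, w2 and w3 stand for an
ordinary workqueue and three work items):

        queue_work(wq, &w1);    /* w1 and w2 get the current color */
        queue_work(wq, &w2);
        flush_workqueue(wq);    /* returns only after w1 and w2 have finished */
        queue_work(wq, &w3);    /* next color; the flush above doesn't wait for it */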
This shouldn't really matter for the bh conversion given that everyone
operates on individual work items.
Thanks.
--
tejun
Hello,
On Mon, Feb 26, 2024 at 09:30:14AM -0800, Allen wrote:
> I think it's best to send them out to the subsystem mailing lists. Ideally,
> get some testing/benchmark runs done and then look at getting them in either
> through Tejun's branch or via the subsystem maintainers. I am open to other
> ideas.
Yeah, either way is fine. If subsys trees take them, that's the best. If
not, they can be routed through the wq tree.
Thanks.
--
tejun
Tejun,
>
> Sorry about the late reply.
>
No worries :)
> On Thu, Feb 22, 2024 at 12:26:35PM -0800, Allen wrote:
> > I do understand the work bits, but I don't fully understand the
> > concept of colors
> > (work_color, flush_color). A swift overview of it would be highly appreciated.
>
> That's just to group work items for flush_workqueue(). Let's say there are
> work items which are queued in sequence, 1 to 8. Let's also say that there
> are three flush_workqueue() attempts which take place after work item 2, 4
> and 6. Then, the flush colors A, B, C, D are assigned so that:
>
> work items        1  2  3  4  5  6  7  8
> flush_workqueue        ^     ^     ^
>                  \-----/\----/\----/\-----~~~
> flush_color         A     B     C     D
>
> and the flush_workqueue() code waits for its preceding colors to be drained
> to implement flushing.
>
> It's just a way to keep track of the number of work items in flight which
> were issued before a certain point in time. It's expensive to do that for
> arbitrary number of points in time, so it just has N slots for groups and
> calls them flush colors.
>
Thank you very much. This is very helpful.
- Allen
> This shouldn't really matter for the bh conversion given that everyone
> operates on individual work item.
>
> Thanks.
>
> --
> tejun