2024-02-27 17:33:05

by Tejun Heo

[permalink] [raw]
Subject: [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work()

Hello,

Changes since v2 (http://lkml.kernel.org/r/[email protected]):

- enable_work() was incorrectly using local_irq_enable() instead of
local_irq_restore(). Fixed. (Lai, Boqun).

- In __flush_work(), instead of usring the pool returned from
start_flush_work() and testing it to tell whether the work item being
flushed is BH, use the work item's WORK_OFFQ_BH bit. This is safe because
the test is only need when @from_cancel is %true and we know that the work
item must be off queue and thus its OFFQ bits are valid at that point.
This makes the code a bit simpler and makes
0005-workqueue-Update-how-start_flush_work-is-called.patch unnecessary.
(Lai)

- There were discussions on whether we want to name the syncing variant
disable_work() and async one dsiable_work_no_sync() instead of naming them
disable_work_sync() and disable_work() respectively. While there are clear
benefits in doing so (continuity from tasklet interface, shorter name for
something used more widely), there is also the clear downside of breaking
consistency with cancel_work_sync() and cancel_work(), which the disable
functionality directly builds upon. I'm still leaning towards keeping
things consistent with existing workqueue interface but open to listening
to arguments, so if anyone has strong opinions, please pipe up.

Changes since v1 (http://lkml.kernel.org/[email protected]):

- Added missing disabled checks to queue_work_node() and
mod_delayed_work_on(). (Lai)

- __flush_work() was derefing RCU protected pool pointer outside
rcu_read_lock() section. Deref and test inside and remember the result.
(Lai)

- queue_rcu_work() doesn't need a disabled check as rcu_work doesn't support
canceling and disabling; however, in case users reach into rcu_work and
invoke disable on the embedded work item, add disabled check to
queue_rcu_work() and trigger warning if disabled. (Lai)

- The first ten cleanup patches have been applied to wq/for-6.9 and dropped
from this series.

4cb1ef64609f ("workqueue: Implement BH workqueues to eventually replace
tasklets") implemented workqueues that execute work items in the BH context
with the goal of eventually replacing tasklet.

While the existing workqueue API covers the basic queueing and canceling
operations, tasklet also has tasklet_disable*() which blocks the execution
of the tasklet until it's re-enabled with tasklet_enable(). The interface if
fairly widely used and workqueue currently doesn't have a counterpart.

This patchset implements disable/enable_work() and the delayed_work
counterparts to address the gap. The ability to block future executions is
something which some users asked for in the past, and, while not essential
as the caller can and often has to shutdown the queuer anyway, it's a nice
convenience to have. Also, timer grew a similar feature recently with
timer_shutdown().

- tasklet_disable() keeps disable depth so that a tasklet can be disabled
and re-enabled by multiple parties at the same time. Workqueue is updated
to carry 16bit disable count in work->data while the work item is not
queued. When non-zero, attempts to queue the work item fail.

- The cancel_work_sync() path used WORK_OFFQ_CANCELING to synchronize
multiple cancel_sync attempts. This added a completely separate wait
mechanism which wasn't very congruent with the rest of workqueue. This was
because the canceling state was carried in a way which couldn't
accommodate multiple concurrent uses. This mechanism is replaced by
disable - cancel_sync now simply disables the work item, flushes and
re-enables it.

- There is a wart in how tasklet_disable/enable() works. When a tasklet is
disabled, if the tasklet is queued, it keeps the softirq constantly raised
until the tasklet is re-enabled and executed. This makes disabling
unnecessarily expensive and brittle. The only busy looping workqueue's
implementation does is on the party that's trying to cancel_sync or
disable_sync to wait for the completion of the currently executing
instance, which should be safe as long as it's from process and BH
contexts.

- A disabled tasklet remembers whether it was queued while disabled and
starts executing when re-enabled. It turns out doing this with work items
is challenging as there are a lot more to remember and the synchronization
becomes more complicated too. Looking at the use cases and given the
continuity from how cancel_work_sync() works, it seems better to just
ignore queueings which happen while a work item is disabled and require
the users to explicitly queue the work item after re-enabling as
necessary. Most users should be able to re-enqueue unconditionally after
enabling.

After the changes in this patchset, BH workqueues cover most of the
functionalities provided by tasklets. The following is how the two APIs map:

- tasklet_setup/init() -> INIT_WORK()
- tasklet_schedule() -> queue_work(system_bh_wq, ...)
- tasklet_hi_schedule() -> queue_work(system_bh_highpri_wq, ...)
- tasklet_disable_nosync() -> disable_work()
- tasklet_disable[_in_atomic]() -> disable_work_sync()
- tasklet_enable() -> enable_work() + queue_work()
- tasklet_kill() -> cancel_work_sync()

Note that unlike tasklet_enable(), enable_work() doesn't queue the work item
automatically according to whether the work item was queued while disabled.
While the caller can track this separately, unconditionally scheduling the
work item after enable_work() returns %true should work for most users.

This patchset contains the following 6 patches:

0001-workqueue-Preserve-OFFQ-bits-in-cancel-_sync-paths.patch
0002-workqueue-Implement-disable-enable-for-delayed-work-.patch
0003-workqueue-Remove-WORK_OFFQ_CANCELING.patch
0004-workqueue-Remember-whether-a-work-item-was-on-a-BH-w.patch
0005-workqueue-Allow-cancel_work_sync-and-disable_work-fr.patch
0006-r8152-Convert-from-tasklet-to-BH-workqueue.patch

Given how many invasive workqueue changes are already queued for v6.9 and
the subtle nature of these patches, I think it'd be best to defer this
patchset to v6.10 so that we can use v6.9 as an intermediate verification
point.

0001-0002 implement disable_work() and enable_work(). At this stage, all
disable[_sync]_work and enable_work operations might_sleep(). disable_work()
and enable_work() due to CANCELING synchronization described above.
disable_work_sync() also needs to wait for the in-flight work item to finish
which requires blocking.

0003 replaces CANCELING with internal use of disble/enable. This removes one
ugliness from workqueue code and allows disable_work() and enable_work() to
be used from atomic contexts.

0004-0005 implement busy-wait for BH work items when they're being canceled
thus allowing cancel_work_sync() and disable_work_sync() to be called from
atomic contexts for them. This makes workqueue interface a superset of
tasklet and also makes BH workqueues easier to live with.

0006 converts drivers/net/r8152.c from tasklet to BH workqueue as a
demonstration. It seems to work fine.

The patchset is on top of wq/for-6.9 (ccdec92198df ("workqueue: Control
intensive warning threshold through cmdline")) and also available in the
following git branch:

git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git disable_work-v3

diffstat follows. Thanks.

drivers/net/usb/r8152.c | 44 ++++----
include/linux/workqueue.h | 23 +++-
kernel/workqueue.c | 384 ++++++++++++++++++++++++++++++++++++++++++++++---------------------------

--
tejun


2024-02-27 17:33:15

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 3/6] workqueue: Remove WORK_OFFQ_CANCELING

cancel[_delayed]_work_sync() guarantees that it can shut down
self-requeueing work items. To achieve that, it grabs and then holds
WORK_STRUCT_PENDING bit set while flushing the currently executing instance.
As the PENDING bit is set, all queueing attempts including the
self-requeueing ones fail and once the currently executing instance is
flushed, the work item should be idle as long as someone else isn't actively
queueing it.

This means that the cancel_work_sync path may hold the PENDING bit set while
flushing the target work item. This isn't a problem for the queueing path -
it can just fail which is the desired effect. It doesn't affect flush. It
doesn't matter to cancel_work either as it can just report that the work
item has successfully canceled. However, if there's another cancel_work_sync
attempt on the work item, it can't simply fail or report success and that
would breach the guarantee that it should provide. cancel_work_sync has to
wait for and grab that PENDING bit and go through the motions.

WORK_OFFQ_CANCELING and wq_cancel_waitq are what implement this
cancel_work_sync to cancel_work_sync wait mechanism. When a work item is
being canceled, WORK_OFFQ_CANCELING is also set on it and other
cancel_work_sync attempts wait on the bit to be cleared using the wait
queue.

While this works, it's an isolated wart which doesn't jive with the rest of
flush and cancel mechanisms and forces enable_work() and disable_work() to
require a sleepable context, which hampers their usability.

Now that a work item can be disabled, we can use that to block queueing
while cancel_work_sync is in progress. Instead of holding PENDING the bit,
it can temporarily disable the work item, flush and then re-enable it as
that'd achieve the same end result of blocking queueings while canceling and
thus enable canceling of self-requeueing work items.

- WORK_OFFQ_CANCELING and the surrounding mechanims are removed.

- work_grab_pending() is now simpler, no longer has to wait for a blocking
operation and thus can be called from any context.

- With work_grab_pending() simplified, no need to use try_to_grab_pending()
directly. All users are converted to use work_grab_pending().

- __cancel_work_sync() is updated to __cancel_work() with
WORK_CANCEL_DISABLE to cancel and plug racing queueing attempts. It then
flushes and re-enables the work item if necessary.

- These changes allow disable_work() and enable_work() to be called from any
context.

v2: Lai pointed out that mod_delayed_work_on() needs to check the disable
count before queueing the delayed work item. Added
clear_pending_if_disabled() call.

Signed-off-by: Tejun Heo <[email protected]>
Cc: Lai Jiangshan <[email protected]>
---
include/linux/workqueue.h | 4 +-
kernel/workqueue.c | 140 ++++++--------------------------------
2 files changed, 20 insertions(+), 124 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index f25915e47efb..86483743ad28 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -52,10 +52,9 @@ enum work_bits {
*
* MSB
* [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ]
- * 16 bits 1 bit 4 or 5 bits
+ * 16 bits 0 bits 4 or 5 bits
*/
WORK_OFFQ_FLAG_SHIFT = WORK_STRUCT_FLAG_BITS,
- WORK_OFFQ_CANCELING_BIT = WORK_OFFQ_FLAG_SHIFT,
WORK_OFFQ_FLAG_END,
WORK_OFFQ_FLAG_BITS = WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,

@@ -99,7 +98,6 @@ enum wq_misc_consts {
};

/* Convenience constants - of type 'unsigned long', not 'enum'! */
-#define WORK_OFFQ_CANCELING (1ul << WORK_OFFQ_CANCELING_BIT)
#define WORK_OFFQ_FLAG_MASK (((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
#define WORK_OFFQ_DISABLE_MASK (((1ul << WORK_OFFQ_DISABLE_BITS) - 1) << WORK_OFFQ_DISABLE_SHIFT)
#define WORK_OFFQ_POOL_NONE ((1ul << WORK_OFFQ_POOL_BITS) - 1)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index a2f2847d464b..07e77130227c 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -495,12 +495,6 @@ static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
/* I: attributes used when instantiating ordered pools on demand */
static struct workqueue_attrs *ordered_wq_attrs[NR_STD_WORKER_POOLS];

-/*
- * Used to synchronize multiple cancel_sync attempts on the same work item. See
- * work_grab_pending() and __cancel_work_sync().
- */
-static DECLARE_WAIT_QUEUE_HEAD(wq_cancel_waitq);
-
/*
* I: kthread_worker to release pwq's. pwq release needs to be bounced to a
* process context while holding a pool lock. Bounce to a dedicated kthread
@@ -782,11 +776,6 @@ static int work_next_color(int color)
* corresponding to a work. Pool is available once the work has been
* queued anywhere after initialization until it is sync canceled. pwq is
* available only while the work item is queued.
- *
- * %WORK_OFFQ_CANCELING is used to mark a work item which is being
- * canceled. While being canceled, a work item may have its PENDING set
- * but stay off timer and worklist for arbitrarily long and nobody should
- * try to steal the PENDING bit.
*/
static inline void set_work_data(struct work_struct *work, unsigned long data)
{
@@ -920,13 +909,6 @@ static unsigned long work_offqd_pack_flags(struct work_offq_data *offqd)
((unsigned long)offqd->flags);
}

-static bool work_is_canceling(struct work_struct *work)
-{
- unsigned long data = atomic_long_read(&work->data);
-
- return !(data & WORK_STRUCT_PWQ) && (data & WORK_OFFQ_CANCELING);
-}
-
/*
* Policy functions. These define the policies on how the global worker
* pools are managed. Unless noted otherwise, these functions assume that
@@ -2055,8 +2037,6 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_
* 1 if @work was pending and we successfully stole PENDING
* 0 if @work was idle and we claimed PENDING
* -EAGAIN if PENDING couldn't be grabbed at the moment, safe to busy-retry
- * -ENOENT if someone else is canceling @work, this state may persist
- * for arbitrarily long
* ======== ================================================================
*
* Note:
@@ -2152,26 +2132,9 @@ static int try_to_grab_pending(struct work_struct *work, u32 cflags,
fail:
rcu_read_unlock();
local_irq_restore(*irq_flags);
- if (work_is_canceling(work))
- return -ENOENT;
- cpu_relax();
return -EAGAIN;
}

-struct cwt_wait {
- wait_queue_entry_t wait;
- struct work_struct *work;
-};
-
-static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
-{
- struct cwt_wait *cwait = container_of(wait, struct cwt_wait, wait);
-
- if (cwait->work != key)
- return 0;
- return autoremove_wake_function(wait, mode, sync, key);
-}
-
/**
* work_grab_pending - steal work item from worklist and disable irq
* @work: work item to steal
@@ -2181,7 +2144,7 @@ static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *k
* Grab PENDING bit of @work. @work can be in any stable state - idle, on timer
* or on worklist.
*
- * Must be called in process context. IRQ is disabled on return with IRQ state
+ * Can be called from any context. IRQ is disabled on return with IRQ state
* stored in *@irq_flags. The caller is responsible for re-enabling it using
* local_irq_restore().
*
@@ -2190,41 +2153,14 @@ static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *k
static bool work_grab_pending(struct work_struct *work, u32 cflags,
unsigned long *irq_flags)
{
- struct cwt_wait cwait;
int ret;

- might_sleep();
-repeat:
- ret = try_to_grab_pending(work, cflags, irq_flags);
- if (likely(ret >= 0))
- return ret;
- if (ret != -ENOENT)
- goto repeat;
-
- /*
- * Someone is already canceling. Wait for it to finish. flush_work()
- * doesn't work for PREEMPT_NONE because we may get woken up between
- * @work's completion and the other canceling task resuming and clearing
- * CANCELING - flush_work() will return false immediately as @work is no
- * longer busy, try_to_grab_pending() will return -ENOENT as @work is
- * still being canceled and the other canceling task won't be able to
- * clear CANCELING as we're hogging the CPU.
- *
- * Let's wait for completion using a waitqueue. As this may lead to the
- * thundering herd problem, use a custom wake function which matches
- * @work along with exclusive wait and wakeup.
- */
- init_wait(&cwait.wait);
- cwait.wait.func = cwt_wakefn;
- cwait.work = work;
-
- prepare_to_wait_exclusive(&wq_cancel_waitq, &cwait.wait,
- TASK_UNINTERRUPTIBLE);
- if (work_is_canceling(work))
- schedule();
- finish_wait(&wq_cancel_waitq, &cwait.wait);
-
- goto repeat;
+ while (true) {
+ ret = try_to_grab_pending(work, cflags, irq_flags);
+ if (ret >= 0)
+ return ret;
+ cpu_relax();
+ }
}

/**
@@ -2642,19 +2578,14 @@ bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay)
{
unsigned long irq_flags;
- int ret;
+ bool ret;

- do {
- ret = try_to_grab_pending(&dwork->work, WORK_CANCEL_DELAYED,
- &irq_flags);
- } while (unlikely(ret == -EAGAIN));
+ ret = work_grab_pending(&dwork->work, WORK_CANCEL_DELAYED, &irq_flags);

- if (likely(ret >= 0)) {
+ if (!clear_pending_if_disabled(&dwork->work))
__queue_delayed_work(cpu, wq, dwork, delay);
- local_irq_restore(irq_flags);
- }

- /* -ENOENT from try_to_grab_pending() becomes %true */
+ local_irq_restore(irq_flags);
return ret;
}
EXPORT_SYMBOL_GPL(mod_delayed_work_on);
@@ -4235,16 +4166,7 @@ static bool __cancel_work(struct work_struct *work, u32 cflags)
unsigned long irq_flags;
int ret;

- if (cflags & WORK_CANCEL_DISABLE) {
- ret = work_grab_pending(work, cflags, &irq_flags);
- } else {
- do {
- ret = try_to_grab_pending(work, cflags, &irq_flags);
- } while (unlikely(ret == -EAGAIN));
-
- if (unlikely(ret < 0))
- return false;
- }
+ ret = work_grab_pending(work, cflags, &irq_flags);

work_offqd_unpack(&offqd, *work_data_bits(work));

@@ -4259,22 +4181,9 @@ static bool __cancel_work(struct work_struct *work, u32 cflags)

static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
{
- struct work_offq_data offqd;
- unsigned long irq_flags;
bool ret;

- /* claim @work and tell other tasks trying to grab @work to back off */
- ret = work_grab_pending(work, cflags, &irq_flags);
-
- work_offqd_unpack(&offqd, *work_data_bits(work));
-
- if (cflags & WORK_CANCEL_DISABLE)
- work_offqd_disable(&offqd);
-
- offqd.flags |= WORK_OFFQ_CANCELING;
- set_work_pool_and_keep_pending(work, offqd.pool_id,
- work_offqd_pack_flags(&offqd));
- local_irq_restore(irq_flags);
+ ret = __cancel_work(work, cflags | WORK_CANCEL_DISABLE);

/*
* Skip __flush_work() during early boot when we know that @work isn't
@@ -4283,19 +4192,8 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
if (wq_online)
__flush_work(work, true);

- work_offqd_unpack(&offqd, *work_data_bits(work));
-
- /*
- * smp_mb() at the end of set_work_pool_and_clear_pending() is paired
- * with prepare_to_wait() above so that either waitqueue_active() is
- * visible here or !work_is_canceling() is visible there.
- */
- offqd.flags &= ~WORK_OFFQ_CANCELING;
- set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE,
- work_offqd_pack_flags(&offqd));
-
- if (waitqueue_active(&wq_cancel_waitq))
- __wake_up(&wq_cancel_waitq, TASK_NORMAL, 1, work);
+ if (!(cflags & WORK_CANCEL_DISABLE))
+ enable_work(work);

return ret;
}
@@ -4379,8 +4277,8 @@ EXPORT_SYMBOL(cancel_delayed_work_sync);
* will fail and return %false. The maximum supported disable depth is 2 to the
* power of %WORK_OFFQ_DISABLE_BITS, currently 65536.
*
- * Must be called from a sleepable context. Returns %true if @work was pending,
- * %false otherwise.
+ * Can be called from any context. Returns %true if @work was pending, %false
+ * otherwise.
*/
bool disable_work(struct work_struct *work)
{
@@ -4411,8 +4309,8 @@ EXPORT_SYMBOL_GPL(disable_work_sync);
* Undo disable_work[_sync]() by decrementing @work's disable count. @work can
* only be queued if its disable count is 0.
*
- * Must be called from a sleepable context. Returns %true if the disable count
- * reached 0. Otherwise, %false.
+ * Can be called from any context. Returns %true if the disable count reached 0.
+ * Otherwise, %false.
*/
bool enable_work(struct work_struct *work)
{
--
2.43.2


2024-02-27 17:33:18

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 4/6] workqueue: Remember whether a work item was on a BH workqueue

Add an off-queue flag, WORK_OFFQ_BH, that indicates whether the last
workqueue the work item was on was a BH one. This will be used to test
whether a work item is BH in cancel_sync path to implement atomic
cancel_sync'ing for BH work items.

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 4 +++-
kernel/workqueue.c | 10 ++++++++--
2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 86483743ad28..7710cd52f7f0 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -52,9 +52,10 @@ enum work_bits {
*
* MSB
* [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ]
- * 16 bits 0 bits 4 or 5 bits
+ * 16 bits 1 bit 4 or 5 bits
*/
WORK_OFFQ_FLAG_SHIFT = WORK_STRUCT_FLAG_BITS,
+ WORK_OFFQ_BH_BIT = WORK_OFFQ_FLAG_SHIFT,
WORK_OFFQ_FLAG_END,
WORK_OFFQ_FLAG_BITS = WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,

@@ -98,6 +99,7 @@ enum wq_misc_consts {
};

/* Convenience constants - of type 'unsigned long', not 'enum'! */
+#define WORK_OFFQ_BH (1ul << WORK_OFFQ_BH_BIT)
#define WORK_OFFQ_FLAG_MASK (((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
#define WORK_OFFQ_DISABLE_MASK (((1ul << WORK_OFFQ_DISABLE_BITS) - 1) << WORK_OFFQ_DISABLE_SHIFT)
#define WORK_OFFQ_POOL_NONE ((1ul << WORK_OFFQ_POOL_BITS) - 1)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 07e77130227c..5c71fbd9d854 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -763,6 +763,11 @@ static int work_next_color(int color)
return (color + 1) % WORK_NR_COLORS;
}

+static unsigned long pool_offq_flags(struct worker_pool *pool)
+{
+ return (pool->flags & POOL_BH) ? WORK_OFFQ_BH : 0;
+}
+
/*
* While queued, %WORK_STRUCT_PWQ is set and non flag bits of a work's data
* contain the pointer to the queued pwq. Once execution starts, the flag
@@ -2119,7 +2124,8 @@ static int try_to_grab_pending(struct work_struct *work, u32 cflags,
* this destroys work->data needed by the next step, stash it.
*/
work_data = *work_data_bits(work);
- set_work_pool_and_keep_pending(work, pool->id, 0);
+ set_work_pool_and_keep_pending(work, pool->id,
+ pool_offq_flags(pool));

/* must be the last step, see the function comment */
pwq_dec_nr_in_flight(pwq, work_data);
@@ -3171,7 +3177,7 @@ __acquires(&pool->lock)
* PENDING and queued state changes happen together while IRQ is
* disabled.
*/
- set_work_pool_and_clear_pending(work, pool->id, 0);
+ set_work_pool_and_clear_pending(work, pool->id, pool_offq_flags(pool));

pwq->stats[PWQ_STAT_STARTED]++;
raw_spin_unlock_irq(&pool->lock);
--
2.43.2


2024-02-27 17:33:46

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 5/6] workqueue: Allow cancel_work_sync() and disable_work() from atomic contexts on BH work items

Now that work_grab_pending() can always grab the PENDING bit without
sleeping, the only thing that prevents allowing cancel_work_sync() of a BH
work item from an atomic context is the flushing of the in-flight instance.

When we're flushing a BH work item for cancel_work_sync(), we know that the
work item is not queued and must be executing in a BH context, which means
that it's safe to busy-wait for its completion from a non-hardirq atomic
context.

This patch updates __flush_work() so that it busy-waits when flushing a BH
work item for cancel_work_sync(). might_sleep() is pushed from
start_flush_work() to its callers - when operating on a BH work item,
__cancel_work_sync() now enforces !in_hardirq() instead of might_sleep().

This allows cancel_work_sync() and disable_work() to be called from
non-hardirq atomic contexts on BH work items.

v3: In __flush_work(), test WORK_OFFQ_BH to tell whether a work item being
canceled can be busy waited instead of making start_flush_work() return
the pool. (Lai)

v2: Lai pointed out that __flush_work() was accessing pool->flags outside
the RCU critical section protecting the pool pointer. Fix it by testing
and remembering the result inside the RCU critical section.

Signed-off-by: Tejun Heo <[email protected]>
Cc: Lai Jiangshan <[email protected]>
---
kernel/workqueue.c | 74 ++++++++++++++++++++++++++++++++++------------
1 file changed, 55 insertions(+), 19 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 5c71fbd9d854..7d8eaca294c9 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -4020,8 +4020,6 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
struct pool_workqueue *pwq;
struct workqueue_struct *wq;

- might_sleep();
-
rcu_read_lock();
pool = get_work_pool(work);
if (!pool) {
@@ -4073,6 +4071,7 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
static bool __flush_work(struct work_struct *work, bool from_cancel)
{
struct wq_barrier barr;
+ unsigned long data;

if (WARN_ON(!wq_online))
return false;
@@ -4080,13 +4079,41 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
if (WARN_ON(!work->func))
return false;

- if (start_flush_work(work, &barr, from_cancel)) {
- wait_for_completion(&barr.done);
- destroy_work_on_stack(&barr.work);
- return true;
- } else {
+ if (!start_flush_work(work, &barr, from_cancel))
return false;
+
+ /*
+ * start_flush_work() returned %true. If @from_cancel is set, we know
+ * that @work must have been executing during start_flush_work() and
+ * can't currently be queued. Its data must contain OFFQ bits. If @work
+ * was queued on a BH workqueue, we also know that it was running in the
+ * BH context and thus can be busy-waited.
+ */
+ data = *work_data_bits(work);
+ if (from_cancel &&
+ !WARN_ON_ONCE(data & WORK_STRUCT_PWQ) && (data & WORK_OFFQ_BH)) {
+ /*
+ * On RT, prevent a live lock when %current preempted soft
+ * interrupt processing or prevents ksoftirqd from running by
+ * keeping flipping BH. If the BH work item runs on a different
+ * CPU then this has no effect other than doing the BH
+ * disable/enable dance for nothing. This is copied from
+ * kernel/softirq.c::tasklet_unlock_spin_wait().
+ */
+ while (!try_wait_for_completion(&barr.done)) {
+ if (IS_ENABLED(CONFIG_PREEMPT_RT)) {
+ local_bh_disable();
+ local_bh_enable();
+ } else {
+ cpu_relax();
+ }
+ }
+ } else {
+ wait_for_completion(&barr.done);
}
+
+ destroy_work_on_stack(&barr.work);
+ return true;
}

/**
@@ -4102,6 +4129,7 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
*/
bool flush_work(struct work_struct *work)
{
+ might_sleep();
return __flush_work(work, false);
}
EXPORT_SYMBOL_GPL(flush_work);
@@ -4191,6 +4219,11 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)

ret = __cancel_work(work, cflags | WORK_CANCEL_DISABLE);

+ if (*work_data_bits(work) & WORK_OFFQ_BH)
+ WARN_ON_ONCE(in_hardirq());
+ else
+ might_sleep();
+
/*
* Skip __flush_work() during early boot when we know that @work isn't
* executing. This allows canceling during early boot.
@@ -4217,19 +4250,19 @@ EXPORT_SYMBOL(cancel_work);
* cancel_work_sync - cancel a work and wait for it to finish
* @work: the work to cancel
*
- * Cancel @work and wait for its execution to finish. This function
- * can be used even if the work re-queues itself or migrates to
- * another workqueue. On return from this function, @work is
- * guaranteed to be not pending or executing on any CPU.
+ * Cancel @work and wait for its execution to finish. This function can be used
+ * even if the work re-queues itself or migrates to another workqueue. On return
+ * from this function, @work is guaranteed to be not pending or executing on any
+ * CPU as long as there aren't racing enqueues.
*
- * cancel_work_sync(&delayed_work->work) must not be used for
- * delayed_work's. Use cancel_delayed_work_sync() instead.
+ * cancel_work_sync(&delayed_work->work) must not be used for delayed_work's.
+ * Use cancel_delayed_work_sync() instead.
*
- * The caller must ensure that the workqueue on which @work was last
- * queued can't be destroyed before this function returns.
+ * Must be called from a sleepable context if @work was last queued on a non-BH
+ * workqueue. Can also be called from non-hardirq atomic contexts including BH
+ * if @work was last queued on a BH workqueue.
*
- * Return:
- * %true if @work was pending, %false otherwise.
+ * Returns %true if @work was pending, %false otherwise.
*/
bool cancel_work_sync(struct work_struct *work)
{
@@ -4299,8 +4332,11 @@ EXPORT_SYMBOL_GPL(disable_work);
* Similar to disable_work() but also wait for @work to finish if currently
* executing.
*
- * Must be called from a sleepable context. Returns %true if @work was pending,
- * %false otherwise.
+ * Must be called from a sleepable context if @work was last queued on a non-BH
+ * workqueue. Can also be called from non-hardirq atomic contexts including BH
+ * if @work was last queued on a BH workqueue.
+ *
+ * Returns %true if @work was pending, %false otherwise.
*/
bool disable_work_sync(struct work_struct *work)
{
--
2.43.2


2024-02-27 17:33:59

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 6/6] r8152: Convert from tasklet to BH workqueue

tasklet is being replaced by BH workqueue. No noticeable behavior or
performance changes are expected. The following is how the two APIs map:

- tasklet_setup/init() -> INIT_WORK()
- tasklet_schedule() -> queue_work(system_bh_wq, ...)
- tasklet_hi_schedule() -> queue_work(system_bh_highpri_wq, ...)
- tasklet_disable_nosync() -> disable_work()
- tasklet_disable[_in_atomic]() -> disable_work_sync()
- tasklet_enable() -> enable_work() + queue_work()
- tasklet_kill() -> cancel_work_sync()

Note that unlike tasklet_enable(), enable_work() doesn't queue the work item
automatically according to whether the work item was queued while disabled.
While the caller can track this separately, unconditionally scheduling the
work item after enable_work() returns %true should work for most users.

r8152 conversion has been tested by repeatedly forcing the device to go
through resets using usbreset under iperf3 generated traffic.

Signed-off-by: Tejun Heo <[email protected]>
---
drivers/net/usb/r8152.c | 44 ++++++++++++++++++++++-------------------
1 file changed, 24 insertions(+), 20 deletions(-)

diff --git a/drivers/net/usb/r8152.c b/drivers/net/usb/r8152.c
index 9bf2140fd0a1..24e284b9eb38 100644
--- a/drivers/net/usb/r8152.c
+++ b/drivers/net/usb/r8152.c
@@ -882,7 +882,7 @@ struct r8152 {
#ifdef CONFIG_PM_SLEEP
struct notifier_block pm_notifier;
#endif
- struct tasklet_struct tx_tl;
+ struct work_struct tx_work;

struct rtl_ops {
void (*init)(struct r8152 *tp);
@@ -1948,7 +1948,7 @@ static void write_bulk_callback(struct urb *urb)
return;

if (!skb_queue_empty(&tp->tx_queue))
- tasklet_schedule(&tp->tx_tl);
+ queue_work(system_bh_wq, &tp->tx_work);
}

static void intr_callback(struct urb *urb)
@@ -2746,9 +2746,9 @@ static void tx_bottom(struct r8152 *tp)
} while (res == 0);
}

-static void bottom_half(struct tasklet_struct *t)
+static void bottom_half(struct work_struct *work)
{
- struct r8152 *tp = from_tasklet(tp, t, tx_tl);
+ struct r8152 *tp = container_of(work, struct r8152, tx_work);

if (test_bit(RTL8152_INACCESSIBLE, &tp->flags))
return;
@@ -2942,7 +2942,7 @@ static netdev_tx_t rtl8152_start_xmit(struct sk_buff *skb,
schedule_delayed_work(&tp->schedule, 0);
} else {
usb_mark_last_busy(tp->udev);
- tasklet_schedule(&tp->tx_tl);
+ queue_work(system_bh_wq, &tp->tx_work);
}
} else if (skb_queue_len(&tp->tx_queue) > tp->tx_qlen) {
netif_stop_queue(netdev);
@@ -6824,11 +6824,12 @@ static void set_carrier(struct r8152 *tp)
} else {
if (netif_carrier_ok(netdev)) {
netif_carrier_off(netdev);
- tasklet_disable(&tp->tx_tl);
+ disable_work_sync(&tp->tx_work);
napi_disable(napi);
tp->rtl_ops.disable(tp);
napi_enable(napi);
- tasklet_enable(&tp->tx_tl);
+ enable_work(&tp->tx_work);
+ queue_work(system_bh_wq, &tp->tx_work);
netif_info(tp, link, netdev, "carrier off\n");
}
}
@@ -6864,7 +6865,7 @@ static void rtl_work_func_t(struct work_struct *work)
/* don't schedule tasket before linking */
if (test_and_clear_bit(SCHEDULE_TASKLET, &tp->flags) &&
netif_carrier_ok(tp->netdev))
- tasklet_schedule(&tp->tx_tl);
+ queue_work(system_bh_wq, &tp->tx_work);

if (test_and_clear_bit(RX_EPROTO, &tp->flags) &&
!list_empty(&tp->rx_done))
@@ -6971,7 +6972,7 @@ static int rtl8152_open(struct net_device *netdev)
goto out_unlock;
}
napi_enable(&tp->napi);
- tasklet_enable(&tp->tx_tl);
+ enable_work(&tp->tx_work);

mutex_unlock(&tp->control);

@@ -6999,7 +7000,7 @@ static int rtl8152_close(struct net_device *netdev)
#ifdef CONFIG_PM_SLEEP
unregister_pm_notifier(&tp->pm_notifier);
#endif
- tasklet_disable(&tp->tx_tl);
+ disable_work_sync(&tp->tx_work);
clear_bit(WORK_ENABLE, &tp->flags);
usb_kill_urb(tp->intr_urb);
cancel_delayed_work_sync(&tp->schedule);
@@ -8421,7 +8422,7 @@ static int rtl8152_pre_reset(struct usb_interface *intf)
return 0;

netif_stop_queue(netdev);
- tasklet_disable(&tp->tx_tl);
+ disable_work_sync(&tp->tx_work);
clear_bit(WORK_ENABLE, &tp->flags);
usb_kill_urb(tp->intr_urb);
cancel_delayed_work_sync(&tp->schedule);
@@ -8466,7 +8467,8 @@ static int rtl8152_post_reset(struct usb_interface *intf)
}

napi_enable(&tp->napi);
- tasklet_enable(&tp->tx_tl);
+ enable_work(&tp->tx_work);
+ queue_work(system_bh_wq, &tp->tx_work);
netif_wake_queue(netdev);
usb_submit_urb(tp->intr_urb, GFP_KERNEL);

@@ -8625,12 +8627,13 @@ static int rtl8152_system_suspend(struct r8152 *tp)

clear_bit(WORK_ENABLE, &tp->flags);
usb_kill_urb(tp->intr_urb);
- tasklet_disable(&tp->tx_tl);
+ disable_work_sync(&tp->tx_work);
napi_disable(napi);
cancel_delayed_work_sync(&tp->schedule);
tp->rtl_ops.down(tp);
napi_enable(napi);
- tasklet_enable(&tp->tx_tl);
+ enable_work(&tp->tx_work);
+ queue_work(system_bh_wq, &tp->tx_work);
}

return 0;
@@ -9387,11 +9390,12 @@ static int rtl8152_change_mtu(struct net_device *dev, int new_mtu)
if (netif_carrier_ok(dev)) {
netif_stop_queue(dev);
napi_disable(&tp->napi);
- tasklet_disable(&tp->tx_tl);
+ disable_work_sync(&tp->tx_work);
tp->rtl_ops.disable(tp);
tp->rtl_ops.enable(tp);
rtl_start_rx(tp);
- tasklet_enable(&tp->tx_tl);
+ enable_work(&tp->tx_work);
+ queue_work(system_bh_wq, &tp->tx_work);
napi_enable(&tp->napi);
rtl8152_set_rx_mode(dev);
netif_wake_queue(dev);
@@ -9819,8 +9823,8 @@ static int rtl8152_probe_once(struct usb_interface *intf,
mutex_init(&tp->control);
INIT_DELAYED_WORK(&tp->schedule, rtl_work_func_t);
INIT_DELAYED_WORK(&tp->hw_phy_work, rtl_hw_phy_work_func_t);
- tasklet_setup(&tp->tx_tl, bottom_half);
- tasklet_disable(&tp->tx_tl);
+ INIT_WORK(&tp->tx_work, bottom_half);
+ disable_work(&tp->tx_work);

netdev->netdev_ops = &rtl8152_netdev_ops;
netdev->watchdog_timeo = RTL8152_TX_TIMEOUT;
@@ -9954,7 +9958,7 @@ static int rtl8152_probe_once(struct usb_interface *intf,
unregister_netdev(netdev);

out1:
- tasklet_kill(&tp->tx_tl);
+ cancel_work_sync(&tp->tx_work);
cancel_delayed_work_sync(&tp->hw_phy_work);
if (tp->rtl_ops.unload)
tp->rtl_ops.unload(tp);
@@ -10010,7 +10014,7 @@ static void rtl8152_disconnect(struct usb_interface *intf)
rtl_set_unplug(tp);

unregister_netdev(tp->netdev);
- tasklet_kill(&tp->tx_tl);
+ cancel_work_sync(&tp->tx_work);
cancel_delayed_work_sync(&tp->hw_phy_work);
if (tp->rtl_ops.unload)
tp->rtl_ops.unload(tp);
--
2.43.2


2024-02-27 18:11:42

by Tejun Heo

[permalink] [raw]
Subject: [PATCH 1/6] workqueue: Preserve OFFQ bits in cancel[_sync] paths

The cancel[_sync] paths acquire and release WORK_STRUCT_PENDING, and
manipulate WORK_OFFQ_CANCELING. However, they assume that all the OFFQ bit
values except for the pool ID are statically known and don't preserve them,
which is not wrong in the current code as the pool ID and CANCELING are the
only information carried. However, the planned disable/enable support will
add more fields and need them to be preserved.

This patch updates work data handling so that only the bits which need
updating are updated.

- struct work_offq_data is added along with work_offqd_unpack() and
work_offqd_pack_flags() to help manipulating multiple fields contained in
work->data. Note that the helpers look a bit silly right now as there
isn't that much to pack. The next patch will add more.

- mark_work_canceling() which is used only by __cancel_work_sync() is
replaced by open-coded usage of work_offq_data and
set_work_pool_and_keep_pending() in __cancel_work_sync().

- __cancel_work[_sync]() uses offq_data helpers to preserve other OFFQ bits
when clearing WORK_STRUCT_PENDING and WORK_OFFQ_CANCELING at the end.

- This removes all users of get_work_pool_id() which is dropped. Note that
get_work_pool_id() could handle both WORK_STRUCT_PWQ and !WORK_STRUCT_PWQ
cases; however, it was only being called after try_to_grab_pending()
succeeded, in which case WORK_STRUCT_PWQ is never set and thus it's safe
to use work_offqd_unpack() instead.

No behavior changes intended.

Signed-off-by: Tejun Heo <[email protected]>
---
include/linux/workqueue.h | 1 +
kernel/workqueue.c | 51 ++++++++++++++++++++++++---------------
2 files changed, 32 insertions(+), 20 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 0ad534fe6673..e15fc77bf2e2 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -97,6 +97,7 @@ enum wq_misc_consts {

/* Convenience constants - of type 'unsigned long', not 'enum'! */
#define WORK_OFFQ_CANCELING (1ul << WORK_OFFQ_CANCELING_BIT)
+#define WORK_OFFQ_FLAG_MASK (((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
#define WORK_OFFQ_POOL_NONE ((1ul << WORK_OFFQ_POOL_BITS) - 1)
#define WORK_STRUCT_NO_POOL (WORK_OFFQ_POOL_NONE << WORK_OFFQ_POOL_SHIFT)
#define WORK_STRUCT_PWQ_MASK (~((1ul << WORK_STRUCT_PWQ_SHIFT) - 1))
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 38783e3a60bb..ecd46fbed60b 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -391,6 +391,11 @@ struct wq_pod_type {
int *cpu_pod; /* cpu -> pod */
};

+struct work_offq_data {
+ u32 pool_id;
+ u32 flags;
+};
+
static const char *wq_affn_names[WQ_AFFN_NR_TYPES] = {
[WQ_AFFN_DFL] = "default",
[WQ_AFFN_CPU] = "cpu",
@@ -891,29 +896,23 @@ static struct worker_pool *get_work_pool(struct work_struct *work)
return idr_find(&worker_pool_idr, pool_id);
}

-/**
- * get_work_pool_id - return the worker pool ID a given work is associated with
- * @work: the work item of interest
- *
- * Return: The worker_pool ID @work was last associated with.
- * %WORK_OFFQ_POOL_NONE if none.
- */
-static int get_work_pool_id(struct work_struct *work)
+static unsigned long shift_and_mask(unsigned long v, u32 shift, u32 bits)
{
- unsigned long data = atomic_long_read(&work->data);
+ return (v >> shift) & ((1 << bits) - 1);
+}

- if (data & WORK_STRUCT_PWQ)
- return work_struct_pwq(data)->pool->id;
+static void work_offqd_unpack(struct work_offq_data *offqd, unsigned long data)
+{
+ WARN_ON_ONCE(data & WORK_STRUCT_PWQ);

- return data >> WORK_OFFQ_POOL_SHIFT;
+ offqd->pool_id = shift_and_mask(data, WORK_OFFQ_POOL_SHIFT,
+ WORK_OFFQ_POOL_BITS);
+ offqd->flags = data & WORK_OFFQ_FLAG_MASK;
}

-static void mark_work_canceling(struct work_struct *work)
+static unsigned long work_offqd_pack_flags(struct work_offq_data *offqd)
{
- unsigned long pool_id = get_work_pool_id(work);
-
- pool_id <<= WORK_OFFQ_POOL_SHIFT;
- set_work_data(work, pool_id | WORK_STRUCT_PENDING | WORK_OFFQ_CANCELING);
+ return (unsigned long)offqd->flags;
}

static bool work_is_canceling(struct work_struct *work)
@@ -4186,6 +4185,7 @@ EXPORT_SYMBOL(flush_rcu_work);

static bool __cancel_work(struct work_struct *work, u32 cflags)
{
+ struct work_offq_data offqd;
unsigned long irq_flags;
int ret;

@@ -4196,19 +4196,26 @@ static bool __cancel_work(struct work_struct *work, u32 cflags)
if (unlikely(ret < 0))
return false;

- set_work_pool_and_clear_pending(work, get_work_pool_id(work), 0);
+ work_offqd_unpack(&offqd, *work_data_bits(work));
+ set_work_pool_and_clear_pending(work, offqd.pool_id,
+ work_offqd_pack_flags(&offqd));
local_irq_restore(irq_flags);
return ret;
}

static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
{
+ struct work_offq_data offqd;
unsigned long irq_flags;
bool ret;

/* claim @work and tell other tasks trying to grab @work to back off */
ret = work_grab_pending(work, cflags, &irq_flags);
- mark_work_canceling(work);
+
+ work_offqd_unpack(&offqd, *work_data_bits(work));
+ offqd.flags |= WORK_OFFQ_CANCELING;
+ set_work_pool_and_keep_pending(work, offqd.pool_id,
+ work_offqd_pack_flags(&offqd));
local_irq_restore(irq_flags);

/*
@@ -4218,12 +4225,16 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
if (wq_online)
__flush_work(work, true);

+ work_offqd_unpack(&offqd, *work_data_bits(work));
+
/*
* smp_mb() at the end of set_work_pool_and_clear_pending() is paired
* with prepare_to_wait() above so that either waitqueue_active() is
* visible here or !work_is_canceling() is visible there.
*/
- set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE, 0);
+ offqd.flags &= ~WORK_OFFQ_CANCELING;
+ set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE,
+ work_offqd_pack_flags(&offqd));

if (waitqueue_active(&wq_cancel_waitq))
__wake_up(&wq_cancel_waitq, TASK_NORMAL, 1, work);
--
2.43.2


2024-02-28 10:01:01

by Lai Jiangshan

[permalink] [raw]
Subject: Re: [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work()

Hello

For patch 1-5:

Reviewed-by: Lai Jiangshan <[email protected]>

thanks
Lai

On Wed, Feb 28, 2024 at 1:28 AM Tejun Heo <[email protected]> wrote:

>
> git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git disable_work-v3
>
> diffstat follows. Thanks.
>
> drivers/net/usb/r8152.c | 44 ++++----
> include/linux/workqueue.h | 23 +++-
> kernel/workqueue.c | 384 ++++++++++++++++++++++++++++++++++++++++++++++---------------------------
>

2024-02-28 17:34:08

by Allen

[permalink] [raw]
Subject: Re: [PATCH 6/6] r8152: Convert from tasklet to BH workqueue

> tasklet is being replaced by BH workqueue. No noticeable behavior or
> performance changes are expected. The following is how the two APIs map:
>
> - tasklet_setup/init() -> INIT_WORK()
> - tasklet_schedule() -> queue_work(system_bh_wq, ...)
> - tasklet_hi_schedule() -> queue_work(system_bh_highpri_wq, ...)
> - tasklet_disable_nosync() -> disable_work()
> - tasklet_disable[_in_atomic]() -> disable_work_sync()
> - tasklet_enable() -> enable_work() + queue_work()
> - tasklet_kill() -> cancel_work_sync()
>
> Note that unlike tasklet_enable(), enable_work() doesn't queue the work item
> automatically according to whether the work item was queued while disabled.
> While the caller can track this separately, unconditionally scheduling the
> work item after enable_work() returns %true should work for most users.
>
> r8152 conversion has been tested by repeatedly forcing the device to go
> through resets using usbreset under iperf3 generated traffic.
>
> Signed-off-by: Tejun Heo <[email protected]>
> ---
> drivers/net/usb/r8152.c | 44 ++++++++++++++++++++++-------------------
> 1 file changed, 24 insertions(+), 20 deletions(-)
>
> diff --git a/drivers/net/usb/r8152.c b/drivers/net/usb/r8152.c
> index 9bf2140fd0a1..24e284b9eb38 100644
> --- a/drivers/net/usb/r8152.c
> +++ b/drivers/net/usb/r8152.c
> @@ -882,7 +882,7 @@ struct r8152 {
> #ifdef CONFIG_PM_SLEEP
> struct notifier_block pm_notifier;
> #endif
> - struct tasklet_struct tx_tl;
> + struct work_struct tx_work;
>
> struct rtl_ops {
> void (*init)(struct r8152 *tp);
> @@ -1948,7 +1948,7 @@ static void write_bulk_callback(struct urb *urb)
> return;
>
> if (!skb_queue_empty(&tp->tx_queue))
> - tasklet_schedule(&tp->tx_tl);
> + queue_work(system_bh_wq, &tp->tx_work);
> }
>
> static void intr_callback(struct urb *urb)
> @@ -2746,9 +2746,9 @@ static void tx_bottom(struct r8152 *tp)
> } while (res == 0);
> }
>
> -static void bottom_half(struct tasklet_struct *t)
> +static void bottom_half(struct work_struct *work)
> {
> - struct r8152 *tp = from_tasklet(tp, t, tx_tl);
> + struct r8152 *tp = container_of(work, struct r8152, tx_work);
>
> if (test_bit(RTL8152_INACCESSIBLE, &tp->flags))
> return;
> @@ -2942,7 +2942,7 @@ static netdev_tx_t rtl8152_start_xmit(struct sk_buff *skb,
> schedule_delayed_work(&tp->schedule, 0);
> } else {
> usb_mark_last_busy(tp->udev);
> - tasklet_schedule(&tp->tx_tl);
> + queue_work(system_bh_wq, &tp->tx_work);
> }
> } else if (skb_queue_len(&tp->tx_queue) > tp->tx_qlen) {
> netif_stop_queue(netdev);
> @@ -6824,11 +6824,12 @@ static void set_carrier(struct r8152 *tp)
> } else {
> if (netif_carrier_ok(netdev)) {
> netif_carrier_off(netdev);
> - tasklet_disable(&tp->tx_tl);
> + disable_work_sync(&tp->tx_work);
> napi_disable(napi);
> tp->rtl_ops.disable(tp);
> napi_enable(napi);
> - tasklet_enable(&tp->tx_tl);
> + enable_work(&tp->tx_work);
> + queue_work(system_bh_wq, &tp->tx_work);
> netif_info(tp, link, netdev, "carrier off\n");
> }
> }
> @@ -6864,7 +6865,7 @@ static void rtl_work_func_t(struct work_struct *work)
> /* don't schedule tasket before linking */
> if (test_and_clear_bit(SCHEDULE_TASKLET, &tp->flags) &&
> netif_carrier_ok(tp->netdev))
> - tasklet_schedule(&tp->tx_tl);
> + queue_work(system_bh_wq, &tp->tx_work);
>
> if (test_and_clear_bit(RX_EPROTO, &tp->flags) &&
> !list_empty(&tp->rx_done))
> @@ -6971,7 +6972,7 @@ static int rtl8152_open(struct net_device *netdev)
> goto out_unlock;
> }
> napi_enable(&tp->napi);
> - tasklet_enable(&tp->tx_tl);
> + enable_work(&tp->tx_work);

I think we are missing queue_work() above, right?

To avoid such situations, could we combine enable_work() + queue_work(),
into a single API?
Perhaps, something like:

static inline bool enable_and_queue_work(struct workqueue_struct *wq,
struct work_struct *work)
{
if (enable_work(work))
{
queue_work(wq, work);
return true;
}
return false;
}


Thanks,
Allen


> mutex_unlock(&tp->control);
>
> @@ -6999,7 +7000,7 @@ static int rtl8152_close(struct net_device *netdev)
> #ifdef CONFIG_PM_SLEEP
> unregister_pm_notifier(&tp->pm_notifier);
> #endif
> - tasklet_disable(&tp->tx_tl);
> + disable_work_sync(&tp->tx_work);
> clear_bit(WORK_ENABLE, &tp->flags);
> usb_kill_urb(tp->intr_urb);
> cancel_delayed_work_sync(&tp->schedule);
> @@ -8421,7 +8422,7 @@ static int rtl8152_pre_reset(struct usb_interface *intf)
> return 0;
>
> netif_stop_queue(netdev);
> - tasklet_disable(&tp->tx_tl);
> + disable_work_sync(&tp->tx_work);
> clear_bit(WORK_ENABLE, &tp->flags);
> usb_kill_urb(tp->intr_urb);
> cancel_delayed_work_sync(&tp->schedule);
> @@ -8466,7 +8467,8 @@ static int rtl8152_post_reset(struct usb_interface *intf)
> }
>
> napi_enable(&tp->napi);
> - tasklet_enable(&tp->tx_tl);
> + enable_work(&tp->tx_work);
> + queue_work(system_bh_wq, &tp->tx_work);
> netif_wake_queue(netdev);
> usb_submit_urb(tp->intr_urb, GFP_KERNEL);
>
> @@ -8625,12 +8627,13 @@ static int rtl8152_system_suspend(struct r8152 *tp)
>
> clear_bit(WORK_ENABLE, &tp->flags);
> usb_kill_urb(tp->intr_urb);
> - tasklet_disable(&tp->tx_tl);
> + disable_work_sync(&tp->tx_work);
> napi_disable(napi);
> cancel_delayed_work_sync(&tp->schedule);
> tp->rtl_ops.down(tp);
> napi_enable(napi);
> - tasklet_enable(&tp->tx_tl);
> + enable_work(&tp->tx_work);
> + queue_work(system_bh_wq, &tp->tx_work);
> }
>
> return 0;
> @@ -9387,11 +9390,12 @@ static int rtl8152_change_mtu(struct net_device *dev, int new_mtu)
> if (netif_carrier_ok(dev)) {
> netif_stop_queue(dev);
> napi_disable(&tp->napi);
> - tasklet_disable(&tp->tx_tl);
> + disable_work_sync(&tp->tx_work);
> tp->rtl_ops.disable(tp);
> tp->rtl_ops.enable(tp);
> rtl_start_rx(tp);
> - tasklet_enable(&tp->tx_tl);
> + enable_work(&tp->tx_work);
> + queue_work(system_bh_wq, &tp->tx_work);
> napi_enable(&tp->napi);
> rtl8152_set_rx_mode(dev);
> netif_wake_queue(dev);
> @@ -9819,8 +9823,8 @@ static int rtl8152_probe_once(struct usb_interface *intf,
> mutex_init(&tp->control);
> INIT_DELAYED_WORK(&tp->schedule, rtl_work_func_t);
> INIT_DELAYED_WORK(&tp->hw_phy_work, rtl_hw_phy_work_func_t);
> - tasklet_setup(&tp->tx_tl, bottom_half);
> - tasklet_disable(&tp->tx_tl);
> + INIT_WORK(&tp->tx_work, bottom_half);
> + disable_work(&tp->tx_work);
>
> netdev->netdev_ops = &rtl8152_netdev_ops;
> netdev->watchdog_timeo = RTL8152_TX_TIMEOUT;
> @@ -9954,7 +9958,7 @@ static int rtl8152_probe_once(struct usb_interface *intf,
> unregister_netdev(netdev);
>
> out1:
> - tasklet_kill(&tp->tx_tl);
> + cancel_work_sync(&tp->tx_work);
> cancel_delayed_work_sync(&tp->hw_phy_work);
> if (tp->rtl_ops.unload)
> tp->rtl_ops.unload(tp);
> @@ -10010,7 +10014,7 @@ static void rtl8152_disconnect(struct usb_interface *intf)
> rtl_set_unplug(tp);
>
> unregister_netdev(tp->netdev);
> - tasklet_kill(&tp->tx_tl);
> + cancel_work_sync(&tp->tx_work);
> cancel_delayed_work_sync(&tp->hw_phy_work);
> if (tp->rtl_ops.unload)
> tp->rtl_ops.unload(tp);
> --
> 2.43.2
>


--
- Allen

2024-02-28 18:01:11

by Allen

[permalink] [raw]
Subject: Re: [PATCH 6/6] r8152: Convert from tasklet to BH workqueue

> > > @@ -6971,7 +6972,7 @@ static int rtl8152_open(struct net_device *netdev)
> > > goto out_unlock;
> > > }
> > > napi_enable(&tp->napi);
> > > - tasklet_enable(&tp->tx_tl);
> > > + enable_work(&tp->tx_work);
> >
> > I think we are missing queue_work() above, right?
> >
> > To avoid such situations, could we combine enable_work() + queue_work(),
> > into a single API?
>
> Here, the device is newly being opened and the work item is just disabled
> from the init. So, it doesn't need queueing.
>

Ah, my bad. Thanks for pointing it out.

> > Perhaps, something like:
> >
> > static inline bool enable_and_queue_work(struct workqueue_struct *wq,
> > struct work_struct *work)
> > {
> > if (enable_work(work))
> > {
> > queue_work(wq, work);
> > return true;
> > }
> > return false;
> > }
>
> That said, this may still be nice to have.
>

If the above is okay, I could send out a patch. Please let me know.

Thanks.

2024-02-28 18:02:43

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 6/6] r8152: Convert from tasklet to BH workqueue

On Wed, Feb 28, 2024 at 10:00:08AM -0800, Allen wrote:
> > > static inline bool enable_and_queue_work(struct workqueue_struct *wq,
> > > struct work_struct *work)
> > > {
> > > if (enable_work(work))
> > > {
> > > queue_work(wq, work);
> > > return true;
> > > }
> > > return false;
> > > }
> >
> > That said, this may still be nice to have.
>
> If the above is okay, I could send out a patch. Please let me know.

Yes, please.

Thanks.

--
tejun

2024-02-28 18:06:27

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 6/6] r8152: Convert from tasklet to BH workqueue

Hello,

On Wed, Feb 28, 2024 at 09:33:40AM -0800, Allen wrote:
> > @@ -6971,7 +6972,7 @@ static int rtl8152_open(struct net_device *netdev)
> > goto out_unlock;
> > }
> > napi_enable(&tp->napi);
> > - tasklet_enable(&tp->tx_tl);
> > + enable_work(&tp->tx_work);
>
> I think we are missing queue_work() above, right?
>
> To avoid such situations, could we combine enable_work() + queue_work(),
> into a single API?

Here, the device is newly being opened and the work item is just disabled
from the init. So, it doesn't need queueing.

> Perhaps, something like:
>
> static inline bool enable_and_queue_work(struct workqueue_struct *wq,
> struct work_struct *work)
> {
> if (enable_work(work))
> {
> queue_work(wq, work);
> return true;
> }
> return false;
> }

That said, this may still be nice to have.

Thanks.

--
tejun

2024-03-25 18:10:36

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work()

On Wed, Feb 28, 2024 at 06:00:22PM +0800, Lai Jiangshan wrote:
> Hello
>
> For patch 1-5:
>
> Reviewed-by: Lai Jiangshan <[email protected]>

Applied 1-5 to wq/for-6.10.

Thanks.

--
tejun