2024-02-05 21:29:56

by Waiman Long

[permalink] [raw]
Subject: [PATCH-wq v3 1/4] workqueue: Enable unbound cpumask update on ordered workqueues

Ordered workqueues do not currently follow changes made to the
global unbound cpumask because per-pool workqueue changes may break
the ordering guarantee. IOW, a work function in an ordered workqueue
may run on an isolated CPU.

This patch enables ordered workqueues to follow changes made to
the global unbound cpumask by temporarily freezing the newly allocated
pool_workqueue, using the new frozen flag to hold off execution of
newly queued work items until the old pwq has been properly flushed.

This enables ordered workqueues to follow the unbound cpumask changes
like other unbound workqueues at the expense of some delay in execution
of work functions during the transition period.

Signed-off-by: Waiman Long <[email protected]>
Tested-by: Juri Lelli <[email protected]>
---
kernel/workqueue.c | 93 +++++++++++++++++++++++++++++++++++++++-------
1 file changed, 80 insertions(+), 13 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 68c48489eab3..9b107e8a2c15 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -255,6 +255,7 @@ struct pool_workqueue {
int refcnt; /* L: reference count */
int nr_in_flight[WORK_NR_COLORS];
/* L: nr of in_flight works */
+ int frozen; /* L: temporarily frozen */

/*
* nr_active management and WORK_STRUCT_INACTIVE:
@@ -1702,6 +1703,9 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)

lockdep_assert_held(&pool->lock);

+ if (pwq->frozen)
+ return false;
+
if (!nna) {
/* BH or per-cpu workqueue, pwq->nr_active is sufficient */
obtained = pwq->nr_active < READ_ONCE(wq->max_active);
@@ -1782,6 +1786,21 @@ static bool pwq_activate_first_inactive(struct pool_workqueue *pwq, bool fill)
}
}

+/**
+ * thaw_pwq - clear the frozen flag of a pool_workqueue under its pool lock
+ * @pwq: pool_workqueue to be thawed
+ */
+static void thaw_pwq(struct pool_workqueue *pwq)
+{
+ unsigned long flags;
+
+ raw_spin_lock_irqsave(&pwq->pool->lock, flags);
+ pwq->frozen = false; /* pwq_tryinc_nr_active() returns false while set */
+ if (pwq_activate_first_inactive(pwq, true))
+ kick_pool(pwq->pool);
+ raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
+}
+
/**
* node_activate_pending_pwq - Activate a pending pwq on a wq_node_nr_active
* @nna: wq_node_nr_active to activate a pending pwq for
@@ -4740,6 +4759,18 @@ static void pwq_release_workfn(struct kthread_work *work)
mutex_lock(&wq->mutex);
list_del_rcu(&pwq->pwqs_node);
is_last = list_empty(&wq->pwqs);
+
+ /*
+ * For an ordered workqueue with a frozen dfl_pwq, thaw it now.
+ */
+ if (!is_last && (wq->flags & __WQ_ORDERED_EXPLICIT)) {
+ struct pool_workqueue *dfl_pwq;
+
+ dfl_pwq = rcu_access_pointer(wq->dfl_pwq);
+ if (dfl_pwq && dfl_pwq->frozen)
+ thaw_pwq(dfl_pwq);
+ }
+
mutex_unlock(&wq->mutex);
}

@@ -4906,7 +4937,22 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)

for_each_possible_cpu(cpu)
put_pwq_unlocked(ctx->pwq_tbl[cpu]);
+
+ /*
+ * Acquire rcu_read_lock() before refcnt can become 0 to
+ * ensure that ctx->dfl_pwq won't be freed.
+ */
+ rcu_read_lock();
put_pwq_unlocked(ctx->dfl_pwq);
+ if ((ctx->wq->flags & __WQ_ORDERED_EXPLICIT) &&
+ ctx->dfl_pwq && !ctx->dfl_pwq->refcnt) {
+ struct pool_workqueue *dfl_pwq;
+
+ dfl_pwq = rcu_access_pointer(ctx->wq->dfl_pwq);
+ if (dfl_pwq && dfl_pwq->frozen)
+ thaw_pwq(dfl_pwq);
+ }
+ rcu_read_unlock();

free_workqueue_attrs(ctx->attrs);

@@ -4966,6 +5012,15 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
cpumask_copy(new_attrs->__pod_cpumask, new_attrs->cpumask);
ctx->attrs = new_attrs;

+ /*
+ * For initialized ordered workqueues, there is only one pwq (dfl_pwq).
+ * Temporarily set the frozen flag of ctx->dfl_pwq to freeze execution
+ * of newly queued work items until execution of older work items in
+ * the old pwq has completed.
+ */
+ if (!list_empty(&wq->pwqs) && (wq->flags & __WQ_ORDERED_EXPLICIT))
+ ctx->dfl_pwq->frozen = true;
+
ctx->wq = wq;
return ctx;

@@ -5006,13 +5061,8 @@ static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
return -EINVAL;

- /* creating multiple pwqs breaks ordering guarantee */
- if (!list_empty(&wq->pwqs)) {
- if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
- return -EINVAL;
-
+ if (!list_empty(&wq->pwqs) && !(wq->flags & __WQ_ORDERED_EXPLICIT))
wq->flags &= ~__WQ_ORDERED;
- }

ctx = apply_wqattrs_prepare(wq, attrs, wq_unbound_cpumask);
if (IS_ERR(ctx))
@@ -6504,11 +6554,29 @@ static int workqueue_apply_unbound_cpumask(const cpumask_var_t unbound_cpumask)
if (!(wq->flags & WQ_UNBOUND) || (wq->flags & __WQ_DESTROYING))
continue;

- /* creating multiple pwqs breaks ordering guarantee */
+ /*
+ * We do not support changing cpumask of an ordered workqueue
+ * again before the previous cpumask change is completed.
+ * Sleep up to 100ms in 10ms intervals to allow the previous
+ * operation to complete and skip it if not done by then.
+ */
if (!list_empty(&wq->pwqs)) {
- if (wq->flags & __WQ_ORDERED_EXPLICIT)
- continue;
- wq->flags &= ~__WQ_ORDERED;
+ struct pool_workqueue *dfl_pwq;
+
+ dfl_pwq = rcu_access_pointer(wq->dfl_pwq);
+ if (!(wq->flags & __WQ_ORDERED_EXPLICIT)) {
+ wq->flags &= ~__WQ_ORDERED;
+ } else if (dfl_pwq && dfl_pwq->frozen) {
+ int i;
+
+ for (i = 0; i < 10; i++) {
+ msleep(10);
+ if (!dfl_pwq->frozen)
+ break;
+ }
+ if (WARN_ON_ONCE(dfl_pwq->frozen))
+ continue;
+ }
}

ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs, unbound_cpumask);
@@ -7024,9 +7092,8 @@ int workqueue_sysfs_register(struct workqueue_struct *wq)
int ret;

/*
- * Adjusting max_active or creating new pwqs by applying
- * attributes breaks ordering guarantee. Disallow exposing ordered
- * workqueues.
+ * Adjusting max_active breaks ordering guarantee. Disallow exposing
+ * ordered workqueues.
*/
if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
return -EINVAL;
--
2.39.3