2015-04-02 11:12:07

by Lai Jiangshan

Subject: [PATCH 0/4 V6] workqueue: Introduce low-level unbound wq sysfs cpumask

This patchset is mostly copied from Frederic's series and splits apply_workqueue_attrs()
as TJ suggested.

This patchset still doesn't include the patch "workqueue: Allow changing attributions
of ordered workqueues"; I hope this reduces the review effort. The handling
for ordered workqueues will be reposted after this patchset is accepted.

Changes from v5:
Applied TJ's comments: renames, error-path fixes, etc.

The default pwq falls back to the low-level global cpumask when (and ONLY when) the
cpumask set by the user doesn't overlap with the low-level cpumask.
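
For reference, this fallback amounts to the following condensed sketch of the
logic added in patch 4 (names follow that patch; only the default pwq is shown):

        /* per-pwq cpumask = user-set cpumask & low-level cpumask */
        cpumask_and(pwq_attrs->cpumask, attrs->cpumask, unbound_cpumask);
        copy_workqueue_attrs(tmp_attrs, pwq_attrs);
        /* the default pwq falls back to the low-level cpumask iff the
         * intersection is empty */
        if (unlikely(cpumask_empty(tmp_attrs->cpumask)))
                cpumask_copy(tmp_attrs->cpumask, unbound_cpumask);
        ctx->dfl_pwq = alloc_unbound_pwq(wq, tmp_attrs);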

Changes from V4:
Added the workqueue_unbounds_cpumask_set() kernel API and minimally restructured patch 4.



Cc: Tejun Heo <[email protected]>
Cc: Christoph Lameter <[email protected]>
Cc: Kevin Hilman <[email protected]>
Cc: Lai Jiangshan <[email protected]>
Cc: Mike Galbraith <[email protected]>
Cc: Paul E. McKenney <[email protected]>
Cc: Tejun Heo <[email protected]>
Cc: Viresh Kumar <[email protected]>
Cc: Frederic Weisbecker <[email protected]>
---

Frederic Weisbecker (2):
workqueue: Reorder sysfs code
workqueue: Create low-level unbound workqueues cpumask

Lai Jiangshan (2):
workqueue: split apply_workqueue_attrs() into 3 stages
workqueue: Allow modifying low level unbound workqueue cpumask

include/linux/workqueue.h | 1 +
kernel/workqueue.c | 971 +++++++++++++++++++++++++++-------------------
2 files changed, 568 insertions(+), 404 deletions(-)

--
2.1.0


2015-04-02 11:12:35

by Lai Jiangshan

Subject: [PATCH 1/4 V6] workqueue: Reorder sysfs code

From: Frederic Weisbecker <[email protected]>

The sysfs code usually belongs to the bottom of the file since it deals
with high-level objects. In the workqueue code it's misplaced, such that
we would need to work around function references to allow the sysfs
code to call APIs like apply_workqueue_attrs().

Let's move that block further down in the file, near the bottom.

Also declare workqueue_sysfs_unregister() just before destroy_workqueue(),
which references it.

Suggested-by: Tejun Heo <[email protected]>
Cc: Christoph Lameter <[email protected]>
Cc: Kevin Hilman <[email protected]>
Cc: Lai Jiangshan <[email protected]>
Cc: Mike Galbraith <[email protected]>
Cc: Paul E. McKenney <[email protected]>
Cc: Tejun Heo <[email protected]>
Cc: Viresh Kumar <[email protected]>
Signed-off-by: Frederic Weisbecker <[email protected]>
Signed-off-by: Lai Jiangshan <[email protected]>
---
kernel/workqueue.c | 636 +++++++++++++++++++++++++++--------------------------
1 file changed, 319 insertions(+), 317 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 1ca0b1d..25394f6 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -3001,323 +3001,6 @@ int execute_in_process_context(work_func_t fn, struct execute_work *ew)
}
EXPORT_SYMBOL_GPL(execute_in_process_context);

-#ifdef CONFIG_SYSFS
-/*
- * Workqueues with WQ_SYSFS flag set is visible to userland via
- * /sys/bus/workqueue/devices/WQ_NAME. All visible workqueues have the
- * following attributes.
- *
- * per_cpu RO bool : whether the workqueue is per-cpu or unbound
- * max_active RW int : maximum number of in-flight work items
- *
- * Unbound workqueues have the following extra attributes.
- *
- * id RO int : the associated pool ID
- * nice RW int : nice value of the workers
- * cpumask RW mask : bitmask of allowed CPUs for the workers
- */
-struct wq_device {
- struct workqueue_struct *wq;
- struct device dev;
-};
-
-static struct workqueue_struct *dev_to_wq(struct device *dev)
-{
- struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
-
- return wq_dev->wq;
-}
-
-static ssize_t per_cpu_show(struct device *dev, struct device_attribute *attr,
- char *buf)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
-
- return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND));
-}
-static DEVICE_ATTR_RO(per_cpu);
-
-static ssize_t max_active_show(struct device *dev,
- struct device_attribute *attr, char *buf)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
-
- return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active);
-}
-
-static ssize_t max_active_store(struct device *dev,
- struct device_attribute *attr, const char *buf,
- size_t count)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
- int val;
-
- if (sscanf(buf, "%d", &val) != 1 || val <= 0)
- return -EINVAL;
-
- workqueue_set_max_active(wq, val);
- return count;
-}
-static DEVICE_ATTR_RW(max_active);
-
-static struct attribute *wq_sysfs_attrs[] = {
- &dev_attr_per_cpu.attr,
- &dev_attr_max_active.attr,
- NULL,
-};
-ATTRIBUTE_GROUPS(wq_sysfs);
-
-static ssize_t wq_pool_ids_show(struct device *dev,
- struct device_attribute *attr, char *buf)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
- const char *delim = "";
- int node, written = 0;
-
- rcu_read_lock_sched();
- for_each_node(node) {
- written += scnprintf(buf + written, PAGE_SIZE - written,
- "%s%d:%d", delim, node,
- unbound_pwq_by_node(wq, node)->pool->id);
- delim = " ";
- }
- written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
- rcu_read_unlock_sched();
-
- return written;
-}
-
-static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
- char *buf)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
- int written;
-
- mutex_lock(&wq->mutex);
- written = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice);
- mutex_unlock(&wq->mutex);
-
- return written;
-}
-
-/* prepare workqueue_attrs for sysfs store operations */
-static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
-{
- struct workqueue_attrs *attrs;
-
- attrs = alloc_workqueue_attrs(GFP_KERNEL);
- if (!attrs)
- return NULL;
-
- mutex_lock(&wq->mutex);
- copy_workqueue_attrs(attrs, wq->unbound_attrs);
- mutex_unlock(&wq->mutex);
- return attrs;
-}
-
-static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
- const char *buf, size_t count)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
- struct workqueue_attrs *attrs;
- int ret;
-
- attrs = wq_sysfs_prep_attrs(wq);
- if (!attrs)
- return -ENOMEM;
-
- if (sscanf(buf, "%d", &attrs->nice) == 1 &&
- attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE)
- ret = apply_workqueue_attrs(wq, attrs);
- else
- ret = -EINVAL;
-
- free_workqueue_attrs(attrs);
- return ret ?: count;
-}
-
-static ssize_t wq_cpumask_show(struct device *dev,
- struct device_attribute *attr, char *buf)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
- int written;
-
- mutex_lock(&wq->mutex);
- written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
- cpumask_pr_args(wq->unbound_attrs->cpumask));
- mutex_unlock(&wq->mutex);
- return written;
-}
-
-static ssize_t wq_cpumask_store(struct device *dev,
- struct device_attribute *attr,
- const char *buf, size_t count)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
- struct workqueue_attrs *attrs;
- int ret;
-
- attrs = wq_sysfs_prep_attrs(wq);
- if (!attrs)
- return -ENOMEM;
-
- ret = cpumask_parse(buf, attrs->cpumask);
- if (!ret)
- ret = apply_workqueue_attrs(wq, attrs);
-
- free_workqueue_attrs(attrs);
- return ret ?: count;
-}
-
-static ssize_t wq_numa_show(struct device *dev, struct device_attribute *attr,
- char *buf)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
- int written;
-
- mutex_lock(&wq->mutex);
- written = scnprintf(buf, PAGE_SIZE, "%d\n",
- !wq->unbound_attrs->no_numa);
- mutex_unlock(&wq->mutex);
-
- return written;
-}
-
-static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
- const char *buf, size_t count)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
- struct workqueue_attrs *attrs;
- int v, ret;
-
- attrs = wq_sysfs_prep_attrs(wq);
- if (!attrs)
- return -ENOMEM;
-
- ret = -EINVAL;
- if (sscanf(buf, "%d", &v) == 1) {
- attrs->no_numa = !v;
- ret = apply_workqueue_attrs(wq, attrs);
- }
-
- free_workqueue_attrs(attrs);
- return ret ?: count;
-}
-
-static struct device_attribute wq_sysfs_unbound_attrs[] = {
- __ATTR(pool_ids, 0444, wq_pool_ids_show, NULL),
- __ATTR(nice, 0644, wq_nice_show, wq_nice_store),
- __ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store),
- __ATTR(numa, 0644, wq_numa_show, wq_numa_store),
- __ATTR_NULL,
-};
-
-static struct bus_type wq_subsys = {
- .name = "workqueue",
- .dev_groups = wq_sysfs_groups,
-};
-
-static int __init wq_sysfs_init(void)
-{
- return subsys_virtual_register(&wq_subsys, NULL);
-}
-core_initcall(wq_sysfs_init);
-
-static void wq_device_release(struct device *dev)
-{
- struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
-
- kfree(wq_dev);
-}
-
-/**
- * workqueue_sysfs_register - make a workqueue visible in sysfs
- * @wq: the workqueue to register
- *
- * Expose @wq in sysfs under /sys/bus/workqueue/devices.
- * alloc_workqueue*() automatically calls this function if WQ_SYSFS is set
- * which is the preferred method.
- *
- * Workqueue user should use this function directly iff it wants to apply
- * workqueue_attrs before making the workqueue visible in sysfs; otherwise,
- * apply_workqueue_attrs() may race against userland updating the
- * attributes.
- *
- * Return: 0 on success, -errno on failure.
- */
-int workqueue_sysfs_register(struct workqueue_struct *wq)
-{
- struct wq_device *wq_dev;
- int ret;
-
- /*
- * Adjusting max_active or creating new pwqs by applyting
- * attributes breaks ordering guarantee. Disallow exposing ordered
- * workqueues.
- */
- if (WARN_ON(wq->flags & __WQ_ORDERED))
- return -EINVAL;
-
- wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
- if (!wq_dev)
- return -ENOMEM;
-
- wq_dev->wq = wq;
- wq_dev->dev.bus = &wq_subsys;
- wq_dev->dev.init_name = wq->name;
- wq_dev->dev.release = wq_device_release;
-
- /*
- * unbound_attrs are created separately. Suppress uevent until
- * everything is ready.
- */
- dev_set_uevent_suppress(&wq_dev->dev, true);
-
- ret = device_register(&wq_dev->dev);
- if (ret) {
- kfree(wq_dev);
- wq->wq_dev = NULL;
- return ret;
- }
-
- if (wq->flags & WQ_UNBOUND) {
- struct device_attribute *attr;
-
- for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) {
- ret = device_create_file(&wq_dev->dev, attr);
- if (ret) {
- device_unregister(&wq_dev->dev);
- wq->wq_dev = NULL;
- return ret;
- }
- }
- }
-
- dev_set_uevent_suppress(&wq_dev->dev, false);
- kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD);
- return 0;
-}
-
-/**
- * workqueue_sysfs_unregister - undo workqueue_sysfs_register()
- * @wq: the workqueue to unregister
- *
- * If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
- */
-static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
-{
- struct wq_device *wq_dev = wq->wq_dev;
-
- if (!wq->wq_dev)
- return;
-
- wq->wq_dev = NULL;
- device_unregister(&wq_dev->dev);
-}
-#else /* CONFIG_SYSFS */
-static void workqueue_sysfs_unregister(struct workqueue_struct *wq) { }
-#endif /* CONFIG_SYSFS */
-
/**
* free_workqueue_attrs - free a workqueue_attrs
* @attrs: workqueue_attrs to free
@@ -4183,6 +3866,8 @@ err_destroy:
}
EXPORT_SYMBOL_GPL(__alloc_workqueue_key);

+static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
+
/**
* destroy_workqueue - safely terminate a workqueue
* @wq: target workqueue
@@ -5014,6 +4699,323 @@ out_unlock:
}
#endif /* CONFIG_FREEZER */

+#ifdef CONFIG_SYSFS
+/*
+ * Workqueues with WQ_SYSFS flag set is visible to userland via
+ * /sys/bus/workqueue/devices/WQ_NAME. All visible workqueues have the
+ * following attributes.
+ *
+ * per_cpu RO bool : whether the workqueue is per-cpu or unbound
+ * max_active RW int : maximum number of in-flight work items
+ *
+ * Unbound workqueues have the following extra attributes.
+ *
+ * id RO int : the associated pool ID
+ * nice RW int : nice value of the workers
+ * cpumask RW mask : bitmask of allowed CPUs for the workers
+ */
+struct wq_device {
+ struct workqueue_struct *wq;
+ struct device dev;
+};
+
+static struct workqueue_struct *dev_to_wq(struct device *dev)
+{
+ struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
+
+ return wq_dev->wq;
+}
+
+static ssize_t per_cpu_show(struct device *dev, struct device_attribute *attr,
+ char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+
+ return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND));
+}
+static DEVICE_ATTR_RO(per_cpu);
+
+static ssize_t max_active_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+
+ return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active);
+}
+
+static ssize_t max_active_store(struct device *dev,
+ struct device_attribute *attr, const char *buf,
+ size_t count)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int val;
+
+ if (sscanf(buf, "%d", &val) != 1 || val <= 0)
+ return -EINVAL;
+
+ workqueue_set_max_active(wq, val);
+ return count;
+}
+static DEVICE_ATTR_RW(max_active);
+
+static struct attribute *wq_sysfs_attrs[] = {
+ &dev_attr_per_cpu.attr,
+ &dev_attr_max_active.attr,
+ NULL,
+};
+ATTRIBUTE_GROUPS(wq_sysfs);
+
+static ssize_t wq_pool_ids_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ const char *delim = "";
+ int node, written = 0;
+
+ rcu_read_lock_sched();
+ for_each_node(node) {
+ written += scnprintf(buf + written, PAGE_SIZE - written,
+ "%s%d:%d", delim, node,
+ unbound_pwq_by_node(wq, node)->pool->id);
+ delim = " ";
+ }
+ written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
+ rcu_read_unlock_sched();
+
+ return written;
+}
+
+static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
+ char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int written;
+
+ mutex_lock(&wq->mutex);
+ written = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice);
+ mutex_unlock(&wq->mutex);
+
+ return written;
+}
+
+/* prepare workqueue_attrs for sysfs store operations */
+static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
+{
+ struct workqueue_attrs *attrs;
+
+ attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ if (!attrs)
+ return NULL;
+
+ mutex_lock(&wq->mutex);
+ copy_workqueue_attrs(attrs, wq->unbound_attrs);
+ mutex_unlock(&wq->mutex);
+ return attrs;
+}
+
+static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ struct workqueue_attrs *attrs;
+ int ret;
+
+ attrs = wq_sysfs_prep_attrs(wq);
+ if (!attrs)
+ return -ENOMEM;
+
+ if (sscanf(buf, "%d", &attrs->nice) == 1 &&
+ attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE)
+ ret = apply_workqueue_attrs(wq, attrs);
+ else
+ ret = -EINVAL;
+
+ free_workqueue_attrs(attrs);
+ return ret ?: count;
+}
+
+static ssize_t wq_cpumask_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int written;
+
+ mutex_lock(&wq->mutex);
+ written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
+ cpumask_pr_args(wq->unbound_attrs->cpumask));
+ mutex_unlock(&wq->mutex);
+ return written;
+}
+
+static ssize_t wq_cpumask_store(struct device *dev,
+ struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ struct workqueue_attrs *attrs;
+ int ret;
+
+ attrs = wq_sysfs_prep_attrs(wq);
+ if (!attrs)
+ return -ENOMEM;
+
+ ret = cpumask_parse(buf, attrs->cpumask);
+ if (!ret)
+ ret = apply_workqueue_attrs(wq, attrs);
+
+ free_workqueue_attrs(attrs);
+ return ret ?: count;
+}
+
+static ssize_t wq_numa_show(struct device *dev, struct device_attribute *attr,
+ char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int written;
+
+ mutex_lock(&wq->mutex);
+ written = scnprintf(buf, PAGE_SIZE, "%d\n",
+ !wq->unbound_attrs->no_numa);
+ mutex_unlock(&wq->mutex);
+
+ return written;
+}
+
+static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ struct workqueue_attrs *attrs;
+ int v, ret;
+
+ attrs = wq_sysfs_prep_attrs(wq);
+ if (!attrs)
+ return -ENOMEM;
+
+ ret = -EINVAL;
+ if (sscanf(buf, "%d", &v) == 1) {
+ attrs->no_numa = !v;
+ ret = apply_workqueue_attrs(wq, attrs);
+ }
+
+ free_workqueue_attrs(attrs);
+ return ret ?: count;
+}
+
+static struct device_attribute wq_sysfs_unbound_attrs[] = {
+ __ATTR(pool_ids, 0444, wq_pool_ids_show, NULL),
+ __ATTR(nice, 0644, wq_nice_show, wq_nice_store),
+ __ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store),
+ __ATTR(numa, 0644, wq_numa_show, wq_numa_store),
+ __ATTR_NULL,
+};
+
+static struct bus_type wq_subsys = {
+ .name = "workqueue",
+ .dev_groups = wq_sysfs_groups,
+};
+
+static int __init wq_sysfs_init(void)
+{
+ return subsys_virtual_register(&wq_subsys, NULL);
+}
+core_initcall(wq_sysfs_init);
+
+static void wq_device_release(struct device *dev)
+{
+ struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
+
+ kfree(wq_dev);
+}
+
+/**
+ * workqueue_sysfs_register - make a workqueue visible in sysfs
+ * @wq: the workqueue to register
+ *
+ * Expose @wq in sysfs under /sys/bus/workqueue/devices.
+ * alloc_workqueue*() automatically calls this function if WQ_SYSFS is set
+ * which is the preferred method.
+ *
+ * Workqueue user should use this function directly iff it wants to apply
+ * workqueue_attrs before making the workqueue visible in sysfs; otherwise,
+ * apply_workqueue_attrs() may race against userland updating the
+ * attributes.
+ *
+ * Return: 0 on success, -errno on failure.
+ */
+int workqueue_sysfs_register(struct workqueue_struct *wq)
+{
+ struct wq_device *wq_dev;
+ int ret;
+
+ /*
+ * Adjusting max_active or creating new pwqs by applyting
+ * attributes breaks ordering guarantee. Disallow exposing ordered
+ * workqueues.
+ */
+ if (WARN_ON(wq->flags & __WQ_ORDERED))
+ return -EINVAL;
+
+ wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
+ if (!wq_dev)
+ return -ENOMEM;
+
+ wq_dev->wq = wq;
+ wq_dev->dev.bus = &wq_subsys;
+ wq_dev->dev.init_name = wq->name;
+ wq_dev->dev.release = wq_device_release;
+
+ /*
+ * unbound_attrs are created separately. Suppress uevent until
+ * everything is ready.
+ */
+ dev_set_uevent_suppress(&wq_dev->dev, true);
+
+ ret = device_register(&wq_dev->dev);
+ if (ret) {
+ kfree(wq_dev);
+ wq->wq_dev = NULL;
+ return ret;
+ }
+
+ if (wq->flags & WQ_UNBOUND) {
+ struct device_attribute *attr;
+
+ for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) {
+ ret = device_create_file(&wq_dev->dev, attr);
+ if (ret) {
+ device_unregister(&wq_dev->dev);
+ wq->wq_dev = NULL;
+ return ret;
+ }
+ }
+ }
+
+ dev_set_uevent_suppress(&wq_dev->dev, false);
+ kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD);
+ return 0;
+}
+
+/**
+ * workqueue_sysfs_unregister - undo workqueue_sysfs_register()
+ * @wq: the workqueue to unregister
+ *
+ * If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
+ */
+static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
+{
+ struct wq_device *wq_dev = wq->wq_dev;
+
+ if (!wq->wq_dev)
+ return;
+
+ wq->wq_dev = NULL;
+ device_unregister(&wq_dev->dev);
+}
+#else /* CONFIG_SYSFS */
+static void workqueue_sysfs_unregister(struct workqueue_struct *wq) { }
+#endif /* CONFIG_SYSFS */
+
static void __init wq_numa_init(void)
{
cpumask_var_t *tbl;
--
2.1.0

2015-04-02 11:12:10

by Lai Jiangshan

Subject: [PATCH 2/4 V6] workqueue: split apply_workqueue_attrs() into 3 stages

The current apply_workqueue_attrs() includes both pwq allocation and pwq
installation, so when we batch multiple apply_workqueue_attrs() calls as a
transaction, we can't ensure that the transaction succeeds or fails as a
complete unit.

To solve this, we split apply_workqueue_attrs() into three stages.
The first stage does the preparation: allocating memory and pwqs.
The second stage does the attrs installation and pwqs installation.
The third stage frees the allocated memory and the (old or unused) pwqs.

As a result, batching multiple apply_workqueue_attrs() calls can
succeed or fail as a complete unit:
1) do the first stage for all the workqueues in a batch
2) commit them all only when all of the above succeed.

This patch is a preparation for the next patch ("Allow modifying low level
unbound workqueue cpumask"), which will perform multiple
apply_workqueue_attrs() calls.
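
A minimal sketch of the resulting pattern, as used by apply_workqueue_attrs()
after this patch (CPU hotplug pinning and error handling abbreviated):

        mutex_lock(&wq_pool_mutex);
        ctx = apply_wqattrs_prepare(wq, attrs);  /* stage 1: allocate attrs & pwqs */
        mutex_unlock(&wq_pool_mutex);

        if (ctx)
                apply_wqattrs_commit(ctx);       /* stage 2: install, must not fail */

        apply_wqattrs_cleanup(ctx);              /* stage 3: free old or unused pwqs */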

The patch doesn't change any functionality except for two minor adjustments:
1) free_unbound_pwq() for the error path is removed; we use the
heavier put_pwq_unlocked() instead since the error path is rare.
This adjustment simplifies the code.
2) the memory allocation is also moved inside wq_pool_mutex.
This is needed to avoid further splitting.

Suggested-by: Tejun Heo <[email protected]>
Cc: Christoph Lameter <[email protected]>
Cc: Kevin Hilman <[email protected]>
Cc: Lai Jiangshan <[email protected]>
Cc: Mike Galbraith <[email protected]>
Cc: Paul E. McKenney <[email protected]>
Cc: Tejun Heo <[email protected]>
Cc: Viresh Kumar <[email protected]>
Cc: Frederic Weisbecker <[email protected]>
Signed-off-by: Lai Jiangshan <[email protected]>
---
kernel/workqueue.c | 200 +++++++++++++++++++++++++++++++----------------------
1 file changed, 116 insertions(+), 84 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 25394f6..15531b8 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -3424,17 +3424,6 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
return pwq;
}

-/* undo alloc_unbound_pwq(), used only in the error path */
-static void free_unbound_pwq(struct pool_workqueue *pwq)
-{
- lockdep_assert_held(&wq_pool_mutex);
-
- if (pwq) {
- put_unbound_pool(pwq->pool);
- kmem_cache_free(pwq_cache, pwq);
- }
-}
-
/**
* wq_calc_node_mask - calculate a wq_attrs' cpumask for the specified node
* @attrs: the wq_attrs of interest
@@ -3497,42 +3486,49 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
return old_pwq;
}

-/**
- * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
- * @wq: the target workqueue
- * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
- *
- * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
- * machines, this function maps a separate pwq to each NUMA node with
- * possibles CPUs in @attrs->cpumask so that work items are affine to the
- * NUMA node it was issued on. Older pwqs are released as in-flight work
- * items finish. Note that a work item which repeatedly requeues itself
- * back-to-back will stay on its current pwq.
- *
- * Performs GFP_KERNEL allocations.
- *
- * Return: 0 on success and -errno on failure.
- */
-int apply_workqueue_attrs(struct workqueue_struct *wq,
- const struct workqueue_attrs *attrs)
+/* Context to store the prepared attrs & pwqs before installed */
+struct apply_wqattrs_ctx {
+ struct workqueue_struct *wq; /* target to be installed */
+ struct workqueue_attrs *attrs; /* attrs for installing */
+ struct pool_workqueue *dfl_pwq;
+ struct pool_workqueue *pwq_tbl[];
+};
+
+/* Free the resources after success or abort */
+static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
+{
+ if (ctx) {
+ int node;
+
+ /* put the pwqs */
+ for_each_node(node)
+ put_pwq_unlocked(ctx->pwq_tbl[node]);
+ put_pwq_unlocked(ctx->dfl_pwq);
+
+ free_workqueue_attrs(ctx->attrs);
+
+ kfree(ctx);
+ }
+}
+
+/* Allocates the attrs and pwqs for later installment */
+static struct apply_wqattrs_ctx *
+apply_wqattrs_prepare(struct workqueue_struct *wq,
+ const struct workqueue_attrs *attrs)
{
+ struct apply_wqattrs_ctx *ctx;
struct workqueue_attrs *new_attrs, *tmp_attrs;
- struct pool_workqueue **pwq_tbl, *dfl_pwq;
- int node, ret;
+ int node;

- /* only unbound workqueues can change attributes */
- if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
- return -EINVAL;
+ lockdep_assert_held(&wq_pool_mutex);

- /* creating multiple pwqs breaks ordering guarantee */
- if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
- return -EINVAL;
+ ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
+ GFP_KERNEL);

- pwq_tbl = kzalloc(nr_node_ids * sizeof(pwq_tbl[0]), GFP_KERNEL);
new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
- if (!pwq_tbl || !new_attrs || !tmp_attrs)
- goto enomem;
+ if (!ctx || !new_attrs || !tmp_attrs)
+ goto out_free;

/* make a copy of @attrs and sanitize it */
copy_workqueue_attrs(new_attrs, attrs);
@@ -3546,75 +3542,111 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
copy_workqueue_attrs(tmp_attrs, new_attrs);

/*
- * CPUs should stay stable across pwq creations and installations.
- * Pin CPUs, determine the target cpumask for each node and create
- * pwqs accordingly.
- */
- get_online_cpus();
-
- mutex_lock(&wq_pool_mutex);
-
- /*
* If something goes wrong during CPU up/down, we'll fall back to
* the default pwq covering whole @attrs->cpumask. Always create
* it even if we don't use it immediately.
*/
- dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
- if (!dfl_pwq)
- goto enomem_pwq;
+ ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
+ if (!ctx->dfl_pwq)
+ goto out_free;

for_each_node(node) {
if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
- pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
- if (!pwq_tbl[node])
- goto enomem_pwq;
+ ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
+ if (!ctx->pwq_tbl[node])
+ goto out_free;
} else {
- dfl_pwq->refcnt++;
- pwq_tbl[node] = dfl_pwq;
+ ctx->dfl_pwq->refcnt++;
+ ctx->pwq_tbl[node] = ctx->dfl_pwq;
}
}

- mutex_unlock(&wq_pool_mutex);
+ ctx->wq = wq;
+ ctx->attrs = new_attrs;
+ free_workqueue_attrs(tmp_attrs);
+ return ctx;
+
+out_free:
+ free_workqueue_attrs(tmp_attrs);
+ free_workqueue_attrs(new_attrs);
+ apply_wqattrs_cleanup(ctx);
+ return NULL;
+}
+
+/* Set the unbound_attr and install the prepared pwqs. Should not fail */
+static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
+{
+ int node;

/* all pwqs have been created successfully, let's install'em */
- mutex_lock(&wq->mutex);
+ mutex_lock(&ctx->wq->mutex);

- copy_workqueue_attrs(wq->unbound_attrs, new_attrs);
+ copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);

/* save the previous pwq and install the new one */
for_each_node(node)
- pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
+ ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
+ ctx->pwq_tbl[node]);

/* @dfl_pwq might not have been used, ensure it's linked */
- link_pwq(dfl_pwq);
- swap(wq->dfl_pwq, dfl_pwq);
+ link_pwq(ctx->dfl_pwq);
+ swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);

- mutex_unlock(&wq->mutex);
+ mutex_unlock(&ctx->wq->mutex);
+}

- /* put the old pwqs */
- for_each_node(node)
- put_pwq_unlocked(pwq_tbl[node]);
- put_pwq_unlocked(dfl_pwq);
+/**
+ * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
+ * @wq: the target workqueue
+ * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
+ *
+ * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
+ * machines, this function maps a separate pwq to each NUMA node with
+ * possibles CPUs in @attrs->cpumask so that work items are affine to the
+ * NUMA node it was issued on. Older pwqs are released as in-flight work
+ * items finish. Note that a work item which repeatedly requeues itself
+ * back-to-back will stay on its current pwq.
+ *
+ * Performs GFP_KERNEL allocations.
+ *
+ * Return: 0 on success and -errno on failure.
+ */
+int apply_workqueue_attrs(struct workqueue_struct *wq,
+ const struct workqueue_attrs *attrs)
+{
+ struct apply_wqattrs_ctx *ctx;
+ int ret = -ENOMEM;

- put_online_cpus();
- ret = 0;
- /* fall through */
-out_free:
- free_workqueue_attrs(tmp_attrs);
- free_workqueue_attrs(new_attrs);
- kfree(pwq_tbl);
- return ret;
+ /* only unbound workqueues can change attributes */
+ if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
+ return -EINVAL;

-enomem_pwq:
- free_unbound_pwq(dfl_pwq);
- for_each_node(node)
- if (pwq_tbl && pwq_tbl[node] != dfl_pwq)
- free_unbound_pwq(pwq_tbl[node]);
+ /* creating multiple pwqs breaks ordering guarantee */
+ if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
+ return -EINVAL;
+
+ /*
+ * CPUs should stay stable across pwq creations and installations.
+ * Pin CPUs, determine the target cpumask for each node and create
+ * pwqs accordingly.
+ */
+ get_online_cpus();
+
+ mutex_lock(&wq_pool_mutex);
+ ctx = apply_wqattrs_prepare(wq, attrs);
mutex_unlock(&wq_pool_mutex);
+
put_online_cpus();
-enomem:
- ret = -ENOMEM;
- goto out_free;
+
+ /* the ctx has been prepared successfully, let's commit it */
+ if (ctx) {
+ apply_wqattrs_commit(ctx);
+ ret = 0;
+ }
+
+ apply_wqattrs_cleanup(ctx);
+
+ return ret;
}

/**
--
2.1.0

2015-04-02 11:12:31

by Lai Jiangshan

Subject: [PATCH 3/4 V6] workqueue: Create low-level unbound workqueues cpumask

From: Frederic Weisbecker <[email protected]>

Create a cpumask that limits the affinity of all unbound workqueues.
This cpumask is controlled through a file at the root of the workqueue
sysfs directory.

It works at a lower level than the per-WQ_SYSFS workqueue cpumask files,
such that the effective cpumask applied to a given unbound workqueue is
the intersection of /sys/devices/virtual/workqueue/$WORKQUEUE/cpumask and
the new /sys/devices/virtual/workqueue/cpumask file.

This patch implements the basic infrastructure and the read interface.
wq_unbound_global_cpumask is initially set to cpu_possible_mask.
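
Concretely, the sanitized cpumask used to build a workqueue's pwqs becomes the
intersection computed in the hunk below:

        /* effective cpumask = per-workqueue cpumask & low-level global cpumask */
        cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_global_cpumask);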

Cc: Christoph Lameter <[email protected]>
Cc: Kevin Hilman <[email protected]>
Cc: Lai Jiangshan <[email protected]>
Cc: Mike Galbraith <[email protected]>
Cc: Paul E. McKenney <[email protected]>
Cc: Tejun Heo <[email protected]>
Cc: Viresh Kumar <[email protected]>
Signed-off-by: Frederic Weisbecker <[email protected]>
Signed-off-by: Lai Jiangshan <[email protected]>
---
kernel/workqueue.c | 29 +++++++++++++++++++++++++++--
1 file changed, 27 insertions(+), 2 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 15531b8..f58c549 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -299,6 +299,8 @@ static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
static LIST_HEAD(workqueues); /* PR: list of all workqueues */
static bool workqueue_freezing; /* PL: have wqs started freezing? */

+static cpumask_var_t wq_unbound_global_cpumask;
+
/* the per-cpu worker pools */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
cpu_worker_pools);
@@ -3532,7 +3534,7 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,

/* make a copy of @attrs and sanitize it */
copy_workqueue_attrs(new_attrs, attrs);
- cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
+ cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_global_cpumask);

/*
* We may create multiple pwqs with differing cpumasks. Make a
@@ -4947,9 +4949,29 @@ static struct bus_type wq_subsys = {
.dev_groups = wq_sysfs_groups,
};

+static ssize_t wq_unbound_global_cpumask_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ int written;
+
+ written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
+ cpumask_pr_args(wq_unbound_global_cpumask));
+
+ return written;
+}
+
+static struct device_attribute wq_sysfs_cpumask_attr =
+ __ATTR(cpumask, 0444, wq_unbound_global_cpumask_show, NULL);
+
static int __init wq_sysfs_init(void)
{
- return subsys_virtual_register(&wq_subsys, NULL);
+ int err;
+
+ err = subsys_virtual_register(&wq_subsys, NULL);
+ if (err)
+ return err;
+
+ return device_create_file(wq_subsys.dev_root, &wq_sysfs_cpumask_attr);
}
core_initcall(wq_sysfs_init);

@@ -5097,6 +5119,9 @@ static int __init init_workqueues(void)

WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));

+ BUG_ON(!alloc_cpumask_var(&wq_unbound_global_cpumask, GFP_KERNEL));
+ cpumask_copy(wq_unbound_global_cpumask, cpu_possible_mask);
+
pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);

cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
--
2.1.0

2015-04-02 11:12:29

by Lai Jiangshan

Subject: [PATCH 4/4 V6] workqueue: Allow modifying low level unbound workqueue cpumask

Allow modifying the low-level unbound workqueues cpumask through
sysfs. This is performed by traversing the entire workqueue list
and calling apply_wqattrs_prepare() on the unbound workqueues
with the low-level mask passed in. Only after all the preparations are done
do we commit them all together.

Ordered workqueues are exempt from the low-level unbound workqueue cpumask;
they will be handled in the near future.

The per-node pwqs are mandatorily controlled by the low-level cpumask, while
the default pwq falls back to the low-level global cpumask when (and ONLY when)
the cpumask set by the user doesn't overlap with the low-level cpumask.

The default wq_unbound_global_cpumask is still cpu_possible_mask because the
workqueue subsystem doesn't know the best default value for the running system;
the system manager or another subsystem that has sufficient information should
set it when needed.
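
As a rough illustration (not part of this patch), such a subsystem could
restrict all unbound workqueues at boot through the new API; the
housekeeping_setup_mask below is a hypothetical cpumask supplied by that
subsystem:

        static int __init restrict_unbound_workqueues(void)
        {
                cpumask_var_t mask;
                int ret;

                if (!zalloc_cpumask_var(&mask, GFP_KERNEL))
                        return -ENOMEM;

                /* hypothetical mask of CPUs allowed to run unbound work */
                cpumask_copy(mask, housekeeping_setup_mask);

                ret = workqueue_set_unbound_global_cpumask(mask);
                free_cpumask_var(mask);
                return ret;
        }
        late_initcall(restrict_unbound_workqueues);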

Cc: Christoph Lameter <[email protected]>
Cc: Kevin Hilman <[email protected]>
Cc: Lai Jiangshan <[email protected]>
Cc: Mike Galbraith <[email protected]>
Cc: Paul E. McKenney <[email protected]>
Cc: Tejun Heo <[email protected]>
Cc: Viresh Kumar <[email protected]>
Cc: Frederic Weisbecker <[email protected]>
Original-patch-by: Frederic Weisbecker <[email protected]>
Signed-off-by: Lai Jiangshan <[email protected]>
---
include/linux/workqueue.h | 1 +
kernel/workqueue.c | 124 ++++++++++++++++++++++++++++++++++++++++++----
2 files changed, 115 insertions(+), 10 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index deee212..01483b3 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -424,6 +424,7 @@ struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask);
void free_workqueue_attrs(struct workqueue_attrs *attrs);
int apply_workqueue_attrs(struct workqueue_struct *wq,
const struct workqueue_attrs *attrs);
+int workqueue_set_unbound_global_cpumask(cpumask_var_t cpumask);

extern bool queue_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work);
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index f58c549..bc21fda 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -299,7 +299,7 @@ static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
static LIST_HEAD(workqueues); /* PR: list of all workqueues */
static bool workqueue_freezing; /* PL: have wqs started freezing? */

-static cpumask_var_t wq_unbound_global_cpumask;
+static cpumask_var_t wq_unbound_global_cpumask; /* PL: low level cpumask for all unbound wqs */

/* the per-cpu worker pools */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
@@ -3492,6 +3492,7 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
struct apply_wqattrs_ctx {
struct workqueue_struct *wq; /* target to be installed */
struct workqueue_attrs *attrs; /* attrs for installing */
+ struct list_head list; /* queued for batching commit */
struct pool_workqueue *dfl_pwq;
struct pool_workqueue *pwq_tbl[];
};
@@ -3516,10 +3517,11 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
/* Allocates the attrs and pwqs for later installment */
static struct apply_wqattrs_ctx *
apply_wqattrs_prepare(struct workqueue_struct *wq,
- const struct workqueue_attrs *attrs)
+ const struct workqueue_attrs *attrs,
+ cpumask_var_t unbound_cpumask)
{
struct apply_wqattrs_ctx *ctx;
- struct workqueue_attrs *new_attrs, *tmp_attrs;
+ struct workqueue_attrs *new_attrs, *pwq_attrs, *tmp_attrs;
int node;

lockdep_assert_held(&wq_pool_mutex);
@@ -3528,32 +3530,41 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
GFP_KERNEL);

new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ pwq_attrs = alloc_workqueue_attrs(GFP_KERNEL);
tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
if (!ctx || !new_attrs || !tmp_attrs)
goto out_free;

/* make a copy of @attrs and sanitize it */
copy_workqueue_attrs(new_attrs, attrs);
- cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_global_cpumask);
+ copy_workqueue_attrs(pwq_attrs, attrs);
+ cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
+ cpumask_and(pwq_attrs->cpumask, pwq_attrs->cpumask, unbound_cpumask);

/*
* We may create multiple pwqs with differing cpumasks. Make a
- * copy of @new_attrs which will be modified and used to obtain
+ * copy of @pwq_attrs which will be modified and used to obtain
* pools.
*/
- copy_workqueue_attrs(tmp_attrs, new_attrs);
+ copy_workqueue_attrs(tmp_attrs, pwq_attrs);

/*
* If something goes wrong during CPU up/down, we'll fall back to
* the default pwq covering whole @attrs->cpumask. Always create
* it even if we don't use it immediately.
+ *
+ * If the cpumask set by the user doesn't overlap with the global
+ * wq_unbound_global_cpumask, we fallback to the global
+ * wq_unbound_global_cpumask.
*/
- ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
+ if (unlikely(cpumask_empty(tmp_attrs->cpumask)))
+ cpumask_copy(tmp_attrs->cpumask, unbound_cpumask);
+ ctx->dfl_pwq = alloc_unbound_pwq(wq, tmp_attrs);
if (!ctx->dfl_pwq)
goto out_free;

for_each_node(node) {
- if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
+ if (wq_calc_node_cpumask(pwq_attrs, node, -1, tmp_attrs->cpumask)) {
ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
if (!ctx->pwq_tbl[node])
goto out_free;
@@ -3566,10 +3577,12 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
ctx->wq = wq;
ctx->attrs = new_attrs;
free_workqueue_attrs(tmp_attrs);
+ free_workqueue_attrs(pwq_attrs);
return ctx;

out_free:
free_workqueue_attrs(tmp_attrs);
+ free_workqueue_attrs(pwq_attrs);
free_workqueue_attrs(new_attrs);
apply_wqattrs_cleanup(ctx);
return NULL;
@@ -3635,7 +3648,7 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
get_online_cpus();

mutex_lock(&wq_pool_mutex);
- ctx = apply_wqattrs_prepare(wq, attrs);
+ ctx = apply_wqattrs_prepare(wq, attrs, wq_unbound_global_cpumask);
mutex_unlock(&wq_pool_mutex);

put_online_cpus();
@@ -3709,6 +3722,9 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
* wq's, the default pwq should be used.
*/
if (wq_calc_node_cpumask(wq->unbound_attrs, node, cpu_off, cpumask)) {
+ cpumask_and(cpumask, cpumask, wq_unbound_global_cpumask);
+ if (cpumask_empty(cpumask))
+ goto use_dfl_pwq;
if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
goto out_unlock;
} else {
@@ -4733,6 +4749,74 @@ out_unlock:
}
#endif /* CONFIG_FREEZER */

+static int workqueue_apply_unbound_global_cpumask(cpumask_var_t cpumask)
+{
+ LIST_HEAD(ctxs);
+ int ret = 0;
+ struct workqueue_struct *wq;
+ struct apply_wqattrs_ctx *ctx, *n;
+
+ lockdep_assert_held(&wq_pool_mutex);
+
+ list_for_each_entry(wq, &workqueues, list) {
+ if (!(wq->flags & WQ_UNBOUND))
+ continue;
+ /* creating multiple pwqs breaks ordering guarantee */
+ if (wq->flags & __WQ_ORDERED)
+ continue;
+
+ ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs,
+ cpumask);
+ if (!ctx) {
+ ret = -ENOMEM;
+ break;
+ }
+
+ list_add_tail(&ctx->list, &ctxs);
+ }
+
+ list_for_each_entry_safe(ctx, n, &ctxs, list) {
+ if (!ret)
+ apply_wqattrs_commit(ctx);
+ apply_wqattrs_cleanup(ctx);
+ }
+
+ return ret;
+}
+
+/**
+ * workqueue_set_unbound_global_cpumask - Set the low-level unbound cpumask
+ * @cpumask: the cpumask to set
+ *
+ * The low-level workqueues cpumask is a global cpumask that limits
+ * the affinity of all unbound workqueues. This function check the @cpumask
+ * and apply it to all unbound workqueues and updates all pwqs of them.
+ * When all succeed, it saves @cpumask to the global low-level unbound
+ * cpumask.
+ *
+ * Retun: 0 - Success
+ * -EINVAL - No online cpu in the @cpumask
+ * -ENOMEM - Failed to allocate memory for attrs or pwqs.
+ */
+int workqueue_set_unbound_global_cpumask(cpumask_var_t cpumask)
+{
+ int ret = -EINVAL;
+
+ get_online_cpus();
+ cpumask_and(cpumask, cpumask, cpu_possible_mask);
+ if (!cpumask_empty(cpumask)) {
+ mutex_lock(&wq_pool_mutex);
+ ret = workqueue_apply_unbound_global_cpumask(cpumask);
+ if (ret >= 0)
+ cpumask_copy(wq_unbound_global_cpumask, cpumask);
+ mutex_unlock(&wq_pool_mutex);
+ }
+ put_online_cpus();
+
+ return ret;
+}
+EXPORT_SYMBOL_GPL(workqueue_set_unbound_global_cpumask);
+
#ifdef CONFIG_SYSFS
/*
* Workqueues with WQ_SYSFS flag set is visible to userland via
@@ -4954,14 +5038,34 @@ static ssize_t wq_unbound_global_cpumask_show(struct device *dev,
{
int written;

+ mutex_lock(&wq_pool_mutex);
written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
cpumask_pr_args(wq_unbound_global_cpumask));
+ mutex_unlock(&wq_pool_mutex);

return written;
}

+static ssize_t wq_unbound_global_cpumask_store(struct device *dev,
+ struct device_attribute *attr, const char *buf, size_t count)
+{
+ cpumask_var_t cpumask;
+ int ret;
+
+ if (!zalloc_cpumask_var(&cpumask, GFP_KERNEL))
+ return -ENOMEM;
+
+ ret = cpumask_parse(buf, cpumask);
+ if (!ret)
+ ret = workqueue_set_unbound_global_cpumask(cpumask);
+
+ free_cpumask_var(cpumask);
+ return ret ? ret : count;
+}
+
static struct device_attribute wq_sysfs_cpumask_attr =
- __ATTR(cpumask, 0444, wq_unbound_global_cpumask_show, NULL);
+ __ATTR(cpumask, 0644, wq_unbound_global_cpumask_show,
+ wq_unbound_global_cpumask_store);

static int __init wq_sysfs_init(void)
{
--
2.1.0

2015-04-06 15:22:11

by Tejun Heo

Subject: Re: [PATCH 1/4 V6] workqueue: Reorder sysfs code

On Thu, Apr 02, 2015 at 07:14:39PM +0800, Lai Jiangshan wrote:
> From: Frederic Weisbecker <[email protected]>
>
> The sysfs code usually belongs to the botom of the file since it deals
> with high level objects. In the workqueue code it's misplaced and such
> that we'll need to work around functions references to allow the sysfs
> code to call APIs like apply_workqueue_attrs().
>
> Lets move that block further in the file, almost the botom.
>
> And declare workqueue_sysfs_unregister() just before destroy_workqueue()
> which reference it.
>
> Suggested-by: Tejun Heo <[email protected]>
> Cc: Christoph Lameter <[email protected]>
> Cc: Kevin Hilman <[email protected]>
> Cc: Lai Jiangshan <[email protected]>
> Cc: Mike Galbraith <[email protected]>
> Cc: Paul E. McKenney <[email protected]>
> Cc: Tejun Heo <[email protected]>
> Cc: Viresh Kumar <[email protected]>
> Signed-off-by: Frederic Weisbecker <[email protected]>
> Signed-off-by: Lai Jiangshan <[email protected]>

Moved the forward declaration of workqueue_sysfs_unregister() to where the
other forward declarations are and applied to wq/for-4.1.

Thanks.

--
tejun

2015-04-06 15:39:27

by Tejun Heo

Subject: Re: [PATCH 2/4 V6] workqueue: split apply_workqueue_attrs() into 3 stages

On Thu, Apr 02, 2015 at 07:14:40PM +0800, Lai Jiangshan wrote:
> The patch doesn't have functionality changed except two minor adjustment:
> 1) free_unbound_pwq() for the error path is removed, we use the
> heavier version put_pwq_unlocked() instead since the error path
> is rare. this adjustment simplifies the code.
> 2) the memory-allocation is also moved into wq_pool_mutex.
> this is needed to avoid to do the further splitting.

And we're dropping online_cpus locking before applying the new pwq's.
Is that safe?

Thanks.

--
tejun

2015-04-06 15:53:20

by Tejun Heo

Subject: Re: [PATCH 4/4 V6] workqueue: Allow modifying low level unbound workqueue cpumask

On Thu, Apr 02, 2015 at 07:14:42PM +0800, Lai Jiangshan wrote:
> /* make a copy of @attrs and sanitize it */
> copy_workqueue_attrs(new_attrs, attrs);
> - cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_global_cpumask);
> + copy_workqueue_attrs(pwq_attrs, attrs);
> + cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
> + cpumask_and(pwq_attrs->cpumask, pwq_attrs->cpumask, unbound_cpumask);

Hmmm... why do we need to keep track of both cpu_possible_mask and
unbound_cpumask? Can't we just make unbound_cpumask replace
cpu_possible_mask for unbound workqueues?

Thanks.

--
tejun

2015-04-07 01:23:20

by Lai Jiangshan

Subject: Re: [PATCH 4/4 V6] workqueue: Allow modifying low level unbound workqueue cpumask

On 04/06/2015 11:53 PM, Tejun Heo wrote:
> On Thu, Apr 02, 2015 at 07:14:42PM +0800, Lai Jiangshan wrote:
>> /* make a copy of @attrs and sanitize it */
>> copy_workqueue_attrs(new_attrs, attrs);
>> - cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_global_cpumask);
>> + copy_workqueue_attrs(pwq_attrs, attrs);
>> + cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
>> + cpumask_and(pwq_attrs->cpumask, pwq_attrs->cpumask, unbound_cpumask);
>
> Hmmm... why do we need to keep track of both cpu_possible_mask and
> unbound_cpumask? Can't we just make unbound_cpumask replace
> cpu_possible_mask for unbound workqueues?
>

I want to save the original user-set cpumask.

Whenever wq_unbound_global_cpumask is changed,
the new effective cpumask is
the-original-user-setting-cpumask & wq_unbound_global_cpumask
instead of
the-last-effective-cpumask & wq_unbound_global_cpumask.
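
For example (illustration only): suppose the user sets 0xff and
wq_unbound_global_cpumask later shrinks to 0x0f; the effective cpumask becomes
0x0f. If the global cpumask is then restored to 0xff, masking the original
user setting yields 0xff again, whereas masking the last effective cpumask
would leave it stuck at 0x0f.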

thanks,
Lai

> Thanks.
>

2015-04-07 01:58:23

by Tejun Heo

Subject: Re: [PATCH 4/4 V6] workqueue: Allow modifying low level unbound workqueue cpumask

Hello, Lai.

On Tue, Apr 07, 2015 at 09:25:59AM +0800, Lai Jiangshan wrote:
> On 04/06/2015 11:53 PM, Tejun Heo wrote:
> > On Thu, Apr 02, 2015 at 07:14:42PM +0800, Lai Jiangshan wrote:
> >> /* make a copy of @attrs and sanitize it */
> >> copy_workqueue_attrs(new_attrs, attrs);
> >> - cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_global_cpumask);
> >> + copy_workqueue_attrs(pwq_attrs, attrs);
> >> + cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
> >> + cpumask_and(pwq_attrs->cpumask, pwq_attrs->cpumask, unbound_cpumask);
> >
> > Hmmm... why do we need to keep track of both cpu_possible_mask and
> > unbound_cpumask? Can't we just make unbound_cpumask replace
> > cpu_possible_mask for unbound workqueues?
> >
>
> I want to save the original user-setting cpumask.
>
> When any time the wq_unbound_global_cpumask is changed,
> the new effective cpumask is
> the-original-user-setting-cpumask & wq_unbound_global_cpumask
> instead of
> the-last-effective-cpumask & wq_unbound_global_cpumask.

Yes, I get that, but that'd require just tracking the original
configured value and the unbound_cpumask masked value, no? What am I
missing?

Thanks.

--
tejun

2015-04-07 02:30:37

by Lai Jiangshan

Subject: Re: [PATCH 4/4 V6] workqueue: Allow modifying low level unbound workqueue cpumask

On 04/07/2015 09:58 AM, Tejun Heo wrote:
> Hello, Lai.
>
> On Tue, Apr 07, 2015 at 09:25:59AM +0800, Lai Jiangshan wrote:
>> On 04/06/2015 11:53 PM, Tejun Heo wrote:
>>> On Thu, Apr 02, 2015 at 07:14:42PM +0800, Lai Jiangshan wrote:
>>>> /* make a copy of @attrs and sanitize it */
>>>> copy_workqueue_attrs(new_attrs, attrs);
>>>> - cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_global_cpumask);
>>>> + copy_workqueue_attrs(pwq_attrs, attrs);
>>>> + cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
>>>> + cpumask_and(pwq_attrs->cpumask, pwq_attrs->cpumask, unbound_cpumask);
>>>
>>> Hmmm... why do we need to keep track of both cpu_possible_mask and
>>> unbound_cpumask? Can't we just make unbound_cpumask replace
>>> cpu_possible_mask for unbound workqueues?
>>>
>>
>> I want to save the original user-setting cpumask.
>>
>> When any time the wq_unbound_global_cpumask is changed,
>> the new effective cpumask is
>> the-original-user-setting-cpumask & wq_unbound_global_cpumask
>> instead of
>> the-last-effective-cpumask & wq_unbound_global_cpumask.
>
> Yes, I get that, but that'd require just tracking the original

wq->unbound_attrs (new_attrs) saves the original configured value
and needs to be kept track of.
For sanity, it needs to be masked with cpu_possible_mask.

+ cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);

This code is changed back to the original code (before this patchset).

In the next iteration, I will reduce the number of local variables to make
the code clearer.

> configured value and the unbound_cpumask masked value, no? What am I
> missing?
>
> Thanks.
>

2015-04-07 11:24:00

by Lai Jiangshan

Subject: [PATCH 1/3 V7] workqueue: split apply_workqueue_attrs() into 3 stages

The current apply_workqueue_attrs() includes both pwq allocation and pwq
installation, so when we batch multiple apply_workqueue_attrs() calls as a
transaction, we can't ensure that the transaction succeeds or fails as a
complete unit.

To solve this, we split apply_workqueue_attrs() into three stages.
The first stage does the preparation: allocating memory and pwqs.
The second stage does the attrs installation and pwqs installation.
The third stage frees the allocated memory and the (old or unused) pwqs.

As a result, batching multiple apply_workqueue_attrs() calls can
succeed or fail as a complete unit:
1) do the first stage for all the workqueues in a batch
2) commit them all only when all of the above succeed.

This patch is a preparation for the next patch ("Allow modifying low level
unbound workqueue cpumask"), which will perform multiple
apply_workqueue_attrs() calls.

The patch doesn't change any functionality except for two minor adjustments:
1) free_unbound_pwq() for the error path is removed; we use the
heavier put_pwq_unlocked() instead since the error path is rare.
This adjustment simplifies the code.
2) the memory allocation is also moved inside wq_pool_mutex.
This is needed to avoid further splitting.

Suggested-by: Tejun Heo <[email protected]>
Cc: Christoph Lameter <[email protected]>
Cc: Kevin Hilman <[email protected]>
Cc: Lai Jiangshan <[email protected]>
Cc: Mike Galbraith <[email protected]>
Cc: Paul E. McKenney <[email protected]>
Cc: Tejun Heo <[email protected]>
Cc: Viresh Kumar <[email protected]>
Cc: Frederic Weisbecker <[email protected]>
Signed-off-by: Lai Jiangshan <[email protected]>
---
kernel/workqueue.c | 200 +++++++++++++++++++++++++++++++----------------------
1 file changed, 116 insertions(+), 84 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 586ad91..b13753a 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -3425,17 +3425,6 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
return pwq;
}

-/* undo alloc_unbound_pwq(), used only in the error path */
-static void free_unbound_pwq(struct pool_workqueue *pwq)
-{
- lockdep_assert_held(&wq_pool_mutex);
-
- if (pwq) {
- put_unbound_pool(pwq->pool);
- kmem_cache_free(pwq_cache, pwq);
- }
-}
-
/**
* wq_calc_node_mask - calculate a wq_attrs' cpumask for the specified node
* @attrs: the wq_attrs of interest
@@ -3498,42 +3487,49 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
return old_pwq;
}

-/**
- * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
- * @wq: the target workqueue
- * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
- *
- * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
- * machines, this function maps a separate pwq to each NUMA node with
- * possibles CPUs in @attrs->cpumask so that work items are affine to the
- * NUMA node it was issued on. Older pwqs are released as in-flight work
- * items finish. Note that a work item which repeatedly requeues itself
- * back-to-back will stay on its current pwq.
- *
- * Performs GFP_KERNEL allocations.
- *
- * Return: 0 on success and -errno on failure.
- */
-int apply_workqueue_attrs(struct workqueue_struct *wq,
- const struct workqueue_attrs *attrs)
+/* Context to store the prepared attrs & pwqs before applied */
+struct apply_wqattrs_ctx {
+ struct workqueue_struct *wq; /* target to be applied */
+ struct workqueue_attrs *attrs; /* configured attrs */
+ struct pool_workqueue *dfl_pwq;
+ struct pool_workqueue *pwq_tbl[];
+};
+
+/* Free the resources after success or abort */
+static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
+{
+ if (ctx) {
+ int node;
+
+ /* put the pwqs */
+ for_each_node(node)
+ put_pwq_unlocked(ctx->pwq_tbl[node]);
+ put_pwq_unlocked(ctx->dfl_pwq);
+
+ free_workqueue_attrs(ctx->attrs);
+
+ kfree(ctx);
+ }
+}
+
+/* Allocates the attrs and pwqs for later installment */
+static struct apply_wqattrs_ctx *
+apply_wqattrs_prepare(struct workqueue_struct *wq,
+ const struct workqueue_attrs *attrs)
{
+ struct apply_wqattrs_ctx *ctx;
struct workqueue_attrs *new_attrs, *tmp_attrs;
- struct pool_workqueue **pwq_tbl, *dfl_pwq;
- int node, ret;
+ int node;

- /* only unbound workqueues can change attributes */
- if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
- return -EINVAL;
+ lockdep_assert_held(&wq_pool_mutex);

- /* creating multiple pwqs breaks ordering guarantee */
- if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
- return -EINVAL;
+ ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
+ GFP_KERNEL);

- pwq_tbl = kzalloc(nr_node_ids * sizeof(pwq_tbl[0]), GFP_KERNEL);
new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
- if (!pwq_tbl || !new_attrs || !tmp_attrs)
- goto enomem;
+ if (!ctx || !new_attrs || !tmp_attrs)
+ goto out_free;

/* make a copy of @attrs and sanitize it */
copy_workqueue_attrs(new_attrs, attrs);
@@ -3547,75 +3543,111 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
copy_workqueue_attrs(tmp_attrs, new_attrs);

/*
- * CPUs should stay stable across pwq creations and installations.
- * Pin CPUs, determine the target cpumask for each node and create
- * pwqs accordingly.
- */
- get_online_cpus();
-
- mutex_lock(&wq_pool_mutex);
-
- /*
* If something goes wrong during CPU up/down, we'll fall back to
* the default pwq covering whole @attrs->cpumask. Always create
* it even if we don't use it immediately.
*/
- dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
- if (!dfl_pwq)
- goto enomem_pwq;
+ ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
+ if (!ctx->dfl_pwq)
+ goto out_free;

for_each_node(node) {
if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
- pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
- if (!pwq_tbl[node])
- goto enomem_pwq;
+ ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
+ if (!ctx->pwq_tbl[node])
+ goto out_free;
} else {
- dfl_pwq->refcnt++;
- pwq_tbl[node] = dfl_pwq;
+ ctx->dfl_pwq->refcnt++;
+ ctx->pwq_tbl[node] = ctx->dfl_pwq;
}
}

- mutex_unlock(&wq_pool_mutex);
+ ctx->attrs = new_attrs;
+ ctx->wq = wq;
+ free_workqueue_attrs(tmp_attrs);
+ return ctx;
+
+out_free:
+ free_workqueue_attrs(tmp_attrs);
+ free_workqueue_attrs(new_attrs);
+ apply_wqattrs_cleanup(ctx);
+ return NULL;
+}
+
+/* Set the unbound_attr and install the prepared pwqs. Should not fail */
+static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
+{
+ int node;

/* all pwqs have been created successfully, let's install'em */
- mutex_lock(&wq->mutex);
+ mutex_lock(&ctx->wq->mutex);

- copy_workqueue_attrs(wq->unbound_attrs, new_attrs);
+ copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);

/* save the previous pwq and install the new one */
for_each_node(node)
- pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
+ ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
+ ctx->pwq_tbl[node]);

/* @dfl_pwq might not have been used, ensure it's linked */
- link_pwq(dfl_pwq);
- swap(wq->dfl_pwq, dfl_pwq);
+ link_pwq(ctx->dfl_pwq);
+ swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);

- mutex_unlock(&wq->mutex);
+ mutex_unlock(&ctx->wq->mutex);
+}

- /* put the old pwqs */
- for_each_node(node)
- put_pwq_unlocked(pwq_tbl[node]);
- put_pwq_unlocked(dfl_pwq);
+/**
+ * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
+ * @wq: the target workqueue
+ * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
+ *
+ * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
+ * machines, this function maps a separate pwq to each NUMA node with
+ * possibles CPUs in @attrs->cpumask so that work items are affine to the
+ * NUMA node it was issued on. Older pwqs are released as in-flight work
+ * items finish. Note that a work item which repeatedly requeues itself
+ * back-to-back will stay on its current pwq.
+ *
+ * Performs GFP_KERNEL allocations.
+ *
+ * Return: 0 on success and -errno on failure.
+ */
+int apply_workqueue_attrs(struct workqueue_struct *wq,
+ const struct workqueue_attrs *attrs)
+{
+ struct apply_wqattrs_ctx *ctx;
+ int ret = -ENOMEM;

- put_online_cpus();
- ret = 0;
- /* fall through */
-out_free:
- free_workqueue_attrs(tmp_attrs);
- free_workqueue_attrs(new_attrs);
- kfree(pwq_tbl);
- return ret;
+ /* only unbound workqueues can change attributes */
+ if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
+ return -EINVAL;

-enomem_pwq:
- free_unbound_pwq(dfl_pwq);
- for_each_node(node)
- if (pwq_tbl && pwq_tbl[node] != dfl_pwq)
- free_unbound_pwq(pwq_tbl[node]);
+ /* creating multiple pwqs breaks ordering guarantee */
+ if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
+ return -EINVAL;
+
+ /*
+ * CPUs should stay stable across pwq creations and installations.
+ * Pin CPUs, determine the target cpumask for each node and create
+ * pwqs accordingly.
+ */
+ get_online_cpus();
+
+ mutex_lock(&wq_pool_mutex);
+ ctx = apply_wqattrs_prepare(wq, attrs);
mutex_unlock(&wq_pool_mutex);
+
+ /* the ctx has been prepared successfully, let's commit it */
+ if (ctx) {
+ apply_wqattrs_commit(ctx);
+ ret = 0;
+ }
+
put_online_cpus();
-enomem:
- ret = -ENOMEM;
- goto out_free;
+
+ apply_wqattrs_cleanup(ctx);
+
+ return ret;
}

/**
--
2.1.0

2015-04-07 11:23:57

by Lai Jiangshan

[permalink] [raw]
Subject: [PATCH 2/3 V7] workqueue: Create low-level unbound workqueues cpumask

From: Frederic Weisbecker <[email protected]>

Create a cpumask that limits the affinity of all unbound workqueues.
This cpumask is controlled through a file at the root of the workqueue
sysfs directory.

It works at a lower level than the per-workqueue WQ_SYSFS cpumask files,
such that the effective cpumask applied to a given unbound workqueue is
the intersection of /sys/devices/virtual/workqueue/$WORKQUEUE/cpumask and
the new /sys/devices/virtual/workqueue/cpumask file.

This patch implements the basic infrastructure and the read interface.
wq_unbound_global_cpumask is initially set to cpu_possible_mask.
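
As a rough sketch (not part of this patch), the intersection semantics amount
to something like the helper below; the helper name is hypothetical and it is
assumed to sit in kernel/workqueue.c next to wq_unbound_global_cpumask:

```
#include <linux/cpumask.h>
#include <linux/workqueue.h>

/*
 * Hypothetical helper, for illustration only: compute the effective
 * affinity of an unbound workqueue as the intersection of its
 * per-workqueue cpumask and the low-level global cpumask.
 */
static bool wq_effective_cpumask(struct cpumask *effective,
				 const struct workqueue_attrs *attrs)
{
	/* cpumask_and() returns false when the two masks don't overlap */
	return cpumask_and(effective, attrs->cpumask,
			   wq_unbound_global_cpumask);
}
```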

Cc: Christoph Lameter <[email protected]>
Cc: Kevin Hilman <[email protected]>
Cc: Lai Jiangshan <[email protected]>
Cc: Mike Galbraith <[email protected]>
Cc: Paul E. McKenney <[email protected]>
Cc: Tejun Heo <[email protected]>
Cc: Viresh Kumar <[email protected]>
Signed-off-by: Frederic Weisbecker <[email protected]>
Signed-off-by: Lai Jiangshan <[email protected]>
---
kernel/workqueue.c | 29 +++++++++++++++++++++++++++--
1 file changed, 27 insertions(+), 2 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index b13753a..cbccf5d 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -299,6 +299,8 @@ static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
static LIST_HEAD(workqueues); /* PR: list of all workqueues */
static bool workqueue_freezing; /* PL: have wqs started freezing? */

+static cpumask_var_t wq_unbound_global_cpumask;
+
/* the per-cpu worker pools */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
cpu_worker_pools);
@@ -3533,7 +3535,7 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,

/* make a copy of @attrs and sanitize it */
copy_workqueue_attrs(new_attrs, attrs);
- cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
+ cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_global_cpumask);

/*
* We may create multiple pwqs with differing cpumasks. Make a
@@ -4946,9 +4948,29 @@ static struct bus_type wq_subsys = {
.dev_groups = wq_sysfs_groups,
};

+static ssize_t wq_unbound_global_cpumask_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ int written;
+
+ written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
+ cpumask_pr_args(wq_unbound_global_cpumask));
+
+ return written;
+}
+
+static struct device_attribute wq_sysfs_cpumask_attr =
+ __ATTR(cpumask, 0444, wq_unbound_global_cpumask_show, NULL);
+
static int __init wq_sysfs_init(void)
{
- return subsys_virtual_register(&wq_subsys, NULL);
+ int err;
+
+ err = subsys_virtual_register(&wq_subsys, NULL);
+ if (err)
+ return err;
+
+ return device_create_file(wq_subsys.dev_root, &wq_sysfs_cpumask_attr);
}
core_initcall(wq_sysfs_init);

@@ -5096,6 +5118,9 @@ static int __init init_workqueues(void)

WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));

+ BUG_ON(!alloc_cpumask_var(&wq_unbound_global_cpumask, GFP_KERNEL));
+ cpumask_copy(wq_unbound_global_cpumask, cpu_possible_mask);
+
pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);

cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
--
2.1.0

2015-04-07 11:24:05

by Lai Jiangshan

[permalink] [raw]
Subject: [PATCH 3/3 V7] workqueue: Allow modifying low level unbound workqueue cpumask

Allow modifying the low-level unbound workqueues cpumask through
sysfs. This is performed by traversing the entire workqueue list
and calling apply_wqattrs_prepare() on the unbound workqueues
with the low-level mask passed in. Only after all the preparations
are done do we commit them all together.

Ordered workqueues are ignored by the low-level unbound workqueue
cpumask for now; they will be handled in the near future.

The per-node pwqs are mandatorily controlled by the low-level cpumask,
while the default pwq falls back to the low-level global cpumask when
(and ONLY when) the cpumask set by the user doesn't overlap with the
low-level cpumask.

The default wq_unbound_global_cpumask is still cpu_possible_mask because
the workqueue subsystem doesn't know the best default value for the
runtime; the system manager or another subsystem that has sufficient
information should set it when needed.
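
For illustration only, and not part of this patch: a subsystem that wants to
keep unbound work off a set of isolated CPUs could use the new API roughly as
below. The caller and its housekeeping mask are hypothetical; note that the
API masks its argument with cpu_possible_mask and returns -EINVAL if nothing
remains, so a scratch copy is passed to keep the caller's mask intact.

```
#include <linux/cpumask.h>
#include <linux/gfp.h>
#include <linux/workqueue.h>

/*
 * Hypothetical caller, for illustration only: restrict all unbound
 * workqueues to a housekeeping cpumask so that the remaining CPUs
 * stay free of unbound work items.
 */
static int restrict_unbound_wqs(const struct cpumask *housekeeping)
{
	cpumask_var_t mask;
	int ret;

	if (!alloc_cpumask_var(&mask, GFP_KERNEL))
		return -ENOMEM;

	/* pass a scratch copy; the API may modify its argument */
	cpumask_copy(mask, housekeeping);
	ret = workqueue_set_unbound_global_cpumask(mask);

	free_cpumask_var(mask);
	return ret;
}
```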

Cc: Christoph Lameter <[email protected]>
Cc: Kevin Hilman <[email protected]>
Cc: Lai Jiangshan <[email protected]>
Cc: Mike Galbraith <[email protected]>
Cc: Paul E. McKenney <[email protected]>
Cc: Tejun Heo <[email protected]>
Cc: Viresh Kumar <[email protected]>
Cc: Frederic Weisbecker <[email protected]>
Original-patch-by: Frederic Weisbecker <[email protected]>
Signed-off-by: Lai Jiangshan <[email protected]>
---
include/linux/workqueue.h | 1 +
kernel/workqueue.c | 122 +++++++++++++++++++++++++++++++++++++++++++---
2 files changed, 116 insertions(+), 7 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index deee212..01483b3 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -424,6 +424,7 @@ struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask);
void free_workqueue_attrs(struct workqueue_attrs *attrs);
int apply_workqueue_attrs(struct workqueue_struct *wq,
const struct workqueue_attrs *attrs);
+int workqueue_set_unbound_global_cpumask(cpumask_var_t cpumask);

extern bool queue_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work);
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index cbccf5d..557612e 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -299,7 +299,7 @@ static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
static LIST_HEAD(workqueues); /* PR: list of all workqueues */
static bool workqueue_freezing; /* PL: have wqs started freezing? */

-static cpumask_var_t wq_unbound_global_cpumask;
+static cpumask_var_t wq_unbound_global_cpumask; /* PL: low level cpumask for all unbound wqs */

/* the per-cpu worker pools */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
@@ -3493,6 +3493,7 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
struct apply_wqattrs_ctx {
struct workqueue_struct *wq; /* target to be applied */
struct workqueue_attrs *attrs; /* configured attrs */
+ struct list_head list; /* queued for batching commit */
struct pool_workqueue *dfl_pwq;
struct pool_workqueue *pwq_tbl[];
};
@@ -3517,7 +3518,8 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
/* Allocates the attrs and pwqs for later installment */
static struct apply_wqattrs_ctx *
apply_wqattrs_prepare(struct workqueue_struct *wq,
- const struct workqueue_attrs *attrs)
+ const struct workqueue_attrs *attrs,
+ cpumask_var_t unbound_cpumask)
{
struct apply_wqattrs_ctx *ctx;
struct workqueue_attrs *new_attrs, *tmp_attrs;
@@ -3535,7 +3537,7 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,

/* make a copy of @attrs and sanitize it */
copy_workqueue_attrs(new_attrs, attrs);
- cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_global_cpumask);
+ cpumask_and(new_attrs->cpumask, new_attrs->cpumask, unbound_cpumask);

/*
* We may create multiple pwqs with differing cpumasks. Make a
@@ -3548,13 +3550,18 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
* If something goes wrong during CPU up/down, we'll fall back to
* the default pwq covering whole @attrs->cpumask. Always create
* it even if we don't use it immediately.
+ *
+ * If the cpumask set by the user doesn't overlap with the
+ * unbound_cpumask, we fallback to the unbound_cpumask.
*/
- ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
+ if (unlikely(cpumask_empty(tmp_attrs->cpumask)))
+ cpumask_copy(tmp_attrs->cpumask, unbound_cpumask);
+ ctx->dfl_pwq = alloc_unbound_pwq(wq, tmp_attrs);
if (!ctx->dfl_pwq)
goto out_free;

for_each_node(node) {
- if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
+ if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
if (!ctx->pwq_tbl[node])
goto out_free;
@@ -3564,7 +3571,11 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
}
}

+ /* save the user configured attrs */
+ copy_workqueue_attrs(new_attrs, attrs);
+ cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
ctx->attrs = new_attrs;
+
ctx->wq = wq;
free_workqueue_attrs(tmp_attrs);
return ctx;
@@ -3636,7 +3647,7 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
get_online_cpus();

mutex_lock(&wq_pool_mutex);
- ctx = apply_wqattrs_prepare(wq, attrs);
+ ctx = apply_wqattrs_prepare(wq, attrs, wq_unbound_global_cpumask);
mutex_unlock(&wq_pool_mutex);

/* the ctx has been prepared successfully, let's commit it */
@@ -3710,6 +3721,14 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
* wq's, the default pwq should be used.
*/
if (wq_calc_node_cpumask(wq->unbound_attrs, node, cpu_off, cpumask)) {
+ /*
+ * wq->unbound_attrs is the user configured attrs whose
+ * cpumask is not masked with wq_unbound_global_cpumask,
+ * so we make complete it.
+ */
+ cpumask_and(cpumask, cpumask, wq_unbound_global_cpumask);
+ if (cpumask_empty(cpumask))
+ goto use_dfl_pwq;
if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
goto out_unlock;
} else {
@@ -4732,6 +4751,75 @@ out_unlock:
}
#endif /* CONFIG_FREEZER */

+static int workqueue_apply_unbound_global_cpumask(cpumask_var_t cpumask)
+{
+ LIST_HEAD(ctxs);
+ int ret = 0;
+ struct workqueue_struct *wq;
+ struct apply_wqattrs_ctx *ctx, *n;
+
+ lockdep_assert_held(&wq_pool_mutex);
+
+ list_for_each_entry(wq, &workqueues, list) {
+ if (!(wq->flags & WQ_UNBOUND))
+ continue;
+ /* creating multiple pwqs breaks ordering guarantee */
+ if (wq->flags & __WQ_ORDERED)
+ continue;
+
+ ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs,
+ cpumask);
+ if (!ctx) {
+ ret = -ENOMEM;
+ break;
+ }
+
+ list_add_tail(&ctx->list, &ctxs);
+ }
+
+ list_for_each_entry_safe(ctx, n, &ctxs, list) {
+ list_del(&ctx->list);
+ if (!ret)
+ apply_wqattrs_commit(ctx);
+ apply_wqattrs_cleanup(ctx);
+ }
+
+ return ret;
+}
+
+/**
+ * workqueue_set_unbound_global_cpumask - Set the low-level unbound cpumask
+ * @cpumask: the cpumask to set
+ *
+ * The low-level workqueues cpumask is a global cpumask that limits
+ * the affinity of all unbound workqueues. This function checks @cpumask,
+ * applies it to all unbound workqueues, and updates all of their pwqs.
+ * When all succeed, it saves @cpumask to the global low-level unbound
+ * cpumask.
+ *
+ * Return: 0 - Success
+ * -EINVAL - No online cpu in the @cpumask
+ * -ENOMEM - Failed to allocate memory for attrs or pwqs.
+ */
+int workqueue_set_unbound_global_cpumask(cpumask_var_t cpumask)
+{
+ int ret = -EINVAL;
+
+ get_online_cpus();
+ cpumask_and(cpumask, cpumask, cpu_possible_mask);
+ if (!cpumask_empty(cpumask)) {
+ mutex_lock(&wq_pool_mutex);
+ ret = workqueue_apply_unbound_global_cpumask(cpumask);
+ if (ret >= 0)
+ cpumask_copy(wq_unbound_global_cpumask, cpumask);
+ mutex_unlock(&wq_pool_mutex);
+ }
+ put_online_cpus();
+
+ return ret;
+}
+EXPORT_SYMBOL_GPL(workqueue_set_unbound_global_cpumask);
+
#ifdef CONFIG_SYSFS
/*
* Workqueues with WQ_SYSFS flag set is visible to userland via
@@ -4953,14 +5041,34 @@ static ssize_t wq_unbound_global_cpumask_show(struct device *dev,
{
int written;

+ mutex_lock(&wq_pool_mutex);
written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
cpumask_pr_args(wq_unbound_global_cpumask));
+ mutex_unlock(&wq_pool_mutex);

return written;
}

+static ssize_t wq_unbound_global_cpumask_store(struct device *dev,
+ struct device_attribute *attr, const char *buf, size_t count)
+{
+ cpumask_var_t cpumask;
+ int ret;
+
+ if (!zalloc_cpumask_var(&cpumask, GFP_KERNEL))
+ return -ENOMEM;
+
+ ret = cpumask_parse(buf, cpumask);
+ if (!ret)
+ ret = workqueue_set_unbound_global_cpumask(cpumask);
+
+ free_cpumask_var(cpumask);
+ return ret ? ret : count;
+}
+
static struct device_attribute wq_sysfs_cpumask_attr =
- __ATTR(cpumask, 0444, wq_unbound_global_cpumask_show, NULL);
+ __ATTR(cpumask, 0644, wq_unbound_global_cpumask_show,
+ wq_unbound_global_cpumask_store);

static int __init wq_sysfs_init(void)
{
--
2.1.0

2015-04-17 14:57:29

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 1/3 V7] workqueue: split apply_workqueue_attrs() into 3 stages

On Tue, Apr 07, 2015 at 07:26:35PM +0800, Lai Jiangshan wrote:
> Current apply_workqueue_attrs() includes pwqs-allocation and pwqs-installation,
> so when we batch multiple apply_workqueue_attrs()s as a transaction, we can't
> ensure the transaction must succeed or fail as a complete unit.

Lai, can you please

* Break out threads when posting new version.

* List the changes since the last version?

Thanks.

--
tejun

2015-04-20 03:18:22

by Lai Jiangshan

[permalink] [raw]
Subject: Re: [PATCH 1/3 V7] workqueue: split apply_workqueue_attrs() into 3 stages

On 04/17/2015 10:57 PM, Tejun Heo wrote:
> On Tue, Apr 07, 2015 at 07:26:35PM +0800, Lai Jiangshan wrote:
>> Current apply_workqueue_attrs() includes pwqs-allocation and pwqs-installation,
>> so when we batch multiple apply_workqueue_attrs()s as a transaction, we can't
>> ensure the transaction must succeed or fail as a complete unit.
>
> Lai, can you please
>
> * Break out threads when posting new version.
>
> * List the changes since the last version?

My bad.

I thought you had only two comments, which were handled in v7.
I (lazily) considered that too little to be worth restating.

---

In [2/4 V6]:
```quote from TJ:
And we're dropping online_cpus locking before applying the new pwq's.
Is that safe?
```

It was my fault; I don't remember when I wrongly moved the code.
I had tried my best to keep the patch's functionality unchanged.
It is fixed in [1/3 V7].

----

The changes from [4/4 V6] to [3/3 V7]:
ctx->attrs (the originally configured value) is not calculated until
it is saved into ctx->attrs, and a corresponding local variable is removed.

Could you please consider it as [0/3 V7] this time?

Thx
Lai.

>
> Thanks.
>

2015-04-22 19:39:43

by Tejun Heo

[permalink] [raw]
Subject: Re: [PATCH 3/3 V7] workqueue: Allow modifying low level unbound workqueue cpumask

Hello,

Generally looks good to me. Some minor things below.

On Tue, Apr 07, 2015 at 07:26:37PM +0800, Lai Jiangshan wrote:
> diff --git a/kernel/workqueue.c b/kernel/workqueue.c
> index cbccf5d..557612e 100644
> --- a/kernel/workqueue.c
> +++ b/kernel/workqueue.c
> @@ -299,7 +299,7 @@ static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
> static LIST_HEAD(workqueues); /* PR: list of all workqueues */
> static bool workqueue_freezing; /* PL: have wqs started freezing? */
>
> -static cpumask_var_t wq_unbound_global_cpumask;
> +static cpumask_var_t wq_unbound_global_cpumask; /* PL: low level cpumask for all unbound wqs */

Are we set on this variable name? What would we lose by naming it
wq_unbound_cpumask or wq_cpu_possible_mask?

> @@ -3493,6 +3493,7 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
> struct apply_wqattrs_ctx {
> struct workqueue_struct *wq; /* target to be applied */
> struct workqueue_attrs *attrs; /* configured attrs */
> + struct list_head list; /* queued for batching commit */
batch commit
> struct pool_workqueue *dfl_pwq;
> struct pool_workqueue *pwq_tbl[];
> };
> @@ -3517,7 +3518,8 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
> /* Allocates the attrs and pwqs for later installment */
> static struct apply_wqattrs_ctx *
> apply_wqattrs_prepare(struct workqueue_struct *wq,
> - const struct workqueue_attrs *attrs)
> + const struct workqueue_attrs *attrs,
> + cpumask_var_t unbound_cpumask)

Why do we need this tho? The global mask is protected by pool mutex,
right? The update function can set it to the new value and just call
update and revert on failure.

> @@ -3710,6 +3721,14 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
> * wq's, the default pwq should be used.
> */
> if (wq_calc_node_cpumask(wq->unbound_attrs, node, cpu_off, cpumask)) {
> + /*
> + * wq->unbound_attrs is the user configured attrs whose
> + * cpumask is not masked with wq_unbound_global_cpumask,
> + * so we make complete it.
> + */
> + cpumask_and(cpumask, cpumask, wq_unbound_global_cpumask);
> + if (cpumask_empty(cpumask))
> + goto use_dfl_pwq;

Wouldn't it be better to apply the global cpumask before calling
wq_calc_node_cpumask()? Or just move it inside wq_calc_node_cpumask?
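
(A minimal sketch of the second option, not actual code: a wrapper with a
hypothetical name that folds the global cpumask into the node cpumask
calculation so that callers such as wq_update_unbound_numa() don't need to
post-process the result.)

```
/*
 * Sketch only: hypothetical wrapper that limits the calculated node
 * cpumask by the low-level global cpumask; an empty result means the
 * default pwq should be used, mirroring the existing return value.
 */
static bool wq_calc_node_cpumask_masked(const struct workqueue_attrs *attrs,
					int node, int cpu_going_down,
					cpumask_t *cpumask)
{
	if (!wq_calc_node_cpumask(attrs, node, cpu_going_down, cpumask))
		return false;

	cpumask_and(cpumask, cpumask, wq_unbound_global_cpumask);
	return !cpumask_empty(cpumask);
}
```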

Thanks.

--
tejun

2015-04-22 23:03:00

by Frederic Weisbecker

[permalink] [raw]
Subject: Re: [PATCH 3/3 V7] workqueue: Allow modifying low level unbound workqueue cpumask

On Wed, Apr 22, 2015 at 03:39:35PM -0400, Tejun Heo wrote:
> Hello,
>
> Generally looks good to me. Some minor things below.
>
> On Tue, Apr 07, 2015 at 07:26:37PM +0800, Lai Jiangshan wrote:
> > diff --git a/kernel/workqueue.c b/kernel/workqueue.c
> > index cbccf5d..557612e 100644
> > --- a/kernel/workqueue.c
> > +++ b/kernel/workqueue.c
> > @@ -299,7 +299,7 @@ static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
> > static LIST_HEAD(workqueues); /* PR: list of all workqueues */
> > static bool workqueue_freezing; /* PL: have wqs started freezing? */
> >
> > -static cpumask_var_t wq_unbound_global_cpumask;
> > +static cpumask_var_t wq_unbound_global_cpumask; /* PL: low level cpumask for all unbound wqs */
>
> Are we set on this variable name? What would we lose by naming it
> wq_unbound_cpumask or wq_cpu_possible_mask?

I like wq_unbound_cpumask personally. In fact I like having "unbound"
in the name to express what's concerned here. I like wq_cpu_possible_mask
too, but unfortunately it suggests it's about all workqueues (including
per-cpu ones) while it's not.

2015-04-23 06:29:15

by Mike Galbraith

[permalink] [raw]
Subject: Re: [PATCH 3/3 V7] workqueue: Allow modifying low level unbound workqueue cpumask


FWIW on the testing side, I'm running these in 3.12(ish), 4.0 and 4.1
rt trees with NOHZ_FULL, and have yet to meet a problem.

-Mike