The workqueue library is made up by two components:
- threadpool: handles the lifetime of the threads
- workqueue: handles work distribution among the threads
This first patch introduces the threadpool, starting from its creation
and destruction functions.
Thread management is based on the prototype from Alexey:
https://lore.kernel.org/lkml/[email protected]/
Each thread in the threadpool executes the same function (aka task)
with a different argument tidx.
Threads use a pair of pipes to communicate with the main process.
The threadpool is static (all threads will be spawned at the same time).
Future work could include making it resizable and adding affinity support
(as in Alexey prototype).
Suggested-by: Alexey Bayduraev <[email protected]>
Signed-off-by: Riccardo Mancini <[email protected]>
---
tools/perf/util/Build | 1 +
tools/perf/util/workqueue/Build | 1 +
tools/perf/util/workqueue/threadpool.c | 208 +++++++++++++++++++++++++
tools/perf/util/workqueue/threadpool.h | 30 ++++
4 files changed, 240 insertions(+)
create mode 100644 tools/perf/util/workqueue/Build
create mode 100644 tools/perf/util/workqueue/threadpool.c
create mode 100644 tools/perf/util/workqueue/threadpool.h
diff --git a/tools/perf/util/Build b/tools/perf/util/Build
index 2d4fa13041789cd6..c7b09701661c869d 100644
--- a/tools/perf/util/Build
+++ b/tools/perf/util/Build
@@ -180,6 +180,7 @@ perf-$(CONFIG_LIBBABELTRACE) += data-convert-bt.o
perf-y += data-convert-json.o
perf-y += scripting-engines/
+perf-y += workqueue/
perf-$(CONFIG_ZLIB) += zlib.o
perf-$(CONFIG_LZMA) += lzma.o
diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build
new file mode 100644
index 0000000000000000..8b72a6cd4e2cba0d
--- /dev/null
+++ b/tools/perf/util/workqueue/Build
@@ -0,0 +1 @@
+perf-y += threadpool.o
diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
new file mode 100644
index 0000000000000000..0004ce606d5fa73d
--- /dev/null
+++ b/tools/perf/util/workqueue/threadpool.c
@@ -0,0 +1,208 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include "debug.h"
+#include <asm/bug.h>
+#include <linux/zalloc.h>
+#include <linux/string.h>
+#include <linux/err.h>
+#include <linux/kernel.h>
+#include "threadpool.h"
+
+enum threadpool_status {
+ THREADPOOL_STATUS__STOPPED, /* no threads */
+ THREADPOOL_STATUS__ERROR, /* errors */
+ THREADPOOL_STATUS__MAX
+};
+
+struct threadpool {
+ int nr_threads; /* number of threads in the pool */
+ struct threadpool_entry *threads; /* array of threads in the pool */
+ struct task_struct *current_task; /* current executing function */
+ enum threadpool_status status; /* current status of the pool */
+};
+
+struct threadpool_entry {
+ int idx; /* idx of thread in pool->threads */
+ pid_t tid; /* tid of thread */
+ struct threadpool *pool; /* parent threadpool */
+ struct {
+ int ack[2]; /* messages from thread (acks) */
+ int cmd[2]; /* messages to thread (commands) */
+ } pipes;
+};
+
+/**
+ * threadpool_entry__init_pipes - initialize all pipes of @thread
+ */
+static void threadpool_entry__init_pipes(struct threadpool_entry *thread)
+{
+ thread->pipes.ack[0] = -1;
+ thread->pipes.ack[1] = -1;
+ thread->pipes.cmd[0] = -1;
+ thread->pipes.cmd[1] = -1;
+}
+
+/**
+ * threadpool_entry__open_pipes - open all pipes of @thread
+ */
+static int threadpool_entry__open_pipes(struct threadpool_entry *thread)
+{
+ if (pipe(thread->pipes.ack)) {
+ pr_debug2("threadpool: failed to create comm pipe 'from': %s\n",
+ strerror(errno));
+ return -ENOMEM;
+ }
+
+ if (pipe(thread->pipes.cmd)) {
+ pr_debug2("threadpool: failed to create comm pipe 'to': %s\n",
+ strerror(errno));
+ close(thread->pipes.ack[0]);
+ thread->pipes.ack[0] = -1;
+ close(thread->pipes.ack[1]);
+ thread->pipes.ack[1] = -1;
+ return -ENOMEM;
+ }
+
+ return 0;
+}
+
+/**
+ * threadpool_entry__close_pipes - close all communication pipes of @thread
+ */
+static void threadpool_entry__close_pipes(struct threadpool_entry *thread)
+{
+ if (thread->pipes.ack[0] != -1) {
+ close(thread->pipes.ack[0]);
+ thread->pipes.ack[0] = -1;
+ }
+ if (thread->pipes.ack[1] != -1) {
+ close(thread->pipes.ack[1]);
+ thread->pipes.ack[1] = -1;
+ }
+ if (thread->pipes.cmd[0] != -1) {
+ close(thread->pipes.cmd[0]);
+ thread->pipes.cmd[0] = -1;
+ }
+ if (thread->pipes.cmd[1] != -1) {
+ close(thread->pipes.cmd[1]);
+ thread->pipes.cmd[1] = -1;
+ }
+}
+
+/**
+ * threadpool__new - create a fixed threadpool with @n_threads threads
+ */
+struct threadpool *threadpool__new(int n_threads)
+{
+ int ret, err, t;
+ struct threadpool *pool = malloc(sizeof(*pool));
+
+ if (!pool) {
+ pr_debug2("threadpool: cannot allocate pool: %s\n",
+ strerror(errno));
+ err = -ENOMEM;
+ goto out_return;
+ }
+
+ if (n_threads <= 0) {
+ pr_debug2("threadpool: invalid number of threads: %d\n",
+ n_threads);
+ err = -EINVAL;
+ goto out_free_pool;
+ }
+
+ pool->nr_threads = n_threads;
+ pool->current_task = NULL;
+
+ pool->threads = calloc(n_threads, sizeof(*pool->threads));
+ if (!pool->threads) {
+ pr_debug2("threadpool: cannot allocate threads: %s\n",
+ strerror(errno));
+ err = -ENOMEM;
+ goto out_free_pool;
+ }
+
+ for (t = 0; t < n_threads; t++) {
+ pool->threads[t].idx = t;
+ pool->threads[t].tid = -1;
+ pool->threads[t].pool = pool;
+ threadpool_entry__init_pipes(&pool->threads[t]);
+ }
+
+ for (t = 0; t < n_threads; t++) {
+ ret = threadpool_entry__open_pipes(&pool->threads[t]);
+ if (ret) {
+ err = -ret;
+ goto out_close_pipes;
+ }
+ }
+
+ pool->status = THREADPOOL_STATUS__STOPPED;
+
+ return pool;
+
+out_close_pipes:
+ for (t = 0; t < n_threads; t++)
+ threadpool_entry__close_pipes(&pool->threads[t]);
+
+ zfree(&pool->threads);
+out_free_pool:
+ free(pool);
+out_return:
+ return ERR_PTR(err);
+}
+
+/**
+ * threadpool__strerror - print message regarding given @err in @pool
+ *
+ * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
+ */
+int threadpool__strerror(struct threadpool *pool __maybe_unused, int err, char *buf, size_t size)
+{
+ char sbuf[STRERR_BUFSIZE], *emsg;
+
+ emsg = str_error_r(err, sbuf, sizeof(sbuf));
+ return scnprintf(buf, size, "Error: %s.\n", emsg);
+}
+
+/**
+ * threadpool__new_strerror - print message regarding @err_ptr
+ *
+ * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
+ */
+int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t size)
+{
+ return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf, size);
+}
+
+/**
+ * threadpool__delete - free the @pool and all its resources
+ */
+void threadpool__delete(struct threadpool *pool)
+{
+ int t;
+
+ if (IS_ERR_OR_NULL(pool))
+ return;
+
+ WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
+ && pool->status != THREADPOOL_STATUS__ERROR);
+
+ for (t = 0; t < pool->nr_threads; t++)
+ threadpool_entry__close_pipes(&pool->threads[t]);
+
+ zfree(&pool->threads);
+ free(pool);
+}
+
+/**
+ * threadpool__size - get number of threads in the threadpool
+ */
+int threadpool__size(struct threadpool *pool)
+{
+ return pool->nr_threads;
+}
diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
new file mode 100644
index 0000000000000000..fb18aa32fb64f671
--- /dev/null
+++ b/tools/perf/util/workqueue/threadpool.h
@@ -0,0 +1,30 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __WORKQUEUE_THREADPOOL_H
+#define __WORKQUEUE_THREADPOOL_H
+
+struct threadpool;
+struct task_struct;
+
+typedef void (*task_func_t)(int tidx, struct task_struct *task);
+
+struct task_struct {
+ task_func_t fn;
+};
+
+extern struct threadpool *threadpool__new(int n_threads);
+extern void threadpool__delete(struct threadpool *pool);
+
+extern int threadpool__start(struct threadpool *pool);
+extern int threadpool__stop(struct threadpool *pool);
+
+extern int threadpool__execute(struct threadpool *pool, struct task_struct *task);
+extern int threadpool__wait(struct threadpool *pool);
+
+extern int threadpool__size(struct threadpool *pool);
+
+/* Error management */
+#define THREADPOOL_STRERR_BUFSIZE (128+STRERR_BUFSIZE)
+extern int threadpool__strerror(struct threadpool *pool, int err, char *buf, size_t size);
+extern int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t size);
+
+#endif /* __WORKQUEUE_THREADPOOL_H */
--
2.31.1
Hi Riccardo,
On Fri, Jul 30, 2021 at 8:34 AM Riccardo Mancini <[email protected]> wrote:
>
> The workqueue library is made up by two components:
> - threadpool: handles the lifetime of the threads
> - workqueue: handles work distribution among the threads
>
> This first patch introduces the threadpool, starting from its creation
> and destruction functions.
> Thread management is based on the prototype from Alexey:
> https://lore.kernel.org/lkml/[email protected]/
>
> Each thread in the threadpool executes the same function (aka task)
> with a different argument tidx.
> Threads use a pair of pipes to communicate with the main process.
> The threadpool is static (all threads will be spawned at the same time).
> Future work could include making it resizable and adding affinity support
> (as in Alexey prototype).
>
> Suggested-by: Alexey Bayduraev <[email protected]>
> Signed-off-by: Riccardo Mancini <[email protected]>
> ---
> tools/perf/util/Build | 1 +
> tools/perf/util/workqueue/Build | 1 +
> tools/perf/util/workqueue/threadpool.c | 208 +++++++++++++++++++++++++
> tools/perf/util/workqueue/threadpool.h | 30 ++++
> 4 files changed, 240 insertions(+)
> create mode 100644 tools/perf/util/workqueue/Build
> create mode 100644 tools/perf/util/workqueue/threadpool.c
> create mode 100644 tools/perf/util/workqueue/threadpool.h
>
> diff --git a/tools/perf/util/Build b/tools/perf/util/Build
> index 2d4fa13041789cd6..c7b09701661c869d 100644
> --- a/tools/perf/util/Build
> +++ b/tools/perf/util/Build
> @@ -180,6 +180,7 @@ perf-$(CONFIG_LIBBABELTRACE) += data-convert-bt.o
> perf-y += data-convert-json.o
>
> perf-y += scripting-engines/
> +perf-y += workqueue/
>
> perf-$(CONFIG_ZLIB) += zlib.o
> perf-$(CONFIG_LZMA) += lzma.o
> diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build
> new file mode 100644
> index 0000000000000000..8b72a6cd4e2cba0d
> --- /dev/null
> +++ b/tools/perf/util/workqueue/Build
> @@ -0,0 +1 @@
> +perf-y += threadpool.o
> diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
> new file mode 100644
> index 0000000000000000..0004ce606d5fa73d
> --- /dev/null
> +++ b/tools/perf/util/workqueue/threadpool.c
> @@ -0,0 +1,208 @@
> +// SPDX-License-Identifier: GPL-2.0
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +#include <errno.h>
> +#include <string.h>
> +#include "debug.h"
> +#include <asm/bug.h>
> +#include <linux/zalloc.h>
> +#include <linux/string.h>
> +#include <linux/err.h>
> +#include <linux/kernel.h>
> +#include "threadpool.h"
> +
> +enum threadpool_status {
> + THREADPOOL_STATUS__STOPPED, /* no threads */
> + THREADPOOL_STATUS__ERROR, /* errors */
> + THREADPOOL_STATUS__MAX
> +};
> +
> +struct threadpool {
> + int nr_threads; /* number of threads in the pool */
> + struct threadpool_entry *threads; /* array of threads in the pool */
> + struct task_struct *current_task; /* current executing function */
> + enum threadpool_status status; /* current status of the pool */
Better to move to below the nr_threads for alignment.
> +};
> +
> +struct threadpool_entry {
> + int idx; /* idx of thread in pool->threads */
> + pid_t tid; /* tid of thread */
> + struct threadpool *pool; /* parent threadpool */
> + struct {
> + int ack[2]; /* messages from thread (acks) */
> + int cmd[2]; /* messages to thread (commands) */
> + } pipes;
> +};
> +
> +/**
> + * threadpool_entry__init_pipes - initialize all pipes of @thread
> + */
> +static void threadpool_entry__init_pipes(struct threadpool_entry *thread)
> +{
> + thread->pipes.ack[0] = -1;
> + thread->pipes.ack[1] = -1;
> + thread->pipes.cmd[0] = -1;
> + thread->pipes.cmd[1] = -1;
> +}
> +
> +/**
> + * threadpool_entry__open_pipes - open all pipes of @thread
> + */
> +static int threadpool_entry__open_pipes(struct threadpool_entry *thread)
> +{
> + if (pipe(thread->pipes.ack)) {
> + pr_debug2("threadpool: failed to create comm pipe 'from': %s\n",
> + strerror(errno));
> + return -ENOMEM;
> + }
> +
> + if (pipe(thread->pipes.cmd)) {
> + pr_debug2("threadpool: failed to create comm pipe 'to': %s\n",
> + strerror(errno));
> + close(thread->pipes.ack[0]);
> + thread->pipes.ack[0] = -1;
> + close(thread->pipes.ack[1]);
> + thread->pipes.ack[1] = -1;
Maybe you don't need to do it here if the caller handles it already ...
> + return -ENOMEM;
> + }
> +
> + return 0;
> +}
> +
> +/**
> + * threadpool_entry__close_pipes - close all communication pipes of @thread
> + */
> +static void threadpool_entry__close_pipes(struct threadpool_entry *thread)
> +{
> + if (thread->pipes.ack[0] != -1) {
> + close(thread->pipes.ack[0]);
> + thread->pipes.ack[0] = -1;
> + }
> + if (thread->pipes.ack[1] != -1) {
> + close(thread->pipes.ack[1]);
> + thread->pipes.ack[1] = -1;
> + }
> + if (thread->pipes.cmd[0] != -1) {
> + close(thread->pipes.cmd[0]);
> + thread->pipes.cmd[0] = -1;
> + }
> + if (thread->pipes.cmd[1] != -1) {
> + close(thread->pipes.cmd[1]);
> + thread->pipes.cmd[1] = -1;
> + }
> +}
> +
> +/**
> + * threadpool__new - create a fixed threadpool with @n_threads threads
> + */
> +struct threadpool *threadpool__new(int n_threads)
> +{
> + int ret, err, t;
> + struct threadpool *pool = malloc(sizeof(*pool));
> +
> + if (!pool) {
> + pr_debug2("threadpool: cannot allocate pool: %s\n",
> + strerror(errno));
> + err = -ENOMEM;
> + goto out_return;
> + }
> +
> + if (n_threads <= 0) {
> + pr_debug2("threadpool: invalid number of threads: %d\n",
> + n_threads);
> + err = -EINVAL;
> + goto out_free_pool;
Isn't' it natural to check it before the allocation?
> + }
> +
> + pool->nr_threads = n_threads;
> + pool->current_task = NULL;
> +
> + pool->threads = calloc(n_threads, sizeof(*pool->threads));
> + if (!pool->threads) {
> + pr_debug2("threadpool: cannot allocate threads: %s\n",
> + strerror(errno));
> + err = -ENOMEM;
> + goto out_free_pool;
> + }
> +
> + for (t = 0; t < n_threads; t++) {
> + pool->threads[t].idx = t;
> + pool->threads[t].tid = -1;
> + pool->threads[t].pool = pool;
> + threadpool_entry__init_pipes(&pool->threads[t]);
> + }
> +
> + for (t = 0; t < n_threads; t++) {
> + ret = threadpool_entry__open_pipes(&pool->threads[t]);
> + if (ret) {
> + err = -ret;
> + goto out_close_pipes;
... like this. But threadpool_entry__open_pipes() already
returns a negative.
> + }
> + }
> +
> + pool->status = THREADPOOL_STATUS__STOPPED;
> +
> + return pool;
> +
> +out_close_pipes:
> + for (t = 0; t < n_threads; t++)
> + threadpool_entry__close_pipes(&pool->threads[t]);
> +
> + zfree(&pool->threads);
> +out_free_pool:
> + free(pool);
> +out_return:
> + return ERR_PTR(err);
> +}
> +
> +/**
> + * threadpool__strerror - print message regarding given @err in @pool
> + *
> + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> + */
> +int threadpool__strerror(struct threadpool *pool __maybe_unused, int err, char *buf, size_t size)
> +{
> + char sbuf[STRERR_BUFSIZE], *emsg;
> +
> + emsg = str_error_r(err, sbuf, sizeof(sbuf));
> + return scnprintf(buf, size, "Error: %s.\n", emsg);
> +}
> +
> +/**
> + * threadpool__new_strerror - print message regarding @err_ptr
> + *
> + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> + */
> +int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t size)
> +{
> + return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf, size);
> +}
Why two different functions?
> +
> +/**
> + * threadpool__delete - free the @pool and all its resources
> + */
> +void threadpool__delete(struct threadpool *pool)
> +{
> + int t;
> +
> + if (IS_ERR_OR_NULL(pool))
> + return;
> +
> + WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
> + && pool->status != THREADPOOL_STATUS__ERROR);
> +
> + for (t = 0; t < pool->nr_threads; t++)
> + threadpool_entry__close_pipes(&pool->threads[t]);
> +
> + zfree(&pool->threads);
> + free(pool);
> +}
> +
> +/**
> + * threadpool__size - get number of threads in the threadpool
> + */
> +int threadpool__size(struct threadpool *pool)
> +{
> + return pool->nr_threads;
> +}
> diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
> new file mode 100644
> index 0000000000000000..fb18aa32fb64f671
> --- /dev/null
> +++ b/tools/perf/util/workqueue/threadpool.h
> @@ -0,0 +1,30 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +#ifndef __WORKQUEUE_THREADPOOL_H
> +#define __WORKQUEUE_THREADPOOL_H
> +
> +struct threadpool;
> +struct task_struct;
You can just move the definition here.
> +
> +typedef void (*task_func_t)(int tidx, struct task_struct *task);
> +
> +struct task_struct {
> + task_func_t fn;
> +};
> +
> +extern struct threadpool *threadpool__new(int n_threads);
> +extern void threadpool__delete(struct threadpool *pool);
> +
> +extern int threadpool__start(struct threadpool *pool);
> +extern int threadpool__stop(struct threadpool *pool);
> +
> +extern int threadpool__execute(struct threadpool *pool, struct task_struct *task);
> +extern int threadpool__wait(struct threadpool *pool);
These are not defined yet.
> +
> +extern int threadpool__size(struct threadpool *pool);
> +
> +/* Error management */
> +#define THREADPOOL_STRERR_BUFSIZE (128+STRERR_BUFSIZE)
> +extern int threadpool__strerror(struct threadpool *pool, int err, char *buf, size_t size);
> +extern int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t size);
> +
> +#endif /* __WORKQUEUE_THREADPOOL_H */
> --
> 2.31.1
>
Hi Namhyung,
thanks for the review!
On Fri, 2021-08-06 at 19:24 -0700, Namhyung Kim wrote:
> Hi Riccardo,
>
> On Fri, Jul 30, 2021 at 8:34 AM Riccardo Mancini <[email protected]> wrote:
> >
> > The workqueue library is made up by two components:
> > - threadpool: handles the lifetime of the threads
> > - workqueue: handles work distribution among the threads
> >
> > This first patch introduces the threadpool, starting from its creation
> > and destruction functions.
> > Thread management is based on the prototype from Alexey:
> > https://lore.kernel.org/lkml/[email protected]/
> >
> > Each thread in the threadpool executes the same function (aka task)
> > with a different argument tidx.
> > Threads use a pair of pipes to communicate with the main process.
> > The threadpool is static (all threads will be spawned at the same time).
> > Future work could include making it resizable and adding affinity support
> > (as in Alexey prototype).
> >
> > Suggested-by: Alexey Bayduraev <[email protected]>
> > Signed-off-by: Riccardo Mancini <[email protected]>
> > ---
> > tools/perf/util/Build | 1 +
> > tools/perf/util/workqueue/Build | 1 +
> > tools/perf/util/workqueue/threadpool.c | 208 +++++++++++++++++++++++++
> > tools/perf/util/workqueue/threadpool.h | 30 ++++
> > 4 files changed, 240 insertions(+)
> > create mode 100644 tools/perf/util/workqueue/Build
> > create mode 100644 tools/perf/util/workqueue/threadpool.c
> > create mode 100644 tools/perf/util/workqueue/threadpool.h
> >
> > diff --git a/tools/perf/util/Build b/tools/perf/util/Build
> > index 2d4fa13041789cd6..c7b09701661c869d 100644
> > --- a/tools/perf/util/Build
> > +++ b/tools/perf/util/Build
> > @@ -180,6 +180,7 @@ perf-$(CONFIG_LIBBABELTRACE) += data-convert-bt.o
> > perf-y += data-convert-json.o
> >
> > perf-y += scripting-engines/
> > +perf-y += workqueue/
> >
> > perf-$(CONFIG_ZLIB) += zlib.o
> > perf-$(CONFIG_LZMA) += lzma.o
> > diff --git a/tools/perf/util/workqueue/Build
> > b/tools/perf/util/workqueue/Build
> > new file mode 100644
> > index 0000000000000000..8b72a6cd4e2cba0d
> > --- /dev/null
> > +++ b/tools/perf/util/workqueue/Build
> > @@ -0,0 +1 @@
> > +perf-y += threadpool.o
> > diff --git a/tools/perf/util/workqueue/threadpool.c
> > b/tools/perf/util/workqueue/threadpool.c
> > new file mode 100644
> > index 0000000000000000..0004ce606d5fa73d
> > --- /dev/null
> > +++ b/tools/perf/util/workqueue/threadpool.c
> > @@ -0,0 +1,208 @@
> > +// SPDX-License-Identifier: GPL-2.0
> > +#include <stdlib.h>
> > +#include <stdio.h>
> > +#include <unistd.h>
> > +#include <errno.h>
> > +#include <string.h>
> > +#include "debug.h"
> > +#include <asm/bug.h>
> > +#include <linux/zalloc.h>
> > +#include <linux/string.h>
> > +#include <linux/err.h>
> > +#include <linux/kernel.h>
> > +#include "threadpool.h"
> > +
> > +enum threadpool_status {
> > + THREADPOOL_STATUS__STOPPED, /* no threads */
> > + THREADPOOL_STATUS__ERROR, /* errors */
> > + THREADPOOL_STATUS__MAX
> > +};
> > +
> > +struct threadpool {
> > + int nr_threads; /* number of threads in the
> > pool */
> > + struct threadpool_entry *threads; /* array of threads in the
> > pool */
> > + struct task_struct *current_task; /* current executing
> > function */
> > + enum threadpool_status status; /* current status of the
> > pool */
>
> Better to move to below the nr_threads for alignment.
ack
>
>
> > +};
> > +
> > +struct threadpool_entry {
> > + int idx; /* idx of thread in pool-
> > >threads */
> > + pid_t tid; /* tid of thread */
> > + struct threadpool *pool; /* parent threadpool */
> > + struct {
> > + int ack[2]; /* messages from thread
> > (acks) */
> > + int cmd[2]; /* messages to thread
> > (commands) */
> > + } pipes;
> > +};
> > +
> > +/**
> > + * threadpool_entry__init_pipes - initialize all pipes of @thread
> > + */
> > +static void threadpool_entry__init_pipes(struct threadpool_entry *thread)
> > +{
> > + thread->pipes.ack[0] = -1;
> > + thread->pipes.ack[1] = -1;
> > + thread->pipes.cmd[0] = -1;
> > + thread->pipes.cmd[1] = -1;
> > +}
> > +
> > +/**
> > + * threadpool_entry__open_pipes - open all pipes of @thread
> > + */
> > +static int threadpool_entry__open_pipes(struct threadpool_entry *thread)
> > +{
> > + if (pipe(thread->pipes.ack)) {
> > + pr_debug2("threadpool: failed to create comm pipe 'from':
> > %s\n",
> > + strerror(errno));
> > + return -ENOMEM;
> > + }
> > +
> > + if (pipe(thread->pipes.cmd)) {
> > + pr_debug2("threadpool: failed to create comm pipe 'to':
> > %s\n",
> > + strerror(errno));
> > + close(thread->pipes.ack[0]);
> > + thread->pipes.ack[0] = -1;
> > + close(thread->pipes.ack[1]);
> > + thread->pipes.ack[1] = -1;
>
> Maybe you don't need to do it here if the caller handles it already ...
oops, thanks.
>
> > + return -ENOMEM;
> > + }
> > +
> > + return 0;
> > +}
> > +
> > +/**
> > + * threadpool_entry__close_pipes - close all communication pipes of @thread
> > + */
> > +static void threadpool_entry__close_pipes(struct threadpool_entry *thread)
> > +{
> > + if (thread->pipes.ack[0] != -1) {
> > + close(thread->pipes.ack[0]);
> > + thread->pipes.ack[0] = -1;
> > + }
> > + if (thread->pipes.ack[1] != -1) {
> > + close(thread->pipes.ack[1]);
> > + thread->pipes.ack[1] = -1;
> > + }
> > + if (thread->pipes.cmd[0] != -1) {
> > + close(thread->pipes.cmd[0]);
> > + thread->pipes.cmd[0] = -1;
> > + }
> > + if (thread->pipes.cmd[1] != -1) {
> > + close(thread->pipes.cmd[1]);
> > + thread->pipes.cmd[1] = -1;
> > + }
> > +}
> > +
> > +/**
> > + * threadpool__new - create a fixed threadpool with @n_threads threads
> > + */
> > +struct threadpool *threadpool__new(int n_threads)
> > +{
> > + int ret, err, t;
> > + struct threadpool *pool = malloc(sizeof(*pool));
> > +
> > + if (!pool) {
> > + pr_debug2("threadpool: cannot allocate pool: %s\n",
> > + strerror(errno));
> > + err = -ENOMEM;
> > + goto out_return;
> > + }
> > +
> > + if (n_threads <= 0) {
> > + pr_debug2("threadpool: invalid number of threads: %d\n",
> > + n_threads);
> > + err = -EINVAL;
> > + goto out_free_pool;
>
> Isn't' it natural to check it before the allocation?
It sure is, thanks.
>
> > + }
> > +
> > + pool->nr_threads = n_threads;
> > + pool->current_task = NULL;
> > +
> > + pool->threads = calloc(n_threads, sizeof(*pool->threads));
> > + if (!pool->threads) {
> > + pr_debug2("threadpool: cannot allocate threads: %s\n",
> > + strerror(errno));
> > + err = -ENOMEM;
> > + goto out_free_pool;
> > + }
> > +
> > + for (t = 0; t < n_threads; t++) {
> > + pool->threads[t].idx = t;
> > + pool->threads[t].tid = -1;
> > + pool->threads[t].pool = pool;
> > + threadpool_entry__init_pipes(&pool->threads[t]);
> > + }
> > +
> > + for (t = 0; t < n_threads; t++) {
> > + ret = threadpool_entry__open_pipes(&pool->threads[t]);
> > + if (ret) {
> > + err = -ret;
> > + goto out_close_pipes;
>
> ... like this. But threadpool_entry__open_pipes() already
> returns a negative.
Yeah, I made some confusion with signs in this version, because I first wanted
to use positive for errnos and negative for custom errors but it didn't sound
like a good idea, so then I reverted to only negative errors, using the offset
for the custom errors.
I will have a better look at all return codes.
>
> > + }
> > + }
> > +
> > + pool->status = THREADPOOL_STATUS__STOPPED;
> > +
> > + return pool;
> > +
> > +out_close_pipes:
> > + for (t = 0; t < n_threads; t++)
> > + threadpool_entry__close_pipes(&pool->threads[t]);
> > +
> > + zfree(&pool->threads);
> > +out_free_pool:
> > + free(pool);
> > +out_return:
> > + return ERR_PTR(err);
> > +}
> > +
> > +/**
> > + * threadpool__strerror - print message regarding given @err in @pool
> > + *
> > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > + */
> > +int threadpool__strerror(struct threadpool *pool __maybe_unused, int err,
> > char *buf, size_t size)
> > +{
> > + char sbuf[STRERR_BUFSIZE], *emsg;
> > +
> > + emsg = str_error_r(err, sbuf, sizeof(sbuf));
> > + return scnprintf(buf, size, "Error: %s.\n", emsg);
> > +}
> > +
> > +/**
> > + * threadpool__new_strerror - print message regarding @err_ptr
> > + *
> > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > + */
> > +int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t
> > size)
> > +{
> > + return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf, size);
> > +}
>
> Why two different functions?
Since when new fails you don't have a err number, just an err_ptr so it's not
very clear how to call threadpool__strerror. Therefore I made a wrapper to
remove any ambiguity.
>
> > +
> > +/**
> > + * threadpool__delete - free the @pool and all its resources
> > + */
> > +void threadpool__delete(struct threadpool *pool)
> > +{
> > + int t;
> > +
> > + if (IS_ERR_OR_NULL(pool))
> > + return;
> > +
> > + WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
> > + && pool->status != THREADPOOL_STATUS__ERROR);
> > +
> > + for (t = 0; t < pool->nr_threads; t++)
> > + threadpool_entry__close_pipes(&pool->threads[t]);
> > +
> > + zfree(&pool->threads);
> > + free(pool);
> > +}
> > +
> > +/**
> > + * threadpool__size - get number of threads in the threadpool
> > + */
> > +int threadpool__size(struct threadpool *pool)
> > +{
> > + return pool->nr_threads;
> > +}
> > diff --git a/tools/perf/util/workqueue/threadpool.h
> > b/tools/perf/util/workqueue/threadpool.h
> > new file mode 100644
> > index 0000000000000000..fb18aa32fb64f671
> > --- /dev/null
> > +++ b/tools/perf/util/workqueue/threadpool.h
> > @@ -0,0 +1,30 @@
> > +/* SPDX-License-Identifier: GPL-2.0 */
> > +#ifndef __WORKQUEUE_THREADPOOL_H
> > +#define __WORKQUEUE_THREADPOOL_H
> > +
> > +struct threadpool;
> > +struct task_struct;
>
> You can just move the definition here.
>
> > +
> > +typedef void (*task_func_t)(int tidx, struct task_struct *task);
> > +
> > +struct task_struct {
> > + task_func_t fn;
> > +};
I thought it was not allowed, since task_func_t refers to task_struct and
viceversa.
I will try to remove it if possible.
> > +
> > +extern struct threadpool *threadpool__new(int n_threads);
> > +extern void threadpool__delete(struct threadpool *pool);
> > +
> > +extern int threadpool__start(struct threadpool *pool);
> > +extern int threadpool__stop(struct threadpool *pool);
> > +
> > +extern int threadpool__execute(struct threadpool *pool, struct task_struct
> > *task);
> > +extern int threadpool__wait(struct threadpool *pool);
>
> These are not defined yet.
Oops, they must've leaked from the 3rd patch.
Thanks,
Riccardo
>
> > +
> > +extern int threadpool__size(struct threadpool *pool);
> > +
> > +/* Error management */
> > +#define THREADPOOL_STRERR_BUFSIZE (128+STRERR_BUFSIZE)
> > +extern int threadpool__strerror(struct threadpool *pool, int err, char
> > *buf, size_t size);
> > +extern int threadpool__new_strerror(struct threadpool *err_ptr, char *buf,
> > size_t size);
> > +
> > +#endif /* __WORKQUEUE_THREADPOOL_H */
> > --
> > 2.31.1
> >
On Mon, Aug 9, 2021 at 3:30 AM Riccardo Mancini <[email protected]> wrote:
> On Fri, 2021-08-06 at 19:24 -0700, Namhyung Kim wrote:
> > > +
> > > +/**
> > > + * threadpool__strerror - print message regarding given @err in @pool
> > > + *
> > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > + */
> > > +int threadpool__strerror(struct threadpool *pool __maybe_unused, int err,
> > > char *buf, size_t size)
> > > +{
> > > + char sbuf[STRERR_BUFSIZE], *emsg;
> > > +
> > > + emsg = str_error_r(err, sbuf, sizeof(sbuf));
> > > + return scnprintf(buf, size, "Error: %s.\n", emsg);
> > > +}
> > > +
> > > +/**
> > > + * threadpool__new_strerror - print message regarding @err_ptr
> > > + *
> > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > + */
> > > +int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t
> > > size)
> > > +{
> > > + return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf, size);
> > > +}
> >
> > Why two different functions?
>
> Since when new fails you don't have a err number, just an err_ptr so it's not
> very clear how to call threadpool__strerror. Therefore I made a wrapper to
> remove any ambiguity.
What do you mean by "when new fails"?
> >
> > > +
> > > +/**
> > > + * threadpool__delete - free the @pool and all its resources
> > > + */
> > > +void threadpool__delete(struct threadpool *pool)
> > > +{
> > > + int t;
> > > +
> > > + if (IS_ERR_OR_NULL(pool))
> > > + return;
> > > +
> > > + WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
> > > + && pool->status != THREADPOOL_STATUS__ERROR);
> > > +
> > > + for (t = 0; t < pool->nr_threads; t++)
> > > + threadpool_entry__close_pipes(&pool->threads[t]);
> > > +
> > > + zfree(&pool->threads);
> > > + free(pool);
> > > +}
> > > +
> > > +/**
> > > + * threadpool__size - get number of threads in the threadpool
> > > + */
> > > +int threadpool__size(struct threadpool *pool)
> > > +{
> > > + return pool->nr_threads;
> > > +}
> > > diff --git a/tools/perf/util/workqueue/threadpool.h
> > > b/tools/perf/util/workqueue/threadpool.h
> > > new file mode 100644
> > > index 0000000000000000..fb18aa32fb64f671
> > > --- /dev/null
> > > +++ b/tools/perf/util/workqueue/threadpool.h
> > > @@ -0,0 +1,30 @@
> > > +/* SPDX-License-Identifier: GPL-2.0 */
> > > +#ifndef __WORKQUEUE_THREADPOOL_H
> > > +#define __WORKQUEUE_THREADPOOL_H
> > > +
> > > +struct threadpool;
> > > +struct task_struct;
> >
> > You can just move the definition here.
> >
> > > +
> > > +typedef void (*task_func_t)(int tidx, struct task_struct *task);
> > > +
> > > +struct task_struct {
> > > + task_func_t fn;
> > > +};
>
> I thought it was not allowed, since task_func_t refers to task_struct and
> viceversa.
> I will try to remove it if possible.
Oh, I missed that, sorry for the noise.
Thanks,
namhyung
Em Tue, Aug 10, 2021 at 11:54:19AM -0700, Namhyung Kim escreveu:
> On Mon, Aug 9, 2021 at 3:30 AM Riccardo Mancini <[email protected]> wrote:
> > On Fri, 2021-08-06 at 19:24 -0700, Namhyung Kim wrote:
> > > > +/**
> > > > + * threadpool__strerror - print message regarding given @err in @pool
> > > > + *
> > > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > > + */
> > > > +int threadpool__strerror(struct threadpool *pool __maybe_unused, int err,
> > > > char *buf, size_t size)
> > > > +{
> > > > + char sbuf[STRERR_BUFSIZE], *emsg;
> > > > +
> > > > + emsg = str_error_r(err, sbuf, sizeof(sbuf));
> > > > + return scnprintf(buf, size, "Error: %s.\n", emsg);
> > > > +}
> > > > +
> > > > +/**
> > > > + * threadpool__new_strerror - print message regarding @err_ptr
> > > > + *
> > > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > > + */
> > > > +int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t
> > > > size)
> > > > +{
> > > > + return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf, size);
> > > > +}
> > > Why two different functions?
> > Since when new fails you don't have a err number, just an err_ptr so it's not
> > very clear how to call threadpool__strerror. Therefore I made a wrapper to
> > remove any ambiguity.
>
> What do you mean by "when new fails"?
I think 'new' is 'constructor', i.e. something__new() returns a newly
created object and this not an error number, so he uses ERR_PTR() and
then he needs to pass it to the 'strerror' specific to the
threadpool__new, which will use PTR_ERR() to get an integer, and then
map that to a proper error string, right?
- Arnaldo
On Tue, 2021-08-10 at 17:24 -0300, Arnaldo Carvalho de Melo wrote:
> Em Tue, Aug 10, 2021 at 11:54:19AM -0700, Namhyung Kim escreveu:
> > On Mon, Aug 9, 2021 at 3:30 AM Riccardo Mancini <[email protected]> wrote:
> > > On Fri, 2021-08-06 at 19:24 -0700, Namhyung Kim wrote:
> > > > > +/**
> > > > > + * threadpool__strerror - print message regarding given @err in @pool
> > > > > + *
> > > > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > > > + */
> > > > > +int threadpool__strerror(struct threadpool *pool __maybe_unused, int
> > > > > err,
> > > > > char *buf, size_t size)
> > > > > +{
> > > > > + char sbuf[STRERR_BUFSIZE], *emsg;
> > > > > +
> > > > > + emsg = str_error_r(err, sbuf, sizeof(sbuf));
> > > > > + return scnprintf(buf, size, "Error: %s.\n", emsg);
> > > > > +}
> > > > > +
> > > > > +/**
> > > > > + * threadpool__new_strerror - print message regarding @err_ptr
> > > > > + *
> > > > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > > > + */
> > > > > +int threadpool__new_strerror(struct threadpool *err_ptr, char *buf,
> > > > > size_t
> > > > > size)
> > > > > +{
> > > > > + return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf,
> > > > > size);
> > > > > +}
>
> > > > Why two different functions?
>
> > > Since when new fails you don't have a err number, just an err_ptr so it's
> > > not
> > > very clear how to call threadpool__strerror. Therefore I made a wrapper to
> > > remove any ambiguity.
> >
> > What do you mean by "when new fails"?
>
> I think 'new' is 'constructor', i.e. something__new() returns a newly
> created object and this not an error number, so he uses ERR_PTR() and
> then he needs to pass it to the 'strerror' specific to the
> threadpool__new, which will use PTR_ERR() to get an integer, and then
> map that to a proper error string, right?
Correct.
threadpool__new_strerror is not really needed since one could use
threadpool__strerror directly, but then I would need to handle all possible ways
it could be called (e.g. (NULL, PTR_ERR(err_ptr)), (err_ptr, 0), (err_ptr,
PTR_ERR(err_ptr)), so I thought it was better to just provide a strerror that
only took the err_ptr.
Maybe I can provide it as an inline in the header, or as a macro.
Riccardo
>
> - Arnaldo