2021-07-13 12:12:57

by Riccardo Mancini

Subject: [RFC PATCH 00/10] perf: add workqueue library and use it in synthetic-events

This patchset introduces a new utility library inside perf/util that
provides a work queue abstraction loosely following the Kernel
workqueue API.

The workqueue abstraction is made up of two components:
- threadpool: which takes care of managing a pool of threads. It is
inspired by the prototype for threaded trace in perf-record from Alexey:
https://lore.kernel.org/lkml/[email protected]/
- workqueue: manages a shared queue and provides the workers implementation.

On top of the workqueue, a simple parallel-for utility is implemented
which is then showcased in synthetic-events.c, replacing the previous
manual pthread-created threads.

In my experiments with perf bench, the new workqueue has a higher
overhead than manual creation of threads, but it partitions work among
threads more effectively, yielding better results with more threads.
Furthermore, the overhead can be tuned by changing `work_size`
(currently 1), i.e. the number of dirents that a thread processes
before grabbing the lock to get a new work item.
I experimented with different sizes but, while bigger sizes reduce
overhead as expected, they do not scale as well to more threads.
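
To make the `work_size` trade-off concrete, here is a minimal sketch of
a parallel-for over a shared index guarded by a mutex. It is not the
code from this patchset: every name in it (pfor_state, pfor_worker,
sketch_parallel_for, mark_visited) is hypothetical, and it spawns plain
joinable pthreads instead of going through the workqueue. A larger
work_size means fewer lock acquisitions but coarser load balancing.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical names: none of these come from the patchset. */
struct pfor_state {
	pthread_mutex_t lock;
	int next;	/* first index not yet handed out */
	int end;	/* one past the last index */
	int work_size;	/* indices claimed per lock acquisition */
	void (*fn)(int idx, void *arg);
	void *arg;
};

static void *pfor_worker(void *p)
{
	struct pfor_state *st = p;

	for (;;) {
		int lo, hi, i;

		/* Claim the next chunk of at most work_size indices. */
		pthread_mutex_lock(&st->lock);
		lo = st->next;
		if (st->end - lo < st->work_size)
			hi = st->end;
		else
			hi = lo + st->work_size;
		st->next = hi;
		pthread_mutex_unlock(&st->lock);

		if (lo >= hi)
			break;	/* nothing left to do */

		for (i = lo; i < hi; i++)
			st->fn(i, st->arg);
	}
	return NULL;
}

/* Run fn(i, arg) for every i in [0, n) on nr_threads threads. */
static int sketch_parallel_for(int n, int nr_threads, int work_size,
			       void (*fn)(int idx, void *arg), void *arg)
{
	struct pfor_state st;
	pthread_t *tids;
	int t, started = 0, ret = 0;

	if (n < 0 || nr_threads <= 0 || work_size <= 0)
		return -1;

	tids = calloc(nr_threads, sizeof(*tids));
	if (!tids)
		return -1;

	pthread_mutex_init(&st.lock, NULL);
	st.next = 0;
	st.end = n;
	st.work_size = work_size;
	st.fn = fn;
	st.arg = arg;

	for (t = 0; t < nr_threads; t++) {
		if (pthread_create(&tids[t], NULL, pfor_worker, &st)) {
			ret = -1;	/* threads already started finish the work */
			break;
		}
		started++;
	}
	for (t = 0; t < started; t++)
		pthread_join(tids[t], NULL);

	pthread_mutex_destroy(&st.lock);
	free(tids);
	return ret;
}

/* Example loop body: count how many times each index is visited. */
static void mark_visited(int idx, void *arg)
{
	int *visited = arg;

	visited[idx]++;
}
```

Calling sketch_parallel_for(100, 4, 3, mark_visited, visited) on a
zeroed array of 100 ints should leave every slot at 1, since each index
is handed out exactly once.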

I tried to keep the patchset as simple as possible, deferring possible
improvements and features to future work.
Naming a few:
- in order to achieve a better performance, we could consider using
work-stealing instead of a common queue.
- affinities in the thread pool, as in Alexey's prototype for
perf-record. Doing so would enable reusing the same threadpool for
different purposes (evlist open, threaded trace, synthetic threads),
avoiding having to spin up threads multiple times.
- resizable threadpool, e.g. for lazy spawning of threads.

@Arnaldo
Since I wanted the workqueue to provide a similar API to the Kernel's
workqueue, I followed the naming style I found there, instead of the
usual object__method style that is typically found in perf.
Let me know if you'd like me to follow perf style instead.

Thanks,
Riccardo

Riccardo Mancini (10):
perf workqueue: threadpool creation and destruction
perf tests: add test for workqueue
perf workqueue: add threadpool start and stop functions
perf workqueue: add threadpool execute and wait functions
perf workqueue: add sparse annotation header
perf workqueue: introduce workqueue struct
perf workqueue: implement worker thread and management
perf workqueue: add queue_work and flush_workqueue functions
perf workqueue: add utility to execute a for loop in parallel
perf synthetic-events: use workqueue parallel_for

tools/perf/tests/Build | 1 +
tools/perf/tests/builtin-test.c | 9 +
tools/perf/tests/tests.h | 3 +
tools/perf/tests/workqueue.c | 453 +++++++++++++++++
tools/perf/util/Build | 1 +
tools/perf/util/synthetic-events.c | 131 +++--
tools/perf/util/workqueue/Build | 2 +
tools/perf/util/workqueue/sparse.h | 21 +
tools/perf/util/workqueue/threadpool.c | 516 ++++++++++++++++++++
tools/perf/util/workqueue/threadpool.h | 29 ++
tools/perf/util/workqueue/workqueue.c | 642 +++++++++++++++++++++++++
tools/perf/util/workqueue/workqueue.h | 38 ++
12 files changed, 1771 insertions(+), 75 deletions(-)
create mode 100644 tools/perf/tests/workqueue.c
create mode 100644 tools/perf/util/workqueue/Build
create mode 100644 tools/perf/util/workqueue/sparse.h
create mode 100644 tools/perf/util/workqueue/threadpool.c
create mode 100644 tools/perf/util/workqueue/threadpool.h
create mode 100644 tools/perf/util/workqueue/workqueue.c
create mode 100644 tools/perf/util/workqueue/workqueue.h

--
2.31.1


2021-07-13 12:13:06

by Riccardo Mancini

Subject: [RFC PATCH 01/10] perf workqueue: threadpool creation and destruction

The workqueue library is made up of two components:
- threadpool: handles the lifetime of the threads
- workqueue: handles work distribution among the threads

This first patch introduces the threadpool, starting from its creation
and destruction functions.
Thread management is based on the prototype from Alexey:
https://lore.kernel.org/lkml/[email protected]/

Each thread in the threadpool executes the same function (aka task)
with a different argument tidx.
Threads use a pair of pipes to communicate with the main process.
The threadpool is static (all threads will be spawned at the same time).
Future work could include making it resizable and adding affinity support
(as in Alexey's prototype).
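
As an illustration of the pipe-pair protocol described above, here is a
minimal standalone sketch with a single worker. It is an assumption-laden
simplification, not the patch's code: all names (sketch_thread,
sketch_send, sketch_recv, sketch_worker, sketch_roundtrip) are
hypothetical, and the worker is joinable so that the example can clean
up after itself, whereas the patch later creates detached threads.

```c
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical message set, loosely modelled on the one in this series. */
enum sketch_msg {
	SKETCH_MSG__UNDEFINED = 0,
	SKETCH_MSG__ACK,	/* worker -> main */
	SKETCH_MSG__STOP,	/* main -> worker */
};

struct sketch_thread {
	int from[2];	/* messages from the worker (acks) */
	int to[2];	/* messages to the worker (commands) */
};

static int sketch_send(int fd, enum sketch_msg msg)
{
	return write(fd, &msg, sizeof(msg)) == (ssize_t)sizeof(msg) ? 0 : -1;
}

static enum sketch_msg sketch_recv(int fd)
{
	enum sketch_msg msg = SKETCH_MSG__UNDEFINED;

	if (read(fd, &msg, sizeof(msg)) != (ssize_t)sizeof(msg))
		return SKETCH_MSG__UNDEFINED;
	return msg;
}

static void *sketch_worker(void *arg)
{
	struct sketch_thread *th = arg;

	/* Tell the main thread that we are up. */
	if (sketch_send(th->from[1], SKETCH_MSG__ACK))
		return NULL;

	/* Sleep in read() until a command arrives (or the pipe is closed). */
	if (sketch_recv(th->to[0]) == SKETCH_MSG__STOP)
		sketch_send(th->from[1], SKETCH_MSG__ACK);	/* exit ack */

	return NULL;
}

/* Start one worker, wait for its ack, stop it, wait for the exit ack. */
static int sketch_roundtrip(void)
{
	struct sketch_thread th;
	pthread_t handle;
	int ret = -1;

	if (pipe(th.from))
		return -1;
	if (pipe(th.to))
		goto out_close_from;
	if (pthread_create(&handle, NULL, sketch_worker, &th))
		goto out_close_to;

	if (sketch_recv(th.from[0]) == SKETCH_MSG__ACK &&
	    sketch_send(th.to[1], SKETCH_MSG__STOP) == 0 &&
	    sketch_recv(th.from[0]) == SKETCH_MSG__ACK)
		ret = 0;

	/* Closing the write end unblocks a worker still waiting in read(). */
	close(th.to[1]);
	th.to[1] = -1;
	pthread_join(handle, NULL);

out_close_to:
	close(th.to[0]);
	if (th.to[1] != -1)
		close(th.to[1]);
out_close_from:
	close(th.from[0]);
	close(th.from[1]);
	return ret;
}
```

sketch_roundtrip() returns 0 when both acks were received. The blocking
read() on the command pipe is what lets an idle worker sleep without
polling.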

Suggested-by: Alexey Bayduraev <[email protected]>
Signed-off-by: Riccardo Mancini <[email protected]>
---
tools/perf/util/Build | 1 +
tools/perf/util/workqueue/Build | 1 +
tools/perf/util/workqueue/threadpool.c | 175 +++++++++++++++++++++++++
tools/perf/util/workqueue/threadpool.h | 19 +++
4 files changed, 196 insertions(+)
create mode 100644 tools/perf/util/workqueue/Build
create mode 100644 tools/perf/util/workqueue/threadpool.c
create mode 100644 tools/perf/util/workqueue/threadpool.h

diff --git a/tools/perf/util/Build b/tools/perf/util/Build
index 2d4fa13041789cd6..c7b09701661c869d 100644
--- a/tools/perf/util/Build
+++ b/tools/perf/util/Build
@@ -180,6 +180,7 @@ perf-$(CONFIG_LIBBABELTRACE) += data-convert-bt.o
perf-y += data-convert-json.o

perf-y += scripting-engines/
+perf-y += workqueue/

perf-$(CONFIG_ZLIB) += zlib.o
perf-$(CONFIG_LZMA) += lzma.o
diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build
new file mode 100644
index 0000000000000000..8b72a6cd4e2cba0d
--- /dev/null
+++ b/tools/perf/util/workqueue/Build
@@ -0,0 +1 @@
+perf-y += threadpool.o
diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
new file mode 100644
index 0000000000000000..70c67569f956a3e2
--- /dev/null
+++ b/tools/perf/util/workqueue/threadpool.c
@@ -0,0 +1,175 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include "debug.h"
+#include "asm/bug.h"
+#include "threadpool.h"
+
+enum threadpool_status {
+ THREADPOOL_STATUS__STOPPED, /* no threads */
+ THREADPOOL_STATUS__ERROR, /* errors */
+ THREADPOOL_STATUS__MAX
+};
+
+struct threadpool_struct {
+ int nr_threads; /* number of threads in the pool */
+ struct thread_struct *threads; /* array of threads in the pool */
+ struct task_struct *current_task; /* current executing function */
+ enum threadpool_status status; /* current status of the pool */
+};
+
+struct thread_struct {
+ int idx; /* idx of thread in pool->threads */
+ pid_t tid; /* tid of thread */
+ struct threadpool_struct *pool; /* parent threadpool */
+ struct {
+ int from[2]; /* messages from thread (acks) */
+ int to[2]; /* messages to thread (commands) */
+ } pipes;
+};
+
+/**
+ * init_pipes - initialize all pipes of @thread
+ */
+static void init_pipes(struct thread_struct *thread)
+{
+ thread->pipes.from[0] = -1;
+ thread->pipes.from[1] = -1;
+ thread->pipes.to[0] = -1;
+ thread->pipes.to[1] = -1;
+}
+
+/**
+ * open_pipes - open all pipes of @thread
+ */
+static int open_pipes(struct thread_struct *thread)
+{
+ if (pipe(thread->pipes.from)) {
+ pr_err("threadpool: failed to create comm pipe 'from': %s\n",
+ strerror(errno));
+ return -ENOMEM;
+ }
+
+ if (pipe(thread->pipes.to)) {
+ pr_err("threadpool: failed to create comm pipe 'to': %s\n",
+ strerror(errno));
+ close(thread->pipes.from[0]);
+ thread->pipes.from[0] = -1;
+ close(thread->pipes.from[1]);
+ thread->pipes.from[1] = -1;
+ return -ENOMEM;
+ }
+
+ return 0;
+}
+
+/**
+ * close_pipes - close all communication pipes of @thread
+ */
+static void close_pipes(struct thread_struct *thread)
+{
+ if (thread->pipes.from[0] != -1) {
+ close(thread->pipes.from[0]);
+ thread->pipes.from[0] = -1;
+ }
+ if (thread->pipes.from[1] != -1) {
+ close(thread->pipes.from[1]);
+ thread->pipes.from[1] = -1;
+ }
+ if (thread->pipes.to[0] != -1) {
+ close(thread->pipes.to[0]);
+ thread->pipes.to[0] = -1;
+ }
+ if (thread->pipes.to[1] != -1) {
+ close(thread->pipes.to[1]);
+ thread->pipes.to[1] = -1;
+ }
+}
+
+/**
+ * create_threadpool - create a fixed threadpool with @n_threads threads
+ */
+struct threadpool_struct *create_threadpool(int n_threads)
+{
+ int ret, t;
+ struct threadpool_struct *pool = malloc(sizeof(*pool));
+
+ if (!pool) {
+ pr_err("threadpool: cannot allocate pool: %s\n",
+ strerror(errno));
+ return NULL;
+ }
+
+ if (n_threads <= 0) {
+ pr_err("threadpool: invalid number of threads: %d\n",
+ n_threads);
+ goto out_free_pool;
+ }
+
+ pool->nr_threads = n_threads;
+ pool->current_task = NULL;
+
+ pool->threads = malloc(n_threads * sizeof(*pool->threads));
+ if (!pool->threads) {
+ pr_err("threadpool: cannot allocate threads: %s\n",
+ strerror(errno));
+ goto out_free_pool;
+ }
+
+ for (t = 0; t < n_threads; t++) {
+ pool->threads[t].idx = t;
+ pool->threads[t].tid = -1;
+ pool->threads[t].pool = pool;
+ init_pipes(&pool->threads[t]);
+ }
+
+ for (t = 0; t < n_threads; t++) {
+ ret = open_pipes(&pool->threads[t]);
+ if (ret)
+ goto out_close_pipes;
+ }
+
+ pool->status = THREADPOOL_STATUS__STOPPED;
+
+ return pool;
+
+out_close_pipes:
+ for (t = 0; t < n_threads; t++)
+ close_pipes(&pool->threads[t]);
+
+ free(pool->threads);
+out_free_pool:
+ free(pool);
+ return NULL;
+}
+
+/**
+ * destroy_threadpool - free the @pool and all its resources
+ */
+void destroy_threadpool(struct threadpool_struct *pool)
+{
+ int t;
+
+ if (!pool)
+ return;
+
+ WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
+ && pool->status != THREADPOOL_STATUS__ERROR);
+
+ for (t = 0; t < pool->nr_threads; t++)
+ close_pipes(&pool->threads[t]);
+
+ free(pool->threads);
+ free(pool);
+}
+
+/**
+ * threadpool_size - get number of threads in the threadpool
+ */
+int threadpool_size(struct threadpool_struct *pool)
+{
+ return pool->nr_threads;
+}
diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
new file mode 100644
index 0000000000000000..2b9388c768a0b588
--- /dev/null
+++ b/tools/perf/util/workqueue/threadpool.h
@@ -0,0 +1,19 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __WORKQUEUE_THREADPOOL_H
+#define __WORKQUEUE_THREADPOOL_H
+
+struct threadpool_struct;
+struct task_struct;
+
+typedef void (*task_func_t)(int tidx, struct task_struct *task);
+
+struct task_struct {
+ task_func_t fn;
+};
+
+extern struct threadpool_struct *create_threadpool(int n_threads);
+extern void destroy_threadpool(struct threadpool_struct *pool);
+
+extern int threadpool_size(struct threadpool_struct *pool);
+
+#endif /* __WORKQUEUE_THREADPOOL_H */
--
2.31.1

2021-07-13 12:13:07

by Riccardo Mancini

Subject: [RFC PATCH 02/10] perf tests: add test for workqueue

The test will have subtests covering the threadpool and the workqueue
separately.
This patch only introduces the first subtest, checking that the
threadpool is correctly created and destroyed.
The test will be expanded as new functions are added in the next
patches.

Signed-off-by: Riccardo Mancini <[email protected]>
---
tools/perf/tests/Build | 1 +
tools/perf/tests/builtin-test.c | 9 +++
tools/perf/tests/tests.h | 3 +
tools/perf/tests/workqueue.c | 113 ++++++++++++++++++++++++++++++++
4 files changed, 126 insertions(+)
create mode 100644 tools/perf/tests/workqueue.c

diff --git a/tools/perf/tests/Build b/tools/perf/tests/Build
index 650aec19d49052ca..eda6c78a37cfbc13 100644
--- a/tools/perf/tests/Build
+++ b/tools/perf/tests/Build
@@ -64,6 +64,7 @@ perf-y += parse-metric.o
perf-y += pe-file-parsing.o
perf-y += expand-cgroup.o
perf-y += perf-time-to-tsc.o
+perf-y += workqueue.o

$(OUTPUT)tests/llvm-src-base.c: tests/bpf-script-example.c tests/Build
$(call rule_mkdir)
diff --git a/tools/perf/tests/builtin-test.c b/tools/perf/tests/builtin-test.c
index 5e6242576236325c..2ff5d38ed83a723d 100644
--- a/tools/perf/tests/builtin-test.c
+++ b/tools/perf/tests/builtin-test.c
@@ -360,6 +360,15 @@ static struct test generic_tests[] = {
.func = test__perf_time_to_tsc,
.is_supported = test__tsc_is_supported,
},
+ {
+ .desc = "Test workqueue lib",
+ .func = test__workqueue,
+ .subtest = {
+ .skip_if_fail = false,
+ .get_nr = test__workqueue_subtest_get_nr,
+ .get_desc = test__workqueue_subtest_get_desc,
+ }
+ },
{
.func = NULL,
},
diff --git a/tools/perf/tests/tests.h b/tools/perf/tests/tests.h
index 1100dd55b657b779..9ca67113a7402463 100644
--- a/tools/perf/tests/tests.h
+++ b/tools/perf/tests/tests.h
@@ -127,6 +127,9 @@ int test__parse_metric(struct test *test, int subtest);
int test__pe_file_parsing(struct test *test, int subtest);
int test__expand_cgroup_events(struct test *test, int subtest);
int test__perf_time_to_tsc(struct test *test, int subtest);
+int test__workqueue(struct test *test, int subtest);
+const char *test__workqueue_subtest_get_desc(int subtest);
+int test__workqueue_subtest_get_nr(void);

bool test__bp_signal_is_supported(void);
bool test__bp_account_is_supported(void);
diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
new file mode 100644
index 0000000000000000..1bd4d78c13eb3b14
--- /dev/null
+++ b/tools/perf/tests/workqueue.c
@@ -0,0 +1,113 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/kernel.h>
+#include "tests.h"
+#include "util/debug.h"
+#include "util/workqueue/threadpool.h"
+
+struct threadpool_test_args_t {
+ int pool_size;
+};
+
+static int __threadpool__prepare(struct threadpool_struct **pool, int pool_size)
+{
+ *pool = create_threadpool(pool_size);
+ TEST_ASSERT_VAL("threadpool creation failure", *pool != NULL);
+ TEST_ASSERT_VAL("threadpool size is wrong",
+ threadpool_size(*pool) == pool_size);
+
+ return 0;
+}
+
+static int __threadpool__teardown(struct threadpool_struct *pool)
+{
+ destroy_threadpool(pool);
+
+ return 0;
+}
+
+
+static int __test__threadpool(void *_args)
+{
+ struct threadpool_test_args_t *args = _args;
+ struct threadpool_struct *pool;
+ int ret;
+
+ ret = __threadpool__prepare(&pool, args->pool_size);
+ if (ret)
+ return ret;
+
+ ret = __threadpool__teardown(pool);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+static const struct threadpool_test_args_t threadpool_test_args[] = {
+ {
+ .pool_size = 1
+ },
+ {
+ .pool_size = 2
+ },
+ {
+ .pool_size = 4
+ },
+ {
+ .pool_size = 8
+ },
+ {
+ .pool_size = 16
+ }
+};
+
+struct test_case {
+ const char *desc;
+ int (*func)(void *args);
+ void *args;
+ int n_args;
+ int arg_size;
+};
+
+static struct test_case workqueue_testcase_table[] = {
+ {
+ .desc = "Threadpool",
+ .func = __test__threadpool,
+ .args = (void *) threadpool_test_args,
+ .n_args = (int)ARRAY_SIZE(threadpool_test_args),
+ .arg_size = sizeof(struct threadpool_test_args_t)
+ }
+};
+
+
+int test__workqueue(struct test *test __maybe_unused, int i)
+{
+ int j, ret = 0;
+ struct test_case *tc;
+
+ if (i < 0 || i >= (int)ARRAY_SIZE(workqueue_testcase_table))
+ return -1;
+
+ tc = &workqueue_testcase_table[i];
+
+ for (j = 0; j < tc->n_args; j++) {
+ ret = tc->func(tc->args + (j*tc->arg_size));
+ if (ret)
+ return ret;
+ }
+
+ return 0;
+}
+
+
+int test__workqueue_subtest_get_nr(void)
+{
+ return (int)ARRAY_SIZE(workqueue_testcase_table);
+}
+
+const char *test__workqueue_subtest_get_desc(int i)
+{
+ if (i < 0 || i >= (int)ARRAY_SIZE(workqueue_testcase_table))
+ return NULL;
+ return workqueue_testcase_table[i].desc;
+}
--
2.31.1

2021-07-13 12:13:07

by Riccardo Mancini

Subject: [RFC PATCH 03/10] perf workqueue: add threadpool start and stop functions

This patch adds the start and stop functions, alongside the thread
function.
Each thread will run until a stop signal is received.
Furthermore, start and stop are added to the test.

Thread management is based on the prototype from Alexey:
https://lore.kernel.org/lkml/[email protected]/

Suggested-by: Alexey Bayduraev <[email protected]>
Signed-off-by: Riccardo Mancini <[email protected]>
---
tools/perf/tests/workqueue.c | 13 ++
tools/perf/util/workqueue/threadpool.c | 238 +++++++++++++++++++++++++
tools/perf/util/workqueue/threadpool.h | 5 +
3 files changed, 256 insertions(+)

diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index 1bd4d78c13eb3b14..be377e9897bab4e9 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c
@@ -10,16 +10,29 @@ struct threadpool_test_args_t {

static int __threadpool__prepare(struct threadpool_struct **pool, int pool_size)
{
+ int ret;
+
*pool = create_threadpool(pool_size);
TEST_ASSERT_VAL("threadpool creation failure", *pool != NULL);
TEST_ASSERT_VAL("threadpool size is wrong",
threadpool_size(*pool) == pool_size);

+ ret = start_threadpool(*pool);
+ TEST_ASSERT_VAL("threadpool start failure", ret == 0);
+ TEST_ASSERT_VAL("threadpool is not ready", threadpool_is_ready(*pool));
+
return 0;
}

static int __threadpool__teardown(struct threadpool_struct *pool)
{
+ int ret;
+
+ ret = stop_threadpool(pool);
+ TEST_ASSERT_VAL("threadpool stop failure", ret == 0);
+ TEST_ASSERT_VAL("stopped threadpool is ready",
+ !threadpool_is_ready(pool));
+
destroy_threadpool(pool);

return 0;
diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
index 70c67569f956a3e2..f4635ff782b9388e 100644
--- a/tools/perf/util/workqueue/threadpool.c
+++ b/tools/perf/util/workqueue/threadpool.c
@@ -4,12 +4,23 @@
#include <unistd.h>
#include <errno.h>
#include <string.h>
+#include <pthread.h>
+#include <signal.h>
+#include <syscall.h>
#include "debug.h"
#include "asm/bug.h"
#include "threadpool.h"

+#ifndef HAVE_GETTID
+static inline pid_t gettid(void)
+{
+ return (pid_t)syscall(__NR_gettid);
+}
+#endif
+
enum threadpool_status {
THREADPOOL_STATUS__STOPPED, /* no threads */
+ THREADPOOL_STATUS__READY, /* threads are ready but idle */
THREADPOOL_STATUS__ERROR, /* errors */
THREADPOOL_STATUS__MAX
};
@@ -31,6 +42,21 @@ struct thread_struct {
} pipes;
};

+enum thread_msg {
+ THREAD_MSG__UNDEFINED = 0,
+ THREAD_MSG__ACK, /* from th: create and exit ack */
+ THREAD_MSG__WAKE, /* to th: wake up */
+ THREAD_MSG__STOP, /* to th: exit */
+ THREAD_MSG__MAX
+};
+
+static const char * const thread_msg_tags[] = {
+ "undefined",
+ "ack",
+ "wake",
+ "stop"
+};
+
/**
* init_pipes - initialize all pipes of @thread
*/
@@ -89,6 +115,113 @@ static void close_pipes(struct thread_struct *thread)
}
}

+/**
+ * wait_thread - receive ack from thread
+ *
+ * NB: call only from main thread!
+ */
+static int wait_thread(struct thread_struct *thread)
+{
+ int res;
+ enum thread_msg msg = THREAD_MSG__UNDEFINED;
+
+ res = read(thread->pipes.from[0], &msg, sizeof(msg));
+ if (res < 0) {
+ pr_err("threadpool: failed to recv msg from tid=%d: %s\n",
+ thread->tid, strerror(errno));
+ return -1;
+ }
+ if (msg != THREAD_MSG__ACK) {
+ pr_err("threadpool: received unexpected msg from tid=%d: %s\n",
+ thread->tid, thread_msg_tags[msg]);
+ return -1;
+ }
+
+ pr_debug2("threadpool: received ack from tid=%d\n", thread->tid);
+
+ return 0;
+}
+
+/**
+ * terminate_thread - send stop signal to thread and wait for ack
+ *
+ * NB: call only from main thread!
+ */
+static int terminate_thread(struct thread_struct *thread)
+{
+ int res;
+ enum thread_msg msg = THREAD_MSG__STOP;
+
+ res = write(thread->pipes.to[1], &msg, sizeof(msg));
+ if (res < 0) {
+ pr_err("threadpool: error sending stop msg to tid=%d: %s\n",
+ thread->tid, strerror(errno));
+ return res;
+ }
+
+ res = wait_thread(thread);
+
+ return res;
+}
+
+/**
+ * threadpool_thread - function running on thread
+ *
+ * This function waits for a signal from main thread to start executing
+ * a task.
+ * On completion, it will go back to sleep, waiting for another signal.
+ * Signals are delivered through pipes.
+ */
+static void *threadpool_thread(void *args)
+{
+ struct thread_struct *thread = (struct thread_struct *) args;
+ enum thread_msg msg;
+ int err;
+
+ thread->tid = gettid();
+
+ pr_debug2("threadpool[%d]: started\n", thread->tid);
+
+ for (;;) {
+ msg = THREAD_MSG__ACK;
+ err = write(thread->pipes.from[1], &msg, sizeof(msg));
+ if (err == -1) {
+ pr_err("threadpool[%d]: failed to send ack: %s\n",
+ thread->tid, strerror(errno));
+ break;
+ }
+
+ msg = THREAD_MSG__UNDEFINED;
+ err = read(thread->pipes.to[0], &msg, sizeof(msg));
+ if (err < 0) {
+ pr_err("threadpool[%d]: error receiving msg: %s\n",
+ thread->tid, strerror(errno));
+ break;
+ }
+
+ if (msg != THREAD_MSG__WAKE && msg != THREAD_MSG__STOP) {
+ pr_err("threadpool[%d]: received unexpected msg: %s\n",
+ thread->tid, thread_msg_tags[msg]);
+ break;
+ }
+
+ if (msg == THREAD_MSG__STOP)
+ break;
+ }
+
+ pr_debug2("threadpool[%d]: exit\n", thread->tid);
+
+ msg = THREAD_MSG__ACK;
+ err = write(thread->pipes.from[1], &msg, sizeof(msg));
+ if (err == -1) {
+ pr_err("threadpool[%d]: failed to send ack: %s\n",
+ thread->tid, strerror(errno));
+ return NULL;
+ }
+
+ return NULL;
+}
+
/**
* create_threadpool - create a fixed threadpool with @n_threads threads
*/
@@ -173,3 +306,108 @@ int threadpool_size(struct threadpool_struct *pool)
{
return pool->nr_threads;
}
+
+/**
+ * __start_threadpool - start all threads in the pool.
+ *
+ * This function does not change @pool->status.
+ */
+static int __start_threadpool(struct threadpool_struct *pool)
+{
+ int t, tt, ret = 0, nr_threads = pool->nr_threads;
+ sigset_t full, mask;
+ pthread_t handle;
+ pthread_attr_t attrs;
+
+ sigfillset(&full);
+ if (sigprocmask(SIG_SETMASK, &full, &mask)) {
+ pr_err("Failed to block signals on threads start: %s\n",
+ strerror(errno));
+ return -1;
+ }
+
+ pthread_attr_init(&attrs);
+ pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+ for (t = 0; t < nr_threads; t++) {
+ struct thread_struct *thread = &pool->threads[t];
+
+ if (pthread_create(&handle, &attrs, threadpool_thread, thread)) {
+ for (tt = 0; tt < t; tt++)
+ terminate_thread(&pool->threads[tt]);
+ pr_err("Failed to start threads: %s\n", strerror(errno));
+ ret = -1;
+ goto out_free_attr;
+ }
+
+ if (wait_thread(thread)) {
+ for (tt = 0; tt <= t; tt++)
+ terminate_thread(&pool->threads[tt]);
+ ret = -1;
+ goto out_free_attr;
+ }
+ }
+
+out_free_attr:
+ pthread_attr_destroy(&attrs);
+
+ if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
+ pr_err("Failed to unblock signals on threads start: %s\n",
+ strerror(errno));
+ ret = -1;
+ }
+
+ return ret;
+}
+
+/**
+ * start_threadpool - start all threads in the pool.
+ *
+ * The function blocks until all threads are up and running.
+ */
+int start_threadpool(struct threadpool_struct *pool)
+{
+ int err;
+
+ if (pool->status != THREADPOOL_STATUS__STOPPED) {
+ pr_err("threadpool: starting not stopped pool\n");
+ return -1;
+ }
+
+ err = __start_threadpool(pool);
+ pool->status = err ? THREADPOOL_STATUS__ERROR : THREADPOOL_STATUS__READY;
+ return err;
+}
+
+/**
+ * stop_threadpool - stop all threads in the pool.
+ *
+ * This function blocks waiting for ack from all threads.
+ */
+int stop_threadpool(struct threadpool_struct *pool)
+{
+ int t, ret, err = 0;
+
+ if (pool->status != THREADPOOL_STATUS__READY) {
+ pr_err("threadpool: stopping not ready pool\n");
+ return -1;
+ }
+
+ for (t = 0; t < pool->nr_threads; t++) {
+ ret = terminate_thread(&pool->threads[t]);
+ if (ret && !err)
+ err = -1;
+ }
+
+ pool->status = err ? THREADPOOL_STATUS__ERROR : THREADPOOL_STATUS__STOPPED;
+
+ return err;
+}
+
+/**
+ * threadpool_is_ready - check if the threads are running
+ */
+bool threadpool_is_ready(struct threadpool_struct *pool)
+{
+ return pool->status == THREADPOOL_STATUS__READY;
+}
diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
index 2b9388c768a0b588..b62cad2b2c5dd331 100644
--- a/tools/perf/util/workqueue/threadpool.h
+++ b/tools/perf/util/workqueue/threadpool.h
@@ -14,6 +14,11 @@ struct task_struct {
extern struct threadpool_struct *create_threadpool(int n_threads);
extern void destroy_threadpool(struct threadpool_struct *pool);

+extern int start_threadpool(struct threadpool_struct *pool);
+extern int stop_threadpool(struct threadpool_struct *pool);
+
extern int threadpool_size(struct threadpool_struct *pool);

+extern bool threadpool_is_ready(struct threadpool_struct *pool);
+
#endif /* __WORKQUEUE_THREADPOOL_H */
--
2.31.1

2021-07-13 12:13:08

by Riccardo Mancini

Subject: [RFC PATCH 04/10] perf workqueue: add threadpool execute and wait functions

This patch adds:
- execute_in_threadpool: assigns a task to the threads to execute
asynchronously.
- wait_threadpool: waits for the task to complete on all threads.
Furthermore, testing for these new functions is added.

This patch completes the threadpool.
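
Since a task only carries a function pointer, per-task data has to be
passed by embedding the task in a larger struct and recovering it with
container_of, as the test in this patch does. A minimal standalone
sketch of that pattern follows. All names are hypothetical
(sketch_task mirrors the shape of task_struct, sketch_container_of is a
local stand-in for container_of), and sketch_run_serial calls the task
serially rather than on pool threads.

```c
#include <stddef.h>

/* Local stand-in for the kernel-style container_of() helper. */
#define sketch_container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct sketch_task;
typedef void (*sketch_task_fn)(int tidx, struct sketch_task *task);

/* Generic task: same shape as task_struct in threadpool.h. */
struct sketch_task {
	sketch_task_fn fn;
};

/* User-defined task carrying its own data next to the generic part. */
struct square_task {
	struct sketch_task task;	/* embedded generic task */
	int *out;			/* one slot per thread index */
};

static void square_fn(int tidx, struct sketch_task *task)
{
	/* Recover the enclosing struct from the embedded member. */
	struct square_task *st =
		sketch_container_of(task, struct square_task, task);

	st->out[tidx] = tidx * tidx;
}

/* Stand-in for the pool: runs the task once per thread index, serially. */
static void sketch_run_serial(struct sketch_task *task, int nr_threads)
{
	int t;

	for (t = 0; t < nr_threads; t++)
		task->fn(t, task);
}
```

Each thread index writes to its own slot, so in the real pool no locking
is needed for this kind of per-thread output array.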

Signed-off-by: Riccardo Mancini <[email protected]>
---
tools/perf/tests/workqueue.c | 86 ++++++++++++++++++++-
tools/perf/util/workqueue/threadpool.c | 103 +++++++++++++++++++++++++
tools/perf/util/workqueue/threadpool.h | 5 ++
3 files changed, 193 insertions(+), 1 deletion(-)

diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index be377e9897bab4e9..3c64db8203556847 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c
@@ -1,13 +1,59 @@
// SPDX-License-Identifier: GPL-2.0
+#include <stdlib.h>
#include <linux/kernel.h>
+#include <linux/zalloc.h>
#include "tests.h"
#include "util/debug.h"
#include "util/workqueue/threadpool.h"

+#define DUMMY_FACTOR 100000
+#define N_DUMMY_WORK_SIZES 7
+
struct threadpool_test_args_t {
int pool_size;
};

+struct test_task {
+ struct task_struct task;
+ int n_threads;
+ int *array;
+};
+
+/**
+ * dummy_work - calculates DUMMY_FACTOR * (idx % N_DUMMY_WORK_SIZES) inefficiently
+ *
+ * This function uses modulus to create work items of different sizes.
+ */
+static void dummy_work(int idx)
+{
+ int prod = 0;
+ int k = idx % N_DUMMY_WORK_SIZES;
+ int i, j;
+
+ for (i = 0; i < DUMMY_FACTOR; i++)
+ for (j = 0; j < k; j++)
+ prod++;
+
+ pr_debug3("dummy: %d * %d = %d\n", DUMMY_FACTOR, k, prod);
+}
+
+static void test_task_fn1(int tidx, struct task_struct *task)
+{
+ struct test_task *mtask = container_of(task, struct test_task, task);
+
+ dummy_work(tidx);
+ mtask->array[tidx] = tidx+1;
+}
+
+static void test_task_fn2(int tidx, struct task_struct *task)
+{
+ struct test_task *mtask = container_of(task, struct test_task, task);
+
+ dummy_work(tidx);
+ mtask->array[tidx] = tidx*2;
+}
+
+
static int __threadpool__prepare(struct threadpool_struct **pool, int pool_size)
{
int ret;
@@ -38,21 +84,59 @@ static int __threadpool__teardown(struct threadpool_struct *pool)
return 0;
}

+static int __threadpool__exec_wait(struct threadpool_struct *pool,
+ struct task_struct *task)
+{
+ int ret;
+
+ ret = execute_in_threadpool(pool, task);
+ TEST_ASSERT_VAL("threadpool execute failure", ret == 0);
+ TEST_ASSERT_VAL("threadpool is not executing", threadpool_is_busy(pool));
+
+ ret = wait_threadpool(pool);
+ TEST_ASSERT_VAL("threadpool wait failure", ret == 0);
+ TEST_ASSERT_VAL("waited threadpool is not ready", threadpool_is_ready(pool));
+
+ return 0;
+}

static int __test__threadpool(void *_args)
{
struct threadpool_test_args_t *args = _args;
struct threadpool_struct *pool;
- int ret;
+ int ret, i;
+ struct test_task task;
+
+ task.task.fn = test_task_fn1;
+ task.n_threads = args->pool_size;
+ task.array = calloc(args->pool_size, sizeof(*task.array));

ret = __threadpool__prepare(&pool, args->pool_size);
if (ret)
return ret;

+ ret = __threadpool__exec_wait(pool, &task.task);
+ if (ret)
+ return ret;
+
+ for (i = 0; i < args->pool_size; i++)
+ TEST_ASSERT_VAL("failed array check (1)", task.array[i] == i+1);
+
+ task.task.fn = test_task_fn2;
+
+ ret = __threadpool__exec_wait(pool, &task.task);
+ if (ret)
+ return ret;
+
+ for (i = 0; i < args->pool_size; i++)
+ TEST_ASSERT_VAL("failed array check (2)", task.array[i] == 2*i);
+
ret = __threadpool__teardown(pool);
if (ret)
return ret;

+ free(task.array);
+
return 0;
}

diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
index f4635ff782b9388e..720c7b2a562d6816 100644
--- a/tools/perf/util/workqueue/threadpool.c
+++ b/tools/perf/util/workqueue/threadpool.c
@@ -21,6 +21,7 @@ static inline pid_t gettid(void)
enum threadpool_status {
THREADPOOL_STATUS__STOPPED, /* no threads */
THREADPOOL_STATUS__READY, /* threads are ready but idle */
+ THREADPOOL_STATUS__BUSY, /* threads are busy */
THREADPOOL_STATUS__ERROR, /* errors */
THREADPOOL_STATUS__MAX
};
@@ -164,6 +165,28 @@ static int terminate_thread(struct thread_struct *thread)
return res;
}

+/**
+ * wake_thread - send wake msg to @thread
+ *
+ * This function does not wait for the thread to actually wake
+ * NB: call only from main thread!
+ */
+static int wake_thread(struct thread_struct *thread)
+{
+ int res;
+ enum thread_msg msg = THREAD_MSG__WAKE;
+
+ res = write(thread->pipes.to[1], &msg, sizeof(msg));
+ if (res < 0) {
+ pr_err("threadpool: error sending wake msg: %s\n", strerror(errno));
+ return -1;
+ }
+
+ pr_debug2("threadpool: sent wake msg %s to tid=%d\n",
+ thread_msg_tags[msg], thread->tid);
+ return 0;
+}
+
/**
* threadpool_thread - function running on thread
*
@@ -207,6 +230,15 @@ static void *threadpool_thread(void *args)

if (msg == THREAD_MSG__STOP)
break;
+
+ if (!thread->pool->current_task) {
+ pr_err("threadpool[%d]: received wake without task\n",
+ thread->tid);
+ break;
+ }
+
+ pr_debug("threadpool[%d]: executing task\n", thread->tid);
+ thread->pool->current_task->fn(thread->idx, thread->pool->current_task);
}

pr_debug2("threadpool[%d]: exit\n", thread->tid);
@@ -383,11 +415,16 @@ int start_threadpool(struct threadpool_struct *pool)
* stop_threadpool - stop all threads in the pool.
*
* This function blocks waiting for ack from all threads.
+ * If the pool was busy, it will first wait for the task to finish.
*/
int stop_threadpool(struct threadpool_struct *pool)
{
int t, ret, err = 0;

+ err = wait_threadpool(pool);
+ if (err)
+ return err;
+
if (pool->status != THREADPOOL_STATUS__READY) {
pr_err("threadpool: stopping not ready pool\n");
return -1;
@@ -411,3 +448,69 @@ bool threadpool_is_ready(struct threadpool_struct *pool)
{
return pool->status == THREADPOOL_STATUS__READY;
}
+
+/**
+ * execute_in_threadpool - execute @task on all threads of the @pool
+ *
+ * The task will run asynchronously wrt the main thread.
+ * The task can be waited for with wait_threadpool.
+ *
+ * NB: make sure the pool is ready before calling this, since no queueing is
+ * performed. If you need queueing, have a look at the workqueue.
+ */
+int execute_in_threadpool(struct threadpool_struct *pool, struct task_struct *task)
+{
+ int t, err;
+
+ WARN_ON(pool->status != THREADPOOL_STATUS__READY);
+
+ pool->current_task = task;
+
+ for (t = 0; t < pool->nr_threads; t++) {
+ err = wake_thread(&pool->threads[t]);
+
+ if (err) {
+ pool->status = THREADPOOL_STATUS__ERROR;
+ return err;
+ }
+ }
+
+ pool->status = THREADPOOL_STATUS__BUSY;
+ return 0;
+}
+
+/**
+ * wait_threadpool - wait until all threads in @pool are done
+ *
+ * This function will wait for all threads to finish execution and send their
+ * ack message.
+ *
+ * NB: call only from main thread!
+ */
+int wait_threadpool(struct threadpool_struct *pool)
+{
+ int t, err = 0, ret;
+
+ if (pool->status != THREADPOOL_STATUS__BUSY)
+ return 0;
+
+ for (t = 0; t < pool->nr_threads; t++) {
+ ret = wait_thread(&pool->threads[t]);
+ if (ret) {
+ pool->status = THREADPOOL_STATUS__ERROR;
+ err = -1;
+ }
+ }
+
+ pool->status = err ? THREADPOOL_STATUS__ERROR : THREADPOOL_STATUS__READY;
+ pool->current_task = NULL;
+ return err;
+}
+
+/**
+ * threadpool_is_busy - check if the pool is busy
+ */
+int threadpool_is_busy(struct threadpool_struct *pool)
+{
+ return pool->status == THREADPOOL_STATUS__BUSY;
+}
diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
index b62cad2b2c5dd331..dd9c2103ebe8d23b 100644
--- a/tools/perf/util/workqueue/threadpool.h
+++ b/tools/perf/util/workqueue/threadpool.h
@@ -17,8 +17,13 @@ extern void destroy_threadpool(struct threadpool_struct *pool);
extern int start_threadpool(struct threadpool_struct *pool);
extern int stop_threadpool(struct threadpool_struct *pool);

+extern int execute_in_threadpool(struct threadpool_struct *pool,
+ struct task_struct *task);
+extern int wait_threadpool(struct threadpool_struct *pool);
+
extern int threadpool_size(struct threadpool_struct *pool);

extern bool threadpool_is_ready(struct threadpool_struct *pool);
+extern int threadpool_is_busy(struct threadpool_struct *pool);

#endif /* __WORKQUEUE_THREADPOOL_H */
--
2.31.1

2021-07-13 12:13:57

by Riccardo Mancini

Subject: [RFC PATCH 05/10] perf workqueue: add sparse annotation header

This patch adds a simple header containing sparse annotations.
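
As an illustration of how such annotations are typically used, here is a minimal sketch of a lock wrapper. The `struct counter` type and the `counter_*` helpers are hypothetical and not part of this patchset; the macros repeat only the subset of the header needed here. With a regular compiler the annotations expand to nothing, so they only matter when the file is checked with sparse.

```c
#include <pthread.h>

#ifdef __CHECKER__
# define __acquires(x)	__attribute__((context(x, 0, 1)))
# define __releases(x)	__attribute__((context(x, 1, 0)))
# define __acquire(x)	__context__(x, 1)
# define __release(x)	__context__(x, -1)
#else
# define __acquires(x)
# define __releases(x)
# define __acquire(x)	((void)0)
# define __release(x)	((void)0)
#endif

/* hypothetical example type: a counter protected by a mutex */
struct counter {
	pthread_mutex_t lock;
	long value;
};

/* returns with c->lock held; pthread_mutex_lock itself is not annotated,
 * so the context change is stated explicitly with __acquire() */
static int counter_lock(struct counter *c)
__acquires(&c->lock)
{
	__acquire(&c->lock);
	return pthread_mutex_lock(&c->lock);
}

/* must be called with c->lock held; releases it */
static int counter_unlock(struct counter *c)
__releases(&c->lock)
{
	__release(&c->lock);
	return pthread_mutex_unlock(&c->lock);
}

/* balanced lock/unlock pair: add delta and return the new value */
static long counter_add(struct counter *c, long delta)
{
	long v;

	counter_lock(c);
	c->value += delta;
	v = c->value;
	counter_unlock(c);
	return v;
}
```

A function that returned between `counter_lock()` and `counter_unlock()` would be reported by sparse as a context imbalance, which is the kind of mistake these annotations are meant to catch.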

TODO: what is the best place to put this?

Signed-off-by: Riccardo Mancini <[email protected]>
---
tools/perf/util/workqueue/sparse.h | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
create mode 100644 tools/perf/util/workqueue/sparse.h

diff --git a/tools/perf/util/workqueue/sparse.h b/tools/perf/util/workqueue/sparse.h
new file mode 100644
index 0000000000000000..644f6db8f050ab50
--- /dev/null
+++ b/tools/perf/util/workqueue/sparse.h
@@ -0,0 +1,21 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __WORKQUEUE_SPARSE_H
+#define __WORKQUEUE_SPARSE_H
+
+#ifdef __CHECKER__
+# define __must_hold(x) __attribute__((context(x, 1, 1)))
+# define __acquires(x) __attribute__((context(x, 0, 1)))
+# define __releases(x) __attribute__((context(x, 1, 0)))
+# define __acquire(x) __context__(x, 1)
+# define __release(x) __context__(x, -1)
+# define __cond_lock(x, c) ((c) ? ({ __acquire(x); 1; }) : 0)
+#else
+# define __must_hold(x)
+# define __acquires(x)
+# define __releases(x)
+# define __acquire(x) ((void)0)
+# define __release(x) ((void)0)
+# define __cond_lock(x, c) (c)
+#endif
+
+#endif /* __WORKQUEUE_SPARSE_H */
--
2.31.1

2021-07-13 12:14:09

by Riccardo Mancini

Subject: [RFC PATCH 10/10] perf synthetic-events: use workqueue parallel_for

To generate synthetic events, perf has the option to use multiple
threads. These threads are currently created manually using pthread_create.

This patch replaces the manual pthread_create with a workqueue,
using the parallel_for utility.

Experimental results show that the workqueue has a higher overhead, but
this is repaid by the improved work balancing among threads.
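
To make the balancing argument concrete, here is a small self-contained model (not perf code; all names are hypothetical). It compares the largest per-worker load of an up-front contiguous split, as done by the previous pthread code, with a shared queue where each item is taken by the first worker that becomes free. Per-item costs are an assumed input and queueing overhead is ignored.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_WORKERS 64

/* Largest per-worker load when items are split up front into contiguous
 * blocks (the first n % nr_workers blocks get one extra item). */
static long static_makespan(const long *cost, size_t n, size_t nr_workers)
{
	size_t per, extra, next = 0;
	long worst = 0;

	assert(nr_workers > 0);
	per = n / nr_workers;
	extra = n % nr_workers;

	for (size_t w = 0; w < nr_workers; w++) {
		size_t num = per + (w < extra ? 1 : 0);
		long load = 0;

		for (size_t i = 0; i < num; i++)
			load += cost[next++];
		if (load > worst)
			worst = load;
	}
	return worst;
}

/* Finish time when each item is taken, in order, by whichever worker
 * becomes free first (a model of a shared queue with work_size == 1). */
static long queue_makespan(const long *cost, size_t n, size_t nr_workers)
{
	long busy_until[MAX_WORKERS] = { 0 };
	long worst = 0;

	assert(nr_workers > 0 && nr_workers <= MAX_WORKERS);

	for (size_t i = 0; i < n; i++) {
		size_t first = 0;

		/* pick the worker that becomes free earliest */
		for (size_t w = 1; w < nr_workers; w++)
			if (busy_until[w] < busy_until[first])
				first = w;
		busy_until[first] += cost[i];
	}
	for (size_t w = 0; w < nr_workers; w++)
		if (busy_until[w] > worst)
			worst = busy_until[w];
	return worst;
}
```

With costs {9, 9, 1, 1, 1, 1, 1, 1} and two workers, the static split puts both expensive items on the same worker (load 20), while the queue model finishes at 12. Real /proc entries will have different cost distributions, so this only illustrates the mechanism, not the measured numbers.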

Results of perf bench before and after are reported below:
Command: sudo ./perf bench internals synthesize -t
Average synthesis time in usec is reported.

Laptop (dual core i7 w/ hyperthreading), avg num events ~14200:
 N    pthread (before)           workqueue (after)
 1    70714.400 +-  908.789      73306.000 +- 1597.868
 2    77426.700 +- 2986.579      46782.300 +-  326.221
 3    53176.300 +- 3405.635      41614.100 +-  239.827
 4    50760.900 +-  702.623      41071.300 +-  230.200

VM (16 vCPUs over 16 core Intel Xeon E5-2630L v3), avg num events ~2760:
 N    pthread (before)           workqueue (after)
 1    30309.500 +-  578.283      34252.000 +-  839.474
 2    23815.200 +- 1339.102      28487.200 +- 1423.481
 3    20644.300 +-  311.573      19220.200 +- 1436.024
 4    19091.500 +-  446.109      15048.600 +-  319.138
 5    17574.000 +-  988.612      14938.500 +-  411.078
 6    18908.900 +-  520.676      13997.600 +-  358.668
 7    19275.700 +-  631.989      11371.400 +-  365.038
 8    15671.200 +-  306.727      11964.800 +-  338.021
 9    14660.900 +-  333.218      11762.800 +-  652.763
10    12490.200 +-  579.211      11832.300 +-  200.601
11    18052.900 +-  941.578      13166.900 +-  704.318
12    14253.600 +-  354.332      12012.000 +-  309.724
13    12219.000 +-  516.438      12023.800 +-  273.626
14    15896.600 +-  442.419      11764.600 +-  353.961
15    15087.200 +-  337.612      11942.600 +-  304.102
16    15368.700 +-  336.785      13625.200 +-  715.125

Signed-off-by: Riccardo Mancini <[email protected]>
---
tools/perf/util/synthetic-events.c | 131 ++++++++++++-----------------
1 file changed, 56 insertions(+), 75 deletions(-)

diff --git a/tools/perf/util/synthetic-events.c b/tools/perf/util/synthetic-events.c
index 35aa0c0f7cd955b2..a55c7fa41b4f86d3 100644
--- a/tools/perf/util/synthetic-events.c
+++ b/tools/perf/util/synthetic-events.c
@@ -41,6 +41,7 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
+#include "util/workqueue/workqueue.h"

#define DEFAULT_PROC_MAP_PARSE_TIMEOUT 500

@@ -882,16 +883,13 @@ static int __perf_event__synthesize_threads(struct perf_tool *tool,
perf_event__handler_t process,
struct machine *machine,
bool mmap_data,
- struct dirent **dirent,
- int start,
- int num)
+ char *d_name)
{
union perf_event *comm_event, *mmap_event, *fork_event;
union perf_event *namespaces_event;
int err = -1;
char *end;
pid_t pid;
- int i;

comm_event = malloc(sizeof(comm_event->comm) + machine->id_hdr_size);
if (comm_event == NULL)
@@ -911,24 +909,22 @@ static int __perf_event__synthesize_threads(struct perf_tool *tool,
if (namespaces_event == NULL)
goto out_free_fork;

- for (i = start; i < start + num; i++) {
- if (!isdigit(dirent[i]->d_name[0]))
- continue;
+ if (!isdigit(d_name[0]))
+ goto out_free_namespaces;

- pid = (pid_t)strtol(dirent[i]->d_name, &end, 10);
- /* only interested in proper numerical dirents */
- if (*end)
- continue;
- /*
- * We may race with exiting thread, so don't stop just because
- * one thread couldn't be synthesized.
- */
- __event__synthesize_thread(comm_event, mmap_event, fork_event,
- namespaces_event, pid, 1, process,
- tool, machine, mmap_data);
- }
+ pid = (pid_t)strtol(d_name, &end, 10);
+ /* only interested in proper numerical dirents */
+ if (*end)
+ goto out_free_namespaces;
+ /*
+ * We may race with exiting thread, so don't stop just because
+ * one thread couldn't be synthesized.
+ */
+ __event__synthesize_thread(comm_event, mmap_event, fork_event,
+ namespaces_event, pid, 1, process,
+ tool, machine, mmap_data);
err = 0;
-
+out_free_namespaces:
free(namespaces_event);
out_free_fork:
free(fork_event);
@@ -946,19 +942,15 @@ struct synthesize_threads_arg {
struct machine *machine;
bool mmap_data;
struct dirent **dirent;
- int num;
- int start;
};

-static void *synthesize_threads_worker(void *arg)
+static void synthesize_threads_worker(int i, void *arg)
{
struct synthesize_threads_arg *args = arg;

__perf_event__synthesize_threads(args->tool, args->process,
args->machine, args->mmap_data,
- args->dirent,
- args->start, args->num);
- return NULL;
+ args->dirent[i]->d_name);
}

int perf_event__synthesize_threads(struct perf_tool *tool,
@@ -967,15 +959,14 @@ int perf_event__synthesize_threads(struct perf_tool *tool,
bool mmap_data,
unsigned int nr_threads_synthesize)
{
- struct synthesize_threads_arg *args = NULL;
- pthread_t *synthesize_threads = NULL;
+ struct synthesize_threads_arg args;
char proc_path[PATH_MAX];
struct dirent **dirent;
- int num_per_thread;
- int m, n, i, j;
+ int n, i;
int thread_nr;
- int base = 0;
- int err = -1;
+ int err = -1, ret;
+ struct threadpool_struct *pool;
+ struct workqueue_struct *wq;


if (machine__is_default_guest(machine))
@@ -992,54 +983,44 @@ int perf_event__synthesize_threads(struct perf_tool *tool,
thread_nr = nr_threads_synthesize;

if (thread_nr <= 1) {
- err = __perf_event__synthesize_threads(tool, process,
- machine, mmap_data,
- dirent, base, n);
+ for (i = 0; i < n; i++)
+ err = __perf_event__synthesize_threads(tool, process,
+ machine, mmap_data,
+ dirent[i]->d_name);
goto free_dirent;
}
- if (thread_nr > n)
- thread_nr = n;

- synthesize_threads = calloc(sizeof(pthread_t), thread_nr);
- if (synthesize_threads == NULL)
+ pool = create_threadpool(thread_nr);
+ if (!pool)
goto free_dirent;

- args = calloc(sizeof(*args), thread_nr);
- if (args == NULL)
- goto free_threads;
-
- num_per_thread = n / thread_nr;
- m = n % thread_nr;
- for (i = 0; i < thread_nr; i++) {
- args[i].tool = tool;
- args[i].process = process;
- args[i].machine = machine;
- args[i].mmap_data = mmap_data;
- args[i].dirent = dirent;
- }
- for (i = 0; i < m; i++) {
- args[i].num = num_per_thread + 1;
- args[i].start = i * args[i].num;
- }
- if (i != 0)
- base = args[i-1].start + args[i-1].num;
- for (j = i; j < thread_nr; j++) {
- args[j].num = num_per_thread;
- args[j].start = base + (j - i) * args[i].num;
- }
-
- for (i = 0; i < thread_nr; i++) {
- if (pthread_create(&synthesize_threads[i], NULL,
- synthesize_threads_worker, &args[i]))
- goto out_join;
- }
- err = 0;
-out_join:
- for (i = 0; i < thread_nr; i++)
- pthread_join(synthesize_threads[i], NULL);
- free(args);
-free_threads:
- free(synthesize_threads);
+ err = start_threadpool(pool);
+ if (err)
+ goto free_pool;
+
+ wq = create_workqueue(pool);
+ if (!wq)
+ goto stop_pool;
+
+ args.tool = tool;
+ args.process = process;
+ args.machine = machine;
+ args.mmap_data = mmap_data;
+ args.dirent = dirent;
+
+ ret = parallel_for(wq, 0, n, 1, synthesize_threads_worker, &args);
+ if (ret)
+ err = ret;
+
+ ret = destroy_workqueue(wq);
+ if (ret)
+ err = ret;
+stop_pool:
+ ret = stop_threadpool(pool);
+ if (ret)
+ err = ret;
+free_pool:
+ destroy_threadpool(pool);
free_dirent:
for (i = 0; i < n; i++)
zfree(&dirent[i]);
--
2.31.1

2021-07-13 12:14:58

by Riccardo Mancini

Subject: [RFC PATCH 06/10] perf workqueue: introduce workqueue struct

This patch adds the workqueue definition, along with simple creation and
destruction functions.
Furthermore, a simple subtest is added.

A workqueue is attached to a pool, on which it executes its workers.
The following patches introduce the workers.

Signed-off-by: Riccardo Mancini <[email protected]>
---
tools/perf/tests/workqueue.c | 92 +++++++++++++
tools/perf/util/workqueue/Build | 1 +
tools/perf/util/workqueue/workqueue.c | 184 ++++++++++++++++++++++++++
tools/perf/util/workqueue/workqueue.h | 24 ++++
4 files changed, 301 insertions(+)
create mode 100644 tools/perf/util/workqueue/workqueue.c
create mode 100644 tools/perf/util/workqueue/workqueue.h

diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index 3c64db8203556847..423dc8a92ca2563c 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c
@@ -5,6 +5,7 @@
#include "tests.h"
#include "util/debug.h"
#include "util/workqueue/threadpool.h"
+#include "util/workqueue/workqueue.h"

#define DUMMY_FACTOR 100000
#define N_DUMMY_WORK_SIZES 7
@@ -13,6 +14,11 @@ struct threadpool_test_args_t {
int pool_size;
};

+struct workqueue_test_args_t {
+ int pool_size;
+ int n_work_items;
+};
+
struct test_task {
struct task_struct task;
int n_threads;
@@ -140,6 +146,58 @@ static int __test__threadpool(void *_args)
return 0;
}

+
+static int __workqueue__prepare(struct threadpool_struct **pool,
+ struct workqueue_struct **wq,
+ int pool_size)
+{
+ int ret;
+
+ ret = __threadpool__prepare(pool, pool_size);
+ if (ret)
+ return ret;
+
+ *wq = create_workqueue(*pool);
+ TEST_ASSERT_VAL("workqueue creation failure", *wq);
+ TEST_ASSERT_VAL("workqueue wrong size", workqueue_nr_threads(*wq) == pool_size);
+ TEST_ASSERT_VAL("threadpool is not executing", threadpool_is_busy(*pool));
+
+ return 0;
+}
+
+static int __workqueue__teardown(struct threadpool_struct *pool,
+ struct workqueue_struct *wq)
+{
+ int ret;
+
+ ret = destroy_workqueue(wq);
+ TEST_ASSERT_VAL("workqueue destruction failure", ret == 0);
+
+ ret = __threadpool__teardown(pool);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+static int __test__workqueue(void *_args)
+{
+ struct workqueue_test_args_t *args = _args;
+ struct threadpool_struct *pool;
+ struct workqueue_struct *wq;
+ int ret;
+
+ ret = __workqueue__prepare(&pool, &wq, args->pool_size);
+ if (ret)
+ return ret;
+
+ ret = __workqueue__teardown(pool, wq);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
static const struct threadpool_test_args_t threadpool_test_args[] = {
{
.pool_size = 1
@@ -158,6 +216,33 @@ static const struct threadpool_test_args_t threadpool_test_args[] = {
}
};

+static const struct workqueue_test_args_t workqueue_test_args[] = {
+ {
+ .pool_size = 1,
+ .n_work_items = 1
+ },
+ {
+ .pool_size = 1,
+ .n_work_items = 10
+ },
+ {
+ .pool_size = 2,
+ .n_work_items = 1
+ },
+ {
+ .pool_size = 2,
+ .n_work_items = 100
+ },
+ {
+ .pool_size = 16,
+ .n_work_items = 7
+ },
+ {
+ .pool_size = 16,
+ .n_work_items = 2789
+ }
+};
+
struct test_case {
const char *desc;
int (*func)(void *args);
@@ -173,6 +258,13 @@ static struct test_case workqueue_testcase_table[] = {
.args = (void *) threadpool_test_args,
.n_args = (int)ARRAY_SIZE(threadpool_test_args),
.arg_size = sizeof(struct threadpool_test_args_t)
+ },
+ {
+ .desc = "Workqueue",
+ .func = __test__workqueue,
+ .args = (void *) workqueue_test_args,
+ .n_args = (int)ARRAY_SIZE(workqueue_test_args),
+ .arg_size = sizeof(struct workqueue_test_args_t)
}
};

diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build
index 8b72a6cd4e2cba0d..4af721345c0a6bb7 100644
--- a/tools/perf/util/workqueue/Build
+++ b/tools/perf/util/workqueue/Build
@@ -1 +1,2 @@
perf-y += threadpool.o
+perf-y += workqueue.o
diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
new file mode 100644
index 0000000000000000..5099252a0662e788
--- /dev/null
+++ b/tools/perf/util/workqueue/workqueue.c
@@ -0,0 +1,184 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <pthread.h>
+#include <linux/list.h>
+#include "debug.h"
+#include "workqueue.h"
+
+enum workqueue_status {
+ WORKQUEUE_STATUS__READY, /* wq is ready to receive work */
+ WORKQUEUE_STATUS__ERROR,
+ WORKQUEUE_STATUS__MAX
+};
+
+struct workqueue_struct {
+ pthread_mutex_t lock; /* locking of the thread_pool */
+ pthread_cond_t idle_cond; /* all workers are idle cond */
+ struct threadpool_struct *pool; /* underlying pool */
+ struct task_struct task; /* threadpool task */
+ struct list_head busy_list; /* busy workers */
+ struct list_head idle_list; /* idle workers */
+ struct list_head pending; /* pending work items */
+ int msg_pipe[2]; /* main thread comm pipes */
+ enum workqueue_status status;
+};
+
+/**
+ * worker_thread - worker function executed on threadpool
+ */
+static void worker_thread(int tidx, struct task_struct *task)
+{
+ struct workqueue_struct *wq = container_of(task, struct workqueue_struct, task);
+
+ pr_debug("hi from worker %d. Pool is in status %d\n", tidx, wq->status);
+}
+
+/**
+ * attach_threadpool_to_workqueue - start @wq workers on @pool
+ */
+static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
+ struct threadpool_struct *pool)
+{
+ int err;
+
+ if (!threadpool_is_ready(pool)) {
+ pr_err("workqueue: cannot attach to pool: pool is not ready\n");
+ return -1;
+ }
+
+ wq->pool = pool;
+
+ err = execute_in_threadpool(pool, &wq->task);
+ if (err)
+ return -1;
+
+ return 0;
+}
+
+/**
+ * detach_threadpool_from_workqueue - stop @wq workers on @pool
+ */
+static int detach_threadpool_from_workqueue(struct workqueue_struct *wq)
+{
+ int ret, err = 0;
+
+ if (wq->status != WORKQUEUE_STATUS__READY) {
+ pr_err("workqueue: cannot attach to pool: wq is not ready\n");
+ return -1;
+ }
+
+ ret = wait_threadpool(wq->pool);
+ if (ret) {
+ pr_err("workqueue: error waiting threadpool\n");
+ err = -1;
+ }
+
+ wq->pool = NULL;
+ return err;
+}
+
+/**
+ * create_workqueue - create a workqueue associated to @pool
+ *
+ * Only one workqueue can execute on a pool at a time.
+ */
+struct workqueue_struct *create_workqueue(struct threadpool_struct *pool)
+{
+ int err;
+ struct workqueue_struct *wq = malloc(sizeof(struct workqueue_struct));
+
+
+ err = pthread_mutex_init(&wq->lock, NULL);
+ if (err)
+ goto out_free_wq;
+
+ err = pthread_cond_init(&wq->idle_cond, NULL);
+ if (err)
+ goto out_destroy_mutex;
+
+ wq->pool = NULL;
+ INIT_LIST_HEAD(&wq->busy_list);
+ INIT_LIST_HEAD(&wq->idle_list);
+
+ INIT_LIST_HEAD(&wq->pending);
+
+ err = pipe(wq->msg_pipe);
+ if (err)
+ goto out_destroy_cond;
+
+ wq->task.fn = worker_thread;
+
+ err = attach_threadpool_to_workqueue(wq, pool);
+ if (err)
+ goto out_destroy_cond;
+
+ wq->status = WORKQUEUE_STATUS__READY;
+
+ return wq;
+
+out_destroy_cond:
+ pthread_cond_destroy(&wq->idle_cond);
+out_destroy_mutex:
+ pthread_mutex_destroy(&wq->lock);
+out_free_wq:
+ free(wq);
+ return NULL;
+}
+
+/**
+ * destroy_workqueue - stop @wq workers and destroy @wq
+ */
+int destroy_workqueue(struct workqueue_struct *wq)
+{
+ int err = 0, ret;
+
+ ret = detach_threadpool_from_workqueue(wq);
+ if (ret) {
+ pr_err("workqueue: error detaching from threadpool.\n");
+ err = -1;
+ }
+
+ ret = pthread_mutex_destroy(&wq->lock);
+ if (ret) {
+ err = -1;
+ pr_err("workqueue: error pthread_mutex_destroy: %s\n",
+ strerror(errno));
+ }
+
+ ret = pthread_cond_destroy(&wq->idle_cond);
+ if (ret) {
+ err = -1;
+ pr_err("workqueue: error pthread_cond_destroy: %s\n",
+ strerror(errno));
+ }
+
+ ret = close(wq->msg_pipe[0]);
+ if (ret) {
+ err = -1;
+ pr_err("workqueue: error close msg_pipe[0]: %s\n",
+ strerror(errno));
+ }
+
+ ret = close(wq->msg_pipe[1]);
+ if (ret) {
+ err = -1;
+ pr_err("workqueue: error close msg_pipe[1]: %s\n",
+ strerror(errno));
+ }
+
+ free(wq);
+
+ return err;
+}
+
+/**
+ * workqueue_nr_threads - get size of threadpool underlying @wq
+ */
+int workqueue_nr_threads(struct workqueue_struct *wq)
+{
+ return threadpool_size(wq->pool);
+}
diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h
new file mode 100644
index 0000000000000000..86ec1d69274f41db
--- /dev/null
+++ b/tools/perf/util/workqueue/workqueue.h
@@ -0,0 +1,24 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __WORKQUEUE_WORKQUEUE_H
+#define __WORKQUEUE_WORKQUEUE_H
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <linux/list.h>
+#include "threadpool.h"
+
+struct work_struct;
+typedef void (*work_func_t)(struct work_struct *work);
+
+struct work_struct {
+ struct list_head entry;
+ work_func_t func;
+};
+
+struct workqueue_struct;
+
+extern struct workqueue_struct *create_workqueue(struct threadpool_struct *pool);
+extern int destroy_workqueue(struct workqueue_struct *wq);
+
+extern int workqueue_nr_threads(struct workqueue_struct *wq);
+#endif /* __WORKQUEUE_WORKQUEUE_H */
--
2.31.1

2021-07-13 12:15:15

by Riccardo Mancini

Subject: [RFC PATCH 09/10] perf workqueue: add utility to execute a for loop in parallel

This patch adds parallel_for, which executes a given function over a
range of indexes inside the workqueue, taking care of managing the work
items.
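
The way the index range is cut into balanced work items can be sketched in isolation as follows. This is a standalone model, not the code from the patch: `struct chunk` and `split_range` are hypothetical names, and the sketch offsets every chunk by `from`, whereas the loop in this patch appears to compute the start indexes relative to 0.

```c
#include <assert.h>

struct chunk {
	int start;	/* first index covered by this work item */
	int num;	/* number of consecutive indexes */
};

/*
 * Split [from, to) into balanced chunks.
 * Returns the number of chunks written to @out, or -1 on invalid
 * arguments or if @out is too small.
 */
static int split_range(int from, int to, int work_size, int nr_threads,
		       struct chunk *out, int max_chunks)
{
	int n = to - from;
	int n_items, per_item, extra, start, i;

	if (work_size <= 0 || nr_threads <= 0 || n < 0)
		return -1;
	if (n == 0)
		return 0;

	n_items = (n + work_size - 1) / work_size;	/* ceil(n / work_size) */
	if (n_items < nr_threads)	/* work_size is only a hint */
		n_items = n < nr_threads ? n : nr_threads;
	if (n_items > max_chunks)
		return -1;

	per_item = n / n_items;
	extra = n % n_items;	/* the first 'extra' chunks get one more index */

	start = from;
	for (i = 0; i < n_items; i++) {
		out[i].start = start;
		out[i].num = per_item + (i < extra ? 1 : 0);
		start += out[i].num;
	}
	assert(start == to);	/* the chunks tile the range exactly */
	return n_items;
}
```

For example, 10 indexes with work_size 3 on a single thread give four chunks of sizes 3, 3, 2, 2, and 7 indexes with work_size 16 on four threads are still split into four chunks (2, 2, 2, 1) so that every worker has something to do.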

Signed-off-by: Riccardo Mancini <[email protected]>
---
tools/perf/tests/workqueue.c | 84 +++++++++++++++++
tools/perf/util/workqueue/workqueue.c | 125 ++++++++++++++++++++++++++
tools/perf/util/workqueue/workqueue.h | 7 ++
3 files changed, 216 insertions(+)

diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index f71a839d5752d224..462a17904f2858db 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c
@@ -19,6 +19,12 @@ struct workqueue_test_args_t {
int n_work_items;
};

+struct parallel_for_test_args_t {
+ int pool_size;
+ int n_work_items;
+ int work_size;
+};
+
struct test_task {
struct task_struct task;
int n_threads;
@@ -265,6 +271,44 @@ static int __test__workqueue(void *_args)
return 0;
}

+static void test_pfw_fn(int i, void *args)
+{
+ int *array = args;
+
+ dummy_work(i);
+ array[i] = i+1;
+}
+
+static int __test__parallel_for(void *_args)
+{
+ struct parallel_for_test_args_t *args = _args;
+ struct threadpool_struct *pool;
+ struct workqueue_struct *wq;
+ int ret, i;
+ int *array;
+
+ array = calloc(args->n_work_items, sizeof(*array));
+
+ ret = __workqueue__prepare(&pool, &wq, args->pool_size);
+ if (ret)
+ return ret;
+
+ ret = parallel_for(wq, 0, args->n_work_items, args->work_size,
+ test_pfw_fn, array);
+ TEST_ASSERT_VAL("parallel_for failure", ret == 0);
+
+ for (i = 0; i < args->n_work_items; i++)
+ TEST_ASSERT_VAL("failed array check", array[i] == i+1);
+
+ ret = __workqueue__teardown(pool, wq);
+ if (ret)
+ return ret;
+
+ free(array);
+
+ return 0;
+}
+
static const struct threadpool_test_args_t threadpool_test_args[] = {
{
.pool_size = 1
@@ -310,6 +354,39 @@ static const struct workqueue_test_args_t workqueue_test_args[] = {
}
};

+static const struct parallel_for_test_args_t parallel_for_test_args[] = {
+ {
+ .pool_size = 1,
+ .n_work_items = 1,
+ .work_size = 1
+ },
+ {
+ .pool_size = 1,
+ .n_work_items = 10,
+ .work_size = 3
+ },
+ {
+ .pool_size = 2,
+ .n_work_items = 1,
+ .work_size = 1
+ },
+ {
+ .pool_size = 2,
+ .n_work_items = 100,
+ .work_size = 10
+ },
+ {
+ .pool_size = 16,
+ .n_work_items = 7,
+ .work_size = 2
+ },
+ {
+ .pool_size = 16,
+ .n_work_items = 2789,
+ .work_size = 16
+ }
+};
+
struct test_case {
const char *desc;
int (*func)(void *args);
@@ -332,6 +409,13 @@ static struct test_case workqueue_testcase_table[] = {
.args = (void *) workqueue_test_args,
.n_args = (int)ARRAY_SIZE(workqueue_test_args),
.arg_size = sizeof(struct workqueue_test_args_t)
+ },
+ {
+ .desc = "Workqueue parallel-for",
+ .func = __test__parallel_for,
+ .args = (void *) parallel_for_test_args,
+ .n_args = (int)ARRAY_SIZE(parallel_for_test_args),
+ .arg_size = sizeof(struct parallel_for_test_args_t)
}
};

diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
index 20d196de9500d369..e69ed1568228a261 100644
--- a/tools/perf/util/workqueue/workqueue.c
+++ b/tools/perf/util/workqueue/workqueue.c
@@ -515,3 +515,128 @@ void init_work(struct work_struct *work)
{
INIT_LIST_HEAD(&work->entry);
}
+
+/* Parallel-for utility */
+
+#define ceil_div(a, b) (((a)+(b)-1)/(b))
+
+struct parallel_for_work {
+ struct work_struct work; /* work item that is queued */
+ parallel_for_func_t func; /* function to execute for each item */
+ void *args; /* additional args to pass to func */
+ int start; /* first item to execute */
+ int num; /* number of items to execute */
+};
+
+/**
+ * parallel_for_work_fn - execute parallel_for_work.func in parallel
+ *
+ * This function will be executed by workqueue's workers.
+ */
+static void parallel_for_work_fn(struct work_struct *work)
+{
+ struct parallel_for_work *pfw = container_of(work, struct parallel_for_work, work);
+ int i;
+
+ for (i = 0; i < pfw->num; i++)
+ pfw->func(pfw->start+i, pfw->args);
+}
+
+static inline void init_parallel_for_work(struct parallel_for_work *pfw,
+ parallel_for_func_t func, void *args,
+ int start, int num)
+{
+ init_work(&pfw->work);
+ pfw->work.func = parallel_for_work_fn;
+ pfw->func = func;
+ pfw->args = args;
+ pfw->start = start;
+ pfw->num = num;
+
+ pr_debug2("pfw: start=%d, num=%d\n", start, num);
+}
+
+/**
+ * parallel_for - execute @func in parallel over indexes between @from and @to
+ * @wq: workqueue that will run @func in parallel
+ * @from: first index
+ * @to: last index (excluded)
+ * @work_size: number of indexes to handle on the same work item.
+ * ceil((to-from)/work_size) work items will be added to @wq
+ * NB: this is only a hint. The function will reduce the size of
+ * the work items to fill all workers.
+ * @func: function to execute in parallel
+ * @args: additional arguments to @func
+ *
+ * This function is equivalent to:
+ * for (i = from; i < to; i++) {
+ * // parallel
+ * func(i, args);
+ * }
+ * // sync
+ *
+ * This function takes care of:
+ * - creating balanced work items to submit to workqueue
+ * - submitting the work items to the workqueue
+ * - waiting for completion of the work items
+ * - cleanup of the work items
+ */
+int parallel_for(struct workqueue_struct *wq, int from, int to, int work_size,
+ parallel_for_func_t func, void *args)
+{
+ int n = to-from;
+ int n_work_items;
+ int nr_threads = workqueue_nr_threads(wq);
+ int i, j, start, num, m, base, num_per_item;
+ struct parallel_for_work *pfw_array;
+ int err = 0;
+
+ if (work_size <= 0) {
+ pr_err("workqueue parallel-for: work_size must be >0\n");
+ return -EINVAL;
+ }
+
+ if (to < from) {
+ pr_err("workqueue parallel-for: to must be >= from\n");
+ return -EINVAL;
+ } else if (to == from) {
+ pr_info("workqueue parallel-for: skip since from == to\n");
+ return 0;
+ }
+
+ n_work_items = ceil_div(n, work_size);
+ if (n_work_items < nr_threads)
+ n_work_items = min(n, nr_threads);
+
+ pfw_array = calloc(n_work_items, sizeof(*pfw_array));
+
+ num_per_item = n / n_work_items;
+ m = n % n_work_items;
+
+ for (i = 0; i < m; i++) {
+ num = num_per_item + 1;
+ start = i * num;
+ init_parallel_for_work(&pfw_array[i], func, args, start, num);
+ err = queue_work(wq, &pfw_array[i].work);
+ if (err)
+ goto out;
+ }
+ if (i != 0)
+ base = pfw_array[i-1].start + pfw_array[i-1].num;
+ else
+ base = 0;
+ for (j = i; j < n_work_items; j++) {
+ num = num_per_item;
+ start = base + (j - i) * num;
+ init_parallel_for_work(&pfw_array[j], func, args, start, num);
+ err = queue_work(wq, &pfw_array[j].work);
+ if (err)
+ goto out;
+ }
+
+out:
+ err = flush_workqueue(wq);
+
+ free(pfw_array);
+ return err;
+}
diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h
index 719bd0e5fb0ce7b7..409acacbdba9e60d 100644
--- a/tools/perf/util/workqueue/workqueue.h
+++ b/tools/perf/util/workqueue/workqueue.h
@@ -28,4 +28,11 @@ extern int flush_workqueue(struct workqueue_struct *wq);

extern void init_work(struct work_struct *work);

+/* parallel_for utility */
+
+typedef void (*parallel_for_func_t)(int i, void *args);
+
+extern int parallel_for(struct workqueue_struct *wq, int from, int to, int work_size,
+ parallel_for_func_t func, void *args);
+
#endif /* __WORKQUEUE_WORKQUEUE_H */
--
2.31.1

2021-07-13 12:15:59

by Riccardo Mancini

Subject: [RFC PATCH 07/10] perf workqueue: implement worker thread and management

This patch adds the implementation of the worker thread that is executed
in the threadpool, and all management-related functions.

At startup, a worker registers itself with the workqueue by adding itself
to the idle_list, then it sends an ack back to the main thread. When
creating workers, the main thread waits for the related acks.
Once there is work to do, threads are woken up to perform the work.
Before going to sleep, a thread tries to dequeue a new pending work item.

This registration mechanism has been implemented to allow for dynamic
growth of the workqueue in the future.
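
The ack handshake described above can be sketched standalone as follows. This is a simplified model using plain pthreads and pipes, not the threadpool from this series; `worker_ctx`, `worker_main` and `handshake` are hypothetical names, there is no work execution, and error handling is reduced to the minimum.

```c
#include <pthread.h>
#include <unistd.h>

#define MAX_WORKERS 16

enum msg { MSG_UNDEFINED, MSG_READY, MSG_STOP };

struct worker_ctx {
	int ack_fd;	/* write end of the pipe shared with the main thread */
	int cmd_fd;	/* read end of this worker's private pipe */
};

static void *worker_main(void *arg)
{
	struct worker_ctx *ctx = arg;
	enum msg m = MSG_READY;

	/* ack creation; small pipe writes are atomic, so acks never mix */
	if (write(ctx->ack_fd, &m, sizeof(m)) != (ssize_t)sizeof(m))
		return NULL;

	/* sleep until told to stop, or until the main thread closes the pipe */
	while (read(ctx->cmd_fd, &m, sizeof(m)) == (ssize_t)sizeof(m))
		if (m == MSG_STOP)
			break;
	return NULL;
}

/* Start @nr workers, wait for their acks, then stop them.
 * Returns the number of acks received, or -1 on error. */
static int handshake(int nr)
{
	pthread_t tid[MAX_WORKERS];
	struct worker_ctx ctx[MAX_WORKERS];
	int cmd_wr[MAX_WORKERS];	/* write ends of the private pipes */
	int ack_pipe[2], p[2];
	int started = 0, acks = 0, ok = 1, i;
	enum msg m;

	if (nr <= 0 || nr > MAX_WORKERS || pipe(ack_pipe))
		return -1;

	for (i = 0; i < nr; i++) {
		if (pipe(p))
			break;
		ctx[i].ack_fd = ack_pipe[1];
		ctx[i].cmd_fd = p[0];
		cmd_wr[i] = p[1];
		if (pthread_create(&tid[i], NULL, worker_main, &ctx[i])) {
			close(p[0]);
			close(p[1]);
			break;
		}
		started++;
	}

	/* wait for one ack per started worker */
	for (i = 0; i < started; i++) {
		m = MSG_UNDEFINED;
		if (read(ack_pipe[0], &m, sizeof(m)) == (ssize_t)sizeof(m) &&
		    m == MSG_READY)
			acks++;
	}

	/* stop and reap; closing the write end also ends a worker whose
	 * stop message could not be delivered */
	m = MSG_STOP;
	for (i = 0; i < started; i++) {
		if (write(cmd_wr[i], &m, sizeof(m)) != (ssize_t)sizeof(m))
			ok = 0;
		close(cmd_wr[i]);
		pthread_join(tid[i], NULL);
		close(ctx[i].cmd_fd);
	}
	close(ack_pipe[0]);
	close(ack_pipe[1]);

	return (started == nr && ok) ? acks : -1;
}
```

Calling `handshake(4)` is expected to return 4 once all workers have acknowledged. Depending on the toolchain, the sketch may need to be built with `-pthread`.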

Signed-off-by: Riccardo Mancini <[email protected]>
---
tools/perf/util/workqueue/workqueue.c | 244 +++++++++++++++++++++++++-
1 file changed, 242 insertions(+), 2 deletions(-)

diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
index 5099252a0662e788..5934b14b9ed3c0e1 100644
--- a/tools/perf/util/workqueue/workqueue.c
+++ b/tools/perf/util/workqueue/workqueue.c
@@ -7,8 +7,18 @@
#include <pthread.h>
#include <linux/list.h>
#include "debug.h"
+#include "sparse.h"
#include "workqueue.h"

+enum worker_msg {
+ WORKER_MSG__UNDEFINED,
+ WORKER_MSG__READY, /* from worker: ack */
+ WORKER_MSG__WAKE, /* to worker: wake up */
+ WORKER_MSG__STOP, /* to worker: exit */
+ WORKER_MSG__ERROR,
+ WORKER_MSG__MAX
+};
+
enum workqueue_status {
WORKQUEUE_STATUS__READY, /* wq is ready to receive work */
WORKQUEUE_STATUS__ERROR,
@@ -27,14 +37,217 @@ struct workqueue_struct {
enum workqueue_status status;
};

+struct worker {
+ int tidx; /* idx of thread in pool */
+ struct list_head entry; /* in idle or busy list */
+ struct work_struct *current_work; /* work being processed */
+ int msg_pipe[2]; /* main thread comm pipes*/
+};
+
+#define for_each_busy_worker(wq, m_worker) \
+ list_for_each_entry(m_worker, &wq->busy_list, entry)
+
+#define for_each_idle_worker(wq, m_worker) \
+ list_for_each_entry(m_worker, &wq->idle_list, entry)
+
+static inline int lock_workqueue(struct workqueue_struct *wq)
+__acquires(&wq->lock)
+{
+ __acquire(&wq->lock);
+ return pthread_mutex_lock(&wq->lock);
+}
+
+static inline int unlock_workqueue(struct workqueue_struct *wq)
+__releases(&wq->lock)
+{
+ __release(&wq->lock);
+ return pthread_mutex_unlock(&wq->lock);
+}
+
+/**
+ * available_work - check if @wq has work to do
+ */
+static int available_work(struct workqueue_struct *wq)
+__must_hold(&wq->lock)
+{
+ return !list_empty(&wq->pending);
+}
+
+/**
+ * dequeue_work - retrieve the next work in @wq to be executed by the worker
+ *
+ * Called inside worker.
+ */
+static struct work_struct *dequeue_work(struct workqueue_struct *wq)
+__must_hold(&wq->lock)
+{
+ struct work_struct *work = list_first_entry(&wq->pending, struct work_struct, entry);
+
+ list_del_init(&work->entry);
+ return work;
+}
+
+/**
+ * sleep_worker - worker @w of workqueue @wq goes to sleep
+ *
+ * Called inside worker.
+ * If this was the last busy thread, signal it to the main thread, in case it
+ * was flushing the workqueue.
+ */
+static void sleep_worker(struct workqueue_struct *wq, struct worker *w)
+__must_hold(&wq->lock)
+{
+ list_move(&w->entry, &wq->idle_list);
+ if (list_empty(&wq->busy_list))
+ pthread_cond_signal(&wq->idle_cond);
+}
+
+/**
+ * stop_worker - stop worker @w
+ *
+ * Called from main thread.
+ * Send stop message to worker @w.
+ */
+static int stop_worker(struct worker *w)
+{
+ int ret;
+ enum worker_msg msg;
+
+ msg = WORKER_MSG__STOP;
+ ret = write(w->msg_pipe[1], &msg, sizeof(msg));
+ if (ret < 0) {
+ pr_err("workqueue: error sending stop msg: %s\n",
+ strerror(errno));
+ return -1;
+ }
+
+ return 0;
+}
+
+/**
+ * init_worker - init @w struct
+ * @w: the struct to init
+ * @tidx: index of the executing thread inside the threadpool
+ */
+static int init_worker(struct worker *w, int tidx)
+{
+ if (pipe(w->msg_pipe)) {
+ pr_err("worker[%d]: error opening pipe: %s\n", tidx, strerror(errno));
+ return -1;
+ }
+
+ w->tidx = tidx;
+ w->current_work = NULL;
+ INIT_LIST_HEAD(&w->entry);
+
+ return 0;
+}
+
+/**
+ * fini_worker - deallocate resources used by @w struct
+ */
+static void fini_worker(struct worker *w)
+{
+ close(w->msg_pipe[0]);
+ w->msg_pipe[0] = -1;
+ close(w->msg_pipe[1]);
+ w->msg_pipe[1] = -1;
+}
+
+/**
+ * register_worker - add worker to @wq->idle_list
+ */
+static void register_worker(struct workqueue_struct *wq, struct worker *w)
+__must_hold(&wq->lock)
+{
+ list_move(&w->entry, &wq->idle_list);
+}
+
+/**
+ * unregister_worker - remove worker from @wq->idle_list
+ */
+static void unregister_worker(struct workqueue_struct *wq __maybe_unused,
+ struct worker *w)
+__must_hold(&wq->lock)
+{
+ list_del_init(&w->entry);
+}
+
/**
* worker_thread - worker function executed on threadpool
*/
static void worker_thread(int tidx, struct task_struct *task)
{
struct workqueue_struct *wq = container_of(task, struct workqueue_struct, task);
+ struct worker this_worker;
+ enum worker_msg msg;
+ int ret, init_err;
+
+ init_err = init_worker(&this_worker, tidx);
+ if (init_err) {
+ // send error message to main thread
+ msg = WORKER_MSG__ERROR;
+ } else {
+ lock_workqueue(wq);
+ register_worker(wq, &this_worker);
+ unlock_workqueue(wq);
+
+ // ack worker creation
+ msg = WORKER_MSG__READY;
+ }
+
+ ret = write(wq->msg_pipe[1], &msg, sizeof(msg));
+ if (ret < 0) {
+ pr_err("worker[%d]: error sending msg: %s\n",
+ tidx, strerror(errno));
+
+ if (init_err)
+ return;
+ goto out;
+ }

- pr_debug("hi from worker %d. Pool is in status %d\n", tidx, wq->status);
+ // stop if there have been errors in init
+ if (init_err)
+ return;
+
+ for (;;) {
+ msg = WORKER_MSG__UNDEFINED;
+ ret = read(this_worker.msg_pipe[0], &msg, sizeof(msg));
+ if (ret < 0 || (msg != WORKER_MSG__WAKE && msg != WORKER_MSG__STOP)) {
+ pr_err("worker[%d]: error receiving msg: %s\n",
+ tidx, strerror(errno));
+ break;
+ }
+
+ if (msg == WORKER_MSG__STOP)
+ break;
+
+ // main thread takes care of moving to busy list and assigning current_work
+
+ while (this_worker.current_work) {
+ this_worker.current_work->func(this_worker.current_work);
+
+ lock_workqueue(wq);
+ if (available_work(wq)) {
+ this_worker.current_work = dequeue_work(wq);
+ pr_debug("worker[%d]: dequeued work\n",
+ tidx);
+ } else {
+ this_worker.current_work = NULL;
+ sleep_worker(wq, &this_worker);
+ pr_debug("worker[%d]: going to sleep\n",
+ tidx);
+ }
+ unlock_workqueue(wq);
+ }
+ }
+
+out:
+ lock_workqueue(wq);
+ unregister_worker(wq, &this_worker);
+ unlock_workqueue(wq);
+
+ fini_worker(&this_worker);
}

/**
@@ -43,7 +256,8 @@ static void worker_thread(int tidx, struct task_struct *task)
static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
struct threadpool_struct *pool)
{
- int err;
+ int err, ret, t;
+ enum worker_msg msg;

if (!threadpool_is_ready(pool)) {
pr_err("workqueue: cannot attach to pool: pool is not ready\n");
@@ -56,6 +270,22 @@ static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
if (err)
return -1;

+
+ // wait ack from all threads
+ for (t = 0; t < threadpool_size(pool); t++) {
+ msg = WORKER_MSG__UNDEFINED;
+ ret = read(wq->msg_pipe[0], &msg, sizeof(msg));
+ if (ret < 0) {
+ pr_err("workqueue: error receiving ack: %s\n",
+ strerror(errno));
+ return -1;
+ }
+ if (msg != WORKER_MSG__READY) {
+ pr_err("workqueue: received error\n");
+ return -1;
+ }
+ }
+
return 0;
}

@@ -65,12 +295,22 @@ static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
static int detach_threadpool_from_workqueue(struct workqueue_struct *wq)
{
int ret, err = 0;
+ struct worker *w;

if (wq->status != WORKQUEUE_STATUS__READY) {
pr_err("workqueue: cannot attach to pool: wq is not ready\n");
return -1;
}

+ lock_workqueue(wq);
+ for_each_idle_worker(wq, w) {
+ ret = stop_worker(w);
+ if (ret)
+ err = -1;
+ }
+ unlock_workqueue(wq);
+
+
ret = wait_threadpool(wq->pool);
if (ret) {
pr_err("workqueue: error waiting threadpool\n");
--
2.31.1
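
The start-up handshake in worker_thread() and attach_threadpool_to_workqueue() above comes down to this: every worker writes one status message to a shared pipe, and the main thread reads exactly one message per worker before it continues. What follows is only a minimal standalone sketch of that pattern, not the code from the patch. All demo_* names are made up for illustration, and unlike the patch it drains every ack before reporting an error.

```c
#include <pthread.h>
#include <unistd.h>

#define DEMO_MAX_WORKERS 16

enum demo_msg { DEMO_MSG__UNDEFINED = 0, DEMO_MSG__READY, DEMO_MSG__ERROR };

struct demo_worker_arg {
	int ack_fd;	/* write end of the shared ack pipe */
	int fail;	/* simulate an init failure when non-zero */
};

static void *demo_worker(void *p)
{
	struct demo_worker_arg *arg = p;
	enum demo_msg msg = arg->fail ? DEMO_MSG__ERROR : DEMO_MSG__READY;
	/* sizeof(msg) is far below PIPE_BUF, so concurrent writes do not interleave */
	ssize_t n = write(arg->ack_fd, &msg, sizeof(msg));

	(void)n;	/* a real worker would log the failure here */
	return NULL;
}

/*
 * Start @nr_workers threads and wait for one ack from each of them.
 * Worker @failing_idx (if in range) reports an error instead of READY.
 * Returns the number of READY acks, or -1 if anything went wrong.
 */
static int demo_start_workers(int nr_workers, int failing_idx)
{
	pthread_t handles[DEMO_MAX_WORKERS];
	struct demo_worker_arg args[DEMO_MAX_WORKERS];
	int ack_pipe[2], t, started = 0, ready = 0, err = 0;

	if (nr_workers <= 0 || nr_workers > DEMO_MAX_WORKERS || pipe(ack_pipe))
		return -1;

	for (t = 0; t < nr_workers; t++) {
		args[t].ack_fd = ack_pipe[1];
		args[t].fail = (t == failing_idx);
		if (pthread_create(&handles[t], NULL, demo_worker, &args[t])) {
			err = -1;
			break;
		}
		started++;
	}

	/* read exactly one message per started worker */
	for (t = 0; t < started; t++) {
		enum demo_msg msg = DEMO_MSG__UNDEFINED;

		if (read(ack_pipe[0], &msg, sizeof(msg)) != (ssize_t)sizeof(msg) ||
		    msg != DEMO_MSG__READY)
			err = -1;
		else
			ready++;
	}

	for (t = 0; t < started; t++)
		pthread_join(handles[t], NULL);

	close(ack_pipe[0]);
	close(ack_pipe[1]);

	return err ? -1 : ready;
}
```

Calling demo_start_workers(4, -1) should report four ready workers, while demo_start_workers(4, 2) should fail because worker 2 sends an error message.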

2021-07-13 12:16:06

by Riccardo Mancini

Subject: [RFC PATCH 08/10] perf workqueue: add queue_work and flush_workqueue functions

This patch adds functions to queue work_structs and to wait for their
completion, together with the related tests.

When a new work item is added, the workqueue first checks if there
are idle threads to wake up. If so, it wakes one up with the given work
item; otherwise it just adds the item to the list of pending work items.
A thread which completes a work item will check this list before going
to sleep.
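
The dispatch rule described above can be modelled without any threads. The sketch below is a simplified, single-threaded model under stated assumptions: workers are just counters, the pending list is a small bounded FIFO, and every demo_* name is hypothetical rather than taken from the patch.

```c
#include <stddef.h>

#define DEMO_MAX_PENDING 8

struct demo_wq {
	int nr_idle;			/* workers currently sleeping */
	int nr_busy;			/* workers currently running an item */
	int pending[DEMO_MAX_PENDING];	/* FIFO of queued item ids */
	int head, tail;			/* pending[head..tail) is in use */
};

enum demo_action { DEMO_WOKE_WORKER, DEMO_QUEUED, DEMO_FULL };

/* Producer side: hand the item to an idle worker if there is one, else queue it. */
static enum demo_action demo_queue_work(struct demo_wq *wq, int item)
{
	if (wq->nr_idle > 0) {
		/* the item would be assigned to that worker before waking it */
		wq->nr_idle--;
		wq->nr_busy++;
		return DEMO_WOKE_WORKER;
	}
	if (wq->tail == DEMO_MAX_PENDING)
		return DEMO_FULL;
	wq->pending[wq->tail++] = item;
	return DEMO_QUEUED;
}

/*
 * Worker side, called when a worker finishes an item: take the oldest
 * pending item if there is one (returns its id), otherwise go back to
 * sleep (returns -1).
 */
static int demo_work_done(struct demo_wq *wq)
{
	if (wq->head < wq->tail)
		return wq->pending[wq->head++];
	wq->nr_busy--;
	wq->nr_idle++;
	return -1;
}
```

With a single idle worker, the first queued item wakes it and later items pile up in the FIFO; the worker then drains them in order before it goes idle again.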

Signed-off-by: Riccardo Mancini <[email protected]>
---
tools/perf/tests/workqueue.c | 69 +++++++++++++++++++-
tools/perf/util/workqueue/workqueue.c | 93 +++++++++++++++++++++++++++
tools/perf/util/workqueue/workqueue.h | 7 ++
3 files changed, 168 insertions(+), 1 deletion(-)

diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index 423dc8a92ca2563c..f71a839d5752d224 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c
@@ -146,6 +146,27 @@ static int __test__threadpool(void *_args)
return 0;
}

+struct test_work {
+ struct work_struct work;
+ int i;
+ int *array;
+};
+
+static void test_work_fn1(struct work_struct *work)
+{
+ struct test_work *mwork = container_of(work, struct test_work, work);
+
+ dummy_work(mwork->i);
+ mwork->array[mwork->i] = mwork->i+1;
+}
+
+static void test_work_fn2(struct work_struct *work)
+{
+ struct test_work *mwork = container_of(work, struct test_work, work);
+
+ dummy_work(mwork->i);
+ mwork->array[mwork->i] = mwork->i*2;
+}

static int __workqueue__prepare(struct threadpool_struct **pool,
struct workqueue_struct **wq,
@@ -180,21 +201,67 @@ static int __workqueue__teardown(struct threadpool_struct *pool,
return 0;
}

+static int __workqueue__exec_wait(struct workqueue_struct *wq,
+ int *array, struct test_work *works,
+ work_func_t func, int n_work_items)
+{
+ int ret, i;
+
+ for (i = 0; i < n_work_items; i++) {
+ works[i].array = array;
+ works[i].i = i;
+
+ init_work(&works[i].work);
+ works[i].work.func = func;
+ queue_work(wq, &works[i].work);
+ }
+
+ ret = flush_workqueue(wq);
+ TEST_ASSERT_VAL("workqueue flush failure", ret == 0);
+
+ return 0;
+}
+
+
static int __test__workqueue(void *_args)
{
struct workqueue_test_args_t *args = _args;
struct threadpool_struct *pool;
struct workqueue_struct *wq;
- int ret;
+ int ret, i;
+ int *array;
+ struct test_work *works;
+
+ array = calloc(args->n_work_items, sizeof(*array));
+ works = calloc(args->n_work_items, sizeof(*works));

ret = __workqueue__prepare(&pool, &wq, args->pool_size);
if (ret)
return ret;

+ ret = __workqueue__exec_wait(wq, array, works, test_work_fn1,
+ args->n_work_items);
+ if (ret)
+ return ret;
+
+ for (i = 0; i < args->n_work_items; i++)
+ TEST_ASSERT_VAL("failed array check (1)", array[i] == i+1);
+
+ ret = __workqueue__exec_wait(wq, array, works, test_work_fn2,
+ args->n_work_items);
+ if (ret)
+ return ret;
+
+ for (i = 0; i < args->n_work_items; i++)
+ TEST_ASSERT_VAL("failed array check (2)", array[i] == 2*i);
+
ret = __workqueue__teardown(pool, wq);
if (ret)
return ret;

+ free(array);
+ free(works);
+
return 0;
}

diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
index 5934b14b9ed3c0e1..20d196de9500d369 100644
--- a/tools/perf/util/workqueue/workqueue.c
+++ b/tools/perf/util/workqueue/workqueue.c
@@ -21,6 +21,7 @@ enum worker_msg {

enum workqueue_status {
WORKQUEUE_STATUS__READY, /* wq is ready to receive work */
+ WORKQUEUE_STATUS__STOPPING, /* wq is being destructed */
WORKQUEUE_STATUS__ERROR,
WORKQUEUE_STATUS__MAX
};
@@ -102,6 +103,39 @@ __must_hold(&wq->lock)
pthread_cond_signal(&wq->idle_cond);
}

+/**
+ * wake_worker - wake worker @w of workqueue @wq assigning @work to do
+ *
+ * Called from main thread.
+ * Moves worker from idle to busy list, assigns @work to it and sends it a
+ * wake up message.
+ *
+ * NB: this function releases the lock to be able to send the notification
+ * outside the critical section.
+ */
+static int wake_worker(struct workqueue_struct *wq, struct worker *w,
+ struct work_struct *work)
+__must_hold(&wq->lock)
+__releases(&wq->lock)
+{
+ enum worker_msg msg = WORKER_MSG__WAKE;
+ int err;
+
+ list_move(&w->entry, &wq->busy_list);
+ w->current_work = work;
+ unlock_workqueue(wq);
+
+ // send wake msg outside critical section to reduce time spent inside it
+ err = write(w->msg_pipe[1], &msg, sizeof(msg));
+ if (err < 0) {
+ pr_err("wake_worker[%d]: error sending msg: %s\n",
+ w->tidx, strerror(errno));
+ return -1;
+ }
+
+ return 0;
+}
+
/**
* stop_worker - stop worker @w
*
@@ -302,6 +336,11 @@ static int detach_threadpool_from_workqueue(struct workqueue_struct *wq)
return -1;
}

+ wq->status = WORKQUEUE_STATUS__STOPPING;
+ ret = flush_workqueue(wq);
+ if (ret)
+ return -1;
+
lock_workqueue(wq);
for_each_idle_worker(wq, w) {
ret = stop_worker(w);
@@ -422,3 +461,57 @@ int workqueue_nr_threads(struct workqueue_struct *wq)
{
return threadpool_size(wq->pool);
}
+
+/**
+ * queue_work - add @work to @wq internal queue
+ *
+ * If there are idle threads, one of these will be woken up.
+ * Otherwise, the work is added to the pending list.
+ */
+int queue_work(struct workqueue_struct *wq, struct work_struct *work)
+{
+ int ret = 0;
+ struct worker *chosen_worker;
+
+ // in particular, this can fail if workqueue is marked to be stopping
+ if (wq->status != WORKQUEUE_STATUS__READY) {
+ pr_err("workqueue: trying to queue but workqueue is not ready\n");
+ return -1;
+ }
+
+ lock_workqueue(wq);
+ if (list_empty(&wq->idle_list)) {
+ list_add_tail(&work->entry, &wq->pending);
+ unlock_workqueue(wq);
+ pr_debug("workqueue: queued new work item\n");
+ } else {
+ chosen_worker = list_first_entry(&wq->idle_list, struct worker, entry);
+ ret = wake_worker(wq, chosen_worker, work);
+ pr_debug("workqueue: woke worker %d\n", chosen_worker->tidx);
+ }
+
+ return ret;
+}
+
+/**
+ * flush_workqueue - wait for all currently executed and pending work to finish
+ *
+ * This function blocks until all threads become idle.
+ */
+int flush_workqueue(struct workqueue_struct *wq)
+{
+ lock_workqueue(wq);
+ while (!list_empty(&wq->busy_list))
+ pthread_cond_wait(&wq->idle_cond, &wq->lock);
+ unlock_workqueue(wq);
+
+ return 0;
+}
+
+/**
+ * init_work - initialize the @work struct
+ */
+void init_work(struct work_struct *work)
+{
+ INIT_LIST_HEAD(&work->entry);
+}
diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h
index 86ec1d69274f41db..719bd0e5fb0ce7b7 100644
--- a/tools/perf/util/workqueue/workqueue.h
+++ b/tools/perf/util/workqueue/workqueue.h
@@ -21,4 +21,11 @@ extern struct workqueue_struct *create_workqueue(struct threadpool_struct *pool)
extern int destroy_workqueue(struct workqueue_struct *wq);

extern int workqueue_nr_threads(struct workqueue_struct *wq);
+
+extern int queue_work(struct workqueue_struct *wq, struct work_struct *work);
+
+extern int flush_workqueue(struct workqueue_struct *wq);
+
+extern void init_work(struct work_struct *work);
+
#endif /* __WORKQUEUE_WORKQUEUE_H */
--
2.31.1
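
flush_workqueue() in the patch above blocks on a condition variable until the busy list is empty. A minimal standalone sketch of that wait pattern follows, assuming a plain busy counter instead of the list and using made-up demo_* names; it is an illustration, not the patch's implementation.

```c
#include <pthread.h>

#define DEMO_MAX_WORKERS 16

struct demo_pool {
	pthread_mutex_t lock;
	pthread_cond_t idle_cond;	/* signalled whenever a worker goes idle */
	int nr_busy;			/* workers still running */
	int done;			/* completed work items */
};

static void *demo_busy_worker(void *p)
{
	struct demo_pool *pool = p;

	pthread_mutex_lock(&pool->lock);
	pool->done++;
	pool->nr_busy--;
	pthread_cond_signal(&pool->idle_cond);
	pthread_mutex_unlock(&pool->lock);
	return NULL;
}

/* Block until no worker is busy; the loop guards against spurious wakeups. */
static void demo_flush(struct demo_pool *pool)
{
	pthread_mutex_lock(&pool->lock);
	while (pool->nr_busy > 0)
		pthread_cond_wait(&pool->idle_cond, &pool->lock);
	pthread_mutex_unlock(&pool->lock);
}

/* Start @nr workers, flush, and return how many items completed (-1 on error). */
static int demo_run_and_flush(int nr)
{
	struct demo_pool pool = { .nr_busy = 0, .done = 0 };
	pthread_t handles[DEMO_MAX_WORKERS];
	int t, started = 0, done;

	if (nr < 0 || nr > DEMO_MAX_WORKERS)
		return -1;

	pthread_mutex_init(&pool.lock, NULL);
	pthread_cond_init(&pool.idle_cond, NULL);

	for (t = 0; t < nr; t++) {
		/* account for the worker before it can possibly finish */
		pthread_mutex_lock(&pool.lock);
		pool.nr_busy++;
		pthread_mutex_unlock(&pool.lock);

		if (pthread_create(&handles[t], NULL, demo_busy_worker, &pool)) {
			pthread_mutex_lock(&pool.lock);
			pool.nr_busy--;
			pthread_mutex_unlock(&pool.lock);
			break;
		}
		started++;
	}

	demo_flush(&pool);
	done = pool.done;	/* stable: every started worker has finished */

	for (t = 0; t < started; t++)
		pthread_join(handles[t], NULL);

	pthread_cond_destroy(&pool.idle_cond);
	pthread_mutex_destroy(&pool.lock);

	return started == nr ? done : -1;
}
```

The predicate is re-checked in a loop after each wakeup, which mirrors the `while (!list_empty(&wq->busy_list))` loop in the patch.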

2021-07-13 19:18:05

by Arnaldo Carvalho de Melo

Subject: Re: [RFC PATCH 00/10] perf: add workqueue library and use it in synthetic-events

Em Tue, Jul 13, 2021 at 02:11:11PM +0200, Riccardo Mancini escreveu:
> This patchset introduces a new utility library inside perf/util, which
> provides a work queue abstraction, which loosely follows the Kernel
> workqueue API.
>
> The workqueue abstraction is made up by two components:
> - threadpool: which takes care of managing a pool of threads. It is
> inspired by the prototype for threaded trace in perf-record from Alexey:
> https://lore.kernel.org/lkml/[email protected]/
> - workqueue: manages a shared queue and provides the workers implementation.
>
> On top of the workqueue, a simple parallel-for utility is implemented
> which is then showcased in synthetic-events.c, replacing the previous
> manual pthread-created threads.
>
> Through some experiments with perf bench, I can see how the new
> workqueue has a higher overhead compared to manual creation of threads,
> but is able to more effectively partition work among threads, yielding
> a better result with more threads.
> Furthermore, the overhead could be configured by changing the
> `work_size` (currently 1), aka the number of dirents that are
> processed by a thread before grabbing a lock to get the new work item.
> I experimented with different sizes but, while bigger sizes reduce overhead
> as expected, they do not scale as well to more threads.
>
> I tried to keep the patchset as simple as possible, deferring possible
> improvements and features to future work.
> Naming a few:
> - in order to achieve a better performance, we could consider using
> work-stealing instead of a common queue.
> - affinities in the thread pool, as in Alexey prototype for
> perf-record. Doing so would enable reusing the same threadpool for
> different purposes (evlist open, threaded trace, synthetic threads),
> avoiding having to spin up threads multiple times.
> - resizable threadpool, e.g. for lazy spawning of threads.
>
> @Arnaldo
> Since I wanted the workqueue to provide a similar API to the Kernel's
> workqueue, I followed the naming style I found there, instead of the
> usual object__method style that is typically found in perf.
> Let me know if you'd like me to follow perf style instead.

You did the right thing, that is how we do with other kernel APIs, we
use list_add(), rb_first(), bitmap_weight(), hash_del(), etc.

- Arnaldo

> Thanks,
> Riccardo
>
> Riccardo Mancini (10):
> perf workqueue: threadpool creation and destruction
> perf tests: add test for workqueue
> perf workqueue: add threadpool start and stop functions
> perf workqueue: add threadpool execute and wait functions
> perf workqueue: add sparse annotation header
> perf workqueue: introduce workqueue struct
> perf workqueue: implement worker thread and management
> perf workqueue: add queue_work and flush_workqueue functions
> perf workqueue: add utility to execute a for loop in parallel
> perf synthetic-events: use workqueue parallel_for
>
> tools/perf/tests/Build | 1 +
> tools/perf/tests/builtin-test.c | 9 +
> tools/perf/tests/tests.h | 3 +
> tools/perf/tests/workqueue.c | 453 +++++++++++++++++
> tools/perf/util/Build | 1 +
> tools/perf/util/synthetic-events.c | 131 +++--
> tools/perf/util/workqueue/Build | 2 +
> tools/perf/util/workqueue/sparse.h | 21 +
> tools/perf/util/workqueue/threadpool.c | 516 ++++++++++++++++++++
> tools/perf/util/workqueue/threadpool.h | 29 ++
> tools/perf/util/workqueue/workqueue.c | 642 +++++++++++++++++++++++++
> tools/perf/util/workqueue/workqueue.h | 38 ++
> 12 files changed, 1771 insertions(+), 75 deletions(-)
> create mode 100644 tools/perf/tests/workqueue.c
> create mode 100644 tools/perf/util/workqueue/Build
> create mode 100644 tools/perf/util/workqueue/sparse.h
> create mode 100644 tools/perf/util/workqueue/threadpool.c
> create mode 100644 tools/perf/util/workqueue/threadpool.h
> create mode 100644 tools/perf/util/workqueue/workqueue.c
> create mode 100644 tools/perf/util/workqueue/workqueue.h
>
> --
> 2.31.1
>

--

- Arnaldo

2021-07-14 14:18:53

by Arnaldo Carvalho de Melo

Subject: Re: [RFC PATCH 01/10] perf workqueue: threadpool creation and destruction

Em Tue, Jul 13, 2021 at 02:11:12PM +0200, Riccardo Mancini escreveu:
> The workqueue library is made up by two components:
> - threadpool: handles the lifetime of the threads
> - workqueue: handles work distribution among the threads
>
> This first patch introduces the threadpool, starting from its creation
> and destruction functions.
> Thread management is based on the prototype from Alexey:
> https://lore.kernel.org/lkml/[email protected]/
>
> Each thread in the threadpool executes the same function (aka task)
> with a different argument tidx.
> Threads use a pair of pipes to communicate with the main process.
> The threadpool is static (all threads will be spawned at the same time).
> Future work could include making it resizable and adding affinity support
> (as in Alexey prototype).
>
> Suggested-by: Alexey Bayduraev <[email protected]>
> Signed-off-by: Riccardo Mancini <[email protected]>
> ---
> tools/perf/util/Build | 1 +
> tools/perf/util/workqueue/Build | 1 +
> tools/perf/util/workqueue/threadpool.c | 175 +++++++++++++++++++++++++
> tools/perf/util/workqueue/threadpool.h | 19 +++
> 4 files changed, 196 insertions(+)
> create mode 100644 tools/perf/util/workqueue/Build
> create mode 100644 tools/perf/util/workqueue/threadpool.c
> create mode 100644 tools/perf/util/workqueue/threadpool.h
>
> diff --git a/tools/perf/util/Build b/tools/perf/util/Build
> index 2d4fa13041789cd6..c7b09701661c869d 100644
> --- a/tools/perf/util/Build
> +++ b/tools/perf/util/Build
> @@ -180,6 +180,7 @@ perf-$(CONFIG_LIBBABELTRACE) += data-convert-bt.o
> perf-y += data-convert-json.o
>
> perf-y += scripting-engines/
> +perf-y += workqueue/
>
> perf-$(CONFIG_ZLIB) += zlib.o
> perf-$(CONFIG_LZMA) += lzma.o
> diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build
> new file mode 100644
> index 0000000000000000..8b72a6cd4e2cba0d
> --- /dev/null
> +++ b/tools/perf/util/workqueue/Build
> @@ -0,0 +1 @@
> +perf-y += threadpool.o
> diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
> new file mode 100644
> index 0000000000000000..70c67569f956a3e2
> --- /dev/null
> +++ b/tools/perf/util/workqueue/threadpool.c
> @@ -0,0 +1,175 @@
> +// SPDX-License-Identifier: GPL-2.0
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +#include <errno.h>
> +#include <string.h>
> +#include "debug.h"
> +#include "asm/bug.h"
> +#include "threadpool.h"
> +
> +enum threadpool_status {
> + THREADPOOL_STATUS__STOPPED, /* no threads */
> + THREADPOOL_STATUS__ERROR, /* errors */
> + THREADPOOL_STATUS__MAX
> +};
> +
> +struct threadpool_struct {

Can this be just 'struct threadpool'? I think it's descriptive enough:

> + int nr_threads; /* number of threads in the pool */
> + struct thread_struct *threads; /* array of threads in the pool */
> + struct task_struct *current_task; /* current executing function */
> + enum threadpool_status status; /* current status of the pool */
> +};
> +
> +struct thread_struct {
> + int idx; /* idx of thread in pool->threads */
> + pid_t tid; /* tid of thread */
> + struct threadpool_struct *pool; /* parent threadpool */
> + struct {
> + int from[2]; /* messages from thread (acks) */
> + int to[2]; /* messages to thread (commands) */
> + } pipes;
> +};

This one, since we have already a 'struct thread' in tools/perf, to
represent a PERF_RECORD_FORK, perhaps we can call it 'struct threadpool_entry'?

> +
> +/**
> + * init_pipes - initialize all pipes of @thread
> + */
> +static void init_pipes(struct thread_struct *thread)
> +{
> + thread->pipes.from[0] = -1;
> + thread->pipes.from[1] = -1;
> + thread->pipes.to[0] = -1;
> + thread->pipes.to[1] = -1;
> +}
> +
> +/**
> + * open_pipes - open all pipes of @thread
> + */
> +static int open_pipes(struct thread_struct *thread)

Here please:

threadpool_entry__open_pipes()

It's longer, but helps with ctags/cscope navigation and we can go
directly to it via:

:ta threadpool_entry__open_p<TAB>

While ':ta open_pipes' may go to various places where this idiom is
used.

> +{
> + if (pipe(thread->pipes.from)) {
> + pr_err("threadpool: failed to create comm pipe 'from': %s\n",
> + strerror(errno));
> + return -ENOMEM;
> + }
> +
> + if (pipe(thread->pipes.to)) {
> + pr_err("threadpool: failed to create comm pipe 'to': %s\n",
> + strerror(errno));
> + close(thread->pipes.from[0]);
> + thread->pipes.from[0] = -1;
> + close(thread->pipes.from[1]);
> + thread->pipes.from[1] = -1;
> + return -ENOMEM;
> + }
> +
> + return 0;
> +}
> +
> +/**
> + * close_pipes - close all communication pipes of @thread
> + */
> +static void close_pipes(struct thread_struct *thread)
> +{
> + if (thread->pipes.from[0] != -1) {
> + close(thread->pipes.from[0]);
> + thread->pipes.from[0] = -1;
> + }
> + if (thread->pipes.from[1] != -1) {
> + close(thread->pipes.from[1]);
> + thread->pipes.from[1] = -1;
> + }
> + if (thread->pipes.to[0] != -1) {
> + close(thread->pipes.to[0]);
> + thread->pipes.to[0] = -1;
> + }
> + if (thread->pipes.to[1] != -1) {
> + close(thread->pipes.to[1]);
> + thread->pipes.to[1] = -1;
> + }
> +}
> +
> +/**
> + * create_threadpool - create a fixed threadpool with @n_threads threads
> + */
> +struct threadpool_struct *create_threadpool(int n_threads)


Is this already something the kernel has and thus we should keep the
naming? I couldn't find it in the kernel, so please name it:

struct threadpool *threadpool__new(int nthreads)

> +{
> + int ret, t;
> + struct threadpool_struct *pool = malloc(sizeof(*pool));
> +
> + if (!pool) {
> + pr_err("threadpool: cannot allocate pool: %s\n",
> + strerror(errno));

Humm, pr_err() at this level isn't appropriate, please make callers
complain.

> + return NULL;
> + }
> +
> + if (n_threads <= 0) {
> + pr_err("threadpool: invalid number of threads: %d\n",
> + n_threads);

pr_debug()

> + goto out_free_pool;
> + }
> +
> + pool->nr_threads = n_threads;
> + pool->current_task = NULL;
> +
> + pool->threads = malloc(n_threads * sizeof(*pool->threads));
> + if (!pool->threads) {
> + pr_err("threadpool: cannot allocate threads: %s\n",
> + strerror(errno));
> + goto out_free_pool;
> + }
> +
> + for (t = 0; t < n_threads; t++) {
> + pool->threads[t].idx = t;
> + pool->threads[t].tid = -1;
> + pool->threads[t].pool = pool;
> + init_pipes(&pool->threads[t]);
> + }
> +
> + for (t = 0; t < n_threads; t++) {
> + ret = open_pipes(&pool->threads[t]);
> + if (ret)
> + goto out_close_pipes;
> + }
> +
> + pool->status = THREADPOOL_STATUS__STOPPED;
> +
> + return pool;
> +
> +out_close_pipes:
> + for (t = 0; t < n_threads; t++)
> + close_pipes(&pool->threads[t]);
> +
> + free(pool->threads);
> +out_free_pool:
> + free(pool);
> + return NULL;

Here we can use ERR_PTR()/PTR_ERR() to let the caller know what was the
problem, i.e. we can ditch all the pr_err/pr_debug(), etc and instead
have a threadpool__strerror(struct threadpool *pool, int err) like we
have for 'struct evsel', please take a look at evsel__open_strerror().
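
A rough sketch of what this could look like is below. It is only an illustration under assumptions: the demo_* helpers are local stand-ins for ERR_PTR()/PTR_ERR()/IS_ERR() from linux/err.h, and demo_pool__new()/demo_pool__strerror() with their signatures are hypothetical, not an existing perf API.

```c
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Local stand-ins for the ERR_PTR()/PTR_ERR()/IS_ERR() helpers. */
#define DEMO_MAX_ERRNO 4095

static inline void *demo_err_ptr(long err)
{
	return (void *)err;
}

static inline long demo_ptr_err(const void *ptr)
{
	return (long)ptr;
}

static inline int demo_is_err(const void *ptr)
{
	/* error codes live in the last DEMO_MAX_ERRNO addresses */
	return (uintptr_t)ptr >= (uintptr_t)-DEMO_MAX_ERRNO;
}

struct demo_pool {
	int nr_threads;
};

/* The constructor reports why it failed instead of printing anything. */
static struct demo_pool *demo_pool__new(int nr_threads)
{
	struct demo_pool *pool;

	if (nr_threads <= 0)
		return demo_err_ptr(-EINVAL);

	pool = malloc(sizeof(*pool));
	if (!pool)
		return demo_err_ptr(-ENOMEM);

	pool->nr_threads = nr_threads;
	return pool;
}

/* Caller-side formatting of the error code into a message buffer. */
static int demo_pool__strerror(int err, char *buf, size_t size)
{
	switch (err) {
	case -EINVAL:
		return snprintf(buf, size, "invalid number of threads");
	case -ENOMEM:
		return snprintf(buf, size, "cannot allocate pool");
	default:
		return snprintf(buf, size, "unknown error %d", err);
	}
}
```

The caller then checks the returned pointer with demo_is_err(), and only the top-level tool code decides whether and how to print the message.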


> +}
> +
> +/**
> + * destroy_threadpool - free the @pool and all its resources
> + */
> +void destroy_threadpool(struct threadpool_struct *pool)


void threadpool__delete(struct threadpool *pool)
> +{
> + int t;
> +
> + if (!pool)
> + return;
> +
> + WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
> + && pool->status != THREADPOOL_STATUS__ERROR);
> +
> + for (t = 0; t < pool->nr_threads; t++)
> + close_pipes(&pool->threads[t]);

reset pool->threads[t] to -1

> +
> + free(pool->threads);

zfree
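
For readers unfamiliar with the idiom: the point of a zfree()-style helper is to free a pointer and reset it to NULL in one step, so a stale pointer cannot be used or freed twice. The sketch below uses a local demo_zfree() macro as a stand-in; the macro and the demo_pool names are assumptions for illustration only.

```c
#include <stdlib.h>

/* Local stand-in for a zfree()-style helper: free *ptr and reset it to NULL. */
#define demo_zfree(ptr)			\
	do {				\
		free(*(ptr));		\
		*(ptr) = NULL;		\
	} while (0)

struct demo_pool {
	int nr_threads;
	int *threads;
};

/* Release the per-thread array; safe to call more than once. */
static void demo_pool__exit(struct demo_pool *pool)
{
	demo_zfree(&pool->threads);
	pool->nr_threads = 0;
}
```

A second call to demo_pool__exit() ends up in free(NULL), which is a no-op.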

> + free(pool);
> +}
> +
> +/**
> + * threadpool_size - get number of threads in the threadpool
> + */
> +int threadpool_size(struct threadpool_struct *pool)

threadpool__size()

> +{
> + return pool->nr_threads;
> +}
> diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
> new file mode 100644
> index 0000000000000000..2b9388c768a0b588
> --- /dev/null
> +++ b/tools/perf/util/workqueue/threadpool.h
> @@ -0,0 +1,19 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +#ifndef __WORKQUEUE_THREADPOOL_H
> +#define __WORKQUEUE_THREADPOOL_H
> +
> +struct threadpool_struct;
> +struct task_struct;
> +
> +typedef void (*task_func_t)(int tidx, struct task_struct *task);
> +
> +struct task_struct {
> + task_func_t fn;
> +};
> +
> +extern struct threadpool_struct *create_threadpool(int n_threads);
> +extern void destroy_threadpool(struct threadpool_struct *pool);
> +
> +extern int threadpool_size(struct threadpool_struct *pool);
> +
> +#endif /* __WORKQUEUE_THREADPOOL_H */
> --
> 2.31.1
>

--

- Arnaldo

2021-07-14 15:12:00

by Arnaldo Carvalho de Melo

Subject: Re: [RFC PATCH 02/10] perf tests: add test for workqueue

Em Tue, Jul 13, 2021 at 02:11:13PM +0200, Riccardo Mancini escreveu:
> It will have subtests testing threadpool and workqueue separately.
> This patch only introduces the first subtest, checking that the
> threadpool is correctly created and destructed.
> This test will be expanded when new functions are added in next
> patches.
>
> Signed-off-by: Riccardo Mancini <[email protected]>
> ---
> tools/perf/tests/Build | 1 +
> tools/perf/tests/builtin-test.c | 9 +++
> tools/perf/tests/tests.h | 3 +
> tools/perf/tests/workqueue.c | 113 ++++++++++++++++++++++++++++++++
> 4 files changed, 126 insertions(+)
> create mode 100644 tools/perf/tests/workqueue.c
>
> diff --git a/tools/perf/tests/Build b/tools/perf/tests/Build
> index 650aec19d49052ca..eda6c78a37cfbc13 100644
> --- a/tools/perf/tests/Build
> +++ b/tools/perf/tests/Build
> @@ -64,6 +64,7 @@ perf-y += parse-metric.o
> perf-y += pe-file-parsing.o
> perf-y += expand-cgroup.o
> perf-y += perf-time-to-tsc.o
> +perf-y += workqueue.o
>
> $(OUTPUT)tests/llvm-src-base.c: tests/bpf-script-example.c tests/Build
> $(call rule_mkdir)
> diff --git a/tools/perf/tests/builtin-test.c b/tools/perf/tests/builtin-test.c
> index 5e6242576236325c..2ff5d38ed83a723d 100644
> --- a/tools/perf/tests/builtin-test.c
> +++ b/tools/perf/tests/builtin-test.c
> @@ -360,6 +360,15 @@ static struct test generic_tests[] = {
> .func = test__perf_time_to_tsc,
> .is_supported = test__tsc_is_supported,
> },
> + {
> + .desc = "Test workqueue lib",
> + .func = test__workqueue,
> + .subtest = {
> + .skip_if_fail = false,
> + .get_nr = test__workqueue_subtest_get_nr,
> + .get_desc = test__workqueue_subtest_get_desc,
> + }
> + },
> {
> .func = NULL,
> },
> diff --git a/tools/perf/tests/tests.h b/tools/perf/tests/tests.h
> index 1100dd55b657b779..9ca67113a7402463 100644
> --- a/tools/perf/tests/tests.h
> +++ b/tools/perf/tests/tests.h
> @@ -127,6 +127,9 @@ int test__parse_metric(struct test *test, int subtest);
> int test__pe_file_parsing(struct test *test, int subtest);
> int test__expand_cgroup_events(struct test *test, int subtest);
> int test__perf_time_to_tsc(struct test *test, int subtest);
> +int test__workqueue(struct test *test, int subtest);
> +const char *test__workqueue_subtest_get_desc(int subtest);
> +int test__workqueue_subtest_get_nr(void);
>
> bool test__bp_signal_is_supported(void);
> bool test__bp_account_is_supported(void);
> diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
> new file mode 100644
> index 0000000000000000..1bd4d78c13eb3b14
> --- /dev/null
> +++ b/tools/perf/tests/workqueue.c
> @@ -0,0 +1,113 @@
> +// SPDX-License-Identifier: GPL-2.0
> +#include <linux/kernel.h>
> +#include "tests.h"
> +#include "util/debug.h"
> +#include "util/workqueue/threadpool.h"
> +
> +struct threadpool_test_args_t {
> + int pool_size;
> +};
> +
> +static int __threadpool__prepare(struct threadpool_struct **pool, int pool_size)
> +{
> + *pool = create_threadpool(pool_size);
> + TEST_ASSERT_VAL("threadpool creation failure", *pool != NULL);
> + TEST_ASSERT_VAL("threadpool size is wrong",
> + threadpool_size(*pool) == pool_size);
> +
> + return 0;
> +}
> +
> +static int __threadpool__teardown(struct threadpool_struct *pool)
> +{
> + destroy_threadpool(pool);
> +
> + return 0;
> +}
> +
> +
> +static int __test__threadpool(void *_args)
> +{
> + struct threadpool_test_args_t *args = _args;
> + struct threadpool_struct *pool;
> + int ret;
> +
> + ret = __threadpool__prepare(&pool, args->pool_size);

Turn the last three lines into one;

int ret = __threadpool__prepare(&pool, args->pool_size);

> + if (ret)
> + return ret;
> +
> + ret = __threadpool__teardown(pool);
> + if (ret)
> + return ret;
> +
> + return 0;

Humm, will you add something here in the following csets? Otherwise turn
these 5 lines into one:

return __threadpool__teardown(pool);

> +}
> +
> +static const struct threadpool_test_args_t threadpool_test_args[] = {
> + {
> + .pool_size = 1
> + },
> + {
> + .pool_size = 2
> + },
> + {
> + .pool_size = 4
> + },
> + {
> + .pool_size = 8
> + },
> + {
> + .pool_size = 16
> + }
> +};
> +
> +struct test_case {
> + const char *desc;
> + int (*func)(void *args);
> + void *args;
> + int n_args;
> + int arg_size;
> +};
> +
> +static struct test_case workqueue_testcase_table[] = {
> + {
> + .desc = "Threadpool",
> + .func = __test__threadpool,
> + .args = (void *) threadpool_test_args,
> + .n_args = (int)ARRAY_SIZE(threadpool_test_args),
> + .arg_size = sizeof(struct threadpool_test_args_t)
> + }
> +};
> +
> +
> +int test__workqueue(struct test *test __maybe_unused, int i)
> +{
> + int j, ret = 0;
> + struct test_case *tc;
> +
> + if (i < 0 || i >= (int)ARRAY_SIZE(workqueue_testcase_table))
> + return -1;
> +
> + tc = &workqueue_testcase_table[i];
> +
> + for (j = 0; j < tc->n_args; j++) {
> + ret = tc->func(tc->args + (j*tc->arg_size));
> + if (ret)
> + return ret;
> + }
> +
> + return 0;
> +}
> +
> +
> +int test__workqueue_subtest_get_nr(void)
> +{
> + return (int)ARRAY_SIZE(workqueue_testcase_table);
> +}
> +
> +const char *test__workqueue_subtest_get_desc(int i)
> +{
> + if (i < 0 || i >= (int)ARRAY_SIZE(workqueue_testcase_table))
> + return NULL;
> + return workqueue_testcase_table[i].desc;
> +}
> --
> 2.31.1
>

--

- Arnaldo

2021-07-14 15:17:09

by Arnaldo Carvalho de Melo

Subject: Re: [RFC PATCH 03/10] perf workqueue: add threadpool start and stop functions

Em Tue, Jul 13, 2021 at 02:11:14PM +0200, Riccardo Mancini escreveu:
> This patch adds the start and stop functions, alongside the thread
> function.
> Each thread will run until a stop signal is received.
> Furthermore, start and stop are added to the test.
>
> Thread management is based on the prototype from Alexey:
> https://lore.kernel.org/lkml/[email protected]/
>
> Suggested-by: Alexey Bayduraev <[email protected]>
> Signed-off-by: Riccardo Mancini <[email protected]>
> ---
> tools/perf/tests/workqueue.c | 13 ++
> tools/perf/util/workqueue/threadpool.c | 238 +++++++++++++++++++++++++
> tools/perf/util/workqueue/threadpool.h | 5 +
> 3 files changed, 256 insertions(+)
>
> diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
> index 1bd4d78c13eb3b14..be377e9897bab4e9 100644
> --- a/tools/perf/tests/workqueue.c
> +++ b/tools/perf/tests/workqueue.c
> @@ -10,16 +10,29 @@ struct threadpool_test_args_t {
>
> static int __threadpool__prepare(struct threadpool_struct **pool, int pool_size)
> {
> + int ret;
> +
> *pool = create_threadpool(pool_size);
> TEST_ASSERT_VAL("threadpool creation failure", *pool != NULL);
> TEST_ASSERT_VAL("threadpool size is wrong",
> threadpool_size(*pool) == pool_size);
>
> + ret = start_threadpool(*pool);
> + TEST_ASSERT_VAL("threadpool start failure", ret == 0);
> + TEST_ASSERT_VAL("threadpool is not ready", threadpool_is_ready(*pool));
> +
> return 0;
> }
>
> static int __threadpool__teardown(struct threadpool_struct *pool)
> {
> + int ret;
> +
> + ret = stop_threadpool(pool);

int ret = stop_threadpool(pool);

> + TEST_ASSERT_VAL("threadpool stop failure", ret == 0);
> + TEST_ASSERT_VAL("stopped threadpool is ready",
> + !threadpool_is_ready(pool));
> +
> destroy_threadpool(pool);
>
> return 0;
> diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
> index 70c67569f956a3e2..f4635ff782b9388e 100644
> --- a/tools/perf/util/workqueue/threadpool.c
> +++ b/tools/perf/util/workqueue/threadpool.c
> @@ -4,12 +4,23 @@
> #include <unistd.h>
> #include <errno.h>
> #include <string.h>
> +#include <pthread.h>
> +#include <signal.h>
> +#include <syscall.h>
> #include "debug.h"
> #include "asm/bug.h"
> #include "threadpool.h"
>
> +#ifndef HAVE_GETTID
> +static inline pid_t gettid(void)
> +{
> + return (pid_t)syscall(__NR_gettid);
> +}
> +#endif

Isn't this defined elsewhere? Yeah, when we decide to move it to
tools/lib/workqueue/ we'll need it, but for now, reduce patch size.

> enum threadpool_status {
> THREADPOOL_STATUS__STOPPED, /* no threads */
> + THREADPOOL_STATUS__READY, /* threads are ready but idle */
> THREADPOOL_STATUS__ERROR, /* errors */
> THREADPOOL_STATUS__MAX
> };
> @@ -31,6 +42,21 @@ struct thread_struct {
> } pipes;
> };
>
> +enum thread_msg {
> + THREAD_MSG__UNDEFINED = 0,
> + THREAD_MSG__ACK, /* from th: create and exit ack */
> + THREAD_MSG__WAKE, /* to th: wake up */
> + THREAD_MSG__STOP, /* to th: exit */
> + THREAD_MSG__MAX
> +};
> +
> +static const char * const thread_msg_tags[] = {
> + "undefined",
> + "ack",
> + "wake",
> + "stop"
> +};
> +
> /**
> * init_pipes - initialize all pipes of @thread
> */
> @@ -89,6 +115,113 @@ static void close_pipes(struct thread_struct *thread)
> }
> }
>
> +/**
> + * wait_thread - receive ack from thread
> + *
> + * NB: call only from main thread!
> + */
> +static int wait_thread(struct thread_struct *thread)
> +{
> + int res;
> + enum thread_msg msg = THREAD_MSG__UNDEFINED;
> +
> + res = read(thread->pipes.from[0], &msg, sizeof(msg));

int res = read(thread->pipes.from[0], &msg, sizeof(msg));

> + if (res < 0) {
> + pr_err("threadpool: failed to recv msg from tid=%d: %s\n",
> + thread->tid, strerror(errno));
> + return -1;
> + }
> + if (msg != THREAD_MSG__ACK) {
> + pr_err("threadpool: received unexpected msg from tid=%d: %s\n",
> + thread->tid, thread_msg_tags[msg]);
> + return -1;
> + }
> +
> + pr_debug2("threadpool: received ack from tid=%d\n", thread->tid);
> +
> + return 0;
> +}
> +
> +/**
> + * terminate_thread - send stop signal to thread and wait for ack
> + *
> + * NB: call only from main thread!
> + */
> +static int terminate_thread(struct thread_struct *thread)
> +{
> + int res;
> + enum thread_msg msg = THREAD_MSG__STOP;
> +
> + res = write(thread->pipes.to[1], &msg, sizeof(msg));
> + if (res < 0) {
> + pr_err("threadpool: error sending stop msg to tid=%d: %s\n",
> + thread->tid, strerror(errno));
> + return res;
> + }
> +
> + res = wait_thread(thread);
> +
> + return res;
> +}
> +
> +/**
> + * threadpool_thread - function running on thread
> + *
> + * This function waits for a signal from main thread to start executing
> + * a task.
> + * On completion, it will go back to sleep, waiting for another signal.
> + * Signals are delivered through pipes.
> + */
> +static void *threadpool_thread(void *args)

threadpool_function()

ETOMANY 'thread' in a name.

> +{
> + struct thread_struct *thread = (struct thread_struct *) args;
> + enum thread_msg msg;
> + int err;
> +
> + thread->tid = gettid();
> +
> + pr_debug2("threadpool[%d]: started\n", thread->tid);
> +
> + for (;;) {
> + msg = THREAD_MSG__ACK;
> + err = write(thread->pipes.from[1], &msg, sizeof(msg));
> + if (err == -1) {
> + pr_err("threadpool[%d]: failed to send ack: %s\n",
> + thread->tid, strerror(errno));
> + break;
> + }
> +
> + msg = THREAD_MSG__UNDEFINED;
> + err = read(thread->pipes.to[0], &msg, sizeof(msg));
> + if (err < 0) {
> + pr_err("threadpool[%d]: error receiving msg: %s\n",
> + thread->tid, strerror(errno));
> + break;
> + }
> +
> + if (msg != THREAD_MSG__WAKE && msg != THREAD_MSG__STOP) {
> + pr_err("threadpool[%d]: received unexpected msg: %s\n",
> + thread->tid, thread_msg_tags[msg]);
> + break;
> + }
> +
> + if (msg == THREAD_MSG__STOP)
> + break;
> + }
> +
> + pr_debug2("threadpool[%d]: exit\n", thread->tid);
> +
> + msg = THREAD_MSG__ACK;
> + err = write(thread->pipes.from[1], &msg, sizeof(msg));
> + if (err == -1) {
> + pr_err("threadpool[%d]: failed to send ack: %s\n",
> + thread->tid, strerror(errno));
> + return NULL;
> + }
> +
> + return NULL;
> +}
> +
> /**
> * create_threadpool - create a fixed threadpool with @n_threads threads
> */
> @@ -173,3 +306,108 @@ int threadpool_size(struct threadpool_struct *pool)
> {
> return pool->nr_threads;
> }
> +
> +/**
> + * __start_threadpool - start all threads in the pool.
> + *
> + * This function does not change @pool->status.
> + */
> +static int __start_threadpool(struct threadpool_struct *pool)
> +{
> + int t, tt, ret = 0, nr_threads = pool->nr_threads;
> + sigset_t full, mask;
> + pthread_t handle;
> + pthread_attr_t attrs;
> +
> + sigfillset(&full);
> + if (sigprocmask(SIG_SETMASK, &full, &mask)) {
> + pr_err("Failed to block signals on threads start: %s\n",
> + strerror(errno));
> + return -1;
> + }
> +
> + pthread_attr_init(&attrs);
> + pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
> +
> + for (t = 0; t < nr_threads; t++) {
> + struct thread_struct *thread = &pool->threads[t];
> +
> + if (pthread_create(&handle, &attrs, threadpool_thread, thread)) {
> + for (tt = 1; tt < t; tt++)
> + terminate_thread(thread);
> + pr_err("Failed to start threads: %s\n", strerror(errno));
> + ret = -1;
> + goto out_free_attr;
> + }
> +
> + if (wait_thread(thread)) {
> + for (tt = 1; tt <= t; tt++)
> + terminate_thread(thread);
> + ret = -1;
> + goto out_free_attr;
> + }
> + }
> +
> +out_free_attr:
> + pthread_attr_destroy(&attrs);
> +
> + if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
> + pr_err("Failed to unblock signals on threads start: %s\n",
> + strerror(errno));
> + ret = -1;
> + }
> +
> + return ret;
> +}
> +
> +/**
> + * start_threadpool - start all threads in the pool.
> + *
> + * The function blocks until all threads are up and running.
> + */
> +int start_threadpool(struct threadpool_struct *pool)

int threadpool__start(struct threadpool *pool)

> +{
> + int err;
> +
> + if (pool->status != THREADPOOL_STATUS__STOPPED) {
> + pr_err("threadpool: starting not stopped pool\n");
> + return -1;
> + }
> +
> + err = __start_threadpool(pool);
> + pool->status = err ? THREADPOOL_STATUS__ERROR : THREADPOOL_STATUS__READY;
> + return err;
> +}
> +
> +/**
> + * stop_threadpool - stop all threads in the pool.
> + *
> + * This function blocks waiting for ack from all threads.
> + */
> +int stop_threadpool(struct threadpool_struct *pool)

int threadpool__stop(struct threadpool *pool)

> +{
> + int t, ret, err = 0;
> +
> + if (pool->status != THREADPOOL_STATUS__READY) {
> + pr_err("threadpool: stopping not ready pool\n");
> + return -1;
> + }
> +
> + for (t = 0; t < pool->nr_threads; t++) {
> + ret = terminate_thread(&pool->threads[t]);
> + if (ret && !err)
> + err = -1;
> + }
> +
> + pool->status = err ? THREADPOOL_STATUS__ERROR : THREADPOOL_STATUS__STOPPED;
> +
> + return err;
> +}
> +
> +/**
> + * threadpool_is_ready - check if the threads are running
> + */
> +bool threadpool_is_ready(struct threadpool_struct *pool)

bool threadpool__is_ready(struct threadpool *pool)

> +{
> + return pool->status == THREADPOOL_STATUS__READY;
> +}
> diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
> index 2b9388c768a0b588..b62cad2b2c5dd331 100644
> --- a/tools/perf/util/workqueue/threadpool.h
> +++ b/tools/perf/util/workqueue/threadpool.h
> @@ -14,6 +14,11 @@ struct task_struct {
> extern struct threadpool_struct *create_threadpool(int n_threads);
> extern void destroy_threadpool(struct threadpool_struct *pool);
>
> +extern int start_threadpool(struct threadpool_struct *pool);
> +extern int stop_threadpool(struct threadpool_struct *pool);
> +
> extern int threadpool_size(struct threadpool_struct *pool);
>
> +extern bool threadpool_is_ready(struct threadpool_struct *pool);
> +
> #endif /* __WORKQUEUE_THREADPOOL_H */
> --
> 2.31.1
>

--

- Arnaldo
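
For readers following the start/stop protocol reviewed above, here is a minimal standalone sketch of the same pipe-based ack handshake. It is not the patch's code: `struct worker`, `worker_main()`, `send_msg()`, `recv_msg()`, `command()` and `run_handshake()` are hypothetical names, the thread is joinable rather than detached, and treating a short read or write as an error is an addition of this sketch.

```c
#include <assert.h>
#include <pthread.h>
#include <unistd.h>

enum msg { MSG_UNDEFINED, MSG_ACK, MSG_WAKE, MSG_STOP };

/* Hypothetical stand-in for the patch's thread_struct. */
struct worker {
	int to[2];	/* commands: main -> worker */
	int from[2];	/* acks: worker -> main */
	int wakeups;	/* number of WAKE commands handled */
};

/* Write one whole message; 0 on success, -1 on error or short write. */
static int send_msg(int fd, enum msg m)
{
	return write(fd, &m, sizeof(m)) == (ssize_t)sizeof(m) ? 0 : -1;
}

/* Read one whole message; 0 on success, -1 on error, EOF or short read. */
static int recv_msg(int fd, enum msg *m)
{
	*m = MSG_UNDEFINED;
	return read(fd, m, sizeof(*m)) == (ssize_t)sizeof(*m) ? 0 : -1;
}

static void *worker_main(void *arg)
{
	struct worker *w = arg;
	enum msg m;

	for (;;) {
		/* Ack: either "started" or "previous command handled". */
		if (send_msg(w->from[1], MSG_ACK))
			return NULL;
		if (recv_msg(w->to[0], &m))
			return NULL;
		if (m == MSG_STOP)
			break;
		if (m != MSG_WAKE)
			return NULL;
		w->wakeups++;
	}
	/* Final ack so the main thread knows the STOP was seen. */
	send_msg(w->from[1], MSG_ACK);
	return NULL;
}

/* Send one command and wait for the matching ack. */
static int command(struct worker *w, enum msg cmd)
{
	enum msg m;

	if (send_msg(w->to[1], cmd) || recv_msg(w->from[0], &m))
		return -1;
	return m == MSG_ACK ? 0 : -1;
}

/* Start a worker, wake it @n_wakes times, stop it; returns wakeups or -1. */
int run_handshake(int n_wakes)
{
	struct worker w = { .wakeups = 0 };
	pthread_t handle;
	enum msg m;
	int i, ret = -1;

	if (pipe(w.to))
		return -1;
	if (pipe(w.from))
		goto out_close_to;
	if (pthread_create(&handle, NULL, worker_main, &w))
		goto out_close_from;

	/* Wait for the "started" ack, then drive the worker. */
	if (recv_msg(w.from[0], &m) == 0 && m == MSG_ACK) {
		ret = 0;
		for (i = 0; i < n_wakes && !ret; i++)
			ret = command(&w, MSG_WAKE);
		if (command(&w, MSG_STOP))
			ret = -1;
	}

	close(w.to[1]);		/* EOF unblocks the worker if it is still reading */
	w.to[1] = -1;
	pthread_join(handle, NULL);
	if (!ret)
		ret = w.wakeups;
out_close_from:
	close(w.from[0]);
	close(w.from[1]);
out_close_to:
	close(w.to[0]);
	if (w.to[1] >= 0)
		close(w.to[1]);
	return ret;
}
```

Calling `run_handshake(3)` should walk through one "started" ack, three WAKE/ack round trips and one STOP/ack round trip. Closing the command pipe before joining is a choice made for this sketch so that a failed handshake cannot leave the worker blocked in read().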

2021-07-14 15:24:00

by Arnaldo Carvalho de Melo

Subject: Re: [RFC PATCH 06/10] perf workqueue: introduce workqueue struct

Em Tue, Jul 13, 2021 at 02:11:17PM +0200, Riccardo Mancini escreveu:
> This patch adds the workqueue definition, along with simple creation and
> destruction functions.
> Furthermore, a simple subtest is added.
>
> A workqueue is attached to a pool, on which it executes its workers.
> Next patches will introduce workers.
>
> Signed-off-by: Riccardo Mancini <[email protected]>
> ---
> tools/perf/tests/workqueue.c | 92 +++++++++++++
> tools/perf/util/workqueue/Build | 1 +
> tools/perf/util/workqueue/workqueue.c | 184 ++++++++++++++++++++++++++
> tools/perf/util/workqueue/workqueue.h | 24 ++++
> 4 files changed, 301 insertions(+)
> create mode 100644 tools/perf/util/workqueue/workqueue.c
> create mode 100644 tools/perf/util/workqueue/workqueue.h
>
> diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
> index 3c64db8203556847..423dc8a92ca2563c 100644
> --- a/tools/perf/tests/workqueue.c
> +++ b/tools/perf/tests/workqueue.c
> @@ -5,6 +5,7 @@
> #include "tests.h"
> #include "util/debug.h"
> #include "util/workqueue/threadpool.h"
> +#include "util/workqueue/workqueue.h"
>
> #define DUMMY_FACTOR 100000
> #define N_DUMMY_WORK_SIZES 7
> @@ -13,6 +14,11 @@ struct threadpool_test_args_t {
> int pool_size;
> };
>
> +struct workqueue_test_args_t {
> + int pool_size;
> + int n_work_items;
> +};
> +
> struct test_task {
> struct task_struct task;
> int n_threads;
> @@ -140,6 +146,58 @@ static int __test__threadpool(void *_args)
> return 0;
> }
>
> +
> +static int __workqueue__prepare(struct threadpool_struct **pool,
> + struct workqueue_struct **wq,
> + int pool_size)
> +{
> + int ret;
> +
> + ret = __threadpool__prepare(pool, pool_size);
> + if (ret)
> + return ret;
> +
> + *wq = create_workqueue(*pool);
> + TEST_ASSERT_VAL("workqueue creation failure", *wq);
> + TEST_ASSERT_VAL("workqueue wrong size", workqueue_nr_threads(*wq) == pool_size);
> + TEST_ASSERT_VAL("threadpool is not executing", threadpool_is_busy(*pool));
> +
> + return 0;
> +}
> +
> +static int __workqueue__teardown(struct threadpool_struct *pool,
> + struct workqueue_struct *wq)
> +{
> + int ret;
> +
> + ret = destroy_workqueue(wq);
> + TEST_ASSERT_VAL("workqueue detruction failure", ret == 0);
> +
> + ret = __threadpool__teardown(pool);
> + if (ret)
> + return ret;
> +
> + return 0;
> +}
> +
> +static int __test__workqueue(void *_args)
> +{
> + struct workqueue_test_args_t *args = _args;
> + struct threadpool_struct *pool;
> + struct workqueue_struct *wq;
> + int ret;
> +
> + ret = __workqueue__prepare(&pool, &wq, args->pool_size);
> + if (ret)
> + return ret;
> +
> + ret = __workqueue__teardown(pool, wq);
> + if (ret)
> + return ret;
> +
> + return 0;
> +}
> +
> static const struct threadpool_test_args_t threadpool_test_args[] = {
> {
> .pool_size = 1
> @@ -158,6 +216,33 @@ static const struct threadpool_test_args_t threadpool_test_args[] = {
> }
> };
>
> +static const struct workqueue_test_args_t workqueue_test_args[] = {
> + {
> + .pool_size = 1,
> + .n_work_items = 1
> + },
> + {
> + .pool_size = 1,
> + .n_work_items = 10
> + },
> + {
> + .pool_size = 2,
> + .n_work_items = 1
> + },
> + {
> + .pool_size = 2,
> + .n_work_items = 100
> + },
> + {
> + .pool_size = 16,
> + .n_work_items = 7
> + },
> + {
> + .pool_size = 16,
> + .n_work_items = 2789
> + }
> +};
> +
> struct test_case {
> const char *desc;
> int (*func)(void *args);
> @@ -173,6 +258,13 @@ static struct test_case workqueue_testcase_table[] = {
> .args = (void *) threadpool_test_args,
> .n_args = (int)ARRAY_SIZE(threadpool_test_args),
> .arg_size = sizeof(struct threadpool_test_args_t)
> + },
> + {
> + .desc = "Workqueue",
> + .func = __test__workqueue,
> + .args = (void *) workqueue_test_args,
> + .n_args = (int)ARRAY_SIZE(workqueue_test_args),
> + .arg_size = sizeof(struct workqueue_test_args_t)
> }
> };
>
> diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build
> index 8b72a6cd4e2cba0d..4af721345c0a6bb7 100644
> --- a/tools/perf/util/workqueue/Build
> +++ b/tools/perf/util/workqueue/Build
> @@ -1 +1,2 @@
> perf-y += threadpool.o
> +perf-y += workqueue.o
> diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
> new file mode 100644
> index 0000000000000000..5099252a0662e788
> --- /dev/null
> +++ b/tools/perf/util/workqueue/workqueue.c
> @@ -0,0 +1,184 @@
> +// SPDX-License-Identifier: GPL-2.0
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +#include <errno.h>
> +#include <string.h>
> +#include <pthread.h>
> +#include <linux/list.h>
> +#include "debug.h"
> +#include "workqueue.h"
> +
> +enum workqueue_status {
> + WORKQUEUE_STATUS__READY, /* wq is ready to receive work */
> + WORKQUEUE_STATUS__ERROR,
> + WORKQUEUE_STATUS__MAX
> +};
> +
> +struct workqueue_struct {
> + pthread_mutex_t lock; /* locking of the thread_pool */
> + pthread_cond_t idle_cond; /* all workers are idle cond */
> + struct threadpool_struct *pool; /* underlying pool */
> + struct task_struct task; /* threadpool task */
> + struct list_head busy_list; /* busy workers */
> + struct list_head idle_list; /* idle workers */
> + struct list_head pending; /* pending work items */
> + int msg_pipe[2]; /* main thread comm pipes */
> + enum workqueue_status status;
> +};
> +
> +/**
> + * worker_thread - worker function executed on threadpool
> + */
> +static void worker_thread(int tidx, struct task_struct *task)
> +{
> + struct workqueue_struct *wq = container_of(task, struct workqueue_struct, task);
> +
> + pr_debug("hi from worker %d. Pool is in status %d\n", tidx, wq->status);
> +}
> +
> +/**
> + * attach_threadpool_to_workqueue - start @wq workers on @pool
> + */
> +static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
> + struct threadpool_struct *pool)
> +{
> + int err;
> +
> + if (!threadpool_is_ready(pool)) {
> + pr_err("workqueue: cannot attach to pool: pool is not ready\n");
> + return -1;
> + }
> +
> + wq->pool = pool;
> +
> + err = execute_in_threadpool(pool, &wq->task);
> + if (err)
> + return -1;
> +
> + return 0;
> +}
> +
> +/**
> + * detach_threadpool_from_workqueue - stop @wq workers on @pool
> + */
> +static int detach_threadpool_from_workqueue(struct workqueue_struct *wq)
> +{
> + int ret, err = 0;
> +
> + if (wq->status != WORKQUEUE_STATUS__READY) {
> + pr_err("workqueue: cannot attach to pool: wq is not ready\n");
> + return -1;
> + }
> +
> + ret = wait_threadpool(wq->pool);
> + if (ret) {
> + pr_err("workqueue: error waiting threadpool\n");
> + err = -1;
> + }
> +
> + wq->pool = NULL;
> + return err;
> +}
> +
> +/**
> + * create_workqueue - create a workqueue associated to @pool
> + *
> + * Only one workqueue can execute on a pool at a time.
> + */
> +struct workqueue_struct *create_workqueue(struct threadpool_struct *pool)

I wonder if we should use the exact same kernel signature and not pass a
threadpool, essentially having just one threadpool in tools/perf/ that
is used by create_workqueue(void)?

> +{
> + int err;
> + struct workqueue_struct *wq = malloc(sizeof(struct workqueue_struct));
> +
> +
> + err = pthread_mutex_init(&wq->lock, NULL);
> + if (err)
> + goto out_free_wq;
> +
> + err = pthread_cond_init(&wq->idle_cond, NULL);
> + if (err)
> + goto out_destroy_mutex;
> +
> + wq->pool = NULL;
> + INIT_LIST_HEAD(&wq->busy_list);
> + INIT_LIST_HEAD(&wq->idle_list);
> +
> + INIT_LIST_HEAD(&wq->pending);
> +
> + err = pipe(wq->msg_pipe);
> + if (err)
> + goto out_destroy_cond;
> +
> + wq->task.fn = worker_thread;
> +
> + err = attach_threadpool_to_workqueue(wq, pool);
> + if (err)
> + goto out_destroy_cond;
> +
> + wq->status = WORKQUEUE_STATUS__READY;
> +
> + return wq;
> +
> +out_destroy_cond:
> + pthread_cond_destroy(&wq->idle_cond);
> +out_destroy_mutex:
> + pthread_mutex_destroy(&wq->lock);
> +out_free_wq:
> + free(wq);
> + return NULL;
> +}
> +
> +/**
> + * destroy_workqueue - stop @wq workers and destroy @wq
> + */
> +int destroy_workqueue(struct workqueue_struct *wq)
> +{
> + int err = 0, ret;
> +
> + ret = detach_threadpool_from_workqueue(wq);
> + if (ret) {
> + pr_err("workqueue: error detaching from threadpool.\n");
> + err = -1;
> + }
> +
> + ret = pthread_mutex_destroy(&wq->lock);
> + if (ret) {
> + err = -1;
> + pr_err("workqueue: error pthread_mutex_destroy: %s\n",
> + strerror(errno));
> + }
> +
> + ret = pthread_cond_destroy(&wq->idle_cond);
> + if (ret) {
> + err = -1;
> + pr_err("workqueue: error pthread_cond_destroy: %s\n",
> + strerror(errno));
> + }
> +
> + ret = close(wq->msg_pipe[0]);
> + if (ret) {
> + err = -1;
> + pr_err("workqueue: error close msg_pipe[0]: %s\n",
> + strerror(errno));
> + }
> +
> + ret = close(wq->msg_pipe[1]);
> + if (ret) {
> + err = -1;
> + pr_err("workqueue: error close msg_pipe[1]: %s\n",
> + strerror(errno));
> + }
> +
> + free(wq);
> +
> + return err;
> +}
> +
> +/**
> + * workqueue_nr_threads - get size of threadpool underlying @wq
> + */
> +int workqueue_nr_threads(struct workqueue_struct *wq)
> +{
> + return threadpool_size(wq->pool);
> +}
> diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h
> new file mode 100644
> index 0000000000000000..86ec1d69274f41db
> --- /dev/null
> +++ b/tools/perf/util/workqueue/workqueue.h
> @@ -0,0 +1,24 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +#ifndef __WORKQUEUE_WORKQUEUE_H
> +#define __WORKQUEUE_WORKQUEUE_H
> +
> +#include <stdlib.h>
> +#include <sys/types.h>
> +#include <linux/list.h>
> +#include "threadpool.h"
> +
> +struct work_struct;
> +typedef void (*work_func_t)(struct work_struct *work);
> +
> +struct work_struct {
> + struct list_head entry;
> + work_func_t func;
> +};
> +
> +struct workqueue_struct;
> +
> +extern struct workqueue_struct *create_workqueue(struct threadpool_struct *pool);
> +extern int destroy_workqueue(struct workqueue_struct *wq);
> +
> +extern int workqueue_nr_threads(struct workqueue_struct *wq);
> +#endif /* __WORKQUEUE_WORKQUEUE_H */
> --
> 2.31.1
>

--

- Arnaldo

2021-07-15 17:00:44

by Riccardo Mancini

Subject: Re: [RFC PATCH 01/10] perf workqueue: threadpool creation and destruction

Hi Arnaldo,

thanks for reviewing the patch!

On Wed, 2021-07-14 at 11:16 -0300, Arnaldo Carvalho de Melo wrote:
<SNIP>
> > +
> > +enum threadpool_status {
> > +       THREADPOOL_STATUS__STOPPED,             /* no threads */
> > +       THREADPOOL_STATUS__ERROR,               /* errors */
> > +       THREADPOOL_STATUS__MAX
> > +};
> > +
> > +struct threadpool_struct {
>
> Can this be just 'struct threadpool'? I think it's descriptive enough:

I agree, but I wanted to keep the naming consistent between workqueue.c and
threadpool.c.

>
> > +       int                     nr_threads;     /* number of threads in the pool */
> > +       struct thread_struct    *threads;       /* array of threads in the pool */
> > +       struct task_struct      *current_task;  /* current executing function */
> > +       enum threadpool_status  status;         /* current status of the pool */
> > +};
> > +
> > +struct thread_struct {
> > +       int                             idx;    /* idx of thread in pool->threads */
> > +       pid_t                           tid;    /* tid of thread */
> > +       struct threadpool_struct        *pool;  /* parent threadpool */
> > +       struct {
> > +               int from[2];                    /* messages from thread (acks) */
> > +               int to[2];                      /* messages to thread (commands) */
> > +       } pipes;
> > +};
>
> This one, since we have already a 'struct thread' in tools/perf, to
> represent a PERF_RECORD_FORK, perhaps we can call it 'struct
> threadpool_entry'?

Agreed.

>
> > +
> > +/**
> > + * init_pipes - initialize all pipes of @thread
> > + */
> > +static void init_pipes(struct thread_struct *thread)
> > +{
> > +       thread->pipes.from[0] = -1;
> > +       thread->pipes.from[1] = -1;
> > +       thread->pipes.to[0] = -1;
> > +       thread->pipes.to[1] = -1;
> > +}
> > +
> > +/**
> > + * open_pipes - open all pipes of @thread
> > + */
> > +static int open_pipes(struct thread_struct *thread)
>
> Here please:
>
> threadpool_entry__open_pipes()
>
> It's longer, but helps with ctags/cscope navigation and we can go
> directly to it via:
>
> :ta threadpool_entry__open_p<TAB>
>
> While ':ta open_pipes' may go to various places where this idiom is
> used.

Agreed.

<SNIP>
> > +/**
> > + * create_threadpool - create a fixed threadpool with @n_threads threads
> > + */
> > +struct threadpool_struct *create_threadpool(int n_threads)
>
>
> Is this already something the kernel has and thus we should keep the
> naming? I couldn't find it in the kernel, so please name it:
>
> struct threadpool *threadpool__new(int nthreads)

As before, I did this to keep consistency with workqueue.
Since this threadpool+workqueue can be a standalone library, I preferred to keep
the naming consistent inside it, instead of making it consistent with perf (this
is what I was referring to in the cover letter, not just the workqueue API).
What do you think?
I also prefer perf's naming conventions, but it'd feel strange to use two
different naming conventions inside the same library.

>
> > +{
> > +       int ret, t;
> > +       struct threadpool_struct *pool = malloc(sizeof(*pool));
> > +
> > +       if (!pool) {
> > +               pr_err("threadpool: cannot allocate pool: %s\n",
> > +                       strerror(errno));
>
> Humm, pr_err() at this level isn't appropriate, please make callers
> complain.

ok.

>
> > +               return NULL;
> > +       }
> > +
> > +       if (n_threads <= 0) {
> > +               pr_err("threadpool: invalid number of threads: %d\n",
> > +                       n_threads);
>
> pr_debug()

ok

>
> > +               goto out_free_pool;
> > +       }
> > +
> > +       pool->nr_threads = n_threads;
> > +       pool->current_task = NULL;
> > +
> > +       pool->threads = malloc(n_threads * sizeof(*pool->threads));
> > +       if (!pool->threads) {
> > +               pr_err("threadpool: cannot allocate threads: %s\n",
> > +                       strerror(errno));
> > +               goto out_free_pool;
> > +       }
> > +
> > +       for (t = 0; t < n_threads; t++) {
> > +               pool->threads[t].idx = t;
> > +               pool->threads[t].tid = -1;
> > +               pool->threads[t].pool = pool;
> > +               init_pipes(&pool->threads[t]);
> > +       }
> > +
> > +       for (t = 0; t < n_threads; t++) {
> > +               ret = open_pipes(&pool->threads[t]);
> > +               if (ret)
> > +                       goto out_close_pipes;
> > +       }
> > +
> > +       pool->status = THREADPOOL_STATUS__STOPPED;
> > +
> > +       return pool;
> > +
> > +out_close_pipes:
> > +       for (t = 0; t < n_threads; t++)
> > +               close_pipes(&pool->threads[t]);
> > +
> > +       free(pool->threads);
> > +out_free_pool:
> > +       free(pool);
> > +       return NULL;
>
> Here we can use ERR_PTR()/PTR_ERR() to let the caller know what was the
> problem, i.e. we can ditch all the pr_err/pr_debug(), etc and instead
> have a threadpool__strerror(struct threadpool *pool, int err) like we
> have for 'struct evsel', please take a look at evsel__open_strerror().

Thanks, I'll have a look at it.
So, what I should do is avoid using pr_* above debug level inside library code
and return meaningful errors through ERR_PTR(), right?
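
To make the suggested error-reporting style concrete, here is a minimal sketch of a constructor that encodes the failure reason in the returned pointer instead of printing it. The three helpers are simplified stand-ins written for this example (real code would use the ERR_PTR()/PTR_ERR()/IS_ERR() definitions from linux/err.h), and `struct threadpool` is reduced to a single field, so treat the details as assumptions rather than as the eventual implementation.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

/*
 * Simplified stand-ins for the ERR_PTR()/PTR_ERR()/IS_ERR() helpers;
 * real code would include <linux/err.h> instead of redefining them.
 */
#define MAX_ERRNO 4095

static inline void *ERR_PTR(long error) { return (void *)error; }
static inline long PTR_ERR(const void *ptr) { return (long)ptr; }
static inline int IS_ERR(const void *ptr)
{
	return (unsigned long)ptr >= (unsigned long)-MAX_ERRNO;
}

/* Hypothetical pool type, reduced to what the example needs. */
struct threadpool {
	int nr_threads;
};

/* Returns a valid pool, or a negative errno encoded with ERR_PTR(). */
struct threadpool *threadpool__new(int nr_threads)
{
	struct threadpool *pool;

	if (nr_threads <= 0)
		return ERR_PTR(-EINVAL);

	pool = malloc(sizeof(*pool));
	if (!pool)
		return ERR_PTR(-ENOMEM);

	pool->nr_threads = nr_threads;
	return pool;
}

/* Accepts NULL and error pointers so callers can clean up unconditionally. */
void threadpool__delete(struct threadpool *pool)
{
	if (!IS_ERR(pool))
		free(pool);
}
```

With this shape the library stays silent and the caller decides how to report: it checks IS_ERR() on the result and passes PTR_ERR() to whatever strerror-style helper the library provides.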

>
>
> > +}
> > +
> > +/**
> > + * destroy_threadpool - free the @pool and all its resources
> > + */
> > +void destroy_threadpool(struct threadpool_struct *pool)
>
>
> void threadpool__delete(struct threadpool *pool)
> > +{
> > +       int t;
> > +
> > +       if (!pool)
> > +               return;
> > +
> > +       WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
> > +               && pool->status != THREADPOOL_STATUS__ERROR);
> > +
> > +       for (t = 0; t < pool->nr_threads; t++)
> > +               close_pipes(&pool->threads[t]);
>
> reset pool->threads[t] to -1

already inside close_pipes. I agree it might be confusing without the
threadpool_entry__ prefix.

>
> > +
> > +       free(pool->threads);
>
> zfree

In general, when should I use zfree instead of free?
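
As background for the question above, here is a small sketch of what a zfree-style helper does: it frees the memory and resets the caller's pointer to NULL, so a later use or second free of a struct member fails visibly instead of touching freed memory. `zfree_sketch()` and `struct pool_sketch` are hypothetical names for this example, not perf's own definitions, and the rule of thumb in the comment is an assumption, not an answer given in this thread.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for a zfree() helper: free *ptr, then NULL it. */
#define zfree_sketch(ptr)		\
	do {				\
		free(*(ptr));		\
		*(ptr) = NULL;		\
	} while (0)

struct pool_sketch {
	int *threads;	/* member of a struct that may outlive the free */
};

/*
 * Release only the array. Because the struct stays reachable, resetting
 * the member avoids leaving a dangling pointer behind.
 */
void pool_sketch__release_threads(struct pool_sketch *pool)
{
	zfree_sketch(&pool->threads);
}
```

Calling `pool_sketch__release_threads()` twice is harmless here, because the second call ends up in free(NULL).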

>
> > +       free(pool);
> > +}
> > +
> > +/**
> > + * threadpool_size - get number of threads in the threadpool
> > + */
> > +int threadpool_size(struct threadpool_struct *pool)
>  
> threadpool__size()

ok

Thanks,
Riccardo

>
> > +{
> > +       return pool->nr_threads;
> > +}
> > diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
> > new file mode 100644
> > index 0000000000000000..2b9388c768a0b588
> > --- /dev/null
> > +++ b/tools/perf/util/workqueue/threadpool.h
> > @@ -0,0 +1,19 @@
> > +/* SPDX-License-Identifier: GPL-2.0 */
> > +#ifndef __WORKQUEUE_THREADPOOL_H
> > +#define __WORKQUEUE_THREADPOOL_H
> > +
> > +struct threadpool_struct;
> > +struct task_struct;
> > +
> > +typedef void (*task_func_t)(int tidx, struct task_struct *task);
> > +
> > +struct task_struct {
> > +       task_func_t fn;
> > +};
> > +
> > +extern struct threadpool_struct *create_threadpool(int n_threads);
> > +extern void destroy_threadpool(struct threadpool_struct *pool);
> > +
> > +extern int threadpool_size(struct threadpool_struct *pool);
> > +
> > +#endif /* __WORKQUEUE_THREADPOOL_H */
> > --
> > 2.31.1
> >
>



2021-07-15 17:07:18

by Riccardo Mancini

Subject: Re: [RFC PATCH 02/10] perf tests: add test for workqueue

Hi Arnaldo,

On Wed, 2021-07-14 at 12:10 -0300, Arnaldo Carvalho de Melo wrote:
> Em Tue, Jul 13, 2021 at 02:11:13PM +0200, Riccardo Mancini escreveu:
> > It will have subtests testing threadpool and workqueue separately.
> > This patch only introduces the first subtest, checking that the
> > threadpool is correctly created and destructed.
> > This test will be expanded when new functions are added in next
> > patches.
> >
> > Signed-off-by: Riccardo Mancini <[email protected]>
<SNIP>
> > +
> > +
> > +static int __test__threadpool(void *_args)
> > +{
> > +       struct threadpool_test_args_t *args = _args;
> > +       struct threadpool_struct *pool;
> > +       int ret;
> > +
> > +       ret = __threadpool__prepare(&pool, args->pool_size);
>
> Turn the last three lines into one;

ok

>
>         int ret = __threadpool__prepare(&pool, args->pool_size);
>
> > +       if (ret)
> > +               return ret;
> > +
> > +       ret = __threadpool__teardown(pool);
> > +       if (ret)
> > +               return ret;
> > +
> > +       return 0;
>
> Humm, will you add something here in the following csets? Otherwise turn
> these 5 lines into one:
>
>         return __threadpool__teardown(pool);

ok, it was just copy-paste from above.

Thanks,
Riccardo

>
> > +}
> > +
> > +static const struct threadpool_test_args_t threadpool_test_args[] = {
> > +       {
> > +               .pool_size = 1
> > +       },
> > +       {
> > +               .pool_size = 2
> > +       },
> > +       {
> > +               .pool_size = 4
> > +       },
> > +       {
> > +               .pool_size = 8
> > +       },
> > +       {
> > +               .pool_size = 16
> > +       }
> > +};
> > +
> > +struct test_case {
> > +       const char *desc;
> > +       int (*func)(void *args);
> > +       void *args;
> > +       int n_args;
> > +       int arg_size;
> > +};
> > +
> > +static struct test_case workqueue_testcase_table[] = {
> > +       {
> > +               .desc = "Threadpool",
> > +               .func = __test__threadpool,
> > +               .args = (void *) threadpool_test_args,
> > +               .n_args = (int)ARRAY_SIZE(threadpool_test_args),
> > +               .arg_size = sizeof(struct threadpool_test_args_t)
> > +       }
> > +};
> > +
> > +
> > +int test__workqueue(struct test *test __maybe_unused, int i)
> > +{
> > +       int j, ret = 0;
> > +       struct test_case *tc;
> > +
> > +       if (i < 0 || i >= (int)ARRAY_SIZE(workqueue_testcase_table))
> > +               return -1;
> > +
> > +       tc = &workqueue_testcase_table[i];
> > +
> > +       for (j = 0; j < tc->n_args; j++) {
> > +               ret = tc->func(tc->args + (j*tc->arg_size));
> > +               if (ret)
> > +                       return ret;
> > +       }
> > +
> > +       return 0;
> > +}
> > +
> > +
> > +int test__workqueue_subtest_get_nr(void)
> > +{
> > +       return (int)ARRAY_SIZE(workqueue_testcase_table);
> > +}
> > +
> > +const char *test__workqueue_subtest_get_desc(int i)
> > +{
> > +       if (i < 0 || i >= (int)ARRAY_SIZE(workqueue_testcase_table))
> > +               return NULL;
> > +       return workqueue_testcase_table[i].desc;
> > +}
> > --
> > 2.31.1
> >
>


2021-07-15 17:24:57

by Riccardo Mancini

Subject: Re: [RFC PATCH 06/10] perf workqueue: introduce workqueue struct

Hi Arnaldo,
thanks again for having a look at this patchset!

On Wed, 2021-07-14 at 12:22 -0300, Arnaldo Carvalho de Melo wrote:
> Em Tue, Jul 13, 2021 at 02:11:17PM +0200, Riccardo Mancini escreveu:
> > This patch adds the workqueue definition, along with simple creation and
> > destruction functions.
> > Furthermore, a simple subtest is added.
> >
> > A workqueue is attached to a pool, on which it executes its workers.
> > Next patches will introduce workers.
> >
> > Signed-off-by: Riccardo Mancini <[email protected]>
> >
<SNIP>
> > +
> > +/**
> > + * create_workqueue - create a workqueue associated to @pool
> > + *
> > + * Only one workqueue can execute on a pool at a time.
> > + */
> > +struct workqueue_struct *create_workqueue(struct threadpool_struct *pool)
>
> I wonder if we should use the exact same kernel signature and not pass a
> threadpool, essentially having just one threadpool in tools/perf/ that
> is used by create_workqueue(void)?

I wondered the same thing, but I thought that we'd need it to be dynamically
created to prevent spawning threads at the beginning that might not even be
used.
I think this could be a follow-up patch.

Thanks,
Riccardo

>
> > +{
> > +       int err;
> > +       struct workqueue_struct *wq = malloc(sizeof(struct workqueue_struct));
> > +
> > +
> > +       err = pthread_mutex_init(&wq->lock, NULL);
> > +       if (err)
> > +               goto out_free_wq;
> > +
> > +       err = pthread_cond_init(&wq->idle_cond, NULL);
> > +       if (err)
> > +               goto out_destroy_mutex;
> > +
> > +       wq->pool = NULL;
> > +       INIT_LIST_HEAD(&wq->busy_list);
> > +       INIT_LIST_HEAD(&wq->idle_list);
> > +
> > +       INIT_LIST_HEAD(&wq->pending);
> > +
> > +       err = pipe(wq->msg_pipe);
> > +       if (err)
> > +               goto out_destroy_cond;
> > +
> > +       wq->task.fn = worker_thread;
> > +
> > +       err = attach_threadpool_to_workqueue(wq, pool);
> > +       if (err)
> > +               goto out_destroy_cond;
> > +
> > +       wq->status = WORKQUEUE_STATUS__READY;
> > +
> > +       return wq;
> > +
> > +out_destroy_cond:
> > +       pthread_cond_destroy(&wq->idle_cond);
> > +out_destroy_mutex:
> > +       pthread_mutex_destroy(&wq->lock);
> > +out_free_wq:
> > +       free(wq);
> > +       return NULL;
> > +}
> > +
> > +/**
> > + * destroy_workqueue - stop @wq workers and destroy @wq
> > + */
> > +int destroy_workqueue(struct workqueue_struct *wq)
> > +{
> > +       int err = 0, ret;
> > +
> > +       ret = detach_threadpool_from_workqueue(wq);
> > +       if (ret) {
> > +               pr_err("workqueue: error detaching from threadpool.\n");
> > +               err = -1;
> > +       }
> > +
> > +       ret = pthread_mutex_destroy(&wq->lock);
> > +       if (ret) {
> > +               err = -1;
> > +               pr_err("workqueue: error pthread_mutex_destroy: %s\n",
> > +                       strerror(errno));
> > +       }
> > +
> > +       ret = pthread_cond_destroy(&wq->idle_cond);
> > +       if (ret) {
> > +               err = -1;
> > +               pr_err("workqueue: error pthread_cond_destroy: %s\n",
> > +                       strerror(errno));
> > +       }
> > +
> > +       ret = close(wq->msg_pipe[0]);
> > +       if (ret) {
> > +               err = -1;
> > +               pr_err("workqueue: error close msg_pipe[0]: %s\n",
> > +                       strerror(errno));
> > +       }
> > +
> > +       ret = close(wq->msg_pipe[1]);
> > +       if (ret) {
> > +               err = -1;
> > +               pr_err("workqueue: error close msg_pipe[1]: %s\n",
> > +                       strerror(errno));
> > +       }
> > +
> > +       free(wq);
> > +
> > +       return err;
> > +}
> > +
> > +/**
> > + * workqueue_nr_threads - get size of threadpool underlying @wq
> > + */
> > +int workqueue_nr_threads(struct workqueue_struct *wq)
> > +{
> > +       return threadpool_size(wq->pool);
> > +}
> > diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h
> > new file mode 100644
> > index 0000000000000000..86ec1d69274f41db
> > --- /dev/null
> > +++ b/tools/perf/util/workqueue/workqueue.h
> > @@ -0,0 +1,24 @@
> > +/* SPDX-License-Identifier: GPL-2.0 */
> > +#ifndef __WORKQUEUE_WORKQUEUE_H
> > +#define __WORKQUEUE_WORKQUEUE_H
> > +
> > +#include <stdlib.h>
> > +#include <sys/types.h>
> > +#include <linux/list.h>
> > +#include "threadpool.h"
> > +
> > +struct work_struct;
> > +typedef void (*work_func_t)(struct work_struct *work);
> > +
> > +struct work_struct {
> > +       struct list_head entry;
> > +       work_func_t func;
> > +};
> > +
> > +struct workqueue_struct;
> > +
> > +extern struct workqueue_struct *create_workqueue(struct threadpool_struct *pool);
> > +extern int destroy_workqueue(struct workqueue_struct *wq);
> > +
> > +extern int workqueue_nr_threads(struct workqueue_struct *wq);
> > +#endif /* __WORKQUEUE_WORKQUEUE_H */
> > --
> > 2.31.1
> >
>


2021-07-15 20:46:02

by Arnaldo Carvalho de Melo

Subject: Re: [RFC PATCH 03/10] perf workqueue: add threadpool start and stop functions

Em Thu, Jul 15, 2021 at 06:42:16PM +0200, Riccardo Mancini escreveu:
> Hi Arnaldo,
>
> On Wed, 2021-07-14 at 12:15 -0300, Arnaldo Carvalho de Melo wrote:
> > Em Tue, Jul 13, 2021 at 02:11:14PM +0200, Riccardo Mancini escreveu:
> > > +++ b/tools/perf/util/workqueue/threadpool.c
> > > @@ -4,12 +4,23 @@
> > > >  #include <unistd.h>
> > > >  #include <errno.h>
> > > >  #include <string.h>
> > > > +#include <pthread.h>
> > > > +#include <signal.h>
> > > > +#include <syscall.h>
> > > >  #include "debug.h"
> > > >  #include "asm/bug.h"
> > > >  #include "threadpool.h"
> > > >
> > > > +#ifndef HAVE_GETTID
> > > > +static inline pid_t gettid(void)
> > > > +{
> > > > +       return (pid_t)syscall(__NR_gettid);
> > > > +}
> > > +#endif

> > Isn't this defined elsewhere? Yeah, when we decide to move it to
> > tools/lib/workqueue/ we'll need it, but for now, reduce patch size.

> No, it's just statically defined in tools/perf/jvmti/jvmti_agent.c.
> I saw there is a libc_compat.h header in tools/include/tools, I could put this
> definition there, and remove the one from jvmti_agent.c.

Please, do it as a prep patch.
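
For reference, the fallback being discussed boils down to a thin wrapper around the gettid system call. The sketch below uses the hypothetical name `compat_gettid()` so that it compiles on its own even where the C library already declares gettid(); a shared compat header would presumably keep the gettid() name and guard it with HAVE_GETTID as in the quoted patch.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE	/* for the syscall() declaration */
#endif
#include <assert.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical name: return the kernel thread id of the calling thread
 * by invoking the gettid system call directly (Linux only).
 */
static inline pid_t compat_gettid(void)
{
	return (pid_t)syscall(SYS_gettid);
}
```

The returned id is stable for the lifetime of the calling thread, which is what the threadpool uses to label its debug messages.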

Thanks,

- Arnaldo

2021-07-15 20:49:18

by Arnaldo Carvalho de Melo

Subject: Re: [RFC PATCH 06/10] perf workqueue: introduce workqueue struct

Em Thu, Jul 15, 2021 at 06:49:57PM +0200, Riccardo Mancini escreveu:
> Hi Arnaldo,
> thanks again for having a look at this patchset!
>
> On Wed, 2021-07-14 at 12:22 -0300, Arnaldo Carvalho de Melo wrote:
> > Em Tue, Jul 13, 2021 at 02:11:17PM +0200, Riccardo Mancini escreveu:
> > > This patch adds the workqueue definition, along with simple creation and
> > > destruction functions.
> > > Furthermore, a simple subtest is added.
> > >
> > > A workqueue is attached to a pool, on which it executes its workers.
> > > Next patches will introduce workers.
> > >
> > > Signed-off-by: Riccardo Mancini <[email protected]>
> > >
> <SNIP>
> > > +
> > > +/**
> > > + * create_workqueue - create a workqueue associated to @pool
> > > + *
> > > + * Only one workqueue can execute on a pool at a time.
> > > + */
> > > +struct workqueue_struct *create_workqueue(struct threadpool_struct *pool)
> >
> > I wonder if we should use the exact same kernel signature and not pass a
> > threadpool, essentially having just one threadpool in tools/perf/ that
> > is used by create_workqueue(void)?
>
> I wondered the same thing, but I thought that we'd need it to be dynamically
> created to prevent spawning threads at the beginning that might not even be
> used.
> I think this could be a follow-up patch.

I see your point.

My practice is to use the perf convention for things that don't come
from the kernel, and if we do as I suggested, tooling will probably not
even see this threadpool struct, i.e. just the workqueue APIs are
exposed and workqueue_create() will notice that a threadpool is not yet
created, doing its creation behind tooling's back.
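
For illustration, the lazy creation described above could look roughly like the sketch below. All names here (struct threadpool, struct workqueue, default_pool__get(), the pool size of 4) are hypothetical stand-ins and not part of the patchset; the sketch also assumes workqueues are only created from the main thread, so it does no locking.

```c
#include <stdlib.h>

/* Hypothetical stand-ins; the real structs live in the patchset. */
struct threadpool { int nr_threads; };
struct workqueue { struct threadpool *pool; };

/* Single pool shared by every workqueue, created on first use. */
static struct threadpool *default_pool;

/* Assumption: called from the main thread only, so no locking is done. */
static struct threadpool *default_pool__get(void)
{
	if (!default_pool) {
		default_pool = calloc(1, sizeof(*default_pool));
		if (default_pool)
			default_pool->nr_threads = 4; /* placeholder size */
	}
	return default_pool;
}

/* Kernel-like signature: callers never see the threadpool. */
static struct workqueue *workqueue_create(void)
{
	struct threadpool *pool = default_pool__get();
	struct workqueue *wq;

	if (!pool)
		return NULL;

	wq = calloc(1, sizeof(*wq));
	if (wq)
		wq->pool = pool;
	return wq;
}
```

With this shape no threads are spawned until the first workqueue is requested, which addresses the concern about paying for a pool that is never used.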

- Arnaldo

> Thanks,
> Riccardo
>
> >
> > > +{
> > > > +       int err;
> > > > +       struct workqueue_struct *wq = malloc(sizeof(struct
> > > > workqueue_struct));
> > > > +
> > > > +
> > > > +       err = pthread_mutex_init(&wq->lock, NULL);
> > > > +       if (err)
> > > > +               goto out_free_wq;
> > > > +
> > > > +       err = pthread_cond_init(&wq->idle_cond, NULL);
> > > > +       if (err)
> > > > +               goto out_destroy_mutex;
> > > > +
> > > > +       wq->pool = NULL;
> > > > +       INIT_LIST_HEAD(&wq->busy_list);
> > > > +       INIT_LIST_HEAD(&wq->idle_list);
> > > > +
> > > > +       INIT_LIST_HEAD(&wq->pending);
> > > > +
> > > > +       err = pipe(wq->msg_pipe);
> > > > +       if (err)
> > > > +               goto out_destroy_cond;
> > > > +
> > > > +       wq->task.fn = worker_thread;
> > > > +
> > > > +       err = attach_threadpool_to_workqueue(wq, pool);
> > > > +       if (err)
> > > > +               goto out_destroy_cond;
> > > > +
> > > > +       wq->status = WORKQUEUE_STATUS__READY;
> > > > +
> > > > +       return wq;
> > > > +
> > > > +out_destroy_cond:
> > > > +       pthread_cond_destroy(&wq->idle_cond);
> > > > +out_destroy_mutex:
> > > > +       pthread_mutex_destroy(&wq->lock);
> > > > +out_free_wq:
> > > > +       free(wq);
> > > > +       return NULL;
> > > +}
> > > +
> > > +/**
> > > + * destroy_workqueue - stop @wq workers and destroy @wq
> > > + */
> > > +int destroy_workqueue(struct workqueue_struct *wq)
> > > +{
> > > > +       int err = 0, ret;
> > > > +
> > > > +       ret = detach_threadpool_from_workqueue(wq);
> > > > +       if (ret) {
> > > > +               pr_err("workqueue: error detaching from threadpool.\n");
> > > > +               err = -1;
> > > > +       }
> > > > +
> > > > +       ret = pthread_mutex_destroy(&wq->lock);
> > > > +       if (ret) {
> > > > +               err = -1;
> > > > +               pr_err("workqueue: error pthread_mutex_destroy: %s\n",
> > > > +                       strerror(errno));
> > > > +       }
> > > > +
> > > > +       ret = pthread_cond_destroy(&wq->idle_cond);
> > > > +       if (ret) {
> > > > +               err = -1;
> > > > +               pr_err("workqueue: error pthread_cond_destroy: %s\n",
> > > > +                       strerror(errno));
> > > > +       }
> > > > +
> > > > +       ret = close(wq->msg_pipe[0]);
> > > > +       if (ret) {
> > > > +               err = -1;
> > > > +               pr_err("workqueue: error close msg_pipe[0]: %s\n",
> > > > +                       strerror(errno));
> > > > +       }
> > > > +
> > > > +       ret = close(wq->msg_pipe[1]);
> > > > +       if (ret) {
> > > > +               err = -1;
> > > > +               pr_err("workqueue: error close msg_pipe[1]: %s\n",
> > > > +                       strerror(errno));
> > > > +       }
> > > > +
> > > > +       free(wq);
> > > > +
> > > > +       return err;
> > > +}
> > > +
> > > +/**
> > > + * workqueue_nr_threads - get size of threadpool underlying @wq
> > > + */
> > > +int workqueue_nr_threads(struct workqueue_struct *wq)
> > > +{
> > > > +       return threadpool_size(wq->pool);
> > > +}
> > > diff --git a/tools/perf/util/workqueue/workqueue.h
> > > b/tools/perf/util/workqueue/workqueue.h
> > > new file mode 100644
> > > index 0000000000000000..86ec1d69274f41db
> > > --- /dev/null
> > > +++ b/tools/perf/util/workqueue/workqueue.h
> > > @@ -0,0 +1,24 @@
> > > +/* SPDX-License-Identifier: GPL-2.0 */
> > > +#ifndef __WORKQUEUE_WORKQUEUE_H
> > > +#define __WORKQUEUE_WORKQUEUE_H
> > > +
> > > +#include <stdlib.h>
> > > +#include <sys/types.h>
> > > +#include <linux/list.h>
> > > +#include "threadpool.h"
> > > +
> > > +struct work_struct;
> > > +typedef void (*work_func_t)(struct work_struct *work);
> > > +
> > > +struct work_struct {
> > > > +       struct list_head entry;
> > > > +       work_func_t func;
> > > +};
> > > +
> > > +struct workqueue_struct;
> > > +
> > > +extern struct workqueue_struct *create_workqueue(struct threadpool_struct
> > > *pool);
> > > +extern int destroy_workqueue(struct workqueue_struct *wq);
> > > +
> > > +extern int workqueue_nr_threads(struct workqueue_struct *wq);
> > > +#endif /* __WORKQUEUE_WORKQUEUE_H */
> > > --
> > > 2.31.1
> > >
> >
>
>

--

- Arnaldo

2021-07-15 20:49:45

by Arnaldo Carvalho de Melo

Subject: Re: [RFC PATCH 01/10] perf workqueue: threadpool creation and destruction

Em Thu, Jul 15, 2021 at 06:31:07PM +0200, Riccardo Mancini escreveu:
> Hi Arnaldo,
>
> thanks for reviewing the patch!
>
> On Wed, 2021-07-14 at 11:16 -0300, Arnaldo Carvalho de Melo wrote:
> <SNIP>
> > > +
> > > +enum threadpool_status {
> > > > +       THREADPOOL_STATUS__STOPPED,             /* no threads */
> > > > +       THREADPOOL_STATUS__ERROR,               /* errors */
> > > > +       THREADPOOL_STATUS__MAX
> > > +};
> > > +
> > > +struct threadpool_struct {
> >
> > > Can this be just 'struct threadpool'? I think it's descriptive enough:
>
> I agree, but I wanted to keep the naming consistent between workqueue.c and
> threadpool.c.
>
> >
> > > > +       int                     nr_threads;     /* number of threads in the pool */
> > > > +       struct thread_struct    *threads;       /* array of threads in the pool */
> > > > +       struct task_struct      *current_task;  /* current executing function */
> > > > +       enum threadpool_status  status;         /* current status of the pool */
> > > > +};
> > > > +
> > > > +struct thread_struct {
> > > > +       int                             idx;    /* idx of thread in pool->threads */
> > > > +       pid_t                           tid;    /* tid of thread */
> > > > +       struct threadpool_struct        *pool;  /* parent threadpool */
> > > > +       struct {
> > > > +               int from[2];                    /* messages from thread (acks) */
> > > > +               int to[2];                      /* messages to thread (commands) */
> > > > +       } pipes;
> > > +};
> >
> > This one, since we have already a 'struct thread' in tools/perf, to
> > represent a PERF_RECORD_FORK, perhaps we can call it 'struct
> > threadpool_entry'?
>
> Agreed.
>
> >
> > > +
> > > +/**
> > > + * init_pipes - initialize all pipes of @thread
> > > + */
> > > +static void init_pipes(struct thread_struct *thread)
> > > +{
> > > > +       thread->pipes.from[0] = -1;
> > > > +       thread->pipes.from[1] = -1;
> > > > +       thread->pipes.to[0] = -1;
> > > > +       thread->pipes.to[1] = -1;
> > > +}
> > > +
> > > +/**
> > > + * open_pipes - open all pipes of @thread
> > > + */
> > > +static int open_pipes(struct thread_struct *thread)
> >
> > Here please:
> >
> > threadpool_entry__open_pipes()
> >
> > > It's longer, but helps with ctags/cscope navigation and we can go
> > directly to it via:
> >
> > :ta threadpool_entry__open_p<TAB>
> >
> > > While ':ta open_pipes' may go to various places where this idiom is
> > used.
>
> Agreed.
>
> <SNIP>
> > > +/**
> > > + * create_threadpool - create a fixed threadpool with @n_threads threads
> > > + */
> > > +struct threadpool_struct *create_threadpool(int n_threads)
> >
> >
> > Is this already something the kernel has and thus we should keep the
> > naming? I couldn't find it in the kernel, so please name it:
> >
> > struct threadpool *threadpool__new(int nthreads)
>
> As before, I did this to keep consistency with workqueue.
> Since this threadpool+workqueue can be a standalone library, I preferred to keep
> the naming consistent inside it, instead of making it consistent with perf (this
> is what I was referring to in the cover letter, not just the workqueue API).
> What do you think?
> I also prefer perf's naming conventions, but it'd feel strange to use two
> different naming conventions inside the same library.

See my comment on the other message about this naming dilemma :-)

> >
> > > +{
> > > > +       int ret, t;
> > > > +       struct threadpool_struct *pool = malloc(sizeof(*pool));
> > > > +
> > > > +       if (!pool) {
> > > > +               pr_err("threadpool: cannot allocate pool: %s\n",
> > > > +                       strerror(errno));
> >
> > Humm, pr_err() at this level isn't appropriate, please make callers
> > complain.
>
> ok.
>
> >
> > > > +               return NULL;
> > > > +       }
> > > > +
> > > > +       if (n_threads <= 0) {
> > > > +               pr_err("threadpool: invalid number of threads: %d\n",
> > > > +                       n_threads);
> >
> > pr_debug()
>
> ok
>
> >
> > > > +               goto out_free_pool;
> > > > +       }
> > > > +
> > > > +       pool->nr_threads = n_threads;
> > > > +       pool->current_task = NULL;
> > > > +
> > > > +       pool->threads = malloc(n_threads * sizeof(*pool->threads));
> > > > +       if (!pool->threads) {
> > > > +               pr_err("threadpool: cannot allocate threads: %s\n",
> > > > +                       strerror(errno));
> > > > +               goto out_free_pool;
> > > > +       }
> > > > +
> > > > +       for (t = 0; t < n_threads; t++) {
> > > > +               pool->threads[t].idx = t;
> > > > +               pool->threads[t].tid = -1;
> > > > +               pool->threads[t].pool = pool;
> > > > +               init_pipes(&pool->threads[t]);
> > > > +       }
> > > > +
> > > > +       for (t = 0; t < n_threads; t++) {
> > > > +               ret = open_pipes(&pool->threads[t]);
> > > > +               if (ret)
> > > > +                       goto out_close_pipes;
> > > > +       }
> > > > +
> > > > +       pool->status = THREADPOOL_STATUS__STOPPED;
> > > > +
> > > > +       return pool;
> > > > +
> > > > +out_close_pipes:
> > > > +       for (t = 0; t < n_threads; t++)
> > > > +               close_pipes(&pool->threads[t]);
> > > > +
> > > > +       free(pool->threads);
> > > > +out_free_pool:
> > > > +       free(pool);
> > > > +       return NULL;
> >
> > Here we can use ERR_PTR()/PTR_ERR() to let the caller know what was the
> > problem, i.e. we can ditch all the pr_err/pr_debug(), etc and instead
> > have a threadpool__strerror(struct threadpool *pool, int err) like we
> > have for 'struct evsel', please take a look at evsel__open_strerror().
>
> Thanks, I'll have a look at it.
> So, what I should do is not use pr_* higher than debug inside library code and
> return meaningful errors through ERR_PTR(), right?

Right.
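
As a rough sketch of the error-pointer pattern agreed on above: the constructor encodes a negative errno in the returned pointer and the caller turns it into a message. The ERR_PTR()/PTR_ERR()/IS_ERR() definitions below are minimal local copies written only to keep the example self-contained. threadpool__strerror() is a hypothetical, simplified variant that takes only the error code and returns a constant string, which differs from the (pool, err) signature suggested in the review.

```c
#include <errno.h>
#include <stdlib.h>

/* Minimal local copies of the kernel-style helpers, for illustration only. */
#define MAX_ERRNO 4095
static inline void *ERR_PTR(long error) { return (void *)error; }
static inline long PTR_ERR(const void *ptr) { return (long)ptr; }
static inline int IS_ERR(const void *ptr)
{
	return (unsigned long)ptr >= (unsigned long)-MAX_ERRNO;
}

struct threadpool { int nr_threads; };

/* Library code stays silent and reports the reason through the pointer. */
static struct threadpool *threadpool__new(int nr_threads)
{
	struct threadpool *pool;

	if (nr_threads <= 0)
		return ERR_PTR(-EINVAL);

	pool = calloc(1, sizeof(*pool));
	if (!pool)
		return ERR_PTR(-ENOMEM);

	pool->nr_threads = nr_threads;
	return pool;
}

/* Hypothetical, simplified strerror helper: the caller decides what to print. */
static const char *threadpool__strerror(int err)
{
	switch (err) {
	case -EINVAL: return "invalid number of threads";
	case -ENOMEM: return "cannot allocate pool";
	default:      return "unknown error";
	}
}
```

A caller would check IS_ERR(pool) and, on failure, pass PTR_ERR(pool) to the strerror helper before printing with pr_err() itself.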

> >
> >
> > > +}
> > > +
> > > +/**
> > > + * destroy_threadpool - free the @pool and all its resources
> > > + */
> > > +void destroy_threadpool(struct threadpool_struct *pool)
> >
> >
> > void threadpool__delete(struct threadpool *pool)
> > > +{
> > > > +       int t;
> > > > +
> > > > +       if (!pool)
> > > > +               return;
> > > > +
> > > > +       WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
> > > > +               && pool->status != THREADPOOL_STATUS__ERROR);
> > > > +
> > > > +       for (t = 0; t < pool->nr_threads; t++)
> > > > +               close_pipes(&pool->threads[t]);
> >
> > reset pool->threads[t] to -1
>
> already inside close_pipes. I agree it might be confusing without the
> threadpool_entry__ prefix.
>
> >
> > > +
> > > > +       free(pool->threads);
> >
> > zfree
>
> In general, when should I use zfree instead of free?
>
> >
> > > > +       free(pool);
> > > +}
> > > +
> > > +/**
> > > + * threadpool_size - get number of threads in the threadpool
> > > + */
> > > +int threadpool_size(struct threadpool_struct *pool)
> > ?
> > threadpool__size()
>
> ok
>
> Thanks,
> Riccardo
>
> >
> > > +{
> > > > +       return pool->nr_threads;
> > > +}
> > > diff --git a/tools/perf/util/workqueue/threadpool.h
> > > b/tools/perf/util/workqueue/threadpool.h
> > > new file mode 100644
> > > index 0000000000000000..2b9388c768a0b588
> > > --- /dev/null
> > > +++ b/tools/perf/util/workqueue/threadpool.h
> > > @@ -0,0 +1,19 @@
> > > +/* SPDX-License-Identifier: GPL-2.0 */
> > > +#ifndef __WORKQUEUE_THREADPOOL_H
> > > +#define __WORKQUEUE_THREADPOOL_H
> > > +
> > > +struct threadpool_struct;
> > > +struct task_struct;
> > > +
> > > +typedef void (*task_func_t)(int tidx, struct task_struct *task);
> > > +
> > > +struct task_struct {
> > > > +       task_func_t fn;
> > > +};
> > > +
> > > +extern struct threadpool_struct *create_threadpool(int n_threads);
> > > +extern void destroy_threadpool(struct threadpool_struct *pool);
> > > +
> > > +extern int threadpool_size(struct threadpool_struct *pool);
> > > +
> > > +#endif /* __WORKQUEUE_THREADPOOL_H */
> > > --
> > > 2.31.1
> > >
> >
>
>
>

--

- Arnaldo

2021-07-15 21:01:37

by Riccardo Mancini

Subject: Re: [RFC PATCH 03/10] perf workqueue: add threadpool start and stop functions

Hi Arnaldo,

On Wed, 2021-07-14 at 12:15 -0300, Arnaldo Carvalho de Melo wrote:
> Em Tue, Jul 13, 2021 at 02:11:14PM +0200, Riccardo Mancini escreveu:
> > This patch adds the start and stop functions, alongside the thread
> > function.
> > Each thread will run until a stop signal is received.
> > Furthermore, start and stop are added to the test.
> >
> > Thread management is based on the prototype from Alexey:
> > https://lore.kernel.org/lkml/[email protected]/
> >
> > Suggested-by: Alexey Bayduraev <[email protected]>
> > Signed-off-by: Riccardo Mancini <[email protected]>
<SNIP>
> >  
> >  static int __threadpool__teardown(struct threadpool_struct *pool)
> >  {
> > +       int ret;
> > +
> > +       ret = stop_threadpool(pool);
>
>         int ret = stop_threadpool(pool);

ok

>
> > +       TEST_ASSERT_VAL("threadpool start failure", ret == 0);
> > +       TEST_ASSERT_VAL("stopped threadpool is ready",
> > +                       !threadpool_is_ready(pool));
> > +
> >         destroy_threadpool(pool);
> >  
> >         return 0;
> > diff --git a/tools/perf/util/workqueue/threadpool.c
> > b/tools/perf/util/workqueue/threadpool.c
> > index 70c67569f956a3e2..f4635ff782b9388e 100644
> > --- a/tools/perf/util/workqueue/threadpool.c
> > +++ b/tools/perf/util/workqueue/threadpool.c
> > @@ -4,12 +4,23 @@
> >  #include <unistd.h>
> >  #include <errno.h>
> >  #include <string.h>
> > +#include <pthread.h>
> > +#include <signal.h>
> > +#include <syscall.h>
> >  #include "debug.h"
> >  #include "asm/bug.h"
> >  #include "threadpool.h"
> >  
> > +#ifndef HAVE_GETTID
> > +static inline pid_t gettid(void)
> > +{
> > +       return (pid_t)syscall(__NR_gettid);
> > +}
> > +#endif
>
> Isn't this defined elsewhere? Yeah, when we decide to move it to
> tools/lib/workqueue/ we'll need it, but for now, reduce patch size.

No, it's just statically defined in tools/perf/jvmti/jvmti_agent.c.
I saw there is a libc_compat.h header in tools/include/tools, I could put this
definition there, and remove the one from jvmti_agent.c.

<SNIP>
> > +/**
> > + * wait_thread - receive ack from thread
> > + *
> > + * NB: call only from main thread!
> > + */
> > +static int wait_thread(struct thread_struct *thread)
> > +{
> > +       int res;
> > +       enum thread_msg msg = THREAD_MSG__UNDEFINED;
> > +
> > +       res = read(thread->pipes.from[0], &msg, sizeof(msg));
>
>         int res = read(thread->pipes.from[0], &msg, sizeof(msg));

ok

<SNIP>
> > +/**
> > + * threadpool_thread - function running on thread
> > + *
> > + * This function waits for a signal from main thread to start executing
> > + * a task.
> > + * On completion, it will go back to sleep, waiting for another signal.
> > + * Signals are delivered through pipes.
> > + */
> > +static void *threadpool_thread(void *args)
>
>    threadpool_function()
>
>  ETOMANY 'thread' in a name.

Agreed :)

<SNIP>

> > +/**
> > + * start_threadpool - start all threads in the pool.
> > + *
> > + * The function blocks until all threads are up and running.
> > + */
> > +int start_threadpool(struct threadpool_struct *pool)
>
> int threadpool__start(struct threadpool *pool)

ok

>
> > +{
> > +       int err;
> > +
> > +       if (pool->status != THREADPOOL_STATUS__STOPPED) {
> > +               pr_err("threadpool: starting not stopped pool\n");
> > +               return -1;
> > +       }
> > +
> > +       err = __start_threadpool(pool);
> > +       pool->status = err ? THREADPOOL_STATUS__ERROR :
> > THREADPOOL_STATUS__READY;
> > +       return err;
> > +}
> > +
> > +/**
> > + * stop_threadpool - stop all threads in the pool.
> > + *
> > + * This function blocks waiting for ack from all threads.
> > + */
> > +int stop_threadpool(struct threadpool_struct *pool)
>
> int threadpool__stop(struct threadpool *pool)

ok

>
> > +{
> > +       int t, ret, err = 0;
> > +
> > +       if (pool->status != THREADPOOL_STATUS__READY) {
> > +               pr_err("threadpool: stopping not ready pool\n");
> > +               return -1;
> > +       }
> > +
> > +       for (t = 0; t < pool->nr_threads; t++) {
> > +               ret = terminate_thread(&pool->threads[t]);
> > +               if (ret && !err)
> > +                       err = -1;
> > +       }
> > +
> > +       pool->status = err ? THREADPOOL_STATUS__ERROR :
> > THREADPOOL_STATUS__STOPPED;
> > +
> > +       return err;
> > +}
> > +
> > +/**
> > + * threadpool_is_ready - check if the threads are running
> > + */
> > +bool threadpool_is_ready(struct threadpool_struct *pool)
>
> bool threadpool__is_ready(struct threadpool *pool)

ok

Thanks,
Riccardo

>
> > +{
> > +       return pool->status == THREADPOOL_STATUS__READY;
> > +}
> > diff --git a/tools/perf/util/workqueue/threadpool.h
> > b/tools/perf/util/workqueue/threadpool.h
> > index 2b9388c768a0b588..b62cad2b2c5dd331 100644
> > --- a/tools/perf/util/workqueue/threadpool.h
> > +++ b/tools/perf/util/workqueue/threadpool.h
> > @@ -14,6 +14,11 @@ struct task_struct {
> >  extern struct threadpool_struct *create_threadpool(int n_threads);
> >  extern void destroy_threadpool(struct threadpool_struct *pool);
> >  
> > +extern int start_threadpool(struct threadpool_struct *pool);
> > +extern int stop_threadpool(struct threadpool_struct *pool);
> > +
> >  extern int threadpool_size(struct threadpool_struct *pool);
> >  
> > +extern bool threadpool_is_ready(struct threadpool_struct *pool);
> > +
> >  #endif /* __WORKQUEUE_THREADPOOL_H */
> > --
> > 2.31.1
> >
>


2021-07-15 23:31:44

by Namhyung Kim

Subject: Re: [RFC PATCH 01/10] perf workqueue: threadpool creation and destruction

Hi Riccardo and Arnaldo,

On Wed, Jul 14, 2021 at 7:16 AM Arnaldo Carvalho de Melo
<[email protected]> wrote:
>
> Em Tue, Jul 13, 2021 at 02:11:12PM +0200, Riccardo Mancini escreveu:
> > The workqueue library is made up of two components:
> > - threadpool: handles the lifetime of the threads
> > - workqueue: handles work distribution among the threads
> >
> > This first patch introduces the threadpool, starting from its creation
> > and destruction functions.
> > Thread management is based on the prototype from Alexey:
> > https://lore.kernel.org/lkml/[email protected]/
> >
> > Each thread in the threadpool executes the same function (aka task)
> > with a different argument tidx.
> > Threads use a pair of pipes to communicate with the main process.
> > The threadpool is static (all threads will be spawned at the same time).
> > Future work could include making it resizable and adding affinity support
> > (as in Alexey prototype).
> >
> > Suggested-by: Alexey Bayduraev <[email protected]>
> > Signed-off-by: Riccardo Mancini <[email protected]>
> > ---
> > tools/perf/util/Build | 1 +
> > tools/perf/util/workqueue/Build | 1 +
> > tools/perf/util/workqueue/threadpool.c | 175 +++++++++++++++++++++++++
> > tools/perf/util/workqueue/threadpool.h | 19 +++
> > 4 files changed, 196 insertions(+)
> > create mode 100644 tools/perf/util/workqueue/Build
> > create mode 100644 tools/perf/util/workqueue/threadpool.c
> > create mode 100644 tools/perf/util/workqueue/threadpool.h
> >
> > diff --git a/tools/perf/util/Build b/tools/perf/util/Build
> > index 2d4fa13041789cd6..c7b09701661c869d 100644
> > --- a/tools/perf/util/Build
> > +++ b/tools/perf/util/Build
> > @@ -180,6 +180,7 @@ perf-$(CONFIG_LIBBABELTRACE) += data-convert-bt.o
> > perf-y += data-convert-json.o
> >
> > perf-y += scripting-engines/
> > +perf-y += workqueue/
> >
> > perf-$(CONFIG_ZLIB) += zlib.o
> > perf-$(CONFIG_LZMA) += lzma.o
> > diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build
> > new file mode 100644
> > index 0000000000000000..8b72a6cd4e2cba0d
> > --- /dev/null
> > +++ b/tools/perf/util/workqueue/Build
> > @@ -0,0 +1 @@
> > +perf-y += threadpool.o
> > diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
> > new file mode 100644
> > index 0000000000000000..70c67569f956a3e2
> > --- /dev/null
> > +++ b/tools/perf/util/workqueue/threadpool.c
> > @@ -0,0 +1,175 @@
> > +// SPDX-License-Identifier: GPL-2.0
> > +#include <stdlib.h>
> > +#include <stdio.h>
> > +#include <unistd.h>
> > +#include <errno.h>
> > +#include <string.h>
> > +#include "debug.h"
> > +#include "asm/bug.h"
> > +#include "threadpool.h"
> > +
> > +enum threadpool_status {
> > + THREADPOOL_STATUS__STOPPED, /* no threads */
> > + THREADPOOL_STATUS__ERROR, /* errors */
> > + THREADPOOL_STATUS__MAX
> > +};
> > +
> > +struct threadpool_struct {
>
> Can this be just 'struct threadpool'? I think it's descriptive enough:
>
> > + int nr_threads; /* number of threads in the pool */
> > + struct thread_struct *threads; /* array of threads in the pool */
> > + struct task_struct *current_task; /* current executing function */

Does this mean it can only have a single function to run?
Why do we need it?


> > + enum threadpool_status status; /* current status of the pool */
> > +};
> > +
> > +struct thread_struct {
> > + int idx; /* idx of thread in pool->threads */
> > + pid_t tid; /* tid of thread */
> > + struct threadpool_struct *pool; /* parent threadpool */
> > + struct {
> > + int from[2]; /* messages from thread (acks) */
> > + int to[2]; /* messages to thread (commands) */

It can be confusing if you think from the main thread.
Maybe 'ack' and 'cmd' would be better.


> > + } pipes;
> > +};
>
> This one, since we have already a 'struct thread' in tools/perf, to
> represent a PERF_RECORD_FORK, perhaps we can call it 'struct threadpool_entry'?

I think we can even use 'worker' instead of 'thread' but it requires
huge renaming and conflicts so I won't insist on it strongly. :)

Thanks,
Namhyung

2021-07-15 23:52:25

by Namhyung Kim

Subject: Re: [RFC PATCH 03/10] perf workqueue: add threadpool start and stop functions

On Tue, Jul 13, 2021 at 5:11 AM Riccardo Mancini <[email protected]> wrote:
>
> This patch adds the start and stop functions, alongside the thread
> function.
> Each thread will run until a stop signal is received.
> Furthermore, start and stop are added to the test.
>
> Thread management is based on the prototype from Alexey:
> https://lore.kernel.org/lkml/[email protected]/
>
> Suggested-by: Alexey Bayduraev <[email protected]>
> Signed-off-by: Riccardo Mancini <[email protected]>
> ---
> tools/perf/tests/workqueue.c | 13 ++
> tools/perf/util/workqueue/threadpool.c | 238 +++++++++++++++++++++++++
> tools/perf/util/workqueue/threadpool.h | 5 +
> 3 files changed, 256 insertions(+)
>
> diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
> index 1bd4d78c13eb3b14..be377e9897bab4e9 100644
> --- a/tools/perf/tests/workqueue.c
> +++ b/tools/perf/tests/workqueue.c
> @@ -10,16 +10,29 @@ struct threadpool_test_args_t {
>
> static int __threadpool__prepare(struct threadpool_struct **pool, int pool_size)
> {
> + int ret;
> +
> *pool = create_threadpool(pool_size);
> TEST_ASSERT_VAL("threadpool creation failure", *pool != NULL);
> TEST_ASSERT_VAL("threadpool size is wrong",
> threadpool_size(*pool) == pool_size);
>
> + ret = start_threadpool(*pool);
> + TEST_ASSERT_VAL("threadpool start failure", ret == 0);
> + TEST_ASSERT_VAL("threadpool is not ready", threadpool_is_ready(*pool));
> +
> return 0;
> }
>
> static int __threadpool__teardown(struct threadpool_struct *pool)
> {
> + int ret;
> +
> + ret = stop_threadpool(pool);
> + TEST_ASSERT_VAL("threadpool start failure", ret == 0);

s/start/stop/

> + TEST_ASSERT_VAL("stopped threadpool is ready",
> + !threadpool_is_ready(pool));
> +
> destroy_threadpool(pool);
>
> return 0;
> diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
> index 70c67569f956a3e2..f4635ff782b9388e 100644
> --- a/tools/perf/util/workqueue/threadpool.c
> +++ b/tools/perf/util/workqueue/threadpool.c
[SNIP]
> +/**
> + * wait_thread - receive ack from thread
> + *
> + * NB: call only from main thread!
> + */
> +static int wait_thread(struct thread_struct *thread)
> +{
> + int res;
> + enum thread_msg msg = THREAD_MSG__UNDEFINED;
> +
> + res = read(thread->pipes.from[0], &msg, sizeof(msg));
> + if (res < 0) {

Maybe it needs to handle -EINTR.
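
One possible shape for that, as a sketch only: read() returns -1 with errno set to EINTR when a signal interrupts it before any data arrives, so the call can simply be restarted. The helper name read_retry() is hypothetical and not part of the patch.

```c
#include <errno.h>
#include <unistd.h>

/*
 * Hypothetical helper: behaves like read(), but restarts the call when it
 * is interrupted by a signal before any data was transferred.
 */
static ssize_t read_retry(int fd, void *buf, size_t len)
{
	ssize_t ret;

	do {
		ret = read(fd, buf, len);
	} while (ret < 0 && errno == EINTR);

	return ret;
}
```

wait_thread() could then call read_retry(thread->pipes.from[0], &msg, sizeof(msg)) and keep its existing error handling for every other failure.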


> + pr_err("threadpool: failed to recv msg from tid=%d: %s\n",
> + thread->tid, strerror(errno));
> + return -1;
> + }
> + if (msg != THREAD_MSG__ACK) {
> + pr_err("threadpool: received unexpected msg from tid=%d: %s\n",
> + thread->tid, thread_msg_tags[msg]);
> + return -1;
> + }
> +
> + pr_debug2("threadpool: received ack from tid=%d\n", thread->tid);
> +
> + return 0;
> +}
> +
> +/**
> + * terminate_thread - send stop signal to thread and wait for ack
> + *
> + * NB: call only from main thread!
> + */
> +static int terminate_thread(struct thread_struct *thread)
> +{
> + int res;
> + enum thread_msg msg = THREAD_MSG__STOP;
> +
> + res = write(thread->pipes.to[1], &msg, sizeof(msg));
> + if (res < 0) {
> + pr_err("threadpool: error sending stop msg to tid=%d: %s\n",
> + thread->tid, strerror(errno));
> + return res;
> + }
> +
> + res = wait_thread(thread);
> +
> + return res;
> +}
> +
> +/**
> + * threadpool_thread - function running on thread
> + *
> + * This function waits for a signal from main thread to start executing
> + * a task.
> + * On completion, it will go back to sleep, waiting for another signal.
> + * Signals are delivered through pipes.
> + */
> +static void *threadpool_thread(void *args)
> +{
> + struct thread_struct *thread = (struct thread_struct *) args;
> + enum thread_msg msg;
> + int err;
> +
> + thread->tid = gettid();
> +
> + pr_debug2("threadpool[%d]: started\n", thread->tid);
> +
> + for (;;) {
> + msg = THREAD_MSG__ACK;
> + err = write(thread->pipes.from[1], &msg, sizeof(msg));
> + if (err == -1) {
> + pr_err("threadpool[%d]: failed to send ack: %s\n",
> + thread->tid, strerror(errno));
> + break;
> + }
> +
> + msg = THREAD_MSG__UNDEFINED;
> + err = read(thread->pipes.to[0], &msg, sizeof(msg));
> + if (err < 0) {
> + pr_err("threadpool[%d]: error receiving msg: %s\n",
> + thread->tid, strerror(errno));
> + break;
> + }
> +
> + if (msg != THREAD_MSG__WAKE && msg != THREAD_MSG__STOP) {
> + pr_err("threadpool[%d]: received unexpected msg: %s\n",
> + thread->tid, thread_msg_tags[msg]);
> + break;
> + }
> +
> + if (msg == THREAD_MSG__STOP)
> + break;
> + }
> +
> + pr_debug2("threadpool[%d]: exit\n", thread->tid);
> +
> + msg = THREAD_MSG__ACK;
> + err = write(thread->pipes.from[1], &msg, sizeof(msg));
> + if (err == -1) {
> + pr_err("threadpool[%d]: failed to send ack: %s\n",
> + thread->tid, strerror(errno));
> + return NULL;
> + }
> +
> + return NULL;
> +}
> +
> /**
> * create_threadpool - create a fixed threadpool with @n_threads threads
> */
> @@ -173,3 +306,108 @@ int threadpool_size(struct threadpool_struct *pool)
> {
> return pool->nr_threads;
> }
> +
> +/**
> + * __start_threadpool - start all threads in the pool.
> + *
> + * This function does not change @pool->status.
> + */
> +static int __start_threadpool(struct threadpool_struct *pool)
> +{
> + int t, tt, ret = 0, nr_threads = pool->nr_threads;
> + sigset_t full, mask;
> + pthread_t handle;
> + pthread_attr_t attrs;
> +
> + sigfillset(&full);
> + if (sigprocmask(SIG_SETMASK, &full, &mask)) {
> + pr_err("Failed to block signals on threads start: %s\n",
> + strerror(errno));
> + return -1;
> + }
> +
> + pthread_attr_init(&attrs);
> + pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
> +
> + for (t = 0; t < nr_threads; t++) {
> + struct thread_struct *thread = &pool->threads[t];
> +
> + if (pthread_create(&handle, &attrs, threadpool_thread, thread)) {
> + for (tt = 1; tt < t; tt++)
> + terminate_thread(thread);
> + pr_err("Failed to start threads: %s\n", strerror(errno));
> + ret = -1;
> + goto out_free_attr;
> + }
> +
> + if (wait_thread(thread)) {
> + for (tt = 1; tt <= t; tt++)
> + terminate_thread(thread);
> + ret = -1;
> + goto out_free_attr;
> + }
> + }

Isn't it better doing this way?

for (t = 0; t < nr_threads; t++) {
        pthread_create(t)
}

for (t = 0; t < nr_threads; t++) {
        wait_thread(t)
}
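
Spelled out a little more, the two-phase idea could look like the sketch below. It is not the patch's code: struct entry, entry_fn() and start_all() are hypothetical names, each worker uses a single ack pipe, and workers exit right after acking so that the example stays short.

```c
#include <pthread.h>
#include <unistd.h>

#define NR_THREADS 4

/* Hypothetical per-thread state: one pipe carrying the startup ack. */
struct entry {
	pthread_t handle;
	int ack[2];
};

static void *entry_fn(void *arg)
{
	struct entry *e = arg;
	char ok = 1;
	ssize_t n = write(e->ack[1], &ok, sizeof(ok));

	(void)n; /* a real worker would report the failure */
	return NULL;
}

/* Returns the number of workers that acked, or -1 if not all were started. */
static int start_all(struct entry *entries, int nr)
{
	int t, started = 0, acked = 0;
	char ok;

	/* Phase 1: spawn every thread without blocking on any of them. */
	for (t = 0; t < nr; t++) {
		if (pipe(entries[t].ack))
			break;
		if (pthread_create(&entries[t].handle, NULL, entry_fn, &entries[t])) {
			close(entries[t].ack[0]);
			close(entries[t].ack[1]);
			break;
		}
		started++;
	}

	/* Phase 2: collect the acks; the workers started up concurrently. */
	for (t = 0; t < started; t++) {
		if (read(entries[t].ack[0], &ok, sizeof(ok)) == (ssize_t)sizeof(ok))
			acked++;
	}

	/* Cleanup for the sketch only: a real pool keeps its threads alive. */
	for (t = 0; t < started; t++) {
		pthread_join(entries[t].handle, NULL);
		close(entries[t].ack[0]);
		close(entries[t].ack[1]);
	}

	return started == nr ? acked : -1;
}
```

The benefit is that thread startup overlaps: the main thread pays roughly for the slowest worker instead of the sum of all startup latencies.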

Thanks,
Namhyung


> +
> +out_free_attr:
> + pthread_attr_destroy(&attrs);
> +
> + if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
> + pr_err("Failed to unblock signals on threads start: %s\n",
> + strerror(errno));
> + ret = -1;
> + }
> +
> + return ret;
> +}
> +

2021-07-15 23:58:04

by Namhyung Kim

Subject: Re: [RFC PATCH 04/10] perf workqueue: add threadpool execute and wait functions

On Tue, Jul 13, 2021 at 5:11 AM Riccardo Mancini <[email protected]> wrote:
>
> This patch adds:
> - execute_in_threadpool: assigns a task to the threads to execute
> asynchronously.
> - wait_threadpool: waits for the task to complete on all threads.
> Furthermore, testing for these new functions is added.
>
> This patch completes the threadpool.
>
> Signed-off-by: Riccardo Mancini <[email protected]>
> ---
> tools/perf/tests/workqueue.c | 86 ++++++++++++++++++++-
> tools/perf/util/workqueue/threadpool.c | 103 +++++++++++++++++++++++++
> tools/perf/util/workqueue/threadpool.h | 5 ++
> 3 files changed, 193 insertions(+), 1 deletion(-)
>
> diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
> index be377e9897bab4e9..3c64db8203556847 100644
> --- a/tools/perf/tests/workqueue.c
> +++ b/tools/perf/tests/workqueue.c
> @@ -1,13 +1,59 @@
> // SPDX-License-Identifier: GPL-2.0
> +#include <stdlib.h>
> #include <linux/kernel.h>
> +#include <linux/zalloc.h>
> #include "tests.h"
> #include "util/debug.h"
> #include "util/workqueue/threadpool.h"
>
> +#define DUMMY_FACTOR 100000
> +#define N_DUMMY_WORK_SIZES 7
> +
> struct threadpool_test_args_t {
> int pool_size;
> };
>
> +struct test_task {
> + struct task_struct task;
> + int n_threads;
> + int *array;
> +};
> +
> +/**
> + * dummy_work - calculates DUMMY_FACTOR * (idx % N_DUMMY_WORK_SIZES) inefficiently
> + *
> + * This function uses modulus to create work items of different sizes.
> + */
> +static void dummy_work(int idx)
> +{
> + int prod = 0;

I'm not sure, but declaring 'prod' as volatile would keep the compiler
from optimizing the loop away.
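
A minimal sketch of that suggestion follows. It differs from the patch in two ways that are assumptions of this example: 'prod' is volatile, and the function returns the result instead of logging it with pr_debug3().

```c
#define DUMMY_FACTOR 100000
#define N_DUMMY_WORK_SIZES 7

/* Same loop as in the patch, but every increment must really be performed. */
static int dummy_work(int idx)
{
	volatile int prod = 0; /* volatile: accesses cannot be folded away */
	int k = idx % N_DUMMY_WORK_SIZES;
	int i, j;

	for (i = 0; i < DUMMY_FACTOR; i++)
		for (j = 0; j < k; j++)
			prod++;

	return prod;
}
```

Without the qualifier an optimizing compiler is free to replace the nested loops with a single multiplication, which would defeat the purpose of generating work items of different sizes.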

> + int k = idx % N_DUMMY_WORK_SIZES;
> + int i, j;
> +
> + for (i = 0; i < DUMMY_FACTOR; i++)
> + for (j = 0; j < k; j++)
> + prod ++;
> +
> + pr_debug3("dummy: %d * %d = %d\n", DUMMY_FACTOR, k, prod);
> +}
> +
> +static void test_task_fn1(int tidx, struct task_struct *task)
> +{
> + struct test_task *mtask = container_of(task, struct test_task, task);
> +
> + dummy_work(tidx);
> + mtask->array[tidx] = tidx+1;
> +}
> +
> +static void test_task_fn2(int tidx, struct task_struct *task)
> +{
> + struct test_task *mtask = container_of(task, struct test_task, task);
> +
> + dummy_work(tidx);
> + mtask->array[tidx] = tidx*2;
> +}
> +
> +
> static int __threadpool__prepare(struct threadpool_struct **pool, int pool_size)
> {
> int ret;
> @@ -38,21 +84,59 @@ static int __threadpool__teardown(struct threadpool_struct *pool)
> return 0;
> }
>
> +static int __threadpool__exec_wait(struct threadpool_struct *pool,
> + struct task_struct *task)
> +{
> + int ret;
> +
> + ret = execute_in_threadpool(pool, task);
> + TEST_ASSERT_VAL("threadpool execute failure", ret == 0);
> + TEST_ASSERT_VAL("threadpool is not executing", threadpool_is_busy(pool));
> +
> + ret = wait_threadpool(pool);
> + TEST_ASSERT_VAL("threadpool wait failure", ret == 0);
> + TEST_ASSERT_VAL("waited threadpool is not ready", threadpool_is_ready(pool));
> +
> + return 0;
> +}
>
> static int __test__threadpool(void *_args)
> {
> struct threadpool_test_args_t *args = _args;
> struct threadpool_struct *pool;
> - int ret;
> + int ret, i;
> + struct test_task task;
> +
> + task.task.fn = test_task_fn1;
> + task.n_threads = args->pool_size;
> + task.array = calloc(args->pool_size, sizeof(*task.array));

Need to check the return value.

>
> ret = __threadpool__prepare(&pool, args->pool_size);
> if (ret)
> return ret;
>
> + ret = __threadpool__exec_wait(pool, &task.task);
> + if (ret)
> + return ret;
> +
> + for (i = 0; i < args->pool_size; i++)
> + TEST_ASSERT_VAL("failed array check (1)", task.array[i] == i+1);
> +
> + task.task.fn = test_task_fn2;
> +
> + ret = __threadpool__exec_wait(pool, &task.task);
> + if (ret)
> + return ret;
> +
> + for (i = 0; i < args->pool_size; i++)
> + TEST_ASSERT_VAL("failed array check (2)", task.array[i] == 2*i);
> +
> ret = __threadpool__teardown(pool);
> if (ret)
> return ret;
>
> + free(task.array);

All previous returns will leak it.

Thanks,
Namhyung
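
The three review points above (a volatile counter, a checked calloc(), and a single exit path that always frees the array) can be sketched as follows. This is only an illustration under stated assumptions: run_test() is a hypothetical stand-in for the test body, not the code from the patch, and dummy_work() returns its result here instead of printing it with pr_debug3().

```c
#include <stdlib.h>

#define DUMMY_FACTOR 100000
#define N_DUMMY_WORK_SIZES 7

/*
 * 'volatile' forces every increment to be performed, so the compiler
 * cannot fold the two loops into a single multiplication.
 */
static int dummy_work(int idx)
{
	volatile int prod = 0;
	int k = idx % N_DUMMY_WORK_SIZES;
	int i, j;

	for (i = 0; i < DUMMY_FACTOR; i++)
		for (j = 0; j < k; j++)
			prod++;

	return prod;
}

/* Hypothetical stand-in for the test body: 0 on success, -1 on failure. */
static int run_test(int pool_size)
{
	int ret = -1, i;
	int *array = calloc(pool_size, sizeof(*array));

	if (!array)
		return -1;	/* allocation failed, nothing to free yet */

	for (i = 0; i < pool_size; i++) {
		dummy_work(i);
		array[i] = i + 1;
	}

	for (i = 0; i < pool_size; i++)
		if (array[i] != i + 1)
			goto out;	/* a failed check still reaches free() */

	ret = 0;
out:
	free(array);
	return ret;
}
```

Routing every failure through the `out` label means later checks can be added without having to remember the cleanup each time.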

2021-07-16 13:38:09

by Riccardo Mancini

Subject: Re: [RFC PATCH 01/10] perf workqueue: threadpool creation and destruction

Hi Namhyung,
thanks for the review.

On Thu, 2021-07-15 at 16:29 -0700, Namhyung Kim wrote:
> Hi Riccardo and Arnaldo,
>
> On Wed, Jul 14, 2021 at 7:16 AM Arnaldo Carvalho de Melo
> <[email protected]> wrote:
> >
> > Em Tue, Jul 13, 2021 at 02:11:12PM +0200, Riccardo Mancini escreveu:
> > > The workqueue library is made up by two components:
> > >  - threadpool: handles the lifetime of the threads
> > >  - workqueue: handles work distribution among the threads
> > >
> > > This first patch introduces the threadpool, starting from its creation
> > > and destruction functions.
> > > Thread management is based on the prototype from Alexey:
> > > https://lore.kernel.org/lkml/[email protected]/
> > >
> > > Each thread in the threadpool executes the same function (aka task)
> > > with a different argument tidx.
> > > Threads use a pair of pipes to communicate with the main process.
> > > The threadpool is static (all threads will be spawned at the same time).
> > > Future work could include making it resizable and adding affinity support
> > > (as in Alexey prototype).
> > >
> > > Suggested-by: Alexey Bayduraev <[email protected]>
> > > Signed-off-by: Riccardo Mancini <[email protected]>
> > > ---
> > >  tools/perf/util/Build                  |   1 +
> > >  tools/perf/util/workqueue/Build        |   1 +
> > >  tools/perf/util/workqueue/threadpool.c | 175 +++++++++++++++++++++++++
> > >  tools/perf/util/workqueue/threadpool.h |  19 +++
> > >  4 files changed, 196 insertions(+)
> > >  create mode 100644 tools/perf/util/workqueue/Build
> > >  create mode 100644 tools/perf/util/workqueue/threadpool.c
> > >  create mode 100644 tools/perf/util/workqueue/threadpool.h
<SNIP>
> > > +
> > > +struct threadpool_struct {
> >
> > Can this be just 'struct threadpool'? I think its descriptive enough:
> >
> > > +     int                     nr_threads;     /* number of threads in the
> > > pool */
> > > +     struct thread_struct    *threads;       /* array of threads in the
> > > pool */
> > > +     struct task_struct      *current_task;  /* current executing
> > > function */
>
> Does this mean it can only have a single function to run?

Yes.

> Why do we need it?

My idea is to separate the workqueue from the actual implementation of the
threads. This way, when the function executing on the threadpool ends, the
threads are kept alive to execute new work. 
By adding this additional layer of abstraction, we can achieve more flexibility.
For example, the use-case I have in mind is to recycle the same threadpool for
both Alexey's threaded trace and the workqueue.
I don't think this could be easily achieved with just the workqueue, since the
perf-record threads are not just a task that needs to be executed, but they have
specific affinities to be respected.

What are your thoughts?

>
>
> > > +     enum threadpool_status  status;         /* current status of the
> > > pool */
> > > +};
> > > +
> > > +struct thread_struct {
> > > +     int                             idx;    /* idx of thread in pool-
> > > >threads */
> > > +     pid_t                           tid;    /* tid of thread */
> > > +     struct threadpool_struct        *pool;  /* parent threadpool */
> > > +     struct {
> > > +             int from[2];                    /* messages from thread
> > > (acks) */
> > > +             int to[2];                      /* messages to thread
> > > (commands) */
>
> It can be confusing if you think from the main thread.
> Maybe 'ack' and 'cmd' would be better.

Agreed.

>
>
> > > +     } pipes;
> > > +};
> >
> > This one, since we have already a 'struct thread' in tools/perf, to
> > represent a PERF_RECORD_FORK, perhaps we can call it 'struct
> > threadpool_entry'?
>
> I think we can even use 'worker' instead of 'thread' but it requires
> huge renaming and conflicts so I won't insist on it strongly.  :)

Also, worker internally conflicts with the workqueue's worker, which runs on a
(threadpool-)thread.
Another name I had in mind is pool_thread to prevent having too many 'thread' in
the name, but it might be confusing.
I think threadpool_entry is fine.

I have another question.
In general, when should I use zfree instead of free?

Thanks,
Riccardo

>
> Thanks,
> Namhyung


2021-07-16 13:57:36

by Riccardo Mancini

Subject: Re: [RFC PATCH 03/10] perf workqueue: add threadpool start and stop functions

Hi Namhyung,

On Thu, 2021-07-15 at 16:48 -0700, Namhyung Kim wrote:
> On Tue, Jul 13, 2021 at 5:11 AM Riccardo Mancini <[email protected]> wrote:
> >
> > This patch adds the start and stop functions, alongside the thread
> > function.
> > Each thread will run until a stop signal is received.
> > Furthermore, start and stop are added to the test.
> >
> > Thread management is based on the prototype from Alexey:
> > https://lore.kernel.org/lkml/[email protected]/
> >
> > Suggested-by: Alexey Bayduraev <[email protected]>
> > Signed-off-by: Riccardo Mancini <[email protected]>
> > ---
> >  tools/perf/tests/workqueue.c           |  13 ++
> >  tools/perf/util/workqueue/threadpool.c | 238 +++++++++++++++++++++++++
> >  tools/perf/util/workqueue/threadpool.h |   5 +
> >  3 files changed, 256 insertions(+)
> >
> > diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
> > index 1bd4d78c13eb3b14..be377e9897bab4e9 100644
> > --- a/tools/perf/tests/workqueue.c
> > +++ b/tools/perf/tests/workqueue.c
> > @@ -10,16 +10,29 @@ struct threadpool_test_args_t {
> >
> >  static int __threadpool__prepare(struct threadpool_struct **pool, int
> > pool_size)
> >  {
> > +       int ret;
> > +
> >         *pool = create_threadpool(pool_size);
> >         TEST_ASSERT_VAL("threadpool creation failure", *pool != NULL);
> >         TEST_ASSERT_VAL("threadpool size is wrong",
> >                         threadpool_size(*pool) == pool_size);
> >
> > +       ret = start_threadpool(*pool);
> > +       TEST_ASSERT_VAL("threadpool start failure", ret == 0);
> > +       TEST_ASSERT_VAL("threadpool is not ready",
> > threadpool_is_ready(*pool));
> > +
> >         return 0;
> >  }
> >
> >  static int __threadpool__teardown(struct threadpool_struct *pool)
> >  {
> > +       int ret;
> > +
> > +       ret = stop_threadpool(pool);
> > +       TEST_ASSERT_VAL("threadpool start failure", ret == 0);
>
> s/start/stop/
Thanks.
>
> > +       TEST_ASSERT_VAL("stopped threadpool is ready",
> > +                       !threadpool_is_ready(pool));
> > +
> >         destroy_threadpool(pool);
> >
> >         return 0;
> > diff --git a/tools/perf/util/workqueue/threadpool.c
> > b/tools/perf/util/workqueue/threadpool.c
> > index 70c67569f956a3e2..f4635ff782b9388e 100644
> > --- a/tools/perf/util/workqueue/threadpool.c
> > +++ b/tools/perf/util/workqueue/threadpool.c
> [SNIP]
> > +/**
> > + * wait_thread - receive ack from thread
> > + *
> > + * NB: call only from main thread!
> > + */
> > +static int wait_thread(struct thread_struct *thread)
> > +{
> > +       int res;
> > +       enum thread_msg msg = THREAD_MSG__UNDEFINED;
> > +
> > +       res = read(thread->pipes.from[0], &msg, sizeof(msg));
> > +       if (res < 0) {
>
> Maybe it needs to handle -EINTR.

The behaviour should be to retry, right?
Since these reads are used multiple times in the code, maybe I'm better off
writing a wrapper function that also handles EINTR.

>
> > +               pr_err("threadpool: failed to recv msg from tid=%d: %s\n",
> > +                      thread->tid, strerror(errno));
> > +               return -1;
> > +       }
> > +       if (msg != THREAD_MSG__ACK) {
> > +               pr_err("threadpool: received unexpected msg from tid=%d:
> > %s\n",
> > +                      thread->tid, thread_msg_tags[msg]);
> > +               return -1;
> > +       }
> > +
> > +       pr_debug2("threadpool: received ack from tid=%d\n", thread->tid);
> > +
> > +       return 0;
> > +}
> >
<SNIP>
> > +static int __start_threadpool(struct threadpool_struct *pool)
> > +{
> > +       int t, tt, ret = 0, nr_threads = pool->nr_threads;
> > +       sigset_t full, mask;
> > +       pthread_t handle;
> > +       pthread_attr_t attrs;
> > +
> > +       sigfillset(&full);
> > +       if (sigprocmask(SIG_SETMASK, &full, &mask)) {
> > +               pr_err("Failed to block signals on threads start: %s\n",
> > +                       strerror(errno));
> > +               return -1;
> > +       }
> > +
> > +       pthread_attr_init(&attrs);
> > +       pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
> > +
> > +       for (t = 0; t < nr_threads; t++) {
> > +               struct thread_struct *thread = &pool->threads[t];
> > +
> > +               if (pthread_create(&handle, &attrs, threadpool_thread,
> > thread)) {
> > +                       for (tt = 1; tt < t; tt++)
> > +                               terminate_thread(thread);
> > +                       pr_err("Failed to start threads: %s\n",
> > strerror(errno));
> > +                       ret = -1;
> > +                       goto out_free_attr;
> > +               }
> > +
> > +               if (wait_thread(thread)) {
> > +                       for (tt = 1; tt <= t; tt++)
> > +                               terminate_thread(thread);
> > +                       ret = -1;
> > +                       goto out_free_attr;
> > +               }
> > +       }
>
> Isn't it better doing this way?
>
> for (t = 0; t < nr_threads; t++) {
>     pthread_create(t)
> }
>
> for (t = 0; t < nr_threads; t++) {
>     wait_thread(t)
> }

I wondered the same thing, but I saw that it was also done like that in Alexey's
patch, so I kept it that way.
To me, it also looks like doing as you suggest should not be a problem. It
should also be more efficient.

Thanks,
Riccardo

>
> Thanks,
> Namhyung
>
>
> > +
> > +out_free_attr:
> > +       pthread_attr_destroy(&attrs);
> > +
> > +       if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
> > +               pr_err("Failed to unblock signals on threads start: %s\n",
> > +                       strerror(errno));
> > +               ret = -1;
> > +       }
> > +
> > +       return ret;
> > +}
> > +


2021-07-16 13:59:35

by Riccardo Mancini

Subject: Re: [RFC PATCH 04/10] perf workqueue: add threadpool execute and wait functions

Hi Namhyung,
thanks again for the review.

On Thu, 2021-07-15 at 16:56 -0700, Namhyung Kim wrote:
> On Tue, Jul 13, 2021 at 5:11 AM Riccardo Mancini <[email protected]> wrote:
> >
> > This patch adds:
> >  - execute_in_threadpool: assigns a task to the threads to execute
> >    asynchronously.
> >  - wait_threadpool: waits for the task to complete on all threads.
> > Furthermore, testing for these new functions is added.
> >
> > This patch completes the threadpool.
> >
> > Signed-off-by: Riccardo Mancini <[email protected]>
> > ---
> >  tools/perf/tests/workqueue.c           |  86 ++++++++++++++++++++-
> >  tools/perf/util/workqueue/threadpool.c | 103 +++++++++++++++++++++++++
> >  tools/perf/util/workqueue/threadpool.h |   5 ++
> >  3 files changed, 193 insertions(+), 1 deletion(-)
> >
> > diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
> > index be377e9897bab4e9..3c64db8203556847 100644
> > --- a/tools/perf/tests/workqueue.c
> > +++ b/tools/perf/tests/workqueue.c
> > @@ -1,13 +1,59 @@
> >  // SPDX-License-Identifier: GPL-2.0
> > +#include <stdlib.h>
> >  #include <linux/kernel.h>
> > +#include <linux/zalloc.h>
> >  #include "tests.h"
> >  #include "util/debug.h"
> >  #include "util/workqueue/threadpool.h"
> >
> > +#define DUMMY_FACTOR 100000
> > +#define N_DUMMY_WORK_SIZES 7
> > +
> >  struct threadpool_test_args_t {
> >         int pool_size;
> >  };
> >
> > +struct test_task {
> > +       struct task_struct task;
> > +       int n_threads;
> > +       int *array;
> > +};
> > +
> > +/**
> > + * dummy_work - calculates DUMMY_FACTOR * (idx % N_DUMMY_WORK_SIZES)
> > inefficiently
> > + *
> > + * This function uses modulus to create work items of different sizes.
> > + */
> > +static void dummy_work(int idx)
> > +{
> > +       int prod = 0;
>
> I'm not sure, but making it 'volatile' would prevent some
> possible compiler optimizations.

Agreed.

>
> > +       int k = idx % N_DUMMY_WORK_SIZES;
> > +       int i, j;
> > +
> > +       for (i = 0; i < DUMMY_FACTOR; i++)
> > +               for (j = 0; j < k; j++)
> > +                       prod ++;
> > +
> > +       pr_debug3("dummy: %d * %d = %d\n", DUMMY_FACTOR, k, prod);
> > +}
> > +
> > +static void test_task_fn1(int tidx, struct task_struct *task)
> > +{
> > +       struct test_task *mtask = container_of(task, struct test_task,
> > task);
> > +
> > +       dummy_work(tidx);
> > +       mtask->array[tidx] = tidx+1;
> > +}
> > +
> > +static void test_task_fn2(int tidx, struct task_struct *task)
> > +{
> > +       struct test_task *mtask = container_of(task, struct test_task,
> > task);
> > +
> > +       dummy_work(tidx);
> > +       mtask->array[tidx] = tidx*2;
> > +}
> > +
> > +
> >  static int __threadpool__prepare(struct threadpool_struct **pool, int
> > pool_size)
> >  {
> >         int ret;
> > @@ -38,21 +84,59 @@ static int __threadpool__teardown(struct
> > threadpool_struct *pool)
> >         return 0;
> >  }
> >
> > +static int __threadpool__exec_wait(struct threadpool_struct *pool,
> > +                               struct task_struct *task)
> > +{
> > +       int ret;
> > +
> > +       ret = execute_in_threadpool(pool, task);
> > +       TEST_ASSERT_VAL("threadpool execute failure", ret == 0);
> > +       TEST_ASSERT_VAL("threadpool is not executing",
> > threadpool_is_busy(pool));
> > +
> > +       ret = wait_threadpool(pool);
> > +       TEST_ASSERT_VAL("threadpool wait failure", ret == 0);
> > +       TEST_ASSERT_VAL("waited threadpool is not ready",
> > threadpool_is_ready(pool));
> > +
> > +       return 0;
> > +}
> >
> >  static int __test__threadpool(void *_args)
> >  {
> >         struct threadpool_test_args_t *args = _args;
> >         struct threadpool_struct *pool;
> > -       int ret;
> > +       int ret, i;
> > +       struct test_task task;
> > +
> > +       task.task.fn = test_task_fn1;
> > +       task.n_threads = args->pool_size;
> > +       task.array = calloc(args->pool_size, sizeof(*task.array));
>
> Need to check the return value.

Thanks.

>
> >
> >         ret = __threadpool__prepare(&pool, args->pool_size);
> >         if (ret)
> >                 return ret;
> >
> > +       ret = __threadpool__exec_wait(pool, &task.task);
> > +       if (ret)
> > +               return ret;
> > +
> > +       for (i = 0; i < args->pool_size; i++)
> > +               TEST_ASSERT_VAL("failed array check (1)", task.array[i] ==
> > i+1);
> > +
> > +       task.task.fn = test_task_fn2;
> > +
> > +       ret = __threadpool__exec_wait(pool, &task.task);
> > +       if (ret)
> > +               return ret;
> > +
> > +       for (i = 0; i < args->pool_size; i++)
> > +               TEST_ASSERT_VAL("failed array check (2)", task.array[i] ==
> > 2*i);
> > +
> >         ret = __threadpool__teardown(pool);
> >         if (ret)
> >                 return ret;
> >
> > +       free(task.array);
>
> All previous returns will leak it.

Oh, right.

Thanks,
Riccardo

>
> Thanks,
> Namhyung


2021-07-16 16:32:07

by Arnaldo Carvalho de Melo

Subject: Re: [RFC PATCH 03/10] perf workqueue: add threadpool start and stop functions

Em Fri, Jul 16, 2021 at 03:53:58PM +0200, Riccardo Mancini escreveu:
> On Thu, 2021-07-15 at 16:48 -0700, Namhyung Kim wrote:
> > On Tue, Jul 13, 2021 at 5:11 AM Riccardo Mancini <[email protected]> wrote:
> > > +++ b/tools/perf/util/workqueue/threadpool.c
> > [SNIP]
> > > +/**
> > > + * wait_thread - receive ack from thread
> > > + *
> > > + * NB: call only from main thread!
> > > + */
> > > +static int wait_thread(struct thread_struct *thread)
> > > +{
> > > +       int res;
> > > +       enum thread_msg msg = THREAD_MSG__UNDEFINED;
> > > +
> > > +       res = read(thread->pipes.from[0], &msg, sizeof(msg));
> > > +       if (res < 0) {

> > Maybe it needs to handle -EINTR.

> The behaviour should be to retry, right?
> Since these reads are used multiple times in the code, maybe I'm better off
> writing a wrapper function that also handles EINTR.

Take a look at readn():

tools/lib/perf/lib.c

static ssize_t ion(bool is_read, int fd, void *buf, size_t n)
{
	void *buf_start = buf;
	size_t left = n;

	while (left) {
		/* buf must be treated as const if !is_read. */
		ssize_t ret = is_read ? read(fd, buf, left) :
					write(fd, buf, left);

		if (ret < 0 && errno == EINTR)
			continue;
		if (ret <= 0)
			return ret;

		left -= ret;
		buf  += ret;
	}

	BUG_ON((size_t)(buf - buf_start) != n);
	return n;
}

/*
 * Read exactly 'n' bytes or return an error.
 */
ssize_t readn(int fd, void *buf, size_t n)
{
	return ion(true, fd, buf, n);
}


- Arnaldo
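
For illustration, here is how an ack wait could look on top of such a helper. This is a self-contained sketch under assumptions: read_full() is a local stand-in with the same contract as readn() (so the example builds outside the perf tree), wait_ack() is a hypothetical name, and the enum values are guesses, since only the names THREAD_MSG__UNDEFINED and THREAD_MSG__ACK appear in the quoted patch.

```c
#include <errno.h>
#include <unistd.h>

/* Assumed values; only the two names come from the quoted patch. */
enum thread_msg {
	THREAD_MSG__UNDEFINED = 0,
	THREAD_MSG__ACK,
};

/* Stand-in for readn(): read exactly n bytes, retrying on EINTR. */
static ssize_t read_full(int fd, void *buf, size_t n)
{
	char *p = buf;
	size_t left = n;

	while (left) {
		ssize_t ret = read(fd, p, left);

		if (ret < 0 && errno == EINTR)
			continue;	/* interrupted by a signal: retry */
		if (ret <= 0)
			return ret;	/* error, or EOF before n bytes */

		left -= ret;
		p += ret;
	}

	return n;
}

/* Returns 0 if an ack was received, -1 on error, EOF or unexpected msg. */
static int wait_ack(int fd)
{
	enum thread_msg msg = THREAD_MSG__UNDEFINED;

	if (read_full(fd, &msg, sizeof(msg)) != (ssize_t)sizeof(msg))
		return -1;

	return msg == THREAD_MSG__ACK ? 0 : -1;
}
```

Because the helper either returns the full message or a failure, the caller no longer has to distinguish an interrupted read from a real error.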

2021-07-19 20:29:29

by Namhyung Kim

Subject: Re: [RFC PATCH 01/10] perf workqueue: threadpool creation and destruction

On Fri, Jul 16, 2021 at 6:36 AM Riccardo Mancini <[email protected]> wrote:
>
> Hi Namhyung,
> thanks for the review.
>
> On Thu, 2021-07-15 at 16:29 -0700, Namhyung Kim wrote:
> > Hi Riccardo and Arnaldo,
> >
> > On Wed, Jul 14, 2021 at 7:16 AM Arnaldo Carvalho de Melo
> > <[email protected]> wrote:
> > >
> > > Em Tue, Jul 13, 2021 at 02:11:12PM +0200, Riccardo Mancini escreveu:
> > > > The workqueue library is made up by two components:
> > > > - threadpool: handles the lifetime of the threads
> > > > - workqueue: handles work distribution among the threads
> > > >
> > > > This first patch introduces the threadpool, starting from its creation
> > > > and destruction functions.
> > > > Thread management is based on the prototype from Alexey:
> > > > https://lore.kernel.org/lkml/[email protected]/
> > > >
> > > > Each thread in the threadpool executes the same function (aka task)
> > > > with a different argument tidx.
> > > > Threads use a pair of pipes to communicate with the main process.
> > > > The threadpool is static (all threads will be spawned at the same time).
> > > > Future work could include making it resizable and adding affinity support
> > > > (as in Alexey prototype).
> > > >
> > > > Suggested-by: Alexey Bayduraev <[email protected]>
> > > > Signed-off-by: Riccardo Mancini <[email protected]>
> > > > ---
> > > > tools/perf/util/Build | 1 +
> > > > tools/perf/util/workqueue/Build | 1 +
> > > > tools/perf/util/workqueue/threadpool.c | 175 +++++++++++++++++++++++++
> > > > tools/perf/util/workqueue/threadpool.h | 19 +++
> > > > 4 files changed, 196 insertions(+)
> > > > create mode 100644 tools/perf/util/workqueue/Build
> > > > create mode 100644 tools/perf/util/workqueue/threadpool.c
> > > > create mode 100644 tools/perf/util/workqueue/threadpool.h
> <SNIP>
> > > > +
> > > > +struct threadpool_struct {
> > >
> > > Can this be just 'struct threadpool'? I think its descriptive enough:
> > >
> > > > + int nr_threads; /* number of threads in the
> > > > pool */
> > > > + struct thread_struct *threads; /* array of threads in the
> > > > pool */
> > > > + struct task_struct *current_task; /* current executing
> > > > function */
> >
> > Does this mean it can only have a single function to run?
>
> Yes.
>
> > Why do we need it?
>
> My idea is to separate the workqueue from the actual implementation of the
> threads. This way, when the function executing on the threadpool ends, the
> threads are kept alive to execute new work.
> By adding this additional layer of abstraction, we can achieve more flexibility.
> For example, the use-case I have in mind is to recycle the same threadpool for
> both Alexey's threaded trace and the workqueue.
> I don't think this could be easily achieved with just the workqueue, since the
> perf-record threads are not just a task that needs to be executed, but they have
> specific affinities to be respected.
>
> What are your thoughts?

I'm fine with the separation of work(queue) and thread-pool.

I thought the backing thread-pool is general and can handle
multiple work items at the same time.

The work queue should keep track of the work items it submitted
and their status. We can have multiple workqueues
sharing a single thread pool.


>
> >
> >
> > > > + enum threadpool_status status; /* current status of the
> > > > pool */
> > > > +};
> > > > +
> > > > +struct thread_struct {
> > > > + int idx; /* idx of thread in pool-
> > > > >threads */
> > > > + pid_t tid; /* tid of thread */
> > > > + struct threadpool_struct *pool; /* parent threadpool */
> > > > + struct {
> > > > + int from[2]; /* messages from thread
> > > > (acks) */
> > > > + int to[2]; /* messages to thread
> > > > (commands) */
> >
> > It can be confusing if you think from the main thread.
> > Maybe 'ack' and 'cmd' would be better.
>
> Agreed.
>
> >
> >
> > > > + } pipes;
> > > > +};
> > >
> > > This one, since we have already a 'struct thread' in tools/perf, to
> > > represent a PERF_RECORD_FORK, perhaps we can call it 'struct
> > > threadpool_entry'?
> >
> > I think we can even use 'worker' instead of 'thread' but it requires
> > huge renaming and conflicts so I won't insist on it strongly. :)
>
> Also, worker internally conflicts with the workqueue's worker, which runs on a
> (threadpool-)thread.
> Another name I had in mind is pool_thread to prevent having too many 'thread' in
> the name, but it might be confusing.
> I think threadpool_entry is fine.
>
> I have another question.
> In general, when should I use zfree instead of free?

I think zfree is generally preferable to free,
especially if the pointer can be accessed after the free.

Thanks,
Namhyung
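
The idea behind preferring zfree can be shown with a small stand-alone sketch. FREE_AND_NULL(), struct buf and buf_release() are hypothetical names used only for this example; they are not the helper from the tools headers, they just mimic the "free, then reset the pointer to NULL" behaviour discussed above.

```c
#include <stdlib.h>

/* Hypothetical macro: free the object and reset the pointer to NULL. */
#define FREE_AND_NULL(p)	\
	do {			\
		free(p);	\
		(p) = NULL;	\
	} while (0)

struct buf {
	int *data;
};

/*
 * Safe to call more than once: after the first call b->data is NULL,
 * and free(NULL) is a no-op.
 */
static void buf_release(struct buf *b)
{
	FREE_AND_NULL(b->data);
}
```

With a plain free(), a second release or a later access would touch a dangling pointer; resetting it to NULL turns that into a harmless no-op or an immediate, easy-to-spot NULL dereference.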

2021-07-19 21:45:07

by Jiri Olsa

Subject: Re: [RFC PATCH 00/10] perf: add workqueue library and use it in synthetic-events

On Tue, Jul 13, 2021 at 02:11:11PM +0200, Riccardo Mancini wrote:
> This patchset introduces a new utility library inside perf/util, which
> provides a work queue abstraction, which loosely follows the Kernel
> workqueue API.
>
> The workqueue abstraction is made up by two components:
> - threadpool: which takes care of managing a pool of threads. It is
> inspired by the prototype for threaded trace in perf-record from Alexey:
> https://lore.kernel.org/lkml/[email protected]/
> - workqueue: manages a shared queue and provides the workers implementation.
>
> On top of the workqueue, a simple parallel-for utility is implemented
> which is then showcased in synthetic-events.c, replacing the previous
> manual pthread-created threads.
>
> Through some experiments with perf bench, I can see how the new
> workqueue has a higher overhead compared to manual creation of threads,
> but is able to more effectively partition work among threads, yielding
> a better result with more threads.
> Furthermore, the overhead could be configured by changing the
> `work_size` (currently 1), aka the number of dirents that are
> processed by a thread before grabbing a lock to get the new work item.
> I experimented with different sizes but, while bigger sizes reduce overhead
> as expected, they do not scale as well to more threads.
>
> I tried to keep the patchset as simple as possible, deferring possible
> improvements and features to future work.
> Naming a few:
> - in order to achieve a better performance, we could consider using
> work-stealing instead of a common queue.
> - affinities in the thread pool, as in Alexey prototype for
> perf-record. Doing so would enable reusing the same threadpool for
> different purposes (evlist open, threaded trace, synthetic threads),
> avoiding having to spin up threads multiple times.
> - resizable threadpool, e.g. for lazy spawning of threads.
>
> @Arnaldo
> Since I wanted the workqueue to provide a similar API to the Kernel's
> workqueue, I followed the naming style I found there, instead of the
> usual object__method style that is typically found in perf.
> Let me know if you'd like me to follow perf style instead.
>
> Thanks,
> Riccardo
>
> Riccardo Mancini (10):
> perf workqueue: threadpool creation and destruction
> perf tests: add test for workqueue
> perf workqueue: add threadpool start and stop functions
> perf workqueue: add threadpool execute and wait functions
> perf workqueue: add sparse annotation header
> perf workqueue: introduce workqueue struct
> perf workqueue: implement worker thread and management
> perf workqueue: add queue_work and flush_workqueue functions
> perf workqueue: add utility to execute a for loop in parallel
> perf synthetic-events: use workqueue parallel_for

looks great, would it make sense to put this in libperf?

jirka

>
> tools/perf/tests/Build | 1 +
> tools/perf/tests/builtin-test.c | 9 +
> tools/perf/tests/tests.h | 3 +
> tools/perf/tests/workqueue.c | 453 +++++++++++++++++
> tools/perf/util/Build | 1 +
> tools/perf/util/synthetic-events.c | 131 +++--
> tools/perf/util/workqueue/Build | 2 +
> tools/perf/util/workqueue/sparse.h | 21 +
> tools/perf/util/workqueue/threadpool.c | 516 ++++++++++++++++++++
> tools/perf/util/workqueue/threadpool.h | 29 ++
> tools/perf/util/workqueue/workqueue.c | 642 +++++++++++++++++++++++++
> tools/perf/util/workqueue/workqueue.h | 38 ++
> 12 files changed, 1771 insertions(+), 75 deletions(-)
> create mode 100644 tools/perf/tests/workqueue.c
> create mode 100644 tools/perf/util/workqueue/Build
> create mode 100644 tools/perf/util/workqueue/sparse.h
> create mode 100644 tools/perf/util/workqueue/threadpool.c
> create mode 100644 tools/perf/util/workqueue/threadpool.h
> create mode 100644 tools/perf/util/workqueue/workqueue.c
> create mode 100644 tools/perf/util/workqueue/workqueue.h
>
> --
> 2.31.1
>
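
The `work_size` trade-off described in the cover letter can be illustrated with a generic chunked parallel-for over a shared index. This is a sketch under assumptions and not the workqueue implementation from the patchset: it uses plain pthreads instead of the threadpool, and struct pfor, pfor_worker(), parallel_for_sketch() and mark() are hypothetical names.

```c
#include <pthread.h>

#define PFOR_MAX_THREADS 64

/* Hypothetical shared state for a chunked parallel-for. */
struct pfor {
	pthread_mutex_t lock;
	int next;		/* first index not yet handed out */
	int end;		/* one past the last index */
	int work_size;		/* indices taken per lock acquisition */
	void (*fn)(int i, void *arg);
	void *arg;
};

static void *pfor_worker(void *p)
{
	struct pfor *pf = p;

	for (;;) {
		int start, stop, i;

		/* Grab the next chunk of at most work_size indices. */
		pthread_mutex_lock(&pf->lock);
		start = pf->next;
		stop = start + pf->work_size;
		if (stop > pf->end)
			stop = pf->end;
		pf->next = stop;
		pthread_mutex_unlock(&pf->lock);

		if (start >= stop)
			return NULL;	/* no work left */

		for (i = start; i < stop; i++)
			pf->fn(i, pf->arg);
	}
}

static int parallel_for_sketch(int nr_threads, int begin, int end,
			       int work_size, void (*fn)(int, void *),
			       void *arg)
{
	pthread_t th[PFOR_MAX_THREADS];
	struct pfor pf = { .next = begin, .end = end,
			   .work_size = work_size, .fn = fn, .arg = arg };
	int t, started = 0;

	if (nr_threads < 1 || nr_threads > PFOR_MAX_THREADS || work_size < 1)
		return -1;

	pthread_mutex_init(&pf.lock, NULL);
	for (t = 0; t < nr_threads; t++) {
		if (pthread_create(&th[t], NULL, pfor_worker, &pf))
			break;
		started++;
	}
	if (!started)
		pfor_worker(&pf);	/* fall back to the calling thread */
	for (t = 0; t < started; t++)
		pthread_join(th[t], NULL);
	pthread_mutex_destroy(&pf.lock);

	return 0;
}

/* Example loop body: count how many times index i was visited. */
static void mark(int i, void *arg)
{
	((int *)arg)[i] += 1;
}
```

A call such as `parallel_for_sketch(4, 0, 100, 8, mark, seen)` visits each index exactly once. A larger `work_size` means fewer lock acquisitions, but coarser chunks, so a thread that draws several expensive items can leave the others idle near the end of the range.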

2021-07-22 16:18:18

by Riccardo Mancini

Subject: Re: [RFC PATCH 00/10] perf: add workqueue library and use it in synthetic-events

Hi Jiri,

On Mon, 2021-07-19 at 23:13 +0200, Jiri Olsa wrote:
> On Tue, Jul 13, 2021 at 02:11:11PM +0200, Riccardo Mancini wrote:
> > This patchset introduces a new utility library inside perf/util, which
> > provides a work queue abstraction, which loosely follows the Kernel
> > workqueue API.
> >
> > The workqueue abstraction is made up by two components:
> >  - threadpool: which takes care of managing a pool of threads. It is
> >    inspired by the prototype for threaded trace in perf-record from Alexey:
> >   
> > https://lore.kernel.org/lkml/[email protected]/
> >  - workqueue: manages a shared queue and provides the workers
> > implementation.
> >
> > On top of the workqueue, a simple parallel-for utility is implemented
> > which is then showcased in synthetic-events.c, replacing the previous
> > manual pthread-created threads.
> >
> > Through some experiments with perf bench, I can see how the new
> > workqueue has a higher overhead compared to manual creation of threads,
> > but is able to more effectively partition work among threads, yielding
> > a better result with more threads.
> > Furthermore, the overhead could be configured by changing the
> > `work_size` (currently 1), aka the number of dirents that are
> > processed by a thread before grabbing a lock to get the new work item.
> > I experimented with different sizes but, while bigger sizes reduce overhead
> > as expected, they do not scale as well to more threads.
> >
> > I tried to keep the patchset as simple as possible, deferring possible
> > improvements and features to future work.
> > Naming a few:
> >  - in order to achieve a better performance, we could consider using
> >    work-stealing instead of a common queue.
> >  - affinities in the thread pool, as in Alexey prototype for
> >    perf-record. Doing so would enable reusing the same threadpool for
> >    different purposes (evlist open, threaded trace, synthetic threads),
> >    avoiding having to spin up threads multiple times.
> >  - resizable threadpool, e.g. for lazy spawning of threads.
> >
> > @Arnaldo
> > Since I wanted the workqueue to provide a similar API to the Kernel's
> > workqueue, I followed the naming style I found there, instead of the
> > usual object__method style that is typically found in perf.
> > Let me know if you'd like me to follow perf style instead.
> >
> > Thanks,
> > Riccardo
> >
> > Riccardo Mancini (10):
> >   perf workqueue: threadpool creation and destruction
> >   perf tests: add test for workqueue
> >   perf workqueue: add threadpool start and stop functions
> >   perf workqueue: add threadpool execute and wait functions
> >   perf workqueue: add sparse annotation header
> >   perf workqueue: introduce workqueue struct
> >   perf workqueue: implement worker thread and management
> >   perf workqueue: add queue_work and flush_workqueue functions
> >   perf workqueue: add utility to execute a for loop in parallel
> >   perf synthetic-events: use workqueue parallel_for
>
> looks great, would it make sense to put this in libperf?

I don't know about libperf in particular.
The idea is to start using it in perf and, if everything goes well, to put it in
lib/ so that everyone interested in it could just include it.
Since I'm looking for other parts where a workqueue could be useful, if you know
of some in libperf, I could try having a look at them too.

Riccardo

>
> jirka
>
> >
> >  tools/perf/tests/Build                 |   1 +
> >  tools/perf/tests/builtin-test.c        |   9 +
> >  tools/perf/tests/tests.h               |   3 +
> >  tools/perf/tests/workqueue.c           | 453 +++++++++++++++++
> >  tools/perf/util/Build                  |   1 +
> >  tools/perf/util/synthetic-events.c     | 131 +++--
> >  tools/perf/util/workqueue/Build        |   2 +
> >  tools/perf/util/workqueue/sparse.h     |  21 +
> >  tools/perf/util/workqueue/threadpool.c | 516 ++++++++++++++++++++
> >  tools/perf/util/workqueue/threadpool.h |  29 ++
> >  tools/perf/util/workqueue/workqueue.c  | 642 +++++++++++++++++++++++++
> >  tools/perf/util/workqueue/workqueue.h  |  38 ++
> >  12 files changed, 1771 insertions(+), 75 deletions(-)
> >  create mode 100644 tools/perf/tests/workqueue.c
> >  create mode 100644 tools/perf/util/workqueue/Build
> >  create mode 100644 tools/perf/util/workqueue/sparse.h
> >  create mode 100644 tools/perf/util/workqueue/threadpool.c
> >  create mode 100644 tools/perf/util/workqueue/threadpool.h
> >  create mode 100644 tools/perf/util/workqueue/workqueue.c
> >  create mode 100644 tools/perf/util/workqueue/workqueue.h
> >
> > --
> > 2.31.1
> >
>