Start each thread in the detached state because thread management is
implemented via messaging to avoid any scaling issues. Block signals prior
to thread start so that only the main tool thread is notified of external
async signals during data collection. The thread affinity mask is used to
assign the eligible CPUs for the thread to run on. Wait and sync on thread
start using the thread ack pipe.
Signed-off-by: Alexey Bayduraev <[email protected]>
---
tools/perf/builtin-record.c | 106 +++++++++++++++++++++++++++++++++++-
1 file changed, 105 insertions(+), 1 deletion(-)
diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
index 838c1f779849..88fad12cbe5b 100644
--- a/tools/perf/builtin-record.c
+++ b/tools/perf/builtin-record.c
@@ -1423,6 +1423,66 @@ static void record__thread_munmap_filtered(struct fdarray *fda, int fd,
perf_mmap__put(map);
}
+/*
+ * Start routine of a record worker thread (passed to pthread_create() by
+ * record__start_threads()).
+ *
+ * @arg: pointer to this worker's struct thread_data; it is stored in the
+ *       'thread' variable, which is declared outside this function.
+ *
+ * The worker records its tid, acknowledges its start on the ack pipe, then
+ * loops reading its mmaps and polling its descriptors until either reading
+ * fails, no polled descriptors remain, or a hang-up is seen on the control
+ * descriptor. Before returning it performs one last read pass and writes a
+ * second message to the ack pipe.
+ *
+ * Always returns NULL.
+ */
+static void *record__thread(void *arg)
+{
+	enum thread_msg msg = THREAD_MSG__READY;
+	bool terminate = false;
+	struct fdarray *pollfd;
+	int err, ctlfd_pos;
+
+	thread = arg;
+	thread->tid = syscall(SYS_gettid);
+
+	/* The starter blocks reading ack[0] until this message arrives. */
+	err = write(thread->pipes.ack[1], &msg, sizeof(msg));
+	if (err == -1)
+		pr_err("threads[%d]: failed to notify on start: %s\n", thread->tid, strerror(errno));
+
+	pr_debug("threads[%d]: started on cpu=%d\n", thread->tid, sched_getcpu());
+
+	pollfd = &thread->pollfd;
+	ctlfd_pos = thread->ctlfd_pos;
+
+	for (;;) {
+		/* Snapshot the sample counter to detect whether this pass read anything. */
+		unsigned long long hits = thread->samples;
+
+		/*
+		 * 'terminate' is tested after the read on purpose: once the control
+		 * descriptor has hung up, one more read pass runs before leaving.
+		 */
+		if (record__mmap_read_all(thread->rec, false) < 0 || terminate)
+			break;
+
+		/* Nothing new was read: sleep in poll() with no timeout. */
+		if (hits == thread->samples) {
+
+			err = fdarray__poll(pollfd, -1);
+			/*
+			 * Ignore positive number of returned events and
+			 * interrupt error.
+			 * NOTE(review): 'err' is normalised here but is not read
+			 * again before being overwritten, so a real poll error is
+			 * not acted upon -- confirm whether that is intended.
+			 */
+			if (err > 0 || (err < 0 && errno == EINTR))
+				err = 0;
+			thread->waking++;
+
+			/* Leave the loop when the filter reports zero (presumably: no descriptors left). */
+			if (fdarray__filter(pollfd, POLLERR | POLLHUP,
+					    record__thread_munmap_filtered, NULL) == 0)
+				break;
+		}
+
+		/* Hang-up on the control descriptor: stop polling it and schedule termination. */
+		if (pollfd->entries[ctlfd_pos].revents & POLLHUP) {
+			terminate = true;
+			close(thread->pipes.msg[0]);
+			pollfd->entries[ctlfd_pos].fd = -1;
+			pollfd->entries[ctlfd_pos].events = 0;
+		}
+
+		pollfd->entries[ctlfd_pos].revents = 0;
+	}
+	/* Final read pass, this time with the second argument set to true. */
+	record__mmap_read_all(thread->rec, true);
+
+	/* Second message on the ack pipe; 'msg' still holds THREAD_MSG__READY. */
+	err = write(thread->pipes.ack[1], &msg, sizeof(msg));
+	if (err == -1)
+		pr_err("threads[%d]: failed to notify on termination: %s\n",
+		       thread->tid, strerror(errno));
+
+	return NULL;
+}
+
+
static void record__init_features(struct record *rec)
{
struct perf_session *session = rec->session;
@@ -1886,13 +1946,57 @@ static int record__terminate_thread(struct thread_data *thread_data)
static int record__start_threads(struct record *rec)
{
+ int t, tt, ret = 0, nr_threads = rec->nr_threads;
struct thread_data *thread_data = rec->thread_data;
+ sigset_t full, mask;
+ pthread_t handle;
+ pthread_attr_t attrs;
+
+ sigfillset(&full);
+ if (sigprocmask(SIG_SETMASK, &full, &mask)) {
+ pr_err("Failed to block signals on threads start: %s\n", strerror(errno));
+ return -1;
+ }
+
+ pthread_attr_init(&attrs);
+ pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+ for (t = 1; t < nr_threads; t++) {
+ enum thread_msg msg = THREAD_MSG__UNDEFINED;
+
+ pthread_attr_setaffinity_np(&attrs,
+ MMAP_CPU_MASK_BYTES(&(thread_data[t].mask->affinity)),
+ (cpu_set_t *)(thread_data[t].mask->affinity.bits));
+
+ if (pthread_create(&handle, &attrs, record__thread, &thread_data[t])) {
+ for (tt = 1; tt < t; tt++)
+ record__terminate_thread(&thread_data[t]);
+ pr_err("Failed to start threads: %s\n", strerror(errno));
+ ret = -1;
+ goto out_err;
+ }
+
+ if (read(thread_data[t].pipes.ack[0], &msg, sizeof(msg)) > 0)
+ pr_debug2("threads[%d]: sent %s\n", rec->thread_data[t].tid,
+ thread_msg_tags[msg]);
+ }
+
+ if (nr_threads > 1) {
+ sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&thread_data[0].mask->affinity),
+ (cpu_set_t *)thread_data[0].mask->affinity.bits);
+ }
thread = &thread_data[0];
pr_debug("threads[%d]: started on cpu=%d\n", thread->tid, sched_getcpu());
- return 0;
+out_err:
+ if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
+ pr_err("Failed to unblock signals on threads start: %s\n", strerror(errno));
+ ret = -1;
+ }
+
+ return ret;
}
static int record__stop_threads(struct record *rec, unsigned long *waking)
--
2.19.0
Hi,
On Wed, 2021-05-26 at 13:52 +0300, Alexey Bayduraev wrote:
> Start thread in detached state because its management is implemented
> via messaging to avoid any scaling issues. Block signals prior thread
> start so only main tool thread would be notified on external async
> signals during data collection. Thread affinity mask is used to assign
> eligible cpus for the thread to run. Wait and sync on thread start using
> thread ack pipe.
>
> Signed-off-by: Alexey Bayduraev <[email protected]>
> ---
> tools/perf/builtin-record.c | 106 +++++++++++++++++++++++++++++++++++-
> 1 file changed, 105 insertions(+), 1 deletion(-)
>
> diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
> index 838c1f779849..88fad12cbe5b 100644
> --- a/tools/perf/builtin-record.c
> +++ b/tools/perf/builtin-record.c
> @@ -1423,6 +1423,66 @@ static void record__thread_munmap_filtered(struct
> fdarray *fda, int fd,
> perf_mmap__put(map);
> }
>
> +static void *record__thread(void *arg)
> +{
> + enum thread_msg msg = THREAD_MSG__READY;
> + bool terminate = false;
> + struct fdarray *pollfd;
> + int err, ctlfd_pos;
> +
> + thread = arg;
> + thread->tid = syscall(SYS_gettid);
> +
> + err = write(thread->pipes.ack[1], &msg, sizeof(msg));
> + if (err == -1)
> + pr_err("threads[%d]: failed to notify on start: %s", thread-
> >tid, strerror(errno));
> +
> + pr_debug("threads[%d]: started on cpu=%d\n", thread->tid,
> sched_getcpu());
> +
> + pollfd = &thread->pollfd;
> + ctlfd_pos = thread->ctlfd_pos;
> +
> + for (;;) {
> + unsigned long long hits = thread->samples;
> +
> + if (record__mmap_read_all(thread->rec, false) < 0 ||
> terminate)
> + break;
> +
> + if (hits == thread->samples) {
> +
> + err = fdarray__poll(pollfd, -1);
> + /*
> + * Propagate error, only if there's any. Ignore
> positive
> + * number of returned events and interrupt error.
> + */
> + if (err > 0 || (err < 0 && errno == EINTR))
> + err = 0;
> + thread->waking++;
> +
> + if (fdarray__filter(pollfd, POLLERR | POLLHUP,
> + record__thread_munmap_filtered,
> NULL) == 0)
> + break;
> + }
> +
> + if (pollfd->entries[ctlfd_pos].revents & POLLHUP) {
> + terminate = true;
> + close(thread->pipes.msg[0]);
> + pollfd->entries[ctlfd_pos].fd = -1;
> + pollfd->entries[ctlfd_pos].events = 0;
> + }
> +
> + pollfd->entries[ctlfd_pos].revents = 0;
> + }
> + record__mmap_read_all(thread->rec, true);
> +
> + err = write(thread->pipes.ack[1], &msg, sizeof(msg));
> + if (err == -1)
> + pr_err("threads[%d]: failed to notify on termination: %s",
> + thread->tid, strerror(errno));
> +
> + return NULL;
> +}
> +
> static void record__init_features(struct record *rec)
> {
> struct perf_session *session = rec->session;
> @@ -1886,13 +1946,57 @@ static int record__terminate_thread(struct thread_data
> *thread_data)
>
> static int record__start_threads(struct record *rec)
> {
> + int t, tt, ret = 0, nr_threads = rec->nr_threads;
> struct thread_data *thread_data = rec->thread_data;
> + sigset_t full, mask;
> + pthread_t handle;
> + pthread_attr_t attrs;
> +
> + sigfillset(&full);
> + if (sigprocmask(SIG_SETMASK, &full, &mask)) {
> + pr_err("Failed to block signals on threads start: %s\n",
> strerror(errno));
> + return -1;
> + }
> +
> + pthread_attr_init(&attrs);
> + pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
> +
> + for (t = 1; t < nr_threads; t++) {
> + enum thread_msg msg = THREAD_MSG__UNDEFINED;
> +
> + pthread_attr_setaffinity_np(&attrs,
> +
> MMAP_CPU_MASK_BYTES(&(thread_data[t].mask->affinity)),
> + (cpu_set_t *)(thread_data[t].mask-
> >affinity.bits));
> +
> + if (pthread_create(&handle, &attrs, record__thread,
> &thread_data[t])) {
> + for (tt = 1; tt < t; tt++)
> + record__terminate_thread(&thread_data[t]);
> + pr_err("Failed to start threads: %s\n",
> strerror(errno));
> + ret = -1;
> + goto out_err;
> + }
> +
> + if (read(thread_data[t].pipes.ack[0], &msg, sizeof(msg)) > 0)
> + pr_debug2("threads[%d]: sent %s\n", rec-
> >thread_data[t].tid,
> + thread_msg_tags[msg]);
> + }
> +
> + if (nr_threads > 1) {
> + sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&thread_data[0].mask-
> >affinity),
> + (cpu_set_t *)thread_data[0].mask-
> >affinity.bits);
> + }
>
> thread = &thread_data[0];
>
> pr_debug("threads[%d]: started on cpu=%d\n", thread->tid,
> sched_getcpu());
>
> - return 0;
> +out_err:
> + if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
> + pr_err("Failed to unblock signals on threads start: %s\n",
> strerror(errno));
> + ret = -1;
> + }
> +
> + return ret;
> }
ASan reports a memory leak for attrs because the call to
pthread_attr_destroy() is missing. It could be added right after the
out_err label.
Thanks,
Riccardo
>
> static int record__stop_threads(struct record *rec, unsigned long *waking)