2021-04-06 17:11:44

by Bayduraev, Alexey V

Subject: [PATCH v4 00/12] Introduce threaded trace streaming for basic perf record operation


Changes in v4:
- renamed 'comm' structure to 'pipes'
- moved thread fd/maps messages to verbose=2
- fixed leaks during allocation of thread_data structures
- fixed leaks during allocation of thread masks
- fixed possible failures when releasing thread masks

v3: https://lore.kernel.org/lkml/[email protected]/

Changes in v3:
- skipped redundant patch 3/15
- applied the "data file" and "data directory" terms throughout the patch set
- captured Acked-by: tags by Namhyung Kim
- avoided braces where not needed
- employed a thread-local variable for serial trace streaming
- added specs for the --threads option: core, socket, numa and user defined
- added parallel loading of data directory files similar to the prototype [1]

v2: https://lore.kernel.org/lkml/[email protected]/

Changes in v2:
- explicitly added credit tags to patches 6/15 and 15/15,
in addition to citations [1], [2]
- updated description of 3/15 to explicitly mention the reason
to open data directories in read access mode (e.g. for perf report)
- implemented fix for compilation error of 2/15
- explicitly elaborated on the issues found that remain to be resolved for
threaded AUX trace capture

v1: https://lore.kernel.org/lkml/[email protected]/

This patch set provides a parallel threaded trace streaming mode for basic
perf record operation. The mode mitigates profiling data losses
and resolves scalability issues of the serial and asynchronous (--aio)
trace streaming modes on multicore server systems. The design and
implementation are based on the prototype [1], [2].

Parallel threaded mode executes trace streaming threads that read kernel
data buffers and write captured data into several data files located in a
data directory. The layout of trace streaming threads and their mapping to
the data buffers they read can be configured using the value of the --threads
command line option. The specification value is a list of masks separated by
colons: each entry defines the cpus to be monitored by one thread, followed by
that thread's affinity mask after a slash. For example,
<cpus mask 1>/<affinity mask 1>:<cpus mask 2>/<affinity mask 2>
specifies a parallel threads layout that consists of two threads with
corresponding assigned cpus to be monitored. The specification value can also
be a string, e.g. "cpu", "core", "socket" or "numa", meaning creation of a data
streaming thread for monitoring every cpu, whole core, socket or NUMA node. The
option provided with no value or an empty value defaults to the "cpu" layout,
creating a data streaming thread for every cpu being monitored. Specification
masks are filtered by the mask provided via the -C option.

Parallel streaming mode is compatible with Zstd compression/decompression
(--compression-level) and external control commands (--control). The mode
is not enabled for pipe mode, nor for AUX area tracing and its related and
derived modes like --snapshot or --aux-sample. The --switch-output-* and
--timestamp-filename options are not enabled for parallel streaming either.
The initial attempt to enable AUX area tracing ran into the need to define an
optimal way to store index data in the data directory, and the use cases of
--switch-output-* and --timestamp-filename are not clear for data directories.
Asynchronous (--aio) trace streaming and affinity (--affinity) modes are
mutually exclusive with parallel streaming mode.

Basic analysis of data directories is provided in perf report mode.
Raw dump and aggregated reports are available for data directories,
though still without memory consumption optimizations.

Tested:

tools/perf/perf record -o prof.data --threads -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data --threads= -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data --threads=cpu -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data --threads=core -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data --threads=socket -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data --threads=numa -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data --threads=0-3/3:4-7/4 -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data -C 2,5 --threads=0-3/3:4-7/4 -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data -C 3,4 --threads=0-3/3:4-7/4 -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data -C 0,4,2,6 --threads=core -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data -C 0,4,2,6 --threads=numa -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data --threads -g --call-graph dwarf,4096 -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data --threads -g --call-graph dwarf,4096 --compression-level=3 -- matrix.gcc.g.O3
tools/perf/perf record -o prof.data --threads -a
tools/perf/perf record -D -1 -e cpu-cycles -a --control fd:10,11 -- sleep 30
tools/perf/perf record --threads -D -1 -e cpu-cycles -a --control fd:10,11 -- sleep 30

tools/perf/perf report -i prof.data
tools/perf/perf report -i prof.data --call-graph=callee
tools/perf/perf report -i prof.data --stdio --header
tools/perf/perf report -i prof.data -D --header

[1] git clone https://git.kernel.org/pub/scm/linux/kernel/git/jolsa/perf.git -b perf/record_threads
[2] https://lore.kernel.org/lkml/[email protected]/

---
Alexey Bayduraev (12):
perf record: introduce thread affinity and mmap masks
perf record: introduce thread specific data array
perf record: introduce thread local variable
perf record: stop threads in the end of trace streaming
perf record: start threads in the beginning of trace streaming
perf record: introduce data file at mmap buffer object
perf record: init data file at mmap buffer object
perf record: introduce --threads=<spec> command line option
perf record: document parallel data streaming mode
perf report: output data file name in raw trace dump
perf session: load data directory files for analysis
perf session: use reader functions to load perf data file

tools/include/linux/bitmap.h | 11 +
tools/lib/api/fd/array.c | 17 +
tools/lib/api/fd/array.h | 1 +
tools/lib/bitmap.c | 14 +
tools/perf/Documentation/perf-record.txt | 18 +
tools/perf/builtin-inject.c | 3 +-
tools/perf/builtin-record.c | 1027 ++++++++++++++++++++--
tools/perf/util/evlist.c | 16 +
tools/perf/util/evlist.h | 1 +
tools/perf/util/mmap.c | 6 +
tools/perf/util/mmap.h | 6 +
tools/perf/util/ordered-events.h | 1 +
tools/perf/util/record.h | 2 +
tools/perf/util/session.c | 484 +++++++---
tools/perf/util/session.h | 5 +
tools/perf/util/tool.h | 3 +-
16 files changed, 1407 insertions(+), 208 deletions(-)

--
2.19.0



2021-04-06 17:13:00

by Bayduraev, Alexey V

Subject: [PATCH v4 01/12] perf record: introduce thread affinity and mmap masks


Introduce affinity and mmap thread masks. The thread affinity mask
defines the cpus a thread is allowed to run on. The thread maps
mask defines the mmap data buffers the thread reads and streams
profiling data from.

Signed-off-by: Alexey Bayduraev <[email protected]>
---
tools/perf/builtin-record.c | 117 ++++++++++++++++++++++++++++++++++++
1 file changed, 117 insertions(+)

diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
index 35465d1db6dd..e0cbf05d255c 100644
--- a/tools/perf/builtin-record.c
+++ b/tools/perf/builtin-record.c
@@ -85,6 +85,11 @@ struct switch_output {
int cur_file;
};

+struct thread_mask {
+ struct mmap_cpu_mask maps;
+ struct mmap_cpu_mask affinity;
+};
+
struct record {
struct perf_tool tool;
struct record_opts opts;
@@ -109,6 +114,8 @@ struct record {
unsigned long long samples;
struct mmap_cpu_mask affinity_mask;
unsigned long output_max_size; /* = 0: unlimited */
+ struct thread_mask *thread_masks;
+ int nr_threads;
};

static volatile int done;
@@ -2176,6 +2183,45 @@ static int record__parse_affinity(const struct option *opt, const char *str, int
return 0;
}

+static int record__mmap_cpu_mask_alloc(struct mmap_cpu_mask *mask, int nr_bits)
+{
+ mask->nbits = nr_bits;
+ mask->bits = bitmap_alloc(mask->nbits);
+ if (!mask->bits) {
+ pr_err("Failed to allocate mmap_cpu mask\n");
+ return -ENOMEM;
+ }
+
+ return 0;
+}
+
+static void record__mmap_cpu_mask_free(struct mmap_cpu_mask *mask)
+{
+ bitmap_free(mask->bits);
+ mask->nbits = 0;
+}
+
+static void record__thread_mask_clear(struct thread_mask *mask)
+{
+ bitmap_zero(mask->maps.bits, mask->maps.nbits);
+ bitmap_zero(mask->affinity.bits, mask->affinity.nbits);
+}
+
+static int record__thread_mask_alloc(struct thread_mask *mask, int nr_bits)
+{
+ if (record__mmap_cpu_mask_alloc(&mask->maps, nr_bits) ||
+ record__mmap_cpu_mask_alloc(&mask->affinity, nr_bits))
+ return -ENOMEM;
+
+ return 0;
+}
+
+static void record__thread_mask_free(struct thread_mask *mask)
+{
+ record__mmap_cpu_mask_free(&mask->maps);
+ record__mmap_cpu_mask_free(&mask->affinity);
+}
+
static int parse_output_max_size(const struct option *opt,
const char *str, int unset)
{
@@ -2611,6 +2657,70 @@ static struct option __record_options[] = {

struct option *record_options = __record_options;

+static void record__mmap_cpu_mask_init(struct mmap_cpu_mask *mask, struct perf_cpu_map *cpus)
+{
+ int c;
+
+ for (c = 0; c < cpus->nr; c++)
+ set_bit(cpus->map[c], mask->bits);
+}
+
+static int record__alloc_thread_masks(struct record *rec, int nr_threads, int nr_bits)
+{
+ int t, ret;
+
+ rec->thread_masks = zalloc(nr_threads * sizeof(*(rec->thread_masks)));
+ if (!rec->thread_masks) {
+ pr_err("Failed to allocate thread masks\n");
+ return -ENOMEM;
+ }
+
+ for (t = 0; t < nr_threads; t++) {
+ ret = record__thread_mask_alloc(&rec->thread_masks[t], nr_bits);
+ if (ret)
+ return ret;
+ record__thread_mask_clear(&rec->thread_masks[t]);
+ }
+
+ return 0;
+}
+static int record__init_thread_default_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ int ret;
+
+ ret = record__alloc_thread_masks(rec, 1, cpu__max_cpu());
+ if (ret)
+ return ret;
+
+ record__mmap_cpu_mask_init(&rec->thread_masks->maps, cpus);
+
+ rec->nr_threads = 1;
+
+ return 0;
+}
+
+static int record__init_thread_masks(struct record *rec)
+{
+ struct perf_cpu_map *cpus = rec->evlist->core.cpus;
+
+ return record__init_thread_default_masks(rec, cpus);
+}
+
+static int record__fini_thread_masks(struct record *rec)
+{
+ int t;
+
+ if (rec->thread_masks)
+ for (t = 0; t < rec->nr_threads; t++)
+ record__thread_mask_free(&rec->thread_masks[t]);
+
+ zfree(&rec->thread_masks);
+
+ rec->nr_threads = 0;
+
+ return 0;
+}
+
int cmd_record(int argc, const char **argv)
{
int err;
@@ -2844,6 +2954,12 @@ int cmd_record(int argc, const char **argv)
goto out;
}

+ err = record__init_thread_masks(rec);
+ if (err) {
+ pr_err("record__init_thread_masks failed, error %d\n", err);
+ goto out;
+ }
+
if (rec->opts.nr_cblocks > nr_cblocks_max)
rec->opts.nr_cblocks = nr_cblocks_max;
pr_debug("nr_cblocks: %d\n", rec->opts.nr_cblocks);
@@ -2862,6 +2978,7 @@ int cmd_record(int argc, const char **argv)
symbol__exit();
auxtrace_record__free(rec->itr);
out_opts:
+ record__fini_thread_masks(rec);
evlist__close_control(rec->opts.ctl_fd, rec->opts.ctl_fd_ack, &rec->opts.ctl_fd_close);
return err;
}
--
2.19.0


2021-04-06 17:16:10

by Bayduraev, Alexey V

Subject: [PATCH v4 03/12] perf record: introduce thread local variable


Introduce a thread-local variable and use it for threaded trace streaming.
Use the thread affinity mask instead of the record affinity mask in affinity
modes.
Introduce and use the evlist__ctlfd_update() function to propagate external
control commands to the global evlist object.

Signed-off-by: Alexey Bayduraev <[email protected]>
---
tools/perf/builtin-record.c | 132 ++++++++++++++++++++++++------------
tools/perf/util/evlist.c | 16 +++++
tools/perf/util/evlist.h | 1 +
3 files changed, 105 insertions(+), 44 deletions(-)

diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
index d74fea2d1ca9..ecb6bf33ed85 100644
--- a/tools/perf/builtin-record.c
+++ b/tools/perf/builtin-record.c
@@ -108,6 +108,8 @@ struct thread_data {
unsigned long waking;
};

+static __thread struct thread_data *thread;
+
struct record {
struct perf_tool tool;
struct record_opts opts;
@@ -130,7 +132,6 @@ struct record {
bool timestamp_boundary;
struct switch_output switch_output;
unsigned long long samples;
- struct mmap_cpu_mask affinity_mask;
unsigned long output_max_size; /* = 0: unlimited */
struct thread_mask *thread_masks;
struct thread_data *thread_data;
@@ -565,7 +566,7 @@ static int record__pushfn(struct mmap *map, void *to, void *bf, size_t size)
bf = map->data;
}

- rec->samples++;
+ thread->samples++;
return record__write(rec, map, bf, size);
}

@@ -1250,16 +1251,23 @@ static struct perf_event_header finished_round_event = {

static void record__adjust_affinity(struct record *rec, struct mmap *map)
{
+ int ret = 0;
+
if (rec->opts.affinity != PERF_AFFINITY_SYS &&
- !bitmap_equal(rec->affinity_mask.bits, map->affinity_mask.bits,
- rec->affinity_mask.nbits)) {
- bitmap_zero(rec->affinity_mask.bits, rec->affinity_mask.nbits);
- bitmap_or(rec->affinity_mask.bits, rec->affinity_mask.bits,
- map->affinity_mask.bits, rec->affinity_mask.nbits);
- sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&rec->affinity_mask),
- (cpu_set_t *)rec->affinity_mask.bits);
- if (verbose == 2)
- mmap_cpu_mask__scnprintf(&rec->affinity_mask, "thread");
+ !bitmap_equal(thread->mask->affinity.bits, map->affinity_mask.bits,
+ thread->mask->affinity.nbits)) {
+ bitmap_zero(thread->mask->affinity.bits, thread->mask->affinity.nbits);
+ bitmap_or(thread->mask->affinity.bits, thread->mask->affinity.bits,
+ map->affinity_mask.bits, thread->mask->affinity.nbits);
+ ret = sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&thread->mask->affinity),
+ (cpu_set_t *)thread->mask->affinity.bits);
+ if (ret)
+ pr_err("threads[%d]: sched_setaffinity() call failed: %m\n", thread->tid);
+ if (verbose == 2) {
+ pr_debug("threads[%d]: addr=", thread->tid);
+ mmap_cpu_mask__scnprintf(&thread->mask->affinity, "thread");
+ pr_debug("threads[%d]: on cpu=%d\n", thread->tid, sched_getcpu());
+ }
}
}

@@ -1300,14 +1308,17 @@ static int record__mmap_read_evlist(struct record *rec, struct evlist *evlist,
u64 bytes_written = rec->bytes_written;
int i;
int rc = 0;
- struct mmap *maps;
+ int nr_mmaps;
+ struct mmap **maps;
int trace_fd = rec->data.file.fd;
off_t off = 0;

if (!evlist)
return 0;

- maps = overwrite ? evlist->overwrite_mmap : evlist->mmap;
+ nr_mmaps = thread->nr_mmaps;
+ maps = overwrite ? thread->overwrite_maps : thread->maps;
+
if (!maps)
return 0;

@@ -1317,9 +1328,9 @@ static int record__mmap_read_evlist(struct record *rec, struct evlist *evlist,
if (record__aio_enabled(rec))
off = record__aio_get_pos(trace_fd);

- for (i = 0; i < evlist->core.nr_mmaps; i++) {
+ for (i = 0; i < nr_mmaps; i++) {
u64 flush = 0;
- struct mmap *map = &maps[i];
+ struct mmap *map = maps[i];

if (map->core.base) {
record__adjust_affinity(rec, map);
@@ -1382,6 +1393,15 @@ static int record__mmap_read_all(struct record *rec, bool synch)
return record__mmap_read_evlist(rec, rec->evlist, true, synch);
}

+static void record__thread_munmap_filtered(struct fdarray *fda, int fd,
+ void *arg __maybe_unused)
+{
+ struct perf_mmap *map = fda->priv[fd].ptr;
+
+ if (map)
+ perf_mmap__put(map);
+}
+
static void record__init_features(struct record *rec)
{
struct perf_session *session = rec->session;
@@ -1800,6 +1820,33 @@ static void hit_auxtrace_snapshot_trigger(struct record *rec)
}
}

+static int record__start_threads(struct record *rec)
+{
+ struct thread_data *thread_data = rec->thread_data;
+
+ thread = &thread_data[0];
+
+ pr_debug("threads[%d]: started on cpu=%d\n", thread->tid, sched_getcpu());
+
+ return 0;
+}
+
+static int record__stop_threads(struct record *rec, unsigned long *waking)
+{
+ int t;
+ struct thread_data *thread_data = rec->thread_data;
+
+ for (t = 0; t < rec->nr_threads; t++) {
+ rec->samples += thread_data[t].samples;
+ *waking += thread_data[t].waking;
+ pr_debug("threads[%d]: samples=%llu, wakes=%lu, transferred=%" PRIu64 ", compressed=%" PRIu64 "\n",
+ thread_data[t].tid, thread_data[t].samples, thread_data[t].waking,
+ rec->session->bytes_transferred, rec->session->bytes_compressed);
+ }
+
+ return 0;
+}
+
static int __cmd_record(struct record *rec, int argc, const char **argv)
{
int err;
@@ -1906,7 +1953,7 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)

if (record__open(rec) != 0) {
err = -1;
- goto out_child;
+ goto out_free_threads;
}
session->header.env.comp_mmap_len = session->evlist->core.mmap_len;

@@ -1914,7 +1961,7 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
err = record__kcore_copy(&session->machines.host, data);
if (err) {
pr_err("ERROR: Failed to copy kcore\n");
- goto out_child;
+ goto out_free_threads;
}
}

@@ -1925,7 +1972,7 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
bpf__strerror_apply_obj_config(err, errbuf, sizeof(errbuf));
pr_err("ERROR: Apply config to BPF failed: %s\n",
errbuf);
- goto out_child;
+ goto out_free_threads;
}

/*
@@ -1943,11 +1990,11 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
if (data->is_pipe) {
err = perf_header__write_pipe(fd);
if (err < 0)
- goto out_child;
+ goto out_free_threads;
} else {
err = perf_session__write_header(session, rec->evlist, fd, false);
if (err < 0)
- goto out_child;
+ goto out_free_threads;
}

err = -1;
@@ -1955,16 +2002,16 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
&& !perf_header__has_feat(&session->header, HEADER_BUILD_ID)) {
pr_err("Couldn't generate buildids. "
"Use --no-buildid to profile anyway.\n");
- goto out_child;
+ goto out_free_threads;
}

err = record__setup_sb_evlist(rec);
if (err)
- goto out_child;
+ goto out_free_threads;

err = record__synthesize(rec, false);
if (err < 0)
- goto out_child;
+ goto out_free_threads;

if (rec->realtime_prio) {
struct sched_param param;
@@ -1973,10 +2020,13 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
if (sched_setscheduler(0, SCHED_FIFO, &param)) {
pr_err("Could not set realtime priority.\n");
err = -1;
- goto out_child;
+ goto out_free_threads;
}
}

+ if (record__start_threads(rec))
+ goto out_free_threads;
+
/*
* When perf is starting the traced process, all the events
* (apart from group members) have enable_on_exec=1 set,
@@ -2047,7 +2097,7 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
trigger_ready(&switch_output_trigger);
perf_hooks__invoke_record_start();
for (;;) {
- unsigned long long hits = rec->samples;
+ unsigned long long hits = thread->samples;

/*
* rec->evlist->bkw_mmap_state is possible to be
@@ -2116,20 +2166,24 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
alarm(rec->switch_output.time);
}

- if (hits == rec->samples) {
+ if (hits == thread->samples) {
if (done || draining)
break;
- err = evlist__poll(rec->evlist, -1);
+ err = fdarray__poll(&thread->pollfd, -1);
/*
* Propagate error, only if there's any. Ignore positive
* number of returned events and interrupt error.
*/
if (err > 0 || (err < 0 && errno == EINTR))
err = 0;
- waking++;
+ thread->waking++;

- if (evlist__filter_pollfd(rec->evlist, POLLERR | POLLHUP) == 0)
+ if (fdarray__filter(&thread->pollfd, POLLERR | POLLHUP,
+ record__thread_munmap_filtered, NULL) == 0)
draining = true;
+
+ evlist__ctlfd_update(rec->evlist,
+ &thread->pollfd.entries[thread->ctlfd_pos]);
}

if (evlist__ctlfd_process(rec->evlist, &cmd) > 0) {
@@ -2178,18 +2232,20 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
goto out_child;
}

- if (!quiet)
- fprintf(stderr, "[ perf record: Woken up %ld times to write data ]\n", waking);
-
if (target__none(&rec->opts.target))
record__synthesize_workload(rec, true);

out_child:
+ record__stop_threads(rec, &waking);
+out_free_threads:
record__free_thread_data(rec);
evlist__finalize_ctlfd(rec->evlist);
record__mmap_read_all(rec, true);
record__aio_mmap_read_sync(rec);

+ if (!quiet)
+ fprintf(stderr, "[ perf record: Woken up %ld times to write data ]\n", waking);
+
if (rec->session->bytes_transferred && rec->session->bytes_compressed) {
ratio = (float)rec->session->bytes_transferred/(float)rec->session->bytes_compressed;
session->header.env.comp_ratio = ratio + 0.5;
@@ -3022,17 +3078,6 @@ int cmd_record(int argc, const char **argv)

symbol__init(NULL);

- if (rec->opts.affinity != PERF_AFFINITY_SYS) {
- rec->affinity_mask.nbits = cpu__max_cpu();
- rec->affinity_mask.bits = bitmap_alloc(rec->affinity_mask.nbits);
- if (!rec->affinity_mask.bits) {
- pr_err("Failed to allocate thread mask for %zd cpus\n", rec->affinity_mask.nbits);
- err = -ENOMEM;
- goto out_opts;
- }
- pr_debug2("thread mask[%zd]: empty\n", rec->affinity_mask.nbits);
- }
-
err = record__auxtrace_init(rec);
if (err)
goto out;
@@ -3161,7 +3206,6 @@ int cmd_record(int argc, const char **argv)

err = __cmd_record(&record, argc, argv);
out:
- bitmap_free(rec->affinity_mask.bits);
evlist__delete(rec->evlist);
symbol__exit();
auxtrace_record__free(rec->itr);
diff --git a/tools/perf/util/evlist.c b/tools/perf/util/evlist.c
index f1c79ecf8107..4b55f47cb824 100644
--- a/tools/perf/util/evlist.c
+++ b/tools/perf/util/evlist.c
@@ -2128,6 +2128,22 @@ int evlist__ctlfd_process(struct evlist *evlist, enum evlist_ctl_cmd *cmd)
return err;
}

+int evlist__ctlfd_update(struct evlist *evlist, struct pollfd *update)
+{
+ int ctlfd_pos = evlist->ctl_fd.pos;
+ struct pollfd *entries = evlist->core.pollfd.entries;
+
+ if (!evlist__ctlfd_initialized(evlist))
+ return 0;
+
+ if (entries[ctlfd_pos].fd != update->fd ||
+ entries[ctlfd_pos].events != update->events)
+ return -1;
+
+ entries[ctlfd_pos].revents = update->revents;
+ return 0;
+}
+
struct evsel *evlist__find_evsel(struct evlist *evlist, int idx)
{
struct evsel *evsel;
diff --git a/tools/perf/util/evlist.h b/tools/perf/util/evlist.h
index b695ffaae519..7dda8087833c 100644
--- a/tools/perf/util/evlist.h
+++ b/tools/perf/util/evlist.h
@@ -358,6 +358,7 @@ void evlist__close_control(int ctl_fd, int ctl_fd_ack, bool *ctl_fd_close);
int evlist__initialize_ctlfd(struct evlist *evlist, int ctl_fd, int ctl_fd_ack);
int evlist__finalize_ctlfd(struct evlist *evlist);
bool evlist__ctlfd_initialized(struct evlist *evlist);
+int evlist__ctlfd_update(struct evlist *evlist, struct pollfd *update);
int evlist__ctlfd_process(struct evlist *evlist, enum evlist_ctl_cmd *cmd);
int evlist__ctlfd_ack(struct evlist *evlist);

--
2.19.0


2021-04-06 17:20:44

by Bayduraev, Alexey V

Subject: [PATCH v4 08/12] perf record: introduce --threads=<spec> command line option


Provide the --threads option in the perf record command line interface.
The option can take a value in the form of masks that specify
the cpus to be monitored by data streaming threads and their layout
in the system topology. The masks can be filtered using the cpu mask
provided via the -C option.

The specification value can be a user-defined list of masks. Masks
separated by colons define the cpus to be monitored by one thread, and
the affinity mask of that thread follows a slash. For example:
<cpus mask 1>/<affinity mask 1>:<cpus mask 2>/<affinity mask 2>
specifies parallel threads layout that consists of two threads
with corresponding assigned cpus to be monitored.

The specification value can also be a string, e.g. "cpu", "core",
"socket" or "numa", meaning creation of a data streaming thread for
every cpu, core, socket or NUMA node to monitor distinct cpus or cpus
grouped by core, socket or NUMA node.

When the option is provided with no value or an empty value, it defaults
to a per-cpu parallel threads layout, creating a data streaming thread
for every cpu being monitored.

Feature design and implementation are based on the prototypes [1], [2].

[1] git clone https://git.kernel.org/pub/scm/linux/kernel/git/jolsa/perf.git -b perf/record_threads
[2] https://lore.kernel.org/lkml/[email protected]/

Suggested-by: Jiri Olsa <[email protected]>
Suggested-by: Namhyung Kim <[email protected]>
Signed-off-by: Alexey Bayduraev <[email protected]>
---
tools/include/linux/bitmap.h | 11 ++
tools/lib/bitmap.c | 14 ++
tools/perf/builtin-record.c | 317 ++++++++++++++++++++++++++++++++++-
tools/perf/util/record.h | 1 +
4 files changed, 341 insertions(+), 2 deletions(-)

diff --git a/tools/include/linux/bitmap.h b/tools/include/linux/bitmap.h
index 477a1cae513f..2eb1d1084543 100644
--- a/tools/include/linux/bitmap.h
+++ b/tools/include/linux/bitmap.h
@@ -18,6 +18,8 @@ int __bitmap_and(unsigned long *dst, const unsigned long *bitmap1,
int __bitmap_equal(const unsigned long *bitmap1,
const unsigned long *bitmap2, unsigned int bits);
void bitmap_clear(unsigned long *map, unsigned int start, int len);
+int __bitmap_intersects(const unsigned long *bitmap1,
+ const unsigned long *bitmap2, unsigned int bits);

#define BITMAP_FIRST_WORD_MASK(start) (~0UL << ((start) & (BITS_PER_LONG - 1)))

@@ -178,4 +180,13 @@ static inline int bitmap_equal(const unsigned long *src1,
return __bitmap_equal(src1, src2, nbits);
}

+static inline int bitmap_intersects(const unsigned long *src1,
+ const unsigned long *src2, unsigned int nbits)
+{
+ if (small_const_nbits(nbits))
+ return ((*src1 & *src2) & BITMAP_LAST_WORD_MASK(nbits)) != 0;
+ else
+ return __bitmap_intersects(src1, src2, nbits);
+}
+
#endif /* _PERF_BITOPS_H */
diff --git a/tools/lib/bitmap.c b/tools/lib/bitmap.c
index 5043747ef6c5..3cc3a5b43bb5 100644
--- a/tools/lib/bitmap.c
+++ b/tools/lib/bitmap.c
@@ -86,3 +86,17 @@ int __bitmap_equal(const unsigned long *bitmap1,

return 1;
}
+
+int __bitmap_intersects(const unsigned long *bitmap1,
+ const unsigned long *bitmap2, unsigned int bits)
+{
+ unsigned int k, lim = bits/BITS_PER_LONG;
+ for (k = 0; k < lim; ++k)
+ if (bitmap1[k] & bitmap2[k])
+ return 1;
+
+ if (bits % BITS_PER_LONG)
+ if ((bitmap1[k] & bitmap2[k]) & BITMAP_LAST_WORD_MASK(bits))
+ return 1;
+ return 0;
+}
diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
index c1416d28ac6d..41a22f48037d 100644
--- a/tools/perf/builtin-record.c
+++ b/tools/perf/builtin-record.c
@@ -49,6 +49,7 @@
#include "util/clockid.h"
#include "asm/bug.h"
#include "perf.h"
+#include "cputopo.h"

#include <errno.h>
#include <inttypes.h>
@@ -120,6 +121,20 @@ static const char *thread_msg_tags[THREAD_MSG__MAX] = {
"UNDEFINED", "READY"
};

+enum thread_spec {
+ THREAD_SPEC__UNDEFINED = 0,
+ THREAD_SPEC__CPU,
+ THREAD_SPEC__CORE,
+ THREAD_SPEC__SOCKET,
+ THREAD_SPEC__NUMA,
+ THREAD_SPEC__USER,
+ THREAD_SPEC__MAX,
+};
+
+static const char *thread_spec_tags[THREAD_SPEC__MAX] = {
+ "undefined", "cpu", "core", "socket", "numa", "user"
+};
+
struct record {
struct perf_tool tool;
struct record_opts opts;
@@ -2662,6 +2677,63 @@ static void record__thread_mask_free(struct thread_mask *mask)
record__mmap_cpu_mask_free(&mask->affinity);
}

+static int record__thread_mask_or(struct thread_mask *dest, struct thread_mask *src1,
+ struct thread_mask *src2)
+{
+ if (src1->maps.nbits != src2->maps.nbits || src1->affinity.nbits != src2->affinity.nbits ||
+ dest->maps.nbits != src1->maps.nbits || dest->affinity.nbits != src1->affinity.nbits)
+ return -EINVAL;
+
+ bitmap_or(dest->maps.bits, src1->maps.bits, src2->maps.bits, src1->maps.nbits);
+ bitmap_or(dest->affinity.bits, src1->affinity.bits, src2->affinity.bits, src1->affinity.nbits);
+
+ return 0;
+}
+
+static int record__thread_mask_intersects(struct thread_mask *mask_1, struct thread_mask *mask_2)
+{
+ int res1, res2;
+
+ if (mask_1->maps.nbits != mask_2->maps.nbits || mask_1->affinity.nbits != mask_2->affinity.nbits)
+ return -EINVAL;
+
+ res1 = bitmap_intersects(mask_1->maps.bits, mask_2->maps.bits, mask_1->maps.nbits);
+ res2 = bitmap_intersects(mask_1->affinity.bits, mask_2->affinity.bits, mask_1->affinity.nbits);
+ if (res1 || res2)
+ return 1;
+
+ return 0;
+}
+
+static int record__parse_threads(const struct option *opt, const char *str, int unset)
+{
+ int s;
+ struct record_opts *opts = opt->value;
+
+ if (unset || !str || !strlen(str)) {
+ opts->threads_spec = THREAD_SPEC__CPU;
+ } else {
+ for (s = 1; s < THREAD_SPEC__MAX; s++) {
+ if (s == THREAD_SPEC__USER) {
+ opts->threads_user_spec = strdup(str);
+ opts->threads_spec = THREAD_SPEC__USER;
+ break;
+ }
+ if (!strncasecmp(str, thread_spec_tags[s], strlen(thread_spec_tags[s]))) {
+ opts->threads_spec = s;
+ break;
+ }
+ }
+ }
+
+ pr_debug("threads_spec: %s", thread_spec_tags[opts->threads_spec]);
+ if (opts->threads_spec == THREAD_SPEC__USER)
+ pr_debug("=[%s]", opts->threads_user_spec);
+ pr_debug("\n");
+
+ return 0;
+}
+
static int parse_output_max_size(const struct option *opt,
const char *str, int unset)
{
@@ -3092,6 +3164,9 @@ static struct option __record_options[] = {
"\t\t\t Optionally send control command completion ('ack\\n') to ack-fd descriptor.\n"
"\t\t\t Alternatively, ctl-fifo / ack-fifo will be opened and used as ctl-fd / ack-fd.",
parse_control_option),
+ OPT_CALLBACK_OPTARG(0, "threads", &record.opts, NULL, "spec",
+ "write collected trace data into several data files using parallel threads",
+ record__parse_threads),
OPT_END()
};

@@ -3105,6 +3180,17 @@ static void record__mmap_cpu_mask_init(struct mmap_cpu_mask *mask, struct perf_c
set_bit(cpus->map[c], mask->bits);
}

+static void record__mmap_cpu_mask_init_spec(struct mmap_cpu_mask *mask, char *mask_spec)
+{
+ struct perf_cpu_map *cpus;
+
+ cpus = perf_cpu_map__new(mask_spec);
+ if (cpus) {
+ record__mmap_cpu_mask_init(mask, cpus);
+ perf_cpu_map__put(cpus);
+ }
+}
+
static int record__alloc_thread_masks(struct record *rec, int nr_threads, int nr_bits)
{
int t, ret;
@@ -3124,6 +3210,206 @@ static int record__alloc_thread_masks(struct record *rec, int nr_threads, int nr

return 0;
}
+
+static int record__init_thread_cpu_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ int t, ret, nr_cpus = perf_cpu_map__nr(cpus);
+
+ ret = record__alloc_thread_masks(rec, nr_cpus, cpu__max_cpu());
+ if (ret)
+ return ret;
+
+ rec->nr_threads = nr_cpus;
+ pr_debug("threads: nr_threads=%d\n", rec->nr_threads);
+
+ for (t = 0; t < rec->nr_threads; t++) {
+ set_bit(cpus->map[t], rec->thread_masks[t].maps.bits);
+ pr_debug("thread_masks[%d]: maps mask [%d]\n", t, cpus->map[t]);
+ set_bit(cpus->map[t], rec->thread_masks[t].affinity.bits);
+ pr_debug("thread_masks[%d]: affinity mask [%d]\n", t, cpus->map[t]);
+ }
+
+ return 0;
+}
+
+static int record__init_thread_masks_spec(struct record *rec, struct perf_cpu_map *cpus,
+ char **maps_spec, char **affinity_spec, u32 nr_spec)
+{
+ u32 s;
+ int ret, nr_threads = 0;
+ struct mmap_cpu_mask cpus_mask;
+ struct thread_mask thread_mask, full_mask;
+
+ ret = record__mmap_cpu_mask_alloc(&cpus_mask, cpu__max_cpu());
+ if (ret)
+ return ret;
+ record__mmap_cpu_mask_init(&cpus_mask, cpus);
+ ret = record__thread_mask_alloc(&thread_mask, cpu__max_cpu());
+ if (ret)
+ goto out_free_cpu_mask;
+ ret = record__thread_mask_alloc(&full_mask, cpu__max_cpu());
+ if (ret)
+ goto out_free_thread_mask;
+ record__thread_mask_clear(&full_mask);
+
+ for (s = 0; s < nr_spec; s++) {
+ record__thread_mask_clear(&thread_mask);
+
+ record__mmap_cpu_mask_init_spec(&thread_mask.maps, maps_spec[s]);
+ record__mmap_cpu_mask_init_spec(&thread_mask.affinity, affinity_spec[s]);
+
+ if (!bitmap_and(thread_mask.maps.bits, thread_mask.maps.bits,
+ cpus_mask.bits, thread_mask.maps.nbits) ||
+ !bitmap_and(thread_mask.affinity.bits, thread_mask.affinity.bits,
+ cpus_mask.bits, thread_mask.affinity.nbits))
+ continue;
+
+ ret = record__thread_mask_intersects(&thread_mask, &full_mask);
+ if (ret)
+ return ret;
+ record__thread_mask_or(&full_mask, &full_mask, &thread_mask);
+
+ rec->thread_masks = realloc(rec->thread_masks,
+ (nr_threads + 1) * sizeof(struct thread_mask));
+ if (!rec->thread_masks) {
+ pr_err("Failed to allocate thread masks\n");
+ ret = -ENOMEM;
+ goto out_free_full_mask;
+ }
+ rec->thread_masks[nr_threads] = thread_mask;
+ pr_debug("thread_masks[%d]: addr=", nr_threads);
+ mmap_cpu_mask__scnprintf(&rec->thread_masks[nr_threads].maps, "maps");
+ pr_debug("thread_masks[%d]: addr=", nr_threads);
+ mmap_cpu_mask__scnprintf(&rec->thread_masks[nr_threads].affinity, "affinity");
+ nr_threads++;
+ ret = record__thread_mask_alloc(&thread_mask, cpu__max_cpu());
+ if (ret)
+ return ret;
+ }
+
+ rec->nr_threads = nr_threads;
+ pr_debug("threads: nr_threads=%d\n", rec->nr_threads);
+
+out_free_full_mask:
+ record__thread_mask_free(&full_mask);
+out_free_thread_mask:
+ record__thread_mask_free(&thread_mask);
+out_free_cpu_mask:
+ record__mmap_cpu_mask_free(&cpus_mask);
+
+ return ret;
+}
+
+static int record__init_thread_core_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ int ret;
+ struct cpu_topology *topo;
+
+ topo = cpu_topology__new();
+ if (!topo)
+ return -EINVAL;
+
+ ret = record__init_thread_masks_spec(rec, cpus, topo->thread_siblings,
+ topo->thread_siblings, topo->thread_sib);
+ cpu_topology__delete(topo);
+
+ return ret;
+}
+
+static int record__init_thread_socket_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ int ret;
+ struct cpu_topology *topo;
+
+ topo = cpu_topology__new();
+ if (!topo)
+ return -EINVAL;
+
+ ret = record__init_thread_masks_spec(rec, cpus, topo->core_siblings,
+ topo->core_siblings, topo->core_sib);
+ cpu_topology__delete(topo);
+
+ return ret;
+}
+
+static int record__init_thread_numa_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ u32 s;
+ int ret;
+ char **spec;
+ struct numa_topology *topo;
+
+ topo = numa_topology__new();
+ if (!topo)
+ return -EINVAL;
+ spec = zalloc(topo->nr * sizeof(char *));
+ if (!spec) {
+ ret = -ENOMEM;
+ goto out_delete_topo;
+ }
+ for (s = 0; s < topo->nr; s++)
+ spec[s] = topo->nodes[s].cpus;
+
+ ret = record__init_thread_masks_spec(rec, cpus, spec, spec, topo->nr);
+
+ zfree(&spec);
+
+out_delete_topo:
+ numa_topology__delete(topo);
+
+ return ret;
+}
+
+static int record__init_thread_user_masks(struct record *rec, struct perf_cpu_map *cpus)
+{
+ int t, ret;
+ u32 s, nr_spec = 0;
+ char **maps_spec = NULL, **affinity_spec = NULL;
+ char *spec, *spec_ptr, *user_spec, *mask, *mask_ptr;
+
+ for (t = 0, user_spec = (char *)rec->opts.threads_user_spec; ; t++, user_spec = NULL) {
+ spec = strtok_r(user_spec, ":", &spec_ptr);
+ if (spec == NULL)
+ break;
+ pr_debug(" spec[%d]: %s\n", t, spec);
+ mask = strtok_r(spec, "/", &mask_ptr);
+ if (mask == NULL)
+ break;
+ pr_debug(" maps mask: %s\n", mask);
+ maps_spec = realloc(maps_spec, (nr_spec + 1) * sizeof(char *));
+ if (!maps_spec) {
+ pr_err("Failed to realloc maps_spec\n");
+ ret = -ENOMEM;
+ goto out_free_all_specs;
+ }
+ maps_spec[nr_spec] = strdup(mask);
+ mask = strtok_r(NULL, "/", &mask_ptr);
+ if (mask == NULL)
+ break;
+ pr_debug(" affinity mask: %s\n", mask);
+ affinity_spec = realloc(affinity_spec, (nr_spec + 1) * sizeof(char *));
+ if (!affinity_spec) {
+ pr_err("Failed to realloc affinity_spec\n");
+ ret = -ENOMEM;
+ goto out_free_all_specs;
+ }
+ affinity_spec[nr_spec] = strdup(mask);
+ nr_spec++;
+ }
+
+ ret = record__init_thread_masks_spec(rec, cpus, maps_spec, affinity_spec, nr_spec);
+
+out_free_all_specs:
+ for (s = 0; s < nr_spec; s++) {
+ free(maps_spec[s]);
+ free(affinity_spec[s]);
+ }
+ free(affinity_spec);
+ free(maps_spec);
+
+ return ret;
+}
+
static int record__init_thread_default_masks(struct record *rec, struct perf_cpu_map *cpus)
{
int ret;
@@ -3141,9 +3427,33 @@ static int record__init_thread_default_masks(struct record *rec, struct perf_cpu

static int record__init_thread_masks(struct record *rec)
{
+ int ret = 0;
struct perf_cpu_map *cpus = rec->evlist->core.cpus;

- return record__init_thread_default_masks(rec, cpus);
+ if (!record__threads_enabled(rec))
+ return record__init_thread_default_masks(rec, cpus);
+
+ switch (rec->opts.threads_spec) {
+ case THREAD_SPEC__CPU:
+ ret = record__init_thread_cpu_masks(rec, cpus);
+ break;
+ case THREAD_SPEC__CORE:
+ ret = record__init_thread_core_masks(rec, cpus);
+ break;
+ case THREAD_SPEC__SOCKET:
+ ret = record__init_thread_socket_masks(rec, cpus);
+ break;
+ case THREAD_SPEC__NUMA:
+ ret = record__init_thread_numa_masks(rec, cpus);
+ break;
+ case THREAD_SPEC__USER:
+ ret = record__init_thread_user_masks(rec, cpus);
+ break;
+ default:
+ break;
+ }
+
+ return ret;
}

static int record__fini_thread_masks(struct record *rec)
@@ -3385,7 +3695,10 @@ int cmd_record(int argc, const char **argv)

err = record__init_thread_masks(rec);
if (err) {
- pr_err("record__init_thread_masks failed, error %d\n", err);
+ if (err > 0)
+ pr_err("ERROR: parallel data streaming masks (--threads) intersect.\n");
+ else
+ pr_err("record__init_thread_masks failed, error %d\n", err);
goto out;
}

diff --git a/tools/perf/util/record.h b/tools/perf/util/record.h
index 4d68b7e27272..3da156498f47 100644
--- a/tools/perf/util/record.h
+++ b/tools/perf/util/record.h
@@ -78,6 +78,7 @@ struct record_opts {
int ctl_fd_ack;
bool ctl_fd_close;
int threads_spec;
+ const char *threads_user_spec;
};

extern const char * const *record_usage;
--
2.19.0
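The patch above accepts a user-defined --threads layout as colon-separated `<maps mask>/<affinity mask>` pairs and splits it in record__init_thread_user_masks() with nested strtok_r() calls. The standalone sketch below shows that tokenization under two assumptions: each mask is written as a cpu list such as 0-3,8, and there are at most 64 cpus, so a uint64_t can stand in for perf's bitmaps. struct spec_entry, parse_cpu_list() and parse_user_spec() are hypothetical helpers, not perf code.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for struct thread_mask, limited to 64 cpus. */
struct spec_entry {
	uint64_t maps;		/* cpus whose buffers the thread reads */
	uint64_t affinity;	/* cpus the thread may run on */
};

/* Parse a cpu list such as "0-3,8" into a bitmask; 0 means invalid. */
static uint64_t parse_cpu_list(const char *list)
{
	char *copy = strdup(list), *save = NULL, *tok;
	uint64_t mask = 0;

	if (!copy)
		return 0;
	for (tok = strtok_r(copy, ",", &save); tok; tok = strtok_r(NULL, ",", &save)) {
		char *end, *hi_str;
		long lo = strtol(tok, &end, 10), hi = lo;

		if (end == tok)
			goto bad;
		if (*end == '-') {
			hi_str = end + 1;
			hi = strtol(hi_str, &end, 10);
			if (end == hi_str)
				goto bad;
		}
		if (*end != '\0' || lo < 0 || hi < lo || hi > 63)
			goto bad;
		for (long c = lo; c <= hi; c++)
			mask |= UINT64_C(1) << c;
	}
	free(copy);
	return mask;
bad:
	free(copy);
	return 0;
}

/*
 * Split "maps/affinity:maps/affinity:..." into at most max entries using
 * nested strtok_r() calls with separate save pointers.
 * Returns the number of entries, or -1 on a malformed entry.
 */
static int parse_user_spec(const char *spec, struct spec_entry *out, int max)
{
	char *copy = strdup(spec), *spec_save = NULL, *item;
	int n = 0;

	assert(out != NULL || max == 0);
	if (!copy)
		return -1;
	for (item = strtok_r(copy, ":", &spec_save); item;
	     item = strtok_r(NULL, ":", &spec_save)) {
		char *mask_save = NULL;
		char *maps = strtok_r(item, "/", &mask_save);
		char *aff = strtok_r(NULL, "/", &mask_save);

		if (!maps || !aff || n == max)
			goto bad;
		out[n].maps = parse_cpu_list(maps);
		out[n].affinity = parse_cpu_list(aff);
		if (!out[n].maps || !out[n].affinity)
			goto bad;
		n++;
	}
	free(copy);
	return n;
bad:
	free(copy);
	return -1;
}
```

For example, "0-3/0:4-7/4" yields two entries: maps 0xF with affinity 0x1, and maps 0xF0 with affinity 0x10. Unlike the patch, which simply stops at the first entry lacking a slash, this sketch reports such an entry as an error.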


2021-04-06 17:21:47

by Bayduraev, Alexey V

Subject: [PATCH v4 09/12] perf record: document parallel data streaming mode


Document the --threads option syntax and parallel data streaming modes
in Documentation/perf-record.txt. Implement compatibility checks for
other modes and related command line options: asynchronous (--aio)
trace streaming and affinity (--affinity) modes, pipe mode, AUX area
tracing --snapshot and --aux-sample options, --switch-output,
--switch-output-event, --switch-max-files and --timestamp-filename
options. Parallel data streaming is compatible with Zstd compression
(--compression-level) and external control commands (--control).
The cpu mask provided via the -C option filters the --threads
specification masks.

Signed-off-by: Alexey Bayduraev <[email protected]>
---
tools/perf/Documentation/perf-record.txt | 18 ++++++++++
tools/perf/builtin-record.c | 43 ++++++++++++++++++++++--
2 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/tools/perf/Documentation/perf-record.txt b/tools/perf/Documentation/perf-record.txt
index f3161c9673e9..d6f9bc97f060 100644
--- a/tools/perf/Documentation/perf-record.txt
+++ b/tools/perf/Documentation/perf-record.txt
@@ -695,6 +695,24 @@ measurements:
wait -n ${perf_pid}
exit $?

+--threads=<spec>::
+Write collected trace data into several data files using parallel threads.
+<spec> value can be a user-defined list of masks. Colon-separated entries
+each describe one thread: the mask of cpus to be monitored by the thread,
+then a slash, then the affinity mask of that thread. For example, a user
+specification like <cpus mask 1>/<affinity mask 1>:<cpus mask 2>/<affinity mask 2>
+specifies a parallel threads layout that consists of two threads with
+corresponding assigned cpus to be monitored. <spec> value can also be a
+string naming a predefined parallel threads layout:
+ cpu - create a new data streaming thread for every monitored cpu
+ core - create a new thread to monitor cpus grouped by a core
+ socket - create a new thread to monitor cpus grouped by a socket
+ numa - create a new thread to monitor cpus grouped by a numa domain
+Predefined layouts can be used on systems with a large number of cpus in
+order not to spawn multiple per-cpu streaming threads but still avoid LOST
+events in data directory files. If the option is specified with no value
+or an empty value, the cpu layout is used. Masks defined or provided by
+the option value are filtered through the mask provided by the -C option.

SEE ALSO
--------
diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
index 41a22f48037d..23aab359f110 100644
--- a/tools/perf/builtin-record.c
+++ b/tools/perf/builtin-record.c
@@ -798,6 +798,12 @@ static int record__auxtrace_init(struct record *rec)
{
int err;

+ if ((rec->opts.auxtrace_snapshot_opts || rec->opts.auxtrace_sample_opts)
+ && record__threads_enabled(rec)) {
+ pr_err("AUX area tracing options are not available in parallel streaming mode.\n");
+ return -EINVAL;
+ }
+
if (!rec->itr) {
rec->itr = auxtrace_record__init(rec->evlist, &err);
if (err)
@@ -2109,6 +2115,11 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
return PTR_ERR(session);
}

+ if (record__threads_enabled(rec) && perf_data__is_pipe(&rec->data)) {
+ pr_err("Parallel trace streaming is not available in pipe mode.\n");
+ return -1;
+ }
+
fd = perf_data__fd(data);
rec->session = session;

@@ -2854,12 +2865,22 @@ static int switch_output_setup(struct record *rec)
* --switch-output=signal, as we'll send a SIGUSR2 from the side band
* thread to its parent.
*/
- if (rec->switch_output_event_set)
+ if (rec->switch_output_event_set) {
+ if (record__threads_enabled(rec)) {
+ pr_warning("WARNING: --switch-output-event option is not available in parallel streaming mode.\n");
+ return 0;
+ }
goto do_signal;
+ }

if (!s->set)
return 0;

+ if (record__threads_enabled(rec)) {
+ pr_warning("WARNING: --switch-output option is not available in parallel streaming mode.\n");
+ return 0;
+ }
+
if (!strcmp(s->str, "signal")) {
do_signal:
s->signal = true;
@@ -3144,8 +3165,8 @@ static struct option __record_options[] = {
"Set affinity mask of trace reading thread to NUMA node cpu mask or cpu of processed mmap buffer",
record__parse_affinity),
#ifdef HAVE_ZSTD_SUPPORT
- OPT_CALLBACK_OPTARG('z', "compression-level", &record.opts, &comp_level_default,
- "n", "Compressed records using specified level (default: 1 - fastest compression, 22 - greatest compression)",
+ OPT_CALLBACK_OPTARG('z', "compression-level", &record.opts, &comp_level_default, "n",
+ "Compress records using specified level (default: 1 - fastest compression, 22 - greatest compression)",
record__parse_comp_level),
#endif
OPT_CALLBACK(0, "max-size", &record.output_max_size,
@@ -3543,6 +3564,17 @@ int cmd_record(int argc, const char **argv)
if (rec->opts.kcore || record__threads_enabled(rec))
rec->data.is_dir = true;

+ if (record__threads_enabled(rec)) {
+ if (rec->opts.affinity != PERF_AFFINITY_SYS) {
+ pr_err("--affinity option is mutually exclusive with parallel streaming mode.\n");
+ goto out_opts;
+ }
+ if (record__aio_enabled(rec)) {
+ pr_err("Asynchronous streaming mode (--aio) is mutually exclusive with parallel streaming mode.\n");
+ goto out_opts;
+ }
+ }
+
if (rec->opts.comp_level != 0) {
pr_debug("Compression enabled, disabling build id collection at the end of the session.\n");
rec->no_buildid = true;
@@ -3576,6 +3608,11 @@ int cmd_record(int argc, const char **argv)
}
}

+ if (rec->timestamp_filename && record__threads_enabled(rec)) {
+ rec->timestamp_filename = false;
+ pr_warning("WARNING: --timestamp-filename option is not available in parallel streaming mode.\n");
+ }
+
/*
* Allow aliases to facilitate the lookup of symbols for address
* filters. Refer to auxtrace_parse_filters().
--
2.19.0
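The documentation above says that --threads masks are filtered through the -C mask, and cmd_record() reports a positive return from record__init_thread_masks() as intersecting masks. A sketch of that rule with plain 64-bit masks, modeled on the loop in record__init_thread_masks_spec(): each entry is ANDed with the recorded cpus, entries left with an empty maps or affinity mask are skipped, and overlap with previously accepted entries is an error. struct tmask and filter_thread_masks() are hypothetical; this sketch rejects overlap in either mask, and whether record__thread_mask_intersects() compares both masks is an assumption, since its body is not shown here.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical 64-cpu stand-in for struct thread_mask. */
struct tmask {
	uint64_t maps;
	uint64_t affinity;
};

/*
 * AND every spec entry with the recorded cpus (e.g. the -C list), drop
 * entries left without maps or affinity cpus, and reject overlaps.
 * Returns the number of threads written to out[], or -1 on overlap.
 */
static int filter_thread_masks(const struct tmask *spec, int nr_spec,
			       uint64_t cpus, struct tmask *out)
{
	struct tmask full = { 0, 0 };	/* union of the accepted entries */
	int nr_threads = 0;

	assert(nr_spec == 0 || (spec && out));
	for (int s = 0; s < nr_spec; s++) {
		struct tmask m = {
			.maps = spec[s].maps & cpus,
			.affinity = spec[s].affinity & cpus,
		};

		/* Same outcome as bitmap_and() returning 0 in the patch. */
		if (!m.maps || !m.affinity)
			continue;
		/* A cpu claimed by two threads makes the layout invalid. */
		if ((m.maps & full.maps) || (m.affinity & full.affinity))
			return -1;
		full.maps |= m.maps;
		full.affinity |= m.affinity;
		out[nr_threads++] = m;
	}
	return nr_threads;
}
```

With entries { 0x0F, 0x01 }, { 0xF0, 0x10 } and { 0xF00, 0x100 }, a cpu mask of 0xFF keeps the first two threads, while 0x3C (cpus 2-5) leaves the first entry without any affinity cpu, so only one thread, with maps 0x30, remains.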


2021-04-06 17:47:46

by Bayduraev, Alexey V

Subject: [PATCH v4 04/12] perf record: stop threads in the end of trace streaming


Signal a thread to terminate by closing the write fd of its msg pipe.
Receive a THREAD_MSG__READY message as the confirmation of the
thread's termination. Stop the threads created for parallel trace
streaming prior to processing their stats.

Signed-off-by: Alexey Bayduraev <[email protected]>
---
tools/perf/builtin-record.c | 30 ++++++++++++++++++++++++++++++
1 file changed, 30 insertions(+)

diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
index ecb6bf33ed85..4612314853c1 100644
--- a/tools/perf/builtin-record.c
+++ b/tools/perf/builtin-record.c
@@ -110,6 +110,16 @@ struct thread_data {

static __thread struct thread_data *thread;

+enum thread_msg {
+ THREAD_MSG__UNDEFINED = 0,
+ THREAD_MSG__READY,
+ THREAD_MSG__MAX,
+};
+
+static const char *thread_msg_tags[THREAD_MSG__MAX] = {
+ "UNDEFINED", "READY"
+};
+
struct record {
struct perf_tool tool;
struct record_opts opts;
@@ -1820,6 +1830,23 @@ static void hit_auxtrace_snapshot_trigger(struct record *rec)
}
}

+static int record__terminate_thread(struct thread_data *thread_data)
+{
+ int res;
+ enum thread_msg ack = THREAD_MSG__UNDEFINED;
+ pid_t tid = thread_data->tid;
+
+ close(thread_data->pipes.msg[1]);
+ res = read(thread_data->pipes.ack[0], &ack, sizeof(ack));
+ if (res != -1)
+ pr_debug2("threads[%d]: sent %s\n", tid, thread_msg_tags[ack]);
+ else
+ pr_err("threads[%d]: failed to recv msg=%s from tid=%d\n",
+ thread->tid, thread_msg_tags[ack], tid);
+
+ return 0;
+}
+
static int record__start_threads(struct record *rec)
{
struct thread_data *thread_data = rec->thread_data;
@@ -1836,6 +1863,9 @@ static int record__stop_threads(struct record *rec, unsigned long *waking)
int t;
struct thread_data *thread_data = rec->thread_data;

+ for (t = 1; t < rec->nr_threads; t++)
+ record__terminate_thread(&thread_data[t]);
+
for (t = 0; t < rec->nr_threads; t++) {
rec->samples += thread_data[t].samples;
*waking += thread_data[t].waking;
--
2.19.0
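The termination handshake in this patch (close the write end of the thread's msg pipe, then block on the ack pipe until THREAD_MSG__READY arrives) can be sketched outside perf with a single worker thread that polls only the read end of its msg pipe. On Linux, poll() reports POLLHUP on a pipe's read end once every write end is closed, which is the condition record__thread() in patch 05/12 checks for its ctlfd_pos entry. struct worker, worker_main(), start_worker() and stop_worker() are hypothetical, the mmap reading loop is omitted, and the worker is joinable rather than detached only so the example can wait for it.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <unistd.h>

enum thread_msg { THREAD_MSG__UNDEFINED = 0, THREAD_MSG__READY };

/* Hypothetical per-thread state: msg pipe (main -> worker), ack pipe (worker -> main). */
struct worker {
	int msg[2];
	int ack[2];
	pthread_t handle;
};

static void *worker_main(void *arg)
{
	struct worker *w = arg;
	enum thread_msg ready = THREAD_MSG__READY;
	struct pollfd pfd = { .fd = w->msg[0], .events = POLLIN };

	for (;;) {
		/* A real streaming thread would drain its mmap buffers here. */
		int n = poll(&pfd, 1, -1);

		if (n < 0 && errno != EINTR)
			break;
		/* Nothing is ever written to msg, so only a hangup wakes us. */
		if (n > 0 && (pfd.revents & (POLLHUP | POLLERR)))
			break;
	}
	close(w->msg[0]);
	/* Confirm termination to the main thread. */
	if (write(w->ack[1], &ready, sizeof(ready)) != (ssize_t)sizeof(ready))
		return (void *)1;
	return NULL;
}

static int start_worker(struct worker *w)
{
	if (pipe(w->msg))
		return -1;
	if (pipe(w->ack))
		goto close_msg;
	if (pthread_create(&w->handle, NULL, worker_main, w) == 0)
		return 0;
	close(w->ack[0]);
	close(w->ack[1]);
close_msg:
	close(w->msg[0]);
	close(w->msg[1]);
	return -1;
}

/* Main-thread side: request termination and wait for the acknowledgement. */
static int stop_worker(struct worker *w, enum thread_msg *ack)
{
	*ack = THREAD_MSG__UNDEFINED;
	close(w->msg[1]);	/* the worker sees POLLHUP on msg[0] */
	if (read(w->ack[0], ack, sizeof(*ack)) != (ssize_t)sizeof(*ack))
		return -1;
	return 0;
}
```

Closing a file descriptor needs no extra message type: the hangup itself is the stop request, and the ack read gives the main thread a point after which the worker no longer touches shared state.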


2021-04-06 17:48:14

by Bayduraev, Alexey V

Subject: [PATCH v4 05/12] perf record: start threads in the beginning of trace streaming


Start threads in detached state because their management is implemented
via messaging, to avoid scaling issues. Block signals prior to thread
start so that only the main tool thread is notified of external async
signals during data collection. The thread affinity mask is used to
assign the cpus the thread is eligible to run on. Wait and sync on
thread start using the thread ack pipe.

Signed-off-by: Alexey Bayduraev <[email protected]>
---
tools/perf/builtin-record.c | 103 +++++++++++++++++++++++++++++++++++-
1 file changed, 102 insertions(+), 1 deletion(-)

diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
index 4612314853c1..339198b2e37d 100644
--- a/tools/perf/builtin-record.c
+++ b/tools/perf/builtin-record.c
@@ -1412,6 +1412,64 @@ static void record__thread_munmap_filtered(struct fdarray *fda, int fd,
perf_mmap__put(map);
}

+static void *record__thread(void *arg)
+{
+ enum thread_msg msg = THREAD_MSG__READY;
+ bool terminate = false;
+ struct fdarray *pollfd;
+ int err, ctlfd_pos;
+
+ thread = arg;
+ thread->tid = syscall(SYS_gettid);
+
+ err = write(thread->pipes.ack[1], &msg, sizeof(msg));
+ if (err == -1)
+ pr_err("threads[%d]: failed to notify on start. Error %m\n", thread->tid);
+
+ pr_debug("threads[%d]: started on cpu=%d\n", thread->tid, sched_getcpu());
+
+ pollfd = &thread->pollfd;
+ ctlfd_pos = thread->ctlfd_pos;
+
+ for (;;) {
+ unsigned long long hits = thread->samples;
+
+ if (record__mmap_read_all(thread->rec, false) < 0 || terminate)
+ break;
+
+ if (hits == thread->samples) {
+
+ err = fdarray__poll(pollfd, -1);
+ /*
+ * Propagate error, only if there's any. Ignore positive
+ * number of returned events and interrupt error.
+ */
+ if (err > 0 || (err < 0 && errno == EINTR))
+ err = 0;
+ thread->waking++;
+
+ if (fdarray__filter(pollfd, POLLERR | POLLHUP,
+ record__thread_munmap_filtered, NULL) == 0)
+ break;
+ }
+
+ if (pollfd->entries[ctlfd_pos].revents & POLLHUP) {
+ terminate = true;
+ close(thread->pipes.msg[0]);
+ pollfd->entries[ctlfd_pos].fd = -1;
+ pollfd->entries[ctlfd_pos].events = 0;
+ }
+
+ pollfd->entries[ctlfd_pos].revents = 0;
+ }
+
+ err = write(thread->pipes.ack[1], &msg, sizeof(msg));
+ if (err == -1)
+ pr_err("threads[%d]: failed to notify on termination. Error %m\n", thread->tid);
+
+ return NULL;
+}
+
static void record__init_features(struct record *rec)
{
struct perf_session *session = rec->session;
@@ -1849,13 +1907,56 @@ static int record__terminate_thread(struct thread_data *thread_data)

static int record__start_threads(struct record *rec)
{
+ int t, tt, ret = 0, nr_threads = rec->nr_threads;
struct thread_data *thread_data = rec->thread_data;
+ sigset_t full, mask;
+ pthread_t handle;
+ pthread_attr_t attrs;
+
+ sigfillset(&full);
+ if (sigprocmask(SIG_SETMASK, &full, &mask)) {
+ pr_err("Failed to block signals on threads start. Error: %m\n");
+ return -1;
+ }
+
+ pthread_attr_init(&attrs);
+ pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+ for (t = 1; t < nr_threads; t++) {
+ enum thread_msg msg = THREAD_MSG__UNDEFINED;
+
+ pthread_attr_setaffinity_np(&attrs, MMAP_CPU_MASK_BYTES(&(thread_data[t].mask->affinity)),
+ (cpu_set_t *)(thread_data[t].mask->affinity.bits));
+
+ if (pthread_create(&handle, &attrs, record__thread, &thread_data[t])) {
+ for (tt = 1; tt < t; tt++)
+ record__terminate_thread(&thread_data[tt]);
+ pr_err("Failed to start threads. Error: %m\n");
+ ret = -1;
+ goto out_err;
+ }
+
+ if (read(thread_data[t].pipes.ack[0], &msg, sizeof(msg)) > 0)
+ pr_debug2("threads[%d]: sent %s\n", rec->thread_data[t].tid,
+ thread_msg_tags[msg]);
+ }
+
+ if (nr_threads > 1) {
+ sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&thread_data[0].mask->affinity),
+ (cpu_set_t *)thread_data[0].mask->affinity.bits);
+ }

thread = &thread_data[0];

pr_debug("threads[%d]: started on cpu=%d\n", thread->tid, sched_getcpu());

- return 0;
+out_err:
+ if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
+ pr_err("Failed to unblock signals on threads start. Error: %m\n");
+ ret = -1;
+ }
+
+ return ret;
}

static int record__stop_threads(struct record *rec, unsigned long *waking)
--
2.19.0
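The start sequence of record__start_threads() (block all signals, create each thread detached with an affinity attribute, wait for its READY message on the ack pipe, then restore the signal mask) is sketched below for a single thread. It is Linux/glibc specific because pthread_attr_setaffinity_np() is a GNU extension, and to stay runnable on any machine it reuses the caller's affinity from sched_getaffinity() instead of a per-thread mask. Where the patch calls sigprocmask(), the sketch uses pthread_sigmask(), which POSIX specifies for multithreaded processes; ready_thread() and start_detached() are hypothetical.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <signal.h>
#include <unistd.h>

enum thread_msg { THREAD_MSG__UNDEFINED = 0, THREAD_MSG__READY };

/* Hypothetical thread body: report READY on the ack pipe, then exit. */
static void *ready_thread(void *arg)
{
	int ack_fd = *(const int *)arg;
	enum thread_msg msg = THREAD_MSG__READY;

	if (write(ack_fd, &msg, sizeof(msg)) != (ssize_t)sizeof(msg))
		return (void *)1;
	return NULL;
}

/*
 * Start one detached thread with every signal blocked and wait for its
 * READY message. Returns the message read, or THREAD_MSG__UNDEFINED.
 */
static enum thread_msg start_detached(void)
{
	enum thread_msg msg = THREAD_MSG__UNDEFINED;
	sigset_t full, saved;
	pthread_attr_t attrs;
	pthread_t handle;
	cpu_set_t cpus;
	int ack[2];

	if (pipe(ack))
		return msg;

	/* A new thread inherits the creating thread's signal mask. */
	sigfillset(&full);
	if (pthread_sigmask(SIG_SETMASK, &full, &saved))
		goto out_close;

	pthread_attr_init(&attrs);
	pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
	/* perf passes the thread's own affinity mask; reuse ours here. */
	if (sched_getaffinity(0, sizeof(cpus), &cpus) == 0)
		pthread_attr_setaffinity_np(&attrs, sizeof(cpus), &cpus);

	/* Block on the ack pipe until the thread reports it is running. */
	if (pthread_create(&handle, &attrs, ready_thread, &ack[1]) == 0 &&
	    read(ack[0], &msg, sizeof(msg)) != (ssize_t)sizeof(msg))
		msg = THREAD_MSG__UNDEFINED;

	pthread_attr_destroy(&attrs);
	/* Restore the caller's mask: only it now receives async signals. */
	pthread_sigmask(SIG_SETMASK, &saved, NULL);
out_close:
	close(ack[0]);
	close(ack[1]);
	return msg;
}
```

Because the ack read completes only after the thread has run, the caller's stack storage passed as the thread argument stays valid for as long as the thread needs it.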


2021-04-06 17:48:52

by Bayduraev, Alexey V

Subject: [PATCH v4 10/12] perf report: output data file name in raw trace dump


Print the path and name of the data file into the raw dump (-D) as
<file_offset>@<path/file>. For decompressed records, print the offset
of the PERF_RECORD_COMPRESSED record they came from instead of zero:
[email protected] [0x30]: event: 9
or
[email protected]/data.7 [0x30]: event: 9

Acked-by: Namhyung Kim <[email protected]>
Signed-off-by: Alexey Bayduraev <[email protected]>
---
tools/perf/builtin-inject.c | 3 +-
tools/perf/util/ordered-events.h | 1 +
tools/perf/util/session.c | 77 +++++++++++++++++++-------------
tools/perf/util/session.h | 1 +
tools/perf/util/tool.h | 3 +-
5 files changed, 52 insertions(+), 33 deletions(-)

diff --git a/tools/perf/builtin-inject.c b/tools/perf/builtin-inject.c
index 6fe44d97fde5..90a75739cda1 100644
--- a/tools/perf/builtin-inject.c
+++ b/tools/perf/builtin-inject.c
@@ -106,7 +106,8 @@ static int perf_event__repipe_op2_synth(struct perf_session *session,

static int perf_event__repipe_op4_synth(struct perf_session *session,
union perf_event *event,
- u64 data __maybe_unused)
+ u64 data __maybe_unused,
+ const char *str __maybe_unused)
{
return perf_event__repipe_synth(session->tool, event);
}
diff --git a/tools/perf/util/ordered-events.h b/tools/perf/util/ordered-events.h
index 75345946c4b9..42c9764c6b5b 100644
--- a/tools/perf/util/ordered-events.h
+++ b/tools/perf/util/ordered-events.h
@@ -9,6 +9,7 @@ struct perf_sample;
struct ordered_event {
u64 timestamp;
u64 file_offset;
+ const char *file_path;
union perf_event *event;
struct list_head list;
};
diff --git a/tools/perf/util/session.c b/tools/perf/util/session.c
index 9a8808507bd9..84f8f29c48a6 100644
--- a/tools/perf/util/session.c
+++ b/tools/perf/util/session.c
@@ -37,7 +37,8 @@

#ifdef HAVE_ZSTD_SUPPORT
static int perf_session__process_compressed_event(struct perf_session *session,
- union perf_event *event, u64 file_offset)
+ union perf_event *event, u64 file_offset,
+ const char *file_path)
{
void *src;
size_t decomp_size, src_size;
@@ -59,6 +60,7 @@ static int perf_session__process_compressed_event(struct perf_session *session,
}

decomp->file_pos = file_offset;
+ decomp->file_path = file_path;
decomp->mmap_len = mmap_len;
decomp->head = 0;

@@ -99,7 +101,8 @@ static int perf_session__process_compressed_event(struct perf_session *session,
static int perf_session__deliver_event(struct perf_session *session,
union perf_event *event,
struct perf_tool *tool,
- u64 file_offset);
+ u64 file_offset,
+ const char *file_path);

static int perf_session__open(struct perf_session *session)
{
@@ -181,7 +184,8 @@ static int ordered_events__deliver_event(struct ordered_events *oe,
ordered_events);

return perf_session__deliver_event(session, event->event,
- session->tool, event->file_offset);
+ session->tool, event->file_offset,
+ event->file_path);
}

struct perf_session *perf_session__new(struct perf_data *data,
@@ -453,7 +457,8 @@ static int process_stat_round_stub(struct perf_session *perf_session __maybe_unu

static int perf_session__process_compressed_event_stub(struct perf_session *session __maybe_unused,
union perf_event *event __maybe_unused,
- u64 file_offset __maybe_unused)
+ u64 file_offset __maybe_unused,
+ const char *file_path __maybe_unused)
{
dump_printf(": unhandled!\n");
return 0;
@@ -1243,13 +1248,14 @@ static void sample_read__printf(struct perf_sample *sample, u64 read_format)
}

static void dump_event(struct evlist *evlist, union perf_event *event,
- u64 file_offset, struct perf_sample *sample)
+ u64 file_offset, struct perf_sample *sample,
+ const char *file_path)
{
if (!dump_trace)
return;

- printf("\n%#" PRIx64 " [%#x]: event: %d\n",
- file_offset, event->header.size, event->header.type);
+ printf("\n%#" PRIx64 "@%s [%#x]: event: %d\n",
+ file_offset, file_path ? file_path : "", event->header.size, event->header.type);

trace_event(event);
if (event->header.type == PERF_RECORD_SAMPLE && evlist->trace_event_sample_raw)
@@ -1450,12 +1456,13 @@ static int machines__deliver_event(struct machines *machines,
struct evlist *evlist,
union perf_event *event,
struct perf_sample *sample,
- struct perf_tool *tool, u64 file_offset)
+ struct perf_tool *tool, u64 file_offset,
+ const char *file_path)
{
struct evsel *evsel;
struct machine *machine;

- dump_event(evlist, event, file_offset, sample);
+ dump_event(evlist, event, file_offset, sample, file_path);

evsel = evlist__id2evsel(evlist, sample->id);

@@ -1532,7 +1539,8 @@ static int machines__deliver_event(struct machines *machines,
static int perf_session__deliver_event(struct perf_session *session,
union perf_event *event,
struct perf_tool *tool,
- u64 file_offset)
+ u64 file_offset,
+ const char *file_path)
{
struct perf_sample sample;
int ret = evlist__parse_sample(session->evlist, event, &sample);
@@ -1549,7 +1557,7 @@ static int perf_session__deliver_event(struct perf_session *session,
return 0;

ret = machines__deliver_event(&session->machines, session->evlist,
- event, &sample, tool, file_offset);
+ event, &sample, tool, file_offset, file_path);

if (dump_trace && sample.aux_sample.size)
auxtrace__dump_auxtrace_sample(session, &sample);
@@ -1559,7 +1567,8 @@ static int perf_session__deliver_event(struct perf_session *session,

static s64 perf_session__process_user_event(struct perf_session *session,
union perf_event *event,
- u64 file_offset)
+ u64 file_offset,
+ const char *file_path)
{
struct ordered_events *oe = &session->ordered_events;
struct perf_tool *tool = session->tool;
@@ -1569,7 +1578,7 @@ static s64 perf_session__process_user_event(struct perf_session *session,

if (event->header.type != PERF_RECORD_COMPRESSED ||
tool->compressed == perf_session__process_compressed_event_stub)
- dump_event(session->evlist, event, file_offset, &sample);
+ dump_event(session->evlist, event, file_offset, &sample, file_path);

/* These events are processed right away */
switch (event->header.type) {
@@ -1628,9 +1637,9 @@ static s64 perf_session__process_user_event(struct perf_session *session,
case PERF_RECORD_HEADER_FEATURE:
return tool->feature(session, event);
case PERF_RECORD_COMPRESSED:
- err = tool->compressed(session, event, file_offset);
+ err = tool->compressed(session, event, file_offset, file_path);
if (err)
- dump_event(session->evlist, event, file_offset, &sample);
+ dump_event(session->evlist, event, file_offset, &sample, file_path);
return err;
default:
return -EINVAL;
@@ -1647,9 +1656,9 @@ int perf_session__deliver_synth_event(struct perf_session *session,
events_stats__inc(&evlist->stats, event->header.type);

if (event->header.type >= PERF_RECORD_USER_TYPE_START)
- return perf_session__process_user_event(session, event, 0);
+ return perf_session__process_user_event(session, event, 0, NULL);

- return machines__deliver_event(&session->machines, evlist, event, sample, tool, 0);
+ return machines__deliver_event(&session->machines, evlist, event, sample, tool, 0, NULL);
}

static void event_swap(union perf_event *event, bool sample_id_all)
@@ -1745,7 +1754,8 @@ int perf_session__peek_events(struct perf_session *session, u64 offset,
}

static s64 perf_session__process_event(struct perf_session *session,
- union perf_event *event, u64 file_offset)
+ union perf_event *event, u64 file_offset,
+ const char *file_path)
{
struct evlist *evlist = session->evlist;
struct perf_tool *tool = session->tool;
@@ -1760,7 +1770,7 @@ static s64 perf_session__process_event(struct perf_session *session,
events_stats__inc(&evlist->stats, event->header.type);

if (event->header.type >= PERF_RECORD_USER_TYPE_START)
- return perf_session__process_user_event(session, event, file_offset);
+ return perf_session__process_user_event(session, event, file_offset, file_path);

if (tool->ordered_events) {
u64 timestamp = -1ULL;
@@ -1774,7 +1784,7 @@ static s64 perf_session__process_event(struct perf_session *session,
return ret;
}

- return perf_session__deliver_event(session, event, tool, file_offset);
+ return perf_session__deliver_event(session, event, tool, file_offset, file_path);
}

void perf_event_header__bswap(struct perf_event_header *hdr)
@@ -1994,7 +2004,8 @@ static int __perf_session__process_pipe_events(struct perf_session *session)
}
}

- if ((skip = perf_session__process_event(session, event, head)) < 0) {
+ skip = perf_session__process_event(session, event, head, "pipe");
+ if (skip < 0) {
pr_err("%#" PRIx64 " [%#x]: failed to process type: %d\n",
head, event->header.size, event->header.type);
err = -EINVAL;
@@ -2075,7 +2086,7 @@ fetch_decomp_event(u64 head, size_t mmap_size, char *buf, bool needs_swap)
static int __perf_session__process_decomp_events(struct perf_session *session)
{
s64 skip;
- u64 size, file_pos = 0;
+ u64 size;
struct decomp *decomp = session->decomp_last;

if (!decomp)
@@ -2089,9 +2100,9 @@ static int __perf_session__process_decomp_events(struct perf_session *session)
break;

size = event->header.size;
-
- if (size < sizeof(struct perf_event_header) ||
- (skip = perf_session__process_event(session, event, file_pos)) < 0) {
+ if (size < sizeof(struct perf_event_header) ||
+ (skip = perf_session__process_event(session, event, decomp->file_pos,
+ decomp->file_path)) < 0) {
pr_err("%#" PRIx64 " [%#x]: failed to process type: %d\n",
decomp->file_pos + decomp->head, event->header.size, event->header.type);
return -EINVAL;
@@ -2122,10 +2133,12 @@ struct reader;

typedef s64 (*reader_cb_t)(struct perf_session *session,
union perf_event *event,
- u64 file_offset);
+ u64 file_offset,
+ const char *file_path);

struct reader {
int fd;
+ const char *path;
u64 data_size;
u64 data_offset;
reader_cb_t process;
@@ -2204,9 +2217,9 @@ reader__process_events(struct reader *rd, struct perf_session *session,
skip = -EINVAL;

if (size < sizeof(struct perf_event_header) ||
- (skip = rd->process(session, event, file_pos)) < 0) {
- pr_err("%#" PRIx64 " [%#x]: failed to process type: %d [%s]\n",
- file_offset + head, event->header.size,
+ (skip = rd->process(session, event, file_pos, rd->path)) < 0) {
+ pr_err("%#" PRIx64 " [%s] [%#x]: failed to process type: %d [%s]\n",
+ file_offset + head, rd->path, event->header.size,
event->header.type, strerror(-skip));
err = skip;
goto out;
@@ -2236,9 +2249,10 @@ reader__process_events(struct reader *rd, struct perf_session *session,

static s64 process_simple(struct perf_session *session,
union perf_event *event,
- u64 file_offset)
+ u64 file_offset,
+ const char *file_path)
{
- return perf_session__process_event(session, event, file_offset);
+ return perf_session__process_event(session, event, file_offset, file_path);
}

static int __perf_session__process_events(struct perf_session *session)
@@ -2248,6 +2262,7 @@ static int __perf_session__process_events(struct perf_session *session)
.data_size = session->header.data_size,
.data_offset = session->header.data_offset,
.process = process_simple,
+ .path = session->data->file.path,
};
struct ordered_events *oe = &session->ordered_events;
struct perf_tool *tool = session->tool;
diff --git a/tools/perf/util/session.h b/tools/perf/util/session.h
index f76480166d38..378ffc3e2809 100644
--- a/tools/perf/util/session.h
+++ b/tools/perf/util/session.h
@@ -46,6 +46,7 @@ struct perf_session {
struct decomp {
struct decomp *next;
u64 file_pos;
+ const char *file_path;
size_t mmap_len;
u64 head;
size_t size;
diff --git a/tools/perf/util/tool.h b/tools/perf/util/tool.h
index bbbc0dcd461f..c966531d3eca 100644
--- a/tools/perf/util/tool.h
+++ b/tools/perf/util/tool.h
@@ -28,7 +28,8 @@ typedef int (*event_attr_op)(struct perf_tool *tool,

typedef int (*event_op2)(struct perf_session *session, union perf_event *event);
typedef s64 (*event_op3)(struct perf_session *session, union perf_event *event);
-typedef int (*event_op4)(struct perf_session *session, union perf_event *event, u64 data);
+typedef int (*event_op4)(struct perf_session *session, union perf_event *event, u64 data,
+ const char *str);

typedef int (*event_oe)(struct perf_tool *tool, union perf_event *event,
struct ordered_events *oe);
--
2.19.0
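The dump_event() change above prints each raw dump header with the format "%#" PRIx64 "@%s [%#x]: event: %d". The helper below reproduces that format with snprintf() so the <file_offset>@<path/file> layout can be checked in isolation; format_dump_header() is hypothetical, and the offset, path, size and type used with it are illustrative values, not output from a real trace. Because perf_session__deliver_synth_event() passes a NULL file_path, the sketch substitutes an empty string instead of handing NULL to %s.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper producing the header line of a raw dump entry in the
 * dump_event() format: <file_offset>@<path> [<size>]: event: <type>.
 */
static int format_dump_header(char *buf, size_t len, uint64_t file_offset,
			      const char *file_path, unsigned int size, int type)
{
	assert(buf != NULL);
	/* Passing NULL to %s is undefined behaviour, so print "" instead. */
	return snprintf(buf, len, "%#" PRIx64 "@%s [%#x]: event: %d",
			file_offset, file_path ? file_path : "", size, type);
}
```

For instance, offset 0x1000, path "perf.data/data.7", size 0x30 and type 9 give "0x1000@perf.data/data.7 [0x30]: event: 9".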


2021-04-06 17:48:58

by Bayduraev, Alexey V

Subject: [PATCH v4 11/12] perf session: load data directory files for analysis


Introduce a decompressor into the trace reader object so that
decompression can be executed separately for every data file located
in the data directory.

Load data directory files and provide basic raw dump and aggregated
analysis support for data directories in report mode, still without
memory consumption optimizations.
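Keeping decompressed chunks per reader means each data file owns its own singly linked decomp list (and zstd stream state), and the compressed-event handler appends to the active reader's list when one is set, falling back to the session-wide list otherwise, as perf_session__process_compressed_event() does below with decomp/decomp_last. The sketch shows only that list selection and tail append, with simplified hypothetical types (struct chunk, struct chunk_list, struct session_like, append_chunk()); zstd state and mmap'ed buffers are left out.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for struct decomp: one decompressed buffer. */
struct chunk {
	struct chunk *next;
	size_t size;
};

/* Head/tail pair, playing the role of decomp/decomp_last. */
struct chunk_list {
	struct chunk *first;
	struct chunk *last;
};

struct session_like {
	struct chunk_list session_list;	/* used when no reader is active */
	struct chunk_list *active;	/* active reader's list, or NULL */
};

/* O(1) append to the active reader's list, else to the session list. */
static void append_chunk(struct session_like *s, struct chunk *c)
{
	struct chunk_list *l = s->active ? s->active : &s->session_list;

	assert(c != NULL);
	c->next = NULL;
	if (!l->first)
		l->first = c;
	else
		l->last->next = c;
	l->last = c;
}
```

Selecting the list through one pointer keeps the append logic in a single place; the patch instead duplicates the head/tail update for the reader and session cases.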

Design and implementation are based on the prototype [1], [2].

[1] git clone https://git.kernel.org/pub/scm/linux/kernel/git/jolsa/perf.git -b perf/record_threads
[2] https://lore.kernel.org/lkml/[email protected]/

Suggested-by: Jiri Olsa <[email protected]>
Signed-off-by: Alexey Bayduraev <[email protected]>
---
tools/perf/util/session.c | 350 +++++++++++++++++++++++++++++++++-----
tools/perf/util/session.h | 4 +
2 files changed, 315 insertions(+), 39 deletions(-)

diff --git a/tools/perf/util/session.c b/tools/perf/util/session.c
index 84f8f29c48a6..26fffadfd6ef 100644
--- a/tools/perf/util/session.c
+++ b/tools/perf/util/session.c
@@ -35,6 +35,55 @@
#include "units.h"
#include <internal/lib.h>

+struct reader;
+
+typedef s64 (*reader_cb_t)(struct perf_session *session,
+ union perf_event *event,
+ u64 file_offset,
+ const char *file_path);
+
+/*
+ * On 64bit we can mmap the data file in one go. No need for tiny mmap
+ * slices. On 32bit we use 32MB.
+ */
+#if BITS_PER_LONG == 64
+#define MMAP_SIZE ULLONG_MAX
+#define NUM_MMAPS 1
+#else
+#define MMAP_SIZE (32 * 1024 * 1024ULL)
+#define NUM_MMAPS 128
+#endif
+
+struct reader_state {
+ char *mmaps[NUM_MMAPS];
+ size_t mmap_size;
+ int mmap_idx;
+ char *mmap_cur;
+ u64 file_pos;
+ u64 file_offset;
+ u64 data_size;
+ u64 head;
+ bool eof;
+ u64 size;
+};
+
+enum {
+ READER_EOF = 0,
+ READER_OK = 1,
+};
+
+struct reader {
+ int fd;
+ const char *path;
+ u64 data_size;
+ u64 data_offset;
+ reader_cb_t process;
+ struct zstd_data zstd_data;
+ struct decomp *decomp;
+ struct decomp *decomp_last;
+ struct reader_state state;
+};
+
#ifdef HAVE_ZSTD_SUPPORT
static int perf_session__process_compressed_event(struct perf_session *session,
union perf_event *event, u64 file_offset,
@@ -44,7 +93,10 @@ static int perf_session__process_compressed_event(struct perf_session *session,
size_t decomp_size, src_size;
u64 decomp_last_rem = 0;
size_t mmap_len, decomp_len = session->header.env.comp_mmap_len;
- struct decomp *decomp, *decomp_last = session->decomp_last;
+ struct decomp *decomp, *decomp_last = session->active_reader ?
+ session->active_reader->decomp_last : session->decomp_last;
+ struct zstd_data *zstd_data = session->active_reader ?
+ &session->active_reader->zstd_data: &session->zstd_data;

if (decomp_last) {
decomp_last_rem = decomp_last->size - decomp_last->head;
@@ -72,7 +124,7 @@ static int perf_session__process_compressed_event(struct perf_session *session,
src = (void *)event + sizeof(struct perf_record_compressed);
src_size = event->pack.header.size - sizeof(struct perf_record_compressed);

- decomp_size = zstd_decompress_stream(&(session->zstd_data), src, src_size,
+ decomp_size = zstd_decompress_stream(zstd_data, src, src_size,
&(decomp->data[decomp_last_rem]), decomp_len - decomp_last_rem);
if (!decomp_size) {
munmap(decomp, mmap_len);
@@ -82,12 +134,22 @@ static int perf_session__process_compressed_event(struct perf_session *session,

decomp->size += decomp_size;

- if (session->decomp == NULL) {
- session->decomp = decomp;
- session->decomp_last = decomp;
+ if (session->active_reader) {
+ if (session->active_reader->decomp == NULL) {
+ session->active_reader->decomp = decomp;
+ session->active_reader->decomp_last = decomp;
+ } else {
+ session->active_reader->decomp_last->next = decomp;
+ session->active_reader->decomp_last = decomp;
+ }
} else {
- session->decomp_last->next = decomp;
- session->decomp_last = decomp;
+ if (session->decomp == NULL) {
+ session->decomp = decomp;
+ session->decomp_last = decomp;
+ } else {
+ session->decomp_last->next = decomp;
+ session->decomp_last = decomp;
+ }
}

pr_debug("decomp (B): %zd to %zd\n", src_size, decomp_size);
@@ -278,11 +340,10 @@ static void perf_session__delete_threads(struct perf_session *session)
machine__delete_threads(&session->machines.host);
}

-static void perf_session__release_decomp_events(struct perf_session *session)
+static void perf_decomp__release_events(struct decomp *next)
{
- struct decomp *next, *decomp;
+ struct decomp *decomp;
size_t mmap_len;
- next = session->decomp;
do {
decomp = next;
if (decomp == NULL)
@@ -295,13 +356,21 @@ static void perf_session__release_decomp_events(struct perf_session *session)

void perf_session__delete(struct perf_session *session)
{
+ int r;
+
if (session == NULL)
return;
auxtrace__free(session);
auxtrace_index__free(&session->auxtrace_index);
perf_session__destroy_kernel_maps(session);
perf_session__delete_threads(session);
- perf_session__release_decomp_events(session);
+ if (session->readers) {
+ for (r = 0; r < session->nr_readers; r++)
+ perf_decomp__release_events(session->readers[r].decomp);
+ zfree(&session->readers);
+ session->nr_readers = 0;
+ }
+ perf_decomp__release_events(session->decomp);
perf_env__exit(&session->header.env);
machines__exit(&session->machines);
if (session->data)
@@ -2087,7 +2156,8 @@ static int __perf_session__process_decomp_events(struct perf_session *session)
{
s64 skip;
u64 size;
- struct decomp *decomp = session->decomp_last;
+ struct decomp *decomp = session->active_reader ?
+ session->active_reader->decomp_last : session->decomp_last;

if (!decomp)
return 0;
@@ -2117,33 +2187,6 @@ static int __perf_session__process_decomp_events(struct perf_session *session)
return 0;
}

-/*
- * On 64bit we can mmap the data file in one go. No need for tiny mmap
- * slices. On 32bit we use 32MB.
- */
-#if BITS_PER_LONG == 64
-#define MMAP_SIZE ULLONG_MAX
-#define NUM_MMAPS 1
-#else
-#define MMAP_SIZE (32 * 1024 * 1024ULL)
-#define NUM_MMAPS 128
-#endif
-
-struct reader;
-
-typedef s64 (*reader_cb_t)(struct perf_session *session,
- union perf_event *event,
- u64 file_offset,
- const char *file_path);
-
-struct reader {
- int fd;
- const char *path;
- u64 data_size;
- u64 data_offset;
- reader_cb_t process;
-};
-
static int
reader__process_events(struct reader *rd, struct perf_session *session,
struct ui_progress *prog)
@@ -2301,6 +2344,232 @@ static int __perf_session__process_events(struct perf_session *session)
return err;
}

+static int
+reader__init(struct reader *rd, bool *one_mmap)
+{
+ struct reader_state *st = &rd->state;
+ char **mmaps = st->mmaps;
+
+ pr_debug("reader processing %s\n", rd->path);
+
+ st->head = rd->data_offset;
+
+ st->data_size = rd->data_size + rd->data_offset;
+
+ st->mmap_size = MMAP_SIZE;
+ if (st->mmap_size > st->data_size) {
+ st->mmap_size = st->data_size;
+ if (one_mmap)
+ *one_mmap = true;
+ }
+
+ memset(mmaps, 0, sizeof(st->mmaps));
+
+ if (zstd_init(&rd->zstd_data, 0))
+ return -1;
+
+ return 0;
+}
+
+static int
+reader__mmap(struct reader *rd, struct perf_session *session)
+{
+ struct reader_state *st = &rd->state;
+ int mmap_prot, mmap_flags;
+ char *buf, **mmaps = st->mmaps;
+ u64 page_offset;
+
+ if (st->file_pos >= st->data_size) {
+ st->eof = true;
+ return READER_EOF;
+ }
+
+ mmap_prot = PROT_READ;
+ mmap_flags = MAP_SHARED;
+
+ if (session->header.needs_swap) {
+ mmap_prot |= PROT_WRITE;
+ mmap_flags = MAP_PRIVATE;
+ }
+
+ if (mmaps[st->mmap_idx]) {
+ munmap(mmaps[st->mmap_idx], st->mmap_size);
+ mmaps[st->mmap_idx] = NULL;
+ }
+
+ page_offset = page_size * (st->head / page_size);
+ st->file_offset += page_offset;
+ st->head -= page_offset;
+
+ buf = mmap(NULL, st->mmap_size, mmap_prot, mmap_flags, rd->fd,
+ st->file_offset);
+ if (buf == MAP_FAILED) {
+ pr_err("failed to mmap file\n");
+ return -errno;
+ }
+ mmaps[st->mmap_idx] = st->mmap_cur = buf;
+ st->mmap_idx = (st->mmap_idx + 1) & (ARRAY_SIZE(st->mmaps) - 1);
+ st->file_pos = st->file_offset + st->head;
+ return READER_OK;
+}
+
+static int
+reader__read_event(struct reader *rd, struct perf_session *session,
+ struct ui_progress *prog)
+{
+ struct reader_state *st = &rd->state;
+ union perf_event *event;
+ int ret = READER_OK;
+ u64 size;
+ s64 skip;
+
+ event = fetch_mmaped_event(st->head, st->mmap_size, st->mmap_cur, session->header.needs_swap);
+ if (IS_ERR(event))
+ return PTR_ERR(event);
+
+ if (!event)
+ return READER_EOF;
+
+ session->active_reader = rd;
+ size = event->header.size;
+ skip = -EINVAL;
+
+ if (size < sizeof(struct perf_event_header) ||
+ (skip = perf_session__process_event(session, event, st->file_pos, rd->path)) < 0) {
+ pr_err("%#" PRIx64 " [%s] [%#x]: failed to process type: %d [%s]\n",
+ st->file_offset + st->head, rd->path, event->header.size,
+ event->header.type, strerror(-skip));
+ ret = skip;
+ goto out;
+ }
+
+ if (skip)
+ size += skip;
+
+ st->size += size;
+ st->head += size;
+ st->file_pos += size;
+
+ skip = __perf_session__process_decomp_events(session);
+ if (skip)
+ ret = skip;
+
+ ui_progress__update(prog, size);
+
+out:
+ session->active_reader = NULL;;
+ return ret;
+}
+/*
+ * This function reads, merge and process directory data.
+ * It assumens the version 1 of directory data, where each
+ * data file holds per-cpu data, already sorted by kernel.
+ */
+static int __perf_session__process_dir_events(struct perf_session *session)
+{
+ struct perf_data *data = session->data;
+ struct perf_tool *tool = session->tool;
+ int i, ret = 0, readers = 1;
+ struct ui_progress prog;
+ u64 total_size = perf_data__size(session->data);
+ struct reader *rd;
+
+ perf_tool__fill_defaults(tool);
+
+ ui_progress__init_size(&prog, total_size, "Sorting events...");
+
+ for (i = 0; i < data->dir.nr; i++) {
+ if (data->dir.files[i].size)
+ readers++;
+ }
+
+ rd = session->readers = zalloc(readers * sizeof(struct reader));
+ if (!rd)
+ return -ENOMEM;
+ session->nr_readers = readers;
+ readers = 0;
+
+ rd[readers] = (struct reader) {
+ .fd = perf_data__fd(session->data),
+ .path = session->data->file.path,
+ .data_size = session->header.data_size,
+ .data_offset = session->header.data_offset,
+ };
+ reader__init(&rd[readers], &session->one_mmap);
+ if (reader__mmap(&rd[readers], session) != READER_OK)
+ goto out_err;
+ readers++;
+
+ for (i = 0; i < data->dir.nr; i++) {
+ if (data->dir.files[i].size) {
+ rd[readers] = (struct reader) {
+ .fd = data->dir.files[i].fd,
+ .path = data->dir.files[i].path,
+ .data_size = data->dir.files[i].size,
+ .data_offset = 0,
+ };
+ reader__init(&rd[readers], &session->one_mmap);
+ if (reader__mmap(&rd[readers], session) != READER_OK)
+ goto out_err;
+ readers++;
+ }
+ }
+
+ i = 0;
+
+ while ((ret >= 0) && readers) {
+ if (session_done())
+ return 0;
+
+ if (rd[i].state.eof) {
+ i = (i + 1) % session->nr_readers;
+ continue;
+ }
+
+ ret = reader__read_event(&rd[i], session, &prog);
+ if (ret < 0)
+ break;
+ if (ret == READER_EOF) {
+ ret = reader__mmap(&rd[i], session);
+ if (ret < 0)
+ goto out_err;
+ if (ret == READER_EOF)
+ readers--;
+ }
+
+ /*
+ * Processing 10MBs of data from each reader in sequence,
+ * because that's the way the ordered events sorting works
+ * most efficiently.
+ */
+ if (rd[i].state.size >= 10*1024*1024) {
+ rd[i].state.size = 0;
+ i = (i + 1) % session->nr_readers;
+ }
+ }
+
+ ret = ordered_events__flush(&session->ordered_events, OE_FLUSH__FINAL);
+ if (ret)
+ goto out_err;
+
+ ret = perf_session__flush_thread_stacks(session);
+out_err:
+ ui_progress__finish();
+
+ if (!tool->no_warn)
+ perf_session__warn_about_errors(session);
+
+ /*
+ * We may switching perf.data output, make ordered_events
+ * reusable.
+ */
+ ordered_events__reinit(&session->ordered_events);
+
+ session->one_mmap = false;
+
+ return ret;
+}
+
int perf_session__process_events(struct perf_session *session)
{
if (perf_session__register_idle_thread(session) < 0)
@@ -2309,6 +2578,9 @@ int perf_session__process_events(struct perf_session *session)
if (perf_data__is_pipe(session->data))
return __perf_session__process_pipe_events(session);

+ if (perf_data__is_dir(session->data))
+ return __perf_session__process_dir_events(session);
+
return __perf_session__process_events(session);
}

diff --git a/tools/perf/util/session.h b/tools/perf/util/session.h
index 378ffc3e2809..cbc54615d155 100644
--- a/tools/perf/util/session.h
+++ b/tools/perf/util/session.h
@@ -19,6 +19,7 @@ struct thread;

struct auxtrace;
struct itrace_synth_opts;
+struct reader;

struct perf_session {
struct perf_header header;
@@ -41,6 +42,9 @@ struct perf_session {
struct zstd_data zstd_data;
struct decomp *decomp;
struct decomp *decomp_last;
+ struct reader *readers;
+ int nr_readers;
+ struct reader *active_reader;
};

struct decomp {
--
2.19.0
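The reader scheduling in __perf_session__process_dir_events() above can be modeled on its own. The sketch below is a simplified simulation, not perf code. struct sim_reader, sim_reader_init() and simulate_round_robin() are hypothetical names. Each "event" is a fixed-size chunk, and remapping is folded into a single remaining-bytes counter. It shows that a reader keeps its turn until it has consumed 10 MiB, and that readers already at EOF are skipped.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Per-turn budget taken from the patch comment: 10 MiB per reader. */
#define TURN_QUOTA (10ULL * 1024 * 1024)

/* Hypothetical stand-in for struct reader / struct reader_state. */
struct sim_reader {
	uint64_t remaining;	/* bytes not yet read from this data file */
	uint64_t turn_bytes;	/* like state.size: bytes read in the current turn */
	int eof;
};

static void sim_reader_init(struct sim_reader *rd, uint64_t bytes)
{
	rd->remaining = bytes;
	rd->turn_bytes = 0;
	rd->eof = (bytes == 0);
}

/*
 * Read fixed-size "events" round-robin. A reader keeps its turn until it
 * has read TURN_QUOTA bytes; readers at EOF are skipped. The index of the
 * reader starting each turn is stored in order[] (at most max entries).
 * Returns the number of turns recorded.
 */
static size_t simulate_round_robin(struct sim_reader *rd, size_t nr,
				   uint64_t event_size, int *order, size_t max)
{
	size_t i = 0, live = 0, turns = 0;

	assert(event_size > 0);
	for (size_t k = 0; k < nr; k++)
		live += !rd[k].eof;

	while (live) {
		uint64_t sz;

		if (rd[i].eof) {		/* skip exhausted files */
			i = (i + 1) % nr;
			continue;
		}
		if (rd[i].turn_bytes == 0 && turns < max)
			order[turns++] = (int)i;	/* a new turn starts */

		sz = rd[i].remaining < event_size ? rd[i].remaining : event_size;
		rd[i].remaining -= sz;
		rd[i].turn_bytes += sz;
		if (rd[i].remaining == 0) {
			rd[i].eof = 1;
			live--;
		}
		if (rd[i].turn_bytes >= TURN_QUOTA) {	/* quota used: next reader */
			rd[i].turn_bytes = 0;
			i = (i + 1) % nr;
		}
	}
	return turns;
}
```

With a 25 MiB file and a 5 MiB file, the turns go to reader 0, reader 1, then reader 0 twice. Once the shorter file hits EOF, the remaining reader is simply given consecutive 10 MiB slices.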


2021-04-06 17:49:26

by Bayduraev, Alexey V

Subject: [PATCH v4 12/12] perf session: use reader functions to load perf data file


Use the reader functions to load the data file in the same way as
data directory files are loaded.

Signed-off-by: Alexey Bayduraev <[email protected]>
---
tools/perf/util/session.c | 215 ++++++++++++--------------------------
1 file changed, 66 insertions(+), 149 deletions(-)

diff --git a/tools/perf/util/session.c b/tools/perf/util/session.c
index 26fffadfd6ef..e60b0212f64e 100644
--- a/tools/perf/util/session.c
+++ b/tools/perf/util/session.c
@@ -2187,109 +2187,6 @@ static int __perf_session__process_decomp_events(struct perf_session *session)
return 0;
}

-static int
-reader__process_events(struct reader *rd, struct perf_session *session,
- struct ui_progress *prog)
-{
- u64 data_size = rd->data_size;
- u64 head, page_offset, file_offset, file_pos, size;
- int err = 0, mmap_prot, mmap_flags, map_idx = 0;
- size_t mmap_size;
- char *buf, *mmaps[NUM_MMAPS];
- union perf_event *event;
- s64 skip;
-
- page_offset = page_size * (rd->data_offset / page_size);
- file_offset = page_offset;
- head = rd->data_offset - page_offset;
-
- ui_progress__init_size(prog, data_size, "Processing events...");
-
- data_size += rd->data_offset;
-
- mmap_size = MMAP_SIZE;
- if (mmap_size > data_size) {
- mmap_size = data_size;
- session->one_mmap = true;
- }
-
- memset(mmaps, 0, sizeof(mmaps));
-
- mmap_prot = PROT_READ;
- mmap_flags = MAP_SHARED;
-
- if (session->header.needs_swap) {
- mmap_prot |= PROT_WRITE;
- mmap_flags = MAP_PRIVATE;
- }
-remap:
- buf = mmap(NULL, mmap_size, mmap_prot, mmap_flags, rd->fd,
- file_offset);
- if (buf == MAP_FAILED) {
- pr_err("failed to mmap file\n");
- err = -errno;
- goto out;
- }
- mmaps[map_idx] = buf;
- map_idx = (map_idx + 1) & (ARRAY_SIZE(mmaps) - 1);
- file_pos = file_offset + head;
- if (session->one_mmap) {
- session->one_mmap_addr = buf;
- session->one_mmap_offset = file_offset;
- }
-
-more:
- event = fetch_mmaped_event(head, mmap_size, buf, session->header.needs_swap);
- if (IS_ERR(event))
- return PTR_ERR(event);
-
- if (!event) {
- if (mmaps[map_idx]) {
- munmap(mmaps[map_idx], mmap_size);
- mmaps[map_idx] = NULL;
- }
-
- page_offset = page_size * (head / page_size);
- file_offset += page_offset;
- head -= page_offset;
- goto remap;
- }
-
- size = event->header.size;
-
- skip = -EINVAL;
-
- if (size < sizeof(struct perf_event_header) ||
- (skip = rd->process(session, event, file_pos, rd->path)) < 0) {
- pr_err("%#" PRIx64 " [%s] [%#x]: failed to process type: %d [%s]\n",
- file_offset + head, rd->path, event->header.size,
- event->header.type, strerror(-skip));
- err = skip;
- goto out;
- }
-
- if (skip)
- size += skip;
-
- head += size;
- file_pos += size;
-
- err = __perf_session__process_decomp_events(session);
- if (err)
- goto out;
-
- ui_progress__update(prog, size);
-
- if (session_done())
- goto out;
-
- if (file_pos < data_size)
- goto more;
-
-out:
- return err;
-}
-
static s64 process_simple(struct perf_session *session,
union perf_event *event,
u64 file_offset,
@@ -2298,52 +2195,6 @@ static s64 process_simple(struct perf_session *session,
return perf_session__process_event(session, event, file_offset, file_path);
}

-static int __perf_session__process_events(struct perf_session *session)
-{
- struct reader rd = {
- .fd = perf_data__fd(session->data),
- .data_size = session->header.data_size,
- .data_offset = session->header.data_offset,
- .process = process_simple,
- .path = session->data->file.path,
- };
- struct ordered_events *oe = &session->ordered_events;
- struct perf_tool *tool = session->tool;
- struct ui_progress prog;
- int err;
-
- perf_tool__fill_defaults(tool);
-
- if (rd.data_size == 0)
- return -1;
-
- ui_progress__init_size(&prog, rd.data_size, "Processing events...");
-
- err = reader__process_events(&rd, session, &prog);
- if (err)
- goto out_err;
- /* do the final flush for ordered samples */
- err = ordered_events__flush(oe, OE_FLUSH__FINAL);
- if (err)
- goto out_err;
- err = auxtrace__flush_events(session, tool);
- if (err)
- goto out_err;
- err = perf_session__flush_thread_stacks(session);
-out_err:
- ui_progress__finish();
- if (!tool->no_warn)
- perf_session__warn_about_errors(session);
- /*
- * We may switching perf.data output, make ordered_events
- * reusable.
- */
- ordered_events__reinit(&session->ordered_events);
- auxtrace__free_events(session);
- session->one_mmap = false;
- return err;
-}
-
static int
reader__init(struct reader *rd, bool *one_mmap)
{
@@ -2460,6 +2311,72 @@ reader__read_event(struct reader *rd, struct perf_session *session,
session->active_reader = NULL;;
return ret;
}
+
+static int __perf_session__process_events(struct perf_session *session)
+{
+ struct reader *rd;
+ struct ordered_events *oe = &session->ordered_events;
+ struct perf_tool *tool = session->tool;
+ struct ui_progress prog;
+ int err;
+
+ perf_tool__fill_defaults(tool);
+
+ rd = session->readers = zalloc(sizeof(struct reader));
+ if (!rd)
+ return -ENOMEM;
+
+ session->nr_readers = 1;
+
+ *rd = (struct reader) {
+ .fd = perf_data__fd(session->data),
+ .data_size = session->header.data_size,
+ .data_offset = session->header.data_offset,
+ .process = process_simple,
+ .path = session->data->file.path,
+ };
+
+ ui_progress__init_size(&prog, rd->data_size, "Processing events...");
+
+ reader__init(rd, &session->one_mmap);
+ if (reader__mmap(rd, session) != READER_OK)
+ goto out_err;
+
+ while (true) {
+ if (session_done())
+ break;
+
+ err = reader__read_event(rd, session, &prog);
+ if (err < 0)
+ break;
+ if (err == READER_EOF) {
+ err = reader__mmap(rd, session);
+ if (err <= 0)
+ break;
+ }
+ }
+
+ /* do the final flush for ordered samples */
+ err = ordered_events__flush(oe, OE_FLUSH__FINAL);
+ if (err)
+ goto out_err;
+ err = auxtrace__flush_events(session, tool);
+ if (err)
+ goto out_err;
+ err = perf_session__flush_thread_stacks(session);
+out_err:
+ ui_progress__finish();
+ if (!tool->no_warn)
+ perf_session__warn_about_errors(session);
+ /*
+ * We may switching perf.data output, make ordered_events
+ * reusable.
+ */
+ ordered_events__reinit(&session->ordered_events);
+ auxtrace__free_events(session);
+ session->one_mmap = false;
+ return err;
+}
/*
* This function reads, merge and process directory data.
* It assumens the version 1 of directory data, where each
--
2.19.0
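Both patches rely on the window arithmetic in reader__mmap(). mmap() requires a page-aligned file offset, so head is rounded down to a page boundary. That amount moves into file_offset, and file_offset + head (the absolute file_pos) stays unchanged. The following standalone model uses hypothetical names (struct window, window_advance(), window_file_pos()) and omits the actual mmap() call.

```c
#include <assert.h>
#include <stdint.h>

struct window {
	uint64_t file_offset;	/* page-aligned offset that would be passed to mmap() */
	uint64_t head;		/* offset of the next event inside the mapping */
};

/*
 * Move whole pages from head into file_offset, as reader__mmap() does
 * before remapping. file_offset stays a multiple of page_size.
 */
static void window_advance(struct window *w, uint64_t page_size)
{
	uint64_t page_offset;

	assert(page_size > 0);
	page_offset = page_size * (w->head / page_size);
	w->file_offset += page_offset;
	w->head -= page_offset;
}

/* Absolute position of the next event in the file (st->file_pos). */
static uint64_t window_file_pos(const struct window *w)
{
	return w->file_offset + w->head;
}
```

For example, with 4096-byte pages, a head of 10000 becomes file_offset 8192 and head 1808. The absolute position stays 10000.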


2021-04-08 14:22:47

by Andi Kleen

Subject: Re: [PATCH v4 05/12] perf record: start threads in the beginning of trace streaming

> + err = write(thread->pipes.ack[1], &msg, sizeof(msg));
> + if (err == -1)
> + pr_err("threads[%d]: failed to notify on start. Error %m", thread->tid);

It might be safer not to use %m. I'm not sure that all the non-glibc
libcs people use support it.
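%m is a glibc extension to printf()-style formats. ISO C and POSIX printf() do not define it. A portable variant saves errno and formats it explicitly with strerror(). The sketch below is hypothetical: notify_start() and the one-byte message stand in for the patch's thread/pipe code, and it writes the message into a buffer instead of calling pr_err(). In the multi-threaded recorder, a thread-safe variant such as strerror_r() may be preferable to strerror().

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/*
 * Notify the main thread over a pipe. On failure, format the error with
 * strerror() rather than the glibc-specific %m conversion.
 * Returns 0 on success or -errno on failure.
 */
static int notify_start(int ack_fd, int tid, char *errbuf, size_t len)
{
	char msg = 1;	/* hypothetical one-byte "thread started" message */

	if (write(ack_fd, &msg, sizeof(msg)) == -1) {
		int err = errno;	/* save errno before any other libc call */

		snprintf(errbuf, len, "threads[%d]: failed to notify on start: %s",
			 tid, strerror(err));
		return -err;
	}
	return 0;
}
```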

2021-04-08 14:27:39

by Andi Kleen

Subject: Re: [PATCH v4 12/12] perf session: use reader functions to load perf data file


Except where I commented, for the series

Acked-by: Andi Kleen <[email protected]>

-Andi

2021-04-08 14:29:50

by Andi Kleen

Subject: Re: [PATCH v4 09/12] perf record: document parallel data streaming mode

> +--threads=<spec>::
> +Write collected trace data into several data files using parallel threads.
> +<spec> value can be user defined list of masks. Masks separated by colon
> +define cpus to be monitored by a thread and affinity mask of that thread
> +is separated by slash. For example user specification like the following:
> +<cpus mask 1>/<affinity mask 1>:<cpu mask 2>/<affinity mask 2> specifies

You need to be clearer about the exact syntax of a mask. Ideally,
add some full examples too.

> +parallel threads layout that consists of two threads with corresponding
> +assigned cpus to be monitored. <spec> value can also be a string meaning
> +predefined parallel threads layout:
> + cpu - create new data streaming thread for every monitored cpu
> + core - create new thread to monitor cpus grouped by a core
> + socket - create new thread to monitor cpus grouped by a socket
> + numa - create new threed to monitor cpus grouped by a numa domain
> +Predefined layouts can be used on systems with large number of cpus in
> +order not to spawn multiple per-cpu streaming threads but still avoid LOST
> +events in data directory files. Option specified with no or empty value
> +defaults to cpu layout. Masks defined or provided by the option value are
> +filtered through the mask provided by -C option.
>
>
>
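The layout described above has the form "<maps>/<affinity>:<maps>/<affinity>...". Splitting such a value might look like the sketch below. For illustration only, it assumes each mask is a CPU list such as 0-3; the real mask grammar is whatever the patch's parsing code accepts. split_threads_spec(), struct spec_pair and MAX_SPEC are hypothetical names. Unlike the quoted patch, which silently stops at an entry without a '/', this sketch rejects a malformed entry.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define MAX_SPEC 16	/* hypothetical limit for this sketch */

struct spec_pair {
	char maps[64];		/* CPUs whose buffers the thread reads */
	char affinity[64];	/* CPUs the thread may run on */
};

/*
 * Split "<maps>/<affinity>:<maps>/<affinity>..." into pairs.
 * Returns the number of pairs, or -1 on malformed input.
 */
static int split_threads_spec(const char *spec, struct spec_pair *out, int max)
{
	char *copy, *entry, *save;
	int n = 0;

	copy = strdup(spec);	/* strtok_r() modifies its input */
	if (!copy)
		return -1;

	for (entry = strtok_r(copy, ":", &save); entry;
	     entry = strtok_r(NULL, ":", &save)) {
		char *slash = strchr(entry, '/');

		/* Exactly one '/', with non-empty masks on both sides. */
		if (n == max || !slash || slash == entry || slash[1] == '\0' ||
		    strchr(slash + 1, '/'))
			goto err;
		*slash = '\0';
		if (strlen(entry) >= sizeof(out[n].maps) ||
		    strlen(slash + 1) >= sizeof(out[n].affinity))
			goto err;
		strcpy(out[n].maps, entry);
		strcpy(out[n].affinity, slash + 1);
		n++;
	}
	free(copy);
	return n;
err:
	free(copy);
	return -1;
}
```

For example, "0-3/0-3:4-7/4-7" yields two threads. The first reads CPUs 0-3 and runs on CPUs 0-3, and the second does the same for CPUs 4-7.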

2021-04-08 21:54:06

by Jiri Olsa

Subject: Re: [PATCH v4 00/12] Introduce threaded trace streaming for basic perf record operation

On Tue, Apr 06, 2021 at 11:37:26AM +0300, Bayduraev, Alexey V wrote:
>
> Changes in v4:
> - renamed 'comm' structure to 'pipes'
> - moved thread fd/maps messages to verbose=2
> - fixed leaks during allocation of thread_data structures
> - fixed leaks during allocation of thread masks
> - fixed possible fails when releasing thread masks
>
> v3: https://lore.kernel.org/lkml/[email protected]/

hi,
I recall there was some issue with threading and intel_pt,
which we either need to fix, or we need to disable threads
for it

[root@krava perf]# ./perf record -e intel_pt// --threads=cpu
^C[ perf record: Woken up 121 times to write data ]
Warning:
AUX data lost 95 times out of 206!

[ perf record: Captured and wrote 211.364 MB perf.data ]

[root@krava perf]# ./perf script
Segmentation fault (core dumped)

the fix should already be in the perf/record_threads branch,

jirka

2021-04-08 21:54:18

by Jiri Olsa

Subject: Re: [PATCH v4 08/12] perf record: introduce --threads=<spec> command line option

On Tue, Apr 06, 2021 at 11:49:06AM +0300, Bayduraev, Alexey V wrote:

SNIP

> Suggested-by: Jiri Olsa <[email protected]>
> Suggested-by: Namhyung Kim <[email protected]>
> Signed-off-by: Alexey Bayduraev <[email protected]>
> ---
> tools/include/linux/bitmap.h | 11 ++
> tools/lib/bitmap.c | 14 ++
> tools/perf/builtin-record.c | 317 ++++++++++++++++++++++++++++++++++-
> tools/perf/util/record.h | 1 +
> 4 files changed, 341 insertions(+), 2 deletions(-)
>
> diff --git a/tools/include/linux/bitmap.h b/tools/include/linux/bitmap.h
> index 477a1cae513f..2eb1d1084543 100644
> --- a/tools/include/linux/bitmap.h
> +++ b/tools/include/linux/bitmap.h
> @@ -18,6 +18,8 @@ int __bitmap_and(unsigned long *dst, const unsigned long *bitmap1,
> int __bitmap_equal(const unsigned long *bitmap1,
> const unsigned long *bitmap2, unsigned int bits);
> void bitmap_clear(unsigned long *map, unsigned int start, int len);
> +int __bitmap_intersects(const unsigned long *bitmap1,
> + const unsigned long *bitmap2, unsigned int bits);
>
> #define BITMAP_FIRST_WORD_MASK(start) (~0UL << ((start) & (BITS_PER_LONG - 1)))
>
> @@ -178,4 +180,13 @@ static inline int bitmap_equal(const unsigned long *src1,
> return __bitmap_equal(src1, src2, nbits);
> }
>
> +static inline int bitmap_intersects(const unsigned long *src1,
> + const unsigned long *src2, unsigned int nbits)
> +{
> + if (small_const_nbits(nbits))
> + return ((*src1 & *src2) & BITMAP_LAST_WORD_MASK(nbits)) != 0;
> + else
> + return __bitmap_intersects(src1, src2, nbits);
> +}
> +
> #endif /* _PERF_BITOPS_H */
> diff --git a/tools/lib/bitmap.c b/tools/lib/bitmap.c
> index 5043747ef6c5..3cc3a5b43bb5 100644
> --- a/tools/lib/bitmap.c
> +++ b/tools/lib/bitmap.c
> @@ -86,3 +86,17 @@ int __bitmap_equal(const unsigned long *bitmap1,
>
> return 1;
> }
> +
> +int __bitmap_intersects(const unsigned long *bitmap1,
> + const unsigned long *bitmap2, unsigned int bits)
> +{
> + unsigned int k, lim = bits/BITS_PER_LONG;
> + for (k = 0; k < lim; ++k)
> + if (bitmap1[k] & bitmap2[k])
> + return 1;
> +
> + if (bits % BITS_PER_LONG)
> + if ((bitmap1[k] & bitmap2[k]) & BITMAP_LAST_WORD_MASK(bits))
> + return 1;
> + return 0;
> +}

Please move the __bitmap_intersects function to a separate patch.

jirka
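__bitmap_intersects() is self-contained, so it can be exercised on its own. The sketch below mirrors the quoted tools/lib/bitmap.c code. BITS_PER_LONG and BITMAP_LAST_WORD_MASK are defined locally, on the assumption that they match the tools headers. bitmap_intersects_sketch is a hypothetical name. Full words are tested directly. Bits past nbits in the last word are masked off, so stray high bits cannot produce a false positive.

```c
#include <assert.h>
#include <limits.h>

#define BITS_PER_LONG (sizeof(unsigned long) * CHAR_BIT)
/* Mask selecting the valid bits of the last, partially used word. */
#define BITMAP_LAST_WORD_MASK(nbits) (~0UL >> (-(nbits) & (BITS_PER_LONG - 1)))

static int bitmap_intersects_sketch(const unsigned long *a,
				    const unsigned long *b, unsigned int bits)
{
	unsigned int k, lim = bits / BITS_PER_LONG;

	/* Full words: any common bit means the bitmaps intersect. */
	for (k = 0; k < lim; k++)
		if (a[k] & b[k])
			return 1;

	/* Trailing partial word: ignore bits at or beyond 'bits'. */
	if (bits % BITS_PER_LONG &&
	    (a[k] & b[k] & BITMAP_LAST_WORD_MASK(bits)))
		return 1;
	return 0;
}
```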

2021-04-08 21:54:56

by Jiri Olsa

Subject: Re: [PATCH v4 11/12] perf session: load data directory files for analysis

On Tue, Apr 06, 2021 at 11:52:32AM +0300, Bayduraev, Alexey V wrote:
>
> Introduce decompressor into trace reader object so that decompression
> could be executed on per data file basis separately for every data
> file located in data directory.
>
> Load data directory files and provide basic raw dump and aggregated
> analysis support of data directories in report mode, still with no
> memory consumption optimizations.
>
> Design and implementation are based on the prototype [1], [2].
>
> [1] git clone https://git.kernel.org/pub/scm/linux/kernel/git/jolsa/perf.git -b perf/record_threads
> [2] https://lore.kernel.org/lkml/[email protected]/
>
> Suggested-by: Jiri Olsa <[email protected]>
> Signed-off-by: Alexey Bayduraev <[email protected]>

hi,
this and the following patch seem to squash several changes from
perf/record_threads:

21289ea28f2f perf session: Add reader return codes
44d19279d0e9 perf session: Add eof flag to reader state
490cece1ff49 perf session: Add reader__read_event function
ff9d91751367 perf session: Move head/file_offset computation into reader__mmap function
75a6af52ecae perf session: Move unmap into reader__mmap function
e1fc76225518 perf session: Add reader__mmap function
f7f0473104cd perf session: Add reader__init function
262c08cb7332 perf session: Move head in reader_state
fd91847190f0 perf session: Move data_size in reader_state
dcb49769bdde perf session: Move file_offset in reader_state
69c7f9557e83 perf session: Move file_pos in reader_state
c5c5c03f09f9 perf session: Add mmap_cur to reader_state
faf7b7176f2a perf session: Move mmap_idx in reader_state
42504d97a18f perf session: Move mmap_size in reader_state
7bf7ccdabc84 perf session: Move mmaps in reader_state
fc1eb45de565 perf session: Add path to reader object

making this a really big change to important code, which can't
be easily reviewed... That's why I split it up in the first place ;-)

I think we need to bring in this code first as incremental changes.

jirka

2021-04-09 10:53:27

by Bayduraev, Alexey V

Subject: Re: [PATCH v4 00/12] Introduce threaded trace streaming for basic perf record operation



On 09.04.2021 0:52, Jiri Olsa wrote:
> On Tue, Apr 06, 2021 at 11:37:26AM +0300, Bayduraev, Alexey V wrote:
>>
>> Changes in v4:
>> - renamed 'comm' structure to 'pipes'
>> - moved thread fd/maps messages to verbose=2
>> - fixed leaks during allocation of thread_data structures
>> - fixed leaks during allocation of thread masks
>> - fixed possible fails when releasing thread masks
>>
>> v3: https://lore.kernel.org/lkml/[email protected]/
>
> hi,
> I recall there was some issue with threading and intel_pt,
> which we either need to fix, or we need to disable threads
> for it
>
> [root@krava perf]# ./perf record -e intel_pt// --threads=cpu
> ^C[ perf record: Woken up 121 times to write data ]
> Warning:
> AUX data lost 95 times out of 206!
>
> [ perf record: Captured and wrote 211.364 MB perf.data ]
>
> [root@krava perf]# ./perf script
> Segmentation fault (core dumped)
>
> the fix should already be in the perf/record_threads branch,

Thanks,

As far as I can see, the fix from perf/record_threads is only partially
included here: the changes in util/auxtrace.c and the setting of
one_mmap_addr/offset are missing. I will fix this.

I will also try to refactor patches 11 and 12.

Regards,
Alexey

>
> jirka
>

2021-04-13 12:27:48

by Namhyung Kim

Subject: Re: [PATCH v4 08/12] perf record: introduce --threads=<spec> command line option

Hello,

On Tue, Apr 6, 2021 at 5:49 PM Bayduraev, Alexey V
<[email protected]> wrote:
>
>
> Provide --threads option in perf record command line interface.
> The option can have a value in the form of masks that specify
> cpus to be monitored with data streaming threads and its layout
> in system topology. The masks can be filtered using cpu mask
> provided via -C option.
>
> The specification value can be user defined list of masks. Masks
> separated by colon define cpus to be monitored by one thread and
> affinity mask of that thread is separated by slash. For example:
> <cpus mask 1>/<affinity mask 1>:<cpu mask 2>/<affinity mask 2>
> specifies parallel threads layout that consists of two threads
> with corresponding assigned cpus to be monitored.
>
> The specification value can be a string e.g. "cpu", "core" or
> "socket" meaning creation of data streaming thread for every
> cpu or core or socket to monitor distinct cpus or cpus grouped
> by core or socket.
>
> The option provided with no or empty value defaults to per-cpu
> parallel threads layout creating data streaming thread for every
> cpu being monitored.
>
> Feature design and implementation are based on prototypes [1], [2].
>
> [1] git clone https://git.kernel.org/pub/scm/linux/kernel/git/jolsa/perf.git -b perf/record_threads
> [2] https://lore.kernel.org/lkml/[email protected]/
>
> Suggested-by: Jiri Olsa <[email protected]>
> Suggested-by: Namhyung Kim <[email protected]>
> Signed-off-by: Alexey Bayduraev <[email protected]>
> ---
[SNIP]
> +static int record__init_thread_masks_spec(struct record *rec, struct perf_cpu_map *cpus,
> + char **maps_spec, char **affinity_spec, u32 nr_spec)
> +{
> + u32 s;
> + int ret, nr_threads = 0;
> + struct mmap_cpu_mask cpus_mask;
> + struct thread_mask thread_mask, full_mask;
> +
> + ret = record__mmap_cpu_mask_alloc(&cpus_mask, cpu__max_cpu());
> + if (ret)
> + return ret;
> + record__mmap_cpu_mask_init(&cpus_mask, cpus);
> + ret = record__thread_mask_alloc(&thread_mask, cpu__max_cpu());
> + if (ret)
> + goto out_free_cpu_mask;
> + ret = record__thread_mask_alloc(&full_mask, cpu__max_cpu());
> + if (ret)
> + goto out_free_thread_mask;
> + record__thread_mask_clear(&full_mask);
> +
> + for (s = 0; s < nr_spec; s++) {
> + record__thread_mask_clear(&thread_mask);
> +
> + record__mmap_cpu_mask_init_spec(&thread_mask.maps, maps_spec[s]);
> + record__mmap_cpu_mask_init_spec(&thread_mask.affinity, affinity_spec[s]);
> +
> + if (!bitmap_and(thread_mask.maps.bits, thread_mask.maps.bits,
> + cpus_mask.bits, thread_mask.maps.nbits) ||
> + !bitmap_and(thread_mask.affinity.bits, thread_mask.affinity.bits,
> + cpus_mask.bits, thread_mask.affinity.nbits))
> + continue;
> +
> + ret = record__thread_mask_intersects(&thread_mask, &full_mask);
> + if (ret)
> + return ret;

I think you should free the other masks here.

> + record__thread_mask_or(&full_mask, &full_mask, &thread_mask);
> +
> + rec->thread_masks = realloc(rec->thread_masks,
> + (nr_threads + 1) * sizeof(struct thread_mask));
> + if (!rec->thread_masks) {
> + pr_err("Failed to allocate thread masks\n");
> + ret = -ENOMEM;
> + goto out_free_full_mask;

But this will leak the old rec->thread_masks, as the pointer is overwritten with NULL.


> + }
> + rec->thread_masks[nr_threads] = thread_mask;
> + pr_debug("thread_masks[%d]: addr=", nr_threads);
> + mmap_cpu_mask__scnprintf(&rec->thread_masks[nr_threads].maps, "maps");
> + pr_debug("thread_masks[%d]: addr=", nr_threads);
> + mmap_cpu_mask__scnprintf(&rec->thread_masks[nr_threads].affinity, "affinity");
> + nr_threads++;
> + ret = record__thread_mask_alloc(&thread_mask, cpu__max_cpu());
> + if (ret)
> + return ret;

Ditto, use goto.

> + }
> +
> + rec->nr_threads = nr_threads;
> + pr_debug("threads: nr_threads=%d\n", rec->nr_threads);
> +
> +out_free_full_mask:
> + record__thread_mask_free(&full_mask);
> +out_free_thread_mask:
> + record__thread_mask_free(&thread_mask);
> +out_free_cpu_mask:
> + record__mmap_cpu_mask_free(&cpus_mask);
> +
> + return 0;
> +}
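The rec->thread_masks leak is the classic realloc() pitfall. Assigning the result straight back to the only pointer loses the original block when realloc() returns NULL. Below is a minimal sketch of the usual pattern. struct mask_pair and masks_append() are hypothetical stand-ins for struct thread_mask and the inline code above.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

struct mask_pair {		/* hypothetical stand-in for struct thread_mask */
	unsigned long maps;
	unsigned long affinity;
};

/* Append one element, keeping the old array valid if realloc() fails. */
static int masks_append(struct mask_pair **arr, int *nr, struct mask_pair m)
{
	struct mask_pair *tmp;

	tmp = realloc(*arr, (*nr + 1) * sizeof(**arr));
	if (!tmp)
		return -ENOMEM;	/* *arr is untouched and still owned by the caller */
	tmp[*nr] = m;
	*arr = tmp;
	(*nr)++;
	return 0;
}
```

On failure, the caller's error path can still free *arr and any per-element allocations before returning the error.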

[SNIP]
> +
> +static int record__init_thread_user_masks(struct record *rec, struct perf_cpu_map *cpus)
> +{
> + int t, ret;
> + u32 s, nr_spec = 0;
> + char **maps_spec = NULL, **affinity_spec = NULL;
> + char *spec, *spec_ptr, *user_spec, *mask, *mask_ptr;
> +
> + for (t = 0, user_spec = (char *)rec->opts.threads_user_spec; ; t++, user_spec = NULL) {
> + spec = strtok_r(user_spec, ":", &spec_ptr);
> + if (spec == NULL)
> + break;
> + pr_debug(" spec[%d]: %s\n", t, spec);
> + mask = strtok_r(spec, "/", &mask_ptr);
> + if (mask == NULL)
> + break;
> + pr_debug(" maps mask: %s\n", mask);
> + maps_spec = realloc(maps_spec, (nr_spec + 1) * sizeof(char *));
> + if (!maps_spec) {
> + pr_err("Failed to realloc maps_spec\n");
> + ret = -ENOMEM;
> + goto out_free_all_specs;

It'd crash as maps_spec is NULL now.

> + }
> + maps_spec[nr_spec] = strdup(mask);

You'd better check the return value.

> + mask = strtok_r(NULL, "/", &mask_ptr);
> + if (mask == NULL)
> + break;
> + pr_debug(" affinity mask: %s\n", mask);
> + affinity_spec = realloc(affinity_spec, (nr_spec + 1) * sizeof(char *));
> + if (!maps_spec) {

s/maps/affinity/ and it has the same problem.

> + pr_err("Failed to realloc affinity_spec\n");
> + ret = -ENOMEM;
> + goto out_free_all_specs;
> + }
> + affinity_spec[nr_spec] = strdup(mask);

Check the return value.

Thanks,
Namhyung

> + nr_spec++;
> + }
> +
> + ret = record__init_thread_masks_spec(rec, cpus, maps_spec, affinity_spec, nr_spec);
> +
> +out_free_all_specs:
> + for (s = 0; s < nr_spec; s++) {
> + free(maps_spec[s]);
> + free(affinity_spec[s]);
> + }
> + free(affinity_spec);
> + free(maps_spec);
> +
> + return ret;
> +}
> +
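Namhyung's points on this function (realloc() overwriting maps_spec/affinity_spec, and unchecked strdup()) can be handled by giving each list its own count. Then the cleanup loop never touches entries that were never allocated. The sketch below uses a hypothetical struct str_vec with str_vec_push() and str_vec_free(); none of these names exist in perf.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical growable array of owned strings. */
struct str_vec {
	char **items;
	unsigned int nr;
};

/* Copy s and append it; on failure the vector is unchanged. */
static int str_vec_push(struct str_vec *v, const char *s)
{
	char *copy = strdup(s);
	char **tmp;

	if (!copy)
		return -ENOMEM;
	tmp = realloc(v->items, (v->nr + 1) * sizeof(*tmp));
	if (!tmp) {
		free(copy);
		return -ENOMEM;	/* v->items is still valid and freeable */
	}
	tmp[v->nr++] = copy;
	v->items = tmp;
	return 0;
}

/* Safe on an empty or partially filled vector. */
static void str_vec_free(struct str_vec *v)
{
	for (unsigned int i = 0; i < v->nr; i++)
		free(v->items[i]);
	free(v->items);
	v->items = NULL;
	v->nr = 0;
}
```

Because maps and affinity lists are freed by their own counts, an error between the two pushes cannot make the cleanup path index a NULL or short array.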