2024-01-29 14:28:22

by Vincent Donnefort

Subject: [PATCH v13 0/6] Introducing trace buffer mapping by user-space

The tracing ring-buffers can be stored on disk or sent to the network
without any copy, via splice. However, the latter does not allow real-time
processing of the traces. A solution is to give userspace direct access
to the ring-buffer pages via a mapping. An application can then become a
consumer of the ring-buffer, in a similar fashion to what trace_pipe
offers.

Support for this new feature can already be found in libtracefs from
version 1.8, when built with EXTRA_CFLAGS=-DFORCE_MMAP_ENABLE.
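For a quick feel for the interface, here is a condensed version of the example
that the Documentation patch of this series adds (error handling is kept to a
minimum here; see that patch for the complete program):

  #include <fcntl.h>
  #include <stdio.h>
  #include <unistd.h>

  #include <linux/trace_mmap.h>

  #include <sys/ioctl.h>
  #include <sys/mman.h>

  int main(void)
  {
          struct trace_buffer_meta *meta;
          void *data;
          int fd;

          fd = open("/sys/kernel/tracing/per_cpu/cpu0/trace_pipe_raw",
                    O_RDONLY | O_NONBLOCK);
          if (fd < 0)
                  return 1;

          /* pgoff 0 is the meta-page... */
          meta = mmap(NULL, getpagesize(), PROT_READ, MAP_SHARED, fd, 0);
          if (meta == MAP_FAILED)
                  return 1;

          /* ...followed by all the sub-buffers, ordered by their IDs. */
          data = mmap(NULL, meta->subbuf_size * meta->nr_subbufs, PROT_READ,
                      MAP_SHARED, fd, meta->meta_page_size);
          if (data == MAP_FAILED)
                  return 1;

          /* Swap in a reader sub-buffer and locate it in the mapping. */
          if (ioctl(fd, TRACE_MMAP_IOCTL_GET_READER) < 0)
                  return 1;

          printf("reader #%u at %p\n", meta->reader.id,
                 data + meta->subbuf_size * meta->reader.id);

          close(fd);

          return 0;
  }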

Vincent

v12 -> v13:
* Swap subbufs_{touched,lost} for Reserved fields.
* Add a flag field in the meta-page.
* Fix CONFIG_TRACER_MAX_TRACE.
* Rebase on top of trace/urgent. (29142dc92c37d3259a33aef15b03e6ee25b0d188)
* Add a comment for try_unregister_trigger()

v11 -> v12:
* Fix code sample mmap bug.
* Add logging in sample code.
* Reset tracer in selftest.
* Add a refcount for the snapshot users.
* Prevent mapping when there are snapshot users and vice versa.
* Refine the meta-page.
* Fix types in the meta-page.
* Collect Reviewed-by.

v10 -> v11:
* Add Documentation and code sample.
* Add a selftest.
* Move all the updates to the meta-page into a single
rb_update_meta_page().
* rb_update_meta_page() is now called from
ring_buffer_map_get_reader() to fix NOBLOCK callers.
* kerneldoc for struct trace_meta_page.
* Add a patch to zero all the ring-buffer allocations.

v9 -> v10:
* Refactor rb_update_meta_page()
* In-loop declaration for foreach_subbuf_page()
* Check for cpu_buffer->mapped overflow

v8 -> v9:
* Fix the unlock path in ring_buffer_map()
* Fix cpu_buffer cast with rb_work_rq->is_cpu_buffer
* Rebase on linux-trace/for-next (3cb3091138ca0921c4569bcf7ffa062519639b6a)

v7 -> v8:
* Drop the subbufs renaming into bpages
* Use subbuf as a name when relevant

v6 -> v7:
* Rebase onto lore.kernel.org/lkml/[email protected]/
* Support for subbufs
* Rename subbufs into bpages

v5 -> v6:
* Rebase on next-20230802.
* (unsigned long) -> (void *) cast for virt_to_page().
* Add a wait for the GET_READER_PAGE ioctl.
* Move writer fields update (overrun/pages_lost/entries/pages_touched)
in the irq_work.
* Rearrange id in struct buffer_page.
* Rearrange the meta-page.
* ring_buffer_meta_page -> trace_buffer_meta_page.
* Add meta_struct_len into the meta-page.

v4 -> v5:
* Trivial rebase onto 6.5-rc3 (previously 6.4-rc3)

v3 -> v4:
* Add to the meta-page:
- pages_lost / pages_read (allows computing how full the
ring-buffer is)
- read (allows computing how many entries can be read)
- A reader_page struct.
* Rename ring_buffer_meta_header -> ring_buffer_meta
* Rename ring_buffer_get_reader_page -> ring_buffer_map_get_reader_page
* Properly consume events on ring_buffer_map_get_reader_page() with
rb_advance_reader().

v2 -> v3:
* Remove data page list (for non-consuming read)
** Implies removing order > 0 meta-page
* Add a new meta page field ->read
* Rename ring_buffer_meta_page_header into ring_buffer_meta_header

v1 -> v2:
* Hide data_pages from the userspace struct
* Fix META_PAGE_MAX_PAGES
* Support for order > 0 meta-page
* Add missing page->mapping.

Vincent Donnefort (6):
ring-buffer: Zero ring-buffer sub-buffers
ring-buffer: Introducing ring-buffer mapping functions
tracing: Add snapshot refcount
tracing: Allow user-space mapping of the ring-buffer
Documentation: tracing: Add ring-buffer mapping
ring-buffer/selftest: Add ring-buffer mapping test

Documentation/trace/index.rst | 1 +
Documentation/trace/ring-buffer-map.rst | 104 ++++++
include/linux/ring_buffer.h | 7 +
include/uapi/linux/trace_mmap.h | 45 +++
kernel/trace/ring_buffer.c | 335 +++++++++++++++++-
kernel/trace/trace.c | 210 ++++++++++-
kernel/trace/trace.h | 6 +
kernel/trace/trace_events_trigger.c | 58 ++-
tools/testing/selftests/ring-buffer/Makefile | 8 +
tools/testing/selftests/ring-buffer/config | 1 +
.../testing/selftests/ring-buffer/map_test.c | 183 ++++++++++
11 files changed, 918 insertions(+), 40 deletions(-)
create mode 100644 Documentation/trace/ring-buffer-map.rst
create mode 100644 include/uapi/linux/trace_mmap.h
create mode 100644 tools/testing/selftests/ring-buffer/Makefile
create mode 100644 tools/testing/selftests/ring-buffer/config
create mode 100644 tools/testing/selftests/ring-buffer/map_test.c


base-commit: 29142dc92c37d3259a33aef15b03e6ee25b0d188
--
2.43.0.429.g432eaa2c6b-goog



2024-01-29 14:28:54

by Vincent Donnefort

Subject: [PATCH v13 2/6] ring-buffer: Introducing ring-buffer mapping functions

In preparation for allowing user-space to map a ring-buffer, add
a set of mapping functions:

ring_buffer_{map,unmap}()
ring_buffer_map_fault()

And controls on the ring-buffer:

ring_buffer_map_get_reader() /* swap reader and head */

Mapping the ring-buffer also involves:

A unique ID for each subbuf of the ring-buffer, which is currently
only identified through its in-kernel VA.

A meta-page, where ring-buffer statistics and a description of the
current reader are stored.

The linear mapping exposes the meta-page and each subbuf of the
ring-buffer, ordered by their unique ID, which is assigned during the
first mapping.

Once mapped, no subbuf can get in or out of the ring-buffer: the buffer
size will remain unmodified and the splice-enabling functions will, in
reality, simply memcpy the data instead of swapping subbufs.
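For context, here is a rough sketch of how a file implementation could wire
these functions into its mmap() path. The actual tracefs plumbing only comes
later in this series; my_buffer, my_cpu and my_vmops are placeholders, not
symbols introduced by this patch:

  static vm_fault_t my_mmap_fault(struct vm_fault *vmf)
  {
          struct page *page;

          /* pgoff 0 is the meta-page, then come the subbufs, ordered by ID. */
          page = ring_buffer_map_fault(my_buffer, my_cpu, vmf->pgoff);
          if (!page)
                  return VM_FAULT_SIGBUS;

          get_page(page);
          vmf->page = page;

          return 0;
  }

  static int my_mmap(struct file *filp, struct vm_area_struct *vma)
  {
          /* Pins the subbuf layout and publishes the meta-page. */
          int err = ring_buffer_map(my_buffer, my_cpu);

          if (err)
                  return err;

          /* my_vmops: .fault = my_mmap_fault, .close ends in ring_buffer_unmap() */
          vma->vm_ops = &my_vmops;

          return 0;
  }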

Signed-off-by: Vincent Donnefort <[email protected]>

diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index fa802db216f9..0841ba8bab14 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -6,6 +6,8 @@
#include <linux/seq_file.h>
#include <linux/poll.h>

+#include <uapi/linux/trace_mmap.h>
+
struct trace_buffer;
struct ring_buffer_iter;

@@ -221,4 +223,9 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
#define trace_rb_cpu_prepare NULL
#endif

+int ring_buffer_map(struct trace_buffer *buffer, int cpu);
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+ unsigned long pgoff);
+int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu);
#endif /* _LINUX_RING_BUFFER_H */
diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
new file mode 100644
index 000000000000..d4bb67430719
--- /dev/null
+++ b/include/uapi/linux/trace_mmap.h
@@ -0,0 +1,43 @@
+/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
+#ifndef _TRACE_MMAP_H_
+#define _TRACE_MMAP_H_
+
+#include <linux/types.h>
+
+/**
+ * struct trace_buffer_meta - Ring-buffer Meta-page description
+ * @meta_page_size: Size of this meta-page.
+ * @meta_struct_len: Size of this structure.
+ * @subbuf_size: Size of each subbuf, including the header.
+ * @nr_subbufs: Number of subbufs in the ring-buffer.
+ * @reader.lost_events: Number of events lost at the time of the reader swap.
+ * @reader.id: subbuf ID of the current reader. From 0 to @nr_subbufs - 1
+ * @reader.read: Number of bytes read on the reader subbuf.
+ * @entries: Number of entries in the ring-buffer.
+ * @overrun: Number of entries lost in the ring-buffer.
+ * @read: Number of entries that have been read.
+ */
+struct trace_buffer_meta {
+ __u32 meta_page_size;
+ __u32 meta_struct_len;
+
+ __u32 subbuf_size;
+ __u32 nr_subbufs;
+
+ struct {
+ __u64 lost_events;
+ __u32 id;
+ __u32 read;
+ } reader;
+
+ __u64 flags;
+
+ __u64 entries;
+ __u64 overrun;
+ __u64 read;
+
+ __u64 Reserved1;
+ __u64 Reserved2;
+};
+
+#endif /* _TRACE_MMAP_H_ */
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 8179e0a8984e..081065e76d4a 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -338,6 +338,7 @@ struct buffer_page {
local_t entries; /* entries on this page */
unsigned long real_end; /* real end of data */
unsigned order; /* order of the page */
+ u32 id; /* ID for external mapping */
struct buffer_data_page *page; /* Actual data page */
};

@@ -484,6 +485,12 @@ struct ring_buffer_per_cpu {
u64 read_stamp;
/* pages removed since last reset */
unsigned long pages_removed;
+
+ int mapped;
+ struct mutex mapping_lock;
+ unsigned long *subbuf_ids; /* ID to addr */
+ struct trace_buffer_meta *meta_page;
+
/* ring buffer pages to update, > 0 to add, < 0 to remove */
long nr_pages_to_update;
struct list_head new_pages; /* new pages to add */
@@ -1548,6 +1555,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
init_waitqueue_head(&cpu_buffer->irq_work.waiters);
init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
+ mutex_init(&cpu_buffer->mapping_lock);

bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu));
@@ -5160,6 +5168,19 @@ static void rb_clear_buffer_page(struct buffer_page *page)
page->read = 0;
}

+static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ struct trace_buffer_meta *meta = cpu_buffer->meta_page;
+
+ WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read);
+ WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id);
+ WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events);
+
+ WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries));
+ WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun));
+ WRITE_ONCE(meta->read, cpu_buffer->read);
+}
+
static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
@@ -5204,6 +5225,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
cpu_buffer->lost_events = 0;
cpu_buffer->last_overrun = 0;

+ if (cpu_buffer->mapped)
+ rb_update_meta_page(cpu_buffer);
+
rb_head_page_activate(cpu_buffer);
cpu_buffer->pages_removed = 0;
}
@@ -5418,6 +5442,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
cpu_buffer_a = buffer_a->buffers[cpu];
cpu_buffer_b = buffer_b->buffers[cpu];

+ if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
+ ret = -EBUSY;
+ goto out;
+ }
+
/* At least make sure the two buffers are somewhat the same */
if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
goto out;
@@ -5682,7 +5711,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
* Otherwise, we can simply swap the page with the one passed in.
*/
if (read || (len < (commit - read)) ||
- cpu_buffer->reader_page == cpu_buffer->commit_page) {
+ cpu_buffer->reader_page == cpu_buffer->commit_page ||
+ cpu_buffer->mapped) {
struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
unsigned int rpos = read;
unsigned int pos = 0;
@@ -5901,6 +5931,11 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)

cpu_buffer = buffer->buffers[cpu];

+ if (cpu_buffer->mapped) {
+ err = -EBUSY;
+ goto error;
+ }
+
/* Update the number of pages to match the new size */
nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
@@ -6002,6 +6037,295 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);

+#define subbuf_page(off, start) \
+ virt_to_page((void *)(start + (off << PAGE_SHIFT)))
+
+#define foreach_subbuf_page(sub_order, start, page) \
+ page = subbuf_page(0, (start)); \
+ for (int __off = 0; __off < (1 << (sub_order)); \
+ __off++, page = subbuf_page(__off, (start)))
+
+static inline void subbuf_map_prepare(unsigned long subbuf_start, int order)
+{
+ struct page *page;
+
+ /*
+ * When allocating order > 0 pages, only the first struct page has a
+ * refcount > 1. Increasing the refcount here ensures none of the struct
+ * pages composing the sub-buffer is freed when the mapping is closed.
+ */
+ foreach_subbuf_page(order, subbuf_start, page)
+ page_ref_inc(page);
+}
+
+static inline void subbuf_unmap(unsigned long subbuf_start, int order)
+{
+ struct page *page;
+
+ foreach_subbuf_page(order, subbuf_start, page) {
+ page_ref_dec(page);
+ page->mapping = NULL;
+ }
+}
+
+static void rb_free_subbuf_ids(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ int sub_id;
+
+ for (sub_id = 0; sub_id < cpu_buffer->nr_pages + 1; sub_id++)
+ subbuf_unmap(cpu_buffer->subbuf_ids[sub_id],
+ cpu_buffer->buffer->subbuf_order);
+
+ kfree(cpu_buffer->subbuf_ids);
+ cpu_buffer->subbuf_ids = NULL;
+}
+
+static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ if (cpu_buffer->meta_page)
+ return 0;
+
+ cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER | __GFP_ZERO));
+ if (!cpu_buffer->meta_page)
+ return -ENOMEM;
+
+ return 0;
+}
+
+static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ unsigned long addr = (unsigned long)cpu_buffer->meta_page;
+
+ virt_to_page((void *)addr)->mapping = NULL;
+ free_page(addr);
+ cpu_buffer->meta_page = NULL;
+}
+
+static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
+ unsigned long *subbuf_ids)
+{
+ struct trace_buffer_meta *meta = cpu_buffer->meta_page;
+ unsigned int nr_subbufs = cpu_buffer->nr_pages + 1;
+ struct buffer_page *first_subbuf, *subbuf;
+ int id = 0;
+
+ subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
+ subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
+ cpu_buffer->reader_page->id = id++;
+
+ first_subbuf = subbuf = rb_set_head_page(cpu_buffer);
+ do {
+ if (id >= nr_subbufs) {
+ WARN_ON(1);
+ break;
+ }
+
+ subbuf_ids[id] = (unsigned long)subbuf->page;
+ subbuf->id = id;
+ subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
+
+ rb_inc_page(&subbuf);
+ id++;
+ } while (subbuf != first_subbuf);
+
+ /* install subbuf ID to kern VA translation */
+ cpu_buffer->subbuf_ids = subbuf_ids;
+
+ meta->meta_page_size = PAGE_SIZE;
+ meta->meta_struct_len = sizeof(*meta);
+ meta->nr_subbufs = nr_subbufs;
+ meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
+
+ rb_update_meta_page(cpu_buffer);
+}
+
+static inline struct ring_buffer_per_cpu *
+rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
+{
+ struct ring_buffer_per_cpu *cpu_buffer;
+
+ if (!cpumask_test_cpu(cpu, buffer->cpumask))
+ return ERR_PTR(-EINVAL);
+
+ cpu_buffer = buffer->buffers[cpu];
+
+ mutex_lock(&cpu_buffer->mapping_lock);
+
+ if (!cpu_buffer->mapped) {
+ mutex_unlock(&cpu_buffer->mapping_lock);
+ return ERR_PTR(-ENODEV);
+ }
+
+ return cpu_buffer;
+}
+
+static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ mutex_unlock(&cpu_buffer->mapping_lock);
+}
+
+int ring_buffer_map(struct trace_buffer *buffer, int cpu)
+{
+ struct ring_buffer_per_cpu *cpu_buffer;
+ unsigned long flags, *subbuf_ids;
+ int err = 0;
+
+ if (!cpumask_test_cpu(cpu, buffer->cpumask))
+ return -EINVAL;
+
+ cpu_buffer = buffer->buffers[cpu];
+
+ mutex_lock(&cpu_buffer->mapping_lock);
+
+ if (cpu_buffer->mapped) {
+ if (cpu_buffer->mapped == INT_MAX)
+ err = -EBUSY;
+ else
+ WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
+ mutex_unlock(&cpu_buffer->mapping_lock);
+ return err;
+ }
+
+ /* prevent another thread from changing buffer sizes */
+ mutex_lock(&buffer->mutex);
+
+ err = rb_alloc_meta_page(cpu_buffer);
+ if (err)
+ goto unlock;
+
+ /* subbuf_ids include the reader while nr_pages does not */
+ subbuf_ids = kzalloc(sizeof(*subbuf_ids) * (cpu_buffer->nr_pages + 1),
+ GFP_KERNEL);
+ if (!subbuf_ids) {
+ rb_free_meta_page(cpu_buffer);
+ err = -ENOMEM;
+ goto unlock;
+ }
+
+ atomic_inc(&cpu_buffer->resize_disabled);
+
+ /*
+ * Lock all readers to block any subbuf swap until the subbuf IDs are
+ * assigned.
+ */
+ raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+ rb_setup_ids_meta_page(cpu_buffer, subbuf_ids);
+
+ WRITE_ONCE(cpu_buffer->mapped, 1);
+
+ raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+unlock:
+ mutex_unlock(&buffer->mutex);
+ mutex_unlock(&cpu_buffer->mapping_lock);
+
+ return err;
+}
+
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
+{
+ struct ring_buffer_per_cpu *cpu_buffer;
+ int err = 0;
+
+ if (!cpumask_test_cpu(cpu, buffer->cpumask))
+ return -EINVAL;
+
+ cpu_buffer = buffer->buffers[cpu];
+
+ mutex_lock(&cpu_buffer->mapping_lock);
+
+ if (!cpu_buffer->mapped) {
+ err = -ENODEV;
+ goto unlock;
+ }
+
+ WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
+ if (!cpu_buffer->mapped) {
+ /* Wait for the writer and readers to observe !mapped */
+ synchronize_rcu();
+
+ rb_free_subbuf_ids(cpu_buffer);
+ rb_free_meta_page(cpu_buffer);
+ atomic_dec(&cpu_buffer->resize_disabled);
+ }
+unlock:
+ mutex_unlock(&cpu_buffer->mapping_lock);
+
+ return err;
+}
+
+/*
+ * +--------------+ pgoff == 0
+ * | meta page |
+ * +--------------+ pgoff == 1
+ * | subbuffer 0 |
+ * +--------------+ pgoff == 1 + (1 << subbuf_order)
+ * | subbuffer 1 |
+ * ...
+ */
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+ unsigned long pgoff)
+{
+ struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
+ unsigned long subbuf_id, subbuf_offset, addr;
+ struct page *page;
+
+ if (!pgoff)
+ return virt_to_page((void *)cpu_buffer->meta_page);
+
+ pgoff--;
+
+ subbuf_id = pgoff >> buffer->subbuf_order;
+ if (subbuf_id > cpu_buffer->nr_pages)
+ return NULL;
+
+ subbuf_offset = pgoff & ((1UL << buffer->subbuf_order) - 1);
+ addr = cpu_buffer->subbuf_ids[subbuf_id] + (subbuf_offset * PAGE_SIZE);
+ page = virt_to_page((void *)addr);
+
+ return page;
+}
+
+int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
+{
+ struct ring_buffer_per_cpu *cpu_buffer;
+ unsigned long reader_size;
+ unsigned long flags;
+
+ cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
+ if (IS_ERR(cpu_buffer))
+ return (int)PTR_ERR(cpu_buffer);
+
+ raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+consume:
+ if (rb_per_cpu_empty(cpu_buffer))
+ goto out;
+
+ reader_size = rb_page_size(cpu_buffer->reader_page);
+
+ /*
+ * There are data to be read on the current reader page, we can
+ * return to the caller. But before that, we assume the latter will read
+ * everything. Let's update the kernel reader accordingly.
+ */
+ if (cpu_buffer->reader_page->read < reader_size) {
+ while (cpu_buffer->reader_page->read < reader_size)
+ rb_advance_reader(cpu_buffer);
+ goto out;
+ }
+
+ if (WARN_ON(!rb_get_reader_page(cpu_buffer)))
+ goto out;
+
+ goto consume;
+out:
+ rb_update_meta_page(cpu_buffer);
+ raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+ rb_put_mapped_buffer(cpu_buffer);
+
+ return 0;
+}
+
/*
* We only allocate new buffers, never free them if the CPU goes down.
* If we were to free the buffer, then the user would lose any trace that was in
--
2.43.0.429.g432eaa2c6b-goog


2024-01-29 14:29:57

by Vincent Donnefort

Subject: [PATCH v13 6/6] ring-buffer/selftest: Add ring-buffer mapping test

This test maps a ring-buffer and validates the meta-page after reset and
after emitting a few events.

Cc: Shuah Khan <[email protected]>
Cc: Shuah Khan <[email protected]>
Cc: [email protected]
Signed-off-by: Vincent Donnefort <[email protected]>

diff --git a/tools/testing/selftests/ring-buffer/Makefile b/tools/testing/selftests/ring-buffer/Makefile
new file mode 100644
index 000000000000..627c5fa6d1ab
--- /dev/null
+++ b/tools/testing/selftests/ring-buffer/Makefile
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-2.0
+CFLAGS += -Wl,-no-as-needed -Wall
+CFLAGS += $(KHDR_INCLUDES)
+CFLAGS += -D_GNU_SOURCE
+
+TEST_GEN_PROGS = map_test
+
+include ../lib.mk
diff --git a/tools/testing/selftests/ring-buffer/config b/tools/testing/selftests/ring-buffer/config
new file mode 100644
index 000000000000..ef8214661612
--- /dev/null
+++ b/tools/testing/selftests/ring-buffer/config
@@ -0,0 +1 @@
+CONFIG_FTRACE=y
diff --git a/tools/testing/selftests/ring-buffer/map_test.c b/tools/testing/selftests/ring-buffer/map_test.c
new file mode 100644
index 000000000000..e3220f8d618b
--- /dev/null
+++ b/tools/testing/selftests/ring-buffer/map_test.c
@@ -0,0 +1,183 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Ring-buffer memory mapping tests
+ *
+ * Copyright (c) 2024 Vincent Donnefort <[email protected]>
+ */
+#include <fcntl.h>
+#include <sched.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <linux/trace_mmap.h>
+
+#include <sys/mman.h>
+#include <sys/ioctl.h>
+
+#include "../user_events/user_events_selftests.h" /* share tracefs setup */
+#include "../kselftest_harness.h"
+
+#define TRACEFS_ROOT "/sys/kernel/tracing"
+
+static int __tracefs_write(const char *path, const char *value)
+{
+ FILE *file;
+
+ file = fopen(path, "w");
+ if (!file)
+ return -1;
+
+ fputs(value, file);
+ fclose(file);
+
+ return 0;
+}
+
+static int __tracefs_write_int(const char *path, int value)
+{
+ char *str;
+ int ret;
+
+ if (asprintf(&str, "%d", value) < 0)
+ return -1;
+
+ ret = __tracefs_write(path, str);
+
+ free(str);
+
+ return ret;
+}
+
+#define tracefs_write_int(path, value) \
+ ASSERT_EQ(__tracefs_write_int((path), (value)), 0)
+
+static int tracefs_reset(void)
+{
+ if (__tracefs_write_int(TRACEFS_ROOT"/tracing_on", 0))
+ return -1;
+ if (__tracefs_write_int(TRACEFS_ROOT"/trace", 0))
+ return -1;
+ if (__tracefs_write(TRACEFS_ROOT"/set_event", ""))
+ return -1;
+ if (__tracefs_write(TRACEFS_ROOT"/current_tracer", "nop"))
+ return -1;
+
+ return 0;
+}
+
+FIXTURE(map) {
+ struct trace_buffer_meta *meta;
+ void *data;
+ int cpu_fd;
+ bool umount;
+};
+
+FIXTURE_VARIANT(map) {
+ int subbuf_size;
+};
+
+FIXTURE_VARIANT_ADD(map, subbuf_size_4k) {
+ .subbuf_size = 4,
+};
+
+FIXTURE_VARIANT_ADD(map, subbuf_size_8k) {
+ .subbuf_size = 8,
+};
+
+FIXTURE_SETUP(map)
+{
+ int cpu = sched_getcpu(), page_size = getpagesize();
+ unsigned long meta_len, data_len;
+ char *cpu_path, *message;
+ bool fail, umount;
+ cpu_set_t cpu_mask;
+ void *map;
+
+ if (!tracefs_enabled(&message, &fail, &umount)) {
+ if (fail) {
+ TH_LOG("Tracefs setup failed: %s", message);
+ ASSERT_FALSE(fail);
+ }
+ SKIP(return, "Skipping: %s", message);
+ }
+
+ self->umount = umount;
+
+ ASSERT_GE(cpu, 0);
+
+ ASSERT_EQ(tracefs_reset(), 0);
+
+ tracefs_write_int(TRACEFS_ROOT"/buffer_subbuf_size_kb", variant->subbuf_size);
+
+ ASSERT_GE(asprintf(&cpu_path,
+ TRACEFS_ROOT"/per_cpu/cpu%d/trace_pipe_raw",
+ cpu), 0);
+
+ self->cpu_fd = open(cpu_path, O_RDONLY | O_NONBLOCK);
+ ASSERT_GE(self->cpu_fd, 0);
+ free(cpu_path);
+
+ map = mmap(NULL, page_size, PROT_READ, MAP_SHARED, self->cpu_fd, 0);
+ ASSERT_NE(map, MAP_FAILED);
+ self->meta = (struct trace_buffer_meta *)map;
+
+ meta_len = self->meta->meta_page_size;
+ data_len = self->meta->subbuf_size * self->meta->nr_subbufs;
+
+ map = mmap(NULL, data_len, PROT_READ, MAP_SHARED, self->cpu_fd, meta_len);
+ ASSERT_NE(map, MAP_FAILED);
+ self->data = map;
+
+ /*
+ * Ensure generated events will be found on this very same ring-buffer.
+ */
+ CPU_ZERO(&cpu_mask);
+ CPU_SET(cpu, &cpu_mask);
+ ASSERT_EQ(sched_setaffinity(0, sizeof(cpu_mask), &cpu_mask), 0);
+}
+
+FIXTURE_TEARDOWN(map)
+{
+ tracefs_reset();
+
+ if (self->umount)
+ tracefs_unmount();
+
+ munmap(self->data, self->meta->subbuf_size * self->meta->nr_subbufs);
+ munmap(self->meta, self->meta->meta_page_size);
+ close(self->cpu_fd);
+}
+
+TEST_F(map, meta_page_check)
+{
+ int cnt = 0;
+
+ ASSERT_EQ(self->meta->entries, 0);
+ ASSERT_EQ(self->meta->overrun, 0);
+ ASSERT_EQ(self->meta->read, 0);
+
+ ASSERT_EQ(self->meta->reader.id, 0);
+ ASSERT_EQ(self->meta->reader.read, 0);
+
+ ASSERT_EQ(ioctl(self->cpu_fd, TRACE_MMAP_IOCTL_GET_READER), 0);
+ ASSERT_EQ(self->meta->reader.id, 0);
+
+ tracefs_write_int(TRACEFS_ROOT"/tracing_on", 1);
+ for (int i = 0; i < 16; i++)
+ tracefs_write_int(TRACEFS_ROOT"/trace_marker", i);
+again:
+ ASSERT_EQ(ioctl(self->cpu_fd, TRACE_MMAP_IOCTL_GET_READER), 0);
+
+ ASSERT_EQ(self->meta->entries, 16);
+ ASSERT_EQ(self->meta->overrun, 0);
+ ASSERT_EQ(self->meta->read, 16);
+
+ ASSERT_EQ(self->meta->reader.id, 1);
+
+ if (!(cnt++))
+ goto again;
+}
+
+TEST_HARNESS_MAIN
--
2.43.0.429.g432eaa2c6b-goog


2024-01-29 14:31:53

by Vincent Donnefort

Subject: [PATCH v13 1/6] ring-buffer: Zero ring-buffer sub-buffers

In preparation for the ring-buffer memory mapping where each subbuf will
be accessible to user-space, zero all the page allocations.

Signed-off-by: Vincent Donnefort <[email protected]>
Reviewed-by: Masami Hiramatsu (Google) <[email protected]>

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 13aaf5e85b81..8179e0a8984e 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -1472,7 +1472,8 @@ static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,

list_add(&bpage->list, pages);

- page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags,
+ page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
+ mflags | __GFP_ZERO,
cpu_buffer->buffer->subbuf_order);
if (!page)
goto free_pages;
@@ -1557,7 +1558,8 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)

cpu_buffer->reader_page = bpage;

- page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, cpu_buffer->buffer->subbuf_order);
+ page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_ZERO,
+ cpu_buffer->buffer->subbuf_order);
if (!page)
goto fail_free_reader;
bpage->page = page_address(page);
@@ -5525,7 +5527,8 @@ ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
if (bpage->data)
goto out;

- page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_NORETRY,
+ page = alloc_pages_node(cpu_to_node(cpu),
+ GFP_KERNEL | __GFP_NORETRY | __GFP_ZERO,
cpu_buffer->buffer->subbuf_order);
if (!page) {
kfree(bpage);
--
2.43.0.429.g432eaa2c6b-goog


2024-01-29 14:41:44

by Vincent Donnefort

Subject: [PATCH v13 3/6] tracing: Add snapshot refcount

When a ring-buffer is memory-mapped by user-space, no trace or
ring-buffer swap is possible. This means the snapshot feature is
mutually exclusive with the memory mapping. Having a refcount on
snapshot users helps to know whether a mapping is possible.
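As an illustration of the intended usage, any feature relying on the snapshot
buffer is expected to pair the two calls below. This is only a sketch:
my_feature_* are placeholder names, and the actual interaction with memory
mappings only materialises with the later patches of this series:

  static int my_feature_enable(struct trace_array *tr)
  {
          int ret;

          /* Allocates the snapshot buffer if needed and bumps tr->snapshot. */
          ret = tracing_arm_snapshot(tr);
          if (ret)
                  return ret;

          /* ... the feature may now swap buffers / take snapshots ... */
          return 0;
  }

  static void my_feature_disable(struct trace_array *tr)
  {
          /* Drops the reference taken by my_feature_enable(). */
          tracing_disarm_snapshot(tr);
  }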

Signed-off-by: Vincent Donnefort <[email protected]>

diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index 2a7c6fd934e9..b6a0e506b3f9 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -1300,6 +1300,52 @@ static void free_snapshot(struct trace_array *tr)
tr->allocated_snapshot = false;
}

+static int tracing_arm_snapshot_locked(struct trace_array *tr)
+{
+ int ret;
+
+ lockdep_assert_held(&trace_types_lock);
+
+ if (tr->snapshot == UINT_MAX)
+ return -EBUSY;
+
+ ret = tracing_alloc_snapshot_instance(tr);
+ if (ret)
+ return ret;
+
+ tr->snapshot++;
+
+ return 0;
+}
+
+static void tracing_disarm_snapshot_locked(struct trace_array *tr)
+{
+ lockdep_assert_held(&trace_types_lock);
+
+ if (WARN_ON(!tr->snapshot))
+ return;
+
+ tr->snapshot--;
+}
+
+int tracing_arm_snapshot(struct trace_array *tr)
+{
+ int ret;
+
+ mutex_lock(&trace_types_lock);
+ ret = tracing_arm_snapshot_locked(tr);
+ mutex_unlock(&trace_types_lock);
+
+ return ret;
+}
+
+void tracing_disarm_snapshot(struct trace_array *tr)
+{
+ mutex_lock(&trace_types_lock);
+ tracing_disarm_snapshot_locked(tr);
+ mutex_unlock(&trace_types_lock);
+}
+
/**
* tracing_alloc_snapshot - allocate snapshot buffer.
*
@@ -1373,10 +1419,6 @@ int tracing_snapshot_cond_enable(struct trace_array *tr, void *cond_data,

mutex_lock(&trace_types_lock);

- ret = tracing_alloc_snapshot_instance(tr);
- if (ret)
- goto fail_unlock;
-
if (tr->current_trace->use_max_tr) {
ret = -EBUSY;
goto fail_unlock;
@@ -1395,6 +1437,10 @@ int tracing_snapshot_cond_enable(struct trace_array *tr, void *cond_data,
goto fail_unlock;
}

+ ret = tracing_arm_snapshot_locked(tr);
+ if (ret)
+ goto fail_unlock;
+
local_irq_disable();
arch_spin_lock(&tr->max_lock);
tr->cond_snapshot = cond_snapshot;
@@ -1439,6 +1485,8 @@ int tracing_snapshot_cond_disable(struct trace_array *tr)
arch_spin_unlock(&tr->max_lock);
local_irq_enable();

+ tracing_disarm_snapshot(tr);
+
return ret;
}
EXPORT_SYMBOL_GPL(tracing_snapshot_cond_disable);
@@ -6593,11 +6641,12 @@ int tracing_set_tracer(struct trace_array *tr, const char *buf)
*/
synchronize_rcu();
free_snapshot(tr);
+ tracing_disarm_snapshot_locked(tr);
}

- if (t->use_max_tr && !tr->allocated_snapshot) {
- ret = tracing_alloc_snapshot_instance(tr);
- if (ret < 0)
+ if (t->use_max_tr) {
+ ret = tracing_arm_snapshot_locked(tr);
+ if (ret)
goto out;
}
#else
@@ -6606,8 +6655,13 @@ int tracing_set_tracer(struct trace_array *tr, const char *buf)

if (t->init) {
ret = tracer_init(t, tr);
- if (ret)
+ if (ret) {
+#ifdef CONFIG_TRACER_MAX_TRACE
+ if (t->use_max_tr)
+ tracing_disarm_snapshot_locked(tr);
+#endif
goto out;
+ }
}

tr->current_trace = t;
@@ -7709,10 +7763,11 @@ tracing_snapshot_write(struct file *filp, const char __user *ubuf, size_t cnt,
if (tr->allocated_snapshot)
ret = resize_buffer_duplicate_size(&tr->max_buffer,
&tr->array_buffer, iter->cpu_file);
- else
- ret = tracing_alloc_snapshot_instance(tr);
- if (ret < 0)
+
+ ret = tracing_arm_snapshot_locked(tr);
+ if (ret)
break;
+
/* Now, we're going to swap */
if (iter->cpu_file == RING_BUFFER_ALL_CPUS) {
local_irq_disable();
@@ -7722,6 +7777,7 @@ tracing_snapshot_write(struct file *filp, const char __user *ubuf, size_t cnt,
smp_call_function_single(iter->cpu_file, tracing_swap_cpu_buffer,
(void *)tr, 1);
}
+ tracing_disarm_snapshot_locked(tr);
break;
default:
if (tr->allocated_snapshot) {
@@ -8846,8 +8902,13 @@ ftrace_trace_snapshot_callback(struct trace_array *tr, struct ftrace_hash *hash,

ops = param ? &snapshot_count_probe_ops : &snapshot_probe_ops;

- if (glob[0] == '!')
- return unregister_ftrace_function_probe_func(glob+1, tr, ops);
+ if (glob[0] == '!') {
+ ret = unregister_ftrace_function_probe_func(glob+1, tr, ops);
+ if (!ret)
+ tracing_disarm_snapshot(tr);
+
+ return ret;
+ }

if (!param)
goto out_reg;
@@ -8866,12 +8927,13 @@ ftrace_trace_snapshot_callback(struct trace_array *tr, struct ftrace_hash *hash,
return ret;

out_reg:
- ret = tracing_alloc_snapshot_instance(tr);
+ ret = tracing_arm_snapshot(tr);
if (ret < 0)
goto out;

ret = register_ftrace_function_probe(glob, tr, ops, count);
-
+ if (ret < 0)
+ tracing_disarm_snapshot(tr);
out:
return ret < 0 ? ret : 0;
}
diff --git a/kernel/trace/trace.h b/kernel/trace/trace.h
index 00f873910c5d..3aa06bd5e48d 100644
--- a/kernel/trace/trace.h
+++ b/kernel/trace/trace.h
@@ -334,6 +334,7 @@ struct trace_array {
*/
struct array_buffer max_buffer;
bool allocated_snapshot;
+ unsigned int snapshot;
#endif
#ifdef CONFIG_TRACER_MAX_TRACE
unsigned long max_latency;
@@ -1973,12 +1974,16 @@ static inline void trace_event_eval_update(struct trace_eval_map **map, int len)
#ifdef CONFIG_TRACER_SNAPSHOT
void tracing_snapshot_instance(struct trace_array *tr);
int tracing_alloc_snapshot_instance(struct trace_array *tr);
+int tracing_arm_snapshot(struct trace_array *tr);
+void tracing_disarm_snapshot(struct trace_array *tr);
#else
static inline void tracing_snapshot_instance(struct trace_array *tr) { }
static inline int tracing_alloc_snapshot_instance(struct trace_array *tr)
{
return 0;
}
+static inline int tracing_arm_snapshot(struct trace_array *tr) { return 0; }
+static inline void tracing_disarm_snapshot(struct trace_array *tr) { }
#endif

#ifdef CONFIG_PREEMPT_TRACER
diff --git a/kernel/trace/trace_events_trigger.c b/kernel/trace/trace_events_trigger.c
index b33c3861fbbb..62e4f58b8671 100644
--- a/kernel/trace/trace_events_trigger.c
+++ b/kernel/trace/trace_events_trigger.c
@@ -597,20 +597,12 @@ static int register_trigger(char *glob,
return ret;
}

-/**
- * unregister_trigger - Generic event_command @unreg implementation
- * @glob: The raw string used to register the trigger
- * @test: Trigger-specific data used to find the trigger to remove
- * @file: The trace_event_file associated with the event
- *
- * Common implementation for event trigger unregistration.
- *
- * Usually used directly as the @unreg method in event command
- * implementations.
+/*
+ * True if the trigger was found and unregistered, else false.
*/
-static void unregister_trigger(char *glob,
- struct event_trigger_data *test,
- struct trace_event_file *file)
+static bool try_unregister_trigger(char *glob,
+ struct event_trigger_data *test,
+ struct trace_event_file *file)
{
struct event_trigger_data *data = NULL, *iter;

@@ -626,8 +618,32 @@ static void unregister_trigger(char *glob,
}
}

- if (data && data->ops->free)
- data->ops->free(data);
+ if (data) {
+ if (data->ops->free)
+ data->ops->free(data);
+
+ return true;
+ }
+
+ return false;
+}
+
+/**
+ * unregister_trigger - Generic event_command @unreg implementation
+ * @glob: The raw string used to register the trigger
+ * @test: Trigger-specific data used to find the trigger to remove
+ * @file: The trace_event_file associated with the event
+ *
+ * Common implementation for event trigger unregistration.
+ *
+ * Usually used directly as the @unreg method in event command
+ * implementations.
+ */
+static void unregister_trigger(char *glob,
+ struct event_trigger_data *test,
+ struct trace_event_file *file)
+{
+ try_unregister_trigger(glob, test, file);
}

/*
@@ -1470,7 +1486,7 @@ register_snapshot_trigger(char *glob,
struct event_trigger_data *data,
struct trace_event_file *file)
{
- int ret = tracing_alloc_snapshot_instance(file->tr);
+ int ret = tracing_arm_snapshot(file->tr);

if (ret < 0)
return ret;
@@ -1478,6 +1494,14 @@ register_snapshot_trigger(char *glob,
return register_trigger(glob, data, file);
}

+static void unregister_snapshot_trigger(char *glob,
+ struct event_trigger_data *data,
+ struct trace_event_file *file)
+{
+ if (try_unregister_trigger(glob, data, file))
+ tracing_disarm_snapshot(file->tr);
+}
+
static int
snapshot_trigger_print(struct seq_file *m, struct event_trigger_data *data)
{
@@ -1510,7 +1534,7 @@ static struct event_command trigger_snapshot_cmd = {
.trigger_type = ETT_SNAPSHOT,
.parse = event_trigger_parse,
.reg = register_snapshot_trigger,
- .unreg = unregister_trigger,
+ .unreg = unregister_snapshot_trigger,
.get_trigger_ops = snapshot_get_trigger_ops,
.set_filter = set_trigger_filter,
};
--
2.43.0.429.g432eaa2c6b-goog


2024-01-29 14:42:12

by Vincent Donnefort

Subject: [PATCH v13 4/6] tracing: Allow user-space mapping of the ring-buffer

Currently, user-space extracts data from the ring-buffer via splice,
which is handy for storage or network sharing. However, due to splice
limitations, it is impossible to do real-time analysis without a copy.

A solution for that problem is to let the user-space map the ring-buffer
directly.

The mapping is exposed via the per-CPU file trace_pipe_raw. The first
element of the mapping is the meta-page. It is followed by each
subbuffer constituting the ring-buffer, ordered by their unique page ID:

* Meta-page -- include/uapi/linux/trace_mmap.h for a description
* Subbuf ID 0
* Subbuf ID 1
...

It is therefore easy to translate a subbuf ID into an offset in the
mapping:

reader_id = meta->reader.id;
reader_offset = meta->meta_page_size + reader_id * meta->subbuf_size;

When new data is available, the mapper must call a newly introduced ioctl:
TRACE_MMAP_IOCTL_GET_READER. This will update the meta-page reader ID to
point to the next reader containing unread data.
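As a simplified illustration of the expected flow, assuming fd is the
trace_pipe_raw file descriptor and map covers the meta-page followed by all
the subbufs in a single mmap() (error handling omitted):

  struct trace_buffer_meta *meta = (struct trace_buffer_meta *)map;
  void *reader;

  for (;;) {
          /* Without O_NONBLOCK, this waits until data is available. */
          if (ioctl(fd, TRACE_MMAP_IOCTL_GET_READER) < 0)
                  break;

          reader = map + meta->meta_page_size +
                   meta->reader.id * meta->subbuf_size;

          /*
           * Events can now be parsed in place from the reader subbuf;
           * meta->reader.read is the number of bytes read on it so far.
           */
  }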

Signed-off-by: Vincent Donnefort <[email protected]>

diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
index d4bb67430719..47194c51a4ac 100644
--- a/include/uapi/linux/trace_mmap.h
+++ b/include/uapi/linux/trace_mmap.h
@@ -40,4 +40,6 @@ struct trace_buffer_meta {
__u64 Reserved2;
};

+#define TRACE_MMAP_IOCTL_GET_READER _IO('T', 0x1)
+
#endif /* _TRACE_MMAP_H_ */
diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index b6a0e506b3f9..b570f4519d87 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -1175,6 +1175,12 @@ static void tracing_snapshot_instance_cond(struct trace_array *tr,
return;
}

+ if (tr->mapped) {
+ trace_array_puts(tr, "*** BUFFER MEMORY MAPPED ***\n");
+ trace_array_puts(tr, "*** Can not use snapshot (sorry) ***\n");
+ return;
+ }
+
local_irq_save(flags);
update_max_tr(tr, current, smp_processor_id(), cond_data);
local_irq_restore(flags);
@@ -1309,6 +1315,9 @@ static int tracing_arm_snapshot_locked(struct trace_array *tr)
if (tr->snapshot == UINT_MAX)
return -EBUSY;

+ if (tr->mapped)
+ return -EBUSY;
+
ret = tracing_alloc_snapshot_instance(tr);
if (ret)
return ret;
@@ -6534,7 +6543,7 @@ static void tracing_set_nop(struct trace_array *tr)
{
if (tr->current_trace == &nop_trace)
return;
-
+
tr->current_trace->enabled--;

if (tr->current_trace->reset)
@@ -8653,15 +8662,31 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
return ret;
}

-/* An ioctl call with cmd 0 to the ring buffer file will wake up all waiters */
static long tracing_buffers_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
struct ftrace_buffer_info *info = file->private_data;
struct trace_iterator *iter = &info->iter;
+ int err;

- if (cmd)
- return -ENOIOCTLCMD;
+ if (cmd == TRACE_MMAP_IOCTL_GET_READER) {
+ if (!(file->f_flags & O_NONBLOCK)) {
+ err = ring_buffer_wait(iter->array_buffer->buffer,
+ iter->cpu_file,
+ iter->tr->buffer_percent);
+ if (err)
+ return err;
+ }
+
+ return ring_buffer_map_get_reader(iter->array_buffer->buffer,
+ iter->cpu_file);
+ } else if (cmd) {
+ return -ENOTTY;
+ }

+ /*
+ * An ioctl call with cmd 0 to the ring buffer file will wake up all
+ * waiters
+ */
mutex_lock(&trace_types_lock);

iter->wait_index++;
@@ -8674,6 +8699,90 @@ static long tracing_buffers_ioctl(struct file *file, unsigned int cmd, unsigned
return 0;
}

+static vm_fault_t tracing_buffers_mmap_fault(struct vm_fault *vmf)
+{
+ struct ftrace_buffer_info *info = vmf->vma->vm_file->private_data;
+ struct trace_iterator *iter = &info->iter;
+ vm_fault_t ret = VM_FAULT_SIGBUS;
+ struct page *page;
+
+ page = ring_buffer_map_fault(iter->array_buffer->buffer, iter->cpu_file,
+ vmf->pgoff);
+ if (!page)
+ return ret;
+
+ get_page(page);
+ vmf->page = page;
+ vmf->page->mapping = vmf->vma->vm_file->f_mapping;
+ vmf->page->index = vmf->pgoff;
+
+ return 0;
+}
+
+static void tracing_buffers_mmap_close(struct vm_area_struct *vma)
+{
+ struct ftrace_buffer_info *info = vma->vm_file->private_data;
+ struct trace_iterator *iter = &info->iter;
+
+ ring_buffer_unmap(iter->array_buffer->buffer, iter->cpu_file);
+
+#ifdef CONFIG_TRACER_MAX_TRACE
+ mutex_lock(&trace_types_lock);
+ if (!WARN_ON(!iter->tr->mapped))
+ iter->tr->mapped--;
+ mutex_unlock(&trace_types_lock);
+#endif
+}
+
+static void tracing_buffers_mmap_open(struct vm_area_struct *vma)
+{
+ struct ftrace_buffer_info *info = vma->vm_file->private_data;
+ struct trace_iterator *iter = &info->iter;
+
+ WARN_ON(ring_buffer_map(iter->array_buffer->buffer, iter->cpu_file));
+}
+
+static const struct vm_operations_struct tracing_buffers_vmops = {
+ .open = tracing_buffers_mmap_open,
+ .close = tracing_buffers_mmap_close,
+ .fault = tracing_buffers_mmap_fault,
+};
+
+static int tracing_buffers_mmap(struct file *filp, struct vm_area_struct *vma)
+{
+ struct ftrace_buffer_info *info = filp->private_data;
+ struct trace_iterator *iter = &info->iter;
+ int ret = 0;
+
+ if (vma->vm_flags & VM_WRITE)
+ return -EPERM;
+
+ vm_flags_mod(vma, VM_DONTCOPY | VM_DONTDUMP, VM_MAYWRITE);
+ vma->vm_ops = &tracing_buffers_vmops;
+
+#ifdef CONFIG_TRACER_MAX_TRACE
+ mutex_lock(&trace_types_lock);
+ if (iter->tr->snapshot || iter->tr->mapped == INT_MAX) {
+ ret = -EBUSY;
+ goto unlock;
+ }
+
+ /* Wait for update_max_tr() to observe iter->tr->mapped */
+ if (!iter->tr->mapped++)
+ synchronize_rcu();
+#endif
+
+ ret = ring_buffer_map(iter->array_buffer->buffer, iter->cpu_file);
+
+#ifdef CONFIG_TRACER_MAX_TRACE
+ if (ret)
+ iter->tr->mapped--;
+unlock:
+ mutex_unlock(&trace_types_lock);
+#endif
+ return ret;
+}
+
static const struct file_operations tracing_buffers_fops = {
.open = tracing_buffers_open,
.read = tracing_buffers_read,
@@ -8682,6 +8791,7 @@ static const struct file_operations tracing_buffers_fops = {
.splice_read = tracing_buffers_splice_read,
.unlocked_ioctl = tracing_buffers_ioctl,
.llseek = no_llseek,
+ .mmap = tracing_buffers_mmap,
};

static ssize_t
diff --git a/kernel/trace/trace.h b/kernel/trace/trace.h
index 3aa06bd5e48d..25ec246d5f1e 100644
--- a/kernel/trace/trace.h
+++ b/kernel/trace/trace.h
@@ -335,6 +335,7 @@ struct trace_array {
struct array_buffer max_buffer;
bool allocated_snapshot;
unsigned int snapshot;
+ int mapped;
#endif
#ifdef CONFIG_TRACER_MAX_TRACE
unsigned long max_latency;
--
2.43.0.429.g432eaa2c6b-goog


2024-01-29 14:43:11

by Vincent Donnefort

Subject: [PATCH v13 5/6] Documentation: tracing: Add ring-buffer mapping

It is now possible to mmap() a ring-buffer to stream its content. Add
some documentation and a code example.

Signed-off-by: Vincent Donnefort <[email protected]>

diff --git a/Documentation/trace/index.rst b/Documentation/trace/index.rst
index 5092d6c13af5..0b300901fd75 100644
--- a/Documentation/trace/index.rst
+++ b/Documentation/trace/index.rst
@@ -29,6 +29,7 @@ Linux Tracing Technologies
timerlat-tracer
intel_th
ring-buffer-design
+ ring-buffer-map
stm
sys-t
coresight/index
diff --git a/Documentation/trace/ring-buffer-map.rst b/Documentation/trace/ring-buffer-map.rst
new file mode 100644
index 000000000000..628254e63830
--- /dev/null
+++ b/Documentation/trace/ring-buffer-map.rst
@@ -0,0 +1,104 @@
+.. SPDX-License-Identifier: GPL-2.0
+
+==================================
+Tracefs ring-buffer memory mapping
+==================================
+
+:Author: Vincent Donnefort <[email protected]>
+
+Overview
+========
+Tracefs ring-buffer memory map provides an efficient method to stream data
+as no memory copy is necessary. The application mapping the ring-buffer then
+becomes a consumer of that ring-buffer, in a similar fashion to trace_pipe.
+
+Memory mapping setup
+====================
+The mapping works with a mmap() of the trace_pipe_raw interface.
+
+The first system page of the mapping contains ring-buffer statistics and a
+description. It is referred to as the meta-page. One of the most important
+fields of the meta-page is the reader. It contains the sub-buffer ID which can
+be safely read by the mapper (see ring-buffer-design.rst).
+
+The meta-page is followed by all the sub-buffers, ordered by ascending ID. It is
+therefore effortless to know where the reader starts in the mapping:
+
+.. code-block:: c
+
+ reader_id = meta->reader.id;
+ reader_offset = meta->meta_page_size + reader_id * meta->subbuf_size;
+
+When the application is done with the current reader, it can get a new one using
+the trace_pipe_raw ioctl() TRACE_MMAP_IOCTL_GET_READER. This ioctl also updates
+the meta-page fields.
+
+Limitations
+===========
+When a mapping is in place on a Tracefs ring-buffer, it is not possible to
+resize it (neither the overall size of the ring-buffer nor the size of each
+subbuf can be changed). It is also not possible to use snapshot or splice.
+
+Concurrent readers (either another application mapping that ring-buffer or the
+kernel with trace_pipe) are allowed but not recommended. They will compete for
+the ring-buffer and the output is unpredictable.
+
+Example
+=======
+
+.. code-block:: c
+
+ #include <fcntl.h>
+ #include <stdio.h>
+ #include <stdlib.h>
+ #include <unistd.h>
+
+ #include <linux/trace_mmap.h>
+
+ #include <sys/mman.h>
+ #include <sys/ioctl.h>
+
+ #define TRACE_PIPE_RAW "/sys/kernel/tracing/per_cpu/cpu0/trace_pipe_raw"
+
+ int main(void)
+ {
+ int page_size = getpagesize(), fd, reader_id;
+ unsigned long meta_len, data_len;
+ struct trace_buffer_meta *meta;
+ void *map, *reader, *data;
+
+ fd = open(TRACE_PIPE_RAW, O_RDONLY | O_NONBLOCK);
+ if (fd < 0)
+ exit(EXIT_FAILURE);
+
+ map = mmap(NULL, page_size, PROT_READ, MAP_SHARED, fd, 0);
+ if (map == MAP_FAILED)
+ exit(EXIT_FAILURE);
+
+ meta = (struct trace_buffer_meta *)map;
+ meta_len = meta->meta_page_size;
+
+ printf("entries: %llu\n", meta->entries);
+ printf("overrun: %llu\n", meta->overrun);
+ printf("read: %llu\n", meta->read);
+ printf("nr_subbufs: %u\n", meta->nr_subbufs);
+
+ data_len = meta->subbuf_size * meta->nr_subbufs;
+ data = mmap(NULL, data_len, PROT_READ, MAP_SHARED, fd, meta_len);
+ if (data == MAP_FAILED)
+ exit(EXIT_FAILURE);
+
+ if (ioctl(fd, TRACE_MMAP_IOCTL_GET_READER) < 0)
+ exit(EXIT_FAILURE);
+
+ reader_id = meta->reader.id;
+ reader = data + meta->subbuf_size * reader_id;
+
+ printf("Current reader address: %p\n", reader);
+
+ munmap(data, data_len);
+ munmap(meta, meta_len);
+ close(fd);
+
+ return 0;
+ }
--
2.43.0.429.g432eaa2c6b-goog


2024-01-30 09:33:15

by kernel test robot

Subject: Re: [PATCH v13 3/6] tracing: Add snapshot refcount

Hi Vincent,

kernel test robot noticed the following build errors:

[auto build test ERROR on 29142dc92c37d3259a33aef15b03e6ee25b0d188]

url: https://github.com/intel-lab-lkp/linux/commits/Vincent-Donnefort/ring-buffer-Zero-ring-buffer-sub-buffers/20240129-223025
base: 29142dc92c37d3259a33aef15b03e6ee25b0d188
patch link: https://lore.kernel.org/r/20240129142802.2145305-4-vdonnefort%40google.com
patch subject: [PATCH v13 3/6] tracing: Add snapshot refcount
config: arc-randconfig-002-20240130 (https://download.01.org/0day-ci/archive/20240130/[email protected]/config)
compiler: arceb-elf-gcc (GCC) 13.2.0
reproduce (this is a W=1 build): (https://download.01.org/0day-ci/archive/20240130/[email protected]/reproduce)

If you fix the issue in a separate patch/commit (i.e. not just a new version of
the same patch/commit), kindly add following tags
| Reported-by: kernel test robot <[email protected]>
| Closes: https://lore.kernel.org/oe-kbuild-all/[email protected]/

All errors (new ones prefixed by >>):

kernel/trace/trace.c: In function 'tracing_set_tracer':
kernel/trace/trace.c:6644:17: error: implicit declaration of function 'tracing_disarm_snapshot_locked'; did you mean 'tracing_disarm_snapshot'? [-Werror=implicit-function-declaration]
6644 | tracing_disarm_snapshot_locked(tr);
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| tracing_disarm_snapshot
>> kernel/trace/trace.c:6648:23: error: implicit declaration of function 'tracing_arm_snapshot_locked'; did you mean 'tracing_arm_snapshot'? [-Werror=implicit-function-declaration]
6648 | ret = tracing_arm_snapshot_locked(tr);
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~
| tracing_arm_snapshot
cc1: some warnings being treated as errors


vim +6648 kernel/trace/trace.c

6560
6561 int tracing_set_tracer(struct trace_array *tr, const char *buf)
6562 {
6563 struct tracer *t;
6564 #ifdef CONFIG_TRACER_MAX_TRACE
6565 bool had_max_tr;
6566 #endif
6567 int ret = 0;
6568
6569 mutex_lock(&trace_types_lock);
6570
6571 if (!tr->ring_buffer_expanded) {
6572 ret = __tracing_resize_ring_buffer(tr, trace_buf_size,
6573 RING_BUFFER_ALL_CPUS);
6574 if (ret < 0)
6575 goto out;
6576 ret = 0;
6577 }
6578
6579 for (t = trace_types; t; t = t->next) {
6580 if (strcmp(t->name, buf) == 0)
6581 break;
6582 }
6583 if (!t) {
6584 ret = -EINVAL;
6585 goto out;
6586 }
6587 if (t == tr->current_trace)
6588 goto out;
6589
6590 #ifdef CONFIG_TRACER_SNAPSHOT
6591 if (t->use_max_tr) {
6592 local_irq_disable();
6593 arch_spin_lock(&tr->max_lock);
6594 if (tr->cond_snapshot)
6595 ret = -EBUSY;
6596 arch_spin_unlock(&tr->max_lock);
6597 local_irq_enable();
6598 if (ret)
6599 goto out;
6600 }
6601 #endif
6602 /* Some tracers won't work on kernel command line */
6603 if (system_state < SYSTEM_RUNNING && t->noboot) {
6604 pr_warn("Tracer '%s' is not allowed on command line, ignored\n",
6605 t->name);
6606 goto out;
6607 }
6608
6609 /* Some tracers are only allowed for the top level buffer */
6610 if (!trace_ok_for_array(t, tr)) {
6611 ret = -EINVAL;
6612 goto out;
6613 }
6614
6615 /* If trace pipe files are being read, we can't change the tracer */
6616 if (tr->trace_ref) {
6617 ret = -EBUSY;
6618 goto out;
6619 }
6620
6621 trace_branch_disable();
6622
6623 tr->current_trace->enabled--;
6624
6625 if (tr->current_trace->reset)
6626 tr->current_trace->reset(tr);
6627
6628 #ifdef CONFIG_TRACER_MAX_TRACE
6629 had_max_tr = tr->current_trace->use_max_tr;
6630
6631 /* Current trace needs to be nop_trace before synchronize_rcu */
6632 tr->current_trace = &nop_trace;
6633
6634 if (had_max_tr && !t->use_max_tr) {
6635 /*
6636 * We need to make sure that the update_max_tr sees that
6637 * current_trace changed to nop_trace to keep it from
6638 * swapping the buffers after we resize it.
6639 * The update_max_tr is called from interrupts disabled
6640 * so a synchronized_sched() is sufficient.
6641 */
6642 synchronize_rcu();
6643 free_snapshot(tr);
6644 tracing_disarm_snapshot_locked(tr);
6645 }
6646
6647 if (t->use_max_tr) {
> 6648 ret = tracing_arm_snapshot_locked(tr);
6649 if (ret)
6650 goto out;
6651 }
6652 #else
6653 tr->current_trace = &nop_trace;
6654 #endif
6655
6656 if (t->init) {
6657 ret = tracer_init(t, tr);
6658 if (ret) {
6659 #ifdef CONFIG_TRACER_MAX_TRACE
6660 if (t->use_max_tr)
6661 tracing_disarm_snapshot_locked(tr);
6662 #endif
6663 goto out;
6664 }
6665 }
6666
6667 tr->current_trace = t;
6668 tr->current_trace->enabled++;
6669 trace_branch_enable(tr);
6670 out:
6671 mutex_unlock(&trace_types_lock);
6672
6673 return ret;
6674 }
6675

--
0-DAY CI Kernel Test Service
https://github.com/intel/lkp-tests/wiki

2024-01-30 10:33:52

by Vincent Donnefort

Subject: Re: [PATCH v13 3/6] tracing: Add snapshot refcount

On Tue, Jan 30, 2024 at 05:30:38PM +0800, kernel test robot wrote:
> Hi Vincent,
>
> kernel test robot noticed the following build errors:
>
> [auto build test ERROR on 29142dc92c37d3259a33aef15b03e6ee25b0d188]
>
> url: https://github.com/intel-lab-lkp/linux/commits/Vincent-Donnefort/ring-buffer-Zero-ring-buffer-sub-buffers/20240129-223025
> base: 29142dc92c37d3259a33aef15b03e6ee25b0d188
> patch link: https://lore.kernel.org/r/20240129142802.2145305-4-vdonnefort%40google.com
> patch subject: [PATCH v13 3/6] tracing: Add snapshot refcount
> config: arc-randconfig-002-20240130 (https://download.01.org/0day-ci/archive/20240130/[email protected]/config)
> compiler: arceb-elf-gcc (GCC) 13.2.0
> reproduce (this is a W=1 build): (https://download.01.org/0day-ci/archive/20240130/[email protected]/reproduce)
>
> If you fix the issue in a separate patch/commit (i.e. not just a new version of
> the same patch/commit), kindly add following tags
> | Reported-by: kernel test robot <[email protected]>
> | Closes: https://lore.kernel.org/oe-kbuild-all/[email protected]/
>
> All errors (new ones prefixed by >>):
>
> kernel/trace/trace.c: In function 'tracing_set_tracer':
> kernel/trace/trace.c:6644:17: error: implicit declaration of function 'tracing_disarm_snapshot_locked'; did you mean 'tracing_disarm_snapshot'? [-Werror=implicit-function-declaration]
> 6644 | tracing_disarm_snapshot_locked(tr);
> | ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> | tracing_disarm_snapshot
> >> kernel/trace/trace.c:6648:23: error: implicit declaration of function 'tracing_arm_snapshot_locked'; did you mean 'tracing_arm_snapshot'? [-Werror=implicit-function-declaration]
> 6648 | ret = tracing_arm_snapshot_locked(tr);
> | ^~~~~~~~~~~~~~~~~~~~~~~~~~~
> | tracing_arm_snapshot
> cc1: some warnings being treated as errors

Right, two tracers (hwlat and osnoise) select _only_ MAX_TRACE and not
TRACER_SNAPSHOT.

However, AFAICT, they will not call any of the swapping functions (they don't
set use_max_tr). So I suppose arm/disarm can be omitted in that case.
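For illustration only (a sketch, not necessarily what v14 will do), providing
stubs when CONFIG_TRACER_SNAPSHOT is not set would be one way to let configs
that only select CONFIG_TRACER_MAX_TRACE (hwlat, osnoise) build:

  #ifndef CONFIG_TRACER_SNAPSHOT
  /* Nothing to account for: those configs never swap the buffers. */
  static inline int tracing_arm_snapshot_locked(struct trace_array *tr)
  {
          return 0;
  }

  static inline void tracing_disarm_snapshot_locked(struct trace_array *tr) { }
  #endif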


2024-01-30 14:55:44

by Masami Hiramatsu

Subject: Re: [PATCH v13 2/6] ring-buffer: Introducing ring-buffer mapping functions

Hi Vincent,

Thanks for updating the code.

On Mon, 29 Jan 2024 14:27:58 +0000
Vincent Donnefort <[email protected]> wrote:

> @@ -5204,6 +5225,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
> cpu_buffer->lost_events = 0;
> cpu_buffer->last_overrun = 0;
>
> + if (cpu_buffer->mapped)

Some of the cpu_buffer->mapped accesses use WRITE_ONCE()/READ_ONCE()
but others do not. What makes them different?

> + rb_update_meta_page(cpu_buffer);
> +
> rb_head_page_activate(cpu_buffer);
> cpu_buffer->pages_removed = 0;
> }
> @@ -5418,6 +5442,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
> cpu_buffer_a = buffer_a->buffers[cpu];
> cpu_buffer_b = buffer_b->buffers[cpu];
>
> + if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
> + ret = -EBUSY;
> + goto out;
> + }
> +
> /* At least make sure the two buffers are somewhat the same */
> if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
> goto out;
> @@ -5682,7 +5711,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
> * Otherwise, we can simply swap the page with the one passed in.
> */
> if (read || (len < (commit - read)) ||
> - cpu_buffer->reader_page == cpu_buffer->commit_page) {
> + cpu_buffer->reader_page == cpu_buffer->commit_page ||
> + cpu_buffer->mapped) {
> struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
> unsigned int rpos = read;
> unsigned int pos = 0;
> @@ -5901,6 +5931,11 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
>
> cpu_buffer = buffer->buffers[cpu];
>
> + if (cpu_buffer->mapped) {
> + err = -EBUSY;
> + goto error;
> + }
> +
> /* Update the number of pages to match the new size */
> nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
> nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
> @@ -6002,6 +6037,295 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
> }
> EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);
>
> +#define subbuf_page(off, start) \
> + virt_to_page((void *)(start + (off << PAGE_SHIFT)))
> +
> +#define foreach_subbuf_page(sub_order, start, page) \
> + page = subbuf_page(0, (start)); \
> + for (int __off = 0; __off < (1 << (sub_order)); \
> + __off++, page = subbuf_page(__off, (start)))
> +
> +static inline void subbuf_map_prepare(unsigned long subbuf_start, int order)
> +{
> + struct page *page;
> +
> + /*
> + * When allocating order > 0 pages, only the first struct page has a
> + * refcount > 1. Increasing the refcount here ensures none of the struct
> + * page composing the sub-buffer is freeed when the mapping is closed.
> + */
> + foreach_subbuf_page(order, subbuf_start, page)
> + page_ref_inc(page);
> +}
> +
> +static inline void subbuf_unmap(unsigned long subbuf_start, int order)
> +{
> + struct page *page;
> +
> + foreach_subbuf_page(order, subbuf_start, page) {
> + page_ref_dec(page);
> + page->mapping = NULL;
> + }
> +}
> +
> +static void rb_free_subbuf_ids(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> + int sub_id;
> +
> + for (sub_id = 0; sub_id < cpu_buffer->nr_pages + 1; sub_id++)
> + subbuf_unmap(cpu_buffer->subbuf_ids[sub_id],
> + cpu_buffer->buffer->subbuf_order);
> +
> + kfree(cpu_buffer->subbuf_ids);
> + cpu_buffer->subbuf_ids = NULL;
> +}
> +
> +static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> + if (cpu_buffer->meta_page)
> + return 0;
> +
> + cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER | __GFP_ZERO));
> + if (!cpu_buffer->meta_page)
> + return -ENOMEM;
> +
> + return 0;
> +}
> +
> +static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> + unsigned long addr = (unsigned long)cpu_buffer->meta_page;
> +
> + virt_to_page((void *)addr)->mapping = NULL;
> + free_page(addr);
> + cpu_buffer->meta_page = NULL;
> +}
> +
> +static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
> + unsigned long *subbuf_ids)
> +{
> + struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> + unsigned int nr_subbufs = cpu_buffer->nr_pages + 1;
> + struct buffer_page *first_subbuf, *subbuf;
> + int id = 0;
> +
> + subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
> + subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
> + cpu_buffer->reader_page->id = id++;
> +
> + first_subbuf = subbuf = rb_set_head_page(cpu_buffer);
> + do {
> + if (id >= nr_subbufs) {
> + WARN_ON(1);
> + break;
> + }
> +
> + subbuf_ids[id] = (unsigned long)subbuf->page;
> + subbuf->id = id;
> + subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
> +
> + rb_inc_page(&subbuf);
> + id++;
> + } while (subbuf != first_subbuf);
> +
> + /* install subbuf ID to kern VA translation */
> + cpu_buffer->subbuf_ids = subbuf_ids;
> +
> + meta->meta_page_size = PAGE_SIZE;
> + meta->meta_struct_len = sizeof(*meta);
> + meta->nr_subbufs = nr_subbufs;
> + meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
> +
> + rb_update_meta_page(cpu_buffer);
> +}
> +
> +static inline struct ring_buffer_per_cpu *
> +rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
> +{
> + struct ring_buffer_per_cpu *cpu_buffer;
> +
> + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> + return ERR_PTR(-EINVAL);
> +
> + cpu_buffer = buffer->buffers[cpu];
> +
> + mutex_lock(&cpu_buffer->mapping_lock);
> +
> + if (!cpu_buffer->mapped) {
> + mutex_unlock(&cpu_buffer->mapping_lock);
> + return ERR_PTR(-ENODEV);
> + }
> +
> + return cpu_buffer;
> +}
> +
> +static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> + mutex_unlock(&cpu_buffer->mapping_lock);
> +}
> +
> +int ring_buffer_map(struct trace_buffer *buffer, int cpu)
> +{
> + struct ring_buffer_per_cpu *cpu_buffer;
> + unsigned long flags, *subbuf_ids;
> + int err = 0;
> +
> + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> + return -EINVAL;
> +
> + cpu_buffer = buffer->buffers[cpu];
> +
> + mutex_lock(&cpu_buffer->mapping_lock);
> +
> + if (cpu_buffer->mapped) {
> + if (cpu_buffer->mapped == INT_MAX)
> + err = -EBUSY;
> + else
> + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
> + mutex_unlock(&cpu_buffer->mapping_lock);
> + return err;
> + }
> +
> + /* prevent another thread from changing buffer sizes */
> + mutex_lock(&buffer->mutex);
> +
> + err = rb_alloc_meta_page(cpu_buffer);
> + if (err)
> + goto unlock;
> +
> + /* subbuf_ids include the reader while nr_pages does not */
> + subbuf_ids = kzalloc(sizeof(*subbuf_ids) * (cpu_buffer->nr_pages + 1),
> + GFP_KERNEL);
> + if (!subbuf_ids) {
> + rb_free_meta_page(cpu_buffer);
> + err = -ENOMEM;
> + goto unlock;
> + }
> +
> + atomic_inc(&cpu_buffer->resize_disabled);
> +
> + /*
> + * Lock all readers to block any subbuf swap until the subbuf IDs are
> + * assigned.
> + */
> + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> +
> + rb_setup_ids_meta_page(cpu_buffer, subbuf_ids);
> +
> + WRITE_ONCE(cpu_buffer->mapped, 1);
> +
> + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> +unlock:
> + mutex_unlock(&buffer->mutex);
> + mutex_unlock(&cpu_buffer->mapping_lock);
> +
> + return err;
> +}
> +
> +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
> +{
> + struct ring_buffer_per_cpu *cpu_buffer;
> + int err = 0;
> +
> + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> + return -EINVAL;
> +
> + cpu_buffer = buffer->buffers[cpu];
> +
> + mutex_lock(&cpu_buffer->mapping_lock);
> +
> + if (!cpu_buffer->mapped) {
> + err = -ENODEV;
> + goto unlock;
> + }
> +
> + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
> + if (!cpu_buffer->mapped) {
> + /* Wait for the writer and readers to observe !mapped */
> + synchronize_rcu();

How does this synchronize_rcu() ensure that the writers/readers observe it?
(Are all of those writers/readers in non-preemptive critical sections?)

Thank you,

> +
> + rb_free_subbuf_ids(cpu_buffer);
> + rb_free_meta_page(cpu_buffer);
> + atomic_dec(&cpu_buffer->resize_disabled);
> + }
> +unlock:
> + mutex_unlock(&cpu_buffer->mapping_lock);
> +
> + return err;
> +}
> +
> +/*
> + * +--------------+ pgoff == 0
> + * | meta page |
> + * +--------------+ pgoff == 1
> + * | subbuffer 0 |
> + * +--------------+ pgoff == 1 + (1 << subbuf_order)
> + * | subbuffer 1 |
> + * ...
> + */
> +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
> + unsigned long pgoff)
> +{
> + struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
> + unsigned long subbuf_id, subbuf_offset, addr;
> + struct page *page;
> +
> + if (!pgoff)
> + return virt_to_page((void *)cpu_buffer->meta_page);
> +
> + pgoff--;
> +
> + subbuf_id = pgoff >> buffer->subbuf_order;
> + if (subbuf_id > cpu_buffer->nr_pages)
> + return NULL;
> +
> + subbuf_offset = pgoff & ((1UL << buffer->subbuf_order) - 1);
> + addr = cpu_buffer->subbuf_ids[subbuf_id] + (subbuf_offset * PAGE_SIZE);
> + page = virt_to_page((void *)addr);
> +
> + return page;
> +}
> +
> +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
> +{
> + struct ring_buffer_per_cpu *cpu_buffer;
> + unsigned long reader_size;
> + unsigned long flags;
> +
> + cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
> + if (IS_ERR(cpu_buffer))
> + return (int)PTR_ERR(cpu_buffer);
> +
> + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> +consume:
> + if (rb_per_cpu_empty(cpu_buffer))
> + goto out;
> +
> + reader_size = rb_page_size(cpu_buffer->reader_page);
> +
> + /*
> + * There are data to be read on the current reader page, we can
> + * return to the caller. But before that, we assume the latter will read
> + * everything. Let's update the kernel reader accordingly.
> + */
> + if (cpu_buffer->reader_page->read < reader_size) {
> + while (cpu_buffer->reader_page->read < reader_size)
> + rb_advance_reader(cpu_buffer);
> + goto out;
> + }
> +
> + if (WARN_ON(!rb_get_reader_page(cpu_buffer)))
> + goto out;
> +
> + goto consume;
> +out:
> + rb_update_meta_page(cpu_buffer);
> + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> + rb_put_mapped_buffer(cpu_buffer);
> +
> + return 0;
> +}
> +
> /*
> * We only allocate new buffers, never free them if the CPU goes down.
> * If we were to free the buffer, then the user would lose any trace that was in
> --
> 2.43.0.429.g432eaa2c6b-goog
>


--
Masami Hiramatsu (Google) <[email protected]>

2024-01-30 16:39:34

by Vincent Donnefort

[permalink] [raw]
Subject: Re: [PATCH v13 2/6] ring-buffer: Introducing ring-buffer mapping functions

On Tue, Jan 30, 2024 at 11:55:10PM +0900, Masami Hiramatsu wrote:
> Hi Vincent,
>
> Thanks for updating the code.
>
> On Mon, 29 Jan 2024 14:27:58 +0000
> Vincent Donnefort <[email protected]> wrote:
>
> > In preparation for allowing the user-space to map a ring-buffer, add
> > a set of mapping functions:
> >
> > ring_buffer_{map,unmap}()
> > ring_buffer_map_fault()
> >
> > And controls on the ring-buffer:
> >
> > ring_buffer_map_get_reader() /* swap reader and head */
> >
> > Mapping the ring-buffer also involves:
> >
> > A unique ID for each subbuf of the ring-buffer, currently they are
> > only identified through their in-kernel VA.
> >
> > A meta-page, where are stored ring-buffer statistics and a
> > description for the current reader
> >
> > The linear mapping exposes the meta-page, and each subbuf of the
> > ring-buffer, ordered following their unique ID, assigned during the
> > first mapping.
> >
> > Once mapped, no subbuf can get in or out of the ring-buffer: the buffer
> > size will remain unmodified and the splice enabling functions will in
> > reality simply memcpy the data instead of swapping subbufs.
> >
> > Signed-off-by: Vincent Donnefort <[email protected]>
> >
> > diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
> > index fa802db216f9..0841ba8bab14 100644
> > --- a/include/linux/ring_buffer.h
> > +++ b/include/linux/ring_buffer.h
> > @@ -6,6 +6,8 @@
> > #include <linux/seq_file.h>
> > #include <linux/poll.h>
> >
> > +#include <uapi/linux/trace_mmap.h>
> > +
> > struct trace_buffer;
> > struct ring_buffer_iter;
> >
> > @@ -221,4 +223,9 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
> > #define trace_rb_cpu_prepare NULL
> > #endif
> >
> > +int ring_buffer_map(struct trace_buffer *buffer, int cpu);
> > +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
> > +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
> > + unsigned long pgoff);
> > +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu);
> > #endif /* _LINUX_RING_BUFFER_H */
> > diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
> > new file mode 100644
> > index 000000000000..d4bb67430719
> > --- /dev/null
> > +++ b/include/uapi/linux/trace_mmap.h
> > @@ -0,0 +1,43 @@
> > +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
> > +#ifndef _TRACE_MMAP_H_
> > +#define _TRACE_MMAP_H_
> > +
> > +#include <linux/types.h>
> > +
> > +/**
> > + * struct trace_buffer_meta - Ring-buffer Meta-page description
> > + * @meta_page_size: Size of this meta-page.
> > + * @meta_struct_len: Size of this structure.
> > + * @subbuf_size: Size of each subbuf, including the header.
> > + * @nr_subbufs: Number of subbfs in the ring-buffer.
> > + * @reader.lost_events: Number of events lost at the time of the reader swap.
> > + * @reader.id: subbuf ID of the current reader. From 0 to @nr_subbufs - 1
> > + * @reader.read: Number of bytes read on the reader subbuf.
> > + * @entries: Number of entries in the ring-buffer.
> > + * @overrun: Number of entries lost in the ring-buffer.
> > + * @read: Number of entries that have been read.
> > + */
> > +struct trace_buffer_meta {
> > + __u32 meta_page_size;
> > + __u32 meta_struct_len;
> > +
> > + __u32 subbuf_size;
> > + __u32 nr_subbufs;
> > +
> > + struct {
> > + __u64 lost_events;
> > + __u32 id;
> > + __u32 read;
> > + } reader;
> > +
> > + __u64 flags;
> > +
> > + __u64 entries;
> > + __u64 overrun;
> > + __u64 read;
> > +
> > + __u64 Reserved1;
> > + __u64 Reserved2;
> > +};
> > +
> > +#endif /* _TRACE_MMAP_H_ */
> > diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> > index 8179e0a8984e..081065e76d4a 100644
> > --- a/kernel/trace/ring_buffer.c
> > +++ b/kernel/trace/ring_buffer.c
> > @@ -338,6 +338,7 @@ struct buffer_page {
> > local_t entries; /* entries on this page */
> > unsigned long real_end; /* real end of data */
> > unsigned order; /* order of the page */
> > + u32 id; /* ID for external mapping */
> > struct buffer_data_page *page; /* Actual data page */
> > };
> >
> > @@ -484,6 +485,12 @@ struct ring_buffer_per_cpu {
> > u64 read_stamp;
> > /* pages removed since last reset */
> > unsigned long pages_removed;
> > +
> > + int mapped;
> > + struct mutex mapping_lock;
> > + unsigned long *subbuf_ids; /* ID to addr */
> > + struct trace_buffer_meta *meta_page;
> > +
> > /* ring buffer pages to update, > 0 to add, < 0 to remove */
> > long nr_pages_to_update;
> > struct list_head new_pages; /* new pages to add */
> > @@ -1548,6 +1555,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
> > init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
> > init_waitqueue_head(&cpu_buffer->irq_work.waiters);
> > init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
> > + mutex_init(&cpu_buffer->mapping_lock);
> >
> > bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
> > GFP_KERNEL, cpu_to_node(cpu));
> > @@ -5160,6 +5168,19 @@ static void rb_clear_buffer_page(struct buffer_page *page)
> > page->read = 0;
> > }
> >
> > +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > + struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> > +
> > + WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read);
> > + WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id);
> > + WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events);
> > +
> > + WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries));
> > + WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun));
> > + WRITE_ONCE(meta->read, cpu_buffer->read);
> > +}
> > +
> > static void
> > rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
> > {
> > @@ -5204,6 +5225,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
> > cpu_buffer->lost_events = 0;
> > cpu_buffer->last_overrun = 0;
> >
> > + if (cpu_buffer->mapped)
>
> Some of the cpu_buffer->mapped accesses use WRITE_ONCE()/READ_ONCE()
> but others do not. What makes them different?

cpu_buffer->mapped is accessed with READ_ONCE() in the sections that are not
protected by a lock. In this version, that is only ring_buffer_swap_cpu().

That said...

a. This is not enough protection at this level. Ideally, the _map should also
call synchronize_rcu() to make sure the _swap does see ->mapped > 0.

b. With the snapshot refcount in trace/trace.c, it is not possible for those
functions to race: tracing_arm_snapshot() and tracing_buffers_mmap() are mutually
exclusive and already serialized by the trace mutex.

So how about I completely remove those WRITE_ONCE/READ_ONCE and just rely on the
protection given in trace/trace.c instead of duplicating it in
trace/ring_buffer.c?

>
> > + rb_update_meta_page(cpu_buffer);
> > +
> > rb_head_page_activate(cpu_buffer);
> > cpu_buffer->pages_removed = 0;
> > }
> > @@ -5418,6 +5442,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
> > cpu_buffer_a = buffer_a->buffers[cpu];
> > cpu_buffer_b = buffer_b->buffers[cpu];
> >
> > + if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
> > + ret = -EBUSY;
> > + goto out;
> > + }
> > +
> > /* At least make sure the two buffers are somewhat the same */
> > if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
> > goto out;
> > @@ -5682,7 +5711,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
> > * Otherwise, we can simply swap the page with the one passed in.
> > */
> > if (read || (len < (commit - read)) ||
> > - cpu_buffer->reader_page == cpu_buffer->commit_page) {
> > + cpu_buffer->reader_page == cpu_buffer->commit_page ||
> > + cpu_buffer->mapped) {
> > struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
> > unsigned int rpos = read;
> > unsigned int pos = 0;
> > @@ -5901,6 +5931,11 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
> >
> > cpu_buffer = buffer->buffers[cpu];
> >
> > + if (cpu_buffer->mapped) {
> > + err = -EBUSY;
> > + goto error;
> > + }
> > +
> > /* Update the number of pages to match the new size */
> > nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
> > nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
> > @@ -6002,6 +6037,295 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
> > }
> > EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);
> >
> > +#define subbuf_page(off, start) \
> > + virt_to_page((void *)(start + (off << PAGE_SHIFT)))
> > +
> > +#define foreach_subbuf_page(sub_order, start, page) \
> > + page = subbuf_page(0, (start)); \
> > + for (int __off = 0; __off < (1 << (sub_order)); \
> > + __off++, page = subbuf_page(__off, (start)))
> > +
> > +static inline void subbuf_map_prepare(unsigned long subbuf_start, int order)
> > +{
> > + struct page *page;
> > +
> > + /*
> > + * When allocating order > 0 pages, only the first struct page has a
> > + * refcount > 1. Increasing the refcount here ensures none of the struct
> > + * page composing the sub-buffer is freeed when the mapping is closed.
> > + */
> > + foreach_subbuf_page(order, subbuf_start, page)
> > + page_ref_inc(page);
> > +}
> > +
> > +static inline void subbuf_unmap(unsigned long subbuf_start, int order)
> > +{
> > + struct page *page;
> > +
> > + foreach_subbuf_page(order, subbuf_start, page) {
> > + page_ref_dec(page);
> > + page->mapping = NULL;
> > + }
> > +}
> > +
> > +static void rb_free_subbuf_ids(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > + int sub_id;
> > +
> > + for (sub_id = 0; sub_id < cpu_buffer->nr_pages + 1; sub_id++)
> > + subbuf_unmap(cpu_buffer->subbuf_ids[sub_id],
> > + cpu_buffer->buffer->subbuf_order);
> > +
> > + kfree(cpu_buffer->subbuf_ids);
> > + cpu_buffer->subbuf_ids = NULL;
> > +}
> > +
> > +static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > + if (cpu_buffer->meta_page)
> > + return 0;
> > +
> > + cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER | __GFP_ZERO));
> > + if (!cpu_buffer->meta_page)
> > + return -ENOMEM;
> > +
> > + return 0;
> > +}
> > +
> > +static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > + unsigned long addr = (unsigned long)cpu_buffer->meta_page;
> > +
> > + virt_to_page((void *)addr)->mapping = NULL;
> > + free_page(addr);
> > + cpu_buffer->meta_page = NULL;
> > +}
> > +
> > +static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
> > + unsigned long *subbuf_ids)
> > +{
> > + struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> > + unsigned int nr_subbufs = cpu_buffer->nr_pages + 1;
> > + struct buffer_page *first_subbuf, *subbuf;
> > + int id = 0;
> > +
> > + subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
> > + subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
> > + cpu_buffer->reader_page->id = id++;
> > +
> > + first_subbuf = subbuf = rb_set_head_page(cpu_buffer);
> > + do {
> > + if (id >= nr_subbufs) {
> > + WARN_ON(1);
> > + break;
> > + }
> > +
> > + subbuf_ids[id] = (unsigned long)subbuf->page;
> > + subbuf->id = id;
> > + subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
> > +
> > + rb_inc_page(&subbuf);
> > + id++;
> > + } while (subbuf != first_subbuf);
> > +
> > + /* install subbuf ID to kern VA translation */
> > + cpu_buffer->subbuf_ids = subbuf_ids;
> > +
> > + meta->meta_page_size = PAGE_SIZE;
> > + meta->meta_struct_len = sizeof(*meta);
> > + meta->nr_subbufs = nr_subbufs;
> > + meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
> > +
> > + rb_update_meta_page(cpu_buffer);
> > +}
> > +
> > +static inline struct ring_buffer_per_cpu *
> > +rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
> > +{
> > + struct ring_buffer_per_cpu *cpu_buffer;
> > +
> > + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> > + return ERR_PTR(-EINVAL);
> > +
> > + cpu_buffer = buffer->buffers[cpu];
> > +
> > + mutex_lock(&cpu_buffer->mapping_lock);
> > +
> > + if (!cpu_buffer->mapped) {
> > + mutex_unlock(&cpu_buffer->mapping_lock);
> > + return ERR_PTR(-ENODEV);
> > + }
> > +
> > + return cpu_buffer;
> > +}
> > +
> > +static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > + mutex_unlock(&cpu_buffer->mapping_lock);
> > +}
> > +
> > +int ring_buffer_map(struct trace_buffer *buffer, int cpu)
> > +{
> > + struct ring_buffer_per_cpu *cpu_buffer;
> > + unsigned long flags, *subbuf_ids;
> > + int err = 0;
> > +
> > + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> > + return -EINVAL;
> > +
> > + cpu_buffer = buffer->buffers[cpu];
> > +
> > + mutex_lock(&cpu_buffer->mapping_lock);
> > +
> > + if (cpu_buffer->mapped) {
> > + if (cpu_buffer->mapped == INT_MAX)
> > + err = -EBUSY;
> > + else
> > + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
> > + mutex_unlock(&cpu_buffer->mapping_lock);
> > + return err;
> > + }
> > +
> > + /* prevent another thread from changing buffer sizes */
> > + mutex_lock(&buffer->mutex);
> > +
> > + err = rb_alloc_meta_page(cpu_buffer);
> > + if (err)
> > + goto unlock;
> > +
> > + /* subbuf_ids include the reader while nr_pages does not */
> > + subbuf_ids = kzalloc(sizeof(*subbuf_ids) * (cpu_buffer->nr_pages + 1),
> > + GFP_KERNEL);
> > + if (!subbuf_ids) {
> > + rb_free_meta_page(cpu_buffer);
> > + err = -ENOMEM;
> > + goto unlock;
> > + }
> > +
> > + atomic_inc(&cpu_buffer->resize_disabled);
> > +
> > + /*
> > + * Lock all readers to block any subbuf swap until the subbuf IDs are
> > + * assigned.
> > + */
> > + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> > +
> > + rb_setup_ids_meta_page(cpu_buffer, subbuf_ids);
> > +
> > + WRITE_ONCE(cpu_buffer->mapped, 1);
> > +
> > + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> > +unlock:
> > + mutex_unlock(&buffer->mutex);
> > + mutex_unlock(&cpu_buffer->mapping_lock);
> > +
> > + return err;
> > +}
> > +
> > +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
> > +{
> > + struct ring_buffer_per_cpu *cpu_buffer;
> > + int err = 0;
> > +
> > + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> > + return -EINVAL;
> > +
> > + cpu_buffer = buffer->buffers[cpu];
> > +
> > + mutex_lock(&cpu_buffer->mapping_lock);
> > +
> > + if (!cpu_buffer->mapped) {
> > + err = -ENODEV;
> > + goto unlock;
> > + }
> > +
> > + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
> > + if (!cpu_buffer->mapped) {
> > + /* Wait for the writer and readers to observe !mapped */
> > + synchronize_rcu();
>
> How does this synchronize_rcu() ensure that the writers/readers observe it?
> (Are all of those writers/readers in non-preemptive critical sections?)

Apologies, that is something I should have reworked. In earlier versions of the
patch series, the writer could update the meta-page itself.

That has been abandoned: now only rb_reset_cpu() and the get_reader ioctl
update the meta-page.

ring_buffer_map_get_reader() is already protected by the mapping mutex, and
rb_reset_cpu() runs under the reader_lock. Taking the latter here would be far
more efficient than the synchronize_rcu().
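
Roughly something like this (untested sketch, just to illustrate the idea; the
exact placement of the WRITE_ONCE() is still to be settled):

int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;
	int err = 0;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return -EINVAL;

	cpu_buffer = buffer->buffers[cpu];

	mutex_lock(&cpu_buffer->mapping_lock);

	if (!cpu_buffer->mapped) {
		err = -ENODEV;
		goto unlock;
	} else if (cpu_buffer->mapped > 1) {
		WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
		goto unlock;
	}

	/*
	 * rb_reset_cpu() and the get_reader ioctl only touch the meta-page
	 * under the reader_lock, so clearing ->mapped while holding it is
	 * enough for them to observe the unmap before the meta-page and
	 * subbuf IDs are freed below.
	 */
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	WRITE_ONCE(cpu_buffer->mapped, 0);
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	rb_free_subbuf_ids(cpu_buffer);
	rb_free_meta_page(cpu_buffer);
	atomic_dec(&cpu_buffer->resize_disabled);
unlock:
	mutex_unlock(&cpu_buffer->mapping_lock);

	return err;
}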

>
> Thank you,
>
> > +
> > + rb_free_subbuf_ids(cpu_buffer);
> > + rb_free_meta_page(cpu_buffer);
> > + atomic_dec(&cpu_buffer->resize_disabled);
> > + }
> > +unlock:
> > + mutex_unlock(&cpu_buffer->mapping_lock);
> > +
> > + return err;
> > +}
> > +
> > +/*
> > + * +--------------+ pgoff == 0
> > + * | meta page |
> > + * +--------------+ pgoff == 1
> > + * | subbuffer 0 |
> > + * +--------------+ pgoff == 1 + (1 << subbuf_order)
> > + * | subbuffer 1 |
> > + * ...
> > + */
> > +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
> > + unsigned long pgoff)
> > +{
> > + struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
> > + unsigned long subbuf_id, subbuf_offset, addr;
> > + struct page *page;
> > +
> > + if (!pgoff)
> > + return virt_to_page((void *)cpu_buffer->meta_page);
> > +
> > + pgoff--;
> > +
> > + subbuf_id = pgoff >> buffer->subbuf_order;
> > + if (subbuf_id > cpu_buffer->nr_pages)
> > + return NULL;
> > +
> > + subbuf_offset = pgoff & ((1UL << buffer->subbuf_order) - 1);
> > + addr = cpu_buffer->subbuf_ids[subbuf_id] + (subbuf_offset * PAGE_SIZE);
> > + page = virt_to_page((void *)addr);
> > +
> > + return page;
> > +}
> > +
> > +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
> > +{
> > + struct ring_buffer_per_cpu *cpu_buffer;
> > + unsigned long reader_size;
> > + unsigned long flags;
> > +
> > + cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
> > + if (IS_ERR(cpu_buffer))
> > + return (int)PTR_ERR(cpu_buffer);
> > +
> > + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> > +consume:
> > + if (rb_per_cpu_empty(cpu_buffer))
> > + goto out;
> > +
> > + reader_size = rb_page_size(cpu_buffer->reader_page);
> > +
> > + /*
> > + * There are data to be read on the current reader page, we can
> > + * return to the caller. But before that, we assume the latter will read
> > + * everything. Let's update the kernel reader accordingly.
> > + */
> > + if (cpu_buffer->reader_page->read < reader_size) {
> > + while (cpu_buffer->reader_page->read < reader_size)
> > + rb_advance_reader(cpu_buffer);
> > + goto out;
> > + }
> > +
> > + if (WARN_ON(!rb_get_reader_page(cpu_buffer)))
> > + goto out;
> > +
> > + goto consume;
> > +out:
> > + rb_update_meta_page(cpu_buffer);
> > + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> > + rb_put_mapped_buffer(cpu_buffer);
> > +
> > + return 0;
> > +}
> > +
> > /*
> > * We only allocate new buffers, never free them if the CPU goes down.
> > * If we were to free the buffer, then the user would lose any trace that was in
> > --
> > 2.43.0.429.g432eaa2c6b-goog
> >
>
>
> --
> Masami Hiramatsu (Google) <[email protected]>

2024-02-04 00:34:01

by Steven Rostedt

[permalink] [raw]
Subject: Re: [PATCH v13 2/6] ring-buffer: Introducing ring-buffer mapping functions

On Mon, 29 Jan 2024 14:27:58 +0000
Vincent Donnefort <[email protected]> wrote:

> --- /dev/null
> +++ b/include/uapi/linux/trace_mmap.h
> @@ -0,0 +1,43 @@
> +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
> +#ifndef _TRACE_MMAP_H_
> +#define _TRACE_MMAP_H_
> +
> +#include <linux/types.h>
> +
> +/**
> + * struct trace_buffer_meta - Ring-buffer Meta-page description
> + * @meta_page_size: Size of this meta-page.
> + * @meta_struct_len: Size of this structure.
> + * @subbuf_size: Size of each subbuf, including the header.

That is "the header"?

> + * @nr_subbufs: Number of subbfs in the ring-buffer.
> + * @reader.lost_events: Number of events lost at the time of the reader swap.
> + * @reader.id: subbuf ID of the current reader. From 0 to @nr_subbufs - 1
> + * @reader.read: Number of bytes read on the reader subbuf.

Note, flags needs a comment.

> + * @entries: Number of entries in the ring-buffer.
> + * @overrun: Number of entries lost in the ring-buffer.
> + * @read: Number of entries that have been read.

So do the two Reserved words; otherwise I'm going to start getting
error reports about sparse warnings that check kerneldoc comments
against their content. Every field needs to be represented in the
kerneldoc comment.

-- Steve


> + */
> +struct trace_buffer_meta {
> + __u32 meta_page_size;
> + __u32 meta_struct_len;
> +
> + __u32 subbuf_size;
> + __u32 nr_subbufs;
> +
> + struct {
> + __u64 lost_events;
> + __u32 id;
> + __u32 read;
> + } reader;
> +
> + __u64 flags;
> +
> + __u64 entries;
> + __u64 overrun;
> + __u64 read;
> +
> + __u64 Reserved1;
> + __u64 Reserved2;
> +};
> +
> +#endif /* _TRACE_MMAP_H_ */

2024-02-04 00:56:52

by Steven Rostedt

[permalink] [raw]
Subject: Re: [PATCH v13 2/6] ring-buffer: Introducing ring-buffer mapping functions

On Tue, 30 Jan 2024 16:22:06 +0000
Vincent Donnefort <[email protected]> wrote:
> > > +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> > > +{
> > > + struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> > > +
> > > + WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read);
> > > + WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id);
> > > + WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events);
> > > +
> > > + WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries));
> > > + WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun));
> > > + WRITE_ONCE(meta->read, cpu_buffer->read);
> > > +}
> > > +
> > > static void
> > > rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
> > > {
> > > @@ -5204,6 +5225,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
> > > cpu_buffer->lost_events = 0;
> > > cpu_buffer->last_overrun = 0;
> > >
> > > + if (cpu_buffer->mapped)
> >
> > Some of the cpu_buffer->mapped accesses use WRITE_ONCE()/READ_ONCE()
> > but others do not. What makes them different?
>
> cpu_buffer->mapped is accessed with READ_ONCE() in the sections that are not
> protected by a lock. In this version, that is only ring_buffer_swap_cpu().
>
> That said...
>
> a. This is not enough protection at this level. Ideally, the _map should also
> call synchronize_rcu() to make sure the _swap does see ->mapped > 0.
>
> b. With the snapshot refcount in trace/trace.c, it is not possible for those
> functions to race: tracing_arm_snapshot() and tracing_buffers_mmap() are mutually
> exclusive and already serialized by the trace mutex.
>
> So how about I completely remove those WRITE_ONCE/READ_ONCE and just rely on the
> protection given in trace/trace.c instead of duplicating it in
> trace/ring_buffer.c?

I would add a comment and replace the READ_ONCE with WARN_ON_ONCE():

/* It is up to the callers to not swap mapped buffers */
if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) {
	ret = -EBUSY;
	goto out;
}


-- Steve

2024-02-05 09:44:18

by Vincent Donnefort

[permalink] [raw]
Subject: Re: [PATCH v13 2/6] ring-buffer: Introducing ring-buffer mapping functions

On Sat, Feb 03, 2024 at 07:33:51PM -0500, Steven Rostedt wrote:
> On Mon, 29 Jan 2024 14:27:58 +0000
> Vincent Donnefort <[email protected]> wrote:
>
> > --- /dev/null
> > +++ b/include/uapi/linux/trace_mmap.h
> > @@ -0,0 +1,43 @@
> > +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
> > +#ifndef _TRACE_MMAP_H_
> > +#define _TRACE_MMAP_H_
> > +
> > +#include <linux/types.h>
> > +
> > +/**
> > + * struct trace_buffer_meta - Ring-buffer Meta-page description
> > + * @meta_page_size: Size of this meta-page.
> > + * @meta_struct_len: Size of this structure.
> > + * @subbuf_size: Size of each subbuf, including the header.
>
> That is "the header"?

I mean the header of the "bpage". But that is confusing. I'll either remove that
mention or just say it is aligned with the order of a system page size.
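
i.e. something along these lines (placeholder wording only):

 * @subbuf_size:	Size of each sub-buffer, aligned with the order of a system page size.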

>
> > + * @nr_subbufs: Number of subbfs in the ring-buffer.
> > + * @reader.lost_events: Number of events lost at the time of the reader swap.
> > + * @reader.id: subbuf ID of the current reader. From 0 to @nr_subbufs - 1
> > + * @reader.read: Number of bytes read on the reader subbuf.
>
> Note, flags needs a comment.
>
> > + * @entries: Number of entries in the ring-buffer.
> > + * @overrun: Number of entries lost in the ring-buffer.
> > + * @read: Number of entries that have been read.
>
> So do the two Reserved words; otherwise I'm going to start getting
> error reports about sparse warnings that check kerneldoc comments
> against their content. Every field needs to be represented in the
> kerneldoc comment.

Ack, wasn't sure it was needed.
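
Will add something along these lines in the next version (wording is only a
placeholder for now):

 * @flags:	Placeholder for binary flags; no values defined yet.
 * @Reserved1:	Reserved for future use.
 * @Reserved2:	Reserved for future use.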

>
> -- Steve
>
>
> > + */
> > +struct trace_buffer_meta {
> > + __u32 meta_page_size;
> > + __u32 meta_struct_len;
> > +
> > + __u32 subbuf_size;
> > + __u32 nr_subbufs;
> > +
> > + struct {
> > + __u64 lost_events;
> > + __u32 id;
> > + __u32 read;
> > + } reader;
> > +
> > + __u64 flags;
> > +
> > + __u64 entries;
> > + __u64 overrun;
> > + __u64 read;
> > +
> > + __u64 Reserved1;
> > + __u64 Reserved2;
> > +};
> > +
> > +#endif /* _TRACE_MMAP_H_ */

2024-02-05 10:26:37

by Steven Rostedt

[permalink] [raw]
Subject: Re: [PATCH v13 3/6] tracing: Add snapshot refcount

On Tue, 30 Jan 2024 10:32:45 +0000
Vincent Donnefort <[email protected]> wrote:

> > All errors (new ones prefixed by >>):
> >
> > kernel/trace/trace.c: In function 'tracing_set_tracer':
> > kernel/trace/trace.c:6644:17: error: implicit declaration of function 'tracing_disarm_snapshot_locked'; did you mean 'tracing_disarm_snapshot'? [-Werror=implicit-function-declaration]
> > 6644 | tracing_disarm_snapshot_locked(tr);
> > | ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> > | tracing_disarm_snapshot
> > >> kernel/trace/trace.c:6648:23: error: implicit declaration of function 'tracing_arm_snapshot_locked'; did you mean 'tracing_arm_snapshot'? [-Werror=implicit-function-declaration]
> > 6648 | ret = tracing_arm_snapshot_locked(tr);
> > | ^~~~~~~~~~~~~~~~~~~~~~~~~~~
> > | tracing_arm_snapshot
> > cc1: some warnings being treated as errors
>
> Right, two tracers (hwlat and osnoise) select _only_ MAX_TRACE and not
> TRACER_SNAPSHOT.
>
> However, AFAICT, they will not call any of the swapping functions (they don't
> set use_max_tr). So I suppose arm/disarm can be ommited in that case.

Yeah, if you can test with the various configs enabled and disabled to
make sure that it still builds properly, then that should be good.

I should make sure that my own ktest config that I use to run tests
checks these variations too.
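
FWIW, the simplest way I can think of to keep a CONFIG_TRACER_MAX_TRACE-only
build happy would be a pair of stubs like this (just a sketch, not necessarily
how it should be done in the series):

#ifdef CONFIG_TRACER_SNAPSHOT
int tracing_arm_snapshot_locked(struct trace_array *tr);
void tracing_disarm_snapshot_locked(struct trace_array *tr);
#else
/* hwlat/osnoise never set use_max_tr, so these are never reached */
static inline int tracing_arm_snapshot_locked(struct trace_array *tr)
{
	return 0;
}
static inline void tracing_disarm_snapshot_locked(struct trace_array *tr) { }
#endif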

-- Steve