2023-12-18 15:16:51

by Vincent Donnefort

Subject: [PATCH v7 0/2] Introducing trace buffer mapping by user-space

The tracing ring-buffers can be stored on disk or sent over the network
without any copy, via splice. However, the latter does not allow real-time
processing of the traces. A solution is to give user-space direct access
to the ring-buffer pages via a mapping. An application can then become a
consumer of the ring-buffer, in a similar fashion to what trace_pipe
offers.

Attached to this cover letter is an example of a consuming read of a
ring-buffer, using libtracefs.

Vincent

v6 -> v7:
* Rebase onto lore.kernel.org/lkml/[email protected]/
* Support for subbufs
* Rename subbufs into bpages

v5 -> v6:
* Rebase on next-20230802.
* (unsigned long) -> (void *) cast for virt_to_page().
* Add a wait for the GET_READER_PAGE ioctl.
* Move the writer fields update (overrun/pages_lost/entries/pages_touched)
into the irq_work.
* Rearrange id in struct buffer_page.
* Rearrange the meta-page.
* ring_buffer_meta_page -> trace_buffer_meta_page.
* Add meta_struct_len into the meta-page.

v4 -> v5:
* Trivial rebase onto 6.5-rc3 (previously 6.4-rc3)

v3 -> v4:
* Add to the meta-page:
  - pages_lost / pages_read (allow computing how full the ring-buffer
    is; see the sketch below)
  - read (allows computing how many entries can be read)
  - A reader_page struct.
* Rename ring_buffer_meta_header -> ring_buffer_meta
* Rename ring_buffer_get_reader_page -> ring_buffer_map_get_reader_page
* Properly consume events on ring_buffer_map_get_reader_page() with
rb_advance_reader().
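
A hedged sketch of how those meta-page counters can be combined to
estimate fullness, mirroring the kernel's dirty-page accounting in
full_hit() (field names are the v7 ones from struct trace_buffer_meta):

/*
 * Sketch only: a bpage is considered dirty once touched by the writer,
 * unless it was lost to an overwrite or already read.
 */
static inline unsigned int meta_full_percent(const struct trace_buffer_meta *meta)
{
	unsigned long dirty = 0;

	if (meta->bpages_touched > meta->bpages_lost + meta->bpages_read)
		dirty = meta->bpages_touched - meta->bpages_lost -
			meta->bpages_read;

	return meta->nr_bpages ? dirty * 100 / meta->nr_bpages : 0;
}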

v2 -> v3:
* Remove data page list (for non-consuming read)
** Implies removing order > 0 meta-page
* Add a new meta page field ->read
* Rename ring_buffer_meta_page_header into ring_buffer_meta_header

v1 -> v2:
* Hide data_pages from the userspace struct
* Fix META_PAGE_MAX_PAGES
* Support for order > 0 meta-page
* Add missing page->mapping.

---
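
A hedged build preamble for the example below (not part of the original
posting): the includes and fallback definitions the example is assumed
to rely on. Header names and paths depend on how libtracefs and
libtraceevent are installed.

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/mman.h>

#include <event-parse.h>	/* libtraceevent: tep_*() */
#include <kbuffer.h>		/* libtraceevent: kbuffer_*() */
#include <trace-seq.h>		/* libtraceevent: trace_seq_*() */
#include <tracefs.h>		/* libtracefs: tracefs_*() */

#ifndef READ_ONCE
#define READ_ONCE(x)	(*(volatile typeof(x) *)&(x))
#endif

/* Mirrors the ioctl added by this series in include/uapi/linux/trace_mmap.h */
#ifndef TRACE_MMAP_IOCTL_GET_READER
#define TRACE_MMAP_IOCTL_GET_READER	_IO('T', 0x1)
#endif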

/* Need to access private struct to save counters */
struct kbuffer {
unsigned long long timestamp;
long long lost_events;
unsigned long flags;
void *subbuffer;
void *data;
unsigned int index;
unsigned int curr;
unsigned int next;
unsigned int size;
unsigned int start;
unsigned int first;

unsigned int (*read_4)(void *ptr);
unsigned long long (*read_8)(void *ptr);
unsigned long long (*read_long)(struct kbuffer *kbuf, void *ptr);
int (*next_event)(struct kbuffer *kbuf);
};

struct trace_buffer_meta {
unsigned long entries;
unsigned long overrun;
unsigned long read;

unsigned long bpages_touched;
unsigned long bpages_lost;
unsigned long bpages_read;

struct {
unsigned long lost_events; /* Events lost at the time of the reader swap */
__u32 id; /* Reader bpage ID from 0 to nr_bpages - 1 */
__u32 read; /* Number of bytes read on the reader bpage */
} reader;

__u32 bpage_size; /* Size of each buffer page including the header */
__u32 nr_bpages; /* Number of buffer pages in the ring-buffer */

__u32 meta_page_size; /* Size of the meta-page */
__u32 meta_struct_len; /* Len of this struct */
};

static char *argv0;
static bool exit_requested;

static char *get_this_name(void)
{
static char *this_name;
char *arg;
char *p;

if (this_name)
return this_name;

arg = argv0;
p = arg+strlen(arg);

while (p >= arg && *p != '/')
p--;
p++;

this_name = p;
return p;
}

static void __vdie(const char *fmt, va_list ap, int err)
{
int ret = errno;
char *p = get_this_name();

if (err && errno)
perror(p);
else
ret = -1;

fprintf(stderr, " ");
vfprintf(stderr, fmt, ap);

fprintf(stderr, "\n");
exit(ret);
}

void pdie(const char *fmt, ...)
{
va_list ap;

va_start(ap, fmt);
__vdie(fmt, ap, 1);
va_end(ap);
}

static void read_page(struct tep_handle *tep, struct kbuffer *kbuf)
{
static struct trace_seq seq;
struct tep_record record;

if (seq.buffer)
trace_seq_reset(&seq);
else
trace_seq_init(&seq);

while ((record.data = kbuffer_read_event(kbuf, &record.ts))) {
record.size = kbuffer_event_size(kbuf);
kbuffer_next_event(kbuf, NULL);
tep_print_event(tep, &seq, &record,
"%s-%d %9d\t%s: %s", TEP_PRINT_COMM,
TEP_PRINT_PID, TEP_PRINT_TIME, TEP_PRINT_NAME,
TEP_PRINT_INFO);
trace_seq_do_printf(&seq);
trace_seq_reset(&seq);
}
}

static int next_reader_subbuf(int fd, struct trace_buffer_meta *meta, unsigned long *read)
{
__u32 prev_read, prev_reader, new_reader;

prev_read = READ_ONCE(meta->reader.read);
prev_reader = READ_ONCE(meta->reader.id);
if (ioctl(fd, TRACE_MMAP_IOCTL_GET_READER) < 0)
pdie("ioctl");
new_reader = READ_ONCE(meta->reader.id);

if (prev_reader != new_reader) {
*read = 0;
printf("NEW READER PAGE: %d\n", new_reader);
} else
*read = prev_read;

return new_reader;
}

static void signal_handler(int unused)
{
exit_requested = true;
}

int main(int argc, char **argv)
{
int page_size, meta_len, data_len, subbuf, fd;
struct trace_buffer_meta *map;
struct tep_handle *tep;
struct kbuffer *kbuf;
unsigned long read;
void *meta, *data;
char path[32];
int cpu;

if (argc != 2)
return -EINVAL;

argv0 = argv[0];
cpu = atoi(argv[1]);
snprintf(path, 32, "per_cpu/cpu%d/trace_pipe_raw", cpu);

tep = tracefs_local_events(NULL);
kbuf = tep_kbuffer(tep);
page_size = getpagesize();

fd = tracefs_instance_file_open(NULL, path, O_RDONLY);
if (fd < 0)
pdie("raw");

meta = mmap(NULL, page_size, PROT_READ, MAP_SHARED, fd, 0);
if (meta == MAP_FAILED)
pdie("mmap");
map = (struct trace_buffer_meta *)meta;
meta_len = map->meta_page_size;

printf("entries: %lu\n", map->entries);
printf("overrun: %lu\n", map->overrun);
printf("read: %lu\n", map->read);
printf("bpages_touched: %lu\n", map->bpages_touched);
printf("bpages_lost: %lu\n", map->bpages_lost);
printf("bpages_read: %lu\n", map->bpages_read);
printf("bpage_size: %u\n", map->bpage_size);
printf("nr_bpages: %u\n", map->nr_bpages);

data_len = map->bpage_size * map->nr_bpages;
data = mmap(NULL, data_len, PROT_READ, MAP_SHARED, fd, meta_len);
if (data == MAP_FAILED)
pdie("mmap data");

signal(SIGINT, signal_handler);

while (!exit_requested) {
subbuf = next_reader_subbuf(fd, map, &read);
kbuffer_load_subbuffer(kbuf, data + map->bpage_size * subbuf);
while (kbuf->curr < read)
kbuffer_next_event(kbuf, NULL);

read_page(tep, kbuf);
}

munmap(data, data_len);
munmap(meta, page_size);
close(fd);

return 0;
}

Vincent Donnefort (2):
ring-buffer: Introducing ring-buffer mapping functions
tracing: Allow user-space mapping of the ring-buffer

include/linux/ring_buffer.h | 7 +
include/uapi/linux/trace_mmap.h | 31 +++
kernel/trace/ring_buffer.c | 373 +++++++++++++++++++++++++++++++-
kernel/trace/trace.c | 79 ++++++-
4 files changed, 486 insertions(+), 4 deletions(-)
create mode 100644 include/uapi/linux/trace_mmap.h

--
2.43.0.472.g3155946c3a-goog



2023-12-18 15:17:53

by Vincent Donnefort

Subject: [PATCH v7 1/2] ring-buffer: Introducing ring-buffer mapping functions

In preparation for allowing user-space to map a ring-buffer, add
a set of mapping functions:

ring_buffer_{map,unmap}()
ring_buffer_map_fault()

And controls on the ring-buffer:

ring_buffer_map_get_reader() /* swap reader and head */

Mapping the ring-buffer also involves:

A unique ID for each buffer page of the ring-buffer; currently, buffer
pages are only identified through their in-kernel VA.

A meta-page, where ring-buffer statistics and a description of the
current reader are stored.

The linear mapping exposes the meta-page and each bpage of the
ring-buffer, ordered by their unique ID, which is assigned during the
first mapping.

Once mapped, no bpage can get in or out of the ring-buffer: the buffer
size will remain unmodified, and the splice-enabling functions will in
reality simply memcpy the data instead of swapping buffer pages.
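
For illustration, a hedged sketch (not part of the patch) of how a
user-space mapper is expected to locate a bpage inside that linear
mapping, using only the fields exposed by struct trace_buffer_meta:

/*
 * Sketch only: byte offset of the bpage with the given ID within the
 * mapping. The meta-page occupies the first meta_page_size bytes, then
 * the bpages follow, ordered by ID, each spanning bpage_size bytes.
 */
static inline size_t bpage_offset(const struct trace_buffer_meta *meta, __u32 id)
{
	return (size_t)meta->meta_page_size + (size_t)id * meta->bpage_size;
}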

Signed-off-by: Vincent Donnefort <[email protected]>

diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index 929ed54dd651..e77a2685fe52 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -6,6 +6,8 @@
#include <linux/seq_file.h>
#include <linux/poll.h>

+#include <uapi/linux/trace_mmap.h>
+
struct trace_buffer;
struct ring_buffer_iter;

@@ -221,4 +223,9 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
#define trace_rb_cpu_prepare NULL
#endif

+int ring_buffer_map(struct trace_buffer *buffer, int cpu);
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+ unsigned long pgoff);
+int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu);
#endif /* _LINUX_RING_BUFFER_H */
diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
new file mode 100644
index 000000000000..9536f0b7c094
--- /dev/null
+++ b/include/uapi/linux/trace_mmap.h
@@ -0,0 +1,29 @@
+/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
+#ifndef _UAPI_TRACE_MMAP_H_
+#define _UAPI_TRACE_MMAP_H_
+
+#include <linux/types.h>
+
+struct trace_buffer_meta {
+ unsigned long entries;
+ unsigned long overrun;
+ unsigned long read;
+
+ unsigned long bpages_touched;
+ unsigned long bpages_lost;
+ unsigned long bpages_read;
+
+ struct {
+ unsigned long lost_events; /* Events lost at the time of the reader swap */
+ __u32 id; /* Reader bpage ID from 0 to nr_bpages - 1 */
+ __u32 read; /* Number of bytes read on the reader bpage */
+ } reader;
+
+ __u32 bpage_size; /* Size of each buffer page including the header */
+ __u32 nr_bpages; /* Number of buffer pages in the ring-buffer */
+
+ __u32 meta_page_size; /* Size of the meta-page */
+ __u32 meta_struct_len; /* Len of this struct */
+};
+
+#endif /* _UAPI_TRACE_MMAP_H_ */
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index f95ad0f5be1b..2a6307af9c6c 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -335,6 +335,7 @@ struct buffer_page {
local_t write; /* index for next write */
unsigned read; /* index for next read */
local_t entries; /* entries on this page */
+ u32 id; /* ID for external mapping */
unsigned long real_end; /* real end of data */
unsigned order; /* order of the page */
struct buffer_data_page *page; /* Actual data page */
@@ -483,6 +484,12 @@ struct ring_buffer_per_cpu {
u64 read_stamp;
/* pages removed since last reset */
unsigned long pages_removed;
+
+ int mapped;
+ struct mutex mapping_lock;
+ unsigned long *bpage_ids; /* ID to addr */
+ struct trace_buffer_meta *meta_page;
+
/* ring buffer pages to update, > 0 to add, < 0 to remove */
long nr_pages_to_update;
struct list_head new_pages; /* new pages to add */
@@ -760,6 +767,22 @@ static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int f
return (dirty * 100) > (full * nr_pages);
}

+static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ if (unlikely(READ_ONCE(cpu_buffer->mapped))) {
+ /* Ensure the meta_page is ready */
+ smp_rmb();
+ WRITE_ONCE(cpu_buffer->meta_page->entries,
+ local_read(&cpu_buffer->entries));
+ WRITE_ONCE(cpu_buffer->meta_page->overrun,
+ local_read(&cpu_buffer->overrun));
+ WRITE_ONCE(cpu_buffer->meta_page->bpages_touched,
+ local_read(&cpu_buffer->pages_touched));
+ WRITE_ONCE(cpu_buffer->meta_page->bpages_lost,
+ local_read(&cpu_buffer->pages_lost));
+ }
+}
+
/*
* rb_wake_up_waiters - wake up tasks waiting for ring buffer input
*
@@ -769,6 +792,10 @@ static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int f
static void rb_wake_up_waiters(struct irq_work *work)
{
struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
+ struct ring_buffer_per_cpu *cpu_buffer =
+ container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
+
+ rb_update_meta_page(cpu_buffer);

wake_up_all(&rbwork->waiters);
if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
@@ -1562,6 +1589,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
init_waitqueue_head(&cpu_buffer->irq_work.waiters);
init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
+ mutex_init(&cpu_buffer->mapping_lock);

bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu));
@@ -4474,6 +4502,14 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
cpu_buffer->last_overrun = overwrite;
}

+ if (cpu_buffer->mapped) {
+ WRITE_ONCE(cpu_buffer->meta_page->reader.read, 0);
+ WRITE_ONCE(cpu_buffer->meta_page->reader.id, reader->id);
+ WRITE_ONCE(cpu_buffer->meta_page->reader.lost_events, cpu_buffer->lost_events);
+ WRITE_ONCE(cpu_buffer->meta_page->bpages_read,
+ local_read(&cpu_buffer->pages_read));
+ }
+
goto again;

out:
@@ -4541,6 +4577,12 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
length = rb_event_length(event);
cpu_buffer->reader_page->read += length;
cpu_buffer->read_bytes += length;
+ if (cpu_buffer->mapped) {
+ WRITE_ONCE(cpu_buffer->meta_page->reader.read,
+ cpu_buffer->reader_page->read);
+ WRITE_ONCE(cpu_buffer->meta_page->read,
+ cpu_buffer->read);
+ }
}

static void rb_advance_iter(struct ring_buffer_iter *iter)
@@ -5088,6 +5130,19 @@ static void rb_clear_buffer_page(struct buffer_page *page)
page->read = 0;
}

+static void rb_reset_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ struct trace_buffer_meta *meta = cpu_buffer->meta_page;
+
+ WRITE_ONCE(meta->entries, 0);
+ WRITE_ONCE(meta->overrun, 0);
+ WRITE_ONCE(meta->read, cpu_buffer->read);
+ WRITE_ONCE(meta->bpages_touched, 0);
+ WRITE_ONCE(meta->bpages_lost, 0);
+ WRITE_ONCE(meta->bpages_read, local_read(&cpu_buffer->pages_read));
+ WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read);
+}
+
static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
@@ -5132,6 +5187,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
cpu_buffer->lost_events = 0;
cpu_buffer->last_overrun = 0;

+ if (cpu_buffer->mapped)
+ rb_reset_meta_page(cpu_buffer);
+
rb_head_page_activate(cpu_buffer);
cpu_buffer->pages_removed = 0;
}
@@ -5346,6 +5404,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
cpu_buffer_a = buffer_a->buffers[cpu];
cpu_buffer_b = buffer_b->buffers[cpu];

+ if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
+ ret = -EBUSY;
+ goto out;
+ }
+
/* At least make sure the two buffers are somewhat the same */
if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
goto out;
@@ -5609,7 +5672,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
* Otherwise, we can simply swap the page with the one passed in.
*/
if (read || (len < (commit - read)) ||
- cpu_buffer->reader_page == cpu_buffer->commit_page) {
+ cpu_buffer->reader_page == cpu_buffer->commit_page ||
+ cpu_buffer->mapped) {
struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
unsigned int rpos = read;
unsigned int pos = 0;
@@ -5828,6 +5892,11 @@ int ring_buffer_page_order_set(struct trace_buffer *buffer, int order)

cpu_buffer = buffer->buffers[cpu];

+ if (cpu_buffer->mapped) {
+ err = -EBUSY;
+ goto error;
+ }
+
/* Update the number of pages to match the new size */
nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
nr_pages = DIV_ROUND_UP(nr_pages, buffer->bpage_size);
@@ -5929,6 +5998,308 @@ int ring_buffer_page_order_set(struct trace_buffer *buffer, int order)
}
EXPORT_SYMBOL_GPL(ring_buffer_page_order_set);

+#define bpage_subpage(sub_off, start) \
+ virt_to_page((void *)(start + (sub_off << PAGE_SHIFT)))
+
+#define foreach_bpage_subpage(sub_off, bpage_order, start, page) \
+ for (sub_off = 0, page = bpage_subpage(0, start); \
+ sub_off < (1 << bpage_order); \
+ sub_off++, page = bpage_subpage(sub_off, start))
+
+static inline void bpage_map_prepare(unsigned long start, int order)
+{
+ struct page *page;
+ int subpage_off;
+
+ /*
+ * When allocating order > 0 pages, only the first struct page has a
+ * refcount > 1. Increasing the refcount here ensures that none of the
+ * struct pages composing the bpage are freed when the mapping is closed.
+ */
+ foreach_bpage_subpage(subpage_off, order, start, page)
+ page_ref_inc(page);
+}
+
+static inline void bpage_unmap(unsigned long start, int order)
+{
+ struct page *page;
+ int subpage_off;
+
+ foreach_bpage_subpage(subpage_off, order, start, page) {
+ page_ref_dec(page);
+ page->mapping = NULL;
+ }
+}
+
+static void rb_free_bpage_ids(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ int sub_id;
+
+ for (sub_id = 0; sub_id < cpu_buffer->nr_pages + 1; sub_id++)
+ bpage_unmap(cpu_buffer->bpage_ids[sub_id],
+ cpu_buffer->buffer->bpage_order);
+
+ kfree(cpu_buffer->bpage_ids);
+ cpu_buffer->bpage_ids = NULL;
+}
+
+static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ if (cpu_buffer->meta_page)
+ return 0;
+
+ cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER));
+ if (!cpu_buffer->meta_page)
+ return -ENOMEM;
+
+ return 0;
+}
+
+static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ unsigned long addr = (unsigned long)cpu_buffer->meta_page;
+
+ virt_to_page((void *)addr)->mapping = NULL;
+ free_page(addr);
+ cpu_buffer->meta_page = NULL;
+}
+
+static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
+ unsigned long *bpage_ids)
+{
+ struct trace_buffer_meta *meta = cpu_buffer->meta_page;
+ unsigned int nr_bpages = cpu_buffer->nr_pages + 1;
+ struct buffer_page *first_page, *bpage;
+ int id = 0;
+
+ bpage_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
+ bpage_map_prepare(bpage_ids[id], cpu_buffer->buffer->bpage_order);
+ cpu_buffer->reader_page->id = id++;
+
+ first_page = bpage = rb_set_head_page(cpu_buffer);
+ do {
+
+ if (id >= nr_bpages) {
+ WARN_ON(1);
+ break;
+ }
+
+ bpage_ids[id] = (unsigned long)bpage->page;
+ bpage->id = id;
+ bpage_map_prepare(bpage_ids[id], cpu_buffer->buffer->bpage_order);
+
+ rb_inc_page(&bpage);
+ id++;
+ } while (bpage != first_page);
+
+ /* install page ID to kern VA translation */
+ cpu_buffer->bpage_ids = bpage_ids;
+
+ meta->meta_page_size = PAGE_SIZE;
+ meta->meta_struct_len = sizeof(*meta);
+ meta->nr_bpages = nr_bpages;
+ meta->bpage_size = cpu_buffer->buffer->bpage_size + BUF_PAGE_HDR_SIZE;
+ meta->reader.id = cpu_buffer->reader_page->id;
+ rb_reset_meta_page(cpu_buffer);
+}
+
+static inline struct ring_buffer_per_cpu *
+rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
+{
+ struct ring_buffer_per_cpu *cpu_buffer;
+
+ if (!cpumask_test_cpu(cpu, buffer->cpumask))
+ return ERR_PTR(-EINVAL);
+
+ cpu_buffer = buffer->buffers[cpu];
+
+ mutex_lock(&cpu_buffer->mapping_lock);
+
+ if (!cpu_buffer->mapped) {
+ mutex_unlock(&cpu_buffer->mapping_lock);
+ return ERR_PTR(-ENODEV);
+ }
+
+ return cpu_buffer;
+}
+
+static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ mutex_unlock(&cpu_buffer->mapping_lock);
+}
+
+int ring_buffer_map(struct trace_buffer *buffer, int cpu)
+{
+ struct ring_buffer_per_cpu *cpu_buffer;
+ unsigned long flags, *bpage_ids;
+ int err = 0;
+
+ if (!cpumask_test_cpu(cpu, buffer->cpumask))
+ return -EINVAL;
+
+ cpu_buffer = buffer->buffers[cpu];
+
+ mutex_lock(&cpu_buffer->mapping_lock);
+
+ if (cpu_buffer->mapped) {
+ WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
+ goto unlock;
+ }
+
+ /* prevent another thread from changing buffer sizes */
+ mutex_lock(&buffer->mutex);
+
+ err = rb_alloc_meta_page(cpu_buffer);
+ if (err)
+ goto unlock;
+
+ /* bpage_ids include the reader while nr_pages does not */
+ bpage_ids = kzalloc(sizeof(*bpage_ids) * (cpu_buffer->nr_pages + 1),
+ GFP_KERNEL);
+ if (!bpage_ids) {
+ rb_free_meta_page(cpu_buffer);
+ err = -ENOMEM;
+ goto unlock;
+ }
+
+ atomic_inc(&cpu_buffer->resize_disabled);
+
+ /*
+ * Lock all readers to block any page swap until the page IDs are
+ * assigned.
+ */
+ raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+ rb_setup_ids_meta_page(cpu_buffer, bpage_ids);
+ /*
+ * Ensure rb_update_meta_page() will observe the meta-page before
+ * cpu_buffer->mapped.
+ */
+ smp_wmb();
+ WRITE_ONCE(cpu_buffer->mapped, 1);
+
+ /* Init meta_page values unless the writer did it already */
+ cmpxchg(&cpu_buffer->meta_page->entries, 0,
+ local_read(&cpu_buffer->entries));
+ cmpxchg(&cpu_buffer->meta_page->overrun, 0,
+ local_read(&cpu_buffer->overrun));
+ cmpxchg(&cpu_buffer->meta_page->bpages_touched, 0,
+ local_read(&cpu_buffer->pages_touched));
+ cmpxchg(&cpu_buffer->meta_page->bpages_lost, 0,
+ local_read(&cpu_buffer->pages_lost));
+
+ raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+unlock:
+ mutex_unlock(&buffer->mutex);
+ mutex_unlock(&cpu_buffer->mapping_lock);
+
+ return err;
+}
+
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
+{
+ struct ring_buffer_per_cpu *cpu_buffer;
+ int err = 0;
+
+ if (!cpumask_test_cpu(cpu, buffer->cpumask))
+ return -EINVAL;
+
+ cpu_buffer = buffer->buffers[cpu];
+
+ mutex_lock(&cpu_buffer->mapping_lock);
+
+ if (!cpu_buffer->mapped) {
+ err = -ENODEV;
+ goto unlock;
+ }
+
+ WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
+ if (!cpu_buffer->mapped) {
+ /* Wait for the writer and readers to observe !mapped */
+ synchronize_rcu();
+
+ rb_free_bpage_ids(cpu_buffer);
+ rb_free_meta_page(cpu_buffer);
+ atomic_dec(&cpu_buffer->resize_disabled);
+ }
+
+unlock:
+ mutex_unlock(&cpu_buffer->mapping_lock);
+
+ return err;
+}
+
+/*
+ * +--------------+ pgoff == 0
+ * |   meta page  |
+ * +--------------+ pgoff == 1
+ * |   bpage 0    |
+ * +--------------+ pgoff == 1 + (1 << bpage_order)
+ * |   bpage 1    |
+ * ...
+ */
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+ unsigned long pgoff)
+{
+ struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
+ unsigned long bpage_id, bpage_offset, addr;
+ struct page *page;
+
+ if (!pgoff)
+ return virt_to_page((void *)cpu_buffer->meta_page);
+
+ pgoff--;
+
+ bpage_id = pgoff >> buffer->bpage_order;
+ if (bpage_id > cpu_buffer->nr_pages)
+ return NULL;
+
+ bpage_offset = pgoff & ((1UL << buffer->bpage_order) - 1);
+ addr = cpu_buffer->bpage_ids[bpage_id] + (bpage_offset * PAGE_SIZE);
+ page = virt_to_page((void *)addr);
+
+ return page;
+}
+
+int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
+{
+ struct ring_buffer_per_cpu *cpu_buffer;
+ unsigned long reader_size;
+ unsigned long flags;
+
+ cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
+ if (IS_ERR(cpu_buffer))
+ return (int)PTR_ERR(cpu_buffer);
+
+ raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+consume:
+ if (rb_per_cpu_empty(cpu_buffer))
+ goto out;
+
+ reader_size = rb_page_size(cpu_buffer->reader_page);
+
+ /*
+ * There are data to be read on the current reader page, we can
+ * return to the caller. But before that, we assume the latter will read
+ * everything. Let's update the kernel reader accordingly.
+ */
+ if (cpu_buffer->reader_page->read < reader_size) {
+ while (cpu_buffer->reader_page->read < reader_size)
+ rb_advance_reader(cpu_buffer);
+ goto out;
+ }
+
+ if (WARN_ON(!rb_get_reader_page(cpu_buffer)))
+ goto out;
+
+ goto consume;
+out:
+ raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+ rb_put_mapped_buffer(cpu_buffer);
+
+ return 0;
+}
+
/*
* We only allocate new buffers, never free them if the CPU goes down.
* If we were to free the buffer, then the user would lose any trace that was in
--
2.43.0.472.g3155946c3a-goog


2023-12-18 15:18:04

by Vincent Donnefort

Subject: [PATCH v7 2/2] tracing: Allow user-space mapping of the ring-buffer

Currently, user-space extracts data from the ring-buffer via splice,
which is handy for storage or network sharing. However, due to splice
limitations, it is impossible to do real-time analysis without a copy.

A solution to that problem is to let user-space map the ring-buffer
directly.

The mapping is exposed via the per-CPU file trace_pipe_raw. The first
element of the mapping is the meta-page. It is followed by each
bpage constituting the ring-buffer, ordered by their unique page ID:

* Meta-page -- see include/uapi/linux/trace_mmap.h for a description
* buffer-page ID 0
* buffer-page ID 1
...

It is therefore easy to translate a buffer page ID into an offset in the
mapping:

reader_id = meta->reader.id;
reader_offset = meta->meta_page_size + reader_id * meta->bpage_size;

When new data is available, the mapper must call a newly introduced
ioctl: TRACE_MMAP_IOCTL_GET_READER. This updates the meta-page reader ID
to point to the next bpage containing unread data.
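
A minimal, hedged sketch of the expected user-space flow, condensed from
the cover-letter example. consume() is a hypothetical placeholder for
event decoding (e.g. with libtraceevent's kbuffer API), the tracefs
mount point is an assumption, and error handling is omitted:

static void map_and_consume(int cpu)
{
	struct trace_buffer_meta *meta;
	char path[64];
	void *data;
	int fd;

	snprintf(path, sizeof(path),
		 "/sys/kernel/tracing/per_cpu/cpu%d/trace_pipe_raw", cpu);
	fd = open(path, O_RDONLY);

	meta = mmap(NULL, getpagesize(), PROT_READ, MAP_SHARED, fd, 0);
	data = mmap(NULL, (size_t)meta->bpage_size * meta->nr_bpages,
		    PROT_READ, MAP_SHARED, fd, meta->meta_page_size);

	for (;;) {
		__u32 id;

		/* Waits for data unless the fd was opened O_NONBLOCK */
		if (ioctl(fd, TRACE_MMAP_IOCTL_GET_READER) < 0)
			break;

		id = meta->reader.id;
		/*
		 * The reader bpage starts at data + id * bpage_size;
		 * meta->reader.read bytes of it were already consumed.
		 */
		consume(data + (size_t)id * meta->bpage_size,
			meta->reader.read);
	}
}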

Signed-off-by: Vincent Donnefort <[email protected]>

diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
index 9536f0b7c094..e44563cf5ede 100644
--- a/include/uapi/linux/trace_mmap.h
+++ b/include/uapi/linux/trace_mmap.h
@@ -26,4 +26,6 @@ struct trace_buffer_meta {
__u32 meta_struct_len; /* Len of this struct */
};

+#define TRACE_MMAP_IOCTL_GET_READER _IO('T', 0x1)
+
#endif /* _UAPI_TRACE_MMAP_H_ */
diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index c17dd849e6f1..0a4927e56315 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -8590,15 +8590,31 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
return ret;
}

-/* An ioctl call with cmd 0 to the ring buffer file will wake up all waiters */
static long tracing_buffers_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
struct ftrace_buffer_info *info = file->private_data;
struct trace_iterator *iter = &info->iter;
+ int err;

- if (cmd)
- return -ENOIOCTLCMD;
+ if (cmd == TRACE_MMAP_IOCTL_GET_READER) {
+ if (!(file->f_flags & O_NONBLOCK)) {
+ err = ring_buffer_wait(iter->array_buffer->buffer,
+ iter->cpu_file,
+ iter->tr->buffer_percent);
+ if (err)
+ return err;
+ }

+ return ring_buffer_map_get_reader(iter->array_buffer->buffer,
+ iter->cpu_file);
+ } else if (cmd) {
+ return -ENOTTY;
+ }
+
+ /*
+ * An ioctl call with cmd 0 to the ring buffer file will wake up all
+ * waiters
+ */
mutex_lock(&trace_types_lock);

iter->wait_index++;
@@ -8611,6 +8627,62 @@ static long tracing_buffers_ioctl(struct file *file, unsigned int cmd, unsigned
return 0;
}

+static vm_fault_t tracing_buffers_mmap_fault(struct vm_fault *vmf)
+{
+ struct ftrace_buffer_info *info = vmf->vma->vm_file->private_data;
+ struct trace_iterator *iter = &info->iter;
+ vm_fault_t ret = VM_FAULT_SIGBUS;
+ struct page *page;
+
+ page = ring_buffer_map_fault(iter->array_buffer->buffer, iter->cpu_file,
+ vmf->pgoff);
+ if (!page)
+ return ret;
+
+ get_page(page);
+ vmf->page = page;
+ vmf->page->mapping = vmf->vma->vm_file->f_mapping;
+ vmf->page->index = vmf->pgoff;
+
+ return 0;
+}
+
+static void tracing_buffers_mmap_close(struct vm_area_struct *vma)
+{
+ struct ftrace_buffer_info *info = vma->vm_file->private_data;
+ struct trace_iterator *iter = &info->iter;
+
+ ring_buffer_unmap(iter->array_buffer->buffer, iter->cpu_file);
+}
+
+static void tracing_buffers_mmap_open(struct vm_area_struct *vma)
+{
+ struct ftrace_buffer_info *info = vma->vm_file->private_data;
+ struct trace_iterator *iter = &info->iter;
+
+ WARN_ON(ring_buffer_map(iter->array_buffer->buffer, iter->cpu_file));
+}
+
+static const struct vm_operations_struct tracing_buffers_vmops = {
+ .open = tracing_buffers_mmap_open,
+ .close = tracing_buffers_mmap_close,
+ .fault = tracing_buffers_mmap_fault,
+};
+
+static int tracing_buffers_mmap(struct file *filp, struct vm_area_struct *vma)
+{
+ struct ftrace_buffer_info *info = filp->private_data;
+ struct trace_iterator *iter = &info->iter;
+
+ if (vma->vm_flags & VM_WRITE)
+ return -EPERM;
+
+ vm_flags_mod(vma, VM_DONTCOPY | VM_DONTDUMP, VM_MAYWRITE);
+ vma->vm_ops = &tracing_buffers_vmops;
+
+ return ring_buffer_map(iter->array_buffer->buffer, iter->cpu_file);
+}
+
static const struct file_operations tracing_buffers_fops = {
.open = tracing_buffers_open,
.read = tracing_buffers_read,
@@ -8619,6 +8691,7 @@ static const struct file_operations tracing_buffers_fops = {
.splice_read = tracing_buffers_splice_read,
.unlocked_ioctl = tracing_buffers_ioctl,
.llseek = no_llseek,
+ .mmap = tracing_buffers_mmap,
};

static ssize_t
--
2.43.0.472.g3155946c3a-goog


2023-12-18 15:50:02

by Vincent Donnefort

Subject: [PATCH v7 0/2] ring-buffer: Rename sub-buffer into buffer page

A previous series introduced the ability to change the ring-buffer page
size. It also introduced the concept of a sub-buffer, that is, a
contiguous virtual memory space which can now be bigger than the system
page size (4K on most systems). But behind the scenes this is really
just a page with an order > 0, and a struct buffer_page (often referred
to as "bpage") already exists. We therefore have an unnecessary
duplicate: subbuffer == bpage.

Remove all references to sub-buffer and replace them with either bpage
or ring_buffer_page.

Signed-off-by: Vincent Donnefort <[email protected]>

---

I forgot this patch when sending the v7 :-(

diff --git a/Documentation/trace/ftrace.rst b/Documentation/trace/ftrace.rst
index 8571d84d129b..4cd1d89b4ac6 100644
--- a/Documentation/trace/ftrace.rst
+++ b/Documentation/trace/ftrace.rst
@@ -203,26 +203,26 @@ of ftrace. Here is a list of some of the key files:

This displays the total combined size of all the trace buffers.

- buffer_subbuf_size_kb:
-
- This sets or displays the sub buffer size. The ring buffer is broken up
- into several same size "sub buffers". An event can not be bigger than
- the size of the sub buffer. Normally, the sub buffer is the size of the
- architecture's page (4K on x86). The sub buffer also contains meta data
- at the start which also limits the size of an event. That means when
- the sub buffer is a page size, no event can be larger than the page
- size minus the sub buffer meta data.
-
- Note, the buffer_subbuf_size_kb is a way for the user to specify the
- minimum size of the subbuffer. The kernel may make it bigger due to the
- implementation details, or simply fail the operation if the kernel can
- not handle the request.
-
- Changing the sub buffer size allows for events to be larger than the
- page size.
-
- Note: When changing the sub-buffer size, tracing is stopped and any
- data in the ring buffer and the snapshot buffer will be discarded.
+ buffer_page_size_kb:
+
+ This sets or displays the ring-buffer page size. The ring buffer is
+ broken up into several same size "buffer pages". An event can not be
+ bigger than the size of the buffer page. Normally, the buffer page is
+ the size of the architecture's page (4K on x86). The buffer page also
+ contains meta data at the start which also limits the size of an event.
+ That means when the buffer page is a system page size, no event can be
+ larger than the system page size minus the buffer page meta data.
+
+ Note, the buffer_page_size_kb is a way for the user to specify the
+ minimum size for each buffer page. The kernel may make it bigger due to
+ the implementation details, or simply fail the operation if the kernel
+ can not handle the request.
+
+ Changing the ring-buffer page size allows for events to be larger than
+ the system page size.
+
+ Note: When changing the buffer page size, tracing is stopped and any
+ data in the ring buffer and the snapshot buffer will be discarded.

free_buffer:

diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index fa802db216f9..929ed54dd651 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -207,9 +207,9 @@ struct trace_seq;
int ring_buffer_print_entry_header(struct trace_seq *s);
int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s);

-int ring_buffer_subbuf_order_get(struct trace_buffer *buffer);
-int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order);
-int ring_buffer_subbuf_size_get(struct trace_buffer *buffer);
+int ring_buffer_page_order_get(struct trace_buffer *buffer);
+int ring_buffer_page_order_set(struct trace_buffer *buffer, int order);
+int ring_buffer_page_size_get(struct trace_buffer *buffer);

enum ring_buffer_flags {
RB_FL_OVERWRITE = 1 << 0,
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 9b95297339b6..f95ad0f5be1b 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -511,8 +511,8 @@ struct trace_buffer {
struct rb_irq_work irq_work;
bool time_stamp_abs;

- unsigned int subbuf_size;
- unsigned int subbuf_order;
+ unsigned int bpage_size;
+ unsigned int bpage_order;
unsigned int max_data_size;
};

@@ -555,7 +555,7 @@ int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq
trace_seq_printf(s, "\tfield: char data;\t"
"offset:%u;\tsize:%u;\tsigned:%u;\n",
(unsigned int)offsetof(typeof(field), data),
- (unsigned int)buffer->subbuf_size,
+ (unsigned int)buffer->bpage_size,
(unsigned int)is_signed_type(char));

return !trace_seq_has_overflowed(s);
@@ -1488,11 +1488,11 @@ static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
list_add(&bpage->list, pages);

page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags,
- cpu_buffer->buffer->subbuf_order);
+ cpu_buffer->buffer->bpage_order);
if (!page)
goto free_pages;
bpage->page = page_address(page);
- bpage->order = cpu_buffer->buffer->subbuf_order;
+ bpage->order = cpu_buffer->buffer->bpage_order;
rb_init_page(bpage->page);

if (user_thread && fatal_signal_pending(current))
@@ -1572,7 +1572,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)

cpu_buffer->reader_page = bpage;

- page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, cpu_buffer->buffer->subbuf_order);
+ page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, cpu_buffer->buffer->bpage_order);
if (!page)
goto fail_free_reader;
bpage->page = page_address(page);
@@ -1656,13 +1656,13 @@ struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
goto fail_free_buffer;

/* Default buffer page size - one system page */
- buffer->subbuf_order = 0;
- buffer->subbuf_size = PAGE_SIZE - BUF_PAGE_HDR_SIZE;
+ buffer->bpage_order = 0;
+ buffer->bpage_size = PAGE_SIZE - BUF_PAGE_HDR_SIZE;

/* Max payload is buffer page size - header (8bytes) */
- buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2);
+ buffer->max_data_size = buffer->bpage_size - (sizeof(u32) * 2);

- nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
+ nr_pages = DIV_ROUND_UP(size, buffer->bpage_size);
buffer->flags = flags;
buffer->clock = trace_clock_local;
buffer->reader_lock_key = key;
@@ -1981,7 +1981,7 @@ static void update_pages_handler(struct work_struct *work)
* @size: the new size.
* @cpu_id: the cpu buffer to resize
*
- * Minimum size is 2 * buffer->subbuf_size.
+ * Minimum size is 2 * buffer->bpage_size.
*
* Returns 0 on success and < 0 on failure.
*/
@@ -2003,7 +2003,7 @@ int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
!cpumask_test_cpu(cpu_id, buffer->cpumask))
return 0;

- nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
+ nr_pages = DIV_ROUND_UP(size, buffer->bpage_size);

/* we need a minimum of two pages */
if (nr_pages < 2)
@@ -2483,7 +2483,7 @@ static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
unsigned long tail, struct rb_event_info *info)
{
- unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
+ unsigned long bsize = READ_ONCE(cpu_buffer->buffer->bpage_size);
struct buffer_page *tail_page = info->tail_page;
struct ring_buffer_event *event;
unsigned long length = info->length;
@@ -3426,7 +3426,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
tail = write - info->length;

/* See if we shot pass the end of this buffer page */
- if (unlikely(write > cpu_buffer->buffer->subbuf_size)) {
+ if (unlikely(write > cpu_buffer->buffer->bpage_size)) {
check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
return rb_move_tail(cpu_buffer, tail, info);
}
@@ -4355,7 +4355,7 @@ static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
struct buffer_page *reader = NULL;
- unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
+ unsigned long bsize = READ_ONCE(cpu_buffer->buffer->bpage_size);
unsigned long overwrite;
unsigned long flags;
int nr_loops = 0;
@@ -4935,7 +4935,7 @@ ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
return NULL;

/* Holds the entire event: data and meta data */
- iter->event_size = buffer->subbuf_size;
+ iter->event_size = buffer->bpage_size;
iter->event = kmalloc(iter->event_size, flags);
if (!iter->event) {
kfree(iter);
@@ -5054,14 +5054,14 @@ unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu)
{
/*
* Earlier, this method returned
- * buffer->subbuf_size * buffer->nr_pages
+ * buffer->bpage_size * buffer->nr_pages
* Since the nr_pages field is now removed, we have converted this to
* return the per cpu buffer value.
*/
if (!cpumask_test_cpu(cpu, buffer->cpumask))
return 0;

- return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages;
+ return buffer->bpage_size * buffer->buffers[cpu]->nr_pages;
}
EXPORT_SYMBOL_GPL(ring_buffer_size);

@@ -5350,7 +5350,7 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
goto out;

- if (buffer_a->subbuf_order != buffer_b->subbuf_order)
+ if (buffer_a->bpage_order != buffer_b->bpage_order)
goto out;

ret = -EAGAIN;
@@ -5439,7 +5439,7 @@ ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
if (!bpage)
return ERR_PTR(-ENOMEM);

- bpage->order = buffer->subbuf_order;
+ bpage->order = buffer->bpage_order;
cpu_buffer = buffer->buffers[cpu];
local_irq_save(flags);
arch_spin_lock(&cpu_buffer->lock);
@@ -5456,7 +5456,7 @@ ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
goto out;

page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_NORETRY,
- cpu_buffer->buffer->subbuf_order);
+ cpu_buffer->buffer->bpage_order);
if (!page) {
kfree(bpage);
return ERR_PTR(-ENOMEM);
@@ -5494,10 +5494,10 @@ void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu,

/*
* If the page is still in use someplace else, or order of the page
- * is different from the subbuffer order of the buffer -
+ * is different from the bpage order of the buffer -
* we can't reuse it
*/
- if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order)
+ if (page_ref_count(page) > 1 || data_page->order != buffer->bpage_order)
goto out;

local_irq_save(flags);
@@ -5580,7 +5580,7 @@ int ring_buffer_read_page(struct trace_buffer *buffer,

if (!data_page || !data_page->data)
goto out;
- if (data_page->order != buffer->subbuf_order)
+ if (data_page->order != buffer->bpage_order)
goto out;

bpage = data_page->data;
@@ -5703,7 +5703,7 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
/* If there is room at the end of the page to save the
* missed events, then record it there.
*/
- if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
+ if (buffer->bpage_size - commit >= sizeof(missed_events)) {
memcpy(&bpage->data[commit], &missed_events,
sizeof(missed_events));
local_add(RB_MISSED_STORED, &bpage->commit);
@@ -5715,8 +5715,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
/*
* This page may be off to user land. Zero it out here.
*/
- if (commit < buffer->subbuf_size)
- memset(&bpage->data[commit], 0, buffer->subbuf_size - commit);
+ if (commit < buffer->bpage_size)
+ memset(&bpage->data[commit], 0, buffer->bpage_size - commit);

out_unlock:
raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
@@ -5739,19 +5739,19 @@ void *ring_buffer_read_page_data(struct buffer_data_read_page *page)
EXPORT_SYMBOL_GPL(ring_buffer_read_page_data);

/**
- * ring_buffer_subbuf_size_get - get size of the sub buffer.
+ * ring_buffer_page_size_get - get size of the sub buffer.
* @buffer: the buffer to get the sub buffer size from
*
* Returns size of the sub buffer, in bytes.
*/
-int ring_buffer_subbuf_size_get(struct trace_buffer *buffer)
+int ring_buffer_page_size_get(struct trace_buffer *buffer)
{
- return buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
+ return buffer->bpage_size + BUF_PAGE_HDR_SIZE;
}
-EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get);
+EXPORT_SYMBOL_GPL(ring_buffer_page_size_get);

/**
- * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page.
+ * ring_buffer_page_order_get - get order of system sub pages in one buffer page.
* @buffer: The ring_buffer to get the system sub page order from
*
* By default, one ring buffer sub page equals to one system page. This parameter
@@ -5762,17 +5762,17 @@ EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get);
* 0 means the sub buffer size is 1 system page and so forth.
* In case of an error < 0 is returned.
*/
-int ring_buffer_subbuf_order_get(struct trace_buffer *buffer)
+int ring_buffer_page_order_get(struct trace_buffer *buffer)
{
if (!buffer)
return -EINVAL;

- return buffer->subbuf_order;
+ return buffer->bpage_order;
}
-EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get);
+EXPORT_SYMBOL_GPL(ring_buffer_page_order_get);

/**
- * ring_buffer_subbuf_order_set - set the size of ring buffer sub page.
+ * ring_buffer_page_order_set - set the size of ring buffer sub page.
* @buffer: The ring_buffer to set the new page size.
* @order: Order of the system pages in one sub buffer page
*
@@ -5787,7 +5787,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get);
*
* Returns 0 on success or < 0 in case of an error.
*/
-int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
+int ring_buffer_page_order_set(struct trace_buffer *buffer, int order)
{
struct ring_buffer_per_cpu *cpu_buffer;
struct buffer_page *bpage, *tmp;
@@ -5800,15 +5800,15 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
if (!buffer || order < 0)
return -EINVAL;

- if (buffer->subbuf_order == order)
+ if (buffer->bpage_order == order)
return 0;

psize = (1 << order) * PAGE_SIZE;
if (psize <= BUF_PAGE_HDR_SIZE)
return -EINVAL;

- old_order = buffer->subbuf_order;
- old_size = buffer->subbuf_size;
+ old_order = buffer->bpage_order;
+ old_size = buffer->bpage_size;

/* prevent another thread from changing buffer sizes */
mutex_lock(&buffer->mutex);
@@ -5817,8 +5817,8 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
/* Make sure all commits have finished */
synchronize_rcu();

- buffer->subbuf_order = order;
- buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE;
+ buffer->bpage_order = order;
+ buffer->bpage_size = psize - BUF_PAGE_HDR_SIZE;

/* Make sure all new buffers are allocated, before deleting the old ones */
for_each_buffer_cpu(buffer, cpu) {
@@ -5830,7 +5830,7 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)

/* Update the number of pages to match the new size */
nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
- nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
+ nr_pages = DIV_ROUND_UP(nr_pages, buffer->bpage_size);

/* we need a minimum of two pages */
if (nr_pages < 2)
@@ -5907,8 +5907,8 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
return 0;

error:
- buffer->subbuf_order = old_order;
- buffer->subbuf_size = old_size;
+ buffer->bpage_order = old_order;
+ buffer->bpage_size = old_size;

atomic_dec(&buffer->record_disabled);
mutex_unlock(&buffer->mutex);
@@ -5927,7 +5927,7 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)

return err;
}
-EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);
+EXPORT_SYMBOL_GPL(ring_buffer_page_order_set);

/*
* We only allocate new buffers, never free them if the CPU goes down.
diff --git a/kernel/trace/ring_buffer_benchmark.c b/kernel/trace/ring_buffer_benchmark.c
index 008187ebd7fe..b58ced8f4626 100644
--- a/kernel/trace/ring_buffer_benchmark.c
+++ b/kernel/trace/ring_buffer_benchmark.c
@@ -118,7 +118,7 @@ static enum event_status read_page(int cpu)
if (IS_ERR(bpage))
return EVENT_DROPPED;

- page_size = ring_buffer_subbuf_size_get(buffer);
+ page_size = ring_buffer_page_size_get(buffer);
ret = ring_buffer_read_page(buffer, bpage, page_size, cpu, 1);
if (ret >= 0) {
rpage = ring_buffer_read_page_data(bpage);
diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index b35c85edbb49..c17dd849e6f1 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -1269,8 +1269,8 @@ int tracing_alloc_snapshot_instance(struct trace_array *tr)
if (!tr->allocated_snapshot) {

/* Make the snapshot buffer have the same order as main buffer */
- order = ring_buffer_subbuf_order_get(tr->array_buffer.buffer);
- ret = ring_buffer_subbuf_order_set(tr->max_buffer.buffer, order);
+ order = ring_buffer_page_order_get(tr->array_buffer.buffer);
+ ret = ring_buffer_page_order_set(tr->max_buffer.buffer, order);
if (ret < 0)
return ret;

@@ -1293,7 +1293,7 @@ static void free_snapshot(struct trace_array *tr)
* The max_tr ring buffer has some state (e.g. ring->clock) and
* we want preserve it.
*/
- ring_buffer_subbuf_order_set(tr->max_buffer.buffer, 0);
+ ring_buffer_page_order_set(tr->max_buffer.buffer, 0);
ring_buffer_resize(tr->max_buffer.buffer, 1, RING_BUFFER_ALL_CPUS);
set_buffer_entries(&tr->max_buffer, 1);
tracing_reset_online_cpus(&tr->max_buffer);
@@ -8315,7 +8315,7 @@ tracing_buffers_read(struct file *filp, char __user *ubuf,
return -EBUSY;
#endif

- page_size = ring_buffer_subbuf_size_get(iter->array_buffer->buffer);
+ page_size = ring_buffer_page_size_get(iter->array_buffer->buffer);

/* Make sure the spare matches the current sub buffer size */
if (info->spare) {
@@ -8492,7 +8492,7 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
return -EBUSY;
#endif

- page_size = ring_buffer_subbuf_size_get(iter->array_buffer->buffer);
+ page_size = ring_buffer_page_size_get(iter->array_buffer->buffer);
if (*ppos & (page_size - 1))
return -EINVAL;

@@ -9391,7 +9391,7 @@ static const struct file_operations buffer_percent_fops = {
};

static ssize_t
-buffer_subbuf_size_read(struct file *filp, char __user *ubuf, size_t cnt, loff_t *ppos)
+buffer_page_size_read(struct file *filp, char __user *ubuf, size_t cnt, loff_t *ppos)
{
struct trace_array *tr = filp->private_data;
size_t size;
@@ -9399,7 +9399,7 @@ buffer_subbuf_size_read(struct file *filp, char __user *ubuf, size_t cnt, loff_t
int order;
int r;

- order = ring_buffer_subbuf_order_get(tr->array_buffer.buffer);
+ order = ring_buffer_page_order_get(tr->array_buffer.buffer);
size = (PAGE_SIZE << order) / 1024;

r = sprintf(buf, "%zd\n", size);
@@ -9408,8 +9408,8 @@ buffer_subbuf_size_read(struct file *filp, char __user *ubuf, size_t cnt, loff_t
}

static ssize_t
-buffer_subbuf_size_write(struct file *filp, const char __user *ubuf,
- size_t cnt, loff_t *ppos)
+buffer_page_size_write(struct file *filp, const char __user *ubuf,
+ size_t cnt, loff_t *ppos)
{
struct trace_array *tr = filp->private_data;
unsigned long val;
@@ -9434,11 +9434,11 @@ buffer_subbuf_size_write(struct file *filp, const char __user *ubuf,
/* Do not allow tracing while changing the order of the ring buffer */
tracing_stop_tr(tr);

- old_order = ring_buffer_subbuf_order_get(tr->array_buffer.buffer);
+ old_order = ring_buffer_page_order_get(tr->array_buffer.buffer);
if (old_order == order)
goto out;

- ret = ring_buffer_subbuf_order_set(tr->array_buffer.buffer, order);
+ ret = ring_buffer_page_order_set(tr->array_buffer.buffer, order);
if (ret)
goto out;

@@ -9447,10 +9447,10 @@ buffer_subbuf_size_write(struct file *filp, const char __user *ubuf,
if (!tr->allocated_snapshot)
goto out_max;

- ret = ring_buffer_subbuf_order_set(tr->max_buffer.buffer, order);
+ ret = ring_buffer_page_order_set(tr->max_buffer.buffer, order);
if (ret) {
/* Put back the old order */
- cnt = ring_buffer_subbuf_order_set(tr->array_buffer.buffer, old_order);
+ cnt = ring_buffer_page_order_set(tr->array_buffer.buffer, old_order);
if (WARN_ON_ONCE(cnt)) {
/*
* AARGH! We are left with different orders!
@@ -9479,10 +9479,10 @@ buffer_subbuf_size_write(struct file *filp, const char __user *ubuf,
return cnt;
}

-static const struct file_operations buffer_subbuf_size_fops = {
+static const struct file_operations buffer_page_size_fops = {
.open = tracing_open_generic_tr,
- .read = buffer_subbuf_size_read,
- .write = buffer_subbuf_size_write,
+ .read = buffer_page_size_read,
+ .write = buffer_page_size_write,
.release = tracing_release_generic_tr,
.llseek = default_llseek,
};
@@ -9953,8 +9953,8 @@ init_tracer_tracefs(struct trace_array *tr, struct dentry *d_tracer)
trace_create_file("buffer_percent", TRACE_MODE_WRITE, d_tracer,
tr, &buffer_percent_fops);

- trace_create_file("buffer_subbuf_size_kb", TRACE_MODE_WRITE, d_tracer,
- tr, &buffer_subbuf_size_fops);
+ trace_create_file("buffer_page_size_kb", TRACE_MODE_WRITE, d_tracer,
+ tr, &buffer_page_size_fops);

create_trace_options_dir(tr);

--
2.43.0.472.g3155946c3a-goog


2023-12-18 16:32:00

by Steven Rostedt

Subject: Re: [PATCH v7 0/2] ring-buffer: Rename sub-buffer into buffer page

On Mon, 18 Dec 2023 15:46:18 +0000
Vincent Donnefort <[email protected]> wrote:

> Previously was introduced the ability to change the ring-buffer page
> size. It also introduced the concept of sub-buffer that is, a contiguous
> virtual memory space which can now be bigger than the system page size
> (4K on most systems). But behind the scene this is really just a page
> with an order > 0 and a struct buffer_page (often refered as "bpage")
> already exists. We have then an unnecessary duplicate subbuffer ==
> bpage.
>
> Remove all references to sub-buffer and replace them with either bpage
> or ring_buffer_page.

No! I think you misunderstood me before.

I want it the other way around. In Linux, "page" usually means the
architecture page (4096 on x86). If we are going to remove the duplicate of
subbuffer == bpage, remove the "page" part, *not* the subbuffer part. The
"subbuf" part is part of the design, not an implementation detail. The page
part was just an implementation detail that we do not want to expose!

When you say "page" you mean a single page. If you say buffer_page_size,
one will think this is an architecture page, not the subbuffer size.

The ring buffer is broken up into subbuffers. The page is just an
implementation detail. Let's not confuse that with the design.

Thanks,

-- Steve

2023-12-18 16:50:25

by Steven Rostedt

Subject: Re: [PATCH v7 0/2] ring-buffer: Rename sub-buffer into buffer page

On Mon, 18 Dec 2023 11:28:17 -0500
Steven Rostedt <[email protected]> wrote:

> > Remove all references to sub-buffer and replace them with either bpage
> > or ring_buffer_page.

The user interface should not be changed.

But what I would like to have changed (and this will come after all other
changes are complete), is to do the following renaming:

original structure name     new name                variable reference
-----------------------     --------                ------------------
struct buffer_data_page     struct buffer_data      bdata  (was bpage)
struct buffer_page          struct buffer_subbuf    subbuf (was also bpage)

Also change:

struct buffer_page {
[..]
struct buffer_data_page *page; /* Actual data page */
};

Into:

struct buffer_subbuf {
[..]
struct buffer_data *data; /* Actual data */
};



Then we can do a global rename of functions like:

free_buffer_page() -> free_buffer_subbuf()

And things like rb_init_page() into rb_init_subbuf()

__rb_allocate_pages() -> rb_allocate_subbufs()

etc.

This should be broken up into steps of changes:

1. Rename the struct buffer_data_page and bpage->bdata
2. Rename the struct buffer_page and bpage->subbuf
3. Rename static functions

And I just realized that the two functions exposed outside of the file are
not used anywhere else. We can remove one and make the other one static (as
it is used within the file).

ring_buffer_nr_pages() -> delete
ring_buffer_nr_dirty_pages -> convert to static rb_nr_dirty_pages()


Thanks,

-- Steve