In preparation for allowing the user-space to map a ring-buffer, add
a set of mapping functions:
ring_buffer_{map,unmap}()
ring_buffer_map_fault()
And controls on the ring-buffer:
ring_buffer_map_get_reader_page() /* swap reader and head */
Mapping the ring-buffer also involves:
A unique ID for each page of the ring-buffer, as currently the pages
are only identified through their in-kernel VA.
A meta-page, where statistics about the ring-buffer are stored along
with an ordered list of page IDs. One field gives which page is the
reader page and another gives where the ring-buffer starts in the list
of data pages.
The linear mapping exposes the meta-page, and each page of the
ring-buffer, ordered following their unique ID, assigned during the
first mapping.
Once mapped, no page can get in or out of the ring-buffer: the buffer
size will remain unmodified and the splice enabling functions will in
reality simply memcpy the data instead of swapping pages.
Signed-off-by: Vincent Donnefort <[email protected]>
diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index 782e14f62201..980abfbd92ed 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -6,6 +6,8 @@
#include <linux/seq_file.h>
#include <linux/poll.h>
+#include <uapi/linux/trace_mmap.h>
+
struct trace_buffer;
struct ring_buffer_iter;
@@ -211,4 +213,9 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
#define trace_rb_cpu_prepare NULL
#endif
+int ring_buffer_map(struct trace_buffer *buffer, int cpu);
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+ unsigned long pgoff);
+int ring_buffer_map_get_reader_page(struct trace_buffer *buffer, int cpu);
#endif /* _LINUX_RING_BUFFER_H */
diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
new file mode 100644
index 000000000000..653176cc50bc
--- /dev/null
+++ b/include/uapi/linux/trace_mmap.h
@@ -0,0 +1,26 @@
+/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
+#ifndef _UAPI_TRACE_MMAP_H_
+#define _UAPI_TRACE_MMAP_H_
+
+#include <linux/types.h>
+
+/*
+ * Layout of the meta-page that is exposed to user-space when a per-CPU
+ * ring-buffer is mapped. The kernel mirrors its internal per-CPU counters
+ * into these fields.
+ */
+struct ring_buffer_meta {
+ /* Mirrors of the per-CPU buffer counters of the same name */
+ unsigned long entries;
+ unsigned long overrun;
+ unsigned long read;
+
+ /* Mirrors of the per-CPU page counters of the same name */
+ unsigned long pages_touched;
+ unsigned long pages_lost;
+ unsigned long pages_read;
+
+ /* Set to PAGE_SIZE when the buffer is first mapped */
+ __u32 meta_page_size;
+ __u32 nr_data_pages; /* Number of pages in the ring-buffer */
+
+ /* State of the current reader page, refreshed on each reader swap */
+ struct reader_page {
+ __u32 id; /* Reader page ID from 0 to nr_data_pages - 1 */
+ __u32 read; /* Number of bytes read on the reader page */
+ unsigned long lost_events; /* Events lost at the time of the reader swap */
+ } reader_page;
+};
+
+#endif /* _UAPI_TRACE_MMAP_H_ */
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index de061dd47313..8f367fd3dbdd 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -332,6 +332,7 @@ struct buffer_page {
local_t entries; /* entries on this page */
unsigned long real_end; /* real end of data */
struct buffer_data_page *page; /* Actual data page */
+ u32 id; /* ID for external mapping */
};
/*
@@ -523,6 +524,12 @@ struct ring_buffer_per_cpu {
rb_time_t before_stamp;
u64 event_stamp[MAX_NEST];
u64 read_stamp;
+
+ int mapped;
+ struct mutex mapping_lock;
+ unsigned long *page_ids; /* ID to addr */
+ struct ring_buffer_meta *meta_page;
+
/* ring buffer pages to update, > 0 to add, < 0 to remove */
long nr_pages_to_update;
struct list_head new_pages; /* new pages to add */
@@ -1562,6 +1569,13 @@ static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
/* Again, either we update tail_page or an interrupt does */
(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
}
+
+ if (READ_ONCE(cpu_buffer->mapped)) {
+ /* Ensure the meta_page is ready */
+ smp_rmb();
+ WRITE_ONCE(cpu_buffer->meta_page->pages_touched,
+ local_read(&cpu_buffer->pages_touched));
+ }
}
static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
@@ -1725,6 +1739,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
init_waitqueue_head(&cpu_buffer->irq_work.waiters);
init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
+ mutex_init(&cpu_buffer->mapping_lock);
bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu));
@@ -2521,6 +2536,15 @@ rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
local_inc(&cpu_buffer->pages_lost);
+ if (READ_ONCE(cpu_buffer->mapped)) {
+ /* Ensure the meta_page is ready */
+ smp_rmb();
+ WRITE_ONCE(cpu_buffer->meta_page->overrun,
+ local_read(&cpu_buffer->overrun));
+ WRITE_ONCE(cpu_buffer->meta_page->pages_lost,
+ local_read(&cpu_buffer->pages_lost));
+ }
+
/*
* The entries will be zeroed out when we move the
* tail page.
@@ -3183,6 +3207,14 @@ static inline void rb_event_discard(struct ring_buffer_event *event)
static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
local_inc(&cpu_buffer->entries);
+
+ if (READ_ONCE(cpu_buffer->mapped)) {
+ /* Ensure the meta_page is ready */
+ smp_rmb();
+ WRITE_ONCE(cpu_buffer->meta_page->entries,
+ local_read(&cpu_buffer->entries));
+ }
+
rb_end_commit(cpu_buffer);
}
@@ -3486,7 +3518,7 @@ static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
return;
/*
- * If this interrupted another event,
+ * If this interrupted another event,
*/
if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
goto out;
@@ -4658,6 +4690,13 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
cpu_buffer->last_overrun = overwrite;
}
+ if (cpu_buffer->mapped) {
+ WRITE_ONCE(cpu_buffer->meta_page->reader_page.read, 0);
+ WRITE_ONCE(cpu_buffer->meta_page->reader_page.id, reader->id);
+ WRITE_ONCE(cpu_buffer->meta_page->reader_page.lost_events, cpu_buffer->lost_events);
+ WRITE_ONCE(cpu_buffer->meta_page->pages_read, local_read(&cpu_buffer->pages_read));
+ }
+
goto again;
out:
@@ -4724,6 +4763,13 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
length = rb_event_length(event);
cpu_buffer->reader_page->read += length;
+
+ if (cpu_buffer->mapped) {
+ WRITE_ONCE(cpu_buffer->meta_page->reader_page.read,
+ cpu_buffer->reader_page->read);
+ WRITE_ONCE(cpu_buffer->meta_page->read,
+ cpu_buffer->read);
+ }
}
static void rb_advance_iter(struct ring_buffer_iter *iter)
@@ -5253,6 +5299,19 @@ static void rb_clear_buffer_page(struct buffer_page *page)
page->read = 0;
}
+/*
+ * Bring the statistics of the meta-page back to their reset state:
+ * entries, overrun, pages_touched and pages_lost are zeroed, while read,
+ * pages_read and reader_page.read are copied from @cpu_buffer.
+ *
+ * meta_page_size, nr_data_pages, reader_page.id and
+ * reader_page.lost_events are left untouched.
+ */
+static void rb_reset_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ struct ring_buffer_meta *meta = cpu_buffer->meta_page;
+
+ WRITE_ONCE(meta->entries, 0);
+ WRITE_ONCE(meta->overrun, 0);
+ WRITE_ONCE(meta->read, cpu_buffer->read);
+ WRITE_ONCE(meta->pages_touched, 0);
+ WRITE_ONCE(meta->pages_lost, 0);
+ WRITE_ONCE(meta->pages_read, local_read(&cpu_buffer->pages_read));
+ WRITE_ONCE(meta->reader_page.read, cpu_buffer->reader_page->read);
+}
+
static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
@@ -5297,6 +5356,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
cpu_buffer->lost_events = 0;
cpu_buffer->last_overrun = 0;
+ if (cpu_buffer->mapped)
+ rb_reset_meta_page(cpu_buffer);
+
rb_head_page_activate(cpu_buffer);
}
@@ -5511,6 +5573,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
cpu_buffer_a = buffer_a->buffers[cpu];
cpu_buffer_b = buffer_b->buffers[cpu];
+ if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
+ ret = -EBUSY;
+ goto out;
+ }
+
/* At least make sure the two buffers are somewhat the same */
if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
goto out;
@@ -5753,7 +5820,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
* Otherwise, we can simply swap the page with the one passed in.
*/
if (read || (len < (commit - read)) ||
- cpu_buffer->reader_page == cpu_buffer->commit_page) {
+ cpu_buffer->reader_page == cpu_buffer->commit_page ||
+ cpu_buffer->mapped) {
struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
unsigned int rpos = read;
unsigned int pos = 0;
@@ -5870,6 +5938,255 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);
+/*
+ * Clear the ->mapping of every page referenced by the ID table (the
+ * nr_pages data pages plus the reader page), then release the table.
+ */
+static void rb_free_page_ids(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	int i;
+
+	/*
+	 * page_ids[] stores kernel VAs as unsigned long: virt_to_page() takes
+	 * a pointer on some architectures (e.g. arm), so cast explicitly.
+	 */
+	for (i = 0; i < cpu_buffer->nr_pages + 1; i++)
+		virt_to_page((void *)cpu_buffer->page_ids[i])->mapping = NULL;
+
+	kfree(cpu_buffer->page_ids);
+	cpu_buffer->page_ids = NULL;
+}
+
+/*
+ * Allocate the meta-page of @cpu_buffer, unless it already exists.
+ *
+ * Returns 0 on success (or if the page was already allocated), -ENOMEM if
+ * the page allocation failed.
+ */
+static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	struct page *page;
+
+	if (cpu_buffer->meta_page)
+		return 0;
+
+	/*
+	 * The page is handed out to user-space: zero it so no stale kernel
+	 * data leaks through the part not covered by the meta structure.
+	 */
+	page = alloc_page(GFP_USER | __GFP_ZERO);
+	if (!page)
+		return -ENOMEM;
+
+	/*
+	 * Convert only after the NULL check: page_to_virt(NULL) does not
+	 * yield NULL, so testing the converted address would miss a failure.
+	 */
+	cpu_buffer->meta_page = page_to_virt(page);
+
+	return 0;
+}
+
+/*
+ * Clear the ->mapping of the meta-page, free it and forget about it.
+ */
+static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	/*
+	 * virt_to_page() takes a pointer on some architectures (e.g. arm)
+	 * while free_page() takes an unsigned long: give each what it expects.
+	 */
+	virt_to_page(cpu_buffer->meta_page)->mapping = NULL;
+	free_page((unsigned long)cpu_buffer->meta_page);
+	cpu_buffer->meta_page = NULL;
+}
+
+/*
+ * Assign an ID to every page of @cpu_buffer, record the ID to kernel VA
+ * translation in @page_ids and initialise the meta-page accordingly.
+ *
+ * @page_ids must have room for nr_pages + 1 entries. ring_buffer_map()
+ * calls this with the reader_lock held.
+ */
+static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
+ unsigned long *page_ids)
+{
+ struct ring_buffer_meta *meta = cpu_buffer->meta_page;
+ unsigned int nr_data_pages = cpu_buffer->nr_pages + 1;
+ struct buffer_page *first_page, *bpage;
+ int id = 0;
+
+ /* The current reader page always gets ID 0 */
+ page_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
+ cpu_buffer->reader_page->id = id++;
+
+ /* The remaining IDs follow the ring order, starting at the head page */
+ first_page = bpage = rb_set_head_page(cpu_buffer);
+ do {
+ /* More pages than expected: do not write past the end of page_ids */
+ if (id >= nr_data_pages) {
+ WARN_ON(1);
+ break;
+ }
+
+ page_ids[id] = (unsigned long)bpage->page;
+ bpage->id = id;
+
+ rb_inc_page(&bpage);
+ id++;
+ } while (bpage != first_page);
+
+ /* install page ID to kern VA translation */
+ cpu_buffer->page_ids = page_ids;
+
+ meta->meta_page_size = PAGE_SIZE;
+ meta->nr_data_pages = nr_data_pages;
+ meta->reader_page.id = cpu_buffer->reader_page->id;
+ rb_reset_meta_page(cpu_buffer);
+}
+
+/*
+ * Look up the per-CPU buffer of @cpu and take its mapping_lock.
+ *
+ * On success the buffer is returned with mapping_lock held; the caller
+ * releases it with rb_put_mapped_buffer(). Returns ERR_PTR(-EINVAL) if
+ * @cpu is not part of the buffer cpumask, or ERR_PTR(-ENODEV) (lock
+ * released) if the buffer is not mapped.
+ */
+static inline struct ring_buffer_per_cpu *
+rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return ERR_PTR(-EINVAL);
+
+	cpu_buffer = buffer->buffers[cpu];
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	/* Mapped: hand the buffer over with the lock still held */
+	if (cpu_buffer->mapped)
+		return cpu_buffer;
+
+	mutex_unlock(&cpu_buffer->mapping_lock);
+
+	return ERR_PTR(-ENODEV);
+}
+
+/*
+ * Release the mapping_lock taken by a successful rb_get_mapped_buffer().
+ */
+static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ mutex_unlock(&cpu_buffer->mapping_lock);
+}
+
+/*
+ * ring_buffer_map - prepare the per-CPU buffer of @cpu for a mapping
+ * @buffer: the trace buffer
+ * @cpu: the CPU whose ring-buffer is mapped
+ *
+ * The first call allocates the meta-page and the page ID table, assigns an
+ * ID to every page and disables resizing. Further calls only take an
+ * additional reference, dropped with ring_buffer_unmap().
+ *
+ * Returns 0 on success, -EINVAL if @cpu is not part of the buffer cpumask,
+ * -EBUSY if the reference counter would overflow, -ENOMEM on allocation
+ * failure.
+ */
+int ring_buffer_map(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	unsigned long flags, *page_ids;
+	int err = 0;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return -EINVAL;
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	if (cpu_buffer->mapped) {
+		/* Never let the reference counter wrap */
+		if (cpu_buffer->mapped == INT_MAX)
+			err = -EBUSY;
+		else
+			WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
+		goto unlock;
+	}
+
+	/* prevent another thread from changing buffer sizes */
+	mutex_lock(&buffer->mutex);
+	atomic_inc(&cpu_buffer->resize_disabled);
+	mutex_unlock(&buffer->mutex);
+
+	err = rb_alloc_meta_page(cpu_buffer);
+	if (err)
+		goto err_enable_resize;
+
+	/*
+	 * page_ids include the reader page while nr_pages does not.
+	 * kcalloc() checks the count * size multiplication for overflow.
+	 */
+	page_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*page_ids),
+			   GFP_KERNEL);
+	if (!page_ids) {
+		err = -ENOMEM;
+		goto err_free_meta;
+	}
+
+	/*
+	 * Lock all readers to block any page swap until the page IDs are
+	 * assigned.
+	 */
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+	rb_setup_ids_meta_page(cpu_buffer, page_ids);
+	/*
+	 * Ensure the writer will observe the meta-page before
+	 * cpu_buffer->mapped.
+	 */
+	smp_wmb();
+	WRITE_ONCE(cpu_buffer->mapped, 1);
+
+	/* Init meta_page values unless the writer did it already */
+	cmpxchg(&cpu_buffer->meta_page->entries, 0,
+		local_read(&cpu_buffer->entries));
+	cmpxchg(&cpu_buffer->meta_page->overrun, 0,
+		local_read(&cpu_buffer->overrun));
+	cmpxchg(&cpu_buffer->meta_page->pages_touched, 0,
+		local_read(&cpu_buffer->pages_touched));
+	cmpxchg(&cpu_buffer->meta_page->pages_lost, 0,
+		local_read(&cpu_buffer->pages_lost));
+
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+	goto unlock;
+
+err_free_meta:
+	rb_free_meta_page(cpu_buffer);
+err_enable_resize:
+	atomic_dec(&cpu_buffer->resize_disabled);
+unlock:
+	mutex_unlock(&cpu_buffer->mapping_lock);
+
+	return err;
+}
+
+/*
+ * ring_buffer_unmap - drop one mapping reference on the buffer of @cpu
+ * @buffer: the trace buffer
+ * @cpu: the CPU whose ring-buffer is unmapped
+ *
+ * When the last reference goes away, the page ID table and the meta-page
+ * are released and resizing is allowed again.
+ *
+ * Returns 0 on success, -EINVAL if @cpu is not part of the buffer cpumask,
+ * -ENODEV if the buffer is not mapped.
+ */
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	int err = -ENODEV;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return -EINVAL;
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	if (cpu_buffer->mapped) {
+		err = 0;
+
+		WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
+		if (!cpu_buffer->mapped) {
+			/* Wait for the writer and readers to observe !mapped */
+			synchronize_rcu();
+
+			rb_free_page_ids(cpu_buffer);
+			rb_free_meta_page(cpu_buffer);
+			atomic_dec(&cpu_buffer->resize_disabled);
+		}
+	}
+
+	mutex_unlock(&cpu_buffer->mapping_lock);
+
+	return err;
+}
+
+/*
+ * ring_buffer_map_fault - resolve a page offset of the mapping to a page
+ * @buffer: the trace buffer
+ * @cpu: the CPU whose ring-buffer is mapped
+ * @pgoff: page offset within the mapping
+ *
+ *   +--------------+
+ *   |   meta page  |  pgoff=0
+ *   +--------------+
+ *   |  data page1  |  pgoff=1, page_ids[0]
+ *   +--------------+
+ *   |  data page2  |  pgoff=2, page_ids[1]
+ *         ...
+ *
+ * Returns the page backing @pgoff, or NULL if @pgoff is past the last
+ * data page.
+ */
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+				   unsigned long pgoff)
+{
+	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
+
+	if (!pgoff)
+		return virt_to_page((void *)cpu_buffer->meta_page);
+
+	/* page_ids[] holds nr_pages + 1 entries: valid indexes are 0..nr_pages */
+	pgoff--;
+	if (pgoff > cpu_buffer->nr_pages)
+		return NULL;
+
+	/*
+	 * page_ids[] stores kernel VAs as unsigned long: virt_to_page() takes
+	 * a pointer on some architectures (e.g. arm), so cast explicitly.
+	 */
+	return virt_to_page((void *)cpu_buffer->page_ids[pgoff]);
+}
+
+/*
+ * ring_buffer_map_get_reader_page - advance the reader of a mapped buffer
+ * @buffer: the trace buffer
+ * @cpu: the CPU whose ring-buffer is read
+ *
+ * If the current reader page still has unread data, consume it up to the
+ * end of the page. Otherwise swap in a new reader page and try again,
+ * until something was consumed or the buffer is empty.
+ *
+ * Returns 0, or the error reported by rb_get_mapped_buffer().
+ */
+int ring_buffer_map_get_reader_page(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	unsigned long reader_size, flags;
+
+	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
+	if (IS_ERR(cpu_buffer))
+		return (int)PTR_ERR(cpu_buffer);
+
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+	while (!rb_per_cpu_empty(cpu_buffer)) {
+		reader_size = rb_page_size(cpu_buffer->reader_page);
+
+		/* Unread data on the reader page: consume all of it and stop */
+		if (cpu_buffer->reader_page->read < reader_size) {
+			do {
+				rb_advance_reader(cpu_buffer);
+			} while (cpu_buffer->reader_page->read < reader_size);
+			break;
+		}
+
+		/* Reader page fully read: swap in a new one and look again */
+		if (WARN_ON(!rb_get_reader_page(cpu_buffer)))
+			break;
+	}
+
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+	rb_put_mapped_buffer(cpu_buffer);
+
+	return 0;
+}
+
/*
* We only allocate new buffers, never free them if the CPU goes down.
* If we were to free the buffer, then the user would lose any trace that was in
--
2.41.0.487.g6d72f3e995-goog
Hi Vincent,
kernel test robot noticed the following build errors:
[auto build test ERROR on linus/master]
[cannot apply to rostedt-trace/for-next rostedt-trace/for-next-urgent]
[If your patch is applied to the wrong git tree, kindly drop us a note.
And when submitting patch, we suggest to use '--base' as documented in
https://git-scm.com/docs/git-format-patch#_base_tree_information]
url: https://github.com/intel-lab-lkp/linux/commits/Vincent-Donnefort/ring-buffer-Introducing-ring-buffer-mapping-functions/20230729-005300
base: linus/master
patch link: https://lore.kernel.org/r/20230728164754.460767-2-vdonnefort%40google.com
patch subject: [PATCH v5 1/2] ring-buffer: Introducing ring-buffer mapping functions
config: arm-randconfig-r033-20230728 (https://download.01.org/0day-ci/archive/20230729/[email protected]/config)
compiler: clang version 17.0.0 (https://github.com/llvm/llvm-project.git 4a5ac14ee968ff0ad5d2cc1ffa0299048db4c88a)
reproduce: (https://download.01.org/0day-ci/archive/20230729/[email protected]/reproduce)
If you fix the issue in a separate patch/commit (i.e. not just a new version of
the same patch/commit), kindly add following tags
| Reported-by: kernel test robot <[email protected]>
| Closes: https://lore.kernel.org/oe-kbuild-all/[email protected]/
All errors (new ones prefixed by >>):
>> kernel/trace/ring_buffer.c:5946:16: error: incompatible integer to pointer conversion passing 'unsigned long' to parameter of type 'const void *' [-Wint-conversion]
5946 | virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
| ^~~~~~~~~~~~~~~~~~~~~~~
arch/arm/include/asm/memory.h:390:53: note: expanded from macro 'virt_to_page'
390 | #define virt_to_page(kaddr) pfn_to_page(virt_to_pfn(kaddr))
| ^~~~~
include/asm-generic/memory_model.h:18:41: note: expanded from macro '__pfn_to_page'
18 | #define __pfn_to_page(pfn) (mem_map + ((pfn) - ARCH_PFN_OFFSET))
| ^~~
arch/arm/include/asm/memory.h:296:53: note: passing argument to parameter 'p' here
296 | static inline unsigned long virt_to_pfn(const void *p)
| ^
kernel/trace/ring_buffer.c:5968:15: error: incompatible integer to pointer conversion passing 'unsigned long' to parameter of type 'const void *' [-Wint-conversion]
5968 | virt_to_page(addr)->mapping = NULL;
| ^~~~
arch/arm/include/asm/memory.h:390:53: note: expanded from macro 'virt_to_page'
390 | #define virt_to_page(kaddr) pfn_to_page(virt_to_pfn(kaddr))
| ^~~~~
include/asm-generic/memory_model.h:18:41: note: expanded from macro '__pfn_to_page'
18 | #define __pfn_to_page(pfn) (mem_map + ((pfn) - ARCH_PFN_OFFSET))
| ^~~
arch/arm/include/asm/memory.h:296:53: note: passing argument to parameter 'p' here
296 | static inline unsigned long virt_to_pfn(const void *p)
| ^
kernel/trace/ring_buffer.c:6156:22: error: incompatible integer to pointer conversion passing 'unsigned long' to parameter of type 'const void *' [-Wint-conversion]
6156 | return virt_to_page(cpu_buffer->page_ids[pgoff]);
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~
arch/arm/include/asm/memory.h:390:53: note: expanded from macro 'virt_to_page'
390 | #define virt_to_page(kaddr) pfn_to_page(virt_to_pfn(kaddr))
| ^~~~~
include/asm-generic/memory_model.h:18:41: note: expanded from macro '__pfn_to_page'
18 | #define __pfn_to_page(pfn) (mem_map + ((pfn) - ARCH_PFN_OFFSET))
| ^~~
arch/arm/include/asm/memory.h:296:53: note: passing argument to parameter 'p' here
296 | static inline unsigned long virt_to_pfn(const void *p)
| ^
3 errors generated.
vim +5946 kernel/trace/ring_buffer.c
5940
5941 static void rb_free_page_ids(struct ring_buffer_per_cpu *cpu_buffer)
5942 {
5943 int i;
5944
5945 for (i = 0; i < cpu_buffer->nr_pages + 1; i++)
> 5946 virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
5947
5948 kfree(cpu_buffer->page_ids);
5949 cpu_buffer->page_ids = NULL;
5950 }
5951
--
0-DAY CI Kernel Test Service
https://github.com/intel/lkp-tests/wiki
Hi Vincent,
kernel test robot noticed the following build warnings:
[auto build test WARNING on linus/master]
[cannot apply to rostedt-trace/for-next rostedt-trace/for-next-urgent]
[If your patch is applied to the wrong git tree, kindly drop us a note.
And when submitting patch, we suggest to use '--base' as documented in
https://git-scm.com/docs/git-format-patch#_base_tree_information]
url: https://github.com/intel-lab-lkp/linux/commits/Vincent-Donnefort/ring-buffer-Introducing-ring-buffer-mapping-functions/20230729-005300
base: linus/master
patch link: https://lore.kernel.org/r/20230728164754.460767-2-vdonnefort%40google.com
patch subject: [PATCH v5 1/2] ring-buffer: Introducing ring-buffer mapping functions
config: arm-randconfig-r046-20230728 (https://download.01.org/0day-ci/archive/20230729/[email protected]/config)
compiler: arm-linux-gnueabi-gcc (GCC) 12.3.0
reproduce: (https://download.01.org/0day-ci/archive/20230729/[email protected]/reproduce)
If you fix the issue in a separate patch/commit (i.e. not just a new version of
the same patch/commit), kindly add following tags
| Reported-by: kernel test robot <[email protected]>
| Closes: https://lore.kernel.org/oe-kbuild-all/[email protected]/
All warnings (new ones prefixed by >>):
In file included from arch/arm/include/asm/page.h:193,
from arch/arm/include/asm/thread_info.h:14,
from include/linux/thread_info.h:60,
from include/asm-generic/preempt.h:5,
from ./arch/arm/include/generated/asm/preempt.h:1,
from include/linux/preempt.h:79,
from include/linux/percpu.h:6,
from include/linux/context_tracking_state.h:5,
from include/linux/hardirq.h:5,
from include/linux/interrupt.h:11,
from include/linux/trace_recursion.h:5,
from kernel/trace/ring_buffer.c:7:
kernel/trace/ring_buffer.c: In function 'rb_free_page_ids':
>> kernel/trace/ring_buffer.c:5946:50: warning: passing argument 1 of 'virt_to_pfn' makes pointer from integer without a cast [-Wint-conversion]
5946 | virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
| ~~~~~~~~~~~~~~~~~~~~^~~
| |
| long unsigned int
include/asm-generic/memory_model.h:18:46: note: in definition of macro '__pfn_to_page'
18 | #define __pfn_to_page(pfn) (mem_map + ((pfn) - ARCH_PFN_OFFSET))
| ^~~
kernel/trace/ring_buffer.c:5946:17: note: in expansion of macro 'virt_to_page'
5946 | virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
| ^~~~~~~~~~~~
In file included from arch/arm/include/asm/page.h:188:
arch/arm/include/asm/memory.h:296:53: note: expected 'const void *' but argument is of type 'long unsigned int'
296 | static inline unsigned long virt_to_pfn(const void *p)
| ~~~~~~~~~~~~^
kernel/trace/ring_buffer.c: In function 'rb_free_meta_page':
kernel/trace/ring_buffer.c:5968:22: warning: passing argument 1 of 'virt_to_pfn' makes pointer from integer without a cast [-Wint-conversion]
5968 | virt_to_page(addr)->mapping = NULL;
| ^~~~
| |
| long unsigned int
include/asm-generic/memory_model.h:18:46: note: in definition of macro '__pfn_to_page'
18 | #define __pfn_to_page(pfn) (mem_map + ((pfn) - ARCH_PFN_OFFSET))
| ^~~
kernel/trace/ring_buffer.c:5968:9: note: in expansion of macro 'virt_to_page'
5968 | virt_to_page(addr)->mapping = NULL;
| ^~~~~~~~~~~~
arch/arm/include/asm/memory.h:296:53: note: expected 'const void *' but argument is of type 'long unsigned int'
296 | static inline unsigned long virt_to_pfn(const void *p)
| ~~~~~~~~~~~~^
kernel/trace/ring_buffer.c: In function 'ring_buffer_map_fault':
kernel/trace/ring_buffer.c:6156:49: warning: passing argument 1 of 'virt_to_pfn' makes pointer from integer without a cast [-Wint-conversion]
6156 | return virt_to_page(cpu_buffer->page_ids[pgoff]);
| ~~~~~~~~~~~~~~~~~~~~^~~~~~~
| |
| long unsigned int
include/asm-generic/memory_model.h:18:46: note: in definition of macro '__pfn_to_page'
18 | #define __pfn_to_page(pfn) (mem_map + ((pfn) - ARCH_PFN_OFFSET))
| ^~~
kernel/trace/ring_buffer.c:6156:16: note: in expansion of macro 'virt_to_page'
6156 | return virt_to_page(cpu_buffer->page_ids[pgoff]);
| ^~~~~~~~~~~~
arch/arm/include/asm/memory.h:296:53: note: expected 'const void *' but argument is of type 'long unsigned int'
296 | static inline unsigned long virt_to_pfn(const void *p)
| ~~~~~~~~~~~~^
vim +/virt_to_pfn +5946 kernel/trace/ring_buffer.c
5940
5941 static void rb_free_page_ids(struct ring_buffer_per_cpu *cpu_buffer)
5942 {
5943 int i;
5944
5945 for (i = 0; i < cpu_buffer->nr_pages + 1; i++)
> 5946 virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
5947
5948 kfree(cpu_buffer->page_ids);
5949 cpu_buffer->page_ids = NULL;
5950 }
5951
--
0-DAY CI Kernel Test Service
https://github.com/intel/lkp-tests/wiki
On Fri, 28 Jul 2023 17:47:53 +0100
Vincent Donnefort <[email protected]> wrote:
> In preparation for allowing the user-space to map a ring-buffer, add
> a set of mapping functions:
>
> ring_buffer_{map,unmap}()
> ring_buffer_map_fault()
>
> And controls on the ring-buffer:
>
> ring_buffer_map_get_reader_page() /* swap reader and head */
>
> Mapping the ring-buffer also involves:
>
> A unique ID for each page of the ring-buffer, as currently the pages
> are only identified through their in-kernel VA.
>
> A meta-page, where are stored statistics about the ring-buffer and
> a page IDs list, ordered. A field gives what page is the reader
> one and one to gives where the ring-buffer starts in the list of data
> pages.
>
> The linear mapping exposes the meta-page, and each page of the
> ring-buffer, ordered following their unique ID, assigned during the
> first mapping.
>
> Once mapped, no page can get in or out of the ring-buffer: the buffer
> size will remain unmodified and the splice enabling functions will in
> reality simply memcpy the data instead of swapping pages.
So I tested these, and they look good. But I have some comments still.
>
> Signed-off-by: Vincent Donnefort <[email protected]>
>
> diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
> index 782e14f62201..980abfbd92ed 100644
> --- a/include/linux/ring_buffer.h
> +++ b/include/linux/ring_buffer.h
> @@ -6,6 +6,8 @@
> #include <linux/seq_file.h>
> #include <linux/poll.h>
>
> +#include <uapi/linux/trace_mmap.h>
> +
> struct trace_buffer;
> struct ring_buffer_iter;
>
> @@ -211,4 +213,9 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
> #define trace_rb_cpu_prepare NULL
> #endif
>
> +int ring_buffer_map(struct trace_buffer *buffer, int cpu);
> +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
> +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
> + unsigned long pgoff);
> +int ring_buffer_map_get_reader_page(struct trace_buffer *buffer, int cpu);
> #endif /* _LINUX_RING_BUFFER_H */
> diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
> new file mode 100644
> index 000000000000..653176cc50bc
> --- /dev/null
> +++ b/include/uapi/linux/trace_mmap.h
> @@ -0,0 +1,26 @@
> +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
> +#ifndef _UAPI_TRACE_MMAP_H_
> +#define _UAPI_TRACE_MMAP_H_
> +
> +#include <linux/types.h>
> +
> +struct ring_buffer_meta {
To be consistent with the naming of the internal structure, let's call this:
struct trace_buffer_meta {
> + unsigned long entries;
> + unsigned long overrun;
> + unsigned long read;
> +
> + unsigned long pages_touched;
> + unsigned long pages_lost;
> + unsigned long pages_read;
> +
> + __u32 meta_page_size;
We still want this meta structure size exported. That way if we ever extend
the interface, the applications will know if the kernel supports it or not.
__u32 meta_struct_len; ?
> + __u32 nr_data_pages; /* Number of pages in the ring-buffer */
> +
> + struct reader_page {
> + __u32 id; /* Reader page ID from 0 to nr_data_pages - 1 */
> + __u32 read; /* Number of bytes read on the reader page */
> + unsigned long lost_events; /* Events lost at the time of the reader swap */
> + } reader_page;
I think we should define the structure outside the other structure, and
also rename it as reader_page is too generic. Perhaps call it
trace_buffer_read_page.
> +};
> +
> +#endif /* _UAPI_TRACE_MMAP_H_ */
> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> index de061dd47313..8f367fd3dbdd 100644
> --- a/kernel/trace/ring_buffer.c
> +++ b/kernel/trace/ring_buffer.c
> @@ -332,6 +332,7 @@ struct buffer_page {
> local_t entries; /* entries on this page */
> unsigned long real_end; /* real end of data */
> struct buffer_data_page *page; /* Actual data page */
> + u32 id; /* ID for external mapping */
I noticed that we have a hole in the current structure for 64 bit systems:
struct buffer_page {
struct list_head list; 16 bytes depending on arch
local_t write; 4 bytes
unsigned read; 4 bytes
local_t entries; 4 bytes
[ 4 byte hole on 64 bit archs ]
unsigned long real_end; 8 bytes
struct buffer_data_page *page; 8 bytes
};
If we put the id after entries, it will fill that hole.
struct buffer_page {
struct list_head list; /* list of buffer pages */
local_t write; /* index for next write */
unsigned read; /* index for next read */
local_t entries; /* entries on this page */
u32 id; /* ID for external mapping */
unsigned long real_end; /* real end of data */
struct buffer_data_page *page; /* Actual data page */
};
> };
>
> /*
> @@ -523,6 +524,12 @@ struct ring_buffer_per_cpu {
> rb_time_t before_stamp;
> u64 event_stamp[MAX_NEST];
> u64 read_stamp;
> +
> + int mapped;
> + struct mutex mapping_lock;
> + unsigned long *page_ids; /* ID to addr */
> + struct ring_buffer_meta *meta_page;
> +
> /* ring buffer pages to update, > 0 to add, < 0 to remove */
> long nr_pages_to_update;
> struct list_head new_pages; /* new pages to add */
> @@ -1562,6 +1569,13 @@ static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
> /* Again, either we update tail_page or an interrupt does */
> (void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
> }
> +
> + if (READ_ONCE(cpu_buffer->mapped)) {
> + /* Ensure the meta_page is ready */
> + smp_rmb();
> + WRITE_ONCE(cpu_buffer->meta_page->pages_touched,
> + local_read(&cpu_buffer->pages_touched));
> + }
I was thinking instead of doing this in the semi fast path, put this logic
into the rb_wake_up_waiters() code. That is, if a task is mapped, we call
the irq_work() to do this for us. It could even do more, like handle
blocked mapped waiters.
> }
>
> static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
> @@ -1725,6 +1739,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
> init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
> init_waitqueue_head(&cpu_buffer->irq_work.waiters);
> init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
> + mutex_init(&cpu_buffer->mapping_lock);
>
> bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
> GFP_KERNEL, cpu_to_node(cpu));
> @@ -2521,6 +2536,15 @@ rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
> local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
> local_inc(&cpu_buffer->pages_lost);
>
> + if (READ_ONCE(cpu_buffer->mapped)) {
> + /* Ensure the meta_page is ready */
> + smp_rmb();
> + WRITE_ONCE(cpu_buffer->meta_page->overrun,
> + local_read(&cpu_buffer->overrun));
> + WRITE_ONCE(cpu_buffer->meta_page->pages_lost,
> + local_read(&cpu_buffer->pages_lost));
> + }
> +
Perhaps this too could be handled in the irq work?
> /*
> * The entries will be zeroed out when we move the
> * tail page.
> @@ -3183,6 +3207,14 @@ static inline void rb_event_discard(struct ring_buffer_event *event)
> static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
> {
> local_inc(&cpu_buffer->entries);
> +
> + if (READ_ONCE(cpu_buffer->mapped)) {
> + /* Ensure the meta_page is ready */
> + smp_rmb();
> + WRITE_ONCE(cpu_buffer->meta_page->entries,
> + local_read(&cpu_buffer->entries));
> + }
As well as this.
In other words, since the irq_work will trigger when something is waiting
for it, it could handle all the updates.
> +
> rb_end_commit(cpu_buffer);
> }
>
> @@ -3486,7 +3518,7 @@ static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
> return;
>
> /*
> - * If this interrupted another event,
> + * If this interrupted another event,
> */
> if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
> goto out;
> @@ -4658,6 +4690,13 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
> cpu_buffer->last_overrun = overwrite;
> }
>
> + if (cpu_buffer->mapped) {
> + WRITE_ONCE(cpu_buffer->meta_page->reader_page.read, 0);
> + WRITE_ONCE(cpu_buffer->meta_page->reader_page.id, reader->id);
> + WRITE_ONCE(cpu_buffer->meta_page->reader_page.lost_events, cpu_buffer->lost_events);
> + WRITE_ONCE(cpu_buffer->meta_page->pages_read, local_read(&cpu_buffer->pages_read));
> + }
> +
> goto again;
>
> out:
> @@ -4724,6 +4763,13 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
>
> length = rb_event_length(event);
> cpu_buffer->reader_page->read += length;
> +
> + if (cpu_buffer->mapped) {
> + WRITE_ONCE(cpu_buffer->meta_page->reader_page.read,
> + cpu_buffer->reader_page->read);
> + WRITE_ONCE(cpu_buffer->meta_page->read,
> + cpu_buffer->read);
> + }
> }
>
> static void rb_advance_iter(struct ring_buffer_iter *iter)
> @@ -5253,6 +5299,19 @@ static void rb_clear_buffer_page(struct buffer_page *page)
> page->read = 0;
> }
>
> +static void rb_reset_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> + struct ring_buffer_meta *meta = cpu_buffer->meta_page;
> +
> + WRITE_ONCE(meta->entries, 0);
> + WRITE_ONCE(meta->overrun, 0);
> + WRITE_ONCE(meta->read, cpu_buffer->read);
> + WRITE_ONCE(meta->pages_touched, 0);
> + WRITE_ONCE(meta->pages_lost, 0);
> + WRITE_ONCE(meta->pages_read, local_read(&cpu_buffer->pages_read));
> + WRITE_ONCE(meta->reader_page.read, cpu_buffer->reader_page->read);
> +}
> +
> static void
> rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
> {
> @@ -5297,6 +5356,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
> cpu_buffer->lost_events = 0;
> cpu_buffer->last_overrun = 0;
>
> + if (cpu_buffer->mapped)
> + rb_reset_meta_page(cpu_buffer);
> +
> rb_head_page_activate(cpu_buffer);
> }
>
> @@ -5511,6 +5573,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
> cpu_buffer_a = buffer_a->buffers[cpu];
> cpu_buffer_b = buffer_b->buffers[cpu];
>
> + if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
> + ret = -EBUSY;
> + goto out;
> + }
> +
> /* At least make sure the two buffers are somewhat the same */
> if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
> goto out;
> @@ -5753,7 +5820,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
> * Otherwise, we can simply swap the page with the one passed in.
> */
> if (read || (len < (commit - read)) ||
> - cpu_buffer->reader_page == cpu_buffer->commit_page) {
> + cpu_buffer->reader_page == cpu_buffer->commit_page ||
> + cpu_buffer->mapped) {
> struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
> unsigned int rpos = read;
> unsigned int pos = 0;
> @@ -5870,6 +5938,255 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
> }
> EXPORT_SYMBOL_GPL(ring_buffer_read_page);
>
> +static void rb_free_page_ids(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> + int i;
> +
> + for (i = 0; i < cpu_buffer->nr_pages + 1; i++)
> + virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
> +
> + kfree(cpu_buffer->page_ids);
> + cpu_buffer->page_ids = NULL;
> +}
> +
> +static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> + if (cpu_buffer->meta_page)
> + return 0;
> +
> + cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER));
> + if (!cpu_buffer->meta_page)
> + return -ENOMEM;
> +
> + return 0;
> +}
> +
> +static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> + unsigned long addr = (unsigned long)cpu_buffer->meta_page;
> +
> + virt_to_page(addr)->mapping = NULL;
> + free_page(addr);
> + cpu_buffer->meta_page = NULL;
> +}
> +
> +static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
> + unsigned long *page_ids)
> +{
> + struct ring_buffer_meta *meta = cpu_buffer->meta_page;
> + unsigned int nr_data_pages = cpu_buffer->nr_pages + 1;
> + struct buffer_page *first_page, *bpage;
> + int id = 0;
> +
> + page_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
> + cpu_buffer->reader_page->id = id++;
> +
> + first_page = bpage = rb_set_head_page(cpu_buffer);
> + do {
> + if (id >= nr_data_pages) {
> + WARN_ON(1);
> + break;
> + }
> +
> + page_ids[id] = (unsigned long)bpage->page;
> + bpage->id = id;
> +
> + rb_inc_page(&bpage);
> + id++;
> + } while (bpage != first_page);
> +
> + /* install page ID to kern VA translation */
> + cpu_buffer->page_ids = page_ids;
> +
> + meta->meta_page_size = PAGE_SIZE;
> + meta->nr_data_pages = nr_data_pages;
> + meta->reader_page.id = cpu_buffer->reader_page->id;
> + rb_reset_meta_page(cpu_buffer);
> +}
> +
> +static inline struct ring_buffer_per_cpu *
> +rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
> +{
> + struct ring_buffer_per_cpu *cpu_buffer;
> +
> + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> + return ERR_PTR(-EINVAL);
> +
> + cpu_buffer = buffer->buffers[cpu];
> +
> + mutex_lock(&cpu_buffer->mapping_lock);
> +
> + if (!cpu_buffer->mapped) {
> + mutex_unlock(&cpu_buffer->mapping_lock);
> + return ERR_PTR(-ENODEV);
> + }
> +
> + return cpu_buffer;
> +}
> +
> +static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> + mutex_unlock(&cpu_buffer->mapping_lock);
> +}
> +
> +int ring_buffer_map(struct trace_buffer *buffer, int cpu)
> +{
> + struct ring_buffer_per_cpu *cpu_buffer;
> + unsigned long flags, *page_ids;
> + int err = 0;
> +
> + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> + return -EINVAL;
> +
> + cpu_buffer = buffer->buffers[cpu];
> +
> + mutex_lock(&cpu_buffer->mapping_lock);
> +
> + if (cpu_buffer->mapped) {
> + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
> + goto unlock;
> + }
> +
> + /* prevent another thread from changing buffer sizes */
> + mutex_lock(&buffer->mutex);
> + atomic_inc(&cpu_buffer->resize_disabled);
> + mutex_unlock(&buffer->mutex);
> +
> + err = rb_alloc_meta_page(cpu_buffer);
> + if (err) {
> + atomic_dec(&cpu_buffer->resize_disabled);
> + goto unlock;
> + }
> +
> + /* page_ids include the reader page while nr_pages does not */
> + page_ids = kzalloc(sizeof(*page_ids) * (cpu_buffer->nr_pages + 1),
> + GFP_KERNEL);
> + if (!page_ids) {
> + rb_free_meta_page(cpu_buffer);
> + atomic_dec(&cpu_buffer->resize_disabled);
> + err = -ENOMEM;
> + goto unlock;
> + }
> +
> + /*
> + * Lock all readers to block any page swap until the page IDs are
> + * assigned.
> + */
> + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> +
> + rb_setup_ids_meta_page(cpu_buffer, page_ids);
> + /*
> + * Ensure the writer will observe the meta-page before
> + * cpu_buffer->mapped.
> + */
> + smp_wmb();
> + WRITE_ONCE(cpu_buffer->mapped, 1);
> +
> + /* Init meta_page values unless the writer did it already */
> + cmpxchg(&cpu_buffer->meta_page->entries, 0,
> + local_read(&cpu_buffer->entries));
> + cmpxchg(&cpu_buffer->meta_page->overrun, 0,
> + local_read(&cpu_buffer->overrun));
> + cmpxchg(&cpu_buffer->meta_page->pages_touched, 0,
> + local_read(&cpu_buffer->pages_touched));
> + cmpxchg(&cpu_buffer->meta_page->pages_lost, 0,
> + local_read(&cpu_buffer->pages_lost));
> +
> + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> +unlock:
> + mutex_unlock(&cpu_buffer->mapping_lock);
> +
> + return err;
> +}
> +
> +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
> +{
> + struct ring_buffer_per_cpu *cpu_buffer;
> + int err = 0;
> +
> + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> + return -EINVAL;
> +
> + cpu_buffer = buffer->buffers[cpu];
> +
> + mutex_lock(&cpu_buffer->mapping_lock);
> +
> + if (!cpu_buffer->mapped) {
> + err = -ENODEV;
> + goto unlock;
> + }
> +
> + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
> + if (!cpu_buffer->mapped) {
> + /* Wait the writer and readers to observe !mapped */
> + synchronize_rcu();
> +
> + rb_free_page_ids(cpu_buffer);
> + rb_free_meta_page(cpu_buffer);
> + atomic_dec(&cpu_buffer->resize_disabled);
> + }
> +
> +unlock:
> + mutex_unlock(&cpu_buffer->mapping_lock);
> +
> + return err;
> +}
> +
> +/*
> + * +--------------+
> + * | meta page | pgoff=0
> + * +--------------+
> + * | data page1 | page_ids=0
> + * +--------------+
> + * | data page2 | page_ids=1
> + * ...
> + */
> +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
> + unsigned long pgoff)
> +{
> + struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
> +
> + if (!pgoff)
> + return virt_to_page((void *)cpu_buffer->meta_page);
> +
> + pgoff--;
> + if (pgoff > cpu_buffer->nr_pages)
> + return NULL;
> +
> + return virt_to_page(cpu_buffer->page_ids[pgoff]);
> +}
> +
> +int ring_buffer_map_get_reader_page(struct trace_buffer *buffer, int cpu)
> +{
> + struct ring_buffer_per_cpu *cpu_buffer;
> + unsigned long reader_size, flags;
Please put variable declarations on separate lines.
unsigned long reader_size;
unsigned long flags;
> +
> + cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
> + if (IS_ERR(cpu_buffer))
> + return (int)PTR_ERR(cpu_buffer);
> +
> + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> +consume:
> + if (rb_per_cpu_empty(cpu_buffer))
> + goto out;
> + reader_size = rb_page_size(cpu_buffer->reader_page);
> + if (cpu_buffer->reader_page->read < reader_size) {
Please add a comment explaining what is going on here. I'm assuming that this is to
finish reading the reader page?
> + while (cpu_buffer->reader_page->read < reader_size)
> + rb_advance_reader(cpu_buffer);
> + goto out;
> + }
> +
> + if (WARN_ON(!rb_get_reader_page(cpu_buffer)))
> + goto out;
> +
> + goto consume;
> +out:
> + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> + rb_put_mapped_buffer(cpu_buffer);
> +
> + return 0;
> +}
> +
> /*
> * We only allocate new buffers, never free them if the CPU goes down.
> * If we were to free the buffer, then the user would lose any trace that was in
Thanks,
-- Steve
On Tue, 1 Aug 2023 13:26:03 -0400
Steven Rostedt <[email protected]> wrote:
> > +
> > + if (READ_ONCE(cpu_buffer->mapped)) {
> > + /* Ensure the meta_page is ready */
> > + smp_rmb();
> > + WRITE_ONCE(cpu_buffer->meta_page->pages_touched,
> > + local_read(&cpu_buffer->pages_touched));
> > + }
>
> I was thinking instead of doing this in the semi fast path, put this logic
> into the rb_wakeup_waiters() code. That is, if a task is mapped, we call
> the irq_work() to do this for us. It could even do more, like handle
> blocked mapped waiters.
I was thinking how to implement this, and I worry that it may cause an irq
storm. Let's keep this (and the other locations) as is, where we do the
updates in place. Then we can look at seeing if it is possible to do it in
a delayed fashion another time.
-- Steve
On Wed, Aug 02, 2023 at 07:45:26AM -0400, Steven Rostedt wrote:
> On Tue, 1 Aug 2023 13:26:03 -0400
> Steven Rostedt <[email protected]> wrote:
>
> > > +
> > > + if (READ_ONCE(cpu_buffer->mapped)) {
> > > + /* Ensure the meta_page is ready */
> > > + smp_rmb();
> > > + WRITE_ONCE(cpu_buffer->meta_page->pages_touched,
> > > + local_read(&cpu_buffer->pages_touched));
> > > + }
> >
> > I was thinking instead of doing this in the semi fast path, put this logic
> > into the rb_wakeup_waiters() code. That is, if a task is mapped, we call
> > the irq_work() to do this for us. It could even do more, like handle
> > blocked mapped waiters.
>
> I was thinking how to implement this, and I worry that it may cause an irq
> storm. Let's keep this (and the other locations) as is, where we do the
> updates in place. Then we can look at seeing if it is possible to do it in
> a delayed fashion another time.
I was actually looking at this. How about:
On the userspace side, a simple poll:
static void wait_entries(int fd)
{
struct pollfd pollfd = {
.fd = fd,
.events = POLLIN,
};
if (poll(&pollfd, 1, -1) == -1)
pdie("poll");
}
And on the kernel side, just a function to update the "writer fields" of the
meta-page:
static void rb_wake_up_waiters(struct irq_work *work)
{
struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
+ struct ring_buffer_per_cpu *cpu_buffer =
+ container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
+
+ rb_update_meta_page(cpu_buffer);
wake_up_all(&rbwork->waiters);
That would rate limit the number of updates to the meta-page without any irq storm?
>
> -- Steve
On Wed, 2 Aug 2023 13:30:56 +0100
Vincent Donnefort <[email protected]> wrote:
> On Wed, Aug 02, 2023 at 07:45:26AM -0400, Steven Rostedt wrote:
> > On Tue, 1 Aug 2023 13:26:03 -0400
> > Steven Rostedt <[email protected]> wrote:
> >
> > > > +
> > > > + if (READ_ONCE(cpu_buffer->mapped)) {
> > > > + /* Ensure the meta_page is ready */
> > > > + smp_rmb();
> > > > + WRITE_ONCE(cpu_buffer->meta_page->pages_touched,
> > > > + local_read(&cpu_buffer->pages_touched));
> > > > + }
> > >
> > > I was thinking instead of doing this in the semi fast path, put this logic
> > > into the rb_wakeup_waiters() code. That is, if a task is mapped, we call
> > > the irq_work() to do this for us. It could even do more, like handle
> > > blocked mapped waiters.
> >
> > I was thinking how to implement this, and I worry that it may cause an irq
> > storm. Let's keep this (and the other locations) as is, where we do the
> > updates in place. Then we can look at seeing if it is possible to do it in
> > a delayed fashion another time.
>
> I actually looking at this. How about:
>
> On the userspace side, a simple poll:
>
> static void wait_entries(int fd)
> {
> struct pollfd pollfd = {
> .fd = fd,
> .events = POLLIN,
> };
>
> if (poll(&pollfd, 1, -1) == -1)
> pdie("poll");
> }
>
> And on the kernel side, just a function to update the "writer fields" of the
> meta-page:
>
> static void rb_wake_up_waiters(struct irq_work *work)
> {
> struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
> + struct ring_buffer_per_cpu *cpu_buffer =
> + container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
> +
> + rb_update_meta_page(cpu_buffer);
>
> wake_up_all(&rbwork->waiters);
>
> That would rate limit the number of updates to the meta-page without any irq storm?
>
Is poll an issue? It requires user space to do a system call to see if
there's more data? But I guess that's not too much of an issue, as it needs
to do the ioctl to get the reader page.
We could also add an option to the ioctl to block, or have the ioctl honor
the NON_BLOCK flags of the fd?
-- Steve
[...]
> > And on the kernel side, just a function to update the "writer fields" of the
> > meta-page:
> >
> > static void rb_wake_up_waiters(struct irq_work *work)
> > {
> > struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
> > + struct ring_buffer_per_cpu *cpu_buffer =
> > + container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
> > +
> > + rb_update_meta_page(cpu_buffer);
> >
> > wake_up_all(&rbwork->waiters);
> >
> > That would rate limit the number of updates to the meta-page without any irq storm?
> >
>
> Is poll an issue? It requires user space to do a system call to see if
> there's more data? But I guess that's not too much of an issue, as it needs
> to do the ioctl to get the reader page.
I don't think there's any problem with this approach, besides the extra system
call...
>
> We could also add an option to the ioctl to block, or have the ioctl honor
> the NON_BLOCK flags of the fd?
... but indeed, we could block there. The userspace interface would be even simpler.
How about?
+++ b/kernel/trace/trace.c
@@ -8499,12 +8499,22 @@ static long tracing_buffers_ioctl(struct file *file, unsigned int cmd, unsigned
{
struct ftrace_buffer_info *info = file->private_data;
struct trace_iterator *iter = &info->iter;
+ int err;
+
+ if (cmd == TRACE_MMAP_IOCTL_GET_READER_PAGE) {
+ if (!(file->f_flags & O_NONBLOCK)) {
+ err = ring_buffer_wait(iter->array_buffer->buffer,
+ iter->cpu_file,
+ iter->tr->buffer_percent);
+ if (err)
+ return err;
+ }
- if (cmd == TRACE_MMAP_IOCTL_GET_READER_PAGE)
return ring_buffer_map_get_reader_page(iter->array_buffer->buffer,
iter->cpu_file);
>
> -- Steve
>
> --
> To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
>
On Thu, 3 Aug 2023 11:33:39 +0100
Vincent Donnefort <[email protected]> wrote:
> [...]
>
> > > And on the kernel side, just a function to update the "writer fields" of the
> > > meta-page:
> > >
> > > static void rb_wake_up_waiters(struct irq_work *work)
> > > {
> > > struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
> > > + struct ring_buffer_per_cpu *cpu_buffer =
> > > + container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
> > > +
> > > + rb_update_meta_page(cpu_buffer);
> > >
> > > wake_up_all(&rbwork->waiters);
> > >
> > > That would rate limit the number of updates to the meta-page without any irq storm?
> > >
> >
> > Is poll an issue? It requires user space to do a system call to see if
> > there's more data? But I guess that's not too much of an issue, as it needs
> > to do the ioctl to get the reader page.
>
> I don't think there's any problem with this approach, beside the extra system
> call...
>
> >
> > We could also add an option to the ioctl to block, or have the ioctl honor
> > the NON_BLOCK flags of the fd?
>
> ... but indeed, we could block there. The userspace interface would be even simpler.
> How about?
>
> +++ b/kernel/trace/trace.c
> @@ -8499,12 +8499,22 @@ static long tracing_buffers_ioctl(struct file *file, unsigned int cmd, unsigned
> {
> struct ftrace_buffer_info *info = file->private_data;
> struct trace_iterator *iter = &info->iter;
> + int err;
> +
> + if (cmd == TRACE_MMAP_IOCTL_GET_READER_PAGE) {
> + if (!(file->f_flags & O_NONBLOCK)) {
> + err = ring_buffer_wait(iter->array_buffer->buffer,
> + iter->cpu_file,
> + iter->tr->buffer_percent);
> + if (err)
> + return err;
> + }
>
> - if (cmd == TRACE_MMAP_IOCTL_GET_READER_PAGE)
> return ring_buffer_map_get_reader_page(iter->array_buffer->buffer,
> iter->cpu_file);
>
Looks good to me.
-- Steve