2012-05-04 01:59:58

by Vaibhav Nagarnaik

Subject: [PATCH v7 1/3] trace: Make removal of ring buffer pages atomic

This patch adds the capability to remove pages from a ring buffer
without destroying any existing data in it.

This is done by removing the pages after the tail page. This makes sure
that first all the empty pages in the ring buffer are removed. If the
head page is one in the list of pages to be removed, then the page after
the removed ones is made the head page. This removes the oldest data
from the ring buffer and keeps the latest data around to be read.

To do this in a non-racey manner, tracing is stopped for a very short
time while the pages to be removed are identified and unlinked from the
ring buffer. The pages are freed after the tracing is restarted to
minimize the time needed to stop tracing.

The context in which the pages from the per-cpu ring buffer are removed
runs on the respective CPU. This minimizes the events not traced to only
NMI trace contexts.
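
To illustrate the unlinking step outside the kernel, here is a minimal,
standalone C sketch of the idea. This is not the ring buffer code: the
page type and the remove_after() helper are made up for illustration, and
the real implementation additionally keeps the head marker in the low bits
of ->next (RB_PAGE_HEAD) and disables recording around the splice.

#include <stdio.h>

struct page { struct page *next, *prev; int id; };

/*
 * Unlink 'nr' pages that follow 'tail' from a circular list.  If *head
 * lay inside the removed span, move it to the first surviving page,
 * i.e. the oldest data that is kept.  The removed span is returned as a
 * NULL-terminated chain so the caller can free it later, mirroring
 * "unlink while recording is stopped, free after it is restarted".
 */
static struct page *remove_after(struct page *tail, unsigned int nr,
                                 struct page **head)
{
        struct page *first = tail->next, *last = tail;
        int head_in_span = 0;
        unsigned int i;

        for (i = 0; i < nr; i++) {
                last = last->next;
                if (last == *head)
                        head_in_span = 1;
        }

        tail->next = last->next;        /* splice the span out */
        last->next->prev = tail;
        if (head_in_span)
                *head = tail->next;     /* page after the removed ones */

        last->next = NULL;              /* terminate the removed chain */
        return first;
}

int main(void)
{
        struct page p[6], *head = &p[0], *removed, *q;
        int i;

        for (i = 0; i < 6; i++) {       /* circular list of pages 0..5 */
                p[i].id = i;
                p[i].next = &p[(i + 1) % 6];
                p[i].prev = &p[(i + 5) % 6];
        }

        /* treat p[4] as the tail page and remove the two pages after it */
        removed = remove_after(&p[4], 2, &head);

        printf("new head: %d\n", head->id);     /* prints 1 */
        for (q = removed; q; q = q->next)
                printf("removed: %d\n", q->id); /* prints 5, then 0 */
        return 0;
}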

Signed-off-by: Vaibhav Nagarnaik <[email protected]>
---
Changelog v7-v6:
* Fix error with resizing when tracing_on is set to '0'
* Fix error with resizing by trace-cmd

kernel/trace/ring_buffer.c | 265 ++++++++++++++++++++++++++++++++++----------
kernel/trace/trace.c | 20 +---
2 files changed, 209 insertions(+), 76 deletions(-)

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 2d5eb33..230ae9d 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -23,6 +23,8 @@
#include <asm/local.h>
#include "trace.h"

+static void update_pages_handler(struct work_struct *work);
+
/*
* The ring buffer header is special. We must manually up keep it.
*/
@@ -470,12 +472,15 @@ struct ring_buffer_per_cpu {
/* ring buffer pages to update, > 0 to add, < 0 to remove */
int nr_pages_to_update;
struct list_head new_pages; /* new pages to add */
+ struct work_struct update_pages_work;
+ struct completion update_completion;
};

struct ring_buffer {
unsigned flags;
int cpus;
atomic_t record_disabled;
+ atomic_t resize_disabled;
cpumask_var_t cpumask;

struct lock_class_key *reader_lock_key;
@@ -1048,6 +1053,8 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
raw_spin_lock_init(&cpu_buffer->reader_lock);
lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
+ INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
+ init_completion(&cpu_buffer->update_completion);

bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu));
@@ -1235,32 +1242,123 @@ void ring_buffer_set_clock(struct ring_buffer *buffer,

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

+static inline unsigned long rb_page_entries(struct buffer_page *bpage)
+{
+ return local_read(&bpage->entries) & RB_WRITE_MASK;
+}
+
+static inline unsigned long rb_page_write(struct buffer_page *bpage)
+{
+ return local_read(&bpage->write) & RB_WRITE_MASK;
+}
+
static void
-rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
+rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned int nr_pages)
{
- struct buffer_page *bpage;
- struct list_head *p;
- unsigned i;
+ struct list_head *tail_page, *to_remove, *next_page;
+ struct buffer_page *to_remove_page, *tmp_iter_page;
+ struct buffer_page *last_page, *first_page;
+ unsigned int nr_removed;
+ unsigned long head_bit;
+ int page_entries;
+
+ head_bit = 0;

raw_spin_lock_irq(&cpu_buffer->reader_lock);
- rb_head_page_deactivate(cpu_buffer);
+ atomic_inc(&cpu_buffer->record_disabled);
+ /*
+ * We don't race with the readers since we have acquired the reader
+ * lock. We also don't race with writers after disabling recording.
+ * This makes it easy to figure out the first and the last page to be
+ * removed from the list. We unlink all the pages in between including
+ * the first and last pages. This is done in a busy loop so that we
+ * lose the least number of traces.
+ * The pages are freed after we restart recording and unlock readers.
+ */
+ tail_page = &cpu_buffer->tail_page->list;

- for (i = 0; i < nr_pages; i++) {
- if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
- goto out;
- p = cpu_buffer->pages->next;
- bpage = list_entry(p, struct buffer_page, list);
- list_del_init(&bpage->list);
- free_buffer_page(bpage);
+ /*
+ * tail page might be on reader page, we remove the next page
+ * from the ring buffer
+ */
+ if (cpu_buffer->tail_page == cpu_buffer->reader_page)
+ tail_page = rb_list_head(tail_page->next);
+ to_remove = tail_page;
+
+ /* start of pages to remove */
+ first_page = list_entry(rb_list_head(to_remove->next),
+ struct buffer_page, list);
+
+ for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
+ to_remove = rb_list_head(to_remove)->next;
+ head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
}
- if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
- goto out;

- rb_reset_cpu(cpu_buffer);
- rb_check_pages(cpu_buffer);
+ next_page = rb_list_head(to_remove)->next;

-out:
+ /*
+ * Now we remove all pages between tail_page and next_page.
+ * Make sure that we have head_bit value preserved for the
+ * next page
+ */
+ tail_page->next = (struct list_head *)((unsigned long)next_page |
+ head_bit);
+ next_page = rb_list_head(next_page);
+ next_page->prev = tail_page;
+
+ /* make sure pages points to a valid page in the ring buffer */
+ cpu_buffer->pages = next_page;
+
+ /* update head page */
+ if (head_bit)
+ cpu_buffer->head_page = list_entry(next_page,
+ struct buffer_page, list);
+
+ /*
+ * change read pointer to make sure any read iterators reset
+ * themselves
+ */
+ cpu_buffer->read = 0;
+
+ /* pages are removed, resume tracing and then free the pages */
+ atomic_dec(&cpu_buffer->record_disabled);
raw_spin_unlock_irq(&cpu_buffer->reader_lock);
+
+ RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
+
+ /* last buffer page to remove */
+ last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
+ list);
+ tmp_iter_page = first_page;
+
+ do {
+ to_remove_page = tmp_iter_page;
+ rb_inc_page(cpu_buffer, &tmp_iter_page);
+
+ /* update the counters */
+ page_entries = rb_page_entries(to_remove_page);
+ if (page_entries) {
+ /*
+ * If something was added to this page, it was full
+ * since it is not the tail page. So we deduct the
+ * bytes consumed in ring buffer from here.
+ * No need to update overruns, since this page is
+ * deleted from ring buffer and its entries are
+ * already accounted for.
+ */
+ local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
+ }
+
+ /*
+ * We have already removed references to this list item, just
+ * free up the buffer_page and its page
+ */
+ free_buffer_page(to_remove_page);
+ nr_removed--;
+
+ } while (to_remove_page != last_page);
+
+ RB_WARN_ON(cpu_buffer, nr_removed);
}

static void
@@ -1272,6 +1370,8 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
unsigned i;

raw_spin_lock_irq(&cpu_buffer->reader_lock);
+ /* stop the writers while inserting pages */
+ atomic_inc(&cpu_buffer->record_disabled);
rb_head_page_deactivate(cpu_buffer);

for (i = 0; i < nr_pages; i++) {
@@ -1286,19 +1386,27 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
rb_check_pages(cpu_buffer);

out:
+ atomic_dec(&cpu_buffer->record_disabled);
raw_spin_unlock_irq(&cpu_buffer->reader_lock);
}

-static void update_pages_handler(struct ring_buffer_per_cpu *cpu_buffer)
+static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
if (cpu_buffer->nr_pages_to_update > 0)
rb_insert_pages(cpu_buffer, &cpu_buffer->new_pages,
cpu_buffer->nr_pages_to_update);
else
rb_remove_pages(cpu_buffer, -cpu_buffer->nr_pages_to_update);
+
cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
- /* reset this value */
- cpu_buffer->nr_pages_to_update = 0;
+}
+
+static void update_pages_handler(struct work_struct *work)
+{
+ struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
+ struct ring_buffer_per_cpu, update_pages_work);
+ rb_update_pages(cpu_buffer);
+ complete(&cpu_buffer->update_completion);
}

/**
@@ -1308,14 +1416,14 @@ static void update_pages_handler(struct ring_buffer_per_cpu *cpu_buffer)
*
* Minimum size is 2 * BUF_PAGE_SIZE.
*
- * Returns -1 on failure.
+ * Returns 0 on success and < 0 on failure.
*/
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
int cpu_id)
{
struct ring_buffer_per_cpu *cpu_buffer;
unsigned nr_pages;
- int cpu;
+ int cpu, err = 0;

/*
* Always succeed at resizing a non-existent buffer:
@@ -1330,15 +1438,18 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
if (size < BUF_PAGE_SIZE * 2)
size = BUF_PAGE_SIZE * 2;

- atomic_inc(&buffer->record_disabled);
+ nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

- /* Make sure all writers are done with this buffer. */
- synchronize_sched();
+ /*
+ * Don't succeed if resizing is disabled, as a reader might be
+ * manipulating the ring buffer and is expecting a sane state while
+ * this is true.
+ */
+ if (atomic_read(&buffer->resize_disabled))
+ return -EBUSY;

+ /* prevent another thread from changing buffer sizes */
mutex_lock(&buffer->mutex);
- get_online_cpus();
-
- nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

if (cpu_id == RING_BUFFER_ALL_CPUS) {
/* calculate the pages to update */
@@ -1347,33 +1458,67 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,

cpu_buffer->nr_pages_to_update = nr_pages -
cpu_buffer->nr_pages;
-
/*
* nothing more to do for removing pages or no update
*/
if (cpu_buffer->nr_pages_to_update <= 0)
continue;
-
/*
* to add pages, make sure all new pages can be
* allocated without receiving ENOMEM
*/
INIT_LIST_HEAD(&cpu_buffer->new_pages);
if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
- &cpu_buffer->new_pages, cpu))
+ &cpu_buffer->new_pages, cpu)) {
/* not enough memory for new pages */
- goto no_mem;
+ err = -ENOMEM;
+ goto out_err;
+ }
+ }
+
+ get_online_cpus();
+ /*
+ * Fire off all the required work handlers
+ * Look out for offline CPUs
+ */
+ for_each_buffer_cpu(buffer, cpu) {
+ cpu_buffer = buffer->buffers[cpu];
+ if (!cpu_buffer->nr_pages_to_update ||
+ !cpu_online(cpu))
+ continue;
+
+ schedule_work_on(cpu, &cpu_buffer->update_pages_work);
+ }
+ /*
+ * This loop is for the CPUs that are not online.
+ * We can't schedule anything on them, but it's not necessary
+ * since we can change their buffer sizes without any race.
+ */
+ for_each_buffer_cpu(buffer, cpu) {
+ cpu_buffer = buffer->buffers[cpu];
+ if (!cpu_buffer->nr_pages_to_update ||
+ cpu_online(cpu))
+ continue;
+
+ rb_update_pages(cpu_buffer);
}

/* wait for all the updates to complete */
for_each_buffer_cpu(buffer, cpu) {
cpu_buffer = buffer->buffers[cpu];
- if (cpu_buffer->nr_pages_to_update) {
- update_pages_handler(cpu_buffer);
- }
+ if (!cpu_buffer->nr_pages_to_update||
+ !cpu_online(cpu))
+ continue;
+
+ wait_for_completion(&cpu_buffer->update_completion);
+ /* reset this value */
+ cpu_buffer->nr_pages_to_update = 0;
}
+
+ put_online_cpus();
} else {
cpu_buffer = buffer->buffers[cpu_id];
+
if (nr_pages == cpu_buffer->nr_pages)
goto out;

@@ -1383,38 +1528,47 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
INIT_LIST_HEAD(&cpu_buffer->new_pages);
if (cpu_buffer->nr_pages_to_update > 0 &&
__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
- &cpu_buffer->new_pages, cpu_id))
- goto no_mem;
+ &cpu_buffer->new_pages, cpu_id)) {
+ err = -ENOMEM;
+ goto out_err;
+ }

- update_pages_handler(cpu_buffer);
+ get_online_cpus();
+
+ if (cpu_online(cpu_id)) {
+ schedule_work_on(cpu_id,
+ &cpu_buffer->update_pages_work);
+ wait_for_completion(&cpu_buffer->update_completion);
+ } else
+ rb_update_pages(cpu_buffer);
+
+ put_online_cpus();
+ /* reset this value */
+ cpu_buffer->nr_pages_to_update = 0;
}

out:
- put_online_cpus();
mutex_unlock(&buffer->mutex);
-
- atomic_dec(&buffer->record_disabled);
-
return size;

- no_mem:
+ out_err:
for_each_buffer_cpu(buffer, cpu) {
struct buffer_page *bpage, *tmp;
+
cpu_buffer = buffer->buffers[cpu];
- /* reset this number regardless */
cpu_buffer->nr_pages_to_update = 0;
+
if (list_empty(&cpu_buffer->new_pages))
continue;
+
list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
list) {
list_del_init(&bpage->list);
free_buffer_page(bpage);
}
}
- put_online_cpus();
mutex_unlock(&buffer->mutex);
- atomic_dec(&buffer->record_disabled);
- return -ENOMEM;
+ return err;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

@@ -1453,21 +1607,11 @@ rb_iter_head_event(struct ring_buffer_iter *iter)
return __rb_page_index(iter->head_page, iter->head);
}

-static inline unsigned long rb_page_write(struct buffer_page *bpage)
-{
- return local_read(&bpage->write) & RB_WRITE_MASK;
-}
-
static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
return local_read(&bpage->page->commit);
}

-static inline unsigned long rb_page_entries(struct buffer_page *bpage)
-{
- return local_read(&bpage->entries) & RB_WRITE_MASK;
-}
-
/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
@@ -3492,6 +3636,7 @@ ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu)

iter->cpu_buffer = cpu_buffer;

+ atomic_inc(&buffer->resize_disabled);
atomic_inc(&cpu_buffer->record_disabled);

return iter;
@@ -3555,6 +3700,7 @@ ring_buffer_read_finish(struct ring_buffer_iter *iter)
struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

atomic_dec(&cpu_buffer->record_disabled);
+ atomic_dec(&cpu_buffer->buffer->resize_disabled);
kfree(iter);
}
EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
@@ -3662,8 +3808,12 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
if (!cpumask_test_cpu(cpu, buffer->cpumask))
return;

+ atomic_inc(&buffer->resize_disabled);
atomic_inc(&cpu_buffer->record_disabled);

+ /* Make sure all commits have finished */
+ synchronize_sched();
+
raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
@@ -3679,6 +3829,7 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

atomic_dec(&cpu_buffer->record_disabled);
+ atomic_dec(&buffer->resize_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);

diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index 7bb735b..401d77a 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -3057,20 +3057,10 @@ static int __tracing_resize_ring_buffer(unsigned long size, int cpu)

static ssize_t tracing_resize_ring_buffer(unsigned long size, int cpu_id)
{
- int cpu, ret = size;
+ int ret = size;

mutex_lock(&trace_types_lock);

- tracing_stop();
-
- /* disable all cpu buffers */
- for_each_tracing_cpu(cpu) {
- if (global_trace.data[cpu])
- atomic_inc(&global_trace.data[cpu]->disabled);
- if (max_tr.data[cpu])
- atomic_inc(&max_tr.data[cpu]->disabled);
- }
-
if (cpu_id != RING_BUFFER_ALL_CPUS) {
/* make sure, this cpu is enabled in the mask */
if (!cpumask_test_cpu(cpu_id, tracing_buffer_mask)) {
@@ -3084,14 +3074,6 @@ static ssize_t tracing_resize_ring_buffer(unsigned long size, int cpu_id)
ret = -ENOMEM;

out:
- for_each_tracing_cpu(cpu) {
- if (global_trace.data[cpu])
- atomic_dec(&global_trace.data[cpu]->disabled);
- if (max_tr.data[cpu])
- atomic_dec(&max_tr.data[cpu]->disabled);
- }
-
- tracing_start();
mutex_unlock(&trace_types_lock);

return ret;
--
1.7.7.3


2012-05-04 01:59:57

by Vaibhav Nagarnaik

Subject: [PATCH v7 2/3] trace: Make addition of pages in ring buffer atomic

This patch adds the capability to add new pages to a ring buffer
atomically while write operations are going on. This makes it possible
to expand the ring buffer size without reinitializing the ring buffer.

The new pages are attached between the head page and its previous page.

Signed-off-by: Vaibhav Nagarnaik <[email protected]>
---
kernel/trace/ring_buffer.c | 102 +++++++++++++++++++++++++++++++++-----------
1 files changed, 77 insertions(+), 25 deletions(-)

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 230ae9d..a084b4c 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -1252,7 +1252,7 @@ static inline unsigned long rb_page_write(struct buffer_page *bpage)
return local_read(&bpage->write) & RB_WRITE_MASK;
}

-static void
+static int
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned int nr_pages)
{
struct list_head *tail_page, *to_remove, *next_page;
@@ -1359,46 +1359,97 @@ rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned int nr_pages)
} while (to_remove_page != last_page);

RB_WARN_ON(cpu_buffer, nr_removed);
+
+ return nr_removed == 0;
}

-static void
-rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
- struct list_head *pages, unsigned nr_pages)
+static int
+rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
- struct buffer_page *bpage;
- struct list_head *p;
- unsigned i;
+ struct list_head *pages = &cpu_buffer->new_pages;
+ int retries, success;

raw_spin_lock_irq(&cpu_buffer->reader_lock);
- /* stop the writers while inserting pages */
- atomic_inc(&cpu_buffer->record_disabled);
- rb_head_page_deactivate(cpu_buffer);
+ /*
+ * We are holding the reader lock, so the reader page won't be swapped
+ * in the ring buffer. Now we are racing with the writer trying to
+ * move head page and the tail page.
+ * We are going to adapt the reader page update process where:
+ * 1. We first splice the start and end of list of new pages between
+ * the head page and its previous page.
+ * 2. We cmpxchg the prev_page->next to point from head page to the
+ * start of new pages list.
+ * 3. Finally, we update the head->prev to the end of new list.
+ *
+ * We will try this process 10 times, to make sure that we don't keep
+ * spinning.
+ */
+ retries = 10;
+ success = 0;
+ while (retries--) {
+ struct list_head *head_page, *prev_page, *r;
+ struct list_head *last_page, *first_page;
+ struct list_head *head_page_with_bit;

- for (i = 0; i < nr_pages; i++) {
- if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
- goto out;
- p = pages->next;
- bpage = list_entry(p, struct buffer_page, list);
- list_del_init(&bpage->list);
- list_add_tail(&bpage->list, cpu_buffer->pages);
+ head_page = &rb_set_head_page(cpu_buffer)->list;
+ prev_page = head_page->prev;
+
+ first_page = pages->next;
+ last_page = pages->prev;
+
+ head_page_with_bit = (struct list_head *)
+ ((unsigned long)head_page | RB_PAGE_HEAD);
+
+ last_page->next = head_page_with_bit;
+ first_page->prev = prev_page;
+
+ r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
+
+ if (r == head_page_with_bit) {
+ /*
+ * yay, we replaced the page pointer to our new list,
+ * now, we just have to update to head page's prev
+ * pointer to point to end of list
+ */
+ head_page->prev = last_page;
+ success = 1;
+ break;
+ }
}
- rb_reset_cpu(cpu_buffer);
- rb_check_pages(cpu_buffer);

-out:
- atomic_dec(&cpu_buffer->record_disabled);
+ if (success)
+ INIT_LIST_HEAD(pages);
+ /*
+ * If we weren't successful in adding in new pages, warn and stop
+ * tracing
+ */
+ RB_WARN_ON(cpu_buffer, !success);
raw_spin_unlock_irq(&cpu_buffer->reader_lock);
+
+ /* free pages if they weren't inserted */
+ if (!success) {
+ struct buffer_page *bpage, *tmp;
+ list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
+ list) {
+ list_del_init(&bpage->list);
+ free_buffer_page(bpage);
+ }
+ }
+ return success;
}

static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
+ int success;
+
if (cpu_buffer->nr_pages_to_update > 0)
- rb_insert_pages(cpu_buffer, &cpu_buffer->new_pages,
- cpu_buffer->nr_pages_to_update);
+ success = rb_insert_pages(cpu_buffer);
else
- rb_remove_pages(cpu_buffer, -cpu_buffer->nr_pages_to_update);
+ success = rb_remove_pages(cpu_buffer,
+ -cpu_buffer->nr_pages_to_update);

- cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
+ if (success)
+ cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
}

static void update_pages_handler(struct work_struct *work)
@@ -3772,6 +3823,7 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
cpu_buffer->commit_page = cpu_buffer->head_page;

INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
+ INIT_LIST_HEAD(&cpu_buffer->new_pages);
local_set(&cpu_buffer->reader_page->write, 0);
local_set(&cpu_buffer->reader_page->entries, 0);
local_set(&cpu_buffer->reader_page->page->commit, 0);
--
1.7.7.3

2012-05-04 02:01:06

by Vaibhav Nagarnaik

Subject: [PATCH v7 3/3] trace: change CPU ring buffer state from tracing_cpumask

According to Documentation/trace/ftrace.txt:

tracing_cpumask:

This is a mask that lets the user only trace
on specified CPUS. The format is a hex string
representing the CPUS.

The tracing_cpumask currently doesn't affect the tracing state of
per-CPU ring buffers.

This patch enables/disables CPU recording as its corresponding bit in
tracing_cpumask is set/unset.
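
For illustration, a small standalone C sketch of the mask comparison this
boils down to. The apply_cpumask(), record_enable_cpu() and
record_disable_cpu() names are made up; the actual patch calls
ring_buffer_record_enable_cpu()/ring_buffer_record_disable_cpu() from
tracing_cpumask_write(), as the diff below shows.

#include <stdbool.h>
#include <stdio.h>

#define NR_CPUS 8

static void record_disable_cpu(int cpu) { printf("disable cpu %d\n", cpu); }
static void record_enable_cpu(int cpu)  { printf("enable cpu %d\n", cpu); }

/*
 * Compare the old and new masks bit by bit: a bit that goes from set to
 * clear disables recording on that CPU, a bit that goes from clear to
 * set enables it again.  Unchanged bits are left alone.
 */
static unsigned long apply_cpumask(unsigned long old_mask, unsigned long new_mask)
{
        int cpu;

        for (cpu = 0; cpu < NR_CPUS; cpu++) {
                bool was_on = old_mask & (1UL << cpu);
                bool now_on = new_mask & (1UL << cpu);

                if (was_on && !now_on)
                        record_disable_cpu(cpu);
                else if (!was_on && now_on)
                        record_enable_cpu(cpu);
        }
        return new_mask;
}

int main(void)
{
        unsigned long mask = 0xffUL;            /* all 8 CPUs tracing */

        mask = apply_cpumask(mask, 0x0fUL);     /* keep only CPUs 0-3 */
        return 0;
}
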

Signed-off-by: Vaibhav Nagarnaik <[email protected]>
---
kernel/trace/trace.c | 2 ++
1 files changed, 2 insertions(+), 0 deletions(-)

diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index 401d77a..6d4c2dd 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -2650,10 +2650,12 @@ tracing_cpumask_write(struct file *filp, const char __user *ubuf,
if (cpumask_test_cpu(cpu, tracing_cpumask) &&
!cpumask_test_cpu(cpu, tracing_cpumask_new)) {
atomic_inc(&global_trace.data[cpu]->disabled);
+ ring_buffer_record_disable_cpu(global_trace.buffer, cpu);
}
if (!cpumask_test_cpu(cpu, tracing_cpumask) &&
cpumask_test_cpu(cpu, tracing_cpumask_new)) {
atomic_dec(&global_trace.data[cpu]->disabled);
+ ring_buffer_record_enable_cpu(global_trace.buffer, cpu);
}
}
arch_spin_unlock(&ftrace_max_lock);
--
1.7.7.3

2012-05-07 20:22:28

by Steven Rostedt

Subject: Re: [PATCH v7 1/3] trace: Make removal of ring buffer pages atomic

On Thu, 2012-05-03 at 18:59 -0700, Vaibhav Nagarnaik wrote:
> This patch adds the capability to remove pages from a ring buffer
> without destroying any existing data in it.
>
> This is done by removing the pages after the tail page. This makes sure
> that first all the empty pages in the ring buffer are removed. If the
> head page is one in the list of pages to be removed, then the page after
> the removed ones is made the head page. This removes the oldest data
> from the ring buffer and keeps the latest data around to be read.
>
> To do this in a non-racey manner, tracing is stopped for a very short
> time while the pages to be removed are identified and unlinked from the
> ring buffer. The pages are freed after the tracing is restarted to
> minimize the time needed to stop tracing.
>
> The context in which the pages from the per-cpu ring buffer are removed
> runs on the respective CPU. This minimizes the events not traced to only
> NMI trace contexts.
>
> Signed-off-by: Vaibhav Nagarnaik <[email protected]>

After applying this patch, I get this:

# trace-cmd start -e all
# echo 100 > /debug/tracing/buffer_size_kb

BUG: scheduling while atomic: trace-cmd/2018/0x00000002
no locks held by trace-cmd/2018.
Modules linked in: ipt_MASQUERADE iptable_nat nf_nat sunrpc bridge stp llc ip6t_REJECT nf_conntrack_ipv6 nf_defrag_ipv6 ip6table_filter ip6_tables ipv6 kvm uinput snd_hda_codec_idt
snd_hda_intel snd_hda_codec snd_hwdep snd_seq snd_seq_device snd_pcm snd_timer snd soundcore snd_page_alloc i2c_i801 shpchp microcode pata_acpi firewire_ohci firewire_core crc_itu
_t ata_generic i915 drm_kms_helper drm i2c_algo_bit i2c_core video [last unloaded: scsi_wait_scan]
Pid: 2018, comm: trace-cmd Not tainted 3.4.0-rc2-test+ #2
Call Trace:
[<ffffffff81480d52>] __schedule_bug+0x66/0x6a
[<ffffffff81487d16>] __schedule+0x93/0x605
[<ffffffff8107a045>] ? __lock_acquire+0x4dc/0xcf1
[<ffffffff8148833b>] schedule+0x64/0x66
[<ffffffff81486b28>] schedule_timeout+0x37/0xf7
[<ffffffff81489386>] ? _raw_spin_unlock_irq+0x2d/0x5e
[<ffffffff8107b117>] ? trace_hardirqs_on_caller+0x121/0x158
[<ffffffff81487b73>] wait_for_common+0x97/0xf1
[<ffffffff8105c922>] ? try_to_wake_up+0x1ec/0x1ec
[<ffffffff810a4d29>] ? call_rcu_bh+0x19/0x19
[<ffffffff810b4845>] ? tracing_iter_reset+0x8b/0x8b
[<ffffffff81487c81>] wait_for_completion+0x1d/0x1f
[<ffffffff8104ba3d>] wait_rcu_gp+0x5c/0x77
[<ffffffff8104ba58>] ? wait_rcu_gp+0x77/0x77
[<ffffffff810a39eb>] synchronize_sched+0x25/0x27
[<ffffffff810ae27f>] ring_buffer_reset_cpu+0x4e/0xd1
[<ffffffff810b4845>] ? tracing_iter_reset+0x8b/0x8b
[<ffffffff810b30e6>] tracing_reset_online_cpus+0x49/0x74
[<ffffffff810b4885>] tracing_open+0x40/0x2c9
[<ffffffff810b4845>] ? tracing_iter_reset+0x8b/0x8b
[<ffffffff8111491b>] __dentry_open+0x166/0x299
[<ffffffff81115850>] nameidata_to_filp+0x60/0x67
[<ffffffff8112270a>] do_last+0x565/0x59f
[<ffffffff81122922>] path_openat+0xd0/0x30e
[<ffffffff8107ace8>] ? lock_acquire+0xe0/0x112
[<ffffffff8112cc9e>] ? alloc_fd+0x3c/0xfe
[<ffffffff81122c5d>] do_filp_open+0x38/0x86
[<ffffffff814892d1>] ? _raw_spin_unlock+0x48/0x56
[<ffffffff8112cd4e>] ? alloc_fd+0xec/0xfe
[<ffffffff811158c6>] do_sys_open+0x6f/0x101
[<ffffffff81115979>] sys_open+0x21/0x23
[<ffffffff8148f3e9>] system_call_fastpath+0x16/0x1b

Let me know if you need my config.

-- Steve

2012-05-07 21:49:23

by Vaibhav Nagarnaik

Subject: Re: [PATCH v7 1/3] trace: Make removal of ring buffer pages atomic

On Mon, May 7, 2012 at 1:22 PM, Steven Rostedt <[email protected]> wrote:
> After applying this patch, I get this:
>
> # trace-cmd start -e all
> # echo 100 > /debug/tracing/buffer_size_kb
>
> BUG: scheduling while atomic: trace-cmd/2018/0x00000002
> no locks held by trace-cmd/2018.
> Modules linked in: ipt_MASQUERADE iptable_nat nf_nat sunrpc bridge stp llc ip6t_REJECT nf_conntrack_ipv6 nf_defrag_ipv6 ip6table_filter ip6_tables ipv6 kvm uinput snd_hda_codec_idt
>  snd_hda_intel snd_hda_codec snd_hwdep snd_seq snd_seq_device snd_pcm snd_timer snd soundcore snd_page_alloc i2c_i801 shpchp microcode pata_acpi firewire_ohci firewire_core crc_itu
> _t ata_generic i915 drm_kms_helper drm i2c_algo_bit i2c_core video [last unloaded: scsi_wait_scan]
> Pid: 2018, comm: trace-cmd Not tainted 3.4.0-rc2-test+ #2
> Call Trace:
>  [<ffffffff81480d52>] __schedule_bug+0x66/0x6a
>  [<ffffffff81487d16>] __schedule+0x93/0x605
>  [<ffffffff8107a045>] ? __lock_acquire+0x4dc/0xcf1
>  [<ffffffff8148833b>] schedule+0x64/0x66
>  [<ffffffff81486b28>] schedule_timeout+0x37/0xf7
>  [<ffffffff81489386>] ? _raw_spin_unlock_irq+0x2d/0x5e
>  [<ffffffff8107b117>] ? trace_hardirqs_on_caller+0x121/0x158
>  [<ffffffff81487b73>] wait_for_common+0x97/0xf1
>  [<ffffffff8105c922>] ? try_to_wake_up+0x1ec/0x1ec
>  [<ffffffff810a4d29>] ? call_rcu_bh+0x19/0x19
>  [<ffffffff810b4845>] ? tracing_iter_reset+0x8b/0x8b
>  [<ffffffff81487c81>] wait_for_completion+0x1d/0x1f
>  [<ffffffff8104ba3d>] wait_rcu_gp+0x5c/0x77
>  [<ffffffff8104ba58>] ? wait_rcu_gp+0x77/0x77
>  [<ffffffff810a39eb>] synchronize_sched+0x25/0x27
>  [<ffffffff810ae27f>] ring_buffer_reset_cpu+0x4e/0xd1

The following seems to be the culprit. I am guessing you have a preempt
kernel?

@@ -3662,8 +3808,12 @@ void ring_buffer_reset_cpu(struct ring_buffer
*buffer, int cpu)
if (!cpumask_test_cpu(cpu, buffer->cpumask))
return;

+ atomic_inc(&buffer->resize_disabled);
atomic_inc(&cpu_buffer->record_disabled);

+ /* Make sure all commits have finished */
+ synchronize_sched();
+
raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);


I guess I can disable resizing in ring_buffer_record_disable(); that
seems to be a reasonable assumption.



Vaibhav Nagarnaik

2012-05-08 00:14:41

by Steven Rostedt

Subject: Re: [PATCH v7 1/3] trace: Make removal of ring buffer pages atomic

On Mon, 2012-05-07 at 14:48 -0700, Vaibhav Nagarnaik wrote:

> The following seems to be the culprit. I am guessing you have a preempt
> kernel?

I'm one of the real-time Linux maintainers, what do you think ;-)

>
> @@ -3662,8 +3808,12 @@ void ring_buffer_reset_cpu(struct ring_buffer
> *buffer, int cpu)
> if (!cpumask_test_cpu(cpu, buffer->cpumask))
> return;
>
> + atomic_inc(&buffer->resize_disabled);
> atomic_inc(&cpu_buffer->record_disabled);
>
> + /* Make sure all commits have finished */
> + synchronize_sched();
> +
> raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
>
>
> I guess I can disable resizing in ring_buffer_record_disable(), that
> seems to be a reasonable assumption.
>

Looking into this, the culprit is really the __tracing_reset():

static void __tracing_reset(struct ring_buffer *buffer, int cpu)
{
        ftrace_disable_cpu();
        ring_buffer_reset_cpu(buffer, cpu);
        ftrace_enable_cpu();
}


This function is useless. It's from the time the ring buffer was being
converted to lockless, but today it's no longer needed. The reset of the
ring buffer just needs to have the ring buffer disabled.

The bug is on my end ;-)

We can nuke the __tracing_reset() and just call ring_buffer_reset_cpu()
directly. We no longer need those "ftrace_disable/enable_cpu()s". The
comments for the ring_buffer_reset*() should state that the ring buffer
must be disabled before calling this.

Actually, your patch fixes this, as it makes the reset itself take care
of the ring buffer being disabled.

OK, you don't need to send another patch set (yet ;-), I'll fix the
ftrace side, and then apply your patches, and then see what else breaks.

-- Steve

2012-05-09 03:38:06

by Steven Rostedt

Subject: Re: [PATCH v7 1/3] trace: Make removal of ring buffer pages atomic

On Thu, 2012-05-03 at 18:59 -0700, Vaibhav Nagarnaik wrote:

> + get_online_cpus();
> + /*
> + * Fire off all the required work handlers
> + * Look out for offline CPUs
> + */
> + for_each_buffer_cpu(buffer, cpu) {
> + cpu_buffer = buffer->buffers[cpu];
> + if (!cpu_buffer->nr_pages_to_update ||
> + !cpu_online(cpu))
> + continue;
> +
> + schedule_work_on(cpu, &cpu_buffer->update_pages_work);
> + }
> + /*
> + * This loop is for the CPUs that are not online.
> + * We can't schedule anything on them, but it's not necessary
> + * since we can change their buffer sizes without any race.
> + */
> + for_each_buffer_cpu(buffer, cpu) {
> + cpu_buffer = buffer->buffers[cpu];
> + if (!cpu_buffer->nr_pages_to_update ||
> + cpu_online(cpu))
> + continue;
> +
> + rb_update_pages(cpu_buffer);
> }

BTW, why the two loops and not just:

        for_each_buffer_cpu(buffer, cpu) {
                cpu_buffer = buffer->buffers[cpu];
                if (!cpu_buffer->nr_pages_to_update)
                        continue;

                if (cpu_online(cpu))
                        schedule_work_on(cpu, &cpu_buffer->update_pages_work);
                else
                        rb_update_pages(cpu_buffer);
        }

??


>
> /* wait for all the updates to complete */
> for_each_buffer_cpu(buffer, cpu) {
> cpu_buffer = buffer->buffers[cpu];
> - if (cpu_buffer->nr_pages_to_update) {
> - update_pages_handler(cpu_buffer);
> - }
> + if (!cpu_buffer->nr_pages_to_update||

!cpu_buffer->nr_pages_to_update ||

-- Steve

> + !cpu_online(cpu))
> + continue;
> +
> + wait_for_completion(&cpu_buffer->update_completion);
> + /* reset this value */
> + cpu_buffer->nr_pages_to_update = 0;
> }

2012-05-09 05:01:05

by Vaibhav Nagarnaik

Subject: Re: [PATCH v7 1/3] trace: Make removal of ring buffer pages atomic

On Tue, May 8, 2012 at 8:38 PM, Steven Rostedt <[email protected]> wrote:
> BTW, why the two loops and not just:
>
>         for_each_buffer_cpu(buffer, cpu) {
>                 cpu_buffer = buffer->buffers[cpu];
>                 if (!cpu_buffer->nr_pages_to_update)
>                         continue;
>
>                 if (cpu_online(cpu))
>                         schedule_work_on(cpu, &cpu_buffer->update_pages_work);
>                 else
>                         rb_update_pages(cpu_buffer);
>         }
>
> ??
>
>
>>
>>              /* wait for all the updates to complete */
>>              for_each_buffer_cpu(buffer, cpu) {
>>                      cpu_buffer = buffer->buffers[cpu];
>> -                    if (cpu_buffer->nr_pages_to_update) {
>> -                            update_pages_handler(cpu_buffer);
>> -                    }
>> +                    if (!cpu_buffer->nr_pages_to_update||
>
>                            !cpu_buffer->nr_pages_to_update ||

This schedules work for all the online CPUs first, so the resizing of any
offline CPUs (if any) can occur concurrently. It might not be too much of
a big deal to just make it one loop.



Vaibhav Nagarnaik

2012-05-09 14:29:52

by Steven Rostedt

Subject: Re: [PATCH v7 1/3] trace: Make removal of ring buffer pages atomic

On Tue, 2012-05-08 at 22:00 -0700, Vaibhav Nagarnaik wrote:
> On Tue, May 8, 2012 at 8:38 PM, Steven Rostedt <[email protected]> wrote:
> > BTW, why the two loops and not just:
> >
> > for_each_buffer_cpu(buffer, cpu) {
> > cpu_buffer = buffer->buffers[cpu];
> > if (!cpu_buffer->nr_pages_to_update)
> > continue;
> >
> > if (cpu_online(cpu))
> > schedule_work_on(cpu, &cpu_buffer->update_pages_work);
> > else
> > rb_update_pages(cpu_buffer);
> > }
> >
> > ??
> >
> >
> >>
> >> /* wait for all the updates to complete */
> >> for_each_buffer_cpu(buffer, cpu) {
> >> cpu_buffer = buffer->buffers[cpu];
> >> - if (cpu_buffer->nr_pages_to_update) {
> >> - update_pages_handler(cpu_buffer);
> >> - }
> >> + if (!cpu_buffer->nr_pages_to_update||
> >
> > !cpu_buffer->nr_pages_to_update ||
>
> This schedules work for all online CPUs and the offline CPUs resizing
> (if any) can occur concurrently. It might not be too much of big deal
> to just make it one loop.

This is far from a hot path. In fact, it's quite slow. Let's not uglify
code just to optimize something that's not time critical.

Please combine these two into a single loop. The wait for completion is
fine as a separate loop.

Thanks,

-- Steve

2012-05-09 17:46:53

by Vaibhav Nagarnaik

Subject: Re: [PATCH v7 1/3] trace: Make removal of ring buffer pages atomic

On Wed, May 9, 2012 at 7:29 AM, Steven Rostedt <[email protected]> wrote:
> This is far from a hot path. In fact, it's quite slow. Lets not uglify
> code just to optimize something that's not time critical.
>
> Please combine these two into a single loop. The wait for completion is
> fine as a separate loop.
>

Sure, I will update the patch with that change. Have you found any
other issues in testing the patch?

Vaibhav Nagarnaik

2012-05-09 17:54:22

by Steven Rostedt

Subject: Re: [PATCH v7 1/3] trace: Make removal of ring buffer pages atomic

On Wed, 2012-05-09 at 10:46 -0700, Vaibhav Nagarnaik wrote:
> On Wed, May 9, 2012 at 7:29 AM, Steven Rostedt <[email protected]> wrote:
> > This is far from a hot path. In fact, it's quite slow. Lets not uglify
> > code just to optimize something that's not time critical.
> >
> > Please combine these two into a single loop. The wait for completion is
> > fine as a separate loop.
> >
>
> Sure, I will update the patch with that change. Have you found any
> other issues in testing the patch?
>

Nope, not yet. Sorry for the spotty checks, I'm doing several other
things in between.

-- Steve

2012-05-19 10:17:54

by Vaibhav Nagarnaik

Subject: [tip:perf/core] ring-buffer: Make removal of ring buffer pages atomic

Commit-ID: 83f40318dab00e3298a1f6d0b12ac025e84e478d
Gitweb: http://git.kernel.org/tip/83f40318dab00e3298a1f6d0b12ac025e84e478d
Author: Vaibhav Nagarnaik <[email protected]>
AuthorDate: Thu, 3 May 2012 18:59:50 -0700
Committer: Steven Rostedt <[email protected]>
CommitDate: Wed, 16 May 2012 16:18:57 -0400

ring-buffer: Make removal of ring buffer pages atomic

This patch adds the capability to remove pages from a ring buffer
without destroying any existing data in it.

This is done by removing the pages after the tail page. This makes sure
that first all the empty pages in the ring buffer are removed. If the
head page is one in the list of pages to be removed, then the page after
the removed ones is made the head page. This removes the oldest data
from the ring buffer and keeps the latest data around to be read.

To do this in a non-racey manner, tracing is stopped for a very short
time while the pages to be removed are identified and unlinked from the
ring buffer. The pages are freed after the tracing is restarted to
minimize the time needed to stop tracing.

The context in which the pages from the per-cpu ring buffer are removed
runs on the respective CPU. This minimizes the events not traced to only
NMI trace contexts.

Link: http://lkml.kernel.org/r/[email protected]

Cc: Frederic Weisbecker <[email protected]>
Cc: Ingo Molnar <[email protected]>
Cc: Laurent Chavey <[email protected]>
Cc: Justin Teravest <[email protected]>
Cc: David Sharp <[email protected]>
Signed-off-by: Vaibhav Nagarnaik <[email protected]>
Signed-off-by: Steven Rostedt <[email protected]>
---
kernel/trace/ring_buffer.c | 265 ++++++++++++++++++++++++++++++++++----------
kernel/trace/trace.c | 20 +---
2 files changed, 209 insertions(+), 76 deletions(-)

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 2d5eb33..27ac37e 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -23,6 +23,8 @@
#include <asm/local.h>
#include "trace.h"

+static void update_pages_handler(struct work_struct *work);
+
/*
* The ring buffer header is special. We must manually up keep it.
*/
@@ -470,12 +472,15 @@ struct ring_buffer_per_cpu {
/* ring buffer pages to update, > 0 to add, < 0 to remove */
int nr_pages_to_update;
struct list_head new_pages; /* new pages to add */
+ struct work_struct update_pages_work;
+ struct completion update_completion;
};

struct ring_buffer {
unsigned flags;
int cpus;
atomic_t record_disabled;
+ atomic_t resize_disabled;
cpumask_var_t cpumask;

struct lock_class_key *reader_lock_key;
@@ -1048,6 +1053,8 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
raw_spin_lock_init(&cpu_buffer->reader_lock);
lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
+ INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
+ init_completion(&cpu_buffer->update_completion);

bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu));
@@ -1235,32 +1242,123 @@ void ring_buffer_set_clock(struct ring_buffer *buffer,

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

+static inline unsigned long rb_page_entries(struct buffer_page *bpage)
+{
+ return local_read(&bpage->entries) & RB_WRITE_MASK;
+}
+
+static inline unsigned long rb_page_write(struct buffer_page *bpage)
+{
+ return local_read(&bpage->write) & RB_WRITE_MASK;
+}
+
static void
-rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
+rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned int nr_pages)
{
- struct buffer_page *bpage;
- struct list_head *p;
- unsigned i;
+ struct list_head *tail_page, *to_remove, *next_page;
+ struct buffer_page *to_remove_page, *tmp_iter_page;
+ struct buffer_page *last_page, *first_page;
+ unsigned int nr_removed;
+ unsigned long head_bit;
+ int page_entries;
+
+ head_bit = 0;

raw_spin_lock_irq(&cpu_buffer->reader_lock);
- rb_head_page_deactivate(cpu_buffer);
+ atomic_inc(&cpu_buffer->record_disabled);
+ /*
+ * We don't race with the readers since we have acquired the reader
+ * lock. We also don't race with writers after disabling recording.
+ * This makes it easy to figure out the first and the last page to be
+ * removed from the list. We unlink all the pages in between including
+ * the first and last pages. This is done in a busy loop so that we
+ * lose the least number of traces.
+ * The pages are freed after we restart recording and unlock readers.
+ */
+ tail_page = &cpu_buffer->tail_page->list;

- for (i = 0; i < nr_pages; i++) {
- if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
- goto out;
- p = cpu_buffer->pages->next;
- bpage = list_entry(p, struct buffer_page, list);
- list_del_init(&bpage->list);
- free_buffer_page(bpage);
+ /*
+ * tail page might be on reader page, we remove the next page
+ * from the ring buffer
+ */
+ if (cpu_buffer->tail_page == cpu_buffer->reader_page)
+ tail_page = rb_list_head(tail_page->next);
+ to_remove = tail_page;
+
+ /* start of pages to remove */
+ first_page = list_entry(rb_list_head(to_remove->next),
+ struct buffer_page, list);
+
+ for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
+ to_remove = rb_list_head(to_remove)->next;
+ head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
}
- if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
- goto out;

- rb_reset_cpu(cpu_buffer);
- rb_check_pages(cpu_buffer);
+ next_page = rb_list_head(to_remove)->next;

-out:
+ /*
+ * Now we remove all pages between tail_page and next_page.
+ * Make sure that we have head_bit value preserved for the
+ * next page
+ */
+ tail_page->next = (struct list_head *)((unsigned long)next_page |
+ head_bit);
+ next_page = rb_list_head(next_page);
+ next_page->prev = tail_page;
+
+ /* make sure pages points to a valid page in the ring buffer */
+ cpu_buffer->pages = next_page;
+
+ /* update head page */
+ if (head_bit)
+ cpu_buffer->head_page = list_entry(next_page,
+ struct buffer_page, list);
+
+ /*
+ * change read pointer to make sure any read iterators reset
+ * themselves
+ */
+ cpu_buffer->read = 0;
+
+ /* pages are removed, resume tracing and then free the pages */
+ atomic_dec(&cpu_buffer->record_disabled);
raw_spin_unlock_irq(&cpu_buffer->reader_lock);
+
+ RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
+
+ /* last buffer page to remove */
+ last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
+ list);
+ tmp_iter_page = first_page;
+
+ do {
+ to_remove_page = tmp_iter_page;
+ rb_inc_page(cpu_buffer, &tmp_iter_page);
+
+ /* update the counters */
+ page_entries = rb_page_entries(to_remove_page);
+ if (page_entries) {
+ /*
+ * If something was added to this page, it was full
+ * since it is not the tail page. So we deduct the
+ * bytes consumed in ring buffer from here.
+ * No need to update overruns, since this page is
+ * deleted from ring buffer and its entries are
+ * already accounted for.
+ */
+ local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
+ }
+
+ /*
+ * We have already removed references to this list item, just
+ * free up the buffer_page and its page
+ */
+ free_buffer_page(to_remove_page);
+ nr_removed--;
+
+ } while (to_remove_page != last_page);
+
+ RB_WARN_ON(cpu_buffer, nr_removed);
}

static void
@@ -1272,6 +1370,8 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
unsigned i;

raw_spin_lock_irq(&cpu_buffer->reader_lock);
+ /* stop the writers while inserting pages */
+ atomic_inc(&cpu_buffer->record_disabled);
rb_head_page_deactivate(cpu_buffer);

for (i = 0; i < nr_pages; i++) {
@@ -1286,19 +1386,27 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
rb_check_pages(cpu_buffer);

out:
+ atomic_dec(&cpu_buffer->record_disabled);
raw_spin_unlock_irq(&cpu_buffer->reader_lock);
}

-static void update_pages_handler(struct ring_buffer_per_cpu *cpu_buffer)
+static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
if (cpu_buffer->nr_pages_to_update > 0)
rb_insert_pages(cpu_buffer, &cpu_buffer->new_pages,
cpu_buffer->nr_pages_to_update);
else
rb_remove_pages(cpu_buffer, -cpu_buffer->nr_pages_to_update);
+
cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
- /* reset this value */
- cpu_buffer->nr_pages_to_update = 0;
+}
+
+static void update_pages_handler(struct work_struct *work)
+{
+ struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
+ struct ring_buffer_per_cpu, update_pages_work);
+ rb_update_pages(cpu_buffer);
+ complete(&cpu_buffer->update_completion);
}

/**
@@ -1308,14 +1416,14 @@ static void update_pages_handler(struct ring_buffer_per_cpu *cpu_buffer)
*
* Minimum size is 2 * BUF_PAGE_SIZE.
*
- * Returns -1 on failure.
+ * Returns 0 on success and < 0 on failure.
*/
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
int cpu_id)
{
struct ring_buffer_per_cpu *cpu_buffer;
unsigned nr_pages;
- int cpu;
+ int cpu, err = 0;

/*
* Always succeed at resizing a non-existent buffer:
@@ -1330,15 +1438,18 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
if (size < BUF_PAGE_SIZE * 2)
size = BUF_PAGE_SIZE * 2;

- atomic_inc(&buffer->record_disabled);
+ nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

- /* Make sure all writers are done with this buffer. */
- synchronize_sched();
+ /*
+ * Don't succeed if resizing is disabled, as a reader might be
+ * manipulating the ring buffer and is expecting a sane state while
+ * this is true.
+ */
+ if (atomic_read(&buffer->resize_disabled))
+ return -EBUSY;

+ /* prevent another thread from changing buffer sizes */
mutex_lock(&buffer->mutex);
- get_online_cpus();
-
- nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

if (cpu_id == RING_BUFFER_ALL_CPUS) {
/* calculate the pages to update */
@@ -1347,33 +1458,67 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,

cpu_buffer->nr_pages_to_update = nr_pages -
cpu_buffer->nr_pages;
-
/*
* nothing more to do for removing pages or no update
*/
if (cpu_buffer->nr_pages_to_update <= 0)
continue;
-
/*
* to add pages, make sure all new pages can be
* allocated without receiving ENOMEM
*/
INIT_LIST_HEAD(&cpu_buffer->new_pages);
if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
- &cpu_buffer->new_pages, cpu))
+ &cpu_buffer->new_pages, cpu)) {
/* not enough memory for new pages */
- goto no_mem;
+ err = -ENOMEM;
+ goto out_err;
+ }
+ }
+
+ get_online_cpus();
+ /*
+ * Fire off all the required work handlers
+ * Look out for offline CPUs
+ */
+ for_each_buffer_cpu(buffer, cpu) {
+ cpu_buffer = buffer->buffers[cpu];
+ if (!cpu_buffer->nr_pages_to_update ||
+ !cpu_online(cpu))
+ continue;
+
+ schedule_work_on(cpu, &cpu_buffer->update_pages_work);
+ }
+ /*
+ * This loop is for the CPUs that are not online.
+ * We can't schedule anything on them, but it's not necessary
+ * since we can change their buffer sizes without any race.
+ */
+ for_each_buffer_cpu(buffer, cpu) {
+ cpu_buffer = buffer->buffers[cpu];
+ if (!cpu_buffer->nr_pages_to_update ||
+ cpu_online(cpu))
+ continue;
+
+ rb_update_pages(cpu_buffer);
}

/* wait for all the updates to complete */
for_each_buffer_cpu(buffer, cpu) {
cpu_buffer = buffer->buffers[cpu];
- if (cpu_buffer->nr_pages_to_update) {
- update_pages_handler(cpu_buffer);
- }
+ if (!cpu_buffer->nr_pages_to_update ||
+ !cpu_online(cpu))
+ continue;
+
+ wait_for_completion(&cpu_buffer->update_completion);
+ /* reset this value */
+ cpu_buffer->nr_pages_to_update = 0;
}
+
+ put_online_cpus();
} else {
cpu_buffer = buffer->buffers[cpu_id];
+
if (nr_pages == cpu_buffer->nr_pages)
goto out;

@@ -1383,38 +1528,47 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
INIT_LIST_HEAD(&cpu_buffer->new_pages);
if (cpu_buffer->nr_pages_to_update > 0 &&
__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
- &cpu_buffer->new_pages, cpu_id))
- goto no_mem;
+ &cpu_buffer->new_pages, cpu_id)) {
+ err = -ENOMEM;
+ goto out_err;
+ }

- update_pages_handler(cpu_buffer);
+ get_online_cpus();
+
+ if (cpu_online(cpu_id)) {
+ schedule_work_on(cpu_id,
+ &cpu_buffer->update_pages_work);
+ wait_for_completion(&cpu_buffer->update_completion);
+ } else
+ rb_update_pages(cpu_buffer);
+
+ put_online_cpus();
+ /* reset this value */
+ cpu_buffer->nr_pages_to_update = 0;
}

out:
- put_online_cpus();
mutex_unlock(&buffer->mutex);
-
- atomic_dec(&buffer->record_disabled);
-
return size;

- no_mem:
+ out_err:
for_each_buffer_cpu(buffer, cpu) {
struct buffer_page *bpage, *tmp;
+
cpu_buffer = buffer->buffers[cpu];
- /* reset this number regardless */
cpu_buffer->nr_pages_to_update = 0;
+
if (list_empty(&cpu_buffer->new_pages))
continue;
+
list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
list) {
list_del_init(&bpage->list);
free_buffer_page(bpage);
}
}
- put_online_cpus();
mutex_unlock(&buffer->mutex);
- atomic_dec(&buffer->record_disabled);
- return -ENOMEM;
+ return err;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

@@ -1453,21 +1607,11 @@ rb_iter_head_event(struct ring_buffer_iter *iter)
return __rb_page_index(iter->head_page, iter->head);
}

-static inline unsigned long rb_page_write(struct buffer_page *bpage)
-{
- return local_read(&bpage->write) & RB_WRITE_MASK;
-}
-
static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
return local_read(&bpage->page->commit);
}

-static inline unsigned long rb_page_entries(struct buffer_page *bpage)
-{
- return local_read(&bpage->entries) & RB_WRITE_MASK;
-}
-
/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
@@ -3492,6 +3636,7 @@ ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu)

iter->cpu_buffer = cpu_buffer;

+ atomic_inc(&buffer->resize_disabled);
atomic_inc(&cpu_buffer->record_disabled);

return iter;
@@ -3555,6 +3700,7 @@ ring_buffer_read_finish(struct ring_buffer_iter *iter)
struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

atomic_dec(&cpu_buffer->record_disabled);
+ atomic_dec(&cpu_buffer->buffer->resize_disabled);
kfree(iter);
}
EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
@@ -3662,8 +3808,12 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
if (!cpumask_test_cpu(cpu, buffer->cpumask))
return;

+ atomic_inc(&buffer->resize_disabled);
atomic_inc(&cpu_buffer->record_disabled);

+ /* Make sure all commits have finished */
+ synchronize_sched();
+
raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
@@ -3679,6 +3829,7 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

atomic_dec(&cpu_buffer->record_disabled);
+ atomic_dec(&buffer->resize_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);

diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index d1b3469..dfbd86c 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -3076,20 +3076,10 @@ static int __tracing_resize_ring_buffer(unsigned long size, int cpu)

static ssize_t tracing_resize_ring_buffer(unsigned long size, int cpu_id)
{
- int cpu, ret = size;
+ int ret = size;

mutex_lock(&trace_types_lock);

- tracing_stop();
-
- /* disable all cpu buffers */
- for_each_tracing_cpu(cpu) {
- if (global_trace.data[cpu])
- atomic_inc(&global_trace.data[cpu]->disabled);
- if (max_tr.data[cpu])
- atomic_inc(&max_tr.data[cpu]->disabled);
- }
-
if (cpu_id != RING_BUFFER_ALL_CPUS) {
/* make sure, this cpu is enabled in the mask */
if (!cpumask_test_cpu(cpu_id, tracing_buffer_mask)) {
@@ -3103,14 +3093,6 @@ static ssize_t tracing_resize_ring_buffer(unsigned long size, int cpu_id)
ret = -ENOMEM;

out:
- for_each_tracing_cpu(cpu) {
- if (global_trace.data[cpu])
- atomic_dec(&global_trace.data[cpu]->disabled);
- if (max_tr.data[cpu])
- atomic_dec(&max_tr.data[cpu]->disabled);
- }
-
- tracing_start();
mutex_unlock(&trace_types_lock);

return ret;

2012-05-19 10:18:39

by Vaibhav Nagarnaik

Subject: [tip:perf/core] ring-buffer: Make addition of pages in ring buffer atomic

Commit-ID: 5040b4b7bcc26a311c799d46f67174bcb20d05dd
Gitweb: http://git.kernel.org/tip/5040b4b7bcc26a311c799d46f67174bcb20d05dd
Author: Vaibhav Nagarnaik <[email protected]>
AuthorDate: Thu, 3 May 2012 18:59:51 -0700
Committer: Steven Rostedt <[email protected]>
CommitDate: Wed, 16 May 2012 16:25:51 -0400

ring-buffer: Make addition of pages in ring buffer atomic

This patch adds the capability to add new pages to a ring buffer
atomically while write operations are going on. This makes it possible
to expand the ring buffer size without reinitializing the ring buffer.

The new pages are attached between the head page and its previous page.

Link: http://lkml.kernel.org/r/[email protected]

Cc: Frederic Weisbecker <[email protected]>
Cc: Ingo Molnar <[email protected]>
Cc: Laurent Chavey <[email protected]>
Cc: Justin Teravest <[email protected]>
Cc: David Sharp <[email protected]>
Signed-off-by: Vaibhav Nagarnaik <[email protected]>
Signed-off-by: Steven Rostedt <[email protected]>
---
kernel/trace/ring_buffer.c | 102 +++++++++++++++++++++++++++++++++-----------
1 files changed, 77 insertions(+), 25 deletions(-)

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 27ac37e..d673ef0 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -1252,7 +1252,7 @@ static inline unsigned long rb_page_write(struct buffer_page *bpage)
return local_read(&bpage->write) & RB_WRITE_MASK;
}

-static void
+static int
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned int nr_pages)
{
struct list_head *tail_page, *to_remove, *next_page;
@@ -1359,46 +1359,97 @@ rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned int nr_pages)
} while (to_remove_page != last_page);

RB_WARN_ON(cpu_buffer, nr_removed);
+
+ return nr_removed == 0;
}

-static void
-rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
- struct list_head *pages, unsigned nr_pages)
+static int
+rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
- struct buffer_page *bpage;
- struct list_head *p;
- unsigned i;
+ struct list_head *pages = &cpu_buffer->new_pages;
+ int retries, success;

raw_spin_lock_irq(&cpu_buffer->reader_lock);
- /* stop the writers while inserting pages */
- atomic_inc(&cpu_buffer->record_disabled);
- rb_head_page_deactivate(cpu_buffer);
+ /*
+ * We are holding the reader lock, so the reader page won't be swapped
+ * in the ring buffer. Now we are racing with the writer trying to
+ * move head page and the tail page.
+ * We are going to adapt the reader page update process where:
+ * 1. We first splice the start and end of list of new pages between
+ * the head page and its previous page.
+ * 2. We cmpxchg the prev_page->next to point from head page to the
+ * start of new pages list.
+ * 3. Finally, we update the head->prev to the end of new list.
+ *
+ * We will try this process 10 times, to make sure that we don't keep
+ * spinning.
+ */
+ retries = 10;
+ success = 0;
+ while (retries--) {
+ struct list_head *head_page, *prev_page, *r;
+ struct list_head *last_page, *first_page;
+ struct list_head *head_page_with_bit;

- for (i = 0; i < nr_pages; i++) {
- if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
- goto out;
- p = pages->next;
- bpage = list_entry(p, struct buffer_page, list);
- list_del_init(&bpage->list);
- list_add_tail(&bpage->list, cpu_buffer->pages);
+ head_page = &rb_set_head_page(cpu_buffer)->list;
+ prev_page = head_page->prev;
+
+ first_page = pages->next;
+ last_page = pages->prev;
+
+ head_page_with_bit = (struct list_head *)
+ ((unsigned long)head_page | RB_PAGE_HEAD);
+
+ last_page->next = head_page_with_bit;
+ first_page->prev = prev_page;
+
+ r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
+
+ if (r == head_page_with_bit) {
+ /*
+ * yay, we replaced the page pointer to our new list,
+ * now, we just have to update to head page's prev
+ * pointer to point to end of list
+ */
+ head_page->prev = last_page;
+ success = 1;
+ break;
+ }
}
- rb_reset_cpu(cpu_buffer);
- rb_check_pages(cpu_buffer);

-out:
- atomic_dec(&cpu_buffer->record_disabled);
+ if (success)
+ INIT_LIST_HEAD(pages);
+ /*
+ * If we weren't successful in adding in new pages, warn and stop
+ * tracing
+ */
+ RB_WARN_ON(cpu_buffer, !success);
raw_spin_unlock_irq(&cpu_buffer->reader_lock);
+
+ /* free pages if they weren't inserted */
+ if (!success) {
+ struct buffer_page *bpage, *tmp;
+ list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
+ list) {
+ list_del_init(&bpage->list);
+ free_buffer_page(bpage);
+ }
+ }
+ return success;
}

static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
+ int success;
+
if (cpu_buffer->nr_pages_to_update > 0)
- rb_insert_pages(cpu_buffer, &cpu_buffer->new_pages,
- cpu_buffer->nr_pages_to_update);
+ success = rb_insert_pages(cpu_buffer);
else
- rb_remove_pages(cpu_buffer, -cpu_buffer->nr_pages_to_update);
+ success = rb_remove_pages(cpu_buffer,
+ -cpu_buffer->nr_pages_to_update);

- cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
+ if (success)
+ cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
}

static void update_pages_handler(struct work_struct *work)
@@ -3772,6 +3823,7 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
cpu_buffer->commit_page = cpu_buffer->head_page;

INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
+ INIT_LIST_HEAD(&cpu_buffer->new_pages);
local_set(&cpu_buffer->reader_page->write, 0);
local_set(&cpu_buffer->reader_page->entries, 0);
local_set(&cpu_buffer->reader_page->page->commit, 0);

2012-05-19 10:23:01

by Vaibhav Nagarnaik

Subject: [tip:perf/core] tracing: change CPU ring buffer state from tracing_cpumask

Commit-ID: 71babb2705e2203a64c27ede13ae3508a0d2c16c
Gitweb: http://git.kernel.org/tip/71babb2705e2203a64c27ede13ae3508a0d2c16c
Author: Vaibhav Nagarnaik <[email protected]>
AuthorDate: Thu, 3 May 2012 18:59:52 -0700
Committer: Steven Rostedt <[email protected]>
CommitDate: Wed, 16 May 2012 19:50:38 -0400

tracing: change CPU ring buffer state from tracing_cpumask

According to Documentation/trace/ftrace.txt:

tracing_cpumask:

This is a mask that lets the user only trace
on specified CPUS. The format is a hex string
representing the CPUS.

The tracing_cpumask currently doesn't affect the tracing state of
per-CPU ring buffers.

This patch enables/disables CPU recording as its corresponding bit in
tracing_cpumask is set/unset.

Link: http://lkml.kernel.org/r/[email protected]

Cc: Frederic Weisbecker <[email protected]>
Cc: Ingo Molnar <[email protected]>
Cc: Laurent Chavey <[email protected]>
Cc: Justin Teravest <[email protected]>
Cc: David Sharp <[email protected]>
Signed-off-by: Vaibhav Nagarnaik <[email protected]>
Signed-off-by: Steven Rostedt <[email protected]>
---
kernel/trace/trace.c | 2 ++
1 files changed, 2 insertions(+), 0 deletions(-)

diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index 0ed4df0..08a08ba 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -2669,10 +2669,12 @@ tracing_cpumask_write(struct file *filp, const char __user *ubuf,
if (cpumask_test_cpu(cpu, tracing_cpumask) &&
!cpumask_test_cpu(cpu, tracing_cpumask_new)) {
atomic_inc(&global_trace.data[cpu]->disabled);
+ ring_buffer_record_disable_cpu(global_trace.buffer, cpu);
}
if (!cpumask_test_cpu(cpu, tracing_cpumask) &&
cpumask_test_cpu(cpu, tracing_cpumask_new)) {
atomic_dec(&global_trace.data[cpu]->disabled);
+ ring_buffer_record_enable_cpu(global_trace.buffer, cpu);
}
}
arch_spin_unlock(&ftrace_max_lock);