2009-03-17 06:24:31

by Tom Zanussi

Subject: [RFC][PATCH 2/4] tracing: add rb_event_discard() hack

This patch overloads RINGBUF_TYPE_PADDING to provide a way to discard
events from the ring buffer, for the event-filtering mechanism
introduced in a subsequent patch.
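
For illustration only, a rough sketch of how the filtering code might end up
using the discard hook -- filter_match() is a made-up placeholder here, and
the reserve/commit helpers are shown loosely rather than copied from this
tree:

static void trace_write_filtered(struct ring_buffer *buffer,
				 void *rec, unsigned long len)
{
	struct ring_buffer_event *event;
	void *data;

	/* reserve space and copy the record in as usual */
	event = ring_buffer_lock_reserve(buffer, len);
	if (!event)
		return;

	data = ring_buffer_event_data(event);
	memcpy(data, rec, len);

	/* if the record fails the filter, turn it into a discarded event */
	if (!filter_match(data))
		rb_event_discard(event);

	ring_buffer_unlock_commit(buffer, event);
}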

NOTE: this patch is a true hack and will likely either eventually crash
your machine or result in an endless loop of printed events. But it
sometimes works, enough to show that the filtering apparently does what
it should.

I'm posting it only to solicit comments from any ringbuffer gurus who
might know how to do it correctly, or more likely could suggest a
better approach.

Signed-off-by: Tom Zanussi <[email protected]>

---
include/linux/ring_buffer.h | 9 ++++-
kernel/trace/ring_buffer.c | 73 ++++++++++++++++++++++++++++++-------------
2 files changed, 58 insertions(+), 24 deletions(-)

diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index b237e20..4292774 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -22,10 +22,13 @@ struct ring_buffer_event {
/**
* enum ring_buffer_type - internal ring buffer types
*
- * @RINGBUF_TYPE_PADDING: Left over page padding
+ * @RINGBUF_TYPE_PADDING: Left over page padding or discarded event
* array is ignored
- * size is variable depending on how much
+ * If time_delta is 0:
+ * size is variable depending on how much
* padding is needed
+ * If time_delta is 1:
+ * everything else same as RINGBUF_TYPE_DATA
*
* @RINGBUF_TYPE_TIME_EXTEND: Extend the time delta
* array[0] = time delta (28 .. 59)
@@ -69,6 +72,8 @@ ring_buffer_event_time_delta(struct ring_buffer_event *event)
return event->time_delta;
}

+void rb_event_discard(struct ring_buffer_event *event);
+
/*
* size is in bytes for each per CPU buffer.
*/
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 1d3b07d..c59e910 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -213,16 +213,50 @@ enum {
RB_LEN_TIME_STAMP = 16,
};

-/* inline for ring buffer fast paths */
+static inline int rb_null_event(struct ring_buffer_event *event)
+{
+ return event->type == RINGBUF_TYPE_PADDING && event->time_delta == 0;
+}
+
+static inline int rb_discarded_event(struct ring_buffer_event *event)
+{
+ return event->type == RINGBUF_TYPE_PADDING && event->time_delta == 1;
+}
+
+static void rb_event_set_padding(struct ring_buffer_event *event)
+{
+ event->type = RINGBUF_TYPE_PADDING;
+ event->time_delta = 0;
+}
+
+void rb_event_discard(struct ring_buffer_event *event)
+{
+ event->type = RINGBUF_TYPE_PADDING;
+ event->time_delta = 1;
+}
+
static unsigned
-rb_event_length(struct ring_buffer_event *event)
+rb_event_data_length(struct ring_buffer_event *event)
{
unsigned length;

+ if (event->len)
+ length = event->len * RB_ALIGNMENT;
+ else
+ length = event->array[0];
+ return length + RB_EVNT_HDR_SIZE;
+}
+
+/* inline for ring buffer fast paths */
+static unsigned
+rb_event_length(struct ring_buffer_event *event)
+{
switch (event->type) {
case RINGBUF_TYPE_PADDING:
- /* undefined */
- return -1;
+ if (rb_null_event(event))
+ /* undefined */
+ return -1;
+ return rb_event_data_length(event);

case RINGBUF_TYPE_TIME_EXTEND:
return RB_LEN_TIME_EXTEND;
@@ -231,11 +265,7 @@ rb_event_length(struct ring_buffer_event *event)
return RB_LEN_TIME_STAMP;

case RINGBUF_TYPE_DATA:
- if (event->len)
- length = event->len * RB_ALIGNMENT;
- else
- length = event->array[0];
- return length + RB_EVNT_HDR_SIZE;
+ return rb_event_data_length(event);
default:
BUG();
}
@@ -828,11 +858,6 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

-static inline int rb_null_event(struct ring_buffer_event *event)
-{
- return event->type == RINGBUF_TYPE_PADDING;
-}
-
static inline void *
__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
{
@@ -1203,7 +1228,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
/* Mark the rest of the page with padding */
event = __rb_page_index(tail_page, tail);
kmemcheck_annotate_bitfield(event->bitfield);
- event->type = RINGBUF_TYPE_PADDING;
+ rb_event_set_padding(event);
}

if (tail <= BUF_PAGE_SIZE)
@@ -1954,7 +1979,7 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)

event = rb_reader_event(cpu_buffer);

- if (event->type == RINGBUF_TYPE_DATA)
+ if (event->type == RINGBUF_TYPE_DATA || rb_discarded_event(event))
cpu_buffer->entries--;

rb_update_read_stamp(cpu_buffer, event);
@@ -2026,8 +2051,8 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
* can have. Nesting 10 deep of interrupts is clearly
* an anomaly.
*/
- if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
- return NULL;
+// if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
+// return NULL;

reader = rb_get_reader_page(cpu_buffer);
if (!reader)
@@ -2037,7 +2062,8 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)

switch (event->type) {
case RINGBUF_TYPE_PADDING:
- RB_WARN_ON(cpu_buffer, 1);
+ if (rb_null_event(event))
+ RB_WARN_ON(cpu_buffer, 1);
rb_advance_reader(cpu_buffer);
return NULL;

@@ -2089,8 +2115,8 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
* can have. Nesting 10 deep of interrupts is clearly
* an anomaly.
*/
- if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
- return NULL;
+// if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
+// return NULL;

if (rb_per_cpu_empty(cpu_buffer))
return NULL;
@@ -2099,7 +2125,10 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)

switch (event->type) {
case RINGBUF_TYPE_PADDING:
- rb_inc_iter(iter);
+ if (rb_null_event(event))
+ rb_inc_iter(iter);
+ else
+ rb_advance_iter(iter);
goto again;

case RINGBUF_TYPE_TIME_EXTEND:
--
1.5.6.3



2009-03-18 01:21:18

by Steven Rostedt

Subject: Re: [RFC][PATCH 2/4] tracing: add rb_event_discard() hack


On Tue, 17 Mar 2009, Tom Zanussi wrote:

> This patch overloads RINGBUF_TYPE_PADDING to provide a way to discard
> events from the ring buffer, for the event-filtering mechanism
> introduced in a subsequent patch.
>
> NOTE: this patch is a true hack and will likely either eventually crash
> your machine or result in an endless loop of printed events. But it
> sometimes works, enough to show that the filtering apparently does what
> it should.
>
> I'm posting it only to solicit comments from any ringbuffer gurus who
> might know how to do it correctly, or more likely could suggest a
> better approach.

Actually, we can make this work. Just some tweaks. I may take this patch,
and massage it up to something that will work.

>
> Signed-off-by: Tom Zanussi <[email protected]>
>
> ---
> include/linux/ring_buffer.h | 9 ++++-
> kernel/trace/ring_buffer.c | 73 ++++++++++++++++++++++++++++++-------------
> 2 files changed, 58 insertions(+), 24 deletions(-)
>
> diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
> index b237e20..4292774 100644
> --- a/include/linux/ring_buffer.h
> +++ b/include/linux/ring_buffer.h
> @@ -22,10 +22,13 @@ struct ring_buffer_event {
> /**
> * enum ring_buffer_type - internal ring buffer types
> *
> - * @RINGBUF_TYPE_PADDING: Left over page padding
> + * @RINGBUF_TYPE_PADDING: Left over page padding or discarded event
> * array is ignored
> - * size is variable depending on how much
> + * If time_delta is 0:
> + * size is variable depending on how much
> * padding is needed
> + * If time_delta is 1:
> + * everything else same as RINGBUF_TYPE_DATA

I would make the array not ignored unless it is all zero; then everything
would be ignored. That is, the true padding would just be zeroed out.
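
One way to read that, as a sketch only (not what the patch above does): treat
a PADDING event as real padding only when the rest of its header is zero, so
a discarded event can keep its length intact:

static inline int rb_null_event(struct ring_buffer_event *event)
{
	/* true padding: everything other than the type is zeroed */
	return event->type == RINGBUF_TYPE_PADDING &&
	       !event->time_delta && !event->array[0];
}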

> *
> * @RINGBUF_TYPE_TIME_EXTEND: Extend the time delta
> * array[0] = time delta (28 .. 59)
> @@ -69,6 +72,8 @@ ring_buffer_event_time_delta(struct ring_buffer_event *event)
> return event->time_delta;
> }
>
> +void rb_event_discard(struct ring_buffer_event *event);

Note, the "rb_" prefix is only used for static functions since we can get
into name space collisions with rbtree. Functions that are global outside
this file have the "ring_buffer_" prefix.
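
i.e. the exported declaration would become (as the refreshed patch further
down in this thread does):

void ring_buffer_event_discard(struct ring_buffer_event *event);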

> +
> /*
> * size is in bytes for each per CPU buffer.
> */
> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> index 1d3b07d..c59e910 100644
> --- a/kernel/trace/ring_buffer.c
> +++ b/kernel/trace/ring_buffer.c
> @@ -213,16 +213,50 @@ enum {
> RB_LEN_TIME_STAMP = 16,
> };
>
> -/* inline for ring buffer fast paths */
> +static inline int rb_null_event(struct ring_buffer_event *event)
> +{
> + return event->type == RINGBUF_TYPE_PADDING && event->time_delta == 0;
> +}
> +
> +static inline int rb_discarded_event(struct ring_buffer_event *event)
> +{
> + return event->type == RINGBUF_TYPE_PADDING && event->time_delta == 1;
> +}
> +
> +static void rb_event_set_padding(struct ring_buffer_event *event)
> +{
> + event->type = RINGBUF_TYPE_PADDING;
> + event->time_delta = 0;
> +}

the above seems reasonable, and should work fine.

> +
> +void rb_event_discard(struct ring_buffer_event *event)
> +{
> + event->type = RINGBUF_TYPE_PADDING;
> + event->time_delta = 1;
> +}
> +
> static unsigned
> -rb_event_length(struct ring_buffer_event *event)
> +rb_event_data_length(struct ring_buffer_event *event)
> {
> unsigned length;
>
> + if (event->len)
> + length = event->len * RB_ALIGNMENT;
> + else
> + length = event->array[0];
> + return length + RB_EVNT_HDR_SIZE;
> +}
> +
> +/* inline for ring buffer fast paths */
> +static unsigned
> +rb_event_length(struct ring_buffer_event *event)
> +{
> switch (event->type) {
> case RINGBUF_TYPE_PADDING:
> - /* undefined */
> - return -1;
> + if (rb_null_event(event))
> + /* undefined */
> + return -1;
> + return rb_event_data_length(event);
>
> case RINGBUF_TYPE_TIME_EXTEND:
> return RB_LEN_TIME_EXTEND;
> @@ -231,11 +265,7 @@ rb_event_length(struct ring_buffer_event *event)
> return RB_LEN_TIME_STAMP;
>
> case RINGBUF_TYPE_DATA:
> - if (event->len)
> - length = event->len * RB_ALIGNMENT;
> - else
> - length = event->array[0];
> - return length + RB_EVNT_HDR_SIZE;
> + return rb_event_data_length(event);

This is all reasonable.

> default:
> BUG();
> }
> @@ -828,11 +858,6 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
> }
> EXPORT_SYMBOL_GPL(ring_buffer_resize);
>
> -static inline int rb_null_event(struct ring_buffer_event *event)
> -{
> - return event->type == RINGBUF_TYPE_PADDING;
> -}
> -
> static inline void *
> __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
> {
> @@ -1203,7 +1228,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
> /* Mark the rest of the page with padding */
> event = __rb_page_index(tail_page, tail);
> kmemcheck_annotate_bitfield(event->bitfield);
> - event->type = RINGBUF_TYPE_PADDING;
> + rb_event_set_padding(event);
> }
>
> if (tail <= BUF_PAGE_SIZE)
> @@ -1954,7 +1979,7 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
>
> event = rb_reader_event(cpu_buffer);
>
> - if (event->type == RINGBUF_TYPE_DATA)
> + if (event->type == RINGBUF_TYPE_DATA || rb_discarded_event(event))
> cpu_buffer->entries--;

Actually, we should decrement the entries counter at time of discard.

>
> rb_update_read_stamp(cpu_buffer, event);
> @@ -2026,8 +2051,8 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
> * can have. Nesting 10 deep of interrupts is clearly
> * an anomaly.
> */
> - if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
> - return NULL;
> +// if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
> +// return NULL;

Were you hitting this? If so, we need to investigate.

>
> reader = rb_get_reader_page(cpu_buffer);
> if (!reader)
> @@ -2037,7 +2062,8 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
>
> switch (event->type) {
> case RINGBUF_TYPE_PADDING:
> - RB_WARN_ON(cpu_buffer, 1);
> + if (rb_null_event(event))
> + RB_WARN_ON(cpu_buffer, 1);
> rb_advance_reader(cpu_buffer);
> return NULL;

Actually, we want to simply skip over the discarded event. This will cause
the above loops to hit more often. We should make the warning count
BUF_PAGE_SIZE; if we hit it that many times, we know we are in trouble.

In other words, we do not want to return NULL, but we want to goto again.
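
Roughly, the PADDING case in rb_buffer_peek() would then look something like
this sketch (the refreshed patch further down returns the padding event
instead, for the reason given in its comment):

	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			RB_WARN_ON(cpu_buffer, 1);
		/* discarded event: skip it and look at the next one */
		rb_advance_reader(cpu_buffer);
		goto again;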


>
> @@ -2089,8 +2115,8 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
> * can have. Nesting 10 deep of interrupts is clearly
> * an anomaly.
> */
> - if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
> - return NULL;
> +// if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
> +// return NULL;
>
> if (rb_per_cpu_empty(cpu_buffer))
> return NULL;
> @@ -2099,7 +2125,10 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
>
> switch (event->type) {
> case RINGBUF_TYPE_PADDING:
> - rb_inc_iter(iter);
> + if (rb_null_event(event))
> + rb_inc_iter(iter);
> + else
> + rb_advance_iter(iter);
> goto again;

This should work.

>
> case RINGBUF_TYPE_TIME_EXTEND:
> --
> 1.5.6.3

I'll take a look at your patch and see if I can get it to work.

Thanks,

-- Steve

2009-03-18 02:21:55

by Steven Rostedt

Subject: Re: [RFC][PATCH 2/4] tracing: add rb_event_discard() hack


Tom,

Here's an updated patch, could you see if this works for you?

Thanks,

-- Steve


>From [email protected] Tue Mar 17 02:30:09 2009
Date: Tue, 17 Mar 2009 01:23:38 -0500
From: Tom Zanussi <[email protected]>
Subject: [RFC][PATCH 2/4] tracing: add rb_event_discard() hack

This patch overloads RINGBUF_TYPE_PADDING to provide a way to discard
events from the ring buffer, for the event-filtering mechanism
introduced in a subsequent patch.

NOTE: this patch is a true hack and will likely either eventually crash
your machine or result in an endless loop of printed events. But it
sometimes works, enough to show that the filtering apparently does what
it should.

I'm posting it only to solicit comments from any ringbuffer gurus who
might know how to do it correctly, or more likely could suggest a
better approach.

[
Modifications to fix timestamps and handle ring buffer peeking.
- Steven Rostedt
]

Signed-off-by: Tom Zanussi <[email protected]>
Signed-off-by: Steven Rostedt <[email protected]>

---
include/linux/ring_buffer.h | 11 +++-
kernel/trace/ring_buffer.c | 117 ++++++++++++++++++++++++++++++++++++--------
2 files changed, 105 insertions(+), 23 deletions(-)

Index: linux-trace.git/include/linux/ring_buffer.h
===================================================================
--- linux-trace.git.orig/include/linux/ring_buffer.h 2009-03-17 13:38:41.000000000 -0400
+++ linux-trace.git/include/linux/ring_buffer.h 2009-03-17 21:44:36.000000000 -0400
@@ -18,10 +18,13 @@ struct ring_buffer_event {
/**
* enum ring_buffer_type - internal ring buffer types
*
- * @RINGBUF_TYPE_PADDING: Left over page padding
- * array is ignored
- * size is variable depending on how much
+ * @RINGBUF_TYPE_PADDING: Left over page padding or discarded event
+ * If time_delta is 0:
+ * array is ignored
+ * size is variable depending on how much
* padding is needed
+ * If time_delta is non zero:
+ * everything else same as RINGBUF_TYPE_DATA
*
* @RINGBUF_TYPE_TIME_EXTEND: Extend the time delta
* array[0] = time delta (28 .. 59)
@@ -65,6 +68,8 @@ ring_buffer_event_time_delta(struct ring
return event->time_delta;
}

+void ring_buffer_event_discard(struct ring_buffer_event *event);
+
/*
* size is in bytes for each per CPU buffer.
*/
Index: linux-trace.git/kernel/trace/ring_buffer.c
===================================================================
--- linux-trace.git.orig/kernel/trace/ring_buffer.c 2009-03-17 13:49:03.000000000 -0400
+++ linux-trace.git/kernel/trace/ring_buffer.c 2009-03-17 22:10:23.000000000 -0400
@@ -189,16 +189,65 @@ enum {
RB_LEN_TIME_STAMP = 16,
};

-/* inline for ring buffer fast paths */
+static inline int rb_null_event(struct ring_buffer_event *event)
+{
+ return event->type == RINGBUF_TYPE_PADDING && event->time_delta == 0;
+}
+
+static inline int rb_discarded_event(struct ring_buffer_event *event)
+{
+ return event->type == RINGBUF_TYPE_PADDING && event->time_delta;
+}
+
+static void rb_event_set_padding(struct ring_buffer_event *event)
+{
+ event->type = RINGBUF_TYPE_PADDING;
+ event->time_delta = 0;
+}
+
+/**
+ * ring_buffer_event_discard - discard an event in the ring buffer
+ * @buffer: the ring buffer
+ * @event: the event to discard
+ *
+ * Sometimes a event that is in the ring buffer needs to be ignored.
+ * This function lets the user discard an event in the ring buffer
+ * and then that event will not be read later.
+ *
+ * Note, it is up to the user to be careful with this, and protect
+ * against races. If the user discards an event that has been consumed
+ * it is possible that it could corrupt the ring buffer.
+ */
+void ring_buffer_event_discard(struct ring_buffer_event *event)
+{
+ event->type = RINGBUF_TYPE_PADDING;
+ /* time delta must be non zero */
+ if (!event->time_delta)
+ event->time_delta = 1;
+}
+
static unsigned
-rb_event_length(struct ring_buffer_event *event)
+rb_event_data_length(struct ring_buffer_event *event)
{
unsigned length;

+ if (event->len)
+ length = event->len * RB_ALIGNMENT;
+ else
+ length = event->array[0];
+ return length + RB_EVNT_HDR_SIZE;
+}
+
+/* inline for ring buffer fast paths */
+static unsigned
+rb_event_length(struct ring_buffer_event *event)
+{
switch (event->type) {
case RINGBUF_TYPE_PADDING:
- /* undefined */
- return -1;
+ if (rb_null_event(event))
+ /* undefined */
+ return -1;
+ return rb_event_data_length(event);

case RINGBUF_TYPE_TIME_EXTEND:
return RB_LEN_TIME_EXTEND;
@@ -207,11 +256,7 @@ rb_event_length(struct ring_buffer_event
return RB_LEN_TIME_STAMP;

case RINGBUF_TYPE_DATA:
- if (event->len)
- length = event->len * RB_ALIGNMENT;
- else
- length = event->array[0];
- return length + RB_EVNT_HDR_SIZE;
+ return rb_event_data_length(event);
default:
BUG();
}
@@ -836,11 +881,6 @@ int ring_buffer_resize(struct ring_buffe
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

-static inline int rb_null_event(struct ring_buffer_event *event)
-{
- return event->type == RINGBUF_TYPE_PADDING;
-}
-
static inline void *
__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
{
@@ -1210,7 +1250,7 @@ __rb_reserve_next(struct ring_buffer_per
if (tail < BUF_PAGE_SIZE) {
/* Mark the rest of the page with padding */
event = __rb_page_index(tail_page, tail);
- event->type = RINGBUF_TYPE_PADDING;
+ rb_event_set_padding(event);
}

if (tail <= BUF_PAGE_SIZE)
@@ -1960,7 +2000,7 @@ static void rb_advance_reader(struct rin

event = rb_reader_event(cpu_buffer);

- if (event->type == RINGBUF_TYPE_DATA)
+ if (event->type == RINGBUF_TYPE_DATA || rb_discarded_event(event))
cpu_buffer->entries--;

rb_update_read_stamp(cpu_buffer, event);
@@ -2043,9 +2083,18 @@ rb_buffer_peek(struct ring_buffer *buffe

switch (event->type) {
case RINGBUF_TYPE_PADDING:
- RB_WARN_ON(cpu_buffer, 1);
+ if (rb_null_event(event))
+ RB_WARN_ON(cpu_buffer, 1);
+ /*
+ * Because the writer could be discarding every
+ * event it creates (which would probably be bad)
+ * if we were to go back to "again" then we may never
+ * catch up, and will trigger the warn on, or lock
+ * the box. Return the padding, and we will release
+ * the current locks, and try again.
+ */
rb_advance_reader(cpu_buffer);
- return NULL;
+ return event;

case RINGBUF_TYPE_TIME_EXTEND:
/* Internal data, OK to advance */
@@ -2106,8 +2155,12 @@ rb_iter_peek(struct ring_buffer_iter *it

switch (event->type) {
case RINGBUF_TYPE_PADDING:
- rb_inc_iter(iter);
- goto again;
+ if (rb_null_event(event)) {
+ rb_inc_iter(iter);
+ goto again;
+ }
+ rb_advance_iter(iter);
+ return event;

case RINGBUF_TYPE_TIME_EXTEND:
/* Internal data, OK to advance */
@@ -2154,10 +2207,16 @@ ring_buffer_peek(struct ring_buffer *buf
if (!cpumask_test_cpu(cpu, buffer->cpumask))
return NULL;

+ again:
spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
event = rb_buffer_peek(buffer, cpu, ts);
spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

+ if (event && event->type == RINGBUF_TYPE_PADDING) {
+ cpu_relax();
+ goto again;
+ }
+
return event;
}

@@ -2176,10 +2235,16 @@ ring_buffer_iter_peek(struct ring_buffer
struct ring_buffer_event *event;
unsigned long flags;

+ again:
spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
event = rb_iter_peek(iter, ts);
spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

+ if (event && event->type == RINGBUF_TYPE_PADDING) {
+ cpu_relax();
+ goto again;
+ }
+
return event;
}

@@ -2198,6 +2263,7 @@ ring_buffer_consume(struct ring_buffer *
struct ring_buffer_event *event = NULL;
unsigned long flags;

+ again:
/* might be called in atomic */
preempt_disable();

@@ -2219,6 +2285,11 @@ ring_buffer_consume(struct ring_buffer *
out:
preempt_enable();

+ if (event && event->type == RINGBUF_TYPE_PADDING) {
+ cpu_relax();
+ goto again;
+ }
+
return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_consume);
@@ -2297,6 +2368,7 @@ ring_buffer_read(struct ring_buffer_iter
struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
unsigned long flags;

+ again:
spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
event = rb_iter_peek(iter, ts);
if (!event)
@@ -2306,6 +2378,11 @@ ring_buffer_read(struct ring_buffer_iter
out:
spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

+ if (event && event->type == RINGBUF_TYPE_PADDING) {
+ cpu_relax();
+ goto again;
+ }
+
return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_read);

2009-03-18 06:02:10

by Tom Zanussi

Subject: Re: [RFC][PATCH 2/4] tracing: add rb_event_discard() hack


On Tue, 2009-03-17 at 22:21 -0400, Steven Rostedt wrote:
> Tom,
>
> Here's an updated patch, could you see if this works for you?

That seems to have done the trick - I did some quick testing and I'm not
seeing the problems I was seeing with my version. Thanks!

I'll be pounding on it some more when I update the other patches in the
next iteration...

Tom

>
> Thanks,
>
> -- Steve
>
>
> From [email protected] Tue Mar 17 02:30:09 2009
> Date: Tue, 17 Mar 2009 01:23:38 -0500
> From: Tom Zanussi <[email protected]>
> Subject: [RFC][PATCH 2/4] tracing: add rb_event_discard() hack
>
> This patch overloads RINGBUF_TYPE_PADDING to provide a way to discard
> events from the ring buffer, for the event-filtering mechanism
> introduced in a subsequent patch.
>
> NOTE: this patch is a true hack and will likely either eventually crash
> your machine or result in an endless loop of printed events. But it
> sometimes works, enough to show that the filtering apparently does what
> it should.
>
> I'm posting it only to solicit comments from any ringbuffer gurus who
> might know how to do it correctly, or more likely could suggest a
> better approach.
>
> [
> Modifications to fix timestamps and handle ring buffer peeking.
> - Steven Rostedt
> ]
>
> Signed-off-by: Tom Zanussi <[email protected]>
> Signed-off-by: Steven Rostedt <[email protected]>
>
> ---
> include/linux/ring_buffer.h | 11 +++-
> kernel/trace/ring_buffer.c | 117 ++++++++++++++++++++++++++++++++++++--------
> 2 files changed, 105 insertions(+), 23 deletions(-)
>
> Index: linux-trace.git/include/linux/ring_buffer.h
> ===================================================================
> --- linux-trace.git.orig/include/linux/ring_buffer.h 2009-03-17 13:38:41.000000000 -0400
> +++ linux-trace.git/include/linux/ring_buffer.h 2009-03-17 21:44:36.000000000 -0400
> @@ -18,10 +18,13 @@ struct ring_buffer_event {
> /**
> * enum ring_buffer_type - internal ring buffer types
> *
> - * @RINGBUF_TYPE_PADDING: Left over page padding
> - * array is ignored
> - * size is variable depending on how much
> + * @RINGBUF_TYPE_PADDING: Left over page padding or discarded event
> + * If time_delta is 0:
> + * array is ignored
> + * size is variable depending on how much
> * padding is needed
> + * If time_delta is non zero:
> + * everything else same as RINGBUF_TYPE_DATA
> *
> * @RINGBUF_TYPE_TIME_EXTEND: Extend the time delta
> * array[0] = time delta (28 .. 59)
> @@ -65,6 +68,8 @@ ring_buffer_event_time_delta(struct ring
> return event->time_delta;
> }
>
> +void ring_buffer_event_discard(struct ring_buffer_event *event);
> +
> /*
> * size is in bytes for each per CPU buffer.
> */
> Index: linux-trace.git/kernel/trace/ring_buffer.c
> ===================================================================
> --- linux-trace.git.orig/kernel/trace/ring_buffer.c 2009-03-17 13:49:03.000000000 -0400
> +++ linux-trace.git/kernel/trace/ring_buffer.c 2009-03-17 22:10:23.000000000 -0400
> @@ -189,16 +189,65 @@ enum {
> RB_LEN_TIME_STAMP = 16,
> };
>
> -/* inline for ring buffer fast paths */
> +static inline int rb_null_event(struct ring_buffer_event *event)
> +{
> + return event->type == RINGBUF_TYPE_PADDING && event->time_delta == 0;
> +}
> +
> +static inline int rb_discarded_event(struct ring_buffer_event *event)
> +{
> + return event->type == RINGBUF_TYPE_PADDING && event->time_delta;
> +}
> +
> +static void rb_event_set_padding(struct ring_buffer_event *event)
> +{
> + event->type = RINGBUF_TYPE_PADDING;
> + event->time_delta = 0;
> +}
> +
> +/**
> + * ring_buffer_event_discard - discard an event in the ring buffer
> + * @buffer: the ring buffer
> + * @event: the event to discard
> + *
> + * Sometimes a event that is in the ring buffer needs to be ignored.
> + * This function lets the user discard an event in the ring buffer
> + * and then that event will not be read later.
> + *
> + * Note, it is up to the user to be careful with this, and protect
> + * against races. If the user discards an event that has been consumed
> + * it is possible that it could corrupt the ring buffer.
> + */
> +void ring_buffer_event_discard(struct ring_buffer_event *event)
> +{
> + event->type = RINGBUF_TYPE_PADDING;
> + /* time delta must be non zero */
> + if (!event->time_delta)
> + event->time_delta = 1;
> +}
> +
> static unsigned
> -rb_event_length(struct ring_buffer_event *event)
> +rb_event_data_length(struct ring_buffer_event *event)
> {
> unsigned length;
>
> + if (event->len)
> + length = event->len * RB_ALIGNMENT;
> + else
> + length = event->array[0];
> + return length + RB_EVNT_HDR_SIZE;
> +}
> +
> +/* inline for ring buffer fast paths */
> +static unsigned
> +rb_event_length(struct ring_buffer_event *event)
> +{
> switch (event->type) {
> case RINGBUF_TYPE_PADDING:
> - /* undefined */
> - return -1;
> + if (rb_null_event(event))
> + /* undefined */
> + return -1;
> + return rb_event_data_length(event);
>
> case RINGBUF_TYPE_TIME_EXTEND:
> return RB_LEN_TIME_EXTEND;
> @@ -207,11 +256,7 @@ rb_event_length(struct ring_buffer_event
> return RB_LEN_TIME_STAMP;
>
> case RINGBUF_TYPE_DATA:
> - if (event->len)
> - length = event->len * RB_ALIGNMENT;
> - else
> - length = event->array[0];
> - return length + RB_EVNT_HDR_SIZE;
> + return rb_event_data_length(event);
> default:
> BUG();
> }
> @@ -836,11 +881,6 @@ int ring_buffer_resize(struct ring_buffe
> }
> EXPORT_SYMBOL_GPL(ring_buffer_resize);
>
> -static inline int rb_null_event(struct ring_buffer_event *event)
> -{
> - return event->type == RINGBUF_TYPE_PADDING;
> -}
> -
> static inline void *
> __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
> {
> @@ -1210,7 +1250,7 @@ __rb_reserve_next(struct ring_buffer_per
> if (tail < BUF_PAGE_SIZE) {
> /* Mark the rest of the page with padding */
> event = __rb_page_index(tail_page, tail);
> - event->type = RINGBUF_TYPE_PADDING;
> + rb_event_set_padding(event);
> }
>
> if (tail <= BUF_PAGE_SIZE)
> @@ -1960,7 +2000,7 @@ static void rb_advance_reader(struct rin
>
> event = rb_reader_event(cpu_buffer);
>
> - if (event->type == RINGBUF_TYPE_DATA)
> + if (event->type == RINGBUF_TYPE_DATA || rb_discarded_event(event))
> cpu_buffer->entries--;
>
> rb_update_read_stamp(cpu_buffer, event);
> @@ -2043,9 +2083,18 @@ rb_buffer_peek(struct ring_buffer *buffe
>
> switch (event->type) {
> case RINGBUF_TYPE_PADDING:
> - RB_WARN_ON(cpu_buffer, 1);
> + if (rb_null_event(event))
> + RB_WARN_ON(cpu_buffer, 1);
> + /*
> + * Because the writer could be discarding every
> + * event it creates (which would probably be bad)
> + * if we were to go back to "again" then we may never
> + * catch up, and will trigger the warn on, or lock
> + * the box. Return the padding, and we will release
> + * the current locks, and try again.
> + */
> rb_advance_reader(cpu_buffer);
> - return NULL;
> + return event;
>
> case RINGBUF_TYPE_TIME_EXTEND:
> /* Internal data, OK to advance */
> @@ -2106,8 +2155,12 @@ rb_iter_peek(struct ring_buffer_iter *it
>
> switch (event->type) {
> case RINGBUF_TYPE_PADDING:
> - rb_inc_iter(iter);
> - goto again;
> + if (rb_null_event(event)) {
> + rb_inc_iter(iter);
> + goto again;
> + }
> + rb_advance_iter(iter);
> + return event;
>
> case RINGBUF_TYPE_TIME_EXTEND:
> /* Internal data, OK to advance */
> @@ -2154,10 +2207,16 @@ ring_buffer_peek(struct ring_buffer *buf
> if (!cpumask_test_cpu(cpu, buffer->cpumask))
> return NULL;
>
> + again:
> spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> event = rb_buffer_peek(buffer, cpu, ts);
> spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
>
> + if (event && event->type == RINGBUF_TYPE_PADDING) {
> + cpu_relax();
> + goto again;
> + }
> +
> return event;
> }
>
> @@ -2176,10 +2235,16 @@ ring_buffer_iter_peek(struct ring_buffer
> struct ring_buffer_event *event;
> unsigned long flags;
>
> + again:
> spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> event = rb_iter_peek(iter, ts);
> spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
>
> + if (event && event->type == RINGBUF_TYPE_PADDING) {
> + cpu_relax();
> + goto again;
> + }
> +
> return event;
> }
>
> @@ -2198,6 +2263,7 @@ ring_buffer_consume(struct ring_buffer *
> struct ring_buffer_event *event = NULL;
> unsigned long flags;
>
> + again:
> /* might be called in atomic */
> preempt_disable();
>
> @@ -2219,6 +2285,11 @@ ring_buffer_consume(struct ring_buffer *
> out:
> preempt_enable();
>
> + if (event && event->type == RINGBUF_TYPE_PADDING) {
> + cpu_relax();
> + goto again;
> + }
> +
> return event;
> }
> EXPORT_SYMBOL_GPL(ring_buffer_consume);
> @@ -2297,6 +2368,7 @@ ring_buffer_read(struct ring_buffer_iter
> struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
> unsigned long flags;
>
> + again:
> spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> event = rb_iter_peek(iter, ts);
> if (!event)
> @@ -2306,6 +2378,11 @@ ring_buffer_read(struct ring_buffer_iter
> out:
> spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
>
> + if (event && event->type == RINGBUF_TYPE_PADDING) {
> + cpu_relax();
> + goto again;
> + }
> +
> return event;
> }
> EXPORT_SYMBOL_GPL(ring_buffer_read);
>
>

2009-03-18 06:12:20

by Tom Zanussi

Subject: Re: [RFC][PATCH 2/4] tracing: add rb_event_discard() hack


On Tue, 2009-03-17 at 21:21 -0400, Steven Rostedt wrote:
> On Tue, 17 Mar 2009, Tom Zanussi wrote:

> >
> > rb_update_read_stamp(cpu_buffer, event);
> > @@ -2026,8 +2051,8 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
> > * can have. Nesting 10 deep of interrupts is clearly
> > * an anomaly.
> > */
> > - if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
> > - return NULL;
> > +// if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
> > +// return NULL;
>
> Were you hitting this? If so, we need to investigate.
>

I was hitting this, but only because my code would always loop around to
'again' for a discarded event. So, 10 discarded events in a row would
trigger it.

I noticed that your fix returns instead, so it's no longer a problem.

Thanks,

Tom