2009-03-22 08:31:28

by Tom Zanussi

[permalink] [raw]
Subject: [PATCH 2/4] tracing: add ring_buffer_event_discard() to ring buffer

This patch overloads RINGBUF_TYPE_PADDING to provide a way to discard
events from the ring buffer, for the event-filtering mechanism
introduced in a subsequent patch.

I did the initial version but thanks to Steven Rostedt for adding the
parts that actually made it work. ;-)

---
include/linux/ring_buffer.h | 11 +++-
kernel/trace/ring_buffer.c | 117 +++++++++++++++++++++++++++++++++++-------
2 files changed, 105 insertions(+), 23 deletions(-)

diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index e20b886..ae5b210 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -22,10 +22,13 @@ struct ring_buffer_event {
/**
* enum ring_buffer_type - internal ring buffer types
*
- * @RINGBUF_TYPE_PADDING: Left over page padding
- * array is ignored
- * size is variable depending on how much
+ * @RINGBUF_TYPE_PADDING: Left over page padding or discarded event
+ * If time_delta is 0:
+ * array is ignored
+ * size is variable depending on how much
* padding is needed
+ * If time_delta is non zero:
+ * everything else same as RINGBUF_TYPE_DATA
*
* @RINGBUF_TYPE_TIME_EXTEND: Extend the time delta
* array[0] = time delta (28 .. 59)
@@ -69,6 +72,8 @@ ring_buffer_event_time_delta(struct ring_buffer_event *event)
return event->time_delta;
}

+void ring_buffer_event_discard(struct ring_buffer_event *event);
+
/*
* size is in bytes for each per CPU buffer.
*/
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 7e05e2c..5cd2342 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -190,16 +190,65 @@ enum {
RB_LEN_TIME_STAMP = 16,
};

-/* inline for ring buffer fast paths */
+static inline int rb_null_event(struct ring_buffer_event *event)
+{
+ return event->type == RINGBUF_TYPE_PADDING && event->time_delta == 0;
+}
+
+static inline int rb_discarded_event(struct ring_buffer_event *event)
+{
+ return event->type == RINGBUF_TYPE_PADDING && event->time_delta;
+}
+
+static void rb_event_set_padding(struct ring_buffer_event *event)
+{
+ event->type = RINGBUF_TYPE_PADDING;
+ event->time_delta = 0;
+}
+
+/**
+ * ring_buffer_event_discard - discard an event in the ring buffer
+ * @buffer: the ring buffer
+ * @event: the event to discard
+ *
+ * Sometimes a event that is in the ring buffer needs to be ignored.
+ * This function lets the user discard an event in the ring buffer
+ * and then that event will not be read later.
+ *
+ * Note, it is up to the user to be careful with this, and protect
+ * against races. If the user discards an event that has been consumed
+ * it is possible that it could corrupt the ring buffer.
+ */
+void ring_buffer_event_discard(struct ring_buffer_event *event)
+{
+ event->type = RINGBUF_TYPE_PADDING;
+ /* time delta must be non zero */
+ if (!event->time_delta)
+ event->time_delta = 1;
+}
+
static unsigned
-rb_event_length(struct ring_buffer_event *event)
+rb_event_data_length(struct ring_buffer_event *event)
{
unsigned length;

+ if (event->len)
+ length = event->len * RB_ALIGNMENT;
+ else
+ length = event->array[0];
+ return length + RB_EVNT_HDR_SIZE;
+}
+
+/* inline for ring buffer fast paths */
+static unsigned
+rb_event_length(struct ring_buffer_event *event)
+{
switch (event->type) {
case RINGBUF_TYPE_PADDING:
- /* undefined */
- return -1;
+ if (rb_null_event(event))
+ /* undefined */
+ return -1;
+ return rb_event_data_length(event);

case RINGBUF_TYPE_TIME_EXTEND:
return RB_LEN_TIME_EXTEND;
@@ -208,11 +257,7 @@ rb_event_length(struct ring_buffer_event *event)
return RB_LEN_TIME_STAMP;

case RINGBUF_TYPE_DATA:
- if (event->len)
- length = event->len * RB_ALIGNMENT;
- else
- length = event->array[0];
- return length + RB_EVNT_HDR_SIZE;
+ return rb_event_data_length(event);
default:
BUG();
}
@@ -846,11 +891,6 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

-static inline int rb_null_event(struct ring_buffer_event *event)
-{
- return event->type == RINGBUF_TYPE_PADDING;
-}
-
static inline void *
__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
{
@@ -1221,7 +1261,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
/* Mark the rest of the page with padding */
event = __rb_page_index(tail_page, tail);
kmemcheck_annotate_bitfield(event->bitfield);
- event->type = RINGBUF_TYPE_PADDING;
+ rb_event_set_padding(event);
}

if (tail <= BUF_PAGE_SIZE)
@@ -1972,7 +2012,7 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)

event = rb_reader_event(cpu_buffer);

- if (event->type == RINGBUF_TYPE_DATA)
+ if (event->type == RINGBUF_TYPE_DATA || rb_discarded_event(event))
cpu_buffer->entries--;

rb_update_read_stamp(cpu_buffer, event);
@@ -2055,9 +2095,18 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)

switch (event->type) {
case RINGBUF_TYPE_PADDING:
- RB_WARN_ON(cpu_buffer, 1);
+ if (rb_null_event(event))
+ RB_WARN_ON(cpu_buffer, 1);
+ /*
+ * Because the writer could be discarding every
+ * event it creates (which would probably be bad)
+ * if we were to go back to "again" then we may never
+ * catch up, and will trigger the warn on, or lock
+ * the box. Return the padding, and we will release
+ * the current locks, and try again.
+ */
rb_advance_reader(cpu_buffer);
- return NULL;
+ return event;

case RINGBUF_TYPE_TIME_EXTEND:
/* Internal data, OK to advance */
@@ -2118,8 +2167,12 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)

switch (event->type) {
case RINGBUF_TYPE_PADDING:
- rb_inc_iter(iter);
- goto again;
+ if (rb_null_event(event)) {
+ rb_inc_iter(iter);
+ goto again;
+ }
+ rb_advance_iter(iter);
+ return event;

case RINGBUF_TYPE_TIME_EXTEND:
/* Internal data, OK to advance */
@@ -2166,10 +2219,16 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
if (!cpumask_test_cpu(cpu, buffer->cpumask))
return NULL;

+ again:
spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
event = rb_buffer_peek(buffer, cpu, ts);
spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

+ if (event && event->type == RINGBUF_TYPE_PADDING) {
+ cpu_relax();
+ goto again;
+ }
+
return event;
}

@@ -2188,10 +2247,16 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
struct ring_buffer_event *event;
unsigned long flags;

+ again:
spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
event = rb_iter_peek(iter, ts);
spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

+ if (event && event->type == RINGBUF_TYPE_PADDING) {
+ cpu_relax();
+ goto again;
+ }
+
return event;
}

@@ -2210,6 +2275,7 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
struct ring_buffer_event *event = NULL;
unsigned long flags;

+ again:
/* might be called in atomic */
preempt_disable();

@@ -2231,6 +2297,11 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
out:
preempt_enable();

+ if (event && event->type == RINGBUF_TYPE_PADDING) {
+ cpu_relax();
+ goto again;
+ }
+
return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_consume);
@@ -2309,6 +2380,7 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
unsigned long flags;

+ again:
spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
event = rb_iter_peek(iter, ts);
if (!event)
@@ -2318,6 +2390,11 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
out:
spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

+ if (event && event->type == RINGBUF_TYPE_PADDING) {
+ cpu_relax();
+ goto again;
+ }
+
return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_read);
--
1.5.6.3



2009-03-22 14:15:15

by Ingo Molnar

[permalink] [raw]
Subject: Re: [PATCH 2/4] tracing: add ring_buffer_event_discard() to ring buffer


* Tom Zanussi <[email protected]> wrote:

> This patch overloads RINGBUF_TYPE_PADDING to provide a way to discard
> events from the ring buffer, for the event-filtering mechanism
> introduced in a subsequent patch.
>
> I did the initial version but thanks to Steven Rostedt for adding the
> parts that actually made it work. ;-)

I suspect it's signed-off-by both of you?

Ingo

2009-03-22 17:25:23

by Tom Zanussi

[permalink] [raw]
Subject: Re: [PATCH 2/4] tracing: add ring_buffer_event_discard() to ring buffer


On Sun, 2009-03-22 at 15:14 +0100, Ingo Molnar wrote:
> * Tom Zanussi <[email protected]> wrote:
>
> > This patch overloads RINGBUF_TYPE_PADDING to provide a way to discard
> > events from the ring buffer, for the event-filtering mechanism
> > introduced in a subsequent patch.
> >
> > I did the initial version but thanks to Steven Rostedt for adding the
> > parts that actually made it work. ;-)
>
> I suspect it's signed-off-by both of you?
>

Oops, yeah, here's mine:

Signed-off-by: Tom Zanussi <[email protected]>


> Ingo

2009-03-22 17:59:20

by Frederic Weisbecker

[permalink] [raw]
Subject: Re: [PATCH 2/4] tracing: add ring_buffer_event_discard() to ring buffer

On Sun, Mar 22, 2009 at 03:30:49AM -0500, Tom Zanussi wrote:
> This patch overloads RINGBUF_TYPE_PADDING to provide a way to discard
> events from the ring buffer, for the event-filtering mechanism
> introduced in a subsequent patch.
>
> I did the initial version but thanks to Steven Rostedt for adding the
> parts that actually made it work. ;-)
>
> ---
> include/linux/ring_buffer.h | 11 +++-
> kernel/trace/ring_buffer.c | 117 +++++++++++++++++++++++++++++++++++-------
> 2 files changed, 105 insertions(+), 23 deletions(-)
>
> diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
> index e20b886..ae5b210 100644
> --- a/include/linux/ring_buffer.h
> +++ b/include/linux/ring_buffer.h
> @@ -22,10 +22,13 @@ struct ring_buffer_event {
> /**
> * enum ring_buffer_type - internal ring buffer types
> *
> - * @RINGBUF_TYPE_PADDING: Left over page padding
> - * array is ignored
> - * size is variable depending on how much
> + * @RINGBUF_TYPE_PADDING: Left over page padding or discarded event
> + * If time_delta is 0:
> + * array is ignored
> + * size is variable depending on how much
> * padding is needed
> + * If time_delta is non zero:
> + * everything else same as RINGBUF_TYPE_DATA
> *
> * @RINGBUF_TYPE_TIME_EXTEND: Extend the time delta
> * array[0] = time delta (28 .. 59)
> @@ -69,6 +72,8 @@ ring_buffer_event_time_delta(struct ring_buffer_event *event)
> return event->time_delta;
> }
>
> +void ring_buffer_event_discard(struct ring_buffer_event *event);
> +
> /*
> * size is in bytes for each per CPU buffer.
> */
> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> index 7e05e2c..5cd2342 100644
> --- a/kernel/trace/ring_buffer.c
> +++ b/kernel/trace/ring_buffer.c
> @@ -190,16 +190,65 @@ enum {
> RB_LEN_TIME_STAMP = 16,
> };
>
> -/* inline for ring buffer fast paths */
> +static inline int rb_null_event(struct ring_buffer_event *event)
> +{
> + return event->type == RINGBUF_TYPE_PADDING && event->time_delta == 0;
> +}
> +
> +static inline int rb_discarded_event(struct ring_buffer_event *event)
> +{
> + return event->type == RINGBUF_TYPE_PADDING && event->time_delta;
> +}
> +
> +static void rb_event_set_padding(struct ring_buffer_event *event)
> +{
> + event->type = RINGBUF_TYPE_PADDING;
> + event->time_delta = 0;
> +}
> +
> +/**
> + * ring_buffer_event_discard - discard an event in the ring buffer
> + * @buffer: the ring buffer
> + * @event: the event to discard
> + *
> + * Sometimes a event that is in the ring buffer needs to be ignored.
> + * This function lets the user discard an event in the ring buffer
> + * and then that event will not be read later.
> + *
> + * Note, it is up to the user to be careful with this, and protect
> + * against races. If the user discards an event that has been consumed
> + * it is possible that it could corrupt the ring buffer.
> + */
> +void ring_buffer_event_discard(struct ring_buffer_event *event)
> +{
> + event->type = RINGBUF_TYPE_PADDING;
> + /* time delta must be non zero */
> + if (!event->time_delta)
> + event->time_delta = 1;
> +}
> +
> static unsigned
> -rb_event_length(struct ring_buffer_event *event)
> +rb_event_data_length(struct ring_buffer_event *event)
> {
> unsigned length;
>
> + if (event->len)
> + length = event->len * RB_ALIGNMENT;
> + else
> + length = event->array[0];
> + return length + RB_EVNT_HDR_SIZE;
> +}
> +
> +/* inline for ring buffer fast paths */


I don't see an inline here.


> +static unsigned
> +rb_event_length(struct ring_buffer_event *event)
> +{
> switch (event->type) {
> case RINGBUF_TYPE_PADDING:
> - /* undefined */
> - return -1;
> + if (rb_null_event(event))
> + /* undefined */
> + return -1;
> + return rb_event_data_length(event);
>
> case RINGBUF_TYPE_TIME_EXTEND:
> return RB_LEN_TIME_EXTEND;
> @@ -208,11 +257,7 @@ rb_event_length(struct ring_buffer_event *event)
> return RB_LEN_TIME_STAMP;
>
> case RINGBUF_TYPE_DATA:
> - if (event->len)
> - length = event->len * RB_ALIGNMENT;
> - else
> - length = event->array[0];
> - return length + RB_EVNT_HDR_SIZE;
> + return rb_event_data_length(event);
> default:
> BUG();
> }
> @@ -846,11 +891,6 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
> }
> EXPORT_SYMBOL_GPL(ring_buffer_resize);
>
> -static inline int rb_null_event(struct ring_buffer_event *event)
> -{
> - return event->type == RINGBUF_TYPE_PADDING;
> -}
> -
> static inline void *
> __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
> {
> @@ -1221,7 +1261,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
> /* Mark the rest of the page with padding */
> event = __rb_page_index(tail_page, tail);
> kmemcheck_annotate_bitfield(event->bitfield);
> - event->type = RINGBUF_TYPE_PADDING;
> + rb_event_set_padding(event);
> }
>
> if (tail <= BUF_PAGE_SIZE)
> @@ -1972,7 +2012,7 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
>
> event = rb_reader_event(cpu_buffer);
>
> - if (event->type == RINGBUF_TYPE_DATA)
> + if (event->type == RINGBUF_TYPE_DATA || rb_discarded_event(event))
> cpu_buffer->entries--;
>
> rb_update_read_stamp(cpu_buffer, event);
> @@ -2055,9 +2095,18 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
>
> switch (event->type) {
> case RINGBUF_TYPE_PADDING:
> - RB_WARN_ON(cpu_buffer, 1);
> + if (rb_null_event(event))
> + RB_WARN_ON(cpu_buffer, 1);
> + /*
> + * Because the writer could be discarding every
> + * event it creates (which would probably be bad)
> + * if we were to go back to "again" then we may never
> + * catch up, and will trigger the warn on, or lock
> + * the box. Return the padding, and we will release
> + * the current locks, and try again.
> + */
> rb_advance_reader(cpu_buffer);
> - return NULL;
> + return event;
>
> case RINGBUF_TYPE_TIME_EXTEND:
> /* Internal data, OK to advance */
> @@ -2118,8 +2167,12 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
>
> switch (event->type) {
> case RINGBUF_TYPE_PADDING:
> - rb_inc_iter(iter);
> - goto again;
> + if (rb_null_event(event)) {
> + rb_inc_iter(iter);
> + goto again;
> + }
> + rb_advance_iter(iter);
> + return event;
>
> case RINGBUF_TYPE_TIME_EXTEND:
> /* Internal data, OK to advance */
> @@ -2166,10 +2219,16 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
> if (!cpumask_test_cpu(cpu, buffer->cpumask))
> return NULL;
>
> + again:
> spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> event = rb_buffer_peek(buffer, cpu, ts);
> spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
>
> + if (event && event->type == RINGBUF_TYPE_PADDING) {
> + cpu_relax();
> + goto again;
> + }
> +
> return event;
> }
>
> @@ -2188,10 +2247,16 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
> struct ring_buffer_event *event;
> unsigned long flags;
>
> + again:
> spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> event = rb_iter_peek(iter, ts);
> spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
>
> + if (event && event->type == RINGBUF_TYPE_PADDING) {
> + cpu_relax();
> + goto again;
> + }
> +
> return event;
> }
>
> @@ -2210,6 +2275,7 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
> struct ring_buffer_event *event = NULL;
> unsigned long flags;
>
> + again:
> /* might be called in atomic */
> preempt_disable();
>
> @@ -2231,6 +2297,11 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
> out:
> preempt_enable();
>
> + if (event && event->type == RINGBUF_TYPE_PADDING) {
> + cpu_relax();
> + goto again;
> + }
> +
> return event;
> }
> EXPORT_SYMBOL_GPL(ring_buffer_consume);
> @@ -2309,6 +2380,7 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
> struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
> unsigned long flags;
>
> + again:
> spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> event = rb_iter_peek(iter, ts);
> if (!event)
> @@ -2318,6 +2390,11 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
> out:
> spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
>
> + if (event && event->type == RINGBUF_TYPE_PADDING) {
> + cpu_relax();
> + goto again;
> + }
> +
> return event;
> }
> EXPORT_SYMBOL_GPL(ring_buffer_read);
> --


Looks good!

Acked-by: Frederic Weisbecker <[email protected]>

2009-03-22 18:15:30

by Steven Rostedt

[permalink] [raw]
Subject: Re: [PATCH 2/4] tracing: add ring_buffer_event_discard() to ring buffer


On Sun, 22 Mar 2009, Tom Zanussi wrote:

>
> On Sun, 2009-03-22 at 15:14 +0100, Ingo Molnar wrote:
> > * Tom Zanussi <[email protected]> wrote:
> >
> > > This patch overloads RINGBUF_TYPE_PADDING to provide a way to discard
> > > events from the ring buffer, for the event-filtering mechanism
> > > introduced in a subsequent patch.
> > >
> > > I did the initial version but thanks to Steven Rostedt for adding the
> > > parts that actually made it work. ;-)
> >
> > I suspect it's signed-off-by both of you?
> >
>
> Oops, yeah, here's mine:
>
> Signed-off-by: Tom Zanussi <[email protected]>

Since some of the code was mine too, here's my

Signed-off-by: Steven Rostedt <[email protected]>

In case I did not sign it off before.

-- Steve

2009-03-23 18:54:34

by Steven Rostedt

[permalink] [raw]
Subject: Re: [PATCH 2/4] tracing: add ring_buffer_event_discard() to ring buffer


On Sun, 22 Mar 2009, Frederic Weisbecker wrote:

> >
> > -/* inline for ring buffer fast paths */

[...]

> > static unsigned
> > -rb_event_length(struct ring_buffer_event *event)
> > +rb_event_data_length(struct ring_buffer_event *event)
> > {
> > unsigned length;
> >
> > + if (event->len)
> > + length = event->len * RB_ALIGNMENT;
> > + else
> > + length = event->array[0];
> > + return length + RB_EVNT_HDR_SIZE;
> > +}
> > +
> > +/* inline for ring buffer fast paths */
>
>
> I don't see an inline here.

That was my comment. I use to have a bunch of inlines, but someone thought
gcc could do better and took them all out. The comment needs to be
removed.

-- Steve


>
>
> > +static unsigned
> > +rb_event_length(struct ring_buffer_event *event)
> > +{
> > switch (event->type) {
> > case RINGBUF_TYPE_PADDING:
> > - /* undefined */
> > - return -1;
> > + if (rb_null_event(event))
> > + /* undefined */
> > + return -1;
> > + return rb_event_data_length(event);
> >
> > case RINGBUF_TYPE_TIME_EXTEND: