If the BPF ring buffer is overwritable, ringbuf_process_overwritable_ring() will
be called to handle the data consumption.
All the available data will be consumed but some checks will be performed:
* check we do not read data we already read, if there is no new data, nothing
happens.
* check we do not read more than the buffer size.
* check we do not read invalid data by checking they fit the buffer size.
Signed-off-by: Francis Laniel <[email protected]>
---
tools/include/uapi/linux/bpf.h | 3 +
tools/lib/bpf/ringbuf.c | 106 +++++++++++++++++++++++++++++++++
2 files changed, 109 insertions(+)
diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
index 59a217ca2dfd..cd73a89e8ead 100644
--- a/tools/include/uapi/linux/bpf.h
+++ b/tools/include/uapi/linux/bpf.h
@@ -1227,6 +1227,9 @@ enum {
/* Create a map that is suitable to be an inner map with dynamic max entries */
BPF_F_INNER_MAP = (1U << 12),
+
+/* Create an over writable BPF_RINGBUF */
+ BFP_F_RB_OVERWRITABLE = (1U << 13),
};
/* Flags for BPF_PROG_QUERY. */
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index 8bc117bcc7bc..2362a6280fc5 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -23,6 +23,8 @@
struct ring {
ring_buffer_sample_fn sample_cb;
+ __u8 overwritable: 1,
+ __reserved: 7;
void *ctx;
void *data;
unsigned long *consumer_pos;
@@ -51,6 +53,11 @@ static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
}
}
+static inline bool is_overwritable(struct ring *r)
+{
+ return !!r->overwritable;
+}
+
/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
ring_buffer_sample_fn sample_cb, void *ctx)
@@ -95,6 +102,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
r->sample_cb = sample_cb;
r->ctx = ctx;
r->mask = info.max_entries - 1;
+ r->overwritable = !!(info.map_flags & BFP_F_RB_OVERWRITABLE);
/* Map writable consumer page */
tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
@@ -202,6 +210,101 @@ static inline int roundup_len(__u32 len)
return (len + 7) / 8 * 8;
}
+
+static int64_t ringbuf_process_overwritable_ring(struct ring *r)
+{
+ /* 64-bit to avoid overflow in case of extreme application behavior */
+ int64_t cnt = 0;
+ unsigned long read_pos, prod_pos, previous_prod_pos;
+
+ prod_pos = smp_load_acquire(r->producer_pos);
+ previous_prod_pos = smp_load_acquire(r->consumer_pos);
+
+ /*
+ * For overwritable ring buffer, we use consumer_pos as the previous
+ * producer_pos.
+ * So, if between two calls to this function, the prod_pos did not move,
+ * it means there is no new data, so we can return right now rather than
+ * dealing with data we already proceeded.
+ * NOTE the kernel space does not care about consumer_pos to reserve()
+ * in overwritable ring buffers, hence we can hijack this field.
+ */
+ if (previous_prod_pos == prod_pos)
+ return 0;
+
+ /*
+ * BPF ring buffer is over writable, we start reading from
+ * producer position.
+ */
+ read_pos = prod_pos;
+ while (read_pos - prod_pos < r->mask) {
+ int *len_ptr, len;
+
+ len_ptr = r->data + (read_pos & r->mask);
+ len = smp_load_acquire(len_ptr);
+
+ /* sample not committed yet, bail out for now */
+ if (len & BPF_RINGBUF_BUSY_BIT)
+ break;
+
+ /*
+ * If len is 0, it means we read all the data
+ * available in the buffer and jump on 0 data:
+ *
+ * prod_pos read_pos
+ * | |
+ * V V
+ * +---+------+----------+-------+------+
+ * | |D....D|C........C|B.....B|A....A|
+ * +---+------+----------+-------+------+
+ */
+ if (!len)
+ break;
+
+ /*
+ * If adding the event len to the current
+ * consumer position makes us wrap the buffer,
+ * it means we already did "one loop" around the
+ * buffer.
+ * So, the pointed data would not be usable:
+ *
+ * prod_pos
+ * read_pos----+ |
+ * | |
+ * V V
+ * +---+------+----------+-------+---+--+
+ * |..E|D....D|C........C|B.....B|A..|E.|
+ * +---+------+----------+-------+---+--+
+ */
+ if (read_pos - prod_pos + len > r->mask)
+ break;
+
+ read_pos += roundup_len(len);
+
+ if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
+ void *sample;
+ int err;
+
+ sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
+ err = r->sample_cb(r->ctx, sample, len);
+ if (err < 0) {
+ /* update consumer pos and bail out */
+ smp_store_release(r->consumer_pos,
+ prod_pos);
+ return err;
+ }
+ cnt++;
+ }
+
+ /* This prevents reading data we already processed. */
+ if (previous_prod_pos && read_pos >= previous_prod_pos)
+ break;
+ }
+
+ smp_store_release(r->consumer_pos, prod_pos);
+ return cnt;
+}
+
static int64_t ringbuf_process_ring(struct ring* r)
{
int *len_ptr, len, err;
@@ -211,6 +314,9 @@ static int64_t ringbuf_process_ring(struct ring* r)
bool got_new_data;
void *sample;
+ if (is_overwritable(r))
+ return ringbuf_process_overwritable_ring(r);
+
cons_pos = smp_load_acquire(r->consumer_pos);
do {
got_new_data = false;
--
2.25.1