2019-11-28 01:54:32

by John Ogness

Subject: [RFC PATCH v5 0/3] printk: new ringbuffer implementation

Hello,

This is a follow-up RFC on the work to re-implement much of the
core of printk. The threads for the previous RFC versions are
here[0][1][2][3].

This RFC includes only the ringbuffer and a test module. This is
a rewrite of the proposed ringbuffer, now based on the proof of
concept[4] from Petr Mladek as agreed at the meeting[5] during
LPC2019 in Lisbon.

The internal structure has been reworked such that the printk
strings are in their own array, each prefixed by a 32-bit
integer (the ID of the associated descriptor).

A 2nd array contains the dictionary strings (each likewise
prefixed by a 32-bit integer).

A 3rd array is made up of descriptors that contain all the
meta-data for each printk record (sequence number, timestamp,
loglevel, caller, etc.) as well as pointers into the other data
arrays for the text and dictionary data.

The writer interface is somewhat similar to v4, but the reader
interface has changed significantly. Rather than using an
iterator object, readers just specify the sequence number they
want to read. In effect, the sequence number acts as the
iterator.
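
As an illustration, a reader loop boils down to the following
sketch (using the prb_read_valid() interface from patch 2/3;
buffer setup is elided):

    struct printk_record r;
    u64 seq = 0;

    /* r.info, r.text_buf, r.dict_buf point to caller buffers */
    while (prb_read_valid(&rb, seq, &r)) {
        /* process the record, then advance the "iterator" */
        seq = r.info->seq + 1;
    }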

I have been communicating with Petr over the last couple of
months to make sure this implementation fits his expectations.
This RFC is mainly to get feedback from anyone else who may see
something that Petr and I have missed.

This series also includes my test module. On a 16-core ARM64
test machine, the module runs without any errors. I am seeing
the 15 writing cores each writing about 34500 records per
second, while the 1 reading core misses only about 15% of the
total records.

John Ogness

[0] https://lkml.kernel.org/r/[email protected]
[1] https://lkml.kernel.org/r/[email protected]
[2] https://lkml.kernel.org/r/[email protected]
[3] https://lkml.kernel.org/r/[email protected]
[4] https://lkml.kernel.org/r/[email protected]
[5] https://lkml.kernel.org/r/[email protected]

John Ogness (3):
printk-rb: new printk ringbuffer implementation (writer)
printk-rb: new printk ringbuffer implementation (reader)
printk-rb: add test module

kernel/printk/Makefile | 3 +
kernel/printk/printk_ringbuffer.c | 910 ++++++++++++++++++++++++++++++
kernel/printk/printk_ringbuffer.h | 249 ++++++++
kernel/printk/test_prb.c | 347 ++++++++++++
4 files changed, 1509 insertions(+)
create mode 100644 kernel/printk/printk_ringbuffer.c
create mode 100644 kernel/printk/printk_ringbuffer.h
create mode 100644 kernel/printk/test_prb.c

--
2.20.1


2019-11-28 01:54:44

by John Ogness

Subject: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

Add a new lockless ringbuffer implementation to be used for printk.
First only add support for writers. Reader support will be added in
a follow-up commit.

Signed-off-by: John Ogness <[email protected]>
---
kernel/printk/printk_ringbuffer.c | 676 ++++++++++++++++++++++++++++++
kernel/printk/printk_ringbuffer.h | 239 +++++++++++
2 files changed, 915 insertions(+)
create mode 100644 kernel/printk/printk_ringbuffer.c
create mode 100644 kernel/printk/printk_ringbuffer.h

diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
new file mode 100644
index 000000000000..09c32e52fd40
--- /dev/null
+++ b/kernel/printk/printk_ringbuffer.c
@@ -0,0 +1,676 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/kernel.h>
+#include <linux/irqflags.h>
+#include <linux/string.h>
+#include <linux/errno.h>
+#include <linux/bug.h>
+#include "printk_ringbuffer.h"
+
+/**
+ * DOC: printk_ringbuffer overview
+ *
+ * Data Structure
+ * --------------
+ * The printk_ringbuffer is made up of 3 internal ringbuffers::
+ *
+ * * desc_ring: A ring of descriptors. A descriptor contains all record
+ * meta-data (sequence number, timestamp, loglevel, etc.) as
+ * well as internal state information about the record and
+ * logical positions specifying where in the other
+ * ringbuffers the text and dictionary strings are located.
+ *
+ * * text_data_ring: A ring of data blocks. A data block consists of a 32-bit
+ * integer (ID) that maps to a desc_ring index followed by
+ * the text string of the record.
+ *
+ * * dict_data_ring: A ring of data blocks. A data block consists of a 32-bit
+ * integer (ID) that maps to a desc_ring index followed by
+ * the dictionary string of the record.
+ *
+ * Usage
+ * -----
+ * Here are some simple examples demonstrating writers and readers. For the
+ * examples it is assumed that a global ringbuffer is available::
+ *
+ * DECLARE_PRINTKRB(rb, 15, 5, 4);
+ *
+ * This ringbuffer has a size of 1 MiB for text data (2 ^ 20), 512 KiB for
+ * dictionary data (2 ^ 19), and allows up to 32768 records (2 ^ 15).
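+ *
+ * (In general: the text data size is 2 ^ (avgtextbits + descbits) bytes,
+ * the dictionary data size is 2 ^ (avgdictbits + descbits) bytes, and the
+ * record count is 2 ^ descbits.)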
+ *
+ * Sample writer code::
+ *
+ * struct prb_reserved_entry e;
+ * struct printk_record r;
+ *
+ * // specify how much to allocate
+ * r.text_buf_size = strlen(textstr) + 1;
+ * r.dict_buf_size = strlen(dictstr) + 1;
+ *
+ * if (prb_reserve(&e, &rb, &r)) {
+ * snprintf(r.text_buf, r.text_buf_size, "%s", textstr);
+ *
+ * // dictionary allocation may have failed
+ * if (r.dict_buf)
+ * snprintf(r.dict_buf, r.dict_buf_size, "%s", dictstr);
+ *
+ * r.info->ts_nsec = local_clock();
+ *
+ * prb_commit(&e);
+ * }
+ *
+ * Sample reader code::
+ *
+ * struct printk_info info;
+ * char text_buf[32];
+ * char dict_buf[32];
+ * u64 next_seq = 0;
+ * struct printk_record r = {
+ * .info = &info,
+ * .text_buf = &text_buf[0],
+ * .dict_buf = &dict_buf[0],
+ * .text_buf_size = sizeof(text_buf),
+ * .dict_buf_size = sizeof(dict_buf),
+ * };
+ *
+ * while (prb_read_valid(&rb, next_seq, &r)) {
+ * if (info.seq != next_seq)
+ * pr_warn("lost %llu records\n", info.seq - next_seq);
+ *
+ * if (info.text_len > r.text_buf_size) {
+ * pr_warn("record %llu text truncated\n", info.seq);
+ * text_buf[sizeof(text_buf) - 1] = 0;
+ * }
+ *
+ * if (info.dict_len > r.dict_buf_size) {
+ * pr_warn("record %llu dict truncated\n", info.seq);
+ * dict_buf[sizeof(dict_buf) - 1] = 0;
+ * }
+ *
+ * pr_info("%llu: %llu: %s;%s\n", info.seq, info.ts_nsec,
+ * &text_buf[0], info.dict_len ? &dict_buf[0] : "");
+ *
+ * next_seq = info.seq + 1;
+ * }
+ */
+
+#define DATA_SIZE(data_ring) _DATA_SIZE((data_ring)->size_bits)
+#define DATA_SIZE_MASK(data_ring) (DATA_SIZE(data_ring) - 1)
+
+#define DESCS_COUNT(desc_ring) _DESCS_COUNT((desc_ring)->count_bits)
+#define DESCS_COUNT_MASK(desc_ring) (DESCS_COUNT(desc_ring) - 1)
+
+/* Determine the data array index from a logical position. */
+#define DATA_INDEX(data_ring, lpos) ((lpos) & DATA_SIZE_MASK(data_ring))
+
+/* Determine the desc array index from an ID or sequence number. */
+#define DESC_INDEX(desc_ring, n) ((n) & DESCS_COUNT_MASK(desc_ring))
+
+/* Determine how many times the data array has wrapped. */
+#define DATA_WRAPS(data_ring, lpos) ((lpos) >> (data_ring)->size_bits)
+
+/* Get the logical position at index 0 of the current wrap. */
+#define DATA_THIS_WRAP_START_LPOS(data_ring, lpos) \
+ ((lpos) & ~DATA_SIZE_MASK(data_ring))
+
+/* Get the ID for the same index of the previous wrap as the given ID. */
+#define DESC_ID_PREV_WRAP(desc_ring, id) \
+ DESC_ID((id) - DESCS_COUNT(desc_ring))
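+
+/*
+ * Example (illustrative values only): with size_bits = 4 the data array
+ * holds 16 bytes, so lpos = 37 maps to DATA_INDEX(data_ring, 37) = 5
+ * (37 & 15) and DATA_WRAPS(data_ring, 37) = 2 (37 >> 4), i.e. the array
+ * has wrapped twice.
+ */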
+
+/* A data block: maps to the raw data within the data ring. */
+struct prb_data_block {
+ u32 id;
+ u8 data[0];
+};
+
+static struct prb_desc *to_desc(struct prb_desc_ring *desc_ring, u64 n)
+{
+ return &desc_ring->descs[DESC_INDEX(desc_ring, n)];
+}
+
+static struct prb_data_block *to_block(struct prb_data_ring *data_ring,
+ unsigned long begin_lpos)
+{
+ char *data = &data_ring->data[DATA_INDEX(data_ring, begin_lpos)];
+
+ return (struct prb_data_block *)data;
+}
+
+/* Increase the data size to account for data block meta-data. */
+static unsigned long to_blk_size(unsigned long size)
+{
+ struct prb_data_block *db = NULL;
+
+ size += sizeof(*db);
+ size = ALIGN(size, sizeof(db->id));
+ return size;
+}
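+
+/*
+ * For example (illustrative, assuming a 4-byte ID): reserving a 5 byte
+ * string costs to_blk_size(5) = ALIGN(5 + 4, 4) = 12 bytes in the data
+ * array.
+ */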
+
+/*
+ * Sanity checker for reserve size. The ringbuffer code assumes that a data
+ * block does not exceed the maximum possible size that could fit within the
+ * ringbuffer. This function provides that basic size check so that the
+ * assumption is safe.
+ */
+static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size)
+{
+ struct prb_data_block *db = NULL;
+
+ /* Writers are not allowed to write data-less records. */
+ if (size == 0)
+ return false;
+
+ /*
+ * Ensure the alignment padded size could possibly fit in the data
+ * array. The largest possible data block must still leave room for
+ * at least the ID of the next block.
+ */
+ size = to_blk_size(size);
+ if (size > DATA_SIZE(data_ring) - sizeof(db->id))
+ return false;
+
+ return true;
+}
+
+/* The possible responses of a descriptor state-query. */
+enum desc_state {
+ desc_miss, /* ID mismatch */
+ desc_reserved, /* reserved, but still in use by writer */
+ desc_committed, /* committed, writer is done */
+ desc_reusable, /* free, not used by any writer */
+};
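+
+/*
+ * Illustrative summary of a descriptor's life cycle (the named
+ * functions implement each transition):
+ *
+ *   reserved --prb_commit()--> committed --desc_make_reusable()-->
+ *   reusable --desc_reserve()--> reserved (next wrap) --> ...
+ *
+ * desc_miss is not a descriptor state but a query result: the ID in
+ * the queried state value did not match the expected ID.
+ */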
+
+/* Query the state of a descriptor. */
+static enum desc_state get_desc_state(u32 id, u32 state_val)
+{
+ if (id != DESC_ID(state_val))
+ return desc_miss;
+
+ if (state_val & DESC_REUSE_MASK)
+ return desc_reusable;
+
+ if (state_val & DESC_COMMITTED_MASK)
+ return desc_committed;
+
+ return desc_reserved;
+}
+
+/* Get a copy of a specified descriptor and its state. */
+static enum desc_state desc_read(struct prb_desc_ring *desc_ring, u32 id,
+ struct prb_desc *desc_out)
+{
+ struct prb_desc *desc = to_desc(desc_ring, id);
+ atomic_t *state_var = &desc->state_var;
+ enum desc_state d_state;
+ u32 state_val;
+
+ state_val = atomic_read(state_var);
+ d_state = get_desc_state(id, state_val);
+ if (d_state == desc_miss || d_state == desc_reserved)
+ return d_state;
+
+ /* Check state before loading data. */
+ smp_rmb(); /* matches LMM_REF(prb_commit:A) */
+
+ *desc_out = READ_ONCE(*desc);
+
+ /* Load data before re-checking state. */
+ smp_rmb(); /* matches LMM_REF(desc_reserve:A) */
+
+ state_val = atomic_read(state_var);
+ return get_desc_state(id, state_val);
+}
+
+/*
+ * Take a given descriptor out of the committed state by attempting
+ * the transition: committed->reusable. Either this task or some
+ * other task will have been successful.
+ */
+static void desc_make_reusable(struct prb_desc_ring *desc_ring, u32 id)
+{
+ struct prb_desc *desc = to_desc(desc_ring, id);
+ atomic_t *state_var = &desc->state_var;
+ u32 val_committed = id | DESC_COMMITTED_MASK;
+ u32 val_reusable = val_committed | DESC_REUSE_MASK;
+
+ atomic_cmpxchg(state_var, val_committed, val_reusable);
+}
+
+/*
+ * For a given data ring (text or dict) and its current tail lpos:
+ * for each data block up until @lpos, make the associated descriptor
+ * reusable.
+ *
+ * If there is any problem making the associated descriptor reusable,
+ * either the descriptor has not yet been committed or another writer
+ * task has already pushed the tail lpos past the problematic data
+ * block. Regardless, on error the caller can re-load the tail lpos
+ * to determine the situation.
+ */
+static bool data_make_reusable(struct printk_ringbuffer *rb,
+ struct prb_data_ring *data_ring,
+ unsigned long tail_lpos, unsigned long lpos,
+ unsigned long *lpos_out)
+{
+ struct prb_desc_ring *desc_ring = &rb->desc_ring;
+ struct prb_data_blk_lpos *blk_lpos;
+ struct prb_data_block *blk;
+ enum desc_state d_state;
+ struct prb_desc desc;
+ u32 id;
+
+ /*
+ * Point @blk_lpos to the correct blk_lpos with the local copy
+ * of the descriptor based on the provided @data_ring.
+ */
+ if (data_ring == &rb->text_data_ring)
+ blk_lpos = &desc.text_blk_lpos;
+ else
+ blk_lpos = &desc.dict_blk_lpos;
+
+ /* Loop until @tail_lpos has advanced to or beyond @lpos. */
+ while ((lpos - tail_lpos) - 1 < DATA_SIZE(data_ring)) {
+ blk = to_block(data_ring, tail_lpos);
+ id = READ_ONCE(blk->id);
+
+ d_state = desc_read(desc_ring, id, &desc);
+
+ switch (d_state) {
+ case desc_miss:
+ return false;
+ case desc_reserved:
+ return false;
+ case desc_committed:
+ /* Does the descriptor link to this data block? */
+ if (blk_lpos->begin != tail_lpos)
+ return false;
+ desc_make_reusable(desc_ring, id);
+ break;
+ case desc_reusable:
+ /* Does the descriptor link to this data block? */
+ if (blk_lpos->begin != tail_lpos)
+ return false;
+ break;
+ }
+
+ /* Advance @tail_lpos to the next data block. */
+ tail_lpos = blk_lpos->next;
+ }
+
+ *lpos_out = tail_lpos;
+
+ return true;
+}
+
+/*
+ * Advance the data ring tail to at least @lpos. This function puts all
+ * descriptors into the reusable state if the tail will be pushed beyond their
+ * associated data block.
+ */
+static bool data_push_tail(struct printk_ringbuffer *rb,
+ struct prb_data_ring *data_ring, unsigned long lpos)
+{
+ unsigned long tail_lpos;
+ unsigned long next_lpos;
+
+ /* If @lpos is not valid, there is nothing to do. */
+ if (lpos == INVALID_LPOS)
+ return true;
+
+ tail_lpos = atomic_long_read(&data_ring->tail_lpos);
+
+ do {
+ /* If @lpos is no longer valid, there is nothing to do. */
+ if (lpos - tail_lpos >= DATA_SIZE(data_ring))
+ break;
+
+ /*
+ * Make all descriptors reusable that are associated with
+ * data blocks before @lpos.
+ */
+ if (!data_make_reusable(rb, data_ring, tail_lpos, lpos,
+ &next_lpos)) {
+ /*
+ * data_make_reusable() performed state loads. Make
+ * sure they are done before reloading the tail lpos.
+ */
+ smp_rmb(); /* matches LMM_REF(data_push_tail:A) */
+
+ /*
+ * Failure may be due to another task pushing the
+ * tail. Reload the tail and check.
+ */
+ next_lpos = atomic_long_read(&data_ring->tail_lpos);
+ if (next_lpos == tail_lpos)
+ return false;
+
+ /* Another task pushed the tail. Try again. */
+ tail_lpos = next_lpos;
+ continue;
+ }
+ /*
+ * Advance the tail lpos (invalidating the data block) before
+ * allowing future changes (such as the recycling of the
+ * associated descriptor).
+ */
+ } while (!atomic_long_try_cmpxchg(&data_ring->tail_lpos, &tail_lpos,
+ next_lpos)); /* LMM_TAG(data_push_tail:A) */
+
+ return true;
+}
+
+/*
+ * Advance the desc ring tail. This function advances the tail by one
+ * descriptor, thus invalidating the oldest descriptor. Before advancing
+ * the tail, the tail descriptor is made reusable and all data blocks up to
+ * and including the descriptor's data block are invalidated (i.e. the data
+ * ring tail is pushed past the data block of the descriptor being made
+ * reusable).
+ */
+static bool desc_push_tail(struct printk_ringbuffer *rb, u32 tail_id)
+{
+ struct prb_desc_ring *desc_ring = &rb->desc_ring;
+ enum desc_state d_state;
+ struct prb_desc desc;
+
+ d_state = desc_read(desc_ring, tail_id, &desc);
+
+ switch (d_state) {
+ case desc_miss:
+ /*
+ * If the ID is exactly 1 wrap behind the expected, it is
+ * being reserved by another writer and cannot be invalidated.
+ */
+ if (DESC_ID(atomic_read(&desc.state_var)) ==
+ DESC_ID_PREV_WRAP(desc_ring, tail_id)) {
+ return false;
+ }
+ return true;
+ case desc_reserved:
+ return false;
+ case desc_committed:
+ desc_make_reusable(desc_ring, tail_id);
+ break;
+ case desc_reusable:
+ break;
+ }
+
+ /*
+ * Data blocks must be invalidated before their associated
+ * descriptor can be made available for recycling. Invalidating
+ * them later is not possible because there is no way to trust
+ * data blocks once their associated descriptor is gone.
+ */
+
+ if (!data_push_tail(rb, &rb->text_data_ring, desc.text_blk_lpos.next))
+ return false;
+ if (!data_push_tail(rb, &rb->dict_data_ring, desc.dict_blk_lpos.next))
+ return false;
+
+ /*
+ * Check the next descriptor after @tail_id before pushing the tail to
+ * it because the tail must always be in a committed or reusable
+ * state. The implementation of get_desc_tail_seq() relies on this.
+ *
+ * A successful read also implies that the next descriptor <= @head_id
+ * so there is no risk of pushing the tail past the head.
+ */
+ d_state = desc_read(desc_ring, DESC_ID(tail_id + 1), &desc);
+ if (d_state == desc_committed || d_state == desc_reusable) {
+ atomic_cmpxchg(&desc_ring->tail_id, tail_id,
+ DESC_ID(tail_id + 1));
+ } else {
+ /*
+ * The descriptor following @tail_id is not in an allowed
+ * tail state. But if the tail has since been moved by
+ * another task already, then it does not matter.
+ */
+ if (atomic_read(&desc_ring->tail_id) == tail_id)
+ return false;
+ }
+
+ return true;
+}
+
+/* Reserve a new descriptor, invalidating the oldest if necessary. */
+static bool desc_reserve(struct printk_ringbuffer *rb, u32 *id_out)
+{
+ struct prb_desc_ring *desc_ring = &rb->desc_ring;
+ struct prb_desc *desc;
+ u32 id_prev_wrap;
+ u32 head_id;
+ u32 id;
+
+ head_id = atomic_read(&desc_ring->head_id);
+
+ do {
+ desc = to_desc(desc_ring, head_id);
+
+ id = DESC_ID(head_id + 1);
+ id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
+
+ if (id_prev_wrap == atomic_read(&desc_ring->tail_id)) {
+ if (!desc_push_tail(rb, id_prev_wrap))
+ return false;
+ }
+ } while (!atomic_try_cmpxchg(&desc_ring->head_id, &head_id, id));
+
+ desc = to_desc(desc_ring, id);
+
+ /* Set the descriptor's ID and also set its state to reserved. */
+ atomic_set(&desc->state_var, id | 0);
+
+ /* Store new ID/state before making any other changes. */
+ smp_wmb(); /* LMM_TAG(desc_reserve:A) */
+
+ *id_out = id;
+ return true;
+}
+
+/* Determine the end of a data block. */
+static unsigned long get_next_lpos(struct prb_data_ring *data_ring,
+ unsigned long lpos, unsigned int size)
+{
+ unsigned long begin_lpos;
+ unsigned long next_lpos;
+
+ begin_lpos = lpos;
+ next_lpos = lpos + size;
+
+ if (DATA_WRAPS(data_ring, begin_lpos) ==
+ DATA_WRAPS(data_ring, next_lpos)) {
+ /* The data block does not wrap. */
+ return next_lpos;
+ }
+
+ /* Wrapping data blocks store their data at the beginning. */
+ return (DATA_THIS_WRAP_START_LPOS(data_ring, next_lpos) + size);
+}
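+
+/*
+ * Wrap example (illustrative values only): with size_bits = 4
+ * (16 bytes), lpos = 10 and size = 8, next_lpos = 18 lies in the next
+ * wrap. get_next_lpos() then returns 16 + 8 = 24 and the data block is
+ * stored at the beginning of the new wrap (indexes 0-7).
+ */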
+
+/*
+ * Allocate a new data block, invalidating the oldest data block(s)
+ * if necessary. This function also associates the data block with
+ * a specified descriptor.
+ */
+static char *data_alloc(struct printk_ringbuffer *rb,
+ struct prb_data_ring *data_ring, unsigned long size,
+ struct prb_data_blk_lpos *blk_lpos, u32 id)
+{
+ struct prb_data_block *blk;
+ unsigned long begin_lpos;
+ unsigned long next_lpos;
+
+ if (size == 0) {
+ /* Specify a data-less block. */
+ blk_lpos->begin = INVALID_LPOS;
+ blk_lpos->next = INVALID_LPOS;
+ return NULL;
+ }
+
+ size = to_blk_size(size);
+
+ begin_lpos = atomic_long_read(&data_ring->head_lpos);
+
+ do {
+ next_lpos = get_next_lpos(data_ring, begin_lpos, size);
+
+ if (!data_push_tail(rb, data_ring,
+ next_lpos - DATA_SIZE(data_ring))) {
+ /* Failed to allocate, specify a data-less block. */
+ blk_lpos->begin = INVALID_LPOS;
+ blk_lpos->next = INVALID_LPOS;
+ return NULL;
+ }
+ } while (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &begin_lpos,
+ next_lpos));
+
+ /*
+ * No barrier is needed here. The data validity is defined by
+ * the state of the associated descriptor, which is still marked
+ * as reserved (i.e. invalid for readers). And only the winner of
+ * the above cmpxchg() can write here.
+ */
+
+ blk = to_block(data_ring, begin_lpos);
+ blk->id = id;
+
+ if (DATA_WRAPS(data_ring, begin_lpos) !=
+ DATA_WRAPS(data_ring, next_lpos)) {
+ /* Wrapping data blocks store their data at the beginning. */
+ blk = to_block(data_ring, 0);
+ blk->id = id;
+ }
+
+ blk_lpos->begin = begin_lpos;
+ blk_lpos->next = next_lpos;
+
+ return &blk->data[0];
+}
+
+/**
+ * prb_reserve() - Reserve space in the ringbuffer.
+ *
+ * @e: The entry structure to setup.
+ * @rb: The ringbuffer to reserve data in.
+ * @r: The record structure to allocate buffers for.
+ *
+ * This is the public function available to writers to reserve data.
+ *
+ * The writer specifies the text and dict sizes to reserve by setting the
+ * @text_buf_size and @dict_buf_size fields of @r, respectively. Dictionaries
+ * are optional, so @dict_buf_size is allowed to be 0.
+ *
+ * Context: Any context. Disables local interrupts on success.
+ * Return: true if at least text data could be allocated, otherwise false.
+ *
+ * On success, the fields @info, @text_buf, @dict_buf of @r will be set by
+ * this function and should be filled in by the writer before committing.
+ *
+ * If the function fails to reserve dictionary space (but all else succeeded),
+ * it will still report success. In that case @dict_buf is set to NULL and
+ * @dict_buf_size is set to 0. Writers must check this before writing to
+ * dictionary space.
+ */
+bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+ struct printk_record *r)
+{
+ struct prb_desc_ring *desc_ring = &rb->desc_ring;
+ struct prb_desc *d;
+ u32 id;
+
+ if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
+ return false;
+
+ /* Records without dictionaries are allowed. */
+ if (r->dict_buf_size) {
+ if (!data_check_size(&rb->dict_data_ring, r->dict_buf_size))
+ return false;
+ }
+
+ /* Disable interrupts during the reserve/commit window. */
+ local_irq_save(e->irqflags);
+
+ if (!desc_reserve(rb, &id)) {
+ /* Descriptor reservation failures are tracked. */
+ atomic_long_inc(&rb->fail);
+ local_irq_restore(e->irqflags);
+ return false;
+ }
+
+ d = to_desc(desc_ring, id);
+
+ /*
+ * Set the @e fields here so that prb_commit() can be used if
+ * text data allocation fails.
+ */
+ e->rb = rb;
+ e->id = id;
+
+ /*
+ * Initialize the sequence number if it has never been set.
+ * Otherwise just increment it by a full wrap.
+ *
+ * @seq is considered "never been set" if it has a value of 0,
+ * _except_ for descs[0], which was set by the ringbuffer initializer
+ * and therefore is always considered as set.
+ *
+ * See the "Bootstrap" comment block in printk_ringbuffer.h for
+ * details about how the initializer bootstraps the descriptors.
+ */
+ if (d->info.seq == 0 && DESC_INDEX(desc_ring, id) != 0)
+ d->info.seq = DESC_INDEX(desc_ring, id);
+ else
+ d->info.seq += DESCS_COUNT(desc_ring);
+
+ r->text_buf = data_alloc(rb, &rb->text_data_ring, r->text_buf_size,
+ &d->text_blk_lpos, id);
+ /* If text data allocation fails, a data-less record is committed. */
+ if (r->text_buf_size && !r->text_buf) {
+ d->info.text_len = 0;
+ d->info.dict_len = 0;
+ prb_commit(e);
+ return false;
+ }
+
+ r->dict_buf = data_alloc(rb, &rb->dict_data_ring, r->dict_buf_size,
+ &d->dict_blk_lpos, id);
+ /*
+ * If dict data allocation fails, the caller can still commit
+ * text. But dictionary information will not be available.
+ */
+ if (r->dict_buf_size && !r->dict_buf)
+ r->dict_buf_size = 0;
+
+ r->info = &d->info;
+
+ /* Set default values for the lengths. */
+ d->info.text_len = r->text_buf_size;
+ d->info.dict_len = r->dict_buf_size;
+
+ return true;
+}
+EXPORT_SYMBOL(prb_reserve);
+
+/**
+ * prb_commit() - Commit (previously reserved) data to the ringbuffer.
+ *
+ * @e: The entry containing the reserved data information.
+ *
+ * This is the public function available to writers to commit data.
+ *
+ * Context: Any context. Enables local interrupts.
+ */
+void prb_commit(struct prb_reserved_entry *e)
+{
+ struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
+ struct prb_desc *d = to_desc(desc_ring, e->id);
+
+ /* Store all record data before updating the state. */
+ smp_wmb(); /* LMM_TAG(prb_commit:A) */
+
+ atomic_set(&d->state_var, e->id | DESC_COMMITTED_MASK);
+
+ /* Restore interrupts, the reserve/commit window is finished. */
+ local_irq_restore(e->irqflags);
+}
+EXPORT_SYMBOL(prb_commit);
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
new file mode 100644
index 000000000000..b6f06e5edc1b
--- /dev/null
+++ b/kernel/printk/printk_ringbuffer.h
@@ -0,0 +1,239 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+
+#ifndef _KERNEL_PRINTK_RINGBUFFER_H
+#define _KERNEL_PRINTK_RINGBUFFER_H
+
+#include <linux/atomic.h>
+
+struct printk_info {
+ u64 seq; /* sequence number */
+ u64 ts_nsec; /* timestamp in nanoseconds */
+ u16 text_len; /* length of entire record */
+ u16 dict_len; /* length of dictionary buffer */
+ u8 facility; /* syslog facility */
+ u8 flags:5; /* internal record flags */
+ u8 level:3; /* syslog level */
+ u32 caller_id; /* thread id or processor id */
+};
+
+/*
+ * A structure providing the buffers, used by writers.
+ *
+ * Writers:
+ * The writer sets @text_buf_size and @dict_buf_size before calling
+ * prb_reserve(). On success, prb_reserve() sets @info, @text_buf, @dict_buf.
+ */
+struct printk_record {
+ struct printk_info *info;
+ char *text_buf;
+ char *dict_buf;
+ unsigned int text_buf_size;
+ unsigned int dict_buf_size;
+};
+
+/* Specifies the position/span of a data block. */
+struct prb_data_blk_lpos {
+ unsigned long begin;
+ unsigned long next;
+};
+
+/* A descriptor: the complete meta-data for a record. */
+struct prb_desc {
+ struct printk_info info;
+ atomic_t state_var;
+ struct prb_data_blk_lpos text_blk_lpos;
+ struct prb_data_blk_lpos dict_blk_lpos;
+};
+
+/* A ringbuffer of "struct prb_data_block + data" elements. */
+struct prb_data_ring {
+ unsigned int size_bits;
+ char *data;
+ atomic_long_t head_lpos;
+ atomic_long_t tail_lpos;
+};
+
+/* A ringbuffer of "struct prb_desc" elements. */
+struct prb_desc_ring {
+ unsigned int count_bits;
+ struct prb_desc *descs;
+ atomic_t head_id;
+ atomic_t tail_id;
+};
+
+/* The high level structure representing the printk ringbuffer. */
+struct printk_ringbuffer {
+ struct prb_desc_ring desc_ring;
+ struct prb_data_ring text_data_ring;
+ struct prb_data_ring dict_data_ring;
+ atomic_long_t fail;
+};
+
+/* Used by writers as a reserve/commit handle. */
+struct prb_reserved_entry {
+ struct printk_ringbuffer *rb;
+ unsigned long irqflags;
+ u32 id;
+};
+
+#define _DATA_SIZE(sz_bits) (1UL << (sz_bits))
+#define _DESCS_COUNT(ct_bits) (1U << (ct_bits))
+#define DESC_SV_BITS (sizeof(int) * 8)
+#define DESC_COMMITTED_MASK (1U << (DESC_SV_BITS - 1))
+#define DESC_REUSE_MASK (1U << (DESC_SV_BITS - 2))
+#define DESC_FLAGS_MASK (DESC_COMMITTED_MASK | DESC_REUSE_MASK)
+#define DESC_ID_MASK (~DESC_FLAGS_MASK)
+#define DESC_ID(sv) ((sv) & DESC_ID_MASK)
+#define INVALID_LPOS 1
+
+#define INVALID_BLK_LPOS \
+ { \
+ .begin = INVALID_LPOS, \
+ .next = INVALID_LPOS, \
+ }
+
+/*
+ * Descriptor Bootstrap
+ *
+ * The descriptor array is minimally initialized to allow immediate usage
+ * by readers and writers. The requirements that the descriptor array
+ * initialization must satisfy:
+ *
+ * R1: The tail must point to an existing (committed or reusable) descriptor.
+ * This is required by the implementation of get_desc_tail_seq().
+ *
+ * R2: Readers must see that the ringbuffer is initially empty.
+ *
+ * R3: The first record reserved by a writer is assigned sequence number 0.
+ *
+ * To satisfy R1, the tail points to a descriptor that is minimally
+ * initialized (having no data block, i.e. data block's lpos @begin and @next
+ * values are set to INVALID_LPOS).
+ *
+ * To satisfy R2, the tail descriptor is initialized to the reusable state.
+ * Readers recognize reusable descriptors as existing records, but skip over
+ * them.
+ *
+ * To satisfy R3, the last descriptor in the array is used as the initial head
+ * (and tail) descriptor. This allows the first record reserved by a writer
+ * (head + 1) to be the first descriptor in the array. (Only the first
+ * descriptor in the array could have a valid sequence number of 0.)
+ *
+ * The first time a descriptor is reserved, it is assigned a sequence number
+ * with the value of the array index. A "first time reserved" descriptor can
+ * be recognized because it has a sequence number of 0 even though it does not
+ * have an index of 0. (Only the first descriptor in the array could have a
+ * valid sequence number of 0.) After the first reservation, all future
+ * reservations simply involve incrementing the sequence number by the array
+ * count.
+ *
+ * Hack #1:
+ * The first descriptor in the array is allowed to have a sequence number 0.
+ * In this case it is not possible to recognize whether it is being reserved
+ * for the first time (set to index value) or has been reserved previously
+ * (increment by the array count). This is handled by _always_ incrementing the
+ * sequence number when reserving the first descriptor in the array. So in
+ * order to satisfy R3, the sequence number of the first descriptor in the
+ * array is initialized to minus the array count. Then, upon the first
+ * reservation, it is incremented to 0.
+ *
+ * Hack #2:
+ * get_desc_tail_seq() can be called at any time by readers to retrieve the
+ * sequence number of the tail descriptor. However, due to R2 and R3,
+ * initially there are no records to report the sequence number of (sequence
+ * numbers are u64 and there is nothing less than 0). To handle this, the
+ * sequence number of the tail descriptor is initialized to 0. Technically
+ * this is incorrect, because there is no record with sequence number 0 (yet)
+ * and the tail descriptor is not the first descriptor in the array. But it
+ * allows prb_read_valid() to correctly report that the record is
+ * non-existent for any given sequence number. Bootstrapping is complete when
+ * the tail is pushed the first time, thus finally pointing to the first
+ * descriptor reserved by a writer, which has the assigned sequence number 0.
+ */
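+
+/*
+ * Bootstrap example (illustrative, using a tiny ring with ct_bits = 2,
+ * i.e. 4 descriptors): descs[3] is the initial head/tail (reusable, no
+ * data block, seq = 0) and descs[0].info.seq is initialized to -4. The
+ * first prb_reserve() moves the head to index 0 and increments that seq
+ * by the descriptor count, yielding the required first sequence number 0.
+ */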
+
+/*
+ * Initiating Logical Value Overflows
+ *
+ * Both logical position (lpos) and ID values can be mapped to array indexes
+ * but may experience overflows during the lifetime of the system. To ensure
+ * that printk_ringbuffer can handle the overflows for these types, initial
+ * values are chosen that map to the correct initial array indexes, but will
+ * result in overflows soon.
+ *
+ * BLK0_LPOS: The initial @head_lpos and @tail_lpos for data rings.
+ * It is at index 0 and the lpos value is such that it
+ * will overflow on the first wrap.
+ *
+ * DESC0_ID: The initial @head_id and @tail_id for the desc ring. It is
+ * at the last index of the descriptor array and the ID value
+ * is such that it will overflow on the second wrap.
+ */
+#define BLK0_LPOS(sz_bits) (-(_DATA_SIZE(sz_bits)))
+#define DESC0_ID(ct_bits) DESC_ID(-(_DESCS_COUNT(ct_bits) + 1))
+#define DESC0_SV(ct_bits) (DESC_COMMITTED_MASK | DESC_REUSE_MASK | \
+ DESC0_ID(ct_bits))
+
+#define DECLARE_PRINTKRB(name, descbits, avgtextbits, avgdictbits) \
+char _##name##_text[1U << ((avgtextbits) + (descbits))] \
+ __aligned(__alignof__(int)); \
+char _##name##_dict[1U << ((avgdictbits) + (descbits))] \
+ __aligned(__alignof__(int)); \
+struct prb_desc _##name##_descs[_DESCS_COUNT(descbits)] = { \
+ /* this will be the first record reserved by a writer */ \
+ [0] = { \
+ .info = { \
+ /* will be incremented to 0 on the first reservation */ \
+ .seq = -(u64)_DESCS_COUNT(descbits), \
+ }, \
+ }, \
+ /* the initial head and tail */ \
+ [_DESCS_COUNT(descbits) - 1] = { \
+ .info = { \
+ /* reports the minimal seq value during the bootstrap phase */ \
+ .seq = 0, \
+ }, \
+ /* reusable */ \
+ .state_var = ATOMIC_INIT(DESC0_SV(descbits)), \
+ /* no associated data block */ \
+ .text_blk_lpos = INVALID_BLK_LPOS, \
+ .dict_blk_lpos = INVALID_BLK_LPOS, \
+ }, \
+ }; \
+struct printk_ringbuffer name = { \
+ .desc_ring = { \
+ .count_bits = descbits, \
+ .descs = &_##name##_descs[0], \
+ .head_id = ATOMIC_INIT(DESC0_ID(descbits)), \
+ .tail_id = ATOMIC_INIT(DESC0_ID(descbits)), \
+ }, \
+ .text_data_ring = { \
+ .size_bits = (avgtextbits) + (descbits), \
+ .data = &_##name##_text[0], \
+ .head_lpos = ATOMIC_LONG_INIT(BLK0_LPOS( \
+ (avgtextbits) + (descbits))), \
+ .tail_lpos = ATOMIC_LONG_INIT(BLK0_LPOS( \
+ (avgtextbits) + (descbits))), \
+ }, \
+ .dict_data_ring = { \
+ .size_bits = (avgdictbits) + (descbits), \
+ .data = &_##name##_dict[0], \
+ .head_lpos = ATOMIC_LONG_INIT(BLK0_LPOS( \
+ (avgdictbits) + (descbits))), \
+ .tail_lpos = ATOMIC_LONG_INIT(BLK0_LPOS( \
+ (avgdictbits) + (descbits))), \
+ }, \
+ .fail = ATOMIC_LONG_INIT(0), \
+}
+
+/* Writer Interface */
+bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+ struct printk_record *r);
+void prb_commit(struct prb_reserved_entry *e);
+
+#endif /* _KERNEL_PRINTK_RINGBUFFER_H */
--
2.20.1

2019-11-28 01:54:50

by John Ogness

Subject: [RFC PATCH v5 2/3] printk-rb: new printk ringbuffer implementation (reader)

Add the reader implementation for the new ringbuffer.

Signed-off-by: John Ogness <[email protected]>
---
kernel/printk/printk_ringbuffer.c | 234 ++++++++++++++++++++++++++++++
kernel/printk/printk_ringbuffer.h | 12 +-
2 files changed, 245 insertions(+), 1 deletion(-)

diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index 09c32e52fd40..f85762713583 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -674,3 +674,237 @@ void prb_commit(struct prb_reserved_entry *e)
local_irq_restore(e->irqflags);
}
EXPORT_SYMBOL(prb_commit);
+
+/*
+ * Given @blk_lpos, return a pointer to the raw data from the data block
+ * and calculate the size of the data part. A NULL pointer is returned
+ * if @blk_lpos specifies values that could never be legal.
+ *
+ * This function (used by readers) performs strict validation on the lpos
+ * values to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
+ * triggered if an internal error is detected.
+ */
+static char *get_data(struct prb_data_ring *data_ring,
+ struct prb_data_blk_lpos *blk_lpos,
+ unsigned long *data_size)
+{
+ struct prb_data_block *db;
+
+ if (blk_lpos->begin == INVALID_LPOS &&
+ blk_lpos->next == INVALID_LPOS) {
+ /* descriptor without a data block */
+ return NULL;
+ } else if (DATA_WRAPS(data_ring, blk_lpos->begin) ==
+ DATA_WRAPS(data_ring, blk_lpos->next)) {
+ /* regular data block */
+ if (WARN_ON_ONCE(blk_lpos->next <= blk_lpos->begin))
+ return NULL;
+ db = to_block(data_ring, blk_lpos->begin);
+ *data_size = blk_lpos->next - blk_lpos->begin;
+
+ } else if ((DATA_WRAPS(data_ring, blk_lpos->begin) + 1 ==
+ DATA_WRAPS(data_ring, blk_lpos->next)) ||
+ ((DATA_WRAPS(data_ring, blk_lpos->begin) ==
+ DATA_WRAPS(data_ring, -1UL)) &&
+ (DATA_WRAPS(data_ring, blk_lpos->next) == 0))) {
+ /* wrapping data block */
+ db = to_block(data_ring, 0);
+ *data_size = DATA_INDEX(data_ring, blk_lpos->next);
+
+ } else {
+ WARN_ON_ONCE(1);
+ return NULL;
+ }
+
+ /* A valid data block will always be aligned to the ID size. */
+ if (WARN_ON_ONCE(blk_lpos->begin !=
+ ALIGN(blk_lpos->begin, sizeof(db->id))) ||
+ WARN_ON_ONCE(blk_lpos->next !=
+ ALIGN(blk_lpos->next, sizeof(db->id)))) {
+ return NULL;
+ }
+
+ /* A valid data block will always have at least an ID. */
+ if (WARN_ON_ONCE(*data_size < sizeof(db->id)))
+ return NULL;
+
+ /* Subtract descriptor ID space from size. */
+ *data_size -= sizeof(db->id);
+
+ return &db->data[0];
+}
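+
+/*
+ * Illustrative examples (assuming size_bits = 4 and a 4-byte ID): a
+ * regular block with begin = 20, next = 28 yields 8 - 4 = 4 data bytes
+ * starting at index 8 of the array. A wrapping block with begin = 24,
+ * next = 40 stores everything at the start of the array and yields
+ * DATA_INDEX(data_ring, 40) - 4 = 4 data bytes.
+ */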
+
+/* Given @blk_lpos, copy an expected @len of data into the provided buffer. */
+static bool copy_data(struct prb_data_ring *data_ring,
+ struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
+ unsigned int buf_size)
+{
+ unsigned long data_size;
+ char *data;
+
+ /* Caller might not want the data. */
+ if (!buf || !buf_size)
+ return true;
+
+ data = get_data(data_ring, blk_lpos, &data_size);
+ if (!data)
+ return false;
+
+ /* Actual cannot be less than expected. */
+ if (WARN_ON_ONCE(data_size < len))
+ return false;
+
+ data_size = min_t(u16, buf_size, len);
+
+ if (!WARN_ON_ONCE(!data_size))
+ memcpy(&buf[0], data, data_size);
+ return true;
+}
+
+/*
+ * Read the record @id and verify that it is committed and has the sequence
+ * number @seq.
+ *
+ * Error return values:
+ * -EINVAL: The record @seq does not exist.
+ * -ENOENT: The record @seq exists, but its data is not available. This is a
+ * valid record, so readers should continue with the next seq.
+ */
+static int desc_read_committed(struct prb_desc_ring *desc_ring, u32 id,
+ u64 seq, struct prb_desc *desc)
+{
+ enum desc_state d_state;
+
+ d_state = desc_read(desc_ring, id, desc);
+ if (desc->info.seq != seq)
+ return -EINVAL;
+ else if (d_state == desc_reusable)
+ return -ENOENT;
+ else if (d_state != desc_committed)
+ return -EINVAL;
+
+ return 0;
+}
+
+/*
+ * Copy the ringbuffer data from the record with @seq to the provided
+ * @r buffer. On success, 0 is returned.
+ *
+ * See desc_read_committed() for error return values.
+ */
+static int prb_read(struct printk_ringbuffer *rb, u64 seq,
+ struct printk_record *r)
+{
+ struct prb_desc_ring *desc_ring = &rb->desc_ring;
+ struct prb_desc *rdesc = to_desc(desc_ring, seq);
+ atomic_t *state_var = &rdesc->state_var;
+ struct prb_desc desc;
+ int err;
+ u32 id;
+
+ /* Get a reliable local copy of the descriptor and check validity. */
+ id = DESC_ID(atomic_read(state_var));
+ err = desc_read_committed(desc_ring, id, seq, &desc);
+ if (err)
+ return err;
+
+ /* If requested, copy meta data. */
+ if (r->info)
+ memcpy(r->info, &desc.info, sizeof(*(r->info)));
+
+ /*
+ * Load/copy text data. If it fails, this is a
+ * data-less descriptor.
+ */
+ if (!copy_data(&rb->text_data_ring, &desc.text_blk_lpos,
+ desc.info.text_len, r->text_buf, r->text_buf_size)) {
+ return -ENOENT;
+ }
+
+ /*
+ * Load/copy dict data. Although this should not fail, dict data
+ * is not important. So if it fails, modify the copied meta data
+ * to report that there is no dict data, thus silently dropping
+ * the dict data.
+ */
+ if (!copy_data(&rb->dict_data_ring, &desc.dict_blk_lpos,
+ desc.info.dict_len, r->dict_buf, r->dict_buf_size)) {
+ if (r->info)
+ r->info->dict_len = 0;
+ }
+
+ /* Re-check real descriptor validity. */
+ return desc_read_committed(desc_ring, id, seq, &desc);
+}
+
+/* Get the sequence number of the tail descriptor. */
+static u64 get_desc_tail_seq(struct printk_ringbuffer *rb)
+{
+ struct prb_desc_ring *desc_ring = &rb->desc_ring;
+ enum desc_state d_state;
+ struct prb_desc desc;
+ u32 id;
+
+ do {
+ id = atomic_read(&rb->desc_ring.tail_id);
+ d_state = desc_read(desc_ring, id, &desc);
+
+ /*
+ * This loop will not be infinite because the tail is
+ * _always_ in the committed or reusable state.
+ */
+ } while (d_state != desc_committed && d_state != desc_reusable);
+
+ return desc.info.seq;
+}
+
+/**
+ * prb_read_valid() - Non-blocking read of a requested record or (if gone)
+ * the next available record.
+ *
+ * @rb: The ringbuffer to read from.
+ * @seq: The sequence number of the record to read.
+ * @r: The record data buffer to store the read record to.
+ *
+ * This is the public function available to readers to read a record.
+ *
+ * The reader provides the @info, @text_buf, @dict_buf buffers of @r to be
+ * filled in.
+ *
+ * Context: Any context.
+ * Return: true if a record was read, otherwise false.
+ *
+ * On success, the reader must check r->info->seq to see which record was
+ * actually read. This allows the reader to detect dropped records.
+ *
+ * Failure means @seq refers to a record that has not yet been written.
+ */
+bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
+ struct printk_record *r)
+{
+ u64 tail_seq;
+ int err;
+
+ while ((err = prb_read(rb, seq, r))) {
+ tail_seq = get_desc_tail_seq(rb);
+
+ if (seq < tail_seq) {
+ /*
+ * Behind the tail. Catch up and try again. This
+ * can happen for -ENOENT and -EINVAL cases.
+ */
+ seq = tail_seq;
+
+ } else if (err == -ENOENT) {
+ /* Record exists, but no data available. Skip. */
+ seq++;
+
+ } else {
+ /* Non-existent/non-committed record. Must stop. */
+ return false;
+ }
+ }
+
+ return true;
+}
+EXPORT_SYMBOL(prb_read_valid);
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
index b6f06e5edc1b..3c0eaa165a5f 100644
--- a/kernel/printk/printk_ringbuffer.h
+++ b/kernel/printk/printk_ringbuffer.h
@@ -17,11 +17,17 @@ struct printk_info {
};

/*
- * A structure providing the buffers, used by writers.
+ * A structure providing the buffers, used by writers and readers.
*
* Writers:
* The writer sets @text_buf_size and @dict_buf_size before calling
* prb_reserve(). On success, prb_reserve() sets @info, @text_buf, @dict_buf.
+ *
+ * Readers:
+ * The reader sets all fields before calling prb_read_valid(). Note that
+ * the reader provides the @info, @text_buf, @dict_buf buffers. On success,
+ * the struct pointed to by @info will be filled and the char arrays pointed
+ * to by @text_buf and @dict_buf will be filled with text and dict data.
*/
struct printk_record {
struct printk_info *info;
@@ -236,4 +242,8 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
struct printk_record *r);
void prb_commit(struct prb_reserved_entry *e);

+/* Reader Interface */
+bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
+ struct printk_record *r);
+
#endif /* _KERNEL_PRINTK_RINGBUFFER_H */
--
2.20.1

2019-11-28 01:55:28

by John Ogness

Subject: [RFC PATCH v5 3/3] printk-rb: add test module

This module does some heavy write stress testing on the ringbuffer
with a reader that is checking for integrity.

Signed-off-by: John Ogness <[email protected]>
---
kernel/printk/Makefile | 3 +
kernel/printk/test_prb.c | 347 +++++++++++++++++++++++++++++++++++++++
2 files changed, 350 insertions(+)
create mode 100644 kernel/printk/test_prb.c

diff --git a/kernel/printk/Makefile b/kernel/printk/Makefile
index 4d052fc6bcde..2aabbe561efc 100644
--- a/kernel/printk/Makefile
+++ b/kernel/printk/Makefile
@@ -2,3 +2,6 @@
obj-y = printk.o
obj-$(CONFIG_PRINTK) += printk_safe.o
obj-$(CONFIG_A11Y_BRAILLE_CONSOLE) += braille.o
+
+prbtest-y = printk_ringbuffer.o test_prb.o
+obj-m += prbtest.o
diff --git a/kernel/printk/test_prb.c b/kernel/printk/test_prb.c
new file mode 100644
index 000000000000..d038b16bf01b
--- /dev/null
+++ b/kernel/printk/test_prb.c
@@ -0,0 +1,347 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/init.h>
+#include <linux/module.h>
+#include <linux/kthread.h>
+#include <linux/delay.h>
+#include <linux/random.h>
+#include <linux/slab.h>
+#include <linux/wait.h>
+#include "printk_ringbuffer.h"
+
+/*
+ * This is a test module that starts "num_online_cpus()" writer threads
+ * that each write data of varying length. They do this as fast as
+ * they can.
+ *
+ * Dictionary data is stored in a separate data ring. The writers will
+ * only write dictionary data about half the time. This is to make the
+ * test more realistic with text and dict data rings containing
+ * different data blocks.
+ *
+ * Because the threads are running in such tight loops, they will call
+ * schedule() from time to time so the system stays alive.
+ *
+ * If the writers encounter an error, the test is aborted. Test results are
+ * recorded to the ftrace buffers, with some additional information also
+ * provided via printk. The test can be aborted manually by removing the
+ * module. (Ideally the test should never abort on its own.)
+ */
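+
+/*
+ * A typical session might look like this (illustrative; the module is
+ * built as prbtest.ko per the Makefile change above, and trace_printk()
+ * output lands in the tracing buffer):
+ *
+ *   insmod kernel/printk/prbtest.ko
+ *   ... let the test run for a while ...
+ *   rmmod prbtest
+ *   cat /sys/kernel/debug/tracing/trace
+ */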
+
+/* used by writers to wake the reader when a new record is committed */
+DECLARE_WAIT_QUEUE_HEAD(test_wait);
+
+/* test data structure */
+struct rbdata {
+ int len;
+ char text[0];
+};
+
+static char *test_running;
+static int halt_test;
+
+/* dump text or dictionary data to the trace buffers */
+static void print_record(const char *name, struct rbdata *dat, u64 seq)
+{
+ char buf[160];
+
+ snprintf(buf, sizeof(buf), "%s", dat->text);
+ buf[sizeof(buf) - 1] = 0;
+
+ trace_printk("seq=%llu len=%d %sval=%s\n",
+ seq, dat->len, name,
+ dat->len < sizeof(buf) ? buf : "<invalid>");
+}
+
+/*
+ * sequentially dump all the valid records in the ringbuffer
+ * (used to verify memory integrity)
+ *
+ * This function is called after all writers are finished. It uses the
+ * regular reader interface (prb_read_valid()) to iterate the records.
+ */
+static void dump_rb(struct printk_ringbuffer *rb)
+{
+ struct printk_info info;
+ struct printk_record r;
+ char text_buf[200];
+ char dict_buf[200];
+ u64 seq = 0;
+
+ r.info = &info;
+ r.text_buf = &text_buf[0];
+ r.dict_buf = &dict_buf[0];
+ r.text_buf_size = sizeof(text_buf);
+ r.dict_buf_size = sizeof(dict_buf);
+
+ trace_printk("BEGIN full dump\n");
+
+ while (prb_read_valid(rb, seq, &r)) {
+ /* check/track the sequence */
+ if (info.seq != seq)
+ trace_printk("DROPPED %llu\n", info.seq - seq);
+
+ print_record("TEXT", (struct rbdata *)&r.text_buf[0],
+ info.seq);
+ if (info.dict_len) {
+ print_record("DICT", (struct rbdata *)&r.dict_buf[0],
+ info.seq);
+ }
+
+ seq = info.seq + 1;
+ }
+
+ trace_printk("END full dump\n");
+}
+
+DECLARE_PRINTKRB(test_rb, 15, 5, 5);
+
+static int prbtest_writer(void *data)
+{
+ unsigned long num = (unsigned long)data;
+ struct prb_reserved_entry e;
+ char text_id = 'A' + num;
+ char dict_id = 'a' + num;
+ unsigned long count = 0;
+ struct printk_record r;
+ struct rbdata *dat;
+ int len;
+
+ pr_err("prbtest: start thread %03lu (writer)\n", num);
+
+ for (;;) {
+ len = sizeof(struct rbdata) + (prandom_u32() & 0x7f) + 2;
+
+ /* specify the text/dict sizes for reservation */
+ r.text_buf_size = len;
+ /* only add a dictionary on some records */
+ if (len % 2)
+ r.dict_buf_size = len;
+ else
+ r.dict_buf_size = 0;
+
+ if (prb_reserve(&e, &test_rb, &r)) {
+ len -= sizeof(struct rbdata) + 1;
+
+ dat = (struct rbdata *)&r.text_buf[0];
+ dat->len = len;
+ memset(&dat->text[0], text_id, len);
+ dat->text[len] = 0;
+
+ /* dictionary reservation is allowed to fail */
+ if (r.dict_buf) {
+ dat = (struct rbdata *)&r.dict_buf[0];
+ dat->len = len;
+ memset(&dat->text[0], dict_id, len);
+ dat->text[len] = 0;
+ } else if (r.text_buf_size % 2) {
+ trace_printk(
+ "writer%lu (%c) dict dropped: seq=%llu\n",
+ num, text_id, r.info->seq);
+ }
+
+ prb_commit(&e);
+ wake_up_interruptible(&test_wait);
+ } else {
+ WRITE_ONCE(halt_test, 1);
+ trace_printk("writer%lu (%c) reserve failed\n",
+ num, text_id);
+ }
+
+ if ((count++ & 0x3fff) == 0)
+ schedule();
+
+ if (READ_ONCE(halt_test) == 1)
+ break;
+ }
+
+ pr_err("prbtest: end thread %03lu (writer, wrote %lu)\n", num, count);
+
+ test_running[num] = 0;
+
+ return 0;
+}
+
+static bool check_data(struct rbdata *dat, u64 seq, unsigned long num)
+{
+ int len;
+
+ len = strnlen(dat->text, 160);
+
+ if (len != dat->len || len >= 160) {
+ WRITE_ONCE(halt_test, 1);
+ trace_printk("reader%lu invalid len for %llu (%d<->%d)\n",
+ num, seq, len, dat->len);
+ return false;
+ }
+
+ while (len) {
+ len--;
+ if (dat->text[len] != dat->text[0]) {
+ WRITE_ONCE(halt_test, 1);
+ trace_printk("reader%lu bad data\n", num);
+ return false;
+ }
+ }
+
+ return true;
+}
+
+static int prbtest_reader(void *data)
+{
+ unsigned long num = (unsigned long)data;
+ unsigned long total_lost = 0;
+ unsigned long max_lost = 0;
+ unsigned long count = 0;
+ struct printk_info info;
+ struct printk_record r;
+ char text_buf[200];
+ char dict_buf[200];
+ int did_sched = 1;
+ u64 seq = 0;
+
+ r.info = &info;
+ r.text_buf = &text_buf[0];
+ r.dict_buf = &dict_buf[0];
+ r.text_buf_size = sizeof(text_buf);
+ r.dict_buf_size = sizeof(dict_buf);
+
+ pr_err("prbtest: start thread %03lu (reader)\n", num);
+
+ while (!wait_event_interruptible(test_wait,
+ kthread_should_stop() ||
+ prb_read_valid(&test_rb, seq, &r))) {
+ if (kthread_should_stop())
+ break;
+ /* check/track the sequence */
+ if (info.seq < seq) {
+ WRITE_ONCE(halt_test, 1);
+ trace_printk("reader%lu invalid seq %llu -> %llu\n",
+ num, seq, info.seq);
+ break;
+ } else if (info.seq != seq && !did_sched) {
+ total_lost += info.seq - seq;
+ if (max_lost < info.seq - seq)
+ max_lost = info.seq - seq;
+ }
+
+ if (!check_data((struct rbdata *)&r.text_buf[0],
+ info.seq, num)) {
+ trace_printk("text error\n");
+ break;
+ }
+
+ if (info.dict_len) {
+ if (!check_data((struct rbdata *)&r.dict_buf[0],
+ info.seq, num)) {
+ trace_printk("dict error\n");
+ break;
+ }
+ } else if (info.text_len % 2) {
+ trace_printk("dict dropped: seq=%llu\n", info.seq);
+ }
+
+ did_sched = 0;
+ if ((count++ & 0x3fff) == 0) {
+ did_sched = 1;
+ schedule();
+ }
+
+ if (READ_ONCE(halt_test) == 1)
+ break;
+
+ seq = info.seq + 1;
+ }
+
+ pr_err(
+ "reader%lu: total_lost=%lu max_lost=%lu total_read=%lu seq=%llu\n",
+ num, total_lost, max_lost, count, info.seq);
+
+ pr_err("prbtest: end thread %03lu (reader)\n", num);
+
+ while (!kthread_should_stop())
+ msleep(1000);
+ test_running[num] = 0;
+
+ return 0;
+}
+
+static int module_test_running;
+static struct task_struct *reader_thread;
+
+static int start_test(void *arg)
+{
+ struct task_struct *thread;
+ unsigned long i;
+ int num_cpus;
+
+ num_cpus = num_online_cpus();
+ test_running = kzalloc(num_cpus, GFP_KERNEL);
+ if (!test_running)
+ return -ENOMEM;
+
+ module_test_running = 1;
+
+ pr_err("prbtest: starting test\n");
+
+ for (i = 0; i < num_cpus; i++) {
+ test_running[i] = 1;
+ if (i < num_cpus - 1) {
+ thread = kthread_run(prbtest_writer, (void *)i,
+ "prbtest writer");
+ } else {
+ thread = kthread_run(prbtest_reader, (void *)i,
+ "prbtest reader");
+ reader_thread = thread;
+ }
+ if (IS_ERR(thread)) {
+ pr_err("prbtest: unable to create thread %lu\n", i);
+ test_running[i] = 0;
+ }
+ }
+
+ for (;;) {
+ msleep(1000);
+
+ for (i = 0; i < num_cpus; i++) {
+ if (test_running[i] == 1)
+ break;
+ }
+ if (i == num_cpus)
+ break;
+ }
+
+ pr_err("prbtest: completed test\n");
+
+ dump_rb(&test_rb);
+
+ module_test_running = 0;
+
+ return 0;
+}
+
+static int prbtest_init(void)
+{
+ kthread_run(start_test, NULL, "prbtest");
+ return 0;
+}
+
+static void prbtest_exit(void)
+{
+ if (reader_thread && !IS_ERR(reader_thread))
+ kthread_stop(reader_thread);
+
+ WRITE_ONCE(halt_test, 1);
+
+ while (module_test_running)
+ msleep(1000);
+ kfree(test_running);
+}
+
+module_init(prbtest_init);
+module_exit(prbtest_exit);
+
+MODULE_AUTHOR("John Ogness <[email protected]>");
+MODULE_DESCRIPTION("printk ringbuffer test");
+MODULE_LICENSE("GPL v2");
--
2.20.1

2019-12-02 15:52:08

by Petr Mladek

Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

Hi,

I have seen a few preliminary versions before this public one.
I am either happy with it or blind to new problems[*].

It would be great if anyone else could look at it. I am especially
interested in:

+ Whether the algorithm can be understood by people who
see it for the "first" time.

+ Whether there are any obvious races. I wonder if our assumptions
about the atomic_cmpxchg() guarantees are correct.


[*] I have found one theoretical ABA problem. I think that it is
really rather theoretical and should not block this patch.
I think that we could do something to prevent it either
now or later. See below for more details.


On Thu 2019-11-28 02:58:33, John Ogness wrote:
> Add a new lockless ringbuffer implementation to be used for printk.
> First only add support for writers. Reader support will be added in
> a follow-up commit.
>
> Signed-off-by: John Ogness <[email protected]>
> ---
> kernel/printk/printk_ringbuffer.c | 676 ++++++++++++++++++++++++++++++
> kernel/printk/printk_ringbuffer.h | 239 +++++++++++
> 2 files changed, 915 insertions(+)
> create mode 100644 kernel/printk/printk_ringbuffer.c
> create mode 100644 kernel/printk/printk_ringbuffer.h
>
> diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
> new file mode 100644
> index 000000000000..09c32e52fd40
> --- /dev/null
> +++ b/kernel/printk/printk_ringbuffer.c
> @@ -0,0 +1,676 @@
> +// SPDX-License-Identifier: GPL-2.0
> +
> +#include <linux/kernel.h>
> +#include <linux/irqflags.h>
> +#include <linux/string.h>
> +#include <linux/errno.h>
> +#include <linux/bug.h>
> +#include "printk_ringbuffer.h"
> +
> +/**
> + * DOC: printk_ringbuffer overview
> + *
> + * Data Structure
> + * --------------
> + * The printk_ringbuffer is made up of 3 internal ringbuffers::
> + *
> + * * desc_ring: A ring of descriptors. A descriptor contains all record
> + * meta-data (sequence number, timestamp, loglevel, etc.) as
> + * well as internal state information about the record and
> + * logical positions specifying where in the other
> + * ringbuffers the text and dictionary strings are located.
> + *
> + * * text_data_ring: A ring of data blocks. A data block consists of a 32-bit
> + * integer (ID) that maps to a desc_ring index followed by
> + * the text string of the record.
> + *
> + * * dict_data_ring: A ring of data blocks. A data block consists of a 32-bit
> + * integer (ID) that maps to a desc_ring index followed by
> + * the dictionary string of the record.
> + *

I slightly miss some top-level description of the approach,
especially how the code deals with races. The following comes to my
mind:


The state of each entry is always defined by desc->state_val.
It consists of a 30-bit ID and a 2-bit state value, see DESC_ID()
and get_desc_state().

A prb_reserve() call returns a descriptor in the reserved state. It
points to reserved data blocks in the data rings.

The algorithm starts by using never-used descriptors and data
blocks. Later it reuses descriptors and data blocks that are in the
reusable state.

The descriptors and data space are reused in a round-robin fashion.
Only records in the committed state can be moved to the reusable
state.

prb_reserve() has to do several steps. Namely, it has to update head
and tail positions in all ring buffers. Also, it has to manipulate
desc->state_val. Most races are avoided by using atomic_cmpxchg()
operations. They make sure that all values are moved from one
well-defined state to another well-defined state.

ABA races are avoided by using logical positions and indexes.
The ringbuffer must be reused many times before these values
overflow. Anyway, logical positions in the data rings cannot
overflow wildly. They are manipulated only when the descriptor is
already reserved. A descriptor can get reused only after it has been
committed. It means that a limited number of writers can reserve
records before any of them would need to reuse a particular
descriptor. [*]

Back to the prb_reserve() algorithm. The repeated pattern is that
it does not matter which caller invalidates the oldest entries. It
is fine when it has already been done by another writer in the
meantime. The real winner is the caller that is able to move the
head position and reserve the space. Others need to go back to
invalidating the oldest entry, etc.

The prb_reserve() caller has exclusive write access to the reserved
descriptor and data blocks. It has to call prb_commit() when
finished. That is a signal that the data are valid for readers. But
it is also a signal that the descriptor and the space might get
reused by other writers.


Finally, readers just need to check the state of the descriptor
before and after reading the data. The data are correct only
when the descriptor is in the committed state.
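
In code terms, the reader check boils down to the following
pattern (a simplified sketch of what desc_read() in the patch
does):

	d_state = get_desc_state(id, atomic_read(&desc->state_var));
	if (d_state == desc_committed || d_state == desc_reusable) {
		smp_rmb();	/* check the state before loading data */
		*desc_out = READ_ONCE(*desc);
		smp_rmb();	/* load the data before re-checking */
		d_state = get_desc_state(id,
					 atomic_read(&desc->state_var));
	}
	/* the copy is trustworthy only if the state did not change */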

[*] Hmm, the ABA problem might theoretically happen in desc_reserve(),
see below.



> +/*
> + * Sanity checker for reserve size. The ringbuffer code assumes that a data
> + * block does not exceed the maximum possible size that could fit within the
> + * ringbuffer. This function provides that basic size check so that the
> + * assumption is safe.
> + */
> +static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size)
> +{
> + struct prb_data_block *db = NULL;
> +
> + /* Writers are not allowed to write data-less records. */
> + if (size == 0)
> + return false;

I would personally leave this decision to the API caller.

The code actually has to support data-less records. They are used
when the descriptor is reserved but the data block cannot be
reserved.

The above check might create a false feeling that data-less records
cannot appear later in the code. That might lead to bugs in writer
code. Also, reader API users might not expect to get a "valid"
data-less record.

> + /*
> + * Ensure the alignment padded size could possibly fit in the data
> + * array. The largest possible data block must still leave room for
> + * at least the ID of the next block.
> + */
> + size = to_blk_size(size);
> + if (size > DATA_SIZE(data_ring) - sizeof(db->id))
> + return false;
> +
> + return true;
> +}
> +
> +/* Reserve a new descriptor, invalidating the oldest if necessary. */
> +static bool desc_reserve(struct printk_ringbuffer *rb, u32 *id_out)
> +{
> + struct prb_desc_ring *desc_ring = &rb->desc_ring;
> + struct prb_desc *desc;
> + u32 id_prev_wrap;
> + u32 head_id;
> + u32 id;
> +
> + head_id = atomic_read(&desc_ring->head_id);
> +
> + do {
> + desc = to_desc(desc_ring, head_id);
> +
> + id = DESC_ID(head_id + 1);
> + id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
> +
> + if (id_prev_wrap == atomic_read(&desc_ring->tail_id)) {
> + if (!desc_push_tail(rb, id_prev_wrap))
> + return false;
> + }
> + } while (!atomic_try_cmpxchg(&desc_ring->head_id, &head_id, id));

Hmm, in theory, ABA problem might cause that we successfully
move desc_ring->head_id when tail has not been pushed yet.

As a result we would never call desc_push_tail() until
it overflows again.

I am not sure if we need to take care of it. The code is called with
interrupts disabled. IMHO, only an NMI could cause an ABA problem
in reality. But the game (debugging) is lost anyway when an NMI
overwrites the buffer several times.

Also it should not be a complete failure. We could still bail out when
the previous state was not reusable. We are on the safe side
when it was reusable.

Also we could probably detect when desc_ring->tail_id is no longer
updated and do something about it.

> +
> + desc = to_desc(desc_ring, id);
> +
> + /* Set the descriptor's ID and also set its state to reserved. */
> + atomic_set(&desc->state_var, id | 0);

We should check here that the original state contains the id from the
previous wrap in the reusable state (or 0 for bootstrap). On error, we
know that there was the ABA problem and would need to do something about it.

> +
> + /* Store new ID/state before making any other changes. */
> + smp_wmb(); /* LMM_TAG(desc_reserve:A) */
> +
> + *id_out = id;
> + return true;
> +}
> +
> +/* Determine the end of a data block. */
> +static unsigned long get_next_lpos(struct prb_data_ring *data_ring,
> + unsigned long lpos, unsigned int size)
> +{
> + unsigned long begin_lpos;
> + unsigned long next_lpos;
> +
> + begin_lpos = lpos;
> + next_lpos = lpos + size;
> +
> + if (DATA_WRAPS(data_ring, begin_lpos) ==
> + DATA_WRAPS(data_ring, next_lpos)) {
> + /* The data block does not wrap. */
> + return next_lpos;
> + }
> +
> + /* Wrapping data blocks store their data at the beginning. */
> + return (DATA_THIS_WRAP_START_LPOS(data_ring, next_lpos) + size);
> +}
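
As an illustration of the wrap handling above, a small compilable
sketch with an assumed 4 KiB data ring (the macros are simplified
stand-ins, not the patch macros):

#include <assert.h>

#define SIZE_BITS	12
#define DATA_SIZE	(1UL << SIZE_BITS)
#define WRAPS(lpos)	((lpos) >> SIZE_BITS)
#define WRAP_START(lpos) ((lpos) & ~(DATA_SIZE - 1))

static unsigned long get_next_lpos(unsigned long lpos, unsigned int size)
{
	unsigned long next = lpos + size;

	if (WRAPS(lpos) == WRAPS(next))
		return next;		/* block fits without wrapping */

	/* Wrapping block: the data moves to the start of the new wrap. */
	return WRAP_START(next) + size;
}

int main(void)
{
	/* 200 bytes at lpos 4000 would cross the 4096 boundary, so the
	 * block is placed at lpos 4096 and the data ends at 4296. */
	assert(get_next_lpos(4000, 200) == 4096 + 200);
	return 0;
}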
> +
> +/**
> + * prb_reserve() - Reserve space in the ringbuffer.
> + *
> + * @e: The entry structure to setup.
> + * @rb: The ringbuffer to reserve data in.
> + * @r: The record structure to allocate buffers for.
> + *
> + * This is the public function available to writers to reserve data.
> + *
> + * The writer specifies the text and dict sizes to reserve by setting the
> + * @text_buf_size and @dict_buf_size fields of @r, respectively. Dictionaries
> + * are optional, so @dict_buf_size is allowed to be 0.
> + *
> + * Context: Any context. Disables local interrupts on success.
> + * Return: true if at least text data could be allocated, otherwise false.
> + *
> + * On success, the fields @info, @text_buf, @dict_buf of @r will be set by
> + * this function and should be filled in by the writer before committing.
> + *
> + * If the function fails to reserve dictionary space (but all else succeeded),
> + * it will still report success. In that case @dict_buf is set to NULL and
> + * @dict_buf_size is set to 0. Writers must check this before writing to
> + * dictionary space.
> + */
> +bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
> + struct printk_record *r)
> +{
> + struct prb_desc_ring *desc_ring = &rb->desc_ring;
> + struct prb_desc *d;
> + u32 id;
> +
> + if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
> + return false;
> +
> + /* Records without dictionaries are allowed. */
> + if (r->dict_buf_size) {
> + if (!data_check_size(&rb->dict_data_ring, r->dict_buf_size))
> + return false;
> + }
> +
> + /* Disable interrupts during the reserve/commit window. */
> + local_irq_save(e->irqflags);
> +
> + if (!desc_reserve(rb, &id)) {
> + /* Descriptor reservation failures are tracked. */
> + atomic_long_inc(&rb->fail);
> + local_irq_restore(e->irqflags);
> + return false;
> + }
> +
> + d = to_desc(desc_ring, id);
> +
> + /*
> + * Set the @e fields here so that prb_commit() can be used if
> + * text data allocation fails.
> + */
> + e->rb = rb;
> + e->id = id;
> +
> + /*
> + * Initialize the sequence number if it has never been set.
> + * Otherwise just increment it by a full wrap.
> + *
> + * @seq is considered "never been set" if it has a value of 0,
> + * _except_ for descs[0], which was set by the ringbuffer initializer
> + * and therefore is always considered as set.
> + *
> + * See the "Bootstrap" comment block in printk_ringbuffer.h for
> + * details about how the initializer bootstraps the descriptors.
> + */
> + if (d->info.seq == 0 && DESC_INDEX(desc_ring, id) != 0)
> + d->info.seq = DESC_INDEX(desc_ring, id);
> + else
> + d->info.seq += DESCS_COUNT(desc_ring);
> +
> + r->text_buf = data_alloc(rb, &rb->text_data_ring, r->text_buf_size,
> + &d->text_blk_lpos, id);
> + /* If text data allocation fails, a data-less record is committed. */
> + if (r->text_buf_size && !r->text_buf) {
> + d->info.text_len = 0;
> + d->info.dict_len = 0;

I would prefer to clear all four variables at the beginning:

r->text_buf = NULL;
r->dict_buf = NULL;
d->info.text_len = 0;
d->info.dict_len = 0;

So that we never return an outdated/random pointer.

I might be too careful. But I think that it is a good price for
a safer API. Especially since a broken printk() is not easy
to debug.

Best Regards,
Petr

> + prb_commit(e);
> + return false;
> + }
> +
> + r->dict_buf = data_alloc(rb, &rb->dict_data_ring, r->dict_buf_size,
> + &d->dict_blk_lpos, id);
> + /*
> + * If dict data allocation fails, the caller can still commit
> + * text. But dictionary information will not be available.
> + */
> + if (r->dict_buf_size && !r->dict_buf)
> + r->dict_buf_size = 0;
> +
> + r->info = &d->info;
> +
> + /* Set default values for the lengths. */
> + d->info.text_len = r->text_buf_size;
> + d->info.dict_len = r->dict_buf_size;
> +
> + return true;
> +}
> +EXPORT_SYMBOL(prb_reserve);
> +

2019-12-02 16:03:25

by Petr Mladek

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On Mon 2019-12-02 16:48:41, Petr Mladek wrote:
> > +/* Reserve a new descriptor, invalidating the oldest if necessary. */
> > +static bool desc_reserve(struct printk_ringbuffer *rb, u32 *id_out)
> > +{
> > + struct prb_desc_ring *desc_ring = &rb->desc_ring;
> > + struct prb_desc *desc;
> > + u32 id_prev_wrap;
> > + u32 head_id;
> > + u32 id;
> > +
> > + head_id = atomic_read(&desc_ring->head_id);
> > +
> > + do {
> > + desc = to_desc(desc_ring, head_id);
> > +
> > + id = DESC_ID(head_id + 1);
> > + id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
> > +
> > + if (id_prev_wrap == atomic_read(&desc_ring->tail_id)) {
> > + if (!desc_push_tail(rb, id_prev_wrap))
> > + return false;
> > + }
> > + } while (!atomic_try_cmpxchg(&desc_ring->head_id, &head_id, id));
>
> Hmm, in theory, an ABA problem might cause us to successfully
> move desc_ring->head_id when the tail has not been pushed yet.
>
> As a result we would never call desc_push_tail() until
> it overflows again.
>
> I am not sure if we need to take care of it. The code is called with
> interrupts disabled. IMHO, only an NMI could cause an ABA problem
> in reality. But the game (debugging) is lost anyway when an NMI
> overwrites the buffer several times.

BTW: If I am counting correctly, the ABA problem would happen when
exactly 2^30 (1G) messages are written in the meantime.

> Also it should not be a complete failure. We could still bail out when
> the previous state was not reusable. We are on the safe side
> when it was reusable.
>
> Also we could probably detect when desc_ring->tail_id is no longer
> updated and do something about it.
>
> > +
> > + desc = to_desc(desc_ring, id);
> > +
> > + /* Set the descriptor's ID and also set its state to reserved. */
> > + atomic_set(&desc->state_var, id | 0);
>
> We should check here that the original state contains the id from the
> previous wrap in the reusable state (or 0 for bootstrap). On error, we
> know that there was the ABA problem and would need to do something about it.

I believe that it should be enough to add this check and
bail out in case of error.

Best Regards,
Petr

2019-12-02 16:42:42

by John Ogness

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On 2019-12-02, Petr Mladek <[email protected]> wrote:
>> > +/* Reserve a new descriptor, invalidating the oldest if necessary. */
>> > +static bool desc_reserve(struct printk_ringbuffer *rb, u32 *id_out)
>> > +{
>> > + struct prb_desc_ring *desc_ring = &rb->desc_ring;
>> > + struct prb_desc *desc;
>> > + u32 id_prev_wrap;
>> > + u32 head_id;
>> > + u32 id;
>> > +
>> > + head_id = atomic_read(&desc_ring->head_id);
>> > +
>> > + do {
>> > + desc = to_desc(desc_ring, head_id);
>> > +
>> > + id = DESC_ID(head_id + 1);
>> > + id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
>> > +
>> > + if (id_prev_wrap == atomic_read(&desc_ring->tail_id)) {
>> > + if (!desc_push_tail(rb, id_prev_wrap))
>> > + return false;
>> > + }
>> > + } while (!atomic_try_cmpxchg(&desc_ring->head_id, &head_id, id));
>>
>> Hmm, in theory, an ABA problem might cause us to successfully
>> move desc_ring->head_id when the tail has not been pushed yet.
>>
>> As a result we would never call desc_push_tail() until
>> it overflows again.
>>
>> I am not sure if we need to take care of it. The code is called with
>> interrupts disabled. IMHO, only an NMI could cause an ABA problem
>> in reality. But the game (debugging) is lost anyway when an NMI
>> overwrites the buffer several times.
>
> BTW: If I am counting correctly, the ABA problem would happen when
> exactly 2^30 (1G) messages are written in the meantime.

All the ringbuffer code assumes that the use of index numbers handles
the ABA problem (i.e. there must not be 1 billion printk's within an
NMI). If we want to support 1 billion+ printk's within an NMI, then
perhaps the index number should be increased. For 64-bit systems it
would be no problem to go to 62 bits. For 32-bit systems, I don't know
how well the 64-bit atomic operations are supported.
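
To make the arithmetic concrete, here is a compilable sketch of where
the 2^30 figure comes from, assuming a 32-bit state value whose top
two bits are used as flags (committed/reuse):

#include <assert.h>

#define DESC_SV_BITS	(sizeof(unsigned int) * 8)
#define DESC_FLAGS_MASK	(3u << (DESC_SV_BITS - 2))
#define DESC_ID_MASK	((unsigned int)~DESC_FLAGS_MASK)
#define DESC_ID(sv)	((sv) & DESC_ID_MASK)

int main(void)
{
	unsigned int id = 5;

	/* After exactly 2^30 increments the ID value repeats, which
	 * opens the ABA window discussed above. */
	assert(DESC_ID(id + (1u << 30)) == DESC_ID(id));
	return 0;
}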

>> Also it should not be a complete failure. We could still bail out when
>> the previous state was not reusable. We are on the safe side
>> when it was reusable.
>>
>> Also we could probably detect when desc_ring->tail_id is no longer
>> updated and do something about it.
>>
>> > +
>> > + desc = to_desc(desc_ring, id);
>> > +
>> > + /* Set the descriptor's ID and also set its state to reserved. */
>> > + atomic_set(&desc->state_var, id | 0);
>>
>> We should check here that the original state contains the id from the
>> previous wrap in the reusable state (or 0 for bootstrap). On error, we
>> know that there was the ABA problem and would need to do something about it.
>
> I believe that it should be enough to add this check and
> bail out in case of error.

I need to go through the code again in detail and see how many locations
are affected by ABA. All the code was written with the assumption that
this type of ABA will not happen.

As you've stated, we could add minimal handling so that the ringbuffer
at least does not break or get stuck.

BTW: The same assumption is made for logical positions. There are 4
times as many of these (on 32-bit systems) but logical positions advance
much faster. I will review these as well.

John Ogness

2019-12-03 01:18:17

by Sergey Senozhatsky

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On (19/12/02 17:37), John Ogness wrote:
> On 2019-12-02, Petr Mladek <[email protected]> wrote:
> >> > +/* Reserve a new descriptor, invalidating the oldest if necessary. */
> >> > +static bool desc_reserve(struct printk_ringbuffer *rb, u32 *id_out)
> >> > +{
> >> > + struct prb_desc_ring *desc_ring = &rb->desc_ring;
> >> > + struct prb_desc *desc;
> >> > + u32 id_prev_wrap;
> >> > + u32 head_id;
> >> > + u32 id;
> >> > +
> >> > + head_id = atomic_read(&desc_ring->head_id);
> >> > +
> >> > + do {
> >> > + desc = to_desc(desc_ring, head_id);
> >> > +
> >> > + id = DESC_ID(head_id + 1);
> >> > + id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
> >> > +
> >> > + if (id_prev_wrap == atomic_read(&desc_ring->tail_id)) {
> >> > + if (!desc_push_tail(rb, id_prev_wrap))
> >> > + return false;
> >> > + }
> >> > + } while (!atomic_try_cmpxchg(&desc_ring->head_id, &head_id, id));
> >>
> >> Hmm, in theory, an ABA problem might cause us to successfully
> >> move desc_ring->head_id when the tail has not been pushed yet.
> >>
> >> As a result we would never call desc_push_tail() until
> >> it overflows again.
> >>
> >> I am not sure if we need to take care of it. The code is called with
> >> interrupts disabled. IMHO, only an NMI could cause an ABA problem
> >> in reality. But the game (debugging) is lost anyway when an NMI
> >> overwrites the buffer several times.
> >
> > BTW: If I am counting correctly, the ABA problem would happen when
> > exactly 2^30 (1G) messages are written in the meantime.
>
> All the ringbuffer code assumes that the use of index numbers handles
> the ABA problem (i.e. there must not be 1 billion printk's within an
> NMI). If we want to support 1 billion+ printk's within an NMI, then
> perhaps the index number should be increased. For 64-bit systems it
> would be no problem to go to 62 bits. For 32-bit systems, I don't know
> how well the 64-bit atomic operations are supported.

ftrace dumps from NMI (DUMP_ALL type ftrace_dump_on_oops on a $BIG
machine)? 1G seems large enough, but who knows.

-ss

2019-12-03 08:55:10

by Petr Mladek

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On Mon 2019-12-02 17:37:26, John Ogness wrote:
> On 2019-12-02, Petr Mladek <[email protected]> wrote:
> >> > +/* Reserve a new descriptor, invalidating the oldest if necessary. */
> >> > +static bool desc_reserve(struct printk_ringbuffer *rb, u32 *id_out)
> >> > +{
> >> > + struct prb_desc_ring *desc_ring = &rb->desc_ring;
> >> > + struct prb_desc *desc;
> >> > + u32 id_prev_wrap;
> >> > + u32 head_id;
> >> > + u32 id;
> >> > +
> >> > + head_id = atomic_read(&desc_ring->head_id);
> >> > +
> >> > + do {
> >> > + desc = to_desc(desc_ring, head_id);
> >> > +
> >> > + id = DESC_ID(head_id + 1);
> >> > + id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
> >> > +
> >> > + if (id_prev_wrap == atomic_read(&desc_ring->tail_id)) {
> >> > + if (!desc_push_tail(rb, id_prev_wrap))
> >> > + return false;
> >> > + }
> >> > + } while (!atomic_try_cmpxchg(&desc_ring->head_id, &head_id, id));
> >>
> >> Hmm, in theory, an ABA problem might cause us to successfully
> >> move desc_ring->head_id when the tail has not been pushed yet.
> >>
> >> As a result we would never call desc_push_tail() until
> >> it overflows again.
> >>
> >> I am not sure if we need to take care of it. The code is called with
> >> interrupts disabled. IMHO, only an NMI could cause an ABA problem
> >> in reality. But the game (debugging) is lost anyway when an NMI
> >> overwrites the buffer several times.
> >
> > BTW: If I am counting correctly, the ABA problem would happen when
> > exactly 2^30 (1G) messages are written in the meantime.
>
> All the ringbuffer code assumes that the use of index numbers handles
> the ABA problem (i.e. there must not be 1 billion printk's within an
> NMI). If we want to support 1 billion+ printk's within an NMI, then
> perhaps the index number should be increased. For 64-bit systems it
> would be no problem to go to 62 bits. For 32-bit systems, I don't know
> how well the 64-bit atomic operations are supported.

I am not super happy but I really think that it does not make sense
to complicate the code too much because of this theoretical race.

1 billion printks in NMI is crazy. Also the race is a problem only
when we hit exactly the 2^30th message. If we hit 2^30 + 1 or more,
then everything is fine again.

In the worst case, printk() will stop working. I think that there
are other, much more likely situations where printk() will not work
(people will not see the messages).


> >> Also it should not be a complete failure. We could still bail out
> >> the previous state was not reusable. We are on the safe side
> >> when it was reusable.
> >>
> >> Also we could probably detect when desc_ring->tail_id is no longer
> >> updated and do something about it.
> >>
> >> > +
> >> > + desc = to_desc(desc_ring, id);
> >> > +
> >> > + /* Set the descriptor's ID and also set its state to reserved. */
> >> > + atomic_set(&desc->state_var, id | 0);
> >>
> >> We should check here that the original state contains the id from the
> >> previous wrap in the reusable state (or 0 for bootstrap). On error, we
> >> know that there was the ABA problem and would need to do something about it.
> >
> > I believe that it should be enough to add this check and
> > bail out in case of error.
>
> I need to go through the code again in detail and see how many locations
> are affected by ABA. All the code was written with the assumption that
> this type of ABA will not happen.
>
> As you've stated, we could add minimal handling so that the ringbuffer
> at least does not break or get stuck.
>
> BTW: The same assumption is made for logical positions. There are 4
> times as many of these (on 32-bit systems) but logical positions advance
> much faster. I will review these as well.

The logical positions are assigned only when a descriptor is reserved.
Such a descriptor cannot be reused until it has been committed and
marked reusable. This limits the logical position movement to:

max_record_size * number_of_descriptors

Printk records are limited to 1k. So we could safely support
up to 1 million fully sized lines printed while an NMI has
interrupted printk() on one CPU.

The most important thing is that it must not crash the system.
It would be nice if we were able to detect this situation and
do something about it. But IMHO, it is perfectly fine when
printk() stops working in this corner case.

The only problem is that it might be hard to debug. But it
should be possible with a crashdump. And I think that we will
first hit some other bugs that we do not see at the moment ;-)

Best Regards,
Petr

2019-12-03 12:08:36

by Petr Mladek

[permalink] [raw]
Subject: Re: [RFC PATCH v5 2/3] printk-rb: new printk ringbuffer implementation (reader)

On Thu 2019-11-28 02:58:34, John Ogness wrote:
> Add the reader implementation for the new ringbuffer.
>
> Signed-off-by: John Ogness <[email protected]>
> ---
> kernel/printk/printk_ringbuffer.c | 234 ++++++++++++++++++++++++++++++
> kernel/printk/printk_ringbuffer.h | 12 +-
> 2 files changed, 245 insertions(+), 1 deletion(-)
>
> diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
> index 09c32e52fd40..f85762713583 100644
> --- a/kernel/printk/printk_ringbuffer.c
> +++ b/kernel/printk/printk_ringbuffer.c
> @@ -674,3 +674,237 @@ void prb_commit(struct prb_reserved_entry *e)
> local_irq_restore(e->irqflags);
> }
> EXPORT_SYMBOL(prb_commit);
> +
> +/*
> + * Given @blk_lpos, return a pointer to the raw data from the data block
> + * and calculate the size of the data part. A NULL pointer is returned
> + * if @blk_lpos specifies values that could never be legal.
> + *
> + * This function (used by readers) performs strict validation on the lpos
> + * values to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
> + * triggered if an internal error is detected.
> + */
> +static char *get_data(struct prb_data_ring *data_ring,
> + struct prb_data_blk_lpos *blk_lpos,
> + unsigned long *data_size)
> +{
> + struct prb_data_block *db;
> +
> + if (blk_lpos->begin == INVALID_LPOS &&
> + blk_lpos->next == INVALID_LPOS) {
> + /* descriptor without a data block */
> + return NULL;
> + } else if (DATA_WRAPS(data_ring, blk_lpos->begin) ==
> + DATA_WRAPS(data_ring, blk_lpos->next)) {
> + /* regular data block */
> + if (WARN_ON_ONCE(blk_lpos->next <= blk_lpos->begin))
> + return NULL;
> + db = to_block(data_ring, blk_lpos->begin);
> + *data_size = blk_lpos->next - blk_lpos->begin;
> +
> + } else if ((DATA_WRAPS(data_ring, blk_lpos->begin) + 1 ==
> + DATA_WRAPS(data_ring, blk_lpos->next)) ||
> + ((DATA_WRAPS(data_ring, blk_lpos->begin) ==
> + DATA_WRAPS(data_ring, -1UL)) &&
> + (DATA_WRAPS(data_ring, blk_lpos->next) == 0))) {

I am a bit confused. I would expect that (-1UL + 1) = 0. So the second
condition after || looks just like a special variant of the first
valid condition.

Or do I miss anything? Is there a problem with type casting?


> + /* wrapping data block */
> + db = to_block(data_ring, 0);
> + *data_size = DATA_INDEX(data_ring, blk_lpos->next);
> +
> + } else {
> + WARN_ON_ONCE(1);
> + return NULL;
> + }
> +
> + /* A valid data block will always be aligned to the ID size. */
> + if (WARN_ON_ONCE(blk_lpos->begin !=
> + ALIGN(blk_lpos->begin, sizeof(db->id))) ||
> + WARN_ON_ONCE(blk_lpos->next !=
> + ALIGN(blk_lpos->next, sizeof(db->id)))) {
> + return NULL;
> + }
> +
> + /* A valid data block will always have at least an ID. */
> + if (WARN_ON_ONCE(*data_size < sizeof(db->id)))
> + return NULL;
> +
> + /* Subtract descriptor ID space from size. */
> + *data_size -= sizeof(db->id);
> +
> + return &db->data[0];
> +}
> +
> +/* Given @blk_lpos, copy an expected @len of data into the provided buffer. */
> +static bool copy_data(struct prb_data_ring *data_ring,
> + struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
> + unsigned int buf_size)
> +{
> + unsigned long data_size;
> + char *data;
> +
> + /* Caller might not want the data. */
> + if (!buf || !buf_size)
> + return true;
> +
> + data = get_data(data_ring, blk_lpos, &data_size);
> + if (!data)
> + return false;
> +
> + /* Actual cannot be less than expected. */
> + if (WARN_ON_ONCE(data_size < len))
> + return false;

I do not have a good feeling about the record getting lost here.

I could imagine that a writer would reserve more space than
needed in the end. Then it would want to modify desc.info.text_len
and could make a mistake.

In other words, I would expect a bug on the writer side here.
And I would try to preserve the data by calling:

pr_warn_once("Wrong data_size (%lu) for data: %.*s\n", data_size,
data_size, data);

Well, I do not insist on it. WARN_ON_ONCE() is fine as well.

> +
> + data_size = min_t(u16, buf_size, len);
> +
> + if (!WARN_ON_ONCE(!data_size))
> + memcpy(&buf[0], data, data_size);
> + return true;
> +}
> +

Otherwise it looks good to me. I wonder how the conversion
of the printk.c code will look with this API.

Best Regards,
Petr

2019-12-03 13:47:06

by John Ogness

[permalink] [raw]
Subject: Re: [RFC PATCH v5 2/3] printk-rb: new printk ringbuffer implementation (reader)

On 2019-12-03, Petr Mladek <[email protected]> wrote:
>> Add the reader implementation for the new ringbuffer.
>>
>> Signed-off-by: John Ogness <[email protected]>
>> ---
>> kernel/printk/printk_ringbuffer.c | 234 ++++++++++++++++++++++++++++++
>> kernel/printk/printk_ringbuffer.h | 12 +-
>> 2 files changed, 245 insertions(+), 1 deletion(-)
>>
>> diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
>> index 09c32e52fd40..f85762713583 100644
>> --- a/kernel/printk/printk_ringbuffer.c
>> +++ b/kernel/printk/printk_ringbuffer.c
>> @@ -674,3 +674,237 @@ void prb_commit(struct prb_reserved_entry *e)
>> local_irq_restore(e->irqflags);
>> }
>> EXPORT_SYMBOL(prb_commit);
>> +
>> +/*
>> + * Given @blk_lpos, return a pointer to the raw data from the data block
>> + * and calculate the size of the data part. A NULL pointer is returned
>> + * if @blk_lpos specifies values that could never be legal.
>> + *
>> + * This function (used by readers) performs strict validation on the lpos
>> + * values to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
>> + * triggered if an internal error is detected.
>> + */
>> +static char *get_data(struct prb_data_ring *data_ring,
>> + struct prb_data_blk_lpos *blk_lpos,
>> + unsigned long *data_size)
>> +{
>> + struct prb_data_block *db;
>> +
>> + if (blk_lpos->begin == INVALID_LPOS &&
>> + blk_lpos->next == INVALID_LPOS) {
>> + /* descriptor without a data block */
>> + return NULL;
>> + } else if (DATA_WRAPS(data_ring, blk_lpos->begin) ==
>> + DATA_WRAPS(data_ring, blk_lpos->next)) {
>> + /* regular data block */
>> + if (WARN_ON_ONCE(blk_lpos->next <= blk_lpos->begin))
>> + return NULL;
>> + db = to_block(data_ring, blk_lpos->begin);
>> + *data_size = blk_lpos->next - blk_lpos->begin;
>> +
>> + } else if ((DATA_WRAPS(data_ring, blk_lpos->begin) + 1 ==
>> + DATA_WRAPS(data_ring, blk_lpos->next)) ||
>> + ((DATA_WRAPS(data_ring, blk_lpos->begin) ==
>> + DATA_WRAPS(data_ring, -1UL)) &&
>> + (DATA_WRAPS(data_ring, blk_lpos->next) == 0))) {
>
> I am a bit confused. I would expect that (-1UL + 1) = 0. So the second
> condition after || looks just like a special variant of the first
> valid condition.
>
> Or do I miss anything? Is there a problem with type casting?

Sorry, this code deserves a comment.

Here we are only comparing the number of wraps. For a wrapping data
block, @begin will be 1 wrap less than @next. The first part of the
check is checking the typical case, making sure that:

1 + WRAPS(@begin) == WRAPS(@next)

There is also the case when the lpos overflows. In that case the number
of wraps starts over at zero (without having overflowed). (Note: The
lpos overflows, _not_ the number of wraps. This is why the first check
is not enough.) In this case, the number of wraps of the highest
possible lpos value (-1UL) should be the same as the number of wraps of
@begin. And the number of wraps of @next should be 0. The simplified
pseudo-code check is:

WRAPS(@begin) == WRAPS(-1UL)
&&
WRAPS(@next) == 0
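
To see both branches in action, here is a compilable sketch with
DATA_WRAPS() reduced to a plain shift (assumed 12 size bits; an
illustration, not the patch macros):

#include <assert.h>

#define SIZE_BITS	12
#define DATA_SIZE	(1UL << SIZE_BITS)
#define WRAPS(lpos)	((lpos) >> SIZE_BITS)

int main(void)
{
	/* Typical wrapping block: @begin is one wrap behind @next. */
	unsigned long begin = 3 * DATA_SIZE + 100;
	unsigned long next = 4 * DATA_SIZE + 300;

	assert(WRAPS(begin) + 1 == WRAPS(next));

	/* Wrapping block across the lpos overflow: the wrap count
	 * restarts at zero, so the first check fails... */
	begin = -DATA_SIZE + 100;	/* last wrap before the overflow */
	next = 300;			/* first wrap after the overflow */

	assert(WRAPS(begin) + 1 != WRAPS(next));

	/* ...and the special case catches it. */
	assert(WRAPS(begin) == WRAPS(-1UL) && WRAPS(next) == 0);
	return 0;
}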

>> + /* wrapping data block */
>> + db = to_block(data_ring, 0);
>> + *data_size = DATA_INDEX(data_ring, blk_lpos->next);
>> +
>> + } else {
>> + WARN_ON_ONCE(1);
>> + return NULL;
>> + }
>> +
>> + /* A valid data block will always be aligned to the ID size. */
>> + if (WARN_ON_ONCE(blk_lpos->begin !=
>> + ALIGN(blk_lpos->begin, sizeof(db->id))) ||
>> + WARN_ON_ONCE(blk_lpos->next !=
>> + ALIGN(blk_lpos->next, sizeof(db->id)))) {
>> + return NULL;
>> + }
>> +
>> + /* A valid data block will always have at least an ID. */
>> + if (WARN_ON_ONCE(*data_size < sizeof(db->id)))
>> + return NULL;
>> +
>> + /* Subtract descriptor ID space from size. */
>> + *data_size -= sizeof(db->id);
>> +
>> + return &db->data[0];
>> +}
>> +
>> +/* Given @blk_lpos, copy an expected @len of data into the provided buffer. */
>> +static bool copy_data(struct prb_data_ring *data_ring,
>> + struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
>> + unsigned int buf_size)
>> +{
>> + unsigned long data_size;
>> + char *data;
>> +
>> + /* Caller might not want the data. */
>> + if (!buf || !buf_size)
>> + return true;
>> +
>> + data = get_data(data_ring, blk_lpos, &data_size);
>> + if (!data)
>> + return false;
>> +
>> + /* Actual cannot be less than expected. */
>> + if (WARN_ON_ONCE(data_size < len))
>> + return false;
>
> I do not have a good feeling about the record getting lost here.
>
> I could imagine that a writer would reserve more space than
> needed in the end. Then it would want to modify desc.info.text_len
> and could make a mistake.
>
> In other words, I would expect a bug on the writer side here.
> And I would try to preserve the data by calling:
>
> pr_warn_once("Wrong data_size (%lu) for data: %.*s\n", data_size,
> data_size, data);
>
> Well, I do not insist on it. WARN_ON_ONCE() is fine as well.

Since readers will run in their own kthread, the WARN_ON_ONCE() will not
be sufficient to identify the bug. Attempting to print the bad string
would help. (Although I expect we will not hit these WARN_ON's since we
are the ones implementing printk.)

John Ogness

2019-12-03 14:16:38

by John Ogness

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On 2019-12-02, Petr Mladek <[email protected]> wrote:
>> +/*
>> + * Sanity checker for reserve size. The ringbuffer code assumes that a data
>> + * block does not exceed the maximum possible size that could fit within the
>> + * ringbuffer. This function provides that basic size check so that the
>> + * assumption is safe.
>> + */
>> +static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size)
>> +{
>> + struct prb_data_block *db = NULL;
>> +
>> + /* Writers are not allowed to write data-less records. */
>> + if (size == 0)
>> + return false;
>
> I would personally leave this decision to the API caller.
>
> The code actually has to support data-less records. They are used
> when the descriptor is reserved but the data block can't get reserved.

Exactly. Data-less records are how the ringbuffer identifies that data
has been lost. If users were allowed to store data-less records, that
distinction would no longer be possible (unless I created some extra
field in the descriptor). Does it even make sense for printk to store
data-less records explicitly?

The current printk implementation silently ignores empty messages.

> The above statement might create a false feeling that it could not
> happen later in the code. It might lead to bugs in writer code.

Then let me change the statement to describe that data-less records are
used internally by the ringbuffer and cannot be explicitly created by
writers.

> Also reader API users might not expect to get a "valid" data-less
> record.

Readers will never see them. The reader API implementation skips over
data-less records.

John Ogness

2019-12-03 14:20:04

by Steven Rostedt

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On Tue, 3 Dec 2019 10:17:21 +0900
Sergey Senozhatsky <[email protected]> wrote:

> > > BTW: If I am counting correctly, the ABA problem would happen when
> > > exactly 2^30 (1G) messages are written in the meantime.
> >
> > All the ringbuffer code assumes that the use of index numbers handles
> > the ABA problem (i.e. there must not be 1 billion printk's within an
> > NMI). If we want to support 1 billion+ printk's within an NMI, then
> > perhaps the index number should be increased. For 64-bit systems it
> > would be no problem to go to 62 bits. For 32-bit systems, I don't know
> > how well the 64-bit atomic operations are supported.
>
> ftrace dumps from NMI (DUMP_ALL type ftrace_dump_on_oops on a $BIG
> machine)? 1G seems large enough, but who knows.

ftrace dump from NMI is the most likely case to hit this, but when that
happens, you are in debugging mode, and the system usually becomes
unreliable at this moment. I agree with Petr that we should not
complicate the code more to handle this theoretical condition.

-- Steve

2019-12-03 14:37:36

by Petr Mladek

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On Tue 2019-12-03 15:13:36, John Ogness wrote:
> On 2019-12-02, Petr Mladek <[email protected]> wrote:
> >> +/*
> >> + * Sanity checker for reserve size. The ringbuffer code assumes that a data
> >> + * block does not exceed the maximum possible size that could fit within the
> >> + * ringbuffer. This function provides that basic size check so that the
> >> + * assumption is safe.
> >> + */
> >> +static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size)
> >> +{
> >> + struct prb_data_block *db = NULL;
> >> +
> >> + /* Writers are not allowed to write data-less records. */
> >> + if (size == 0)
> >> + return false;
> >
> > I would personally leave this decision to the API caller.
> >
> > The code actually has to support data-less records. They are used
> > when the descriptor is reserved but the data block can't get reserved.
>
> Exactly. Data-less records are how the ringbuffer identifies that data
> has been lost. If users were allowed to store data-less records, that
> distinction would no longer be possible (unless I created some extra
> field in the descriptor). Does it even make sense for printk to store
> data-less records explicitly?

From my POV, it does not make much sense.

> The current printk implementation silently ignores empty messages.

I am not able to find it. I only see that empty continuation fragments
are ignored in log_output(). Normal empty lines seem to be stored.

Well, I can't see any use case for this. I think that we could ignore
all empty strings.


> > The above statement might create a false feeling that it could not
> > happen later in the code. It might lead to bugs in writer code.
>
> Then let me change the statement to describe that data-less records are
> used internally by the ringbuffer and cannot be explicitly created by
> writers.

Sounds good to me.

> > Also reader API users might not expect to get a "valid" data-less
> > record.
>
> Readers will never see them. The reader API implementation skips over
> data-less records.

Yeah. They will see a bump in the seq number. We would need to
distinguish empty records from lost records as you wrote above.
It looks better to ignore empty records already during the write.
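
A one-line guard at the writer entry would be enough for that (a
sketch, not the actual printk code; text_len stands for the length
of the message being written):

	/* Sketch: silently ignore empty messages before reserving. */
	if (!text_len)
		return 0;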

Best Regards,
Petr

2019-12-04 12:55:40

by Petr Mladek

[permalink] [raw]
Subject: Re: [RFC PATCH v5 2/3] printk-rb: new printk ringbuffer implementation (reader)

On Tue 2019-12-03 14:46:07, John Ogness wrote:
> On 2019-12-03, Petr Mladek <[email protected]> wrote:
> >> Add the reader implementation for the new ringbuffer.
> >>
> >> Signed-off-by: John Ogness <[email protected]>
> >> ---
> >> kernel/printk/printk_ringbuffer.c | 234 ++++++++++++++++++++++++++++++
> >> kernel/printk/printk_ringbuffer.h | 12 +-
> >> 2 files changed, 245 insertions(+), 1 deletion(-)
> >>
> >> diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
> >> index 09c32e52fd40..f85762713583 100644
> >> --- a/kernel/printk/printk_ringbuffer.c
> >> +++ b/kernel/printk/printk_ringbuffer.c
> >> @@ -674,3 +674,237 @@ void prb_commit(struct prb_reserved_entry *e)
> >> local_irq_restore(e->irqflags);
> >> }
> >> EXPORT_SYMBOL(prb_commit);
> >> +
> >> +/*
> >> + * Given @blk_lpos, return a pointer to the raw data from the data block
> >> + * and calculate the size of the data part. A NULL pointer is returned
> >> + * if @blk_lpos specifies values that could never be legal.
> >> + *
> >> + * This function (used by readers) performs strict validation on the lpos
> >> + * values to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
> >> + * triggered if an internal error is detected.
> >> + */
> >> +static char *get_data(struct prb_data_ring *data_ring,
> >> + struct prb_data_blk_lpos *blk_lpos,
> >> + unsigned long *data_size)
> >> +{
> >> + struct prb_data_block *db;
> >> +
> >> + if (blk_lpos->begin == INVALID_LPOS &&
> >> + blk_lpos->next == INVALID_LPOS) {
> >> + /* descriptor without a data block */
> >> + return NULL;
> >> + } else if (DATA_WRAPS(data_ring, blk_lpos->begin) ==
> >> + DATA_WRAPS(data_ring, blk_lpos->next)) {
> >> + /* regular data block */
> >> + if (WARN_ON_ONCE(blk_lpos->next <= blk_lpos->begin))
> >> + return NULL;
> >> + db = to_block(data_ring, blk_lpos->begin);
> >> + *data_size = blk_lpos->next - blk_lpos->begin;
> >> +
> >> + } else if ((DATA_WRAPS(data_ring, blk_lpos->begin) + 1 ==
> >> + DATA_WRAPS(data_ring, blk_lpos->next)) ||
> >> + ((DATA_WRAPS(data_ring, blk_lpos->begin) ==
> >> + DATA_WRAPS(data_ring, -1UL)) &&
> >> + (DATA_WRAPS(data_ring, blk_lpos->next) == 0))) {
> >
> > I am a bit confused. I would expect that (-1UL + 1) = 0. So the second
> > condition after || looks just like a special variant of the first
> > valid condition.
> >
> > Or do I miss anything? Is there a problems with type casting?
>
> Sorry, this code deserves a comment.
>
> Here we are only comparing the number of wraps. For a wrapping data
> block, @begin will be 1 wrap less than @next. The first part of the
> check is checking the typical case, making sure that:
>
> 1 + WRAPS(@begin) == WRAPS(@next)
>
> There is also the case when the lpos overflows. In that case the number
> of wraps starts over at zero (without having overflowed). (Note: The
> lpos overflows, _not_ the number of wraps. This is why the first check
> is not enough.) In this case, the number of wraps of the highest
> possible lpos value (-1UL) should be the same as the number of wraps of
> @begin. And the number of wraps of @next should be 0. The simplified
> pseudo-code check is:
>
> WRAPS(@begin) == WRAPS(-1UL)
> &&
> WRAPS(@next) == 0

Got it. I knew that it must have been something like this but I did
not see it.

I wonder if the following might be easier to understand even for
people like me ;-)

} else if (DATA_WRAPS(data_ring, blk_lpos->begin + DATA_SIZE(data_ring)) ==
DATA_WRAPS(data_ring, blk_lpos->next)) {

Best Regards,
Petr

2019-12-04 13:30:27

by John Ogness

[permalink] [raw]
Subject: Re: [RFC PATCH v5 2/3] printk-rb: new printk ringbuffer implementation (reader)

On 2019-12-04, Petr Mladek <[email protected]> wrote:
>> + } else if ((DATA_WRAPS(data_ring, blk_lpos->begin) + 1 ==
>> + DATA_WRAPS(data_ring, blk_lpos->next)) ||
>> + ((DATA_WRAPS(data_ring, blk_lpos->begin) ==
>> + DATA_WRAPS(data_ring, -1UL)) &&
>> + (DATA_WRAPS(data_ring, blk_lpos->next) == 0))) {
>
> I wonder if the following might be easier to understand even for
> people like me ;-)
>
> } else if (DATA_WRAPS(data_ring, blk_lpos->begin + DATA_SIZE(data_ring)) ==
> DATA_WRAPS(data_ring, blk_lpos->next)) {

Yes, this is clear and covers both cases. Thanks.
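
For completeness, the same two cases checked against the simplified
condition (again with an assumed 12 size bits; illustrative only):

#include <assert.h>

#define SIZE_BITS	12
#define DATA_SIZE	(1UL << SIZE_BITS)
#define WRAPS(lpos)	((lpos) >> SIZE_BITS)

int main(void)
{
	/* Typical wrapping block. */
	unsigned long begin = 3 * DATA_SIZE + 100;
	unsigned long next = 4 * DATA_SIZE + 300;

	assert(WRAPS(begin + DATA_SIZE) == WRAPS(next));

	/* Wrapping block across the lpos overflow: adding DATA_SIZE
	 * overflows @begin the same way, so one check covers both. */
	begin = -DATA_SIZE + 100;
	next = 300;

	assert(WRAPS(begin + DATA_SIZE) == WRAPS(next));
	return 0;
}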

John

2019-12-05 12:03:13

by John Ogness

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On 2019-12-03, Steven Rostedt <[email protected]> wrote:
>>>>> +/* Reserve a new descriptor, invalidating the oldest if necessary. */
>>>>> +static bool desc_reserve(struct printk_ringbuffer *rb, u32 *id_out)
>>>>> +{
>>>>> + struct prb_desc_ring *desc_ring = &rb->desc_ring;
>>>>> + struct prb_desc *desc;
>>>>> + u32 id_prev_wrap;
>>>>> + u32 head_id;
>>>>> + u32 id;
>>>>> +
>>>>> + head_id = atomic_read(&desc_ring->head_id);
>>>>> +
>>>>> + do {
>>>>> + desc = to_desc(desc_ring, head_id);
>>>>> +
>>>>> + id = DESC_ID(head_id + 1);
>>>>> + id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
>>>>> +
>>>>> + if (id_prev_wrap == atomic_read(&desc_ring->tail_id)) {
>>>>> + if (!desc_push_tail(rb, id_prev_wrap))
>>>>> + return false;
>>>>> + }
>>>>> + } while (!atomic_try_cmpxchg(&desc_ring->head_id, &head_id, id));
>>>>
>>>> Hmm, in theory, an ABA problem might cause us to successfully
>>>> move desc_ring->head_id when the tail has not been pushed yet.
>>>>
>>>> As a result we would never call desc_push_tail() until
>>>> it overflows again.
>>>>
>>>> I am not sure if we need to take care of it. The code is called
>>>> with interrupts disabled. IMHO, only an NMI could cause an ABA
>>>> problem in reality. But the game (debugging) is lost anyway when
>>>> an NMI overwrites the buffer several times.
>>>>
>>>> Also it should not be a complete failure. We could still bail out
>>>> when the previous state was not reusable. We are on the safe side
>>>> when it was reusable.
>>>>
>>>> Also we could probably detect when desc_ring->tail_id is no longer
>>>> updated and do something about it.
>>>>
>>>> BTW: If I am counting correctly, the ABA problem would happen when
>>>> exactly 2^30 (1G) messages are written in the meantime.
>>>
>>> All the ringbuffer code assumes that the use of index numbers
>>> handles the ABA problem (i.e. there must not be 1 billion printk's
>>> within an NMI). If we want to support 1 billion+ printk's within an
>>> NMI, then perhaps the index number should be increased. For 64-bit
>>> systems it would be no problem to go to 62 bits. For 32-bit systems,
>>> I don't know how well the 64-bit atomic operations are supported.
>>
>> ftrace dumps from NMI (DUMP_ALL type ftrace_dump_on_oops on a $BIG
>> machine)? 1G seems large enough, but who knows.
>
> ftrace dump from NMI is the most likely case to hit this, but when
> that happens, you are in debugging mode, and the system usually
> becomes unreliable at this moment. I agree with Petr that we should
> not complicate the code more to handle this theoretical condition.

I will change the data block ID size to "unsigned long". Then it is
really only an issue for 32-bit systems.

AFAICT, the only real issue is that the head can be pushed to the
descriptor index that the tail is pointing to. From there, the head can
be advanced beyond it and the tail may never move again. So writers just
need to make sure the tail gets pushed beyond the newly reserved head
before making any changes to that descriptor.

Aside from moving to "unsigned long" ID's, I believe this can be handled
with the following 4 changes when reserving a new descriptor:

- also push the tail forward if it falls behind

- after pushing the head, re-check if the tail is still ok

- verify the state of the reserved descriptor before changing it

- use cmpxchg() to change it

Below is the patch snippet I use for this. (Note that the patch is
against a version where ID's have already been changed to unsigned
longs.)

John Ogness


@@ -468,19 +470,53 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
id = DESC_ID(head_id + 1);
id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);

- if (id_prev_wrap == atomic_long_read(&desc_ring->tail_id)) {
- if (!desc_push_tail(rb, id_prev_wrap))
+ /*
+ * Make space for the new descriptor by pushing the tail
+ * beyond the descriptor to be reserved. Normally this only
+ * requires pushing the tail once. But due to possible ABA
+ * problems with the ID, the tail could be too far behind.
+ * Use a loop to push the tail where it needs to be.
+ */
+ tail_id = atomic_long_read(&desc_ring->tail_id);
+ while (DESC_ID(id_prev_wrap - tail_id) <
+ DESCS_COUNT(desc_ring)) {
+
+ if (!desc_push_tail(rb, tail_id))
return false;
+ tail_id = DESC_ID(tail_id + 1);
}
} while (!atomic_long_try_cmpxchg(&desc_ring->head_id, &head_id, id));

+ /*
+ * Before moving the head, it was ensured that the tail was pushed to
+ * where it needs to be. Due to possible ABA problems with the ID, the
+ * reserved descriptor may not be what was expected. Re-check the tail
+ * to see if it is still where it needs to be.
+ */
+ tail_id = atomic_long_read(&desc_ring->tail_id);
+ if (DESC_ID(id_prev_wrap - tail_id) < DESCS_COUNT(desc_ring))
+ return false;
+
desc = to_desc(desc_ring, id);

+ /* If the descriptor has been recycled, verify the old state val. */
+ prev_state_val = atomic_long_read(&desc->state_var);
+ if (prev_state_val && prev_state_val != (id_prev_wrap |
+ DESC_COMMITTED_MASK |
+ DESC_REUSE_MASK)) {
+ /* Unexpected value. Hit ABA issue for ID? */
+ return false;
+ }
+
-	/* Set the descriptor's ID and also set its state to reserved. */
-	atomic_long_set(&desc->state_var, id | 0);
-
-	/* Store new ID/state before making any other changes. */
-	smp_wmb(); /* LMM_TAG(desc_reserve:A) */
+	/*
+	 * Set the descriptor's ID and also set its state to reserved.
+	 * The new ID/state is stored before making any other changes.
+	 */
+	if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
+				     id | 0)) { /* LMM_TAG(desc_reserve:A) */
+		/* Unexpected value. Hit ABA issue for ID? */
+		return false;
+	}

*id_out = id;
return true;

2019-12-05 13:48:46

by Prarit Bhargava

[permalink] [raw]
Subject: Re: [RFC PATCH v5 0/3] printk: new ringbuffer implementation

John Ogness <[email protected]> wrote:
> This series also includes my test module. On a 16-core ARM64
> test machine, the module runs without any errors. I am seeing
> the 15 writing cores each writing about 34500 records per
> second, while the 1 reading core misses only about 15% of the
> total records.
>

John,

Based on the comments there is going to be a v6, but in any case I am
starting testing of this patchset on several large core systems across
multiple architectures (x86_64, ARM64, s390, ppc64le). Some of those
systems are known to fail to boot due to the large amount of printk
output, so it will be good to see if these changes resolve those issues.

Sorry for the delay and I'll report back in a few days.

P.


2019-12-05 14:07:53

by John Ogness

[permalink] [raw]
Subject: Re: [RFC PATCH v5 0/3] printk: new ringbuffer implementation

Hi Prarit,

On 2019-12-05, Prarit Bhargava <[email protected]> wrote:
> Based on the comments there is going to be a v6, but in any case I am
> starting testing of this patchset on several large core systems across
> multiple architectures (x86_64, ARM64, s390, ppc64le). Some of those
> systems are known to fail to boot due to the large amount of printk
> output, so it will be good to see if these changes resolve those issues.

Right now the patches only include the ringbuffer as a separate entity
with a test module. So they do not yet have any effect on printk.

If you apply the patches and then build the "modules" target, you will
have a new test_prb.ko module. Loading that module will start some heavy
testing of the ringbuffer. As long as the testing is successful, the
module will keep testing. During this time the machine will be very
slow, but should still respond.

The test can be stopped by unloading the module. If the test stops on
its own, then a problem was found. The output of the test is put into
the ftrace buffer.

It would be nice if you could run the test module on some fat machines,
at least for a few minutes to see if anything explodes. ARM64 and
ppc64le will probably be the most interesting, due to memory barrier
testing.

Otherwise I will definitely be reaching out to you when we are ready to
perform actual printk testing with the newly agreed upon semantics
(lockless, per-console printing threads, synchronous panic
consoles). Thanks for your help with this.

John Ogness

2019-12-06 14:17:42

by Prarit Bhargava

[permalink] [raw]
Subject: Re: [RFC PATCH v5 0/3] printk: new ringbuffer implementation

John Ogness <[email protected]> wrote:
> Hi Prarit,
>
> On 2019-12-05, Prarit Bhargava <[email protected]> wrote:
> > Based on the comments there is going to be a v6, but in any case I am
> > starting testing of this patchset on several large core systems across
> > multiple architectures (x86_64, ARM64, s390, ppc64le). Some of those
> > systems are known to fail to boot due to the large amount of printk
> > output, so it will be good to see if these changes resolve those issues.
>
> Right now the patches only include the ringbuffer as a separate entity
> with a test module. So they do not yet have any effect on printk.
>
> If you apply the patches and then build the "modules" target, you will
> have a new test_prb.ko module. Loading that module will start some heavy
> testing of the ringbuffer. As long as the testing is successful, the
> module will keep testing. During this time the machine will be very
> slow, but should still respond.
>
> The test can be stopped by unloading the module. If the test stops on
> its own, then a problem was found. The output of the test is put into
> the ftrace buffer.
>
> It would be nice if you could run the test module on some fat machines,
> at least for a few minutes to see if anything explodes. ARM64 and
> ppc64le will probably be the most interesting, due to memory barrier
> testing.
>

I've run the module overnight on all 4 arches I mentioned above. I didn't
see any failures, but IIUC the module test runs at max. I'm going to put a
load test on these systems that introduces a variable load to interfere
with the prbtest module to see if that kicks anything loose.

> Otherwise I will definitely be reaching out to you when we are ready to
> perform actual printk testing with the newly agreed upon semantics
> (lockless, per-console printing threads, synchronous panic
> consoles). Thanks for your help with this.
>

np :) but I should be the one thanking you ;)

P.

> John Ogness

2019-12-09 07:43:59

by Sergey Senozhatsky

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On (19/11/28 02:58), John Ogness wrote:
[..]
> +
> +#define _DATA_SIZE(sz_bits) (1UL << (sz_bits))
> +#define _DESCS_COUNT(ct_bits) (1U << (ct_bits))
> +#define DESC_SV_BITS (sizeof(int) * 8)
> +#define DESC_COMMITTED_MASK (1U << (DESC_SV_BITS - 1))

What does SV stand for? State Value?

-ss

2019-12-09 08:45:13

by Sergey Senozhatsky

[permalink] [raw]
Subject: Re: [RFC PATCH v5 3/3] printk-rb: add test module

On (19/11/28 02:58), John Ogness wrote:
> + * This is a test module that starts "num_online_cpus()" writer threads
> + * that each write data of varying length. They do this as fast as
> + * they can.

Can we also add some IRQ (hard or soft) writers and (if possible) some NMI
writers?

-ss

2019-12-09 08:52:04

by Sergey Senozhatsky

[permalink] [raw]
Subject: Re: [RFC PATCH v5 2/3] printk-rb: new printk ringbuffer implementation (reader)

On (19/11/28 02:58), John Ogness wrote:
> +/* Given @blk_lpos, copy an expected @len of data into the provided buffer. */
> +static bool copy_data(struct prb_data_ring *data_ring,
> + struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
> + unsigned int buf_size)
> +{
> + unsigned long data_size;
> + char *data;
> +
> + /* Caller might not want the data. */
> + if (!buf || !buf_size)
> + return true;
> +
> + data = get_data(data_ring, blk_lpos, &data_size);
> + if (!data)
> + return false;
> +
> + /* Actual cannot be less than expected. */
> + if (WARN_ON_ONCE(data_size < len))
> + return false;
> +
> + data_size = min_t(u16, buf_size, len);
> +
> + if (!WARN_ON_ONCE(!data_size))
> + memcpy(&buf[0], data, data_size);
> + return true;
> +}
> +
> +/*
> + * Read the record @id and verify that it is committed and has the sequence
> + * number @seq.
> + *
> + * Error return values:
> + * -EINVAL: The record @seq does not exist.
> + * -ENOENT: The record @seq exists, but its data is not available. This is a
> + * valid record, so readers should continue with the next seq.
> + */
> +static int desc_read_committed(struct prb_desc_ring *desc_ring, u32 id,
> + u64 seq, struct prb_desc *desc)
> +{
> + enum desc_state d_state;
> +
> + d_state = desc_read(desc_ring, id, desc);
> + if (desc->info.seq != seq)
> + return -EINVAL;
> + else if (d_state == desc_reusable)
> + return -ENOENT;
> + else if (d_state != desc_committed)
> + return -EINVAL;
> +
> + return 0;
> +}
> +
> +/*
> + * Copy the ringbuffer data from the record with @seq to the provided
> + * @r buffer. On success, 0 is returned.
> + *
> + * See desc_read_committed() for error return values.
> + */
> +static int prb_read(struct printk_ringbuffer *rb, u64 seq,
> + struct printk_record *r)
> +{
> + struct prb_desc_ring *desc_ring = &rb->desc_ring;
> + struct prb_desc *rdesc = to_desc(desc_ring, seq);
> + atomic_t *state_var = &rdesc->state_var;
> + struct prb_desc desc;
> + int err;
> + u32 id;
> +
> + /* Get a reliable local copy of the descriptor and check validity. */
> + id = DESC_ID(atomic_read(state_var));
> + err = desc_read_committed(desc_ring, id, seq, &desc);
> + if (err)
> + return err;
> +
> + /* If requested, copy meta data. */
> + if (r->info)
> + memcpy(r->info, &desc.info, sizeof(*(r->info)));

I wonder if those WARN_ON-s will trigger false positives sometimes.

A theoretical case.

What if a reader gets preempted/interrupted in the middle of
desc_read_committed()->desc_read()->memcpy(). The context which interrupts
the reader recycles the descriptor and pushes new data. Suppose that the
reader was interrupted right after it copied ->info.seq and ->info.text_len.
So the first desc_read_committed() will pass - we have a matching ->seq
and committed state. copy_data(), however, most likely, will generate
WARNs. The final desc_read_committed() will notice that the local copy
of desc was in an inconsistent state and everything is fine, but we have
WARNs in the log buffer now.

-ss

2019-12-09 09:04:48

by Sergey Senozhatsky

[permalink] [raw]
Subject: Re: [RFC PATCH v5 2/3] printk-rb: new printk ringbuffer implementation (reader)

On (19/12/09 17:43), Sergey Senozhatsky wrote:
> > +static int desc_read_committed(struct prb_desc_ring *desc_ring, u32 id,
> > + u64 seq, struct prb_desc *desc)
> > +{
> > + enum desc_state d_state;
> > +
> > + d_state = desc_read(desc_ring, id, desc);
> > + if (desc->info.seq != seq)
> > + return -EINVAL;
> > + else if (d_state == desc_reusable)
> > + return -ENOENT;
> > + else if (d_state != desc_committed)
> > + return -EINVAL;
> > +
> > + return 0;
> > +}
> > +
> > +/*
> > + * Copy the ringbuffer data from the record with @seq to the provided
> > + * @r buffer. On success, 0 is returned.
> > + *
> > + * See desc_read_committed() for error return values.
> > + */
> > +static int prb_read(struct printk_ringbuffer *rb, u64 seq,
> > + struct printk_record *r)
> > +{
> > + struct prb_desc_ring *desc_ring = &rb->desc_ring;
> > + struct prb_desc *rdesc = to_desc(desc_ring, seq);
> > + atomic_t *state_var = &rdesc->state_var;
> > + struct prb_desc desc;
> > + int err;
> > + u32 id;
> > +
> > + /* Get a reliable local copy of the descriptor and check validity. */
> > + id = DESC_ID(atomic_read(state_var));
> > + err = desc_read_committed(desc_ring, id, seq, &desc);
> > + if (err)
> > + return err;
> > +
> > + /* If requested, copy meta data. */
> > + if (r->info)
> > + memcpy(r->info, &desc.info, sizeof(*(r->info)));
>
> I wonder if those WARN_ON-s will trigger false positives sometimes.
>
> A theoretical case.
>
> What if a reader gets preempted/interrupted in the middle of
> desc_read_committed()->desc_read()->memcpy(). The context which interrupts
> the reader recycles the descriptor and pushes new data. Suppose that the
> reader was interrupted right after it copied ->info.seq and ->info.text_len.
> So the first desc_read_committed() will pass - we have a matching ->seq
> and committed state. copy_data(), however, most likely, will generate
> WARNs. The final desc_read_committed() will notice that the local copy
> of desc was in an inconsistent state and everything is fine, but we have
> WARNs in the log buffer now.

Hmm. No, that won't happen. We should get desc_miss first, and then -EINVAL.

-ss

2019-12-09 09:10:46

by John Ogness

[permalink] [raw]
Subject: Re: [RFC PATCH v5 2/3] printk-rb: new printk ringbuffer implementation (reader)

On 2019-12-09, Sergey Senozhatsky <[email protected]> wrote:
>> +/* Given @blk_lpos, copy an expected @len of data into the provided buffer. */
>> +static bool copy_data(struct prb_data_ring *data_ring,
>> + struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
>> + unsigned int buf_size)
>> +{
>> + unsigned long data_size;
>> + char *data;
>> +
>> + /* Caller might not want the data. */
>> + if (!buf || !buf_size)
>> + return true;
>> +
>> + data = get_data(data_ring, blk_lpos, &data_size);
>> + if (!data)
>> + return false;
>> +
>> + /* Actual cannot be less than expected. */
>> + if (WARN_ON_ONCE(data_size < len))
>> + return false;
>> +
>> + data_size = min_t(u16, buf_size, len);
>> +
>> + if (!WARN_ON_ONCE(!data_size))
>> + memcpy(&buf[0], data, data_size);
>> + return true;
>> +}
>> +
>> +/*
>> + * Read the record @id and verify that it is committed and has the sequence
>> + * number @seq.
>> + *
>> + * Error return values:
>> + * -EINVAL: The record @seq does not exist.
>> + * -ENOENT: The record @seq exists, but its data is not available. This is a
>> + * valid record, so readers should continue with the next seq.
>> + */
>> +static int desc_read_committed(struct prb_desc_ring *desc_ring, u32 id,
>> + u64 seq, struct prb_desc *desc)
>> +{
>> + enum desc_state d_state;
>> +
>> + d_state = desc_read(desc_ring, id, desc);
>> + if (desc->info.seq != seq)
>> + return -EINVAL;
>> + else if (d_state == desc_reusable)
>> + return -ENOENT;
>> + else if (d_state != desc_committed)
>> + return -EINVAL;
>> +
>> + return 0;
>> +}
>> +
>> +/*
>> + * Copy the ringbuffer data from the record with @seq to the provided
>> + * @r buffer. On success, 0 is returned.
>> + *
>> + * See desc_read_committed() for error return values.
>> + */
>> +static int prb_read(struct printk_ringbuffer *rb, u64 seq,
>> + struct printk_record *r)
>> +{
>> + struct prb_desc_ring *desc_ring = &rb->desc_ring;
>> + struct prb_desc *rdesc = to_desc(desc_ring, seq);
>> + atomic_t *state_var = &rdesc->state_var;
>> + struct prb_desc desc;
>> + int err;
>> + u32 id;
>> +
>> + /* Get a reliable local copy of the descriptor and check validity. */
>> + id = DESC_ID(atomic_read(state_var));
>> + err = desc_read_committed(desc_ring, id, seq, &desc);
>> + if (err)
>> + return err;
>> +
>> + /* If requested, copy meta data. */
>> + if (r->info)
>> + memcpy(r->info, &desc.info, sizeof(*(r->info)));
>
> I wonder if those WARN_ON-s will trigger false positive sometimes.
>
> A theoretical case.
>
> What if reader gets preempted/interrupted in the middle of
> desc_read_committed()->desc_read()->memcpy(). The context which
> interrupts the reader recycles the descriptor and pushes new
> data. Suppose that reader was interrupted right after it copied
> ->info.seq and ->info.text_len. So the first desc_read_committed()
> will pass - we have matching ->seq and committed state. copy_data(),
> however, most likely, will generate WARNs. The final
> desc_read_committed() will notice that local copy of desc was in
> non-consistent state and everything is fine, but we have WARNs in the
> log buffer now.

Be aware that desc_read_committed() is filling a copy of the descriptor
in the local variable @desc. If desc_read_committed() succeeded, that
local copy _must_ be consistent. If the WARNs trigger, either
desc_read_committed() or the writer code is broken.
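
To illustrate, here is a rough sketch of the copy-then-recheck that
desc_read() performs (a fragment reconstructed from the patch;
get_desc_state() and the exact control flow are illustrative):

	/* Check the initial state. A missed or reusable descriptor fails. */
	d_state = get_desc_state(id, atomic_read(state_var));
	if (d_state == desc_miss || d_state == desc_reusable)
		return d_state;

	/* Copy the descriptor into the caller-provided local variable. */
	*desc_out = READ_ONCE(*desc);

	/* Load data before re-checking state. */
	smp_rmb();

	/*
	 * Re-check the state (which embeds the ID). If it is unchanged,
	 * no writer touched the descriptor during the copy, so @desc_out
	 * is consistent. A recycled descriptor shows up as desc_miss.
	 */
	return get_desc_state(id, atomic_read(state_var));

In your scenario the interrupted reader fails this final re-check, so
the first desc_read_committed() never reports success.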

John Ogness

2019-12-09 09:28:06

by John Ogness

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On 2019-12-09, Sergey Senozhatsky <[email protected]> wrote:
>> +#define _DATA_SIZE(sz_bits) (1UL << (sz_bits))
>> +#define _DESCS_COUNT(ct_bits) (1U << (ct_bits))
>> +#define DESC_SV_BITS (sizeof(int) * 8)
>> +#define DESC_COMMITTED_MASK (1U << (DESC_SV_BITS - 1))
>
> What does SV state for? State Value?

Yes. Originally this thing was just called the state. But it was a bit
confusing in the code because there is also an enum desc_state (used for
state queries), which is _not_ the value that is stored in the state
variable. That's why the code is using state_var/state_val (SV) for the
actual data values, keeping it separate from desc_state/d_state for
the state queries.
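
Roughly, the query side maps a state value to the enum like this (a
sketch: DESC_ID() and DESC_COMMITTED_MASK are from the patch,
DESC_REUSE_MASK is assumed here only for illustration):

static enum desc_state get_desc_state(u32 id, int state_val)
{
	/*
	 * The ID embedded in the state value no longer matches:
	 * the descriptor has been recycled by a writer.
	 */
	if (id != DESC_ID(state_val))
		return desc_miss;

	if (state_val & DESC_REUSE_MASK)
		return desc_reusable;

	if (state_val & DESC_COMMITTED_MASK)
		return desc_committed;

	return desc_reserved;
}

So d_state is always the result of a query like this one, while the
state value (SV) is the raw integer living in the atomic state_var.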

John Ogness

2019-12-09 09:44:08

by Sergey Senozhatsky

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On (19/12/02 16:48), Petr Mladek wrote:
> Hi,
>
> I have seen a few preliminary versions before this public one.
> I am either happy with it or too blind to see new problems[*].

I guess I'm happy with it.

-ss

2019-12-09 09:50:23

by Sergey Senozhatsky

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On (19/11/28 02:58), John Ogness wrote:
> + * Sample reader code::
> + *
> + * struct printk_info info;
> + * char text_buf[32];
> + * char dict_buf[32];
> + * u64 next_seq = 0;
> + * struct printk_record r = {
> + * .info = &info,
> + * .text_buf = &text_buf[0],
> + * .dict_buf = &dict_buf[0],
> + * .text_buf_size = sizeof(text_buf),
> + * .dict_buf_size = sizeof(dict_buf),
> + * };
> + *
> + * while (prb_read_valid(&rb, next_seq, &r)) {
> + * if (info.seq != next_seq)
> + * pr_warn("lost %llu records\n", info.seq - next_seq);
> + *
> + * if (info.text_len > r.text_buf_size) {
> + * pr_warn("record %llu text truncated\n", info.seq);
> + * text_buf[sizeof(text_buf) - 1] = 0;
> + * }
> + *
> + * if (info.dict_len > r.dict_buf_size) {
> + * pr_warn("record %llu dict truncated\n", info.seq);
> + * dict_buf[sizeof(dict_buf) - 1] = 0;
> + * }
> + *
> + * pr_info("%llu: %llu: %s;%s\n", info.seq, info.ts_nsec,
> + * &text_buf[0], info.dict_len ? &dict_buf[0] : "");
> + *
> + * next_seq = info.seq + 1;
> + * }
> + */

Will this loop ever end? :)

pr_info() adds data to the ringbuffer, which prb_read_valid() reads, so pr_info()
can add more data, which prb_read_valid() will read, so pr_info()...

-ss

2019-12-09 09:50:32

by John Ogness

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On 2019-12-09, Sergey Senozhatsky <[email protected]> wrote:
>> + * Sample reader code::
>> + *
>> + * struct printk_info info;
>> + * char text_buf[32];
>> + * char dict_buf[32];
>> + * u64 next_seq = 0;
>> + * struct printk_record r = {
>> + * .info = &info,
>> + * .text_buf = &text_buf[0],
>> + * .dict_buf = &dict_buf[0],
>> + * .text_buf_size = sizeof(text_buf),
>> + * .dict_buf_size = sizeof(dict_buf),
>> + * };
>> + *
>> + * while (prb_read_valid(&rb, next_seq, &r)) {
>> + * if (info.seq != next_seq)
>> + * pr_warn("lost %llu records\n", info.seq - next_seq);
>> + *
>> + * if (info.text_len > r.text_buf_size) {
>> + * pr_warn("record %llu text truncated\n", info.seq);
>> + * text_buf[sizeof(text_buf) - 1] = 0;
>> + * }
>> + *
>> + * if (info.dict_len > r.dict_buf_size) {
>> + * pr_warn("record %llu dict truncated\n", info.seq);
>> + * dict_buf[sizeof(dict_buf) - 1] = 0;
>> + * }
>> + *
>> + * pr_info("%llu: %llu: %s;%s\n", info.seq, info.ts_nsec,
>> + * &text_buf[0], info.dict_len ? &dict_buf[0] : "");
>> + *
>> + * next_seq = info.seq + 1;
>> + * }
>> + */
>
> Will this loop ever end? :)
>
> pr_info() adds data to the ringbuffer, which prb_read_valid() reads, so
> pr_info() can add more data, which prb_read_valid() will read, so
> pr_info()...

The sample code is assuming that @rb is not the same ringbuffer used by
kernel/printk/printk.c. (For example, the test module is doing that to
stress test the ringbuffer code without actually affecting printk.) I
can add a sentence to clarify that.
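
Something along these lines, as the test module does (a sketch using
the declaration macro from this series; the parameters are the
descriptor-count and average-data-size bits):

/*
 * A private ringbuffer, completely separate from the printk log
 * buffer: 2^5 = 32 descriptors and 2^7 = 128 bytes of average data
 * per record (values chosen only for illustration).
 */
DECLARE_PRINTKRB(rb, 5, 7);

With that, the pr_info() calls in the sample go to the kernel log as
usual, while prb_read_valid() only ever reads from this private @rb, so
the loop terminates once the test records are consumed.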

John Ogness

2019-12-21 14:29:03

by Andrea Parri

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

Hi John,

Sorry for the delay.

I don't have an overall understanding of the patch(-set) yet, so I will
limit myself to a couple of general questions about the memory barriers
introduced by the patch. Please see the inline comments.


> + *desc_out = READ_ONCE(*desc);
> +
> + /* Load data before re-checking state. */
> + smp_rmb(); /* matches LMM_REF(desc_reserve:A) */

I looked for a matching WRITE_ONCE() or some other type of marked write,
but I could not find it. What is the rationale? Or what did I miss?


> + do {
> + next_lpos = get_next_lpos(data_ring, begin_lpos, size);
> +
> + if (!data_push_tail(rb, data_ring,
> + next_lpos - DATA_SIZE(data_ring))) {
> + /* Failed to allocate, specify a data-less block. */
> + blk_lpos->begin = INVALID_LPOS;
> + blk_lpos->next = INVALID_LPOS;
> + return NULL;
> + }
> + } while (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &begin_lpos,
> + next_lpos));
> +
> + /*
> + * No barrier is needed here. The data validity is defined by
> + * the state of the associated descriptor. They are marked as
> + * invalid at the moment. And only the winner of the above
> + * cmpxchg() could write here.
> + */

The (successful) CMPXCHG provides a full barrier. This comment suggests
that that could be somehow relaxed? Or the comment could be improved?

(The patch introduces a number of CMPXCHG: similar questions would apply
to those other instances...)

Thanks,
Andrea

P. S. Please use my @gmail.com address for future communications.

2019-12-23 16:02:51

by John Ogness

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

Hi Andrea,

On 2019-12-21, Andrea Parri <[email protected]> wrote:
>> + *desc_out = READ_ONCE(*desc);
>> +
>> + /* Load data before re-checking state. */
>> + smp_rmb(); /* matches LMM_REF(desc_reserve:A) */
>
> I looked for a matching WRITE_ONCE() or some other type of marked write,
> but I could not find it. What is the rationale? Or what did I miss?

This smp_rmb() matches LMM_TAG(desc_reserve:A), i.e. the full barrier
provided by the successful cmpxchg() in desc_reserve(). The pairing is
with that barrier rather than with a marked write, which is why there
is no WRITE_ONCE() to find.

>> + do {
>> + next_lpos = get_next_lpos(data_ring, begin_lpos, size);
>> +
>> + if (!data_push_tail(rb, data_ring,
>> + next_lpos - DATA_SIZE(data_ring))) {
>> + /* Failed to allocate, specify a data-less block. */
>> + blk_lpos->begin = INVALID_LPOS;
>> + blk_lpos->next = INVALID_LPOS;
>> + return NULL;
>> + }
>> + } while (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &begin_lpos,
>> + next_lpos));
>> +
>> + /*
>> + * No barrier is needed here. The data validity is defined by
>> + * the state of the associated descriptor. They are marked as
>> + * invalid at the moment. And only the winner of the above
>> + * cmpxchg() could write here.
>> + */
>
> The (successful) CMPXCHG provides a full barrier. This comment suggests
> that that could be somehow relaxed? Or the comment could be improved?

You are correct. There is no need for the full barrier here. This code
is based on Petr's POC. I focussed on making sure needed barriers are in
place, but did not try to eliminate excessive barriers.

> (The patch introduces a number of CMPXCHG: similar questions would
> apply to those other instances...)

LMM_TAG(data_push_tail:A) is the only CMPXCHG that requires its full
barrier. All others can be relaxed. I will make this change for the next
series.
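
Concretely, for the head-move loop quoted above that means switching to
the relaxed variant (a sketch of the change):

	} while (!atomic_long_try_cmpxchg_relaxed(&data_ring->head_lpos,
						  &begin_lpos, next_lpos));

The data block contents remain ordered by the descriptor state changes,
so no documented guarantee depends on a full barrier at this point.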

Thanks for taking the time for this.

John Ogness

2020-01-03 10:26:57

by Petr Mladek

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On Mon 2019-12-23 17:01:00, John Ogness wrote:
> Hi Andrea,
>
> On 2019-12-21, Andrea Parri <[email protected]> wrote:
> >> + *desc_out = READ_ONCE(*desc);
> >> +
> >> + /* Load data before re-checking state. */
> >> + smp_rmb(); /* matches LMM_REF(desc_reserve:A) */
> >
> > I looked for a matching WRITE_ONCE() or some other type of marked write,
> > but I could not find it. What is the rationale? Or what did I miss?

Good question. READ_ONCE() looks superfluous here because it is
surrounded by two read barriers. In each case, there is no
corresponding WRITE_ONCE().

Note that we are copying the entire struct prb_desc here. All values
are written only when state_val is in desc_reserved state. It happens
between two full write barriers:

+ A writer is allowed to modify the descriptor after successful
cmpxchg in desc_reserve(), see LMM_TAG(desc_reserve:A).

+ The writer must not touch the descriptor after changing
state_var to committed state, see
LMM_TAG(prb_commit:A) in prb_commit().

These barriers are mentioned in the comments for the two
read barriers here.
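
Schematically, the writer-side window looks like this (a sketch of the
ordering only; the variable names and helpers are illustrative, not the
patch code):

	/* desc_reserve(): */
	if (!atomic_try_cmpxchg(&desc->state_var, &prev_sv, reserved_sv))
		return false;	/* full barrier: LMM_TAG(desc_reserve:A) */

	/* All writes to the descriptor happen strictly in between. */
	desc->info.seq = seq;
	desc->info.ts_nsec = ts_nsec;

	/* prb_commit(): order the writes above before the state change. */
	smp_mb();	/* LMM_TAG(prb_commit:A) */
	atomic_set(&desc->state_var, reserved_sv | DESC_COMMITTED_MASK);

A reader that observes the committed state value and then issues a read
barrier is therefore guaranteed to see all of the writes in between.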

> >> + do {
> >> + next_lpos = get_next_lpos(data_ring, begin_lpos, size);
> >> +
> >> + if (!data_push_tail(rb, data_ring,
> >> + next_lpos - DATA_SIZE(data_ring))) {
> >> + /* Failed to allocate, specify a data-less block. */
> >> + blk_lpos->begin = INVALID_LPOS;
> >> + blk_lpos->next = INVALID_LPOS;
> >> + return NULL;
> >> + }
> >> + } while (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &begin_lpos,
> >> + next_lpos));
> >> +
> >> + /*
> >> + * No barrier is needed here. The data validity is defined by
> >> + * the state of the associated descriptor. They are marked as
> >> + * invalid at the moment. And only the winner of the above
> >> + * cmpxchg() could write here.
> >> + */
> >
> > The (successful) CMPXCHG provides a full barrier. This comment suggests
> > that that could be somehow relaxed? Or the comment could be improved?
>
> You are correct. There is no need for the full barrier here. This code
> is based on Petr's POC. I focussed on making sure needed barriers are in
> place, but did not try to eliminate excessive barriers.

I hope that I'll get a better understanding of the guarantees
of the different atomic operations one day. There are so many variants now.

BTW: Documentation/memory-barriers.txt describes various aspects of
the memory barriers. It describes implicit barriers provided
by spin locks, mutexes, semaphores, and various scheduler-related
operations.

But I can't find any explanation of the various variants of the atomic
operations: acquire, release, fetch, return, try, relaxed. I can find
some clues here and there, but it is hard to get the full picture.

Best Regards,
Petr

2020-01-04 14:34:47

by Andrea Parri

[permalink] [raw]
Subject: Re: [RFC PATCH v5 1/3] printk-rb: new printk ringbuffer implementation (writer)

On Fri, Jan 03, 2020 at 11:24:20AM +0100, Petr Mladek wrote:
> On Mon 2019-12-23 17:01:00, John Ogness wrote:
> > Hi Andrea,
> >
> > On 2019-12-21, Andrea Parri <[email protected]> wrote:
> > >> + *desc_out = READ_ONCE(*desc);
> > >> +
> > >> + /* Load data before re-checking state. */
> > >> + smp_rmb(); /* matches LMM_REF(desc_reserve:A) */
> > >
> > > I looked for a matching WRITE_ONCE() or some other type of marked write,
> > > but I could not find it. What is the rationale? Or what did I miss?
>
> Good question. READ_ONCE() looks superfluous here because it is
> surrounded by two read barriers. In each case, there is no
> corresponding WRITE_ONCE().
>
> Note that we are copying the entire struct prb_desc here. All values
> are written only when state_val is in desc_reserved state. It happens
> between two full write barriers:
>
> + A writer is allowed to modify the descriptor after successful
> cmpxchg in desc_reserve(), see LMM_TAG(desc_reserve:A).
>
> + The writer must not touch the descriptor after changing
> state_var to committed state, see
> LMM_TAG(prb_commit:A) in prb_commit().
>
> These barriers are mentioned in the comments for the two
> read barriers here.

Thanks for these remarks. As usual, I'd recommend (trying to) map those
comments into litmus tests and checking them with the LKMM simulator.
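
For instance, the write/read barrier pairing described above is the
classic message-passing pattern. A toy litmus test (just the shape of
the argument, not the patch code) that herd7 can check against the
LKMM:

C printk-rb-commit-read

{}

P0(int *data, int *state)
{
	WRITE_ONCE(*data, 1);	/* fill the descriptor */
	smp_wmb();		/* writer-side barrier */
	WRITE_ONCE(*state, 1);	/* mark it committed */
}

P1(int *data, int *state)
{
	int r0;
	int r1;

	r0 = READ_ONCE(*state);	/* read the state value */
	smp_rmb();		/* reader-side barrier */
	r1 = READ_ONCE(*data);	/* copy the descriptor */
}

exists (1:r0=1 /\ 1:r1=0)

The LKMM forbids the "exists" clause: a reader that sees the committed
state cannot then read stale descriptor data.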


> BTW: Documentation/memory-barriers.txt describes various aspects of
> the memory barriers. It describes implicit barriers provided
> by spin locks, mutexes, semaphores, and various scheduler-related
> operations.
>
> But I can't find any explanation of the various variants of the atomic
> operations: acquire, release, fetch, return, try, relaxed. I can find
> some clues here and there but it is hard to get the picture.

Documentation/atomic_t.txt could serve this purpose. Please have a look
there and let me know if you have any comments.

Thanks,
Andrea

2020-01-27 12:42:23

by Eugeniu Rosca

[permalink] [raw]
Subject: Re: [RFC PATCH v5 0/3] printk: new ringbuffer implementation

Hi John,

On Thu, Nov 28, 2019 at 02:58:32AM +0106, John Ogness wrote:
> Hello,
>
> This is a follow-up RFC on the work to re-implement much of the
> core of printk. The threads for the previous RFC versions are
> here[0][1][2][3].
>
> This RFC includes only the ringbuffer and a test module. This is
> a rewrite of the proposed ringbuffer, now based on the proof of
> concept[4] from Petr Mladek as agreed at the meeting[5] during
> LPC2019 in Lisbon.
>
> [0] https://lkml.kernel.org/r/[email protected]
> [1] https://lkml.kernel.org/r/[email protected]
> [2] https://lkml.kernel.org/r/[email protected]
> [3] https://lkml.kernel.org/r/[email protected]
> [4] https://lkml.kernel.org/r/[email protected]
> [5] https://lkml.kernel.org/r/[email protected]
>
> John Ogness (3):
> printk-rb: new printk ringbuffer implementation (writer)
> printk-rb: new printk ringbuffer implementation (reader)
> printk-rb: add test module

As a follow-up to the discussion started in [*], I would like to stress
once again that it is extremely convenient to have the context of the
console drivers detached from the printk callers, particularly to
mitigate the issue described in [*].

I gave the test module from this series a try by running it overnight
on R-Car H3ULCB and spotted no issues whatsoever. I won't post any
signatures, as this is an RFC, but I would be willing to do so for any
upcoming non-RFC series. Looking forward to that!

[*] https://lore.kernel.org/linux-serial/[email protected]/

--
Best Regards
Eugeniu Rosca