Add support for extending the last data block. For this, introduce a new
finalization state flag that identifies if a descriptor may be extended.
When a writer calls the commit function prb_commit(), the record may still
continue to be in the reserved queried state. In order for that record to
enter into the committed queried state, that record also must be finalized.
Finalization can occur anytime while the record is in the reserved queried
state, even before the writer has called prb_commit().
Until a record is finalized (represented by "DESC_FINAL_MASK"), a writer
may "reopen" that record and extend it with more data.
Note that existing descriptors are automatically finalized whenever new
descriptors are created. A record can never be "unfinalized".
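The resulting bit layout can be sketched in ordinary user-space C (masks copied from printk_ringbuffer.h as modified by this patch; the state query is simplified, without the is_final out-parameter):

```c
#include <assert.h>

/* User-space sketch of the new state encoding: three flag bits at the
 * top of an unsigned long, the rest holding the descriptor ID. */
#define DESC_SV_BITS     (sizeof(unsigned long) * 8)
#define DESC_COMMIT_MASK (1UL << (DESC_SV_BITS - 1))
#define DESC_FINAL_MASK  (1UL << (DESC_SV_BITS - 2))
#define DESC_REUSE_MASK  (1UL << (DESC_SV_BITS - 3))
#define DESC_FLAGS_MASK  (DESC_COMMIT_MASK | DESC_FINAL_MASK | DESC_REUSE_MASK)
#define DESC_ID_MASK     (~DESC_FLAGS_MASK)
#define DESC_ID(sv)      ((sv) & DESC_ID_MASK)

enum desc_state { desc_miss, desc_reserved, desc_committed, desc_reusable };

/* Mirrors get_desc_state(): a descriptor only queries as committed once
 * both the COMMIT and FINAL flags are set; commit without final still
 * queries as reserved (and may therefore be reopened). */
static enum desc_state query_state(unsigned long id, unsigned long state_val)
{
	if (id != DESC_ID(state_val))
		return desc_miss;
	if (state_val & DESC_REUSE_MASK)
		return desc_reusable;
	if ((state_val & (DESC_COMMIT_MASK | DESC_FINAL_MASK)) ==
	    (DESC_COMMIT_MASK | DESC_FINAL_MASK))
		return desc_committed;
	return desc_reserved;
}
```

A committed-but-not-finalized record therefore still queries as reserved,
which is what allows a writer to reopen and extend it.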
Two new memory barrier pairs are introduced, but these are really just
alternate path barriers that exactly correspond to existing memory
barriers.
Signed-off-by: John Ogness <[email protected]>
---
kernel/printk/printk.c | 4 +-
kernel/printk/printk_ringbuffer.c | 386 +++++++++++++++++++++++++++---
kernel/printk/printk_ringbuffer.h | 8 +-
3 files changed, 364 insertions(+), 34 deletions(-)
diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
index ad8d1dfe5fbe..e063edd8adc2 100644
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -532,7 +532,7 @@ static int log_store(u32 caller_id, int facility, int level,
r.info->caller_id = caller_id;
/* insert message */
- prb_commit(&e);
+ prb_commit_finalize(&e);
return (text_len + trunc_msg_len);
}
@@ -1088,7 +1088,7 @@ static unsigned int __init add_to_rb(struct printk_ringbuffer *rb,
dest_r.info->ts_nsec = r->info->ts_nsec;
dest_r.info->caller_id = r->info->caller_id;
- prb_commit(&e);
+ prb_commit_finalize(&e);
return prb_record_text_space(&e);
}
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index d66718e74aae..90d48973ac9e 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -49,14 +49,16 @@
* Descriptors have three states:
*
* reserved
- * A writer is modifying the record.
+ * A writer is modifying the record. Internally represented as either "0"
+ * or "DESC_FINAL_MASK" or "DESC_COMMIT_MASK".
*
* committed
* The record and all its data are complete and available for reading.
+ * Internally represented as "DESC_COMMIT_MASK | DESC_FINAL_MASK".
*
* reusable
* The record exists, but its text and/or dictionary data may no longer
- * be available.
+ * be available. Internally represented as "DESC_REUSE_MASK".
*
* Querying the @state_var of a record requires providing the ID of the
* descriptor to query. This can yield a possible fourth (pseudo) state:
@@ -79,6 +81,20 @@
* committed or reusable queried state. This makes it possible that a valid
* sequence number of the tail is always available.
*
+ * Descriptor Finalization
+ * ~~~~~~~~~~~~~~~~~~~~~~~
+ * When a writer calls the commit function prb_commit(), the record may still
+ * continue to be in the reserved queried state. In order for that record to
+ * enter into the committed queried state, that record also must be finalized.
+ * Finalization can occur anytime while the record is in the reserved queried
+ * state, even before the writer has called prb_commit().
+ *
+ * Until a record is finalized (represented by "DESC_FINAL_MASK"), a writer
+ * may "reopen" that record and extend it with more data.
+ *
+ * Note that existing descriptors are automatically finalized whenever new
+ * descriptors are created. A record can never be "unfinalized".
+ *
* Data Rings
* ~~~~~~~~~~
* The two data rings (text and dictionary) function identically. They exist
@@ -153,9 +169,38 @@
*
* r.info->ts_nsec = local_clock();
*
+ * prb_commit_finalize(&e);
+ * }
+ *
+ * Note that additional writer functions are available to extend a record
+ * after it has been committed but not yet finalized. This can be done as
+ * long as no new records have been reserved and the caller is the same.
+ *
+ * Sample writer code (record extending)::
+ *
+ * // alternate rest of previous example
+ * r.info->ts_nsec = local_clock();
+ * r.info->text_len = strlen(textstr);
+ * r.info->caller_id = printk_caller_id();
+ *
+ * // commit the record (but do not finalize yet)
* prb_commit(&e);
* }
*
+ * ...
+ *
+ * // specify additional 5 bytes text space to extend
+ * prb_rec_init_wr(&r, 5, 0);
+ *
+ * if (prb_reserve_last(&e, &test_rb, &r, printk_caller_id())) {
+ * snprintf(&r.text_buf[r.info->text_len],
+ * r.text_buf_size - r.info->text_len, "hello");
+ *
+ * r.info->text_len += 5;
+ *
+ * prb_commit_finalize(&e);
+ * }
+ *
* Sample reader code::
*
* struct printk_info info;
@@ -233,13 +278,13 @@
* desc_reserve:F / desc_read:D
* set new descriptor id and reserved (state), then allow writer changes
*
- * data_alloc:A / desc_read:D
+ * data_alloc:A (or data_realloc:A) / desc_read:D
* set old descriptor reusable (state), then modify new data block area
*
- * data_alloc:A / data_push_tail:B
+ * data_alloc:A (or data_realloc:A) / data_push_tail:B
* push data tail (lpos), then modify new data block area
*
- * prb_commit:B / desc_read:B
+ * _prb_commit:B (or _prb_commit:C) / desc_read:B
* store writer changes, then set new descriptor committed (state)
*
* data_push_tail:D / data_push_tail:A
@@ -355,16 +400,22 @@ enum desc_state {
/* Query the state of a descriptor. */
static enum desc_state get_desc_state(unsigned long id,
- unsigned long state_val)
+ unsigned long state_val,
+ bool *is_final)
{
+ if (is_final)
+ *is_final = state_val & DESC_FINAL_MASK;
+
if (id != DESC_ID(state_val))
return desc_miss;
if (state_val & DESC_REUSE_MASK)
return desc_reusable;
- if (state_val & DESC_COMMIT_MASK)
+ if ((state_val & (DESC_COMMIT_MASK | DESC_FINAL_MASK)) ==
+ (DESC_COMMIT_MASK | DESC_FINAL_MASK)) {
return desc_committed;
+ }
return desc_reserved;
}
@@ -384,23 +435,24 @@ static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
/* Check the descriptor state. */
state_val = atomic_long_read(state_var); /* LMM(desc_read:A) */
- d_state = get_desc_state(id, state_val);
+ d_state = get_desc_state(id, state_val, NULL);
if (d_state != desc_committed && d_state != desc_reusable)
return d_state;
/*
* Guarantee the state is loaded before copying the descriptor
* content. This avoids copying obsolete descriptor content that might
- * not apply to the descriptor state. This pairs with prb_commit:B.
+ * not apply to the descriptor state. This pairs with _prb_commit:B
+ * and _prb_commit:C.
*
* Memory barrier involvement:
*
- * If desc_read:A reads from prb_commit:B, then desc_read:C reads
- * from prb_commit:A.
+ * If desc_read:A reads from _prb_commit:B or _prb_commit:C, then
+ * desc_read:C reads from _prb_commit:A.
*
* Relies on:
*
- * WMB from prb_commit:A to prb_commit:B
+ * WMB from _prb_commit:A to (_prb_commit:B or _prb_commit:C)
* matching
* RMB from desc_read:A to desc_read:C
*/
@@ -431,7 +483,8 @@ static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
*
* 2. Guarantee the record data is loaded before re-checking the
* state. This avoids reading an obsolete descriptor state that may
- * not apply to the copied data. This pairs with data_alloc:A.
+ * not apply to the copied data. This pairs with data_alloc:A and
+ * data_realloc:A.
*
* Memory barrier involvement:
*
@@ -453,7 +506,7 @@ static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
/* Re-check the descriptor state. */
state_val = atomic_long_read(state_var); /* LMM(desc_read:E) */
- return get_desc_state(id, state_val);
+ return get_desc_state(id, state_val, NULL);
}
/*
@@ -464,7 +517,7 @@ static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
static void desc_make_reusable(struct prb_desc_ring *desc_ring,
unsigned long id)
{
- unsigned long val_committed = id | DESC_COMMIT_MASK;
+ unsigned long val_committed = id | DESC_COMMIT_MASK | DESC_FINAL_MASK;
unsigned long val_reusable = id | DESC_REUSE_MASK;
struct prb_desc *desc = to_desc(desc_ring, id);
atomic_long_t *state_var = &desc->state_var;
@@ -610,7 +663,7 @@ static bool data_push_tail(struct printk_ringbuffer *rb,
* data_make_reusable() may be due to a newly
* recycled data area causing the tail lpos to
* have been previously pushed. This pairs with
- * data_alloc:A.
+ * data_alloc:A and data_realloc:A.
*
* Memory barrier involvement:
*
@@ -901,7 +954,7 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
*/
prev_state_val = atomic_long_read(&desc->state_var); /* LMM(desc_reserve:E) */
if (prev_state_val &&
- get_desc_state(id_prev_wrap, prev_state_val) != desc_reusable) {
+ get_desc_state(id_prev_wrap, prev_state_val, NULL) != desc_reusable) {
WARN_ON_ONCE(1);
return false;
}
@@ -1018,6 +1071,77 @@ static char *data_alloc(struct printk_ringbuffer *rb,
return &blk->data[0];
}
+/*
+ * Extend the allocation of a specified data block, invalidating the oldest
+ * data block(s) if necessary. This function also associates the data block
+ * with a specified descriptor.
+ */
+static char *data_realloc(struct printk_ringbuffer *rb,
+ struct prb_data_ring *data_ring, unsigned int size,
+ struct prb_data_blk_lpos *blk_lpos, unsigned long id)
+{
+ struct prb_data_block *blk;
+ unsigned long head_lpos;
+ unsigned long next_lpos;
+ bool wrapped;
+
+ /* Reallocation only works if @blk_lpos is the newest data block. */
+ head_lpos = atomic_long_read(&data_ring->head_lpos);
+ if (head_lpos != blk_lpos->next)
+ return NULL;
+
+ /* Keep track if @blk_lpos was a wrapping data block. */
+ wrapped = (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, blk_lpos->next));
+
+ size = to_blk_size(size);
+
+ next_lpos = get_next_lpos(data_ring, blk_lpos->begin, size);
+
+ /* If the data block does not increase, there is nothing to do. */
+ if (next_lpos == head_lpos) {
+ blk = to_block(data_ring, blk_lpos->begin);
+ return &blk->data[0];
+ }
+
+ if (!data_push_tail(rb, data_ring, next_lpos - DATA_SIZE(data_ring)))
+ return NULL;
+
+ /* The memory barrier involvement is the same as data_alloc:A. */
+ if (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &head_lpos,
+ next_lpos)) { /* LMM(data_realloc:A) */
+ return NULL;
+ }
+
+ blk = to_block(data_ring, blk_lpos->begin);
+
+ if (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, next_lpos)) {
+ struct prb_data_block *old_blk = blk;
+
+ /* Wrapping data blocks store their data at the beginning. */
+ blk = to_block(data_ring, 0);
+
+ /*
+ * Store the ID on the wrapped block for consistency.
+ * The printk_ringbuffer does not actually use it.
+ */
+ blk->id = id;
+
+ if (!wrapped) {
+ /*
+ * Since the allocated space is now in the newly
+ * created wrapping data block, copy the content
+ * from the old data block.
+ */
+ memcpy(&blk->data[0], &old_blk->data[0],
+ (blk_lpos->next - blk_lpos->begin) - sizeof(blk->id));
+ }
+ }
+
+ blk_lpos->next = next_lpos;
+
+ return &blk->data[0];
+}
+
/* Return the number of bytes used by a data block. */
static unsigned int space_used(struct prb_data_ring *data_ring,
struct prb_data_blk_lpos *blk_lpos)
@@ -1098,6 +1222,156 @@ static const char *get_data(struct prb_data_ring *data_ring,
return &db->data[0];
}
+/*
+ * Attempt to move the head descriptor back to the reserved state. This is
+ * only possible if the descriptor is in the commit+!final state.
+ */
+static struct prb_desc *desc_reopen_last(struct prb_desc_ring *desc_ring,
+ unsigned long *id_out)
+{
+ unsigned long prev_state_val;
+ struct prb_desc *d;
+ unsigned long id;
+
+ id = atomic_long_read(&desc_ring->head_id);
+
+ d = to_desc(desc_ring, id);
+
+ prev_state_val = id | DESC_COMMIT_MASK;
+ if (!atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val, id | 0))
+ return NULL;
+
+ *id_out = id;
+ return d;
+}
+
+/**
+ * prb_reserve_last() - Re-reserve and extend the space in the ringbuffer used
+ * by the newest record.
+ *
+ * @e: The entry structure to setup.
+ * @rb: The ringbuffer to re-reserve and extend data in.
+ * @r: The record structure to allocate buffers for.
+ * @caller_id: The caller ID of the caller (reserving writer).
+ *
+ * This is the public function available to writers to re-reserve and extend
+ * data.
+ *
+ * The writer specifies the text size to extend (not the new total size) by
+ * setting the @text_buf_size field of @r. Dictionaries cannot be extended so
+ * @dict_buf_size of @r should be set to 0. To ensure proper initialization of
+ * @r, prb_rec_init_wr() should be used.
+ *
+ * This function will fail if @caller_id does not match the caller ID of the
+ * newest record. In that case the caller must reserve new data using
+ * prb_reserve().
+ *
+ * Context: Any context. Disables local interrupts on success.
+ * Return: true if text data could be extended, otherwise false.
+ *
+ * On success, the fields @info, @text_buf, @dict_buf of @r will be set by
+ * this function (@dict_buf is always set to NULL.) Also on success, the field
+ * @text_buf_size of @r is set to the new total size (@dict_buf_size is always
+ * set to 0). Also on success prb_record_text_space() can be used on @e to
+ * query the actual space used for the newly extended text data block.
+ *
+ * Important: All @info fields will already be set with the current values for
+ * the record. I.e. @info->text_len will be less than @text_buf_size
+ * and @info->dict_len may be set, even though @dict_buf_size is 0.
+ * Writers can use @info->text_len to know where concatenation
+ * begins and writers should update @info->text_len after
+ * concatenating.
+ */
+bool prb_reserve_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+ struct printk_record *r, u32 caller_id)
+{
+ unsigned int data_size;
+ struct prb_desc *d;
+ unsigned long id;
+
+ local_irq_save(e->irqflags);
+
+ /* Transition the newest descriptor back to the reserved state. */
+ d = desc_reopen_last(&rb->desc_ring, &id);
+ if (!d) {
+ local_irq_restore(e->irqflags);
+ goto fail;
+ }
+
+ /* Back in the reserved state, exclusive access is guaranteed. */
+
+ /*
+ * Set the @e fields here so that prb_commit() can be used if
+ * anything fails from now on.
+ */
+ e->rb = rb;
+ e->id = id;
+
+ if (caller_id != d->info.caller_id) {
+ prb_commit(e);
+ /* prb_commit() re-enabled interrupts. */
+ goto fail;
+ }
+
+ if (BLK_DATALESS(&d->text_blk_lpos)) {
+ r->text_buf = data_alloc(rb, &rb->text_data_ring, r->text_buf_size,
+ &d->text_blk_lpos, id);
+ } else {
+ if (!get_data(&rb->text_data_ring, &d->text_blk_lpos, &data_size)) {
+ prb_commit(e);
+ /* prb_commit() re-enabled interrupts. */
+ goto fail;
+ }
+ /* Use the size from the meta data (if it is sane). */
+ if (d->info.text_len < data_size)
+ data_size = d->info.text_len;
+ r->text_buf_size += data_size;
+ r->text_buf = data_realloc(rb, &rb->text_data_ring, r->text_buf_size,
+ &d->text_blk_lpos, id);
+ }
+ if (r->text_buf_size && !r->text_buf) {
+ prb_commit(e);
+ /* prb_commit() re-enabled interrupts. */
+ goto fail;
+ }
+
+ /* Although dictionary data may be in use, it cannot be extended. */
+ r->dict_buf = NULL;
+ r->dict_buf_size = 0;
+
+ r->info = &d->info;
+
+ e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
+
+ return true;
+fail:
+ /* Make it clear to the caller that the re-reserve failed. */
+ memset(r, 0, sizeof(*r));
+ return false;
+}
+
+/*
+ * Attempt to finalize a specified descriptor. Finalization only happens if
+ * the descriptor is in the !final or commit+!final state, both of which
+ * yield a state query result of desc_reserved.
+ */
+static void desc_finalize(struct prb_desc_ring *desc_ring, unsigned long id)
+{
+ unsigned long prev_state_val = id | DESC_COMMIT_MASK;
+ struct prb_desc *d = to_desc(desc_ring, id);
+ bool is_final;
+
+ while (!atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
+ prev_state_val | DESC_FINAL_MASK)) {
+
+ if (get_desc_state(id, prev_state_val, &is_final) != desc_reserved)
+ break;
+
+ if (is_final)
+ break;
+ }
+}
+
/**
* prb_reserve() - Reserve space in the ringbuffer.
*
@@ -1157,6 +1431,14 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
goto fail;
}
+ /*
+ * New data is about to be reserved. Once that happens, previous
+ * descriptors are no longer able to be extended. Finalize the
+ * previous descriptor now so that it can be made available to
+ * readers (when committed).
+ */
+ desc_finalize(desc_ring, DESC_ID(id - 1));
+
d = to_desc(desc_ring, id);
/*
@@ -1218,22 +1500,14 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
return false;
}
-/**
- * prb_commit() - Commit (previously reserved) data to the ringbuffer.
- *
- * @e: The entry containing the reserved data information.
- *
- * This is the public function available to writers to commit data.
- *
- * Context: Any context. Enables local interrupts.
- */
-void prb_commit(struct prb_reserved_entry *e)
+/* Commit the data (possibly finalizing it) and restore interrupts. */
+static void _prb_commit(struct prb_reserved_entry *e, unsigned long final_mask)
{
struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
struct prb_desc *d = to_desc(desc_ring, e->id);
unsigned long prev_state_val = e->id | 0;
- /* Now the writer has finished all writing: LMM(prb_commit:A) */
+ /* Now the writer has finished all writing: LMM(_prb_commit:A) */
/*
* Set the descriptor as committed. See "ABA Issues" about why
@@ -1244,14 +1518,66 @@ void prb_commit(struct prb_reserved_entry *e)
* this. This pairs with desc_read:B.
*/
if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
- e->id | DESC_COMMIT_MASK)) { /* LMM(prb_commit:B) */
- WARN_ON_ONCE(1);
+ e->id | DESC_COMMIT_MASK |
+ final_mask)) { /* LMM(_prb_commit:B) */
+ /*
+ * This reserved descriptor must have been finalized already.
+ * Retry with a reserved+final expected value.
+ */
+ prev_state_val = e->id | 0 | DESC_FINAL_MASK;
+
+ if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
+ e->id | DESC_COMMIT_MASK |
+ DESC_FINAL_MASK)) { /* LMM(_prb_commit:C) */
+ WARN_ON_ONCE(1);
+ }
}
/* Restore interrupts, the reserve/commit window is finished. */
local_irq_restore(e->irqflags);
}
+/**
+ * prb_commit() - Commit (previously reserved) data to the ringbuffer.
+ *
+ * @e: The entry containing the reserved data information.
+ *
+ * This is the public function available to writers to commit data.
+ *
+ * Note that the data is not yet available to readers until it is finalized.
+ * Finalizing happens automatically when space for the next record is
+ * reserved.
+ *
+ * See prb_commit_finalize() for a version of this function that finalizes
+ * immediately.
+ *
+ * Context: Any context. Enables local interrupts.
+ */
+void prb_commit(struct prb_reserved_entry *e)
+{
+ _prb_commit(e, 0);
+}
+
+/**
+ * prb_commit_finalize() - Commit and finalize (previously reserved) data to
+ * the ringbuffer.
+ *
+ * @e: The entry containing the reserved data information.
+ *
+ * This is the public function available to writers to commit+finalize data.
+ *
+ * By finalizing, the data is made immediately available to readers.
+ *
+ * This function should only be used if there are no intentions of extending
+ * this data using prb_reserve_last().
+ *
+ * Context: Any context. Enables local interrupts.
+ */
+void prb_commit_finalize(struct prb_reserved_entry *e)
+{
+ _prb_commit(e, DESC_FINAL_MASK);
+}
+
/*
* Count the number of lines in provided text. All text has at least 1 line
* (even if @text_size is 0). Each '\n' processed is counted as an additional
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
index 96ef997d7bd6..8ed1f1f154ec 100644
--- a/kernel/printk/printk_ringbuffer.h
+++ b/kernel/printk/printk_ringbuffer.h
@@ -116,8 +116,9 @@ struct prb_reserved_entry {
#define _DESCS_COUNT(ct_bits) (1U << (ct_bits))
#define DESC_SV_BITS (sizeof(unsigned long) * 8)
#define DESC_COMMIT_MASK (1UL << (DESC_SV_BITS - 1))
-#define DESC_REUSE_MASK (1UL << (DESC_SV_BITS - 2))
-#define DESC_FLAGS_MASK (DESC_COMMIT_MASK | DESC_REUSE_MASK)
+#define DESC_FINAL_MASK (1UL << (DESC_SV_BITS - 2))
+#define DESC_REUSE_MASK (1UL << (DESC_SV_BITS - 3))
+#define DESC_FLAGS_MASK (DESC_COMMIT_MASK | DESC_FINAL_MASK | DESC_REUSE_MASK)
#define DESC_ID_MASK (~DESC_FLAGS_MASK)
#define DESC_ID(sv) ((sv) & DESC_ID_MASK)
#define FAILED_LPOS 0x1
@@ -318,7 +319,10 @@ static inline void prb_rec_init_wr(struct printk_record *r,
bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
struct printk_record *r);
+bool prb_reserve_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+ struct printk_record *r, u32 caller_id);
void prb_commit(struct prb_reserved_entry *e);
+void prb_commit_finalize(struct prb_reserved_entry *e);
void prb_init(struct printk_ringbuffer *rb,
char *text_buf, unsigned int text_buf_size,
--
2.20.1
On 2020-08-26, Sergey Senozhatsky <[email protected]> wrote:
>>> @@ -1157,6 +1431,14 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
>>> goto fail;
>>> }
>>>
>>> + /*
>>> + * New data is about to be reserved. Once that happens, previous
>>> + * descriptors are no longer able to be extended. Finalize the
>>> + * previous descriptor now so that it can be made available to
>>> + * readers (when committed).
>>> + */
>>> + desc_finalize(desc_ring, DESC_ID(id - 1));
>>> +
>>> d = to_desc(desc_ring, id);
>>>
>>> /*
>>
>> Apparently this is not enough to guarantee that past descriptors are
>> finalized. I am able to reproduce a scenario where the finalization
>> of a certain descriptor never happens. That leaves the descriptor
>> permanently in the reserved queried state, which prevents any new
>> records from being created. I am investigating.
>
> Good to know. I also ran into problems:
> - broken dmesg (and broken journalctl -f /dev/kmsg poll) and broken
> syslog read
>
> $ strace dmesg
>
> ...
> openat(AT_FDCWD, "/dev/kmsg", O_RDONLY|O_NONBLOCK) = 3
> lseek(3, 0, SEEK_DATA) = 0
> read(3, 0x55dda8c240a8, 8191) = -1 EAGAIN (Resource temporarily unavailable)
> close(3) = 0
> syslog(10 /* SYSLOG_ACTION_SIZE_BUFFER */) = 524288
> mmap(NULL, 528384, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f43ea847000
> syslog(3 /* SYSLOG_ACTION_READ_ALL */, "", 524296) = 0
Yes, this is a consequence of the problem. The tail is in the reserved
queried state, so readers will not advance beyond it.
This series makes a very naive assumption that the previous descriptor
is either in the reserved or committed queried states. The fact is, it
can be in any of the 4 queried states. Adding support for finalization
of all the states then gets quite complex, since any state transition
(cmpxchg) may have to deal with an unexpected FINAL flag.
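For reference, the shape of such a transition loop can be sketched in user-space C11 (a model only, not the kernel code; atomic_compare_exchange_strong() stands in for atomic_long_try_cmpxchg_relaxed(), and the mask names are copied from the patch):

```c
#include <assert.h>
#include <stdatomic.h>

#define SV_BITS     (sizeof(unsigned long) * 8)
#define COMMIT_MASK (1UL << (SV_BITS - 1))
#define FINAL_MASK  (1UL << (SV_BITS - 2))
#define REUSE_MASK  (1UL << (SV_BITS - 3))
#define ID_OF(sv)   ((sv) & ~(COMMIT_MASK | FINAL_MASK | REUSE_MASK))

/* Model of the finalize transition: set FINAL only while the state still
 * queries as reserved for @id, tolerating the flag already being set. */
static void model_finalize(atomic_ulong *state_var, unsigned long id)
{
	/* First guess: committed but not yet finalized. */
	unsigned long prev = id | COMMIT_MASK;

	while (!atomic_compare_exchange_strong(state_var, &prev,
					       prev | FINAL_MASK)) {
		if (ID_OF(prev) != id)
			break;		/* miss: descriptor recycled */
		if (prev & (REUSE_MASK | FINAL_MASK))
			break;		/* reusable, or already final */
		/* still reserved and !final: retry with observed value */
	}
}

/* Helper so the outcomes are easy to check. */
static unsigned long finalize_result(unsigned long initial, unsigned long id)
{
	atomic_ulong sv = initial;

	model_finalize(&sv, id);
	return atomic_load(&sv);
}
```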
The ringbuffer was designed so that descriptors are completely
self-contained. So adding logic where an action on one descriptor should
affect another descriptor is far more complex than I initially expected.
Keep in mind the finalization concept satisfies 3 things:
- denote if a record can be extended (i.e. transition back to reserved)
- denote if a reader may read the record
- denote if a writer may recycle a record
I have not yet given up on the idea of finalization (particularly
because it allows mainline LOG_CONT behavior to be preserved locklessly),
but I am no longer sure if this is the direction we want to take.
John Ogness
On 2020-08-24, John Ogness <[email protected]> wrote:
> @@ -1157,6 +1431,14 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
> goto fail;
> }
>
> + /*
> + * New data is about to be reserved. Once that happens, previous
> + * descriptors are no longer able to be extended. Finalize the
> + * previous descriptor now so that it can be made available to
> + * readers (when committed).
> + */
> + desc_finalize(desc_ring, DESC_ID(id - 1));
> +
> d = to_desc(desc_ring, id);
>
> /*
Apparently this is not enough to guarantee that past descriptors are
finalized. I am able to reproduce a scenario where the finalization of a
certain descriptor never happens. That leaves the descriptor permanently
in the reserved queried state, which prevents any new records from being
created. I am investigating.
John Ogness
On (20/08/26 10:45), John Ogness wrote:
> On 2020-08-24, John Ogness <[email protected]> wrote:
> > @@ -1157,6 +1431,14 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
> > goto fail;
> > }
> >
> > + /*
> > + * New data is about to be reserved. Once that happens, previous
> > + * descriptors are no longer able to be extended. Finalize the
> > + * previous descriptor now so that it can be made available to
> > + * readers (when committed).
> > + */
> > + desc_finalize(desc_ring, DESC_ID(id - 1));
> > +
> > d = to_desc(desc_ring, id);
> >
> > /*
>
> Apparently this is not enough to guarantee that past descriptors are
> finalized. I am able to reproduce a scenario where the finalization of a
> certain descriptor never happens. That leaves the descriptor permanently
> in the reserved queried state, which prevents any new records from being
> created. I am investigating.
Good to know. I also ran into problems:
- broken dmesg (and broken journalctl -f /dev/kmsg poll) and broken
syslog read
$ strace dmesg
...
openat(AT_FDCWD, "/dev/kmsg", O_RDONLY|O_NONBLOCK) = 3
lseek(3, 0, SEEK_DATA) = 0
read(3, 0x55dda8c240a8, 8191) = -1 EAGAIN (Resource temporarily unavailable)
close(3) = 0
syslog(10 /* SYSLOG_ACTION_SIZE_BUFFER */) = 524288
mmap(NULL, 528384, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f43ea847000
syslog(3 /* SYSLOG_ACTION_READ_ALL */, "", 524296) = 0
munmap(0x7f43ea847000, 528384) = 0
...
-ss
On Wed 2020-08-26 14:43:22, John Ogness wrote:
> On 2020-08-26, Sergey Senozhatsky <[email protected]> wrote:
> >>> @@ -1157,6 +1431,14 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
> >>> goto fail;
> >>> }
> >>>
> >>> + /*
> >>> + * New data is about to be reserved. Once that happens, previous
> >>> + * descriptors are no longer able to be extended. Finalize the
> >>> + * previous descriptor now so that it can be made available to
> >>> + * readers (when committed).
> >>> + */
> >>> + desc_finalize(desc_ring, DESC_ID(id - 1));
> >>> +
> >>> d = to_desc(desc_ring, id);
> >>>
> >>> /*
> >>
> >> Apparently this is not enough to guarantee that past descriptors are
> >> finalized. I am able to reproduce a scenario where the finalization
> >> of a certain descriptor never happens. That leaves the descriptor
> >> permanently in the reserved queried state, which prevents any new
> >> records from being created. I am investigating.
> >
> > Good to know. I also ran into problems:
> > - broken dmesg (and broken journalctl -f /dev/kmsg poll) and broken
> > syslog read
> >
> > $ strace dmesg
> >
> > ...
> > openat(AT_FDCWD, "/dev/kmsg", O_RDONLY|O_NONBLOCK) = 3
> > lseek(3, 0, SEEK_DATA) = 0
> > read(3, 0x55dda8c240a8, 8191) = -1 EAGAIN (Resource temporarily unavailable)
> > close(3) = 0
> > syslog(10 /* SYSLOG_ACTION_SIZE_BUFFER */) = 524288
> > mmap(NULL, 528384, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f43ea847000
> > syslog(3 /* SYSLOG_ACTION_READ_ALL */, "", 524296) = 0
>
> Yes, this is a consequence of the problem. The tail is in the reserved
> queried state, so readers will not advance beyond it.
>
> This series makes a very naive assumption that the previous descriptor
> is either in the reserved or committed queried states. The fact is, it
> can be in any of the 4 queried states. Adding support for finalization
> of all the states then gets quite complex, since any state transition
> (cmpxchg) may have to deal with an unexpected FINAL flag.
>
> The ringbuffer was designed so that descriptors are completely
> self-contained. So adding logic where an action on one descriptor should
> affect another descriptor is far more complex than I initially expected.
>
> Keep in mind the finalization concept satisfies 3 things:
>
> - denote if a record can be extended (i.e. transition back to reserved)
> - denote if a reader may read the record
> - denote if a writer may recycle a record
>
> I have not yet given up on the idea of finalization (particularly
> because it allows mainline LOG_CONT behavior to be preserved locklessly),
In any case, we also need to finalize the descriptor in prb_commit()
when the descriptor is no longer the last reserved one.
It has to be done in two steps to avoid a race:
prb_commit()
+ set DESC_COMMIT_MASK
+ check if it is still the last descriptor in the array
+ set DESC_FINAL_MASK when it is not the last descriptor
It should work because prb_reserve() finalizes the previous
descriptor after the new one is reserved. As a result:
+ prb_reserve() should either see DESC_COMMIT_MASK in the previous
descriptor and be able to finalize it.
+ or prb_commit() will see that the head moved and it is no
longer the last reserved one.
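A user-space sketch of that two-step scheme (the names and the single-descriptor model are illustrative only, not the kernel implementation):

```c
#include <assert.h>
#include <stdatomic.h>

#define COMMIT_MASK (1UL << (sizeof(unsigned long) * 8 - 1))
#define FINAL_MASK  (1UL << (sizeof(unsigned long) * 8 - 2))

/* Minimal model: the ring head and the state of the committing descriptor. */
struct model_ring {
	atomic_ulong head_id;
	atomic_ulong state_var;
};

static void two_step_commit(struct model_ring *rb, unsigned long id)
{
	/* Step 1: mark the descriptor committed. */
	atomic_fetch_or(&rb->state_var, COMMIT_MASK);

	/*
	 * Step 2: if a newer record was reserved meanwhile, the reserver
	 * may have run before step 1 and missed finalizing us, so
	 * finalize ourselves. If we are still the head, the next
	 * prb_reserve() will finalize us.
	 */
	if (atomic_load(&rb->head_id) != id)
		atomic_fetch_or(&rb->state_var, FINAL_MASK);
}

/* Helper: commit descriptor @id while the head is @head_id. */
static unsigned long commit_result(unsigned long head_id, unsigned long id)
{
	struct model_ring rb = { head_id, id };

	two_step_commit(&rb, id);
	return atomic_load(&rb.state_var);
}
```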
I keep my fingers crossed.
Best Regards,
Petr
PS: I am still in the middle of review. And I had a feeling that
this situation was not handled.
On 2020-08-26, Petr Mladek <[email protected]> wrote:
>> This series makes a very naive assumption that the previous
>> descriptor is either in the reserved or committed queried states. The
>> fact is, it can be in any of the 4 queried states. Adding support for
>> finalization of all the states then gets quite complex, since any
>> state transition (cmpxchg) may have to deal with an unexpected FINAL
>> flag.
>
> It has to be done in two steps to avoid a race:
>
> prb_commit()
>
> + set DESC_COMMIT_MASK
> + check if it is still the last descriptor in the array
> + set DESC_FINAL_MASK when it is not the last descriptor
>
> It should work because prb_reserve() finalizes the previous
> descriptor after the new one is reserved. As a result:
>
> + prb_reserve() should either see DESC_COMMIT_MASK in the previous
> descriptor and be able to finalize it.
>
> + or prb_commit() will see that the head moved and it is no
> longer the last reserved one.
I do not like the idea of relying on descriptors to finalize
themselves. I worry that there might be some hole there. Failing to
finalize basically disables printk, so that is pretty serious.
Below is a patch against this series that adds support for finalizing
all 4 queried states. It passes all my tests. Note that the code handles
2 corner cases:
1. When seq is 0, there is no previous descriptor to finalize. This
exception is important because we don't want to finalize the -1
placeholder. Otherwise, upon the first wrap, a descriptor will be
prematurely finalized.
2. When a previous descriptor is being reserved for the first time, it
might have a state_var value of 0 because the writer is still in
prb_reserve() and has not set the initial value yet. I added
considerable comments on this special case.
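Corner case 2 boils down to one check: when recycling a descriptor, a state_var of 0 and a state_var of bare DESC_FINAL_MASK must both count as "never used". A user-space sketch of that test (mirroring the `prev_state_val & ~DESC_FINAL_MASK` check in the patch below; the function name is illustrative):

```c
#include <assert.h>
#include <stdbool.h>

#define SV_BITS     (sizeof(unsigned long) * 8)
#define COMMIT_MASK (1UL << (SV_BITS - 1))
#define FINAL_MASK  (1UL << (SV_BITS - 2))
#define REUSE_MASK  (1UL << (SV_BITS - 3))

/*
 * Sketch: may desc_reserve() take over the descriptor one wrap behind?
 * Yes if it queried as reusable, or if it was never used -- where
 * "never used" must ignore a FINAL flag set by a racing finalizer.
 */
static bool may_recycle(unsigned long prev_state_val, bool queried_reusable)
{
	if (queried_reusable)
		return true;
	return (prev_state_val & ~FINAL_MASK) == 0;
}
```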
I am comfortable with adding this new code, although it clearly adds
complexity.
John Ogness
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index 90d48973ac9e..1ed1e9eb930f 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -860,9 +860,11 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
struct prb_desc_ring *desc_ring = &rb->desc_ring;
unsigned long prev_state_val;
unsigned long id_prev_wrap;
+ unsigned long state_val;
struct prb_desc *desc;
unsigned long head_id;
unsigned long id;
+ bool is_final;
head_id = atomic_long_read(&desc_ring->head_id); /* LMM(desc_reserve:A) */
@@ -953,10 +955,17 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
* See "ABA Issues" about why this verification is performed.
*/
prev_state_val = atomic_long_read(&desc->state_var); /* LMM(desc_reserve:E) */
- if (prev_state_val &&
- get_desc_state(id_prev_wrap, prev_state_val, NULL) != desc_reusable) {
- WARN_ON_ONCE(1);
- return false;
+ if (get_desc_state(id_prev_wrap, prev_state_val, &is_final) != desc_reusable) {
+ /*
+ * If this descriptor has never been used, @prev_state_val
+ * will be 0. However, even though it may have never been
+ * used, it may have been finalized. So that flag must be
+ * ignored.
+ */
+ if ((prev_state_val & ~DESC_FINAL_MASK)) {
+ WARN_ON_ONCE(1);
+ return false;
+ }
}
/*
@@ -967,10 +976,25 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
* any other changes. A write memory barrier is sufficient for this.
* This pairs with desc_read:D.
*/
- if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
- id | 0)) { /* LMM(desc_reserve:F) */
- WARN_ON_ONCE(1);
- return false;
+ if (is_final)
+ state_val = id | 0 | DESC_FINAL_MASK;
+ else
+ state_val = id | 0;
+ if (atomic_long_cmpxchg(&desc->state_var, prev_state_val,
+ state_val) != prev_state_val) { /* LMM(desc_reserve:F) */
+ /*
+ * This reusable descriptor must have been finalized already.
+ * Retry with a reusable+final expected value.
+ */
+ prev_state_val |= DESC_FINAL_MASK;
+ state_val |= DESC_FINAL_MASK;
+
+ if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
+ state_val)) { /* LMM(desc_reserve:FIXME) */
+
+ WARN_ON_ONCE(1);
+ return false;
+ }
}
/* Now data in @desc can be modified: LMM(desc_reserve:G) */
@@ -1364,9 +1388,37 @@ static void desc_finalize(struct prb_desc_ring *desc_ring, unsigned long id)
while (!atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
prev_state_val | DESC_FINAL_MASK)) {
- if (get_desc_state(id, prev_state_val, &is_final) != desc_reserved)
+ switch (get_desc_state(id, prev_state_val, &is_final)) {
+ case desc_miss:
+ /*
+ * If the ID is exactly 1 wrap behind the expected, it is
+ * in the process of being reserved by another writer and
+ * must be considered reserved.
+ */
+ if (get_desc_state(DESC_ID_PREV_WRAP(desc_ring, id),
+ prev_state_val, &is_final) != desc_reusable) {
+ /*
+ * If this descriptor has never been used, @prev_state_val
+ * will be 0. However, even though it may have never been
+ * used, it may have been finalized. So that flag must be
+ * ignored.
+ */
+ if ((prev_state_val & ~DESC_FINAL_MASK)) {
+ WARN_ON_ONCE(1);
+ return;
+ }
+ }
+ fallthrough;
+ case desc_reserved:
+ case desc_reusable:
+ /* finalizable, try again */
break;
+ case desc_committed:
+ /* already finalized */
+ return;
+ }
+ /* already finalized? */
if (is_final)
break;
}
@@ -1431,14 +1483,6 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
goto fail;
}
- /*
- * New data is about to be reserved. Once that happens, previous
- * descriptors are no longer able to be extended. Finalize the
- * previous descriptor now so that it can be made available to
- * readers (when committed).
- */
- desc_finalize(desc_ring, DESC_ID(id - 1));
-
d = to_desc(desc_ring, id);
/*
@@ -1464,6 +1508,16 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
else
d->info.seq += DESCS_COUNT(desc_ring);
+ /*
+ * New data is about to be reserved. Once that happens, previous
+ * descriptors are no longer able to be extended. Finalize the
+ * previous descriptor now so that it can be made available to
+ * readers (when committed). (For the first descriptor, there is
+ * no previous record to finalize.)
+ */
+ if (d->info.seq > 0)
+ desc_finalize(desc_ring, DESC_ID(id - 1));
+
r->text_buf = data_alloc(rb, &rb->text_data_ring, r->text_buf_size,
&d->text_blk_lpos, id);
/* If text data allocation fails, a data-less record is committed. */
Hi,
this mail is based on the review that I did over the last two days. I haven't
seen the latest code that tried to handle the finalize issues.
Anyway, this feedback might give some clues as well.
IMPORTANT: It seems that we both had different understanding of
the DESC_FINAL_MASK behavior. It might explain some problems
and confusion. See my last comment in this mail for more.
On Mon 2020-08-24 12:41:36, John Ogness wrote:
> Add support for extending the last data block. For this, introduce a new
> finalization state flag that identifies if a descriptor may be extended.
>
> When a writer calls the commit function prb_commit(), the record may still
> continue to be in the reserved queried state. In order for that record to
> enter into the committed queried state, that record also must be finalized.
> Finalization can occur anytime while the record is in the reserved queried
> state, even before the writer has called prb_commit().
The above paragraph is a bit confusing. The reserved state means that
the record is being modified. prb_commit() sets DESC_COMMIT_MASK,
which means that the record is in a consistent state. It might then get
reopened or finalized.
> Until a record is finalized (represented by "DESC_FINAL_MASK"), a writer
> may "reopen" that record and extend it with more data.
I would replace the above two paragraphs with something like:
"The original DESC_COMMIT_MASK is still set when the record is in
a consistent state. In this state, it can either get reopened or
finalized.
The new DESC_FINAL_MASK is set when the record can no longer get
modified. It might be set explicitly by the new prb_commit_finalize()
when the writer no longer plans to append data. But it is also
set automatically when a new descriptor is allocated.
The automatic finalization simplifies the implementation. Only the
last data block can easily get extended."
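The COMMIT/FINAL semantics described above can be sketched in a small userspace model. The bit positions and the helper name `query_state()` are illustrative assumptions, not the kernel's actual masks or API; the point is only that COMMIT alone still queries as reserved (reopenable), while COMMIT+FINAL queries as committed:

```c
#include <assert.h>
#include <stdbool.h>

enum desc_state { desc_miss, desc_reserved, desc_committed, desc_reusable };

/* Illustrative flag positions; the real values live in
 * printk_ringbuffer.h and may differ. */
#define DESC_SV_BITS     (sizeof(unsigned long) * 8)
#define DESC_COMMIT_MASK (1UL << (DESC_SV_BITS - 1))
#define DESC_REUSE_MASK  (1UL << (DESC_SV_BITS - 2))
#define DESC_FINAL_MASK  (1UL << (DESC_SV_BITS - 3))
#define DESC_FLAGS_MASK  (DESC_COMMIT_MASK | DESC_REUSE_MASK | DESC_FINAL_MASK)
#define DESC_ID_MASK     (~DESC_FLAGS_MASK)

/* Query the state of a state_var value: COMMIT alone still queries
 * as desc_reserved (the record may be reopened); only COMMIT+FINAL
 * queries as desc_committed. FINAL is reported separately via
 * @is_final because it is orthogonal to consistency. */
static enum desc_state query_state(unsigned long id, unsigned long sv,
				   bool *is_final)
{
	if (is_final)
		*is_final = sv & DESC_FINAL_MASK;
	if (id != (sv & DESC_ID_MASK))
		return desc_miss;
	if (sv & DESC_REUSE_MASK)
		return desc_reusable;
	if ((sv & DESC_COMMIT_MASK) && (sv & DESC_FINAL_MASK))
		return desc_committed;
	return desc_reserved;
}
```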
> Note that existing descriptors are automatically finalized whenever new
> descriptors are created. A record can never be "unfinalized".
>
> Two new memory barrier pairs are introduced, but these are really just
> alternate path barriers that exactly correspond to existing memory
> barriers.
[...]
> diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
> index d66718e74aae..90d48973ac9e 100644
> --- a/kernel/printk/printk_ringbuffer.c
> +++ b/kernel/printk/printk_ringbuffer.c
> @@ -79,6 +81,20 @@
> * committed or reusable queried state. This makes it possible that a valid
> * sequence number of the tail is always available.
> *
> + * Descriptor Finalization
> + * ~~~~~~~~~~~~~~~~~~~~~~~
> + * When a writer calls the commit function prb_commit(), the record may still
> + * continue to be in the reserved queried state. In order for that record to
> + * enter into the committed queried state, that record also must be finalized.
> + * Finalization can occur anytime while the record is in the reserved queried
> + * state, even before the writer has called prb_commit().
This has the same problem as the commit message ;-)
> + *
> + * Until a record is finalized (represented by "DESC_FINAL_MASK"), a writer
> + * may "reopen" that record and extend it with more data.
> + *
> + * Note that existing descriptors are automatically finalized whenever new
> + * descriptors are created. A record can never be "unfinalized".
> + *
> * Data Rings
> * ~~~~~~~~~~
> * The two data rings (text and dictionary) function identically. They exist
[...]
> @@ -1018,6 +1071,77 @@ static char *data_alloc(struct printk_ringbuffer *rb,
> return &blk->data[0];
> }
>
> +/*
> + * Extend the allocation of a specified data block, invalidating the oldest
> + * data block(s) if necessary. This function also associates the data block
> + * with a specified descriptor.
> + */
data_realloc() actually allows even reducing the buffer size. It is
used when the original buffer was larger than the original
info->text_len + requested text_buf_size.
I thought a lot about it. It is questionable behavior.
First, we did not expect this when we designed the ring buffer. Well, I
do not see why it should not work. More free space in the ring buffer
does not hurt. And the cmpxchg in data_alloc() compares the exact
lpos.
Second, there is the idea to flush the existing pieces of continuous
lines to the consoles even before the record is finalized. For this,
we will need to make sure that the already committed text no longer
changes. The main responsibility lies with the caller (printk code). But
the ring buffer code will need to allow reading the already committed
text even when the buffer is being extended. Even this should work
when shrinking the data block. It will be in the same wrap and
the reader would read only the part defined by the original info->text_len.
Anyway, the comment is too short. It does not describe what exactly
is done or what the limitations are. I suggest something like:
/*
* Try to resize an existing data block defined by the given @id.
* It copies the old data when the resized data block is newly wrapped.
*
* Fail when it is not the last allocated data block. Also fail when
* there is not enough space and the oldest data blocks cannot get
* invalidated.
*
* Return a pointer to the beginning of the entire data buffer or
* NULL on failure.
*/
> +static char *data_realloc(struct printk_ringbuffer *rb,
> + struct prb_data_ring *data_ring, unsigned int size,
> + struct prb_data_blk_lpos *blk_lpos, unsigned long id)
> +{
> + struct prb_data_block *blk;
> + unsigned long head_lpos;
> + unsigned long next_lpos;
> + bool wrapped;
> +
> + /* Reallocation only works if @blk_lpos is the newest data block. */
> + head_lpos = atomic_long_read(&data_ring->head_lpos);
> + if (head_lpos != blk_lpos->next)
> + return NULL;
> +
> + /* Keep track if @blk_lpos was a wrapping data block. */
> + wrapped = (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, blk_lpos->next));
> +
> + size = to_blk_size(size);
> +
> + next_lpos = get_next_lpos(data_ring, blk_lpos->begin, size);
> +
> + /* If the data block does not increase, there is nothing to do. */
> + if (next_lpos == head_lpos) {
> + blk = to_block(data_ring, blk_lpos->begin);
> + return &blk->data[0];
> + }
> +
> + if (!data_push_tail(rb, data_ring, next_lpos - DATA_SIZE(data_ring)))
> + return NULL;
> +
> + /* The memory barrier involvement is the same as data_alloc:A. */
> + if (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &head_lpos,
> + next_lpos)) { /* LMM(data_realloc:A) */
> + return NULL;
> + }
> +
> + blk = to_block(data_ring, blk_lpos->begin);
> +
> + if (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, next_lpos)) {
> + struct prb_data_block *old_blk = blk;
> +
> + /* Wrapping data blocks store their data at the beginning. */
> + blk = to_block(data_ring, 0);
> +
> + /*
> + * Store the ID on the wrapped block for consistency.
> + * The printk_ringbuffer does not actually use it.
> + */
> + blk->id = id;
> +
> + if (!wrapped) {
> + /*
> + * Since the allocated space is now in the newly
> + * created wrapping data block, copy the content
> + * from the old data block.
> + */
> + memcpy(&blk->data[0], &old_blk->data[0],
> + (blk_lpos->next - blk_lpos->begin) - sizeof(blk->id));
> + }
> + }
> +
> + blk_lpos->next = next_lpos;
> +
> + return &blk->data[0];
> +}
Sigh, a lot of non-trivial code is duplicated here. But I do
not see a good way to share it. The data extension is so special that it
needs special checks and actions in many locations.
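The wrap handling that makes data_realloc() special can be illustrated with a toy single-threaded model. This is a hypothetical sketch, not the kernel code: there are no atomics, descriptors, block IDs, or tail pushing, and DATA_SIZE is an arbitrary 16 bytes. It only shows the copy that happens when a grown block newly crosses the wrap boundary.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical 16-byte data ring; logical positions (lpos) grow
 * forever and map to ring offsets via lpos % DATA_SIZE. */
#define DATA_SIZE 16UL
#define DATA_WRAPS(lpos) ((lpos) / DATA_SIZE)

static unsigned char ring[DATA_SIZE];

/* Blocks may not cross the wrap boundary: a block that would cross
 * it is placed at the start of the next wrap instead (this mirrors
 * the role of get_next_lpos()). */
static unsigned long toy_next_lpos(unsigned long begin, unsigned long size)
{
	if (DATA_WRAPS(begin) != DATA_WRAPS(begin + size))
		return DATA_SIZE * (DATA_WRAPS(begin) + 1) + size;
	return begin + size;
}

/* Resize the block at [begin, *next) to @size bytes. If the grown
 * block newly wraps, its data moves to the ring start, so the old
 * content must be copied over; a block that was already wrapped
 * already lives there and needs no copy. */
static unsigned char *toy_realloc(unsigned long begin, unsigned long *next,
				  unsigned long size)
{
	bool wrapped = DATA_WRAPS(begin) != DATA_WRAPS(*next);
	unsigned long nn = toy_next_lpos(begin, size);
	unsigned char *data = &ring[begin % DATA_SIZE];

	if (DATA_WRAPS(begin) != DATA_WRAPS(nn)) {
		unsigned char *moved = &ring[0];

		if (!wrapped)
			memcpy(moved, data, *next - begin);
		data = moved;
	}
	*next = nn;
	return data;
}
```

Growing a 4-byte block at offset 10 to 8 bytes crosses the wrap, so the content moves to offset 0 and `next` jumps into the next wrap.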
> +
> /* Return the number of bytes used by a data block. */
> static unsigned int space_used(struct prb_data_ring *data_ring,
> struct prb_data_blk_lpos *blk_lpos)
> @@ -1098,6 +1222,156 @@ static const char *get_data(struct prb_data_ring *data_ring,
> return &db->data[0];
> }
>
> +/*
> + * Attempt to move the head descriptor back to the reserved state. This is
> + * only possible if the descriptor is in the commit+!final state.
> + */
> +static struct prb_desc *desc_reopen_last(struct prb_desc_ring *desc_ring,
> + unsigned long *id_out)
> +{
> + unsigned long prev_state_val;
> + struct prb_desc *d;
> + unsigned long id;
> +
> + id = atomic_long_read(&desc_ring->head_id);
> +
> + d = to_desc(desc_ring, id);
> +
> + prev_state_val = id | DESC_COMMIT_MASK;
> + if (!atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val, id | 0))
> + return NULL;
This might deserve a comment explaining why no barrier is needed here.
I guess that you did not add it because the reader API allows
reading data only when DESC_FINAL_MASK is set.
But this assumption looks dangerous to me. As I already wrote above,
we might later need to add an API to read the already committed
parts of continuous lines.
Anyway, this state change has a similar effect to the one in
desc_reserve(). The result is that the descriptor is going to be
modified and might not be in a consistent state.
Note that desc_reserve() moves the descriptor from the reusable
to the reserved state. The descriptor was in a consistent state
when it was reusable.
Anyway, I would feel much more comfortable if a full barrier were used
here. I mean:
if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val, id | 0))
return NULL;
> +
> + *id_out = id;
> + return d;
> +}
> +
> +/**
> + * prb_reserve_last() - Re-reserve and extend the space in the ringbuffer used
> + * by the newest record.
The name is a bit ambiguous. prb_reserve_in_last() sounds clearer.
It would also explain why the caller specifies only the text size
to extend and not the total one.
> + *
> + * @e: The entry structure to setup.
> + * @rb: The ringbuffer to re-reserve and extend data in.
> + * @r: The record structure to allocate buffers for.
> + * @caller_id: The caller ID of the caller (reserving writer).
> + *
> + * This is the public function available to writers to re-reserve and extend
> + * data.
> + *
> + * The writer specifies the text size to extend (not the new total size) by
> + * setting the @text_buf_size field of @r. Dictionaries cannot be extended so
> + * @dict_buf_size of @r should be set to 0. To ensure proper initialization of
> + * @r, prb_rec_init_wr() should be used.
> + *
> + * This function will fail if @caller_id does not match the caller ID of the
> + * newest record. In that case the caller must reserve new data using
> + * prb_reserve().
> + *
> + * Context: Any context. Disables local interrupts on success.
> + * Return: true if text data could be extended, otherwise false.
> + *
> + * On success, the fields @info, @text_buf, @dict_buf of @r will be set by
> + * this function (@dict_buf is always set to NULL.) Also on success, the field
> + * @text_buf_size of @r is set to the new total size (@dict_buf_size is always
> + * set to 0). Also on success prb_record_text_space() can be used on @e to
> + * query the actual space used for the newly extended text data block.
The above paragraph is hard to read. And it does not describe what
exactly is done with the @info and @text_buf fields.
What about the following?
"On success:
+ @r->text_buf points to the beginning of the entire text buffer.
+ @r->text_buf_size is set to the new total size of the buffer.
+ @r->dict_buf and @r->dict_buf_size are cleared because
extending the dict buffer is not supported.
+ @r->info is not touched so that @info.text_len can be used
to append the text.
+ prb_record_text_space() can be used on @e to query the
new actually used space."
> + *
> + * Important: All @info fields will already be set with the current values for
> + * the record. I.e. @info->text_len will be less than @text_buf_size
> + * and @info->dict_len may be set, even though @dict_buf_size is 0.
> + * Writers can use @info->text_len to know where concatenation
> + * begins and writers should update @info->text_len after
> + * concatenating.
Hmm, this is inconsistent with prb_reserve() behavior and thus error-prone.
I would change the behavior so that:
+ prb_reserve() clears the entire @info struct, including text_len
and dict_len. The caller will be responsible for filling in the right
values.
+ prb_reserve_in_last() will not touch the @info struct at all.
The change in prb_reserve() should be done in a separate preparation patch.
> + */
> +bool prb_reserve_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
> + struct printk_record *r, u32 caller_id)
> +{
> + unsigned int data_size;
> + struct prb_desc *d;
> + unsigned long id;
> +
> + local_irq_save(e->irqflags);
> +
> + /* Transition the newest descriptor back to the reserved state. */
> + d = desc_reopen_last(&rb->desc_ring, &id);
> + if (!d) {
> + local_irq_restore(e->irqflags);
> + goto fail;
> + }
> +
> + /* Back in the reserved state, exclusive access is guaranteed. */
> +
> + /*
> + * Set the @e fields here so that prb_commit() can be used if
> + * anything fails from now on.
> + */
> + e->rb = rb;
> + e->id = id;
> +
> + if (caller_id != d->info.caller_id) {
> + prb_commit(e);
> + /* prb_commit() re-enabled interrupts. */
> + goto fail;
> + }
> +
> + if (BLK_DATALESS(&d->text_blk_lpos)) {
> + r->text_buf = data_alloc(rb, &rb->text_data_ring, r->text_buf_size,
> + &d->text_blk_lpos, id);
> + } else {
> + if (!get_data(&rb->text_data_ring, &d->text_blk_lpos, &data_size)) {
> + prb_commit(e);
> + /* prb_commit() re-enabled interrupts. */
> + goto fail;
> + }
> + /* Use the size from the meta data (if it is sane). */
> + if (d->info.text_len < data_size)
> + data_size = d->info.text_len;
printk() is going to write beyond the buffer when info.text_len is not
sane. Therefore I would do the opposite here:
/*
* Count the size of the entire text buffer. Use only
* the really used space from the original one.
* Make sure that the size of the used space is sane.
*/
if (WARN_ON_ONCE(d->info.text_len > data_size))
d->info.text_len = data_size;
r->text_buf_size += d->info.text_len;
Also need to check that the requested size is acceptable:
if (!data_check_size(&rb->text_data_ring, r->text_buf_size)) {
prb_commit(e);
/* prb_commit() re-enabled interrupts. */
goto fail;
}
> + r->text_buf_size += data_size;
> + r->text_buf = data_realloc(rb, &rb->text_data_ring, r->text_buf_size,
> + &d->text_blk_lpos, id);
> + }
> + if (r->text_buf_size && !r->text_buf) {
> + prb_commit(e);
> + /* prb_commit() re-enabled interrupts. */
> + goto fail;
> + }
> +
> + /* Although dictionary data may be in use, it cannot be extended. */
> + r->dict_buf = NULL;
> + r->dict_buf_size = 0;
> +
> + r->info = &d->info;
> +
> + e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
> +
> + return true;
> +fail:
> + /* Make it clear to the caller that the re-reserve failed. */
> + memset(r, 0, sizeof(*r));
> + return false;
It would make sense to add one more fail target.
fail:
prb_commit(e);
/* prb_commit() re-enabled interrupts. */
fail_reopen:
/* Make it clear to the caller that the re-reserve failed. */
memset(r, 0, sizeof(*r));
return false;
> +}
> +
> +/*
> + * Attempt to finalize a specified descriptor. Finalization only happens if
> + * the descriptor is in the !final or commit+!final state, both of which
> + * yield a state query result of desc_reserved.
> + */
> +static void desc_finalize(struct prb_desc_ring *desc_ring, unsigned long id)
> +{
> + unsigned long prev_state_val = id | DESC_COMMIT_MASK;
> + struct prb_desc *d = to_desc(desc_ring, id);
> + bool is_final;
> +
> + while (!atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
> + prev_state_val | DESC_FINAL_MASK)) {
> +
> + if (get_desc_state(id, prev_state_val, &is_final) != desc_reserved)
> + break;
> +
> + if (is_final)
> + break;
> + }
Do we need the loop and the check whether it succeeded?
When could the cmpxchg fail?
+ It is still reserved or reserved again => it should get
finalized by the one who reserved it. This is related to
the problem reported/discussed in the other thread,
see https://lore.kernel.org/r/[email protected]
+ It is already in the final (committed) state => success
+ It is in reusable state => already done in the past
+ It is already reused => already done in the past
Do I miss any scenario?
IMHO, the following simple call should be enough.
atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
prev_state_val | DESC_FINAL_MASK);
Well, I am not sure about the "relaxed" variant again. I would feel
more comfortable if it were a full barrier, especially because it
might be done by another CPU than the one that did the commit. And
we need to avoid the race where nobody finalizes the record.
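The retry scenarios being debated above can be modeled in a single-threaded userspace sketch. Here a plain variable plus a fake `try_cmpxchg()` stands in for `atomic_long_try_cmpxchg_relaxed()`, and the flag bit positions are illustrative assumptions; nothing about memory ordering is modeled, only which states cause a retry versus an early exit.

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative flag positions; not the kernel's actual masks. */
#define DESC_COMMIT_MASK (1UL << 62)
#define DESC_FINAL_MASK  (1UL << 61)

/* Fake try_cmpxchg: on failure, *expect is updated to the current
 * value, just like atomic_long_try_cmpxchg(). */
static bool try_cmpxchg(unsigned long *var, unsigned long *expect,
			unsigned long newval)
{
	if (*var == *expect) {
		*var = newval;
		return true;
	}
	*expect = *var;
	return false;
}

/* First assume the common commit+!final state. On failure, stop if
 * FINAL is already set or the id no longer matches (the descriptor
 * was recycled, so finalization happened in the past); otherwise the
 * record is still reserved and FINAL is OR-ed in on the retry. */
static void finalize(unsigned long *state_var, unsigned long id)
{
	unsigned long prev = id | DESC_COMMIT_MASK;

	while (!try_cmpxchg(state_var, &prev, prev | DESC_FINAL_MASK)) {
		if (prev & DESC_FINAL_MASK)
			break;
		if ((prev & ~(DESC_COMMIT_MASK | DESC_FINAL_MASK)) != id)
			break;
	}
}
```

In this model the committed record gets FINAL added on the first try, a still-reserved record gets FINAL added on the retry (without touching COMMIT), and an already-finalized record is left untouched.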
> +static void desc_finalize(struct prb_desc_ring *desc_ring, unsigned long id)
> +{
> +}
> +
> /**
> * prb_reserve() - Reserve space in the ringbuffer.
> *
> @@ -1218,22 +1500,14 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
> return false;
> }
>
> -/**
> - * prb_commit() - Commit (previously reserved) data to the ringbuffer.
> - *
> - * @e: The entry containing the reserved data information.
> - *
> - * This is the public function available to writers to commit data.
> - *
> - * Context: Any context. Enables local interrupts.
> - */
> -void prb_commit(struct prb_reserved_entry *e)
> +/* Commit the data (possibly finalizing it) and restore interrupts. */
> +static void _prb_commit(struct prb_reserved_entry *e, unsigned long final_mask)
> {
> struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
> struct prb_desc *d = to_desc(desc_ring, e->id);
> unsigned long prev_state_val = e->id | 0;
>
> - /* Now the writer has finished all writing: LMM(prb_commit:A) */
> + /* Now the writer has finished all writing: LMM(_prb_commit:A) */
>
> /*
> * Set the descriptor as committed. See "ABA Issues" about why
> @@ -1244,14 +1518,66 @@ void prb_commit(struct prb_reserved_entry *e)
> * this. This pairs with desc_read:B.
> */
> if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
> - e->id | DESC_COMMIT_MASK)) { /* LMM(prb_commit:B) */
> - WARN_ON_ONCE(1);
> + e->id | DESC_COMMIT_MASK |
> + final_mask)) { /* LMM(_prb_commit:B) */
> + /*
> + * This reserved descriptor must have been finalized already.
> + * Retry with a reserved+final expected value.
> + */
> + prev_state_val = e->id | 0 | DESC_FINAL_MASK;
This does not make sense to me. The state "e->id | 0 | DESC_FINAL_MASK"
must never happen. It would mean that someone finalized a
record that is still being modified.
Or we both have a different understanding of the logic.
Well, there are actually two approaches:
+ I originally expected that the FINAL bit could be set only when
the COMMIT bit is set. But this brings the problem that prb_commit()
would need to set FINAL when it is no longer the last descriptor.
+ Another approach is that the FINAL bit could be set even when
COMMIT is not set. It would always be set by the next
prb_reserve(). But it causes more possible
combinations of the COMMIT and FINAL bits. As a result, the caller
would need to try more variants of the cmpxchg() calls. And
it creates more races/cycles, ...
In this case, a better name for the bit would be NON_REOPENABLE
or something like this.
I guess that you wanted to implement the 2nd approach and ended in
many troubles. I wonder if the 1st approach might be easier.
> + if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
> + e->id | DESC_COMMIT_MASK |
> + DESC_FINAL_MASK)) { /* LMM(_prb_commit:C) */
> + WARN_ON_ONCE(1);
> + }
> }
>
> /* Restore interrupts, the reserve/commit window is finished. */
> local_irq_restore(e->irqflags);
> }
>
Best Regards,
Petr
Hi Petr,
Thanks for the review. Most of your suggested changes are fine and I'll
integrate them for the next version. Here a few comments on some open
issues.
On 2020-08-27, Petr Mladek <[email protected]> wrote:
>> +static struct prb_desc *desc_reopen_last(struct prb_desc_ring *desc_ring,
>> + unsigned long *id_out)
>> +{
>> + unsigned long prev_state_val;
>> + struct prb_desc *d;
>> + unsigned long id;
>> +
>> + id = atomic_long_read(&desc_ring->head_id);
>> +
>> + d = to_desc(desc_ring, id);
>> +
>> + prev_state_val = id | DESC_COMMIT_MASK;
>> + if (!atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val, id | 0))
>> + return NULL;
>
> This might deserve a comment explaining why no barrier is needed here.
> I guess that you did not add it because the reader API allows
> reading data only when DESC_FINAL_MASK is set.
>
> But this assumption looks dangerous to me. As I already wrote above,
> we might later need to add an API to read the already committed
> parts of continuous lines.
Supporting such an API will require more than just a memory barrier
here. When a record is re-opened, it actually transitions back to a
reserved state. At that point there is no difference between a record
being extended and one that is reserved for the first time.
Perhaps my idea of chaining blocks [0] might be more appropriate. Then
there are discrete committed pieces that can be read if needed.
Either way, such a partial-read feature will require a new memory
barrier pair so a reader would see the state change before any data was
modified. As the code is now (without such a feature), a memory barrier
is not needed.
>> +static void desc_finalize(struct prb_desc_ring *desc_ring, unsigned long id)
>> +{
>> + unsigned long prev_state_val = id | DESC_COMMIT_MASK;
>> + struct prb_desc *d = to_desc(desc_ring, id);
>> + bool is_final;
>> +
>> + while (!atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
>> + prev_state_val | DESC_FINAL_MASK)) {
>> +
>> + if (get_desc_state(id, prev_state_val, &is_final) != desc_reserved)
>> + break;
>> +
>> + if (is_final)
>> + break;
>> + }
>
> Do we need the loop and the check whether it succeeded?
The initial cmpxchg() assumes the previous descriptor is in the
commit+!final state (the common case). But, for example, if it was in the
reserve+final state, the cmpxchg() would fail and the finalization is
already successful.
> When could the cmpxchg fail?
>
> + It is still reserved or reserved again => it should get
> finalized by the one who reserved it. This is related to
> the problem reported/discussed in the other thread,
> see https://lore.kernel.org/r/[email protected]
>
> + It is already in the final (committed) state => success
>
> + It is in reusable state => already done in the past
>
> + It is already reused => already done in the past
+ It is being reserved but is still uninitialized (state_var = 0).
+ It is being reserved but is not yet set (id is one wrap behind).
> IMHO, the following simple call should be enough.
>
> atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
> prev_state_val | DESC_FINAL_MASK);
This works if we define that the FINAL flag can only be set when the
COMMIT flag is set. (More on this below.)
> Well, I am not sure about the "relaxed" variant again. I would feel
> more comfortable if it were a full barrier, especially because it
> might be done by another CPU than the one that did the commit. And
> we need to avoid the race where nobody finalizes the record.
The FINAL flag has nothing to do with whether or not the data is
consistent. Its only use is to prevent a writer from transitioning a
descriptor back to reserved via cmpxchg(). What would a memory barrier
here be pairing with?
>> +static void _prb_commit(struct prb_reserved_entry *e, unsigned long final_mask)
>> {
>> struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
>> struct prb_desc *d = to_desc(desc_ring, e->id);
>> unsigned long prev_state_val = e->id | 0;
>>
>> - /* Now the writer has finished all writing: LMM(prb_commit:A) */
>> + /* Now the writer has finished all writing: LMM(_prb_commit:A) */
>>
>> /*
>> * Set the descriptor as committed. See "ABA Issues" about why
>> @@ -1244,14 +1518,66 @@ void prb_commit(struct prb_reserved_entry *e)
>> * this. This pairs with desc_read:B.
>> */
>> if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
>> - e->id | DESC_COMMIT_MASK)) { /* LMM(prb_commit:B) */
>> - WARN_ON_ONCE(1);
>> + e->id | DESC_COMMIT_MASK |
>> + final_mask)) { /* LMM(_prb_commit:B) */
>> + /*
>> + * This reserved descriptor must have been finalized already.
>> + * Retry with a reserved+final expected value.
>> + */
>> + prev_state_val = e->id | 0 | DESC_FINAL_MASK;
>
> This does not make sense to me. The state "e->id | 0 | DESC_FINAL_MASK"
> must never happen. It would mean that someone finalized a
> record that is still being modified.
Correct. Setting the FINAL flag means the descriptor cannot be
_reopened_. It has nothing to do with the current state of the
descriptor. Once the FINAL flag is set, it remains set for the remaining
lifetime of that record.
> Or we both have a different understanding of the logic.
Yes.
> Well, there are actually two approaches:
>
> + I originally expected that the FINAL bit could be set only when
> the COMMIT bit is set. But this brings the problem that prb_commit()
> would need to set FINAL when it is no longer the last descriptor.
My first attempt was to implement this. It turned out complex because it
involves descriptors finalizing themselves _and_ descriptors finalizing
their predecessor. This required two new memory barrier pairs:
- between a writer committing and re-checking the head_id that another
writer may have modified
- between a writer setting the state and another writer checking that
state
After re-evaluating the purpose of the FINAL flag, I decided that it
would be simpler to implement the 2nd approach (below) and would not
require any new memory barrier pairs.
> + Another approach is that the FINAL bit could be set even when
> COMMIT is not set. It would always be set by the next
> prb_reserve(). But it causes more possible
> combinations of the COMMIT and FINAL bits. As a result, the caller
> would need to try more variants of the cmpxchg() calls. And
> it creates more races/cycles, ...
It does not cause more races. And I don't see where it will cause more
cmpxchg() calls. It probably _does_ lead to more cmpxchg() _code_. But
those are fallbacks for when the common case fails.
> I guess that you wanted to implement the 2nd approach and ended in
> many troubles. I wonder if the 1st approach might be easier.
Well, the "many troubles" were due to my naive assumption about the
previous descriptor state. Once I realized that, the missing piece was
obvious.
I will reconsider the first approach. Perhaps adding memory barriers is
preferable if it reduces lines of code.
And we will need to clarify partial continuous line reading because
right now that will not work.
John Ogness
[0] https://lkml.kernel.org/r/[email protected]
On Thu 2020-08-27 12:04:58, John Ogness wrote:
> On 2020-08-26, Petr Mladek <[email protected]> wrote:
> >> This series makes a very naive assumption that the previous
> >> descriptor is either in the reserved or committed queried states. The
> >> fact is, it can be in any of the 4 queried states. Adding support for
> >> finalization of all the states then gets quite complex, since any
> >> state transition (cmpxchg) may have to deal with an unexpected FINAL
> >> flag.
> >
> > It has to be done in two steps to avoid race:
> >
> > prb_commit()
> >
> > + set PRB_COMMIT_MASK
> > + check if it is still the last descriptor in the array
> > + set PRB_FINAL_MASK when it is not the last descriptor
> >
> > It should work because prb_reserve() finalizes the previous
> > descriptor after the new one is reserved. As a result:
> >
> > + prb_reserve() should either see PRB_COMMIT_MASK in the previous
> > descriptor and be able to finalize it.
> >
> > + or prb_commit() will see that the head moved and it is not
> > longer the last reserved one.
>
> I do not like the idea of relying on descriptors to finalize
> themselves. I worry that there might be some hole there. Failing to
> finalize basically disables printk, so that is pretty serious.
>
> Below is a patch against this series that adds support for finalizing
> all 4 queried states. It passes all my tests. Note that the code handles
> 2 corner cases:
>
> 1. When seq is 0, there is no previous descriptor to finalize. This
> exception is important because we don't want to finalize the -1
> placeholder. Otherwise, upon the first wrap, a descriptor will be
> prematurely finalized.
>
> 2. When a previous descriptor is being reserved for the first time, it
> might have a state_var value of 0 because the writer is still in
> prb_reserve() and has not set the initial value yet. I added
> considerable comments on this special case.
>
> I am comfortable with adding this new code, although it clearly adds
> complexity.
>
> John Ogness
>
> diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
> index 90d48973ac9e..1ed1e9eb930f 100644
> --- a/kernel/printk/printk_ringbuffer.c
> +++ b/kernel/printk/printk_ringbuffer.c
> @@ -967,10 +976,25 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
> * any other changes. A write memory barrier is sufficient for this.
> * This pairs with desc_read:D.
> */
> - if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
> - id | 0)) { /* LMM(desc_reserve:F) */
> - WARN_ON_ONCE(1);
> - return false;
> + if (is_final)
> + state_val = id | 0 | DESC_FINAL_MASK;
> + else
> + state_val = id | 0;
> + if (atomic_long_cmpxchg(&desc->state_var, prev_state_val,
> + state_val) != prev_state_val) { /* LMM(desc_reserve:F) */
> + /*
> + * This reusable descriptor must have been finalized already.
> + * Retry with a reusable+final expected value.
> + */
> + prev_state_val |= DESC_FINAL_MASK;
> + state_val |= DESC_FINAL_MASK;
> +
> + if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
> + state_val)) { /* LMM(desc_reserve:FIXME) */
> +
> + WARN_ON_ONCE(1);
> + return false;
IMHO, this needs to be done in a loop until it succeeds. The previous
descriptor might get reopened or committed several times in the meantime.
I have played with the code from the original patchset in parallel
and came up with an alternative solution that uses DESC_FINAL_MASK
as I originally thought. See below.

I still have to wrap my head around both approaches.

Anyway, here is my code against the original patchset:
From 578a9ad6bdb536269cc03d7a65420a414ac35022 Mon Sep 17 00:00:00 2001
From: Petr Mladek <[email protected]>
Date: Thu, 27 Aug 2020 16:38:03 +0200
Subject: [PATCH] printk: Alternative solution for handling non-finalized
descriptors
The original patchset allowed setting PRB_FINAL_MASK even when
PRB_COMMIT_MASK was not set. It was used in prb_reserve() to make
sure that the previous record always gets finalized.

As a result, there are more possible combinations of the bits, and
more cmpxchg() calls have to be attempted with the different
combinations of bits.

This is easy in prb_commit(). It needs to set PRB_COMMIT_MASK, and
PRB_FINAL_MASK can never be removed, so it is enough to try both
possibilities in the right order.

But it gets more problematic in prb_reserve() because the previous
descriptor might get repeatedly re-reserved. It would need to try
both combinations in a loop until one succeeds. That would work in
practice because the data block cannot get extended indefinitely,
but it makes the logic harder to understand.

The alternative solution is to allow setting PRB_FINAL_MASK only
when PRB_COMMIT_MASK is already set. This might leave a descriptor
non-finalized when PRB_COMMIT_MASK was not yet set at the time the
next descriptor was reserved and nobody later called
prb_commit_finalize().

The solution is to set PRB_FINAL_MASK also from prb_commit() when
the next descriptor has already been reused. The only requirement
is to set PRB_COMMIT_MASK before checking the next descriptor, to
avoid a race.

In other words, PRB_FINAL_MASK can be set from two locations:

+ prb_reserve() must not set it when PRB_COMMIT_MASK has not been
  set yet. But then prb_commit() must be called, and it must see
  that the next descriptor has been reused.

+ prb_commit() must check whether the next descriptor has already
  been reused and set PRB_FINAL_MASK in case it has not already
  been done by prb_reserve().

All new cmpxchg() calls are full barriers, to be on the safe side.
I believe some of the ordering is necessary but I did not have time
to describe it. We can discuss it later if this approach is taken.
Signed-off-by: Petr Mladek <[email protected]>
---
kernel/printk/printk.c | 6 +-
kernel/printk/printk_ringbuffer.c | 128 +++++++++++++++++-------------
kernel/printk/printk_ringbuffer.h | 2 +-
3 files changed, 79 insertions(+), 57 deletions(-)
diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
index 80afee3cfec7..01c034ddb118 100644
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -535,7 +535,7 @@ static int log_store(u32 caller_id, int facility, int level,
if ((flags & LOG_CONT) || !(flags & LOG_NEWLINE))
prb_commit(&e);
else
- prb_commit_finalize(&e);
+ prb_final_commit(&e);
return (text_len + trunc_msg_len);
}
@@ -1091,7 +1091,7 @@ static unsigned int __init add_to_rb(struct printk_ringbuffer *rb,
dest_r.info->ts_nsec = r->info->ts_nsec;
dest_r.info->caller_id = r->info->caller_id;
- prb_commit_finalize(&e);
+ prb_final_commit(&e);
return prb_record_text_space(&e);
}
@@ -1905,7 +1905,7 @@ static size_t log_output(int facility, int level, enum log_flags lflags, const c
r.info->text_len += text_len;
if (lflags & LOG_NEWLINE) {
r.info->flags |= LOG_NEWLINE;
- prb_commit_finalize(&e);
+ prb_final_commit(&e);
} else {
prb_commit(&e);
}
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index 90d48973ac9e..e674d09ea27c 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -1351,25 +1351,58 @@ bool prb_reserve_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb
}
/*
- * Attempt to finalize a specified descriptor. Finalization only happens if
- * the descriptor is in the !final or commit+!final state, both of which
- * yield a state query result of desc_reserved.
+ * Attempt to finalize a specified descriptor. It is possible only
+ * when it has DESC_COMMIT_MASK set.
*/
static void desc_finalize(struct prb_desc_ring *desc_ring, unsigned long id)
{
unsigned long prev_state_val = id | DESC_COMMIT_MASK;
struct prb_desc *d = to_desc(desc_ring, id);
- bool is_final;
- while (!atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
- prev_state_val | DESC_FINAL_MASK)) {
+ /*
+ * Failure is perfectly fine. It might happen when the descriptor
+ * is in the following states:
+ *
+ * + reserved: will be finalized in the related prb_commit()
+ * + committed: already finalized
+ * + reusable: already finalized and invalidated
+ * + misc: already finalized and reused again
+ */
+ atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
+ prev_state_val | DESC_FINAL_MASK);
- if (get_desc_state(id, prev_state_val, &is_final) != desc_reserved)
- break;
+}
- if (is_final)
- break;
- }
+/*
+ * Finalize the descriptor when it is not the last reserved one.
+ */
+static void desc_check_and_finalize(struct prb_desc_ring *desc_ring, unsigned long id)
+{
+ unsigned long next_prev_id = DESC_ID_PREV_WRAP(desc_ring, id + 1);
+ struct prb_desc *next_prev_d = to_desc(desc_ring, next_prev_id);
+ unsigned long next_prev_state_val;
+ enum desc_state next_prev_d_state;
+
+ /*
+ * reserved, committed, and reusable states mean that the next descriptor
+ * is still used for older data from the previous wrap.
+ *
+ * Note that it is important to check the id from the previous wrap. It is
+ * the only way to be sure that the next descriptor has not been reused yet.
+ * Otherwise, the desc ring might have already wrapped many times.
+ */
+ next_prev_state_val = atomic_long_read(&next_prev_d->state_var);
+ next_prev_d_state = get_desc_state(next_prev_id, next_prev_state_val, NULL);
+ if (next_prev_d_state != desc_miss)
+ return;
+
+
+ /*
+ * desc_miss means that the next descriptor is already used for newer data
+ * and this descriptor has to be finalized. This is needed to support
+ * continuation lines even when there is a missing newline.
+ */
+ desc_finalize(desc_ring, id);
}
/**
@@ -1432,10 +1465,8 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
}
/*
- * New data is about to be reserved. Once that happens, previous
- * descriptors are no longer able to be extended. Finalize the
- * previous descriptor now so that it can be made available to
- * readers (when committed).
+ * Try to finalize the previous descriptor. It is needed to make sure
+ * that it will be finalized even when there is a missing newline.
*/
desc_finalize(desc_ring, DESC_ID(id - 1));
@@ -1501,13 +1532,13 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
}
/* Commit the data (possibly finalizing it) and restore interrupts. */
-static void _prb_commit(struct prb_reserved_entry *e, unsigned long final_mask)
+void prb_commit(struct prb_reserved_entry *e)
{
struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
struct prb_desc *d = to_desc(desc_ring, e->id);
unsigned long prev_state_val = e->id | 0;
- /* Now the writer has finished all writing: LMM(_prb_commit:A) */
+ /* Now the writer has finished all writing: LMM(prb_commit:A) */
/*
* Set the descriptor as committed. See "ABA Issues" about why
@@ -1518,46 +1549,14 @@ static void _prb_commit(struct prb_reserved_entry *e, unsigned long final_mask)
* this. This pairs with desc_read:B.
*/
if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
- e->id | DESC_COMMIT_MASK |
- final_mask)) { /* LMM(_prb_commit:B) */
- /*
- * This reserved descriptor must have been finalized already.
- * Retry with a reserved+final expected value.
- */
- prev_state_val = e->id | 0 | DESC_FINAL_MASK;
-
- if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
- e->id | DESC_COMMIT_MASK |
- DESC_FINAL_MASK)) { /* LMM(_prb_commit:C) */
- WARN_ON_ONCE(1);
- }
+ e->id | DESC_COMMIT_MASK)) { /* LMM(_prb_commit:B) */
+ WARN_ON_ONCE(1);
}
/* Restore interrupts, the reserve/commit window is finished. */
local_irq_restore(e->irqflags);
}
-/**
- * prb_commit() - Commit (previously reserved) data to the ringbuffer.
- *
- * @e: The entry containing the reserved data information.
- *
- * This is the public function available to writers to commit data.
- *
- * Note that the data is not yet available to readers until it is finalized.
- * Finalizing happens automatically when space for the next record is
- * reserved.
- *
- * See prb_commit_finalize() for a version of this function that finalizes
- * immediately.
- *
- * Context: Any context. Enables local interrupts.
- */
-void prb_commit(struct prb_reserved_entry *e)
-{
- _prb_commit(e, 0);
-}
-
/**
* prb_commit_finalize() - Commit and finalize (previously reserved) data to
* the ringbuffer.
@@ -1573,9 +1572,32 @@ void prb_commit(struct prb_reserved_entry *e)
*
* Context: Any context. Enables local interrupts.
*/
-void prb_commit_finalize(struct prb_reserved_entry *e)
+void prb_final_commit(struct prb_reserved_entry *e)
{
- _prb_commit(e, DESC_FINAL_MASK);
+ struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
+ struct prb_desc *d = to_desc(desc_ring, e->id);
+ unsigned long prev_state_val = e->id | 0;
+
+ /* Now the writer has finished all writing: LMM(prb_commit:A) */
+
+ /*
+ * Set the descriptor as committed and final in a single step.
+ * See "ABA Issues" about why cmpxchg() instead of set() is used.
+ *
+ * Guarantee all record data is stored before the descriptor state
+ * is stored as committed. A write memory barrier is sufficient for
+ * this. This pairs with desc_read:B.
+ */
+ if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
+ e->id | DESC_COMMIT_MASK | DESC_FINAL_MASK)) { /* LMM(prb_final_commit:B) */
+ WARN_ON_ONCE(1);
+ }
+
+
+ desc_check_and_finalize(desc_ring, e->id);
+
+ /* Restore interrupts, the reserve/commit window is finished. */
+ local_irq_restore(e->irqflags);
}
/*
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
index 8ed1f1f154ec..6fb486ab2bde 100644
--- a/kernel/printk/printk_ringbuffer.h
+++ b/kernel/printk/printk_ringbuffer.h
@@ -322,7 +322,7 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
bool prb_reserve_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
struct printk_record *r, u32 caller_id);
void prb_commit(struct prb_reserved_entry *e);
-void prb_commit_finalize(struct prb_reserved_entry *e);
+void prb_final_commit(struct prb_reserved_entry *e);
void prb_init(struct printk_ringbuffer *rb,
char *text_buf, unsigned int text_buf_size,
--
2.26.2
On Thu 2020-08-27 12:04:58, John Ogness wrote:
> On 2020-08-26, Petr Mladek <[email protected]> wrote:
> >> This series makes a very naive assumption that the previous
> >> descriptor is either in the reserved or committed queried states. The
> >> fact is, it can be in any of the 4 queried states. Adding support for
> >> finalization of all the states then gets quite complex, since any
> >> state transition (cmpxchg) may have to deal with an unexpected FINAL
> >> flag.
> >
> > It has to be done in two steps to avoid race:
> >
> > prb_commit()
> >
> > + set PRB_COMMIT_MASK
> > + check if it is still the last descriptor in the array
> > + set PRB_FINAL_MASK when it is not the last descriptor
> >
> > It should work because prb_reserve() finalizes the previous
> > descriptor after the new one is reserved. As a result:
> >
> > + prb_reserve() should either see PRB_COMMIT_MASK in the previous
> > descriptor and be able to finalize it.
> >
> > + or prb_commit() will see that the head moved and it is not
> > longer the last reserved one.
>
> I do not like the idea of relying on descriptors to finalize
> themselves. I worry that there might be some hole there. Failing to
> finalize basically disables printk, so that is pretty serious.
>
> Below is a patch against this series that adds support for finalizing
> all 4 queried states. It passes all my tests. Note that the code handles
> 2 corner cases:
>
> 1. When seq is 0, there is no previous descriptor to finalize. This
> exception is important because we don't want to finalize the -1
> placeholder. Otherwise, upon the first wrap, a descriptor will be
> prematurely finalized.
>
> 2. When a previous descriptor is being reserved for the first time, it
> might have a state_var value of 0 because the writer is still in
> prb_reserve() and has not set the initial value yet. I added
> considerable comments on this special case.
>
> I am comfortable with adding this new code, although it clearly adds
> complexity.
>
> John Ogness
>
> diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
> index 90d48973ac9e..1ed1e9eb930f 100644
> --- a/kernel/printk/printk_ringbuffer.c
> +++ b/kernel/printk/printk_ringbuffer.c
> @@ -860,9 +860,11 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
> struct prb_desc_ring *desc_ring = &rb->desc_ring;
> unsigned long prev_state_val;
> unsigned long id_prev_wrap;
> + unsigned long state_val;
> struct prb_desc *desc;
> unsigned long head_id;
> unsigned long id;
> + bool is_final;
>
> head_id = atomic_long_read(&desc_ring->head_id); /* LMM(desc_reserve:A) */
>
> @@ -953,10 +955,17 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
> * See "ABA Issues" about why this verification is performed.
> */
> prev_state_val = atomic_long_read(&desc->state_var); /* LMM(desc_reserve:E) */
> - if (prev_state_val &&
> - get_desc_state(id_prev_wrap, prev_state_val, NULL) != desc_reusable) {
> - WARN_ON_ONCE(1);
> - return false;
> + if (get_desc_state(id_prev_wrap, prev_state_val, &is_final) != desc_reusable) {
> + /*
> + * If this descriptor has never been used, @prev_state_val
> + * will be 0. However, even though it may have never been
> + * used, it may have been finalized. So that flag must be
> + * ignored.
> + */
> + if ((prev_state_val & ~DESC_FINAL_MASK)) {
> + WARN_ON_ONCE(1);
> + return false;
> + }
> }
>
> /*
> @@ -967,10 +976,25 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
> * any other changes. A write memory barrier is sufficient for this.
> * This pairs with desc_read:D.
> */
> - if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
> - id | 0)) { /* LMM(desc_reserve:F) */
> - WARN_ON_ONCE(1);
> - return false;
> + if (is_final)
> + state_val = id | 0 | DESC_FINAL_MASK;
The state from the previous wrap always has to have DESC_FINAL_MASK set.
Am I missing something, please?
> + else
> + state_val = id | 0;
> + if (atomic_long_cmpxchg(&desc->state_var, prev_state_val,
> + state_val) != prev_state_val) { /* LMM(desc_reserve:F) */
> + /*
> + * This reusable descriptor must have been finalized already.
> + * Retry with a reusable+final expected value.
> + */
> + prev_state_val |= DESC_FINAL_MASK;
> + state_val |= DESC_FINAL_MASK;
> +
> + if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
> + state_val)) { /* LMM(desc_reserve:FIXME) */
> +
> + WARN_ON_ONCE(1);
> + return false;
> + }
> }
>
> /* Now data in @desc can be modified: LMM(desc_reserve:G) */
> @@ -1364,9 +1388,37 @@ static void desc_finalize(struct prb_desc_ring *desc_ring, unsigned long id)
> while (!atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
> prev_state_val | DESC_FINAL_MASK)) {
>
> - if (get_desc_state(id, prev_state_val, &is_final) != desc_reserved)
> + switch (get_desc_state(id, prev_state_val, &is_final)) {
> + case desc_miss:
> + /*
> + * If the ID is exactly 1 wrap behind the expected, it is
> + * in the process of being reserved by another writer and
> + * must be considered reserved.
> + */
> + if (get_desc_state(DESC_ID_PREV_WRAP(desc_ring, id),
> + prev_state_val, &is_final) != desc_reusable) {
> + /*
> + * If this descriptor has never been used, @prev_state_val
> + * will be 0. However, even though it may have never been
> + * used, it may have been finalized. So that flag must be
> + * ignored.
> + */
> + if ((prev_state_val & ~DESC_FINAL_MASK)) {
> + WARN_ON_ONCE(1);
> + return;
> + }
> + }
> + fallthrough;
How is this supposed to work, please?

If the ID is exactly one wrap behind, it is being reserved but the new
id has not been written yet. In this case, DESC_FINAL_MASK is already set.
The above cmpxchg will not make any change, and prb_reserve() will not
be able to distinguish that it has been finalized.

A solution would be to write the new id here. But then it would be
necessary to detect this in prb_reserve().
It is possible that it already works this way. But I do not see it.
I haven't had a coffee yet.
I am really not sure which solution is better. Mine has more barriers,
but this one brings many new combinations that need to be checked and
handled. Sigh.
Best Regards,
Petr
> + case desc_reserved:
> + case desc_reusable:
> + /* finalizable, try again */
> break;
> + case desc_committed:
> + /* already finalized */
> + return;
> + }
>
> + /* already finalized? */
> if (is_final)
> break;
> }
On 2020-08-28, Petr Mladek <[email protected]> wrote:
>> Below is a patch against this series that adds support for finalizing
>> all 4 queried states. It passes all my tests. Note that the code handles
>> 2 corner cases:
>>
>> 1. When seq is 0, there is no previous descriptor to finalize. This
>> exception is important because we don't want to finalize the -1
>> placeholder. Otherwise, upon the first wrap, a descriptor will be
>> prematurely finalized.
>>
>> 2. When a previous descriptor is being reserved for the first time, it
>> might have a state_var value of 0 because the writer is still in
>> prb_reserve() and has not set the initial value yet. I added
>> considerable comments on this special case.
>>
>> I am comfortable with adding this new code, although it clearly adds
>> complexity.
>>
>> John Ogness
>>
>> diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
>> index 90d48973ac9e..1ed1e9eb930f 100644
>> --- a/kernel/printk/printk_ringbuffer.c
>> +++ b/kernel/printk/printk_ringbuffer.c
>> @@ -860,9 +860,11 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
>> struct prb_desc_ring *desc_ring = &rb->desc_ring;
>> unsigned long prev_state_val;
>> unsigned long id_prev_wrap;
>> + unsigned long state_val;
>> struct prb_desc *desc;
>> unsigned long head_id;
>> unsigned long id;
>> + bool is_final;
>>
>> head_id = atomic_long_read(&desc_ring->head_id); /* LMM(desc_reserve:A) */
>>
>> @@ -953,10 +955,17 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
>> * See "ABA Issues" about why this verification is performed.
>> */
>> prev_state_val = atomic_long_read(&desc->state_var); /* LMM(desc_reserve:E) */
>> - if (prev_state_val &&
>> - get_desc_state(id_prev_wrap, prev_state_val, NULL) != desc_reusable) {
>> - WARN_ON_ONCE(1);
>> - return false;
>> + if (get_desc_state(id_prev_wrap, prev_state_val, &is_final) != desc_reusable) {
>> + /*
>> + * If this descriptor has never been used, @prev_state_val
>> + * will be 0. However, even though it may have never been
>> + * used, it may have been finalized. So that flag must be
>> + * ignored.
>> + */
>> + if ((prev_state_val & ~DESC_FINAL_MASK)) {
>> + WARN_ON_ONCE(1);
>> + return false;
>> + }
>> }
>>
>> /*
>> @@ -967,10 +976,25 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
>> * any other changes. A write memory barrier is sufficient for this.
>> * This pairs with desc_read:D.
>> */
>> - if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
>> - id | 0)) { /* LMM(desc_reserve:F) */
>> - WARN_ON_ONCE(1);
>> - return false;
>> + if (is_final)
>> + state_val = id | 0 | DESC_FINAL_MASK;
>
> The state from the previous wrap always have to have DESC_FINAL_MASK set.
> Do I miss something, please?
Important: FINAL is not a _state_. It is a _flag_ that marks a
descriptor as non-reopenable. This was a simple change because it does
not affect any state logic. The number of states and possible
transitions have not changed.
When a descriptor transitions to reusable, the FINAL flag is cleared. It
has reached the end of its lifecycle. See desc_make_reusable().
(In order to have transitioned to reusable, the FINAL and COMMIT flags
must have been set.)
In the case of desc_reserve(), a reusable descriptor is transitioning to
reserved. When this transition happens, there may already be a later
descriptor that has been reserved and finalized this descriptor. If the
FINAL flag is set here, it means that the FINAL flag is set for the
_new_ descriptor being reserved.
In summary, the FINAL flag can be set in _any_ state. Once set, it is
preserved for all further state transitions. And it is cleared when that
descriptor becomes reusable.
>> + else
>> + state_val = id | 0;
>> + if (atomic_long_cmpxchg(&desc->state_var, prev_state_val,
>> + state_val) != prev_state_val) { /* LMM(desc_reserve:F) */
>> + /*
>> + * This reusable descriptor must have been finalized already.
>> + * Retry with a reusable+final expected value.
>> + */
>> + prev_state_val |= DESC_FINAL_MASK;
>> + state_val |= DESC_FINAL_MASK;
>> +
>> + if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
>> + state_val)) { /* LMM(desc_reserve:FIXME) */
>> +
>> + WARN_ON_ONCE(1);
>> + return false;
>> + }
>> }
>>
>> /* Now data in @desc can be modified: LMM(desc_reserve:G) */
>> @@ -1364,9 +1388,37 @@ static void desc_finalize(struct prb_desc_ring *desc_ring, unsigned long id)
>> while (!atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
>> prev_state_val | DESC_FINAL_MASK)) {
>>
>> - if (get_desc_state(id, prev_state_val, &is_final) != desc_reserved)
>> + switch (get_desc_state(id, prev_state_val, &is_final)) {
>> + case desc_miss:
>> + /*
>> + * If the ID is exactly 1 wrap behind the expected, it is
>> + * in the process of being reserved by another writer and
>> + * must be considered reserved.
>> + */
>> + if (get_desc_state(DESC_ID_PREV_WRAP(desc_ring, id),
>> + prev_state_val, &is_final) != desc_reusable) {
>> + /*
>> + * If this descriptor has never been used, @prev_state_val
>> + * will be 0. However, even though it may have never been
>> + * used, it may have been finalized. So that flag must be
>> + * ignored.
>> + */
>> + if ((prev_state_val & ~DESC_FINAL_MASK)) {
>> + WARN_ON_ONCE(1);
>> + return;
>> + }
>> + }
>> + fallthrough;
>
> How is this supposed to work please?
>
> If the ID is exactly one wrap behind, it is being reserved but the new
> id was not written yet. In this case, DESC_FINAL_MASK is already set.
No. If it is exactly one wrap behind, it is still in the reusable
state. The FINAL flag will not be set because it is cleared when
transitioning to reusable.
> The above cmpxchg will not do any change. And prb_reserve() will not
> be able to distinguish that it has been finalized.
The cmpxchg() will try again using the newly updated @prev_state_val and
try to set it to "prev_state_val | FINAL".
Now, theoretically, a writer could commit and reopen the descriptor with
such timing that this cmpxchg() will always fail. A kind of cat and
mouse, always trying to set the FINAL flag for the last @state_var
value.
That game could be avoided if the descriptor noticed that it is no
longer the head ID and set its own FINAL flag. I like the idea of a
guaranteed finalizing of the previous descriptor and the ability for a
descriptor to finalize itself.
(Although really, if we start talking about timed cmpxchg() attacks,
almost any cmpxchg loop can become near-infinite.)
> I am really not sure what solution is better. Mine have more barriers.
> But this brings many new combinations that need to be checked and
> handled.
I am putting together a fully functional version of your solution with
the appropriate memory barriers and correct handling of the special
cases so that we can get a better look at the difference.
John Ogness