Hi,
While testing various flushing scenarios, I stumbled on a
couple issues that cause console flushing to fail. While
discussing the v1 [0] series, a couple more issues arose.
This series addresses all the issues:
1. The prb_next_seq() optimization caused inconsistent return
values. Fix prb_next_seq() to the originally intended
behavior but keep an optimization.
2. pr_flush() might not wait until the most recently stored
printk() message if non-finalized records precede it. Fix
pr_flush() to wait for all records to print that are at
least reserved at the time of the call.
3. In panic, the panic messages will not print if non-finalized
records precede them. Add a special condition so that
readers on the panic CPU can drop non-finalized records.
4. It is possible (and easy to reproduce) a scenario where the
console on the panic CPU hands over to a waiter of a stopped
CPU. Do not use the handover feature in panic.
5. If messages are being dropped during panic, non-panic CPUs
are silenced. But by then it is already too late and most
likely the panic messages have been dropped. Change the
non-panic CPU silencing logic to restrict non-panic CPUs
from flooding the ringbuffer.
This series also performing some minor cleanups to remove open
coded checks about the panic context and improve documentation
language regarding data-less records.
Because of multiple refactoring done in recent history, it
would be helpful to provide the LTS maintainers with the proper
backported patches. I am happy to do this.
The changes since v1:
- Rename NO_LPOS to EMPTY_LINE_LPOS.
- Add and cleanup documentation to clarify language regarding
data-less records and special lpos values.
- Implement a new prb_next_seq() optimization to preserve the
intended behavior. This is essentially my rfc [1] with
memory barriers added and based on an alternate implemenation
suggested by pmladek [2].
- Introduce new prb_next_reserve_seq() function to return the
sequence number after @head_id.
- Use prb_next_reserve_seq() instead of prb_next_seq() for
pr_flush().
- Implement dropping non-finalized records in panic within
_prb_read_valid() instead of printk_get_next_message(). This
also makes use of the new prb_next_reserve_seq().
- Use the alternate implementation from pmladek [3] to avoid
the handover feature in panic.
- Implement a new strategy to avoid dropping panic messages
when non-panic CPUs are flooding the ringbuffer.
John Ogness
[0] https://lore.kernel.org/lkml/[email protected]
[1] https://lore.kernel.org/lkml/[email protected]
[2] https://lore.kernel.org/lkml/ZTkxOJbDLPy12n41@alley
[3] https://lore.kernel.org/lkml/ZS-r3QnpKzm7UVip@alley
John Ogness (8):
printk: ringbuffer: Do not skip non-finalized records with
prb_next_seq()
printk: ringbuffer: Clarify special lpos values
printk: For @suppress_panic_printk check for other CPU in panic
printk: Add this_cpu_in_panic()
printk: ringbuffer: Cleanup reader terminology
printk: Wait for all reserved records with pr_flush()
printk: Skip non-finalized records in panic
printk: Avoid non-panic CPUs flooding ringbuffer
Petr Mladek (1):
printk: Disable passing console lock owner completely during panic()
kernel/printk/internal.h | 1 +
kernel/printk/printk.c | 108 ++++++----
kernel/printk/printk_ringbuffer.c | 343 +++++++++++++++++++++++++-----
kernel/printk/printk_ringbuffer.h | 21 +-
4 files changed, 382 insertions(+), 91 deletions(-)
base-commit: b4908d68609b57ad1ba4b80bd72c4d2260387e31
--
2.39.2
Currently @suppress_panic_printk is checked along with
non-matching @panic_cpu and current CPU. This works
because @suppress_panic_printk is only set when
panic_in_progress() is true.
Rather than relying on the @suppress_panic_printk semantics,
use the concise helper function other_cpu_in_progress(). The
helper function exists to avoid open coding such tests.
Signed-off-by: John Ogness <[email protected]>
Reviewed-by: Petr Mladek <[email protected]>
---
kernel/printk/printk.c | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
index be95a6851164..22bb45d8f2f2 100644
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -2270,8 +2270,7 @@ asmlinkage int vprintk_emit(int facility, int level,
if (unlikely(suppress_printk))
return 0;
- if (unlikely(suppress_panic_printk) &&
- atomic_read(&panic_cpu) != raw_smp_processor_id())
+ if (unlikely(suppress_panic_printk) && other_cpu_in_panic())
return 0;
if (level == LOGLEVEL_SCHED) {
--
2.39.2
With the lockless ringbuffer, it is allowed that multiple
CPUs/contexts write simultaneously into the buffer. This creates
an ambiguity as some writers will finalize sooner.
The documentation for the prb_read functions is not clear as it
refers to "not yet written" and "no data available". Clarify the
return values and language to be in terms of the reader: records
available for reading.
Signed-off-by: John Ogness <[email protected]>
---
kernel/printk/printk_ringbuffer.c | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index 43d77db435c5..7c121fd68330 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -2013,11 +2013,13 @@ static u64 prb_first_seq(struct printk_ringbuffer *rb)
}
/*
- * Non-blocking read of a record. Updates @seq to the last finalized record
- * (which may have no data available).
+ * Non-blocking read of a record.
*
- * See the description of prb_read_valid() and prb_read_valid_info()
- * for details.
+ * On success @seq is updated to the record that was read and (if provided)
+ * @r and @line_count will contain the read/calculated data.
+ *
+ * On failure @seq is updated to a record that is not yet available to the
+ * reader, but it will be the next record available to the reader.
*/
static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
struct printk_record *r, unsigned int *line_count)
@@ -2036,7 +2038,7 @@ static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
*seq = tail_seq;
} else if (err == -ENOENT) {
- /* Record exists, but no data available. Skip. */
+ /* Record exists, but the data was lost. Skip. */
(*seq)++;
} else {
@@ -2069,7 +2071,7 @@ static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
* On success, the reader must check r->info.seq to see which record was
* actually read. This allows the reader to detect dropped records.
*
- * Failure means @seq refers to a not yet written record.
+ * Failure means @seq refers to a record not yet available to the reader.
*/
bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
struct printk_record *r)
@@ -2099,7 +2101,7 @@ bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
* On success, the reader must check info->seq to see which record meta data
* was actually read. This allows the reader to detect dropped records.
*
- * Failure means @seq refers to a not yet written record.
+ * Failure means @seq refers to a record not yet available to the reader.
*/
bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
struct printk_info *info, unsigned int *line_count)
--
2.39.2
Commit f244b4dc53e5 ("printk: ringbuffer: Improve
prb_next_seq() performance") introduced an optimization for
prb_next_seq() by using best-effort to track recently finalized
records. However, the order of finalization does not
necessarily match the order of the records. The optimization
changed prb_next_seq() to return inconsistent results, possibly
yielding sequence numbers that are not available to readers
because they are preceded by non-finalized records or they are
not yet visible to the reader CPU.
Rather than simply best-effort tracking recently finalized
records, force the committing writer to read records and
increment the last "contiguous block" of finalized records. In
order to do this, the sequence number instead of ID must be
stored because ID's cannot be directly compared.
A new memory barrier pair is introduced to guarantee that a
reader can always read the records up until the sequence number
returned by prb_next_seq() (unless the records have since
been overwritten in the ringbuffer).
This restores the original functionality of prb_next_seq()
while also keeping the optimization.
For 32bit systems, only the lower 32 bits of the sequence
number are stored. When reading the value, it is expanded to
the full 64bit sequence number by folding in the value
returned by prb_first_seq().
Fixes: f244b4dc53e5 ("printk: ringbuffer: Improve prb_next_seq() performance")
Signed-off-by: John Ogness <[email protected]>
---
kernel/printk/printk_ringbuffer.c | 189 ++++++++++++++++++++++++------
kernel/printk/printk_ringbuffer.h | 4 +-
2 files changed, 153 insertions(+), 40 deletions(-)
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index fde338606ce8..94eede5356ac 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -6,6 +6,7 @@
#include <linux/errno.h>
#include <linux/bug.h>
#include "printk_ringbuffer.h"
+#include "internal.h"
/**
* DOC: printk_ringbuffer overview
@@ -303,6 +304,9 @@
*
* desc_push_tail:B / desc_reserve:D
* set descriptor reusable (state), then push descriptor tail (id)
+ *
+ * desc_update_last_finalized:A / desc_last_finalized_seq:A
+ * store finalized record, then set new highest finalized sequence number
*/
#define DATA_SIZE(data_ring) _DATA_SIZE((data_ring)->size_bits)
@@ -1441,20 +1445,144 @@ bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer
return false;
}
+#ifdef CONFIG_64BIT
+
+#define __u64seq_to_ulseq(u64seq) (u64seq)
+#define __ulseq_to_u64seq(ulseq) (ulseq)
+
+#else /* CONFIG_64BIT */
+
+static u64 prb_first_seq(struct printk_ringbuffer *rb);
+
+#define __u64seq_to_ulseq(u64seq) ((u32)u64seq)
+static inline u64 __ulseq_to_u64seq(u32 ulseq)
+{
+ u64 rb_first_seq = prb_first_seq(prb);
+ u64 seq;
+
+ /*
+ * The provided sequence is only the lower 32 bits of the ringbuffer
+ * sequence. It needs to be expanded to 64bit. Get the first sequence
+ * number from the ringbuffer and fold it.
+ */
+ seq = rb_first_seq - ((u32)rb_first_seq - ulseq);
+
+ return seq;
+}
+
+#endif /* CONFIG_64BIT */
+
+/*
+ * @last_finalized_seq value guarantees that all records up to and including
+ * this sequence number are finalized and can be read. The only exception are
+ * too old records which have already been overwritten.
+ *
+ * It is also guaranteed that @last_finalized_seq only increases.
+ *
+ * Be aware that finalized records following non-finalized records are not
+ * reported because they are not yet available to the reader. For example,
+ * a new record stored via printk() will not be available to a printer if
+ * it follows a record that has not been finalized yet. However, once that
+ * non-finalized record becomes finalized, @last_finalized_seq will be
+ * appropriately updated and the full set of finalized records will be
+ * available to the printer. And since each printk() caller will either
+ * directly print or trigger deferred printing of all available unprinted
+ * records, all printk() messages will get printed.
+ */
+static u64 desc_last_finalized_seq(struct prb_desc_ring *desc_ring)
+{
+ unsigned long ulseq;
+
+ /*
+ * Guarantee the sequence number is loaded before loading the
+ * associated record in order to guarantee that the record can be
+ * seen by this CPU. This pairs with desc_update_last_finalized:A.
+ */
+ ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq
+ ); /* LMM(desc_last_finalized_seq:A) */
+
+ return __ulseq_to_u64seq(ulseq);
+}
+
+static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
+ struct printk_record *r, unsigned int *line_count);
+
+/*
+ * Check if there are records directly following @last_finalized_seq that are
+ * finalized. If so, update @last_finalized_seq to the latest of these
+ * records. It is not allowed to skip over records that are not yet finalized.
+ */
+static void desc_update_last_finalized(struct printk_ringbuffer *rb)
+{
+ struct prb_desc_ring *desc_ring = &rb->desc_ring;
+ u64 old_seq = desc_last_finalized_seq(desc_ring);
+ unsigned long oldval;
+ unsigned long newval;
+ u64 finalized_seq;
+ u64 try_seq;
+
+try_again:
+ finalized_seq = old_seq;
+ try_seq = finalized_seq + 1;
+
+ /* Try to find later finalized records. */
+ while (_prb_read_valid(rb, &try_seq, NULL, NULL)) {
+ finalized_seq = try_seq;
+ try_seq++;
+ }
+
+ /* No update needed if no later finalized record was found. */
+ if (finalized_seq == old_seq)
+ return;
+
+ oldval = __u64seq_to_ulseq(old_seq);
+ newval = __u64seq_to_ulseq(finalized_seq);
+
+ /*
+ * Set the sequence number of a later finalized record that has been
+ * seen.
+ *
+ * Guarantee the record data is visible to other CPUs before storing
+ * its sequence number. This pairs with desc_last_finalized_seq:A.
+ *
+ * Memory barrier involvement:
+ *
+ * If desc_last_finalized_seq:A reads from
+ * desc_update_last_finalized:A, then desc_read:A reads from
+ * _prb_commit:B.
+ *
+ * Relies on:
+ *
+ * RELEASE from _prb_commit:B to desc_update_last_finalized:A
+ * matching
+ * ACQUIRE from desc_last_finalized_seq:A to desc_read:A
+ *
+ * Note: _prb_commit:B and desc_update_last_finalized:A can be
+ * different CPUs. However, the desc_update_last_finalized:A
+ * CPU (which performs the release) must have previously seen
+ * _prb_commit:B.
+ */
+ if (!atomic_long_try_cmpxchg_release(&desc_ring->last_finalized_seq,
+ &oldval, newval)) { /* LMM(desc_update_last_finalized:A) */
+ old_seq = __ulseq_to_u64seq(oldval);
+ goto try_again;
+ }
+}
+
/*
* Attempt to finalize a specified descriptor. If this fails, the descriptor
* is either already final or it will finalize itself when the writer commits.
*/
-static void desc_make_final(struct prb_desc_ring *desc_ring, unsigned long id)
+static void desc_make_final(struct printk_ringbuffer *rb, unsigned long id)
{
+ struct prb_desc_ring *desc_ring = &rb->desc_ring;
unsigned long prev_state_val = DESC_SV(id, desc_committed);
struct prb_desc *d = to_desc(desc_ring, id);
- atomic_long_cmpxchg_relaxed(&d->state_var, prev_state_val,
- DESC_SV(id, desc_finalized)); /* LMM(desc_make_final:A) */
-
- /* Best effort to remember the last finalized @id. */
- atomic_long_set(&desc_ring->last_finalized_id, id);
+ if (atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
+ DESC_SV(id, desc_finalized))) { /* LMM(desc_make_final:A) */
+ desc_update_last_finalized(rb);
+ }
}
/**
@@ -1550,7 +1678,7 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
* readers. (For seq==0 there is no previous descriptor.)
*/
if (info->seq > 0)
- desc_make_final(desc_ring, DESC_ID(id - 1));
+ desc_make_final(rb, DESC_ID(id - 1));
r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id);
/* If text data allocation fails, a data-less record is committed. */
@@ -1643,7 +1771,7 @@ void prb_commit(struct prb_reserved_entry *e)
*/
head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
if (head_id != e->id)
- desc_make_final(desc_ring, e->id);
+ desc_make_final(e->rb, e->id);
}
/**
@@ -1663,12 +1791,9 @@ void prb_commit(struct prb_reserved_entry *e)
*/
void prb_final_commit(struct prb_reserved_entry *e)
{
- struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
-
_prb_commit(e, desc_finalized);
- /* Best effort to remember the last finalized @id. */
- atomic_long_set(&desc_ring->last_finalized_id, e->id);
+ desc_update_last_finalized(e->rb);
}
/*
@@ -2008,7 +2133,9 @@ u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
* newest sequence number available to readers will be.
*
* This provides readers a sequence number to jump to if all currently
- * available records should be skipped.
+ * available records should be skipped. It is guaranteed that all records
+ * previous to the returned value have been finalized and are (or were)
+ * available to the reader.
*
* Context: Any context.
* Return: The sequence number of the next newest (not yet available) record
@@ -2017,33 +2144,19 @@ u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
u64 prb_next_seq(struct printk_ringbuffer *rb)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
- enum desc_state d_state;
- unsigned long id;
u64 seq;
- /* Check if the cached @id still points to a valid @seq. */
- id = atomic_long_read(&desc_ring->last_finalized_id);
- d_state = desc_read(desc_ring, id, NULL, &seq, NULL);
+ seq = desc_last_finalized_seq(desc_ring);
- if (d_state == desc_finalized || d_state == desc_reusable) {
- /*
- * Begin searching after the last finalized record.
- *
- * On 0, the search must begin at 0 because of hack#2
- * of the bootstrapping phase it is not known if a
- * record at index 0 exists.
- */
- if (seq != 0)
- seq++;
- } else {
- /*
- * The information about the last finalized sequence number
- * has gone. It should happen only when there is a flood of
- * new messages and the ringbuffer is rapidly recycled.
- * Give up and start from the beginning.
- */
- seq = 0;
- }
+ /*
+ * Begin searching after the last finalized record.
+ *
+ * On 0, the search must begin at 0 because of hack#2
+ * of the bootstrapping phase it is not known if a
+ * record at index 0 exists.
+ */
+ if (seq != 0)
+ seq++;
/*
* The information about the last finalized @seq might be inaccurate.
@@ -2085,7 +2198,7 @@ void prb_init(struct printk_ringbuffer *rb,
rb->desc_ring.infos = infos;
atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits));
atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits));
- atomic_long_set(&rb->desc_ring.last_finalized_id, DESC0_ID(descbits));
+ atomic_long_set(&rb->desc_ring.last_finalized_seq, 0);
rb->text_data_ring.size_bits = textbits;
rb->text_data_ring.data = text_buf;
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
index 18cd25e489b8..3374a5a3303e 100644
--- a/kernel/printk/printk_ringbuffer.h
+++ b/kernel/printk/printk_ringbuffer.h
@@ -75,7 +75,7 @@ struct prb_desc_ring {
struct printk_info *infos;
atomic_long_t head_id;
atomic_long_t tail_id;
- atomic_long_t last_finalized_id;
+ atomic_long_t last_finalized_seq;
};
/*
@@ -259,7 +259,7 @@ static struct printk_ringbuffer name = { \
.infos = &_##name##_infos[0], \
.head_id = ATOMIC_INIT(DESC0_ID(descbits)), \
.tail_id = ATOMIC_INIT(DESC0_ID(descbits)), \
- .last_finalized_id = ATOMIC_INIT(DESC0_ID(descbits)), \
+ .last_finalized_seq = ATOMIC_INIT(0), \
}, \
.text_data_ring = { \
.size_bits = (avgtextbits) + (descbits), \
--
2.39.2
Commit 13fb0f74d702 ("printk: Avoid livelock with heavy printk
during panic") introduced a mechanism to silence non-panic CPUs
if too many messages are being dropped. Aside from trying to
workaround the livelock bugs of legacy consoles, it was also
intended to avoid losing panic messages. However, if non-panic
CPUs are flooding the ringbuffer, then reacting to dropped
messages is too late.
To avoid losing panic CPU messages, the tracking needs to occur
when non-panic CPUs are storing messages. If non-panic CPUs have
filled approximately 1/4 the ringbuffer, they need to be
silenced to ensure the ringbuffer has ample space available for
the panic CPU messages.
Rather than trying to come up with an accurate heuristic to
measure the size used by non-panic CPUs, simply restrict them
to 1/4 the possible ringbuffer descriptors. In practice this
will end up being around 1/3 the ringbuffer size, which still
leaves ample space for the panic CPU messages.
Signed-off-by: John Ogness <[email protected]>
---
kernel/printk/printk.c | 33 ++++++++++++++++++---------------
1 file changed, 18 insertions(+), 15 deletions(-)
diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
index cb99c854a648..9ac7d50c2f18 100644
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -2315,6 +2315,8 @@ asmlinkage int vprintk_emit(int facility, int level,
const struct dev_printk_info *dev_info,
const char *fmt, va_list args)
{
+ static atomic_t panic_noise_count = ATOMIC_INIT(0);
+
int printed_len;
bool in_sched = false;
@@ -2322,8 +2324,22 @@ asmlinkage int vprintk_emit(int facility, int level,
if (unlikely(suppress_printk))
return 0;
- if (unlikely(suppress_panic_printk) && other_cpu_in_panic())
- return 0;
+ if (other_cpu_in_panic()) {
+ if (unlikely(suppress_panic_printk))
+ return 0;
+
+ /*
+ * The messages on the panic CPU are the most important. If
+ * non-panic CPUs are generating many messages, the panic
+ * messages could get lost. Limit the number of non-panic
+ * messages to approximately 1/4 of the ringbuffer.
+ */
+ if (atomic_inc_return_relaxed(&panic_noise_count) >
+ (1 << (prb->desc_ring.count_bits - 2))) {
+ suppress_panic_printk = 1;
+ return 0;
+ }
+ }
if (level == LOGLEVEL_SCHED) {
level = LOGLEVEL_DEFAULT;
@@ -2799,8 +2815,6 @@ void console_prepend_dropped(struct printk_message *pmsg, unsigned long dropped)
bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
bool is_extended, bool may_suppress)
{
- static int panic_console_dropped;
-
struct printk_buffers *pbufs = pmsg->pbufs;
const size_t scratchbuf_sz = sizeof(pbufs->scratchbuf);
const size_t outbuf_sz = sizeof(pbufs->outbuf);
@@ -2828,17 +2842,6 @@ bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
pmsg->seq = r.info->seq;
pmsg->dropped = r.info->seq - seq;
- /*
- * Check for dropped messages in panic here so that printk
- * suppression can occur as early as possible if necessary.
- */
- if (pmsg->dropped &&
- panic_in_progress() &&
- panic_console_dropped++ > 10) {
- suppress_panic_printk = 1;
- pr_warn_once("Too many dropped messages. Suppress messages on non-panic CPUs to prevent livelock.\n");
- }
-
/* Skip record that has level above the console loglevel. */
if (may_suppress && suppress_message_printing(r.info->level))
goto out;
--
2.39.2
For empty line records, no data blocks are created. Instead,
these valid records are identified by special logical position
values (in fields of @prb_desc.text_blk_lpos).
Currently the macro NO_LPOS is used for empty line records.
This name is confusing because it does not imply _why_ there is
no data block.
Rename NO_LPOS to EMPTY_LINE_LPOS so that it is clear why there
is no data block.
Also add comments explaining the use of EMPTY_LINE_LPOS as well
as clarification to the values used to represent data-less
blocks.
Signed-off-by: John Ogness <[email protected]>
---
kernel/printk/printk_ringbuffer.c | 20 ++++++++++++++++----
kernel/printk/printk_ringbuffer.h | 16 +++++++++++++++-
2 files changed, 31 insertions(+), 5 deletions(-)
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index 94eede5356ac..43d77db435c5 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -1034,9 +1034,13 @@ static char *data_alloc(struct printk_ringbuffer *rb, unsigned int size,
unsigned long next_lpos;
if (size == 0) {
- /* Specify a data-less block. */
- blk_lpos->begin = NO_LPOS;
- blk_lpos->next = NO_LPOS;
+ /*
+ * Data blocks are not created for empty lines. Instead, the
+ * reader will recognize these special lpos values and handle
+ * it appropriately.
+ */
+ blk_lpos->begin = EMPTY_LINE_LPOS;
+ blk_lpos->next = EMPTY_LINE_LPOS;
return NULL;
}
@@ -1214,10 +1218,18 @@ static const char *get_data(struct prb_data_ring *data_ring,
/* Data-less data block description. */
if (BLK_DATALESS(blk_lpos)) {
- if (blk_lpos->begin == NO_LPOS && blk_lpos->next == NO_LPOS) {
+ /*
+ * Records that are just empty lines are also valid, even
+ * though they do not have a data block. For such records
+ * explicitly return empty string data to signify success.
+ */
+ if (blk_lpos->begin == EMPTY_LINE_LPOS &&
+ blk_lpos->next == EMPTY_LINE_LPOS) {
*data_size = 0;
return "";
}
+
+ /* Data lost, invalid, or otherwise unavailable. */
return NULL;
}
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
index 3374a5a3303e..5c09061433f3 100644
--- a/kernel/printk/printk_ringbuffer.h
+++ b/kernel/printk/printk_ringbuffer.h
@@ -127,8 +127,22 @@ enum desc_state {
#define DESC_SV(id, state) (((unsigned long)state << DESC_FLAGS_SHIFT) | id)
#define DESC_ID_MASK (~DESC_FLAGS_MASK)
#define DESC_ID(sv) ((sv) & DESC_ID_MASK)
+
+/*
+ * Special data block logical position values (for fields of
+ * @prb_desc.text_blk_lpos).
+ *
+ * - Bit0 is used to identify if the record has no data block. (Implemented in
+ * the LPOS_DATALESS() macro.)
+ *
+ * - Bit1 specifies the reason for not having a data block.
+ *
+ * These special values could never be real lpos values because of the
+ * meta data and alignment padding of data blocks. (See to_blk_size() for
+ * details.)
+ */
#define FAILED_LPOS 0x1
-#define NO_LPOS 0x3
+#define EMPTY_LINE_LPOS 0x3
#define FAILED_BLK_LPOS \
{ \
--
2.39.2
Currently pr_flush() will only wait for records that were
available to readers at the time of the call. But there may be
more records (non-finalized) with following finalized records.
pr_flush() should wait for these to print as well. Particularly
because any trailing finalized records may be the messages that
the calling context wants to ensure are printed.
Fixes: 3b604ca81202 ("printk: add pr_flush()")
Signed-off-by: John Ogness <[email protected]>
---
kernel/printk/printk.c | 2 +-
kernel/printk/printk_ringbuffer.c | 96 +++++++++++++++++++++++++++++++
kernel/printk/printk_ringbuffer.h | 1 +
3 files changed, 98 insertions(+), 1 deletion(-)
diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
index 82dc2c7949b7..f3a7f5a6f6f8 100644
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -3755,7 +3755,7 @@ static bool __pr_flush(struct console *con, int timeout_ms, bool reset_on_progre
might_sleep();
- seq = prb_next_seq(prb);
+ seq = prb_next_reserve_seq(prb);
/* Flush the consoles so that records up to @seq are printed. */
console_lock();
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index 7c121fd68330..dc83569d3a3a 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -2012,6 +2012,102 @@ static u64 prb_first_seq(struct printk_ringbuffer *rb)
return seq;
}
+/**
+ * prb_next_reserve_seq() - Get the sequence number after the most recently
+ * reserved record.
+ *
+ * @rb: The ringbuffer to get the sequence number from.
+ *
+ * This is the public function available to readers to see what sequence
+ * number will be assigned to the next reserved record.
+ *
+ * Note that depending on the situation, this value can be equal to or
+ * higher than the sequence number returned by prb_next_seq().
+ *
+ * Context: Any context.
+ * Return: The sequence number that will be assigned to the next record
+ * reserved.
+ */
+u64 prb_next_reserve_seq(struct printk_ringbuffer *rb)
+{
+ struct prb_desc_ring *desc_ring = &rb->desc_ring;
+ unsigned long last_finalized_id;
+ atomic_long_t *state_var;
+ u64 last_finalized_seq;
+ unsigned long head_id;
+ struct prb_desc desc;
+ unsigned long diff;
+ struct prb_desc *d;
+ int err;
+
+ /*
+ * It may not be possible to read a sequence number for @head_id.
+ * So the ID of @last_finailzed_seq is used to calculate what the
+ * sequence number of @head_id will be.
+ */
+
+try_again:
+ last_finalized_seq = desc_last_finalized_seq(desc_ring);
+
+ /*
+ * @head_id is loaded after @last_finalized_seq to ensure that it is
+ * at or beyond @last_finalized_seq.
+ *
+ * Memory barrier involvement:
+ *
+ * If desc_last_finalized_seq:A reads from
+ * desc_update_last_finalized:A, then
+ * prb_next_reserve_seq:A reads from desc_reserve:D.
+ *
+ * Relies on:
+ *
+ * RELEASE from desc_reserve:D to desc_update_last_finalized:A
+ * matching
+ * ACQUIRE from desc_last_finalized_seq:A to prb_next_reserve_seq:A
+ *
+ * Note: desc_reserve:D and desc_update_last_finalized:A can be
+ * different CPUs. However, the desc_update_last_finalized:A CPU
+ * (which performs the release) must have previously seen
+ * desc_read:C, which implies desc_reserve:D can be seen.
+ */
+ head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_next_reserve_seq:A) */
+
+ d = to_desc(desc_ring, last_finalized_seq);
+ state_var = &d->state_var;
+
+ /* Extract the ID, used to specify the descriptor to read. */
+ last_finalized_id = DESC_ID(atomic_long_read(state_var));
+
+ /* Ensure @last_finalized_id is correct. */
+ err = desc_read_finalized_seq(desc_ring, last_finalized_id, last_finalized_seq, &desc);
+
+ if (err == -EINVAL) {
+ if (last_finalized_seq == 0) {
+ /*
+ * @last_finalized_seq still contains its initial
+ * value. Probably no record has been finalized yet.
+ * This means the ringbuffer is not yet full and the
+ * @head_id value can be used directly (subtracting
+ * off its initial value).
+ *
+ * Because of hack#2 of the bootstrapping phase, the
+ * @head_id initial value must be handled separately.
+ */
+ if (head_id == DESC0_ID(desc_ring->count_bits))
+ return 0;
+
+ last_finalized_id = DESC0_ID(desc_ring->count_bits) + 1;
+ } else {
+ /* Record must have been overwritten. Try again. */
+ goto try_again;
+ }
+ }
+
+ diff = head_id - last_finalized_id;
+
+ return (last_finalized_seq + diff);
+}
+
/*
* Non-blocking read of a record.
*
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
index 5c09061433f3..b48b44ecbe6d 100644
--- a/kernel/printk/printk_ringbuffer.h
+++ b/kernel/printk/printk_ringbuffer.h
@@ -394,5 +394,6 @@ bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
u64 prb_first_valid_seq(struct printk_ringbuffer *rb);
u64 prb_next_seq(struct printk_ringbuffer *rb);
+u64 prb_next_reserve_seq(struct printk_ringbuffer *rb);
#endif /* _KERNEL_PRINTK_RINGBUFFER_H */
--
2.39.2
Normally a reader will stop once reaching a non-finalized
record. However, when a panic happens, writers from other CPUs
(or an interrupted context on the panic CPU) may have been
writing a record and were unable to finalize it. The panic CPU
will reserve/commit/finalize its panic records, but these will
be located after the non-finalized records. This results in
panic() not flushing the panic messages.
Extend _prb_read_valid() to skip over non-finalized records if
on the panic CPU.
Fixes: 896fbe20b4e2 ("printk: use the lockless ringbuffer")
Signed-off-by: John Ogness <[email protected]>
---
kernel/printk/printk_ringbuffer.c | 22 ++++++++++++++++++++--
1 file changed, 20 insertions(+), 2 deletions(-)
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index dc83569d3a3a..584d2b213876 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -2116,6 +2116,10 @@ u64 prb_next_reserve_seq(struct printk_ringbuffer *rb)
*
* On failure @seq is updated to a record that is not yet available to the
* reader, but it will be the next record available to the reader.
+ *
+ * Note: When the current CPU is in panic, this function will skip over any
+ * non-existent/non-finalized records in order to allow the panic CPU
+ * to print any and all records that have been finalized.
*/
static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
struct printk_record *r, unsigned int *line_count)
@@ -2138,8 +2142,22 @@ static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
(*seq)++;
} else {
- /* Non-existent/non-finalized record. Must stop. */
- return false;
+ /*
+ * Non-existent/non-finalized record. Must stop.
+ *
+ * For panic situations it cannot be expected that
+ * non-finalized records will become finalized. But
+ * there may be other finalized records beyond that
+ * need to be printed for a panic situation. If this
+ * is the panic CPU, skip this
+ * non-existent/non-finalized record unless it is
+ * at or beyond the head, in which case it is not
+ * possible to continue.
+ */
+ if (this_cpu_in_panic() && ((*seq + 1) < prb_next_reserve_seq(rb)))
+ (*seq)++;
+ else
+ return false;
}
}
--
2.39.2
From: Petr Mladek <[email protected]>
The commit d51507098ff91 ("printk: disable optimistic spin
during panic") added checks to avoid becoming a console waiter
if a panic is in progress.
However, the transition to panic can occur while there is
already a waiter. The current owner should not pass the lock to
the waiter because it might get stopped or blocked anytime.
Also the panic context might pass the console lock owner to an
already stopped waiter by mistake. It might happen when
console_flush_on_panic() ignores the current lock owner, for
example:
CPU0 CPU1
---- ----
console_lock_spinning_enable()
console_trylock_spinning()
[CPU1 now console waiter]
NMI: panic()
panic_other_cpus_shutdown()
[stopped as console waiter]
console_flush_on_panic()
console_lock_spinning_enable()
[print 1 record]
console_lock_spinning_disable_and_check()
[handover to stopped CPU1]
This results in panic() not flushing the panic messages.
Fix these problems by disabling all spinning operations
completely during panic().
Another advantage is that it prevents possible deadlocks caused
by "console_owner_lock". The panic() context does not need to
take it any longer. The lockless checks are safe because the
functions become NOPs when they see the panic in progress. All
operations manipulating the state are still synchronized by the
lock even when non-panic CPUs would notice the panic
synchronously.
The current owner might stay spinning. But non-panic() CPUs
would get stopped anyway and the panic context will never start
spinning.
Fixes: dbdda842fe96 ("printk: Add console owner and waiter logic to load balance console writes")
Signed-off-by: Petr Mladek <[email protected]>
Reviewed-by: John Ogness <[email protected]>
---
kernel/printk/printk.c | 29 +++++++++++++++++++++++++++++
1 file changed, 29 insertions(+)
diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
index f3a7f5a6f6f8..cb99c854a648 100644
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -1869,10 +1869,23 @@ static bool console_waiter;
*/
static void console_lock_spinning_enable(void)
{
+ /*
+ * Do not use spinning in panic(). The panic CPU wants to keep the lock.
+ * Non-panic CPUs abandon the flush anyway.
+ *
+ * Just keep the lockdep annotation. The panic-CPU should avoid
+ * taking console_owner_lock because it might cause a deadlock.
+ * This looks like the easiest way how to prevent false lockdep
+ * reports without handling races a lockless way.
+ */
+ if (panic_in_progress())
+ goto lockdep;
+
raw_spin_lock(&console_owner_lock);
console_owner = current;
raw_spin_unlock(&console_owner_lock);
+lockdep:
/* The waiter may spin on us after setting console_owner */
spin_acquire(&console_owner_dep_map, 0, 0, _THIS_IP_);
}
@@ -1897,6 +1910,22 @@ static int console_lock_spinning_disable_and_check(int cookie)
{
int waiter;
+ /*
+ * Ignore spinning waiters during panic() because they might get stopped
+ * or blocked at any time,
+ *
+ * It is safe because nobody is allowed to start spinning during panic
+ * in the first place. If there has been a waiter then non panic CPUs
+ * might stay spinning. They would get stopped anyway. The panic context
+ * will never start spinning and an interrupted spin on panic CPU will
+ * never continue.
+ */
+ if (panic_in_progress()) {
+ /* Keep lockdep happy. */
+ spin_release(&console_owner_dep_map, _THIS_IP_);
+ return 0;
+ }
+
raw_spin_lock(&console_owner_lock);
waiter = READ_ONCE(console_waiter);
console_owner = NULL;
--
2.39.2
based on my research this should be the most recent post of this patch.
If so then
On 2023-11-06 22:13:22 [+0106], John Ogness wrote:
> --- a/kernel/printk/printk_ringbuffer.c
> +++ b/kernel/printk/printk_ringbuffer.c
> @@ -6,6 +6,7 @@
> @@ -1441,20 +1445,144 @@ bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer
> return false;
> }
>
> +#ifdef CONFIG_64BIT
> +
> +#define __u64seq_to_ulseq(u64seq) (u64seq)
> +#define __ulseq_to_u64seq(ulseq) (ulseq)
> +
> +#else /* CONFIG_64BIT */
> +
> +static u64 prb_first_seq(struct printk_ringbuffer *rb);
> +
> +#define __u64seq_to_ulseq(u64seq) ((u32)u64seq)
> +static inline u64 __ulseq_to_u64seq(u32 ulseq)
> +{
> + u64 rb_first_seq = prb_first_seq(prb);
> + u64 seq;
> +
> + /*
> + * The provided sequence is only the lower 32 bits of the ringbuffer
> + * sequence. It needs to be expanded to 64bit. Get the first sequence
> + * number from the ringbuffer and fold it.
> + */
> + seq = rb_first_seq - ((u32)rb_first_seq - ulseq);
This needs to become
seq = rb_first_seq - ((s32)((u32)rb_first_seq - ulseq));
in order to continue booting on 32bit. Otherwise, if this survives one
cycle then we can deprecate all 32bit platforms. I am happy either way.
Sebastian
On 2023-11-20, Sebastian Andrzej Siewior <[email protected]> wrote:
> based on my research this should be the most recent post of this patch.
> If so then
>
> On 2023-11-06 22:13:22 [+0106], John Ogness wrote:
>> --- a/kernel/printk/printk_ringbuffer.c
>> +++ b/kernel/printk/printk_ringbuffer.c
>> + /*
>> + * The provided sequence is only the lower 32 bits of the ringbuffer
>> + * sequence. It needs to be expanded to 64bit. Get the first sequence
>> + * number from the ringbuffer and fold it.
>> + */
>> + seq = rb_first_seq - ((u32)rb_first_seq - ulseq);
>
> This needs to become
> seq = rb_first_seq - ((s32)((u32)rb_first_seq - ulseq));
>
> in order to continue booting on 32bit.
Indeed. The code assumes the passed in value (@ulseq) always represents
a 64-bit number that is less than or equal to the basis value
(@rb_first_seq). For kernel/printk/nbcon.c:__nbcon_seq_to_seq() that
assumption is correct. For this function, it is not.
Your change will round up or down to the nearest 32 bits of the basis
value.
For example, with @rb_first_seq = 0x200000000 and @ulseq = 0x1:
before your change (where @ulseq cannot represent something higher than
@rb_first_seq):
@ulseq translates to 0x100000001
after your change:
@ulseq translates to 0x200000001
Since __ulseq_to_u64seq() must deal with arbitrary values, I think the
32-bit rounding is appropriate.
Despite not strictly being necessary (because of the valid assumption),
I think we should also update __nbcon_seq_to_seq() to avoid any bizarre
cases due to different translations of the 32-bit value.
In fact, there is no reason to have 2 macros for this. I will create a
single macro using the 32-bit rounding.
Thanks for researching this!
John