OK, I ended up with these changes after changing my mind a few times and,
hopefully, after getting a better understanding of the aspects of preemption,
interrupts, locking, and scheduling.
The 1st patch fixes the race in ring_buffer_swap_cpu(). It is the same one as
in the previous versions. I have only added checks for irqs_disabled() as
suggested by Steven.
The 2nd patch should be easy. It only makes sure that we stay on the same CPU
while handling the critical timings.
The 3rd patch was the most problematic for me. It creates two variants
of update_max_tr_single() so that the flip can be requested either with or
without disabled interrupts, one of them going through
smp_call_function_single(). It is what Steven suggested, but I wonder whether
it could be done in a more elegant way. I am in a somewhat strange mode after
the twins were born on Thursday.
Petr Mladek (3):
ring-buffer: Race when writing and swapping cpu buffer in parallel
trace: process entire stop_critical_timing() on the same CPU
trace: Allow to call update_max_tr_single() from another CPU
kernel/trace/ring_buffer.c | 115 ++++++++++++++++++++++++++++++-------------
kernel/trace/trace.c | 60 +++++++++++++++++++---
kernel/trace/trace.h | 1 +
kernel/trace/trace_irqsoff.c | 17 +++++--
4 files changed, 146 insertions(+), 47 deletions(-)
--
1.8.4
The trace/ring_buffer code allows swapping the entire ring buffer. Everything
has to be done locklessly. I think that I have found a race while trying to
understand the code. The problematic situation is the following:
CPU 1 (write/reserve event)             CPU 2 (swap the cpu buffer)
-------------------------------------------------------------------------
ring_buffer_write()
  if (cpu_buffer->record_disabled)
  ^^^ test fails and write continues

                                        ring_buffer_swap_cpu()

                                          inc(&cpu_buffer_a->record_disabled);
                                          inc(&cpu_buffer_b->record_disabled);

                                          if (cpu_buffer_a->committing)
                                          ^^^ test fails and swap continues
                                          if (cpu_buffer_b->committing)
                                          ^^^ test fails and swap continues

  rb_reserve_next_event()
    rb_start_commit()
      local_inc(&cpu_buffer->committing);
      local_inc(&cpu_buffer->commits);

    if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
    ^^^ test fails and reserve_next continues

                                          buffer_a->buffers[cpu] = cpu_buffer_b;
                                          buffer_b->buffers[cpu] = cpu_buffer_a;
                                          cpu_buffer_b->buffer = buffer_a;
                                          cpu_buffer_a->buffer = buffer_b;

Pheeee, reservation continues in the removed buffer.
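In code form, the two unordered check/update pairs look roughly like this
(a condensed sketch of the fragments above, not the complete functions):

	/* Writer side (ring_buffer_write() -> rb_reserve_next_event()), condensed: */

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;		/* sampled before the swap bumped it */

	local_inc(&cpu_buffer->committing);
	local_inc(&cpu_buffer->commits);

	if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer))
		return NULL;		/* may still read the old pointer */

	/* Swap side (ring_buffer_swap_cpu()), condensed: */

	atomic_inc(&cpu_buffer_a->record_disabled);
	atomic_inc(&cpu_buffer_b->record_disabled);

	if (local_read(&cpu_buffer_a->committing))
		goto out_dec;		/* sampled before the writer incremented it */
	if (local_read(&cpu_buffer_b->committing))
		goto out_dec;

	buffer_a->buffers[cpu] = cpu_buffer_b;
	buffer_b->buffers[cpu] = cpu_buffer_a;
	cpu_buffer_b->buffer = buffer_a;
	cpu_buffer_a->buffer = buffer_b;

Nothing orders the updates of record_disabled and committing on one CPU
against the reads on the other CPU, so both sides can pass their checks in
the interleaving above.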
This patch solves the problem by swapping the CPU buffer on the same CPU. It
helps to avoid memory barriers and keeps both writing and swapping fast.

The limitation is that it can no longer be called with disabled IRQs for
another CPU's buffer. This affects check_critical_timing() and
tracing_snapshot_write(). Both callers will get fixed by follow-up patches.
In addition, the patch removes the obsolete check in rb_reserve_next_event().
The swap operation is not allowed in NMI context and is now done on the same
CPU, therefore it can never interrupt any write.
Finally, it adds some comments about why the reserve and swap operations are
safe. I hope that I have got it right :-)
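
The resulting calling convention can be sketched like this (only an
illustration; the real call sites are updated in the follow-up patches):

	/* From any CPU with interrupts enabled: the swap is pushed to the
	 * target CPU via smp_call_function_single(). */
	ret = ring_buffer_swap_cpu(tr->max_buffer.buffer,
				   tr->trace_buffer.buffer, cpu);

	/* With interrupts disabled: only the buffer of the current CPU
	 * may be swapped, anything else fails with -EPERM. */
	local_irq_save(flags);
	ret = ring_buffer_swap_cpu(tr->max_buffer.buffer,
				   tr->trace_buffer.buffer,
				   smp_processor_id());
	local_irq_restore(flags);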
Changes:
+ v4: correctly handle calls with disabled IRQs; add follow-up patches
to update callers in the tracing framework; problem found and solution
proposed by Steven Rostedt <[email protected]>
+ v3: removes the check in rb_reserve_next_event(); idea by Steven Rostedt
<[email protected]>
+ v2: does the swap on the same CPU instead of using memory barriers;
idea by Steven Rostedt <[email protected]>
Suggested-by: Steven Rostedt <[email protected]>
Signed-off-by: Petr Mladek <[email protected]>
---
kernel/trace/ring_buffer.c | 115 +++++++++++++++++++++++++++++++--------------
1 file changed, 81 insertions(+), 34 deletions(-)
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 7c56c3d06943..038b880347ba 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -2527,21 +2527,6 @@ rb_reserve_next_event(struct ring_buffer *buffer,
rb_start_commit(cpu_buffer);
-#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
- /*
- * Due to the ability to swap a cpu buffer from a buffer
- * it is possible it was swapped before we committed.
- * (committing stops a swap). We check for it here and
- * if it happened, we have to fail the write.
- */
- barrier();
- if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
- local_dec(&cpu_buffer->committing);
- local_dec(&cpu_buffer->commits);
- return NULL;
- }
-#endif
-
length = rb_calculate_event_length(length);
again:
add_timestamp = 0;
@@ -4280,23 +4265,42 @@ int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
+struct ring_buffer_swap_info {
+ struct ring_buffer *buffer_a; /* One buffer to swap with */
+ struct ring_buffer *buffer_b; /* The other buffer to swap with */
+ int ret; /* Return value from the swap op. */
+};
+
/**
- * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
- * @buffer_a: One buffer to swap with
- * @buffer_b: The other buffer to swap with
+ * ring_buffer_swap_this_cpu - swap the CPU buffer of the current CPU
+ * between two ring buffers.
+ * @arg: arguments and return value are passed via struct ring_buffer_swap_info.
*
- * This function is useful for tracers that want to take a "snapshot"
- * of a CPU buffer and has another back up buffer lying around.
- * it is expected that the tracer handles the cpu buffer not being
- * used at the moment.
+ * The swapping of a CPU buffer is done on the same CPU. It helps to avoid
+ * memory barriers and keep writing fast. We can't use synchronize_sched
+ * here because this function can be called in atomic context.
*/
-int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
- struct ring_buffer *buffer_b, int cpu)
+static void ring_buffer_swap_this_cpu(void *arg)
{
+ struct ring_buffer_swap_info *rb_swap_info = arg;
+ struct ring_buffer *buffer_a;
+ struct ring_buffer *buffer_b;
struct ring_buffer_per_cpu *cpu_buffer_a;
struct ring_buffer_per_cpu *cpu_buffer_b;
- int ret = -EINVAL;
+ int cpu = smp_processor_id();
+ /*
+ * The buffer for this CPU is swapped. It does not make sense
+ * to call this function with IRQs enabled.
+ */
+ rb_swap_info->ret = -EPERM;
+ if (WARN_ON_ONCE(!irqs_disabled()))
+ return;
+
+ buffer_a = rb_swap_info->buffer_a;
+ buffer_b = rb_swap_info->buffer_b;
+
+ rb_swap_info->ret = -EINVAL;
if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
!cpumask_test_cpu(cpu, buffer_b->cpumask))
goto out;
@@ -4308,7 +4312,8 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
goto out;
- ret = -EAGAIN;
+ /* Also check if the buffers can be manipulated */
+ rb_swap_info->ret = -EAGAIN;
if (ring_buffer_flags != RB_BUFFERS_ON)
goto out;
@@ -4326,15 +4331,19 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
goto out;
/*
- * We can't do a synchronize_sched here because this
- * function can be called in atomic context.
- * Normally this will be called from the same CPU as cpu.
- * If not it's up to the caller to protect this.
+ * Recording has to be disabled. Otherwise, ring_buffer_lock_reserve()
+ * and ring_buffer_unlock_commit() might operate with different
+ * cpu buffers.
+ *
+ * All other operations are safe. Read gets the CPU buffer only once
+ * and then works with it. Resize does the critical operation on the
+ * same CPU with disabled interrupts.
*/
atomic_inc(&cpu_buffer_a->record_disabled);
atomic_inc(&cpu_buffer_b->record_disabled);
- ret = -EBUSY;
+ /* Bail out if we interrupted some recording */
+ rb_swap_info->ret = -EBUSY;
if (local_read(&cpu_buffer_a->committing))
goto out_dec;
if (local_read(&cpu_buffer_b->committing))
@@ -4346,13 +4355,51 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
cpu_buffer_b->buffer = buffer_a;
cpu_buffer_a->buffer = buffer_b;
- ret = 0;
-
+ rb_swap_info->ret = 0;
out_dec:
atomic_dec(&cpu_buffer_a->record_disabled);
atomic_dec(&cpu_buffer_b->record_disabled);
out:
- return ret;
+ return;
+}
+
+/**
+ * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
+ * @buffer_a: One buffer to swap with
+ * @buffer_b: The other buffer to swap with
+ *
+ * This function is useful for tracers that want to take a "snapshot"
+ * of a CPU buffer and has another back up buffer lying around.
+ * It is expected that the tracer handles the cpu buffer not being
+ * used at the moment.
+ */
+int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
+ struct ring_buffer *buffer_b, int cpu)
+{
+ struct ring_buffer_swap_info rb_swap_info = {
+ .buffer_a = buffer_a,
+ .buffer_b = buffer_b,
+ };
+ int ret = 0;
+
+ /*
+ * Swap the CPU buffer on the same CPU. Recording has to be fast
+ * and this helps to avoid memory barriers.
+ */
+ if (irqs_disabled()) {
+ /* Only allowed to swap current CPU if irqs are disabled */
+ if (WARN_ON_ONCE(cpu != smp_processor_id()))
+ return -EPERM;
+ ring_buffer_swap_this_cpu(&rb_swap_info);
+ } else {
+ ret = smp_call_function_single(cpu, ring_buffer_swap_this_cpu,
+ (void *)&rb_swap_info, 1);
+ }
+
+ if (ret)
+ return ret;
+
+ return rb_swap_info.ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
--
1.8.4
The interrupts-off latency tracing heavily depends on disabled interrupts.
This is also the case for stop_critical_timings(). There is the following
nested call chain:

  + stop_critical_timing()
    + check_critical_timing()
      + update_max_tr_single()
        + ring_buffer_swap_cpu()

where ring_buffer_swap_cpu() works with disabled interrupts only when
the swap is done for the current CPU.
This patch solves the problem by disabling preemption early in
stop_critical_timing() and doing the whole job on the same CPU.

Note that a big part of the work was done with disabled preemption even
before; see raw_spin_lock_irqsave() in check_critical_timing(). The extended
scope affects only a few fast operations and should not cause any harm
to the system.
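
The shape of the change is roughly the following (a sketch only;
should_stop_timing() is a made-up placeholder for the existing early checks):

	cpu = get_cpu();	/* disable preemption, stay on this CPU */

	if (!should_stop_timing(cpu))
		goto out;

	...
	check_critical_timing(tr, data, parent_ip ? : ip, cpu);
	...
out:
	put_cpu();		/* re-enable preemption */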
Signed-off-by: Petr Mladek <[email protected]>
---
kernel/trace/trace_irqsoff.c | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git a/kernel/trace/trace_irqsoff.c b/kernel/trace/trace_irqsoff.c
index 9bb104f748d0..1d4eeb304583 100644
--- a/kernel/trace/trace_irqsoff.c
+++ b/kernel/trace/trace_irqsoff.c
@@ -405,21 +405,26 @@ stop_critical_timing(unsigned long ip, unsigned long parent_ip)
struct trace_array_cpu *data;
unsigned long flags;
- cpu = raw_smp_processor_id();
+ /*
+ * We need to call ring_buffer_swap_cpu() with disabled interrupts.
+ * Let's make sure that it operates on this CPU's buffer.
+ */
+ cpu = get_cpu();
+
/* Always clear the tracing cpu on stopping the trace */
if (unlikely(per_cpu(tracing_cpu, cpu)))
per_cpu(tracing_cpu, cpu) = 0;
else
- return;
+ goto out;
if (!tracer_enabled || !tracing_is_enabled())
- return;
+ goto out;
data = per_cpu_ptr(tr->trace_buffer.data, cpu);
if (unlikely(!data) ||
!data->critical_start || atomic_read(&data->disabled))
- return;
+ goto out;
atomic_inc(&data->disabled);
@@ -428,6 +433,8 @@ stop_critical_timing(unsigned long ip, unsigned long parent_ip)
check_critical_timing(tr, data, parent_ip ? : ip, cpu);
data->critical_start = 0;
atomic_dec(&data->disabled);
+out:
+ put_cpu();
}
/* start and stop critical timings used to for stoppage (in idle) */
--
1.8.4
Any CPU-specific tracing buffer can be switched by

  echo 1 >/sys/kernel/debug/tracing/per_cpu/cpuN/snapshot

It triggers the following sequence of calls:

  + tracing_snapshot_write()
    + update_max_tr_single()
      + ring_buffer_swap_cpu()
We need to call ring_buffer_swap_cpu() with disabled interrupts because
it is done under "tr->max_lock" and would cause races otherwise.
This also means that it has to be called on the right CPU, which is
not guaranteed by tracing_snapshot_write().
This patch solves the problem by renaming update_max_tr_single()
to update_max_tr_this_cpu(). It works with the current CPU instead
of a passed one and can be called from check_critical_timing() directly.

It also introduces a function with the original name, update_max_tr_single(),
that calls the CPU-specific one via smp_call_function_single().
It can be called from tracing_snapshot_write(), but only with enabled
interrupts; otherwise smp_call_function_single() would not work.
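
The two callers then end up looking roughly like this (simplified from the
hunks below):

	/* irqsoff tracer: already on the right CPU with disabled interrupts */
	update_max_tr_this_cpu(tr, current);

	/* snapshot write: possibly another CPU; interrupts must stay enabled
	 * so that smp_call_function_single() can run the flip there */
	update_max_tr_single(tr, current, iter->cpu_file);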
Signed-off-by: Petr Mladek <[email protected]>
---
kernel/trace/trace.c | 60 ++++++++++++++++++++++++++++++++++++++------
kernel/trace/trace.h | 1 +
kernel/trace/trace_irqsoff.c | 2 +-
3 files changed, 54 insertions(+), 9 deletions(-)
diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index 4caa814d41c3..a2dc83b98cc0 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -1017,22 +1017,29 @@ update_max_tr(struct trace_array *tr, struct task_struct *tsk, int cpu)
}
/**
- * update_max_tr_single - only copy one trace over, and reset the rest
+ * update_max_tr_this_cpu - copy the trace of this CPU, and reset the rest
* @tr - tracer
* @tsk - task with the latency
- * @cpu - the cpu of the buffer to copy.
*
- * Flip the trace of a single CPU buffer between the @tr and the max_tr.
+ * Flip the trace of this CPU buffer between the @tr and the max_tr.
+ *
+ * It must be called on this CPU; otherwise, ring_buffer_swap_cpu() would not
+ * work with disabled interrupts. They must be disabled to avoid a race under
+ * tr->max_lock and because this is used by the irqsoff tracer, which is
+ * sensitive about it.
*/
void
-update_max_tr_single(struct trace_array *tr, struct task_struct *tsk, int cpu)
+update_max_tr_this_cpu(struct trace_array *tr, struct task_struct *tsk)
{
+ int cpu;
int ret;
if (tr->stop_count)
return;
WARN_ON_ONCE(!irqs_disabled());
+ cpu = smp_processor_id();
+
if (!tr->allocated_snapshot) {
/* Only the nop tracer should hit this when disabling */
WARN_ON_ONCE(tr->current_trace != &nop_trace);
@@ -1059,6 +1066,42 @@ update_max_tr_single(struct trace_array *tr, struct task_struct *tsk, int cpu)
__update_max_tr(tr, tsk, cpu);
arch_spin_unlock(&tr->max_lock);
}
+
+struct update_max_tr_info {
+ struct trace_array *tr;
+ struct task_struct *tsk;
+};
+
+static void update_max_tr_function_single(void *arg)
+{
+ struct update_max_tr_info *tr_info = arg;
+ struct trace_array *tr = tr_info->tr;
+ struct task_struct *tsk = tr_info->tsk;
+
+ update_max_tr_this_cpu(tr, tsk);
+}
+
+/**
+ * update_max_tr_single - only copy one trace over, and reset the rest
+ * @tr - tracer
+ * @tsk - task with the latency
+ * @cpu - the cpu of the buffer to copy.
+ *
+ * Flip the trace of a single CPU buffer between the @tr and the max_tr.
+ */
+void
+update_max_tr_single(struct trace_array *tr, struct task_struct *tsk, int cpu)
+{
+ struct update_max_tr_info tr_info = {tr, tsk};
+
+
+ /*
+ * The flip has to be done on the same CPU, see
+ * update_max_tr_this_cpu() for more details.
+ */
+ smp_call_function_single(cpu, update_max_tr_function_single,
+ (void *)&tr_info, 1);
+}
#endif /* CONFIG_TRACER_MAX_TRACE */
static int wait_on_pipe(struct trace_iterator *iter)
@@ -5057,13 +5100,14 @@ tracing_snapshot_write(struct file *filp, const char __user *ubuf, size_t cnt,
if (ret < 0)
break;
}
- local_irq_disable();
/* Now, we're going to swap */
- if (iter->cpu_file == RING_BUFFER_ALL_CPUS)
+ if (iter->cpu_file == RING_BUFFER_ALL_CPUS) {
+ local_irq_disable();
update_max_tr(tr, current, smp_processor_id());
- else
+ local_irq_enable();
+ } else {
update_max_tr_single(tr, current, iter->cpu_file);
- local_irq_enable();
+ }
break;
default:
if (tr->allocated_snapshot) {
diff --git a/kernel/trace/trace.h b/kernel/trace/trace.h
index 9258f5a815db..5aa876b9190e 100644
--- a/kernel/trace/trace.h
+++ b/kernel/trace/trace.h
@@ -616,6 +616,7 @@ extern unsigned long tracing_thresh;
void update_max_tr(struct trace_array *tr, struct task_struct *tsk, int cpu);
void update_max_tr_single(struct trace_array *tr,
struct task_struct *tsk, int cpu);
+void update_max_tr_this_cpu(struct trace_array *tr, struct task_struct *tsk);
#endif /* CONFIG_TRACER_MAX_TRACE */
#ifdef CONFIG_STACKTRACE
diff --git a/kernel/trace/trace_irqsoff.c b/kernel/trace/trace_irqsoff.c
index 1d4eeb304583..90a28d5105e4 100644
--- a/kernel/trace/trace_irqsoff.c
+++ b/kernel/trace/trace_irqsoff.c
@@ -347,7 +347,7 @@ check_critical_timing(struct trace_array *tr,
if (likely(!is_tracing_stopped())) {
tr->max_latency = delta;
- update_max_tr_single(tr, current, cpu);
+ update_max_tr_this_cpu(tr, current);
}
max_sequence++;
--
1.8.4