Readers of rwbase can lock and unlock without taking any inner lock. When
that happens, we need the ordering provided by atomic operations to
satisfy the ordering semantics of lock/unlock. Without that, consider
the following case:
    { X = 0 initially }

    CPU 0                                   CPU 1
    =====                                   =====
                                            rt_write_lock();
                                            X = 1
                                            rt_write_unlock():
                                              atomic_add(READER_BIAS - WRITER_BIAS, ->readers);
                                              // ->readers is READER_BIAS.
    rt_read_lock():
      if ((r = atomic_read(->readers)) < 0) // True
        atomic_try_cmpxchg(->readers, r, r + 1); // succeed.
      <acquire the read lock via fast path>

    r1 = X; // r1 may be 0, because nothing prevents the reordering
            // of "X=1" and atomic_add() on CPU 1.
Therefore audit every usage of atomic operations that may happen in a
fast path, and add necessary barriers.
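The scenario can also be written as a litmus test sketch (illustrative, not
part of the posting; the READER_BIAS/WRITER_BIAS arithmetic on ->readers is
reduced to a plain 0 -> 1 transition, P0 is the fast-path reader and P1 the
writer doing rt_write_unlock()):

    C rwbase-writer-unlock-vs-reader-fastpath

    { }

    P0(int *x, atomic_t *readers)
    {
        int r0;
        int r1;

        r0 = atomic_read(readers);
        r1 = READ_ONCE(*x);
    }

    P1(int *x, atomic_t *readers)
    {
        WRITE_ONCE(*x, 1);
        atomic_add(1, readers);
    }

    exists (0:r0=1 /\ 0:r1=0)

Under the kernel memory model the exists clause is allowed as long as P1's
add is relaxed, and becomes forbidden once it is a RELEASE operation paired
with an ACQUIRE (here, the fully-ordered cmpxchg) on the reader side.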
Signed-off-by: Boqun Feng <[email protected]>
---
Hi Thomas and Peter,
Sorry I'm late to the PREEMPT_RT lock review party. I just want to point
out a problem with this patch. It is not even compile tested, but it shows
the idea; please check whether I'm missing something subtle.
Regards,
Boqun
kernel/locking/rwbase_rt.c | 34 ++++++++++++++++++++++++++++++----
1 file changed, 30 insertions(+), 4 deletions(-)
diff --git a/kernel/locking/rwbase_rt.c b/kernel/locking/rwbase_rt.c
index 4ba15088e640..a1886fd8bde6 100644
--- a/kernel/locking/rwbase_rt.c
+++ b/kernel/locking/rwbase_rt.c
@@ -41,6 +41,12 @@
* The risk of writer starvation is there, but the pathological use cases
* which trigger it are not necessarily the typical RT workloads.
*
+ * Fast-path orderings:
+ * The lock/unlock of readers can run in fast paths: lock and unlock are only
+ * atomic ops, and there is no inner lock to provide ACQUIRE and RELEASE
+ * semantics of rwbase_rt. Atomic ops then should be stronger than _acquire()
+ * and _release() to provide necessary ordering guarantee.
+ *
* Common code shared between RT rw_semaphore and rwlock
*/
@@ -53,6 +59,7 @@ static __always_inline int rwbase_read_trylock(struct rwbase_rt *rwb)
* set.
*/
for (r = atomic_read(&rwb->readers); r < 0;) {
+ /* Fully-ordered if cmpxchg() succeeds, provides ACQUIRE */
if (likely(atomic_try_cmpxchg(&rwb->readers, &r, r + 1)))
return 1;
}
@@ -162,6 +169,8 @@ static __always_inline void rwbase_read_unlock(struct rwbase_rt *rwb,
/*
* rwb->readers can only hit 0 when a writer is waiting for the
* active readers to leave the critical section.
+ *
+ * dec_and_test() is fully ordered, provides RELEASE.
*/
if (unlikely(atomic_dec_and_test(&rwb->readers)))
__rwbase_read_unlock(rwb, state);
@@ -172,7 +181,11 @@ static inline void __rwbase_write_unlock(struct rwbase_rt *rwb, int bias,
{
struct rt_mutex_base *rtm = &rwb->rtmutex;
- atomic_add(READER_BIAS - bias, &rwb->readers);
+ /*
+ * _release() is needed in case that reader is in fast path, pairing
+ * with atomic_try_cmpxchg() in rwbase_read_trylock(), provides RELEASE
+ */
+ (void)atomic_add_return_release(READER_BIAS - bias, &rwb->readers);
raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
rwbase_rtmutex_unlock(rtm);
}
@@ -216,8 +229,14 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
*/
rwbase_set_and_save_current_state(state);
- /* Block until all readers have left the critical section. */
- for (; atomic_read(&rwb->readers);) {
+ /*
+ * Block until all readers have left the critical section.
+ *
+ * _acqurie() is needed in case that the reader side runs in the fast
+ * path, pairing with the atomic_dec_and_test() in rwbase_read_unlock(),
+ * provides ACQUIRE.
+ */
+ for (; atomic_read_acquire(&rwb->readers);) {
/* Optimized out for rwlocks */
if (rwbase_signal_pending_state(state, current)) {
__set_current_state(TASK_RUNNING);
@@ -229,6 +248,9 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
/*
* Schedule and wait for the readers to leave the critical
* section. The last reader leaving it wakes the waiter.
+ *
+ * _acquire() is not needed, because we can rely on the smp_mb()
+ * in set_current_state() to provide ACQUIRE.
*/
if (atomic_read(&rwb->readers) != 0)
rwbase_schedule();
@@ -253,7 +275,11 @@ static inline int rwbase_write_trylock(struct rwbase_rt *rwb)
atomic_sub(READER_BIAS, &rwb->readers);
raw_spin_lock_irqsave(&rtm->wait_lock, flags);
- if (!atomic_read(&rwb->readers)) {
+ /*
+ * _acquire() is needed in case reader is in the fast path, pairing with
+ * rwbase_read_unlock(), provides ACQUIRE.
+ */
+ if (!atomic_read_acquire(&rwb->readers)) {
atomic_set(&rwb->readers, WRITER_BIAS);
raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
return 1;
--
2.32.0
On 9/1/21 11:06 AM, Boqun Feng wrote:
> Readers of rwbase can lock and unlock without taking any inner lock, if
> that happens, we need the ordering provided by atomic operations to
> satisfy the ordering semantics of lock/unlock. Without that, considering
> the follow case:
>
> { X = 0 initially }
>
> CPU 0 CPU 1
> ===== =====
> rt_write_lock();
> X = 1
> rt_write_unlock():
> atomic_add(READER_BIAS - WRITER_BIAS, ->readers);
> // ->readers is READER_BIAS.
> rt_read_lock():
> if ((r = atomic_read(->readers)) < 0) // True
> atomic_try_cmpxchg(->readers, r, r + 1); // succeed.
> <acquire the read lock via fast path>
>
> r1 = X; // r1 may be 0, because nothing prevent the reordering
> // of "X=1" and atomic_add() on CPU 1.
>
> Therefore audit every usage of atomic operations that may happen in a
> fast path, and add necessary barriers.
>
> Signed-off-by: Boqun Feng <[email protected]>
> ---
> Hi Thomas and Peter,
>
> Sorry I'm late for the party of PREEMPT_RT lock review. Just want to
> point the problem with this patch. Not even compile test, but show the
> idea and check if I'm missing something subtle.
>
> Regards,
> Boqun
>
>
> kernel/locking/rwbase_rt.c | 34 ++++++++++++++++++++++++++++++----
> 1 file changed, 30 insertions(+), 4 deletions(-)
>
> diff --git a/kernel/locking/rwbase_rt.c b/kernel/locking/rwbase_rt.c
> index 4ba15088e640..a1886fd8bde6 100644
> --- a/kernel/locking/rwbase_rt.c
> +++ b/kernel/locking/rwbase_rt.c
> @@ -41,6 +41,12 @@
> * The risk of writer starvation is there, but the pathological use cases
> * which trigger it are not necessarily the typical RT workloads.
> *
> + * Fast-path orderings:
> + * The lock/unlock of readers can run in fast paths: lock and unlock are only
> + * atomic ops, and there is no inner lock to provide ACQUIRE and RELEASE
> + * semantics of rwbase_rt. Atomic ops then should be stronger than _acquire()
> + * and _release() to provide necessary ordering guarantee.
> + *
> * Common code shared between RT rw_semaphore and rwlock
> */
>
> @@ -53,6 +59,7 @@ static __always_inline int rwbase_read_trylock(struct rwbase_rt *rwb)
> * set.
> */
> for (r = atomic_read(&rwb->readers); r < 0;) {
> + /* Fully-ordered if cmpxchg() succeeds, provides ACQUIRE */
> if (likely(atomic_try_cmpxchg(&rwb->readers, &r, r + 1)))
Should we also change *cmpxchg() to cmpxchg_acquire() as it is a little
cheaper for ll/sc arches?
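A sketch of what that would look like (untested, shown only to illustrate the
suggestion; atomic_try_cmpxchg_acquire() is the existing kernel helper):

    static __always_inline int rwbase_read_trylock(struct rwbase_rt *rwb)
    {
        int r;

        /*
         * Increment reader count, if rwb->readers < 0, i.e. READER_BIAS is
         * set. atomic_read() is already relaxed; ACQUIRE on the successful
         * cmpxchg is all the lock acquisition needs.
         */
        for (r = atomic_read(&rwb->readers); r < 0;) {
            if (likely(atomic_try_cmpxchg_acquire(&rwb->readers, &r, r + 1)))
                return 1;
        }
        return 0;
    }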
The other changes look good to me.
Cheers,
Longman
On Wed, 01 Sep 2021, Boqun Feng wrote:
>diff --git a/kernel/locking/rwbase_rt.c b/kernel/locking/rwbase_rt.c
>index 4ba15088e640..a1886fd8bde6 100644
>--- a/kernel/locking/rwbase_rt.c
>+++ b/kernel/locking/rwbase_rt.c
>@@ -41,6 +41,12 @@
> * The risk of writer starvation is there, but the pathological use cases
> * which trigger it are not necessarily the typical RT workloads.
> *
>+ * Fast-path orderings:
>+ * The lock/unlock of readers can run in fast paths: lock and unlock are only
>+ * atomic ops, and there is no inner lock to provide ACQUIRE and RELEASE
>+ * semantics of rwbase_rt. Atomic ops then should be stronger than _acquire()
>+ * and _release() to provide necessary ordering guarantee.
Perhaps the following instead?
+ * Ordering guarantees: As with any locking primitive, (load)-ACQUIRE and
+ * (store)-RELEASE semantics are guaranteed for lock and unlock operations,
+ * respectively; such that nothing leaks out of the critical region. When
+ * writers are involved this is provided through the rtmutex. However, for
+ * reader fast-paths, the atomics provide at least such guarantees.
Also, I think you could remove most of the comments wrt _acquire or _release
in the fastpath for each ->readers atomic op, unless it isn't obvious.
>+ *
> * Common code shared between RT rw_semaphore and rwlock
> */
>
>@@ -53,6 +59,7 @@ static __always_inline int rwbase_read_trylock(struct rwbase_rt *rwb)
> * set.
> */
> for (r = atomic_read(&rwb->readers); r < 0;) {
Unrelated, but we probably wanna get rid of this abuse of for-loops throughout.
>+ /* Fully-ordered if cmpxchg() succeeds, provides ACQUIRE */
> if (likely(atomic_try_cmpxchg(&rwb->readers, &r, r + 1)))
As Waiman suggested, this can be _acquire() - albeit we're only missing
an L->L for acquire semantics upon returning, per the control dependency
already guaranteeing L->S. That way we would loop with _relaxed().
> return 1;
> }
>@@ -162,6 +169,8 @@ static __always_inline void rwbase_read_unlock(struct rwbase_rt *rwb,
> /*
> * rwb->readers can only hit 0 when a writer is waiting for the
> * active readers to leave the critical section.
>+ *
>+ * dec_and_test() is fully ordered, provides RELEASE.
> */
> if (unlikely(atomic_dec_and_test(&rwb->readers)))
> __rwbase_read_unlock(rwb, state);
>@@ -172,7 +181,11 @@ static inline void __rwbase_write_unlock(struct rwbase_rt *rwb, int bias,
> {
> struct rt_mutex_base *rtm = &rwb->rtmutex;
>
>- atomic_add(READER_BIAS - bias, &rwb->readers);
>+ /*
>+ * _release() is needed in case that reader is in fast path, pairing
>+ * with atomic_try_cmpxchg() in rwbase_read_trylock(), provides RELEASE
>+ */
>+ (void)atomic_add_return_release(READER_BIAS - bias, &rwb->readers);
Hmmm, while defined, there are no users of atomic_add_return_release() (yet?). I think
this is because the following is preferred when the return value is not really
wanted, but only the RMW ordering it provides:
+ smp_mb__before_atomic(); /* provide RELEASE semantics */
atomic_add(READER_BIAS - bias, &rwb->readers);
raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
rwbase_rtmutex_unlock(rtm);
> raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
> rwbase_rtmutex_unlock(rtm);
> }
Thanks,
Davidlohr
On Wed, Sep 01, 2021 at 01:22:42PM -0700, Davidlohr Bueso wrote:
> On Wed, 01 Sep 2021, Boqun Feng wrote:
> > diff --git a/kernel/locking/rwbase_rt.c b/kernel/locking/rwbase_rt.c
> > index 4ba15088e640..a1886fd8bde6 100644
> > --- a/kernel/locking/rwbase_rt.c
> > +++ b/kernel/locking/rwbase_rt.c
> > @@ -41,6 +41,12 @@
> > * The risk of writer starvation is there, but the pathological use cases
> > * which trigger it are not necessarily the typical RT workloads.
> > *
> > + * Fast-path orderings:
> > + * The lock/unlock of readers can run in fast paths: lock and unlock are only
> > + * atomic ops, and there is no inner lock to provide ACQUIRE and RELEASE
> > + * semantics of rwbase_rt. Atomic ops then should be stronger than _acquire()
> > + * and _release() to provide necessary ordering guarantee.
>
> Perhaps the following instead?
>
Thanks.
> + * Ordering guarantees: As with any locking primitive, (load)-ACQUIRE and
> + * (store)-RELEASE semantics are guaranteed for lock and unlock operations,
> + * respectively; such that nothing leaks out of the critical region. When
> + * writers are involved this is provided through the rtmutex. However, for
> + * reader fast-paths, the atomics provide at least such guarantees.
>
However, this is a bit inaccurate: yes, writers always acquire the lock
(->readers) inside critical sections of ->wait_lock, but if readers run
the fast paths, the atomics of the writers have to provide the ordering,
because we can rely on the rtmutex ordering only if both sides run in
the slow paths.
> Also, I think you could remove most of the comments wrt _acquire or _release
> in the fastpath for each ->readers atomic op, unless it isn't obvious.
>
> > + *
> > * Common code shared between RT rw_semaphore and rwlock
> > */
> >
> > @@ -53,6 +59,7 @@ static __always_inline int rwbase_read_trylock(struct rwbase_rt *rwb)
> > * set.
> > */
> > for (r = atomic_read(&rwb->readers); r < 0;) {
>
> Unrelated, but we probably wanna get rid of these abusing for-loops throughout.
>
Agreed, let me see what I can do.
> > + /* Fully-ordered if cmpxchg() succeeds, provides ACQUIRE */
> > if (likely(atomic_try_cmpxchg(&rwb->readers, &r, r + 1)))
>
> As Waiman suggested, this can be _acquire() - albeit we're only missing
> an L->L for acquire semantics upon returning, per the control dependency
> already guaranteeing L->S. That way we would loop with _relaxed().
>
_acquire() is fine, I think, but probably as a separate patch. We should be
careful when relaxing the ordering, and ideally have some performance
numbers showing the benefits.
> > return 1;
> > }
> > @@ -162,6 +169,8 @@ static __always_inline void rwbase_read_unlock(struct rwbase_rt *rwb,
> > /*
> > * rwb->readers can only hit 0 when a writer is waiting for the
> > * active readers to leave the critical section.
> > + *
> > + * dec_and_test() is fully ordered, provides RELEASE.
> > */
> > if (unlikely(atomic_dec_and_test(&rwb->readers)))
> > __rwbase_read_unlock(rwb, state);
> > @@ -172,7 +181,11 @@ static inline void __rwbase_write_unlock(struct rwbase_rt *rwb, int bias,
> > {
> > struct rt_mutex_base *rtm = &rwb->rtmutex;
> >
> > - atomic_add(READER_BIAS - bias, &rwb->readers);
> > + /*
> > + * _release() is needed in case that reader is in fast path, pairing
> > + * with atomic_try_cmpxchg() in rwbase_read_trylock(), provides RELEASE
> > + */
> > + (void)atomic_add_return_release(READER_BIAS - bias, &rwb->readers);
>
> Hmmm while defined, there are no users atomic_add_return_release (yet?). I think
There is a usage of atomic_sub_return_release() in queued_spin_unlock()
;-)
> this is because the following is preferred when the return value is not really
> wanted, but only the Rmw ordering it provides:
>
> + smp_mb__before_atomic(); /* provide RELEASE semantics */
> atomic_add(READER_BIAS - bias, &rwb->readers);
> raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
> rwbase_rtmutex_unlock(rtm);
>
smp_mb__before_atomic() + atomic will be a smp_mb() + atomic on weakly
ordered archs (e.g. ARM64 and PowerPC), while atomic_*_return_release()
will be a release atomic operation (e.g. ldxr/stlxr on ARM64); the
latter is considered cheaper.
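Roughly, for an LL/SC arm64 build the two options expand to something like
the following (illustrative sketch, not actual disassembly):

    smp_mb__before_atomic();              //    dmb  ish
    atomic_add(v, &rwb->readers);         // 1: ldxr  w0, [readers]
                                          //    add   w0, w0, v
                                          //    stxr  w1, w0, [readers]
                                          //    cbnz  w1, 1b

    atomic_add_return_release(v, &rwb->readers);
                                          // 1: ldxr  w0, [readers]
                                          //    add   w0, w0, v
                                          //    stlxr w1, w0, [readers]
                                          //    cbnz  w1, 1b

i.e. the _release() form trades the full barrier for a store-release on the
exclusive store.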
Regards,
Boqun
> > raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
> > rwbase_rtmutex_unlock(rtm);
> > }
>
> Thanks,
> Davidlohr
On Wed, Sep 01, 2021 at 11:06:27PM +0800, Boqun Feng wrote:
> Sorry I'm late for the party of PREEMPT_RT lock review. Just want to
> point the problem with this patch. Not even compile test, but show the
> idea and check if I'm missing something subtle.
No worries, glad you could have a look. I think you're right and we
missed this.
> diff --git a/kernel/locking/rwbase_rt.c b/kernel/locking/rwbase_rt.c
> index 4ba15088e640..a1886fd8bde6 100644
> --- a/kernel/locking/rwbase_rt.c
> +++ b/kernel/locking/rwbase_rt.c
> @@ -41,6 +41,12 @@
> * The risk of writer starvation is there, but the pathological use cases
> * which trigger it are not necessarily the typical RT workloads.
> *
> + * Fast-path orderings:
> + * The lock/unlock of readers can run in fast paths: lock and unlock are only
> + * atomic ops, and there is no inner lock to provide ACQUIRE and RELEASE
> + * semantics of rwbase_rt. Atomic ops then should be stronger than _acquire()
> + * and _release() to provide necessary ordering guarantee.
> + *
> * Common code shared between RT rw_semaphore and rwlock
> */
>
> @@ -53,6 +59,7 @@ static __always_inline int rwbase_read_trylock(struct rwbase_rt *rwb)
> * set.
> */
> for (r = atomic_read(&rwb->readers); r < 0;) {
> + /* Fully-ordered if cmpxchg() succeeds, provides ACQUIRE */
> if (likely(atomic_try_cmpxchg(&rwb->readers, &r, r + 1)))
> return 1;
> }
> @@ -162,6 +169,8 @@ static __always_inline void rwbase_read_unlock(struct rwbase_rt *rwb,
> /*
> * rwb->readers can only hit 0 when a writer is waiting for the
> * active readers to leave the critical section.
> + *
> + * dec_and_test() is fully ordered, provides RELEASE.
> */
> if (unlikely(atomic_dec_and_test(&rwb->readers)))
> __rwbase_read_unlock(rwb, state);
> @@ -172,7 +181,11 @@ static inline void __rwbase_write_unlock(struct rwbase_rt *rwb, int bias,
> {
> struct rt_mutex_base *rtm = &rwb->rtmutex;
>
> - atomic_add(READER_BIAS - bias, &rwb->readers);
> + /*
> + * _release() is needed in case that reader is in fast path, pairing
> + * with atomic_try_cmpxchg() in rwbase_read_trylock(), provides RELEASE
> + */
> + (void)atomic_add_return_release(READER_BIAS - bias, &rwb->readers);
Very narrow race with the unlock below, but yes agreed.
> raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
> rwbase_rtmutex_unlock(rtm);
> }
> @@ -216,8 +229,14 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> */
> rwbase_set_and_save_current_state(state);
>
> - /* Block until all readers have left the critical section. */
> - for (; atomic_read(&rwb->readers);) {
> + /*
> + * Block until all readers have left the critical section.
> + *
> + * _acqurie() is needed in case that the reader side runs in the fast
> + * path, pairing with the atomic_dec_and_test() in rwbase_read_unlock(),
> + * provides ACQUIRE.
> + */
> + for (; atomic_read_acquire(&rwb->readers);) {
> /* Optimized out for rwlocks */
> if (rwbase_signal_pending_state(state, current)) {
> __set_current_state(TASK_RUNNING);
I think we can restructure things to avoid this one, but yes. Suppose we
do:
readers = atomic_sub_return_relaxed(READER_BIAS, &rwb->readers);
/*
* These two provide either an smp_mb() or an UNLOCK+LOCK
* ordering, either is strong enough to provide ACQUIRE order
* for the above load of @readers.
*/
rwbase_set_and_save_current_state(state);
raw_spin_lock_irqsave(&rtm->wait_lock, flags);
while (readers) {
...
readers = atomic_read(&rwb->readers);
if (readers)
rwbase_schedule();
...
}
> @@ -229,6 +248,9 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> /*
> * Schedule and wait for the readers to leave the critical
> * section. The last reader leaving it wakes the waiter.
> + *
> + * _acquire() is not needed, because we can rely on the smp_mb()
> + * in set_current_state() to provide ACQUIRE.
> */
> if (atomic_read(&rwb->readers) != 0)
> rwbase_schedule();
> @@ -253,7 +275,11 @@ static inline int rwbase_write_trylock(struct rwbase_rt *rwb)
> atomic_sub(READER_BIAS, &rwb->readers);
>
> raw_spin_lock_irqsave(&rtm->wait_lock, flags);
> - if (!atomic_read(&rwb->readers)) {
> + /*
> + * _acquire() is needed in case reader is in the fast path, pairing with
> + * rwbase_read_unlock(), provides ACQUIRE.
> + */
> + if (!atomic_read_acquire(&rwb->readers)) {
Moo; the alternative is using dec_and_lock instead of dec_and_test, but
that's not going to be worth it.
> atomic_set(&rwb->readers, WRITER_BIAS);
> raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
> return 1;
> --
> 2.32.0
>
On Thu, Sep 02, 2021 at 01:55:29PM +0200, Peter Zijlstra wrote:
[...]
> > raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
> > rwbase_rtmutex_unlock(rtm);
> > }
> > @@ -216,8 +229,14 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> > */
> > rwbase_set_and_save_current_state(state);
> >
> > - /* Block until all readers have left the critical section. */
> > - for (; atomic_read(&rwb->readers);) {
> > + /*
> > + * Block until all readers have left the critical section.
> > + *
> > + * _acqurie() is needed in case that the reader side runs in the fast
> > + * path, pairing with the atomic_dec_and_test() in rwbase_read_unlock(),
> > + * provides ACQUIRE.
> > + */
> > + for (; atomic_read_acquire(&rwb->readers);) {
> > /* Optimized out for rwlocks */
> > if (rwbase_signal_pending_state(state, current)) {
> > __set_current_state(TASK_RUNNING);
>
> I think we can restructure things to avoid this one, but yes. Suppose we
> do:
>
> readers = atomic_sub_return_relaxed(READER_BIAS, &rwb->readers);
>
> /*
> * These two provide either an smp_mb() or an UNLOCK+LOCK
By "UNLOCK+LOCK", you mean unlock(->pi_lock) + lock(->wait_lock), right?
This may be unrelated, but in our memory model only unlock+lock pairs on
the same lock provide TSO-like ordering ;-) IOW, unlock(->pi_lock) +
lock(->wait_lock) on the same CPU doesn't order reads before and after.
Consider the following litmus:
C unlock-lock
{
}
P0(spinlock_t *s, spinlock_t *p, int *x, int *y)
{
int r1;
int r2;
spin_lock(s);
r1 = READ_ONCE(*x);
spin_unlock(s);
spin_lock(p);
r2 = READ_ONCE(*y);
spin_unlock(p);
}
P1(int *x, int *y)
{
WRITE_ONCE(*y, 1);
smp_wmb();
WRITE_ONCE(*x, 1);
}
exists (0:r1=1 /\ 0:r2=0)
herd result:
Test unlock-lock Allowed
States 4
0:r1=0; 0:r2=0;
0:r1=0; 0:r2=1;
0:r1=1; 0:r2=0;
0:r1=1; 0:r2=1;
Ok
Witnesses
Positive: 1 Negative: 3
Condition exists (0:r1=1 /\ 0:r2=0)
Observation unlock-lock Sometimes 1 3
Time unlock-lock 0.01
Hash=a8b772fd25f963f73a0d8e70e36ee255
> * ordering, either is strong enough to provide ACQUIRE order
> * for the above load of @readers.
> */
> rwbase_set_and_save_current_state(state);
> raw_spin_lock_irqsave(&rtm->wait_lock, flags);
>
> while (readers) {
> ...
> readers = atomic_read(&rwb->readers);
The above should be _acquire(), right? Pairs with the last reader
exiting the critical section and dec ->readers to 0. If so, it
undermines the necessity of the restructure?
Regards,
Boqun
> if (readers)
> rwbase_schedule();
> ...
> }
>
>
> > @@ -229,6 +248,9 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> > /*
> > * Schedule and wait for the readers to leave the critical
> > * section. The last reader leaving it wakes the waiter.
> > + *
> > + * _acquire() is not needed, because we can rely on the smp_mb()
> > + * in set_current_state() to provide ACQUIRE.
> > */
> > if (atomic_read(&rwb->readers) != 0)
> > rwbase_schedule();
> > @@ -253,7 +275,11 @@ static inline int rwbase_write_trylock(struct rwbase_rt *rwb)
> > atomic_sub(READER_BIAS, &rwb->readers);
> >
> > raw_spin_lock_irqsave(&rtm->wait_lock, flags);
> > - if (!atomic_read(&rwb->readers)) {
> > + /*
> > + * _acquire() is needed in case reader is in the fast path, pairing with
> > + * rwbase_read_unlock(), provides ACQUIRE.
> > + */
> > + if (!atomic_read_acquire(&rwb->readers)) {
>
> Moo; the alternative is using dec_and_lock instead of dec_and_test, but
> that's not going to be worth it.
>
> > atomic_set(&rwb->readers, WRITER_BIAS);
> > raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
> > return 1;
> > --
> > 2.32.0
> >
On Fri, Sep 03, 2021 at 10:50:58PM +0800, Boqun Feng wrote:
> On Thu, Sep 02, 2021 at 01:55:29PM +0200, Peter Zijlstra wrote:
> [...]
> > > raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
> > > rwbase_rtmutex_unlock(rtm);
> > > }
> > > @@ -216,8 +229,14 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> > > */
> > > rwbase_set_and_save_current_state(state);
> > >
> > > - /* Block until all readers have left the critical section. */
> > > - for (; atomic_read(&rwb->readers);) {
> > > + /*
> > > + * Block until all readers have left the critical section.
> > > + *
> > > + * _acqurie() is needed in case that the reader side runs in the fast
> > > + * path, pairing with the atomic_dec_and_test() in rwbase_read_unlock(),
> > > + * provides ACQUIRE.
> > > + */
> > > + for (; atomic_read_acquire(&rwb->readers);) {
> > > /* Optimized out for rwlocks */
> > > if (rwbase_signal_pending_state(state, current)) {
> > > __set_current_state(TASK_RUNNING);
> >
> > I think we can restructure things to avoid this one, but yes. Suppose we
> > do:
> >
> > readers = atomic_sub_return_relaxed(READER_BIAS, &rwb->readers);
> >
> > /*
> > * These two provide either an smp_mb() or an UNLOCK+LOCK
>
> By "UNLOCK+LOCK", you mean unlock(->pi_lock) + lock(->wait_lock), right?
> This may be unrelated, but in our memory model only unlock+lock pairs on
> the same lock provide TSO-like ordering ;-) IOW, unlock(->pi_lock) +
> lock(->wait_lock) on the same CPU doesn't order reads before and after.
Hurpm.. what actual hardware does that? PPC uses LWSYNC for
ACQUIRE/RELEASE, and ARM64 has RCsc RELEASE+ACQUIRE ordering.
Both should provide RC-TSO (or stronger) for UNLOCK-A + LOCK-B.
On Fri, Sep 03, 2021 at 10:50:58PM +0800, Boqun Feng wrote:
> > * ordering, either is strong enough to provide ACQUIRE order
> > * for the above load of @readers.
> > */
> > rwbase_set_and_save_current_state(state);
> > raw_spin_lock_irqsave(&rtm->wait_lock, flags);
> >
> > while (readers) {
> > ...
> > readers = atomic_read(&rwb->readers);
>
> The above should be _acquire(), right? Pairs with the last reader
> exiting the critical section and dec ->readers to 0. If so, it
> undermines the necessity of the restructure?
This is the one that's followed by set_current_state(), no?
On Sat, Sep 04, 2021 at 12:14:29PM +0200, Peter Zijlstra wrote:
> On Fri, Sep 03, 2021 at 10:50:58PM +0800, Boqun Feng wrote:
> > > * ordering, either is strong enough to provide ACQUIRE order
> > > * for the above load of @readers.
> > > */
> > > rwbase_set_and_save_current_state(state);
> > > raw_spin_lock_irqsave(&rtm->wait_lock, flags);
> > >
> > > while (readers) {
> > > ...
> > > readers = atomic_read(&rwb->readers);
> >
> > The above should be _acquire(), right? Pairs with the last reader
> > exiting the critical section and dec ->readers to 0. If so, it
> > undermines the necessity of the restructure?
>
> This is the one that's followed by set_current_state(), no?
You're right. I was missing that ;-/
Regards,
Boqun
>
On Wed, Sep 08, 2021 at 01:51:24PM +0200, Peter Zijlstra wrote:
> On Wed, Sep 01, 2021 at 11:06:27PM +0800, Boqun Feng wrote:
> @@ -201,23 +207,30 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> {
> struct rt_mutex_base *rtm = &rwb->rtmutex;
> unsigned long flags;
> + int readers;
>
> /* Take the rtmutex as a first step */
> if (rwbase_rtmutex_lock_state(rtm, state))
> return -EINTR;
>
> /* Force readers into slow path */
> - atomic_sub(READER_BIAS, &rwb->readers);
> + readers = atomic_sub_return_relaxed(READER_BIAS, &rwb->readers);
Hurmph... the above really begs for something like
if (!readers)
return 0;
But then we need that _acquire() thing again :/
On Wed, Sep 01, 2021 at 11:06:27PM +0800, Boqun Feng wrote:
> Readers of rwbase can lock and unlock without taking any inner lock, if
> that happens, we need the ordering provided by atomic operations to
> satisfy the ordering semantics of lock/unlock. Without that, considering
> the follow case:
>
> { X = 0 initially }
>
> CPU 0 CPU 1
> ===== =====
> rt_write_lock();
> X = 1
> rt_write_unlock():
> atomic_add(READER_BIAS - WRITER_BIAS, ->readers);
> // ->readers is READER_BIAS.
> rt_read_lock():
> if ((r = atomic_read(->readers)) < 0) // True
> atomic_try_cmpxchg(->readers, r, r + 1); // succeed.
> <acquire the read lock via fast path>
>
> r1 = X; // r1 may be 0, because nothing prevent the reordering
> // of "X=1" and atomic_add() on CPU 1.
>
> Therefore audit every usage of atomic operations that may happen in a
> fast path, and add necessary barriers.
>
> Signed-off-by: Boqun Feng <[email protected]>
> ---
Does this work for you?
diff --git a/kernel/locking/rwbase_rt.c b/kernel/locking/rwbase_rt.c
index 4ba15088e640..4685d3780683 100644
--- a/kernel/locking/rwbase_rt.c
+++ b/kernel/locking/rwbase_rt.c
@@ -53,6 +53,7 @@ static __always_inline int rwbase_read_trylock(struct rwbase_rt *rwb)
* set.
*/
for (r = atomic_read(&rwb->readers); r < 0;) {
+ /* Fully ordered on success, provides ACQUIRE */
if (likely(atomic_try_cmpxchg(&rwb->readers, &r, r + 1)))
return 1;
}
@@ -162,6 +163,8 @@ static __always_inline void rwbase_read_unlock(struct rwbase_rt *rwb,
/*
* rwb->readers can only hit 0 when a writer is waiting for the
* active readers to leave the critical section.
+ *
+ * Fully ordered, provides RELEASE.
*/
if (unlikely(atomic_dec_and_test(&rwb->readers)))
__rwbase_read_unlock(rwb, state);
@@ -172,7 +175,10 @@ static inline void __rwbase_write_unlock(struct rwbase_rt *rwb, int bias,
{
struct rt_mutex_base *rtm = &rwb->rtmutex;
- atomic_add(READER_BIAS - bias, &rwb->readers);
+ /*
+ * Needs to provide RELEASE vs rwbase_read_trylock().
+ */
+ (void)atomic_fetch_add_release(READER_BIAS - bias, &rwb->readers);
raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
rwbase_rtmutex_unlock(rtm);
}
@@ -201,23 +207,30 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
{
struct rt_mutex_base *rtm = &rwb->rtmutex;
unsigned long flags;
+ int readers;
/* Take the rtmutex as a first step */
if (rwbase_rtmutex_lock_state(rtm, state))
return -EINTR;
/* Force readers into slow path */
- atomic_sub(READER_BIAS, &rwb->readers);
+ readers = atomic_sub_return_relaxed(READER_BIAS, &rwb->readers);
- raw_spin_lock_irqsave(&rtm->wait_lock, flags);
/*
* set_current_state() for rw_semaphore
* current_save_and_set_rtlock_wait_state() for rwlock
*/
rwbase_set_and_save_current_state(state);
+ raw_spin_lock_irqsave(&rtm->wait_lock, flags);
- /* Block until all readers have left the critical section. */
- for (; atomic_read(&rwb->readers);) {
+ /*
+ * Block until all readers have left the critical section.
+ *
+ * In the case of !readers, the above implies TSO ordering
+ * at the very least and hence provides ACQUIRE vs the earlier
+ * atomic_sub_return_relaxed().
+ */
+ while (readers) {
/* Optimized out for rwlocks */
if (rwbase_signal_pending_state(state, current)) {
__set_current_state(TASK_RUNNING);
@@ -230,8 +243,12 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
* Schedule and wait for the readers to leave the critical
* section. The last reader leaving it wakes the waiter.
*/
- if (atomic_read(&rwb->readers) != 0)
+ readers = atomic_read(&rwb->readers);
+ if (readers != 0)
rwbase_schedule();
+ /*
+ * Implies smp_mb() and provides ACQUIRE for the !readers case.
+ */
set_current_state(state);
raw_spin_lock_irqsave(&rtm->wait_lock, flags);
}
@@ -253,7 +270,10 @@ static inline int rwbase_write_trylock(struct rwbase_rt *rwb)
atomic_sub(READER_BIAS, &rwb->readers);
raw_spin_lock_irqsave(&rtm->wait_lock, flags);
- if (!atomic_read(&rwb->readers)) {
+ /*
+ * Needs ACQUIRE vs rwbase_read_unlock();
+ */
+ if (!atomic_read_acquire(&rwb->readers)) {
atomic_set(&rwb->readers, WRITER_BIAS);
raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
return 1;
On Wed, Sep 08, 2021 at 01:51:24PM +0200, Peter Zijlstra wrote:
[...]
> @@ -201,23 +207,30 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> {
> struct rt_mutex_base *rtm = &rwb->rtmutex;
> unsigned long flags;
> + int readers;
>
> /* Take the rtmutex as a first step */
> if (rwbase_rtmutex_lock_state(rtm, state))
> return -EINTR;
>
> /* Force readers into slow path */
> - atomic_sub(READER_BIAS, &rwb->readers);
> + readers = atomic_sub_return_relaxed(READER_BIAS, &rwb->readers);
>
> - raw_spin_lock_irqsave(&rtm->wait_lock, flags);
> /*
> * set_current_state() for rw_semaphore
> * current_save_and_set_rtlock_wait_state() for rwlock
> */
> rwbase_set_and_save_current_state(state);
rwbase_set_and_save_current_state() may eventually call
current_save_and_set_rtlock_wait_state(), which requires being called
with irq-off, while rwbase_write_lock() may be called with irq-on. I
guess we can change the raw_spin_lock() to raw_spin_lock_irqsave() in
current_save_and_set_rtlock_wait_state() to solve this.
Regards,
Boqun
> + raw_spin_lock_irqsave(&rtm->wait_lock, flags);
>
> - /* Block until all readers have left the critical section. */
> - for (; atomic_read(&rwb->readers);) {
> + /*
> + * Block until all readers have left the critical section.
> + *
> + * In the case of !readers, the above implies TSO ordering
> + * at the very least and hence provides ACQUIRE vs the earlier
> + * atomic_sub_return_relaxed().
> + */
> + while (readers) {
> /* Optimized out for rwlocks */
> if (rwbase_signal_pending_state(state, current)) {
> __set_current_state(TASK_RUNNING);
[...]
On Wed, Sep 08, 2021 at 02:14:28PM +0200, Peter Zijlstra wrote:
> On Wed, Sep 08, 2021 at 01:51:24PM +0200, Peter Zijlstra wrote:
> > On Wed, Sep 01, 2021 at 11:06:27PM +0800, Boqun Feng wrote:
> > @@ -201,23 +207,30 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> > {
> > struct rt_mutex_base *rtm = &rwb->rtmutex;
> > unsigned long flags;
> > + int readers;
> >
> > /* Take the rtmutex as a first step */
> > if (rwbase_rtmutex_lock_state(rtm, state))
> > return -EINTR;
> >
> > /* Force readers into slow path */
> > - atomic_sub(READER_BIAS, &rwb->readers);
> > + readers = atomic_sub_return_relaxed(READER_BIAS, &rwb->readers);
>
> Hurmph... the above really begs for something like
>
> if (!readers)
> return 0;
>
I don't think we can return early here; don't we need to set WRITER_BIAS
to grab the write lock? And we can only do that with ->wait_lock held,
otherwise we race with readers in the slow path.
Regards,
Boqun
> But then we needs that _acquire() thing again :/
>
On Wed, Sep 08, 2021 at 01:51:24PM +0200, Peter Zijlstra wrote:
[...]
> @@ -201,23 +207,30 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> {
> struct rt_mutex_base *rtm = &rwb->rtmutex;
> unsigned long flags;
> + int readers;
>
> /* Take the rtmutex as a first step */
> if (rwbase_rtmutex_lock_state(rtm, state))
> return -EINTR;
>
> /* Force readers into slow path */
> - atomic_sub(READER_BIAS, &rwb->readers);
> + readers = atomic_sub_return_relaxed(READER_BIAS, &rwb->readers);
>
> - raw_spin_lock_irqsave(&rtm->wait_lock, flags);
> /*
> * set_current_state() for rw_semaphore
> * current_save_and_set_rtlock_wait_state() for rwlock
> */
> rwbase_set_and_save_current_state(state);
> + raw_spin_lock_irqsave(&rtm->wait_lock, flags);
>
> - /* Block until all readers have left the critical section. */
> - for (; atomic_read(&rwb->readers);) {
> + /*
> + * Block until all readers have left the critical section.
> + *
> + * In the case of !readers, the above implies TSO ordering
> + * at the very least and hence provides ACQUIRE vs the earlier
> + * atomic_sub_return_relaxed().
> + */
> + while (readers) {
> /* Optimized out for rwlocks */
> if (rwbase_signal_pending_state(state, current)) {
> __set_current_state(TASK_RUNNING);
> @@ -230,8 +243,12 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> * Schedule and wait for the readers to leave the critical
> * section. The last reader leaving it wakes the waiter.
> */
> - if (atomic_read(&rwb->readers) != 0)
> + readers = atomic_read(&rwb->readers);
> + if (readers != 0)
> rwbase_schedule();
> + /*
> + * Implies smp_mb() and provides ACQUIRE for the !readers case.
> + */
->readers may become non-zero here, because ->wait_lock is not held by the
writer and there could be readers running in the slow path.
We need to re-read ->readers after taking ->wait_lock. Otherwise, we
may use a stale value of ->readers and grab the write lock while readers
still exist.
Regards,
Boqun
> set_current_state(state);
> raw_spin_lock_irqsave(&rtm->wait_lock, flags);
> }
[...]
On Wed, Sep 08, 2021 at 04:41:10PM +0200, Peter Zijlstra wrote:
> Subject: sched/wakeup: Strengthen current_save_and_set_rtlock_wait_state()
>
> While looking at current_save_and_set_rtlock_wait_state() I'm thinking
> it really ought to use smp_store_mb(), because something like:
>
> current_save_and_set_rtlock_wait_state();
> for (;;) {
> if (try_lock())
> break;
> raw_spin_unlock_irq(&lock->wait_lock);
> if (!cond)
> schedule();
> raw_spin_lock_irq(&lock->wait_lock);
> set_current_state(TASK_RTLOCK_WAIT);
> }
> current_restore_rtlock_saved_state();
>
> which is very close to the advertised usage in the comment, is actually
> broken I think:
>
> - try_lock() doesn't need to provide any ordering on failure;
> - raw_spin_unlock() only needs to provide RELEASE ordering;
>
> which gives that the above turns into something like:
>
> WRITE_ONCE(current->__state, TASK_RTLOCK_WAIT);
> raw_spin_unlock(&current->pi_lock);
> raw_spin_unlock(&lock->wait_lock);
> if (!cond)
>
> and the load of @cond is then allowed to speculate right before the
> __state store, and we've got a missed wakeup -> BAD(tm).
>
> Fixes: 5f220be21418 ("sched/wakeup: Prepare for RT sleeping spin/rwlocks")
> Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
> ---
On top of which we can do your patch like so:
---
Subject: lockin/rwbase: Take care of ordering guarantee for fastpath reader
From: Boqun Feng <[email protected]>
Date: Wed, 1 Sep 2021 23:06:27 +0800
From: Boqun Feng <[email protected]>
Readers of rwbase can lock and unlock without taking any inner lock. When
that happens, we need the ordering provided by atomic operations to
satisfy the ordering semantics of lock/unlock. Without that, consider
the following case:
    { X = 0 initially }

    CPU 0                                   CPU 1
    =====                                   =====
                                            rt_write_lock();
                                            X = 1
                                            rt_write_unlock():
                                              atomic_add(READER_BIAS - WRITER_BIAS, ->readers);
                                              // ->readers is READER_BIAS.
    rt_read_lock():
      if ((r = atomic_read(->readers)) < 0) // True
        atomic_try_cmpxchg(->readers, r, r + 1); // succeed.
      <acquire the read lock via fast path>

    r1 = X; // r1 may be 0, because nothing prevents the reordering
            // of "X=1" and atomic_add() on CPU 1.
Therefore audit every usage of atomic operations that may happen in a
fast path, and add necessary barriers.
Signed-off-by: Boqun Feng <[email protected]>
Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
Link: https://lkml.kernel.org/r/[email protected]
---
kernel/locking/rwbase_rt.c | 41 ++++++++++++++++++++++++++++++++++++-----
1 file changed, 36 insertions(+), 5 deletions(-)
--- a/kernel/locking/rwbase_rt.c
+++ b/kernel/locking/rwbase_rt.c
@@ -41,6 +41,12 @@
* The risk of writer starvation is there, but the pathological use cases
* which trigger it are not necessarily the typical RT workloads.
*
+ * Fast-path orderings:
+ * The lock/unlock of readers can run in fast paths: lock and unlock are only
+ * atomic ops, and there is no inner lock to provide ACQUIRE and RELEASE
+ * semantics of rwbase_rt. Atomic ops then should be stronger than _acquire()
+ * and _release() to provide necessary ordering guarantee.
+ *
* Common code shared between RT rw_semaphore and rwlock
*/
@@ -53,6 +59,7 @@ static __always_inline int rwbase_read_t
* set.
*/
for (r = atomic_read(&rwb->readers); r < 0;) {
+ /* Fully-ordered if cmpxchg() succeeds, provides ACQUIRE */
if (likely(atomic_try_cmpxchg(&rwb->readers, &r, r + 1)))
return 1;
}
@@ -162,6 +169,8 @@ static __always_inline void rwbase_read_
/*
* rwb->readers can only hit 0 when a writer is waiting for the
* active readers to leave the critical section.
+ *
+ * dec_and_test() is fully ordered, provides RELEASE.
*/
if (unlikely(atomic_dec_and_test(&rwb->readers)))
__rwbase_read_unlock(rwb, state);
@@ -172,7 +181,11 @@ static inline void __rwbase_write_unlock
{
struct rt_mutex_base *rtm = &rwb->rtmutex;
- atomic_add(READER_BIAS - bias, &rwb->readers);
+ /*
+ * _release() is needed in case that reader is in fast path, pairing
+ * with atomic_try_cmpxchg() in rwbase_read_trylock(), provides RELEASE
+ */
+ (void)atomic_add_return_release(READER_BIAS - bias, &rwb->readers);
raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
rwbase_rtmutex_unlock(rtm);
}
@@ -201,6 +214,7 @@ static int __sched rwbase_write_lock(str
{
struct rt_mutex_base *rtm = &rwb->rtmutex;
unsigned long flags;
+ int readers;
/* Take the rtmutex as a first step */
if (rwbase_rtmutex_lock_state(rtm, state))
@@ -210,14 +224,23 @@ static int __sched rwbase_write_lock(str
atomic_sub(READER_BIAS, &rwb->readers);
raw_spin_lock_irqsave(&rtm->wait_lock, flags);
+
+ /* The below set_*_state() thingy implies smp_mb() to provide ACQUIRE */
+ readers = atomic_read(&rwb->readers);
/*
* set_current_state() for rw_semaphore
* current_save_and_set_rtlock_wait_state() for rwlock
*/
rwbase_set_and_save_current_state(state);
- /* Block until all readers have left the critical section. */
- for (; atomic_read(&rwb->readers);) {
+ /*
+ * Block until all readers have left the critical section.
+ *
+ * _acqurie() is needed in case that the reader side runs in the fast
+ * path, pairing with the atomic_dec_and_test() in rwbase_read_unlock(),
+ * provides ACQUIRE.
+ */
+ while (readers) {
/* Optimized out for rwlocks */
if (rwbase_signal_pending_state(state, current)) {
__set_current_state(TASK_RUNNING);
@@ -229,8 +252,12 @@ static int __sched rwbase_write_lock(str
/*
* Schedule and wait for the readers to leave the critical
* section. The last reader leaving it wakes the waiter.
+ *
+ * _acquire() is not needed, because we can rely on the smp_mb()
+ * in set_current_state() to provide ACQUIRE.
*/
- if (atomic_read(&rwb->readers) != 0)
+ readers = atomic_read(&rwb->readers);
+ if (readers)
rwbase_schedule();
set_current_state(state);
raw_spin_lock_irqsave(&rtm->wait_lock, flags);
@@ -253,7 +280,11 @@ static inline int rwbase_write_trylock(s
atomic_sub(READER_BIAS, &rwb->readers);
raw_spin_lock_irqsave(&rtm->wait_lock, flags);
- if (!atomic_read(&rwb->readers)) {
+ /*
+ * _acquire() is needed in case reader is in the fast path, pairing with
+ * rwbase_read_unlock(), provides ACQUIRE.
+ */
+ if (!atomic_read_acquire(&rwb->readers)) {
atomic_set(&rwb->readers, WRITER_BIAS);
raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
return 1;
On Wed, Sep 08, 2021 at 09:08:52PM +0800, Boqun Feng wrote:
> On Wed, Sep 08, 2021 at 01:51:24PM +0200, Peter Zijlstra wrote:
> [...]
> > @@ -201,23 +207,30 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> > {
> > struct rt_mutex_base *rtm = &rwb->rtmutex;
> > unsigned long flags;
> > + int readers;
> >
> > /* Take the rtmutex as a first step */
> > if (rwbase_rtmutex_lock_state(rtm, state))
> > return -EINTR;
> >
> > /* Force readers into slow path */
> > - atomic_sub(READER_BIAS, &rwb->readers);
> > + readers = atomic_sub_return_relaxed(READER_BIAS, &rwb->readers);
> >
> > - raw_spin_lock_irqsave(&rtm->wait_lock, flags);
> > /*
> > * set_current_state() for rw_semaphore
> > * current_save_and_set_rtlock_wait_state() for rwlock
> > */
> > rwbase_set_and_save_current_state(state);
>
> rwbase_set_and_save_current_state() may eventually call
> current_save_and_set_rtlock_wait_state(), which requires being called
> with irq-off, while rwbase_write_lock() may be called with irq-on. I
> guess we can change the raw_spin_lock() to raw_spin_lock_irqsave() in
> current_save_and_set_rtlock_wait_state() to solve this.
Oh right... that's actually something I pointed out to Thomas during
review, and I suppose we both forgot about it, or figured it didn't
matter enough.
Oooh, Thomas added that lockdep_assert.. still lemme change that to
match set_special_state().
Also,...
---
Subject: sched/wakeup: Strengthen current_save_and_set_rtlock_wait_state()
While looking at current_save_and_set_rtlock_wait_state() I'm thinking
it really ought to use smp_store_mb(), because something like:
current_save_and_set_rtlock_wait_state();
for (;;) {
if (try_lock())
break;
raw_spin_unlock_irq(&lock->wait_lock);
if (!cond)
schedule();
raw_spin_lock_irq(&lock->wait_lock);
set_current_state(TASK_RTLOCK_WAIT);
}
current_restore_rtlock_saved_state();
which is very close to the advertised usage in the comment, is actually
broken I think:
- try_lock() doesn't need to provide any ordering on failure;
- raw_spin_unlock() only needs to provide RELEASE ordering;
which gives that the above turns into something like:
WRITE_ONCE(current->__state, TASK_RTLOCK_WAIT);
raw_spin_unlock(&current->pi_lock);
raw_spin_unlock(&lock->wait_lock);
if (!cond)
and the load of @cond is then allowed to speculate right before the
__state store, and we've got a missed wakeup -> BAD(tm).
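In litmus form the race reduces to a store-buffering sketch (illustrative;
"state" stands for current->__state, "cond" for the wait condition, and P1
is the waker after the smp_mb() in try_to_wake_up()):

    C rtlock-wait-missed-wakeup

    { }

    P0(int *state, int *cond)
    {
        int r0;

        WRITE_ONCE(*state, 1);
        r0 = READ_ONCE(*cond);
    }

    P1(int *state, int *cond)
    {
        int r1;

        WRITE_ONCE(*cond, 1);
        smp_mb();
        r1 = READ_ONCE(*state);
    }

    exists (0:r0=0 /\ 1:r1=0)

The exists clause (waiter misses the condition, waker misses the state) is
allowed with the plain WRITE_ONCE() on P0, and forbidden once that store
becomes smp_store_mb(), as done below.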
Fixes: 5f220be21418 ("sched/wakeup: Prepare for RT sleeping spin/rwlocks")
Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
---
diff --git a/include/linux/sched.h b/include/linux/sched.h
index 1780260f237b..3d3246d7e87d 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -245,7 +245,8 @@ struct task_group;
* if (try_lock())
* break;
* raw_spin_unlock_irq(&lock->wait_lock);
- * schedule_rtlock();
+ * if (!cond)
+ * schedule_rtlock();
* raw_spin_lock_irq(&lock->wait_lock);
* set_current_state(TASK_RTLOCK_WAIT);
* }
@@ -253,22 +254,24 @@ struct task_group;
*/
#define current_save_and_set_rtlock_wait_state() \
do { \
- lockdep_assert_irqs_disabled(); \
- raw_spin_lock(&current->pi_lock); \
+ unsigned long flags; /* may shadow */ \
+ \
+ raw_spin_lock_irqsave(&current->pi_lock, flags); \
current->saved_state = current->__state; \
debug_rtlock_wait_set_state(); \
- WRITE_ONCE(current->__state, TASK_RTLOCK_WAIT); \
- raw_spin_unlock(&current->pi_lock); \
+ smp_store_mb(current->__state, TASK_RTLOCK_WAIT); \
+ raw_spin_unlock_irqrestore(&current->pi_lock, flags); \
} while (0);
#define current_restore_rtlock_saved_state() \
do { \
- lockdep_assert_irqs_disabled(); \
- raw_spin_lock(&current->pi_lock); \
+ unsigned long flags; /* may shadow */ \
+ \
+ raw_spin_lock_irqsave(&current->pi_lock, flags); \
debug_rtlock_wait_restore_state(); \
WRITE_ONCE(current->__state, current->saved_state); \
current->saved_state = TASK_RUNNING; \
- raw_spin_unlock(&current->pi_lock); \
+ raw_spin_unlock_irqrestore(&current->pi_lock, flags); \
} while (0);
#define get_current_state() READ_ONCE(current->__state)
On Wed, 08 Sep 2021, Peter Zijlstra wrote:
>Subject: lockin/rwbase: Take care of ordering guarantee for fastpath reader
locking
>From: Boqun Feng <[email protected]>
>Date: Wed, 1 Sep 2021 23:06:27 +0800
>
>From: Boqun Feng <[email protected]>
>
>Readers of rwbase can lock and unlock without taking any inner lock, if
>that happens, we need the ordering provided by atomic operations to
>satisfy the ordering semantics of lock/unlock. Without that, considering
>the follow case:
>
> { X = 0 initially }
>
> CPU 0 CPU 1
> ===== =====
> rt_write_lock();
> X = 1
> rt_write_unlock():
> atomic_add(READER_BIAS - WRITER_BIAS, ->readers);
> // ->readers is READER_BIAS.
> rt_read_lock():
> if ((r = atomic_read(->readers)) < 0) // True
> atomic_try_cmpxchg(->readers, r, r + 1); // succeed.
> <acquire the read lock via fast path>
>
> r1 = X; // r1 may be 0, because nothing prevent the reordering
> // of "X=1" and atomic_add() on CPU 1.
>
>Therefore audit every usage of atomic operations that may happen in a
>fast path, and add necessary barriers.
>
>Signed-off-by: Boqun Feng <[email protected]>
>Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
>Link: https://lkml.kernel.org/r/[email protected]
With a few comments below, feel free to add my:
Reviewed-by: Davidlohr Bueso <[email protected]>
>---
> kernel/locking/rwbase_rt.c | 41 ++++++++++++++++++++++++++++++++++++-----
> 1 file changed, 36 insertions(+), 5 deletions(-)
>
>--- a/kernel/locking/rwbase_rt.c
>+++ b/kernel/locking/rwbase_rt.c
>@@ -41,6 +41,12 @@
> * The risk of writer starvation is there, but the pathological use cases
> * which trigger it are not necessarily the typical RT workloads.
> *
>+ * Fast-path orderings:
>+ * The lock/unlock of readers can run in fast paths: lock and unlock are only
>+ * atomic ops, and there is no inner lock to provide ACQUIRE and RELEASE
>+ * semantics of rwbase_rt. Atomic ops then should be stronger than _acquire()
>+ * and _release() to provide necessary ordering guarantee.
This last part reads funky. Guarantees must be acquire/release or stronger, not
necessarily stronger than.
...
>@@ -210,14 +224,23 @@ static int __sched rwbase_write_lock(str
> atomic_sub(READER_BIAS, &rwb->readers);
>
> raw_spin_lock_irqsave(&rtm->wait_lock, flags);
>+
>+ /* The below set_*_state() thingy implies smp_mb() to provide ACQUIRE */
>+ readers = atomic_read(&rwb->readers);
> /*
> * set_current_state() for rw_semaphore
> * current_save_and_set_rtlock_wait_state() for rwlock
> */
> rwbase_set_and_save_current_state(state);
>
>- /* Block until all readers have left the critical section. */
>- for (; atomic_read(&rwb->readers);) {
>+ /*
>+ * Block until all readers have left the critical section.
>+ *
>+ * _acqurie() is needed in case that the reader side runs in the fast
^acquire
Thanks,
Davidlohr