2021-11-12 04:10:09

by Waiman Long

Subject: [PATCH v4] locking/rwsem: Make handoff bit handling more consistent

There are some inconsistencies in the way that the handoff bit is
handled by readers and writers.

Firstly, when a queue head writer sets the handoff bit, it will clear it
when the writer is killed or interrupted on its way out without
acquiring the lock. That is not the case for a queue head reader: the
handoff bit will simply be inherited by the next waiter.

Secondly, in the out_nolock path of rwsem_down_read_slowpath(), both
the waiter and handoff bits are cleared if the wait queue becomes empty.
In rwsem_down_write_slowpath(), however, the handoff bit is neither
checked nor cleared if the wait queue is empty. This can potentially
leave the handoff bit set with an empty wait queue.

To make the handoff bit handling more consistent and robust, extract
the rwsem flags handling code into a common
rwsem_out_nolock_clear_flags() function and call it from both the
reader's and writer's out_nolock paths. The common function only
uses atomic_long_andnot() to clear bits, to avoid a possible race
condition. It also lets the next waiter inherit the handoff bit if it
had been set.

This will eliminate the case of the handoff bit being set with an empty
wait queue, as well as the possible race condition that may corrupt the
count value.

While at it, simplify the trylock for loop in rwsem_down_write_slowpath()
to make it easier to read.

Fixes: 4f23dbc1e657 ("locking/rwsem: Implement lock handoff to prevent lock starvation")
Suggested-by: Peter Zijlstra <[email protected]>
Signed-off-by: Waiman Long <[email protected]>
---
kernel/locking/rwsem.c | 120 ++++++++++++++++-------------------------
1 file changed, 47 insertions(+), 73 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index c51387a43265..f376e6eeb36a 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -104,10 +104,11 @@
* atomic_long_fetch_add() is used to obtain reader lock, whereas
* atomic_long_cmpxchg() will be used to obtain writer lock.
*
- * There are three places where the lock handoff bit may be set or cleared.
- * 1) rwsem_mark_wake() for readers.
- * 2) rwsem_try_write_lock() for writers.
- * 3) Error path of rwsem_down_write_slowpath().
+ * There are four places where the lock handoff bit may be set or cleared.
+ * 1) rwsem_mark_wake() for readers -- set, clear
+ * 2) rwsem_try_write_lock() for writers -- set, clear
+ * 3) Error path of rwsem_down_write_slowpath() -- clear
+ * 4) Error path of rwsem_down_read_slowpath() -- clear
*
* For all the above cases, wait_lock will be held. A writer must also
* be the first one in the wait_list to be eligible for setting the handoff
@@ -334,6 +335,9 @@ struct rwsem_waiter {
struct task_struct *task;
enum rwsem_waiter_type type;
unsigned long timeout;
+
+ /* Writer only, not initialized in reader */
+ bool handoff_set;
};
#define rwsem_first_waiter(sem) \
list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
@@ -344,12 +348,6 @@ enum rwsem_wake_type {
RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */
};

-enum writer_wait_state {
- WRITER_NOT_FIRST, /* Writer is not first in wait list */
- WRITER_FIRST, /* Writer is first in wait list */
- WRITER_HANDOFF /* Writer is first & handoff needed */
-};
-
/*
* The typical HZ value is either 250 or 1000. So set the minimum waiting
* time to at least 4ms or 1 jiffy (if it is higher than 4ms) in the wait
@@ -531,13 +529,11 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
* This function must be called with the sem->wait_lock held to prevent
* race conditions between checking the rwsem wait list and setting the
* sem->count accordingly.
- *
- * If wstate is WRITER_HANDOFF, it will make sure that either the handoff
- * bit is set or the lock is acquired with handoff bit cleared.
*/
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
- enum writer_wait_state wstate)
+ struct rwsem_waiter *waiter)
{
+ bool first = rwsem_first_waiter(sem) == waiter;
long count, new;

lockdep_assert_held(&sem->wait_lock);
@@ -546,13 +542,17 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
do {
bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);

- if (has_handoff && wstate == WRITER_NOT_FIRST)
+ if (has_handoff && !first)
return false;

new = count;

if (count & RWSEM_LOCK_MASK) {
- if (has_handoff || (wstate != WRITER_HANDOFF))
+ /* May have inherited a previously set handoff bit */
+ waiter->handoff_set = has_handoff;
+
+ if (has_handoff || (!rt_task(waiter->task) &&
+ !time_after(jiffies, waiter->timeout)))
return false;

new |= RWSEM_FLAG_HANDOFF;
@@ -569,8 +569,11 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
* We have either acquired the lock with handoff bit cleared or
* set the handoff bit.
*/
- if (new & RWSEM_FLAG_HANDOFF)
+ if (new & RWSEM_FLAG_HANDOFF) {
+ waiter->handoff_set = true;
+ lockevent_inc(rwsem_wlock_handoff);
return false;
+ }

rwsem_set_owner(sem);
return true;
@@ -889,6 +892,20 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
}
#endif

+/*
+ * Common code to handle rwsem flags in out_nolock path with wait_lock held.
+ * If there is more than one waiter in the queue and the HANDOFF bit is set,
+ * the next waiter will inherit it if the first waiter is removed.
+ */
+static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
+ struct rwsem_waiter *waiter)
+{
+ list_del(&waiter->list);
+ if (list_empty(&sem->wait_list))
+ atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
+ &sem->count);
+}
+
/*
* Wait for the read lock to be granted
*/
@@ -1002,11 +1019,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
return sem;

out_nolock:
- list_del(&waiter.list);
- if (list_empty(&sem->wait_list)) {
- atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
- &sem->count);
- }
+ rwsem_out_nolock_clear_flags(sem, &waiter);
raw_spin_unlock_irq(&sem->wait_lock);
__set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_rlock_fail);
@@ -1020,7 +1033,6 @@ static struct rw_semaphore *
rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
{
long count;
- enum writer_wait_state wstate;
struct rwsem_waiter waiter;
struct rw_semaphore *ret = sem;
DEFINE_WAKE_Q(wake_q);
@@ -1038,16 +1050,13 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_WRITE;
waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
+ waiter.handoff_set = false;

raw_spin_lock_irq(&sem->wait_lock);
-
- /* account for this before adding a new element to the list */
- wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;
-
list_add_tail(&waiter.list, &sem->wait_list);

/* we're now waiting on the lock */
- if (wstate == WRITER_NOT_FIRST) {
+ if (rwsem_first_waiter(sem) != &waiter) {
count = atomic_long_read(&sem->count);

/*
@@ -1083,13 +1092,16 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
/* wait until we successfully acquire the lock */
set_current_state(state);
for (;;) {
- if (rwsem_try_write_lock(sem, wstate)) {
+ if (rwsem_try_write_lock(sem, &waiter)) {
/* rwsem_try_write_lock() implies ACQUIRE on success */
break;
}

raw_spin_unlock_irq(&sem->wait_lock);

+ if (signal_pending_state(state, current))
+ goto out_nolock;
+
/*
* After setting the handoff bit and failing to acquire
* the lock, attempt to spin on owner to accelerate lock
@@ -1098,7 +1110,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
* In this case, we attempt to acquire the lock again
* without sleeping.
*/
- if (wstate == WRITER_HANDOFF) {
+ if (waiter.handoff_set) {
enum owner_state owner_state;

preempt_disable();
@@ -1109,40 +1121,9 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
goto trylock_again;
}

- /* Block until there are no active lockers. */
- for (;;) {
- if (signal_pending_state(state, current))
- goto out_nolock;
-
- schedule();
- lockevent_inc(rwsem_sleep_writer);
- set_current_state(state);
- /*
- * If HANDOFF bit is set, unconditionally do
- * a trylock.
- */
- if (wstate == WRITER_HANDOFF)
- break;
-
- if ((wstate == WRITER_NOT_FIRST) &&
- (rwsem_first_waiter(sem) == &waiter))
- wstate = WRITER_FIRST;
-
- count = atomic_long_read(&sem->count);
- if (!(count & RWSEM_LOCK_MASK))
- break;
-
- /*
- * The setting of the handoff bit is deferred
- * until rwsem_try_write_lock() is called.
- */
- if ((wstate == WRITER_FIRST) && (rt_task(current) ||
- time_after(jiffies, waiter.timeout))) {
- wstate = WRITER_HANDOFF;
- lockevent_inc(rwsem_wlock_handoff);
- break;
- }
- }
+ schedule();
+ lockevent_inc(rwsem_sleep_writer);
+ set_current_state(state);
trylock_again:
raw_spin_lock_irq(&sem->wait_lock);
}
@@ -1156,19 +1137,12 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
out_nolock:
__set_current_state(TASK_RUNNING);
raw_spin_lock_irq(&sem->wait_lock);
- list_del(&waiter.list);
-
- if (unlikely(wstate == WRITER_HANDOFF))
- atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count);
-
- if (list_empty(&sem->wait_list))
- atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
- else
+ rwsem_out_nolock_clear_flags(sem, &waiter);
+ if (!list_empty(&sem->wait_list))
rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q);
lockevent_inc(rwsem_wlock_fail);
-
return ERR_PTR(-EINTR);
}

--
2.27.0



2021-11-12 12:11:28

by Peter Zijlstra

Subject: Re: [PATCH v4] locking/rwsem: Make handoff bit handling more consistent

On Thu, Nov 11, 2021 at 11:07:53PM -0500, Waiman Long wrote:
> @@ -889,6 +892,20 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
> }
> #endif
>
> +/*
> + * Common code to handle rwsem flags in out_nolock path with wait_lock held.
> + * If there is more than one waiter in the queue and the HANDOFF bit is set,
> + * the next waiter will inherit it if the first waiter is removed.
> + */
> +static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
> + struct rwsem_waiter *waiter)

I'm going to rename that, it doesn't just clear flags, it dequeues the
waiter.

Argh, rwsem_mark_wake() doesn't clear HANDOFF when list_empty(), and
write_slowpath() is *far* too clever about all of this.


> +{
> + list_del(&waiter->list);
if (list_empty(&sem->wait_list)) {
> + atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
> + &sem->count);
}
> +}



> @@ -1098,7 +1110,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
> * In this case, we attempt to acquire the lock again
> * without sleeping.
> */
> - if (wstate == WRITER_HANDOFF) {
> + if (waiter.handoff_set) {

I'm thinking this wants to be something like:

if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {

> enum owner_state owner_state;
>
> preempt_disable();

How's this (on top) then?

---
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -104,11 +104,10 @@
* atomic_long_fetch_add() is used to obtain reader lock, whereas
* atomic_long_cmpxchg() will be used to obtain writer lock.
*
- * There are four places where the lock handoff bit may be set or cleared.
- * 1) rwsem_mark_wake() for readers -- set, clear
- * 2) rwsem_try_write_lock() for writers -- set, clear
- * 3) Error path of rwsem_down_write_slowpath() -- clear
- * 4) Error path of rwsem_down_read_slowpath() -- clear
+ * There are three places where the lock handoff bit may be set or cleared.
+ * 1) rwsem_mark_wake() for readers -- set, clear
+ * 2) rwsem_try_write_lock() for writers -- set, clear
+ * 3) rwsem_del_waiter() -- clear
*
* For all the above cases, wait_lock will be held. A writer must also
* be the first one in the wait_list to be eligible for setting the handoff
@@ -363,6 +362,31 @@ enum rwsem_wake_type {
*/
#define MAX_READERS_WAKEUP 0x100

+static inline void
+rwsem_add_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
+{
+ lockdep_assert_held(&sem->wait_lock);
+ list_add_tail(&waiter->list, &sem->wait_list);
+ /* caller will set RWSEM_FLAG_WAITERS */
+}
+
+/*
+ * Remove a waiter from the wait_list and clear flags.
+ *
+ * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
+ * this function. Modify with care.
+ */
+static inline void
+rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
+{
+ lockdep_assert_held(&sem->wait_lock);
+ list_del(&waiter->list);
+ if (likely(!list_empty(&sem->wait_list)))
+ return;
+
+ atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS, &sem->count);
+}
+
/*
* handle the lock release when processes blocked on it that can now run
* - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
@@ -374,6 +398,8 @@ enum rwsem_wake_type {
* preferably when the wait_lock is released
* - woken process blocks are discarded from the list after having task zeroed
* - writers are only marked woken if downgrading is false
+ *
+ * Implies rwsem_del_waiter() for all woken readers.
*/
static void rwsem_mark_wake(struct rw_semaphore *sem,
enum rwsem_wake_type wake_type,
@@ -488,18 +514,25 @@ static void rwsem_mark_wake(struct rw_se

adjustment = woken * RWSEM_READER_BIAS - adjustment;
lockevent_cond_inc(rwsem_wake_reader, woken);
+
+ oldcount = atomic_long_read(&sem->count);
if (list_empty(&sem->wait_list)) {
- /* hit end of list above */
+ /*
+ * Combined with list_move_tail() above, this implies
+ * rwsem_del_waiter().
+ */
adjustment -= RWSEM_FLAG_WAITERS;
+ if (oldcount & RWSEM_FLAG_HANDOFF)
+ adjustment -= RWSEM_FLAG_HANDOFF;
+ } else if (woken) {
+ /*
+ * When we've woken a reader, we no longer need to force
+ * writers to give up the lock and we can clear HANDOFF.
+ */
+ if (oldcount & RWSEM_FLAG_HANDOFF)
+ adjustment -= RWSEM_FLAG_HANDOFF;
}

- /*
- * When we've woken a reader, we no longer need to force writers
- * to give up the lock and we can clear HANDOFF.
- */
- if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
- adjustment -= RWSEM_FLAG_HANDOFF;
-
if (adjustment)
atomic_long_add(adjustment, &sem->count);

@@ -529,6 +562,8 @@ static void rwsem_mark_wake(struct rw_se
* This function must be called with the sem->wait_lock held to prevent
* race conditions between checking the rwsem wait list and setting the
* sem->count accordingly.
+ *
+ * Implies rwsem_del_waiter() on success.
*/
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
struct rwsem_waiter *waiter)
@@ -575,6 +610,11 @@ static inline bool rwsem_try_write_lock(
return false;
}

+ /*
+ * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
+ * success.
+ */
+ list_del(&waiter->list);
rwsem_set_owner(sem);
return true;
}
@@ -893,20 +933,6 @@ rwsem_spin_on_owner(struct rw_semaphore
#endif

/*
- * Common code to handle rwsem flags in out_nolock path with wait_lock held.
- * If there is more than one waiter in the queue and the HANDOFF bit is set,
- * the next waiter will inherit it if the first waiter is removed.
- */
-static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
- struct rwsem_waiter *waiter)
-{
- list_del(&waiter->list);
- if (list_empty(&sem->wait_list))
- atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
- &sem->count);
-}
-
-/*
* Wait for the read lock to be granted
*/
static struct rw_semaphore __sched *
@@ -973,7 +999,7 @@ rwsem_down_read_slowpath(struct rw_semap
}
adjustment += RWSEM_FLAG_WAITERS;
}
- list_add_tail(&waiter.list, &sem->wait_list);
+ rwsem_add_waiter(sem, &waiter);

/* we're now waiting on the lock, but no longer actively locking */
count = atomic_long_add_return(adjustment, &sem->count);
@@ -1019,7 +1045,7 @@ rwsem_down_read_slowpath(struct rw_semap
return sem;

out_nolock:
- rwsem_out_nolock_clear_flags(sem, &waiter);
+ rwsem_del_waiter(sem, &waiter);
raw_spin_unlock_irq(&sem->wait_lock);
__set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_rlock_fail);
@@ -1034,7 +1060,6 @@ rwsem_down_write_slowpath(struct rw_sema
{
long count;
struct rwsem_waiter waiter;
- struct rw_semaphore *ret = sem;
DEFINE_WAKE_Q(wake_q);

/* do optimistic spinning and steal lock if possible */
@@ -1053,7 +1078,7 @@ rwsem_down_write_slowpath(struct rw_sema
waiter.handoff_set = false;

raw_spin_lock_irq(&sem->wait_lock);
- list_add_tail(&waiter.list, &sem->wait_list);
+ rwsem_add_waiter(sem, &waiter);

/* we're now waiting on the lock */
if (rwsem_first_waiter(sem) != &waiter) {
@@ -1110,7 +1135,7 @@ rwsem_down_write_slowpath(struct rw_sema
* In this case, we attempt to acquire the lock again
* without sleeping.
*/
- if (waiter.handoff_set) {
+ if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
enum owner_state owner_state;

preempt_disable();
@@ -1128,16 +1153,14 @@ rwsem_down_write_slowpath(struct rw_sema
raw_spin_lock_irq(&sem->wait_lock);
}
__set_current_state(TASK_RUNNING);
- list_del(&waiter.list);
raw_spin_unlock_irq(&sem->wait_lock);
lockevent_inc(rwsem_wlock);
-
- return ret;
+ return sem;

out_nolock:
__set_current_state(TASK_RUNNING);
raw_spin_lock_irq(&sem->wait_lock);
- rwsem_out_nolock_clear_flags(sem, &waiter);
+ rwsem_del_waiter(sem, &waiter);
if (!list_empty(&sem->wait_list))
rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock);

2021-11-15 03:39:10

by Waiman Long

Subject: Re: [PATCH v4] locking/rwsem: Make handoff bit handling more consistent

On 11/12/21 07:10, Peter Zijlstra wrote:
> On Thu, Nov 11, 2021 at 11:07:53PM -0500, Waiman Long wrote:
>> @@ -889,6 +892,20 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
>> }
>> #endif
>>
>> +/*
>> + * Common code to handle rwsem flags in out_nolock path with wait_lock held.
>> + * If there is more than one waiter in the queue and the HANDOFF bit is set,
>> + * the next waiter will inherit it if the first waiter is removed.
>> + */
>> +static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
>> + struct rwsem_waiter *waiter)
> I'm going to rename that, it doesn't just clear flags, it dequeues the
> waiter.
>
> Argh, rwsem_mark_wake() doesn't clear HANDOFF when list_empty(), and
> write_slowpath() is *far* too clever about all of this.
rwsem_mark_wake() does clear the HANDOFF flag if it was set.
>
>
>> +{
>> + list_del(&waiter->list);
> if (list_empty(&sem->wait_list)) {
>> + atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
>> + &sem->count);
> }
>> +}
>
>
>> @@ -1098,7 +1110,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
>> * In this case, we attempt to acquire the lock again
>> * without sleeping.
>> */
>> - if (wstate == WRITER_HANDOFF) {
>> + if (waiter.handoff_set) {
> I'm thinking this wants to be something like:
>
> if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
The handoff_set flag is only set when the waiter becomes the first one in the queue.
>
>> enum owner_state owner_state;
>>
>> preempt_disable();
> How's this (on top) then?
>
> ---
> --- a/kernel/locking/rwsem.c
> +++ b/kernel/locking/rwsem.c
> @@ -104,11 +104,10 @@
> * atomic_long_fetch_add() is used to obtain reader lock, whereas
> * atomic_long_cmpxchg() will be used to obtain writer lock.
> *
> - * There are four places where the lock handoff bit may be set or cleared.
> - * 1) rwsem_mark_wake() for readers -- set, clear
> - * 2) rwsem_try_write_lock() for writers -- set, clear
> - * 3) Error path of rwsem_down_write_slowpath() -- clear
> - * 4) Error path of rwsem_down_read_slowpath() -- clear
> + * There are three places where the lock handoff bit may be set or cleared.
> + * 1) rwsem_mark_wake() for readers -- set, clear
> + * 2) rwsem_try_write_lock() for writers -- set, clear
> + * 3) rwsem_del_waiter() -- clear
> *
> * For all the above cases, wait_lock will be held. A writer must also
> * be the first one in the wait_list to be eligible for setting the handoff
> @@ -363,6 +362,31 @@ enum rwsem_wake_type {
> */
> #define MAX_READERS_WAKEUP 0x100
>
> +static inline void
> +rwsem_add_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
> +{
> + lockdep_assert_held(&sem->wait_lock);
> + list_add_tail(&waiter->list, &sem->wait_list);
> + /* caller will set RWSEM_FLAG_WAITERS */
> +}
> +
> +/*
> + * Remove a waiter from the wait_list and clear flags.
> + *
> + * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
> + * this function. Modify with care.
> + */
> +static inline void
> +rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
> +{
> + lockdep_assert_held(&sem->wait_lock);
> + list_del(&waiter->list);
> + if (likely(!list_empty(&sem->wait_list)))
> + return;
> +
> + atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS, &sem->count);
> +}
> +
> /*
> * handle the lock release when processes blocked on it that can now run
> * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
> @@ -374,6 +398,8 @@ enum rwsem_wake_type {
> * preferably when the wait_lock is released
> * - woken process blocks are discarded from the list after having task zeroed
> * - writers are only marked woken if downgrading is false
> + *
> + * Implies rwsem_del_waiter() for all woken readers.
> */
> static void rwsem_mark_wake(struct rw_semaphore *sem,
> enum rwsem_wake_type wake_type,
> @@ -488,18 +514,25 @@ static void rwsem_mark_wake(struct rw_se
>
> adjustment = woken * RWSEM_READER_BIAS - adjustment;
> lockevent_cond_inc(rwsem_wake_reader, woken);
> +
> + oldcount = atomic_long_read(&sem->count);
> if (list_empty(&sem->wait_list)) {
> - /* hit end of list above */
> + /*
> + * Combined with list_move_tail() above, this implies
> + * rwsem_del_waiter().
> + */
> adjustment -= RWSEM_FLAG_WAITERS;
> + if (oldcount & RWSEM_FLAG_HANDOFF)
> + adjustment -= RWSEM_FLAG_HANDOFF;
> + } else if (woken) {
> + /*
> + * When we've woken a reader, we no longer need to force
> + * writers to give up the lock and we can clear HANDOFF.
> + */
> + if (oldcount & RWSEM_FLAG_HANDOFF)
> + adjustment -= RWSEM_FLAG_HANDOFF;
> }
>
> - /*
> - * When we've woken a reader, we no longer need to force writers
> - * to give up the lock and we can clear HANDOFF.
> - */
> - if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
> - adjustment -= RWSEM_FLAG_HANDOFF;
> -
> if (adjustment)
> atomic_long_add(adjustment, &sem->count);
>
> @@ -529,6 +562,8 @@ static void rwsem_mark_wake(struct rw_se
> * This function must be called with the sem->wait_lock held to prevent
> * race conditions between checking the rwsem wait list and setting the
> * sem->count accordingly.
> + *
> + * Implies rwsem_del_waiter() on success.
> */
> static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> struct rwsem_waiter *waiter)
> @@ -575,6 +610,11 @@ static inline bool rwsem_try_write_lock(
> return false;
> }
>
> + /*
> + * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
> + * success.
> + */
> + list_del(&waiter->list);
> rwsem_set_owner(sem);
> return true;
> }
> @@ -893,20 +933,6 @@ rwsem_spin_on_owner(struct rw_semaphore
> #endif
>
> /*
> - * Common code to handle rwsem flags in out_nolock path with wait_lock held.
> - * If there is more than one waiter in the queue and the HANDOFF bit is set,
> - * the next waiter will inherit it if the first waiter is removed.
> - */
> -static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
> - struct rwsem_waiter *waiter)
> -{
> - list_del(&waiter->list);
> - if (list_empty(&sem->wait_list))
> - atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
> - &sem->count);
> -}
> -
> -/*
> * Wait for the read lock to be granted
> */
> static struct rw_semaphore __sched *
> @@ -973,7 +999,7 @@ rwsem_down_read_slowpath(struct rw_semap
> }
> adjustment += RWSEM_FLAG_WAITERS;
> }
> - list_add_tail(&waiter.list, &sem->wait_list);
> + rwsem_add_waiter(sem, &waiter);
>
> /* we're now waiting on the lock, but no longer actively locking */
> count = atomic_long_add_return(adjustment, &sem->count);
> @@ -1019,7 +1045,7 @@ rwsem_down_read_slowpath(struct rw_semap
> return sem;
>
> out_nolock:
> - rwsem_out_nolock_clear_flags(sem, &waiter);
> + rwsem_del_waiter(sem, &waiter);
> raw_spin_unlock_irq(&sem->wait_lock);
> __set_current_state(TASK_RUNNING);
> lockevent_inc(rwsem_rlock_fail);
> @@ -1034,7 +1060,6 @@ rwsem_down_write_slowpath(struct rw_sema
> {
> long count;
> struct rwsem_waiter waiter;
> - struct rw_semaphore *ret = sem;
> DEFINE_WAKE_Q(wake_q);
>
> /* do optimistic spinning and steal lock if possible */
> @@ -1053,7 +1078,7 @@ rwsem_down_write_slowpath(struct rw_sema
> waiter.handoff_set = false;
>
> raw_spin_lock_irq(&sem->wait_lock);
> - list_add_tail(&waiter.list, &sem->wait_list);
> + rwsem_add_waiter(sem, &waiter);
>
> /* we're now waiting on the lock */
> if (rwsem_first_waiter(sem) != &waiter) {
> @@ -1110,7 +1135,7 @@ rwsem_down_write_slowpath(struct rw_sema
> * In this case, we attempt to acquire the lock again
> * without sleeping.
> */
> - if (waiter.handoff_set) {
> + if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
> enum owner_state owner_state;
>
> preempt_disable();
> @@ -1128,16 +1153,14 @@ rwsem_down_write_slowpath(struct rw_sema
> raw_spin_lock_irq(&sem->wait_lock);
> }
> __set_current_state(TASK_RUNNING);
> - list_del(&waiter.list);
+	rwsem_del_waiter(sem, &waiter); ?
> raw_spin_unlock_irq(&sem->wait_lock);
> lockevent_inc(rwsem_wlock);
> -
> - return ret;
> + return sem;
>
> out_nolock:
> __set_current_state(TASK_RUNNING);
> raw_spin_lock_irq(&sem->wait_lock);
> - rwsem_out_nolock_clear_flags(sem, &waiter);
> + rwsem_del_waiter(sem, &waiter);
> if (!list_empty(&sem->wait_list))
> rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
> raw_spin_unlock_irq(&sem->wait_lock);
>
Sorry for the late reply; I was busy with other work.

I like the idea. I will incorporate it in a new patch tomorrow.

Cheers,
Longman


2021-11-15 15:46:10

by Aiqun Yu (Maria)

Subject: Re: [PATCH v4] locking/rwsem: Make handoff bit handling more consistent

On 11/12/2021 8:10 PM, Peter Zijlstra wrote:
> On Thu, Nov 11, 2021 at 11:07:53PM -0500, Waiman Long wrote:
>> @@ -889,6 +892,20 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
>> }
>> #endif
>>
>> +/*
>> + * Common code to handle rwsem flags in out_nolock path with wait_lock held.
>> + * If there is more than one waiter in the queue and the HANDOFF bit is set,
>> + * the next waiter will inherit it if the first waiter is removed.
>> + */
>> +static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
>> + struct rwsem_waiter *waiter)
>
> I'm going to rename that, it doesn't just clear flags, it dequeues the
> waiter.
>
> Argh, rwsem_mark_wake() doesn't clear HANDOFF when list_empty(), and
> write_slowpath() is *far* too clever about all of this.
>
>
>> +{
>> + list_del(&waiter->list);
> if (list_empty(&sem->wait_list)) {
>> + atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
>> + &sem->count);
> }
>> +}
>
>
>
>> @@ -1098,7 +1110,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
>> * In this case, we attempt to acquire the lock again
>> * without sleeping.
>> */
>> - if (wstate == WRITER_HANDOFF) {
>> + if (waiter.handoff_set) {
>
> I'm thinking this wants to be something like:
>
> if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
>
>> enum owner_state owner_state;
>>
>> preempt_disable();
>
> How's this (on top) then?
>
> ---
> --- a/kernel/locking/rwsem.c
> +++ b/kernel/locking/rwsem.c
> @@ -104,11 +104,10 @@
> * atomic_long_fetch_add() is used to obtain reader lock, whereas
> * atomic_long_cmpxchg() will be used to obtain writer lock.
> *
> - * There are four places where the lock handoff bit may be set or cleared.
> - * 1) rwsem_mark_wake() for readers -- set, clear
> - * 2) rwsem_try_write_lock() for writers -- set, clear
> - * 3) Error path of rwsem_down_write_slowpath() -- clear
> - * 4) Error path of rwsem_down_read_slowpath() -- clear
> + * There are three places where the lock handoff bit may be set or cleared.
> + * 1) rwsem_mark_wake() for readers -- set, clear
> + * 2) rwsem_try_write_lock() for writers -- set, clear
> + * 3) rwsem_del_waiter() -- clear
> *
> * For all the above cases, wait_lock will be held. A writer must also
> * be the first one in the wait_list to be eligible for setting the handoff
> @@ -363,6 +362,31 @@ enum rwsem_wake_type {
> */
> #define MAX_READERS_WAKEUP 0x100
>
> +static inline void
> +rwsem_add_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
> +{
> + lockdep_assert_held(&sem->wait_lock);
> + list_add_tail(&waiter->list, &sem->wait_list);
> + /* caller will set RWSEM_FLAG_WAITERS */
/* each time a waiter is added to the list,
 * handoff_set is initialized as false. */
waiter->handoff_set = false;
> +}
> +
> +/*
> + * Remove a waiter from the wait_list and clear flags.
> + *
> + * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
> + * this function. Modify with care.
> + */
> +static inline void
> +rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
> +{
> + lockdep_assert_held(&sem->wait_lock);
> + list_del(&waiter->list);
what about avoiding unnecessary inheritance of the handoff bit by later waiters?

if (waiter->handoff_set)
atomic_long_andnot(RWSEM_FLAG_HANDOFF, &sem->count);
> + if (likely(!list_empty(&sem->wait_list)))
> + return;
> +
> + atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS, &sem->count);
> +}
> +
> /*
> * handle the lock release when processes blocked on it that can now run
> * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
> @@ -374,6 +398,8 @@ enum rwsem_wake_type {
> * preferably when the wait_lock is released
> * - woken process blocks are discarded from the list after having task zeroed
> * - writers are only marked woken if downgrading is false
> + *
> + * Implies rwsem_del_waiter() for all woken readers.
> */
> static void rwsem_mark_wake(struct rw_semaphore *sem,
> enum rwsem_wake_type wake_type,
> @@ -488,18 +514,25 @@ static void rwsem_mark_wake(struct rw_se
>
> adjustment = woken * RWSEM_READER_BIAS - adjustment;
> lockevent_cond_inc(rwsem_wake_reader, woken);
> +
> + oldcount = atomic_long_read(&sem->count);
> if (list_empty(&sem->wait_list)) {
> - /* hit end of list above */
> + /*
> + * Combined with list_move_tail() above, this implies
> + * rwsem_del_waiter().
> + */
> adjustment -= RWSEM_FLAG_WAITERS;
> + if (oldcount & RWSEM_FLAG_HANDOFF)
> + adjustment -= RWSEM_FLAG_HANDOFF;
> + } else if (woken) {
> + /*
> + * When we've woken a reader, we no longer need to force
> + * writers to give up the lock and we can clear HANDOFF.
> + */
> + if (oldcount & RWSEM_FLAG_HANDOFF)
> + adjustment -= RWSEM_FLAG_HANDOFF;
> }
>
> - /*
> - * When we've woken a reader, we no longer need to force writers
> - * to give up the lock and we can clear HANDOFF.
> - */
> - if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
> - adjustment -= RWSEM_FLAG_HANDOFF;
> -
> if (adjustment)
> atomic_long_add(adjustment, &sem->count);
>
> @@ -529,6 +562,8 @@ static void rwsem_mark_wake(struct rw_se
> * This function must be called with the sem->wait_lock held to prevent
> * race conditions between checking the rwsem wait list and setting the
> * sem->count accordingly.
> + *
> + * Implies rwsem_del_waiter() on success.
> */
> static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> struct rwsem_waiter *waiter)
> @@ -575,6 +610,11 @@ static inline bool rwsem_try_write_lock(
> return false;
> }
>
> + /*
> + * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
> + * success.
> + */
> + list_del(&waiter->list);
> rwsem_set_owner(sem);
> return true;
> }
> @@ -893,20 +933,6 @@ rwsem_spin_on_owner(struct rw_semaphore
> #endif
>
> /*
> - * Common code to handle rwsem flags in out_nolock path with wait_lock held.
> - * If there is more than one waiter in the queue and the HANDOFF bit is set,
> - * the next waiter will inherit it if the first waiter is removed.
> - */
> -static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
> - struct rwsem_waiter *waiter)
> -{
> - list_del(&waiter->list);
> - if (list_empty(&sem->wait_list))
> - atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
> - &sem->count);
> -}
> -
> -/*
> * Wait for the read lock to be granted
> */
> static struct rw_semaphore __sched *
> @@ -973,7 +999,7 @@ rwsem_down_read_slowpath(struct rw_semap
> }
> adjustment += RWSEM_FLAG_WAITERS;
> }
> - list_add_tail(&waiter.list, &sem->wait_list);
> + rwsem_add_waiter(sem, &waiter);
>
> /* we're now waiting on the lock, but no longer actively locking */
> count = atomic_long_add_return(adjustment, &sem->count);
> @@ -1019,7 +1045,7 @@ rwsem_down_read_slowpath(struct rw_semap
> return sem;
>
> out_nolock:
> - rwsem_out_nolock_clear_flags(sem, &waiter);
> + rwsem_del_waiter(sem, &waiter);
> raw_spin_unlock_irq(&sem->wait_lock);
> __set_current_state(TASK_RUNNING);
> lockevent_inc(rwsem_rlock_fail);
> @@ -1034,7 +1060,6 @@ rwsem_down_write_slowpath(struct rw_sema
> {
> long count;
> struct rwsem_waiter waiter;
> - struct rw_semaphore *ret = sem;
> DEFINE_WAKE_Q(wake_q);
>
> /* do optimistic spinning and steal lock if possible */
> @@ -1053,7 +1078,7 @@ rwsem_down_write_slowpath(struct rw_sema
> waiter.handoff_set = false;
>
> raw_spin_lock_irq(&sem->wait_lock);
> - list_add_tail(&waiter.list, &sem->wait_list);
> + rwsem_add_waiter(sem, &waiter);
>
> /* we're now waiting on the lock */
> if (rwsem_first_waiter(sem) != &waiter) {
> @@ -1110,7 +1135,7 @@ rwsem_down_write_slowpath(struct rw_sema
> * In this case, we attempt to acquire the lock again
> * without sleeping.
> */
> - if (waiter.handoff_set) {
> + if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
> enum owner_state owner_state;
>
> preempt_disable();
> @@ -1128,16 +1153,14 @@ rwsem_down_write_slowpath(struct rw_sema
> raw_spin_lock_irq(&sem->wait_lock);
> }
> __set_current_state(TASK_RUNNING);
> - list_del(&waiter.list);
> raw_spin_unlock_irq(&sem->wait_lock);
> lockevent_inc(rwsem_wlock);
> -
> - return ret;
> + return sem;
>
> out_nolock:
> __set_current_state(TASK_RUNNING);
> raw_spin_lock_irq(&sem->wait_lock);
> - rwsem_out_nolock_clear_flags(sem, &waiter);
> + rwsem_del_waiter(sem, &waiter);
> if (!list_empty(&sem->wait_list))
> rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
> raw_spin_unlock_irq(&sem->wait_lock);
>

What about avoiding unnecessary inheritance of the handoff bit by waiters?
1. When setting the handoff bit, also set waiter.handoff_set to true;
2. when clearing the handoff bit, also set waiter.handoff_set to false;
3. have rwsem_add_waiter() initialize it to false;
4. and let rwsem_del_waiter() clear the handoff bit according to
waiter.handoff_set.

The handoff bit gives better performance when it is set and cleared
correctly.

--
Thx and BRs,
Aiqun(Maria) Yu
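The four-step lifecycle proposed above can be sketched in plain C. This is a hypothetical model, not the kernel code: the struct names, the non-atomic flag updates, and the LIFO list are stand-ins, while the real rwsem keeps all of this under sem->wait_lock and uses atomic ops on sem->count.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define RWSEM_FLAG_WAITERS 0x2UL
#define RWSEM_FLAG_HANDOFF 0x4UL

struct waiter {
	struct waiter *next;
	bool handoff_set;		/* mirrors ownership of HANDOFF */
};

struct sem {
	unsigned long count;		/* stand-in for sem->count flags */
	struct waiter *head;		/* simplified wait list */
};

/* Step 3: a newly queued waiter starts with handoff_set == false. */
static void add_waiter(struct sem *s, struct waiter *w)
{
	w->handoff_set = false;
	w->next = s->head;
	s->head = w;
	s->count |= RWSEM_FLAG_WAITERS;
}

/* Step 1: setting the HANDOFF bit records the owning waiter. */
static void set_handoff(struct sem *s, struct waiter *w)
{
	s->count |= RWSEM_FLAG_HANDOFF;
	w->handoff_set = true;
}

/*
 * Steps 2 and 4: dequeueing clears HANDOFF only if this waiter owned
 * it, so later waiters never inherit a stale bit; WAITERS is cleared
 * when the queue drains.
 */
static void del_waiter(struct sem *s, struct waiter *w)
{
	struct waiter **pp = &s->head;

	while (*pp && *pp != w)
		pp = &(*pp)->next;
	if (*pp)
		*pp = w->next;
	if (w->handoff_set) {
		s->count &= ~RWSEM_FLAG_HANDOFF;
		w->handoff_set = false;
	}
	if (!s->head)
		s->count &= ~RWSEM_FLAG_WAITERS;
}
```

Note that the patch under discussion deliberately rejects this per-waiter clearing in favor of letting the next waiter inherit the bit, as Waiman explains in his reply.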

2021-11-15 16:03:47

by Peter Zijlstra

Subject: Re: [PATCH v4] locking/rwsem: Make handoff bit handling more consistent

On Sun, Nov 14, 2021 at 10:38:57PM -0500, Waiman Long wrote:
> On 11/12/21 07:10, Peter Zijlstra wrote:

> > Argh, rwsem_mark_wake() doesn't clear HANDOFF when list_empty(), and
> > write_slowpath() is *far* too clever about all of this.
> rwsem_mark_wake() does clear the HANDOFF flag if it was set.

Argh, yeah, I got confused by the whole !woken case, but that case won't
ever hit list_empty() either. Perhaps that stuff could use a bit of a
reflow too.


> > > @@ -1098,7 +1110,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
> > > * In this case, we attempt to acquire the lock again
> > > * without sleeping.
> > > */
> > > - if (wstate == WRITER_HANDOFF) {
> > > + if (waiter.handoff_set) {
> > I'm thinking this wants to be something like:
> >
> > if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
> handoff_set flag is only set when the waiter becomes the first.

Yes, but a random waiter can wake up and see it be set and also start
spinning.

> >
> > > enum owner_state owner_state;
> > > preempt_disable();

> > @@ -575,6 +610,11 @@ static inline bool rwsem_try_write_lock(
> > return false;
> > }
> > + /*
> > + * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
> > + * success.
> > + */
> > + list_del(&waiter->list);
> > rwsem_set_owner(sem);
> > return true;
> > }

> > @@ -1128,16 +1153,14 @@ rwsem_down_write_slowpath(struct rw_sema
> > raw_spin_lock_irq(&sem->wait_lock);
> > }
> > __set_current_state(TASK_RUNNING);
> > - list_del(&waiter.list);
> +	rwsem_del_waiter(sem, &waiters); ?

I tried that, but then we get an extra atomic in this path. As is I made
try_write_lock() do the full del_waiter, see the hunk above.

> > raw_spin_unlock_irq(&sem->wait_lock);
> > lockevent_inc(rwsem_wlock);
> > -
> > - return ret;
> > + return sem;
> > out_nolock:
> > __set_current_state(TASK_RUNNING);
> > raw_spin_lock_irq(&sem->wait_lock);
> > - rwsem_out_nolock_clear_flags(sem, &waiter);
> > + rwsem_del_waiter(sem, &waiter);
> > if (!list_empty(&sem->wait_list))
> > rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
> > raw_spin_unlock_irq(&sem->wait_lock);
> >
> Sorry for the late reply as I was busy on other works.
>
> I like the idea. I will incorporate in a new patch tomorrow.

Thanks!
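The trade-off Peter describes — a generic dequeue helper costs one extra atomic RMW, while folding the list_del() into a successful trylock costs none — can be sketched with an op counter. This is a hypothetical model, not the kernel code; the "cmpxchg" and the wait list are simulated.

```c
#include <assert.h>
#include <stdbool.h>

#define FLAG_WAITERS 0x2UL
#define FLAG_HANDOFF 0x4UL

static int atomic_rmw_ops;		/* counts atomic RMWs issued */

struct sem {
	unsigned long count;		/* stand-in for sem->count */
	int nr_waiters;			/* stand-in for the wait list */
};

static void atomic_andnot(unsigned long bits, unsigned long *v)
{
	atomic_rmw_ops++;
	*v &= ~bits;
}

/*
 * Generic rwsem_del_waiter(): a plain dequeue plus one atomic RMW when
 * the queue drains -- fine for the rare out_nolock path.
 */
static void del_waiter(struct sem *s)
{
	s->nr_waiters--;		/* list_del() */
	if (s->nr_waiters == 0)
		atomic_andnot(FLAG_HANDOFF | FLAG_WAITERS, &s->count);
}

/*
 * Successful trylock: the single cmpxchg that acquires the lock also
 * computes the new flag bits, so dequeueing afterwards is a plain
 * list_del() with no second atomic.
 */
static void try_write_lock_success(struct sem *s)
{
	unsigned long new = 1;		/* writer owns the lock */

	if (s->nr_waiters > 1)
		new |= FLAG_WAITERS;	/* others remain queued */
	atomic_rmw_ops++;		/* the one cmpxchg */
	s->count = new;
	s->nr_waiters--;		/* list_del(), not atomic */
}
```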

2021-11-15 16:59:10

by Waiman Long

Subject: Re: [PATCH v4] locking/rwsem: Make handoff bit handling more consistent


On 11/15/21 10:45, Aiqun(Maria) Yu wrote:
> On 11/12/2021 8:10 PM, Peter Zijlstra wrote:
>> On Thu, Nov 11, 2021 at 11:07:53PM -0500, Waiman Long wrote:
>>> @@ -889,6 +892,20 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
>>>   }
>>>   #endif
>>>   +/*
>>> + * Common code to handle rwsem flags in out_nolock path with
>>> wait_lock held.
>>> + * If there is more than one waiter in the queue and the HANDOFF
>>> bit is set,
>>> + * the next waiter will inherit it if the first waiter is removed.
>>> + */
>>> +static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore
>>> *sem,
>>> +                        struct rwsem_waiter *waiter)
>>
>> I'm going to rename that, it doesn't just clear flags, it dequeues the
>> waiter.
>>
>> Argh, rwsem_mark_wake() doesn't clear HANDOFF when list_empty(), and
>> write_slowpath() is *far* too clever about all of this.
>>
>>
>>> +{
>>> +    list_del(&waiter->list);
>>     if (list_empty(&sem->wait_list)) {
>>> + atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
>>> +                   &sem->count);
>>     }
>>> +}
>>
>>
>>
>>> @@ -1098,7 +1110,7 @@ rwsem_down_write_slowpath(struct rw_semaphore
>>> *sem, int state)
>>>            * In this case, we attempt to acquire the lock again
>>>            * without sleeping.
>>>            */
>>> -        if (wstate == WRITER_HANDOFF) {
>>> +        if (waiter.handoff_set) {
>>
>> I'm thinking this wants to be something like:
>>
>>         if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
>>
>>>               enum owner_state owner_state;
>>>                 preempt_disable();
>>
>> How's this (on top) then?
>>
>> ---
>> --- a/kernel/locking/rwsem.c
>> +++ b/kernel/locking/rwsem.c
>> @@ -104,11 +104,10 @@
>>    * atomic_long_fetch_add() is used to obtain reader lock, whereas
>>    * atomic_long_cmpxchg() will be used to obtain writer lock.
>>    *
>> - * There are four places where the lock handoff bit may be set or
>> cleared.
>> - * 1) rwsem_mark_wake() for readers            -- set, clear
>> - * 2) rwsem_try_write_lock() for writers       -- set, clear
>> - * 3) Error path of rwsem_down_write_slowpath() -- clear
>> - * 4) Error path of rwsem_down_read_slowpath()  -- clear
>> + * There are three places where the lock handoff bit may be set or
>> cleared.
>> + * 1) rwsem_mark_wake() for readers        -- set, clear
>> + * 2) rwsem_try_write_lock() for writers    -- set, clear
>> + * 3) rwsem_del_waiter()            -- clear
>>    *
>>    * For all the above cases, wait_lock will be held. A writer must also
>>    * be the first one in the wait_list to be eligible for setting the
>> handoff
>> @@ -363,6 +362,31 @@ enum rwsem_wake_type {
>>    */
>>   #define MAX_READERS_WAKEUP    0x100
>>   +static inline void
>> +rwsem_add_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
>> +{
>> +    lockdep_assert_held(&sem->wait_lock);
>> +    list_add_tail(&waiter->list, &sem->wait_list);
>> +    /* caller will set RWSEM_FLAG_WAITERS */
>     /* each time a waiter is added to the list,
>      * handoff_set is initialized to false. */
>     waiter->handoff_set = false;

Waiter initialization is done at entry to the slowpath function. This
helper function just inserts the waiter into the wait list.


>> +}
>> +
>> +/*
>> + * Remove a waiter from the wait_list and clear flags.
>> + *
>> + * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full
>> 'copy' of
>> + * this function. Modify with care.
>> + */
>> +static inline void
>> +rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
>> +{
>> +    lockdep_assert_held(&sem->wait_lock);
>> +    list_del(&waiter->list);
> what about avoid unnecessary inherit of handoff bit for waiters?
>
> if (waiter->handoff_set)
>     atomic_long_andnot(RWSEM_FLAG_HANDOFF, &sem->count);

We have decided to let the next waiter inherit the handoff bit, so
there is no need to clear it unless there are no more waiters in the queue.


>> +    if (likely(!list_empty(&sem->wait_list)))
>> +        return;
>> +
>> +    atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
>> &sem->count);
>> +}
>> +
>>   /*
>>    * handle the lock release when processes blocked on it that can
>> now run
>>    * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS
>> bit must
>> @@ -374,6 +398,8 @@ enum rwsem_wake_type {
>>    *   preferably when the wait_lock is released
>>    * - woken process blocks are discarded from the list after having
>> task zeroed
>>    * - writers are only marked woken if downgrading is false
>> + *
>> + * Implies rwsem_del_waiter() for all woken readers.
>>    */
>>   static void rwsem_mark_wake(struct rw_semaphore *sem,
>>                   enum rwsem_wake_type wake_type,
>> @@ -488,18 +514,25 @@ static void rwsem_mark_wake(struct rw_se
>>         adjustment = woken * RWSEM_READER_BIAS - adjustment;
>>       lockevent_cond_inc(rwsem_wake_reader, woken);
>> +
>> +    oldcount = atomic_long_read(&sem->count);
>>       if (list_empty(&sem->wait_list)) {
>> -        /* hit end of list above */
>> +        /*
>> +         * Combined with list_move_tail() above, this implies
>> +         * rwsem_del_waiter().
>> +         */
>>           adjustment -= RWSEM_FLAG_WAITERS;
>> +        if (oldcount & RWSEM_FLAG_HANDOFF)
>> +            adjustment -= RWSEM_FLAG_HANDOFF;
>> +    } else if (woken) {
>> +        /*
>> +         * When we've woken a reader, we no longer need to force
>> +         * writers to give up the lock and we can clear HANDOFF.
>> +         */
>> +        if (oldcount & RWSEM_FLAG_HANDOFF)
>> +            adjustment -= RWSEM_FLAG_HANDOFF;
>>       }
>>   -    /*
>> -     * When we've woken a reader, we no longer need to force writers
>> -     * to give up the lock and we can clear HANDOFF.
>> -     */
>> -    if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
>> -        adjustment -= RWSEM_FLAG_HANDOFF;
>> -
>>       if (adjustment)
>>           atomic_long_add(adjustment, &sem->count);
>>   @@ -529,6 +562,8 @@ static void rwsem_mark_wake(struct rw_se
>>    * This function must be called with the sem->wait_lock held to
>> prevent
>>    * race conditions between checking the rwsem wait list and setting
>> the
>>    * sem->count accordingly.
>> + *
>> + * Implies rwsem_del_waiter() on success.
>>    */
>>   static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
>>                       struct rwsem_waiter *waiter)
>> @@ -575,6 +610,11 @@ static inline bool rwsem_try_write_lock(
>>           return false;
>>       }
>>   +    /*
>> +     * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
>> +     * success.
>> +     */
>> +    list_del(&waiter->list);
>>       rwsem_set_owner(sem);
>>       return true;
>>   }
>> @@ -893,20 +933,6 @@ rwsem_spin_on_owner(struct rw_semaphore
>>   #endif
>>     /*
>> - * Common code to handle rwsem flags in out_nolock path with
>> wait_lock held.
>> - * If there is more than one waiter in the queue and the HANDOFF bit
>> is set,
>> - * the next waiter will inherit it if the first waiter is removed.
>> - */
>> -static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore
>> *sem,
>> -                        struct rwsem_waiter *waiter)
>> -{
>> -    list_del(&waiter->list);
>> -    if (list_empty(&sem->wait_list))
>> -        atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
>> -                   &sem->count);
>> -}
>> -
>> -/*
>>    * Wait for the read lock to be granted
>>    */
>>   static struct rw_semaphore __sched *
>> @@ -973,7 +999,7 @@ rwsem_down_read_slowpath(struct rw_semap
>>           }
>>           adjustment += RWSEM_FLAG_WAITERS;
>>       }
>> -    list_add_tail(&waiter.list, &sem->wait_list);
>> +    rwsem_add_waiter(sem, &waiter);
>>         /* we're now waiting on the lock, but no longer actively
>> locking */
>>       count = atomic_long_add_return(adjustment, &sem->count);
>> @@ -1019,7 +1045,7 @@ rwsem_down_read_slowpath(struct rw_semap
>>       return sem;
>>     out_nolock:
>> -    rwsem_out_nolock_clear_flags(sem, &waiter);
>> +    rwsem_del_waiter(sem, &waiter);
>>       raw_spin_unlock_irq(&sem->wait_lock);
>>       __set_current_state(TASK_RUNNING);
>>       lockevent_inc(rwsem_rlock_fail);
>> @@ -1034,7 +1060,6 @@ rwsem_down_write_slowpath(struct rw_sema
>>   {
>>       long count;
>>       struct rwsem_waiter waiter;
>> -    struct rw_semaphore *ret = sem;
>>       DEFINE_WAKE_Q(wake_q);
>>         /* do optimistic spinning and steal lock if possible */
>> @@ -1053,7 +1078,7 @@ rwsem_down_write_slowpath(struct rw_sema
>>       waiter.handoff_set = false;
>>         raw_spin_lock_irq(&sem->wait_lock);
>> -    list_add_tail(&waiter.list, &sem->wait_list);
>> +    rwsem_add_waiter(sem, &waiter);
>>         /* we're now waiting on the lock */
>>       if (rwsem_first_waiter(sem) != &waiter) {
>> @@ -1110,7 +1135,7 @@ rwsem_down_write_slowpath(struct rw_sema
>>            * In this case, we attempt to acquire the lock again
>>            * without sleeping.
>>            */
>> -        if (waiter.handoff_set) {
>> +        if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
>>               enum owner_state owner_state;
>>                 preempt_disable();
>> @@ -1128,16 +1153,14 @@ rwsem_down_write_slowpath(struct rw_sema
>>           raw_spin_lock_irq(&sem->wait_lock);
>>       }
>>       __set_current_state(TASK_RUNNING);
>> -    list_del(&waiter.list);
>>       raw_spin_unlock_irq(&sem->wait_lock);
>>       lockevent_inc(rwsem_wlock);
>> -
>> -    return ret;
>> +    return sem;
>>     out_nolock:
>>       __set_current_state(TASK_RUNNING);
>>       raw_spin_lock_irq(&sem->wait_lock);
>> -    rwsem_out_nolock_clear_flags(sem, &waiter);
>> +    rwsem_del_waiter(sem, &waiter);
>>       if (!list_empty(&sem->wait_list))
>>           rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
>>       raw_spin_unlock_irq(&sem->wait_lock);
>>
>
> What about avoiding unnecessary inheritance of the handoff bit by waiters?
>     1. When setting the handoff bit, also set waiter.handoff_set to true;
>     2. when clearing the handoff bit, also set waiter.handoff_set to false;
>     3. have rwsem_add_waiter() initialize it to false;
>     4. and let rwsem_del_waiter() clear the handoff bit according
> to waiter.handoff_set.
>
> The handoff bit gives better performance when it is set and cleared
> correctly.
>
Killing or interrupting a waiter to force it to quit is not considered a
fast-path operation. It is an exception rather than the rule, so
performance considerations are less important here.

Cheers,
Longman


2021-11-16 00:12:17

by Waiman Long

Subject: Re: [PATCH v4] locking/rwsem: Make handoff bit handling more consistent


On 11/15/21 11:01, Peter Zijlstra wrote:
> On Sun, Nov 14, 2021 at 10:38:57PM -0500, Waiman Long wrote:
>> On 11/12/21 07:10, Peter Zijlstra wrote:
>>> Argh, rwsem_mark_wake() doesn't clear HANDOFF when list_empty(), and
>>> write_slowpath() is *far* too clever about all of this.
>> rwsem_mark_wake() does clear the HANDOFF flag if it was set.
> Argh, yeah, I got confused by the whole !woken case, but that case won't
> ever hit list_empty() either. Perhaps that stuff could use a bit of a
> reflow too.
I think your modification has already included the rewrite for that part.
>
>>>> @@ -1098,7 +1110,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
>>>> * In this case, we attempt to acquire the lock again
>>>> * without sleeping.
>>>> */
>>>> - if (wstate == WRITER_HANDOFF) {
>>>> + if (waiter.handoff_set) {
>>> I'm thinking this wants to be something like:
>>>
>>> if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
>> handoff_set flag is only set when the waiter becomes the first.
> Yes, but a random waiter can wake up and see it be set and also start
> spinning.

The handoff_set flag can only be true for the first waiter. A random
waiter in the middle of a wait queue will never have this flag set.

This flag is set in two places in rwsem_try_write_lock():

1)

               if (has_handoff && !first)
                        return false;
                new = count;

                if (count & RWSEM_LOCK_MASK) {
                        /*
                         * Only the first waiter can inherit a
                         * previously set handoff bit.
                         */
                        waiter->handoff_set = has_handoff;

handoff_set can only be set to true here if first is also true. In that
case, it will also return false immediately afterward.

2)

        if (new & RWSEM_FLAG_HANDOFF) {
                waiter->handoff_set = true;
                lockevent_inc(rwsem_wlock_handoff);
                return false;
        }

Again, only the first waiter has a chance of setting the handoff bit
and having handoff_set set to true.

>>>> enum owner_state owner_state;
>>>> preempt_disable();
>>> @@ -575,6 +610,11 @@ static inline bool rwsem_try_write_lock(
>>> return false;
>>> }
>>> + /*
>>> + * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
>>> + * success.
>>> + */
>>> + list_del(&waiter->list);
>>> rwsem_set_owner(sem);
>>> return true;
>>> }
>>> @@ -1128,16 +1153,14 @@ rwsem_down_write_slowpath(struct rw_sema
>>> raw_spin_lock_irq(&sem->wait_lock);
>>> }
>>> __set_current_state(TASK_RUNNING);
>>> - list_del(&waiter.list);
>> +    rwsem_del_waiter(sem, &waiters); ?
> I tried that, but then we get an extra atomic in this path. As is I made
> try_write_lock() do the full del_waiter, see the hunk above.

You are right. I missed your change in rwsem_try_write_lock().

Thanks,
Longman
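The two code paths Waiman quotes above can be condensed into a small model that makes the invariant visible: handoff_set can only become true for the first waiter. This is a hypothetical, simplified sketch, not the real rwsem_try_write_lock(), which also handles timeouts, the reader count, and the cmpxchg loop.

```c
#include <assert.h>
#include <stdbool.h>

#define RWSEM_LOCK_MASK    0x1UL	/* stand-in: "lock currently held" */
#define RWSEM_FLAG_HANDOFF 0x4UL

struct waiter {
	bool handoff_set;
};

/*
 * Condensed model of the two handoff_set assignments quoted above.
 * Returns true if the write lock was taken.
 */
static bool try_write_lock(unsigned long *count, struct waiter *w, bool first)
{
	bool has_handoff = *count & RWSEM_FLAG_HANDOFF;

	/* Case 1: a non-first waiter bails out before handoff_set is
	 * written, so it can never observe its own flag as true. */
	if (has_handoff && !first)
		return false;

	if (*count & RWSEM_LOCK_MASK) {
		/* Only the first waiter inherits an already set bit.
		 * (Case 2 in the real code sets the bit itself and then
		 * handoff_set = true, again only for the first waiter.) */
		w->handoff_set = has_handoff;
		return false;
	}

	*count |= RWSEM_LOCK_MASK;	/* lock acquired */
	return true;
}
```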



2021-11-16 00:16:04

by Waiman Long

Subject: Re: [PATCH v4] locking/rwsem: Make handoff bit handling more consistent


On 11/15/21 18:07, Peter Zijlstra wrote:
> On Mon, Nov 15, 2021 at 05:29:10PM -0500, Waiman Long wrote:
>> The handoff_set flag can only be true for the first waiter. A random waiter in
>> the middle of a wait queue will never have this flag set.
>>
>> This flag is set in two places in rwsem_try_write_lock():
> Bah, I thought it would unconditionally propagate the bit from @count. I
> missed the early exit :/
>
I am going to restructure the code there to make it easier to see that
only the first waiter will have this bit set, to avoid this confusion.

Cheers,
Longman


2021-11-16 01:22:54

by Peter Zijlstra

Subject: Re: [PATCH v4] locking/rwsem: Make handoff bit handling more consistent

On Mon, Nov 15, 2021 at 05:29:10PM -0500, Waiman Long wrote:
>
> The handoff_set flag can only be true for the first waiter. A random waiter in
> the middle of a wait queue will never have this flag set.
>
> This flag is set in two places in rwsem_try_write_lock():

Bah, I thought it would unconditionally propagate the bit from @count. I
missed the early exit :/