2023-03-27 20:30:35

by Waiman Long

Subject: [PATCH v2 0/8] locking/rwsem: Rework writer wakeup and handoff

This is a followup to the patches in the queue.git locking/core branch
posted by Peter in [1]. His patch series unified the reader and writer
paths as much as possible, making the code a bit easier to maintain.
Unfortunately, it regressed performance because it suppresses writer
lock stealing via optimistic spinning [2].

This series includes the 7 patches in his locking/core branch, with a
minor change to the last one to remove the now unneeded
rwsem_wlock_ehandoff event. The last patch in the series restores the
old write lock slow path behavior of acquiring the lock mostly in the
slow path, which is necessary to bring performance back to parity.

[1] https://lore.kernel.org/lkml/Y%[email protected]/
[2] https://lore.kernel.org/lkml/[email protected]/

Peter Zijlstra (6):
locking/rwsem: Enforce queueing when HANDOFF
locking/rwsem: Rework writer wakeup
locking/rwsem: Simplify rwsem_writer_wake()
locking/rwsem: Split out rwsem_reader_wake()
locking/rwsem: Unify wait loop
locking/rwsem: Use the force

Waiman Long (2):
locking/rwsem: Minor code refactoring in rwsem_mark_wake()
locking/rwsem: Restore old write lock slow path behavior

kernel/locking/rwsem.c | 466 ++++++++++++++++++++++-------------------
1 file changed, 246 insertions(+), 220 deletions(-)

--
2.31.1


2023-03-27 20:30:43

by Waiman Long

Subject: [PATCH v2 6/8] locking/rwsem: Unify wait loop

From: Peter Zijlstra <[email protected]>

Now that the reader and writer wait loops are identical, share the
code.
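
The only remaining reader/writer difference in the shared loop is which
lock-event counter gets bumped; the lockevent_rw_inc() helper added
below picks one based on a flag. Roughly (a sketch of the expansion,
assuming the existing lockevent_cond_inc() interface):

	/* lockevent_rw_inc(reader, rwsem_sleep_reader, rwsem_sleep_writer) */
	lockevent_cond_inc(rwsem_sleep_reader,  reader);	/* readers only */
	lockevent_cond_inc(rwsem_sleep_writer, !reader);	/* writers only */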

Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
---
kernel/locking/rwsem.c | 121 +++++++++++++++++------------------------
1 file changed, 51 insertions(+), 70 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index 0bc262dc77fd..ee8861effcc2 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -650,13 +650,11 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
* optionally wake up waiters before it returns.
*/
static inline void
-rwsem_del_wake_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter,
- struct wake_q_head *wake_q)
+rwsem_del_wake_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
__releases(&sem->wait_lock)
{
bool first = rwsem_first_waiter(sem) == waiter;
-
- wake_q_init(wake_q);
+ DEFINE_WAKE_Q(wake_q);

/*
* If the wait_list isn't empty and the waiter to be deleted is
@@ -664,10 +662,10 @@ rwsem_del_wake_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter,
* be eligible to acquire or spin on the lock.
*/
if (rwsem_del_waiter(sem, waiter) && first)
- rwsem_mark_wake(sem, RWSEM_WAKE_ANY, wake_q);
+ rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
- if (!wake_q_empty(wake_q))
- wake_up_q(wake_q);
+ if (!wake_q_empty(&wake_q))
+ wake_up_q(&wake_q);
}

/*
@@ -997,6 +995,50 @@ static inline void rwsem_cond_wake_waiter(struct rw_semaphore *sem, long count,
rwsem_mark_wake(sem, wake_type, wake_q);
}

+#define lockevent_rw_inc(rd, evr, evw) do { \
+ lockevent_cond_inc(evr, (rd)); \
+ lockevent_cond_inc(evw, !(rd)); \
+} while (0)
+
+static __always_inline struct rw_semaphore *
+rwsem_waiter_wait(struct rw_semaphore *sem, struct rwsem_waiter *waiter,
+ int state, bool reader)
+{
+ trace_contention_begin(sem, reader ? LCB_F_READ : LCB_F_WRITE);
+
+ /* wait to be given the lock */
+ for (;;) {
+ set_current_state(state);
+ if (!smp_load_acquire(&waiter->task)) {
+ /* Matches rwsem_waiter_wake()'s smp_store_release(). */
+ break;
+ }
+ if (signal_pending_state(state, current)) {
+ raw_spin_lock_irq(&sem->wait_lock);
+ if (waiter->task)
+ goto out_nolock;
+ raw_spin_unlock_irq(&sem->wait_lock);
+ /* Ordered by sem->wait_lock against rwsem_mark_wake(). */
+ break;
+ }
+ schedule_preempt_disabled();
+ lockevent_rw_inc(reader, rwsem_sleep_reader, rwsem_sleep_writer);
+ }
+
+ __set_current_state(TASK_RUNNING);
+
+ lockevent_rw_inc(reader, rwsem_rlock, rwsem_wlock);
+ trace_contention_end(sem, 0);
+ return sem;
+
+out_nolock:
+ rwsem_del_wake_waiter(sem, waiter);
+ __set_current_state(TASK_RUNNING);
+ lockevent_rw_inc(reader, rwsem_rlock_fail, rwsem_wlock_fail);
+ trace_contention_end(sem, -EINTR);
+ return ERR_PTR(-EINTR);
+}
+
/*
* Wait for the read lock to be granted
*/
@@ -1074,38 +1116,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
if (!wake_q_empty(&wake_q))
wake_up_q(&wake_q);

- trace_contention_begin(sem, LCB_F_READ);
-
- /* wait to be given the lock */
- for (;;) {
- set_current_state(state);
- if (!smp_load_acquire(&waiter.task)) {
- /* Matches rwsem_waiter_wake()'s smp_store_release(). */
- break;
- }
- if (signal_pending_state(state, current)) {
- raw_spin_lock_irq(&sem->wait_lock);
- if (waiter.task)
- goto out_nolock;
- raw_spin_unlock_irq(&sem->wait_lock);
- /* Ordered by sem->wait_lock against rwsem_mark_wake(). */
- break;
- }
- schedule_preempt_disabled();
- lockevent_inc(rwsem_sleep_reader);
- }
-
- __set_current_state(TASK_RUNNING);
- lockevent_inc(rwsem_rlock);
- trace_contention_end(sem, 0);
- return sem;
-
-out_nolock:
- rwsem_del_wake_waiter(sem, &waiter, &wake_q);
- __set_current_state(TASK_RUNNING);
- lockevent_inc(rwsem_rlock_fail);
- trace_contention_end(sem, -EINTR);
- return ERR_PTR(-EINTR);
+ return rwsem_waiter_wait(sem, &waiter, state, true);
}

/*
@@ -1155,37 +1166,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
}
raw_spin_unlock_irq(&sem->wait_lock);

- /* wait until we successfully acquire the lock */
- trace_contention_begin(sem, LCB_F_WRITE);
-
- for (;;) {
- set_current_state(state);
- if (!smp_load_acquire(&waiter.task)) {
- /* Matches rwsem_waiter_wake()'s smp_store_release(). */
- break;
- }
- if (signal_pending_state(state, current)) {
- raw_spin_lock_irq(&sem->wait_lock);
- if (waiter.task)
- goto out_nolock;
- raw_spin_unlock_irq(&sem->wait_lock);
- /* Ordered by sem->wait_lock against rwsem_mark_wake(). */
- break;
- }
- schedule_preempt_disabled();
- lockevent_inc(rwsem_sleep_writer);
- }
- __set_current_state(TASK_RUNNING);
- lockevent_inc(rwsem_wlock);
- trace_contention_end(sem, 0);
- return sem;
-
-out_nolock:
- rwsem_del_wake_waiter(sem, &waiter, &wake_q);
- __set_current_state(TASK_RUNNING);
- lockevent_inc(rwsem_wlock_fail);
- trace_contention_end(sem, -EINTR);
- return ERR_PTR(-EINTR);
+ return rwsem_waiter_wait(sem, &waiter, state, false);
}

/*
--
2.31.1

2023-03-27 20:31:16

by Waiman Long

Subject: [PATCH v2 4/8] locking/rwsem: Simplify rwsem_writer_wake()

From: Peter Zijlstra <[email protected]>

Since @waiter := rwsem_first_waiter(), simplify things.
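
That is, as far as I can tell both remaining callers of
rwsem_try_write_lock() pass the first waiter, so the local

	struct rwsem_waiter *first = rwsem_first_waiter(sem);

lookup is redundant and the first->handoff_set / (waiter != first)
checks collapse; see the diff below.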

Suggested-by: Waiman Long <[email protected]>
Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
---
kernel/locking/rwsem.c | 21 ++++-----------------
1 file changed, 4 insertions(+), 17 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index 0cc0aa566a6b..225e86326ea4 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -426,26 +426,12 @@ rwsem_waiter_wake(struct rwsem_waiter *waiter, struct wake_q_head *wake_q)
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
struct rwsem_waiter *waiter)
{
-
- struct rwsem_waiter *first = rwsem_first_waiter(sem);
long count, new;

lockdep_assert_held(&sem->wait_lock);

count = atomic_long_read(&sem->count);
do {
- bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
-
- if (has_handoff) {
- /*
- * Honor handoff bit and yield only when the first
- * waiter is the one that set it. Otherwisee, we
- * still try to acquire the rwsem.
- */
- if (first->handoff_set && (waiter != first))
- return false;
- }
-
new = count;

if (count & RWSEM_LOCK_MASK) {
@@ -454,8 +440,9 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
* if it is an RT task or wait in the wait queue
* for too long.
*/
- if (has_handoff || (!rt_task(waiter->task) &&
- !time_after(jiffies, waiter->timeout)))
+ if ((count & RWSEM_FLAG_HANDOFF) ||
+ (!rt_task(waiter->task) &&
+ !time_after(jiffies, waiter->timeout)))
return false;

new |= RWSEM_FLAG_HANDOFF;
@@ -474,7 +461,7 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
* set here to enable optimistic spinning in slowpath loop.
*/
if (new & RWSEM_FLAG_HANDOFF) {
- first->handoff_set = true;
+ waiter->handoff_set = true;
lockevent_inc(rwsem_wlock_handoff);
return false;
}
--
2.31.1

2023-03-27 20:31:54

by Waiman Long

Subject: [PATCH v2 3/8] locking/rwsem: Rework writer wakeup

From: Peter Zijlstra <[email protected]>

Currently readers and writers have distinctly different wait/wake
methods. For readers the ->count adjustment happens on the wakeup
side, while for writers the ->count adjustment happens on the wait
side.

This asymmetry is unfortunate since the wake side has an additional
guarantee -- specifically, the wake side has observed the unlocked
state, and thus it knows that speculative READER_BIAS perturbations
on ->count are just that: they will be undone.

Additionally, unifying the wait/wake methods allows sharing code.

As such, do a straight-forward transform of the writer wakeup into the
wake side.
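
In code terms the shape of the change is roughly the following
(simplified sketch; the real hunks are below):

	/*
	 * Before: the woken writer adjusted ->count itself, looping on
	 * rwsem_try_write_lock() in rwsem_down_write_slowpath().
	 *
	 * After: the waker claims the lock on the writer's behalf and only
	 * then wakes it, mirroring what is already done for readers.
	 */
	static void rwsem_writer_wake(struct rw_semaphore *sem,
				      struct rwsem_waiter *waiter,
				      struct wake_q_head *wake_q)
	{
		if (rwsem_try_write_lock(sem, waiter))	/* wake-side ->count adjustment */
			rwsem_waiter_wake(waiter, wake_q);	/* waiter->task := NULL + wake_q */
	}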

Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
---
kernel/locking/rwsem.c | 259 +++++++++++++++++++----------------------
1 file changed, 123 insertions(+), 136 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index 4b9e492abd59..0cc0aa566a6b 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -394,6 +394,108 @@ rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
return false;
}

+static inline void
+rwsem_waiter_wake(struct rwsem_waiter *waiter, struct wake_q_head *wake_q)
+{
+ struct task_struct *tsk;
+
+ tsk = waiter->task;
+ get_task_struct(tsk);
+
+ /*
+ * Ensure calling get_task_struct() before setting the reader
+ * waiter to nil such that rwsem_down_read_slowpath() cannot
+ * race with do_exit() by always holding a reference count
+ * to the task to wakeup.
+ */
+ smp_store_release(&waiter->task, NULL);
+ /*
+ * Ensure issuing the wakeup (either by us or someone else)
+ * after setting the reader waiter to nil.
+ */
+ wake_q_add_safe(wake_q, tsk);
+}
+
+/*
+ * This function must be called with the sem->wait_lock held to prevent
+ * race conditions between checking the rwsem wait list and setting the
+ * sem->count accordingly.
+ *
+ * Implies rwsem_del_waiter() on success.
+ */
+static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
+ struct rwsem_waiter *waiter)
+{
+
+ struct rwsem_waiter *first = rwsem_first_waiter(sem);
+ long count, new;
+
+ lockdep_assert_held(&sem->wait_lock);
+
+ count = atomic_long_read(&sem->count);
+ do {
+ bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
+
+ if (has_handoff) {
+ /*
+ * Honor handoff bit and yield only when the first
+ * waiter is the one that set it. Otherwisee, we
+ * still try to acquire the rwsem.
+ */
+ if (first->handoff_set && (waiter != first))
+ return false;
+ }
+
+ new = count;
+
+ if (count & RWSEM_LOCK_MASK) {
+ /*
+ * A waiter (first or not) can set the handoff bit
+ * if it is an RT task or wait in the wait queue
+ * for too long.
+ */
+ if (has_handoff || (!rt_task(waiter->task) &&
+ !time_after(jiffies, waiter->timeout)))
+ return false;
+
+ new |= RWSEM_FLAG_HANDOFF;
+ } else {
+ new |= RWSEM_WRITER_LOCKED;
+ new &= ~RWSEM_FLAG_HANDOFF;
+
+ if (list_is_singular(&sem->wait_list))
+ new &= ~RWSEM_FLAG_WAITERS;
+ }
+ } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
+
+ /*
+ * We have either acquired the lock with handoff bit cleared or set
+ * the handoff bit. Only the first waiter can have its handoff_set
+ * set here to enable optimistic spinning in slowpath loop.
+ */
+ if (new & RWSEM_FLAG_HANDOFF) {
+ first->handoff_set = true;
+ lockevent_inc(rwsem_wlock_handoff);
+ return false;
+ }
+
+ /*
+ * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
+ * success.
+ */
+ list_del(&waiter->list);
+ atomic_long_set(&sem->owner, (long)waiter->task);
+ return true;
+}
+
+static void rwsem_writer_wake(struct rw_semaphore *sem,
+ struct rwsem_waiter *waiter,
+ struct wake_q_head *wake_q)
+{
+ if (rwsem_try_write_lock(sem, waiter))
+ rwsem_waiter_wake(waiter, wake_q);
+}
+
/*
* handle the lock release when processes blocked on it that can now run
* - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
@@ -424,23 +526,12 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
*/
waiter = rwsem_first_waiter(sem);

- if (waiter->type != RWSEM_WAITING_FOR_WRITE)
- goto wake_readers;
-
- if (wake_type == RWSEM_WAKE_ANY) {
- /*
- * Mark writer at the front of the queue for wakeup.
- * Until the task is actually later awoken later by
- * the caller, other writers are able to steal it.
- * Readers, on the other hand, will block as they
- * will notice the queued writer.
- */
- wake_q_add(wake_q, waiter->task);
- lockevent_inc(rwsem_wake_writer);
+ if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
+ if (wake_type == RWSEM_WAKE_ANY)
+ rwsem_writer_wake(sem, waiter, wake_q);
+ return;
}
- return;

-wake_readers:
/*
* No reader wakeup if there are too many of them already.
*/
@@ -552,25 +643,8 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
atomic_long_add(adjustment, &sem->count);

/* 2nd pass */
- list_for_each_entry_safe(waiter, tmp, &wlist, list) {
- struct task_struct *tsk;
-
- tsk = waiter->task;
- get_task_struct(tsk);
-
- /*
- * Ensure calling get_task_struct() before setting the reader
- * waiter to nil such that rwsem_down_read_slowpath() cannot
- * race with do_exit() by always holding a reference count
- * to the task to wakeup.
- */
- smp_store_release(&waiter->task, NULL);
- /*
- * Ensure issuing the wakeup (either by us or someone else)
- * after setting the reader waiter to nil.
- */
- wake_q_add_safe(wake_q, tsk);
- }
+ list_for_each_entry_safe(waiter, tmp, &wlist, list)
+ rwsem_waiter_wake(waiter, wake_q);
}

/*
@@ -600,77 +674,6 @@ rwsem_del_wake_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter,
wake_up_q(wake_q);
}

-/*
- * This function must be called with the sem->wait_lock held to prevent
- * race conditions between checking the rwsem wait list and setting the
- * sem->count accordingly.
- *
- * Implies rwsem_del_waiter() on success.
- */
-static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
- struct rwsem_waiter *waiter)
-{
- struct rwsem_waiter *first = rwsem_first_waiter(sem);
- long count, new;
-
- lockdep_assert_held(&sem->wait_lock);
-
- count = atomic_long_read(&sem->count);
- do {
- bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
-
- if (has_handoff) {
- /*
- * Honor handoff bit and yield only when the first
- * waiter is the one that set it. Otherwisee, we
- * still try to acquire the rwsem.
- */
- if (first->handoff_set && (waiter != first))
- return false;
- }
-
- new = count;
-
- if (count & RWSEM_LOCK_MASK) {
- /*
- * A waiter (first or not) can set the handoff bit
- * if it is an RT task or wait in the wait queue
- * for too long.
- */
- if (has_handoff || (!rt_task(waiter->task) &&
- !time_after(jiffies, waiter->timeout)))
- return false;
-
- new |= RWSEM_FLAG_HANDOFF;
- } else {
- new |= RWSEM_WRITER_LOCKED;
- new &= ~RWSEM_FLAG_HANDOFF;
-
- if (list_is_singular(&sem->wait_list))
- new &= ~RWSEM_FLAG_WAITERS;
- }
- } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
-
- /*
- * We have either acquired the lock with handoff bit cleared or set
- * the handoff bit. Only the first waiter can have its handoff_set
- * set here to enable optimistic spinning in slowpath loop.
- */
- if (new & RWSEM_FLAG_HANDOFF) {
- first->handoff_set = true;
- lockevent_inc(rwsem_wlock_handoff);
- return false;
- }
-
- /*
- * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
- * success.
- */
- list_del(&waiter->list);
- rwsem_set_owner(sem);
- return true;
-}
-
/*
* The rwsem_spin_on_owner() function returns the following 4 values
* depending on the lock owner state.
@@ -1081,7 +1084,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
for (;;) {
set_current_state(state);
if (!smp_load_acquire(&waiter.task)) {
- /* Matches rwsem_mark_wake()'s smp_store_release(). */
+ /* Matches rwsem_waiter_wake()'s smp_store_release(). */
break;
}
if (signal_pending_state(state, current)) {
@@ -1151,55 +1154,39 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
}
} else {
atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
+ if (rwsem_try_write_lock(sem, &waiter))
+ waiter.task = NULL;
}
+ raw_spin_unlock_irq(&sem->wait_lock);

/* wait until we successfully acquire the lock */
- set_current_state(state);
trace_contention_begin(sem, LCB_F_WRITE);

for (;;) {
- if (rwsem_try_write_lock(sem, &waiter)) {
- /* rwsem_try_write_lock() implies ACQUIRE on success */
+ set_current_state(state);
+ if (!smp_load_acquire(&waiter.task)) {
+ /* Matches rwsem_waiter_wake()'s smp_store_release(). */
break;
}
-
- raw_spin_unlock_irq(&sem->wait_lock);
-
- if (signal_pending_state(state, current))
- goto out_nolock;
-
- /*
- * After setting the handoff bit and failing to acquire
- * the lock, attempt to spin on owner to accelerate lock
- * transfer. If the previous owner is a on-cpu writer and it
- * has just released the lock, OWNER_NULL will be returned.
- * In this case, we attempt to acquire the lock again
- * without sleeping.
- */
- if (waiter.handoff_set) {
- enum owner_state owner_state;
-
- owner_state = rwsem_spin_on_owner(sem);
- if (owner_state == OWNER_NULL)
- goto trylock_again;
+ if (signal_pending_state(state, current)) {
+ raw_spin_lock_irq(&sem->wait_lock);
+ if (waiter.task)
+ goto out_nolock;
+ raw_spin_unlock_irq(&sem->wait_lock);
+ /* Ordered by sem->wait_lock against rwsem_mark_wake(). */
+ break;
}
-
schedule_preempt_disabled();
lockevent_inc(rwsem_sleep_writer);
- set_current_state(state);
-trylock_again:
- raw_spin_lock_irq(&sem->wait_lock);
}
__set_current_state(TASK_RUNNING);
- raw_spin_unlock_irq(&sem->wait_lock);
lockevent_inc(rwsem_wlock);
trace_contention_end(sem, 0);
return sem;

out_nolock:
- __set_current_state(TASK_RUNNING);
- raw_spin_lock_irq(&sem->wait_lock);
rwsem_del_wake_waiter(sem, &waiter, &wake_q);
+ __set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_wlock_fail);
trace_contention_end(sem, -EINTR);
return ERR_PTR(-EINTR);
--
2.31.1

2023-03-27 20:32:29

by Waiman Long

Subject: [PATCH v2 7/8] locking/rwsem: Use the force

From: Peter Zijlstra <[email protected]>

Now that the writer adjustment is done from the wakeup side and
HANDOFF guarantees spinning/stealing is disabled, use the combined
guarantee to ignore spurious READER_BIAS and directly claim the lock.
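
In terms of ->count, the unconditional claim below amounts to roughly
this transition (a sketch using the existing flag names; any READER_BIAS
present is transient and will be undone by its reader):

	/* before: RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS (+ transient READER_BIAS) */
	adjustment = RWSEM_WRITER_LOCKED - RWSEM_FLAG_HANDOFF;
	if (list_is_singular(&sem->wait_list))
		adjustment -= RWSEM_FLAG_WAITERS;
	atomic_long_add(adjustment, &sem->count);
	/* after:  RWSEM_WRITER_LOCKED [| RWSEM_FLAG_WAITERS] (+ transient READER_BIAS) */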

Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
---
kernel/locking/rwsem.c | 33 +++++++++++++++++++++++++++++----
1 file changed, 29 insertions(+), 4 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index ee8861effcc2..7bd26e64827a 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -377,8 +377,8 @@ rwsem_add_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
/*
* Remove a waiter from the wait_list and clear flags.
*
- * Both rwsem_reader_wake() and rwsem_try_write_lock() contain a full 'copy' of
- * this function. Modify with care.
+ * Both rwsem_{reader,writer}_wake() and rwsem_try_write_lock() contain a full
+ * 'copy' of this function. Modify with care.
*
* Return: true if wait_list isn't empty and false otherwise
*/
@@ -479,8 +479,33 @@ static void rwsem_writer_wake(struct rw_semaphore *sem,
struct rwsem_waiter *waiter,
struct wake_q_head *wake_q)
{
- if (rwsem_try_write_lock(sem, waiter))
- rwsem_waiter_wake(waiter, wake_q);
+ long count = atomic_long_read(&sem->count);
+
+ /*
+ * Since rwsem_mark_wake() is only called (with WAKE_ANY) when
+ * the lock is unlocked, and the HANDOFF bit guarantees that
+ * all spinning / stealing is disabled, it is posssible to
+ * unconditionally claim the lock -- any READER_BIAS will be
+ * temporary.
+ */
+ if ((count & (RWSEM_FLAG_HANDOFF|RWSEM_WRITER_LOCKED)) == RWSEM_FLAG_HANDOFF) {
+ unsigned long adjustment = RWSEM_WRITER_LOCKED - RWSEM_FLAG_HANDOFF;
+
+ if (list_is_singular(&sem->wait_list))
+ adjustment -= RWSEM_FLAG_WAITERS;
+
+ atomic_long_add(adjustment, &sem->count);
+ /*
+ * Have rwsem_writer_wake() fully imply rwsem_del_waiter() on
+ * success.
+ */
+ list_del(&waiter->list);
+ atomic_long_set(&sem->owner, (long)waiter->task);
+
+ } else if (!rwsem_try_write_lock(sem, waiter))
+ return;
+
+ rwsem_waiter_wake(waiter, wake_q);
}

static void rwsem_reader_wake(struct rw_semaphore *sem,
--
2.31.1

2023-03-27 20:32:33

by Waiman Long

Subject: [PATCH v2 8/8] locking/rwsem: Restore old write lock slow path behavior

An earlier commit ("locking/rwsem: Rework writer wakeup") moves writer
lock acquisition to the writer wakeup path only. This can result in
an almost immediate transfer of write lock ownership after an unlock,
leaving little time for lock stealing. That can be long before the new
write lock owner wakes up and runs its critical section.

As a result, write lock stealing from optimistic spinning will be
greatly suppressed. By enabling CONFIG_LOCK_EVENT_COUNTS and running a
rwsem locking microbenchmark on a 2-socket x86-64 test machine for 15s,
it was found that the locking rate was reduced to about 30% of that
before the patch - 584,091 ops/s vs. 171,184 ops/s. The total number
of lock stealings within the testing period was reduced to less than 1%
of that before the patch - 4,252,147 vs. 17,939 [1].

To restore the lost performance, this patch restores the old write lock
slow path behavior of acquiring the lock after the waiter has been woken
up, with the exception that lock transfer may happen in the wakeup path
if the HANDOFF bit has been set. In addition, the waiter that sets the
HANDOFF bit will still try to spin on the lock owner if it is on CPU.
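
Per wait-loop iteration, the writer branch then looks roughly like this
(condensed from the rwsem_waiter_wait() hunk below):

	raw_spin_lock_irq(&sem->wait_lock);
	if (waiter->task && rwsem_try_write_lock(sem, waiter))
		waiter->task = NULL;		/* lock acquired in the slow path */
	raw_spin_unlock_irq(&sem->wait_lock);
	if (!smp_load_acquire(&waiter->task))
		break;				/* got the lock */
	if (waiter->handoff_set) {		/* we set the HANDOFF bit */
		rwsem_spin_on_owner(sem);	/* chase an on-cpu owner */
		if (!smp_load_acquire(&waiter->task))
			break;
	}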

With this patch, the locking rate is now back up to 580,256 ops/s,
which is almost the same as before.

[1] https://lore.kernel.org/lkml/[email protected]/

Signed-off-by: Waiman Long <[email protected]>
---
kernel/locking/rwsem.c | 34 +++++++++++++++++++++++++++++++---
1 file changed, 31 insertions(+), 3 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index 7bd26e64827a..cf9dc1e250e0 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -426,6 +426,7 @@ rwsem_waiter_wake(struct rwsem_waiter *waiter, struct wake_q_head *wake_q)
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
struct rwsem_waiter *waiter)
{
+ bool first = (rwsem_first_waiter(sem) == waiter);
long count, new;

lockdep_assert_held(&sem->wait_lock);
@@ -434,6 +435,9 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
do {
new = count;

+ if (!first && (count & (RWSEM_FLAG_HANDOFF | RWSEM_LOCK_MASK)))
+ return false;
+
if (count & RWSEM_LOCK_MASK) {
/*
* A waiter (first or not) can set the handoff bit
@@ -501,11 +505,18 @@ static void rwsem_writer_wake(struct rw_semaphore *sem,
*/
list_del(&waiter->list);
atomic_long_set(&sem->owner, (long)waiter->task);
-
- } else if (!rwsem_try_write_lock(sem, waiter))
+ rwsem_waiter_wake(waiter, wake_q);
return;
+ }

- rwsem_waiter_wake(waiter, wake_q);
+ /*
+ * Mark writer at the front of the queue for wakeup.
+ * Until the task is actually awoken later by the caller, other
+ * writers are able to steal it. Readers, on the other hand, will
+ * block as they will notice the queued writer.
+ */
+ wake_q_add(wake_q, waiter->task);
+ lockevent_inc(rwsem_wake_writer);
}

static void rwsem_reader_wake(struct rw_semaphore *sem,
@@ -1038,6 +1049,23 @@ rwsem_waiter_wait(struct rw_semaphore *sem, struct rwsem_waiter *waiter,
/* Matches rwsem_waiter_wake()'s smp_store_release(). */
break;
}
+ if (!reader) {
+ /*
+ * Writer still needs to do a trylock here
+ */
+ raw_spin_lock_irq(&sem->wait_lock);
+ if (waiter->task && rwsem_try_write_lock(sem, waiter))
+ waiter->task = NULL;
+ raw_spin_unlock_irq(&sem->wait_lock);
+ if (!smp_load_acquire(&waiter->task))
+ break;
+
+ if (waiter->handoff_set) {
+ rwsem_spin_on_owner(sem);
+ if (!smp_load_acquire(&waiter->task))
+ break;
+ }
+ }
if (signal_pending_state(state, current)) {
raw_spin_lock_irq(&sem->wait_lock);
if (waiter->task)
--
2.31.1

2023-03-28 14:06:48

by Peter Zijlstra

Subject: Re: [PATCH v2 8/8] locking/rwsem: Restore old write lock slow path behavior

On Mon, Mar 27, 2023 at 04:24:13PM -0400, Waiman Long wrote:

> kernel/locking/rwsem.c | 34 +++++++++++++++++++++++++++++++---
> 1 file changed, 31 insertions(+), 3 deletions(-)
>
> diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
> index 7bd26e64827a..cf9dc1e250e0 100644
> --- a/kernel/locking/rwsem.c
> +++ b/kernel/locking/rwsem.c
> @@ -426,6 +426,7 @@ rwsem_waiter_wake(struct rwsem_waiter *waiter, struct wake_q_head *wake_q)
> static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> struct rwsem_waiter *waiter)
> {
> + bool first = (rwsem_first_waiter(sem) == waiter);
> long count, new;
>
> lockdep_assert_held(&sem->wait_lock);
> @@ -434,6 +435,9 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> do {
> new = count;
>
> + if (!first && (count & (RWSEM_FLAG_HANDOFF | RWSEM_LOCK_MASK)))
> + return false;
> +
> if (count & RWSEM_LOCK_MASK) {
> /*
> * A waiter (first or not) can set the handoff bit

I couldn't immediately make sense of the above, and I think there's a
case where not-first would still want to set handoff that you're missing.

After a few detours, I ended up with the below; does that make sense or
did I just make a bigger mess? (entirely possible due to insufficient
sleep etc..).


--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -426,12 +426,27 @@ rwsem_waiter_wake(struct rwsem_waiter *w
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
struct rwsem_waiter *waiter)
{
+ bool first = (rwsem_first_waiter(sem) == waiter);
long count, new;

lockdep_assert_held(&sem->wait_lock);

count = atomic_long_read(&sem->count);
do {
+ /*
+ * first handoff
+ *
+ * 0 0 | take
+ * 0 1 | not take
+ * 1 0 | take
+ * 1 1 | take
+ *
+ */
+ bool handoff = count & RWSEM_FLAG_HANDOFF;
+
+ if (!first && handoff)
+ return false;
+
new = count;

if (count & RWSEM_LOCK_MASK) {
@@ -440,7 +455,7 @@ static inline bool rwsem_try_write_lock(
* if it is an RT task or wait in the wait queue
* for too long.
*/
- if ((count & RWSEM_FLAG_HANDOFF) ||
+ if (handoff ||
(!rt_task(waiter->task) &&
!time_after(jiffies, waiter->timeout)))
return false;
@@ -501,11 +516,19 @@ static void rwsem_writer_wake(struct rw_
*/
list_del(&waiter->list);
atomic_long_set(&sem->owner, (long)waiter->task);
-
- } else if (!rwsem_try_write_lock(sem, waiter))
+ rwsem_waiter_wake(waiter, wake_q);
return;
+ }

- rwsem_waiter_wake(waiter, wake_q);
+ /*
+ * Mark writer at the front of the queue for wakeup.
+ *
+ * Until the task is actually awoken later by the caller, other writers
+ * are able to steal it. Readers, on the other hand, will block as they
+ * will notice the queued writer.
+ */
+ wake_q_add(wake_q, waiter->task);
+ lockevent_inc(rwsem_wake_writer);
}

static void rwsem_reader_wake(struct rw_semaphore *sem,
@@ -1038,6 +1061,25 @@ rwsem_waiter_wait(struct rw_semaphore *s
/* Matches rwsem_waiter_wake()'s smp_store_release(). */
break;
}
+ if (!reader) {
+ /*
+ * If rwsem_writer_wake() did not take the lock, we must
+ * rwsem_try_write_lock() here.
+ */
+ raw_spin_lock_irq(&sem->wait_lock);
+ if (waiter->task && rwsem_try_write_lock(sem, waiter)) {
+ waiter->task = NULL;
+ raw_spin_unlock_irq(&sem->wait_lock);
+ break;
+ }
+ raw_spin_unlock_irq(&sem->wait_lock);
+
+ if (waiter->handoff_set)
+ rwsem_spin_on_owner(sem);
+
+ if (!smp_load_acquire(&waiter->task))
+ break;
+ }
if (signal_pending_state(state, current)) {
raw_spin_lock_irq(&sem->wait_lock);
if (waiter->task)

2023-03-29 03:52:29

by Waiman Long

Subject: Re: [PATCH v2 8/8] locking/rwsem: Restore old write lock slow path behavior

On 3/28/23 10:02, Peter Zijlstra wrote:
> On Mon, Mar 27, 2023 at 04:24:13PM -0400, Waiman Long wrote:
>
>> kernel/locking/rwsem.c | 34 +++++++++++++++++++++++++++++++---
>> 1 file changed, 31 insertions(+), 3 deletions(-)
>>
>> diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
>> index 7bd26e64827a..cf9dc1e250e0 100644
>> --- a/kernel/locking/rwsem.c
>> +++ b/kernel/locking/rwsem.c
>> @@ -426,6 +426,7 @@ rwsem_waiter_wake(struct rwsem_waiter *waiter, struct wake_q_head *wake_q)
>> static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
>> struct rwsem_waiter *waiter)
>> {
>> + bool first = (rwsem_first_waiter(sem) == waiter);
>> long count, new;
>>
>> lockdep_assert_held(&sem->wait_lock);
>> @@ -434,6 +435,9 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
>> do {
>> new = count;
>>
>> + if (!first && (count & (RWSEM_FLAG_HANDOFF | RWSEM_LOCK_MASK)))
>> + return false;
>> +
>> if (count & RWSEM_LOCK_MASK) {
>> /*
>> * A waiter (first or not) can set the handoff bit
> I couldn't immediately make sense of the above, and I think there's a
> case where not-first would still want to set handoff that you're missing.

It is possible to do that, but we need a minor change to make sure that
you set the handoff_set flag of the first waiter instead of the current
waiter, which is what the current rwsem code does. Other than that, I
think the change is OK, though I need to take a further look at it
tomorrow as it is now late for me too.
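
Something along these lines, I mean (untested sketch, just to show the
shape of the change):

	if (new & RWSEM_FLAG_HANDOFF) {
		/* flag the first waiter, even if we are not it */
		rwsem_first_waiter(sem)->handoff_set = true;
		lockevent_inc(rwsem_wlock_handoff);
		return false;
	}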

Cheers,
longman

>
> After a few detours, I ended up with the below; does that make sense or
> did I just make a bigger mess? (entirely possible due to insufficient
> sleep etc..).
>
>
> --- a/kernel/locking/rwsem.c
> +++ b/kernel/locking/rwsem.c
> @@ -426,12 +426,27 @@ rwsem_waiter_wake(struct rwsem_waiter *w
> static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> struct rwsem_waiter *waiter)
> {
> + bool first = (rwsem_first_waiter(sem) == waiter);
> long count, new;
>
> lockdep_assert_held(&sem->wait_lock);
>
> count = atomic_long_read(&sem->count);
> do {
> + /*
> + * first handoff
> + *
> + * 0 0 | take
> + * 0 1 | not take
> + * 1 0 | take
> + * 1 1 | take
> + *
> + */
> + bool handoff = count & RWSEM_FLAG_HANDOFF;
> +
> + if (!first && handoff)
> + return false;
> +
> new = count;
>
> if (count & RWSEM_LOCK_MASK) {
> @@ -440,7 +455,7 @@ static inline bool rwsem_try_write_lock(
> * if it is an RT task or wait in the wait queue
> * for too long.
> */
> - if ((count & RWSEM_FLAG_HANDOFF) ||
> + if (handoff ||
> (!rt_task(waiter->task) &&
> !time_after(jiffies, waiter->timeout)))
> return false;
> @@ -501,11 +516,19 @@ static void rwsem_writer_wake(struct rw_
> */
> list_del(&waiter->list);
> atomic_long_set(&sem->owner, (long)waiter->task);
> -
> - } else if (!rwsem_try_write_lock(sem, waiter))
> + rwsem_waiter_wake(waiter, wake_q);
> return;
> + }
>
> - rwsem_waiter_wake(waiter, wake_q);
> + /*
> + * Mark writer at the front of the queue for wakeup.
> + *
> + * Until the task is actually awoken later by the caller, other writers
> + * are able to steal it. Readers, on the other hand, will block as they
> + * will notice the queued writer.
> + */
> + wake_q_add(wake_q, waiter->task);
> + lockevent_inc(rwsem_wake_writer);
> }
>
> static void rwsem_reader_wake(struct rw_semaphore *sem,
> @@ -1038,6 +1061,25 @@ rwsem_waiter_wait(struct rw_semaphore *s
> /* Matches rwsem_waiter_wake()'s smp_store_release(). */
> break;
> }
> + if (!reader) {
> + /*
> + * If rwsem_writer_wake() did not take the lock, we must
> + * rwsem_try_write_lock() here.
> + */
> + raw_spin_lock_irq(&sem->wait_lock);
> + if (waiter->task && rwsem_try_write_lock(sem, waiter)) {
> + waiter->task = NULL;
> + raw_spin_unlock_irq(&sem->wait_lock);
> + break;
> + }
> + raw_spin_unlock_irq(&sem->wait_lock);
> +
> + if (waiter->handoff_set)
> + rwsem_spin_on_owner(sem);
> +
> + if (!smp_load_acquire(&waiter->task))
> + break;
> + }
> if (signal_pending_state(state, current)) {
> raw_spin_lock_irq(&sem->wait_lock);
> if (waiter->task)
>