The first patch in this series is the same patch 4 of the v7 patch series
[1] with some update in commit log and comment. The version number is
reset here as the first 3 patches of the series have been merged. Patch
2 is another minor enhancement for some specific use cases.
[1] https://lore.kernel.org/lkml/[email protected]/
Waiman Long (2):
locking/rwsem: Enable early rwsem writer lock handoff
locking/rwsem: Wake up all readers for wait queue waker
kernel/locking/rwsem.c | 89 +++++++++++++++++++++++++++++++++---------
1 file changed, 71 insertions(+), 18 deletions(-)
--
2.31.1
The lock handoff provided in rwsem isn't a true handoff like that in
the mutex. Instead, it is more like a quiescent state where optimistic
spinning and lock stealing are disabled to make it easier for the first
waiter to acquire the lock.
For readers, setting the HANDOFF bit will disable writers from stealing
the lock. The actual handoff is done at rwsem_wake() time after taking
the wait_lock. There isn't much we need to improve here other than
setting the RWSEM_NONSPINNABLE bit in owner.
For writers, setting the HANDOFF bit does not guarantee that it can
acquire the rwsem successfully in a subsequent rwsem_try_write_lock()
after setting the bit there. A reader can come in and add a
RWSEM_READER_BIAS temporarily which can spoil the takeover of the rwsem
in rwsem_try_write_lock() leading to additional delay.
For mutex, lock handoff is done at unlock time as the owner value and
the handoff bit is in the same lock word and can be updated atomically.
That is the not case for rwsem which has a count value for locking and
a different owner value for storing lock owner. In addition, the handoff
processing differs depending on whether the first waiter is a writer or a
reader. We can only make that waiter type determination after acquiring
the wait lock. Together with the fact that the RWSEM_FLAG_HANDOFF bit
is stable while holding the wait_lock, the most convenient place to do
the early handoff is at rwsem_wake() where wait_lock has to be acquired
anyway. There isn't much additional cost in doing this check there while
increasing the chance that a lock handoff will be successful when the
writer wakes up.
Since a lot can happen between unlock time and after acquiring the
wait_lock in rwsem_wake(), we have to reconfirm the presence of the
handoff bit and the lock is free before doing the handoff.
Running a 96-thread rwsem locking test on a 96-thread x86-64 system,
the locking throughput increases slightly from 588 kops/s to 592 kops/s
with this change.
Kernel test robot also noticed a 19.3% improvement of
will-it-scale.per_thread_ops due to this commit [1].
[1] https://lore.kernel.org/lkml/[email protected]/
Signed-off-by: Waiman Long <[email protected]>
---
kernel/locking/rwsem.c | 74 +++++++++++++++++++++++++++++++++++-------
1 file changed, 63 insertions(+), 11 deletions(-)
diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index acb5a50309a1..3936a5fe1229 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -40,7 +40,7 @@
*
* When the rwsem is reader-owned and a spinning writer has timed out,
* the nonspinnable bit will be set to disable optimistic spinning.
-
+ *
* When a writer acquires a rwsem, it puts its task_struct pointer
* into the owner field. It is cleared after an unlock.
*
@@ -430,6 +430,10 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
* Mark writer at the front of the queue for wakeup.
* Until the task is actually later awoken later by
* the caller, other writers are able to steal it.
+ *
+ * *Unless* HANDOFF is set, in which case only the
+ * first waiter is allowed to take it.
+ *
* Readers, on the other hand, will block as they
* will notice the queued writer.
*/
@@ -467,7 +471,12 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
adjustment -= RWSEM_FLAG_HANDOFF;
lockevent_inc(rwsem_rlock_handoff);
}
+ /*
+ * With HANDOFF set for reader, we must
+ * terminate all spinning.
+ */
waiter->handoff_set = true;
+ rwsem_set_nonspinnable(sem);
}
atomic_long_add(-adjustment, &sem->count);
@@ -609,6 +618,12 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
lockdep_assert_held(&sem->wait_lock);
+ if (!waiter->task) {
+ /* Write lock handed off */
+ smp_acquire__after_ctrl_dep();
+ return true;
+ }
+
count = atomic_long_read(&sem->count);
do {
bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
@@ -754,6 +769,10 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
owner = rwsem_owner_flags(sem, &flags);
state = rwsem_owner_state(owner, flags);
+
+ if (owner == current)
+ return OWNER_NONSPINNABLE; /* Handoff granted */
+
if (state != OWNER_WRITER)
return state;
@@ -844,7 +863,6 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
* Try to acquire the lock
*/
taken = rwsem_try_write_lock_unqueued(sem);
-
if (taken)
break;
@@ -1168,21 +1186,23 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
* without sleeping.
*/
if (waiter.handoff_set) {
- enum owner_state owner_state;
-
- owner_state = rwsem_spin_on_owner(sem);
- if (owner_state == OWNER_NULL)
- goto trylock_again;
+ rwsem_spin_on_owner(sem);
+ if (!READ_ONCE(waiter.task)) {
+ /* Write lock handed off */
+ smp_acquire__after_ctrl_dep();
+ set_current_state(TASK_RUNNING);
+ goto out;
+ }
}
schedule_preempt_disabled();
lockevent_inc(rwsem_sleep_writer);
set_current_state(state);
-trylock_again:
raw_spin_lock_irq(&sem->wait_lock);
}
__set_current_state(TASK_RUNNING);
raw_spin_unlock_irq(&sem->wait_lock);
+out:
lockevent_inc(rwsem_wlock);
trace_contention_end(sem, 0);
return sem;
@@ -1190,6 +1210,11 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
out_nolock:
__set_current_state(TASK_RUNNING);
raw_spin_lock_irq(&sem->wait_lock);
+ if (!waiter.task) {
+ smp_acquire__after_ctrl_dep();
+ raw_spin_unlock_irq(&sem->wait_lock);
+ goto out;
+ }
rwsem_del_wake_waiter(sem, &waiter, &wake_q);
lockevent_inc(rwsem_wlock_fail);
trace_contention_end(sem, -EINTR);
@@ -1202,14 +1227,41 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
*/
static struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)
{
- unsigned long flags;
DEFINE_WAKE_Q(wake_q);
+ unsigned long flags;
+ unsigned long count;
raw_spin_lock_irqsave(&sem->wait_lock, flags);
- if (!list_empty(&sem->wait_list))
- rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
+ if (list_empty(&sem->wait_list))
+ goto unlock_out;
+
+ /*
+ * If the rwsem is free and handoff flag is set with wait_lock held,
+ * no other CPUs can take an active lock.
+ */
+ count = atomic_long_read(&sem->count);
+ if (!(count & RWSEM_LOCK_MASK) && (count & RWSEM_FLAG_HANDOFF)) {
+ /*
+ * Since rwsem_mark_wake() will handle the handoff to readers
+ * properly, we don't need to do anything extra for readers.
+ * Early handoff processing will only be needed for writers.
+ */
+ struct rwsem_waiter *waiter = rwsem_first_waiter(sem);
+ long adj = RWSEM_WRITER_LOCKED - RWSEM_FLAG_HANDOFF;
+
+ if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
+ atomic_long_set(&sem->owner, (long)waiter->task);
+ atomic_long_add(adj, &sem->count);
+ wake_q_add(&wake_q, waiter->task);
+ rwsem_del_waiter(sem, waiter);
+ waiter->task = NULL; /* Signal the handoff */
+ goto unlock_out;
+ }
+ }
+ rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
+unlock_out:
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
wake_up_q(&wake_q);
--
2.31.1