Linus, I would like you to take this for v3.10.
These patches extend Alex Shi's work (which added write lock stealing
on the rwsem slow path) in order to provide rwsem write lock stealing
on the fast path (that is, without taking the rwsem's wait_lock).
I have unfortunately been unable to push this through -next before due
to Ingo Molnar / David Howells / Peter Zijlstra being busy with other
things. However, this has gotten some attention from Rik van Riel and
Davidlohr Bueso who both commented that they felt this was ready for
v3.10, and Ingo Molnar has said that he was OK with me pushing directly
to you. So, here goes :)
Davidlohr got the following test results from pgbench running on a
quad-core laptop:
| db_size | clients | tps-vanilla | tps-rwsem |
+---------+---------+-------------+-----------+
| 160 MB  |       1 |        5803 |      6906 | + 19.0%
| 160 MB  |       2 |       13092 |     15931 |
| 160 MB  |       4 |       29412 |     33021 |
| 160 MB  |       8 |       32448 |     34626 |
| 160 MB  |      16 |       32758 |     33098 |
| 160 MB  |      20 |       26940 |     31343 | + 16.3%
| 160 MB  |      30 |       25147 |     28961 |
| 160 MB  |      40 |       25484 |     26902 |
| 160 MB  |      50 |       24528 |     25760 |
-----------------------------------------------
| 1.6 GB  |       1 |        5733 |      7729 | + 34.8%
| 1.6 GB  |       2 |        9411 |     19009 | + 101.9%
| 1.6 GB  |       4 |       31818 |     33185 |
| 1.6 GB  |       8 |       33700 |     34550 |
| 1.6 GB  |      16 |       32751 |     33079 |
| 1.6 GB  |      20 |       30919 |     31494 |
| 1.6 GB  |      30 |       28540 |     28535 |
| 1.6 GB  |      40 |       26380 |     27054 |
| 1.6 GB  |      50 |       25241 |     25591 |
-----------------------------------------------
| 7.6 GB  |       1 |        5779 |      6224 |
| 7.6 GB  |       2 |       10897 |     13611 | + 24.9%
| 7.6 GB  |       4 |       32683 |     33108 |
| 7.6 GB  |       8 |       33968 |     34712 |
| 7.6 GB  |      16 |       32287 |     32895 |
| 7.6 GB  |      20 |       27770 |     31689 | + 14.1%
| 7.6 GB  |      30 |       26739 |     29003 |
| 7.6 GB  |      40 |       24901 |     26683 |
| 7.6 GB  |      50 |       17115 |     25925 | + 51.5%
-----------------------------------------------
(Davidlohr also has one additional patch which further improves throughput,
though I will ask him to send it directly to you as I have suggested some
minor changes).
Patches 1-2 are for cleanups:
- Patch 1 replaces the waiter type bitmask with an enumeration
(as we don't have any other planned uses for the bitmap)
- Patch 2 shortens critical sections in rwsem_down_failed_common() so
they don't cover more than what is absolutely necessary
Patches 3-5 split rwsem_down_failed_common() into separate functions
for the read and write sides:
- Patch 3 simply puts two identical copies of rwsem_down_failed_common()
into rwsem_down_{read,write}_failed (no code changes, in order to
make the review easier)
- Patch 4 does easy simplifications in rwsem_down_read_failed():
- We don't need to wake readers queued before us;
- We don't need to try to steal the lock, and thus we don't need to
acquire the wait_lock after sleeping.
- Patch 5 does easy simplifications in rwsem_down_write_failed():
- We don't need to check for !waiter.task since __rwsem_do_wake()
doesn't remove writers from the wait_list;
- Since the only way to exit the wait loop is by stealing the write lock,
the corresponding exit code can be moved after the loop;
- There is no point releasing the wait_lock before entering the
wait loop, as we will need to reacquire it immediately;
- We don't need to get a reference on the task structure, since the task
is responsible for removing itself from the wait_list;
Patches 6-9 apply additional optimizations to rwsem_down_write_failed():
- Patch 6 tries write lock stealing more aggressively in order to avoid
extra checks;
- Patch 7 uses cmpxchg to implement the write lock stealing, instead of
doing an additive adjustment that might need to be backed out;
- Patch 8 avoids taking the wait_lock if there are already active locks
when we wake up;
- Patch 9 avoids the initial trylock if there were already active locks
when we entered rwsem_down_write_failed()
Patches 10-11 are updates to the __rwsem_do_wake function:
- Patch 10 has simple structure cleanups that don't change the code behavior
- Patch 11 adds generic support for fastpath write lock stealing, by
removing assumptions the function made about rwsem acquisitions being
prevented when the rwsem count value indicates there are queued waiters.
Patch 12 fixes a race condition Peter Hurley noticed in reviewing v1 of
this patch series, which resulted in readers sometimes blocking instead
of executing in parallel with other existing readers.
Patch 13 finally implements rwsem fast path lock stealing for x86 arch.
Patch 14 is a small additional cleanup by Davidlohr Bueso:
"Drop the signed. Just long. It's cleaner."
Davidlohr Bueso (1):
rwsem: no need for explicit signed longs
Michel Lespinasse (13):
rwsem: make the waiter type an enumeration rather than a bitmask
rwsem: shorter spinlocked section in rwsem_down_failed_common()
rwsem: move rwsem_down_failed_common code into rwsem_down_{read,write}_failed
rwsem: simplify rwsem_down_read_failed
rwsem: simplify rwsem_down_write_failed
rwsem: more agressive lock stealing in rwsem_down_write_failed
rwsem: use cmpxchg for trying to steal write lock
rwsem: avoid taking wait_lock in rwsem_down_write_failed
rwsem: skip initial trylock in rwsem_down_write_failed
rwsem: simplify __rwsem_do_wake
rwsem: implement support for write lock stealing on the fastpath
rwsem: do not block readers at head of queue if other readers are active
x86 rwsem: avoid taking slow path when stealing write lock
arch/x86/include/asm/rwsem.h | 28 +++--
lib/rwsem-spinlock.c | 38 +++----
lib/rwsem.c | 240 +++++++++++++++++++++----------------------
3 files changed, 153 insertions(+), 153 deletions(-)
Changes from v2:
- Patches 1-13 are unmodified (except for some comments being modified in
response to code reviews on v2).
- Patch 14 is new as suggested by Peter Hurley and Davidlohr Bueso.
Changes from v1 to v2:
- Patches 1-9 and Patch 13 are unchanged from the prior submission.
- v1 series made a change to ordering of reader wakeups which proved more
controversial than I had anticipated. I reverted this part of the series.
Patches 10-11 are new and implement (well, in Patch 11) the minimal changes
required for supporting fastpath write lock stealing without modifying the
ordering of reader wakeups (This results in longer code than with the
prior proposal, though).
- Patch 12 is new, fixing a race condition Peter Hurley noticed while
reviewing v1 which could result in reduced parallelism between readers.
--
1.8.2.1
Remove the rwsem_down_failed_common function and replace it with two
identical copies of its code in rwsem_down_{read,write}_failed.
This is because we want to make different optimizations in
rwsem_down_{read,write}_failed; we are adding this pure-duplication
step as a separate commit in order to make it easier to check the
following steps.
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Rik van Riel <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
---
lib/rwsem.c | 72 ++++++++++++++++++++++++++++++++++++++++++++++++-------------
1 file changed, 57 insertions(+), 15 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index 40636454cf3c..fb658af1c12c 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -178,12 +178,12 @@ try_again_write:
}
/*
- * wait for a lock to be granted
+ * wait for the read lock to be granted
*/
-static struct rw_semaphore __sched *
-rwsem_down_failed_common(struct rw_semaphore *sem,
- enum rwsem_waiter_type type, signed long adjustment)
+struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
{
+ enum rwsem_waiter_type type = RWSEM_WAITING_FOR_READ;
+ signed long adjustment = -RWSEM_ACTIVE_READ_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
signed long count;
@@ -238,21 +238,63 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
}
/*
- * wait for the read lock to be granted
- */
-struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
-{
- return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_READ,
- -RWSEM_ACTIVE_READ_BIAS);
-}
-
-/*
* wait for the write lock to be granted
*/
struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
{
- return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_WRITE,
- -RWSEM_ACTIVE_WRITE_BIAS);
+ enum rwsem_waiter_type type = RWSEM_WAITING_FOR_WRITE;
+ signed long adjustment = -RWSEM_ACTIVE_WRITE_BIAS;
+ struct rwsem_waiter waiter;
+ struct task_struct *tsk = current;
+ signed long count;
+
+ /* set up my own style of waitqueue */
+ waiter.task = tsk;
+ waiter.type = type;
+ get_task_struct(tsk);
+
+ raw_spin_lock_irq(&sem->wait_lock);
+ if (list_empty(&sem->wait_list))
+ adjustment += RWSEM_WAITING_BIAS;
+ list_add_tail(&waiter.list, &sem->wait_list);
+
+ /* we're now waiting on the lock, but no longer actively locking */
+ count = rwsem_atomic_update(adjustment, sem);
+
+ /* If there are no active locks, wake the front queued process(es) up.
+ *
+ * Alternatively, if we're called from a failed down_write(), there
+ * were already threads queued before us and there are no active
+ * writers, the lock must be read owned; so we try to wake any read
+ * locks that were queued ahead of us. */
+ if (count == RWSEM_WAITING_BIAS)
+ sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
+ else if (count > RWSEM_WAITING_BIAS &&
+ adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
+ sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
+
+ raw_spin_unlock_irq(&sem->wait_lock);
+
+ /* wait to be given the lock */
+ while (true) {
+ set_task_state(tsk, TASK_UNINTERRUPTIBLE);
+ if (!waiter.task)
+ break;
+
+ raw_spin_lock_irq(&sem->wait_lock);
+ /* Try to get the writer sem, may steal from the head writer: */
+ if (type == RWSEM_WAITING_FOR_WRITE)
+ if (try_get_writer_sem(sem, &waiter)) {
+ raw_spin_unlock_irq(&sem->wait_lock);
+ return sem;
+ }
+ raw_spin_unlock_irq(&sem->wait_lock);
+ schedule();
+ }
+
+ tsk->state = TASK_RUNNING;
+
+ return sem;
}
/*
--
1.8.2.1
We can skip the initial trylock in rwsem_down_write_failed() if there are
known active lockers already, thus saving one likely-to-fail cmpxchg.
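For illustration only (this is not part of the patch): a userspace sketch of the gating idea, using C11 atomics and the 32-bit count layout. The helper name model_gated_steal and the attempts counter are hypothetical; the kernel code operates on sem->count under the wait_lock.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* 32-bit rwsem count layout, used here as a userspace model only. */
#define ACTIVE_BIAS        1L
#define ACTIVE_MASK        0xffffL
#define WAITING_BIAS       (-0x10000L)
#define ACTIVE_WRITE_BIAS  (WAITING_BIAS + ACTIVE_BIAS)

/*
 * Hypothetical helper: try to take the write lock only if the last
 * observed count showed no active lockers. *attempts counts how many
 * compare-and-exchange operations were actually issued.
 */
static bool model_gated_steal(atomic_long *count, long observed,
                              bool only_waiter, int *attempts)
{
    long expected = WAITING_BIAS;
    long desired = ACTIVE_WRITE_BIAS;

    if (observed & ACTIVE_MASK)
        return false;   /* known active lockers: skip the cmpxchg */

    if (!only_waiter)
        desired += WAITING_BIAS;   /* other waiters remain queued */

    (*attempts)++;
    return atomic_compare_exchange_strong(count, &expected, desired);
}
```

When the observed count already shows an active locker, the function returns without touching the shared count, which is the saving described above.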
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
Acked-by: Rik van Riel <[email protected]>
---
lib/rwsem.c | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index edf3d9ca670e..0d50e46d5b0c 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -216,14 +216,15 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
/* wait until we successfully acquire the lock */
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
while (true) {
-
- /* Try acquiring the write lock. */
- count = RWSEM_ACTIVE_WRITE_BIAS;
- if (!list_is_singular(&sem->wait_list))
- count += RWSEM_WAITING_BIAS;
- if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
+ if (!(count & RWSEM_ACTIVE_MASK)) {
+ /* Try acquiring the write lock. */
+ count = RWSEM_ACTIVE_WRITE_BIAS;
+ if (!list_is_singular(&sem->wait_list))
+ count += RWSEM_WAITING_BIAS;
+ if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
RWSEM_WAITING_BIAS)
- break;
+ break;
+ }
raw_spin_unlock_irq(&sem->wait_lock);
@@ -231,7 +232,7 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
do {
schedule();
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- } while (sem->count & RWSEM_ACTIVE_MASK);
+ } while ((count = sem->count) & RWSEM_ACTIVE_MASK);
raw_spin_lock_irq(&sem->wait_lock);
}
--
1.8.2.1
Some small code simplifications can be achieved by doing more aggressive
lock stealing:
- When rwsem_down_write_failed() notices that there are no active locks
(and thus no thread to wake us if we decided to sleep), it used to wake
the first queued process. However, stealing the lock is also sufficient
to deal with this case, so we don't need this check anymore.
- In try_get_writer_sem(), we can steal the lock even when the first waiter
is a reader. This is correct because the code path that wakes readers is
protected by the wait_lock. As to the performance effects of this change,
they are expected to be minimal: readers are still granted the lock
(rather than having to acquire it themselves) when they reach the front
of the wait queue, so we have essentially the same behavior as in
rwsem-spinlock.
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Rik van Riel <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
---
lib/rwsem.c | 29 ++++++++---------------------
1 file changed, 8 insertions(+), 21 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index c73bd96dc30c..2360bf204098 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -143,20 +143,12 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
}
/* Try to get write sem, caller holds sem->wait_lock: */
-static int try_get_writer_sem(struct rw_semaphore *sem,
- struct rwsem_waiter *waiter)
+static int try_get_writer_sem(struct rw_semaphore *sem)
{
- struct rwsem_waiter *fwaiter;
long oldcount, adjustment;
- /* only steal when first waiter is writing */
- fwaiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
- if (fwaiter->type != RWSEM_WAITING_FOR_WRITE)
- return 0;
-
adjustment = RWSEM_ACTIVE_WRITE_BIAS;
- /* Only one waiter in the queue: */
- if (fwaiter == waiter && waiter->list.next == &sem->wait_list)
+ if (list_is_singular(&sem->wait_list))
adjustment -= RWSEM_WAITING_BIAS;
try_again_write:
@@ -233,23 +225,18 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
/* we're now waiting on the lock, but no longer actively locking */
count = rwsem_atomic_update(adjustment, sem);
- /* If there are no active locks, wake the front queued process(es) up.
- *
- * Alternatively, if we're called from a failed down_write(), there
- * were already threads queued before us and there are no active
- * writers, the lock must be read owned; so we try to wake any read
- * locks that were queued ahead of us. */
- if (count == RWSEM_WAITING_BIAS)
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
- else if (count > RWSEM_WAITING_BIAS &&
- adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
+ /* If there were already threads queued before us and there are no
+ * active writers, the lock must be read owned; so we try to wake
+ * any read locks that were queued ahead of us. */
+ if (count > RWSEM_WAITING_BIAS &&
+ adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
/* wait until we successfully acquire the lock */
while (true) {
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- if (try_get_writer_sem(sem, &waiter))
+ if (try_get_writer_sem(sem))
break;
raw_spin_unlock_irq(&sem->wait_lock);
--
1.8.2.1
When we decide to wake up readers, we must first grant them as many
read locks as necessary, and then actually wake up all these readers.
But in order to know how many read shares to grant, we must first
count the readers at the head of the queue. This might take a while
if there are many readers, and we want to be protected against a
writer stealing the lock while we're counting. To that end, we grant
the first reader lock before counting how many more readers are queued.
We also require some adjustments to the wake_type semantics.
RWSEM_WAKE_NO_ACTIVE used to mean that we had found the count to
be RWSEM_WAITING_BIAS, in which case the rwsem was known to be free
as nobody could steal it while we hold the wait_lock. This doesn't
make sense once we implement fastpath write lock stealing, so we now
use RWSEM_WAKE_ANY in that case.
Similarly, when rwsem_down_write_failed found that a read lock was active,
it would use RWSEM_WAKE_READ_OWNED which signalled that new readers could
be woken without checking first that the rwsem was available. We can't do
that anymore since the existing readers might release their read locks,
and a writer could steal the lock before we wake up additional readers.
So, we have to use a new RWSEM_WAKE_READERS value to indicate we only want
to wake readers, but we don't currently hold any read lock.
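For illustration only (not part of the patch): a userspace sketch of the "grant the first reader lock, then check whether a writer got in first" step, using C11 atomics and the 32-bit count layout. The name model_first_reader_grant is hypothetical; it mirrors the try_reader_grant loop only loosely and ignores the wait_lock and the wait list.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* 32-bit rwsem count layout, used here as a userspace model only. */
#define ACTIVE_BIAS        1L
#define ACTIVE_MASK        0xffffL
#define WAITING_BIAS       (-0x10000L)
#define ACTIVE_READ_BIAS   ACTIVE_BIAS
#define ACTIVE_WRITE_BIAS  (WAITING_BIAS + ACTIVE_BIAS)

/*
 * Hypothetical helper: grant one read lock up front. Returns true if
 * the grant stands, false if a writer holds the lock (the grant is
 * then undone and the count is left as it was found).
 */
static bool model_first_reader_grant(atomic_long *count)
{
    for (;;) {
        long old = atomic_fetch_add(count, ACTIVE_READ_BIAS);

        if (old >= WAITING_BIAS)
            return true;    /* no writer active: grant stands */

        /* A writer stole the lock. Undo our reader grant. */
        long now = atomic_fetch_sub(count, ACTIVE_READ_BIAS)
                   - ACTIVE_READ_BIAS;
        if (now & ACTIVE_MASK)
            return false;   /* still actively locked: give up */

        /* Last active locker left in the meantime. Retry. */
    }
}
```

Once the first grant stands, a writer on the fast path sees a non-zero active count and cannot steal the lock while the remaining readers are being counted.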
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
---
lib/rwsem.c | 64 ++++++++++++++++++++++++++++++-------------------------------
1 file changed, 32 insertions(+), 32 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index 9a675fa9d78e..bbe48c04f363 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -4,6 +4,7 @@
* Derived from arch/i386/kernel/semaphore.c
*
* Writer lock-stealing by Alex Shi <[email protected]>
+ * and Michel Lespinasse <[email protected]>
*/
#include <linux/rwsem.h>
#include <linux/sched.h>
@@ -41,13 +42,11 @@ struct rwsem_waiter {
enum rwsem_waiter_type type;
};
-/* Wake types for __rwsem_do_wake(). Note that RWSEM_WAKE_NO_ACTIVE and
- * RWSEM_WAKE_READ_OWNED imply that the spinlock must have been kept held
- * since the rwsem value was observed.
- */
-#define RWSEM_WAKE_ANY 0 /* Wake whatever's at head of wait list */
-#define RWSEM_WAKE_NO_ACTIVE 1 /* rwsem was observed with no active thread */
-#define RWSEM_WAKE_READ_OWNED 2 /* rwsem was observed to be read owned */
+enum rwsem_wake_type {
+ RWSEM_WAKE_ANY, /* Wake whatever's at head of wait list */
+ RWSEM_WAKE_READERS, /* Wake readers only */
+ RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */
+};
/*
* handle the lock release when processes blocked on it that can now run
@@ -60,16 +59,16 @@ struct rwsem_waiter {
* - writers are only woken if downgrading is false
*/
static struct rw_semaphore *
-__rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
+__rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
{
struct rwsem_waiter *waiter;
struct task_struct *tsk;
struct list_head *next;
- signed long woken, loop, adjustment;
+ signed long oldcount, woken, loop, adjustment;
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
- if (wake_type != RWSEM_WAKE_READ_OWNED)
+ if (wake_type == RWSEM_WAKE_ANY)
/* Wake writer at the front of the queue, but do not
* grant it the lock yet as we want other writers
* to be able to steal it. Readers, on the other hand,
@@ -79,24 +78,24 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
goto out;
}
- /* If we come here from up_xxxx(), another thread might have reached
- * rwsem_down_failed_common() before we acquired the spinlock and
- * woken up a waiter, making it now active. We prefer to check for
- * this first in order to not spend too much time with the spinlock
- * held if we're not going to be able to wake up readers in the end.
- *
- * Note that we do not need to update the rwsem count: any writer
- * trying to acquire rwsem will run rwsem_down_write_failed() due
- * to the waiting threads and block trying to acquire the spinlock.
- *
- * We use a dummy atomic update in order to acquire the cache line
- * exclusively since we expect to succeed and run the final rwsem
- * count adjustment pretty soon.
+ /* Writers might steal the lock before we grant it to the next reader.
+ * We prefer to do the first reader grant before counting readers
+ * so we can bail out early if a writer stole the lock.
*/
- if (wake_type == RWSEM_WAKE_ANY &&
- rwsem_atomic_update(0, sem) < RWSEM_WAITING_BIAS)
- /* Someone grabbed the sem for write already */
- goto out;
+ adjustment = 0;
+ if (wake_type != RWSEM_WAKE_READ_OWNED) {
+ adjustment = RWSEM_ACTIVE_READ_BIAS;
+ try_reader_grant:
+ oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
+ if (unlikely(oldcount < RWSEM_WAITING_BIAS)) {
+ /* A writer stole the lock. Undo our reader grant. */
+ if (rwsem_atomic_update(-adjustment, sem) &
+ RWSEM_ACTIVE_MASK)
+ goto out;
+ /* Last active locker left. Retry waking readers. */
+ goto try_reader_grant;
+ }
+ }
/* Grant an infinite number of read locks to the readers at the front
* of the queue. Note we increment the 'active part' of the count by
@@ -114,12 +113,13 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
} while (waiter->type != RWSEM_WAITING_FOR_WRITE);
- adjustment = woken * RWSEM_ACTIVE_READ_BIAS;
+ adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment;
if (waiter->type != RWSEM_WAITING_FOR_WRITE)
/* hit end of list above */
adjustment -= RWSEM_WAITING_BIAS;
- rwsem_atomic_add(adjustment, sem);
+ if (adjustment)
+ rwsem_atomic_add(adjustment, sem);
next = sem->wait_list.next;
loop = woken;
@@ -164,8 +164,8 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
count = rwsem_atomic_update(adjustment, sem);
/* If there are no active locks, wake the front queued process(es). */
- if (count == RWSEM_WAITING_BIAS)
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
+ if (!(count & RWSEM_ACTIVE_MASK))
+ sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY);
raw_spin_unlock_irq(&sem->wait_lock);
@@ -209,7 +209,7 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
* any read locks that were queued ahead of us. */
if (count > RWSEM_WAITING_BIAS &&
adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
+ sem = __rwsem_do_wake(sem, RWSEM_WAKE_READERS);
/* wait until we successfully acquire the lock */
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
--
1.8.2.1
From: Davidlohr Bueso <[email protected]>
Change explicit "signed long" declarations into plain "long" as suggested
by Peter Hurley.
Signed-off-by: Davidlohr Bueso <[email protected]>
Reviewed-by: Michel Lespinasse <[email protected]>
Signed-off-by: Michel Lespinasse <[email protected]>
---
lib/rwsem.c | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index 61f91ca75e40..cf0ad2ad19f5 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -64,7 +64,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
struct rwsem_waiter *waiter;
struct task_struct *tsk;
struct list_head *next;
- signed long oldcount, woken, loop, adjustment;
+ long oldcount, woken, loop, adjustment;
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
@@ -145,10 +145,9 @@ __rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
*/
struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
{
- signed long adjustment = -RWSEM_ACTIVE_READ_BIAS;
+ long count, adjustment = -RWSEM_ACTIVE_READ_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
- signed long count;
/* set up my own style of waitqueue */
waiter.task = tsk;
@@ -193,10 +192,9 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
*/
struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
{
- signed long adjustment = -RWSEM_ACTIVE_WRITE_BIAS;
+ long count, adjustment = -RWSEM_ACTIVE_WRITE_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
- signed long count;
/* set up my own style of waitqueue */
waiter.task = tsk;
--
1.8.2.1
Modify __down_write[_nested] and __down_write_trylock to grab the write
lock whenever the active count is 0, even if there are queued waiters
(they must be writers pending wakeup, since the active count is 0).
Note that this is an optimization only; architectures without this
optimization will still work fine:
- __down_write() would take the slow path which would take the wait_lock
and then try stealing the lock (as in the spinlocked rwsem implementation)
- __down_write_trylock() would fail, but callers must be ready to deal
with that - since there are some writers pending wakeup, they could
have raced with us and obtained the lock before we steal it.
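For illustration only (not part of the patch): a portable C11 sketch of what the new trylock loop is meant to do, assuming the 32-bit count layout. The name model_down_write_trylock is hypothetical; the actual patch implements this in x86 inline assembly.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* 32-bit rwsem count layout, used here as a userspace model only. */
#define ACTIVE_BIAS        1L
#define ACTIVE_MASK        0xffffL
#define WAITING_BIAS       (-0x10000L)
#define ACTIVE_WRITE_BIAS  (WAITING_BIAS + ACTIVE_BIAS)

/*
 * Hypothetical helper: take the write lock whenever the active part
 * of the count is zero, even if waiters are queued. Returns true on
 * success, false if some locker is active.
 */
static bool model_down_write_trylock(atomic_long *count)
{
    long old = atomic_load(count);

    while (!(old & ACTIVE_MASK)) {
        /* On failure, 'old' is reloaded with the current count. */
        if (atomic_compare_exchange_strong(count, &old,
                                           old + ACTIVE_WRITE_BIAS))
            return true;
    }
    return false;
}
```

Unlike the previous cmpxchg against RWSEM_UNLOCKED_VALUE, this form also succeeds when the count carries a waiting bias but no active lockers.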
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
---
arch/x86/include/asm/rwsem.h | 28 +++++++++++++++++++++-------
1 file changed, 21 insertions(+), 7 deletions(-)
diff --git a/arch/x86/include/asm/rwsem.h b/arch/x86/include/asm/rwsem.h
index 2dbe4a721ce5..cad82c9c2fde 100644
--- a/arch/x86/include/asm/rwsem.h
+++ b/arch/x86/include/asm/rwsem.h
@@ -105,8 +105,8 @@ static inline void __down_write_nested(struct rw_semaphore *sem, int subclass)
asm volatile("# beginning down_write\n\t"
LOCK_PREFIX " xadd %1,(%2)\n\t"
/* adds 0xffff0001, returns the old value */
- " test %1,%1\n\t"
- /* was the count 0 before? */
+ " test " __ASM_SEL(%w1,%k1) "," __ASM_SEL(%w1,%k1) "\n\t"
+ /* was the active mask 0 before? */
" jz 1f\n"
" call call_rwsem_down_write_failed\n"
"1:\n"
@@ -126,11 +126,25 @@ static inline void __down_write(struct rw_semaphore *sem)
*/
static inline int __down_write_trylock(struct rw_semaphore *sem)
{
- long ret = cmpxchg(&sem->count, RWSEM_UNLOCKED_VALUE,
- RWSEM_ACTIVE_WRITE_BIAS);
- if (ret == RWSEM_UNLOCKED_VALUE)
- return 1;
- return 0;
+ long result, tmp;
+ asm volatile("# beginning __down_write_trylock\n\t"
+ " mov %0,%1\n\t"
+ "1:\n\t"
+ " test " __ASM_SEL(%w1,%k1) "," __ASM_SEL(%w1,%k1) "\n\t"
+ /* was the active mask 0 before? */
+ " jnz 2f\n\t"
+ " mov %1,%2\n\t"
+ " add %3,%2\n\t"
+ LOCK_PREFIX " cmpxchg %2,%0\n\t"
+ " jnz 1b\n\t"
+ "2:\n\t"
+ " sete %b1\n\t"
+ " movzbl %b1, %k1\n\t"
+ "# ending __down_write_trylock\n\t"
+ : "+m" (sem->count), "=&a" (result), "=&r" (tmp)
+ : "er" (RWSEM_ACTIVE_WRITE_BIAS)
+ : "memory", "cc");
+ return result;
}
/*
--
1.8.2.1
This change fixes a race condition where a reader might determine it
needs to block, but by the time it acquires the wait_lock the rwsem
has active readers and no queued waiters.
In this situation the reader can run in parallel with the existing active
readers; it does not need to block until the active readers complete.
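For illustration only (not part of the patch): the wake-up condition written as a pure function over the 32-bit count layout. The name model_reader_should_wake is hypothetical; count is the value after the adjustment has been applied, and an adjustment other than -ACTIVE_READ_BIAS means the waiting bias was added because the wait list was empty.

```c
#include <assert.h>
#include <stdbool.h>

/* 32-bit rwsem count layout, used here as a userspace model only. */
#define ACTIVE_BIAS        1L
#define WAITING_BIAS       (-0x10000L)
#define ACTIVE_READ_BIAS   ACTIVE_BIAS
#define ACTIVE_WRITE_BIAS  (WAITING_BIAS + ACTIVE_BIAS)

/*
 * Hypothetical helper: should a reader that just queued itself run
 * the wake-up path?
 *  - yes if nobody holds the lock (count == WAITING_BIAS);
 *  - yes if only readers hold it and we are first in the queue.
 */
static bool model_reader_should_wake(long count, long adjustment)
{
    return count == WAITING_BIAS ||
           (count > WAITING_BIAS && adjustment != -ACTIVE_READ_BIAS);
}
```

For example, with two active readers and an empty wait list, the queued reader sees count == WAITING_BIAS + 2 with adjustment == WAITING_BIAS - ACTIVE_READ_BIAS, so it wakes itself instead of blocking.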
Thanks to Peter Hurley for noticing this possible race.
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
---
lib/rwsem.c | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index bbe48c04f363..61f91ca75e40 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -163,8 +163,14 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
/* we're now waiting on the lock, but no longer actively locking */
count = rwsem_atomic_update(adjustment, sem);
- /* If there are no active locks, wake the front queued process(es). */
- if (!(count & RWSEM_ACTIVE_MASK))
+ /* If there are no active locks, wake the front queued process(es).
+ *
+ * If there are no writers and we are first in the queue,
+ * wake our own waiter to join the existing active readers !
+ */
+ if (count == RWSEM_WAITING_BIAS ||
+ (count > RWSEM_WAITING_BIAS &&
+ adjustment != -RWSEM_ACTIVE_READ_BIAS))
sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY);
raw_spin_unlock_irq(&sem->wait_lock);
--
1.8.2.1
This is mostly for cleanup value:
- We don't need several gotos to handle the case where the first
waiter is a writer. Two simple tests will do (and generate very
similar code).
- In the remainder of the function, we know the first waiter is a reader,
so we don't have to double check that. We can use do..while loops
to iterate over the readers to wake (generates slightly better code).
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
---
lib/rwsem-spinlock.c | 23 +++++++----------------
lib/rwsem.c | 26 ++++++++++++--------------
2 files changed, 19 insertions(+), 30 deletions(-)
diff --git a/lib/rwsem-spinlock.c b/lib/rwsem-spinlock.c
index 5f117f37ac0a..9be8a9144978 100644
--- a/lib/rwsem-spinlock.c
+++ b/lib/rwsem-spinlock.c
@@ -70,26 +70,17 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
- if (!wakewrite) {
- if (waiter->type == RWSEM_WAITING_FOR_WRITE)
- goto out;
- goto dont_wake_writers;
- }
-
- /*
- * as we support write lock stealing, we can't set sem->activity
- * to -1 here to indicate we get the lock. Instead, we wake it up
- * to let it go get it again.
- */
if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
- wake_up_process(waiter->task);
+ if (wakewrite)
+ /* Wake up a writer. Note that we do not grant it the
+ * lock - it will have to acquire it when it runs. */
+ wake_up_process(waiter->task);
goto out;
}
/* grant an infinite number of read locks to the front of the queue */
- dont_wake_writers:
woken = 0;
- while (waiter->type == RWSEM_WAITING_FOR_READ) {
+ do {
struct list_head *next = waiter->list.next;
list_del(&waiter->list);
@@ -99,10 +90,10 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
wake_up_process(tsk);
put_task_struct(tsk);
woken++;
- if (list_empty(&sem->wait_list))
+ if (next == &sem->wait_list)
break;
waiter = list_entry(next, struct rwsem_waiter, list);
- }
+ } while (waiter->type != RWSEM_WAITING_FOR_WRITE);
sem->activity += woken;
diff --git a/lib/rwsem.c b/lib/rwsem.c
index 0d50e46d5b0c..9a675fa9d78e 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -68,20 +68,17 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
signed long woken, loop, adjustment;
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
- if (waiter->type != RWSEM_WAITING_FOR_WRITE)
- goto readers_only;
-
- if (wake_type == RWSEM_WAKE_READ_OWNED)
- /* Another active reader was observed, so wakeup is not
- * likely to succeed. Save the atomic op.
- */
+ if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
+ if (wake_type != RWSEM_WAKE_READ_OWNED)
+ /* Wake writer at the front of the queue, but do not
+ * grant it the lock yet as we want other writers
+ * to be able to steal it. Readers, on the other hand,
+ * will block as they will notice the queued writer.
+ */
+ wake_up_process(waiter->task);
goto out;
+ }
- /* Wake up the writing waiter and let the task grab the sem: */
- wake_up_process(waiter->task);
- goto out;
-
- readers_only:
/* If we come here from up_xxxx(), another thread might have reached
* rwsem_down_failed_common() before we acquired the spinlock and
* woken up a waiter, making it now active. We prefer to check for
@@ -125,7 +122,8 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
rwsem_atomic_add(adjustment, sem);
next = sem->wait_list.next;
- for (loop = woken; loop > 0; loop--) {
+ loop = woken;
+ do {
waiter = list_entry(next, struct rwsem_waiter, list);
next = waiter->list.next;
tsk = waiter->task;
@@ -133,7 +131,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
waiter->task = NULL;
wake_up_process(tsk);
put_task_struct(tsk);
- }
+ } while (--loop);
sem->wait_list.next = next;
next->prev = &sem->wait_list;
--
1.8.2.1
In rwsem_down_write_failed(), if there are active locks after we wake up
(i.e. the lock got stolen from us), skip taking the wait_lock and go back
to sleep immediately.
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
Acked-by: Rik van Riel <[email protected]>
---
lib/rwsem.c | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index 64c2dc007be2..edf3d9ca670e 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -214,8 +214,8 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
/* wait until we successfully acquire the lock */
+ set_task_state(tsk, TASK_UNINTERRUPTIBLE);
while (true) {
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
/* Try acquiring the write lock. */
count = RWSEM_ACTIVE_WRITE_BIAS;
@@ -226,7 +226,13 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
break;
raw_spin_unlock_irq(&sem->wait_lock);
- schedule();
+
+ /* Block until there are no active lockers. */
+ do {
+ schedule();
+ set_task_state(tsk, TASK_UNINTERRUPTIBLE);
+ } while (sem->count & RWSEM_ACTIVE_MASK);
+
raw_spin_lock_irq(&sem->wait_lock);
}
--
1.8.2.1
Using rwsem_atomic_update to try stealing the write lock forced us to
undo the adjustment in the failure path. We can have simpler and faster
code by using cmpxchg instead.
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
Acked-by: Rik van Riel <[email protected]>
---
lib/rwsem.c | 26 ++++++--------------------
1 file changed, 6 insertions(+), 20 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index 2360bf204098..64c2dc007be2 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -142,25 +142,6 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
return sem;
}
-/* Try to get write sem, caller holds sem->wait_lock: */
-static int try_get_writer_sem(struct rw_semaphore *sem)
-{
- long oldcount, adjustment;
-
- adjustment = RWSEM_ACTIVE_WRITE_BIAS;
- if (list_is_singular(&sem->wait_list))
- adjustment -= RWSEM_WAITING_BIAS;
-
-try_again_write:
- oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
- if (!(oldcount & RWSEM_ACTIVE_MASK))
- return 1;
- /* some one grabbed the sem already */
- if (rwsem_atomic_update(-adjustment, sem) & RWSEM_ACTIVE_MASK)
- return 0;
- goto try_again_write;
-}
-
/*
* wait for the read lock to be granted
*/
@@ -236,7 +217,12 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
while (true) {
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- if (try_get_writer_sem(sem))
+ /* Try acquiring the write lock. */
+ count = RWSEM_ACTIVE_WRITE_BIAS;
+ if (!list_is_singular(&sem->wait_list))
+ count += RWSEM_WAITING_BIAS;
+ if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
+ RWSEM_WAITING_BIAS)
break;
raw_spin_unlock_irq(&sem->wait_lock);
--
1.8.2.1
When waking writers, we never grant them the lock - instead, they have
to acquire it themselves when they run, and remove themselves from the
wait_list when they succeed.
As a result, we can do a few simplifications in rwsem_down_write_failed():
- We don't need to check for !waiter.task since __rwsem_do_wake() doesn't
remove writers from the wait_list
- There is no point in releasing the wait_lock before entering the wait loop,
as we will need to reacquire it immediately. We can change the loop so
that the lock is always held at the start of each loop iteration.
- We don't need to get a reference on the task structure, since the task
is responsible for removing itself from the wait_list. There is no risk,
like in the rwsem_down_read_failed() case, that a task would wake up and
exit (thus destroying its task structure) while __rwsem_do_wake() is
still running - wait_lock protects against that.
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Rik van Riel <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
---
lib/rwsem.c | 33 +++++++++------------------------
1 file changed, 9 insertions(+), 24 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index 66f307e90761..c73bd96dc30c 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -161,16 +161,8 @@ static int try_get_writer_sem(struct rw_semaphore *sem,
try_again_write:
oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
- if (!(oldcount & RWSEM_ACTIVE_MASK)) {
- /* No active lock: */
- struct task_struct *tsk = waiter->task;
-
- list_del(&waiter->list);
- smp_mb();
- put_task_struct(tsk);
- tsk->state = TASK_RUNNING;
+ if (!(oldcount & RWSEM_ACTIVE_MASK))
return 1;
- }
/* some one grabbed the sem already */
if (rwsem_atomic_update(-adjustment, sem) & RWSEM_ACTIVE_MASK)
return 0;
@@ -220,11 +212,10 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
}
/*
- * wait for the write lock to be granted
+ * wait until we successfully acquire the write lock
*/
struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
{
- enum rwsem_waiter_type type = RWSEM_WAITING_FOR_WRITE;
signed long adjustment = -RWSEM_ACTIVE_WRITE_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
@@ -232,8 +223,7 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
/* set up my own style of waitqueue */
waiter.task = tsk;
- waiter.type = type;
- get_task_struct(tsk);
+ waiter.type = RWSEM_WAITING_FOR_WRITE;
raw_spin_lock_irq(&sem->wait_lock);
if (list_empty(&sem->wait_list))
@@ -255,25 +245,20 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
- raw_spin_unlock_irq(&sem->wait_lock);
-
- /* wait to be given the lock */
+ /* wait until we successfully acquire the lock */
while (true) {
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- if (!waiter.task)
+
+ if (try_get_writer_sem(sem, &waiter))
break;
- raw_spin_lock_irq(&sem->wait_lock);
- /* Try to get the writer sem, may steal from the head writer: */
- if (type == RWSEM_WAITING_FOR_WRITE)
- if (try_get_writer_sem(sem, &waiter)) {
- raw_spin_unlock_irq(&sem->wait_lock);
- return sem;
- }
raw_spin_unlock_irq(&sem->wait_lock);
schedule();
+ raw_spin_lock_irq(&sem->wait_lock);
}
+ list_del(&waiter.list);
+ raw_spin_unlock_irq(&sem->wait_lock);
tsk->state = TASK_RUNNING;
return sem;
--
1.8.2.1
This change reduces the size of the spinlocked and TASK_UNINTERRUPTIBLE
sections in rwsem_down_failed_common():
- We only need the sem->wait_lock to insert ourselves on the wait_list;
the waiter node can be prepared outside of the wait_lock.
- The task state only needs to be set to TASK_UNINTERRUPTIBLE immediately
before checking if we actually need to sleep; it doesn't need to protect
the entire function.
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Rik van Riel <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
---
lib/rwsem.c | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index 672eb33218ac..40636454cf3c 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -188,14 +188,12 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
struct task_struct *tsk = current;
signed long count;
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
-
/* set up my own style of waitqueue */
- raw_spin_lock_irq(&sem->wait_lock);
waiter.task = tsk;
waiter.type = type;
get_task_struct(tsk);
+ raw_spin_lock_irq(&sem->wait_lock);
if (list_empty(&sem->wait_list))
adjustment += RWSEM_WAITING_BIAS;
list_add_tail(&waiter.list, &sem->wait_list);
@@ -218,7 +216,8 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
raw_spin_unlock_irq(&sem->wait_lock);
/* wait to be given the lock */
- for (;;) {
+ while (true) {
+ set_task_state(tsk, TASK_UNINTERRUPTIBLE);
if (!waiter.task)
break;
@@ -231,7 +230,6 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
}
raw_spin_unlock_irq(&sem->wait_lock);
schedule();
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
}
tsk->state = TASK_RUNNING;
--
1.8.2.1
When trying to acquire a read lock, the RWSEM_ACTIVE_READ_BIAS adjustment
doesn't cause other readers to block, so we never have to worry about waking
them back up after canceling this adjustment in rwsem_down_read_failed().
We also never want to steal the lock in rwsem_down_read_failed(), so we
don't have to grab the wait_lock either.
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Rik van Riel <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
---
lib/rwsem.c | 22 ++--------------------
1 file changed, 2 insertions(+), 20 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index fb658af1c12c..66f307e90761 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -182,7 +182,6 @@ try_again_write:
*/
struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
{
- enum rwsem_waiter_type type = RWSEM_WAITING_FOR_READ;
signed long adjustment = -RWSEM_ACTIVE_READ_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
@@ -190,7 +189,7 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
/* set up my own style of waitqueue */
waiter.task = tsk;
- waiter.type = type;
+ waiter.type = RWSEM_WAITING_FOR_READ;
get_task_struct(tsk);
raw_spin_lock_irq(&sem->wait_lock);
@@ -201,17 +200,9 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
/* we're now waiting on the lock, but no longer actively locking */
count = rwsem_atomic_update(adjustment, sem);
- /* If there are no active locks, wake the front queued process(es) up.
- *
- * Alternatively, if we're called from a failed down_write(), there
- * were already threads queued before us and there are no active
- * writers, the lock must be read owned; so we try to wake any read
- * locks that were queued ahead of us. */
+ /* If there are no active locks, wake the front queued process(es). */
if (count == RWSEM_WAITING_BIAS)
sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
- else if (count > RWSEM_WAITING_BIAS &&
- adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
raw_spin_unlock_irq(&sem->wait_lock);
@@ -220,15 +211,6 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
if (!waiter.task)
break;
-
- raw_spin_lock_irq(&sem->wait_lock);
- /* Try to get the writer sem, may steal from the head writer: */
- if (type == RWSEM_WAITING_FOR_WRITE)
- if (try_get_writer_sem(sem, &waiter)) {
- raw_spin_unlock_irq(&sem->wait_lock);
- return sem;
- }
- raw_spin_unlock_irq(&sem->wait_lock);
schedule();
}
--
1.8.2.1
We are not planning to add any new waiter flags, so we can convert the
waiter type into an enumeration.
Background: David Howells suggested I do this back when I tried adding
a new waiter type for unfair readers. However, I believe the cleanup
applies regardless of that use case.
Signed-off-by: Michel Lespinasse <[email protected]>
Reviewed-by: Rik van Riel <[email protected]>
Reviewed-by: Peter Hurley <[email protected]>
Acked-by: Davidlohr Bueso <[email protected]>
---
lib/rwsem-spinlock.c | 19 +++++++++++--------
lib/rwsem.c | 23 +++++++++++++----------
2 files changed, 24 insertions(+), 18 deletions(-)
diff --git a/lib/rwsem-spinlock.c b/lib/rwsem-spinlock.c
index 7542afbb22b3..5f117f37ac0a 100644
--- a/lib/rwsem-spinlock.c
+++ b/lib/rwsem-spinlock.c
@@ -9,12 +9,15 @@
#include <linux/sched.h>
#include <linux/export.h>
+enum rwsem_waiter_type {
+ RWSEM_WAITING_FOR_WRITE,
+ RWSEM_WAITING_FOR_READ
+};
+
struct rwsem_waiter {
struct list_head list;
struct task_struct *task;
- unsigned int flags;
-#define RWSEM_WAITING_FOR_READ 0x00000001
-#define RWSEM_WAITING_FOR_WRITE 0x00000002
+ enum rwsem_waiter_type type;
};
int rwsem_is_locked(struct rw_semaphore *sem)
@@ -68,7 +71,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
if (!wakewrite) {
- if (waiter->flags & RWSEM_WAITING_FOR_WRITE)
+ if (waiter->type == RWSEM_WAITING_FOR_WRITE)
goto out;
goto dont_wake_writers;
}
@@ -78,7 +81,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
* to -1 here to indicate we get the lock. Instead, we wake it up
* to let it go get it again.
*/
- if (waiter->flags & RWSEM_WAITING_FOR_WRITE) {
+ if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
wake_up_process(waiter->task);
goto out;
}
@@ -86,7 +89,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
/* grant an infinite number of read locks to the front of the queue */
dont_wake_writers:
woken = 0;
- while (waiter->flags & RWSEM_WAITING_FOR_READ) {
+ while (waiter->type == RWSEM_WAITING_FOR_READ) {
struct list_head *next = waiter->list.next;
list_del(&waiter->list);
@@ -144,7 +147,7 @@ void __sched __down_read(struct rw_semaphore *sem)
/* set up my own style of waitqueue */
waiter.task = tsk;
- waiter.flags = RWSEM_WAITING_FOR_READ;
+ waiter.type = RWSEM_WAITING_FOR_READ;
get_task_struct(tsk);
list_add_tail(&waiter.list, &sem->wait_list);
@@ -201,7 +204,7 @@ void __sched __down_write_nested(struct rw_semaphore *sem, int subclass)
/* set up my own style of waitqueue */
tsk = current;
waiter.task = tsk;
- waiter.flags = RWSEM_WAITING_FOR_WRITE;
+ waiter.type = RWSEM_WAITING_FOR_WRITE;
list_add_tail(&waiter.list, &sem->wait_list);
/* wait for someone to release the lock */
diff --git a/lib/rwsem.c b/lib/rwsem.c
index ad5e0df16ab4..672eb33218ac 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -30,12 +30,15 @@ void __init_rwsem(struct rw_semaphore *sem, const char *name,
EXPORT_SYMBOL(__init_rwsem);
+enum rwsem_waiter_type {
+ RWSEM_WAITING_FOR_WRITE,
+ RWSEM_WAITING_FOR_READ
+};
+
struct rwsem_waiter {
struct list_head list;
struct task_struct *task;
- unsigned int flags;
-#define RWSEM_WAITING_FOR_READ 0x00000001
-#define RWSEM_WAITING_FOR_WRITE 0x00000002
+ enum rwsem_waiter_type type;
};
/* Wake types for __rwsem_do_wake(). Note that RWSEM_WAKE_NO_ACTIVE and
@@ -65,7 +68,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
signed long woken, loop, adjustment;
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
- if (!(waiter->flags & RWSEM_WAITING_FOR_WRITE))
+ if (waiter->type != RWSEM_WAITING_FOR_WRITE)
goto readers_only;
if (wake_type == RWSEM_WAKE_READ_OWNED)
@@ -112,10 +115,10 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
waiter = list_entry(waiter->list.next,
struct rwsem_waiter, list);
- } while (waiter->flags & RWSEM_WAITING_FOR_READ);
+ } while (waiter->type != RWSEM_WAITING_FOR_WRITE);
adjustment = woken * RWSEM_ACTIVE_READ_BIAS;
- if (waiter->flags & RWSEM_WAITING_FOR_READ)
+ if (waiter->type != RWSEM_WAITING_FOR_WRITE)
/* hit end of list above */
adjustment -= RWSEM_WAITING_BIAS;
@@ -148,7 +151,7 @@ static int try_get_writer_sem(struct rw_semaphore *sem,
/* only steal when first waiter is writing */
fwaiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
- if (!(fwaiter->flags & RWSEM_WAITING_FOR_WRITE))
+ if (fwaiter->type != RWSEM_WAITING_FOR_WRITE)
return 0;
adjustment = RWSEM_ACTIVE_WRITE_BIAS;
@@ -179,7 +182,7 @@ try_again_write:
*/
static struct rw_semaphore __sched *
rwsem_down_failed_common(struct rw_semaphore *sem,
- unsigned int flags, signed long adjustment)
+ enum rwsem_waiter_type type, signed long adjustment)
{
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
@@ -190,7 +193,7 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
/* set up my own style of waitqueue */
raw_spin_lock_irq(&sem->wait_lock);
waiter.task = tsk;
- waiter.flags = flags;
+ waiter.type = type;
get_task_struct(tsk);
if (list_empty(&sem->wait_list))
@@ -221,7 +224,7 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
raw_spin_lock_irq(&sem->wait_lock);
/* Try to get the writer sem, may steal from the head writer: */
- if (flags == RWSEM_WAITING_FOR_WRITE)
+ if (type == RWSEM_WAITING_FOR_WRITE)
if (try_get_writer_sem(sem, &waiter)) {
raw_spin_unlock_irq(&sem->wait_lock);
return sem;
--
1.8.2.1
On Tue, 2013-05-07 at 06:45 -0700, Michel Lespinasse wrote:
> (Davidlohr also has one additional patch which further improves throughput,
> though I will ask him to send it directly to you as I have suggested some
> minor changes).
From: Davidlohr Bueso <[email protected]>
This patch tries to reduce the number of cmpxchg calls in the
writer failed path by checking the counter value first before
issuing the instruction. If ->count is not set to RWSEM_WAITING_BIAS
then there is no point wasting a cmpxchg call.
Furthermore, Michel states "I suppose it helps due to the case where
someone else steals the lock while we're trying to acquire sem->wait_lock."
Two very different workloads and machines were used to see how this
patch improves throughput: pgbench on a quad-core laptop and aim7 on a
large 8 socket box with 80 cores.
Some results comparing Michel's fast-path write lock stealing
(tps-rwsem) against this patch (tps-patch), from pgbench running on a
quad-core laptop:
| db_size | clients | tps-rwsem | tps-patch |
+---------+----------+----------------+--------------+
| 160 MB | 1 | 6906 | 9153 | + 32.5%
| 160 MB | 2 | 15931 | 22487 | + 41.1%
| 160 MB | 4 | 33021 | 32503 |
| 160 MB | 8 | 34626 | 34695 |
| 160 MB | 16 | 33098 | 34003 |
| 160 MB | 20 | 31343 | 31440 |
| 160 MB | 30 | 28961 | 28987 |
| 160 MB | 40 | 26902 | 26970 |
| 160 MB | 50 | 25760 | 25810 |
------------------------------------------------------
| 1.6 GB | 1 | 7729 | 7537 |
| 1.6 GB | 2 | 19009 | 23508 | + 23.7%
| 1.6 GB | 4 | 33185 | 32666 |
| 1.6 GB | 8 | 34550 | 34318 |
| 1.6 GB | 16 | 33079 | 32689 |
| 1.6 GB | 20 | 31494 | 31702 |
| 1.6 GB | 30 | 28535 | 28755 |
| 1.6 GB | 40 | 27054 | 27017 |
| 1.6 GB | 50 | 25591 | 25560 |
------------------------------------------------------
| 7.6 GB | 1 | 6224 | 7469 | + 20.0%
| 7.6 GB | 2 | 13611 | 12778 |
| 7.6 GB | 4 | 33108 | 32927 |
| 7.6 GB | 8 | 34712 | 34878 |
| 7.6 GB | 16 | 32895 | 33003 |
| 7.6 GB | 20 | 31689 | 31974 |
| 7.6 GB | 30 | 29003 | 28806 |
| 7.6 GB | 40 | 26683 | 26976 |
| 7.6 GB | 50 | 25925 | 25652 |
------------------------------------------------------
The aim7 workloads improved overall on top of Michel's patchset.
Full graphs showing how the rwsem series plus this patch behaves on a
large 8 socket machine against a vanilla kernel are available at:
http://stgolabs.net/rwsem-aim7-results.tar.gz
Signed-off-by: Davidlohr Bueso <[email protected]>
---
lib/rwsem.c | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index cf0ad2a..19c5fa9 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -223,7 +223,9 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
count = RWSEM_ACTIVE_WRITE_BIAS;
if (!list_is_singular(&sem->wait_list))
count += RWSEM_WAITING_BIAS;
- if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
+
+ if (sem->count == RWSEM_WAITING_BIAS &&
+ cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
RWSEM_WAITING_BIAS)
break;
}
--
1.7.11.7