2013-03-15 10:54:34

by Michel Lespinasse

Subject: [PATCH v2 00/13] rwsem fast-path write lock stealing

These patches extend Alex Shi's work (which added write lock stealing
on the rwsem slow path) in order to provide rwsem write lock stealing
on the fast path (that is, without taking the rwsem's wait_lock).
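
For context (an editorial note, not part of the patches): the rwsem count
packs the number of active lockers into its low bits and a "waiters are
queued" bias into its high bits. The 32-bit x86 values look roughly like
this; 64-bit configurations use wider fields:

#define RWSEM_UNLOCKED_VALUE	0x00000000	/* free */
#define RWSEM_ACTIVE_BIAS	0x00000001	/* one active locker */
#define RWSEM_ACTIVE_MASK	0x0000ffff	/* active-locker count */
#define RWSEM_WAITING_BIAS	(-0x00010000)	/* waiters are queued */
#define RWSEM_ACTIVE_READ_BIAS	RWSEM_ACTIVE_BIAS
#define RWSEM_ACTIVE_WRITE_BIAS	(RWSEM_WAITING_BIAS + RWSEM_ACTIVE_BIAS)

"Stealing" the write lock then means atomically adding
RWSEM_ACTIVE_WRITE_BIAS whenever the active part of the count is zero,
regardless of whether waiters are queued; doing this on the fast path
means doing it without first taking the wait_lock.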

I initially sent a shorter series shortly before v3.9; however, some
patches were doing too much at once, which made them confusing to
review. I have now split the series at a smaller granularity;
hope this helps :)

Patches 1-2 are for cleanups:

- Patch 1 replaces the waiter type bitmask with an enumeration
(as we don't have any other planned uses for the bitmask)

- Patch 2 shortens critical sections in rwsem_down_failed_common() so
they don't cover more than what is absolutely necessary

Patches 3-5 split rwsem_down_failed_common() into separate functions
for the read and write sides:

- Patch 3 simply puts two identical copies of rwsem_down_failed_common()
into rwsem_down_{read,write}_failed (no code changes, in order to
make the review easier)

- Patch 4 does easy simplifications in rwsem_down_read_failed():
- We don't need to wake readers queued before us;
- We don't need to try to steal the lock, and thus we don't need to
acquire the wait_lock after sleeping.

- Patch 5 does easy simplifications in rwsem_down_write_failed():
- We don't need to check for !waiter.task since __rwsem_do_wake()
doesn't remove writers from the wait_list;
- Since the only way to exit the wait loop is by stealing the write lock,
the corresponding exit code can be moved after the loop;
- There is no point releasing the wait_lock before entering the
wait loop, as we will need to reacquire it immediately;
- We don't need to get a reference on the task structure, since the task
is responsible for removing itself from the wait_list;

Patches 6-9 apply additional optimizations to rwsem_down_write_failed():

- Patch 6 tries write lock stealing more aggressively in order to avoid
extra checks;

- Patch 7 uses cmpxchg to implement the write lock stealing, instead of
doing an additive adjustment that might need to be backed out;

- Patch 8 avoids taking the wait_lock if there are already active locks
when we wake up;

- Patch 9 avoids the initial trylock if there were already active locks
when we entered rwsem_down_write_failed()

Patches 10-11 are updates to the __rwsem_do_wake function:

- Patch 10 has simple structure cleanups that don't change the code behavior

- Patch 11 adds generic support for fastpath write lock stealing, by
removing assumptions the function made about rwsem acquisitions being
prevented when the rwsem count value indicates there are queued waiters.

Patch 12 fixes a race condition Peter Hurley noticed in reviewing v1 of
this patch series, which resulted in readers sometimes blocking instead
of executing in parallel with other existing readers.

Patch 13 finally implements rwsem fast path lock stealing for the x86 architecture.

Michel Lespinasse (13):
rwsem: make the waiter type an enumeration rather than a bitmask
rwsem: shorter spinlocked section in rwsem_down_failed_common()
rwsem: move rwsem_down_failed_common code into rwsem_down_{read,write}_failed
rwsem: simplify rwsem_down_read_failed
rwsem: simplify rwsem_down_write_failed
rwsem: more aggressive lock stealing in rwsem_down_write_failed
rwsem: use cmpxchg for trying to steal write lock
rwsem: avoid taking wait_lock in rwsem_down_write_failed
rwsem: skip initial trylock in rwsem_down_write_failed
rwsem: simplify __rwsem_do_wake
rwsem: implement support for write lock stealing on the fastpath
rwsem: do not block readers at head of queue if other readers are active
x86 rwsem: avoid taking slow path when stealing write lock

arch/x86/include/asm/rwsem.h | 28 +++--
lib/rwsem-spinlock.c | 40 +++-----
lib/rwsem.c | 239 +++++++++++++++++++++----------------------
3 files changed, 154 insertions(+), 153 deletions(-)

Changes since v1:

- Patches 1-9 and patch 13 are unchanged from the prior submission.

- The v1 series made a change to the ordering of reader wakeups which proved
more controversial than I had anticipated. I reverted this part of the series.
Patches 10-11 are new and implement (well, in Patch 11) the minimal changes
required for supporting fastpath write lock stealing without modifying the
ordering of reader wakeups (this results in longer code than with the
prior proposal, though).

- Patch 12 is new, fixing a race condition Peter Hurley noticed while
reviewing v1 which could result in reduced parallelism between readers.

--
1.8.1.3


2013-03-15 10:54:40

by Michel Lespinasse

Subject: [PATCH v2 01/13] rwsem: make the waiter type an enumeration rather than a bitmask

We are not planning to add any new waiter flags, so we can convert the
waiter type into an enumeration.

Background: David Howells suggested I do this back when I tried adding
a new waiter type for unfair readers. However, I believe the cleanup
applies regardless of that use case.

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem-spinlock.c | 19 +++++++++++--------
lib/rwsem.c | 23 +++++++++++++----------
2 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/lib/rwsem-spinlock.c b/lib/rwsem-spinlock.c
index 7542afbb22b3..5f117f37ac0a 100644
--- a/lib/rwsem-spinlock.c
+++ b/lib/rwsem-spinlock.c
@@ -9,12 +9,15 @@
#include <linux/sched.h>
#include <linux/export.h>

+enum rwsem_waiter_type {
+ RWSEM_WAITING_FOR_WRITE,
+ RWSEM_WAITING_FOR_READ
+};
+
struct rwsem_waiter {
struct list_head list;
struct task_struct *task;
- unsigned int flags;
-#define RWSEM_WAITING_FOR_READ 0x00000001
-#define RWSEM_WAITING_FOR_WRITE 0x00000002
+ enum rwsem_waiter_type type;
};

int rwsem_is_locked(struct rw_semaphore *sem)
@@ -68,7 +71,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);

if (!wakewrite) {
- if (waiter->flags & RWSEM_WAITING_FOR_WRITE)
+ if (waiter->type == RWSEM_WAITING_FOR_WRITE)
goto out;
goto dont_wake_writers;
}
@@ -78,7 +81,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
* to -1 here to indicate we get the lock. Instead, we wake it up
* to let it go get it again.
*/
- if (waiter->flags & RWSEM_WAITING_FOR_WRITE) {
+ if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
wake_up_process(waiter->task);
goto out;
}
@@ -86,7 +89,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
/* grant an infinite number of read locks to the front of the queue */
dont_wake_writers:
woken = 0;
- while (waiter->flags & RWSEM_WAITING_FOR_READ) {
+ while (waiter->type == RWSEM_WAITING_FOR_READ) {
struct list_head *next = waiter->list.next;

list_del(&waiter->list);
@@ -144,7 +147,7 @@ void __sched __down_read(struct rw_semaphore *sem)

/* set up my own style of waitqueue */
waiter.task = tsk;
- waiter.flags = RWSEM_WAITING_FOR_READ;
+ waiter.type = RWSEM_WAITING_FOR_READ;
get_task_struct(tsk);

list_add_tail(&waiter.list, &sem->wait_list);
@@ -201,7 +204,7 @@ void __sched __down_write_nested(struct rw_semaphore *sem, int subclass)
/* set up my own style of waitqueue */
tsk = current;
waiter.task = tsk;
- waiter.flags = RWSEM_WAITING_FOR_WRITE;
+ waiter.type = RWSEM_WAITING_FOR_WRITE;
list_add_tail(&waiter.list, &sem->wait_list);

/* wait for someone to release the lock */
diff --git a/lib/rwsem.c b/lib/rwsem.c
index ad5e0df16ab4..672eb33218ac 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -30,12 +30,15 @@ void __init_rwsem(struct rw_semaphore *sem, const char *name,

EXPORT_SYMBOL(__init_rwsem);

+enum rwsem_waiter_type {
+ RWSEM_WAITING_FOR_WRITE,
+ RWSEM_WAITING_FOR_READ
+};
+
struct rwsem_waiter {
struct list_head list;
struct task_struct *task;
- unsigned int flags;
-#define RWSEM_WAITING_FOR_READ 0x00000001
-#define RWSEM_WAITING_FOR_WRITE 0x00000002
+ enum rwsem_waiter_type type;
};

/* Wake types for __rwsem_do_wake(). Note that RWSEM_WAKE_NO_ACTIVE and
@@ -65,7 +68,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
signed long woken, loop, adjustment;

waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
- if (!(waiter->flags & RWSEM_WAITING_FOR_WRITE))
+ if (waiter->type != RWSEM_WAITING_FOR_WRITE)
goto readers_only;

if (wake_type == RWSEM_WAKE_READ_OWNED)
@@ -112,10 +115,10 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
waiter = list_entry(waiter->list.next,
struct rwsem_waiter, list);

- } while (waiter->flags & RWSEM_WAITING_FOR_READ);
+ } while (waiter->type != RWSEM_WAITING_FOR_WRITE);

adjustment = woken * RWSEM_ACTIVE_READ_BIAS;
- if (waiter->flags & RWSEM_WAITING_FOR_READ)
+ if (waiter->type != RWSEM_WAITING_FOR_WRITE)
/* hit end of list above */
adjustment -= RWSEM_WAITING_BIAS;

@@ -148,7 +151,7 @@ static int try_get_writer_sem(struct rw_semaphore *sem,

/* only steal when first waiter is writing */
fwaiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
- if (!(fwaiter->flags & RWSEM_WAITING_FOR_WRITE))
+ if (fwaiter->type != RWSEM_WAITING_FOR_WRITE)
return 0;

adjustment = RWSEM_ACTIVE_WRITE_BIAS;
@@ -179,7 +182,7 @@ try_again_write:
*/
static struct rw_semaphore __sched *
rwsem_down_failed_common(struct rw_semaphore *sem,
- unsigned int flags, signed long adjustment)
+ enum rwsem_waiter_type type, signed long adjustment)
{
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
@@ -190,7 +193,7 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
/* set up my own style of waitqueue */
raw_spin_lock_irq(&sem->wait_lock);
waiter.task = tsk;
- waiter.flags = flags;
+ waiter.type = type;
get_task_struct(tsk);

if (list_empty(&sem->wait_list))
@@ -221,7 +224,7 @@ rwsem_down_failed_common(struct rw_semaphore *sem,

raw_spin_lock_irq(&sem->wait_lock);
/* Try to get the writer sem, may steal from the head writer: */
- if (flags == RWSEM_WAITING_FOR_WRITE)
+ if (type == RWSEM_WAITING_FOR_WRITE)
if (try_get_writer_sem(sem, &waiter)) {
raw_spin_unlock_irq(&sem->wait_lock);
return sem;
--
1.8.1.3

2013-03-15 10:54:50

by Michel Lespinasse

Subject: [PATCH v2 09/13] rwsem: skip initial trylock in rwsem_down_write_failed

We can skip the initial trylock in rwsem_down_write_failed() if there are
known active lockers already, thus saving one likely-to-fail cmpxchg.

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem.c | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index edf3d9ca670e..0d50e46d5b0c 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -216,14 +216,15 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
/* wait until we successfully acquire the lock */
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
while (true) {
-
- /* Try acquiring the write lock. */
- count = RWSEM_ACTIVE_WRITE_BIAS;
- if (!list_is_singular(&sem->wait_list))
- count += RWSEM_WAITING_BIAS;
- if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
+ if (!(count & RWSEM_ACTIVE_MASK)) {
+ /* Try acquiring the write lock. */
+ count = RWSEM_ACTIVE_WRITE_BIAS;
+ if (!list_is_singular(&sem->wait_list))
+ count += RWSEM_WAITING_BIAS;
+ if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
RWSEM_WAITING_BIAS)
- break;
+ break;
+ }

raw_spin_unlock_irq(&sem->wait_lock);

@@ -231,7 +232,7 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
do {
schedule();
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- } while (sem->count & RWSEM_ACTIVE_MASK);
+ } while ((count = sem->count) & RWSEM_ACTIVE_MASK);

raw_spin_lock_irq(&sem->wait_lock);
}
--
1.8.1.3

2013-03-15 10:54:43

by Michel Lespinasse

Subject: [PATCH v2 05/13] rwsem: simplify rwsem_down_write_failed

When waking writers, we never grant them the lock - instead, they have
to acquire it themselves when they run, and remove themselves from the
wait_list when they succeed.

As a result, we can do a few simplifications in rwsem_down_write_failed():

- We don't need to check for !waiter.task since __rwsem_do_wake() doesn't
remove writers from the wait_list

- There is no point releasing the wait_lock before entering the wait loop,
as we will need to reacquire it immediately. We can change the loop so
that the lock is always held at the start of each loop iteration.

- We don't need to get a reference on the task structure, since the task
is responsible for removing itself from the wait_list. There is no risk,
like in the rwsem_down_read_failed() case, that a task would wake up and
exit (thus destroying its task structure) while __rwsem_do_wake() is
still running - wait_lock protects against that.

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem.c | 33 +++++++++------------------------
1 file changed, 9 insertions(+), 24 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index 66f307e90761..c73bd96dc30c 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -161,16 +161,8 @@ static int try_get_writer_sem(struct rw_semaphore *sem,

try_again_write:
oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
- if (!(oldcount & RWSEM_ACTIVE_MASK)) {
- /* No active lock: */
- struct task_struct *tsk = waiter->task;
-
- list_del(&waiter->list);
- smp_mb();
- put_task_struct(tsk);
- tsk->state = TASK_RUNNING;
+ if (!(oldcount & RWSEM_ACTIVE_MASK))
return 1;
- }
/* some one grabbed the sem already */
if (rwsem_atomic_update(-adjustment, sem) & RWSEM_ACTIVE_MASK)
return 0;
@@ -220,11 +212,10 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
}

/*
- * wait for the write lock to be granted
+ * wait until we successfully acquire the write lock
*/
struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
{
- enum rwsem_waiter_type type = RWSEM_WAITING_FOR_WRITE;
signed long adjustment = -RWSEM_ACTIVE_WRITE_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
@@ -232,8 +223,7 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)

/* set up my own style of waitqueue */
waiter.task = tsk;
- waiter.type = type;
- get_task_struct(tsk);
+ waiter.type = RWSEM_WAITING_FOR_WRITE;

raw_spin_lock_irq(&sem->wait_lock);
if (list_empty(&sem->wait_list))
@@ -255,25 +245,20 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);

- raw_spin_unlock_irq(&sem->wait_lock);
-
- /* wait to be given the lock */
+ /* wait until we successfully acquire the lock */
while (true) {
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- if (!waiter.task)
+
+ if (try_get_writer_sem(sem, &waiter))
break;

- raw_spin_lock_irq(&sem->wait_lock);
- /* Try to get the writer sem, may steal from the head writer: */
- if (type == RWSEM_WAITING_FOR_WRITE)
- if (try_get_writer_sem(sem, &waiter)) {
- raw_spin_unlock_irq(&sem->wait_lock);
- return sem;
- }
raw_spin_unlock_irq(&sem->wait_lock);
schedule();
+ raw_spin_lock_irq(&sem->wait_lock);
}

+ list_del(&waiter.list);
+ raw_spin_unlock_irq(&sem->wait_lock);
tsk->state = TASK_RUNNING;

return sem;
--
1.8.1.3

2013-03-15 10:54:55

by Michel Lespinasse

Subject: [PATCH v2 13/13] x86 rwsem: avoid taking slow path when stealing write lock

Modify __down_write[_nested] and __down_write_trylock to grab the write
lock whenever the active count is 0, even if there are queued waiters
(they must be writers pending wakeup, since the active count is 0).

Note that this is an optimization only; architectures without this
optimization will still work fine:

- __down_write() would take the slow path which would take the wait_lock
and then try stealing the lock (as in the spinlocked rwsem implementation)

- __down_write_trylock() would fail, but callers must be ready to deal
with that - since there are some writers pending wakeup, they could
have raced with us and obtained the lock before we steal it.
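
For readers less familiar with the inline asm in the diff below, here is
a rough C-level sketch of the new trylock behavior (editorial
illustration only; the helper name is invented, and the constants come
from the existing rwsem code):

static inline int __down_write_trylock_sketch(struct rw_semaphore *sem)
{
	long count = sem->count;

	/* Grab the write lock whenever the active count is zero, even
	 * if RWSEM_WAITING_BIAS indicates queued waiters. */
	while (!(count & RWSEM_ACTIVE_MASK)) {
		long old = cmpxchg(&sem->count, count,
				   count + RWSEM_ACTIVE_WRITE_BIAS);
		if (old == count)
			return 1;	/* stole the write lock */
		count = old;		/* raced; re-check the new count */
	}
	return 0;			/* active lockers present */
}

__down_write() does the same kind of thing with a single xadd and falls
back to the slow path when the active count was nonzero.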

Signed-off-by: Michel Lespinasse <[email protected]>

---
arch/x86/include/asm/rwsem.h | 28 +++++++++++++++++++++-------
1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/arch/x86/include/asm/rwsem.h b/arch/x86/include/asm/rwsem.h
index 2dbe4a721ce5..cad82c9c2fde 100644
--- a/arch/x86/include/asm/rwsem.h
+++ b/arch/x86/include/asm/rwsem.h
@@ -105,8 +105,8 @@ static inline void __down_write_nested(struct rw_semaphore *sem, int subclass)
asm volatile("# beginning down_write\n\t"
LOCK_PREFIX " xadd %1,(%2)\n\t"
/* adds 0xffff0001, returns the old value */
- " test %1,%1\n\t"
- /* was the count 0 before? */
+ " test " __ASM_SEL(%w1,%k1) "," __ASM_SEL(%w1,%k1) "\n\t"
+ /* was the active mask 0 before? */
" jz 1f\n"
" call call_rwsem_down_write_failed\n"
"1:\n"
@@ -126,11 +126,25 @@ static inline void __down_write(struct rw_semaphore *sem)
*/
static inline int __down_write_trylock(struct rw_semaphore *sem)
{
- long ret = cmpxchg(&sem->count, RWSEM_UNLOCKED_VALUE,
- RWSEM_ACTIVE_WRITE_BIAS);
- if (ret == RWSEM_UNLOCKED_VALUE)
- return 1;
- return 0;
+ long result, tmp;
+ asm volatile("# beginning __down_write_trylock\n\t"
+ " mov %0,%1\n\t"
+ "1:\n\t"
+ " test " __ASM_SEL(%w1,%k1) "," __ASM_SEL(%w1,%k1) "\n\t"
+ /* was the active mask 0 before? */
+ " jnz 2f\n\t"
+ " mov %1,%2\n\t"
+ " add %3,%2\n\t"
+ LOCK_PREFIX " cmpxchg %2,%0\n\t"
+ " jnz 1b\n\t"
+ "2:\n\t"
+ " sete %b1\n\t"
+ " movzbl %b1, %k1\n\t"
+ "# ending __down_write_trylock\n\t"
+ : "+m" (sem->count), "=&a" (result), "=&r" (tmp)
+ : "er" (RWSEM_ACTIVE_WRITE_BIAS)
+ : "memory", "cc");
+ return result;
}

/*
--
1.8.1.3

2013-03-15 10:54:53

by Michel Lespinasse

Subject: [PATCH v2 12/13] rwsem: do not block readers at head of queue if other readers are active

This change fixes a race condition where a reader might determine it
needs to block, but by the time it acquires the wait_lock the rwsem
has active readers and no queued waiters.

In this situation the reader can simply run in parallel with the existing
active readers; it does not need to block until the active readers complete.

Thanks to Peter Hurley for noticing this possible race.
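
To make the new condition in the diff below a little more explicit, here
is a hedged sketch (the helper is hypothetical; the variable names follow
rwsem_down_read_failed()):

static inline int reader_should_wake(long count, long adjustment)
{
	/* No active lockers at all: wake whatever is at the front. */
	if (count == RWSEM_WAITING_BIAS)
		return 1;
	/* There are active lockers.  If we were the first waiter to
	 * queue (our adjustment then included RWSEM_WAITING_BIAS),
	 * those lockers must be readers (an active writer would have
	 * pushed the count below RWSEM_WAITING_BIAS), so wake our own
	 * waiter to run in parallel with them instead of blocking. */
	return count > RWSEM_WAITING_BIAS &&
	       adjustment != -RWSEM_ACTIVE_READ_BIAS;
}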

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem.c | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index 09bf03e7808c..4e4c8893dc00 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -162,8 +162,14 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
/* we're now waiting on the lock, but no longer actively locking */
count = rwsem_atomic_update(adjustment, sem);

- /* If there are no active locks, wake the front queued process(es). */
- if (!(count & RWSEM_ACTIVE_MASK))
+ /* If there are no active locks, wake the front queued process(es).
+ *
+ * If there are no writers and we are first in the queue,
+ * wake our own waiter to join the existing active readers !
+ */
+ if (count == RWSEM_WAITING_BIAS ||
+ (count > RWSEM_WAITING_BIAS &&
+ adjustment != -RWSEM_ACTIVE_READ_BIAS))
sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY);

raw_spin_unlock_irq(&sem->wait_lock);
--
1.8.1.3

2013-03-15 10:55:52

by Michel Lespinasse

Subject: [PATCH v2 11/13] rwsem: implement support for write lock stealing on the fastpath

When we decide to wake up readers, we must first grant them as many
read locks as necessary, and then actually wake up all these readers.
But in order to know how many read shares to grant, we must first
count the readers at the head of the queue. This might take a while
if there are many readers, and we want to be protected against a
writer stealing the lock while we're counting. To that end, we grant
the first reader lock before counting how many more readers are queued.

We also require some adjustments to the wake_type semantics.

RWSEM_WAKE_NO_ACTIVE used to mean that we had found the count to
be RWSEM_WAITING_BIAS, in which case the rwsem was known to be free
as nobody could steal it while we hold the wait_lock. This doesn't
make sense once we implement fastpath write lock stealing, so we now
use RWSEM_WAKE_ANY in that case.

Similarly, when rwsem_down_write_failed found that a read lock was active,
it would use RWSEM_WAKE_READ_OWNED which signalled that new readers could
be woken without checking first that the rwsem was available. We can't do
that anymore since the existing readers might release their read locks,
and a writer could steal the lock before we wake up additional readers.
So, we have to use a new RWSEM_WAKE_READERS value to indicate we only want
to wake readers, but we don't currently hold any read lock.
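
A condensed sketch of the grant-then-count step described above
(editorial; the helper name is invented, and the actual code is
open-coded in the diff below):

/* Grant one read lock before counting the queued readers; back out if
 * a writer steals the lock in the meantime.  Returns 1 if the grant
 * stands, 0 if the wakeup must be abandoned. */
static int try_first_reader_grant(struct rw_semaphore *sem)
{
	long adj = RWSEM_ACTIVE_READ_BIAS;

	for (;;) {
		long oldcount = rwsem_atomic_update(adj, sem) - adj;

		if (likely(oldcount >= RWSEM_WAITING_BIAS))
			return 1;	/* grant stands; count readers now */
		/* A writer stole the lock.  Undo our reader grant. */
		if (rwsem_atomic_update(-adj, sem) & RWSEM_ACTIVE_MASK)
			return 0;	/* writer still active: bail out */
		/* Last active locker just left; retry the grant. */
	}
}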

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem.c | 63 ++++++++++++++++++++++++++++++-------------------------------
1 file changed, 31 insertions(+), 32 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index 9a675fa9d78e..09bf03e7808c 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -41,13 +41,11 @@ struct rwsem_waiter {
enum rwsem_waiter_type type;
};

-/* Wake types for __rwsem_do_wake(). Note that RWSEM_WAKE_NO_ACTIVE and
- * RWSEM_WAKE_READ_OWNED imply that the spinlock must have been kept held
- * since the rwsem value was observed.
- */
-#define RWSEM_WAKE_ANY 0 /* Wake whatever's at head of wait list */
-#define RWSEM_WAKE_NO_ACTIVE 1 /* rwsem was observed with no active thread */
-#define RWSEM_WAKE_READ_OWNED 2 /* rwsem was observed to be read owned */
+enum rwsem_wake_type {
+ RWSEM_WAKE_ANY, /* Wake whatever's at head of wait list */
+ RWSEM_WAKE_READERS, /* Wake readers only */
+ RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */
+};

/*
* handle the lock release when processes blocked on it that can now run
@@ -60,16 +58,16 @@ struct rwsem_waiter {
* - writers are only woken if downgrading is false
*/
static struct rw_semaphore *
-__rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
+__rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
{
struct rwsem_waiter *waiter;
struct task_struct *tsk;
struct list_head *next;
- signed long woken, loop, adjustment;
+ signed long oldcount, woken, loop, adjustment;

waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
- if (wake_type != RWSEM_WAKE_READ_OWNED)
+ if (wake_type == RWSEM_WAKE_ANY)
/* Wake writer at the front of the queue, but do not
* grant it the lock yet as we want other writers
* to be able to steal it. Readers, on the other hand,
@@ -79,24 +77,24 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
goto out;
}

- /* If we come here from up_xxxx(), another thread might have reached
- * rwsem_down_failed_common() before we acquired the spinlock and
- * woken up a waiter, making it now active. We prefer to check for
- * this first in order to not spend too much time with the spinlock
- * held if we're not going to be able to wake up readers in the end.
- *
- * Note that we do not need to update the rwsem count: any writer
- * trying to acquire rwsem will run rwsem_down_write_failed() due
- * to the waiting threads and block trying to acquire the spinlock.
- *
- * We use a dummy atomic update in order to acquire the cache line
- * exclusively since we expect to succeed and run the final rwsem
- * count adjustment pretty soon.
+ /* Writers might steal the lock before we grant it to the next reader.
+ * We prefer to do the first reader grant before counting readers
+ * so we can bail out early if a writer stole the lock.
*/
- if (wake_type == RWSEM_WAKE_ANY &&
- rwsem_atomic_update(0, sem) < RWSEM_WAITING_BIAS)
- /* Someone grabbed the sem for write already */
- goto out;
+ adjustment = 0;
+ if (wake_type != RWSEM_WAKE_READ_OWNED) {
+ adjustment = RWSEM_ACTIVE_READ_BIAS;
+ try_reader_grant:
+ oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
+ if (unlikely(oldcount < RWSEM_WAITING_BIAS)) {
+ /* A writer stole the lock. Undo our reader grant. */
+ if (rwsem_atomic_update(-adjustment, sem) &
+ RWSEM_ACTIVE_MASK)
+ goto out;
+ /* Last active locker left. Retry waking readers. */
+ goto try_reader_grant;
+ }
+ }

/* Grant an infinite number of read locks to the readers at the front
* of the queue. Note we increment the 'active part' of the count by
@@ -114,12 +112,13 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)

} while (waiter->type != RWSEM_WAITING_FOR_WRITE);

- adjustment = woken * RWSEM_ACTIVE_READ_BIAS;
+ adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment;
if (waiter->type != RWSEM_WAITING_FOR_WRITE)
/* hit end of list above */
adjustment -= RWSEM_WAITING_BIAS;

- rwsem_atomic_add(adjustment, sem);
+ if (adjustment)
+ rwsem_atomic_add(adjustment, sem);

next = sem->wait_list.next;
loop = woken;
@@ -164,8 +163,8 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
count = rwsem_atomic_update(adjustment, sem);

/* If there are no active locks, wake the front queued process(es). */
- if (count == RWSEM_WAITING_BIAS)
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
+ if (!(count & RWSEM_ACTIVE_MASK))
+ sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY);

raw_spin_unlock_irq(&sem->wait_lock);

@@ -209,7 +208,7 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
* any read locks that were queued ahead of us. */
if (count > RWSEM_WAITING_BIAS &&
adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
+ sem = __rwsem_do_wake(sem, RWSEM_WAKE_READERS);

/* wait until we successfully acquire the lock */
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
--
1.8.1.3

2013-03-15 10:56:15

by Michel Lespinasse

Subject: [PATCH v2 10/13] rwsem: simplify __rwsem_do_wake

This is mostly for cleanup value:

- We don't need several gotos to handle the case where the first
waiter is a writer. Two simple tests will do (and generate very
similar code).

- In the remainder of the function, we know the first waiter is a reader,
so we don't have to double check that. We can use do..while loops
to iterate over the readers to wake (generates slightly better code).

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem-spinlock.c | 25 ++++++++-----------------
lib/rwsem.c | 26 ++++++++++++--------------
2 files changed, 20 insertions(+), 31 deletions(-)

diff --git a/lib/rwsem-spinlock.c b/lib/rwsem-spinlock.c
index 5f117f37ac0a..20207a6e8ac3 100644
--- a/lib/rwsem-spinlock.c
+++ b/lib/rwsem-spinlock.c
@@ -70,26 +70,17 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)

waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);

- if (!wakewrite) {
- if (waiter->type == RWSEM_WAITING_FOR_WRITE)
- goto out;
- goto dont_wake_writers;
- }
-
- /*
- * as we support write lock stealing, we can't set sem->activity
- * to -1 here to indicate we get the lock. Instead, we wake it up
- * to let it go get it again.
- */
if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
- wake_up_process(waiter->task);
+ if (wakewrite)
+ /* Wake up a writer. Note that we do not grant it the
+ * lock - it will have to acquire it when it runs. */
+ wake_up_process(waiter->task);
goto out;
}

- /* grant an infinite number of read locks to the front of the queue */
- dont_wake_writers:
+ /* grant read locks to all queued readers. */
woken = 0;
- while (waiter->type == RWSEM_WAITING_FOR_READ) {
+ do {
struct list_head *next = waiter->list.next;

list_del(&waiter->list);
@@ -99,10 +90,10 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
wake_up_process(tsk);
put_task_struct(tsk);
woken++;
- if (list_empty(&sem->wait_list))
+ if (next == &sem->wait_list)
break;
waiter = list_entry(next, struct rwsem_waiter, list);
- }
+ } while (waiter->type != RWSEM_WAITING_FOR_WRITE);

sem->activity += woken;

diff --git a/lib/rwsem.c b/lib/rwsem.c
index 0d50e46d5b0c..9a675fa9d78e 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -68,20 +68,17 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
signed long woken, loop, adjustment;

waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
- if (waiter->type != RWSEM_WAITING_FOR_WRITE)
- goto readers_only;
-
- if (wake_type == RWSEM_WAKE_READ_OWNED)
- /* Another active reader was observed, so wakeup is not
- * likely to succeed. Save the atomic op.
- */
+ if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
+ if (wake_type != RWSEM_WAKE_READ_OWNED)
+ /* Wake writer at the front of the queue, but do not
+ * grant it the lock yet as we want other writers
+ * to be able to steal it. Readers, on the other hand,
+ * will block as they will notice the queued writer.
+ */
+ wake_up_process(waiter->task);
goto out;
+ }

- /* Wake up the writing waiter and let the task grab the sem: */
- wake_up_process(waiter->task);
- goto out;
-
- readers_only:
/* If we come here from up_xxxx(), another thread might have reached
* rwsem_down_failed_common() before we acquired the spinlock and
* woken up a waiter, making it now active. We prefer to check for
@@ -125,7 +122,8 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
rwsem_atomic_add(adjustment, sem);

next = sem->wait_list.next;
- for (loop = woken; loop > 0; loop--) {
+ loop = woken;
+ do {
waiter = list_entry(next, struct rwsem_waiter, list);
next = waiter->list.next;
tsk = waiter->task;
@@ -133,7 +131,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
waiter->task = NULL;
wake_up_process(tsk);
put_task_struct(tsk);
- }
+ } while (--loop);

sem->wait_list.next = next;
next->prev = &sem->wait_list;
--
1.8.1.3

2013-03-15 10:56:32

by Michel Lespinasse

Subject: [PATCH v2 06/13] rwsem: more aggressive lock stealing in rwsem_down_write_failed

Some small code simplifications can be achieved by doing more aggressive
lock stealing:

- When rwsem_down_write_failed() notices that there are no active locks
(and thus no thread to wake us if we decided to sleep), it used to wake
the first queued process. However, stealing the lock is also sufficient
to deal with this case, so we don't need this check anymore.

- In try_get_writer_sem(), we can steal the lock even when the first waiter
is a reader. This is correct because the code path that wakes readers is
protected by the wait_lock. As to the performance effects of this change,
they are expected to be minimal: readers are still granted the lock
(rather than having to acquire it themselves) when they reach the front
of the wait queue, so we have essentially the same behavior as in
rwsem-spinlock.

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem.c | 29 ++++++++---------------------
1 file changed, 8 insertions(+), 21 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index c73bd96dc30c..2360bf204098 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -143,20 +143,12 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
}

/* Try to get write sem, caller holds sem->wait_lock: */
-static int try_get_writer_sem(struct rw_semaphore *sem,
- struct rwsem_waiter *waiter)
+static int try_get_writer_sem(struct rw_semaphore *sem)
{
- struct rwsem_waiter *fwaiter;
long oldcount, adjustment;

- /* only steal when first waiter is writing */
- fwaiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
- if (fwaiter->type != RWSEM_WAITING_FOR_WRITE)
- return 0;
-
adjustment = RWSEM_ACTIVE_WRITE_BIAS;
- /* Only one waiter in the queue: */
- if (fwaiter == waiter && waiter->list.next == &sem->wait_list)
+ if (list_is_singular(&sem->wait_list))
adjustment -= RWSEM_WAITING_BIAS;

try_again_write:
@@ -233,23 +225,18 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
/* we're now waiting on the lock, but no longer actively locking */
count = rwsem_atomic_update(adjustment, sem);

- /* If there are no active locks, wake the front queued process(es) up.
- *
- * Alternatively, if we're called from a failed down_write(), there
- * were already threads queued before us and there are no active
- * writers, the lock must be read owned; so we try to wake any read
- * locks that were queued ahead of us. */
- if (count == RWSEM_WAITING_BIAS)
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
- else if (count > RWSEM_WAITING_BIAS &&
- adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
+ /* If there were already threads queued before us and there are no
+ * active writers, the lock must be read owned; so we try to wake
+ * any read locks that were queued ahead of us. */
+ if (count > RWSEM_WAITING_BIAS &&
+ adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);

/* wait until we successfully acquire the lock */
while (true) {
set_task_state(tsk, TASK_UNINTERRUPTIBLE);

- if (try_get_writer_sem(sem, &waiter))
+ if (try_get_writer_sem(sem))
break;

raw_spin_unlock_irq(&sem->wait_lock);
--
1.8.1.3

2013-03-15 10:56:50

by Michel Lespinasse

Subject: [PATCH v2 04/13] rwsem: simplify rwsem_down_read_failed

When trying to acquire a read lock, the RWSEM_ACTIVE_READ_BIAS adjustment
doesn't cause other readers to block, so we never have to worry about waking
them back after canceling this adjustment in rwsem_down_read_failed().

We also never want to steal the lock in rwsem_down_read_failed(), so we
don't have to grab the wait_lock either.

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem.c | 22 ++--------------------
1 file changed, 2 insertions(+), 20 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index fb658af1c12c..66f307e90761 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -182,7 +182,6 @@ try_again_write:
*/
struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
{
- enum rwsem_waiter_type type = RWSEM_WAITING_FOR_READ;
signed long adjustment = -RWSEM_ACTIVE_READ_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
@@ -190,7 +189,7 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)

/* set up my own style of waitqueue */
waiter.task = tsk;
- waiter.type = type;
+ waiter.type = RWSEM_WAITING_FOR_READ;
get_task_struct(tsk);

raw_spin_lock_irq(&sem->wait_lock);
@@ -201,17 +200,9 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
/* we're now waiting on the lock, but no longer actively locking */
count = rwsem_atomic_update(adjustment, sem);

- /* If there are no active locks, wake the front queued process(es) up.
- *
- * Alternatively, if we're called from a failed down_write(), there
- * were already threads queued before us and there are no active
- * writers, the lock must be read owned; so we try to wake any read
- * locks that were queued ahead of us. */
+ /* If there are no active locks, wake the front queued process(es). */
if (count == RWSEM_WAITING_BIAS)
sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
- else if (count > RWSEM_WAITING_BIAS &&
- adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);

raw_spin_unlock_irq(&sem->wait_lock);

@@ -220,15 +211,6 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
if (!waiter.task)
break;
-
- raw_spin_lock_irq(&sem->wait_lock);
- /* Try to get the writer sem, may steal from the head writer: */
- if (type == RWSEM_WAITING_FOR_WRITE)
- if (try_get_writer_sem(sem, &waiter)) {
- raw_spin_unlock_irq(&sem->wait_lock);
- return sem;
- }
- raw_spin_unlock_irq(&sem->wait_lock);
schedule();
}

--
1.8.1.3

2013-03-15 10:57:11

by Michel Lespinasse

Subject: [PATCH v2 03/13] rwsem: move rwsem_down_failed_common code into rwsem_down_{read,write}_failed

Remove the rwsem_down_failed_common function and replace it with two
identical copies of its code in rwsem_down_{read,write}_failed.

This is because we want to make different optimizations in
rwsem_down_{read,write}_failed; we are adding this pure-duplication
step as a separate commit in order to make it easier to check the
following steps.

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem.c | 72 ++++++++++++++++++++++++++++++++++++++++++++++++-------------
1 file changed, 57 insertions(+), 15 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index 40636454cf3c..fb658af1c12c 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -178,12 +178,12 @@ try_again_write:
}

/*
- * wait for a lock to be granted
+ * wait for the read lock to be granted
*/
-static struct rw_semaphore __sched *
-rwsem_down_failed_common(struct rw_semaphore *sem,
- enum rwsem_waiter_type type, signed long adjustment)
+struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
{
+ enum rwsem_waiter_type type = RWSEM_WAITING_FOR_READ;
+ signed long adjustment = -RWSEM_ACTIVE_READ_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
signed long count;
@@ -238,21 +238,63 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
}

/*
- * wait for the read lock to be granted
- */
-struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
-{
- return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_READ,
- -RWSEM_ACTIVE_READ_BIAS);
-}
-
-/*
* wait for the write lock to be granted
*/
struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
{
- return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_WRITE,
- -RWSEM_ACTIVE_WRITE_BIAS);
+ enum rwsem_waiter_type type = RWSEM_WAITING_FOR_WRITE;
+ signed long adjustment = -RWSEM_ACTIVE_WRITE_BIAS;
+ struct rwsem_waiter waiter;
+ struct task_struct *tsk = current;
+ signed long count;
+
+ /* set up my own style of waitqueue */
+ waiter.task = tsk;
+ waiter.type = type;
+ get_task_struct(tsk);
+
+ raw_spin_lock_irq(&sem->wait_lock);
+ if (list_empty(&sem->wait_list))
+ adjustment += RWSEM_WAITING_BIAS;
+ list_add_tail(&waiter.list, &sem->wait_list);
+
+ /* we're now waiting on the lock, but no longer actively locking */
+ count = rwsem_atomic_update(adjustment, sem);
+
+ /* If there are no active locks, wake the front queued process(es) up.
+ *
+ * Alternatively, if we're called from a failed down_write(), there
+ * were already threads queued before us and there are no active
+ * writers, the lock must be read owned; so we try to wake any read
+ * locks that were queued ahead of us. */
+ if (count == RWSEM_WAITING_BIAS)
+ sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
+ else if (count > RWSEM_WAITING_BIAS &&
+ adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
+ sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
+
+ raw_spin_unlock_irq(&sem->wait_lock);
+
+ /* wait to be given the lock */
+ while (true) {
+ set_task_state(tsk, TASK_UNINTERRUPTIBLE);
+ if (!waiter.task)
+ break;
+
+ raw_spin_lock_irq(&sem->wait_lock);
+ /* Try to get the writer sem, may steal from the head writer: */
+ if (type == RWSEM_WAITING_FOR_WRITE)
+ if (try_get_writer_sem(sem, &waiter)) {
+ raw_spin_unlock_irq(&sem->wait_lock);
+ return sem;
+ }
+ raw_spin_unlock_irq(&sem->wait_lock);
+ schedule();
+ }
+
+ tsk->state = TASK_RUNNING;
+
+ return sem;
}

/*
--
1.8.1.3

2013-03-15 10:57:43

by Michel Lespinasse

Subject: [PATCH v2 02/13] rwsem: shorter spinlocked section in rwsem_down_failed_common()

This change reduces the size of the spinlocked and TASK_UNINTERRUPTIBLE
sections in rwsem_down_failed_common():

- We only need the sem->wait_lock to insert ourselves on the wait_list;
the waiter node can be prepared outside of the wait_lock.

- The task state only needs to be set to TASK_UNINTERRUPTIBLE immediately
before checking if we actually need to sleep; it doesn't need to protect
the entire function.

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem.c | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index 672eb33218ac..40636454cf3c 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -188,14 +188,12 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
struct task_struct *tsk = current;
signed long count;

- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
-
/* set up my own style of waitqueue */
- raw_spin_lock_irq(&sem->wait_lock);
waiter.task = tsk;
waiter.type = type;
get_task_struct(tsk);

+ raw_spin_lock_irq(&sem->wait_lock);
if (list_empty(&sem->wait_list))
adjustment += RWSEM_WAITING_BIAS;
list_add_tail(&waiter.list, &sem->wait_list);
@@ -218,7 +216,8 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
raw_spin_unlock_irq(&sem->wait_lock);

/* wait to be given the lock */
- for (;;) {
+ while (true) {
+ set_task_state(tsk, TASK_UNINTERRUPTIBLE);
if (!waiter.task)
break;

@@ -231,7 +230,6 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
}
raw_spin_unlock_irq(&sem->wait_lock);
schedule();
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
}

tsk->state = TASK_RUNNING;
--
1.8.1.3

2013-03-15 11:02:01

by Michel Lespinasse

Subject: [PATCH v2 08/13] rwsem: avoid taking wait_lock in rwsem_down_write_failed

In rwsem_down_write_failed(), if there are active locks after we wake up
(i.e. the lock got stolen from us), skip taking the wait_lock and go back
to sleep immediately.

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem.c | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index 64c2dc007be2..edf3d9ca670e 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -214,8 +214,8 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);

/* wait until we successfully acquire the lock */
+ set_task_state(tsk, TASK_UNINTERRUPTIBLE);
while (true) {
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);

/* Try acquiring the write lock. */
count = RWSEM_ACTIVE_WRITE_BIAS;
@@ -226,7 +226,13 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
break;

raw_spin_unlock_irq(&sem->wait_lock);
- schedule();
+
+ /* Block until there are no active lockers. */
+ do {
+ schedule();
+ set_task_state(tsk, TASK_UNINTERRUPTIBLE);
+ } while (sem->count & RWSEM_ACTIVE_MASK);
+
raw_spin_lock_irq(&sem->wait_lock);
}

--
1.8.1.3

2013-03-15 11:02:18

by Michel Lespinasse

Subject: [PATCH v2 07/13] rwsem: use cmpxchg for trying to steal write lock

Using rwsem_atomic_update to try stealing the write lock forced us to
undo the adjustment in the failure path. We can have simpler and faster
code by using cmpxchg instead.

Signed-off-by: Michel Lespinasse <[email protected]>

---
lib/rwsem.c | 26 ++++++--------------------
1 file changed, 6 insertions(+), 20 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index 2360bf204098..64c2dc007be2 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -142,25 +142,6 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
return sem;
}

-/* Try to get write sem, caller holds sem->wait_lock: */
-static int try_get_writer_sem(struct rw_semaphore *sem)
-{
- long oldcount, adjustment;
-
- adjustment = RWSEM_ACTIVE_WRITE_BIAS;
- if (list_is_singular(&sem->wait_list))
- adjustment -= RWSEM_WAITING_BIAS;
-
-try_again_write:
- oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
- if (!(oldcount & RWSEM_ACTIVE_MASK))
- return 1;
- /* some one grabbed the sem already */
- if (rwsem_atomic_update(-adjustment, sem) & RWSEM_ACTIVE_MASK)
- return 0;
- goto try_again_write;
-}
-
/*
* wait for the read lock to be granted
*/
@@ -236,7 +217,12 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
while (true) {
set_task_state(tsk, TASK_UNINTERRUPTIBLE);

- if (try_get_writer_sem(sem))
+ /* Try acquiring the write lock. */
+ count = RWSEM_ACTIVE_WRITE_BIAS;
+ if (!list_is_singular(&sem->wait_list))
+ count += RWSEM_WAITING_BIAS;
+ if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
+ RWSEM_WAITING_BIAS)
break;

raw_spin_unlock_irq(&sem->wait_lock);
--
1.8.1.3

2013-03-28 16:17:45

by Peter Hurley

Subject: Re: [PATCH v2 11/13] rwsem: implement support for write lock stealing on the fastpath


On Fri, 2013-03-15 at 03:54 -0700, Michel Lespinasse wrote:
> When we decide to wake up readers, we must first grant them as many
> read locks as necessary, and then actually wake up all these readers.
> But in order to know how many read shares to grant, we must first
> count the readers at the head of the queue. This might take a while
> if there are many readers, and we want to be protected against a
> writer stealing the lock while we're counting. To that end, we grant
> the first reader lock before counting how many more readers are queued.
>
> We also require some adjustments to the wake_type semantics.
>
> RWSEM_WAKE_NO_ACTIVE used to mean that we had found the count to
> be RWSEM_WAITING_BIAS, in which case the rwsem was known to be free
> as nobody could steal it while we hold the wait_lock. This doesn't
> make sense once we implement fastpath write lock stealing, so we now
> use RWSEM_WAKE_ANY in that case.
>
> Similarly, when rwsem_down_write_failed found that a read lock was active,
> it would use RWSEM_WAKE_READ_OWNED which signalled that new readers could
> be woken without checking first that the rwsem was available. We can't do
> that anymore since the existing readers might release their read locks,
> and a writer could steal the lock before we wake up additional readers.
> So, we have to use a new RWSEM_WAKE_READERS value to indicate we only want
> to wake readers, but we don't currently hold any read lock.
>
> Signed-off-by: Michel Lespinasse <[email protected]>
>
> ---
> lib/rwsem.c | 63 ++++++++++++++++++++++++++++++-------------------------------
> 1 file changed, 31 insertions(+), 32 deletions(-)
>
> diff --git a/lib/rwsem.c b/lib/rwsem.c
> index 9a675fa9d78e..09bf03e7808c 100644
> --- a/lib/rwsem.c
> +++ b/lib/rwsem.c
> @@ -41,13 +41,11 @@ struct rwsem_waiter {
> enum rwsem_waiter_type type;
> };
>
> -/* Wake types for __rwsem_do_wake(). Note that RWSEM_WAKE_NO_ACTIVE and
> - * RWSEM_WAKE_READ_OWNED imply that the spinlock must have been kept held
> - * since the rwsem value was observed.
> - */
> -#define RWSEM_WAKE_ANY 0 /* Wake whatever's at head of wait list */
> -#define RWSEM_WAKE_NO_ACTIVE 1 /* rwsem was observed with no active thread */
> -#define RWSEM_WAKE_READ_OWNED 2 /* rwsem was observed to be read owned */
> +enum rwsem_wake_type {
> + RWSEM_WAKE_ANY, /* Wake whatever's at head of wait list */
> + RWSEM_WAKE_READERS, /* Wake readers only */
> + RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */
> +};
>
> /*
> * handle the lock release when processes blocked on it that can now run
> @@ -60,16 +58,16 @@ struct rwsem_waiter {
> * - writers are only woken if downgrading is false
> */
> static struct rw_semaphore *
> -__rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
> +__rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
> {
> struct rwsem_waiter *waiter;
> struct task_struct *tsk;
> struct list_head *next;
> - signed long woken, loop, adjustment;
> + signed long oldcount, woken, loop, adjustment;
>
> waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
> if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
> - if (wake_type != RWSEM_WAKE_READ_OWNED)
> + if (wake_type == RWSEM_WAKE_ANY)
> /* Wake writer at the front of the queue, but do not
> * grant it the lock yet as we want other writers
> * to be able to steal it. Readers, on the other hand,
> @@ -79,24 +77,24 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
> goto out;
> }
>
> - /* If we come here from up_xxxx(), another thread might have reached
> - * rwsem_down_failed_common() before we acquired the spinlock and
> - * woken up a waiter, making it now active. We prefer to check for
> - * this first in order to not spend too much time with the spinlock
> - * held if we're not going to be able to wake up readers in the end.
> - *
> - * Note that we do not need to update the rwsem count: any writer
> - * trying to acquire rwsem will run rwsem_down_write_failed() due
> - * to the waiting threads and block trying to acquire the spinlock.
> - *
> - * We use a dummy atomic update in order to acquire the cache line
> - * exclusively since we expect to succeed and run the final rwsem
> - * count adjustment pretty soon.
> + /* Writers might steal the lock before we grant it to the next reader.
> + * We prefer to do the first reader grant before counting readers
> + * so we can bail out early if a writer stole the lock.
> */
> - if (wake_type == RWSEM_WAKE_ANY &&
> - rwsem_atomic_update(0, sem) < RWSEM_WAITING_BIAS)
> - /* Someone grabbed the sem for write already */
> - goto out;
> + adjustment = 0;
> + if (wake_type != RWSEM_WAKE_READ_OWNED) {
> + adjustment = RWSEM_ACTIVE_READ_BIAS;
> + try_reader_grant:
> + oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
> + if (unlikely(oldcount < RWSEM_WAITING_BIAS)) {
> + /* A writer stole the lock. Undo our reader grant. */
> + if (rwsem_atomic_update(-adjustment, sem) &
> + RWSEM_ACTIVE_MASK)
> + goto out;
> + /* Last active locker left. Retry waking readers. */
> + goto try_reader_grant;
> + }

I'm not a big fan of goto loops (though I know they were in here
before). An equivalent solution, by factoring:

	if (!__rwsem_try_reader_grant(sem, adjustment))
		goto out;


static inline int __rwsem_try_reader_grant(struct rw_semaphore *sem, long adj)
{
	while (1) {
		long count = rwsem_atomic_update(adj, sem) - adj;
		if (likely(count >= RWSEM_WAITING_BIAS))
			break;

		/* A writer stole the lock. Undo our reader grant. */
		if (rwsem_atomic_update(-adj, sem) & RWSEM_ACTIVE_MASK)
			return 0;
		/* Last active locker left. Retry waking readers. */
	}
	return 1;
}

Everything else here looks good.

> + }
>
> /* Grant an infinite number of read locks to the readers at the front
> * of the queue. Note we increment the 'active part' of the count by
> @@ -114,12 +112,13 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
>
> } while (waiter->type != RWSEM_WAITING_FOR_WRITE);
>
> - adjustment = woken * RWSEM_ACTIVE_READ_BIAS;
> + adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment;
> if (waiter->type != RWSEM_WAITING_FOR_WRITE)
> /* hit end of list above */
> adjustment -= RWSEM_WAITING_BIAS;
>
> - rwsem_atomic_add(adjustment, sem);
> + if (adjustment)
> + rwsem_atomic_add(adjustment, sem);
>
> next = sem->wait_list.next;
> loop = woken;
> @@ -164,8 +163,8 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
> count = rwsem_atomic_update(adjustment, sem);
>
> /* If there are no active locks, wake the front queued process(es). */
> - if (count == RWSEM_WAITING_BIAS)
> - sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
> + if (!(count & RWSEM_ACTIVE_MASK))
> + sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY);
>
> raw_spin_unlock_irq(&sem->wait_lock);
>
> @@ -209,7 +208,7 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
> * any read locks that were queued ahead of us. */
> if (count > RWSEM_WAITING_BIAS &&
> adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
> - sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
> + sem = __rwsem_do_wake(sem, RWSEM_WAKE_READERS);
>
> /* wait until we successfully acquire the lock */
> set_task_state(tsk, TASK_UNINTERRUPTIBLE);


2013-03-28 16:21:27

by Peter Hurley

Subject: Re: [PATCH v2 02/13] rwsem: shorter spinlocked section in rwsem_down_failed_common()

On Fri, 2013-03-15 at 03:54 -0700, Michel Lespinasse wrote:
> This change reduces the size of the spinlocked and TASK_UNINTERRUPTIBLE
> sections in rwsem_down_failed_common():
>
> - We only need the sem->wait_lock to insert ourselves on the wait_list;
> the waiter node can be prepared outside of the wait_lock.
>
> - The task state only needs to be set to TASK_UNINTERRUPTIBLE immediately
> before checking if we actually need to sleep; it doesn't need to protect
> the entire function.
>
> Signed-off-by: Michel Lespinasse <[email protected]>
>
> ---
> lib/rwsem.c | 8 +++-----
> 1 file changed, 3 insertions(+), 5 deletions(-)
>
> diff --git a/lib/rwsem.c b/lib/rwsem.c
> index 672eb33218ac..40636454cf3c 100644
> --- a/lib/rwsem.c
> +++ b/lib/rwsem.c
> @@ -188,14 +188,12 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
> struct task_struct *tsk = current;
> signed long count;
>
> - set_task_state(tsk, TASK_UNINTERRUPTIBLE);
> -
> /* set up my own style of waitqueue */
> - raw_spin_lock_irq(&sem->wait_lock);
> waiter.task = tsk;
> waiter.type = type;
> get_task_struct(tsk);
>
> + raw_spin_lock_irq(&sem->wait_lock);
> if (list_empty(&sem->wait_list))
> adjustment += RWSEM_WAITING_BIAS;
> list_add_tail(&waiter.list, &sem->wait_list);
> @@ -218,7 +216,8 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
> raw_spin_unlock_irq(&sem->wait_lock);
>
> /* wait to be given the lock */
> - for (;;) {
> + while (true) {

I would drop this gratuitous change.

> + set_task_state(tsk, TASK_UNINTERRUPTIBLE);
> if (!waiter.task)
> break;
>
> @@ -231,7 +230,6 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
> }
> raw_spin_unlock_irq(&sem->wait_lock);
> schedule();
> - set_task_state(tsk, TASK_UNINTERRUPTIBLE);
> }
>
> tsk->state = TASK_RUNNING;

2013-03-28 16:44:21

by Peter Hurley

Subject: Re: [PATCH v2 07/13] rwsem: use cmpxchg for trying to steal write lock

On Fri, 2013-03-15 at 03:54 -0700, Michel Lespinasse wrote:
> Using rwsem_atomic_update to try stealing the write lock forced us to
> undo the adjustment in the failure path. We can have simpler and faster
> code by using cmpxchg instead.
>
> Signed-off-by: Michel Lespinasse <[email protected]>
>
> ---
> lib/rwsem.c | 26 ++++++--------------------
> 1 file changed, 6 insertions(+), 20 deletions(-)
>
> diff --git a/lib/rwsem.c b/lib/rwsem.c
> index 2360bf204098..64c2dc007be2 100644
> --- a/lib/rwsem.c
> +++ b/lib/rwsem.c
> @@ -142,25 +142,6 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
> return sem;
> }
>
> -/* Try to get write sem, caller holds sem->wait_lock: */
> -static int try_get_writer_sem(struct rw_semaphore *sem)
> -{
> - long oldcount, adjustment;
> -
> - adjustment = RWSEM_ACTIVE_WRITE_BIAS;
> - if (list_is_singular(&sem->wait_list))
> - adjustment -= RWSEM_WAITING_BIAS;
> -
> -try_again_write:
> - oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
> - if (!(oldcount & RWSEM_ACTIVE_MASK))
> - return 1;
> - /* some one grabbed the sem already */
> - if (rwsem_atomic_update(-adjustment, sem) & RWSEM_ACTIVE_MASK)
> - return 0;
> - goto try_again_write;
> -}
> -
> /*
> * wait for the read lock to be granted
> */
> @@ -236,7 +217,12 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
> while (true) {
> set_task_state(tsk, TASK_UNINTERRUPTIBLE);
>
> - if (try_get_writer_sem(sem))
> + /* Try acquiring the write lock. */
> + count = RWSEM_ACTIVE_WRITE_BIAS;
> + if (!list_is_singular(&sem->wait_list))
> + count += RWSEM_WAITING_BIAS;
> + if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
> + RWSEM_WAITING_BIAS)

This is just my opinion, but I think this is way more readable if this
stays in try_get_writer_sem() and you change that to static inline
rather than inlining it here. Especially because 9/13 adds a scope.

> break;
>
> raw_spin_unlock_irq(&sem->wait_lock);

2013-03-28 16:51:51

by Peter Hurley

Subject: Re: [PATCH v2 10/13] rwsem: simplify __rwsem_do_wake

On Fri, 2013-03-15 at 03:54 -0700, Michel Lespinasse wrote:
> This is mostly for cleanup value:
>
> - We don't need several gotos to handle the case where the first
> waiter is a writer. Two simple tests will do (and generate very
> similar code).
>
> - In the remainder of the function, we know the first waiter is a reader,
> so we don't have to double check that. We can use do..while loops
> to iterate over the readers to wake (generates slightly better code).
>
> Signed-off-by: Michel Lespinasse <[email protected]>
>
> ---
> lib/rwsem-spinlock.c | 25 ++++++++-----------------
> lib/rwsem.c | 26 ++++++++++++--------------
> 2 files changed, 20 insertions(+), 31 deletions(-)
>
> diff --git a/lib/rwsem-spinlock.c b/lib/rwsem-spinlock.c
> index 5f117f37ac0a..20207a6e8ac3 100644
> --- a/lib/rwsem-spinlock.c
> +++ b/lib/rwsem-spinlock.c
> @@ -70,26 +70,17 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
>
> waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
>
> - if (!wakewrite) {
> - if (waiter->type == RWSEM_WAITING_FOR_WRITE)
> - goto out;
> - goto dont_wake_writers;
> - }
> -
> - /*
> - * as we support write lock stealing, we can't set sem->activity
> - * to -1 here to indicate we get the lock. Instead, we wake it up
> - * to let it go get it again.
> - */
> if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
> - wake_up_process(waiter->task);
> + if (wakewrite)
> + /* Wake up a writer. Note that we do not grant it the
> + * lock - it will have to acquire it when it runs. */
> + wake_up_process(waiter->task);
> goto out;
> }
>
> - /* grant an infinite number of read locks to the front of the queue */
> - dont_wake_writers:
> + /* grant read locks to all queued readers. */

I think the original comment still applies here now?

> woken = 0;
> - while (waiter->type == RWSEM_WAITING_FOR_READ) {
> + do {
> struct list_head *next = waiter->list.next;
>
> list_del(&waiter->list);
> @@ -99,10 +90,10 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
> wake_up_process(tsk);
> put_task_struct(tsk);
> woken++;
> - if (list_empty(&sem->wait_list))
> + if (next == &sem->wait_list)
> break;
> waiter = list_entry(next, struct rwsem_waiter, list);
> - }
> + } while (waiter->type != RWSEM_WAITING_FOR_WRITE);

Maybe convert this to list_for_each_entry_safe() since you're cleaning
it up?
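
Something along these lines, perhaps (rough sketch only; the loop body
lines not visible in this hunk are taken from the surrounding function,
and it assumes an extra "struct rwsem_waiter *next" local):

	woken = 0;
	list_for_each_entry_safe(waiter, next, &sem->wait_list, list) {
		if (waiter->type == RWSEM_WAITING_FOR_WRITE)
			break;
		list_del(&waiter->list);
		tsk = waiter->task;
		smp_mb();
		waiter->task = NULL;
		wake_up_process(tsk);
		put_task_struct(tsk);
		woken++;
	}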

>
> sem->activity += woken;
>
> diff --git a/lib/rwsem.c b/lib/rwsem.c
> index 0d50e46d5b0c..9a675fa9d78e 100644
> --- a/lib/rwsem.c
> +++ b/lib/rwsem.c
> @@ -68,20 +68,17 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
> signed long woken, loop, adjustment;
>
> waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
> - if (waiter->type != RWSEM_WAITING_FOR_WRITE)
> - goto readers_only;
> -
> - if (wake_type == RWSEM_WAKE_READ_OWNED)
> - /* Another active reader was observed, so wakeup is not
> - * likely to succeed. Save the atomic op.
> - */
> + if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
> + if (wake_type != RWSEM_WAKE_READ_OWNED)
> + /* Wake writer at the front of the queue, but do not
> + * grant it the lock yet as we want other writers
> + * to be able to steal it. Readers, on the other hand,
> + * will block as they will notice the queued writer.
> + */
> + wake_up_process(waiter->task);
> goto out;
> + }
>
> - /* Wake up the writing waiter and let the task grab the sem: */
> - wake_up_process(waiter->task);
> - goto out;
> -
> - readers_only:
> /* If we come here from up_xxxx(), another thread might have reached
> * rwsem_down_failed_common() before we acquired the spinlock and
> * woken up a waiter, making it now active. We prefer to check for
> @@ -125,7 +122,8 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
> rwsem_atomic_add(adjustment, sem);
>
> next = sem->wait_list.next;
> - for (loop = woken; loop > 0; loop--) {
> + loop = woken;
> + do {
> waiter = list_entry(next, struct rwsem_waiter, list);
> next = waiter->list.next;
> tsk = waiter->task;
> @@ -133,7 +131,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
> waiter->task = NULL;
> wake_up_process(tsk);
> put_task_struct(tsk);
> - }
> + } while (--loop);
>
> sem->wait_list.next = next;
> next->prev = &sem->wait_list;

2013-03-28 17:05:45

by Peter Hurley

[permalink] [raw]
Subject: Re: [PATCH v2 03/13] rwsem: move rwsem_down_failed_common code into rwsem_down_{read,write}_failed

On Fri, 2013-03-15 at 03:54 -0700, Michel Lespinasse wrote:
> Remove the rwsem_down_failed_common function and replace it with two
> identical copies of its code in rwsem_down_{read,write}_failed.
>
> This is because we want to make different optimizations in
> rwsem_down_{read,write}_failed; we are adding this pure-duplication
> step as a separate commit in order to make it easier to check the
> following steps.
>
> Signed-off-by: Michel Lespinasse <[email protected]>
>
> ---
> lib/rwsem.c | 72 ++++++++++++++++++++++++++++++++++++++++++++++++-------------
> 1 file changed, 57 insertions(+), 15 deletions(-)
>
> diff --git a/lib/rwsem.c b/lib/rwsem.c
> index 40636454cf3c..fb658af1c12c 100644
> --- a/lib/rwsem.c
> +++ b/lib/rwsem.c
> @@ -178,12 +178,12 @@ try_again_write:
> }
>
> /*
> - * wait for a lock to be granted
> + * wait for the read lock to be granted
> */
> -static struct rw_semaphore __sched *
> -rwsem_down_failed_common(struct rw_semaphore *sem,
> - enum rwsem_waiter_type type, signed long adjustment)
> +struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
> {
> + enum rwsem_waiter_type type = RWSEM_WAITING_FOR_READ;
> + signed long adjustment = -RWSEM_ACTIVE_READ_BIAS;

Again, just my opinion (and I suspect you only did this because that's
what was here) but I think this should be:

+ long adjustment = -RWSEM_ACTIVE_READ_BIAS;

> struct rwsem_waiter waiter;
> struct task_struct *tsk = current;
> signed long count;

Same here.

> @@ -238,21 +238,63 @@ rwsem_down_failed_common(struct rw_semaphore *sem,
> }
>
> /*
> - * wait for the read lock to be granted
> - */
> -struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
> -{
> - return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_READ,
> - -RWSEM_ACTIVE_READ_BIAS);
> -}
> -
> -/*
> * wait for the write lock to be granted
> */
> struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
> {
> - return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_WRITE,
> - -RWSEM_ACTIVE_WRITE_BIAS);
> + enum rwsem_waiter_type type = RWSEM_WAITING_FOR_WRITE;
> + signed long adjustment = -RWSEM_ACTIVE_WRITE_BIAS;

Same here.

> + struct rwsem_waiter waiter;
> + struct task_struct *tsk = current;
> + signed long count;

And here.

> +
> + /* set up my own style of waitqueue */
> + waiter.task = tsk;
> + waiter.type = type;
> + get_task_struct(tsk);
> +
> + raw_spin_lock_irq(&sem->wait_lock);
> + if (list_empty(&sem->wait_list))
> + adjustment += RWSEM_WAITING_BIAS;
> + list_add_tail(&waiter.list, &sem->wait_list);
> +
> + /* we're now waiting on the lock, but no longer actively locking */
> + count = rwsem_atomic_update(adjustment, sem);
> +
> + /* If there are no active locks, wake the front queued process(es) up.
> + *
> + * Alternatively, if we're called from a failed down_write(), there
> + * were already threads queued before us and there are no active
> + * writers, the lock must be read owned; so we try to wake any read
> + * locks that were queued ahead of us. */
> + if (count == RWSEM_WAITING_BIAS)
> + sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
> + else if (count > RWSEM_WAITING_BIAS &&
> + adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
> + sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
> +
> + raw_spin_unlock_irq(&sem->wait_lock);
> +
> + /* wait to be given the lock */
> + while (true) {
> + set_task_state(tsk, TASK_UNINTERRUPTIBLE);
> + if (!waiter.task)
> + break;
> +
> + raw_spin_lock_irq(&sem->wait_lock);
> + /* Try to get the writer sem, may steal from the head writer: */
> + if (type == RWSEM_WAITING_FOR_WRITE)
> + if (try_get_writer_sem(sem, &waiter)) {
> + raw_spin_unlock_irq(&sem->wait_lock);
> + return sem;
> + }
> + raw_spin_unlock_irq(&sem->wait_lock);
> + schedule();
> + }
> +
> + tsk->state = TASK_RUNNING;
> +
> + return sem;
> }
>
> /*

2013-03-28 17:25:25

by Peter Hurley

[permalink] [raw]
Subject: Re: [PATCH v2 12/13] rwsem: do not block readers at head of queue if other readers are active

On Fri, 2013-03-15 at 03:54 -0700, Michel Lespinasse wrote:
> This change fixes a race condition where a reader might determine it
> needs to block, but by the time it acquires the wait_lock the rwsem
> has active readers and no queued waiters.
>
> In this situation the reader can just in parallel with the existing active
^^^
start ?

> readers; it does not need to block until the active readers complete.
>
> Thanks to Peter Hurley for noticing this possible race.
>
> Signed-off-by: Michel Lespinasse <[email protected]>
>
> ---
> lib/rwsem.c | 10 ++++++++--
> 1 file changed, 8 insertions(+), 2 deletions(-)
>
> diff --git a/lib/rwsem.c b/lib/rwsem.c
> index 09bf03e7808c..4e4c8893dc00 100644
> --- a/lib/rwsem.c
> +++ b/lib/rwsem.c
> @@ -162,8 +162,14 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
> /* we're now waiting on the lock, but no longer actively locking */
> count = rwsem_atomic_update(adjustment, sem);
^^^^^^^^^
>
> - /* If there are no active locks, wake the front queued process(es). */
> - if (!(count & RWSEM_ACTIVE_MASK))
> + /* If there are no active locks, wake the front queued process(es).
> + *
> + * If there are no writers and we are first in the queue,
> + * wake our own waiter to join the existing active readers !
> + */
> + if (count == RWSEM_WAITING_BIAS ||
> + (count > RWSEM_WAITING_BIAS &&
> + adjustment != -RWSEM_ACTIVE_READ_BIAS))
> sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY);

Thanks for fixing this.

2013-03-28 19:01:06

by Rik van Riel

[permalink] [raw]
Subject: Re: [PATCH v2 01/13] rwsem: make the waiter type an enumeration rather than a bitmask

On 03/15/2013 06:54 AM, Michel Lespinasse wrote:
> We are not planning to add some new waiter flags, so we can convert the
> waiter type into an enumeration.
>
> Background: David Howells suggested I do this back when I tried adding
> a new waiter type for unfair readers. However, I believe the cleanup
> applies regardless of that use case.
>
> Signed-off-by: Michel Lespinasse <[email protected]>

Reviewed-by: Rik van Riel <[email protected]>

2013-03-28 19:05:20

by Rik van Riel

[permalink] [raw]
Subject: Re: [PATCH v2 02/13] rwsem: shorter spinlocked section in rwsem_down_failed_common()

On 03/15/2013 06:54 AM, Michel Lespinasse wrote:
> This change reduces the size of the spinlocked and TASK_UNINTERRUPTIBLE
> sections in rwsem_down_failed_common():
>
> - We only need the sem->wait_lock to insert ourselves on the wait_list;
> the waiter node can be prepared outside of the wait_lock.
>
> - The task state only needs to be set to TASK_UNINTERRUPTIBLE immediately
> before checking if we actually need to sleep; it doesn't need to protect
> the entire function.
>
> Signed-off-by: Michel Lespinasse <[email protected]>

Reviewed-by: Rik van Riel <[email protected]>

Feel free to keep this Reviewed-by: line across cosmetic
changes.

2013-03-28 19:10:14

by Rik van Riel

[permalink] [raw]
Subject: Re: [PATCH v2 03/13] rwsem: move rwsem_down_failed_common code into rwsem_down_{read,write}_failed

On 03/15/2013 06:54 AM, Michel Lespinasse wrote:
> Remove the rwsem_down_failed_common function and replace it with two
> identical copies of its code in rwsem_down_{read,write}_failed.
>
> This is because we want to make different optimizations in
> rwsem_down_{read,write}_failed; we are adding this pure-duplication
> step as a separate commit in order to make it easier to check the
> following steps.
>
> Signed-off-by: Michel Lespinasse <[email protected]>

Reviewed-by: Rik van Riel <[email protected]>

2013-03-28 19:19:54

by Peter Hurley

[permalink] [raw]
Subject: Re: [PATCH v2 00/13] rwsem fast-path write lock stealing

On Fri, 2013-03-15 at 03:54 -0700, Michel Lespinasse wrote:
> These patches extend Alex Shi's work (which added write lock stealing
> on the rwsem slow path) in order to provide rwsem write lock stealing
> on the fast path (that is, without taking the rwsem's wait_lock).

Since none of Alex's original code will be in here with these patches
applied, you might as well add this:

diff --git a/lib/rwsem.c b/lib/rwsem.c
index 4e4c889..61f91ca 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -4,6 +4,7 @@
* Derived from arch/i386/kernel/semaphore.c
*
* Writer lock-stealing by Alex Shi <[email protected]>
+ * and Michel Lespinasse <[email protected]>
*/
#include <linux/rwsem.h>
#include <linux/sched.h>


> Michel Lespinasse (13):
> rwsem: make the waiter type an enumeration rather than a bitmask
> rwsem: shorter spinlocked section in rwsem_down_failed_common()
> rwsem: move rwsem_down_failed_common code into rwsem_down_{read,write}_failed
> rwsem: simplify rwsem_down_read_failed
> rwsem: simplify rwsem_down_write_failed
> rwsem: more agressive lock stealing in rwsem_down_write_failed
> rwsem: use cmpxchg for trying to steal write lock
> rwsem: avoid taking wait_lock in rwsem_down_write_failed
> rwsem: skip initial trylock in rwsem_down_write_failed
> rwsem: simplify __rwsem_do_wake
> rwsem: implement support for write lock stealing on the fastpath
> rwsem: do not block readers at head of queue if other readers are active
> x86 rwsem: avoid taking slow path when stealing write lock
>
> arch/x86/include/asm/rwsem.h | 28 +++--
> lib/rwsem-spinlock.c | 40 +++-----
> lib/rwsem.c | 239 +++++++++++++++++++++----------------------
> 3 files changed, 154 insertions(+), 153 deletions(-)

Because Michel asked:

Reviewed-by: Peter Hurley <[email protected]>

2013-03-28 19:23:40

by Rik van Riel

[permalink] [raw]
Subject: Re: [PATCH v2 04/13] rwsem: simplify rwsem_down_read_failed

On 03/15/2013 06:54 AM, Michel Lespinasse wrote:
> When trying to acquire a read lock, the RWSEM_ACTIVE_READ_BIAS adjustment
> doesn't cause other readers to block, so we never have to worry about waking
> them back after canceling this adjustment in rwsem_down_read_failed().
>
> We also never want to steal the lock in rwsem_down_read_failed(), so we
> don't have to grab the wait_lock either.
>
> Signed-off-by: Michel Lespinasse <[email protected]>

Reviewed-by: Rik van Riel <[email protected]>

2013-03-28 20:34:42

by Rik van Riel

[permalink] [raw]
Subject: Re: [PATCH v2 05/13] rwsem: simplify rwsem_down_write_failed

On 03/15/2013 06:54 AM, Michel Lespinasse wrote:
> When waking writers, we never grant them the lock - instead, they have
> to acquire it themselves when they run, and remove themselves from the
> wait_list when they succeed.
>
> As a result, we can do a few simplifications in rwsem_down_write_failed():
>
> - We don't need to check for !waiter.task since __rwsem_do_wake() doesn't
> remove writers from the wait_list
>
> - There is no point releasing the wait_lock before entering the wait loop,
> as we will need to reacquire it immediately. We can change the loop so
> that the lock is always held at the start of each loop iteration.
>
> - We don't need to get a reference on the task structure, since the task
> is responsible for removing itself from the wait_list. There is no risk,
> like in the rwsem_down_read_failed() case, that a task would wake up and
> exit (thus destroying its task structure) while __rwsem_do_wake() is
> still running - wait_lock protects against that.
>
> Signed-off-by: Michel Lespinasse <[email protected]>

Reviewed-by: Rik van Riel <[email protected]>

2013-03-28 20:56:25

by Rik van Riel

[permalink] [raw]
Subject: Re: [PATCH v2 06/13] rwsem: more agressive lock stealing in rwsem_down_write_failed

On 03/15/2013 06:54 AM, Michel Lespinasse wrote:
> Some small code simplifications can be achieved by doing more agressive
> lock stealing:
>
> - When rwsem_down_write_failed() notices that there are no active locks
> (and thus no thread to wake us if we decided to sleep), it used to wake
> the first queued process. However, stealing the lock is also sufficient
> to deal with this case, so we don't need this check anymore.
>
> - In try_get_writer_sem(), we can steal the lock even when the first waiter
> is a reader. This is correct because the code path that wakes readers is
> protected by the wait_lock. As to the performance effects of this change,
> they are expected to be minimal: readers are still granted the lock
> (rather than having to acquire it themselves) when they reach the front
> of the wait queue, so we have essentially the same behavior as in
> rwsem-spinlock.
>
> Signed-off-by: Michel Lespinasse <[email protected]>

Reviewed-by: Rik van Riel <[email protected]>

2013-03-28 21:01:08

by Rik van Riel

[permalink] [raw]
Subject: Re: [PATCH v2 07/13] rwsem: use cmpxchg for trying to steal write lock

On 03/15/2013 06:54 AM, Michel Lespinasse wrote:
> Using rwsem_atomic_update to try stealing the write lock forced us to
> undo the adjustment in the failure path. We can have simpler and faster
> code by using cmpxchg instead.
>
> Signed-off-by: Michel Lespinasse <[email protected]>

Acked-by: Rik van Riel <[email protected]>

2013-03-28 21:02:48

by Rik van Riel

[permalink] [raw]
Subject: Re: [PATCH v2 08/13] rwsem: avoid taking wait_lock in rwsem_down_write_failed

On 03/15/2013 06:54 AM, Michel Lespinasse wrote:
> In rwsem_down_write_failed(), if there are active locks after we wake up
> (i.e. the lock got stolen from us), skip taking the wait_lock and go back
> to sleep immediately.
>
> Signed-off-by: Michel Lespinasse <[email protected]>

Acked-by: Rik van Riel <[email protected]>

2013-03-28 21:17:56

by Rik van Riel

[permalink] [raw]
Subject: Re: [PATCH v2 09/13] rwsem: skip initial trylock in rwsem_down_write_failed

On 03/15/2013 06:54 AM, Michel Lespinasse wrote:
> We can skip the initial trylock in rwsem_down_write_failed() if there are
> known active lockers already, thus saving one likely-to-fail cmpxchg.
>
> Signed-off-by: Michel Lespinasse <[email protected]>

Acked-by: Rik van Riel <[email protected]>

2013-04-27 21:21:07

by Davidlohr Bueso

[permalink] [raw]
Subject: Re: [PATCH v2 03/13] rwsem: move rwsem_down_failed_common code into rwsem_down_{read,write}_failed

Sorry, but I just couldn't help myself :)

From: Davidlohr Bueso <[email protected]>
Subject: [PATCH] rwsem: no need for explicit signed longs

Signed-off-by: Davidlohr Bueso <[email protected]>
---
lib/rwsem.c | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index 4e4c889..50fdd89 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -63,7 +63,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
struct rwsem_waiter *waiter;
struct task_struct *tsk;
struct list_head *next;
- signed long oldcount, woken, loop, adjustment;
+ long oldcount, woken, loop, adjustment;

waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
@@ -144,11 +144,10 @@ __rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
*/
struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
{
- signed long adjustment = -RWSEM_ACTIVE_READ_BIAS;
+ long count, adjustment = -RWSEM_ACTIVE_READ_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
- signed long count;
-
+
/* set up my own style of waitqueue */
waiter.task = tsk;
waiter.type = RWSEM_WAITING_FOR_READ;
@@ -192,10 +191,9 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
*/
struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
{
- signed long adjustment = -RWSEM_ACTIVE_WRITE_BIAS;
+ long count, adjustment = -RWSEM_ACTIVE_WRITE_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
- signed long count;

/* set up my own style of waitqueue */
waiter.task = tsk;
--
1.7.11.7


2013-04-27 23:47:37

by Michel Lespinasse

[permalink] [raw]
Subject: Re: [PATCH v2 00/13] rwsem fast-path write lock stealing

On Sat, Apr 27, 2013 at 2:15 PM, Davidlohr Bueso <[email protected]> wrote:
> Hi Michel,
>
> On Fri, 2013-03-15 at 03:54 -0700, Michel Lespinasse wrote:
>> These patches extend Alex Shi's work (which added write lock stealing
>> on the rwsem slow path) in order to provide rwsem write lock stealing
>> on the fast path (that is, without taking the rwsem's wait_lock).
>>
>> I initially sent a shorter series shortly before v3.9, however some
>> patches were doing too much at once which made them confusing to
>> review. I have now split the series at a smaller granularity;
>> hope this will help :)
>
> Coincidentally, I was doing some similar work with similar results. But
> I have to say I like your series and cleanups better than mine :)
>
> I've run a few tests on this patchset and I'm pretty happy with the
> numbers. The first series of tests were on my quad-core laptop running
> pgbench on three different database sizes, each with various numbers of
> clients:

Thanks a lot for this testing. I was getting discouraged with this
series, as I would have hoped for it to be picked up in -next a long long
time ago. Anyway, I will resend a v3 with the (mostly style) updates I
got from everyone on v2 and try submitting it to Linus.

--
Michel "Walken" Lespinasse
A program is never fully debugged until the last user dies.

2013-05-07 10:22:57

by Michel Lespinasse

[permalink] [raw]
Subject: Re: [PATCH v2 00/13] rwsem fast-path write lock stealing

On Sat, Apr 27, 2013 at 02:15:52PM -0700, Davidlohr Bueso wrote:
> From: Davidlohr Bueso <[email protected]>
> Subject: [PATCH] rwsem: check counter to avoid cmpxchg calls
>
> This patch tries to reduce the amount of cmpxchg calls in the
> writer failed path by checking the counter value first before
> issuing the instruction. If ->count is not set to RWSEM_WAITING_BIAS
> then there is no point wasting a cmpxchg call.
>
> Signed-off-by: Davidlohr Bueso <[email protected]>
> ---
> lib/rwsem.c | 7 ++++---
> 1 file changed, 4 insertions(+), 3 deletions(-)
>
> diff --git a/lib/rwsem.c b/lib/rwsem.c
> index 50fdd89..a79dc95 100644
> --- a/lib/rwsem.c
> +++ b/lib/rwsem.c
> @@ -222,9 +222,10 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
> count = RWSEM_ACTIVE_WRITE_BIAS;
> if (!list_is_singular(&sem->wait_list))
> count += RWSEM_WAITING_BIAS;
> - if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
> - RWSEM_WAITING_BIAS)
> - break;
> + if ((*(volatile long *)&(sem)->count) == RWSEM_WAITING_BIAS)
> + if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
> + RWSEM_WAITING_BIAS)
> + break;
> }
>
> raw_spin_unlock_irq(&sem->wait_lock);

Quite impressed that this makes as much of a difference - this code block
already checks that the active count was 0 (which implies the sem->count
must be RWSEM_WAITING_BIAS, since the writer thread is known to be waiting)
not long before. But I suppose it helps due to the case where someone else
steals the lock while we're trying to acquire sem->wait_lock.

Regarding style: I would prefer if the sem->count read didn't use the
volatile cast. The compiler will already be forced to reload the value
due to the barriers in set_task_state() and raw_spin_lock_irq(). And if
you absolutely need to have that volatile, I would prefer you use
ACCESS_ONCE() rather than the hand written equivalent.
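
For reference, with ACCESS_ONCE() the check would read roughly as follows
(a sketch against the hunk quoted above, not a tested patch):

	/* Try acquiring the write lock. */
	count = RWSEM_ACTIVE_WRITE_BIAS;
	if (!list_is_singular(&sem->wait_list))
		count += RWSEM_WAITING_BIAS;
	if (ACCESS_ONCE(sem->count) == RWSEM_WAITING_BIAS &&
	    cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) ==
						RWSEM_WAITING_BIAS)
		break;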

I'm going to try pushing this to Linus tomorrow; I feel I shouldn't change
your patch without putting it through tests again; are you OK with sending
this as a followup to the series rather than me including it?

--
Michel "Walken" Lespinasse
A program is never fully debugged until the last user dies.