2019-11-13 12:03:44

by Peter Zijlstra

[permalink] [raw]
Subject: [PATCH 5/5] locking/percpu-rwsem: Remove the embedded rwsem

The filesystem freezer uses percpu-rwsem in a way that is effectively
write_non_owner() and achieves this with a few horrible hacks that
rely on the rwsem (!percpu) implementation.

When PREEMPT_RT replaces the rwsem implementation with a PI aware
variant this comes apart.

Remove the embedded rwsem and implement it using a waitqueue and an
atomic_t.

- make readers_block an atomic, and use it, with the waitqueue
for a blocking test-and-set write-side.

- have the read-side wait for the 'lock' state to clear.

Have the waiters use FIFO queueing and mark them (reader/writer) with
a new WQ_FLAG. Use a custom wake_function to wake either a single
writer or all readers until a writer.

Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
---
include/linux/percpu-rwsem.h | 19 +----
include/linux/wait.h | 1
kernel/locking/percpu-rwsem.c | 140 +++++++++++++++++++++++++++++-------------
kernel/locking/rwsem.c | 9 +-
kernel/locking/rwsem.h | 12 ---
5 files changed, 110 insertions(+), 71 deletions(-)

--- a/include/linux/percpu-rwsem.h
+++ b/include/linux/percpu-rwsem.h
@@ -3,18 +3,18 @@
#define _LINUX_PERCPU_RWSEM_H

#include <linux/atomic.h>
-#include <linux/rwsem.h>
#include <linux/percpu.h>
#include <linux/rcuwait.h>
+#include <linux/wait.h>
#include <linux/rcu_sync.h>
#include <linux/lockdep.h>

struct percpu_rw_semaphore {
struct rcu_sync rss;
unsigned int __percpu *read_count;
- struct rw_semaphore rw_sem; /* slowpath */
- struct rcuwait writer; /* blocked writer */
- int readers_block;
+ struct rcuwait writer;
+ wait_queue_head_t waiters;
+ atomic_t block;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
@@ -31,8 +31,9 @@ static DEFINE_PER_CPU(unsigned int, __pe
is_static struct percpu_rw_semaphore name = { \
.rss = __RCU_SYNC_INITIALIZER(name.rss), \
.read_count = &__percpu_rwsem_rc_##name, \
- .rw_sem = __RWSEM_INITIALIZER(name.rw_sem), \
.writer = __RCUWAIT_INITIALIZER(name.writer), \
+ .waiters = __WAIT_QUEUE_HEAD_INITIALIZER(name.waiters), \
+ .block = ATOMIC_INIT(0), \
__PERCPU_RWSEM_DEP_MAP_INIT(name) \
}

@@ -130,20 +131,12 @@ static inline void percpu_rwsem_release(
bool read, unsigned long ip)
{
lock_release(&sem->dep_map, ip);
-#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
- if (!read)
- atomic_long_set(&sem->rw_sem.owner, RWSEM_OWNER_UNKNOWN);
-#endif
}

static inline void percpu_rwsem_acquire(struct percpu_rw_semaphore *sem,
bool read, unsigned long ip)
{
lock_acquire(&sem->dep_map, 0, 1, read, 1, NULL, ip);
-#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
- if (!read)
- atomic_long_set(&sem->rw_sem.owner, (long)current);
-#endif
}

#endif
--- a/include/linux/wait.h
+++ b/include/linux/wait.h
@@ -20,6 +20,7 @@ int default_wake_function(struct wait_qu
#define WQ_FLAG_EXCLUSIVE 0x01
#define WQ_FLAG_WOKEN 0x02
#define WQ_FLAG_BOOKMARK 0x04
+#define WQ_FLAG_CUSTOM 0x08

/*
* A single wait-queue entry structure:
--- a/kernel/locking/percpu-rwsem.c
+++ b/kernel/locking/percpu-rwsem.c
@@ -1,15 +1,14 @@
// SPDX-License-Identifier: GPL-2.0-only
#include <linux/atomic.h>
-#include <linux/rwsem.h>
#include <linux/percpu.h>
+#include <linux/wait.h>
#include <linux/lockdep.h>
#include <linux/percpu-rwsem.h>
#include <linux/rcupdate.h>
#include <linux/sched.h>
+#include <linux/sched/task.h>
#include <linux/errno.h>

-#include "rwsem.h"
-
int __percpu_init_rwsem(struct percpu_rw_semaphore *sem,
const char *name, struct lock_class_key *key)
{
@@ -17,11 +16,10 @@ int __percpu_init_rwsem(struct percpu_rw
if (unlikely(!sem->read_count))
return -ENOMEM;

- /* ->rw_sem represents the whole percpu_rw_semaphore for lockdep */
rcu_sync_init(&sem->rss);
- __init_rwsem(&sem->rw_sem, name, rwsem_key);
rcuwait_init(&sem->writer);
- sem->readers_block = 0;
+ init_waitqueue_head(&sem->waiters);
+ atomic_set(&sem->block, 0);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
debug_check_no_locks_freed((void *)sem, sizeof(*sem));
lockdep_init_map(&sem->dep_map, name, key, 0);
@@ -54,23 +52,23 @@ static bool __percpu_down_read_trylock(s
* the same CPU as the increment, avoiding the
* increment-on-one-CPU-and-decrement-on-another problem.
*
- * If the reader misses the writer's assignment of readers_block, then
- * the writer is guaranteed to see the reader's increment.
+ * If the reader misses the writer's assignment of sem->block, then the
+ * writer is guaranteed to see the reader's increment.
*
* Conversely, any readers that increment their sem->read_count after
- * the writer looks are guaranteed to see the readers_block value,
- * which in turn means that they are guaranteed to immediately
- * decrement their sem->read_count, so that it doesn't matter that the
- * writer missed them.
+ * the writer looks are guaranteed to see the sem->block value, which
+ * in turn means that they are guaranteed to immediately decrement
+ * their sem->read_count, so that it doesn't matter that the writer
+ * missed them.
*/

smp_mb(); /* A matches D */

/*
- * If !readers_block the critical section starts here, matched by the
+ * If !sem->block the critical section starts here, matched by the
* release in percpu_up_write().
*/
- if (likely(!atomic_read_acquire(&sem->readers_block)))
+ if (likely(!atomic_read_acquire(&sem->block)))
return true;

__this_cpu_dec(*sem->read_count);
@@ -81,6 +79,75 @@ static bool __percpu_down_read_trylock(s
return false;
}

+static inline bool __percpu_down_write_trylock(struct percpu_rw_semaphore *sem)
+{
+ if (atomic_read(&sem->block))
+ return false;
+
+ return atomic_xchg(&sem->block, 1) == 0;
+}
+
+static bool __percpu_rwsem_trylock(struct percpu_rw_semaphore *sem, bool reader)
+{
+ if (reader) {
+ bool ret;
+
+ preempt_disable();
+ ret = __percpu_down_read_trylock(sem);
+ preempt_enable();
+
+ return ret;
+ }
+ return __percpu_down_write_trylock(sem);
+}
+
+static int percpu_rwsem_wake_function(struct wait_queue_entry *wq_entry,
+ unsigned int mode, int wake_flags,
+ void *key)
+{
+ struct task_struct *p = get_task_struct(wq_entry->private);
+ bool reader = wq_entry->flags & WQ_FLAG_CUSTOM;
+ struct percpu_rw_semaphore *sem = key;
+
+ /* concurrent against percpu_down_write(), can get stolen */
+ if (!__percpu_rwsem_trylock(sem, reader))
+ return 1;
+
+ list_del_init(&wq_entry->entry);
+ smp_store_release(&wq_entry->private, NULL);
+
+ wake_up_process(p);
+ put_task_struct(p);
+
+ return !reader; /* wake 'all' readers and 1 writer */
+}
+
+static void percpu_rwsem_wait(struct percpu_rw_semaphore *sem, bool reader)
+{
+ DEFINE_WAIT_FUNC(wq_entry, percpu_rwsem_wake_function);
+ bool wait;
+
+ spin_lock_irq(&sem->waiters.lock);
+ /*
+ * Serialize against the wakeup in percpu_up_write(), if we fail
+ * the trylock, the wakeup must see us on the list.
+ */
+ wait = !__percpu_rwsem_trylock(sem, reader);
+ if (wait) {
+ wq_entry.flags |= WQ_FLAG_EXCLUSIVE | reader * WQ_FLAG_CUSTOM;
+ __add_wait_queue_entry_tail(&sem->waiters, &wq_entry);
+ }
+ spin_unlock_irq(&sem->waiters.lock);
+
+ while (wait) {
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ if (!smp_load_acquire(&wq_entry.private))
+ break;
+ schedule();
+ }
+ __set_current_state(TASK_RUNNING);
+}
+
bool __percpu_down_read(struct percpu_rw_semaphore *sem, bool try)
{
if (__percpu_down_read_trylock(sem))
@@ -89,20 +156,10 @@ bool __percpu_down_read(struct percpu_rw
if (try)
return false;

- /*
- * We either call schedule() in the wait, or we'll fall through
- * and reschedule on the preempt_enable() in percpu_down_read().
- */
- preempt_enable_no_resched();
-
- /*
- * Avoid lockdep for the down/up_read() we already have them.
- */
- __down_read(&sem->rw_sem);
- this_cpu_inc(*sem->read_count);
- __up_read(&sem->rw_sem);
-
+ preempt_enable();
+ percpu_rwsem_wait(sem, /* .reader = */ true );
preempt_disable();
+
return true;
}
EXPORT_SYMBOL_GPL(__percpu_down_read);
@@ -117,7 +174,7 @@ void __percpu_up_read(struct percpu_rw_s
*/
__this_cpu_dec(*sem->read_count);

- /* Prod writer to recheck readers_active */
+ /* Prod writer to re-evaluate readers_active_check() */
rcuwait_wake_up(&sem->writer);
}
EXPORT_SYMBOL_GPL(__percpu_up_read);
@@ -137,6 +194,8 @@ EXPORT_SYMBOL_GPL(__percpu_up_read);
* zero. If this sum is zero, then it is stable due to the fact that if any
* newly arriving readers increment a given counter, they will immediately
* decrement that same counter.
+ *
+ * Assumes sem->block is set.
*/
static bool readers_active_check(struct percpu_rw_semaphore *sem)
{
@@ -160,23 +219,22 @@ void percpu_down_write(struct percpu_rw_
/* Notify readers to take the slow path. */
rcu_sync_enter(&sem->rss);

- __down_write(&sem->rw_sem);
-
/*
- * Notify new readers to block; up until now, and thus throughout the
- * longish rcu_sync_enter() above, new readers could still come in.
+ * Try set sem->block; this provides writer-writer exclusion.
+ * Having sem->block set makes new readers block.
*/
- WRITE_ONCE(sem->readers_block, 1);
+ if (!__percpu_down_write_trylock(sem))
+ percpu_rwsem_wait(sem, /* .reader = */ false);

- smp_mb(); /* D matches A */
+ /* smp_mb() implied by __percpu_down_writer_trylock() on success -- D matches A */

/*
- * If they don't see our writer of readers_block, then we are
- * guaranteed to see their sem->read_count increment, and therefore
- * will wait for them.
+ * If they don't see our store of sem->block, then we are guaranteed to
+ * see their sem->read_count increment, and therefore will wait for
+ * them.
*/

- /* Wait for all now active readers to complete. */
+ /* Wait for all active readers to complete. */
rcuwait_wait_event(&sem->writer, readers_active_check(sem));
}
EXPORT_SYMBOL_GPL(percpu_down_write);
@@ -195,12 +253,12 @@ void percpu_up_write(struct percpu_rw_se
* Therefore we force it through the slow path which guarantees an
* acquire and thereby guarantees the critical section's consistency.
*/
- smp_store_release(&sem->readers_block, 0);
+ atomic_set_release(&sem->block, 0);

/*
- * Release the write lock, this will allow readers back in the game.
+ * Prod any pending reader/writer to make progress.
*/
- __up_write(&sem->rw_sem);
+ __wake_up(&sem->waiters, TASK_NORMAL, 1, sem);

/*
* Once this completes (at least one RCU-sched grace period hence) the
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -28,7 +28,6 @@
#include <linux/rwsem.h>
#include <linux/atomic.h>

-#include "rwsem.h"
#include "lock_events.h"

/*
@@ -1338,7 +1337,7 @@ static struct rw_semaphore *rwsem_downgr
/*
* lock for reading
*/
-inline void __down_read(struct rw_semaphore *sem)
+static inline void __down_read(struct rw_semaphore *sem)
{
if (!rwsem_read_trylock(sem)) {
rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE);
@@ -1383,7 +1382,7 @@ static inline int __down_read_trylock(st
/*
* lock for writing
*/
-inline void __down_write(struct rw_semaphore *sem)
+static inline void __down_write(struct rw_semaphore *sem)
{
long tmp = RWSEM_UNLOCKED_VALUE;

@@ -1426,7 +1425,7 @@ static inline int __down_write_trylock(s
/*
* unlock after reading
*/
-inline void __up_read(struct rw_semaphore *sem)
+static inline void __up_read(struct rw_semaphore *sem)
{
long tmp;

@@ -1446,7 +1445,7 @@ inline void __up_read(struct rw_semaphor
/*
* unlock after writing
*/
-inline void __up_write(struct rw_semaphore *sem)
+static inline void __up_write(struct rw_semaphore *sem)
{
long tmp;

--- a/kernel/locking/rwsem.h
+++ b/kernel/locking/rwsem.h
@@ -1,12 +0,0 @@
-/* SPDX-License-Identifier: GPL-2.0 */
-
-#ifndef __INTERNAL_RWSEM_H
-#define __INTERNAL_RWSEM_H
-#include <linux/rwsem.h>
-
-extern void __down_read(struct rw_semaphore *sem);
-extern void __up_read(struct rw_semaphore *sem);
-extern void __down_write(struct rw_semaphore *sem);
-extern void __up_write(struct rw_semaphore *sem);
-
-#endif /* __INTERNAL_RWSEM_H */



2019-11-18 20:00:07

by Davidlohr Bueso

[permalink] [raw]
Subject: Re: [PATCH 5/5] locking/percpu-rwsem: Remove the embedded rwsem

On Wed, 13 Nov 2019, Peter Zijlstra wrote:
>@@ -54,23 +52,23 @@ static bool __percpu_down_read_trylock(s
> * the same CPU as the increment, avoiding the
> * increment-on-one-CPU-and-decrement-on-another problem.

Nit: Now that you've made read_count more symmetric, maybe this first
paragraph can be moved down to __percpu_rwsem_trylock() reader side,
as such:

/*
* Due to having preemption disabled the decrement happens on
* the same CPU as the increment, avoiding the
* increment-on-one-CPU-and-decrement-on-another problem.
*/
preempt_disable();
ret = __percpu_down_read_trylock(sem);
preempt_enable();

> *
>- * If the reader misses the writer's assignment of readers_block, then
>- * the writer is guaranteed to see the reader's increment.
>+ * If the reader misses the writer's assignment of sem->block, then the
>+ * writer is guaranteed to see the reader's increment.

...

> bool __percpu_down_read(struct percpu_rw_semaphore *sem, bool try)
> {
> if (__percpu_down_read_trylock(sem))
>@@ -89,20 +156,10 @@ bool __percpu_down_read(struct percpu_rw
> if (try)
> return false;
>
>- /*
>- * We either call schedule() in the wait, or we'll fall through
>- * and reschedule on the preempt_enable() in percpu_down_read().
>- */
>- preempt_enable_no_resched();
>-
>- /*
>- * Avoid lockdep for the down/up_read() we already have them.
>- */
>- __down_read(&sem->rw_sem);
>- this_cpu_inc(*sem->read_count);
>- __up_read(&sem->rw_sem);
>-
>+ preempt_enable();
>+ percpu_rwsem_wait(sem, /* .reader = */ true );
> preempt_disable();
>+
> return true;
> }
> EXPORT_SYMBOL_GPL(__percpu_down_read);

Do we really need to export symbol here? This function is only called
from percpu-rwsem.h.

>@@ -117,7 +174,7 @@ void __percpu_up_read(struct percpu_rw_s
> */
> __this_cpu_dec(*sem->read_count);
>
>- /* Prod writer to recheck readers_active */
>+ /* Prod writer to re-evaluate readers_active_check() */
> rcuwait_wake_up(&sem->writer);
> }
> EXPORT_SYMBOL_GPL(__percpu_up_read);
>@@ -137,6 +194,8 @@ EXPORT_SYMBOL_GPL(__percpu_up_read);
> * zero. If this sum is zero, then it is stable due to the fact that if any
> * newly arriving readers increment a given counter, they will immediately
> * decrement that same counter.
>+ *
>+ * Assumes sem->block is set.
> */
> static bool readers_active_check(struct percpu_rw_semaphore *sem)
> {
>@@ -160,23 +219,22 @@ void percpu_down_write(struct percpu_rw_
> /* Notify readers to take the slow path. */
> rcu_sync_enter(&sem->rss);
>
>- __down_write(&sem->rw_sem);
>-
> /*
>- * Notify new readers to block; up until now, and thus throughout the
>- * longish rcu_sync_enter() above, new readers could still come in.
>+ * Try set sem->block; this provides writer-writer exclusion.
>+ * Having sem->block set makes new readers block.
> */
>- WRITE_ONCE(sem->readers_block, 1);
>+ if (!__percpu_down_write_trylock(sem))
>+ percpu_rwsem_wait(sem, /* .reader = */ false);
>
>- smp_mb(); /* D matches A */
>+ /* smp_mb() implied by __percpu_down_writer_trylock() on success -- D matches A */
^^^
write
...

>--- a/kernel/locking/rwsem.h
>+++ b/kernel/locking/rwsem.h
>@@ -1,12 +0,0 @@
>-/* SPDX-License-Identifier: GPL-2.0 */
>-
>-#ifndef __INTERNAL_RWSEM_H
>-#define __INTERNAL_RWSEM_H
>-#include <linux/rwsem.h>
>-
>-extern void __down_read(struct rw_semaphore *sem);
>-extern void __up_read(struct rw_semaphore *sem);
>-extern void __down_write(struct rw_semaphore *sem);
>-extern void __up_write(struct rw_semaphore *sem);

This is a nice side effect.

Thanks,
Davidlohr

2019-11-18 21:54:46

by Waiman Long

[permalink] [raw]
Subject: Re: [PATCH 5/5] locking/percpu-rwsem: Remove the embedded rwsem

On 11/13/19 5:21 AM, Peter Zijlstra wrote:
> @@ -130,20 +131,12 @@ static inline void percpu_rwsem_release(
> bool read, unsigned long ip)
> {
> lock_release(&sem->dep_map, ip);
> -#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
> - if (!read)
> - atomic_long_set(&sem->rw_sem.owner, RWSEM_OWNER_UNKNOWN);
> -#endif
> }
>

This is the only place that sets RWSEM_OWNER_UNKNOWN. So you may as well
remove all references to it:

diff --git a/include/linux/rwsem.h b/include/linux/rwsem.h
index 00d6054687dd..8a418d9eeb7a 100644
--- a/include/linux/rwsem.h
+++ b/include/linux/rwsem.h
@@ -53,12 +53,6 @@ struct rw_semaphore {
 #endif
 };
 
-/*
- * Setting all bits of the owner field except bit 0 will indicate
- * that the rwsem is writer-owned with an unknown owner.
- */
-#define RWSEM_OWNER_UNKNOWN    (-2L)
-
 /* In all implementations count != 0 means locked */
 static inline int rwsem_is_locked(struct rw_semaphore *sem)
 {
diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index eef04551eae7..622842754c73 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -660,8 +660,6 @@ static inline bool rwsem_can_spin_on_owner(struct rw_semapho
        unsigned long flags;
        bool ret = true;
 
-       BUILD_BUG_ON(!(RWSEM_OWNER_UNKNOWN & RWSEM_NONSPINNABLE));
-
        if (need_resched()) {
                lockevent_inc(rwsem_opt_fail);
                return false;

Cheers,
Longman

2019-11-18 23:25:29

by Davidlohr Bueso

[permalink] [raw]
Subject: Re: [PATCH 5/5] locking/percpu-rwsem: Remove the embedded rwsem

On Mon, 18 Nov 2019, Davidlohr Bueso wrote:

>On Wed, 13 Nov 2019, Peter Zijlstra wrote:
>>bool __percpu_down_read(struct percpu_rw_semaphore *sem, bool try)
>>{
>> if (__percpu_down_read_trylock(sem))
>>@@ -89,20 +156,10 @@ bool __percpu_down_read(struct percpu_rw
>> if (try)
>> return false;
>>
>>- /*
>>- * We either call schedule() in the wait, or we'll fall through
>>- * and reschedule on the preempt_enable() in percpu_down_read().
>>- */
>>- preempt_enable_no_resched();
>>-
>>- /*
>>- * Avoid lockdep for the down/up_read() we already have them.
>>- */
>>- __down_read(&sem->rw_sem);
>>- this_cpu_inc(*sem->read_count);
>>- __up_read(&sem->rw_sem);
>>-
>>+ preempt_enable();
>>+ percpu_rwsem_wait(sem, /* .reader = */ true );
>> preempt_disable();
>>+
>> return true;
>>}
>>EXPORT_SYMBOL_GPL(__percpu_down_read);
>
>Do we really need to export symbol here? This function is only called
>from percpu-rwsem.h.

Similarly, afaict we can get rid of __percpu_up_read() and put the
slowpath all into percpu_up_read(). Also explicitly mention the
single task nature of the writer (which is a better comment for
the rcuwait_wake_up()).

diff --git a/include/linux/percpu-rwsem.h b/include/linux/percpu-rwsem.h
index f5ecf6a8a1dd..eda545f42fb8 100644
--- a/include/linux/percpu-rwsem.h
+++ b/include/linux/percpu-rwsem.h
@@ -43,7 +43,6 @@ is_static struct percpu_rw_semaphore name = { \
__DEFINE_PERCPU_RWSEM(name, static)

extern bool __percpu_down_read(struct percpu_rw_semaphore *, bool);
-extern void __percpu_up_read(struct percpu_rw_semaphore *);

static inline void percpu_down_read(struct percpu_rw_semaphore *sem)
{
@@ -103,10 +102,23 @@ static inline void percpu_up_read(struct percpu_rw_semaphore *sem)
/*
* Same as in percpu_down_read().
*/
- if (likely(rcu_sync_is_idle(&sem->rss)))
+ if (likely(rcu_sync_is_idle(&sem->rss))) {
__this_cpu_dec(*sem->read_count);
- else
- __percpu_up_read(sem); /* Unconditional memory barrier */
+ goto done;
+ }
+
+ /*
+ * slowpath; reader will only ever wake a single blocked writer.
+ */
+ smp_mb(); /* B matches C */
+ /*
+ * In other words, if they see our decrement (presumably to
+ * aggregate zero, as that is the only time it matters) they
+ * will also see our critical section.
+ */
+ __this_cpu_dec(*sem->read_count);
+ rcuwait_wake_up(&sem->writer);
+done:
preempt_enable();
}

diff --git a/kernel/locking/percpu-rwsem.c b/kernel/locking/percpu-rwsem.c
index 851038468efb..a5150a876626 100644
--- a/kernel/locking/percpu-rwsem.c
+++ b/kernel/locking/percpu-rwsem.c
@@ -164,21 +164,6 @@ bool __percpu_down_read(struct percpu_rw_semaphore *sem, bool try)
}
EXPORT_SYMBOL_GPL(__percpu_down_read);

-void __percpu_up_read(struct percpu_rw_semaphore *sem)
-{
- smp_mb(); /* B matches C */
- /*
- * In other words, if they see our decrement (presumably to aggregate
- * zero, as that is the only time it matters) they will also see our
- * critical section.
- */
- __this_cpu_dec(*sem->read_count);
-
- /* Prod writer to re-evaluate readers_active_check() */
- rcuwait_wake_up(&sem->writer);
-}
-EXPORT_SYMBOL_GPL(__percpu_up_read);
-
#define per_cpu_sum(var) \
({ \
typeof(var) __sum = 0; \

2019-11-19 13:55:01

by Waiman Long

[permalink] [raw]
Subject: Re: [PATCH 5/5] locking/percpu-rwsem: Remove the embedded rwsem

On 11/13/19 5:21 AM, Peter Zijlstra wrote:
> +static int percpu_rwsem_wake_function(struct wait_queue_entry *wq_entry,
> + unsigned int mode, int wake_flags,
> + void *key)
> +{
> + struct task_struct *p = get_task_struct(wq_entry->private);
> + bool reader = wq_entry->flags & WQ_FLAG_CUSTOM;
> + struct percpu_rw_semaphore *sem = key;
> +
> + /* concurrent against percpu_down_write(), can get stolen */
> + if (!__percpu_rwsem_trylock(sem, reader))
> + return 1;
> +
> + list_del_init(&wq_entry->entry);
> + smp_store_release(&wq_entry->private, NULL);
> +
> + wake_up_process(p);
> + put_task_struct(p);
> +
> + return !reader; /* wake 'all' readers and 1 writer */
> +}
> +
> +static void percpu_rwsem_wait(struct percpu_rw_semaphore *sem, bool reader)
> +{
> + DEFINE_WAIT_FUNC(wq_entry, percpu_rwsem_wake_function);
> + bool wait;
> +
> + spin_lock_irq(&sem->waiters.lock);
> + /*
> + * Serialize against the wakeup in percpu_up_write(), if we fail
> + * the trylock, the wakeup must see us on the list.
> + */
> + wait = !__percpu_rwsem_trylock(sem, reader);
> + if (wait) {
> + wq_entry.flags |= WQ_FLAG_EXCLUSIVE | reader * WQ_FLAG_CUSTOM;
> + __add_wait_queue_entry_tail(&sem->waiters, &wq_entry);
> + }
> + spin_unlock_irq(&sem->waiters.lock);
> +
> + while (wait) {
> + set_current_state(TASK_UNINTERRUPTIBLE);
> + if (!smp_load_acquire(&wq_entry.private))
> + break;
> + schedule();
> + }

If I read the function correctly, you are setting the WQ_FLAG_EXCLUSIVE
for both readers and writers and __wake_up() is called with an exclusive
count of one. So only one reader or writer is woken up each time.
However, the comment above said we wake 'all' readers and 1 writer. That
doesn't match the actual code, IMO. To match the comments, you should
have set WQ_FLAG_EXCLUSIVE flag only on writer. In this case, you
probably don't need WQ_FLAG_CUSTOM to differentiate between readers and
writers.

Cheers,
Longman


2019-11-19 16:03:21

by Oleg Nesterov

[permalink] [raw]
Subject: Re: [PATCH 5/5] locking/percpu-rwsem: Remove the embedded rwsem

On 11/19, Waiman Long wrote:
>
> On 11/13/19 5:21 AM, Peter Zijlstra wrote:
> > +static int percpu_rwsem_wake_function(struct wait_queue_entry *wq_entry,
> > + unsigned int mode, int wake_flags,
> > + void *key)
> > +{
> > + struct task_struct *p = get_task_struct(wq_entry->private);
> > + bool reader = wq_entry->flags & WQ_FLAG_CUSTOM;
> > + struct percpu_rw_semaphore *sem = key;
> > +
> > + /* concurrent against percpu_down_write(), can get stolen */
> > + if (!__percpu_rwsem_trylock(sem, reader))
> > + return 1;
> > +
> > + list_del_init(&wq_entry->entry);
> > + smp_store_release(&wq_entry->private, NULL);
> > +
> > + wake_up_process(p);
> > + put_task_struct(p);
> > +
> > + return !reader; /* wake 'all' readers and 1 writer */
> > +}
> > +
>
> If I read the function correctly, you are setting the WQ_FLAG_EXCLUSIVE
> for both readers and writers and __wake_up() is called with an exclusive
> count of one. So only one reader or writer is woken up each time.

This depends on what percpu_rwsem_wake_function() returns. If it returns 1,
__wake_up_common() stops, exactly because all waiters have WQ_FLAG_EXCLUSIVE.

> However, the comment above said we wake 'all' readers and 1 writer. That
> doesn't match the actual code, IMO.

Well, "'all' readers" probably means "all readers before writer",

> To match the comments, you should
> have set WQ_FLAG_EXCLUSIVE flag only on writer. In this case, you
> probably don't need WQ_FLAG_CUSTOM to differentiate between readers and
> writers.

See above...

note also the

if (!__percpu_rwsem_trylock(sem, reader))
return 1;

at the start of percpu_rwsem_wake_function(). We want to stop wake_up_common()
as soon as percpu_rwsem_trylock() fails. Because we know that if it fails once
it can't succeed later. Although iiuc this can only happen if another (new)
writer races with __wake_up(&sem->waiters).


I guess WQ_FLAG_CUSTOM can be avoided, percpu_rwsem_wait() could do

if (read)
__add_wait_queue_entry_tail(...);
else {
wq_entry.flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue(...);
}

but this is "unfair".

Oleg.


2019-11-19 16:33:58

by Waiman Long

[permalink] [raw]
Subject: Re: [PATCH 5/5] locking/percpu-rwsem: Remove the embedded rwsem

On 11/19/19 10:58 AM, Oleg Nesterov wrote:
> On 11/19, Waiman Long wrote:
>> On 11/13/19 5:21 AM, Peter Zijlstra wrote:
>>> +static int percpu_rwsem_wake_function(struct wait_queue_entry *wq_entry,
>>> + unsigned int mode, int wake_flags,
>>> + void *key)
>>> +{
>>> + struct task_struct *p = get_task_struct(wq_entry->private);
>>> + bool reader = wq_entry->flags & WQ_FLAG_CUSTOM;
>>> + struct percpu_rw_semaphore *sem = key;
>>> +
>>> + /* concurrent against percpu_down_write(), can get stolen */
>>> + if (!__percpu_rwsem_trylock(sem, reader))
>>> + return 1;
>>> +
>>> + list_del_init(&wq_entry->entry);
>>> + smp_store_release(&wq_entry->private, NULL);
>>> +
>>> + wake_up_process(p);
>>> + put_task_struct(p);
>>> +
>>> + return !reader; /* wake 'all' readers and 1 writer */
>>> +}
>>> +
>> If I read the function correctly, you are setting the WQ_FLAG_EXCLUSIVE
>> for both readers and writers and __wake_up() is called with an exclusive
>> count of one. So only one reader or writer is woken up each time.
> This depends on what percpu_rwsem_wake_function() returns. If it returns 1,
> __wake_up_common() stops, exactly because all waiters have WQ_FLAG_EXCLUSIVE.
>
>> However, the comment above said we wake 'all' readers and 1 writer. That
>> doesn't match the actual code, IMO.
> Well, "'all' readers" probably means "all readers before writer",
>
>> To match the comments, you should
>> have set WQ_FLAG_EXCLUSIVE flag only on writer. In this case, you
>> probably don't need WQ_FLAG_CUSTOM to differentiate between readers and
>> writers.
> See above...
>
> note also the
>
> if (!__percpu_rwsem_trylock(sem, reader))
> return 1;
>
> at the start of percpu_rwsem_wake_function(). We want to stop wake_up_common()
> as soon as percpu_rwsem_trylock() fails. Because we know that if it fails once
> it can't succeed later. Although iiuc this can only happen if another (new)
> writer races with __wake_up(&sem->waiters).
>
>
> I guess WQ_FLAG_CUSTOM can be avoided, percpu_rwsem_wait() could do
>
> if (read)
> __add_wait_queue_entry_tail(...);
> else {
> wq_entry.flags |= WQ_FLAG_EXCLUSIVE;
> __add_wait_queue(...);
> }
>
> but this is "unfair".

Thanks for the explanation. That clarifies my understanding of the patch.

Cheers,
Longman