The problem we're trying to solve is a lock-free walk of
/proc/$pid/maps. If the process is modifying the VMAs at the same time
the reader is walking them, it can see garbage. For page faults, we
handle this by taking the mmap_lock for read and retrying the page fault
(excluding any further modifications).
We don't want to take that approach for the maps file. The monitoring
task may have a significantly lower process priority, and so taking
the mmap_lock for read can block it for a significant period of time.
The obvious answer is to do some kind of backoff+sleep. But we already
have a wait queue, so why not use it?
I haven't done the rwbase version; this is just a demonstration of what
we could do. It's also untested other than by compilation. It might
well be missing something.
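For illustration, a caller might look roughly like the sketch below. This is
entirely hypothetical and not part of the patch: mm_wr_seq and
walk_vmas_locklessly() are made-up names standing in for whatever detection
mechanism and VMA walk the maps code ends up using; only rwsem_wait_killable()
and mm->mmap_lock are real here.

	/* Hypothetical lockless maps reader: if a writer is active, wait
	 * for it to drop mmap_lock instead of taking the lock for read. */
	static int read_maps_locklessly(struct mm_struct *mm)
	{
		unsigned long seq;
		int ret;

	retry:
		seq = smp_load_acquire(&mm->mm_wr_seq);	/* assumed counter */
		if (seq & 1) {
			/* A writer is active: wait it out without blocking it. */
			ret = rwsem_wait_killable(&mm->mmap_lock);
			if (ret)
				return ret;
			goto retry;
		}

		walk_vmas_locklessly(mm);	/* placeholder for the real walk */

		smp_rmb();	/* re-check the assumed counter after the walk */
		if (READ_ONCE(mm->mm_wr_seq) != seq)
			goto retry;	/* a writer ran meanwhile; start over */
		return 0;
	}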
Signed-off-by: Matthew Wilcox (Oracle) <[email protected]>
---
include/linux/rwsem.h | 6 +++
kernel/locking/rwsem.c | 104 ++++++++++++++++++++++++++++++++++++++++-
2 files changed, 108 insertions(+), 2 deletions(-)
diff --git a/include/linux/rwsem.h b/include/linux/rwsem.h
index 4f1c18992f76..e7bf9dfc471a 100644
--- a/include/linux/rwsem.h
+++ b/include/linux/rwsem.h
@@ -250,6 +250,12 @@ DEFINE_GUARD_COND(rwsem_write, _try, down_write_trylock(_T))
*/
extern void downgrade_write(struct rw_semaphore *sem);
+/*
+ * wait for current writer to be finished
+ */
+void rwsem_wait(struct rw_semaphore *sem);
+int __must_check rwsem_wait_killable(struct rw_semaphore *sem);
+
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/*
* nested locking. NOTE: rwsems are not allowed to recurse
diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index 2340b6d90ec6..7c8096c5586f 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -332,7 +332,8 @@ EXPORT_SYMBOL(__init_rwsem);
enum rwsem_waiter_type {
RWSEM_WAITING_FOR_WRITE,
- RWSEM_WAITING_FOR_READ
+ RWSEM_WAITING_FOR_READ,
+ RWSEM_WAITING_FOR_RELEASE,
};
struct rwsem_waiter {
@@ -511,7 +512,8 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
if (waiter->type == RWSEM_WAITING_FOR_WRITE)
continue;
- woken++;
+ if (waiter->type == RWSEM_WAITING_FOR_READ)
+ woken++;
list_move_tail(&waiter->list, &wlist);
/*
@@ -1401,6 +1403,67 @@ static inline void __downgrade_write(struct rw_semaphore *sem)
preempt_enable();
}
+static inline int __wait_read_common(struct rw_semaphore *sem, int state)
+{
+ int ret = 0;
+ long adjustment = 0;
+ struct rwsem_waiter waiter;
+ DEFINE_WAKE_Q(wake_q);
+
+ waiter.task = current;
+ waiter.type = RWSEM_WAITING_FOR_RELEASE;
+ waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
+ waiter.handoff_set = false;
+
+ preempt_disable();
+ raw_spin_lock_irq(&sem->wait_lock);
+ if (list_empty(&sem->wait_list)) {
+ if (!(atomic_long_read(&sem->count) & RWSEM_WRITER_MASK)) {
+ /* Provide lock ACQUIRE */
+ smp_acquire__after_ctrl_dep();
+ raw_spin_unlock_irq(&sem->wait_lock);
+ goto done;
+ }
+ adjustment = RWSEM_FLAG_WAITERS;
+ }
+ rwsem_add_waiter(sem, &waiter);
+ if (adjustment) {
+ long count = atomic_long_add_return(adjustment, &sem->count);
+ rwsem_cond_wake_waiter(sem, count, &wake_q);
+ }
+ raw_spin_unlock_irq(&sem->wait_lock);
+
+ if (!wake_q_empty(&wake_q))
+ wake_up_q(&wake_q);
+
+ for (;;) {
+ set_current_state(state);
+ if (!smp_load_acquire(&waiter.task)) {
+ /* Matches rwsem_mark_wake()'s smp_store_release(). */
+ break;
+ }
+ if (signal_pending_state(state, current)) {
+ raw_spin_lock_irq(&sem->wait_lock);
+ if (waiter.task)
+ goto out_nolock;
+ raw_spin_unlock_irq(&sem->wait_lock);
+ /* Ordered by sem->wait_lock against rwsem_mark_wake(). */
+ break;
+ }
+ schedule_preempt_disabled();
+ }
+
+ __set_current_state(TASK_RUNNING);
+done:
+ preempt_enable();
+ return ret;
+out_nolock:
+ rwsem_del_wake_waiter(sem, &waiter, &wake_q);
+ __set_current_state(TASK_RUNNING);
+ ret = -EINTR;
+ goto done;
+}
+
#else /* !CONFIG_PREEMPT_RT */
#define RT_MUTEX_BUILD_MUTEX
@@ -1500,6 +1563,11 @@ static inline void __downgrade_write(struct rw_semaphore *sem)
rwbase_write_downgrade(&sem->rwbase);
}
+static inline int __wait_read_killable(struct rw_semaphore *sem)
+{
+ return rwbase_wait_lock(&sem->rwbase, TASK_KILLABLE);
+}
+
/* Debug stubs for the common API */
#define DEBUG_RWSEMS_WARN_ON(c, sem)
@@ -1643,6 +1711,38 @@ void downgrade_write(struct rw_semaphore *sem)
}
EXPORT_SYMBOL(downgrade_write);
+/**
+ * rwsem_wait_killable - Wait for current write lock holder to release lock
+ * @sem: The semaphore to wait on.
+ *
+ * This is equivalent to calling down_read(); up_read() but avoids the
+ * possibility that the thread will be preempted while holding the lock
+ * causing threads that want to take the lock for writes to block. The
+ * intended use case is for lockless readers who notice an inconsistent
+ * state and want to wait for the current writer to finish.
+ */
+int rwsem_wait_killable(struct rw_semaphore *sem)
+{
+ might_sleep();
+
+ rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);
+ rwsem_release(&sem->dep_map, _RET_IP_);
+
+ return __wait_read_common(sem, TASK_KILLABLE);
+}
+EXPORT_SYMBOL(rwsem_wait_killable);
+
+void rwsem_wait(struct rw_semaphore *sem)
+{
+ might_sleep();
+
+ rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);
+ rwsem_release(&sem->dep_map, _RET_IP_);
+
+ __wait_read_common(sem, TASK_UNINTERRUPTIBLE);
+}
+EXPORT_SYMBOL(rwsem_wait);
+
#ifdef CONFIG_DEBUG_LOCK_ALLOC
void down_read_nested(struct rw_semaphore *sem, int subclass)
--
2.43.0
On 1/9/24 12:12, Matthew Wilcox wrote:
> The problem we're trying to solve is a lock-free walk of
> /proc/$pid/maps. If the process is modifying the VMAs at the same time
> the reader is walking them, it can see garbage. For page faults, we
> handle this by taking the mmap_lock for read and retrying the page fault
> (excluding any further modifications).
>
> We don't want to take that approach for the maps file. The monitoring
> task may have a significantly lower process priority, and so taking
> the mmap_lock for read can block it for a significant period of time.
> The obvious answer is to do some kind of backoff+sleep. But we already
> have a wait queue, so why not use it?
>
> I haven't done the rwbase version; this is just a demonstration of what
> we could do. It's also untested other than by compilation. It might
> well be missing something.
It is not clear what exactly the purpose of this new API is. Are you
just waiting in the rwsem wait queue until the waiter gets woken up,
without taking a read or write lock? I see two issues at the moment.
1) The handoff processing should exclude the new
RWSEM_WAITING_FOR_RELEASE waiter type.
2) If the rwsem is free, it should call rwsem_wake() again to wake up
the next waiter, as is done in up_write().
Cheers,
Longman
On Tue, Jan 09, 2024 at 04:04:08PM -0500, Waiman Long wrote:
> On 1/9/24 12:12, Matthew Wilcox wrote:
> > The problem we're trying to solve is a lock-free walk of
> > /proc/$pid/maps. If the process is modifying the VMAs at the same time
> > the reader is walking them, it can see garbage. For page faults, we
> > handle this by taking the mmap_lock for read and retrying the page fault
> > (excluding any further modifications).
> >
> > We don't want to take that approach for the maps file. The monitoring
> > task may have a significantly lower process priority, and so taking
> > the mmap_lock for read can block it for a significant period of time.
> > The obvious answer is to do some kind of backoff+sleep. But we already
> > have a wait queue, so why not use it?
> >
> > I haven't done the rwbase version; this is just a demonstration of what
> > we could do. It's also untested other than by compilation. It might
> > well be missing something.
>
> It is not clear what exactly is the purpose of this new API. Are you just
... really? I wrote it out in the part you quoted, and I wrote it out
differently in the kernel-doc for the function:
+ * rwsem_wait_killable - Wait for current write lock holder to release lock
+ * @sem: The semaphore to wait on.
+ *
+ * This is equivalent to calling down_read(); up_read() but avoids the
+ * possibility that the thread will be preempted while holding the lock
+ * causing threads that want to take the lock for writes to block. The
+ * intended use case is for lockless readers who notice an inconsistent
+ * state and want to wait for the current writer to finish.
Something I forgot to add was that we only guarantee that _a_ writer
finished; another writer may have the lock when the function returns.
> waiting in the rwsem wait queue until it gets waken up without taking a read
> or write lock? I see two issues at the moment.
>
> 1) The handoff processing should exclude the new RWSEM_WAITING_FOR_RELEASE
> waiter types.
Hmm. I thought I'd done that by only incrementing 'woken' for
RWSEM_WAITING_FOR_READ types.
> 2) If the rwsem is free, it should call rwsem_wake() again to wake up the
> next waiter, like what is being done in up_write().
because the wait queue might have a waiter followed by a writer? I
think calling rwsem_wake() again is probably a bad idea as it will
defeat the MAX_READERS_WAKEUP limit. Probably rwsem_mark_wake()
needs to handle that case itself; maybe something like this?
+++ b/kernel/locking/rwsem.c
@@ -419,6 +419,7 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
lockdep_assert_held(&sem->wait_lock);
+again:
/*
* Take a peek at the queue head waiter such that we can determine
* the wakeup(s) to perform.
@@ -542,6 +543,12 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
*/
if (oldcount & RWSEM_FLAG_HANDOFF)
adjustment -= RWSEM_FLAG_HANDOFF;
+ } else {
+ /*
+ * Everybody we woke was a waiter, not a reader. Wake the
+ * first writer instead.
+ */
+ goto again;
}
if (adjustment)
On 1/9/24 16:42, Matthew Wilcox wrote:
> On Tue, Jan 09, 2024 at 04:04:08PM -0500, Waiman Long wrote:
>> On 1/9/24 12:12, Matthew Wilcox wrote:
>>> The problem we're trying to solve is a lock-free walk of
>>> /proc/$pid/maps. If the process is modifying the VMAs at the same time
>>> the reader is walking them, it can see garbage. For page faults, we
>>> handle this by taking the mmap_lock for read and retrying the page fault
>>> (excluding any further modifications).
>>>
>>> We don't want to take that approach for the maps file. The monitoring
>>> task may have a significantly lower process priority, and so taking
>>> the mmap_lock for read can block it for a significant period of time.
>>> The obvious answer is to do some kind of backoff+sleep. But we already
>>> have a wait queue, so why not use it?
>>>
>>> I haven't done the rwbase version; this is just a demonstration of what
>>> we could do. It's also untested other than by compilation. It might
>>> well be missing something.
>> It is not clear what exactly is the purpose of this new API. Are you just
> ... really? I wrote it out in the part you quoted, and I wrote it out
> differently in the kernel-doc for the function:
>
> + * rwsem_wait_killable - Wait for current write lock holder to release lock
> + * @sem: The semaphore to wait on.
> + *
> + * This is equivalent to calling down_read(); up_read() but avoids the
> + * possibility that the thread will be preempted while holding the lock
> + * causing threads that want to take the lock for writes to block. The
> + * intended use case is for lockless readers who notice an inconsistent
> + * state and want to wait for the current writer to finish.
>
> Something I forgot to add was that we only guarantee that _a_ writer
> finished; another writer may have the lock when the function returns.
OK, I focused on the commit log and it didn't mention that. I also
looked at __wait_read_common() but hadn't paid much attention to the
other wrappers.
BTW, how does a lockless reader notice an inconsistent state? Will
something like a write sequence count help? Though that will require
increasing the size of the rwsem.
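To sketch the idea, the counter could live in a wrapper next to the rwsem
rather than inside it (purely illustrative; the struct and helper names are
invented, and putting the counter inside struct rw_semaphore itself is what
would grow the rwsem):

	struct guarded_rwsem {
		struct rw_semaphore	sem;
		unsigned long		wr_seq;	/* even: no writer, odd: writer active */
	};

	static void guarded_down_write(struct guarded_rwsem *g)
	{
		down_write(&g->sem);
		WRITE_ONCE(g->wr_seq, g->wr_seq + 1);	/* becomes odd */
		smp_wmb();	/* publish "writer active" before the data changes */
	}

	static void guarded_up_write(struct guarded_rwsem *g)
	{
		smp_wmb();	/* publish the data changes before "writer done" */
		WRITE_ONCE(g->wr_seq, g->wr_seq + 1);	/* becomes even */
		up_write(&g->sem);
	}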
>
>> waiting in the rwsem wait queue until it gets waken up without taking a read
>> or write lock? I see two issues at the moment.
>>
>> 1) The handoff processing should exclude the new RWSEM_WAITING_FOR_RELEASE
>> waiter types.
> Hmm. I thought I'd done that by only incrementing 'woken' for
> RWSEM_WAITING_FOR_READ types.
Some minor change is needed in case the RWSEM_WAITING_FOR_RELEASE waiter
is the first one in the queue to be woken up.
>
>> 2) If the rwsem is free, it should call rwsem_wake() again to wake up the
>> next waiter, like what is being done in up_write().
> because the wait queue might have a waiter followed by a writer? I
> think calling rwsem_wake() again is probably a bad idea as it will
> defeat the MAX_READERS_WAKEUP limit. Probably rwsem_mark_wake()
> needs to handle that case itself; maybe something like this?
I am talking about the case where the new waiter is the only one to be
woken up and the rwsem has no reader or writer owner.
I also think that __wait_read_common() should be merged with
rwsem_down_read_slowpath() in some way to minimize duplicated code.
>
> +++ b/kernel/locking/rwsem.c
> @@ -419,6 +419,7 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
>
> lockdep_assert_held(&sem->wait_lock);
>
> +again:
> /*
> * Take a peek at the queue head waiter such that we can determine
> * the wakeup(s) to perform.
> @@ -542,6 +543,12 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
> */
> if (oldcount & RWSEM_FLAG_HANDOFF)
> adjustment -= RWSEM_FLAG_HANDOFF;
> + } else {
> + /*
> + * Everybody we woke was a waiter, not a reader. Wake the
> + * first writer instead.
> + */
> + goto again;
> }
>
> if (adjustment)
That will probably work.
Cheers,
Longman
On Tue, Jan 09, 2024 at 05:12:06PM +0000, Matthew Wilcox wrote:
> The problem we're trying to solve is a lock-free walk of
> /proc/$pid/maps. If the process is modifying the VMAs at the same time
> the reader is walking them, it can see garbage. For page faults, we
> handle this by taking the mmap_lock for read and retrying the page fault
> (excluding any further modifications).
>
> We don't want to take that approach for the maps file. The monitoring
> task may have a significantly lower process priority, and so taking
> the mmap_lock for read can block it for a significant period of time.
> The obvious answer is to do some kind of backoff+sleep. But we already
> have a wait queue, so why not use it?
>
> I haven't done the rwbase version; this is just a demonstration of what
> we could do. It's also untested other than by compilation. It might
> well be missing something.
>
> Signed-off-by: Matthew Wilcox (Oracle) <[email protected]>
At first glance, this is good and sufficient for this use case.
I do have one question that would be important if anyone were to want
to rely on the "This is equivalent to calling down_read(); up_read()"
statement in the header comment; please see below.
Thanx, Paul
> ---
> include/linux/rwsem.h | 6 +++
> kernel/locking/rwsem.c | 104 ++++++++++++++++++++++++++++++++++++++++-
> 2 files changed, 108 insertions(+), 2 deletions(-)
>
> diff --git a/include/linux/rwsem.h b/include/linux/rwsem.h
> index 4f1c18992f76..e7bf9dfc471a 100644
> --- a/include/linux/rwsem.h
> +++ b/include/linux/rwsem.h
> @@ -250,6 +250,12 @@ DEFINE_GUARD_COND(rwsem_write, _try, down_write_trylock(_T))
> */
> extern void downgrade_write(struct rw_semaphore *sem);
>
> +/*
> + * wait for current writer to be finished
> + */
> +void rwsem_wait(struct rw_semaphore *sem);
> +int __must_check rwsem_wait_killable(struct rw_semaphore *sem);
> +
> #ifdef CONFIG_DEBUG_LOCK_ALLOC
> /*
> * nested locking. NOTE: rwsems are not allowed to recurse
> diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
> index 2340b6d90ec6..7c8096c5586f 100644
> --- a/kernel/locking/rwsem.c
> +++ b/kernel/locking/rwsem.c
> @@ -332,7 +332,8 @@ EXPORT_SYMBOL(__init_rwsem);
>
> enum rwsem_waiter_type {
> RWSEM_WAITING_FOR_WRITE,
> - RWSEM_WAITING_FOR_READ
> + RWSEM_WAITING_FOR_READ,
> + RWSEM_WAITING_FOR_RELEASE,
> };
>
> struct rwsem_waiter {
> @@ -511,7 +512,8 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
> if (waiter->type == RWSEM_WAITING_FOR_WRITE)
> continue;
>
> - woken++;
> + if (waiter->type == RWSEM_WAITING_FOR_READ)
> + woken++;
> list_move_tail(&waiter->list, &wlist);
>
> /*
> @@ -1401,6 +1403,67 @@ static inline void __downgrade_write(struct rw_semaphore *sem)
> preempt_enable();
> }
>
> +static inline int __wait_read_common(struct rw_semaphore *sem, int state)
> +{
> + int ret = 0;
> + long adjustment = 0;
> + struct rwsem_waiter waiter;
> + DEFINE_WAKE_Q(wake_q);
> +
> + waiter.task = current;
> + waiter.type = RWSEM_WAITING_FOR_RELEASE;
> + waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
> + waiter.handoff_set = false;
> +
> + preempt_disable();
> + raw_spin_lock_irq(&sem->wait_lock);
> + if (list_empty(&sem->wait_list)) {
> + if (!(atomic_long_read(&sem->count) & RWSEM_WRITER_MASK)) {
> + /* Provide lock ACQUIRE */
> + smp_acquire__after_ctrl_dep();
> + raw_spin_unlock_irq(&sem->wait_lock);
> + goto done;
If we take this path, we are ordered against the prior writer's release
courtesy of the acquire ordering on ->count. But we are not ordered
against the next writer's acquisition if that writer takes the fastpath
because rwsem_write_trylock() only does acquire semantics.
Again, this does not matter for your use case, and it all just works on
strongly ordered systems such as x86.
Assuming I am not just confused here, as far as I am concerned, this could
be fixed by adjusting the guarantees in the rwsem_wait_killable() function's
header comment.
But it might be good to avoid the sharp edges that weakening that
guarantee would introduce.
To that end, I -think- that a fix that would save that header
comment's current wording would insert an smp_mb() before the above
atomic_long_read(), but I could easily be wrong. Plus there might well
need to be similar adjustments later in the code. (I don't immediately
see any, but it has been a good long while since I have stared at
this code.)
Thoughts from people more familiar with this code?
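For concreteness, that placement would look roughly like this in
__wait_read_common() -- just a restatement of the suggestion above, not a
verified fix:

	if (list_empty(&sem->wait_list)) {
		/*
		 * Suggested (untested) adjustment: a full barrier before
		 * sampling sem->count.  Whether this is sufficient, or
		 * needed at all, is exactly the open question above.
		 */
		smp_mb();
		if (!(atomic_long_read(&sem->count) & RWSEM_WRITER_MASK)) {
			/* Provide lock ACQUIRE */
			smp_acquire__after_ctrl_dep();
			raw_spin_unlock_irq(&sem->wait_lock);
			goto done;
		}
		adjustment = RWSEM_FLAG_WAITERS;
	}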
On Tue, Jan 9, 2024 at 9:12 AM Matthew Wilcox <[email protected]> wrote:
>
> The problem we're trying to solve is a lock-free walk of
> /proc/$pid/maps. If the process is modifying the VMAs at the same time
> the reader is walking them, it can see garbage. For page faults, we
> handle this by taking the mmap_lock for read and retrying the page fault
> (excluding any further modifications).
>
> We don't want to take that approach for the maps file. The monitoring
> task may have a significantly lower process priority, and so taking
> the mmap_lock for read can block it for a significant period of time.
> The obvious answer is to do some kind of backoff+sleep. But we already
> have a wait queue, so why not use it?
>
> I haven't done the rwbase version; this is just a demonstration of what
> we could do. It's also untested other than by compilation. It might
> well be missing something.
I just posted an RFC for lock-less /proc/$pid/maps reading at [1]. The
rwsem_wait() function proposed by Matthew here would be useful in that
patchset to replace the mmap_read_lock()/mmap_read_unlock() sequence I
have to use to wait for the mmap_lock writer to finish.
[1] https://lore.kernel.org/all/[email protected]/
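Roughly, the substitution would be the following (a sketch based on the
description above, not code taken from the RFC):

	/* What the RFC currently does: take and drop mmap_lock for read,
	 * purely to wait for an active writer to finish. */
	mmap_read_lock(mm);
	mmap_read_unlock(mm);

	/* With this patch: wait without ever holding the lock.  Per the
	 * caveat upthread, only _a_ writer is guaranteed to have finished,
	 * so the caller still re-validates and retries as needed. */
	rwsem_wait(&mm->mmap_lock);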