2014-04-09 19:19:28

by Steven Rostedt

Subject: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

A while back I wrote a patch that allowed reader/writer locks such as
rwlocks and rwsems to have multiple readers in PREEMPT_RT. It was
slick and fast, but unfortunately it was way too complex and riddled
with nasty little critters, which earned me my large collection of
frozen sharks in the fridge (which are quite tasty).

The main problem with my previous solution was that I tried to be too
clever. I worked hard on keeping the "fast path" in the rw mutex: the
single cmpxchg that lets an uncontended task grab the lock in one
instruction and be off with it. But getting that working required lots
of tricks and black magic that were certain to fail. Thus, with sharks
raining on my parade, the priority inheritance mutex with multiple
owners died a slow, painful death.

So we thought.
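The "fast path" above means the uncontended case is settled by a single
atomic compare-and-exchange on the lock's owner field. A minimal
user-space sketch of the idea, assuming C11 <stdatomic.h>; struct task,
struct fast_mutex and the fast_* functions are hypothetical stand-ins,
not the rtmutex code (which also packs a waiters bit into the owner word
and has a full slow path behind it):

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for struct task_struct. */
struct task {
	int id;
};

/* Hypothetical lock: the owner pointer is NULL while the lock is free. */
struct fast_mutex {
	_Atomic(struct task *) owner;
};

/*
 * Lock fast path: one compare-and-exchange from NULL to the caller.
 * false means the lock is contended; a real implementation would then
 * fall back to its slow path (not modeled here).
 */
static bool fast_trylock(struct fast_mutex *m, struct task *me)
{
	struct task *expected = NULL;

	return atomic_compare_exchange_strong(&m->owner, &expected, me);
}

/* Unlock fast path: succeeds only if the caller is still the owner. */
static bool fast_unlock(struct fast_mutex *m, struct task *me)
{
	struct task *expected = me;

	return atomic_compare_exchange_strong(&m->owner, &expected, NULL);
}
```

Everything clever about the old multi-reader patch came from trying to
keep this one-instruction shortcut while tracking many owners at once.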

But over the years, a new darkness was on the horizon. Complaints came
in that highly threaded processes (did I hear Java?) were suffering
huge performance hits on the PREEMPT_RT kernel. Whether or not the
processes were real-time tasks, they performed horribly compared to
running the same tasks on the mainline kernel. Note, this was being
done on machines with many CPUs.

The main culprit was a single rwsem, the notorious mmap_sem, which
can be taken several times for read. On RT an rwsem is just a single
mutex, so it serializes these read-side accesses, which does not
happen on mainline.

I looked back at my poor dead rw multi-reader PI patch and thought to
myself: "How complex would this be if I removed the 'fast path' from
the code?" I decided to build a new tower in Mordor.

I feel that I was correct. By removing the fast path and requiring all
accesses to the rwsem to go through the slow path (everything must take
the wait_lock), the code really wasn't that bad. I also focused only on
the rwsem and did not worry about rwlocks, as they haven't been pointed
out as a bottleneck yet. If they do turn out to be one, this code could
easily work on rwlocks too.

I'm much more confident in this code than I was in my previous
version of the rwlock multi-reader patch. I added a bunch of comments
to this code to explain how things interact. The writer unlock is
still able to use the fast path, as writers are pretty much like a
normal mutex. Too bad the writer unlock is not a major point of
contention.
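The writer-unlock fast path stays safe because a task that is about to
block (the patch has readers call mark_rt_mutex_waiters()) sets a
waiters bit in the owner word, so the writer's single cmpxchg succeeds
only when nobody is queued. A minimal user-space sketch, assuming C11
atomics, a hypothetical owner word whose bit 0 plays the role of
RT_MUTEX_HAS_WAITERS, and owners represented by even integer values;
struct owner_word, mark_waiters() and writer_fast_unlock() are
illustrative names, and the slow path is not modeled:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical: bit 0 of the owner word means "has waiters". */
#define HAS_WAITERS ((uintptr_t)1)

/* Hypothetical owner word: 0 when free, else the owner's (even) value. */
struct owner_word {
	_Atomic uintptr_t owner;
};

/* A task about to block sets the waiters bit before sleeping. */
static void mark_waiters(struct owner_word *w)
{
	atomic_fetch_or(&w->owner, HAS_WAITERS);
}

/*
 * Writer unlock fast path: one cmpxchg from "owned by me, no waiters"
 * to 0. If a waiter set HAS_WAITERS, the compare fails and the caller
 * must take the slow path instead (grab wait_lock, wake the top waiter).
 */
static bool writer_fast_unlock(struct owner_word *w, uintptr_t me)
{
	uintptr_t expected = me;

	return atomic_compare_exchange_strong(&w->owner, &expected,
					      (uintptr_t)0);
}
```

Because the bit stays set while anyone is queued, a writer that finds
waiters drops into the slow path and wakes the top waiter; per the
comment in rt_up_write(), a woken reader then does the work of waking
any further readers.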

This patch is built on top of the two other patches that I posted
earlier, which should not be as controversial.

If you have any benchmarks on large machines, I would be very happy if
you could test this patch against the unpatched version of -rt.

Cheers,

-- Steve

Signed-off-by: Steven Rostedt <[email protected]>
---
Index: linux-rt.git/kernel/rtmutex.c
===================================================================
--- linux-rt.git.orig/kernel/rtmutex.c
+++ linux-rt.git/kernel/rtmutex.c
@@ -26,6 +26,15 @@
#include "rtmutex_common.h"

/*
+ * rt_rw_limit is the maximum number of simultaneous readers of a rwsem lock.
+ *
+ * rt_rw_limit gets updated on boot up to the number of
+ * possible CPUs, but we need to initialize it to something other
+ * than zero.
+ */
+unsigned rt_rw_limit = NR_CPUS;
+
+/*
* lock->owner state tracking:
*
* lock->owner holds the task_struct pointer of the owner. Bit 0
@@ -110,19 +119,44 @@ static inline void init_lists(struct rt_
plist_head_init(&lock->wait_list);
}

+static inline void init_rw_lists(struct rt_rw_mutex *rwlock)
+{
+ struct rt_mutex *lock = &rwlock->mutex;
+
+ /*
+ * A rwsem priority is initialized to -1 and never will
+ * be that again.
+ */
+ if (unlikely(rwlock->prio < 0)) {
+ rwlock->prio = MAX_PRIO;
+ init_lists(lock);
+ }
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio);
+static inline int task_has_reader_locks(struct task_struct *task);
+
/*
* Calculate task priority from the waiter list priority
*
* Return task->normal_prio when the waiter list is empty or when
* the waiter is not allowed to do priority boosting
+ *
+ * On PREEMPT_RT, we also check the priorities of the list
+ * of read locks that the task holds.
*/
int rt_mutex_getprio(struct task_struct *task)
{
- if (likely(!task_has_pi_waiters(task)))
- return task->normal_prio;
+ int prio = task->normal_prio;
+
+ if (likely(!task_has_pi_waiters(task) &&
+ !task_has_reader_locks(task)))
+ return prio;
+
+ if (task_has_reader_locks(task))
+ prio = rt_mutex_get_readers_prio(task, prio);

- return min(task_top_pi_waiter(task)->pi_list_entry.prio,
- task->normal_prio);
+ return task_has_pi_waiters(task) ? min(task_top_pi_waiter(task)->pi_list_entry.prio, prio) : prio;
}

/*
@@ -181,6 +215,11 @@ static void rt_mutex_wake_waiter(struct
*/
int max_lock_depth = 1024;

+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth);
/*
* Adjust the priority chain. Also used for deadlock detection.
* Decreases task's usage by one - may thus free the task.
@@ -203,7 +242,8 @@ static int rt_mutex_adjust_prio_chain(st
int deadlock_detect,
struct rt_mutex *orig_lock,
struct rt_mutex_waiter *orig_waiter,
- struct task_struct *top_task)
+ struct task_struct *top_task,
+ int recursion_depth)
{
struct rt_mutex *lock;
struct rt_mutex_waiter *waiter, *top_waiter = orig_waiter;
@@ -316,6 +356,18 @@ static int rt_mutex_adjust_prio_chain(st

/* Grab the next task */
task = rt_mutex_owner(lock);
+
+ /*
+ * Readers are special. We may need to boost more than one owner.
+ */
+ if (unlikely(task == RT_RW_READER)) {
+ ret = rt_mutex_adjust_readers(orig_lock, orig_waiter,
+ top_task, lock,
+ recursion_depth);
+ raw_spin_unlock(&lock->wait_lock);
+ goto out;
+ }
+
get_task_struct(task);
raw_spin_lock_irqsave(&task->pi_lock, flags);

@@ -349,7 +401,7 @@ static int rt_mutex_adjust_prio_chain(st
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
out_put_task:
put_task_struct(task);
-
+ out:
return ret;
}

@@ -518,6 +570,13 @@ static int task_blocks_on_rt_mutex(struc
return 0;

if (waiter == rt_mutex_top_waiter(lock)) {
+ /* readers are handled differently */
+ if (unlikely(owner == RT_RW_READER)) {
+ res = rt_mutex_adjust_readers(lock, waiter,
+ current, lock, 0);
+ return res;
+ }
+
raw_spin_lock_irqsave(&owner->pi_lock, flags);
plist_del(&top_waiter->pi_list_entry, &owner->pi_waiters);
plist_add(&waiter->pi_list_entry, &owner->pi_waiters);
@@ -527,7 +586,8 @@ static int task_blocks_on_rt_mutex(struc
chain_walk = 1;
raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
}
- else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock))
+ else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock) &&
+ owner != RT_RW_READER)
chain_walk = 1;

if (!chain_walk)
@@ -543,7 +603,7 @@ static int task_blocks_on_rt_mutex(struc
raw_spin_unlock(&lock->wait_lock);

res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
- task);
+ task, 0);

raw_spin_lock(&lock->wait_lock);

@@ -633,7 +693,7 @@ static void remove_waiter(struct rt_mute

raw_spin_unlock(&lock->wait_lock);

- rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current);
+ rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current, 0);

raw_spin_lock(&lock->wait_lock);
}
@@ -660,7 +720,7 @@ void rt_mutex_adjust_pi(struct task_stru
/* gets dropped in rt_mutex_adjust_prio_chain()! */
get_task_struct(task);
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
- rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
+ rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task, 0);
}

#ifdef CONFIG_PREEMPT_RT_FULL
@@ -739,7 +799,7 @@ static void noinline __sched rt_spin_lo
struct rt_mutex_waiter waiter, *top_waiter;
int ret;

- rt_mutex_init_waiter(&waiter, true);
+ rt_mutex_init_waiter(&waiter, true, false);

raw_spin_lock(&lock->wait_lock);
init_lists(lock);
@@ -1001,15 +1061,564 @@ __mutex_lock_check_stamp(struct rt_mutex

return 0;
}
+
+/*
+ * Wake up the next waiter on a rw lock.
+ *
+ * Similar to wakeup_next_waiter() but the waiter is not on
+ * the owner's pi_waiters. Also, it does not reset the lock
+ * owner.
+ *
+ * Called with lock->wait_lock held.
+ */
+static void wakeup_next_rw_waiter(struct rt_mutex *lock)
+{
+ struct rt_mutex_waiter *waiter;
+
+ waiter = rt_mutex_top_waiter(lock);
+ rt_mutex_wake_waiter(waiter);
+}
+
+/* Called with rwmutex->mutex.wait_lock held */
+static inline void
+rt_rw_mutex_take_as_reader(struct task_struct *task,
+ struct rt_rw_mutex *rwmutex,
+ struct rt_mutex_waiter *waiter)
+{
+ struct reader_lock_struct *rls;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ unsigned long flags;
+ bool wakeup = false;
+
+ rwmutex->nr_owners++;
+ rt_mutex_set_owner(&rwmutex->mutex, RT_RW_READER);
+
+ if (waiter || rt_mutex_has_waiters(mutex)) {
+ unsigned long flags;
+ struct rt_mutex_waiter *top;
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+
+ /* remove the queued waiter. */
+ if (waiter) {
+ plist_del(&waiter->list_entry, &mutex->wait_list);
+ task->pi_blocked_on = NULL;
+ }
+
+ /*
+ * Initialize the rwmutex prio to the priority of
+ * the top waiter.
+ */
+ if (rt_mutex_has_waiters(mutex)) {
+ top = rt_mutex_top_waiter(mutex);
+ top->pi_list_entry.prio = top->list_entry.prio;
+ /*
+ * Readers set the lock priority for faster access
+ * to read the priorities of the locks it owns
+ * when boosting. This helps to not have to take
+ * the pi_lock of the task. The rwmutex->prio
+ * is protected by the rwmutex->mutex.wait_lock,
+ * which is held during boosting.
+ */
+ rwmutex->prio = top->list_entry.prio;
+
+ /*
+ * If this waiter is a reader, and the reader limit
+ * has not been hit, then we can wake this waiter
+ * up too.
+ */
+ if (!top->writer && rwmutex->nr_owners < rt_rw_limit)
+ wakeup = true;
+ } else
+ rwmutex->prio = MAX_PRIO;
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ /*
+ * It is possible to have holes in the owned_read_locks array.
+ * If we take read lock A and then B, but then release A, we
+ * can't move the pointer of B because if something blocks on
+ * B, it can use the B pointer to boost this task. B can only
+ * be moved by holding the wait_lock of B. Remember, the
+ * B lock has its pointers to that index of our array.
+ */
+ rls = &task->owned_read_locks[task->reader_lock_free];
+ BUG_ON(rls->lock);
+
+ /*
+ * Grabbing the pi_lock here as other tasks can boost this
+ * lock via other held locks, and if free lock descriptor
+ * was not at the end of the owned_read_locks array, then
+ * it might get confused if it sees this descriptor with
+ * a lock and no task.
+ */
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ rls->lock = rwmutex;
+ rls->task = task;
+ list_add(&rls->list, &rwmutex->owners);
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ if (task->reader_lock_free == task->reader_lock_count) {
+ /*
+ * If we nest too deep, then this task can never get the lock.
+ * This task will then block for good. Warn about this and
+ * hopefully, people will notice the warning. If not, they will notice
+ * the dead task. Maybe this should be a BUG_ON()
+ */
+ if (WARN_ON(task->reader_lock_count == MAX_RWLOCK_DEPTH))
+ return;
+
+ /*
+ * We don't need to take the pi_lock here as we have the
+ * wait_lock of the lock at the end of the list. If this task
+ * was being boosted by a task blocked on this lock, it would
+ * need to grab the wait_lock before boosting this task.
+ */
+ task->reader_lock_count++;
+ task->reader_lock_free++;
+ } else {
+ /*
+ * Find the next free lock in array. Again, we do not need
+ * to grab the pi_lock because the boosting doesn't use
+ * the reader_lock_free variable, which is the only thing
+ * we are updating here.
+ */
+ do {
+ rls = &task->owned_read_locks[++task->reader_lock_free];
+ } while (rls->lock &&
+ task->reader_lock_free < task->reader_lock_count);
+ }
+
+ if (wakeup)
+ wakeup_next_rw_waiter(&rwmutex->mutex);
+}
+
+static int try_to_take_rw_read(struct rt_rw_mutex *rwmutex,
+ struct rt_mutex_waiter *waiter)
+{
+ struct task_struct *task = current;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct task_struct *owner;
+
+ assert_raw_spin_locked(&mutex->wait_lock);
+
+ /* The writer unlock can use the fast path */
+ mark_rt_mutex_waiters(mutex);
+
+ /* If we are at the reader limit, we can't take the lock */
+ if (rt_rw_limit && unlikely(rwmutex->nr_owners >= rt_rw_limit))
+ return 0;
+
+ /*
+ * If there are no waiters, and there is no owner or
+ * the owner is a reader, we get the lock.
+ * Note, if the waiter or pending bits are set in the owner
+ * then we need to do more checks.
+ */
+ if (likely(!mutex->owner || mutex->owner == RT_RW_READER))
+ goto taken;
+
+ owner = rt_mutex_owner(mutex);
+
+ /* If the lock is owned by a task, then it is a writer */
+ if (owner && owner != RT_RW_READER)
+ return 0;
+
+ /*
+ * A writer or a reader may be waiting. In either case, we
+ * may still be able to steal the lock. The RT rwsems are
+ * not fair if the new reader comes in and is higher priority
+ * than all the waiters.
+ */
+ if (likely(rt_mutex_has_waiters(mutex))) {
+ struct task_struct *pown = rt_mutex_top_waiter(mutex)->task;
+
+ if (task != pown && !lock_is_stealable(task, pown, STEAL_NORMAL))
+ return 0;
+ }
+
+ taken:
+ rt_rw_mutex_take_as_reader(task, rwmutex, waiter);
+ return 1;
+}
+
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex_waiter waiter;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret;
+
+ raw_spin_lock(&mutex->wait_lock);
+ init_rw_lists(rwmutex);
+
+ if (likely(try_to_take_rw_read(rwmutex, NULL))) {
+ /* Got the lock */
+ raw_spin_unlock(&mutex->wait_lock);
+ return;
+ }
+
+ rt_mutex_init_waiter(&waiter, false, false);
+
+ set_current_state(TASK_UNINTERRUPTIBLE);
+
+ ret = task_blocks_on_rt_mutex(mutex, &waiter, current, 0);
+ BUG_ON(ret);
+
+ for (;;) {
+ /* Try to acquire the lock: */
+ if (try_to_take_rw_read(rwmutex, &waiter))
+ break;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ debug_rt_mutex_print_deadlock(&waiter);
+
+ schedule_rt_mutex(mutex);
+
+ raw_spin_lock(&mutex->wait_lock);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ }
+
+ set_current_state(TASK_RUNNING);
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret = 0;
+
+ if (!raw_spin_trylock(&mutex->wait_lock))
+ return ret;
+
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_read(rwmutex, NULL))
+ ret = 1;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ return ret;
+}
+
+static int
+try_to_take_rw_write(struct rt_rw_mutex *rwmutex, struct rt_mutex_waiter *waiter)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+
+ /* Writers block if any readers own the lock */
+ if (rwmutex->nr_owners)
+ return 0;
+
+ /* No readers, then we can try to take the mutex normally. */
+ return try_to_take_rt_mutex(mutex, current, waiter);
+}
+
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct rt_mutex_waiter waiter;
+ struct task_struct *task = current;
+ int ret;
+
+ raw_spin_lock(&mutex->wait_lock);
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_write(rwmutex, NULL)) {
+ raw_spin_unlock(&mutex->wait_lock);
+ return;
+ }
+
+ /* Writers wake up differently than readers (flag it) */
+ rt_mutex_init_waiter(&waiter, false, true);
+
+ ret = task_blocks_on_rt_mutex(mutex, &waiter, task, 0);
+ BUG_ON(ret);
+
+ set_current_state(TASK_UNINTERRUPTIBLE);
+
+ for (;;) {
+ /* Try to acquire the lock: */
+ if (try_to_take_rw_write(rwmutex, &waiter))
+ break;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ debug_rt_mutex_print_deadlock(&waiter);
+
+ schedule_rt_mutex(mutex);
+
+ raw_spin_lock(&mutex->wait_lock);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ }
+
+ set_current_state(TASK_RUNNING);
+
+ raw_spin_unlock(&mutex->wait_lock);
+ WARN_ONCE(rwmutex->nr_owners, "Writer has lock with readers");
+
+ debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret = 0;
+
+ if (!raw_spin_trylock(&mutex->wait_lock))
+ return ret;
+
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_write(rwmutex, NULL))
+ ret = 1;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ return ret;
+}
+
+/*
+ * When a reader lock is released, see if the reader_lock_count
+ * can be moved back.
+ */
+static void shrink_reader_lock_array(struct task_struct *task)
+{
+ struct reader_lock_struct *read_locks = task->owned_read_locks;
+
+ while (task->reader_lock_count &&
+ read_locks[task->reader_lock_count - 1].lock == NULL)
+ task->reader_lock_count--;
+
+ if (task->reader_lock_free > task->reader_lock_count)
+ task->reader_lock_free = task->reader_lock_count;
+}
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct rt_mutex_waiter *waiter;
+ struct reader_lock_struct *rls;
+ struct task_struct *task = current;
+ unsigned long flags;
+ int readers;
+ int i;
+
+ raw_spin_lock(&mutex->wait_lock);
+
+ rt_mutex_deadlock_account_unlock(current);
+
+ WARN_ON_ONCE(!rwmutex->nr_owners);
+
+ rwmutex->nr_owners--;
+
+ if (rt_mutex_has_waiters(mutex)) {
+ /*
+ * If the top waiter is a reader and we are under
+ * the limit, or the top waiter is a writer and
+ * there are no more readers, then we can give the
+ * top waiter pending ownership and wake it up.
+ *
+ * To simplify things, we only wake up one task, even
+ * if it's a reader. If the reader wakes up and gets the
+ * lock, it will look to see if it can wake up the next
+ * waiter, and so on. This way we only need to worry about
+ * one task at a time.
+ */
+ waiter = rt_mutex_top_waiter(mutex);
+ readers = rwmutex->nr_owners;
+ if ((waiter->writer && !readers) ||
+ (!waiter->writer && readers < rt_rw_limit))
+ wakeup_next_rw_waiter(mutex);
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ if (!rwmutex->nr_owners)
+ rt_mutex_set_owner(&rwmutex->mutex, NULL);
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ /* Remove the lock from this task's list */
+ for (i = task->reader_lock_count - 1; i >= 0; i--) {
+ rls = &task->owned_read_locks[i];
+
+ if (rls->lock == rwmutex) {
+ rls->lock = NULL;
+ list_del_init(&rls->list);
+ /* Shrink the array if we can. */
+ if (i == task->reader_lock_count - 1)
+ shrink_reader_lock_array(task);
+ else if (i < task->reader_lock_free)
+ task->reader_lock_free = i;
+ break;
+ }
+ }
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ WARN_ON_ONCE(i < 0);
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ /* Undo pi boosting if necessary */
+ rt_mutex_adjust_prio(current);
+}
+
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex)
+{
+ struct task_struct *task = current;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+
+ raw_spin_lock(&mutex->wait_lock);
+
+ /*
+ * Writers have normal pi with other tasks blocked
+ * on the lock. That is, the top waiter will be in the
+ * pi_list of this task. But for readers, waiters of
+ * the lock are not included in the pi_list, only the
+ * locks are. We must remove the top waiter of this
+ * lock from this task.
+ */
+ if (rt_mutex_has_waiters(mutex)) {
+ struct rt_mutex_waiter *waiter;
+ unsigned long flags;
+
+ waiter = rt_mutex_top_waiter(mutex);
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ plist_del(&waiter->pi_list_entry, &task->pi_waiters);
+ /*
+ * The rt_rw_mutex_take_as_reader() will update
+ * the rwmutex prio.
+ */
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+ }
+
+ WARN_ONCE(rwmutex->nr_owners, "Writer owned with readers");
+
+ rt_rw_mutex_take_as_reader(task, rwmutex, NULL);
+
+ raw_spin_unlock(&mutex->wait_lock);
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+ return task->reader_lock_count;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+ struct reader_lock_struct *rls;
+ struct rt_rw_mutex *rwmutex;
+ int lock_prio;
+ int i;
+
+ for (i = 0; i < task->reader_lock_count; i++) {
+ rls = &task->owned_read_locks[i];
+ rwmutex = rls->lock;
+ if (!rwmutex)
+ continue;
+ lock_prio = rwmutex->prio;
+ if (prio > lock_prio)
+ prio = lock_prio;
+ }
+ WARN_ON_ONCE(prio < 0);
+
+ return prio;
+}
+
+/* Expects to be called with lock->wait_lock held */
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth)
+{
+ struct reader_lock_struct *rls;
+ struct rt_mutex_waiter *waiter;
+ struct task_struct *task;
+ struct rt_rw_mutex *rwmutex = container_of(lock, struct rt_rw_mutex, mutex);
+ unsigned long flags;
+
+ /* Update the rwmutex's prio */
+ if (rt_mutex_has_waiters(lock)) {
+ waiter = rt_mutex_top_waiter(lock);
+ /*
+ * Do we need to grab the task->pi_lock?
+ * Really, we are only reading it. If it
+ * changes, then that should follow this chain
+ * too.
+ */
+ rwmutex->prio = waiter->task->prio;
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ if (recursion_depth >= MAX_RWLOCK_DEPTH) {
+ WARN_ON(1);
+ return 1;
+ }
+
+ list_for_each_entry(rls, &rwmutex->owners, list) {
+ bool skip = false;
+
+ task = rls->task;
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ __rt_mutex_adjust_prio(task);
+ /*
+ * We need to grab the pi_lock to adjust the task prio
+ * might as well use this to check if the task is blocked
+ * as well, and save on a call to the prio chain that will
+ * just grab the lock again and do the test.
+ */
+ if (!rt_mutex_real_waiter(task->pi_blocked_on))
+ skip = true;
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ if (skip)
+ continue;
+
+ get_task_struct(task);
+ /*
+ * rt_mutex_adjust_prio_chain will do
+ * the put_task_struct
+ */
+ rt_mutex_adjust_prio_chain(task, 0, orig_lock,
+ orig_waiter, top_task,
+ recursion_depth+1);
+ }
+
+ return 0;
+}
#else
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth)
+{
+ return 0;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+ return prio;
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+ return 0;
+}
+
static inline int __sched
__mutex_lock_check_stamp(struct rt_mutex *lock, struct ww_acquire_ctx *ctx)
{
BUG();
return 0;
}
-
-#endif
+#endif /* CONFIG_PREEMPT_RT_FULL */

/**
* __rt_mutex_slowlock() - Perform the wait-wake-try-to-take loop
@@ -1154,7 +1763,7 @@ rt_mutex_slowlock(struct rt_mutex *lock,
struct rt_mutex_waiter waiter;
int ret = 0;

- rt_mutex_init_waiter(&waiter, false);
+ rt_mutex_init_waiter(&waiter, false, false);

raw_spin_lock(&lock->wait_lock);
init_lists(lock);
@@ -1718,4 +2327,12 @@ void __sched ww_mutex_unlock(struct ww_m
rt_mutex_unlock(&lock->base.lock);
}
EXPORT_SYMBOL(ww_mutex_unlock);
+
+static int __init rt_rw_limit_init(void)
+{
+ rt_rw_limit = num_possible_cpus();
+ return 0;
+}
+core_initcall(rt_rw_limit_init);
+
#endif
Index: linux-rt.git/kernel/rtmutex_common.h
===================================================================
--- linux-rt.git.orig/kernel/rtmutex_common.h
+++ linux-rt.git/kernel/rtmutex_common.h
@@ -43,6 +43,7 @@ extern void schedule_rt_mutex_test(struc
* @list_entry: pi node to enqueue into the mutex waiters list
* @pi_list_entry: pi node to enqueue into the mutex owner waiters list
* @task: task reference to the blocked task
+ * @writer: true if it's a rwsem writer that is blocked
*/
struct rt_mutex_waiter {
struct plist_node list_entry;
@@ -50,6 +51,7 @@ struct rt_mutex_waiter {
struct task_struct *task;
struct rt_mutex *lock;
bool savestate;
+ bool writer;
#ifdef CONFIG_DEBUG_RT_MUTEXES
unsigned long ip;
struct pid *deadlock_task_pid;
@@ -101,6 +103,20 @@ static inline struct task_struct *rt_mut
((unsigned long)lock->owner & ~RT_MUTEX_OWNER_MASKALL);
}

+/* used as reader owner of the mutex */
+#define RT_RW_READER (struct task_struct *)0x100
+
+#ifdef CONFIG_PREEMPT_RT_FULL
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex);
+
+#endif /* CONFIG_PREEMPT_RT_FULL */
+
/*
* PI-futex support (proxy locking functions, etc.):
*/
@@ -128,11 +144,12 @@ extern int rt_mutex_finish_proxy_lock(st
#endif

static inline void
-rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate)
+rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate, bool writer)
{
debug_rt_mutex_init_waiter(waiter);
waiter->task = NULL;
waiter->savestate = savestate;
+ waiter->writer = writer;
}

#endif
Index: linux-rt.git/kernel/rt.c
===================================================================
--- linux-rt.git.orig/kernel/rt.c
+++ linux-rt.git/kernel/rt.c
@@ -310,14 +310,19 @@ EXPORT_SYMBOL(__rt_rwlock_init);
void rt_up_write(struct rw_semaphore *rwsem)
{
rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
- rt_mutex_unlock(&rwsem->lock);
+ /*
+ * Unlocking a write is the same as unlocking the mutex.
+ * The woken reader will do all the work if it needs to
+ * wake up more than one reader.
+ */
+ __rt_spin_unlock(&rwsem->rwlock.mutex);
}
EXPORT_SYMBOL(rt_up_write);

void rt_up_read(struct rw_semaphore *rwsem)
{
rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
- rt_mutex_unlock(&rwsem->lock);
+ rt_rw_mutex_read_unlock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_up_read);

@@ -327,13 +332,14 @@ EXPORT_SYMBOL(rt_up_read);
*/
void rt_downgrade_write(struct rw_semaphore *rwsem)
{
- BUG_ON(rt_mutex_owner(&rwsem->lock) != current);
+ BUG_ON(rt_mutex_owner(&rwsem->rwlock.mutex) != current);
+ rt_rw_mutex_downgrade_write(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_downgrade_write);

int rt_down_write_trylock(struct rw_semaphore *rwsem)
{
- int ret = rt_mutex_trylock(&rwsem->lock);
+ int ret = rt_rw_mutex_write_trylock(&rwsem->rwlock);

if (ret)
rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
@@ -344,14 +350,14 @@ EXPORT_SYMBOL(rt_down_write_trylock);
void rt_down_write(struct rw_semaphore *rwsem)
{
rwsem_acquire(&rwsem->dep_map, 0, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_down_write);

void rt_down_write_nested(struct rw_semaphore *rwsem, int subclass)
{
rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_down_write_nested);

@@ -359,14 +365,14 @@ void rt_down_write_nested_lock(struct rw
struct lockdep_map *nest)
{
rwsem_acquire_nest(&rwsem->dep_map, 0, 0, nest, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}

int rt_down_read_trylock(struct rw_semaphore *rwsem)
{
int ret;

- ret = rt_mutex_trylock(&rwsem->lock);
+ ret = rt_rw_mutex_read_trylock(&rwsem->rwlock);
if (ret)
rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);

@@ -377,7 +383,7 @@ EXPORT_SYMBOL(rt_down_read_trylock);
static void __rt_down_read(struct rw_semaphore *rwsem, int subclass)
{
rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_read_lock(&rwsem->rwlock);
}

void rt_down_read(struct rw_semaphore *rwsem)
@@ -402,7 +408,8 @@ void __rt_rwsem_init(struct rw_semaphor
debug_check_no_locks_freed((void *)rwsem, sizeof(*rwsem));
lockdep_init_map(&rwsem->dep_map, name, key, 0);
#endif
- rwsem->lock.save_state = 0;
+ rt_rw_mutex_init(&rwsem->rwlock);
+ rwsem->rwlock.mutex.save_state = 0;
}
EXPORT_SYMBOL(__rt_rwsem_init);

Index: linux-rt.git/include/linux/sched.h
===================================================================
--- linux-rt.git.orig/include/linux/sched.h
+++ linux-rt.git/include/linux/sched.h
@@ -993,6 +993,22 @@ struct sched_entity {
#endif
};

+#ifdef CONFIG_PREEMPT_RT_FULL
+struct rt_rw_mutex;
+/**
+ * reader_lock_struct
+ *
+ * @lock: pointer to rwsem that is held
+ * @task: pointer back to task, for lock code
+ * @list: link into rt_rw_mutex owners list
+ */
+struct reader_lock_struct {
+ struct rt_rw_mutex *lock;
+ struct task_struct *task;
+ struct list_head list;
+};
+#endif
+
struct sched_rt_entity {
struct list_head run_list;
unsigned long timeout;
@@ -1224,6 +1240,10 @@ struct task_struct {
#ifdef CONFIG_PREEMPT_RT_FULL
/* TODO: move me into ->restart_block ? */
struct siginfo forced_info;
+#define MAX_RWLOCK_DEPTH 5
+ int reader_lock_count; /* one past the last used element of owned_read_locks */
+ int reader_lock_free; /* index of next free element. */
+ struct reader_lock_struct owned_read_locks[MAX_RWLOCK_DEPTH];
#endif

unsigned long sas_ss_sp;
Index: linux-rt.git/kernel/fork.c
===================================================================
--- linux-rt.git.orig/kernel/fork.c
+++ linux-rt.git/kernel/fork.c
@@ -1385,6 +1385,26 @@ static struct task_struct *copy_process(
if (retval)
goto bad_fork_cleanup_io;

+#ifdef CONFIG_PREEMPT_RT_FULL
+ p->reader_lock_count = 0;
+ p->reader_lock_free = 0;
+ /* Bracket to keep 'i' local */
+ {
+ int i;
+ /*
+ * We could put the initialization of this list in
+ * the grabbing of the lock, but it is safer to
+ * do it now. The list head initialization may be
+ * removed, but we'll keep it for now, just to be safe.
+ */
+ for (i = 0; i < MAX_RWLOCK_DEPTH; i++) {
+ p->owned_read_locks[i].lock = NULL;
+ p->owned_read_locks[i].task = p;
+ INIT_LIST_HEAD(&p->owned_read_locks[i].list);
+ }
+ }
+#endif
+
if (pid != &init_struct_pid) {
retval = -ENOMEM;
pid = alloc_pid(p->nsproxy->pid_ns_for_children);
Index: linux-rt.git/kernel/sysctl.c
===================================================================
--- linux-rt.git.orig/kernel/sysctl.c
+++ linux-rt.git/kernel/sysctl.c
@@ -91,6 +91,10 @@
#include <linux/nmi.h>
#endif

+#ifdef CONFIG_PREEMPT_RT_FULL
+extern unsigned int rt_rw_limit;
+#endif
+

#if defined(CONFIG_SYSCTL)

@@ -453,6 +457,15 @@ static struct ctl_table kern_table[] = {
.proc_handler = proc_dointvec,
},
#endif
+#ifdef CONFIG_PREEMPT_RT_FULL
+ {
+ .procname = "rwsem_reader_limit",
+ .data = &rt_rw_limit,
+ .maxlen = sizeof(int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec,
+ },
+#endif
{
.procname = "panic",
.data = &panic_timeout,
Index: linux-rt.git/include/linux/rtmutex.h
===================================================================
--- linux-rt.git.orig/include/linux/rtmutex.h
+++ linux-rt.git/include/linux/rtmutex.h
@@ -42,6 +42,21 @@ struct rt_mutex {
#endif
};

+/**
+ * The rt_rw_mutex structure
+ *
+ * @mutex: The mutex to wait on
+ * @owners: list of read owners of the mutex
+ * @nr_owners: number of read owners
+ * @prio: the priority of the highest waiter
+ */
+struct rt_rw_mutex {
+ struct rt_mutex mutex;
+ struct list_head owners;
+ int nr_owners;
+ int prio;
+};
+
struct rt_mutex_waiter;
struct hrtimer_sleeper;

@@ -75,6 +90,15 @@ struct hrtimer_sleeper;
__rt_mutex_init(mutex, #mutex); \
} while (0)

+# define rt_rw_mutex_init(rwmutex) \
+ do { \
+ raw_spin_lock_init(&(rwmutex)->mutex.wait_lock); \
+ INIT_LIST_HEAD(&(rwmutex)->owners); \
+ (rwmutex)->nr_owners = 0; \
+ (rwmutex)->prio = -1; \
+ __rt_mutex_init(&(rwmutex)->mutex, #rwmutex); \
+ } while (0)
+
#define __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
.wait_lock = __RAW_SPIN_LOCK_UNLOCKED(mutexname.wait_lock) \
, .wait_list = PLIST_HEAD_INIT(mutexname.wait_list) \
@@ -85,6 +109,11 @@ struct hrtimer_sleeper;
#define __RT_MUTEX_INITIALIZER(mutexname) \
{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) }

+#define __RT_RW_MUTEX_INITIALIZER(mutexname) \
+ { .owners = LIST_HEAD_INIT(mutexname.owners) \
+ , .prio = -1 \
+ , .mutex = __RT_MUTEX_INITIALIZER(mutexname.mutex) }
+
#define __RT_MUTEX_INITIALIZER_SAVE_STATE(mutexname) \
{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
, .save_state = 1 }
Index: linux-rt.git/include/linux/rwsem_rt.h
===================================================================
--- linux-rt.git.orig/include/linux/rwsem_rt.h
+++ linux-rt.git/include/linux/rwsem_rt.h
@@ -19,14 +19,14 @@
#include <linux/rtmutex.h>

struct rw_semaphore {
- struct rt_mutex lock;
+ struct rt_rw_mutex rwlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};

#define __RWSEM_INITIALIZER(name) \
- { .lock = __RT_MUTEX_INITIALIZER(name.lock), \
+ { .rwlock = __RT_RW_MUTEX_INITIALIZER(name.rwlock), \
RW_DEP_MAP_INIT(name) }

#define DECLARE_RWSEM(lockname) \
@@ -37,7 +37,7 @@ extern void __rt_rwsem_init(struct rw_s

#define __rt_init_rwsem(sem, name, key) \
do { \
- rt_mutex_init(&(sem)->lock); \
+ rt_rw_mutex_init(&(sem)->rwlock); \
__rt_rwsem_init((sem), (name), (key));\
} while (0)

@@ -63,7 +63,7 @@ extern void rt_up_write(struct rw_semap
extern void rt_downgrade_write(struct rw_semaphore *rwsem);

#define init_rwsem(sem) rt_init_rwsem(sem)
-#define rwsem_is_locked(s) rt_mutex_is_locked(&(s)->lock)
+#define rwsem_is_locked(s) rt_mutex_is_locked(&(s)->rwlock.mutex)

static inline void down_read(struct rw_semaphore *sem)
{
Index: linux-rt.git/kernel/futex.c
===================================================================
--- linux-rt.git.orig/kernel/futex.c
+++ linux-rt.git/kernel/futex.c
@@ -2327,7 +2327,7 @@ static int futex_wait_requeue_pi(u32 __u
* The waiter is allocated on our stack, manipulated by the requeue
* code while we sleep on uaddr.
*/
- rt_mutex_init_waiter(&rt_waiter, false);
+ rt_mutex_init_waiter(&rt_waiter, false, false);

ret = get_futex_key(uaddr2, flags & FLAGS_SHARED, &key2, VERIFY_WRITE);
if (unlikely(ret != 0))


2014-04-10 14:18:36

by Mike Galbraith

Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Wed, 2014-04-09 at 15:19 -0400, Steven Rostedt wrote:

> If you have any benchmark on large machines I would be very happy if
> you could test this patch against the unpatched version of -rt.

Too bad I don't have (and know how to use) specjbb.

I dug up old vmark, thinking I'd be able to get some halfway useful
relative numbers from it, but that was a waste of a day. The thing
performs so badly on a 40-core box that rt _beats_ nopreempt, and beyond
2 nodes you're going backward. 40 Westmere EX cores deliver a whopping
~2.5 times the throughput of a dinky old Q6600 box.

-Mike

2014-04-10 14:28:50

by Mike Galbraith

Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Thu, 2014-04-10 at 16:18 +0200, Mike Galbraith wrote:
> On Wed, 2014-04-09 at 15:19 -0400, Steven Rostedt wrote:
>
> > If you have any benchmark on large machines I would be very happy if
> > you could test this patch against the unpatched version of -rt.
>
> Too bad I don't have (and know how to use) specjbb.
>
> I dug up old vmark, thinking I'd be able to get some halfway useful
> relative numbers from it, but that was a waste of a day. The thing
> performs so badly on 40 core box that rt _beats_ nopreempt, and after 2
> nodes, you're going backward. 40 Westmere EX cores does a whopping ~2.5
> * dinky old Q6600 box throughput.

P.S. What I didn't see was any sign of a delta between patched and
unpatched kernels, which may mean nothing at all :-/

2014-04-10 14:33:00

by Steven Rostedt

Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Thu, 10 Apr 2014 16:28:43 +0200
Mike Galbraith <[email protected]> wrote:

> On Thu, 2014-04-10 at 16:18 +0200, Mike Galbraith wrote:
> > On Wed, 2014-04-09 at 15:19 -0400, Steven Rostedt wrote:
> >
> > > If you have any benchmark on large machines I would be very happy if
> > > you could test this patch against the unpatched version of -rt.
> >
> > Too bad I don't have (and know how to use) specjbb.
> >
> > I dug up old vmark, thinking I'd be able to get some halfway useful
> > relative numbers from it, but that was a waste of a day. The thing
> > performs so badly on 40 core box that rt _beats_ nopreempt, and after 2
> > nodes, you're going backward. 40 Westmere EX cores does a whopping ~2.5
> > * dinky old Q6600 box throughput.
>
> P.S. What I didn't see was any sign of a delta patched/unpatched..
> which may mean nothing at all :-/

Actually that at least means it didn't cause any regressions for you.

-- Steve

2014-04-10 14:45:14

by Clark Williams

Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Wed, 9 Apr 2014 15:19:22 -0400
Steven Rostedt <[email protected]> wrote:


> This patch is built on top of the two other patches that I posted
> earlier, which should not be as controversial.
>
> If you have any benchmark on large machines I would be very happy if
> you could test this patch against the unpatched version of -rt.
>
> Cheers,
>
> -- Steve
>

Steven

I wrote a program named whack_mmap_sem which creates a large (4GB)
buffer, then creates 2 x ncpus threads that are affined across all the
available cpus. These threads then randomly write into the buffer,
which should cause page faults galore.

I then built the following kernel configs:

vanilla-3.13.15 - no RT patches applied
rt-3.12.15 - PREEMPT_RT patchset
rt-3.12.15-fixes - PREEMPT_RT + rwsem fixes
rt-3.12.15-multi - PREEMPT_RT + rwsem fixes + rwsem-multi patch

My test h/w was a Dell R520 with a 6-core Intel(R) Xeon(R) CPU E5-2430
0 @ 2.20GHz (hyperthreaded). So whack_mmap_sem created 24 threads
which all partied in the 4GB address range.

I ran whack_mmap_sem with the argument -w 100000, which means each
thread does 100k writes to random locations inside the buffer, and did
five runs per kernel. At the end of each run whack_mmap_sem prints the
run time in microseconds.
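Clark's actual program is available at the URL at the end of this message. What follows is only a hypothetical sketch of the behavior described above, not his source. It assumes Linux with glibc, and the names `whack_mmap_sem()` (reused here as a function name) and `struct whacker` are illustrative.

The sketch maps an anonymous buffer and starts two threads per online CPU. Where the GNU `CPU_SET` extension is available, it pins those threads round-robin on a best-effort basis. Each thread then does a fixed number of random writes. The first write to each page takes a page fault, and on kernels of this era the fault handler takes `mmap_sem` for read. The function returns the elapsed wall time in microseconds.

```c
#define _GNU_SOURCE
#include <pthread.h>
#include <sched.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <time.h>
#include <unistd.h>

struct whacker {
	pthread_t tid;
	unsigned char *buf;
	size_t len;
	long writes;
	uint64_t state;		/* xorshift64 state, must be nonzero */
};

/* Write to random offsets; the first touch of each page faults it in. */
static void *whack(void *arg)
{
	struct whacker *w = arg;

	for (long i = 0; i < w->writes; i++) {
		w->state ^= w->state << 13;
		w->state ^= w->state >> 7;
		w->state ^= w->state << 17;
		w->buf[w->state % w->len] = (unsigned char)i;
	}
	return NULL;
}

/* Returns elapsed microseconds (-1 on setup failure); *nthreads = threads run. */
static long whack_mmap_sem(size_t len, long writes, int *nthreads)
{
	long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
	struct timespec t0, t1;
	struct whacker *w;
	unsigned char *buf;
	int n, started = 0;

	if (ncpus < 1)
		ncpus = 1;
	n = 2 * (int)ncpus;		/* 2 x ncpus threads, as described */
	buf = mmap(NULL, len, PROT_READ | PROT_WRITE,
		   MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
	if (buf == MAP_FAILED)
		return -1;
	w = calloc((size_t)n, sizeof(*w));
	if (!w) {
		munmap(buf, len);
		return -1;
	}

	clock_gettime(CLOCK_MONOTONIC, &t0);
	for (int i = 0; i < n; i++) {
		w[i] = (struct whacker){ .buf = buf, .len = len, .writes = writes,
					 .state = 0x9e3779b97f4a7c15ULL + (uint64_t)i };
		if (pthread_create(&w[i].tid, NULL, whack, &w[i]) != 0)
			break;
		started++;
#ifdef CPU_SET	/* GNU extension: best-effort pinning, errors ignored */
		{
			cpu_set_t set;

			CPU_ZERO(&set);
			CPU_SET((size_t)i % (size_t)ncpus, &set);
			pthread_setaffinity_np(w[i].tid, sizeof(set), &set);
		}
#endif
	}
	for (int i = 0; i < started; i++)
		pthread_join(w[i].tid, NULL);
	clock_gettime(CLOCK_MONOTONIC, &t1);

	munmap(buf, len);
	free(w);
	*nthreads = started;
	return (t1.tv_sec - t0.tv_sec) * 1000000L +
	       (t1.tv_nsec - t0.tv_nsec) / 1000;
}
```

For a setup comparable to the runs above (4GB buffer, `-w 100000`), the sketch would be called on a 64-bit system as `whack_mmap_sem(4UL << 30, 100000, &n)`.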

The means of each group of five test runs are:

vanilla.log: 1210117
rt.log: 17210953 (14.2 x slower than vanilla)
rt-fixes.log: 10062027 (8.3 x slower than vanilla)
rt-multi.log: 3179582 (2.6 x slower than vanilla)
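Each slowdown factor is that kernel's mean run time divided by the vanilla mean. The small C sketch below just redoes that arithmetic. The multi-reader case works out to about 2.63.

```c
/* Means of five runs each, in microseconds, as reported above. */
static const double vanilla_us = 1210117.0;
static const double rt_us = 17210953.0;
static const double rt_fixes_us = 10062027.0;
static const double rt_multi_us = 3179582.0;

/* Slowdown factor of a kernel relative to the vanilla kernel. */
static double slowdown(double mean_us)
{
	return mean_us / vanilla_us;
}
```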


As expected, vanilla kicked RT's butt when hammering on the
mmap_sem. But somewhat unexpectedly, your fixups helped quite a bit
and the multi+fixups got RT back into being almost respectable.

Obviously these are just preliminary results on one piece of h/w but
it looks promising.

Clark

P.S. If you want the source to whack_mmap_sem you can grab it from
http://people.redhat.com/williams/whack_mmap_sem. If it looks like it's
worth keeping around, I can add it to rt-tests.



2014-04-10 15:01:56

by Steven Rostedt

Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Thu, 10 Apr 2014 09:44:30 -0500
Clark Williams <[email protected]> wrote:

> I wrote a program named whack_mmap_sem which creates a large (4GB)
> buffer, then creates 2 x ncpus threads that are affined across all the
> available cpus. These threads then randomly write into the buffer,
> which should cause page faults galore.
>
> I then built the following kernel configs:
>
> vanilla-3.13.15 - no RT patches applied

vanilla-3.*12*.15?

> rt-3.12.15 - PREEMPT_RT patchset
> rt-3.12.15-fixes - PREEMPT_RT + rwsem fixes
> rt-3.12.15-multi - PREEMPT_RT + rwsem fixes + rwsem-multi patch
>
> My test h/w was a Dell R520 with a 6-core Intel(R) Xeon(R) CPU E5-2430
> 0 @ 2.20GHz (hyperthreaded). So whack_mmap_sem created 24 threads
> which all partied in the 4GB address range.
>
> I ran whack_mmap_sem with the argument -w 100000 which means each
> thread does 100k writes to random locations inside the buffer and then
> did five runs per each kernel. At the end of the run whack_mmap_sem
> prints out the time of the run in microseconds.
>
> The means of each group of five test runs are:
>
> vanilla.log: 1210117
> rt.log: 17210953 (14.2 x slower than vanilla)
> rt-fixes.log: 10062027 (8.3 x slower than vanilla)
> rt-multi.log: 3179582 (2.6 x slower than vanilla)
>
>
> As expected, vanilla kicked RT's butt when hammering on the
> mmap_sem. But somewhat unexpectedly, your fixups helped quite a bit

That doesn't surprise me too much, as I removed the check for
nesting, which also shrank the rwsem itself (read_depth was removed
from the struct). That alone can give a boost.

Now the question is, how much will this affect real use case scenarios?

-- Steve


> and the multi+fixups got RT back into being almost respectable.
>

by Sebastian Andrzej Siewior

Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On 04/10/2014 04:44 PM, Clark Williams wrote:
> The means of each group of five test runs are:
>
> vanilla.log: 1210117
> rt.log: 17210953 (14.2 x slower than vanilla)
> rt-fixes.log: 10062027 (8.3 x slower than vanilla)
> rt-multi.log: 3179582 (2.6 x slower than vanilla)
>
>
> As expected, vanilla kicked RT's butt when hammering on the
> mmap_sem. But somewhat unexpectedly, your fixups helped quite a
> bit and the multi+fixups got RT back into being almost
> respectable.
>
> Obviously these are just preliminary results on one piece of h/w
> but it looks promising.

Is it easy to look at the latency when you have multiple readers and
a high prio writer which has to boost all those readers away
instead of just one?
Or is this something that should not happen for a high prio RT task
because it has all memory already allocated?
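The boosting in question can be modeled roughly as follows; compare `rt_mutex_get_readers_prio()` and `rt_mutex_adjust_readers()` in the patch. When a task waits on a reader-owned rwsem, the lock records the waiter's priority. Each read owner's effective priority then becomes the minimum of its own normal priority and the recorded priority of every rwsem it holds for read.

This is a simplified, hypothetical user-space model. The following are assumptions: the `_model` types, `MODEL_MAX_PRIO` (140, the value of `MAX_PRIO` in kernels of this era), the fixed four-slot array of held locks, and the omission of any chain walk past the readers.

```c
#include <stddef.h>

#define MODEL_MAX_PRIO 140	/* stand-in for MAX_PRIO; lower value = higher priority */
#define MODEL_MAX_HELD 4	/* hypothetical cap on read locks held per task */

/* Hypothetical rwsem: prio mirrors rwmutex->prio (top waiter, or MAX when none). */
struct rw_lock_model {
	int prio;
};

/* Hypothetical task: read locks held, NULL slots allowed as holes. */
struct task_model {
	int normal_prio;
	int prio;	/* effective, possibly boosted, priority */
	const struct rw_lock_model *held[MODEL_MAX_HELD];
};

/* Minimum of the task's own prio and the prio of every read lock it holds. */
static int reader_effective_prio(const struct task_model *t)
{
	int prio = t->normal_prio;

	for (size_t i = 0; i < MODEL_MAX_HELD; i++) {
		if (t->held[i] && t->held[i]->prio < prio)
			prio = t->held[i]->prio;
	}
	return prio;
}

/* A new waiter blocks on 'lock': record its prio and re-boost every read owner. */
static void boost_readers(struct rw_lock_model *lock, int waiter_prio,
			  struct task_model *owners[], size_t nowners)
{
	if (waiter_prio < lock->prio)
		lock->prio = waiter_prio;
	for (size_t i = 0; i < nowners; i++)
		owners[i]->prio = reader_effective_prio(owners[i]);
}
```

With two readers and a waiter of priority 10, both readers end up running at priority 10. The writer therefore waits on as many boosted readers as currently hold the lock, rather than on one owner.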

>
> Clark

Sebastian

2014-04-10 15:38:45

by Steven Rostedt

Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Thu, 10 Apr 2014 17:03:36 +0200
Sebastian Andrzej Siewior <[email protected]> wrote:

> On 04/10/2014 04:44 PM, Clark Williams wrote:
> > The means of each group of five test runs are:
> >
> > vanilla.log: 1210117
> > rt.log: 17210953 (14.2 x slower than vanilla)
> > rt-fixes.log: 10062027 (8.3 x slower than vanilla)
> > rt-multi.log: 3179582 (2.6 x slower than vanilla)
> >
> >
> > As expected, vanilla kicked RT's butt when hammering on the
> > mmap_sem. But somewhat unexpectedly, your fixups helped quite a
> > bit and the multi+fixups got RT back into being almost
> > respectable.
> >
> > Obviously these are just preliminary results on one piece of h/w
> > but it looks promising.
>
> Is it easy to look at the latency when you have multiple readers and
> a high prio writer which has to boost all those readers away
> instead of just one?
> Or is this something that should not happen for a high prio RT task
> because it has all memory already allocated?

Note, the patch includes a /proc/sys/kernel/rwsem_reader_limit whose
default value is the number of possible CPUs. The user can change it to
any number. A value of zero means unlimited, and a value of 1 makes it
work as it did without the patch.
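The limit semantics can be written as a standalone C predicate. `reader_may_enter()` is a hypothetical name for this sketch; the patch's own helper is `under_rt_rw_limit()`. Note that the "zero means unlimited" test must be applied to the limit itself, not to the current reader count.

```c
#include <stdbool.h>

/*
 * Hypothetical model: 'limit' stands in for rwsem_reader_limit and
 * 'readers' for the number of tasks currently holding the rwsem for read.
 */
static bool reader_may_enter(unsigned int limit, unsigned int readers)
{
	if (limit == 0)		/* 0: no cap on concurrent readers */
		return true;
	return readers < limit;	/* otherwise admit while below the cap */
}
```

With a limit of 1, only the first reader gets in, so the rwsem behaves like the single-owner mutex used before the patch.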

This means that a writer will only have to wait for at most
rwsem_reader_limit readers, which may give a longer latency, but it is
still bounded. Also note that the rwsem is not fair with respect to
first come, first served; it is priority driven. Same-priority tasks
may be first in, first out, but if there's a writer waiting and a
higher priority reader comes along, it can still get the lock, making
the writer wait longer. But as the reader is higher priority than the
writer, that is expected. A reader of the same priority will block if
there's a writer waiting.
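The admission rule for a new reader can be sketched as follows. This is a hypothetical user-space model loosely based on `try_to_take_rw_read()` in the patch. It assumes that `lock_is_stealable(..., STEAL_NORMAL)` lets only a strictly higher-priority task (lower `prio` value) jump the queue, as in the -rt tree of that time. `struct rwsem_model` and both helpers are illustrative names. In this model an equal-priority reader queues behind any top waiter, which includes the waiting-writer case above.

```c
#include <stdbool.h>

/* Hypothetical snapshot of an rwsem, for illustration only. */
struct rwsem_model {
	bool writer_owns;	/* held for write */
	unsigned int readers;	/* current read owners */
	bool has_waiters;	/* any task queued on the lock */
	int top_waiter_prio;	/* lower value = higher priority */
};

/* 0 means no limit on concurrent readers. */
static bool under_limit(unsigned int limit, unsigned int readers)
{
	return limit == 0 || readers < limit;
}

/*
 * Can a new reader with priority 'prio' take the lock right now?
 * Only a strictly higher priority reader may jump ahead of queued
 * waiters; an equal or lower priority reader queues behind them.
 */
static bool reader_can_take(const struct rwsem_model *s, int prio,
			    unsigned int limit)
{
	if (s->writer_owns || !under_limit(limit, s->readers))
		return false;
	if (s->has_waiters && !(prio < s->top_waiter_prio))
		return false;
	return true;
}
```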

Clark, it would be interesting if you ran the test again with my
patches but with rwsem_reader_limit set to 1, to see what impact that
has.

-- Steve

2014-04-10 17:42:23

by Steven Rostedt

Subject: [RFC PATCH RT V2] rwsem: The return of multi-reader PI rwsems

OK, I added rt_rw_limit == 0 meaning unlimited as the last change and
never tested it. Clark did, and found that it locked up the system.
That's because zero wasn't properly checked at all locations, and in
some cases a task could never grab the lock for read.

I fixed that with this patch. And tested it too (that's a bonus).

Signed-off-by: Steven Rostedt <[email protected]>

Index: linux-rt.git/kernel/rtmutex.c
===================================================================
--- linux-rt.git.orig/kernel/rtmutex.c
+++ linux-rt.git/kernel/rtmutex.c
@@ -26,6 +26,21 @@
#include "rtmutex_common.h"

/*
+ * rt_rw_limit is the number of simultaneous readers of a rwsem lock.
+ *
+ * rt_rw_limit gets updated on boot up to the number of
+ * possible CPUs, but we need to initialize it to something other
+ * than zero.
+ */
+unsigned rt_rw_limit = NR_CPUS;
+
+/* rt_rw_limit == 0 means unlimited */
+static inline int under_rt_rw_limit(int cnt)
+{
+ return !rt_rw_limit || cnt < rt_rw_limit;
+}
+
+/*
* lock->owner state tracking:
*
* lock->owner holds the task_struct pointer of the owner. Bit 0
@@ -110,19 +125,44 @@ static inline void init_lists(struct rt_
plist_head_init(&lock->wait_list);
}

+static inline void init_rw_lists(struct rt_rw_mutex *rwlock)
+{
+ struct rt_mutex *lock = &rwlock->mutex;
+
+ /*
+ * A rwsem priority is initialized to -1 and never will
+ * be that again.
+ */
+ if (unlikely(rwlock->prio < 0)) {
+ rwlock->prio = MAX_PRIO;
+ init_lists(lock);
+ }
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio);
+static inline int task_has_reader_locks(struct task_struct *task);
+
/*
* Calculate task priority from the waiter list priority
*
* Return task->normal_prio when the waiter list is empty or when
* the waiter is not allowed to do priority boosting
+ *
+ * On PREEMPT_RT, we also check the priorities of the list
+ * of read locks that the task holds.
*/
int rt_mutex_getprio(struct task_struct *task)
{
- if (likely(!task_has_pi_waiters(task)))
- return task->normal_prio;
+ int prio = task->normal_prio;
+
+ if (likely(!task_has_pi_waiters(task) &&
+ !task_has_reader_locks(task)))
+ return prio;

- return min(task_top_pi_waiter(task)->pi_list_entry.prio,
- task->normal_prio);
+ if (task_has_reader_locks(task))
+ prio = rt_mutex_get_readers_prio(task, prio);
+ return !task_has_pi_waiters(task) ? prio :
+ min(task_top_pi_waiter(task)->pi_list_entry.prio, prio);
}

/*
@@ -181,6 +221,11 @@ static void rt_mutex_wake_waiter(struct
*/
int max_lock_depth = 1024;

+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth);
/*
* Adjust the priority chain. Also used for deadlock detection.
* Decreases task's usage by one - may thus free the task.
@@ -203,7 +248,8 @@ static int rt_mutex_adjust_prio_chain(st
int deadlock_detect,
struct rt_mutex *orig_lock,
struct rt_mutex_waiter *orig_waiter,
- struct task_struct *top_task)
+ struct task_struct *top_task,
+ int recursion_depth)
{
struct rt_mutex *lock;
struct rt_mutex_waiter *waiter, *top_waiter = orig_waiter;
@@ -316,6 +362,18 @@ static int rt_mutex_adjust_prio_chain(st

/* Grab the next task */
task = rt_mutex_owner(lock);
+
+ /*
+ * Readers are special. We may need to boost more than one owner.
+ */
+ if (unlikely(task == RT_RW_READER)) {
+ ret = rt_mutex_adjust_readers(orig_lock, orig_waiter,
+ top_task, lock,
+ recursion_depth);
+ raw_spin_unlock(&lock->wait_lock);
+ goto out;
+ }
+
get_task_struct(task);
raw_spin_lock_irqsave(&task->pi_lock, flags);

@@ -349,7 +407,7 @@ static int rt_mutex_adjust_prio_chain(st
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
out_put_task:
put_task_struct(task);
-
+ out:
return ret;
}

@@ -518,6 +576,13 @@ static int task_blocks_on_rt_mutex(struc
return 0;

if (waiter == rt_mutex_top_waiter(lock)) {
+ /* readers are handled differently */
+ if (unlikely(owner == RT_RW_READER)) {
+ res = rt_mutex_adjust_readers(lock, waiter,
+ current, lock, 0);
+ return res;
+ }
+
raw_spin_lock_irqsave(&owner->pi_lock, flags);
plist_del(&top_waiter->pi_list_entry, &owner->pi_waiters);
plist_add(&waiter->pi_list_entry, &owner->pi_waiters);
@@ -527,7 +592,8 @@ static int task_blocks_on_rt_mutex(struc
chain_walk = 1;
raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
}
- else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock))
+ else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock) &&
+ owner != RT_RW_READER)
chain_walk = 1;

if (!chain_walk)
@@ -543,7 +609,7 @@ static int task_blocks_on_rt_mutex(struc
raw_spin_unlock(&lock->wait_lock);

res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
- task);
+ task, 0);

raw_spin_lock(&lock->wait_lock);

@@ -633,7 +699,7 @@ static void remove_waiter(struct rt_mute

raw_spin_unlock(&lock->wait_lock);

- rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current);
+ rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current, 0);

raw_spin_lock(&lock->wait_lock);
}
@@ -660,7 +726,7 @@ void rt_mutex_adjust_pi(struct task_stru
/* gets dropped in rt_mutex_adjust_prio_chain()! */
get_task_struct(task);
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
- rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
+ rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task, 0);
}

#ifdef CONFIG_PREEMPT_RT_FULL
@@ -739,7 +805,7 @@ static void noinline __sched rt_spin_lo
struct rt_mutex_waiter waiter, *top_waiter;
int ret;

- rt_mutex_init_waiter(&waiter, true);
+ rt_mutex_init_waiter(&waiter, true, false);

raw_spin_lock(&lock->wait_lock);
init_lists(lock);
@@ -1001,15 +1067,564 @@ __mutex_lock_check_stamp(struct rt_mutex

return 0;
}
+
+/*
+ * Wake up the next waiter on a rw lock.
+ *
+ * Similar to wakeup_next_waiter() but the waiter is not on
+ * the owner's pi_waiters. Also, it does not reset the lock
+ * owner.
+ *
+ * Called with lock->wait_lock held.
+ */
+static void wakeup_next_rw_waiter(struct rt_mutex *lock)
+{
+ struct rt_mutex_waiter *waiter;
+
+ waiter = rt_mutex_top_waiter(lock);
+ rt_mutex_wake_waiter(waiter);
+}
+
+/* Called with rwmutex->mutex.wait_lock held */
+static inline void
+rt_rw_mutex_take_as_reader(struct task_struct *task,
+ struct rt_rw_mutex *rwmutex,
+ struct rt_mutex_waiter *waiter)
+{
+ struct reader_lock_struct *rls;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ unsigned long flags;
+ bool wakeup = false;
+
+ rwmutex->nr_owners++;
+ rt_mutex_set_owner(&rwmutex->mutex, RT_RW_READER);
+
+ if (waiter || rt_mutex_has_waiters(mutex)) {
+ unsigned long flags;
+ struct rt_mutex_waiter *top;
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+
+ /* remove the queued waiter. */
+ if (waiter) {
+ plist_del(&waiter->list_entry, &mutex->wait_list);
+ task->pi_blocked_on = NULL;
+ }
+
+ /*
+ * Initialize the rwmutex prio to the priority of
+ * the top waiter.
+ */
+ if (rt_mutex_has_waiters(mutex)) {
+ top = rt_mutex_top_waiter(mutex);
+ top->pi_list_entry.prio = top->list_entry.prio;
+ /*
+ * Readers set the lock priority for faster access
+ * to the priorities of the locks they own
+ * when boosting. This helps to not have to take
+ * the pi_lock of the task. The rwmutex->prio
+ * is protected by the rwmutex->mutex.wait_lock,
+ * which is held during boosting.
+ */
+ rwmutex->prio = top->list_entry.prio;
+
+ /*
+ * If this waiter is a reader, and the reader limit
+ * has not been hit, then we can wake this waiter
+ * up too.
+ */
+ if (!top->writer && under_rt_rw_limit(rwmutex->nr_owners))
+ wakeup = true;
+ } else
+ rwmutex->prio = MAX_PRIO;
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ /*
+ * It is possible to have holes in the owned_read_locks array.
+ * If we take read lock A and then B, but then release A, we
+ * can't move the pointer of B because if something blocks on
+ * B, it can use the B pointer to boost this task. B can only
+ * be moved by owning the wait_lock of B. Remember, the
+ * B lock has its pointers to that index of our array.
+ */
+ rls = &task->owned_read_locks[task->reader_lock_free];
+ BUG_ON(rls->lock);
+
+ /*
+ * Grabbing the pi_lock here as other tasks can boost this
+ * lock via other held locks, and if free lock descriptor
+ * was not at the end of the owned_read_locks array, then
+ * it might get confused if it sees this descriptor with
+ * a lock and no task.
+ */
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ rls->lock = rwmutex;
+ rls->task = task;
+ list_add(&rls->list, &rwmutex->owners);
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ if (task->reader_lock_free == task->reader_lock_count) {
+ /*
+ * If we nest too deep, then this task can never get the lock.
+ * This task will then block for good. Warn about this and
+ * hopefully, people will notice the warning. If not, they will notice
+ * the dead task. Maybe this should be a BUG_ON()
+ */
+ if (WARN_ON(task->reader_lock_count == MAX_RWLOCK_DEPTH))
+ return;
+
+ /*
+ * We don't need to take the pi_lock here as we have the
+ * wait_lock of the lock at the end of the list. If this task
+ * was being boosted by a task blocked on this lock, it would
+ * need to grab the wait_lock before boosting this task.
+ */
+ task->reader_lock_count++;
+ task->reader_lock_free++;
+ } else {
+ /*
+ * Find the next free lock in array. Again, we do not need
+ * to grab the pi_lock because the boosting doesn't use
+ * the reader_lock_free variable, which is the only thing
+ * we are updating here.
+ */
+ do {
+ rls = &task->owned_read_locks[++task->reader_lock_free];
+ } while (rls->lock &&
+ task->reader_lock_free < task->reader_lock_count);
+ }
+
+ if (wakeup)
+ wakeup_next_rw_waiter(&rwmutex->mutex);
+}
+
+static int try_to_take_rw_read(struct rt_rw_mutex *rwmutex,
+ struct rt_mutex_waiter *waiter)
+{
+ struct task_struct *task = current;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct task_struct *owner;
+
+ assert_raw_spin_locked(&mutex->wait_lock);
+
+ /* The writer unlock can use the fast path */
+ mark_rt_mutex_waiters(mutex);
+
+ /* If we are at the reader limit, we can't take the lock */
+ if (unlikely(!under_rt_rw_limit(rwmutex->nr_owners)))
+ return 0;
+
+ /*
+ * If there are no waiters, and there is no owner or
+ * the owner is a reader, we get the lock.
+ * Note, if the waiter or pending bits are set in the owner
+ * then we need to do more checks.
+ */
+ if (likely(!mutex->owner || mutex->owner == RT_RW_READER))
+ goto taken;
+
+ owner = rt_mutex_owner(mutex);
+
+ /* If the lock is owned by a task, then it is a writer */
+ if (owner && owner != RT_RW_READER)
+ return 0;
+
+ /*
+ * A writer or a reader may be waiting. In either case, we
+ * may still be able to steal the lock. The RT rwsems are
+ * not fair if the new reader comes in and is higher priority
+ * than all the waiters.
+ */
+ if (likely(rt_mutex_has_waiters(mutex))) {
+ struct task_struct *pown = rt_mutex_top_waiter(mutex)->task;
+
+ if (task != pown && !lock_is_stealable(task, pown, STEAL_NORMAL))
+ return 0;
+ }
+
+ taken:
+ rt_rw_mutex_take_as_reader(task, rwmutex, waiter);
+ return 1;
+}
+
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex_waiter waiter;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret;
+
+ raw_spin_lock(&mutex->wait_lock);
+ init_rw_lists(rwmutex);
+
+ if (likely(try_to_take_rw_read(rwmutex, NULL))) {
+ /* Got the lock */
+ raw_spin_unlock(&mutex->wait_lock);
+ return;
+ }
+
+ rt_mutex_init_waiter(&waiter, false, false);
+
+ set_current_state(TASK_UNINTERRUPTIBLE);
+
+ ret = task_blocks_on_rt_mutex(mutex, &waiter, current, 0);
+ BUG_ON(ret);
+
+ for (;;) {
+ /* Try to acquire the lock: */
+ if (try_to_take_rw_read(rwmutex, &waiter))
+ break;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ debug_rt_mutex_print_deadlock(&waiter);
+
+ schedule_rt_mutex(mutex);
+
+ raw_spin_lock(&mutex->wait_lock);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ }
+
+ set_current_state(TASK_RUNNING);
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret = 0;
+
+ if (!raw_spin_trylock(&mutex->wait_lock))
+ return ret;
+
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_read(rwmutex, NULL))
+ ret = 1;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ return ret;
+}
+
+static int
+try_to_take_rw_write(struct rt_rw_mutex *rwmutex, struct rt_mutex_waiter *waiter)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+
+ /* Writers block if there are any readers owning the lock */
+ if (rwmutex->nr_owners)
+ return 0;
+
+ /* No readers, then we can try to take the mutex normally. */
+ return try_to_take_rt_mutex(mutex, current, waiter);
+}
+
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct rt_mutex_waiter waiter;
+ struct task_struct *task = current;
+ int ret;
+
+ raw_spin_lock(&mutex->wait_lock);
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_write(rwmutex, NULL)) {
+ raw_spin_unlock(&mutex->wait_lock);
+ return;
+ }
+
+ /* Writers wake up differently than readers (flag it) */
+ rt_mutex_init_waiter(&waiter, false, true);
+
+ ret = task_blocks_on_rt_mutex(mutex, &waiter, task, 0);
+ BUG_ON(ret);
+
+ set_current_state(TASK_UNINTERRUPTIBLE);
+
+ for (;;) {
+ /* Try to acquire the lock: */
+ if (try_to_take_rw_write(rwmutex, &waiter))
+ break;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ debug_rt_mutex_print_deadlock(&waiter);
+
+ schedule_rt_mutex(mutex);
+
+ raw_spin_lock(&mutex->wait_lock);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ }
+
+ set_current_state(TASK_RUNNING);
+
+ raw_spin_unlock(&mutex->wait_lock);
+ WARN_ONCE(rwmutex->nr_owners, "Writer has lock with readers");
+
+ debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret = 0;
+
+ if (!raw_spin_trylock(&mutex->wait_lock))
+ return ret;
+
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_write(rwmutex, NULL))
+ ret = 1;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ return ret;
+}
+
+/*
+ * When a reader lock is released, see if the reader_lock_count
+ * can be moved back.
+ */
+static void shrink_reader_lock_array(struct task_struct *task)
+{
+ struct reader_lock_struct *read_locks = task->owned_read_locks;
+
+ while (task->reader_lock_count &&
+ read_locks[task->reader_lock_count - 1].lock == NULL)
+ task->reader_lock_count--;
+
+ if (task->reader_lock_free > task->reader_lock_count)
+ task->reader_lock_free = task->reader_lock_count;
+}
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct rt_mutex_waiter *waiter;
+ struct reader_lock_struct *rls;
+ struct task_struct *task = current;
+ unsigned long flags;
+ int readers;
+ int i;
+
+ raw_spin_lock(&mutex->wait_lock);
+
+ rt_mutex_deadlock_account_unlock(current);
+
+ WARN_ON_ONCE(!rwmutex->nr_owners);
+
+ rwmutex->nr_owners--;
+
+ if (rt_mutex_has_waiters(mutex)) {
+ /*
+ * If the top waiter is a reader and we are under
+ * the limit, or the top waiter is a writer and
+ * there's no more readers, then we can give the
+ * top waiter pending ownership and wake it up.
+ *
+ * To simplify things, we only wake up one task, even
+ * if it's a reader. If the reader wakes up and gets the
+ * lock, it will look to see if it can wake up the next
+ * waiter, and so on. This way we only need to worry about
+ * one task at a time.
+ */
+ waiter = rt_mutex_top_waiter(mutex);
+ readers = rwmutex->nr_owners;
+ if ((waiter->writer && !readers) ||
+ (!waiter->writer && under_rt_rw_limit(readers)))
+ wakeup_next_rw_waiter(mutex);
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ if (!rwmutex->nr_owners)
+ rt_mutex_set_owner(&rwmutex->mutex, NULL);
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ /* Remove the lock from this task's list */
+ for (i = task->reader_lock_count - 1; i >= 0; i--) {
+ rls = &task->owned_read_locks[i];
+
+ if (rls->lock == rwmutex) {
+ rls->lock = NULL;
+ list_del_init(&rls->list);
+ /* Shrink the array if we can. */
+ if (i == task->reader_lock_count - 1)
+ shrink_reader_lock_array(task);
+ else if (i < task->reader_lock_free)
+ task->reader_lock_free = i;
+ break;
+ }
+ }
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ WARN_ON_ONCE(i < 0);
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ /* Undo pi boosting if necessary */
+ rt_mutex_adjust_prio(current);
+}
+
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex)
+{
+ struct task_struct *task = current;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+
+ raw_spin_lock(&mutex->wait_lock);
+
+ /*
+ * Writers have normal pi with other tasks blocked
+ * on the lock. That is, the top waiter will be in the
+ * pi_list of this task. But for readers, waiters of
+ * the lock are not included in the pi_list, only the
+ * locks are. We must remove the top waiter of this
+ * lock from this task.
+ */
+ if (rt_mutex_has_waiters(mutex)) {
+ struct rt_mutex_waiter *waiter;
+ unsigned long flags;
+
+ waiter = rt_mutex_top_waiter(mutex);
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ plist_del(&waiter->pi_list_entry, &task->pi_waiters);
+ /*
+ * The rt_rw_mutex_take_as_reader() will update
+ * the rwmutex prio.
+ */
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+ }
+
+ WARN_ONCE(rwmutex->nr_owners, "Writer owned with readers");
+
+ rt_rw_mutex_take_as_reader(task, rwmutex, NULL);
+
+ raw_spin_unlock(&mutex->wait_lock);
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+ return task->reader_lock_count;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+ struct reader_lock_struct *rls;
+ struct rt_rw_mutex *rwmutex;
+ int lock_prio;
+ int i;
+
+ for (i = 0; i < task->reader_lock_count; i++) {
+ rls = &task->owned_read_locks[i];
+ rwmutex = rls->lock;
+ if (!rwmutex)
+ continue;
+ lock_prio = rwmutex->prio;
+ if (prio > lock_prio)
+ prio = lock_prio;
+ }
+ WARN_ON_ONCE(prio < 0);
+
+ return prio;
+}
+
+/* Expects to be called with lock->wait_lock held */
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth)
+{
+ struct reader_lock_struct *rls;
+ struct rt_mutex_waiter *waiter;
+ struct task_struct *task;
+ struct rt_rw_mutex *rwmutex = container_of(lock, struct rt_rw_mutex, mutex);
+ unsigned long flags;
+
+ /* Update the rwmutex's prio */
+ if (rt_mutex_has_waiters(lock)) {
+ waiter = rt_mutex_top_waiter(lock);
+ /*
+ * Do we need to grab the task->pi_lock?
+ * Really, we are only reading it. If it
+ * changes, then that should follow this chain
+ * too.
+ */
+ rwmutex->prio = waiter->task->prio;
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ if (recursion_depth >= MAX_RWLOCK_DEPTH) {
+ WARN_ON(1);
+ return 1;
+ }
+
+ list_for_each_entry(rls, &rwmutex->owners, list) {
+ bool skip = false;
+
+ task = rls->task;
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ __rt_mutex_adjust_prio(task);
+ /*
+ * We need to grab the pi_lock to adjust the task prio, so
+ * we might as well use it to check if the task is blocked,
+ * and save on a call to the prio chain that would just
+ * grab the lock again and do the test.
+ */
+ if (!rt_mutex_real_waiter(task->pi_blocked_on))
+ skip = true;
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ if (skip)
+ continue;
+
+ get_task_struct(task);
+ /*
+ * rt_mutex_adjust_prio_chain will do
+ * the put_task_struct
+ */
+ rt_mutex_adjust_prio_chain(task, 0, orig_lock,
+ orig_waiter, top_task,
+ recursion_depth+1);
+ }
+
+ return 0;
+}
#else
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth)
+{
+ return 0;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+ return prio;
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+ return 0;
+}
+
static inline int __sched
__mutex_lock_check_stamp(struct rt_mutex *lock, struct ww_acquire_ctx *ctx)
{
BUG();
return 0;
}
-
-#endif
+#endif /* CONFIG_PREEMPT_RT_FULL */

/**
* __rt_mutex_slowlock() - Perform the wait-wake-try-to-take loop
@@ -1154,7 +1769,7 @@ rt_mutex_slowlock(struct rt_mutex *lock,
struct rt_mutex_waiter waiter;
int ret = 0;

- rt_mutex_init_waiter(&waiter, false);
+ rt_mutex_init_waiter(&waiter, false, false);

raw_spin_lock(&lock->wait_lock);
init_lists(lock);
@@ -1718,4 +2333,12 @@ void __sched ww_mutex_unlock(struct ww_m
rt_mutex_unlock(&lock->base.lock);
}
EXPORT_SYMBOL(ww_mutex_unlock);
+
+static int __init rt_rw_limit_init(void)
+{
+ rt_rw_limit = num_possible_cpus();
+ return 0;
+}
+core_initcall(rt_rw_limit_init);
+
#endif
Index: linux-rt.git/kernel/rtmutex_common.h
===================================================================
--- linux-rt.git.orig/kernel/rtmutex_common.h
+++ linux-rt.git/kernel/rtmutex_common.h
@@ -43,6 +43,7 @@ extern void schedule_rt_mutex_test(struc
* @list_entry: pi node to enqueue into the mutex waiters list
* @pi_list_entry: pi node to enqueue into the mutex owner waiters list
* @task: task reference to the blocked task
+ * @writer: true if it's a rwsem writer that is blocked
*/
struct rt_mutex_waiter {
struct plist_node list_entry;
@@ -50,6 +51,7 @@ struct rt_mutex_waiter {
struct task_struct *task;
struct rt_mutex *lock;
bool savestate;
+ bool writer;
#ifdef CONFIG_DEBUG_RT_MUTEXES
unsigned long ip;
struct pid *deadlock_task_pid;
@@ -101,6 +103,20 @@ static inline struct task_struct *rt_mut
((unsigned long)lock->owner & ~RT_MUTEX_OWNER_MASKALL);
}

+/* used as reader owner of the mutex */
+#define RT_RW_READER (struct task_struct *)0x100
+
+#ifdef CONFIG_PREEMPT_RT_FULL
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex);
+
+#endif /* CONFIG_PREEMPT_RT_FULL */
+
/*
* PI-futex support (proxy locking functions, etc.):
*/
@@ -128,11 +144,12 @@ extern int rt_mutex_finish_proxy_lock(st
#endif

static inline void
-rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate)
+rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate, bool writer)
{
debug_rt_mutex_init_waiter(waiter);
waiter->task = NULL;
waiter->savestate = savestate;
+ waiter->writer = writer;
}

#endif
Index: linux-rt.git/kernel/rt.c
===================================================================
--- linux-rt.git.orig/kernel/rt.c
+++ linux-rt.git/kernel/rt.c
@@ -310,14 +310,19 @@ EXPORT_SYMBOL(__rt_rwlock_init);
void rt_up_write(struct rw_semaphore *rwsem)
{
rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
- rt_mutex_unlock(&rwsem->lock);
+ /*
+ * Unlocking a write is the same as unlocking the mutex.
+ * The woken reader will do all the work if it needs to
+ * wake up more than one reader.
+ */
+ __rt_spin_unlock(&rwsem->rwlock.mutex);
}
EXPORT_SYMBOL(rt_up_write);

void rt_up_read(struct rw_semaphore *rwsem)
{
rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
- rt_mutex_unlock(&rwsem->lock);
+ rt_rw_mutex_read_unlock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_up_read);

@@ -327,13 +332,14 @@ EXPORT_SYMBOL(rt_up_read);
*/
void rt_downgrade_write(struct rw_semaphore *rwsem)
{
- BUG_ON(rt_mutex_owner(&rwsem->lock) != current);
+ BUG_ON(rt_mutex_owner(&rwsem->rwlock.mutex) != current);
+ rt_rw_mutex_downgrade_write(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_downgrade_write);

int rt_down_write_trylock(struct rw_semaphore *rwsem)
{
- int ret = rt_mutex_trylock(&rwsem->lock);
+ int ret = rt_rw_mutex_write_trylock(&rwsem->rwlock);

if (ret)
rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
@@ -344,14 +350,14 @@ EXPORT_SYMBOL(rt_down_write_trylock);
void rt_down_write(struct rw_semaphore *rwsem)
{
rwsem_acquire(&rwsem->dep_map, 0, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_down_write);

void rt_down_write_nested(struct rw_semaphore *rwsem, int subclass)
{
rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_down_write_nested);

@@ -359,14 +365,14 @@ void rt_down_write_nested_lock(struct rw
struct lockdep_map *nest)
{
rwsem_acquire_nest(&rwsem->dep_map, 0, 0, nest, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}

int rt_down_read_trylock(struct rw_semaphore *rwsem)
{
int ret;

- ret = rt_mutex_trylock(&rwsem->lock);
+ ret = rt_rw_mutex_read_trylock(&rwsem->rwlock);
if (ret)
rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);

@@ -377,7 +383,7 @@ EXPORT_SYMBOL(rt_down_read_trylock);
static void __rt_down_read(struct rw_semaphore *rwsem, int subclass)
{
rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_read_lock(&rwsem->rwlock);
}

void rt_down_read(struct rw_semaphore *rwsem)
@@ -402,7 +408,8 @@ void __rt_rwsem_init(struct rw_semaphor
debug_check_no_locks_freed((void *)rwsem, sizeof(*rwsem));
lockdep_init_map(&rwsem->dep_map, name, key, 0);
#endif
- rwsem->lock.save_state = 0;
+ rt_rw_mutex_init(&rwsem->rwlock);
+ rwsem->rwlock.mutex.save_state = 0;
}
EXPORT_SYMBOL(__rt_rwsem_init);

Index: linux-rt.git/include/linux/sched.h
===================================================================
--- linux-rt.git.orig/include/linux/sched.h
+++ linux-rt.git/include/linux/sched.h
@@ -993,6 +993,22 @@ struct sched_entity {
#endif
};

+#ifdef CONFIG_PREEMPT_RT_FULL
+struct rt_rw_mutex;
+/**
+ * reader_lock_struct
+ *
+ * @lock: pointer to rwsem that is held
+ * @task: pointer back to task, for lock code
+ * @list: link into rt_rw_mutex owners list
+ */
+struct reader_lock_struct {
+ struct rt_rw_mutex *lock;
+ struct task_struct *task;
+ struct list_head list;
+};
+#endif
+
struct sched_rt_entity {
struct list_head run_list;
unsigned long timeout;
@@ -1224,6 +1240,10 @@ struct task_struct {
#ifdef CONFIG_PREEMPT_RT_FULL
/* TODO: move me into ->restart_block ? */
struct siginfo forced_info;
+#define MAX_RWLOCK_DEPTH 5
+ int reader_lock_count; /* index of last element in owned_read_locks */
+ int reader_lock_free; /* index of next free element. */
+ struct reader_lock_struct owned_read_locks[MAX_RWLOCK_DEPTH];
#endif

unsigned long sas_ss_sp;
Index: linux-rt.git/kernel/fork.c
===================================================================
--- linux-rt.git.orig/kernel/fork.c
+++ linux-rt.git/kernel/fork.c
@@ -1385,6 +1385,26 @@ static struct task_struct *copy_process(
if (retval)
goto bad_fork_cleanup_io;

+#ifdef CONFIG_PREEMPT_RT_FULL
+ p->reader_lock_count = 0;
+ p->reader_lock_free = 0;
+ /* Bracket to keep 'i' local */
+ {
+ int i;
+ /*
+ * We could put the initialization of this list in
+ * the grabbing of the lock, but it is safer to
+ * do it now. The list head initialization may be
+ * removed, but we'll keep it for now, just to be safe.
+ */
+ for (i = 0; i < MAX_RWLOCK_DEPTH; i++) {
+ p->owned_read_locks[i].lock = NULL;
+ p->owned_read_locks[i].task = p;
+ INIT_LIST_HEAD(&p->owned_read_locks[i].list);
+ }
+ }
+#endif
+
if (pid != &init_struct_pid) {
retval = -ENOMEM;
pid = alloc_pid(p->nsproxy->pid_ns_for_children);
Index: linux-rt.git/kernel/sysctl.c
===================================================================
--- linux-rt.git.orig/kernel/sysctl.c
+++ linux-rt.git/kernel/sysctl.c
@@ -91,6 +91,10 @@
#include <linux/nmi.h>
#endif

+#ifdef CONFIG_PREEMPT_RT_FULL
+extern int rt_rw_limit;
+#endif
+

#if defined(CONFIG_SYSCTL)

@@ -453,6 +457,15 @@ static struct ctl_table kern_table[] = {
.proc_handler = proc_dointvec,
},
#endif
+#ifdef CONFIG_PREEMPT_RT_FULL
+ {
+ .procname = "rwsem_reader_limit",
+ .data = &rt_rw_limit,
+ .maxlen = sizeof(int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec,
+ },
+#endif
{
.procname = "panic",
.data = &panic_timeout,
Index: linux-rt.git/include/linux/rtmutex.h
===================================================================
--- linux-rt.git.orig/include/linux/rtmutex.h
+++ linux-rt.git/include/linux/rtmutex.h
@@ -42,6 +42,21 @@ struct rt_mutex {
#endif
};

+/**
+ * The rt_rw_mutex structure
+ *
+ * @mutex: The mutex to wait on
+ * @owners: list of read owners of the mutex
+ * @nr_owners: number of read owners
+ * @prio: the priority of the highest waiter
+ */
+struct rt_rw_mutex {
+ struct rt_mutex mutex;
+ struct list_head owners;
+ int nr_owners;
+ int prio;
+};
+
struct rt_mutex_waiter;
struct hrtimer_sleeper;

@@ -75,6 +90,15 @@ struct hrtimer_sleeper;
__rt_mutex_init(mutex, #mutex); \
} while (0)

+# define rt_rw_mutex_init(rwmutex) \
+ do { \
+ raw_spin_lock_init(&(rwmutex)->mutex.wait_lock); \
+ INIT_LIST_HEAD(&(rwmutex)->owners); \
+ (rwmutex)->nr_owners = 0; \
+ (rwmutex)->prio = -1; \
+ __rt_mutex_init(&(rwmutex)->mutex, #rwmutex); \
+ } while (0)
+
#define __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
.wait_lock = __RAW_SPIN_LOCK_UNLOCKED(mutexname.wait_lock) \
, .wait_list = PLIST_HEAD_INIT(mutexname.wait_list) \
@@ -85,6 +109,11 @@ struct hrtimer_sleeper;
#define __RT_MUTEX_INITIALIZER(mutexname) \
{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) }

+#define __RT_RW_MUTEX_INITIALIZER(mutexname) \
+ { .owners = LIST_HEAD_INIT(mutexname.owners) \
+ , .prio = -1 \
+ , .mutex = __RT_MUTEX_INITIALIZER(mutexname.mutex) }
+
#define __RT_MUTEX_INITIALIZER_SAVE_STATE(mutexname) \
{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
, .save_state = 1 }
Index: linux-rt.git/include/linux/rwsem_rt.h
===================================================================
--- linux-rt.git.orig/include/linux/rwsem_rt.h
+++ linux-rt.git/include/linux/rwsem_rt.h
@@ -19,14 +19,14 @@
#include <linux/rtmutex.h>

struct rw_semaphore {
- struct rt_mutex lock;
+ struct rt_rw_mutex rwlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};

#define __RWSEM_INITIALIZER(name) \
- { .lock = __RT_MUTEX_INITIALIZER(name.lock), \
+ { .rwlock = __RT_RW_MUTEX_INITIALIZER(name.rwlock), \
RW_DEP_MAP_INIT(name) }

#define DECLARE_RWSEM(lockname) \
@@ -37,7 +37,7 @@ extern void __rt_rwsem_init(struct rw_s

#define __rt_init_rwsem(sem, name, key) \
do { \
- rt_mutex_init(&(sem)->lock); \
+ rt_rw_mutex_init(&(sem)->rwlock); \
__rt_rwsem_init((sem), (name), (key));\
} while (0)

@@ -63,7 +63,7 @@ extern void rt_up_write(struct rw_semap
extern void rt_downgrade_write(struct rw_semaphore *rwsem);

#define init_rwsem(sem) rt_init_rwsem(sem)
-#define rwsem_is_locked(s) rt_mutex_is_locked(&(s)->lock)
+#define rwsem_is_locked(s) rt_mutex_is_locked(&(s)->rwlock.mutex)

static inline void down_read(struct rw_semaphore *sem)
{
Index: linux-rt.git/kernel/futex.c
===================================================================
--- linux-rt.git.orig/kernel/futex.c
+++ linux-rt.git/kernel/futex.c
@@ -2327,7 +2327,7 @@ static int futex_wait_requeue_pi(u32 __u
* The waiter is allocated on our stack, manipulated by the requeue
* code while we sleep on uaddr.
*/
- rt_mutex_init_waiter(&rt_waiter, false);
+ rt_mutex_init_waiter(&rt_waiter, false, false);

ret = get_futex_key(uaddr2, flags & FLAGS_SHARED, &key2, VERIFY_WRITE);
if (unlikely(ret != 0))

2014-04-10 18:48:03

by Peter Zijlstra

[permalink] [raw]
Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Thu, Apr 10, 2014 at 05:03:36PM +0200, Sebastian Andrzej Siewior wrote:
> On 04/10/2014 04:44 PM, Clark Williams wrote:
> > The means of each group of five test runs are:
> >
> > vanilla.log:   1210117
> > rt.log:       17210953 (14.2 x slower than vanilla)
> > rt-fixes.log: 10062027 (8.3 x slower than vanilla)
> > rt-multi.log:  3179582 (2.6 x slower than vanilla)
> >
> >
> > As expected, vanilla kicked RT's butt when hammering on the
> > mmap_sem. But somewhat unexpectedly, your fixups helped quite a
> > bit and the multi+fixups got RT back into being almost
> > respectable.
> >
> > Obviously these are just preliminary results on one piece of h/w
> > but it looks promising.
>
> Is it easy to look at the latency when you have multiple readers
> and a high prio writer which has to boost all those readers away
> instead of just one?
> Or is this something that should not happen for a high prio RT task
> because it has all memory already allocated?

With care it should not happen; it should be relatively straightforward
to avoid all system calls that take mmap_sem for writing.

But yes, the total latency is a concern. That said, that is the very
reason there is a hard limit to the reader concurrency and why this
limit is tunable.

It defaults to the total number of CPUs in the system. Given the default
setup (all CPUs in a single balance domain), this should result in all
CPUs working concurrently on the boosted read sides.
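Sebastian's question comes down to how boosting works once a rwsem has several read owners: a blocked high-priority writer has to lift every current reader, not just a single owner. A minimal userspace sketch of that priority arithmetic, assuming the kernel convention that a numerically lower prio means a higher priority. The names (sketch_boosted_prio(), sketch_boost_all_readers(), SKETCH_MAX_PRIO) are hypothetical and are not the patch's rt_mutex_adjust_readers(), which additionally walks each reader's own blocking chain.

```c
#include <assert.h>

/*
 * Hypothetical sketch, not the patch's code. Kernel convention:
 * a numerically lower prio means a higher priority; the patch
 * uses MAX_PRIO (140) for "no waiter".
 */
#define SKETCH_MAX_PRIO 140

/* Effective prio of one reader whose lock has a top waiter of waiter_prio. */
static int sketch_boosted_prio(int normal_prio, int waiter_prio)
{
	return normal_prio < waiter_prio ? normal_prio : waiter_prio;
}

/*
 * Boost every current read owner. With a plain rt_mutex only the
 * single owner would be boosted; here each reader gets the boost.
 */
static void sketch_boost_all_readers(const int *normal_prio, int *eff_prio,
				     int nr_readers, int waiter_prio)
{
	assert(nr_readers >= 0);
	for (int i = 0; i < nr_readers; i++)
		eff_prio[i] = sketch_boosted_prio(normal_prio[i], waiter_prio);
}
```

Since the reader limit is tunable (the rwsem_reader_limit sysctl), the number of readers that can need this boost at once is capped by it.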

So while there is always some overhead, the worst case should not be
nr_readers * read-hold-time.

Although, with more (unrelated) higher prio threads, you can indeed wreck
this. Similarly, by partitioning the system and not adjusting the max
reader limit, you also get into trouble.

But then, the above nr_readers * read-hold-time is still an upper bound,
and the entire thing does stay deterministic.
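Peter's bound can be made concrete with a rough model. Assume N boosted readers, each needing T of CPU time to leave its read side, C CPUs, and nothing of higher priority running: the readers then drain in ceil(N/C) waves, so the writer waits about ceil(N/C) * T, and only reaches N * T when the read sides are effectively serialized (C = 1, or the CPUs are taken by unrelated higher-priority threads). This is an illustrative model with hypothetical helper names, not a measurement; it ignores scheduling and migration overhead.

```c
#include <assert.h>

/*
 * Back-of-the-envelope model (an assumption, not from the patch):
 * nr_readers boosted readers, each needing hold_us of CPU time,
 * spread over nr_cpus CPUs, run in "waves" of nr_cpus at a time.
 */
static unsigned long sketch_writer_wait_us(unsigned long nr_readers,
					   unsigned long nr_cpus,
					   unsigned long hold_us)
{
	assert(nr_cpus > 0);
	/* ceil(nr_readers / nr_cpus) */
	unsigned long waves = (nr_readers + nr_cpus - 1) / nr_cpus;
	return waves * hold_us;
}

/* Upper bound from the mail: fully serialized read sides. */
static unsigned long sketch_serialized_wait_us(unsigned long nr_readers,
					       unsigned long hold_us)
{
	return nr_readers * hold_us;
}
```

For example, 16 readers of 100us each on 8 CPUs give about 200us in this model, against the 1600us serialized bound.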

2014-04-10 19:17:46

by Steven Rostedt

[permalink] [raw]
Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Thu, 10 Apr 2014 17:36:17 +0200
Peter Zijlstra <[email protected]> wrote:


> It defaults to the total number of CPUs in the system. Given the default
> setup (all CPUs in a single balance domain), this should result in all
> CPUs working concurrently on the boosted read sides.

Unfortunately, it currently defaults to the number of possible CPUs in
the system. I should probably move the default assignment to after SMP
is set up. Currently it happens in early boot before all the CPUs are
running. On bootup, the limit is set to NR_CPUS, which should be much
higher than what the system has, but that shouldn't matter during boot.
After all the CPUs are up and running, it can be lowered to the number
of online CPUs.

I think I'll go and make v3 of this patch.

-- Steve

2014-04-10 20:49:12

by Peter Zijlstra

[permalink] [raw]
Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Thu, Apr 10, 2014 at 03:17:41PM -0400, Steven Rostedt wrote:
> On Thu, 10 Apr 2014 17:36:17 +0200
> Peter Zijlstra <[email protected]> wrote:
>
>
> > It defaults to the total number of CPUs in the system. Given the default
> > setup (all CPUs in a single balance domain), this should result in all
> > CPUs working concurrently on the boosted read sides.
>
> Unfortunately, it currently defaults to the number of possible CPUs in
> the system. I should probably move the default assignment to after SMP
> is set up. Currently it happens in early boot before all the CPUs are
> running. On bootup, the limit is set to NR_CPUS, which should be much
> higher than what the system has, but that shouldn't matter during boot.
> After all the CPUs are up and running, it can be lowered to the number
> of online CPUs.
>
> I think I'll go and make v3 of this patch.

Yah, I should have read the patch in more than 5 seconds flat I suppose
:-)

Modifying the number after SMP bringup should be fine.

2014-04-10 21:30:14

by Paul E. McKenney

[permalink] [raw]
Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Thu, Apr 10, 2014 at 03:17:41PM -0400, Steven Rostedt wrote:
> On Thu, 10 Apr 2014 17:36:17 +0200
> Peter Zijlstra <[email protected]> wrote:
>
>
> > It defaults to the total number of CPUs in the system. Given the default
> > setup (all CPUs in a single balance domain), this should result in all
> > CPUs working concurrently on the boosted read sides.
>
> Unfortunately, it currently defaults to the number of possible CPUs in
> the system. I should probably move the default assignment to after SMP
> is set up. Currently it happens in early boot before all the CPUs are
> running. On bootup, the limit is set to NR_CPUS, which should be much
> higher than what the system has, but that shouldn't matter during boot.
> After all the CPUs are up and running, it can be lowered to the number
> of online CPUs.

Another approach is to use nr_cpu_ids, which is the maximum number of
CPUs that the particular booting system could ever have. I use this in
RCU to resize the data structures down from their NR_CPUS compile-time
hugeness.

Thanx, Paul

> I think I'll go and make v3 of this patch.
>
> -- Steve
>

2014-04-10 22:02:08

by Steven Rostedt

[permalink] [raw]
Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Thu, 10 Apr 2014 14:30:03 -0700
"Paul E. McKenney" <[email protected]> wrote:

> On Thu, Apr 10, 2014 at 03:17:41PM -0400, Steven Rostedt wrote:
> > On Thu, 10 Apr 2014 17:36:17 +0200
> > Peter Zijlstra <[email protected]> wrote:
> >
> >
> > > It defaults to the total number of CPUs in the system. Given the default
> > > setup (all CPUs in a single balance domain), this should result in all
> > > CPUs working concurrently on the boosted read sides.
> >
> > Unfortunately, it currently defaults to the number of possible CPUs in
> > the system. I should probably move the default assignment to after SMP
> > is set up. Currently it happens in early boot before all the CPUs are
> > running. On bootup, the limit is set to NR_CPUS, which should be much
> > higher than what the system has, but that shouldn't matter during boot.
> > After all the CPUs are up and running, it can be lowered to the number
> > of online CPUs.
>
> Another approach is to use nr_cpu_ids, which is the maximum number of
> CPUs that the particular booting system could ever have. I use this in
> RCU to resize the data structures down from their NR_CPUS compile-time
> hugeness.
>

OK, also, in doing our benchmarks, there's a big difference between
rt_rw_limit being num_online_cpus and 2 * num_online_cpus. It doesn't
seem to get better when adding more than that. This was shown on a case
with 12 CPUs as well as with 8 CPUs, with the same result.

I would really like to see a real use case benefit to find the best default.
But as our mmap_sem stress test shows 2xCPUS as being the best, I'm
going to go with that until someone comes up with a better test.

-- Steve

2014-04-11 02:35:15

by Steven Rostedt

[permalink] [raw]
Subject: [RFC PATCH RT V3] rwsem: The return of multi-reader PI rwsems

Changes since v2 - move the setting of the rt_rw_limit to
late_initcall, and use 2 x nr_cpu_ids. Thanks to Paul McKenney for
suggesting using nr_cpu_ids instead of num_possible_cpus(). The 2x is
because our testing shows that 2x gives twice the performance of 1x
using Clark's whack_mmap_sem benchmark.

Also fixed the rwsem_reader_limit == 0 case; it now gives much better
performance. The wrapper function that tests rt_rw_limit compared zero
against the counter and not against rt_rw_limit. It didn't lock up
because it allowed a reader to take the lock if it was the only one.
Basically, it simulated rt_rw_limit == 1.
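The difference between the two checks can be shown in isolation. This is a userspace illustration with hypothetical names; the "buggy" variant is reconstructed from the description above (testing the counter, rather than the limit, for zero), not copied from the v2 patch.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Intended semantics: a limit of 0 means "unlimited", so the
 * zero test must be applied to the limit itself.
 */
static bool sketch_under_limit(unsigned int limit, unsigned int nr_readers)
{
	return limit == 0 || nr_readers < limit;
}

/*
 * Reconstructed v2 mistake: zero is tested against the reader
 * counter. With limit == 0 only the first reader gets in, which
 * behaves like a limit of 1.
 */
static bool sketch_under_limit_buggy(unsigned int limit, unsigned int nr_readers)
{
	assert(nr_readers != ~0u); /* sanity: counter never wraps here */
	return nr_readers == 0 || nr_readers < limit;
}
```

With a limit of 0, the buggy check admits a reader only while the counter is still 0, which matches the rt_rw_limit == 1 behaviour described above.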

I added Carsten to the Cc, so I'll post the entire change log of v1
here again.

-----

A while ago I wrote a patch that would allow for reader/writer
locks like rwlock and rwsems to have multiple readers in PREEMPT_RT. It
was slick and fast but unfortunately it was way too complex and ridden
with nasty little critters which earned me my large collection of
frozen sharks in the fridge (which are quite tasty).

The main problem with my previous solution was that I tried to be too
clever. I worked hard on making the rw mutex still have the "fast
path". That is, the cmpxchg that allows an uncontended grab of the
lock to be a single instruction, and then be off with it. But to get that
working required lots of tricks and black magic that was certainly
going to fail. Thus, with the raining of sharks on my parade, the
priority inheritance mutex with multiple owners died a slow painful
death.

So we thought.

But over the years, a new darkness was on the horizon. Complaints came
in that highly threaded processes (did I hear Java?) were suffering
huge performance hits on the PREEMPT_RT kernel. Whether or not the
processes were real-time tasks, they still were horrible compared to
running the same tasks on the mainline kernel. Note, this was being
done on machines with many CPUs.

The culprit was mostly a single rwsem, the notorious mmap_sem, which
can be taken several times for read. On RT this is just a single
mutex, so it would serialize accesses that would not be serialized on
mainline.

I looked back at my poor dead rw multi pi reader patch and thought to
myself, "How complex would this be if I removed the 'fast path' from
the code?" I decided to build a new tower in Mordor.

I feel that I was correct. After removing the fast path and requiring all
accesses to the rwsem to go through the slow path (which must take the
wait_lock to do anything), the code really wasn't that bad. I also only
focused on the rwsem and did not worry about the rwlocks, as those haven't
been pointed out as a bottleneck yet. If they do turn out to be one, this
code could easily work on rwlocks too.
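As an illustration of what "everything goes through the slow path" means, here is a userspace analogue built on POSIX threads, where an internal mutex plays the role of wait_lock and all lock state is read and changed only while holding it. It is a hypothetical sketch (all sketch_* names are made up): it models the reader limit and the rule that any reader blocks a writer, but has no priority inheritance, no lock stealing and no writer preference, so writers can starve.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

struct sketch_rwsem {
	pthread_mutex_t wait_lock;    /* every operation takes this first */
	pthread_cond_t  wake;         /* waiters sleep here */
	unsigned int    nr_readers;   /* current read owners */
	bool            writer;       /* a writer owns the lock */
	unsigned int    reader_limit; /* 0 means unlimited */
};

static void sketch_rwsem_init(struct sketch_rwsem *s, unsigned int limit)
{
	pthread_mutex_init(&s->wait_lock, NULL);
	pthread_cond_init(&s->wake, NULL);
	s->nr_readers = 0;
	s->writer = false;
	s->reader_limit = limit;
}

/* Caller holds wait_lock. */
static bool sketch_can_read(const struct sketch_rwsem *s)
{
	return !s->writer &&
	       (s->reader_limit == 0 || s->nr_readers < s->reader_limit);
}

static bool sketch_read_trylock(struct sketch_rwsem *s)
{
	bool ok;

	pthread_mutex_lock(&s->wait_lock);
	ok = sketch_can_read(s);
	if (ok)
		s->nr_readers++;
	pthread_mutex_unlock(&s->wait_lock);
	return ok;
}

static void sketch_read_lock(struct sketch_rwsem *s)
{
	pthread_mutex_lock(&s->wait_lock);
	/* Re-check the state under wait_lock after every wakeup. */
	while (!sketch_can_read(s))
		pthread_cond_wait(&s->wake, &s->wait_lock);
	s->nr_readers++;
	pthread_mutex_unlock(&s->wait_lock);
}

static void sketch_read_unlock(struct sketch_rwsem *s)
{
	pthread_mutex_lock(&s->wait_lock);
	assert(s->nr_readers > 0);
	s->nr_readers--;
	pthread_cond_broadcast(&s->wake);
	pthread_mutex_unlock(&s->wait_lock);
}

static bool sketch_write_trylock(struct sketch_rwsem *s)
{
	bool ok;

	pthread_mutex_lock(&s->wait_lock);
	/* Any reader (or another writer) blocks a writer. */
	ok = !s->writer && s->nr_readers == 0;
	if (ok)
		s->writer = true;
	pthread_mutex_unlock(&s->wait_lock);
	return ok;
}

static void sketch_write_unlock(struct sketch_rwsem *s)
{
	pthread_mutex_lock(&s->wait_lock);
	s->writer = false;
	pthread_cond_broadcast(&s->wake);
	pthread_mutex_unlock(&s->wait_lock);
}
```

Because every operation serializes on wait_lock, there is no single-instruction cmpxchg path to get wrong, at the cost of one extra lock round trip per acquisition.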

I'm much more confident in this code than I was with my previous
version of the rwlock multi-reader patch. I added a bunch of comments
to this code to explain how things interact. The writer unlock was
still able to use the fast path as the writers are pretty much like a
normal mutex. Too bad that the writer unlock is not a high point of
contention.

This patch is built on top of the two other patches that I posted
earlier, which should not be as controversial.

If you have any benchmarks on large machines, I would be very happy if
you could test this patch against the unpatched version of -rt.

Cheers,

-- Steve

Signed-off-by: Steven Rostedt <[email protected]>

Index: linux-rt.git/kernel/rtmutex.c
===================================================================
--- linux-rt.git.orig/kernel/rtmutex.c
+++ linux-rt.git/kernel/rtmutex.c
@@ -26,6 +26,21 @@
#include "rtmutex_common.h"

/*
+ * rt_rw_limit is the maximum number of simultaneous readers of a rwsem lock.
+ *
+ * rt_rw_limit gets updated during boot to the number of
+ * possible CPUs, but we need to initialize it to something other
+ * than zero.
+ */
+unsigned rt_rw_limit = NR_CPUS;
+
+/* rt_rw_limit == 0 means unlimited */
+static inline int under_rt_rw_limit(int cnt)
+{
+ return !rt_rw_limit || cnt < rt_rw_limit;
+}
+
+/*
* lock->owner state tracking:
*
* lock->owner holds the task_struct pointer of the owner. Bit 0
@@ -110,19 +125,44 @@ static inline void init_lists(struct rt_
plist_head_init(&lock->wait_list);
}

+static inline void init_rw_lists(struct rt_rw_mutex *rwlock)
+{
+ struct rt_mutex *lock = &rwlock->mutex;
+
+ /*
+ * A rwsem priority is initialized to -1 and never will
+ * be that again.
+ */
+ if (unlikely(rwlock->prio < 0)) {
+ rwlock->prio = MAX_PRIO;
+ init_lists(lock);
+ }
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio);
+static inline int task_has_reader_locks(struct task_struct *task);
+
/*
* Calculate task priority from the waiter list priority
*
* Return task->normal_prio when the waiter list is empty or when
* the waiter is not allowed to do priority boosting
+ *
+ * On PREEMPT_RT, we also check the priorities of the list
+ * of read locks that the task holds.
*/
int rt_mutex_getprio(struct task_struct *task)
{
- if (likely(!task_has_pi_waiters(task)))
- return task->normal_prio;
+ int prio = task->normal_prio;
+
+ /* A task may hold boosted read locks even with no pi waiters */
+ if (task_has_reader_locks(task))
+ prio = rt_mutex_get_readers_prio(task, prio);

- return min(task_top_pi_waiter(task)->pi_list_entry.prio,
- task->normal_prio);
+ if (likely(!task_has_pi_waiters(task)))
+ return prio;
+
+ return min(task_top_pi_waiter(task)->pi_list_entry.prio, prio);
}

/*
@@ -181,6 +221,11 @@ static void rt_mutex_wake_waiter(struct
*/
int max_lock_depth = 1024;

+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth);
/*
* Adjust the priority chain. Also used for deadlock detection.
* Decreases task's usage by one - may thus free the task.
@@ -203,7 +248,8 @@ static int rt_mutex_adjust_prio_chain(st
int deadlock_detect,
struct rt_mutex *orig_lock,
struct rt_mutex_waiter *orig_waiter,
- struct task_struct *top_task)
+ struct task_struct *top_task,
+ int recursion_depth)
{
struct rt_mutex *lock;
struct rt_mutex_waiter *waiter, *top_waiter = orig_waiter;
@@ -316,6 +362,18 @@ static int rt_mutex_adjust_prio_chain(st

/* Grab the next task */
task = rt_mutex_owner(lock);
+
+ /*
+ * Readers are special. We may need to boost more than one owner.
+ */
+ if (unlikely(task == RT_RW_READER)) {
+ ret = rt_mutex_adjust_readers(orig_lock, orig_waiter,
+ top_task, lock,
+ recursion_depth);
+ raw_spin_unlock(&lock->wait_lock);
+ goto out;
+ }
+
get_task_struct(task);
raw_spin_lock_irqsave(&task->pi_lock, flags);

@@ -349,7 +407,7 @@ static int rt_mutex_adjust_prio_chain(st
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
out_put_task:
put_task_struct(task);
-
+ out:
return ret;
}

@@ -518,6 +576,13 @@ static int task_blocks_on_rt_mutex(struc
return 0;

if (waiter == rt_mutex_top_waiter(lock)) {
+ /* readers are handled differently */
+ if (unlikely(owner == RT_RW_READER)) {
+ res = rt_mutex_adjust_readers(lock, waiter,
+ current, lock, 0);
+ return res;
+ }
+
raw_spin_lock_irqsave(&owner->pi_lock, flags);
plist_del(&top_waiter->pi_list_entry, &owner->pi_waiters);
plist_add(&waiter->pi_list_entry, &owner->pi_waiters);
@@ -527,7 +592,8 @@ static int task_blocks_on_rt_mutex(struc
chain_walk = 1;
raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
}
- else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock))
+ else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock) &&
+ owner != RT_RW_READER)
chain_walk = 1;

if (!chain_walk)
@@ -543,7 +609,7 @@ static int task_blocks_on_rt_mutex(struc
raw_spin_unlock(&lock->wait_lock);

res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
- task);
+ task, 0);

raw_spin_lock(&lock->wait_lock);

@@ -633,7 +699,7 @@ static void remove_waiter(struct rt_mute

raw_spin_unlock(&lock->wait_lock);

- rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current);
+ rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current, 0);

raw_spin_lock(&lock->wait_lock);
}
@@ -660,7 +726,7 @@ void rt_mutex_adjust_pi(struct task_stru
/* gets dropped in rt_mutex_adjust_prio_chain()! */
get_task_struct(task);
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
- rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
+ rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task, 0);
}

#ifdef CONFIG_PREEMPT_RT_FULL
@@ -739,7 +805,7 @@ static void noinline __sched rt_spin_lo
struct rt_mutex_waiter waiter, *top_waiter;
int ret;

- rt_mutex_init_waiter(&waiter, true);
+ rt_mutex_init_waiter(&waiter, true, false);

raw_spin_lock(&lock->wait_lock);
init_lists(lock);
@@ -1001,15 +1067,564 @@ __mutex_lock_check_stamp(struct rt_mutex

return 0;
}
+
+/*
+ * Wake up the next waiter on a rw lock.
+ *
+ * Similar to wakeup_next_waiter() but the waiter is not on
+ * the owner's pi_waiters. Also, it does not reset the lock
+ * owner.
+ *
+ * Called with lock->wait_lock held.
+ */
+static void wakeup_next_rw_waiter(struct rt_mutex *lock)
+{
+ struct rt_mutex_waiter *waiter;
+
+ waiter = rt_mutex_top_waiter(lock);
+ rt_mutex_wake_waiter(waiter);
+}
+
+/* Called with rwmutex->mutex.wait_lock held */
+static inline void
+rt_rw_mutex_take_as_reader(struct task_struct *task,
+ struct rt_rw_mutex *rwmutex,
+ struct rt_mutex_waiter *waiter)
+{
+ struct reader_lock_struct *rls;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ unsigned long flags;
+ bool wakeup = false;
+
+ rwmutex->nr_owners++;
+ rt_mutex_set_owner(&rwmutex->mutex, RT_RW_READER);
+
+ if (waiter || rt_mutex_has_waiters(mutex)) {
+ unsigned long flags;
+ struct rt_mutex_waiter *top;
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+
+ /* remove the queued waiter. */
+ if (waiter) {
+ plist_del(&waiter->list_entry, &mutex->wait_list);
+ task->pi_blocked_on = NULL;
+ }
+
+ /*
+ * Initialize the rwmutex prio to the priority of
+ * the top waiter.
+ */
+ if (rt_mutex_has_waiters(mutex)) {
+ top = rt_mutex_top_waiter(mutex);
+ top->pi_list_entry.prio = top->list_entry.prio;
+ /*
+ * Readers set the lock priority for faster access
+ * to read the priorities of the locks it owns
+ * when boosting. This helps to not have to take
+ * the pi_lock of the task. The rwmutex->prio
+ * is protected by the rwmutex->mutex.wait_lock,
+ * which is held during boosting.
+ */
+ rwmutex->prio = top->list_entry.prio;
+
+ /*
+ * If this waiter is a reader, and the reader limit
+ * has not been hit, then we can wake this waiter
+ * up too.
+ */
+ if (!top->writer && under_rt_rw_limit(rwmutex->nr_owners))
+ wakeup = true;
+ } else
+ rwmutex->prio = MAX_PRIO;
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ /*
+ * It is possible to have holes in the owned_read_locks array.
+ * If we take read lock A and then B, but then release A, we
+ * can't move the pointer of B because if something blocks on
+ * B, it can use the B pointer to boost this task. B can only
+ * be moved by holding the wait_lock of B. Remember, the
+ * B lock has its pointers to that index of our array.
+ */
+ rls = &task->owned_read_locks[task->reader_lock_free];
+ BUG_ON(rls->lock);
+
+ /*
+ * Grab the pi_lock here, as other tasks can boost this
+ * task via other held locks, and if the free lock descriptor
+ * was not at the end of the owned_read_locks array, then
+ * it might get confused if it sees this descriptor with
+ * a lock and no task.
+ */
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ rls->lock = rwmutex;
+ rls->task = task;
+ list_add(&rls->list, &rwmutex->owners);
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ if (task->reader_lock_free == task->reader_lock_count) {
+ /*
+ * If we nest too deep, then this task can never get the lock.
+ * This task will then block for good. Warn about this and
+ * hopefully, people will notice the warning. If not, they will notice
+ * the dead task. Maybe this should be a BUG_ON()
+ */
+ if (WARN_ON(task->reader_lock_count == MAX_RWLOCK_DEPTH))
+ return;
+
+ /*
+ * We don't need to take the pi_lock here as we have the
+ * wait_lock of the lock at the end of the list. If this task
+ * was being boosted by a task blocked on this lock, it would
+ * need to grab the wait_lock before boosting this task.
+ */
+ task->reader_lock_count++;
+ task->reader_lock_free++;
+ } else {
+ /*
+ * Find the next free lock in array. Again, we do not need
+ * to grab the pi_lock because the boosting doesn't use
+ * the reader_lock_free variable, which is the only thing
+ * we are updating here.
+ */
+ do {
+ rls = &task->owned_read_locks[++task->reader_lock_free];
+ } while (rls->lock &&
+ task->reader_lock_free < task->reader_lock_count);
+ }
+
+ if (wakeup)
+ wakeup_next_rw_waiter(&rwmutex->mutex);
+}
+
+static int try_to_take_rw_read(struct rt_rw_mutex *rwmutex,
+ struct rt_mutex_waiter *waiter)
+{
+ struct task_struct *task = current;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct task_struct *owner;
+
+ assert_raw_spin_locked(&mutex->wait_lock);
+
+ /* The writer unlock can use the fast path */
+ mark_rt_mutex_waiters(mutex);
+
+ /* If we are at the reader limit, we can't take the lock */
+ if (unlikely(!under_rt_rw_limit(rwmutex->nr_owners)))
+ return 0;
+
+ /*
+ * If there are no waiters, and there is no owner or
+ * the owner is a reader, we get the lock.
+ * Note, if the waiter or pending bits are set in the owner
+ * then we need to do more checks.
+ */
+ if (likely(!mutex->owner || mutex->owner == RT_RW_READER))
+ goto taken;
+
+ owner = rt_mutex_owner(mutex);
+
+ /* If the lock is owned by a task, then it is a writer */
+ if (owner && owner != RT_RW_READER)
+ return 0;
+
+ /*
+ * A writer or a reader may be waiting. In either case, we
+ * may still be able to steal the lock. The RT rwsems are
+ * not fair if the new reader comes in and is higher priority
+ * than all the waiters.
+ */
+ if (likely(rt_mutex_has_waiters(mutex))) {
+ struct task_struct *pown = rt_mutex_top_waiter(mutex)->task;
+
+ if (task != pown && !lock_is_stealable(task, pown, STEAL_NORMAL))
+ return 0;
+ }
+
+ taken:
+ rt_rw_mutex_take_as_reader(task, rwmutex, waiter);
+ return 1;
+}
+
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex_waiter waiter;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret;
+
+ raw_spin_lock(&mutex->wait_lock);
+ init_rw_lists(rwmutex);
+
+ if (likely(try_to_take_rw_read(rwmutex, NULL))) {
+ /* Got the lock */
+ raw_spin_unlock(&mutex->wait_lock);
+ return;
+ }
+
+ rt_mutex_init_waiter(&waiter, false, false);
+
+ set_current_state(TASK_UNINTERRUPTIBLE);
+
+ ret = task_blocks_on_rt_mutex(mutex, &waiter, current, 0);
+ BUG_ON(ret);
+
+ for (;;) {
+ /* Try to acquire the lock: */
+ if (try_to_take_rw_read(rwmutex, &waiter))
+ break;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ debug_rt_mutex_print_deadlock(&waiter);
+
+ schedule_rt_mutex(mutex);
+
+ raw_spin_lock(&mutex->wait_lock);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ }
+
+ set_current_state(TASK_RUNNING);
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret = 0;
+
+ if (!raw_spin_trylock(&mutex->wait_lock))
+ return ret;
+
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_read(rwmutex, NULL))
+ ret = 1;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ return ret;
+}
+
+static int
+try_to_take_rw_write(struct rt_rw_mutex *rwmutex, struct rt_mutex_waiter *waiter)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+
+ /* Writers block if there's any readers owning the lock */
+ if (rwmutex->nr_owners)
+ return 0;
+
+ /* No readers, then we can try to take the mutex normally. */
+ return try_to_take_rt_mutex(mutex, current, waiter);
+}
+
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct rt_mutex_waiter waiter;
+ struct task_struct *task = current;
+ int ret;
+
+ raw_spin_lock(&mutex->wait_lock);
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_write(rwmutex, NULL)) {
+ raw_spin_unlock(&mutex->wait_lock);
+ return;
+ }
+
+ /* Writers wake up differently than readers (flag it) */
+ rt_mutex_init_waiter(&waiter, false, true);
+
+ ret = task_blocks_on_rt_mutex(mutex, &waiter, task, 0);
+ BUG_ON(ret);
+
+ set_current_state(TASK_UNINTERRUPTIBLE);
+
+ for (;;) {
+ /* Try to acquire the lock: */
+ if (try_to_take_rw_write(rwmutex, &waiter))
+ break;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ debug_rt_mutex_print_deadlock(&waiter);
+
+ schedule_rt_mutex(mutex);
+
+ raw_spin_lock(&mutex->wait_lock);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ }
+
+ set_current_state(TASK_RUNNING);
+
+ raw_spin_unlock(&mutex->wait_lock);
+ WARN_ONCE(rwmutex->nr_owners, "Writer has lock with readers");
+
+ debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret = 0;
+
+ if (!raw_spin_trylock(&mutex->wait_lock))
+ return ret;
+
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_write(rwmutex, NULL))
+ ret = 1;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ return ret;
+}
+
+/*
+ * When a reader lock is released, see if the reader_lock_count
+ * can be moved back.
+ */
+static void shrink_reader_lock_array(struct task_struct *task)
+{
+ struct reader_lock_struct *read_locks = task->owned_read_locks;
+
+ while (task->reader_lock_count &&
+ read_locks[task->reader_lock_count - 1].lock == NULL)
+ task->reader_lock_count--;
+
+ if (task->reader_lock_free > task->reader_lock_count)
+ task->reader_lock_free = task->reader_lock_count;
+}
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct rt_mutex_waiter *waiter;
+ struct reader_lock_struct *rls;
+ struct task_struct *task = current;
+ unsigned long flags;
+ int readers;
+ int i;
+
+ raw_spin_lock(&mutex->wait_lock);
+
+ rt_mutex_deadlock_account_unlock(current);
+
+ WARN_ON_ONCE(!rwmutex->nr_owners);
+
+ rwmutex->nr_owners--;
+
+ if (rt_mutex_has_waiters(mutex)) {
+ /*
+ * If the top waiter is a reader and we are under
+ * the limit, or the top waiter is a writer and
+ * there are no more readers, then we can give the
+ * top waiter pending ownership and wake it up.
+ *
+ * To simplify things, we only wake up one task, even
+ * if it's a reader. If the reader wakes up and gets the
+ * lock, it will look to see if it can wake up the next
+ * waiter, and so on. This way we only need to worry about
+ * one task at a time.
+ */
+ waiter = rt_mutex_top_waiter(mutex);
+ readers = rwmutex->nr_owners;
+ if ((waiter->writer && !readers) ||
+ (!waiter->writer && under_rt_rw_limit(readers)))
+ wakeup_next_rw_waiter(mutex);
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ if (!rwmutex->nr_owners)
+ rt_mutex_set_owner(&rwmutex->mutex, NULL);
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ /* Remove the lock from this task's list */
+ for (i = task->reader_lock_count - 1; i >= 0; i--) {
+ rls = &task->owned_read_locks[i];
+
+ if (rls->lock == rwmutex) {
+ rls->lock = NULL;
+ list_del_init(&rls->list);
+ /* Shrink the array if we can. */
+ if (i == task->reader_lock_count - 1)
+ shrink_reader_lock_array(task);
+ else if (i < task->reader_lock_free)
+ task->reader_lock_free = i;
+ break;
+ }
+ }
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ WARN_ON_ONCE(i < 0);
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ /* Undo pi boosting if necessary */
+ rt_mutex_adjust_prio(current);
+}
+
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex)
+{
+ struct task_struct *task = current;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+
+ raw_spin_lock(&mutex->wait_lock);
+
+ /*
+ * Writers have normal pi with other tasks blocked
+ * on the lock. That is, the top waiter will be in the
+ * pi_list of this task. But for readers, waiters of
+ * the lock are not included in the pi_list, only the
+ * locks are. We must remove the top waiter of this
+ * lock from this task.
+ */
+ if (rt_mutex_has_waiters(mutex)) {
+ struct rt_mutex_waiter *waiter;
+ unsigned long flags;
+
+ waiter = rt_mutex_top_waiter(mutex);
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ plist_del(&waiter->pi_list_entry, &task->pi_waiters);
+ /*
+ * The rt_rw_mutex_take_as_reader() will update
+ * the rwmutex prio.
+ */
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+ }
+
+ WARN_ONCE(rwmutex->nr_owners, "Writer owned with readers");
+
+ rt_rw_mutex_take_as_reader(task, rwmutex, NULL);
+
+ raw_spin_unlock(&mutex->wait_lock);
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+ return task->reader_lock_count;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+ struct reader_lock_struct *rls;
+ struct rt_rw_mutex *rwmutex;
+ int lock_prio;
+ int i;
+
+ for (i = 0; i < task->reader_lock_count; i++) {
+ rls = &task->owned_read_locks[i];
+ rwmutex = rls->lock;
+ if (!rwmutex)
+ continue;
+ lock_prio = rwmutex->prio;
+ if (prio > lock_prio)
+ prio = lock_prio;
+ }
+ WARN_ON_ONCE(prio < 0);
+
+ return prio;
+}
+
+/* Expects to be called with lock->wait_lock held */
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth)
+{
+ struct reader_lock_struct *rls;
+ struct rt_mutex_waiter *waiter;
+ struct task_struct *task;
+ struct rt_rw_mutex *rwmutex = container_of(lock, struct rt_rw_mutex, mutex);
+ unsigned long flags;
+
+ /* Update the rwmutex's prio */
+ if (rt_mutex_has_waiters(lock)) {
+ waiter = rt_mutex_top_waiter(lock);
+ /*
+ * Do we need to grab the task->pi_lock?
+ * Really, we are only reading it. If it
+ * changes, then that should follow this chain
+ * too.
+ */
+ rwmutex->prio = waiter->task->prio;
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ if (recursion_depth >= MAX_RWLOCK_DEPTH) {
+ WARN_ON(1);
+ return 1;
+ }
+
+ list_for_each_entry(rls, &rwmutex->owners, list) {
+ bool skip = false;
+
+ task = rls->task;
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ __rt_mutex_adjust_prio(task);
+ /*
+ * We need to grab the pi_lock to adjust the task prio, so we
+ * might as well use it to check if the task is blocked too,
+ * and save a call to the prio chain that would just grab the
+ * lock again and do the same test.
+ */
+ if (!rt_mutex_real_waiter(task->pi_blocked_on))
+ skip = true;
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ if (skip)
+ continue;
+
+ get_task_struct(task);
+ /*
+ * rt_mutex_adjust_prio_chain will do
+ * the put_task_struct
+ */
+ rt_mutex_adjust_prio_chain(task, 0, orig_lock,
+ orig_waiter, top_task,
+ recursion_depth+1);
+ }
+
+ return 0;
+}
#else
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth)
+{
+ return 0;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+ return prio;
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+ return 0;
+}
+
static inline int __sched
__mutex_lock_check_stamp(struct rt_mutex *lock, struct ww_acquire_ctx *ctx)
{
BUG();
return 0;
}
-
-#endif
+#endif /* CONFIG_PREEMPT_RT_FULL */

/**
* __rt_mutex_slowlock() - Perform the wait-wake-try-to-take loop
@@ -1154,7 +1769,7 @@ rt_mutex_slowlock(struct rt_mutex *lock,
struct rt_mutex_waiter waiter;
int ret = 0;

- rt_mutex_init_waiter(&waiter, false);
+ rt_mutex_init_waiter(&waiter, false, false);

raw_spin_lock(&lock->wait_lock);
init_lists(lock);
@@ -1718,4 +2333,21 @@ void __sched ww_mutex_unlock(struct ww_m
rt_mutex_unlock(&lock->base.lock);
}
EXPORT_SYMBOL(ww_mutex_unlock);
+
+/*
+ * On boot up, rt_rw_limit is set to NR_CPUS. At the end of boot
+ * we can lower it based on the actual number of CPUs, as the
+ * system should by then be running as it normally will.
+ *
+ * Note, benchmarks of a threaded page fault stress test have
+ * shown the best performance when rt_rw_limit is set to 2x the
+ * number of online CPUs.
+ */
+static int __init rt_rw_limit_init(void)
+{
+ rt_rw_limit = nr_cpu_ids * 2;
+ return 0;
+}
+late_initcall(rt_rw_limit_init);
+
#endif
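A minimal userspace sketch (not part of the patch) of the per-task bookkeeping that rt_rw_mutex_read_unlock() and shrink_reader_lock_array() perform on owned_read_locks[]. It assumes a newly taken read lock is recorded in slot reader_lock_free, since the start of the take path is outside this excerpt. struct task_model, model_take() and model_release() are hypothetical names; count and free stand in for reader_lock_count and reader_lock_free, and no locking is modelled. The sketch tests the array bound before reading a slot.

```c
#include <stddef.h>

#define MODEL_MAX_DEPTH 5 /* mirrors MAX_RWLOCK_DEPTH in the patch */

/* Hypothetical stand-in for the per-task fields added to task_struct. */
struct task_model {
	const void *locks[MODEL_MAX_DEPTH]; /* NULL marks a free slot */
	int count; /* like reader_lock_count: one past the last used slot */
	int free;  /* like reader_lock_free: lowest free slot index */
};

/* Record 'lock' in the lowest free slot; returns the slot, or -1 if full. */
int model_take(struct task_model *t, const void *lock)
{
	int i = t->free;

	if (i >= MODEL_MAX_DEPTH)
		return -1;
	t->locks[i] = lock;
	if (i == t->count)
		t->count++;
	/* Advance 'free' past occupied slots, stopping at 'count'. */
	do {
		t->free++;
	} while (t->free < t->count && t->locks[t->free]);
	return i;
}

/* Drop 'lock', scanning from the top as rt_rw_mutex_read_unlock() does. */
int model_release(struct task_model *t, const void *lock)
{
	int i;

	for (i = t->count - 1; i >= 0; i--) {
		if (t->locks[i] != lock)
			continue;
		t->locks[i] = NULL;
		if (i == t->count - 1) {
			/* Like shrink_reader_lock_array(): trim trailing holes. */
			while (t->count && !t->locks[t->count - 1])
				t->count--;
			if (t->free > t->count)
				t->free = t->count;
		} else if (i < t->free) {
			t->free = i; /* remember the hole for the next take */
		}
		break;
	}
	return i; /* -1 if the lock was not found */
}
```

Releasing a lock from the middle of the array leaves a hole that the next take reuses; releasing the topmost entry trims any trailing holes, which keeps the scans in the unlock and priority code short.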
Index: linux-rt.git/kernel/rtmutex_common.h
===================================================================
--- linux-rt.git.orig/kernel/rtmutex_common.h
+++ linux-rt.git/kernel/rtmutex_common.h
@@ -43,6 +43,7 @@ extern void schedule_rt_mutex_test(struc
* @list_entry: pi node to enqueue into the mutex waiters list
* @pi_list_entry: pi node to enqueue into the mutex owner waiters list
* @task: task reference to the blocked task
+ * @writer: true if it's a rwsem writer that is blocked
*/
struct rt_mutex_waiter {
struct plist_node list_entry;
@@ -50,6 +51,7 @@ struct rt_mutex_waiter {
struct task_struct *task;
struct rt_mutex *lock;
bool savestate;
+ bool writer;
#ifdef CONFIG_DEBUG_RT_MUTEXES
unsigned long ip;
struct pid *deadlock_task_pid;
@@ -101,6 +103,20 @@ static inline struct task_struct *rt_mut
((unsigned long)lock->owner & ~RT_MUTEX_OWNER_MASKALL);
}

+/* used as reader owner of the mutex */
+#define RT_RW_READER (struct task_struct *)0x100
+
+#ifdef CONFIG_PREEMPT_RT_FULL
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex);
+
+#endif /* CONFIG_PREEMPT_RT_FULL */
+
/*
* PI-futex support (proxy locking functions, etc.):
*/
@@ -128,11 +144,12 @@ extern int rt_mutex_finish_proxy_lock(st
#endif

static inline void
-rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate)
+rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate, bool writer)
{
debug_rt_mutex_init_waiter(waiter);
waiter->task = NULL;
waiter->savestate = savestate;
+ waiter->writer = writer;
}

#endif
Index: linux-rt.git/kernel/rt.c
===================================================================
--- linux-rt.git.orig/kernel/rt.c
+++ linux-rt.git/kernel/rt.c
@@ -310,14 +310,19 @@ EXPORT_SYMBOL(__rt_rwlock_init);
void rt_up_write(struct rw_semaphore *rwsem)
{
rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
- rt_mutex_unlock(&rwsem->lock);
+ /*
+ * Unlocking a write is the same as unlocking the mutex.
+ * The woken reader will do all the work if it needs to
+ * wake up more than one reader.
+ */
+ __rt_spin_unlock(&rwsem->rwlock.mutex);
}
EXPORT_SYMBOL(rt_up_write);

void rt_up_read(struct rw_semaphore *rwsem)
{
rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
- rt_mutex_unlock(&rwsem->lock);
+ rt_rw_mutex_read_unlock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_up_read);

@@ -327,13 +332,14 @@ EXPORT_SYMBOL(rt_up_read);
*/
void rt_downgrade_write(struct rw_semaphore *rwsem)
{
- BUG_ON(rt_mutex_owner(&rwsem->lock) != current);
+ BUG_ON(rt_mutex_owner(&rwsem->rwlock.mutex) != current);
+ rt_rw_mutex_downgrade_write(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_downgrade_write);

int rt_down_write_trylock(struct rw_semaphore *rwsem)
{
- int ret = rt_mutex_trylock(&rwsem->lock);
+ int ret = rt_rw_mutex_write_trylock(&rwsem->rwlock);

if (ret)
rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
@@ -344,14 +350,14 @@ EXPORT_SYMBOL(rt_down_write_trylock);
void rt_down_write(struct rw_semaphore *rwsem)
{
rwsem_acquire(&rwsem->dep_map, 0, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_down_write);

void rt_down_write_nested(struct rw_semaphore *rwsem, int subclass)
{
rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_down_write_nested);

@@ -359,14 +365,14 @@ void rt_down_write_nested_lock(struct rw
struct lockdep_map *nest)
{
rwsem_acquire_nest(&rwsem->dep_map, 0, 0, nest, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}

int rt_down_read_trylock(struct rw_semaphore *rwsem)
{
int ret;

- ret = rt_mutex_trylock(&rwsem->lock);
+ ret = rt_rw_mutex_read_trylock(&rwsem->rwlock);
if (ret)
rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);

@@ -377,7 +383,7 @@ EXPORT_SYMBOL(rt_down_read_trylock);
static void __rt_down_read(struct rw_semaphore *rwsem, int subclass)
{
rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_read_lock(&rwsem->rwlock);
}

void rt_down_read(struct rw_semaphore *rwsem)
@@ -402,7 +408,8 @@ void __rt_rwsem_init(struct rw_semaphor
debug_check_no_locks_freed((void *)rwsem, sizeof(*rwsem));
lockdep_init_map(&rwsem->dep_map, name, key, 0);
#endif
- rwsem->lock.save_state = 0;
+ rt_rw_mutex_init(&rwsem->rwlock);
+ rwsem->rwlock.mutex.save_state = 0;
}
EXPORT_SYMBOL(__rt_rwsem_init);

Index: linux-rt.git/include/linux/sched.h
===================================================================
--- linux-rt.git.orig/include/linux/sched.h
+++ linux-rt.git/include/linux/sched.h
@@ -993,6 +993,22 @@ struct sched_entity {
#endif
};

+#ifdef CONFIG_PREEMPT_RT_FULL
+struct rt_rw_mutex;
+/**
+ * reader_lock_struct
+ *
+ * @lock: pointer to rwsem that is held
+ * @task: pointer back to task, for lock code
+ * @list: link into rt_rw_mutex owners list
+ */
+struct reader_lock_struct {
+ struct rt_rw_mutex *lock;
+ struct task_struct *task;
+ struct list_head list;
+};
+#endif
+
struct sched_rt_entity {
struct list_head run_list;
unsigned long timeout;
@@ -1224,6 +1240,10 @@ struct task_struct {
#ifdef CONFIG_PREEMPT_RT_FULL
/* TODO: move me into ->restart_block ? */
struct siginfo forced_info;
+#define MAX_RWLOCK_DEPTH 5
+ int reader_lock_count; /* number of slots in use: one past the last used element */
+ int reader_lock_free; /* index of next free element. */
+ struct reader_lock_struct owned_read_locks[MAX_RWLOCK_DEPTH];
#endif

unsigned long sas_ss_sp;
Index: linux-rt.git/kernel/fork.c
===================================================================
--- linux-rt.git.orig/kernel/fork.c
+++ linux-rt.git/kernel/fork.c
@@ -1385,6 +1385,26 @@ static struct task_struct *copy_process(
if (retval)
goto bad_fork_cleanup_io;

+#ifdef CONFIG_PREEMPT_RT_FULL
+ p->reader_lock_count = 0;
+ p->reader_lock_free = 0;
+ /* Bracket to keep 'i' local */
+ {
+ int i;
+ /*
+ * We could put the initialization of this list in
+ * the grabbing of the lock, but it is safer to
+ * do it now. The list head initialization may be
+ * removed, but we'll keep it for now, just to be safe.
+ */
+ for (i = 0; i < MAX_RWLOCK_DEPTH; i++) {
+ p->owned_read_locks[i].lock = NULL;
+ p->owned_read_locks[i].task = p;
+ INIT_LIST_HEAD(&p->owned_read_locks[i].list);
+ }
+ }
+#endif
+
if (pid != &init_struct_pid) {
retval = -ENOMEM;
pid = alloc_pid(p->nsproxy->pid_ns_for_children);
Index: linux-rt.git/kernel/sysctl.c
===================================================================
--- linux-rt.git.orig/kernel/sysctl.c
+++ linux-rt.git/kernel/sysctl.c
@@ -91,6 +91,10 @@
#include <linux/nmi.h>
#endif

+#ifdef CONFIG_PREEMPT_RT_FULL
+extern int rt_rw_limit;
+#endif
+

#if defined(CONFIG_SYSCTL)

@@ -453,6 +457,15 @@ static struct ctl_table kern_table[] = {
.proc_handler = proc_dointvec,
},
#endif
+#ifdef CONFIG_PREEMPT_RT_FULL
+ {
+ .procname = "rwsem_reader_limit",
+ .data = &rt_rw_limit,
+ .maxlen = sizeof(int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec,
+ },
+#endif
{
.procname = "panic",
.data = &panic_timeout,
Index: linux-rt.git/include/linux/rtmutex.h
===================================================================
--- linux-rt.git.orig/include/linux/rtmutex.h
+++ linux-rt.git/include/linux/rtmutex.h
@@ -42,6 +42,21 @@ struct rt_mutex {
#endif
};

+/**
+ * The rt_rw_mutex structure
+ *
+ * @mutex: The mutex to wait on
+ * @owners: list of read owners of the mutex
+ * @nr_owners: number of read owners
+ * @prio: the priority of the highest waiter
+ */
+struct rt_rw_mutex {
+ struct rt_mutex mutex;
+ struct list_head owners;
+ int nr_owners;
+ int prio;
+};
+
struct rt_mutex_waiter;
struct hrtimer_sleeper;

@@ -75,6 +90,15 @@ struct hrtimer_sleeper;
__rt_mutex_init(mutex, #mutex); \
} while (0)

+# define rt_rw_mutex_init(rwmutex) \
+ do { \
+ raw_spin_lock_init(&(rwmutex)->mutex.wait_lock); \
+ INIT_LIST_HEAD(&(rwmutex)->owners); \
+ (rwmutex)->nr_owners = 0; \
+ (rwmutex)->prio = -1; \
+ __rt_mutex_init(&(rwmutex)->mutex, #rwmutex); \
+ } while (0)
+
#define __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
.wait_lock = __RAW_SPIN_LOCK_UNLOCKED(mutexname.wait_lock) \
, .wait_list = PLIST_HEAD_INIT(mutexname.wait_list) \
@@ -85,6 +109,11 @@ struct hrtimer_sleeper;
#define __RT_MUTEX_INITIALIZER(mutexname) \
{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) }

+#define __RT_RW_MUTEX_INITIALIZER(mutexname) \
+ { .owners = LIST_HEAD_INIT(mutexname.owners) \
+ , .prio = -1 \
+ , .mutex = __RT_MUTEX_INITIALIZER(mutexname.mutex) }
+
#define __RT_MUTEX_INITIALIZER_SAVE_STATE(mutexname) \
{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
, .save_state = 1 }
Index: linux-rt.git/include/linux/rwsem_rt.h
===================================================================
--- linux-rt.git.orig/include/linux/rwsem_rt.h
+++ linux-rt.git/include/linux/rwsem_rt.h
@@ -19,14 +19,14 @@
#include <linux/rtmutex.h>

struct rw_semaphore {
- struct rt_mutex lock;
+ struct rt_rw_mutex rwlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};

#define __RWSEM_INITIALIZER(name) \
- { .lock = __RT_MUTEX_INITIALIZER(name.lock), \
+ { .rwlock = __RT_RW_MUTEX_INITIALIZER(name.rwlock), \
RW_DEP_MAP_INIT(name) }

#define DECLARE_RWSEM(lockname) \
@@ -37,7 +37,7 @@ extern void __rt_rwsem_init(struct rw_s

#define __rt_init_rwsem(sem, name, key) \
do { \
- rt_mutex_init(&(sem)->lock); \
+ rt_rw_mutex_init(&(sem)->rwlock); \
__rt_rwsem_init((sem), (name), (key));\
} while (0)

@@ -63,7 +63,7 @@ extern void rt_up_write(struct rw_semap
extern void rt_downgrade_write(struct rw_semaphore *rwsem);

#define init_rwsem(sem) rt_init_rwsem(sem)
-#define rwsem_is_locked(s) rt_mutex_is_locked(&(s)->lock)
+#define rwsem_is_locked(s) rt_mutex_is_locked(&(s)->rwlock.mutex)

static inline void down_read(struct rw_semaphore *sem)
{
Index: linux-rt.git/kernel/futex.c
===================================================================
--- linux-rt.git.orig/kernel/futex.c
+++ linux-rt.git/kernel/futex.c
@@ -2327,7 +2327,7 @@ static int futex_wait_requeue_pi(u32 __u
* The waiter is allocated on our stack, manipulated by the requeue
* code while we sleep on uaddr.
*/
- rt_mutex_init_waiter(&rt_waiter, false);
+ rt_mutex_init_waiter(&rt_waiter, false, false);

ret = get_futex_key(uaddr2, flags & FLAGS_SHARED, &key2, VERIFY_WRITE);
if (unlikely(ret != 0))
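The admission rules spread across try_to_take_rw_read(), try_to_take_rw_write() and rt_rw_mutex_read_unlock() can be summarized as a small state model. This is a simplified sketch, not the patch's code: struct rw_model and the model_* functions are hypothetical, priority inheritance and the wait_lock are left out, under_rt_rw_limit() is assumed to mean "strictly below rt_rw_limit", lock stealing with STEAL_NORMAL is assumed to require strictly higher priority (a lower prio value) than the top waiter, and the writer path reduces try_to_take_rt_mutex() to a plain trylock with the same steal rule.

```c
#include <stdbool.h>

/* Hypothetical snapshot of one rt_rw_mutex (no PI, no locking). */
struct rw_model {
	int nr_owners;      /* readers currently holding the lock */
	bool writer_owned;  /* a writer holds the lock */
	bool has_waiters;   /* anyone queued on the underlying rt_mutex */
	bool top_is_writer; /* kind of the highest-priority waiter */
	int top_prio;       /* its prio; lower value = higher priority */
	int limit;          /* stand-in for the rt_rw_limit sysctl */
};

/* Assumed semantics of under_rt_rw_limit(): strictly below the limit. */
static bool under_limit(const struct rw_model *m, int readers)
{
	return readers < m->limit;
}

/* Mirrors the order of checks in try_to_take_rw_read(). */
bool model_can_read(const struct rw_model *m, int prio, bool is_top_waiter)
{
	if (!under_limit(m, m->nr_owners))
		return false; /* at the reader limit */
	if (m->writer_owned)
		return false; /* owned by a writer */
	/* With waiters queued, only steal with strictly higher priority. */
	if (m->has_waiters && !is_top_waiter && prio >= m->top_prio)
		return false;
	return true;
}

/* Writers need zero reader owners; the rest is a simplified trylock. */
bool model_can_write(const struct rw_model *m, int prio, bool is_top_waiter)
{
	if (m->nr_owners || m->writer_owned)
		return false;
	return !m->has_waiters || is_top_waiter || prio < m->top_prio;
}

/* Mirrors rt_rw_mutex_read_unlock(): true if the top waiter is woken. */
bool model_read_unlock(struct rw_model *m)
{
	m->nr_owners--;
	if (!m->has_waiters)
		return false;
	return m->top_is_writer ? m->nr_owners == 0
				: under_limit(m, m->nr_owners);
}
```

With a writer queued at prio 100, a reader at prio 90 still gets in while one at prio 120 has to wait, which matches the comment that RT rwsems are not fair to higher-priority newcomers; the queued writer is only woken once the last reader unlocks.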

2014-04-11 02:50:31

by Mike Galbraith

[permalink] [raw]
Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

Oh, there was another useful bit.

3.12.17-rt25 403991 384216 377786
405466 394011 392850

3.12.17-nopreempt 109049 186133 118766 !?!.. ew, TTWU_QUEUE
350385 318786 367336 !TTWU_QUEUE
326009 356939 378215 !TTWU_QUEUE

IPI == low-pass filter.

-Mike

2014-04-11 03:25:46

by Steven Rostedt

[permalink] [raw]
Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Fri, 11 Apr 2014 04:50:26 +0200
Mike Galbraith <[email protected]> wrote:

> Oh, there was another useful bit.
>
> 3.12.17-rt25 403991 384216 377786
> 405466 394011 392850
>
> 3.12.17-nopreempt 109049 186133 118766 !?!.. ew, TTWU_QUEUE
> 350385 318786 367336 !TTWU_QUEUE
> 326009 356939 378215 !TTWU_QUEUE
>
> IPI == low-pass filter.

I'm sorry, but I don't have the foggiest clue to what the above means.

-- Steve

2014-04-11 03:52:41

by Mike Galbraith

[permalink] [raw]
Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Thu, 2014-04-10 at 23:25 -0400, Steven Rostedt wrote:
> On Fri, 11 Apr 2014 04:50:26 +0200
> Mike Galbraith <[email protected]> wrote:
>
> > Oh, there was another useful bit.
> >
> > 3.12.17-rt25 403991 384216 377786
> > 405466 394011 392850
> >
> > 3.12.17-nopreempt 109049 186133 118766 !?!.. ew, TTWU_QUEUE
> > 350385 318786 367336 !TTWU_QUEUE
> > 326009 356939 378215 !TTWU_QUEUE
> >
> > IPI == low-pass filter.
>
> I'm sorry, but I don't have the foggiest clue to what the above means.

It means..

# Overhead Symbol
# ........ .......................................................................................................................................................................................
#
17.33% [k] native_write_msr_safe
|
|--88.45%-- __x2apic_send_IPI_mask
| |
| |--97.89%-- try_to_wake_up
| | |
| | |--99.91%-- wake_futex
| | | |
| | | |--99.77%-- futex_wake_op
| | | | do_futex
| | | | sys_futex
| | | | system_call_fastpath

..wakeup frequency is restricted by the IPI.

-Mike

2014-04-11 04:25:32

by Mike Galbraith

[permalink] [raw]
Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Fri, 2014-04-11 at 05:52 +0200, Mike Galbraith wrote:
> On Thu, 2014-04-10 at 23:25 -0400, Steven Rostedt wrote:
> > On Fri, 11 Apr 2014 04:50:26 +0200
> > Mike Galbraith <[email protected]> wrote:
> >
> > > Oh, there was another useful bit.
> > >
> > > 3.12.17-rt25 403991 384216 377786
> > > 405466 394011 392850
> > >
> > > 3.12.17-nopreempt 109049 186133 118766 !?!.. ew, TTWU_QUEUE
> > > 350385 318786 367336 !TTWU_QUEUE
> > > 326009 356939 378215 !TTWU_QUEUE
> > >
> > > IPI == low-pass filter.
> >
> > I'm sorry, but I don't have the foggiest clue to what the above means.
>
> It means..
>
> # Overhead Symbol
> # ........ .......................................................................................................................................................................................
> #
> 17.33% [k] native_write_msr_safe
> |
> |--88.45%-- __x2apic_send_IPI_mask
> | |
> | |--97.89%-- try_to_wake_up
> | | |
> | | |--99.91%-- wake_futex
> | | | |
> | | | |--99.77%-- futex_wake_op
> | | | | do_futex
> | | | | sys_futex
> | | | | system_call_fastpath
>
> ..wakeup frequency is restricted by the IPI.

Turn TTWU_QUEUE off, bottleneck goes away.

# Overhead Symbol
# ........ .......................................................................................................................................................................................
#
6.12% [.] _ZN13ObjectMonitor20TrySpin_VaryDurationEP6Thread
|
|--61.27%-- _ZN13ObjectMonitor5enterEP6Thread
| |
| |--99.96%-- _ZN13SharedRuntime26complete_monitor_locking_CEP7oopDescP9BasicLockP10JavaThread
| | 0x7f93149b2748
| | |
| | |--99.76%-- 0xf1a02b30
| | | 0xbaba00000000fc24
| | --0.24%-- [...]
| --0.04%-- [...]
|
|--38.40%-- _ZN13ObjectMonitor6EnterIEP6Thread
| _ZN13ObjectMonitor5enterEP6Thread
| |
| |--99.98%-- _ZN13SharedRuntime26complete_monitor_locking_CEP7oopDescP9BasicLockP10JavaThread
| | 0x7f93149b2748
| | |
| | |--99.89%-- 0xf1a02b30
| | | 0xbaba00000000fc24
| | --0.11%-- [...]
| --0.02%-- [...]
--0.32%-- [...]

3.59% [k] intel_idle
|
--- cpuidle_enter_state
cpuidle_idle_call
arch_cpu_idle
cpu_startup_entry
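"Turn TTWU_QUEUE off" refers to a scheduler feature bit. On kernels of this era built with CONFIG_SCHED_DEBUG, features are toggled by writing NAME or NO_NAME to the sched_features file in debugfs (typically /sys/kernel/debug/sched_features; the location has changed in later kernels, so the sketch takes the path as a parameter). A small helper, where write_sched_feature() is a hypothetical name:

```c
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/*
 * Write one scheduler feature toggle (e.g. "NO_TTWU_QUEUE") to the
 * sched_features file at 'path'. Returns 0 on success or -errno.
 */
int write_sched_feature(const char *path, const char *feature)
{
	int fd = open(path, O_WRONLY);
	if (fd < 0)
		return -errno;

	size_t len = strlen(feature);
	ssize_t n = write(fd, feature, len);
	/* Capture errno before close() can overwrite it. */
	int err = (n < 0) ? -errno : ((size_t)n == len ? 0 : -EIO);

	close(fd);
	return err;
}
```

For example, as root with debugfs mounted, write_sched_feature("/sys/kernel/debug/sched_features", "NO_TTWU_QUEUE") stops remote wakeups from being queued to the target CPU via IPI, and writing "TTWU_QUEUE" turns the feature back on.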

2014-04-11 13:25:28

by Steven Rostedt

[permalink] [raw]
Subject: Re: [RFC PATCH RT V3] rwsem: The return of multi-reader PI rwsems

On Fri, 11 Apr 2014 14:47:49 +0200
Carsten Emde <[email protected]> wrote:

> Hi Steven,
>
> > [..] I added Carsten to the Cc, so I'll post the entire change log
> > of v1 here again.
> I've been listening and testing boxes all the time ...
>

I figured but still wanted to add you to the Cc.

> > [..] If you have any benchmark on large machines I would be very
> > happy if you could test this patch against the unpatched version of
> > -rt.
> Three machines
> - an X32 x86_64 (AMD Opteron 6272 @2100 MHz) at rack #1/slot #1,
> - an X4x2 x86_64 (Intel i7-2600K @3400 MHz) at rack #4/slot #6, and
> - an X4 ARM (i.MX6 Quad @996 MHz) at rack #8/slot #7
> are running a v3-patched 3.12.15-rt25 kernel now. I'll equip more
> machines later.
>
> What I can say so far is:
> - No evidence for any regression, no crashes

That's good to hear.

> - Performance certainly at least as good as unpatched, probably better

That's even better.

>
> I'll do more tests and come back with more precise performance
> comparison data.

Do you also have any threaded tests? That is, something like a java
benchmark that kicks off lots of threads. That's where the performance
should show up the most. Clark's whack_mmap_sem test is specific to
this, and has shown a 10x increase in performance with my patch. But
that's a micro benchmark. A better test would be a real java
application.

Thanks,

-- Steve

2014-04-11 13:26:00

by Carsten Emde

[permalink] [raw]
Subject: Re: [RFC PATCH RT V3] rwsem: The return of multi-reader PI rwsems

Hi Steven,

> [..] I added Carsten to the Cc, so I'll post the entire change log
> of v1 here again.
I've been listening and testing boxes all the time ...

> [..] If you have any benchmark on large machines I would be very
> happy if you could test this patch against the unpatched version of
> -rt.
Three machines
- an X32 x86_64 (AMD Opteron 6272 @2100 MHz) at rack #1/slot #1,
- an X4x2 x86_64 (Intel i7-2600K @3400 MHz) at rack #4/slot #6, and
- an X4 ARM (i.MX6 Quad @996 MHz) at rack #8/slot #7
are running a v3-patched 3.12.15-rt25 kernel now. I'll equip more
machines later.

What I can say so far is:
- No evidence for any regression, no crashes
- Performance certainly at least as good as unpatched, probably better

I'll do more tests and come back with more precise performance
comparison data.

Thanks,
-Carsten.

Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems



On 04/10/2014 11:44 AM, Clark Williams wrote:
> On Wed, 9 Apr 2014 15:19:22 -0400
> Steven Rostedt <[email protected]> wrote:
>
>
>> This patch is built on top of the two other patches that I posted
>> earlier, which should not be as controversial.
>>
>> If you have any benchmark on large machines I would be very happy if
>> you could test this patch against the unpatched version of -rt.
>>
>> Cheers,
>>
>> -- Steve
>>
>
> Steven
>
> I wrote a program named whack_mmap_sem which creates a large (4GB)
> buffer, then creates 2 x ncpus threads that are affined across all the
> available cpus. These threads then randomly write into the buffer,
> which should cause page faults galore.
>
> I then built the following kernel configs:
>
> vanilla-3.13.15 - no RT patches applied
> rt-3.12.15 - PREEMPT_RT patchset
> rt-3.12.15-fixes - PREEMPT_RT + rwsem fixes
> rt-3.12.15-multi - PREEMPT_RT + rwsem fixes + rwsem-multi patch
>
> My test h/w was a Dell R520 with a 6-core Intel(R) Xeon(R) CPU E5-2430
> 0 @ 2.20GHz (hyperthreaded). So whack_mmap_sem created 24 threads
> which all partied in the 4GB address range.
>
> I ran whack_mmap_sem with the argument -w 100000, which means each
> thread does 100k writes to random locations inside the buffer, and I
> did five runs for each kernel. At the end of the run whack_mmap_sem
> prints out the time of the run in microseconds.
>
> The means of each group of five test runs are:
>
> vanilla.log: 1210117
> rt.log: 17210953 (14.2 x slower than vanilla)
> rt-fixes.log: 10062027 (8.3 x slower than vanilla)
> rt-multi.log: 3179582 (2.6 x slower than vanilla)
>
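The source of whack_mmap_sem is not posted in this thread. A minimal sketch of the same kind of load, assuming only what is described above (one large shared buffer, several threads writing to random offsets, elapsed time in microseconds): run_fault_test() and its parameters are hypothetical, the buffer is an anonymous private mapping, and CPU pinning is left out to keep the sketch portable. The first write to each page faults, and the fault path takes mmap_sem for read, which is the lock this patch targets.

```c
#define _GNU_SOURCE
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <time.h>

struct fault_args {
	unsigned char *buf; /* shared anonymous mapping */
	size_t size;        /* size of the mapping in bytes */
	long writes;        /* number of random writes to perform */
	uint64_t seed;      /* per-thread PRNG state, must be non-zero */
};

/* xorshift64: a tiny PRNG so each thread keeps its own state. */
static uint64_t xorshift64(uint64_t *s)
{
	uint64_t x = *s;
	x ^= x << 13;
	x ^= x >> 7;
	x ^= x << 17;
	return *s = x;
}

/* First touch of each page raises a page fault (mmap_sem read side). */
static void *fault_worker(void *arg)
{
	struct fault_args *a = arg;
	for (long i = 0; i < a->writes; i++)
		a->buf[xorshift64(&a->seed) % a->size] = (unsigned char)i;
	return NULL;
}

/* Returns elapsed wall-clock time in microseconds, or -1 on error. */
long run_fault_test(size_t size, int nthreads, long writes)
{
	struct fault_args *args = NULL;
	pthread_t *tids = NULL;
	struct timespec t0, t1;
	int started = 0;
	long ret = -1;
	unsigned char *buf;

	buf = mmap(NULL, size, PROT_READ | PROT_WRITE,
		   MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
	if (buf == MAP_FAILED)
		return -1;
	args = calloc(nthreads, sizeof(*args));
	tids = calloc(nthreads, sizeof(*tids));
	if (!args || !tids)
		goto out;

	clock_gettime(CLOCK_MONOTONIC, &t0);
	for (; started < nthreads; started++) {
		args[started] = (struct fault_args){
			buf, size, writes, 0x9e3779b97f4a7c15ULL + started };
		if (pthread_create(&tids[started], NULL, fault_worker,
				   &args[started]))
			break;
	}
	for (int i = 0; i < started; i++)
		pthread_join(tids[i], NULL);
	clock_gettime(CLOCK_MONOTONIC, &t1);

	if (started == nthreads)
		ret = (t1.tv_sec - t0.tv_sec) * 1000000L +
		      (t1.tv_nsec - t0.tv_nsec) / 1000;
out:
	free(args);
	free(tids);
	munmap(buf, size);
	return ret;
}
```

A call such as run_fault_test(4UL << 30, 2 * ncpus, 100000) would roughly approximate the setup above (4GB buffer, 2 x ncpus threads, -w 100000); build with -pthread.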

Hi

I ran Clark's test on a machine with 32 CPUs: 2 sockets, 8 cores/socket + HT

On this machine I ran 5 different kernels:

Vanilla: 3.12.15 - Vanilla
RT: 3.12.15 + Preempt-RT 3.12.15-rt25
FIX: RT + rwsem fixes from rostedt
Multi: FIX + Multi-reader PI
Multi -FULL: Multi + CONFIG_PREEMPT=y

I ran the test with the same parameters that Clark used, 100 iterations
for each kernel. For each kernel I measured the min and max execution
time, along with the avg execution time and the standard deviation.

The result was:

+-------+---------+----------+----------+-----------+-------------+
| | Vanilla | RT | FIX | Multi | Multi -FULL |
+-------+---------+----------+----------+-----------+-------------+
|MIN: | 3806754 | 6092939 | 6324665 | 2633614 | 3867240 |
|AVG: | 3875201 | 8162832 | 8007934 | 2736253 | 3961607 |
|MAX: | 4062205 | 10951416 | 10574212 | 2972458 | 4139297 |
|STDEV: | 47645 | 927839 | 943482 | 52579 | 943482 |
+-------+---------+----------+----------+-----------+-------------+

Comparison of the average case to vanilla:

RT - 2.10x (slower)
FIX - 2.06x (slower)
Multi - 0.70x (faster?)
Multi no PREEMPT_FULL - 1.02x (equals?)

As we can see, the patch gave good results on Preempt-RT, but the
results were a little bit weird, because PREEMPT-RT + the Multi patch
became faster than vanilla.

The patch showed a good result in the standard deviation as well: with
the patch, the std dev became ~17x smaller than on the RT kernel without
the patch, which means less jitter.
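The ratios quoted above can be re-derived from the AVG and STDEV rows of the table. A small sketch, where the arrays and function names are illustrative and the values are copied from the table:

```c
#include <stdio.h>

enum { VANILLA, RT, FIX, MULTI, MULTI_NOFULL, NKERNELS };

static const char *const kernel_name[NKERNELS] = {
	"Vanilla", "RT", "FIX", "Multi", "Multi -FULL",
};
/* AVG and STDEV rows from the table above. */
static const double avg_us[NKERNELS] = {
	3875201, 8162832, 8007934, 2736253, 3961607,
};
static const double stdev_us[NKERNELS] = {
	47645, 927839, 943482, 52579, 943482,
};

/* Mean run time relative to vanilla; > 1.0 means slower. */
double ratio_to_vanilla(int k)
{
	return avg_us[k] / avg_us[VANILLA];
}

/* How many times larger kernel a's std dev is than kernel b's. */
double stdev_reduction(int a, int b)
{
	return stdev_us[a] / stdev_us[b];
}

void print_ratios(void)
{
	for (int k = RT; k < NKERNELS; k++)
		printf("%-12s %.2fx\n", kernel_name[k], ratio_to_vanilla(k));
	printf("RT vs Multi std dev: %.1fx\n", stdev_reduction(RT, MULTI));
}
```

Computed this way the average-case ratios come out at about 2.11, 2.07, 0.71 and 1.02, so the figures in the list above appear to be truncated rather than rounded to two decimals; the RT vs. Multi std dev ratio is about 17.6.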

-- Daniel

2014-04-14 09:56:03

by Ingo Molnar

[permalink] [raw]
Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems


* Steven Rostedt <[email protected]> wrote:

> A while back ago I wrote a patch that would allow for reader/writer
> locks like rwlock and rwsems to have multiple readers in PREEMPT_RT. It
> was slick and fast but unfortunately it was way too complex and ridden
> with nasty little critters which earned me my large collection of
> frozen sharks in the fridge (which are quite tasty).
>
> The main problem with my previous solution was that I tried to be too
> clever. I worked hard on making the rw mutex still have the "fast
> path". That is, the cmpxchg that could allow a non contended grabbing
> of the lock be one instruction and be off with it. But to get that
> working required lots of tricks and black magic that was certainly
> going to fail. Thus, with the raining of sharks on my parade, the
> priority inheritance mutex with multiple owners died a slow painful
> death.
>
> So we thought.
>
> But over the years, a new darkness was on the horizon. Complaints about
> running highly threaded processes (did I hear Java?) were suffering
> huge performance hits on the PREEMPT_RT kernel. Whether or not the
> processes were real-time tasks, they still were horrible compared to
> running the same tasks on the mainline kernel. Note, this was being
> done on machines with many CPUs.
>
> The culprit mostly was a single rwsem, the notorious mmap_sem that
> can be taking several times for read, and as on RT, this is just a
> single mutex, and it would serialize these accesses that would not
> happen on mainline.
>
> I looked back at my poor dead rw multi pi reader patch and thought to
> myself. "How complex would this be if I removed the 'fast path' from
> the code". I decided to build a new tower in Mordor.
>
> I feel that I am correct. By removing the fast path and requiring all
> accesses to the rwsem to go through the slow path (must take the
> wait_lock to do anything). The code really wasn't that bad. I also only
> focused on the rwsem and did not worry about the rwlocks as that hasn't
> been pointed out as a bottle neck yet. If it does happen to be, this
> code could easily work on rwlocks too.
>
> I'm much more confident in this code than I was with my previous
> version of the rwlock multi-reader patch. I added a bunch of comments
> to this code to explain how things interact. The writer unlock was
> still able to use the fast path as the writers are pretty much like a
> normal mutex. Too bad that the writer unlock is not a high point of
> contention.
>
> This patch is built on top of the two other patches that I posted
> earlier, which should not be as controversial.
>
> If you have any benchmark on large machines I would be very happy if
> you could test this patch against the unpatched version of -rt.
>
> Cheers,
>
> -- Steve
>
> Signed-off-by: Steven Rostedt <[email protected]>
> ---
> Index: linux-rt.git/kernel/rtmutex.c

Side note: could you please in general include diffstats with such
patches, especially since you seem to be exporting it from a Git repo?

Newfangled patch summaries like:

include/linux/rtmutex.h | 29 ++
include/linux/rwsem_rt.h | 8
include/linux/sched.h | 20 +
kernel/fork.c | 20 +
kernel/futex.c | 2
kernel/rt.c | 27 +
kernel/rtmutex.c | 645 +++++++++++++++++++++++++++++++++++++++++++++--
kernel/rtmutex_common.h | 19 +
kernel/sysctl.c | 13
9 files changed, 753 insertions(+), 30 deletions(-)

Really give a useful bird's eye view of forest Fangorn, before
straying into it!

Thanks,

Ingo

2014-04-14 13:34:23

by Steven Rostedt

Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Mon, 14 Apr 2014 11:55:54 +0200
Ingo Molnar <[email protected]> wrote:

> > Index: linux-rt.git/kernel/rtmutex.c
>
> Side note: could you please in general include diffstats with such
> patches, especially since you seem to be exporting it from a Git repo?

Sure, I'll try to remember to add them. I do my work in a git repo and
then do:

git diff > mypatch.patch
quilt import mypatch.patch
patch -p1 -R < mypatch.patch
rm mypatch.patch

and then I pull patches/mypatch.patch into my email to do the RFC.

>
> Newfangled patch summaries like:
>
> include/linux/rtmutex.h | 29 ++
> include/linux/rwsem_rt.h | 8
> include/linux/sched.h | 20 +
> kernel/fork.c | 20 +
> kernel/futex.c | 2
> kernel/rt.c | 27 +
> kernel/rtmutex.c | 645 +++++++++++++++++++++++++++++++++++++++++++++--
> kernel/rtmutex_common.h | 19 +
> kernel/sysctl.c | 13
> 9 files changed, 753 insertions(+), 30 deletions(-)
>
> Really give a useful bird's eye view of forest Fangorn, before
> straying into it!

diffstat patches/mypatch.patch should also give the same. Hmm, I wonder
if quilt has any commands to do this for me?

Thanks,

-- Steve

2014-04-14 14:08:37

by Peter Zijlstra

Subject: Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems

On Mon, Apr 14, 2014 at 09:34:19AM -0400, Steven Rostedt wrote:
> On Mon, 14 Apr 2014 11:55:54 +0200
> Ingo Molnar <[email protected]> wrote:
>
> > > Index: linux-rt.git/kernel/rtmutex.c
> >
> > Side note: could you please in general include diffstats with such
> > patches, especially since you seem to be exporting it from a Git repo?
>
> Sure, I'll try to remember to add them. I do my work in a git repo and
> then do:
>
> git diff > mypatch.patch
> quilt import mypatch.patch
> patch -p1 -R < mypatch.patch
> rm mypatch.patch
>
> and then I pull patches/mypatch.patch into my email to do the RFC.
>
> >
> > Newfangled patch summaries like:
> >
> > include/linux/rtmutex.h | 29 ++
> > include/linux/rwsem_rt.h | 8
> > include/linux/sched.h | 20 +
> > kernel/fork.c | 20 +
> > kernel/futex.c | 2
> > kernel/rt.c | 27 +
> > kernel/rtmutex.c | 645 +++++++++++++++++++++++++++++++++++++++++++++--
> > kernel/rtmutex_common.h | 19 +
> > kernel/sysctl.c | 13
> > 9 files changed, 753 insertions(+), 30 deletions(-)
> >
> > Really give a useful bird's eye view of forest Fangorn, before
> > straying into it!
>
> diffstat patches/mypatch.patch should also give the same. Hmm, I wonder
> if quilt has any commands to do this for me?

QUILT_REFRESH_ARGS="--no-timestamps --backup --diffstat --strip-trailing-whitespace --no-index --sort -p1 -p ab"


Note the --diffstat; you'll need to add 'quilt refresh' to your cmd-list
though.

2014-04-17 23:26:14

by Steven Rostedt

Subject: [RFC PATCH RT V4] rwsem: The return of multi-reader PI rwsems

Changes since v3:

Clark reported that he was seeing a large latency when he added this
patch. I tested it out on an 8 logical CPU box, and sure enough I was
seeing it too. After spending the day debugging why, I found that I had
a bug in rt_mutex_getprio(), where I could do:

min(task_top_pi_waiter(task)->pi_list_entry.prio, prio)

when there was no "top_pi_waiter", which would give garbage as a
result. This would let some tasks have higher priority than they
should, and prevent other tasks that should have high priority from running.

After correcting this issue, the latencies are back down to normal.
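To make the fix concrete, here is a minimal user-space model of the corrected rt_mutex_getprio() logic. It assumes the kernel convention that a lower number means higher priority; the names model_task, model_getprio() and MODEL_MAX_PRIO are hypothetical and not part of the patch. The point it illustrates is that the top PI waiter is consulted only when the task actually has one, so a task that merely holds read locks never reads a nonexistent waiter.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for MAX_PRIO: the "no boost" value. */
#define MODEL_MAX_PRIO 140

/* Hypothetical snapshot of the state rt_mutex_getprio() looks at. */
struct model_task {
	int normal_prio;
	bool has_pi_waiter;          /* task_has_pi_waiters() */
	int top_pi_waiter_prio;      /* only valid if has_pi_waiter */
	size_t nr_read_locks;        /* read locks currently held */
	const int *read_lock_prios;  /* rwmutex->prio of each held lock */
};

static int model_min(int a, int b)
{
	return a < b ? a : b;
}

/* Effective priority: normal_prio, boosted by held read locks and,
 * only when one exists, by the top PI waiter. */
static int model_getprio(const struct model_task *t)
{
	int prio = t->normal_prio;
	size_t i;

	for (i = 0; i < t->nr_read_locks; i++)
		prio = model_min(prio, t->read_lock_prios[i]);

	/* Per the changelog, the bug was reading the top waiter
	 * without this check. */
	if (t->has_pi_waiter)
		prio = model_min(prio, t->top_pi_waiter_prio);

	return prio;
}
```

With has_pi_waiter false, whatever value sits in top_pi_waiter_prio is ignored, which is the behavior the v4 change restores.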

-- Steve

Signed-off-by: Steven Rostedt <[email protected]>
---
include/linux/rtmutex.h | 29 ++
include/linux/rwsem_rt.h | 8
include/linux/sched.h | 20 +
kernel/fork.c | 20 +
kernel/futex.c | 2
kernel/rt.c | 27 +
kernel/rtmutex.c | 664 ++++++++++++++++++++++++++++++++++++++++++++++-
kernel/rtmutex_common.h | 19 +
kernel/sysctl.c | 13
9 files changed, 772 insertions(+), 30 deletions(-)

Index: linux-rt.git/kernel/rtmutex.c
===================================================================
--- linux-rt.git.orig/kernel/rtmutex.c 2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/rtmutex.c 2014-04-17 19:11:24.018721323 -0400
@@ -26,6 +26,21 @@
#include "rtmutex_common.h"

/*
+ * rt_rw_limit is the number of simultaneous readers of a rwsem lock.
+ *
+ * rt_rw_limit gets updated late in boot to twice the number of
+ * possible CPUs (see rt_rw_limit_init()), but we need to initialize
+ * it to something other than zero.
+ */
+int rt_rw_limit = NR_CPUS;
+
+/* rt_rw_limit == 0 means unlimited */
+static inline int under_rt_rw_limit(int cnt)
+{
+ return !rt_rw_limit || cnt < rt_rw_limit;
+}
+
+/*
* lock->owner state tracking:
*
* lock->owner holds the task_struct pointer of the owner. Bit 0
@@ -110,19 +125,48 @@
plist_head_init(&lock->wait_list);
}

+static inline void init_rw_lists(struct rt_rw_mutex *rwlock)
+{
+ struct rt_mutex *lock = &rwlock->mutex;
+
+ /*
+ * A rwsem priority is initialized to -1 and never will
+ * be that again.
+ */
+ if (unlikely(rwlock->prio < 0)) {
+ rwlock->prio = MAX_PRIO;
+ init_lists(lock);
+ }
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio);
+static inline int task_has_reader_locks(struct task_struct *task);
+
/*
* Calculate task priority from the waiter list priority
*
* Return task->normal_prio when the waiter list is empty or when
* the waiter is not allowed to do priority boosting
+ *
+ * On PREEMPT_RT, we also check the priorities of the list
+ * of read locks that the task holds.
*/
int rt_mutex_getprio(struct task_struct *task)
{
- if (likely(!task_has_pi_waiters(task)))
- return task->normal_prio;
+ int prio = task->normal_prio;
+ bool has_pi_waiters = task_has_pi_waiters(task);
+ bool has_reader_locks = task_has_reader_locks(task);
+
+ if (likely(!has_pi_waiters && !has_reader_locks))
+ return prio;
+
+ if (has_reader_locks)
+ prio = rt_mutex_get_readers_prio(task, prio);
+
+ if (has_pi_waiters)
+ prio = min(task_top_pi_waiter(task)->pi_list_entry.prio, prio);

- return min(task_top_pi_waiter(task)->pi_list_entry.prio,
- task->normal_prio);
+ return prio;
}

/*
@@ -181,6 +225,11 @@
*/
int max_lock_depth = 1024;

+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth);
/*
* Adjust the priority chain. Also used for deadlock detection.
* Decreases task's usage by one - may thus free the task.
@@ -203,7 +252,8 @@
int deadlock_detect,
struct rt_mutex *orig_lock,
struct rt_mutex_waiter *orig_waiter,
- struct task_struct *top_task)
+ struct task_struct *top_task,
+ int recursion_depth)
{
struct rt_mutex *lock;
struct rt_mutex_waiter *waiter, *top_waiter = orig_waiter;
@@ -316,6 +366,18 @@

/* Grab the next task */
task = rt_mutex_owner(lock);
+
+ /*
+ * Readers are special. We may need to boost more than one owner.
+ */
+ if (unlikely(task == RT_RW_READER)) {
+ ret = rt_mutex_adjust_readers(orig_lock, orig_waiter,
+ top_task, lock,
+ recursion_depth);
+ raw_spin_unlock(&lock->wait_lock);
+ goto out;
+ }
+
get_task_struct(task);
raw_spin_lock_irqsave(&task->pi_lock, flags);

@@ -349,7 +411,7 @@
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
out_put_task:
put_task_struct(task);
-
+ out:
return ret;
}

@@ -518,6 +580,13 @@
return 0;

if (waiter == rt_mutex_top_waiter(lock)) {
+ /* readers are handled differently */
+ if (unlikely(owner == RT_RW_READER)) {
+ res = rt_mutex_adjust_readers(lock, waiter,
+ current, lock, 0);
+ return res;
+ }
+
raw_spin_lock_irqsave(&owner->pi_lock, flags);
plist_del(&top_waiter->pi_list_entry, &owner->pi_waiters);
plist_add(&waiter->pi_list_entry, &owner->pi_waiters);
@@ -527,7 +596,8 @@
chain_walk = 1;
raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
}
- else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock))
+ else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock) &&
+ owner != RT_RW_READER)
chain_walk = 1;

if (!chain_walk)
@@ -543,7 +613,7 @@
raw_spin_unlock(&lock->wait_lock);

res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
- task);
+ task, 0);

raw_spin_lock(&lock->wait_lock);

@@ -633,7 +703,7 @@

raw_spin_unlock(&lock->wait_lock);

- rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current);
+ rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current, 0);

raw_spin_lock(&lock->wait_lock);
}
@@ -660,7 +730,7 @@
/* gets dropped in rt_mutex_adjust_prio_chain()! */
get_task_struct(task);
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
- rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
+ rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task, 0);
}

#ifdef CONFIG_PREEMPT_RT_FULL
@@ -739,7 +809,7 @@
struct rt_mutex_waiter waiter, *top_waiter;
int ret;

- rt_mutex_init_waiter(&waiter, true);
+ rt_mutex_init_waiter(&waiter, true, false);

raw_spin_lock(&lock->wait_lock);
init_lists(lock);
@@ -1001,15 +1071,564 @@

return 0;
}
+
+/*
+ * Wake up the next waiter on a rw lock.
+ *
+ * Similar to wakeup_next_waiter() but the waiter is not on
+ * the owner's pi_waiters. Also, it does not reset the lock
+ * owner.
+ *
+ * Called with lock->wait_lock held.
+ */
+static void wakeup_next_rw_waiter(struct rt_mutex *lock)
+{
+ struct rt_mutex_waiter *waiter;
+
+ waiter = rt_mutex_top_waiter(lock);
+ rt_mutex_wake_waiter(waiter);
+}
+
+/* Called with rwmutex->mutex.wait_lock held */
+static inline void
+rt_rw_mutex_take_as_reader(struct task_struct *task,
+ struct rt_rw_mutex *rwmutex,
+ struct rt_mutex_waiter *waiter)
+{
+ struct reader_lock_struct *rls;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ unsigned long flags;
+ bool wakeup = false;
+
+ rwmutex->nr_owners++;
+ rt_mutex_set_owner(&rwmutex->mutex, RT_RW_READER);
+
+ if (waiter || rt_mutex_has_waiters(mutex)) {
+ unsigned long flags;
+ struct rt_mutex_waiter *top;
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+
+ /* remove the queued waiter. */
+ if (waiter) {
+ plist_del(&waiter->list_entry, &mutex->wait_list);
+ task->pi_blocked_on = NULL;
+ }
+
+ /*
+ * Initialize the rwmutex prio to the priority of
+ * the top waiter.
+ */
+ if (rt_mutex_has_waiters(mutex)) {
+ top = rt_mutex_top_waiter(mutex);
+ top->pi_list_entry.prio = top->list_entry.prio;
+ /*
+		 * Record the top waiter's priority in the lock so
+		 * that boosting can quickly read the priorities of
+		 * the read locks a task owns without taking that
+		 * task's pi_lock. The rwmutex->prio
+		 * is protected by the rwmutex->mutex.wait_lock,
+		 * which is held during boosting.
+ */
+ rwmutex->prio = top->list_entry.prio;
+
+ /*
+ * If this waiter is a reader, and the reader limit
+ * has not been hit, then we can wake this waiter
+ * up too.
+ */
+ if (!top->writer && under_rt_rw_limit(rwmutex->nr_owners))
+ wakeup = true;
+ } else
+ rwmutex->prio = MAX_PRIO;
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ /*
+ * It is possible to have holes in the owned_read_locks array.
+ * If we take read lock A and then B, but then release A, we
+ * can't move the pointer of B because if something blocks on
+ * B, it can use the B pointer to boost this task. B can only
+	 * be moved by holding the wait_lock of B. Remember, the
+ * B lock has its pointers to that index of our array.
+ */
+ rls = &task->owned_read_locks[task->reader_lock_free];
+ BUG_ON(rls->lock);
+
+ /*
+	 * Grab the pi_lock here, as other tasks can boost this
+	 * lock via other held locks, and if the free lock descriptor
+ * was not at the end of the owned_read_locks array, then
+ * it might get confused if it sees this descriptor with
+ * a lock and no task.
+ */
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ rls->lock = rwmutex;
+ rls->task = task;
+ list_add(&rls->list, &rwmutex->owners);
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ if (task->reader_lock_free == task->reader_lock_count) {
+ /*
+ * If we nest too deep, then this task can never get the lock.
+ * This task will then block for good. Warn about this and
+ * hopefully, people will notice the warning. If not, they will notice
+ * the dead task. Maybe this should be a BUG_ON()
+ */
+ if (WARN_ON(task->reader_lock_count == MAX_RWLOCK_DEPTH))
+ return;
+
+ /*
+ * We don't need to take the pi_lock here as we have the
+ * wait_lock of the lock at the end of the list. If this task
+ * was being boosted by a task blocked on this lock, it would
+ * need to grab the wait_lock before boosting this task.
+ */
+ task->reader_lock_count++;
+ task->reader_lock_free++;
+ } else {
+ /*
+ * Find the next free lock in array. Again, we do not need
+ * to grab the pi_lock because the boosting doesn't use
+ * the reader_lock_free variable, which is the only thing
+ * we are updating here.
+ */
+ do {
+ rls = &task->owned_read_locks[++task->reader_lock_free];
+ } while (rls->lock &&
+ task->reader_lock_free < task->reader_lock_count);
+ }
+
+ if (wakeup)
+ wakeup_next_rw_waiter(&rwmutex->mutex);
+}
+
+static int try_to_take_rw_read(struct rt_rw_mutex *rwmutex,
+ struct rt_mutex_waiter *waiter)
+{
+ struct task_struct *task = current;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct task_struct *owner;
+
+ assert_raw_spin_locked(&mutex->wait_lock);
+
+ /* The writer unlock can use the fast path */
+ mark_rt_mutex_waiters(mutex);
+
+ /* If we are at the reader limit, we can't take the lock */
+ if (unlikely(!under_rt_rw_limit(rwmutex->nr_owners)))
+ return 0;
+
+ /*
+	 * If there are no waiters, and there is no owner or
+ * the owner is a reader, we get the lock.
+ * Note, if the waiter or pending bits are set in the owner
+ * then we need to do more checks.
+ */
+ if (likely(!mutex->owner || mutex->owner == RT_RW_READER))
+ goto taken;
+
+ owner = rt_mutex_owner(mutex);
+
+ /* If the lock is owned by a task, then it is a writer */
+ if (owner && owner != RT_RW_READER)
+ return 0;
+
+ /*
+ * A writer or a reader may be waiting. In either case, we
+ * may still be able to steal the lock. The RT rwsems are
+ * not fair if the new reader comes in and is higher priority
+ * than all the waiters.
+ */
+ if (likely(rt_mutex_has_waiters(mutex))) {
+ struct task_struct *pown = rt_mutex_top_waiter(mutex)->task;
+
+ if (task != pown && !lock_is_stealable(task, pown, STEAL_NORMAL))
+ return 0;
+ }
+
+ taken:
+ rt_rw_mutex_take_as_reader(task, rwmutex, waiter);
+ return 1;
+}
+
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex_waiter waiter;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret;
+
+ raw_spin_lock(&mutex->wait_lock);
+ init_rw_lists(rwmutex);
+
+ if (likely(try_to_take_rw_read(rwmutex, NULL))) {
+ /* Got the lock */
+ raw_spin_unlock(&mutex->wait_lock);
+ return;
+ }
+
+ rt_mutex_init_waiter(&waiter, false, false);
+
+ set_current_state(TASK_UNINTERRUPTIBLE);
+
+ ret = task_blocks_on_rt_mutex(mutex, &waiter, current, 0);
+ BUG_ON(ret);
+
+ for (;;) {
+ /* Try to acquire the lock: */
+ if (try_to_take_rw_read(rwmutex, &waiter))
+ break;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(&waiter);
+
+ schedule_rt_mutex(mutex);
+
+ raw_spin_lock(&mutex->wait_lock);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ }
+
+ set_current_state(TASK_RUNNING);
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret = 0;
+
+ if (!raw_spin_trylock(&mutex->wait_lock))
+ return ret;
+
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_read(rwmutex, NULL))
+ ret = 1;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ return ret;
+}
+
+static int
+try_to_take_rw_write(struct rt_rw_mutex *rwmutex, struct rt_mutex_waiter *waiter)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+
+	/* Writers block if there are any readers owning the lock */
+ if (rwmutex->nr_owners)
+ return 0;
+
+ /* No readers, then we can try to take the mutex normally. */
+ return try_to_take_rt_mutex(mutex, current, waiter);
+}
+
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct rt_mutex_waiter waiter;
+ struct task_struct *task = current;
+ int ret;
+
+ raw_spin_lock(&mutex->wait_lock);
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_write(rwmutex, NULL)) {
+ raw_spin_unlock(&mutex->wait_lock);
+ return;
+ }
+
+ /* Writers wake up differently than readers (flag it) */
+ rt_mutex_init_waiter(&waiter, false, true);
+
+ ret = task_blocks_on_rt_mutex(mutex, &waiter, task, 0);
+ BUG_ON(ret);
+
+ set_current_state(TASK_UNINTERRUPTIBLE);
+
+ for (;;) {
+ /* Try to acquire the lock: */
+ if (try_to_take_rw_write(rwmutex, &waiter))
+ break;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(&waiter);
+
+ schedule_rt_mutex(mutex);
+
+ raw_spin_lock(&mutex->wait_lock);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ }
+
+ set_current_state(TASK_RUNNING);
+
+ raw_spin_unlock(&mutex->wait_lock);
+ WARN_ONCE(rwmutex->nr_owners, "Writer has lock with readers");
+
+ debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ int ret = 0;
+
+ if (!raw_spin_trylock(&mutex->wait_lock))
+ return ret;
+
+ init_rw_lists(rwmutex);
+
+ if (try_to_take_rw_write(rwmutex, NULL))
+ ret = 1;
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ return ret;
+}
+
+/*
+ * When a reader lock is released, see if the reader_lock_count
+ * can be moved back.
+ */
+static void shrink_reader_lock_array(struct task_struct *task)
+{
+ struct reader_lock_struct *read_locks = task->owned_read_locks;
+
+ while (task->reader_lock_count &&
+ read_locks[task->reader_lock_count - 1].lock == NULL)
+ task->reader_lock_count--;
+
+ if (task->reader_lock_free > task->reader_lock_count)
+ task->reader_lock_free = task->reader_lock_count;
+}
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex)
+{
+ struct rt_mutex *mutex = &rwmutex->mutex;
+ struct rt_mutex_waiter *waiter;
+ struct reader_lock_struct *rls;
+ struct task_struct *task = current;
+ unsigned long flags;
+ int readers;
+ int i;
+
+ raw_spin_lock(&mutex->wait_lock);
+
+ rt_mutex_deadlock_account_unlock(current);
+
+ WARN_ON_ONCE(!rwmutex->nr_owners);
+
+ rwmutex->nr_owners--;
+
+ if (rt_mutex_has_waiters(mutex)) {
+ /*
+ * If the top waiter is a reader and we are under
+ * the limit, or the top waiter is a writer and
+		 * there are no more readers, then we can give the
+ * top waiter pending ownership and wake it up.
+ *
+ * To simplify things, we only wake up one task, even
+		 * if it's a reader. If the reader wakes up and gets the
+ * lock, it will look to see if it can wake up the next
+ * waiter, and so on. This way we only need to worry about
+ * one task at a time.
+ */
+ waiter = rt_mutex_top_waiter(mutex);
+ readers = rwmutex->nr_owners;
+ if ((waiter->writer && !readers) ||
+ (!waiter->writer && under_rt_rw_limit(readers)))
+ wakeup_next_rw_waiter(mutex);
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ if (!rwmutex->nr_owners)
+ rt_mutex_set_owner(&rwmutex->mutex, NULL);
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+	/* Remove the lock from this task's list */
+ for (i = task->reader_lock_count - 1; i >= 0; i--) {
+ rls = &task->owned_read_locks[i];
+
+ if (rls->lock == rwmutex) {
+ rls->lock = NULL;
+ list_del_init(&rls->list);
+ /* Shrink the array if we can. */
+ if (i == task->reader_lock_count - 1)
+ shrink_reader_lock_array(task);
+ else if (i < task->reader_lock_free)
+ task->reader_lock_free = i;
+ break;
+ }
+ }
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ WARN_ON_ONCE(i < 0);
+
+ raw_spin_unlock(&mutex->wait_lock);
+
+ /* Undo pi boosting if necessary */
+ rt_mutex_adjust_prio(current);
+}
+
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex)
+{
+ struct task_struct *task = current;
+ struct rt_mutex *mutex = &rwmutex->mutex;
+
+ raw_spin_lock(&mutex->wait_lock);
+
+ /*
+ * Writers have normal pi with other tasks blocked
+ * on the lock. That is, the top waiter will be in the
+ * pi_list of this task. But for readers, waiters of
+ * the lock are not included in the pi_list, only the
+ * locks are. We must remove the top waiter of this
+ * lock from this task.
+ */
+ if (rt_mutex_has_waiters(mutex)) {
+ struct rt_mutex_waiter *waiter;
+ unsigned long flags;
+
+ waiter = rt_mutex_top_waiter(mutex);
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ plist_del(&waiter->pi_list_entry, &task->pi_waiters);
+ /*
+ * The rt_rw_mutex_take_as_reader() will update
+ * the rwmutex prio.
+ */
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+ }
+
+ WARN_ONCE(rwmutex->nr_owners, "Writer owned with readers");
+
+ rt_rw_mutex_take_as_reader(task, rwmutex, NULL);
+
+ raw_spin_unlock(&mutex->wait_lock);
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+ return task->reader_lock_count;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+ struct reader_lock_struct *rls;
+ struct rt_rw_mutex *rwmutex;
+ int lock_prio;
+ int i;
+
+ for (i = 0; i < task->reader_lock_count; i++) {
+ rls = &task->owned_read_locks[i];
+ rwmutex = rls->lock;
+ if (!rwmutex)
+ continue;
+ lock_prio = rwmutex->prio;
+ if (prio > lock_prio)
+ prio = lock_prio;
+ }
+ WARN_ON_ONCE(prio < 0);
+
+ return prio;
+}
+
+/* Expects to be called with lock->wait_lock held */
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth)
+{
+ struct reader_lock_struct *rls;
+ struct rt_mutex_waiter *waiter;
+ struct task_struct *task;
+ struct rt_rw_mutex *rwmutex = container_of(lock, struct rt_rw_mutex, mutex);
+ unsigned long flags;
+
+ /* Update the rwmutex's prio */
+ if (rt_mutex_has_waiters(lock)) {
+ waiter = rt_mutex_top_waiter(lock);
+ /*
+ * Do we need to grab the task->pi_lock?
+ * Really, we are only reading it. If it
+ * changes, then that should follow this chain
+ * too.
+ */
+ rwmutex->prio = waiter->task->prio;
+ } else
+ rwmutex->prio = MAX_PRIO;
+
+ if (recursion_depth >= MAX_RWLOCK_DEPTH) {
+ WARN_ON(1);
+ return 1;
+ }
+
+ list_for_each_entry(rls, &rwmutex->owners, list) {
+ bool skip = false;
+
+ task = rls->task;
+
+ raw_spin_lock_irqsave(&task->pi_lock, flags);
+ __rt_mutex_adjust_prio(task);
+ /*
+		 * We need to grab the pi_lock to adjust the task prio, so
+		 * we might as well use it to check if the task is blocked
+		 * too, and save a call to the prio chain code that would
+		 * just grab the lock again and do the same test.
+ */
+ if (!rt_mutex_real_waiter(task->pi_blocked_on))
+ skip = true;
+ raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+ if (skip)
+ continue;
+
+ get_task_struct(task);
+ /*
+ * rt_mutex_adjust_prio_chain will do
+ * the put_task_struct
+ */
+ rt_mutex_adjust_prio_chain(task, 0, orig_lock,
+ orig_waiter, top_task,
+ recursion_depth+1);
+ }
+
+ return 0;
+}
#else
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+ struct rt_mutex_waiter *orig_waiter,
+ struct task_struct *top_task,
+ struct rt_mutex *lock,
+ int recursion_depth)
+{
+ return 0;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+ return prio;
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+ return 0;
+}
+
static inline int __sched
__mutex_lock_check_stamp(struct rt_mutex *lock, struct ww_acquire_ctx *ctx)
{
BUG();
return 0;
}
-
-#endif
+#endif /* CONFIG_PREEMPT_RT_FULL */

/**
* __rt_mutex_slowlock() - Perform the wait-wake-try-to-take loop
@@ -1154,7 +1773,7 @@
struct rt_mutex_waiter waiter;
int ret = 0;

- rt_mutex_init_waiter(&waiter, false);
+ rt_mutex_init_waiter(&waiter, false, false);

raw_spin_lock(&lock->wait_lock);
init_lists(lock);
@@ -1718,4 +2337,21 @@
rt_mutex_unlock(&lock->base.lock);
}
EXPORT_SYMBOL(ww_mutex_unlock);
+
+/*
+ * On boot up, rt_rw_limit is set to NR_CPUS. At the end of boot
+ * we can lower it based on the actual CPUs, as by then everything
+ * should be running as it most likely will on a normal system.
+ *
+ * Note, benchmarks of a threaded page fault stress test have shown
+ * the best performance when rt_rw_limit is set to 2x the number of
+ * CPUs (here taken as nr_cpu_ids).
+ */
+static int __init rt_rw_limit_init(void)
+{
+ rt_rw_limit = nr_cpu_ids * 2;
+ return 0;
+}
+late_initcall(rt_rw_limit_init);
+
#endif
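The admission and wake-up rules spread across try_to_take_rw_read() and rt_rw_mutex_read_unlock() above can be summarized in a small user-space model. This is a sketch only: the struct and function names are hypothetical, the state is assumed to be read under wait_lock, and lock_is_stealable(..., STEAL_NORMAL) is approximated as "the new reader has strictly higher priority than the top waiter".

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical snapshot of rwsem state, read under wait_lock. */
struct model_rwsem {
	int limit;                  /* rt_rw_limit; 0 means unlimited */
	int nr_owners;              /* rwmutex->nr_owners */
	bool writer_owned;          /* a writer task owns the mutex */
	bool has_waiters;           /* rt_mutex_has_waiters() */
	bool top_waiter_is_writer;  /* rt_mutex_top_waiter()->writer */
	int top_waiter_prio;        /* lower value == higher priority */
};

/* Same test as under_rt_rw_limit(). */
static bool model_under_limit(const struct model_rwsem *s, int cnt)
{
	return s->limit == 0 || cnt < s->limit;
}

/* Roughly try_to_take_rw_read() for a reader that is not yet queued. */
static bool model_reader_can_take(const struct model_rwsem *s, int prio)
{
	if (!model_under_limit(s, s->nr_owners))
		return false;       /* reader limit reached */
	if (s->writer_owned)
		return false;       /* a writer excludes readers */
	/* Assumed STEAL_NORMAL rule: only a strictly higher priority
	 * reader may jump ahead of the queued waiters. */
	if (s->has_waiters && prio >= s->top_waiter_prio)
		return false;
	return true;
}

/* The wake-up rule in rt_rw_mutex_read_unlock(), evaluated after
 * nr_owners has been decremented. */
static bool model_unlock_wakes_top(const struct model_rwsem *s)
{
	if (!s->has_waiters)
		return false;
	if (s->top_waiter_is_writer)
		return s->nr_owners == 0;
	return model_under_limit(s, s->nr_owners);
}
```

Since a read unlock wakes at most one waiter, it is the woken reader, once it gets the lock in rt_rw_mutex_take_as_reader(), that checks whether the next waiter can be woken as well.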
Index: linux-rt.git/kernel/rtmutex_common.h
===================================================================
--- linux-rt.git.orig/kernel/rtmutex_common.h 2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/rtmutex_common.h 2014-04-16 10:31:20.160360827 -0400
@@ -43,6 +43,7 @@
* @list_entry: pi node to enqueue into the mutex waiters list
* @pi_list_entry: pi node to enqueue into the mutex owner waiters list
* @task: task reference to the blocked task
+ * @writer: true if it's a rwsem writer that is blocked
*/
struct rt_mutex_waiter {
struct plist_node list_entry;
@@ -50,6 +51,7 @@
struct task_struct *task;
struct rt_mutex *lock;
bool savestate;
+ bool writer;
#ifdef CONFIG_DEBUG_RT_MUTEXES
unsigned long ip;
struct pid *deadlock_task_pid;
@@ -101,6 +103,20 @@
((unsigned long)lock->owner & ~RT_MUTEX_OWNER_MASKALL);
}

+/* used as reader owner of the mutex */
+#define RT_RW_READER (struct task_struct *)0x100
+
+#ifdef CONFIG_PREEMPT_RT_FULL
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex);
+
+#endif /* CONFIG_PREEMPT_RT_FULL */
+
/*
* PI-futex support (proxy locking functions, etc.):
*/
@@ -128,11 +144,12 @@
#endif

static inline void
-rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate)
+rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate, bool writer)
{
debug_rt_mutex_init_waiter(waiter);
waiter->task = NULL;
waiter->savestate = savestate;
+ waiter->writer = writer;
}

#endif
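The RT_RW_READER sentinel works because lock->owner doubles as a tagged pointer: bit 0 flags waiters, and rt_mutex_owner() masks the flag bits off. A minimal model follows, assuming a mask of just bit 0 (the real RT_MUTEX_OWNER_MASKALL is not shown here) and using hypothetical names throughout. It also shows that once the waiters bit is set, the raw owner word no longer equals the sentinel, so comparisons are safest through the masking helper.

```c
#include <assert.h>
#include <stdint.h>

struct model_task { int pid; };

/* Assumption: only the "has waiters" bit is packed into the owner word. */
#define MODEL_HAS_WAITERS    1UL
#define MODEL_OWNER_MASKALL  1UL

/* Like RT_RW_READER: a fake, never-dereferenced owner with bit 0 clear. */
#define MODEL_RW_READER ((struct model_task *)0x100)

/* Like rt_mutex_owner(): strip the flag bits to recover the owner. */
static struct model_task *model_owner(uintptr_t owner_word)
{
	return (struct model_task *)(owner_word & ~(uintptr_t)MODEL_OWNER_MASKALL);
}

/* Like mark_rt_mutex_waiters(): set the waiters bit in place. */
static void model_mark_waiters(uintptr_t *owner_word)
{
	*owner_word |= MODEL_HAS_WAITERS;
}
```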
Index: linux-rt.git/kernel/rt.c
===================================================================
--- linux-rt.git.orig/kernel/rt.c 2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/rt.c 2014-04-16 10:31:20.170360699 -0400
@@ -310,14 +310,19 @@
void rt_up_write(struct rw_semaphore *rwsem)
{
rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
- rt_mutex_unlock(&rwsem->lock);
+ /*
+ * Unlocking a write is the same as unlocking the mutex.
+ * The woken reader will do all the work if it needs to
+ * wake up more than one reader.
+ */
+ __rt_spin_unlock(&rwsem->rwlock.mutex);
}
EXPORT_SYMBOL(rt_up_write);

void rt_up_read(struct rw_semaphore *rwsem)
{
rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
- rt_mutex_unlock(&rwsem->lock);
+ rt_rw_mutex_read_unlock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_up_read);

@@ -327,13 +332,14 @@
*/
void rt_downgrade_write(struct rw_semaphore *rwsem)
{
- BUG_ON(rt_mutex_owner(&rwsem->lock) != current);
+ BUG_ON(rt_mutex_owner(&rwsem->rwlock.mutex) != current);
+ rt_rw_mutex_downgrade_write(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_downgrade_write);

int rt_down_write_trylock(struct rw_semaphore *rwsem)
{
- int ret = rt_mutex_trylock(&rwsem->lock);
+ int ret = rt_rw_mutex_write_trylock(&rwsem->rwlock);

if (ret)
rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
@@ -344,14 +350,14 @@
void rt_down_write(struct rw_semaphore *rwsem)
{
rwsem_acquire(&rwsem->dep_map, 0, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_down_write);

void rt_down_write_nested(struct rw_semaphore *rwsem, int subclass)
{
rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}
EXPORT_SYMBOL(rt_down_write_nested);

@@ -359,14 +365,14 @@
struct lockdep_map *nest)
{
rwsem_acquire_nest(&rwsem->dep_map, 0, 0, nest, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_write_lock(&rwsem->rwlock);
}

int rt_down_read_trylock(struct rw_semaphore *rwsem)
{
int ret;

- ret = rt_mutex_trylock(&rwsem->lock);
+ ret = rt_rw_mutex_read_trylock(&rwsem->rwlock);
if (ret)
rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);

@@ -377,7 +383,7 @@
static void __rt_down_read(struct rw_semaphore *rwsem, int subclass)
{
rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
- rt_mutex_lock(&rwsem->lock);
+ rt_rw_mutex_read_lock(&rwsem->rwlock);
}

void rt_down_read(struct rw_semaphore *rwsem)
@@ -402,7 +408,8 @@
debug_check_no_locks_freed((void *)rwsem, sizeof(*rwsem));
lockdep_init_map(&rwsem->dep_map, name, key, 0);
#endif
- rwsem->lock.save_state = 0;
+ rt_rw_mutex_init(&rwsem->rwlock);
+ rwsem->rwlock.mutex.save_state = 0;
}
EXPORT_SYMBOL(__rt_rwsem_init);

Index: linux-rt.git/include/linux/sched.h
===================================================================
--- linux-rt.git.orig/include/linux/sched.h 2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/include/linux/sched.h 2014-04-16 10:31:20.170360699 -0400
@@ -993,6 +993,22 @@
#endif
};

+#ifdef CONFIG_PREEMPT_RT_FULL
+struct rt_rw_mutex;
+/**
+ * reader_lock_struct
+ *
+ * @lock: pointer to rwsem that is held
+ * @task: pointer back to task, for lock code
+ * @list: link into rt_rw_mutex owners list
+ */
+struct reader_lock_struct {
+ struct rt_rw_mutex *lock;
+ struct task_struct *task;
+ struct list_head list;
+};
+#endif
+
struct sched_rt_entity {
struct list_head run_list;
unsigned long timeout;
@@ -1224,6 +1240,10 @@
#ifdef CONFIG_PREEMPT_RT_FULL
/* TODO: move me into ->restart_block ? */
struct siginfo forced_info;
+#define MAX_RWLOCK_DEPTH 5
+	int reader_lock_count; /* one past the last used element of owned_read_locks */
+ int reader_lock_free; /* index of next free element. */
+ struct reader_lock_struct owned_read_locks[MAX_RWLOCK_DEPTH];
#endif

unsigned long sas_ss_sp;
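The owned_read_locks bookkeeping (slot allocation in rt_rw_mutex_take_as_reader(), release plus shrink_reader_lock_array() in rt_rw_mutex_read_unlock()) can be modeled in user space to see how holes are left and reused. This is a hypothetical sketch with no locking; MODEL_DEPTH stands in for MAX_RWLOCK_DEPTH. The posted code appears to index owned_read_locks[reader_lock_free] before its MAX_RWLOCK_DEPTH check. This sketch instead refuses a new lock up front when the array is full and bounds-checks before each read.

```c
#include <assert.h>
#include <stddef.h>

#define MODEL_DEPTH 5   /* hypothetical stand-in for MAX_RWLOCK_DEPTH */

struct model_reader_locks {
	const void *lock[MODEL_DEPTH];  /* NULL == free slot (a "hole") */
	int count;                      /* like reader_lock_count */
	int free;                       /* like reader_lock_free */
};

/* Record a newly read-held lock. Returns its slot, or -1 if full. */
static int model_take(struct model_reader_locks *r, const void *lock)
{
	int slot;

	if (r->free == MODEL_DEPTH)
		return -1;
	slot = r->free;
	assert(r->lock[slot] == NULL);
	r->lock[slot] = lock;

	if (r->free == r->count) {
		r->count++;
		r->free++;
	} else {
		/* Advance to the next hole, or to the end of the used range. */
		do {
			r->free++;
		} while (r->free < r->count && r->lock[r->free] != NULL);
	}
	return slot;
}

/* Drop a lock. Other slots never move, so holes remain; only
 * trailing holes are trimmed off the used range. */
static void model_release(struct model_reader_locks *r, const void *lock)
{
	int i;

	for (i = r->count - 1; i >= 0; i--) {
		if (r->lock[i] != lock)
			continue;
		r->lock[i] = NULL;
		if (i == r->count - 1) {
			while (r->count && r->lock[r->count - 1] == NULL)
				r->count--;
			if (r->free > r->count)
				r->free = r->count;
		} else if (i < r->free) {
			r->free = i;
		}
		return;
	}
	assert(!"releasing a lock that is not held");
}
```

Keeping held locks in fixed slots is what lets a waiter on lock B keep using B's descriptor to boost the owner after lock A, taken earlier, has been released.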
Index: linux-rt.git/kernel/fork.c
===================================================================
--- linux-rt.git.orig/kernel/fork.c 2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/fork.c 2014-04-16 10:31:20.180360572 -0400
@@ -1385,6 +1385,26 @@
if (retval)
goto bad_fork_cleanup_io;

+#ifdef CONFIG_PREEMPT_RT_FULL
+ p->reader_lock_count = 0;
+ p->reader_lock_free = 0;
+ /* Bracket to keep 'i' local */
+ {
+ int i;
+ /*
+ * We could put the initialization of this list in
+ * the grabbing of the lock, but it is safer to
+ * do it now. The list head initialization may be
+ * removed, but we'll keep it for now, just to be safe.
+ */
+ for (i = 0; i < MAX_RWLOCK_DEPTH; i++) {
+ p->owned_read_locks[i].lock = NULL;
+ p->owned_read_locks[i].task = p;
+ INIT_LIST_HEAD(&p->owned_read_locks[i].list);
+ }
+ }
+#endif
+
if (pid != &init_struct_pid) {
retval = -ENOMEM;
pid = alloc_pid(p->nsproxy->pid_ns_for_children);
Index: linux-rt.git/kernel/sysctl.c
===================================================================
--- linux-rt.git.orig/kernel/sysctl.c 2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/sysctl.c 2014-04-16 10:31:20.189360458 -0400
@@ -91,6 +91,10 @@
#include <linux/nmi.h>
#endif

+#ifdef CONFIG_PREEMPT_RT_FULL
+extern int rt_rw_limit;
+#endif
+

#if defined(CONFIG_SYSCTL)

@@ -453,6 +457,15 @@
.proc_handler = proc_dointvec,
},
#endif
+#ifdef CONFIG_PREEMPT_RT_FULL
+ {
+ .procname = "rwsem_reader_limit",
+ .data = &rt_rw_limit,
+ .maxlen = sizeof(int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec,
+ },
+#endif
{
.procname = "panic",
.data = &panic_timeout,
Index: linux-rt.git/include/linux/rtmutex.h
===================================================================
--- linux-rt.git.orig/include/linux/rtmutex.h 2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/include/linux/rtmutex.h 2014-04-16 10:31:20.189360458 -0400
@@ -42,6 +42,21 @@
#endif
};

+/**
+ * The rt_rw_mutex structure
+ *
+ * @mutex: The mutex to wait on
+ * @owners: list of read owners of the mutex
+ * @nr_owners: number of read owners
+ * @prio: the priority of the highest waiter
+ */
+struct rt_rw_mutex {
+ struct rt_mutex mutex;
+ struct list_head owners;
+ int nr_owners;
+ int prio;
+};
+
struct rt_mutex_waiter;
struct hrtimer_sleeper;

@@ -75,6 +90,15 @@
__rt_mutex_init(mutex, #mutex); \
} while (0)

+# define rt_rw_mutex_init(rwmutex) \
+ do { \
+ raw_spin_lock_init(&(rwmutex)->mutex.wait_lock); \
+ INIT_LIST_HEAD(&(rwmutex)->owners); \
+ (rwmutex)->nr_owners = 0; \
+ (rwmutex)->prio = -1; \
+ __rt_mutex_init(&(rwmutex)->mutex, #rwmutex); \
+ } while (0)
+
#define __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
.wait_lock = __RAW_SPIN_LOCK_UNLOCKED(mutexname.wait_lock) \
, .wait_list = PLIST_HEAD_INIT(mutexname.wait_list) \
@@ -85,6 +109,11 @@
#define __RT_MUTEX_INITIALIZER(mutexname) \
{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) }

+#define __RT_RW_MUTEX_INITIALIZER(mutexname) \
+ { .owners = LIST_HEAD_INIT(mutexname.owners) \
+ , .prio = -1 \
+ , .mutex = __RT_MUTEX_INITIALIZER(mutexname.mutex) }
+
#define __RT_MUTEX_INITIALIZER_SAVE_STATE(mutexname) \
{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
, .save_state = 1 }
Index: linux-rt.git/include/linux/rwsem_rt.h
===================================================================
--- linux-rt.git.orig/include/linux/rwsem_rt.h 2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/include/linux/rwsem_rt.h 2014-04-16 10:31:20.198360345 -0400
@@ -19,14 +19,14 @@
#include <linux/rtmutex.h>

struct rw_semaphore {
- struct rt_mutex lock;
+ struct rt_rw_mutex rwlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};

#define __RWSEM_INITIALIZER(name) \
- { .lock = __RT_MUTEX_INITIALIZER(name.lock), \
+ { .rwlock = __RT_RW_MUTEX_INITIALIZER(name.rwlock), \
RW_DEP_MAP_INIT(name) }

#define DECLARE_RWSEM(lockname) \
@@ -37,7 +37,7 @@

#define __rt_init_rwsem(sem, name, key) \
do { \
- rt_mutex_init(&(sem)->lock); \
+ rt_rw_mutex_init(&(sem)->rwlock); \
__rt_rwsem_init((sem), (name), (key));\
} while (0)

@@ -63,7 +63,7 @@
extern void rt_downgrade_write(struct rw_semaphore *rwsem);

#define init_rwsem(sem) rt_init_rwsem(sem)
-#define rwsem_is_locked(s) rt_mutex_is_locked(&(s)->lock)
+#define rwsem_is_locked(s) rt_mutex_is_locked(&(s)->rwlock.mutex)

static inline void down_read(struct rw_semaphore *sem)
{
Index: linux-rt.git/kernel/futex.c
===================================================================
--- linux-rt.git.orig/kernel/futex.c 2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/futex.c 2014-04-16 10:31:20.204360269 -0400
@@ -2327,7 +2327,7 @@
* The waiter is allocated on our stack, manipulated by the requeue
* code while we sleep on uaddr.
*/
- rt_mutex_init_waiter(&rt_waiter, false);
+ rt_mutex_init_waiter(&rt_waiter, false, false);

ret = get_futex_key(uaddr2, flags & FLAGS_SHARED, &key2, VERIFY_WRITE);
if (unlikely(ret != 0))

2014-04-18 08:19:56

by Ingo Molnar

Subject: Re: [RFC PATCH RT V4] rwsem: The return of multi-reader PI rwsems


* Steven Rostedt <[email protected]> wrote:

> Changes since v3:
>
> Clark reported that he was seeing a large latency when he added this
> patch. I tested it out on an 8 logical CPU box, and sure enough I was
> seeing it too. After spending the day debugging why, I found that I had
> a bug in rt_mutex_getprio(), where I could do:
>
> min(task_top_pi_waiter(task)->pi_list_entry.prio, prio)
>
> when there was no "top_pi_waiter", which would give garbage as a
> result. This would let some tasks have higher priority than they
> should, and cause other tasks that should have high priority not to run.

Would a sanity check like the one below have helped? (untested and
such)

Thanks,

Ingo

==========>
kernel/locking/rtmutex_common.h | 1 +
1 file changed, 1 insertion(+)

diff --git a/kernel/locking/rtmutex_common.h b/kernel/locking/rtmutex_common.h
index 7431a9c..36b1ce8 100644
--- a/kernel/locking/rtmutex_common.h
+++ b/kernel/locking/rtmutex_common.h
@@ -85,6 +85,7 @@ static inline int task_has_pi_waiters(struct task_struct *p)
static inline struct rt_mutex_waiter *
task_top_pi_waiter(struct task_struct *p)
{
+ WARN_ON_ONCE(!p->pi_waiters_leftmost);
return rb_entry(p->pi_waiters_leftmost, struct rt_mutex_waiter,
pi_tree_entry);
}
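The bug described in the changelog quoted above comes from taking the minimum against a top PI waiter that may not exist. Below is a minimal userspace sketch of the guarded computation. It is not the actual rt_mutex_getprio() fix: `struct task`, `struct waiter`, the `top_pi_waiter` field and `task_getprio()` are hypothetical simplifications. In the kernel the guard would presumably use task_has_pi_waiters(), whose declaration is visible in the hunk header of the patch above. As in the kernel, a lower prio value means a higher priority, so taking the minimum selects the more urgent of the two.

```c
#include <stddef.h>

/* Hypothetical, simplified stand-ins for task_struct and rt_mutex_waiter. */
struct waiter {
	int prio;                     /* lower value = higher priority */
};

struct task {
	int normal_prio;
	struct waiter *top_pi_waiter; /* NULL when nobody is blocked on us */
};

static int min_int(int a, int b)
{
	return a < b ? a : b;
}

/*
 * Effective priority: boost to the top PI waiter's priority only if a
 * waiter actually exists. The buggy form dereferenced the top waiter
 * unconditionally and read garbage when there was none.
 */
static int task_getprio(const struct task *t)
{
	if (t->top_pi_waiter == NULL)
		return t->normal_prio;
	return min_int(t->top_pi_waiter->prio, t->normal_prio);
}
```

With a waiter at prio 10, a task whose own prio is 120 reports 10; with no waiter it keeps 120 instead of picking up whatever the missing waiter's slot happened to contain.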

2014-04-24 17:52:30

by Steven Rostedt

Subject: Re: [RFC PATCH RT V4] rwsem: The return of multi-reader PI rwsems

On Fri, 18 Apr 2014 10:19:26 +0200
Ingo Molnar <[email protected]> wrote:

>
> * Steven Rostedt <[email protected]> wrote:
>
> > Changes since v3:
> >
> > Clark reported that he was seeing a large latency when he added this
> > patch. I tested it out on an 8 logical CPU box, and sure enough I was
> > seeing it too. After spending the day debugging why, I found that I had
> > a bug in rt_mutex_getprio(), where I could do:
> >
> > min(task_top_pi_waiter(task)->pi_list_entry.prio, prio)
> >
> > when there was no "top_pi_waiter", which would give garbage as a
> > result. This would let some tasks have higher priority than they
> > should, and cause other tasks that should have high priority not to run.
>
> Would a sanity check like the one below have helped? (untested and
> such)

Actually, if I had run this with CONFIG_DEBUG_PI_LIST then this would
have triggered:

#ifdef CONFIG_DEBUG_PI_LIST
# define plist_first_entry(head, type, member)	\
({ \
	WARN_ON(plist_head_empty(head)); \
	container_of(plist_first(head), type, member); \
})
#else
# define plist_first_entry(head, type, member)	\
	container_of(plist_first(head), type, member)
#endif
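The pattern in that macro, an emptiness check that exists only in debug builds, can be sketched in userspace as follows. This is a hypothetical illustration: `struct pi_list`, `struct pi_node` and `pi_list_first()` are made-up names, the check is enabled by compiling with -DDEBUG_PI_LIST in place of CONFIG_DEBUG_PI_LIST, and fprintf() stands in for WARN_ON().

```c
#include <stdio.h>
#include <stddef.h>

/*
 * Hypothetical stand-ins for a priority-sorted list: the first node is
 * the highest-priority (lowest prio value) entry.
 */
struct pi_node {
	int prio;
	struct pi_node *next;
};

struct pi_list {
	struct pi_node *first;        /* NULL when the list is empty */
};

/*
 * Return the head entry. Only debug builds (-DDEBUG_PI_LIST) pay for
 * the emptiness check; like WARN_ON(), it reports and carries on.
 */
static struct pi_node *pi_list_first(const struct pi_list *head)
{
#ifdef DEBUG_PI_LIST
	if (head->first == NULL)
		fprintf(stderr, "WARNING: pi_list_first() on an empty list\n");
#endif
	return head->first;
}
```

One difference from the real plist is worth noting: there, the "first entry" of an empty list is a bogus pointer derived from the list head itself, which is why the result looked like plausible garbage; this sketch returns NULL instead, but the caller must still check for the empty case.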

-- Steve

>
> Thanks,
>
> Ingo
>
> ==========>
> kernel/locking/rtmutex_common.h | 1 +
> 1 file changed, 1 insertion(+)
>
> diff --git a/kernel/locking/rtmutex_common.h b/kernel/locking/rtmutex_common.h
> index 7431a9c..36b1ce8 100644
> --- a/kernel/locking/rtmutex_common.h
> +++ b/kernel/locking/rtmutex_common.h
> @@ -85,6 +85,7 @@ static inline int task_has_pi_waiters(struct task_struct *p)
> static inline struct rt_mutex_waiter *
> task_top_pi_waiter(struct task_struct *p)
> {
> + WARN_ON_ONCE(!p->pi_waiters_leftmost);
> return rb_entry(p->pi_waiters_leftmost, struct rt_mutex_waiter,
> pi_tree_entry);
> }