2009-03-03 00:03:13

by Darren Hart

Subject: [TIP][RFC 0/7] requeue pi implementation

The following series is v5 of the requeue_pi patches against
linux-2.6-tip/core/futexes. The current futex implementation doesn't
allow for requeueing of PI futexes, which leads to a thundering herd
during pthread_cond_broadcast (as opposed to a civilized priority
ordered wakeup sequence). The core of the problem is that the
underlying rt_mutex cannot be left with waiters and no owner (which
would break the PI logic). This patch series updates the futex requeue
code to allow for requeueing from non-pi to pi futexes in support of PI
aware pthread_cond_* calls along with some needful rt_mutex helper
routines. The credit for the design goes to Thomas Gleixner, while the
bugs and other idiocies present in this implementation should be
attributed to me.
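
For readers who want to see the userspace pattern this series targets, here is a minimal sketch of a condition variable paired with a priority-inheritance mutex. It is not part of the patches. The names demo_state, demo_waiter and demo_pi_broadcast are made up for illustration, and whether the C library uses any requeue operation underneath depends on the glibc version.

```c
#define _GNU_SOURCE
#include <pthread.h>

#define DEMO_MAX_WAITERS 16

/* Shared state for the demo; all names here are illustrative. */
struct demo_state {
	pthread_mutex_t lock;	/* PI mutex protecting ready and woken */
	pthread_cond_t cond;	/* condvar the waiters block on */
	int ready;		/* predicate guarded by lock */
	int woken;		/* number of waiters that got through */
};

static void *demo_waiter(void *arg)
{
	struct demo_state *s = arg;

	pthread_mutex_lock(&s->lock);
	while (!s->ready)
		pthread_cond_wait(&s->cond, &s->lock);
	s->woken++;
	pthread_mutex_unlock(&s->lock);
	return NULL;
}

/*
 * Start nr waiters on a condvar paired with a PTHREAD_PRIO_INHERIT mutex,
 * broadcast once, and return how many waiters woke up (-1 on setup error).
 */
int demo_pi_broadcast(int nr)
{
	struct demo_state s = { .ready = 0, .woken = 0 };
	pthread_mutexattr_t attr;
	pthread_t tid[DEMO_MAX_WAITERS];
	int i, started = 0;

	if (nr < 0 || nr > DEMO_MAX_WAITERS)
		return -1;
	if (pthread_mutexattr_init(&attr))
		return -1;
	if (pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT) ||
	    pthread_mutex_init(&s.lock, &attr)) {
		pthread_mutexattr_destroy(&attr);
		return -1;
	}
	pthread_mutexattr_destroy(&attr);
	pthread_cond_init(&s.cond, NULL);

	for (i = 0; i < nr; i++) {
		if (pthread_create(&tid[i], NULL, demo_waiter, &s))
			break;
		started++;
	}

	/* After this broadcast every waiter has to reacquire s.lock. */
	pthread_mutex_lock(&s.lock);
	s.ready = 1;
	pthread_cond_broadcast(&s.cond);
	pthread_mutex_unlock(&s.lock);

	for (i = 0; i < started; i++)
		pthread_join(tid[i], NULL);

	pthread_cond_destroy(&s.cond);
	pthread_mutex_destroy(&s.lock);
	return s.woken;
}
```

Calling demo_pi_broadcast(4) should report four woken waiters. The point of interest is the broadcast: every waiter must reacquire the PI mutex, which is where an unordered wakeup of all waiters hurts.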

I'd really appreciate feedback on the implementation as well as any
design critiques. Answers to the questions posed in the patch headers
and patches are particularly welcome.

This patch series has a known race condition that I'm currently
debugging (see 5/7), as well as notes, questions and FIXMEs in the
comments. These will be resolved and removed prior to submission.

Each patch contains some documentation, but the bulk of the general
approach is outlined in 6/7.

Darren Hart (6):
RFC: futex: add requeue_pi calls
RFC: rt_mutex: add proxy lock routines
RFC: futex: finish_futex_lock_pi()
RFC: futex: futex_lock_pi_atomic()
RFC: futex: futex_top_waiter()
RFC: futex: futex_wait_queue_me()

$ git diff HEAD~6 | diffstat
include/asm-generic/errno.h | 2
include/linux/futex.h | 8
include/linux/thread_info.h | 4
kernel/futex.c | 1205 +++++++++++++++++++++++++++++++++-----------
kernel/rtmutex.c | 192 +++++--
kernel/rtmutex_common.h | 8
6 files changed, 1104 insertions(+), 315 deletions(-)

--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team


2009-03-03 00:10:04

by Darren Hart

Subject: [TIP][RFC 1/7] futex: futex_wait_queue_me()

From: Darren Hart <[email protected]>

Refactor futex_wait in preparation for futex_wait_requeue_pi(). In order to
reuse a good chunk of the futex_wait code for the upcoming
futex_wait_requeue_pi function, this patch breaks out the queue-to-wakeup
section for futex_wait into futex_wait_queue_me().
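
As a reference point for the return paths kept by this refactoring, the following Linux-only sketch calls FUTEX_WAIT from userspace. The wrapper demo_futex_wait is a hypothetical helper, not an existing API. It relies on two documented behaviours of FUTEX_WAIT: the call fails with EAGAIN (the same value as EWOULDBLOCK on Linux) when *uaddr does not hold the expected value, and with ETIMEDOUT when the relative timeout expires.

```c
#define _GNU_SOURCE
#include <errno.h>
#include <linux/futex.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

/*
 * Hypothetical helper: FUTEX_WAIT on *uaddr, expecting it to hold val.
 * timeout_ns < 0 means wait without a timeout.
 * Returns 0 when woken, otherwise a negative errno value.
 */
int demo_futex_wait(uint32_t *uaddr, uint32_t val, long timeout_ns)
{
	struct timespec ts, *tsp = NULL;

	if (timeout_ns >= 0) {
		/* FUTEX_WAIT takes a relative timeout */
		ts.tv_sec = timeout_ns / 1000000000L;
		ts.tv_nsec = timeout_ns % 1000000000L;
		tsp = &ts;
	}
	if (syscall(SYS_futex, uaddr, FUTEX_WAIT, val, tsp, NULL, 0) == -1)
		return -errno;
	return 0;
}
```

With a word holding 1, waiting for the value 0 fails immediately with -EAGAIN, while waiting for the value 1 with a one millisecond timeout and no waker ends in -ETIMEDOUT.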

Changelog:
V4: -Nesting cleanups
-Delayed hrtimer start until after setting TASK_INTERRUPTIBLE
V1: -Initial version

Signed-off-by: Darren Hart <[email protected]>
---

kernel/futex.c | 156 +++++++++++++++++++++++++++++++-------------------------
1 files changed, 86 insertions(+), 70 deletions(-)


diff --git a/kernel/futex.c b/kernel/futex.c
index 206d4c9..16459c2 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -1154,23 +1154,86 @@ handle_fault:

static long futex_wait_restart(struct restart_block *restart);

+/*
+ * futex_wait_queue_me - queue_me and wait for wakeup, timeout, or signal.
+ * @hb: the futex hash bucket, must be locked by the caller
+ * @q: the futex_q to queue up on
+ * @timeout: the prepared hrtimer_sleeper, or NULL for no timeout
+ */
+static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
+ struct hrtimer_sleeper *timeout)
+{
+ DECLARE_WAITQUEUE(wait, current);
+
+ queue_me(q, hb);
+
+ /*
+ * There might have been scheduling since the queue_me(), as we
+ * cannot hold a spinlock across the get_user() in case it
+ * faults, and we cannot just set TASK_INTERRUPTIBLE state when
+ * queueing ourselves into the futex hash. This code thus has to
+ * rely on the futex_wake() code removing us from hash when it
+ * wakes us up.
+ */
+
+ /* add_wait_queue is the barrier after __set_current_state. */
+ __set_current_state(TASK_INTERRUPTIBLE);
+ add_wait_queue(&q->waiter, &wait);
+ /*
+ * NOTE: we don't remove ourselves from the waitqueue because
+ * we are the only user of it.
+ */
+
+ /* Arm the timer */
+ if (timeout) {
+ hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
+ if (!hrtimer_active(&timeout->timer))
+ timeout->task = NULL;
+ }
+
+ /*
+ * !plist_node_empty() is safe here without any lock.
+ * q.lock_ptr != 0 is not safe, because of ordering against wakeup.
+ */
+ if (likely(!plist_node_empty(&q->list))) {
+ /*
+ * If the timer has already expired, current will already be
+ * flagged for rescheduling. Only call schedule if there
+ * is no timeout, or if it has yet to expire.
+ */
+ if (!timeout || likely(timeout->task))
+ schedule();
+ }
+ __set_current_state(TASK_RUNNING);
+}
+
static int futex_wait(u32 __user *uaddr, int fshared,
u32 val, ktime_t *abs_time, u32 bitset, int clockrt)
{
- struct task_struct *curr = current;
- DECLARE_WAITQUEUE(wait, curr);
struct futex_hash_bucket *hb;
struct futex_q q;
u32 uval;
+ struct hrtimer_sleeper timeout, *to = NULL;
int ret;
- struct hrtimer_sleeper t;
- int rem = 0;

if (!bitset)
return -EINVAL;

q.pi_state = NULL;
q.bitset = bitset;
+
+ if (abs_time) {
+ unsigned long slack;
+ to = &timeout;
+ slack = current->timer_slack_ns;
+ if (rt_task(current))
+ slack = 0;
+ hrtimer_init_on_stack(&to->timer, clockrt ? CLOCK_REALTIME :
+ CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
+ hrtimer_init_sleeper(to, current);
+ hrtimer_set_expires_range_ns(&to->timer, *abs_time, slack);
+ }
+
retry:
q.key = FUTEX_KEY_INIT;
ret = get_futex_key(uaddr, fshared, &q.key);
@@ -1209,78 +1272,26 @@ retry:

if (!ret)
goto retry;
- return ret;
+ goto out;
}
ret = -EWOULDBLOCK;
- if (uval != val)
- goto out_unlock_put_key;

/* Only actually queue if *uaddr contained val. */
- queue_me(&q, hb);
-
- /*
- * There might have been scheduling since the queue_me(), as we
- * cannot hold a spinlock across the get_user() in case it
- * faults, and we cannot just set TASK_INTERRUPTIBLE state when
- * queueing ourselves into the futex hash. This code thus has to
- * rely on the futex_wake() code removing us from hash when it
- * wakes us up.
- */
-
- /* add_wait_queue is the barrier after __set_current_state. */
- __set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&q.waiter, &wait);
- /*
- * !plist_node_empty() is safe here without any lock.
- * q.lock_ptr != 0 is not safe, because of ordering against wakeup.
- */
- if (likely(!plist_node_empty(&q.list))) {
- if (!abs_time)
- schedule();
- else {
- unsigned long slack;
- slack = current->timer_slack_ns;
- if (rt_task(current))
- slack = 0;
- hrtimer_init_on_stack(&t.timer,
- clockrt ? CLOCK_REALTIME :
- CLOCK_MONOTONIC,
- HRTIMER_MODE_ABS);
- hrtimer_init_sleeper(&t, current);
- hrtimer_set_expires_range_ns(&t.timer, *abs_time, slack);
-
- hrtimer_start_expires(&t.timer, HRTIMER_MODE_ABS);
- if (!hrtimer_active(&t.timer))
- t.task = NULL;
-
- /*
- * the timer could have already expired, in which
- * case current would be flagged for rescheduling.
- * Don't bother calling schedule.
- */
- if (likely(t.task))
- schedule();
-
- hrtimer_cancel(&t.timer);
+ if (uval != val)
+ goto out_unlock_put_key;

- /* Flag if a timeout occured */
- rem = (t.task == NULL);
-
- destroy_hrtimer_on_stack(&t.timer);
- }
- }
- __set_current_state(TASK_RUNNING);
-
- /*
- * NOTE: we don't remove ourselves from the waitqueue because
- * we are the only user of it.
- */
+ /* queue_me and wait for wakeup, timeout, or a signal. */
+ futex_wait_queue_me(hb, &q, to);

/* If we were woken (and unqueued), we succeeded, whatever. */
- if (!unqueue_me(&q))
- return 0;
- if (rem)
- return -ETIMEDOUT;
+ if (!unqueue_me(&q)) {
+ ret = 0;
+ goto out;
+ }
+ if (to && !to->task) {
+ ret = -ETIMEDOUT;
+ goto out;
+ }

/*
* We expect signal_pending(current), but another thread may
@@ -1302,7 +1313,8 @@ retry:
restart->futex.flags |= FLAGS_SHARED;
if (clockrt)
restart->futex.flags |= FLAGS_CLOCKRT;
- return -ERESTART_RESTARTBLOCK;
+ ret = -ERESTART_RESTARTBLOCK;
+ goto out;
}

out_unlock_put_key:
@@ -1310,6 +1322,10 @@ out_unlock_put_key:
put_futex_key(fshared, &q.key);

out:
+ if (to) {
+ hrtimer_cancel(&to->timer);
+ destroy_hrtimer_on_stack(&to->timer);
+ }
return ret;
}


--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-03 00:11:58

by Darren Hart

Subject: [TIP][RFC 2/7] futex: futex_top_waiter()

From: Darren Hart <[email protected]>

Improve legibility by wrapping the search for the top waiter in a function. This
will be used by the follow-on patches for enabling requeue pi.
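
The lookup itself is small enough to model outside the kernel. The sketch below is a userspace stand-in, assuming the hash bucket chain is kept sorted with the highest priority waiter first (as a plist is), so the first entry with a matching key is the top waiter. The struct demo_waiter and demo_top_waiter names are invented for this illustration.

```c
#include <stddef.h>

/* Simplified stand-in for a futex_q: only the fields the lookup needs. */
struct demo_waiter {
	unsigned long key;	/* stands in for union futex_key */
	int prio;		/* lower value = higher priority */
};

/*
 * chain[] models hb->chain and must already be sorted by ascending prio.
 * Return the first waiter whose key matches, or NULL if there is none.
 */
const struct demo_waiter *demo_top_waiter(const struct demo_waiter *chain,
					  size_t nr, unsigned long key)
{
	size_t i;

	for (i = 0; i < nr; i++) {
		if (chain[i].key == key)
			return &chain[i];
	}
	return NULL;
}
```

Because the chain is shared by every futex that hashes to the same bucket, the key comparison is what separates waiters of one futex from those of another.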

Changelog:
V4: -Simplified logic in the hb loop
-Return the futex_q as the task can be had from there if needed
-Updated comments and dropped the -safe iterator per review from tglx
V3: -Comment linewidth cleanup
V1: -Initial version

Signed-off-by: Darren Hart <[email protected]>
---

kernel/futex.c | 26 ++++++++++++++++++++++++++
1 files changed, 26 insertions(+), 0 deletions(-)


diff --git a/kernel/futex.c b/kernel/futex.c
index 16459c2..d02962b 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -277,6 +277,32 @@ void put_futex_key(int fshared, union futex_key *key)
drop_futex_key_refs(key);
}

+/*
+ * futex_top_waiter - return the highest priority waiter on the futex specified
+ * by key
+ * @hb: the hash bucket the futex_q's reside in
+ * @key: the futex key (to distinguish it from other futex futex_q's)
+ *
+ * Must be called with the hb lock held.
+ *
+ */
+static struct futex_q *futex_top_waiter(struct futex_hash_bucket *hb,
+ union futex_key *key)
+{
+ struct plist_head *head;
+ struct futex_q *this;
+ struct futex_q *top_waiter = NULL;
+
+ head = &hb->chain;
+ plist_for_each_entry(this, head, list) {
+ if (match_futex(&this->key, key)) {
+ top_waiter = this;
+ break;
+ }
+ }
+ return top_waiter;
+}
+
static u32 cmpxchg_futex_value_locked(u32 __user *uaddr, u32 uval, u32 newval)
{
u32 curval;
--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-03 00:13:30

by Darren Hart

Subject: [TIP][RFC 3/7] futex: futex_lock_pi_atomic()

From: Darren Hart <[email protected]>

Refactor the atomic portion of futex_lock_pi() into futex_lock_pi_atomic().
This logic will be needed elsewhere, so modularize it to reduce code
duplication. The only significant change is passing in the task to try to
take the lock for. This simplifies the -EDEADLK test: if the lock is already
owned by that task, it is a deadlock, regardless of whether we are doing
requeue pi. This patch updates the corresponding comment accordingly.
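
The state transitions on the futex word can be tried out in userspace with C11 atomics. This is a simplified model, not the kernel code: it keeps the 0 -> TID attempt, the deadlock check, the FUTEX_WAITERS update and the takeover of a word with no owner TID, and it leaves out the pi_state lookup and the -ESRCH retry. The DEMO_ constants mirror the bit layout in <linux/futex.h>; demo_lock_pi_atomic is a made-up name, and tid is assumed to be non-zero and to fit in the TID mask.

```c
#include <errno.h>
#include <stdatomic.h>
#include <stdint.h>

/* Bit layout of a PI futex word, mirroring <linux/futex.h>. */
#define DEMO_FUTEX_WAITERS	0x80000000u
#define DEMO_FUTEX_OWNER_DIED	0x40000000u
#define DEMO_FUTEX_TID_MASK	0x3fffffffu

/*
 * Returns 1 if tid now owns the word, 0 if the word has another owner and
 * FUTEX_WAITERS was set (the caller would go on to queue), or -EDEADLK if
 * tid already owns the word.
 */
int demo_lock_pi_atomic(_Atomic uint32_t *word, uint32_t tid)
{
	uint32_t cur, new;

	for (;;) {
		/* Try the uncontended 0 -> TID transition first. */
		cur = 0;
		if (atomic_compare_exchange_strong(word, &cur, tid))
			return 1;

		/* cur now holds the value that was observed in *word. */
		if ((cur & DEMO_FUTEX_TID_MASK) == tid)
			return -EDEADLK;

		if (!(cur & DEMO_FUTEX_TID_MASK)) {
			/* No owner TID: take over, keeping the flag bits. */
			new = (cur & ~DEMO_FUTEX_TID_MASK) | tid;
			if (atomic_compare_exchange_strong(word, &cur, new))
				return 1;
		} else {
			/* Owned by someone else: announce a waiter. */
			new = cur | DEMO_FUTEX_WAITERS;
			if (atomic_compare_exchange_strong(word, &cur, new))
				return 0;
		}
		/* The word changed underneath us, start over. */
	}
}
```

Taking the task as a parameter is what lets the same check serve both callers: the comparison is always against the TID of the task the lock is being taken for, whether that is the current task or a requeued waiter.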

Changelog:
V4: -fixes for style errors caught by checkpatch.pl
-use task instead of t for legibility whilst searching
-pass key and pi_state directly rather than trying to rely on the futex_q
since the key may be different.
-minor cleanups suggested by tglx
V2: -Initial version

Signed-off-by: Darren Hart <[email protected]>
---

kernel/futex.c | 230 +++++++++++++++++++++++++++++++++-----------------------
1 files changed, 134 insertions(+), 96 deletions(-)


diff --git a/kernel/futex.c b/kernel/futex.c
index d02962b..c4984d4 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -592,6 +592,129 @@ lookup_pi_state(u32 uval, struct futex_hash_bucket *hb,
return 0;
}

+/* futex_lock_pi_atomic - perform the atomic work on the futex required to
+ * take a pi aware futex
+ * @uaddr: the pi futex user address
+ * @hb: the pi futex hash bucket
+ * @key: the futex key associated with uaddr and hb
+ * @ps: the pi_state pointer where we store the result of the lookup
+ * @task: the task to perform the atomic lock work for. This will be current
+ * except in the case of requeue pi.
+ *
+ * Returns:
+ * - 0 on success (futex_q is setup and FUTEX_WAITERS bit is set on the futex)
+ * - 1 on a successful atomic acquisition of the lock
+ * - A negative error code on failure
+ *
+ * The hb->lock and futex_key refs shall be held by the caller.
+ */
+static int futex_lock_pi_atomic(u32 __user *uaddr, struct futex_hash_bucket *hb,
+ union futex_key *key,
+ struct futex_pi_state **ps,
+ struct task_struct *task)
+{
+ u32 uval, newval, curval;
+ int lock_taken;
+ int ret;
+ int ownerdied = 0;
+
+retry:
+ ret = lock_taken = 0;
+
+ /*
+ * To avoid races, we attempt to take the lock here again
+ * (by doing a 0 -> TID atomic cmpxchg), while holding all
+ * the locks. It will most likely not succeed.
+ */
+ newval = task_pid_vnr(task);
+
+ curval = cmpxchg_futex_value_locked(uaddr, 0, newval);
+
+ if (unlikely(curval == -EFAULT))
+ return -EFAULT;
+
+ /*
+ * Detect deadlocks.
+ */
+ if ((unlikely((curval & FUTEX_TID_MASK) == task_pid_vnr(task))))
+ return -EDEADLK;
+
+ /*
+ * Surprise - we got the lock. Just return to userspace:
+ */
+ if (unlikely(!curval))
+ return 1;
+
+ uval = curval;
+
+ /*
+ * Set the WAITERS flag, so the owner will know it has someone
+ * to wake at next unlock
+ */
+ newval = curval | FUTEX_WAITERS;
+
+ /*
+ * There are two cases, where a futex might have no owner (the
+ * owner TID is 0): OWNER_DIED. We take over the futex in this
+ * case. We also do an unconditional take over, when the owner
+ * of the futex died.
+ *
+ * This is safe as we are protected by the hash bucket lock !
+ */
+ if (unlikely(ownerdied || !(curval & FUTEX_TID_MASK))) {
+ /* Keep the OWNER_DIED bit */
+ newval = (curval & ~FUTEX_TID_MASK) | task_pid_vnr(task);
+ ownerdied = 0;
+ lock_taken = 1;
+ }
+
+ curval = cmpxchg_futex_value_locked(uaddr, uval, newval);
+
+ if (unlikely(curval == -EFAULT))
+ return -EFAULT;
+ if (unlikely(curval != uval))
+ goto retry;
+
+ /*
+ * We took the lock due to owner died take over.
+ */
+ if (unlikely(lock_taken))
+ return 1;
+
+ /*
+ * We dont have the lock. Look up the PI state (or create it if
+ * we are the first waiter):
+ */
+ ret = lookup_pi_state(uval, hb, key, ps);
+
+ if (unlikely(ret)) {
+ switch (ret) {
+ case -ESRCH:
+ /*
+ * No owner found for this futex. Check if the
+ * OWNER_DIED bit is set to figure out whether
+ * this is a robust futex or not.
+ */
+ if (get_futex_value_locked(&curval, uaddr))
+ return -EFAULT;
+
+ /*
+ * We simply start over in case of a robust
+ * futex. The code above will take the futex
+ * and return happy.
+ */
+ if (curval & FUTEX_OWNER_DIED) {
+ ownerdied = 1;
+ goto retry;
+ }
+ default:
+ break;
+ }
+ }
+
+ return ret;
+}
+
/*
* The hash bucket lock must be held when this is called.
* Afterwards, the futex_q must not be accessed.
@@ -1384,9 +1507,9 @@ static int futex_lock_pi(u32 __user *uaddr, int fshared,
struct hrtimer_sleeper timeout, *to = NULL;
struct task_struct *curr = current;
struct futex_hash_bucket *hb;
- u32 uval, newval, curval;
+ u32 uval;
struct futex_q q;
- int ret, lock_taken, ownerdied = 0, attempt = 0;
+ int ret, attempt = 0;

if (refill_pi_state_cache())
return -ENOMEM;
@@ -1409,108 +1532,23 @@ retry:
retry_unlocked:
hb = queue_lock(&q);

-retry_locked:
- ret = lock_taken = 0;
-
- /*
- * To avoid races, we attempt to take the lock here again
- * (by doing a 0 -> TID atomic cmpxchg), while holding all
- * the locks. It will most likely not succeed.
- */
- newval = task_pid_vnr(current);
-
- curval = cmpxchg_futex_value_locked(uaddr, 0, newval);
-
- if (unlikely(curval == -EFAULT))
- goto uaddr_faulted;
-
- /*
- * Detect deadlocks. In case of REQUEUE_PI this is a valid
- * situation and we return success to user space.
- */
- if (unlikely((curval & FUTEX_TID_MASK) == task_pid_vnr(current))) {
- ret = -EDEADLK;
- goto out_unlock_put_key;
- }
-
- /*
- * Surprise - we got the lock. Just return to userspace:
- */
- if (unlikely(!curval))
- goto out_unlock_put_key;
-
- uval = curval;
-
- /*
- * Set the WAITERS flag, so the owner will know it has someone
- * to wake at next unlock
- */
- newval = curval | FUTEX_WAITERS;
-
- /*
- * There are two cases, where a futex might have no owner (the
- * owner TID is 0): OWNER_DIED. We take over the futex in this
- * case. We also do an unconditional take over, when the owner
- * of the futex died.
- *
- * This is safe as we are protected by the hash bucket lock !
- */
- if (unlikely(ownerdied || !(curval & FUTEX_TID_MASK))) {
- /* Keep the OWNER_DIED bit */
- newval = (curval & ~FUTEX_TID_MASK) | task_pid_vnr(current);
- ownerdied = 0;
- lock_taken = 1;
- }
-
- curval = cmpxchg_futex_value_locked(uaddr, uval, newval);
-
- if (unlikely(curval == -EFAULT))
- goto uaddr_faulted;
- if (unlikely(curval != uval))
- goto retry_locked;
-
- /*
- * We took the lock due to owner died take over.
- */
- if (unlikely(lock_taken))
- goto out_unlock_put_key;
-
- /*
- * We dont have the lock. Look up the PI state (or create it if
- * we are the first waiter):
- */
- ret = lookup_pi_state(uval, hb, &q.key, &q.pi_state);
-
+ ret = futex_lock_pi_atomic(uaddr, hb, &q.key, &q.pi_state, current);
if (unlikely(ret)) {
switch (ret) {
-
+ case 1:
+ /* We got the lock. */
+ ret = 0;
+ goto out_unlock_put_key;
+ case -EFAULT:
+ goto uaddr_faulted;
case -EAGAIN:
/*
- * Task is exiting and we just wait for the
- * exit to complete.
+ * Task is exiting and we just wait for the exit to
+ * complete.
*/
queue_unlock(&q, hb);
cond_resched();
goto retry;
-
- case -ESRCH:
- /*
- * No owner found for this futex. Check if the
- * OWNER_DIED bit is set to figure out whether
- * this is a robust futex or not.
- */
- if (get_futex_value_locked(&curval, uaddr))
- goto uaddr_faulted;
-
- /*
- * We simply start over in case of a robust
- * futex. The code above will take the futex
- * and return happy.
- */
- if (curval & FUTEX_OWNER_DIED) {
- ownerdied = 1;
- goto retry_locked;
- }
default:
goto out_unlock_put_key;
}
--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-03 00:15:18

by Darren Hart

Subject: [TIP][RFC 4/7] futex: finish_futex_lock_pi()

From: Darren Hart <[email protected]>

Refactor the post lock acquisition logic from futex_lock_pi(). This code will
be reused in futex_wait_requeue_pi().

V4: -Corrected string paranoia check
-Move the spinlock(q->lock_ptr) out of finish_futex_lock_pi to retain
some semblance of lock/unlock in the same function.
V3: -Initial version

Signed-off-by: Darren Hart <[email protected]>
---

kernel/futex.c | 146 ++++++++++++++++++++++++++++++--------------------------
1 files changed, 78 insertions(+), 68 deletions(-)


diff --git a/kernel/futex.c b/kernel/futex.c
index c4984d4..9446494 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -1303,6 +1303,81 @@ handle_fault:

static long futex_wait_restart(struct restart_block *restart);

+/* finish_futex_lock_pi - post lock pi_state and corner case management
+ * @uaddr: the user address of the futex
+ * @fshared: whether the futex is shared (1) or not (0)
+ * @q: the futex_q (contains pi_state and access to the rt_mutex)
+ * @ret: the return value of the preceding attempt to take the rt_mutex
+ *
+ * After attempting to take an rtmutex, process the return code and cleanup
+ * the pi_state as well as handle race conditions that may have caused us to
+ * lose the lock. Must be called with the hb lock held.
+ *
+ * Returns 0 on success, negative error code otherwise.
+ */
+static int finish_futex_lock_pi(u32 __user *uaddr, int fshared,
+ struct futex_q *q, int ret)
+{
+ if (!ret) {
+ /*
+ * Got the lock. We might not be the anticipated owner
+ * if we did a lock-steal - fix up the PI-state in
+ * that case:
+ */
+ if (q->pi_state->owner != current)
+ ret = fixup_pi_state_owner(uaddr, q, current, fshared);
+ return ret;
+ }
+
+ /*
+ * Catch the rare case, where the lock was released when we were on the
+ * way back before we locked the hash bucket.
+ */
+ if (q->pi_state->owner == current) {
+ /*
+ * Try to get the rt_mutex now. This might fail as some other
+ * task acquired the rt_mutex after we removed ourself from the
+ * rt_mutex waiters list.
+ */
+ if (rt_mutex_trylock(&q->pi_state->pi_mutex))
+ ret = 0;
+ else {
+ /*
+ * pi_state is incorrect, some other task did a lock
+ * steal and we returned due to timeout or signal
+ * without taking the rt_mutex. Too late. We can access
+ * the rt_mutex_owner without locking, as the other
+ * task is now blocked on the hash bucket lock. Fix the
+ * state up.
+ */
+ struct task_struct *owner;
+ int res;
+
+ owner = rt_mutex_owner(&q->pi_state->pi_mutex);
+ res = fixup_pi_state_owner(uaddr, q, owner,
+ fshared);
+
+ /* propagate -EFAULT, if the fixup failed */
+ if (res)
+ ret = res;
+ }
+ } else {
+ /* dvhart FIXME: can't we just BUG_ON in this case?
+ * Paranoia check. If we did not take the lock in the trylock
+ * above, then we should not be the owner of the rtmutex,
+ * neither the real nor the pending one:
+ */
+ if (rt_mutex_owner(&q->pi_state->pi_mutex) == current)
+ printk(KERN_ERR "finish_futex_lock_pi: "
+ "ret = %d pi-mutex: %p "
+ "pi-state %p\n", ret,
+ q->pi_state->pi_mutex.owner,
+ q->pi_state->owner);
+ }
+
+ return ret != -EINTR ? ret : -ERESTARTNOINTR;
+}
+
/*
* futex_wait_queue_me - queue_me and wait for wakeup, timeout, or signal.
* @hb: the futex hash bucket, must be locked by the caller
@@ -1505,7 +1580,6 @@ static int futex_lock_pi(u32 __user *uaddr, int fshared,
int detect, ktime_t *time, int trylock)
{
struct hrtimer_sleeper timeout, *to = NULL;
- struct task_struct *curr = current;
struct futex_hash_bucket *hb;
u32 uval;
struct futex_q q;
@@ -1572,74 +1646,12 @@ retry_unlocked:
}

spin_lock(q.lock_ptr);
-
- if (!ret) {
- /*
- * Got the lock. We might not be the anticipated owner
- * if we did a lock-steal - fix up the PI-state in
- * that case:
- */
- if (q.pi_state->owner != curr)
- ret = fixup_pi_state_owner(uaddr, &q, curr, fshared);
- } else {
- /*
- * Catch the rare case, where the lock was released
- * when we were on the way back before we locked the
- * hash bucket.
- */
- if (q.pi_state->owner == curr) {
- /*
- * Try to get the rt_mutex now. This might
- * fail as some other task acquired the
- * rt_mutex after we removed ourself from the
- * rt_mutex waiters list.
- */
- if (rt_mutex_trylock(&q.pi_state->pi_mutex))
- ret = 0;
- else {
- /*
- * pi_state is incorrect, some other
- * task did a lock steal and we
- * returned due to timeout or signal
- * without taking the rt_mutex. Too
- * late. We can access the
- * rt_mutex_owner without locking, as
- * the other task is now blocked on
- * the hash bucket lock. Fix the state
- * up.
- */
- struct task_struct *owner;
- int res;
-
- owner = rt_mutex_owner(&q.pi_state->pi_mutex);
- res = fixup_pi_state_owner(uaddr, &q, owner,
- fshared);
-
- /* propagate -EFAULT, if the fixup failed */
- if (res)
- ret = res;
- }
- } else {
- /*
- * Paranoia check. If we did not take the lock
- * in the trylock above, then we should not be
- * the owner of the rtmutex, neither the real
- * nor the pending one:
- */
- if (rt_mutex_owner(&q.pi_state->pi_mutex) == curr)
- printk(KERN_ERR "futex_lock_pi: ret = %d "
- "pi-mutex: %p pi-state %p\n", ret,
- q.pi_state->pi_mutex.owner,
- q.pi_state->owner);
- }
- }
+ ret = finish_futex_lock_pi(uaddr, fshared, &q, ret);

/* Unqueue and drop the lock */
unqueue_me_pi(&q);

- if (to)
- destroy_hrtimer_on_stack(&to->timer);
- return ret != -EINTR ? ret : -ERESTARTNOINTR;
+ goto out;

out_unlock_put_key:
queue_unlock(&q, hb);
@@ -1672,9 +1684,7 @@ uaddr_faulted:
if (!ret)
goto retry;

- if (to)
- destroy_hrtimer_on_stack(&to->timer);
- return ret;
+ goto out;
}

/*

--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-03 00:18:19

by Darren Hart

Subject: [TIP][RFC 5/7] rt_mutex: add proxy lock routines

From: Darren Hart <[email protected]>

This patch is required for the first half of requeue_pi to function. It
basically splits rt_mutex_slowlock() right down the middle, just before the
first call to schedule().
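
The two-phase idea can be illustrated with a toy model, shown here only as a sketch. One thread enqueues a waiter on behalf of another task (the first half), and the waiter later checks whether it ended up with the lock (the second half). Everything named toy_ is hypothetical. Unlike the real rt_mutex code, this model hands the lock directly to the top waiter on unlock and never sleeps, so it shows only the invariant that a lock with waiters always has an owner.

```c
#include <stddef.h>

#define TOY_MAX_WAITERS 8

struct toy_task {
	int id;
	int prio;		/* lower value = higher priority */
};

struct toy_rt_mutex {
	struct toy_task *owner;
	struct toy_task *waiters[TOY_MAX_WAITERS];	/* sorted by prio */
	size_t nr_waiters;
};

/*
 * First half: enqueue task as a waiter on behalf of another thread.
 * The lock is expected to have an owner, and it must not be task.
 * Returns 0 on success, -1 otherwise.
 */
int toy_start_proxy_lock(struct toy_rt_mutex *lock, struct toy_task *task)
{
	size_t i;

	if (!lock->owner || lock->owner == task ||
	    lock->nr_waiters == TOY_MAX_WAITERS)
		return -1;

	/* Insertion sort keeps the highest priority waiter at index 0. */
	for (i = lock->nr_waiters;
	     i > 0 && lock->waiters[i - 1]->prio > task->prio; i--)
		lock->waiters[i] = lock->waiters[i - 1];
	lock->waiters[i] = task;
	lock->nr_waiters++;
	return 0;
}

/* Unlock: hand the lock straight to the top waiter, if there is one. */
void toy_unlock(struct toy_rt_mutex *lock)
{
	size_t i;

	if (!lock->nr_waiters) {
		lock->owner = NULL;
		return;
	}
	lock->owner = lock->waiters[0];
	for (i = 1; i < lock->nr_waiters; i++)
		lock->waiters[i - 1] = lock->waiters[i];
	lock->nr_waiters--;
}

/* Second half, run by the waiter itself: do we own the lock now? */
int toy_finish_proxy_lock(const struct toy_rt_mutex *lock,
			  const struct toy_task *task)
{
	return lock->owner == task ? 0 : -1;
}
```

In the model, ownership moves from the current owner to the highest priority waiter in a single step, so there is no window in which waiters exist without an owner.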

This patch uses a new futex_q field, rt_waiter, for now. I think
I should be able to use task->pi_blocked_on in a future version of this patch.

NOTE: I believe this patch creates a race condition that the final patch hits
when trying to do requeue_pi with nr_wake=1 and nr_requeue=0. See
that patch header (6/7) for a complete description.

V5: -remove EXPORT_SYMBOL_GPL from the new routines
-minor cleanups
V4: -made detect_deadlock a parameter to rt_mutex_enqueue_task
-refactored rt_mutex_slowlock to share code with new functions
-renamed rt_mutex_enqueue_task and rt_mutex_handle_wakeup to
rt_mutex_start_proxy_lock and rt_mutex_finish_proxy_lock, respectively

Signed-off-by: Darren Hart <[email protected]>
---

kernel/rtmutex.c | 192 +++++++++++++++++++++++++++++++++++++----------
kernel/rtmutex_common.h | 8 ++
2 files changed, 160 insertions(+), 40 deletions(-)


diff --git a/kernel/rtmutex.c b/kernel/rtmutex.c
index 69d9cb9..a96842e 100644
--- a/kernel/rtmutex.c
+++ b/kernel/rtmutex.c
@@ -411,6 +411,7 @@ static int try_to_take_rt_mutex(struct rt_mutex *lock)
*/
static int task_blocks_on_rt_mutex(struct rt_mutex *lock,
struct rt_mutex_waiter *waiter,
+ struct task_struct *task,
int detect_deadlock)
{
struct task_struct *owner = rt_mutex_owner(lock);
@@ -418,21 +419,21 @@ static int task_blocks_on_rt_mutex(struct rt_mutex *lock,
unsigned long flags;
int chain_walk = 0, res;

- spin_lock_irqsave(&current->pi_lock, flags);
- __rt_mutex_adjust_prio(current);
- waiter->task = current;
+ spin_lock_irqsave(&task->pi_lock, flags);
+ __rt_mutex_adjust_prio(task);
+ waiter->task = task;
waiter->lock = lock;
- plist_node_init(&waiter->list_entry, current->prio);
- plist_node_init(&waiter->pi_list_entry, current->prio);
+ plist_node_init(&waiter->list_entry, task->prio);
+ plist_node_init(&waiter->pi_list_entry, task->prio);

/* Get the top priority waiter on the lock */
if (rt_mutex_has_waiters(lock))
top_waiter = rt_mutex_top_waiter(lock);
plist_add(&waiter->list_entry, &lock->wait_list);

- current->pi_blocked_on = waiter;
+ task->pi_blocked_on = waiter;

- spin_unlock_irqrestore(&current->pi_lock, flags);
+ spin_unlock_irqrestore(&task->pi_lock, flags);

if (waiter == rt_mutex_top_waiter(lock)) {
spin_lock_irqsave(&owner->pi_lock, flags);
@@ -460,7 +461,7 @@ static int task_blocks_on_rt_mutex(struct rt_mutex *lock,
spin_unlock(&lock->wait_lock);

res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
- current);
+ task);

spin_lock(&lock->wait_lock);

@@ -605,37 +606,25 @@ void rt_mutex_adjust_pi(struct task_struct *task)
rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
}

-/*
- * Slow path lock function:
+/**
+ * __rt_mutex_slowlock - perform the wait-wake-try-to-take loop
+ * @lock: the rt_mutex to take
+ * @state: the state the task should block in (TASK_INTERRUPTIBLE
+ * or TASK_UNINTERRUPTIBLE)
+ * @timeout: the pre-initialized and started timer, or NULL for none
+ * @waiter: the pre-initialized rt_mutex_waiter
+ * @detect_deadlock: passed to task_blocks_on_rt_mutex
+ *
+ * lock->wait_lock must be held by the caller.
*/
static int __sched
-rt_mutex_slowlock(struct rt_mutex *lock, int state,
- struct hrtimer_sleeper *timeout,
- int detect_deadlock)
+__rt_mutex_slowlock(struct rt_mutex *lock, int state,
+ struct hrtimer_sleeper *timeout,
+ struct rt_mutex_waiter *waiter,
+ int detect_deadlock)
{
- struct rt_mutex_waiter waiter;
int ret = 0;

- debug_rt_mutex_init_waiter(&waiter);
- waiter.task = NULL;
-
- spin_lock(&lock->wait_lock);
-
- /* Try to acquire the lock again: */
- if (try_to_take_rt_mutex(lock)) {
- spin_unlock(&lock->wait_lock);
- return 0;
- }
-
- set_current_state(state);
-
- /* Setup the timer, when timeout != NULL */
- if (unlikely(timeout)) {
- hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
- if (!hrtimer_active(&timeout->timer))
- timeout->task = NULL;
- }
-
for (;;) {
/* Try to acquire the lock: */
if (try_to_take_rt_mutex(lock))
@@ -656,19 +645,19 @@ rt_mutex_slowlock(struct rt_mutex *lock, int state,
}

/*
- * waiter.task is NULL the first time we come here and
+ * waiter->task is NULL the first time we come here and
* when we have been woken up by the previous owner
* but the lock got stolen by a higher prio task.
*/
- if (!waiter.task) {
- ret = task_blocks_on_rt_mutex(lock, &waiter,
+ if (!waiter->task) {
+ ret = task_blocks_on_rt_mutex(lock, waiter, current,
detect_deadlock);
/*
* If we got woken up by the owner then start loop
* all over without going into schedule to try
* to get the lock now:
*/
- if (unlikely(!waiter.task)) {
+ if (unlikely(!waiter->task)) {
/*
* Reset the return value. We might
* have returned with -EDEADLK and the
@@ -684,15 +673,52 @@ rt_mutex_slowlock(struct rt_mutex *lock, int state,

spin_unlock(&lock->wait_lock);

- debug_rt_mutex_print_deadlock(&waiter);
+ debug_rt_mutex_print_deadlock(waiter);

- if (waiter.task)
+ if (waiter->task)
schedule_rt_mutex(lock);

spin_lock(&lock->wait_lock);
set_current_state(state);
}

+ return ret;
+}
+
+/*
+ * Slow path lock function:
+ */
+static int __sched
+rt_mutex_slowlock(struct rt_mutex *lock, int state,
+ struct hrtimer_sleeper *timeout,
+ int detect_deadlock)
+{
+ struct rt_mutex_waiter waiter;
+ int ret = 0;
+
+ debug_rt_mutex_init_waiter(&waiter);
+ waiter.task = NULL;
+
+ spin_lock(&lock->wait_lock);
+
+ /* Try to acquire the lock again: */
+ if (try_to_take_rt_mutex(lock)) {
+ spin_unlock(&lock->wait_lock);
+ return 0;
+ }
+
+ set_current_state(state);
+
+ /* Setup the timer, when timeout != NULL */
+ if (unlikely(timeout)) {
+ hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
+ if (!hrtimer_active(&timeout->timer))
+ timeout->task = NULL;
+ }
+
+ ret = __rt_mutex_slowlock(lock, state, timeout, &waiter,
+ detect_deadlock);
+
set_current_state(TASK_RUNNING);

if (unlikely(waiter.task))
@@ -986,6 +1012,41 @@ void rt_mutex_proxy_unlock(struct rt_mutex *lock,
}

/**
+ * rt_mutex_start_proxy_lock - prepare another task to take the lock
+ *
+ * @lock: the rt_mutex to take
+ * @waiter: the rt_mutex_waiter initialized by the waiter
+ * @task: the task to prepare
+ * @detect_deadlock: passed to task_blocks_on_rt_mutex
+ *
+ * The lock should have an owner, and it should not be task.
+ * Special API call for FUTEX_REQUEUE_PI support.
+ */
+int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
+ struct rt_mutex_waiter *waiter,
+ struct task_struct *task, int detect_deadlock)
+{
+ int ret;
+
+ spin_lock(&lock->wait_lock);
+ ret = task_blocks_on_rt_mutex(lock, waiter, task, detect_deadlock);
+ if (ret && !waiter->task) {
+ /*
+ * Reset the return value. We might have
+ * returned with -EDEADLK and the owner
+ * released the lock while we were walking the
+ * pi chain. Let the waiter sort it out.
+ */
+ ret = 0;
+ }
+ spin_unlock(&lock->wait_lock);
+
+ debug_rt_mutex_print_deadlock(waiter);
+
+ return ret;
+}
+
+/**
* rt_mutex_next_owner - return the next owner of the lock
*
* @lock: the rt lock query
@@ -1004,3 +1065,54 @@ struct task_struct *rt_mutex_next_owner(struct rt_mutex *lock)

return rt_mutex_top_waiter(lock)->task;
}
+
+/**
+ * rt_mutex_finish_proxy_lock - Complete the taking of the lock initialized on
+ * our behalf by another thread.
+ * @lock: the rt_mutex we were woken on
+ * @to: the timeout, null if none. hrtimer should already have been started.
+ * @waiter: the pre-initialized rt_mutex_waiter
+ * @detect_deadlock: for use by __rt_mutex_slowlock
+ *
+ * Special API call for PI-futex requeue support
+ */
+int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
+ struct hrtimer_sleeper *to,
+ struct rt_mutex_waiter *waiter,
+ int detect_deadlock)
+{
+ int ret;
+
+ if (waiter->task)
+ schedule_rt_mutex(lock);
+
+ spin_lock(&lock->wait_lock);
+
+ set_current_state(TASK_INTERRUPTIBLE);
+
+ ret = __rt_mutex_slowlock(lock, TASK_INTERRUPTIBLE, to, waiter,
+ detect_deadlock);
+
+ set_current_state(TASK_RUNNING);
+
+ if (unlikely(waiter->task))
+ remove_waiter(lock, waiter);
+
+ /*
+ * try_to_take_rt_mutex() sets the waiter bit unconditionally. We might
+ * have to fix that up.
+ */
+ fixup_rt_mutex_waiters(lock);
+
+ spin_unlock(&lock->wait_lock);
+
+ /*
+ * Readjust priority, when we did not get the lock. We might have been
+ * the pending owner and boosted. Since we did not take the lock, the
+ * PI boost has to go.
+ */
+ if (unlikely(ret))
+ rt_mutex_adjust_prio(current);
+
+ return ret;
+}
diff --git a/kernel/rtmutex_common.h b/kernel/rtmutex_common.h
index e124bf5..97a2f81 100644
--- a/kernel/rtmutex_common.h
+++ b/kernel/rtmutex_common.h
@@ -120,6 +120,14 @@ extern void rt_mutex_init_proxy_locked(struct rt_mutex *lock,
struct task_struct *proxy_owner);
extern void rt_mutex_proxy_unlock(struct rt_mutex *lock,
struct task_struct *proxy_owner);
+extern int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
+ struct rt_mutex_waiter *waiter,
+ struct task_struct *task,
+ int detect_deadlock);
+extern int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
+ struct hrtimer_sleeper *to,
+ struct rt_mutex_waiter *waiter,
+ int detect_deadlock);

#ifdef CONFIG_DEBUG_RT_MUTEXES
# include "rtmutex-debug.h"

--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-03 00:20:50

by Darren Hart

Subject: [TIP][RFC 6/7] futex: add requeue_pi calls

From: Darren Hart <[email protected]>

PI Futexes must have an owner at all times, so the standard requeue commands
aren't sufficient. The new commands properly manage pi futex ownership by
ensuring a futex with waiters has an owner at all times. Once complete these
patches will allow glibc to properly handle pi mutexes with pthread_condvars.
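
As a rough illustration of the "waiters imply an owner" rule, here is a
minimal user-space sketch that decodes a PI futex word. The invariant in the
text is really about the kernel-side rt_mutex; this only mirrors it on the
futex value. The SKETCH_* names and the helper functions are hypothetical
(the bit values are assumed to match FUTEX_WAITERS and FUTEX_TID_MASK in
linux/futex.h), and the FUTEX_OWNER_DIED case is deliberately ignored.

```c
#include <stdbool.h>
#include <stdint.h>

/* Local copies of the futex word layout; names are illustrative only. */
#define SKETCH_FUTEX_WAITERS  0x80000000u
#define SKETCH_FUTEX_TID_MASK 0x3fffffffu

/* TID of the owner stored in the futex word, 0 if unowned. */
static inline uint32_t pi_futex_owner_tid(uint32_t word)
{
	return word & SKETCH_FUTEX_TID_MASK;
}

/* True if the waiters bit is set in the futex word. */
static inline bool pi_futex_has_waiters(uint32_t word)
{
	return (word & SKETCH_FUTEX_WAITERS) != 0;
}

/*
 * Simplified consistency check: a word with the waiters bit set must also
 * name an owner. Owner-died states are not modelled here.
 */
static inline bool pi_futex_word_consistent(uint32_t word)
{
	return !pi_futex_has_waiters(word) || pi_futex_owner_tid(word) != 0;
}
```

A plain requeue followed by an unlock can, in this simplified model, leave
the target in the state the last helper rejects: waiters queued, nobody
recorded as owner.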

The approach taken here is to create two new futex op codes:

FUTEX_WAIT_REQUEUE_PI:
Threads will use this op code to wait on a futex (such as a non-pi waitqueue)
and wake after they have been requeued to a pi futex. Prior to returning to
userspace, they will take this pi futex (and the underlying rt_mutex).

futex_wait_requeue_pi() is currently the result of a high speed collision
between futex_wait and futex_lock_pi (with the first part of futex_lock_pi
being done by futex_requeue_pi_init() on behalf of the waiter).

FUTEX_REQUEUE_PI:
This call must be used to wake threads waiting with FUTEX_WAIT_REQUEUE_PI,
regardless of how many threads the caller intends to wake or requeue.
pthread_cond_broadcast should call this with nr_wake=1 and nr_requeue=-1 (all).
pthread_cond_signal should call this with nr_wake=1 and nr_requeue=0. The
reason being we need both callers to get the benefit of the
futex_requeue_pi_init() routine which will prepare the top_waiter (the thread
to be woken) to take possession of the pi futex by setting FUTEX_WAITERS and
preparing the futex_q.pi_state. futex_requeue() also enqueues the top_waiter
on the rt_mutex via rt_mutex_start_proxy_lock(). If pthread_cond_signal used
FUTEX_WAKE, we would have a similar race window where the caller can return and
release the mutex before the waiters can fully wake, potentially leaving the
rt_mutex with waiters but no owner.
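
The calling convention described above could be sketched from user space
roughly as follows. This is only a sketch under stated assumptions: the
RFC_* op code numbers are copied from the futex.h hunk of this RFC and may
not match any released kernel (so the wrappers should not be run against
one), the helper and struct names are hypothetical, and "all" is written as
-1 because that is how the description above states it.

```c
#include <stdint.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

/* Op codes as numbered in this RFC's include/linux/futex.h hunk. */
#define RFC_FUTEX_WAIT_REQUEUE_PI 11
#define RFC_FUTEX_REQUEUE_PI      12

/* Hypothetical holder for the two counts a waker passes. */
struct requeue_pi_args {
	int nr_wake;
	int nr_requeue;
};

/* pthread_cond_signal style: wake one waiter, requeue none. */
static inline struct requeue_pi_args requeue_pi_signal_args(void)
{
	struct requeue_pi_args a = { .nr_wake = 1, .nr_requeue = 0 };
	return a;
}

/* pthread_cond_broadcast style: wake one waiter, requeue the rest. */
static inline struct requeue_pi_args requeue_pi_broadcast_args(void)
{
	struct requeue_pi_args a = { .nr_wake = 1, .nr_requeue = -1 };
	return a;
}

/* Waiter side: sleep on cond, return holding the pi futex at mutex. */
static inline long cond_wait_requeue_pi(uint32_t *cond, uint32_t expected,
					const struct timespec *abs_timeout,
					uint32_t *mutex)
{
	return syscall(SYS_futex, cond, RFC_FUTEX_WAIT_REQUEUE_PI, expected,
		       abs_timeout, mutex, 0);
}

/* Waker side: nr_requeue travels in the utime slot of sys_futex. */
static inline long cond_wake_requeue_pi(uint32_t *cond, uint32_t *mutex,
					struct requeue_pi_args a)
{
	return syscall(SYS_futex, cond, RFC_FUTEX_REQUEUE_PI, a.nr_wake,
		       (void *)(long)a.nr_requeue, mutex, 0);
}
```

The point of routing both signal and broadcast through the same waker call
is that either one runs futex_requeue_pi_init() for the top waiter before
the caller can return and drop the mutex.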

We hit a failed paging request running the testcase (7/7) in a loop
(only takes a few minutes at most to hit on my 8way x86_64 test
machine). It appears to be the result of splitting rt_mutex_slowlock()
across two execution contexts by means of rt_mutex_start_proxy_lock()
and rt_mutex_finish_proxy_lock(). The former calls
task_blocks_on_rt_mutex() on behalf of the waiting task prior to
requeuing and waking it by the requeueing thread. The latter is
executed upon wakeup by the waiting thread which somehow manages to call
the new __rt_mutex_slowlock() with waiter->task != NULL and still
succeed with try_to_take_lock(). This leads to corruption of the plists
and an eventual failed paging request. See 7/7 for the rather crude
testcase that causes this. Any tips on where this race might be
occurring are welcome.

Changelog:
V5: -Update futex_requeue to allow for nr_requeue == 0
-Whitespace cleanup
-Added task_count var to futex_requeue to avoid confusion between
ret, res, and ret used to count wakes and requeues
V4: -Cleanups to pass checkpatch.pl
-Added missing goto out; in futex_wait_requeue_pi
-Moved rt_mutex_handle_wakeup to the rt_mutex_enqueue_task patch as they
are a functional pair.
-Fixed several error exit paths that failed to unqueue the futex_q, which
not only would leave the futex_q on the hb, but would have caused an exit
race with the waiter since they weren't synchronized on the hb lock. Thanks
Sripathi for catching this.
-Fix pi_state handling in futex_requeue
-Several other minor fixes to futex_requeue_pi
-add requeue_futex function and force the requeue in requeue_pi even for the
task we wake in the requeue loop
-refill the pi state cache at the beginning of futex_requeue for requeue_pi
-have futex_requeue_pi_init ensure it stores off the pi_state for use in
futex_requeue
- Delayed starting the hrtimer until after TASK_INTERRUPTIBLE is set
- Fixed NULL pointer bug when futex_wait_requeue_pi has no timer and
receives a signal after waking on uaddr2. Added has_timeout to the
restart->futex structure.
V3: -Added FUTEX_CMP_REQUEUE_PI op
-Put fshared support back. So long as it is encoded in the op code, we
assume both the uaddr's are either private or shared, but not mixed.
-Fixed access to expected value of uaddr2 in futex_wait_requeue_pi
V2: -Added rt_mutex enqueueing to futex_requeue_pi_init
-Updated fault handling and exit logic
V1: -Initial version

Signed-off-by: Darren Hart <[email protected]>
---

include/asm-generic/errno.h | 2
include/linux/futex.h | 8 +
include/linux/thread_info.h | 4
kernel/futex.c | 653 ++++++++++++++++++++++++++++++++++++++++---
4 files changed, 623 insertions(+), 44 deletions(-)


diff --git a/include/asm-generic/errno.h b/include/asm-generic/errno.h
index e8852c0..eaeda1b 100644
--- a/include/asm-generic/errno.h
+++ b/include/asm-generic/errno.h
@@ -106,4 +106,6 @@
#define EOWNERDEAD 130 /* Owner died */
#define ENOTRECOVERABLE 131 /* State not recoverable */

+#define EMORON 132 /* Dumb user error */
+
#endif
diff --git a/include/linux/futex.h b/include/linux/futex.h
index 3bf5bb5..b05519c 100644
--- a/include/linux/futex.h
+++ b/include/linux/futex.h
@@ -23,6 +23,9 @@ union ktime;
#define FUTEX_TRYLOCK_PI 8
#define FUTEX_WAIT_BITSET 9
#define FUTEX_WAKE_BITSET 10
+#define FUTEX_WAIT_REQUEUE_PI 11
+#define FUTEX_REQUEUE_PI 12
+#define FUTEX_CMP_REQUEUE_PI 13

#define FUTEX_PRIVATE_FLAG 128
#define FUTEX_CLOCK_REALTIME 256
@@ -38,6 +41,11 @@ union ktime;
#define FUTEX_TRYLOCK_PI_PRIVATE (FUTEX_TRYLOCK_PI | FUTEX_PRIVATE_FLAG)
#define FUTEX_WAIT_BITSET_PRIVATE (FUTEX_WAIT_BITS | FUTEX_PRIVATE_FLAG)
#define FUTEX_WAKE_BITSET_PRIVATE (FUTEX_WAKE_BITS | FUTEX_PRIVATE_FLAG)
+#define FUTEX_WAIT_REQUEUE_PI_PRIVATE (FUTEX_WAIT_REQUEUE_PI | \
+ FUTEX_PRIVATE_FLAG)
+#define FUTEX_REQUEUE_PI_PRIVATE (FUTEX_REQUEUE_PI | FUTEX_PRIVATE_FLAG)
+#define FUTEX_CMP_REQUEUE_PI_PRIVATE (FUTEX_CMP_REQUEUE_PI | \
+ FUTEX_PRIVATE_FLAG)

/*
* Support for robust futexes: the kernel cleans up held futexes at
diff --git a/include/linux/thread_info.h b/include/linux/thread_info.h
index e6b820f..bd1e2ab 100644
--- a/include/linux/thread_info.h
+++ b/include/linux/thread_info.h
@@ -21,13 +21,15 @@ struct restart_block {
struct {
unsigned long arg0, arg1, arg2, arg3;
};
- /* For futex_wait */
+ /* For futex_wait and futex_wait_requeue_pi */
struct {
u32 *uaddr;
u32 val;
u32 flags;
u32 bitset;
+ int has_timeout;
u64 time;
+ u32 *uaddr2;
} futex;
/* For nanosleep */
struct {
diff --git a/kernel/futex.c b/kernel/futex.c
index 9446494..e020b92 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -109,10 +109,17 @@ struct futex_q {
struct futex_pi_state *pi_state;
struct task_struct *task;

+ /* rt_waiter storage for requeue_pi: */
+ struct rt_mutex_waiter *rt_waiter;
+
/* Bitset for the optional bitmasked wakeup */
u32 bitset;
};

+/* dvhart: FIXME: some commentary here would help to clarify that hb->chain is
+ * actually the queue which contains multiple instances of futex_q - one per
+ * waiter. The naming is extremely unfortunate as it reflects the datastructure
+ * more than its usage. */
/*
* Split the global futex_lock into every hash list lock.
*/
@@ -189,6 +196,7 @@ static void drop_futex_key_refs(union futex_key *key)
/**
* get_futex_key - Get parameters which are the keys for a futex.
* @uaddr: virtual address of the futex
+ * dvhart FIXME: incorrect shared comment (fshared, and it's a boolean int)
* @shared: NULL for a PROCESS_PRIVATE futex,
* &current->mm->mmap_sem for a PROCESS_SHARED futex
* @key: address where result is stored.
@@ -200,6 +208,7 @@ static void drop_futex_key_refs(union futex_key *key)
* offset_within_page). For private mappings, it's (uaddr, current->mm).
* We can usually work out the index without swapping in the page.
*
+ * dvhart FIXME: incorrect shared comment (fshared, and it's a boolean int)
* fshared is NULL for PROCESS_PRIVATE futexes
* For other futexes, it points to &current->mm->mmap_sem and
* caller must have taken the reader lock. but NOT any spinlocks.
@@ -429,6 +438,21 @@ static void free_pi_state(struct futex_pi_state *pi_state)
}

/*
+ * futex_requeue_pi_cleanup - cleanup after futex_requeue_pi_init after failed
+ * lock acquisition.
+ * @q: the futex_q of the futex we failed to acquire
+ */
+static void futex_requeue_pi_cleanup(struct futex_q *q)
+{
+ if (!q->pi_state)
+ return;
+ if (rt_mutex_owner(&q->pi_state->pi_mutex) == current)
+ rt_mutex_unlock(&q->pi_state->pi_mutex);
+ else
+ free_pi_state(q->pi_state);
+}
+
+/*
* Look up the task based on what TID userspace gave us.
* We dont trust it.
*/
@@ -736,6 +760,7 @@ static void wake_futex(struct futex_q *q)
* at the end of wake_up_all() does not prevent this store from
* moving.
*/
+ /* dvhart FIXME: "end of wake_up()" */
smp_wmb();
q->lock_ptr = NULL;
}
@@ -834,6 +859,12 @@ double_lock_hb(struct futex_hash_bucket *hb1, struct futex_hash_bucket *hb2)
}
}

+/* dvhart FIXME: the wording here is inaccurate as both the physical page and
+ * the offset are required for the hashing, it is also non-intuitive as most
+ * will be thinking of "the futex" not "the physical page and offset this
+ * virtual address points to". Used throughout - consider wholesale cleanup of
+ * function commentary.
+ */
/*
* Wake up all waiters hashed on the physical page that is mapped
* to this virtual address:
@@ -988,19 +1019,123 @@ out:
}

/*
- * Requeue all waiters hashed on one physical page to another
- * physical page.
+ * futex_requeue_pi_init - prepare the top waiter to lock the pi futex on wake
+ * @pifutex: the user address of the to futex
+ * @hb1: the from futex hash bucket, must be locked by the caller
+ * @hb2: the to futex hash bucket, must be locked by the caller
+ * @key1: the from futex key
+ * @key2: the to futex key
+ *
+ * Returns 0 on success, a negative error code on failure.
+ *
+ * Prepare the top_waiter and the pi_futex for requeueing. We handle
+ * the userspace r/w here so that we can handle any faults prior
+ * to entering the requeue loop. hb1 and hb2 must be held by the caller.
+ *
+ * Faults occur for two primary reasons at this point:
+ * 1) The address isn't mapped (what? you didn't use mlock() in your real-time
+ * application code? *gasp*)
+ * 2) The address isn't writeable
+ *
+ * We return EFAULT on either of these cases and rely on futex_requeue to
+ * handle them.
+ */
+static int futex_requeue_pi_init(u32 __user *pifutex,
+ struct futex_hash_bucket *hb1,
+ struct futex_hash_bucket *hb2,
+ union futex_key *key1, union futex_key *key2,
+ struct futex_pi_state **ps)
+{
+ u32 curval;
+ struct futex_q *top_waiter;
+ pid_t pid;
+ int ret;
+
+ if (get_futex_value_locked(&curval, pifutex))
+ return -EFAULT;
+
+ top_waiter = futex_top_waiter(hb1, key1);
+
+ /* There are no waiters, nothing for us to do. */
+ if (!top_waiter)
+ return 0;
+
+ /*
+ * The pifutex has an owner, make sure it's us, if not complain
+ * to userspace.
+ * FIXME_LATER: handle this gracefully
+ */
+ pid = curval & FUTEX_TID_MASK;
+ if (pid && pid != task_pid_vnr(current))
+ return -EMORON;
+
+ /*
+ * Current should own pifutex, but it could be uncontended. Here we
+ * either take the lock for top_waiter or set the FUTEX_WAITERS bit.
+ * The pi_state is also looked up, but we don't care about the return
+ * code as we'll have to look that up during requeue for each waiter
+ * anyway.
+ */
+ ret = futex_lock_pi_atomic(pifutex, hb2, key2, ps, top_waiter->task);
+
+ /*
+ * At this point the top_waiter has either taken the pifutex or it is
+ * waiting on it. If the former, then the pi_state will not exist yet,
+ * look it up one more time to ensure we have a reference to it.
+ */
+ if (!ps /* FIXME && some ret values in here I think ... */)
+ ret = lookup_pi_state(curval, hb2, key2, ps);
+ return ret;
+}
+
+static inline
+void requeue_futex(struct futex_q *q, struct futex_hash_bucket *hb1,
+ struct futex_hash_bucket *hb2, union futex_key *key2)
+{
+
+ /*
+ * If key1 and key2 hash to the same bucket, no need to
+ * requeue.
+ */
+ if (likely(&hb1->chain != &hb2->chain)) {
+ plist_del(&q->list, &hb1->chain);
+ plist_add(&q->list, &hb2->chain);
+ q->lock_ptr = &hb2->lock;
+#ifdef CONFIG_DEBUG_PI_LIST
+ q->list.plist.lock = &hb2->lock;
+#endif
+ }
+ get_futex_key_refs(key2);
+ q->key = *key2;
+}
+
+/*
+ * Requeue all waiters hashed on one physical page to another physical page.
+ * In the requeue_pi case, either takeover uaddr2 or set FUTEX_WAITERS and
+ * setup the pistate. FUTEX_REQUEUE_PI only supports requeueing from a non-pi
+ * futex to a pi futex.
*/
static int futex_requeue(u32 __user *uaddr1, int fshared, u32 __user *uaddr2,
- int nr_wake, int nr_requeue, u32 *cmpval)
+ int nr_wake, int nr_requeue, u32 *cmpval,
+ int requeue_pi)
{
union futex_key key1 = FUTEX_KEY_INIT, key2 = FUTEX_KEY_INIT;
struct futex_hash_bucket *hb1, *hb2;
struct plist_head *head1;
struct futex_q *this, *next;
- int ret, drop_count = 0;
+ u32 curval2;
+ struct futex_pi_state *pi_state = NULL;
+ int drop_count = 0, attempt = 0, task_count = 0, ret;
+
+ if (requeue_pi && refill_pi_state_cache())
+ return -ENOMEM;

retry:
+ if (pi_state != NULL) {
+ free_pi_state(pi_state);
+ pi_state = NULL;
+ }
+
ret = get_futex_key(uaddr1, fshared, &key1);
if (unlikely(ret != 0))
goto out;
@@ -1023,12 +1158,15 @@ retry:
if (hb1 != hb2)
spin_unlock(&hb2->lock);

+ put_futex_key(fshared, &key2);
+ put_futex_key(fshared, &key1);
+
ret = get_user(curval, uaddr1);

if (!ret)
goto retry;

- goto out_put_keys;
+ goto out;
}
if (curval != *cmpval) {
ret = -EAGAIN;
@@ -1036,32 +1174,104 @@ retry:
}
}

+ if (requeue_pi) {
+ /* FIXME: should we handle the no waiters case special? */
+ ret = futex_requeue_pi_init(uaddr2, hb1, hb2, &key1, &key2,
+ &pi_state);
+
+ if (!ret)
+ ret = get_futex_value_locked(&curval2, uaddr2);
+
+ switch (ret) {
+ case 0:
+ break;
+ case 1:
+ /* we got the lock */
+ ret = 0;
+ break;
+ case -EFAULT:
+ /*
+ * We handle the fault here instead of in
+ * futex_requeue_pi_init because we have to reacquire
+ * both locks to avoid deadlock.
+ */
+ spin_unlock(&hb1->lock);
+ if (hb1 != hb2)
+ spin_unlock(&hb2->lock);
+
+ put_futex_key(fshared, &key2);
+ put_futex_key(fshared, &key1);
+
+ if (attempt++) {
+ ret = futex_handle_fault((unsigned long)uaddr2,
+ attempt);
+ if (ret)
+ goto out;
+ goto retry;
+ }
+
+ ret = get_user(curval2, uaddr2);
+
+ if (!ret)
+ goto retry;
+ goto out;
+ case -EAGAIN:
+ /* The owner was exiting, try again. */
+ spin_unlock(&hb1->lock);
+ if (hb1 != hb2)
+ spin_unlock(&hb2->lock);
+ put_futex_key(fshared, &key2);
+ put_futex_key(fshared, &key1);
+ cond_resched();
+ goto retry;
+ default:
+ goto out_unlock;
+ }
+ }
+
head1 = &hb1->chain;
plist_for_each_entry_safe(this, next, head1, list) {
if (!match_futex (&this->key, &key1))
continue;
- if (++ret <= nr_wake) {
- wake_futex(this);
+ /*
+ * Regardless of if we are waking or requeueing, we need to
+ * prepare the waiting task to take the rt_mutex in the
+ * requeue_pi case. If we gave the lock to the top_waiter in
+ * futex_requeue_pi_init() then don't enqueue that task as a
+ * waiter on the rt_mutex (it already owns it).
+ */
+ if (requeue_pi &&
+ ((curval2 & FUTEX_TID_MASK) != task_pid_vnr(this->task))) {
+ atomic_inc(&pi_state->refcount);
+ this->pi_state = pi_state;
+ ret = rt_mutex_start_proxy_lock(&pi_state->pi_mutex,
+ this->rt_waiter,
+ this->task, 1);
+ if (ret)
+ goto out_unlock;
+ }
+
+ if (++task_count <= nr_wake) {
+ if (requeue_pi) {
+ /*
+ * In the case of requeue_pi we need to know if
+ * we were requeued or not, so do the requeue
+ * regardless if we are to wake this task.
+ */
+ requeue_futex(this, hb1, hb2, &key2);
+ drop_count++;
+ /* FIXME: look into altering wake_futex so we
+ * can use it (we can't null the lock_ptr) */
+ wake_up(&this->waiter);
+ } else
+ wake_futex(this);
} else {
- /*
- * If key1 and key2 hash to the same bucket, no need to
- * requeue.
- */
- if (likely(head1 != &hb2->chain)) {
- plist_del(&this->list, &hb1->chain);
- plist_add(&this->list, &hb2->chain);
- this->lock_ptr = &hb2->lock;
-#ifdef CONFIG_DEBUG_PI_LIST
- this->list.plist.lock = &hb2->lock;
-#endif
- }
- this->key = key2;
- get_futex_key_refs(&key2);
+ requeue_futex(this, hb1, hb2, &key2);
drop_count++;
-
- if (ret - nr_wake >= nr_requeue)
- break;
}
+
+ if (task_count - nr_wake >= nr_requeue)
+ break;
}

out_unlock:
@@ -1073,12 +1283,13 @@ out_unlock:
while (--drop_count >= 0)
drop_futex_key_refs(&key1);

-out_put_keys:
put_futex_key(fshared, &key2);
out_put_key1:
put_futex_key(fshared, &key1);
out:
- return ret;
+ if (pi_state != NULL)
+ free_pi_state(pi_state);
+ return ret ? ret : task_count;
}

/* The key must be already stored in q->key. */
@@ -1180,6 +1391,7 @@ retry:
*/
static void unqueue_me_pi(struct futex_q *q)
{
+ /* FIXME: hitting this warning for requeue_pi */
WARN_ON(plist_node_empty(&q->list));
plist_del(&q->list, &q->list.plist);

@@ -1302,6 +1514,8 @@ handle_fault:
#define FLAGS_CLOCKRT 0x02

static long futex_wait_restart(struct restart_block *restart);
+static long futex_wait_requeue_pi_restart(struct restart_block *restart);
+static long futex_lock_pi_restart(struct restart_block *restart);

/* finish_futex_lock_pi - post lock pi_state and corner case management
* @uaddr: the user address of the futex
@@ -1466,6 +1680,9 @@ retry:

hb = queue_lock(&q);

+ /* dvhart FIXME: we access the page before it is queued... obsolete
+ * comments? */
+
/*
* Access the page AFTER the futex is queued.
* Order is important:
@@ -1529,6 +1746,7 @@ retry:
restart->fn = futex_wait_restart;
restart->futex.uaddr = (u32 *)uaddr;
restart->futex.val = val;
+ restart->futex.has_timeout = 1;
restart->futex.time = abs_time->tv64;
restart->futex.bitset = bitset;
restart->futex.flags = 0;
@@ -1559,12 +1777,16 @@ static long futex_wait_restart(struct restart_block *restart)
u32 __user *uaddr = (u32 __user *)restart->futex.uaddr;
int fshared = 0;
ktime_t t;
+ ktime_t *tp = NULL;

- t.tv64 = restart->futex.time;
+ if (restart->futex.has_timeout) {
+ t.tv64 = restart->futex.time;
+ tp = &t;
+ }
restart->fn = do_no_restart_syscall;
if (restart->futex.flags & FLAGS_SHARED)
fshared = 1;
- return (long)futex_wait(uaddr, fshared, restart->futex.val, &t,
+ return (long)futex_wait(uaddr, fshared, restart->futex.val, tp,
restart->futex.bitset,
restart->futex.flags & FLAGS_CLOCKRT);
}
@@ -1621,6 +1843,7 @@ retry_unlocked:
* complete.
*/
queue_unlock(&q, hb);
+ /* FIXME: need to put_futex_key() ? */
cond_resched();
goto retry;
default:
@@ -1653,16 +1876,6 @@ retry_unlocked:

goto out;

-out_unlock_put_key:
- queue_unlock(&q, hb);
-
-out_put_key:
- put_futex_key(fshared, &q.key);
-out:
- if (to)
- destroy_hrtimer_on_stack(&to->timer);
- return ret;
-
uaddr_faulted:
/*
* We have to r/w *(int __user *)uaddr, and we have to modify it
@@ -1685,6 +1898,34 @@ uaddr_faulted:
goto retry;

goto out;
+
+out_unlock_put_key:
+ queue_unlock(&q, hb);
+
+out_put_key:
+ put_futex_key(fshared, &q.key);
+
+out:
+ if (to)
+ destroy_hrtimer_on_stack(&to->timer);
+ return ret;
+
+}
+
+static long futex_lock_pi_restart(struct restart_block *restart)
+{
+ u32 __user *uaddr = (u32 __user *)restart->futex.uaddr;
+ ktime_t t;
+ ktime_t *tp = NULL;
+ int fshared = restart->futex.flags & FLAGS_SHARED;
+
+ if (restart->futex.has_timeout) {
+ t.tv64 = restart->futex.time;
+ tp = &t;
+ }
+ restart->fn = do_no_restart_syscall;
+
+ return (long)futex_lock_pi(uaddr, fshared, restart->futex.val, tp, 0);
}

/*
@@ -1797,6 +2038,316 @@ pi_faulted:
}

/*
+ * futex_wait_requeue_pi - wait on futex1 (uaddr) and take the futex2 (uaddr2)
+ * before returning
+ * @uaddr: the futex we initially wait on (possibly non-pi)
+ * @fshared: whether the futexes are shared (1) or not (0). They must be the
+ * same type, no requeueing from private to shared, etc.
+ * @val: the expected value of uaddr
+ * @abs_time: absolute timeout
+ * @bitset: FIXME ???
+ * @clockrt: whether to use CLOCK_REALTIME (1) or CLOCK_MONOTONIC (0)
+ * @uaddr2: the pi futex we will take prior to returning to user-space
+ *
+ * The caller will wait on futex1 (uaddr) and will be requeued by
+ * futex_requeue() to futex2 (uaddr2) which must be PI aware. Normal wakeup
+ * will wake on futex2 and then proceed to take the underlying rt_mutex prior
+ * to returning to userspace. This ensures the rt_mutex maintains an owner
+ * when it has waiters. Without one we won't know who to boost/deboost, if
+ * there was a need to.
+ *
+ * We call schedule in futex_wait_queue_me() when we enqueue and return there
+ * via the following:
+ * 1) signal
+ * 2) timeout
+ * 3) wakeup on the first futex (uaddr)
+ * 4) wakeup on the second futex (uaddr2, the pi futex) after a requeue
+ *
+ * If 1 or 2, we need to check if we got the rtmutex, setup the pi_state, or
+ * were enqueued on the rt_mutex via futex_requeue_pi_init() just before the
+ * signal or timeout arrived. If so, we need to clean that up. Note: the
+ * setting of FUTEX_WAITERS will be handled when the owner unlocks the
+ * rt_mutex.
+ *
+ * If 3, userspace wrongly called FUTEX_WAKE on the first futex (uaddr) rather
+ * than using the FUTEX_REQUEUE_PI call with nr_requeue=0. Return -EINVAL.
+ *
+ * If 4, we may then block on trying to take the rt_mutex and return via:
+ * 5) signal
+ * 6) timeout
+ * 7) successful lock
+ *
+ * If 5, we setup a restart_block with futex_lock_pi() as the function.
+ *
+ * If 6, we cleanup and return with -ETIMEDOUT.
+ *
+ * TODO:
+ * o once we have all the return points correct, we need to collect
+ * common code into exit labels.
+ *
+ * Returns:
+ * 0 Success
+ * -EFAULT For various faults
+ * -EWOULDBLOCK uaddr has an unexpected value (it changed
+ * before we got going)
+ * -ETIMEDOUT timeout (from either wait on futex1 or locking
+ * futex2)
+ * -ERESTARTSYS Signal received (during wait on futex1) with no
+ * timeout
+ * -ERESTART_RESTARTBLOCK Signal received (during wait on futex1)
+ * -ERESTARTNOINTR Signal received (during lock of futex2)
+ * -EINVAL No bitset, woke via FUTEX_WAKE, etc.
+ *
+ * May also passthrough the following return codes (not exhaustive):
+ * -EPERM see get_futex_key()
+ * -EACCES see get_futex_key()
+ * -ENOMEM see get_user_pages()
+ *
+ */
+static int futex_wait_requeue_pi(u32 __user *uaddr, int fshared,
+ u32 val, ktime_t *abs_time, u32 bitset,
+ int clockrt, u32 __user *uaddr2)
+{
+ struct futex_hash_bucket *hb;
+ struct futex_q q;
+ union futex_key key2 = FUTEX_KEY_INIT;
+ u32 uval;
+ struct rt_mutex *pi_mutex;
+ struct rt_mutex_waiter rt_waiter;
+ struct hrtimer_sleeper timeout, *to = NULL;
+ int requeued = 0;
+ int ret;
+
+ if (!bitset)
+ return -EINVAL;
+
+ if (abs_time) {
+ unsigned long slack;
+ to = &timeout;
+ slack = current->timer_slack_ns;
+ if (rt_task(current))
+ slack = 0;
+ hrtimer_init_on_stack(&to->timer, clockrt ? CLOCK_REALTIME :
+ CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
+ hrtimer_init_sleeper(to, current);
+ hrtimer_set_expires_range_ns(&to->timer, *abs_time, slack);
+ }
+
+ /*
+ * The waiter is allocated on our stack, manipulated by the requeue
+ * code while we sleep on the initial futex (uaddr).
+ */
+ debug_rt_mutex_init_waiter(&rt_waiter);
+ rt_waiter.task = NULL;
+
+ q.pi_state = NULL;
+ q.bitset = bitset;
+ q.rt_waiter = &rt_waiter;
+
+retry:
+ q.key = FUTEX_KEY_INIT;
+ ret = get_futex_key(uaddr, fshared, &q.key);
+ if (unlikely(ret != 0))
+ goto out;
+
+ ret = get_futex_key(uaddr2, fshared, &key2);
+ if (unlikely(ret != 0)) {
+ drop_futex_key_refs(&q.key);
+ goto out;
+ }
+
+ hb = queue_lock(&q);
+
+ /* dvhart FIXME: we access the page before it is queued... obsolete
+ * comments? */
+ /*
+ * Access the page AFTER the futex is queued.
+ * Order is important:
+ *
+ * Userspace waiter: val = var; if (cond(val)) futex_wait(&var, val);
+ * Userspace waker: if (cond(var)) { var = new; futex_wake(&var); }
+ *
+ * The basic logical guarantee of a futex is that it blocks ONLY
+ * if cond(var) is known to be true at the time of blocking, for
+ * any cond. If we queued after testing *uaddr, that would open
+ * a race condition where we could block indefinitely with
+ * cond(var) false, which would violate the guarantee.
+ *
+ * A consequence is that futex_wait() can return zero and absorb
+ * a wakeup when *uaddr != val on entry to the syscall. This is
+ * rare, but normal.
+ *
+ * for shared futexes, we hold the mmap semaphore, so the mapping
+ * cannot have changed since we looked it up in get_futex_key.
+ */
+ ret = get_futex_value_locked(&uval, uaddr);
+
+ if (unlikely(ret)) {
+ queue_unlock(&q, hb);
+ put_futex_key(fshared, &q.key);
+
+ ret = get_user(uval, uaddr);
+
+ if (!ret)
+ goto retry;
+ goto out;
+ }
+ ret = -EWOULDBLOCK;
+
+ /* Only actually queue if *uaddr contained val. */
+ if (uval != val) {
+ queue_unlock(&q, hb);
+ put_futex_key(fshared, &q.key);
+ goto out;
+ }
+
+ /* queue_me and wait for wakeup, timeout, or a signal. */
+ futex_wait_queue_me(hb, &q, to);
+
+ /*
+ * Upon return from futex_wait_queue_me, we no longer hold the hb lock,
+ * but do still hold a key reference. unqueue_me* will drop a key
+ * reference to q.key.
+ */
+
+ requeued = match_futex(&q.key, &key2);
+ drop_futex_key_refs(&key2);
+
+ if (!requeued) {
+ /* Handle wakeup from futex1 (uaddr). */
+ ret = unqueue_me(&q);
+ if (unlikely(ret)) { /* signal */
+ /*
+ * We expect signal_pending(current), but another
+ * thread may have handled it for us already.
+ */
+ if (!abs_time) {
+ ret = -ERESTARTSYS;
+ } else {
+ struct restart_block *restart;
+ restart = &current_thread_info()->restart_block;
+ restart->fn = futex_wait_requeue_pi_restart;
+ restart->futex.uaddr = (u32 *)uaddr;
+ restart->futex.val = val;
+ restart->futex.has_timeout = 1;
+ restart->futex.time = abs_time->tv64;
+ restart->futex.bitset = bitset;
+ restart->futex.flags = 0;
+ restart->futex.uaddr2 = (u32 *)uaddr2;
+
+ if (fshared)
+ restart->futex.flags |= FLAGS_SHARED;
+ if (clockrt)
+ restart->futex.flags |= FLAGS_CLOCKRT;
+ ret = -ERESTART_RESTARTBLOCK;
+ }
+ } else if (to && !to->task) {/* timeout */
+ ret = -ETIMEDOUT;
+ } else {
+ /*
+ * We woke on uaddr, so userspace probably paired a
+ * FUTEX_WAKE with FUTEX_WAIT_REQUEUE_PI, which is not
+ * valid.
+ */
+ ret = -EINVAL;
+ }
+ goto out;
+ }
+
+ /* Handle wakeup from rt_mutex of futex2 (uaddr2). */
+
+ /* FIXME: this test is REALLY scary... gotta be a better way...
+ * If the pi futex was uncontended, futex_requeue_pi_init() gave us
+ * the lock.
+ */
+ if (!q.pi_state) {
+ ret = 0;
+ goto out;
+ }
+ pi_mutex = &q.pi_state->pi_mutex;
+
+ ret = rt_mutex_finish_proxy_lock(pi_mutex, to, &rt_waiter, 1);
+ debug_rt_mutex_free_waiter(&rt_waiter);
+
+ if (ret) {
+ if (get_user(uval, uaddr2))
+ ret = -EFAULT;
+
+ if (ret == -EINTR) {
+ /*
+ * We've already been requeued and enqueued on the
+ * rt_mutex, so restart by calling futex_lock_pi()
+ * directly, rather than returning to this function.
+ */
+ struct restart_block *restart;
+ restart = &current_thread_info()->restart_block;
+ restart->fn = futex_lock_pi_restart;
+ restart->futex.uaddr = (u32 *)uaddr2;
+ restart->futex.val = uval;
+ if (abs_time) {
+ restart->futex.has_timeout = 1;
+ restart->futex.time = abs_time->tv64;
+ } else
+ restart->futex.has_timeout = 0;
+ restart->futex.flags = 0;
+
+ if (fshared)
+ restart->futex.flags |= FLAGS_SHARED;
+ if (clockrt)
+ restart->futex.flags |= FLAGS_CLOCKRT;
+ ret = -ERESTART_RESTARTBLOCK;
+ }
+ }
+
+ spin_lock(q.lock_ptr);
+ ret = finish_futex_lock_pi(uaddr, fshared, &q, ret);
+
+ /* Unqueue and drop the lock. */
+ unqueue_me_pi(&q);
+
+out:
+ if (to) {
+ hrtimer_cancel(&to->timer);
+ destroy_hrtimer_on_stack(&to->timer);
+ }
+ if (requeued) {
+ /* We were requeued, so we now have two references to key2,
+ * unqueue_me_pi releases one of them, we must release the
+ * other. */
+ drop_futex_key_refs(&key2);
+ if (ret) {
+ futex_requeue_pi_cleanup(&q);
+ if (get_user(uval, uaddr2))
+ ret = -EFAULT;
+ if (ret != -ERESTART_RESTARTBLOCK && ret != -EFAULT)
+ ret = futex_lock_pi(uaddr2, fshared, uval,
+ abs_time, 0);
+ }
+ }
+ return ret;
+}
+
+static long futex_wait_requeue_pi_restart(struct restart_block *restart)
+{
+ u32 __user *uaddr = (u32 __user *)restart->futex.uaddr;
+ u32 __user *uaddr2 = (u32 __user *)restart->futex.uaddr2;
+ ktime_t t;
+ ktime_t *tp = NULL;
+ int fshared = restart->futex.flags & FLAGS_SHARED;
+ int clockrt = restart->futex.flags & FLAGS_CLOCKRT;
+ int ret;
+
+ if (restart->futex.has_timeout) {
+ t.tv64 = restart->futex.time;
+ tp = &t;
+ }
+ restart->fn = do_no_restart_syscall;
+ ret = futex_wait_requeue_pi(uaddr, fshared, restart->futex.val, tp,
+ restart->futex.bitset, clockrt, uaddr2);
+ return (long)ret;
+}
+
+/*
* Support for robust futexes: the kernel cleans up held futexes at
* thread exit time.
*
@@ -2016,7 +2567,7 @@ long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
fshared = 1;

clockrt = op & FUTEX_CLOCK_REALTIME;
- if (clockrt && cmd != FUTEX_WAIT_BITSET)
+ if (clockrt && cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI)
return -ENOSYS;

switch (cmd) {
@@ -2031,10 +2582,11 @@ long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
ret = futex_wake(uaddr, fshared, val, val3);
break;
case FUTEX_REQUEUE:
- ret = futex_requeue(uaddr, fshared, uaddr2, val, val2, NULL);
+ ret = futex_requeue(uaddr, fshared, uaddr2, val, val2, NULL, 0);
break;
case FUTEX_CMP_REQUEUE:
- ret = futex_requeue(uaddr, fshared, uaddr2, val, val2, &val3);
+ ret = futex_requeue(uaddr, fshared, uaddr2, val, val2, &val3,
+ 0);
break;
case FUTEX_WAKE_OP:
ret = futex_wake_op(uaddr, fshared, uaddr2, val, val2, val3);
@@ -2051,6 +2603,18 @@ long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
if (futex_cmpxchg_enabled)
ret = futex_lock_pi(uaddr, fshared, 0, timeout, 1);
break;
+ case FUTEX_WAIT_REQUEUE_PI:
+ val3 = FUTEX_BITSET_MATCH_ANY;
+ ret = futex_wait_requeue_pi(uaddr, fshared, val, timeout, val3,
+ clockrt, uaddr2);
+ break;
+ case FUTEX_REQUEUE_PI:
+ ret = futex_requeue(uaddr, fshared, uaddr2, val, val2, NULL, 1);
+ break;
+ case FUTEX_CMP_REQUEUE_PI:
+ ret = futex_requeue(uaddr, fshared, uaddr2, val, val2, &val3,
+ 1);
+ break;
default:
ret = -ENOSYS;
}
@@ -2068,22 +2632,25 @@ asmlinkage long sys_futex(u32 __user *uaddr, int op, u32 val,
int cmd = op & FUTEX_CMD_MASK;

if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI ||
- cmd == FUTEX_WAIT_BITSET)) {
+ cmd == FUTEX_WAIT_BITSET ||
+ cmd == FUTEX_WAIT_REQUEUE_PI)) {
if (copy_from_user(&ts, utime, sizeof(ts)) != 0)
return -EFAULT;
if (!timespec_valid(&ts))
return -EINVAL;

t = timespec_to_ktime(ts);
+ /* dvhart FIXME: do we add FUTEX_WAIT_REQUEUE_PI here? */
if (cmd == FUTEX_WAIT)
t = ktime_add_safe(ktime_get(), t);
tp = &t;
}
/*
- * requeue parameter in 'utime' if cmd == FUTEX_REQUEUE.
+ * requeue parameter in 'utime' if cmd == FUTEX_*_REQUEUE_*.
* number of waiters to wake in 'utime' if cmd == FUTEX_WAKE_OP.
*/
if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE ||
+ cmd == FUTEX_REQUEUE_PI || cmd == FUTEX_CMP_REQUEUE_PI ||
cmd == FUTEX_WAKE_OP)
val2 = (u32) (unsigned long) utime;


--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-03 00:24:30

by Darren Hart

Subject: [TIP][RFC 7/7] requeue pi testcase

The following is a rough start at a test case to stress the
FUTEX_REQUEUE_PI system call, mimicking a pthread_cond_broadcast and
pthread_cond_signal style wakeup. The signal style wakeup is where I've
been seeing the failed paging request.

To compile:
$ gcc -D_GNU_SOURCE requeue_pi.c -o requeue_pi -lrt -lpthread


/******************************************************************************
*
* Copyright © International Business Machines Corp., 2006-2009
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
* the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* NAME
* requeue_pi.c
*
* DESCRIPTION
* This test exercises the futex syscall op codes needed for requeuing
* priority inheritance aware POSIX condition variables and mutexes.
*
* USAGE:
* requeue_pi
*
* AUTHORS
* Sripathi Kodi <[email protected]>
* Darren Hart <[email protected]>
*
* HISTORY
* 2008-Jan-13: Initial version by Sripathi Kodi <[email protected]>
*
*****************************************************************************/

/* TODO
* o add test for shared futexes
* o add test for non CMP requeue call
*/

/* SYS_futex arguments vary meaning across op codes
*
* u32 __user *uaddr
* user-space futex address
*
* int op
* FUTEX_* op code
*
* u32 val
* FUTEX_WAIT: value of the futex prior to the syscall
* FUTEX_REQUEUE_PI: number of threads to wake (nr_wake)
* FUTEX_WAIT_REQUEUE_PI: ?
*
* struct timespec __user *utime
* FUTEX_WAIT: struct timespec timeout
* FUTEX_REQUEUE_PI: number of threads to requeue (nr_requeue)
* FUTEX_WAIT_REQUEUE_PI: struct timespec timeout
*
* u32 __user *uaddr2
* FUTEX_WAIT: not used (NULL)
* FUTEX_REQUEUE_PI: user-space futex address of requeue target
* FUTEX_WAIT_REQUEUE_PI: user-space futex address of requeue target
*
* u32 val3
* FUTEX_WAIT: not used (0) (set to bitset in kernel)
* FUTEX_REQUEUE_PI: cmpval for uaddr
* (FIXME: maybe only for FUTEX_CMP_REQUEUE_PI ??)
* FUTEX_WAIT_REQUEUE_PI: not used (0) (set to bitset in kernel)
*/

#include <stdio.h>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <limits.h>
#include <unistd.h> /* usleep(), sleep(), syscall() */
#include <sys/syscall.h> /* SYS_futex */

#define FUTEX_WAIT_REQUEUE_PI 11
#define FUTEX_REQUEUE_PI 12
#define FUTEX_CMP_REQUEUE_PI 13

#ifndef SYS_futex
#define SYS_futex 202 /* x86_64 only; normally supplied by <sys/syscall.h> */
#endif

#ifndef FUTEX_WAIT
#define FUTEX_WAIT 0
#define FUTEX_WAKE 1
#endif

#define THREAD_MAX 10
pthread_mutex_t mutex;
unsigned int wait_q;


int create_pi_mutex(pthread_mutex_t *mutex)
{
int ret;
pthread_mutexattr_t mutexattr;

if ((ret = pthread_mutexattr_init(&mutexattr)) != 0) {
printf("pthread_mutexattr_init: %s\n", strerror(ret));
return -1;
}
if ((ret = pthread_mutexattr_setprotocol(&mutexattr, PTHREAD_PRIO_INHERIT)) != 0) {
printf("pthread_mutexattr_setprotocol: %s\n", strerror(ret));
pthread_mutexattr_destroy(&mutexattr);
return -1;
}
if ((ret = pthread_mutex_init(mutex, &mutexattr)) != 0) {
printf("pthread_mutex_init: %s\n", strerror(ret));
pthread_mutexattr_destroy(&mutexattr);
return -1;
}
return 0;
}

int create_rt_thread(pthread_t *pth, void*(*func)(void*), void *arg, int policy, int prio)
{
int ret;
struct sched_param schedp;
pthread_attr_t attr;

pthread_attr_init(&attr);
memset(&schedp, 0, sizeof(schedp));

if ((ret = pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED)) != 0) {
printf("pthread_attr_setinheritsched: %s\n", strerror(ret));
return -1;
}

if ((ret = pthread_attr_setschedpolicy(&attr, policy)) != 0) {
printf("pthread_attr_setschedpolicy: %s\n", strerror(ret));
return -1;
}

schedp.sched_priority = prio;
if ((ret = pthread_attr_setschedparam(&attr, &schedp)) != 0) {
printf("pthread_attr_setschedparam: %s\n", strerror(ret));
return -1;
}

if ((ret = pthread_create(pth, &attr, func, arg)) != 0) {
printf("pthread_create: %s\n", strerror(ret));
return -1;
}
return 0;
}


void *waiterfn(void *threadnum)
{
unsigned int old_val;
int ret;

printf("Waiter %ld: running\n", (long)threadnum);
/* Each thread sleeps for a different amount of time
* This is to avoid races, because we don't lock the
* external mutex here */
usleep(1000 * (long)threadnum);

/* FIXME: need to hold the mutex prior to waiting right?... sort of... */

printf("Waiter %ld: calling FUTEX_WAIT_REQUEUE_PI\n", (long)threadnum);
/* cond_wait */
old_val = wait_q;
if ((ret = syscall(SYS_futex, &wait_q, FUTEX_WAIT_REQUEUE_PI, old_val,
NULL, &(mutex.__data.__lock), 0)) != 0) {
perror("waiterfn");
}
pthread_mutex_unlock(&mutex);

printf("Waiter %ld: returned from FUTEX_WAIT_REQUEUE_PI with %d\n", (long)threadnum, ret);
return (void*)(long)ret;
}

void *broadcast_wakerfn(void *arg)
{
unsigned int old_val;
int nr_wake = 1;
int nr_requeue = INT_MAX;
int ret = 0;
sleep(1); /* FIXME: need a real sync mechanism - barrier maybe? */
printf("Waker: Calling broadcast\n");

pthread_mutex_lock(&mutex);
/* cond_broadcast */
old_val = wait_q;
ret = syscall(SYS_futex, &wait_q, FUTEX_CMP_REQUEUE_PI, nr_wake,
nr_requeue, &(mutex.__data.__lock), old_val);
pthread_mutex_unlock(&mutex);

printf("Waker: exiting\n");
return (void *)(long)ret;
}

void *signal_wakerfn(void *arg)
{
unsigned int old_val;
int nr_wake = 1;
int nr_requeue = 0;
int i;
int ret = 0;
sleep(2);
for (i = 0; i < THREAD_MAX; i++) {
pthread_mutex_lock(&mutex);
printf("Waker: Calling signal\n");
/* cond_signal */
old_val = wait_q;
ret = syscall(SYS_futex, &wait_q, FUTEX_CMP_REQUEUE_PI, nr_wake,
nr_requeue, &(mutex.__data.__lock), old_val);
pthread_mutex_unlock(&mutex);
sleep(1); /* eliminate once the waiter takes/drops the lock */
}

printf("Waker: exiting\n");
return (void *)(long)ret;
}

/* Create a PI mutex and a condvar*/
/* Make 10 threads wait on the condvar */
/* Make a thread call broadcast on it */
int main(void)
{
pthread_t waiter[THREAD_MAX];
pthread_t waker;
long i, ret;

if ((ret = create_pi_mutex(&mutex)) != 0) {
printf("Creating pi mutex failed\n");
exit(1);
}

/* pthread_cond_broadcast type test */
printf("\npthread_cond_broadcast style wakeup.\n");
for (i=0; i<THREAD_MAX; i++) {
if ((ret = create_rt_thread(&waiter[i], waiterfn, (void *)i, SCHED_FIFO, 1)) != 0) {
printf("Creating thread failed\n");
exit(1);
}
}
if ((ret = create_rt_thread(&waker, broadcast_wakerfn, NULL, SCHED_FIFO, 1)) != 0) {
printf("Creating thread failed\n");
exit(1);
}

/* Wait for threads to finish */
for (i=0; i<THREAD_MAX; i++) {
pthread_join(waiter[i], NULL);
}
pthread_join(waker, NULL);

/* pthread_cond_signal type test */
printf("\npthread_cond_signal style wakeup.\n");
for (i=0; i<THREAD_MAX; i++) {
if ((ret = create_rt_thread(&waiter[i], waiterfn, (void *)i, SCHED_FIFO, 1)) != 0) {
printf("Creating thread failed\n");
exit(1);
}
}
if ((ret = create_rt_thread(&waker, signal_wakerfn, NULL, SCHED_FIFO, 1)) != 0) {
printf("Creating thread failed\n");
exit(1);
}

/* Wait for threads to finish */
for (i=0; i<THREAD_MAX; i++) {
pthread_join(waiter[i], NULL);
}
pthread_join(waker, NULL);
return 0;
}

--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-03 13:04:19

by Peter Zijlstra

[permalink] [raw]
Subject: Re: [TIP][RFC 3/7] futex: futex_lock_pi_atomic()

On Mon, 2009-03-02 at 16:13 -0800, Darren Hart wrote:
> + * Returns:
> + * - 0 on success (futex_q is setup and FUTEX_WAITERS bit is set on the futex)
> + * - 1 on a successful atomic acquisition of the lock
> + * - A negative error code on failure

That utterly confused me for a while.

Maybe something like:

0 - ready to wait
1 - acquired the lock

<0 - error
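
As an illustration of the convention suggested above, here is a minimal sketch of how a caller might dispatch on the three outcomes. The enum and function names (lock_pi_outcome, classify_lock_pi_ret) are hypothetical and not taken from the patch; treating an unexpected positive value as an error is likewise an assumption of this sketch.

```c
#include <assert.h>
#include <errno.h>

/* Hypothetical labels for the three documented outcomes. */
enum lock_pi_outcome {
	READY_TO_WAIT,	/* 0: futex_q set up, FUTEX_WAITERS bit set */
	LOCK_ACQUIRED,	/* 1: lock taken atomically, no need to wait */
	LOCK_ERROR	/* <0: negative error code */
};

/*
 * Map a futex_lock_pi_atomic()-style return value to an outcome.
 * Values other than 0, 1 or a negative errno are not documented,
 * so this sketch conservatively reports them as errors.
 */
static enum lock_pi_outcome classify_lock_pi_ret(int ret)
{
	if (ret == 0)
		return READY_TO_WAIT;
	if (ret == 1)
		return LOCK_ACQUIRED;
	return LOCK_ERROR;
}
```

The point of spelling it out is that 0 is "success" only in the sense that the caller may now block; the lock itself is held only in the 1 case.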

2009-03-03 17:29:53

by Darren Hart

[permalink] [raw]
Subject: Re: [TIP][RFC 3/7] futex: futex_lock_pi_atomic()

Peter Zijlstra wrote:

Hi Peter,

Thanks for taking a look.

> On Mon, 2009-03-02 at 16:13 -0800, Darren Hart wrote:
>> + * Returns:
>> + * - 0 on success (futex_q is setup and FUTEX_WAITERS bit is set on the futex)
>> + * - 1 on a successful atomic acquisition of the lock
>> + * - A negative error code on failure
>
> That utterly confused me for a while.

Sorry about that, guess my doc-fu needs some work.

>
> Maybe something like:
>
> 0 - ready to wait
> 1 - acquired the lock
>
> <0 - error

Sure thing, I've updated the patch and my git tree:

http://git.kernel.org/?p=linux/kernel/git/dvhart/linux-2.6-tip-hacks.git;a=shortlog;h=requeue-pi-dev

Note, this is a development branch and I continuously update it,
replacing patches, etc.


--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-04 07:53:56

by Darren Hart

[permalink] [raw]
Subject: Re: [TIP][RFC 6/7] futex: add requeue_pi calls

Darren Hart wrote:
> From: Darren Hart <[email protected]>
>
> PI Futexes must have an owner at all times, so the standard requeue
> commands
> aren't sufficient. The new commands properly manage pi futex ownership by
> ensuring a futex with waiters has an owner at all times. Once complete
> these
> patches will allow glibc to properly handle pi mutexes with
> pthread_condvars.
>
> The approach taken here is to create two new futex op codes:
>
> FUTEX_WAIT_REQUEUE_PI:
> Threads will use this op code to wait on a futex (such as a non-pi
> waitqueue)
> and wake after they have been requeued to a pi futex. Prior to
> returning to
> userspace, they will take this pi futex (and the underlying rt_mutex).
>
> futex_wait_requeue_pi() is currently the result of a high speed collision
> between futex_wait and futex_lock_pi (with the first part of futex_lock_pi
> being done by futex_requeue_pi_init() on behalf of the waiter).
>
> FUTEX_REQUEUE_PI:
> This call must be used to wake threads waiting with FUTEX_WAIT_REQUEUE_PI,
> regardless of how many threads the caller intends to wake or requeue.
> pthread_cond_broadcast should call this with nr_wake=1 and nr_requeue=-1
> (all).
> pthread_cond_signal should call this with nr_wake=1 and nr_requeue=0. The
> reason being we need both callers to get the benefit of the
> futex_requeue_pi_init() routine which will prepare the top_waiter (the
> thread
> to be woken) to take possesion of the pi futex by setting FUTEX_WAITERS and
> preparing the futex_q.pi_state. futex_requeue() also enqueues the
> top_waiter
> on the rt_mutex via rt_mutex_start_proxy_lock(). If pthread_cond_signal
> used
> FUTEX_WAKE, we would have a similar race window where the caller can
> return and
> release the mutex before the waiters can fully wake, potentially leaving
> the
> rt_mutex with waiters but no owner.
>
> We hit a failed paging request running the testcase (7/7) in a loop
> (only takes a few minutes at most to hit on my 8way x86_64 test
> machine). It appears to be the result of splitting rt_mutex_slowlock()
> across two execution contexts by means of rt_mutex_start_proxy_lock()
> and rt_mutex_finish_proxy_lock(). The former calls
> task_blocks_on_rt_mutex() on behalf of the waiting task prior to
> requeuing and waking it by the requeueing thread. The latter is
> executed upon wakeup by the waiting thread which somehow manages to call
> the new __rt_mutex_slowlock() with waiter->task != NULL and still
> succeed with try_to_take_lock(), this leads to corruption of the plists
> and an eventual failed paging request. See 7/7 for the rather crude
> testcase that causes this. Any tips on where this race might be
> occuring are welcome.

After some judicious use of printk (ftrace from tip wouldn't let me set
the current_tracer, permission denied) I managed to catch a failing
scenario where the signaling thread returns to userspace and unlocks the
mutex before the waiting thread calls __rt_mutex_slowunlock() (which is
fine) but the signaler calls rt_mutex_fastunlock() instead of
rt_mutex_slowunlock() which is what the rt_mutex_start_proxy_lock() was
supposed to prevent, so I am apparently not fully preparing the waiter
and enqueueing it on the rt_mutex. Annotated printk output:

Signaler thread in futex_requeue()
lookup_pi_state: allocating a new pi state
futex_requeue_pi_init: futex_lock_pi_atomic returned: 0
futex_requeue: futex_requeue_pi_init returned: 0

Signaler thread returned to userspace and did pthread_mutex_unlock()
rt_mutex_fastunlock: unlocked ffff88013d1749d0

Waiting thread woke up in futex_wait_requeue_pi() and tries to finish
taking the lock:
__rt_mutex_slowlock: waiter->task is ffff8802bdd350c0
try_to_take_rt_mutex: assigned rt_mutex (ffff88013d1749d0) owner
to current ffff8802bdd350c0

Waiting thread gets the lock while waiter->task is not NULL (b/c the
signaler didn't go through the slow path)
__rt_mutex_slowlock: got the lock

I'll continue looking into this tomorrow, but Steven if you have any
ideas on what I may have missed in rt_mutex_start_proxy_lock() I'd
appreciate any insight you might have to share. Thomas, I know you gave
this function some thought as well, did I take a radically different
approach to what you had in mind?
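
To make the fast-path/slow-path distinction above concrete, here is a small userspace model of an owner word with a "has waiters" flag folded into its low bits. It is only a sketch under stated assumptions: the struct, the flag value and the function name are hypothetical, and C11 atomics stand in for the kernel's cmpxchg. It is not the rtmutex code.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Illustrative flag bit kept in the low bits of the owner word. */
#define MODEL_HAS_WAITERS ((uintptr_t)2)

struct model_rt_mutex {
	_Atomic uintptr_t owner;	/* owner "pointer" | flag bits */
};

/*
 * Model of a lockless fast unlock: succeed only if the owner word is
 * exactly 'me' with no flag bits set. If the waiters bit is set, the
 * compare fails and the caller has to fall back to a slow path that
 * hands the lock to the top waiter.
 */
static bool model_fastunlock(struct model_rt_mutex *lock, uintptr_t me)
{
	uintptr_t expected = me;

	return atomic_compare_exchange_strong(&lock->owner, &expected,
					      (uintptr_t)0);
}
```

In this model, an owner word of 'me' alone lets the fast unlock succeed, which matches the trace above where the signaler never reaches the slow path; with the waiters bit set the same call fails and the slow path would run.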

--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-05 16:51:33

by Darren Hart

[permalink] [raw]
Subject: Re: [TIP][RFC 6/7] futex: add requeue_pi calls

Darren Hart wrote:
> Darren Hart wrote:
>> From: Darren Hart <[email protected]>
>>
>> PI Futexes must have an owner at all times, so the standard requeue
>> commands
>> aren't sufficient. The new commands properly manage pi futex
>> ownership by
>> ensuring a futex with waiters has an owner at all times. Once
>> complete these
>> patches will allow glibc to properly handle pi mutexes with
>> pthread_condvars.
>>
>> The approach taken here is to create two new futex op codes:
>>
>> FUTEX_WAIT_REQUEUE_PI:
>> Threads will use this op code to wait on a futex (such as a non-pi
>> waitqueue)
>> and wake after they have been requeued to a pi futex. Prior to
>> returning to
>> userspace, they will take this pi futex (and the underlying rt_mutex).
>>
>> futex_wait_requeue_pi() is currently the result of a high speed collision
>> between futex_wait and futex_lock_pi (with the first part of
>> futex_lock_pi
>> being done by futex_requeue_pi_init() on behalf of the waiter).
>>
>> FUTEX_REQUEUE_PI:
>> This call must be used to wake threads waiting with
>> FUTEX_WAIT_REQUEUE_PI,
>> regardless of how many threads the caller intends to wake or requeue.
>> pthread_cond_broadcast should call this with nr_wake=1 and
>> nr_requeue=-1 (all).
>> pthread_cond_signal should call this with nr_wake=1 and nr_requeue=0.
>> The
>> reason being we need both callers to get the benefit of the
>> futex_requeue_pi_init() routine which will prepare the top_waiter (the
>> thread
>> to be woken) to take possesion of the pi futex by setting
>> FUTEX_WAITERS and
>> preparing the futex_q.pi_state. futex_requeue() also enqueues the
>> top_waiter
>> on the rt_mutex via rt_mutex_start_proxy_lock(). If
>> pthread_cond_signal used
>> FUTEX_WAKE, we would have a similar race window where the caller can
>> return and
>> release the mutex before the waiters can fully wake, potentially
>> leaving the
>> rt_mutex with waiters but no owner.
>>
>> We hit a failed paging request running the testcase (7/7) in a loop
>> (only takes a few minutes at most to hit on my 8way x86_64 test
>> machine). It appears to be the result of splitting rt_mutex_slowlock()
>> across two execution contexts by means of rt_mutex_start_proxy_lock()
>> and rt_mutex_finish_proxy_lock(). The former calls
>> task_blocks_on_rt_mutex() on behalf of the waiting task prior to
>> requeuing and waking it by the requeueing thread. The latter is
>> executed upon wakeup by the waiting thread which somehow manages to call
>> the new __rt_mutex_slowlock() with waiter->task != NULL and still
>> succeed with try_to_take_lock(), this leads to corruption of the plists
>> and an eventual failed paging request. See 7/7 for the rather crude
>> testcase that causes this. Any tips on where this race might be
>> occuring are welcome.
>
> After some judicious use of printk (ftrace from tip wouldn't let me set
> the current_tracer, permission denied)

Thanks to Steven for helping me get a working ftrace in tip.


> I managed to catch a failing
> scenario where the signaling thread returns to userspace and unlocks the
> mutex before the waiting thread calls __rt_mutex_slowunlock() (which is
> fine) but the signaler calls rt_mutex_fastunlock() instead of
> rt_mutex_slowunlock() which is what the rt_mutex_start_proxy_lock() was
> supposed to prevent, so I am apparently not fully preparing the waiter
> and enqueueing it on the rt_mutex. Annotated printk output:
>
> Signaler thread in futex_requeue()
> lookup_pi_state: allocating a new pi state
> futex_requeue_pi_init: futex_lock_pi_atomic returned: 0
> futex_requeue: futex_requeue_pi_init returned: 0
>
> Signaler thread returned to userspace and did pthread_mutex_unlock()
> rt_mutex_fastunlock: unlocked ffff88013d1749d0
>
> Waiting thread woke up in futex_wait_requeue_pi() and tries to finish
> taking the lock:
> __rt_mutex_slowlock: waiter->task is ffff8802bdd350c0
> try_to_take_rt_mutex: assigned rt_mutex (ffff88013d1749d0) owner
> to current ffff8802bdd350c0
>
> Waiting thread get's the lock while waiter->task is not NULL (b/c the
> signaler didn't go through the slow path)
> __rt_mutex_slowlock: got the lock
>
> I'll continue looking into this tomorrow, but Steven if you have any
> ideas on what I may have missed in rt_mutex_start_proxy_lock() I'd
> appreciate any insight you might have to share. Thomas, I know you gave
> this function some thought as well, did I take a radically different
> approach to what you had in mind?

I've updated my tracing and can show that rt_mutex_start_proxy_lock() is
not setting RT_MUTEX_HAS_WAITERS like it should be:

------------[ cut here ]------------
kernel BUG at kernel/rtmutex.c:646!
invalid opcode: 0000 [#1] PREEMPT SMP
last sysfs file: /sys/devices/pci0000:00/0000:00:03.0/0000:01:00.0/host0/port-0:0/end_device-0:0/target0:0:0/0:0:0:0/vendor
Dumping ftrace buffer:
---------------------------------
<...>-3793 1d..3 558351872us : lookup_pi_state: allocating a new pi state
<...>-3793 1d..3 558351876us : lookup_pi_state: initial rt_mutex owner: ffff88023d9486c0
<...>-3793 1...2 558351877us : futex_requeue: futex_lock_pi_atomic returned: 0
<...>-3793 1...2 558351877us : futex_requeue: futex_requeue_pi_init returned: 0
<...>-3793 1...3 558351879us : rt_mutex_start_proxy_lock: task_blocks_on_rt_mutex returned 0
<...>-3793 1...3 558351880us : rt_mutex_start_proxy_lock: lock has waiterflag: 0
<...>-3793 1...1 558351888us : rt_mutex_unlock: unlocking ffff88023b5f6950
<...>-3793 1...1 558351888us : rt_mutex_unlock: lock waiter flag: 0
<...>-3793 1...1 558351889us : rt_mutex_unlock: unlocked ffff88023b5f6950
<...>-3783 0...1 558351893us : __rt_mutex_slowlock: waiter->task is ffff88023c872440
<...>-3783 0...1 558351897us : try_to_take_rt_mutex: assigned rt_mutex (ffff88023b5f6950) owner to current ffff88023c872440
<...>-3783 0...1 558351897us : __rt_mutex_slowlock: got the lock
---------------------------------

I'll start digging into why that's happening, but I wanted to share the trace output.

--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-06 01:42:47

by Darren Hart

[permalink] [raw]
Subject: Re: [TIP][RFC 6/7] futex: add requeue_pi calls

Darren Hart wrote:
> Darren Hart wrote:
>> Darren Hart wrote:
>>> From: Darren Hart <[email protected]>
>>>
>>> PI Futexes must have an owner at all times, so the standard requeue
>>> commands
>>> aren't sufficient. The new commands properly manage pi futex
>>> ownership by
>>> ensuring a futex with waiters has an owner at all times. Once
>>> complete these
>>> patches will allow glibc to properly handle pi mutexes with
>>> pthread_condvars.
>>>
>>> The approach taken here is to create two new futex op codes:
>>>
>>> FUTEX_WAIT_REQUEUE_PI:
>>> Threads will use this op code to wait on a futex (such as a non-pi
>>> waitqueue)
>>> and wake after they have been requeued to a pi futex. Prior to
>>> returning to
>>> userspace, they will take this pi futex (and the underlying rt_mutex).
>>>
>>> futex_wait_requeue_pi() is currently the result of a high speed
>>> collision
>>> between futex_wait and futex_lock_pi (with the first part of
>>> futex_lock_pi
>>> being done by futex_requeue_pi_init() on behalf of the waiter).
>>>
>>> FUTEX_REQUEUE_PI:
>>> This call must be used to wake threads waiting with
>>> FUTEX_WAIT_REQUEUE_PI,
>>> regardless of how many threads the caller intends to wake or requeue.
>>> pthread_cond_broadcast should call this with nr_wake=1 and
>>> nr_requeue=-1 (all).
>>> pthread_cond_signal should call this with nr_wake=1 and
>>> nr_requeue=0. The
>>> reason being we need both callers to get the benefit of the
>>> futex_requeue_pi_init() routine which will prepare the top_waiter
>>> (the thread
>>> to be woken) to take possesion of the pi futex by setting
>>> FUTEX_WAITERS and
>>> preparing the futex_q.pi_state. futex_requeue() also enqueues the
>>> top_waiter
>>> on the rt_mutex via rt_mutex_start_proxy_lock(). If
>>> pthread_cond_signal used
>>> FUTEX_WAKE, we would have a similar race window where the caller can
>>> return and
>>> release the mutex before the waiters can fully wake, potentially
>>> leaving the
>>> rt_mutex with waiters but no owner.
>>>
>>> We hit a failed paging request running the testcase (7/7) in a loop
>>> (only takes a few minutes at most to hit on my 8way x86_64 test
>>> machine). It appears to be the result of splitting rt_mutex_slowlock()
>>> across two execution contexts by means of rt_mutex_start_proxy_lock()
>>> and rt_mutex_finish_proxy_lock(). The former calls
>>> task_blocks_on_rt_mutex() on behalf of the waiting task prior to
>>> requeuing and waking it by the requeueing thread. The latter is
>>> executed upon wakeup by the waiting thread which somehow manages to call
>>> the new __rt_mutex_slowlock() with waiter->task != NULL and still
>>> succeed with try_to_take_lock(), this leads to corruption of the plists
>>> and an eventual failed paging request. See 7/7 for the rather crude
>>> testcase that causes this. Any tips on where this race might be
>>> occuring are welcome.

<snip>

>
> I've updated my tracing and can show that rt_mutex_start_proxy_lock() is
> not setting RT_MUTEX_HAS_WAITERS like it should be:
>
> ------------[ cut here ]------------
> kernel BUG at kernel/rtmutex.c:646!
> invalid opcode: 0000 [#1] PREEMPT SMP
> last sysfs file:
> /sys/devices/pci0000:00/0000:00:03.0/0000:01:00.0/host0/port-0:
> 0/end_device-0:0/target0:0:0/0:0:0:0/vendor
> Dumping ftrace buffer:
> ---------------------------------
> <...>-3793 1d..3 558351872us : lookup_pi_state: allocating a new pi
> state
> <...>-3793 1d..3 558351876us : lookup_pi_state: initial rt_mutex
> owner: ffff88023d9486c0
> <...>-3793 1...2 558351877us : futex_requeue: futex_lock_pi_atomic
> returned: 0
> <...>-3793 1...2 558351877us : futex_requeue: futex_requeue_pi_init
> returned: 0
> <...>-3793 1...3 558351879us : rt_mutex_start_proxy_lock:
> task_blocks_on_rt_mutex returned 0
> <...>-3793 1...3 558351880us : rt_mutex_start_proxy_lock: lock has
> waiterflag: 0
> <...>-3793 1...1 558351888us : rt_mutex_unlock: unlocking
> ffff88023b5f6950
> <...>-3793 1...1 558351888us : rt_mutex_unlock: lock waiter flag: 0
> <...>-3793 1...1 558351889us : rt_mutex_unlock: unlocked
> ffff88023b5f6950
> <...>-3783 0...1 558351893us : __rt_mutex_slowlock: waiter->task is
> ffff88023c872440
> <...>-3783 0...1 558351897us : try_to_take_rt_mutex: assigned
> rt_mutex (ffff88023b5f6950) owner to current ffff88023c872440
> <...>-3783 0...1 558351897us : __rt_mutex_slowlock: got the lock
> ---------------------------------
>
> I'll start digging into why that's happening, but I wanted to share the
> trace output.
>

As it turns out I missed setting RT_MUTEX_HAS_WAITERS on the rt_mutex in
rt_mutex_start_proxy_lock() - seems awfully silly in retrospect - but it
was a little non-obvious while writing it. I added mark_rt_mutex_waiters()
after the call to task_blocks_on_rt_mutex() and the test has completed
more than 400 iterations successfully (previously it usually failed
within 2 iterations).

Steven, there are several ways to set RT_MUTEX_HAS_WAITERS - but this
seemed like a reasonable approach, would you agree? Since I'm holding
the wait_lock I don't technically need the atomic cmpxchg and could
probably just set it explicitly - do you have a preference?
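
For comparison, the two options in the question above can be modelled in userspace as follows. This is a sketch only: the names are hypothetical, C11 atomics stand in for the kernel primitives, and the remark about racing with a lockless fast path is my reading rather than a settled answer.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Illustrative flag bit kept in the low bits of the owner word. */
#define MODEL_HAS_WAITERS ((uintptr_t)2)

struct model_lock {
	_Atomic uintptr_t owner;	/* owner "pointer" | flag bits */
};

/*
 * Option 1: cmpxchg loop. Safe even if another context changes the
 * owner word without holding wait_lock (e.g. a lockless fast path),
 * because a stale 'old' makes the exchange fail and we retry.
 */
static void model_mark_waiters_cmpxchg(struct model_lock *lock)
{
	uintptr_t old = atomic_load(&lock->owner);

	while (!atomic_compare_exchange_weak(&lock->owner, &old,
					     old | MODEL_HAS_WAITERS))
		;	/* 'old' was refreshed by the failed exchange */
}

/*
 * Option 2: plain read-modify-write. Only correct if every writer of
 * the owner word is serialized by the same lock; otherwise an update
 * made between the load and the store is silently overwritten.
 */
static void model_mark_waiters_plain(struct model_lock *lock)
{
	uintptr_t old = atomic_load_explicit(&lock->owner,
					     memory_order_relaxed);

	atomic_store_explicit(&lock->owner, old | MODEL_HAS_WAITERS,
			      memory_order_relaxed);
}
```

Single-threaded, both produce the same owner word; the difference only shows up if something modifies the word outside wait_lock, which is the case the cmpxchg loop tolerates.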


RFC: rt_mutex: add proxy lock routines

From: Darren Hart <[email protected]>

This patch is required for the first half of requeue_pi to function. It
basically splits rt_mutex_slowlock() right down the middle, just before the
first call to schedule().

This patch uses a new futex_q field, rt_waiter, for now. I think
I should be able to use task->pi_blocked_on in a future version of this patch.

V6: -add mark_rt_mutex_waiters() to rt_mutex_start_proxy_lock() to avoid
the race condition evident in previous versions
V5: -remove EXPORT_SYMBOL_GPL from the new routines
-minor cleanups
V4: -made detect_deadlock a parameter to rt_mutex_enqueue_task
-refactored rt_mutex_slowlock to share code with new functions
-renamed rt_mutex_enqueue_task and rt_mutex_handle_wakeup to
rt_mutex_start_proxy_lock and rt_mutex_finish_proxy_lock, respectively

Signed-off-by: Darren Hart <[email protected]>
Cc: Thomas Gleixner <[email protected]>
Cc: Sripathi Kodi <[email protected]>
Cc: Peter Zijlstra <[email protected]>
Cc: John Stultz <[email protected]>
Cc: Steven Rostedt <[email protected]>
---

kernel/rtmutex.c | 193 +++++++++++++++++++++++++++++++++++++----------
kernel/rtmutex_common.h | 8 ++
2 files changed, 161 insertions(+), 40 deletions(-)


diff --git a/kernel/rtmutex.c b/kernel/rtmutex.c
index 69d9cb9..f438362 100644
--- a/kernel/rtmutex.c
+++ b/kernel/rtmutex.c
@@ -411,6 +411,7 @@ static int try_to_take_rt_mutex(struct rt_mutex *lock)
*/
static int task_blocks_on_rt_mutex(struct rt_mutex *lock,
struct rt_mutex_waiter *waiter,
+ struct task_struct *task,
int detect_deadlock)
{
struct task_struct *owner = rt_mutex_owner(lock);
@@ -418,21 +419,21 @@ static int task_blocks_on_rt_mutex(struct rt_mutex *lock,
unsigned long flags;
int chain_walk = 0, res;

- spin_lock_irqsave(&current->pi_lock, flags);
- __rt_mutex_adjust_prio(current);
- waiter->task = current;
+ spin_lock_irqsave(&task->pi_lock, flags);
+ __rt_mutex_adjust_prio(task);
+ waiter->task = task;
waiter->lock = lock;
- plist_node_init(&waiter->list_entry, current->prio);
- plist_node_init(&waiter->pi_list_entry, current->prio);
+ plist_node_init(&waiter->list_entry, task->prio);
+ plist_node_init(&waiter->pi_list_entry, task->prio);

/* Get the top priority waiter on the lock */
if (rt_mutex_has_waiters(lock))
top_waiter = rt_mutex_top_waiter(lock);
plist_add(&waiter->list_entry, &lock->wait_list);

- current->pi_blocked_on = waiter;
+ task->pi_blocked_on = waiter;

- spin_unlock_irqrestore(&current->pi_lock, flags);
+ spin_unlock_irqrestore(&task->pi_lock, flags);

if (waiter == rt_mutex_top_waiter(lock)) {
spin_lock_irqsave(&owner->pi_lock, flags);
@@ -460,7 +461,7 @@ static int task_blocks_on_rt_mutex(struct rt_mutex *lock,
spin_unlock(&lock->wait_lock);

res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
- current);
+ task);

spin_lock(&lock->wait_lock);

@@ -605,37 +606,25 @@ void rt_mutex_adjust_pi(struct task_struct *task)
rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
}

-/*
- * Slow path lock function:
+/**
+ * __rt_mutex_slowlock - perform the wait-wake-try-to-take loop
+ * @lock: the rt_mutex to take
+ * @state: the state the task should block in (TASK_INTERRUPTIBLE
+ * or TASK_UNINTERRUPTIBLE)
+ * @timeout: the pre-initialized and started timer, or NULL for none
+ * @waiter: the pre-initialized rt_mutex_waiter
+ * @detect_deadlock: passed to task_blocks_on_rt_mutex
+ *
+ * lock->wait_lock must be held by the caller.
*/
static int __sched
-rt_mutex_slowlock(struct rt_mutex *lock, int state,
- struct hrtimer_sleeper *timeout,
- int detect_deadlock)
+__rt_mutex_slowlock(struct rt_mutex *lock, int state,
+ struct hrtimer_sleeper *timeout,
+ struct rt_mutex_waiter *waiter,
+ int detect_deadlock)
{
- struct rt_mutex_waiter waiter;
int ret = 0;

- debug_rt_mutex_init_waiter(&waiter);
- waiter.task = NULL;
-
- spin_lock(&lock->wait_lock);
-
- /* Try to acquire the lock again: */
- if (try_to_take_rt_mutex(lock)) {
- spin_unlock(&lock->wait_lock);
- return 0;
- }
-
- set_current_state(state);
-
- /* Setup the timer, when timeout != NULL */
- if (unlikely(timeout)) {
- hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
- if (!hrtimer_active(&timeout->timer))
- timeout->task = NULL;
- }
-
for (;;) {
/* Try to acquire the lock: */
if (try_to_take_rt_mutex(lock))
@@ -656,19 +645,19 @@ rt_mutex_slowlock(struct rt_mutex *lock, int state,
}

/*
- * waiter.task is NULL the first time we come here and
+ * waiter->task is NULL the first time we come here and
* when we have been woken up by the previous owner
* but the lock got stolen by a higher prio task.
*/
- if (!waiter.task) {
- ret = task_blocks_on_rt_mutex(lock, &waiter,
+ if (!waiter->task) {
+ ret = task_blocks_on_rt_mutex(lock, waiter, current,
detect_deadlock);
/*
* If we got woken up by the owner then start loop
* all over without going into schedule to try
* to get the lock now:
*/
- if (unlikely(!waiter.task)) {
+ if (unlikely(!waiter->task)) {
/*
* Reset the return value. We might
* have returned with -EDEADLK and the
@@ -684,15 +673,52 @@ rt_mutex_slowlock(struct rt_mutex *lock, int state,

spin_unlock(&lock->wait_lock);

- debug_rt_mutex_print_deadlock(&waiter);
+ debug_rt_mutex_print_deadlock(waiter);

- if (waiter.task)
+ if (waiter->task)
schedule_rt_mutex(lock);

spin_lock(&lock->wait_lock);
set_current_state(state);
}

+ return ret;
+}
+
+/*
+ * Slow path lock function:
+ */
+static int __sched
+rt_mutex_slowlock(struct rt_mutex *lock, int state,
+ struct hrtimer_sleeper *timeout,
+ int detect_deadlock)
+{
+ struct rt_mutex_waiter waiter;
+ int ret = 0;
+
+ debug_rt_mutex_init_waiter(&waiter);
+ waiter.task = NULL;
+
+ spin_lock(&lock->wait_lock);
+
+ /* Try to acquire the lock again: */
+ if (try_to_take_rt_mutex(lock)) {
+ spin_unlock(&lock->wait_lock);
+ return 0;
+ }
+
+ set_current_state(state);
+
+ /* Setup the timer, when timeout != NULL */
+ if (unlikely(timeout)) {
+ hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
+ if (!hrtimer_active(&timeout->timer))
+ timeout->task = NULL;
+ }
+
+ ret = __rt_mutex_slowlock(lock, state, timeout, &waiter,
+ detect_deadlock);
+
set_current_state(TASK_RUNNING);

if (unlikely(waiter.task))
@@ -986,6 +1012,42 @@ void rt_mutex_proxy_unlock(struct rt_mutex *lock,
}

/**
+ * rt_mutex_start_proxy_lock - prepare another task to take the lock
+ *
+ * @lock: the rt_mutex to take
+ * @waiter: the rt_mutex_waiter initialized by the waiter
+ * @task: the task to prepare
+ * @detect_deadlock: passed to task_blocks_on_rt_mutex
+ *
+ * The lock should have an owner, and it should not be task.
+ * Special API call for FUTEX_REQUEUE_PI support.
+ */
+int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
+ struct rt_mutex_waiter *waiter,
+ struct task_struct *task, int detect_deadlock)
+{
+ int ret;
+
+ spin_lock(&lock->wait_lock);
+ ret = task_blocks_on_rt_mutex(lock, waiter, task, detect_deadlock);
+ mark_rt_mutex_waiters(lock);
+ if (ret && !waiter->task) {
+ /*
+ * Reset the return value. We might have
+ * returned with -EDEADLK and the owner
+ * released the lock while we were walking the
+ * pi chain. Let the waiter sort it out.
+ */
+ ret = 0;
+ }
+ spin_unlock(&lock->wait_lock);
+
+ debug_rt_mutex_print_deadlock(waiter);
+
+ return ret;
+}
+
+/**
* rt_mutex_next_owner - return the next owner of the lock
*
* @lock: the rt lock query
@@ -1004,3 +1066,54 @@ struct task_struct *rt_mutex_next_owner(struct rt_mutex *lock)

return rt_mutex_top_waiter(lock)->task;
}
+
+/**
+ * rt_mutex_finish_proxy_lock - Complete the taking of the lock initialized on
+ * our behalf by another thread.
+ * @lock: the rt_mutex we were woken on
+ * @to: the timeout, null if none. hrtimer should already have been started.
+ * @waiter: the pre-initialized rt_mutex_waiter
+ * @detect_deadlock: for use by __rt_mutex_slowlock
+ *
+ * Special API call for PI-futex requeue support
+ */
+int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
+ struct hrtimer_sleeper *to,
+ struct rt_mutex_waiter *waiter,
+ int detect_deadlock)
+{
+ int ret;
+
+ if (waiter->task)
+ schedule_rt_mutex(lock);
+
+ spin_lock(&lock->wait_lock);
+
+ set_current_state(TASK_INTERRUPTIBLE);
+
+ ret = __rt_mutex_slowlock(lock, TASK_INTERRUPTIBLE, to, waiter,
+ detect_deadlock);
+
+ set_current_state(TASK_RUNNING);
+
+ if (unlikely(waiter->task))
+ remove_waiter(lock, waiter);
+
+ /*
+ * try_to_take_rt_mutex() sets the waiter bit unconditionally. We might
+ * have to fix that up.
+ */
+ fixup_rt_mutex_waiters(lock);
+
+ spin_unlock(&lock->wait_lock);
+
+ /*
+ * Readjust priority, when we did not get the lock. We might have been
+ * the pending owner and boosted. Since we did not take the lock, the
+ * PI boost has to go.
+ */
+ if (unlikely(ret))
+ rt_mutex_adjust_prio(current);
+
+ return ret;
+}
diff --git a/kernel/rtmutex_common.h b/kernel/rtmutex_common.h
index e124bf5..97a2f81 100644
--- a/kernel/rtmutex_common.h
+++ b/kernel/rtmutex_common.h
@@ -120,6 +120,14 @@ extern void rt_mutex_init_proxy_locked(struct rt_mutex *lock,
struct task_struct *proxy_owner);
extern void rt_mutex_proxy_unlock(struct rt_mutex *lock,
struct task_struct *proxy_owner);
+extern int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
+ struct rt_mutex_waiter *waiter,
+ struct task_struct *task,
+ int detect_deadlock);
+extern int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
+ struct hrtimer_sleeper *to,
+ struct rt_mutex_waiter *waiter,
+ int detect_deadlock);

#ifdef CONFIG_DEBUG_RT_MUTEXES
# include "rtmutex-debug.h"

--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-06 02:22:10

by Steven Rostedt

Subject: Re: [TIP][RFC 6/7] futex: add requeue_pi calls


On Thu, 5 Mar 2009, Darren Hart wrote:
>
> As it turns out I missed setting RT_MUTEX_HAS_WAITERS on the rt_mutex in
> rt_mutex_start_proxy_lock() - seems awfully silly in retrospect - but a
> little non-obvious while writing it. I added mark_rt_mutex_waiters()
> after the call to task_blocks_on_rt_mutex() and the test has completed
> more than 400 iterations successfully (it would fail after no more than
> 2 most of the time before).
>
> Steven, there are several ways to set RT_MUTEX_HAS_WAITERS - but this
> seemed like a reasonable approach, would you agree? Since I'm holding
> the wait_lock I don't technically need the atomic cmpxchg and could
> probably just set it explicitly - do you have a preference?
>

> +
> +/**
> + * rt_mutex_finish_proxy_lock - Complete the taking of the lock initialized on
> + * our behalf by another thread.
> + * @lock: the rt_mutex we were woken on
> + * @to: the timeout, null if none. hrtimer should already have been started.
> + * @waiter: the pre-initialized rt_mutex_waiter
> + * @detect_deadlock: for use by __rt_mutex_slowlock
> + *
> + * Special API call for PI-futex requeue support
> + */
> +int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
> + struct hrtimer_sleeper *to,
> + struct rt_mutex_waiter *waiter,
> + int detect_deadlock)
> +{
> + int ret;
> +
> + if (waiter->task)
> + schedule_rt_mutex(lock);
> +
> + spin_lock(&lock->wait_lock);
> +
> + set_current_state(TASK_INTERRUPTIBLE);
> +
> + ret = __rt_mutex_slowlock(lock, TASK_INTERRUPTIBLE, to, waiter,
> + detect_deadlock);
> +
> + set_current_state(TASK_RUNNING);
> +
> + if (unlikely(waiter->task))
> + remove_waiter(lock, waiter);
> +
> + /*
> + * try_to_take_rt_mutex() sets the waiter bit unconditionally. We might
> + * have to fix that up.
> + */
> + fixup_rt_mutex_waiters(lock);

Darren,

I take it you are talking about the above.

static void fixup_rt_mutex_waiters(struct rt_mutex *lock)
{
if (!rt_mutex_has_waiters(lock))
clear_rt_mutex_waiters(lock);
}

So it only clears the bit if there are no waiters. Yep, that should be
fine. The task clearing the bit is the owner and you have the wait_lock.
This should work.
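
A minimal userspace sketch of the flag handling discussed here, assuming the waiters flag lives in bit 0 of the owner word. The `model_` names and the `nr_waiters` counter are hypothetical stand-ins (the kernel keeps a plist of waiters); this is not kernel code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model: bit 0 of the owner word marks "has waiters". */
#define MODEL_HAS_WAITERS ((uintptr_t)1)

struct model_lock {
	uintptr_t owner;	/* owner pointer with the flag in bit 0 */
	int nr_waiters;		/* stand-in for the kernel's wait_list */
};

/* Set the flag; the caller is assumed to hold the lock's wait_lock. */
static void model_mark_waiters(struct model_lock *lock)
{
	lock->owner |= MODEL_HAS_WAITERS;
}

/* Clear the flag only when nobody is queued, as fixup_rt_mutex_waiters() does. */
static void model_fixup_waiters(struct model_lock *lock)
{
	if (lock->nr_waiters == 0)
		lock->owner &= ~MODEL_HAS_WAITERS;
}

static bool model_has_waiter_bit(const struct model_lock *lock)
{
	return (lock->owner & MODEL_HAS_WAITERS) != 0;
}
```

With one waiter queued the fixup leaves the bit alone; once the waiter count drops to zero the fixup clears it and the owner pointer is unchanged.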

-- Steve


> +
> + spin_unlock(&lock->wait_lock);
> +
> + /*
> + * Readjust priority, when we did not get the lock. We might have been
> + * the pending owner and boosted. Since we did not take the lock, the
> + * PI boost has to go.
> + */
> + if (unlikely(ret))
> + rt_mutex_adjust_prio(current);
> +
> + return ret;
> +}

2009-03-06 05:27:49

by Darren Hart

Subject: Re: [TIP][RFC 6/7] futex: add requeue_pi calls

Steven Rostedt wrote:
> On Thu, 5 Mar 2009, Darren Hart wrote:
>> As it turns out I missed setting RT_MUTEX_HAS_WAITERS on the rt_mutex in
>> rt_mutex_start_proxy_lock() - seems awfully silly in retrospect - but a
>> little non-obvious while writing it. I added mark_rt_mutex_waiters()
>> after the call to task_blocks_on_rt_mutex() and the test has completed
>> more than 400 iterations successfully (it would fail after no more than
>> 2 most of the time before).
>>
>> Steven, there are several ways to set RT_MUTEX_HAS_WAITERS - but this
>> seemed like a reasonable approach, would you agree? Since I'm holding
>> the wait_lock I don't technically need the atomic cmpxchg and could
>> probably just set it explicitly - do you have a preference?
>>
>
>> +
>> +/**
>> + * rt_mutex_finish_proxy_lock - Complete the taking of the lock initialized on
>> + * our behalf by another thread.
>> + * @lock: the rt_mutex we were woken on
>> + * @to: the timeout, null if none. hrtimer should already have been started.
>> + * @waiter: the pre-initialized rt_mutex_waiter
>> + * @detect_deadlock: for use by __rt_mutex_slowlock
>> + *
>> + * Special API call for PI-futex requeue support
>> + */
>> +int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
>> + struct hrtimer_sleeper *to,
>> + struct rt_mutex_waiter *waiter,
>> + int detect_deadlock)
>> +{
>> + int ret;
>> +
>> + if (waiter->task)
>> + schedule_rt_mutex(lock);
>> +
>> + spin_lock(&lock->wait_lock);
>> +
>> + set_current_state(TASK_INTERRUPTIBLE);
>> +
>> + ret = __rt_mutex_slowlock(lock, TASK_INTERRUPTIBLE, to, waiter,
>> + detect_deadlock);
>> +
>> + set_current_state(TASK_RUNNING);
>> +
>> + if (unlikely(waiter->task))
>> + remove_waiter(lock, waiter);
>> +
>> + /*
>> + * try_to_take_rt_mutex() sets the waiter bit unconditionally. We might
>> + * have to fix that up.
>> + */
>> + fixup_rt_mutex_waiters(lock);
>
> Darren,
>
> I take it you are talking about the above.

Actually no, I was talking about rt_mutex_START_proxy_lock():

/**
* rt_mutex_start_proxy_lock - prepare another task to take the lock
*
* @lock: the rt_mutex to take
* @waiter: the rt_mutex_waiter initialized by the waiter
* @task: the task to prepare
* @detect_deadlock: passed to task_blocks_on_rt_mutex
*
* The lock should have an owner, and it should not be task.
* Special API call for FUTEX_REQUEUE_PI support.
*/
int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
struct rt_mutex_waiter *waiter,
struct task_struct *task, int detect_deadlock)
{
int ret;

spin_lock(&lock->wait_lock);
ret = task_blocks_on_rt_mutex(lock, waiter, task, detect_deadlock);


I add the following line to fix the bug. Question is, should I use this atomic
optimization here (under the lock->wait_lock) or should I just do
"lock->owner |= RT_MUTEX_HAS_WAITERS" ?

=====> mark_rt_mutex_waiters(lock);

if (ret && !waiter->task) {
/*
* Reset the return value. We might have
* returned with -EDEADLK and the owner
* released the lock while we were walking the
* pi chain. Let the waiter sort it out.
*/
ret = 0;
}
spin_unlock(&lock->wait_lock);

debug_rt_mutex_print_deadlock(waiter);

return ret;
}
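
The two options in the question above can be sketched in userspace with C11 atomics. This is only a model: the `model_` names are hypothetical and the loop merely imitates a cmpxchg retry. Whether the plain form is safe in the kernel depends on whether any path changes lock->owner without holding wait_lock, which is an assumption to check rather than something this sketch settles.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define MODEL_HAS_WAITERS ((uintptr_t)1)

/*
 * cmpxchg-style loop: retries if another thread changed *owner between
 * the load and the exchange, so no concurrent update is lost.
 */
static void model_mark_waiters_atomic(_Atomic uintptr_t *owner)
{
	uintptr_t old = atomic_load(owner);

	/* On failure, 'old' is refreshed with the current value. */
	while (!atomic_compare_exchange_weak(owner, &old,
					     old | MODEL_HAS_WAITERS))
		;
}

/*
 * Plain read-modify-write: correct only if every writer of *owner is
 * serialized by the same lock.
 */
static void model_mark_waiters_plain(uintptr_t *owner)
{
	*owner |= MODEL_HAS_WAITERS;
}
```

Single-threaded, both helpers produce the same word; they differ only when another writer can touch the owner word concurrently.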



--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-07 06:04:31

by Sripathi Kodi

Subject: Re: [TIP][RFC 6/7] futex: add requeue_pi calls

On Friday 06 March 2009, Darren Hart wrote:
> Darren Hart wrote:
> > Darren Hart wrote:
> >> Darren Hart wrote:
> >>> From: Darren Hart <[email protected]>
> >>>
> >>> PI Futexes must have an owner at all times, so the standard
> >>> requeue commands
> >>> aren't sufficient. The new commands properly manage pi futex
> >>> ownership by
> >>> ensuring a futex with waiters has an owner at all times. Once
> >>> complete these
> >>> patches will allow glibc to properly handle pi mutexes with
> >>> pthread_condvars.
> >>>
> >>> The approach taken here is to create two new futex op codes:
> >>>
> >>> FUTEX_WAIT_REQUEUE_PI:
> >>> Threads will use this op code to wait on a futex (such as a
> >>> non-pi waitqueue)
> >>> and wake after they have been requeued to a pi futex. Prior to
> >>> returning to
> >>> userspace, they will take this pi futex (and the underlying
> >>> rt_mutex).
> >>>
> >>> futex_wait_requeue_pi() is currently the result of a high speed
> >>> collision
> >>> between futex_wait and futex_lock_pi (with the first part of
> >>> futex_lock_pi
> >>> being done by futex_requeue_pi_init() on behalf of the waiter).
> >>>
> >>> FUTEX_REQUEUE_PI:
> >>> This call must be used to wake threads waiting with
> >>> FUTEX_WAIT_REQUEUE_PI,
> >>> regardless of how many threads the caller intends to wake or
> >>> requeue. pthread_cond_broadcast should call this with nr_wake=1
> >>> and nr_requeue=-1 (all).
> >>> pthread_cond_signal should call this with nr_wake=1 and
> >>> nr_requeue=0. The
> >>> reason being we need both callers to get the benefit of the
> >>> futex_requeue_pi_init() routine which will prepare the top_waiter
> >>> (the thread
> >>> to be woken) to take possession of the pi futex by setting
> >>> FUTEX_WAITERS and
> >>> preparing the futex_q.pi_state. futex_requeue() also enqueues
> >>> the top_waiter
> >>> on the rt_mutex via rt_mutex_start_proxy_lock(). If
> >>> pthread_cond_signal used
> >>> FUTEX_WAKE, we would have a similar race window where the caller
> >>> can return and
> >>> release the mutex before the waiters can fully wake, potentially
> >>> leaving the
> >>> rt_mutex with waiters but no owner.
> >>>
> >>> We hit a failed paging request running the testcase (7/7) in a
> >>> loop (only takes a few minutes at most to hit on my 8way x86_64
> >>> test machine). It appears to be the result of splitting
> >>> rt_mutex_slowlock() across two execution contexts by means of
> >>> rt_mutex_start_proxy_lock() and rt_mutex_finish_proxy_lock().
> >>> The former calls
> >>> task_blocks_on_rt_mutex() on behalf of the waiting task prior to
> >>> requeuing and waking it by the requeueing thread. The latter is
> >>> executed upon wakeup by the waiting thread which somehow manages
> >>> to call the new __rt_mutex_slowlock() with waiter->task != NULL
> >>> and still succeed with try_to_take_lock(), this leads to
> >>> corruption of the plists and an eventual failed paging request.
> >>> See 7/7 for the rather crude testcase that causes this. Any tips
> >>> on where this race might be occurring are welcome.
>
> <snip>
>
> > I've updated my tracing and can show that
> > rt_mutex_start_proxy_lock() is not setting RT_MUTEX_HAS_WAITERS
> > like it should be:
> >
> > ------------[ cut here ]------------
> > kernel BUG at kernel/rtmutex.c:646!
> > invalid opcode: 0000 [#1] PREEMPT SMP
> > last sysfs file: /sys/devices/pci0000:00/0000:00:03.0/0000:01:00.0/host0/port-0:0/end_device-0:0/target0:0:0/0:0:0:0/vendor
> > Dumping ftrace buffer:
> > ---------------------------------
> > <...>-3793 1d..3 558351872us : lookup_pi_state: allocating a new pi state
> > <...>-3793 1d..3 558351876us : lookup_pi_state: initial rt_mutex owner: ffff88023d9486c0
> > <...>-3793 1...2 558351877us : futex_requeue: futex_lock_pi_atomic returned: 0
> > <...>-3793 1...2 558351877us : futex_requeue: futex_requeue_pi_init returned: 0
> > <...>-3793 1...3 558351879us : rt_mutex_start_proxy_lock: task_blocks_on_rt_mutex returned 0
> > <...>-3793 1...3 558351880us : rt_mutex_start_proxy_lock: lock has waiterflag: 0
> > <...>-3793 1...1 558351888us : rt_mutex_unlock: unlocking ffff88023b5f6950
> > <...>-3793 1...1 558351888us : rt_mutex_unlock: lock waiter flag: 0
> > <...>-3793 1...1 558351889us : rt_mutex_unlock: unlocked ffff88023b5f6950
> > <...>-3783 0...1 558351893us : __rt_mutex_slowlock: waiter->task is ffff88023c872440
> > <...>-3783 0...1 558351897us : try_to_take_rt_mutex: assigned rt_mutex (ffff88023b5f6950) owner to current ffff88023c872440
> > <...>-3783 0...1 558351897us : __rt_mutex_slowlock: got the lock
> > ---------------------------------
> >
> > I'll start digging into why that's happening, but I wanted to share
> > the trace output.
>
> As it turns out I missed setting RT_MUTEX_HAS_WAITERS on the rt_mutex
> in rt_mutex_start_proxy_lock() - seems awfully silly in retrospect -
> but a little non-obvious while writing it. I added
> mark_rt_mutex_waiters() after the call to task_blocks_on_rt_mutex()
> and the test has completed more than 400 iterations successfully (it
> would fail after no more than 2 most of the time before).
>
> Steven, there are several ways to set RT_MUTEX_HAS_WAITERS - but this
> seemed like a reasonable approach, would you agree? Since I'm
> holding the wait_lock I don't technically need the atomic cmpxchg and
> could probably just set it explicitly - do you have a preference?
>
>
> RFC: rt_mutex: add proxy lock routines

I have been testing this with my own minimal prototype glibc
implementation. While testing Darren's kernel at this level (including
the fix below) I have hit the following BUG:

BUG: unable to handle kernel NULL pointer dereference at 0000000000000048
IP: [<ffffffff8105c236>] futex_requeue+0x403/0x5c6
PGD 1de5bf067 PUD 1de5be067 PMD 0
Oops: 0002 [#1] PREEMPT SMP
last sysfs file: /sys/devices/pci0000:00/0000:00:03.0/0000:01:00.0/host0/port-0:0/end_device-0:0/target0:0:0/0:0:0:0/vendor
Dumping ftrace buffer:
(ftrace buffer empty)
CPU 3
Modules linked in: ipv6 autofs4 hidp rfcomm l2cap bluetooth cpufreq_ondemand acpi_cpufreq dm_mirror dm_region_hash dm_log dm_multipath scsi_dh dm_mod video output sbs sbshc battery ac parport_pc lp parport floppy snd_hda_intel snd_seq_dummy snd_seq_oss snd_seq_midi_event snd_seq snd_seq_device e1000 sg snd_pcm_oss snd_mixer_oss snd_pcm snd_timer tg3 sr_mod snd_page_alloc snd_hwdep cdrom libphy snd serio_raw iTCO_wdt pata_acpi i5000_edac i2c_i801 button shpchp iTCO_vendor_support edac_core i2c_core ata_generic soundcore pcspkr ata_piix libata mptsas mptscsih scsi_transport_sas mptbase sd_mod scsi_mod ext3 jbd mbcache uhci_hcd ohci_hcd ehci_hcd
Pid: 21264, comm: test_requeue_pi Not tainted 2.6.28-rc6-dvh04 #12
RIP: 0010:[<ffffffff8105c236>] [<ffffffff8105c236>] futex_requeue+0x403/0x5c6
RSP: 0018:ffff8801de89dd38 EFLAGS: 00010206
RAX: 0000000000000048 RBX: 000000003fff8802 RCX: 0000000000005306
RDX: ffff8801de94a100 RSI: ffffffff8142c540 RDI: ffff88023e40a640
RBP: ffff8801de89de08 R08: ffff8801de8b6440 R09: 00000000f7a1335f
R10: 0000000000000005 R11: ffff8801de89dce8 R12: ffff8801de529c58
R13: ffffffff8160c840 R14: ffffffff8160c5c0 R15: 0000000000000000
FS: 000000004669c940(0063) GS:ffff88023eb3cac0(0000) knlGS:0000000000000000
CS: 0010 DS: 0000 ES: 0000 CR0: 000000008005003b
CR2: 0000000000000048 CR3: 00000001de843000 CR4: 00000000000006e0
DR0: 0000000000000000 DR1: 0000000000000000 DR2: 0000000000000000
DR3: 0000000000000000 DR6: 00000000ffff0ff0 DR7: 0000000000000400
Process test_requeue_pi (pid: 21264, threadinfo ffff8801de89c000, task ffff8801de942340)
Stack:
ffff8800280687e0 ffff8801de89def8 000000017fffffff 00000000006017c0
00000001de89dd88 0000000000601784 ffffffff8160c5c8 ffff8801de8c7c58
00000000de942340 0000000000000000 0000000000601000 ffff8801de94a100
Call Trace:
[<ffffffff811b796a>] ? pty_write+0x3e/0x48
[<ffffffff8105cc25>] do_futex+0x82c/0x878
[<ffffffff8102f556>] ? __wake_up+0x43/0x4f
[<ffffffff811b6936>] ? tty_ldisc_deref+0x6e/0x73
[<ffffffff811b1d08>] ? tty_write+0x211/0x22c
[<ffffffff8105cd8c>] sys_futex+0x11b/0x139
[<ffffffff8100c0db>] system_call_fastpath+0x16/0x1b
Code: 7d 10 00 74 59 49 8b 44 24 68 8b 5d cc 48 8b b8 a8 02 00 00 81 e3 ff ff ff 3f e8 83 0b ff ff 39 c3 74 3b 48 8b 45 c0 48 83 c0 48 <f0> ff 00 48 8b 45 c0 49 8b 54 24 68 b9 01 00 00 00 49 8b 74 24
RIP [<ffffffff8105c236>] futex_requeue+0x403/0x5c6
RSP <ffff8801de89dd38>
CR2: 0000000000000048
---[ end trace 2376de034cbbad6b ]---

Thanks,
Sripathi.


>
> From: Darren Hart <[email protected]>
>
> This patch is required for the first half of requeue_pi to function.
> It basically splits rt_mutex_slowlock() right down the middle, just
> before the first call to schedule().
>
> This patch uses a new futex_q field, rt_waiter, for now. I think
> I should be able to use task->pi_blocked_on in a future version of
> this patch.
>
> V6: -add mark_rt_mutex_waiters() to rt_mutex_start_proxy_lock() to
> avoid the race condition evident in previous versions
> V5: -remove EXPORT_SYMBOL_GPL from the new routines
> -minor cleanups
> V4: -made detect_deadlock a parameter to rt_mutex_enqueue_task
> -refactored rt_mutex_slowlock to share code with new functions
> -renamed rt_mutex_enqueue_task and rt_mutex_handle_wakeup to
> rt_mutex_start_proxy_lock and rt_mutex_finish_proxy_lock,
> respectively
>
> Signed-off-by: Darren Hart <[email protected]>
> Cc: Thomas Gleixner <[email protected]>
> Cc: Sripathi Kodi <[email protected]>
> Cc: Peter Zijlstra <[email protected]>
> Cc: John Stultz <[email protected]>
> Cc: Steven Rostedt <[email protected]>
> ---
>
> kernel/rtmutex.c | 193 +++++++++++++++++++++++++++++++++++++----------
> kernel/rtmutex_common.h | 8 ++
> 2 files changed, 161 insertions(+), 40 deletions(-)
>
>
> diff --git a/kernel/rtmutex.c b/kernel/rtmutex.c
> index 69d9cb9..f438362 100644
> --- a/kernel/rtmutex.c
> +++ b/kernel/rtmutex.c
> @@ -411,6 +411,7 @@ static int try_to_take_rt_mutex(struct rt_mutex *lock)
> */
> static int task_blocks_on_rt_mutex(struct rt_mutex *lock,
> struct rt_mutex_waiter *waiter,
> + struct task_struct *task,
> int detect_deadlock)
> {
> struct task_struct *owner = rt_mutex_owner(lock);
> @@ -418,21 +419,21 @@ static int task_blocks_on_rt_mutex(struct rt_mutex *lock,
> unsigned long flags;
> int chain_walk = 0, res;
>
> - spin_lock_irqsave(&current->pi_lock, flags);
> - __rt_mutex_adjust_prio(current);
> - waiter->task = current;
> + spin_lock_irqsave(&task->pi_lock, flags);
> + __rt_mutex_adjust_prio(task);
> + waiter->task = task;
> waiter->lock = lock;
> - plist_node_init(&waiter->list_entry, current->prio);
> - plist_node_init(&waiter->pi_list_entry, current->prio);
> + plist_node_init(&waiter->list_entry, task->prio);
> + plist_node_init(&waiter->pi_list_entry, task->prio);
>
> /* Get the top priority waiter on the lock */
> if (rt_mutex_has_waiters(lock))
> top_waiter = rt_mutex_top_waiter(lock);
> plist_add(&waiter->list_entry, &lock->wait_list);
>
> - current->pi_blocked_on = waiter;
> + task->pi_blocked_on = waiter;
>
> - spin_unlock_irqrestore(&current->pi_lock, flags);
> + spin_unlock_irqrestore(&task->pi_lock, flags);
>
> if (waiter == rt_mutex_top_waiter(lock)) {
> spin_lock_irqsave(&owner->pi_lock, flags);
> @@ -460,7 +461,7 @@ static int task_blocks_on_rt_mutex(struct rt_mutex *lock,
> spin_unlock(&lock->wait_lock);
>
> res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
> - current);
> + task);
>
> spin_lock(&lock->wait_lock);
>
> @@ -605,37 +606,25 @@ void rt_mutex_adjust_pi(struct task_struct *task)
> rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
> }
>
> -/*
> - * Slow path lock function:
> +/**
> + * __rt_mutex_slowlock - perform the wait-wake-try-to-take loop
> + * @lock: the rt_mutex to take
> + * @state: the state the task should block in (TASK_INTERRUPTIBLE
> + * or TASK_UNINTERRUPTIBLE)
> + * @timeout: the pre-initialized and started timer, or NULL for none
> + * @waiter: the pre-initialized rt_mutex_waiter
> + * @detect_deadlock: passed to task_blocks_on_rt_mutex
> + *
> + * lock->wait_lock must be held by the caller.
> */
> static int __sched
> -rt_mutex_slowlock(struct rt_mutex *lock, int state,
> - struct hrtimer_sleeper *timeout,
> - int detect_deadlock)
> +__rt_mutex_slowlock(struct rt_mutex *lock, int state,
> + struct hrtimer_sleeper *timeout,
> + struct rt_mutex_waiter *waiter,
> + int detect_deadlock)
> {
> - struct rt_mutex_waiter waiter;
> int ret = 0;
>
> - debug_rt_mutex_init_waiter(&waiter);
> - waiter.task = NULL;
> -
> - spin_lock(&lock->wait_lock);
> -
> - /* Try to acquire the lock again: */
> - if (try_to_take_rt_mutex(lock)) {
> - spin_unlock(&lock->wait_lock);
> - return 0;
> - }
> -
> - set_current_state(state);
> -
> - /* Setup the timer, when timeout != NULL */
> - if (unlikely(timeout)) {
> - hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
> - if (!hrtimer_active(&timeout->timer))
> - timeout->task = NULL;
> - }
> -
> for (;;) {
> /* Try to acquire the lock: */
> if (try_to_take_rt_mutex(lock))
> @@ -656,19 +645,19 @@ rt_mutex_slowlock(struct rt_mutex *lock, int state,
> }
>
> /*
> - * waiter.task is NULL the first time we come here and
> + * waiter->task is NULL the first time we come here and
> * when we have been woken up by the previous owner
> * but the lock got stolen by a higher prio task.
> */
> - if (!waiter.task) {
> - ret = task_blocks_on_rt_mutex(lock, &waiter,
> + if (!waiter->task) {
> + ret = task_blocks_on_rt_mutex(lock, waiter, current,
> detect_deadlock);
> /*
> * If we got woken up by the owner then start loop
> * all over without going into schedule to try
> * to get the lock now:
> */
> - if (unlikely(!waiter.task)) {
> + if (unlikely(!waiter->task)) {
> /*
> * Reset the return value. We might
> * have returned with -EDEADLK and the
> @@ -684,15 +673,52 @@ rt_mutex_slowlock(struct rt_mutex *lock, int state,
>
> spin_unlock(&lock->wait_lock);
>
> - debug_rt_mutex_print_deadlock(&waiter);
> + debug_rt_mutex_print_deadlock(waiter);
>
> - if (waiter.task)
> + if (waiter->task)
> schedule_rt_mutex(lock);
>
> spin_lock(&lock->wait_lock);
> set_current_state(state);
> }
>
> + return ret;
> +}
> +
> +/*
> + * Slow path lock function:
> + */
> +static int __sched
> +rt_mutex_slowlock(struct rt_mutex *lock, int state,
> + struct hrtimer_sleeper *timeout,
> + int detect_deadlock)
> +{
> + struct rt_mutex_waiter waiter;
> + int ret = 0;
> +
> + debug_rt_mutex_init_waiter(&waiter);
> + waiter.task = NULL;
> +
> + spin_lock(&lock->wait_lock);
> +
> + /* Try to acquire the lock again: */
> + if (try_to_take_rt_mutex(lock)) {
> + spin_unlock(&lock->wait_lock);
> + return 0;
> + }
> +
> + set_current_state(state);
> +
> + /* Setup the timer, when timeout != NULL */
> + if (unlikely(timeout)) {
> + hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
> + if (!hrtimer_active(&timeout->timer))
> + timeout->task = NULL;
> + }
> +
> + ret = __rt_mutex_slowlock(lock, state, timeout, &waiter,
> + detect_deadlock);
> +
> set_current_state(TASK_RUNNING);
>
> if (unlikely(waiter.task))
> @@ -986,6 +1012,42 @@ void rt_mutex_proxy_unlock(struct rt_mutex *lock,
> }
>
> /**
> + * rt_mutex_start_proxy_lock - prepare another task to take the lock
> + *
> + * @lock: the rt_mutex to take
> + * @waiter: the rt_mutex_waiter initialized by the waiter
> + * @task: the task to prepare
> + * @detect_deadlock: passed to task_blocks_on_rt_mutex
> + *
> + * The lock should have an owner, and it should not be task.
> + * Special API call for FUTEX_REQUEUE_PI support.
> + */
> +int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
> + struct rt_mutex_waiter *waiter,
> + struct task_struct *task, int detect_deadlock)
> +{
> + int ret;
> +
> + spin_lock(&lock->wait_lock);
> + ret = task_blocks_on_rt_mutex(lock, waiter, task, detect_deadlock);
> + mark_rt_mutex_waiters(lock);
> + if (ret && !waiter->task) {
> + /*
> + * Reset the return value. We might have
> + * returned with -EDEADLK and the owner
> + * released the lock while we were walking the
> + * pi chain. Let the waiter sort it out.
> + */
> + ret = 0;
> + }
> + spin_unlock(&lock->wait_lock);
> +
> + debug_rt_mutex_print_deadlock(waiter);
> +
> + return ret;
> +}
> +
> +/**
> * rt_mutex_next_owner - return the next owner of the lock
> *
> * @lock: the rt lock query
> @@ -1004,3 +1066,54 @@ struct task_struct *rt_mutex_next_owner(struct rt_mutex *lock)
>
> return rt_mutex_top_waiter(lock)->task;
> }
> +
> +/**
> + * rt_mutex_finish_proxy_lock - Complete the taking of the lock initialized on
> + * our behalf by another thread.
> + * @lock: the rt_mutex we were woken on
> + * @to: the timeout, null if none. hrtimer should already have been started.
> + * @waiter: the pre-initialized rt_mutex_waiter
> + * @detect_deadlock: for use by __rt_mutex_slowlock
> + *
> + * Special API call for PI-futex requeue support
> + */
> +int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
> + struct hrtimer_sleeper *to,
> + struct rt_mutex_waiter *waiter,
> + int detect_deadlock)
> +{
> + int ret;
> +
> + if (waiter->task)
> + schedule_rt_mutex(lock);
> +
> + spin_lock(&lock->wait_lock);
> +
> + set_current_state(TASK_INTERRUPTIBLE);
> +
> + ret = __rt_mutex_slowlock(lock, TASK_INTERRUPTIBLE, to, waiter,
> + detect_deadlock);
> +
> + set_current_state(TASK_RUNNING);
> +
> + if (unlikely(waiter->task))
> + remove_waiter(lock, waiter);
> +
> + /*
> + * try_to_take_rt_mutex() sets the waiter bit unconditionally. We might
> + * have to fix that up.
> + */
> + fixup_rt_mutex_waiters(lock);
> +
> + spin_unlock(&lock->wait_lock);
> +
> + /*
> + * Readjust priority, when we did not get the lock. We might have been
> + * the pending owner and boosted. Since we did not take the lock, the
> + * PI boost has to go.
> + */
> + if (unlikely(ret))
> + rt_mutex_adjust_prio(current);
> +
> + return ret;
> +}
> diff --git a/kernel/rtmutex_common.h b/kernel/rtmutex_common.h
> index e124bf5..97a2f81 100644
> --- a/kernel/rtmutex_common.h
> +++ b/kernel/rtmutex_common.h
> @@ -120,6 +120,14 @@ extern void rt_mutex_init_proxy_locked(struct rt_mutex *lock,
> struct task_struct *proxy_owner);
> extern void rt_mutex_proxy_unlock(struct rt_mutex *lock,
> struct task_struct *proxy_owner);
> +extern int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
> + struct rt_mutex_waiter *waiter,
> + struct task_struct *task,
> + int detect_deadlock);
> +extern int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
> + struct hrtimer_sleeper *to,
> + struct rt_mutex_waiter *waiter,
> + int detect_deadlock);
>
> #ifdef CONFIG_DEBUG_RT_MUTEXES
> # include "rtmutex-debug.h"

2009-03-07 15:17:34

by Thomas Gleixner

Subject: Re: [TIP][RFC 2/7] futex: futex_top_waiter()

On Mon, 2 Mar 2009, Darren Hart wrote:

> From: Darren Hart <[email protected]>
>
> Improve legibility by wrapping finding the top waiter in a function. This
> will be used by the follow-on patches for enabling requeue pi.
> +static struct futex_q *futex_top_waiter(struct futex_hash_bucket *hb,
> + union futex_key *key)

Can we just have *head as argument instead of *hb ?

> +{
> + struct plist_head *head;
> + struct futex_q *this;
> + struct futex_q *top_waiter = NULL;

struct futex_q *this;

> + head = &hb->chain;
> + plist_for_each_entry(this, head, list) {
> + if (match_futex(&this->key, key)) {
return this;

> + top_waiter = this;
> + break;
> + }
> + }
> + return top_waiter;

return NULL;

Makes the function half the size.
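
Putting the inline suggestions together, the shortened lookup might look like the following userspace sketch. It assumes a singly linked list in place of the kernel plist, and every `model_` name is a hypothetical stand-in rather than the futex code itself.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for union futex_key and struct futex_q. */
struct model_key {
	unsigned long word;
	int offset;
};

struct model_futex_q {
	struct model_key key;
	struct model_futex_q *next;	/* list assumed sorted by priority */
};

static int model_match_futex(const struct model_key *a,
			     const struct model_key *b)
{
	return a->word == b->word && a->offset == b->offset;
}

/*
 * Return the first (highest priority) waiter queued on 'key', or NULL.
 * Takes the list head directly and returns from inside the loop, so no
 * top_waiter temporary is needed.
 */
static struct model_futex_q *model_top_waiter(struct model_futex_q *head,
					      const struct model_key *key)
{
	struct model_futex_q *this;

	for (this = head; this; this = this->next) {
		if (model_match_futex(&this->key, key))
			return this;
	}
	return NULL;
}
```

Because the list is assumed to be priority ordered, the first match is the top waiter, which is why the early return is sufficient.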

Thanks,

tglx

2009-03-07 15:31:25

by Thomas Gleixner

Subject: Re: [TIP][RFC 4/7] futex: finish_futex_lock_pi()

On Mon, 2 Mar 2009, Darren Hart wrote:
> + } else {
> + /* dvhart FIXME: can't we just BUG_ON in this case?

No. There is no reason to crash the kernel if this happens. All that
happens is that a userspace application becomes a bit unhappy.

I did not put a WARN_ON there as the stack trace is known, but we
could do a WARN to trigger the kerneloops detector.

> + * Paranoia check. If we did not take the lock in the trylock
> + * above, then we should not be the owner of the rtmutex,
> + * neither the real nor the pending one:
> + */
> + if (rt_mutex_owner(&q->pi_state->pi_mutex) == current)
> + printk(KERN_ERR "finish_futex_lock_pi: "
> + "ret = %d pi-mutex: %p "
> + "pi-state %p\n", ret,
> + q->pi_state->pi_mutex.owner,
> + q->pi_state->owner);
> + }

Thanks,

tglx

2009-03-07 15:44:55

by Thomas Gleixner

Subject: Re: [TIP][RFC 5/7] rt_mutex: add proxy lock routines

On Mon, 2 Mar 2009, Darren Hart wrote:
> /**
> + * rt_mutex_start_proxy_lock - prepare another task to take the lock

Hmm. _start_ sounds weird. Also we do not prepare another task to take
the lock. We either take the lock on behalf of another task or block
that task on the lock.

> + * @lock: the rt_mutex to take
> + * @waiter: the rt_mutex_waiter initialized by the waiter

initialized by the caller perhaps ?

> + * @task: the task to prepare
> + * @detect_deadlock: passed to task_blocks_on_rt_mutex

That's not interesting where it is passed to. The argument tells us,
whether deadlock detection needs to be done or not.

> + * The lock should have an owner, and it should not be task.

Why ? The lock can have no owner, if the previous owner released it
before we took lock->wait_lock.

> + * Special API call for FUTEX_REQUEUE_PI support.
> + */
> +int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
> + struct rt_mutex_waiter *waiter,
> + struct task_struct *task, int detect_deadlock)
> +{
> + int ret;
> +
> + spin_lock(&lock->wait_lock);

You need to try to take the lock on behalf of task here under
lock->wait_lock to avoid an enqueue on an ownerless rtmutex.

> +
> +/**
> + * rt_mutex_finish_proxy_lock - Complete the taking of the lock initialized on
> + * our behalf by another thread.

IIRC this needs to be a single line. Or does kerneldoc support this now ?

> + * @lock: the rt_mutex we were woken on
> + * @to: the timeout, null if none. hrtimer should already have been started.
> + * @waiter: the pre-initialized rt_mutex_waiter
> + * @detect_deadlock: for use by __rt_mutex_slowlock

See above.

Thanks,

tglx

2009-03-07 15:54:19

by Thomas Gleixner

Subject: Re: [TIP][RFC 6/7] futex: add requeue_pi calls

On Thu, 5 Mar 2009, Darren Hart wrote:
> int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
> struct rt_mutex_waiter *waiter,
> struct task_struct *task, int detect_deadlock)
> {
> int ret;
>
> spin_lock(&lock->wait_lock);
> ret = task_blocks_on_rt_mutex(lock, waiter, task, detect_deadlock);
>
>
> I add the following line to fix the bug. Question is, should I use this atomic
> optimization here (under the lock->wait_lock) or should I just do
> "lock->owner |= RT_MUTEX_HAS_WAITERS" ?
>
> =====> mark_rt_mutex_waiters(lock);

This is still not enough as I explained in the review of the original
patch. What you need to do is:

if (try_to_take_rt_mutex(lock, task)) {
spin_unlock(&lock->wait_lock);
/* The caller needs to wake up task, as it is now the owner */
return WAKEIT;
}

ret = task_blocks_on_rt_mutex(lock, waiter, task, detect_deadlock);

> if (ret && !waiter->task) {
> /*
> * Reset the return value. We might have
> * returned with -EDEADLK and the owner
> * released the lock while we were walking the
> * pi chain. Let the waiter sort it out.
> */
> ret = 0;
> }
> spin_unlock(&lock->wait_lock);
>
> debug_rt_mutex_print_deadlock(waiter);
>
> return ret;
> }
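
The ordering requested above (try to take the lock for the task first, block it only if that fails) can be modelled in userspace roughly as follows. The `model_` names and the return codes are hypothetical, with MODEL_ACQUIRED standing in for the WAKEIT placeholder; in the kernel the whole body would run under lock->wait_lock.

```c
#include <assert.h>
#include <stddef.h>

struct model_task {
	int id;
};

struct model_rt_mutex {
	struct model_task *owner;	/* NULL when the lock is free */
	int nr_waiters;			/* stand-in for the wait list */
};

/* Hypothetical return codes for this sketch. */
enum { MODEL_BLOCKED = 0, MODEL_ACQUIRED = 1 };

/* Give the lock to 'task' if it currently has no owner. */
static int model_try_to_take(struct model_rt_mutex *lock,
			     struct model_task *task)
{
	if (lock->owner)
		return 0;
	lock->owner = task;
	return 1;
}

static int model_start_proxy_lock(struct model_rt_mutex *lock,
				  struct model_task *task)
{
	/* Never enqueue a waiter on an ownerless lock. */
	if (model_try_to_take(lock, task))
		return MODEL_ACQUIRED;	/* caller must wake task, it owns the lock */

	lock->nr_waiters++;	/* stand-in for task_blocks_on_rt_mutex() */
	return MODEL_BLOCKED;
}
```

The point of the model is the invariant: after the call, either the task owns the lock or it is queued behind an existing owner, never queued on a free lock.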

Thanks,

tglx

2009-03-09 09:48:59

by Thomas Gleixner

Subject: Re: [TIP][RFC 6/7] futex: add requeue_pi calls

On Mon, 2 Mar 2009, Darren Hart wrote:
> From: Darren Hart <[email protected]>
>
> PI Futexes must have an owner at all times, so the standard requeue commands
> aren't sufficient.

That's wrong. The point is that PI Futexes and the kernel internal
rt_mutexes need to have an owner if there are waiters simply because
the PI boosting can not operate on nothing.

> + /* For futex_wait and futex_wait_requeue_pi */
> struct {
> u32 *uaddr;
> u32 val;
> u32 flags;
> u32 bitset;
> + int has_timeout;

Hmm. time == 0 perhaps or just add another flag bit to the flags field ?
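
A sketch of the flag-bit alternative, assuming a spare bit is available in the flags field. The flag values and `model_` names are hypothetical and chosen only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical flag values, chosen only for this sketch. */
#define MODEL_FLAGS_SHARED	0x01u
#define MODEL_FLAGS_HAS_TIMEOUT	0x02u

/* Mirrors the quoted fields, without a separate has_timeout member. */
struct model_futex_restart {
	uint32_t *uaddr;
	uint32_t val;
	uint32_t flags;
	uint32_t bitset;
};

/* Record whether a timeout was supplied, leaving other flag bits alone. */
static void model_set_timeout(struct model_futex_restart *r, bool has_timeout)
{
	if (has_timeout)
		r->flags |= MODEL_FLAGS_HAS_TIMEOUT;
	else
		r->flags &= ~MODEL_FLAGS_HAS_TIMEOUT;
}

static bool model_has_timeout(const struct model_futex_restart *r)
{
	return (r->flags & MODEL_FLAGS_HAS_TIMEOUT) != 0;
}
```

This keeps the restart block the same size and avoids overloading a zero time value to mean "no timeout".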

> +/* dvhart: FIXME: some commentary here would help to clarify that hb->chain is
> + * actually the queue which contains multiple instances of futex_q - one per
> + * waiter. The naming is extremely unfortunate as it reflects the data structure
> + * more than its usage. */

Please either do a separate patch which fixes/adds the comment or just
leave it as is. This change has nothing to do with requeue_pi and just
makes review harder.

> /*
> * Split the global futex_lock into every hash list lock.
> */
> @@ -189,6 +196,7 @@ static void drop_futex_key_refs(union futex_key *key)
> /**
> * get_futex_key - Get parameters which are the keys for a futex.
> * @uaddr: virtual address of the futex
> + * dvhart FIXME: incorrent shared comment (fshared, and it's a boolean int)

LOL. The time it took to add the FIXME comments would have been enough
to make a separate patch which fixes the existing comment

> /*
> + * futex_requeue_pi_cleanup - cleanup after futex_requeue_pi_init after
> failed
> + * lock acquisition.

"after after cleanup". Shudder. Instead of this useless comment the code
could do with some explanatory comments

> + * @q: the futex_q of the futex we failed to acquire
> + */
> +static void futex_requeue_pi_cleanup(struct futex_q *q)
> +{
> + if (!q->pi_state)
> + return;
> + if (rt_mutex_owner(&q->pi_state->pi_mutex) == current)
> + rt_mutex_unlock(&q->pi_state->pi_mutex);
> + else
> + free_pi_state(q->pi_state);
> +}
> +
> +/*
> * Look up the task based on what TID userspace gave us.
> * We dont trust it.
> */
> @@ -736,6 +760,7 @@ static void wake_futex(struct futex_q *q)
> * at the end of wake_up_all() does not prevent this store from
> * moving.
> */
> + /* dvhart FIXME: "end of wake_up()" */

Sigh.

> smp_wmb();
> q->lock_ptr = NULL;
> }
> @@ -834,6 +859,12 @@ double_lock_hb(struct futex_hash_bucket *hb1, struct
> futex_hash_bucket *hb2)
> }
> }
>
> +/* dvhart FIXME: the wording here is inaccurate as both the physical page and
> + * the offset are required for the hashing, it is also non-intuitve as most
> + * will be thinking of "the futex" not "the physical page and offset this
> + * virtual address points to". Used throughout - consider wholesale cleanup
> of
> + * function commentary.
> + */

/me throws a handful of FIXMEs at dvhart

> /*
> * Wake up all waiters hashed on the physical page that is mapped
> * to this virtual address:
> @@ -988,19 +1019,123 @@ out:
> }
>
> /*
> - * Requeue all waiters hashed on one physical page to another
> - * physical page.
> + * futex_requeue_pi_init - prepare the top waiter to lock the pi futex on
> wake
> + * @pifutex: the user address of the to futex
> + * @hb1: the from futex hash bucket, must be locked by the caller
> + * @hb2: the to futex hash bucket, must be locked by the caller
> + * @key1: the from futex key
> + * @key2: the to futex key
> + *
> + * Returns 0 on success, a negative error code on failure.
> + *
> + * Prepare the top_waiter and the pi_futex for requeing. We handle
> + * the userspace r/w here so that we can handle any faults prior
> + * to entering the requeue loop. hb1 and hb2 must be held by the caller.
> + *
> + * Faults occur for two primary reasons at this point:
> + * 1) The address isn't mapped (what? you didn't use mlock() in your
> real-time
> + * application code? *gasp*)
> + * 2) The address isn't writeable
> + *
> + * We return EFAULT on either of these cases and rely on futex_requeue to
> + * handle them.
> + */
> +static int futex_requeue_pi_init(u32 __user *pifutex,
> + struct futex_hash_bucket *hb1,
> + struct futex_hash_bucket *hb2,
> + union futex_key *key1, union futex_key *key2,
> + struct futex_pi_state **ps)
> +{
> + u32 curval;
> + struct futex_q *top_waiter;
> + pid_t pid;
> + int ret;
> +
> + if (get_futex_value_locked(&curval, pifutex))
> + return -EFAULT;
> +
> + top_waiter = futex_top_waiter(hb1, key1);
> +
> + /* There are no waiters, nothing for us to do. */
> + if (!top_waiter)
> + return 0;
> +
> + /*
> + * The pifutex has an owner, make sure it's us, if not complain
> + * to userspace.
> + * FIXME_LATER: handle this gracefully

We should do this right now. It's not that hard.

> + */
> + pid = curval & FUTEX_TID_MASK;
> + if (pid && pid != task_pid_vnr(current))
> + return -EMORON;

Though it's a pity that we lose EMORON :)

> + /*
> + * Current should own pifutex, but it could be uncontended. Here we
> + * either take the lock for top_waiter or set the FUTEX_WAITERS bit.
> + * The pi_state is also looked up, but we don't care about the return
> + * code as we'll have to look that up during requeue for each waiter
> + * anyway.
> + */
> + ret = futex_lock_pi_atomic(pifutex, hb2, key2, ps, top_waiter->task);

Why do we ignore the return value here ???

> + /*
> + * At this point the top_waiter has either taken the pifutex or it is
> + * waiting on it. If the former, then the pi_state will not exist
> yet,
> + * look it up one more time to ensure we have a reference to it.
> + */
> + if (!ps /* FIXME && some ret values in here I think ... */)
> + ret = lookup_pi_state(curval, hb2, key2, ps);
> + return ret;
> +}
> +
> +static inline
> +void requeue_futex(struct futex_q *q, struct futex_hash_bucket *hb1,
> + struct futex_hash_bucket *hb2, union futex_key *key2)
> +{
> +
> + /*
> + * If key1 and key2 hash to the same bucket, no need to
> + * requeue.
> + */
> + if (likely(&hb1->chain != &hb2->chain)) {
> + plist_del(&q->list, &hb1->chain);
> + plist_add(&q->list, &hb2->chain);
> + q->lock_ptr = &hb2->lock;
> +#ifdef CONFIG_DEBUG_PI_LIST
> + q->list.plist.lock = &hb2->lock;
> +#endif
> + }
> + get_futex_key_refs(key2);
> + q->key = *key2;
> +}

Can you please split out such changes to the existing requeue code
into a separate patch ?

> +/*
> + * Requeue all waiters hashed on one physical page to another physical page.
> + * In the requeue_pi case, either takeover uaddr2 or set FUTEX_WAITERS and
> + * setup the pistate. FUTEX_REQUEUE_PI only supports requeueing from a
> non-pi
> + * futex to a pi futex.
> */
> static int futex_requeue(u32 __user *uaddr1, int fshared, u32 __user *uaddr2,
> - int nr_wake, int nr_requeue, u32 *cmpval)
> + int nr_wake, int nr_requeue, u32 *cmpval,
> + int requeue_pi)
> {
> union futex_key key1 = FUTEX_KEY_INIT, key2 = FUTEX_KEY_INIT;
> struct futex_hash_bucket *hb1, *hb2;
> struct plist_head *head1;
> struct futex_q *this, *next;
> - int ret, drop_count = 0;
> + u32 curval2;
> + struct futex_pi_state *pi_state = NULL;
> + int drop_count = 0, attempt = 0, task_count = 0, ret;
> +
> + if (requeue_pi && refill_pi_state_cache())
> + return -ENOMEM;

Why do you want to refill the pi_state_cache of current ? current is
not going to wait for the pi_futex.

> retry:
> + if (pi_state != NULL) {
> + free_pi_state(pi_state);
> + pi_state = NULL;
> + }
> +
> ret = get_futex_key(uaddr1, fshared, &key1);
> if (unlikely(ret != 0))
> goto out;
> @@ -1023,12 +1158,15 @@ retry:
> if (hb1 != hb2)
> spin_unlock(&hb2->lock);
>
> + put_futex_key(fshared, &key2);
> + put_futex_key(fshared, &key1);
> +

Isn't this a reference leak in mainline as well, which needs to be
fixed separately ?

> ret = get_user(curval, uaddr1);
>
> if (!ret)
> goto retry;
>
> - goto out_put_keys;
> + goto out;
> }
> if (curval != *cmpval) {
> ret = -EAGAIN;
> @@ -1036,32 +1174,104 @@ retry:
> }
> }
>
> + if (requeue_pi) {
> + /* FIXME: should we handle the no waiters case special? */

If there are no waiters then we should drop out here right away. Why
should we do more if we know already that there is nothing to do.

> + ret = futex_requeue_pi_init(uaddr2, hb1, hb2, &key1, &key2,
> + &pi_state);
> +
> + if (!ret)
> + ret = get_futex_value_locked(&curval2, uaddr2);
> +
> + switch (ret) {
> + case 0:
> + break;
> + case 1:
> + /* we got the lock */
> + ret = 0;
> + break;
> + case -EFAULT:
> + /*
> + * We handle the fault here instead of in
> + * futex_requeue_pi_init because we have to reacquire
> + * both locks to avoid deadlock.
> + */
> + spin_unlock(&hb1->lock);
> + if (hb1 != hb2)
> + spin_unlock(&hb2->lock);

Can we please put that sequence into an inline function,
e.g. double_unlock_hb(). We have at least 5 instances of it.
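
A minimal userspace sketch of such a helper, assuming a toy spinlock built on C11 atomic_flag in place of the kernel's spinlock_t. The toy_ names are hypothetical; only the "unlock hb1, then hb2 if it is a different bucket" sequence is taken from the patch.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical stand-in for futex_hash_bucket: just the lock. */
struct toy_hash_bucket {
	atomic_flag lock;
};

static void toy_spin_lock(atomic_flag *l)
{
	/* Spin until the flag was previously clear. */
	while (atomic_flag_test_and_set_explicit(l, memory_order_acquire))
		;
}

static void toy_spin_unlock(atomic_flag *l)
{
	atomic_flag_clear_explicit(l, memory_order_release);
}

/*
 * Release both bucket locks. When both keys hash to the same bucket
 * only one lock was taken, so only one unlock is issued.
 */
static inline void toy_double_unlock_hb(struct toy_hash_bucket *hb1,
					struct toy_hash_bucket *hb2)
{
	assert(hb1 && hb2);

	toy_spin_unlock(&hb1->lock);
	if (hb1 != hb2)
		toy_spin_unlock(&hb2->lock);
}
```

Each of the open-coded unlock sequences in the fault and -EAGAIN paths would then collapse to a single call, which also keeps the hb1 == hb2 check in one place.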

> + put_futex_key(fshared, &key2);
> + put_futex_key(fshared, &key1);
> +
> + if (attempt++) {
> + ret = futex_handle_fault((unsigned
> long)uaddr2,
> + attempt);
> + if (ret)
> + goto out;
> + goto retry;
> + }
> +
> + ret = get_user(curval2, uaddr2);
> +
> + if (!ret)
> + goto retry;
> + goto out;
> + case -EAGAIN:
> + /* The owner was exiting, try again. */
> + spin_unlock(&hb1->lock);
> + if (hb1 != hb2)
> + spin_unlock(&hb2->lock);
> + put_futex_key(fshared, &key2);
> + put_futex_key(fshared, &key1);
> + cond_resched();
> + goto retry;
> + default:
> + goto out_unlock;
> + }
> + }
> +
> head1 = &hb1->chain;
> plist_for_each_entry_safe(this, next, head1, list) {
> if (!match_futex (&this->key, &key1))
> continue;
> - if (++ret <= nr_wake) {
> - wake_futex(this);
> + /*
> + * Regardless of if we are waking or requeueing, we need to
> + * prepare the waiting task to take the rt_mutex in the
> + * requeue_pi case. If we gave the lock to the top_waiter in
> + * futex_requeue_pi_init() then don't enqueue that task as a
> + * waiter on the rt_mutex (it already owns it).
> + */
> + if (requeue_pi &&
> + ((curval2 & FUTEX_TID_MASK) != task_pid_vnr(this->task)))
> {

Why don't you check the owner of the rtmutex, which we installed
already ? Then we can drop this curval2 business altogether.

> + atomic_inc(&pi_state->refcount);
> + this->pi_state = pi_state;
> + ret = rt_mutex_start_proxy_lock(&pi_state->pi_mutex,
> + this->rt_waiter,
> + this->task, 1);
> + if (ret)
> + goto out_unlock;
> + }
> +
> + if (++task_count <= nr_wake) {
> + if (requeue_pi) {
> + /*
> + * In the case of requeue_pi we need to know
> if
> + * we were requeued or not, so do the requeue
> + * regardless if we are to wake this task.
> + */
> + requeue_futex(this, hb1, hb2, &key2);
> + drop_count++;
> + /* FIXME: look into altering wake_futex so we
> + * can use it (we can't null the lock_ptr) */

Err, no. wake_futex() does a plist_del(&q->list, &q->list.plist);
What's wrong with using wake_up() ?

> + wake_up(&this->waiter);
> + } else
> + wake_futex(this);
> } else {
> - /*
> - * If key1 and key2 hash to the same bucket, no need
> to
> - * requeue.
> - */
> - if (likely(head1 != &hb2->chain)) {
> - plist_del(&this->list, &hb1->chain);
> - plist_add(&this->list, &hb2->chain);
> - this->lock_ptr = &hb2->lock;
> -#ifdef CONFIG_DEBUG_PI_LIST
> - this->list.plist.lock = &hb2->lock;
> -#endif
> - }
> - this->key = key2;
> - get_futex_key_refs(&key2);
> + requeue_futex(this, hb1, hb2, &key2);
> drop_count++;
> -
> - if (ret - nr_wake >= nr_requeue)
> - break;
> }
> +
> + if (task_count - nr_wake >= nr_requeue)
> + break;
> }
>
> out_unlock:
> @@ -1073,12 +1283,13 @@ out_unlock:
> while (--drop_count >= 0)
> drop_futex_key_refs(&key1);
>
> -out_put_keys:
> put_futex_key(fshared, &key2);
> out_put_key1:
> put_futex_key(fshared, &key1);
> out:
> - return ret;
> + if (pi_state != NULL)
> + free_pi_state(pi_state);
> + return ret ? ret : task_count;
> }
>
> /* The key must be already stored in q->key. */
> @@ -1180,6 +1391,7 @@ retry:
> */
> static void unqueue_me_pi(struct futex_q *q)
> {
> + /* FIXME: hitting this warning for requeue_pi */

And why ?

> WARN_ON(plist_node_empty(&q->list));
> plist_del(&q->list, &q->list.plist);
>
> @@ -1302,6 +1514,8 @@ handle_fault:
> #define FLAGS_CLOCKRT 0x02
>
> static long futex_wait_restart(struct restart_block *restart);
> +static long futex_wait_requeue_pi_restart(struct restart_block *restart);
> +static long futex_lock_pi_restart(struct restart_block *restart);
>
> /* finish_futex_lock_pi - post lock pi_state and corner case management
> * @uaddr: the user address of the futex
> @@ -1466,6 +1680,9 @@ retry:
>
> hb = queue_lock(&q);
>
> + /* dvhart FIXME: we access the page before it is queued... obsolete
> + * comments? */
> +

I think so.

> /*
> * Access the page AFTER the futex is queued.
> * Order is important:
> @@ -1529,6 +1746,7 @@ retry:
> restart->fn = futex_wait_restart;
> restart->futex.uaddr = (u32 *)uaddr;
> restart->futex.val = val;
> + restart->futex.has_timeout = 1;
> restart->futex.time = abs_time->tv64;
> restart->futex.bitset = bitset;
> restart->futex.flags = 0;
> @@ -1559,12 +1777,16 @@ static long futex_wait_restart(struct restart_block
> *restart)
> u32 __user *uaddr = (u32 __user *)restart->futex.uaddr;
> int fshared = 0;
> ktime_t t;
> + ktime_t *tp = NULL;

One line please

> - t.tv64 = restart->futex.time;
> + if (restart->futex.has_timeout) {
> + t.tv64 = restart->futex.time;
> + tp = &t;
> + }
> restart->fn = do_no_restart_syscall;
> if (restart->futex.flags & FLAGS_SHARED)
> fshared = 1;
> - return (long)futex_wait(uaddr, fshared, restart->futex.val, &t,
> + return (long)futex_wait(uaddr, fshared, restart->futex.val, tp,
> restart->futex.bitset,
> restart->futex.flags & FLAGS_CLOCKRT);
> }
> @@ -1621,6 +1843,7 @@ retry_unlocked:
> * complete.
> */
> queue_unlock(&q, hb);
> + /* FIXME: need to put_futex_key() ? */

Yes. Needs to be fixed in mainline as well.

> cond_resched();
> goto retry;
> default:
> @@ -1653,16 +1876,6 @@ retry_unlocked:
>
> goto out;
>
> -out_unlock_put_key:
> - queue_unlock(&q, hb);
> -
> -out_put_key:
> - put_futex_key(fshared, &q.key);
> -out:
> - if (to)
> - destroy_hrtimer_on_stack(&to->timer);
> - return ret;
> -
> uaddr_faulted:
> /*
> * We have to r/w *(int __user *)uaddr, and we have to modify it
> @@ -1685,6 +1898,34 @@ uaddr_faulted:
> goto retry;
>
> goto out;
> +
> +out_unlock_put_key:
> + queue_unlock(&q, hb);
> +
> +out_put_key:
> + put_futex_key(fshared, &q.key);
> +
> +out:
> + if (to)
> + destroy_hrtimer_on_stack(&to->timer);
> + return ret;
> +
> +}
> +
> +static long futex_lock_pi_restart(struct restart_block *restart)
> +{
> + u32 __user *uaddr = (u32 __user *)restart->futex.uaddr;
> + ktime_t t;
> + ktime_t *tp = NULL;
> + int fshared = restart->futex.flags & FLAGS_SHARED;
> +
> + if (restart->futex.has_timeout) {
> + t.tv64 = restart->futex.time;
> + tp = &t;
> + }
> + restart->fn = do_no_restart_syscall;
> +
> + return (long)futex_lock_pi(uaddr, fshared, restart->futex.val, tp, 0);
> }
>
> /*
> @@ -1797,6 +2038,316 @@ pi_faulted:
> }
>
> /*
> + * futex_wait_requeue_pi - wait on futex1 (uaddr) and take the futex2 (uaddr2)
> + * before returning
> + * @uaddr: the futex we initially wait on (possibly non-pi)
> + * @fshared: whether the futexes are shared (1) or not (0). They must be the
> + * same type, no requeueing from private to shared, etc.
> + * @val: the expected value of uaddr
> + * @abs_time: absolute timeout
> + * @bitset: FIXME ???
> + * @clockrt: whether to use CLOCK_REALTIME (1) or CLOCK_MONOTONIC (0)
> + * @uaddr2: the pi futex we will take prior to returning to user-space
> + *
> + * The caller will wait on futex1 (uaddr) and will be requeued by
> + * futex_requeue() to futex2 (uaddr2) which must be PI aware. Normal wakeup
> + * will wake on futex2 and then proceed to take the underlying rt_mutex prior
> + * to returning to userspace. This ensures the rt_mutex maintains an owner
> + * when it has waiters. Without one we won't know who to boost/deboost, if
> + * there was a need to.
> + *
> + * We call schedule in futex_wait_queue_me() when we enqueue and return there
> + * via the following:
> + * 1) signal
> + * 2) timeout
> + * 3) wakeup on the first futex (uaddr)
> + * 4) wakeup on the second futex (uaddr2, the pi futex) after a requeue
> + *
> + * If 1 or 2, we need to check if we got the rtmutex, setup the pi_state, or
> + * were enqueued on the rt_mutex via futex_requeue_pi_init() just before the
> + * signal or timeout arrived. If so, we need to clean that up. Note: the
> + * setting of FUTEX_WAITERS will be handled when the owner unlocks the
> + * rt_mutex.
> + *
> + * If 3, userspace wrongly called FUTEX_WAKE on the first futex (uaddr)
> rather
> + * than using the FUTEX_REQUEUE_PI call with nr_requeue=0. Return -EINVAL.
> + *
> + * If 4, we may then block on trying to take the rt_mutex and return via:
> + * 5) signal
> + * 6) timeout
> + * 7) successful lock
> + *
> + * If 5, we setup a restart_block with futex_lock_pi() as the function.
> + *
> + * If 6, we cleanup and return with -ETIMEDOUT.
> + *
> + * TODO:
> + * o once we have the all the return points correct, we need to collect
> + * common code into exit labels.
> + *
> + * Returns:
> + *  0                       Success
> + * -EFAULT                  For various faults
> + * -EWOULDBLOCK             uaddr has an unexpected value (it changed
> + *                          before we got going)
> + * -ETIMEDOUT               timeout (from either wait on futex1 or locking
> + *                          futex2)
> + * -ERESTARTSYS             Signal received (during wait on futex1) with no
> + *                          timeout
> + * -ERESTART_RESTARTBLOCK   Signal received (during wait on futex1)
> + * -RESTARTNOINTR           Signal received (during lock of futex2)
> + * -EINVAL                  No bitset, woke via FUTEX_WAKE, etc.
> + *
> + * May also passthrough the following return codes (not exhaustive):
> + * -EPERM                   see get_futex_key()
> + * -EACCESS                 see get_futex_key()
> + * -ENOMEM                  see get_user_pages()
> + *
> + */
> +static int futex_wait_requeue_pi(u32 __user *uaddr, int fshared,
> + u32 val, ktime_t *abs_time, u32 bitset,
> + int clockrt, u32 __user *uaddr2)
> +{
> + struct futex_hash_bucket *hb;
> + struct futex_q q;
> + union futex_key key2 = FUTEX_KEY_INIT;
> + u32 uval;
> + struct rt_mutex *pi_mutex;
> + struct rt_mutex_waiter rt_waiter;
> + struct hrtimer_sleeper timeout, *to = NULL;
> + int requeued = 0;
> + int ret;

All ints in a line please. And please order the lines in descending
line length. Makes it much easier to read.

> + if (!bitset)
> + return -EINVAL;
> +
> + if (abs_time) {
> + unsigned long slack;

Missing new line. Also this is nonsense. An rt task should have set
its timer_slack_ns to 0, so we can just use current->timer_slack_ns
in the hrtimer_set_expires_range_ns(). If the timer_slack_ns is
random for rt_tasks then we need to fix this in general.

> + to = &timeout;
> + slack = current->timer_slack_ns;
> + if (rt_task(current))
> + slack = 0;
> + hrtimer_init_on_stack(&to->timer, clockrt ? CLOCK_REALTIME :
> + CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
> + hrtimer_init_sleeper(to, current);
> + hrtimer_set_expires_range_ns(&to->timer, *abs_time, slack);
> + }
> +
> + /*
> + * The waiter is allocated on our stack, manipulated by the requeue
> + * code while we sleep on the initial futex (uaddr).
> + */
> + debug_rt_mutex_init_waiter(&rt_waiter);
> + rt_waiter.task = NULL;
> +
> + q.pi_state = NULL;
> + q.bitset = bitset;
> + q.rt_waiter = &rt_waiter;
> +
> +retry:
> + q.key = FUTEX_KEY_INIT;
> + ret = get_futex_key(uaddr, fshared, &q.key);
> + if (unlikely(ret != 0))
> + goto out;
> +
> + ret = get_futex_key(uaddr2, fshared, &key2);
> + if (unlikely(ret != 0)) {
> + drop_futex_key_refs(&q.key);
> + goto out;
> + }
> +
> + hb = queue_lock(&q);
> +
> + /* dvhart FIXME: we access the page before it is queued... obsolete
> + * comments? */

Yes. The reason is the serializing via hb->lock.

> + /*
> + * Access the page AFTER the futex is queued.
> + * Order is important:
> + *
> + * Userspace waiter: val = var; if (cond(val)) futex_wait(&var,
> val);
> + * Userspace waker: if (cond(var)) { var = new; futex_wake(&var); }
> + *
> + * The basic logical guarantee of a futex is that it blocks ONLY
> + * if cond(var) is known to be true at the time of blocking, for
> + * any cond. If we queued after testing *uaddr, that would open
> + * a race condition where we could block indefinitely with
> + * cond(var) false, which would violate the guarantee.
> + *
> + * A consequence is that futex_wait() can return zero and absorb
> + * a wakeup when *uaddr != val on entry to the syscall. This is
> + * rare, but normal.
> + *
> + * for shared futexes, we hold the mmap semaphore, so the mapping
> + * cannot have changed since we looked it up in get_futex_key.
> + */
> + ret = get_futex_value_locked(&uval, uaddr);
> +
> + if (unlikely(ret)) {
> + queue_unlock(&q, hb);
> + put_futex_key(fshared, &q.key);
> +
> + ret = get_user(uval, uaddr);
> +
> + if (!ret)
> + goto retry;
> + goto out;
> + }
> + ret = -EWOULDBLOCK;
> +
> + /* Only actually queue if *uaddr contained val. */
> + if (uval != val) {
> + queue_unlock(&q, hb);
> + put_futex_key(fshared, &q.key);
> + goto out;
> + }
> +
> + /* queue_me and wait for wakeup, timeout, or a signal. */
> + futex_wait_queue_me(hb, &q, to);
> +
> + /*
> + * Upon return from futex_wait_queue_me, we no longer hold the hb
> lock,
> + * but do still hold a key reference. unqueue_me* will drop a key
> + * reference to q.key.
> + */
> +
> + requeued = match_futex(&q.key, &key2);
> + drop_futex_key_refs(&key2);

Why do we drop the ref to key2 here ? What if we were requeued ?

> + if (!requeued) {
> + /* Handle wakeup from futex1 (uaddr). */
> + ret = unqueue_me(&q);
> + if (unlikely(ret)) { /* signal */

Please put the comment into a separate line if a comment is
necessary for the condition. This one is pretty useless.

Also the logic is completely backwards here. It has to be the same
as in futex_wait()

	if (!unqueue_me()) {
		handle_futex_wakeup();
	} else {
		if (timeout happened) {
			handle_timeout;
		} else {
			prepare_restart();
		}
	}
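
The decision order in that outline can be written down as a small, testable function. This is only a sketch: toy_wait_result and toy_wait_outcome are hypothetical names, and still_queued models the return value of unqueue_me(), which is nonzero when the waiter had to remove itself from the queue and zero when a waker already dequeued it.

```c
#include <assert.h>

/* Hypothetical outcome of a futex wait after the task resumes. */
enum toy_wait_result {
	TOY_WOKEN,	/* a waker dequeued us: normal wakeup */
	TOY_TIMED_OUT,	/* still queued and the timer expired */
	TOY_RESTART	/* still queued, no timeout: signal, restart */
};

/*
 * Classify why the waiter resumed. The wakeup check comes first, so a
 * wakeup that races with a timeout or a signal is reported as a wakeup.
 */
static enum toy_wait_result toy_wait_outcome(int still_queued, int timed_out)
{
	assert(still_queued == 0 || still_queued == 1);

	if (!still_queued)
		return TOY_WOKEN;
	if (timed_out)
		return TOY_TIMED_OUT;
	return TOY_RESTART;
}
```

Checking the dequeue result before anything else is what keeps a real wakeup from being misreported as a timeout or a restart.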

> + /*
> + * We expect signal_pending(current), but another
> + * thread may have handled it for us already.
> + */
> + if (!abs_time) {
> + ret = -ERESTARTSYS;
> + } else {
> + struct restart_block *restart;
> + restart =
> &current_thread_info()->restart_block;
> + restart->fn = futex_wait_requeue_pi_restart;
> + restart->futex.uaddr = (u32 *)uaddr;
> + restart->futex.val = val;
> + restart->futex.has_timeout = 1;
> + restart->futex.time = abs_time->tv64;
> + restart->futex.bitset = bitset;
> + restart->futex.flags = 0;
> + restart->futex.uaddr2 = (u32 *)uaddr2;
> +
> + if (fshared)
> + restart->futex.flags |= FLAGS_SHARED;
> + if (clockrt)
> + restart->futex.flags |= FLAGS_CLOCKRT;
> + ret = -ERESTART_RESTARTBLOCK;
> + }
> + } else if (to && !to->task) {/* timeout */
> + ret = -ETIMEDOUT;
> + } else {
> + /*
> + * We woke on uaddr, so userspace probably paired a
> + * FUTEX_WAKE with FUTEX_WAIT_REQUEUE_PI, which is not
> + * valid.
> + */
> + ret = -EINVAL;
> + }
> + goto out;
> + }
> +
> + /* Handle wakeup from rt_mutex of futex2 (uaddr2). */
> +
> + /* FIXME: this test is REALLY scary... gotta be a better way...
> + * If the pi futex was uncontended, futex_requeue_pi_init() gave us
> + * the lock.
> + */

Didn't we take the rtmutex as well ??

> + if (!q.pi_state) {
> + ret = 0;
> + goto out;
> + }
> + pi_mutex = &q.pi_state->pi_mutex;
> +
> + ret = rt_mutex_finish_proxy_lock(pi_mutex, to, &rt_waiter, 1);


Eeek. We got a wakeup _after_ we have been requeued. So now you go
back to sleep on the pi_mutex ?

> + debug_rt_mutex_free_waiter(&waiter);
> +
> + if (ret) {
> + if (get_user(uval, uaddr2))
> + ret = -EFAULT;

Why do we drop out here w/o retrying ?

Also why do we have a separate handling of "ret" here ? This logic
is fundamentally different from futex_lock_pi().

> + if (ret == -EINTR) {
> + /*
> + * We've already been requeued and enqueued on the
> + * rt_mutex, so restart by calling futex_lock_pi()
> + * directly, rather then returning to this function.
> + */
> + struct restart_block *restart;
> + restart = &current_thread_info()->restart_block;
> + restart->fn = futex_lock_pi_restart;
> + restart->futex.uaddr = (u32 *)uaddr2;
> + restart->futex.val = uval;
> + if (abs_time) {
> + restart->futex.has_timeout = 1;
> + restart->futex.time = abs_time->tv64;
> + } else
> + restart->futex.has_timeout = 0;
> + restart->futex.flags = 0;
> +
> + if (fshared)
> + restart->futex.flags |= FLAGS_SHARED;
> + if (clockrt)
> + restart->futex.flags |= FLAGS_CLOCKRT;
> + ret = -ERESTART_RESTARTBLOCK;
> + }
> + }
> +
> + spin_lock(q.lock_ptr);
> + ret = finish_futex_lock_pi(uaddr, fshared, &q, ret);
> +
> + /* Unqueue and drop the lock. */
> + unqueue_me_pi(&q);
> +
> +out:
> + if (to) {
> + hrtimer_cancel(&to->timer);
> + destroy_hrtimer_on_stack(&to->timer);
> + }
> + if (requeued) {
> + /* We were requeued, so we now have two reference to key2,
> + * unqueue_me_pi releases one of them, we must release the
> + * other. */
> + drop_futex_key_refs(&key2);
> + if (ret) {

This whole "ret" logic is confusing.

> + futex_requeue_pi_cleanup(&q);
> + if (get_user(uval, uaddr2))
> + ret = -EFAULT;
> + if (ret != -ERESTART_RESTARTBLOCK && ret != -EFAULT)
> + ret = futex_lock_pi(uaddr2, fshared, uval,
> + abs_time, 0);
> + }
> + }
> + return ret;
> +}

Thanks,

tglx

2009-03-09 18:04:56

by Darren Hart

[permalink] [raw]
Subject: Re: [TIP][RFC 2/7] futex: futex_top_waiter()

Thomas Gleixner wrote:
> On Mon, 2 Mar 2009, Darren Hart wrote:
>
>> From: Darren Hart <[email protected]>
>>
>> Improve legibility by wrapping finding the top waiter in a function. This
>> will be used by the follow-on patches for enabling requeue pi.
>> +static struct futex_q *futex_top_waiter(struct futex_hash_bucket *hb,
>> + union futex_key *key)
>
> Can we just have *head as argument instead of *hb ?
>
>> +{
>> + struct plist_head *head;
>> + struct futex_q *this;
>> + struct futex_q *top_waiter = NULL;
>
> struct futex_q *this;
>
>> + head = &hb->chain;
>> + plist_for_each_entry(this, head, list) {
>> + if (match_futex(&this->key, key)) {
> return this;
>
>> + top_waiter = this;
>> + break;
>> + }
>> + }
>> + return top_waiter;
>
> return NULL;
>
> Makes the function half the size.

I'd prefer to keep a futex_hash_bucket in the signature as a plist_head
doesn't help describe what the function does. However, I believe I have
addressed your concerns as to length with the following. Does this look
acceptable? I'll be resending the entire V6 patch series soon.

/**
 * futex_top_waiter - return the highest priority waiter on a futex
 * @hb: the hash bucket the futex_q's reside in
 * @key: the futex key (to distinguish it from other futex futex_q's)
 *
 * Must be called with the hb lock held.
 **/
static struct futex_q *futex_top_waiter(struct futex_hash_bucket *hb,
					union futex_key *key)
{
	struct futex_q *this;

	plist_for_each_entry(this, &hb->chain, list) {
		if (match_futex(&this->key, key))
			return this;
	}
	return NULL;
}
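
For readers without a kernel tree at hand, the same lookup can be modelled in userspace. This is a sketch under assumptions: the plist is replaced by an array already sorted by priority, the futex key by a plain integer, and the toy_ names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for futex_q. */
struct toy_futex_q {
	int prio;		/* lower value = higher priority */
	unsigned long key;	/* stands in for union futex_key */
};

/*
 * The chain is ordered by priority and shared by every futex that
 * hashes to this bucket, so the first entry whose key matches is the
 * highest priority waiter on that futex.
 */
static struct toy_futex_q *toy_top_waiter(struct toy_futex_q *chain,
					  size_t len, unsigned long key)
{
	size_t i;

	assert(chain || len == 0);

	for (i = 0; i < len; i++) {
		if (chain[i].key == key)
			return &chain[i];
	}
	return NULL;
}
```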



--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-09 18:06:12

by Darren Hart

[permalink] [raw]
Subject: Re: [TIP][RFC 4/7] futex: finish_futex_lock_pi()

Thomas Gleixner wrote:
> On Mon, 2 Mar 2009, Darren Hart wrote:
>> + } else {
>> + /* dvhart FIXME: can't we just BUG_ON in this case?
>
> No. There is no reason to crash the kernel if this happens. All what
> happens is that a userspace application becomes a bit unhappy.
>
> I did not put a WARN_ON there as the stack trace is known, but we
> could do a WARN to trigger the kerneloops detector.

OK, no need for a change. Easy enough to add debug if someone were to
hit it.

>
>> + * Paranoia check. If we did not take the lock in the trylock
>> + * above, then we should not be the owner of the rtmutex,
>> + * neither the real nor the pending one:
>> + */
>> + if (rt_mutex_owner(&q->pi_state->pi_mutex) == current)
>> + printk(KERN_ERR "finish_futex_lock_pi: "
>> + "ret = %d pi-mutex: %p "
>> + "pi-state %p\n", ret,
>> + q->pi_state->pi_mutex.owner,
>> + q->pi_state->owner);
>> + }
>
> Thanks,
>
> tglx


--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-09 18:31:47

by Darren Hart

[permalink] [raw]
Subject: Re: [TIP][RFC 5/7] rt_mutex: add proxy lock routines

Thomas Gleixner wrote:
> On Mon, 2 Mar 2009, Darren Hart wrote:
>> /**
>> + * rt_mutex_start_proxy_lock - prepare another task to take the lock
>
> Hmm. _start_ sounds weird.

I thought on this for a while... but these names still seem the most
appropriate to me, here's why:

rt_mutex - because it is
start - because this is the first half of a two part action
proxy - because it is initiated by one thread on behalf of another
lock - because we are trying to take the lock

This seems the most consistent with the naming scheme used throughout
rtmutex.c as well. If you have a pair of names for these two functions
that you think would make more sense, please let me know.

> Also we do not prepare another task to take
> the lock. We either take the lock on behalf on another task or block
> that task on the lock.

Agreed:

" * rt_mutex_start_proxy_lock - Start lock acquisition for another task"

>
>> + * @lock: the rt_mutex to take
>> + * @waiter: the rt_mutex_waiter initialized by the waiter
>
> initialized by the caller perhaps ?

Actually the rt_mutex_waiter is created on the stack of the waiter in
futex_wait_requeue_pi() and added to the futex_q structure for the waker
to access. So it should be the waiter... if the comment is confusing I
can either elaborate on multiple lines or just say something like:

"* @waiter: the pre-initialized rt_mutex_waiter"

This call shouldn't care who initialized it, nor where, so long as
it IS initialized, so I'll take this approach unless I hear otherwise.


>
>> + * @task: the task to prepare
>> + * @detext_deadlock: passed to task_blocks_on_rt_mutex

"* @detect_deadlock: perform deadlock detection (1) or not (0)"

>
> That's not interesting where it is passed to. The argument tells us,
> whether deadlock detection needs to be done or not.
>
>> + * The lock should have an owner, and it should not be task.
>
> Why ? The lock can have no owner, if the previous owner released it
> before we took lock->wait_lock.

Hrm... I was considering moving the spin_lock(wait_lock) out of this
routine, but we would still need to ensure the lock was still held.
I'll look at making this safe without that condition.

>
>> + * Special API call for FUTEX_REQUEUE_PI support.
>> + */
>> +int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
>> + struct rt_mutex_waiter *waiter,
>> + struct task_struct *task, int detect_deadlock)
>> +{
>> + int ret;
>> +
>> + spin_lock(&lock->wait_lock);
>
> You need to try to take the lock on behalf of task here under
> lock->wait_lock to avoid an enqueue on an ownerless rtmutex.
>

Will do.

>> +
>> +/**
>> + * rt_mutex_finish_proxy_lock - Complete the taking of the lock initialized
>> on
>> + * our behalf by another thread.
>
> IIRC this needs to be a single line. Or does kerneldoc support this now ?

You are correct. V6 will correct all the kernel-doc screw-ups.

>
>> + * @lock: the rt_mutex we were woken on
>> + * @to: the timeout, null if none. hrtimer should already have been started.
>> + * @waiter: the pre-initialized rt_mutex_waiter
>> + * @detect_deadlock: for use by __rt_mutex_slowlock
>
> See above.

Check.

Thanks for the review,

--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-09 19:55:42

by Darren Hart

[permalink] [raw]
Subject: Re: [TIP][RFC 6/7] futex: add requeue_pi calls

Thomas Gleixner wrote:
> On Thu, 5 Mar 2009, Darren Hart wrote:
>> int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
>> struct rt_mutex_waiter *waiter,
>> struct task_struct *task, int detect_deadlock)
>> {
>> int ret;
>>
>> spin_lock(&lock->wait_lock);
>> ret = task_blocks_on_rt_mutex(lock, waiter, task, detect_deadlock);
>>
>>
>> I add the following line to fix the bug. Question is, should I use this atomic
>> optimization here (under the lock->wait_lock) or should I just do
>> "lock->owner |= RT_MUTEX_HAS_WAITERS" ?
>>
>> =====> mark_rt_mutex_waiters(lock);
>
> This is still not enough as I explained in the review of the original
> patch. What you need to do is:
>
> if (try_to_take_rt_mutex(lock, task)) {
> spin_unlock(&lock->wait_lock);
> /* The caller needs to wake up task, as it is now the owner */
> return WAKEIT;
> }
>
> ret = task_blocks_on_rt_mutex(lock, waiter, task, detect_deadlock);
>

Right, so I'm testing this out:

	mark_rt_mutex_waiters(lock);

	if (!rt_mutex_owner(lock) || try_to_steal_lock(lock, task)) {
		/* We got the lock for task. */
		debug_rt_mutex_lock(lock);

		rt_mutex_set_owner(lock, task, 0);

		rt_mutex_deadlock_account_lock(lock, task);
		return 1;
	}

Steven, is this the proper use of the debug* routines? I copied them
from try_to_take_rt_mutex(), but they are empty routines without
comments so I wasn't sure exactly how they were intended to be used.
Does the usage of debug_rt_mutex_lock() assume task=current (the other
has the task_struct passed in)?

Thanks,

Darren


>> if (ret && !waiter->task) {
>> /*
>> * Reset the return value. We might have
>> * returned with -EDEADLK and the owner
>> * released the lock while we were walking the
>> * pi chain. Let the waiter sort it out.
>> */
>> ret = 0;
>> }
>> spin_unlock(&lock->wait_lock);
>>
>> debug_rt_mutex_print_deadlock(waiter);
>>
>> return ret;
>> }
>
> Thanks,
>
> tglx


--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-10 04:50:28

by Darren Hart

[permalink] [raw]
Subject: Re: [TIP][RFC 6/7] futex: add requeue_pi calls

Thomas Gleixner wrote:
> On Mon, 2 Mar 2009, Darren Hart wrote:
>> From: Darren Hart <[email protected]>
>>
>> PI Futexes must have an owner at all times, so the standard requeue commands
>> aren't sufficient.
>
> That's wrong. The point is that PI Futexes and the kernel internal
> rt_mutexes need to have an owner if there are waiters simply because
> the PI boosting can not operate on nothing.

Yup, sorry - old patch header comment. Fixed.

>
>> + /* For futex_wait and futex_wait_requeue_pi */
>> struct {
>> u32 *uaddr;
>> u32 val;
>> u32 flags;
>> u32 bitset;
>> + int has_timeout;
>
> Hmm. time == 0 perhaps or just add another flag bit to the flags field ?

I've created FLAGS_HAS_TIMEOUT and use that instead. I opted for this
approach instead of using time==0 as that seemed like it could be
confused for an expired timer.
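
To make the ambiguity concrete, here is a minimal userspace sketch of the
flag-bit approach. Everything in it is an assumption for illustration: the
struct is a simplified stand-in for the futex part of the restart block, and
the DEMO_* names and values are hypothetical, not the ones used in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical flag values, chosen only for this sketch. */
#define DEMO_FLAG_SHARED      0x01u
#define DEMO_FLAG_CLOCKRT     0x02u
#define DEMO_FLAG_HAS_TIMEOUT 0x04u

/* Simplified stand-in for the futex part of a restart block. */
struct demo_restart {
	uint32_t flags;
	int64_t time;	/* absolute expiry, only valid with HAS_TIMEOUT set */
};

/* Record an optional timeout; a NULL abs_time means "wait forever". */
static void demo_save_timeout(struct demo_restart *r, const int64_t *abs_time)
{
	assert(r != NULL);
	r->flags &= ~DEMO_FLAG_HAS_TIMEOUT;
	r->time = 0;
	if (abs_time) {
		r->flags |= DEMO_FLAG_HAS_TIMEOUT;
		r->time = *abs_time;
	}
}

/* Return a pointer to the saved timeout, or NULL if none was recorded. */
static const int64_t *demo_load_timeout(const struct demo_restart *r)
{
	return (r->flags & DEMO_FLAG_HAS_TIMEOUT) ? &r->time : NULL;
}
```

With the flag, a saved expiry of 0 and "no timeout at all" stay
distinguishable, which a bare time == 0 test cannot guarantee.
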

>
>> +/* dvhart: FIXME: some commentary here would help to clarify that hb->chain
>> is
>> + * actually the queue which contains multiple instances of futex_q - one per
>> + * waiter. The naming is extremely unfortunate as it refects the
>> datastructure
>> + * more than its usage. */
>
> Please either do a separate patch which fixes/adds the comment or just
> leave it as is. This change has nothing to do with requeue_pi and just
> makes review harder.

My apologies, I had removed all of these types of FIXMEs into another
branch, but produced this patchset from the wrong git tree. We'll call
this "Figure 1" and I'll refer to it often below ;-)

>
>> /*
>> * Split the global futex_lock into every hash list lock.
>> */
>> @@ -189,6 +196,7 @@ static void drop_futex_key_refs(union futex_key *key)
>> /**
>> * get_futex_key - Get parameters which are the keys for a futex.
>> * @uaddr: virtual address of the futex
>> + * dvhart FIXME: incorrent shared comment (fshared, and it's a boolean int)
>
> LOL. The time it took to add the FIXME comments would have been enough
> to make a separate patch which fixes the existing comment


See Figure 1


>
>> /*
>> + * futex_requeue_pi_cleanup - cleanup after futex_requeue_pi_init after
>> failed
>> + * lock acquisition.
>
> "after after cleanup". Shudder. Instead of this useless comment the code
> could do with some explanatory comments
>


This function looks like it could do with an overhaul. I wrote it early
on and kind of forgot about it :/ Thanks for the catch.


>> + * @q: the futex_q of the futex we failed to acquire
>> + */
>> +static void futex_requeue_pi_cleanup(struct futex_q *q)
>> +{
>> + if (!q->pi_state)
>> + return;
>> + if (rt_mutex_owner(&q->pi_state->pi_mutex) == current)
>> + rt_mutex_unlock(&q->pi_state->pi_mutex);
>> + else
>> + free_pi_state(q->pi_state);
>> +}
>> +
>> +/*
>> * Look up the task based on what TID userspace gave us.
>> * We dont trust it.
>> */
>> @@ -736,6 +760,7 @@ static void wake_futex(struct futex_q *q)
>> * at the end of wake_up_all() does not prevent this store from
>> * moving.
>> */
>> + /* dvhart FIXME: "end of wake_up()" */
>
> Sigh.


See Figure 1


>
>> smp_wmb();
>> q->lock_ptr = NULL;
>> }
>> @@ -834,6 +859,12 @@ double_lock_hb(struct futex_hash_bucket *hb1, struct
>> futex_hash_bucket *hb2)
>> }
>> }
>>
>> +/* dvhart FIXME: the wording here is inaccurate as both the physical page and
>> + * the offset are required for the hashing, it is also non-intuitve as most
>> + * will be thinking of "the futex" not "the physical page and offset this
>> + * virtual address points to". Used throughout - consider wholesale cleanup
>> of
>> + * function commentary.
>> + */
>
> /me throws a handful of FIXMEs at dvhart


See Figure 1


>
>> /*
>> * Wake up all waiters hashed on the physical page that is mapped
>> * to this virtual address:
>> @@ -988,19 +1019,123 @@ out:
>> }
>>
>> /*
>> - * Requeue all waiters hashed on one physical page to another
>> - * physical page.
>> + * futex_requeue_pi_init - prepare the top waiter to lock the pi futex on
>> wake
>> + * @pifutex: the user address of the to futex
>> + * @hb1: the from futex hash bucket, must be locked by the caller
>> + * @hb2: the to futex hash bucket, must be locked by the caller
>> + * @key1: the from futex key
>> + * @key2: the to futex key
>> + *
>> + * Returns 0 on success, a negative error code on failure.
>> + *
>> + * Prepare the top_waiter and the pi_futex for requeing. We handle
>> + * the userspace r/w here so that we can handle any faults prior
>> + * to entering the requeue loop. hb1 and hb2 must be held by the caller.
>> + *
>> + * Faults occur for two primary reasons at this point:
>> + * 1) The address isn't mapped (what? you didn't use mlock() in your
>> real-time
>> + * application code? *gasp*)
>> + * 2) The address isn't writeable
>> + *
>> + * We return EFAULT on either of these cases and rely on futex_requeue to
>> + * handle them.
>> + */
>> +static int futex_requeue_pi_init(u32 __user *pifutex,
>> + struct futex_hash_bucket *hb1,
>> + struct futex_hash_bucket *hb2,
>> + union futex_key *key1, union futex_key *key2,
>> + struct futex_pi_state **ps)
>> +{
>> + u32 curval;
>> + struct futex_q *top_waiter;
>> + pid_t pid;
>> + int ret;
>> +
>> + if (get_futex_value_locked(&curval, pifutex))
>> + return -EFAULT;
>> +
>> + top_waiter = futex_top_waiter(hb1, key1);
>> +
>> + /* There are no waiters, nothing for us to do. */
>> + if (!top_waiter)
>> + return 0;
>> +
>> + /*
>> + * The pifutex has an owner, make sure it's us, if not complain
>> + * to userspace.
>> + * FIXME_LATER: handle this gracefully
>
> We should do this right now. It's not that hard.


Hrm. So if another task owns the futex then POSIX says undefined
scheduling should be expected. So failing here seems like a good way
to encourage proper usage of the cond vars. Or can you think of a sane
scenario in which the outer mutex should not be held prior to the call
to cond_signal() or cond_broadcast()? If you are thinking outside of
glibc cond vars, then there are likely other gaps with this patch, such
as only being able to requeue from non-pi to pi futexes.

That being said... now that this is more or less implemented, I'm not
sure there is really anything I'd need to do differently to support this.
I'll pore over this for a while and see if there are any gotchas that
I'm missing right now.


>
>> + */
>> + pid = curval & FUTEX_TID_MASK;
>> + if (pid && pid != task_pid_vnr(current))
>> + return -EMORON;
>
> Though it's a pity that we lose EMORON :)


Does that mean I'll have to forfeit the nickname bestowed upon me by
LWN? I've received numerous comments on this line - all of them in
favor. Apparently lots of kernel developers are eager for a way to
slyly mock users from within the context of the kernel. ;-)


>
>> + /*
>> + * Current should own pifutex, but it could be uncontended. Here we
>> + * either take the lock for top_waiter or set the FUTEX_WAITERS bit.
>> + * The pi_state is also looked up, but we don't care about the return
>> + * code as we'll have to look that up during requeue for each waiter
>> + * anyway.
>> + */
>> + ret = futex_lock_pi_atomic(pifutex, hb2, key2, ps, top_waiter->task);
>
> Why do we ignore the return value here ???


-EMORON

We should just return ret if < 0 and let futex_requeue() process it.
This is basically what we did before, but it wasn't obvious since the
code just moved on to testing !pi. There was the potential to lose the
return code however, so that is now fixed. Running with this fix, will
appear in V6.


>
>> + /*
>> + * At this point the top_waiter has either taken the pifutex or it is
>> + * waiting on it. If the former, then the pi_state will not exist
>> yet,
>> + * look it up one more time to ensure we have a reference to it.
>> + */
>> + if (!ps /* FIXME && some ret values in here I think ... */)
>> + ret = lookup_pi_state(curval, hb2, key2, ps);
>> + return ret;
>> +}
>> +
>> +static inline
>> +void requeue_futex(struct futex_q *q, struct futex_hash_bucket *hb1,
>> + struct futex_hash_bucket *hb2, union futex_key *key2)
>> +{
>> +
>> + /*
>> + * If key1 and key2 hash to the same bucket, no need to
>> + * requeue.
>> + */
>> + if (likely(&hb1->chain != &hb2->chain)) {
>> + plist_del(&q->list, &hb1->chain);
>> + plist_add(&q->list, &hb2->chain);
>> + q->lock_ptr = &hb2->lock;
>> +#ifdef CONFIG_DEBUG_PI_LIST
>> + q->list.plist.lock = &hb2->lock;
>> +#endif
>> + }
>> + get_futex_key_refs(key2);
>> + q->key = *key2;
>> +}
>
> Can you please split out such changes to the existing requeue code
> into a separate patch ?


Should have from the start, updated for V6.


>
>> +/*
>> + * Requeue all waiters hashed on one physical page to another physical page.
>> + * In the requeue_pi case, either takeover uaddr2 or set FUTEX_WAITERS and
>> + * setup the pistate. FUTEX_REQUEUE_PI only supports requeueing from a
>> non-pi
>> + * futex to a pi futex.
>> */
>> static int futex_requeue(u32 __user *uaddr1, int fshared, u32 __user *uaddr2,
>> - int nr_wake, int nr_requeue, u32 *cmpval)
>> + int nr_wake, int nr_requeue, u32 *cmpval,
>> + int requeue_pi)
>> {
>> union futex_key key1 = FUTEX_KEY_INIT, key2 = FUTEX_KEY_INIT;
>> struct futex_hash_bucket *hb1, *hb2;
>> struct plist_head *head1;
>> struct futex_q *this, *next;
>> - int ret, drop_count = 0;
>> + u32 curval2;
>> + struct futex_pi_state *pi_state = NULL;
>> + int drop_count = 0, attempt = 0, task_count = 0, ret;
>> +
>> + if (requeue_pi && refill_pi_state_cache())
>> + return -ENOMEM;
>
> Why do you want to refill the pi_state_cache of current ? current is
> not going to wait for the pi_futex.


I may need to allocate a pi_state during
futex_requeue_pi_init()->futex_lock_pi_atomic()->lookup_pi_state().
Making use of this current->pi_cache seemed like the best way to do it.
Is there a more appropriate place to store a preallocated pi_state?


>
>> retry:
>> + if (pi_state != NULL) {
>> + free_pi_state(pi_state);
>> + pi_state = NULL;
>> + }
>> +
>> ret = get_futex_key(uaddr1, fshared, &key1);
>> if (unlikely(ret != 0))
>> goto out;
>> @@ -1023,12 +1158,15 @@ retry:
>> if (hb1 != hb2)
>> spin_unlock(&hb2->lock);
>>
>> + put_futex_key(fshared, &key2);
>> + put_futex_key(fshared, &key1);
>> +
>
> Isn't this a reference leak in mainline as well, which needs to be
> fixed separate ?


Huh, I didn't intend to do that. I'll submit separately.


>
>> ret = get_user(curval, uaddr1);
>>
>> if (!ret)
>> goto retry;
>>
>> - goto out_put_keys;
>> + goto out;
>> }
>> if (curval != *cmpval) {
>> ret = -EAGAIN;
>> @@ -1036,32 +1174,104 @@ retry:
>> }
>> }
>>
>> + if (requeue_pi) {
>> + /* FIXME: should we handle the no waiters case special? */
>
> If there are no waiters then we should drop out here right away. Why
> should we do more if we know already that there is nothing to do.


Well, looking at this again, I don't think it's worth testing this
separately. As it is we'll call get_futex_value_locked(), then move to
plist_for_each_entry_safe(), find no waiters to iterate over, and exit.
So I think this is handled adequately and I'll just axe the comment and
move on.


>
>> + ret = futex_requeue_pi_init(uaddr2, hb1, hb2, &key1, &key2,
>> + &pi_state);
>> +
>> + if (!ret)
>> + ret = get_futex_value_locked(&curval2, uaddr2);
>> +
>> + switch (ret) {
>> + case 0:
>> + break;
>> + case 1:
>> + /* we got the lock */
>> + ret = 0;
>> + break;
>> + case -EFAULT:
>> + /*
>> + * We handle the fault here instead of in
>> + * futex_requeue_pi_init because we have to reacquire
>> + * both locks to avoid deadlock.
>> + */
>> + spin_unlock(&hb1->lock);
>> + if (hb1 != hb2)
>> + spin_unlock(&hb2->lock);
>
> Can we please put that sequence into an inline function
> e.g. double_unlock_hb(). We have at least 5 instances of it.


Hrm... looking at this it seems we already have a double_lock_hb() which
uses the pointer values to decide which order it locks them in.
Using that in combination with this name-order unlock looks like an ABBA
deadlock waiting to happen to me.

futex: double_unlock_hb() coming up in my futex-fixes branch which
contains the other broken out non requeue_pi related patches and I'll
rebase this series on that.
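
For reference, the lock/unlock helper pair could be sketched roughly as
below. This is a userspace toy, not the futex-fixes patch: demo_bucket and
its int "lock" are hypothetical stand-ins for struct futex_hash_bucket and
its spinlock, and only the shape of the two helpers is meant to carry over.

```c
#include <assert.h>
#include <stdint.h>

/* Toy stand-in for a hash bucket; "locked" only records lock state. */
struct demo_bucket {
	int locked;
};

static void demo_lock(struct demo_bucket *b)
{
	assert(!b->locked);	/* catches double acquisition in the sketch */
	b->locked = 1;
}

static void demo_unlock(struct demo_bucket *b)
{
	assert(b->locked);
	b->locked = 0;
}

/* Take both bucket locks, lower address first; same bucket is taken once. */
static void demo_double_lock(struct demo_bucket *b1, struct demo_bucket *b2)
{
	if ((uintptr_t)b1 <= (uintptr_t)b2) {
		demo_lock(b1);
		if (b1 != b2)
			demo_lock(b2);
	} else {
		demo_lock(b2);
		demo_lock(b1);
	}
}

/* Release both bucket locks; same bucket is released once. */
static void demo_double_unlock(struct demo_bucket *b1, struct demo_bucket *b2)
{
	demo_unlock(b1);
	if (b1 != b2)
		demo_unlock(b2);
}
```

The main gain is that the repeated "unlock hb1, then hb2 if different"
sequence lives in one place instead of at every exit path.
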


>
>> + put_futex_key(fshared, &key2);
>> + put_futex_key(fshared, &key1);
>> +
>> + if (attempt++) {
>> + ret = futex_handle_fault((unsigned long)uaddr2,
>> + attempt);
>> + if (ret)
>> + goto out;
>> + goto retry;
>> + }
>> +
>> + ret = get_user(curval2, uaddr2);
>> +
>> + if (!ret)
>> + goto retry;
>> + goto out;
>> + case -EAGAIN:
>> + /* The owner was exiting, try again. */
>> + spin_unlock(&hb1->lock);
>> + if (hb1 != hb2)
>> + spin_unlock(&hb2->lock);
>> + put_futex_key(fshared, &key2);
>> + put_futex_key(fshared, &key1);
>> + cond_resched();
>> + goto retry;
>> + default:
>> + goto out_unlock;
>> + }
>> + }
>> +
>> head1 = &hb1->chain;
>> plist_for_each_entry_safe(this, next, head1, list) {
>> if (!match_futex (&this->key, &key1))
>> continue;
>> - if (++ret <= nr_wake) {
>> - wake_futex(this);
>> + /*
>> + * Regardless of if we are waking or requeueing, we need to
>> + * prepare the waiting task to take the rt_mutex in the
>> + * requeue_pi case. If we gave the lock to the top_waiter in
>> + * futex_requeue_pi_init() then don't enqueue that task as a
>> + * waiter on the rt_mutex (it already owns it).
>> + */
>> + if (requeue_pi &&
>> + ((curval2 & FUTEX_TID_MASK) != task_pid_vnr(this->task))) {
>
> Why don't you check the owner of the rtmutex, which we installed
> already ? Then we can drop this curval2 business altogether.


Because we can own the futex without an rt_mutex existing. The first
time through this loop we consider the top_waiter of uaddr1. If uaddr2
had no owner, then it is now owned by the top_waiter, but since there
are still no waiters on uaddr2, the rt_mutex has not been initialized.
Once we know we are not the owner, then we also know the
pi_state->pi_mutex exists.

Hrm.... I suppose a test for "!this->pi_state" is equivalent to
"(curval2 & FUTEX_TID_MASK) != task_pid_vnr(this->task)" then isn't
it...? Seems like a rather fragile test though doesn't it?

We wouldn't save much by eliminating the curval2 here though since
futex_requeue_pi_init needs it for the atomic_lock, so we still need the
fault logic... unfortunately.

Thoughts?
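
As a small illustration of the curval2 test being discussed, the owner check
on the futex word can be sketched in isolation like this. The DEMO_* constants
mirror the futex word layout in linux/futex.h (waiters bit on top, TID in the
low 30 bits); the helper names are hypothetical and exist only in this sketch.

```c
#include <assert.h>
#include <stdint.h>

/* Local copies of the futex word layout; names are sketch-only. */
#define DEMO_FUTEX_WAITERS  0x80000000u
#define DEMO_FUTEX_TID_MASK 0x3fffffffu

/* Extract the owner TID from a PI futex value (0 means no owner). */
static uint32_t demo_futex_owner_tid(uint32_t uval)
{
	return uval & DEMO_FUTEX_TID_MASK;
}

/* Nonzero if the task with the given TID is recorded as the owner. */
static int demo_owns_futex(uint32_t uval, uint32_t tid)
{
	assert(tid <= DEMO_FUTEX_TID_MASK);
	return tid != 0 && demo_futex_owner_tid(uval) == tid;
}
```

The check depends only on the userspace value, which is why it works even
when no rt_mutex (and no pi_state) exists yet for the futex.
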


>
>> + atomic_inc(&pi_state->refcount);
>> + this->pi_state = pi_state;
>> + ret = rt_mutex_start_proxy_lock(&pi_state->pi_mutex,
>> + this->rt_waiter,
>> + this->task, 1);
>> + if (ret)
>> + goto out_unlock;
>> + }
>> +
>> + if (++task_count <= nr_wake) {
>> + if (requeue_pi) {
>> + /*
>> + * In the case of requeue_pi we need to know
>> if
>> + * we were requeued or not, so do the requeue
>> + * regardless if we are to wake this task.
>> + */
>> + requeue_futex(this, hb1, hb2, &key2);
>> + drop_count++;
>> + /* FIXME: look into altering wake_futex so we
>> + * can use it (we can't null the lock_ptr) */
>
> Err, no. wake_futex() does a plist_del(&q->list, &q->list.plist);
> What's wrong with using wake_up() ?


Nothing I guess, just trying to use the existing futex infrastructure as
much as possible without implementing a bunch of lower level calls
inline. But if everyone is alright with this approach, we'll leave it be.


>
>> + wake_up(&this->waiter);
>> + } else
>> + wake_futex(this);
>> } else {
>> - /*
>> - * If key1 and key2 hash to the same bucket, no need
>> to
>> - * requeue.
>> - */
>> - if (likely(head1 != &hb2->chain)) {
>> - plist_del(&this->list, &hb1->chain);
>> - plist_add(&this->list, &hb2->chain);
>> - this->lock_ptr = &hb2->lock;
>> -#ifdef CONFIG_DEBUG_PI_LIST
>> - this->list.plist.lock = &hb2->lock;
>> -#endif
>> - }
>> - this->key = key2;
>> - get_futex_key_refs(&key2);
>> + requeue_futex(this, hb1, hb2, &key2);
>> drop_count++;
>> -
>> - if (ret - nr_wake >= nr_requeue)
>> - break;
>> }
>> +
>> + if (task_count - nr_wake >= nr_requeue)
>> + break;
>> }
>>
>> out_unlock:
>> @@ -1073,12 +1283,13 @@ out_unlock:
>> while (--drop_count >= 0)
>> drop_futex_key_refs(&key1);
>>
>> -out_put_keys:
>> put_futex_key(fshared, &key2);
>> out_put_key1:
>> put_futex_key(fshared, &key1);
>> out:
>> - return ret;
>> + if (pi_state != NULL)
>> + free_pi_state(pi_state);
>> + return ret ? ret : task_count;
>> }
>>
>> /* The key must be already stored in q->key. */
>> @@ -1180,6 +1391,7 @@ retry:
>> */
>> static void unqueue_me_pi(struct futex_q *q)
>> {
>> + /* FIXME: hitting this warning for requeue_pi */
>
> And why ?


Old comment I believe. I no longer see this in my testing. Removed.


>
>> WARN_ON(plist_node_empty(&q->list));
>> plist_del(&q->list, &q->list.plist);
>>
>> @@ -1302,6 +1514,8 @@ handle_fault:
>> #define FLAGS_CLOCKRT 0x02
>>
>> static long futex_wait_restart(struct restart_block *restart);
>> +static long futex_wait_requeue_pi_restart(struct restart_block *restart);
>> +static long futex_lock_pi_restart(struct restart_block *restart);
>>
>> /* finish_futex_lock_pi - post lock pi_state and corner case management
>> * @uaddr: the user address of the futex
>> @@ -1466,6 +1680,9 @@ retry:
>>
>> hb = queue_lock(&q);
>>
>> + /* dvhart FIXME: we access the page before it is queued... obsolete
>> + * comments? */
>> +
>
> I think so.
>


See Figure 1


>> /*
>> * Access the page AFTER the futex is queued.
>> * Order is important:
>> @@ -1529,6 +1746,7 @@ retry:
>> restart->fn = futex_wait_restart;
>> restart->futex.uaddr = (u32 *)uaddr;
>> restart->futex.val = val;
>> + restart->futex.has_timeout = 1;
>> restart->futex.time = abs_time->tv64;
>> restart->futex.bitset = bitset;
>> restart->futex.flags = 0;
>> @@ -1559,12 +1777,16 @@ static long futex_wait_restart(struct restart_block
>> *restart)
>> u32 __user *uaddr = (u32 __user *)restart->futex.uaddr;
>> int fshared = 0;
>> ktime_t t;
>> + ktime_t *tp = NULL;
>
> One line please
>


Done in broken out patch for the FLAGS_HAS_TIMEOUT


>> - t.tv64 = restart->futex.time;
>> + if (restart->futex.has_timeout) {
>> + t.tv64 = restart->futex.time;
>> + tp = &t;
>> + }
>> restart->fn = do_no_restart_syscall;
>> if (restart->futex.flags & FLAGS_SHARED)
>> fshared = 1;
>> - return (long)futex_wait(uaddr, fshared, restart->futex.val, &t,
>> + return (long)futex_wait(uaddr, fshared, restart->futex.val, tp,
>> restart->futex.bitset,
>> restart->futex.flags & FLAGS_CLOCKRT);
>> }
>> @@ -1621,6 +1843,7 @@ retry_unlocked:
>> * complete.
>> */
>> queue_unlock(&q, hb);
>> + /* FIXME: need to put_futex_key() ? */
>
> Yes. Needs to be fixed in mainline as well.


Added to futex-fixes branch.


>
>> cond_resched();
>> goto retry;
>> default:
>> @@ -1653,16 +1876,6 @@ retry_unlocked:
>>
>> goto out;
>>
>> -out_unlock_put_key:
>> - queue_unlock(&q, hb);
>> -
>> -out_put_key:
>> - put_futex_key(fshared, &q.key);
>> -out:
>> - if (to)
>> - destroy_hrtimer_on_stack(&to->timer);
>> - return ret;
>> -
>> uaddr_faulted:
>> /*
>> * We have to r/w *(int __user *)uaddr, and we have to modify it
>> @@ -1685,6 +1898,34 @@ uaddr_faulted:
>> goto retry;
>>
>> goto out;
>> +
>> +out_unlock_put_key:
>> + queue_unlock(&q, hb);
>> +
>> +out_put_key:
>> + put_futex_key(fshared, &q.key);
>> +
>> +out:
>> + if (to)
>> + destroy_hrtimer_on_stack(&to->timer);
>> + return ret;
>> +
>> +}
>> +
>> +static long futex_lock_pi_restart(struct restart_block *restart)
>> +{
>> + u32 __user *uaddr = (u32 __user *)restart->futex.uaddr;
>> + ktime_t t;
>> + ktime_t *tp = NULL;
>> + int fshared = restart->futex.flags & FLAGS_SHARED;
>> +
>> + if (restart->futex.has_timeout) {
>> + t.tv64 = restart->futex.time;
>> + tp = &t;
>> + }
>> + restart->fn = do_no_restart_syscall;
>> +
>> + return (long)futex_lock_pi(uaddr, fshared, restart->futex.val, tp, 0);
>> }
>>
>> /*
>> @@ -1797,6 +2038,316 @@ pi_faulted:
>> }
>>
>> /*
>> + * futex_wait_requeue_pi - wait on futex1 (uaddr) and take the futex2
>> (uaddr2)
>> + * before returning
>> + * @uaddr: the futex we initialyl wait on (possibly non-pi)
>> + * @fshared: whether the futexes are shared (1) or not (0). They must be the
>> + * same type, no requeueing from private to shared, etc.
>> + * @val: the expected value of uaddr
>> + * @abs_time: absolute timeout
>> + * @bitset: FIXME ???
>> + * @clockrt: whether to use CLOCK_REALTIME (1) or CLOCK_MONOTONIC (0)
>> + * @uaddr2: the pi futex we will take prior to returning to user-space
>> + *
>> + * The caller will wait on futex1 (uaddr) and will be requeued by
>> + * futex_requeue() to futex2 (uaddr2) which must be PI aware. Normal wakeup
>> + * will wake on futex2 and then proceed to take the underlying rt_mutex prior
>> + * to returning to userspace. This ensures the rt_mutex maintains an owner
>> + * when it has waiters. Without one we won't know who to boost/deboost, if
>> + * there was a need to.
>> + *
>> + * We call schedule in futex_wait_queue_me() when we enqueue and return there
>> + * via the following:
>> + * 1) signal
>> + * 2) timeout
>> + * 3) wakeup on the first futex (uaddr)
>> + * 4) wakeup on the second futex (uaddr2, the pi futex) after a requeue
>> + *
>> + * If 1 or 2, we need to check if we got the rtmutex, setup the pi_state, or
>> + * were enqueued on the rt_mutex via futex_requeue_pi_init() just before the
>> + * signal or timeout arrived. If so, we need to clean that up. Note: the
>> + * setting of FUTEX_WAITERS will be handled when the owner unlocks the
>> + * rt_mutex.
>> + *
>> + * If 3, userspace wrongly called FUTEX_WAKE on the first futex (uaddr)
>> rather
>> + * than using the FUTEX_REQUEUE_PI call with nr_requeue=0. Return -EINVAL.
>> + *
>> + * If 4, we may then block on trying to take the rt_mutex and return via:
>> + * 5) signal
>> + * 6) timeout
>> + * 7) successful lock
>> + *
>> + * If 5, we setup a restart_block with futex_lock_pi() as the function.
>> + *
>> + * If 6, we cleanup and return with -ETIMEDOUT.
>> + *
>> + * TODO:
>> + * o once we have the all the return points correct, we need to collect
>> + * common code into exit labels.
>> + *
>> + * Returns:
>> + * 0 Success
>> + * -EFAULT For various faults
>> + * -EWOULDBLOCK uaddr has an unexpected value (it
>> changed
>> + * before we got going)
>> + * -ETIMEDOUT timeout (from either wait on futex1 or locking
>> + * futex2)
>> + * -ERESTARTSYS Signal received (during wait on
>> futex1) with no
>> + * timeout
>> + * -ERESTART_RESTARTBLOCK Signal received (during wait on futex1)
>> + * -RESTARTNOINTR Signal received (during lock of futex2)
>> + * -EINVAL No bitset, woke via FUTEX_WAKE, etc.
>> + *
>> + * May also passthrough the follpowing return codes (not exhaustive):
>> + * -EPERM see get_futex_key()
>> + * -EACCESS see get_futex_key()
>> + * -ENOMEM see get_user_pages()
>> + *
>> + */
>> +static int futex_wait_requeue_pi(u32 __user *uaddr, int fshared,
>> + u32 val, ktime_t *abs_time, u32 bitset,
>> + int clockrt, u32 __user *uaddr2)
>> +{
>> + struct futex_hash_bucket *hb;
>> + struct futex_q q;
>> + union futex_key key2 = FUTEX_KEY_INIT;
>> + u32 uval;
>> + struct rt_mutex *pi_mutex;
>> + struct rt_mutex_waiter rt_waiter;
>> + struct hrtimer_sleeper timeout, *to = NULL;
>> + int requeued = 0;
>> + int ret;
>
> All ints in a line please. And please order the lines in descending
> line length. Makes it much easier to read.


OK, done.


>
>> + if (!bitset)
>> + return -EINVAL;
>> +
>> + if (abs_time) {
>> + unsigned long slack;
>
> Missing new line. Also this is nonsense. A rt task should have set
> its timer_slack_ns to 0, so we can just use current->timer_slack_ns
> in the hrtimer_set_expires_range_ns(). If the timer_slack_ns is
> random for rt_tasks then we need to fix this in general.
>


Added a WARN_ON(!current->timer_slack_ns) to my debug patch and used
that value here. I'll create a patch to futex-fixes to address the
existing calls that I copied this approach from.


>> + to = &timeout;
>> + slack = current->timer_slack_ns;
>> + if (rt_task(current))
>> + slack = 0;
>> + hrtimer_init_on_stack(&to->timer, clockrt ? CLOCK_REALTIME :
>> + CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
>> + hrtimer_init_sleeper(to, current);
>> + hrtimer_set_expires_range_ns(&to->timer, *abs_time, slack);
>> + }
>> +
>> + /*
>> + * The waiter is allocated on our stack, manipulated by the requeue
>> + * code while we sleep on the initial futex (uaddr).
>> + */
>> + debug_rt_mutex_init_waiter(&rt_waiter);
>> + rt_waiter.task = NULL;
>> +
>> + q.pi_state = NULL;
>> + q.bitset = bitset;
>> + q.rt_waiter = &rt_waiter;
>> +
>> +retry:
>> + q.key = FUTEX_KEY_INIT;
>> + ret = get_futex_key(uaddr, fshared, &q.key);
>> + if (unlikely(ret != 0))
>> + goto out;
>> +
>> + ret = get_futex_key(uaddr2, fshared, &key2);
>> + if (unlikely(ret != 0)) {
>> + drop_futex_key_refs(&q.key);
>> + goto out;
>> + }
>> +
>> + hb = queue_lock(&q);
>> +
>> + /* dvhart FIXME: we access the page before it is queued... obsolete
>> + * comments? */
>
> Yes. The reason is the serializing via hb->lock.


Ack, thanks. (and see Figure 1)


>
>> + /*
>> + * Access the page AFTER the futex is queued.
>> + * Order is important:
>> + *
>> + * Userspace waiter: val = var; if (cond(val)) futex_wait(&var,
>> val);
>> + * Userspace waker: if (cond(var)) { var = new; futex_wake(&var); }
>> + *
>> + * The basic logical guarantee of a futex is that it blocks ONLY
>> + * if cond(var) is known to be true at the time of blocking, for
>> + * any cond. If we queued after testing *uaddr, that would open
>> + * a race condition where we could block indefinitely with
>> + * cond(var) false, which would violate the guarantee.
>> + *
>> + * A consequence is that futex_wait() can return zero and absorb
>> + * a wakeup when *uaddr != val on entry to the syscall. This is
>> + * rare, but normal.
>> + *
>> + * for shared futexes, we hold the mmap semaphore, so the mapping
>> + * cannot have changed since we looked it up in get_futex_key.
>> + */
>> + ret = get_futex_value_locked(&uval, uaddr);
>> +
>> + if (unlikely(ret)) {
>> + queue_unlock(&q, hb);
>> + put_futex_key(fshared, &q.key);
>> +
>> + ret = get_user(uval, uaddr);
>> +
>> + if (!ret)
>> + goto retry;
>> + goto out;
>> + }
>> + ret = -EWOULDBLOCK;
>> +
>> + /* Only actually queue if *uaddr contained val. */
>> + if (uval != val) {
>> + queue_unlock(&q, hb);
>> + put_futex_key(fshared, &q.key);
>> + goto out;
>> + }
>> +
>> + /* queue_me and wait for wakeup, timeout, or a signal. */
>> + futex_wait_queue_me(hb, &q, to);
>> +
>> + /*
>> + * Upon return from futex_wait_queue_me, we no longer hold the hb
>> lock,
>> + * but do still hold a key reference. unqueue_me* will drop a key
>> + * reference to q.key.
>> + */
>> +
>> + requeued = match_futex(&q.key, &key2);
>> + drop_futex_key_refs(&key2);
>
> Why do we drop the ref to key2 here ? What if we were requeued ?


Hrm. I remember adding this and it being necessary - but I can't see it
now. I suspect I changed up where I get the futex key or the exit
logic, making this unnecessary (and wrong). Nice catch. Removed.


>
>> + if (!requeued) {
>> + /* Handle wakeup from futex1 (uaddr). */
>> + ret = unqueue_me(&q);
>> + if (unlikely(ret)) { /* signal */
>
> Please put the comment into a separate line if a comment is
> necessary for the condition. This one is pretty useless.
>


Removed :)


> Also the logic is completely backwards here. It has to be the same
> as in futex_wait()


Is it broken? I believe the reason I wrote it as I did was because the
ret == 0 case should be very rare here. In fact, it is an error. The
only reason we should wake on the first futex (uaddr) is on a signal or
a timeout, otherwise the user most likely paired a FUTEX_WAIT_REQUEUE_PI
with FUTEX_WAKE, instead of FUTEX_REQUEUE_PI.

I can change it around if you have a strong preference though (or if
it's broken in some way I'm missing of course).


>
> if (!unqueue_me()) {
> handle_futex_wakeup();
> } else {
> if (timeout happened) {
> handle_timeout;
> } else {
> prepare_restart();
> }
> }
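
Read as plain C, the ordering in the pseudocode above might look like the
following sketch for the requeue_pi case. It is an illustration under
assumptions, not the patch: still_queued stands for the unqueue_me() return
value, timed_out for the "to && !to->task" test, and DEMO_ERESTART is a
stand-in for the kernel-internal restart codes, which are not userspace
errno values.

```c
#include <assert.h>
#include <errno.h>

/* Stand-in for the kernel-internal restart return codes. */
#define DEMO_ERESTART 512

/*
 * Classify a wakeup on the first futex (uaddr).
 * still_queued: nonzero if we had to remove ourselves from the queue,
 *               zero if a waker had already removed us.
 * timed_out:    nonzero if the timeout fired.
 */
static int demo_first_futex_wakeup(int still_queued, int timed_out)
{
	if (!still_queued) {
		/* Woken on uaddr by a plain wake, invalid for requeue_pi. */
		return -EINVAL;
	}
	if (timed_out)
		return -ETIMEDOUT;

	/* Neither woken nor timed out: treat as a signal and restart. */
	return -DEMO_ERESTART;
}
```
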
>
>> + /*
>> + * We expect signal_pending(current), but another
>> + * thread may have handled it for us already.
>> + */
>> + if (!abs_time) {
>> + ret = -ERESTARTSYS;
>> + } else {
>> + struct restart_block *restart;
>> + restart = &current_thread_info()->restart_block;
>> + restart->fn = futex_wait_requeue_pi_restart;
>> + restart->futex.uaddr = (u32 *)uaddr;
>> + restart->futex.val = val;
>> + restart->futex.has_timeout = 1;
>> + restart->futex.time = abs_time->tv64;
>> + restart->futex.bitset = bitset;
>> + restart->futex.flags = 0;
>> + restart->futex.uaddr2 = (u32 *)uaddr2;
>> +
>> + if (fshared)
>> + restart->futex.flags |= FLAGS_SHARED;
>> + if (clockrt)
>> + restart->futex.flags |= FLAGS_CLOCKRT;
>> + ret = -ERESTART_RESTARTBLOCK;
>> + }
>> + } else if (to && !to->task) {/* timeout */
>> + ret = -ETIMEDOUT;
>> + } else {
>> + /*
>> + * We woke on uaddr, so userspace probably paired a
>> + * FUTEX_WAKE with FUTEX_WAIT_REQUEUE_PI, which is not
>> + * valid.
>> + */
>> + ret = -EINVAL;
>> + }
>> + goto out;
>> + }
>> +
>> + /* Handle wakeup from rt_mutex of futex2 (uaddr2). */
>> +
>> + /* FIXME: this test is REALLY scary... gotta be a better way...
>> + * If the pi futex was uncontended, futex_requeue_pi_init() gave us
>> + * the lock.
>> + */
>
> Didn't we take the rtmutex as well ??


Not necessarily. If the pi futex was ownerless and there was only one
task requeued, then there is no rtmutex as it is created by the first
waiter.

It is possible that other tasks have since tried to take this mutex and
now there is an rtmutex, but we have to catch the case where there might
not be one.


>
>> + if (!q.pi_state) {
>> + ret = 0;
>> + goto out;
>> + }
>> + pi_mutex = &q.pi_state->pi_mutex;
>> +
>> + ret = rt_mutex_finish_proxy_lock(pi_mutex, to, &rt_waiter, 1);
>
>
> Eeek. We got a wakeup _after_ we have been requeued. So now you go
> back to sleep on the pi_mutex ?


Yes. The futex_requeue() calls wake_up() which allows the waiter here
to wake and continue processing its half of the rt_mutex acquisition
loop that I split out of rt_mutex_slowlock() in
rt_mutex_finish_proxy_lock().


>
>> + debug_rt_mutex_free_waiter(&waiter);
>> +
>> + if (ret) {
>> + if (get_user(uval, uaddr2))
>> + ret = -EFAULT;
>
> Why do we drop out here w/o retrying ?
>


At this point we have already been requeued and woken. There is no need
to continue with futex_wait_requeue_pi(), we can just call
futex_lock_pi() directly. However, I see that we don't do that, we just
return -EFAULT to userspace. I'll spend some more time on this exit
path and see if I can make it more robust, as well as easier to follow.


> Also why do we have a separate handling of "ret" here ? This logic
> is fundamentally different from futex_lock_pi().


At this point ret can be:

0: we don't get here
EINTR: We need to restart
ETIMEDOUT: We should return that to userspace (well we should... looks
like that isn't handled quite right either...)
EFAULT: I guess I felt if we start faulting at this point we're in
trouble, but you are correct, it should retry. I'll add this
in.
EDEADLOCK: We should return that to userspace (same story as
ETIMEDOUT)

So... plenty of dope jokes are warranted here. I'll clean this error
path up. It's just broken.
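
The intended handling listed above can be summarized as a small dispatch
table. This is only a sketch of the list, assuming the four outcomes named
there; the enum and function names are hypothetical and the real cleanup
(restart block setup, fault handling, unqueueing) is left out.

```c
#include <assert.h>
#include <errno.h>

enum demo_action {
	DEMO_DONE,		/* lock acquired, nothing more to do */
	DEMO_RESTART,		/* set up a restart block for the lock */
	DEMO_RETRY,		/* fault the page in and try again */
	DEMO_RETURN_ERR,	/* hand the error back to userspace */
};

/* Map the proxy-lock return value to the action the list calls for. */
static enum demo_action demo_classify(int ret)
{
	switch (ret) {
	case 0:
		return DEMO_DONE;
	case -EINTR:
		return DEMO_RESTART;
	case -EFAULT:
		return DEMO_RETRY;
	case -ETIMEDOUT:
	case -EDEADLK:
	default:
		return DEMO_RETURN_ERR;
	}
}
```
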


>> + if (ret == -EINTR) {
>> + /*
>> + * We've already been requeued and enqueued on the
>> + * rt_mutex, so restart by calling futex_lock_pi()
>> + * directly, rather then returning to this function.
>> + */
>> + struct restart_block *restart;
>> + restart = &current_thread_info()->restart_block;
>> + restart->fn = futex_lock_pi_restart;
>> + restart->futex.uaddr = (u32 *)uaddr2;
>> + restart->futex.val = uval;
>> + if (abs_time) {
>> + restart->futex.has_timeout = 1;
>> + restart->futex.time = abs_time->tv64;
>> + } else
>> + restart->futex.has_timeout = 0;
>> + restart->futex.flags = 0;
>> +
>> + if (fshared)
>> + restart->futex.flags |= FLAGS_SHARED;
>> + if (clockrt)
>> + restart->futex.flags |= FLAGS_CLOCKRT;
>> + ret = -ERESTART_RESTARTBLOCK;
>> + }
>> + }
>> +
>> + spin_lock(q.lock_ptr);
>> + ret = finish_futex_lock_pi(uaddr, fshared, &q, ret);
>> +
>> + /* Unqueue and drop the lock. */
>> + unqueue_me_pi(&q);
>> +
>> +out:
>> + if (to) {
>> + hrtimer_cancel(&to->timer);
>> + destroy_hrtimer_on_stack(&to->timer);
>> + }
>> + if (requeued) {
>> + /* We were requeued, so we now have two references to key2,
>> + * unqueue_me_pi releases one of them, we must release the
>> + * other. */
>> + drop_futex_key_refs(&key2);
>> + if (ret) {
>
> This whole "ret" logic is confusing.


I agree. As I said above, I'll see if I can think of something more robust
as well as easier to follow. If you have some ideas, please share.

Many thanks for the review. I've addressed most of the above while
responding to this review, but I have a few items that will take me some
time to address. I hope to have V6 out and tested this week.

--
Darren

>
>> + futex_requeue_pi_cleanup(&q);
>> + if (get_user(uval, uaddr2))
>> + ret = -EFAULT;
>> + if (ret != -ERESTART_RESTARTBLOCK && ret != -EFAULT)
>> + ret = futex_lock_pi(uaddr2, fshared, uval,
>> + abs_time, 0);
>> + }
>> + }
>> + return ret;
>> +}
>
> Thanks,
>
> tglx


--
Darren Hart
IBM Linux Technology Center
Real-Time Linux Team

2009-03-10 13:40:40

by Thomas Gleixner

Subject: Re: [TIP][RFC 6/7] futex: add requeue_pi calls

On Mon, 9 Mar 2009, Darren Hart wrote:
> Thomas Gleixner wrote:
> > > + int has_timeout;
> >
> > Hmm. time == 0 perhaps or just add another flag bit to the flags field ?
>
> I've created FLAGS_HAS_TIMEOUT and use that instead. I opted for this
> approach instead of using time==0 as that seemed like it could be confused for
> an expired timer.

Yup. Preferred solution.
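
A minimal userspace sketch of the flag-bit approach, for illustration only. The struct, the helper names, and the bit values are assumptions made for this example; only the FLAGS_* names come from the thread. It shows why a flag is less ambiguous than time == 0: a timeout value of zero can still be marked as present.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Bit values are assumed for this sketch. */
#define FLAGS_SHARED		0x01u
#define FLAGS_CLOCKRT		0x02u
#define FLAGS_HAS_TIMEOUT	0x04u

/* Hypothetical stand-in for the futex part of the restart block. */
struct futex_restart_sketch {
	uint64_t time;
	uint32_t flags;
};

/* Record the timeout, if any, using a flag bit instead of a separate int. */
static void set_restart_timeout(struct futex_restart_sketch *r,
				const uint64_t *abs_time)
{
	if (abs_time) {
		r->time = *abs_time;
		r->flags |= FLAGS_HAS_TIMEOUT;
	} else {
		r->time = 0;
		r->flags &= ~FLAGS_HAS_TIMEOUT;
	}
}

/* A stored time of 0 with the flag set is an expired timer, not "no timer". */
static bool restart_has_timeout(const struct futex_restart_sketch *r)
{
	return (r->flags & FLAGS_HAS_TIMEOUT) != 0;
}
```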

> > > + /*
> > > + * The pifutex has an owner, make sure it's us, if not complain
> > > + * to userspace.
> > > + * FIXME_LATER: handle this gracefully
> >
> > We should do this right now. It's not that hard.
>
> Hrm. So if another task owns the futex then POSIX says undefined scheduling
> behavior should be expected. So failing here seems like a good way to encourage proper
> usage of the cond vars.

Non predictable != invalid usage

> Or can you think of a sane scenario in which the
> outer mutex should not be held prior to the call to cond_signal() or
> cond_broadcast()? If you are thinking outside of glibc cond vars, then there
> are likely other gaps with this patch, such as only being able to requeue from
> non-pi to pi futexes.

No, I think about glibc and the madness of existing code. Code just
relies on that and it works with the current glibc implementation so
making the enhanced requeue_pi behave different will break existing
applications and has to be considered as a regression.

> That being said... now that this is more or less implemented, I'm not sure
> there is really anything I'd need to do differently to support this. I'll pore
> over this for a while and see if there are any gotchas that I'm missing right
> now.

I think you already have everything you need, except for some minor
tweaks.

> >
> > > + */
> > > + pid = curval & FUTEX_TID_MASK;
> > > + if (pid && pid != task_pid_vnr(current))
> > > + return -EMORON;
> >
> > Though it's a pity that we lose EMORON :)
>
>
> Does that mean I'll have to forfeit the nickname bestowed upon me by LWN?
> I've received numerous comments on this line - all of them in favor.
> Apparently lots of kernel developers are eager for a way to slyly mock users
> from within the context of the kernel. ;-)

Send a separate patch then. You have my Acked-by :)

> > > +/*
> > > + * Requeue all waiters hashed on one physical page to another physical page.
> > > + * In the requeue_pi case, either takeover uaddr2 or set FUTEX_WAITERS and
> > > + * setup the pistate. FUTEX_REQUEUE_PI only supports requeueing from a non-pi
> > > + * futex to a pi futex.
> > > */
> > > static int futex_requeue(u32 __user *uaddr1, int fshared, u32 __user *uaddr2,
> > > - int nr_wake, int nr_requeue, u32 *cmpval)
> > > + int nr_wake, int nr_requeue, u32 *cmpval,
> > > + int requeue_pi)
> > > {
> > > union futex_key key1 = FUTEX_KEY_INIT, key2 = FUTEX_KEY_INIT;
> > > struct futex_hash_bucket *hb1, *hb2;
> > > struct plist_head *head1;
> > > struct futex_q *this, *next;
> > > - int ret, drop_count = 0;
> > > + u32 curval2;
> > > + struct futex_pi_state *pi_state = NULL;
> > > + int drop_count = 0, attempt = 0, task_count = 0, ret;
> > > +
> > > + if (requeue_pi && refill_pi_state_cache())
> > > + return -ENOMEM;
> >
> > Why do you want to refill the pi_state_cache of current ? current is
> > not going to wait for the pi_futex.
>
>
> I may need to allocate a pi_state during
> futex_requeue_pi_init()->futex_lock_pi_atomic()->lookup_pi_state(). Making use
> of this current->pi_cache seemed like the best way to do it. Is there a more
> appropriate place to store a preallocated pi_state?

Hmm. You are right. That's the best way to have one handy.

> > > + if (requeue_pi &&
> > > + ((curval2 & FUTEX_TID_MASK) != task_pid_vnr(this->task))) {
> >
> > Why don't you check the owner of the rtmutex, which we installed
> > already ? Then we can drop this curval2 business altogether.
>
>
> Because we can own the futex without an rt_mutex existing. The first time
> through this loop we consider the top_waiter of uaddr1. If uaddr2 had no
> owner, then it is now owned by the top_waiter, but since there are still no
> waiters on uaddr2, the rt_mutex has not been initialized. Once we know we are
> not the owner, then we also know the pi_state->pi_mutex exists.
>
> Hrm.... I suppose a test for "!this->pi_state" is equivalent to
> "(curval2 & FUTEX_TID_MASK) != task_pid_vnr(this->task)" then isn't it...?
> Seems like a rather fragile test though doesn't it?
>
> We wouldn't save much by eliminating the curval2 here though since
> futex_requeue_pi_init needs it for the atomic_lock, so we still need the fault
> logic... unfortunately.
>
> Thoughts?

In futex_requeue_pi_init() we already figure out who owns the
futex, right ? So why don't we capture that information ?
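
One way to read "capture that information" is to decode the futex word once and hand the result to later checks, rather than re-reading the user value. The following is a userspace sketch under that assumption. The struct and function names are hypothetical and are not part of the patch; the mask values are those from the futex ABI header.

```c
#include <stdbool.h>
#include <stdint.h>

/* PI futex word layout from the futex ABI. */
#define FUTEX_WAITERS		0x80000000u
#define FUTEX_OWNER_DIED	0x40000000u
#define FUTEX_TID_MASK		0x3fffffffu

/* Hypothetical record of what was learned when the word was read. */
struct futex_owner_info {
	uint32_t owner_tid;	/* 0 means the futex has no owner */
	bool has_waiters;
	bool owner_died;
};

/* Decode the futex word once so callers need not re-read it. */
static struct futex_owner_info decode_futex_word(uint32_t curval)
{
	struct futex_owner_info info;

	info.owner_tid = curval & FUTEX_TID_MASK;
	info.has_waiters = (curval & FUTEX_WAITERS) != 0;
	info.owner_died = (curval & FUTEX_OWNER_DIED) != 0;
	return info;
}

/* True if the captured owner is the task with the given TID. */
static bool futex_owned_by(const struct futex_owner_info *info, uint32_t tid)
{
	return info->owner_tid != 0 && info->owner_tid == tid;
}
```

With the owner captured like this, the loop could compare against the stored TID and skip the second user-space read, at least in this simplified model.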

> > > + if (abs_time) {
> > > + unsigned long slack;
> >
> > Missing new line. Also this is nonsense. A rt task should have set
> > its timer_slack_ns to 0, so we can just use current->timer_slack_ns
> > in the hrtimer_set_expires_range_ns(). If the timer_slack_ns is
> > random for rt_tasks then we need to fix this in general.
> >
>
>
> Added a WARN_ON(!current->timer_slack_ns) to my debug patch and used that
> value here. I'll create a patch to futex-fixes to address the existing calls
> that I copied this approach from.

Thanks.

> > Also the logic is completely backwards here. It has to be the same
> > as in futex_wait()
>
>
> Is it broken? I believe the reason I wrote it as I did was because the ret ==
> 0 case should be very rare here. In fact, it is an error. The only reason we
> should wake on the first futex (uaddr) is on a signal or a timeout, otherwise
> the user most likely paired a FUTEX_WAKE_REQUEUE_PI with FUTEX_WAKE, instead
> of FUTEX_REQUEUE_PI.
>
> I can change it around if you have a strong preference though (or if it's
> broken in some way I'm missing of course).

The 0 case might be rare, but it is definitely not an error.

futex_wait_queue_me(hb, &q, to);
queue_me();
drops hb->lock;

unqueue_me() is called w/o the hb->lock held, so we can race here
against a requeue_pi wakeup.

So we need to check the unqueue_me() == 0 case first and check
whether we got requeued.

The requeue check is done unlocked, so the requeue and wakeup
might happen between the check and unqueue_me().

But this needs more thought about locking and possible races. The
whole code sequence here screams "can you debug the nasty races here ?"

> > > + /* FIXME: this test is REALLY scary... gotta be a better way...
> > > + * If the pi futex was uncontended, futex_requeue_pi_init() gave us
> > > + * the lock.
> > > + */
> >
> > Didn't we take the rtmutex as well ??
>
>
> Not necessarily. If the pi futex was ownerless and there was only one task
> requeued, then there is no rtmutex as it is created by the first waiter.
>
> It is possible that other tasks have since tried to take this mutex and now
> there is an rtmutex, but we have to catch the case where there might not be
> one.

Fair enough.

> >
> > > + if (!q.pi_state) {

Again. This check might be racy.

> > > + ret = 0;
> > > + goto out;
> > > + }
> > > + pi_mutex = &q.pi_state->pi_mutex;
> > > +
> > > + ret = rt_mutex_finish_proxy_lock(pi_mutex, to, &rt_waiter, 1);
> >
> >
> > Eeek. We got a wakeup _after_ we have been requeued. So now you go
> > back to sleep on the pi_mutex ?
>
>
> Yes. The futex_requeue() calls wake_up() which allows the waiter here to wake
> and continue processing its half of the rt_mutex acquisition loop that I
> split out of rt_mutex_slowlock() in rt_mutex_finish_proxy_lock().

Ah, ok. We try to get the rt_mutex there as we are only the designated
owner. If we fail, then we go back to sleep.

Thanks,

tglx