2015-05-01 15:28:29

by Davidlohr Bueso

Subject: [PATCH v2 0/3] kernel: lockless wake-queues

Hello,

This series is aimed at addressing some of the futex hash bucket
lock hold times by introducing lockless wake-queues for futex_wake.

patch-1: introduces the lockless wake-queue machinery.
patch-2: makes use of patch 1 for futexes.
patch-3: makes use of patch 1 for posix mqueues.

Details in the individual patches.

This was suggested some time ago by peterz but, because it can cause
spurious wakeups, was never given much consideration. So far, however,
I am reliably booting a 45-core box with peterz's patch to trigger
spurious wakeups. There are drivers out there that do not play nice
with schedule(), but they can be fixed over time, whereas this is a
production problem for some customers. Furthermore, after some
auditing, there really aren't that many: in a lot of cases, the
functions that end up calling schedule() are in fact wrapped in a
loop, which is just not clear at first sight.

Changes from v1:
- Simplify wake_q and add more comments, suggested by George Spelvin.
- Made it very clear in the comments that this can cause spurious
wakeups, and how naughty users should handle them. (Linus)
- Added mqueue user as this was also wanted for rt purposes.

Applies on top of Linus' tree 4.0-rc1 (4a152c3913f). Thanks!

sched: lockless wake-queues
futex: lockless wakeups
ipc/mqueue: lockless pipelined wakeups

include/linux/sched.h | 46 ++++++++++++++++++++++++++++++++++++++++++++++
ipc/mqueue.c | 50 +++++++++++++++++++++++++++++++-------------------
kernel/futex.c | 33 +++++++++++++++++----------------
kernel/sched/core.c | 46 ++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 140 insertions(+), 35 deletions(-)

--
2.1.4


2015-05-01 15:29:15

by Davidlohr Bueso

Subject: [PATCH 1/3] sched: lockless wake-queues

From: Peter Zijlstra <[email protected]>

This is useful for locking primitives that can effect multiple
wakeups per operation and want to avoid contention on the lock's
internal locks by delaying the wakeups until those internal locks
have been released.

Alternatively it can be used to avoid issuing multiple wakeups, and
thus save a few cycles, in packet processing. Queue all target tasks
and wakeup once you've processed all packets. That way you avoid
waking the target task multiple times if there were multiple packets
for the same task.

Properties of a wake_q are:
- Lockless, as queue head must reside on the stack.
- Being a queue, maintains the wakeup order passed by the callers. This
can be important in scenarios with highly contended locks, where
reordering wakeups could otherwise affect any reliance on lock fairness.
- A queued task cannot be added again until it is woken up.

This patch adds the needed infrastructure to the scheduler code.
A follow-up patch uses the new wake_q to delay the futex wakeups
until after we've released the hash bucket locks.

Signed-off-by: Peter Zijlstra <[email protected]>
[tweaks, adjustments, comments, etc.]
Signed-off-by: Davidlohr Bueso <[email protected]>
---
include/linux/sched.h | 46 ++++++++++++++++++++++++++++++++++++++++++++++
kernel/sched/core.c | 46 ++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 92 insertions(+)

diff --git a/include/linux/sched.h b/include/linux/sched.h
index 8222ae4..818050c 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -947,6 +947,50 @@ static inline int cpu_numa_flags(void)
}
#endif

+/*
+ * Wake-queues are lists of tasks with a pending wakeup, whose
+ * callers have already marked the task as woken internally,
+ * and can thus carry on. A common use case is being able to
+ * do the wakeups once the corresponding user lock has been
+ * released.
+ *
+ * We hold a reference to each task in the list across the wakeup,
+ * thus guaranteeing that the memory is still valid by the time
+ * the actual wakeups are performed in wake_up_q().
+ *
+ * One per task suffices, because there's never a need for a task to be
+ * in two wake queues simultaneously; it is forbidden to abandon a task
+ * in a wake queue (a call to wake_up_q() _must_ follow), so if a task is
+ * already in a wake queue, the wakeup will happen soon and the second
+ * waker can just skip it.
+ *
+ * The WAKE_Q macro declares and initializes the list head.
+ * wake_up_q() does NOT reinitialize the list; it's expected to be
+ * called near the end of a function, where the fact that the queue is
+ * not used again will be easy to see by inspection.
+ *
+ * Note that this can cause spurious wakeups. schedule() callers
+ * must ensure the call is done inside a loop, confirming that the
+ * wakeup condition has in fact occurred.
+ */
+struct wake_q_node {
+ struct wake_q_node *next;
+};
+
+struct wake_q_head {
+ struct wake_q_node *first;
+ struct wake_q_node **lastp;
+};
+
+#define WAKE_Q_TAIL ((struct wake_q_node *) 0x01)
+
+#define WAKE_Q(name) \
+ struct wake_q_head name = { WAKE_Q_TAIL, &name.first }
+
+extern void wake_q_add(struct wake_q_head *head,
+ struct task_struct *task);
+extern void wake_up_q(struct wake_q_head *head);
+
struct sched_domain_attr {
int relax_domain_level;
};
@@ -1519,6 +1563,8 @@ struct task_struct {
/* Protection of the PI data structures: */
raw_spinlock_t pi_lock;

+ struct wake_q_node wake_q;
+
#ifdef CONFIG_RT_MUTEXES
/* PI waiters blocked on a rt_mutex held by this task */
struct rb_root pi_waiters;
diff --git a/kernel/sched/core.c b/kernel/sched/core.c
index f9123a8..014d036 100644
--- a/kernel/sched/core.c
+++ b/kernel/sched/core.c
@@ -541,6 +541,52 @@ static bool set_nr_if_polling(struct task_struct *p)
#endif
#endif

+void wake_q_add(struct wake_q_head *head, struct task_struct *task)
+{
+ struct wake_q_node *node = &task->wake_q;
+
+ /*
+ * Atomically grab the task. If ->wake_q is already non-NULL, the
+ * task is already queued (either by us or someone else) and will get
+ * the wakeup due to that.
+ *
+ * This cmpxchg() implies a full barrier, which pairs with the write
+ * barrier implied by the wakeup in wake_up_q().
+ */
+ if (cmpxchg(&node->next, NULL, WAKE_Q_TAIL))
+ return;
+
+ get_task_struct(task);
+
+ /*
+ * The head is context local, there can be no concurrency.
+ */
+ *head->lastp = node;
+ head->lastp = &node->next;
+}
+
+void wake_up_q(struct wake_q_head *head)
+{
+ struct wake_q_node *node = head->first;
+
+ while (node != WAKE_Q_TAIL) {
+ struct task_struct *task;
+
+ task = container_of(node, struct task_struct, wake_q);
+ BUG_ON(!task);
+ /* task can safely be re-inserted now */
+ node = node->next;
+ task->wake_q.next = NULL;
+
+ /*
+ * wake_up_process() implies a wmb() to pair with the queueing
+ * in wake_q_add() so as not to miss wakeups.
+ */
+ wake_up_process(task);
+ put_task_struct(task);
+ }
+}
+
/*
* resched_curr - mark rq's current task 'to be rescheduled now'.
*
--
2.1.4
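
For readers who want to poke at the queueing logic outside the kernel, here is a rough userspace sketch of the wake_q mechanics described in the patch above. It is not the kernel code: struct task, its refs and wakeups counters, and wake_q_demo() are hypothetical stand-ins (for task_struct, get_task_struct()/put_task_struct() and wake_up_process()), and C11 atomics replace the kernel's cmpxchg().

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

struct wake_q_node {
	_Atomic(struct wake_q_node *) next;
};

/* Hypothetical stand-in for task_struct. */
struct task {
	int id;
	int refs;	/* models the task reference count */
	int wakeups;	/* bumped where the kernel would call wake_up_process() */
	struct wake_q_node wake_q;
};

struct wake_q_head {
	_Atomic(struct wake_q_node *) first;
	_Atomic(struct wake_q_node *) *lastp;
};

/* Non-NULL end marker, so a queued node never has next == NULL. */
#define WAKE_Q_TAIL ((struct wake_q_node *)0x01)

#define WAKE_Q(name) \
	struct wake_q_head name = { WAKE_Q_TAIL, &name.first }

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

static void wake_q_add(struct wake_q_head *head, struct task *task)
{
	struct wake_q_node *node = &task->wake_q;
	struct wake_q_node *expected = NULL;

	/* Only the waker that flips NULL -> TAIL gets to queue the task. */
	if (!atomic_compare_exchange_strong(&node->next, &expected, WAKE_Q_TAIL))
		return;

	task->refs++;	/* models get_task_struct() */

	/* The head is local to the caller, so the append needs no lock. */
	*head->lastp = node;
	head->lastp = &node->next;
}

static void wake_up_q(struct wake_q_head *head)
{
	struct wake_q_node *node = head->first;

	while (node != WAKE_Q_TAIL) {
		struct task *task = container_of(node, struct task, wake_q);

		node = node->next;		/* read before releasing the node */
		task->wake_q.next = NULL;	/* task may be queued again now */

		assert(task->refs > 0);
		task->wakeups++;	/* models wake_up_process() */
		task->refs--;		/* models put_task_struct() */
	}
}

/* Queues task a twice and task b once; returns the total wakeups (2). */
static int wake_q_demo(void)
{
	struct task a = { .id = 1 }, b = { .id = 2 };
	WAKE_Q(q);

	wake_q_add(&q, &a);
	wake_q_add(&q, &b);
	wake_q_add(&q, &a);	/* already queued: skipped */
	wake_up_q(&q);
	return a.wakeups + b.wakeups;
}
```

As in the patch, wake_up_q() does not reinitialize the head, so the sketch assumes each queue is used once and then goes out of scope.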

2015-05-01 15:28:41

by Davidlohr Bueso

Subject: [PATCH 2/3] futex: lockless wakeups

Given the overall futex architecture, any chance of reducing
hb->lock contention is welcome. In this particular case, using
wake-queues to enable lockless wakeups addresses very real
performance concerns, including soft lockups when large numbers
of tasks are blocked (which is not hard to find on large boxes
that use just a handful of futexes).

At the lowest level, this patch can reduce the latency of a single thread
attempting to acquire hb->lock in highly contended scenarios by up to
2x. At lower counts of nr_wake there are no regressions, confirming
that the wake_q handling overhead is practically nonexistent. For
instance, despite a fair amount of variation, the extended perf-bench
wakeup benchmark shows, for a 20-core machine, the following average
per-thread time to wake up its share of tasks:

nr_thr  ms-before  ms-after
    16     0.0590    0.0215
    32     0.0396    0.0220
    48     0.0417    0.0182
    64     0.0536    0.0236
    80     0.0414    0.0097
    96     0.0672    0.0152

Naturally, this can cause spurious wakeups. However, as far as I can tell
there is no core code that cannot handle them, and furthermore tglx has a
point that other events can already trigger them anyway.

Signed-off-by: Davidlohr Bueso <[email protected]>
---
kernel/futex.c | 33 +++++++++++++++++----------------
1 file changed, 17 insertions(+), 16 deletions(-)

diff --git a/kernel/futex.c b/kernel/futex.c
index 2579e40..f9984c3 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -1090,9 +1090,11 @@ static void __unqueue_futex(struct futex_q *q)

/*
* The hash bucket lock must be held when this is called.
- * Afterwards, the futex_q must not be accessed.
+ * Afterwards, the futex_q must not be accessed. Callers
+ * must ensure to later call wake_up_q() for the actual
+ * wakeups to occur.
*/
-static void wake_futex(struct futex_q *q)
+static void mark_wake_futex(struct wake_q_head *wake_q, struct futex_q *q)
{
struct task_struct *p = q->task;

@@ -1100,14 +1102,10 @@ static void wake_futex(struct futex_q *q)
return;

/*
- * We set q->lock_ptr = NULL _before_ we wake up the task. If
- * a non-futex wake up happens on another CPU then the task
- * might exit and p would dereference a non-existing task
- * struct. Prevent this by holding a reference on p across the
- * wake up.
+ * Queue the task for wakeup after we've released the
+ * hb->lock. wake_q_add() grabs a reference to p.
*/
- get_task_struct(p);
-
+ wake_q_add(wake_q, p);
__unqueue_futex(q);
/*
* The waiting task can free the futex_q as soon as
@@ -1117,9 +1115,6 @@ static void wake_futex(struct futex_q *q)
*/
smp_wmb();
q->lock_ptr = NULL;
-
- wake_up_state(p, TASK_NORMAL);
- put_task_struct(p);
}

static int wake_futex_pi(u32 __user *uaddr, u32 uval, struct futex_q *this)
@@ -1217,6 +1212,7 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
struct futex_q *this, *next;
union futex_key key = FUTEX_KEY_INIT;
int ret;
+ WAKE_Q(wake_q);

if (!bitset)
return -EINVAL;
@@ -1244,13 +1240,14 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
if (!(this->bitset & bitset))
continue;

- wake_futex(this);
+ mark_wake_futex(&wake_q, this);
if (++ret >= nr_wake)
break;
}
}

spin_unlock(&hb->lock);
+ wake_up_q(&wake_q);
out_put_key:
put_futex_key(&key);
out:
@@ -1269,6 +1266,7 @@ futex_wake_op(u32 __user *uaddr1, unsigned int flags, u32 __user *uaddr2,
struct futex_hash_bucket *hb1, *hb2;
struct futex_q *this, *next;
int ret, op_ret;
+ WAKE_Q(wake_q);

retry:
ret = get_futex_key(uaddr1, flags & FLAGS_SHARED, &key1, VERIFY_READ);
@@ -1320,7 +1318,7 @@ retry_private:
ret = -EINVAL;
goto out_unlock;
}
- wake_futex(this);
+ mark_wake_futex(&wake_q, this);
if (++ret >= nr_wake)
break;
}
@@ -1334,7 +1332,7 @@ retry_private:
ret = -EINVAL;
goto out_unlock;
}
- wake_futex(this);
+ mark_wake_futex(&wake_q, this);
if (++op_ret >= nr_wake2)
break;
}
@@ -1344,6 +1342,7 @@ retry_private:

out_unlock:
double_unlock_hb(hb1, hb2);
+ wake_up_q(&wake_q);
out_put_keys:
put_futex_key(&key2);
out_put_key1:
@@ -1503,6 +1502,7 @@ static int futex_requeue(u32 __user *uaddr1, unsigned int flags,
struct futex_pi_state *pi_state = NULL;
struct futex_hash_bucket *hb1, *hb2;
struct futex_q *this, *next;
+ WAKE_Q(wake_q);

if (requeue_pi) {
/*
@@ -1679,7 +1679,7 @@ retry_private:
* woken by futex_unlock_pi().
*/
if (++task_count <= nr_wake && !requeue_pi) {
- wake_futex(this);
+ mark_wake_futex(&wake_q, this);
continue;
}

@@ -1719,6 +1719,7 @@ retry_private:
out_unlock:
free_pi_state(pi_state);
double_unlock_hb(hb1, hb2);
+ wake_up_q(&wake_q);
hb_waiters_dec(hb2);

/*
--
2.1.4
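
The shape of the change above (pick and unqueue the waiters under the bucket lock, issue the wakeups only after dropping it) can be sketched in plain C. Everything here is a simplified stand-in rather than futex code: struct bucket, its boolean locked flag, the fixed-size pending array playing the role of the wake_q, and wake_sketch() itself are hypothetical names, and the nr_wake handling is simplified.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_WAITERS 8

struct waiter {
	unsigned int bitset;	/* wake events this waiter accepts */
	bool queued;		/* still on the bucket's wait list */
	int wakeups;
};

struct bucket {
	bool locked;		/* stand-in for hb->lock */
	struct waiter *waiters[MAX_WAITERS];
	size_t nr;
};

/* Stand-in for the real wakeup; it must never run under the bucket lock. */
static void wake_one(const struct bucket *hb, struct waiter *w)
{
	assert(!hb->locked);
	w->wakeups++;
}

/* Wake up to nr_wake waiters matching bitset; returns how many were woken. */
static int wake_sketch(struct bucket *hb, int nr_wake, unsigned int bitset)
{
	struct waiter *pending[MAX_WAITERS];	/* local "wake queue" */
	int ret = 0;

	hb->locked = true;
	for (size_t i = 0; i < hb->nr && ret < nr_wake; i++) {
		struct waiter *w = hb->waiters[i];

		if (!w->queued || !(w->bitset & bitset))
			continue;
		w->queued = false;	/* models __unqueue_futex() */
		pending[ret++] = w;	/* models mark_wake_futex() */
	}
	hb->locked = false;

	/* Models wake_up_q(): the lock hold time no longer includes wakeups. */
	for (int i = 0; i < ret; i++)
		wake_one(hb, pending[i]);

	return ret;
}
```

The assert in wake_one() is the point of the exercise: any wakeup issued while the lock is still held trips it.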

2015-05-01 15:28:52

by Davidlohr Bueso

Subject: [PATCH 3/3] ipc/mqueue: lockless pipelined wakeups

This patch moves the wake_up_process() invocation so it is not done under
the info->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once it is STATE_READY and it does not need to loop
on SMP if it is still in STATE_PENDING. In the timeout case we still need
to grab the info->lock to verify the state.

This change should also avoid the need for -RT to introduce
preempt_disable(), which it uses to avoid a busy-loop that polls for the
STATE_PENDING -> STATE_READY change when the waiter has a higher priority
than the waker.

Additionally, this patch micro-optimizes wq_sleep() by using
__set_current_state(TASK_INTERRUPTIBLE), the cheaper cousin of
set_current_state(), as we will block no matter what, thus getting rid
of the implied barrier.

Signed-off-by: Davidlohr Bueso <[email protected]>
---
ipc/mqueue.c | 50 +++++++++++++++++++++++++++++++-------------------
1 file changed, 31 insertions(+), 19 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 3aaea7f..5f422f2 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -47,8 +47,7 @@
#define RECV 1

#define STATE_NONE 0
-#define STATE_PENDING 1
-#define STATE_READY 2
+#define STATE_READY 1

struct posix_msg_tree_node {
struct rb_node rb_node;
@@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
wq_add(info, sr, ewp);

for (;;) {
- set_current_state(TASK_INTERRUPTIBLE);
+ __set_current_state(TASK_INTERRUPTIBLE);

spin_unlock(&info->lock);
time = schedule_hrtimeout_range_clock(timeout, 0,
HRTIMER_MODE_ABS, CLOCK_REALTIME);

- while (ewp->state == STATE_PENDING)
- cpu_relax();
-
if (ewp->state == STATE_READY) {
retval = 0;
goto out;
@@ -909,9 +905,14 @@ out_name:
* bypasses the message array and directly hands the message over to the
* receiver.
* The receiver accepts the message and returns without grabbing the queue
- * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
- * are necessary. The same algorithm is used for sysv semaphores, see
- * ipc/sem.c for more details.
+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):
+ *
+ * - Set pointer to message.
+ * - Queue the receiver task's for later wakeup (without the info->lock).
+ * - Update its state to STATE_READY. Now the receiver can continue.
+ * - Wake up the process after the lock is dropped. Should the process wake up
+ * before this wakeup (due to a timeout or a signal) it will either see
+ * STATE_READY and continue or acquire the lock to check the sate again.
*
* The same algorithm is used for senders.
*/
@@ -919,21 +920,29 @@ out_name:
/* pipelined_send() - send a message directly to the task waiting in
* sys_mq_timedreceive() (without inserting message into a queue).
*/
-static inline void pipelined_send(struct mqueue_inode_info *info,
+static inline void pipelined_send(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info,
struct msg_msg *message,
struct ext_wait_queue *receiver)
{
receiver->msg = message;
list_del(&receiver->list);
- receiver->state = STATE_PENDING;
- wake_up_process(receiver->task);
- smp_wmb();
+ wake_q_add(wake_q, receiver->task);
+ /*
+ * Rely on the implicit cmpxchg barrier from wake_q_add such
+ * that we can ensure that updating receiver->state is the last
+ * write operation: As once set, the receiver can continue,
+ * and if we don't have the reference count from the wake_q,
+ * yet, at that point we can later have a use-after-free
+ * condition and bogus wakeup.
+ */
receiver->state = STATE_READY;
}

/* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
* gets its message and put to the queue (we have one free place for sure). */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static inline void pipelined_receive(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info)
{
struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);

@@ -944,10 +953,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
}
if (msg_insert(sender->msg, info))
return;
+
list_del(&sender->list);
- sender->state = STATE_PENDING;
- wake_up_process(sender->task);
- smp_wmb();
+ wake_q_add(wake_q, sender->task);
sender->state = STATE_READY;
}

@@ -965,6 +973,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
struct timespec ts;
struct posix_msg_tree_node *new_leaf = NULL;
int ret = 0;
+ WAKE_Q(wake_q);

if (u_abs_timeout) {
int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1049,7 +1058,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
} else {
receiver = wq_get_first_waiter(info, RECV);
if (receiver) {
- pipelined_send(info, msg_ptr, receiver);
+ pipelined_send(&wake_q, info, msg_ptr, receiver);
} else {
/* adds message to the queue */
ret = msg_insert(msg_ptr, info);
@@ -1062,6 +1071,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
}
out_unlock:
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
out_free:
if (ret)
free_msg(msg_ptr);
@@ -1084,6 +1094,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
ktime_t expires, *timeout = NULL;
struct timespec ts;
struct posix_msg_tree_node *new_leaf = NULL;
+ WAKE_Q(wake_q);

if (u_abs_timeout) {
int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1155,8 +1166,9 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
CURRENT_TIME;

/* There is now free space in queue. */
- pipelined_receive(info);
+ pipelined_receive(&wake_q, info);
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
ret = 0;
}
if (ret == 0) {
--
2.1.4

2015-05-01 21:52:11

by George Spelvin

Subject: Re: [PATCH 3/3] ipc/mqueue: lockless pipelined wakeups

In general, Acked-by, but you're making me fix all your comments. :-)

This is a nice use of the wake queue, since the code was already handling
the same problem in a similar way with STATE_PENDING.

> * The receiver accepts the message and returns without grabbing the queue
>+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):

Is that last sentence even wanted?

>+ *
>+ * - Set pointer to message.
>+ * - Queue the receiver task's for later wakeup (without the info->lock).

It's "task" singular, and the apostrophe would be wrong if it were plural.

>+ * - Update its state to STATE_READY. Now the receiver can continue.
>+ * - Wake up the process after the lock is dropped. Should the process wake up
>+ * before this wakeup (due to a timeout or a signal) it will either see
>+ * STATE_READY and continue or acquire the lock to check the sate again.

"check the sTate again".

>+ wake_q_add(wake_q, receiver->task);
>+ /*
>+ * Rely on the implicit cmpxchg barrier from wake_q_add such
>+ * that we can ensure that updating receiver->state is the last
>+ * write operation: As once set, the receiver can continue,
>+ * and if we don't have the reference count from the wake_q,
>+ * yet, at that point we can later have a use-after-free
>+ * condition and bogus wakeup.
>+ */
> receiver->state = STATE_READY;

How about:
/*
* There must be a write barrier here; setting STATE_READY
* lets the receiver proceed without further synchronization.
* The cmpxchg inside wake_q_add serves as the barrier here.
*/

The need for a wake queue to take a reference to avoid use-after-free
is generic to wake queues, and handled in generic code; I don't see why
it needs a comment here.


>@@ -1084,6 +1094,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
> ktime_t expires, *timeout = NULL;
> struct timespec ts;
> struct posix_msg_tree_node *new_leaf = NULL;
>+ WAKE_Q(wake_q);
>
> if (u_abs_timeout) {
> int res = prepare_timeout(u_abs_timeout, &expires, &ts);
>@@ -1155,8 +1166,9 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
> CURRENT_TIME;
>
> /* There is now free space in queue. */
>- pipelined_receive(info);
>+ pipelined_receive(&wake_q, info);
> spin_unlock(&info->lock);
>+ wake_up_q(&wake_q);
> ret = 0;
> }
> if (ret == 0) {

Since WAKE_Q actually involves some initialization, would it make sense to
move its declaration to inside the condition that needs it?

(I'm also a fan of declaring variables in the smallest scope possible,
just on general principles.)

2015-05-02 00:36:14

by Davidlohr Bueso

Subject: Re: [PATCH 3/3] ipc/mqueue: lockless pipelined wakeups

On Fri, 2015-05-01 at 17:52 -0400, George Spelvin wrote:
> In general, Acked-by, but you're making me fix all your comments. :-)
>
> This is a nice use of the wake queue, since the code was already handling
> the same problem in a similar way with STATE_PENDING.
>
> > * The receiver accepts the message and returns without grabbing the queue
> >+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):
>
> Is that last sentence even wanted?

Yeah, we can probably remove it now.

> >+ *
> >+ * - Set pointer to message.
> >+ * - Queue the receiver task's for later wakeup (without the info->lock).
>
> It's "task" singular, and the apostrophe would be wrong if it were plural.
>
> >+ * - Update its state to STATE_READY. Now the receiver can continue.
> >+ * - Wake up the process after the lock is dropped. Should the process wake up
> >+ * before this wakeup (due to a timeout or a signal) it will either see
> >+ * STATE_READY and continue or acquire the lock to check the sate again.
>
> "check the sTate again".
>
> >+ wake_q_add(wake_q, receiver->task);
> >+ /*
> >+ * Rely on the implicit cmpxchg barrier from wake_q_add such
> >+ * that we can ensure that updating receiver->state is the last
> >+ * write operation: As once set, the receiver can continue,
> >+ * and if we don't have the reference count from the wake_q,
> >+ * yet, at that point we can later have a use-after-free
> >+ * condition and bogus wakeup.
> >+ */
> > receiver->state = STATE_READY;
>
> How about:
> /*
> * There must be a write barrier here; setting STATE_READY
> * lets the receiver proceed without further synchronization.
> * The cmpxchg inside wake_q_add serves as the barrier here.
> */
>
> The need for a wake queue to take a reference to avoid use-after-free
> is generic to wake queues, and handled in generic code; I don't see why
> it needs a comment here.

You are not wrong, but I'd rather leave the comment as is, as it will
vary from user to user. The comments in the sched wake_q bits are
already pretty clear, and if users cannot see the need for holding
reference and the task disappearing on their own they have no business
using wake_q. Furthermore, I think my comment serves better in mqueues
as the need for it isn't immediately obvious.


> >@@ -1084,6 +1094,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
> > ktime_t expires, *timeout = NULL;
> > struct timespec ts;
> > struct posix_msg_tree_node *new_leaf = NULL;
> >+ WAKE_Q(wake_q);
> >
> > if (u_abs_timeout) {
> > int res = prepare_timeout(u_abs_timeout, &expires, &ts);
> >@@ -1155,8 +1166,9 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
> > CURRENT_TIME;
> >
> > /* There is now free space in queue. */
> >- pipelined_receive(info);
> >+ pipelined_receive(&wake_q, info);
> > spin_unlock(&info->lock);
> >+ wake_up_q(&wake_q);
> > ret = 0;
> > }
> > if (ret == 0) {
>
> Since WAKE_Q actually involves some initialization, would it make sense to
> move its declaration to inside the condition that needs it?
>
> (I'm also a fan of declaring variables in the smallest scope possible,
> just on general principles.)

Agreed.

Thanks,
Davidlohr

2015-05-04 14:03:11

by Davidlohr Bueso

Subject: [PATCH v2 3/3] ipc/mqueue: lockless pipelined wakeups

This patch moves the wake_up_process() invocation so it is not done under
the info->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once it is STATE_READY and it does not need to loop
on SMP if it is still in STATE_PENDING. In the timeout case we still need
to grab the info->lock to verify the state.

This change should also avoid the need for -RT to introduce
preempt_disable(), which it uses to avoid a busy-loop that polls for the
STATE_PENDING -> STATE_READY change when the waiter has a higher priority
than the waker.

Additionally, this patch micro-optimizes wq_sleep() by using
__set_current_state(TASK_INTERRUPTIBLE), the cheaper cousin of
set_current_state(), as we will block no matter what, thus getting rid
of the implied barrier.

Signed-off-by: Davidlohr Bueso <[email protected]>
---

Changes from v1:
- Fix some comments.
- Narrow the wake_q scope to where it is needed in mq_timedreceive().

ipc/mqueue.c | 54 +++++++++++++++++++++++++++++++++---------------------
1 file changed, 33 insertions(+), 21 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 3aaea7f..a24ba9f 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -47,8 +47,7 @@
#define RECV 1

#define STATE_NONE 0
-#define STATE_PENDING 1
-#define STATE_READY 2
+#define STATE_READY 1

struct posix_msg_tree_node {
struct rb_node rb_node;
@@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
wq_add(info, sr, ewp);

for (;;) {
- set_current_state(TASK_INTERRUPTIBLE);
+ __set_current_state(TASK_INTERRUPTIBLE);

spin_unlock(&info->lock);
time = schedule_hrtimeout_range_clock(timeout, 0,
HRTIMER_MODE_ABS, CLOCK_REALTIME);

- while (ewp->state == STATE_PENDING)
- cpu_relax();
-
if (ewp->state == STATE_READY) {
retval = 0;
goto out;
@@ -907,11 +903,15 @@ out_name:
* list of waiting receivers. A sender checks that list before adding the new
* message into the message array. If there is a waiting receiver, then it
* bypasses the message array and directly hands the message over to the
- * receiver.
- * The receiver accepts the message and returns without grabbing the queue
- * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
- * are necessary. The same algorithm is used for sysv semaphores, see
- * ipc/sem.c for more details.
+ * receiver. The receiver accepts the message and returns without grabbing the
+ * queue spinlock:
+ *
+ * - Set pointer to message.
+ * - Queue the receiver task for later wakeup (without the info->lock).
+ * - Update its state to STATE_READY. Now the receiver can continue.
+ * - Wake up the process after the lock is dropped. Should the process wake up
+ * before this wakeup (due to a timeout or a signal) it will either see
+ * STATE_READY and continue or acquire the lock to check the state again.
*
* The same algorithm is used for senders.
*/
@@ -919,21 +919,29 @@ out_name:
/* pipelined_send() - send a message directly to the task waiting in
* sys_mq_timedreceive() (without inserting message into a queue).
*/
-static inline void pipelined_send(struct mqueue_inode_info *info,
+static inline void pipelined_send(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info,
struct msg_msg *message,
struct ext_wait_queue *receiver)
{
receiver->msg = message;
list_del(&receiver->list);
- receiver->state = STATE_PENDING;
- wake_up_process(receiver->task);
- smp_wmb();
+ wake_q_add(wake_q, receiver->task);
+ /*
+ * Rely on the implicit cmpxchg barrier from wake_q_add such
+ * that we can ensure that updating receiver->state is the last
+ * write operation: As once set, the receiver can continue,
+ * and if we don't have the reference count from the wake_q,
+ * yet, at that point we can later have a use-after-free
+ * condition and bogus wakeup.
+ */
receiver->state = STATE_READY;
}

/* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
* gets its message and put to the queue (we have one free place for sure). */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static inline void pipelined_receive(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info)
{
struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);

@@ -944,10 +952,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
}
if (msg_insert(sender->msg, info))
return;
+
list_del(&sender->list);
- sender->state = STATE_PENDING;
- wake_up_process(sender->task);
- smp_wmb();
+ wake_q_add(wake_q, sender->task);
sender->state = STATE_READY;
}

@@ -965,6 +972,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
struct timespec ts;
struct posix_msg_tree_node *new_leaf = NULL;
int ret = 0;
+ WAKE_Q(wake_q);

if (u_abs_timeout) {
int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1049,7 +1057,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
} else {
receiver = wq_get_first_waiter(info, RECV);
if (receiver) {
- pipelined_send(info, msg_ptr, receiver);
+ pipelined_send(&wake_q, info, msg_ptr, receiver);
} else {
/* adds message to the queue */
ret = msg_insert(msg_ptr, info);
@@ -1062,6 +1070,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
}
out_unlock:
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
out_free:
if (ret)
free_msg(msg_ptr);
@@ -1149,14 +1158,17 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
msg_ptr = wait.msg;
}
} else {
+ WAKE_Q(wake_q);
+
msg_ptr = msg_get(info);

inode->i_atime = inode->i_mtime = inode->i_ctime =
CURRENT_TIME;

/* There is now free space in queue. */
- pipelined_receive(info);
+ pipelined_receive(&wake_q, info);
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
ret = 0;
}
if (ret == 0) {
--
2.1.4


2015-05-05 00:37:21

by George Spelvin

Subject: Re: [PATCH 3/3] ipc/mqueue: lockless pipelined wakeups

> You are not wrong, but I'd rather leave the comment as is, as it will
> vary from user to user. The comments in the sched wake_q bits are
> already pretty clear, and if users cannot see the need for holding
> reference and the task disappearing on their own they have no business
> using wake_q. Furthermore, I think my comment serves better in mqueues
> as the need for it isn't immediately obvious.

Okay, but the comment is still rather awkward and hard-to-follow English.

How about:
/*
* Rely on the implicit cmpxchg barrier from wake_q_add to
* ensure that updating receiver->state is the last write
* operation. Once set, the receiver can continue, and if we
* hadn't placed it on the wake_q (which takes a reference to
* the task), any later use might cause a use-after-free
* condition.
*/

Part of the confusion is that there are two different ordering conditions
that wake_q_add is involved in, and the comment above (even my version)
isn't good about explaining the distinction:

1) It, itself, must come before the receiver->state update, because
after that, the receiver may run (and possibly exit).
2) It serves as a write barrier for all the other state writes above.

If I wanted to be clearer, I'd have to do more extensive edits:

/*
* wake_q_add must come before updating receiver->state, since
* that write lets the receiver continue (and possibly exit).
* The reference count from the wake_q prevents use-after-free.
*
* The cmpxchg inside wake_q_add also serves as a write barrier
* for all the other state updates that must be visible before
* receiver->state.
*/

None of this affects the code, which is
Acked-by: George Spelvin <[email protected]>
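
The two ordering conditions listed above can be illustrated with a small C11 sketch. Note the substitution: the patch relies on the full barrier implied by the cmpxchg() in wake_q_add() followed by a plain store, whereas this sketch expresses the same intent with a release store paired with an acquire load. struct waiter, its queued flag (standing in for "on the wake_q, reference held") and both functions are hypothetical names, not the mqueue code.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

enum { STATE_NONE = 0, STATE_READY = 1 };

struct waiter {
	const char *msg;	/* payload handed over by the sender */
	int queued;		/* stand-in for the wake_q reference */
	atomic_int state;
};

/* Sender side; the real code runs this with info->lock held. */
static void pipelined_send_sketch(struct waiter *w, const char *msg)
{
	assert(msg != NULL);

	w->msg = msg;	/* publish the payload */
	w->queued = 1;	/* condition 1: queue before READY becomes visible */

	/*
	 * Condition 2: the release store orders all writes above before
	 * the state update, which must be the last write to the waiter.
	 */
	atomic_store_explicit(&w->state, STATE_READY, memory_order_release);
}

/* Receiver side: lockless fast path after any wakeup, spurious or not. */
static const char *try_receive_sketch(struct waiter *w)
{
	if (atomic_load_explicit(&w->state, memory_order_acquire) != STATE_READY)
		return NULL;	/* real code takes info->lock and rechecks */

	return w->msg;	/* guaranteed visible once READY is observed */
}
```

A receiver that sees NULL here corresponds to the timeout or signal case in the patch, where the waiter falls back to taking the lock and checking the state again.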

Subject: [tip:sched/core] sched: Implement lockless wake-queues

Commit-ID: 7675104990ed255b9315a82ae827ff312a2a88a2
Gitweb: http://git.kernel.org/tip/7675104990ed255b9315a82ae827ff312a2a88a2
Author: Peter Zijlstra <[email protected]>
AuthorDate: Fri, 1 May 2015 08:27:50 -0700
Committer: Ingo Molnar <[email protected]>
CommitDate: Fri, 8 May 2015 12:20:45 +0200

sched: Implement lockless wake-queues

This is useful for locking primitives that can effect multiple
wakeups per operation and want to avoid contention on the lock's
internal locks by delaying the wakeups until those internal locks
have been released.

Alternatively it can be used to avoid issuing multiple wakeups, and
thus save a few cycles, in packet processing. Queue all target tasks
and wakeup once you've processed all packets. That way you avoid
waking the target task multiple times if there were multiple packets
for the same task.

Properties of a wake_q are:
- Lockless, as queue head must reside on the stack.
- Being a queue, maintains the wakeup order passed by the callers. This
can be important in scenarios with highly contended locks, where
reordering wakeups could otherwise affect any reliance on lock fairness.
- A queued task cannot be added again until it is woken up.
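
The properties above can be sketched in userspace as follows. This is a single-threaded model under stated assumptions, not the scheduler code: C11 atomic_compare_exchange_strong stands in for cmpxchg(), plain counters stand in for get_task_struct()/put_task_struct() and wake_up_process(), and every name (model_task, wq_add, wq_wake_all and so on) is hypothetical. Unlike wake_q_add(), wq_add() returns a value so the "already queued" case can be observed.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

struct wq_node {
	_Atomic(struct wq_node *) next;
};

struct wq_head {
	_Atomic(struct wq_node *) first;
	_Atomic(struct wq_node *) *lastp;
};

/* Hypothetical stand-in for task_struct. */
struct model_task {
	int id;
	int refcount; /* models the task reference count */
	int wakeups;  /* counts modeled wake_up_process() calls */
	struct wq_node wake_q;
};

/* Non-NULL sentinel: a node whose next is NULL is not on any queue. */
#define WQ_TAIL ((struct wq_node *)0x01)

static void wq_init(struct wq_head *h)
{
	atomic_init(&h->first, WQ_TAIL);
	h->lastp = &h->first;
}

static void model_task_init(struct model_task *t, int id)
{
	t->id = id;
	t->refcount = 0;
	t->wakeups = 0;
	atomic_init(&t->wake_q.next, NULL);
}

/* Returns 1 if the task was queued, 0 if it was already on a queue. */
static int wq_add(struct wq_head *h, struct model_task *t)
{
	struct wq_node *expected = NULL;

	/* Claim the node atomically; failure means someone queued it already. */
	if (!atomic_compare_exchange_strong(&t->wake_q.next, &expected, WQ_TAIL))
		return 0;

	t->refcount++; /* models get_task_struct() */

	/* The head is local to the caller, so appending needs no lock. */
	atomic_store(h->lastp, &t->wake_q);
	h->lastp = &t->wake_q.next;
	return 1;
}

/* Wakes every queued task in FIFO order; records ids, returns the count. */
static int wq_wake_all(struct wq_head *h, int *order, int max)
{
	struct wq_node *node = atomic_load(&h->first);
	int n = 0;

	while (node != WQ_TAIL) {
		struct model_task *t = (struct model_task *)
			((char *)node - offsetof(struct model_task, wake_q));

		node = atomic_load(&t->wake_q.next);
		atomic_store(&t->wake_q.next, NULL); /* may be queued again now */
		t->wakeups++;  /* models wake_up_process() */
		t->refcount--; /* models put_task_struct() */
		if (n < max)
			order[n] = t->id;
		n++;
	}
	return n;
}
```

As in the patch, the head is not reinitialized after the wakeups, so a fresh head is needed before queueing again.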

This patch adds the needed infrastructure into the scheduler code;
the futex patch in this series then uses the new wake_q to delay the
futex wakeups until after the hash bucket locks have been released.

Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
[tweaks, adjustments, comments, etc.]
Signed-off-by: Davidlohr Bueso <[email protected]>
Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
Acked-by: Thomas Gleixner <[email protected]>
Cc: Borislav Petkov <[email protected]>
Cc: Chris Mason <[email protected]>
Cc: Davidlohr Bueso <[email protected]>
Cc: George Spelvin <[email protected]>
Cc: H. Peter Anvin <[email protected]>
Cc: Linus Torvalds <[email protected]>
Cc: Manfred Spraul <[email protected]>
Cc: Sebastian Andrzej Siewior <[email protected]>
Cc: Steven Rostedt <[email protected]>
Link: http://lkml.kernel.org/r/[email protected]
Signed-off-by: Ingo Molnar <[email protected]>
---
include/linux/sched.h | 46 ++++++++++++++++++++++++++++++++++++++++++++++
kernel/sched/core.c | 46 ++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 92 insertions(+)

diff --git a/include/linux/sched.h b/include/linux/sched.h
index 4adc536..254d88e 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -921,6 +921,50 @@ enum cpu_idle_type {
#define SCHED_CAPACITY_SCALE (1L << SCHED_CAPACITY_SHIFT)

/*
+ * Wake-queues are lists of tasks with a pending wakeup, whose
+ * callers have already marked the task as woken internally,
+ * and can thus carry on. A common use case is being able to
+ * do the wakeups once the corresponding user lock has been
+ * released.
+ *
+ * We hold a reference to each task in the list across the wakeup,
+ * thus guaranteeing that the memory is still valid by the time
+ * the actual wakeups are performed in wake_up_q().
+ *
+ * One wake_q_node per task suffices, because there's never a need for a task to be
+ * in two wake queues simultaneously; it is forbidden to abandon a task
+ * in a wake queue (a call to wake_up_q() _must_ follow), so if a task is
+ * already in a wake queue, the wakeup will happen soon and the second
+ * waker can just skip it.
+ *
+ * The WAKE_Q macro declares and initializes the list head.
+ * wake_up_q() does NOT reinitialize the list; it's expected to be
+ * called near the end of a function, where the fact that the queue is
+ * not used again will be easy to see by inspection.
+ *
+ * Note that this can cause spurious wakeups. schedule() callers
+ * must ensure the call is done inside a loop, confirming that the
+ * wakeup condition has in fact occurred.
+ */
+struct wake_q_node {
+ struct wake_q_node *next;
+};
+
+struct wake_q_head {
+ struct wake_q_node *first;
+ struct wake_q_node **lastp;
+};
+
+#define WAKE_Q_TAIL ((struct wake_q_node *) 0x01)
+
+#define WAKE_Q(name) \
+ struct wake_q_head name = { WAKE_Q_TAIL, &name.first }
+
+extern void wake_q_add(struct wake_q_head *head,
+ struct task_struct *task);
+extern void wake_up_q(struct wake_q_head *head);
+
+/*
* sched-domains (multiprocessor balancing) declarations:
*/
#ifdef CONFIG_SMP
@@ -1532,6 +1576,8 @@ struct task_struct {
/* Protection of the PI data structures: */
raw_spinlock_t pi_lock;

+ struct wake_q_node wake_q;
+
#ifdef CONFIG_RT_MUTEXES
/* PI waiters blocked on a rt_mutex held by this task */
struct rb_root pi_waiters;
diff --git a/kernel/sched/core.c b/kernel/sched/core.c
index 22b53c8..355f953 100644
--- a/kernel/sched/core.c
+++ b/kernel/sched/core.c
@@ -541,6 +541,52 @@ static bool set_nr_if_polling(struct task_struct *p)
#endif
#endif

+void wake_q_add(struct wake_q_head *head, struct task_struct *task)
+{
+ struct wake_q_node *node = &task->wake_q;
+
+ /*
+ * Atomically grab the task. If ->wake_q.next is already non-NULL, it means
+ * it's already queued (either by us or someone else) and will get the
+ * wakeup due to that.
+ *
+ * This cmpxchg() implies a full barrier, which pairs with the write
+ * barrier implied by the wakeup in wake_up_q().
+ */
+ if (cmpxchg(&node->next, NULL, WAKE_Q_TAIL))
+ return;
+
+ get_task_struct(task);
+
+ /*
+ * The head is context local, there can be no concurrency.
+ */
+ *head->lastp = node;
+ head->lastp = &node->next;
+}
+
+void wake_up_q(struct wake_q_head *head)
+{
+ struct wake_q_node *node = head->first;
+
+ while (node != WAKE_Q_TAIL) {
+ struct task_struct *task;
+
+ task = container_of(node, struct task_struct, wake_q);
+ BUG_ON(!task);
+ /* task can safely be re-inserted now */
+ node = node->next;
+ task->wake_q.next = NULL;
+
+ /*
+ * wake_up_process() implies a wmb() to pair with the queueing
+ * in wake_q_add() so as not to miss wakeups.
+ */
+ wake_up_process(task);
+ put_task_struct(task);
+ }
+}
+
/*
* resched_curr - mark rq's current task 'to be rescheduled now'.
*

Subject: [tip:sched/core] futex: Implement lockless wakeups

Commit-ID: 1d0dcb3ad9d336e6d6ee020a750a7f8d907e28de
Gitweb: http://git.kernel.org/tip/1d0dcb3ad9d336e6d6ee020a750a7f8d907e28de
Author: Davidlohr Bueso <[email protected]>
AuthorDate: Fri, 1 May 2015 08:27:51 -0700
Committer: Ingo Molnar <[email protected]>
CommitDate: Fri, 8 May 2015 12:21:40 +0200

futex: Implement lockless wakeups

Given the overall futex architecture, any chance of reducing
hb->lock contention is welcome. In this particular case, using
wake-queues to enable lockless wakeups addresses very real
performance concerns, including soft lockups when large numbers
of tasks are blocked (which is not hard to find on large boxes
using just a handful of futexes).

At the lowest level, this patch can reduce the latency of a single thread
attempting to acquire hb->lock in highly contended scenarios by
up to 2x. At lower counts of nr_wake there are no regressions,
confirming, of course, that the wake_q handling overhead is practically
nonexistent. For instance, while there is a fair amount of variation,
the extended perf-bench wakeup benchmark shows, for a 20-core machine,
the following average per-thread time to wake up its share of tasks:

nr_thr   ms-before   ms-after
16       0.0590      0.0215
32       0.0396      0.0220
48       0.0417      0.0182
64       0.0536      0.0236
80       0.0414      0.0097
96       0.0672      0.0152

Naturally, this can cause spurious wakeups. However there is no core code
that cannot handle them afaict, and furthermore tglx does have the point
that other events can already trigger them anyway.
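
The shape of the change (pick the waiters while holding the bucket lock, issue the wakeups only after dropping it) can be sketched with a small single-threaded model. This is only an illustration under stated assumptions: a flag stands in for hb->lock, a fixed on-stack array stands in for the wake_q, and all names (model_bucket, model_waiter, model_enqueue, model_futex_wake) are hypothetical rather than taken from kernel/futex.c.

```c
#include <assert.h>
#include <stddef.h>

#define MODEL_MAX_WAITERS 8

/* Hypothetical waiter record; the real code queues struct futex_q entries. */
struct model_waiter {
	int woken;            /* set once the deferred wakeup has run */
	int woken_under_lock; /* 1 if the wakeup ran while the lock was held */
};

struct model_bucket {
	int lock_held; /* stands in for hb->lock in this single-threaded model */
	struct model_waiter *waiters[MODEL_MAX_WAITERS];
	size_t nr_waiters;
};

/* Adds a waiter to the bucket; returns 0 on success, -1 if it is full. */
static int model_enqueue(struct model_bucket *hb, struct model_waiter *w)
{
	if (hb->nr_waiters == MODEL_MAX_WAITERS)
		return -1;
	w->woken = 0;
	w->woken_under_lock = 0;
	hb->waiters[hb->nr_waiters++] = w;
	return 0;
}

/* Wakes up to nr_wake waiters in FIFO order; returns how many were woken. */
static int model_futex_wake(struct model_bucket *hb, int nr_wake)
{
	struct model_waiter *batch[MODEL_MAX_WAITERS]; /* on-stack, like WAKE_Q() */
	int ret = 0;
	int i;

	hb->lock_held = 1; /* models spin_lock(&hb->lock) */
	while (ret < nr_wake && hb->nr_waiters > 0) {
		size_t j;

		/* Models mark_wake_futex(): unqueue now, wake later. */
		batch[ret++] = hb->waiters[0];
		for (j = 1; j < hb->nr_waiters; j++)
			hb->waiters[j - 1] = hb->waiters[j];
		hb->nr_waiters--;
	}
	hb->lock_held = 0; /* models spin_unlock(&hb->lock) */

	/* Models wake_up_q(): the wakeups run with the lock already dropped. */
	for (i = 0; i < ret; i++) {
		batch[i]->woken = 1;
		batch[i]->woken_under_lock = hb->lock_held;
	}
	return ret;
}
```

The point of the split is that the time spent in the wakeup path no longer counts toward the lock hold time; only the cheap unqueue step does.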

Signed-off-by: Davidlohr Bueso <[email protected]>
Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
Acked-by: Thomas Gleixner <[email protected]>
Cc: Andrew Morton <[email protected]>
Cc: Borislav Petkov <[email protected]>
Cc: Chris Mason <[email protected]>
Cc: Davidlohr Bueso <[email protected]>
Cc: George Spelvin <[email protected]>
Cc: H. Peter Anvin <[email protected]>
Cc: Linus Torvalds <[email protected]>
Cc: Manfred Spraul <[email protected]>
Cc: Peter Zijlstra <[email protected]>
Cc: Sebastian Andrzej Siewior <[email protected]>
Cc: Steven Rostedt <[email protected]>
Link: http://lkml.kernel.org/r/[email protected]
Signed-off-by: Ingo Molnar <[email protected]>
---
kernel/futex.c | 33 +++++++++++++++++----------------
1 file changed, 17 insertions(+), 16 deletions(-)

diff --git a/kernel/futex.c b/kernel/futex.c
index 2579e40..f9984c3 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -1090,9 +1090,11 @@ static void __unqueue_futex(struct futex_q *q)

/*
* The hash bucket lock must be held when this is called.
- * Afterwards, the futex_q must not be accessed.
+ * Afterwards, the futex_q must not be accessed. Callers
+ * must ensure to later call wake_up_q() for the actual
+ * wakeups to occur.
*/
-static void wake_futex(struct futex_q *q)
+static void mark_wake_futex(struct wake_q_head *wake_q, struct futex_q *q)
{
struct task_struct *p = q->task;

@@ -1100,14 +1102,10 @@ static void wake_futex(struct futex_q *q)
return;

/*
- * We set q->lock_ptr = NULL _before_ we wake up the task. If
- * a non-futex wake up happens on another CPU then the task
- * might exit and p would dereference a non-existing task
- * struct. Prevent this by holding a reference on p across the
- * wake up.
+ * Queue the task for later wakeup, after we've released
+ * the hb->lock. wake_q_add() grabs a reference to p.
*/
- get_task_struct(p);
-
+ wake_q_add(wake_q, p);
__unqueue_futex(q);
/*
* The waiting task can free the futex_q as soon as
@@ -1117,9 +1115,6 @@ static void wake_futex(struct futex_q *q)
*/
smp_wmb();
q->lock_ptr = NULL;
-
- wake_up_state(p, TASK_NORMAL);
- put_task_struct(p);
}

static int wake_futex_pi(u32 __user *uaddr, u32 uval, struct futex_q *this)
@@ -1217,6 +1212,7 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
struct futex_q *this, *next;
union futex_key key = FUTEX_KEY_INIT;
int ret;
+ WAKE_Q(wake_q);

if (!bitset)
return -EINVAL;
@@ -1244,13 +1240,14 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
if (!(this->bitset & bitset))
continue;

- wake_futex(this);
+ mark_wake_futex(&wake_q, this);
if (++ret >= nr_wake)
break;
}
}

spin_unlock(&hb->lock);
+ wake_up_q(&wake_q);
out_put_key:
put_futex_key(&key);
out:
@@ -1269,6 +1266,7 @@ futex_wake_op(u32 __user *uaddr1, unsigned int flags, u32 __user *uaddr2,
struct futex_hash_bucket *hb1, *hb2;
struct futex_q *this, *next;
int ret, op_ret;
+ WAKE_Q(wake_q);

retry:
ret = get_futex_key(uaddr1, flags & FLAGS_SHARED, &key1, VERIFY_READ);
@@ -1320,7 +1318,7 @@ retry_private:
ret = -EINVAL;
goto out_unlock;
}
- wake_futex(this);
+ mark_wake_futex(&wake_q, this);
if (++ret >= nr_wake)
break;
}
@@ -1334,7 +1332,7 @@ retry_private:
ret = -EINVAL;
goto out_unlock;
}
- wake_futex(this);
+ mark_wake_futex(&wake_q, this);
if (++op_ret >= nr_wake2)
break;
}
@@ -1344,6 +1342,7 @@ retry_private:

out_unlock:
double_unlock_hb(hb1, hb2);
+ wake_up_q(&wake_q);
out_put_keys:
put_futex_key(&key2);
out_put_key1:
@@ -1503,6 +1502,7 @@ static int futex_requeue(u32 __user *uaddr1, unsigned int flags,
struct futex_pi_state *pi_state = NULL;
struct futex_hash_bucket *hb1, *hb2;
struct futex_q *this, *next;
+ WAKE_Q(wake_q);

if (requeue_pi) {
/*
@@ -1679,7 +1679,7 @@ retry_private:
* woken by futex_unlock_pi().
*/
if (++task_count <= nr_wake && !requeue_pi) {
- wake_futex(this);
+ mark_wake_futex(&wake_q, this);
continue;
}

@@ -1719,6 +1719,7 @@ retry_private:
out_unlock:
free_pi_state(pi_state);
double_unlock_hb(hb1, hb2);
+ wake_up_q(&wake_q);
hb_waiters_dec(hb2);

/*

Subject: [tip:sched/core] ipc/mqueue: Implement lockless pipelined wakeups

Commit-ID: fa6004ad4528153b699a4d5ce5ea6b33acce74cc
Gitweb: http://git.kernel.org/tip/fa6004ad4528153b699a4d5ce5ea6b33acce74cc
Author: Davidlohr Bueso <[email protected]>
AuthorDate: Mon, 4 May 2015 07:02:46 -0700
Committer: Ingo Molnar <[email protected]>
CommitDate: Fri, 8 May 2015 12:23:07 +0200

ipc/mqueue: Implement lockless pipelined wakeups

This patch moves the wake_up_process() invocation so it is not done under
the info->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once it is STATE_READY and it does not need to loop
on SMP if it is still in STATE_PENDING. In the timeout case we still need
to grab the info->lock to verify the state.

This change should also avoid the introduction of preempt_disable() in -rt,
which avoids a busy-loop that polls for the STATE_PENDING -> STATE_READY
change if the waiter has a higher priority compared to the waker.

Additionally, this patch micro-optimizes wq_sleep by using
__set_current_state(TASK_INTERRUPTIBLE), the cheaper cousin of
set_current_state(), as we will block no matter what, thus getting
rid of the implied barrier.
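
The waiter side described above (continue without the lock when the state is already STATE_READY, otherwise take the lock and check again) can be sketched as a simplified model. It is not the wq_sleep() code: the lock is a counter in a single-threaded model, the return convention is invented for the example, and the names (model_queue, model_sleeper, model_after_wakeup) are hypothetical.

```c
#include <assert.h>
#include <errno.h>
#include <stdatomic.h>

enum { STATE_NONE = 0, STATE_READY = 1 };

/* Hypothetical stand-in for the queue; only tracks lock usage. */
struct model_queue {
	int lock_held;
	int lock_acquisitions;
};

/* Hypothetical stand-in for a sleeping sender or receiver. */
struct model_sleeper {
	atomic_int state; /* STATE_NONE or STATE_READY */
	int on_list;      /* 1 while linked on the wait list */
};

/*
 * Models what the waiter does after schedule() returns.
 * Returns 0 if the handoff completed, -ETIMEDOUT if the timeout expired
 * without a handoff, and 1 if the wakeup was spurious and the waiter
 * should go back to sleep.
 */
static int model_after_wakeup(struct model_queue *q, struct model_sleeper *w,
			      int timed_out)
{
	int ret;

	/* Fast path: the handoff is already visible, no lock needed. */
	if (atomic_load_explicit(&w->state, memory_order_acquire) == STATE_READY)
		return 0;

	/* Slow path: recheck under the lock, which excludes a racing waker. */
	q->lock_held = 1;
	q->lock_acquisitions++;
	if (atomic_load(&w->state) == STATE_READY) {
		ret = 0;
	} else if (timed_out) {
		w->on_list = 0; /* give up and unlink from the wait list */
		ret = -ETIMEDOUT;
	} else {
		ret = 1; /* spurious wakeup: loop and sleep again */
	}
	q->lock_held = 0;
	return ret;
}
```

With only two states there is nothing left to spin on: a waiter that runs before the deferred wakeup either sees STATE_READY or falls back to the lock.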

Signed-off-by: Davidlohr Bueso <[email protected]>
Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
Acked-by: George Spelvin <[email protected]>
Acked-by: Thomas Gleixner <[email protected]>
Cc: Andrew Morton <[email protected]>
Cc: Borislav Petkov <[email protected]>
Cc: Chris Mason <[email protected]>
Cc: H. Peter Anvin <[email protected]>
Cc: Linus Torvalds <[email protected]>
Cc: Manfred Spraul <[email protected]>
Cc: Peter Zijlstra <[email protected]>
Cc: Sebastian Andrzej Siewior <[email protected]>
Cc: Steven Rostedt <[email protected]>
Cc: [email protected]
Link: http://lkml.kernel.org/r/[email protected]
Signed-off-by: Ingo Molnar <[email protected]>
---
ipc/mqueue.c | 54 +++++++++++++++++++++++++++++++++---------------------
1 file changed, 33 insertions(+), 21 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 3aaea7f..a24ba9f 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -47,8 +47,7 @@
#define RECV 1

#define STATE_NONE 0
-#define STATE_PENDING 1
-#define STATE_READY 2
+#define STATE_READY 1

struct posix_msg_tree_node {
struct rb_node rb_node;
@@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
wq_add(info, sr, ewp);

for (;;) {
- set_current_state(TASK_INTERRUPTIBLE);
+ __set_current_state(TASK_INTERRUPTIBLE);

spin_unlock(&info->lock);
time = schedule_hrtimeout_range_clock(timeout, 0,
HRTIMER_MODE_ABS, CLOCK_REALTIME);

- while (ewp->state == STATE_PENDING)
- cpu_relax();
-
if (ewp->state == STATE_READY) {
retval = 0;
goto out;
@@ -907,11 +903,15 @@ out_name:
* list of waiting receivers. A sender checks that list before adding the new
* message into the message array. If there is a waiting receiver, then it
* bypasses the message array and directly hands the message over to the
- * receiver.
- * The receiver accepts the message and returns without grabbing the queue
- * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
- * are necessary. The same algorithm is used for sysv semaphores, see
- * ipc/sem.c for more details.
+ * receiver. The receiver accepts the message and returns without grabbing the
+ * queue spinlock:
+ *
+ * - Set pointer to message.
+ * - Queue the receiver task for later wakeup (without the info->lock).
+ * - Update its state to STATE_READY. Now the receiver can continue.
+ * - Wake up the process after the lock is dropped. Should the process wake up
+ * before this wakeup (due to a timeout or a signal) it will either see
+ * STATE_READY and continue or acquire the lock to check the state again.
*
* The same algorithm is used for senders.
*/
@@ -919,21 +919,29 @@ out_name:
/* pipelined_send() - send a message directly to the task waiting in
* sys_mq_timedreceive() (without inserting message into a queue).
*/
-static inline void pipelined_send(struct mqueue_inode_info *info,
+static inline void pipelined_send(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info,
struct msg_msg *message,
struct ext_wait_queue *receiver)
{
receiver->msg = message;
list_del(&receiver->list);
- receiver->state = STATE_PENDING;
- wake_up_process(receiver->task);
- smp_wmb();
+ wake_q_add(wake_q, receiver->task);
+ /*
+ * Rely on the implicit cmpxchg barrier from wake_q_add such
+ * that we can ensure that updating receiver->state is the last
+ * write operation: As once set, the receiver can continue,
+ * and if we don't have the reference count from the wake_q,
+ * yet, at that point we can later have a use-after-free
+ * condition and bogus wakeup.
+ */
receiver->state = STATE_READY;
}

/* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
* gets its message and put to the queue (we have one free place for sure). */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static inline void pipelined_receive(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info)
{
struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);

@@ -944,10 +952,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
}
if (msg_insert(sender->msg, info))
return;
+
list_del(&sender->list);
- sender->state = STATE_PENDING;
- wake_up_process(sender->task);
- smp_wmb();
+ wake_q_add(wake_q, sender->task);
sender->state = STATE_READY;
}

@@ -965,6 +972,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
struct timespec ts;
struct posix_msg_tree_node *new_leaf = NULL;
int ret = 0;
+ WAKE_Q(wake_q);

if (u_abs_timeout) {
int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1049,7 +1057,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
} else {
receiver = wq_get_first_waiter(info, RECV);
if (receiver) {
- pipelined_send(info, msg_ptr, receiver);
+ pipelined_send(&wake_q, info, msg_ptr, receiver);
} else {
/* adds message to the queue */
ret = msg_insert(msg_ptr, info);
@@ -1062,6 +1070,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
}
out_unlock:
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
out_free:
if (ret)
free_msg(msg_ptr);
@@ -1149,14 +1158,17 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
msg_ptr = wait.msg;
}
} else {
+ WAKE_Q(wake_q);
+
msg_ptr = msg_get(info);

inode->i_atime = inode->i_mtime = inode->i_ctime =
CURRENT_TIME;

/* There is now free space in queue. */
- pipelined_receive(info);
+ pipelined_receive(&wake_q, info);
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
ret = 0;
}
if (ret == 0) {