Return-Path: Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1756220AbbDJOhe (ORCPT ); Fri, 10 Apr 2015 10:37:34 -0400 Received: from www.linutronix.de ([62.245.132.108]:52007 "EHLO Galois.linutronix.de" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1756152AbbDJOha convert rfc822-to-8bit (ORCPT ); Fri, 10 Apr 2015 10:37:30 -0400 Date: Fri, 10 Apr 2015 16:37:26 +0200 From: Sebastian Andrzej Siewior To: Manfred Spraul Cc: linux-kernel@vger.kernel.org, Peter Zijlstra , Ingo Molnar , Thomas Gleixner , Darren Hart , Steven Rostedt , fredrik.markstrom@windriver.com, Davidlohr Bueso Subject: [PATCH v2] ipc/mqueue: remove STATE_PENDING Message-ID: <20150410143726.GD3057@linutronix.de> References: <1428419030-20030-1-git-send-email-bigeasy@linutronix.de> <1428419030-20030-4-git-send-email-bigeasy@linutronix.de> <55241851.7060704@colorfullife.com> MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Disposition: inline Content-Transfer-Encoding: 8BIT In-Reply-To: <55241851.7060704@colorfullife.com> X-Key-Id: 2A8CF5D1 X-Key-Fingerprint: 6425 4695 FFF0 AA44 66CC 19E6 7B96 E816 2A8C F5D1 User-Agent: Mutt/1.5.21 (2010-09-15) Sender: linux-kernel-owner@vger.kernel.org List-ID: X-Mailing-List: linux-kernel@vger.kernel.org Content-Length: 6692 Lines: 203 This patch moves the wakeup_process() invocation so it is not done under the info->lock. With this change, the waiter is woken up once it is "ready" which means its state is STATE_READY and it does not need to loop on SMP if it is still in STATE_PENDING. In the timeout case we still need to grab the info->lock to verify the state. This change should also avoid the introduction of preempt_disable() in -RT which avoids a busy-loop which pools for the STATE_PENDING -> STATE_READY change if the waiter has a higher priority compared to the waker. Signed-off-by: Sebastian Andrzej Siewior --- v1…v2: update / correct the comment of how the algorithm works now. * Manfred Spraul | 2015-04-07 19:48:01 [+0200]: >No. With your change, ipc/sem.c and ipc/msg.c use different algorithms. >Please update the comment and describe the new approach: > >Current approach: >- set pointer to message >- STATE_PENDING >- wake_up_process() >- STATE_READY > (now the receiver can continue) > >New approach: >- set pointer to message >- get_task_struct >- STATE_READY > (now the receiver can continue, e.g. woken up due to an unrelated >SIGKILL) >- wake_up_process() >- put_task_struct() Updated. Does it meet your expectation? > >>+ if (r_sender) { >>+ wake_up_process(r_sender); >>+ put_task_struct(r_sender); >>+ } >> ret = 0; >Could you double-check that it is safe to call wake_up_process on a >killed and reaped thread, only with a get_task_struct reference? tglx answered that part. >And: please test it, too. (patch the kernel so that you can trigger >this case). Why patch? Isn't this triggered if you have a reader waiting and you send a message? ipc/mqueue.c | 51 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/ipc/mqueue.c b/ipc/mqueue.c index 7635a1cf99f3..bfdc2219a470 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -47,7 +47,6 @@ #define RECV 1 #define STATE_NONE 0 -#define STATE_PENDING 1 #define STATE_READY 2 struct posix_msg_tree_node { @@ -577,9 +576,6 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr, time = schedule_hrtimeout_range_clock(timeout, 0, HRTIMER_MODE_ABS, CLOCK_REALTIME); - while (ewp->state == STATE_PENDING) - cpu_relax(); - if (ewp->state == STATE_READY) { retval = 0; goto out; @@ -909,9 +905,14 @@ SYSCALL_DEFINE1(mq_unlink, const char __user *, u_name) * bypasses the message array and directly hands the message over to the * receiver. * The receiver accepts the message and returns without grabbing the queue - * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers - * are necessary. The same algorithm is used for sysv semaphores, see - * ipc/sem.c for more details. + * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c): + * - set pointer to message + * - hold a reference of the task to be woken up + * - update its state (to STATE_READY). Now the receiver can continue. + * - wake up the process after the lock is dropped. Should the process wake up + * before this wakeup (due to a timeout or a signal) it will either see + * STATE_READY and continue or acquire the lock to check the sate again. + * - put the reference to task to be woken up. * * The same algorithm is used for senders. */ @@ -919,36 +920,41 @@ SYSCALL_DEFINE1(mq_unlink, const char __user *, u_name) /* pipelined_send() - send a message directly to the task waiting in * sys_mq_timedreceive() (without inserting message into a queue). */ -static inline void pipelined_send(struct mqueue_inode_info *info, +static struct task_struct *pipelined_send(struct mqueue_inode_info *info, struct msg_msg *message, struct ext_wait_queue *receiver) { + struct task_struct *r_task; + receiver->msg = message; list_del(&receiver->list); - receiver->state = STATE_PENDING; - wake_up_process(receiver->task); + r_task = receiver->task; + get_task_struct(r_task); smp_wmb(); receiver->state = STATE_READY; + return r_task; } /* pipelined_receive() - if there is task waiting in sys_mq_timedsend() * gets its message and put to the queue (we have one free place for sure). */ -static inline void pipelined_receive(struct mqueue_inode_info *info) +static struct task_struct *pipelined_receive(struct mqueue_inode_info *info) { + struct task_struct *r_sender; struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND); if (!sender) { /* for poll */ wake_up_interruptible(&info->wait_q); - return; + return NULL; } if (msg_insert(sender->msg, info)) - return; + return NULL; list_del(&sender->list); - sender->state = STATE_PENDING; - wake_up_process(sender->task); + r_sender = sender->task; + get_task_struct(r_sender); smp_wmb(); sender->state = STATE_READY; + return r_sender; } SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, @@ -961,6 +967,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, struct ext_wait_queue *receiver; struct msg_msg *msg_ptr; struct mqueue_inode_info *info; + struct task_struct *r_task = NULL; ktime_t expires, *timeout = NULL; struct timespec ts; struct posix_msg_tree_node *new_leaf = NULL; @@ -1049,7 +1056,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, } else { receiver = wq_get_first_waiter(info, RECV); if (receiver) { - pipelined_send(info, msg_ptr, receiver); + r_task = pipelined_send(info, msg_ptr, receiver); } else { /* adds message to the queue */ ret = msg_insert(msg_ptr, info); @@ -1062,6 +1069,10 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, } out_unlock: spin_unlock(&info->lock); + if (r_task) { + wake_up_process(r_task); + put_task_struct(r_task); + } out_free: if (ret) free_msg(msg_ptr); @@ -1149,14 +1160,20 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr, msg_ptr = wait.msg; } } else { + struct task_struct *r_sender; + msg_ptr = msg_get(info); inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME; /* There is now free space in queue. */ - pipelined_receive(info); + r_sender = pipelined_receive(info); spin_unlock(&info->lock); + if (r_sender) { + wake_up_process(r_sender); + put_task_struct(r_sender); + } ret = 0; } if (ret == 0) { -- 2.1.4 -- To unsubscribe from this list: send the line "unsubscribe linux-kernel" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html Please read the FAQ at http://www.tux.org/lkml/