Refactor post_one_notification() so that the pipe can be locked with a
sleepable lock.
Signed-off-by: Hongchen Zhang <[email protected]>
---
fs/pipe.c | 5 +++-
include/linux/watch_queue.h | 14 ++++++++++-
kernel/watch_queue.c | 47 +++++++++++++++++++++++++++----------
3 files changed, 51 insertions(+), 15 deletions(-)
diff --git a/fs/pipe.c b/fs/pipe.c
index 2d88f73f585a..5c6b3daed938 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -834,8 +834,11 @@ void free_pipe_info(struct pipe_inode_info *pipe)
unsigned int i;
#ifdef CONFIG_WATCH_QUEUE
- if (pipe->watch_queue)
+ if (pipe->watch_queue) {
watch_queue_clear(pipe->watch_queue);
+ smp_cond_load_relaxed(&pipe->watch_queue->state,
+ (VAL & WATCH_QUEUE_POST_CNT_MASK) == 0);
+ }
#endif
(void) account_pipe_buffers(pipe->user, pipe->nr_accounted, 0);
diff --git a/include/linux/watch_queue.h b/include/linux/watch_queue.h
index fc6bba20273b..1db3eee2137a 100644
--- a/include/linux/watch_queue.h
+++ b/include/linux/watch_queue.h
@@ -35,6 +35,7 @@ struct watch_filter {
struct watch_type_filter filters[];
};
+#define WATCH_QUEUE_POST_CNT_MASK GENMASK(30, 0)
struct watch_queue {
struct rcu_head rcu;
struct watch_filter __rcu *filter;
@@ -46,7 +47,18 @@ struct watch_queue {
spinlock_t lock;
unsigned int nr_notes; /* Number of notes */
unsigned int nr_pages; /* Number of pages in notes[] */
- bool defunct; /* T when queues closed */
+ union {
+ struct {
+#ifdef __LITTLE_ENDIAN
+ u32 post_cnt:31; /* How many threads are posting notification */
+ u32 defunct:1; /* T when queues closed */
+#else
+ u32 defunct:1; /* T when queues closed */
+ u32 post_cnt:31; /* How many threads are posting notification */
+#endif
+ };
+ u32 state;
+ };
};
/*
diff --git a/kernel/watch_queue.c b/kernel/watch_queue.c
index e91cb4c2833f..bd14f054ffb8 100644
--- a/kernel/watch_queue.c
+++ b/kernel/watch_queue.c
@@ -33,6 +33,8 @@ MODULE_AUTHOR("Red Hat, Inc.");
#define WATCH_QUEUE_NOTE_SIZE 128
#define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE)
+static void put_watch(struct watch *watch);
+
/*
* This must be called under the RCU read-lock, which makes
* sure that the wqueue still exists. It can then take the lock,
@@ -88,24 +90,40 @@ static const struct pipe_buf_operations watch_queue_pipe_buf_ops = {
};
/*
- * Post a notification to a watch queue.
- *
- * Must be called with the RCU lock for reading, and the
- * watch_queue lock held, which guarantees that the pipe
- * hasn't been released.
+ * Post a notification to a watch queue with RCU lock held.
*/
-static bool post_one_notification(struct watch_queue *wqueue,
+static bool post_one_notification(struct watch *watch,
struct watch_notification *n)
{
void *p;
- struct pipe_inode_info *pipe = wqueue->pipe;
+ struct watch_queue *wqueue;
+ struct pipe_inode_info *pipe;
struct pipe_buffer *buf;
struct page *page;
unsigned int head, tail, mask, note, offset, len;
bool done = false;
+ u32 state;
+
+ if (!kref_get_unless_zero(&watch->usage))
+ return false;
+ wqueue = rcu_dereference(watch->queue);
+
+ pipe = wqueue->pipe;
- if (!pipe)
+ if (!pipe) {
+ put_watch(watch);
return false;
+ }
+
+ do {
+ if (wqueue->defunct) {
+ put_watch(watch);
+ return false;
+ }
+ state = wqueue->state;
+ } while (cmpxchg(&wqueue->state, state, state + 1) != state);
+
+ rcu_read_unlock();
spin_lock_irq(&pipe->rd_wait.lock);
@@ -145,6 +163,12 @@ static bool post_one_notification(struct watch_queue *wqueue,
out:
spin_unlock_irq(&pipe->rd_wait.lock);
+ do {
+ state = wqueue->state;
+ } while (cmpxchg(&wqueue->state, state, state - 1) != state);
+
+ rcu_read_lock();
+ put_watch(watch);
if (done)
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
return done;
@@ -224,10 +248,7 @@ void __post_watch_notification(struct watch_list *wlist,
if (security_post_notification(watch->cred, cred, n) < 0)
continue;
- if (lock_wqueue(wqueue)) {
- post_one_notification(wqueue, n);
- unlock_wqueue(wqueue);
- }
+ post_one_notification(watch, n);
}
rcu_read_unlock();
@@ -560,8 +581,8 @@ int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
wqueue = rcu_dereference(watch->queue);
+ post_one_notification(watch, &n.watch);
if (lock_wqueue(wqueue)) {
- post_one_notification(wqueue, &n.watch);
if (!hlist_unhashed(&watch->queue_node)) {
hlist_del_init_rcu(&watch->queue_node);
base-commit: 6995e2de6891c724bfeb2db33d7b87775f913ad1
--
2.33.0
Using a spinlock in pipe_{read,write} costs too much time. IMO
pipe->{head,tail} can be protected by __pipe_{lock,unlock} instead.
Likewise, we can use __pipe_{lock,unlock} to protect
pipe->{head,tail} in pipe_resize_ring and
post_one_notification.
post_one_notification, used by the watch queue, is called under the RCU
read lock and a spin lock, so we do some work to move post_one_notification
out of rcu_read_lock and spin_lock_bh. The *disadvantage* of doing
so is that post_watch_notification can no longer be used in a bottom half.
Reminded by Matthew, I tested this patch using UnixBench's pipe
test case on an x86_64 machine, and got the following data:
1) before this patch
System Benchmarks Partial Index BASELINE RESULT INDEX
Pipe Throughput 12440.0 493023.3 396.3
========
System Benchmarks Index Score (Partial Only) 396.3
2) after this patch
System Benchmarks Partial Index BASELINE RESULT INDEX
Pipe Throughput 12440.0 507551.4 408.0
========
System Benchmarks Index Score (Partial Only) 408.0
So we get a ~3% speedup.
Reminded by Andrew, I tested this patch with the test code in
Linus's
commit 0ddad21d3e99 ("pipe: use exclusive waits when reading or writing")
and got the following result:
1) before this patch
13,136.54 msec task-clock # 3.870 CPUs utilized
1,186,779 context-switches # 90.342 K/sec
668,867 cpu-migrations # 50.917 K/sec
895 page-faults # 68.131 /sec
29,875,711,543 cycles # 2.274 GHz
12,372,397,462 instructions # 0.41 insn per cycle
2,480,235,723 branches # 188.804 M/sec
47,191,943 branch-misses # 1.90% of all branches
3.394806886 seconds time elapsed
0.037869000 seconds user
0.189346000 seconds sys
2) after this patch
12,395.63 msec task-clock # 4.138 CPUs utilized
1,193,381 context-switches # 96.274 K/sec
585,543 cpu-migrations # 47.238 K/sec
1,063 page-faults # 85.756 /sec
27,691,587,226 cycles # 2.234 GHz
11,738,307,999 instructions # 0.42 insn per cycle
2,351,299,522 branches # 189.688 M/sec
45,404,526 branch-misses # 1.93% of all branches
2.995280878 seconds time elapsed
0.010615000 seconds user
0.206999000 seconds sys
With this patch applied, the test program takes less time to run.
Signed-off-by: Hongchen Zhang <[email protected]>
v5:
- fixes the error of using __pipe_lock under RCU lock + spin lock by
moving post_one_notification out of the spin lock and unlocking RCU
before __pipe_lock.
v4:
- fixes a typo in the changelog found during review by Sedat.
v3:
- fixes the error reported by kernel test robot <[email protected]>
Link: https://lore.kernel.org/oe-lkp/[email protected]
- add perf stat data for the test code in Linus's 0ddad21d3e99 to
the commit message.
v2:
- add UnixBench test data in commit message
- fixes the test error reported by kernel test robot <[email protected]>
by adding the missing fs.h header file.
---
fs/pipe.c | 22 +---------------------
include/linux/pipe_fs_i.h | 12 ++++++++++++
kernel/watch_queue.c | 6 +++---
3 files changed, 16 insertions(+), 24 deletions(-)
diff --git a/fs/pipe.c b/fs/pipe.c
index 5c6b3daed938..ffbe05a26a0b 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -98,16 +98,6 @@ void pipe_unlock(struct pipe_inode_info *pipe)
}
EXPORT_SYMBOL(pipe_unlock);
-static inline void __pipe_lock(struct pipe_inode_info *pipe)
-{
- mutex_lock_nested(&pipe->mutex, I_MUTEX_PARENT);
-}
-
-static inline void __pipe_unlock(struct pipe_inode_info *pipe)
-{
- mutex_unlock(&pipe->mutex);
-}
-
void pipe_double_lock(struct pipe_inode_info *pipe1,
struct pipe_inode_info *pipe2)
{
@@ -253,8 +243,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
*/
was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
for (;;) {
- /* Read ->head with a barrier vs post_one_notification() */
- unsigned int head = smp_load_acquire(&pipe->head);
+ unsigned int head = pipe->head;
unsigned int tail = pipe->tail;
unsigned int mask = pipe->ring_size - 1;
@@ -322,14 +311,12 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
if (!buf->len) {
pipe_buf_release(pipe, buf);
- spin_lock_irq(&pipe->rd_wait.lock);
#ifdef CONFIG_WATCH_QUEUE
if (buf->flags & PIPE_BUF_FLAG_LOSS)
pipe->note_loss = true;
#endif
tail++;
pipe->tail = tail;
- spin_unlock_irq(&pipe->rd_wait.lock);
}
total_len -= chars;
if (!total_len)
@@ -507,16 +494,13 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
* it, either the reader will consume it or it'll still
* be there for the next write.
*/
- spin_lock_irq(&pipe->rd_wait.lock);
head = pipe->head;
if (pipe_full(head, pipe->tail, pipe->max_usage)) {
- spin_unlock_irq(&pipe->rd_wait.lock);
continue;
}
pipe->head = head + 1;
- spin_unlock_irq(&pipe->rd_wait.lock);
/* Insert it into the buffer array */
buf = &pipe->bufs[head & mask];
@@ -1268,14 +1252,12 @@ int pipe_resize_ring(struct pipe_inode_info *pipe, unsigned int nr_slots)
if (unlikely(!bufs))
return -ENOMEM;
- spin_lock_irq(&pipe->rd_wait.lock);
mask = pipe->ring_size - 1;
head = pipe->head;
tail = pipe->tail;
n = pipe_occupancy(head, tail);
if (nr_slots < n) {
- spin_unlock_irq(&pipe->rd_wait.lock);
kfree(bufs);
return -EBUSY;
}
@@ -1311,8 +1293,6 @@ int pipe_resize_ring(struct pipe_inode_info *pipe, unsigned int nr_slots)
pipe->tail = tail;
pipe->head = head;
- spin_unlock_irq(&pipe->rd_wait.lock);
-
/* This might have made more room for writers */
wake_up_interruptible(&pipe->wr_wait);
return 0;
diff --git a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
index d2c3f16cf6b1..e97ff26216f3 100644
--- a/include/linux/pipe_fs_i.h
+++ b/include/linux/pipe_fs_i.h
@@ -2,6 +2,8 @@
#ifndef _LINUX_PIPE_FS_I_H
#define _LINUX_PIPE_FS_I_H
+#include <linux/fs.h>
+
#define PIPE_DEF_BUFFERS 16
#define PIPE_BUF_FLAG_LRU 0x01 /* page is on the LRU */
@@ -243,6 +245,16 @@ static inline void pipe_discard_from(struct pipe_inode_info *pipe,
#define PIPE_SIZE PAGE_SIZE
/* Pipe lock and unlock operations */
+static inline void __pipe_lock(struct pipe_inode_info *pipe)
+{
+ mutex_lock_nested(&pipe->mutex, I_MUTEX_PARENT);
+}
+
+static inline void __pipe_unlock(struct pipe_inode_info *pipe)
+{
+ mutex_unlock(&pipe->mutex);
+}
+
void pipe_lock(struct pipe_inode_info *);
void pipe_unlock(struct pipe_inode_info *);
void pipe_double_lock(struct pipe_inode_info *, struct pipe_inode_info *);
diff --git a/kernel/watch_queue.c b/kernel/watch_queue.c
index bd14f054ffb8..fb451c0dddf1 100644
--- a/kernel/watch_queue.c
+++ b/kernel/watch_queue.c
@@ -125,7 +125,7 @@ static bool post_one_notification(struct watch *watch,
rcu_read_unlock();
- spin_lock_irq(&pipe->rd_wait.lock);
+ __pipe_lock(pipe);
mask = pipe->ring_size - 1;
head = pipe->head;
@@ -155,14 +155,14 @@ static bool post_one_notification(struct watch *watch,
smp_store_release(&pipe->head, head + 1); /* vs pipe_read() */
if (!test_and_clear_bit(note, wqueue->notes_bitmap)) {
- spin_unlock_irq(&pipe->rd_wait.lock);
+ __pipe_unlock(pipe);
BUG();
}
wake_up_interruptible_sync_poll_locked(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
done = true;
out:
- spin_unlock_irq(&pipe->rd_wait.lock);
+ __pipe_unlock(pipe);
do {
state = wqueue->state;
} while (cmpxchg(&wqueue->state, state, state - 1) != state);
--
2.33.0
Hongchen Zhang <[email protected]> wrote:
> - spin_lock_irq(&pipe->rd_wait.lock);
> + __pipe_lock(pipe);
This mustn't sleep. post_one_notification() needs to be callable with a
spinlock held.
David